TEST
App template:

Linear regression on a Kafka stream

Olivier Ruas·December 23, 2022·0 min read

Start doing ML on a streaming data source with Pathway.

In this article, we are going to see how to do a simple linear regression on streaming data from Kafka. This article can be seen as an extension of our realtime sum using CSV connectors.

We have a data stream of data points , and we want to compute a simple linear regression on those points: we want to compute the two parameters so that, for each point , can be approximated by .

We are not going to explain the mathematical details here, but you can find all the details in the Wikipedia article.

Connectors

First, we need a connector to connect to our input data streams and receive values on which the regression will be computed. In this article, we will set up a Kafka connector.

To be able to reproduce this example, you may want to use upstash which provides a free Kafka instance.

To use a Kafka connector, we need to set all the Kafka parameters by using a dictionary, following the format of librdkafka. We also need to define a Kafka topic on which to connect onto: we will go with "linear-regression". Here is an example of settings to connect to Kafka using SASL-SSL authentication over SCRAM-SHA-256 mechanism:

rdkafka_settings = {
    "bootstrap.servers": "server-address:9092",
    "security.protocol": "sasl_ssl",
    "sasl.mechanism": "SCRAM-SHA-256",
    "group.id": "$GROUP_NAME",
    "session.timeout.ms": "6000",
    "sasl.username": "username",
    "sasl.password": "********",
}

You need, of course, to replace the server address and the associated credentials.

With this, setting the connector is straightforward, you just need to specify the topic and the schema for your table:

class InputSchema(pw.Schema):
    x: float
    y: float


t = pw.io.kafka.read(
    rdkafka_settings,
    topic="linear-regression",
    schema=InputSchema,
    format="csv",
    autocommit_duration_ms=1000
)

We used the csv format, but there are two other ways to read from Kafka: raw which reads a table with only one column data in which the whole message is dumped and json which reads JSON messages. You can see more about this connector in its dedicated tutorial. In our case we expect CSV messages.

💡 If you only want to test the linear regression, without generating a data stream through Kafka, you can directly use our stream generator:

t = pw.demo.noisy_linear_stream()

For the output, we use a CSV connector, which is set up as follows:

pw.io.csv.write(t, "regression_output_stream.csv")

For more details on how this connector works, checkout our example or the tutorial about it.

Doing a linear regression

To do the regression, we need to compute the sum of the , of the , of the and of the and the total number of data points received until then. This is done as follows:

t = t.select(
    *pw.this,
    x_square=t.x * t.x,
    x_y=t.x * t.y
)
statistics_table = t.reduce(
    count=pw.reducers.count(),
    sum_x=pw.reducers.sum(t.x),
    sum_y=pw.reducers.sum(t.y),
    sum_x_y=pw.reducers.sum(t.x_y),
    sum_x_square=pw.reducers.sum(t.x_square),
)

Then we can compute the estimation of and :

def compute_a(sum_x, sum_y, sum_x_square, sum_x_y, count):
    d = count * sum_x_square - sum_x * sum_x
    if d == 0:
        return 0
    else:
        return (sum_y * sum_x_square - sum_x * sum_x_y) / d

def compute_b(sum_x, sum_y, sum_x_square, sum_x_y, count):
    d = count * sum_x_square - sum_x * sum_x
    if d == 0:
        return 0
    else:
        return (count * sum_x_y - sum_x * sum_y) / d

results_table = statistics_table.select(
    a=pw.apply(compute_a, **statistics_table),
    b=pw.apply(compute_b, **statistics_table),
)

Creating the input stream

You can skip this section if you use our stream generator pw.demo.noisy_linear_stream()

To use the Kafka connector, we have to follow a few rules. First, the Kafka connector expects the first message to contain the names of the columns. The connector will not properly work without this message, however it must be sent only once: if sent twice, the second message will be treated like a normal row.

We can use the KafkaProducer API provided by Kafka to send message using Python:


producer = KafkaProducer(
    bootstrap_servers=["server-address:9092"],
    sasl_mechanism="SCRAM-SHA-256",
    security_protocol="SASL_SSL",
    sasl_plain_username="username",
    sasl_plain_password="********",
)
producer.send(topic, ("x,y").encode("utf-8"), partition=0)
producer.send(
    "linear-regression", ("0,0").encode("utf-8"), partition=0
)
producer.send(
    "linear-regression", ("1,1").encode("utf-8"), partition=0
)
producer.close()

This code sample sends and and then closes the Kafka Producer. For our example, we are going to send more messages containing different pairs which are samples from the line . However, for the example not to be too simple, we are going to add a small random error to each .

Note that, depending on your version of Kafka, you may need to specify the API version to make this code work: api_version=(0,10,2).

Gathering everything into one piece

The final version of our project contains two files: realtime_regression.py which processes the stream using Pathway and generating_kafka_stream.py which generates the streams.

Here is realtime_regression.py:

realtime_regression.py
import pathway as pw

rdkafka_settings = {
    "bootstrap.servers": "server-address:9092",
    "security.protocol": "sasl_ssl",
    "sasl.mechanism": "SCRAM-SHA-256",
    "group.id": "$GROUP_NAME",
    "session.timeout.ms": "6000",
    "sasl.username": "username",
    "sasl.password": "********",
}

class InputSchema(pw.Schema):
    x: float
    y: float


t = pw.io.kafka.read(
    rdkafka_settings,
    topic="linear-regression",
    schema=InputSchema,
    format="csv",
    autocommit_duration_ms=1000,
)
pw.io.csv.write(t, "regression_input.csv")

t += t.select(
    x_square=t.x * t.x,
    x_y=t.x * t.y,
)
statistics_table = t.reduce(
    count=pw.reducers.count(),
    sum_x=pw.reducers.sum(t.x),
    sum_y=pw.reducers.sum(t.y),
    sum_x_y=pw.reducers.sum(t.x_y),
    sum_x_square=pw.reducers.sum(t.x_square),
)

def compute_a(sum_x, sum_y, sum_x_square, sum_x_y, count):
    d = count * sum_x_square - sum_x * sum_x
    if d == 0:
        return 0
    else:
        return (sum_y * sum_x_square - sum_x * sum_x_y) / d

def compute_b(sum_x, sum_y, sum_x_square, sum_x_y, count):
    d = count * sum_x_square - sum_x * sum_x
    if d == 0:
        return 0
    else:
        return (count * sum_x_y - sum_x * sum_y) / d

results_table = statistics_table.select(
    a=pw.apply(compute_a, **statistics_table),
    b=pw.apply(compute_b, **statistics_table),
)

pw.io.csv.write(results_table, "regression_output_stream.csv")
pw.run()

Don't forget the pw.run() otherwise no computation will be done! Once pw.run() is called, the computation will be run forever until it gets killed.

And the generating_kafka_stream.py:

generating_kafka_stream.py
from kafka import KafkaProducer
import time
import random

topic = "linear-regression"

random.seed(0)
def get_value(i):
    return i + (2 * random.random() - 1)/10

producer = KafkaProducer(
    bootstrap_servers=["server-address:9092"],
    sasl_mechanism="SCRAM-SHA-256",
    security_protocol="SASL_SSL",
    sasl_plain_username="username",
    sasl_plain_password="********",
)
producer.send(topic, ("x,y").encode("utf-8"), partition=0)

time.sleep(5)
for i in range(10):
    time.sleep(1)
    producer.send(
        topic, (str(i) + "," + str(get_value(i))).encode("utf-8"), partition=0
    )

producer.close()

Output

There are two outputs in this project: the CSV file regression_input.csv which keeps all the updates received from Kafka and the CSV file output_stream.csv in which all the successive updates of the sum values are displayed.

As in our previous example, the outputs are tables of changes. Each new message of Kafka triggers a new computation and the new values are output in the CSV files!

First, we can check that the generated values are correct:

x,y,time,diff
"0","0.06888437030500963",0,1
"1","1.0515908805880605",1,1
"2","1.984114316166169",2,1
"3","2.9517833500585926",3,1
"4","4.002254944273722",4,1
"5","4.980986827490083",5,1
"6","6.056759717806955",6,1
"7","6.9606625452157855",7,1
"8","7.995319390830471",8,1
"9","9.016676407891007",9,1

We obtain ten values which are sampled around the line. Let's check the regression we obtain:

a,b,time,diff
0,0,0,1
0,0,1,-1
0.06888437030500971,0.9827065102830508,1,1
0.06888437030500971,0.9827065102830508,2,-1
0.07724821608916699,0.9576149729305795,2,1
0.0769101730536299,0.9581220374838857,3,1
0.07724821608916699,0.9576149729305795,3,-1
0.05833884879671927,0.9766933617407955,4,1
0.0769101730536299,0.9581220374838857,4,-1
0.05087576879874134,0.9822906717392795,5,1
0.05833884879671927,0.9766933617407955,5,-1
0.03085078333935821,0.9943056630149089,6,1
0.05087576879874134,0.9822906717392795,6,-1
0.03085078333935821,0.9943056630149089,7,-1
0.03590542987734715,0.9917783397459139,7,1
0.03198741430177742,0.9934574892783012,8,1
0.03590542987734715,0.9917783397459139,8,-1
0.025649728471303895,0.9958341214647295,9,1
0.03198741430177742,0.9934574892783012,9,-1

We obtain close values to what we expect ( and ). You can play the values (number of samples, error, linear function to approximate etc.) to see how the algorithm reacts.

To go further

Congrats, you are now able to use Pathway with Kafka and do some non-trivial computation!

Why not try to do some more advanced computation such as linear regression with several explanatory variables? Or you may want to do some classification?