Doing a linear regression on data from Kafka

Start doing ML on a streaming data source with Pathway.

In this article, we are going to see how to do a simple linear regression on streaming data from Kafka. This article can be seen as an extension of our realtime sum using CSV connectors.

We have a data stream of data points $(x_i, y_i)$, and we want to compute a simple linear regression on those points: we want to compute the two parameters $(a, b)$ so that, for each point $(x_i, y_i)$, $y_i$ can be approximated by $y_i \approx a + b \times x_i$.

We are not going to explain the mathematical details here; you can find them in the Wikipedia article.

Connectors

First, we need a connector to connect to our input data stream and receive the values on which the regression will be computed. In this article, we will set up a Kafka connector.

To reproduce this example, you may want to use Upstash, which provides a free Kafka instance.

To use a Kafka connector, we need to set all the Kafka parameters using a dictionary that follows the format of librdkafka. We also need to define the Kafka topic to connect to: we will go with "linear-regression". Here is an example of settings to connect to Kafka using SASL-SSL authentication with the SCRAM-SHA-256 mechanism:

rdkafka_settings = {
    "bootstrap.servers": "server-address:9092",
    "security.protocol": "sasl_ssl",
    "sasl.mechanism": "SCRAM-SHA-256",
    "group.id": "$GROUP_NAME",
    "session.timeout.ms": "6000",
    "sasl.username": "username",
    "sasl.password": "********",
}

You need, of course, to replace the server address and the associated credentials with your own.
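If you would rather not hard-code credentials in the script, one option is to read them from environment variables. The sketch below is only an illustration: the helper `build_rdkafka_settings` and the variable names `KAFKA_BOOTSTRAP_SERVERS`, `KAFKA_GROUP_ID`, `KAFKA_USERNAME`, and `KAFKA_PASSWORD` are hypothetical names chosen for this example, not something Pathway or librdkafka requires.

```python
import os


def build_rdkafka_settings(env=os.environ):
    """Build the librdkafka-style settings dictionary from a mapping.

    The key names read from `env` are hypothetical; use whatever fits your setup.
    A missing username or password raises KeyError instead of failing later.
    """
    return {
        "bootstrap.servers": env.get("KAFKA_BOOTSTRAP_SERVERS", "server-address:9092"),
        "security.protocol": "sasl_ssl",
        "sasl.mechanism": "SCRAM-SHA-256",
        "group.id": env.get("KAFKA_GROUP_ID", "$GROUP_NAME"),
        "session.timeout.ms": "6000",
        "sasl.username": env["KAFKA_USERNAME"],
        "sasl.password": env["KAFKA_PASSWORD"],
    }


# Example with an explicit mapping instead of the real environment.
example_settings = build_rdkafka_settings(
    {"KAFKA_USERNAME": "username", "KAFKA_PASSWORD": "********"}
)
```

In a real script you would call `build_rdkafka_settings()` without arguments so that the values come from the process environment.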

With this, setting up the connector is straightforward: we just need to specify the topic and the column names:

t = pw.kafka.read(
    rdkafka_settings,
    topic_names=["linear-regression"],
    value_columns=["x", "y"],
    format="csv"
)

We used the csv format, but there are two other ways to read from Kafka: raw, which reads a table with only one column, data, in which the whole message is dumped, and json, which reads JSON messages. You can learn more about this connector in its dedicated tutorial. In our case, we expect CSV messages.
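To make the difference between the formats more concrete, here is a small sketch of what the payload of a single data point could look like in each case. It only builds byte strings and does not call Pathway. The JSON layout (one object per message, keyed by column name) is an assumption made for illustration; the connector tutorial remains the reference for what is actually accepted.

```python
import json

# One data point, with hypothetical values.
point = {"x": 1, "y": 1.05}

# csv: only the values, in the order given by value_columns.
csv_payload = ",".join(str(point[name]) for name in ("x", "y")).encode("utf-8")

# json (assumed layout): one JSON object per message, keyed by column name.
json_payload = json.dumps(point).encode("utf-8")

# raw: the message body is not parsed; it lands as-is in the single `data` column.
raw_payload = csv_payload

print(csv_payload)
print(json_payload)
```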

For the output, we use a CSV connector, which is set up as follows:

pw.csv.write(t, "regression_output_stream.csv")

For more details on how this connector works, check out our example or the tutorial about it.

Doing a linear regression

To do the regression, we need to compute the sum of the $x_i$, of the $y_i$, of the $x_i \times y_i$, and of the $x_i^2$, as well as the total number of data points received so far. This is done as follows:

t = t.select(
    *pw.this,
    x_square=t.x * t.x,
    x_y=t.x * t.y
)
statistics_table = t.reduce(
    count=pw.reducers.count(),
    sum_x=pw.reducers.sum(t.x),
    sum_y=pw.reducers.sum(t.y),
    sum_x_y=pw.reducers.sum(t.x_y),
    sum_x_square=pw.reducers.sum(t.x_square),
)
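These five aggregates are sufficient because the usual least-squares estimates can be written using them alone. With $n$ the number of points received so far, and with the convention $y \approx a + b \times x$ ($a$ is the intercept and $b$ the slope), the standard closed form is:

```latex
\[
b = \frac{n \sum_i x_i y_i - \sum_i x_i \sum_i y_i}
         {n \sum_i x_i^2 - \left(\sum_i x_i\right)^2},
\qquad
a = \frac{\sum_i y_i \sum_i x_i^2 - \sum_i x_i \sum_i x_i y_i}
         {n \sum_i x_i^2 - \left(\sum_i x_i\right)^2}.
\]
```

The denominator is zero when all the $x_i$ received so far are equal (in particular after a single point), in which case the estimates are undefined and a fallback value has to be chosen.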

Then we can compute the estimates of $a$ and $b$:

def compute_a(sum_x, sum_y, sum_x_square, sum_x_y, count):
    d = count * sum_x_square - sum_x * sum_x
    if d == 0:
        return 0
    else:
        return (sum_y * sum_x_square - sum_x * sum_x_y) / d

def compute_b(sum_x, sum_y, sum_x_square, sum_x_y, count):
    d = count * sum_x_square - sum_x * sum_x
    if d == 0:
        return 0
    else:
        return (count * sum_x_y - sum_x * sum_y) / d

results_table = statistics_table.select(
    a=pw.apply(compute_a, **statistics_table),
    b=pw.apply(compute_b, **statistics_table),
)
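Since compute_a and compute_b are plain Python functions, they can be checked without Pathway or Kafka. The sketch below copies them and feeds them the aggregates of four points lying exactly on the line $y = 1 + 2x$. The helper `regression_from_points` is a hypothetical name introduced for this check; it mimics the reduce step with built-in sums.

```python
def compute_a(sum_x, sum_y, sum_x_square, sum_x_y, count):
    d = count * sum_x_square - sum_x * sum_x
    if d == 0:
        return 0
    return (sum_y * sum_x_square - sum_x * sum_x_y) / d


def compute_b(sum_x, sum_y, sum_x_square, sum_x_y, count):
    d = count * sum_x_square - sum_x * sum_x
    if d == 0:
        return 0
    return (count * sum_x_y - sum_x * sum_y) / d


def regression_from_points(points):
    """Aggregate the points like the reduce step, then apply compute_a and compute_b."""
    stats = dict(
        count=len(points),
        sum_x=sum(x for x, _ in points),
        sum_y=sum(y for _, y in points),
        sum_x_y=sum(x * y for x, y in points),
        sum_x_square=sum(x * x for x, _ in points),
    )
    return compute_a(**stats), compute_b(**stats)


# Four points exactly on y = 1 + 2x: the intercept is 1 and the slope is 2.
a, b = regression_from_points([(0, 1), (1, 3), (2, 5), (3, 7)])
print(a, b)  # prints "1.0 2.0"

# With a single point the denominator is zero, so both functions fall back to 0.
print(regression_from_points([(5, 5)]))  # prints "(0, 0)"
```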

Creating the input stream

To use the Kafka connector, we have to follow a few rules. First, the Kafka connector expects the first message to contain the names of the columns. The connector will not work properly without this message. However, it must be sent only once: if it is sent twice, the second message will be treated like a normal row. Second, to be taken into account, updates must be followed by a *COMMIT* message.
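These rules can be summed up as a sequence of payloads: one header, then each row followed by a commit. The sketch below builds that sequence as a list of byte strings, without sending anything. The helper `build_messages` is a hypothetical name used only for this illustration.

```python
def build_messages(points, columns=("x", "y")):
    """Return, in sending order, the payloads for the given data points."""
    # The header with the column names comes first and is sent exactly once.
    messages = [",".join(columns).encode("utf-8")]
    for point in points:
        # One CSV row per data point...
        messages.append(",".join(str(value) for value in point).encode("utf-8"))
        # ...followed by a commit so that the update is taken into account.
        messages.append(b"*COMMIT*")
    return messages


messages = build_messages([(0, 0), (1, 1)])
print(messages)
# prints [b'x,y', b'0,0', b'*COMMIT*', b'1,1', b'*COMMIT*']
```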

We can use the KafkaProducer API provided by Kafka to send messages using Python:

from kafka import KafkaProducer

topic = "linear-regression"

producer = KafkaProducer(
    bootstrap_servers=["server-address:9092"],
    sasl_mechanism="SCRAM-SHA-256",
    security_protocol="SASL_SSL",
    sasl_plain_username="username",
    sasl_plain_password="********",
)
# The first message contains the column names and is sent only once.
producer.send(topic, "x,y".encode("utf-8"), partition=0)
# Each data point is followed by a *COMMIT* message.
producer.send(topic, "0,0".encode("utf-8"), partition=0)
producer.send(topic, "*COMMIT*".encode("utf-8"), partition=0)
producer.send(topic, "1,1".encode("utf-8"), partition=0)
producer.send(topic, "*COMMIT*".encode("utf-8"), partition=0)
producer.close()

This code sample sends $(0, 0)$ and $(1, 1)$ and then closes the Kafka producer. For our example, we are going to send more messages containing different pairs $(x, y)$, which are sampled from the line $y = x$. However, so that the example is not too simple, we are going to add a small random error to each $y$.

Note that, depending on your version of Kafka, you may need to specify the API version to make this code work: api_version=(0,10,2).
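One way to keep the API version optional is to build the keyword arguments of the producer separately and add api_version only when needed. This is a sketch under the assumption that you construct the producer with `KafkaProducer(**kwargs)`; the helper `producer_kwargs` is a hypothetical name, and the block itself does not connect to any broker.

```python
def producer_kwargs(api_version=None):
    """Return the keyword arguments for KafkaProducer, with an optional API version."""
    kwargs = {
        "bootstrap_servers": ["server-address:9092"],
        "sasl_mechanism": "SCRAM-SHA-256",
        "security_protocol": "SASL_SSL",
        "sasl_plain_username": "username",
        "sasl_plain_password": "********",
    }
    if api_version is not None:
        # Only set the version explicitly when the default does not work.
        kwargs["api_version"] = api_version
    return kwargs


kwargs = producer_kwargs(api_version=(0, 10, 2))
# With the producer library installed and a reachable broker, you would then write:
# producer = KafkaProducer(**kwargs)
```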

Gathering everything into one piece

The final version of our project contains two files: realtime_regression.py which processes the stream using Pathway and generating_kafka_stream.py which generates the streams.

Here is realtime_regression.py:

import pathway as pw

rdkafka_settings = {
    "bootstrap.servers": "server-address:9092",
    "security.protocol": "sasl_ssl",
    "sasl.mechanism": "SCRAM-SHA-256",
    "group.id": "$GROUP_NAME",
    "session.timeout.ms": "6000",
    "sasl.username": "username",
    "sasl.password": "********",
}

# Read the data points from the Kafka topic.
t = pw.kafka.read(
    rdkafka_settings,
    topic_names=["linear-regression"],
    value_columns=["x", "y"],
    format="csv",
)
# Keep a copy of everything received from Kafka.
pw.csv.write(t, "regression_input.csv")

# Convert the received values to floats.
t = t.select(
    x=pw.apply(float, t.x),
    y=pw.apply(float, t.y),
)
# Add the columns needed for the sums.
t += t.select(
    x_square=t.x * t.x,
    x_y=t.x * t.y,
)
statistics_table = t.reduce(
    count=pw.reducers.count(),
    sum_x=pw.reducers.sum(t.x),
    sum_y=pw.reducers.sum(t.y),
    sum_x_y=pw.reducers.sum(t.x_y),
    sum_x_square=pw.reducers.sum(t.x_square),
)

def compute_a(sum_x, sum_y, sum_x_square, sum_x_y, count):
    d = count * sum_x_square - sum_x * sum_x
    if d == 0:
        return 0
    else:
        return (sum_y * sum_x_square - sum_x * sum_x_y) / d

def compute_b(sum_x, sum_y, sum_x_square, sum_x_y, count):
    d = count * sum_x_square - sum_x * sum_x
    if d == 0:
        return 0
    else:
        return (count * sum_x_y - sum_x * sum_y) / d

results_table = statistics_table.select(
    a=pw.apply(compute_a, **statistics_table),
    b=pw.apply(compute_b, **statistics_table),
)

# Write the successive estimates of a and b.
pw.csv.write(results_table, "regression_output_stream.csv")
pw.run()

Don't forget the pw.run(), otherwise no computation will be done! Once pw.run() is called, the computation runs forever until it is killed.

And here is generating_kafka_stream.py:

from kafka import KafkaProducer
import time
import random

topic = "linear-regression"

random.seed(0)

def get_value(i):
    # Sample around the line y = x, with an error in [-0.1, 0.1].
    return i + (2 * random.random() - 1) / 10

producer = KafkaProducer(
    bootstrap_servers=["server-address:9092"],
    sasl_mechanism="SCRAM-SHA-256",
    security_protocol="SASL_SSL",
    sasl_plain_username="username",
    sasl_plain_password="********",
)
# The first message contains the column names.
producer.send(topic, "x,y".encode("utf-8"), partition=0)

time.sleep(5)
for i in range(10):
    time.sleep(1)
    # Send one data point, then commit it.
    producer.send(
        topic, (str(i) + "," + str(get_value(i))).encode("utf-8"), partition=0
    )
    producer.send(topic, "*COMMIT*".encode("utf-8"), partition=0)

producer.close()

Output

There are two outputs in this project: the CSV file regression_input.csv, which keeps all the updates received from Kafka, and the CSV file regression_output_stream.csv, in which all the successive updates of the regression parameters are written.

As in our previous example, the outputs are tables of changes. Each new Kafka message triggers a new computation, and the new values are written to the CSV files!
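To read such a table of changes, it can help to replay it: a row with diff equal to 1 adds a value at the given time, and a row with diff equal to -1 removes a previously added value. The sketch below is a minimal illustration of that reading, with a hypothetical helper `replay` and made-up values; it is not how Pathway stores its state internally.

```python
from collections import Counter


def replay(changes):
    """Apply (row, time, diff) changes in time order and return the rows still present."""
    state = Counter()
    for row, _time, diff in sorted(changes, key=lambda change: change[1]):
        state[row] += diff
    return [row for row, count in state.items() if count > 0]


# Made-up change stream for a table with columns (a, b).
changes = [
    ((0.0, 0.0), 0, 1),    # first estimate
    ((0.0, 0.0), 1, -1),   # ...retracted at time 1
    ((0.1, 0.9), 1, 1),    # ...and replaced by a new one
    ((0.1, 0.9), 2, -1),
    ((0.05, 1.0), 2, 1),
]

print(replay(changes))  # prints [(0.05, 1.0)]
```

After replaying all the changes, only the most recent estimate remains, which is the current content of the table.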

First, we can check that the generated values are correct:

x,y,time,diff
"0","0.06888437030500963",0,1
"1","1.0515908805880605",1,1
"2","1.984114316166169",2,1
"3","2.9517833500585926",3,1
"4","4.002254944273722",4,1
"5","4.980986827490083",5,1
"6","6.056759717806955",6,1
"7","6.9606625452157855",7,1
"8","7.995319390830471",8,1
"9","9.016676407891007",9,1

We obtain ten values which are sampled around the line $y = x$. Let's check the regression we obtain:

a,b,time,diff
0,0,0,1
0,0,1,-1
0.06888437030500971,0.9827065102830508,1,1
0.06888437030500971,0.9827065102830508,2,-1
0.07724821608916699,0.9576149729305795,2,1
0.0769101730536299,0.9581220374838857,3,1
0.07724821608916699,0.9576149729305795,3,-1
0.05833884879671927,0.9766933617407955,4,1
0.0769101730536299,0.9581220374838857,4,-1
0.05087576879874134,0.9822906717392795,5,1
0.05833884879671927,0.9766933617407955,5,-1
0.03085078333935821,0.9943056630149089,6,1
0.05087576879874134,0.9822906717392795,6,-1
0.03085078333935821,0.9943056630149089,7,-1
0.03590542987734715,0.9917783397459139,7,1
0.03198741430177742,0.9934574892783012,8,1
0.03590542987734715,0.9917783397459139,8,-1
0.025649728471303895,0.9958341214647295,9,1
0.03198741430177742,0.9934574892783012,9,-1

We obtain values close to what we expect ($a = 0$ and $b = 1$). You can play with the values (number of samples, error, linear function to approximate, etc.) to see how the algorithm reacts.
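As a cross-check, the final estimate can be recomputed offline from the ten values listed in regression_input.csv, using the same sums as the streaming version. This is a plain Python sketch, independent of Pathway; the result should agree with the row of the output that is still present at time 9, up to floating-point rounding.

```python
# The ten received points, copied from regression_input.csv.
xs = list(range(10))
ys = [
    0.06888437030500963, 1.0515908805880605, 1.984114316166169,
    2.9517833500585926, 4.002254944273722, 4.980986827490083,
    6.056759717806955, 6.9606625452157855, 7.995319390830471,
    9.016676407891007,
]

# Same aggregates as in the streaming computation.
count = len(xs)
sum_x = sum(xs)
sum_y = sum(ys)
sum_x_y = sum(x * y for x, y in zip(xs, ys))
sum_x_square = sum(x * x for x in xs)

# Same formulas as compute_a and compute_b.
d = count * sum_x_square - sum_x * sum_x
a = (sum_y * sum_x_square - sum_x * sum_x_y) / d
b = (count * sum_x_y - sum_x * sum_y) / d

print(a, b)
```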

To go further

Congrats, you are now able to use Pathway with Kafka and do non-trivial computations!

Why not try to do some more advanced computation such as linear regression with several explanatory variables? Or you may want to do some classification?

Olivier Ruas

Algorithm and Data Processing Magician