# Doing a linear regression on data from Kafka

Start doing ML on a streaming data source with Pathway.

In this article, we are going to see how to do a simple linear regression on streaming data from Kafka. This article can be seen as an extension of our realtime sum using CSV connectors.

We have a data stream of data points $(x_{i},y_{i})$, and we want to compute a simple linear regression on those points: we want to compute the two parameters $(a,b)$ so that, for each point $(x_{i},y_{i})$, $y_{i}$ can be approximated by $y_{i}≈a+b×x_{i}$.

We are not going to explain the mathematical details here; you can find them in the Wikipedia article.
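For reference, and without derivation, the ordinary least-squares estimates for this model can be written using only a handful of sums over the $n$ points received so far. The symbols $n$, $S_{x}$, $S_{y}$, $S_{xx}$ and $S_{xy}$ are notation introduced here for illustration; they do not appear elsewhere in the article.

$$
S_{x}=\sum_{i} x_{i},\qquad S_{y}=\sum_{i} y_{i},\qquad S_{xx}=\sum_{i} x_{i}^{2},\qquad S_{xy}=\sum_{i} x_{i}y_{i}
$$

$$
b=\frac{n\,S_{xy}-S_{x}S_{y}}{n\,S_{xx}-S_{x}^{2}},\qquad a=\frac{S_{y}S_{xx}-S_{x}S_{xy}}{n\,S_{xx}-S_{x}^{2}}
$$

The common denominator is zero when fewer than two distinct values of $x$ have been received, in which case the regression is not defined.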

## Connectors

First, we need a connector to connect to our input data streams and receive values on which the regression will be computed. In this article, we will set up a Kafka connector.

To reproduce this example, you may want to use Upstash, which provides a free Kafka instance.

To use a Kafka connector, we need to set all the Kafka parameters by using a dictionary, following the format of librdkafka.
We also need to define the Kafka topic to connect to: we will go with `"linear-regression"`.

Here is an example of settings to connect to Kafka using SASL-SSL authentication with the SCRAM-SHA-256 mechanism:

```python
rdkafka_settings = {
    "bootstrap.servers": "server-address:9092",
    "security.protocol": "sasl_ssl",
    "sasl.mechanism": "SCRAM-SHA-256",
    "group.id": "$GROUP_NAME",
    "session.timeout.ms": "6000",
    "sasl.username": "username",
    "sasl.password": "********",
}
```

You will of course need to replace the server address and the associated credentials with your own.

With this, setting up the connector is straightforward: we just need to specify the topic and the column names.

```python
t = pw.kafka.read(
    rdkafka_settings,
    topic_names=["linear-regression"],
    value_columns=["x", "y"],
    format="csv",
)
```

We used the `csv` format, but there are two other ways to read from Kafka: `raw`, which reads a table with a single column `data` into which the whole message is dumped, and `json`, which reads JSON messages. You can see more about this connector in its dedicated tutorial. In our case we expect CSV messages.

For the output, we use a CSV connector, which is set up as follows:

`pw.csv.write(t, "regression_output_stream.csv")`

For more details on how this connector works, check out our example or the tutorial about it.

## Doing a linear regression

To do the regression, we need to compute the sums of the $x_{i}$, of the $x_{i}^{2}$, of the $y_{i}$ and of the $x_{i}×y_{i}$, as well as the total number of data points received so far. This is done as follows:

```python
t = t.select(
    *pw.this,
    x_square=t.x * t.x,
    x_y=t.x * t.y,
)
statistics_table = t.reduce(
    count=pw.reducers.count(),
    sum_x=pw.reducers.sum(t.x),
    sum_y=pw.reducers.sum(t.y),
    sum_x_y=pw.reducers.sum(t.x_y),
    sum_x_square=pw.reducers.sum(t.x_square),
)
```

Then we can compute the estimates of $a$ and $b$:

```python
def compute_a(sum_x, sum_y, sum_x_square, sum_x_y, count):
    d = count * sum_x_square - sum_x * sum_x
    if d == 0:
        return 0
    else:
        return (sum_y * sum_x_square - sum_x * sum_x_y) / d

def compute_b(sum_x, sum_y, sum_x_square, sum_x_y, count):
    d = count * sum_x_square - sum_x * sum_x
    if d == 0:
        return 0
    else:
        return (count * sum_x_y - sum_x * sum_y) / d

results_table = statistics_table.select(
    a=pw.apply(compute_a, **statistics_table),
    b=pw.apply(compute_b, **statistics_table),
)
```
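To see why five running sums are enough for a stream, here is a minimal sketch in plain Python, without Pathway. The class name `RunningRegression` and its methods are hypothetical and only serve this illustration; the formulas are the same as in `compute_a` and `compute_b`, including the convention of returning 0 when the denominator is zero.

```python
class RunningRegression:
    """Keeps the five running sums and derives (a, b) on demand."""

    def __init__(self):
        self.count = 0
        self.sum_x = 0.0
        self.sum_y = 0.0
        self.sum_x_y = 0.0
        self.sum_x_square = 0.0

    def add(self, x, y):
        # A new point only touches the running sums: constant work per update.
        self.count += 1
        self.sum_x += x
        self.sum_y += y
        self.sum_x_y += x * y
        self.sum_x_square += x * x

    def estimate(self):
        d = self.count * self.sum_x_square - self.sum_x * self.sum_x
        if d == 0:
            # Fewer than two distinct x values: same fallback as above.
            return 0.0, 0.0
        a = (self.sum_y * self.sum_x_square - self.sum_x * self.sum_x_y) / d
        b = (self.count * self.sum_x_y - self.sum_x * self.sum_y) / d
        return a, b


model = RunningRegression()
model.add(0.0, 0.0)
first = model.estimate()   # a single point: d == 0, so the fallback is used
model.add(1.0, 1.0)
second = model.estimate()  # two points on the line y = x
print(first, second)
```

No past point has to be stored or revisited, which is what makes this computation a good fit for a streaming source.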

## Creating the input stream

To use the Kafka connector, we have to follow a few rules.
First, the Kafka connector expects the first message to contain the names of the columns.
The connector will not work properly without this message. However, it must be sent only once: if sent twice, the second message will be treated like a normal row.
Second, to be taken into account, updates must be followed by a `*COMMIT*` message.

We can use the KafkaProducer API provided by Kafka to send messages using Python:

```python
from kafka import KafkaProducer

topic = "linear-regression"

producer = KafkaProducer(
    bootstrap_servers=["server-address:9092"],
    sasl_mechanism="SCRAM-SHA-256",
    security_protocol="SASL_SSL",
    sasl_plain_username="username",
    sasl_plain_password="********",
)
# The first message carries the column names and must be sent only once.
producer.send(topic, ("x,y").encode("utf-8"), partition=0)
# Each row is followed by a commit so that it is taken into account.
producer.send(topic, ("0,0").encode("utf-8"), partition=0)
producer.send(topic, "*COMMIT*".encode("utf-8"), partition=0)
producer.send(topic, ("1,1").encode("utf-8"), partition=0)
producer.send(topic, "*COMMIT*".encode("utf-8"), partition=0)
producer.close()
```

This code sample sends $(0,0)$ and $(1,1)$ and then closes the Kafka producer. For our example, we are going to send more messages containing different pairs $(x,y)$ which are sampled from the line $y=x$. However, so that the example is not too simple, we are going to add a small random error to each $y$.

Note that, depending on your version of Kafka, you may need to specify the API version to make this code work: `api_version=(0,10,2)`.
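The message ordering described in this section (column names once, then each row followed by a commit) can be sketched without a broker. The helper `build_messages` below is hypothetical and only builds the payloads; it does not send anything.

```python
COMMIT = b"*COMMIT*"


def build_messages(points, header=("x", "y")):
    """Return the payloads, in sending order, for a list of (x, y) points."""
    # The column names come first and appear exactly once.
    messages = [",".join(header).encode("utf-8")]
    for x, y in points:
        messages.append(f"{x},{y}".encode("utf-8"))
        # Each row is followed by a commit so that it is taken into account.
        messages.append(COMMIT)
    return messages


messages = build_messages([(0, 0), (1, 1)])
for payload in messages:
    print(payload)
```

Each payload could then be passed to `producer.send(topic, payload, partition=0)` as in the sample above.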

## Gathering everything into one piece

The final version of our project contains two files: `realtime_regression.py`, which processes the stream using Pathway, and `generating_kafka_stream.py`, which generates the stream.

Here is `realtime_regression.py`:

```python
import pathway as pw

rdkafka_settings = {
    "bootstrap.servers": "server-address:9092",
    "security.protocol": "sasl_ssl",
    "sasl.mechanism": "SCRAM-SHA-256",
    "group.id": "$GROUP_NAME",
    "session.timeout.ms": "6000",
    "sasl.username": "username",
    "sasl.password": "********",
}

t = pw.kafka.read(
    rdkafka_settings,
    topic_names=["linear-regression"],
    value_columns=["x", "y"],
    format="csv",
)
pw.csv.write(t, "regression_input.csv")

t = t.select(
    x=pw.apply(float, t.x),
    y=pw.apply(float, t.y),
)
t += t.select(
    x_square=t.x * t.x,
    x_y=t.x * t.y,
)
statistics_table = t.reduce(
    count=pw.reducers.count(),
    sum_x=pw.reducers.sum(t.x),
    sum_y=pw.reducers.sum(t.y),
    sum_x_y=pw.reducers.sum(t.x_y),
    sum_x_square=pw.reducers.sum(t.x_square),
)

def compute_a(sum_x, sum_y, sum_x_square, sum_x_y, count):
    d = count * sum_x_square - sum_x * sum_x
    if d == 0:
        return 0
    else:
        return (sum_y * sum_x_square - sum_x * sum_x_y) / d

def compute_b(sum_x, sum_y, sum_x_square, sum_x_y, count):
    d = count * sum_x_square - sum_x * sum_x
    if d == 0:
        return 0
    else:
        return (count * sum_x_y - sum_x * sum_y) / d

results_table = statistics_table.select(
    a=pw.apply(compute_a, **statistics_table),
    b=pw.apply(compute_b, **statistics_table),
)
pw.csv.write(results_table, "regression_output_stream.csv")
pw.run()
```

Don't forget to call `pw.run()`, otherwise no computation will be done!
Once `pw.run()` is called, the computation runs forever until it is killed.

And here is `generating_kafka_stream.py`:

```python
from kafka import KafkaProducer
import time
import random

topic = "linear-regression"

random.seed(0)

def get_value(i):
    return i + (2 * random.random() - 1) / 10

producer = KafkaProducer(
    bootstrap_servers=["server-address:9092"],
    sasl_mechanism="SCRAM-SHA-256",
    security_protocol="SASL_SSL",
    sasl_plain_username="username",
    sasl_plain_password="********",
)
producer.send(topic, ("x,y").encode("utf-8"), partition=0)

time.sleep(5)
for i in range(10):
    time.sleep(1)
    producer.send(
        topic, (str(i) + "," + str(get_value(i))).encode("utf-8"), partition=0
    )
    producer.send(topic, "*COMMIT*".encode("utf-8"), partition=0)

producer.close()
```
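To experiment with the generated data before sending anything to Kafka, the sampling logic can be isolated in a small function. This is only a sketch: `make_samples` and its parameters (`a`, `b`, `noise`, `seed`) are hypothetical names that generalize `get_value`, and because the noise is computed slightly differently, the values are not guaranteed to match the generator's output digit for digit.

```python
import random


def make_samples(n, a=0.0, b=1.0, noise=0.1, seed=0):
    """Return n points (i, a + b*i + e), with e uniform in [-noise, noise)."""
    # A local generator keeps the global random state untouched.
    rng = random.Random(seed)
    return [
        (i, a + b * i + noise * (2 * rng.random() - 1))
        for i in range(n)
    ]


samples = make_samples(10)
for x, y in samples:
    # Same "x,y" text layout as the CSV messages sent to Kafka.
    print(f"{x},{y}")
```

With the default arguments, the points are sampled around the line $y=x$ with an error of at most 0.1, as in the generator above.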

## Output

There are two outputs in this project: the CSV file `regression_input.csv`, which keeps all the updates received from Kafka, and the CSV file `regression_output_stream.csv`, in which all the successive updates of the regression parameters are displayed.

As in our previous example, the outputs are tables of changes. Each new Kafka message triggers a new computation, and the new values are written to the CSV files!
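To make the idea of a table of changes concrete, here is a minimal sketch that replays such a log in plain Python. It assumes that a `diff` of 1 means a row is added and a `diff` of -1 means it is removed; the function `replay` and the rounded values in the log are illustrative only.

```python
from collections import Counter


def replay(changes):
    """Apply (row, time, diff) changes in time order and return the live rows."""
    live = Counter()
    # sorted() is stable, so changes sharing a timestamp keep their order.
    for row, _time, diff in sorted(changes, key=lambda change: change[1]):
        live[row] += diff  # +1 adds the row, -1 removes it
    return [row for row, n in live.items() if n > 0]


# Illustrative change log for a table holding a single (a, b) row.
changes = [
    ((0.0, 0.0), 0, 1),
    ((0.0, 0.0), 1, -1),
    ((0.07, 0.98), 1, 1),
    ((0.07, 0.98), 2, -1),
    ((0.08, 0.96), 2, 1),
]
current = replay(changes)
print(current)
```

Under that assumption, only the most recent estimate is left once the whole log has been replayed: every older value was added and later removed.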

First, we can check that the generated values are correct:

```
x,y,time,diff
"0","0.06888437030500963",0,1
"1","1.0515908805880605",1,1
"2","1.984114316166169",2,1
"3","2.9517833500585926",3,1
"4","4.002254944273722",4,1
"5","4.980986827490083",5,1
"6","6.056759717806955",6,1
"7","6.9606625452157855",7,1
"8","7.995319390830471",8,1
"9","9.016676407891007",9,1
```

We obtain ten values which are sampled around the $y=x$ line. Let's check the regression we obtain:

```
a,b,time,diff
0,0,0,1
0,0,1,-1
0.06888437030500971,0.9827065102830508,1,1
0.06888437030500971,0.9827065102830508,2,-1
0.07724821608916699,0.9576149729305795,2,1
0.0769101730536299,0.9581220374838857,3,1
0.07724821608916699,0.9576149729305795,3,-1
0.05833884879671927,0.9766933617407955,4,1
0.0769101730536299,0.9581220374838857,4,-1
0.05087576879874134,0.9822906717392795,5,1
0.05833884879671927,0.9766933617407955,5,-1
0.03085078333935821,0.9943056630149089,6,1
0.05087576879874134,0.9822906717392795,6,-1
0.03085078333935821,0.9943056630149089,7,-1
0.03590542987734715,0.9917783397459139,7,1
0.03198741430177742,0.9934574892783012,8,1
0.03590542987734715,0.9917783397459139,8,-1
0.025649728471303895,0.9958341214647295,9,1
0.03198741430177742,0.9934574892783012,9,-1
```

We obtain values close to what we expect ($a=0$ and $b=1$). You can play with the values (number of samples, error, linear function to approximate, etc.) to see how the algorithm reacts.
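As a sanity check, the last estimate can be recomputed offline from the ten input values listed above, using the same formulas as `compute_a` and `compute_b`. This is a plain-Python sketch, independent of Pathway; the `y` values are copied from `regression_input.csv`.

```python
# y values copied from regression_input.csv; x runs from 0 to 9.
ys = [
    0.06888437030500963,
    1.0515908805880605,
    1.984114316166169,
    2.9517833500585926,
    4.002254944273722,
    4.980986827490083,
    6.056759717806955,
    6.9606625452157855,
    7.995319390830471,
    9.016676407891007,
]
points = list(enumerate(ys))

# The same five statistics as in statistics_table.
count = len(points)
sum_x = sum(x for x, _ in points)
sum_y = sum(y for _, y in points)
sum_x_y = sum(x * y for x, y in points)
sum_x_square = sum(x * x for x, _ in points)

d = count * sum_x_square - sum_x * sum_x
a = (sum_y * sum_x_square - sum_x * sum_x_y) / d
b = (count * sum_x_y - sum_x * sum_y) / d
print(a, b)
```

Up to floating-point rounding, the result should agree with the last row added to the output file (the one with `time` 9 and `diff` 1).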

## To go further

Congrats, you are now able to use Pathway with Kafka and do non-trivial computation!

Why not try to do some more advanced computation such as linear regression with several explanatory variables? Or you may want to do some classification?