December 23, 2022TUTORIAL · DATA-PIPELINE

Start doing ML on a streaming data source with Pathway.

In this article, we are going to see how to do a simple linear regression on streaming data from Kafka. This article can be seen as an extension of our realtime sum using CSV connectors.

We have a data stream of data points $(x_{i},y_{i})$, and we want to compute a simple linear regression on those points: we want to compute the two parameters $(a,b)$ so that, for each point $(x_{i},y_{i})$, $y_{i}$ can be approximated by $y_{i}≈a+b×x_{i}$.

We are not going to explain the mathematical details here, but you can find all the details in the Wikipedia article.

First, we need a connector to connect to our input data streams and receive values on which the regression will be computed. In this article, we will set up a Kafka connector.

To be able to reproduce this example, you may want to use upstash which provides a free Kafka instance.

To use a Kafka connector, we need to set all the Kafka parameters by using a dictionary, following the format of librdkafka.
We also need to define a Kafka topic on which to connect onto: we will go with `"linear-regression"`

.
Here is an example of settings to connect to Kafka using SASL-SSL authentication over SCRAM-SHA-256 mechanism:

`rdkafka_settings = { "bootstrap.servers": "server-address:9092", "security.protocol": "sasl_ssl", "sasl.mechanism": "SCRAM-SHA-256", "group.id": "$GROUP_NAME", "session.timeout.ms": "6000", "sasl.username": "username", "sasl.password": "********",}`

You need, of course, to replace the server address and the associated credentials.

With this, setting the connector is straightforward, you just need to specify the topic, the column names, and the types:

`t = pw.io.kafka.read( rdkafka_settings, topic_names=["linear-regression"], value_columns=["x", "y"], types={"x":pw.Type.FLOAT, "y":pw.Type.FLOAT}, format="csv", autocommit_duration_ms=1000)`

We used the `csv`

format, but there are two other ways to read from Kafka: `raw`

which reads a table with only one column `data`

in which the whole message is dumped and `json`

which reads JSON messages. You can see more about this connector in its dedicated tutorial. In our case we expect CSV messages.

For the output, we use a CSV connector, which is set up as follows:

`pw.io.csv.write(t, "regression_output_stream.csv")`

For more details on how this connector works, checkout our example or the tutorial about it.

To do the regression, we need to compute the sum of the $x_{i}$, of the $x_{i}$, of the $y_{i}$ and of the $x_{i}×y_{i}$ and the total number of data points received until then. This is done as follows:

`t = t.select( *pw.this, x_square=t.x * t.x, x_y=t.x * t.y)statistics_table = t.reduce( count=pw.reducers.count(), sum_x=pw.reducers.sum(t.x), sum_y=pw.reducers.sum(t.y), sum_x_y=pw.reducers.sum(t.x_y), sum_x_square=pw.reducers.sum(t.x_square),)`

Then we can compute the estimation of $a$ and $b$:

`def compute_a(sum_x, sum_y, sum_x_square, sum_x_y, count): d = count * sum_x_square - sum_x * sum_x if d == 0: return 0 else: return (sum_y * sum_x_square - sum_x * sum_x_y) / ddef compute_b(sum_x, sum_y, sum_x_square, sum_x_y, count): d = count * sum_x_square - sum_x * sum_x if d == 0: return 0 else: return (count * sum_x_y - sum_x * sum_y) / dresults_table = statistics_table.select( a=pw.apply(compute_a, **statistics_table), b=pw.apply(compute_b, **statistics_table),)`

To use the Kafka connector, we have to follow a few rules. First, the Kafka connector expects the first message to contain the names of the columns. The connector will not properly work without this message, however it must be sent only once: if sent twice, the second message will be treated like a normal row.

We can use the KafkaProducer API provided by Kafka to send message using python:

`producer = KafkaProducer( bootstrap_servers=["server-address:9092"], sasl_mechanism="SCRAM-SHA-256", security_protocol="SASL_SSL", sasl_plain_username="username", sasl_plain_password="********",)producer.send(topic, ("x,y").encode("utf-8"), partition=0)producer.send( "linear-regression", ("0,0").encode("utf-8"), partition=0)producer.send( "linear-regression", ("1,1").encode("utf-8"), partition=0)producer.close()`

This code sample sends $(0,0)$ and $(1,1)$ and then closes the Kafka Producer. For our example, we are going to send more messages containing different pairs $(x,y)$ which are samples from the line $y=x$. However, for the example not to be too simple, we are going to add a small random error to each $y$.

Note that, depending on your version of Kafka, you may need to specify the API version to make this code work:

`api_version=(0,10,2)`

.

The final version of our project contains two files: `realtime_regression.py`

which processes the stream using Pathway and `generating_kafka_stream.py`

which generates the streams.

Here is `realtime_regression.py`

:

realtime_regression.py

`import pathway as pwrdkafka_settings = { "bootstrap.servers": "server-address:9092", "security.protocol": "sasl_ssl", "sasl.mechanism": "SCRAM-SHA-256", "group.id": "$GROUP_NAME", "session.timeout.ms": "6000", "sasl.username": "username", "sasl.password": "********",}t = pw.io.kafka.read( rdkafka_settings, topic_names=["linear-regression"], value_columns=["x", "y"], types={"x":pw.Type.FLOAT, "y":pw.Type.FLOAT}, format="csv", autocommit_duration_ms=1000,)pw.io.csv.write(t, "regression_input.csv")t += t.select( x_square=t.x * t.x, x_y=t.x * t.y,)statistics_table = t.reduce( count=pw.reducers.count(), sum_x=pw.reducers.sum(t.x), sum_y=pw.reducers.sum(t.y), sum_x_y=pw.reducers.sum(t.x_y), sum_x_square=pw.reducers.sum(t.x_square),)def compute_a(sum_x, sum_y, sum_x_square, sum_x_y, count): d = count * sum_x_square - sum_x * sum_x if d == 0: return 0 else: return (sum_y * sum_x_square - sum_x * sum_x_y) / ddef compute_b(sum_x, sum_y, sum_x_square, sum_x_y, count): d = count * sum_x_square - sum_x * sum_x if d == 0: return 0 else: return (count * sum_x_y - sum_x * sum_y) / dresults_table = statistics_table.select( a=pw.apply(compute_a, **statistics_table), b=pw.apply(compute_b, **statistics_table),)pw.io.csv.write(results_table, "regression_output_stream.csv")pw.run()`

Don't forget the `pw.run()`

otherwise no computation will be done!
Once `pw.run()`

is called, the computation will be run forever until it gets killed.

And the `generating_kafka_stream.py`

:

generating_kafka_stream.py

`from kafka import KafkaProducerimport timeimport randomtopic = "linear-regression"random.seed(0)def get_value(i): return i + (2 * random.random() - 1)/10producer = KafkaProducer( bootstrap_servers=["server-address:9092"], sasl_mechanism="SCRAM-SHA-256", security_protocol="SASL_SSL", sasl_plain_username="username", sasl_plain_password="********",)producer.send(topic, ("x,y").encode("utf-8"), partition=0)time.sleep(5)for i in range(10): time.sleep(1) producer.send( topic, (str(i) + "," + str(get_value(i))).encode("utf-8"), partition=0 )producer.close()`

There are two outputs in this project: the CSV file `regression_input.csv`

which keeps all the updates received from Kafka and the CSV file `output_stream.csv`

in which all the successive updates of the sum values are displayed.

As in our previous example, the outputs are tables of changes. Each new message of Kafka triggers a new computation and the new values are output in the CSV files!

First, we can check that the generated values are correct:

`x,y,time,diff"0","0.06888437030500963",0,1"1","1.0515908805880605",1,1"2","1.984114316166169",2,1"3","2.9517833500585926",3,1"4","4.002254944273722",4,1"5","4.980986827490083",5,1"6","6.056759717806955",6,1"7","6.9606625452157855",7,1"8","7.995319390830471",8,1"9","9.016676407891007",9,1`

We obtain ten values which are sampled around the $y=x$ line. Let's check the regression we obtain:

`a,b,time,diff0,0,0,10,0,1,-10.06888437030500971,0.9827065102830508,1,10.06888437030500971,0.9827065102830508,2,-10.07724821608916699,0.9576149729305795,2,10.0769101730536299,0.9581220374838857,3,10.07724821608916699,0.9576149729305795,3,-10.05833884879671927,0.9766933617407955,4,10.0769101730536299,0.9581220374838857,4,-10.05087576879874134,0.9822906717392795,5,10.05833884879671927,0.9766933617407955,5,-10.03085078333935821,0.9943056630149089,6,10.05087576879874134,0.9822906717392795,6,-10.03085078333935821,0.9943056630149089,7,-10.03590542987734715,0.9917783397459139,7,10.03198741430177742,0.9934574892783012,8,10.03590542987734715,0.9917783397459139,8,-10.025649728471303895,0.9958341214647295,9,10.03198741430177742,0.9934574892783012,9,-1`

We obtain close value to what we expect ($a=0$ and $b=1$). You can play the values (number of samples, error, linear function to approximate etc.) to see how the algorithm reacts.

Congrats, you are now able to use Pathway with Kafka and do some non-trivial computation!

Why not try to do some more advanced computation such as linear regression with several explanatory variables? Or you may want to do some classification?

Related articles

- developers / showcases / fuzzy_joinPart 1: Realtime Fuzzy-Join in PathwayPathway Team2022-10-18data-pipeline
- developers / showcases / fuzzy_joinPart 2: Realtime Fuzzy-Join in PathwayPathway Team2022-10-19data-pipeline
- developers / tutorialsTime between events in a multi-topic event streamPrzemek Uznański2022-11-01data-pipeline