pw.io.pubsub

write(table, publisher, project_id, topic_id, *, name=None, sort_by=None)

Publish the table’s stream of changes into the specified Pub/Sub topic. The table must consist of a single column of binary type. The connector also adds two attributes to each published change: pathway_time, containing the logical time of the change, and pathway_diff, indicating the change type: either addition (pathway_diff = 1) or deletion (pathway_diff = -1).
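On the consuming side, the two attributes described above can be used to tell additions from deletions. The following is a minimal sketch, not part of the connector: the `Change` class and the `parse_change` helper are hypothetical names, and it assumes the attribute values arrive as integer-valued strings (Pub/Sub message attributes are string key-value pairs).

```python
from dataclasses import dataclass
from typing import Mapping


@dataclass(frozen=True)
class Change:
    """One decoded change (hypothetical helper type, not a Pathway class)."""

    payload: bytes
    time: int
    is_addition: bool


def parse_change(data: bytes, attributes: Mapping[str, str]) -> Change:
    """Interpret pathway_time and pathway_diff for a received message.

    Assumes both attributes are present and hold integer-valued strings.
    """
    diff = int(attributes["pathway_diff"])
    if diff not in (1, -1):
        raise ValueError(f"unexpected pathway_diff: {diff}")
    return Change(
        payload=data,
        time=int(attributes["pathway_time"]),
        is_addition=(diff == 1),
    )
```

In a subscriber callback, `data` and `attributes` would typically come from the received message's `data` and `attributes` fields.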

  • Parameters
    • table – The table to publish.
    • publisher (PublisherClient) – The configured pubsub_v1.PublisherClient object. See the Google Cloud documentation for an example of a simple publisher configuration, as well as examples of batching settings and flow control configuration.
    • project_id (str) – The ID of the project where the changes are published.
    • topic_id (str) – The topic ID where the changes are published.
    • name (str | None) – A unique name for the connector. If provided, this name will be used in logs and monitoring dashboards.
    • sort_by (Optional[Iterable[ColumnReference]]) – If specified, the output will be sorted in ascending order based on the values of the given columns within each minibatch. When multiple columns are provided, the corresponding value tuples will be compared lexicographically.
  • Returns
    None
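To illustrate what the lexicographic comparison described for sort_by means, here is a plain-Python sketch. It does not use Pathway and is not how the connector is implemented; the `sort_minibatch` helper and the column names `priority` and `seq` are hypothetical.

```python
from typing import Iterable, List, Sequence


def sort_minibatch(rows: Iterable[dict], sort_columns: Sequence[str]) -> List[dict]:
    """Sort rows in ascending order by the tuple of values in sort_columns.

    Python compares tuples lexicographically: the first column decides,
    and later columns only break ties.
    """
    return sorted(rows, key=lambda row: tuple(row[c] for c in sort_columns))


# A hypothetical minibatch with two sort columns and a binary payload.
minibatch = [
    {"priority": 2, "seq": 1, "data": b"c"},
    {"priority": 1, "seq": 2, "data": b"b"},
    {"priority": 1, "seq": 1, "data": b"a"},
]

ordered = sort_minibatch(minibatch, ["priority", "seq"])
print([row["data"] for row in ordered])  # [b'a', b'b', b'c']
```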

Example:

Suppose you have a table blobs consisting of a single column of binary type, and you would like to publish the changes to this table into a topic called blobs in your Google Cloud Pub/Sub project.
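Because the connector expects a single binary column, structured records have to be serialized to bytes before they end up in such a table. A minimal sketch of one possible encoding, assuming JSON-serializable records; the helper names `encode_record` and `decode_record` are hypothetical and not part of Pathway:

```python
import json


def encode_record(record: dict) -> bytes:
    """Serialize a JSON-compatible record into UTF-8 bytes."""
    return json.dumps(record, sort_keys=True).encode("utf-8")


def decode_record(payload: bytes) -> dict:
    """Inverse of encode_record, for use on the subscriber side."""
    return json.loads(payload.decode("utf-8"))


payload = encode_record({"id": 1, "name": "example"})
```

Any other byte-oriented format would work equally well, as long as the publisher and the subscriber agree on it.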

For simplicity, assume that the project ID is stored in the project_id variable. The topic_id variable then holds the name of the target topic, which is blobs in this case:

project_id = "YOUR_PROJECT_ID"
topic_id = "blobs"

Next, create a publisher object of type pubsub_v1.PublisherClient. If your service account credentials are stored in the file ./credentials.json, you can do so with the following code:

from google.cloud import pubsub_v1
publisher = pubsub_v1.PublisherClient.from_service_account_file(  
    "./credentials.json"
)

If you don’t have the topic created yet, you may want to create it first:

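# Build the fully qualified topic name: projects/{project_id}/topics/{topic_id}
topic_path = publisher.topic_path(project_id, topic_id)
# Create the topic; this call raises an error if the topic already exists
topic = publisher.create_topic(request={"name": topic_path})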

After that, you can configure the table output with the following code:

import pathway as pw
pw.io.pubsub.write(table, publisher, project_id, topic_id)

Finally, don’t forget to call pw.run() to run your pipeline.