pw.io.pubsub

write(table, publisher, project_id, topic_id)

sourcePublish the table’s stream of changes into the specified PubSub topic. Please note that table must consist of a single column of the binary type. In addition, the connector adds two attributes: pathway_time containing the logical time of the change and pathway_diff corresponding to the change type: either addition (pathway_diff = 1) or deletion (pathway_diff = -1).

  • Parameters
    • table – The table to publish.
    • publisher (PublisherClient) – The configured pubsub_v1.PublisherClient object. You can refer to the Google Cloud documentation for the example of a simple publisher configuration. You can also see the examples for batching settings configuration or flow control configuration.
    • project_id (str) – The ID of the project where the changes are published.
    • topic_id (str) – The topic ID where the changes are published.
  • Returns
    None

Example:

Consider that you have a table blobs, consisting of a single column that has a binary type. You would like to publish changes that happen to this table into a topic called blobs in your Google Cloud pub/sub project.

For simplicity, let’s consider that the project ID is stored in the project_id variable. The topic_id variable then would denote the name of the target topic which is blobs in our case:

project_id = "YOUR_PROJECT_ID"
topic_id = "blobs"

Now, you need to create the publisher object of pubsub_v1.PublisherClient type. If you have the service account credentials stored in a file ./credentials.json, it can be done with the following code:

from google.cloud import pubsub_v1
publisher = pubsub_v1.PublisherClient.from_service_account_file(  
    "./credentials.json"
)

If you don’t have the topic created yet, you may want to create it first:

topic_path = publisher.topic_path(project_id, topic_id)  
topic = publisher.create_topic(request={"name": topic_path})

After that you can configure the table output with the following code:

import pathway as pw
pw.io.pubsub.write(table, publisher, project_id, topic_id)

At last, don’t forget to add pw.run() to run your pipeline.