pw.io.pubsub
pw.io.pubsub.write(table, publisher, project_id, topic_id)
Publish the table’s stream of changes into the specified Pub/Sub topic. Note that table must consist of a single column of the binary type. In addition, the connector adds two attributes to each published message: pathway_time, containing the logical time of the change, and pathway_diff, corresponding to the change type: either addition (pathway_diff = 1) or deletion (pathway_diff = -1).
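To make the message layout described above concrete, here is a minimal sketch of what one published change could look like. It assumes the row’s binary value becomes the message data and that both attributes travel as strings, since Pub/Sub attribute values are always strings. The ChangeMessage class and the to_message helper are hypothetical; they only model the description above and are not part of Pathway’s API.

```python
from dataclasses import dataclass


@dataclass
class ChangeMessage:
    """Hypothetical model of one Pub/Sub message carrying a table change."""

    data: bytes
    attributes: dict[str, str]


def to_message(value: bytes, time: int, diff: int) -> ChangeMessage:
    """Model a change of the single binary column as a Pub/Sub-style message."""
    if diff not in (1, -1):
        raise ValueError("diff must be 1 (addition) or -1 (deletion)")
    # Pub/Sub attribute values are strings, so the integers are stringified.
    return ChangeMessage(
        data=value,
        attributes={"pathway_time": str(time), "pathway_diff": str(diff)},
    )


# Usage: the same row added at logical time 2 and deleted at logical time 4.
added = to_message(b"hello", time=2, diff=1)
removed = to_message(b"hello", time=4, diff=-1)
print(added.attributes)  # {'pathway_time': '2', 'pathway_diff': '1'}
```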
- Parameters
- table – The table to publish.
- publisher (PublisherClient) – The configured pubsub_v1.PublisherClient object. You can refer to the Google Cloud documentation for an example of a simple publisher configuration, as well as examples of batching settings and flow control configuration.
- project_id (str) – The ID of the project where the changes are published.
- topic_id (str) – The ID of the topic where the changes are published.
- Returns
None
Example:
Suppose you have a table blobs consisting of a single column of binary type, and you would like to publish the changes that happen to this table into a topic called blobs in your Google Cloud Pub/Sub project.
For simplicity, assume that the project ID is stored in the project_id variable, and that the topic_id variable holds the name of the target topic, which is blobs in our case:
project_id = "YOUR_PROJECT_ID"
topic_id = "blobs"
Now you need to create a publisher object of type pubsub_v1.PublisherClient. If your service account credentials are stored in the file ./credentials.json, you can do it with the following code:
from google.cloud import pubsub_v1
publisher = pubsub_v1.PublisherClient.from_service_account_file(
"./credentials.json"
)
If you don’t have the topic created yet, you may want to create it first:
topic_path = publisher.topic_path(project_id, topic_id)
topic = publisher.create_topic(request={"name": topic_path})
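publisher.topic_path combines the two IDs into the fully qualified resource name projects/{project}/topics/{topic}, and create_topic raises google.api_core.exceptions.AlreadyExists if that topic already exists. The stdlib-only sketch below mirrors the path format and adds a pre-check of the topic ID. The build_topic_path helper is hypothetical, and the naming rules it encodes (start with a letter, 3 to 255 characters from letters, digits and - _ . ~ + %, no "goog" prefix) are taken from Google’s resource naming guidelines; verify them against the current documentation.

```python
import re

# Assumption: Google Cloud topic naming rules as documented at the time of writing.
_TOPIC_ID_RE = re.compile(r"[A-Za-z][A-Za-z0-9\-_.~+%]{2,254}")


def build_topic_path(project_id: str, topic_id: str) -> str:
    """Hypothetical stdlib mirror of publisher.topic_path(project_id, topic_id)."""
    # Reject IDs that break the naming rules before calling create_topic.
    if not _TOPIC_ID_RE.fullmatch(topic_id) or topic_id.startswith("goog"):
        raise ValueError(f"invalid topic ID: {topic_id!r}")
    return f"projects/{project_id}/topics/{topic_id}"


print(build_topic_path("YOUR_PROJECT_ID", "blobs"))
```

For the values used in this example, the call prints projects/YOUR_PROJECT_ID/topics/blobs, the same string publisher.topic_path returns.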
After that, you can configure the table output with the following code:
import pathway as pw
# Publish the stream of changes of the blobs table into the blobs topic
pw.io.pubsub.write(blobs, publisher, project_id, topic_id)
Finally, don’t forget to call pw.run() to run your pipeline.
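Once the pipeline runs, a subscriber to the blobs topic receives one message per change. The sketch below shows one way a consumer could rebuild the current contents of the table, assuming each received message is reduced to a pair of its data bytes and its string attributes pathway_time and pathway_diff (in a real subscriber callback these come from message.data and message.attributes). The rebuild_table function and its input format are hypothetical.

```python
from collections import Counter
from typing import Iterable, Mapping, Optional


def rebuild_table(
    messages: Iterable[tuple[bytes, Mapping[str, str]]],
    up_to_time: Optional[int] = None,
) -> Counter:
    """Rebuild the multiset of rows from (data, attributes) pairs.

    Hypothetical consumer-side helper: each pair stands for one received
    Pub/Sub message carrying the attributes added by pw.io.pubsub.write.
    """
    counts: Counter = Counter()
    for data, attributes in messages:
        time = int(attributes["pathway_time"])
        if up_to_time is not None and time > up_to_time:
            continue  # skip changes made after the requested snapshot time
        counts[data] += int(attributes["pathway_diff"])
    # Keep only rows that are still present (additions outnumber deletions).
    return Counter({row: n for row, n in counts.items() if n > 0})


messages = [
    (b"a", {"pathway_time": "2", "pathway_diff": "1"}),
    (b"b", {"pathway_time": "2", "pathway_diff": "1"}),
    (b"a", {"pathway_time": "4", "pathway_diff": "-1"}),
]
print(rebuild_table(messages))  # Counter({b'b': 1})
```

Because summing diffs does not depend on their order, the result is the same even if messages arrive out of order, which Pub/Sub allows unless ordering keys are used. The sketch does not deduplicate, so redelivered messages under Pub/Sub’s at-least-once delivery would need to be filtered separately, for example by message ID.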