pw.io.pubsub
write(table, publisher, project_id, topic_id, *, name=None, sort_by=None)
Publish the table’s stream of changes into the specified PubSub topic. Note that table must consist of a single column of binary type. In addition, the connector adds two attributes to each message: pathway_time, containing the logical time of the change, and pathway_diff, indicating the change type: either addition (pathway_diff = 1) or deletion (pathway_diff = -1).
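The following plain-Python sketch shows how a consumer might replay the change stream to rebuild the current contents of the table, which illustrates how these attributes are meant to be read. It assumes each received message is modeled as a (payload, attributes) pair carrying the string attribute values that Pub/Sub delivers. replay_changes is a hypothetical helper and is not part of Pathway or the Google Cloud client library.

```python
from collections import Counter
from typing import Iterable, Mapping, Tuple

# A received message modeled as (payload, attributes). Pub/Sub attribute
# values are always strings, so both Pathway attributes are parsed with int().
Message = Tuple[bytes, Mapping[str, str]]


def replay_changes(messages: Iterable[Message]) -> Counter:
    """Rebuild the current table contents as a multiset of payloads (hypothetical helper)."""
    # Apply changes in logical-time order; sorted() is stable, so messages
    # sharing a pathway_time keep their relative order.
    ordered = sorted(messages, key=lambda m: int(m[1]["pathway_time"]))
    state: Counter = Counter()
    for payload, attributes in ordered:
        diff = int(attributes["pathway_diff"])  # 1 = addition, -1 = deletion
        state[payload] += diff
    # Keep only payloads that are still present in the table.
    return Counter({p: n for p, n in state.items() if n > 0})


messages = [
    (b"hello", {"pathway_time": "2", "pathway_diff": "1"}),
    (b"world", {"pathway_time": "2", "pathway_diff": "1"}),
    (b"hello", {"pathway_time": "4", "pathway_diff": "-1"}),
]
print(replay_changes(messages))  # Counter({b'world': 1})
```

Sorting by pathway_time matters because Pub/Sub does not guarantee in-order delivery by default. Keeping a count per payload, rather than a set, also handles tables that contain duplicate rows.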
- Parameters
- table – The table to publish.
- publisher (PublisherClient) – The configured pubsub_v1.PublisherClient object. You can refer to the Google Cloud documentation for an example of a simple publisher configuration, as well as examples of batching settings and flow control configuration.
- project_id (str) – The ID of the project where the changes are published.
- topic_id (str) – The ID of the topic where the changes are published.
- name (str | None) – A unique name for the connector. If provided, this name will be used in logs and monitoring dashboards.
- sort_by (Optional[Iterable[ColumnReference]]) – If specified, the output will be sorted in ascending order by the values of the given columns within each minibatch. When multiple columns are provided, the corresponding value tuples are compared lexicographically.
- Returns
None
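The small plain-Python sketch below illustrates the ordering rule for sort_by. It is not Pathway's implementation. It assumes rows are dictionaries with a hypothetical time key that holds the logical time of their minibatch, and order_output and the column names a and b are made up for the example.

```python
from itertools import groupby
from typing import Any, Dict, List, Sequence

Row = Dict[str, Any]


def order_output(rows: Sequence[Row], sort_by: Sequence[str]) -> List[Row]:
    """Sort rows by the sort_by columns, separately within each minibatch (hypothetical)."""
    result: List[Row] = []
    # Group rows into minibatches by their (assumed) logical time.
    by_time = sorted(rows, key=lambda r: r["time"])
    for _, batch in groupby(by_time, key=lambda r: r["time"]):
        # Tuples compare lexicographically: first column, then the next one on ties.
        result.extend(sorted(batch, key=lambda r: tuple(r[c] for c in sort_by)))
    return result


rows = [
    {"time": 2, "a": 1, "b": b"z"},
    {"time": 2, "a": 0, "b": b"y"},
    {"time": 2, "a": 1, "b": b"x"},
    {"time": 0, "a": 5, "b": b"w"},
]
for row in order_output(rows, sort_by=["a", "b"]):
    print(row["time"], row["a"], row["b"])
# 0 5 b'w'
# 2 0 b'y'
# 2 1 b'x'
# 2 1 b'z'
```

Rows are only reordered within a minibatch. A row from a later logical time is never moved ahead of a row from an earlier one.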
Example:
Suppose you have a table blobs consisting of a single column of binary type, and you would like to publish the changes that happen to this table into a topic called blobs in your Google Cloud Pub/Sub project.
For simplicity, assume that the project ID is stored in the project_id variable, and that the topic_id variable holds the name of the target topic, which is blobs in our case:
project_id = "YOUR_PROJECT_ID"
topic_id = "blobs"
Next, create the publisher object of type pubsub_v1.PublisherClient. If your service account credentials are stored in the file ./credentials.json, this can be done with the following code:
from google.cloud import pubsub_v1
publisher = pubsub_v1.PublisherClient.from_service_account_file(
"./credentials.json"
)
If you don’t have the topic created yet, you may want to create it first:
topic_path = publisher.topic_path(project_id, topic_id)
topic = publisher.create_topic(request={"name": topic_path})
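publisher.topic_path returns the fully qualified resource name that the Pub/Sub API expects, which has the form projects/{project}/topics/{topic}. The plain-Python helper below only makes that format explicit. make_topic_path is hypothetical and is not part of the client library.

```python
def make_topic_path(project_id: str, topic_id: str) -> str:
    # Same "projects/{project}/topics/{topic}" format that
    # PublisherClient.topic_path produces.
    return f"projects/{project_id}/topics/{topic_id}"


print(make_topic_path("YOUR_PROJECT_ID", "blobs"))
# projects/YOUR_PROJECT_ID/topics/blobs
```

If the topic already exists, create_topic raises google.api_core.exceptions.AlreadyExists. A script that is run repeatedly may therefore want to catch that exception.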
After that, you can configure the table output with the following code:
import pathway as pw
# Stream every change of the blobs table into the target topic
pw.io.pubsub.write(blobs, publisher, project_id, topic_id)
Finally, don’t forget to add pw.run() to run your pipeline.