pw.io.logstash

write(table, endpoint, n_retries=0, retry_policy=<pathway.io.http._common.RetryPolicy object>, connect_timeout_ms=None, request_timeout_ms=None)

sourceSends the stream of updates from the table to HTTP input https://www.elastic.co/guide/en/logstash/current/plugins-inputs-http.html of Logstash. The data is sent in the format of flat JSON objects, with two extra fields for time and diff.

  • Parameters
    • table (Table) – table to be tracked;
    • endpoint (str) – Logstash endpoint, accepting entries;
    • n_retries (int) – number of retries in case of failure;
    • retry_policy (RetryPolicy) – policy of delays or backoffs for the retries;
    • connect_timeout_ms (int | None) – connection timeout, specified in milliseconds. In case it’s None, no restrictions on connection duration will be applied;
    • request_timeout_ms (int | None) – request timeout, specified in milliseconds. In case it’s None, no restrictions on request duration will be applied.

Example:

Suppose that we need to send the stream of updates to locally installed Logstash. For example, you can use docker-elk https://github.com/deviantony/docker-elk repository in order to get the ELK stack up and running at your local machine in a few minutes.

If Logstash stack is installed, you need to configure the input pipeline. The simplest possible way to do this, is to add the following lines in the input plugins list:

http {
    port => 8012
}

The port is specified for the sake of example and can be changed. Further, we will use 8012 for clarity.

Now, with the pipeline configured, you can stream the changed into Logstash as simple as:

pw.io.logstash.write(table, "http://localhost:8012")