pw.io.elasticsearch
This module is available when using one of the following licenses only: Pathway Live Data Framework Scale, Pathway Live Data Framework Enterprise.
The Pathway Live Data Framework provides both Input and Output connectors for Elasticsearch. See the
read and write documentation below for the modes, requirements, and
options specific to each.
Both connectors exchange documents as JSON: the output connector serializes each
row into a JSON document, and the input connector parses each document’s
_source back into Live Data Framework values according to your pw.Schema.
The type conversions below therefore apply to both directions, and every type
produced by the output connector round-trips back through the input connector
when the original schema type is specified.
Type Conversion
Serialization of Pathway types into JSON documents
| Live Data Framework type | JSON type |
|---|---|
| bool | boolean |
| int | number |
| float | number |
| pointer | string, can be deserialized back if pw.Pointer type is specified in Live Data Framework table schema |
| str | string |
| bytes | string, containing base64-encoded binary data |
| Naive DateTime | string, containing the datetime in ISO-8601 format |
| UTC DateTime | string, containing the datetime in ISO-8601 format |
| Duration | number, serialized and deserialized with nanosecond precision |
| JSON | object, containing the JSON value |
| np.ndarray | object type with two top-level fields: an integer array shape containing the shape of the array, and an array elements containing the flattened elements of the array |
| tuple | array, the order of the elements corresponds to their appearance in the tuple |
| list | array |
| pw.PyObjectWrapper | string, can be deserialized back if the pw.PyObjectWrapper type is specified in Live Data Framework table schema |
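
To make the encodings in the table concrete, the following minimal sketch decodes a hand-written document using only the Python standard library. The field names (payload, matrix, pair) and their values are hypothetical, and the sketch assumes the flattened elements are stored in row-major order, which the table does not state. It illustrates how a downstream consumer might read such a document; it is not the connector's own code.

```python
import base64
import json

# Hypothetical document shaped as the table above describes.
raw = json.dumps({
    "payload": base64.b64encode(b"\x00\x01binary").decode("ascii"),
    "matrix": {"shape": [2, 3], "elements": [1, 2, 3, 4, 5, 6]},
    "pair": [1, "a"],
})

doc = json.loads(raw)

# bytes: a base64 string that decodes back to the original binary data.
payload = base64.b64decode(doc["payload"])

# np.ndarray: rebuild the rows of a 2-D array from the flattened elements
# (row-major order is an assumption of this sketch).
rows, cols = doc["matrix"]["shape"]
elements = doc["matrix"]["elements"]
matrix = [elements[r * cols:(r + 1) * cols] for r in range(rows)]

# tuple: arrives as a JSON array in the original element order.
pair = tuple(doc["pair"])
```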
The input connector additionally relies on two of these columns to drive its
polling: timestamp_column must be a numeric (number) field — for example,
epoch milliseconds — because the connector orders and watermarks documents by it,
and id_column must be a unique, sortable field (a keyword or numeric
mapping) because it is used both to deduplicate the polling overlap window and as
the Live Data Framework row key. At startup the connector inspects the index
mapping and logs a warning if either column is mapped in a way it cannot use (for
example an id_column dynamically mapped as text, which Elasticsearch
cannot sort by — use its .keyword sub-field instead).
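
The mapping requirements above can be checked by hand before starting the connector. The sketch below is a hypothetical helper, not part of Pathway: it takes the "properties" section of an index mapping as a plain dictionary and reports columns mapped in a way the description above rules out. It handles only top-level fields and their multi-field sub-fields (such as doc_id.keyword), not nested objects.

```python
# Elasticsearch numeric field types.
NUMERIC_TYPES = {
    "long", "integer", "short", "byte", "unsigned_long",
    "double", "float", "half_float", "scaled_float",
}


def field_type(properties, name):
    """Look up the mapped type of a field or of a 'field.subfield' multi-field."""
    base, _, sub = name.partition(".")
    field = properties.get(base, {})
    if sub:
        field = field.get("fields", {}).get(sub, {})
    return field.get("type")


def check_columns(properties, timestamp_column, id_column):
    """Return a list of human-readable problems found in the mapping."""
    problems = []
    ts_type = field_type(properties, timestamp_column)
    if ts_type not in NUMERIC_TYPES:
        problems.append(f"{timestamp_column} is mapped as {ts_type}, expected a numeric type")
    id_type = field_type(properties, id_column)
    if id_type != "keyword" and id_type not in NUMERIC_TYPES:
        problems.append(f"{id_column} is mapped as {id_type}, expected keyword or numeric")
    return problems


# The "properties" section of a hypothetical index mapping.
properties = {
    "ts": {"type": "long"},
    "doc_id": {"type": "text", "fields": {"keyword": {"type": "keyword"}}},
}

problems_text = check_columns(properties, "ts", "doc_id")
problems_keyword = check_columns(properties, "ts", "doc_id.keyword")
```

With this mapping, the dynamically mapped text field is flagged, while its keyword sub-field passes the check.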
When the input connector is a good fit
Elasticsearch exposes no change-data-capture API, so the input connector ingests by polling: it repeatedly queries a sliding window of the most recent documents and reconciles the overlap between consecutive queries so that no document is missed and none is delivered twice. This approach is append-only and rests on a few assumptions — check them before relying on it.
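
The idea of polling a sliding window and reconciling the overlap can be illustrated with a small in-memory simulation. This is a simplified sketch under stated assumptions, not the connector's implementation: the index is a plain Python list of (timestamp, id) pairs, the window is measured back from the highest timestamp seen so far, and the names poll, seen and window are hypothetical.

```python
def poll(index, watermark, seen, window):
    """Run one polling round; return the newly delivered docs and the new watermark.

    index: list of (timestamp, doc_id) pairs standing in for the Elasticsearch index.
    seen: dict mapping doc_id -> timestamp for ids still inside the overlap window.
    """
    lower = watermark - window
    # Re-read everything inside the overlap window, oldest first.
    batch = sorted((ts, doc_id) for ts, doc_id in index if ts >= lower)
    # Deliver only documents whose id has not been delivered yet.
    fresh = [(ts, doc_id) for ts, doc_id in batch if doc_id not in seen]
    for ts, doc_id in fresh:
        seen[doc_id] = ts
    if batch:
        watermark = max(watermark, batch[-1][0])
    # Forget ids that have fallen out of the window: they will not be re-read.
    for doc_id in [d for d, ts in seen.items() if ts < watermark - window]:
        del seen[doc_id]
    return fresh, watermark


index = [(100, "a"), (105, "b")]
seen = {}
first, watermark = poll(index, 0, seen, window=10)

# A late document (timestamp 98, below the watermark of 105) and a new one arrive.
index += [(98, "c"), (110, "d")]
second, watermark = poll(index, watermark, seen, window=10)

# Nothing new: the overlap is re-read, but nothing is delivered twice.
third, watermark = poll(index, watermark, seen, window=10)
```

The late document "c" is still picked up because its timestamp lies inside the window, while "a" and "b" are re-read on the second poll but filtered out by id.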
It is a good fit when:
- The index is append-only. Documents are added, not updated in place or deleted (or such changes need not reach Pathway). The connector deduplicates by id_column, so a document re-indexed under the same id is treated as already-seen and is not re-delivered, and a deletion is never observed. To capture updates or deletes, use a change-data-capture source instead, because polling cannot see them.
- timestamp_column is a trustworthy, non-decreasing clock that advances as documents are indexed. A single document with a timestamp far in the future (for example from a clock-skewed producer) pushes the connector’s watermark past all normal documents and silently drops everything indexed afterwards, so guard against out-of-range timestamps upstream.
- Each id_column value is unique and unambiguous. Two different documents whose ids reduce to the same string (e.g. the number 1 in one document and the string "1" in another) are treated as one, and one of them is dropped.
- The number of documents written within any max_transaction_duration window is modest (thousands, not millions). The connector keeps that many documents in memory for deduplication and re-reads them on each poll.
- Documents are spread out over time: the time span of live data is larger than max_transaction_duration. If every document falls within a single max_transaction_duration window (for example a one-shot bulk load whose timestamps are all close together), nothing ever settles: the deduplication set and the persisted offset grow to the size of the whole load, and the window is re-read in full on every poll.
It is not a good fit when you need to capture updates or deletes, when
timestamp_column is missing, non-numeric, or can move backwards / jump
arbitrarily, or when a single timestamp_column value can hold more documents
than fit comfortably in memory (the connector reads all documents sharing one
timestamp as a single batch).
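
One way to guard against out-of-range timestamps upstream is to reject documents that claim to come from the future before they are indexed. The sketch below is a hypothetical producer-side check, assuming the timestamp field holds epoch milliseconds; the one-minute tolerance and the names is_plausible and MAX_FUTURE_SKEW_MS are illustrative choices, not Pathway settings.

```python
import time

MAX_FUTURE_SKEW_MS = 60_000  # hypothetical tolerance: one minute


def is_plausible(ts_ms, now_ms=None, max_future_skew_ms=MAX_FUTURE_SKEW_MS):
    """Return True unless the timestamp lies further in the future than the tolerance."""
    if now_ms is None:
        now_ms = time.time_ns() // 1_000_000
    return ts_ms <= now_ms + max_future_skew_ms


now_ms = 1_700_000_000_000  # fixed "current time" so the example is reproducible
docs = [
    {"doc_id": "a", "ts": now_ms - 500},
    {"doc_id": "b", "ts": now_ms + 86_400_000},  # one day ahead: clock-skewed producer
]

# Only documents with a plausible timestamp would be sent to Elasticsearch.
accepted = [d for d in docs if is_plausible(d["ts"], now_ms)]
```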
Tuning the polling parameters
- max_transaction_duration should be the real upper bound on how late a document may become visible relative to the newest one. Too small risks missing a genuinely late document; too large only widens the re-read window, which costs memory and read traffic but is never unsafe.
- read_batch_size should comfortably exceed the number of documents in one max_transaction_duration window. Correctness does not depend on it, but a larger value lets each poll read the window in a single request.
- poll_interval trades latency against load: each in-window document is re-read on every poll until it settles, so the read amplification on Elasticsearch is roughly max_transaction_duration / poll_interval. A longer interval reduces load at the cost of higher delivery latency.
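
As a rough worked example of these trade-offs, the arithmetic below estimates the read amplification and the window size for one set of values. The ingest rate of 20 documents per second is a hypothetical figure; the five-minute window and one-second interval are example values only.

```python
from datetime import timedelta

max_transaction_duration = timedelta(minutes=5)
poll_interval = timedelta(seconds=1)
docs_per_second = 20  # hypothetical steady ingest rate

# Each document is re-read roughly this many times before it settles.
read_amplification = max_transaction_duration / poll_interval

# Approximate number of documents inside one window at any moment;
# read_batch_size should comfortably exceed this.
docs_in_window = int(docs_per_second * max_transaction_duration.total_seconds())
```

Here every document is re-read about 300 times, and the window holds about 6000 documents, so a read_batch_size of 10000 would let each poll cover the window in one request.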
class ElasticSearchAuth(engine_es_auth)
Elasticsearch authentication object to be used in the read and write methods.
classmethod apikey(apikey_id, apikey)
Constructs API key-based Elasticsearch authorization.
- Parameters
- apikey_id – The ID of the API key.
- apikey – The API key.
- Returns
An authentication object to use for Elasticsearch authorization.
classmethod basic(username, password)
Constructs basic Elasticsearch authorization using a username and password.
- Parameters
- username – The username to use for authentication.
- password – The password for the specified user.
- Returns
An authentication object to use for Elasticsearch authorization.
classmethod bearer(bearer)
Constructs Elasticsearch authorization using the specified bearer token.
- Parameters
- bearer – The bearer token.
- Returns
An authentication object to use for Elasticsearch authorization.
read(host, auth, index_name, schema, *, timestamp_column, id_column, max_transaction_duration, mode='streaming', poll_interval=datetime.timedelta(seconds=1), read_batch_size=10000, autocommit_duration_ms=1500, name=None, max_backlog_size=None, debug_data=None)
Reads an index from Elasticsearch into a Pathway Live Data Framework table.
Elasticsearch exposes no change-data-capture (CDC) API, so this connector ingests incrementally by polling: it repeatedly queries the index for rows at or after a watermark and reconciles the overlap between consecutive queries so that no row is missed and no row is delivered twice. The reconciliation is driven by two columns that the index must contain:
- timestamp_column: a numeric field (e.g. epoch milliseconds) recording when a row was indexed. The connector orders and watermarks rows by this value, so it must be a sortable integer field, not a date string.
- id_column: a unique, sortable identifier (a keyword or numeric field). It is used both to deduplicate rows seen in the overlap window and as the Pathway Live Data Framework row key.
This is an append-only mechanism: it ingests new documents, but does not observe
in-place updates or deletes (a document re-indexed under the same id_column is
treated as already-seen and is not re-delivered). It also assumes timestamp_column
is trustworthy and non-decreasing — a single document timestamped far in the future
pushes the watermark past all normal documents and silently drops everything indexed
afterwards. See the connector documentation for the full list of cases this mechanism
fits, the cases it does not, and how to tune the parameters below.
The third polling parameter, max_transaction_duration, is the largest amount of
time within which a concurrent writer may still commit a row whose
timestamp_column is older than the current watermark (i.e. the maximum clock
skew / transaction duration to tolerate). The connector keeps re-reading rows newer
than now - max_transaction_duration until they settle, guaranteeing that a
slow-to-commit row is never skipped. Set it conservatively: too small a value can
miss rows committed late, while too large a value only widens the re-read window and
costs a little extra work.
When persistence is enabled, the connector saves the watermark together with the set
of ids still inside the overlap window as its offset. On restart it resumes from that
state and delivers only the rows that arrived since the last checkpoint, deduplicating
anything that was already emitted. The name parameter is required when using
persistence so the engine can match the connector to its saved state.
- Parameters
- host (str) – The host and port on which the Elasticsearch server works, e.g. "http://localhost:9200".
- auth (ElasticSearchAuth) – Credentials for Elasticsearch authorization.
- index_name (str) – The name of the index to read from.
- schema (type[Schema]) – Schema of the resulting table. Column names must match the field names in the Elasticsearch documents. Both timestamp_column and id_column must be present. Do not declare a primary key in the schema, because the connector keys the table by id_column.
- timestamp_column (str) – Name of the numeric column recording when each row was written or last updated. Used to order and watermark the polled rows.
- id_column (str) – Name of the unique, sortable identifier column. Used to deduplicate the overlap window and as the Pathway Live Data Framework row key.
- max_transaction_duration (timedelta) – Maximum time within which a concurrent writer may still commit a row with a timestamp older than the current watermark.
- mode (Literal['static', 'streaming']) – If set to "streaming" (the default), the connector keeps polling for new rows. If set to "static", it reads the index once and terminates.
- poll_interval (timedelta) – How long to wait between two consecutive polls in streaming mode.
- read_batch_size (int) – Maximum number of documents fetched per query. The connector pages through the index in blocks of this size (each block becomes one minibatch) instead of pulling the whole index at once, which bounds memory on a cold start over a large index. If a single timestamp_column value holds more rows than this, the connector transparently enlarges the query until that timestamp is fully read, so the value is a throughput/latency knob, not a correctness one; it must be positive.
- autocommit_duration_ms (int | None) – The maximum time between two commits. Every autocommit_duration_ms milliseconds, the updates received by the connector are committed and pushed into Pathway Live Data Framework’s computation graph.
- name (str | None) – A unique name for the connector. If provided, this name will be used in logs and monitoring dashboards, and as the name for the persisted snapshot.
- max_backlog_size (int | None) – Limit on the number of entries read from the input source and kept in processing at any moment. Reading pauses when the limit is reached and resumes as processing of some entries completes.
- debug_data (Any) – Static data replacing the original one when debug mode is active.
- Returns
Table – The table read.
Example:
Consider an Elasticsearch instance running locally on port 9200 with an index
"logs" whose documents carry an integer ts field (epoch milliseconds) and a
unique doc_id field:
import datetime

import pathway as pw


class LogSchema(pw.Schema):
    doc_id: str
    ts: int
    message: str


table = pw.io.elasticsearch.read(
    host="http://localhost:9200",
    auth=pw.io.elasticsearch.ElasticSearchAuth.basic("admin", "admin"),
    index_name="logs",
    schema=LogSchema,
    timestamp_column="ts",
    id_column="doc_id",
    max_transaction_duration=datetime.timedelta(minutes=5),
)
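
For reference, a document that fits LogSchema might be built on the producer side as sketched below, using only the Python standard library. The helper make_log_document is hypothetical, and generating ids with uuid4 is one possible way to obtain unique values. The sketch stamps ts from the producer's own clock, so it inherits any clock skew of that machine.

```python
import json
import time
import uuid


def make_log_document(message):
    """Build a document with a unique string id and an integer epoch-millisecond timestamp."""
    return {
        "doc_id": uuid.uuid4().hex,          # unique, sortable as a keyword
        "ts": time.time_ns() // 1_000_000,   # integer epoch milliseconds
        "message": message,
    }


doc = make_log_document("service started")
body = json.dumps(doc)  # JSON body that could be sent to the "logs" index
```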
write(table, host, auth, index_name, *, name=None, sort_by=None)
Writes a table to a given index in Elasticsearch.
The rows of the table are serialized into JSON. Type conversions are the same as in the JSON output connector.
Note that two additional fields are included in the generated JSON: time, which
indicates the time of the Pathway Live Data Framework minibatch, and diff, which can be either
1 (row addition) or -1 (row deletion).
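
A consumer of the index can use the diff field to reconstruct the current contents of the table by summing the diffs per row. The sketch below works on hand-written documents held in a Python list; the values of time and the row contents are hypothetical, and current_rows is an illustrative helper rather than a Pathway function.

```python
from collections import Counter

# Hypothetical documents as they might appear in the target index:
# Bob's cat is first added with age 9, then that row is retracted and re-added with age 10.
documents = [
    {"owner": "Alice", "pet": "dog", "age": 10, "time": 2, "diff": 1},
    {"owner": "Bob", "pet": "cat", "age": 9, "time": 2, "diff": 1},
    {"owner": "Bob", "pet": "cat", "age": 9, "time": 4, "diff": -1},
    {"owner": "Bob", "pet": "cat", "age": 10, "time": 4, "diff": 1},
]


def current_rows(docs):
    """Sum the diffs per row and keep the rows whose total is positive."""
    counts = Counter()
    for doc in docs:
        # Identify a row by its data fields, ignoring the two added fields.
        row = tuple(sorted((k, v) for k, v in doc.items() if k not in ("time", "diff")))
        counts[row] += doc["diff"]
    return [dict(row) for row, total in counts.items() if total > 0]


rows = current_rows(documents)
```

After folding, the retracted row (Bob's cat at age 9) is gone and only the two live rows remain.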
- Parameters
- table (Table) – The table to output.
- host (str) – The host and port on which the Elasticsearch server runs.
- auth (ElasticSearchAuth) – Credentials for Elasticsearch authorization.
- index_name (str) – Name of the index that receives the documents.
- name (str | None) – A unique name for the connector. If provided, this name will be used in logs and monitoring dashboards.
- sort_by (Optional[Iterable[ColumnReference]]) – If specified, the output will be sorted in ascending order based on the values of the given columns within each minibatch. When multiple columns are provided, the corresponding value tuples will be compared lexicographically.
- Returns
None
Example:
Consider an instance of Elasticsearch running locally on port 9200.
It has an index "animals" containing information about pets and their
owners.
For the sake of simplicity, assume that the cluster uses simple
username-password authentication with both username and password equal to "admin".
Now suppose we want to send a Pathway Live Data Framework table pets to this local instance of Elasticsearch.
import pathway as pw
pets = pw.debug.table_from_markdown('''
age | owner | pet
10 | Alice | dog
9 | Bob | cat
8 | Alice | cat
''')
It can be done as follows:
pw.io.elasticsearch.write(
    table=pets,
    host="http://localhost:9200",
    auth=pw.io.elasticsearch.ElasticSearchAuth.basic("admin", "admin"),
    index_name="animals",
)
All the updates of table "pets" will be indexed to "animals" as well.