pw.io.python

class pw.io.python.ConnectorSubject

An abstract class for creating custom Python connectors.

A custom Python connector is created by extending this class and implementing the run() function, which is responsible for filling the buffer with data. The Pathway engine starts this function in a separate thread.

To send a message, use one of the methods next_json(), next_str(), or next_bytes().

If the subject won’t delete records, set the class property deletions_enabled to False, as this may improve performance. The example below does so by overriding the _deletions_enabled property.

Example:

import pathway as pw
from pathway.io.python import ConnectorSubject

class MySchema(pw.Schema):
    a: int
    b: str


class MySubject(ConnectorSubject):
    def run(self) -> None:
        for i in range(4):
            self.next_json({"a": i, "b": f"x{i}"})

    @property
    def _deletions_enabled(self) -> bool:
        return False


s = MySubject()

table = pw.io.python.read(s, schema=MySchema)
pw.debug.compute_and_print(table, include_id=False)
a | b
0 | x0
1 | x1
2 | x2
3 | x3

close()

Sends a sentinel message.

Should be called to indicate that no new messages will be sent.

commit()

Sends a commit message.

end()

Joins the thread running run().

Should not be called directly.

next_bytes(message)

Sends a message.

  • Parameters
    message (bytes) – A bytes-encoded JSON string.

next_json(message)

Sends a message.

  • Parameters
    message (dict) – A dict representing a JSON object.

next_str(message)

Sends a message.

  • Parameters
    message (str) – A JSON string.
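
The three next_* methods appear to differ only in how the same JSON record is encoded. The following minimal sketch uses only the standard json module and makes no Pathway calls; the record contents are made up for illustration, and UTF-8 as the byte encoding is an assumption. It shows the shape of value each method expects:

```python
import json

# Hypothetical record matching a schema with columns a (int) and b (str).
record = {"a": 1, "b": "x1"}

as_dict = record                    # shape expected by next_json(message: dict)
as_str = json.dumps(record)         # shape expected by next_str(message: str)
as_bytes = as_str.encode("utf-8")   # shape expected by next_bytes(message: bytes); UTF-8 is assumed

# All three encodings carry the same record.
decoded_from_str = json.loads(as_str)
decoded_from_bytes = json.loads(as_bytes.decode("utf-8"))
print(decoded_from_str == as_dict, decoded_from_bytes == as_dict)
```

Inside run(), a subject would then call whichever method matches the form in which its data already arrives, avoiding an extra encode or decode step.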

on_stop()

Called after the end of the run() function.

start()

Starts a separate thread running the function that feeds data into the buffer.

Should not be called directly.
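
The lifecycle described by start(), run(), on_stop(), close() and end() can be pictured with a small standard-library model. This is a sketch only, not Pathway's implementation: ToySubject, drain and _SENTINEL are hypothetical names, and both the automatic call to close() after run() and the relative order of on_stop() and close() are assumptions made for the illustration.

```python
import queue
import threading

_SENTINEL = object()  # stand-in for the sentinel message sent by close()


class ToySubject:
    """Hypothetical stand-in for a connector subject; not Pathway code."""

    def __init__(self) -> None:
        self.buffer: queue.Queue = queue.Queue()
        self.stopped = False
        self._thread = threading.Thread(target=self._target)

    def run(self) -> None:
        # User-defined part: push messages into the buffer.
        for i in range(3):
            self.buffer.put({"a": i})

    def on_stop(self) -> None:
        # Hook invoked once run() has returned.
        self.stopped = True

    def close(self) -> None:
        # Signal that no further messages will arrive.
        self.buffer.put(_SENTINEL)

    def _target(self) -> None:
        # Assumed ordering: run, then the on_stop hook, then the sentinel.
        self.run()
        self.on_stop()
        self.close()

    def start(self) -> None:
        # Launch the feeding function in its own thread.
        self._thread.start()

    def end(self) -> None:
        # Wait for the feeding thread to finish.
        self._thread.join()


def drain(subject: ToySubject) -> list:
    """Play the role of the engine: start the subject and read until the sentinel."""
    subject.start()
    messages = []
    while True:
        item = subject.buffer.get()
        if item is _SENTINEL:
            break
        messages.append(item)
    subject.end()
    return messages


subject = ToySubject()
received = drain(subject)
print(received)
```

In this model only drain() calls start() and end(), which mirrors the note that user code should not call them directly; user code supplies run() and, optionally, on_stop().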

class pw.io.python.InteractiveCsvPlayer(csv_file='')

close()

Sends a sentinel message.

Should be called to indicate that no new messages will be sent.

commit()

Sends a commit message.

end()

Joins the thread running run().

Should not be called directly.

next_bytes(message)

Sends a message.

  • Parameters
    message (bytes) – A bytes-encoded JSON string.

next_json(message)

Sends a message.

  • Parameters
    message (dict) – A dict representing a JSON object.

next_str(message)

Sends a message.

  • Parameters
    message (str) – A JSON string.

on_stop()

Called after the end of the run() function.

start()

Starts a separate thread running the function that feeds data into the buffer.

Should not be called directly.

pw.io.python.read(subject, *, schema=None, format='json', autocommit_duration_ms=1500, debug_data=None, value_columns=None, primary_key=None, types=None, default_values=None, persistent_id=None)

Reads a table from a ConnectorSubject.

  • Parameters
    • subject (ConnectorSubject) – An instance of a ConnectorSubject.
    • schema (type[Schema] | None) – Schema of the resulting table.
    • format (str) – Format of the data produced by the subject: “json”, “raw” or “binary”. With the “raw” format, a table with a single “data” column is produced.
    • debug_data – Static data that replaces the original data when debug mode is active.
    • autocommit_duration_ms (int | None) – The maximum time between two commits. Every autocommit_duration_ms milliseconds, the updates received by the connector are committed and pushed into Pathway’s computation graph.
    • value_columns (list[str] | None) – Columns to extract for a table. [will be deprecated soon]
    • primary_key (list[str] | None) – If the table should have a primary key generated from a subset of its columns, specify those columns in this field. Otherwise, the primary key is generated randomly. [will be deprecated soon]
    • types (dict[str, PathwayType] | None) – A dictionary mapping column names to the data types (pw.Type) of their values. This parameter is optional; if it is not provided, the default type is pw.Type.ANY. [will be deprecated soon]
    • default_values (dict[str, Any] | None) – A dictionary of default values for columns, used to replace blank entries. A column’s default value must be specified explicitly; otherwise the column has no default value. [will be deprecated soon]
    • persistent_id (str | None) – (unstable) An identifier under which the state of the table is persisted, or None if the state of this table does not need to be persisted. When a program restarts, it restores the state of all input tables according to what was saved under their persistent_id. This makes it possible to resume computations from the point where they were last terminated.
  • Returns
    Table – The table read.
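
The effect of autocommit_duration_ms can be illustrated with a deliberately simplified model: messages that arrive within the same window of autocommit_duration_ms milliseconds end up in the same commit. The sketch below uses no Pathway calls. The function name group_into_commits, the arrival times, and the assumption that windows are fixed intervals starting at time 0 are all hypothetical; the real engine's timing may differ.

```python
from collections import defaultdict


def group_into_commits(arrivals_ms, autocommit_duration_ms=1500):
    """Group (arrival_time_ms, message) pairs into commit batches.

    Simplified model: commit windows are fixed intervals starting at time 0.
    """
    batches = defaultdict(list)
    for arrival_ms, message in arrivals_ms:
        # Index of the window this arrival falls into.
        window = arrival_ms // autocommit_duration_ms
        batches[window].append(message)
    # Emit non-empty batches in chronological order.
    return [batches[w] for w in sorted(batches)]


# Made-up arrival times in milliseconds.
arrivals = [(0, "m0"), (400, "m1"), (1499, "m2"), (1500, "m3"), (3200, "m4")]
commits = group_into_commits(arrivals)
print(commits)
```

Under this model, a smaller autocommit_duration_ms yields more, smaller commits, while a larger value batches more updates together before they reach the computation graph.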