pathway.io.http package
class pw.io.http.RetryPolicy(first_delay_ms, backoff_factor, jitter_ms)
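The constructor arguments suggest an exponential backoff with random jitter. This reference does not state the exact formula Pathway applies, so the sketch below is a hypothetical model. In it, retry `k` (0-based) waits `first_delay_ms * backoff_factor**k` milliseconds plus a uniform random jitter in `[0, jitter_ms]`. The helper `retry_delays_ms` is illustrative and not part of the Pathway API.

```python
import random


def retry_delays_ms(first_delay_ms, backoff_factor, jitter_ms, n_retries, rng=None):
    """Hypothetical model of a RetryPolicy delay schedule (not Pathway's code).

    Retry k (0-based) waits first_delay_ms * backoff_factor**k milliseconds,
    plus a uniform random jitter between 0 and jitter_ms.
    """
    if rng is None:
        rng = random.Random()
    delays = []
    for attempt in range(n_retries):
        base = first_delay_ms * backoff_factor**attempt
        delays.append(base + rng.uniform(0, jitter_ms))
    return delays


# Without jitter the schedule is deterministic.
print(retry_delays_ms(100, 2, 0, n_retries=4))  # [100.0, 200.0, 400.0, 800.0]
```

Under this model, jitter spreads out retries from many clients that failed at the same moment, while the backoff factor controls how quickly the waits grow.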
Functions
pw.io.http.read(url, *, schema=None, method='GET', payload=None, headers=None, response_mapper=None, format='json', delimiter=None, n_retries=0, retry_policy=<pathway.io.http._common.RetryPolicy object>, connect_timeout_ms=None, request_timeout_ms=None, allow_redirects=True, retry_codes=(429, 500, 502, 503, 504), autocommit_duration_ms=10000, debug_data=None, value_columns=None, primary_key=None, types=None, default_values=None)
Reads a table from an HTTP stream.
- Parameters
- url (str) – the full URL of the streaming endpoint to fetch data from.
- schema (Optional[Type[Schema]]) – schema of the resulting table.
- method (str) – request method for streaming. It should be one of the HTTP request methods.
- payload (Optional[Any]) – data to be sent in the body of the request.
- headers (Optional[Dict[str, str]]) – request headers in the form of a dict. Wildcards are allowed in both keys and values.
- response_mapper (Optional[Callable[[Union[str, bytes]], bytes]]) – a function for processing the response, if needed. It is applied to each slice of the stream.
- format (str) – format of the data, “json” or “raw”. With the “raw” format, a table with a single “data” column is produced. With the “json” format, bytes-encoded JSON is expected.
- delimiter (Union[str, bytes, None]) – delimiter used to split the stream into messages.
- n_retries (int) – how many times to retry a failed request.
- retry_policy (RetryPolicy) – policy of delays or backoffs for the retries.
- connect_timeout_ms (Optional[int]) – connection timeout, in milliseconds. If None, no limit on connection duration is applied.
- request_timeout_ms (Optional[int]) – request timeout, in milliseconds. If None, no limit on request duration is applied.
- allow_redirects (bool) – whether to allow redirects.
- retry_codes (Optional[Tuple]) – HTTP status codes that trigger retries.
- content_type – content type of the data to send. If the chosen format is JSON, it defaults to “application/json”.
- autocommit_duration_ms (int) – the maximum time between two commits. Every autocommit_duration_ms milliseconds, the updates received by the connector are committed and pushed into Pathway’s computation graph.
- debug_data – static data that replaces the original data when debug mode is active.
- value_columns (Optional[List[str]]) – columns to extract for a table. [will be deprecated soon]
- primary_key (Optional[List[str]]) – if the table should have a primary key generated from a subset of its columns, specify those columns in this field. Otherwise, the primary key is generated as uuid4. [will be deprecated soon]
- types (Optional[Dict[str, PathwayType]]) – dictionary mapping columns to the data types (pw.Type) of their values. This parameter is optional; if it is not provided, the default type is pw.Type.ANY. [will be deprecated soon]
- default_values (Optional[Dict[str, Any]]) – dictionary containing default values for columns, used to replace blank entries. The default value of a column must be specified explicitly; otherwise, there is no default value. [will be deprecated soon]
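To make the interplay of `delimiter` and `format` more concrete, here is a stdlib-only model of how a byte stream could be cut into messages and turned into rows. It is an illustrative assumption, not Pathway's implementation. The function `split_messages`, its default `b"\n"` delimiter, and its handling of empty and incomplete messages are all hypothetical. With `fmt="raw"` each message becomes a row with a single `data` column; with `fmt="json"` each message is decoded as a JSON object.

```python
import json


def split_messages(chunks, delimiter=b"\n", fmt="json"):
    """Hypothetical model: split a byte stream into rows (not Pathway's code)."""
    buffer = b""
    rows = []
    for chunk in chunks:
        buffer += chunk
        # Emit every complete message; keep the unfinished tail buffered.
        *complete, buffer = buffer.split(delimiter)
        for message in complete:
            if not message:
                continue  # this model skips empty messages, e.g. blank lines
            if fmt == "raw":
                rows.append({"data": message})
            else:
                rows.append(json.loads(message))
    return rows


# A single message may arrive split across two network chunks.
stream = [b'{"id": 1, "data": "a"}\n{"id": 2, ', b'"data": "b"}\n']
print(split_messages(stream))
print(split_messages(stream, fmt="raw"))
```

The buffering step is the important part of the model: a delimiter marks where one message ends, so a partial message is held back until the rest of it arrives.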
Examples:
Raw format:
import os
import pathway as pw
table = pw.io.http.read(
    "https://localhost:8000/stream",
    method="GET",
    headers={"Authorization": f"Bearer {os.environ['BEARER_TOKEN']}"},
    format="raw",
)
JSON with response mapper:
Input can be adjusted using a mapping function that will be applied to each slice of a stream. The mapping function should return bytes.
import json
import os

import pathway as pw

def mapper(msg: bytes) -> bytes:
    # Rename the upstream fields "id" and "data" to the schema's "key" and "text".
    result = json.loads(msg.decode())
    return json.dumps({"key": result["id"], "text": result["data"]}).encode()

class InputSchema(pw.Schema):
    key: int
    text: str

t = pw.io.http.read(
    "https://localhost:8000/stream",
    method="GET",
    headers={"Authorization": f"Bearer {os.environ['BEARER_TOKEN']}"},
    schema=InputSchema,
    response_mapper=mapper,
)
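Because `mapper` is an ordinary Python function, it can be checked on a sample message before it is handed to the connector. The sample payload below is made up for illustration. Note that the extra field is dropped by the mapper.

```python
import json


def mapper(msg: bytes) -> bytes:
    # Rename the upstream fields "id" and "data" to the schema's "key" and "text".
    result = json.loads(msg.decode())
    return json.dumps({"key": result["id"], "text": result["data"]}).encode()


# Hypothetical message as it might arrive from the stream.
sample = b'{"id": 7, "data": "hello", "extra": true}'
print(mapper(sample))  # b'{"key": 7, "text": "hello"}'
```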
pw.io.http.rest_connector(host, port, *, route='/', schema=None, autocommit_duration_ms=1500, keep_queries=False)
Runs a lightweight HTTP server and inputs a collection from the HTTP endpoint, configured by the parameters of this method. On the output, the method provides a table and a callable. The callable must be given the result table of the computation; its entries are tracked and put into the responses to the respective requests.
- Parameters
- host (str) – TCP/IP host or a sequence of hosts for the created endpoint;
- port (int) – port for the created endpoint;
- route (str) – route which will be listened to by the web server;
- schema (Optional[Type[Schema]]) – schema of the resulting table;
- autocommit_duration_ms – the maximum time between two commits. Every autocommit_duration_ms milliseconds, the updates received by the connector are committed and pushed into Pathway’s computation graph;
- keep_queries (bool) – whether to keep queries after processing; defaults to False.
- Returns
table – the table read; response_writer – a callable to which the result table should be passed.
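A client can query the endpoint started by rest_connector with any HTTP library. The sketch below only builds the request with the standard library's `urllib.request`, so it runs without a server. Several things here are assumptions: that the server listens on `localhost:8080` at route `/` (these must match the `host`, `port` and `route` arguments), that it accepts POST, and that the request body is a JSON object whose keys match the columns of `schema`. The `query` column is hypothetical.

```python
import json
import urllib.request

# Assumed deployment values: they should match the host, port and route
# passed to pw.io.http.rest_connector.
HOST, PORT, ROUTE = "localhost", 8080, "/"

# Hypothetical payload: one key per column of the connector's schema.
payload = {"query": "How many pets does Alice have?"}

request = urllib.request.Request(
    f"http://{HOST}:{PORT}{ROUTE}",
    data=json.dumps(payload).encode(),
    headers={"Content-Type": "application/json"},
    method="POST",
)

# Sending is left out so the sketch runs without a server:
# with urllib.request.urlopen(request) as response:
#     print(response.read().decode())
print(request.get_method(), request.full_url)  # POST http://localhost:8080/
```

When sent against a running pipeline, the response body would be whatever the result table passed to response_writer produces for this request.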
pw.io.http.write(table, url, *, method='POST', format='json', request_payload_template=None, n_retries=0, retry_policy=<pathway.io.http._common.RetryPolicy object>, connect_timeout_ms=None, request_timeout_ms=None, content_type=None, headers=None, allow_redirects=True, retry_codes=(429, 500, 502, 503, 504))
Sends the stream of updates from the table to the specified HTTP API.
- Parameters
- table (Table) – table to be tracked.
- method (str) – request method for streaming. It should be one of the HTTP request methods.
- url (str) – the full URL of the endpoint to push data into. Can contain wildcards.
- format (str) – the payload format, one of {“json”, “custom”}. If “json” is specified, a plain JSON object is formed and sent. Otherwise, the contents of the request_payload_template field are used.
- request_payload_template (Optional[str]) – the template to format and send when “custom” is specified in the format field. Can include wildcards.
- n_retries (int) – how many times to retry a failed request.
- retry_policy (RetryPolicy) – policy of delays or backoffs for the retries.
- connect_timeout_ms (Optional[int]) – connection timeout, in milliseconds. If None, no limit on connection duration is applied.
- request_timeout_ms (Optional[int]) – request timeout, in milliseconds. If None, no limit on request duration is applied.
- allow_redirects (bool) – whether to allow redirects.
- retry_codes (Optional[Tuple]) – HTTP status codes that trigger retries.
- content_type (Optional[str]) – content type of the data to send. If the chosen format is JSON, it defaults to “application/json”.
- headers (Optional[Dict[str, str]]) – request headers in the form of a dict. Wildcards are allowed in both keys and values.
Wildcards:
Wildcards are the recommended way to customize the composed HTTP requests. The engine replaces every occurrence of {table.<column_name>} with the value of the column <column_name> in the row being sent. Wildcards are resolved in the URL, the request payload template, and the headers.
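The substitution rule can be modelled in a few lines of Python. This is a sketch of the documented behaviour, not the engine's code. The helper `resolve_wildcards` is hypothetical, and it deliberately does not URL-encode values, since this reference does not say whether Pathway does.

```python
import re

# Matches {table.<column_name>} and captures the column name.
WILDCARD = re.compile(r"\{table\.(\w+)\}")


def resolve_wildcards(template: str, row: dict) -> str:
    """Hypothetical model: replace each {table.<column>} with str(row[column])."""
    return WILDCARD.sub(lambda match: str(row[match.group(1)]), template)


row = {"owner": "Alice", "pet": "dog"}

url = resolve_wildcards(
    "http://www.example.com/api/event?owner={table.owner}&pet={table.pet}", row
)
print(url)  # http://www.example.com/api/event?owner=Alice&pet=dog

# Wildcards may appear in header keys as well as in header values.
headers = {"X-{table.pet}-Owner": "{table.owner}"}
resolved = {resolve_wildcards(k, row): resolve_wildcards(v, row) for k, v in headers.items()}
print(resolved)  # {'X-dog-Owner': 'Alice'}
```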
Examples:
For the sake of demonstration, let’s try different ways to send the stream of changes on a table pets, which contains data about pets and their owners. The table has just two columns: the pet and the owner’s name.
import pathway as pw
pets = pw.debug.parse_to_table("owner pet \n Alice dog \n Bob cat \n Alice cat")
Suppose that the stream of changes on this table must be sent to an external API endpoint (we pick an example URL for the sake of demonstration).
To keep things simple, suppose that this API accepts flat JSON objects sent in POST requests. Then the communication can be done with a single line of code:
pw.io.http.write(pets, "http://www.example.com/api/event")
Now let’s do something more custom. Suppose that the API endpoint requires us to use the PUT method and to pass the values as CGI parameters in the query string. In this case, wildcards are the way to go:
pw.io.http.write(
    pets,
    "http://www.example.com/api/event?owner={table.owner}&pet={table.pet}",
    method="PUT",
)
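If owner or pet names can contain characters such as spaces or `&`, the substituted query string may be malformed. This reference does not say whether Pathway escapes wildcard values, so treat the following as a defensive assumption rather than required practice: percent-encode the values before they reach the URL. Only the encoding step is shown, using the standard library's `urllib.parse.quote`; the sample values are hypothetical.

```python
from urllib.parse import quote

# Hypothetical raw values that would break a naive query string.
owner, pet = "Alice & Bob", "guinea pig"

# quote(..., safe="") escapes every reserved character, including "&", "/" and "=".
query = f"owner={quote(owner, safe='')}&pet={quote(pet, safe='')}"
print(query)  # owner=Alice%20%26%20Bob&pet=guinea%20pig
```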
A fully custom payload can also be composed. What if the endpoint requires the data in the request body in tskv format?
First of all, let’s form a template for the message body:
message_template_tokens = [
    "owner={table.owner}",
    "pet={table.pet}",
    "time={table.time}",
    "diff={table.diff}",
]
message_template = "\t".join(message_template_tokens)
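To see what a single request body would look like, the sketch below fills the template by plain string replacement for one update. The update values are hypothetical: `diff` is 1 for an insertion (and -1 for a deletion) in Pathway's update streams, and the `time` value is made up. The last line shows how a receiver could parse the tskv line back into key/value pairs.

```python
message_template = "\t".join(
    ["owner={table.owner}", "pet={table.pet}", "time={table.time}", "diff={table.diff}"]
)

# Hypothetical update: Alice's dog is inserted (diff=1) at an illustrative time 2.
update = {"owner": "Alice", "pet": "dog", "time": 2, "diff": 1}

body = message_template
for column, value in update.items():
    body = body.replace("{table." + column + "}", str(value))
print(repr(body))  # 'owner=Alice\tpet=dog\ttime=2\tdiff=1'

# tskv: tab-separated fields, each split on the first "=" into key and value.
parsed = dict(field.split("=", 1) for field in body.split("\t"))
```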
Now we can use this template together with the custom format:
pw.io.http.write(
    pets,
    "http://www.example.com/api/event",
    method="POST",
    format="custom",
    request_payload_template=message_template,
)