pathway.io.http package
class pathway.io.http.RetryPolicy(first_delay_ms, backoff_factor, jitter_ms)
Class representing policy of delays or backoffs for the retries.
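The three constructor arguments suggest an exponential backoff with jitter. The sketch below shows how such a schedule is commonly computed; it is an assumption for illustration, not Pathway's internal logic. The helper name `backoff_delays_ms` is hypothetical, and the jitter is assumed to be drawn uniformly from the range [0, jitter_ms].

```python
import random


def backoff_delays_ms(first_delay_ms, backoff_factor, jitter_ms, n_retries, rng=None):
    """Return the waits (in ms) before each of n_retries retries.

    Hypothetical helper: illustrates exponential backoff with jitter,
    not the library's own implementation.
    """
    if rng is None:
        rng = random.Random()
    delays = []
    base = float(first_delay_ms)
    for _ in range(n_retries):
        # Add a random offset so that many clients do not retry in lockstep.
        delays.append(base + rng.uniform(0, jitter_ms))
        # Grow the base delay geometrically for the next attempt.
        base *= backoff_factor
    return delays


# With zero jitter the schedule is deterministic.
print(backoff_delays_ms(1000, 1.5, 0, n_retries=3))  # [1000.0, 1500.0, 2250.0]
```

A larger backoff_factor spreads retries further apart, while a non-zero jitter_ms helps avoid many clients retrying at the same instant.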
pathway.io.http.read(url, *, method='GET', payload=None, headers=None, response_mapper=None, format='json', delimiter=None, n_retries=0, retry_policy=<pathway.io.http._common.RetryPolicy object>, connect_timeout_ms=None, request_timeout_ms=None, allow_redirects=True, retry_codes=(429, 500, 502, 503, 504), value_columns=None, primary_key=None, types=None, autocommit_duration_ms=10000, debug_data=None)
Reads a table from an HTTP stream.
- Parameters
  - url (str) – the full URL of the streaming endpoint to fetch data from.
  - method (str) – request method for streaming. It should be one of the HTTP request methods.
  - payload (Optional[Any]) – data to be sent in the body of the request.
  - headers (Optional[Dict[str, str]]) – request headers in the form of a dict. Wildcards are allowed in both keys and values.
  - response_mapper (Optional[Callable[[str | bytes], bytes]]) – a function to apply when a response needs to be processed. It is applied to each slice of the stream.
  - format (str) – format of the data, “json” or “raw”. With the “raw” format, a table with a single “data” column is produced. With the “json” format, bytes-encoded JSON is expected.
  - delimiter (Union[str, bytes, None]) – delimiter used to split the stream into messages.
  - n_retries (int) – how many times to retry a failed request.
  - retry_policy (RetryPolicy) – policy of delays or backoffs for the retries.
  - connect_timeout_ms (Optional[int]) – connection timeout, specified in milliseconds. If None, no restriction on connection duration is applied.
  - request_timeout_ms (Optional[int]) – request timeout, specified in milliseconds. If None, no restriction on request duration is applied.
  - allow_redirects (bool) – whether to allow redirects.
  - retry_codes (Optional[Tuple]) – HTTP status codes that trigger retries.
  - content_type – content type of the data to send. If the chosen format is JSON, it defaults to “application/json”.
  - value_columns (Optional[List[str]]) – columns to extract for a table.
  - primary_key (Optional[List[str]]) – if the table should have a primary key generated from a subset of its columns, specify that set of columns here. Otherwise, the primary key is generated as uuid4.
  - types (Optional[Dict[str, PathwayType]]) – dictionary mapping the columns to the data types (pw.Type) of their values. This parameter is optional; if it is not provided, the default type is pw.Type.ANY.
  - autocommit_duration_ms (int) – the maximum time between two commits. Every autocommit_duration_ms milliseconds, the updates received by the connector are committed and pushed into Pathway’s computation graph.
  - debug_data – static data that replaces the original data when debug mode is active.
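To make the role of delimiter concrete, the sketch below shows one common way to cut a chunked byte stream into messages. It is an illustration under stated assumptions, not Pathway's implementation: the helper name `split_messages` is hypothetical, and emitting unterminated trailing data at the end of the stream is a choice made here for the example.

```python
def split_messages(chunks, delimiter=b"\n"):
    """Yield complete messages from an iterable of byte chunks.

    Hypothetical helper: chunk boundaries need not align with message
    boundaries, so incomplete data is buffered until a delimiter arrives.
    """
    buffer = b""
    for chunk in chunks:
        buffer += chunk
        # Emit every complete message currently held in the buffer.
        while delimiter in buffer:
            message, buffer = buffer.split(delimiter, 1)
            yield message
    # Emit any trailing data that was not terminated by a delimiter.
    if buffer:
        yield buffer


# The second message is split across two network chunks.
chunks = [b'{"id": 1}\n{"id"', b': 2}\n{"id": 3}']
print(list(split_messages(chunks)))
```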
Examples:
Raw format:
import pathway as pw

table = pw.io.http.read(
    "https://localhost:8000/stream",
    method="GET",
    headers={"Authorization": f"Bearer {BEARER_TOKEN}"},
    format="raw",
)
JSON with response mapper:
The input can be adjusted with a mapping function that is applied to each slice of the stream. The mapping function should return bytes.
import json

def mapper(msg: bytes) -> bytes:
    # Decode the message and rename its fields to the expected columns.
    result = json.loads(msg.decode())
    return json.dumps({"key": result["id"], "text": result["data"]}).encode()

t = pw.io.http.read(
    "https://localhost:8000/stream",
    method="GET",
    headers={"Authorization": f"Bearer {BEARER_TOKEN}"},
    value_columns=["key", "text"],
    response_mapper=mapper,
)
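Because the mapper is a plain function from bytes to bytes, it can be checked in isolation before it is passed to the connector. The snippet below repeats the mapper and feeds it a made-up message; the sample payload is hypothetical, and real messages depend on the endpoint.

```python
import json


def mapper(msg: bytes) -> bytes:
    # Rename the incoming fields to the column names the table expects.
    result = json.loads(msg.decode())
    return json.dumps({"key": result["id"], "text": result["data"]}).encode()


# Hypothetical sample message, used only to exercise the mapper.
sample = b'{"id": 7, "data": "hello"}'
mapped = mapper(sample)
print(mapped)
```

The keys of the mapped object match the names listed in value_columns, which is what lets the connector extract them as table columns.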
pathway.io.http.write(table, url, *, method='POST', format='json', request_payload_template=None, n_retries=0, retry_policy=<pathway.io.http._common.RetryPolicy object>, connect_timeout_ms=None, request_timeout_ms=None, content_type=None, headers=None, allow_redirects=True, retry_codes=(429, 500, 502, 503, 504))
Sends the stream of updates from the table to the specified HTTP API.
- Parameters
  - table (Table) – table to be tracked.
  - method (str) – request method for streaming. It should be one of the HTTP request methods.
  - url (str) – the full URL of the endpoint to push data into. Can contain wildcards.
  - format (str) – the payload format, one of {“json”, “custom”}. If “json” is specified, plain JSON is formed and sent. Otherwise, the contents of the request_payload_template field are used.
  - request_payload_template (Optional[str]) – the template to format and send when “custom” is specified in the format field. Can include wildcards.
  - n_retries (int) – how many times to retry a failed request.
  - retry_policy (RetryPolicy) – policy of delays or backoffs for the retries.
  - connect_timeout_ms (Optional[int]) – connection timeout, specified in milliseconds. If None, no restriction on connection duration is applied.
  - request_timeout_ms (Optional[int]) – request timeout, specified in milliseconds. If None, no restriction on request duration is applied.
  - allow_redirects (bool) – whether to allow redirects.
  - retry_codes (Optional[Tuple]) – HTTP status codes that trigger retries.
  - content_type (Optional[str]) – content type of the data to send. If the chosen format is JSON, it defaults to “application/json”.
  - headers (Optional[Dict[str, str]]) – request headers in the form of a dict. Wildcards are allowed in both keys and values.
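The interplay of n_retries and retry_codes can be pictured as a small loop. The sketch below is only an illustration: it assumes that n_retries counts additional attempts after the first request, the helper `send_with_retries` is hypothetical, and delays between attempts are left out for brevity.

```python
RETRY_CODES = (429, 500, 502, 503, 504)


def send_with_retries(send, n_retries=0, retry_codes=RETRY_CODES):
    """Call send() until the status is not retryable or retries run out.

    Hypothetical helper: `send` is any callable returning an HTTP status code.
    Returns the final status and the number of attempts made.
    """
    status = send()
    attempts = 1
    while status in retry_codes and attempts <= n_retries:
        status = send()
        attempts += 1
    return status, attempts


# Simulated endpoint: fails twice with 503, then succeeds.
responses = iter([503, 503, 200])
print(send_with_retries(lambda: next(responses), n_retries=3))  # (200, 3)
```

A status outside retry_codes, such as 404, ends the loop immediately regardless of how many retries remain.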
Wildcards:
Wildcards are the recommended way to customize the HTTP requests that the connector composes. The engine replaces every occurrence of {table.<column_name>} with the value from the column <column_name> in the row being sent. Wildcards are resolved in the URL, the request payload template, and the headers.
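The substitution rule can be sketched in a few lines of plain Python. This is a minimal illustration of the idea, not the engine's code: the helper `resolve_wildcards` and the `X-Owner` header are hypothetical, a dict stands in for a table row, and no URL encoding of values is attempted.

```python
import re

# Matches wildcards of the form {table.<column_name>}.
WILDCARD = re.compile(r"\{table\.(\w+)\}")


def resolve_wildcards(template, row):
    """Replace each {table.<column_name>} with the row's value for that column.

    Hypothetical helper: `row` is a plain dict standing in for a table row.
    """
    return WILDCARD.sub(lambda m: str(row[m.group(1)]), template)


row = {"owner": "Alice", "pet": "cat"}
url = resolve_wildcards(
    "http://www.example.com/api/event?owner={table.owner}&pet={table.pet}", row
)
headers = {
    name: resolve_wildcards(value, row)
    for name, value in {"X-Owner": "{table.owner}"}.items()
}
print(url)
print(headers)
```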
Examples:
For the sake of demonstration, let’s try different ways to send the stream of changes on a table pets, which contains data about pets and their owners. The table has just two columns: the pet and the owner’s name.
import pathway as pw

pw.debug.compute_and_print(pets, include_id=False)
owner  pet
Alice  cat
Bob    dog
Suppose the stream of changes on this table must be sent to an external API endpoint (the URL used here is just an example).
To keep things simple, we can suppose that this API accepts flat JSON objects, which are sent in POST requests. Then, the communication can be done with a simple code snippet:
pw.io.http.write(pets, "http://www.example.com/api/event")
Now let’s do something more custom. Suppose that the API endpoint requires us to communicate via the PUT method and to pass the values as query-string (CGI) parameters. In this case, wildcards are the way to go:
pw.io.http.write(
    pets,
    "http://www.example.com/api/event?owner={table.owner}&pet={table.pet}",
    method="PUT",
)
A custom payload can also be built from a user-supplied template. What if the endpoint requires the data in TSKV (tab-separated key=value) format in the request body?
First of all, let’s form a template for the message body:
message_template_tokens = [
    "owner={table.owner}",
    "pet={table.pet}",
    "time={table.time}",
    "diff={table.diff}",
]
message_template = "\t".join(message_template_tokens)
Now we can use this template with the custom format:
pw.io.http.write(
    pets,
    "http://www.example.com/api/event",
    method="POST",
    format="custom",
    request_payload_template=message_template,
)
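To preview what a request body built from this template might look like, the template can be rendered locally. The sketch below uses Python's own str.format, which happens to resolve {table.owner} through attribute lookup; this is a convenience for illustration and not a claim about how the engine resolves wildcards. The row values are made up, and it is an assumption that time and diff resolve to the update's timestamp and its change sign.

```python
from types import SimpleNamespace

message_template = "\t".join([
    "owner={table.owner}",
    "pet={table.pet}",
    "time={table.time}",
    "diff={table.diff}",
])

# Hypothetical row; the time value is made up for illustration.
row = SimpleNamespace(owner="Alice", pet="cat", time=1700000000000, diff=1)

# str.format resolves {table.owner} via attribute lookup on `table`.
body = message_template.format(table=row)
print(repr(body))

# Parse the line back into a dict, as a receiving service might.
fields = dict(pair.split("=", 1) for pair in body.split("\t"))
print(fields)
```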