pw.io.mssql
This module is available when using one of the following licenses only: Pathway Scale, Pathway Enterprise.
Pathway provides both Input and Output connectors for Microsoft SQL Server (MSSQL).
The Input connector supports two operating modes:
- Streaming mode (default) — uses SQL Server’s Change Data Capture (CDC) feature to track
row-level changes in real time via the transaction log. Requires SQL Server Developer or
Enterprise edition with CDC enabled at the database and table level:
-- Enable CDC on the database
EXEC sys.sp_cdc_enable_db;
-- Enable CDC on the table
EXEC sys.sp_cdc_enable_table
    @source_schema = N'dbo',
    @source_name = N'<table>',
    @role_name = NULL;
- Static mode — reads the full table once as a snapshot, then terminates.
Primary key requirement: the schema passed to the input connector must declare at least one
primary key column using pw.column_definition(primary_key=True). The connector uses these
columns to track which rows have been inserted, updated, or deleted — without them it cannot
maintain a consistent snapshot of the table.
Persistence: when persistence is enabled, the input connector saves the CDC Log Sequence Number (LSN) of the last processed change as its offset. On restart it skips the initial table snapshot and resumes from that LSN, so changes that occurred while the connector was stopped are delivered as discrete events rather than absorbed into a fresh snapshot. Persistence requires CDC to be enabled on the table; see the read() documentation below for details, including the interaction with SQL Server’s CDC retention window.
The Output connector supports two operating modes:
- Stream of changes mode — appends every Pathway update as a row with time and diff columns.
- Snapshot mode — maintains the current state of the Pathway table using atomic MERGE upserts.
The connector uses a pure Rust TDS implementation (no ODBC drivers required) and is compatible with SQL Server 2017, 2019, 2022, and Azure SQL Edge.
Type Conversion (Input Connector)
The table below describes how MS SQL Server column types are parsed into Pathway values, and what type
to declare in your pw.Schema to receive them correctly.
MS SQL Server types parsed by the input connector
| MS SQL Server type | Pathway schema type and notes |
|---|---|
| BIT | bool |
| TINYINT | int |
| SMALLINT / INT2 | int |
| INT / INT4 | int |
| BIGINT / INT8 | int. Alternatively, declare as pw.Duration to interpret the value as microseconds — use this when the column was written by Pathway’s output connector, which serializes pw.Duration as a BIGINT microsecond count. |
| REAL / FLOAT4 | float |
| FLOAT / DOUBLE PRECISION / FLOAT8 | float |
| NUMERIC / DECIMAL | float — converted via string representation. Precision loss is possible for values with more than ~15 significant digits. |
| NVARCHAR / VARCHAR / CHAR / TEXT / NTEXT | str |
| NVARCHAR / VARCHAR | pw.Json — the string is parsed as a JSON literal. The string form is what Pathway’s output connector produces for pw.Json columns. |
| NVARCHAR / VARCHAR | pw.Pointer — the string is decoded as a Pathway pointer. The string must have been produced by Pathway’s output connector for a pw.Pointer column. |
| NVARCHAR(MAX) | list / tuple — the column must contain a JSON array as produced by Pathway’s output connector. Elements are parsed recursively according to the declared inner types. |
| NVARCHAR(MAX) | np.ndarray — the column must contain a JSON object with keys shape (array dimension sizes) and elements (flat list of values), as produced by Pathway’s output connector. Only int and float element types are supported. |
| VARBINARY / BINARY / IMAGE | bytes |
| VARBINARY(MAX) | pw.PyObjectWrapper — the binary payload is deserialized with bincode. The field must have been written by Pathway’s output connector for a pw.PyObjectWrapper column. |
| UNIQUEIDENTIFIER | str — formatted as a standard hyphenated lowercase GUID string, e.g. "6f9619ff-8b86-d011-b42d-00c04fc964ff". |
| XML | str — the raw XML text. |
| DATETIME / DATETIME2 / SMALLDATETIME | pw.DateTimeNaive |
| DATE | pw.DateTimeNaive — midnight (00:00:00) on the given date. |
| DATETIMEOFFSET | pw.DateTimeUtc — the timezone offset is applied so that the result is in UTC. |
| TIME | pw.Duration — microseconds elapsed since midnight. |
| Any nullable column | Declare the field as optional in the schema. A NULL MSSQL value is parsed as None; any other value is parsed as the declared type. If the schema field is not declared as optional and a NULL is received, an error is raised. |
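As a concrete illustration of the np.ndarray wire format described in the table above, the sketch below reconstructs an array from the kind of JSON payload the output connector is described as producing. The payload literal here is illustrative, not captured from a real run:

```python
import json

import numpy as np

# Illustrative NVARCHAR(MAX) cell content for an np.ndarray column:
# a JSON object with the flat "elements" list and the "shape" to restore.
cell = '{"shape": [2, 3], "elements": [1.0, 2.0, 3.0, 4.0, 5.0, 6.0]}'

payload = json.loads(cell)

# Rebuild the array: fill from the flat element list, then apply the shape.
array = np.array(payload["elements"]).reshape(payload["shape"])

print(array.shape)  # (2, 3)
```

Declaring the field as np.ndarray in the schema makes the connector perform this reconstruction for you; the snippet only shows what the stored representation contains.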
Type Conversion (Output Connector)
The table below describes how Pathway types are mapped to MS SQL Server column types in the output connector. All listed types can be round-tripped back via the input connector when the original schema type is specified.
Conversion of Pathway types into MS SQL Server types
| Pathway type | MS SQL Server type and notes |
|---|---|
| bool | BIT |
| int | BIGINT |
| float | FLOAT |
| str | NVARCHAR(MAX). When the column is part of the primary key, NVARCHAR(450) is used instead (SQL Server’s maximum index key width). |
| pw.Pointer | NVARCHAR(MAX). When used as primary key, NVARCHAR(450). |
| bytes | VARBINARY(MAX) |
| pw.Json | NVARCHAR(MAX) — the JSON value is serialized to a string. |
| pw.DateTimeNaive | DATETIME2(6) — microsecond precision. |
| pw.DateTimeUtc | DATETIMEOFFSET(6) — stored as UTC with a zero offset, microsecond precision. |
| pw.Duration | BIGINT — serialized as microseconds. Declare the column as pw.Duration on read to restore the original value. |
| pw.PyObjectWrapper | VARBINARY(MAX) — serialized with bincode. Declare the column as pw.PyObjectWrapper on read to restore the original value. |
| Optional | Nullable column of the corresponding non-optional type. None values are stored as NULL. |
| tuple | NVARCHAR(MAX) — serialized as a JSON array. Elements are converted recursively. Declare the column as tuple on read to restore the original value. |
| list | NVARCHAR(MAX) — serialized as a JSON array. Elements are converted recursively. Declare the column as a typed list on read to restore the original value. |
| np.ndarray | NVARCHAR(MAX) — serialized as a JSON object with keys shape (array dimension sizes) and elements (flat list of values). Declare the column as np.ndarray on read to restore the original value. |
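Since pw.Duration is stored as a BIGINT microsecond count, the round trip is plain integer arithmetic. This standalone sketch mimics it with datetime.timedelta rather than Pathway's own types, so the names here are stand-ins for what the connector does internally:

```python
from datetime import timedelta

duration = timedelta(hours=1, seconds=30)

# Write side: serialize the duration as a BIGINT microsecond count.
as_bigint = duration // timedelta(microseconds=1)

# Read side: declaring the column as pw.Duration restores the value
# from the stored microseconds.
restored = timedelta(microseconds=as_bigint)

print(as_bigint)             # 3630000000
print(restored == duration)  # True
```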
read(connection_string, table_name, schema, *, mode='streaming', schema_name='dbo', autocommit_duration_ms=1500, name=None, max_backlog_size=None, debug_data=None)
Reads a table from a Microsoft SQL Server database.
In "static" mode, the connector issues a plain SELECT against the
table, emits all rows, and terminates. No special database configuration is
required beyond normal read access to the table. Works on any SQL Server
edition, including Express.
In "streaming" mode (the default), the connector uses MSSQL’s Change
Data Capture (CDC) feature to track changes via the transaction log. This
requires:
- SQL Server Developer or Enterprise edition
- CDC enabled on the database:
EXEC sys.sp_cdc_enable_db;
- CDC enabled on the table:
EXEC sys.sp_cdc_enable_table @source_schema=N'dbo', @source_name=N'<table>', @role_name=NULL;
Primary key: the schema must declare at least one primary key column via
pw.column_definition(primary_key=True). The connector uses these columns
to track which rows have been inserted, updated, or deleted — without them it
cannot maintain a consistent snapshot of the table.
Persistence: when persistence is enabled, the connector saves the CDC
Log Sequence Number (LSN) of the last processed change as its offset. On
restart it skips the full table snapshot and resumes from that LSN, so
downstream sees only the rows that changed since the last checkpoint — no
re-delivery of the original table contents. Passing an explicit name
is optional — Pathway will auto-generate one if omitted — but setting it
makes the saved state easier to identify in the persistence directory and
protects against accidental mismatches when the pipeline graph changes
between runs.
Persistence applies to both modes. In "streaming" mode the connector
keeps running after the catch-up and continues delivering live CDC
events. In "static" mode it emits the delta accumulated since the
previous run and terminates.
Persistence requires CDC on the target table — the LSN comes from CDC.
If you pass a persistence_config to pw.run but CDC has not been
enabled on the table, the pipeline aborts at startup with an error
pointing you at sp_cdc_enable_table; it does not silently fall back
to re-reading the whole table on every restart.
If the saved LSN predates the capture instance’s current retention window (SQL Server’s CDC cleanup job runs independently of any consumer and drops changes older than the configured retention, 4320 minutes by default), the connector raises an error on startup asking you to clear the persistence directory and re-snapshot. Pick a retention long enough to cover your longest expected downtime.
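A quick sanity check for sizing retention, sketched in plain Python: the default 4320 minutes is exactly three days, so any downtime longer than that would invalidate a saved LSN under default settings. The retention value itself is changed on the SQL Server side (via the CDC cleanup job configuration), not from Pathway:

```python
from datetime import timedelta

# SQL Server's default CDC retention is 4320 minutes.
default_retention = timedelta(minutes=4320)
print(default_retention.days)  # 3

# If the longest expected downtime were, say, 7 days, the retention
# configured on the server must cover at least that many minutes.
required = timedelta(days=7)
retention_minutes = int(required / timedelta(minutes=1))
print(retention_minutes)  # 10080
```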
The connection uses the TDS protocol via a pure Rust implementation (no ODBC drivers required), so it works on any Linux environment without additional system dependencies. Compatible with SQL Server 2017, 2019, 2022, and Azure SQL Edge.
- Parameters
  - connection_string (str) – ADO.NET-style connection string for the MSSQL database. Example: "Server=tcp:localhost,1433;Database=mydb;User Id=sa;Password=pass;TrustServerCertificate=true"
  - table_name (str) – Name of the table to read from.
  - schema (type[Schema]) – Schema of the resulting table.
  - mode (Literal['static', 'streaming']) – "streaming" (the default) uses CDC for real-time change tracking via the transaction log; requires CDC to be enabled on the database and table. "static" reads the full table once as a snapshot, then terminates; no CDC setup is needed.
  - schema_name (str | None) – Name of the database schema containing the table. Defaults to "dbo", which is the default schema in MSSQL.
  - autocommit_duration_ms (int | None) – The maximum time between two commits. Every autocommit_duration_ms milliseconds, the updates received by the connector are committed and pushed into Pathway’s computation graph.
  - name (str | None) – A unique name for the connector. If provided, this name will be used in logs and monitoring dashboards.
  - max_backlog_size (int | None) – Limit on the number of entries read from the input source and kept in processing at any moment.
  - debug_data (Any) – Static data to use instead of the external source (for testing).
- Returns
Table – The table read.
Example:
To test this connector locally, you can run an MSSQL instance using Docker:
docker run -e 'ACCEPT_EULA=Y' -e 'MSSQL_SA_PASSWORD=YourStrong!Passw0rd' \
-p 1433:1433 mcr.microsoft.com/mssql/server:2022-latest
For static snapshot mode (no CDC setup required):
import pathway as pw
class MySchema(pw.Schema):
id: int = pw.column_definition(primary_key=True)
name: str
value: float
table = pw.io.mssql.read(
connection_string="Server=tcp:localhost,1433;Database=testdb;"
"User Id=sa;Password=YourStrong!Passw0rd;TrustServerCertificate=true",
table_name="my_table",
schema=MySchema,
mode="static",
)
For streaming mode with CDC, first enable CDC on the database and the table
(run these once in SQL Server Management Studio or via sqlcmd):
-- Enable CDC on the database
EXEC sys.sp_cdc_enable_db;
-- Enable CDC on the table
EXEC sys.sp_cdc_enable_table
@source_schema = N'dbo',
@source_name = N'my_table',
@role_name = NULL;
Then read from it using streaming mode:
table = pw.io.mssql.read(
connection_string="Server=tcp:localhost,1433;Database=testdb;"
"User Id=sa;Password=YourStrong!Passw0rd;TrustServerCertificate=true",
table_name="my_table",
schema=MySchema,
)
Persistence. Pass a persistence_config to pw.run. CDC must
be enabled on the table — without it the pipeline aborts at startup with
a clear error. Persistence works the same way in both modes; the only
difference is what the pipeline does once the delta is consumed:
persistence_config = pw.persistence.Config(
backend=pw.persistence.Backend.filesystem("./PStorage")
)
Streaming mode (the default). The first run delivers the initial snapshot and then keeps running to push live CDC events; every subsequent run skips the snapshot and starts with the delta since the previous checkpoint before continuing to stream:
table = pw.io.mssql.read(
connection_string="Server=tcp:localhost,1433;Database=testdb;"
"User Id=sa;Password=YourStrong!Passw0rd;TrustServerCertificate=true",
table_name="my_table",
schema=MySchema,
)
pw.io.jsonlines.write(table, "output.jsonl")
pw.run(persistence_config=persistence_config)
Static mode. The first run dumps the full table and terminates; every subsequent run emits only the CDC delta accumulated since the previous run and terminates — handy for scheduled batch pipelines that want change-set semantics without a long-lived process:
table = pw.io.mssql.read(
connection_string="Server=tcp:localhost,1433;Database=testdb;"
"User Id=sa;Password=YourStrong!Passw0rd;TrustServerCertificate=true",
table_name="my_table",
schema=MySchema,
mode="static",
)
pw.io.jsonlines.write(table, "output.jsonl")
pw.run(persistence_config=persistence_config)
write(table, connection_string, table_name, *, schema_name='dbo', max_batch_size=None, init_mode='default', output_table_type='stream_of_changes', primary_key=None, name=None, sort_by=None)
Writes a table to a Microsoft SQL Server table.
The connector works in two modes: snapshot mode and stream of changes.
In snapshot mode, the table maintains the current snapshot of the data
using MSSQL’s MERGE statement for atomic upserts.
In stream of changes mode, the table contains the log of all data updates
with time and diff columns.
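The relationship between the two modes can be illustrated in plain Python: folding a stream-of-changes log into a keyed dictionary yields the final state that snapshot mode maintains directly. The rows below are made up, and the diff convention assumed here (+1 marks an insertion, -1 a deletion, with an update arriving as a deletion plus an insertion) follows Pathway's usual change-stream semantics:

```python
# Hypothetical stream-of-changes rows: (time, diff, key, value).
log = [
    (1, +1, 1, "Hello"),
    (1, +1, 2, "World"),
    (2, -1, 2, "World"),    # update of key 2 ...
    (2, +1, 2, "World!"),   # ... arrives as delete + insert
]

# Replay the log in order; tuples sort by time first, then diff,
# so at equal times the deletion (-1) is applied before the insertion (+1).
snapshot = {}
for time, diff, key, value in sorted(log):
    if diff == +1:
        snapshot[key] = value
    else:  # diff == -1 removes the row for that key
        snapshot.pop(key, None)

print(snapshot)  # {1: 'Hello', 2: 'World!'}
```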
Compatible with all MSSQL versions on Linux (SQL Server 2017, 2019, 2022, and Azure SQL Edge). Uses pure Rust TDS implementation — no ODBC drivers required.
- Parameters
  - table (Table) – Table to be written.
  - connection_string (str) – ADO.NET-style connection string for the MSSQL database. Example: "Server=tcp:localhost,1433;Database=mydb;User Id=sa;Password=pass;TrustServerCertificate=true"
  - table_name (str) – Name of the target table.
  - schema_name (str | None) – Name of the database schema containing the table. Defaults to "dbo", which is the default schema in MSSQL.
  - max_batch_size (int | None) – Maximum number of entries allowed to be committed within a single transaction.
  - init_mode (Literal['default', 'create_if_not_exists', 'replace']) – "default": the default initialization mode; "create_if_not_exists": creates the table if it does not exist; "replace": drops and recreates the table.
  - output_table_type (Literal['stream_of_changes', 'snapshot']) – Defines how the output table manages its data. If set to "stream_of_changes" (the default), the system outputs a stream of modifications with time and diff columns. If set to "snapshot", the table maintains the current state using atomic MERGE upserts.
  - primary_key (list[ColumnReference] | None) – When using snapshot mode, one or more columns that form the primary key in the target MSSQL table.
  - name (str | None) – A unique name for the connector. If provided, this name will be used in logs and monitoring dashboards.
  - sort_by (Iterable[ColumnReference] | None) – If specified, the output will be sorted in ascending order based on the values of the given columns within each minibatch. When multiple columns are provided, the corresponding value tuples will be compared lexicographically.
- Returns
None
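The lexicographic comparison used by sort_by is ordinary tuple ordering, as this standalone sketch with made-up rows shows:

```python
# Hypothetical minibatch rows with two sort columns followed by a payload:
# (group, rank, payload).
rows = [
    ("b", 2, "third"),
    ("a", 2, "second"),
    ("a", 1, "first"),
]

# With multiple sort_by columns, value tuples are compared
# lexicographically, which is exactly Python's tuple ordering.
ordered = sorted(rows, key=lambda row: (row[0], row[1]))

print([payload for _, _, payload in ordered])  # ['first', 'second', 'third']
```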
Example:
To test this connector locally, run an MSSQL instance using Docker:
docker run -e 'ACCEPT_EULA=Y' -e 'MSSQL_SA_PASSWORD=YourStrong!Passw0rd' \
-p 1433:1433 mcr.microsoft.com/mssql/server:2022-latest
Then write to it:
import pathway as pw
table = pw.debug.table_from_markdown('''
key | value
1 | Hello
2 | World
''')
Stream of changes mode:
pw.io.mssql.write(
table,
"Server=tcp:localhost,1433;Database=testdb;"
"User Id=sa;Password=YourStrong!Passw0rd;TrustServerCertificate=true",
table_name="test",
init_mode="create_if_not_exists",
)
Snapshot mode:
pw.io.mssql.write(
table,
"Server=tcp:localhost,1433;Database=testdb;"
"User Id=sa;Password=YourStrong!Passw0rd;TrustServerCertificate=true",
table_name="test_snapshot",
init_mode="create_if_not_exists",
output_table_type="snapshot",
primary_key=[table.key],
)
You can run this pipeline with pw.run().