pw.io.mssql
This module is available when using one of the following licenses only: Pathway Scale, Pathway Enterprise.
Pathway provides both Input and Output connectors for Microsoft SQL Server (MSSQL).
The Input connector supports two operating modes:
- Streaming mode (default) — uses SQL Server’s Change Data Capture (CDC) feature to track
row-level changes in real time via the transaction log. Requires SQL Server Developer or
Enterprise edition with CDC enabled at the database and table level:
-- Enable CDC on the database
EXEC sys.sp_cdc_enable_db;
-- Enable CDC on the table
EXEC sys.sp_cdc_enable_table
    @source_schema = N'dbo',
    @source_name = N'<table>',
    @role_name = NULL;
- Static mode — reads the full table once as a snapshot, then terminates.
Primary key requirement: the schema passed to the input connector must declare at least one
primary key column using pw.column_definition(primary_key=True). The connector uses these
columns to track which rows have been inserted, updated, or deleted — without them it cannot
maintain a consistent snapshot of the table.
Persistence: the input connector does not support Pathway persistence. On restart, the connector takes a fresh snapshot of the current table state and tracks only new CDC changes from that point forward. Any changes that occurred while the connector was stopped are absorbed into the new snapshot rather than delivered as discrete events. Since Pathway’s internal state is also not saved, the entire pipeline reprocesses from scratch on each restart.
The Output connector supports two operating modes:
- Stream of changes mode — appends every Pathway update as a row with time and diff columns.
- Snapshot mode — maintains the current state of the Pathway table using atomic MERGE upserts.
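As a rough, Pathway-free illustration of how the two modes relate (the key and values below are made up): in stream of changes mode, an update to a keyed row is recorded as a retraction (diff = -1) of the old value plus an insertion (diff = +1) of the new one; replaying such a log reproduces the state that snapshot mode maintains directly.

```python
# Hypothetical change log, shaped like the (time, key, value, diff) rows
# that stream of changes mode writes; diff = +1 inserts, -1 retracts.
log = [
    (1, "a", "Hello", +1),
    (2, "a", "Hello", -1),  # an update arrives as a retraction...
    (2, "a", "Hola", +1),   # ...plus an insertion of the new value
]

# Replaying the log yields what snapshot mode keeps up to date via MERGE:
snapshot = {}
for _time, key, value, diff in log:
    if diff == +1:
        snapshot[key] = value
    else:
        snapshot.pop(key, None)

print(snapshot)  # {'a': 'Hola'}
```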
The connector uses a pure Rust TDS implementation (no ODBC drivers required) and is compatible with SQL Server 2017, 2019, 2022, and Azure SQL Edge.
Type Conversion (Input Connector)
The table below describes how MS SQL Server column types are parsed into Pathway values, and what type
to declare in your pw.Schema to receive them correctly.
MS SQL Server types parsed by the input connector
| MS SQL Server type | Pathway schema type and notes |
|---|---|
| BIT | bool |
| TINYINT | int |
| SMALLINT / INT2 | int |
| INT / INT4 | int |
| BIGINT / INT8 | int. Alternatively, declare as pw.Duration to interpret the value as microseconds — use this when the column was written by Pathway’s output connector, which serializes pw.Duration as a BIGINT microsecond count. |
| REAL / FLOAT4 | float |
| FLOAT / DOUBLE PRECISION / FLOAT8 | float |
| NUMERIC / DECIMAL | float — converted via string representation. Precision loss is possible for values with more than ~15 significant digits. |
| NVARCHAR / VARCHAR / CHAR / TEXT / NTEXT | str |
| NVARCHAR / VARCHAR | pw.Json — the string is parsed as a JSON literal. The string form is what Pathway’s output connector produces for pw.Json columns. |
| NVARCHAR / VARCHAR | pw.Pointer — the string is decoded as a Pathway pointer. The string must have been produced by Pathway’s output connector for a pw.Pointer column. |
| NVARCHAR(MAX) | list / tuple — the column must contain a JSON array as produced by Pathway’s output connector. Elements are parsed recursively according to the declared inner types. |
| NVARCHAR(MAX) | np.ndarray — the column must contain a JSON object with keys shape (array dimension sizes) and elements (flat list of values), as produced by Pathway’s output connector. Only int and float element types are supported. |
| VARBINARY / BINARY / IMAGE | bytes |
| VARBINARY(MAX) | pw.PyObjectWrapper — the binary payload is deserialized with bincode. The field must have been written by Pathway’s output connector for a pw.PyObjectWrapper column. |
| UNIQUEIDENTIFIER | str — formatted as a standard hyphenated lowercase GUID string, e.g. "6f9619ff-8b86-d011-b42d-00c04fc964ff". |
| XML | str — the raw XML text. |
| DATETIME / DATETIME2 / SMALLDATETIME | pw.DateTimeNaive |
| DATE | pw.DateTimeNaive — midnight (00:00:00) on the given date. |
| DATETIMEOFFSET | pw.DateTimeUtc — the timezone offset is applied so that the result is in UTC. |
| TIME | pw.Duration — microseconds elapsed since midnight. |
| Any nullable column of type T | Declare the field as Optional[T] in the schema. The value is parsed as None if the MSSQL value is NULL; otherwise it is parsed as T. If the schema field is not declared as optional and a NULL is received, an error is raised. |
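The shape/elements layout used for np.ndarray columns can be unpacked by hand. The following sketch is plain Python (no Pathway or NumPy), the payload value is made up for illustration, and it only assumes the JSON object is well formed as described above:

```python
import json

# Made-up NVARCHAR(MAX) cell content in the shape/elements format:
payload = '{"shape": [2, 3], "elements": [1, 2, 3, 4, 5, 6]}'

obj = json.loads(payload)

def reshape(flat, dims):
    """Rebuild a nested list from a flat element list and dimension sizes."""
    if len(dims) == 1:
        return flat[:dims[0]]
    step = len(flat) // dims[0]
    return [reshape(flat[i * step:(i + 1) * step], dims[1:]) for i in range(dims[0])]

print(reshape(obj["elements"], obj["shape"]))  # [[1, 2, 3], [4, 5, 6]]
```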
Type Conversion (Output Connector)
The table below describes how Pathway types are mapped to MS SQL Server column types in the output connector. All listed types can be round-tripped back via the input connector when the original schema type is specified.
Pathway types conversion into MS SQL Server
| Pathway type | MS SQL Server type and notes |
|---|---|
| bool | BIT |
| int | BIGINT |
| float | FLOAT |
| str | NVARCHAR(MAX). When the column is part of the primary key, NVARCHAR(450) is used instead (SQL Server’s maximum index key width). |
| pw.Pointer | NVARCHAR(MAX). When used as primary key, NVARCHAR(450). |
| bytes | VARBINARY(MAX) |
| pw.Json | NVARCHAR(MAX) — the JSON value is serialized to a string. |
| pw.DateTimeNaive | DATETIME2(6) — microsecond precision. |
| pw.DateTimeUtc | DATETIMEOFFSET(6) — stored as UTC with a zero offset, microsecond precision. |
| pw.Duration | BIGINT — serialized as microseconds. Declare the column as pw.Duration on read to restore the original value. |
| pw.PyObjectWrapper | VARBINARY(MAX) — serialized with bincode. Declare the column as pw.PyObjectWrapper on read to restore the original value. |
| Optional | Nullable column of the corresponding non-optional type. None values are stored as NULL. |
| tuple | NVARCHAR(MAX) — serialized as a JSON array. Elements are converted recursively. Declare the column as tuple on read to restore the original value. |
| list | NVARCHAR(MAX) — serialized as a JSON array. Elements are converted recursively. Declare the column as a typed list on read to restore the original value. |
| np.ndarray | NVARCHAR(MAX) — serialized as a JSON object with keys shape (array dimension sizes) and elements (flat list of values). Declare the column as np.ndarray on read to restore the original value. |
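The pw.Duration round trip in the table above is a plain microsecond count. The sketch below mimics that arithmetic with Python's datetime.timedelta as a stand-in for pw.Duration; it illustrates the serialization format only and is not Pathway code:

```python
from datetime import timedelta

# A duration value; timedelta stands in for pw.Duration here.
d = timedelta(hours=1, minutes=30, seconds=15, microseconds=250)

# Writing: the value is stored in a BIGINT column as a microsecond count.
as_bigint = d // timedelta(microseconds=1)
print(as_bigint)  # 5415000250

# Reading: declaring the column as pw.Duration reverses the conversion.
restored = timedelta(microseconds=as_bigint)
assert restored == d
```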
read(connection_string, table_name, schema, *, mode='streaming', schema_name='dbo', autocommit_duration_ms=1500, name=None, max_backlog_size=None, debug_data=None)
Reads a table from a Microsoft SQL Server database.
In "static" mode, the connector issues a plain SELECT against the
table, emits all rows, and terminates. No special database configuration is
required beyond normal read access to the table. Works on any SQL Server
edition, including Express.
In "streaming" mode (the default), the connector uses MSSQL’s Change
Data Capture (CDC) feature to track changes via the transaction log. This
requires:
- SQL Server Developer or Enterprise edition
- CDC enabled on the database:
EXEC sys.sp_cdc_enable_db;
- CDC enabled on the table:
EXEC sys.sp_cdc_enable_table @source_schema=N'dbo', @source_name=N'<table>', @role_name=NULL;
Primary key: the schema must declare at least one primary key column via
pw.column_definition(primary_key=True). The connector uses these columns
to track which rows have been inserted, updated, or deleted — without them it
cannot maintain a consistent snapshot of the table.
Persistence: this connector does not support Pathway persistence. On restart, the connector takes a fresh snapshot of the current table state and tracks only new CDC changes from that point forward. Any changes that occurred while the connector was stopped are absorbed into the new snapshot rather than delivered as discrete events. Since Pathway’s internal state is also not saved, the entire pipeline reprocesses from scratch on each restart.
The connection uses the TDS protocol via a pure Rust implementation (no ODBC drivers required), so it works on any Linux environment without additional system dependencies. Compatible with SQL Server 2017, 2019, 2022, and Azure SQL Edge.
- Parameters
  - connection_string (str) – ADO.NET-style connection string for the MSSQL database. Example: "Server=tcp:localhost,1433;Database=mydb;User Id=sa;Password=pass;TrustServerCertificate=true"
  - table_name (str) – Name of the table to read from.
  - schema (type[Schema]) – Schema of the resulting table.
  - mode (Literal['static', 'streaming']) – "streaming" (the default) uses CDC for real-time change tracking via the transaction log; requires CDC to be enabled on the database and table. "static" reads the full table once as a snapshot, then terminates; no CDC setup is needed.
  - schema_name (str | None) – Name of the database schema containing the table. Defaults to "dbo", which is the default schema in MSSQL.
  - autocommit_duration_ms (int | None) – The maximum time between two commits. Every autocommit_duration_ms milliseconds, the updates received by the connector are committed and pushed into Pathway’s computation graph.
  - name (str | None) – A unique name for the connector. If provided, this name will be used in logs and monitoring dashboards.
  - max_backlog_size (int | None) – Limit on the number of entries read from the input source and kept in processing at any moment.
  - debug_data (Any) – Static data to use instead of the external source (for testing).
- Returns
Table – The table read.
Example:
To test this connector locally, you can run an MSSQL instance using Docker:
docker run -e 'ACCEPT_EULA=Y' -e 'MSSQL_SA_PASSWORD=YourStrong!Passw0rd' \
-p 1433:1433 mcr.microsoft.com/mssql/server:2022-latest
For static snapshot mode (no CDC setup required):
import pathway as pw

class MySchema(pw.Schema):
    id: int = pw.column_definition(primary_key=True)
    name: str
    value: float

table = pw.io.mssql.read(
    connection_string="Server=tcp:localhost,1433;Database=testdb;"
    "User Id=sa;Password=YourStrong!Passw0rd;TrustServerCertificate=true",
    table_name="my_table",
    schema=MySchema,
    mode="static",
)
For streaming mode with CDC, first enable CDC on the database and the table
(run these once in SQL Server Management Studio or via sqlcmd):
-- Enable CDC on the database
EXEC sys.sp_cdc_enable_db;
-- Enable CDC on the table
EXEC sys.sp_cdc_enable_table
    @source_schema = N'dbo',
    @source_name = N'my_table',
    @role_name = NULL;
Then read from it using streaming mode:
table = pw.io.mssql.read(
    connection_string="Server=tcp:localhost,1433;Database=testdb;"
    "User Id=sa;Password=YourStrong!Passw0rd;TrustServerCertificate=true",
    table_name="my_table",
    schema=MySchema,
)
write(table, connection_string, table_name, *, schema_name='dbo', max_batch_size=None, init_mode='default', output_table_type='stream_of_changes', primary_key=None, name=None, sort_by=None)
Writes a table to a Microsoft SQL Server table.
The connector works in two modes: snapshot mode and stream of changes.
In snapshot mode, the table maintains the current snapshot of the data
using MSSQL’s MERGE statement for atomic upserts.
In stream of changes mode, the table contains the log of all data updates
with time and diff columns.
Compatible with all MSSQL versions on Linux (SQL Server 2017, 2019, 2022, and Azure SQL Edge). Uses pure Rust TDS implementation — no ODBC drivers required.
- Parameters
  - table (Table) – Table to be written.
  - connection_string (str) – ADO.NET-style connection string for the MSSQL database. Example: "Server=tcp:localhost,1433;Database=mydb;User Id=sa;Password=pass;TrustServerCertificate=true"
  - table_name (str) – Name of the target table.
  - schema_name (str | None) – Name of the database schema containing the table. Defaults to "dbo", which is the default schema in MSSQL.
  - max_batch_size (int | None) – Maximum number of entries allowed to be committed within a single transaction.
  - init_mode (Literal['default', 'create_if_not_exists', 'replace']) – "default": the default initialization mode; "create_if_not_exists": creates the table if it does not exist; "replace": drops and recreates the table.
  - output_table_type (Literal['stream_of_changes', 'snapshot']) – Defines how the output table manages its data. If set to "stream_of_changes" (the default), the system outputs a stream of modifications with time and diff columns. If set to "snapshot", the table maintains the current state using atomic MERGE upserts.
  - primary_key (list[ColumnReference] | None) – When using snapshot mode, one or more columns that form the primary key in the target MSSQL table.
  - name (str | None) – A unique name for the connector. If provided, this name will be used in logs and monitoring dashboards.
  - sort_by (Optional[Iterable[ColumnReference]]) – If specified, the output will be sorted in ascending order based on the values of the given columns within each minibatch. When multiple columns are provided, the corresponding value tuples will be compared lexicographically.
- Returns
None
Example:
To test this connector locally, run an MSSQL instance using Docker:
docker run -e 'ACCEPT_EULA=Y' -e 'MSSQL_SA_PASSWORD=YourStrong!Passw0rd' \
-p 1433:1433 mcr.microsoft.com/mssql/server:2022-latest
Then write to it:
import pathway as pw
table = pw.debug.table_from_markdown('''
key | value
1 | Hello
2 | World
''')
Stream of changes mode:
pw.io.mssql.write(
    table,
    "Server=tcp:localhost,1433;Database=testdb;"
    "User Id=sa;Password=YourStrong!Passw0rd;TrustServerCertificate=true",
    table_name="test",
    init_mode="create_if_not_exists",
)
Snapshot mode:
pw.io.mssql.write(
    table,
    "Server=tcp:localhost,1433;Database=testdb;"
    "User Id=sa;Password=YourStrong!Passw0rd;TrustServerCertificate=true",
    table_name="test_snapshot",
    init_mode="create_if_not_exists",
    output_table_type="snapshot",
    primary_key=[table.key],
)
You can run this pipeline with pw.run().