Data Types and Schemas

This guide shows how to use data types and schemas in Pathway.

Understanding Data Types and Schemas

In Pathway, data is represented in the form of tables. The structure of each table is defined by a schema, which serves as a blueprint for the data. The schema ensures that the column types are correctly preserved, regardless of variations in the incoming data.

Typically, Pathway automatically infers the schema, but there are cases where enforcing a specific schema for input proves useful.

Here is a simple example of how to define a schema in Pathway:

import pathway as pw

class InputSchema(pw.Schema):
    colA: int
    colB: float
    colC: str

Schema Usage in Pathway

Schemas play an important role in Pathway by allowing you to declare constraints on tables via input connectors. You can declare the following attributes within a schema:

  • Columns: Select the desired columns for your table; any undeclared columns will be ignored by the connectors.
  • Column Types: Define the data type for each column; Pathway will automatically convert the input data accordingly.
  • Primary Keys: Set primary keys to determine the indexes. If no primary keys are defined, indexes will be generated automatically.
  • Default values: Specify default values for columns, making it easier to handle missing data.

How to Define and Use a Schema

To create a schema, define a class that inherits from pathway.Schema (or pw.Schema, depending on your import). The examples below use pw.Schema.
Each column is declared as an attribute of the class. The schema is then passed as a parameter to the input connector:

class InputSchema(pw.Schema):
    value: int


table = pw.io.csv.read("./input/", schema=InputSchema)

The above example defines a table with only one column named value of type int.

Defining Multiple Columns

You can declare multiple columns in a schema by simply adding them as attributes to your class:

class MyFirstTwoColumnsSchema(pw.Schema):
    colA: int
    colB: int

Typing the Columns

To assign data types to columns, simply specify the desired types for the associated attributes in your class:

class TypedSchema(pw.Schema):
    colA: int
    colB: float

With pw.Schema, you have to type the columns. If you don't know the type of the input column, you can type the column as typing.Any:

import typing

class TypedSchema(pw.Schema):
    colA: typing.Any

⚠️ Although it is tempting to write any, any is a built-in Python function, not a type. Use typing.Any instead; using any will raise a ValueError.
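The difference can be checked in plain Python, without Pathway. The sketch below uses a hypothetical class, UntypedColumn, which is not a Pathway schema; it only illustrates that any is a function while typing.Any is a valid annotation.

```python
import typing

# The built-in `any` is a function: it tests whether an iterable
# contains at least one truthy element.
print(any([False, True]))
print(isinstance(any, type))  # `any` is not a class, so it cannot serve as a type


# Hypothetical plain class (not a pw.Schema) annotated with typing.Any.
class UntypedColumn:
    colA: typing.Any


# The annotation is stored as typing.Any, meaning "no constraint on the type".
hints = typing.get_type_hints(UntypedColumn)
print(hints["colA"] is typing.Any)
```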

Defining Primary Keys

To designate primary keys, use the column_definition function and the primary_key parameter:

class PrimarySchema(pw.Schema):
    colA: int = pw.column_definition(primary_key=True)
    colB: float

In this example, the index will be based on the colA column.

You can select multiple columns to be part of the primary key:

class MultiplePrimarySchema(pw.Schema):
    colA: int = pw.column_definition(primary_key=True)
    colB: float
    colC: str = pw.column_definition(primary_key=True)

Defining Default Values

Similar to primary keys, you can set default values using the column_definition function and the default_value parameter:

class DefaultValueSchema(pw.Schema):
    colA: int = pw.column_definition(default_value=0)
    colB: float
    colC: str = pw.column_definition(default_value="Empty")

Inline Schema Definitions

When defining a class is not practical, for example when schema definitions are generated automatically, Pathway offers an alternative approach: inline schema definitions.

Schema from Dictionary

You can define a schema using a dictionary through the schema_builder function.
schema_builder takes a columns argument: a dictionary that maps column names to column definitions created with the column_definition function. The dtype parameter of column_definition specifies the column type; if it is not provided, the type defaults to typing.Any. You can also name the schema with the optional name parameter.

schema = pw.schema_builder(columns={
    'key': pw.column_definition(dtype=int, primary_key=True),
    'data': pw.column_definition(dtype=int, default_value=0)
    }, name="my_schema")

The resulting schema is equivalent to the following class-based schema:

class InputSchema(pw.Schema):
    key: int = pw.column_definition(primary_key=True)
    data: int = pw.column_definition(default_value=0)

With schema_builder, defining the type is optional. The default type is typing.Any:

schema = pw.schema_builder(columns={
    'key': pw.column_definition(dtype=int, primary_key=True),
    'data': pw.column_definition()
    }, name="my_schema")
table = pw.io.csv.read("./input/", schema=schema)
print(table.typehints())
{'key': <class 'int'>, 'data': <class 'typing.Any'>}

Schema from Types

For simple cases where you only need to define types, with neither default values nor primary keys, you can use schema_from_types.

schema_from_types simply takes the types as field=type kwargs:

schema = pw.schema_from_types(key=int, data=int)

The resulting schema is equivalent to the following class-based schema:

class InputSchema(pw.Schema):
    key: int
    data: int

Accessing Table Types

During debugging, you may need to inspect the schema of a table. You can do this by printing its typehints:

print(table.typehints())

This will display the data types of each column in the table. For example:

{'age': <class 'int'>, 'owner': <class 'str'>, 'pet': <class 'str'>}

You can also print the type of a single column through the table's schema:

print(table.schema['age'])
<class 'int'>

⚠️ Please note that these functions are executed during the creation of the pipeline before any computation is launched by pw.run().

Type Casting an Existing Table

You may also want to cast the data of an existing table. This can be done using cast:

table = table.select(value = pw.cast(int, pw.this.value))

This will cast the values of the column value to int.

Typing a Column Created with apply

If Pathway fails to infer the correct type of a column created with apply, you can enforce the resulting type with apply_with_type:

table = table.select(
    value = pw.apply_with_type(lambda x: int(x)+1, int, pw.this.value)
)

This will cast the values to integers and increment each value by one, resulting in the value column being of type int.

Treat this as a workaround: Pathway should normally infer the data type correctly.

Conclusions

Mastering data types and schemas is essential for managing tables effectively. With schemas, you define the structure of your tables and make your data pipeline more reliable. If you encounter typing issues, please contact us on Discord so we can help you.