Pathway MCP Server

Introduction

The Model Context Protocol (MCP) is designed to standardize the way applications interact with large language models (LLMs). It serves as a bridge, much like a universal connector, enabling seamless integration between AI models and various data sources and tools. This protocol facilitates the creation of sophisticated AI workflows and agents, enhancing the capabilities of LLMs by connecting them with real-world data and functionalities.

Pathway provides its own MCP Server, which lets you deliver real-time statistics and document indexing to your agentic applications. In this article, you will learn how to set up the Pathway MCP Server.

MCP Server

An MCP server functions as a crucial intermediary that connects AI applications to a wide array of data sources and tools. It allows AI models to access and process real-time data, perform actions, and utilize contextual information from various applications seamlessly. The key benefits of using an MCP server include:

  • Pre-built Integrations: Access to a wide range of integrations for popular tools and platforms, simplifying the setup process.
  • Custom Integrations: The ability to build and integrate custom tools and data sources, tailored to specific needs and workflows.
  • Open Protocol: A freely implementable and usable protocol, ensuring flexibility and broad compatibility.
  • Portability: The capability to switch between different applications while retaining context, enhancing adaptability.

MCP Client

The MCP Client connects AI applications to MCP servers, allowing them to access various data sources and tools such as databases, document stores, and real-time statistics. The MCP Client is customizable: developers can adjust its behavior to their needs, add custom integrations for specific tools and data sources, and connect to several MCP servers at once.

MCP Clients are used in AI applications like chatbots and data analysis tools to access data from various sources. They help these applications provide accurate and timely responses by connecting to databases and document stores, improving decision-making and user interactions.

Pathway MCP Server

Pathway's MCP Server provides the real-time data processing capabilities of the Pathway engine to your AI applications. By integrating with Pathway's extensive data connectors and processing framework, the MCP Server enables AI models to access real-time statistics and document stores efficiently. This integration is essential for applications that require up-to-date information and context-aware responses.

Key Features

  • Real-Time Statistics: Pathway's MCP Server can provide real-time statistics to LLMs, enabling them to make informed, data-driven decisions based on the latest information.
  • Document Store for RAG: The server offers a real-time index for retrieval-augmented generation, enhancing the ability of LLMs to retrieve and utilize relevant documents and data effectively.

How it works

Let's start with a working example, exposing a get_constant_value tool returning the value 1:

import pathway as pw
from pathway.xpacks.llm.mcp_server import McpServable, McpServer, PathwayMcp


# no argument required
class EmptyRequestSchema(pw.Schema):
    pass


class ConstantValueTool(McpServable):

    def get_constant_value(self, input_from_client: pw.Table) -> pw.Table:
        """
        Return a constant value.
        """

        return input_from_client.select(result=1)

    def register_mcp(self, server: McpServer):
        server.tool(
            "get_constant_value",
            request_handler=self.get_constant_value,
            schema=EmptyRequestSchema,
        )

function_to_serve = ConstantValueTool()

pathway_mcp_server = PathwayMcp(
    name="Streamable MCP Server",
    transport="streamable-http",
    host="localhost",
    port=8123,
    serve=[function_to_serve],
)

pw.run()

McpServable

To run an MCP server, you first need an operation to expose: the tool that the MCP client will call. In Pathway, this is done with an instance of a class inheriting from McpServable. It requires three things:

  • A schema to impose the required input for this tool. Let's call it EmptyRequestSchema for simplicity (the name does not matter).
  • The function called whenever the tool is called, the actual operation the MCP server will expose.
  • The register_mcp function that will expose the function in the MCP server.

To be successfully exposed, the function must satisfy the following constraints. It must take two parameters: self and a Pathway table, called input_from_client here (you can name it as you wish). This table follows the EmptyRequestSchema schema and contains a single row, in which each input parameter sent by the client is stored in the associated column. You must use this table to compute the output you want to return to the MCP client. To be valid, the function must return a table with a single row and a result column, and the ID of that row must be the same as the ID of the input row.
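The constraints on the returned table can be pictured without Pathway at all. The sketch below is a plain-Python model, not Pathway code: a table is represented as a dict from row ID to column values, and select_result, is_valid_response, and the "request-1" ID are hypothetical names introduced only for illustration.

```python
# Plain-Python model of the handler contract; none of these names are Pathway API.
from typing import Any, Callable

# A "table" is modelled as a dict mapping a row ID to a dict of column values.
Table = dict[str, dict[str, Any]]


def select_result(input_table: Table, compute: Callable[[dict[str, Any]], Any]) -> Table:
    """Mimic input_table.select(result=...): same row IDs, one result column."""
    return {row_id: {"result": compute(columns)} for row_id, columns in input_table.items()}


def is_valid_response(request: Table, response: Table) -> bool:
    """Check the constraints: a single row, the same ID as the request, a result column."""
    return (
        len(response) == 1
        and response.keys() == request.keys()
        and all("result" in columns for columns in response.values())
    )


# One client request becomes a single-row table (the ID here is made up).
request = {"request-1": {"x": 4, "y": 6}}
response = select_result(request, lambda columns: columns["x"] + columns["y"])
print(response)  # {'request-1': {'result': 10}}
print(is_valid_response(request, response))  # True
```

Deriving the output from the input table with select is what keeps the row ID unchanged, which is why every handler in this article builds its result from the table it receives.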

You can then expose the function by implementing the register_mcp function, which takes two arguments: self and an McpServer. In it, register the tool using the tool method of the server, with three arguments:

  • The name your tool will have in the MCP Server.
  • request_handler: the function you created.
  • schema: the schema of the input from the MCP client.

PathwayMcp

Once your tool is defined, you can easily set up a Pathway MCP server using the PathwayMcp class:

pathway_mcp_server = PathwayMcp(
    name="Your Pathway MCP Server",
    transport="streamable-http",
    host="localhost",
    port=8123,
    serve=[function_to_serve],
)

The Pathway MCP server requires the following arguments:

  • name: the name of your MCP server, used by the MCP client to identify it.
  • transport: how to connect to the MCP server. Only streamable-http is available for now.
  • host: the host of the MCP server.
  • port: the port of the MCP server.
  • serve: the list of McpServable objects to expose in your MCP server.
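The host and port chosen here determine the address the MCP client connects to. As a minimal sketch, assuming the /mcp/ path used by the client examples in this article, the URL can be derived as follows (mcp_url is a hypothetical helper, not part of Pathway or fastmcp):

```python
def mcp_url(host: str, port: int, path: str = "/mcp/") -> str:
    """Build the client URL for a streamable-http MCP server.

    The default path is an assumption taken from this article's client examples.
    """
    return f"http://{host}:{port}{path}"


print(mcp_url("localhost", 8123))  # http://localhost:8123/mcp/
```

Keeping the host and port in one place avoids the server and the client drifting apart when you change either of them.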

Examples

MCP client

To test your examples, you need an MCP client which will connect to your MCP server. You can use the fastmcp package to define a client as follows:

import asyncio
from fastmcp import Client

PATHWAY_MCP_URL = "http://localhost:8123/mcp/"

client = Client(PATHWAY_MCP_URL)

async def main():
    async with client:
        tools = await client.list_tools()
        print(tools)

    async with client:
        result = await client.call_tool(name="get_constant_value", arguments={})
        print(result)

asyncio.run(main())

You can list the tools available in the MCP server using the list_tools method of the client. To call a given tool, use the call_tool method with the name and arguments parameters. The arguments parameter should be a dict of the input values: in this case, the get_constant_value tool does not require any input, so an empty dictionary is passed.

Addition

You can easily act on the parameters given by the client. Let's see how to make an addition.

First, you need to require the client to send two integers, x and y. This is done using the schema:

class AddRequestSchema(pw.Schema):
    x: int
    y: int

You can now implement the class to do an addition:

class AddTool(McpServable):
    def add(self, x_y_values: pw.Table) -> pw.Table:
        """
        Return a table containing the sum of the parameters x and y.
        """

        results = x_y_values.select(result=pw.this.x + pw.this.y)
        return results

    def register_mcp(self, server: McpServer):
        server.tool(
            "add",
            request_handler=self.add,
            schema=AddRequestSchema,
        )

function_to_serve = AddTool()

The rest is unchanged. You can see below how to add several tools to the same MCP server.

You can use the tool in the MCP client by sending a dictionary having both x and y as entries:

    async with client:
        result = await client.call_tool(name="add", arguments={"x":4, "y":6})
        print(result)
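Since the schema fixes which arguments the tool expects, it can be convenient to check the dictionary on the client side before sending it. The sketch below is one possible way to do so in plain Python. ADD_REQUEST_FIELDS and find_argument_problems are hypothetical names, and the check is independent of whatever validation the server itself performs.

```python
# Hypothetical client-side mirror of AddRequestSchema: column name -> expected type.
ADD_REQUEST_FIELDS = {"x": int, "y": int}


def find_argument_problems(arguments: dict, fields: dict) -> list[str]:
    """Return human-readable problems; an empty list means the arguments look fine."""
    problems = []
    for name, expected_type in fields.items():
        if name not in arguments:
            problems.append(f"missing argument: {name}")
            continue
        value = arguments[name]
        # bool is a subclass of int in Python, so reject it explicitly.
        if not isinstance(value, expected_type) or isinstance(value, bool):
            problems.append(f"{name} should be {expected_type.__name__}")
    return problems


print(find_argument_problems({"x": 4, "y": 6}, ADD_REQUEST_FIELDS))    # []
print(find_argument_problems({"x": 4, "y": "6"}, ADD_REQUEST_FIELDS))  # ['y should be int']
print(find_argument_problems({"x": 4}, ADD_REQUEST_FIELDS))            # ['missing argument: y']
```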

Exposing several tools

You can easily expose multiple tools in the MCP server: simply define each tool and add it to the list of objects to serve:

import pathway as pw
from pathway.xpacks.llm.mcp_server import McpServable, McpServer, PathwayMcp


# no argument required
class EmptyRequestSchema(pw.Schema):
    pass

class AddRequestSchema(pw.Schema):
    x: int
    y: int


class ConstantValueTool(McpServable):

    def get_constant_value(self, input_from_client: pw.Table) -> pw.Table:
        """
        Return a constant value.
        """

        return input_from_client.select(result=1)

    def register_mcp(self, server: McpServer):
        server.tool(
            "get_constant_value",
            request_handler=self.get_constant_value,
            schema=EmptyRequestSchema,
        )


class AddTool(McpServable):
    def add(self, x_y_values: pw.Table) -> pw.Table:
        """
        Return a table containing the sum of the parameters x and y.
        """

        results = x_y_values.select(result=pw.this.x + pw.this.y)
        return results

    def register_mcp(self, server: McpServer):
        server.tool(
            "add",
            request_handler=self.add,
            schema=AddRequestSchema,
        )


constant_tool = ConstantValueTool()
add_tool = AddTool()

pathway_mcp_server = PathwayMcp(
    name="Streamable MCP Server",
    transport="streamable-http",
    host="localhost",
    port=8123,
    serve=[constant_tool, add_tool],
)

pw.run()

The resulting MCP server will have both the get_constant_value tool and the add tool.

Alternatively, you can add several tools in a single class. You simply need to expose each tool to the server in register_mcp:

import pathway as pw
from pathway.xpacks.llm.mcp_server import McpServable, McpServer, PathwayMcp


class EmptyRequestSchema(pw.Schema):
    pass

class AddRequestSchema(pw.Schema):
    x: int
    y: int


class BasicTools(McpServable):

    def get_constant_value(self, input_from_client: pw.Table) -> pw.Table:
        """
        Return a constant value.
        """

        return input_from_client.select(result=1)

    def add(self, x_y_values: pw.Table) -> pw.Table:
        """
        Return a table containing the sum of the parameters x and y.
        """

        results = x_y_values.select(result=pw.this.x + pw.this.y)
        return results

    def register_mcp(self, server: McpServer):
        server.tool(
            "get_constant_value",
            request_handler=self.get_constant_value,
            schema=EmptyRequestSchema,
        )
        server.tool(
            "add",
            request_handler=self.add,
            schema=AddRequestSchema,
        )


basic_tools = BasicTools()

pathway_mcp_server = PathwayMcp(
    name="Streamable MCP Server",
    transport="streamable-http",
    host="localhost",
    port=8123,
    serve=[basic_tools],
)

pw.run()

Both approaches give the same result: get_constant_value and add are exposed by the MCP server. You can see both tools when calling the list_tools method of the MCP client:

    async with client:
        tools = await client.list_tools()
        print(tools)

Then you can run each of them:

    async with client:
        result = await client.call_tool(name="get_constant_value", arguments={})
        print(result)
        result = await client.call_tool(name="add", arguments={"x":4, "y":6})
        print(result)

Count

Until now, you have been working with constant values or values given by the client. The main benefit of Pathway is that it lets you work on real-time tables.

Let's see how to count the entries in a table.

First, you need a table, let's generate a synthetic one using the demo package:

table = pw.demo.range_stream(nb_rows=50)

This table has a single column, value, and receives a new entry each second, with values going from 0 to 49.

Now you can define the tool and count the number of elements in the table:

class CountTool(McpServable):

    def get_count(self, empty_row: pw.Table) -> pw.Table:
        """
        Return the number of entries in the Pathway table.
        """

        single_row_table = table.reduce(count=pw.reducers.count())
        results = empty_row.select(result=single_row_table.ix_ref().count)
        return results

    def register_mcp(self, server: McpServer):
        server.tool(
            "get_count",
            request_handler=self.get_count,
            schema=EmptyRequestSchema,
        )

function_to_serve = CountTool()

Note that since the returned table must have a single row with the same ID as the input, you cannot return the reduced table directly. You must first use a reducer to obtain a single-row table, and then copy its value into the input table. In Pathway, accessing the value of a single-row table is done using .ix_ref().

You can call this tool as follows:

    async with client:
        result = await client.call_tool(name="get_count", arguments={})
        print(result)

This will return the number of rows in the table at the time the request was made. Try running it several times in a row: the number will change as the table gets updated!

Statistics Example

Here is a complete example of a tool providing real-time statistics about a table.

import pathway as pw
from pathway.xpacks.llm.mcp_server import McpServable, McpServer, PathwayMcp


class ValueRequestSchema(pw.Schema):
    pass


table = pw.demo.range_stream(nb_rows=50)


# Defined at module level so that get_statistics can call it by its bare name.
@pw.udf
def statistics_udf(count, minimum, maximum, avg, latest) -> str:
    return f"count: {count}, min: {minimum}, max: {maximum}, avg: {avg}, latest: {latest}"


class StatisticsTool(McpServable):

    def get_statistics(self, input_from_client: pw.Table) -> pw.Table:
        """
        Return basic statistics about the table.
        """

        single_row_table = table.groupby().reduce(
            count=pw.reducers.count(pw.this.value),
            min=pw.reducers.min(pw.this.value),
            max=pw.reducers.max(pw.this.value),
            avg=pw.reducers.avg(pw.this.value),
            latest=pw.reducers.latest(pw.this.value),
        )
        single_cell_table = single_row_table.select(
            single_cell=statistics_udf(
                pw.this.count,
                pw.this.min,
                pw.this.max,
                pw.this.avg,
                pw.this.latest,
            )
        )
        results = input_from_client.select(result=single_cell_table.ix_ref().single_cell)
        return results

    def register_mcp(self, server: McpServer):
        server.tool(
            "get_statistics",
            request_handler=self.get_statistics,
            schema=ValueRequestSchema,
        )


function_to_serve = StatisticsTool()

pathway_mcp_server = PathwayMcp(
    name="Streamable MCP Server",
    transport="streamable-http",
    host="localhost",
    port=8123,
    serve=[function_to_serve],
)

pw.run(
    monitoring_level=pw.MonitoringLevel.NONE,
    terminate_on_error=False,
)

Note that the input_from_client table contains a single row with only an id column, since the tool does not require any input.

Calling get_statistics will return a string with the statistics. You can format it as JSON for further computation on the MCP client side.

Here is what the client should look like to access it:

import asyncio
from fastmcp import Client

# HTTP server
PATHWAY_MCP_URL = "http://localhost:8123/mcp/"

client = Client(PATHWAY_MCP_URL)

async def main():
    async with client:
        result = await client.call_tool(name="get_statistics", arguments={})
        print(result)

asyncio.run(main())

You can now easily access basic statistics about a Pathway table. Those numbers will evolve with the table, making sure your MCP client works on fresh data!
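To process the statistics further on the client side, the returned string can be converted into a dictionary and serialized as JSON. The sketch below assumes the exact "name: value" format produced by statistics_udf above. parse_statistics is a hypothetical helper, and how you extract the text from the object returned by call_tool depends on your fastmcp version, so the example starts from a plain string.

```python
import json


def parse_statistics(text: str) -> dict[str, float]:
    """Turn 'count: 3, min: 0, ...' into {'count': 3.0, 'min': 0.0, ...}."""
    stats = {}
    for field in text.split(", "):
        name, _, raw_value = field.partition(": ")
        stats[name] = float(raw_value)
    return stats


# Sample string in the format built by statistics_udf (the values are made up).
sample = "count: 3, min: 0, max: 2, avg: 1.0, latest: 2"
print(json.dumps(parse_statistics(sample)))
# {"count": 3.0, "min": 0.0, "max": 2.0, "avg": 1.0, "latest": 2.0}
```

This parser expects every value to be numeric, so a value that cannot be read as a number raises a ValueError.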

Exposing Pathway Document Store

Document indexing is crucial in Retrieval-Augmented Generation (RAG) and agentic pipelines because it organizes information in a way that makes it quickly searchable and retrievable. In RAG, efficient indexing allows models to pull relevant information swiftly, improving the accuracy and relevance of generated responses.

The Pathway MCP Server allows you to expose its document store to MCP clients. This setup allows various applications and agents to access a real-time index. By doing so, it ensures that any client connected to the MCP server can efficiently retrieve and use the indexed information, making sure your AI application works on accurate and up-to-date data.

Pathway's DocumentStore inherits from McpServable, so it can be passed directly to PathwayMcp.

YAML Template

You can add the Document Store directly in a YAML app:

mcp_http: !pw.xpacks.llm.mcp_server.PathwayMcp
  name: "Streamable MCP Server"
  transport: "streamable-http"
  host: "localhost"
  port: 8068
  serve:
    - $document_store

Here is a complete example:

$sources:
  - !pw.io.fs.read
    path: data
    format: binary
    with_metadata: true

$embedder: !pw.xpacks.llm.embedders.OpenAIEmbedder
  model: "text-embedding-ada-002"
  cache_strategy: !pw.udfs.DefaultCache {}

$splitter: !pw.xpacks.llm.splitters.TokenCountSplitter
  min_tokens: 250
  max_tokens: 600

$parser: !pw.xpacks.llm.parsers.DoclingParser {}

$knn_index: !pw.stdlib.indexing.BruteForceKnnFactory
  reserved_space: 1000
  embedder: $embedder
  metric: !pw.engine.BruteForceKnnMetricKind.COS

$bm25_index: !pw.stdlib.indexing.TantivyBM25Factory {}

$retriever_factory: !pw.stdlib.indexing.HybridIndexFactory
  retriever_factories:
    - $knn_index
    - $bm25_index

$document_store: !pw.xpacks.llm.document_store.DocumentStore
  docs: $sources
  parser: $parser
  splitter: $splitter
  retriever_factory: $retriever_factory


# Streamable MCP server, can be proxied
mcp_http: !pw.xpacks.llm.mcp_server.PathwayMcp
  name: "Streamable MCP Server"
  transport: "streamable-http"
  host: "localhost"
  port: 8068
  serve:
    - $document_store

Conclusion

Pathway's MCP Server provides real-time data and context to your AI applications. It connects AI models to various data sources, improving their ability to deliver accurate and timely information. This tool is valuable for applications needing up-to-date data and efficient processing. As real-time data becomes more important, tools like Pathway's MCP Server will be essential for effective data handling and decision-making.