"""
PCTX Client
Main client for executing code with both MCP tools and local Python tools.
"""
from typing import TYPE_CHECKING
from urllib.parse import urlparse
from httpx import AsyncClient
from pydantic import BaseModel
from pctx_client._tool import AsyncTool, Tool
from pctx_client._utils import to_snake_case
from pctx_client._websocket_client import WebSocketClient
from pctx_client.exceptions import ConnectionError, SessionError
from pctx_client.models import (
ExecuteInput,
ExecuteOutput,
GetFunctionDetailsInput,
GetFunctionDetailsOutput,
ListedFunction,
ListFunctionsOutput,
ServerConfig,
ToolConfig,
)
if TYPE_CHECKING:
try:
from agents import FunctionTool
from bm25s import BM25
from crewai.tools import BaseTool as CrewAiBaseTool
from langchain_core.tools import BaseTool as LangchainBaseTool
from pydantic_ai.tools import Tool as PydanticAITool
from Stemmer import Stemmer
except ImportError:
pass
try:
from bm25s import BM25, tokenize
from Stemmer import Stemmer
HAS_SEARCH = True
except ImportError:
HAS_SEARCH = False
[docs]
class Pctx:
"""
PCTX Client
Execute TypeScript/JavaScript code with access to both MCP tools and local Python tools.
"""
[docs]
def __init__(
self,
tools: list[Tool | AsyncTool] | None = None,
servers: list[ServerConfig] | None = None,
url: str = "http://localhost:8080",
api_key: str | None = None,
execute_timeout: float = 30.0,
):
"""
Initialize the PCTX client.
Args:
tools: List of local Python tools to register
servers: List of MCP servers to register. Each server can be either:
- HTTP server: {"name": "...", "url": "...", "auth": {...}}
- stdio server: {"name": "...", "command": "...", "args": [...], "env": {...}}
url: PCTX server URL (default: http://localhost:8080)
execute_timeout: Timeout for code execution in seconds (default: 30.0)
"""
# Parse and normalize the URL
parsed = urlparse(url)
# Determine the base host and port
if parsed.scheme in ["ws", "wss"]:
# WebSocket URL provided - derive HTTP from it
http_scheme = "https" if parsed.scheme == "wss" else "http"
host = parsed.netloc
elif parsed.scheme in ["http", "https"]:
# HTTP URL provided - derive WebSocket from it
http_scheme = parsed.scheme
host = parsed.netloc
else:
raise ValueError(
f"Invalid URL scheme: {parsed.scheme}. Expected http, https, ws, or wss"
)
ws_scheme = "wss" if http_scheme == "https" else "ws"
self._ws_client = WebSocketClient(
url=f"{ws_scheme}://{host}{parsed.path}/ws", api_key=api_key, tools=tools
)
self._client = AsyncClient(
base_url=f"{http_scheme}://{host}{parsed.path}",
headers={"x-pctx-api-key": api_key or ""},
)
self._session_id: str | None = None
self._api_key = api_key
self._tools = tools or []
self._servers = servers or []
self._execute_timeout = execute_timeout
self._search_retriever = None
[docs]
async def __aenter__(self):
"""Async context manager entry."""
await self.connect()
return self
[docs]
async def __aexit__(self, exc_type, exc_val, exc_tb):
"""Async context manager exit."""
await self.disconnect()
[docs]
async def connect(self):
"""Creates CodeMode session, register local tools, and register MCP servers."""
if self._session_id is not None:
await self.disconnect()
try:
connect_res = await self._client.post("/code-mode/session/create")
connect_res.raise_for_status()
except Exception as e:
# Check if this is a connection error (server not running)
error_message = str(e).lower()
if any(
msg in error_message
for msg in ["connection", "refused", "failed to connect", "unreachable"]
):
raise ConnectionError(
f"Failed to connect to PCTX server at {self._client.base_url}. "
"Please ensure the server is running.\n"
"Start the server with: pctx server start"
) from e
# Re-raise other errors as-is
raise
# Parse the session ID from the response
try:
self._session_id = connect_res.json()["session_id"]
except (KeyError, ValueError) as e:
raise ConnectionError(
f"Received invalid response from PCTX server at {self._client.base_url}. "
"The server may be running but not responding correctly."
) from e
self._client.headers.update({"x-code-mode-session": self._session_id or ""})
# Register all local tools & MCP servers
configs: list[ToolConfig] = [
{
"name": t.name,
"namespace": t.namespace,
"description": t.description,
"input_schema": t.input_json_schema(),
"output_schema": t.output_json_schema(),
}
for t in self._tools
]
if len(configs) > 0:
await self._register_tools(configs)
if len(self._servers) > 0:
await self._register_servers(self._servers)
# reset search to re-index
self._search_retriever = None
[docs]
async def disconnect(self):
"""Disconnect closes current code-mode session."""
close_res = await self._client.post("/code-mode/session/close")
close_res.raise_for_status()
self._session_id = None
# ========== Main code mode methods method ==========
[docs]
async def list_functions(self) -> ListFunctionsOutput:
"""
List all available functions organized by namespace.
This is typically the first method you should call to discover what functions
are available in the current session, including both registered local tools
and MCP server functions.
Returns:
ListFunctionsOutput: An object containing function signatures organized
by namespace. The `code` attribute contains TypeScript code with
function declarations that can be used for reference.
Raises:
SessionError: If called before establishing a session via connect().
Example:
>>> async with Pctx() as pctx:
... functions = await pctx.list_functions()
... print(functions.code) # TypeScript declarations
"""
if self._session_id is None:
raise SessionError(
"No code mode session exists, run Pctx(...).connect() before calling"
)
list_res = await self._client.post("/code-mode/functions/list")
list_res.raise_for_status()
return ListFunctionsOutput.model_validate(list_res.json())
[docs]
async def search_functions(self, query: str, k: int = 10) -> list[ListedFunction]:
"""
Search available functions matching query.
This is typically the first method you should call to discover what functions
are available in the current session, including both registered local tools
and MCP server functions.
Args:
query: Search query string to find relevant functions.
k: Max number of top results to return (default: 5).
Returns:
list[ListedFunction]: An list of matching function signatures matching the query
Raises:
ImportError: If bm25s is not installed.
SessionError: If called before establishing a session via connect().
"""
if not HAS_SEARCH:
raise ImportError(
"bm25s is not installed. Install it with: pip install pctx[bm25s]"
)
if self._session_id is None:
raise SessionError(
"No code mode session exists, run Pctx(...).connect() before calling"
)
stemmer = Stemmer("english")
if self._search_retriever is None:
self._functions = (await self.list_functions()).functions
corpus = [
f"{to_snake_case(function.namespace).replace('_', ' ')}.{to_snake_case(function.name).replace('_', ' ')}: {function.description}"
for function in self._functions
]
corpus_tokens = tokenize(corpus, stopwords="en", stemmer=stemmer)
self._search_retriever = BM25()
self._search_retriever.index(corpus_tokens)
query_tokens = tokenize([query], stopwords="en", stemmer=stemmer)
actual_k = min(k, len(self._functions))
results, scores = self._search_retriever.retrieve(query_tokens, k=actual_k)
tools = []
for i in range(results.shape[1]):
tool = self._functions[results[0, i]]
score = scores[0, i]
if score > 0:
tools.append(tool)
return tools
[docs]
async def get_function_details(
self, functions: list[str]
) -> GetFunctionDetailsOutput:
"""
Get detailed information about specific functions.
After discovering available functions with list_functions(), use this method
to get comprehensive details about parameter types, return values, and usage
for the specific functions you need.
Args:
functions: List of function names in 'namespace.functionName' format
(e.g., ['Notion.apiPostSearch', 'Weather.getCurrentWeather']).
Returns:
GetFunctionDetailsOutput: An object containing detailed TypeScript
declarations for the requested functions. The `code` attribute
contains the full function signatures with JSDoc comments.
Raises:
SessionError: If called before establishing a session via connect().
Example:
>>> async with Pctx() as pctx:
... details = await pctx.get_function_details(['Weather.getCurrentWeather'])
... print(details.code) # Detailed TypeScript with parameter info
"""
if self._session_id is None:
raise SessionError(
"No code mode session exists, run Pctx(...).connect() before calling"
)
list_res = await self._client.post(
"/code-mode/functions/details", json={"functions": functions}
)
list_res.raise_for_status()
return GetFunctionDetailsOutput.model_validate(list_res.json())
[docs]
async def execute(self, code: str) -> ExecuteOutput:
"""
Execute TypeScript code that calls namespaced functions.
This method runs TypeScript code in a secure Deno sandbox with access to
all registered functions (both local tools and MCP server functions).
Args:
code: TypeScript code to execute. Must include an async `run()` function
that serves as the entry point. Functions must be called with their
namespace prefix (e.g., 'Weather.getCurrentWeather()').
Returns:
ExecuteOutput: An object containing execution results with attributes:
- result: The value returned from the run() function
- logs: Array of console.log() outputs
- markdown(): Method to format output as markdown
Raises:
SessionError: If called before establishing a session via connect().
TimeoutError: If execution exceeds the configured timeout (default 30s).
Notes:
- Code must define an `async function run()` as the entry point
- Functions MUST be called as 'Namespace.functionName'
- Only functions from list_functions() are available
- No access to fetch(), fs, or other standard Node/Deno APIs
- Variables don't persist between execute() calls
- Return values are already parsed objects, not JSON strings
Example:
>>> async with Pctx() as pctx:
... code = '''
... async function run() {
... const result = await Weather.getCurrentWeather({ city: "NYC" });
... console.log("Temperature:", result.temp);
... return { temperature: result.temp };
... }
... '''
... output = await pctx.execute(code)
... print(output.markdown()) # Formatted results with logs
"""
if self._session_id is None:
raise SessionError(
"No code mode session exists, run Pctx(...).connect() before calling"
)
return await self._ws_client.execute_code(
self._session_id, code, timeout=self._execute_timeout
)
# ========== Registrations ==========
async def _register_tools(self, configs: list[ToolConfig]):
res = await self._client.post("/register/tools", json={"tools": configs})
res.raise_for_status()
async def _register_servers(self, configs: list[ServerConfig]):
res = await self._client.post("/register/servers", json={"servers": configs})
res.raise_for_status()
def _search_functions_result_to_string(
self, functions: list[ListedFunction]
) -> str:
return "\n".join(
[
f"{func.namespace}.{func.name}: {func.description or ''}"
for func in functions
]
)
CODE_MODE_TOOL_DESCRIPTIONS = {
"list_functions": (
"Use this tool to list all available functions organized by namespace."
if HAS_SEARCH
else "ALWAYS USE THIS TOOL FIRST to list all available functions organized by namespace."
)
+ """
WORKFLOW:
1. Start here - Call this tool to see what functions are available
2. Then call get_function_details() for specific functions you need to understand
3. Finally call execute() to run your TypeScript code
This returns function signatures without full details.""",
"search_functions": """ALWAYS USE THIS TOOL FIRST to find relevant functions.
Arguments:
query: The search query string to find relevant functions.
k: The maximum number of top results to return (default: 10).
WORKFLOW:
1. Start here - Call this tool to find suitable functions
2. Then call get_function_details() for specific functions you need to understand
3. Finally call execute() to run your TypeScript code
This returns a list of matching functions.""",
"get_function_details": """Get detailed information about specific functions you want to use.
WHEN TO USE: After calling """
+ ("search_functions() or " if HAS_SEARCH else "")
+ """list_functions(), use this to learn about parameter types, return values, and usage for specific functions.
REQUIRED FORMAT: Functions must be specified as 'namespace.functionName' (e.g., 'Namespace.apiPostSearch')
This tool is lightweight and only returns details for the functions you request, avoiding unnecessary token usage.
Only request details for functions you actually plan to use in your code.
NOTE ON RETURN TYPES:
- If a function returns Promise<any>, the MCP server didn't provide an output schema
- The actual value is a parsed object (not a string) - access properties directly
- Don't use JSON.parse() on the results - they're already JavaScript objects""",
"execute": "Execute TypeScript code that calls namespaced functions. USE THIS LAST after "
+ ("search_functions() or " if HAS_SEARCH else "")
+ """list_functions() and get_function_details().
TOKEN USAGE WARNING: This tool could return LARGE responses if your code returns big objects.
To minimize tokens:
- Filter/map/reduce data IN YOUR CODE before returning
- Only return specific fields you need (e.g., return {id: result.id, count: items.length})
- Use console.log() for intermediate results instead of returning everything
- Avoid returning full API responses - extract just what you need
REQUIRED CODE STRUCTURE:
async function run() {
// Your code here
// Call namespace.functionName() - MUST include namespace prefix
// Process data here to minimize return size
return onlyWhatYouNeed; // Keep this small!
}
IMPORTANT RULES:
- Functions MUST be called as 'Namespace.functionName' (e.g., 'Notion.apiPostSearch')
- Only functions from list_functions() are available - no fetch(), fs, or other Node/Deno APIs
- Variables don't persist between execute() calls - return or log anything you need later
- Add console.log() statements between API calls to track progress if errors occur
- Code runs in an isolated Deno sandbox with restricted network access
RETURN TYPE NOTE:
- Functions without output schemas show Promise<any> as return type
- The actual runtime value is already a parsed JavaScript object, NOT a JSON string
- Do NOT call JSON.parse() on results - they're already objects
- Access properties directly (e.g., result.data) or inspect with console.log() first
- If you see 'Promise<any>', the structure is unknown - log it to see what's returned""",
}