Jupyter SDK Design

Overview
The Jupyter SDK is a Python client library providing the sole user interface for the Graph OLAP Platform. All platform operations - from creating graph mappings to querying instances - are performed through this SDK. There is no separate web interface or GUI; the SDK is the complete and authoritative interface for analysts interacting with the platform.
The SDK provides:
- Control Plane Operations: Full CRUD for mappings (create, read, update, delete, copy, list), instance lifecycle management (create, terminate, update CPU), and administrative operations
- Data Plane Operations: Cypher query execution and graph algorithms on running instances
- Schema Discovery: Browse Starburst catalog metadata to design mappings
- Operational Management: Cluster health monitoring, configuration, and bulk operations (admin role)
Designed for use in Jupyter notebooks, it emphasizes ergonomic API design, clear error messages, and seamless integration with pandas/polars DataFrames.
Note: Explicit snapshot APIs have been disabled. Instances are created directly from mappings using client.instances.create_from_mapping(). The snapshot layer operates internally.
SDK as the Sole User Interface
The Graph OLAP Platform is notebook-first by design. Unlike traditional enterprise platforms with web consoles, all user interactions happen through Python code in Jupyter notebooks:
| Operation Category | SDK Resource | Key Methods |
|---|---|---|
| Mapping Management | client.mappings | create(), list(), get(), update(), delete(), copy() |
| Instance Lifecycle | client.instances | create_from_mapping_and_wait(), terminate(), update_cpu(), list() |
| Graph Queries | conn | query(), query_df(), query_scalar(), query_one() |
| Graph Algorithms | conn.algo / conn.networkx | pagerank(), louvain(), wcc(), 500+ NetworkX algorithms |
| Schema Discovery | client.schema | list_catalogs(), list_tables(), search_tables() |
| Favorites | client.favorites | add(), remove(), list() |
| Operations (Ops role) | client.ops | get_cluster_health(), get_lifecycle_config(), trigger_job() |
| Administration (Admin role) | client.admin | bulk_delete() |
| Health Checks | client.health | check(), ready() |
Why Notebook-First?
- Reproducibility: All operations are code, making workflows reproducible and version-controllable
- Automation: Scripts can automate common tasks without GUI interaction
- Integration: Seamless integration with data science workflows (pandas, polars, visualization)
- Auditability: Every operation is logged with the user who executed it
- Flexibility: Power users can extend and customize workflows programmatically
Prerequisites
- requirements.md - SDK interface requirements, user workflows
- api.common.spec.md - API conventions, error codes, authentication
- api.mappings.spec.md, api.snapshots.spec.md, api.instances.spec.md - Control Plane API specs
- api.wrapper.spec.md - Wrapper Pod API specs (query, algorithm endpoints)
Related Components
- control-plane.design.md - Server-side implementation of Control Plane APIs
- ryugraph-wrapper.design.md - Server-side implementation of instance APIs
- falkordb-wrapper.design.md - FalkorDB wrapper implementation (alternative to Ryugraph)
- jupyter-sdk.deployment.design.md - SDK packaging and Jupyter cluster deployment
This Document Series
This is the core SDK design. Additional details are in:
- jupyter-sdk.connection.design.md - Instance connection, queries, visualization, exceptions
- jupyter-sdk.models.spec.md - Model definitions (Mapping, Snapshot, Instance, QueryResult, etc.)
- jupyter-sdk.algorithms.design.md - Algorithm extensions, pagination, quick start helper
Constraints
- Python 3.9+ compatibility (Jupyter environment)
- Synchronous API by default (notebook-friendly), with async support
- Clear, actionable error messages for common failure modes
- Minimal dependencies (httpx, pydantic, polars optional)
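The "synchronous by default, with async support" constraint usually means a sync facade over an async core. A minimal self-contained sketch of that shape (class names and the sleep placeholder are illustrative, not the SDK's actual implementation):

```python
import asyncio


class AsyncCore:
    """Stand-in for an async transport (illustrative only)."""

    async def request(self, path: str) -> dict:
        await asyncio.sleep(0)  # placeholder for real network I/O
        return {"path": path}


class SyncFacade:
    """Notebook-friendly synchronous wrapper over the async core."""

    def __init__(self) -> None:
        self._core = AsyncCore()

    def request(self, path: str) -> dict:
        # asyncio.run is fine when called from plain sync code; inside an
        # already-running event loop (e.g. some notebook cells) callers
        # would await the async core directly instead.
        return asyncio.run(self._core.request(path))
```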
Package Structure
```
graph-olap-sdk/
├── src/
│   └── graph_olap/
│       ├── __init__.py        # Public API exports
│       ├── client.py          # Main GraphOLAPClient class
│       ├── config.py          # Configuration and authentication
│       ├── notebook.py        # Jupyter integration (connect(), init())
│       ├── testing.py         # E2E test fixtures and utilities
│       ├── styles/
│       │   └── notebook.css   # Embedded CSS design system (1506 lines)
│       ├── resources/
│       │   ├── __init__.py
│       │   ├── mappings.py    # MappingResource
│       │   ├── snapshots.py   # SnapshotResource
│       │   ├── instances.py   # InstanceResource
│       │   ├── favorites.py   # FavoriteResource
│       │   ├── ops.py         # OpsResource (config, cluster)
│       │   ├── health.py      # HealthResource (health, ready)
│       │   ├── schema.py      # SchemaResource (Starburst metadata)
│       │   └── admin.py       # AdminResource (bulk delete, privileged ops)
│       ├── instance/
│       │   ├── __init__.py
│       │   ├── connection.py  # InstanceConnection class
│       │   └── algorithms.py  # Algorithm execution
│       ├── models/
│       │   ├── __init__.py
│       │   ├── mapping.py     # Mapping, MappingVersion models
│       │   ├── snapshot.py    # Snapshot model
│       │   ├── instance.py    # Instance model
│       │   ├── execution.py   # AlgorithmExecution model
│       │   ├── ops.py         # Config, cluster, health models
│       │   └── common.py      # Shared types
│       ├── exceptions.py      # Exception hierarchy
│       ├── http.py            # HTTP client wrapper
│       └── utils/
│           ├── __init__.py
│           └── diff.py        # Diff utilities
├── tests/
├── examples/
│   ├── basic_workflow.ipynb
│   ├── algorithms.ipynb
│   └── visualization.ipynb
├── pyproject.toml
└── README.md
```

Client Architecture
Mermaid Source
```mermaid
flowchart TB
    accTitle: Jupyter SDK Client Architecture
    accDescr: Layered architecture from GraphOLAPClient through resources and HTTP to Control Plane API

    classDef client fill:#F3E5F5,stroke:#7B1FA2,stroke-width:2px,color:#4A148C
    classDef resource fill:#E1F5FE,stroke:#0277BD,stroke-width:2px,color:#01579B
    classDef http fill:#E8F5E9,stroke:#2E7D32,stroke-width:2px,color:#1B5E20
    classDef api fill:#E3F2FD,stroke:#1565C0,stroke-width:2px,color:#0D47A1
    classDef instance fill:#FFF3E0,stroke:#E65100,stroke-width:2px,color:#E65100

    subgraph SDK["Jupyter SDK"]
        CLIENT[GraphOLAPClient]:::client

        subgraph Resources
            MAP[MappingResource]:::resource
            SNAP[SnapshotResource]:::resource
            INST[InstanceResource]:::resource
            FAV[FavoriteResource]:::resource
            OPS[OpsResource]:::resource
            HEALTH[HealthResource]:::resource
        end

        HTTP[HTTPClient<br/>retry, auth, errors]:::http

        subgraph Instance["InstanceConnection"]
            CONN[Connection]:::instance
            ALGO[AlgorithmManager]:::instance
            NX[NetworkXManager]:::instance
        end
    end

    CP[Control Plane API]:::api
    WRAP[Wrapper Pod API]:::api

    CLIENT --> MAP & SNAP & INST & FAV & OPS & HEALTH
    MAP & SNAP & INST & FAV & OPS & HEALTH --> HTTP
    HTTP --> CP

    INST -.->|connect()| CONN
    CONN --> ALGO & NX
    CONN --> WRAP
```

Main Client
```python
from graph_olap.resources import (
    MappingResource,
    SnapshotResource,
    InstanceResource,
    FavoriteResource,
    OpsResource,
    HealthResource,
)
from graph_olap.http import HTTPClient
from graph_olap.config import Config


class GraphOLAPClient:
    """
    Main client for the Graph OLAP Platform.

    Example:
        >>> client = GraphOLAPClient(
        ...     api_url="https://graph.example.com",
        ...     api_key="your-api-key"
        ... )
        >>> mappings = client.mappings.list()
        >>> instance = client.instances.create(snapshot_id=123, name="My Graph")
    """

    def __init__(
        self,
        api_url: str,
        api_key: str | None = None,
        internal_api_key: str | None = None,
        username: str | None = None,
        *,
        timeout: float = 30.0,
        max_retries: int = 3,
    ):
        """
        Initialize the Graph OLAP client.

        Authentication modes (in priority order):
        - internal_api_key: Uses 'X-Internal-Api-Key' header (internal services)
        - api_key: Uses 'Authorization: Bearer {key}' header (production)
        - username: Uses 'X-Username' header (development/testing)

        Args:
            api_url: Base URL of the Control Plane API
            api_key: API key for authentication (Bearer token)
            internal_api_key: Internal API key (X-Internal-Api-Key header)
            username: Username for user-scoped routes (X-Username header)
            timeout: Request timeout in seconds
            max_retries: Maximum retry attempts for transient failures
        """
        # Pass all auth modes through so Config/HTTPClient can apply the
        # priority order documented above.
        self._config = Config(
            api_url=api_url.rstrip("/"),
            api_key=api_key,
            internal_api_key=internal_api_key,
            username=username,
            timeout=timeout,
            max_retries=max_retries,
        )
        self._http = HTTPClient(self._config)

        # Resource managers
        self.mappings = MappingResource(self._http)
        self.snapshots = SnapshotResource(self._http)
        self.instances = InstanceResource(self._http, self._config)
        self.favorites = FavoriteResource(self._http)
        self.ops = OpsResource(self._http)
        self.health = HealthResource(self._http)

    def close(self) -> None:
        """Close the HTTP client and release resources."""
        self._http.close()

    def __enter__(self) -> "GraphOLAPClient":
        return self

    def __exit__(self, *args) -> None:
        self.close()

    @classmethod
    def from_env(
        cls,
        api_url: str | None = None,
        api_key: str | None = None,
        internal_api_key: str | None = None,
        username: str | None = None,
        **kwargs,
    ) -> "GraphOLAPClient":
        """
        Create client from environment variables.

        Environment Variables:
            GRAPH_OLAP_API_URL: Base URL for the control plane API
            GRAPH_OLAP_API_KEY: API key for authentication (Bearer token)
            GRAPH_OLAP_INTERNAL_API_KEY: Internal API key (X-Internal-Api-Key header)
            GRAPH_OLAP_USERNAME: Username for development/testing (X-Username header)
            GRAPH_OLAP_IN_CLUSTER_MODE: Set to "true" for in-cluster execution (default: "false")
            GRAPH_OLAP_NAMESPACE: Kubernetes namespace for service DNS (default: "graph-olap-local")

        Args:
            api_url: Override GRAPH_OLAP_API_URL
            api_key: Override GRAPH_OLAP_API_KEY
            internal_api_key: Override GRAPH_OLAP_INTERNAL_API_KEY
            username: Override GRAPH_OLAP_USERNAME
            **kwargs: Additional config options (timeout, max_retries)

        Returns:
            Configured GraphOLAPClient

        Raises:
            ValueError: If GRAPH_OLAP_API_URL is not set
        """
        config = Config.from_env(...)
        return cls(...)

    def quick_start(
        self,
        mapping_id: int,
        *,
        snapshot_name: str | None = None,
        instance_name: str | None = None,
        wait_timeout: int = 600,
    ) -> "InstanceConnection":
        """
        Quick start: create snapshot, instance, and connect in one call.

        Convenience method for the common workflow of going from a mapping
        to a connected instance ready for queries.

        Args:
            mapping_id: Mapping ID to use
            snapshot_name: Name for snapshot (defaults to "Quick Snapshot")
            instance_name: Name for instance (defaults to "Quick Instance")
            wait_timeout: Max time to wait for snapshot + instance creation

        Returns:
            InstanceConnection ready for queries

        Example:
            >>> conn = client.quick_start(mapping_id=1)
            >>> result = conn.query("MATCH (n) RETURN count(n)")
            >>> # Remember to terminate the instance when done!
        """
        ...
```

Notebook Integration
The graph_olap.notebook module provides zero-config Jupyter integration:
```python
from graph_olap.client import GraphOLAPClient

# Global client for notebook convenience
_current_client: GraphOLAPClient | None = None


def connect(
    api_url: str | None = None,
    api_key: str | None = None,
    **kwargs,
) -> GraphOLAPClient:
    """
    Connect to Graph OLAP Platform with auto-discovery.

    This is the recommended entry point for Jupyter notebooks. Configuration
    is auto-discovered from environment variables, or can be provided explicitly.

    Args:
        api_url: Override GRAPH_OLAP_API_URL environment variable
        api_key: Override GRAPH_OLAP_API_KEY environment variable
        **kwargs: Additional config options (timeout, max_retries)

    Returns:
        Configured GraphOLAPClient ready for use

    Raises:
        ValueError: If GRAPH_OLAP_API_URL is not set

    Example:
        >>> from graph_olap import notebook
        >>> client = notebook.connect()

        >>> # Or with explicit configuration
        >>> client = notebook.connect(
        ...     api_url="https://graph-olap.example.com",
        ...     api_key="sk-xxx",
        ... )

        >>> # Start working immediately
        >>> mappings = client.mappings.list()
    """
    global _current_client

    # Initialize itables for interactive DataFrames if available
    _setup_itables()

    # Create and store client
    _current_client = GraphOLAPClient.from_env(
        api_url=api_url,
        api_key=api_key,
        **kwargs,
    )

    return _current_client


def init(
    api_url: str | None = None,
    api_key: str | None = None,
    **kwargs,
) -> GraphOLAPClient:
    """Alias for connect() - initialize Graph OLAP SDK for notebooks."""
    return connect(api_url=api_url, api_key=api_key, **kwargs)


def get_client() -> GraphOLAPClient | None:
    """
    Get the current notebook client.

    Returns:
        Current GraphOLAPClient or None if not connected
    """
    return _current_client
```

Quick Start (2 lines):
```python
>>> from graph_olap import notebook
>>> client = notebook.connect()
```

Configuration
Reference: ADR-073: Notebook Environment Variables Standardization
Environment Variables
| Variable | Purpose | Example |
|---|---|---|
| GRAPH_OLAP_API_URL | Control Plane API endpoint | http://control-plane:8000 |
| GRAPH_OLAP_IN_CLUSTER_MODE | Enable in-cluster service DNS | true or false |
| GRAPH_OLAP_API_KEY | Bearer token for authentication | sk-xxx |
| GRAPH_OLAP_USERNAME | Username for development/testing | e2e-test-user |
| GRAPH_OLAP_NAMESPACE | Kubernetes namespace for service DNS | graph-olap-local |
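These variables combine with explicit arguments in a fixed precedence: an explicit argument wins over the environment, which wins over the in-cluster default. A minimal self-contained sketch of that resolution order (resolve_api_url and the namespace-based DNS name are illustrative assumptions, not SDK API):

```python
from __future__ import annotations

import os


def resolve_api_url(api_url: str | None = None) -> str:
    """Illustrative helper: explicit arg > GRAPH_OLAP_API_URL >
    in-cluster service DNS > localhost fallback."""
    if api_url:
        return api_url
    env_url = os.environ.get("GRAPH_OLAP_API_URL")
    if env_url:
        return env_url
    if os.environ.get("GRAPH_OLAP_IN_CLUSTER_MODE") == "true":
        # Namespace defaults to "graph-olap-local" per the table above
        namespace = os.environ.get("GRAPH_OLAP_NAMESPACE", "graph-olap-local")
        return f"http://control-plane.{namespace}.svc:8000"
    return "http://localhost:8000"
```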
Environment Detection
The notebook.connect() function automatically detects the runtime environment:
```python
def connect() -> GraphOLAPClient:
    """Connect to API with automatic environment detection."""
    url = os.environ.get("GRAPH_OLAP_API_URL")

    if url:
        return GraphOLAPClient(url)

    # Auto-detect in-cluster mode
    if os.environ.get("GRAPH_OLAP_IN_CLUSTER_MODE") == "true":
        return GraphOLAPClient("http://control-plane.graph-olap.svc:8000")

    # Default for local development
    return GraphOLAPClient("http://localhost:8000")
```

Testing Module
Reference: ADR-074: SDK Testing Module with Fixture Utilities
The graph_olap.testing module provides reusable fixtures and utilities for E2E tests.
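The automatic cleanup behind TestFixtures is a teardown registry drained in reverse creation order. A self-contained sketch of that pattern (FixtureRegistry and register() are illustrative stand-ins for the real create_* methods):

```python
from contextlib import contextmanager
from typing import Callable, List


class FixtureRegistry:
    """Illustrative sketch of the cleanup-tracking pattern.

    The real TestFixtures creates platform resources; register() stands in
    for create_instance()/create_snapshot()/create_mapping(), each of which
    records a teardown step for the resource it created.
    """

    def __init__(self) -> None:
        self._cleanups: List[Callable[[], None]] = []

    def register(self, teardown: Callable[[], None]) -> None:
        self._cleanups.append(teardown)

    def cleanup(self) -> None:
        # Tear down in reverse creation order so dependents go first
        while self._cleanups:
            self._cleanups.pop()()


@contextmanager
def fixture_context():
    """Yield a registry and guarantee cleanup even if the test body raises."""
    fixtures = FixtureRegistry()
    try:
        yield fixtures
    finally:
        fixtures.cleanup()
```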
Module Components
```python
# Connection utilities
def connect() -> GraphOLAPClient: ...


# Fixture factory
class TestFixtures:
    def create_instance(self, **kwargs) -> Instance: ...
    def create_snapshot(self, **kwargs) -> Snapshot: ...
    def create_mapping(self, **kwargs) -> Mapping: ...
    def cleanup(self) -> None: ...  # Automatic resource cleanup


# Context managers
@contextmanager
def test_context(): ...


@contextmanager
def instance_context(name: str): ...


# Data factories
def random_name(prefix: str = "test") -> str: ...
```

Usage in Tests
```python
# Before (50+ lines of boilerplate)
client = GraphOLAPClient(os.environ.get("API_URL"))
# ... complex setup and cleanup ...

# After (3 lines)
from graph_olap.testing import test_context

with test_context() as (client, fixtures):
    instance = fixtures.create_instance(name="test-instance")
    # test code - cleanup automatic
```

Notebook Styling
Reference: ADR-091: SDK Embedded CSS Distribution
The SDK embeds the notebook CSS design system for zero-config styling in JupyterHub deployments.
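Zero-config here means the packaged stylesheet ends up in Jupyter's custom CSS directory. A minimal sketch of that copy step (the helper name and the custom.css filename are assumptions; the ~/.jupyter/custom/ destination matches the distribution flow described in this section):

```python
from pathlib import Path


def install_notebook_css(css_text: str, jupyter_dir: Path) -> Path:
    """Write the embedded stylesheet to <jupyter_dir>/custom/custom.css.

    Illustrative helper mirroring what the init container does with `cp`.
    """
    custom_dir = jupyter_dir / "custom"
    custom_dir.mkdir(parents=True, exist_ok=True)
    target = custom_dir / "custom.css"
    target.write_text(css_text)
    return target
```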
CSS Location
```python
from importlib import resources


def get_notebook_css() -> str:
    """Load the notebook CSS from package resources."""
    return resources.files(__package__).joinpath("notebook.css").read_text()
```

Distribution Flow
The CSS is distributed via the SDK package and installed by init containers:
```
SDK Package                     JupyterHub Pod
┌─────────────┐                 ┌─────────────────┐
│ graph_olap/ │                 │ Init Container  │
│  styles/    │   pip install   │                 │
│ notebook.css│────────────────►│ cp CSS to       │
│ (1506 lines)│                 │ ~/.jupyter/     │
└─────────────┘                 │  custom/        │
                                └─────────────────┘
```

HTTP Client
```python
import httpx
from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type

from graph_olap.config import Config
from graph_olap.exceptions import (
    GraphOLAPError,
    AuthenticationError,
    PermissionDeniedError,
    NotFoundError,
    ValidationError,
    ConflictError,
    ResourceLockedError,
    ConcurrencyLimitError,
    DependencyError,
    TimeoutError,
    ServerError,
)


class HTTPClient:
    """HTTP client with retry logic and error handling."""

    def __init__(self, config: Config):
        self._config = config
        self._client = httpx.Client(
            base_url=config.api_url,
            headers={
                "Authorization": f"Bearer {config.api_key}",
                "Content-Type": "application/json",
                "Accept": "application/json",
            },
            timeout=config.timeout,
        )

    def close(self) -> None:
        self._client.close()

    @retry(
        retry=retry_if_exception_type((httpx.ConnectError, httpx.ReadTimeout)),
        stop=stop_after_attempt(3),
        wait=wait_exponential(multiplier=1, min=1, max=10),
    )
    def request(
        self,
        method: str,
        path: str,
        params: dict | None = None,
        json: dict | None = None,
    ) -> dict:
        """Make an HTTP request with error handling."""
        response = self._client.request(
            method=method,
            url=path,
            params=params,
            json=json,
        )

        return self._handle_response(response)

    def _handle_response(self, response: httpx.Response) -> dict:
        """Parse response and raise appropriate exceptions."""
        if response.status_code == 204:
            return {}

        try:
            data = response.json()
        except ValueError:
            raise ServerError(f"Invalid JSON response: {response.text}")

        if response.is_success:
            return data

        # Handle error responses
        error = data.get("error", {})
        code = error.get("code", "UNKNOWN_ERROR")
        message = error.get("message", "Unknown error")
        details = error.get("details", {})

        if response.status_code == 401:
            raise AuthenticationError(message)
        elif response.status_code == 403:
            raise PermissionDeniedError(message, details)
        elif response.status_code == 404:
            raise NotFoundError(message, details)
        elif response.status_code == 400:
            raise ValidationError(message, details)
        elif response.status_code == 409:
            if code == "RESOURCE_LOCKED":
                raise ResourceLockedError(message, details)
            elif code == "CONCURRENCY_LIMIT_EXCEEDED":
                raise ConcurrencyLimitError(message, details)
            elif code == "RESOURCE_HAS_DEPENDENCIES":
                raise DependencyError(message, details)
            else:
                raise ConflictError(message, details)
        elif response.status_code == 408:
            raise TimeoutError(message)
        elif response.status_code >= 500:
            raise ServerError(message)
        else:
            raise GraphOLAPError(f"{code}: {message}")

    def get(self, path: str, params: dict | None = None) -> dict:
        return self.request("GET", path, params=params)

    def post(self, path: str, json: dict | None = None) -> dict:
        return self.request("POST", path, json=json)

    def put(self, path: str, json: dict | None = None) -> dict:
        return self.request("PUT", path, json=json)

    def delete(self, path: str) -> dict:
        return self.request("DELETE", path)
```

Resource Classes
Mapping Resource
```python
from graph_olap.models import Mapping, MappingVersion, NodeDefinition, EdgeDefinition
from graph_olap.pagination import PaginatedList


class MappingResource:
    """Manage mapping definitions."""

    def __init__(self, http: HTTPClient):
        self._http = http

    def list(
        self,
        owner: str | None = None,
        search: str | None = None,
        created_after: str | None = None,
        created_before: str | None = None,
        sort_by: str = "created_at",
        sort_order: str = "desc",
        offset: int = 0,
        limit: int = 50,
    ) -> PaginatedList[Mapping]:
        """
        List mappings with optional filters.

        Args:
            owner: Filter by owner username
            search: Text search on name and description
            created_after: Filter by created_at >= timestamp (ISO 8601)
            created_before: Filter by created_at <= timestamp (ISO 8601)
            sort_by: Field to sort by (name, created_at, current_version)
            sort_order: Sort direction (asc, desc)
            offset: Number of records to skip
            limit: Maximum records to return (max 100)

        Returns:
            PaginatedList of Mapping objects
        """
        params = {
            "offset": offset,
            "limit": min(limit, 100),
            "sort_by": sort_by,
            "sort_order": sort_order,
        }
        if owner:
            params["owner"] = owner
        if search:
            params["search"] = search
        if created_after:
            params["created_after"] = created_after
        if created_before:
            params["created_before"] = created_before

        response = self._http.get("/api/mappings", params=params)

        return PaginatedList(
            items=[Mapping.from_dict(m) for m in response["data"]],
            total=response["meta"]["total"],
            offset=response["meta"]["offset"],
            limit=response["meta"]["limit"],
        )

    def get(self, mapping_id: int) -> Mapping:
        """
        Get a mapping by ID (returns current version).

        Args:
            mapping_id: Mapping ID

        Returns:
            Mapping object with current version data

        Raises:
            NotFoundError: If mapping doesn't exist
        """
        response = self._http.get(f"/api/mappings/{mapping_id}")
        return Mapping.from_dict(response["data"])

    def get_version(self, mapping_id: int, version: int) -> MappingVersion:
        """
        Get a specific mapping version.

        Args:
            mapping_id: Mapping ID
            version: Version number

        Returns:
            MappingVersion object

        Raises:
            NotFoundError: If mapping or version doesn't exist
        """
        response = self._http.get(f"/api/mappings/{mapping_id}/versions/{version}")
        return MappingVersion.from_dict(response["data"])

    def list_versions(self, mapping_id: int) -> list[MappingVersion]:
        """List all versions of a mapping."""
        response = self._http.get(f"/api/mappings/{mapping_id}/versions")
        return [MappingVersion.from_dict(v) for v in response["data"]]

    def list_snapshots(self, mapping_id: int) -> PaginatedList["Snapshot"]:
        """List all snapshots for a mapping (across all versions)."""
        from graph_olap.models import Snapshot

        response = self._http.get(f"/api/mappings/{mapping_id}/snapshots")
        return PaginatedList(
            items=[Snapshot.from_dict(s) for s in response["data"]],
            total=response["meta"]["total"],
            offset=response["meta"]["offset"],
            limit=response["meta"]["limit"],
        )

    def create(
        self,
        name: str,
        description: str,
        node_definitions: list[NodeDefinition] | list[dict],
        edge_definitions: list[EdgeDefinition] | list[dict],
        ttl: str | None = None,
        inactivity_timeout: str | None = None,
    ) -> Mapping:
        """
        Create a new mapping.

        Args:
            name: Display name
            description: Description of the mapping
            node_definitions: List of node definitions (see NodeDefinition)
            edge_definitions: List of edge definitions (see EdgeDefinition)
            ttl: Time-to-live (ISO 8601 duration, e.g., "P7D")
            inactivity_timeout: Inactivity timeout (ISO 8601 duration)

        Returns:
            Created Mapping object

        Raises:
            ValidationError: If definitions are invalid or SQL fails validation
        """
        # Convert to dicts if needed
        nodes = [n.to_dict() if hasattr(n, "to_dict") else n for n in node_definitions]
        edges = [e.to_dict() if hasattr(e, "to_dict") else e for e in edge_definitions]

        body = {
            "name": name,
            "description": description,
            "node_definitions": nodes,
            "edge_definitions": edges,
        }
        if ttl:
            body["ttl"] = ttl
        if inactivity_timeout:
            body["inactivity_timeout"] = inactivity_timeout

        response = self._http.post("/api/mappings", json=body)
        return Mapping.from_dict(response["data"])

    def update(
        self,
        mapping_id: int,
        change_description: str,
        node_definitions: list[NodeDefinition] | list[dict] | None = None,
        edge_definitions: list[EdgeDefinition] | list[dict] | None = None,
    ) -> Mapping:
        """
        Update a mapping (creates a new version).

        Args:
            mapping_id: Mapping ID to update
            change_description: Description of what changed (required)
            node_definitions: New node definitions (optional, keeps current if None)
            edge_definitions: New edge definitions (optional, keeps current if None)

        Returns:
            Updated Mapping object with new version

        Raises:
            ValidationError: If change_description is empty or definitions invalid
            PermissionDeniedError: If not owner or admin
        """
        body = {"change_description": change_description}

        if node_definitions is not None:
            body["node_definitions"] = [
                n.to_dict() if hasattr(n, "to_dict") else n for n in node_definitions
            ]
        if edge_definitions is not None:
            body["edge_definitions"] = [
                e.to_dict() if hasattr(e, "to_dict") else e for e in edge_definitions
            ]

        response = self._http.put(f"/api/mappings/{mapping_id}", json=body)
        return Mapping.from_dict(response["data"])

    def delete(self, mapping_id: int) -> None:
        """
        Delete a mapping.

        Args:
            mapping_id: Mapping ID to delete

        Raises:
            DependencyError: If snapshots exist for this mapping
            PermissionDeniedError: If not owner or admin
        """
        self._http.delete(f"/api/mappings/{mapping_id}")

    def copy(self, mapping_id: int, new_name: str) -> Mapping:
        """
        Copy a mapping to create a new one.

        Args:
            mapping_id: Source mapping ID
            new_name: Name for the new mapping

        Returns:
            New Mapping object (you own it)
        """
        response = self._http.post(
            f"/api/mappings/{mapping_id}/copy",
            json={"name": new_name},
        )
        return Mapping.from_dict(response["data"])

    def set_lifecycle(
        self,
        mapping_id: int,
        ttl: str | None = None,
        inactivity_timeout: str | None = None,
    ) -> Mapping:
        """
        Set lifecycle parameters for a mapping.

        Args:
            mapping_id: Mapping ID
            ttl: Time-to-live (ISO 8601 duration) or None to clear
            inactivity_timeout: Inactivity timeout (ISO 8601 duration) or None to clear

        Returns:
            Updated Mapping object
        """
        body = {}
        if ttl is not None:
            body["ttl"] = ttl
        if inactivity_timeout is not None:
            body["inactivity_timeout"] = inactivity_timeout

        response = self._http.put(f"/api/mappings/{mapping_id}/lifecycle", json=body)
        return Mapping.from_dict(response["data"])

    def get_tree(
        self,
        mapping_id: int,
        *,
        include_instances: bool = True,
        status: str | None = None,
    ) -> dict:
        """
        Get full resource hierarchy for a mapping.

        Returns versions -> snapshots -> instances tree structure.

        Args:
            mapping_id: Mapping ID
            include_instances: Include instance details
            status: Filter snapshots by status

        Returns:
            Tree structure with versions, snapshots, and instances
        """
        params = {"include_instances": include_instances}
        if status:
            params["status"] = status

        response = self._http.get(f"/api/mappings/{mapping_id}/tree", params=params)
        return response["data"]

    def diff_versions(
        self,
        mapping_id: int,
        from_version: int,
        to_version: int,
    ) -> dict:
        """
        Compare two mapping versions.

        Args:
            mapping_id: Mapping ID
            from_version: Base version number
            to_version: Target version number

        Returns:
            Diff with summary and detailed changes for nodes/edges

        Example:
            >>> diff = client.mappings.diff_versions(1, from_version=2, to_version=3)
            >>> print(f"Added {diff['summary']['nodes_added']} nodes")
        """
        response = self._http.get(
            f"/api/mappings/{mapping_id}/versions/{from_version}/diff/{to_version}"
        )
        return response["data"]
```

Snapshot Resource
Section titled “Snapshot Resource”import timefrom graph_olap.models import Snapshotfrom graph_olap.exceptions import TimeoutError as SDKTimeoutError
class SnapshotResource: """Manage data snapshots."""
def __init__(self, http: HTTPClient): self._http = http
def list( self, mapping_id: int | None = None, mapping_version: int | None = None, owner: str | None = None, status: str | None = None, search: str | None = None, created_after: str | None = None, created_before: str | None = None, sort_by: str = "created_at", sort_order: str = "desc", offset: int = 0, limit: int = 50, ) -> PaginatedList[Snapshot]: """List snapshots with optional filters.""" params = { "offset": offset, "limit": min(limit, 100), "sort_by": sort_by, "sort_order": sort_order, } if mapping_id is not None: params["mapping_id"] = mapping_id if mapping_version is not None: params["mapping_version"] = mapping_version if owner: params["owner"] = owner if status: params["status"] = status if search: params["search"] = search if created_after: params["created_after"] = created_after if created_before: params["created_before"] = created_before
response = self._http.get("/api/snapshots", params=params) return PaginatedList( items=[Snapshot.from_dict(s) for s in response["data"]], total=response["meta"]["total"], offset=response["meta"]["offset"], limit=response["meta"]["limit"], )
def get(self, snapshot_id: int) -> Snapshot: """Get a snapshot by ID.""" response = self._http.get(f"/api/snapshots/{snapshot_id}") return Snapshot.from_dict(response["data"])
def create( self, mapping_id: int, name: str, description: str | None = None, version: int | None = None, ttl: str | None = None, inactivity_timeout: str | None = None, ) -> Snapshot: """ Create a new snapshot from a mapping.
This triggers an async export job. The snapshot will be in 'pending' status initially, then 'creating' while exporting, then 'ready' when complete or 'failed' if there's an error.
Args: mapping_id: Source mapping ID name: Display name description: Optional description version: Mapping version to use (defaults to current) ttl: Time-to-live (ISO 8601 duration) inactivity_timeout: Inactivity timeout (ISO 8601 duration)
Returns: Snapshot object (status will be 'pending') """ body = { "mapping_id": mapping_id, "name": name, } if description: body["description"] = description if version is not None: body["version"] = version if ttl: body["ttl"] = ttl if inactivity_timeout: body["inactivity_timeout"] = inactivity_timeout
response = self._http.post("/api/snapshots", json=body) return Snapshot.from_dict(response["data"])
def wait_until_ready( self, snapshot_id: int, timeout: int = 600, poll_interval: int = 5, ) -> Snapshot: """ Wait for a snapshot to become ready.
Args: snapshot_id: Snapshot ID to wait for timeout: Maximum time to wait in seconds poll_interval: Time between status checks in seconds
Returns: Snapshot object with status='ready'
Raises: TimeoutError: If snapshot doesn't become ready within timeout SnapshotFailedError: If snapshot status becomes 'failed' """ start = time.time()
while time.time() - start < timeout: snapshot = self.get(snapshot_id)
if snapshot.status == "ready": return snapshot
if snapshot.status == "failed": raise SnapshotFailedError( f"Snapshot {snapshot_id} failed: {snapshot.error_message}" )
time.sleep(poll_interval)
raise SDKTimeoutError( f"Snapshot {snapshot_id} did not become ready within {timeout}s" )
def delete(self, snapshot_id: int) -> None: """ Delete a snapshot.
Raises: DependencyError: If active instances exist PermissionDeniedError: If not owner or admin """ self._http.delete(f"/api/snapshots/{snapshot_id}")
def update( self, snapshot_id: int, name: str | None = None, description: str | None = None, ) -> Snapshot: """Update snapshot metadata.""" body = {} if name is not None: body["name"] = name if description is not None: body["description"] = description
response = self._http.put(f"/api/snapshots/{snapshot_id}", json=body) return Snapshot.from_dict(response["data"])
def set_lifecycle( self, snapshot_id: int, ttl: str | None = None, inactivity_timeout: str | None = None, ) -> Snapshot: """Set lifecycle parameters for a snapshot.""" body = {} if ttl is not None: body["ttl"] = ttl if inactivity_timeout is not None: body["inactivity_timeout"] = inactivity_timeout
response = self._http.put(f"/api/snapshots/{snapshot_id}/lifecycle", json=body) return Snapshot.from_dict(response["data"])
def get_progress(self, snapshot_id: int) -> SnapshotProgress: """ Get detailed creation progress for a snapshot.
Args: snapshot_id: Snapshot ID
Returns: SnapshotProgress with phase, steps, and completion info """ response = self._http.get(f"/api/snapshots/{snapshot_id}/progress") return SnapshotProgress.from_api_response(response["data"])
def retry(self, snapshot_id: int) -> Snapshot: """ Retry a failed snapshot export.
Args: snapshot_id: Snapshot ID (must be in 'failed' status)
Returns: Snapshot object (status will be 'pending')
Raises: InvalidStateError: If snapshot is not in 'failed' status """ response = self._http.post(f"/api/snapshots/{snapshot_id}/retry") return Snapshot.from_dict(response["data"])
def create_and_wait( self, mapping_id: int, name: str, *, description: str | None = None, version: int | None = None, ttl: str | None = None, inactivity_timeout: str | None = None, timeout: int = 600, poll_interval: int = 5, on_progress: Callable[[str, int, int], None] | None = None, ) -> Snapshot: """ Create a snapshot and wait for it to become ready.
Convenience method that combines create() and wait_until_ready().
Args: mapping_id: Source mapping ID name: Snapshot name description: Optional description version: Mapping version to use (defaults to current) ttl: Time-to-live (ISO 8601 duration) inactivity_timeout: Inactivity timeout (ISO 8601 duration) timeout: Maximum wait time in seconds poll_interval: Time between status checks on_progress: Optional callback(phase, completed_steps, total_steps)
Returns: Snapshot object with status='ready'
Example: >>> def show_progress(phase, completed, total): ... print(f"{phase}: {completed}/{total}") >>> snapshot = client.snapshots.create_and_wait( ... mapping_id=1, ... name="Analysis", ... on_progress=show_progress, ... ) """ snapshot = self.create( mapping_id=mapping_id, name=name, description=description, version=version, ttl=ttl, inactivity_timeout=inactivity_timeout, )
start = time.time()
while time.time() - start < timeout: progress = self.get_progress(snapshot.id)
if on_progress: on_progress(progress.phase, progress.completed_steps, progress.total_steps)
if progress.status == "ready": return self.get(snapshot.id)
if progress.status == "failed": raise SnapshotFailedError( f"Snapshot {snapshot.id} failed: {progress.error_message}" )
time.sleep(poll_interval)
raise SDKTimeoutError(f"Snapshot {snapshot.id} did not complete within {timeout}s")

Instance Resource
Section titled “Instance Resource”
from graph_olap.models import Instance
from graph_olap.instance.connection import InstanceConnection
class InstanceResource: """Manage graph instances."""
def __init__(self, http: HTTPClient, config: Config): self._http = http self._config = config
def list( self, snapshot_id: int | None = None, owner: str | None = None, status: str | None = None, search: str | None = None, created_after: str | None = None, created_before: str | None = None, sort_by: str = "created_at", sort_order: str = "desc", offset: int = 0, limit: int = 50, ) -> PaginatedList[Instance]: """List instances with optional filters.""" params = { "offset": offset, "limit": min(limit, 100), "sort_by": sort_by, "sort_order": sort_order, } if snapshot_id is not None: params["snapshot_id"] = snapshot_id if owner: params["owner"] = owner if status: params["status"] = status if search: params["search"] = search if created_after: params["created_after"] = created_after if created_before: params["created_before"] = created_before
response = self._http.get("/api/instances", params=params) return PaginatedList( items=[Instance.from_dict(i) for i in response["data"]], total=response["meta"]["total"], offset=response["meta"]["offset"], limit=response["meta"]["limit"], )
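Because list() caps limit at 100 and returns one page at a time, retrieving a full result set means advancing offset until total is reached. A minimal sketch of that loop, with a stand-in fetch function in place of client.instances.list() (all names here are illustrative, not SDK API):

```python
def iter_all(fetch_page, page_size=50):
    """Yield every item across pages by advancing the offset."""
    offset = 0
    while True:
        page = fetch_page(offset=offset, limit=page_size)
        yield from page["items"]
        offset += len(page["items"])
        if offset >= page["total"] or not page["items"]:
            break

# Stand-in for client.instances.list(): 7 items served in pages of 3.
DATA = list(range(7))

def fake_fetch(offset, limit):
    return {"items": DATA[offset:offset + limit], "total": len(DATA)}

items = list(iter_all(fake_fetch, page_size=3))  # all 7 items, in order
```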
def get(self, instance_id: int) -> Instance: """Get an instance by ID.""" response = self._http.get(f"/api/instances/{instance_id}") return Instance.from_dict(response["data"])
def create( self, snapshot_id: int, name: str, description: str | None = None, ttl: str | None = None, inactivity_timeout: str | None = None, ) -> Instance: """ Create a new graph instance from a snapshot.
Args: snapshot_id: Source snapshot ID (must be 'ready') name: Display name description: Optional description ttl: Time-to-live (ISO 8601 duration) inactivity_timeout: Inactivity timeout (ISO 8601 duration)
Returns: Instance object (status will be 'starting')
Raises: InvalidStateError: If snapshot is not 'ready' ConcurrencyLimitError: If instance limits exceeded """ body = { "snapshot_id": snapshot_id, "name": name, } if description: body["description"] = description if ttl: body["ttl"] = ttl if inactivity_timeout: body["inactivity_timeout"] = inactivity_timeout
response = self._http.post("/api/instances", json=body) return Instance.from_dict(response["data"])
def wait_until_running( self, instance_id: int, timeout: int = 300, poll_interval: int = 5, ) -> Instance: """ Wait for an instance to become running.
Args: instance_id: Instance ID to wait for timeout: Maximum time to wait in seconds poll_interval: Time between status checks in seconds
Returns: Instance object with status='running'
Raises: SDKTimeoutError: If instance doesn't start within timeout InstanceFailedError: If instance status becomes 'failed' """ start = time.time()
while time.time() - start < timeout: instance = self.get(instance_id)
if instance.status == "running": return instance
if instance.status == "failed": raise InstanceFailedError( f"Instance {instance_id} failed: {instance.error_message}" )
time.sleep(poll_interval)
raise SDKTimeoutError( f"Instance {instance_id} did not start within {timeout}s" )
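All of the wait-style helpers share one pattern: poll a status source, return on the terminal success state, raise on failure or on timeout. A condensed, dependency-free sketch of that loop (the helper name and the injectable sleep are illustrative, not part of the SDK):

```python
def poll_until(get_status, *, success="running", failure="failed",
               timeout=300, poll_interval=5, sleep=None):
    """Poll get_status() until a terminal state or until the timeout elapses."""
    elapsed = 0
    while elapsed < timeout:
        status = get_status()
        if status == success:
            return status
        if status == failure:
            raise RuntimeError("resource entered failed state")
        if sleep is not None:
            sleep(poll_interval)   # real code would call time.sleep(poll_interval)
        elapsed += poll_interval

    raise TimeoutError(f"did not reach {success!r} within {timeout}s")

# Simulate an instance that needs three polls to reach 'running'.
states = iter(["starting", "starting", "running"])
result = poll_until(lambda: next(states), timeout=60, poll_interval=5)
```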
def terminate(self, instance_id: int) -> None: """ Terminate an instance.
Args: instance_id: Instance ID to terminate
Raises: PermissionDeniedError: If not owner or admin """ self._http.post(f"/api/instances/{instance_id}/terminate")
def update( self, instance_id: int, *, name: str | None = None, description: str | None = None, ) -> Instance: """ Update instance metadata.
Args: instance_id: Instance ID name: New name (optional) description: New description (optional)
Returns: Updated Instance object """ body = {} if name is not None: body["name"] = name if description is not None: body["description"] = description
response = self._http.put(f"/api/instances/{instance_id}", json=body) return Instance.from_dict(response["data"])
def get_progress(self, instance_id: int) -> InstanceProgress: """ Get detailed startup progress for an instance.
Args: instance_id: Instance ID
Returns: InstanceProgress with phase, steps, and completion info """ response = self._http.get(f"/api/instances/{instance_id}/progress") return InstanceProgress.from_api_response(response["data"])
def extend_ttl(self, instance_id: int, hours: int = 24) -> Instance: """ Extend instance TTL by specified hours from current expiry.
Convenience method implementing the standard "extend TTL" workflow. Calculates the new expiry as current_expiry + hours.
Args: instance_id: Instance ID hours: Hours to add to current TTL (default: 24)
Returns: Updated Instance object
Raises: ValidationError: If extension would exceed maximum TTL (7 days from creation)
Example: >>> instance = client.instances.extend_ttl(123) # +24 hours >>> instance = client.instances.extend_ttl(123, hours=48) # +48 hours """ ...
def create_and_wait( self, snapshot_id: int, name: str, *, description: str | None = None, ttl: str | None = None, inactivity_timeout: str | None = None, timeout: int = 300, poll_interval: int = 5, on_progress: Callable[[str, int, int], None] | None = None, ) -> Instance: """ Create an instance and wait for it to become running.
This method performs environment-aware health checks: - In-cluster mode (GRAPH_OLAP_IN_CLUSTER_MODE="true"): Uses Kubernetes service DNS - External mode: Uses ingress URL from instance.instance_url
Both modes wait for the wrapper HTTP service to be ready (not just pod status).
Args: snapshot_id: Source snapshot ID name: Display name description: Optional description ttl: Time-to-live (ISO 8601 duration) inactivity_timeout: Inactivity timeout (ISO 8601 duration) timeout: Maximum wait time in seconds poll_interval: Time between status checks on_progress: Optional callback(phase, completed_steps, total_steps)
Returns: Instance object with status='running'
Example: >>> instance = client.instances.create_and_wait( ... snapshot_id=1, ... name="Quick Analysis" ... ) >>> conn = client.instances.connect(instance.id) """ ...
def connect(self, instance_id: int) -> "InstanceConnection": """ Connect to a running instance for queries and algorithms.
This method performs environment-aware URL construction: - In-cluster mode (GRAPH_OLAP_IN_CLUSTER_MODE="true"): Uses Kubernetes service DNS - External mode: Uses ingress URL from instance.instance_url
Args: instance_id: Instance ID to connect to
Returns: InstanceConnection object for graph operations
Raises: InvalidStateError: If instance is not 'running'
Example: >>> conn = client.instances.connect(123) >>> result = conn.query("MATCH (n:Customer) RETURN n LIMIT 10") >>> df = conn.query_df("MATCH (n)-[r]->(m) RETURN n.id, m.id") """ instance = self.get(instance_id)
if instance.status != "running": raise InvalidStateError( f"Instance {instance_id} is not running (status: {instance.status})" )
return InstanceConnection( instance_url=instance.instance_url, api_key=self._config.api_key, instance_id=instance_id, )
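The in-cluster vs. external split described for connect() and create_and_wait() comes down to which base URL the connection uses. A hedged sketch of that selection — only the GRAPH_OLAP_IN_CLUSTER_MODE variable name comes from this document; the service-DNS template and function name are assumptions:

```python
import os

def resolve_instance_url(instance_id, ingress_url, env=None):
    """Pick Kubernetes service DNS in-cluster, the ingress URL otherwise."""
    env = os.environ if env is None else env
    if env.get("GRAPH_OLAP_IN_CLUSTER_MODE") == "true":
        # Hypothetical service naming scheme -- adjust to the real deployment.
        return f"http://graph-instance-{instance_id}.graph-olap.svc.cluster.local:8080"
    return ingress_url

url = resolve_instance_url(
    123,
    "https://graphs.example.com/instances/123",
    env={"GRAPH_OLAP_IN_CLUSTER_MODE": "true"},
)
```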
def create_from_mapping( self, mapping_id: int, name: str, *, description: str | None = None, mapping_version: int | None = None, snapshot_name: str | None = None, ttl: str | None = None, inactivity_timeout: str | None = None, ) -> Instance: """ Create an instance directly from a mapping.
This is a convenience method that creates both a snapshot and an instance in a single API call. The instance is returned immediately with status='waiting_for_snapshot'. A background job monitors for snapshot completion and transitions the instance to 'starting' automatically.
This method is ideal for workflows where you want to start an instance from a mapping without manually managing the snapshot lifecycle.
Args: mapping_id: Source mapping ID name: Display name for the instance description: Optional description mapping_version: Mapping version to use (defaults to current) snapshot_name: Name for auto-created snapshot (defaults to instance name) ttl: Time-to-live (ISO 8601 duration, e.g., "PT24H") inactivity_timeout: Inactivity timeout (ISO 8601 duration)
Returns: Instance object with status='waiting_for_snapshot'
Raises: NotFoundError: If mapping doesn't exist ValidationError: If mapping version is invalid ConcurrencyLimitError: If instance limits exceeded
Example: >>> instance = client.instances.create_from_mapping( ... mapping_id=1, ... name="Quick Analysis", ... ) >>> print(f"Instance {instance.id} status: {instance.status}") # Instance 123 status: waiting_for_snapshot """ body = { "mapping_id": mapping_id, "name": name, } if description: body["description"] = description if mapping_version is not None: body["mapping_version"] = mapping_version if snapshot_name: body["snapshot_name"] = snapshot_name if ttl: body["ttl"] = ttl if inactivity_timeout: body["inactivity_timeout"] = inactivity_timeout
response = self._http.post("/api/instances/from-mapping", json=body) return Instance.from_dict(response["data"])
def create_from_mapping_and_wait( self, mapping_id: int, name: str, *, description: str | None = None, mapping_version: int | None = None, snapshot_name: str | None = None, ttl: str | None = None, inactivity_timeout: str | None = None, timeout: int = 900, poll_interval: int = 5, on_progress: Callable[[str, int, int], None] | None = None, ) -> Instance: """ Create an instance from a mapping and wait until it becomes running.
This is a convenience method that combines create_from_mapping() with polling until the instance reaches 'running' status. It handles the full lifecycle: snapshot creation, instance creation, and startup.
The default timeout is 900 seconds (15 minutes) to account for both snapshot export time and instance startup time.
Args: mapping_id: Source mapping ID name: Display name for the instance description: Optional description mapping_version: Mapping version to use (defaults to current) snapshot_name: Name for auto-created snapshot (defaults to instance name) ttl: Time-to-live (ISO 8601 duration, e.g., "PT24H") inactivity_timeout: Inactivity timeout (ISO 8601 duration) timeout: Maximum wait time in seconds (default: 900) poll_interval: Time between status checks in seconds on_progress: Optional callback(phase, completed_steps, total_steps)
Returns: Instance object with status='running'
Raises: NotFoundError: If mapping doesn't exist ValidationError: If mapping version is invalid ConcurrencyLimitError: If instance limits exceeded TimeoutError: If instance doesn't become running within timeout InstanceFailedError: If instance or snapshot fails
Example: >>> def show_progress(phase, completed, total): ... print(f"{phase}: {completed}/{total}") >>> instance = client.instances.create_from_mapping_and_wait( ... mapping_id=1, ... name="Quick Analysis", ... on_progress=show_progress, ... ) >>> conn = client.instances.connect(instance.id) >>> result = conn.query("MATCH (n) RETURN count(n)") """ instance = self.create_from_mapping( mapping_id=mapping_id, name=name, description=description, mapping_version=mapping_version, snapshot_name=snapshot_name, ttl=ttl, inactivity_timeout=inactivity_timeout, )
start = time.time()
while time.time() - start < timeout: instance = self.get(instance.id)
if on_progress: # Map status to progress info if instance.status == "waiting_for_snapshot": on_progress("waiting_for_snapshot", 0, 3) elif instance.status == "starting": on_progress("starting", 1, 3) elif instance.status == "running": on_progress("running", 3, 3)
if instance.status == "running": return instance
if instance.status == "failed": raise InstanceFailedError( f"Instance {instance.id} failed: {instance.error_message}" )
time.sleep(poll_interval)
raise SDKTimeoutError( f"Instance {instance.id} did not become running within {timeout}s" )
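The status-to-progress mapping inside the polling loop can be captured as a small lookup table instead of an if/elif chain; a sketch of that refactoring (the table and helper name are illustrative):

```python
# (completed_steps, total_steps) reported to on_progress for each status.
PROGRESS_BY_STATUS = {
    "waiting_for_snapshot": (0, 3),
    "starting": (1, 3),
    "running": (3, 3),
}

def report(status, on_progress):
    """Invoke the callback only for statuses with a known mapping."""
    if status in PROGRESS_BY_STATUS:
        completed, total = PROGRESS_BY_STATUS[status]
        on_progress(status, completed, total)

seen = []
for status in ["waiting_for_snapshot", "starting", "unknown", "running"]:
    report(status, lambda phase, c, t: seen.append((phase, c, t)))
```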
def update_cpu( self, instance_id: int, cpu_cores: int, ) -> Instance: """ Update CPU cores for a running instance.
Scale the instance's CPU allocation up or down. The change takes effect without restarting the instance.
Args: instance_id: Instance ID cpu_cores: New CPU core count (1-8)
Returns: Updated Instance object
Raises: ValidationError: If cpu_cores is out of valid range InvalidStateError: If instance is not running PermissionDeniedError: If not owner or admin
Example: >>> # Scale up to 4 cores for intensive analysis >>> instance = client.instances.update_cpu(123, cpu_cores=4) >>> print(f"Instance now has {instance.cpu_cores} CPU cores") """ response = self._http.put( f"/api/instances/{instance_id}/cpu", json={"cpu_cores": cpu_cores}, ) return Instance.from_dict(response["data"])

Ops Resource
Section titled “Ops Resource”
from graph_olap.models.ops import (
    LifecycleConfig,
    ResourceLifecycleConfig,
    ConcurrencyConfig,
    MaintenanceMode,
    ExportConfig,
    ClusterHealth,
    ClusterInstances,
)
class OpsResource: """ Operations endpoints for platform configuration and cluster status.
Requires ops role for access. Use for: - Lifecycle configuration (TTL, inactivity timeouts) - Concurrency limits (per-analyst, cluster-wide) - Maintenance mode - Export configuration - Cluster health and instance monitoring
Example: >>> # Check cluster health >>> health = client.ops.get_cluster_health() >>> print(f"Status: {health.status}")
>>> # View instance distribution >>> instances = client.ops.get_cluster_instances() >>> print(f"Total: {instances.total}, Available: {instances.limits.cluster_available}") """
def __init__(self, http: HTTPClient): self._http = http
# Config Methods
def get_lifecycle_config(self) -> LifecycleConfig: """ Get lifecycle configuration for all resource types.
Returns: LifecycleConfig with mapping, snapshot, and instance settings """ response = self._http.get("/api/config/lifecycle") return LifecycleConfig.from_api_response(response["data"])
def update_lifecycle_config( self, mapping: ResourceLifecycleConfig | dict | None = None, snapshot: ResourceLifecycleConfig | dict | None = None, instance: ResourceLifecycleConfig | dict | None = None, ) -> bool: """ Update lifecycle configuration.
Args: mapping: Mapping lifecycle config (default_ttl, default_inactivity, max_ttl) snapshot: Snapshot lifecycle config instance: Instance lifecycle config
Returns: True if update succeeded """ body = {} for key, val in [("mapping", mapping), ("snapshot", snapshot), ("instance", instance)]: if val is not None: if hasattr(val, "__dataclass_fields__"): body[key] = { "default_ttl": val.default_ttl, "default_inactivity": val.default_inactivity, "max_ttl": val.max_ttl, } else: body[key] = val
self._http.put("/api/config/lifecycle", json=body) return True
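update_lifecycle_config accepts either ResourceLifecycleConfig dataclasses or plain dicts per resource type. The normalization it performs can be sketched in isolation — the dataclass below is a stand-in with the three fields the method reads, not the SDK's own model:

```python
from dataclasses import dataclass

@dataclass
class ResourceLifecycleConfig:          # stand-in mirroring the SDK model
    default_ttl: str
    default_inactivity: str
    max_ttl: str

def normalize(val):
    """Accept a dataclass or a dict; emit the wire-format dict."""
    if hasattr(val, "__dataclass_fields__"):
        return {
            "default_ttl": val.default_ttl,
            "default_inactivity": val.default_inactivity,
            "max_ttl": val.max_ttl,
        }
    return val

# Build the request body, skipping resource types left as None.
body = {
    key: normalize(val)
    for key, val in [
        ("instance", ResourceLifecycleConfig("PT24H", "PT2H", "P7D")),
        ("snapshot", {"default_ttl": "P3D", "default_inactivity": "P1D", "max_ttl": "P14D"}),
        ("mapping", None),
    ]
    if val is not None
}
```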
def get_concurrency_config(self) -> ConcurrencyConfig: """Get concurrency limits configuration.""" response = self._http.get("/api/config/concurrency") return ConcurrencyConfig.from_api_response(response["data"])
def update_concurrency_config( self, per_analyst: int | None = None, cluster_total: int | None = None, ) -> ConcurrencyConfig: """ Update concurrency limits.
Args: per_analyst: Max instances per analyst cluster_total: Max instances cluster-wide """ body = {} if per_analyst is not None: body["per_analyst"] = per_analyst if cluster_total is not None: body["cluster_total"] = cluster_total
response = self._http.put("/api/config/concurrency", json=body) return ConcurrencyConfig.from_api_response(response["data"])
def get_maintenance_mode(self) -> MaintenanceMode: """Get maintenance mode status.""" response = self._http.get("/api/config/maintenance") return MaintenanceMode.from_api_response(response["data"])
def set_maintenance_mode( self, enabled: bool, message: str = "", ) -> MaintenanceMode: """ Enable or disable maintenance mode.
Args: enabled: True to enable, False to disable message: Message shown to users during maintenance """ response = self._http.put( "/api/config/maintenance", json={"enabled": enabled, "message": message}, ) return MaintenanceMode.from_api_response(response["data"])
def get_export_config(self) -> ExportConfig: """Get export configuration.""" response = self._http.get("/api/config/export") return ExportConfig.from_api_response(response["data"])
def update_export_config(self, max_duration_seconds: int) -> ExportConfig: """Update export configuration.""" response = self._http.put( "/api/config/export", json={"max_duration_seconds": max_duration_seconds}, ) return ExportConfig.from_api_response(response["data"])
# Cluster Methods
def get_cluster_health(self) -> ClusterHealth: """ Get cluster health status.
Returns: ClusterHealth with status and component health """ response = self._http.get("/api/cluster/health") return ClusterHealth.from_api_response(response["data"])
def get_cluster_instances(self) -> ClusterInstances: """ Get cluster-wide instance summary.
Returns: ClusterInstances with counts by status, owner, and limits """ response = self._http.get("/api/cluster/instances") return ClusterInstances.from_api_response(response["data"])
# Background Jobs Methods (NEW in 2025-12)
def trigger_job( self, job_name: str, reason: str = "manual-trigger", ) -> dict[str, Any]: """ Manually trigger a background job for immediate execution.
Useful for debugging, smoke tests, and incident response.
**Rate Limiting:** 1 request per minute per job.
Args: job_name: Job to trigger: "reconciliation", "lifecycle", "export_reconciliation", "schema_cache" reason: Reason for manual trigger (audit log, 1-500 chars)
Returns: Dict with job_name, status, triggered_at, triggered_by, reason
Raises: RateLimitError: If job was triggered < 1 minute ago ValidationError: If invalid job_name ForbiddenError: If user doesn't have ops role
Example: >>> # Trigger reconciliation for smoke test >>> result = client.ops.trigger_job( ... job_name="reconciliation", ... reason="post-deployment smoke test" ... ) >>> print(f"Job {result['job_name']} status: {result['status']}") """ response = self._http.post( "/api/ops/jobs/trigger", json={"job_name": job_name, "reason": reason}, ) return response["data"]
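Because trigger_job is rate-limited to one request per minute per job, a notebook that retries in a loop may want a local guard before hitting the API and receiving a RateLimitError. A sketch with an injectable clock (the guard class is not part of the SDK):

```python
class TriggerGuard:
    """Track the last trigger time per job; refuse re-triggers inside the window."""

    def __init__(self, window_seconds=60, now=None):
        self.window = window_seconds
        self.now = now          # callable returning seconds, e.g. time.monotonic
        self.last = {}

    def allow(self, job_name):
        t = self.now()
        if job_name in self.last and t - self.last[job_name] < self.window:
            return False        # still inside the rate-limit window
        self.last[job_name] = t
        return True

clock = iter([0, 30, 90])       # fake monotonic clock readings, in seconds
guard = TriggerGuard(now=lambda: next(clock))
decisions = [guard.allow("reconciliation") for _ in range(3)]
```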
def get_job_status(self) -> dict[str, Any]: """ Get status of all background jobs.
Returns health status and last execution times for all jobs.
Returns: Dict with "jobs" list containing name, schedule, last_success_at, last_failure_at, consecutive_failures, health_status
Raises: ForbiddenError: If user doesn't have ops role
Example: >>> status = client.ops.get_job_status() >>> for job in status["jobs"]: ... if job["health_status"] == "unhealthy": ... print(f"WARNING: Job {job['name']} is unhealthy!") """ response = self._http.get("/api/ops/jobs/status") return response["data"]
def get_state(self) -> dict[str, Any]: """ Get system state snapshot.
Returns current counts of instances, snapshots, export jobs by status. Useful for operational dashboards and debugging.
Returns: Dict with "instances", "snapshots", "export_jobs" containing counts by status
Raises: ForbiddenError: If user doesn't have ops role
Example: >>> state = client.ops.get_state() >>> print(f"Running instances: {state['instances']['by_status']['running']}") >>> print(f"Instances without pod: {state['instances']['without_pod_name']}") """ response = self._http.get("/api/ops/state") return response["data"]
def get_export_jobs( self, status: str | None = None, limit: int = 100, ) -> list[dict[str, Any]]: """ Get export jobs for debugging.
Returns export jobs with detailed status for troubleshooting worker issues.
Args: status: Filter by status: "pending", "claimed", "completed", "failed" limit: Max records to return (max: 100)
Returns: List of export job dicts with id, snapshot_id, entity_type, entity_name, status, attempts, timestamps, error_message
Raises: ForbiddenError: If user doesn't have ops role
Example: >>> # Get all failed export jobs >>> failed_jobs = client.ops.get_export_jobs(status="failed") >>> for job in failed_jobs: ... print(f"Job {job['id']}: {job['error_message']}") """ params = {"limit": limit} if status: params["status"] = status
response = self._http.get("/api/ops/export-jobs", params=params) return response["data"]
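When triaging worker issues, it often helps to group the jobs returned by get_export_jobs by error message. A sketch over plain dicts shaped like the documented return value (the helper name is illustrative):

```python
from collections import Counter

def error_histogram(jobs):
    """Count failed export jobs per error message."""
    return Counter(
        job.get("error_message") or "<no message>"
        for job in jobs
        if job.get("status") == "failed"
    )

jobs = [
    {"id": 1, "status": "failed", "error_message": "S3 timeout"},
    {"id": 2, "status": "failed", "error_message": "S3 timeout"},
    {"id": 3, "status": "completed", "error_message": None},
    {"id": 4, "status": "failed", "error_message": None},
]
hist = error_histogram(jobs)    # most_common() surfaces the dominant failure
```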
def get_metrics(self) -> str: """ Get Prometheus metrics in text format.
Returns raw Prometheus metrics for parsing job health and system state. Used by E2E tests to poll for job execution.
Returns: Prometheus metrics text
Raises: ForbiddenError: If user doesn't have ops role
Example: >>> metrics_text = client.ops.get_metrics() >>> # Parse metric value >>> for line in metrics_text.split("\\n"): ... if line.startswith("instances_by_status_total"): ... print(line) """ response = self._http.get("/metrics") return response

Admin Resource
Section titled “Admin Resource”
File: packages/graph-olap-sdk/src/graph_olap/resources/admin.py
from graph_olap.http import HTTPClient
from typing import Any
class AdminResource: """ Admin-only privileged operations.
Requires admin role. Use for: - Bulk resource deletion (test cleanup, ops maintenance) - Other privileged operations
Example: >>> # Dry run to see what would be deleted >>> result = client.admin.bulk_delete( ... resource_type="instance", ... filters={"created_by": "e2e-test-user"}, ... reason="test-cleanup", ... dry_run=True ... ) >>> print(f"Would delete {result['matched_count']} instances") >>> print(f"IDs: {result['matched_ids']}") >>> >>> # Actually delete with expected_count safety check >>> result = client.admin.bulk_delete( ... resource_type="instance", ... filters={"created_by": "e2e-test-user"}, ... reason="test-cleanup", ... expected_count=result['matched_count'], ... dry_run=False ... ) >>> print(f"Deleted {result['deleted_count']} instances") """
def __init__(self, http: HTTPClient): self._http = http
def bulk_delete( self, resource_type: str, filters: dict[str, Any], reason: str, expected_count: int | None = None, dry_run: bool = False, ) -> dict[str, Any]: """ Bulk delete resources with safety filters.
**Requires:** Admin role
**Safety features:** - At least one filter required - Max 100 deletions per request - Expected count validation - Dry run mode available - Full audit logging
Args: resource_type: Resource type: "instance", "snapshot", "mapping" filters: Filters to match resources (at least one required): - name_prefix: Match resources starting with prefix - created_by: Match resources created by username - older_than_hours: Match resources older than N hours - status: Match resources with specific status reason: Reason for deletion (audit log, 1-500 chars) expected_count: Expected number to delete (safety check). Must match actual count or operation fails. dry_run: If True, return what would be deleted without deleting
Returns: Dict with dry_run, matched_count, matched_ids, deleted_count, deleted_ids, failed_ids, errors
Raises: ForbiddenError: If user doesn't have Admin role ValidationError: If no filters, matched > 100, or count mismatch
Example: >>> # Step 1: Dry run to get count >>> result = client.admin.bulk_delete( ... resource_type="instance", ... filters={ ... "name_prefix": "E2ETest-", ... "older_than_hours": 24 ... }, ... reason="cleanup-old-test-instances", ... dry_run=True ... ) >>> print(f"Would delete {result['matched_count']} instances") >>> >>> # Step 2: Actually delete with expected_count >>> result = client.admin.bulk_delete( ... resource_type="instance", ... filters={ ... "name_prefix": "E2ETest-", ... "older_than_hours": 24 ... }, ... reason="cleanup-old-test-instances", ... expected_count=result['matched_count'], # Safety check! ... dry_run=False ... ) >>> print(f"Deleted: {result['deleted_count']}") >>> print(f"Failed: {len(result['failed_ids'])}") """ response = self._http.delete( "/api/admin/resources/bulk", json={ "resource_type": resource_type, "filters": filters, "reason": reason, "expected_count": expected_count, "dry_run": dry_run, }, ) return response["data"]

Health Resource
Section titled “Health Resource”
from graph_olap.models.ops import HealthStatus
class HealthResource: """ Health check endpoints.
Use for monitoring and readiness checks: - /health - Basic liveness check - /ready - Full readiness check including database
Example: >>> health = client.health.check() >>> print(f"Status: {health.status}, Version: {health.version}")
>>> ready = client.health.ready() >>> print(f"Database: {ready.database}") """
def __init__(self, http: HTTPClient): self._http = http
def check(self) -> HealthStatus: """ Basic health check (liveness probe).
Returns: HealthStatus with status and version """ response = self._http.get("/health") return HealthStatus.from_api_response(response)
def ready(self) -> HealthStatus: """ Readiness check including database connectivity.
Returns: HealthStatus with status, version, and database status """ response = self._http.get("/ready") return HealthStatus.from_api_response(response)

Favorite Resource
Section titled “Favorite Resource”
from graph_olap.models.common import Favorite
class FavoriteResource: """ Manage user favorites/bookmarks.
Favorites allow users to quickly access frequently used resources.
Example: >>> # List all favorites >>> favorites = client.favorites.list() >>> for f in favorites: ... print(f"{f.resource_type}: {f.resource_name}")
>>> # Add a mapping to favorites >>> client.favorites.add("mapping", 1)
>>> # Remove from favorites >>> client.favorites.remove("mapping", 1) """
def __init__(self, http: HTTPClient): self._http = http
def list(self, resource_type: str | None = None) -> list[Favorite]: """ List current user's favorites.
Args: resource_type: Filter by type (mapping, snapshot, instance)
Returns: List of Favorite objects """ params = {} if resource_type: params["resource_type"] = resource_type
response = self._http.get("/api/favorites", params=params) return [Favorite.from_api_response(f) for f in response["data"]]
def add(self, resource_type: str, resource_id: int) -> Favorite: """ Add a resource to favorites.
Args: resource_type: Resource type (mapping, snapshot, instance) resource_id: Resource ID
Returns: Created Favorite object
Raises: NotFoundError: If resource doesn't exist ConflictError: If already favorited """ response = self._http.post( "/api/favorites", json={"resource_type": resource_type, "resource_id": resource_id}, ) return Favorite.from_api_response(response["data"])
def remove(self, resource_type: str, resource_id: int) -> None: """ Remove a resource from favorites.
Args: resource_type: Resource type (mapping, snapshot, instance) resource_id: Resource ID
Raises: NotFoundError: If favorite doesn't exist """ self._http.delete(f"/api/favorites/{resource_type}/{resource_id}")

Schema Resource
Section titled “Schema Resource”
from graph_olap.models.schema import CacheStats, Catalog, Column, Schema, Table
class SchemaResource: """ Browse Starburst schema metadata.
All operations use cached metadata (refreshed every 24h). Performance: ~5 ms per API call (HTTP overhead), ~1 µs for the cache lookup itself.
Example: >>> # List all catalogs >>> catalogs = client.schema.list_catalogs() >>> for cat in catalogs: ... print(f"{cat.catalog_name}: {cat.schema_count} schemas")
>>> # List schemas in a catalog >>> schemas = client.schema.list_schemas("analytics") >>> for sch in schemas: ... print(f"{sch.schema_name}: {sch.table_count} tables")
>>> # List tables in a schema >>> tables = client.schema.list_tables("analytics", "public") >>> for tbl in tables: ... print(f"{tbl.table_name} ({tbl.table_type})")
>>> # Get columns for a table >>> columns = client.schema.list_columns("analytics", "public", "users") >>> for col in columns: ... print(f"{col.column_name}: {col.data_type}")
>>> # Search for tables >>> results = client.schema.search_tables("customer", limit=50) >>> for tbl in results: ... print(f"{tbl.catalog_name}.{tbl.schema_name}.{tbl.table_name}")
>>> # Search for columns >>> results = client.schema.search_columns("email", limit=50) >>> for col in results: ... print(f"{col.catalog_name}.{col.schema_name}.{col.table_name}.{col.column_name}") """
def __init__(self, http: HTTPClient): self._http = http
def list_catalogs(self) -> list[Catalog]: """ List all cached Starburst catalogs.
Returns: List of Catalog objects (sorted by name) """ response = self._http.get("/api/schema/catalogs") return [Catalog.from_api_response(item) for item in response["data"]]
def list_schemas(self, catalog: str) -> list[Schema]: """ List all schemas in a catalog.
Args: catalog: Catalog name (e.g., "analytics")
Returns: List of Schema objects
Raises: NotFoundError: Catalog not found in cache """ response = self._http.get(f"/api/schema/catalogs/{catalog}/schemas") return [Schema.from_api_response(item) for item in response["data"]]
def list_tables(self, catalog: str, schema: str) -> list[Table]: """ List all tables in a schema.
Args: catalog: Catalog name schema: Schema name
Returns: List of Table objects
Raises: NotFoundError: Schema not found in cache """ response = self._http.get( f"/api/schema/catalogs/{catalog}/schemas/{schema}/tables" ) return [Table.from_api_response(item) for item in response["data"]]
def list_columns(self, catalog: str, schema: str, table: str) -> list[Column]: """ Get all columns for a table.
Args: catalog: Catalog name schema: Schema name table: Table name
Returns: List of Column objects (sorted by ordinal_position)
Raises: NotFoundError: Table not found in cache """ response = self._http.get( f"/api/schema/catalogs/{catalog}/schemas/{schema}/tables/{table}/columns" ) return [Column.from_api_response(item) for item in response["data"]]
def search_tables(self, pattern: str, limit: int = 100) -> list[Table]: """ Search tables by name pattern (prefix match, case-insensitive).
Args: pattern: Search pattern (e.g., "customer" matches "customers", "customer_orders") limit: Maximum results (default: 100, max: 1000)
Returns: List of Table objects matching pattern """ response = self._http.get( "/api/schema/search/tables", params={"q": pattern, "limit": limit} ) return [Table.from_api_response(item) for item in response["data"]]
def search_columns(self, pattern: str, limit: int = 100) -> list[Column]: """ Search columns by name pattern (prefix match, case-insensitive).
Args: pattern: Search pattern (e.g., "email" matches "email", "email_address") limit: Maximum results (default: 100, max: 1000)
Returns: List of Column objects matching pattern """ response = self._http.get( "/api/schema/search/columns", params={"q": pattern, "limit": limit} ) return [Column.from_api_response(item) for item in response["data"]]
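Both search endpoints use case-insensitive prefix matching with a result cap. The matching rule itself can be sketched locally (the function name and sample data are illustrative):

```python
def prefix_search(names, pattern, limit=100):
    """Case-insensitive prefix match, truncated to `limit` results."""
    p = pattern.lower()
    return [n for n in names if n.lower().startswith(p)][:limit]

tables = ["Customers", "customer_orders", "orders", "CUSTOMER_ADDRESSES"]
matches = prefix_search(tables, "customer", limit=2)  # cap applies after matching
```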
# Admin operations
def admin_refresh(self) -> dict: """ Trigger cache refresh (admin only).
Starts background task to fetch latest metadata from Starburst.
Returns: Dict with status message
Raises: ForbiddenError: If user doesn't have admin role """ response = self._http.post("/api/schema/admin/refresh") return response["data"]
def get_stats(self) -> CacheStats: """ Get cache statistics (admin only).
Returns: CacheStats object with counts and metadata
Raises: ForbiddenError: If user doesn't have admin role """ response = self._http.get("/api/schema/stats") return CacheStats.from_api_response(response["data"])

Progress Models
Section titled “Progress Models”
Models for tracking async operations (snapshot creation, instance startup):
class SnapshotProgress(BaseModel): """Detailed progress for snapshot creation."""
id: int status: str phase: str # queued, exporting, uploading, importing, ready, failed started_at: datetime | None = None steps: list[dict[str, Any]] = [] current_step: str | None = None progress_percent: int = 0 estimated_remaining_seconds: int | None = None error_message: str | None = None
@property def completed_steps(self) -> int: """Number of completed steps.""" return sum(1 for s in self.steps if s.get("status") == "completed")
@property def total_steps(self) -> int: """Total number of steps.""" return len(self.steps)
# models/instance.py
class InstanceProgress(BaseModel): """Detailed progress for instance startup."""
id: int status: str phase: str # pod_scheduled, downloading, loading_schema, loading_data, ready, failed started_at: datetime | None = None steps: list[dict[str, Any]] = [] current_step: str | None = None progress_percent: int = 0 estimated_remaining_seconds: int | None = None error_message: str | None = None
@property def completed_steps(self) -> int: """Number of completed steps.""" return sum(1 for s in self.steps if s.get("status") == "completed")
@property def total_steps(self) -> int: """Total number of steps.""" return len(self.steps)
class LockStatus(BaseModel): """Instance lock status for algorithm execution."""
locked: bool holder_id: str | None = None holder_name: str | None = None algorithm: str | None = None locked_at: datetime | None = None
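The completed_steps/total_steps properties on both progress models reduce the steps list to the counters a progress callback consumes. The same reduction over plain dicts, shaped like the step entries the API returns:

```python
def step_counters(steps):
    """Return (completed, total) from a list of step dicts."""
    completed = sum(1 for s in steps if s.get("status") == "completed")
    return completed, len(steps)

steps = [
    {"name": "export", "status": "completed"},
    {"name": "upload", "status": "completed"},
    {"name": "import", "status": "running"},
]
completed, total = step_counters(steps)   # feeds on_progress(phase, completed, total)
```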