Jupyter SDK Algorithms Design

Algorithm execution for the Jupyter SDK, including native Ryugraph algorithms and NetworkX integration.


Wrapper Capabilities: Algorithm availability differs by wrapper type.

  • Ryugraph: Native algorithms + NetworkX (100+ algorithms)
  • FalkorDB: Native Cypher procedures only (NO NetworkX)

See falkordb-wrapper.design.md for FalkorDB algorithm details.


Algorithm types are defined in the shared graph-olap-schemas package to ensure consistency between SDK and wrapper:

from graph_olap_schemas import (
    AlgorithmType,               # native, networkx
    ExecutionStatus,             # pending, running, completed, failed, cancelled
    AlgorithmCategory,           # centrality, community, pathfinding, etc.
    AlgorithmExecutionResponse,
    AlgorithmInfoResponse,
    AlgorithmListResponse,
)

See graph-olap-schemas for the authoritative schema definitions. See ADR-24 for the decision rationale.
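The shared types are plain string-valued enums. As an illustration only — the authoritative definitions live in graph-olap-schemas, and this mirror assumes the member values listed in the import comments above — they behave like this:

```python
from enum import Enum


# Illustrative mirrors of the shared enums; the authoritative
# definitions live in the graph-olap-schemas package.
class AlgorithmType(str, Enum):
    NATIVE = "native"
    NETWORKX = "networkx"


class ExecutionStatus(str, Enum):
    PENDING = "pending"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"
    CANCELLED = "cancelled"


# str-valued enums compare equal to the raw strings returned by the API
status = ExecutionStatus("running")
print(status is ExecutionStatus.RUNNING)  # True
print(status == "running")                # True
```

Because the enums subclass str, SDK code can compare API payload fields against enum members without explicit conversion.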


The SDK provides two algorithm managers accessible via InstanceConnection:

  Manager          Access             Purpose
  conn.algo        AlgorithmManager   Native Ryugraph algorithms (PageRank, WCC, Louvain, etc.)
  conn.networkx    NetworkXManager    NetworkX algorithms (100+ via dynamic introspection)

Both managers support:

  • Dynamic discovery - algorithms() lists available algorithms
  • Introspection - algorithm_info() returns parameter documentation
  • Generic execution - run() executes any algorithm by name
  • Convenience methods - Typed methods for common algorithms

Native algorithms run directly in Ryugraph’s C++ engine for maximum performance.

# Discovery
>>> conn.algo.algorithms()
>>> conn.algo.algorithm_info("pagerank")
# Generic execution
>>> conn.algo.run("pagerank", node_label="Customer", property_name="pr_score")
# Convenience methods
>>> conn.algo.pagerank("Customer", "pr_score", damping=0.85)
>>> conn.algo.wcc("Account", "component_id")
>>> conn.algo.louvain("Customer", "community_id")

See jupyter-sdk.algorithms.native.design.md for full details.
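The damping argument shown above is the standard PageRank damping factor. A toy power-iteration sketch (illustrative only — not the Ryugraph implementation) shows what it controls:

```python
def pagerank(adj: dict[str, list[str]], damping: float = 0.85, iters: int = 50) -> dict[str, float]:
    """Toy power-iteration PageRank over an adjacency list."""
    nodes = list(adj)
    n = len(nodes)
    ranks = {v: 1.0 / n for v in nodes}
    for _ in range(iters):
        # Each node keeps a (1 - damping) baseline, then receives
        # damped shares of rank from its in-neighbours.
        new = {v: (1.0 - damping) / n for v in nodes}
        for v, outs in adj.items():
            if outs:
                share = damping * ranks[v] / len(outs)
                for w in outs:
                    new[w] += share
            else:
                # Dangling node: distribute its rank evenly
                for w in nodes:
                    new[w] += damping * ranks[v] / n
        ranks = new
    return ranks


graph = {"a": ["b", "c"], "b": ["c"], "c": ["a"]}
scores = pagerank(graph, damping=0.85)
print(scores)  # "c" collects the most rank; scores sum to 1
```

Higher damping weights link structure more heavily; lower damping pushes all scores toward the uniform baseline.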

NetworkX algorithms are discovered dynamically - no SDK updates needed when new algorithms are added.

# Discovery
>>> conn.networkx.algorithms(category="centrality")
>>> conn.networkx.algorithm_info("betweenness_centrality")
# Generic execution
>>> conn.networkx.run("betweenness_centrality", node_label="Customer", property_name="bc")
# Convenience methods
>>> conn.networkx.closeness_centrality("Customer", "cc")
>>> conn.networkx.eigenvector_centrality("Customer", "ec")

See jupyter-sdk.algorithms.networkx.design.md for full details.
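Dynamic discovery of this kind is typically built on runtime introspection of callables. A minimal sketch of the pattern using the stdlib inspect module — the registry and function names here are stand-ins, not the wrapper's actual code:

```python
import inspect


# Stand-in registry; the real wrapper would introspect networkx itself.
def betweenness_centrality(G, k=None, normalized=True, weight=None):
    """Compute shortest-path betweenness centrality for nodes."""


def closeness_centrality(G, u=None, distance=None):
    """Compute closeness centrality for nodes."""


REGISTRY = {f.__name__: f for f in (betweenness_centrality, closeness_centrality)}


def algorithms() -> list[str]:
    """List available algorithm names."""
    return sorted(REGISTRY)


def algorithm_info(name: str) -> dict:
    """Return parameter documentation discovered via introspection."""
    func = REGISTRY[name]
    sig = inspect.signature(func)
    return {
        "name": name,
        "doc": inspect.getdoc(func),
        "params": {
            p.name: repr(p.default) if p.default is not p.empty else "required"
            for p in sig.parameters.values()
        },
    }


info = algorithm_info("betweenness_centrality")
print(info["params"])  # {'G': 'required', 'k': 'None', 'normalized': 'True', 'weight': 'None'}
```

Because the registry is built at runtime, new algorithms become visible to `algorithms()` and `algorithm_info()` without any SDK code change — the property the design relies on.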


Add these to exceptions.py:

# Additional exceptions

class SnapshotNotReadyError(ConflictError):
    """Cannot use a snapshot that is not ready."""
    pass


class StarburstError(GraphOLAPError):
    """Error from Starburst during export."""

    def __init__(self, message: str, details: dict | None = None):
        super().__init__(message)
        self.details = details or {}


class GCSError(GraphOLAPError):
    """Error accessing Google Cloud Storage."""

    def __init__(self, message: str, details: dict | None = None):
        super().__init__(message)
        self.details = details or {}


class ExecutionNotFoundError(NotFoundError):
    """Algorithm execution not found."""
    pass

Add to client.py:

# Quick start convenience method in client.py

def quick_start(
    self,
    mapping_id: int,
    snapshot_name: str | None = None,
    instance_name: str | None = None,
    snapshot_timeout: int = 600,
    instance_timeout: int = 300,
) -> "InstanceConnection":
    """
    Convenience method to go from mapping to running instance in one call.

    Creates a snapshot from the mapping, waits for it to be ready,
    creates an instance, waits for it to start, and returns a connection.

    Args:
        mapping_id: Source mapping ID
        snapshot_name: Name for snapshot (default: auto-generated)
        instance_name: Name for instance (default: auto-generated)
        snapshot_timeout: Max seconds to wait for snapshot
        instance_timeout: Max seconds to wait for instance

    Returns:
        InstanceConnection ready for queries

    Example:
        >>> conn = client.quick_start(mapping_id=1)
        >>> df = conn.query_df("MATCH (n) RETURN n LIMIT 10")
    """
    from datetime import datetime, timezone

    timestamp = datetime.now(timezone.utc).strftime("%Y%m%d_%H%M%S")

    # Create and wait for snapshot
    snapshot = self.snapshots.create_and_wait(
        mapping_id=mapping_id,
        name=snapshot_name or f"QuickStart_{timestamp}",
        timeout=snapshot_timeout,
    )

    # Create and wait for instance
    instance = self.instances.create_and_wait(
        snapshot_id=snapshot.id,
        name=instance_name or f"QuickStart_{timestamp}",
        timeout=instance_timeout,
    )

    # Return connection
    return self.instances.connect(instance.id)
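The create_and_wait helpers that quick_start leans on amount to a poll-until-terminal loop with a timeout. A sketch of that loop under assumed names — not the actual resource manager code:

```python
import time


def wait_until(poll, is_ready, is_failed, timeout: float, interval: float = 1.0):
    """Poll until ready, raising on a failed state or on timeout."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        state = poll()
        if is_ready(state):
            return state
        if is_failed(state):
            raise RuntimeError(f"resource entered failed state: {state}")
        time.sleep(interval)
    raise TimeoutError(f"not ready after {timeout}s")


# Fake backend that becomes ready on the third poll
states = iter(["pending", "running", "ready"])
result = wait_until(
    poll=lambda: next(states),
    is_ready=lambda s: s == "ready",
    is_failed=lambda s: s == "failed",
    timeout=10,
    interval=0.01,
)
print(result)  # ready
```

Using `time.monotonic()` for the deadline keeps the timeout immune to wall-clock adjustments during long snapshot builds.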

Add to pagination.py:

from dataclasses import dataclass
from typing import Callable, Generic, Iterator, TypeVar

T = TypeVar("T")


@dataclass
class PaginatedList(Generic[T]):
    """Paginated list of items with metadata."""

    items: list[T]
    total: int
    offset: int
    limit: int

    def __iter__(self) -> Iterator[T]:
        return iter(self.items)

    def __len__(self) -> int:
        return len(self.items)

    @property
    def has_more(self) -> bool:
        return self.offset + len(self.items) < self.total


class PaginatedIterator(Generic[T]):
    """Iterator that automatically fetches all pages."""

    def __init__(
        self,
        fetch_page: Callable[[int, int], PaginatedList[T]],
        limit: int = 100,
    ):
        self._fetch_page = fetch_page
        self._limit = limit
        self._offset = 0
        self._exhausted = False
        self._current_page: list[T] = []
        self._page_index = 0

    def __iter__(self) -> Iterator[T]:
        return self

    def __next__(self) -> T:
        # Fetch the next page once the current one is consumed
        if self._page_index >= len(self._current_page):
            if self._exhausted:
                raise StopIteration
            page = self._fetch_page(self._offset, self._limit)
            self._current_page = page.items
            self._page_index = 0
            self._offset += len(page.items)
            if not page.has_more:
                self._exhausted = True
            if not self._current_page:
                raise StopIteration
        item = self._current_page[self._page_index]
        self._page_index += 1
        return item


# Add to each resource class:

def iter_all(self, **filters) -> PaginatedIterator[T]:
    """
    Iterate through all matching resources, automatically handling pagination.

    Example:
        >>> for mapping in client.mappings.iter_all(owner="alice"):
        ...     process(mapping)
    """
    def fetch_page(offset: int, limit: int) -> PaginatedList[T]:
        return self.list(**filters, offset=offset, limit=limit)

    return PaginatedIterator(fetch_page)
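The fetch_page contract can be exercised against an in-memory backend. A small standalone sketch (the Page class mirrors the shape of PaginatedList; the data is fabricated for illustration) showing how has_more drives the page walk that PaginatedIterator automates:

```python
from dataclasses import dataclass


@dataclass
class Page:
    # Same shape as the SDK's PaginatedList, renamed to keep this sketch standalone
    items: list
    total: int
    offset: int
    limit: int

    @property
    def has_more(self) -> bool:
        return self.offset + len(self.items) < self.total


DATA = [f"mapping-{i}" for i in range(5)]


def fetch_page(offset: int, limit: int) -> Page:
    """In-memory stand-in for resource.list(offset=..., limit=...)."""
    return Page(items=DATA[offset:offset + limit], total=len(DATA), offset=offset, limit=limit)


# Manual page walk -- PaginatedIterator automates exactly this loop
collected, offset, limit = [], 0, 2
while True:
    page = fetch_page(offset, limit)
    collected.extend(page.items)
    offset += len(page.items)
    if not page.has_more:
        break

print(collected)  # all five mappings, in order
```

With limit=2 this issues three requests (2 + 2 + 1 items); the final page reports has_more == False, which is what flips `_exhausted` in the iterator.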

For packaging configuration, Docker images, JupyterHub deployment, and IPython magic commands, see:

jupyter-sdk.deployment.design.md


See decision.log.md for:

  • OQ-001: Authentication mechanism (OAuth, API keys, JWT)
  • OQ-002: Jupyter connectivity (Ingress, service mesh)