API Specification: Internal APIs

REST API specification for internal communication between system components. These endpoints are not exposed to external clients.

Request headers:

Authorization: Bearer {service_account_token}
X-Component: {worker|wrapper}

Note: Internal APIs use Kubernetes service account tokens, not user API keys.

Base URL: http://control-plane.graph-olap.svc.cluster.local/api/internal
  • Internal endpoints are only accessible within the cluster network
  • All status updates go through Control Plane (single source of truth)
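
As an illustration of the headers and base URL above, the following sketch builds (but does not send) an authenticated request. The helper name `build_internal_request` is hypothetical, and the JSON `Content-Type` header is an assumption; how the service account token is obtained is left to the caller.

```python
import json
import urllib.request

BASE_URL = "http://control-plane.graph-olap.svc.cluster.local/api/internal"


def build_internal_request(path, token, component, method="GET", body=None):
    """Build a request to an internal endpoint without sending it.

    Hypothetical helper: the spec only defines the two headers and the base URL.
    """
    if component not in ("worker", "wrapper"):
        raise ValueError(f"unknown component: {component!r}")
    data = json.dumps(body).encode("utf-8") if body is not None else None
    req = urllib.request.Request(BASE_URL + path, data=data, method=method)
    req.add_header("Authorization", f"Bearer {token}")
    req.add_header("X-Component", component)
    if data is not None:
        # Assumption: request bodies are sent as JSON.
        req.add_header("Content-Type", "application/json")
    return req
```

A worker could pass the result to `urllib.request.urlopen` from inside the cluster; outside the cluster the host name does not resolve.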

POST /export-jobs/claim

Called by Export Worker to atomically claim pending jobs for processing. Uses SELECT ... FOR UPDATE SKIP LOCKED internally to prevent race conditions between workers.

Request Body:

{
"worker_id": "export-worker-abc123-xyz",
"limit": 10
}

Request Body Fields:

| Field | Type | Required | Description |
|-------|------|----------|-------------|
| worker_id | string | Yes | Unique identifier for this worker instance (e.g., pod name) |
| limit | integer | No | Maximum jobs to claim (default: 10) |

Response: 200 OK

{
"data": {
"claimed": 3,
"jobs": [
{
"id": 1,
"snapshot_id": 42,
"job_type": "node",
"entity_name": "Customer",
"status": "claimed",
"sql": "SELECT id, name FROM analytics.public.customers",
"column_names": ["id", "name"],
"starburst_catalog": "analytics",
"gcs_path": "gs://bucket/user/42/v1/123/nodes/Customer/",
"claimed_by": "export-worker-abc123-xyz",
"claimed_at": "2025-01-15T10:30:00Z"
}
]
}
}

Response: 200 OK (no jobs available)

{
"data": {
"claimed": 0,
"jobs": []
}
}

Notes:

  • Jobs are claimed atomically: no two workers can claim the same job
  • Claimed jobs carry a 10-minute lease; if a worker crashes, reconciliation resets its jobs once the lease expires
  • Workers should call this endpoint periodically to pick up new work
  • Each job includes the denormalized sql, column_names, and starburst_catalog fields, so the worker does not need a separate mapping fetch
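
The claim and lease behaviour described in these notes can be modelled with a toy in-memory store. This is only a sketch for reasoning about the semantics, not the Control Plane implementation: `JobStore` is a hypothetical name, a single lock stands in for database row locks, and resetting expired claims to `pending` is an assumption.

```python
import threading
from datetime import datetime, timedelta, timezone

LEASE_TIMEOUT = timedelta(minutes=10)  # lease duration stated in the notes


class JobStore:
    """Toy stand-in for the export_jobs table (illustration only)."""

    def __init__(self, job_ids):
        self._lock = threading.Lock()
        self.jobs = {
            i: {"id": i, "status": "pending", "claimed_by": None, "claimed_at": None}
            for i in job_ids
        }

    def claim(self, worker_id, limit=10, now=None):
        now = now or datetime.now(timezone.utc)
        # Simplification: one global lock serialises claimers. With
        # FOR UPDATE SKIP LOCKED, concurrent claimers skip locked rows
        # instead of waiting.
        with self._lock:
            pending = [j for j in self.jobs.values() if j["status"] == "pending"]
            claimed = pending[:limit]
            for job in claimed:
                job.update(status="claimed", claimed_by=worker_id, claimed_at=now)
            return [dict(j) for j in claimed]

    def reconcile(self, now=None):
        """Reset claims whose lease has expired and return how many were reset."""
        now = now or datetime.now(timezone.utc)
        with self._lock:
            expired = [
                j for j in self.jobs.values()
                if j["status"] == "claimed" and now - j["claimed_at"] >= LEASE_TIMEOUT
            ]
            for job in expired:
                # Assumption: an expired claim goes back to 'pending'.
                job.update(status="pending", claimed_by=None, claimed_at=None)
            return len(expired)
```

Two workers claiming from the same store never receive the same job, and a job abandoned by a crashed worker becomes claimable again after `reconcile` runs past the lease timeout.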

GET /export-jobs/pollable

Called by Export Worker to get jobs that are ready for Starburst status polling.

Query Parameters:

| Parameter | Type | Default | Description |
|-----------|------|---------|-------------|
| limit | integer | 10 | Maximum jobs to return |

Response: 200 OK

{
"data": {
"jobs": [
{
"id": 1,
"snapshot_id": 42,
"job_type": "node",
"entity_name": "Customer",
"status": "submitted",
"starburst_query_id": "query_20250115_abc123",
"next_uri": "https://starburst.example.com/v1/statement/query_20250115_abc123/5",
"next_poll_at": "2025-01-15T10:30:00Z",
"poll_count": 3,
"gcs_path": "gs://bucket/user/42/v1/123/nodes/Customer/"
}
]
}
}

Notes:

  • Returns jobs where status = 'submitted' AND next_poll_at <= now
  • Worker should poll Starburst for each job and update status accordingly
  • Uses FOR UPDATE SKIP LOCKED to prevent multiple workers from polling the same job
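
The selection rule in the first note can be expressed as a small filter. This sketch works on plain dictionaries shaped like the response above; the function names are hypothetical, and ordering by `next_poll_at` is an assumption because the spec does not state an order.

```python
from datetime import datetime, timezone


def parse_ts(value):
    """Parse an ISO 8601 timestamp such as '2025-01-15T10:30:00Z'."""
    return datetime.fromisoformat(value.replace("Z", "+00:00"))


def pollable_jobs(jobs, now, limit=10):
    """Return jobs where status = 'submitted' AND next_poll_at <= now."""
    due = [
        j for j in jobs
        if j["status"] == "submitted" and parse_ts(j["next_poll_at"]) <= now
    ]
    # Assumption: the job that has been due longest is returned first.
    due.sort(key=lambda j: parse_ts(j["next_poll_at"]))
    return due[:limit]
```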

PUT /snapshots/:id/status

Called by Export Worker to update snapshot status during processing.

Request Body (Creating):

{
"status": "creating",
"phase": "exporting_nodes",
"progress": {
"current_step": "Customer",
"completed_steps": 0,
"total_steps": 3
}
}

Request Body (Ready):

{
"status": "ready",
"size_bytes": 1073741824,
"node_counts": {"Customer": 10000, "Product": 5000},
"edge_counts": {"PURCHASED": 50000}
}

Request Body (Failed):

{
"status": "failed",
"error_message": "Starburst query timeout after 30 minutes",
"failed_step": "PURCHASED",
"partial_results": {
"node_counts": {"Customer": 10000, "Product": 5000},
"edge_counts": {}
}
}

Response: 200 OK

{
"data": {"updated": true}
}

POST /snapshots/:id/export-jobs

Called by Export Submitter after submitting UNLOAD queries to Starburst. Creates one export_job record per node/edge definition.

Request Body:

{
"jobs": [
{
"job_type": "node",
"entity_name": "Customer",
"starburst_query_id": "query_20250115_abc123",
"next_uri": "https://starburst.example.com/v1/statement/query_20250115_abc123/1",
"gcs_path": "gs://bucket/user/mapping/snapshot/nodes/Customer/",
"status": "running",
"submitted_at": "2025-01-15T10:30:00Z"
},
{
"job_type": "node",
"entity_name": "Product",
"starburst_query_id": "query_20250115_def456",
"next_uri": "https://starburst.example.com/v1/statement/query_20250115_def456/1",
"gcs_path": "gs://bucket/user/mapping/snapshot/nodes/Product/",
"status": "running",
"submitted_at": "2025-01-15T10:30:00Z"
},
{
"job_type": "edge",
"entity_name": "PURCHASED",
"starburst_query_id": "query_20250115_ghi789",
"next_uri": "https://starburst.example.com/v1/statement/query_20250115_ghi789/1",
"gcs_path": "gs://bucket/user/mapping/snapshot/edges/PURCHASED/",
"status": "running",
"submitted_at": "2025-01-15T10:30:00Z"
}
]
}

Request Body Fields:

| Field | Type | Required | Description |
|-------|------|----------|-------------|
| job_type | string | Yes | "node" or "edge" |
| entity_name | string | Yes | Node label or edge type name |
| starburst_query_id | string | Yes | Starburst query ID from submission |
| next_uri | string | Yes | Starburst polling URI |
| gcs_path | string | Yes | GCS destination path |
| status | string | No | Initial status (default: "running") |
| submitted_at | timestamp | No | When query was submitted (default: current time) |

Response: 201 Created

{
"data": {
"created": 3,
"jobs": [
{"id": 1, "entity_name": "Customer", "status": "running"},
{"id": 2, "entity_name": "Product", "status": "running"},
{"id": 3, "entity_name": "PURCHASED", "status": "running"}
]
}
}

Response: 404 Not Found (snapshot doesn’t exist)

Response: 409 Conflict (jobs already exist for this snapshot)
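
Before sending this request, a submitter could check the payload against the required fields listed above. The validator below is a sketch: the function name is hypothetical, and rejecting an empty `jobs` list is an assumption the spec does not confirm.

```python
REQUIRED_FIELDS = ("job_type", "entity_name", "starburst_query_id", "next_uri", "gcs_path")
JOB_TYPES = ("node", "edge")


def validate_export_jobs(payload):
    """Return a list of problems; an empty list means the payload looks valid."""
    jobs = payload.get("jobs")
    if not isinstance(jobs, list) or not jobs:
        # Assumption: at least one job is expected per request.
        return ["jobs must be a non-empty list"]
    problems = []
    for index, job in enumerate(jobs):
        for field in REQUIRED_FIELDS:
            value = job.get(field)
            if not isinstance(value, str) or not value:
                problems.append(f"jobs[{index}].{field} is required")
        job_type = job.get("job_type")
        if isinstance(job_type, str) and job_type and job_type not in JOB_TYPES:
            problems.append(f"jobs[{index}].job_type must be 'node' or 'edge'")
    return problems
```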


GET /snapshots/:id/export-jobs

Called by Export Poller to get export jobs for polling. Returns jobs that need attention.

Query Parameters:

| Parameter | Type | Default | Description |
|-----------|------|---------|-------------|
| status | string | - | Filter by status: pending, running, completed, failed |

Response: 200 OK

{
"data": {
"jobs": [
{
"id": 1,
"snapshot_id": 42,
"job_type": "node",
"entity_name": "Customer",
"status": "running",
"starburst_query_id": "query_20250115_abc123",
"next_uri": "https://starburst.example.com/v1/statement/query_20250115_abc123/5",
"gcs_path": "gs://bucket/user/mapping/snapshot/nodes/Customer/",
"row_count": null,
"size_bytes": null,
"submitted_at": "2025-01-15T10:30:00Z",
"completed_at": null,
"error_message": null
},
{
"id": 2,
"snapshot_id": 42,
"job_type": "node",
"entity_name": "Product",
"status": "completed",
"starburst_query_id": "query_20250115_def456",
"next_uri": null,
"gcs_path": "gs://bucket/user/mapping/snapshot/nodes/Product/",
"row_count": 5000,
"size_bytes": 1048576,
"submitted_at": "2025-01-15T10:30:00Z",
"completed_at": "2025-01-15T10:32:00Z",
"error_message": null
}
]
}
}

PATCH /export-jobs/:id

Called by Export Worker to update a single export job’s status, polling state, and results.

Request Body (Mark as Submitted - after Starburst accepts query):

{
"status": "submitted",
"starburst_query_id": "query_20250115_abc123",
"next_uri": "https://starburst.example.com/v1/statement/query_20250115_abc123/1",
"next_poll_at": "2025-01-15T10:30:05Z",
"poll_count": 1,
"submitted_at": "2025-01-15T10:30:00Z"
}

Request Body (Update polling state - query still running):

{
"next_uri": "https://starburst.example.com/v1/statement/query_20250115_abc123/10",
"next_poll_at": "2025-01-15T10:30:35Z",
"poll_count": 5
}

Request Body (Completed):

{
"status": "completed",
"row_count": 10000,
"size_bytes": 2097152,
"completed_at": "2025-01-15T10:35:00Z"
}

Request Body (Failed):

{
"status": "failed",
"error_message": "Starburst query failed: QUERY_EXCEEDED_TIME_LIMIT",
"completed_at": "2025-01-15T10:35:00Z"
}

Request Body Fields:

| Field | Type | Required | Description |
|-------|------|----------|-------------|
| status | string | No | New status: "submitted", "completed", "failed" |
| starburst_query_id | string | No | Query ID from Starburst (set when submitted) |
| next_uri | string | No | Updated Starburst polling URI |
| next_poll_at | timestamp | No | When to poll next (for stateless backoff) |
| poll_count | integer | No | Current poll count (for Fibonacci backoff calculation) |
| submitted_at | timestamp | No | When query was submitted to Starburst |
| row_count | integer | No | Final row count (set when completed) |
| size_bytes | integer | No | Final size in bytes (set when completed) |
| completed_at | timestamp | No | When job completed (default: current time if status=completed/failed) |
| error_message | string | No | Error details (set when failed) |
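
The `poll_count` and `next_poll_at` fields are enough to compute a backoff without any worker-side state. The sketch below shows one way this might look; the spec does not define the delay unit or an upper bound, so the one-second unit and `MAX_DELAY_SECONDS` cap are assumptions, as is the function name `next_poll_update`.

```python
from datetime import datetime, timedelta, timezone

MAX_DELAY_SECONDS = 90  # hypothetical cap, not taken from the spec


def fibonacci(n):
    """Return the n-th Fibonacci number, with fibonacci(1) == fibonacci(2) == 1."""
    a, b = 0, 1
    for _ in range(n):
        a, b = b, a + b
    return a


def next_poll_update(poll_count, now):
    """Build the polling-state fields of a PATCH body after one more poll.

    `now` is expected to be a timezone-aware UTC datetime.
    """
    new_count = poll_count + 1
    # Assumption: the delay is fibonacci(poll_count) seconds, capped.
    delay = min(fibonacci(new_count), MAX_DELAY_SECONDS)
    next_poll_at = now + timedelta(seconds=delay)
    return {
        "poll_count": new_count,
        "next_poll_at": next_poll_at.strftime("%Y-%m-%dT%H:%M:%SZ"),
    }
```

A real update would also carry the latest `next_uri` returned by Starburst. Because the count is stored on the job, any worker that picks the job up later continues the same backoff sequence.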

Row Count Semantics:

| Value | Meaning |
|-------|---------|
| null | Count not yet attempted (job still in progress) |
| 0 | Count successful, table/query returned no rows |
| n > 0 | Count successful, n rows exported |

Note: On GCS read failure, mark job as failed with error_message rather than setting row_count = 0. A null row count with completed status should never occur; the worker must always count rows before completing.
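
The row count rules above could be enforced on the worker side with a small guard. This is a sketch with a hypothetical function name and a hypothetical error message prefix; it only covers the status, row_count, size_bytes, and error_message fields.

```python
def build_completion_update(row_count, size_bytes, read_error=None):
    """Build a PATCH body for a finished job, following the row count semantics."""
    if read_error is not None:
        # A GCS read failure is reported as a failed job, never as row_count = 0.
        return {"status": "failed", "error_message": f"GCS read failed: {read_error}"}
    if row_count is None:
        # completed + null row_count should never occur.
        raise ValueError("row_count must be counted before marking a job completed")
    if row_count < 0:
        raise ValueError("row_count cannot be negative")
    return {"status": "completed", "row_count": row_count, "size_bytes": size_bytes}
```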

Response: 200 OK

{
"data": {
"id": 1,
"snapshot_id": 42,
"status": "completed",
"row_count": 10000,
"size_bytes": 2097152,
"completed_at": "2025-01-15T10:35:00Z"
}
}

Response: 404 Not Found (export job doesn’t exist)

Notes:

  • next_poll_at and poll_count enable stateless Fibonacci backoff - worker doesn’t need to track in memory
  • When job status changes to completed or failed, Control Plane checks if all jobs for snapshot are done
  • If all jobs completed: snapshot status → ready
  • If any job failed: snapshot status → failed (after all jobs finish)
  • See data.model.spec.md for the complete export_jobs table schema
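
The snapshot roll-up described in these notes reduces to a function over the statuses of all jobs in a snapshot. The sketch below is one reading of those rules, with a hypothetical function name; it returns `None` while any job is still in a non-terminal status.

```python
TERMINAL_STATUSES = {"completed", "failed"}


def snapshot_status_after_update(job_statuses):
    """Return 'ready', 'failed', or None when the snapshot should not change yet."""
    statuses = list(job_statuses)
    if not statuses or any(s not in TERMINAL_STATUSES for s in statuses):
        return None  # at least one job is still in progress
    # All jobs have finished: one failure fails the whole snapshot.
    return "failed" if "failed" in statuses else "ready"
```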

PUT /instances/:id/status

Called by Wrapper Pod to report status changes.

Request Body (Running):

{
"status": "running",
"pod_ip": "10.0.0.42",
"instance_url": "https://graph.example.com/instance-uuid/",
"graph_stats": {
"node_count": 15000,
"edge_count": 50000
}
}

Request Body (Failed):

{
"status": "failed",
"error_code": "DATA_LOAD_ERROR",
"error_message": "Failed to load edges: file not found",
"failed_phase": "loading_edges",
"stack_trace": "Traceback (most recent call last):\n File \"/app/wrapper/lifespan.py\", line 142..."
}

Request Body Fields:

| Field | Type | Required | Description |
|-------|------|----------|-------------|
| status | string | Yes | starting, running, stopping, failed |
| pod_ip | string | No | Pod IP address (set when running) |
| instance_url | string | No | Instance URL (set when running) |
| graph_stats | object | No | Node/edge counts (set when running) |
| error_code | string | No | Machine-readable error code (set when failed) |
| error_message | string | No | Human-readable error message (set when failed) |
| failed_phase | string | No | Phase when failure occurred (set when failed) |
| stack_trace | string | No | Stack trace for debugging (set when failed) |

Error Codes:

| Code | Description |
|------|-------------|
| STARTUP_FAILED | General startup failure |
| MAPPING_FETCH_ERROR | Failed to fetch mapping from Control Plane |
| SCHEMA_CREATE_ERROR | Failed to create Ryugraph schema |
| FALKORDB_SCHEMA_CREATE_ERROR | Failed to create FalkorDB schema |
| DATA_LOAD_ERROR | Failed to load Parquet data from GCS |
| DATABASE_ERROR | Ryugraph database error |
| FALKORDB_DATABASE_ERROR | FalkorDB database error |
| OOM_KILLED | Pod killed due to memory limit |

Response: 200 OK

{
"data": {"updated": true}
}
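
A wrapper could assemble the failed-status body directly from a caught exception. The following sketch uses a hypothetical helper name and takes the error code and phase from the caller; the set of codes mirrors the Error Codes table above.

```python
import traceback

INSTANCE_ERROR_CODES = {
    "STARTUP_FAILED", "MAPPING_FETCH_ERROR", "SCHEMA_CREATE_ERROR",
    "FALKORDB_SCHEMA_CREATE_ERROR", "DATA_LOAD_ERROR", "DATABASE_ERROR",
    "FALKORDB_DATABASE_ERROR", "OOM_KILLED",
}


def failed_status_payload(exc, error_code, failed_phase):
    """Build the request body for reporting a failed instance."""
    if error_code not in INSTANCE_ERROR_CODES:
        raise ValueError(f"unknown error code: {error_code!r}")
    return {
        "status": "failed",
        "error_code": error_code,
        "error_message": str(exc),
        "failed_phase": failed_phase,
        # Full formatted traceback of the exception, for debugging.
        "stack_trace": "".join(
            traceback.format_exception(type(exc), exc, exc.__traceback__)
        ),
    }
```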

PUT /instances/:id/metrics

Called periodically by Wrapper Pod to report resource usage.

Request Body:

{
"memory_usage_bytes": 536870912,
"disk_usage_bytes": 1073741824,
"last_activity_at": "2025-01-15T14:00:00Z",
"query_count_since_last": 15,
"avg_query_time_ms": 25
}

Response: 200 OK

{
"data": {"updated": true}
}
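
The `query_count_since_last` and `avg_query_time_ms` fields suggest counters that reset with each report. The sketch below shows one way a wrapper might keep them; the class name is hypothetical, averaging over the interval since the previous report is an assumption, and the class is not thread-safe.

```python
class QueryMetrics:
    """Accumulates query timings between two metrics reports."""

    def __init__(self):
        self._count = 0
        self._total_ms = 0.0

    def record(self, duration_ms):
        """Record one finished query."""
        self._count += 1
        self._total_ms += duration_ms

    def flush(self):
        """Return the query fields for one report and reset the counters."""
        count, total = self._count, self._total_ms
        self._count, self._total_ms = 0, 0.0
        # Assumption: the average covers only queries since the last report.
        avg = round(total / count) if count else 0
        return {"query_count_since_last": count, "avg_query_time_ms": avg}
```

The remaining fields of the request body (memory, disk, last activity) would be merged into the dictionary returned by `flush` before sending.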

PUT /instances/:id/progress

Called during instance startup to report loading progress.

Request Body:

{
"phase": "loading_nodes",
"steps": [
{"name": "pod_scheduled", "status": "completed"},
{"name": "schema_created", "status": "completed"},
{"name": "Customer", "type": "node", "status": "completed", "row_count": 10000},
{"name": "Product", "type": "node", "status": "in_progress", "row_count": null},
{"name": "PURCHASED", "type": "edge", "status": "pending"}
]
}

Response: 200 OK

{
"data": {"updated": true}
}

Note: Audit events (queries, algorithms) are sent directly to the company’s external observability stack, not through Control Plane.


GET /instances/:id/mapping

Called by Wrapper Pod during startup to retrieve the mapping definition for schema creation.

Response: 200 OK

Note: See requirements.md for the node_definitions/edge_definitions schema. The sql field is omitted because it is not needed for schema creation.

{
"data": {
"snapshot_id": 42,
"mapping_id": 123,
"mapping_version": 3,
"gcs_path": "gs://bucket/user-uuid/mapping-uuid/snapshot-uuid/",
"node_definitions": ["..."],
"edge_definitions": ["..."]
}
}

Response: 404 Not Found (instance doesn’t exist)


POST /instances/:id/activity

Called by Wrapper Pod when queries or algorithms are executed to update last_activity_at timestamp.

Request Body: None (empty)

Response: 204 No Content


POST /shutdown

Called by Control Plane when terminating an instance. Unlike the other endpoints in this specification, this one is served by the Wrapper Pod's internal API rather than by Control Plane.

Request Body:

{
"reason": "user_terminated",
"grace_period_seconds": 30
}

Response: 200 OK

{
"data": {
"acknowledged": true,
"active_queries": 0,
"lock_held": false
}
}

Response: 409 Conflict (lock held)

{
"error": {
"code": "SHUTDOWN_BLOCKED",
"message": "Cannot shutdown while algorithm is running",
"details": {
"lock_holder": "user-uuid",
"algorithm": "pagerank",
"running_seconds": 30
}
}
}
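
The two shutdown outcomes above can be sketched as a pure decision function on the wrapper side. The function name and the shape of the `lock` argument are hypothetical; the response bodies follow the examples in this section.

```python
def handle_shutdown(active_queries, lock=None):
    """Return (http_status, body) for a shutdown request.

    `lock` is None when no algorithm holds the lock; otherwise it is a dict
    with lock_holder, algorithm, and running_seconds (hypothetical shape).
    """
    if lock is not None:
        return 409, {
            "error": {
                "code": "SHUTDOWN_BLOCKED",
                "message": "Cannot shutdown while algorithm is running",
                "details": {
                    "lock_holder": lock["lock_holder"],
                    "algorithm": lock["algorithm"],
                    "running_seconds": lock["running_seconds"],
                },
            }
        }
    return 200, {
        "data": {
            "acknowledged": True,
            "active_queries": active_queries,
            "lock_held": False,
        }
    }
```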

Common Error Codes:

| Code | HTTP Status | Description |
|------|-------------|-------------|
| INVALID_COMPONENT | 401 | Unknown component identity |
| RESOURCE_NOT_FOUND | 404 | Instance/snapshot/export_job not found |
| INVALID_STATE_TRANSITION | 409 | Status change not allowed |
| SHUTDOWN_BLOCKED | 409 | Cannot shutdown with active algorithm |
| JOBS_ALREADY_EXIST | 409 | Export jobs already created for this snapshot |
| STARBURST_ERROR | 500 | Starburst connection/query error |
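
On the client side, these codes could be turned into a typed exception. The sketch below assumes every error response uses the same `error` envelope as the SHUTDOWN_BLOCKED example, which the spec does not state explicitly; the class and function names are hypothetical.

```python
ERROR_HTTP_STATUS = {
    "INVALID_COMPONENT": 401,
    "RESOURCE_NOT_FOUND": 404,
    "INVALID_STATE_TRANSITION": 409,
    "SHUTDOWN_BLOCKED": 409,
    "JOBS_ALREADY_EXIST": 409,
    "STARBURST_ERROR": 500,
}


class InternalApiError(Exception):
    """Error returned by an internal endpoint."""

    def __init__(self, code, message="", details=None):
        super().__init__(f"{code}: {message}")
        self.code = code
        self.message = message
        self.details = details or {}

    @property
    def http_status(self):
        """HTTP status documented for this code, or None if the code is unknown."""
        return ERROR_HTTP_STATUS.get(self.code)


def parse_error(body):
    """Convert a decoded error response body into an InternalApiError."""
    err = body["error"]
    return InternalApiError(err["code"], err.get("message", ""), err.get("details"))
```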