
OpsResource

Platform operations and configuration

15 min · Advanced

Accessed via client.ops, this resource manages platform-wide operational configuration, cluster health monitoring, background jobs, and system state. All operations require the Ops role.

Configuration changes follow a read-modify-restore pattern: read the current value, apply your change, and restore the original when done.
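The read-modify-restore pattern can be wrapped in a small context manager so the restore happens even if the change raises partway through. `preserved` is a hypothetical helper, not part of the client API; the sketch uses a plain dict so it runs on its own:

```python
from contextlib import contextmanager

@contextmanager
def preserved(getter, setter):
    """Capture a config value on entry and restore it on exit,
    even if the body raises."""
    original = getter()
    try:
        yield original
    finally:
        setter(original)

# Toy in-memory config standing in for a real ops client:
config = {"default_ttl": "PT24H"}
with preserved(lambda: config["default_ttl"],
               lambda v: config.__setitem__("default_ttl", v)):
    config["default_ttl"] = "PT12H"  # temporary change
print(config["default_ttl"])  # restored to "PT24H"
```

With the real client, the getter and setter would call the corresponding get/update methods instead of touching a dict.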

1. Setup

Connect as an ops user

# Cell 1 — Parameters
USERNAME = "_FILL_ME_IN_" # Set your email before running
# Cell 2 — Connect
from graph_olap import GraphOLAPClient
client = GraphOLAPClient(username=USERNAME)
# Cell 3 — Provision
from notebook_setup import provision
personas, _ = provision(USERNAME)
analyst = personas["analyst"]
admin = personas["admin"]
ops = personas["ops"]
client = ops  # default to the ops persona; OpsResource calls require the Ops role
2. Cluster Health

Monitor cluster and component status

get_cluster_health() -> ClusterHealth

Check connectivity to all platform components (database, kubernetes, starburst).

Returns: ClusterHealth with .status (healthy, degraded, unhealthy) and .components dict of ComponentHealth objects.

health = ops.ops.get_cluster_health()
print(f"Cluster status: {health.status}")
print(f"Checked at: {health.checked_at}\n")
for name, comp in health.components.items():
    print(f"  {name}: {comp.status} ({comp.latency_ms}ms)")
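To act on a degraded cluster you usually want just the names of the failing components. A minimal sketch, using plain status strings in place of ComponentHealth objects:

```python
def unhealthy_components(components):
    """Return names of components whose status is not 'healthy'.
    Here components maps name -> status string; with the client you
    would read comp.status from each ComponentHealth instead."""
    return sorted(name for name, status in components.items()
                  if status != "healthy")

report = {"database": "healthy", "kubernetes": "degraded", "starburst": "healthy"}
print(unhealthy_components(report))  # ['kubernetes']
```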

get_cluster_instances() -> ClusterInstances

Get a cluster-wide summary of instances: totals, breakdowns by status and owner, and current capacity limits.

Returns: ClusterInstances with .total, .by_status, .by_owner, and .limits (InstanceLimits).

instances = ops.ops.get_cluster_instances()
print(f"Total instances: {instances.total}")
print(f"By status: {instances.by_status}")
print(f"Capacity: {instances.limits.cluster_used}/{instances.limits.cluster_total}")
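The limits fields make it straightforward to check remaining capacity before provisioning more instances. A small arithmetic sketch (the `headroom` helper is illustrative, not a client method):

```python
def headroom(used, total):
    """Remaining capacity and utilization fraction, guarding total == 0."""
    remaining = max(total - used, 0)
    utilization = used / total if total else 1.0
    return remaining, utilization

remaining, utilization = headroom(used=17, total=20)
print(remaining, utilization)  # 3 0.85
```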

get_metrics() -> str

Fetch Prometheus metrics from the control plane. Returns metrics for background jobs, reconciliation loops, lifecycle enforcement, and general system health in the Prometheus text/plain exposition format.

metrics = ops.ops.get_metrics()
# Show the first 5 lines
for line in metrics.splitlines()[:5]:
    print(line)
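If you need values rather than raw text, the Prometheus text format is simple enough to parse line by line. A minimal sketch that skips HELP/TYPE comments and keeps labeled series as raw keys; it assumes each sample line ends in a numeric value (no trailing timestamps, no spaces inside label values):

```python
def parse_prometheus_text(text):
    """Map 'name{labels}' -> float value from Prometheus text exposition,
    skipping blank lines and '#' comments."""
    samples = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        key, _, value = line.rpartition(" ")
        samples[key] = float(value)
    return samples

sample = (
    "# HELP jobs_total Total background jobs run\n"
    "# TYPE jobs_total counter\n"
    'jobs_total{job="reconciliation"} 42\n'
    "process_uptime_seconds 3600\n"
)
print(parse_prometheus_text(sample)["process_uptime_seconds"])  # 3600.0
```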
3. Lifecycle Configuration

Manage default TTL and inactivity settings

get_lifecycle_config()

Returns lifecycle defaults for all resource types (mapping, snapshot, instance). Each entry has default_ttl, default_inactivity, and max_ttl fields.

update_lifecycle_config(*, mapping=None, snapshot=None, instance=None) -> bool

Update lifecycle settings. Only provided values are changed; omitted values remain unchanged. Accepts ResourceLifecycleConfig objects or plain dicts.

Parameter  Type                                   Default  Description
mapping    dict | ResourceLifecycleConfig | None  None     Lifecycle config for mappings
snapshot   dict | ResourceLifecycleConfig | None  None     Lifecycle config for snapshots
instance   dict | ResourceLifecycleConfig | None  None     Lifecycle config for instances

Returns: True if update succeeded.

# Read current config
original = ops.ops.get_lifecycle_config()
print("Current instance lifecycle:")
print(f" default_ttl: {original.instance.default_ttl}")
print(f" default_inactivity: {original.instance.default_inactivity}")
print(f" max_ttl: {original.instance.max_ttl}")
# Modify instance TTL
ops.ops.update_lifecycle_config(instance={"default_ttl": "PT12H"})
updated = ops.ops.get_lifecycle_config()
print(f"Updated default_ttl: {updated.instance.default_ttl}")
# Restore original
ops.ops.update_lifecycle_config(
    instance={
        "default_ttl": original.instance.default_ttl,
        "default_inactivity": original.instance.default_inactivity,
        "max_ttl": original.instance.max_ttl,
    }
)
print(f"Restored default_ttl: {original.instance.default_ttl}")
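TTL values such as "PT12H" are ISO 8601 duration strings. Assuming the platform sticks to the day/time subset of that format (no years or months), a minimal parser into a timedelta looks like:

```python
import re
from datetime import timedelta

def parse_iso_duration(value):
    """Parse a day/time subset of ISO 8601 durations (e.g. 'PT12H',
    'P2DT30M') into a timedelta. Years and months are not supported."""
    m = re.fullmatch(
        r"P(?:(\d+)D)?(?:T(?:(\d+)H)?(?:(\d+)M)?(?:(\d+)S)?)?", value)
    if not m:
        raise ValueError(f"unsupported duration: {value!r}")
    days, hours, minutes, seconds = (int(g or 0) for g in m.groups())
    return timedelta(days=days, hours=hours, minutes=minutes, seconds=seconds)

print(parse_iso_duration("PT12H"))  # 12:00:00
```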
4. Concurrency

Control per-analyst and cluster-wide instance limits

get_concurrency_config() -> ConcurrencyConfig

Returns per-analyst and cluster-total instance limits.

update_concurrency_config(*, per_analyst, cluster_total) -> ConcurrencyConfig

Update concurrency limits. Both parameters are required.

Parameter      Type  Range   Description
per_analyst    int   1-100   Max instances per analyst
cluster_total  int   1-1000  Max instances cluster-wide

Returns: Updated ConcurrencyConfig.

# Read current limits
original_conc = ops.ops.get_concurrency_config()
print(f"Per analyst: {original_conc.per_analyst}")
print(f"Cluster total: {original_conc.cluster_total}")
# Temporarily lower limits
updated_conc = ops.ops.update_concurrency_config(per_analyst=5, cluster_total=20)
print(f"Updated per_analyst: {updated_conc.per_analyst}")
print(f"Updated cluster_total: {updated_conc.cluster_total}")
# Restore original
ops.ops.update_concurrency_config(
    per_analyst=original_conc.per_analyst,
    cluster_total=original_conc.cluster_total,
)
print(f"Restored per_analyst: {original_conc.per_analyst}")
5. Maintenance Mode

Block new instance creation during maintenance

get_maintenance_mode() -> MaintenanceMode

Returns the current maintenance mode status.

set_maintenance_mode(enabled, message="") -> MaintenanceMode

Enable or disable maintenance mode. When enabled, new instance creation is blocked and users see the provided message.

Parameter  Type  Default   Description
enabled    bool  required  Whether maintenance mode is active
message    str   ""        Message displayed to users

# Check current status
maint = ops.ops.get_maintenance_mode()
print(f"Enabled: {maint.enabled}")
print(f"Message: {maint.message}")
# Enable maintenance mode
ops.ops.set_maintenance_mode(
    enabled=True,
    message="Scheduled maintenance -- back at 14:00 UTC",
)
maint = ops.ops.get_maintenance_mode()
print(f"Enabled: {maint.enabled}")
print(f"Message: {maint.message}")
# Disable maintenance mode
ops.ops.set_maintenance_mode(enabled=False)
print(f"\nMaintenance disabled: {not ops.ops.get_maintenance_mode().enabled}")
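If the work done during a maintenance window raises, maintenance mode can accidentally be left on. Wrapping the window in try/finally guarantees it is switched off; `FakeOps` below is a stand-in for ops.ops so the sketch runs on its own:

```python
class FakeOps:
    """Stand-in for the client's ops resource, tracking only the flag."""
    def __init__(self):
        self.enabled = False

    def set_maintenance_mode(self, enabled, message=""):
        self.enabled = enabled

ops_client = FakeOps()
try:
    ops_client.set_maintenance_mode(True, "Scheduled maintenance")
    # ... perform the maintenance work here; it may raise ...
finally:
    ops_client.set_maintenance_mode(False)  # always re-enable creation

print(ops_client.enabled)  # False
```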
6. Export Configuration

Control export job duration limits

get_export_config() -> ExportConfig

Returns the export configuration, including the maximum job duration.

update_export_config(*, max_duration_seconds) -> ExportConfig

Update the maximum duration for export jobs.

Parameter             Type  Range     Description
max_duration_seconds  int   60-86400  Max export job duration in seconds

# Read current config
original_export = ops.ops.get_export_config()
print(f"Max duration: {original_export.max_duration_seconds}s")
# Update
updated_export = ops.ops.update_export_config(max_duration_seconds=7200)
print(f"Updated: {updated_export.max_duration_seconds}s")
# Restore original
ops.ops.update_export_config(
    max_duration_seconds=original_export.max_duration_seconds
)
print(f"Restored: {original_export.max_duration_seconds}s")
7. Jobs

Trigger and monitor background jobs

trigger_job(job_name, reason="manual-trigger") -> dict

Manually trigger a background job. Useful for smoke tests, manual reconciliation after incidents, or debugging.

Parameter  Type  Default           Description
job_name   str   required          One of "reconciliation", "lifecycle", "export_reconciliation", or "schema_cache"
reason     str   "manual-trigger"  Reason for the trigger (recorded in the audit log)

Rate limit: 1 trigger per job per minute.
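Because the server enforces one trigger per job per minute, a client-side guard avoids spending requests on calls that will be rejected. A sketch (the server's exact behavior on a rate-limit violation isn't documented here, so this simply checks locally before calling trigger_job):

```python
import time

class JobThrottle:
    """Track the last trigger time per job and refuse calls that would
    fall inside the one-per-minute window."""
    def __init__(self, min_interval=60.0):
        self.min_interval = min_interval
        self._last = {}

    def ready(self, job_name, now=None):
        now = time.monotonic() if now is None else now
        last = self._last.get(job_name)
        if last is not None and now - last < self.min_interval:
            return False
        self._last[job_name] = now
        return True

throttle = JobThrottle()
print(throttle.ready("reconciliation", now=0.0))   # True
print(throttle.ready("reconciliation", now=30.0))  # False
print(throttle.ready("reconciliation", now=61.0))  # True
```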

get_job_status() -> dict

Get the status of all background jobs, including next scheduled run times.

# Trigger reconciliation manually
result = ops.ops.trigger_job("reconciliation", reason="smoke-test")
print(f"Job: {result['job_name']}")
print(f"Status: {result['status']}")
# Check all job statuses
status = ops.ops.get_job_status()
for job in status["jobs"]:
    print(f"  {job['name']}: next run at {job['next_run']}")
8. Platform State

Inspect system state and export jobs

get_state() -> dict

Get a system state summary with counts of instances, snapshots, and export jobs by status.

get_export_jobs(status=None, limit=100) -> list[dict]

List export jobs for debugging. Filter by status to find stale or failed jobs.

Parameter  Type        Default  Description
status     str | None  None     Filter: "pending", "claimed", "completed", or "failed"
limit      int         100      Max jobs to return (up to 1000)

state = ops.ops.get_state()
print(f"Instances: {state['instances']['total']}")
print(f"By status: {state['instances']['by_status']}")
# Check for stale claimed export jobs
claimed = ops.ops.get_export_jobs(status="claimed")
print(f"Claimed export jobs: {len(claimed)}")
for job in claimed:
    print(f"  Job {job['id']} claimed by {job['claimed_by']}")
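A "claimed" job that never completes is the usual stale case. Assuming each job dict carries an ISO 8601 timestamp for when it was claimed (the field name claimed_at is a guess; check your actual payloads), staleness is just an age comparison:

```python
from datetime import datetime, timedelta, timezone

def stale_jobs(jobs, max_age=timedelta(hours=1), now=None):
    """Return jobs claimed longer ago than max_age.
    Assumes a 'claimed_at' ISO 8601 timestamp on each job dict."""
    now = now or datetime.now(timezone.utc)
    return [job for job in jobs
            if now - datetime.fromisoformat(job["claimed_at"]) > max_age]

now = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)
jobs = [
    {"id": "a", "claimed_at": "2024-01-01T10:00:00+00:00"},
    {"id": "b", "claimed_at": "2024-01-01T11:30:00+00:00"},
]
print([j["id"] for j in stale_jobs(jobs, now=now)])  # ['a']
```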

Key Takeaways

  • Always read-modify-restore when changing config: save the original, make your change, then restore it
  • get_cluster_health() checks all platform components in one call
  • get_cluster_instances() shows capacity and per-owner breakdowns
  • Use trigger_job() for manual reconciliation or smoke tests (rate-limited to 1/min per job)
  • get_state() and get_export_jobs() are essential for debugging platform issues