OpsResource
Platform operations and configuration
Accessed via client.ops, this resource manages platform-wide operational
configuration, cluster health monitoring, background jobs, and system state.
All operations require the Ops role.
Configuration changes follow a read-modify-restore pattern: read the current value, apply your change, and restore the original when done.
Setup
Connect as an ops user

```python
# Cell 1 — Parameters
USERNAME = "_FILL_ME_IN_"  # Set your email before running

# Cell 2 — Connect
from graph_olap import GraphOLAPClient

client = GraphOLAPClient(username=USERNAME)

# Cell 3 — Provision
from notebook_setup import provision

personas, _ = provision(USERNAME)
analyst = personas["analyst"]
admin = personas["admin"]
ops = personas["ops"]
client = analyst
```

Cluster Health
Monitor cluster and component status
get_cluster_health() -> ClusterHealth
Check connectivity to all platform components (database, kubernetes, starburst).
Returns: ClusterHealth with .status (healthy, degraded, unhealthy)
and .components dict of ComponentHealth objects.

```python
health = ops.ops.get_cluster_health()
print(f"Cluster status: {health.status}")
print(f"Checked at: {health.checked_at}\n")
for name, comp in health.components.items():
    print(f"  {name}: {comp.status} ({comp.latency_ms}ms)")
```

get_cluster_instances() -> ClusterInstances
Get a cluster-wide summary of instances: totals, breakdowns by status and owner, and current capacity limits.
Returns: ClusterInstances with .total, .by_status, .by_owner,
and .limits (InstanceLimits).

```python
instances = ops.ops.get_cluster_instances()
print(f"Total instances: {instances.total}")
print(f"By status: {instances.by_status}")
print(f"Capacity: {instances.limits.cluster_used}/{instances.limits.cluster_total}")
```

get_metrics() -> str
Fetch Prometheus metrics from the control plane. Returns metrics for
background jobs, reconciliation loops, lifecycle enforcement, and general
system health in text/plain format.
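Prometheus text output interleaves # HELP and # TYPE comment lines with `name{labels} value` samples. A minimal sketch that extracts sample names and values, run here over a hypothetical payload rather than a live get_metrics() response:

```python
SAMPLE = """\
# HELP jobs_total Total background job runs
# TYPE jobs_total counter
jobs_total{job="reconciliation"} 42
jobs_total{job="lifecycle"} 7
"""

def parse_samples(text):
    """Yield (metric, value) pairs, skipping comments and blank lines."""
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        metric, _, value = line.rpartition(" ")
        yield metric, float(value)

for metric, value in parse_samples(SAMPLE):
    print(metric, value)
```

For anything beyond quick inspection, a real parser such as the one in prometheus_client is a better choice than hand-rolled splitting.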
```python
metrics = ops.ops.get_metrics()
# Show the first 5 lines
for line in metrics.splitlines()[:5]:
    print(line)
```

Lifecycle Configuration
Manage default TTL and inactivity settings
get_lifecycle_config() -> LifecycleConfig
Returns lifecycle defaults for all resource types (mapping, snapshot, instance).
Each has default_ttl, default_inactivity, and max_ttl fields.
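The TTL strings in the examples below use ISO-8601 duration syntax (e.g. "PT12H"). If you need to do arithmetic on them client-side, a minimal parser for the time-only subset looks like this (a sketch; the SDK itself may simply pass these through as strings):

```python
import re
from datetime import timedelta

# Time-only ISO-8601 durations: PT<hours>H<minutes>M<seconds>S
_DURATION = re.compile(
    r"^PT(?:(?P<hours>\d+)H)?(?:(?P<minutes>\d+)M)?(?:(?P<seconds>\d+)S)?$"
)

def parse_duration(value):
    """Parse a time-only ISO-8601 duration such as 'PT12H' into a timedelta."""
    match = _DURATION.match(value)
    if match is None:
        raise ValueError(f"unsupported duration: {value!r}")
    parts = {k: int(v) for k, v in match.groupdict().items() if v is not None}
    return timedelta(**parts)

print(parse_duration("PT12H"))    # 12:00:00
print(parse_duration("PT1H30M"))  # 1:30:00
```

Date components (like "P1D") are deliberately rejected here; a full implementation would use a library such as isodate.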
update_lifecycle_config(*, mapping=None, snapshot=None, instance=None) -> bool
Update lifecycle settings. Only provided values are changed; omitted values
remain unchanged. Accepts ResourceLifecycleConfig objects or plain dicts.
| Parameter | Type | Default | Description |
|---|---|---|---|
| mapping | dict \| ResourceLifecycleConfig \| None | None | Lifecycle config for mappings |
| snapshot | dict \| ResourceLifecycleConfig \| None | None | Lifecycle config for snapshots |
| instance | dict \| ResourceLifecycleConfig \| None | None | Lifecycle config for instances |
Returns: True if update succeeded.
```python
# Read current config
original = ops.ops.get_lifecycle_config()
print("Current instance lifecycle:")
print(f"  default_ttl: {original.instance.default_ttl}")
print(f"  default_inactivity: {original.instance.default_inactivity}")
print(f"  max_ttl: {original.instance.max_ttl}")

# Modify instance TTL
ops.ops.update_lifecycle_config(instance={"default_ttl": "PT12H"})
updated = ops.ops.get_lifecycle_config()
print(f"Updated default_ttl: {updated.instance.default_ttl}")

# Restore original
ops.ops.update_lifecycle_config(
    instance={
        "default_ttl": original.instance.default_ttl,
        "default_inactivity": original.instance.default_inactivity,
        "max_ttl": original.instance.max_ttl,
    }
)
print(f"Restored default_ttl: {original.instance.default_ttl}")
```

Concurrency
Control per-analyst and cluster-wide instance limits
get_concurrency_config() -> ConcurrencyConfig
Returns per-analyst and cluster-total instance limits.
update_concurrency_config(*, per_analyst, cluster_total) -> ConcurrencyConfig
Update concurrency limits. Both parameters are required.
| Parameter | Type | Range | Description |
|---|---|---|---|
| per_analyst | int | 1–100 | Max instances per analyst |
| cluster_total | int | 1–1000 | Max instances cluster-wide |
Returns: Updated ConcurrencyConfig.
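The documented ranges can be checked client-side before making the call, which gives a clearer error than a round-trip rejection. A hypothetical pre-check helper (the server presumably enforces the same bounds):

```python
def check_concurrency_limits(per_analyst, cluster_total):
    """Validate the documented ranges before sending an update."""
    if not 1 <= per_analyst <= 100:
        raise ValueError(f"per_analyst must be in 1-100, got {per_analyst}")
    if not 1 <= cluster_total <= 1000:
        raise ValueError(f"cluster_total must be in 1-1000, got {cluster_total}")
    return per_analyst, cluster_total

print(check_concurrency_limits(5, 20))  # (5, 20)
```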
```python
# Read current limits
original_conc = ops.ops.get_concurrency_config()
print(f"Per analyst: {original_conc.per_analyst}")
print(f"Cluster total: {original_conc.cluster_total}")

# Temporarily lower limits
updated_conc = ops.ops.update_concurrency_config(per_analyst=5, cluster_total=20)
print(f"Updated per_analyst: {updated_conc.per_analyst}")
print(f"Updated cluster_total: {updated_conc.cluster_total}")

# Restore original
ops.ops.update_concurrency_config(
    per_analyst=original_conc.per_analyst,
    cluster_total=original_conc.cluster_total,
)
print(f"Restored per_analyst: {original_conc.per_analyst}")
```

Maintenance Mode
Block new instance creation during maintenance
get_maintenance_mode() -> MaintenanceMode
Returns current maintenance mode status.
set_maintenance_mode(enabled, message="") -> MaintenanceMode
Enable or disable maintenance mode. When enabled, new instance creation is blocked and users see the provided message.
| Parameter | Type | Default | Description |
|---|---|---|---|
| enabled | bool | required | Whether maintenance mode is active |
| message | str | "" | Message displayed to users |
```python
# Check current status
maint = ops.ops.get_maintenance_mode()
print(f"Enabled: {maint.enabled}")
print(f"Message: {maint.message}")

# Enable maintenance mode
ops.ops.set_maintenance_mode(
    enabled=True,
    message="Scheduled maintenance -- back at 14:00 UTC",
)
maint = ops.ops.get_maintenance_mode()
print(f"Enabled: {maint.enabled}")
print(f"Message: {maint.message}")

# Disable maintenance mode
ops.ops.set_maintenance_mode(enabled=False)
print(f"\nMaintenance disabled: {not ops.ops.get_maintenance_mode().enabled}")
```

Export Configuration
Control export job duration limits
get_export_config() -> ExportConfig
Returns export configuration including the maximum job duration.
update_export_config(*, max_duration_seconds) -> ExportConfig
Update the maximum duration for export jobs.
| Parameter | Type | Range | Description |
|---|---|---|---|
| max_duration_seconds | int | 60–86400 | Max export job duration in seconds |
```python
# Read current config
original_export = ops.ops.get_export_config()
print(f"Max duration: {original_export.max_duration_seconds}s")

# Update
updated_export = ops.ops.update_export_config(max_duration_seconds=7200)
print(f"Updated: {updated_export.max_duration_seconds}s")

# Restore original
ops.ops.update_export_config(
    max_duration_seconds=original_export.max_duration_seconds
)
print(f"Restored: {original_export.max_duration_seconds}s")
```

Jobs
Trigger and monitor background jobs
trigger_job(job_name, reason="manual-trigger") -> dict
Manually trigger a background job. Useful for smoke tests, manual reconciliation after incidents, or debugging.
| Parameter | Type | Default | Description |
|---|---|---|---|
| job_name | str | required | "reconciliation", "lifecycle", "export_reconciliation", or "schema_cache" |
| reason | str | "manual-trigger" | Reason for trigger (audit log) |
Rate limit: 1 trigger per job per minute.
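A script that fires several triggers can track when each job was last triggered and skip calls that would hit the limit. A sketch with a stub standing in for the real ops.ops.trigger_job call:

```python
import time

TRIGGER_INTERVAL = 60.0  # documented limit: 1 trigger per job per minute

_last_triggered = {}

def trigger_with_rate_limit(job_name, trigger, now=time.monotonic):
    """Call trigger(job_name) only if the per-job rate limit allows it.

    Returns True if the trigger fired, False if it was skipped."""
    last = _last_triggered.get(job_name)
    current = now()
    if last is not None and current - last < TRIGGER_INTERVAL:
        return False
    trigger(job_name)
    _last_triggered[job_name] = current
    return True

# Stub standing in for ops.ops.trigger_job (hypothetical)
fired = []
def fake_trigger(job_name):
    fired.append(job_name)

trigger_with_rate_limit("reconciliation", fake_trigger)  # fires
trigger_with_rate_limit("reconciliation", fake_trigger)  # skipped: too soon
trigger_with_rate_limit("lifecycle", fake_trigger)       # fires: separate job
print(fired)  # ['reconciliation', 'lifecycle']
```

Note the limit is tracked per job name, so triggering different jobs back to back is fine.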
get_job_status() -> dict
Get status of all background jobs including next scheduled run times.
```python
# Trigger reconciliation manually
result = ops.ops.trigger_job("reconciliation", reason="smoke-test")
print(f"Job: {result['job_name']}")
print(f"Status: {result['status']}")

# Check all job statuses
status = ops.ops.get_job_status()
for job in status["jobs"]:
    print(f"  {job['name']}: next run at {job['next_run']}")
```

Platform State
Inspect system state and export jobs
get_state() -> dict
Get a system state summary with counts of instances, snapshots, and export jobs by status.
get_export_jobs(status=None, limit=100) -> list[dict]
List export jobs for debugging. Filter by status to find stale or failed jobs.
| Parameter | Type | Default | Description |
|---|---|---|---|
| status | str \| None | None | "pending", "claimed", "completed", or "failed" |
| limit | int | 100 | Max jobs to return (max 1000) |
```python
state = ops.ops.get_state()
print(f"Instances: {state['instances']['total']}")
print(f"By status: {state['instances']['by_status']}")

# Check for stale claimed export jobs
claimed = ops.ops.get_export_jobs(status="claimed")
print(f"Claimed export jobs: {len(claimed)}")
for job in claimed:
    print(f"  Job {job['id']} claimed by {job['claimed_by']}")
```

Key Takeaways
- Always read-modify-restore when changing config: save the original, make your change, then restore it
- get_cluster_health() checks all platform components in one call
- get_cluster_instances() shows capacity and per-owner breakdowns
- Use trigger_job() for manual reconciliation or smoke tests (rate-limited to 1/min per job)
- get_state() and get_export_jobs() are essential for debugging platform issues