Deep Platform Configuration
Tutorial
Deep Platform Configuration
Configure lifecycle, concurrency, maintenance, and background jobs
What You'll Learn
- Lifecycle Configuration — Read and modify TTLs for mappings, snapshots, and instances
- Concurrency Limits — Adjust per-analyst and cluster-wide instance caps
- Maintenance Mode — Enable/disable with custom messages
- Export Configuration — Control export duration limits
- Background Jobs — Trigger and monitor reconciliation, lifecycle, and cache jobs
- Platform State — Inspect system state and export job history
- Maintenance Workflow — Complete end-to-end maintenance window procedure
1
Setup
Connect as Dave (ops persona)
# Cell 1 — Parameters
# Placeholder identity: set this to your email before running.
USERNAME = "_FILL_ME_IN_"

# Cell 2 — Connect
# Build an SDK client for the configured user.
from graph_olap import GraphOLAPClient

client = GraphOLAPClient(username=USERNAME)

# Cell 3 — Provision
# provision() is unpacked as a (personas, conn) pair; personas is indexed
# by role name below.
from notebook_setup import provision

personas, conn = provision(USERNAME)
analyst, admin, ops = personas["analyst"], personas["admin"], personas["ops"]

# The rest of the tutorial drives the platform through the ops persona.
print(f"Connected to: {ops._config.api_url}")
2
Lifecycle Configuration
TTLs and inactivity timeouts for mappings, snapshots, and instances
# Fetch the lifecycle settings. Each resource type (mapping, snapshot,
# instance) carries three fields:
#   default_ttl        - how long before automatic expiry
#   default_inactivity - how long idle before cleanup
#   max_ttl            - upper bound analysts can request
lifecycle = ops.ops.get_lifecycle_config()


def _or_na(value):
    """Render a setting as text, substituting "N/A" when it is falsy."""
    return str(value or "N/A")


# Print a fixed-width table with one row per resource type.
print(f"{'Resource':<12} {'Default TTL':<14} {'Inactivity':<14} {'Max TTL':<10}")
print("-" * 52)
resource_rows = [
    (label, getattr(lifecycle, label.lower()))
    for label in ("Mapping", "Snapshot", "Instance")
]
for name, settings in resource_rows:
    ttl = _or_na(settings.default_ttl)
    inact = _or_na(settings.default_inactivity)
    max_t = _or_na(settings.max_ttl)
    print(f"{name:<12} {ttl:<14} {inact:<14} {max_t:<10}")

# Remember the instance TTL so it can be put back after the demo change.
original_instance_ttl = lifecycle.instance.default_ttl
print(f"Original instance TTL: {original_instance_ttl}")

# Modify: extend the instance TTL to 24 hours (ISO 8601 duration).
extended_ttl = {"default_ttl": "PT24H"}
ops.ops.update_lifecycle_config(instance=extended_ttl)

# Verify: re-read the configuration and show the new value.
updated = ops.ops.get_lifecycle_config()
print(f"Updated instance TTL: {updated.instance.default_ttl}")

# Restore: write the saved value back and confirm it.
previous_ttl = {"default_ttl": original_instance_ttl}
ops.ops.update_lifecycle_config(instance=previous_ttl)
restored = ops.ops.get_lifecycle_config()
print(f"Restored instance TTL: {restored.instance.default_ttl}")
3
Concurrency Limits
Per-analyst and cluster-wide instance caps
# Read the current concurrency limits and keep them for the restore step.
concurrency = ops.ops.get_concurrency_config()
original_per_analyst = concurrency.per_analyst
original_cluster_total = concurrency.cluster_total

print(f"Current per_analyst: {original_per_analyst}")
print(f"Current cluster_total: {original_cluster_total}")

# Temporarily lower both caps (e.g., during maintenance prep).
temporary_limits = {"per_analyst": 2, "cluster_total": 20}
ops.ops.update_concurrency_config(**temporary_limits)
lowered = ops.ops.get_concurrency_config()
print(f"\nLowered per_analyst: {lowered.per_analyst}")
print(f"Lowered cluster_total: {lowered.cluster_total}")

# Put the saved limits back and re-read to confirm.
saved_limits = {
    "per_analyst": original_per_analyst,
    "cluster_total": original_cluster_total,
}
ops.ops.update_concurrency_config(**saved_limits)
restored = ops.ops.get_concurrency_config()
print(f"\nRestored per_analyst: {restored.per_analyst}")
print(f"Restored cluster_total: {restored.cluster_total}")
4
Maintenance Mode
Block new instance creation during maintenance windows
# Check current maintenance mode and remember it, so this demo leaves the
# platform exactly as it found it (read-modify-restore). Disabling
# unconditionally at the end would switch off a maintenance window that
# was already active before the tutorial ran.
maint = ops.ops.get_maintenance_mode()
original_maint_enabled = maint.enabled
# NOTE(review): assumes message may be None/empty while maintenance is
# off; "" is the value this tutorial passes when disabling.
original_maint_message = maint.message or ""
print(f"Maintenance mode: {'ENABLED' if maint.enabled else 'DISABLED'}")

try:
    # Enable maintenance mode with a user-visible message.
    ops.ops.set_maintenance_mode(
        enabled=True,
        message="Scheduled maintenance — database upgrade in progress",
    )
    maint = ops.ops.get_maintenance_mode()
    print(f"\nMaintenance mode: {'ENABLED' if maint.enabled else 'DISABLED'}")
    print(f"Message: {maint.message}")
finally:
    # Restore the state found at the start (normally: disabled, empty
    # message), even if the verification step above failed.
    ops.ops.set_maintenance_mode(
        enabled=original_maint_enabled,
        message=original_maint_message,
    )

maint = ops.ops.get_maintenance_mode()
print(f"\nMaintenance mode: {'ENABLED' if maint.enabled else 'DISABLED'}")
5
Export Configuration
Control export duration limits
# Two hours, expressed in seconds, for the temporary large-export window.
LARGE_EXPORT_WINDOW_SECONDS = 2 * 60 * 60

# Read the current export config and keep the limit for the restore step.
export = ops.ops.get_export_config()
original_max_duration = export.max_duration_seconds
print(f"Current max export duration: {original_max_duration}s")

# Raise the limit, then re-read to confirm the change.
ops.ops.update_export_config(max_duration_seconds=LARGE_EXPORT_WINDOW_SECONDS)
updated = ops.ops.get_export_config()
print(f"Updated max export duration: {updated.max_duration_seconds}s")

# Write the saved limit back and confirm it.
ops.ops.update_export_config(max_duration_seconds=original_max_duration)
restored = ops.ops.get_export_config()
print(f"Restored max export duration: {restored.max_duration_seconds}s")
6
Background Jobs
Trigger and monitor reconciliation, lifecycle, and cache jobs
# View scheduled job statuses.
# Each job has a name and next scheduled run time.
job_status = ops.ops.get_job_status()

print(f"{'Job Name':<25} {'Next Run':>25}")
print("-" * 52)
for job in job_status["jobs"]:
    # Convert to text before applying the alignment spec: formatting None
    # with ">25" raises TypeError, and a datetime would interpret ">25" as
    # a strftime pattern. A missing next run is shown as "N/A".
    next_run = str(job["next_run"] or "N/A")
    print(f"{job['name']:<25} {next_run:>25}")

# Manually trigger a reconciliation job.
# Available jobs: reconciliation, lifecycle, export_reconciliation, schema_cache
# Rate-limited to 1 trigger per minute per job.
result = ops.ops.trigger_job(
    job_name="reconciliation",
    reason="tutorial-demo",
)
print(f"Job: {result['job_name']}")
print(f"Status: {result['status']}")
7
Platform State
System state summary and export job history
# Overall platform state: a nested dict with instance/snapshot/export counts.
state = ops.ops.get_state()

# Instance counts are indexed directly; snapshot and export totals fall
# back to "N/A" when their section is absent.
instance_counts = state["instances"]
print(f"Instances: {instance_counts['total']}")
print(f"By status: {instance_counts['by_status']}")
snapshot_counts = state.get("snapshots", {})
print(f"Snapshots: {snapshot_counts.get('total', 'N/A')}")
export_counts = state.get("exports", {})
print(f"Exports: {export_counts.get('total', 'N/A')}")

# Query export jobs by status.
# Valid statuses: pending, claimed, completed, failed
completed_exports = ops.ops.get_export_jobs(status="completed", limit=5)
print(f"Recent completed exports: {len(completed_exports)}")
# Show at most the first three completed jobs.
preview = completed_exports[:3]
for job in preview:
    print(f" {job}")

# Check for any failed exports.
failed_exports = ops.ops.get_export_jobs(status="failed", limit=10)
print(f"\nFailed exports: {len(failed_exports)}")

# Prometheus metrics: raw text/plain output from the control plane.
metrics = ops.ops.get_metrics()

print("Prometheus metrics (first 5 lines):")
metric_head = metrics.splitlines()[:5]
for line in metric_head:
    print(f" {line}")
8
Maintenance Window Workflow
Complete end-to-end maintenance procedure
# Complete maintenance window workflow:
# 1. Save current settings
# 2. Lower concurrency limits
# 3. Enable maintenance mode
# 4. Trigger a background job
# 5. Verify platform state
# 6. Disable maintenance mode
# 7. Restore original settings
#
# Steps 6 and 7 run in `finally` blocks so that a failure anywhere in
# steps 2-5 cannot leave the platform stuck in maintenance mode with
# lowered concurrency limits.
from graph_olap.exceptions import ConcurrencyLimitError

# --- Step 1: Save current settings ---
concurrency = ops.ops.get_concurrency_config()
saved_per_analyst = concurrency.per_analyst
saved_cluster_total = concurrency.cluster_total
print("Step 1: Saved current settings")
print(f" per_analyst={saved_per_analyst}, cluster_total={saved_cluster_total}")

try:
    # --- Step 2: Lower concurrency limits ---
    ops.ops.update_concurrency_config(per_analyst=1, cluster_total=10)
    print("\nStep 2: Lowered concurrency limits")
    lowered = ops.ops.get_concurrency_config()
    print(f" per_analyst={lowered.per_analyst}, cluster_total={lowered.cluster_total}")

    # --- Step 3: Enable maintenance mode ---
    ops.ops.set_maintenance_mode(
        enabled=True,
        message="Scheduled maintenance window — expect degraded service",
    )
    print("\nStep 3: Maintenance mode ENABLED")

    # --- Step 4: Trigger a background job ---
    # Use lifecycle (not reconciliation, which was triggered earlier and is
    # rate-limited).
    try:
        trigger = ops.ops.trigger_job(job_name="lifecycle", reason="maintenance-window")
    except ConcurrencyLimitError:
        print("\nStep 4: Job rate-limited (already triggered recently) — skipping")
    else:
        print(f"\nStep 4: Lifecycle job triggered (status={trigger['status']})")

    # --- Step 5: Verify platform state ---
    state = ops.ops.get_state()
    job_status = ops.ops.get_job_status()
    print("\nStep 5: Platform state check")
    print(f" Instances: {state['instances']['total']}")
    print(f" By status: {state['instances']['by_status']}")
    print(f" Scheduled jobs: {len(job_status['jobs'])}")
finally:
    try:
        # --- Step 6: Disable maintenance mode ---
        ops.ops.set_maintenance_mode(enabled=False, message="")
        print("\nStep 6: Maintenance mode DISABLED")
    finally:
        # --- Step 7: Restore original settings ---
        # Nested so the limits are restored even if step 6 itself fails.
        ops.ops.update_concurrency_config(
            per_analyst=saved_per_analyst,
            cluster_total=saved_cluster_total,
        )
        restored = ops.ops.get_concurrency_config()
        print("\nStep 7: Restored original settings")
        print(f" per_analyst={restored.per_analyst}, cluster_total={restored.cluster_total}")
print("\nMaintenance window complete.")
Key Takeaways
- get_lifecycle_config() returns TTL settings for mappings, snapshots, and instances — use update_lifecycle_config() to modify them
- update_concurrency_config() sets the per_analyst and cluster_total caps — lower them during maintenance
- set_maintenance_mode() blocks new instance creation and shows a message to users
- update_export_config() controls the maximum export duration in seconds
- trigger_job() manually fires the reconciliation, lifecycle, export_reconciliation, or schema_cache jobs (rate-limited to 1 trigger per minute per job)
- get_state() and get_export_jobs() provide platform-wide visibility
- Always use the read-modify-restore pattern: save originals before changing, restore when done