Mapping Authoring
Tutorial
Mapping Authoring
Design graph schemas from SQL tables and manage mapping versions
What You'll Learn
- Catalog Browsing - Discover tables and columns in Starburst catalogs
- Node Definitions - Design nodes with primary keys and typed properties
- Edge Definitions - Define relationships between node types
- Mapping Lifecycle - Create, update, version, diff, copy, and delete mappings
1
Setup
Connect to the platform and provision tutorial resources
# Cell 1 — Parameters
# Placeholder value: set your email here before running the rest of the cells.
USERNAME = "_FILL_ME_IN_"

# Cell 2 — Connect and provision
from graph_olap import GraphOLAPClient

# SDK client created for the tutorial user.
client = GraphOLAPClient(username=USERNAME)

from notebook_setup import provision, make_namespace

# provision() returns a pair: a collection of personas looked up by role name,
# and a second value bound to `conn`.
personas, conn = provision(USERNAME)
analyst = personas["analyst"]

# Namespace derived from the username; later cells embed it in mapping names.
namespace = make_namespace(USERNAME)
print(f"Connected | namespace: {namespace}")
2
Browsing the Catalog
Discover catalogs, schemas, tables, and columns in Starburst
# Ensure schema cache is populated before browsing
# The cache is refreshed every 24h; admin_refresh() forces an immediate update
import time

admin = personas["admin"]
admin.schema.admin_refresh()
print("Schema cache refresh triggered, waiting for data...")

# Poll up to 24 times, sleeping 5 seconds before each attempt, and stop as
# soon as the analyst persona can see at least one catalog.
for _ in range(24):
    time.sleep(5)
    catalogs = analyst.schema.list_catalogs()
    if catalogs:
        break
    print(" waiting...")

print(f"\nCatalogs ({len(catalogs)}):")
for cat in catalogs:
    print(f" {cat.catalog_name}")

# List schemas in the first catalog (falls back to "bigquery" when the
# catalog list is still empty after polling).
if catalogs:
    catalog_name = catalogs[0].catalog_name
else:
    catalog_name = "bigquery"
schemas = analyst.schema.list_schemas(catalog_name)
print(f"Schemas in '{catalog_name}' ({len(schemas)}):")
for s in schemas:
    print(f" {s.schema_name}")

# List tables — prefer the first schema whose name contains "graph_olap";
# otherwise use the first schema, or the literal default when there are none.
fallback_schema = schemas[0].schema_name if schemas else "graph_olap_e2e"
schema_name = next(
    (s.schema_name for s in schemas if "graph_olap" in s.schema_name),
    fallback_schema,
)
tables = analyst.schema.list_tables(catalog_name, schema_name)
print(f"Tables in '{catalog_name}.{schema_name}' ({len(tables)}):")
for t in tables:
    print(f" {t.table_name}")

# List columns in the account table — the first table whose lower-cased name
# contains "acct", with the same first-item / literal fallback as above.
fallback_table = tables[0].table_name if tables else "bis_acct_dh"
cust_table = next(
    (t.table_name for t in tables if "acct" in t.table_name.lower()),
    fallback_table,
)
columns = analyst.schema.list_columns(catalog_name, schema_name, cust_table)
print(f"Columns in '{cust_table}' ({len(columns)}):")
for col in columns:
    print(f" {col.column_name:20s} {col.data_type}")
3
Designing Node Definitions
Create a Customer node with typed properties
from graph_olap.models.mapping import NodeDefinition, EdgeDefinition, PropertyDefinition
from graph_olap_schemas import RyugraphType

# RyugraphType defines the supported property types
print("Supported property types:")
for rt in RyugraphType:
    print(f" {rt.value}")

# Define a Customer node from the bis_acct_dh table.
# The query yields one row per psdo_cust_id: the id cast to VARCHAR plus three
# aggregated columns that line up with the properties declared below.
customer_sql = (
    "SELECT DISTINCT CAST(psdo_cust_id AS VARCHAR) AS id, "
    "MIN(bk_sectr) AS bk_sectr, "
    "COUNT(DISTINCT psdo_acno) AS account_count, "
    "MIN(acct_stus) AS acct_stus "
    "FROM bigquery.graph_olap_e2e.bis_acct_dh "
    "WHERE 1=1 GROUP BY psdo_cust_id"
)
customer_node = NodeDefinition(
    label="Customer",
    sql=customer_sql,
    primary_key={"name": "id", "type": "STRING"},
    properties=[
        PropertyDefinition(name=prop_name, type=prop_type)
        for prop_name, prop_type in (
            ("bk_sectr", "STRING"),
            ("account_count", "INT64"),
            ("acct_stus", "STRING"),
        )
    ],
)

# Echo the definition back so the reader can check it.
print(f"\nNode label: {customer_node.label}")
print(f"Primary key: {customer_node.primary_key}")
print(f"Properties: {[p.name for p in customer_node.properties]}")
print(f"SQL preview: {customer_node.sql[:60]}...")
4
Designing Edge Definitions
Define a SHARES_ACCOUNT relationship between customers
from graph_olap.models.mapping import EdgeDefinition

# Define a SHARES_ACCOUNT edge via the account table.
# The self-join pairs customer ids that appear on the same account number;
# the "<" comparison keeps only one ordering of each pair and drops self-pairs.
shares_account_sql = (
    "SELECT DISTINCT "
    "CAST(a.psdo_cust_id AS VARCHAR) AS from_id, "
    "CAST(b.psdo_cust_id AS VARCHAR) AS to_id "
    "FROM bigquery.graph_olap_e2e.bis_acct_dh a "
    "JOIN bigquery.graph_olap_e2e.bis_acct_dh b "
    "ON a.psdo_acno = b.psdo_acno "
    "AND a.psdo_cust_id < b.psdo_cust_id"
)
shares_account_edge = EdgeDefinition(
    type="SHARES_ACCOUNT",
    from_node="Customer",
    to_node="Customer",
    sql=shares_account_sql,
    from_key="from_id",
    to_key="to_id",
)

# Echo the definition back so the reader can check it.
print(f"Edge type: {shares_account_edge.type}")
print(f"From node: {shares_account_edge.from_node}")
print(f"To node: {shares_account_edge.to_node}")
print(f"From key: {shares_account_edge.from_key}")
print(f"To key: {shares_account_edge.to_key}")
print(f"SQL preview: {shares_account_edge.sql[:60]}...")
5
Creating a Mapping
Combine node and edge definitions into a mapping
# Create a mapping with a unique namespaced name
mapping_name = f"tutorial-authoring-{namespace}"

# One call registers the Customer node and the SHARES_ACCOUNT edge together.
mapping = analyst.mappings.create(
    name=mapping_name,
    node_definitions=[customer_node],
    edge_definitions=[shares_account_edge],
    description="Tutorial mapping: Customer graph with shared accounts",
)

# Summarise what the service returned.
print(f"Created mapping: {mapping.name}")
print(f" ID: {mapping.id}")
print(f" Version: {mapping.current_version}")
node_labels = [n.label for n in mapping.node_definitions]
print(f" Nodes: {node_labels}")
edge_types = [e.type for e in mapping.edge_definitions]
print(f" Edges: {edge_types}")
6
Evolving with Versions
Update the mapping and track version history
# Add a new Account node to the mapping (version 2)
account_sql = (
    "SELECT DISTINCT CAST(psdo_acno AS VARCHAR) AS id"
    " FROM bigquery.graph_olap_e2e.bis_acct_dh WHERE 1=1"
)
account_node = NodeDefinition(
    label="Account",
    sql=account_sql,
    primary_key={"name": "id", "type": "STRING"},
    properties=[],
)

# Edge linking each customer id to the account numbers it appears with.
owns_account_sql = (
    "SELECT DISTINCT "
    "CAST(psdo_cust_id AS VARCHAR) AS from_id, "
    "CAST(psdo_acno AS VARCHAR) AS to_id"
    " FROM bigquery.graph_olap_e2e.bis_acct_dh WHERE 1=1"
)
owns_account_edge = EdgeDefinition(
    type="OWNS_ACCOUNT",
    from_node="Customer",
    to_node="Account",
    sql=owns_account_sql,
    from_key="from_id",
    to_key="to_id",
)

# The update passes the complete node and edge lists (existing definitions
# plus the new ones), together with a change description.
updated = analyst.mappings.update(
    mapping.id,
    node_definitions=[customer_node, account_node],
    edge_definitions=[shares_account_edge, owns_account_edge],
    change_description="Add Account node and OWNS_ACCOUNT edge",
)

print(f"Updated to version {updated.current_version}")
print(f" Nodes: {[n.label for n in updated.node_definitions]}")
print(f" Edges: {[e.type for e in updated.edge_definitions]}")

# List all versions of the mapping
versions = analyst.mappings.list_versions(mapping.id)
print(f"Versions for mapping {mapping.id} ({len(versions)} total):")
for v in versions:
    # A version with no change description is shown as "(initial)".
    desc = v.change_description or "(initial)"
    nodes = [n.label for n in v.node_definitions]
    edges = [e.type for e in v.edge_definitions]
    print(f" v{v.version}: {desc}")
    print(f" nodes={nodes} edges={edges}")
7
Comparing Versions
Diff two versions to see what changed
# Diff version 1 vs version 2
diff = analyst.mappings.diff(mapping.id, from_version=1, to_version=2)

# diff.summary is read with .get() and a default of 0, so a missing counter
# prints as zero rather than raising.
print("Diff v1 -> v2:")
print(f" Nodes added: {diff.summary.get('nodes_added', 0)}")
print(f" Nodes removed: {diff.summary.get('nodes_removed', 0)}")
print(f" Edges added: {diff.summary.get('edges_added', 0)}")
print(f" Edges removed: {diff.summary.get('edges_removed', 0)}")

# Use helper methods to inspect individual changes
print("\nAdded nodes:")
for node in diff.nodes_added():
    print(f" + {node}")
print("Added edges:")
for edge in diff.edges_added():
    print(f" + {edge}")

# Retrieve a specific version
v1 = analyst.mappings.get_version(mapping.id, version=1)
print("Version 1 snapshot:")
print(f" Nodes: {[n.label for n in v1.node_definitions]}")
print(f" Edges: {[e.type for e in v1.edge_definitions]}")
8
Copy and Cleanup
Copy a mapping for experimentation, then clean up
# Copy the mapping for experimentation
copy_name = f"tutorial-authoring-copy-{namespace}"
copy = analyst.mappings.copy(mapping.id, copy_name)
print("Copied mapping:")
print(f" Original: {mapping.name} (id={mapping.id})")
print(f" Copy: {copy.name} (id={copy.id})")
print(f" Nodes: {[n.label for n in copy.node_definitions]}")
print(f" Edges: {[e.type for e in copy.edge_definitions]}")

# View the mapping tree structure
tree = analyst.mappings.get_tree(mapping.id)
print("\nMapping tree:")
for key, value in tree.items():
    print(f" {key}: {value}")

# Clean up: delete the copy and the tutorial mapping
# (the copy is removed first, then the original).
analyst.mappings.delete(copy.id)
print(f"Deleted copy: {copy.name} (id={copy.id})")

analyst.mappings.delete(mapping.id)
print(f"Deleted mapping: {mapping.name} (id={mapping.id})")
print("\nCleanup complete.")
Key Takeaways
- Use `schema.list_catalogs/schemas/tables/columns` to discover SQL data sources before designing mappings
- `NodeDefinition` maps a SQL query to a graph node label with a primary key and typed properties
- `EdgeDefinition` maps a SQL join to a graph relationship between two node labels
- `mappings.create()` builds a mapping from node and edge definitions in a single call
- `mappings.update()` creates a new version -- use `change_description` to document the reason
- `mappings.diff()` shows exactly what changed between two versions (added, removed, modified)
- `mappings.copy()` creates an independent copy for safe experimentation
- Always clean up tutorial mappings with `mappings.delete()` to avoid resource clutter