
Data Pipeline Reference

Technical reference for the Starburst Galaxy → Parquet → Ryugraph data pipeline.

This document details how relational data flows from Starburst Galaxy (managed Trino SaaS) SQL queries through Parquet files into Ryugraph/KuzuDB graph structures. It documents external system behaviour that our components depend on.

Mermaid source:
flowchart LR
A["Starburst<br/>SQL Query"] -->|UNLOAD| B["GCS<br/>Parquet Files"]
B -->|Separate files for<br/>nodes and edges| C["Ryugraph<br/>COPY FROM"]
C -->|CREATE NODE TABLE<br/>CREATE REL TABLE| D["Graph<br/>Instance"]

The platform implements a two-tier export strategy with automatic fallback:

| Tier | Method | When Used | Characteristics |
|---|---|---|---|
| 1 (Primary) | Server-side (system.unload()) | Starburst Galaxy with GCS catalog | Distributed execution, direct GCS write |
| 2 (Fallback) | Client-side (PyArrow) | When server-side unavailable | Streams through export worker, memory buffered |
Export Worker → Starburst Galaxy
            ┌───────────┴───────────┐
            │                       │
            ▼                       ▼
    system.unload()        SELECT * (fallback)
            │                       │
            │                       ▼
            │                 Export Worker
            │                       │
            │                       ▼
            │                    PyArrow
            │                       │
            └───────────┬───────────┘
                       GCS

The system falls back to PyArrow when:

  • system.unload() is not available
  • GCS catalog is not configured
  • Feature flag is disabled
  • Permission errors occur

Memory guidance for the PyArrow fallback:

| Dataset Size | Memory Required | Recommendation |
|---|---|---|
| < 100 MB | 256 MB | Safe for fallback |
| 100 MB - 1 GB | 1-2 GB | Monitor closely |
| > 1 GB | 2+ GB | Prefer server-side |
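The tier-selection logic above can be sketched as a pure predicate. This is a sketch only; the function and parameter names are hypothetical, not taken from our codebase:

```python
def choose_export_tier(unload_available: bool,
                       gcs_catalog_configured: bool,
                       server_side_flag_enabled: bool) -> str:
    """Pick the export tier per the fallback rules above.

    Returns "server" for Tier 1 (system.unload()) or "client" for
    Tier 2 (PyArrow streaming through the export worker). Permission
    errors trigger the fallback at runtime, not here.
    """
    if unload_available and gcs_catalog_configured and server_side_flag_enabled:
        return "server"
    return "client"
```

Keeping the decision in one pure function makes the fallback conditions trivially testable.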

The UNLOAD table function exports query results directly to GCS:

SELECT * FROM TABLE(
  io.unload(
    input => TABLE(
      SELECT customer_id, name, city, signup_date
      FROM analytics.customers
    ),
    location => 'gs://bucket/{user_id}/{mapping_id}/{snapshot_id}/nodes/customers/',
    format => 'PARQUET',
    compression => 'SNAPPY'
  )
)

Parameters:

| Parameter | Required | Description |
|---|---|---|
| input | Yes | TABLE(…) containing the SELECT query |
| location | Yes | GCS destination path |
| format | Yes | Output format (use 'PARQUET') |
| compression | No | Compression codec (recommend 'SNAPPY') |

Starburst writes in parallel without partitioning. Multiple worker nodes each produce their own output files:

gs://bucket/snapshot/nodes/customers/
├── 00000-123-uuid1-0-00001.parquet ← worker 1
├── 00000-124-uuid2-0-00001.parquet ← worker 2
├── 00000-125-uuid3-0-00001.parquet ← worker 3
└── 00000-126-uuid4-0-00001.parquet ← worker 4

File naming pattern: {sequence}-{task}-{UUID}-{split}-{part}.parquet

The number of files depends on cluster size and data volume. For typical ≤2GB graphs, this produces a manageable number of files.
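If output file names ever need to be unpacked (e.g. for diagnostics), the pattern above can be parsed with a small sketch like this. The helper name is illustrative, and the regex assumes numeric sequence/task/split/part fields as in the pattern:

```python
import re

# {sequence}-{task}-{UUID}-{split}-{part}.parquet
_UNLOAD_FILE = re.compile(r"^(\d+)-(\d+)-(.+)-(\d+)-(\d+)\.parquet$")

def parse_unload_filename(name: str):
    """Split an UNLOAD output filename into its components,
    or return None when the name does not match the pattern."""
    m = _UNLOAD_FILE.match(name)
    if m is None:
        return None
    sequence, task, uuid, split, part = m.groups()
    return {"sequence": sequence, "task": task, "uuid": uuid,
            "split": split, "part": part}
```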

The Parquet schema is controlled entirely by the SELECT query. Column ordering is critical:

  • For nodes: primary_key must be first column
  • For edges: from_key, to_key must be first two columns
  • Properties follow in defined order

SELECT * FROM TABLE(
  io.unload(
    input => TABLE(
      SELECT
        customer_id,                        -- Primary key (must be first)
        name,                               -- Property
        CAST(balance AS DOUBLE) as balance, -- Cast DECIMAL to DOUBLE
        signup_date                         -- Property
      FROM analytics.customers
    ),
    location => 'gs://bucket/path/nodes/customers/',
    format => 'PARQUET',
    compression => 'SNAPPY'
  )
)

CREATE NODE TABLE Customer(
  customer_id STRING PRIMARY KEY,
  name STRING,
  city STRING,
  signup_date DATE
);

Rules:

  • Every node table requires a PRIMARY KEY
  • Primary key must be unique across all nodes of that type
  • Supported PK types: STRING, INT64, DATE, UUID

CREATE REL TABLE PURCHASED(
  FROM Customer TO Product,
  purchase_date DATE,
  amount DOUBLE,
  quantity INT64
);

Rules:

  • Must specify FROM NodeTable TO NodeTable
  • First two columns in Parquet must be FROM and TO primary keys
  • Relationship properties follow the FROM/TO columns

CREATE REL TABLE REVIEWED(
  FROM Customer TO Product,
  FROM Customer TO Merchant,
  rating INT64,
  review_text STRING
);

When importing, specify which pair:

COPY REVIEWED FROM 'customer_product_reviews.parquet' (from='Customer', to='Product');
COPY REVIEWED FROM 'customer_merchant_reviews.parquet' (from='Customer', to='Merchant');

Design decision: Our mapping schema does not currently support multi-source/target relationships. Each edge definition connects exactly one from_node to one to_node. For polymorphic relationships, define separate edge types (e.g., REVIEWED_PRODUCT, REVIEWED_MERCHANT). This simplifies validation, export, and import logic. Revisit if analysts need to query across polymorphic endpoints as a single relationship type—would require allowing multiple edge definitions with the same type and identical properties.

-- Each customer has at most one primary address
CREATE REL TABLE HAS_PRIMARY_ADDRESS(FROM Customer TO Address, MANY_ONE);
-- One-to-one marriage relationship
CREATE REL TABLE MARRIED_TO(FROM Person TO Person, ONE_ONE);

Design decision: Our mapping schema does not support multiplicity constraints. All relationships use the default MANY_MANY. Since our graphs are read-only analytics snapshots (no inserts/updates after initial load), cardinality enforcement provides no benefit—the source data already defines the actual cardinality.


For a node table:

CREATE NODE TABLE Customer(
  customer_id STRING PRIMARY KEY,
  name STRING,
  age INT64,
  city STRING
);

Parquet columns must be in exact order:

| Column | Type | Notes |
|---|---|---|
| customer_id | STRING | Primary key (required, first) |
| name | STRING | Property |
| age | INT64 | Property |
| city | STRING | Property |

For a relationship table:

CREATE REL TABLE PURCHASED(
  FROM Customer TO Product,
  purchase_date DATE,
  amount DOUBLE
);

Parquet columns must be:

| Column | Type | Notes |
|---|---|---|
| customer_id | STRING | FROM node primary key (required, first) |
| product_id | STRING | TO node primary key (required, second) |
| purchase_date | DATE | Relationship property |
| amount | DOUBLE | Relationship property |

For large datasets with sequential IDs:

CREATE NODE TABLE Event(
  id SERIAL PRIMARY KEY,
  event_type STRING,
  timestamp TIMESTAMP
);

When using SERIAL:

  • Omit the primary key column from Parquet file
  • Ryugraph auto-generates IDs starting from 0
  • Significantly improves load performance

Parquet columns for SERIAL node:

| Column | Type |
|---|---|
| event_type | STRING |
| timestamp | TIMESTAMP |

Design decision: Our mapping schema does not support SERIAL primary keys. We always use the source database primary key to maintain traceability between graph nodes and source data. At our target scale (≤2GB graphs, ~10M nodes max), the performance benefit of SERIAL is negligible. SERIAL would also complicate edge definitions since edges reference nodes by primary key, and auto-generated IDs aren’t known until after node import. Revisit if scale increases significantly (50M+ nodes)—SERIAL provides fastest graph traversal via sequential INT64 keys with optimal memory layout and cache locality.


COPY TableName FROM 'gs://bucket/path/*.parquet';

Critical: Always import nodes before relationships. Relationships reference node primary keys that must already exist.

-- STEP 1: Import nodes FIRST (order matters!)
COPY Customer FROM 'gs://bucket/snapshot_123/nodes/customers/*.parquet';
COPY Product FROM 'gs://bucket/snapshot_123/nodes/products/*.parquet';
-- STEP 2: Import relationships AFTER nodes exist
COPY PURCHASED FROM 'gs://bucket/snapshot_123/edges/purchased/*.parquet';

Ryugraph reads all matching files via glob pattern:

COPY Customer FROM 'gs://bucket/path/*.parquet'; -- reads all parquet files
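The node-before-relationship ordering can be enforced mechanically when generating import scripts. A minimal sketch, assuming the snapshot layout shown in this document (the function name and directory conventions are illustrative):

```python
def build_copy_statements(base: str, node_tables: list, rel_tables: list) -> list:
    """Emit COPY statements with every node table before any rel table.

    `base` is the snapshot prefix, e.g. 'gs://bucket/snapshot_123'.
    Node files are assumed under nodes/<table>/, edges under edges/<table>/.
    """
    stmts = [f"COPY {t} FROM '{base}/nodes/{t}/*.parquet';" for t in node_tables]
    stmts += [f"COPY {t} FROM '{base}/edges/{t}/*.parquet';" for t in rel_tables]
    return stmts
```

Generating the statements from one list per kind makes it impossible to interleave edge loads before their endpoint nodes exist.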

| Starburst/Trino Type | Ryugraph Type | Notes |
|---|---|---|
| VARCHAR, CHAR | STRING | UTF-8 encoded |
| BIGINT | INT64 | 64-bit signed integer |
| INTEGER | INT32 | 32-bit signed integer |
| SMALLINT | INT16 | 16-bit signed integer |
| TINYINT | INT8 | 8-bit signed integer |
| DOUBLE | DOUBLE | 64-bit floating point |
| REAL | FLOAT | 32-bit floating point |
| DECIMAL | DOUBLE | Cast in SQL query |
| DATE | DATE | Calendar date |
| TIMESTAMP | TIMESTAMP | Date and time |
| TIMESTAMP WITH TIME ZONE | TIMESTAMP | Timezone stripped |
| BOOLEAN | BOOL | True/false |
| VARBINARY | BLOB | Binary data |
| ARRAY | LIST | Variable-length array |
| MAP | MAP | Key-value pairs |
| ROW | STRUCT | Structured type |
| UUID | UUID | Universal identifier |

Some types require explicit casting in the SQL query:

SELECT
  customer_id,
  name,
  CAST(price AS DOUBLE) as price,           -- DECIMAL -> DOUBLE
  CAST(created_at AS TIMESTAMP) as created, -- Handle timezone
  CAST(is_active AS BOOLEAN) as is_active   -- Ensure boolean
FROM source_table

Ryugraph rejects duplicate primary keys by default:

-- Enable error skipping for dirty data
COPY Customer FROM 'file.parquet' (IGNORE_ERRORS=true);

Note: IGNORE_ERRORS=true has performance cost. Prefer clean data.

Relationships referencing non-existent nodes will fail:

Error: Node with primary key 'customer_xyz' not found in Customer table

Solution: Always load nodes before relationships.

Parquet columns must match Ryugraph schema exactly:

Error: Column count mismatch. Expected 4 columns, got 3.

Solution: Ensure SELECT column order matches CREATE TABLE property order.
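A pre-import sanity check for this class of error can be sketched as a pure comparison of column-name lists. In practice the actual list might come from pyarrow.parquet.read_schema; the helper name here is illustrative:

```python
def check_column_order(expected: list, actual: list) -> list:
    """Return human-readable problems; empty list means the Parquet
    column order matches the Ryugraph table definition exactly."""
    problems = []
    if len(actual) != len(expected):
        problems.append(
            f"Column count mismatch. Expected {len(expected)} columns, got {len(actual)}.")
    for i, (want, got) in enumerate(zip(expected, actual)):
        if want != got:
            problems.append(f"Position {i}: expected '{want}', got '{got}'.")
    return problems
```

Running this before COPY FROM turns a cryptic load failure into an actionable diff of the two schemas.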


| Stage | Parallelism | How |
|---|---|---|
| Starburst UNLOAD | Multiple workers write files concurrently | Automatic based on cluster size |
| Ryugraph COPY FROM | Multiple threads read files concurrently | max_num_threads setting |

Both parallel write and parallel read work without partitioning. The flat file structure is optimal:

gs://bucket/snapshot/nodes/customers/
├── file-001.parquet ← written by worker 1, read by thread A
├── file-002.parquet ← written by worker 2, read by thread B
├── file-003.parquet ← written by worker 3, read by thread C
└── ...

  • Use compression => 'SNAPPY' for balance of speed and size
  • Files are automatically parallelised across workers
  • No partitioning needed for performance
  • COPY FROM is fastest for bulk loading (vs individual inserts)
  • Use glob patterns: COPY FROM 'path/*.parquet' to read all files
  • Ryugraph reads multiple files in parallel using available threads
  • Set buffer pool appropriately: buffer_pool_size=2_147_483_648 (2GB)
  • For consecutive integer IDs, use SERIAL and omit PK column

For large graphs (approaching 2GB limit):

  • Configure Ryugraph buffer pool to ~80% of pod memory
  • Enable disk spilling for larger-than-memory operations
  • Use persistent volume for spill files
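The ~80% sizing rule above is simple arithmetic; a trivial helper (the name is hypothetical) keeps the fraction in one place:

```python
def buffer_pool_bytes(pod_memory_bytes: int, fraction: float = 0.8) -> int:
    """Size the Ryugraph buffer pool as a fraction of pod memory,
    leaving headroom for the process heap and spill I/O buffers."""
    return int(pod_memory_bytes * fraction)
```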

In relational databases, many-to-many relationships are represented using join tables with foreign keys to both entities. In the graph model, these become relationship tables with properties.

-- Join table with foreign keys to both entities
CREATE TABLE transactions (
  transaction_id VARCHAR PRIMARY KEY,
  customer_id VARCHAR REFERENCES customers(customer_id),
  product_id VARCHAR REFERENCES products(product_id),
  amount DECIMAL(10,2),
  transaction_date TIMESTAMP
);

-- Foreign keys become FROM/TO, other columns become properties
CREATE REL TABLE PURCHASED(
  FROM Customer TO Product,
  transaction_id STRING,
  amount DOUBLE,
  transaction_date TIMESTAMP
);

| Relational Concept | Graph Equivalent |
|---|---|
| Join table | REL TABLE |
| FK to source entity | FROM NodeType |
| FK to target entity | TO NodeType |
| Other columns | Relationship properties |
| Join table PK | Optional property (or omit if not needed) |

Critical: Column ordering determines the relationship direction.

SELECT
  customer_id,     -- FROM node PK (must be first)
  product_id,      -- TO node PK (must be second)
  transaction_id,  -- Property
  CAST(amount AS DOUBLE) as amount,
  transaction_date
FROM transactions

Always load both node tables before the relationship table:

-- 1. Load nodes first
COPY Customer FROM 'gs://bucket/nodes/Customer/*.parquet';
COPY Product FROM 'gs://bucket/nodes/Product/*.parquet';
-- 2. Load relationships after nodes exist
COPY PURCHASED FROM 'gs://bucket/edges/PURCHASED/*.parquet';

-- Customers table
CREATE TABLE customers (
  customer_id VARCHAR PRIMARY KEY,
  name VARCHAR,
  email VARCHAR,
  city VARCHAR,
  signup_date DATE
);

-- Products table
CREATE TABLE products (
  product_id VARCHAR PRIMARY KEY,
  name VARCHAR,
  category VARCHAR,
  price DECIMAL(10,2)
);

-- Transactions table (join table with attributes)
CREATE TABLE transactions (
  transaction_id VARCHAR PRIMARY KEY,
  customer_id VARCHAR REFERENCES customers(customer_id),
  product_id VARCHAR REFERENCES products(product_id),
  amount DECIMAL(10,2),
  transaction_date TIMESTAMP
);

-- Node tables
CREATE NODE TABLE Customer(
  customer_id STRING PRIMARY KEY,
  name STRING,
  email STRING,
  city STRING,
  signup_date DATE
);

CREATE NODE TABLE Product(
  product_id STRING PRIMARY KEY,
  name STRING,
  category STRING,
  price DOUBLE
);

-- Relationship table (foreign keys become explicit edges)
CREATE REL TABLE PURCHASED(
  FROM Customer TO Product,
  transaction_id STRING,
  amount DOUBLE,
  transaction_date TIMESTAMP
);
-- Export Customers
SELECT * FROM TABLE(io.unload(
  input => TABLE(
    SELECT customer_id, name, email, city, signup_date
    FROM analytics.customers
  ),
  location => 'gs://bucket/snapshot_123/nodes/Customer/',
  format => 'PARQUET',
  compression => 'SNAPPY'
))

-- Export Products
SELECT * FROM TABLE(io.unload(
  input => TABLE(
    SELECT product_id, name, category, CAST(price AS DOUBLE) as price
    FROM analytics.products
  ),
  location => 'gs://bucket/snapshot_123/nodes/Product/',
  format => 'PARQUET',
  compression => 'SNAPPY'
))

-- Export PURCHASED relationships
SELECT * FROM TABLE(io.unload(
  input => TABLE(
    SELECT
      t.customer_id,     -- FROM node PK (first)
      t.product_id,      -- TO node PK (second)
      t.transaction_id,  -- Property
      CAST(t.amount AS DOUBLE) as amount,
      t.transaction_date
    FROM analytics.transactions t
  ),
  location => 'gs://bucket/snapshot_123/edges/PURCHASED/',
  format => 'PARQUET',
  compression => 'SNAPPY'
))
-- Create schema
CREATE NODE TABLE Customer(customer_id STRING PRIMARY KEY, name STRING, email STRING, city STRING, signup_date DATE);
CREATE NODE TABLE Product(product_id STRING PRIMARY KEY, name STRING, category STRING, price DOUBLE);
CREATE REL TABLE PURCHASED(FROM Customer TO Product, transaction_id STRING, amount DOUBLE, transaction_date TIMESTAMP);
-- Import nodes first
COPY Customer FROM 'gs://bucket/snapshot_123/nodes/Customer/*.parquet';
COPY Product FROM 'gs://bucket/snapshot_123/nodes/Product/*.parquet';
-- Import relationships after nodes exist
COPY PURCHASED FROM 'gs://bucket/snapshot_123/edges/PURCHASED/*.parquet';

gs://bucket/snapshot_123/
├── nodes/
│   ├── Customer/
│   │   ├── 00000-xxx-uuid1.parquet
│   │   └── 00000-xxx-uuid2.parquet
│   └── Product/
│       └── 00000-xxx-uuid3.parquet
└── edges/
    └── PURCHASED/
        ├── 00000-xxx-uuid4.parquet
        └── 00000-xxx-uuid5.parquet

This section documents the Starburst REST API used for async query execution. Reference: Trino Client Protocol.

Submit a query via POST to /v1/statement:

POST /v1/statement HTTP/1.1
Host: starburst.example.com
X-Trino-Catalog: analytics
X-Trino-Schema: public
Content-Type: text/plain
Authorization: Basic <base64>

SELECT * FROM TABLE(io.unload(...))

Response (success):

{
  "id": "20250116_123456_00001_abcde",
  "infoUri": "http://starburst:8080/ui/query/20250116_123456_00001_abcde",
  "nextUri": "http://starburst:8080/v1/query/20250116_123456_00001_abcde/1",
  "stats": {
    "state": "QUEUED",
    "queued": true,
    "scheduled": false,
    "nodes": 0,
    "totalSplits": 0,
    "queuedSplits": 0,
    "runningSplits": 0,
    "completedSplits": 0
  }
}

Response (immediate error):

{
  "id": "20250116_123456_00001_abcde",
  "stats": {"state": "FAILED"},
  "error": {
    "message": "line 1:1: Table 'analytics.public.invalid' does not exist",
    "errorCode": 1,
    "errorName": "TABLE_NOT_FOUND",
    "errorType": "USER_ERROR"
  }
}

Poll query status via GET to the nextUri:

GET /v1/query/20250116_123456_00001_abcde/1 HTTP/1.1
Host: starburst.example.com
Authorization: Basic <base64>

Response (in progress):

{
  "id": "20250116_123456_00001_abcde",
  "nextUri": "http://starburst:8080/v1/query/20250116_123456_00001_abcde/2",
  "stats": {
    "state": "RUNNING",
    "queued": false,
    "scheduled": true,
    "nodes": 4,
    "totalSplits": 100,
    "queuedSplits": 20,
    "runningSplits": 30,
    "completedSplits": 50
  }
}

Response (finished - no nextUri):

{
  "id": "20250116_123456_00001_abcde",
  "stats": {
    "state": "FINISHED",
    "queued": false,
    "scheduled": true,
    "nodes": 4,
    "totalSplits": 100,
    "queuedSplits": 0,
    "runningSplits": 0,
    "completedSplits": 100
  }
}

Response (failed):

{
  "id": "20250116_123456_00001_abcde",
  "stats": {"state": "FAILED"},
  "error": {
    "message": "Query exceeded maximum time limit of 1.00h",
    "errorCode": 65540,
    "errorName": "EXCEEDED_TIME_LIMIT",
    "errorType": "USER_ERROR"
  }
}

Query state lifecycle:

QUEUED
  ↓
PLANNING
  ↓
STARTING
  ↓
RUNNING ────→ FAILED
  ↓
FINISHING
  ↓
FINISHED

State descriptions:

| State | Description |
|---|---|
| QUEUED | Query waiting in queue |
| PLANNING | Query plan being generated |
| STARTING | Allocating resources |
| RUNNING | Executing on workers |
| FINISHING | Completing final operations |
| FINISHED | Query completed successfully |
| FAILED | Query failed (check error field) |

  1. Completion detection: Keep polling nextUri until it’s absent from the response. The state field is for humans only.

  2. HTTP error handling:

    | Status | Action |
    |---|---|
    | 200 | Process response |
    | 429 | Retry after Retry-After header |
    | 502, 503, 504 | Retry in 50-100 ms (load balancer issue) |
    | Other | Query failed |
    OtherQuery failed
  3. Authentication: Headers only required on initial POST, not when following nextUri.

  4. Query cancellation: Send DELETE to nextUri to cancel a running query.
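The polling rules above condense into a small driver. Here `fetch` stands in for the HTTP client and returns a `(status_code, body_dict)` pair, so the protocol logic can be shown in isolation; this is a sketch, not our client implementation:

```python
import time

def poll_until_done(fetch, next_uri, retry_delay=0.075):
    """Follow nextUri until it disappears, per the Trino client protocol.

    fetch(uri) -> (http_status, body_dict). Completion is detected by
    the ABSENCE of nextUri, never by inspecting stats.state. A real
    client would honor the Retry-After header on 429.
    """
    last = None
    while next_uri:
        status, body = fetch(next_uri)
        if status == 200:
            last = body
            next_uri = body.get("nextUri")  # absent => query is done
        elif status in (429, 502, 503, 504):
            time.sleep(retry_delay)         # transient: retry the same URI
        else:
            raise RuntimeError(f"query failed with HTTP {status}")
    if last is not None and "error" in last:
        raise RuntimeError(last["error"]["message"])
    return last
```

Injecting `fetch` keeps the retry and completion logic unit-testable without a live Starburst cluster.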

| Header | Required | Description |
|---|---|---|
| X-Trino-Catalog | Yes (POST) | Default catalog |
| X-Trino-Schema | Yes (POST) | Default schema |
| Authorization | Yes | Basic auth or Bearer token |
| Content-Type | Yes (POST) | text/plain for query body |

connector.name=hive
hive.metastore.uri=thrift://metastore:9083
# Enable native GCS
fs.native-gcs.enabled=true
gcs.project-id=your-gcp-project
gcs.json-key-file-path=/path/to/service-account.json

{
  "id": "gcs-export",
  "location": "gs://your-bucket/",
  "configuration": {
    "fs.native-gcs.enabled": "true",
    "gcs.json-key": "{...service account key JSON...}"
  }
}
  • storage.objects.create
  • storage.objects.delete (for overwrites)
  • storage.objects.get
  • storage.objects.list
  • storage.buckets.get

For E2E testing, we use open-source Trino with a translation proxy instead of Starburst Enterprise. This approach replicates production behavior by executing real SQL queries and writing actual Parquet files.

Starburst’s io.unload() is proprietary and not available in open-source Trino. The trino-proxy translates io.unload() calls to Hive CTAS (CREATE TABLE AS SELECT) with external_location:

-- Input (Starburst io.unload):
SELECT * FROM TABLE(io.unload(
  input => TABLE(SELECT id, name FROM catalog.schema.table),
  location => 'gs://bucket/export/',
  format => 'PARQUET', compression => 'SNAPPY'))

-- Output (Trino Hive CTAS):
CREATE TABLE hive.temp.export_abc123
WITH (format = 'PARQUET', external_location = 'gs://bucket/export/')
AS SELECT id, name FROM catalog.schema.table
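A simplified version of the proxy's rewrite, assuming the io.unload() call has already been split into its inner SELECT and target location (robustly extracting those pieces from real SQL needs a proper parser; the function name and temp-table naming scheme here are illustrative):

```python
import uuid

def unload_to_ctas(select_sql: str, location: str, temp_schema: str = "hive.temp") -> str:
    """Rewrite an io.unload() export (inner SELECT plus GCS location)
    as a Hive CTAS with external_location, using a random temp table."""
    table = f"{temp_schema}.export_{uuid.uuid4().hex[:8]}"
    return (
        f"CREATE TABLE {table}\n"
        f"WITH (format = 'PARQUET', external_location = '{location}')\n"
        f"AS {select_sql}"
    )
```

The random suffix avoids collisions between concurrent exports; the temp table is only metadata, since the Parquet files land directly at the external location.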
| Component | Image | Purpose |
|---|---|---|
| trino | trino-gcs:e2e-test | Trino 479 with native GCS (gcs.endpoint for fake-gcs-server) |
| trino-proxy | trino-proxy:e2e-test | io.unload() → Hive CTAS translation |
| hive-metastore | hive-metastore-gcs:e2e-test | Apache Hive 4.1.0 with GCS connector JAR |
| fake-gcs-server | fsouza/fake-gcs-server:1.52.3 | GCS emulator for local testing |

  1. Same execution path - Trino executes queries and writes Parquet, same as production
  2. Same storage API - Uses GCS URLs (gs://), not S3/MinIO
  3. Same file format - Raw Parquet files, not Iceberg tables
  4. Export-worker unchanged - No code changes needed for E2E testing