Python API Reference

The Micromegas Python client provides a simple but powerful interface for querying observability data using SQL. This page covers all client methods, connection options, and advanced features.

Installation

Install the Micromegas Python client from PyPI:

pip install micromegas

Basic Usage

Connection

import micromegas

# Connect to local Micromegas instance
client = micromegas.connect()

# Connect with dictionary encoding preservation (for memory efficiency)
client = micromegas.connect(preserve_dictionary=True)

The connect() function establishes a connection to the analytics service at grpc://localhost:50051.

Parameters:

  • preserve_dictionary (bool, optional): Enable dictionary encoding preservation for memory-efficient data transfer. Default: False

Simple Queries

import datetime

# Set up time range
now = datetime.datetime.now(datetime.timezone.utc)
begin = now - datetime.timedelta(hours=1)
end = now

# Execute query with time range
sql = "SELECT * FROM log_entries LIMIT 10;"
df = client.query(sql, begin, end)
print(df)

Client Methods

query(sql, begin=None, end=None)

Execute a SQL query and return results as a pandas DataFrame.

Parameters:

  • sql (str): SQL query string
  • begin (datetime or str, optional): ⚡ Recommended - Start time for partition elimination. Can be a datetime object or RFC3339 string (e.g., "2024-01-01T00:00:00Z")
  • end (datetime or str, optional): ⚡ Recommended - End time for partition elimination. Can be a datetime object or RFC3339 string (e.g., "2024-01-01T23:59:59Z")

Returns:

  • pandas.DataFrame: Query results

Performance Note: Using begin and end parameters instead of SQL time filters allows the analytics server to eliminate entire partitions before query execution, providing significant performance improvements.

Example:

# ✅ EFFICIENT: API time range enables partition elimination
df = client.query("""
    SELECT time, process_id, level, msg
    FROM log_entries
    WHERE level <= 3
    ORDER BY time DESC
    LIMIT 100;
""", begin, end)  # ⭐ Time range in API parameters

# ❌ INEFFICIENT: SQL time filter scans all partitions
df = client.query("""
    SELECT time, process_id, level, msg
    FROM log_entries
    WHERE time >= NOW() - INTERVAL '1 hour'  -- Server scans ALL partitions
      AND level <= 3
    ORDER BY time DESC
    LIMIT 100;
""")  # Missing API time parameters!

# ✅ Using RFC3339 strings for time ranges
df = client.query("""
    SELECT time, process_id, level, msg
    FROM log_entries
    WHERE level <= 3
    ORDER BY time DESC
    LIMIT 100;
""", "2024-01-01T00:00:00Z", "2024-01-01T23:59:59Z")  # ⭐ RFC3339 strings

# ✅ OK: Query without time range (for metadata queries)
processes = client.query("SELECT process_id, exe FROM processes LIMIT 10;")

query_stream(sql, begin=None, end=None)

Execute a SQL query and return results as a stream of Apache Arrow RecordBatch objects. Use this for large datasets to avoid memory issues.

Parameters:

  • sql (str): SQL query string
  • begin (datetime or str, optional): ⚡ Recommended - Start time for partition elimination. Can be a datetime object or RFC3339 string (e.g., "2024-01-01T00:00:00Z")
  • end (datetime or str, optional): ⚡ Recommended - End time for partition elimination. Can be a datetime object or RFC3339 string (e.g., "2024-01-01T23:59:59Z")

Returns:

  • Iterator of pyarrow.RecordBatch: Stream of result batches

Example:

import pyarrow as pa

# Stream a large dataset (last 7 days)
end = datetime.datetime.now(datetime.timezone.utc)
begin = end - datetime.timedelta(days=7)

sql = """
    SELECT time, process_id, level, target, msg
    FROM log_entries
    ORDER BY time DESC;
"""

for record_batch in client.query_stream(sql, begin, end):
    # record_batch is a pyarrow.RecordBatch
    print(f"Batch shape: {record_batch.num_rows} x {record_batch.num_columns}")
    print(f"Schema: {record_batch.schema}")

    # Convert to pandas for analysis
    df = record_batch.to_pandas()

    # Process this batch
    error_logs = df[df['level'] <= 3]
    if not error_logs.empty:
        print(f"Found {len(error_logs)} errors in this batch")
        # Process errors...

    # Memory is automatically freed after each batch

query_arrow(sql, begin=None, end=None)

Execute a SQL query and return results as an Apache Arrow Table. This method preserves dictionary encoding when preserve_dictionary=True is set during connection.

Parameters:

  • sql (str): SQL query string
  • begin (datetime or str, optional): Start time for partition elimination
  • end (datetime or str, optional): End time for partition elimination

Returns:

  • pyarrow.Table: Query results as Arrow Table

Example:

import pyarrow as pa

# Connect with dictionary preservation
dict_client = micromegas.connect(preserve_dictionary=True)

# Get Arrow table with preserved dictionary encoding
table = dict_client.query_arrow("""
    SELECT properties_to_dict(properties) as dict_props
    FROM measures
""", begin, end)

# Check if column uses dictionary encoding
print(f"Dictionary encoded: {pa.types.is_dictionary(table.schema.field('dict_props').type)}")
print(f"Memory usage: {table.nbytes:,} bytes")

Dictionary Encoding for Memory Efficiency

When working with large datasets containing repeated values (like properties), dictionary encoding can reduce memory usage by 50-80%. Micromegas provides built-in support for dictionary encoding:

Using Dictionary-Encoded Properties

# Connect with dictionary preservation enabled
client = micromegas.connect(preserve_dictionary=True)

# Use properties_to_dict UDF for dictionary encoding
sql = """
SELECT 
    time,
    process_id,
    properties_to_dict(properties) as dict_props,
    properties_length(properties_to_dict(properties)) as prop_count
FROM measures
WHERE time >= NOW() - INTERVAL '1 hour'
"""

# Option 1: Get as pandas DataFrame (automatic conversion)
df = client.query(sql, begin, end)
print(f"DataFrame shape: {df.shape}")
print(f"Memory usage: {df.memory_usage(deep=True).sum():,} bytes")

# Option 2: Get as Arrow Table (preserves dictionary encoding)
table = client.query_arrow(sql, begin, end)
print(f"Arrow table memory: {table.nbytes:,} bytes")

# Dictionary encoding typically uses 50-80% less memory

Compatibility with Standard Functions

Dictionary-encoded data works seamlessly with Micromegas UDFs:

sql = """
SELECT 
    -- Direct property access
    property_get(properties, 'source') as source,

    -- Length calculation (works with both formats)
    properties_length(properties) as regular_count,
    properties_length(properties_to_dict(properties)) as dict_count,

    -- Convert back to array when needed
    array_length(properties_to_array(properties_to_dict(properties))) as array_count
FROM measures
"""

df = client.query(sql, begin, end)

Working with Results

pandas DataFrames

All query() results are pandas DataFrames, giving you access to the full pandas ecosystem:

# Basic DataFrame operations
result = client.query("SELECT process_id, exe, start_time FROM processes;")

# Inspect the data
print(f"Shape: {result.shape}")
print(f"Columns: {result.columns.tolist()}")
print(f"Data types:\n{result.dtypes}")

# Filter and analyze
recent = result[result['start_time'] > datetime.datetime.now(datetime.timezone.utc) - datetime.timedelta(days=1)]
print(f"Recent processes: {len(recent)}")

# Group and aggregate
by_exe = result.groupby('exe').size().sort_values(ascending=False)
print("Processes by executable:")
print(by_exe.head())

pyarrow RecordBatch

Streaming queries return Apache Arrow RecordBatch objects:

for batch in client.query_stream(sql, begin, end):
    # RecordBatch properties
    print(f"Rows: {batch.num_rows}")
    print(f"Columns: {batch.num_columns}")
    print(f"Schema: {batch.schema}")

    # Access individual columns
    time_column = batch.column('time')
    level_column = batch.column('level')

    # Convert to pandas for analysis
    df = batch.to_pandas()

    # Convert to other formats
    rows = batch.to_pylist()     # list of row dictionaries
    columns = batch.to_pydict()  # dictionary of column value lists

Connection Configuration

FlightSQLClient(uri, headers=None, preserve_dictionary=False, auth_provider=None)

For advanced connection scenarios, use the FlightSQLClient class directly:

from micromegas.flightsql.client import FlightSQLClient

# Recommended: Connect with OIDC authentication (automatic token refresh)
from micromegas.auth import OidcAuthProvider

auth = OidcAuthProvider.from_file("~/.micromegas/tokens.json")
client = FlightSQLClient(
    "grpc+tls://remote-server:50051",
    auth_provider=auth
)

# Connect with dictionary preservation for memory efficiency
client = FlightSQLClient(
    "grpc://localhost:50051",
    preserve_dictionary=True
)

# Connect to local server (equivalent to micromegas.connect())
client = FlightSQLClient("grpc://localhost:50051")

Parameters:

  • uri (str): FlightSQL server URI. Use grpc:// for unencrypted connections or grpc+tls:// for TLS
  • headers (dict, optional): Deprecated; use auth_provider instead. Static headers for authentication, which cannot support automatic token refresh
  • preserve_dictionary (bool, optional): When True, preserve dictionary encoding in Arrow arrays for memory efficiency. Useful with dictionary-encoded UDFs. Defaults to False
  • auth_provider (optional): Recommended. Authentication provider implementing a get_token() method. When provided, tokens are refreshed automatically before each request. Example: OidcAuthProvider

Authentication Note: The auth_provider parameter is the recommended authentication method as it supports automatic token refresh. The headers parameter is deprecated and will be removed in a future version. See the Authentication Guide for details on setting up OIDC authentication.
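
Any object exposing a get_token() method can act as an auth provider. Below is a minimal, hypothetical sketch (StaticTokenProvider is not part of the library) that assumes get_token() should return a bearer token string; a real provider should refresh expired tokens, as OidcAuthProvider does.

from micromegas.flightsql.client import FlightSQLClient

# Hypothetical provider for illustration only; assumes get_token() returns a
# bearer token string. A real provider should refresh tokens before expiry.
class StaticTokenProvider:
    def __init__(self, token):
        self._token = token

    def get_token(self):
        # Called before each request
        return self._token

client = FlightSQLClient(
    "grpc+tls://remote-server:50051",
    auth_provider=StaticTokenProvider("my-bearer-token"),
)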

Schema Discovery

prepare_statement(sql)

Get query schema information without executing the query:

# Prepare statement to discover schema
stmt = client.prepare_statement(
    "SELECT time, level, msg FROM log_entries WHERE level <= 3"
)

# Inspect the schema
print("Query result schema:")
for field in stmt.dataset_schema:
    print(f"  {field.name}: {field.type}")

# Output:
#   time: timestamp[ns]
#   level: int32  
#   msg: string

# The query is also available
print(f"Query: {stmt.query}")

prepared_statement_stream(statement)

Execute a prepared statement (mainly useful after schema inspection):

# Execute the prepared statement
for batch in client.prepared_statement_stream(stmt):
    df = batch.to_pandas()
    print(f"Received {len(df)} rows")

Note: Prepared statements are primarily for schema discovery. Execution offers no performance benefit over query_stream().

Process and Stream Discovery

find_process(process_id)

Find detailed information about a specific process:

# Find process by ID
process_info = client.find_process('550e8400-e29b-41d4-a716-446655440000')

if not process_info.empty:
    print(f"Process: {process_info['exe'].iloc[0]}")
    print(f"Started: {process_info['start_time'].iloc[0]}")
    print(f"Computer: {process_info['computer'].iloc[0]}")
else:
    print("Process not found")

query_streams(begin, end, limit, process_id=None, tag_filter=None)

Query event streams with filtering:

# Query all streams from the last hour
end = datetime.datetime.now(datetime.timezone.utc)
begin = end - datetime.timedelta(hours=1)
streams = client.query_streams(begin, end, limit=100)

# Filter by process
process_streams = client.query_streams(
    begin, end, 
    limit=50,
    process_id='550e8400-e29b-41d4-a716-446655440000'
)

# Filter by stream tag
log_streams = client.query_streams(
    begin, end,
    limit=20, 
    tag_filter='log'
)

print(f"Found {len(streams)} total streams")
print(f"Stream types: {streams['stream_type'].value_counts()}")

query_blocks(begin, end, limit, stream_id)

Query data blocks within a stream (for low-level inspection):

# First find a stream
streams = client.query_streams(begin, end, limit=1)
if not streams.empty:
    stream_id = streams['stream_id'].iloc[0]

    # Query blocks in that stream
    blocks = client.query_blocks(begin, end, 100, stream_id)
    print(f"Found {len(blocks)} blocks")
    print(f"Total events: {blocks['nb_events'].sum()}")
    print(f"Total size: {blocks['payload_size'].sum()} bytes")

query_spans(begin, end, limit, stream_id)

Query execution spans for performance analysis:

# Query spans for detailed performance analysis
spans = client.query_spans(begin, end, 1000, stream_id)

# Find slowest operations
slow_spans = spans.nlargest(10, 'duration')
print("Slowest operations:")
for _, span in slow_spans.iterrows():
    duration_ms = span['duration'] / 1000000  # Convert nanoseconds to milliseconds
    print(f"  {span['name']}: {duration_ms:.2f}ms")

# Analyze span hierarchy
root_spans = spans[spans['parent_span_id'].isna()]
print(f"Found {len(root_spans)} root operations")

Data Management

bulk_ingest(table_name, df)

Bulk ingest metadata for replication or administrative tasks:

import pandas as pd

# Example: Replicate process metadata
processes_df = pd.DataFrame({
    'process_id': ['550e8400-e29b-41d4-a716-446655440000'],
    'exe': ['/usr/bin/myapp'],
    'username': ['user'],
    'realname': ['User Name'],
    'computer': ['hostname'],
    'distro': ['Ubuntu 22.04'],
    'cpu_brand': ['Intel Core i7'],
    'tsc_frequency': [2400000000],
    'start_time': [datetime.datetime.now(datetime.timezone.utc)],
    'start_ticks': [1234567890],
    'insert_time': [datetime.datetime.now(datetime.timezone.utc)],
    'parent_process_id': [''],
    'properties': [[]]
})

# Ingest process metadata
result = client.bulk_ingest('processes', processes_df)
if result:
    print(f"Ingested {result.record_count} process records")

Supported tables: processes, streams, blocks, payloads

Note: This method is for metadata replication and administrative tasks. Use the telemetry ingestion service HTTP API for normal data ingestion.

materialize_partitions(view_set_name, begin, end, partition_delta_seconds)

Create materialized partitions for performance optimization:

# Materialize hourly partitions for the last 24 hours
end = datetime.datetime.now(datetime.timezone.utc)
begin = end - datetime.timedelta(days=1)

client.materialize_partitions(
    'log_entries',
    begin,
    end,
    3600  # 1-hour partitions
)
# Prints progress messages for each materialized partition

retire_partitions(view_set_name, view_instance_id, begin, end)

Remove materialized partitions to free up storage:

# Retire old partitions
client.retire_partitions(
    'log_entries',
    'process-123-456', 
    begin,
    end
)
# Prints status messages as partitions are retired

Warning: This operation cannot be undone. Retired partitions must be re-materialized if needed.
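
Before retiring a range, it can help to check what is currently materialized. The sketch below uses the list_partitions() table function that appears in the CLI examples later on this page; it assumes that function is also queryable through client.query() and exposes the view_set_name and num_rows columns shown there.

# Cautious preview before retiring (sketch; assumes list_partitions() is
# available via client.query() with the columns shown in the CLI examples)
preview = client.query(
    "SELECT view_set_name, num_rows FROM list_partitions() "
    "WHERE view_set_name = 'log_entries';"
)
print(f"{len(preview)} log_entries partitions currently materialized")

# Retire only once you are sure the range is no longer needed
client.retire_partitions('log_entries', 'process-123-456', begin, end)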

Administrative Functions

The micromegas.admin module provides administrative functions for schema evolution and partition lifecycle management. These functions are intended for system administrators and should be used with caution.

list_incompatible_partitions(client, view_set_name=None)

Lists partitions with schemas incompatible with current view set schemas. Returns one row per partition.

import micromegas
import micromegas.admin

client = micromegas.connect()

incompatible = micromegas.admin.list_incompatible_partitions(client)
print(f"Found {len(incompatible)} incompatible partitions")
print(f"Total size: {incompatible['file_size'].sum() / (1024**3):.2f} GB")

Returns: view_set_name, view_instance_id, begin_insert_time, end_insert_time, incompatible_schema_hash, current_schema_hash, file_path, file_size
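
Because the result is a regular pandas DataFrame with the columns above, it can be summarized before deciding what to retire; a small follow-up sketch:

# Summarize incompatible partitions per view set (plain pandas over the
# columns listed above)
summary = (
    incompatible
    .groupby('view_set_name')
    .agg(partitions=('file_path', 'count'), total_bytes=('file_size', 'sum'))
    .sort_values('total_bytes', ascending=False)
)
print(summary)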

retire_incompatible_partitions(client, view_set_name=None)

Retires partitions with incompatible schemas using metadata-based retirement (works for empty and non-empty partitions).

# Preview first
preview = micromegas.admin.list_incompatible_partitions(client, 'log_entries')
print(f"Would retire {len(preview)} partitions")

# Retire
result = micromegas.admin.retire_incompatible_partitions(client, 'log_entries')
print(f"Retired {result['partitions_retired'].sum()} partitions")

Returns: view_set_name, view_instance_id, partitions_retired, partitions_failed, storage_freed_bytes, retirement_messages

⚠️ DESTRUCTIVE OPERATION: Irreversible. Always preview first.

Command-Line Interface

The Micromegas Python client includes CLI tools for quick queries and administrative tasks.

query.py - Run SQL Queries

Execute arbitrary SQL queries against the analytics service from the command line.

Location: python/micromegas/cli/query.py

Usage:

cd python/micromegas/cli
poetry run python query.py "SELECT * FROM list_partitions() LIMIT 5"

Arguments:

  • sql (positional): SQL query to execute

Options:

  • --begin: Begin timestamp (ISO format or relative like 1h, 30m, 7d). Default: 1 hour ago
  • --end: End timestamp (ISO format or relative like 1h, 30m, 7d). Default: now
  • --format: Output format: table (default), csv, or json
  • --max-colwidth: Maximum column width for table format (default: 50, use 0 for unlimited)

Examples:

# Query with default time range (last hour)
poetry run python query.py "SELECT * FROM processes LIMIT 10"

# Query with relative time range (last 24 hours)
poetry run python query.py "SELECT * FROM log_entries LIMIT 100" --begin 24h

# Query with specific timestamps
poetry run python query.py "SELECT * FROM measures LIMIT 50" \
    --begin 2024-01-01T00:00:00 --end 2024-01-02T00:00:00

# Output as CSV for piping to other tools
poetry run python query.py "SELECT * FROM list_partitions()" --format csv

# Output as JSON
poetry run python query.py "SELECT view_set_name, num_rows FROM list_partitions()" --format json

Environment Variables:

  • MICROMEGAS_ANALYTICS_URI: Analytics server URI (default: grpc://localhost:50051)
  • MICROMEGAS_OIDC_ISSUER: OIDC issuer URL for authentication
  • MICROMEGAS_OIDC_CLIENT_ID: OIDC client ID
  • MICROMEGAS_PYTHON_MODULE_WRAPPER: Custom authentication module for corporate environments

Other CLI Tools

Additional CLI tools are available in python/micromegas/cli/:

  • query_processes.py: List processes in the telemetry database
  • query_process_log.py: Query log entries for specific processes
  • query_process_metrics.py: Query metrics for specific processes
  • write_perfetto.py: Export trace data to Perfetto format
  • logout.py: Clear cached authentication tokens

Time Utilities

format_datetime(value) and parse_time_delta(user_string)

Utility functions for time handling:

from micromegas.time import format_datetime, parse_time_delta

# Format datetime for queries
dt = datetime.datetime.now(datetime.timezone.utc)
formatted = format_datetime(dt)
print(formatted)  # "2024-01-01T12:00:00+00:00"

# Parse human-readable time deltas
one_hour = parse_time_delta('1h')
thirty_minutes = parse_time_delta('30m') 
seven_days = parse_time_delta('7d')

# Use in calculations
recent_time = datetime.datetime.now(datetime.timezone.utc) - parse_time_delta('2h')

Supported units: m (minutes), h (hours), d (days)

Advanced Features

Query Streaming Benefits

Use query_stream() for large datasets to:

  • Reduce memory usage: Process data in chunks instead of loading everything
  • Improve responsiveness: Start processing before the query completes
  • Handle large results: Query datasets larger than available RAM

# Example: Process a week of data in batches
end = datetime.datetime.now(datetime.timezone.utc)
begin = end - datetime.timedelta(days=7)

total_errors = 0
total_rows = 0

for batch in client.query_stream("""
    SELECT level, msg FROM log_entries
""", begin, end):  # API time range covers the last 7 days
    df = batch.to_pandas()
    errors_in_batch = len(df[df['level'] <= 2])

    total_errors += errors_in_batch
    total_rows += len(df)

    print(f"Batch: {len(df)} rows, {errors_in_batch} errors")

print(f"Total: {total_rows} rows, {total_errors} errors")

FlightSQL Protocol Benefits

Micromegas uses Apache Arrow FlightSQL for optimal performance:

  • Columnar data transfer: Orders of magnitude faster than JSON
  • Binary protocol: No serialization/deserialization overhead
  • Native compression: Efficient network utilization
  • Vectorized operations: Optimized for analytical workloads
  • Zero-copy operations: Direct memory mapping from network buffers
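
One practical consequence is that results can stay in Arrow format end to end. The sketch below streams query batches straight into a Parquet file with pyarrow's ParquetWriter, never converting to pandas; the output path and query are just examples.

import pyarrow.parquet as pq

# Stream results directly to Parquet, keeping the data columnar throughout
# ('log_entries.parquet' is an arbitrary example path)
writer = None
for batch in client.query_stream("SELECT time, level, msg FROM log_entries;", begin, end):
    if writer is None:
        writer = pq.ParquetWriter("log_entries.parquet", batch.schema)
    writer.write_batch(batch)
if writer is not None:
    writer.close()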

Connection Configuration

# Connect to local server (default)
client = micromegas.connect()

# Connect with dictionary preservation for memory efficiency
client = micromegas.connect(preserve_dictionary=True)

# For remote servers with authentication, use FlightSQLClient with auth_provider
from micromegas.flightsql.client import FlightSQLClient
from micromegas.auth import OidcAuthProvider

# Recommended: OIDC authentication with automatic token refresh
auth = OidcAuthProvider.from_file("~/.micromegas/tokens.json")
client = FlightSQLClient(
    "grpc+tls://remote-server:50051",
    auth_provider=auth
)

Error Handling

try:
    df = client.query("SELECT * FROM log_entries;", begin, end)
except Exception as e:
    print(f"Query failed: {e}")
else:
    # Check for empty results
    if df.empty:
        print("No data found for this time range")
    else:
        print(f"Found {len(df)} rows")

Performance Tips

Use Time Ranges

Always specify time ranges for better performance:

# ✅ Good - efficient
df = client.query(sql, begin, end)

# ❌ Avoid - can be slow
df = client.query(sql)

Streaming for Large Results

Use streaming for queries that might return large datasets:

# If you expect > 100MB of results, use streaming
# (expected_result_size_mb is your own estimate of the result size)
if expected_result_size_mb > 100:
    for batch in client.query_stream(sql, begin, end):
        process_batch(batch.to_pandas())
else:
    df = client.query(sql, begin, end)
    process_dataframe(df)

Limit Result Size

Add LIMIT clauses for exploratory queries:

# Good for exploration
df = client.query("SELECT * FROM log_entries LIMIT 1000;", begin, end)

# Then remove limit for production queries
df = client.query("SELECT * FROM log_entries WHERE level <= 2;", begin, end)

Integration Examples

Jupyter Notebooks

import matplotlib.pyplot as plt

# Query data
df = client.query("""
    SELECT time, name, value 
    FROM measures 
    WHERE name = 'cpu_usage'
""", begin, end)

# Plot time series
plt.figure(figsize=(12, 6))
plt.plot(df['time'], df['value'])
plt.title('CPU Usage Over Time')
plt.xlabel('Time')
plt.ylabel('CPU Usage %')
plt.show()

Data Pipeline

import datetime

def extract_metrics(process_id, hours=24):
    """Extract metrics for a specific process."""
    end = datetime.datetime.now(datetime.timezone.utc)
    begin = end - datetime.timedelta(hours=hours)

    sql = f"""
        SELECT time, name, value, unit
        FROM view_instance('measures', '{process_id}')
        ORDER BY time;
    """

    return client.query(sql, begin, end)

def analyze_performance(df):
    """Analyze performance metrics."""
    metrics = {}
    for name in df['name'].unique():
        data = df[df['name'] == name]['value']
        metrics[name] = {
            'mean': data.mean(),
            'max': data.max(),
            'min': data.min(),
            'std': data.std()
        }
    return metrics

# Use in pipeline
process_metrics = extract_metrics('my-service-123')
performance_summary = analyze_performance(process_metrics)
print(performance_summary)

Next Steps