Python API Reference¶
The Micromegas Python client provides a simple but powerful interface for querying observability data using SQL. This page covers all client methods, connection options, and advanced features.
Installation¶
Install the Micromegas Python client from PyPI:
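Assuming the distribution name matches the micromegas import used throughout this page:
pip install micromegas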
Basic Usage¶
Connection¶
import micromegas
# Connect to local Micromegas instance
client = micromegas.connect()
# Connect with dictionary encoding preservation (for memory efficiency)
client = micromegas.connect(preserve_dictionary=True)
The connect() function connects to the analytics service at grpc://localhost:50051.
Parameters:
- preserve_dictionary (bool, optional): Enable dictionary encoding preservation for memory-efficient data transfer. Default: False
Simple Queries¶
import datetime
# Set up time range
now = datetime.datetime.now(datetime.timezone.utc)
begin = now - datetime.timedelta(hours=1)
end = now
# Execute query with time range
sql = "SELECT * FROM log_entries LIMIT 10;"
df = client.query(sql, begin, end)
print(df)
Client Methods¶
query(sql, begin=None, end=None)¶
Execute a SQL query and return results as a pandas DataFrame.
Parameters:
- sql (str): SQL query string
- begin (datetime or str, optional): ⚡ Recommended - Start time for partition elimination. Can be a datetime object or RFC3339 string (e.g., "2024-01-01T00:00:00Z")
- end (datetime or str, optional): ⚡ Recommended - End time for partition elimination. Can be a datetime object or RFC3339 string (e.g., "2024-01-01T23:59:59Z")
Returns:
- pandas.DataFrame: Query results
Performance Note: Using the begin and end parameters instead of SQL time filters allows the analytics server to eliminate entire partitions before query execution, providing significant performance improvements.
Example:
# ✅ EFFICIENT: API time range enables partition elimination
df = client.query("""
SELECT time, process_id, level, msg
FROM log_entries
WHERE level <= 3
ORDER BY time DESC
LIMIT 100;
""", begin, end) # ⭐ Time range in API parameters
# ❌ INEFFICIENT: SQL time filter scans all partitions
df = client.query("""
SELECT time, process_id, level, msg
FROM log_entries
WHERE time >= NOW() - INTERVAL '1 hour' -- Server scans ALL partitions
AND level <= 3
ORDER BY time DESC
LIMIT 100;
""") # Missing API time parameters!
# ✅ Using RFC3339 strings for time ranges
df = client.query("""
SELECT time, process_id, level, msg
FROM log_entries
WHERE level <= 3
ORDER BY time DESC
LIMIT 100;
""", "2024-01-01T00:00:00Z", "2024-01-01T23:59:59Z") # ⭐ RFC3339 strings
# ✅ OK: Query without time range (for metadata queries)
processes = client.query("SELECT process_id, exe FROM processes LIMIT 10;")
query_stream(sql, begin=None, end=None)¶
Execute a SQL query and return results as a stream of Apache Arrow RecordBatch objects. Use this for large datasets to avoid memory issues.
Parameters:
- sql (str): SQL query string
- begin (datetime or str, optional): ⚡ Recommended - Start time for partition elimination. Can be a datetime object or RFC3339 string (e.g., "2024-01-01T00:00:00Z")
- end (datetime or str, optional): ⚡ Recommended - End time for partition elimination. Can be a datetime object or RFC3339 string (e.g., "2024-01-01T23:59:59Z")
Returns:
- Iterator of pyarrow.RecordBatch: Stream of result batches
Example:
import pyarrow as pa
# Stream a large dataset; pass the time range via API parameters
end = datetime.datetime.now(datetime.timezone.utc)
begin = end - datetime.timedelta(days=7)
sql = """
SELECT time, process_id, level, target, msg
FROM log_entries
ORDER BY time DESC;
"""
for record_batch in client.query_stream(sql, begin, end):
    # record_batch is a pyarrow.RecordBatch
    print(f"Batch shape: {record_batch.num_rows} x {record_batch.num_columns}")
    print(f"Schema: {record_batch.schema}")
    # Convert to pandas for analysis
    df = record_batch.to_pandas()
    # Process this batch
    error_logs = df[df['level'] <= 3]
    if not error_logs.empty:
        print(f"Found {len(error_logs)} errors in this batch")
        # Process errors...
    # Memory is automatically freed after each batch
query_arrow(sql, begin=None, end=None)¶
Execute a SQL query and return results as an Apache Arrow Table. This method preserves dictionary encoding when preserve_dictionary=True is set during connection.
Parameters:
- sql (str): SQL query string
- begin (datetime or str, optional): Start time for partition elimination
- end (datetime or str, optional): End time for partition elimination
Returns:
- pyarrow.Table: Query results as Arrow Table
Example:
import pyarrow as pa
# Connect with dictionary preservation
dict_client = micromegas.connect(preserve_dictionary=True)
# Get Arrow table with preserved dictionary encoding
table = dict_client.query_arrow("""
SELECT properties_to_dict(properties) as dict_props
FROM measures
""", begin, end)
# Check if column uses dictionary encoding
print(f"Dictionary encoded: {pa.types.is_dictionary(table.schema.field('dict_props').type)}")
print(f"Memory usage: {table.nbytes:,} bytes")
Dictionary Encoding for Memory Efficiency¶
When working with large datasets containing repeated values (like properties), dictionary encoding can reduce memory usage by 50-80%. Micromegas provides built-in support for dictionary encoding:
Using Dictionary-Encoded Properties¶
# Connect with dictionary preservation enabled
client = micromegas.connect(preserve_dictionary=True)
# Use properties_to_dict UDF for dictionary encoding
sql = """
SELECT
time,
process_id,
properties_to_dict(properties) as dict_props,
properties_length(properties_to_dict(properties)) as prop_count
FROM measures
WHERE time >= NOW() - INTERVAL '1 hour'
"""
# Option 1: Get as pandas DataFrame (automatic conversion)
df = client.query(sql, begin, end)
print(f"DataFrame shape: {df.shape}")
print(f"Memory usage: {df.memory_usage(deep=True).sum():,} bytes")
# Option 2: Get as Arrow Table (preserves dictionary encoding)
table = client.query_arrow(sql, begin, end)
print(f"Arrow table memory: {table.nbytes:,} bytes")
# Dictionary encoding typically uses 50-80% less memory
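When the dictionary values are string-like, pyarrow's Table.to_pandas() typically maps dictionary-encoded columns to pandas Categorical, so part of the savings carries over to pandas as well. A minimal sketch (the exact dtypes depend on your data):
# Convert the Arrow table to pandas and compare memory footprints.
# String-like dictionary columns usually arrive as pandas Categorical.
df_dict = table.to_pandas()
print(df_dict.dtypes)
print(f"pandas memory: {df_dict.memory_usage(deep=True).sum():,} bytes")
print(f"Arrow memory: {table.nbytes:,} bytes")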
Compatibility with Standard Functions¶
Dictionary-encoded data works seamlessly with Micromegas UDFs:
sql = """
SELECT
-- Direct property access
property_get(properties, 'source') as source,
-- Length calculation (works with both formats)
properties_length(properties) as regular_count,
properties_length(properties_to_dict(properties)) as dict_count,
-- Convert back to array when needed
array_length(properties_to_array(properties_to_dict(properties))) as array_count
FROM measures
"""
df = client.query(sql, begin, end)
Working with Results¶
pandas DataFrames¶
All query() results are pandas DataFrames, giving you access to the full pandas ecosystem:
# Basic DataFrame operations
result = client.query("SELECT process_id, exe, start_time FROM processes;")
# Inspect the data
print(f"Shape: {result.shape}")
print(f"Columns: {result.columns.tolist()}")
print(f"Data types:\n{result.dtypes}")
# Filter and analyze
recent = result[result['start_time'] > datetime.datetime.now(datetime.timezone.utc) - datetime.timedelta(days=1)]
print(f"Recent processes: {len(recent)}")
# Group and aggregate
by_exe = result.groupby('exe').size().sort_values(ascending=False)
print("Processes by executable:")
print(by_exe.head())
pyarrow RecordBatch¶
Streaming queries return Apache Arrow RecordBatch objects:
for batch in client.query_stream(sql, begin, end):
    # RecordBatch properties
    print(f"Rows: {batch.num_rows}")
    print(f"Columns: {batch.num_columns}")
    print(f"Schema: {batch.schema}")
    # Access individual columns
    time_column = batch.column('time')
    level_column = batch.column('level')
    # Convert to pandas for analysis
    df = batch.to_pandas()
    # Convert to other formats
    rows = batch.to_pylist()      # List of row dictionaries
    columns = batch.to_pydict()   # Dictionary mapping column names to lists of values
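When a streamed result turns out to fit comfortably in memory, the batches can be combined into a single Arrow table using pyarrow's standard API; a minimal sketch:
import pyarrow as pa
# Collect the streamed batches and combine them into one Arrow table.
# Only do this when the full result fits in memory.
batches = list(client.query_stream(sql, begin, end))
if batches:
    table = pa.Table.from_batches(batches)
    df = table.to_pandas()
    print(f"Combined result: {len(df)} rows")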
Connection Configuration¶
FlightSQLClient(uri, headers=None)¶
For advanced connection scenarios, use the FlightSQLClient class directly:
from micromegas.flightsql.client import FlightSQLClient
# Connect to remote server with authentication
client = FlightSQLClient(
"grpc+tls://remote-server:50051",
headers={"authorization": "Bearer your-token"}
)
# Connect to local server (equivalent to micromegas.connect())
client = FlightSQLClient("grpc://localhost:50051")
Parameters:
- uri (str): FlightSQL server URI. Use grpc:// for unencrypted connections or grpc+tls:// for TLS connections
- headers (dict, optional): Custom headers for authentication or metadata
Schema Discovery¶
prepare_statement(sql)¶
Get query schema information without executing the query:
# Prepare statement to discover schema
stmt = client.prepare_statement(
"SELECT time, level, msg FROM log_entries WHERE level <= 3"
)
# Inspect the schema
print("Query result schema:")
for field in stmt.dataset_schema:
    print(f" {field.name}: {field.type}")
# Output:
# time: timestamp[ns]
# level: int32
# msg: string
# The query is also available
print(f"Query: {stmt.query}")
prepared_statement_stream(statement)¶
Execute a prepared statement (mainly useful after schema inspection):
# Execute the prepared statement
for batch in client.prepared_statement_stream(stmt):
    df = batch.to_pandas()
    print(f"Received {len(df)} rows")
Note: Prepared statements are primarily for schema discovery. Execution offers no performance benefit over query_stream().
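One practical use of schema discovery is checking that a query returns the columns you expect before streaming a potentially large result. A small sketch built on the methods above (process_batch is a placeholder for your own logic):
# Validate the result schema before executing the query.
stmt = client.prepare_statement(
    "SELECT time, level, msg FROM log_entries WHERE level <= 3"
)
expected = {"time", "level", "msg"}
actual = {field.name for field in stmt.dataset_schema}
if expected <= actual:
    for batch in client.prepared_statement_stream(stmt):
        process_batch(batch.to_pandas())  # process_batch: your own function
else:
    print(f"Missing columns: {expected - actual}")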
Process and Stream Discovery¶
find_process(process_id)¶
Find detailed information about a specific process:
# Find process by ID
process_info = client.find_process('550e8400-e29b-41d4-a716-446655440000')
if not process_info.empty:
    print(f"Process: {process_info['exe'].iloc[0]}")
    print(f"Started: {process_info['start_time'].iloc[0]}")
    print(f"Computer: {process_info['computer'].iloc[0]}")
else:
    print("Process not found")
query_streams(begin, end, limit, process_id=None, tag_filter=None)¶
Query event streams with filtering:
# Query all streams from the last hour
end = datetime.datetime.now(datetime.timezone.utc)
begin = end - datetime.timedelta(hours=1)
streams = client.query_streams(begin, end, limit=100)
# Filter by process
process_streams = client.query_streams(
begin, end,
limit=50,
process_id='550e8400-e29b-41d4-a716-446655440000'
)
# Filter by stream tag
log_streams = client.query_streams(
begin, end,
limit=20,
tag_filter='log'
)
print(f"Found {len(streams)} total streams")
print(f"Stream types: {streams['stream_type'].value_counts()}")
query_blocks(begin, end, limit, stream_id)¶
Query data blocks within a stream (for low-level inspection):
# First find a stream
streams = client.query_streams(begin, end, limit=1)
if not streams.empty:
    stream_id = streams['stream_id'].iloc[0]
    # Query blocks in that stream
    blocks = client.query_blocks(begin, end, 100, stream_id)
    print(f"Found {len(blocks)} blocks")
    print(f"Total events: {blocks['nb_events'].sum()}")
    print(f"Total size: {blocks['payload_size'].sum()} bytes")
query_spans(begin, end, limit, stream_id)¶
Query execution spans for performance analysis:
# Query spans for detailed performance analysis
spans = client.query_spans(begin, end, 1000, stream_id)
# Find slowest operations
slow_spans = spans.nlargest(10, 'duration')
print("Slowest operations:")
for _, span in slow_spans.iterrows():
    duration_ms = span['duration'] / 1_000_000  # Convert nanoseconds to milliseconds
    print(f" {span['name']}: {duration_ms:.2f}ms")
# Analyze span hierarchy
root_spans = spans[spans['parent_span_id'].isna()]
print(f"Found {len(root_spans)} root operations")
Data Management¶
bulk_ingest(table_name, df)¶
Bulk ingest metadata for replication or administrative tasks:
import pandas as pd
# Example: Replicate process metadata
processes_df = pd.DataFrame({
'process_id': ['550e8400-e29b-41d4-a716-446655440000'],
'exe': ['/usr/bin/myapp'],
'username': ['user'],
'realname': ['User Name'],
'computer': ['hostname'],
'distro': ['Ubuntu 22.04'],
'cpu_brand': ['Intel Core i7'],
'tsc_frequency': [2400000000],
'start_time': [datetime.datetime.now(datetime.timezone.utc)],
'start_ticks': [1234567890],
'insert_time': [datetime.datetime.now(datetime.timezone.utc)],
'parent_process_id': [''],
'properties': [[]]
})
# Ingest process metadata
result = client.bulk_ingest('processes', processes_df)
if result:
    print(f"Ingested {result.record_count} process records")
Supported tables: processes, streams, blocks, payloads
Note: This method is for metadata replication and administrative tasks. Use the telemetry ingestion service HTTP API for normal data ingestion.
materialize_partitions(view_set_name, begin, end, partition_delta_seconds)¶
Create materialized partitions for performance optimization:
# Materialize hourly partitions for the last 24 hours
end = datetime.datetime.now(datetime.timezone.utc)
begin = end - datetime.timedelta(days=1)
client.materialize_partitions(
'log_entries',
begin,
end,
3600 # 1-hour partitions
)
# Prints progress messages for each materialized partition
retire_partitions(view_set_name, view_instance_id, begin, end)¶
Remove materialized partitions to free up storage:
# Retire old partitions
client.retire_partitions(
'log_entries',
'process-123-456',
begin,
end
)
# Prints status messages as partitions are retired
Warning: This operation cannot be undone. Retired partitions must be re-materialized if needed.
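A common pattern is retiring partitions older than a retention window. A hedged sketch, assuming a 'global' view instance (instance IDs are discussed under the admin functions below); always double-check the time range before running it:
# Retire log_entries partitions older than a 30-day retention window.
# 'global' is an example view_instance_id; adjust to your deployment.
retention_cutoff = datetime.datetime.now(datetime.timezone.utc) - datetime.timedelta(days=30)
earliest = datetime.datetime(2000, 1, 1, tzinfo=datetime.timezone.utc)  # arbitrary early bound
client.retire_partitions('log_entries', 'global', earliest, retention_cutoff)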
Administrative Functions¶
The micromegas.admin module provides administrative functions for schema evolution and partition lifecycle management. These functions are intended for system administrators and should be used with caution.
list_incompatible_partitions(client, view_set_name=None)¶
Lists partitions with schemas incompatible with current view set schemas. These partitions cannot be queried correctly alongside current partitions and should be retired to enable schema evolution.
import micromegas
import micromegas.admin
# Connect to analytics service
client = micromegas.connect()
# List all incompatible partitions across all view sets
incompatible = micromegas.admin.list_incompatible_partitions(client)
print(f"Found {len(incompatible)} groups of incompatible partitions")
print(f"Total incompatible partitions: {incompatible['partition_count'].sum()}")
print(f"Total size to be freed: {incompatible['total_size_bytes'].sum() / (1024**3):.2f} GB")
# List incompatible partitions for specific view set
log_incompatible = micromegas.admin.list_incompatible_partitions(client, 'log_entries')
print(f"Log entries incompatible partitions: {log_incompatible['partition_count'].sum()}")
Returns a DataFrame with the following columns:
- view_set_name: Name of the view set
- view_instance_id: Instance ID (e.g., process_id or 'global')
- incompatible_schema_hash: The old schema hash in the partition
- current_schema_hash: The current schema hash from ViewFactory
- partition_count: Number of incompatible partitions with this schema
- total_size_bytes: Total size in bytes of all incompatible partitions
- file_paths: Array of file paths for each incompatible partition
retire_incompatible_partitions(client, view_set_name=None)¶
Retires partitions with schemas incompatible with current view set schemas. This enables safe schema evolution by cleaning up old schema versions.
import micromegas
import micromegas.admin
client = micromegas.connect()
# Preview what would be retired (recommended first step)
preview = micromegas.admin.list_incompatible_partitions(client, 'log_entries')
print(f"Would retire {preview['partition_count'].sum()} partitions")
print(f"Would free {preview['total_size_bytes'].sum() / (1024**3):.2f} GB")
# Retire incompatible partitions for specific view set
if input("Proceed with retirement? (yes/no): ") == "yes":
    result = micromegas.admin.retire_incompatible_partitions(client, 'log_entries')
    print(f"Retired {result['partitions_retired'].sum()} partitions")
    print(f"Failed {result['partitions_failed'].sum()} partitions")
    # Check for any failures
    for _, row in result.iterrows():
        if row['partitions_failed'] > 0:
            print(f"Failures in {row['view_set_name']}/{row['view_instance_id']}:")
            for msg in row['retirement_messages']:
                if msg.startswith("ERROR:"):
                    print(f"  {msg}")
Returns a DataFrame with the following columns:
- view_set_name: View set that was processed
- view_instance_id: Instance ID of partitions retired
- partitions_retired: Count of partitions successfully retired
- partitions_failed: Count of partitions that failed to retire
- storage_freed_bytes: Total bytes freed from storage
- retirement_messages: Array of detailed messages for each retirement attempt
Safety Features:
- Uses file-path-based retirement for precision targeting
- Cannot accidentally retire compatible partitions
- Provides detailed error reporting for failures
- Allows view-specific filtering to prevent bulk operations
⚠️ DESTRUCTIVE OPERATION: This operation is irreversible. Retired partitions will be permanently deleted from metadata and their data files removed from object storage. Always preview with list_incompatible_partitions() before calling this function.
Time Utilities¶
format_datetime(value) and parse_time_delta(user_string)¶
Utility functions for time handling:
from micromegas.time import format_datetime, parse_time_delta
# Format datetime for queries
dt = datetime.datetime.now(datetime.timezone.utc)
formatted = format_datetime(dt)
print(formatted) # "2024-01-01T12:00:00+00:00"
# Parse human-readable time deltas
one_hour = parse_time_delta('1h')
thirty_minutes = parse_time_delta('30m')
seven_days = parse_time_delta('7d')
# Use in calculations
recent_time = datetime.datetime.now(datetime.timezone.utc) - parse_time_delta('2h')
Supported units: m (minutes), h (hours), d (days)
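These helpers combine naturally with the query API's time-range parameters; a small sketch:
from micromegas.time import parse_time_delta
# Query the last 6 hours using a human-readable delta.
end = datetime.datetime.now(datetime.timezone.utc)
begin = end - parse_time_delta('6h')
df = client.query("SELECT time, level, msg FROM log_entries LIMIT 100;", begin, end)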
Advanced Features¶
Query Streaming Benefits¶
Use query_stream() for large datasets to:
- Reduce memory usage: Process data in chunks instead of loading everything
- Improve responsiveness: Start processing before the query completes
- Handle large results: Query datasets larger than available RAM
# Example: Process a week of data in batches
end = datetime.datetime.now(datetime.timezone.utc)
begin = end - datetime.timedelta(days=7)
total_errors = 0
total_rows = 0
for batch in client.query_stream("""
    SELECT level, msg FROM log_entries
""", begin, end):
    df = batch.to_pandas()
    errors_in_batch = len(df[df['level'] <= 2])
    total_errors += errors_in_batch
    total_rows += len(df)
    print(f"Batch: {len(df)} rows, {errors_in_batch} errors")
print(f"Total: {total_rows} rows, {total_errors} errors")
FlightSQL Protocol Benefits¶
Micromegas uses Apache Arrow FlightSQL for optimal performance:
- Columnar data transfer: Significantly faster than row-oriented JSON transfer
- Binary protocol: Minimal serialization/deserialization overhead
- Native compression: Efficient network utilization
- Vectorized operations: Optimized for analytical workloads
- Zero-copy reads: Arrow record batches can be used directly from received buffers
Connection Configuration¶
# Connect to local server (default)
client = micromegas.connect()
# Connect to a custom endpoint using FlightSQLClient directly
from micromegas.flightsql.client import FlightSQLClient
client = FlightSQLClient("grpc://remote-server:50051")
Error Handling¶
try:
    df = client.query("SELECT * FROM log_entries;", begin, end)
except Exception as e:
    print(f"Query failed: {e}")
else:
    # Check for empty results
    if df.empty:
        print("No data found for this time range")
    else:
        print(f"Found {len(df)} rows")
Performance Tips¶
Use Time Ranges¶
Always specify time ranges for better performance:
# ✅ Good - efficient
df = client.query(sql, begin, end)
# ❌ Avoid - can be slow
df = client.query(sql)
Streaming for Large Results¶
Use streaming for queries that might return large datasets:
# If you expect > 100MB of results, use streaming
if expected_result_size_mb > 100:
    for batch in client.query_stream(sql, begin, end):
        process_batch(batch.to_pandas())
else:
    df = client.query(sql, begin, end)
    process_dataframe(df)
Limit Result Size¶
Add LIMIT clauses for exploratory queries:
# Good for exploration
df = client.query("SELECT * FROM log_entries LIMIT 1000;", begin, end)
# Then remove limit for production queries
df = client.query("SELECT * FROM log_entries WHERE level <= 2;", begin, end)
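When you are unsure how large a result will be, a quick count can decide between query() and query_stream(); a short sketch (process_batch is a placeholder for your own logic):
# Check the result size before choosing how to fetch it.
count_df = client.query(
    "SELECT COUNT(*) AS n FROM log_entries WHERE level <= 2;", begin, end
)
row_count = int(count_df['n'].iloc[0])
if row_count > 1_000_000:
    for batch in client.query_stream(
        "SELECT * FROM log_entries WHERE level <= 2;", begin, end
    ):
        process_batch(batch.to_pandas())
else:
    df = client.query("SELECT * FROM log_entries WHERE level <= 2;", begin, end)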
Integration Examples¶
Jupyter Notebooks¶
import matplotlib.pyplot as plt
import seaborn as sns
# Query data
df = client.query("""
SELECT time, name, value
FROM measures
WHERE name = 'cpu_usage'
""", begin, end)
# Plot time series
plt.figure(figsize=(12, 6))
plt.plot(df['time'], df['value'])
plt.title('CPU Usage Over Time')
plt.xlabel('Time')
plt.ylabel('CPU Usage %')
plt.show()
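For noisy series, resampling before plotting often makes trends easier to read; a short pandas sketch over the same DataFrame:
# Downsample to 1-minute averages before plotting.
ts = df.set_index('time')['value'].resample('1min').mean()
plt.figure(figsize=(12, 6))
plt.plot(ts.index, ts.values)
plt.title('CPU Usage (1-minute average)')
plt.xlabel('Time')
plt.ylabel('CPU Usage %')
plt.show()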
Data Pipeline¶
import pandas as pd
def extract_metrics(process_id, hours=24):
    """Extract metrics for a specific process."""
    end = datetime.datetime.now(datetime.timezone.utc)
    begin = end - datetime.timedelta(hours=hours)
    sql = f"""
    SELECT time, name, value, unit
    FROM view_instance('measures', '{process_id}')
    ORDER BY time;
    """
    return client.query(sql, begin, end)
def analyze_performance(df):
    """Analyze performance metrics."""
    metrics = {}
    for name in df['name'].unique():
        data = df[df['name'] == name]['value']
        metrics[name] = {
            'mean': data.mean(),
            'max': data.max(),
            'min': data.min(),
            'std': data.std(),
        }
    return metrics
# Use in pipeline
process_metrics = extract_metrics('my-service-123')
performance_summary = analyze_performance(process_metrics)
print(performance_summary)
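The nested summary dictionary is easy to tabulate with pandas for reporting; a brief sketch:
# Tabulate the per-metric summary (rows = metrics, columns = statistics).
summary_df = pd.DataFrame(performance_summary).T
print(summary_df.round(2))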
Next Steps¶
- Python API Advanced - Advanced patterns, performance optimization, and specialized tooling
- Schema Reference - Understand available views and fields
- Functions Reference - Learn about SQL functions
- Query Patterns - Common observability query patterns
- Performance Guide - Optimize your queries