Python API Reference¶
The Micromegas Python client provides a simple but powerful interface for querying observability data using SQL. This page covers all client methods, connection options, and advanced features.
Installation¶
Install the Micromegas Python client from PyPI:
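pip install micromegas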
Basic Usage¶
Connection¶
The connect() function connects to the analytics service at grpc://localhost:50051.
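For example:
import micromegas
# Connect to the analytics service on the default local endpoint
client = micromegas.connect()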
Simple Queries¶
import datetime
# Set up time range
now = datetime.datetime.now(datetime.timezone.utc)
begin = now - datetime.timedelta(hours=1)
end = now
# Execute query with time range
sql = "SELECT * FROM log_entries LIMIT 10;"
df = client.query(sql, begin, end)
print(df)
Client Methods¶
query(sql, begin=None, end=None)¶
Execute a SQL query and return results as a pandas DataFrame.
Parameters:
- sql (str): SQL query string
- begin (datetime or str, optional): ⚡ Recommended - Start time for partition elimination. Can be a datetime object or an RFC3339 string (e.g., "2024-01-01T00:00:00Z")
- end (datetime or str, optional): ⚡ Recommended - End time for partition elimination. Can be a datetime object or an RFC3339 string (e.g., "2024-01-01T23:59:59Z")
Returns:
- pandas.DataFrame: Query results
Performance Note:
Passing begin and end as API parameters instead of using SQL time filters allows the analytics server to eliminate entire partitions before query execution, providing significant performance improvements.
Example:
# ✅ EFFICIENT: API time range enables partition elimination
df = client.query("""
SELECT time, process_id, level, msg
FROM log_entries
WHERE level <= 3
ORDER BY time DESC
LIMIT 100;
""", begin, end) # ⭐ Time range in API parameters
# ❌ INEFFICIENT: SQL time filter scans all partitions
df = client.query("""
SELECT time, process_id, level, msg
FROM log_entries
WHERE time >= NOW() - INTERVAL '1 hour' -- Server scans ALL partitions
AND level <= 3
ORDER BY time DESC
LIMIT 100;
""") # Missing API time parameters!
# ✅ Using RFC3339 strings for time ranges
df = client.query("""
SELECT time, process_id, level, msg
FROM log_entries
WHERE level <= 3
ORDER BY time DESC
LIMIT 100;
""", "2024-01-01T00:00:00Z", "2024-01-01T23:59:59Z") # ⭐ RFC3339 strings
# ✅ OK: Query without time range (for metadata queries)
processes = client.query("SELECT process_id, exe FROM processes LIMIT 10;")
query_stream(sql, begin=None, end=None)¶
Execute a SQL query and return results as a stream of Apache Arrow RecordBatch objects. Use this for large datasets to avoid memory issues.
Parameters:
- sql (str): SQL query string
- begin (datetime or str, optional): ⚡ Recommended - Start time for partition elimination. Can be a datetime object or an RFC3339 string (e.g., "2024-01-01T00:00:00Z")
- end (datetime or str, optional): ⚡ Recommended - End time for partition elimination. Can be a datetime object or an RFC3339 string (e.g., "2024-01-01T23:59:59Z")
Returns:
- Iterator of pyarrow.RecordBatch: Stream of result batches
Example:
import datetime

# Stream a week of data; the time range goes in the API parameters
# so the server can eliminate partitions (see the note above)
now = datetime.datetime.now(datetime.timezone.utc)
begin = now - datetime.timedelta(days=7)
end = now

sql = """
SELECT time, process_id, level, target, msg
FROM log_entries
ORDER BY time DESC;
"""

for record_batch in client.query_stream(sql, begin, end):
    # record_batch is a pyarrow.RecordBatch
    print(f"Batch shape: {record_batch.num_rows} x {record_batch.num_columns}")
    print(f"Schema: {record_batch.schema}")

    # Convert to pandas for analysis
    df = record_batch.to_pandas()

    # Process this batch
    error_logs = df[df['level'] <= 3]
    if not error_logs.empty:
        print(f"Found {len(error_logs)} errors in this batch")
        # Process errors...

    # Each batch becomes eligible for garbage collection once the loop moves on
Working with Results¶
pandas DataFrames¶
All query() results are pandas DataFrames, giving you access to the full pandas ecosystem:
# Basic DataFrame operations
result = client.query("SELECT process_id, exe, start_time FROM processes;")
# Inspect the data
print(f"Shape: {result.shape}")
print(f"Columns: {result.columns.tolist()}")
print(f"Data types:\n{result.dtypes}")
# Filter and analyze
# start_time is timezone-aware (UTC), so compare against an aware timestamp
recent = result[result['start_time'] > datetime.datetime.now(datetime.timezone.utc) - datetime.timedelta(days=1)]
print(f"Recent processes: {len(recent)}")
# Group and aggregate
by_exe = result.groupby('exe').size().sort_values(ascending=False)
print("Processes by executable:")
print(by_exe.head())
pyarrow RecordBatch¶
Streaming queries return Apache Arrow RecordBatch objects:
for batch in client.query_stream(sql, begin, end):
    # RecordBatch properties
    print(f"Rows: {batch.num_rows}")
    print(f"Columns: {batch.num_columns}")
    print(f"Schema: {batch.schema}")

    # Access individual columns
    time_column = batch.column('time')
    level_column = batch.column('level')

    # Convert to pandas (zero-copy for many column types)
    df = batch.to_pandas()

    # Convert to other formats
    rows = batch.to_pylist()     # list of row dictionaries
    columns = batch.to_pydict()  # dictionary mapping column names to lists of values
Advanced Features¶
Query Streaming Benefits¶
Use query_stream() for large datasets to:
- Reduce memory usage: Process data in chunks instead of loading everything
- Improve responsiveness: Start processing before the query completes
- Handle large results: Query datasets larger than available RAM
# Example: Process a week of data in batches
# (begin and end span the last 7 days and are passed as API parameters)
total_errors = 0
total_rows = 0
for batch in client.query_stream("""
    SELECT level, msg FROM log_entries
""", begin, end):
    df = batch.to_pandas()
    errors_in_batch = len(df[df['level'] <= 2])
    total_errors += errors_in_batch
    total_rows += len(df)
    print(f"Batch: {len(df)} rows, {errors_in_batch} errors")
print(f"Total: {total_rows} rows, {total_errors} errors")
FlightSQL Protocol Benefits¶
Micromegas uses Apache Arrow FlightSQL for high-performance data transfer:
- Columnar data transfer: Far faster than row-oriented JSON APIs for analytical results
- Binary protocol: Minimal serialization/deserialization overhead
- Native compression: Efficient network utilization
- Vectorized operations: Optimized for analytical workloads
- Zero-copy operations: Arrow buffers can often be consumed directly, without re-parsing
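Because results already arrive as Arrow record batches, you can assemble them into a single pyarrow.Table without any per-row conversion. A minimal sketch, assuming sql, begin, and end are defined as in the streaming examples above:
import pyarrow as pa

# Gather the streamed record batches; the data stays columnar end to end
batches = list(client.query_stream(sql, begin, end))

# Combine into one Arrow table (raises if the stream returned no batches)
table = pa.Table.from_batches(batches)
print(table.schema)
print(f"{table.num_rows} rows")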
Connection Configuration¶
# Connect to local server (default)
client = micromegas.connect()
# Connect to a custom endpoint using FlightSQLClient directly
from micromegas.flightsql.client import FlightSQLClient
client = FlightSQLClient("grpc://remote-server:50051")
Error Handling¶
try:
    df = client.query("SELECT * FROM log_entries;", begin, end)
except Exception as e:
    print(f"Query failed: {e}")
else:
    # Check for empty results (df is only defined when the query succeeds)
    if df.empty:
        print("No data found for this time range")
    else:
        print(f"Found {len(df)} rows")
Performance Tips¶
Use Time Ranges¶
Always specify time ranges for better performance:
# ✅ Good - efficient
df = client.query(sql, begin, end)
# ❌ Avoid - can be slow
df = client.query(sql)
Streaming for Large Results¶
Use streaming for queries that might return large datasets:
# If you expect large results (e.g., more than 100 MB), use streaming;
# expected_result_size_mb stands in for your own estimate
if expected_result_size_mb > 100:
for batch in client.query_stream(sql, begin, end):
process_batch(batch.to_pandas())
else:
df = client.query(sql, begin, end)
process_dataframe(df)
Limit Result Size¶
Add LIMIT clauses for exploratory queries:
# Good for exploration
df = client.query("SELECT * FROM log_entries LIMIT 1000;", begin, end)
# Then remove limit for production queries
df = client.query("SELECT * FROM log_entries WHERE level <= 2;", begin, end)
Integration Examples¶
Jupyter Notebooks¶
import matplotlib.pyplot as plt
# Query data
df = client.query("""
SELECT time, name, value
FROM measures
WHERE name = 'cpu_usage'
""", begin, end)
# Plot time series
plt.figure(figsize=(12, 6))
plt.plot(df['time'], df['value'])
plt.title('CPU Usage Over Time')
plt.xlabel('Time')
plt.ylabel('CPU Usage %')
plt.show()
Data Pipeline¶
import pandas as pd
def extract_metrics(process_id, hours=24):
"""Extract metrics for a specific process."""
end = datetime.datetime.now(datetime.timezone.utc)
begin = end - datetime.timedelta(hours=hours)
sql = f"""
SELECT time, name, value, unit
FROM view_instance('measures', '{process_id}')
WHERE time >= '{begin.isoformat()}'
ORDER BY time;
"""
return client.query(sql, begin, end)
def analyze_performance(df):
"""Analyze performance metrics."""
metrics = {}
for name in df['name'].unique():
data = df[df['name'] == name]['value']
metrics[name] = {
'mean': data.mean(),
'max': data.max(),
'min': data.min(),
'std': data.std()
}
return metrics
# Use in pipeline
process_metrics = extract_metrics('my-service-123')
performance_summary = analyze_performance(process_metrics)
print(performance_summary)
Next Steps¶
- Schema Reference - Understand available views and fields
- Functions Reference - Learn about SQL functions
- Query Patterns - Common observability query patterns
- Performance Guide - Optimize your queries