Connect ClickHouse Python Client to Tinybird¶
The official ClickHouse Python client, clickhouse-connect, can connect to Tinybird through Tinybird's ClickHouse® HTTP protocol compatibility. This lets you query your Tinybird data sources programmatically from Python applications and work with the results using the wider Python data ecosystem.
The ClickHouse® connection to Tinybird is read-only. You can use it to query and analyze data from your Tinybird data sources, but you cannot modify data through this connection.
Prerequisites¶
- Python 3.8 or later
- A Tinybird workspace with data sources
- A Tinybird Auth Token with read permissions for the workspace data sources
Installation¶
Install the official ClickHouse Python client:
pip install clickhouse-connect
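The DataFrame examples later on this page also rely on pandas, which clickhouse-connect treats as an optional dependency, so install it separately if you plan to run them:
pip install pandas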
Configuration¶
Create a client instance with the following configuration:
import clickhouse_connect

client = clickhouse_connect.get_client(
    interface='https',
    host='clickhouse.tinybird.co',
    username='<WORKSPACE_NAME>',  # Optional, for identification
    password='<TOKEN>',  # Your Tinybird Auth Token
    port=443,
)
See the list of ClickHouse hosts to find the correct one for your region.
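To avoid hard-coding credentials, you can read the token and host from environment variables instead. The variable names below (TINYBIRD_TOKEN, TINYBIRD_CLICKHOUSE_HOST) are only illustrative, not names Tinybird defines:
import os
import clickhouse_connect

client = clickhouse_connect.get_client(
    interface='https',
    host=os.environ.get('TINYBIRD_CLICKHOUSE_HOST', 'clickhouse.tinybird.co'),
    password=os.environ['TINYBIRD_TOKEN'],  # Tinybird Auth Token read from the environment
    port=443,
)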
Test the connection¶
Create a simple query to verify your connection:
def test_connection():
    try:
        result = client.query('SELECT database, name, engine FROM system.tables LIMIT 5')
        print("Connection successful!")
        # Print column names
        print(" | ".join(result.column_names))
        # Print data rows
        for row in result.result_rows:
            print(" | ".join(str(cell) for cell in row))
    except Exception as error:
        print(f"Connection failed: {error}")

test_connection()
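For a lighter-weight health check, clickhouse-connect also provides a ping() method that returns True when the server responds:
# Quick reachability check without running a full query
if client.ping():
    print("Server is reachable")
else:
    print("Server is not responding")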
Query your data¶
Once connected, you can query your Tinybird data sources:
# Query a specific data source
def query_data_source():
    result = client.query('SELECT * FROM your_data_source_name LIMIT 10')
    return result.result_rows

# Query with parameters
def query_with_params(start_date, limit):
    result = client.query(
        """
        SELECT timestamp, user_id, event_name
        FROM events
        WHERE timestamp >= %(start_date)s
        LIMIT %(limit)s
        """,
        parameters={
            'start_date': start_date,
            'limit': limit
        }
    )
    return result.result_rows

# Convert to pandas DataFrame
def query_to_dataframe():
    import pandas as pd

    result = client.query('SELECT * FROM your_data_source_name LIMIT 100')
    # Create DataFrame from result
    df = pd.DataFrame(result.result_rows, columns=result.column_names)
    return df
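If pandas is installed, clickhouse-connect can also build the DataFrame for you with query_df(), which is equivalent to the manual conversion above:
def query_to_dataframe_directly():
    # Returns a DataFrame with columns named after the result columns (pandas required)
    return client.query_df('SELECT * FROM your_data_source_name LIMIT 100')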
Observability¶
You can also explore Tinybird's observability data from internal service data sources:
# View recent API endpoint usage and performance
def query_recent_usage():
    result = client.query(
        """
        SELECT
            start_datetime,
            pipe_name,
            duration,
            result_rows,
            read_bytes,
            status_code
        FROM tinybird.pipe_stats_rt
        WHERE start_datetime >= now() - INTERVAL 1 DAY
        ORDER BY start_datetime DESC
        LIMIT 10
        """
    )

    print("Recent API endpoint usage:")
    for row in result.result_rows:
        start_time, pipe_name, duration, result_rows, read_bytes, status_code = row
        duration_ms = duration * 1000
        print(f"[{start_time}] {pipe_name} - {duration_ms:.2f}ms, {result_rows} rows, status {status_code}")

# Analyze endpoint performance metrics
def query_performance_metrics():
    result = client.query(
        """
        SELECT
            pipe_name,
            count() as request_count,
            avg(duration) as avg_duration,
            avg(result_rows) as avg_result_rows,
            sum(read_bytes) as total_bytes_read
        FROM tinybird.pipe_stats_rt
        WHERE start_datetime >= now() - INTERVAL 1 HOUR
        GROUP BY pipe_name
        ORDER BY request_count DESC
        """
    )

    print("\nEndpoint performance metrics (last hour):")
    for row in result.result_rows:
        pipe_name, request_count, avg_duration, avg_result_rows, total_bytes_read = row
        avg_duration_ms = avg_duration * 1000
        print(f"{pipe_name}: {request_count} requests, {avg_duration_ms:.2f}ms avg, {avg_result_rows:.0f} avg rows, {total_bytes_read:,} bytes")

# Convert to pandas DataFrame for analysis
def query_usage_to_dataframe():
    import pandas as pd

    result = client.query(
        """
        SELECT
            start_datetime,
            pipe_name,
            duration,
            result_rows,
            read_bytes,
            status_code
        FROM tinybird.pipe_stats_rt
        WHERE start_datetime >= now() - INTERVAL 1 DAY
        ORDER BY start_datetime DESC
        """
    )
    df = pd.DataFrame(result.result_rows, columns=result.column_names)
    return df
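Because pipe_stats_rt includes a status_code column, you can also compute a rough per-endpoint error rate. This is only a sketch: the 400 threshold below is an illustrative choice, not an official definition of an error:
# Share of requests returning an error status per endpoint (last day)
def query_error_rates():
    result = client.query(
        """
        SELECT
            pipe_name,
            count() as total_requests,
            countIf(status_code >= 400) as error_requests,
            round(100 * error_requests / total_requests, 2) as error_rate_pct
        FROM tinybird.pipe_stats_rt
        WHERE start_datetime >= now() - INTERVAL 1 DAY
        GROUP BY pipe_name
        ORDER BY error_rate_pct DESC
        """
    )
    for pipe_name, total, errors, rate in result.result_rows:
        print(f"{pipe_name}: {errors}/{total} errors ({rate}%)")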
Working with data types¶
Handle various ClickHouse data types:
def handle_query_results():
    result = client.query(
        """
        SELECT
            toUInt32(42) as number,
            'hello world' as text,
            now() as timestamp,
            [1, 2, 3] as array_col,
            map('key1', 'value1', 'key2', 'value2') as map_col
        """
    )

    for row in result.result_rows:
        number, text, timestamp, array_col, map_col = row
        print(f"Number: {number} (type: {type(number)})")
        print(f"Text: {text} (type: {type(text)})")
        print(f"Timestamp: {timestamp} (type: {type(timestamp)})")
        print(f"Array: {array_col} (type: {type(array_col)})")
        print(f"Map: {map_col} (type: {type(map_col)})")
Error handling¶
Handle connection and query errors appropriately:
from clickhouse_connect.driver.exceptions import ClickHouseError

def safe_query(query, parameters=None):
    try:
        result = client.query(query, parameters=parameters)
        return result
    except ClickHouseError as error:
        # Errors raised by the ClickHouse driver or returned by the server
        print(f"ClickHouse error: {error}")
        raise
    except Exception as error:
        print(f"Connection error: {error}")
        raise

# Example usage
try:
    result = safe_query('SELECT COUNT(*) FROM your_data_source')
    count = result.result_rows[0][0]
    print(f"Total rows: {count:,}")
except Exception:
    print("Query failed")
Advanced usage¶
Streaming large results¶
For large datasets, use streaming to avoid memory issues:
def stream_large_query():
    # Stream results for large datasets; consume the stream inside a `with`
    # block so the underlying HTTP response is properly closed
    with client.query_row_block_stream(
        'SELECT * FROM large_data_source',
        settings={'max_block_size': 10000}
    ) as stream:
        for block in stream:
            # Process each block of rows
            for row in block:
                process_row(row)

def process_row(row):
    # Your row processing logic here
    pass
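If you want to stream directly into pandas, clickhouse-connect also offers query_df_stream(), which yields one DataFrame per block and is likewise consumed inside a with block (pandas required):
def stream_to_dataframes():
    # Each iteration yields a pandas DataFrame for one block of rows
    with client.query_df_stream('SELECT * FROM large_data_source') as stream:
        for df_block in stream:
            print(df_block.shape)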
Custom settings¶
Pass ClickHouse settings for query optimization:
# Query with custom settings
result = client.query(
    'SELECT * FROM system.numbers LIMIT 5',
    settings={
        'max_result_rows': 5,
        'max_execution_time': 10
    }
)
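Settings can also be applied once for the whole client by passing a settings dictionary to get_client(). Treat this as a sketch and verify that the settings you need are honored, since some may be restricted by Tinybird's read-only interface:
# Apply settings to every query issued by this client
client = clickhouse_connect.get_client(
    interface='https',
    host='clickhouse.tinybird.co',
    password='<TOKEN>',
    port=443,
    settings={'max_execution_time': 10},
)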