TypeScript SDK resources¶
Use the TypeScript SDK to define Tinybird resources as code with full type inference. You can define datasources, pipes, endpoints, materialized views, copy pipes, sink pipes, and connections in TypeScript and sync them to Tinybird.
Project layout¶
The SDK scaffolds the following files by default:
lib/tinybird.tstinybird.config.mjs
Configure credentials¶
Create a .env.local file in your project root with the Tinybird token and host:
TINYBIRD_TOKEN=p.your_token_here
TINYBIRD_URL=https://api.tinybird.co
Tokens¶
Use defineToken to declare reusable token names, then attach them to datasources or pipes with each resource's tokens option.
import {
defineDatasource,
defineEndpoint,
defineToken,
engine,
node,
t,
} from "@tinybirdco/sdk";
export const eventsAppend = defineToken("events_append");
export const eventsRead = defineToken("events_read");
export const events = defineDatasource("events", {
schema: {
timestamp: t.dateTime(),
payload: t.string(),
},
engine: engine.mergeTree({
sortingKey: ["timestamp"],
}),
tokens: [
{ token: eventsAppend, scope: "APPEND" },
{ token: eventsRead, scope: "READ" },
],
});
export const eventsApi = defineEndpoint("events_api", {
nodes: [
node({
name: "latest",
sql: `SELECT timestamp, payload FROM events ORDER BY timestamp DESC LIMIT 100`,
}),
],
output: {
timestamp: t.dateTime(),
payload: t.string(),
},
tokens: [{ token: eventsRead, scope: "READ" }],
});
Token names must start with a letter or underscore and then contain only alphanumeric characters or underscores. Use READ or APPEND for datasource token bindings, and READ for pipe or endpoint token bindings.
Datasources¶
Define datasources with schemas and engines:
import { defineDatasource, engine, t, type InferRow } from "@tinybirdco/sdk";
export const pageViews = defineDatasource("page_views", {
description: "Page view tracking data",
schema: {
timestamp: t.dateTime(),
pathname: t.string(),
session_id: t.string(),
country: t.string().lowCardinality().nullable(),
},
engine: engine.mergeTree({
sortingKey: ["pathname", "timestamp"],
}),
});
export type PageViewsRow = InferRow<typeof pageViews>;
Endpoints¶
Define API endpoints:
import {
defineEndpoint,
node,
p,
t,
type InferParams,
type InferOutputRow,
} from "@tinybirdco/sdk";
export const topPages = defineEndpoint("top_pages", {
description: "Get the most visited pages",
params: {
start_date: p.dateTime(),
end_date: p.dateTime(),
limit: p.int32().optional(10),
},
nodes: [
node({
name: "aggregated",
sql: `
SELECT pathname, count() AS views
FROM page_views
WHERE timestamp >= {{DateTime(start_date)}}
AND timestamp <= {{DateTime(end_date)}}
GROUP BY pathname
ORDER BY views DESC
LIMIT {{Int32(limit, 10)}}
`,
}),
],
output: {
pathname: t.string(),
views: t.uint64(),
},
});
export type TopPagesParams = InferParams<typeof topPages>;
export type TopPagesOutput = InferOutputRow<typeof topPages>;
Pipes¶
Define internal pipes that are not exposed as API endpoints:
import { definePipe, node, p } from "@tinybirdco/sdk";
export const filteredEvents = definePipe("filtered_events", {
description: "Filter events by date range",
params: {
start_date: p.dateTime(),
end_date: p.dateTime(),
},
nodes: [
node({
name: "filtered",
sql: `
SELECT * FROM events
WHERE timestamp >= {{DateTime(start_date)}}
AND timestamp <= {{DateTime(end_date)}}
`,
}),
],
});
Materialized views¶
Use materialized views to populate derived datasources:
import {
defineDatasource,
defineMaterializedView,
node,
t,
engine,
} from "@tinybirdco/sdk";
export const dailyStats = defineDatasource("daily_stats", {
schema: {
date: t.date(),
pathname: t.string(),
views: t.simpleAggregateFunction("sum", t.uint64()),
},
engine: engine.aggregatingMergeTree({
sortingKey: ["date", "pathname"],
}),
jsonPaths: false,
});
export const dailyStatsMv = defineMaterializedView("daily_stats_mv", {
datasource: dailyStats,
nodes: [
node({
name: "aggregate",
sql: `
SELECT toDate(timestamp) AS date, pathname, count() AS views
FROM page_views
GROUP BY date, pathname
`,
}),
],
});
Copy pipes¶
Use copy pipes for snapshots or scheduled exports:
import { defineCopyPipe, node } from "@tinybirdco/sdk";
export const dailySnapshot = defineCopyPipe("daily_snapshot", {
datasource: dailyStats,
schedule: "0 0 * * *",
mode: "append",
nodes: [
node({
name: "snapshot",
sql: `SELECT * FROM daily_stats WHERE date = today() - 1`,
}),
],
});
Sink pipes¶
Use sink pipes to publish query results to Kafka or S3:
import { defineSinkPipe, node } from "@tinybirdco/sdk";
export const kafkaEventsSink = defineSinkPipe("kafka_events_sink", {
sink: {
connection: eventsKafka,
topic: "events_export",
schedule: "@on-demand",
},
nodes: [
node({
name: "publish",
sql: `SELECT timestamp, payload FROM kafka_events`,
}),
],
});
export const s3EventsSink = defineSinkPipe("s3_events_sink", {
sink: {
connection: landingS3,
bucketUri: "s3://my-bucket/exports/",
fileTemplate: "events_{date}",
format: "csv",
schedule: "@once",
},
nodes: [
node({
name: "export",
sql: `SELECT timestamp, session_id FROM s3_landing`,
}),
],
});
Connections¶
Define connector connections and reference them in datasources:
import {
defineGCSConnection,
defineKafkaConnection,
defineDynamoDBConnection,
defineDatasource,
engine,
secret,
t,
} from "@tinybirdco/sdk";
export const eventsKafka = defineKafkaConnection("events_kafka", {
bootstrapServers: "kafka.example.com:9092",
securityProtocol: "SASL_SSL",
saslMechanism: "PLAIN",
key: secret("KAFKA_KEY"),
secret: secret("KAFKA_SECRET"),
});
export const landingGCS = defineGCSConnection("landing_gcs", {
serviceAccountCredentialsJson: secret("GCS_SERVICE_ACCOUNT_CREDENTIALS_JSON"),
});
export const kafkaEvents = defineDatasource("kafka_events", {
schema: {
timestamp: t.dateTime(),
payload: t.string(),
},
engine: engine.mergeTree({
sortingKey: ["timestamp"],
}),
kafka: {
connection: eventsKafka,
topic: "events",
groupId: "events-consumer",
},
});
export const gcsLanding = defineDatasource("gcs_landing", {
schema: {
timestamp: t.dateTime(),
session_id: t.string(),
},
engine: engine.mergeTree({
sortingKey: ["timestamp"],
}),
gcs: {
connection: landingGCS,
bucketUri: "gs://my-gcs-bucket/events/*.csv",
schedule: "@auto",
},
});
DynamoDB¶
The DynamoDB connector mirrors an Amazon DynamoDB table into Tinybird: an initial backfill via a Point-in-Time Recovery export to S3, followed by continuous Change Data Capture from DynamoDB Streams. See the DynamoDB connector page for the table requirements and IAM setup.
Defining the connection in code isn't enough. Tinybird assumes an IAM role in your own AWS account, and the role's trust policy must include a Workspace-specific external ID that only Tinybird generates. Run tb connection create dynamodb to get the external ID, the Principal account, and the policy JSON, then create the role in AWS and store its ARN as the DYNAMODB_ROLE_ARN secret (set it from the dashboard or with tb secret set). Without the correct external ID, deploy fails with a 403 ... external ID error. See the DynamoDB connector page for the full setup.
Define the connection with defineDynamoDBConnection. It takes a region and an arn (the IAM role Tinybird assumes to read your table, its stream, and the S3 export bucket); both are required:
import { defineDynamoDBConnection, secret } from "@tinybirdco/sdk";
export const eventsDynamo = defineDynamoDBConnection("events_dynamo", {
region: "us-east-1",
arn: secret("DYNAMODB_ROLE_ARN"),
});
The arn is read from a workspace secret. The SDK has no command for creating secrets, so set DYNAMODB_ROLE_ARN from the Tinybird dashboard or with tb secret set DYNAMODB_ROLE_ARN <role-arn> before you deploy. See the DynamoDB connector page for the IAM role and trust policy the ARN must point to.
Each row represents a change to the table, so map the change record's fields with jsonPath() and use a replacingMergeTree engine with ver and isDeleted to query the current state. Attach the connection through the dynamodb key with the table ARN and the name of an S3 exportBucket you own (the bucket name only, with no s3:// prefix):
import { defineDatasource, t, engine } from "@tinybirdco/sdk";
import { eventsDynamo } from "./connections";
export const dynamoEvents = defineDatasource("dynamo_events", {
schema: {
id: t.string().jsonPath("$.Id"),
_record: t.string().jsonPath("$.NewImage"),
_timestamp: t.dateTime64(3).jsonPath("$.ApproximateCreationDateTime"),
_event_name: t.string().lowCardinality().jsonPath("$.eventName"),
_is_deleted: t.uint8().jsonPath("$._is_deleted"),
},
engine: engine.replacingMergeTree({
sortingKey: ["id"],
ver: "_timestamp",
isDeleted: "_is_deleted",
}),
dynamodb: {
connection: eventsDynamo,
tableArn: "arn:aws:dynamodb:us-east-1:123456789012:table/events",
exportBucket: "my-export-bucket",
},
});
Next steps¶
- Review the SDK CLI: TypeScript SDK CLI commands
- Follow the SDK quickstart: Quick start: TypeScript SDK