Ingest from Snowflake using incremental updates

Read on to learn how to incrementally load data from a Snowflake table into Tinybird, using Amazon S3 as an intermediary staging area.

An incremental loading strategy ensures that only new or updated rows are transferred.

Before you start

Make sure you have the following:

  • Snowflake Account: A Snowflake instance with the data you want to load.
  • Amazon S3 Bucket: Access to an S3 bucket for staging data, along with appropriate permissions (write from Snowflake and read from Tinybird).
  • Tinybird Account: A Tinybird workspace with an appropriate data source set up.
  • Snowflake Permissions: Ensure the Snowflake user has privileges to do the following (see the grant sketch after this list):
    • Access the target table.
    • Create and manage stages.
    • Unload data into S3.
  • AWS Credentials: Ensure Snowflake can use AWS credentials (IAM Role or Access Key/Secret Key pair) to write to the S3 bucket.
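
The exact statements depend on how your roles are organized, but a minimal sketch of the grants described above might look like this. The role name unload_role is a placeholder; the table and warehouse names match the examples used later in this guide:

/* Sketch of the privileges the unloading role needs.
 * Adjust the names to your environment; unload_role is a placeholder.
 */
GRANT USAGE ON DATABASE tinybird TO ROLE unload_role;
GRANT USAGE ON SCHEMA tinybird.samples TO ROLE unload_role;
GRANT SELECT ON TABLE tinybird.samples.orders_incremental TO ROLE unload_role;  -- access the target table
GRANT CREATE STAGE ON SCHEMA tinybird.samples TO ROLE unload_role;              -- create and manage stages
GRANT CREATE TASK ON SCHEMA tinybird.samples TO ROLE unload_role;               -- create the unload task
GRANT EXECUTE TASK ON ACCOUNT TO ROLE unload_role;                              -- run scheduled tasks
GRANT USAGE ON WAREHOUSE compute_wh TO ROLE unload_role;                        -- warehouse used by the task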

1. Set up Snowflake to unload to S3

Follow these steps to set up the unload in Snowflake:

  1. Grant the required permissions in the AWS IAM console

Make sure the S3 bucket allows Snowflake to write files by setting up an appropriate IAM role or policy. You can use this template to create the policy and attach it to the AWS role:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": ["s3:PutObject", "s3:AbortMultipartUpload"],
            "Resource": "arn:aws:s3:::your-bucket-name/path/*"
        }
    ]
}

Replace your-bucket-name/path/* with your bucket name, and optionally the path you want to grant access to.

  2. Create the storage integration

Run the following SQL statement to create the storage integration:

/* Create the S3 integration.
 */
CREATE OR REPLACE STORAGE INTEGRATION tinybird_integration
  TYPE = EXTERNAL_STAGE
  STORAGE_PROVIDER = 'S3'
  ENABLED = TRUE
  STORAGE_AWS_ROLE_ARN = '<arn_role>'
  STORAGE_ALLOWED_LOCATIONS = ('*');

-- describe integration tinybird_integration;

Replace <arn_role> with the ARN of the role created in the previous step.
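
If you authenticate with an IAM role, the role's trust policy must also allow Snowflake to assume it. The commented describe statement above is the usual way to look up the values Snowflake expects:

/* Retrieve the Snowflake-side identifiers for the IAM trust policy.
 */
DESC STORAGE INTEGRATION tinybird_integration;

Copy the STORAGE_AWS_IAM_USER_ARN and STORAGE_AWS_EXTERNAL_ID values from the output into the trust relationship of the IAM role so that Snowflake can assume it.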

  3. Create the file format

Run the following SQL statement to create the file format:

/* Create the file format for the output files generated.
 */
CREATE OR REPLACE FILE FORMAT csv_unload_format
  TYPE = 'CSV';
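
The import pattern used later in this guide expects gzip-compressed files (*.csv.gz), which is what Snowflake produces by default when unloading CSV. If you prefer to be explicit, or need additional formatting options, a possible variant looks like this. The extra options are assumptions, not requirements of this guide:

/* Variant with explicit compression and quoting options.
 */
CREATE OR REPLACE FILE FORMAT csv_unload_format
  TYPE = 'CSV'
  COMPRESSION = 'GZIP'                    -- matches the *.csv.gz import pattern
  FIELD_OPTIONALLY_ENCLOSED_BY = '"';     -- quote fields that contain commas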

  4. Create the stage

Run the following SQL statement to create the stage:

/* And finally the stage we'll use to unload the data to.
 */
CREATE OR REPLACE STAGE tinybird_stage
  STORAGE_INTEGRATION = tinybird_integration
  URL = 's3://your-bucket-name/path/'
  FILE_FORMAT = csv_unload_format;

Replace your-bucket-name and path with your S3 bucket details.
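
Before moving on, you can check that the integration and stage are wired up correctly. Listing the stage is a quick, side-effect-free way to confirm Snowflake can reach the bucket; the orders/ prefix matches the path the task writes to in the next step:

/* List the files visible through the stage.
 * An empty result is fine; an access error means the IAM setup needs revisiting.
 */
LIST @tinybird_stage/orders/;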

2. Create the unload task

Run the following SQL statement to create the scheduled task that unloads the new records since the last successful execution to the S3 bucket:

/* Create the scheduled task that unloads the new records since
 * last successful execution to the S3 bucket.
 * 
 * Note how it reads the timestamp of the last successful execution,
 * and leaves a one hour margin.
 *
 * Orders need to be deduplicated later in Tinybird.
 */
CREATE OR REPLACE TASK export_order_deltas
    WAREHOUSE = compute_wh
    SCHEDULE = 'USING CRON 05 * * * * UTC'
AS
BEGIN
   LET sql := 'COPY INTO @tinybird_stage/orders/orders_<ts> from (
    select
        O_ORDERKEY, O_CUSTKEY, O_ORDERSTATUS, O_TOTALPRICE, O_ORDERDATE,
        O_ORDERPRIORITY, O_CLERK
    from tinybird.samples.orders_incremental
    where o_orderdate >= (
        SELECT coalesce(timestampadd(hour,-1,max(QUERY_START_TIME)),\'1970-01-01\')
        FROM TABLE(INFORMATION_SCHEMA.TASK_HISTORY(TASK_NAME=>\'export_order_deltas\'))
        where state = \'SUCCEEDED\'
        ORDER BY SCHEDULED_TIME
    )) max_file_size=1000000000';

   sql := REPLACE(sql, '<ts>', TO_VARCHAR(CONVERT_TIMEZONE('UTC',current_timestamp()), 'YYYY_MM_DD__hh24_mi_ss'));
   
   EXECUTE IMMEDIATE (sql);

   RETURN sql;
END;
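
Newly created tasks start in a suspended state, so the schedule doesn't run until you resume the task. You can also trigger a manual run to test the unload without waiting for the next scheduled execution:

/* Tasks are created suspended; enable the schedule.
 */
ALTER TASK export_order_deltas RESUME;

/* Optional: trigger a one-off run to test the unload.
 */
EXECUTE TASK export_order_deltas;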

3. Configure the ingestion in Tinybird

Create the S3 connection. See S3 Connector.

You can use the following schema for the data source:

tinybird/datasources/s3_landing_ds.datasource - data source with S3 connection
SCHEMA >
    `O_ORDERKEY` Int64,
    `O_CUSTKEY` Int64,
    `O_ORDERSTATUS` String,
    `O_TOTALPRICE` Float32,
    `O_ORDERDATE` DateTime64(3),
    `O_ORDERPRIORITY` String,
    `O_CLERK` String

ENGINE "MergeTree"
ENGINE_PARTITION_KEY "toYear(O_ORDERDATE)"
ENGINE_SORTING_KEY "O_ORDERDATE, O_ORDERPRIORITY, O_CLERK"

IMPORT_CONNECTION_NAME 'tinybird-tb-s3'
IMPORT_BUCKET_URI 's3://tinybird-tb/snowflake/csv/orders/*.csv.gz'
IMPORT_SCHEDULE '@auto'

Deploy the data source. The new files Snowflake writes to the bucket are automatically ingested by Tinybird in a few seconds.
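
Once the first files land, you can sanity-check the data source with a quick query, for example from a pipe. This query is only an illustrative check, not part of the guide's datafiles:

SELECT count() AS row_count, max(O_ORDERDATE) AS latest_order
FROM s3_landing_ds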

4. Handle duplicates in Tinybird

Use a materialized view to handle duplicates in Tinybird. For example:

tinybird/materializations/mat_s3_data.pipe - pipe to materialize data
NODE mat_s3_data_0
SQL >

    SELECT *
    FROM s3_landing_ds

TYPE materialized
DATASOURCE deduplicate_rmt_mv

If you only need to work with the latest snapshot of the data, include a file_ingested_at field in the materializing pipe. This is the variant that matches the deduplicate_rmt_mv schema shown later in this step.

This is important in cases where your incremental loads from Snowflake don’t indicate deleted records. Since a deleted record from a previous ingest won’t be overwritten by a new ingest, it will persist in the dataset. By filtering downstream on the file_ingested_at field, you can exclude these stale records and isolate only the most recent ingest. For example:

tinybird/materializations/mat_s3_data_latest.pipe - pipe to materialize data with timestamp
NODE mat_s3_data_latest_0
SQL >

    SELECT *, toDateTime(now()) as file_ingested_at
    FROM s3_landing_ds

TYPE materialized
DATASOURCE deduplicate_rmt_mv

In the target data source, the ReplacingMergeTree engine together with the ENGINE_VER setting deduplicates records that share the same sorting key, keeping the row with the highest file_ingested_at value.

tinybird/datasources/deduplicate_rmt_mv.datasource - Replacing Merge Tree to deduplicate data
SCHEMA >
    `O_ORDERKEY` Int64,
    `O_CUSTKEY` Int64,
    `O_ORDERSTATUS` String,
    `O_TOTALPRICE` Float32,
    `O_ORDERDATE` DateTime64(3),
    `O_ORDERPRIORITY` String,
    `O_CLERK` String,
    `file_ingested_at` DateTime

ENGINE "ReplacingMergeTree"
ENGINE_PARTITION_KEY "toYYYYMM(O_ORDERDATE)"
ENGINE_SORTING_KEY "O_ORDERKEY"
ENGINE_VER "file_ingested_at"

Finally, create a pipe to query the deduplicated data source. Filter on the maximum timestamp to get the latest snapshot, rounding the timestamp based on your import schedule. This ensures that you query the most recent snapshot with no duplicates.

tinybird/endpoints/get_snapshot.pipe - query for latest snapshot
NODE get_snapshot_0
SQL >
    
    WITH (SELECT max(toStartOfDay(file_ingested_at)) FROM deduplicate_rmt_mv) AS latest_file_version
    SELECT *
    FROM deduplicate_rmt_mv
    FINAL
    WHERE toStartOfDay(file_ingested_at) = latest_file_version

TYPE endpoint

Remember to use FINAL when querying a ReplacingMergeTree.

5. Next steps

For more detail on configuring the S3 connection and scheduled imports, see the S3 Connector documentation.
