Quarantine data sources¶
Every data source in your workspace has an associated quarantine data source that stores data that doesn't fit the schema. If you send rows that don't match the data source schema, they're automatically sent to the quarantine table so that the ingest process doesn't fail.
By convention, quarantine data sources follow the naming pattern {datasource_name}_quarantine. You can review quarantined rows at any time or perform operations on them using Pipes. This is a useful source of information when fixing issues in the origin source or applying changes during ingest.
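Because a quarantine data source behaves like any other data source, a pipe can read from it directly. The following is a minimal sketch of an endpoint pipe that lists the most recently quarantined rows; the events data source and the endpoints/events_quarantine_report.pipe file are hypothetical names used only for illustration, and the datafile keys assume the standard endpoint pipe syntax:
endpoints/events_quarantine_report.pipe
NODE quarantine_report
SQL >
    -- latest quarantined rows with the columns and errors that caused them
    SELECT insertion_date, c__error_column, c__error
    FROM events_quarantine
    ORDER BY insertion_date DESC
    LIMIT 100

TYPE endpoint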
Review quarantined data¶
To check your quarantine data sources, run the tb sql command. For example:
tb sql "select * from <datasource_name>_quarantine limit 10"
A sample output of the tb sql command is the following:
──────────────────────────────────────────────────────────────────
c__error_column: ['abslevel']
c__error: ["value '' on column 'abslevel' is not Float32"]
c__import_id: 01JKQPWT8GVXAN5GJ1VBD4XM27
day: 2014-07-30
station: Embassament de Siurana (Cornudella de Montsant)
volume: 11.57
insertion_date: 2025-02-10 10:36:20
──────────────────────────────────────────────────────────────────
The quarantine data source schema contains the columns of the original row and the following columns with information about the issues that caused the quarantine:
- c__error_column (Array(String)): an array of all the columns that contain an invalid value.
- c__error (Array(String)): an array of all the errors that caused the ingestion to fail and sent the row to quarantine. Together with c__error_column, this lets you identify which columns have problems and what the specific errors are.
- c__import_id (Nullable(String)): the identifier of the job, if the row was imported through a job.
- insertion_date (DateTime): the timestamp at which the ingestion was attempted.
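For example, you can aggregate these columns to see which fields cause most of the quarantined rows. This is a sketch that assumes a hypothetical events data source; arrayJoin expands the c__error_column array so each failing column is counted separately:
tb sql "select arrayJoin(c__error_column) as failing_column, count() as quarantined_rows from events_quarantine group by failing_column order by quarantined_rows desc"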
Fixing quarantined data example¶
Using the Electric Vehicle Population Data example:
tb create \
  --data "https://data.wa.gov/api/views/f6w7-q2d2/rows.csv?accessType=DOWNLOAD" \
  --prompt "Create an endpoint that ranks EV models. It should return all types by default, with optional type and limit parameters"
You build the project and get the following quarantine error:
tb dev

» Building project...
✓ datasources/rows.datasource created
✓ endpoints/rows_endpoint.pipe created
✓ endpoints/model_ranking.pipe created
Error appending fixtures for 'rows': There was an error with file contents: 564 rows in quarantine.
✓ Build completed in 9.1s

Watching for changes...

tb »
Inspecting the rows_quarantine data source:
tb » select distinct c__error from rows_quarantine
» Running QUERY
────────────────────────────────────────────────────────────────────────────────────────────
c__error: ["value '' on column 'postal_code' is not Int64", "value '' on column 'legislative_district' is not Int16", "value '' on column 'c_2020_census_tract' is not Int64"]
────────────────────────────────────────────────────────────────────────────────────────────
c__error: ["value '' on column 'electric_range' is not Int32", "value '' on column 'base_msrp' is not Int64"]
────────────────────────────────────────────────────────────────────────────────────────────
c__error: ["value '' on column 'legislative_district' is not Int16"]
────────────────────────────────────────────────────────────────────────────────────────────
The problem is that some columns should be Nullable or have a DEFAULT value. In this case, add a DEFAULT value of 0 to the affected columns.
Edit the datasources/rows.datasource file.
datasources/rows.datasource
DESCRIPTION >
    Generated from https://data.wa.gov/api/views/f6w7-q2d2/rows.csv?accessType=DOWNLOAD

SCHEMA >
    `vin__1_10_` String,
    `county` String,
    `city` String,
    `state` String,
    `postal_code` Int64 DEFAULT 0,
    `model_year` Int32,
    `make` String,
    `model` String,
    `electric_vehicle_type` String,
    `clean_alternative_fuel_vehicle__cafv__eligibility` String,
    `electric_range` Int32 DEFAULT 0,
    `base_msrp` Int64 DEFAULT 0,
    `legislative_district` Int16 DEFAULT 0,
    `dol_vehicle_id` Int64,
    `vehicle_location` String,
    `electric_utility` String,
    `c_2020_census_tract` Int64 DEFAULT 0
The dev server will rebuild the edited resources.
tb » ⟲ Changes detected in rows.datasource
» Rebuilding project...
✓ datasources/rows.datasource changed
✓ Rebuild completed in 1.1s
There are no errors now, so you can continue developing.
Recovering data from quarantine¶
Once you've fixed the schema issues that caused data to be quarantined, you can recover the quarantined data back to your main data source. The approach depends on the amount of quarantined data you need to recover.
Small datasets¶
For small amounts of quarantined data, you can recover it directly using the CLI:
- Export the fixed data from quarantine to a CSV file:
tb --cloud --output csv sql "select <query_fixing_issues> from ds_quarantine" --rows-limit 120 > rows.csv
Replace <query_fixing_issues> with a query that transforms the quarantined data to match your fixed schema. For example:
tb --cloud --output csv sql "select vin__1_10_, county, city, state, COALESCE(postal_code, 0) as postal_code, model_year, make, model from rows_quarantine" --rows-limit 120 > rows.csv
- Deploy your schema changes to the workspace:
tb --cloud deploy
- Append the recovered data to your data source:
tb --cloud datasource append ds rows.csv
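Optionally, you can check that the recovered rows reached the main data source by counting them, reusing the generic ds name from the commands above:
tb --cloud sql "select count() from ds"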
Large datasets (more than 120 rows)¶
For larger amounts of quarantined data, use a three-step deployment process with temporary resources:
Step 1: Deploy temporary resources¶
Create a temporary data source and copy pipe to process the quarantined data:
tb --cloud deploy
This deployment should include the following resources (see the sketch after this list):
- A temporary data source to host the recovered data
- A copy pipe that transforms quarantined data and writes to the temporary data source
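As a concrete sketch, assume a hypothetical main data source named events with columns event_id Int64 and value Int32 DEFAULT 0. The temporary data source mirrors those columns, and the copy pipe casts the quarantined values, which typically come back as nullable strings, into the target types. All file and resource names here (events_recovered_tmp, recover_quarantine) are illustrative, and the datafile keys assume the standard copy pipe syntax:
datasources/events_recovered_tmp.datasource
SCHEMA >
    `event_id` Int64,
    `value` Int32

copies/recover_quarantine.pipe
NODE recover_from_quarantine
SQL >
    -- cast quarantined values back to the target types, defaulting to 0
    SELECT
        ifNull(toInt64OrZero(event_id), 0) AS event_id,
        ifNull(toInt32OrZero(value), 0) AS value
    FROM events_quarantine

TYPE COPY
TARGET_DATASOURCE events_recovered_tmp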
Step 2: Trigger the copy and deploy final schema¶
- Trigger the copy pipe to process quarantined data into the temporary data source:
tb --cloud copy run <copy_pipe_name>
- Deploy the final schema changes and create a copy pipe from the temporary data source to your fixed main data source (see the sketch after these steps):
tb --cloud deploy
- Trigger the final copy from temporary to main data source:
tb --cloud copy run <final_copy_pipe_name>
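The final copy pipe is the mirror image of the first one: it reads the already-typed rows from the temporary data source and writes them into the fixed main data source. Continuing the hypothetical events example from Step 1, a sketch could look like this, triggered with tb --cloud copy run recover_to_main:
copies/recover_to_main.pipe
NODE recover_to_main
SQL >
    -- rows in the temporary data source are already correctly typed
    SELECT event_id, value
    FROM events_recovered_tmp

TYPE COPY
TARGET_DATASOURCE events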
Step 3: Clean up temporary resources¶
Deploy a final time to remove the temporary data source and copy pipes:
tb --cloud deploy
This approach ensures that large datasets are processed efficiently without hitting CLI limits while maintaining data integrity throughout the recovery process.