Deduplication Strategies in ClickHouse

Intermediate
Deduplicating data is one of the most common problems when dealing with analytical databases like ClickHouse. Here you'll learn several strategies for it.

OLAP databases like ClickHouse are optimized for fast ingestion and, for that to work, some trade-offs have to be made. One of them is the lack of unique constraints, since enforcing them would add a big overhead and make ingestion speeds too slow for what's expected from a database of this kind.

Nonetheless, there will be lots of times when you will have duplicated data and you will want to get rid of it, or just access the latest data point available. There are several ways to go about it, and the general workflow will be ingesting all of the data first, including duplicates, and dealing with them later.

The problem

Typically, there are two use-cases where you'll end up with duplicated data:

  • Upserts. An upsert is an operation that inserts rows into a database table if they do not already exist, or updates them if they do. On databases like Postgres that'd be accomplished with INSERT ... ON CONFLICT DO UPDATE clauses, but as ClickHouse doesn't enforce uniqueness of primary keys, such clauses aren't supported. The way to do upserts on ClickHouse is with a ReplacingMergeTree engine. More on this later.
  • Constant ingestions and historical data. Imagine you are periodically dumping data from your transactional database to ClickHouse to run analytical queries on it in real time. You'd end up with lots of rows inserted at different times with the same primary key. If you only want the latest data point, you can get rid of the rest and treat them like upserts. But there are also use cases where you want to keep all the data to have a historical record of how the attributes of an object evolve over time.

A real-world example

We've created a dataset that resembles what a social media analytics company would have to track page views of posts over time. It has data for about 10000 posts over every day of a year. It looks like this:

We'll use the same dataset to explain the best way to deduplicate data by post and do last-point analytics on it, for the two use cases we've described before: upserts and historical data.

We've created a GitHub repo with the data project to generate the data that also contains the Tinybird Data Source definitions. We'll also use our CLI in this guide.

Let's first define a Data Source like this to store the data, where every day you'd append the total views for each post for that day.

{% tip-box title="Selecting the right sorting keys" %}If you're going to do lots of filtering by ``post_id``, it's better to sort first by ``post_id`` and then by ``date`` to keep the scanned index range as small as possible. Read this or take our 'Principles of Real Time Analytics' free course to learn more about how to define your sorting keys.{% tip-box-end %}
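To see why the order of the sorting key matters, here is a minimal in-memory sketch in Python. It is only a rough model: ClickHouse uses a sparse index over sorted parts, while this code just sorts a handful of made-up rows and measures how wide the contiguous range is that contains every row of one post. The rows and the ``scanned_span`` helper are hypothetical.

```python
from datetime import date

# Hypothetical sample rows: (post_id, date, views).
rows = [
    (2, date(2021, 1, 1), 10),
    (1, date(2021, 1, 2), 7),
    (1, date(2021, 1, 1), 5),
    (3, date(2021, 1, 1), 2),
    (2, date(2021, 1, 2), 12),
]


def scanned_span(rows, sort_key, wanted_post_id):
    """Length of the contiguous range that covers every row of one post
    once the rows are stored in `sort_key` order."""
    ordered = sorted(rows, key=sort_key)
    hits = [i for i, row in enumerate(ordered) if row[0] == wanted_post_id]
    return hits[-1] - hits[0] + 1 if hits else 0


# Sorting by (post_id, date) keeps the rows of a post next to each other.
by_post_then_date = scanned_span(rows, lambda r: (r[0], r[1]), wanted_post_id=1)

# Sorting by (date, post_id) scatters them across the whole table.
by_date_then_post = scanned_span(rows, lambda r: (r[1], r[0]), wanted_post_id=1)
```

With these rows, filtering by ``post_id = 1`` touches a range of 2 rows in the first layout and 4 rows in the second; the gap grows with the number of days stored.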

Now create the data source and append data to it.

The solutions

Then, there are basically four strategies to deduplicate data:

  1. Doing it at query time
  2. Using a ``ReplacingMergeTree`` engine (you'll also need another strategy on top of it, because deduplication is asynchronous and duplicated data can remain after insertions)
  3. Using Materialized Views
  4. Using snapshots.

These solutions are ordered from the simplest to the most complex.


The bigger your data is, the more likely it is that you'll need to use a more complex solution in order to have really good performance.

Deduplicating at query time

Imagine you are interested only in the latest value of views for each post. In that case, you can deduplicate data on ``post_id`` and get the latest value with these strategies:

  • Get the max date for each post in a subquery and then filter by its results
  • Group data by ``post_id`` and use the argMax function
  • Use the LIMIT BY clause

Depending on your data and on the sorting keys of the Data Source that stores it on disk, one approach will be faster than the others.
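The three strategies return the same result and differ only in how they get there. The sketch below models each one in plain Python over a few made-up rows; it is not ClickHouse SQL, and the function names and sample data are hypothetical. It is meant to show the logic behind a max-date subquery, ``argMax`` and ``LIMIT 1 BY post_id``.

```python
# Hypothetical sample rows, in arbitrary insertion order.
rows = [
    {"post_id": 1, "date": "2021-01-01", "views": 5},
    {"post_id": 1, "date": "2021-01-03", "views": 9},
    {"post_id": 2, "date": "2021-01-02", "views": 4},
    {"post_id": 1, "date": "2021-01-02", "views": 7},
    {"post_id": 2, "date": "2021-01-01", "views": 3},
]


def latest_by_subquery(rows):
    """Subquery style: find max(date) per post, then filter on it."""
    max_date = {}
    for r in rows:
        if r["post_id"] not in max_date or r["date"] > max_date[r["post_id"]]:
            max_date[r["post_id"]] = r["date"]
    return {
        r["post_id"]: r["views"]
        for r in rows
        if r["date"] == max_date[r["post_id"]]
    }


def latest_by_argmax(rows):
    """argMax style: per group, keep the views of the row with the max date."""
    best = {}
    for r in rows:
        current = best.get(r["post_id"])
        if current is None or r["date"] > current["date"]:
            best[r["post_id"]] = r
    return {post_id: r["views"] for post_id, r in best.items()}


def latest_by_limit_by(rows):
    """LIMIT BY style: order by date descending, keep the first row per post."""
    result = {}
    for r in sorted(rows, key=lambda r: r["date"], reverse=True):
        result.setdefault(r["post_id"], r["views"])
    return result


subquery_result = latest_by_subquery(rows)
argmax_result = latest_by_argmax(rows)
limit_by_result = latest_by_limit_by(rows)
```

All three return ``{1: 9, 2: 4}`` for this data. Note that the ``LIMIT BY`` variant needs a full sort, while the other two only need one pass per step.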

In general, deduplicating at query time will be fine if the size of your data is small. But if you have lots of data, the best option to make your query faster will be using a Materialized View to pre-compute the latest value of views for each ``post_id``. We'll show you how to do that in the Materialized Views section below.

Doing upserts

If you have lots of data and you only care about the latest insertion for each unique key, you can use a ``ReplacingMergeTree`` engine. Two engine options control how it deduplicates: ``engine_sorting_key`` and ``engine_ver``.

  • Rows with the same ``engine_sorting_key`` will be deduplicated. You can select one or more columns.
  • ``engine_ver`` can be omitted and in that case the last inserted row for each unique ``engine_sorting_key`` will be kept. If you specify it (the type of ``engine_ver`` has to be ``UInt*``, ``Date`` or ``DateTime``), the row with the highest ``engine_ver`` for each unique ``engine_sorting_key`` will be kept.
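The two rules above can be modelled with a small Python function. This is a sketch of the outcome of a merge, not of how ClickHouse implements it; ``replacing_merge`` and the sample rows are hypothetical, and ties on the version column are assumed to resolve to the last inserted row.

```python
def replacing_merge(rows, sorting_key, ver=None):
    """Model of what a ReplacingMergeTree merge keeps.

    rows        -- list of dicts in insertion order
    sorting_key -- column names that identify a duplicate
    ver         -- optional version column; highest value wins
    """
    kept = {}
    for row in rows:
        key = tuple(row[col] for col in sorting_key)
        current = kept.get(key)
        # Without a version column the last inserted row wins. With one,
        # the highest version wins (ties go to the last inserted row).
        if current is None or ver is None or row[ver] >= current[ver]:
            kept[key] = row
    return sorted(kept.values(), key=lambda r: tuple(r[c] for c in sorting_key))


# Hypothetical rows; the third one is old data that arrives late.
rows = [
    {"post_id": 1, "date": "2021-01-02", "views": 7},
    {"post_id": 1, "date": "2021-01-03", "views": 9},
    {"post_id": 1, "date": "2021-01-01", "views": 5},
    {"post_id": 2, "date": "2021-01-01", "views": 3},
]

without_ver = replacing_merge(rows, sorting_key=["post_id"])
with_ver = replacing_merge(rows, sorting_key=["post_id"], ver="date")
```

Without a version column, post 1 ends up with the late-arriving value (5 views); with ``date`` as the version column it keeps the most recent one (9 views). That difference is the main reason to set ``engine_ver`` when data can arrive out of order.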

Define a data source like this

and then you'd push it to Tinybird with the CLI (using Docker here) and append data to it as we did before:

An important note from the docs:

"Data deduplication occurs only during a merge. Merging occurs in the background at an unknown time, so you can’t plan for it. Some of the data may remain unprocessed. Thus, ReplacingMergeTree is suitable for clearing out duplicate data in the background in order to save space, but it doesn’t guarantee the absence of duplicates."

{% tip-box title="Frequency of merges" %}Merging will happen in the background, most likely every 9-10 minutes, but if ClickHouse considers that you don't have enough data it won't happen. So right after an insertion, ``select * from ...`` will most likely return duplicated rows (even when using a ReplacingMergeTree engine), and you'll need some other strategy to select only the last value at query time.{% tip-box-end %}

Note also that we used a new query here, using the FINAL modifier. It merges the data at query time and is not generally recommended because it will usually make queries slower.
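The interplay between inserts, background merges and ``FINAL`` can be pictured with a toy model. The class below is a hypothetical Python sketch, not ClickHouse internals: it assumes each insert creates a new part, that a plain select reads every part as stored, and that ``FINAL`` deduplicates while reading without changing what is stored.

```python
class ReplacingTable:
    """Toy model of a ReplacingMergeTree keyed by post_id, versioned by date."""

    def __init__(self):
        self.parts = []  # each insert adds one part

    def insert(self, rows):
        self.parts.append(list(rows))

    def select(self, final=False):
        rows = [row for part in self.parts for row in part]
        # FINAL deduplicates while reading; stored parts stay untouched.
        return self._dedup(rows) if final else rows

    def merge(self):
        # Background merge: all parts collapse into one deduplicated part.
        self.parts = [self._dedup([row for part in self.parts for row in part])]

    @staticmethod
    def _dedup(rows):
        latest = {}
        for post_id, date, views in rows:
            if post_id not in latest or date >= latest[post_id][1]:
                latest[post_id] = (post_id, date, views)
        return sorted(latest.values())


table = ReplacingTable()
table.insert([(1, "2021-01-01", 5), (2, "2021-01-01", 3)])
table.insert([(1, "2021-01-02", 7)])

before_merge = table.select()            # duplicates still visible
with_final = table.select(final=True)    # deduplicated at read time
parts_after_final = len(table.parts)     # FINAL did not merge anything

table.merge()
after_merge = table.select()             # now deduplicated on disk
```

The extra work ``FINAL`` does on every read is what makes it slower than querying data that has already been merged.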

By the time you're reading this, most likely the engine will have already performed the deduplication and the above queries will be very fast. Until that happens, their performance will be the same as if you were using a ``MergeTree`` engine with all the duplicated data, as in the first queries. After inserting data, the performance of the query using ``FINAL`` will be similar to the one using ``LIMIT BY``, and the one using the subquery will be 5-10X faster, based on our tests.

Using materialized views

You could create a Materialized View (MV) to store the last state of your data if:

  1. Your data is quite big. You'll get there fast if the resolution of your data is hourly or lower instead of daily and you save data for 10x or 100x the number of posts here.
  2. You also want to keep a historical record, without discarding old data - this is key. If you don't care about historical data you don't need the MV at all; a ``ReplacingMergeTree`` is all you need.
  3. You want to be able to make last-point analytics (like we've done before) very fast, in the millisecond range.

To create an MV you just need:

  • an origin Data Source, that you'll use to ingest the data. ``posts_views_historical.datasource``, in this case
  • a destination Data Source that partially aggregates the data
  • a transformation pipe that connects both Data Sources and performs the final aggregation.

This is ``posts_views_latest_agg.datasource``, the destination data source:

And this is the transformation pipe to connect both data sources:

Push the new data source and the MV like this:

Finally, this query computes the latest data point by completing all the aggregations.
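The split between partial and final aggregation can be sketched in Python. This is a model under assumptions: it supposes the destination Data Source stores an argMax-style partial state per ingested batch (ClickHouse offers ``-State`` and ``-Merge`` aggregate combinators for this, but whether the pipe in this guide uses them is an assumption), and the function names and rows are hypothetical.

```python
def arg_max_state(batch):
    """Partial state for one ingested batch: per post_id, the (date, views)
    pair with the highest date seen in that batch."""
    state = {}
    for post_id, date, views in batch:
        if post_id not in state or date > state[post_id][0]:
            state[post_id] = (date, views)
    return state


def arg_max_merge(states):
    """Final aggregation at query time: combine partial states and return
    the latest views per post_id."""
    merged = {}
    for state in states:
        for post_id, (date, views) in state.items():
            if post_id not in merged or date > merged[post_id][0]:
                merged[post_id] = (date, views)
    return {post_id: views for post_id, (date, views) in merged.items()}


# Origin Data Source: keeps every row, so the history is preserved.
origin = []
# Destination Data Source: one small partial state per ingested batch.
destination = []

for batch in (
    [(1, "2021-01-01", 5), (2, "2021-01-01", 3)],
    [(1, "2021-01-02", 7)],
):
    origin.extend(batch)
    destination.append(arg_max_state(batch))

latest_views = arg_max_merge(destination)
```

The query only reads the compact states in the destination, while the origin still holds all three historical rows.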

Snapshots

Using an ``AggregatingMergeTree`` will work fine when you can filter the data and therefore do not need to read much information. This is not always possible: there are situations where we need to run queries over large amounts of data that must be deduplicated.

Let's imagine we want to sync our MySQL database with Tinybird in order to speed up the analytical queries and generate some endpoints for our back office. We want to be as close to real time as possible, so we'll follow a change data capture (CDC) approach. To simplify, the syncer will generate CSVs and push them to Tinybird, but we could use a Debezium + Kafka approach as well.


Data flow from MySQL to Tinybird

Once our syncer is integrated, let's take a look at one of the tables we want to sync, the ``users`` table.


In order to generate the same information in Tinybird, we need to create two data sources: ``users.datasource`` and ``users_landing.datasource``.

  • ``users_landing.datasource`` where we append all the changes that are happening in the MySQL table. We will create a row for each insert, update or delete that our MySQL table generates.
  • ``users.datasource`` where we will be storing every snapshot we take. We will generate a new snapshot by composing all the changes and merging them with the previous snapshot.

The ``users_landing.datasource`` will have the same fields that we have in our MySQL table with two extra columns:

  • ``operation`` will indicate what kind of operation happened to the row (INSERT, UPDATE, DELETE).
  • ``inserted_at`` will indicate when the row was inserted in the data source. We will use this column to know if it is already inside a snapshot or not.

Then, our ``users.datasource`` will have the same fields that we have in our MySQL table with an extra column, ``snapshot_time``. In this table we are going to be storing every snapshot, so this column will help us to identify the latest snapshot that we want to use.
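The way a new snapshot is composed from the previous one plus the pending changes can be sketched as follows. This is an in-memory Python model, not the Tinybird implementation; the ``id`` and ``name`` columns are hypothetical stand-ins for the real ``users`` fields, and ``build_snapshot`` is an illustrative helper.

```python
def build_snapshot(previous, landing, last_snapshot_time, snapshot_time):
    """Apply the landing rows not yet included in a snapshot on top of the
    previous snapshot and stamp the result with `snapshot_time`."""
    users = {
        row["id"]: {k: v for k, v in row.items() if k != "snapshot_time"}
        for row in previous
    }
    # inserted_at tells us which changes are newer than the last snapshot.
    pending = [
        change for change in landing
        if last_snapshot_time < change["inserted_at"] <= snapshot_time
    ]
    for change in sorted(pending, key=lambda c: c["inserted_at"]):
        if change["operation"] == "DELETE":
            users.pop(change["id"], None)
        else:  # INSERT or UPDATE: the change carries the full row
            users[change["id"]] = {
                k: v for k, v in change.items()
                if k not in ("operation", "inserted_at")
            }
    return [dict(row, snapshot_time=snapshot_time) for _, row in sorted(users.items())]


t0, t1 = "2021-01-01 00:00:00", "2021-01-02 00:00:00"
previous = [
    {"id": 1, "name": "ana", "snapshot_time": t0},
    {"id": 2, "name": "bob", "snapshot_time": t0},
]
landing = [
    {"id": 1, "name": "ana", "operation": "INSERT", "inserted_at": "2020-12-31 10:00:00"},
    {"id": 2, "name": "bob", "operation": "INSERT", "inserted_at": "2020-12-31 11:00:00"},
    {"id": 2, "name": "bobby", "operation": "UPDATE", "inserted_at": "2021-01-01 09:00:00"},
    {"id": 1, "name": "ana", "operation": "DELETE", "inserted_at": "2021-01-01 10:00:00"},
    {"id": 3, "name": "cy", "operation": "INSERT", "inserted_at": "2021-01-01 11:00:00"},
]

snapshot = build_snapshot(previous, landing, last_snapshot_time=t0, snapshot_time=t1)
```

Here the first two landing rows are already part of the previous snapshot and are skipped; the update, delete and insert that follow produce a new snapshot with users 2 and 3 only. Queries then read the rows with the highest ``snapshot_time``.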


Now that we have both data sources created, we need a Materialized View (MV) to connect them.

To learn more about defining MVs with Tinybird, check out our docs.

