Published on

Lost in the Haystack: Optimizing an Expensive ClickHouse Query

Authors

Imagine I asked you to find a needle in a haystack. That'd be pretty difficult, right? Now imagine I asked you to hold the haystack while you searched and even if you found the needle you had to keep holding the haystack until you looked at every piece of hay. That would just be mean. Well, that's how I treated ClickHouse and it's probably why it stopped returning my calls.

This is an accurate representation of what ClickHouse had to do to support the Session Replay product and it had real-world implications. For our largest customers, searching was impossible. We didn't have enough memory to answer search and sort queries. For more moderately sized customers, search was possible but it was unbearably slow taking more than 10 seconds to respond.

Today these heavy queries are more than 10x faster and our memory usage is 100x lower (and, more importantly, bounded). Our largest customers no longer see errors when searching for replays and we can now support customers of arbitrary size without running out of memory. This is how we did it.

Overview of the Replays Query

A replay is not a singular event. It's multiple events aggregated together. You can perform that aggregation when you write, when you read, or asynchronously at some undefined time. But you have to aggregate! At Sentry, we chose to aggregate on-demand when a user wants to see their replays. This has benefits but also imposes its own set of constraints.

One of those constraints is cardinality. As it turns out our aggregation key has high-cardinality. We could have millions of unique aggregations in memory at once. This many aggregation keys means you have to be strategic about the data you aggregate. Otherwise you risk running out of memory. But our customers don't care about that. They want to see the data they paid us to store. This is the central conflict we need to resolve. How do we minimize our memory usage while simulataneously returning useful results to our customers?

Minimizing Memory Usage

When Reading Data

Let's look at an example query.

SELECT replay_id, groupArray(url)
FROM replay_events
GROUP BY replay_id
LIMIT 1

The total memory usage of this query is the sum of every unique replay-id plus every url stored in the database. We absolutely can not hold this in memory. But we need to return the aggregated urls for that replay-id. How do we resolve this?

For every request made by a user to the Replay's service we can make two queries to the database. The first query is a preflight. It returns a set of replay-ids after all search and sort conditions are applied. The second query is a data query. It fetches all the data needed to satisfy the user's request using the replay-ids returned by the preflight.

Let's transform our example query into this new format:

-- Preflight Query
SELECT replay_id
FROM replay_events
GROUP BY replay_id
LIMIT 1

-- Data Query
SELECT replay_id, groupArray(url)
FROM replay_events
WHERE replay_id = 'my_replay_id'
GROUP BY replay_id
LIMIT 1

The total memory footprint of our preflight query has been reduced by the uncompressed size of the url column. But our data query hasn't increased by that amount. It's only increased by the uncompressed size of the url column for that replay-id.

Terrific! With this pattern we can now read the url column without running out of memory.

When Filtering Data

Let's look at another example query:

SELECT replay_id
FROM replay_events
GROUP BY replay_id
HAVING has(groupArray(url), 'sentry.io')
LIMIT 1

We've adopted our changes from the previous step but now, to answer a search condition, we're aggregating the url column in the HAVING clause. This has identical memory usage to SELECTing the column directly. So how do you ask the question "is some value contained within the aggregated set" when the aggregated set is too large to fit in memory? You "stream" it!

Instead of aggregating a column and then asking it some question, you can ask the column a question and aggregate its answer before finally asking a question about the aggregated answer! Clear as mud? Let's demonstrate the concept with SQL.

Let's ask the same question again: "does sentry.io exist in the set of aggregated urls". How should we phrase this in SQL? There's the straightforward approach has(groupArray(url), 'sentry.io') and then there's the streaming approach sum(url = 'sentry.io') > 0. What we've done here is subtle but has huge implications. Instead of aggregating the url we're aggregating the result of the condition "does this term match this value" which is represented as either a 0 or 1.

The memory usage from aggregating these tiny integers is minimal meaning we might consume 100x less memory to answer the same question. Also, the query now consumes memory proportional to the number of unique aggregation keys (rather than consuming the uncompressed size of the filtered column). The implication of this change might not be obvious but consuming memory in this manner is predictable and allows us to control the memory usage of the query through code and through ClickHouse configuration!

When Ordering Data

Let's look at another query example.

SELECT replay_id
FROM replay_events
GROUP BY replay_id
ORDER BY sum(length(error_ids)) DESC

Our dataset has several heavy columns. For example, we keep an array of the errors encountered for each replay event received. We typically like to represent these values as counts for sorting and in their raw state for searching.

When we compute the counts we call sum(length(column)). This works but it requires reading those heavy columns. This increases the number of bytes read, decreases the rate of rows scanned, and increases memory usage significantly. Memory usage increases so much that for our largest customers sorting against these arrays is not possible.

Once again we've run up against this memory constraint but now we're out of tricks. We can't solve this problem at read-time. It has to be solved deeper in the stack.

An interesting feature of ClickHouse is its ability to materialize a column. Materialized columns are functions that are evaluated on insert and stored as some output type T. For example, a materialized error_ids column might look like this:

`count_errors` UInt8 MATERIALIZED length(error_ids)

By materializing the column we reduce a huge array of thousands of bytes into a single 1-byte integer value. The end result is more rows from the materialized column can be loaded into memory at once (better performance) and the total size of those values are thousands of times smaller (better memory usage).

Because this is evaluated on insert, it takes some time to roll out. In the case of Sentry, we have a retention period and old rows gradually fall off the end. After making this change we just have to wait for the duration of the retention period before every row has this optimization.

In the meantime, we can still target the count_errors column and if it doesn't have any data populated it will compute the value at run-time. Performance gradually improves as time goes on.

A Sprinkle of Configuration

Because of our cardinality, memory usage is still too high. There's one final change we need to solve this problem.

Because our memory usage is now proportional to the number of unique aggregation keys we can use a ClickHouse query setting to cap the number of unique aggregation keys held in memory. This is really easy and it's something you can set and forget.

In our preflight we're going to update our query with two new settings. The first is max_rows_to_group_by. A somewhat misnamed setting, it doesn't cap the number of rows used to build your aggregation result. It caps the number of results in your aggregated set. This allows us to compute the maximum memory usage of our query by multiplying the size of a single row by the value of max_rows_to_group_by.

The second option is group_by_overflow_mode. The default configuration is "throw". We don't want that. It's up to you whether you choose "any" or "break". "break" will perform better and "any" could return more accurate results. It depends on how your data is distributed. We've chosen "any" because we don't observe any performance difference at our current scale.

SELECT replay_id
FROM replay_events
GROUP BY replay_id
LIMIT 1
SETTINGS max_rows_to_group_by='1000000', group_by_overflow_mode='any'

Success

We've done it! With these simple changes we've successfully conquered our memory usage. We could stop here and be happy. But there is this unspoken problem that we've not considered. We have to scan the full dataset for every query. For our use case, there are not many ways to avoid that but there are a few and in cases where we do have to perform a table scan it would be nice if those scans were faster.

Making Schema Changes

Encoding Columns

ClickHouse offers the ability to alter the representation of a column on disk with special encodings like LowCardinality. The LowCardinality encoding takes some string value and converts it into an enum representation. We have a few columns that could benefit from this encoding. We should consider applying it there.

We're also using Nullable an awful lot too but should we? Nullable in ClickHouse works differently from other databases. ClickHouse stores null values in a separate file. It's basically a bitmap index of where the nulls are in our column. Your column would then contain the empty state of its datatype in each row position where a null exists. So a string column would be "" and an integer column would be 0. By using null you don't save any space, you're just adding an index that needs to be scanned. So we should consider removing it too.

Let's experiment. What happens if we take the browser_name column and apply these two changes?

Before:

`browser_name` Nullable(String)

After:

`browser_name` LowCardinality(String)

If you've installed ClickHouse with Docker you should have received clickhouse-client. On my machine, I can run docker exec -it clickhouse /usr/bin/clickhouse-client to start the client.

With this tool, I can insert records into the database, run queries against those records, and evaluate the performance of those queries. I'm doing exactly that here to validate these schema changes. I've created a table called replays_test and I've added the old column which uses Nullable(String) and the new column which uses LowCardinality(String). After that I bulk insert 1,000,000 rows and start playing around.

We can run the following query to see the size of the columns on disk:

SELECT
    name,
    formatReadableSize(sum(data_compressed_bytes)) AS compressed_size,
    formatReadableSize(sum(data_uncompressed_bytes)) AS uncompressed_size,
    round(sum(data_uncompressed_bytes) / sum(data_compressed_bytes), 2) AS ratio
FROM system.columns
WHERE table = 'replays_test'
GROUP BY name
ORDER BY sum(data_compressed_bytes) DESC

Which outputs:

┌─name───────┬─compressed_size─┬─uncompressed_size─┬──ratio─┐
│ old        │ 1.90 MiB        │ 4.47 MiB          │   2.36 │
│ new        │ 370.76 KiB      │ 978.55 KiB        │   2.64 │
└────────────┴─────────────────┴───────────────────┴────────┘

As you can see, our encodings have reduced our size on disk by 80%. That's pretty cool and we didn't have to work too hard for it. We can also evaluate how these queries perform when reading these columns by running the following query:

SELECT
    query_duration_ms,
    read_rows,
    read_bytes,
    memory_usage,
    query
FROM system.query_log
ORDER BY event_time DESC
LIMIT 10

Which produces this output:

┌─query_duration_ms─┬─read_rows─┬─read_bytes─┬─memory_usage─┬─query────────────────┐
│                24 │    245760 │    7047494 │     14303428 │ WHERE old = 'Safari' │
│                14 │    286720 │    4874860 │      8743218 │ WHERE new = 'Safari' │
└───────────────────┴───────────┴────────────┴──────────────┴──────────────────────┘

A pre-requisite to running this query is that you've run other queries against your test table. But once you've done that you should see that our new, encoded column has significantly better performance than the old column. We read more rows with less memory usage and it takes us significantly less time to complete the query. Neat!

Be careful running this migration. It can be tough depending on the size of your dataset.

Sidebar. Dropping nullability has implications for your query. For example, the any function will ignore nulls when computing its result. If you have an empty string value in your dataset that was previously marked as null your query may return that value rather than the non-empty string it previously returned. There's a simple solution to this. ClickHouse functions accept an If combinator and this combinator can be used to strip the empty values. Like so: anyIf(column, notEmpty(column)). The second argument to the function is any expression which returns a boolean result.

Applying Indexes

ClickHouse indexes are a little different from indexes in other databases. They don't point to a row. They point to a granule which is a collection of rows. At least one row will contain the indexed value for a given matched granule.

This is important because an index is no guarantee of a fast query. If your value has a 1 in 8192 (or greater) chance of being present on a row then indexing it will give you nothing. ClickHouse will scan every row in the database to answer your query.

Indexes should only be used for values which are rare. A UUID is a great candidate for an index. A person's birth year is not.

I'll leave applying indexes as an exercise for the reader, we have a different problem we need to solve. Session Replay has an aggregated data model. Indexes won't work under these conditions. We need the ability to query for replays in a non-aggregated context.

One possibility is a sub-query. Our replay_id column is indexed and is not under any aggregate function. We can make scalar comparisons against it in the WHERE clause. A sub-query would then need to return a set of replay-ids matching some indexed condition. For example:

WHERE replay_id IN (
    SELECT replay_id
    FROM replay_events
    WHERE error_id = 'my_error_id'
)

We did not end up using this approach. We decided on an alternative. In our preflight query, under certain conditions, we can apply filters in the WHERE clause which reduce the aggregation set but do not alter the outcome of the query. For example, consider the question "show me every replay which contains the error_id x".

Instead of querying like this:

GROUP BY replay_id
HAVING sum(error_id = 'x') > 0

We can query like this:

WHERE error_id = 'x'
GROUP BY replay_id

There are limitations to this. For example, you can't filter by multiple error_ids at the same time. In practice, this optimization can be applied to nearly all of our customer's queries. It also has the benefit of not consuming any memory which is a great win for a query operating in a memory-constrained environment.

Evaluating the "performance" of a query has many dimensions. I've mentioned a couple throughout this post but one dimension in particular is the target of this optimization. Query latency. We know we can find a query's latency by inspecting the system's logs but how scientific is that? Query latency when measured against one or two or ten runs of a query has limited utility. You need a more extensive testing strategy to truly evaluate the latency impact of a change. Fortunately, ClickHouse includes a tool called clickhouse-benchmark for testing this. I'm able to access this utility by entering the following command: docker exec clickhouse /usr/bin/clickhouse-benchmark.

ClickHouse Benchmark accepts a --query parameter followed by a string argument. The benchmarking utility will execute that query thousands of times and at the end of this process you'll have a nice percentile breakdown of how a query performed. Here's the output of our before and after.

Before:

Queries executed: 1018.

localhost:9000, queries 1018, QPS: 94.159, RPS: 94158531.479, MiB/s: 5118.405, result RPS: 94.159, result MiB/s: 0.001.

0.000%          0.009 sec.
10.000%         0.009 sec.
20.000%         0.009 sec.
30.000%         0.009 sec.
40.000%         0.009 sec.
50.000%         0.009 sec.
60.000%         0.010 sec.
70.000%         0.011 sec.
80.000%         0.013 sec.
90.000%         0.014 sec.
95.000%         0.015 sec.
99.000%         0.015 sec.
99.900%         0.016 sec.
99.990%         0.016 sec.

After:

Queries executed: 3401.

localhost:9000, queries 3401, QPS: 474.014, RPS: 8706691.199, MiB/s: 406.868, result RPS: 474.014, result MiB/s: 0.004.

0.000%          0.002 sec.
10.000%         0.002 sec.
20.000%         0.002 sec.
30.000%         0.002 sec.
40.000%         0.002 sec.
50.000%         0.002 sec.
60.000%         0.002 sec.
70.000%         0.002 sec.
80.000%         0.002 sec.
90.000%         0.002 sec.
95.000%         0.002 sec.
99.000%         0.003 sec.
99.900%         0.004 sec.
99.990%         0.009 sec.

A 5x throughput improvement and our p99.99 latency matches our p0 latency on the non-indexed query.

Parting Thoughts

Hopefully you enjoyed this post. ClickHouse is a really interesting piece of technology and well worth your time to learn if you've not looked at it before.

Let's recap what we did:

  1. We added indexes where appropriate.
  2. We encoded our columns where appropriate.
  3. We bounded our memory usage to a multiple of the number of aggregation keys.
  4. We capped the number of aggregation keys that we hold in memory.
  5. We learned how to use the utilities ClickHouse provides to validate our assumptions.

Thanks for reading!