Amazon Redshift is a data warehouse that’s orders of magnitudes cheaper than traditional alternatives. Many companies use it, because it’s made data warehousing viable for smaller companies with a limited budget. Since so many Heap customers use Redshift, we built Heap SQL to allow them to sync their Heap datasets to their own Redshift clusters. Combined with Heap’s capture-everything philosophy, it enables some powerful flows: customers can define an event in our web UI, and then run arbitrary SQL on all historical instances of that event!
With Heap SQL, we’re syncing large amounts of data across ~80 Redshift clusters on a daily basis. At first, the sync process we designed was too slow to be viable for large customers. We tried a lot of different things to make it stable and scalable, and in doing so we learned a lot about Redshift and how it’s different from Postgres. This blog post describes some of our experience with Redshift and its various quirks.
Redshift is a cloud-based data warehouse offered by Amazon. It exposes a Postgres-like interface, but under the hood it’s different in a couple ways:
- Data is stored in columns– Unlike Postgres, Redshift is a column store. This means it stores table data organized in terms of columns, rather than rows, so a query that touches a small number of columns on a table can read the columns that are relevant and ignore the rest. Column stores have much better I/O characteristics for analytical workloads (large joins involving a small number of columns, batch inserts), but are typically slower for transactional workloads(lots of small inserts and updates).
- It’s distributed– A Redshift cluster consists of several compute nodes orchestrated by one leader node. Each table has a user-specified distribution key, which determines how rows in the table are sharded across compute nodes.
- It doesn’t support indexes– You can’t define indexes in Redshift. Instead, each table has a user-specified sort key, which determines how rows are ordered . The query planner uses this information to optimize queries.
- Constraints aren’t enforced– Redshift doesn’t enforce primary or foreign key constraints. This makes batch inserts fast, but makes it easy to accidentally cause data quality issues via duplication or foreign key violations.
These differences need to be taken into account to design tables and queries for optimal performance. However, the differences aren’t exposed in the query language, which can lead to a false sense of security for users familiar with Postgres.
The Heap SQL Schema
We organize a customer’s data in Redshift as follows:
userstable, containing a column for every user-level property Heap captures and another for every custom user property provided via our API.
- A table for each event defined in Heap or logged via our API, with a column for every event property. E.g.,
signups, and so forth.
all_eventstable, which is effectively a concatenation of all of the event tables. This is a more useful representation for some analyses that our users care about, such as figuring out what commonly happens immediately before a conversion event.
More information about the Heap SQL schema can be found in our docs.
Why Heap’s Dataset is Very Difficult to Sync
Heap provides an API –
heap.identify – that lets customers tag users with global identities (often an email address). If a customer calls
heap.identify with the same email address for two users, we’ll combine them into a single user record. This gives our customers a full view of their users’ interaction with their products, across different cookies and devices. For example, you might use this to analyze users that sign up for a product on the web and determine what percentage later activate a mobile app.
This makes syncing our dataset to Redshift challenging, because it means we might need to update an event to another user (e.g., “unidentified user clicked share button” needs to get updated to “user with identity clicked share button”) . So, instead of loading data into Redshift in an append-only fashion, we need to do a bunch of point updates on each sync . For all events involving a user that has been merged into another user since the last sync, we update their event rows to have
user_id = new_user_id. This is quite slow – on the order of days for a medium-sized customer if done naively. Additionally, since our syncs usually overlap by a small time window, it can cause strange row duplication issues in which we import an event performed by
user_id on one sync, and then import the same event performed by
new_user_id on the next sync.
Performing User UPDATEs in Redshift
We’ve tried several different methods of merging users in Heap SQL. Here’s a rough overview of the progression we went through:
- Naive UPDATEs – We store all
identifyoperations in a table with 2 columns:
new_user_id. The simplest way to implement Heap SQL is to run an update on each event table based on the mapping in the identify table, for each user combination, like this:
This ended up being way too slow. A row update in Redshift consists of marking the row for deletion, and inserting a new row with the updated data. Redshift stores columns in immutable 1MB blocks, so updating a single row requires creating a new 1MB block for each column. For
BIGINTcolumns, this means that a updating or inserting a single row is roughly the same amount of work as updating or inserting 100,000 rows.
- Batch UPDATEs – We then tried batching updates by using a query like this:
Since updating a single row requires rewriting the entire column chunk, we save a lot of work when batching the updates. But since Redshift limits the size of the queries you can run, we needed to break this query up into many sets of
user_idpairs. So, we were still rewriting big chunks of tables multiple times over.
- Batch UPDATEs using a stage table – Eventually we landed on copying
identifycalls into a stage table, then updating based off that. The query looks like this:
- Naive UPDATEs – We store all
Optimizing Redshift Writes
We had to optimize our queries pretty carefully in order to make update operations fast enough to scale to large customers. A lot of what we learned is in the Redshift docs in some form, but we still got it wrong the first time.
Include the Distribution Key in Joins
Each Redshift table has a distribution key, which defines how the table is sharded amongst compute nodes. For any join in Redshift, it’s a good idea to add the two tables’ distribution keys to your join condition, if possible. This makes it clear to Redshift that no rows will need to be joined across different compute nodes, so Redshift can execute the join as multiple local joins. This usually applies for joining with staging tables since the join is usually time-intensive, and your staging table schema is usually the same as the destination table. Let’s look at how this works in practice.
Consider a table
all_events that has the
user_id as the distribution key. Let’s simulate a staging table by creating a table and copying some rows into it.
Now let’s look at what the query planner does when we update all_events by joining it with
event_id (the primary key).
See the join type
DS_BCAST_INNER? That means that Redshift will broadcast a full copy of the entire inner table to each compute node to execute the join! This is bad – we’re shuttling an entire table across the network to execute the query. Compare that to adding the distribution key to the
In this case, the join is
DS_DIST_NONE. This means that redshift will execute local joins, rather than copying one of the tables to every compute node. In practice, adding a join condition on your distribution key can make queries significantly faster, even when it doesn’t impact the results. This also means that you should choose your distribution key to allow for collocated joins like these. In our case, we were careful to build our stage and
events tables with
user_id as the distribution key, which sped up our update operations significantly.
Filter using the sort key
When we were trying to speed up our update process, we realized that if we could limit the rows that we needed to update using the sort key, the queries would be significantly faster. Now we have an additional step in our update process, where we take the
MIN of the event time for all users in our staging table (
MIN_EVENT_TIME below), and execute our update step as follows:
Redshift stores the maximum and minimum values of the sort key for each block of data in a table. This means that filtering queries using the sort key allows the compute nodes to skip entire blocks of data when performing a sequential scan.
Beware Of Constraints
Remember how we mentioned Redshift doesn’t enforce constraints? To be more specific, Redshift doesn’t enforce primary key, foreign key, or unique constraints. It still allows you to add them though, and it will assume they are correct. Here’s a small example to show how this can be a problem. First we create a table with a single
integer column and add some data with a duplicate to it:
Now, let’s take a look at what the query planner does for
See how the query plan includes a
Unique step? This means the query will eliminate duplicates from the result of the sequential scan. The output is what we’d expect:
Now, let’s add a primary key constraint on that column. This means the values in it should be unique.
Redshift doesn’t complain when adding the constraint, even though there are duplicates in the column! Let’s see what the query planner does for
SELECT DISTINCT now:
Unique step to eliminate duplicates is gone! So the query planner is happily taking our constraint at face value, while not doing the work to ensure it actually holds. Here are the results of the
SELECT DISTINCT after adding the constraint:
It’s what we’d expect from the query plan, but not what we’d expect from just looking at the query. This caused us problems when trying to deduplicate event tables after merging users – we originally deduplicated with a process involving a
SELECT DISTINCT statement, but this didn’t work properly as we had set a primary key on the table. It’s important to be careful with constraints in Redshift, because it won’t enforce them. This makes it really easy to end up with data quality issues when syncing.
As we continue to acquire more and bigger customers, we’re going to have to make changes to the Heap SQL sync process to make it faster and decrease the load on some of our query nodes. Right now, we’re re-architecting our sync process to stream data directly from our Postgres cluster worker nodes, rather than compiling everything on the master node prior to streaming to Redshift. This will significantly increase sync speeds by cutting out a full network transfer step, and allowing us to better distribute the sync load across our cluster .
Looking even further ahead, we’ve considered moving Heap SQL to a near real-time streaming architecture, where we consume our event data directly from Kafka. This would allow us to bypass our query cluster completely while decreasing the latency between data registering in the Heap dashboard and showing up in our customers’ Redshift clusters.
- The other option would be to provide customers with a
identifytable that serves as a mapping from
new_user_ids. Customers could then materialize the user merges on read by joining the
eventstables with the
identifytable. This is harder to manually query, and a lot harder to use via BI tools like Looker, Tableau, and Mode.
- Sort order is not maintained on
copyoperations. New rows get added to an unsorted region, which is only sorted when the table is vacuumed or deep copied. Information about the size of the sorted/unsorted regions in each table can be found in the
- Right now, we have a separate connection pool on our master node for Heap SQL syncs. Sync jobs often spend significant time waiting for a connection to become available, even though certain worker nodes aren’t being utilized by the active connections.