Recently I had to work with a large client dataset of 100+ million rows and do quite a bit of data cleaning and plumbing 🔧 in order to prepare it for a parallel batch job. What it boiled down to in the end was creating chunks of 1000 users for the batch job.
In this post, I’ll share a nice way of doing so in BigQuery that reduces chunking and data export time from hours down to minutes!
Chunking
When processing data that contains a large number of records, handling each record one by one can be quite slow. Often the data is also fetched from external sources such as an API. While processing data in memory tends to be fast, there are natural memory limits based on your compute instance. By chunking the data, the job can be sped up multifold. A chunk is simply a set of records grouped according to some key and size, e.g. chunks of 1000 users per file, so that everything fits in memory.
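To get a feel for the numbers used in this post, here is a rough back-of-the-envelope sketch in Python (it assumes the transactions are spread evenly across users):
# Hypothetical figures from this post: 100M transactions, 50k users, 1000 users per chunk
total_transactions = 100_000_000
unique_users = 50_000
users_per_chunk = 1_000

num_chunks = unique_users // users_per_chunk                # -> 50 chunks
transactions_per_chunk = total_transactions // num_chunks   # -> ~2 million rows per chunk

print(num_chunks, transactions_per_chunk)  # 50 2000000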
Chunking the “naive” Pythonic way
Let’s say that you have a table called transactions with the schema below, containing 100 million transactions and 50 thousand unique users:
[
  {
    "name": "transactionid",
    "mode": "REQUIRED",
    "type": "STRING",
    "description": "Unique ID of transaction",
    "fields": []
  },
  {
    "name": "userid",
    "mode": "REQUIRED",
    "type": "STRING",
    "description": "Unique ID of user",
    "fields": []
  },
  {
    "name": "accountid",
    "mode": "REQUIRED",
    "type": "STRING",
    "description": "Unique ID of user's account",
    "fields": []
  },
  {
    "name": "date",
    "mode": "NULLABLE",
    "type": "DATE",
    "description": "Date of transaction",
    "fields": []
  },
  {
    "name": "amount",
    "mode": "NULLABLE",
    "type": "FLOAT64",
    "description": "Amount of transaction, debit or credit based on sign",
    "fields": []
  },
  {
    "name": "description",
    "mode": "NULLABLE",
    "type": "STRING",
    "description": "Transaction description",
    "fields": []
  }
]
The Pythonic way, shown below, would be to:
- Create chunks based on all unique user IDs (user_id_list)
- Only query the users for a given chunk
- Export the data to a file, e.g. .csv
import time

def chunk_list(x: list, chunk_size: int) -> list:
    """Split a list into consecutive chunks of at most chunk_size items."""
    return [x[i : i + chunk_size] for i in range(0, len(x), chunk_size)]

# df_userids holds the 50k unique user IDs fetched beforehand
for i, user_id_list in enumerate(chunk_list(df_userids.userid.tolist(), 1000)):
    start_time = time.time()

    # one query per chunk of 1000 users
    query = f"""SELECT *
    FROM `project.dataset.transactions`
    WHERE userid IN UNNEST({user_id_list})
    """
    result = datalake.query(query)  # datalake: a BigQuery client, e.g. google.cloud.bigquery.Client()
    df_batch = result.to_dataframe()
    df_batch.to_csv(f"user_batch_{i}.csv", index=False)
    print(f"chunk {i} took {time.time() - start_time:.1f}s")
Although the code looks easy to understand, testing this on 100 million transactions takes roughly 7+ hours: each of the 50 chunks issues its own query against the full transactions table and the results are pulled back one batch at a time. This is way too slow, and we can do much better 🧠.
Chunking using BigQuery
As BigQuery is a massively parallel, distributed query engine under the hood, we can use partitioning together with two built-in functions, NTILE and RANGE_BUCKET, to do the chunking for us.
What the NTILE function does:
Info
NTILE divides the rows into constant_integer_expression buckets based on row ordering and returns the 1-based bucket number that is assigned to each row. The number of rows in the buckets can differ by at most 1. The remainder values (the remainder of number of rows divided by buckets) are distributed one for each bucket, starting with bucket 1. If constant_integer_expression evaluates to NULL, 0 or negative, an error is provided.
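For intuition only, here is a minimal Python sketch that mimics the documented NTILE behaviour (the function name and example values are mine, not a BigQuery API):
def ntile(num_rows: int, num_buckets: int) -> list[int]:
    """Return the 1-based bucket number for each row, mimicking NTILE semantics."""
    base, remainder = divmod(num_rows, num_buckets)
    buckets = []
    for bucket in range(1, num_buckets + 1):
        # the first `remainder` buckets get one extra row
        size = base + (1 if bucket <= remainder else 0)
        buckets.extend([bucket] * size)
    return buckets

print(ntile(10, 3))  # [1, 1, 1, 1, 2, 2, 2, 3, 3, 3]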
And what the RANGE_BUCKET function does:
Info
RANGE_BUCKET scans through a sorted array and returns the 0-based position of the point’s upper bound. This can be useful if you need to group your data to build partitions, histograms, business-defined rules, and more.
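RANGE_BUCKET behaves like a right-bisect over the sorted boundary array; again a minimal Python sketch for intuition only (not a BigQuery API):
import bisect

def range_bucket(point: int, boundaries: list[int]) -> int:
    """Mimic RANGE_BUCKET: 0-based position of the point's upper bound in a sorted array."""
    return bisect.bisect_right(boundaries, point)

# With GENERATE_ARRAY(1, 4, 1) -> [1, 2, 3, 4] as boundaries:
print(range_bucket(1, [1, 2, 3, 4]))  # 1
print(range_bucket(3, [1, 2, 3, 4]))  # 3
print(range_bucket(0, [1, 2, 3, 4]))  # 0 (below the first boundary)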
In short, NTILE creates the chunk groups we want to use in a window function, whilst RANGE_BUCKET takes care of creating the partitions. For our previous example, combining the two looks like this:
-- we have 50k users and want 1000 users in each bucket i.e. 50k / 1000 -> 50
DECLARE num_buckets INT64;
SET num_buckets = 50;

CREATE TABLE `project.dataset.trx_user_batches`
PARTITION BY RANGE_BUCKET(id, GENERATE_ARRAY(1, num_buckets + 1, 1)) AS
-- get transactions
WITH transactions AS (
  SELECT *
  FROM `project.dataset.transactions`
),
-- assign each unique user to one of num_buckets chunks (~1000 users per chunk)
user_rank AS (
  SELECT
    userid,
    NTILE(num_buckets) OVER (ORDER BY userid) AS id
  FROM (SELECT DISTINCT userid FROM transactions)
),
-- notice that we need to create num_buckets + 1 boundaries here
user_bucket AS (
  SELECT
    *,
    RANGE_BUCKET(id, GENERATE_ARRAY(1, num_buckets + 1, 1)) AS bucket
  FROM user_rank
)
SELECT
  trx.userid,
  trx.accountid,
  trx.transactionid,
  trx.date,
  trx.amount,
  trx.description,
  ub.id
FROM transactions AS trx
INNER JOIN user_bucket AS ub
  ON trx.userid = ub.userid
The query above creates 50 partitions with ~1000 users and around 2 million transactions in each. This is a much smaller dataset that we can fit in memory (i.e. in Pandas) compared to the 100 million rows we started with. For instance, if you want to export the partitions as files for another job or workflow, you could use:
EXPORT DATA
  OPTIONS (
    uri = 'gs://trx_user_batches/user_batch_*.csv',
    format = 'CSV',
    overwrite = true,
    header = true,
    field_delimiter = ';')
AS (
  SELECT *
  FROM `project.dataset.trx_user_batches`
  ORDER BY
    userid ASC,
    accountid ASC,
    date ASC
);
The nice thing about having the partitions and using the EXPORT DATA statement is that the chunking is much faster than the Pythonic approach. Exporting the 50 partition files takes roughly 30-40 minutes instead of 7+ hours 🚀.
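Once the files are in GCS, a downstream batch worker only needs to read its own chunk. A minimal sketch, assuming pandas with gcsfs installed; the exact file name below is hypothetical since the export wildcard generates the numbering:
import pandas as pd

# Hypothetical object name produced by the EXPORT DATA wildcard above; requires gcsfs for gs:// paths
chunk_path = "gs://trx_user_batches/user_batch_000000000001.csv"

# the export used ';' as the field delimiter
df_chunk = pd.read_csv(chunk_path, sep=";", parse_dates=["date"])
print(len(df_chunk), df_chunk["userid"].nunique())  # ~2 million rows, ~1000 users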
Tip
By default, BigQuery exports data >= 1 GB into several files. This is true even if your dataset is partitioned. If you want to force the export to write the output to a single file (given that each file is < 1 GB), you can add a LIMIT clause, e.g. with the maximum INT64 value, to force all data onto the same worker.
Here is an updated query that makes sure each partition is saved to a single file:
DECLARE num_files INT64;
SET num_files = 50;

FOR item IN (SELECT idx FROM UNNEST(GENERATE_ARRAY(1, num_files, 1)) AS idx)
DO
EXPORT DATA
  OPTIONS (
    uri = CONCAT('gs://trx_user_batches/user_batch_*', CAST(item.idx AS STRING), '.csv'),
    format = 'CSV',
    overwrite = true,
    header = true,
    field_delimiter = ';')
AS (
  SELECT *
  FROM `project.dataset.trx_user_batches`
  WHERE id = item.idx
  ORDER BY
    userid ASC,
    accountid ASC,
    date ASC
  -- LIMIT with the max INT64 value forces the export onto a single worker -> one file
  LIMIT 9223372036854775807
);
END FOR;
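To sanity-check that all 50 partition files landed in the bucket, you could list them afterwards, e.g. with the google-cloud-storage client (bucket name and prefix assumed to match the uri above):
from google.cloud import storage

# list the exported chunk files and their sizes
client = storage.Client()
for blob in client.list_blobs("trx_user_batches", prefix="user_batch_"):
    print(blob.name, f"{blob.size / 1e6:.1f} MB")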