Recently I had to work with a large client dataset of 100+ million rows and do quite a bit of data cleaning and plumbing 🔧 in order to prepare it for a parallel batch job. What it boiled down to in the end was creating chunks of 1000 users for the batch job.

In this post, I’ll share one nice way of doing this in BigQuery that reduces chunking and data export time from hours down to minutes!

Chunking

When processing data that contains a large number of records, handling each record one by one can be quite slow. Often the data is also fetched from external sources such as an API. Whilst processing data in memory tends to be fast, there are natural memory limits based on your compute instance. By chunking the data, the processing of the job can be sped up multifold. A chunk is simply a grouped set of records according to some key and size, e.g. 1000 users in each file, so that everything fits in memory.

Chunking the “naive” Pythonic way

Let’s say that you have a table called transactions with the schema below, holding 100 million transactions across 50 thousand unique users:

[
    {
        "name": "transactionid",
        "mode": "REQUIRED",
        "type": "STRING",
        "description": "Unique ID of transaction",
        "fields": []
    },
    {
        "name": "userid",
        "mode": "REQUIRED",
        "type": "STRING",
        "description": "Unique ID of user",
        "fields": []
    },
    {
        "name": "accountid",
        "mode": "REQUIRED",
        "type": "STRING",
        "description": "Unique ID of users account",
        "fields": []
    },
    {
        "name": "date",
        "mode": "NULLABLE",
        "type": "DATE",
        "description": "Date of transaction",
        "fields": []
    },
    {
        "name": "amount",
        "mode": "NULLABLE",
        "type": "FLOAT64",
        "description": "Amount of transaction, debit or credit based on sign",
        "fields": []
    },
    {
        "name": "description",
        "mode": "NULLABLE",
        "type": "STRING",
        "description": "Transaction description",
        "fields": []
    }
]

The Pythonic way shown below would be to:

  • Create chunks based on all unique user IDs (user_id_list)
  • Only query the users for a given chunk
  • Export the data to a file, e.g. a .csv

import time

import pandas as pd

def chunk_list(x: list, chunk_size: int) -> list:
    return [x[i : i + chunk_size] for i in range(0, len(x), chunk_size)]

# df_userids holds the 50k unique user IDs; query 1000 users per chunk
for i, user_id_list in enumerate(chunk_list(df_userids.userid.tolist(), 1000)):
    start_time = time.time()

    query = f"""SELECT *
                FROM `project.dataset.transactions`
                WHERE userid IN UNNEST({user_id_list})
             """
    # datalake is the BigQuery client wrapper used in the project
    result = datalake.query(query)
    df_batch = pd.read_csv(result)
    df_batch.to_csv(f"user_batch_{i}.csv", index=False)

Although the code looks easy to understand, testing this on 100 million transactions takes roughly 7+ hours. This is way too slow and we can do much better 🧠.

Chunking using BigQuery

As BigQuery is “practically” a distributed engine like Spark under the hood, we can use partitioning and especially two built-in functions: NTILE and RANGE_BUCKET.

What the NTILE function is doing:

Info

NTILE divides the rows into constant_integer_expression buckets based on row ordering and returns the 1-based bucket number that is assigned to each row. The number of rows in the buckets can differ by at most 1. The remainder values (the remainder of number of rows divided by buckets) are distributed one for each bucket, starting with bucket 1. If constant_integer_expression evaluates to NULL, 0 or negative, an error is provided.
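
To make this concrete, here is a minimal sketch with five made-up user IDs split into two buckets (the users CTE is purely illustrative):

--5 rows into 2 buckets: 'u1'..'u3' land in bucket 1, 'u4' and 'u5' in bucket 2
WITH users AS (
  SELECT userid FROM UNNEST(['u1', 'u2', 'u3', 'u4', 'u5']) AS userid
)
SELECT
  userid,
  NTILE(2) OVER(ORDER BY userid) AS bucket_id
FROM users;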

And what RANGE_BUCKET function is doing:

Info

RANGE_BUCKET scans through a sorted array and returns the 0-based position of the point’s upper bound. This can be useful if you need to group your data to build partitions, histograms, business-defined rules, and more.
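
As a quick illustration with made-up numbers (not from our dataset):

--20 falls into the range [20, 30), i.e. 0-based upper-bound position 3
SELECT RANGE_BUCKET(20, [0, 10, 20, 30, 40]) AS bucket;  -- returns 3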

In short, NTILE creates the chunk groups we want via a window function, whilst RANGE_BUCKET takes care of creating the partitions.

In our previous example, combining these two together would look like this:

--we have 50k users and want 1000 users in each bucket, i.e. 50k / 1000 -> 50
DECLARE num_buckets INT64;
SET num_buckets = 50;

CREATE TABLE `project.dataset.trx_user_batches`
PARTITION BY RANGE_BUCKET(id, GENERATE_ARRAY(1, num_buckets + 1, 1)) AS
-- get transactions
WITH transactions AS (
SELECT *
FROM
    `project.dataset.transactions`
),
--assign each of the ~50k distinct users to one of num_buckets buckets, i.e. ~1000 users per bucket
user_rank AS (
SELECT
  userid,
  NTILE(num_buckets) OVER(ORDER BY userid) AS id
FROM (
  SELECT DISTINCT userid FROM transactions
)
),
--notice that we need to create num_buckets + 1 array elements here
user_bucket AS (
SELECT
  *,
  RANGE_BUCKET(id, GENERATE_ARRAY(1, num_buckets + 1, 1)) AS bucket
FROM
  user_rank
)
SELECT
  trx.userid,
  trx.accountid,
  trx.transactionid,
  trx.date,
  trx.amount,
  trx.description,
  ub.id
FROM
  transactions AS trx
INNER JOIN
  user_bucket AS ub
ON
  trx.userid = ub.userid;
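
Once the table is created you can quickly sanity-check how the users and transactions are spread across the buckets. A minimal sketch, where id is the NTILE bucket number from the query above:

--expect ~1000 distinct users and roughly 2 million transactions per bucket
SELECT
  id,
  COUNT(DISTINCT userid) AS users_in_bucket,
  COUNT(*) AS transactions_in_bucket
FROM
  `project.dataset.trx_user_batches`
GROUP BY
  id
ORDER BY
  id;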

With the bucketing query above we get 50 partitions with ~1000 users in each, or around 2 million transactions per partition. This is a much smaller dataset that we can fit in memory (e.g. in Pandas) compared to the 100 million rows we started with. For instance, if you want to export the partitions as files for another job or workflow you could use:

EXPORT DATA
  OPTIONS (
    uri = 'gs://trx_user_batches/user_batch_*.csv',
    format = 'CSV',
    overwrite = true,
    header = true,
    field_delimiter = ';')
AS (
SELECT
  *
FROM
  `project.dataset.trx_user_batches`
ORDER BY
  userid ASC,
  accountid ASC,
  date ASC
);

The nice thing about having the partitions and using the EXPORT DATA statement is that chunking is much faster than with the Pythonic approach. Exporting the 50 partition files takes roughly 30-40 minutes instead of 7+ hours 🚀.

Tip

By default BigQuery exports data >= 1 GB to several files. This is true even if your dataset is partitioned. If you want to force the export to save the output to a single file (given that the size of each file is < 1 GB), you can add a LIMIT clause, e.g. with the maximum INT64 value, to force all data to the same worker.

An updated version of the export if you want to make sure that each partition is saved to one file:

DECLARE num_files INT64;
SET num_files = 50;

--loop over each bucket id and export it to its own file
FOR item IN (SELECT idx FROM UNNEST(GENERATE_ARRAY(1, num_files, 1)) AS idx)
DO
EXPORT DATA
  OPTIONS (
    uri = CONCAT('gs://trx_user_batches/user_batch_', CAST(item.idx AS STRING), '_*.csv'),
    format = 'CSV',
    overwrite = true,
    header = true,
    field_delimiter = ';')
AS (
SELECT
  *
FROM
  `project.dataset.trx_user_batches`
WHERE
  id = item.idx
ORDER BY
  userid ASC,
  accountid ASC,
  date ASC
--LIMIT with the max INT64 value keeps each partition in a single output file
LIMIT
  9223372036854775807
);
END FOR;