# Optimising Parallel Python and the Billion Row Challenge

### January 6, 2024

1brc, Multiprocessing, Python

Attempting the Billion Row Challenge, but in Python.

This blog post is my take on 1brc, or The Billion Row Challenge. The goal, as per Gunnar Morling's 1brc, is stated as:

> The text file contains temperature values for a range of weather stations. Each row is one measurement in the format `<string: station name>;<double: measurement>`, with the measurement value having exactly one fractional digit. The following shows ten rows as an example:
```
Hamburg;12.0
Bulawayo;8.9
Palembang;38.8
St. John's;15.2
Cracow;12.6
Bridgetown;26.9
Istanbul;6.2
Roseau;34.4
Conakry;31.2
Istanbul;23.0
```
> The task is to write a Java program which reads the file, calculates the min, mean, and max temperature value per weather station, and emits the results on stdout like this (i.e. sorted alphabetically by station name, and the result values per station in the format `<min>/<mean>/<max>`, rounded to one fractional digit):
```
{Abha=-23.0/18.0/59.2, Abidjan=-16.2/26.0/67.3, Abéché=-10.0/29.4/69.0, Accra=-10.1/26.4/66.4, Addis Ababa=-23.7/16.0/67.0, Adelaide=-27.8/17.3/58.5, ...}
```

Obviously, in this case it will be done in Python! As of writing, the leaderboard is looking quite fast, with results of 00:12.063/00:59.430/04:13.449.

> Results are determined by running the program on a Hetzner Cloud CCX33 instance (8 dedicated vCPU, 32 GB RAM)

I'm running everything locally on my Mac with an M1 Max chip. Based on my brief research into the chips the Hetzner cloud runs on, I'm going to assume my times will be faster by up to a factor of 2. That means to get anywhere near performant I should be aiming for ~2 mins, and even then I'd only be clinging on to the slowest times of the original 1brc.

These are going to be some hard numbers to beat, especially in Python - but if we can get under our target that would be amazing. My approach will take a few stages, iteratively trying to get better with different optimisation approaches.

## 0 - ASAP With Polars

Polars is a nice place to start: a tool I'm familiar with, and one that will likely be relatively fast thanks to its low-level implementations of operations over the Arrow in-memory format.

```python
import logging
from pathlib import Path

import polars as pl

from brc.util import DATA_DIR, timit_context


def main(file_path: Path):
    lf = pl.scan_csv(
        file_path,
        separator=";",
        with_column_names=lambda _: ["city", "temp"],
        schema={"city": pl.Categorical, "temp": pl.Float64},
    )
    q = (
        lf.group_by("city")
        .agg(
            pl.min("temp").alias("min_temp"),
            pl.mean("temp").round(1).alias("mean_temp"),
            pl.max("temp").alias("max_temp"),
        )
        .select(
            pl.col("city"),
            pl.concat_str(
                [pl.col("min_temp"), pl.col("mean_temp"), pl.col("max_temp")],
                separator="/",
            ).alias("_res_str"),
        )
        .sort("city")
        .select(
            pl.concat_str([pl.col("city"), pl.col("_res_str")], separator="=").alias("res_str")
        )
    )
    res = q.collect()
    print(", ".join(res["res_str"]))


if __name__ == '__main__':
    logging.basicConfig(level=logging.INFO)

    path = DATA_DIR / "data_1_000_000_000.txt"
    with timit_context(1_000_000_000):
        main(path)
```

In ~30 lines of code, we can do all the parsing, maths and string concatenation to create the result. On my machine, this took just over 1 minute*. Given how quickly I was able to put this together using industry-standard Python libs, that's pretty sweet; the fact that I can now move on to other work, rather than mess around with multiprocessing and parsing algos, means the total cost of this solution is very cheap.

```
INFO:brc.util:benchmark >>> Starting timer
INFO:brc.util:benchmark <<< Elapsed time: 0:01:00.263331
INFO:brc.util:benchmark <<< with 1,000,000,000 rows => 16,593,838.763 rows/s.
```
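The `brc.util` helpers aren't shown in the post. A minimal sketch of what a `timit_context`-style timing context manager could look like, based on the calls and log lines above (the name, signature and the "complete est" line are assumptions on my part; this version omits the estimate):

```python
import cProfile
import logging
import pstats
import time
from contextlib import contextmanager
from datetime import timedelta

logger = logging.getLogger("brc.util")


@contextmanager
def timit_context(n_rows: int, name: str = "benchmark", profile: bool = False):
    """Log wall time and rows/s for the enclosed block, optionally attaching cProfile."""
    prof = cProfile.Profile()
    logger.info(f"{name} >>> Starting timer")
    start = time.perf_counter()
    if profile:
        prof.enable()
    try:
        yield
    finally:
        if profile:
            prof.disable()
        elapsed = time.perf_counter() - start
        logger.info(f"{name} <<< Elapsed time: {timedelta(seconds=elapsed)}")
        logger.info(f"{name} <<< with {n_rows:,} rows => {n_rows / elapsed:,.3f} rows/s.")
        if profile:
            # print the top five entries by cumulative time, like the reports below
            pstats.Stats(prof).sort_stats("cumulative").print_stats(5)
```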

HOWEVER

Rule 2 literally states (rule 1 is about using Java… rules are meant to be broken, right?):

> No external library dependencies may be used

So let's do it with no libs...

## 1 - Make a Python Script to Read Then Calculate

The file I'm working with is ~13 GB, and my laptop has 64 GB, so I can do this all in memory. I'll probably end up not doing it all in memory, as the target machine has only 32 GB, but I'll cross that bridge when I get there.

Here is the code:

```python
import logging
from dataclasses import dataclass
from pathlib import Path

from brc.util import DATA_DIR, timit_context


def main(file_path: Path):
    @dataclass
    class CollectionStruct:
        city: bytes
        min_temp: float
        max_temp: float
        sum_temps: float
        count: int

        def mean(self):
            return self.sum_temps / self.count

        def __repr__(self):
            return f"{self.city.decode('utf-8')}={self.min_temp}/{self.mean():.1f}/{self.max_temp}"

    temps: dict[bytes, CollectionStruct] = {}
    with open(file_path, 'rb') as f:
        for line in f:
            # parse line data
            city, temp = line.split(b";")
            temp = float(temp)

            # grab existing data (or a fresh struct seeded with this line's temp)
            collect = temps.get(city, CollectionStruct(city, temp, temp, 0, 0))

            # add line to the collection stats
            collect.count += 1
            collect.sum_temps += temp
            collect.min_temp = min(collect.min_temp, temp)
            collect.max_temp = max(collect.max_temp, temp)

            # update the dictionary
            temps[city] = collect

    # ordering here is slightly quicker than the dataclass ordering
    print(" ".join(str(s) for s in sorted(temps.values(), key=lambda c: c.city)))


if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)

    # This solution will not be fast; 100mm records is enough for a sample at the moment.
    n = 100_000_000
    path = DATA_DIR / f"data_{n:_}.txt"
    with timit_context(n, "basic"):
        main(path)

    with timit_context(n, "basic_profile", profile=True):
        main(path)
```

We use dataclasses and a non-parallelised approach, giving us a result for 100mm records. This first step is really to gauge how fast native Python is vs Polars and to estimate how long 1bn records would take. The results are below:

```
INFO:brc.util:basic >>> Starting timer
INFO:brc.util:basic <<< Elapsed time: 0:01:13.951427
INFO:brc.util:basic <<< with 100,000,000 rows => 1,352,238.944 rows/s. brc complete est 0:12:19.514273
```

! Note: the slightly different row counts in the logs are due to generating data files of different lengths.

Profiling this code with cProfile shows the weak points; the report is logged to the screen using the Python builtins. Profiling the code does make it slower, so the extended time below is due to the attached profiler.

```
500002507 function calls (500002504 primitive calls) in 161.663 seconds

Ordered by: cumulative time
List reduced from 108 to 5 due to restriction <5>

ncalls  tottime  percall  cumtime  percall filename:lineno(function)
1   96.971   96.971  161.663  161.663 /Users/toby.devlin/dev/projects/1brc/src/brc/challenge_basic.py:9(main)
100000000   15.139    0.000   15.139    0.000 {built-in method builtins.max}
100000000   15.119    0.000   15.119    0.000 {method 'split' of 'bytes' objects}
100000000   15.030    0.000   15.030    0.000 {built-in method builtins.min}
100000013   13.081    0.000   13.081    0.000 {method 'get' of 'dict' objects}
```

It looks like most of the time (main) is spent doing the min and max for the temp values, then parsing the file lines as if they were a CSV, and then doing the lookups for the collection objects. Fundamentally we will need to change the processing approach, as these are all Python builtins and are about as fast as they can get. So let's try spreading these operations across other processes.
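That said, the builtin calls themselves carry function-call overhead. A quick timeit comparison (my addition, not from the original solution) shows one of the few knobs left at this level: on CPython an inline conditional typically edges out a `min()` call, because it skips the call machinery entirely:

```python
import timeit

setup = "a, b = 12.3, 45.6"

# one million comparisons each way; absolute numbers vary by machine
t_builtin = timeit.timeit("min(a, b)", setup=setup, number=1_000_000)
t_inline = timeit.timeit("a if a < b else b", setup=setup, number=1_000_000)

print(f"min(a, b):         {t_builtin:.3f}s")
print(f"a if a < b else b: {t_inline:.3f}s")
```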

## 2 - Parallelize The Slow Bits

If we leverage the multiprocessing and multithreading modules, we can pass data to separate processes so these min(), max() and split() calls run in parallel. In part 1 the profile showed the code is CPU bound, not IO bound; this means threading isn't needed (yet) and we should opt for more processes. Multiprocessing is more heavyweight, and launching a new process costs more than a thread; however, once the process is up it's relatively fast and unbound by the GIL. (It's also my personal preference in Python to start with coroutines, then approach multiprocessing with a 1:1 core:process design, then thread within those processes if needed.)

Below is the code which allows the summing of the various read results to be parallelised across several processes. It essentially batches records read from the file (250k at a time) and places each batch onto a worker to complete the aggregation. The largest part of refactoring this code is moving data back and forth between processes.

```python
import logging
from collections import defaultdict
from dataclasses import dataclass
from functools import lru_cache
from itertools import batched, chain
from multiprocessing import Pool
from pathlib import Path

from brc.util import DATA_DIR, timit_context


@dataclass
class CollectionStruct:
    city: str
    min_temp: float
    max_temp: float
    sum_temps: float
    count: int

    def __init__(self, city: str = '', init_value: float | None = None):
        self.city = city
        if init_value is None:
            # default-factory case: an identity element for the merge below
            self.min_temp, self.max_temp = float("inf"), float("-inf")
            self.sum_temps, self.count = 0.0, 0
        else:
            self.min_temp = self.max_temp = self.sum_temps = init_value
            self.count = 1

    def mean(self):
        return self.sum_temps / self.count

    def __iadd__(self, other):
        # this is more of a merge function
        # the city line is to allow for the defaultdict default factory; there's probably a performance hit here
        self.city = other.city
        self.count += other.count
        self.sum_temps += other.sum_temps
        self.min_temp = min(self.min_temp, other.min_temp)
        self.max_temp = max(self.max_temp, other.max_temp)
        return self

    def __repr__(self):
        return f"{self.city}={self.min_temp}/{self.mean():.1f}/{self.max_temp}"


@lru_cache(None)
def parse_city(city: bytes) -> str:
    return city.decode("utf-8")


@lru_cache
def parse_temp(temp: bytes) -> float:
    return float(temp)


def do_parse(line: bytes):
    city, temp = line.split(b";")
    temp = float(temp)
    city = parse_city(city)
    return CollectionStruct(city, temp)


def process_lines(*lines: bytes):
    """
    Takes a number of lines from the file and aggregates them to a single results dict
    """
    totals = defaultdict(CollectionStruct)

    for line in (do_parse(l) for l in lines):
        totals[line.city] += line

    return totals


def main(file_path: Path):
    totals = defaultdict(CollectionStruct)

    # read in file and batch lines in groups to a process to aggregate
    with open(file_path, 'rb') as f:
        # 8 to match the challenge target machine cores.
        with Pool(8) as pool:
            # this is slightly faster than pool.map(process_lines, f, n) thanks to serialising more lines to each
            # process at once (the slow bit is moving objects between python processes)
            res = pool.starmap(process_lines, batched(f, 250_000))

    # group all results (approx n_proc x m_distinct_stations elements)
    for c in chain.from_iterable(r.values() for r in res):
        totals[c.city] += c

    print(" ".join(str(s) for s in sorted(totals.values(), key=lambda c: c.city)))


if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)

    n = 100_000_000
    path = DATA_DIR / f"data_{n:_}.txt"
    with timit_context(n, "parallel"):
        main(path)

    with timit_context(n, "parallel_profile", profile=True):
        main(path)
```


Then we look at the results and profile of this run:

```
INFO:brc.util:parallel >>> Starting timer
INFO:brc.util:parallel <<< Elapsed time: 0:00:27.071418
INFO:brc.util:parallel <<< with 100,000,000 rows => 3,693,932.807 rows/s. brc complete est 0:04:30.714183
```

~30 seconds isn't bad for 100mm records in Python; not amazing, but it more than halved the time of the non-multiprocess solution. It is, however, less of an improvement than I would have liked; only ~3x the rows per second from 8x the cores really should be a much larger improvement. It would also leave us with a very long time for the full billion.

```
553270 function calls (553255 primitive calls) in 29.698 seconds

Ordered by: cumulative time
List reduced from 280 to 5 due to restriction <5>

ncalls  tottime  percall  cumtime  percall filename:lineno(function)
249    0.001    0.000   60.440    0.243 /Users/toby.devlin/.pyenv/versions/3.12.0/lib/python3.12/multiprocessing/pool.py:500(_wait_for_updates)
39    0.029    0.001   35.980    0.923 /Users/toby.devlin/.pyenv/versions/3.12.0/lib/python3.12/multiprocessing/connection.py:201(send)
44    0.000    0.000   31.395    0.714 /Users/toby.devlin/.pyenv/versions/3.12.0/lib/python3.12/multiprocessing/connection.py:389(_send_bytes)
75    0.000    0.000   31.394    0.419 /Users/toby.devlin/.pyenv/versions/3.12.0/lib/python3.12/multiprocessing/connection.py:364(_send)
107    2.057    0.019   31.394    0.293 {built-in method posix.write}
```

Looking at the profile, we can see most of the time has migrated to waiting for updates and sending data to and from each child process. This approach takes a hacksaw to the problem and brute-forces the code to run in more places. It's not a bad approach, but now we would have to optimise the heebie-jeebies of Python multiprocessing, which would get rather technical. Another problem with this approach is that we haven't fine-tuned the single process first, which means we're just slamming an inefficient process onto more cores.
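The serialisation cost is easy to demonstrate in isolation: every batch that starmap ships to a worker, and every result dict coming back, goes through pickle. A rough stdlib-only illustration (my addition; the line counts and station names are made up to roughly match the batches above):

```python
import pickle
import time

# the expensive direction: 250k raw lines shipped to each worker per batch
lines = [f"city_{i % 400};{(i % 600) / 10:.1f}\n".encode() for i in range(250_000)]

start = time.perf_counter()
blob = pickle.dumps(lines)
elapsed = time.perf_counter() - start

# several megabytes of IPC traffic for every single batch, before any work is done
print(f"pickled {len(lines):,} lines -> {len(blob):,} bytes in {elapsed * 1000:.1f} ms")
```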

Another approach with the multiprocessing module would be to leverage some of Python's shared-state tools, such as Value, Array or, best in this case, a Manager. (We could also look into using a Queue, but this isn't really the right problem for it.) In the end this would likely still have the same problem as before: Python objects are expensive to send and receive across processes.
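For illustration, a sketch of what a Manager-based variant could look like (hypothetical, not benchmarked; every name here is mine, not from the solution). Each worker aggregates locally first, then merges into the shared dict once under a lock, since every proxy access is its own IPC round-trip:

```python
from multiprocessing import Manager, Process


def worker(shared, lock, lines):
    # aggregate locally first; touching the Manager proxy per-line would be very slow
    local = {}
    for line in lines:
        city, temp = line.split(";")
        t = float(temp)
        mn, mx, s, c = local.get(city, (t, t, 0.0, 0))
        local[city] = (min(mn, t), max(mx, t), s + t, c + 1)

    # merge into the shared dict once, under the lock
    with lock:
        for city, (mn, mx, s, c) in local.items():
            if city in shared:
                omn, omx, osum, oc = shared[city]
                shared[city] = (min(omn, mn), max(omx, mx), osum + s, oc + c)
            else:
                shared[city] = (mn, mx, s, c)


if __name__ == "__main__":
    with Manager() as m:
        shared, lock = m.dict(), m.Lock()
        batches = [["Hamburg;12.0", "Bulawayo;8.9"], ["Hamburg;1.2"]]
        procs = [Process(target=worker, args=(shared, lock, b)) for b in batches]
        for p in procs:
            p.start()
        for p in procs:
            p.join()
        print(dict(shared))
```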

## 3 - Change the Data Pipeline

As part 2 showed, we're still wasting a lot of time passing data back and forth between Python processes, and we hadn't really thought about the underlying problem. This can be improved by partitioning the problem: each process reads the file at the same time, does the processing for its own chunk, then returns only its aggregated result. This lets us focus on optimising a single partition flow and then distribute it across cores, and it also reduces the data we send back and forth between processes.

```python
import logging
import os
from collections import defaultdict
from functools import reduce
from itertools import chain, pairwise
from multiprocessing import Pool
from pathlib import Path

from brc.util import DATA_DIR, timit_context


def do_parse(line: bytes):
    # this is basically O(n) every time.
    # todo: no improvement possible for parsing in python?
    city, temp = line.split(b";", maxsplit=1)
    temp = float(temp)

    return city, temp


result_tuple = tuple[float, float, float, int]


def collect_part(file_path: Path, start: int, end: int) -> dict[bytes, result_tuple]:
    data = defaultdict(list)

    part_data = read_file_part(file_path, start, end)
    for part in part_data:
        city, temp = do_parse(part)
        # todo: .append() is still slow, change this list approach somehow?
        data[city].append(temp)

    return {
        city: (min(items), max(items), sum(items), len(items)) for city, items in data.items()
    }


def merge_result(one: dict[bytes, result_tuple], two: dict[bytes, result_tuple]):
    # todo: can this be optimised further?
    for k, two_v in two.items():
        # if we have common data, do the partial merge
        if k in one:
            one_v = one[k]
            one[k] = (
                min(one_v[0], two_v[0]), max(one_v[1], two_v[1]), one_v[2] + two_v[2], one_v[3] + two_v[3]
            )
        else:
            # if we don't have common data, overwrite
            one[k] = two_v
    return one


def read_file_part(file_path: Path, start: int, end: int) -> list[bytes]:
    """
    Reads in the file's bytes from a start position to the end position
    """
    with open(file_path, 'rb') as f:
        f.seek(start)
        # we provide the offset from our data, so read just this partition and split it into lines
        return f.read(end - start).splitlines()


def find_next_newline(file_path: Path, start: int) -> int:
    """
    Finds the next newline in the file after the position given.
    """

    # special case if we're at the start of the file
    if start == 0:
        return 0

    offset = start + 1
    with open(file_path, 'rb') as f:
        # move to start place
        f.seek(start)
        # read byte by byte til a newline (or EOF) is found
        while f.read(1) not in (b"\n", b""):
            offset += 1

    return offset


def main(file_path: Path):
    file_size = os.path.getsize(file_path)

    n_splits = 5000
    split_size = file_size // n_splits
    partitions = (find_next_newline(file_path, n * split_size) for n in range(n_splits))
    # extend partitions with the end of the file.
    partitions = chain(partitions, (file_size,))

    # single process version for profiling
    # results = []
    # for i, j in pairwise(partitions):
    #     results.append(collect_part(file_path, i, j - 1))

    # multiprocessing version distributing the loop/generator
    with Pool(8) as pool:
        partitions_ = ((file_path, i, j - 1) for i, j in pairwise(partitions))
        results = pool.starmap(collect_part, partitions_, 10)

    parts = reduce(merge_result, results)

    sorted_items = sorted(parts.items())
    print(" ".join(f"{k.decode('utf-8')}={p[0]}/{p[2] / p[3]:.1f}/{p[1]}" for k, p in sorted_items))


if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)

    n = 1_000_000_000
    path = DATA_DIR / f"data_{n:_}.txt"
    with timit_context(n, "parallel"):
        main(path)
```


The single process version produces the following result for 100mm rows:

```
INFO:brc.util:parallel2 >>> Starting timer
INFO:brc.util:parallel2 <<< Elapsed time: 0:00:37.261046
INFO:brc.util:parallel2 <<< with 100,000,000 rows => 2,683,767.901 rows/s. brc complete est 0:06:12.610463
```

An estimate of just over 6 minutes for a billion rows is decent for Python, and it beat our first attempt at 100mm by half. Profiling the single-process version is also much easier to understand:

```
302615043 function calls (302615041 primitive calls) in 87.055 seconds

Ordered by: cumulative time
List reduced from 51 to 5 due to restriction <5>

ncalls  tottime  percall  cumtime  percall filename:lineno(function)
1    1.793    1.793   87.055   87.055 /Users/toby.devlin/dev/projects/1brc/src/brc/challenge_parallel_2.py:91(main)
999   27.433    0.027   84.822    0.085 /Users/toby.devlin/dev/projects/1brc/src/brc/challenge_parallel_2.py:33(collect_part)
99899999   24.782    0.000   39.515    0.000 /Users/toby.devlin/dev/projects/1brc/src/brc/challenge_parallel_2.py:21(do_parse)
99899999   14.733    0.000   14.733    0.000 {method 'split' of 'bytes' objects}
99900998    9.070    0.000    9.070    0.000 {method 'append' of 'list' objects}
```


This shows the slow bits are still Python builtin calls (split and append), which in this case come down to the data structures we use. Changing the processing model would be the way to reduce this time; however, these structures are well-suited to the problem, so some out-of-the-box thinking with lower-level tools will be needed for these numbers to improve. I'd consider that probably out of scope for "pure Python".
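One such lower-level option that still counts as stdlib is mmap: each worker can slice its partition straight out of a memory-mapped file, and mm.find() replaces a byte-by-byte newline scan. A sketch (my addition, not part of the solution above):

```python
import mmap


def iter_lines(path: str, start: int, end: int):
    """Yield raw lines from byte offsets [start, end) of a memory-mapped file."""
    with open(path, "rb") as f:
        with mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ) as mm:
            pos = start
            while pos < end:
                # jump straight to the next newline instead of reading byte by byte
                nl = mm.find(b"\n", pos, end)
                if nl == -1:
                    # last (unterminated) line of the partition
                    yield mm[pos:end]
                    break
                yield mm[pos:nl]
                pos = nl + 1
```

The partition boundaries would still need the find_next_newline alignment from above; mmap just removes the read() copies inside each partition.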

The multiprocessing result is much faster than I expected, even for 1 billion records!

```
INFO:brc.util:parallel >>> Starting timer profile=False
INFO:brc.util:parallel <<< Elapsed time: 0:01:00.662966
INFO:brc.util:parallel <<< with 1,000,000,000 rows => 16,484,521.949 rows/s. brc complete est 0:01:00.662966
```

There are probably further improvements to be made, such as grid searching over n_splits and chunksize, plus the todos I left in the code comments. Personally, I'm well chuffed with the results I've achieved with multiprocessing and optimisation at this level of Python. The next step would likely be solving it with C bindings only (i.e. solving it in C), or rewriting it in a faster language.

I also allowed the Pool() to use all the cores on the machine, which resulted in a sub-1-min total time of 0:00:55.958772 - beating Polars (for this one very specific, simple task) by a couple of seconds!!

## Other Notable Tools

### DuckDB

DuckDB took a whole 47 seconds to run the query below, of which loading the CSV took more than 40 seconds; the query itself only took 3.81 seconds on average! A data format on disk that takes less time to parse makes all the difference; it really shows the trade-off of having the data in the right format to start with.

```sql
-- the FROM clause was lost from the original post; read_csv_auto with these
-- options matches the column0/column1 names renamed below
CREATE TABLE brc AS
SELECT *
FROM read_csv_auto('data_1_000_000_000.txt', delim=';', header=False);

ALTER TABLE brc RENAME column0 TO city;
ALTER TABLE brc RENAME column1 TO temp;

select city, min(temp), avg(temp), max(temp)
from brc
group by city
```

As an aside, I also tried this with SQLite for shits and gigs; it loaded the records in about an hour and then took over 5 minutes to compute the same query, so I gave up. Local OLAP has been won by DuckDB in this one.

### Polars with Parquet

Step one was dumping the CSV over to a parquet file with a short bit of code.

```python
import polars as pl

from brc.util import DATA_DIR, timit_context


def sink_to_parquet():
    n = 1_000_000_000
    with timit_context(n, "scan_parquet"):
        lf = pl.scan_csv(
            DATA_DIR / "data_1_000_000_000.txt",
            separator=";",
            with_column_names=lambda _: ["city", "temp"],
            schema={"city": pl.Categorical, "temp": pl.Float64},
        )

    with timit_context(n, "sink_parquet"):
        lf.sink_parquet(DATA_DIR / "data_1_000_000_000.parquet")

    with timit_context(n, "sink_parquet_stats"):
        lf.sink_parquet(DATA_DIR / "data_1_000_000_000_stats.parquet", statistics=True)
```

This code is part of the benchmark_parquet.py file; source -> sink time was 0:01:15.312101 for the non-stats file and 0:01:24.109170 with stats added, and on disk both files are only 3.6GB! Parquet has much better compression and run-length encoding to save space, and is columnar, which means it interfaces with Arrow very well (natively, by design). We can now rerun the benchmark code but pass in the parquet file as the source. This gives results of 0:01:25.495492 and 0:01:28.913853 respectively - no faster than scanning the CSV, which is fascinating!

I'm probably doing something wrong here and will look into it. I was expecting an order of magnitude speed up.

### ChatGPT

I did ask ChatGPT (GPT-3.5) for a solution and, very unhelpfully, it provided code that errored and was fundamentally flawed. As with other large code tasks, ChatGPT fell flat; however, it did help with smaller, more targeted feature requests, and it helped me with some of the aggregation code once I gave it the gist and asked for a refactor. Personally, I find this approach much better for feedback and as a "give me a starting point" type of tool.

## Final Results

- Single process: 0:06:12.610463 (estimated from 0:00:37.261046 for 100mm)
- Multi-process: 0:01:00.662966
- Multi-process, all cores: 0:00:55.958772
- Polars: 0:01:00.263331
- Other tools: very fast once the data is loaded.

It's official (on my machine), boys and girls: I beat Polars in native Python. Although I did spend nearly 60x longer writing it.

All the code can be seen on the GitHub project