Working with a large ragged dataset in Julia

Re-posted from: https://tpapp.github.io/post/large-ragged-dataset-julia/

Introduction

One of the projects I am working on at the moment at the Institute
Studies

(in Vienna) is an analysis of the Austrian Social Security Database. We are using this extremely rich dataset to refine our understanding of the cross-sectional heterogeneity and age structure of employment and labor market participation.1

Naturally, I am using Julia, not only
because it is fast, convenient, and elegant, but also because it
allows me to use a single language for data processing, exploratory
data analysis, descriptive statistics, and more sophisticated Bayesian
indirect inference using MCMC. When analyzing real-world data, it is
useful to do some exploratory plots, fit a simple model, refine,
disaggregate, fit a more complex model, and repeat this until I am
satisfied with the result. Not having to switch languages is a great
bonus.

In this post I talk about my experience with the very first step of
the above process: ingesting and collating a large, ragged dataset
using Julia. I found that accomplishing this was nontrivial: while the
DataFrames.jl ecosystem
is converging nicely, I found that I had to develop some custom tools
to work with this dataset. I hope that others in a similar situation
find them and this writeup useful.

I made the resulting libraries available on
Github
, with some
documentation and lots of unit tests, but they are not finalized since
I will probably rewrite some of them once named
tuples
are
incorporated into the language. This is not a detailed introduction to
any of these libraries, especially since they are subject to change,
rather an account of work in progress and some starting points if you
want to do something similar. However, if you want to use these
libraries, look at the docstrings and the unit tests, and feel free to
ask for help, preferably by opening an issue for the relevant library.

The whole data comprises about 2000 million observations on various
"spells", which either involve contributions to or benefits from
Austria’s comprehensive social security system (eg being employed, or
being on maternity leave). Each spell has the unique ID of the
individual it belongs to, a start and end date, and spell information
(eg insurance event). They look something like this:2

1;...;19800101;19800201;...;AA;...;
1;...;19800301;19800401;...;B;...;
2;...;19800701;19800901;...;CCC;...;


Fields are separated by ;, the ...s are fields which we ignore for
now (we may use them later). Dates have the yyyymmdd format. There

Gzipped data in delimited format is about 45 GB, raw data would be
over 500 GB. Data for each year is in a separate file, so individual
1 may have spells scattered in multiple files. Ideally, for our
analysis, we would like to end up with a data structure that has the
spells organized by individual, eg

• individual 1
• start date 1980-01-01, end date 1980-02-01, spell type AA
• start date 1980-03-01, end date 1980-04-01, spell type B
• … other spells from subsequent files
• individual 2

The number of spells varies by individual, which makes this dataset ragged.3

Rationale for custom tools

A simple back of the envelope calculation is useful to think about the size of the dataset. If we encode each column as an Int64 or similar (eg Date), we use

$8 \text{ bytes} \times 2 \cdot 10^9 \approx 16 \text{ gigabytes}$

per column. For 4 columns, this would use up 64 GB, plus some extra
for keeping track of the ragged structure. While this is feasible if
we have enough RAM, it is very nice to use smaller machines
(especially laptops — I am typing this on the train) for exploratory
data work.4 Also,
economizing on RAM speeds up the calculations, as more data fits into
the CPU cache.

Early experiments suggested that simply reading this data into native
Julia structures or tools in the
DataFrames.jl ecosystem
is either infeasible or unnecessarily slow. I also considered
databases, but found them to be more trouble than it is worth,
especially since what I am doing below is straightforward and fits
into Julia very nicely.

Below, I discuss the key ingredients I used to process this dataset.

Mmapping large columns

Julia supports memory
mapped
arrays, which map virtual memory to disk seamlessly, allowing lazy loading and access managed by the virtual memory manager. Using
the syntax

io = open("path_to_file.bin", "w+") # create, truncate
A = Mmap.mmap(io, Vector{Int}, 200) # map to array

gives you a vector that is mapped to the disk (you have to call
Mmap.sync! after you are finished to make sure, and you have to use
growth = true, which is the default). The advantage is that the size
of the array is limited by the disk space, not RAM, and the OS takes
care of reading, writing, and caching as necessary.

A complication for our data analysis is that we do not know the total
number of elements before having read the whole dataset, so we can’t
specify the dimensions above. Fortunately, simply opening a stream and
writeing values of bits types works fine.5

I packaged the code for managing columns of bits types using the
strategies above in
LargeColumns.jl, which
keeps track of column types and imposing some basic sanity
checks. This makes working with large vectors as easy as

using LargeColumns
cols = MmappedColumns("path/to/directory", 2_000_000_000, Tuple{Float64, Int})

which takes care of mmaping, and makes cols behave as a vector of
tuples of the given type. The files, including metadata, are located
in the given directory.

Ragged data and collation

I want to end up with data grouped by individuals contiguously, eg

1 1 1 1 2 2 2 3 3 4 4 ...


where the first 4 observations belong to individual 1, the second 3
to 2, and so on. This can be indexed with UnitRanges: for
individual 1 we would use 1:4, for 2, 5:8, etc. Note that
storage of both endpoints is unnecessary, since they can be calculated
from a cumulative sum, also, we can reuse the same index for multiple
columns.

The package RaggedData.jl
implements simple datastructures for counting, collating, and indexing
ragged data into vectors. When I first parse and ingest the data, I
count the number of observations for each individual:

id_counter = RaggedCounter(Int32, Int32)

while !eof(...) # process by line
id = parse_id_from_line(...)
push!(id_counter, id)
end

Then in the second pass, I start with the index for the first
observation for each (eg 1, 5, 9 above), and increment it for
each row of the data. So the first observation for individual 2 goes
to index 5, the second 6, and so on. For this, I create a collator
object coll:

coll, ix, id = collate_index_keys(id_counter, true);

for i in indices(first_pass, 1)
id, spell_index, dates = first_pass[i]
j = next_index!(coll, id)
collated[j] = (spell_index, dates)
end

where collated is another set of mmaped columns. ix is used
later for indexing into the result: ix[1] gives the UnitRange for
the observations about the first individual, and so on.

Saving space

Before processing the whole dataset, I assumed that the individual ids
fit into Int32s. This is easy to verify after parsing, with the
standard constructor, which simply throws an error if the value does
not fit:

julia> Int32(typemax(Int64))
ERROR: InexactError()
Stacktrace:
[1] Int32(::Int64) at ./sysimg.jl:77

For the spell types, I simply indexed them in the order of appearance,
saving the index as an Int8. They are reconstructed using
IndirectArrays.jl
when working with the data.

Dates turned out to be the trickiest, until I realized that using
Int16, I can represent a timespan of about 179 years, which is a lot
more than I need for this data. Of course, this requires that we count
from some epoch other than 0001-01-01 like Base.Date. Fortunately,
Julia allows encoding the epoch in the type, making it costless as
long as I use it consistently for the same dataset. The package
FlexDates.jl implements this
approach.

Parsing

Being very impressed by the amazing speed gains for date parsing in
Julia (see #18000,
#15888,
#19545), I used this
project as an excuse to experiment with parsers. Existing packages
like TextParse.jl
are already so fast that writing yet another parser library would not
have made sense for a small amount of data, but since I plan to reuse
this code for large datasets I felt the investment was justified.

Most of the datasets I work with are ASCII: other character sets are
still very rare in social science data, since data is predominantly
anonymous (so no names), categorical variables are usually encoded as
short strings or integers, and the rest are numbers and
punctuation. Moreover, delimited
UTF-8 can be parsed as ASCII
when the delimiters themselves are ASCII.

For this dataset, I was also free to ignore quotes and within-field
linebreaks, since they do not occur in the data dumps. Given these, I
was free to parse this dataset as ASCII, ie UInt8 (bytes). The
algorithms are very simple: parse a given number of characters (eg as
numbers), or stop when hitting a delimiter.

Since I ignore some fields, I also needed functionality to simply
the next byte after the delimiter) — this takes about 25–30% of the
time, compared to parsing it. The result is packaged as
ByteParsers.jl. Parsing
using ASCII turns out to be 30%–120% faster than UTF-8.

Putting it all together

I use three passes to process the data.

First pass

The first pass parses the data and writes it out in a binary format, also counting observations for each individual at the same time. Categorical data is indexed in the order of appearance and written out as an Int8, dates are written using FlexDate{Date(2000, 1, 1), Int16}, represented with 16 bits.

1. Open the gzipped files using CodecZlib.jl. Open sinks for binary data using LargeColumns.jl.

2. Read and parsed them line by line, using ByteParsers.jl. You can also use TextParse.jl.

3. Write the parsed data into the sinks, at the same time counting with a RaggedCounter from RaggedData.jl.

4. Close the streams, write the categorical values and the RaggedCounter using JLD2.jl.

The whole process takes about 90 minutes, and generates 18 GB of binary data.

Second pass

The second pass reads back the binary dump from the first pass using
mmap, and collates observations for the individuals it using a
RaggedCollate indexer from RaggedData.jl. The latter is an object
which keeps track of where observations should end up, if their counts
are consistent with the first pass. The result is written using
LargeColumns.jl into mmapped columns, and it is reasonably fast,
taking about 30–80 minutes, depending on the RAM size (the
non-contiguous collating process has to use the disk if the resulting
large vectors cannot fit in RAM). Finally, the RaggedIndex object is
written out using JLD2.

Third pass

The third pass is optional, it sorts spells by the start date for each individual (we found this helps the kind of analysis we perform). It uses the mmaped columns from the second pass, and takes about 2 minutes (since the data is accessed almost linearly).

Using the data

The columns are mmapped using LargeColumns.jl. IndirectArrays.jl is used to reconstitute categorical data with the keys previously saved, and the resulting vector is wrapped in a ragged access data structure using RaggedColumns (from RaggedData.jl) and the previously saved index. Iterating through the dataset takes about 2 minutes.

Conclusion

Ingesting and working with large amounts of data turns out to be very
simple and convenient using mmaped arrays in Julia. I packaged the
code into libraries because

1. I like to have unit test, especially if I keep benchmarking and optimizing,

2. It simplifies code and communication for colleagues I am cooperating with,

3. May be useful in future projects,

4. I find packaged code with continuous integration tests closer to
the idea of reproducible research.

I plan to register some of these libraries in the future (when the
interface stabilizes).

1. This research is supported by the Austrian National Bank Jubiläumsfonds grant #17378. [return]
2. The samples shown here are made up, the actual dataset is not public. [return]
3. Irregular and non-rectangular are also used. [return]
4. We also used a subset of the data for initial work. [return]
5. Currently needs a workaround for which I submitted a PR. [return]