Author Archives: Blog by Bogumił Kamiński

DataFrames.jl user’s corner: filtering performance

By: Blog by Bogumił Kamiński

Re-posted from: https://bkamins.github.io/julialang/2021/08/20/userscorner.html

Introduction

Today I write about a topic that was proposed by DataFrames.jl user.
The question was related the performance of the filter function. I hope
this will be useful to people who start using Julia and DataFrames.jl.

In this post I will show several ways to perform the same row sub-setting
operation using the filter function on a data frame and compare their performance.

The post was written under Julia 1.6.1 and DataFrames.jl 1.2.2.

The filter performance test

Assume that we have some data on financial transactions. The data frame
storing them has two columns: :from and :to storing account numbers between
which the money transfer was performed.

Let us first create some random data frame with this structure and 100,000 rows:

julia> using DataFrames

julia> using Random

julia> Random.seed!(1);

julia> transfers = DataFrame(from=rand(1:10^5, 10^5), to=rand(1:10^5, 10^5))
100000×2 DataFrame
    Row │ from   to
        │ Int64  Int64
────────┼──────────────
      1 │ 65190  19016
      2 │ 84951  66344
   ⋮    │   ⋮      ⋮
  99999 │ 21480  54671
 100000 │ 86787  30151
     99996 rows omitted

Now the task is to find all transactions from accounts that received some
transfer to them.

Here is a first attempt to do it (I am showing everywhere the timing of a second
run of a specific filter operation, so the compilation time is this related
to only this specific call; also I am showing the output of the call to makes
sure that the result is the same):

julia> @time filter(x -> x.from in transfers.to, transfers)
  3.627735 seconds (621.32 k allocations: 23.318 MiB, 3.03% compilation time)
63154×2 DataFrame
   Row │ from   to
       │ Int64  Int64
───────┼──────────────
     1 │ 84951  66344
     2 │ 11514   5399
   ⋮   │   ⋮      ⋮
 63153 │ 91605  34813
 63154 │ 86787  30151
    63150 rows omitted

The code above is the most natural filter call following the Julia Base syntax.
However, we can read in the documentation of filter in DataFrames.jl that
it is faster to pass the column we want to operate on (in this case :filter)
using the => syntax. In this way the operation can be made more efficient:

julia> @time filter(:from => in(transfers.to), transfers)
  2.543524 seconds (28 allocations: 1.463 MiB)
63154×2 DataFrame
   Row │ from   to
       │ Int64  Int64
───────┼──────────────
     1 │ 84951  66344
     2 │ 11514   5399
   ⋮   │   ⋮      ⋮
 63153 │ 91605  34813
 63154 │ 86787  30151
    63150 rows omitted

As you can see it is somewhat faster. There is no compilation as in(transfers.to)
has to be compiled only once, see:

julia> in(transfers.to) # this creates a callable type
(::Base.Fix2{typeof(in), Vector{Int64}}) (generic function with 1 method)

and we have a small number of allocations as the code is type stable.

Note that the code below uses the same idea:

julia> @time filter(:from => x -> x in transfers.to, transfers)
  2.663027 seconds (458.82 k allocations: 22.803 MiB, 5.28% compilation time)
63154×2 DataFrame
   Row │ from   to
       │ Int64  Int64
───────┼──────────────
     1 │ 84951  66344
     2 │ 11514   5399
   ⋮   │   ⋮      ⋮
 63153 │ 91605  34813
 63154 │ 86787  30151
    63150 rows omitted

but the difference is that as opposed to in(transfers.to) the the code
x -> x in transfers.to creates a new anonymous function each time it is called,
which triggers compilation.

Can we do better? An experienced Julia programmer willre immediately comment
that it would be more efficient to perform a lookup in a Set and not in a Vector.

Therefore our next attempt is:

julia> @time filter(:from => in(Set(transfers.to)), transfers)
  0.007884 seconds (36 allocations: 3.714 MiB)
63154×2 DataFrame
   Row │ from   to
       │ Int64  Int64
───────┼──────────────
     1 │ 84951  66344
     2 │ 11514   5399
   ⋮   │   ⋮      ⋮
 63153 │ 91605  34813
 63154 │ 86787  30151
    63150 rows omitted

This time it is super fast. Let us dissect the timing:

julia> @time s = Set(transfers.to)
  0.004668 seconds (8 allocations: 2.251 MiB)
Set{Int64} with 63246 elements:
  92533
  76914
  45120
  1703
  37100
  ⋮

julia> @time filter(:from => in(s), transfers) # remember it is a second run timing
  0.004394 seconds (28 allocations: 1.463 MiB)
63154×2 DataFrame
   Row │ from   to
       │ Int64  Int64
───────┼──────────────
     1 │ 84951  66344
     2 │ 11514   5399
   ⋮   │   ⋮      ⋮
 63153 │ 91605  34813
 63154 │ 86787  30151
    63150 rows omitted

and we can see that roughly similar time is spent in constructing of the Set
as in the filtering later.

Note, however, that it is essential that we use the in(Set(transfers.to))
construct, as it is evaluated only once.

Here is what happens if we tried to use the Set in our original filter call:

julia> @time filter(x -> x.from in Set(transfers.to), transfers)
272.379325 seconds (1.82 M allocations: 219.832 GiB, 1.12% gc time, 0.08% compilation time)
63154×2 DataFrame
   Row │ from   to
       │ Int64  Int64
───────┼──────────────
     1 │ 84951  66344
     2 │ 11514   5399
   ⋮   │   ⋮      ⋮
 63153 │ 91605  34813
 63154 │ 86787  30151
    63150 rows omitted

This is very bad, as we call Set on each row of our transfers data set.
Let us try reusing s variable we have created above:

julia> @time filter(x -> x.from in s, transfers)
  0.140256 seconds (620.53 k allocations: 23.268 MiB, 90.38% compilation time)
63154×2 DataFrame
   Row │ from   to
       │ Int64  Int64
───────┼──────────────
     1 │ 84951  66344
     2 │ 11514   5399
   ⋮   │   ⋮      ⋮
 63153 │ 91605  34813
 63154 │ 86787  30151
    63150 rows omitted

Now it is already reasonably good. As a final test we switch s to be a constant:

julia> const cs = s;

julia> @time filter(x -> x.from in cs, transfers) # remember it is a second run timing
  0.122081 seconds (619.03 k allocations: 23.060 MiB, 86.92% compilation time)
63154×2 DataFrame
   Row │ from   to
       │ Int64  Int64
───────┼──────────────
     1 │ 84951  66344
     2 │ 11514   5399
   ⋮   │   ⋮      ⋮
 63153 │ 91605  34813
 63154 │ 86787  30151
    63150 rows omitted

And we see that it is a bit better, but not much, as we are in type unstable mode anyway.

Conclusions

I would summarize the key general observations as follows:

  • when using in it is recommended to pass a Set as a collection in which
    we perform test if the test is executed many times;
  • the filter(predicate, data_frame) style is type unstable, and typically a faster
    alternative is filter(column => predicate, data_frame);
  • in(collection) a callable object that can be used later to test
    for inclusion of its argument in collection; as a side benefit it is compiled
    only once per type of collection which reduces compilation latency;
  • one needs to understand how Julia would execute code; following general
    recommendations blindly as in the filter(x -> x.from in Set(transfers.to), transfers)
    example can lead to extremely bad performance.

The state of multiple threading in DataFrames.jl

By: Blog by Bogumił Kamiński

Re-posted from: https://bkamins.github.io/julialang/2021/08/13/threading.html

Introduction

One of the most frequent advanced questions related to DataFrames.jl
I get is about its support for multi-threaded operations.

In order to summarize the state of this topic in the manual of the
DataFrames.jl we will soon merge the #2823 PR that hopefully
clarifies this issue. This post is meant to accompany this PR with some examples
showing the performance of selected operations when multiple threads are used.

The post is divided into two sections. In the first one I describe cases in
which multi threading is used automatically. In the second I show the most
simple pattern of how the user can invoke multiple threads manually when using
the DataFrames.jl package.

In all comparisons I will show the timing of exactly the same operations when
one uses 1 thread vs 4 threads as this is something that I can comfortably test
on my laptop.

The post was tested under Julia 1.6.1, DataFrames.jl 1.2.2,
and BenchmarkTools.jl 1.1.1.

Automatic multi threading

In general, DataFrames.jl uses multiple threading automatically in selected
operations, provided that the data that we work with is large enough (for small
data usually the cost of managing multiple threads is too large to bring benefits).

In the comparison I show all scenarios that are listed in the #2823 PR on
a data frame that is large enough.

First start with a single thread (I started Julia just by writing julia in my
terminal):

julia> Threads.nthreads()
1

julia> using DataFrames

julia> using BenchmarkTools

julia> df = DataFrame(reshape(1:50_000_000, :, 25), :auto);

julia> df.id1 = repeat(1:10_000, 200);

julia> df.id2 = string.(df.id1);

julia> summary(df)
"2000000×27 DataFrame"

julia> @btime copy($df); # constructor
  36.997 ms (88 allocations: 411.99 MiB)

julia> @btime $df[1:2:end, :]; # indexing
  45.684 ms (117 allocations: 206.00 MiB)

julia> @btime groupby($df, :id1); # grouping on refs
  3.537 ms (109 allocations: 15.27 MiB)

julia> @btime groupby($df, :id2); # grouping on hash
  48.679 ms (61 allocations: 62.52 MiB)

julia> @btime innerjoin($df, $df, on=:x1, makeunique=true); # joining
  270.209 ms (937 allocations: 839.28 MiB)

julia> gdf = groupby(df, :id1);

julia> @btime select($gdf, names($df, r"x") .=> x -> x .^ 2); # many transformations
  965.715 ms (1740851 allocations: 2.01 GiB)

julia> function f(x)
           s = zero(eltype(x))
           for _ in 1:1000, v in x
               s += v
           end
           return s
       end
f (generic function with 1 method)

julia> @btime combine($gdf, :x1 => f); # complex reduction
  1.513 s (339 allocations: 252.42 KiB)

And now the same exampls using 4 threads (start Julia with julia -t 4):

julia> Threads.nthreads()
4

julia> using DataFrames

julia> using BenchmarkTools

julia> df = DataFrame(reshape(1:50_000_000, :, 25), :auto);

julia> df.id1 = repeat(1:10_000, 200);

julia> df.id2 = string.(df.id1);

julia> summary(df)
"2000000×27 DataFrame"

julia> @btime copy($df); # constructor
  34.748 ms (265 allocations: 412.01 MiB)

julia> @btime $df[1:2:end, :]; # indexing
  34.026 ms (294 allocations: 206.01 MiB)

julia> @btime groupby($df, :id1); # grouping on refs
  3.024 ms (145 allocations: 15.30 MiB)

julia> @btime groupby($df, :id2); # grouping on hash
  43.516 ms (75 allocations: 62.52 MiB)

julia> @btime innerjoin($df, $df, on=:x1, makeunique=true); # joining
  194.656 ms (1297 allocations: 839.31 MiB)

julia> gdf = groupby(df, :id1);

julia> @btime select($gdf, names($df, r"x") .=> x -> x .^ 2); # many transformations
  419.619 ms (1740902 allocations: 2.01 GiB)

julia> function f(x)
           s = zero(eltype(x))
           for _ in 1:1000, v in x
               s += v
           end
           return s
       end
f (generic function with 1 method)

julia> @btime combine($gdf, :x1 => f); # complex reduction
  542.469 ms (388 allocations: 254.67 KiB)

As you can see for copy there is almost no benefit on a laptop as the
operation is memory bound. However, potentially on some machines with a
different hardware the gain might be bigger.

On the other end — the most noticeable benefits are for the two last examples
where we perform transformations of GroupedDataFrame. This is understandable,
as in this case I have chosen operations that require computation (and not only
data movement).

Finally note that in the last case (complex reduction), since combine uses
multiple threads it is assumed that the transformation function is thread safe.

As a warning let me give an example of a “bad” transformation function:

julia> g = Int[];

julia> res = combine(gdf, :x1 => (x -> (t = Threads.threadid(); push!(g, t); t)) => :tid);

julia> combine(groupby(res, :tid), nrow)
4×2 DataFrame
 Row │ tid    nrow
     │ Int64  Int64
─────┼──────────────
   1 │     1   2500
   2 │     2   2500
   3 │     3   2500
   4 │     4   2500

julia> combine(groupby(DataFrame(tid=g), :tid), nrow)
5×2 DataFrame
 Row │ tid    nrow
     │ Int64  Int64
─────┼──────────────
   1 │     0      1
   2 │     1   2333
   3 │     2   2443
   4 │     3   2369
   5 │     4   2498

And we can see that we get bad result in the global g variable because the
push! operation is not thread safe.

To stress the problem let me give two more similar examples which do
not produce a correct result either:

julia> g = Int[];

julia> res = combine(gdf, :x1 => (x -> (Threads.threadid(), push!(g, Threads.threadid())[end])) => :tid);

julia> combine(groupby(res, :tid), nrow)
6×2 DataFrame
 Row │ tid     nrow
     │ Tuple…  Int64
─────┼───────────────
   1 │ (1, 1)   2501
   2 │ (2, 3)     41
   3 │ (2, 2)   2459
   4 │ (4, 4)   2500
   5 │ (3, 3)   2466
   6 │ (3, 2)     33

julia> g = Int[];

julia> combine(gdf, :x1 => (x -> push!(g, Threads.threadid())[end]) => :tid);
ERROR: BoundsError: attempt to access 8022-element Vector{Int64} at index [4342]

Manual multi threading

If you want to run a custom code using multi threading you should follow the
recommendation given in the #2823 PR, which is: objects in DataFrames.jl
are safe to use when using multi threading for reading but they are not safe
for writing.

Let me give a simple example of how one would typically parallelize computations
(I use the simplest parallelization pattern here). It will be a simulation of
Buffon’s needle problem.

We start with a single threaded scenario:

julia> using DataFrames

julia> Threads.nthreads()
1

julia> function toss_needle(t, l)
           x = rand() * t / 2
           θ = rand() * π / 2
           return x < sin(θ) * l / 2
       end
toss_needle (generic function with 1 method)

julia> function sim_cross(t, l, reps)
           c = 0
           for _ in 1:reps
               c += toss_needle(t, l)
           end
           return c / reps
       end
sim_cross (generic function with 1 method)

julia> function analytical_cross(t, l)
           if t > l
               return 2 * l / (t * π)
           else
               return acos(t / l) * 2 / π + 2 * l * (1 - sqrt(1 - (t / l) ^ 2)) / (t * π)
           end
       end
analytical_cross (generic function with 1 method)

julia> function run_sim(vt, vl)
           p = Vector{Float64}(undef, length(vt))
           for i in eachindex(vt, vl)
               p[i] = sim_cross(vt[i], vl[i], 10^7)
           end
           return p
       end
run_sim (generic function with 1 method)

julia> run_sim([1.0], [1.0])
1-element Vector{Float64}:
 0.6367981

julia> 2 / π # double check if the result is correct
0.6366197723675814

julia> df = DataFrame(t = repeat(0.1:0.1:1.0, inner=9), l = repeat(0.1:0.1:1.0, outer=9))
90×2 DataFrame
 Row │ t        l
     │ Float64  Float64
─────┼──────────────────
   1 │     0.1      0.1
   2 │     0.1      0.2
   3 │     0.1      0.3
  ⋮  │    ⋮        ⋮
  88 │     1.0      0.8
  89 │     1.0      0.9
  90 │     1.0      1.0
         84 rows omitted

julia> transform!(df, [:t, :l] => ByRow(analytical_cross) => :prob_a)
90×3 DataFrame
 Row │ t        l        prob_a
     │ Float64  Float64  Float64
─────┼────────────────────────────
   1 │     0.1      0.1  0.63662
   2 │     0.1      0.2  0.837248
   3 │     0.1      0.3  0.89288
  ⋮  │    ⋮        ⋮        ⋮
  88 │     1.0      0.8  0.509296
  89 │     1.0      0.9  0.572958
  90 │     1.0      1.0  0.63662
                   84 rows omitted

julia> @time transform!(df, [:t, :l] => run_sim => :prob_s)
 27.173996 seconds (3.67 k allocations: 235.219 KiB, 0.03% compilation time)
90×4 DataFrame
 Row │ t        l        prob_a    prob_s
     │ Float64  Float64  Float64   Float64
─────┼──────────────────────────────────────
   1 │     0.1      0.1  0.63662   0.63651
   2 │     0.1      0.2  0.837248  0.83734
   3 │     0.1      0.3  0.89288   0.892879
  ⋮  │    ⋮        ⋮        ⋮         ⋮
  88 │     1.0      0.8  0.509296  0.509203
  89 │     1.0      0.9  0.572958  0.57293
  90 │     1.0      1.0  0.63662   0.636605
                             84 rows omitted

julia> extrema( df.prob_s ./ df.prob_a .- 1.0)
(-0.002207239593510546, 0.0005603464546692916)

Now we will use 4 threads. Note that I add Threads.@spawn and @sync in the
run_sim function.

julia> using DataFrames

julia> Threads.nthreads()
4

julia> function toss_needle(t, l)
           x = rand() * t / 2
           θ = rand() * π / 2
           return x < sin(θ) * l / 2
       end
toss_needle (generic function with 1 method)

julia> function sim_cross(t, l, reps)
           c = 0
           for _ in 1:reps
               c += toss_needle(t, l)
           end
           return c / reps
       end
sim_cross (generic function with 1 method)

julia> function analytical_cross(t, l)
           if t > l
               return 2 * l / (t * π)
           else
               return acos(t / l) * 2 / π + 2 * l * (1 - sqrt(1 - (t / l) ^ 2)) / (t * π)
           end
       end
analytical_cross (generic function with 1 method)

julia> function run_sim(vt, vl)
           p = Vector{Float64}(undef, length(vt))
           @sync for i in eachindex(vt, vl)
               Threads.@spawn p[i] = sim_cross(vt[i], vl[i], 10^7)
           end
           return p
       end
run_sim (generic function with 1 method)

julia> run_sim([1.0], [1.0])
1-element Vector{Float64}:
 0.6366706

julia> 2 / π # double check if the result is correct
0.6366197723675814

julia> df = DataFrame(t = repeat(0.1:0.1:1.0, inner=9), l = repeat(0.1:0.1:1.0, outer=9))
90×2 DataFrame
 Row │ t        l
     │ Float64  Float64
─────┼──────────────────
   1 │     0.1      0.1
   2 │     0.1      0.2
   3 │     0.1      0.3
  ⋮  │    ⋮        ⋮
  89 │     1.0      0.9
  90 │     1.0      1.0
         85 rows omitted

julia> transform!(df, [:t, :l] => ByRow(analytical_cross) => :prob_a)
90×3 DataFrame
 Row │ t        l        prob_a
     │ Float64  Float64  Float64
─────┼────────────────────────────
   1 │     0.1      0.1  0.63662
   2 │     0.1      0.2  0.837248
   3 │     0.1      0.3  0.89288
  ⋮  │    ⋮        ⋮        ⋮
  89 │     1.0      0.9  0.572958
  90 │     1.0      1.0  0.63662
                   85 rows omitted

julia> @time transform!(df, [:t, :l] => run_sim => :prob_s)
  7.612572 seconds (4.35 k allocations: 341.516 KiB, 0.11% compilation time)
90×4 DataFrame
 Row │ t        l        prob_a    prob_s
     │ Float64  Float64  Float64   Float64
─────┼──────────────────────────────────────
   1 │     0.1      0.1  0.63662   0.636561
   2 │     0.1      0.2  0.837248  0.83723
   3 │     0.1      0.3  0.89288   0.892781
  ⋮  │    ⋮        ⋮        ⋮         ⋮
  89 │     1.0      0.9  0.572958  0.573051
  90 │     1.0      1.0  0.63662   0.636735
                             85 rows omitted

julia> extrema( df.prob_s ./ df.prob_a .- 1.0)
(-0.0017633325515583609, 0.0010282866804216528)

As you can see since the operation we do is CPU intensive we have a noticeable
performance boost when using multiple threads.

Conclusions

In summary when you consider using multiple threads with DataFrames.jl
remember that:

  • not all operations will get a big boost when using multiple threads; this
    is especially true when the task you want to perform is memory bound;
  • some operations in DataFrames.jl use multiple threads automatically (and
    you can expect that in the future the support of multi threading will grow);
    when this is the case and you use a custom transformation function make sure
    it is thread safe;
  • you can safely use standard multi threading constructs offered by Julia;
    however, remember that types offered by DataFrames.jl are not
    thread safe for writing.

Summer break puzzle

By: Blog by Bogumił Kamiński

Re-posted from: https://bkamins.github.io/julialang/2021/08/06/juliacon2021.html

Introduction

After an amazing JuliaCon 2021 this week I thought to cover some
lightweight topic in my post.

An interesting question I recently was asked is to find \(x\in\mathbf{R}^+\cup\{0\}\)
that maximizes \(x^{1/x}\) (assuming that in \(0\) the value of the function is
established by the limit as \(x\to0^+\)).

As usual I will want to use Julia to get some
insight into this problem before giving it a try.

This post was tested under Julia 1.6.1, Plots.jl 1.20.0, Optim.jl 1.4.1,
and Symbolics.jl 2.0.

Plotting

We start with plotting of the function we want to investigate:

julia> using Plots

julia> plot(x -> x^(1/x), 0, 10, legend=nothing,
            xlab=raw"$x$", ylab=raw"$x^{1/x}$")

x^(1/x) plot

It seems that the maximum is achieved around \(2.5\), but we want to get more
a detailed answer.

Optimizing

Now we switch to optimization to find the value that maximizes our function
numerically. Note that I optimize x -> -x^(1/x) (negated), as optim by
default is looking for a minimum of a function.

julia> using Optim

julia> optimize(x -> -x^(1/x), 0, 10)
Results of Optimization Algorithm
 * Algorithm: Brent's Method
 * Search Interval: [0.000000, 10.000000]
 * Minimizer: 2.718282e+00
 * Minimum: -1.444668e+00
 * Iterations: 13
 * Convergence: max(|x - x_upper|, |x - x_lower|) <= 2*(1.5e-08*|x|+2.2e-16): true
 * Objective Function Calls: 14

The optimal solution looks to be equal to \(e\). Let us verify this using
calculus.

Solving

Now we check if the derivative of our function is equal to zero for \(e\) as
an argument.

julia> using Symbolics

julia> @variables x
1-element Vector{Num}:
 x

julia> ex = Differential(x)(x^(1/x)) |> expand_derivatives |> simplify
x^(x^-1 - 2) - (log(x)*(x^(x^-1 - 2)))

julia> substitute(ex, x => MathConstants.e)
0.0

From the form of ex we also see that \(e\) is the only zero of our derivative
for positive values of \(x\).

Now I am convinced enough about the result to try proving it using some elementary
arguments.

Pen and paper

Note that:

\[x^{1/x} = e^{\log(x)/x}\]

As exponential function is increasing it is enough to analyze \(\log(x)/x\),
which hopefully will be simpler. Substitute \(x = e^y\), where \(x\to0^+\)
as \(y\to-\infty\). With this substitution we get that we need to analyze:

\[\frac{y}{e^y}.\]

Now notice that as \(y\to-\infty\) the fraction tends to \(-\infty\), so our
original expression \(x^{1/x}\) tends to \(0^+\) as \(x\to0^+\).
Similarly if \(y\to+\infty\) the fraction tends to \(0^+\), so
\(x^{1/x}\) tends to \(1\) as \(x\to+\infty\).

Having done this warm up task let us ask what \(y\) maximizes \(y/e^y\). From
our earlier analyses we postulate that it is maximized for \(y=1\), so we
want to show that:

\[\frac{y}{e^y} \leq \frac{1}{e}\]

or, equivalently, substituting \(z=y-1\):

\[1+z \leq e^z.\]

This is a well known consequence of Bernoulli’s inequality and the
equality holds for \(z=0\), which translates to \(x=e\).

Conclusions

I hope you enjoyed the post. Next week I will be back with DataFrames.jl
related topics, so stay tuned.