Parallelizing dplyr & summarise

I have a large dataset (~6.2 million observations) that I need to condense by calculating the mean and quantiles for ~25k groups.

The original dplyr code looks like this and takes about 25-30 s to run. (I have to run this code 400+ times, so 30 seconds is a big deal.)

edf_data = values_data %>% 
  select(region, year, value) %>%
  group_by(region, year) %>% 
  summarise(mean = mean(value),
            q95 = quantile(value, 0.95),
            q90 = quantile(value, 0.90),
            q75 = quantile(value, 0.75),
            q66 = quantile(value, 0.66),
            q50 = quantile(value, 0.50),
            q33 = quantile(value, 0.33),
            q25 = quantile(value, 0.25),
            q10 = quantile(value, 0.10),
            q5 = quantile(value, 0.05))

My plan was to use furrr to parallelize this call, but it actually takes longer.
I first tested the setup with purrr, calculating only the mean. That took roughly 2 s, about the same as calculating just the mean in a normal summarise call.

edf_data = values_data %>%
  select(region, year, value) %>% 
  group_nest(region, year) %>% 
  mutate(mean = map_dbl(data, ~mean(.x$value)))

But when I changed to future_map_dbl, it took over a minute, which is longer than the original code.

plan(multisession)

edf_data = values_data %>%
  select(region, year, value) %>% 
  group_nest(region, year) %>% 
  mutate(mean = future_map_dbl(data, ~mean(.x$value)))

I am guessing that my code was slowed down by having to shunt around 6 million rows to different workers, but I don't know how to fix it. Does anyone have better ideas for using furrr to solve a summarise problem? Or for parallelizing this code in general?

In the abstract, it would be great if I could run both the calculations within a group and the calculations between groups in parallel, since they are completely independent of each other. But I have no clue how to implement that. I also like my original dplyr setup because the output is very clean. I have seen examples of parallelization where you have to reformat the output and combine it back into the main dataset, which increases the chance of bugs.

I'm pretty sure you want to declare the number of workers in the plan() function.
So your code would look like plan(multisession, workers = 3)
I would start there and see if things speed up. If there is only one worker then you are adding the overhead of setting up the futures without taking advantage of multiple threads, so your code would be slower.
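One way to check how many workers a plan actually uses, rather than assuming, is sketched below. As far as I know, plan(multisession) without a workers argument defaults to future::availableCores(), so it is worth confirming the count on your own machine. The value 2 is only an illustrative assumption.

```r
library(future)

# Cores the future framework believes it is allowed to use on this machine
availableCores()

# Assumption: 2 workers, purely for illustration; tune this to your hardware
plan(multisession, workers = 2)

# Number of workers the current plan will actually use
nbrOfWorkers()

# Return to sequential processing and release the background R sessions
plan(sequential)
```

If nbrOfWorkers() already reports more than one worker with your original plan(multisession) call, the slowdown is more likely to come from copying data to the workers than from a missing workers argument.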

I don't really use furrr and stick with future.apply, but that would require some refactoring of your code.
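A minimal sketch of what that refactoring could look like, assuming the future and future.apply packages are installed. Instead of sending one small task per group, it sends one large chunk of whole regions per worker, so each worker receives its data once and runs the ordinary dplyr summarise on it. The toy data, the helper name summarise_chunk and the worker count are all assumptions for illustration, not your real setup.

```r
library(dplyr)
library(future)
library(future.apply)

# Assumption: 2 workers, purely for illustration
plan(multisession, workers = 2)

# Small stand-in for values_data: 200 regions x 2 years x 50 values each
set.seed(1)
values_data <- tibble(
  region = rep(1:200, each = 100),
  year   = rep(c(2021, 2022), times = 10000),
  value  = runif(20000, 0, 100)
)

# Hypothetical helper: the usual grouped summary, applied to one chunk
summarise_chunk <- function(df) {
  df %>%
    group_by(region, year) %>%
    summarise(
      mean = mean(value),
      q50  = quantile(value, 0.50, names = FALSE),
      q95  = quantile(value, 0.95, names = FALSE),
      .groups = "drop"
    )
}

# Assign whole regions to chunks so that no group is split across workers
n_chunks <- nbrOfWorkers()
chunks   <- split(values_data, values_data$region %% n_chunks)

# One future per chunk; dplyr is attached on each worker for the pipe
edf_parallel <- future_lapply(chunks, summarise_chunk,
                              future.packages = "dplyr") %>%
  bind_rows() %>%
  arrange(region, year)

plan(sequential)
```

The result has the same shape as the plain summarise output, so nothing has to be merged back by hand. Whether this beats a single-process summarise depends on how much time is spent copying data to the workers, so it is worth timing both.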

I know you asked about parallelization, but before going down that route I would first refactor the code to pass a vector of probabilities to quantile in a single call instead of calling it nine times. I created a dataset that I thought mirrored yours based on your description. With the code below, the computation takes ~3 seconds without any parallelization.

library(dplyr)
#> 
#> Attaching package: 'dplyr'
#> The following objects are masked from 'package:stats':
#> 
#>     filter, lag
#> The following objects are masked from 'package:base':
#> 
#>     intersect, setdiff, setequal, union
library(tidyr)
library(purrr)
library(stringr)
library(glue)
library(tictoc)

# Create fake data
region = seq(1, 18000)
year = c(2021, 2022)

values_data = c(1:280) %>% 
  map_dfr(~tidyr::crossing(region, year)) %>%
  mutate(value = runif(n(), 0, 100))

dim(values_data)
#> [1] 10080000        3

# Define probs for `quantile`
my_probs = c(0.05,.1,.25,.33,.5,.66,.75,.9,.95)

# Run
tic()
(edf_data = values_data %>% 
  select(region, year, value) %>%
  group_by(region, year) %>% 
  summarise(mean = mean(value),
            quant = list(quantile(value, probs = my_probs))) %>%
  unnest_wider(quant) %>%
  rename_with(.fn = ~glue("q{str_remove(.x, \"%\")}"), .cols = ends_with("%")))
#> `summarise()` has grouped output by 'region'. You can override using the
#> `.groups` argument.
#> # A tibble: 36,000 × 12
#> # Groups:   region [18,000]
#>    region  year  mean    q5   q10   q25   q33   q50   q66   q75   q90   q95
#>     <int> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl>
#>  1      1  2021  49.0  4.94  8.88  20.1  29.0  48.3  66.1  76.9  91.5  94.6
#>  2      1  2022  51.1  6.89 11.7   25.8  32.6  51.0  67.6  75.4  89.3  95.5
#>  3      2  2021  50.9  3.82  9.19  23.0  33.7  54.5  69.3  76.5  90.0  95.9
#>  4      2  2022  51.3  5.57 11.1   26.6  35.6  53.0  67.6  77.7  89.9  94.8
#>  5      3  2021  48.4  3.54  9.21  21.4  29.8  45.1  67.7  75.6  91.0  95.1
#>  6      3  2022  50.0  4.04  8.58  24.6  32.9  50.6  67.5  74.1  91.0  93.5
#>  7      4  2021  50.1  5.14  9.72  23.3  31.3  49.7  65.7  73.8  92.9  97.0
#>  8      4  2022  52.8  4.07 10.9   29.3  37.2  54.2  69.3  77.5  92.7  96.5
#>  9      5  2021  48.2  5.62  9.75  25.0  32.3  47.2  63.9  70.1  89.0  92.1
#> 10      5  2022  49.8  6.46 11.3   26.4  34.1  49.6  66.4  73.4  88.0  92.7
#> # … with 35,990 more rows
#> # ℹ Use `print(n = ...)` to see more rows
toc()
#> 3.148 sec elapsed

Created on 2023-01-11 by the reprex package (v2.0.1)

I'd like to add that, depending on your specific data, if you are running these computations many times it may make sense to sort your data beforehand, since quantile spends time sorting (which is part of the reason your original code took much longer).

values_data %>% dplyr::arrange(region, year, value)

For a dataset of your size, you'll see minimal speed improvements from pre-sorting. However, if your data grow larger, this may be a worthwhile approach.
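To illustrate the idea, here is a rough sketch of a helper that reads quantiles straight out of an already-sorted vector using index arithmetic, so the sort is paid for once rather than on every run. The name sorted_quantile is hypothetical, and the sketch assumes quantile's default interpolation (type 7) and a vector with no missing values.

```r
# Hypothetical helper: type-7 quantiles (R's default) from an already-sorted,
# NA-free numeric vector, without sorting again
sorted_quantile <- function(x_sorted, probs) {
  n  <- length(x_sorted)
  h  <- (n - 1) * probs + 1   # fractional 1-based position of each quantile
  lo <- floor(h)              # index of the order statistic just below
  hi <- ceiling(h)            # index of the order statistic just above
  # Linear interpolation between the two neighbouring order statistics
  x_sorted[lo] + (h - lo) * (x_sorted[hi] - x_sorted[lo])
}

set.seed(42)
x        <- runif(1000, 0, 100)
my_probs <- c(0.05, .1, .25, .33, .5, .66, .75, .9, .95)

x_sorted <- sort(x)           # sort once, reuse across repeated runs
fast     <- sorted_quantile(x_sorted, my_probs)
ref      <- quantile(x, probs = my_probs, names = FALSE)

# Compare the shortcut against quantile() on the unsorted data
all.equal(fast, ref)
```

After arrange(region, year, value), the values within each group are already in order, so a helper like this could stand in for quantile inside the grouped summary. It is worth checking against quantile on your real data before relying on it.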

You can also make a helper that returns a one-row tibble containing the quantiles, and then rely on the fact that summarise() automatically unpacks unnamed data frame results into separate columns.

I definitely don't think you need parallelism here.

library(dplyr)
library(tidyr)
library(purrr)

region = seq(1, 18000)
year = c(2021, 2022)

values_data = c(1:280) %>% 
  map_dfr(~crossing(region, year)) %>%
  mutate(value = runif(n(), 0, 100))

df_quantile <- function(x, probs, names) {
  out <- quantile(x = x, probs = probs, names = FALSE)
  out <- as.list(out)
  names(out) <- names
  tibble::new_tibble(out, nrow = 1L)
}

my_probs <- c(0.05,.1,.25,.33,.5,.66,.75,.9,.95)
my_names <- paste0("q", my_probs * 100)

# One row result
df_quantile(1:5, my_probs, my_names)
#> # A tibble: 1 × 9
#>      q5   q10   q25   q33   q50   q66   q75   q90   q95
#>   <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl>
#> 1   1.2   1.4     2  2.32     3  3.64     4   4.6   4.8

tictoc::tic()

values_data %>% 
  select(region, year, value) %>%
  group_by(region, year) %>%
  summarise(
    mean = mean(value), 
    df_quantile(value, my_probs, my_names), 
    .groups = "drop"
  )
#> # A tibble: 36,000 × 12
#>    region  year  mean    q5   q10   q25   q33   q50   q66   q75   q90   q95
#>     <int> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl>
#>  1      1  2021  50.0  4.95  9.10  26.8  33.7  47.8  67.1  73.8  88.2  92.3
#>  2      1  2022  51.6  6.22 13.3   27.4  37.4  52.6  67.1  73.9  88.5  92.1
#>  3      2  2021  50.5  5.66  9.11  24.7  32.1  51.6  66.0  75.4  90.3  95.4
#>  4      2  2022  51.5  6.54 12.9   27.1  34.5  50.5  69.0  77.6  90.9  95.9
#>  5      3  2021  52.2  6.20 10.8   27.1  35.6  55.3  70.8  76.7  90.8  96.0
#>  6      3  2022  48.9  6.29 10.0   23.1  33.2  49.0  63.7  72.1  91.0  95.9
#>  7      4  2021  53.1  8.07 13.1   28.0  36.3  55.4  69.8  76.9  90.4  94.6
#>  8      4  2022  53.1  4.91  9.90  28.9  36.0  55.6  73.6  79.1  92.5  96.8
#>  9      5  2021  48.3  4.63 11.7   25.1  29.5  45.3  64.5  72.7  88.4  93.9
#> 10      5  2022  50.1  4.85  9.14  25.2  34.0  51.6  65.0  74.8  89.6  94.9
#> # … with 35,990 more rows

tictoc::toc()
#> 5.008 sec elapsed
