Run predict/augment in parallel

I have a situation where I have fit many multiple models, all with different outcome variables, and I need to score (generate predictions for) anywhere between 5M and 180M additional records at a time. When I have 20+ models and that many predictions, the scoring stage is slow and I'm interested in running either predict or augment in parallel to speed things up. Here's a reproducible example which is similar:

Skip the setup!
library(tidyverse)
library(nycflights13)
library(tidymodels)
library(workflows)
‚Äč‚Äč
flight_data <- flights %>% 
  mutate(
    # Convert the arrival delay to a factor
    arr_delay = ifelse(arr_delay >= 30, "late", "on_time"),
    arr_delay = factor(arr_delay),
    is_delta  = ifelse(carrier %in% "DL", "delta", "not_delta"),
    is_delta  = factor(is_delta),
    # We will use the date (not date-time) in the recipe below
    date = lubridate::as_date(time_hour)
  ) %>% 
  # Include the weather data
  inner_join(weather, by = c("origin", "time_hour")) %>% 
  # Only retain the specific columns we will use
  select(arr_delay, is_delta, dep_time, flight, origin, dest, air_time, distance,
         carrier, date, time_hour, wind_dir, wind_speed) %>% 
  # Exclude missing data
  na.omit()
‚Äč
set.seed(222)
‚Äč
# Skim off  two ~6,000k samples for training two models
‚Äč
data_splits <- flight_data %>%
  bind_rows(flight_data, .id = "group_id") %>%
  group_by(group_id) %>%
  group_split() %>%
  map2(.y = list(c("arr_delay", "distance", "carrier", "wind_speed", "flight"),
                 c("is_delta",  "distance",    "dest", "wind_speed", "flight")), 
       .f = ~select(.x, all_of(.y))) %>%
  map2(.y = c("arr_delay", "is_delta"), 
       .f = ~initial_split(.x, prop = 0.02, strata = all_of(.y)))
‚Äč
train_data  <- map(data_splits, ~training(.x))
‚Äč
recipe_lst <- train_data %>%
  map2(.y = c("arr_delay", "is_delta"), 
       .f = ~recipes::recipe(.x) %>%
        recipes::update_role(everything(), new_role = "predictor") %>%
        # Specify which column(s) is/are the targets/outcomes
        recipes::update_role(all_of(.y), new_role = "outcome") %>%
        recipes::update_role(flight, new_role = "id") %>%
        # Turn strings into factor variables
        recipes::step_string2factor(recipes::all_nominal_predictors(), -recipes::has_role(match = "id")) %>%
        recipes::step_novel(recipes::all_nominal_predictors()) %>%
        recipes::step_other(recipes::all_nominal_predictors(), threshold = 0.03)
)
‚Äč
rf_spec <- rand_forest(trees = 500) %>%
  set_engine("ranger", importance = "impurity", num.threads = 4) %>%
  set_mode("classification")
‚Äč
workflow_fits <- recipe_lst %>%
  map(~workflows::workflow(spec = rf_spec) %>% add_recipe(.x)) %>%
  map2(train_data, ~fit(.x, data = .y))

# Stack the original flights data to simulate scoring millions of rows

flight_data_larger <- flight_data %>% 
  bind_rows(flight_data, flight_data, flight_data, flight_data, flight_data)

# Score models sequentially (my current process) 

tictoc::tic()
scores <- workflow_fits %>%
  map(~extract_fit_parsnip(.x)) %>%
  map(~generics::augment(.x, new_data = flight_data_larger, type = "prob")) %>%
  map(~select(.x, flight, .pred_class))
tictoc::toc() # ~372 seconds‚Äč

‚Äč# Try scoring in parallel over the models with one data frame

library(furrr)

plan(multisession, workers = 3)

tictoc::tic()
scores_parallel <- workflow_fits %>%
  furrr::future_map(
    ~workflows::extract_fit_parsnip(.x) %>%
      generics::augment(new_data = flight_data_larger, type = "prob"), 
    .options = furrr_options(seed = TRUE))
tictoc::toc()

# Attempt 2, break the data frame into chunks 

large_data_by_group <- flight_data_larger %>%
  group_by(origin) %>%
  group_split() %>%
  purrr::cross2(.y = workflow_fits)

tictoc::tic()
scores_parallel <- large_data_by_group %>%
  furrr::future_map(.f = function(x){
    
    x %>%
      magrittr::extract2(2) %>%
      workflows::extract_fit_parsnip() %>%
      generics::augment(new_data = magrittr::extract2(x, 1), 
                        type = "prob")
    
  }, .options = furrr_options(seed = TRUE))
tictoc::toc() # ~404 seconds‚Äč

I'm not sure why either attempt is not faster here. Maybe someone else with more future/parallel experience might be able to suggest a better way to frame the solution. In my real data I actually do see a speed increase with the first parallel construction, but it is only about a 1/3 faster than the sequential solution, which made me want to try splitting the data into chunks. The chunked solution on my real data was actually slower.

Thoughts anyone?

(tagging @davis in hopes he sees this too)

1 Like

I think the first thing to note is that this is probably not doing what you want:

~workflows::extract_fit_parsnip(.x) %>%
      generics::augment(new_data = flight_data_larger, type = "prob")

By dropping the workflow, that means you also drop the preprocessor (the recipe), so when you run augment() on flight_data_larger it isn't running the preprocessing steps of the recipe on that flight_data_larger dataset.


Beyond that, there are generally two things that can hurt you.

  1. Sending large amounts of data to and from the workers. You are sending some fairly large data frames and model objects to and from the workers, keep in mind that serializing and unserializing these things take time.

  2. Parallel within parallel issues. I'm not sure about you, but I have 6 physical cores (parallel::detectCores(logical = FALSE)), so at most I'd want to split my work over 6 workers. You have 2 fits in the example above, BUT the predict() method of ranger ALSO runs in parallel. See the num.threads argument of ?ranger:::predict.ranger. It defaults to the number of CPUs you have, which I'm guessing would be 6 for me (unless they decide to include logical cores in which case it would be 12). This means at a minimum I am trying to do 12 cores worth of work (2 * 6) across 6 cores, and there is just no way that can be done. Our low level hardware is smart enough to not crash, but we won't see any performance increases.

I think the second point is the real issue. Basically, ranger is already running in parallel and is using all available compute resources on your computer, so trying to add more parallelism on top of that is a moot point.

Hey @davis, thank you so much for the quick reply. Good catch on the extract_fit_parsnip(), I tacked that onto my example, because I was getting complaints from augmenting the workflows about NA values, which did not make immediate sense-- I get rid of those with the na.omit() step above. In the end, the model results were just a way to facilitate my parallel predict quandary.

Your thoughts on the parallel aspects of my example are insightful. For my real use case then I will first investigate that num.threads of predict.ranger is using all of my CPUs. I do know that parsnip sets num.threads to 1 by default in the fit method. If that doesn't help, I suppose my next step would be to figure out if there is a way to send the data to and from the workers without having to go through the serializations.

Follow on-question - is the parallelization engine-specific? Or is this picked up by augment()? From the comments it sounds like the former, but thought I might clarify

This is exactly what I am now trying to figure out. You can see here this is where the num.threads argument for fits with ranger is set to 1 via tidymodels. I do not see the same in the predict section.

1 Like

I was able to confirm via htop that all 8 of my cores are used during the augment stage. I will abandon trying to use furrr for this task now and proceed by optimizing the number of cores used by the server. Many thanks @davis !

Hello! I'd like to inquire if I correctly understood the solution.

Does specifying the num.threads argument mean that predicting with a workflow uses the specified number of cores?

Thank you!

# Specify model
model_spec <-
  parsnip::rand_forest(
    mtry = tune::tune(),
    min_n = tune::tune(),
    trees = tune::tune()
  ) %>%
  parsnip::set_mode("classification") %>%
  parsnip::set_engine(
    engine = "ranger",
    num.threads = 12,
    seed = 123,
    importance = "permutation"
  )

To fit the model with more than one core, specify num.threads as you have done.

For prediction, tidymodels does not alter predict.ranger which uses all cores by default. Nothing to specify!

Awesome! Thank you, Jake.
That helps clarifies Davis' bullet point number 2 as well.

PS. Great work all, tidymodels is fantastic.

This topic was automatically closed 7 days after the last reply. New replies are no longer allowed.

If you have a query related to it or one of the replies, start a new topic and refer back with a link.