Seeking better practice for sparklyr + purrr::map to iterate query over a list

I have an analysis where I've used purrr::map* to repeat the following pattern for a series of dates.

* Well, really furrr::future_map, but I want to discuss basic principles here.

  1. Query database for data occurring on Day X.
  2. Crunch some stats for Day X.
  3. Repeat for all dates and return the calculations by day.

Since each day's data is quite large and I'm doing this for a full year's worth of dates, the job takes a long while (8-10 hours, depending on EC2 size). I'd like to use sparklyr to leverage Spark to speed up the computations, which I'll explain with code below, but as I wrote it out I started to wonder whether there is a better practice for this pattern.

library(tidyverse)
library(sparklyr)

analysis_calendar <- analysis_start_date %>%
  seq.Date(analysis_end_date, by = "day")

config <- spark_config()
config$`sparklyr.shell.driver-class-path`  <- "path/to/driver/RedshiftJDBC42-no-awssdk-1.2.41.1065.jar"

sc <- spark_connect(master = "local", config = config)

jdbc_read_url <- config %>%
  glue::glue_data("jdbc:redshift://{host_url}:{port}/{db_name}") %>%
  as.character()

calculations <- analysis_calendar %>%
  map(~{
    
    ith_query <- glue::glue_sql(
      "SELECT id, date, col1, col2, col3, col4
       FROM {`schema_code`}.table_5min_data
       WHERE date = {.x}",
      .con = DBI::ANSI()) %>% # Use ANSI connection for proper date quotes
      as.character()
    
    data_sdf <- sc %>%
      spark_read_jdbc(name    = "5min_data",
                      options = list(url      = jdbc_read_url,
                                     user     = creds$user_name,
                                     password = creds$password,
                                     query    = ith_query,
                                     driver   = "com.amazon.redshift.jdbc42.Driver"),
                      memory    = FALSE,
                      overwrite = TRUE)

    # Just a sample computation
    summarise(data_sdf, average = mean(col1, na.rm = TRUE)) %>% collect()
})

The code above really boils down to purrr::map(R list, ~Spark calls). It seems to work fine, but something about this construction just feels wrong to me. Is there a better way to use sparklyr/Spark to handle the repeated database querying and performing tasks on all of those data frames?


I'm still looking for general advice here, but I've also since discovered that sparklyr supports being a backend for foreach, which is another option for iterating my code over the dates. I'm going to try it and see how that works out.
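
For reference, here's a minimal sketch of what that foreach route might look like, assuming the same sc and analysis_calendar objects as above; summarise_one_day() is a hypothetical stand-in for the per-day query-and-summarise step from the map() version:

library(foreach)
library(sparklyr)

# Make the Spark connection the foreach backend
registerDoSpark(sc)

# as.list() keeps the Date class on each element as foreach iterates
calculations <- foreach(d = as.list(analysis_calendar)) %dopar% {
  # summarise_one_day() is hypothetical: query Day d and crunch its stats,
  # as in the map() version above
  summarise_one_day(d)
}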

An update here: I ran into a situation on the foreach/sparklyr path that does not allow for parallelized queries. I've filed an issue: https://github.com/sparklyr/sparklyr/issues/2760

Maybe this?

Distributed R

You can execute arbitrary R code across your cluster using spark_apply(). For example, we can apply rgamma over iris as follows:

spark_apply(iris_tbl, function(data) {
  data[1:4] + rgamma(1,2)
})

Can you do it all in SQL?

SELECT date, AVG(col1) AS average
FROM {`schema_code`}.table_5min_data
WHERE date BETWEEN stuff --replace
GROUP BY date

Otherwise, you are downloading massive amounts of data just to summarize by day.
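
For instance, here's a rough sketch of pushing one grouped query through the same spark_read_jdbc() setup from your original post, so the aggregation happens in Redshift before anything reaches R (the date placeholders and the daily_averages name are just illustrative; swap in your real calculations):

grouped_query <- glue::glue_sql(
  "SELECT date, AVG(col1) AS average
   FROM {`schema_code`}.table_5min_data
   WHERE date BETWEEN {analysis_start_date} AND {analysis_end_date}
   GROUP BY date",
  .con = DBI::ANSI()) %>%
  as.character()

# One round trip: the database does the grouping, R only receives one row per day
daily_averages <- sc %>%
  spark_read_jdbc(name    = "daily_averages",
                  options = list(url      = jdbc_read_url,
                                 user     = creds$user_name,
                                 password = creds$password,
                                 query    = grouped_query,
                                 driver   = "com.amazon.redshift.jdbc42.Driver"),
                  memory    = FALSE,
                  overwrite = TRUE) %>%
  collect()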

@nirgrahamuk in this case, the data frame needs to come from a JDBC connection, so spark_apply() won't work.

@Col Thanks, and forgive the simple example; I'm really doing something more complex. I left the example basic to keep the focus on parallelizing the queries.

The maintainer of sparklyr agrees that doSpark needs a new function; it will be tracked here: https://github.com/sparklyr/sparklyr/issues/2761

I understand.

If you are able to refactor to SQL only, there are a lot of benefits. For a given year, you would be scanning through your data 365 times just to filter. Even if you have an index on your date field, that's still a lot of filtering instead of addressing what it really is: a grouping problem.

FWIW, here's a simple example. Grouping is about 30x faster than filtering and building an answer.

library(dplyr, warn.conflicts = FALSE)
set.seed(123L)
n = 1e7L
n_days = 365L
DF = data.frame(ID = sample(n_days, n, replace = TRUE),
                V1 = seq(n))

head(DF)
#>    ID V1
#> 1 179  1
#> 2  14  2
#> 3 195  3
#> 4 306  4
#> 5 118  5
#> 6 299  6

system.time(
  dplyr::summarize(dplyr::group_by(DF, ID), res = mean(V1))
)
#> `summarise()` ungrouping output (override with `.groups` argument)
#>    user  system elapsed 
#>    0.67    0.05    0.72

ID_vec = DF[["ID"]]
V1_vec = DF[["V1"]]

system.time({
  uni_ID = unique(ID_vec)
  ans = numeric(length(uni_ID))
  
  for (i in seq_along(uni_ID)) {
    ans[i] = mean(V1_vec[ID_vec == uni_ID[i]])
  }
})
#>    user  system elapsed 
#>   19.88    1.34   21.22
