Ways to speed up R code using parallel computation

I am working on R code to analyze datasets from ~500 stations.
A single station takes almost 2 hours to complete. I tried to avoid loops, but I could not come up with an alternative. So, if I could run all 500 stations at the same time on a high-performance computing (HPC) cluster, I could (hypothetically) finish in ~2 hours.
Is this what parallel computing means? If so, how should I proceed with the following code?

For example, this code extracts all the January data for station1. I want to run the same thing for all 500 stations, in the way described above.

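library(readr)      # read_csv()
library(dplyr)      # %>%, mutate(), filter()
library(lubridate)  # month()

id <- "146Ng_b6wCcVNAVGwpej68yAkguZPHBak"
data <- read_csv(paste0("https://docs.google.com/uc?id=", id, "&export=download"),
                 col_names = TRUE)
# add a month column derived from the datetime column
data <- data %>%
  mutate(month = month(datetime))
head(data)
# return only the rows for the given month number (1 = January)
data_month <- function(mon) {
  data %>%
    filter(month == mon)
}
jan <- data_month(mon = 1)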
suppressPackageStartupMessages({
  library(lubridate)
  library(magrittr)
  library(tictoc)
})

id <- "146Ng_b6wCcVNAVGwpej68yAkguZPHBak"

input <- readr::read_csv(paste0("https://docs.google.com/uc?id=", id, "&export=download"),
  col_names = TRUE
)
#> 
#> ── Column specification ────────────────────────────────────────────────────────
#> cols(
#>   datetime = col_datetime(format = ""),
#>   rain = col_double()
#> )


# subset month == 9

tic()
input[which(month(input[,1][[1]]) == 9),]
#> # A tibble: 14,117 x 2
#>    datetime             rain
#>    <dttm>              <dbl>
#>  1 1976-09-03 22:45:00  0   
#>  2 1976-09-03 23:00:00  2.54
#>  3 1976-09-03 23:15:00  5.08
#>  4 1976-09-03 23:30:00  0   
#>  5 1976-09-03 23:45:00  0   
#>  6 1976-09-04 00:00:00  0   
#>  7 1976-09-04 00:15:00  0   
#>  8 1976-09-04 00:30:00  0   
#>  9 1976-09-04 00:45:00  0   
#> 10 1976-09-04 01:00:00  0   
#> # … with 14,107 more rows
toc()
#> 0.033 sec elapsed

# map over all months

tic()
1:12 %>% purrr::map( ~ input[which(month(input[,1][[1]]) == .x),])
#> [[1]]
#> # A tibble: 14,880 x 2
#>    datetime             rain
#>    <dttm>              <dbl>
#>  1 1977-01-01 00:00:00     0
#>  2 1977-01-01 00:15:00     0
#>  3 1977-01-01 00:30:00     0
#>  4 1977-01-01 00:45:00     0
#>  5 1977-01-01 01:00:00     0
#>  6 1977-01-01 01:15:00     0
#>  7 1977-01-01 01:30:00     0
#>  8 1977-01-01 01:45:00     0
#>  9 1977-01-01 02:00:00     0
#> 10 1977-01-01 02:15:00     0
#> # … with 14,870 more rows
#> 
#> [[2]]
#> # A tibble: 13,536 x 2
#>    datetime             rain
#>    <dttm>              <dbl>
#>  1 1977-02-01 00:00:00     0
#>  2 1977-02-01 00:15:00     0
#>  3 1977-02-01 00:30:00     0
#>  4 1977-02-01 00:45:00     0
#>  5 1977-02-01 01:00:00     0
#>  6 1977-02-01 01:15:00     0
#>  7 1977-02-01 01:30:00     0
#>  8 1977-02-01 01:45:00     0
#>  9 1977-02-01 02:00:00     0
#> 10 1977-02-01 02:15:00     0
#> # … with 13,526 more rows
#> 
#> [[3]]
#> # A tibble: 14,880 x 2
#>    datetime             rain
#>    <dttm>              <dbl>
#>  1 1977-03-01 00:00:00     0
#>  2 1977-03-01 00:15:00     0
#>  3 1977-03-01 00:30:00     0
#>  4 1977-03-01 00:45:00     0
#>  5 1977-03-01 01:00:00     0
#>  6 1977-03-01 01:15:00     0
#>  7 1977-03-01 01:30:00     0
#>  8 1977-03-01 01:45:00     0
#>  9 1977-03-01 02:00:00     0
#> 10 1977-03-01 02:15:00     0
#> # … with 14,870 more rows
#> 
#> [[4]]
#> # A tibble: 12,738 x 2
#>    datetime             rain
#>    <dttm>              <dbl>
#>  1 1977-04-01 00:00:00     0
#>  2 1977-04-01 00:15:00     0
#>  3 1977-04-01 00:30:00     0
#>  4 1977-04-01 00:45:00     0
#>  5 1977-04-01 01:00:00     0
#>  6 1977-04-01 01:15:00     0
#>  7 1977-04-01 01:30:00     0
#>  8 1977-04-01 01:45:00     0
#>  9 1977-04-01 02:00:00     0
#> 10 1977-04-01 02:15:00     0
#> # … with 12,728 more rows
#> 
#> [[5]]
#> # A tibble: 11,904 x 2
#>    datetime             rain
#>    <dttm>              <dbl>
#>  1 1977-05-01 00:00:00     0
#>  2 1977-05-01 00:15:00     0
#>  3 1977-05-01 00:30:00     0
#>  4 1977-05-01 00:45:00     0
#>  5 1977-05-01 01:00:00     0
#>  6 1977-05-01 01:15:00     0
#>  7 1977-05-01 01:30:00     0
#>  8 1977-05-01 01:45:00     0
#>  9 1977-05-01 02:00:00     0
#> 10 1977-05-01 02:15:00     0
#> # … with 11,894 more rows
#> 
#> [[6]]
#> # A tibble: 11,520 x 2
#>    datetime             rain
#>    <dttm>              <dbl>
#>  1 1977-06-01 00:00:00     0
#>  2 1977-06-01 00:15:00     0
#>  3 1977-06-01 00:30:00     0
#>  4 1977-06-01 00:45:00     0
#>  5 1977-06-01 01:00:00     0
#>  6 1977-06-01 01:15:00     0
#>  7 1977-06-01 01:30:00     0
#>  8 1977-06-01 01:45:00     0
#>  9 1977-06-01 02:00:00     0
#> 10 1977-06-01 02:15:00     0
#> # … with 11,510 more rows
#> 
#> [[7]]
#> # A tibble: 11,904 x 2
#>    datetime             rain
#>    <dttm>              <dbl>
#>  1 1977-07-01 00:00:00     0
#>  2 1977-07-01 00:15:00     0
#>  3 1977-07-01 00:30:00     0
#>  4 1977-07-01 00:45:00     0
#>  5 1977-07-01 01:00:00     0
#>  6 1977-07-01 01:15:00     0
#>  7 1977-07-01 01:30:00     0
#>  8 1977-07-01 01:45:00     0
#>  9 1977-07-01 02:00:00     0
#> 10 1977-07-01 02:15:00     0
#> # … with 11,894 more rows
#> 
#> [[8]]
#> # A tibble: 11,904 x 2
#>    datetime             rain
#>    <dttm>              <dbl>
#>  1 1977-08-01 00:00:00     0
#>  2 1977-08-01 00:15:00     0
#>  3 1977-08-01 00:30:00     0
#>  4 1977-08-01 00:45:00     0
#>  5 1977-08-01 01:00:00     0
#>  6 1977-08-01 01:15:00     0
#>  7 1977-08-01 01:30:00     0
#>  8 1977-08-01 01:45:00     0
#>  9 1977-08-01 02:00:00     0
#> 10 1977-08-01 02:15:00     0
#> # … with 11,894 more rows
#> 
#> [[9]]
#> # A tibble: 14,117 x 2
#>    datetime             rain
#>    <dttm>              <dbl>
#>  1 1976-09-03 22:45:00  0   
#>  2 1976-09-03 23:00:00  2.54
#>  3 1976-09-03 23:15:00  5.08
#>  4 1976-09-03 23:30:00  0   
#>  5 1976-09-03 23:45:00  0   
#>  6 1976-09-04 00:00:00  0   
#>  7 1976-09-04 00:15:00  0   
#>  8 1976-09-04 00:30:00  0   
#>  9 1976-09-04 00:45:00  0   
#> 10 1976-09-04 01:00:00  0   
#> # … with 14,107 more rows
#> 
#> [[10]]
#> # A tibble: 14,880 x 2
#>    datetime             rain
#>    <dttm>              <dbl>
#>  1 1976-10-01 00:00:00     0
#>  2 1976-10-01 00:15:00     0
#>  3 1976-10-01 00:30:00     0
#>  4 1976-10-01 00:45:00     0
#>  5 1976-10-01 01:00:00     0
#>  6 1976-10-01 01:15:00     0
#>  7 1976-10-01 01:30:00     0
#>  8 1976-10-01 01:45:00     0
#>  9 1976-10-01 02:00:00     0
#> 10 1976-10-01 02:15:00     0
#> # … with 14,870 more rows
#> 
#> [[11]]
#> # A tibble: 14,400 x 2
#>    datetime             rain
#>    <dttm>              <dbl>
#>  1 1976-11-01 00:00:00     0
#>  2 1976-11-01 00:15:00     0
#>  3 1976-11-01 00:30:00     0
#>  4 1976-11-01 00:45:00     0
#>  5 1976-11-01 01:00:00     0
#>  6 1976-11-01 01:15:00     0
#>  7 1976-11-01 01:30:00     0
#>  8 1976-11-01 01:45:00     0
#>  9 1976-11-01 02:00:00     0
#> 10 1976-11-01 02:15:00     0
#> # … with 14,390 more rows
#> 
#> [[12]]
#> # A tibble: 14,880 x 2
#>    datetime             rain
#>    <dttm>              <dbl>
#>  1 1976-12-01 00:00:00     0
#>  2 1976-12-01 00:15:00     0
#>  3 1976-12-01 00:30:00     0
#>  4 1976-12-01 00:45:00     0
#>  5 1976-12-01 01:00:00     0
#>  6 1976-12-01 01:15:00     0
#>  7 1976-12-01 01:30:00     0
#>  8 1976-12-01 01:45:00     0
#>  9 1976-12-01 02:00:00     0
#> 10 1976-12-01 02:15:00     0
#> # … with 14,870 more rows
toc()
#> 0.212 sec elapsed

Thanks @technocrat for your reply. However, I am trying to do this for different files.

Let's say I have an input folder-A and an output folder-B.

My R code is a function that reads a CSV file from folder-A and writes it to folder-B. This process takes ~2 hours.

I need to do the same for the other files, ~500 CSV files in total, which I can do using mapply.

So, could you suggest code that reads all the files at once and writes them to folder-B, processing them on 100 different systems?

In effect, I am trying to imitate running the files in 500 separate RStudio sessions.

I had understood that the data was for a single station, and I was showing that if it takes 0.25 sec to do 12 months, then 500 ... .

It just took me 2 sec to download the 2.6MB example file. So, that should be 20 minutes tops for 500 files over any reasonably fast connection to any reasonably provisioned server.

I wrote the file back locally in 0.3 sec, implying 2.5 minutes for 500 write operations with an aggregate file size of 1.3GB.
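
As a rough check of those estimates, the arithmetic can be written out in R. The per-file numbers are the ones measured above; the variable names are only illustrative, and the sketch assumes every file behaves like the 2.6MB example.

```r
n_files      <- 500
download_sec <- 2     # measured download time for the example file
write_sec    <- 0.3   # measured local write time for the example file
file_mb      <- 2.6   # size of the example file

download_min <- n_files * download_sec / 60   # about 16.7 minutes
write_min    <- n_files * write_sec / 60      # 2.5 minutes
total_gb     <- n_files * file_mb / 1000      # 1.3 GB

print(round(c(download_min = download_min,
              write_min    = write_min,
              total_gb     = total_gb), 1))
```

The download figure rounds up to the "20 minutes tops" quoted above.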

So, altogether, 20 minutes to read, 2 minutes to process, another 20 minutes to write locally and, what?, 5 minutes to send and write remotely, for a total of less than an hour.

That makes it seem like the time to read all 500 CSV files from folder-A, process them and write them to folder-B is excessive by at least a factor of two, and I suspect more. If that's the time for just one file, the problem is either connectivity, server load, or the read/write/process logic in your R program.

Conventional tools like ping or traceroute can rule out connectivity issues. Benchmarking the run time of the same code on the local and target servers with a test suite can rule out server load issues. So, let's assume that the problem is with the R code and check that first, to make sure that it's not the issue. There's no point in spending time on 500 different workstations if that's not necessary.
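
One way to check the R code itself is to time the read, process, and write stages separately for a single file. The following is a minimal sketch using only base R. The toy station data, the temporary file paths, and the split-by-month step are all assumptions standing in for the real file and the real analysis.

```r
# Toy stand-in for one station file: 200,000 rows of 15-minute rainfall
n <- 200000
stamps <- seq(as.POSIXct("1976-09-03 22:45:00", tz = "UTC"),
              by = "15 min", length.out = n)
station <- data.frame(
  datetime = format(stamps, "%Y-%m-%d %H:%M:%S"),
  rain     = 0
)
in_file  <- tempfile(fileext = ".csv")
out_file <- tempfile(fileext = ".csv")
write.csv(station, in_file, row.names = FALSE)

# Time each stage on its own
t_read <- system.time(df <- read.csv(in_file))[["elapsed"]]

t_process <- system.time({
  parsed   <- as.POSIXct(df$datetime, tz = "UTC", format = "%Y-%m-%d %H:%M:%S")
  df$month <- as.integer(format(parsed, "%m"))
  by_month <- split(df, df$month)   # placeholder for the real analysis
})[["elapsed"]]

t_write <- system.time(write.csv(df, out_file, row.names = FALSE))[["elapsed"]]

timings <- c(read = t_read, process = t_process, write = t_write)
print(round(timings, 2))
```

Whichever stage dominates is the one worth optimizing before adding more machines.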

So, what workflow are you using now? For example, is the source directory for the CSV files under git or other version control? If so:

$ git clone your_source # put it on local station
$ git clone your_source # put it on remote server
> source("your_script") # process each csv in local repo and write back
$ git push origin main # write back the differences 
$ git pull origin main # from the remote server

This has the advantage of only transmitting the changed data.

How are you handling it now?

Following your suggestions, I assume my processing logic needs to be revised.
Basically, I first read my station file (from the local drive), process each row (~200,000 rows) for the analysis, and write the output. One of the main challenges is processing the rows for each station, because each row repeatedly requires a selection from a table (the table changes with the row's properties; for example, different months have different tables).
I have attached a summary of my code below.

  library(readr)  # read_csv(), write_csv()

  dat_fun <- function(j){
    
    # Read the file that contains the names of the stations
    # E.g: station1, station2.......... station500
    setwd("C:/Users/...../QC")
    obs_files <- read.csv('station_name.csv', colClasses=c("Station"="character"))
    obs_files <- paste0(obs_files$Station, ".csv")
    
    
    # Read the input file for station j
    df <- read_csv(paste0("C:/Users/...../QC/",
                          obs_files[j]), col_names = TRUE)
    
    
    # Read the bias-corrected file for station j; it is written out below
    df_1 <- read_csv(paste0("C:/Users/...../biased_corrected/",
                            obs_files[j]), col_names = TRUE)
    
   
    # Write the output for station j (readr >= 1.4.0 uses `file`; `path` is deprecated)
    setwd("C:/Users/....../sythe")
    write_csv(df_1, file = obs_files[j])
    
    print(j)
  }
  
  
  # Run dat_fun() for each of the 500 stations, one after another
  replicate(1,t(mapply(function(j) dat_fun(j), j=1:500)))


OK, so that's all on the local system. Let me think about this tonight.

Have you looked at:

{furrr} is a parallelized wrapper around {purrr}, so one can do:

# sequential
library(purrr)
map(path_list, function(x) {...})

# parallel 
library(furrr)

# set up cores
plan(multisession, workers = 20)

future_map(path_list, function(x) {...})
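
For a version that runs end to end, here is a minimal sketch of the same pattern applied to a folder of CSV files. The folders `in_dir` and `out_dir`, the function `process_station()`, the four toy station files, and the cumulative-sum step are all hypothetical placeholders for folder-A, folder-B, and the real per-station analysis.

```r
library(furrr)  # also attaches {future}, which provides plan()

# Hypothetical stand-ins for folder-A (input) and folder-B (output)
in_dir  <- file.path(tempdir(), "folder_A")
out_dir <- file.path(tempdir(), "folder_B")
dir.create(in_dir,  showWarnings = FALSE)
dir.create(out_dir, showWarnings = FALSE)

# Create a few toy station files so the example is self-contained
for (i in 1:4) {
  write.csv(data.frame(rain = c(0, 2.54, 5.08) * i),
            file.path(in_dir, paste0("station", i, ".csv")),
            row.names = FALSE)
}

# Read one file, process it, write it under the same name in out_dir
process_station <- function(path, out_dir) {
  df <- read.csv(path)
  df$rain_cum <- cumsum(df$rain)   # placeholder for the real analysis
  out_path <- file.path(out_dir, basename(path))
  write.csv(df, out_path, row.names = FALSE)
  out_path
}

path_list <- list.files(in_dir, pattern = "\\.csv$", full.names = TRUE)

plan(multisession, workers = 2)   # two background R sessions on this machine
out_paths <- future_map_chr(path_list, process_station, out_dir = out_dir)
plan(sequential)                  # shut the background workers down again

print(basename(out_paths))
```

Note that `multisession` only uses the cores of one machine. Spreading the files over several cluster nodes would need a different future backend, which is not shown here.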

{callr} creates background jobs/processes to run the same code/script.
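
A minimal sketch of the {callr} approach, assuming a hypothetical `station_task()` function in place of the real per-station code. The function passed to `r_bg()` runs in a fresh R session, so it has to be self-contained.

```r
library(callr)

# Hypothetical per-station task; uses only base R so it works in a fresh session
station_task <- function(station_id) {
  Sys.sleep(0.1)   # placeholder for the real work
  paste0("station", station_id, " done")
}

# Launch one background R process per station
jobs <- lapply(1:3, function(i) r_bg(station_task, args = list(station_id = i)))

# Wait for every job to finish, then collect the results
for (job in jobs) job$wait()
results <- vapply(jobs, function(job) job$get_result(), character(1))
print(results)
```

Each background process is a full R session, so it makes sense to launch only about as many at once as there are free cores.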
