parallel processing

I have more than 700,000 NetCDF files (point data, not gridded), and for each file I am computing a trend using mk.test. The following code works perfectly, but it is slow and I need help with parallel processing. I tried the foreach and %dopar% functions but wasn't successful. Here is my working code:

    files <- list.files(path = dir, pattern = ".*\\.nc$",
                        ignore.case = TRUE, full.names = TRUE)

    listOfDataFrames <- list()
    for (i in 1:700000) {                     # this is same as (i in seq_along(files))
      tryCatch({
        river <- ncdf_timeseries(filename = files[i])
        dur <- ts(river, start = 1979)
        mk <- mk.test(dur)
        slop <- sens.slope(dur)               # this part takes time
        sample_filename <- basename(files[i])
        fre2 <- cbind(sample_filename, mk, slop)
        fre3 <- as.data.frame(fre2)
        listOfDataFrames[[i]] <- fre3
        message("file ", i, " produced")
      }, error = function(e){cat("ERROR :", conditionMessage(e), "\n")})
    }

    dt <- do.call("rbind", listOfDataFrames)
    write.csv(dt, "all.csv")

This is working, but it's very slow (> 2 months); the data is daily and spans more than 40 years. Please let me know whether this is possible with parallel processing. I have access to an HPC and I have 40 cores.

    no_cores <- detectCores() - 1
    no_cores
    [1] 39

The easiest way to parallelise any code is with the {furrr} package, which combines the parallelisation implemented by the {future} package with the tidiness of the {purrr} package.

Here's how to do it:

Re-write that for loop as a single function, let's call it run_file(), which takes a single argument, file, the file name. You should be able to reuse most of the same code, apart from the line which assigns fre3 to listOfDataFrames; instead, return fre3 as the outcome of your function. It's also tricky to output the message, since parallelisation means these operations are happening at the same time and messages can get garbled. You'd also need the function to return something even when an error is hit.
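A minimal sketch of such a function might look like this (reusing the same ncdf_timeseries(), mk.test() and sens.slope() calls from your loop, and returning a placeholder row when an error is hit; the exact columns you keep are up to you):

```r
run_file <- function(file) {
  tryCatch({
    # assumes mk.test()/sens.slope() from {trend} and your ncdf_timeseries() helper
    river <- ncdf_timeseries(filename = file)   # read one point series
    dur <- ts(river, start = 1979)
    data.frame(sample_filename = basename(file),
               p.value = mk.test(dur)$p.value,
               slope   = sens.slope(dur)$estimates)
  },
  error = function(e) {
    # return an all-NA row instead of failing the whole map() call
    data.frame(sample_filename = basename(file),
               p.value = NA, slope = NA)
  })
}
```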

We can write this to use the map() function from {purrr} as simply:

map(files,run_file)

This will apply the function run_file() to every element of files. It would still take a while, as it runs in sequence and we haven't done any parallelisation yet. However, once we've got our code into the {purrr} style, converting it to {furrr} is as simple as adding future_ to the start of the function name (this works for most of the map() family):

future_map(files,run_file)

There are a few options here depending on your operating system, but with the plan() function we can tell {furrr} how we want to run things. The default is sequential, so on its own it would still take a while. We can add plan(multiprocess), which chooses a parallelisation strategy depending on your OS:

plan(multiprocess)
listOfDataFrames <- future_map(files,run_file)

Another advantage of the {purrr} package is that your outcome can be simplified before it is finally returned. At the end of your code, you simplified listOfDataFrames into dt using rbind. This can be done in {purrr} with map_dfr(), and therefore can also be done in {furrr} with future_map_dfr().
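As a toy illustration (with a made-up make_row() helper), map_dfr() does the map-then-rbind in one step:

```r
library(purrr)

# hypothetical helper: builds a one-row data frame per input
make_row <- function(x) data.frame(id = x, sq = x^2)

dt1 <- do.call("rbind", map(1:3, make_row))  # map() then rbind, like the original code
dt2 <- map_dfr(1:3, make_row)                # same rows, bound together in one call

identical(dt1$sq, dt2$sq)  # TRUE
```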

You can also add a progress bar printed to the console with the argument .progress = TRUE.

Here's the final code. I also neatened up the function from your for loop to make it a bit smoother (you don't need quite as much assigning of variables):

library(furrr)
files <- list.files(path = dir, pattern = ".*\\.nc$",
                    ignore.case = TRUE, full.names = TRUE)

run_file <- function(file) {
  river <- ncdf_timeseries(filename = file)
  sample_filename <- basename(file)
  dur <- ts(river, start = 1979)
  tryCatch(
    data.frame(sample_filename = sample_filename,
               mk = mk.test(dur),
               slop = sens.slope(dur)),
    error = function(e) {
      data.frame(sample_filename = sample_filename,
                 mk = NA,
                 slop = NA)
    }
  )
}

plan(multiprocess)
dt <- future_map_dfr(files,run_file, .progress=TRUE)
write.csv(dt, "all.csv")

Edit: fixed link and added .progress


Dear @MyKo101,

Thanks a lot; I managed to run it after a few edits. Now I have one more question.
I have merged all the files into one by their ID ("Merged.nc"). To make the process faster by submitting jobs to multiple cores on the HPC, I am trying to split the task by ID into 4, using something like "for (i in 1:150000)", but I haven't been successful yet and need your help.

    ncfile <- nc_open("Merged.nc")    # this has 700,000 files with IDs 1:700000 # no lat, lon, time
    IDs <- ncvar_get(ncfile, "ID")

    run_file <- function(IDs[1:150000]) {
      prcp <- ncvar_get(ncfile, varid = "prcp", start = c(i, 1), count = c(1, -1))
      sample_filename <- IDs[i]
      dur <- ts(prcp, start = 1979)
      tryCatch({
        res <- data.frame(sample_filename = sample_filename,
                          mk = mk.test(dur),
                          slop = sens.slope(dur))
      },
      error = function(e){

      }
      res
    }

    plan(multiprocess)
    dt <- future_map_dfr(IDs, run_file, .progress = TRUE)
    write.csv(dt, "all.csv")

This seems like an error; presumably this function wouldn't even parse. And in the function body, I see no sign of where the variable i is coming from...

@nirgrahamuk @MyKo101

Yes, the "function(IDs[1:150000])" is not the right way; I just put it there as an example. My question is where to put the following kind of division.

    file <- nc_open("Merged.nc")
    IDs <- ncvar_get(file, "ID")
    run_file <- function(file) {
      for (i in 1:150000) {            # ID in Merged.nc; there are 700,000 IDs (1:700000)

The idea is to divide the 700,000 series in Merged.nc into 4 chunks and run them separately.

I think you may be overcomplicating things. MyKo101's example

dt <- future_map_dfr(files, run_file, .progress = TRUE)

shows that the run_file function should just run a single operation; any core can do this with the input data it's provided. files contains the items to process, and future_map_dfr decides which item to send to which core. You don't need to invent a splitting regime yourself.
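To make that concrete, here is a hedged sketch of a per-ID function (the name run_id is mine; it assumes the "Merged.nc" layout described above and mk.test() from {trend}). Note that the nc file is opened inside the function, because an open ncdf4 handle generally can't be shared with parallel workers:

```r
library(furrr)
library(ncdf4)
library(trend)   # mk.test()

run_id <- function(i) {
  nc <- nc_open("Merged.nc")   # open the connection inside the worker
  on.exit(nc_close(nc))
  tryCatch({
    # read the i-th series (one row, all time steps)
    prcp <- ncvar_get(nc, varid = "prcp", start = c(i, 1), count = c(1, -1))
    dur <- ts(prcp, start = 1979)
    data.frame(ID = i, p.value = mk.test(dur)$p.value)
  },
  error = function(e) data.frame(ID = i, p.value = NA))
}

plan(multiprocess)
dt <- future_map_dfr(1:700000, run_id, .progress = TRUE)
```

future_map_dfr then distributes the IDs over the available workers for you, so no manual four-way split is needed.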

@nirgrahamuk @MyKo101

I think you are right. I tried it this way with just one file (everything merged into one file) instead of >100,000 files, but I am getting the following error.

    file <- nc_open("Merged.nc")
    ID <- ncvar_get(file, "ID")

    run_file <- function(file) {
      for (i in 1:length(ID)) {
        tryCatch({
          prcp <- ncvar_get(file, varid = "prcp", start = c(i, 1), count = c(1, -1))
          dur <- ts(prcp, start = 1979)
          mk <- mk.test(dur)
          sample_filename <- ID[i]
          res <- data.frame(sample_filename = sample_filename, mk$p.value)
        }, error = function(e){cat("ERROR :", conditionMessage(e), "\n")})
      }
    }

    plan(multiprocess)
    dt <- future_map_dfr(file, run_file, .progress = TRUE)

    write.csv(dt, "test_par.csv")



 ERROR : first argument (nc) is not of class ncdf4! 

do you have any idea?