Parallel processing using furrr

I am trying to use furrr for parallel processing. I have a NetCDF file containing about 700,000 time series, with IDs 1-700000. The series have no lat/lon dimensions. For each ID, I am trying to compute some statistics and save everything into one CSV. Here is the code I modified (thanks @MyKo101):

library(ncdf4)
library(furrr)
library(trend)  # for mk.test()

file <- nc_open("Merged.nc")
ID <- ncvar_get(file, "ID")

run_file <- function(file) {
  for (i in 1:length(ID)) {
    tryCatch({
      prcp <- ncvar_get(file, varid = "prcp", start = c(i, 1), count = c(1, -1))
      dur <- ts(prcp, start = 1979)
      mk <- mk.test(dur)
      sample_filename <- ID[i]
      res <- data.frame(sample_filename = sample_filename, mk$p.value)
    }, error = function(e) cat("ERROR :", conditionMessage(e), "\n"))
  }
}

plan(multiprocess)
dt <- future_map_dfr(file, run_file, .progress = TRUE)

write.csv(dt, "test_par.csv")



 ERROR : first argument (nc) is not of class ncdf4! 

Can you help with the code?

I'm not really sure what you're trying to do with this code as I'm not familiar with ncdf objects. I think you will need to think hard about what is happening at each step in the run_file() function.

I'm going to assume that file is storing some sort of data.frame, loaded from "Merged.nc" using the nc_open() function and then you're pulling the ID variable from this data.frame.

The future_map_dfr() function will apply run_file() to each element of file. To me, this means the error is happening on the prcp <- line, because file there is no longer an ncdf4 object once future_map_dfr() has subsetted it.

I think you would be better off writing everything inside the for loop as a function of i, and then passing that function to future_map_dfr() with 1:length(ID) as the first argument.
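For illustration, that restructuring might look roughly like this (run_id() is a hypothetical name; the variable names follow the code above). Note that run_id() still captures file from the calling environment, which, as explained below, is itself a problem for parallel workers:

```r
# Hypothetical sketch of the suggested restructuring: one function of i,
# mapped over the indices rather than over the ncdf4 object itself.
run_id <- function(i) {
  prcp <- ncvar_get(file, varid = "prcp", start = c(i, 1), count = c(1, -1))
  dur <- ts(prcp, start = 1979)
  mk <- mk.test(dur)
  data.frame(sample_filename = ID[i], p.value = mk$p.value)
}

plan(multisession)
dt <- future_map_dfr(1:length(ID), run_id, .progress = TRUE)
```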

The object returned by nc_open() cannot be exported to another R process, that is, you cannot use it for parallelization. This is true for all types of parallelization in R, not just futures.

Here's a simple example showing the problem. Let's start by creating a simple netCDF file that holds a variable 'x':

library(ncdf4)
x <- ncvar_def("x", units="count", dim=list())
file <- nc_create("example.nc", x)
ncvar_put(file, x, 42)
nc_close(file)

We can then access its content in a later R session, e.g.

library(ncdf4)
file <- nc_open("example.nc")
y <- ncvar_get(file)
y
## [1] 42

However, if we attempt to pass file to a parallel worker we'll get an error, e.g.

library(future)
plan(multisession)
library(ncdf4)
file <- nc_open("example.nc")
f <- future(ncvar_get(file))
y <- value(f)
## Error in R_nc4_inq_varndims: NetCDF: Not a valid ID
## Error in ncvar_ndims(ncid, varid) : error returned from C call

My best guess is that it uses some kind of internal reference:

str(file[1:5])
List of 5
 $ filename: chr "example.nc"
 $ writable: logi FALSE
 $ id      : int 131072   # <== PROBABLY THIS ONE
 $ safemode: logi FALSE
 $ format  : chr "NC_FORMAT_CLASSIC"

The workaround would be to have each parallel worker open and close the netCDF file when it needs one, e.g.

library(future)
plan(multisession)
library(ncdf4)
f <- future({
  file <- nc_open("example.nc")
  value <- ncvar_get(file)
  nc_close(file)
  value
})
y <- value(f)
y
## [1] 42

Hope this helps. I suggest that you also reach out to the maintainers of the ncdf4 package and ask for alternative solutions.

PS. I've added the above example to the 'Non-exportable objects' vignette of the next version of future package.

Pay attention to nc_open() in my comment above. Also, you can still use furrr; the problem is with ncdf4 and parallelization in general.
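Putting the two pieces of advice together, here is a rough sketch of how the original task could be run with furrr, with each worker opening and closing the file itself (this assumes the variable names prcp and ID, the file "Merged.nc", and mk.test() from the trend package, all taken from the original post):

```r
library(furrr)
library(ncdf4)
library(trend)

plan(multisession)

# Read the IDs once in the main session, then close the handle.
file <- nc_open("Merged.nc")
ID <- ncvar_get(file, "ID")
nc_close(file)

dt <- future_map_dfr(seq_along(ID), function(i) {
  file <- nc_open("Merged.nc")   # open inside the worker
  on.exit(nc_close(file))        # close when this element is done
  prcp <- ncvar_get(file, varid = "prcp", start = c(i, 1), count = c(1, -1))
  mk <- mk.test(ts(prcp, start = 1979))
  data.frame(sample_filename = ID[i], p.value = mk$p.value)
}, .progress = TRUE)

write.csv(dt, "test_par.csv")
```

Opening and closing the file once per ID is wasteful for 700,000 series; in practice you would probably chunk the IDs so that each worker opens the file once per chunk.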


@HenrikBengtsson thanks a lot.

If I understand you correctly, this is how I modified the code:

plan(multisession)
listOfDataFrames <- list()

for (i in 1:length(ID)) {
  tryCatch({
    f <- future({
      prcp <- ncvar_get(file, varid = "prcp", start = c(i, 1), count = c(1, -1))
      dur <- ts(prcp, start = 1979)
      mk <- mk.test(dur)
      sample_filename <- ID[i]
      res <- data.frame(sample_filename = sample_filename, mk$p.value)
      message("file ", i, " produced")
      res
    })
    listOfDataFrames[[i]] <- value(f)
  }, error = function(e) cat("ERROR :", conditionMessage(e), "\n"))
}

dt <- do.call("rbind", listOfDataFrames)
write.csv(dt, "test_par.csv")

Similar to message("file ", i, " produced "), which is not working inside future(), is there any way to see the number of files produced? This will help me plan and submit to the HPC.

Best regards,

Solomon