Mutate, dopar, withRestarts failure

I'm seeing an issue where the dplyr mutate step is causing a foreach loop withRestarts seems to error without error.
If I run the dplyr mutate out of the foreach %dopar% in %do% it works as designed.

library(magrittr)
library(dplyr)
#> 
#> Attaching package: 'dplyr'
#> The following objects are masked from 'package:stats':
#> 
#>     filter, lag
#> The following objects are masked from 'package:base':
#> 
#>     intersect, setdiff, setequal, union
library(tibble)
library(foreach)
library(future)
library(stringr)
tmp_tb <- tibble(Id = c(1:5),
                 Sample_color = c("green", "blue", "yellow", "orange", "grey"),
                 Sample_text = c("\n Test1", "Test2", "test 3", "test 4", "test 5"))
tmp_fun <- function(loop_n, df) {
  print(paste0(loop_n, "before withCallingHndlears\n"))
  status_tb <- tibble(Foreach_loop = as.character(),
                                 For_loop = as.character(),
                                 Status = as.character())
  for (i in seq_len(nrow(df))) {
    withCallingHandlers({
      withRestarts({
        if (i == 2) {
          tmp_status_tb <- tibble(Foreach_loop = loop_n,
                                             For_loop = i,
                                             Status = "Good")
          status_tb <- rbind(status_tb, tmp_status_tb)
        } else if (i == 3) {
          tb_test_df <- df %>%
            mutate(across(.cols = all_of(names(df)),
                          .fns = ~ (str_to_upper(.))))
          tmp_status_tb <- tibble(Foreach_loop = loop_n,
                                          For_loop = i,
                                          Status = "Good")
          status_tb <- rbind(status_tb, tmp_status_tb)
        } else if (i == 4) {
          tb_test_df <- df %>%
            mutate(across(.cols = all_of(names(df)),
                          .fns = ~ (str_replace_all(string = .,
                                                    pattern = "[[:cntrl:]]",
                                                    replacement = " " ))))
          tmp_status_tb <- tibble(Foreach_loop = loop_n,
                                  For_loop = i,
                                  Status = "Good")
          status_tb <- rbind(status_tb, tmp_status_tb)
          
        } else {
          stop("this is an error!")
        }
        
      }, muffleStop = function() {
        message("'stop' muffled")
        tmp_status_tb <- tibble(Foreach_loop = loop_n,
                                           For_loop = i,
                                           Status = "Failure")
        status_tb <- rbind(status_tb, tmp_status_tb)
        assign(x = "status_tb", value = status_tb, envir = parent.frame(n = 4))
      })
    },
    error = function(cond) {
      print(cond$message)
      invokeRestart("muffleStop")
    }
    )
  }
  print(paste0(loop_n, "after withCallingHndlears\n"))
  return(status_tb)
}

doFuture::registerDoFuture()
numWorkers <- 2
future::plan(future::multisession, workers = numWorkers, gc = FALSE, earlySignal = TRUE)
status_ls <- foreach::foreach(out_i = seq_along(1:2), .verbose = FALSE, .errorhandling = "pass") %dopar% {
  tmp_fun(loop_n = out_i, df = tmp_tb)
}
#> 'stop' muffled
#> 'stop' muffled
#> 'stop' muffled
#> 'stop' muffled
#> [1] "1before withCallingHndlears\n"
#> [1] "this is an error!"
#> [1] "this is an error!"
#> [1] "1after withCallingHndlears\n"
#> 'stop' muffled
#> 'stop' muffled
#> 'stop' muffled
#> 'stop' muffled
#> [1] "2before withCallingHndlears\n"
#> [1] "this is an error!"
#> [1] "this is an error!"
#> [1] "2after withCallingHndlears\n"
future::plan("default")
output_df <- bind_rows(status_ls)
output_df
#> # A tibble: 10 x 3
#>    Foreach_loop For_loop Status 
#>           <int>    <int> <chr>  
#>  1            1        1 Failure
#>  2            1        2 Good   
#>  3            1        3 Failure
#>  4            1        4 Failure
#>  5            1        5 Failure
#>  6            2        1 Failure
#>  7            2        2 Good   
#>  8            2        3 Failure
#>  9            2        4 Failure
#> 10            2        5 Failure

tmp_tb %>%
  mutate(across(.cols = all_of(names(tmp_tb)),
                .fns = ~ (str_to_upper(.))))
#> # A tibble: 5 x 3
#>   Id    Sample_color Sample_text
#>   <chr> <chr>        <chr>      
#> 1 1     GREEN        "\n TEST1" 
#> 2 2     BLUE         "TEST2"    
#> 3 3     YELLOW       "TEST 3"   
#> 4 4     ORANGE       "TEST 4"   
#> 5 5     GREY         "TEST 5"

tmp_tb %>%
  mutate(across(.cols = all_of(names(tmp_tb)),
                .fns = ~ (str_replace_all(string = .,
                                          pattern = "[[:cntrl:]]",
                                          replacement = " " ))))
#> # A tibble: 5 x 3
#>   Id    Sample_color Sample_text
#>   <chr> <chr>        <chr>      
#> 1 1     green        "  Test1"  
#> 2 2     blue         "Test2"    
#> 3 3     yellow       "test 3"   
#> 4 4     orange       "test 4"   
#> 5 5     grey         "test 5"

SesssionInfo:

sessionInfo()
#> R version 4.0.2 (2020-06-22)
#> Platform: x86_64-pc-linux-gnu (64-bit)
#> Running under: Red Hat Enterprise Linux
#> 
#> Matrix products: default
#> BLAS:   /opt/R/R_4.0.2/lib64/R/lib/libRblas.so
#> LAPACK: /opt/R/R_4.0.2/lib64/R/lib/libRlapack.so
#> 
#> locale:
#>  [1] LC_CTYPE=en_US.UTF-8       LC_NUMERIC=C              
#>  [3] LC_TIME=en_US.UTF-8        LC_COLLATE=en_US.UTF-8    
#>  [5] LC_MONETARY=en_US.UTF-8    LC_MESSAGES=en_US.UTF-8   
#>  [7] LC_PAPER=en_US.UTF-8       LC_NAME=C                 
#>  [9] LC_ADDRESS=C               LC_TELEPHONE=C            
#> [11] LC_MEASUREMENT=en_US.UTF-8 LC_IDENTIFICATION=C       
#> 
#> attached base packages:
#> [1] stats     graphics  grDevices utils     datasets  methods   base     
#> 
#> loaded via a namespace (and not attached):
#>  [1] digest_0.6.27     withr_2.4.1       magrittr_2.0.1    reprex_2.0.0     
#>  [5] evaluate_0.14     highr_0.8         stringi_1.5.3     rlang_0.4.10     
#>  [9] cli_2.4.0         rstudioapi_0.13   fs_1.5.0          rmarkdown_2.7    
#> [13] tools_4.0.2       stringr_1.4.0     glue_1.4.2        xfun_0.22        
#> [17] yaml_2.2.1        compiler_4.0.2    htmltools_0.5.1.1 knitr_1.31

Issue is related to name of withRestarts function....Don't use a name that starts with muffle.

This topic was automatically closed 7 days after the last reply. New replies are no longer allowed.

If you have a query related to it or one of the replies, start a new topic and refer back with a link.