Use exponential backoff with DBI?

I am using DBI with a noctua::athena() connection to write data to Amazon S3. I keep encountering a throttling error, and from reading around I think the prescribed solution is to use exponential backoff when attempting to write (or read).

I looked at the DBI documentation and did some Googling. I found the retry package, which seems to do what I need, but I'm not sure how to combine it with DBI::dbWriteTable().

con_s3 <- DBI::dbConnect(noctua::athena(), s3_staging_dir = paste0("s3://ourco-emr/tables/revenue_predictions.", game_name))

dbWriteTable(conn = con_s3,
             name = paste0("revenue_predictions.", game_name),
             value = cohort_data,
             append = TRUE,
             overwrite = FALSE,
             file.type = "parquet",
             partition = c(year = yr, month = mt, day = d),
             s3.location = paste0("s3://ourco-emr/tables/revenue_predictions.", game_name))

Before I try to create my own exponential backoff function, I wanted to ask: has anyone done this before? Is there a parameter, another package, or an existing method I can use with DBI that provides an exponential backoff flag or similar?

I found a try_backoff function online: https://rdrr.io/github/ramhiser/retry/src/R/try-backoff.r

try_backoff <- function(expr, silent = FALSE, max_attempts = 10, verbose = FALSE) {
  for (attempt_i in seq_len(max_attempts)) {
    results <- try(expr = expr, silent = silent)
    if (inherits(results, "try-error")) { # inherits() is safer than class() == "try-error"
      # exponential backoff with jitter: sleep up to 2^attempt - 1 seconds
      backoff <- runif(n = 1, min = 0, max = 2^attempt_i - 1)
      if (verbose) {
        message("Backing off for ", backoff, " seconds.")
      }
      Sys.sleep(backoff)
    } else {
      if (verbose) {
        message("Succeeded after ", attempt_i, " attempts.")
      }
      break
    }
  }
  results
}

Wrapping my dbWriteTable() call in ramhiser's try_backoff() solved my problem.
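For example, a minimal sketch of the wrapping (argument values taken from my call above):

try_backoff(
  dbWriteTable(conn = con_s3,
               name = paste0("revenue_predictions.", game_name),
               value = cohort_data,
               append = TRUE,
               overwrite = FALSE,
               file.type = "parquet",
               partition = c(year = yr, month = mt, day = d),
               s3.location = paste0("s3://ourco-emr/tables/revenue_predictions.", game_name)),
  max_attempts = 10,
  verbose = TRUE)

The try() inside try_backoff() captures any throttling error raised by dbWriteTable(), sleeps for the jittered backoff, and re-attempts the call instead of halting the script.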

Hi @dougfir,
I am the author of noctua. Sorry to hear you are having difficulty with the package. I will raise a ticket on the package's GitHub page.

If you come across any more issues with the package, or have any feature requests, please raise them here: https://github.com/DyfanJones/noctua/issues.

In the meantime I will look at implementing your solution in the noctua package.

Hi @larefly. Thank you for being responsive and active on here! For what it's worth, your package has really helped me out - thank you.

Hi @dougfir, I have created a pull request that addresses the problem you are having.

remotes::install_github("dyfanjones/noctua", ref = "retry")

My current unit tests aren't running into the error you are getting, so if possible could you try out this new branch and let me know if it fixes your problem? The new features include additional parameters in the noctua_options() function. These parameters let you configure how noctua handles retries; by default, noctua will do 5 retries, and it won't do them quietly. You can change this, for example:

library(DBI)
library(noctua)

con <- dbConnect(athena())

# increase the number of retries to 10, and retry quietly
noctua_options(retry = 10, retry_quiet = TRUE)

To return to default settings:

noctua_options()

Hi, I am trying this now. I set retry_quiet = FALSE, assuming it would give a message if it had to retry. It's hard to tell if it's working correctly, because everything is working! If it did have to retry, what message would it print to the console, assuming that's what retry_quiet is for?

Hi @dougfir,

noctua by default will do the retry noisily, so there is no need to call noctua_options() to set retry_quiet = FALSE. If it needs to retry, it will print the error message and then retry the call.

I have tried to follow httr::RETRY's retry message:

message(resp, "Request failed. Retrying in ", round(backoff_len, 1), " seconds...")

Here resp is the error message. I hope this helps.

If you want a better understanding of how noctua is doing the retry, please feel free to view the current pull request #80
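In rough outline, the pattern looks something like this sketch (illustrative only; retry_api_call and its arguments are made-up names, not noctua's actual internals, so see the pull request for those):

# Illustrative sketch of retry with exponential backoff; not noctua's exact code.
retry_api_call <- function(fun, retries = 5, quiet = FALSE) {
  for (i in seq_len(retries)) {
    resp <- tryCatch(fun(), error = function(e) e)
    if (!inherits(resp, "error")) return(resp) # success: stop retrying
    backoff_len <- runif(n = 1, min = 0, max = 2^i - 1) # jittered exponential wait
    if (!quiet) {
      message(conditionMessage(resp), " Request failed. Retrying in ",
              round(backoff_len, 1), " seconds...")
    }
    Sys.sleep(backoff_len)
  }
  stop(resp) # out of retries: re-raise the last error
}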

Hi @larefly, OK, good to know, thanks. I do not see any messages as yet, implying other folks in my org are less active on this S3 bucket today. When I eventually see the message "Request failed. Retrying in ...", I will confirm here.


Hi @larefly, this failed. Script and output below:

# With the revenue predictions, send them to S3 ----

# libraries
library(DBI)
library(noctua)    # remotes::install_github("dyfanjones/noctua", ref = "retry")
library(odbc)
library(dplyr)
library(lubridate) # year()
library(readr)     # read_lines()
library(glue)      # glue_collapse(), glue_sql()

noctua_options(retry = 20) # how many times to retry with dbWriteTable

source('/home/rstudio-doug/analysis/radhoc/revenue_model/functions/general_functions.R', local = TRUE) # for try_backoff

# game_name, cohort_date, prediction_df, day_from and day_to are set by the calling loop
con_s3 <- DBI::dbConnect(noctua::athena(), s3_staging_dir = paste0("s3://ourco-emr/tables/revenue_predictions.db.", game_name))
con_athena <- DBI::dbConnect(odbc(), "Athena")

yr <- year(cohort_date) %>% toString()
mt <- format(cohort_date %>% as.Date(), '%m')
d <- format(cohort_date %>% as.Date(), '%d')

## download existing data
cohort_query <- read_lines("/home/rstudio-doug/analysis/radhoc/revenue_model/sql_queries/cohorts_data.sql") %>%
  glue_collapse(sep = "\n") %>%
  glue_sql(.con = con_athena)
cohort_data <- dbGetQuery(con_athena, cohort_query)

if (cohort_data %>% nrow() == 0) { # first entry for this cohort, likely day 30 horizon

  # try_backoff(
  dbWriteTable(conn = con_s3,
               name = paste0("revenue_predictions.", game_name),
               value = prediction_df,
               append = TRUE,
               overwrite = FALSE,
               file.type = "parquet",
               partition = c(year = yr, month = mt, day = d),
               s3.location = paste0("s3://ourco-emr/tables/revenue_predictions.db/", game_name))

} else { # else amend existing row

  ## join onto local prediction_df
  cohort_data <- cohort_data %>%
    select_at(vars(-c(paste0("day_", day_from, "_day_", day_to)))) %>%
    left_join(prediction_df, by = "s") %>% # left join in case it's the first field added for this cohort
    select_at(vars(s, day_7_day_30, day_7_day_60, day_7_day_90, day_7_day_120)) # remove partitions; those will be re-added below

  ## push the new df (even though the setting says append, this seems to update as desired)
  dbWriteTable(conn = con_s3,
               name = paste0("revenue_predictions.", game_name),
               value = cohort_data,
               append = TRUE,
               overwrite = FALSE,
               file.type = "parquet",
               partition = c(year = yr, month = mt, day = d),
               s3.location = paste0("s3://ourco-emr/tables/revenue_predictions.db/", game_name))
}

Output after source()ing this script in a loop that iterates over a date range and prints the date once it succeeds:

[1] "2020-02-01"
[1] "2020-02-02"
[1] "2020-02-03"
[1] "2020-02-04"
[1] "2020-02-05"
[1] "2020-02-06"
[1] "2020-02-07"
[1] "2020-02-08"
[1] "2020-02-09"
[1] "2020-02-10"
[1] "2020-02-11"
[1] "2020-02-12"
[1] "2020-02-13"
[1] "2020-02-14"
[1] "2020-02-15"
[1] "2020-02-16"
Error: ThrottlingException: Rate exceeded
Execution halted

Looks like no retries were attempted, since we'd expect to see 'Request failed. Retrying in ...'.

Let me know if there's any more info that I can provide?

Thanks for this @dougfir. Are you able to do a traceback? I want to double-check where the throttling starts to happen. I have put the retry around the upload of the data to S3; I just need to check it isn't happening anywhere else.
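(For reference, the traceback can be grabbed by running traceback() in the same R session straight after the failure:)

# in the same R session, immediately after the ThrottlingException is raised
traceback()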

Apologies @dougfir, I thought I had set the default to be noisy, but I forgot to set it in the backend environment. This has now been fixed; can you re-run your code to see if it retries noisily, please?

Hi @larefly, OK, I will try again, but I'll be a little while over here. Will update when I get to rerun this loop I've been working on. Cheers!


Hi @dougfir,

Have you managed to test if the dev version has resolved the issue you are having here?

remotes::install_github("dyfanjones/noctua")

Please let me know, so I can either close off the issue on the package's GitHub or do further work if required.

Hi @larefly, up until today I've still had to fall back on my exponential backoff function. However, on the other post where we are communicating (regarding MSCK), when I installed the dev version and ran the loop, I did not see any errors; usually I do experience those. When the cron job runs in the morning, I can check the log file for a message of the form

message(resp, "Request failed. Retrying in ", round(backoff_len, 1), " seconds...")

If I do, presumably we can conclude all is working as expected? I will update here tomorrow morning, California time, and let you know the outcome.
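For example, something like this quick scan of the log (the log path here is hypothetical):

# scan the cron log for noctua's retry messages (log path is hypothetical)
log_lines <- readLines("/home/rstudio-doug/logs/revenue_model.log")
grep("Request failed. Retrying in", log_lines, value = TRUE, fixed = TRUE)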


This would be very helpful.

When I use the dev version of noctua I get this error message:

Error: FAILED: Execution Error, return code 1 from org.apache.hadoop.hive.ql.exec.DDLTask. MetaException(message:User: arn:aws:iam::456438553626:user/myname is not authorized to perform: glue:BatchCreatePartition on resource: arn:aws:glue:us-east-1:12345678:catalog (Service: AmazonDataCatalog; Status Code: 400; Error Code: AccessDeniedException; Request ID: abcdef-a123-41fa-b68a-511352fd6473))
Execution halted

I amended some of the error message output in there for privacy before posting.

When I switch back to the CRAN package with install.packages(), this does not happen.

Roughly, what code did you use that caused the issue?

Hi, here is the call I tried:

con_s3 <- DBI::dbConnect(noctua::athena(), s3_staging_dir = "s3://ourco-emr/tables/revenue_predictions.db/")

dbWriteTable(conn = con_s3,
             name = paste0("revenue_predictions.", game_name),
             value = prediction_df,
             append = TRUE,
             overwrite = FALSE,
             file.type = "parquet",
             partition = c(year = yr, month = mt, day = d),
             s3.location = paste0("s3://ourco-emr/tables/revenue_predictions.db/", game_name))

Ah OK, the only thing I have changed is how the table is repaired. The CRAN version uses MSCK REPAIR TABLE and the dev version uses ALTER TABLE. From checking your error log, it looks like you don't have permission for that: not authorized to perform: glue:BatchCreatePartition.
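Roughly, the difference looks like this (the table and partition names below are illustrative, not the package's literal internals):

# Illustrative only: table and partition names are made up.
# CRAN version: scan S3 and register any missing partitions.
DBI::dbExecute(con_s3, "MSCK REPAIR TABLE revenue_predictions.mygame")

# dev version: register the specific partition directly; this is the
# statement that needs the glue:BatchCreatePartition permission.
DBI::dbExecute(con_s3,
               "ALTER TABLE revenue_predictions.mygame
                ADD IF NOT EXISTS PARTITION (year = '2020', month = '02', day = '17')")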

Are you able to ask your engineering team to give you this permission?

Yep, I'm going to ask them that this morning.