Empty queue of post requests on plumber API running on GCR

Hi,

I am running a plumber API on Google Cloud Run, triggered by a GitHub webhook event. I want to run the API one time, do the work, and then delete the concurrent calls that arrived and are waiting in the queue while the computation was still running.

I will receive a lot of post requests in a short amount of time and then nothing for a longer period of time. My main aim is to run the calculation once for each batch of calls.

Is there a way to delete the queue of concurrent requests in R? Or do you have any other suggestions for a possible solution?

Thanks.

Is there a way to delete the queue of concurrent requests in R?

No. They must be managed.


Do you know how long the burst lasts? Ex: Everything happens in ~ 5 seconds.

Without getting into shared memory and forked processes, we can set up a flag to cause other routes that are being requested within X seconds to be ignored / fail.

I believe we could lower the window time down to 1 second and that would state that "any route that will attempt to be processed within 1 second after the first successful processing will fail." This would include all routes currently in the queue and any new routes within that time period, as they would flush through fairly quickly.

That being said, it doesn't hurt to make the window something larger, like 60 seconds.

# plumber.R

cache <- list()

do_work <- function(id) {
  cat("processing id: ", id, "\n")
  str(cache)
  Sys.sleep(10)
  ans <- list(data = runif(10))
  cat("done with  id: ", id, "\n")
  ans
}

#' @param id identification value to use when processing
#' @param window number of seconds after the first request finishes to not accept any more requests for a given id
#' @get /frontheavy
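function(id = "abc", window = 10) {
  # query-string values arrive as character strings; later::later() needs a numeric delay
  window <- as.numeric(window)
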
  if (!is.null(cache[[id]])) {
    stop("already processing id: ", id, call. = FALSE)
  }

  # do the processing
  ans <- do_work(id)

  # like a semaphore, block the id from processing for a minimum amount of time
  cache[[id]] <<- later::later(function() {
    # Unblock the id
    cache[[id]] <<- NULL
  }, delay = window)

  ans
}


Your solution would work as well! Yours is cleaner in that it will only stop requests that have queued up. (The only downside of the solution is that there is no way to distinguish between different ID values, but it doesn't seem like that is a necessity.)

The filter logic could be moved into the route if you don't want all of your routes to do this checking logic.
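As a rough sketch of what moving the check into the route could look like, here is the same test written as a plain R function so it can be run without plumber. `handle_push()` and its `work` argument are hypothetical names used only for illustration; in the real API this would be the body of the `@post` route, with `push_time` read from the request.

```r
# Stands in for the global defined next to plumb(); 0 means "never run yet"
last_run <- 0

# Hypothetical helper holding what would be the body of the @post route.
# `work` is a placeholder for the real computation.
handle_push <- function(push_time, work = function() NULL) {
  # Same test as the filter: reject pushes made before the last run finished
  if (last_run != 0 && push_time <= last_run) {
    return(list(
      status = 503L,
      error = "Service unavailable until previous calculation is finished"
    ))
  }
  work()
  # Record when this run finished so queued requests can be rejected
  last_run <<- as.numeric(Sys.time())
  list(status = 200L, last_run = last_run)
}

first  <- handle_push(push_time = 100)  # accepted: nothing has run yet
second <- handle_push(push_time = 100)  # rejected: pushed before the last run finished
third  <- handle_push(push_time = as.numeric(Sys.time()) + 60)  # accepted: pushed after the last run
```

Only the route that carries this check is gated; other routes keep answering as usual.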


When setting the last_run value, I would feel free to add some extra time (10 seconds? 5 minutes?) to avoid having your burst actually produce 2+ events and not 1 event. (Such as when trying to debounce requests and it ends up looking like throttling due to a short timeout.)

Note, be sure to use <<- when setting last_run in your route. (Otherwise it will only modify locally and the new value will be forgotten.)
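A small illustration of the two points above, assuming `last_run` lives in the global environment. The function names and the `padding` argument are made up for this sketch and are not part of plumber.

```r
last_run <- 0

set_local <- function() {
  # `<-` creates a new variable inside the function; the outer last_run is untouched
  last_run <- as.numeric(Sys.time())
  invisible(NULL)
}

set_global <- function(padding = 10) {
  # `<<-` updates the existing last_run in the enclosing environment.
  # `padding` (seconds, hypothetical) extends the blocked window past the end of the work.
  last_run <<- as.numeric(Sys.time()) + padding
  invisible(NULL)
}

set_local()
after_local <- last_run   # still 0

set_global()
after_global <- last_run  # a timestamp roughly 10 seconds in the future
```

With the padding in place, a push that lands just after the work finishes is still treated as part of the same burst.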


Thank you for the detailed response. The processing does not take long; it varies with the number of new data points, but I can put in a sleep command for up to 5 minutes. After that, the API should listen for further calls. I am not familiar with your proposed solution. How is it different from using a filter function and tracking the time of the execution and the time of the call?

# The object last_run is defined as 0 in the entrypoint file containing plumb()
#* @filter checkTime
function(req, res){
  webhook_payload <- jsonlite::fromJSON(req$postBody)
  push_time <- webhook_payload$repository$pushed_at
  if (last_run != 0 && push_time <= last_run) {
    res$status <- 503
    return(
      list(
        error="Service unavailable until previous calculation is finished"
        )
      )
  } else {
    plumber::forward()
  }
}

#* Receive pub/sub message
#* @post /pubsub
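function(req){
  # Re-read the push time here: variables local to the filter are not visible in the route
  push_time <- jsonlite::fromJSON(req$postBody)$repository$pushed_at
  Sys.sleep(120)
  # Do the work here
  # Use <<- so the update reaches the last_run that the filter reads
  last_run <<- as.numeric(Sys.time())
  list(last_run = last_run, push_time = push_time)
}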
