Plumber API + Scheduled RMarkdown ETL

I quite like the simplicity of using scheduled RMarkdown documents in RStudio Connect to create simple ETL flows for subsequent document renders or even Shiny apps. @kellobri describes this process here, and suggests using reactivePoll or reactiveFileReader to allow Shiny apps to detect changes in a data product and act accordingly.

Is there a similar or analogous workflow for Plumber APIs? I would like my Plumber API to detect changes in a source data set produced by the ETL method above and reload the data over HTTP.

I can do this naively by loading the data every time my Plumber API is called, but that is no good when the data are large. I think the solution is to put the code that loads the data outside the API endpoint functions in my plumber.R file, but then how would I get Plumber to restart when the data changes?


Hi @jeff.keller! I'm not quite sure if our problems were like-for-like but they sound pretty similar so let me describe what we recently did in our organisation.

  1. We had deployed a Plumber API that calculates certain attributes over raw data passed to it; these attributes are used in one of our ML models

  2. We wanted to process a large group of clients (a couple thousand) through that API and save the results in a DB for further consumption (analytics etc.)

  3. We built a parameterised Rmd report that did the following:

  • pulled a set of users with their data that was supposed to be pushed into the API

  • made an API call for each of those users to get the response

  • saved the outcome to DB

  4. A couple of important points to mention here:
  • when we first imported data from the DB, we chunked it into groups for two reasons: 1) we used the furrr package to make requests to the API from multiple processes in parallel, which sped things up, and 2) it was much faster to append each group to the table in the DB than to write the entire output at once

  • our Rmd file is parameterized, and when it's first deployed to RSC it doesn't render. That's important because we want the entire procedure to run on the server rather than locally. Thanks to this, it deploys in seconds; then on RSC you can just tick the right parameter and render it, and the job completes in approximately 1 hour
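The chunked, parallel request pattern described above can be sketched roughly as follows. The endpoint URL, connection object, and table name are placeholders (not the actual setup), and `chunk_ids()` is a helper name I made up:

```r
# Pure helper: split a vector of user IDs into groups of at most `size`
chunk_ids <- function(ids, size) {
  split(ids, ceiling(seq_along(ids) / size))
}

# Sketch of the chunked loop (commented out: needs furrr, an API, and a DB):
# library(furrr)
# plan(multisession)                       # several R processes behind the requests
# for (grp in chunk_ids(user_ids, 500)) {
#   results <- future_map_dfr(grp, function(id) {
#     httr::content(httr::POST("https://api.example.com/score",
#                              body = list(user = id), encode = "json"))
#   })
#   DBI::dbAppendTable(con, "scores", results)  # append each chunk as it completes
# }
```

Appending per chunk also means a mid-run failure loses only the current group, not the whole job.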

Hope this helps!


Thanks for the reply @konradino. You described a few things there, but I think the most relevant one is that you are using an external persistent database to store data, which is what I'm trying to avoid.

A minimal example of what I'm trying to do might be:

```r
# plumber.R

#' @apiTitle ETL Example

library(plumber)

constant <- read.csv("https://connect.example.com/content/1337/constant.csv")

#' Add
#' @param x A number to add to `constant`
#' @get /add
add <- function(x) {
  as.numeric(x) + constant  # query parameters arrive as strings
}
```

I want the Plumber API to restart (or somehow reload `constant`) when a change is detected at https://connect.example.com/content/1337/constant.csv

Oh, OK - I see now. We haven't come across something like this before, but let me know if you find a way of doing it :)

@jeff.keller - Something similar to the reactivePoll idea would be to perform a conditional GET of the data resource. Connect will return an HTTP 304 Not Modified response to a conditional GET request when the data is unchanged, and the data itself is not returned.

  1. Perform a GET to load your data as the API initializes. Save the value of the Last-Modified header.
  2. Perform a GET request with the If-Modified-Since header set to the value from the previous Last-Modified response header.
  3. When the data is unchanged, you will receive an HTTP 304 Not Modified and should take no action.
  4. When the data is changed, you will receive an HTTP 200 response with your content. Act on that data. Save the value of the Last-Modified header for subsequent attempts.

Depending on how immediately you want to react to data changes, you could perform the conditional GET every time your API is hit, every N requests, or after some configured amount of time has passed.
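Those four steps might look roughly like this with httr. The URL is a placeholder, and `cond_headers()` is a helper name of my own, not an httr function:

```r
# Pure helper: build the If-Modified-Since header from the stored value
cond_headers <- function(last_modified) {
  if (is.null(last_modified)) list() else list(`If-Modified-Since` = last_modified)
}

# Sketch of the polling step (commented out: needs httr and a live resource):
# library(httr)
# last_modified <- NULL
# resp <- GET("https://connect.example.com/content/1337/constant.csv",
#             do.call(add_headers, cond_headers(last_modified)))
# if (status_code(resp) == 200) {
#   data <- content(resp)                              # changed: act on the data
#   last_modified <- headers(resp)[["last-modified"]]  # save for the next poll
# }                                                    # 304: take no action
```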

The httr::cache_info() and httr::rerequest() functions can help with that reloading.


I don't think what you want can be done by Plumber, since Plumber starts new instances for API requests automatically, if I understand it correctly. Is your data set really that big, or does it just take long to parse? If the latter, just cache it as an .rds file; those load pretty quickly (feather might be even faster; look at benchmarks).

If you really want an in-memory solution, you could possibly try to use something like SQLite or the storr and/or redux packages to manage an in-memory data structure independently from the plumber API. I am not 100% sure how to go about that though.
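A minimal sketch of the storr idea, assuming an RDS-backed store on disk shared between the ETL process and the API (the path and key are illustrative):

```r
library(storr)

# ETL side: write the refreshed object to a shared on-disk store
st <- storr_rds(file.path(tempdir(), "etl-store"))
st$set("constant", 42)

# API side: each endpoint call reads the current value cheaply
current <- st$get("constant")
```

The API never holds a stale copy in its own globals; it just reads the latest value per request (or per poll) from the store.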

In both cases I would just regularly check whether the data was modified (say every second, every 10 seconds, or whenever a GET request for the data arrives).

@aron I ended up doing something similar to what you suggested. I'm not a fan of modifying environments this way, so if you have any suggestions to clean this up, I'm all ears!

The implication of this approach is that the first API request after the data source has changed is going to be slower (perhaps significantly) than all others. Depending on the size of the data, there may even be HTTP timeouts to overcome.

ETL.rmd

---
title: "ETL"
author: "Jeff Keller"
date: "2019-06-14"
output: html_document
rmd_output_metadata:
  rsc_output_files:
    - "output/constant.rds"
---

This document represents an ETL process. In this minimal example, the output of the process is just a random number between 1 and 10. The output is available [here](output/constant.rds).

```{r etl}
constant <- sample(1:10, size = 1)
dir.create("output", showWarnings = FALSE)
saveRDS(constant, file = "output/constant.rds")
```
The current value of `constant` is `r constant` and it changes every 5 minutes.

plumber.R

```r
#' @apiTitle ETL Example
#' @apiDescription An example Plumber API that loads data from a scheduled
#'   RMarkdown document (https://connect.example.com/etl-poc/etl.html)

library(plumber)
library(httr)

# Load the API_KEY environment variable
API_KEY <- Sys.getenv("API_KEY")

last_mtime <- NULL

#' Add
#' @param x A number to add to `constant`
#' @get /add
add <- function(x) {
  loadData(last_mtime)
  return(as.numeric(x) + constant)
}

# Check whether the input data has changed, and reload it if so
loadData <- function(last_mtime) {

  resp <- httr::HEAD(
    url = "https://connect.example.com/etl-poc/output/constant.rds",
    add_headers(Authorization = paste("Key", API_KEY))
  )

  mtime <- headers(resp)[["last-modified"]]

  if (!identical(mtime, last_mtime)) {
    constant <- readRDS(
      file = url(
        description = "https://connect.example.com/etl-poc/output/constant.rds",
        headers = c(Authorization = paste("Key", API_KEY))
      )
    )
    assign("constant", value = constant, envir = .GlobalEnv)
    assign("last_mtime", value = mtime, envir = .GlobalEnv)
  }

}
```

@jeff.keller - Your solution is basically equivalent to what I was recommending.

I'm not seeing a way to avoid one request paying the cost of a data reload. It might be interesting to give Plumber a post-request hook that lets you perform update/cleanup operations without causing the current request pain.

Could you shift this idea into a Plumber issue if it sounds useful?

This issue is tracking the addition of some hooks, but not post-response.

The "onexit" hooks are invoked as the Plumber server exits, not as an endpoint wraps up.

CC @barret


Well, you can just regularly check yourself whether the data was modified; either with a cron job on the server, or by sending GET (or better, HEAD) requests yourself. A "real" user would only have to pay the cost of the data reload if the data changed just before they requested it. How well this works depends on how frequently the data changes and how costly it is to load.
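One way to do that periodic check inside the Plumber process itself is a self-rescheduling callback with the later package; `check_for_update()` here is a placeholder for whatever conditional GET/HEAD check you use:

```r
library(later)

# Self-rescheduling background poll; runs whenever the R event loop is idle
poll_data <- function(interval = 10) {
  # check_for_update()  # placeholder: e.g. a HEAD request comparing Last-Modified
  later(function() poll_data(interval), delay = interval)  # reschedule itself
}
# poll_data()  # start polling; call once when the API initializes
```

One caveat: later callbacks only fire when R is idle, so a busy API may delay the check; the per-request check remains a useful fallback.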
