reactiveFileReader halts until other process is finished

I would like to continuously update and display a process log file in my Shiny app, please find replex below.
However, the log file is only displayed after the process is completed and not every 500 ms as I specified.

How can I let the app perform reactiveFileReader and the time intensive computational process in parallel?

slow.function = function() {
  # this function is not part of the shiny app,
  # it may be inside another R package 
  # or even a System command using non-R software.
  # so you cannot edit it.
  for (i in 1:10) {
    cat(paste0("status = ", i, "\n"))
    Sys.sleep(0.5)
  }
}

ui <- fluidPage(
  h3("My process log:"),
  htmlOutput("mylog")
)
server <- function(input, output, session) {
  # create temp log file
  logfile_tmp <- tempfile(fileext = ".log")
  # send temp log to UI twice per second
  mylog <- reactiveFileReader(500, NULL, logfile_tmp, readLines)
  output$mylog <- renderUI({
    HTML(paste(mylog(), collapse = '<br/>'))
  })
  # fill temp log file with the console output of some time intensive computation
  con <- file(logfile_tmp)
  sink(con, append = TRUE)
  sink(con, append = TRUE, type = "message")
  # start time intensive computation
  slow.function()
  sink()
  sink(type = "message")
}
shinyApp(ui, server)

I tried to do this with shiny promises / async.
Partial success I think, you do see reactive reads between the start and completion, and in my setup with non-async times to take 7.5 seconds to finish, the async version gives all the results within about half that time.

library(shiny)
library(future)
library(stringr)
library(promises)

plan(multisession)
num_to_do <-10

slowfunc <- function(x,file){
  
  Sys.sleep(.75)
  write(x=paste0("tofile status = ", x),file = file,append=TRUE)
  cat(paste0("tolog status = ", x, "\n"))

}

slowfunc.async <- function(x,file){
  cat(paste0("entered async with x of ", x,"\n"))
  x
  future_promise({
    Sys.sleep(.75)
  }) %...>%    {
    NULL
    write(x=paste0("tofile status = ", x),file = file,append=TRUE)
    cat(paste0("tolog status = ", x, "\n"))
    }
}


ui <- fluidPage(
  h3("My process logs:"), 
  verbatimTextOutput("t_elapse"),
  shiny::selectInput("useAsync","use Async?",choices = c(FALSE,TRUE)),
  actionButton("debug","debug"),
  actionButton("start","start"),
                  htmlOutput("mylog")

)
server <- function(input, output, session) {
  # create temp log file
  logfile_tmp <- tempfile(fileext = ".log")
  file.create(logfile_tmp)

  start_time <- reactiveVal(NULL)
  end_time <- reactiveVal(NULL)
  output$t_elapse <- renderPrint({

    s <- start_time()
    e <- end_time()
    if(isTruthy(s) && isTruthy(e))
    difftime(e,s) else {
      "press start"
    }
  })
  
  observeEvent(input$debug,
               browser())
  # send temp log to UI twice per second
  mylog <- reactiveFileReader(500, session, logfile_tmp, readLines)
  
  myloghtml <- reactive({
    slog <- sort(req(mylog()))
    HTML(paste(slog, collapse = '<br/>'))
  })
  output$mylog <- renderUI({
    req(myloghtml())
  })

  # some basis for judging completion of logging task
  observeEvent(mylog(), {
    if (identical(
      req(myloghtml()),
      HTML(paste0(paste0("tofile status = ", str_pad(1:num_to_do,2,pad = 0)),
        collapse = "<br/>"
      ))
    )) {
      end_time(Sys.time())
    }
  })
  observeEvent(input$start,
               {
    start_time(Sys.time())
    end_time(NULL)}
)
observeEvent (input$useAsync, # for resetting
             end_time(NULL))
  
  observeEvent(start_time(),{
    file.create(logfile_tmp)
    
  # fill temp log file with the console output of some time intensive computation
  # start time intensive computation
  for (i in 1:num_to_do) {
    ipad <- str_pad(i,2,pad = 0)
    if(input$useAsync)
      slowfunc.async(ipad,logfile_tmp)
    else
     slowfunc(ipad,logfile_tmp)
  }
  })}

shinyApp(ui, server)

Thanks, but I think my example should have been clearer. The lines:

for (i in 1:10) {
    cat(paste0("status = ", i, "\n"))
    Sys.sleep(0.5)
  }

as a whole are to simulate a slow external function that generates console messages as it progresses. It is not actually a loop where I can insert code to capture the status at each iteration of the loop. I will update the example code above to make this clearer.

is this closer ?

library(shiny)
library(future)
library(stringr)
library(promises)

plan(multisession)
num_to_do <-10

slowfunc <- function(file){
  cat(paste0("entered normal : ",file,"\n"))
  for (i in 1:num_to_do) {
    i2 <-  str_pad(i,width = 2,pad=0)
  Sys.sleep(.75)
  write(x=paste0("tofile status = ", i2 ),file = file,append=TRUE)

  cat(paste0("tolog status = ", i2, "\n"))
  
}}

slowfunc.async <- function(file){
  cat(paste0("entered async\n"))

  future_promise({
    for (i in 1:num_to_do) {
      i2 <-  str_pad(i,width = 2,pad=0)
    Sys.sleep(.75)
      write(x=paste0("tofile status = ", i2),file = file,append=TRUE)
      cat(paste0("tolog status = ", i2, "\n"))
  }}) %...>% {
    NULL
  }
}


ui <- fluidPage(
  h3("My process logs:"), 
  verbatimTextOutput("t_elapse"),
  shiny::selectInput("useAsync","use Async?",choices = c(FALSE,TRUE)),
  actionButton("debug","debug"),
  actionButton("start","start"),
  htmlOutput("mylog")
  
)
server <- function(input, output, session) {
  # create temp log file
  logfile_tmp <- tempfile(fileext = ".log")
  file.create(logfile_tmp)
  
  start_time <- reactiveVal(NULL)
  end_time <- reactiveVal(NULL)
  output$t_elapse <- renderPrint({
    
    s <- start_time()
    e <- end_time()
    if(isTruthy(s) && isTruthy(e))
      difftime(e,s) else {
        "press start"
      }
  })
  
  observeEvent(input$debug,
               browser())
  # send temp log to UI twice per second
  mylog <- reactiveFileReader(500, session, logfile_tmp, readLines)
  
  myloghtml <- reactive({
    slog <- sort(req(mylog()))
    HTML(paste(slog, collapse = '<br/>'))
  })
  output$mylog <- renderUI({
    req(myloghtml())
  })
  
  # some basis for judging completion of logging task
  observeEvent(mylog(), {
    if (identical(
      req(myloghtml()),
      HTML(paste0(paste0("tofile status = ", str_pad(1:num_to_do,2,pad = 0)),
                  collapse = "<br/>"
      ))
    )) {
      end_time(Sys.time())
    }
  })
  observeEvent(input$start,
               {
                 start_time(Sys.time())
                 end_time(NULL)}
  )
  observeEvent (input$useAsync, # for resetting
                end_time(NULL))
  
  observeEvent(start_time(),{
    file.create(logfile_tmp)
    
    # fill temp log file with the console output of some time intensive computation
    # start time intensive computation

      if(input$useAsync)
        slowfunc.async(logfile_tmp)
      else
        slowfunc(logfile_tmp)
    
    print("finished launching function that writes")
  })}

shinyApp(ui, server)

The external function should not write the log file. From within the shiny app I would like to capture the console message of the slow function and send them to both a log file and to the UI. The external function may even be some command line software written in a language other than R.

So, your function:

slowfunc.async <- function(file){
  cat(paste0("entered async\n"))

  future_promise({
    for (i in 1:num_to_do) {
      i2 <-  str_pad(i,width = 2,pad=0)
    Sys.sleep(.75)
      write(x=paste0("tofile status = ", i2),file = file,append=TRUE)
      cat(paste0("tolog status = ", i2, "\n"))
  }}) %...>% {
    NULL
  }
}

cannot have the line write(x=paste0("tofile status = ", i2),file = file,append=TRUE) in it.

ok, I get it, thats why you were sinking, whereas I removed sink and made it a file. as sink is a pain for me to turn on and off when developing and debugging.
If I have some time later, I'll try to rework the example with sink ...

Here you can find out why promises::future_promise might not be what you are searching for in this context (intra-session vs. inter-session responsiveness).

I'd propose using callr::r_bg (for async R processes) or processx::process (all processes - r_bg is based on processx) in this situation.

Setting reactiveFileReader's session parameter to NULL suggests that you want to create a cross-session file reader - this needs to be done outside of the server function. Please see my related post here.

library(shiny)
library(callr)

slow.function = function() {
  # this function is not part of the shiny app,
  # it may be inside another R package 
  # or even a System command using non-R software.
  # so you cannot edit it.
  for (i in 1:10) {
    cat(paste0("status = ", i, "\n"))
    Sys.sleep(0.5)
  }
}

# create temp log file
logfile_tmp <- tempfile(fileext = ".log")
# send temp log to UI twice per second
mylog <- reactiveFileReader(500, NULL, logfile_tmp, readLines)

ui <- fluidPage(
  h3("My process log:"),
  htmlOutput("mylog")
)

server <- function(input, output, session) {
  output$mylog <- renderUI({
    HTML(paste(mylog(), collapse = '<br/>'))
  })
  r_bg(func = function(slow.function){print(slow.function())}, args = list(slow.function), stdout = logfile_tmp, stderr = "2>&1")
  # or directly use processx::process instead of callr::r_bg
  print(logfile_tmp)
}
shinyApp(ui, server)

Thank you!

To others arriving here with the same question. Please find below a few additional tweaks I just made to the solution by @ismirsehregal. These additions copy the temp log file to the user once the process is finished.

library(shiny)
library(callr)

slow.function = function() {
  # this function is not part of the shiny app,
  # it may be inside another R package 
  # or even a System command using non-R software.
  # so you cannot edit it.
  for (i in 1:10) {
    cat(paste0("status = ", i, "\n"))
    Sys.sleep(0.5)
    if (i == 6) warning("\nWarning i = ", i, " should not be used")
  }
}

# create temp log file
stdout_tmp <- tempfile(fileext = ".log")
mylog <- reactiveFileReader(500, NULL, stdout_tmp, readLines)

ui <- fluidPage(
  h3("My process log:"),
  htmlOutput("mylog"),
)

server <- function(input, output, session) {
  output$mylog <- renderUI({
    HTML(paste(mylog(), collapse = '<br/>'))
  })
  x <- r_bg(func = function(slow.function){slow.function()},
              args = list(slow.function),
              stdout = stdout_tmp,
              stderr = "2>&1")
  observe({
    invalidateLater(5000)
    if(isolate(x$is_alive()) == FALSE) {
      file.copy(from = stdout_tmp, to = "copy_for_user.log")     
      #Note: "copy_for_user.log" is copied to the working directory, replace by specific path if needed
    }
  })
}
shinyApp(ui, server)

Just a minor comment on your addition:

This is the recommended way to check for the process being ready:

 observe({
    invalidateLater(5000)
    if(x$poll_io(0)[["process"]] == "ready") {
      file.copy(from = stdout_tmp, to = "copy_for_user.log", overwrite = TRUE)     
      #Note: "copy_for_user.log" is copied to the working directory, replace by specific path if needed
    }
  })

Also note that you might want to set overwrite = TRUE for file.copy.

Cheers

1 Like

This topic was automatically closed 7 days after the last reply. New replies are no longer allowed.

If you have a query related to it or one of the replies, start a new topic and refer back with a link.