Capture streaming JSON over websocket

Hello, I'm working with a data source that streams json over a websocket.

I'd like to capture this and save it somewhere, maybe eventually in a db.

I've been trying to get up to speed on both the websocket and jsonlite packages and to also better understand R connection objects generally, as covered by Jeroen Ooms in this helpful SO post.

A seemingly natural solution would be to use jsonlite::stream_in(), which expects a connection object supplying json. However, the object returned by websocket::WebSocket() is not that.

My question: how can I best capture the json stream?

And fwiw, my use case streams A LOT of data at high frequency.

library("websocket")
library("jsonlite")

# ws is an R6 object of class 'WebSocket'
ws <- websocket::WebSocket$new("ws://echo.websocket.org/", autoConnect = FALSE)

ws$onMessage(function(event) {
  cat(event$data)
  ## instead of cat'ing to console, I want to stream the json somewhere useful
  ## my sense is that this should happen within this event handler
})

ws$connect()

for (i in 1:10) {
  ws$send(jsonlite::toJSON(data.frame(msg = i)))
}
#> Error in wsSend(private$wsObj, msg): invalid state

ws$close()

Created on 2018-10-24 by the reprex package (v0.2.1)
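
The "invalid state" error in the reprex above most likely comes from calling ws$send() before the connection has finished opening, since ws$connect() returns immediately. One way around it, sketched here under that assumption, is to do the sending inside an onOpen handler. send_when_open is a hypothetical helper name; it relies only on the onOpen() and send() methods of the WebSocket object.

```r
# Hypothetical helper: send payloads only once the socket reports it is open.
# `ws` is assumed to be a websocket::WebSocket created with autoConnect = FALSE,
# so that the handler is registered before ws$connect() is called.
send_when_open <- function(ws, payloads) {
  ws$onOpen(function(event) {
    for (p in payloads) {
      ws$send(p)
    }
  })
  invisible(ws)
}

# Intended use (not run here, it needs a reachable server):
# ws <- websocket::WebSocket$new("ws://echo.websocket.org/", autoConnect = FALSE)
# send_when_open(ws, sprintf('{"msg":%d}', 1:10))
# ws$connect()
```

The same reasoning applies to ws$close(): calling it right after ws$connect() may close the socket before any message has arrived.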

Session info
devtools::session_info()
#> Session info -------------------------------------------------------------
#>  setting  value                       
#>  version  R version 3.5.1 (2018-07-02)
#>  system   x86_64, darwin15.6.0        
#>  ui       X11                         
#>  language (EN)                        
#>  collate  en_US.UTF-8                 
#>  tz       America/Chicago             
#>  date     2018-10-24
#> Packages -----------------------------------------------------------------
#>  package   * version    date       source                            
#>  backports   1.1.2      2017-12-13 CRAN (R 3.5.0)                    
#>  base      * 3.5.1      2018-07-05 local                             
#>  compiler    3.5.1      2018-07-05 local                             
#>  datasets  * 3.5.1      2018-07-05 local                             
#>  devtools    1.13.6     2018-06-27 CRAN (R 3.5.0)                    
#>  digest      0.6.15     2018-01-28 CRAN (R 3.5.0)                    
#>  evaluate    0.10.1     2017-06-24 CRAN (R 3.5.0)                    
#>  graphics  * 3.5.1      2018-07-05 local                             
#>  grDevices * 3.5.1      2018-07-05 local                             
#>  htmltools   0.3.6      2017-04-28 CRAN (R 3.5.0)                    
#>  jsonlite  * 1.5        2017-06-01 CRAN (R 3.5.0)                    
#>  knitr       1.20       2018-02-20 CRAN (R 3.5.0)                    
#>  later       0.7.5      2018-09-18 cran (@0.7.5)                     
#>  magrittr    1.5        2014-11-22 CRAN (R 3.5.0)                    
#>  memoise     1.1.0      2017-04-21 CRAN (R 3.5.0)                    
#>  methods   * 3.5.1      2018-07-05 local                             
#>  R6          2.2.2      2017-06-17 CRAN (R 3.5.0)                    
#>  Rcpp        0.12.19    2018-10-01 cran (@0.12.19)                   
#>  rlang       0.2.2      2018-08-16 cran (@0.2.2)                     
#>  rmarkdown   1.9        2018-03-01 CRAN (R 3.5.0)                    
#>  rprojroot   1.3-2      2018-01-03 CRAN (R 3.5.0)                    
#>  stats     * 3.5.1      2018-07-05 local                             
#>  stringi     1.2.2      2018-05-02 CRAN (R 3.5.0)                    
#>  stringr     1.3.1      2018-05-10 CRAN (R 3.5.0)                    
#>  tools       3.5.1      2018-07-05 local                             
#>  utils     * 3.5.1      2018-07-05 local                             
#>  websocket * 0.0.0.9001 2018-09-21 Github (rstudio/websocket@ef78ced)
#>  withr       2.1.2      2018-03-15 CRAN (R 3.5.0)                    
#>  yaml        2.1.19     2018-05-01 CRAN (R 3.5.0)

Possible solution patterns I've considered:

  1. turn the ws object above into a connection object, which is what jsonlite::stream_in() expects
  2. manually parse the json and append it to some object / insert it into a db (I've done the former, but it seems inefficient and non-scalable)

Any guidance would be appreciated.
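
Pattern 1 can be approximated without turning the WebSocket object itself into a connection: base R's textConnection() wraps a character vector in a connection that jsonlite::stream_in() accepts. A minimal sketch, assuming each message is one complete JSON object on a single line (NDJSON style). parse_json_batch and the sample messages are made up for illustration; in practice the vector would be filled from event$data inside ws$onMessage().

```r
library(jsonlite)

# Hypothetical helper: parse a batch of JSON messages into a data frame.
# `messages` is a character vector with one complete JSON object per element.
parse_json_batch <- function(messages) {
  con <- textConnection(messages)
  # textConnection() returns an already open connection, so we close it ourselves
  on.exit(close(con))
  jsonlite::stream_in(con, verbose = FALSE)
}

# Simulated messages standing in for the event$data values
messages <- c(
  '{"msg":1,"src":"a"}',
  '{"msg":2,"src":"b"}',
  '{"msg":3,"src":"a"}'
)
batch <- parse_json_batch(messages)
```

Note that jsonlite::toJSON(data.frame(...)), as used in the reprex, emits a JSON array per message rather than a single object, so such payloads would need unwrapping before this works as shown.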

Small update: I have cross-posted a similar version of this question on SO and will link to a solution there, should one be forthcoming.

Have you validated the JSON data? If not, you can use these tools to do that.

JSON Parser
JSON Validator

I wonder if your use case would be a good fit for the new streaming feature in sparklyr. Spark streaming is definitely something that could be used to do what you want, and the latest version of the sparklyr package includes it: https://spark.rstudio.com/guides/streaming/

However, stream_read_json supports the hdfs, s3a and file protocols only. I don't know if it would be worthwhile to write the websocket data to one of these and then read it from Spark.

I just wanted to share these thoughts, as I am curious to know whether this could be useful here. I'm not really familiar with websockets, sorry. If I find something else, I'll come back.
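
The idea of landing the websocket data on the file protocol could look roughly like the sketch below: each parsed batch is written as its own newline-delimited JSON file into a directory that a file-based stream reader is pointed at. write_batch_file and the directory name are hypothetical, and whether stream_read_json would consume these files as intended has not been tested here.

```r
library(jsonlite)

# Hypothetical helper: write one batch (a data frame) as an NDJSON file
# with a unique name inside `dir`, and return the file path.
write_batch_file <- function(df, dir) {
  dir.create(dir, showWarnings = FALSE, recursive = TRUE)
  path <- tempfile(pattern = "batch-", tmpdir = dir, fileext = ".json")
  con <- file(path, open = "w")
  # the connection is opened here, so it is closed here as well
  on.exit(close(con))
  jsonlite::stream_out(df, con, verbose = FALSE)
  path
}

# Example with a small made-up batch
out_dir <- file.path(tempdir(), "ws-json-stream")
df <- data.frame(msg = 1:3, src = c("a", "b", "a"), stringsAsFactors = FALSE)
path <- write_batch_file(df, out_dir)
```

One file per batch keeps each write small and avoids appending to a file that a reader may already have picked up.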

Did you ever find a solution to this? I'm trying to solve the same issue.

I sort of solved it myself in an ad hoc way: parsing the incoming json and building up a data frame in memory (e.g. purrr::map_df(.x = new_json, .f = some_parser_fn)), which is then written to a local db when the data frame exceeds a certain row threshold (if you follow this pattern, you might wish to tune the threshold, depending on the resource / CPU / memory constraints you face).
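
A rough sketch of that buffer-and-flush pattern, not the exact code used: raw messages are collected in a closure and parsed in one go with jsonlite::stream_in() (swapped in here for the purrr::map_df() call) once a threshold is reached. make_batcher, flush_fn and the threshold value are made-up names and settings, and each message is assumed to be a single-line JSON object. In real use flush_fn might wrap something like DBI::dbAppendTable().

```r
library(jsonlite)

# Hypothetical helper: returns add() and flush() functions sharing one buffer.
# flush_fn receives each parsed batch as a data frame.
make_batcher <- function(flush_fn, threshold = 1000L) {
  buf <- character(threshold)  # preallocated to avoid growing on every message
  n <- 0L
  flush <- function() {
    if (n == 0L) return(invisible(0L))
    con <- textConnection(buf[seq_len(n)])
    on.exit(close(con))
    flush_fn(jsonlite::stream_in(con, verbose = FALSE))
    flushed <- n
    n <<- 0L
    invisible(flushed)
  }
  add <- function(msg) {
    n <<- n + 1L
    buf[n] <<- msg
    if (n >= threshold) flush()
    invisible(NULL)
  }
  list(add = add, flush = flush)
}

# Demo with simulated messages; `written` stands in for the database
written <- list()
batcher <- make_batcher(
  flush_fn = function(df) written[[length(written) + 1L]] <<- df,
  threshold = 3L
)
for (i in 1:7) batcher$add(sprintf('{"msg":%d}', i))
batcher$flush()  # write out whatever is left below the threshold

# Inside the handler this would become:
# ws$onMessage(function(event) batcher$add(event$data))
```

Parsing per batch rather than per message keeps the work done inside the event handler small, which matters when messages arrive frequently.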

But I'm in the process of building something more robust and am taking @cderv's advice above by learning Spark / sparklyr. I'm especially interested in using Apache Arrow as a pre-db-write step in order to have access to the most recent in-memory data.

So what code did you use within the onMessage event? Or is that not how you resolved it?

If your question's been answered (even by you!), would you mind choosing a solution? It helps other people see which questions still need help, or find solutions if they have similar problems.
