streaming updates to df

Q: Given a starting df and subsequent incoming batches of updated data, how can I efficiently incorporate this new data into a tidy final form?

eg

## our starter data template
starting_df <- data.frame(
  user = c("Amy", "Bob", "Carl"),
  timestamp = c(1, 1, 1),
  location = c(state.name[1:2] , NA),
  mood = c(NA, NA, "happy")
)


## we get two pieces of new info on Amy, one on Bob, but nothing on Carl
data_update <- data.frame(
  user = c("Amy", "Amy", "Bob"),
  timestamp = c(2,3,3),
  location = c(state.name[49:50] , NA),
  mood = c("sad", "happy", "sad")
)


desired_ending_df <-  data.frame(
  user = c("Amy", "Bob", "Carl"),
  timestamp = c(3,3,1),
  location = c(state.name[c(50,2)] , NA),
  mood = c("happy", "sad", "happy")
)

I can double loop, testing each element, row by column, but it's super ugly, tedious, and computationally expensive (in reality, the streams are frequent and much larger than this toy). I know there must be a way to use a grouping variable + some sort of purrr::mapping, but the details are tripping me up.

Ps: it seems outside the scope of this reprex, but IRL the incoming data to generate data_update is JSON streaming over a websocket which I then assemble into a df, in case that detail informs anyone's advice.

Thanks in advance for any help.

I'm very much looking to be corrected on this, but I would say that purrr is not a good tool for that. With any non-trivial amount of data you'll soon see that you are required to create new objects all over the place and it will be very slow.

This will happen because one of the cornerstones of how tidyverse works in general is immutable data. Whenever you update any vector (and updating rows in a dataframe is extremely expensive in R), you'll need to create a copy.

So I would say that it makes sense to take a look at, e.g., data.table since it modifies data in place and each subsequent update is quite cheap.

1 Like

I always do something like

possible_ending_df <- starting_df %>%
  bind_rows(data_update) %>%
  group_by(user) %>%
  filter(timestamp == max(timestamp))
# N.B user needs to be a character, not a factor, otherwise this breaks

or

possible_ending_df_2 <- bind_rows(
  starting_df %>% anti_join(data_update, by = "user"),
  data_update %>% group_by(user) %>% filter(timestamp == max(timestamp)) %>% ungroup()
)

But both of those become quite slow with big datasets and also can be tripped up if there are duplicates (rows with the same timestamp for the same user).

Epilogue: For various reasons, I ended up using data.table:

library(data.table)

# append updated data
new_df <- rbind(starting_df, data_update)

# convert to DT
new_df <- as.data.table(new_df)

# group by user (the key) and take the last non NA value
new_df[ , lapply(.SD, function(x) tail( x[!is.na(x)], 1), by = user]

This topic was automatically closed 7 days after the last reply. New replies are no longer allowed.