*edited to incorporate clarifications in the comments below
I work with a remote Postgres database to which I write a fairly large number of records. The datasets I work with change over time and require upkeep, with daily inserts/updates ranging from the thousands to the low millions of rows.
I understand that R may not be the best tool for the job, but I was wondering if anyone could provide some feedback on what the most efficient insert/update process would look like. Below is a reprex that requires Postgres 9.5 or later (to take advantage of the ON CONFLICT clause), preferably against a remote instance so the performance I'm seeing can be reproduced.
The insert/update of the ~17.5k records shown below takes ~135 seconds for me. When I run the same code from a VM sitting next to the database, it takes ~5 seconds. This suggests to me that dbBind is sending the upsert one row at a time and waiting for a response after each row, which the round-trip time over the wire exacerbates.
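(A rough way to sanity-check that theory, using the con, upsert_sql, and df objects defined in the reprex below, would be to bind progressively larger slices of the data and see whether elapsed time grows roughly linearly with the row count; untested sketch, wrapped in a transaction that gets rolled back so it leaves no data behind:)

# rough latency check: if each bound row costs one network round trip,
# elapsed time should grow roughly linearly with the number of rows bound
dbBegin(con)
for (n in c(10, 100, 1000)) {
  res <- dbSendStatement(con, upsert_sql)
  elapsed <- system.time(dbBind(res, unname(head(df, n))))[["elapsed"]]
  dbClearResult(res)
  print(c(rows = n, seconds = elapsed))
}
dbRollback(con)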
In the example below I've also written a python script that executes the same upsert remotely using the py-postgresql library. Its prepared-statement load_rows() method performs the upsert in bulk and completes in ~5 seconds. Are bulk upserts (or bulk updates) like this possible in pure R using the DBI parameterized framework?
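For comparison, the only pure-DBI workaround I can think of is to bulk-load the rows into a staging table and then run the upsert as a single server-side statement; that avoids the per-row binds but also abandons the parameterized dbBind flow entirely. Rough, untested sketch (the staging table name is made up, and I'm assuming RPostgres's dbWriteTable can create a temporary table via temporary = TRUE):

# stage the rows with a single bulk write, then upsert server-side in one statement
dbWriteTable(con, "speed_test_stage", df, temporary = TRUE, overwrite = TRUE)
dbExecute(con, "
  INSERT INTO speed_test (eff_date, item, report_center, value)
  SELECT eff_date, item, report_center, value FROM speed_test_stage
  ON CONFLICT (eff_date, item, report_center)
  DO UPDATE SET value = EXCLUDED.value, insert_timestamp = CURRENT_TIMESTAMP")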
db_insert_fun.py
import pandas
import postgresql

def db_insert_fun(load_sql, df):
    with postgresql.open("pq://db_user@db_host/db_name") as db:
        with db.xact():
            print('SQL to prepare:')
            print(load_sql)
            # prepare the parameterized query
            ps = db.prepare(load_sql)
            # turn the data frame into a list of row tuples
            out_rows = []
            for r in df.iterrows():
                row = r[1]
                # build a tuple of column values in column order
                t = ()
                for c in df.columns:
                    t = t + (row[c], )
                out_rows.append(t)
            # send all rows to the database
            ps.load_rows(out_rows)
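As far as I can tell from the py-postgresql docs, load_rows() ships the parameter sets to the server in batches rather than issuing one round trip per row, which would explain why it doesn't hit the latency problem described above.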
db_speed_test.R
# Database insert/update speed test
library(tidyverse)
library(RPostgres)
library(glue)
#>
#> Attaching package: 'glue'
#> The following object is masked from 'package:dplyr':
#>
#> collapse
library(reticulate)
# remote database connection details
con <- dbConnect(drv = Postgres(),
                 dbname = "db_name",
                 host = "db_host",
                 port = 5432,
                 user = "db_user")
# build test dataframe
df <- tibble(eff_date = seq(Sys.Date() - 25, to = Sys.Date(), by = "day"),
             item = letters,
             report_center = LETTERS) %>%
  expand(eff_date, item, report_center) %>%
  mutate(value = rnorm(17576, mean = 0, sd = 1000))
# copy the dataframe to the database under the name 'speed_test'
copy_to(con, df, "speed_test", unique_indexes = list(c("eff_date", "item", "report_center")), temporary = FALSE)
# add an insert_timestamp column
dbExecute(con, "ALTER TABLE speed_test ADD COLUMN insert_timestamp timestamp with time zone")
#> [1] 0
# pre upsert table
tbl(con, "speed_test")
#> # Source: table<speed_test> [?? x 5]
#> # Database: postgres [dbc_write@yoda.deepbasin.capital:5432/dbc]
#> eff_date item report_center value insert_timestamp
#> <date> <chr> <chr> <dbl> <dttm>
#> 1 2018-10-11 a A -750. NA
#> 2 2018-10-11 a B 244. NA
#> 3 2018-10-11 a C 1230. NA
#> 4 2018-10-11 a D -582. NA
#> 5 2018-10-11 a E 1411. NA
#> 6 2018-10-11 a F 509. NA
#> 7 2018-10-11 a G 1356. NA
#> 8 2018-10-11 a H 498. NA
#> 9 2018-10-11 a I -934. NA
#> 10 2018-10-11 a J 69.7 NA
#> # ... with more rows
# data changes and adds new records since initial load
df <- df %>%
  mutate(value = value + rnorm(nrow(df))) %>%
  bind_rows(tibble(
    eff_date = Sys.Date() + 1,
    item = letters[1],
    report_center = LETTERS[1],
    value = rnorm(1, mean = 0, sd = 1000)))
# build parameter placeholders ($1..$n) and column names from the dataframe
col_index <- glue_collapse(glue("${index}", index = 1:length(df)), ",")
col_names <- glue_collapse(names(df), ",")
# assign (optionally composite) unique index
u_index <- glue_collapse(c("eff_date", "item", "report_center"), ",")
upsert_sql <- glue(
"INSERT INTO speed_test ({col_names}) VALUES ({col_index})
ON CONFLICT ({u_index})
DO UPDATE SET ({col_names},insert_timestamp) = ({col_index},CURRENT_TIMESTAMP)")
# R function that takes upsert_sql and df
r_db_insert_fun <- function(upsert_sql, df) {
  dbBegin(con)
  results <- dbSendStatement(con, upsert_sql)
  print(dbGetStatement(results))
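  # dbBind() expects an unnamed list with one element per placeholder,
  # each holding the full vector of values for that parameter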
  out_rows <- df %>% unname
  dbBind(results, out_rows)
  print(glue("Rows Inserted/Updated: {row_count}", row_count = dbGetRowsAffected(results)))
  dbClearResult(results)
  dbCommit(con)
}
# load the python insert function via reticulate (it is called below)
source_python("db_insert_fun.py")
# apply r database insert function
system.time(r_db_insert_fun(upsert_sql, df))
#> INSERT INTO speed_test (eff_date,item,report_center,value) VALUES ($1,$2,$3,$4)
#> ON CONFLICT (eff_date,item,report_center)
#> DO UPDATE SET (eff_date,item,report_center,value,insert_timestamp) = ($1,$2,$3,$4,CURRENT_TIMESTAMP)
#> Rows Inserted/Updated: 17577
#> user system elapsed
#> 0.47 0.69 137.54
# apply python database insert function
system.time(db_insert_fun(upsert_sql, df))
#> user system elapsed
#> 2.86 0.93 5.07
# table post upsert(s)
tbl(con, "speed_test")
#> # Source: table<speed_test> [?? x 5]
#> # Database: postgres [dbc_write@yoda.deepbasin.capital:5432/dbc]
#> eff_date item report_center value insert_timestamp
#> <date> <chr> <chr> <dbl> <dttm>
#> 1 2018-10-11 a A -751. 2018-11-05 10:05:50
#> 2 2018-10-11 a B 245. 2018-11-05 10:05:50
#> 3 2018-10-11 a C 1229. 2018-11-05 10:05:50
#> 4 2018-10-11 a D -582. 2018-11-05 10:05:50
#> 5 2018-10-11 a E 1411. 2018-11-05 10:05:50
#> 6 2018-10-11 a F 511. 2018-11-05 10:05:50
#> 7 2018-10-11 a G 1355. 2018-11-05 10:05:50
#> 8 2018-10-11 a H 499. 2018-11-05 10:05:50
#> 9 2018-10-11 a I -935. 2018-11-05 10:05:50
#> 10 2018-10-11 a J 68.8 2018-11-05 10:05:50
#> # ... with more rows
# drop speed_test table
dbExecute(con, "DROP TABLE speed_test")
#> [1] 0
dbDisconnect(con)
Created on 2018-11-05 by the reprex package (v0.2.1.9000)