Postgres Parameterized Insert/Update Optimization

*edited to incorporate clarifications in the comments below

I work with a remote Postgres database to which I write a fairly large number of records. The datasets that I work with change over time and require upkeep, with daily record inserts/updates ranging from the thousands to the low millions.

I understand that R may not be the best tool for the job, but I was wondering if anyone could provide some feedback on what the most efficient insert/update process would look like. Below is a reprex that requires a Postgres 9.5 or later database (to take advantage of the 'on conflict' clause), preferably a remote instance to reproduce the performance I'm experiencing.

The insert/update of the ~17.5k records I show below takes ~135 seconds for me. When I run the same code from a VM sitting next to the database, execution takes ~5 seconds. This suggests to me that dbBind is sending the upsert one row at a time and waiting for a response to each, which is exacerbated by the time it takes to send data over the wire.
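
As a rough sanity check on that reading, the per-row cost implied by the timings above can be worked out directly. This is only back-of-the-envelope arithmetic on the numbers quoted in this post; the variable names are illustrative, and the idea that each row costs exactly one round trip is an assumption, not a measurement.

```python
# Back-of-the-envelope check using the approximate timings quoted above.
rows = 17_577            # rows upserted in the reprex
remote_seconds = 135.0   # approximate elapsed time from the remote client
local_seconds = 5.0      # approximate elapsed time from the VM next to the database

# Assumption: one network round trip per row, so the extra time is pure latency.
extra_ms_per_row = (remote_seconds - local_seconds) / rows * 1000
print(f"implied extra latency per row: {extra_ms_per_row:.1f} ms")
```

A few milliseconds of extra time per row is at least plausible for a network round trip, which fits the row-at-a-time explanation without proving it.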

In the example below I've created a Python script that executes the same upsert remotely using Python's postgresql library. Its load_rows method lets the script perform a bulk upsert, which completes in ~5 seconds. Are bulk upserts (or bulk updates) possible in pure R using the DBI parameterized framework?

db_insert_fun.py

import pandas
import postgresql

def db_insert_fun(load_sql, df):
    with postgresql.open("pq://db_user@db_host/db_name") as db:
        with db.xact():
            print('SQL to prepare:')
            print(load_sql)
            # prepare parameterized query
            ps = db.prepare(load_sql)

            # Turn DF into array of tuples
            out_rows = []
            for r in df.iterrows():
                row = r[1]
                # make list of tuples
                t = ()
                for c in df.columns:
                    t = t + (row[c], )
                out_rows.append(t)
                
            #Send to DB
            ps.load_rows(out_rows)

db_speed_test.R

# Database insert/update speed test
library(tidyverse)
library(RPostgres)
library(glue)
#> 
#> Attaching package: 'glue'
#> The following object is masked from 'package:dplyr':
#> 
#>     collapse
library(reticulate)

# remote database connection details
con <- dbConnect(drv = Postgres(),
                 dbname = "db_name",
                 host = "db_host",
                 port = 5432,
                 user = "db_user")

# build test dataframe
df <- tibble(eff_date = seq(Sys.Date() - 25, to = Sys.Date(), by = "day"),
             item = letters,
             report_center = LETTERS) %>%
    expand(eff_date, item, report_center) %>%
    mutate(value = rnorm(17576, mean = 0, sd = 1000))

# copy the dataframe to the database under the name 'speed_test'
copy_to(con, df, "speed_test", unique_indexes = list(c("eff_date", "item", "report_center")), temporary = FALSE)

# add an insert_timestamp column
dbExecute(con, "ALTER TABLE speed_test ADD COLUMN insert_timestamp timestamp with time zone")
#> [1] 0

# pre upsert table
tbl(con, "speed_test")
#> # Source:   table<speed_test> [?? x 5]
#> # Database: postgres [dbc_write@yoda.deepbasin.capital:5432/dbc]
#>    eff_date   item  report_center   value insert_timestamp   
#>    <date>     <chr> <chr>           <dbl> <dttm>             
#>  1 2018-10-11 a     A              -750.  NA                 
#>  2 2018-10-11 a     B               244.  NA                 
#>  3 2018-10-11 a     C              1230.  NA                 
#>  4 2018-10-11 a     D              -582.  NA                 
#>  5 2018-10-11 a     E              1411.  NA                 
#>  6 2018-10-11 a     F               509.  NA                 
#>  7 2018-10-11 a     G              1356.  NA                 
#>  8 2018-10-11 a     H               498.  NA                 
#>  9 2018-10-11 a     I              -934.  NA                 
#> 10 2018-10-11 a     J                69.7 NA                 
#> # ... with more rows

# data changes and adds new records since initial load
df <- df %>% 
    mutate(value = value + rnorm(nrow(df))) %>% 
    bind_rows(tibble(
        eff_date = Sys.Date() + 1,
        item = letters[1],
        report_center = LETTERS[1],
        value = rnorm(1, mean = 0, sd = 1000)))
    
# build parameter placeholders ($1, $2, ...) and column names from dataframe
col_index <- glue_collapse(glue("${index}",index = 1:length(df)),",")
col_names <- glue_collapse(names(df),",")

# assign (optionally composite) unique index
u_index <- glue_collapse(c("eff_date", "item", "report_center"), ",")

upsert_sql <- glue(
    "INSERT INTO speed_test ({col_names}) VALUES ({col_index})
    ON CONFLICT ({u_index})
    DO UPDATE SET ({col_names},insert_timestamp) = ({col_index},CURRENT_TIMESTAMP)")

# R function that takes upsert_sql and df
r_db_insert_fun <- function(upsert_sql, df) {
    
    dbBegin(con)
    results <- dbSendStatement(con, upsert_sql)
    print(dbGetStatement(results))
    
    out_rows <- df %>% unname
    
    dbBind(results, out_rows)
    
    print(glue("Rows Inserted/Updated: {row_count}", row_count = dbGetRowsAffected(results)))
    dbClearResult(results)
    dbCommit(con)
    
}

# load python insert function using reticulate
source_python("db_insert_fun.py")

# apply r database insert function
system.time(r_db_insert_fun(upsert_sql, df))
#> INSERT INTO speed_test (eff_date,item,report_center,value) VALUES ($1,$2,$3,$4)
#> ON CONFLICT (eff_date,item,report_center)
#> DO UPDATE SET (eff_date,item,report_center,value,insert_timestamp) = ($1,$2,$3,$4,CURRENT_TIMESTAMP)
#> Rows Inserted/Updated: 17577
#>    user  system elapsed 
#>    0.47    0.69  137.54

# apply python database insert function
system.time(db_insert_fun(upsert_sql, df))
#>    user  system elapsed 
#>    2.86    0.93    5.07

# table post upsert(s)
tbl(con, "speed_test")
#> # Source:   table<speed_test> [?? x 5]
#> # Database: postgres [dbc_write@yoda.deepbasin.capital:5432/dbc]
#>    eff_date   item  report_center   value insert_timestamp   
#>    <date>     <chr> <chr>           <dbl> <dttm>             
#>  1 2018-10-11 a     A              -751.  2018-11-05 10:05:50
#>  2 2018-10-11 a     B               245.  2018-11-05 10:05:50
#>  3 2018-10-11 a     C              1229.  2018-11-05 10:05:50
#>  4 2018-10-11 a     D              -582.  2018-11-05 10:05:50
#>  5 2018-10-11 a     E              1411.  2018-11-05 10:05:50
#>  6 2018-10-11 a     F               511.  2018-11-05 10:05:50
#>  7 2018-10-11 a     G              1355.  2018-11-05 10:05:50
#>  8 2018-10-11 a     H               499.  2018-11-05 10:05:50
#>  9 2018-10-11 a     I              -935.  2018-11-05 10:05:50
#> 10 2018-10-11 a     J                68.8 2018-11-05 10:05:50
#> # ... with more rows

# drop speed_test table
dbExecute(con, "DROP TABLE speed_test")
#> [1] 0

dbDisconnect(con)

Created on 2018-11-05 by the reprex package (v0.2.1.9000)

Hi,

If you have already uploaded the full dataframe to the speed_test table on your remote server, why are you not doing a single insert using the full content of speed_test?

I.e.

insert into your_table (col)
select col from speed_test
on conflict [...]
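
To make the shape of that statement concrete, here is a minimal sketch that only builds the SQL text for a set-based upsert from a staging table. The helper name build_staged_upsert and its parameters are hypothetical, the optional timestamp column is an assumption borrowed from the reprex, and identifiers are interpolated unquoted, so it is only suitable for trusted table and column names. EXCLUDED is the Postgres name for the row that was proposed for insertion.

```python
def build_staged_upsert(target, staging, columns, key_columns, timestamp_column=None):
    """Return an INSERT ... SELECT ... ON CONFLICT statement (Postgres 9.5+ syntax)."""
    col_list = ", ".join(columns)
    key_list = ", ".join(key_columns)
    # Only non-key columns need updating on conflict.
    updates = [f"{c} = EXCLUDED.{c}" for c in columns if c not in key_columns]
    if timestamp_column is not None:
        updates.append(f"{timestamp_column} = CURRENT_TIMESTAMP")
    if not updates:
        raise ValueError("nothing to update: every column is part of the key")
    return (
        f"INSERT INTO {target} ({col_list})\n"
        f"SELECT {col_list} FROM {staging}\n"
        f"ON CONFLICT ({key_list})\n"
        f"DO UPDATE SET {', '.join(updates)}"
    )


sql = build_staged_upsert(
    target="your_table",
    staging="speed_test",
    columns=["eff_date", "item", "report_center", "value"],
    key_columns=["eff_date", "item", "report_center"],
)
print(sql)
```

The resulting string would then be sent as a single statement, so the number of round trips does not grow with the number of rows.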

Regards,

jm

Hey @jm_t. Perhaps my example is not fully illustrative of my use case, but I wanted to try to keep it simple. The data that underpins "speed_test" is in constant flux, with new records being created and old records being constantly revised. This is why I have to use some sort of combination of insert and update, dictated by the structure of the unique key constraint to identify if a record is new or merely an old record showing up again. I've made changes to the example to reflect this.

Hi,

It is weird to see an INSERT ... VALUES statement, i.e. one operating at the record level, when you are targeting millions of rows. SQL is all about operations on sets.

You should review your process so that you work on bigger data sets. That would reduce the number of transactions sent to your remote database.

Just my two cents.

jm

@jm_t thanks for the input. I suppose I should offer some additional context. I have a very efficient Python function (relying on the old but very good Python postgresql library) that converts my dataframe into a list of tuples and loads those to the database using effectively the same methodology. I'm wondering how DBI::dbSendQuery and DBI::dbBind differ such that the performance is not comparable. What follows is the Python function and the use of (the incredibly cool) reticulate::source_python to load the function and apply it in R.

db_insert_fun.py

import pandas
import postgresql

def db_magic(load_sql, df):
    with postgresql.open("pq://user@db_host/db_name") as db:
        with db.xact():
            print('SQL to prepare:')
            print(load_sql)
            # prepare parameterized query
            ps = db.prepare(load_sql)

            # Turn DF into array of tuples
            out_rows = []
            for r in df.iterrows():
                row = r[1]
                # make list of tuples
                t = ()
                for c in df.columns:
                    t = t + (row[c], )
                out_rows.append(t)
                
            #Send to DB
            ps.load_rows(out_rows)

db_speed_test.R

# Database insert/update speed test
library(tidyverse)
library(RPostgres)
library(glue)
library(reticulate)

# build test dataframe
df <- tibble(eff_date = seq(Sys.Date() - 9, to = Sys.Date(), by = "day"),
             item = letters[1:10],
             report_center = LETTERS[1:10]) %>%
    expand(eff_date, item, report_center) %>%
    mutate(value = rnorm(1000, mean = 0, sd = 1000))

# remote database connection details
con <- dbConnect(drv = Postgres(),
                 dbname = "db_name",
                 host = "db_host",
                 port = 5432,
                 user = "user")

# copy the dataframe to the database under the name 'speed_test'
copy_to(con, df, "speed_test", unique_indexes = list(c("eff_date", "item", "report_center")), temporary = FALSE)

# add an insert_timestamp column
dbExecute(con, "ALTER TABLE speed_test ADD COLUMN insert_timestamp timestamp with time zone")

tbl(con, "speed_test")

# data changes and adds new records since initial load
df <- df %>% 
    mutate(value = value + rnorm(1000)) %>% 
    bind_rows(tibble(
        eff_date = Sys.Date() + 1,
        item = letters[11],
        report_center = LETTERS[11],
        value = rnorm(1, mean = 0, sd = 1000)
    ))

# general parameterized insert/update function
param_upsert <- function (db_tbl, u_index, df) {
    
    # build parameter placeholders ($1, $2, ...) and column names from dataframe
    col_index <- glue_collapse(glue("${index}",index = 1:length(df)),",")
    col_names <- glue_collapse(names(df),",")
    
    # assign (optionally composite) unique index
    u_index <- glue_collapse(u_index, ",")
    
    upsert_sql <- glue(
        "INSERT INTO {db_tbl} ({col_names}) VALUES ({col_index})
        ON CONFLICT ({u_index})
        DO UPDATE SET ({col_names},insert_timestamp) = ({col_index},CURRENT_TIMESTAMP)"
    )
    
    # call python insert function using reticulate
    source_python("db_insert_fun.py")
    
    # apply python database insert function
    db_magic(upsert_sql, df)
    
}

system.time(param_upsert("speed_test", c("eff_date", "item", "report_center"), df))

tbl(con, "speed_test")

dbExecute(con, "DROP TABLE speed_test")

dbDisconnect(con)

This executes the upsert in ~0.68 seconds, down from 30 seconds in the pure R version shown in my initial question. Is there a pure R version of this function that maintains the flexibility of this framework while improving the execution time?

Hi,

My understanding is as follows:

The python code is doing a bulk insert on the database. The rows are locally added together and then sent as one batch to the database.

The R code is sending one row at a time to the remote database.

Hence the likely extra round trips over the connection. You should trace what is happening on the Postgres server in both scenarios using 1k records to validate this hypothesis.
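
One way to picture the difference is a statement that carries several rows at once. The sketch below only builds such statements and flattens the parameters; it does not talk to a database. The helper names chunked and multirow_upsert are hypothetical, and the batch size is arbitrary. Whether a given client library binds a flattened parameter list this way is something to check against its documentation.

```python
def chunked(rows, size):
    """Yield successive lists of at most `size` rows."""
    for start in range(0, len(rows), size):
        yield rows[start:start + size]


def multirow_upsert(table, columns, key_columns, n_rows):
    """Build one upsert with $1, $2, ... placeholders covering n_rows rows."""
    width = len(columns)
    groups = []
    for r in range(n_rows):
        placeholders = ", ".join(f"${r * width + c + 1}" for c in range(width))
        groups.append(f"({placeholders})")
    # With several rows per statement, the update must refer to EXCLUDED
    # rather than to individual placeholders.
    updates = ", ".join(f"{c} = EXCLUDED.{c}" for c in columns if c not in key_columns)
    return (
        f"INSERT INTO {table} ({', '.join(columns)}) VALUES {', '.join(groups)} "
        f"ON CONFLICT ({', '.join(key_columns)}) DO UPDATE SET {updates}"
    )


columns = ["eff_date", "item", "report_center", "value"]
keys = ["eff_date", "item", "report_center"]
rows = [
    ("2018-10-11", "a", "A", 1.0),
    ("2018-10-11", "a", "B", 2.0),
    ("2018-10-11", "a", "C", 3.0),
]

for batch in chunked(rows, 2):
    sql = multirow_upsert("speed_test", columns, keys, len(batch))
    params = [value for row in batch for value in row]  # flatten to match $1..$n
    print(len(params), sql)
```

Three rows in batches of two means two statements instead of three. One caveat: Postgres rejects an ON CONFLICT DO UPDATE statement that would touch the same row twice, so a batch must not contain duplicate keys.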

Regards,

JM

Thanks @jm_t. That is my understanding as well. With that in mind, is there a way to execute bulk upserts (or updates for that matter) using DBI? Or is this only possible using Python or having all the logic live in SQL and executing with a series of triggers?

I am not a DBI user so take the following with caution.

dbWriteTable seems to be the way to upload your dataframe as a table.

sqlAppendTable seems to be lower level.

My preference would be the following, in pseudocode:

  1. Start a transaction as serializable
  2. Upload dataframe as a temp table with a random name
  3. Execute a function doing the upsert, table name as input argument
  4. Drop the table
  5. Commit the transaction

This should give some protection against concurrent execution while keeping the logic on the server side.

The function logic should be a fairly basic upsert statement using the temp table as its source.
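
The five steps can be sketched end to end as follows. To keep it runnable without a server, this uses Python's built-in sqlite3 module as a stand-in for Postgres, which is an assumption of the sketch and not what the thread uses: the upsert syntax is similar but not identical (SQLite wants a WHERE clause on the SELECT to avoid a parsing ambiguity), and the table name staging_tmp is made up. On Postgres, step 1 would be BEGIN ISOLATION LEVEL SERIALIZABLE, and a temp table created with ON COMMIT DROP would make step 4 implicit.

```python
import sqlite3

# Stand-in connection: in-memory SQLite instead of the remote Postgres server.
# isolation_level=None gives autocommit mode, so the explicit BEGIN/COMMIT
# below control the transaction.
con = sqlite3.connect(":memory:", isolation_level=None)
con.execute(
    "CREATE TABLE speed_test ("
    " eff_date TEXT, item TEXT, report_center TEXT, value REAL,"
    " insert_timestamp TEXT,"
    " UNIQUE (eff_date, item, report_center))"
)
con.execute(
    "INSERT INTO speed_test (eff_date, item, report_center, value) "
    "VALUES ('2018-10-11', 'a', 'A', 1.0)"
)

new_rows = [
    ("2018-10-11", "a", "A", 2.5),  # existing key: should be updated
    ("2018-10-12", "a", "A", 7.0),  # new key: should be inserted
]

con.execute("BEGIN")  # 1. start the transaction
con.execute(          # 2. upload the data to a temp table
    "CREATE TEMP TABLE staging_tmp "
    "(eff_date TEXT, item TEXT, report_center TEXT, value REAL)"
)
con.executemany("INSERT INTO staging_tmp VALUES (?, ?, ?, ?)", new_rows)
con.execute(          # 3. one set-based upsert from the temp table
    "INSERT INTO speed_test (eff_date, item, report_center, value) "
    "SELECT eff_date, item, report_center, value FROM staging_tmp WHERE true "
    "ON CONFLICT (eff_date, item, report_center) "
    "DO UPDATE SET value = excluded.value, insert_timestamp = CURRENT_TIMESTAMP"
)
con.execute("DROP TABLE staging_tmp")  # 4. drop the temp table
con.execute("COMMIT")                  # 5. commit

result = con.execute(
    "SELECT eff_date, value, insert_timestamp IS NOT NULL "
    "FROM speed_test ORDER BY eff_date"
).fetchall()
print(result)
```

The existing row ends up with the new value and a timestamp, while the new row is inserted with a NULL timestamp, mirroring the behaviour of the upsert in the original question.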

JM

Thanks @jm_t. Writing to a temp table and performing the insert/update inside the database is exactly how I had been handling this for the last several years. It wasn't until I built the Python helper function that I realized bulk insert/update processing could be handled entirely in code rather than in the database. In my view, handling the database processing in code is preferable because it makes for a more flexible framework, but maybe I'm splitting hairs at this point.

In any case, the Python insert function does what I need, and my question is really more of an academic one: can bulk inserts/updates be done in R? I really appreciate all your feedback.