appending to parquet file with {arrow}

I've been trying to figure out how to append to a parquet file using the arrow R package. It looks like the write_parquet function doesn't support appends because if I do the following:

outfile <- FileOutputStream$create("test.out")

chunk_results %>%
    write_parquet(outfile)

outfile$close()

I get this error:

Error in parquet___arrow___ParquetFileWriter__Open(schema, sink, properties,  : 
  IOError: Appending to file not implemented.

if I change my write function to write_feather() it seems to happily append, so this is a limitation of write_parquet() it seems. For the exercise I'm doing I really need parquet files. Any thoughts on how to most easily get there from here? It seems the Python wrapper for arrow does support parquet so I'm pondering just switching to python for this exercise. But curious if you all see a less drastic work around.

I’ve been trying to achieve something similar. My goal is to process large CSVs in R and write the data into Parquet files (preferably one CSV into one Parquet) as a persistent form of storage. My CSVs have a tens of millions of rows but hundreds, sometimes thousands of columns, which means my machine can’t handle them as a single data frame. But in later use, not all columns are needed in an analysis, so having single Parquet files, prepared in chunks, and then selecting columns with the col_select argument in read_parquet() would be really convenient.

AFAIK it is not explicitly documented anywhere but there are many mentions in various forums about how Parquet is supposed to be used in partitioned files. There’s a recent issue in the Apache bugtracker on this topic, where the developers concluded that it is not possible to append Parquet files without major additional work. They recommend writing into separate Parquet files as a partitioned multi-file dataset (see Working with Arrow Datasets and dplyr, and also these, two Stack Overflow answers). This can have advantages, as partitioned datasets can be queried quickly, but it certainly makes things slightly more complicated (setting up a directory structure for each dataset).

There’s another solution, which involves Spark, using sparklyr. This way Spark ingests data in chunks, binds the chunks into a single table, and writes that table on disk into a single file with (relatively) low memory usage. However, it comes with even more complications and limitations.

  • Spark doesn’t handle factors, while arrow supports them. Although datasets are problematic there as well, because factor levels have to be the same across partitions (and you have to check that manually otherwise opening a dataset will silently mess up factor levels). Spark has a feature transformation functions, and sparklyr::ft_string_indexer()) can create index columns, but that’s not a native R solution and much more limited than proper factors.
  • I ran into time zone problems with Spark, although I’m not sure if it’s a bug, or I missed something.

So I guess we should embrace partitions?

1 Like

I’ll share my code for testing Spark below, in case someone finds it useful.

library(tidyverse)
library(arrow)
library(sparklyr)
library(kableExtra)
sc <- spark_connect(master = "local")
x <- letters[1:4]
df <- data.frame(
  chr = x,
  fct = factor(x),
  date = seq(
    as.Date("2020-06-01"),
    as.Date("2020-06-04"),
    length.out = 4
  ),
  dttm = seq(
    as.POSIXct("2020-06-01", tz = "UTC"),
    as.POSIXct("2020-06-02", tz = "UTC"),
    length.out = 4
  ),
  stringsAsFactors = FALSE
)
df <- bind_rows(df, df, .id = "id")

Writing a single Parquet file from Spark is not so straightforward.

  • We need to load the data in chunks into separate Spark DataFrames.
  • These Spark DataFrames need to be coalesced, otherwise Spark will write the data into several partitions on disk.
  • Even without any partitioning, Spark will write the Parquet file into a directory (given as path in spark_write_parquet()), where the actual Parquet file has a random name, something like part-00000-bfefeade-e8a6-4355-90e8-129b6157a3e2-c000.snappy.parquet, with additional metadata in other files (an empty _SUCCESS file, and checksums). I want a simple Parquet file, so some files have to be shuffled around at the end.

The following function automates all these steps.

write_parquet_with_spark <- function(df, path, con = sc) {
  l <- split(df, df$id)
  names(l) <- paste0("t", names(l))

  sc_tbls <- map2(
    l,
    names(l),
    ~ copy_to(con, .x, .y, overwrite = TRUE)
  )

  t <- sdf_bind_rows(sc_tbls) %>%
    sdf_coalesce(partitions = 1) %>%
    compute("t")

  tmp <- tempfile()
  spark_write_parquet(t, path = tmp, mode = "overwrite")
  parquet_from_spark <- list.files(path = tmp, pattern = "*.parquet", full.names = TRUE)
  if (length(parquet_from_spark) != 1) {
    stop("More than one parquet file generated!")
  }
  file.copy(parquet_from_spark, path)
}

Let’s test writing out Parquet files, with both example data frames,
using arrow and sparklyr. It has a lot of overhead with Spark.

fun <- c("write_parquet", "write_parquet_with_spark")
df_write <- crossing(fun, df = list(df)) %>%
  mutate(
    path = str_c("df", fun, sep = "_"),
    path = str_c(path, "parquet", sep = ".")
  ) %>%
  select(fun, df, path)

system.time(pwalk(df_write, ~ exec(..1, ..2, ..3)))
## Warning: `data_frame()` is deprecated as of tibble 1.1.0.
## Please use `tibble()` instead.
## This warning is displayed once every 8 hours.
## Call `lifecycle::last_warnings()` to see where this warning was generated.

##    user  system elapsed 
##    2.32    0.12   37.55
spark_disconnect(sc)

And check the results:

fs::dir_info(glob = "df*.parquet") %>%
  kable()

path

type

size

permissions

modification_time

user

group

device_id

hard_links

special_device_id

inode

block_size

blocks

flags

generation

access_time

change_time

birth_time

df_write_parquet.parquet

file

1.79K

rw-

2020-06-19 11:02:37

NA

NA

114062174

1

0

5.629500e+15

4096

8

0

0

2020-06-18 15:09:59

2020-06-19 11:02:37

2020-06-18 15:09:59

df_write_parquet_with_spark.parquet

file

1.23K

rw-

2020-06-18 15:10:27

NA

NA

114062174

1

0

6.755399e+15

4096

8

0

0

2020-06-18 15:10:27

2020-06-18 15:10:27

2020-06-18 15:10:27

df_check <- df_write %>%
  mutate(
    df_read = map(path, read_parquet),
    all_equal = map2_chr(df, df_read, all_equal)
  )

df_check %>%
  select(path, all_equal) %>%
  kable()

path

all_equal

df_write_parquet.parquet

TRUE

df_write_parquet_with_spark.parquet

  • Different types for column fct: factor<10564> vs character
  • Different types for column dttm: datetime<UTC> vs
    datetime<local>
2 Likes

This topic was automatically closed 21 days after the last reply. New replies are no longer allowed.