How to write a CSV file in sparklyr (R)?

sparklyr

#1

Introduction

The following R code reads a JSON file and, at the end, displays its schema. I want to convert this JSON data to a CSV file.

R Code

library(sparklyr)
library(sparklyr.nested)   # provides sdf_schema_viewer()
conf <- spark_config()
sc <- spark_connect(master = "local", config = conf, version = '2.2.0')
sample_tbl <- spark_read_json(sc, name = "example", path = "example.json", header = TRUE, memory = FALSE,
                              overwrite = TRUE)
sdf_schema_viewer(sample_tbl) # view the (nested) schema
df <- spark_dataframe(sample_tbl)

After the code above, I added the following line, but it throws an error:

spark_write_csv(df, path = "data.csv", header = TRUE, delimiter = ",",
                charset = "UTF-8", null_value = NULL,
                options = list(), mode = NULL, partition_by = NULL)

How can I write a CSV file in sparklyr?


#2

There is no need to convert sample_tbl with spark_dataframe(); instead, run:

sc <- spark_connect(master = "local", config = conf, version = '2.2.0')
sample_tbl <- spark_read_json(sc,name="example",path="example.json", header = TRUE, memory = FALSE,
                              overwrite = TRUE) 

spark_write_csv(sample_tbl, path = "data.csv", header = TRUE, delimiter = ",",
                charset = "UTF-8", null_value = NULL,
                options = list(), mode = NULL, partition_by = NULL)

#4

@javierluraschi, I ran the code exactly as you suggested, without converting sample_tbl with spark_dataframe().

It still fails to create the CSV file, with the following error:

Error:

Error: java.lang.UnsupportedOperationException: CSV data source does not support 
at java.lang.Thread.run(Thread.java:748)

What could be causing this error?


#5

To my knowledge, Spark's CSV data source does not support nested data (struct, array, or map columns), so the data needs to be un-nested (flattened) before it is saved as CSV.
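One way to see which columns would trip the CSV writer is to inspect the type strings that sparklyr's `sdf_schema()` returns, a list of `name`/`type` pairs. The helper `find_nested_columns()` below is hypothetical, not part of sparklyr. It assumes nested types are reported as strings beginning with `StructType`, `ArrayType` or `MapType`, as Spark 2.x's `DataType.toString` produces. The demo uses a hand-written stand-in schema so it runs without a Spark connection.

```r
# Hypothetical helper: return the names of columns whose Spark type is nested
# (struct, array or map). `schema` is assumed to have the shape returned by
# sparklyr::sdf_schema(): a list of list(name = ..., type = ...) entries.
find_nested_columns <- function(schema) {
  col_types <- vapply(schema, function(field) field$type, character(1))
  col_names <- vapply(schema, function(field) field$name, character(1))
  # Assumes type strings such as "StructType(...)", "ArrayType(...)", "MapType(...)"
  is_nested <- grepl("^(StructType|ArrayType|MapType)", col_types)
  unname(col_names[is_nested])
}

# Hand-written stand-in for sdf_schema(sample_tbl); column names are made up
example_schema <- list(
  id       = list(name = "id", type = "LongType"),
  info     = list(name = "info", type = "StructType(StructField(city,StringType,true))"),
  variants = list(name = "variants", type = "ArrayType(StringType,true)")
)

find_nested_columns(example_schema)  # returns c("info", "variants")
```

With a live connection, `find_nested_columns(sdf_schema(sample_tbl))` should list the columns that must be flattened (or dropped) before calling `spark_write_csv()`.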


#7

@javierluraschi, I have now un-nested (flattened) the data in sparklyr using the following R code. Next, I need to save this data to a CSV file. Can you help?

library(sparklyr)
library(dplyr)
library(sparklyr.nested)
# spark_install(version = "2.2.0")
library(jsonlite)

Sys.setenv(SPARK_HOME="/usr/lib/spark")    
conf <- spark_config()
conf$'sparklyr.shell.executor-memory' <- "20g"
conf$'sparklyr.shell.driver-memory' <- "20g"
conf$spark.executor.cores <- 16
conf$spark.executor.memory <- "20G"
conf$spark.yarn.am.cores  <- 16
conf$spark.yarn.am.memory <- "20G"
conf$spark.executor.instances <- 8
conf$spark.dynamicAllocation.enabled <- "false"
conf$maximizeResourceAllocation <- "true"
conf$spark.default.parallelism <- 32
sc <- spark_connect(master = "local", config = conf, version = '2.2.0') # Connection             
sample_tbl <- spark_read_json(sc, name = "example", path = "example.json", header = TRUE, memory = FALSE, overwrite = TRUE)
sdf_schema_viewer(sample_tbl) # view the (nested) schema

# Code to un-nest data as below,
columns_to_flatten <- sdf_schema_json(sample_tbl, simplify = T) %>%
  # using rlist package for ease of use
  rlist::list.flatten(use.names = T) %>%
  # get names
  names() %>%
  # remove contents of brackets and whitespace
  gsub("\\(.*?\\)|\\s", "", .) %>%
  # add alias for column names, dot replaced with double underscore
  # this avoids duplicate names that would otherwise occur with singular
  {paste(., "AS", gsub("\\.", "__", .))} %>%
  # required, otherwise doesn't seem to work
  sub("variants", "variants[0]", .)

# construct query
sql_statement <- paste("SELECT",
                       paste(columns_to_flatten, collapse = ", "),
                       "FROM example")

# execute on spark cluster, save as table in cluster
spark_session(sc) %>%
  sparklyr::invoke("sql", sql_statement) %>%
  sparklyr::invoke("createOrReplaceTempView", "flattened_example")
tbl(sc, "flattened_example") %>%
  sdf_schema_viewer()
final_unnested_table <- collect(tbl(sc, "flattened_example"))

Now that the data has been un-nested with the code above, how do I write it to a CSV file?

Note: to reproduce the example, save the data from the JSON Data link (0.5 MB, 10 lines only) and run the R code above on it.
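Because `collect()` already returns an ordinary R data frame, one option is to write it locally with base R's `write.csv()`. The sketch below assumes the data is fully flattened. The stand-in `final_unnested_table` and its column names are made up for illustration. Nested columns that survive `collect()` typically come back as list-columns, which `write.csv()` cannot encode, so the sketch checks for them first.

```r
# Stand-in for collect(tbl(sc, "flattened_example")); columns are hypothetical
final_unnested_table <- data.frame(
  id = c(1L, 2L),
  info__city = c("Pune", "Oslo"),
  stringsAsFactors = FALSE
)

# Refuse to write if any list-columns (leftover nested data) remain
is_list_col <- vapply(final_unnested_table, is.list, logical(1))
list_cols <- names(final_unnested_table)[is_list_col]
if (length(list_cols) > 0) {
  stop("Flatten these columns first: ", paste(list_cols, collapse = ", "))
}

# Write a single local CSV file without R's row names
out_file <- file.path(tempdir(), "flattened_example.csv")
write.csv(final_unnested_table, out_file, row.names = FALSE)
```

To keep the write on the Spark side instead of collecting, `spark_write_csv(tbl(sc, "flattened_example"), path = "flattened_example_csv", header = TRUE, mode = "overwrite")` should work once no nested columns remain. Spark writes a directory of `part-*` files rather than one `.csv`; piping through `sdf_coalesce(1)` first should yield a single part file.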