Introduction
Following R code
is written to read JSON
file. At the end, it is creating database schema. I want to write csv
file. I have written this code to convert JSON to CSV
.
R Code
sc <- spark_connect(master = "local", config = conf, version = '2.2.0')
sample_tbl <- spark_read_json(sc,name="example",path="example.json", header = TRUE, memory = FALSE,
overwrite = TRUE)
sdf_schema_viewer(sample_tbl) # to create db schema
df <- spark_dataframe(sample_tbl)
After above code, following line has been written, but it shows error.
spark_write_csv(df, path = "data.csv", header = TRUE, delimiter = ",",
charset = "UTF-8", null_value = NULL,
options = list(), mode = NULL, partition_by = NULL)
How to write CSV
file in Sparklyr R
?
No need to cast sample_tbl
to a spark_dataframe(), instead, run:
sc <- spark_connect(master = "local", config = conf, version = '2.2.0')
sample_tbl <- spark_read_json(sc,name="example",path="example.json", header = TRUE, memory = FALSE,
overwrite = TRUE)
spark_write_csv(sample_tbl, path = "data.csv", header = TRUE, delimiter = ",",
charset = "UTF-8", null_value = NULL,
options = list(), mode = NULL, partition_by = NULL)
@javierluraschi, I have run the same code and instruction given by you. I have not casted sample_tbl
to a spark_dataframe()
The code shows following error while creating CSV file. It failed to create CSV file.
Error:
Error: java.lang.UnsupportedOperationException: CSV data source does not support
at java.lang.Thread.run(Thread.java:748)
What may be the reason of error?
To my knowledge, Spark does not support saving nested data to CSV, so data needs to be un-nested before saving to CSV.
@javierluraschi, Now I have un-nested (flattened) the data in Sparklyr
by using following R code. Now, further I requir to save this data into CSV file. Now, can you help?
library(sparklyr)
library(dplyr)
library(sparklyr.nested)
# spark_install(version = "2.2.0")
library(jsonlite)
Sys.setenv(SPARK_HOME="/usr/lib/spark")
conf <- spark_config()
conf$'sparklyr.shell.executor-memory' <- "20g"
conf$'sparklyr.shell.driver-memory' <- "20g"
conf$spark.executor.cores <- 16
conf$spark.executor.memory <- "20G"
conf$spark.yarn.am.cores <- 16
conf$spark.yarn.am.memory <- "20G"
conf$spark.executor.instances <- 8
conf$spark.dynamicAllocation.enabled <- "false"
conf$maximizeResourceAllocation <- "true"
conf$spark.default.parallelism <- 32
sc <- spark_connect(master = "local", config = conf, version = '2.2.0') # Connection
sample_tbl <- spark_read_json(sc,name="example",path="example.json", header = TRUE, memory = FALSE, overwrite = TRUE)
sdf_schema_viewer(sample_tbl) # to create db schema
# Code to un-nest data as below,
columns_to_flatten <- sdf_schema_json(sample_tbl, simplify = T) %>%
# using rlist package for ease of use
rlist::list.flatten(use.names = T) %>%
# get names
names() %>%
# remove contents of brackets and whitespace
gsub("\\(.*?\\)|\\s", "", .) %>%
# add alias for column names, dot replaced with double underscore
# this avoids duplicate names that would otherwise occur with singular
{paste(., "AS", gsub("\\.", "__", .))} %>%
# required, otherwise doesn't seem to work
sub("variants", "variants[0]", .)
# construct query
sql_statement <- paste("SELECT",
paste(columns_to_flatten, collapse = ", "),
"FROM example")
# execute on spark cluster, save as table in cluster
spark_session(sc) %>%
sparklyr::invoke("sql", sql_statement) %>%
sparklyr::invoke("createOrReplaceTempView", "flattened_example")
tbl(sc, "flattened_example") %>%
sdf_schema_viewer()
final_unnested_table <- collect(tbl(sc, "flattened_example))
Now, I want to ask how to create CSV file (I mean already data got un-nested by using above R code) ?
Note - To reproduce the example you can navigate and save the data - JSON Data - 0.5 MB (10 lines only) and apply in above R code.