How to write a CSV file in sparklyr (R)?

sparklyr

#1

Introduction

The following R code reads a JSON file and, at the end, displays the schema. I want to write a CSV file, so I wrote this code to convert the JSON to CSV.

R Code

sc <- spark_connect(master = "local", config = conf, version = "2.2.0")
sample_tbl <- spark_read_json(sc, name = "example", path = "example.json",
                              header = TRUE, memory = FALSE, overwrite = TRUE)
sdf_schema_viewer(sample_tbl)  # view the schema
df <- spark_dataframe(sample_tbl)

After the code above, I ran the following line, but it throws an error.

spark_write_csv(df, path = "data.csv", header = TRUE, delimiter = ",",
                charset = "UTF-8", null_value = NULL,
                options = list(), mode = NULL, partition_by = NULL)

Error

Error in spark_expect_jobj_class(x, "org.apache.spark.sql.DataFrame") : 
  This operation is only supported on org.apache.spark.sql.DataFrame jobjs but found org.

How can I write a CSV file in sparklyr?


#2

There is no need to cast sample_tbl with spark_dataframe(); instead, run:

sc <- spark_connect(master = "local", config = conf, version = "2.2.0")
sample_tbl <- spark_read_json(sc, name = "example", path = "example.json",
                              header = TRUE, memory = FALSE, overwrite = TRUE)

spark_write_csv(sample_tbl, path = "data.csv", header = TRUE, delimiter = ",",
                charset = "UTF-8", null_value = NULL,
                options = list(), mode = NULL, partition_by = NULL)

#4

@javierluraschi, I have run the same code as in your instructions, without casting sample_tbl to a spark_dataframe().

The code shows the following error while writing the CSV file; the file is not created.

Error:

Error: java.lang.UnsupportedOperationException: CSV data source does not support struct<hashtags:array<string>,media:array<struct<additional_media_info:struct<description:string,embeddable:boolean,monetizable:boolean,title:string>,display_url:string,expanded_url:string,id:bigint,id_str:string,indices:array<bigint>,media_url:string,media_url_https:string,sizes:struct<large:struct<h:bigint,resize:string,w:bigint>,medium:struct<h:bigint,resize:string,w:bigint>,small:struct<h:bigint,resize:string,w:bigint>,thumb:struct<h:bigint,resize:string,w:bigint>>,source_status_id:bigint,source_status_id_str:string,source_user_id:bigint,source_user_id_str:string,type:string,url:string>>,symbols:array<string>,urls:array<struct<display_url:string,expanded_url:string,indices:array<bigint>,url:string>>,user_mentions:array<struct<id:bigint,id_str:string,indices:array<bigint>,name:string,screen_name:string>>> data type.
	at org.apache.spark.sql.execution.datasources.csv.CSVUtils$.org$apache$spark$sql$execution$datasources$csv$CSVUtils$$verifyType$1(CSVUtils.scala:127)
	at org.apache.spark.sql.execution.datasources.csv.CSVUtils$$anonfun$verifySchema$1.apply(CSVUtils.scala:131)
	at org.apache.spark.sql.execution.datasources.csv.CSVUtils$$anonfun$verifySchema$1.apply(CSVUtils.scala:131)
	at scala.collection.Iterator$class.foreach(Iterator.scala:893)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
	at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
	at org.apache.spark.sql.types.StructType.foreach(StructType.scala:98)
	at org.apache.spark.sql.execution.datasources.csv.CSVUtils$.verifySchema(CSVUtils.scala:131)
	at org.apache.spark.sql.execution.datasources.csv.CSVFileFormat.prepareWrite(CSVFileFormat.scala:65)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:142)
	at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:145)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:58)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:56)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:74)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:138)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:135)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:116)
	at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:92)
	at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:92)
	at org.apache.spark.sql.execution.datasources.DataSource.writeInFileFormat(DataSource.scala:438)
	at org.apache.spark.sql.execution.datasources.DataSource.write(DataSource.scala:474)
	at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:48)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:58)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:56)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:74)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:138)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:135)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:116)
	at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:92)
	at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:92)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:610)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:233)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:217)
	at org.apache.spark.sql.DataFrameWriter.csv(DataFrameWriter.scala:598)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at sparklyr.Invoke.invoke(invoke.scala:137)
	at sparklyr.StreamHandler.handleMethodCall(stream.scala:123)
	at sparklyr.StreamHandler.read(stream.scala:66)
	at sparklyr.BackendHandler.channelRead0(handler.scala:51)
	at sparklyr.BackendHandler.channelRead0(handler.scala:4)
	at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:357)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:343)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:336)
	at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:357)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:343)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:336)
	at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:293)
	at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:267)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:357)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:343)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:336)
	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1294)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:357)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:343)
	at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:911)
	at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:643)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:566)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:480)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:442)
	at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:131)
	at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144)
	at java.lang.Thread.run(Thread.java:748)

What may be the reason for this error?


#5

To my knowledge, Spark does not support saving nested data to CSV, so the data needs to be un-nested before it can be saved as CSV.
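
For example, a rough sketch of un-nesting a single array column with Spark SQL before writing might look like this (the field names are placeholders rather than your actual schema, so substitute whatever sdf_schema_viewer() shows):

# Rough sketch: flatten one nested array column, then write the flat result.
# `id_str` and `entities.hashtags` are placeholder names for your real fields.
flat_tbl <- sdf_sql(sc, "
  SELECT id_str,
         tag AS hashtag
  FROM example
  LATERAL VIEW OUTER explode(entities.hashtags) t AS tag
")

spark_write_csv(flat_tbl, path = "data_flat.csv", header = TRUE, delimiter = ",")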


#6

@javierluraschi, yes, that is exactly what I am trying to do. But my data is nested up to the 6th level, so it is difficult to un-nest, and I keep trying. My JSON file holds around 1 billion records, and only the sparklyr package can read such a huge file within a few minutes with the cluster configuration I am using, so I am sticking with sparklyr. I have tried every other approach, and in the end only sparklyr works for me, except for the un-nesting. The jsonlite package can easily flatten the data with jsonlite::flatten(), but it takes a never-ending amount of time to read such a huge file (reading in chunks is not a solution either). :disappointed_relieved:
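
For illustration, flattening just two of the six levels by hand already looks roughly like this (field names are borrowed from the schema in the error above, but treat them as placeholders):

# Rough sketch of only two levels of un-nesting -- every array level needs
# its own explode, and my data has six such levels.
flat_tbl <- sdf_sql(sc, "
  SELECT id_str,
         m.media_url AS media_url,
         idx AS media_index
  FROM example
  LATERAL VIEW OUTER explode(entities.media) t1 AS m
  LATERAL VIEW OUTER explode(m.indices) t2 AS idx
")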