sparklyr error

Hi everyone,

I'm new to the R world and I installed the sparklyr package (version 2.0.1) in RStudio 1.1.4 on Windows 64-bit.

I loaded the sparklyr library, connected as "local", and copied the iris dataset to Spark with no problem. But when I try to copy another dataset from the Environment, with dimensions (1292574, 16), it returns an error. When I filter this file down to dimensions (2901, 16), copying it to Spark works fine, but when I filter it to dimensions (9226, 16) it returns an error:

Does anyone know what's wrong?

copy_to() is currently not optimized and therefore it is not recommended for copying medium or large data sets. Instead, we recommend you copy the data into the cluster and then load it into Spark using the family of spark_read_*() functions, for instance by copying all the data as CSVs and then using spark_read_csv().
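
As a rough illustration of that workflow (a minimal sketch, not from the original reply; `df`, the file path, and the table name are placeholders for whatever data you actually have in your R session):

```
library(sparklyr)
library(readr)

sc <- spark_connect(master = "local")

# `df` is a placeholder for a data frame already in your R session.
# Write the data out once as a CSV...
write_csv(df, "df.csv")

# ...then let Spark read the file directly, which scales much better
# than pushing rows through copy_to()
df_tbl <- spark_read_csv(sc, name = "df", path = "df.csv")
```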

That said, we are also looking into making improvements to copy_to() and collect() using Apache Arrow; you can track the progress of this work in this pull request: github.com/rstudio/sparklyr/pull/1611.

Hi Javier,

I tried to use:
sparklyr::spark_read_csv(spark_conn, name = "RESP_MOD_EXC_T", path = "D:/Datos/ESCRITORIO/R/Prueba/Prueba.csv", header = TRUE)

but now it returns this error:

Which version of Spark are you using? I would try to run with at least Spark 2.0 if possible; Spark 1.6.X has a few parsing issues that were not solved until 2.X.

For instance, the following code fails in 1.6 but succeeds in 2.X:

library(sparklyr)
library(magrittr)

# Write a CSV containing a value with more digits than Spark 1.6's
# decimal inference can handle
data.frame(a = c(1, 2, 3, 21602730330000060100100000358582)) %>%
  write.csv("decimals.csv", row.names = FALSE)

sc <- spark_connect(master = "local", version = "2.0.0")
spark_read_csv(sc, "decimals", "decimals.csv")

# Source:   table<decimals> [?? x 1]
# Database: spark_connection
        a
    <dbl>
1 0.     
2 0.     
3 0.     
4 2.16e31

Hi @Georguel! Welcome to RStudio Community!

As a note, it is extremely hard to help debug error messages when screenshots are posted. While you have managed to get help in this thread, posting screenshots is likely to decrease your chances of getting the help you are looking for in general. Instead of posting a screenshot, please copy and paste the code and error message, and format the pasted code and output as shown below.

Put inline code (such as a function name, like `mutate` or `filter`) inside backticks (`mutate`), and put chunks of code (including error messages and code copied from the console) between sets of three backticks:

```
example <- foo %>%
  filter(a == 1)
```

This process can be done automatically by highlighting your code, either inline or in a chunk, and clicking the </> button on the toolbar of the reply window!

This will help keep our community tidy and help you get the help you are looking for!

For more information, please take a look at the community's FAQ on formatting code.


Hi Tyler, thanks for the information, regards.

Hi Javier, I'm using Spark 2.0.1. Take a look at the following two examples: the first one doesn't work but the second one works very well. I really don't know what's going on:

> # Connect to your Spark cluster
> spark_conn <- spark_connect("local")
* Using Spark: 2.0.1
> 
> # Print the version of Spark
> spark_version(spark_conn)
[1] ‘2.0.1’
> ##################### Example 1 ########################
> 
> # Filter Dataset
> RESP_1 <-read_csv("D:/Datos/ESCRITORIO/R/Prueba/Prueba.csv",col_names = T)
Parsed with column specification:
cols(
  CO_ENCUESTA = col_character(),
  CO_VERSION = col_integer(),
  CO_CUESTIONARIO = col_integer(),
  CO_CLAVE_RESP = col_double(),
  FE_PERIO_REF = col_date(format = ""),
  FE_RECEPCION = col_date(format = ""),
  CO_INSTITUCION = col_integer(),
  CO_APLICACION = col_character(),
  CO_ROL_INFORMANTE = col_integer(),
  CO_CLAVE_INF = col_integer(),
  CO_CIUDAD = col_integer(),
  CO_REGION = col_integer(),
  CO_ESTADO = col_integer(),
  CO_PREGUNTA = col_character(),
  CO_CORREL_RESPUEST = col_integer(),
  TX_RESPUESTA = col_character()
)
|===============================================================================================================| 100%  219 MB
> 
> #Dim RESP_MOD_EXC_T
> dim(RESP_1)
[1] 1292540      16
> 
> #Size RESP_MOD_EXC_T
> object_size(RESP_1)
 119307832

Error: org.apache.spark.SparkException: Job aborted due to stage failure: Task 6 in stage 4.0 failed 1 times, most recent failure: Lost task 6.0 in stage 4.0 (TID 16, localhost): java.lang.IllegalArgumentException: requirement failed: Decimal precision 9 exceeds max precision 6
	at scala.Predef$.require(Predef.scala:224)
	at org.apache.spark.sql.types.Decimal.set(Decimal.scala:112)
	at org.apache.spark.sql.types.Decimal$.apply(Decimal.scala:425)
	at org.apache.spark.sql.execution.datasources.csv.CSVTypeCast$.castTo(CSVInferSchema.scala:264)
	at org.apache.spark.sql.execution.datasources.csv.CSVRelation$$anonfun$csvParser$3.apply(CSVRelation.scala:116)
	at org.apache.spark.sql.execution.datasources.csv.CSVRelation$$anonfun$csvParser$3.apply(CSVRelation.scala:85)
	at org.apache.spark.sql.execution.datasources.csv.CSVFileFormat$$anonfun$buildReader$1$$anonfun$apply$2.apply(CSVFileFormat.scala:128)
	at org.apache.spark.sql.execution.datasources.csv.CSVFileFormat$$anonfun$buildReader$1$$anonfun$apply$2.apply(CSVFileFormat.scala:127)
	at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
	at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:91)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
	at org.apache.spark.sql.execution.columnar.InMemoryRelation$$anonfun$3$$anon$1.next(InMemoryRelation.scala:118)
	at org.apache.spark.sql.execution.columnar.InMemoryRelation$$anonfun$3$$anon$1.next(InMemoryRelation.scala:110)
	at org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:214)
	at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:935)
	at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:926)
	at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:866)
	at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:926)
	at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:670)
	at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:330)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:281)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
	at org.apache.spark.scheduler.Task.run(Task.scala:86)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
	at java.lang.Thread.run(Unknown Source)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1454)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1442)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1441)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1441)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:811)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1667)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1622)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1611)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:632)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1890)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1903)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1916)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1930)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:912)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:358)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:911)
	at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:290)
	at org.apache.spark.sql.Dataset$$anonfun$org$apache$spark$sql$Dataset$$execute$1$1.apply(Dataset.scala:2193)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:57)
	at org.apache.spark.sql.Dataset.withNewExecutionId(Dataset.scala:2546)
	at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$execute$1(Dataset.scala:2192)
	at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collect(Dataset.scala:2199)
	at org.apache.spark.sql.Dataset$$anonfun$count$1.apply(Dataset.scala:2227)
	at org.apache.spark.sql.Dataset$$anonfun$count$1.apply(Dataset.scala:2226)
	at org.apache.spark.sql.Dataset.withCallback(Dataset.scala:2559)
	at org.apache.spark.sql.Dataset.count(Dataset.scala:2226)
	at org.apache.spark.sql.execution.command.CacheTableCommand.run(cache.scala:45)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:60)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:58)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:74)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:136)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:133)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:114)
	at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:86)
	at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:86)
	at org.apache.spark.sql.Dataset.<init>(Dataset.scala:186)
	at org.apache.spark.sql.Dataset.<init>(Dataset.scala:167)
	at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:65)
	at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:582)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
	at java.lang.reflect.Method.invoke(Unknow

but in this example it works:

##################### Example 2 ########################
> # Filter Dataset
> RESP_3 <-read_csv("D:/Datos/ESCRITORIO/R/Prueba/Prueba.csv",col_names = T) %>% 
+   filter(CO_INSTITUCION ==3)
Parsed with column specification:
cols(
  CO_ENCUESTA = col_character(),
  CO_VERSION = col_integer(),
  CO_CUESTIONARIO = col_integer(),
  CO_CLAVE_RESP = col_double(),
  FE_PERIO_REF = col_date(format = ""),
  FE_RECEPCION = col_date(format = ""),
  CO_INSTITUCION = col_integer(),
  CO_APLICACION = col_character(),
  CO_ROL_INFORMANTE = col_integer(),
  CO_CLAVE_INF = col_integer(),
  CO_CIUDAD = col_integer(),
  CO_REGION = col_integer(),
  CO_ESTADO = col_integer(),
  CO_PREGUNTA = col_character(),
  CO_CORREL_RESPUEST = col_integer(),
  TX_RESPUESTA = col_character()
)
|===============================================================================================================| 100%  219 MB
> 
> write_csv(RESP_3,"D:/Datos/ESCRITORIO/R/Prueba/RESP_3.csv")
> 
> RESP_3 <-read_csv("D:/Datos/ESCRITORIO/R/Prueba/RESP_3.csv",col_names = T)
Parsed with column specification:
cols(
  CO_ENCUESTA = col_character(),
  CO_VERSION = col_integer(),
  CO_CUESTIONARIO = col_integer(),
  CO_CLAVE_RESP = col_integer(),
  FE_PERIO_REF = col_date(format = ""),
  FE_RECEPCION = col_date(format = ""),
  CO_INSTITUCION = col_integer(),
  CO_APLICACION = col_character(),
  CO_ROL_INFORMANTE = col_integer(),
  CO_CLAVE_INF = col_integer(),
  CO_CIUDAD = col_integer(),
  CO_REGION = col_integer(),
  CO_ESTADO = col_integer(),
  CO_PREGUNTA = col_character(),
  CO_CORREL_RESPUEST = col_integer(),
  TX_RESPUESTA = col_character()
)
> 
> #Dim RESP_MOD_EXC_T
> dim(RESP_3)
[1] 2901   16
> 
> #Size RESP_MOD_EXC_T
> object_size(RESP_3)
 287904 

Would changing the mode parameter help you out?

For instance, for a bad CSV file:

writeLines(c("bad", 1, 2, 3, "broken"), "bad.csv")

There are a couple modes that can help troubleshoot parsing issues:

  • PERMISSIVE: NULLs are inserted for missing tokens.
  • DROPMALFORMED: Drops lines which are malformed.
  • FAILFAST: Aborts if it encounters any malformed line.

Which can be used as follows:

spark_read_csv(
  sc,
  "bad",
  "bad.csv",
  columns = list(foo = "integer"),
  infer_schema = FALSE,
  options = list(mode = "DROPMALFORMED"))
# Source:   table<bad> [?? x 1]
# Database: spark_connection
    foo
  <int>
1     1
2     2
3     3
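
FAILFAST is not shown above, but a small sketch (reusing the same sc connection and bad.csv file; the table name bad_failfast is just an illustration) would look like this and should abort at the malformed "broken" line:

```
spark_read_csv(
  sc,
  "bad_failfast",
  "bad.csv",
  columns = list(foo = "integer"),
  infer_schema = FALSE,
  options = list(mode = "FAILFAST"))
# Expected: the read fails with a parsing error instead of silently
# dropping or nulling the bad row
```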

In Spark 2.X, there is also a hidden column, _corrupt_record, that can be used to output those incorrect records:

spark_read_csv(
  sc,
  "decimals",
  "bad.csv",
  columns = list(foo = "integer", "_corrupt_record" = "character"),
  infer_schema = FALSE,
  options = list(mode = "PERMISSIVE")
)
# Source:   table<decimals> [?? x 2]
# Database: spark_connection
    foo `_corrupt_record`
  <int> <chr>            
1     1 NA               
2     2 NA               
3     3 NA               
4    NA broken 
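
If you want to pull out just the rows that failed to parse, a possible follow-up sketch (assuming dplyr is loaded and the small decimals table from above) is:

```
library(dplyr)

# Collect the small table into R and keep only the rows Spark flagged
# as corrupt
tbl(sc, "decimals") %>%
  collect() %>%
  filter(!is.na(`_corrupt_record`))
```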

Hi Javier, it works, yes!! Thanks for your help, regards.