Error when trying to copy data with sparklyr

I'm getting the following error when I try to copy data to Spark:

>   # Connect to your Spark cluster
>   spark_conn <- spark_connect(master ="spark://ignacio-XPS-8930:7077",                             
+                               spark_home = "/usr/local/spark-2.4.0-bin-hadoop2.7")
> # Copy track_metadata to Spark
> track_metadata_tbl <- copy_to(spark_conn, track_metadata, overwrite = TRUE)
Error: java.lang.IllegalArgumentException: Unsupported class file major version 55
	at org.apache.xbean.asm6.ClassReader.<init>(ClassReader.java:166)
	at org.apache.xbean.asm6.ClassReader.<init>(ClassReader.java:148)
	at org.apache.xbean.asm6.ClassReader.<init>(ClassReader.java:136)
	at org.apache.xbean.asm6.ClassReader.<init>(ClassReader.java:237)
	at org.apache.spark.util.ClosureCleaner$.getClassReader(ClosureCleaner.scala:49)
	at org.apache.spark.util.FieldAccessFinder$$anon$3$$anonfun$visitMethodInsn$2.apply(ClosureCleaner.scala:517)
	at org.apache.spark.util.FieldAccessFinder$$anon$3$$anonfun$visitMethodInsn$2.apply(ClosureCleaner.scala:500)
	at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733)
	at scala.collection.mutable.HashMap$$anon$1$$anonfun$foreach$2.apply(HashMap.scala:134)
	at scala.collection.mutable.HashMap$$anon$1$$anonfun$foreach$2.apply(HashMap.scala:134)
	at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:236)
	at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
	at scala.collection.mutable.HashMap$$anon$1.foreach(HashMap.scala:134)
	at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732)
	at org.apache.spark.util.FieldAccessFinder$$anon$3.visitMethodInsn(ClosureCleaner.scala:500)
	at org.apache.xbean.asm6.ClassReader.readCode(ClassReader.java:2175)
	at org.apache.xbean.asm6.ClassReader.readMethod(ClassReader.java:1238)
	at org.apache.xbean.asm6.ClassReader.accept(ClassReader.java:631)
	at org.apache.xbean.asm6.ClassReader.accept(ClassReader.java:355)
	at org.apache.spark.util.ClosureCleaner$$anonfun$org$apache$spark$util$ClosureCleaner$$clean$14.apply(ClosureCleaner.scala:307)
	at org.apache.spark.util.ClosureCleaner$$anonfun$org$apache$spark$util$ClosureCleaner$$clean$14.apply(ClosureCleaner.scala:306)
	at scala.collection.immutable.List.foreach(List.scala:392)
	at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:306)
	at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:162)
	at org.apache.spark.SparkContext.clean(SparkContext.scala:2326)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2100)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2126)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:945)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:944)
	at sparklyr.Utils$.collectColumnString(utils.scala:44)
	at sparklyr.Utils$.collectColumn(utils.scala:58)
	at sparklyr.Utils.collectColumn(utils.scala)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at sparklyr.Invoke.invoke(invoke.scala:139)
	at sparklyr.StreamHandler.handleMethodCall(stream.scala:123)
	at sparklyr.StreamHandler.read(stream.scala:66)
	at sparklyr.BackendHandler.channelRead0(handler.scala:51)
	at sparklyr.BackendHandler.channelRead0(handler.scala:4)
	at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
	at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
	at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:310)
	at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:284)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1359)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
	at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:935)
	at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:138)
	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:645)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:580)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:497)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:459)
	at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858)
	at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138)
	at java.base/java.lang.Thread.run(Thread.java:834)
> 

What am I doing wrong?

> sessionInfo()
R version 3.5.2 (2018-12-20)
Platform: x86_64-pc-linux-gnu (64-bit)
Running under: Ubuntu 18.10

Matrix products: default
BLAS: /usr/lib/x86_64-linux-gnu/openblas/libblas.so.3
LAPACK: /usr/lib/x86_64-linux-gnu/libopenblasp-r0.3.3.so

locale:
 [1] LC_CTYPE=en_US.UTF-8       LC_NUMERIC=C              
 [3] LC_TIME=en_US.UTF-8        LC_COLLATE=en_US.UTF-8    
 [5] LC_MONETARY=en_US.UTF-8    LC_MESSAGES=en_US.UTF-8   
 [7] LC_PAPER=en_US.UTF-8       LC_NAME=C                 
 [9] LC_ADDRESS=C               LC_TELEPHONE=C            
[11] LC_MEASUREMENT=en_US.UTF-8 LC_IDENTIFICATION=C       

attached base packages:
[1] stats     graphics  grDevices utils     datasets  methods   base     

other attached packages:
[1] dplyr_0.7.8    RSQLite_2.1.1  sparklyr_0.9.4

loaded via a namespace (and not attached):
 [1] tidyselect_0.2.5 xfun_0.4         forge_0.1.0      purrr_0.2.5     
 [5] lattice_0.20-38  generics_0.0.2   htmltools_0.3.6  yaml_2.1.19     
 [9] base64enc_0.1-3  blob_1.1.1       rlang_0.3.1      later_0.8.0     
[13] pillar_1.2.3     glue_1.3.0       withr_2.1.2      DBI_1.0.0       
[17] bit64_0.9-7      dbplyr_1.3.0     bindrcpp_0.2.2   bindr_0.1.1     
[21] htmlwidgets_1.3  memoise_1.1.0    evaluate_0.12    knitr_1.21      
[25] callr_3.1.1      ps_1.3.0         httpuv_1.4.5.1   parallel_3.5.2  
[29] broom_0.5.1      r2d3_0.2.3       Rcpp_1.0.0       clipr_0.5.0     
[33] xtable_1.8-3     openssl_1.2.1    promises_1.0.1   backports_1.1.3 
[37] jsonlite_1.5     config_0.3       mime_0.6         fs_1.2.6        
[41] bit_1.1-14       askpass_1.1      digest_0.6.15    processx_3.2.1  
[45] shiny_1.2.0      rprojroot_1.3-2  grid_3.5.2       tools_3.5.2     
[49] magrittr_1.5     lazyeval_0.2.1   tibble_1.4.2     tidyr_0.8.2     
[53] whisker_0.3-2    pkgconfig_2.0.2  reprex_0.2.1     assertthat_0.2.0
[57] rmarkdown_1.11   httr_1.4.0       rstudioapi_0.7   R6_2.2.2        
[61] nlme_3.1-137     compiler_3.5.2  
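
For what it's worth, from what I can tell "Unsupported class file major version 55" means the JVM that sparklyr launched is Java 11, while Spark 2.4 only supports Java 8, so this may be a Java version mismatch rather than anything in the R code. Below is a minimal, untested sketch of pointing the session at a Java 8 JVM before connecting; the java-8-openjdk-amd64 path is only an assumption about a typical Ubuntu install and may need adjusting:

# Point this R session at a Java 8 JVM before connecting to Spark.
# Adjust the path to wherever Java 8 is installed on your machine.
Sys.setenv(JAVA_HOME = "/usr/lib/jvm/java-8-openjdk-amd64")

library(sparklyr)

spark_conn <- spark_connect(
  master     = "spark://ignacio-XPS-8930:7077",
  spark_home = "/usr/local/spark-2.4.0-bin-hadoop2.7"
)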

I was able to make some progress. In order to get Spark running at home, I'm using the following docker-compose.yml:

version: "2.2"
services:
  master:
    image: gettyimages/spark
    restart: always
    command: bin/spark-class org.apache.spark.deploy.master.Master -h master
    hostname: master
    environment:
      MASTER: spark://master:7077
      SPARK_CONF_DIR: /conf
      SPARK_PUBLIC_DNS: localhost
    expose:
      - 7001
      - 7002
      - 7003
      - 7004
      - 7005
      - 7077
      - 6066
    ports:
      - 4040:4040
      - 6066:6066
      - 7077:7077
      - 8888:8080
    volumes:
      - ./conf/master:/conf
      - ./data:/tmp/data

  worker:
    image: gettyimages/spark
    restart: always
    command: bin/spark-class org.apache.spark.deploy.worker.Worker spark://master:7077
    hostname: worker
    environment:
      SPARK_CONF_DIR: /conf
      SPARK_WORKER_CORES: 2
      SPARK_WORKER_MEMORY: 8g
      SPARK_WORKER_PORT: 8881
      SPARK_WORKER_WEBUI_PORT: 8081
      SPARK_PUBLIC_DNS: localhost
    links:
      - master
    expose:
      - 7012
      - 7013
      - 7014
      - 7015
      - 8881
    ports:
      - 8081:8081
    volumes:
      - ./conf/worker:/conf
      - ./data:/tmp/data

Once I have Spark up and running, I have no trouble copying a small table to it:

# Connect to your Spark cluster
spark_conn <- spark_connect(master = "spark://192.168.86.31:7077",
                            spark_home = "/usr/local/spark-2.4.0-bin-hadoop2.7")

# Copy iris to Spark
track_metadata_tbl <- copy_to(spark_conn, iris, overwrite = TRUE)

# List the data frames available in Spark
src_tbls(spark_conn)

[1] "iris"

However, when I try to copy a bigger table, it fails:

# Copy track_metadata to Spark
track_metadata_tbl <- copy_to(spark_conn, track_metadata, overwrite = TRUE)

Error: java.lang.ArrayIndexOutOfBoundsException: 14
	at sparklyr.Utils$$anonfun$11$$anonfun$12.apply(utils.scala:337)
	at sparklyr.Utils$$anonfun$11$$anonfun$12.apply(utils.scala:336)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
	at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
	at scala.collection.mutable.ArrayOps$ofInt.foreach(ArrayOps.scala:234)
	at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
	at scala.collection.mutable.ArrayOps$ofInt.map(ArrayOps.scala:234)
	at sparklyr.Utils$$anonfun$11.apply(utils.scala:336)
	at sparklyr.Utils$$anonfun$11.apply(utils.scala:334)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
	at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
	at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
	at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
	at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
	at sparklyr.Utils$.createDataFrameFromText(utils.scala:334)
	at sparklyr.Utils.createDataFrameFromText(utils.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at sparklyr.Invoke.invoke(invoke.scala:139)
	at sparklyr.StreamHandler.handleMethodCall(stream.scala:123)
	at sparklyr.StreamHandler.read(stream.scala:66)
	at sparklyr.BackendHandler.channelRead0(handler.scala:51)
	at sparklyr.BackendHandler.channelRead0(handler.scala:4)
	at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
	at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
	at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:310)
	at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:284)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1359)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
	at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:935)
	at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:138)
	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:645)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:580)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:497)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:459)
	at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858)
	at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138)
	at java.lang.Thread.run(Thread.java:748)

Any ideas for how to fix this?
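
The stack trace points at sparklyr.Utils.createDataFrameFromText, i.e. the text serialization that copy_to() uses to ship the rows to Spark, so one workaround I'm considering (untested) is to skip copy_to() and have Spark read the table from a CSV instead. Since the compose file above mounts ./data into the containers at /tmp/data, something like the following sketch should be possible; the file name is hypothetical:

# Untested sketch: bypass copy_to()'s text serialization by writing the data
# to the shared ./data volume and having Spark read the CSV directly.
write.csv(track_metadata, "data/track_metadata.csv", row.names = FALSE)

# The workers see the host's ./data directory as /tmp/data (see the
# docker-compose volumes above). For a file:// path, Spark expects the file
# to be visible at the same location from both the driver and the workers.
track_metadata_tbl <- spark_read_csv(
  spark_conn,
  name = "track_metadata",
  path = "file:///tmp/data/track_metadata.csv"
)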


Making your driver memory bigger is a potential option.

I don't think memory is the problem:

[screenshot]

That information cannot tell me that it is not a memory issue. Spark has two kinds of memory: driver memory and worker memory, and most of the time it is the driver memory that is not enough when copying data.

You can check Spark memory in the YARN/Spark UI or in the Spark configuration; here is some material that you might need:

https://spark.rstudio.com/guides/connections/
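
For example, here is a minimal sketch of raising the driver memory from the R side before connecting; "4G" is only an illustration, pick a value that fits your machine:

# Sketch: ask for more driver memory when sparklyr launches the backend.
library(sparklyr)

conf <- spark_config()
conf[["sparklyr.shell.driver-memory"]] <- "4G"  # example value

spark_conn <- spark_connect(
  master     = "spark://192.168.86.31:7077",
  spark_home = "/usr/local/spark-2.4.0-bin-hadoop2.7",
  config     = conf
)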

Thanks @harryzhu. I think the following screenshot shows that my problem is with the driver memory.

However, I was not able to figure out how to modify my docker-compose.yml to increase that. Any suggestions?

I do not know what bin/spark-class is in your script, and you need to look for the driver-related configuration. Or, if you can share the git repo or code with me, I can help you more.

Thanks a lot for the patience and help, @harryzhu. I'm very new to this. This repo contains the markdown with the code that I'm running and the docker-compose.yml that I'm using to get Spark running on my Ubuntu box. For ease of use, I also hosted the data there using Git LFS.
