I am trying to figure out if there is an easy function to drop an intermediate spark dataframe through sparklyr.
Let me explain by taking you through a workflow/use case:
- I have billions of rows in a hadoop "table"
- I connect to it with memory = FALSE and register that back with R.
- I filter the dataset on several fields and narrow it down to a few hundred million rows, assigning the result to possible_records in R. sparklyr gives the Spark dataframe a name generated on the fly, e.g. douzpxtpud, which I can see when I run
possible_records %>% show_query()
- At this point I have 320GB in Spark's memory.
- I select nested columns from the dataset using the sdf_explode() function from the sparklyr.nested package/extension, use those columns to further filter the dataset, and store the result in a different Spark dataframe, call it best_records. That's what I want for now, and it's about 1GB. In the meantime, while I'm working interactively with this Spark session, I'd like to be able to flush the 320GB from Spark's memory so that I'm not taking up resources on the cluster and other users/processes can take advantage of them.
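For context, here's a sketch of the workflow above. Connection details, table names, and column names are placeholders, not my real ones:

```r
library(sparklyr)
library(dplyr)
library(sparklyr.nested)

sc <- spark_connect(master = "yarn")

# Register the Hadoop table without pulling it into memory.
all_records <- spark_read_table(sc, "my_hadoop_table", memory = FALSE)

# Filter down to a few hundred million rows. compute() materializes the
# result under an auto-generated temp view name (e.g. "douzpxtpud").
possible_records <- all_records %>%
  filter(field_a == "foo", field_b > 100) %>%
  compute()   # this is the ~320GB sitting in Spark's memory

# Explode nested columns and filter further; best_records is ~1GB.
best_records <- possible_records %>%
  sdf_explode(nested_col) %>%
  filter(score > 0.9) %>%
  compute()
```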
I understand that I could potentially have done everything in one long chain so that no intermediate table was created, but I'm not sure that's the best way to proceed in this ad hoc analysis. Sometimes I like to script conservatively, little by little.
This Stack Overflow answer suggests that the following works to uncache/drop the table:
sc %>% spark_session() %>% invoke("catalog") %>% invoke("dropTempView", "my_table")
Indeed, it does, if you know in advance the auto-generated Spark table name that possible_records represents:
invoke("dropTempView", "douzpxtpud") works, but
invoke("dropTempView", possible_records) doesn't.
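The closest I've gotten is extracting the generated name first. A sketch, assuming dbplyr::remote_name() returns the temp view name (I believe it can return NULL if the tbl wraps a query rather than a plain table):

```r
# possible_records is an R tbl object, not the Spark view name string,
# which is presumably why passing it to dropTempView fails. Pull the
# auto-generated name out of the tbl, then drop the view by name.
view_name <- dbplyr::remote_name(possible_records)

sc %>%
  spark_session() %>%
  invoke("catalog") %>%
  invoke("dropTempView", as.character(view_name))

# Possibly tbl_uncache() would also release the cached blocks:
# tbl_uncache(sc, as.character(view_name))
```

But this feels fragile, and I'm not sure it's the intended way.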
I was hoping for a function more like:
possible_records %>% sparklyr::spark_drop_table()
Any suggestions? Thanks so much!