Question about multidplyr behavior

I read this, but there are two points I can't understand.

1. How is the data allocated to the cores?
dest has 105 unique values.
With cluster <- new_cluster(2), will the unique values be split as 52 and 53 per worker?

cluster <- new_cluster(2)
flights1 <- flights %>% group_by(dest) %>% partition(cluster)
flights1

flights1 returns:

Source: party_df [336,776 x 19]
Groups: dest
Shards: 2 [166,251--170,525 rows]

2. How can I extract the result of shard 1?

Is there any way to inspect how the data was split?

And the way to combine the shards back together is collect(), right?

thank you!

From the same doc you cite:

partition() splits flights1 into roughly equal subsets on each worker, ensuring that all rows in a group are transferred to the same worker. The result is a party_df, or partitioned data frame.

So, the shards you see in the result (166,251 and 170,525 rows; note that 166,251 + 170,525 = 336,776, the total row count) are the rows split across the two workers, while keeping each original group together on a single worker.

Yes, the way to get the data back into the interactive session is through collect():

Once you have a partitioned data frame, you can operate on it with the usual dplyr verbs. To bring the data back to the interactive process, use collect()
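For instance, the usual pattern is to run dplyr verbs on the party_df so each worker processes its own shard, and then collect() only the (much smaller) summarised result. A sketch, rebuilding the same flights1 as in your question:

```r
library(multidplyr)
library(dplyr, warn.conflicts = FALSE)
library(nycflights13)

cluster <- new_cluster(2)
flights1 <- flights %>% group_by(dest) %>% partition(cluster)

# Each worker summarises its own shard; collect() then gathers the
# per-destination results back into the interactive session.
flights1 %>%
  summarise(mean_delay = mean(dep_delay, na.rm = TRUE)) %>%
  collect()
```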

I'm not totally sure what you mean by this.

From looking at the source code for party_df(), I'm not sure if you can access results of a party_df by shard.

The order is preserved when you collect, so I guess you could literally subset the data by rows once collected:

library(multidplyr)
library(dplyr, warn.conflicts = FALSE)
library(nycflights13)

cluster <- new_cluster(2)
cluster
#> 2 session cluster [..]
flights1 <- flights %>% group_by(dest) %>% partition(cluster)
result <- flights1
result$cluster
#> 2 session cluster [..]
str(result)
#> List of 3
#>  $ cluster    :List of 2
#>   ..$ :Classes 'r_session', 'process', 'R6' PROCESS 'R', running, pid 10764.
#>   ..$ :Classes 'r_session', 'process', 'R6' PROCESS 'R', running, pid 10768.
#>   ..- attr(*, "cleaner")=Classes 'Cleaner', 'R6' <Cleaner>
#>   Public:
#>     add: function (x) 
#>     clone: function (deep = FALSE) 
#>     names: 
#>     reset: function (x)  
#>   ..- attr(*, "class")= chr "multidplyr_cluster"
#>  $ name       : symbol _DF1
#>  $ .auto_clean:<environment: 0x7f9f5f2827d0> 
#>  - attr(*, "class")= chr "multidplyr_party_df"

multidplyr:::tbl_vars.multidplyr_party_df(result)
#> <dplyr:::vars>
#>  [1] "year"           "month"          "day"            "dep_time"      
#>  [5] "sched_dep_time" "dep_delay"      "arr_time"       "sched_arr_time"
#>  [9] "arr_delay"      "carrier"        "flight"         "tailnum"       
#> [13] "origin"         "dest"           "air_time"       "distance"      
#> [17] "hour"           "minute"         "time_hour"     
#> Groups:
#> [1] "dest"

collected <- collect(result)

collected[1:166251,]
#> # A tibble: 166,251 x 19
#> # Groups:   dest [58]
#>     year month   day dep_time sched_dep_time dep_delay arr_time sched_arr_time
#>    <int> <int> <int>    <int>          <int>     <dbl>    <int>          <int>
#>  1  2013     1     1      557            600        -3      709            723
#>  2  2013     1     1      557            600        -3      838            846
#>  3  2013     1     1      558            600        -2      849            851
#>  4  2013     1     1      558            600        -2      853            856
#>  5  2013     1     1      559            559         0      702            706
#>  6  2013     1     1      559            600        -1      854            902
#>  7  2013     1     1      601            600         1      844            850
#>  8  2013     1     1      602            610        -8      812            820
#>  9  2013     1     1      602            605        -3      821            805
#> 10  2013     1     1      615            615         0     1039           1100
#> # … with 166,241 more rows, and 11 more variables: arr_delay <dbl>,
#> #   carrier <chr>, flight <int>, tailnum <chr>, origin <chr>, dest <chr>,
#> #   air_time <dbl>, distance <dbl>, hour <dbl>, minute <dbl>, time_hour <dttm>

Created on 2021-04-05 by the reprex package (v1.0.0)
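One more thought on inspecting shards directly: multidplyr does export cluster_call(), which evaluates an expression on every worker and returns a list with one element per worker. Peeking at a shard this way depends on the internal, auto-generated variable name the shard is stored under on each worker (it was `_DF1` in the str() output above, and flights1$name reveals it), so treat this as a sketch of the internals rather than a supported API:

```r
library(multidplyr)
library(dplyr, warn.conflicts = FALSE)
library(nycflights13)

cluster <- new_cluster(2)
flights1 <- flights %>% group_by(dest) %>% partition(cluster)

# The shard lives on each worker under an auto-generated name;
# flights1$name shows it (assumed below to be `_DF1`, as in the
# str(result) output above -- check yours first, it may differ).
flights1$name

# cluster_call() runs an expression on every worker and returns a
# list with one element per worker, so you can inspect each shard:
cluster_call(cluster, nrow(`_DF1`))                 # rows in each shard
cluster_call(cluster, length(unique(`_DF1`$dest)))  # distinct dests per shard
```

This would let you verify how the 105 unique dest values were divided between the two workers without collecting the full data.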


Thanks for your answer.

The meaning of "subset" was not clear to me at first.
However, from your comment I can infer that the library is probably taking all the rows for each unique dest value and assigning them to the same core.

I am now fairly sure that (166,251 and 170,525 rows) is the result of splitting the data by unique dest values.

I was hoping to solve this mystery by looking at the results processed by shard 1 and shard 2, but I guess there is no easy way to do that.
(Thank you for looking into the source code as well!)

Please advise me if I am wrong.

thank you!