cross-product of two large matrices calculated in remote database or Spark

Locally in R, I've been using the following functions to do the matrix operations I need (transposing and multiplying two matrices):
Matrix::crossprod
Matrix::tcrossprod

However, the matrices I will use at scale are huge and cannot be brought into an R session. Instead, I will need to do these matrix operations in-database, preferably using Spark. I'm struggling to find sparklyr functions or other R packages for efficiently getting the cross-product of two matrices in a remote database. Any guidance on approaching this task, specific R functions or packages, and examples of R code would be greatly appreciated (my searching of R and Spark resources, StackOverflow, etc. hasn't led to the straightforward answer/example I'm hoping for).

I have figured out how to paste my two matrices side-by-side in a single dataframe and then use sparklyr::spark_apply to leverage the Matrix::crossprod function (on the subsets of the dataframe corresponding to the original two matrices) in Spark ... but this seems rather clunky, and I've only been able to test it in local Spark. While I wait for the owners of our Hadoop cluster to get an R runtime installed so that I can use sparklyr::spark_apply on the cluster, I can't help but wonder if there is a better way of calculating the cross-product of two matrices in-database.

Spark DataFrames store data row-wise, which means that multiplying them as matrices would require information from every partition to compute each entry, which isn't practical. It looks like you need BlockMatrices to multiply distributed matrices:

https://spark.apache.org/docs/latest/api/python/pyspark.mllib.html#pyspark.mllib.linalg.distributed.BlockMatrix

I don't think there's an elegant way to use them from sparklyr yet, unfortunately.
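For intuition, the math that makes block (or row-partitioned) approaches work is simply that a cross-product decomposes into a sum over row blocks. A quick plain-R illustration with made-up random matrices (no Spark involved):

# plain R, no Spark: a cross-product decomposes into a sum over row blocks,
# which is what BlockMatrix-style (and per-partition) approaches exploit
set.seed(42)
A <- matrix(rnorm(6 * 3), nrow = 6)   # 6 rows x 3 columns
B <- matrix(rnorm(6 * 2), nrow = 6)   # same 6 rows, 2 columns

row_blocks <- list(1:2, 3:4, 5:6)     # pretend these are partitions
partials <- lapply(row_blocks, function(i)
  crossprod(A[i, , drop = FALSE], B[i, , drop = FALSE]))

all.equal(Reduce(`+`, partials), crossprod(A, B))  # TRUE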


Thanks so much for your helpful response!

I started looking at the bigstatsr::big_cprodMat function for the cross-product between a Filebacked Big Matrix (the same thing as a BlockMatrix?) and a regular matrix. Ideally, I'd like to be able to work with two distributed matrices. I also have some reservations about other packages from The Bigmemory Project being orphaned, and about their (appreciated) fair warning regarding potential memory leaks if R crashes - I'm still trying to figure all this out, and it seems probable that I'll crash R as I experiment and learn.
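For anyone following along, here is the kind of (untested) sketch I have in mind for the bigstatsr route. The sizes are stand-ins and I'm converting a small in-memory matrix with as_FBM() only for illustration; at scale the FBM would be created as a file-backed matrix directly:

library(bigstatsr)

# small stand-ins for illustration; the real Items x Users data would be a file-backed FBM
set.seed(1)
itemUser_fbm <- as_FBM(matrix(rnorm(1e4 * 2), ncol = 2))  # Items x Users (FBM)
itemAttr     <- matrix(rnorm(1e4 * 8), ncol = 8)          # Items x Attributes (regular matrix)

# cross-product t(itemUser) %*% itemAttr, computed block-wise over the FBM
userAttr <- big_cprodMat(itemUser_fbm, itemAttr)
dim(userAttr)  # 2 x 8, i.e. Users x Attributes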

I've also started looking at the pbdDMAT::crossprod function, which sounds like what I want since it computes the cross-product of two distributed matrices ... but I'm a bit intimidated by figuring out how to select an efficient blocking factor. However, your response has encouraged me to revisit this option and the pbdDMAT vignette. Hopefully I'm on the right track.
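In case it's useful, here is roughly what I imagine the pbdDMAT version looking like, loosely following the vignette. The matrix sizes and the bldim blocking factor are placeholders I'd still need to tune, and the script would be launched under MPI with something like mpirun -np 4 Rscript crossprod_demo.R:

suppressMessages(library(pbdDMAT))
init.grid()                      # set up the BLACS process grid

set.seed(1)                      # same seed on every MPI rank
itemUser      <- matrix(rnorm(1e4 * 2), ncol = 2)  # Items x Users
itemAttribute <- matrix(rnorm(1e4 * 8), ncol = 8)  # Items x Attributes

# distribute both matrices; bldim is the blocking factor to experiment with
d_itemUser      <- as.ddmatrix(itemUser,      bldim = c(16, 16))
d_itemAttribute <- as.ddmatrix(itemAttribute, bldim = c(16, 16))

# distributed t(itemUser) %*% itemAttribute, i.e. Users x Attributes
d_userAttribute <- crossprod(d_itemUser, d_itemAttribute)

userAttribute <- as.matrix(d_userAttribute)  # gather the (small) result locally

finalize()                       # shut down MPI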

I have been able to confirm that my (clunky) solution, which I originally mentioned regarding local Spark, does indeed work on a cluster. It works in my situation because I have very long data (millions of rows) that is not very wide (thousands of columns), so distributing the dataframe in Spark makes sense.

In short: paste the two matrices side-by-side in a single dataframe, then use sparklyr::spark_apply to run the Matrix::crossprod function in Spark on the subsets of the dataframe corresponding to the original two matrices. Next, append a column containing the vector of ID values in the order they appear in the per-partition results (repeated once per partition), and finally summarize all of the columns by that ID value to return a single row per ID.

Below is a small example of how my solution works on distributed data, with a matrix of Items x Attributes and a matrix of Items x Users whose cross-product I want, yielding a matrix of Users x Attributes profiles.

library(sparklyr)
library(Matrix)
library(dplyr)
library(readr)

sc <- sparklyr::spark_connect(master = "local")

itemAttribute <- readr::read_csv('"Item_ID","Attr_1","Attr_2","Attr_3","Attr_4","Attr_5","Attr_6","Attr_7","Attr_8"
                                 111,0,0.5,0.5,0,0.5,0.5,0,0
                                 222,0.5,0,0,0.5,0.5,0,0.5,0
                                 333,0,0.577350269189626,0,0.577350269189626,0.577350269189626,0,0,0
                                 444,0,0,0,0,0.577350269189626,0.577350269189626,0,0.577350269189626
                                 555,0.707106781186547,0,0,0.707106781186547,0,0,0,0')

itemUser <- readr::read_csv('"Item_ID","User_1","User_2"
                            111,0,1
                            222,-1,1
                            333,1,0
                            444,0,1
                            555,1,-1')

# join the two matrices side-by-side into a single data frame, keyed by Item_ID
singleDf <- itemAttribute %>% dplyr::left_join(itemUser, by = "Item_ID")
  
# copy the combined data frame to Spark, splitting the rows across 2 partitions
singleSparkDf <- sparklyr::sdf_copy_to(sc,
                                       singleDf, 
                                       "singleDfNamed",
                                       overwrite = TRUE,
                                       repartition = 2)

# record which columns hold the Attribute matrix vs. the User matrix
attrColNames <- base::colnames(itemAttribute %>% dplyr::select(-Item_ID))
userColNames <- base::colnames(itemUser %>% dplyr::select(-Item_ID))

userColIndexLast <- base::ncol(singleDf)
userColIndexFirst <- base::ncol(singleDf) - base::length(userColNames) + 1
attrColIndexLast <- userColIndexFirst - 1
attrColIndexFirst <- attrColIndexLast - base::length(attrColNames) + 1

# within each partition, compute t(users) %*% attributes via Matrix::crossprod
userAttribute_partitions <- singleSparkDf %>% sparklyr::spark_apply(function(df, context) {
  # make the column-index values passed via `context` visible inside the closure
  for (name in names(context)) assign(name, context[[name]], envir = .GlobalEnv)
  profiles <- Matrix::crossprod(Matrix::as.matrix(base::as.data.frame(df)[,userColIndexFirst:userColIndexLast]), 
                                Matrix::as.matrix(base::as.data.frame(df)[,attrColIndexFirst:attrColIndexLast]))
  return(base::as.data.frame(profiles))
}, context = list(
  userColIndexLast = userColIndexLast,
  userColIndexFirst = userColIndexFirst,
  attrColIndexLast = attrColIndexLast,
  attrColIndexFirst = attrColIndexFirst
))

# each partition's result has one row per User; build a matching User_ID label column
userColNames_partitions <- base::rep(userColNames, sparklyr::sdf_num_partitions(singleSparkDf))
User_ID <- userColNames_partitions
userColNames_partitionsSpark <- sparklyr::sdf_copy_to(sc,
                                                      dplyr::tibble(User_ID), 
                                                      "userColNames_partitionsNamed", 
                                                      overwrite = TRUE)

# append the User_ID labels to the per-partition results
userAttribute_partitionsUserID <- userAttribute_partitions %>%
  sparklyr::sdf_bind_cols(userColNames_partitionsSpark)

# sum the per-partition results by User_ID to complete the cross-product
userAttribute <- userAttribute_partitionsUserID %>%
  dplyr::group_by(User_ID) %>%
  dplyr::summarise_all(sum) %>%
  dplyr::arrange(User_ID)

userAttribute

## for comparison, crossprod of original matrices in R session (no Spark)
Matrix::crossprod(Matrix::as.matrix(itemUser %>% dplyr::select(-Item_ID)), 
                  Matrix::as.matrix(itemAttribute %>% dplyr::select(-Item_ID)))

In this example, the cross-product gives us the User-Attribute weights, where each weight is the sum over Items of (User preference x Attribute flag). It is fine to partition the data by rows because each row still contains an Item's Attribute flags and User preferences, so the products can be calculated within a partition. These products are summed within each partition by the Matrix::crossprod call inside sparklyr::spark_apply, so the partitioned results then just need to be summed to complete the calculation and return a single vector of Attribute weights for each User.
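As one more sanity check of that logic, splitting the rows of the example matrices into two chunks (mimicking the two partitions) and summing the per-chunk cross-products reproduces the full cross-product computed above:

U <- Matrix::as.matrix(itemUser %>% dplyr::select(-Item_ID))
A <- Matrix::as.matrix(itemAttribute %>% dplyr::select(-Item_ID))

chunk1 <- 1:3  # stand-in for partition 1
chunk2 <- 4:5  # stand-in for partition 2

all.equal(Matrix::crossprod(U[chunk1, ], A[chunk1, ]) +
            Matrix::crossprod(U[chunk2, ], A[chunk2, ]),
          Matrix::crossprod(U, A))  # TRUE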
