Changing Base R code to dplyr/sparklyr

dplyr
sparklyr

#1

Hello everyone,

I have some existing (base) R code that uses some nested 'for' loops to iterate over a data frame and this takes quite some time.

To increase performance I created a Hadoop/Spark cluster and transferred the dataframe that needs to be manipulated by my code to a spark dataframe.

The dataframe contains 14 columns, 3 of which (lost_offer, won_offer, last_offer) contain only zeros. Based on other variables, the code decides whether or not these zeros should become ones.

What the current code does is iterate over the customers (also a column in my df), then iterate over each sku (read 'product group'). So for each customer-sku combination the code looks at some other columns (e.g. type, auto_create etc.) to decide whether or not the zeros in the last 3 columns should be changed to a '1'.

Below you see the old (base) R code which selects a chunk of data that relates to a unique 'customer-sku' combination (stored under 'select_data') and the main dataframe (country_new1) gets changed by analyzing the chunk of data (select_data).

n <- 0   ##Initialize a counter to reach at right row number
# print(paste0("n=", n))
start = Sys.time()
#1###########################################################
#first loop across customers
for(i in 1:length(customers)){     
  # print(unique(country_new1$updated_actual_related_customer)[i])
  
  customer_c <- customers[i]
  sdf <- country_new1 %>% filter(updated_actual_related_customer == customer_c) %>% distinct(sku)
  skus <- sdf_read_column(sdf, "sku")
  #2###########################################################  
  #Second loop across sku for each customer
  for(j in 1:length(skus)){      
    # print(paste0("sku = ",skus[j]))
    
    select_data <- country_new1 %>% filter(updated_actual_related_customer == customer_c, sku == skus[j]) %>% collect()
    nrow_sd <- nrow(select_data)
    # print(paste0("#rows for given cust-sku=",nrow_sd))
    
    if(nrow_sd==1){
      ## We check if there is only one order without any offer.. 
      # print("Only One Order - so Won")
      # print(paste0("Row#=",n+k))
      if(select_data$type[1]=='offer'){  
        country_new1 %>% mutate(final_offer = ifelse(updated_actual_related_customer == customer_c & sku == skus[j] , 1,0))
        country_new1 %>% mutate(lost_offer = ifelse(updated_actual_related_customer == customer_c & sku == skus[j] , 1,0))
      }
    }
    
    else if(nrow_sd>1){
      #3###########################################################
      #third loop for each row  
      for(k in 1:nrow_sd){       ## A loop for #rows in a sku for a customer
        # print(paste0("K loop will go from ",k," to ",nrow_sd))
        # print(paste0("k=",k))
        
        if(k==nrow_sd){ ### If there is only one row in selected data or the last row of selected data is an offer.. It is final and lost
          if(select_data$type[k]=='offer'){  
            dplyr::mutate(country_new1, final_offer = ifelse(updated_actual_related_customer == customer_c & sku == skus[j] , 1,0))
            dplyr::mutate(country_new1, lost_offer = ifelse(updated_actual_related_customer == customer_c & sku == skus[j] , 1,0))
            break
          }
          else if(select_data$type[k]=='order'){break} ## Do not process the last row, when it's an order
        }
        
        if(select_data$type[k] == 'offer'){
          Current_date <- select_data$orderoffer_date[k]   ###Store date of current row#
          given_unitprice <- select_data$unitprice_transaction_currency[k]  ###Store unit price of current row#
          Is_autocreated <- select_data$auto_create[k]
          
          #4###########################################################
          #fourth loop within 30days window 
          for(l in (k+1):nrow_sd){
            
            if(l>nrow_sd){break}else(
              ### If next row is within 30 days 
              if(select_data$orderoffer_date[l]>=Current_date & select_data$orderoffer_date[l] < Current_date+30){
                
                if(select_data$type[l]=='offer'){ ## First is an offer, second is also an offer
                  
                  if(Is_autocreated=='Y' & select_data$auto_create[l]=='Y'){
                    # print("First Offer is AC and second is also AC, move on..")
                    break} ## If First Offer is AC and second is also AC, stop for this offer
                  if(Is_autocreated=='Y' & select_data$auto_create[l]=='N'){
                    # print("first Offer is AC and second is MC, AC is neither won/loss, move on..")  
                    break} ## If first Offer is AC and second is MC, AC is neither won/loss, move on
                  if(Is_autocreated=='N' & select_data$auto_create[l]=='Y'){ ## If first Offer is MC and second is AC, check prices
                    # print("first Offer is MC and second is AC. Check prices..")
                    if(select_data$unitprice_transaction_currency[l] == given_unitprice){
                      # print("Offer with same price") 
                      break      ###If we found that there is a similar offer available in next - 30 days, End further iterations
                    }
                    else if(select_data$unitprice_transaction_currency[l] != given_unitprice){
                      # print("Offer with diff price")
                      # print(paste0("Row#=",n+k))
                      if(select_data$qty_disc[l]==1){break}
                      else {
                        country_new1 %>% mutate(final_offer = ifelse(updated_actual_related_customer == customer_c & sku == skus[j] , 1,0))
                        country_new1 %>% mutate(lost_offer = ifelse(updated_actual_related_customer == customer_c & sku == skus[j] , 1,0))
                        break
                      }
                    }
                  } 
                  
                  if(Is_autocreated=='N' & select_data$auto_create[l]=='N'){ ## Both are manual offers, Normal case
                    if(select_data$unitprice_transaction_currency[l] == given_unitprice){
                      # print("Offer with same price") 
                      break      ###If we found that there is a similar offer available in next - 30 days, End further iterations
                    }
                    else if(select_data$unitprice_transaction_currency[l] != given_unitprice){
                      # print("Offer with diff price")
                      # print(paste0("Row#=",n+k))
                      if(select_data$qty_disc[l]==1){break}
                      else {
                        country_new1 %>% mutate(final_offer = ifelse(updated_actual_related_customer == customer_c & sku == skus[j] , 1,0))
                        country_new1 %>% mutate(lost_offer = ifelse(updated_actual_related_customer == customer_c & sku == skus[j] , 1,0))
                        break
                      }
                      ## We established that this is a final-lost offer. break the loop
                    }
                  }  
                }
                else if(select_data$type[l]=='order'){
                  if(select_data$unitprice_transaction_currency[l]==given_unitprice){
                    # print("Order with same price - Won Offer")
                    # print(paste0("Row#=",n+k))
                    country_new1 %>% mutate(final_offer = ifelse(updated_actual_related_customer == customer_c & sku == skus[j] , 1,0))
                    country_new1 %>% mutate(Won_offer = ifelse(updated_actual_related_customer == customer_c & sku == skus[j] , 1,0))
                    country_new1 %>% mutate(lost_offer = ifelse(updated_actual_related_customer == customer_c & sku == skus[j] , 0,1))
                    break
                  }
                  else if(select_data$unitprice_transaction_currency[l]>given_unitprice){
                    # print("Order with higher price - Not a final offer")
                    break
                  }
                  else if(select_data$unitprice_transaction_currency[l]<given_unitprice){
                    # print("Order with low price")
                    if(select_data$qty_disc[l]==1){
                      country_new1 %>% mutate(final_offer = ifelse(updated_actual_related_customer == customer_c & sku == skus[j] , 1,0))
                      country_new1 %>% mutate(Won_offer = ifelse(updated_actual_related_customer == customer_c & sku == skus[j] , 1,0))
                      country_new1 %>% mutate(lost_offer = ifelse(updated_actual_related_customer == customer_c & sku == skus[j] , 0,1))
                      break
                    }
                    else {
                      country_new1 %>% mutate(final_offer = ifelse(updated_actual_related_customer == customer_c & sku == skus[j] , 1,0))
                      country_new1 %>% mutate(Won_offer = ifelse(updated_actual_related_customer == customer_c & sku == skus[j] , 0,1))
                      country_new1 %>% mutate(lost_offer = ifelse(updated_actual_related_customer == customer_c & sku == skus[j] , 1,0))
                      break
                    }
                  }
                }
              }
              else if(select_data$orderoffer_date[l] >= Current_date+30){
                # print("Current date is more than 30 days")
                country_new1 %>% mutate(final_offer = ifelse(updated_actual_related_customer == customer_c & sku == skus[j] , 1,0))
                country_new1 %>% mutate(lost_offer = ifelse(updated_actual_related_customer == customer_c & sku == skus[j] , 1,0))
                break
              }
            )}
        }
      }
    }
    n <- n + nrow_sd    ##Value of n should be updated once loop has run for selected data
    #print(paste0("New n=", n))
  }
}

I'm trying to get rid of at least the outer 2 loops by using spark_apply( ..... , group_by = c("customers", "skus")), as you can see in the code below. I want to reuse the 2 inner loops by copy-pasting them as a function. In order to keep my old code unchanged, I need a way to iterate with an index over the chunks of data corresponding to the 'customer-sku' combination. Does anyone know how to do that?

countries_tbl <- copy_to(sc,country_new1)
results_tbl <- spark_apply(countries_tbl, function(country_new1) {
  nrow_sd = count()
  dplyr::mutate(test = nrow_sd)
  if (nrow_sd == 1) {
    if (country_new1[type] == 'offer') {
      dplyr::mutate(country_new1, final_offer <- 1, lost_offer <- 1)
    }
    else {
      dplyr::mutate(country_new1, Won_offer <- 1)
    }
  }
  
  
} , group_by = c("updated_actual_related_customer", "sku"))

I'm looking forward to your answers!

Kind Regards,
Charles


#3

Not an answer to your question (sorry!) but a small request: can you please paste in the code itself, instead of screenshots? Screenshots are difficult to read and force anybody who wants to help to re-type your code.

When you do paste in code here, it’s good practice to format it :sparkles: by surrounding your code blocks with 3 backticks (or you can select the code and click the little </> button at the top of the posting box). More info on code formatting: FAQ: How to make your code look nice? Markdown Formatting


#5

This seems like it could all be done with a few case_when calls inside of a mutate function. How many rows does your dataframe have? You may not even need to use Spark, as the nested for loops are likely what is slowing this process down, especially if all of the for loops are just iterating over a different column in the data frame.
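
As a rough local sketch of that suggestion (not the original poster's full logic): inside a grouped mutate(), n() gives the number of rows in the group and row_number() the position within it, so the "single row" and "last row is an offer" rules from the loop can be written as one case_when. The toy data is made up; only the column names come from the thread.

```r
library(dplyr)

# Hypothetical toy data; column names follow the thread, values are invented.
toy <- tibble::tribble(
  ~updated_actual_related_customer, ~sku, ~type,
  "A", "s1", "offer",
  "A", "s2", "offer",
  "A", "s2", "order",
  "B", "s1", "order",
  "B", "s1", "offer"
)

flagged <- toy %>%
  group_by(updated_actual_related_customer, sku) %>%
  mutate(
    # An offer that is the last row of its customer-sku group is treated as
    # final and lost. This covers both the one-row group and the
    # k == nrow_sd case of the loop.
    final_offer = case_when(
      type == "offer" & row_number() == n() ~ 1L,
      TRUE ~ 0L
    ),
    lost_offer = final_offer
  ) %>%
  ungroup()

flagged
```

This relies on the rows already being in the intended order within each group; on a Spark table that order would have to be stated explicitly.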


#6

@tbradley thank you so much for your reply.

The data frame has one million rows for just one country. If this code runs on Spark, I would like to add multiple countries, so it could get large.

I managed to replace the 3 outer loops by this piece of code:

analysis <- country_new1 %>%
  group_by(updated_actual_related_customer, sku) %>%
  mutate(k = row_number()) %>%
  mutate(final_offer = ifelse(count() == 1,1,0), lost_offer = ifelse(count() == 1,1,0)) %>%
  mutate(final_offer = ifelse(count() > 1 & k==count() & type == 'offer',1,0 ), lost_offer = ifelse(count() > 1 & k==count() & type == 'offer',1,0 )) %>%
  ##everything that happens inside the fourth loop is missing here
  compute("analysis")

Everything that happens inside the 4th loop is missing here because that loop compares one row's data to the following rows and I don't know how to do that without a loop.

If there is any way that I could just copy-paste the fourth loop in a function and apply it to the customer-sku blocks in my country_new1 spark dataframe, I would prefer that.

thanks a lot!
Charles


#7

You can use the lag and lead functions in dplyr to look at the previous and subsequent rows, respectively. You can also specify how many rows back or forward you want to look. If you do this after calling group_by like you did, then it will only look within the group. It will return NA if there is no value before or after, respectively, the row currently being looked at.

One thing to note is that your data has to be arranged in the correct manner in order to ensure that the correct value is found when using lead/lag, so it may be worthwhile to put an arrange call in there to ensure that the data is in the format you want.
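
A minimal local sketch of the lead() idea, assuming the column names used in the thread and invented values. It implements only one rule (an offer followed within 30 days by an order at the same price counts as won). As far as can be told from the loop as posted, every matching branch of the fourth loop ends in break, so for date-sorted data only the row directly after the current one is ever inspected, which is exactly what lead() returns. That reading is an assumption, not something confirmed in the thread.

```r
library(dplyr)

# Hypothetical toy data for a single customer-sku group.
toy <- tibble::tibble(
  updated_actual_related_customer = "A",
  sku = "s1",
  type = c("offer", "order", "offer", "offer"),
  orderoffer_date = as.Date(c("2018-01-01", "2018-01-10",
                              "2018-03-01", "2018-05-15")),
  unitprice_transaction_currency = c(10, 10, 12, 12)
)

with_next <- toy %>%
  group_by(updated_actual_related_customer, sku) %>%
  # lead() depends on row order, so sort by date inside each group first.
  arrange(orderoffer_date, .by_group = TRUE) %>%
  mutate(
    next_type  = lead(type),
    next_date  = lead(orderoffer_date),
    next_price = lead(unitprice_transaction_currency),
    # FALSE for the last row of a group, where lead() returns NA.
    next_within_30 = !is.na(next_date) & next_date < orderoffer_date + 30,
    Won_offer = case_when(
      type == "offer" & next_within_30 & next_type == "order" &
        next_price == unitprice_transaction_currency ~ 1L,
      TRUE ~ 0L
    )
  ) %>%
  ungroup()

with_next
```

The other branches of the loop (auto_create, qty_disc, price comparisons) would become further case_when conditions on the same next_* columns. Whether this translates unchanged to a Spark table has not been checked here.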

As for size, that is fairly large so you may benefit from using spark. It mostly depends on the RAM on your computer how many rows you can use in memory, so I guess it depends on how many countries you have total, as to whether the switch to spark is worthwhile! But making the change from for loops to dplyr should greatly increase the speed of your code.


#8

The data needs to be analysed in customer-sku groups.

How do I specify that Spark needs to look for rows that are less than 30 days further from the current row? (a 'date' column is available in the dataframe)

Could you maybe translate this part into dplyr as an example? The if statements after that are similar so I could easily go further myself :slight_smile: You can omit some if statements, I would just like to know how to let the if statements check the rows that are within 30 days from the current row.

Current_date <- select_data$orderoffer_date[k]   ###Store date of current row#
          given_unitprice <- select_data$unitprice_transaction_currency[k]  ###Store unit price of current row#
          Is_autocreated <- select_data$auto_create[k]
          
          #4###########################################################
          #fourth loop within 30days window 
          for(l in (k+1):nrow_sd){
            
            if(l>nrow_sd){break}
            else(
              ### If next row is within 30 days 
              if(select_data$orderoffer_date[l]>=Current_date & select_data$orderoffer_date[l] < Current_date+30){
                
                if(select_data$type[l]=='offer'){ ## First is an offer, second is also an offer
                  
                
                  if(Is_autocreated=='N' & select_data$auto_create[l]=='Y'){ ## If first Offer is MC and second is AC, check prices

                    if(select_data$unitprice_transaction_currency[l] != given_unitprice){

                      if(select_data$qty_disc[l]==1){break}
                      else {
                        country_new1 %>% mutate(final_offer = ifelse(updated_actual_related_customer == customer_c & sku == skus[j] , 1,0))
                        country_new1 %>% mutate(lost_offer = ifelse(updated_actual_related_customer == customer_c & sku == skus[j] , 1,0))
                        break
                      }



#9

Could you please turn this into a self-contained reprex (short for reproducible example)? It will help us help you if we can be sure we're all working with/looking at the same stuff.

install.packages("reprex")

If you've never heard of a reprex before, you might want to start by reading the tidyverse.org help page. The reprex dos and don'ts are also useful.

What to do if you run into clipboard problems

If you run into problems with access to your clipboard, you can specify an outfile for the reprex, and then copy and paste the contents into the forum.

reprex::reprex(input = "fruits_stringdist.R", outfile = "fruits_stringdist.md")

For pointers specific to the community site, check out the reprex FAQ, linked to below.


Without sample data especially, it is hard to know for sure if what I am doing is what you want. As an arbitrary stab in the dark, I would suggest maybe saving an intermediate dataframe after your third mutate call in the revised code. From there you can use the fuzzyjoin package to join it to itself based on whether the date is within 30 days. That may prove to not be the best option in your case, but it is where my head went when I first read your last post.
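
To illustrate the self-join idea without sample data, here is a small sketch that uses a plain dplyr inner_join followed by a filter instead of fuzzyjoin (a deliberate swap, to keep the example dependency-free). The row_id column and all values are hypothetical; the other column names follow the thread.

```r
library(dplyr)

# Hypothetical toy data for one customer-sku group; row_id is an assumed
# helper column that records the original row order.
toy <- tibble::tibble(
  updated_actual_related_customer = "A",
  sku = "s1",
  row_id = 1:4,
  type = c("offer", "offer", "order", "offer"),
  orderoffer_date = as.Date(c("2018-01-01", "2018-01-05",
                              "2018-01-20", "2018-06-01")),
  unitprice_transaction_currency = c(10, 9, 9, 9)
)

# The same rows again, renamed so they can serve as "later" candidates.
later_rows <- toy %>%
  select(updated_actual_related_customer, sku,
         later_id = row_id, later_type = type,
         later_date = orderoffer_date,
         later_price = unitprice_transaction_currency)

# Pair every row with every later row of the same group that falls inside
# the 30-day window. dplyr >= 1.1 may warn about a many-to-many join here,
# which is expected for a self-join.
candidate_pairs <- toy %>%
  inner_join(later_rows, by = c("updated_actual_related_customer", "sku")) %>%
  filter(later_id > row_id,
         later_date >= orderoffer_date,
         later_date < orderoffer_date + 30)

# Keep only the first later row per original row.
first_match <- candidate_pairs %>%
  group_by(row_id) %>%
  filter(later_id == min(later_id)) %>%
  ungroup()

first_match
```

The self-join grows with the square of the group size, so on a table with a million rows it is worth checking how large the customer-sku groups are before relying on it.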


#10

I could turn this into a reprex, but it won't run since it would not have the input (the input dataframe is transferred directly from Google Storage to Spark through the GCS connector). I can't download it since my computer can't handle the file size, and I don't know how to take a fraction of it.

The full loop that works on the dataframe is pasted in the first post here,

first loop iterates over the different customers,
second loop iterates over the different product categories (skus)
third and fourth loop iterate over the subset of data that's defined by the 'customer-sku' combination.

I managed to replace the outer 2 loops by doing group_by(customer, sku).

Do you see a possibility to keep the inner 2 loops by applying them as a function on the 'customer-sku' chunks?

If not: how do you write 'for this row, check the following rows that are within 30 days' and 'if the price in the next row is lower than in the previous row, set final_offer and lost_offer to 1'?

You could simulate the data frame if I gave you the columns, right?


#11

I have never worked with google storage or spark so I can't help with that. I still think the best thing would be to do a self join using fuzzyjoin as mentioned above.

And if you provide simulated data, then it will be much easier to help you. Once you create the simulated dataframe you can share it by using the datapasta package.


#12

I have 680 rows of sample data in a csv, can I send it to you somewhere?

This is the old code again that should be turned into dplyr:

customers <- unique(country_new1[,updated_actual_related_customer]) ## unique customers list
country_new1[,final_offer := 0L]
country_new1[,Won_offer := 0L]
country_new1[,Lost_offer := 0L]

n <- 0   ##Initialize a counter to reach at right row number
# print(paste0("n=", n))

#1###########################################################
#first loop across customers
for(i in 1:length(customers)){     
 
  customer_c <- customers[i]
  skus <- unique(country_new1[updated_actual_related_customer == customer_c,sku])
  
  #2###########################################################  
  #Second loop across sku for each customer
  for(j in 1:length(skus)){      
    # print(paste0("sku = ",skus[j]))
    
    
    select_data <- country_new1[updated_actual_related_customer == customer_c  &  sku == skus[j], ]
    nrow_sd <- nrow(select_data)

    if(nrow_sd==1){
      ## We check if there is only one order without any offer.. 
      if(select_data$type[1]=='offer'){  
        country_new1[n+1,final_offer := 1L]
        country_new1[n+1,Lost_offer := 1L]
      }
    }
    
    else if(nrow_sd>1){
      #3###########################################################
      #third loop for each row  
      for(k in 1:nrow_sd){       
        
        if(k==nrow_sd){ ### If there is only one row in selected data or the last row of selected data is an offer.. It is final and lost
          if(select_data$type[k]=='offer'){  
            country_new1[n+k,final_offer := 1L]
            country_new1[n+k,Lost_offer := 1L]
            break
          }
          else if(select_data$type[k]=='order'){break} ## Do not process the last row, when it's an order
        }
        
        if(select_data$type[k] == 'offer'){
          Current_date <- select_data$order.offer_date[k]   ###Store date of current row#
          given_unitprice <- select_data$unitprice_transaction_currency[k]  ###Store unit price of current row#
          Is_autocreated <- select_data$auto_create[k]
          
          #4###########################################################
          #fourth loop within 30days window 
          for(l in (k+1):nrow_sd){

            if(l>nrow_sd){break}else(
              ### If next row is within 30 days 
              if(select_data$order.offer_date[l]>=Current_date & select_data$order.offer_date[l] < Current_date+30){
                
                if(select_data$type[l]=='offer'){ ## First is an offer, second is also an offer
                  
                  if(Is_autocreated=='Y' & select_data$auto_create[l]=='Y'){
                    # print("First Offer is AC and second is also AC, move on..")
                    break} ## If First Offer is AC and second is also AC, stop for this offer
                  if(Is_autocreated=='Y' & select_data$auto_create[l]=='N'){
                    # print("first Offer is AC and second is MC, AC is neither won/loss, move on..")  
                    break} ## If first Offer is AC and second is MC, AC is neither won/loss, move on
                  if(Is_autocreated=='N' & select_data$auto_create[l]=='Y'){ ## If first Offer is MC and second is AC, check prices
                    # print("first Offer is MC and second is AC. Check prices..")
                    if(select_data$unitprice_transaction_currency[l] == given_unitprice){
                      # print("Offer with same price") 
                      break      ###If we found that there is a similar offer available in next - 30 days, End further iterations
                    }
                    else if(select_data$unitprice_transaction_currency[l] != given_unitprice){
                      # print("Offer with diff price")
                      # print(paste0("Row#=",n+k))
                      if(select_data$qty_disc[l]==1){break}
                      else {
                        country_new1[n+k,final_offer := 1L]
                        country_new1[n+k,Lost_offer := 1L]
                        break
                      }
                    }
                  } 
                  
                  if(Is_autocreated=='N' & select_data$auto_create[l]=='N'){ ## Both are manual offers, Normal case
                    if(select_data$unitprice_transaction_currency[l] == given_unitprice){
                      # print("Offer with same price") 
                      break      ###If we found that there is a similar offer available in next - 30 days, End further iterations
                    }
                    else if(select_data$unitprice_transaction_currency[l] != given_unitprice){
                      # print("Offer with diff price")
                      # print(paste0("Row#=",n+k))
                      if(select_data$qty_disc[l]==1){break}
                      else {
                        country_new1[n+k,final_offer := 1L]
                        country_new1[n+k,Lost_offer := 1L]
                        break
                      }
                      ## We established that this is a final-lost offer. break the loop
                    }
                  }  
                }
                else if(select_data$type[l]=='order'){
                  if(select_data$unitprice_transaction_currency[l]==given_unitprice){
                    # print("Order with same price - Won Offer")
                    # print(paste0("Row#=",n+k))
                    country_new1[n+k,final_offer := 1L]
                    country_new1[n+k,Won_offer := 1L]
                    country_new1[n+k,Lost_offer := 0L]
                    break
                  }
                  else if(select_data$unitprice_transaction_currency[l]>given_unitprice){
                    # print("Order with higher price - Not a final offer")
                    break
                  }
                  else if(select_data$unitprice_transaction_currency[l]<given_unitprice){
                    # print("Order with low price")
                    if(select_data$qty_disc[l]==1){
                      country_new1[n+k,final_offer := 1L]
                      country_new1[n+k,Won_offer := 1L]
                      country_new1[n+k,Lost_offer := 0L]
                      break
                    }
                    else {
                      country_new1[n+k,final_offer := 1L]
                      country_new1[n+k,Won_offer := 0L]
                      country_new1[n+k,Lost_offer := 1L]
                      break
                    }
                  }
                }
              }
              else if(select_data$order.offer_date[l] >= Current_date+30){
                # print("Current date is more than 30 days")
                country_new1[n+k,final_offer := 1L]
                country_new1[n+k,Lost_offer := 1L]
                break
              }
            )}
        }
      }
    }
    
    
    n <- n + nrow_sd    ##Value of n should be updated once loop has run for selected data
    #print(paste0("New n=", n))
  }
}

#13

As suggested in this post, writing dplyr code would definitely be ideal since you would be able to execute against R, databases and Spark with ease.

That said, since this code already runs for 1m records per country and you are looking into scaling to multiple countries, you might also want to consider using spark_apply() partitioned by country, something like:

all_countries_tbl %>% spark_apply(function(country_new1) {
   # existing code goes here
   # return data frame with same columns for each country group
}, group_by = "country")
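
The "data frame in, data frame out" shape of such a function can be tried locally before involving Spark. The sketch below is an assumption-laden illustration: flag_group is a hypothetical helper that implements only the "last row is an offer" rule, and dplyr's group_modify() stands in for spark_apply(), so nothing here has been run against a Spark connection.

```r
library(dplyr)

# Hypothetical per-group function: receives the rows of one customer-sku
# group as a data frame and returns them with the flag columns filled in.
# Only the "last row is an offer" rule is implemented; the rest of the
# existing loop body would take its place.
flag_group <- function(select_data) {
  nrow_sd <- nrow(select_data)
  select_data$final_offer <- 0L
  select_data$lost_offer <- 0L
  if (nrow_sd >= 1 && select_data$type[nrow_sd] == "offer") {
    select_data$final_offer[nrow_sd] <- 1L
    select_data$lost_offer[nrow_sd] <- 1L
  }
  select_data
}

# Hypothetical toy data; column names follow the thread.
toy <- tibble::tribble(
  ~updated_actual_related_customer, ~sku, ~type,
  "A", "s1", "offer",
  "A", "s2", "offer",
  "A", "s2", "order",
  "B", "s1", "order",
  "B", "s1", "offer"
)

# group_modify() calls flag_group once per group and binds the results.
result <- toy %>%
  group_by(updated_actual_related_customer, sku) %>%
  group_modify(~ flag_group(.x)) %>%
  ungroup()

result
```

Because the function only touches the data frame it is given, the row counter n and the lookups into the full table from the original loops are no longer needed.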

#14

The spark_apply function never works for me; it is not as easy as just copy-pasting :frowning: . Could I send you some sample data so you could run my code and try to apply the inner 2 loops as a spark_apply function?

As mentioned above, the outer 2 loops are easily replaced by using group_by(customer, sku)


#15

@tbradley could I send you the sample data somewhere private?


#16

@Charlesvandamme

sparklyr supports sliding window functions in SQL very elegantly, as you mention:

how do I specify that spark needs to look for rows that are less than 30 days further from the current row? (a 'date' column is available in the dataframe)

Never write for loops in sparklyr. Remember, they will be very slow and cannot be optimized by Spark.

I use sparklyr heavily to process 2B+ user records over 2+ years with sliding-window lags of 7d/30d/180d/360d, which takes only 10 minutes.

Here is an example:

res <- tbl(sc,sql(glue::glue("
SELECT
user_id,dt,  {params$agg_name}(VALUE) OVER(PARTITION BY user_id ORDER BY order RANGE BETWEEN {lag} PRECEDING AND  CURRENT ROW ) as value
FROM sdf
    "))) %>%
  select(user_id,dt,value) %>% #  = sum_value 
  sparklyr::sdf_repartition(partitions = 2*1000,partition_by = "dt")

Calling spark_apply directly is less efficient than using Spark SQL.