Going parallel only intermittently works with the same code

parallel

#1

When running our code in parallel using either parLapply, future_map, or foreach, we usually get an error message along the lines of error in unserialize(node$con) error reading from connection. (That is the parLappy error, but the others are similar - it seems that the code is running correctly on the cores, but then the attempt to return the data into the main R session causes it to break). The problem occurs when running on Windows Server 2016. I've looked around online and can't seem to find a particularly good candidate for what is causing this. I know this probably isn't enough info to definitively solve this....but at this point I'd appreciate any and all suggestions for what I can try or look into that might resolve this issue.


#2

do you have a minimal example of reproducible code that throws this (even if intermittently)? Kinda hard to guess what might be going on.


#3

I've seen such message with parallel when one runs the remote workers out of memory.


#4

Sadly I can't reproduce the error on a different dataset and I can't share the actual data I'm working from (private company). Trying to do some fairly straightforward rolling calculations (using RcppRoll) on sales volume. Works in serial, and then intermittently it'll work in parallel.

Since posting, instead of nesting by state and product combination, I tried nesting by state and then within the rolling calculations doing a group_by on product. I realized that nesting the product data meant we were sending thousands of products individually to each core and trying to retrieve that data. It makes intuitive sense to me that something might break in sending that much data back and forth. Since we switched to parallelizing by state and grouping by product things seem to be working. Does that make sense as the actual cause of the errors?

plan(multiprocess)
tic()
total_brand_state_volumes_rolling <- total_brand_state_volumes %>%
mutate(rolling_data = future_map(data,
rolling_calculations,
.progress = TRUE))
toc()
# Stop clusters
future:::ClusterRegistry("stop")

#5

We certainly could be doing that - in the past though I recall getting error messages that explicitly told us something along the lines of 'unable to allocate a vector of size X mb'


#6

My guess is also that it's a memory issue. Googling the error pulls up a few hits, one in particular describing how intermediate results that go over the memory limit can trigger it. It's at the bottom here: http://gforge.se/2015/02/how-to-go-parallel-in-r-basics-tips/

My guess on why it doesn't happen all the time is that potentially R runs the gc() at different times and you're just getting unlucky. But it's hard to say.

Honestly if anyone would have seen this before, it would be @HenrikBengtsson (if you have a moment)


#7

I'm suspicious you're right about the memory on the workers...

Things to consider:

are you running multiple R sessions on each node? In other words are you parallel across nodes then parallel again on each node? If you're parallel on each node you need enough memory on each node to hold all the R sessions at once.. if they are all producing big data structures then that can get pretty burdensome on the worker nodes.

When I've done this in the past I tend to take my most memory constrained node (or any node if they are homogeneous) and make sure I can run the job in parallel on one node with the number of workers I want. I interactively run htop while the job fires off to make sure 1) I am using as many CPUs as I expected and 2) I'm not being a RAM piggy. Then when I am sure it runs fine on one node, I'll run larger jobs with multiple nodes.

That design pattern works great if each "work chunk" is close to the same size. But it can be misleading if you have work chunks that are different size. If you're "work chunk" (which goes in memory) is something like "every policy holder in a state" then your node performance will be fine if you test on RI, WY, SD, ND, MT, VT. But then you might run out of memory if a single node ends up getting CA and NY to work on in parallel.


#8

Thanks. And thank you also for the furrr package, which is awesome and has saved us a ton of time on both coding and execution.

Lots of good stuff in that post you shared - my big takeaway for now is that using gc() and rm() within the function I'm actually executing can prevent some of these memory issues. It makes sense to me that this would be particularly important if I'm dividing thousands of tasks across only a dozen cores - which is what was happening. Changing it to dividing 50 bigger tasks among the same 12 cores seems to have worked.


#9

To be honest, I'm getting a little lost on this nodes v. cores discussion (I'm from a stats background, so my comp sci knowledge as some pretty huge holes in it). There's only one layer of parallelization though, so based on what I understand we have one node with 12 cores in it.

We definitely have the problem with each 'work chunk' being a different size.


#10

In other words are you parallel across nodes then parallel again on each node?

@jdlong I don't think he is doing this (even though you and I had great fun with it).

As Nate just clarified, he's got 1 Windows Server that it sounds like he is on interactively. And this is where he calls future_map() from to distribute across 12 cores on that machine.


#11

If you're only running on one machine and spinning up 12 workers, then you have 12 (maybe 13) sessions of R going on at once. If the combined memory usage of all of those exceeds your total system memory, then you're going to have "something bad".

When you sent up your parallelization you can usually tell R how many cores to use. I don't think you stated how you were parallelizing (sorry if I missed it). If you're using future you can do this by passing workers=x to the multiprocess call. where x is the number of cores to use on your machine. See ?future::multiprocess for more help...


#12

Happy to help! Glad to hear it has saved time.

I would also avoid creating intermediate objects in your function where possible. Especially if they are intermediate copies of your distributed data frame.

Yes, you definitely have to think a little bit about how to divide up your data. Generally, if you give future_map() a vector or list of things, it knows how to automatically slice this up evenly to spread it across the 12 cores, but it's still too easy to mess that up.


#13

Yeah I now get a hankering for bourbon in a red solo cup every time I use future or furrr... I blame you.


#14

The rolling calculations are for bourbon sales. All glass spirits actually, but bourbon very much included.

Anyway, I appreciate you both taking the time to talk me through what was a rather vague question to start with.


#15

Nate, speaking as a native Kentuckian, I will parallelize your code in exchange for bourbon.

Joking aside, this stuff is kind of hard at first. It requires thinking about things we don't normally have to think about. I hope we helped.


#16

(author of future here) First of all, for anyone trying to reproduce this, @natekratzer is on Windows (assuming your master R session and your workers are all Windows), meaning that:

plan(multiprocess)

is equivalent to:

plan(multisession)

If you're on Linux or macOS, that would become plan(multicore), which is a different creature - it uses forked processing rather than background R session for parallelization, which has different (better or worse) memory properties (I'm constantly debating with myself whether it was a good thing to provide the "virtual" plan(multiprocess)).

To narrow down on the actually problem. The error

error in unserialize(node$con) error reading from connection

originates from parallel:::recvData.SOCKnode() which in turn is called by parallell::recvResult(), which is the framework used by plan(multisession). This recvData() function is called when the master R process tries to retrieve results back from the workers. Therefore, the error suggests that the results were not sent at all, or only partially sent. This in turn suggests that the worker's R process has terminated (*).

Looking at the code for these workers, which is simply parallel:::slaveLoop(), I don't see other reasons for the worker terminating other than

  1. running out of memory,
  2. the process being killed by external signals, or
  3. running corrupt code causing R to core dump.

A core dump is unlikely and should equally well happen when running sequentially. Also, core dumps are likely due to execution of really "bad" code (memory leaks in native code or similar).

So, as others already suggested, I would put my bet on an "out-of-memory" problem. Other than externally monitoring the memory consumption of these workers, I don't think there's an easy way to know whether it's a out-of-memory error or not.

On the future roadmap, I'm hoping to add (optional) automatic timing and memory benchmarking of futures being evaluated (https://github.com/HenrikBengtsson/future/issues/59). That could have helped here, e.g. by observing how much memory successful futures consume with one, two, three,... workers until the error kicks in.

Footnote: (*) It should be possible to add a post-mortem diagnostic to the above error and test whether the (local) worker is alive or not. If so, I think one could provide a more informative error message that also include something like "It looks like the underlying R process (PID ...) of the SOCKnode worker has terminated". I'll add it to the todo list.


#17

@natekratzer, instead of using the "hack" of calling future:::ClusterRegistry("stop") to stop the cluster created by plan(multiprocess), just call plan(sequential) and the previously set plan will be cleanup up (including the cluster being stopped).


#18

To help troubleshooting: The develop version of the future package now provides some post-mortem diagnostics in the error message that might help to rule out certain problems, e.g.

> library(future)
> plan(multisession, workers = 2L)
> f <- future(quit())
> v <- value(f)
Error in unserialize(node$con) : 
Failed to retrieve the value of MultisessionFuture (<none>) from cluster
SOCKnode #1 (PID 3100 on localhost 'localhost'). The reason reported was
'error reading from connection'. Post-mortem diagnostic: No process exist
with this PID, i.e. the localhost worker is no longer alive.

Note that this type of diagnostic only works on workers running on the local machine - it does not work on workers running on other machines.

To install this develop version, use:

remotes::install_github("HenrikBengtsson/future@develop")

It should work on all OSes, but if it doesn't work for you, or if you run into issues, please report back on https://github.com/HenrikBengtsson/future/issues/250


#19

Thank you! I appreciate the explanation, advice, and the updated error message.