An introduction to multidplyr

multidplyr is a backend for dplyr that spreads work across multiple processes. Like all dplyr backends, it allows you to use the dplyr verbs that you’re already familiar with, but alters the underlying computational model to transparently support multi-process parallelism.

This vignette will show you the basics of multidplyr using the nycflights13 dataset.

library(multidplyr)
library(dplyr, warn.conflicts = FALSE)
library(nycflights13)

Creating a cluster

To start using multidplyr you must create a cluster. Here I used two cores because it’s the maximum permitted by CRAN, but I suggest that you use more. For best performance, I recommend using 1 or 2 fewer than the total number of cores on your computer, which you can detect with parallel::detectCores() (leaving at least 1 core free means that you should still be able to use your computer for other tasks while your computation is running).

cluster <- new_cluster(2)
cluster
#> 2 session cluster [..]

(In the examples, you’ll also see the use of default_cluster(); this is designed specifically for the constraints of R CMD check, so I don’t recommend using it in your own code.)

A cluster consists of multiple R processes created by callr. When multiple processes are running at the same time, your operating system will take care of spreading the work across multiple cores.

Add data

There are two ways to get data to the workers in cluster:

partition()

partition() is useful if you have a single in-memory data frame. For example, take nycflights13::flights. This dataset contains information for about ~300,000 flights departing New York City in 2013. We group it by destination, then partition it:

flights1 <- flights %>% group_by(dest) %>% partition(cluster)
flights1
#> Source: party_df [336,776 x 19]
#> Groups: dest
#> Shards: 2 [166,251--170,525 rows]
#> 
#> # A data frame: 336,776 × 19
#>    year month   day dep_time sched_dep…¹ dep_d…² arr_t…³ sched…⁴ arr_d…⁵ carrier
#>   <int> <int> <int>    <int>       <int>   <dbl>   <int>   <int>   <dbl> <chr>  
#> 1  2013     1     1      557         600      -3     709     723     -14 EV     
#> 2  2013     1     1      557         600      -3     838     846      -8 B6     
#> 3  2013     1     1      558         600      -2     849     851      -2 B6     
#> 4  2013     1     1      558         600      -2     853     856      -3 B6     
#> 5  2013     1     1      559         559       0     702     706      -4 B6     
#> 6  2013     1     1      559         600      -1     854     902      -8 UA     
#> # … with 336,770 more rows, 9 more variables: flight <int>, tailnum <chr>,
#> #   origin <chr>, dest <chr>, air_time <dbl>, distance <dbl>, hour <dbl>,
#> #   minute <dbl>, time_hour <dttm>, and abbreviated variable names
#> #   ¹​sched_dep_time, ²​dep_delay, ³​arr_time, ⁴​sched_arr_time, ⁵​arr_delay

partition() splits flights1 into roughly equal subsets on each worker, ensuring that all rows in a group are transfered to the same worker. The result is a party_df, or partitioned data frame.

Direct loading

partition() is simple to call, but it’s relatively expensive because it copies a lot of data between processes. An alternative strategy is for each worker to load the data (from files) it needs directly.

To show how that might work, I’ll first split flights up by month and save as csv files:

path <- tempfile()
dir.create(path)

flights %>% 
  group_by(month) %>% 
  group_walk(~ vroom::vroom_write(.x, sprintf("%s/month-%02i.csv", path, .y$month)))

Now we find all the files in the directory, and divide them up so that each worker gets (approximately) the same number of pieces:

files <- dir(path, full.names = TRUE)
cluster_assign_partition(cluster, files = files)

Then we read in the files on each worker and use party_df() to create a partitioned dataframe:

cluster_send(cluster, flights2 <- vroom::vroom(files))

flights2 <- party_df(cluster, "flights2")
flights2
#> Source: party_df [336,776 x 18]
#> Shards: 2 [166,158--170,618 rows]
#> 
#> # A data frame: 336,776 × 18
#>    year   day dep_time sched_de…¹ dep_d…² arr_t…³ sched…⁴ arr_d…⁵ carrier flight
#>   <dbl> <dbl>    <dbl>      <dbl>   <dbl>   <dbl>   <dbl>   <dbl> <chr>    <dbl>
#> 1  2013     1      517        515       2     830     819      11 UA        1545
#> 2  2013     1      533        529       4     850     830      20 UA        1714
#> 3  2013     1      542        540       2     923     850      33 AA        1141
#> 4  2013     1      544        545      -1    1004    1022     -18 B6         725
#> 5  2013     1      554        600      -6     812     837     -25 DL         461
#> 6  2013     1      554        558      -4     740     728      12 UA        1696
#> # … with 336,770 more rows, 8 more variables: tailnum <chr>, origin <chr>,
#> #   dest <chr>, air_time <dbl>, distance <dbl>, hour <dbl>, minute <dbl>,
#> #   time_hour <dttm>, and abbreviated variable names ¹​sched_dep_time,
#> #   ²​dep_delay, ³​arr_time, ⁴​sched_arr_time, ⁵​arr_delay

dplyr verbs

Once you have a partitioned data frame, you can operate on it with the usual dplyr verbs. To bring the data back to the interactive process, use collect():

flights1 %>% 
  summarise(dep_delay = mean(dep_delay, na.rm = TRUE)) %>% 
  collect()
#> # A tibble: 105 × 2
#>    dest  dep_delay
#>    <chr>     <dbl>
#>  1 ABQ       13.7 
#>  2 ALB       23.6 
#>  3 AUS       13.0 
#>  4 AVL        8.19
#>  5 BDL       17.7 
#>  6 BGR       19.5 
#>  7 BHM       29.7 
#>  8 BNA       16.0 
#>  9 BOS        8.73
#> 10 BZN       11.5 
#> # … with 95 more rows

For this size of data and a simple transformation, using a local cluster actually makes performance much worse!

by_dest <- flights %>% group_by(dest)

# Local computation
system.time(by_dest %>% summarise(mean(dep_delay, na.rm = TRUE)))
#>    user  system elapsed 
#>   0.009   0.000   0.010

# Remote: partitioning
system.time(flights2 <- flights %>% partition(cluster))
#>    user  system elapsed 
#>   0.193   0.041   0.266
# Remote: computation
system.time(flights3 <- flights2 %>% summarise(mean(dep_delay, na.rm = TRUE)))
#>    user  system elapsed 
#>   0.004   0.001   0.038
# Remote: retrieve results
system.time(flights3 %>% collect())
#>    user  system elapsed 
#>   0.004   0.002   0.051

That’s because of the overhead associated with sending the data to each worker and retrieving the results at the end. For basic dplyr verbs, multidplyr is unlikely to give you significant speed ups unless you have 10s or 100s of millions of data points (and in that scenario you should first try dtplyr, which uses data.table).

multipldyr might help, however, if you’re doing more complex things. Let’s see how that plays out when fitting a moderately complex model. We’ll start by selecting a subset of flights that have at least 50 occurrences, and we’ll compute the day of the year from the date:

daily_flights <- flights %>%
  count(dest) %>%
  filter(n >= 365)

common_dest <- flights %>% 
  semi_join(daily_flights, by = "dest") %>% 
  mutate(yday = lubridate::yday(ISOdate(year, month, day))) %>% 
  group_by(dest)

nrow(common_dest)
#> [1] 332942

That leaves us with ~332,000 observations. Let’s partition this smaller dataset:

by_dest <- common_dest %>% partition(cluster)
by_dest
#> Source: party_df [332,942 x 20]
#> Groups: dest
#> Shards: 2 [164,539--168,403 rows]
#> 
#> # A data frame: 332,942 × 20
#>    year month   day dep_time sched_dep…¹ dep_d…² arr_t…³ sched…⁴ arr_d…⁵ carrier
#>   <int> <int> <int>    <int>       <int>   <dbl>   <int>   <int>   <dbl> <chr>  
#> 1  2013     1     1      517         515       2     830     819      11 UA     
#> 2  2013     1     1      533         529       4     850     830      20 UA     
#> 3  2013     1     1      542         540       2     923     850      33 AA     
#> 4  2013     1     1      554         558      -4     740     728      12 UA     
#> 5  2013     1     1      555         600      -5     913     854      19 B6     
#> 6  2013     1     1      558         600      -2     753     745       8 AA     
#> # … with 332,936 more rows, 10 more variables: flight <int>, tailnum <chr>,
#> #   origin <chr>, dest <chr>, air_time <dbl>, distance <dbl>, hour <dbl>,
#> #   minute <dbl>, time_hour <dttm>, yday <dbl>, and abbreviated variable names
#> #   ¹​sched_dep_time, ²​dep_delay, ³​arr_time, ⁴​sched_arr_time, ⁵​arr_delay

Let’s fit a smoothed generalised additive model to each destination, estimating how delays vary over the course of the year and within a day. Note that we need to use cluster_library() to load the mgcv package on every node. That takes around 3s:

cluster_library(cluster, "mgcv")
system.time({
  models <- by_dest %>% 
    do(mod = gam(dep_delay ~ s(yday) + s(dep_time), data = .))
})
#>    user  system elapsed 
#>   0.005   0.002   1.395

Compared with around 5s doing it locally:

system.time({
  models <- common_dest %>% 
    group_by(dest) %>% 
    do(mod = gam(dep_delay ~ s(yday) + s(dep_time), data = .))
})
#>    user  system elapsed 
#>   2.003   0.101   2.111

The cost of transmitting messages to the nodes is roughly constant, so the longer the task you’re parallelising, the closer you’ll get to a linear speed up.