# Overview

I’ve previously written about doing cross-validation with `sparklyr`. This post will serve as an update, given the changes that have been made to `sparklyr`. The first half of my previous post may be worth reading, but the section on cross-validation is wrong, in that the function provided no longer works. If you want an overview of `sparklyr`, and how it compared to `SparkR`, see this post. Bear in mind, however, that that post was written in December 2017, and both packages have added functionality since then.

Here, we’re going to start off by writing the necessary functions for cross-validation ourselves. The hope is that this will make clear how CV works, as well as introducing some `sparklyr` functions. At the end, the in-built functions for cross-validation are discussed. A future post will show how these in-built functions can be used to do feature selection by cross-validation.

# Setup

First we do some setup by loading in some packages and creating a local Spark connection.

```
library(tidyverse)
library(sparklyr)
sc <- spark_connect(master = 'local')
```

Here we’re going to use the famous, if a little dull, Titanic dataset. We’re getting this from the `titanic` package, which you’ll need to install if you don’t have it. To keep things simple, we’re selecting a subset of columns and making sure everything is numeric.

```
# get the training data
df_titanic <- titanic::titanic_train %>%
  # select some columns
  select(Survived, Pclass, Sex, Age, SibSp, Parch, Fare) %>%
  # make the col names lower case
  rename_all(tolower) %>%
  # turn sex into an integer
  mutate(sex = ifelse(sex == 'male', 1, 0)) %>%
  # remove NAs
  filter_all(all_vars(!is.na(.)))
```

Now we’ve cleaned the data, we can copy it to our Spark connection, creating a Spark DataFrame. In practice, we could have started by copying the data to Spark. We could then have used Spark’s string indexer function (`ft_string_indexer()`) to clean the `sex` column. This function will automatically index a column of strings with integers.
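
As a sketch of that alternative (assuming the same Spark connection `sc`; `tbl_raw` is a hypothetical name), indexing the `sex` column in Spark rather than in R might look like this. Note that `ft_string_indexer()` assigns indices by frequency, so the most common level gets `0`.

```
# a sketch of the alternative: copy the raw data first,
# then index the string column in Spark itself
tbl_raw <- copy_to(sc, titanic::titanic_train %>% rename_all(tolower))
tbl_indexed <- tbl_raw %>%
  ft_string_indexer(input_col = "sex", output_col = "sex_indexed")
```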

```
# create a Spark DataFrame for the data
tbl_titanic <- copy_to(sc, df_titanic)
```

# Cross-validation

We can now turn to doing some cross-validation. As noted above, we’re doing this ‘long-hand’ to begin with, before looking at the functions built into `sparklyr`.

## Using user-defined functions

In order to do cross-validation, the first thing we need to do is split our data into different folds. To do this we’ll use `sdf_partition()`. To use this function we need to specify our data and then some weights to use when partitioning the data. These weights should sum to 1, and specify the proportion of data to put into each partition. For \(k\)-fold validation we want \(k\) equal folds, meaning our weights should just be \(1 / k\).

```
# number of folds
k <- 5
# create the weights
weights <- rep(1 / k, times = k)
# give names to the weights so that our partitions are named
# sdf_partition will fail if we don't do this
names(weights) <- paste0("fold", 1:k)
# create the partitioned data
tbl_titanic_cv_split <- sdf_partition(
  tbl_titanic,
  weights = weights,
  seed = 2018
)
```

We now have our data split into 5 equally sized folds. For doing cross-validated training/testing we don’t really want the data in this form. Instead we want \(k\) test sets that each contain the data from one fold, and \(k\) training sets that contain the *combined* data from the other folds. We can get these training sets by creating a list with \(k\) elements, where each element contains the data from \(k - 1\) folds joined together.

To get this data, the first thing we need to do is get the indices for the folds that will make up these different training sets.

```
# get the indices for our different training sets
# e.g. a DataFrame for partitions 1, 2, 3, and 4
K <- 1:k
(indices <- map(K, ~ K[-.x]))
```

```
## [[1]]
## [1] 2 3 4 5
##
## [[2]]
## [1] 1 3 4 5
##
## [[3]]
## [1] 1 2 4 5
##
## [[4]]
## [1] 1 2 3 5
##
## [[5]]
## [1] 1 2 3 4
```

It may not be obvious how this code is working, so let’s talk through it. In the above code `K` is the vector `c(1, 2, 3, 4, 5)`. The first argument to `map()` is just saying that `K` is the thing we want to loop over. Then the bit after the `~` is the function we want to apply. `K[-.x]` is saying subset `K`, removing a particular element. `.x` is used inside `map()` to refer to the thing we’re iterating over. On the first iteration, `.x` will be `1`, the first element of `K`. This means that `K[-.x] = K[-1] = c(2, 3, 4, 5)`. In other words, `K` without its first element.

Importantly, `K[-1]` is saying ‘remove the first element of `K`’, **not** ‘remove the element of `K` that equals 1’. It just so happens here that the first element of `K` *is* `1`. For example, if `K` were instead `c(5, 4, 3, 2, 1)` then `K[-1]` would leave `c(4, 3, 2, 1)`.
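
A quick sanity check of this distinction in plain R:

```
# positional (negative) indexing drops by position, not by value
K <- c(5, 4, 3, 2, 1)
K[-1]     # drops the first element: c(4, 3, 2, 1)
K[K != 1] # drops by value instead: c(5, 4, 3, 2)
```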

Next we use these indices to create our \(k\) training data sets. We create these training sets by looping over our indices and binding together the partitions of `tbl_titanic_cv_split` that match those indices. This gives us back a list where each element contains 4 out of the 5 partitions from `tbl_titanic_cv_split`. We’re using `sdf_bind_rows()` here, which is analogous to `dplyr::bind_rows()` but for Spark DataFrames.

```
# create our training sets by binding together partitions
# of the split data. We get back a list where each element is a
# DataFrame of the partitions indexed by each element
# of indices combined together
tbl_titanic_cv_train <-
  map(indices, ~ sdf_bind_rows(tbl_titanic_cv_split[.x]))
# name the elements
names(tbl_titanic_cv_train) <- paste0("train", 1:k)
```

Now that we’ve been through the process of creating our training sets, let’s make it into a function. The arguments to this function are the data, the number of cross-validation folds, and an optional seed for the partitioning of the data. Following `sdf_partition()`, if the seed is not specified then it is generated by `sample(.Machine$integer.max, 1)`.

```
sdf_cross_validation_train_sets <-
  function(tbl,
           k = 5,
           seed = sample(.Machine$integer.max, 1)) {
    # create weights for partitioning the data
    weights <- rep(1 / k, times = k)
    # name the elements of weights
    names(weights) <- paste0("fold", 1:k)
    # partition the data using weights
    tbl_cv_split <-
      sparklyr::sdf_partition(tbl, weights = weights, seed = seed)
    # get the indices for our different training sets
    # e.g. a DataFrame for partitions 1, 2, 3, and 4
    K <- 1:k
    indices <- purrr::map(K, ~ K[-.x])
    # create our training sets by binding together
    # partitions of the split data. We get back a list where
    # each element is a DataFrame of the partitions
    # indexed by each element of indices combined together
    tbl_cv_train <-
      purrr::map(indices, ~ sparklyr::sdf_bind_rows(tbl_cv_split[.x]))
    # name the training sets
    names(tbl_cv_train) <- paste0("train", 1:k)
    # return the list of training sets
    tbl_cv_train
  }
```

The next thing we’ll want is the test sets to go with each of our training sets. The way our `indices` worked was that the first fold was left out of the first training set, the second out of the second, and so on. Thus, the first training set contains the 2nd, 3rd, 4th and 5th partitions of the data. This means that the original partitioned data is what we’d use as test data. The \(i^{th}\) element of `tbl_cv_split` is the partition not in the \(i^{th}\) element of `tbl_cv_train`. We can add this into our function.

```
sdf_cross_validation_train_test_sets <-
  function(data,
           k = 5,
           seed = sample(.Machine$integer.max, 1)) {
    # first off create weights for partitioning the data
    weights <- rep(1 / k, times = k)
    # name the elements of weights
    names(weights) <- paste0("fold", 1:k)
    # partition the data using weights
    data_cv_split <-
      sparklyr::sdf_partition(data, weights = weights, seed = seed)
    # get the indices for our different training sets
    # e.g. a DataFrame for partitions 1, 2, 3, and 4
    K <- 1:k
    indices <- purrr::map(K, ~ K[-.x])
    # create our training sets by binding together
    # partitions of the split data. We get back a list where each
    # element is a DataFrame of the partitions
    # indexed by each element of indices combined together
    data_cv_train <-
      purrr::map(indices, ~ sparklyr::sdf_bind_rows(data_cv_split[.x]))
    # name the training sets
    names(data_cv_train) <- paste0("train", 1:k)
    # rename the test sets
    names(data_cv_split) <- paste0("test", 1:k)
    # return a tibble with list columns
    # for the training and test sets
    tibble::tibble(cv_fold = K,
                   train = data_cv_train,
                   test = data_cv_split)
  }
```

The function gives us back a data frame with list columns for the training and test data. By putting the data in this format, we can use some of the cool techniques for combining `purrr` with list columns. Let’s take a look at what the function returns.

```
# use the function
tbl_titanic_cv <- sdf_cross_validation_train_test_sets(
  tbl_titanic,
  seed = 2018
)
head(tbl_titanic_cv)
```

```
## # A tibble: 5 x 3
## cv_fold train test
## <int> <list> <list>
## 1 1 <S3: tbl_spark> <S3: tbl_spark>
## 2 2 <S3: tbl_spark> <S3: tbl_spark>
## 3 3 <S3: tbl_spark> <S3: tbl_spark>
## 4 4 <S3: tbl_spark> <S3: tbl_spark>
## 5 5 <S3: tbl_spark> <S3: tbl_spark>
```

In order to actually do our modelling, we’re going to need to create a formula to pass into the modelling functions. We can do this with a bit of string manipulation. First off, let’s specify our response/outcome column, and the features we want to feed in as predictors.

```
# the response column
response <- 'survived'
# the colnames of our data
columns <- colnames(tbl_titanic)
# get the col names that aren't the response
features <- columns[columns != response]
```

Now we’ve got this, we can build up our formula. First we’ll collapse the features into a single string separated by `' + '`. We’ll then put `'survived ~ '` on the front and turn the whole string into a formula.

```
model_formula <- features %>%
  paste0(collapse = ' + ') %>%
  paste0(response, ' ~ ', .) %>%
  as.formula()
model_formula
```

```
## survived ~ pclass + sex + age + sibsp + parch + fare
## <environment: 0x5c9a610>
```

Again, let’s wrap this up into a function. We’re setting it up so that a vector of features can optionally be passed in. If it isn’t, then every column in the data except the response is used as a predictor.

```
get_model_formula <- function(data, response, features = NULL) {
  # If a vector of feature names hasn't been specified
  if (is.null(features)) {
    # Get the column names of the data
    columns <- colnames(data)
    # Create the feature names by using all
    # the columns except the response
    features <- columns[columns != response]
  }
  features %>%
    paste0(collapse = ' + ') %>%
    paste0(response, ' ~ ', .) %>%
    as.formula()
}
```

Now we can use the function to verify we get the same result back.

```
# use the function
get_model_formula(tbl_titanic, 'survived')
```

```
## survived ~ pclass + sex + age + sibsp + parch + fare
## <environment: 0x8763110>
```

Now we have a model formula, we can actually do some modelling. To keep this simple we’re going to be using logistic regression. This is provided for us by Spark, and can be called with the `ml_logistic_regression()` function.

Firstly, we’re going to add a new list column that contains model fits for each training set. We do this by putting `map()` inside a `mutate()`. We create the new column `fit_lr` by mapping over the `train` column, passing each element as the first argument to `ml_logistic_regression()`. Additionally, we pass the formula that we created to the `formula` argument of `ml_logistic_regression()`.

```
tbl_titanic_cv <- tbl_titanic_cv %>%
  mutate(fit_lr = map(train,
                      ~ ml_logistic_regression(
                        .x,
                        formula = model_formula)))
```

Now we have some trained models, we can add the predictions on the test data into another list column. We’re using `map2()` as we want to loop over our fits and test data simultaneously.

```
tbl_titanic_cv <- tbl_titanic_cv %>%
  mutate(pred_lr = map2(fit_lr, test, ml_predict))
```

Now we’ve got our predictions, we can add in a column with some evaluation metrics. Here we’re using `map_dbl()` to indicate that we want a vector of `double` values back. If we didn’t do this, we’d get a list where each element is a single value.

```
tbl_titanic_cv <- tbl_titanic_cv %>%
  mutate(eval_lr = map_dbl(pred_lr,
                           ml_binary_classification_evaluator,
                           label_col = response))
tbl_titanic_cv
```

```
## # A tibble: 5 x 6
## cv_fold train test fit_lr pred_lr eval_lr
## <int> <list> <list> <list> <list> <dbl>
## 1 1 <S3: tbl_s… <S3: tbl_… <S3: ml_model_logist… <S3: tbl_s… 0.839
## 2 2 <S3: tbl_s… <S3: tbl_… <S3: ml_model_logist… <S3: tbl_s… 0.837
## 3 3 <S3: tbl_s… <S3: tbl_… <S3: ml_model_logist… <S3: tbl_s… 0.855
## 4 4 <S3: tbl_s… <S3: tbl_… <S3: ml_model_logist… <S3: tbl_s… 0.888
## 5 5 <S3: tbl_s… <S3: tbl_… <S3: ml_model_logist… <S3: tbl_s… 0.848
```

Rather than doing this manually, we could wrap the whole process into a function, making use of some of the helpers we’ve written along the way. We have a `model_fun` argument to our function, which allows for different algorithms to be passed in.

```
ml_classification_cross_validation <-
  function(tbl, # a tbl_spark
           response, # the response column as a character
           model_fun, # the modelling function to use (unquoted)
           k = 5, # number of folds
           seed = sample(.Machine$integer.max, 1), # optional seed for partitioning
           features = NULL, # character vector, or else all the columns except response
           ...) { # additional arguments passed to model_fun
    # quote the dots
    dots <- rlang::quos(...)
    # get the data frame with train and test sets
    tbl_cv <- sdf_cross_validation_train_test_sets(tbl, k, seed)
    # get the model formula
    model_formula <- get_model_formula(tbl, response, features)
    # fit, predict, evaluate
    tbl_cv %>%
      dplyr::mutate(
        fit = purrr::map(.x = train, ~ model_fun(.x, formula = model_formula, !!!dots)),
        pred = purrr::map2(fit, test, ml_predict),
        eval = purrr::map_dbl(pred, ml_binary_classification_evaluator, label_col = response)
      )
  }
```

Now let’s use this function to check that we get the same result.

```
tbl_titanic_cv <-
  ml_classification_cross_validation(
    tbl = tbl_titanic,
    response = 'survived',
    model_fun = ml_logistic_regression,
    k = 5,
    seed = 2018
  )
# print out the result
tbl_titanic_cv
```

```
## # A tibble: 5 x 6
## cv_fold train test fit pred eval
## <int> <list> <list> <list> <list> <dbl>
## 1 1 <S3: tbl_s… <S3: tbl_s… <S3: ml_model_logistic… <S3: tbl_… 0.839
## 2 2 <S3: tbl_s… <S3: tbl_s… <S3: ml_model_logistic… <S3: tbl_… 0.837
## 3 3 <S3: tbl_s… <S3: tbl_s… <S3: ml_model_logistic… <S3: tbl_… 0.855
## 4 4 <S3: tbl_s… <S3: tbl_s… <S3: ml_model_logistic… <S3: tbl_… 0.888
## 5 5 <S3: tbl_s… <S3: tbl_s… <S3: ml_model_logistic… <S3: tbl_… 0.848
```

We can average the `eval` column to get a summary of the performance.

```
# take the average of the eval metric
mean(tbl_titanic_cv$eval)
```

```
## [1] 0.853543
```

Our function has a `...` argument to allow us to pass in additional arguments to the various modelling functions offered by Spark. Let’s demonstrate how this works using a random forest.

```
tbl_titanic_cv_rf <-
  ml_classification_cross_validation(
    tbl = tbl_titanic,
    response = 'survived',
    model_fun = ml_random_forest_classifier,
    k = 5,
    seed = 2018,
    num_trees = 50
  )
tbl_titanic_cv_rf
```

```
## # A tibble: 5 x 6
## cv_fold train test fit pred eval
## <int> <list> <list> <list> <list> <dbl>
## 1 1 <S3: tbl_s… <S3: tbl_… <S3: ml_model_random_fo… <S3: tbl_… 0.840
## 2 2 <S3: tbl_s… <S3: tbl_… <S3: ml_model_random_fo… <S3: tbl_… 0.837
## 3 3 <S3: tbl_s… <S3: tbl_… <S3: ml_model_random_fo… <S3: tbl_… 0.870
## 4 4 <S3: tbl_s… <S3: tbl_… <S3: ml_model_random_fo… <S3: tbl_… 0.893
## 5 5 <S3: tbl_s… <S3: tbl_… <S3: ml_model_random_fo… <S3: tbl_… 0.856
```

We could now plot the performance of the two different models we’ve tried.

```
# pull out the evaluation metrics from the two tibbles
tbl_model_eval <- tibble(
  model = rep(c('Logistic regression', 'Random forest'), each = 5),
  auc = c(tbl_titanic_cv$eval, tbl_titanic_cv_rf$eval)
)
# boxplots of the auc over the 5 folds
ggplot(tbl_model_eval, aes(model, auc, fill = model)) +
  coord_cartesian(ylim = c(0.75, 1)) +
  geom_boxplot(alpha = .75, show.legend = FALSE) +
  scale_fill_viridis_d(option = 'C', end = 0.4) +
  theme_minimal(base_size = 16, base_family = 'mono') +
  theme(panel.grid.major.x = element_blank()) +
  labs(x = 'Model', y = 'Area under the ROC curve')
```

## Using in-built functions

Now that we’re happy with the logic of how cross-validation works, we can leverage some in-built functions to do it for us. The cross-validation functions built into Spark / `sparklyr` are designed for parameter tuning. However, we can use them to get a cross-validated estimate of performance for a single model. To make use of the in-built functions we can use Spark pipelines. Pipelines describe a number of stages we want to apply to some input data, such as feature transformations and fitting a model.

Below we’re creating a pipeline by initialising it for our Spark connection, specifying a model formula, and specifying an algorithm. `ft_r_formula()` takes an R formula and uses it to tell Spark how our model should be fit.

```
pipeline <- ml_pipeline(sc) %>%
  ft_r_formula(survived ~ .) %>%
  ml_logistic_regression()
```

In order to use the cross-validation function, we need to provide a grid of tuning parameters to try. As we only want to fit a single model, we can just provide a single grid of parameters containing the default values.

```
grid <-
  list(logistic_regression = list(elastic_net_param = 0, reg_param = 0))
```

Next we specify how we want our cross-validation to work. We are not actually doing the cross-validation here, just specifying its parameters.

```
cv <- ml_cross_validator(
  sc,
  estimator = pipeline, # use our pipeline to estimate the model
  estimator_param_maps = grid, # use the params in grid
  evaluator = ml_binary_classification_evaluator(sc), # how to evaluate the CV
  num_folds = 5, # number of CV folds
  seed = 2018
)
```

So far all we’ve done is lay out a pipeline. Nothing has actually ‘happened’ in terms of fitting a model to our data. Instead we’ve laid out a number of steps that we ultimately want to be carried out.

- `ml_pipeline(sc)`: create a pipeline associated with our Spark connection (`sc`)
- `ft_r_formula()`: use the Spark equivalent of this R formula when it comes to fitting the model
- `ml_logistic_regression()`: use logistic regression when fitting the model
- `grid`: these are the parameters to try out when doing the cross-validation
- `ml_cross_validator()`: create a cross-validator where the estimator is our pipeline, combining a model formula and algorithm. Try out the parameters in `grid` and use `ml_binary_classification_evaluator()` to evaluate performance.

Now we have all these steps, we can actually carry out the cross-validation.

```
cv_model <- ml_fit(cv, tbl_titanic)
```

This gives us a list back containing a range of elements.

```
names(cv_model)
```

```
## [1] "uid" "param_map" "estimator"
## [4] "evaluator" "estimator_param_maps" "best_model"
## [7] "num_folds" "metric_name" "avg_metrics"
## [10] "avg_metrics_df" "sub_models" ".jobj"
```

There are some useful bits we can pull out, like a data frame of performance metrics. This would be very useful if we’d actually tried a range of parameters.

```
cv_model$avg_metrics_df
```

```
## areaUnderROC elastic_net_param_1 reg_param_1
## 1 0.8539799 0 0
```
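
For illustration, a grid that actually tuned something might look like the sketch below (the values here are hypothetical, and this isn’t run in this post). The cross-validator fits every combination of the parameters, and `avg_metrics_df` then has one row per combination.

```
# a hypothetical tuning grid: 3 x 2 = 6 parameter combinations,
# each of which would be fit and cross-validated
grid_tune <- list(
  logistic_regression = list(
    elastic_net_param = c(0, 0.5, 1),
    reg_param = c(0.01, 0.1)
  )
)
```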

One thing to note is that the in-built cross-validation function is a lot faster than the one we wrote. However, it does not return performance metrics for each fold, just an overall average. If we were fitting a lot of models we’d have to use the in-built functions for performance reasons. In addition, Spark 2.3 and `sparklyr` 0.8 support doing cross-validation in parallel. This can give considerable performance gains if you have access to a computing cluster.
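
Assuming you have those versions (Spark >= 2.3 and `sparklyr` >= 0.8), a sketch of enabling this via the `parallelism` argument of `ml_cross_validator()`:

```
# a sketch: evaluate up to 4 fits at once during cross-validation
# (requires Spark >= 2.3 and sparklyr >= 0.8)
cv_parallel <- ml_cross_validator(
  sc,
  estimator = pipeline,
  estimator_param_maps = grid,
  evaluator = ml_binary_classification_evaluator(sc),
  num_folds = 5,
  parallelism = 4,
  seed = 2018
)
```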

In the next post we’ll explore using the in-built functions some more to do feature selection. We’ll also look at how you can use `ml_cross_validator()` without using pipelines, while discussing some of the advantages of pipelines.