Deep Learning has been shown to produce highly effective machine learning models in a diverse group of fields. Some of the most interesting are: pharmaceutical drug discovery [1], detection of illegal fishing cargo [2], mapping dark matter [3], tracking deforestation in the Amazon [4], taxi destination prediction [5], predicting lift and grasp movements from EEG recordings [6], and medical diagnosis for cancer [7, 8]. Distributed deep learning allows for internet scale dataset sizes, as exemplified by companies like Facebook, Google, Microsoft, and other huge enterprises. This blog post demonstrates how any organization of any size can leverage distributed deep learning on Spark thanks to the Qubole Data Service (QDS).
This demonstration utilizes the Keras [9] framework for describing the structure of a deep neural network, and subsequently leverages the Dist-Keras [10] framework to achieve data parallel model training on Apache Spark. Keras was chosen in large part due to it being the dominant library for deep learning at the time of this writing [12, 13, 14].
The Goal: Distributed Deep Learning Integrated With Spark ML Pipelines
Upon completion of this blog post, an enterprising data scientist should be able to extend this demonstration to their application specific modeling needs. Within a Qubole notebook [19], you should be able to cross validate your deep neural networks using the Spark ML Pipeline interface [18], with an application specific parameter grid similar to the following
df_train = process_csv("/fully/qualified/path/to/training/data") df_test = process_csv("/fully/qualified/path/to/test/data") param_grid = tuning.ParamGridBuilder() \ .baseOn(['regularizer', regularizers.l1_l2]) \ .addGrid('activations', [['tanh', 'relu']]) \ .addGrid('initializers', [['glorot_normal', 'glorot_uniform']]) \ .addGrid('layer_dims', [[input_dim, 2000, 300, 1]]) \ .addGrid('metrics', [['mae']]) \ .baseOn(['learning_rate', 1e-2]) \ .baseOn(['reg_strength', 1e-2]) \ .baseOn(['reg_decay', 0.25]) \ .baseOn(['lr_decay', 0.90]) \ .addGrid('dropout_rate', [0.20, 0.35, 0.50, 0.65, 0.80]) \ .addGrid('loss', ['mse', 'msle']) \ .build() estimator = DistKeras(trainers.ADAG, {'batch_size': 256, 'communication_window': 3, 'num_epoch': 10, 'num_workers': 50}, **param_grid[0]) evaluator = evaluation.RegressionEvaluator(metricName='r2') cv_estimator = tuning.CrossValidator(estimator=estimator, estimatorParamMaps=param_grid, evaluator=evaluator, numFolds=5) cv_model = cv_estimator.fit(df_train) df_pred_train = cv_model.transform(df_train) df_pred_test = cv_model.transform(df_test)
In order to make your cross validation work as illustrated above, you need to first configure your Qubole cluster [20], and secondly set up your Qubole notebook [19]. The instructions for doing so are contained in the remainder of this blog.
Configuring Your QDS Cluster
Add the following lines to your node_boostrap script
# automatically installs latest version of Keras as dependency
pip install dist-keras
# for GPU clusters, swap out default dependency tensorflow
# with tensorflow for GPU nodes
pip uninstall tensorflow
pip install tensorflow-gpu
and restart your cluster. Note the default back-end for Keras is Tensorflow. It supports any of the following back-ends as well: CNTK, MXNET, Theano [15, 16]. To use any of the other back-ends, you must pip install them in the node_bootstrap script and subsequently tell Keras to which back-end to switch [17].
Setting Up Your QDS Notebook
First import the necessary libraries
from keras import layers, models, optimizers, regularizers, utils from pyspark.ml import evaluation, feature, tuning
from distkeras import predictors, trainers
from pyspark.sql import functions, types from pyspark import ml
import numpy as np
import matplotlib
import StringIO
after which you should define these wrappers to tightly integrate with Spark ML pipelines [18]. The wrappers are taken directly from an open source gist [11].
class DistKeras(ml.Estimator): def __init__(self, *args, **kwargs): self.__trainer_klass = args[0] self.__trainer_params = args[1] self.__build_trainer(**kwargs) super(DistKeras, self).__init__() @classmethod def __build_keras_model(klass, *args, **kwargs): loss = kwargs['loss'] metrics = kwargs['metrics'] layer_dims = kwargs['layer_dims'] hidden_activation, output_activation = kwargs['activations'] hidden_init, output_init = kwargs['initializers'] dropout_rate = kwargs['dropout_rate'] alpha = kwargs['reg_strength'] reg_decay = kwargs['reg_decay'] reg = kwargs['regularizer'] keras_model = models.Sequential() for idx in range(1, len(layer_dims)-1, 1): keras_model.add(layers.Dense(layer_dims[idx], input_dim=layer_dims[idx-1], bias_initializer=hidden_init, kernel_initializer=hidden_init, kernel_regularizer=reg(alpha))) keras_model.add(layers.Activation(hidden_activation)) keras_model.add(layers.Dropout(dropout_rate)) alpha *= reg_decay keras_model.add(layers.Dense(layer_dims[-1], input_dim=layer_dims[-2], bias_initializer=output_init, kernel_initializer=output_init, kernel_regularizer=reg(alpha))) keras_model.add(layers.Activation(output_activation)) return keras_model def __build_trainer(self, *args, **kwargs): loss = kwargs['loss'] learning_rate = kwargs['learning_rate'] lr_decay = kwargs['lr_decay'] keras_optimizer = optimizers.SGD(learning_rate, decay=lr_decay) keras_model = DistKeras.__build_keras_model(**kwargs) self._trainer = self.__trainer_klass(keras_model, keras_optimizer, loss, **self.__trainer_params) def _fit(self, *args, **kwargs): data_frame = args[0] if len(args) > 1: params = args[1] self.__build_trainer(**params) keras_model = self._trainer.train(data_frame) return DistKerasModel(keras_model) class DistKerasModel(ml.Model): def __init__(self, *args, **kwargs): self._keras_model = args[0] self._predictor = predictors.ModelPredictor(self._keras_model) super(DistKerasModel, self).__init__() def _transform(self, *args, **kwargs): data_frame = args[0] pred_col = self._predictor.output_column preds = self._predictor.predict(data_frame) return preds.withColumn(pred_col, cast_to_double(preds[pred_col])) cast_to_double = functions.udf(lambda row: float(row[0]), types.DoubleType())
Last but not least, you should define some important helper functions, starting with the show() function which displays arbitrary and generic matplotlib figures. This function is adapted from the Qubole blog on integrating the alternate library Plotly into our notebooks [22].
# must do before importing pyplot or pylab matplotlib.use('Agg') from matplotlib import pyplot as plt def show(fig): image = StringIO.StringIO() fig.savefig(image, format='svg') image.seek(0) print("%html <div style='width:1200px'>"+ image.buf +"</div>")
Another important helper function is process_csv() which automates the highly redundant task of creating a data frame with renamed columns (such as ‘label’ for the label column) and with excluded columns (such as unused ID columns) from a CSV file in cloud storage [21].
def process_csv(fully_qualified_path, columns_renamed=tuple(), excluded_columns=tuple(), num_workers=None): if num_workers is None: raise NotImplementedError excluded_columns = frozenset(excluded_columns) data_frame = sqlContext.read.format('com.databricks.spark.csv') \ .options(header='true', inferSchema='true') \ .load(fully_qualified_path) for (old_name, new_name) in columns_renamed: data_frame = data_frame.withColumnRenamed(old_name, new_name) data_frame = data_frame.repartition(num_workers) feature_columns = tuple(frozenset(data_frame.columns) \ .difference(excluded_columns)) transformer = feature.VectorAssembler(inputCols=feature_columns, outputCol='features') data_frame = transformer.transform(data_frame) \ .drop(*feature_columns).cache() return data_frame
Now you are ready to configure, train, and evaluate any distributed deep learning model described in Keras!
About The Author
Horia Margarit is a career data scientist with industry experience in machine learning for digital media, consumer search, cloud infrastructure, life sciences, and consumer finance industry verticals [23]. His expertise is in modeling and optimization on internet scale datasets, specifically leveraging distributed deep learning among other techniques. He earned dual Bachelors degrees in Cognitive and Computer Science from UC Berkeley, and a Master’s degree in Statistics from Stanford University.
References
- Deep Learning How I Did It: Merck 1st place interview
- The Nature Conservancy Fisheries Monitoring Competition, 1st Place Winner’s Interview: Team ‘Towards Robust-Optimal Learning of Learning’
- DeepZot on Dark Matter: How we won the Mapping Dark Matter challenge
- Planet: Understanding the Amazon from Space, 1st Place Winner’s Interview
- Taxi Trajectory Winners’ Interview: 1st place, Team ?
- Grasp-and-Lift EEG Detection Winners’ Interview: 3rd place, Team HEDJ
- Intel & MobileODT Cervical Cancer Screening Competition, 1st Place Winner’s Interview: Team ‘Towards Empirically Stable Training’
- 2017 Data Science Bowl, Predicting Lung Cancer: 2nd Place Solution Write-up, Daniel Hammack and Julian de Wit
- Keras Documentation
- Dist-Keras Documentation
- Dist-Keras Spark ML Pipelines
- Compare Keras and Pytorch’s popularity and activity
- Compare Keras and Caffe’s popularity and activity
- Compare Keras and Theano’s popularity and activity
- Keras shoot-out: TensorFlow vs MXNet
- The Search for the Fastest Keras Deep Learning Backend
- Keras: Switching from one backend to another
- Spark ML Pipelines
- Introduction To Qubole Notebooks
- Configuring A Spark Cluster
- The Cloud Advantage: Decoupling Storage And Compute
- Creating Customized Plots In Qubole Notebooks
- Author’s LinkedIn Profile