Our customers at Qubole use notebooks with Apache Spark as the back end to build machine learning pipelines. Often, data scientists develop the models and the infrastructure/engineering team deploys them. This handoff consumes a lot of time, is error-prone, and often leads to critical business delays.
In this blog, we will walk through an example notebook that can do it all: train the model using Spark MLlib, serialize the models using MLeap, and deploy the model to Amazon SageMaker. The steps are simple enough for data scientists to deploy models on their own.
What Is MLeap?
MLeap provides a serialization format and an execution engine for machine learning pipelines. It supports multiple frameworks such as Spark MLlib, TensorFlow, and scikit-learn for training models and exports them as MLeap bundles. It is an actively developed, easy-to-use open source tool.
Spark also provides model serialization, but it is better suited for batch prediction tasks and is not the preferred solution for low-latency online inference. For low-latency online predictions, MLeap is the ideal option. MLeap bundles are also portable and can be deployed with the MLeap runtime on any cloud.
What Is Amazon SageMaker?
Amazon SageMaker is a tool designed to support the entire data science workflow. It provides the infrastructure to build, train, and deploy models. It also supports A/B testing, which lets you experiment with different versions of a model at the same time, as sketched below. Models run on auto-scaling clusters of Amazon SageMaker instances spread across multiple Availability Zones to deliver both high performance and high availability.
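As a brief aside, here is a minimal sketch of how such an A/B test can be wired up with boto3. The model names, variant weights, and endpoint names are hypothetical, and both model versions are assumed to already exist in SageMaker.

import boto3

sm = boto3.client('sagemaker', region_name='us-east-1')

# Route 80% of traffic to one model version and 20% to another.
sm.create_endpoint_config(
    EndpointConfigName='zoo-ab-test-config',
    ProductionVariants=[
        {'VariantName': 'variant-a', 'ModelName': 'simple-mleap-zoo',
         'InitialInstanceCount': 1, 'InstanceType': 'ml.c4.xlarge',
         'InitialVariantWeight': 0.8},
        {'VariantName': 'variant-b', 'ModelName': 'simple-mleap-zoo-v2',
         'InitialInstanceCount': 1, 'InstanceType': 'ml.c4.xlarge',
         'InitialVariantWeight': 0.2},
    ],
)
sm.create_endpoint(EndpointName='zoo-ab-test-ep',
                   EndpointConfigName='zoo-ab-test-config')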
In this blog, we will serialize a model trained with Spark MLlib into the MLeap format and deploy it to SageMaker; similar steps can be followed for TensorFlow and scikit-learn models. The deployed model can then be served with the MLeap runtime to make real-time predictions.
Now, let’s take a step-by-step look at how to do this!
Prerequisite Steps
- Install boto3 (1.9.103) on your cluster using Environments. You can also use node-bootstrap to install Python packages; however, Qubole's Environments feature provides an easy way to manage Python and R dependencies and lets you install packages dynamically at runtime without restarting the cluster. Run the following snippet in the notebook to install MLeap and SageMaker.
%sh
# This installs MLeap and SageMaker. After the next release, these can be
# installed via Environments and this paragraph will not be needed.
hadoop dfs -copyToLocal s3://paid-qubole/scripts/packaged/install_mleap_and_sagemaker.bash
source install_mleap_and_sagemaker.bash && install_packages py s3_path
- Add the following MLeap jars in the interpreter settings. The jars below are for Spark version 2.3. MLeap works with all Spark versions > 2.0 except 2.4. You can find the MLeap jar versions for the corresponding Spark versions here.
ml.combust.mleap:mleap-runtime_2.11:0.13.0
ml.combust.mleap:mleap-spark_2.11:0.13.0
ml.combust.mleap:mleap-spark-extension_2.11:0.13.0
- To deploy to SageMaker, we need to upload the serialized model to Amazon S3. Make sure that you have roles configured with policies that grant access to Amazon ECR as well as the SageMaker APIs. You will also need to add a policy that allows access to the S3 bucket where your model will be saved.
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "s3:GetObject", "s3:PutObject", "s3:DeleteObject", "s3:ListBucket" ], "Resource": [ "arn:aws:s3:::*" ] } ] }
Train the Model Using MLlib
We will build a model that uses the zoo data set from the UC Irvine Machine Learning Repository to predict an animal's class from its features. The data set covers seven classes of animals described by sixteen features: fifteen Boolean-valued attributes plus a numeric leg count.
%sh
# download the dataset
wget https://archive.ics.uci.edu/ml/machine-learning-databases/zoo/zoo.data
# copy to hdfs
hadoop dfs -copyFromLocal file:///zoo.data hdfs:///tmp/zoo.data
# Import libraries
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# load the dataset into a Spark dataframe; the file has no header row,
# so name the columns explicitly
col_names = ['animal_name', 'hair', 'feathers', 'eggs', 'milk',
             'airbone', 'aquatic', 'predator', 'toothed', 'backbone',
             'breathes', 'venomous', 'fins', 'legs', 'tail',
             'domestic', 'catsize', 'type']
df = spark.read.format('com.databricks.spark.csv') \
    .options(header='false', inferschema='true') \
    .load('/tmp/zoo.data') \
    .toDF(*col_names)
df = df.drop('animal_name')

input_fields = ['hair', 'feathers', 'eggs', 'milk',
                'airbone', 'aquatic', 'predator', 'toothed', 'backbone',
                'breathes', 'venomous', 'fins',
                'tail', 'domestic', 'catsize', 'legs']

# VectorAssembler combines a list of columns into a single vector column
assembler = VectorAssembler(inputCols=input_fields, outputCol="features")

# Decision tree classifier; the label is the animal type (1-7)
dt = DecisionTreeClassifier(labelCol="type")

# create a pipeline with the data preprocessing step and the model
modelPipeline = Pipeline(stages=[assembler, dt])

(trainingData, testData) = df.randomSplit([0.8, 0.2])

evaluator = MulticlassClassificationEvaluator(labelCol="type",
                                              predictionCol="prediction",
                                              metricName="accuracy")

# Find the best model parameters using ParamGridBuilder
paramGrid = (ParamGridBuilder()
             .addGrid(dt.maxDepth, [2, 4, 6])
             .addGrid(dt.impurity, ['gini', 'entropy'])
             .addGrid(dt.maxBins, [10, 20, 40])
             .build())

cv = CrossValidator(estimator=modelPipeline,
                    estimatorParamMaps=paramGrid,
                    evaluator=evaluator)

# train on the training dataframe
pipelineModel = cv.fit(trainingData)

# best model
final_model = pipelineModel.bestModel

# transform the test dataframe
predictions = final_model.transform(testData)
accuracy = evaluator.evaluate(predictions)
print("Test Error = {}".format(1.0 - accuracy))
Serialize the Trained Model Using MLeap
Serialize the model using MLeap. MLeap also stores the schema of the input DataFrame so that it can validate LeapFrames at inference time.
import mleap.pyspark
from mleap.pyspark.spark_support import SimpleSparkSerializer
import os

file_name = '/tmp/mleap-zoo.zip'
if os.path.isfile(file_name):
    os.remove(file_name)

final_model.serializeToBundle("jar:file:{}".format(file_name),
                              final_model.transform(testData))
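If you want to sanity-check the bundle before shipping it, MLeap's PySpark support can load it back into a Spark pipeline. This is a quick sketch, assuming the deserializeFromBundle helper provided by mleap.pyspark and the bundle path written above.

from pyspark.ml import PipelineModel
import mleap.pyspark  # adds deserializeFromBundle to PipelineModel
from mleap.pyspark.spark_support import SimpleSparkSerializer

# Load the bundle written above and re-score the test data to confirm the
# round trip produces sensible predictions.
reloaded_model = PipelineModel.deserializeFromBundle("jar:file:/tmp/mleap-zoo.zip")
reloaded_model.transform(testData).select("prediction").show(5)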
Convert the model to the tar.gz format required by SageMaker and upload it to Amazon S3.
import zipfile
import tarfile
import boto3

# unpack the MLeap bundle and repackage it as the tar.gz that SageMaker expects
with zipfile.ZipFile(file_name) as zf:
    zf.extractall("/tmp/mleap-zoo")

with tarfile.open("/tmp/mleap-zoo.tar.gz", "w:gz") as tar:
    tar.add("/tmp/mleap-zoo/bundle.json", arcname='bundle.json')
    tar.add("/tmp/mleap-zoo/root", arcname='root')

s3 = boto3.client('s3')
filename = 'mleap-zoo.tar.gz'
bucket_name = '<your-s3-bucket>'  # replace with the S3 bucket for your model

# Uploads the given file using a managed uploader, which will split up large
# files automatically and upload parts in parallel.
s3.upload_file("/tmp/" + filename, bucket_name, filename)
Deploy Your Model to SageMaker
Initialize a SageMaker session and use it to create a SageMaker model, endpoint configuration, and endpoint. The SageMaker model needs the S3 location of the serialized model artifact and the serving image in Amazon ECR; here, SparkMLModel points to the prebuilt sagemaker-sparkml-serving container image for us.
import sagemaker
import boto3
import json
from sagemaker.sparkml.model import SparkMLModel

boto_session = boto3.Session(region_name='us-east-1')
sess = sagemaker.Session(boto_session=boto_session)
sagemaker_session = sess.boto_session.client('sagemaker')
account = boto3.client('sts', region_name="us-east-1").get_caller_identity()['Account']

model_name = "simple-mleap-zoo"
endpoint_name = 'simple-mleap-zoo-ep'

# Define the input and output schema of your model
schema = {"input": [....], "output": {.....}}
schema_json = json.dumps(schema)

role = 'arn:aws:iam::{}:role/sagemaker-access'.format(account)
# S3 location of the tar.gz model uploaded in the previous step
model_location = 's3://{}/{}'.format(bucket_name, filename)

# SparkMLModel uses the image built by
# https://github.com/aws/sagemaker-sparkml-serving-container
sparkml_model = SparkMLModel(model_data=model_location,
                             role=role,
                             sagemaker_session=sess,
                             name=model_name,
                             env={'SAGEMAKER_SPARKML_SCHEMA': schema_json})
print("Created model: {}".format(model_name))

sparkml_model.deploy(initial_instance_count=1,
                     instance_type='ml.c4.xlarge',
                     endpoint_name=endpoint_name)
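The schema above is left as a placeholder. As a rough illustration (not taken from the original post), and assuming the serving container receives each zoo feature as a double, it could be built from the same input_fields used during training.

# Illustrative only: one schema entry per raw input column, plus the
# pipeline's "prediction" output column. Adjust the types to your data.
schema = {
    "input": [{"name": field, "type": "double"} for field in input_fields],
    "output": {"name": "prediction", "type": "double"}
}
schema_json = json.dumps(schema)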
Happy Inferencing!
Once the endpoint is active, invoke it to start making predictions.
# The data should be as per the schema specified in the above step
test_data = {"data": []}

from sagemaker.predictor import json_serializer, csv_serializer, json_deserializer, RealTimePredictor
from sagemaker.content_types import CONTENT_TYPE_CSV, CONTENT_TYPE_JSON

predictor = RealTimePredictor(endpoint=endpoint_name,
                              sagemaker_session=sess,
                              serializer=json_serializer,
                              content_type=CONTENT_TYPE_JSON,
                              accept=CONTENT_TYPE_CSV)
print(predictor.predict(test_data))
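As a hedged example (the feature values below are made up), a request for the zoo model would carry one row with the sixteen feature values in the same order as the schema's input list.

# Hypothetical single-row payload, in the order: hair, feathers, eggs, milk,
# airbone, aquatic, predator, toothed, backbone, breathes, venomous, fins,
# tail, domestic, catsize, legs -- e.g. a four-legged domestic mammal.
test_data = {"data": [1.0, 0.0, 0.0, 1.0, 0.0, 0.0, 1.0, 1.0,
                      1.0, 1.0, 0.0, 0.0, 1.0, 1.0, 1.0, 4.0]}
print(predictor.predict(test_data))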
Next Steps
You can try this in a Qubole notebook by importing it using this link.
Conclusion
Qubole provides excellent integrations for data science workflows. As shown above, it is easy to quickly develop an entire pipeline to build, train, and deploy models. This enables data scientists to get fast feedback on their models without relying on engineering teams.