A comprehensive guide to putting a machine learning model in production using Flask, Docker, and Kubernetes

Have you created a machine learning model and now you must get it running in production? In this tutorial, I am going to explain how to build a REST interface using Flask, package the service as a Docker image, and prepare it for deployment on Kubernetes. Additionally, I am going to give you some tips about tracking the accuracy of the model in production, securing the service, and protecting users’ privacy.

Required packages

In addition to Scikit-learn, Pandas, and Numpy, we are going to need Flask and Joblib. Remember to install those packages!

Kubernetes Liveness Probe

Let’s start with the most straightforward task, which is creating a REST service in the Flask framework. I said that we are going to prepare the service for Kubernetes deployment, so the minimal requirement is exposing a “health check” endpoint.

If the service is running, the endpoint must return an HTTP status code greater than or equal to 200 and less than 400.

I created a service.py file which contains this code:

from flask import Flask, Response
app = Flask(__name__)

@app.route('/health')
def health_check():
    return Response("", status = 200)

if __name__ == '__main__':
    app.run(debug=True,host='0.0.0.0')

Prediction — REST endpoint

In a previous blog post, I described how to save a model into a file. The model defined in that post was silly, but we can use it as an example. We need to know what the expected input of the model is. In this case, it is an array of three numbers, and each number can be either 0 or 1.

As the next step, I am going to define an endpoint which accepts those three values as query parameters. In the example model, the values have no real names. Because of that, I call them “feature1”, “feature2”, and “feature3”. I am also going to use a Model class that is not defined yet. Don’t worry. We are going to create it in the next step.

Let’s replace the content of the service.py file with this code:

from flask import Flask, Response, request, jsonify
from model import Model

app = Flask(__name__)
model = Model('model.joblib')

@app.route('/predict')
def predict():
    # Query parameters arrive as strings, so convert them to integers for the model.
    feature1 = int(request.args.get('feature1'))
    feature2 = int(request.args.get('feature2'))
    feature3 = int(request.args.get('feature3'))
    input = [feature1, feature2, feature3]
    prediction = model.predict(input)
    result = dict({
        'prediction': prediction
    })
    return jsonify(result)

@app.route('/health')
def health_check():
    return Response("", status = 200)

if __name__ == '__main__':
    app.run(debug=True,host='0.0.0.0')

That code does not work yet, because we still need to define the Model class and load the model. Let’s do that now.

Loading the model

In the blog post mentioned above, I stored the model in a Python dictionary which, besides the model itself, also contains the model metadata. I must keep that in mind when I load the model.
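
For context, here is a sketch of how such a file could have been produced. The exact code lives in that earlier post; the estimator and the metadata values below are only stand-ins, but the 'model' and 'metadata' keys are the ones the service expects.

from joblib import dump
from sklearn.dummy import DummyClassifier

# A stand-in estimator; in the real post this is the fitted pipeline.
model = DummyClassifier(strategy='most_frequent')
model.fit([[0, 0, 1], [1, 1, 0]], [0, 1])

to_be_saved = {
    'model': model,
    'metadata': {                      # placeholder metadata values
        'name': 'example model',
        'build_time': '2019-01-01'
    }
}
dump(to_be_saved, 'model.joblib')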

I am going to load the model from the file in the constructor of the Model class and define the predict function that calls the model and returns its prediction. Here is the content of the model.py file:

from joblib import load
import pandas as pd
import numpy as np

class Model:
    def __init__(self, file_name):
        loaded = load(file_name)
        self.__model = loaded['model']
        self.meta_data = loaded['metadata']

    def predict(self, features):
        input = np.asarray(features).reshape(1, -1)
        result = self.__model.predict(input)
        return int(result[0])

AttributeError: module ‘__main__’ has no attribute ‘get_last_column’

When we try to run the code, we are going to get the AttributeError shown above. A function is missing.

How did it happen? The person who created the model was using a Jupyter Notebook, so everything they wrote ended up in the __main__ module. The correct way of fixing the problem would be creating a preprocessing.py module which contains the get_last_column function and using it in both the Jupyter Notebook and this service.
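
For reference, such a preprocessing.py module would be tiny; the function body is the same one we are going to smuggle into the code below.

# preprocessing.py - the ideal fix: a module shared by the notebook and the service.
def get_last_column(X):
    # Return the last column of a 2D array as a column vector.
    return X[:, -1].reshape(-1, 1)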

However, let’s make this example more realistic. It is too late. We cannot do it. Imagine that it takes a week to fit the model, our data scientist has no idea how to use Python modules, and the service must be in production in two hours. We have to either get it done or start writing messages to recruiters on LinkedIn.

We must accept that the solution is going to break some best practices and design patterns. We have two options.

Option #1: The __main__ module is always the code in the file passed as the argument of the python command. We run the application using python service.py, so service.py is our main module.

We could copy the get_last_column function to service.py, but we don’t want to break all of the best practices. The code used by the model should not live in the same file where we implement the REST endpoint.

Option #2: What else can we do? We can define the function in the model.py file, but cheat and add it to the __main__ module.

Here is the updated code of the model.py file.

from joblib import load
import pandas as pd
import numpy as np
import sys

class Model:
    def __init__(self, file_name):
        def get_last_column(X):
            return X[:, -1].reshape(-1, 1)

        # Register the function in the __main__ module so that joblib/pickle
        # can find it while deserializing the model.
        setattr(sys.modules['__main__'], 'get_last_column', get_last_column)

        loaded = load(file_name)
        self.__model = loaded['model']
        self.meta_data = loaded['metadata']

    def predict(self, features):
        input = np.asarray(features).reshape(1, -1)
        result = self.__model.predict(input)
        return int(result[0])

Where did I find the missing function? It is in the blog post about saving the model to a file. In your case, you should look for it in the Jupyter Notebook which built the model.

Input validation

Now, it is time to validate the query parameters. We must be sure that the model can use the values we get from the client services. I am going to define the validation as a Python decorator.

from functools import wraps  # add this to the imports at the top of service.py

def validate_features(f):
    parameter_names = ['feature1', 'feature2', 'feature3']

    @wraps(f)
    def wrapper(*args, **kw):
        for parameter in parameter_names:
            to_be_validated = request.args.get(parameter)
            try:
                number_to_validate = int(to_be_validated)
                if number_to_validate < 0 or number_to_validate > 1:
                    raise ValueError('Value must be 0 or 1.')
            # TypeError covers a missing parameter (None), ValueError an invalid one.
            except (TypeError, ValueError) as err:
                return Response(str(err), status = 400)
        return f(*args, **kw)
    return wrapper

Remember to add the @validate_features decorator before the predict function!

@app.route('/predict')
@validate_features
def predict():
    #The body of the function

Kubernetes Readiness Probe

What can we do when the model is enormous and needs some time to load? It is not enough to define the health check endpoint, because Kubernetes is going to send requests to the service as soon as the health check starts responding. If we try to cheat and return an error code from the health check endpoint, Kubernetes will restart the service, so that is not a valid solution.

We need a readiness check. A readiness check is an endpoint used by Kubernetes to check if the service is ready to handle requests.

First, I am going to add the endpoint to the service.py file.

@app.route('/ready')
def readiness_check():
    if model.is_ready():
        return Response("", status = 200)
    else:
        return Response("", status = 503)

Now I need to make a few modifications to the Model class.
Not only does it need the is_ready function, it also needs to load the model in a separate thread, so that Flask can keep serving requests and respond to Kubernetes probes while the model is loading. Here is the Model class which loads the machine learning model in a separate thread:

from joblib import load
import pandas as pd
import numpy as np
import sys
from threading import Thread

class Model:
    def __init__(self, file_name):
        def get_last_column(X):
            return X[:, -1].reshape(-1, 1)

        setattr(sys.modules['__main__'], 'get_last_column', get_last_column)

        self.__file_name = file_name
        self.__is_ready = False
        self.__model = None
        self.__meta_data = None

    def __load_model(self):
        loaded = load(self.__file_name)
        self.__model = loaded['model']
        self.__meta_data = loaded['metadata']
        self.__is_ready = True

    def load_model(self):
        Thread(target=self.__load_model).start()

    def is_ready(self):
        return self.__is_ready

    def predict(self, features):
        if not self.is_ready():
            raise RuntimeError('Model is not ready yet.')

        input = np.asarray(features).reshape(1, -1)
        result = self.__model.predict(input)
        return int(result[0])
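
One detail is easy to miss: because the constructor no longer loads the file, service.py has to start the loading explicitly right after creating the instance, otherwise the readiness check will never report that the service is ready. For example:

# service.py
model = Model('model.joblib')
model.load_model()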

Scaling up

In past versions of Flask, the number of simultaneous requests was limited to one. In the current Flask (version 1.0.2), the server can handle multiple requests at the same time.

If that is not enough, we can deploy multiple instances of this service on Kubernetes. Every one of them loads its own copy of the model, and there is no shared state, so nothing stops us from doing that.

Observability

At some point, somebody is going to ask us how the model is performing in production. To check the model performance, we need a business metric influenced by the model. If we are making a recommendation tool, we may track users’ behavior and check who bought the items recommended by the model.

What do we need? The services which use the model must send us a request identifier. Usually, it is done using a correlation id header (X-Correlation-ID in the code below) which contains the identifier of the user’s session or just a unique request identifier.

In this example, we are going to fetch the correlation id and log it together with the request and the prediction. We may also need the response time of our model, so I am going to log that too. In real life, you can send that information to a Kafka queue or store it in a database.
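
As a sketch of the Kafka option (this is not part of the tutorial’s code; it assumes the kafka-python package, and the broker address and topic name are placeholders):

import json
from kafka import KafkaProducer

# Hypothetical producer; 'kafka:9092' and the 'predictions' topic are placeholders.
producer = KafkaProducer(
    bootstrap_servers='kafka:9092',
    value_serializer=lambda value: json.dumps(value).encode('utf-8')
)

def publish_prediction_log(entry):
    # Send the same dictionary we would otherwise pass to app.logger.info.
    producer.send('predictions', entry)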

A quick note: the user does not care about the model’s response time. They care only about the responsiveness of the webpage. It does not matter how fast the model responds if the page contains advertisements and other redundant stuff that make it painfully slow.

I also want to log the model metadata, so first I am going to add a meta_data function to the Model class.

#model.py, somewhere inside the Model class

def meta_data(self):
    return self.__meta_data

Now I can modify the predict function in the service.py file.

# Add to the import section on top of the service.py file
from time import time

# Replace the predict function with this
@app.route('/predict')
@validate_features
def predict():
    before_time = time()
    # Query parameters arrive as strings, so convert them to integers for the model.
    feature1 = int(request.args.get('feature1'))
    feature2 = int(request.args.get('feature2'))
    feature3 = int(request.args.get('feature3'))

    correlation_id = request.headers.get('X-Correlation-ID')

    input = [feature1, feature2, feature3]

    prediction = model.predict(input)
    result = dict({
        'prediction': prediction
    })

    after_time = time()

    to_be_logged = dict({
        'input': {
            'feature1': feature1,
            'feature2': feature2,
            'feature3': feature3
        },
        'request_id': correlation_id,
        'prediction': prediction,
        'model': model.meta_data(),
        'request_duration': after_time - before_time
    })

    app.logger.info(to_be_logged)

    return jsonify(result)

Security

Not everyone should be able to access the service. We must somehow authorize the user. The simplest way is using the Basic Authorization header with a hardcoded username and password. It is not secure, but it makes a good starting point for the next step.

First, we must define a Python decorator:

def requires_auth(f):
    @wraps(f)
    def decorated(*args, **kwargs):
        auth = request.authorization
        if not auth or not auth.username == 'admin' or not auth.password == 'admin':
            return Response(
                'Could not verify your access level for that URL.\n'
                'You have to login with proper credentials', 401,
                {'WWW-Authenticate': 'Basic realm="Login Required"'})
        return f(*args, **kwargs)
    return decorated

Now, we must add the @requires_auth decorator to the predict function.
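
For example, the decorators can be stacked like this, keeping the route decorator on the outside so that authorization runs before validation:

@app.route('/predict')
@requires_auth
@validate_features
def predict():
    ...  # the body of the function stays the same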

Proper authentication can be done using token-based authorization, by verifying a JWT issued by another service. There is an excellent tutorial about it in the Auth0 documentation, so I am not going to cover that here.

User’s privacy and GDPR

Does the model process users’ personal data? We must protect it! We must either stop logging personal data or anonymize it. Another acceptable solution is cleaning up the log messages as a part of the GDPR deletion procedure, but that requires additional work.
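
For example, if the correlation id (or any other logged field) can identify a person, one simple option is to log a salted hash instead of the raw value. Strictly speaking this is pseudonymization rather than full anonymization, and the salt handling below is only a placeholder:

import hashlib

def pseudonymize(value, salt='replace-with-a-secret-salt'):
    # One-way hash, so the logged value cannot be mapped back to the user directly.
    return hashlib.sha256((salt + str(value)).encode('utf-8')).hexdigest()

# In the predict function, log the hashed id instead of the raw one:
# 'request_id': pseudonymize(correlation_id),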

Speaking of GDPR, remember to ask the user for permission to process their data!

It may also be necessary to store the model’s predictions and give users access to every automated decision made by the model. Ask a lawyer about that.

Docker

The last step covered in this tutorial is building a Docker image. We are going to create a Dockerfile. What do we need? First, we need a base Docker image with all the required packages. I am going to define the packages in the requirements.txt file:

pandas
numpy
scikit-learn
scipy
flask
joblib

After that, I can start writing the Dockerfile:

FROM python:3
COPY requirements.txt ./
RUN pip install --no-cache-dir -r requirements.txt

Now, I am going to assume that the machine used to build the Docker image has the model file in the same directory as the source code. If that is not the case, I would not download the model inside the Dockerfile either, because we don’t want to bake the code needed to download it from FTP, S3, or anywhere else into the Docker image. Instead, we should create a shell script that first downloads the model file and then runs the Docker build.

After that, we can copy the Python files, define the port number, and the command which runs the application.

COPY . /app
WORKDIR /app
EXPOSE 5000
ENTRYPOINT ["python"]
CMD ["service.py"]

The Dockerfile is ready. To create the image, run this command:

docker build -t mlmodel:latest .

To run the Docker image locally, execute this command:

docker run -p 5000:5000 mlmodel:latest
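
To smoke-test the running container, you could call the service from Python (assuming the requests package is installed on your machine and the final version of service.py, with authentication, is baked into the image):

import requests

response = requests.get(
    'http://localhost:5000/predict',
    params={'feature1': 1, 'feature2': 0, 'feature3': 1},
    auth=('admin', 'admin'),                        # the hardcoded credentials from requires_auth
    headers={'X-Correlation-ID': 'local-smoke-test'}
)
print(response.status_code, response.json())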

Now, we can deploy the image to Kubernetes. The procedure depends on the cloud provider you use, so check the details in their documentation.
