Alibek Jakupov

Azure Functions with Python: first steps towards 'Clean Code'

Updated: Nov 11



I like my code to be elegant and efficient. The logic should be straightforward to make it hard for bugs to hide, the dependencies minimal to ease maintenance, error handling complete according to an articulated strategy, and performance close to optimal so as not to tempt people to make the code messy with unprincipled optimizations. Clean code does one thing well.

Bjarne Stroustrup, inventor of C++ and author of The C++ Programming Language


I could list all of the qualities that I notice in clean code, but there is one overarching quality that leads to all of them. Clean code always looks like it was written by someone who cares. There is nothing obvious that you can do to make it better. All of those things were thought about by the code’s author, and if you try to imagine improvements, you’re led back to where you are, sitting in appreciation of the code someone left for you—code left by someone who cares deeply about the craft.

Michael Feathers, author of Working Effectively with Legacy Code


As the title suggests, today we are going to discuss so-called 'Clean Code': the idea that you keep your code simple and comprehensible by separating the logic and giving meaningful names to your functions and variables. It's not a trivial task at all, let me assure you. While there are plenty of good references on the topic (Clean Code, The Clean Coder and Clean Architecture by Uncle Bob, to name a few), the task varies depending on the problem context. In this article we are going to cover serverless architecture, and Azure Functions in particular. I am still a rookie developer, so there is certainly a lot to improve in my code, but as the motto of alirookie.com says, 'While True: Learn', so I am constantly trying to improve my code.



So what's the issue?


Let me explain the logic of Azure Functions first.


According to the official documentation:

Azure Functions allows you to run small pieces of code (called "functions") without worrying about application infrastructure. With Azure Functions, the cloud infrastructure provides all the up-to-date servers you need to keep your application running at scale. A function is "triggered" by a specific type of event. Supported triggers include responding to changes in data, responding to messages, running on a schedule, or as the result of an HTTP request. Although you can always code directly against myriad services, integrating with other services is streamlined by using bindings. Bindings give you declarative access to a wide variety of Azure and third-party services.

So the logic is quite straightforward. You create a piece of code and run it on the Azure cloud. If you are working in Python, you create a virtual environment (a good practice in general), prepare a requirements.txt with all the libraries used in your project, and deploy it to the cloud using the Visual Studio Code extension. If there are special system configurations that you need to adapt to your needs, you prepare a Dockerfile, push the image to Docker Hub, and add a link in your Azure Function configuration so it can pull the image for execution.
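As a point of reference, the requirements.txt for a project like this one might look as follows (the exact packages and versions depend on your code; this list simply mirrors the imports used later in this article):

```text
azure-functions
azure-storage-blob
azure-cognitiveservices-vision-customvision
msrest
pandas
numpy
pyodbc
xlrd
```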


Having worked on a couple of Python projects with Azure Functions, I've noticed that the majority of developers tend to put all the logic into the main function or, in the best case, create a lot of functions in __init__.py and call them all from main. Either way, if your function calls a database, does some machine learning, and saves artifacts to Azure Blob Storage, there will definitely be a lot of code, and consequently your __init__.py file will become extremely long.


While searching the web, I recently found this StackOverflow post:



Great news: I am not the only one concerned with issues like this!


In my previous project my __init__.py contained 484 lines of code, which was difficult to maintain. In my latest project it has only 20. I am quite satisfied with the size of my main file and the structure of my project, so I would like to share some thoughts with concrete examples.



Let's code!


Here's the project context. We had a website that uploaded a file to Azure Blob Storage and then called our function. The function did some basic text preprocessing, ran ML predictions on the image and text columns, added label columns to the initial dataframe, saved the result to a database, and finally saved the processed file back to Blob Storage, but into a different container.


Here are the steps that I followed to make my code a little bit cleaner. No deep stuff, just pure code with comments.


1) Create a helpers folder


In this folder I've put all the helpers of my project as separate Python files:


BlobHelper

from azure.storage.blob import BlobServiceClient

import pandas as pd
import os
import io

from ..helpers.MLHelper import MLHelper


class BlobHelper:
    AZURE_STORAGE_CONNECTION_STRING = ""
    INPUT_CONTAINER = "siteuploads"
    OUTPUT_CONTAINER = "processed-files"

    def __init__(self):
        # Create the BlobServiceClient object which will be used to create container clients
        self.blob_service_client = BlobServiceClient.from_connection_string(
            self.AZURE_STORAGE_CONNECTION_STRING)

    def blob_to_pandas(self, blobname):
        """Download a file from blob storage and convert it to a pandas dataframe

        Args:
            blobname (string): a filename (only .xlsx, .xls and .csv are accepted)

        Returns:
            pandas.DataFrame: a dataframe representation of the input
        """
        self.clean_name, self.file_extension = os.path.splitext(blobname)

        self.input_blob_client = self.blob_service_client.get_blob_client(
            container=self.INPUT_CONTAINER, blob=blobname)
        # all the output is in csv format
        self.output_blob_client = self.blob_service_client.get_blob_client(
            container=self.OUTPUT_CONTAINER, blob=self.clean_name + ".csv")

        # convert the blob to a BytesIO stream
        blob_stream = io.BytesIO(
            self.input_blob_client.download_blob().readall())

        if self.file_extension in [".xls", ".xlsx"]:
            self.input_dataframe = pd.read_excel(blob_stream)
        elif self.file_extension == ".csv":
            self.input_dataframe = pd.read_csv(blob_stream)
        else:
            self.input_dataframe = None

        return self.input_dataframe

    def remove_input_blob(self):
        self.input_blob_client.delete_blob()

    def upload_output_blob(self, dataframe):
        self.output_blob_client.upload_blob(dataframe, overwrite=True)

    def get_processed_blob(self, blobname):
        mlHelper = MLHelper(self.blob_to_pandas(blobname))
        self.remove_input_blob()
        processed_dataframe = mlHelper.get_processed_dataframe()
        upload_data = processed_dataframe.to_csv(index=None)
        self.upload_output_blob(upload_data)
        return processed_dataframe
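The extension check in blob_to_pandas is the kind of logic that is easy to pull out and unit-test on its own. Here is a minimal sketch under my own naming (pick_format is an illustrative helper, not part of the project); as a small improvement over the original, it also lower-cases the extension so "data.XLSX" is recognized:

```python
import os

# supported input formats, keyed by file extension
SUPPORTED = {".xls": "excel", ".xlsx": "excel", ".csv": "csv"}


def pick_format(blobname):
    """Return (clean_name, format) for a blob name;
    format is None when the extension is not supported."""
    clean_name, extension = os.path.splitext(blobname)
    return clean_name, SUPPORTED.get(extension.lower())
```

A pure function like this can be covered by the tests folder without any connection to blob storage.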



DBHelper


import pyodbc


class DBHelper:
    SERVER = ''
    DATABASE = ''
    USERNAME = ''
    PASSWORD = ''

    def __init__(self):
        self.cnxn = pyodbc.connect(
            'DRIVER={ODBC Driver 17 for SQL Server};SERVER=' + self.SERVER +
            ';DATABASE=' + self.DATABASE + ';UID=' + self.USERNAME +
            ';PWD=' + self.PASSWORD)
        self.cursor = self.cnxn.cursor()

    def insert_result(self, result_df):
        """Insert a pandas dataframe into the corresponding SQL table

        Args:
            result_df (pandas.DataFrame): resulting table to be inserted into the database
        """
        for index, row in result_df.iterrows():
            # parameterized insert, one row at a time
            self.cursor.execute(
                "insert into [Results] ([Column 1], [Column 2], [Column 3], "
                "[Column 4], [NLP_Category], [CV_Category], [Anomaly], "
                "[IsProcessed], [NLP_Probability], [CV_Probability]) "
                "values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
                row["Column 1"],
                row["Column 2"],
                row["Column 3"],
                row["Column 4"],
                row["NLP_Category"],
                row["CV_Category"],
                0,
                0,
                row["NLP_Probability"],
                row["CV_Probability"])
        self.cnxn.commit()
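Note that the insert binds values through pyodbc's ? placeholders rather than string formatting, which keeps the query safe from SQL injection. The same pattern works with the standard library's sqlite3, used here only as a stand-in so the sketch runs without a SQL Server (table and values are illustrative):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
cursor = conn.cursor()
cursor.execute("create table Results (Category text, Probability real)")

rows = [("shoes", 0.97), ("bikes", 0.84)]
for category, probability in rows:
    # values are bound as parameters, never concatenated into the SQL string
    cursor.execute(
        "insert into Results (Category, Probability) values (?, ?)",
        (category, probability))
conn.commit()

row_count = cursor.execute("select count(*) from Results").fetchone()[0]
```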




MLHelper


from azure.cognitiveservices.vision.customvision.prediction import CustomVisionPredictionClient
from msrest.authentication import ApiKeyCredentials
from io import StringIO
from html.parser import HTMLParser

import string
import re
import json
import numpy as np
import pandas as pd
import urllib.request as urllib


class MLStripper(HTMLParser):
    def __init__(self):
        super().__init__()
        self.reset()
        self.strict = False
        self.convert_charrefs = True
        self.text = StringIO()

    def handle_data(self, d):
        self.text.write(d)

    def get_data(self):
        return self.text.getvalue()


class MLHelper:
    NLP_URL = ''
    NLP_API_KEY = ''
    CV_PREDICTION_KEY = ""
    CV_ENDPOINT = ""

    def __init__(self, input_dataframe):
        """Connect to the Azure ML Studio/Custom Vision web API and add new columns to the input dataframe

        Args:
            input_dataframe (pandas.DataFrame): partners data to be treated
        """

        # there is a trained endpoint that can be used to make predictions
        prediction_credentials = ApiKeyCredentials(
            in_headers={"Prediction-key": self.CV_PREDICTION_KEY})
        self.predictor = CustomVisionPredictionClient(
            self.CV_ENDPOINT, prediction_credentials)

        # clean labels from NaN and normalize whitespaces
        self.input_dataframe = input_dataframe.replace(
            {'Column': {np.nan: "non communiqué"},
             'Column 1': {np.nan: "non communiqué"},
             'Column 2': {np.nan: "non communiqué"},
             'Column 3': {np.nan: "non communiqué"},
             'Column 4': {np.nan: ""},
             'Description': {np.nan: ""},
             'filename': {np.nan: "no-image-v2.jpg"}})

        self.input_dataframe['Column'] = self.input_dataframe['Famille BO'].apply(
            lambda x: self.clean_text_basic(x))
        self.input_dataframe['Column'] = self.input_dataframe['Produit'].apply(
            lambda x: self.clean_text_basic(x))
        self.input_dataframe['Column'] = self.input_dataframe['Sport'].apply(
            lambda x: self.clean_text_basic(x))
        self.input_dataframe['Column'] = self.input_dataframe['Genre'].apply(
            lambda x: self.clean_text_basic(x))

    def get_scored_label_nlp(self, item_input):
        """Get a scored label for a list of items

        Args:
            item_input (list): "Designation Principale" column

        Returns:
            list: predicted labels
        """
        # for batch execution, create a temporary list of lists with user requests
        web_input = []
        # convert each item into a one-element list
        for item in item_input:
            web_input.append([item])

        # http request structure
        data = {
            "Inputs": {
                "input1": {
                    "ColumnNames": ["text_column"],
                    "Values": web_input
                },
            },
            "GlobalParameters": {}
        }
        # convert the request into json data
        body = str.encode(json.dumps(data))

        # azure ml studio text classification endpoint
        headers = {'Content-Type': 'application/json',
                   'Authorization': ('Bearer ' + self.NLP_API_KEY)}
        # send the request
        req = urllib.Request(self.NLP_URL, body, headers)
        # get the response
        response = urllib.urlopen(req)
        # convert the response into json
        result = json.loads(response.read())
        # parse the response json
        responses = result['Results']['output1']['value']['Values']

        # columns of the output dataframe
        product_categories = []

        for response in responses:
            # the webservice output is the product category
            product_categories.append(response)

        return product_categories

    def get_image_tag_m1(self, image_url, predictor):
        results = predictor.classify_image_url_with_no_store(
            project_id="", published_name="Iteration3", url=image_url)

        max_score = 0
        max_tag = ''

        # keep the most probable tag
        for prediction in results.predictions:
            probability = prediction.probability * 100
            if probability > max_score:
                max_tag = prediction.tag_name
                max_score = probability

        return (max_score, max_tag)

    def get_image_tag_m2(self, image_url, predictor):
        results = predictor.classify_image_url(
            project_id="", published_name="Iteration2", url=image_url)

        max_score = 0
        max_tag = ''

        # keep the most probable tag
        for prediction in results.predictions:
            probability = prediction.probability * 100
            if probability > max_score:
                max_tag = prediction.tag_name
                max_score = probability

        return (max_score, max_tag)

    def get_image_tag_m3(self, image_url, predictor):
        results = predictor.classify_image_url(
            project_id="", published_name="Iteration2", url=image_url)

        max_score = 0
        max_tag = ''

        # keep the most probable tag
        for prediction in results.predictions:
            probability = prediction.probability * 100
            if probability > max_score:
                max_tag = prediction.tag_name
                max_score = probability

        return (max_score, max_tag)

    def get_image_tag_m4(self, image_url, predictor):
        results = predictor.classify_image_url(
            project_id="", published_name="Iteration2", url=image_url)

        max_score = 0
        max_tag = ''

        # keep the most probable tag
        for prediction in results.predictions:
            probability = prediction.probability * 100
            if probability > max_score:
                max_tag = prediction.tag_name
                max_score = probability

        return (max_score, max_tag)

    def get_image_tag(self, image_url, predictor):
        return "", 0

    def get_processed_dataframe(self):
        """Apply the API calls to each row of the dataframe

        Returns:
            pandas.DataFrame: input_dataframe with the ML columns
        """
        output = self.add_category_column(self.input_dataframe)
        output = self.add_url_column(output)
        output = self.add_full_description(output)
        # call the NLP webservice in chunks of 1000 rows
        chunk_size = 1000
        array_size = len(output['Description'])
        i = 0
        temp = []
        while i < array_size:
            temp.append(self.get_scored_label_nlp(
                output['Description'][i:i + chunk_size]))
            i += chunk_size
        # flatten the list of batch results
        flat_temp = [item for sublist in temp for item in sublist]
        nlp_category, nlp_probability = zip(*flat_temp)
        output["NLP_Category"] = nlp_category
        output["NLP_Probability"] = nlp_probability
        output[["CV_Category", "CV_Probability"]] = pd.DataFrame(
            output["image_link"].apply(
                lambda x: self.get_image_tag(x, self.predictor)).tolist(),
            index=output.index)

        return output

    def add_category_column(self, input_dataframe):
        """Concat the label columns into a category name

        Returns:
            pandas.DataFrame: initial dataframe with an Original_category column
        """
        output = input_dataframe
        cols = ['Column', 'Column', 'Column', 'Column']
        output["Original_category"] = output[cols].apply(
            lambda row: '¤¤¤'.join(row.values.astype(str)), axis=1)

        return output

    def add_url_column(self, input_dataframe):
        output = input_dataframe

        output["image_link"] = "https://" + output["filename"]

        return output

    def add_full_description(self, input_dataframe):
        output = input_dataframe
        output["Column"] = output['Column'].map(
            str) + ' ' + output['Column'].map(str)

        output["Column"] = output["Column"].apply(
            lambda text: self.clean_text(text))

        return output

    def strip_tags(self, html):
        s = MLStripper()
        s.feed(str(html))
        return s.get_data()

    def remove_tabulations(self, text):
        text = str(text)
        return text.replace("\r", ' ').replace("\t", ' ').replace("\n", ' ')

    def clean_text(self, text):
        # remove HTML tags
        text = self.strip_tags(text)
        # remove tabulations and line breaks
        text = self.remove_tabulations(text)
        # convert to lower case
        text = text.lower()
        # remove text in square brackets
        text = re.sub(r'\[.*?\]', ' ', text)
        # remove punctuation
        text = re.sub('[%s]' % re.escape(string.punctuation), ' ', text)
        # normalize whitespace
        text = ' '.join(text.split())
        return text

    def clean_text_basic(self, text):
        # remove leading and trailing whitespace
        text = text.strip()
        # normalize whitespace
        text = ' '.join(text.split())
        return text
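As a side note, the four get_image_tag_m* methods above repeat the same 'keep the best prediction' loop; only the published_name changes. That loop could be factored into a single helper and tested in isolation. A sketch of the idea (the Prediction namedtuple merely mimics the SDK's prediction objects for the sake of the example):

```python
from collections import namedtuple

# stand-in for the Custom Vision SDK prediction objects
Prediction = namedtuple("Prediction", ["tag_name", "probability"])


def best_prediction(predictions):
    """Return (max_score, max_tag) for an iterable of predictions,
    with the score expressed as a percentage."""
    max_score, max_tag = 0, ''
    for prediction in predictions:
        probability = prediction.probability * 100
        if probability > max_score:
            max_tag = prediction.tag_name
            max_score = probability
    return max_score, max_tag
```

With such a helper, each get_image_tag_m* body shrinks to one classify call plus one best_prediction call, and the duplication disappears.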



2) Import your helpers in the __init__.py file


__init__.py

import logging
import azure.functions as func

from ..helpers.BlobHelper import BlobHelper
from ..helpers.DBHelper import DBHelper


def main(req: func.HttpRequest) -> func.HttpResponse:

    blobname = req.params.get('blobname')

    if blobname:
        blobHelper = BlobHelper()
        dbConnector = DBHelper()
        dbConnector.insert_result(blobHelper.get_processed_blob(blobname))

        return func.HttpResponse("Success")
    else:
        return func.HttpResponse("Blob name required", status_code=400)


And that's it! Just two short steps and our solution is up and running.



Conclusions

The whole project structure looks like this:



ROOKIEFUNCTION
│   .funcignore
│   .gitignore
│   host.json
│   local.settings.json
│   proxies.json
│   requirements.txt
│
├───.venv
│   │   pip-selfcheck.json
│   │   pyvenv.cfg
│   │
│   ├───Include
│   ├───Lib
│   │
│   └───__pycache__
│           runxlrd.cpython-36.pyc
│
├───.vs
│   │   ProjectSettings.json
│   │   slnx.sqlite
│   │   VSWorkspaceState.json
│   │
│   ├───RookieFunction
│   └───v16
│
├───.vscode
│       extensions.json
│       launch.json
│       settings.json
│       tasks.json
│
├───RookieMapping
│   │   function.json
│   │   sample.dat
│   │   __init__.py
│   │
│   └───__pycache__
│           __init__.cpython-36.pyc
│
├───helpers
│   │   BlobHelper.py
│   │   DBHelper.py
│   │   MLHelper.py
│   │
│   └───__pycache__
│           BlobHelper.cpython-36.pyc
│           DBHelper.cpython-36.pyc
│           MLHelper.cpython-36.pyc
│
├───tests
│       test_nlp_service.py
│       __init__.py
│
└───__pycache__
        test_nlp_service.cpython-36.pyc




What could have been done better:

  1. Instead of storing the secrets as constants, I should have put them in the application settings (I'm still learning how to do this correctly).

  2. Create a parent Helper class and make all the other helpers inherit from it.

  3. As text preprocessing may contain complex logic, I should have created a separate TextProcessing class to hold all the preparation steps.

  4. I still think the whole solution could have been implemented in a smarter manner, so I will keep learning and will keep you updated.
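Regarding point 1: Azure Functions exposes application settings to Python code as environment variables, so the hard-coded constants could be read with os.environ instead. A minimal sketch (the setting names below are my own illustrative choices, not the project's real ones):

```python
import os

# read secrets from application settings (exposed as environment variables)
# instead of hard-coding them in the class body; the second argument is a
# fallback for when the setting is missing
AZURE_STORAGE_CONNECTION_STRING = os.environ.get(
    "AZURE_STORAGE_CONNECTION_STRING", "")
NLP_API_KEY = os.environ.get("NLP_API_KEY", "")
```

Locally these values go into the Values section of local.settings.json; once deployed, they go into the Function App's configuration, so no secret ever needs to live in the repository.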



Hopefully, someone will find it useful!


 