top of page
Writer's pictureAlibek Jakupov

Azure Custom Vision on Video Stream: face analysis on local.

Updated: Nov 19, 2021


In this short article we are going to discuss how to analyze face on video stream using Azure Custom Vision, OpenCV and Tensorflow. Feel free to use the code snippets in your applications, I will be completely satisfied if someone finds it useful. Up we go!



When may you need this?


As we've seen in our previous articles, Azure Custom Vision allows creating powerful CV models using transfer learning. The only thing you need to do is to provide a batch of images with the corresponding tags, so nothing complicated. However, when you need to do a precise classification, there may be some tricky manipulations to perform. For instance, you may want to analyze if a person wears glasses. You may also want to know whether the glasses are correctly put. And the most important thing, the analysis should be done in real time. Thus, if the image contains multiple persons, how can you just analyze the persons face and not the background. One solution would be to extract background beforehand, but it is a, kind of, challenging issue, so the easiest way is to do crop, but in intelligent way.



Why on local?


When you do the analysis in real time you need to analyze each frame, each milisecond. Consequently, when you call API, at each iteration you will need to open the URL connection, prepare image, send the HTTP request, get HTTP response, and finally, parse the JSON output. It's not too long in principle, but it is definetely much longer than a milisecond, so you can hardly call it "real-time". We will see how to do it on local, just within a couple of miliseconds.



Enough for text, let's dive into the code


First of all we need to train the model. At this stage there should be no problem. Simply upload your images, tag them and launch the training. Important: before starting your project be sure to make it ‘exportable’, i.e. select compact option.


This step should not be too complicated. After the training ends (usually it’s a matter of few seconds) go to Performance tab and click on Export button. In the dialog menu choose Tensor Flow (Android) and download it. The key word for us is Tensor Flow and not Android, as we are going to use it in our Python application.


This will download 3 files on your computer: model.pb which is the trained model itself, manifest and labels.txt that is the list of your classes.


We are now ready to dive into the implementation. But before, let's do some preparation.


Image processing functions


# coding utf-8
import cv2
import os
import tensorflow as tf
import numpy as np

# BGR colors
BLACK = (000)
WHITE = (255255255)
BLUE = (25500)
GREEN = (02550)
RED = (00255)
# Output text parameters
FONT = cv2.FONT_HERSHEY_SIMPLEX
FONT_SCALE = 1
LINE_TYPE = 1

WINDOW_NAME = 'Glass Classifier'

def resize_down_to_1600_max_dim(image):
 """Change oversized image dimensions using Linear Interpolation

    Arguments:
        image {OpenCV} -- OpenCV image

    Returns:
        OpenCV -- resized or initial image
    """
    h, w = image.shape[:2]
 if (h < 1600 and w < 1600):
 return image

    new_size = (1600 * w // h, 1600if (h > w) else (16001600 * h // w)
 return cv2.resize(image, new_size, interpolation=cv2.INTER_LINEAR)


def crop_center(img, cropx, cropy):
 """Extract a middle part of an image

    Arguments:
        img {OpenCv} -- OpenCV image to be cropped
        cropx {[type]} -- width of the cropped region
        cropy {[type]} -- height of the cropped region

    Returns:
        [OpenCV] -- cropped image
    """
    h, w = img.shape[:2]
    startx = w//2-(cropx//2)
    starty = h//2-(cropy//2)
 return img[starty:starty+cropy, startx:startx+cropx]


def resize_to_256_square(image):
 """Resize an image using the Linear Interpolation

    Arguments:
        image {OpenCV} -- OpenCV image

    Returns:
        OpenCV -- resized image
    """
    h, w = image.shape[:2]
 return cv2.resize(image, (256256), interpolation=cv2.INTER_LINEAR)


def save_image(image, folder):
 """Save an image with unique name

    Arguments:
        image {OpanCV} -- image object to be saved
        folder {string} -- output folder
    """

 # check whether the folder exists and create one if not
 if not os.path.exists(folder):
        os.makedirs(folder)

 # to not erase previously saved photos counter (image name) = number of photos in a folder + 1
    image_counter = len([name for name in os.listdir(folder)
 if os.path.isfile(os.path.join(folder, name))])

 # increment image counter
    image_counter += 1

 # save image to the dedicated folder (folder name = label)
    cv2.imwrite(folder + '/' + str(image_counter) + '.png', image)

As you have noticed here, I added my custom image saver, this will help me in retraining the image whenever I need to.


Load the model


# graph of operations to upload trained model
graph_def = tf.compat.v1.GraphDef()
# list of classes
labels = ['without_glass''with_glass']


# N.B. Azure Custom vision allows export trained model in the form of 2 files
# model.pb: a tensor flow graph and labels.txt: a list of classes
# import tensor flow graph, r+b mode is open the binary file in read or write mode
with tf.io.gfile.GFile(name='glass_model.pb', mode='rb'as f:
    graph_def.ParseFromString(f.read())
    tf.import_graph_def(graph_def=graph_def, name='')

Prepare video stream


# initialize video capture object to read video from external webcam
video_capture = cv2.VideoCapture(1)
# if there is no external camera then take the built-in camera
if not video_capture.read()[0]:
    video_capture = cv2.VideoCapture(0)

# Full screen mode
cv2.namedWindow(WINDOW_NAME, cv2.WND_PROP_FULLSCREEN)
cv2.setWindowProperty(
    WINDOW_NAME, cv2.WND_PROP_FULLSCREEN, cv2.WINDOW_FULLSCREEN)
# These names are part of the model and cannot be changed.
output_layer = 'loss:0'
input_node = 'Placeholder:0'
predicted_tag = 'Predicted Tag'

# counter to control the percentage of saved images
frame_counter = 0

And here's very important thing. We start the tensorflow session before the main loop. Thus, you initialize your session once, and it will save you about 1 sec, which is considerably important for real-time processing.


with tf.compat.v1.Session() as sess:
    prob_tensor = sess.graph.get_tensor_by_name(output_layer)
 while(video_capture.isOpened()):
 # read video frame by frame
        ret, frame = video_capture.read()

 try:
            frame = cv2.flip(frame, 1)
            frame_counter += 1
 # frame width and height
            w, h = 200300
 # set upper and lower boundaries
            upX = 220
            upY = 50
            lowX = upX + w
            lowY = upY + h
            image = frame[upY:lowY, upX:lowX]

 # If the image has either w or h greater than 1600 we resize it down respecting
 # aspect ratio such that the largest dimension is 1600
            image = resize_down_to_1600_max_dim(image)

 # We next get the largest center square
            h, w = image.shape[:2]
            min_dim = min(w, h)
            max_square_image = crop_center(image, min_dim, min_dim)

 # Resize that square down to 256x256
            augmented_image = resize_to_256_square(image)

 # Get the input size of the model
            input_tensor_shape = sess.graph.get_tensor_by_name(
                input_node).shape.as_list()
            network_input_size = input_tensor_shape[1]

 # Crop the center for the specified network_input_Size
            augmented_image = cv2.resize(
                image, (network_input_size, network_input_size), interpolation=cv2.INTER_LINEAR)

            predictions = sess.run(
                prob_tensor, {input_node: [augmented_image]})

 # get the highest probability label
            highest_probability_index = np.argmax(predictions)
            predicted_tag = labels[highest_probability_index]
            output_text = predicted_tag
 if predicted_tag == 'ok':
                frameColor = GREEN
 elif predicted_tag == 'ko':
                frameColor = RED
 else:
                frameColor = RED

            cv2.rectangle(frame, (upX, upY), (lowX, lowY), frameColor, 1)

 if (frame_counter % 10 == 0):
                save_image(augmented_image, predicted_tag)

 except:
 continue
        cv2.imshow(WINDOW_NAME, frame)

 if cv2.waitKey(1) & 0xFF == ord('q'):
 break

# release video capture object
video_capture.release()
cv2.destroyAllWindows()

So we crop the region we need to analyze and apply image processing functions on it


You may have noticed that I've also added a frame counter to limit the number of images saved for training, in other words, I save only 10 % of the whole flow. But you may have also noticed that we defined the face region manually, which is not the best solution, so what we'll do right now is add some intelligence using OpenCV dnn and a pretrained caffe model.



# Caffe 'deploy' prototxt file
prototxt = "models//deploy.prototxt.txt"
# Caffe pretrained model
model = "models//res10_300x300_ssd_iter_140000.caffemodel"
# minimum probability to filter weak detections
min_confidence = 0.5

# load our serialized model from disk
print("[INFO] loading model...")
net = cv2.dnn.readNetFromCaffe(prototxt, model)

There is an excellent tutorial explaining how to do deep learning-based face recognition provided by Adrian Rosebrock at PyImageSearch, so please refer to his blog for any additional information.


Same steps, but using Face Detection

# grab the frame dimensions and convert it to a blob
            (h, w) = frame.shape[:2]
            blob = cv2.dnn.blobFromImage(cv2.resize(frame, (300300)), 1.0,
                                         (300300), (104.0177.0123.0))

 # pass the blob through the network and obtain the detections and
 # predictions
            net.setInput(blob)
            detections = net.forward()
            faces_counter = 0

 # loop over the detections
 for i in range(0, detections.shape[2]):
 # extract the confidence (i.e., probability) associated with the
 # prediction
                confidence = detections[00, i, 2]

 # filter out weak detections by ensuring the `confidence` is
 # greater than the minimum confidence
 if confidence < min_confidence:
 continue

 # compute the (x, y)-coordinates of the bounding box for the
 # object
                box = detections[00, i, 3:7] * np.array([w, h, w, h])
                (startX, startY, endX, endY) = box.astype("int")

                face_frame = frame[startY:endY, startX:endX]

 # get face height
                face_height = endY - startY

 # set logo size
 # logo always represents 20% of the rectangle
                LOGO_SIZE = int(face_height*0.2)
 # resize logo according to the image height
                white_mask_compact = image_resize(
                    white_mask, width=LOGO_SIZE, height=LOGO_SIZE)
                orange_mask_compact = image_resize(
                    orange_mask, width=LOGO_SIZE, height=LOGO_SIZE)
                green_mask_compact = image_resize(
                    green_mask, width=LOGO_SIZE, height=LOGO_SIZE)

 # calculate upper and lower corners of the rectangle
 # needed to place logos correctly
                rectangleXup = startX
                rectangleYup = startY
                rectangleXdown = endX
                rectangleYdown = endY

 # distance between border and logo
                PADDING = 5
 # mask
                rightX = rectangleXdown - LOGO_SIZE - PADDING
                rightY = rectangleYdown + PADDING

 # gowning
                leftX = rectangleXup + PADDING
                leftY = rectangleYdown + PADDING

                maskX = int((leftX + rightX)/2)
                maskY = rectangleYdown + PADDING

                face_frame = resize_to_256_square(face_frame)

 # Crop the center for the specified network_input_Size
                augmented_image = crop_center(
                    face_frame, network_input_size, network_input_size)

                predictions, = sess.run(
                    prob_tensor, {input_node: [augmented_image]})
                highest_probability_index = np.argmax(predictions)
                rectangle_color = WHITE

 if labels[highest_probability_index] == "with_glass":
                    rectangle_color = GREEN
                    status_logo = white_mask_compact
                    predicted_value = "ok"
 elif labels[highest_probability_index] == "without_glass":
                    rectangle_color = RED
                    status_logo = orange_mask_compact
                    predicted_value = "ko"

                cv2.rectangle(frame, (startX, startY), (endX, endY),
                              rectangle_color, 2)
                cv2.rectangle(frame, (rectangleXup - 1, rectangleYdown), (rectangleXdown +
 1, rectangleYdown + LOGO_SIZE + PADDING), rectangle_color, -1)
                frame[maskY:maskY + LOGO_SIZE, maskX:maskX +
                      LOGO_SIZE] = status_logo
                faces_counter += 1

Here's the full code.


As you see, in the final version I've used imutils library, because I wanted to keep image ratio when using the full-screen mode. I also added some icones to make it more beautiful, so you may create a folder in the root directory and put your own images.

# import the necessary packages
from imutils.video import VideoStream
import numpy as np
import argparse
import imutils
import time
import cv2
import os
import tensorflow as tf
from win32api import GetSystemMetrics
import pyodbc

print("Width =", GetSystemMetrics(0))
print("Height =", GetSystemMetrics(1))
width = GetSystemMetrics(0)
height = GetSystemMetrics(1)

# BGR color constants
WHITE = (255255255)
BLUE = (25500)
GREEN = (02550)
RED = (00255)
BLACK = (000)


def image_resize(image, width=None, height=None, inter=cv2.INTER_AREA):
 # resize without distortion
 # initialize the dimensions of the image to be resized and
 # grab the image size
    dim = None
    (h, w) = image.shape[:2]
 # if both the width and height are None, then return the
 # original image
 if width is None and height is None:
 return image
 # check to see if the width is None
 if width is None:
 # calculate the ratio of the height and construct the
 # dimensions
        r = height / float(h)
        dim = (int(w * r), height)
 # otherwise, the height is None
 else:
 # calculate the ratio of the width and construct the
 # dimensions
        r = width / float(w)
        dim = (width, int(h * r))
 # resize the image
    resized = cv2.resize(image, dim, interpolation=inter)
 # return the resized image
 return resized


def convert_to_opencv(image):
 # RGB -> BGR conversion is performed as well.
    image = image.convert('RGB')
    r, g, b = np.array(image).T
    opencv_image = np.array([b, g, r]).transpose()
 return opencv_image


def crop_center(img, cropx, cropy):
    h, w = img.shape[:2]
    startx = w//2-(cropx//2)
    starty = h//2-(cropy//2)
 return img[starty:starty+cropy, startx:startx+cropx]


def resize_down_to_1600_max_dim(image):
    h, w = image.shape[:2]
 if (h < 1600 and w < 1600):
 return image

    new_size = (1600 * w // h, 1600if (h > w) else (16001600 * h // w)
 return cv2.resize(image, new_size, interpolation=cv2.INTER_LINEAR)


def resize_to_256_square(image):
    h, w = image.shape[:2]
 try:
        resized_image = cv2.resize(
            image, (256256), interpolation=cv2.INTER_LINEAR)
 except:
        resized_image = image
 return resized_image


def save_image(image, folder):
 """Save an image with unique name
    Arguments:
        image {OpanCV} -- image object to be saved
        folder {string} -- output folder
    """

 # check whether the folder exists and create one if not
 if not os.path.exists(folder):
        os.makedirs(folder)

 # to not erase previously saved photos counter (image name) = number of photos in a folder + 1
    image_counter = len([name for name in os.listdir(folder)
 if os.path.isfile(os.path.join(folder, name))])

 # increment image counter
    image_counter += 1

 # save image to the dedicated folder (folder name = label)
    cv2.imwrite(folder + '/' + str(image_counter) + '.png', image)


# Caffe 'deploy' prototxt file
prototxt = "models//deploy.prototxt.txt"
# Caffe pretrained model
model = "models//res10_300x300_ssd_iter_140000.caffemodel"
# minimum probability to filter weak detections
min_confidence = 0.5

# load our serialized model from disk
print("[INFO] loading model...")
net = cv2.dnn.readNetFromCaffe(prototxt, model)

# initialize the video stream and allow the cammera sensor to warmup
print("[INFO] starting video stream...")
vs = VideoStream(src=1).start()

if not isinstance(vs.read(), np.ndarray):
    vs = VideoStream(src=0).start()

time.sleep(1.0)

graph_def = tf.compat.v1.GraphDef()
labels = ["with_glass""without_glass"]

# These are set to the default names from exported models, update as needed.
filename = "models//mask_model.pb"

# Import the TF graph
with tf.io.gfile.GFile(filename, 'rb'as f:
    graph_def.ParseFromString(f.read())
    tf.import_graph_def(graph_def, name='')

# Get the input size of the model
with tf.compat.v1.Session() as sess:
    input_tensor_shape = sess.graph.get_tensor_by_name(
 'Placeholder:0').shape.as_list()
network_input_size = input_tensor_shape[1]


# Graphical elements
# mask
white_mask = cv2.imread('icones/picto_blanc/picto1.png')
orange_mask = cv2.imread('icones/picto_orange/picto1.png')
green_mask = cv2.imread('icones/picto_vert/picto1.png')
header_logo = cv2.imread('icones/header_logo.png')
logo_height, logo_width, _ = header_logo.shape
header_logo_resized = image_resize(
    header_logo, width=int(logo_height*2), height=int(logo_width*4))

with tf.compat.v1.Session() as sess:
 # These names are part of the model and cannot be changed.
    output_layer = 'loss:0'
    input_node = 'Placeholder:0'
    prob_tensor = sess.graph.get_tensor_by_name(output_layer)
 # loop over the frames from the video stream

 while True:
 try:
 # grab the frame from the threaded video stream and resize it
 # to have a maximum width of 400 pixels
            frame = vs.read()
            frame = imutils.resize(frame, width=width)
            frame[0:header_logo_resized.shape[0],
 0:header_logo_resized.shape[1]] = header_logo_resized

 # grab the frame dimensions and convert it to a blob
            (h, w) = frame.shape[:2]
            blob = cv2.dnn.blobFromImage(cv2.resize(frame, (300300)), 1.0,
                                         (300300), (104.0177.0123.0))

 # pass the blob through the network and obtain the detections and
 # predictions
            net.setInput(blob)
            detections = net.forward()
            faces_counter = 0

 # loop over the detections
 for i in range(0, detections.shape[2]):
 # extract the confidence (i.e., probability) associated with the
 # prediction
                confidence = detections[00, i, 2]

 # filter out weak detections by ensuring the `confidence` is
 # greater than the minimum confidence
 if confidence < min_confidence:
 continue

 # compute the (x, y)-coordinates of the bounding box for the
 # object
                box = detections[00, i, 3:7] * np.array([w, h, w, h])
                (startX, startY, endX, endY) = box.astype("int")

                face_frame = frame[startY:endY, startX:endX]

 # get face height
                face_height = endY - startY

 # set logo size
 # logo always represents 20% of the rectangle
                LOGO_SIZE = int(face_height*0.2)
 # resize logo according to the image height
                white_mask_compact = image_resize(
                    white_mask, width=LOGO_SIZE, height=LOGO_SIZE)
                orange_mask_compact = image_resize(
                    orange_mask, width=LOGO_SIZE, height=LOGO_SIZE)
                green_mask_compact = image_resize(
                    green_mask, width=LOGO_SIZE, height=LOGO_SIZE)

 # calculate upper and lower corners of the rectangle
 # needed to place logos correctly
                rectangleXup = startX
                rectangleYup = startY
                rectangleXdown = endX
                rectangleYdown = endY

 # distance between border and logo
                PADDING = 5
 # mask
                rightX = rectangleXdown - LOGO_SIZE - PADDING
                rightY = rectangleYdown + PADDING

 # gowning
                leftX = rectangleXup + PADDING
                leftY = rectangleYdown + PADDING

                maskX = int((leftX + rightX)/2)
                maskY = rectangleYdown + PADDING

                face_frame = resize_to_256_square(face_frame)

 # Crop the center for the specified network_input_Size
                augmented_image = crop_center(
                    face_frame, network_input_size, network_input_size)

                predictions, = sess.run(
                    prob_tensor, {input_node: [augmented_image]})
                highest_probability_index = np.argmax(predictions)
                rectangle_color = WHITE

 if labels[highest_probability_index] == "with_glass":
                    rectangle_color = GREEN
                    status_logo = white_mask_compact
                    predicted_value = "ok"
 elif labels[highest_probability_index] == "without_glass":
                    rectangle_color = RED
                    status_logo = orange_mask_compact
                    predicted_value = "ko"

                cv2.rectangle(frame, (startX, startY), (endX, endY),
                              rectangle_color, 2)
                cv2.rectangle(frame, (rectangleXup - 1, rectangleYdown), (rectangleXdown +
 1, rectangleYdown + LOGO_SIZE + PADDING), rectangle_color, -1)
                frame[maskY:maskY + LOGO_SIZE, maskX:maskX +
                      LOGO_SIZE] = status_logo
                faces_counter += 1
 
 # font 
            font = cv2.FONT_HERSHEY_SIMPLEX 
 
 # org 
            org = (200200
 
 # fontScale 
            fontScale = 1
 # Line thickness of 2 px 
            thickness = 2

            frame = cv2.putText(frame, str(faces_counter),org, font,  
                   fontScale, BLUE, thickness, cv2.LINE_AA)

 # show the output frame
            cv2.imshow("Expertime", frame)
            key = cv2.waitKey(1) & 0xFF

 # if the `q` key was pressed, break from the loop
 if key == ord("q"):
 break
 except ValueError:
            print("out of camera range")


Hope you found it useful. Any questions, please do not hesitate to leave a comment.


Wish you all an excellent week!

242 views0 comments

Comentarios


bottom of page