7.6. Scene Segmentation Model

import inspect
from viper_toolkit import Dissect
from model_server import ViperModel, NeuralNetworkLoader
from scene_segmentation_module import SceneSegmentationModel

7.6.1. Scene Segmentation Class

The Scene Segmentation Model is a class of NeuralNetworkLoader models which performs scene segmentation on the image classifying the pixels in four classes:

  • Roadway

  • curb

  • Backgroun

  • Marker

Using the Neural Network and the model weights found in the modules “Model” folder, the module first loads a ViperModel object:

source = inspect.getsource(ViperModel)
print (source)
class ViperModel(object):
    # A ViperModel contains the location of the model architecture
    # and the model weights which are stored in the node package.
    
    def __init__(self, package_name, model_xml, weights_bin):
        self.pkg = package_name
        self.model = model_xml
        self.weights = weights_bin
        self.setup_model()
        
    def setup_model(self):
        self.dir = roslib.packages.get_pkg_dir(self.pkg)
        self.location = os.path.join(
            self.dir, 
            self.model
            )
        self.weights = os.path.join(
            self.dir, 
            self.weights
            )

7.6.1.1. Instantiating the class NeuralNetworkLoader

When the VPU is initialized, the Model Server will provide the location of these parameters to the inference engine, as well as create a class object for this module called which I call a NeuralNetworkLoader. This object allows the model to be initialized at the time of the VPU instantiation, which is asyncronous to the instantiation of this module:

(Note: While I am providing the class definition here, the NeuralNetworkLoader Class is shared amongst all Model Nodes; I will break this down how this works in more detail on #TBA page)

source = inspect.getsource(NeuralNetworkLoader)
print (source)
class NeuralNetworkLoader(object):
    """
    
    A NeuralNetworkLoader is a class object which loads a pretrained
    model (architecture and weights) onto a initialized OpenVino
    inference engine.
    
    Keyword Arguments:
    
    ie -- an Inference Engine instance set up in the parent node which 
    we will load this model to.
    
    ViperModel -- a instance of class ViperModel which contains the 
    models weights and the structure of the neural network.
    
    device -- the inference device to be used to predict on (i.e., 
    "MYRIAD", CPU, GPU, etc.)
    
    model_tag -- a three letter abbreviation used by the VIPER Logger
    module which identifies log messages as originating from within this
    modules code.
    
    model_name -- a logger attribute which identifies this model.
    
    """
    
    def __init__(self, 
                ie: IECore, 
                viper_model: ViperModel,
                device: str,
                model_tag: str = "M..",
                model_name: str = "Model",
                *args,
                **kwargs):
        
        # Creates our helper tools from our Viper Toolkkit such as 
        # the parameter manager, our log manager, and our timer.
        self.setup_parameters(
            model_name = model_name, 
            model_tag = model_tag)
        
        # Prepare this model for loading
        self.setup_inference_engine(
            ie = ie, 
            viper_model = viper_model, 
            device = device)

        # Load the read network onto the initialized device.
        self.load_inference_engine(device=device, ie = ie)
        
        # Retrieve the architecture of the model to load, including 
        # inputs and outputs and stores these on the parameter server
        self.get_network_info()
        
        # Retrieves the image shapes for the input and output from the
        # now loaded model and stores these on the parameter server
        self.get_model_info()


    def setup_parameters(self, model_name: str, model_tag: str):
    
        # Instantiate our logger tool naming these processes and
        # setting the tag. The ("XX.") convention indicates this is a 
        # model and log messages are coming from within the 
        # model processing script and not the main node.
        self.logger = Logger(
            name = model_name, 
            tag = model_tag)
        
        # Instantiate our timer tool which will output the times of
        # the processes within the model, and indicate that the 
        # process originated from within the model, and not the module.
        self.timer = ProcessTimer(logger=self.logger)
        
        # Creates a parameter manager
        self.NeuralNetworkParams = ParameterManager(logger=self.logger)
        
    def setup_inference_engine(self, ie: IECore, viper_model: ViperModel, device: str):

        # Link the internal inference engine with the initialized engine
        # and read the network architecture.
        self._ie = ie
        
        # Load the Viper Model class object, which contains the address
        # for the neural network architecture and well as the weights
        # of the trained model.
        self._net = ie.read_network(
            model=viper_model.location,
            weights=viper_model.weights
            )

    def load_inference_engine(self, device, ie):
        
        # Load the network architecture and weights into the initialized
        # inference engine. We must indicate the device name which 
        # is passed through the main node.
        self._exec_net = ie.load_network(
            network = self._net,
            device_name = device
            )
            
    def get_network_info(self):
        
        # Set the input and output blobs
        self._input_blob = next(iter(self._exec_net.input_info))
        self._output_blob = next(iter(self._exec_net.outputs))

        # Get the input shape
        #self._input_shape = self._net.inputs[self._input_blob].shape
        #self.logger.i(f'Input shape: {self._input_shape}')
        
        # Save these parameters to the parameter server
        #self.NeuralNetworkParams.add(
        #    Parameter(
        #        name = "Input_shape",
        #        value = self._input_shape,
        #        dynamic = False))
            
        # Get the output shape
        self._output_shape = self._net.outputs[self._output_blob].shape
        self.logger.i(f'Output shape: {self._output_shape}')
        
        # Save these parameters to the parameter server
        self.NeuralNetworkParams.add(
            Parameter(
                name="Output_shape",
                value=self._output_shape,
                dynamic=False))
                
    def get_model_info(self):
        
        # Accesses the shape of the input layer and the output layer
        self._input_key = list(self._exec_net.input_info)[0]
        self._output_keys = list(self._exec_net.outputs.keys())
        self._tensors = self._exec_net.input_info[self._input_key].tensor_desc
        
        # Saves the shapes to variables representing
        self.n, self.c, self.h, self.w = self._tensors.dims
        self.logger.i(f'Tensor shape (NCHW): ({self.n}, {self.c}, {self.h}, {self.w})')
        
        self.NeuralNetworkParams.add(
            Parameter(
                name="Input_height",
                value=self.h,
                dynamic=False))

        self.NeuralNetworkParams.add(
            Parameter(
                name="Input_width",
                value=self.w,
                dynamic=False))

The effect of this is that the inference engine object is initiated once, and all models are loaded to the inference engine object at initialization. This module then instantiates a SeceneSegmentationModel, which is a child class of the NeuralNetworkLoader already instantiated on behalf of this module. We allow this module to inherit all methods and properties of the parent class.

In effect, the class object that was initiated prior to this modules initialization can then be “passed on” to this module as if it were this module which had instantiated this class.

7.6.1.2. Scene Segmentation Model

The SceneSegmentationModel is a class object of type NeuralNetworkLoader, and contains the methods particular to the performance of Scene Segmentation. It contains two class methods:

  • export_parameters() which exports the shape of the model specificly needed by this model

  • run_scene_segmentation() which is the method by which the image is transformed and passed to the inference engine for prediction.

After sucessfully invoking this method an image mask is returned with each pixel being classified as one of the four classes.

source = inspect.getsource(SceneSegmentationModel)
print (source)
class SceneSegmentationModel(NeuralNetworkLoader):
    """
    
    A SceneSegmentationModel is a class object which uses a Convolutional
    Neural Network (CNN) to classify  every pixel within an image as a 
    member of a certain class (segments the image). In this  pytorch 
    pretrained model we are predicting on 4 classes: 
        (a) Roadway, (b) Curb, (c) Background, and (d) marker. 
    
    The model specifications can be found at: 
    https://docs.openvino.ai/2018_R5/_docs_Transportation_segmentation_curbs_release1_caffe_desc_road_segmentation_adas_0001.html
    
    """
    
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

    def export_parameters(export_state=True, **kwargs):
        # Exports parameters sent to this function for debugging 
        # purposes, and then turns off function after export_state=False
        # is received.
        self.export_state = False
        
        for arg in kwargs:
            self.parameters.add(
                Parameter(
                    name = arg,
                    value = kwargs[arg],
                    dynamic = False))

        self.export_state = export_state

    def run_scene_segmentation(self, frame):

        # The default ROS image is BGR color, however the model is
        # expecting RGB, so we will need to convert this first.
        rgb_image = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)

        # We will now attempt to resize the image to fit the model input
        #try:
        input_img = cv2.resize(rgb_image, (self.w, self.h))
        rospy.loginfo("resized")
            #self.logger.d(f"Resized Shape: {input_img.shape}")
        #except:
            #self.logger.e(f"Cannot resize image to shape ({self.w}, {self.h})") 
            #return
            
       # if self.export_state: self.export_parameters(resized_shape = input_img.shape)

        # We need to wrangle the image from into the NCHW format.
        #try:    
        transposed_img = np.expand_dims(
                a = input_img.transpose(2, 0, 1),
                axis = 0)
        rospy.loginfo("transposed")
            #self.logger.d(f"Transposed Shape: {transposed_img.shape}")
        #except:
        #    self.logger.e("Error converting to NCHW format")
        #    return
        
        #if self.export_state: self.export_parameters(transposed_shape=transposed_img.shape)
        

        # We will now perform inference on the input object using the
        # inference engine we loaded to the Visual Processing Unit
        results = self._exec_net.infer(
            inputs = {self._input_key: transposed_img}
            )
        rospy.loginfo("infered")
        # Extract the inference blob from the results array
        result_ir = results[self._output_blob]
        
        # We then compute the maximum value along axis 1 indicating
        # the max likelyhood class for which that pixel belongs, and
        # return this classification map of the original image.
        mask = np.argmax(
            a = result_ir, 
            axis=1)
        
        # We export the successful shape and then turn off exporting.
        #if self.export_state:self.export_parameters(mask_shape=mask.shape, export_state=False)
            
        #self.logger.i(f"Returning shape: {mask.shape}")

        return mask