7.6. Scene Segmentation Model¶
import inspect
from viper_toolkit import Dissect
from model_server import ViperModel, NeuralNetworkLoader
from scene_segmentation_module import SceneSegmentationModel
7.6.1. Scene Segmentation Class¶
The Scene Segmentation Model is a NeuralNetworkLoader model that performs scene segmentation on an image, classifying each pixel into one of four classes:
Roadway
Curb
Background
Marker
Using the neural network and the model weights found in the module's “Model” folder, the module first loads a ViperModel object:
# Retrieve the source text of ViperModel through introspection and show it.
source = inspect.getsource(ViperModel)
print(source)
class ViperModel(object):
    """
    Holds the location of a model's architecture file and weights file,
    both of which are stored inside a node package.

    Keyword Arguments:
    package_name -- name of the package that is looked up with
        roslib.packages.get_pkg_dir.
    model_xml -- path of the architecture file, joined onto the package
        directory.
    weights_bin -- path of the weights file, joined onto the package
        directory.
    """

    def __init__(self, package_name, model_xml, weights_bin):
        self.pkg = package_name
        self.model = model_xml
        self.weights = weights_bin
        # Resolve both files against the package directory right away.
        self.setup_model()

    def setup_model(self):
        """Resolve the package directory and build the two file paths."""
        package_dir = roslib.packages.get_pkg_dir(self.pkg)
        self.dir = package_dir
        self.location = os.path.join(package_dir, self.model)
        # NOTE: ``weights`` is overwritten here; it holds the value given
        # to the constructor before this call and the joined path after.
        self.weights = os.path.join(package_dir, self.weights)
7.6.1.1. Instantiating the class NeuralNetworkLoader¶
When the VPU is initialized, the Model Server provides the location of these parameters to the inference engine and creates a class object for this module, which I call a NeuralNetworkLoader. This object allows the model to be initialized at the time of the VPU instantiation, which is asynchronous to the instantiation of this module:
(Note: While I am providing the class definition here, the NeuralNetworkLoader class is shared amongst all Model Nodes; I will break down how this works in more detail on the #TBA page.)
# Retrieve the source text of NeuralNetworkLoader through introspection
# and show it.
source = inspect.getsource(NeuralNetworkLoader)
print(source)
class NeuralNetworkLoader(object):
    """
    Loads a pretrained model (architecture and weights) onto an
    already-initialized OpenVino inference engine.

    Keyword Arguments:
    ie -- an Inference Engine instance set up in the parent node, onto
        which this model is loaded.
    viper_model -- an instance of class ViperModel which holds the
        location of the model's weights and of the network structure.
    device -- the inference device to predict on (i.e., "MYRIAD", CPU,
        GPU, etc.)
    model_tag -- a three letter abbreviation used by the VIPER Logger
        module to mark log messages as originating from this model's code.
    model_name -- a logger attribute which identifies this model.
    """

    def __init__(self,
                 ie: IECore,
                 viper_model: ViperModel,
                 device: str,
                 model_tag: str = "M..",
                 model_name: str = "Model",
                 *args,
                 **kwargs):
        # Step 1: helper tools from the Viper Toolkit (logger, timer and
        # parameter manager).
        self.setup_parameters(model_name=model_name, model_tag=model_tag)

        # Step 2: read the network description so it is ready to load.
        self.setup_inference_engine(
            ie=ie,
            viper_model=viper_model,
            device=device,
        )

        # Step 3: load the network that was just read onto the device.
        self.load_inference_engine(device=device, ie=ie)

        # Step 4: record the input/output blob names and the output shape.
        self.get_network_info()

        # Step 5: record the input tensor dimensions of the loaded model.
        self.get_model_info()

    def setup_parameters(self, model_name: str, model_tag: str):
        """Create the logger, the process timer and the parameter manager."""
        # The ("XX.") tag convention marks messages as coming from the
        # model processing script rather than from the main node.
        model_logger = Logger(name=model_name, tag=model_tag)
        self.logger = model_logger

        # The timer reports the duration of processes inside the model and
        # logs through the model's own logger.
        self.timer = ProcessTimer(logger=model_logger)

        # Parameter manager used to publish facts about this network.
        self.NeuralNetworkParams = ParameterManager(logger=model_logger)

    def setup_inference_engine(self, ie: IECore, viper_model: ViperModel, device: str):
        """Keep a handle on the engine and read the network description.

        ``device`` is accepted but not used by this method.
        """
        self._ie = ie

        # The ViperModel carries the addresses of the network architecture
        # and of the trained weights.
        architecture_path = viper_model.location
        weights_path = viper_model.weights
        self._net = ie.read_network(
            model=architecture_path,
            weights=weights_path,
        )

    def load_inference_engine(self, device, ie):
        """Load the previously read network onto the named device."""
        # The device name is handed down from the main node.
        self._exec_net = ie.load_network(
            network=self._net,
            device_name=device,
        )

    def get_network_info(self):
        """Record the first input/output blob names and the output shape."""
        executable = self._exec_net
        self._input_blob = next(iter(executable.input_info))
        self._output_blob = next(iter(executable.outputs))

        # The input shape is not recorded here; get_model_info() reads the
        # input dimensions from the loaded network instead.

        # Output shape as described by the network that was read.
        self._output_shape = self._net.outputs[self._output_blob].shape
        self.logger.i(f'Output shape: {self._output_shape}')

        # Publish the output shape through the parameter manager.
        output_shape_param = Parameter(
            name="Output_shape",
            value=self._output_shape,
            dynamic=False,
        )
        self.NeuralNetworkParams.add(output_shape_param)

    def get_model_info(self):
        """Record the input tensor dimensions of the loaded network."""
        executable = self._exec_net
        self._input_key = list(executable.input_info)[0]
        self._output_keys = list(executable.outputs.keys())
        self._tensors = executable.input_info[self._input_key].tensor_desc

        # Unpack the four dimensions, logged below as NCHW.
        self.n, self.c, self.h, self.w = self._tensors.dims
        self.logger.i(f'Tensor shape (NCHW): ({self.n}, {self.c}, {self.h}, {self.w})')

        # Publish the input height first, then the input width.
        for param_name, param_value in (
            ("Input_height", self.h),
            ("Input_width", self.w),
        ):
            self.NeuralNetworkParams.add(
                Parameter(
                    name=param_name,
                    value=param_value,
                    dynamic=False,
                )
            )
The effect of this is that the inference engine object is initialized once, and all models are loaded onto it at initialization. This module then instantiates a SceneSegmentationModel, which is a child class of the NeuralNetworkLoader already instantiated on behalf of this module. We allow this module to inherit all methods and properties of the parent class.
In effect, the class object that was initialized prior to this module's initialization can then be “passed on” to this module as if it were this module which had instantiated this class.
7.6.1.2. Scene Segmentation Model¶
The SceneSegmentationModel is a class object of type NeuralNetworkLoader, and contains the methods particular to the performance of Scene Segmentation. It defines two methods:
export_parameters() which exports the shapes specifically needed by this model
run_scene_segmentation() which is the method by which the image is transformed and passed to the inference engine for prediction.
After successfully invoking this method, an image mask is returned with each pixel classified as one of the four classes.
# Retrieve the source text of SceneSegmentationModel through introspection
# and show it.
source = inspect.getsource(SceneSegmentationModel)
print(source)
class SceneSegmentationModel(NeuralNetworkLoader):
    """
    A SceneSegmentationModel is a class object which uses a Convolutional
    Neural Network (CNN) to classify every pixel within an image as a
    member of a certain class (segments the image). In this pytorch
    pretrained model we are predicting on 4 classes:
    (a) Roadway, (b) Curb, (c) Background, and (d) Marker.

    The model specifications can be found at:
    https://docs.openvino.ai/2018_R5/_docs_Transportation_segmentation_curbs_release1_caffe_desc_road_segmentation_adas_0001.html
    """

    def __init__(self, *args, **kwargs):
        # All arguments are forwarded unchanged to NeuralNetworkLoader.
        super().__init__(*args, **kwargs)
        # Flag read by the (currently commented-out) debug hooks in
        # run_scene_segmentation(); defined here so that re-enabling those
        # hooks does not fail on a missing attribute.
        self.export_state = True

    def export_parameters(self, export_state=True, **kwargs):
        """
        Export the given keyword arguments as static parameters, for
        debugging purposes.

        Keyword Arguments:
        export_state -- value stored in ``self.export_state`` once the
            export is done; pass False to switch further exporting off.
        **kwargs -- each name/value pair is added to the parameter
            manager as a non-dynamic Parameter.
        """
        # Held at False while the export runs, then set to the requested
        # state below.
        self.export_state = False
        for name, value in kwargs.items():
            # The parent class creates the parameter manager under the
            # name ``NeuralNetworkParams``.
            self.NeuralNetworkParams.add(
                Parameter(
                    name=name,
                    value=value,
                    dynamic=False))
        self.export_state = export_state

    def run_scene_segmentation(self, frame):
        """
        Run the segmentation network on one frame and return the class map.

        Keyword Arguments:
        frame -- the image to segment; it is treated as BGR (the default
            ROS image colour order) and converted to RGB before inference.

        Returns the result of ``np.argmax`` along axis 1 of the network
        output, i.e. the most likely class index for each pixel.
        """
        # The default ROS image is BGR color, however the model is
        # expecting RGB, so we convert this first.
        rgb_image = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)

        # Resize to the model input; cv2.resize takes (width, height).
        input_img = cv2.resize(rgb_image, (self.w, self.h))
        rospy.loginfo("resized")
        # Debug hook (disabled):
        # if self.export_state:
        #     self.export_parameters(resized_shape=input_img.shape)

        # Wrangle the image into the NCHW format: move the channel axis to
        # the front, then add a leading batch axis.
        transposed_img = np.expand_dims(
            a=input_img.transpose(2, 0, 1),
            axis=0)
        rospy.loginfo("transposed")
        # Debug hook (disabled):
        # if self.export_state:
        #     self.export_parameters(transposed_shape=transposed_img.shape)

        # Perform inference on the input using the network that was loaded
        # onto the inference device.
        results = self._exec_net.infer(
            inputs={self._input_key: transposed_img}
        )
        rospy.loginfo("infered")

        # Extract the inference blob from the results.
        result_ir = results[self._output_blob]

        # Take the index of the maximum value along axis 1, which gives
        # the maximum-likelihood class for each pixel, and return this
        # classification map.
        mask = np.argmax(
            a=result_ir,
            axis=1)
        # Debug hook (disabled):
        # if self.export_state:
        #     self.export_parameters(mask_shape=mask.shape, export_state=False)
        return mask