7.5. Scene Segmentation Module Process

import inspect
from viper_toolkit import Dissect
from scripts.scene_segmentation_module_node import SceneSegmentationModule

The Scene Segmentation Module is a ROS Package Module comprising:

  • the SceneSegmentationModule Class Object

  • the SceneSegmentationModel processing library

  • the RoadSegmentation ADAS Model and pretrained weights.

7.5.1. Class Definition

7.5.1.1. Initializing the SceneSegmentationModule Class

Upon initialization the class loads a NameManager and an InferenceResults() message, then runs the setup functions and enters the main loop.

Dissect("__init__", SceneSegmentationModule)
class SceneSegmentationModule(object):

    def __init__(self):
        self.name = NameManager(abv="SEG")
        self.inference = InferenceResults()
        self.setup_ros()
        #self.setup_model()
        self.loop()

7.5.1.2. Setup Parameters

We initialize several parameters with the parameter server, including the rate at which we would like the node to run and whether the node may be updated dynamically. Dynamic updates allow us to change the node's rate while it is already running, in order to tune its performance alongside the other nodes.

Dissect("setup_parameters", SceneSegmentationModule)
class SceneSegmentationModule(object):

    def setup_parameters(self):
        self.parameters = ParameterManager()
        self.parameters.add(Parameter(
            name="rate", 
            target=f"{self.name.name}/rate", 
            default=50, 
            dynamic=True
            ))
        self.parameters.add(Parameter(
            name="updates", 
            target=f"{self.name.name}/dynamic", 
            default=True, 
            dynamic=True
            ))

7.5.1.2.1. Setting up ROS and the Model Server

We set up the module and register it with the ROS network. We also set up the model_server/modeling_service ServiceProxy, which we will use to request new models. The request message ModelRequest takes the name of a model ("SEG" is the name of the segmentation model) and returns an inference.

Dissect("setup_ros", SceneSegmentationModule)
Dissect("setup_model_server", SceneSegmentationModule)
class SceneSegmentationModule(object):

    def setup_ros(self):
        self.setup_parameters()
        rospy.init_node('scene_segmentation', log_level=rospy.DEBUG)
        self.setup_model_server()
        self.image_server = ImageServerClient(self.name.abv)

class SceneSegmentationModule(object):

    def setup_model_server(self):
        
        self.model_request = rospy.ServiceProxy(
            'model_server/modeling_service',
            ModelRequest,
            )

        self.pub = rospy.Publisher(
            'model_output/scene_segmentation/segmentation_mask',
            InferenceResults,
            queue_size=1
            )

7.5.1.3. Action Loop

When the SceneSegmentationModule is ready to perform inference, it contacts the model server and provides its modeling code "SEG". The model server then computes the inference and returns an inference message.

Inference messages contain two fields:

  • int32[] structure

  • float32[] inferences
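Given those two fields, the underlying message definition would look roughly like this (a sketch; the actual InferenceResults.msg in the package may differ in comments and ordering):

    # InferenceResults.msg (sketch)
    int32[]   structure     # original shape of the inference array
    float32[] inferences    # flattened inference values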

The Vision Processing Unit computes the segmentation of the image and produces an array of variable length indicating the class of each pixel. All ROS messages must be compatible between Python and C++, and NumPy ndarrays are not supported. In order to serialize these variable-length inference results, I decided to flatten the NumPy ndarrays into one-dimensional vectors.

The structure variable is a list of 32-bit integers which indicates the original shape of the arrays produced by the Vision Processing Unit. The inferences variable is a list of 32-bit floats holding the flattened values.

A subscribing node can restore the original arrays by reshaping the inference vector, using the structure vector as the original shape parameters.
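The flatten-and-reshape round trip can be sketched in plain NumPy (a toy mask stands in for the actual Vision Processing Unit output; the variable names mirror the message fields):

```python
import numpy as np

# Toy 4x6 class mask standing in for the VPU segmentation output.
mask = np.array([[i % 3 for i in range(6)] for _ in range(4)])

# Serialize: record the original shape, then flatten to a 1-D vector,
# mirroring the structure / inferences fields of the message.
structure = list(mask.shape)                           # -> int32[] structure
inferences = mask.astype(np.float32).ravel().tolist()  # -> float32[] inferences

# Deserialize on the subscriber side: reshape the flat vector
# using the structure vector as the shape parameters.
restored = np.asarray(inferences, dtype=np.float32).reshape(structure)

assert restored.shape == (4, 6)
assert np.array_equal(restored, mask.astype(np.float32))
```

The same reshaping applies regardless of the mask's dimensions, which is what makes the two-field message work for variable-length results.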

Dissect("action_loop", SceneSegmentationModule)
class SceneSegmentationModule(object):

    def action_loop(self):
        self.timer.lap("Wait for Service")
        rospy.wait_for_service(
            'model_server/modeling_service'
            )
        self.timer.time("Wait for Service")
        try:
            self.timer.lap("Modeling Server")
            rospy.logdebug(f"[{self.name.abv}] Service Requested.")
            req = String()
            req.data = "SEG" #self.name.abv
            response = self.model_request(req)
            self.timer.time("Modeling Server", name = "MOD")
            self.inference = response.results
            rospy.logdebug(
                f"[{self.name.abv}] Mask shape received: {self.inference.structure}"
                )
            return self.inference

        except rospy.ServiceException as e:
            rospy.logerr(f"[{self.name.abv}] Service call failed: {e}")

7.5.1.4. The Main Loop

The complete process is as follows:

  1. the Node checks for any updates to its parameters and begins logging its time.

  2. It then checks whether any new images are waiting for it at the image server.

  3. If there are new images, it makes a model request. Prior to making the request, however, it takes down its own "waiting for image" flag, since it cannot simultaneously make a request and use these images.

  4. After successfully receiving the inference results, it broadcasts them to its subscribers. It does not update its request flag during this time, since while it is performing this action it cannot use any new images.

  5. After publishing its inference results, it updates its flag to reflect that it is ready for a new image, which will be delivered by the time the loop repeats itself and it requests a new inference from the model server.

Dissect("loop", SceneSegmentationModule)
class SceneSegmentationModule(object):

    def loop(self):
        rate = rospy.Rate(self.parameters.rate)
        while not rospy.is_shutdown():
            if self.parameters.updates == True:
                self.parameters.update()
                
            # Setup timer. Note that this resets on every iteration. 
            # Future development we may want to move this so that we can 
            # compute averages.
            
            self.timer = ProcessTimer(abv = self.name.abv)
            
            # Check to see if the server has released a new image and 
            # if so, start new modeling request
            
            status = self.image_server.status("server")
            rospy.logdebug(
                f"[{self.name.abv}] Remote Image Server Status is: {status}"
                )
            if status == True: 
                # Takes down image request flag while performing 
                # inference since inference time is not negligable
                
                self.image_server.update(False)
                
                my_status = self.image_server.status("me")
                rospy.logdebug(
                    f"[{self.name.abv}] Waiting Status: {my_status}"
                    )
                # Create the image mask
                try:
                    mask = self.action_loop()
                    if mask is not None:
                        self.pub.publish(mask)
                        rospy.logdebug(f"[{self.name.abv}] Mask Published")
                except rospy.ROSException as e:
                    rospy.logerr(f"[{self.name.abv}] Publishing failed: {e}")
            # Update the image server to let it know we need a new image
            
            self.image_server.update(True)

            rospy.logdebug(
                "[SEG] Waiting Status: %s", 
                self.image_server.status("me")
                )
            self.timer.time("total_runtime")
            rate.sleep()

7.5.2. The Whole Thing in Action

lines = inspect.getsource(SceneSegmentationModule)
print(lines)
class SceneSegmentationModule(object):
    def __init__(self):
        self.name = NameManager(abv="SEG")
        self.inference = InferenceResults()
        self.setup_ros()
        #self.setup_model()
        self.loop()
    
    def setup_parameters(self):
        self.parameters = ParameterManager()
        self.parameters.add(Parameter(
            name="rate", 
            target=f"{self.name.name}/rate", 
            default=50, 
            dynamic=True
            ))
        self.parameters.add(Parameter(
            name="updates", 
            target=f"{self.name.name}/dynamic", 
            default=True, 
            dynamic=True
            ))

    def setup_ros(self):
        self.setup_parameters()
        rospy.init_node('scene_segmentation', log_level=rospy.DEBUG)
        self.setup_model_server()
        self.image_server = ImageServerClient(self.name.abv)

    def setup_model_server(self):
        
        self.model_request = rospy.ServiceProxy(
            'model_server/modeling_service',
            ModelRequest,
            )

        self.pub = rospy.Publisher(
            'model_output/scene_segmentation/segmentation_mask',
            InferenceResults,
            queue_size=1
            )

    def image_server_backup(self):
        self.waiting = Bool()
        self._image_server_status = Bool()
        # Sends out if it is waiting for images
        self.wait_status_pub = rospy.Publisher(
            'image_request/SEG',
            Bool,
            queue_size=1
            )
        rospy.Subscriber('image_server/status',
            Bool,
            self.callback,
            queue_size=1
            )

    def callback(self, msg):
        self._image_server_status = msg

    def action_loop(self):
        self.timer.lap("Wait for Service")
        rospy.wait_for_service(
            'model_server/modeling_service'
            )
        self.timer.time("Wait for Service")
        try:
            self.timer.lap("Modeling Server")
            rospy.logdebug(f"[{self.name.abv}] Service Requested.")
            req = String()
            req.data = "SEG" #self.name.abv
            response = self.model_request(req)
            self.timer.time("Modeling Server", name = "MOD")
            self.inference = response.results
            rospy.logdebug(
                f"[{self.name.abv}] Mask shape received: {self.inference.structure}"
                )
            return self.inference

        except rospy.ServiceException as e:
            rospy.logerr(f"[{self.name.abv}] Service call failed: {e}")
            

    def loop(self):
        rate = rospy.Rate(self.parameters.rate)
        while not rospy.is_shutdown():
            if self.parameters.updates == True:
                self.parameters.update()
                
            # Setup timer. Note that this resets on every iteration. 
            # Future development we may want to move this so that we can 
            # compute averages.
            
            self.timer = ProcessTimer(abv = self.name.abv)
            
            # Check to see if the server has released a new image and 
            # if so, start new modeling request
            
            status = self.image_server.status("server")
            rospy.logdebug(
                f"[{self.name.abv}] Remote Image Server Status is: {status}"
                )
            if status == True: 
                # Takes down image request flag while performing 
                # inference since inference time is not negligable
                
                self.image_server.update(False)
                
                my_status = self.image_server.status("me")
                rospy.logdebug(
                    f"[{self.name.abv}] Waiting Status: {my_status}"
                    )
                # Create the image mask
                try:
                    mask = self.action_loop()
                    if mask is not None:
                        self.pub.publish(mask)
                        rospy.logdebug(f"[{self.name.abv}] Mask Published")
                except rospy.ROSException as e:
                    rospy.logerr(f"[{self.name.abv}] Publishing failed: {e}")
            # Update the image server to let it know we need a new image
            
            self.image_server.update(True)

            rospy.logdebug(
                "[SEG] Waiting Status: %s", 
                self.image_server.status("me")
                )
            self.timer.time("total_runtime")
            rate.sleep()