ROS 2 Python Custom OmniGraph Node

Warning

The content of this tutorial is currently broken due to the removal of the Node Description Editor dependency in Omniverse Kit. An alternative solution will be proposed in future releases.

Learning Objectives

This is an optional, advanced tutorial where you will learn how to

  • Use ROS 2 rclpy Python interface with Isaac Sim

  • Create a simple custom OmniGraph Python node which can subscribe to multiple topics and perform computations on the subscribed data and generate an output.

Getting Started

Important

Make sure to source your ROS 2 installation from the terminal before running Isaac Sim. If sourcing ROS 2 is a part of your bashrc then Isaac Sim can be run directly.

Prerequisite

Creating the ROS 2 Custom OmniGraph Python Node Template

  1. Go to Window > Visual Scripting > Node Description Editor to open the OmniGraph node editor window.

  2. This tutorial is assuming the Extension Name is omni.new.extension. Fill in the node properties as the following:

    Node Properties
  3. Create and fill in the input attributes as the following. Please note the default String inputs require quotes.

    Input Attributes
  4. Create and fill in the output attributes as the following. Please note the default String inputs require quotes.

    Output Attributes
  5. Follow all of the Custom Node Tutorial steps to generate and edit the node template.

  6. This extension will be dependant on the ROS 2 Bridge. In order to add this dependency, open a file browser to the root of the extension folder. Then open the file at config/extension.toml. Look for [dependencies] section and add the following line under it. Save and close.

    "omni.isaac.ros2_bridge" = {}
    
  7. Back in the OmniGraph node editor window, click Edit Node and wait for a text editor to open. Proceed to the next section.

Populating the Custom ROS2 OmniGraph Node

  1. First add the relevant import statements to the node template.

    import omni
    
    # OgnROS2CustomPythonNodeDatabase module is an autogenerated python module located in the extension and is used later on.
    from omni.new.extension.ogn.OgnROS2CustomPythonNodeDatabase import OgnROS2CustomPythonNodeDatabase
    
    import rclpy
    from std_msgs.msg import Int32
    
    # BaseResetNode class is used for resetting the node when stopping and playing
    from omni.isaac.core_nodes import BaseResetNode
    
  2. Implement a class to store the internal state of the node as shown below. This class inherits from BaseResetNode which automatically registers the custom_reset function to a stage event such that any time simulation is stopped, custom_reset will be called.

    OgnROS2CustomPythonNodeInternalState will be accessed within the compute function and release function later on.

    class OgnROS2CustomPythonNodeInternalState(BaseResetNode):
    
    def __init__(self):
    
        self.node = None
        self.numA = None
        self.numB = None
        self.subscriptionA = None
        self.subscriptionB = None
    
        super().__init__(initialize=False)
    
    
    def listener_callbackA(self, msg):
        self.numA = msg.data
    
    def listener_callbackB(self, msg):
        self.numB = msg.data
    
    def initialize_ros2_node(self, node_name):
        try:
            rclpy.init()
        except:
            pass
        self.node = rclpy.create_node(node_name)
        self.initialized = True
    
    def create_subscriberA(self, topicName):
        self.topicNameA = topicName
        self.subscriptionA = self.node.create_subscription(
            Int32,
            self.topicNameA,
            self.listener_callbackA,
            10)
    
    def create_subscriberB(self, topicName):
        self.topicNameB = topicName
        self.subscriptionB = self.node.create_subscription(
            Int32,
            self.topicNameB,
            self.listener_callbackB,
            10)
    
    # Overriding a function from BaseResetNode.
    # This is automatically called when simulation is stopped.
    # This is will also be called when the OmniGraph node is released.
    def custom_reset(self):
        if self.node:
            self.node.destroy_subscription(self.subscriptionA)
            self.node.destroy_subscription(self.subscriptionB)
            self.node.destroy_node()
    
        self.numA = None
        self.numB = None
        self.subscriptionA = None
        self.subscriptionB = None
        self.node = None
    
        self.initialized = False
    
        rclpy.try_shutdown()
    
  3. Within the OgnROS2CustomPythonNode class add in a static method for internal_state as shown below:

    @staticmethod
    def internal_state():
        return OgnROS2CustomPythonNodeInternalState()
    
  4. Next setup the compute function as follows:

    @staticmethod
    def compute(db) -> bool:
        """Compute the outputs from the current input"""
    
        state = db.per_instance_state
        try:
            if not state.initialized:
    
                state.initialize_ros2_node('custom_python_adder')
    
                if (state.subscriptionA == None):
                    state.create_subscriberA(db.inputs.topicInputA)
    
                if (state.subscriptionB == None):
                    state.create_subscriberB(db.inputs.topicInputB)
    
            rclpy.spin_once(state.node, timeout_sec=0.01)
    
            if state.numA != None and state.numB != None:
                db.outputs.sum = state.numA + state.numB
                db.outputs.execOut = omni.graph.core.ExecutionAttributeState.ENABLED
    
                # Set the numA and numB to None to ensure we only calculate and trigger execOut when new messages are received.
                state.numA = None
                state.numB = None
    
        except Exception as error:
            # If anything causes your compute to fail report the error and return False
            db.log_error(str(error))
            return False
    
        # Even if inputs were edge cases like empty arrays, correct outputs mean success
        return True
    
  5. Finally setup the release function as shown below. This function is called when the OmniGraph node is deleted or the stage is cleared.

    @staticmethod
    def release(node):
        try:
    
            state = OgnROS2CustomPythonNodeDatabase.per_instance_internal_state(node)
    
        except Exception:
            state = None
    
        if state is not None:
            state.custom_reset()
    
Click to expand and see the full Ogn and Python files…
{
    "ROS2CustomPythonNode": {
        "version": 1,
        "description": "A custom ros2 python node that adds number from 2 separate topics and outputs the sum as a OmniGraph Int",
        "language": "Python",
        "metadata": {
            "uiName": "ROS2 Custom Python Node"
        },
        "inputs": {
            "execIn": {
                "type": "execution",
                "description": "Execution Input",
                "default": 0,
                "metadata": {
                    "uiName": "Exec In"
                }
            },
            "topicInputA": {
                "type": "string",
                "description": "Topic Input A",
                "metadata": {
                    "uiName": "Topic Input A"
                },
                "default": "/numberA"
            },
            "topicInputB": {
                "type": "string",
                "description": "Topic Input B",
                "metadata": {
                    "uiName": "Topic Input B"
                },
                "default": "/numberB"
            }
        },
        "outputs": {
            "sum": {
                "type": "int",
                "description": "sum",
                "metadata": {
                    "uiName": "Sum"
                }
            },
            "execOut": {
                "type": "execution",
                "description": "Execution Output",
                "metadata": {
                    "uiName": "Exec Out"
                },
                "default": 0
            }
        }
    }
}
# Copyright (c) 2020-2024, NVIDIA CORPORATION. All rights reserved.
#
# NVIDIA CORPORATION and its licensors retain all intellectual property
# and proprietary rights in and to this software, related documentation
# and any modifications thereto. Any use, reproduction, disclosure or
# distribution of this software and related documentation without an express
# license agreement from NVIDIA CORPORATION is strictly prohibited.
#

"""
This is the implementation of the OGN node defined in OgnROS2CustomPythonNode.ogn
"""

# Array or tuple values are accessed as numpy arrays so you probably need this import
import numpy


import omni

# OgnROS2CustomPythonNodeDatabase module is an autogenerated python module located in the extension and is used later on.
from omni.new.extension.ogn.OgnROS2CustomPythonNodeDatabase import OgnROS2CustomPythonNodeDatabase

import rclpy
from std_msgs.msg import Int32

# BaseResetNode class is used for resetting the node when stopping and playing
from omni.isaac.core_nodes import BaseResetNode

class OgnROS2CustomPythonNodeInternalState(BaseResetNode):

    def __init__(self):

        self.node = None
        self.numA = None
        self.numB = None
        self.subscriptionA = None
        self.subscriptionB = None

        super().__init__(initialize=False)


    def listener_callbackA(self, msg):
        self.numA = msg.data

    def listener_callbackB(self, msg):
        self.numB = msg.data

    def initialize_ros2_node(self, node_name):
        try:
            rclpy.init()
        except:
            pass
        self.node = rclpy.create_node(node_name)
        self.initialized = True

    def create_subscriberA(self, topicName):
        self.topicNameA = topicName
        self.subscriptionA = self.node.create_subscription(
            Int32,
            self.topicNameA,
            self.listener_callbackA,
            10)

    def create_subscriberB(self, topicName):
        self.topicNameB = topicName
        self.subscriptionB = self.node.create_subscription(
            Int32,
            self.topicNameB,
            self.listener_callbackB,
            10)

    # Overriding a function from BaseResetNode.
    # This is automatically called when simulation is stopped.
    # This is will also be called when the OmniGraph node is released.
    def custom_reset(self):
        if self.node:
            self.node.destroy_subscription(self.subscriptionA)
            self.node.destroy_subscription(self.subscriptionB)
            self.node.destroy_node()

        self.numA = None
        self.numB = None
        self.subscriptionA = None
        self.subscriptionB = None
        self.node = None

        self.initialized = False

        rclpy.try_shutdown()

class OgnROS2CustomPythonNode:
    """
        A custom ros2 python node that adds number from 2 separate topics and outputs the
    sum as a OmniGraph Int
    """

    @staticmethod
    def internal_state():
        return OgnROS2CustomPythonNodeInternalState()

    @staticmethod
    def compute(db) -> bool:
        """Compute the outputs from the current input"""

        state = db.per_instance_state
        try:
            if not state.initialized:

                state.initialize_ros2_node('custom_python_adder')

                if (state.subscriptionA == None):
                    state.create_subscriberA(db.inputs.topicInputA)

                if (state.subscriptionB == None):
                    state.create_subscriberB(db.inputs.topicInputB)

            rclpy.spin_once(state.node, timeout_sec=0.01)

            if state.numA != None and state.numB != None:
                db.outputs.sum = state.numA + state.numB
                db.outputs.execOut = omni.graph.core.ExecutionAttributeState.ENABLED

                # Set the numA and numB to None to ensure we only calculate and trigger execOut when new messages are received.
                state.numA = None
                state.numB = None

        except Exception as error:
            # If anything causes your compute to fail report the error and return False
            db.log_error(str(error))
            return False

        # Even if inputs were edge cases like empty arrays, correct outputs mean success
        return True

    @staticmethod
    def release(node):
        try:

            state = OgnROS2CustomPythonNodeDatabase.per_instance_internal_state(node)

        except Exception:
            state = None

        if state is not None:
            state.custom_reset()

Running the Custom OmniGraph Node

  1. Open new stage and go to Create -> Visual Scripting -> Action Graph to create an Action graph.

  2. Add the following OmniGraph nodes into the Action graph and connect them as shown.

  • On Playback Tick node to execute other graph nodes every simulation frame.

  • ROS2 Custom Python Node that we just created above.

  • To String node to convert the output of our custom node to a string.

  • Print Text node to display the output of our custom node to screen or terminal.

    ROS2 Custom Node in a graph
  1. Select the Print Text node and set the toScreen attribute to True.

  2. In a new ROS2-sourced terminal, run the following command to publish a number to the /numberA topic.

    ros2 topic pub /numberA std_msgs/msg/Int32 "data: 42"
    
  3. In another ROS2-sourced terminal, run the following command to publish a number to the /numberB topic.

    ros2 topic pub /numberB std_msgs/msg/Int32 "data: 240"
    
  4. In Isaac Sim, click on Play to start simulation. Once messages are being received from both topics, the sum of the two numbers will appear in the top left corner of the viewport.

    Results display

    Whenever any of the topics are disconnected, the outputs won’t be triggered and the sum display will fade away.

    Note

    To view the values in the Isaac Sim console, you can select the Print Text node, set the toScreen attribute to False and set the logLevel attribute to Warning.

  5. Change any of the publish commands from earlier with different numbers and notice the sum change.

Summary

This tutorial covered the following topics:

  1. Creating a custom OmniGraph Python node and extension

  2. Using rclpy interface to create a ROS 2 node within a custom OmniGraph node capable of subscribing to multiple topics, performing computations and trigger downstream nodes when the computed OmniGraph output is ready.

Next Steps

Continue on to the next tutorial in our ROS2 Tutorials series, ROS 2 Navigation with Block World Generator.