ROS 2 Python Custom OmniGraph Node
Warning
The content of this tutorial is currently broken due to the removal of the Node Description Editor dependency in Omniverse Kit. An alternative solution will be proposed in future releases.
Learning Objectives
This is an optional, advanced tutorial where you will learn how to
Use ROS 2 rclpy Python interface with Isaac Sim
Create a simple custom OmniGraph Python node which can subscribe to multiple topics and perform computations on the subscribed data and generate an output.
Getting Started
Important
Make sure to source your ROS 2 installation from the terminal before running Isaac Sim. If sourcing ROS 2 is a part of your bashrc
then Isaac Sim can be run directly.
Prerequisite
Completed ROS and ROS 2 Installation: installed ROS2, enabled the ROS2 extension, built the provided Isaac Sim ROS 2 workspace, and set up the necessary environment variables.
Completed the tutorial for writing custom Python nodes: Omnigraph: Custom Python Nodes
Creating the ROS 2 Custom OmniGraph Python Node Template
Go to Window > Visual Scripting > Node Description Editor to open the OmniGraph node editor window.
This tutorial is assuming the Extension Name is omni.new.extension. Fill in the node properties as the following:
Create and fill in the input attributes as the following. Please note the default String inputs require quotes.
Create and fill in the output attributes as the following. Please note the default String inputs require quotes.
Follow all of the Custom Node Tutorial steps to generate and edit the node template.
This extension will be dependant on the ROS 2 Bridge. In order to add this dependency, open a file browser to the root of the extension folder. Then open the file at
config/extension.toml
. Look for[dependencies]
section and add the following line under it. Save and close."omni.isaac.ros2_bridge" = {}
Back in the OmniGraph node editor window, click Edit Node and wait for a text editor to open. Proceed to the next section.
Populating the Custom ROS2 OmniGraph Node
First add the relevant import statements to the node template.
import omni # OgnROS2CustomPythonNodeDatabase module is an autogenerated python module located in the extension and is used later on. from omni.new.extension.ogn.OgnROS2CustomPythonNodeDatabase import OgnROS2CustomPythonNodeDatabase import rclpy from std_msgs.msg import Int32 # BaseResetNode class is used for resetting the node when stopping and playing from omni.isaac.core_nodes import BaseResetNode
Implement a class to store the internal state of the node as shown below. This class inherits from
BaseResetNode
which automatically registers thecustom_reset
function to a stage event such that any time simulation is stopped,custom_reset
will be called.OgnROS2CustomPythonNodeInternalState
will be accessed within the compute function and release function later on.class OgnROS2CustomPythonNodeInternalState(BaseResetNode): def __init__(self): self.node = None self.numA = None self.numB = None self.subscriptionA = None self.subscriptionB = None super().__init__(initialize=False) def listener_callbackA(self, msg): self.numA = msg.data def listener_callbackB(self, msg): self.numB = msg.data def initialize_ros2_node(self, node_name): try: rclpy.init() except: pass self.node = rclpy.create_node(node_name) self.initialized = True def create_subscriberA(self, topicName): self.topicNameA = topicName self.subscriptionA = self.node.create_subscription( Int32, self.topicNameA, self.listener_callbackA, 10) def create_subscriberB(self, topicName): self.topicNameB = topicName self.subscriptionB = self.node.create_subscription( Int32, self.topicNameB, self.listener_callbackB, 10) # Overriding a function from BaseResetNode. # This is automatically called when simulation is stopped. # This is will also be called when the OmniGraph node is released. def custom_reset(self): if self.node: self.node.destroy_subscription(self.subscriptionA) self.node.destroy_subscription(self.subscriptionB) self.node.destroy_node() self.numA = None self.numB = None self.subscriptionA = None self.subscriptionB = None self.node = None self.initialized = False rclpy.try_shutdown()
Within the
OgnROS2CustomPythonNode
class add in a static method for internal_state as shown below:@staticmethod def internal_state(): return OgnROS2CustomPythonNodeInternalState()
Next setup the compute function as follows:
@staticmethod def compute(db) -> bool: """Compute the outputs from the current input""" state = db.per_instance_state try: if not state.initialized: state.initialize_ros2_node('custom_python_adder') if (state.subscriptionA == None): state.create_subscriberA(db.inputs.topicInputA) if (state.subscriptionB == None): state.create_subscriberB(db.inputs.topicInputB) rclpy.spin_once(state.node, timeout_sec=0.01) if state.numA != None and state.numB != None: db.outputs.sum = state.numA + state.numB db.outputs.execOut = omni.graph.core.ExecutionAttributeState.ENABLED # Set the numA and numB to None to ensure we only calculate and trigger execOut when new messages are received. state.numA = None state.numB = None except Exception as error: # If anything causes your compute to fail report the error and return False db.log_error(str(error)) return False # Even if inputs were edge cases like empty arrays, correct outputs mean success return True
Finally setup the release function as shown below. This function is called when the OmniGraph node is deleted or the stage is cleared.
@staticmethod def release(node): try: state = OgnROS2CustomPythonNodeDatabase.per_instance_internal_state(node) except Exception: state = None if state is not None: state.custom_reset()
Click to expand and see the full Ogn and Python files…
{
"ROS2CustomPythonNode": {
"version": 1,
"description": "A custom ros2 python node that adds number from 2 separate topics and outputs the sum as a OmniGraph Int",
"language": "Python",
"metadata": {
"uiName": "ROS2 Custom Python Node"
},
"inputs": {
"execIn": {
"type": "execution",
"description": "Execution Input",
"default": 0,
"metadata": {
"uiName": "Exec In"
}
},
"topicInputA": {
"type": "string",
"description": "Topic Input A",
"metadata": {
"uiName": "Topic Input A"
},
"default": "/numberA"
},
"topicInputB": {
"type": "string",
"description": "Topic Input B",
"metadata": {
"uiName": "Topic Input B"
},
"default": "/numberB"
}
},
"outputs": {
"sum": {
"type": "int",
"description": "sum",
"metadata": {
"uiName": "Sum"
}
},
"execOut": {
"type": "execution",
"description": "Execution Output",
"metadata": {
"uiName": "Exec Out"
},
"default": 0
}
}
}
}
# Copyright (c) 2020-2024, NVIDIA CORPORATION. All rights reserved.
#
# NVIDIA CORPORATION and its licensors retain all intellectual property
# and proprietary rights in and to this software, related documentation
# and any modifications thereto. Any use, reproduction, disclosure or
# distribution of this software and related documentation without an express
# license agreement from NVIDIA CORPORATION is strictly prohibited.
#
"""
This is the implementation of the OGN node defined in OgnROS2CustomPythonNode.ogn
"""
# Array or tuple values are accessed as numpy arrays so you probably need this import
import numpy
import omni
# OgnROS2CustomPythonNodeDatabase module is an autogenerated python module located in the extension and is used later on.
from omni.new.extension.ogn.OgnROS2CustomPythonNodeDatabase import OgnROS2CustomPythonNodeDatabase
import rclpy
from std_msgs.msg import Int32
# BaseResetNode class is used for resetting the node when stopping and playing
from omni.isaac.core_nodes import BaseResetNode
class OgnROS2CustomPythonNodeInternalState(BaseResetNode):
def __init__(self):
self.node = None
self.numA = None
self.numB = None
self.subscriptionA = None
self.subscriptionB = None
super().__init__(initialize=False)
def listener_callbackA(self, msg):
self.numA = msg.data
def listener_callbackB(self, msg):
self.numB = msg.data
def initialize_ros2_node(self, node_name):
try:
rclpy.init()
except:
pass
self.node = rclpy.create_node(node_name)
self.initialized = True
def create_subscriberA(self, topicName):
self.topicNameA = topicName
self.subscriptionA = self.node.create_subscription(
Int32,
self.topicNameA,
self.listener_callbackA,
10)
def create_subscriberB(self, topicName):
self.topicNameB = topicName
self.subscriptionB = self.node.create_subscription(
Int32,
self.topicNameB,
self.listener_callbackB,
10)
# Overriding a function from BaseResetNode.
# This is automatically called when simulation is stopped.
# This is will also be called when the OmniGraph node is released.
def custom_reset(self):
if self.node:
self.node.destroy_subscription(self.subscriptionA)
self.node.destroy_subscription(self.subscriptionB)
self.node.destroy_node()
self.numA = None
self.numB = None
self.subscriptionA = None
self.subscriptionB = None
self.node = None
self.initialized = False
rclpy.try_shutdown()
class OgnROS2CustomPythonNode:
"""
A custom ros2 python node that adds number from 2 separate topics and outputs the
sum as a OmniGraph Int
"""
@staticmethod
def internal_state():
return OgnROS2CustomPythonNodeInternalState()
@staticmethod
def compute(db) -> bool:
"""Compute the outputs from the current input"""
state = db.per_instance_state
try:
if not state.initialized:
state.initialize_ros2_node('custom_python_adder')
if (state.subscriptionA == None):
state.create_subscriberA(db.inputs.topicInputA)
if (state.subscriptionB == None):
state.create_subscriberB(db.inputs.topicInputB)
rclpy.spin_once(state.node, timeout_sec=0.01)
if state.numA != None and state.numB != None:
db.outputs.sum = state.numA + state.numB
db.outputs.execOut = omni.graph.core.ExecutionAttributeState.ENABLED
# Set the numA and numB to None to ensure we only calculate and trigger execOut when new messages are received.
state.numA = None
state.numB = None
except Exception as error:
# If anything causes your compute to fail report the error and return False
db.log_error(str(error))
return False
# Even if inputs were edge cases like empty arrays, correct outputs mean success
return True
@staticmethod
def release(node):
try:
state = OgnROS2CustomPythonNodeDatabase.per_instance_internal_state(node)
except Exception:
state = None
if state is not None:
state.custom_reset()
Running the Custom OmniGraph Node
Open new stage and go to Create -> Visual Scripting -> Action Graph to create an Action graph.
Add the following OmniGraph nodes into the Action graph and connect them as shown.
On Playback Tick node to execute other graph nodes every simulation frame.
ROS2 Custom Python Node that we just created above.
To String node to convert the output of our custom node to a string.
Print Text node to display the output of our custom node to screen or terminal.
Select the Print Text node and set the toScreen attribute to True.
In a new ROS2-sourced terminal, run the following command to publish a number to the
/numberA
topic.ros2 topic pub /numberA std_msgs/msg/Int32 "data: 42"
In another ROS2-sourced terminal, run the following command to publish a number to the
/numberB
topic.ros2 topic pub /numberB std_msgs/msg/Int32 "data: 240"
In Isaac Sim, click on Play to start simulation. Once messages are being received from both topics, the sum of the two numbers will appear in the top left corner of the viewport.
Whenever any of the topics are disconnected, the outputs won’t be triggered and the sum display will fade away.
Note
To view the values in the Isaac Sim console, you can select the Print Text node, set the toScreen attribute to False and set the logLevel attribute to Warning.
Change any of the publish commands from earlier with different numbers and notice the sum change.
Summary
This tutorial covered the following topics:
Creating a custom OmniGraph Python node and extension
Using rclpy interface to create a ROS 2 node within a custom OmniGraph node capable of subscribing to multiple topics, performing computations and trigger downstream nodes when the computed OmniGraph output is ready.
Next Steps
Continue on to the next tutorial in our ROS2 Tutorials series, ROS 2 Navigation with Block World Generator.