Events
Event streams provide a mechanism to move data around and synchronize logic in a thread-safe manner. Event streams are covered by the carb.events
plugin for Python.
Go here for a detailed description of the Events System and its API.
Subscribe to Shutdown Events
# App/Subscribe to Shutdown Events
import carb.events
import omni.kit.app
# Stream where the app sends shutdown events
shutdown_stream = omni.kit.app.get_app().get_shutdown_event_stream()


def on_event(e: carb.events.IEvent) -> None:
    """Print a notice when a post-quit event arrives on the shutdown stream.

    Args:
        e: Event delivered by the shutdown stream; only its ``type`` is read.
    """
    # Events of any other type are ignored.
    if e.type == omni.kit.app.POST_QUIT_EVENT_TYPE:
        print("We are about to shutdown")


# ``name`` labels the subscriber for debugging.
sub = shutdown_stream.create_subscription_to_pop(
    on_event, name="name of the subscriber for debugging", order=0
)
Subscribe to Update Events
# App/Subscribe to Update Events
import carb.events
import omni.kit.app
update_stream = omni.kit.app.get_app().get_update_event_stream()


def on_update(e: carb.events.IEvent) -> None:
    """Print the ``dt`` entry of an update event's payload.

    Args:
        e: Event delivered by the update stream; its payload is indexed
            with the key ``"dt"``.
    """
    print(f"Update: {e.payload['dt']}")


sub = update_stream.create_subscription_to_pop(on_update, name="My Subscription Name")
Create a Custom Event
# App/Create Custom Event
import carb.events
import omni.kit.app
# An event type is a unique integer id. Create it from a string by hashing,
# using the helper function below.
# "[ext name].[event name]" is the recommended naming convention:
MY_CUSTOM_EVENT = carb.events.type_from_string("omni.my.extension.MY_CUSTOM_EVENT")

# The app provides a common event bus. It is an event queue that is popped
# every update (frame).
bus = omni.kit.app.get_app().get_message_bus_event_stream()


def on_event(e: carb.events.IEvent) -> None:
    """Print the event type, whether it is MY_CUSTOM_EVENT, and its payload.

    Args:
        e: Event delivered by the message bus.
    """
    print(e.type, e.type == MY_CUSTOM_EVENT, e.payload)


# Subscribe to the bus. Keep the subscription objects (sub1, sub2) alive for
# the subscriptions to work.
# A push subscription is called immediately when the event is pushed.
sub1 = bus.create_subscription_to_push_by_type(MY_CUSTOM_EVENT, on_event)
# A pop subscription is called on the next update.
sub2 = bus.create_subscription_to_pop_by_type(MY_CUSTOM_EVENT, on_event)

# Push the event to the bus with a custom payload.
bus.push(MY_CUSTOM_EVENT, payload={"data": 2, "x": "y"})