# Copyright 2023 FZI Forschungszentrum Informatik## Redistribution and use in source and binary forms, with or without# modification, are permitted provided that the following conditions are met:## * Redistributions of source code must retain the above copyright# notice, this list of conditions and the following disclaimer.## * Redistributions in binary form must reproduce the above copyright# notice, this list of conditions and the following disclaimer in the# documentation and/or other materials provided with the distribution.## * Neither the name of the FZI Forschungszentrum Informatik nor the names of its# contributors may be used to endorse or promote products derived from# this software without specific prior written permission.## THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE# POSSIBILITY OF SUCH DAMAGE.importinspectimportrclpy.loggingfromrclpyimportactionfromrclpy.nodeimportNode,Publisherfromros_bt_py.exceptionsimportBehaviorTreeExceptionfromros_bt_py_interfaces.msgimportMessageChannel,MessageChannels
[docs]classLoggerLevel(object):"""Data class containing a logging level."""def__init__(self,logger_level=rclpy.logging.LoggingSeverity.INFO):"""Initialize a new LoggerLevel class."""self.logger_level=logger_level
[docs]classEnumValue(object):"""Data class containing an enum value."""def__init__(self,enum_value=""):"""Initialize a new EnumValue class."""self.enum_value=enum_value
[docs]defget_interface_name(msg_metaclass:type)->str:""" Extract the interface name from a ROS2 message metaclass. :param msg_metaclass: The ROS2 message metaclass. :returns: The interface name in the format 'package_name/message_type/message_name'. """module_parts=msg_metaclass.__module__.split(".")package_name=module_parts[0]message_type=module_parts[-2]# Extract 'msg', 'srv', or 'action'message_name=msg_metaclass.__name__returnf"{package_name}/{message_type}/{message_name}"
[docs]defget_message_constant_fields(message_class):"""Return all constant fields of a message as a list."""ifinspect.isclass(message_class):# This is highly dependend on the ROS message class generation.members=[attrforattrindir(message_class.__class__)ifnotattr.startswith("_")andnotcallable(getattr(message_class.__class__,attr))]returnmemberselse:raiseBehaviorTreeException(f"{message_class} is not a ROS Message")
[docs]defpublish_message_channels(node:Node,publisher:Publisher):"""Return all known topic-, service-, and action-names."""msg=MessageChannels()# Types are returned as 1?-element lists, so we need to unpack themforname,[interface,*_]innode.get_topic_names_and_types():msg.topics.append(MessageChannel(name=name,type=interface))forname,[interface,*_]innode.get_service_names_and_types():msg.services.append(MessageChannel(name=name,type=interface))forname,[interface,*_]inaction.get_action_names_and_types(node):msg.actions.append(MessageChannel(name=name,type=interface))publisher.publish(msg)