内容简介:ROS最重要的特性之一就是多语言的支持,可以使用C++、Python等语言进行程序的开发,ROS2会继续强化这个特性,对更多语言提供丰富的支持。这里我们尝试ROS2当中的Python编程,实现话题和服务通信。在进行操作前,请先参考以下链接,完成ROS 2的安装和例程编译:
~欢迎关注~
微信公众号:古月居(guyue_home)
新浪微博:古月春旭(
)
知乎专栏:古月居(
)
原文链接:
ROS最重要的特性之一就是多语言的支持,可以使用C++、Python等语言进行程序的开发,ROS2会继续强化这个特性,对更多语言提供丰富的支持。
这里我们尝试ROS2当中的Python编程,实现话题和服务通信。
在进行操作前,请先参考以下链接,完成ROS 2的安装和例程编译:
ROS 2发布第三个正式版——Crystal Clemmys
一、话题通信
代码编译后,在终端设置环境变量,并使用如下命令运行listener和talker:
-
$ ros2 run demo_nodes_py listener
-
$ ros2 run demo_nodes_py talker
运行成功后,可以看到如下效果:
以上talker和listener的具体实现,我们可以来分析下代码,分析内容请见代码中的注释:
- talker.py
-
# 引用python接口
-
import rclpy
-
from rclpy.node import Node
-
from std_msgs.msg import String
-
class Talker(Node):
-
def __init__(self):
-
super().__init__('talker')
-
self.i = 0
-
# 创建一个Publisher,发布名为chatter的topic,消息类型为String
-
self.pub = self.create_publisher(String, 'chatter')
-
# 设置定时器,周期调用定时器回调函数timer_callback
-
timer_period = 1.0
-
self.tmr = self.create_timer(timer_period, self.timer_callback)
-
def timer_callback(self):
-
# 创建String类型的消息
-
msg = String()
-
msg.data = 'Hello World: {0}'.format(self.i)
-
self.i += 1
-
self.get_logger().info('Publishing: "{0}"'.format(msg.data))
-
# 发布消息
-
self.pub.publish(msg)
-
def main(args=None):
-
# ROS节点初始化
-
rclpy.init(args=args)
-
# 创建节点及发布器
-
node = Talker()
-
# 循环
-
rclpy.spin(node)
-
# 销毁节点,退出程序
-
node.destroy_node()
-
rclpy.shutdown()
-
if __name__ == '__main__':
-
main()
- listener.py
-
# 引用python接口
-
import rclpy
-
from rclpy.node import Node
-
from std_msgs.msg import String
-
class Listener(Node):
-
def __init__(self):
-
super().__init__('listener')
-
# 创建一个subscriber,订阅名为chatter的topic
-
# 消息类型是String,回调函数为chatter_callback
-
self.sub = self.create_subscription(String, 'chatter', self.chatter_callback)
-
def chatter_callback(self, msg):
-
# 将收到的消息数据打印出来
-
self.get_logger().info('I heard: [%s]' % msg.data)
-
def main(args=None):
-
# ROS节点初始化
-
rclpy.init(args=args)
-
# 创建节点及订阅器
-
node = Listener()
-
# 循环查询消息队列,收到消息后进入回调函数
-
rclpy.spin(node)
-
# 销毁节点,退出程序
-
node.destroy_node()
-
rclpy.shutdown()
-
if __name__ == '__main__':
-
main()
二、服务通信
接下来我们看下ROS另外一种核心通信机制——服务的实现,运行以下命令实现ROS 1教程当中经典的加法求和服务例程:
-
$ ros2 run demo_nodes_py add_two_ints_server
-
$ ros2 run demo_nodes_py add_two_ints_client
可以看到如下效果:
add_two_ints_server和add_two_ints_client的具体实现,继续来分析代码:
- add_two_ints_server.py
-
# 引用python接口
-
from example_interfaces.srv import AddTwoInts
-
import rclpy
-
from rclpy.node import Node
-
class AddTwoIntsServer(Node):
-
def __init__(self):
-
super().__init__('add_two_ints_server')
-
# 创建一个service服务端,提供名为AddTwoInts的service,
-
# 消息类型为AddTwoInts,回调函数为add_two_ints_callback
-
self.srv = self.create_service(AddTwoInts, 'add_two_ints', self.add_two_ints_callback)
-
def add_two_ints_callback(self, request, response):
-
# 处理服务功能,将求和结果放入应答数据中
-
response.sum = request.a + request.b
-
self.get_logger().info('Incoming request\na: %d b: %d' % (request.a, request.b))
-
return response
-
def main(args=None):
-
# ROS节点初始化
-
rclpy.init(args=args)
-
# 创建服务端
-
node = AddTwoIntsServer()
-
# 循环
-
rclpy.spin(node)
-
# 销毁节点,退出程序
-
node.destroy_node()
-
rclpy.shutdown()
-
if __name__ == '__main__':
-
main()
- add_two_ints_client.py
-
# 引用python接口
-
from example_interfaces.srv import AddTwoInts
-
import rclpy
-
def main(args=None):
-
# ROS节点初始化
-
rclpy.init(args=args)
-
# 创建节点
-
node = rclpy.create_node('add_two_ints_client')
-
# 创建客户端
-
cli = node.create_client(AddTwoInts, 'add_two_ints')
-
# 等待服务端
-
while not cli.wait_for_service(timeout_sec=1.0):
-
print('service not available, waiting again...')
-
# 创建请求数据
-
req = AddTwoInts.Request()
-
req.a = 2
-
req.b = 3
-
# 异步请求(非阻塞)
-
future = cli.call_async(req)
-
# 等待服务端应答
-
rclpy.spin_until_future_complete(node, future)
-
if future.result() is not None:
-
node.get_logger().info('Result of add_two_ints: %d' % future.result().sum)
-
else:
-
node.get_logger().error('Exception while calling service: %r' % future.exception())
-
# 销毁节点,退出程序
-
node.destroy_node()
-
rclpy.shutdown()
-
if __name__ == '__main__':
-
main()
三、QoS配置
ROS 2中的通信中间件改用DDS,而QoS是DDS中非常重要的一环,控制了各方面与底层的通讯机制,主要从时间限制、可靠性、持续性、历史记录几个方面,满足用户针对不同场景的数据应用需求。
以话题通信为例,我们看下Python程序中该如何配置QoS:
- talker_qos.py
-
import argparse
-
import sys
-
# 引用python接口
-
import rclpy
-
from rclpy.node import Node
-
from rclpy.qos import qos_profile_default, qos_profile_sensor_data
-
from rclpy.qos import QoSReliabilityPolicy
-
from std_msgs.msg import String
-
class TalkerQos(Node):
-
def __init__(self, qos_profile):
-
super().__init__('talker_qos')
-
self.i = 0
-
# 传输模式:Reliable、Best effort
-
if qos_profile.reliability is QoSReliabilityPolicy.RMW_QOS_POLICY_RELIABILITY_RELIABLE:
-
self.get_logger().info('Reliable talker')
-
else:
-
self.get_logger().info('Best effort talker')
-
# 创建一个Publisher,发布名为chatter的topic,消息类型为String,设置qos参数
-
self.pub = self.create_publisher(String, 'chatter', qos_profile=qos_profile)
-
# 设置定时器,周期调用定时器回调函数timer_callback
-
timer_period = 1.0
-
self.tmr = self.create_timer(timer_period, self.timer_callback)
-
def timer_callback(self):
-
# 创建String类型的消息
-
msg = String()
-
msg.data = 'Hello World: {0}'.format(self.i)
-
self.i += 1
-
self.get_logger().info('Publishing: "{0}"'.format(msg.data))
-
# 发布消息
-
self.pub.publish(msg)
-
def main(argv=sys.argv[1:]):
-
# 解析输入参数
-
parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)
-
# 是否使用relibale可靠传输模式
-
parser.add_argument(
-
'--reliable', dest='reliable', action='store_true',
-
help='set qos profile to reliable')
-
parser.set_defaults(reliable=False)
-
# 循环发布次数
-
parser.add_argument(
-
'-n', '--number_of_cycles', type=int, default=20,
-
help='number of sending attempts')
-
# 其他输入参数
-
parser.add_argument(
-
'argv', nargs=argparse.REMAINDER,
-
help='Pass arbitrary arguments to the executable')
-
args = parser.parse_args(argv)
-
# ROS节点初始化
-
rclpy.init(args=args.argv)
-
# 设置qos参数
-
if args.reliable:
-
custom_qos_profile = qos_profile_default
-
else:
-
custom_qos_profile = qos_profile_sensor_data
-
# 创建节点及发布器
-
node = TalkerQos(custom_qos_profile)
-
# 循环
-
cycle_count = 0
-
while rclpy.ok() and cycle_count < args.number_of_cycles:
-
rclpy.spin_once(node)
-
cycle_count += 1
-
# 销毁节点并退出
-
node.destroy_node()
-
rclpy.shutdown()
-
if __name__ == '__main__':
-
main()
- listener_qos.py
-
import argparse
-
import sys
-
# 引用python接口
-
import rclpy
-
from rclpy.node import Node
-
from rclpy.qos import qos_profile_default, qos_profile_sensor_data
-
from rclpy.qos import QoSReliabilityPolicy
-
from std_msgs.msg import String
-
class ListenerQos(Node):
-
def __init__(self, qos_profile):
-
super().__init__('listener_qos')
-
# 传输模式:Reliable、Best effort
-
if qos_profile.reliability is QoSReliabilityPolicy.RMW_QOS_POLICY_RELIABILITY_RELIABLE:
-
self.get_logger().info('Reliable listener')
-
else:
-
self.get_logger().info('Best effort listener')
-
# 创建一个subscriber,订阅名为chatter的topic
-
# 消息类型是String,回调函数为chatter_callback,设置qos参数
-
self.sub = self.create_subscription(
-
String, 'chatter', self.chatter_callback, qos_profile=qos_profile)
-
def chatter_callback(self, msg):
-
# 将收到的消息数据打印出来
-
self.get_logger().info('I heard: [%s]' % msg.data)
-
def main(argv=sys.argv[1:]):
-
# 解析输入参数
-
parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)
-
# 是否使用relibale可靠传输模式
-
parser.add_argument(
-
'--reliable', dest='reliable', action='store_true',
-
help='set qos profile to reliable')
-
parser.set_defaults(reliable=False)
-
# 循环订阅次数
-
parser.add_argument(
-
'-n', '--number_of_cycles', type=int, default=20,
-
help='max number of spin_once iterations')
-
# 其他输入参数
-
parser.add_argument(
-
'argv', nargs=argparse.REMAINDER,
-
help='Pass arbitrary arguments to the executable')
-
args = parser.parse_args(argv)
-
# ROS节点初始化
-
rclpy.init(args=args.argv)
-
# 设置qos参数
-
if args.reliable:
-
custom_qos_profile = qos_profile_default
-
else:
-
custom_qos_profile = qos_profile_sensor_data
-
# 创建节点及订阅器
-
node = ListenerQos(custom_qos_profile)
-
# 循环
-
cycle_count = 0
-
while rclpy.ok() and cycle_count < args.number_of_cycles:
-
rclpy.spin_once(node)
-
cycle_count += 1
-
# 销毁节点并退出
-
node.destroy_node()
-
rclpy.shutdown()
-
if __name__ == '__main__':
-
main()
ROS 2针对四种类型的通信,提供了QoS的默认配置:
-
Default QoS settings for publishers and subscriptions
In order to make the transition from ROS1 to ROS2, exercising a similar network behavior is desirable. By default, publishers and subscriptions are reliable in ROS2, have volatile durability, and “keep last” history.
-
Services
In the same vein as publishers and subscriptions, services are reliable. It is especially important for services to use volatile durability, as otherwise service servers that re-start may receive outdated requests.
-
Sensor data
For sensor data, in most cases it’s more important to receive readings in a timely fashion, rather than ensuring that all of them arrive. That is, developers want the latest samples as soon as they are captured, at the expense of maybe losing some. For that reason the sensor data profile uses best effort reliability and a smaller queue depth.
-
Parameters
Parameters are based on services, and as such have a similar profile. The difference is that parameters use a much larger queue depth so that requests do not get lost when, for example, the parameter client is unable to reach the parameter service server.
参考: http://design.ros2.org/articles/qos.html
以上例程运行时,默认使用的是Best effort模式:
可以通过命令输入切换成Reliable模式:
还可以设置发布或订阅的次数:
更多内容欢迎关注:
微信公众号:古月居 (guyue_home)
新浪微博:古月春旭 ( https://weibo.com/hcx196 )
知乎专栏:古月居 ( https://zhuanlan.zhihu.com/guyuehome )
本文链接地址: ROS2探索总结(十二)—— ROS2 Python编程基础
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网
猜你喜欢:本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。