ROS2探索总结(十二)—— ROS2 Python编程基础

栏目: Python · 发布时间: 5年前

内容简介:ROS最重要的特性之一就是多语言的支持,可以使用C++、Python等语言进行程序的开发,ROS2会继续强化这个特性,对更多语言提供丰富的支持。这里我们尝试ROS2当中的Python编程,实现话题和服务通信。在进行操作前,请先参考以下链接,完成ROS 2的安装和例程编译:

~欢迎关注~

微信公众号:古月居(guyue_home)

新浪微博:古月春旭(

https://weibo.com/hcx196

知乎专栏:古月居(

https://zhuanlan.zhihu.com/guyuehome

原文链接:

ROS2 Python 编程基础

ROS最重要的特性之一就是多语言的支持,可以使用C++、Python等语言进行程序的开发,ROS2会继续强化这个特性,对更多语言提供丰富的支持。

这里我们尝试ROS2当中的Python编程,实现话题和服务通信。

在进行操作前,请先参考以下链接,完成ROS 2的安装和例程编译:

ROS 2发布第三个正式版——Crystal Clemmys

一、话题通信

代码编译后,在终端设置环境变量,并使用如下命令运行listener和talker:

  1. $ ros2 run demo_nodes_py listener
  2. $ ros2 run demo_nodes_py talker

运行成功后,可以看到如下效果:

ROS2探索总结(十二)—— ROS2 Python编程基础

以上talker和listener的具体实现,我们可以来分析下代码,分析内容请见代码中的注释:

  • talker.py
  1. # 引用python接口
  2. import rclpy
  3. from rclpy.node import Node
  4. from std_msgs.msg import String
  5. class Talker(Node):
  6. def __init__(self):
  7. super().__init__('talker')
  8. self.i = 0
  9. # 创建一个Publisher,发布名为chatter的topic,消息类型为String
  10. self.pub = self.create_publisher(String, 'chatter')
  11. # 设置定时器,周期调用定时器回调函数timer_callback
  12. timer_period = 1.0
  13. self.tmr = self.create_timer(timer_period, self.timer_callback)
  14. def timer_callback(self):
  15. # 创建String类型的消息
  16. msg = String()
  17. msg.data = 'Hello World: {0}'.format(self.i)
  18. self.i += 1
  19. self.get_logger().info('Publishing: "{0}"'.format(msg.data))
  20. # 发布消息
  21. self.pub.publish(msg)
  22. def main(args=None):
  23. # ROS节点初始化
  24. rclpy.init(args=args)
  25. # 创建节点及发布器
  26. node = Talker()
  27. # 循环
  28. rclpy.spin(node)
  29. # 销毁节点,退出程序
  30. node.destroy_node()
  31. rclpy.shutdown()
  32. if __name__ == '__main__':
  33. main()
  • listener.py
  1. # 引用python接口
  2. import rclpy
  3. from rclpy.node import Node
  4. from std_msgs.msg import String
  5. class Listener(Node):
  6. def __init__(self):
  7. super().__init__('listener')
  8. # 创建一个subscriber,订阅名为chatter的topic
  9. # 消息类型是String,回调函数为chatter_callback
  10. self.sub = self.create_subscription(String, 'chatter', self.chatter_callback)
  11. def chatter_callback(self, msg):
  12. # 将收到的消息数据打印出来
  13. self.get_logger().info('I heard: [%s]' % msg.data)
  14. def main(args=None):
  15. # ROS节点初始化
  16. rclpy.init(args=args)
  17. # 创建节点及订阅器
  18. node = Listener()
  19. # 循环查询消息队列,收到消息后进入回调函数
  20. rclpy.spin(node)
  21. # 销毁节点,退出程序
  22. node.destroy_node()
  23. rclpy.shutdown()
  24. if __name__ == '__main__':
  25. main()

二、服务通信

接下来我们看下ROS另外一种核心通信机制——服务的实现,运行以下命令实现ROS 1教程当中经典的加法求和服务例程:

  1. $ ros2 run demo_nodes_py add_two_ints_server
  2. $ ros2 run demo_nodes_py add_two_ints_client

可以看到如下效果:

ROS2探索总结(十二)—— ROS2 Python编程基础

add_two_ints_server和add_two_ints_client的具体实现,继续来分析代码:

  • add_two_ints_server.py
  1. # 引用python接口
  2. from example_interfaces.srv import AddTwoInts
  3. import rclpy
  4. from rclpy.node import Node
  5. class AddTwoIntsServer(Node):
  6. def __init__(self):
  7. super().__init__('add_two_ints_server')
  8. # 创建一个service服务端,提供名为AddTwoInts的service,
  9. # 消息类型为AddTwoInts,回调函数为add_two_ints_callback
  10. self.srv = self.create_service(AddTwoInts, 'add_two_ints', self.add_two_ints_callback)
  11. def add_two_ints_callback(self, request, response):
  12. # 处理服务功能,将求和结果放入应答数据中
  13. response.sum = request.a + request.b
  14. self.get_logger().info('Incoming request\na: %d b: %d' % (request.a, request.b))
  15. return response
  16. def main(args=None):
  17. # ROS节点初始化
  18. rclpy.init(args=args)
  19. # 创建服务端
  20. node = AddTwoIntsServer()
  21. # 循环
  22. rclpy.spin(node)
  23. # 销毁节点,退出程序
  24. node.destroy_node()
  25. rclpy.shutdown()
  26. if __name__ == '__main__':
  27. main()
  • add_two_ints_client.py
  1. # 引用python接口
  2. from example_interfaces.srv import AddTwoInts
  3. import rclpy
  4. def main(args=None):
  5. # ROS节点初始化
  6. rclpy.init(args=args)
  7. # 创建节点
  8. node = rclpy.create_node('add_two_ints_client')
  9. # 创建客户端
  10. cli = node.create_client(AddTwoInts, 'add_two_ints')
  11. # 等待服务端
  12. while not cli.wait_for_service(timeout_sec=1.0):
  13. print('service not available, waiting again...')
  14. # 创建请求数据
  15. req = AddTwoInts.Request()
  16. req.a = 2
  17. req.b = 3
  18. # 异步请求(非阻塞)
  19. future = cli.call_async(req)
  20. # 等待服务端应答
  21. rclpy.spin_until_future_complete(node, future)
  22. if future.result() is not None:
  23. node.get_logger().info('Result of add_two_ints: %d' % future.result().sum)
  24. else:
  25. node.get_logger().error('Exception while calling service: %r' % future.exception())
  26. # 销毁节点,退出程序
  27. node.destroy_node()
  28. rclpy.shutdown()
  29. if __name__ == '__main__':
  30. main()

三、QoS配置

ROS 2中的通信中间件改用DDS,而QoS是DDS中非常重要的一环,控制了各方面与底层的通讯机制,主要从时间限制、可靠性、持续性、历史记录几个方面,满足用户针对不同场景的数据应用需求。

以话题通信为例,我们看下Python程序中该如何配置QoS:

  • talker_qos.py
  1. import argparse
  2. import sys
  3. # 引用python接口
  4. import rclpy
  5. from rclpy.node import Node
  6. from rclpy.qos import qos_profile_default, qos_profile_sensor_data
  7. from rclpy.qos import QoSReliabilityPolicy
  8. from std_msgs.msg import String
  9. class TalkerQos(Node):
  10. def __init__(self, qos_profile):
  11. super().__init__('talker_qos')
  12. self.i = 0
  13. # 传输模式:Reliable、Best effort
  14. if qos_profile.reliability is QoSReliabilityPolicy.RMW_QOS_POLICY_RELIABILITY_RELIABLE:
  15. self.get_logger().info('Reliable talker')
  16. else:
  17. self.get_logger().info('Best effort talker')
  18. # 创建一个Publisher,发布名为chatter的topic,消息类型为String,设置qos参数
  19. self.pub = self.create_publisher(String, 'chatter', qos_profile=qos_profile)
  20. # 设置定时器,周期调用定时器回调函数timer_callback
  21. timer_period = 1.0
  22. self.tmr = self.create_timer(timer_period, self.timer_callback)
  23. def timer_callback(self):
  24. # 创建String类型的消息
  25. msg = String()
  26. msg.data = 'Hello World: {0}'.format(self.i)
  27. self.i += 1
  28. self.get_logger().info('Publishing: "{0}"'.format(msg.data))
  29. # 发布消息
  30. self.pub.publish(msg)
  31. def main(argv=sys.argv[1:]):
  32. # 解析输入参数
  33. parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)
  34. # 是否使用relibale可靠传输模式
  35. parser.add_argument(
  36. '--reliable', dest='reliable', action='store_true',
  37. help='set qos profile to reliable')
  38. parser.set_defaults(reliable=False)
  39. # 循环发布次数
  40. parser.add_argument(
  41. '-n', '--number_of_cycles', type=int, default=20,
  42. help='number of sending attempts')
  43. # 其他输入参数  
  44. parser.add_argument(
  45. 'argv', nargs=argparse.REMAINDER,
  46. help='Pass arbitrary arguments to the executable')
  47. args = parser.parse_args(argv)
  48. # ROS节点初始化
  49. rclpy.init(args=args.argv)
  50. # 设置qos参数
  51. if args.reliable:
  52. custom_qos_profile = qos_profile_default
  53. else:
  54. custom_qos_profile = qos_profile_sensor_data
  55. # 创建节点及发布器
  56. node = TalkerQos(custom_qos_profile)
  57. # 循环
  58. cycle_count = 0
  59. while rclpy.ok() and cycle_count < args.number_of_cycles:
  60. rclpy.spin_once(node)
  61. cycle_count += 1
  62. # 销毁节点并退出
  63. node.destroy_node()
  64. rclpy.shutdown()
  65. if __name__ == '__main__':
  66. main()
  • listener_qos.py
  1. import argparse
  2. import sys
  3. # 引用python接口
  4. import rclpy
  5. from rclpy.node import Node
  6. from rclpy.qos import qos_profile_default, qos_profile_sensor_data
  7. from rclpy.qos import QoSReliabilityPolicy
  8. from std_msgs.msg import String
  9. class ListenerQos(Node):
  10. def __init__(self, qos_profile):
  11. super().__init__('listener_qos')
  12. # 传输模式:Reliable、Best effort
  13. if qos_profile.reliability is QoSReliabilityPolicy.RMW_QOS_POLICY_RELIABILITY_RELIABLE:
  14. self.get_logger().info('Reliable listener')
  15. else:
  16. self.get_logger().info('Best effort listener')
  17. # 创建一个subscriber,订阅名为chatter的topic
  18. # 消息类型是String,回调函数为chatter_callback,设置qos参数
  19. self.sub = self.create_subscription(
  20. String, 'chatter', self.chatter_callback, qos_profile=qos_profile)
  21. def chatter_callback(self, msg):
  22. # 将收到的消息数据打印出来
  23. self.get_logger().info('I heard: [%s]' % msg.data)
  24. def main(argv=sys.argv[1:]):
  25. # 解析输入参数
  26. parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)
  27. # 是否使用relibale可靠传输模式
  28. parser.add_argument(
  29. '--reliable', dest='reliable', action='store_true',
  30. help='set qos profile to reliable')
  31. parser.set_defaults(reliable=False)
  32. # 循环订阅次数
  33. parser.add_argument(
  34. '-n', '--number_of_cycles', type=int, default=20,
  35. help='max number of spin_once iterations')
  36. # 其他输入参数
  37. parser.add_argument(
  38. 'argv', nargs=argparse.REMAINDER,
  39. help='Pass arbitrary arguments to the executable')
  40. args = parser.parse_args(argv)
  41. # ROS节点初始化
  42. rclpy.init(args=args.argv)
  43. # 设置qos参数
  44. if args.reliable:
  45. custom_qos_profile = qos_profile_default
  46. else:
  47. custom_qos_profile = qos_profile_sensor_data
  48. # 创建节点及订阅器
  49. node = ListenerQos(custom_qos_profile)
  50. # 循环
  51. cycle_count = 0
  52. while rclpy.ok() and cycle_count < args.number_of_cycles:
  53. rclpy.spin_once(node)
  54. cycle_count += 1
  55. # 销毁节点并退出
  56. node.destroy_node()
  57. rclpy.shutdown()
  58. if __name__ == '__main__':
  59. main()

ROS 2针对四种类型的通信,提供了QoS的默认配置:

  • Default QoS settings for publishers and subscriptions

    In order to make the transition from ROS1 to ROS2, exercising a similar network behavior is desirable. By default, publishers and subscriptions are reliable in ROS2, have volatile durability, and “keep last” history.

  • Services

    In the same vein as publishers and subscriptions, services are reliable. It is especially important for services to use volatile durability, as otherwise service servers that re-start may receive outdated requests.

  • Sensor data

    For sensor data, in most cases it’s more important to receive readings in a timely fashion, rather than ensuring that all of them arrive. That is, developers want the latest samples as soon as they are captured, at the expense of maybe losing some. For that reason the sensor data profile uses best effort reliability and a smaller queue depth.

  • Parameters

    Parameters are based on services, and as such have a similar profile. The difference is that parameters use a much larger queue depth so that requests do not get lost when, for example, the parameter client is unable to reach the parameter service server.

参考: http://design.ros2.org/articles/qos.html

以上例程运行时,默认使用的是Best effort模式:

ROS2探索总结(十二)—— ROS2 Python编程基础

可以通过命令输入切换成Reliable模式:

ROS2探索总结(十二)—— ROS2 Python编程基础

还可以设置发布或订阅的次数:

ROS2探索总结(十二)—— ROS2 Python编程基础

更多内容欢迎关注:

微信公众号:古月居 (guyue_home)

新浪微博:古月春旭 ( https://weibo.com/hcx196 )

知乎专栏:古月居 ( https://zhuanlan.zhihu.com/guyuehome )

原创文章,转载请注明: 转载自古月居

本文链接地址: ROS2探索总结(十二)—— ROS2 Python编程基础


以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

R语言实战

R语言实战

卡巴科弗 (Robert I.Kabacoff) / 高涛、肖楠、陈钢 / 人民邮电出版社 / 2013-1 / 79.00元

数据时代已经到来,但数据分析、数据挖掘人才却十分短缺。由于“大数据”对每个领域的决定性影响, 相对于经验和直觉,在商业、经济及其他领域中基于数据和分析去发现问题并作出科学、客观的决策越来越重要。开源软件R是世界上最流行的数据分析、统计计算及制图语言,几乎能够完成任何数据处理任务,可安装并运行于所有主流平台,为我们提供了成千上万的专业模块和实用工具,是从大数据中获取有用信息的绝佳工具。  本书从解决......一起来看看 《R语言实战》 这本书的介绍吧!

HTML 编码/解码
HTML 编码/解码

HTML 编码/解码

正则表达式在线测试
正则表达式在线测试

正则表达式在线测试

HSV CMYK 转换工具
HSV CMYK 转换工具

HSV CMYK互换工具