ものづくりのブログ

うちのネコを題材にしたものづくりができたらいいなと思っていろいろ奮闘してます。

【RaspberryPi】rclpy について

rclpy について調べたことをここにメモします。

rclpy

rclpy は、ROS 2(Robot Operating System 2) の公式 Python クライアントライブラリです。
ROS 2のノードを Python で記述するために使用され、C++向けの rclcpp に相当します。

機能

ノード管理
  • ノードの作成 (Node クラス)
  • ノードの開始 (rclpy.init()) と終了 (rclpy.shutdown())
通信機能
  • パブリッシャー(Publisher): トピックにデータを送信
  • サブスクライバー(Subscriber): トピックのデータを受信
  • サービス(Service): サーバー・クライアント通信
  • アクション(Action): 継続的なタスク実行のための通信
タイマーとイベント処理
  • 一定間隔で関数を実行するタイマー (create_timer())
パラメータ管理
  • ノードごとに設定可能な変数 (declare_parameter(), get_parameter())

基本的な使い方

rclpy のインストール

Ubuntu 環境では、以下のコマンドで rclpy をインストールします。

sudo apt install ros-humble-rclpy

仮想環境で pip を使ってインストールする場合は以下のコマンドです。

pip install rclpy

シンプルなノードの作成

このコードを実行すると、コンソールに 「Hello, ROS 2!」 と出力されます。

import rclpy
from rclpy.node import Node

class SimpleNode(Node):
    def __init__(self):
        super().__init__('simple_node')
        self.get_logger().info('Hello, ROS 2!')

def main():
    rclpy.init()
    node = SimpleNode()
    rclpy.spin(node)  # ノードが実行され続ける
    node.destroy_node()
    rclpy.shutdown()

if __name__ == '__main__':
    main()

トピック通信の例

パブリッシャー(データを送信する)

以下の例では、1秒ごとに "Hello, ROS 2!" というメッセージを送信します。

import rclpy
from rclpy.node import Node
from std_msgs.msg import String

class PublisherNode(Node):
    def __init__(self):
        super().__init__('publisher_node')
        self.publisher = self.create_publisher(String, 'chatter', 10)
        self.timer = self.create_timer(1.0, self.publish_message)

    def publish_message(self):
        msg = String()
        msg.data = 'Hello, ROS 2!'
        self.publisher.publish(msg)
        self.get_logger().info(f'Published: {msg.data}')

def main():
    rclpy.init()
    node = PublisherNode()
    rclpy.spin(node)
    node.destroy_node()
    rclpy.shutdown()

if __name__ == '__main__':
    main()
サブスクライバー(データを受信する)

このサブスクライバーを実行すると、publisher_node から送信された "Hello, ROS 2!" メッセージが受信され、ログに表示されます。

import rclpy
from rclpy.node import Node
from std_msgs.msg import String

class SubscriberNode(Node):
    def __init__(self):
        super().__init__('subscriber_node')
        self.subscription = self.create_subscription(
            String,
            'chatter',
            self.listener_callback,
            10
        )

    def listener_callback(self, msg):
        self.get_logger().info(f'Received: {msg.data}')

def main():
    rclpy.init()
    node = SubscriberNode()
    rclpy.spin(node)
    node.destroy_node()
    rclpy.shutdown()

if __name__ == '__main__':
    main()
サービス通信の例

サーバー(リクエストを処理する)

import rclpy
from rclpy.node import Node
from example_interfaces.srv import AddTwoInts

class AddTwoIntsServer(Node):
    def __init__(self):
        super().__init__('add_two_ints_server')
        self.srv = self.create_service(AddTwoInts, 'add_two_ints', self.add_callback)

    def add_callback(self, request, response):
        response.sum = request.a + request.b
        self.get_logger().info(f'Request: {request.a} + {request.b} = {response.sum}')
        return response

def main():
    rclpy.init()
    node = AddTwoIntsServer()
    rclpy.spin(node)
    node.destroy_node()
    rclpy.shutdown()

if __name__ == '__main__':
    main()

クライアント(リクエストを送信する)

import rclpy
from rclpy.node import Node
from example_interfaces.srv import AddTwoInts

class AddTwoIntsClient(Node):
    def __init__(self):
        super().__init__('add_two_ints_client')
        self.client = self.create_client(AddTwoInts, 'add_two_ints')
        while not self.client.wait_for_service(timeout_sec=1.0):
            self.get_logger().info('Waiting for service...')
        self.send_request(5, 3)

    def send_request(self, a, b):
        request = AddTwoInts.Request()
        request.a = a
        request.b = b
        future = self.client.call_async(request)
        future.add_done_callback(self.response_callback)

    def response_callback(self, future):
        response = future.result()
        self.get_logger().info(f'Result: {response.sum}')

def main():
    rclpy.init()
    node = AddTwoIntsClient()
    rclpy.spin(node)
    node.destroy_node()
    rclpy.shutdown()

if __name__ == '__main__':
    main()

この例では、クライアントが (5, 3) をサーバーに送信し、サーバーが 8 を返します。

memo

  • rclpy はROS 2のPython APIで、ノード、トピック通信、サービス通信、アクション通信などを実装できる。
  • rclpy.init() で初期化し、rclpy.spin(node) でノードを実行し、rclpy.shutdown() で終了する。
  • パブリッシャー (create_publisher) とサブスクライバー (create_subscription) を使ってトピック通信ができる。
  • サービス (create_service / create_client) を使ってリクエスト・レスポンス通信が可能。