rclpy について調べたことをここにメモします。
rclpy
rclpy は、ROS 2(Robot Operating System 2) の公式 Python クライアントライブラリです。
ROS 2のノードを Python で記述するために使用され、C++向けの rclcpp に相当します。
機能
ノード管理
- ノードの作成 (Node クラス)
- ノードの開始 (rclpy.init()) と終了 (rclpy.shutdown())
通信機能
- パブリッシャー(Publisher): トピックにデータを送信
- サブスクライバー(Subscriber): トピックのデータを受信
- サービス(Service): サーバー・クライアント通信
- アクション(Action): 継続的なタスク実行のための通信
タイマーとイベント処理
- 一定間隔で関数を実行するタイマー (create_timer())
パラメータ管理
- ノードごとに設定可能な変数 (declare_parameter(), get_parameter())
基本的な使い方
rclpy のインストール
Ubuntu 環境では、以下のコマンドで rclpy をインストールします。
sudo apt install ros-humble-rclpy
仮想環境で pip を使ってインストールする場合は以下のコマンドです。
pip install rclpy
シンプルなノードの作成
このコードを実行すると、コンソールに 「Hello, ROS 2!」 と出力されます。
import rclpy from rclpy.node import Node class SimpleNode(Node): def __init__(self): super().__init__('simple_node') self.get_logger().info('Hello, ROS 2!') def main(): rclpy.init() node = SimpleNode() rclpy.spin(node) # ノードが実行され続ける node.destroy_node() rclpy.shutdown() if __name__ == '__main__': main()
トピック通信の例
パブリッシャー(データを送信する)
以下の例では、1秒ごとに "Hello, ROS 2!" というメッセージを送信します。
import rclpy from rclpy.node import Node from std_msgs.msg import String class PublisherNode(Node): def __init__(self): super().__init__('publisher_node') self.publisher = self.create_publisher(String, 'chatter', 10) self.timer = self.create_timer(1.0, self.publish_message) def publish_message(self): msg = String() msg.data = 'Hello, ROS 2!' self.publisher.publish(msg) self.get_logger().info(f'Published: {msg.data}') def main(): rclpy.init() node = PublisherNode() rclpy.spin(node) node.destroy_node() rclpy.shutdown() if __name__ == '__main__': main()
サブスクライバー(データを受信する)
このサブスクライバーを実行すると、publisher_node から送信された "Hello, ROS 2!" メッセージが受信され、ログに表示されます。
import rclpy from rclpy.node import Node from std_msgs.msg import String class SubscriberNode(Node): def __init__(self): super().__init__('subscriber_node') self.subscription = self.create_subscription( String, 'chatter', self.listener_callback, 10 ) def listener_callback(self, msg): self.get_logger().info(f'Received: {msg.data}') def main(): rclpy.init() node = SubscriberNode() rclpy.spin(node) node.destroy_node() rclpy.shutdown() if __name__ == '__main__': main()
サービス通信の例
サーバー(リクエストを処理する)
import rclpy from rclpy.node import Node from example_interfaces.srv import AddTwoInts class AddTwoIntsServer(Node): def __init__(self): super().__init__('add_two_ints_server') self.srv = self.create_service(AddTwoInts, 'add_two_ints', self.add_callback) def add_callback(self, request, response): response.sum = request.a + request.b self.get_logger().info(f'Request: {request.a} + {request.b} = {response.sum}') return response def main(): rclpy.init() node = AddTwoIntsServer() rclpy.spin(node) node.destroy_node() rclpy.shutdown() if __name__ == '__main__': main()
クライアント(リクエストを送信する)
import rclpy from rclpy.node import Node from example_interfaces.srv import AddTwoInts class AddTwoIntsClient(Node): def __init__(self): super().__init__('add_two_ints_client') self.client = self.create_client(AddTwoInts, 'add_two_ints') while not self.client.wait_for_service(timeout_sec=1.0): self.get_logger().info('Waiting for service...') self.send_request(5, 3) def send_request(self, a, b): request = AddTwoInts.Request() request.a = a request.b = b future = self.client.call_async(request) future.add_done_callback(self.response_callback) def response_callback(self, future): response = future.result() self.get_logger().info(f'Result: {response.sum}') def main(): rclpy.init() node = AddTwoIntsClient() rclpy.spin(node) node.destroy_node() rclpy.shutdown() if __name__ == '__main__': main()
この例では、クライアントが (5, 3) をサーバーに送信し、サーバーが 8 を返します。
memo
- rclpy はROS 2のPython APIで、ノード、トピック通信、サービス通信、アクション通信などを実装できる。
- rclpy.init() で初期化し、rclpy.spin(node) でノードを実行し、rclpy.shutdown() で終了する。
- パブリッシャー (create_publisher) とサブスクライバー (create_subscription) を使ってトピック通信ができる。
- サービス (create_service / create_client) を使ってリクエスト・レスポンス通信が可能。