2023年02月14日(火) [長年日記]
■ [dev] ROS2でノードに対するテストを書く
ROS2にlaunch_pytestというものがあることを知った。これを使えばノードを立ち上げて外部からその振る舞いをテストすることができる。
Writing a simple publisher and subscriber (Python)にあるMinimalPublisherをテストするコードをメモ。
まず、テスト対象ノードと通信するテスト用ノードを作成する。topicトピックを購読して、受信したメッセージをキューに保存するだけ。
from queue import Queue from rclpy.node import Node from std_msgs.msg import String class DummyNode(Node): def __init__(self): super().__init__("dummy_node") self.topic_queue = Queue() self.topic_subscription = self.create_subscription( String, "topic", self._receive_topic, 10 ) def _receive_topic(self, msg): self.topic_queue.put(msg)
テスト本体は次の通り。
import queue import launch import launch_pytest import launch_ros import pytest import rclpy from concurrent.futures import ThreadPoolExecutor from dummy_node import DummyNode @launch_pytest.fixture def talker_description(): return launch.LaunchDescription( [ launch_ros.actions.Node( executable="talker", package="py_pubsub", output="screen", ), ] ) @pytest.fixture(scope="session") def thread_pool(): pool = ThreadPoolExecutor() yield pool pool.shutdown() @pytest.fixture def dummy_node(thread_pool): rclpy.init() node = DummyNode() thread_pool.submit(rclpy.spin, node) yield node node.destroy_node() rclpy.shutdown() @pytest.mark.launch(fixture=talker_description) def test_talker(dummy_node): for i in range(10): try: msg = dummy_node.topic_queue.get(timeout=2.0 / (i + 1)) assert msg.data == f"Hello World: {i}", f"index={i}" except(queue.Empty): pytest.fail(f"no message. index={i}")
- @launch_pytest.fixture を指定して、launch.LaunchDescriptionの配列を返す関数を定義する。ここにテスト対象ノードの情報を記述することで、テスト時にそのノードが立ち上がる。配列なので複数のノードを立ち上げ可能。
- dummy_nodeというフィクスチャで先に定義したDummyNodeノードを生成して立ち上げている。ThreadPoolExecutorを使って別スレッドで動くようにしている。
- テストは @pytest.mark.launch で修飾し、先に定義したlaunch.LaunchDescription配列を指定する。メッセージを受信する間隔がだんだん短くなるようにしているので、ループの5回目くらいでテストが失敗する。