メモの日々


2023年02月14日(火) [長年日記]

[dev] ROS2でノードに対するテストを書く

ROS2にlaunch_pytestというものがあることを知った。これを使えばノードを立ち上げて外部からその振る舞いをテストすることができる。

Writing a simple publisher and subscriber (Python)にあるMinimalPublisherをテストするコードをメモ。

まず、テスト対象ノードと通信するテスト用ノードを作成する。topicトピックを購読して、受信したメッセージをキューに保存するだけ。

from queue import Queue

from rclpy.node import Node
from std_msgs.msg import String

class DummyNode(Node):
    def __init__(self):
        super().__init__("dummy_node")
        self.topic_queue = Queue()
        self.topic_subscription = self.create_subscription(
            String, "topic", self._receive_topic, 10
        )

    def _receive_topic(self, msg):
        self.topic_queue.put(msg)

テスト本体は次の通り。

import queue

import launch
import launch_pytest
import launch_ros
import pytest
import rclpy

from concurrent.futures import ThreadPoolExecutor
from dummy_node import DummyNode

@launch_pytest.fixture
def talker_description():
    return launch.LaunchDescription(
        [
            launch_ros.actions.Node(
                executable="talker",
                package="py_pubsub",
                output="screen",
            ),
        ]
    )

@pytest.fixture(scope="session")
def thread_pool():
    pool = ThreadPoolExecutor()
    yield pool
    pool.shutdown()

@pytest.fixture
def dummy_node(thread_pool):
    rclpy.init()
    node = DummyNode()
    thread_pool.submit(rclpy.spin, node)
    yield node
    node.destroy_node()
    rclpy.shutdown()

@pytest.mark.launch(fixture=talker_description)
def test_talker(dummy_node):
    for i in range(10):
        try:
            msg = dummy_node.topic_queue.get(timeout=2.0 / (i + 1))
            assert msg.data == f"Hello World: {i}", f"index={i}"
        except(queue.Empty):
            pytest.fail(f"no message. index={i}")
  • @launch_pytest.fixture を指定して、launch.LaunchDescriptionの配列を返す関数を定義する。ここにテスト対象ノードの情報を記述することで、テスト時にそのノードが立ち上がる。配列なので複数のノードを立ち上げ可能。
  • dummy_nodeというフィクスチャで先に定義したDummyNodeノードを生成して立ち上げている。ThreadPoolExecutorを使って別スレッドで動くようにしている。
  • テストは @pytest.mark.launch で修飾し、先に定義したlaunch.LaunchDescription配列を指定する。メッセージを受信する間隔がだんだん短くなるようにしているので、ループの5回目くらいでテストが失敗する。