2023年02月14日(火) [長年日記]
■ [dev] ROS2でノードに対するテストを書く
ROS2にlaunch_pytestというものがあることを知った。これを使えばノードを立ち上げて外部からその振る舞いをテストすることができる。
Writing a simple publisher and subscriber (Python)にあるMinimalPublisherをテストするコードをメモ。
まず、テスト対象ノードと通信するテスト用ノードを作成する。topicトピックを購読して、受信したメッセージをキューに保存するだけ。
from queue import Queue
from rclpy.node import Node
from std_msgs.msg import String
class DummyNode(Node):
def __init__(self):
super().__init__("dummy_node")
self.topic_queue = Queue()
self.topic_subscription = self.create_subscription(
String, "topic", self._receive_topic, 10
)
def _receive_topic(self, msg):
self.topic_queue.put(msg)
テスト本体は次の通り。
import queue
import launch
import launch_pytest
import launch_ros
import pytest
import rclpy
from concurrent.futures import ThreadPoolExecutor
from dummy_node import DummyNode
@launch_pytest.fixture
def talker_description():
return launch.LaunchDescription(
[
launch_ros.actions.Node(
executable="talker",
package="py_pubsub",
output="screen",
),
]
)
@pytest.fixture(scope="session")
def thread_pool():
pool = ThreadPoolExecutor()
yield pool
pool.shutdown()
@pytest.fixture
def dummy_node(thread_pool):
rclpy.init()
node = DummyNode()
thread_pool.submit(rclpy.spin, node)
yield node
node.destroy_node()
rclpy.shutdown()
@pytest.mark.launch(fixture=talker_description)
def test_talker(dummy_node):
for i in range(10):
try:
msg = dummy_node.topic_queue.get(timeout=2.0 / (i + 1))
assert msg.data == f"Hello World: {i}", f"index={i}"
except(queue.Empty):
pytest.fail(f"no message. index={i}")
- @launch_pytest.fixture を指定して、launch.LaunchDescriptionの配列を返す関数を定義する。ここにテスト対象ノードの情報を記述することで、テスト時にそのノードが立ち上がる。配列なので複数のノードを立ち上げ可能。
- dummy_nodeというフィクスチャで先に定義したDummyNodeノードを生成して立ち上げている。ThreadPoolExecutorを使って別スレッドで動くようにしている。
- テストは @pytest.mark.launch で修飾し、先に定義したlaunch.LaunchDescription配列を指定する。メッセージを受信する間隔がだんだん短くなるようにしているので、ループの5回目くらいでテストが失敗する。
[ツッコミを入れる]