-
Notifications
You must be signed in to change notification settings - Fork 825
Expand file tree
/
Copy pathex04_ros_interop.py
More file actions
89 lines (65 loc) · 2.16 KB
/
ex04_ros_interop.py
File metadata and controls
89 lines (65 loc) · 2.16 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
#!/usr/bin/env python3
"""
Demonstrates interop of BehaviorTree.CPP Python bindings and ROS2 via rclpy.
You can publish the transform expected in the tree below using this command:
ros2 run tf2_ros static_transform_publisher \
--frame-id odom --child-frame-id base_link \
--x 1.0 --y 2.0
"""
import rclpy
from rclpy.node import Node
from tf2_ros.buffer import Buffer
from tf2_ros.transform_listener import TransformListener
from btpy import (
BehaviorTreeFactory,
StatefulActionNode,
SyncActionNode,
NodeStatus,
ports,
)
xml_text = """
<root BTCPP_format="4" >
<BehaviorTree ID="MainTree">
<Sequence>
<ReactiveSequence name="root">
<Print value="{tf}" />
<GetRosTransform frame_id="odom" child_frame_id="base_link" tf="{tf}" />
</ReactiveSequence>
</Sequence>
</BehaviorTree>
</root>
"""
@ports(inputs=["frame_id", "child_frame_id"], outputs=["tf"])
class GetRosTransform(StatefulActionNode):
def __init__(self, name, config, node):
super().__init__(name, config)
self.node = node
self.tf_buffer = Buffer()
self.tf_listener = TransformListener(self.tf_buffer, self.node)
def on_start(self):
return NodeStatus.RUNNING
def on_running(self):
frame_id = self.get_input("frame_id")
child_frame_id = self.get_input("child_frame_id")
time = self.node.get_clock().now()
if self.tf_buffer.can_transform(frame_id, child_frame_id, time):
tf = self.tf_buffer.lookup_transform(frame_id, child_frame_id, time)
self.set_output("tf", tf)
return NodeStatus.RUNNING
def on_halted(self):
pass
@ports(inputs=["value"])
class Print(SyncActionNode):
def tick(self):
value = self.get_input("value")
if value is not None:
print(value)
return NodeStatus.SUCCESS
rclpy.init()
node = Node("ex04_ros_interop")
factory = BehaviorTreeFactory()
factory.register(GetRosTransform, node)
factory.register(Print)
tree = factory.create_tree_from_text(xml_text)
node.create_timer(0.01, lambda: tree.tick_once())
rclpy.spin(node)