-
Notifications
You must be signed in to change notification settings - Fork 825
Expand file tree
/
Copy pathex03_stateful_nodes.py
More file actions
70 lines (55 loc) · 1.59 KB
/
ex03_stateful_nodes.py
File metadata and controls
70 lines (55 loc) · 1.59 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
#!/usr/bin/env python3
"""
Demonstration of stateful action nodes.
"""
import numpy as np
from btpy import (
BehaviorTreeFactory,
StatefulActionNode,
SyncActionNode,
NodeStatus,
ports,
)
# Tree definition handed to factory.create_tree_from_text() at the bottom of
# this file.  A Sequence wraps a ReactiveSequence named "root" with two
# children, in order: Print, whose `value` port is bound to `{interpolated}`,
# and Interpolate, whose `out` port is bound to the same `{interpolated}`
# entry.  Interpolate's `x0` / `x1` ports receive the literal end points.
# NOTE(review): Print comes first, so on each tick it presumably shows the
# value written by Interpolate on the previous tick -- confirm against the
# ReactiveSequence semantics of the library.
xml_text = """
<root BTCPP_format="4" >
<BehaviorTree ID="MainTree">
<Sequence>
<!-- Interpolate from the initial position to the final one printing
at each step. -->
<ReactiveSequence name="root">
<Print value="{interpolated}" />
<Interpolate x0="[1.0, 0.0]" x1="[0.0, 1.0]" out="{interpolated}" />
</ReactiveSequence>
</Sequence>
</BehaviorTree>
</root>
"""
@ports(inputs=["x0", "x1"], outputs=["out"])
class Interpolate(StatefulActionNode):
    """Linearly interpolate from input ``x0`` to input ``x1``, one sample per call.

    Each call to :meth:`on_running` writes the next sample
    ``(1 - t) * x0 + t * x1`` to the ``out`` port, with ``t`` going from 0.0
    to 1.0 in ``NUM_STEPS`` equal intervals (both end points included).
    The node reports RUNNING while samples remain and SUCCESS afterwards.

    NOTE(review): ``on_start`` / ``on_running`` / ``on_halted`` are presumably
    the callbacks driven by ``StatefulActionNode``; when each one is invoked
    is defined by btpy, not by this file.
    """

    # Number of equal intervals between x0 and x1; NUM_STEPS + 1 samples are
    # written in total.
    NUM_STEPS = 10

    def on_start(self):
        """Read both end points, reset progress and report RUNNING.

        No sample is written to ``out`` here; the first one is produced by
        the first :meth:`on_running` call.
        """
        # Progress is tracked with an integer counter instead of accumulating
        # ``t += 0.1``: ten additions of 0.1 sum to 0.9999999999999999, so an
        # accumulated float reaches the end point only through rounding error
        # and never yields exactly x1.
        self.step = 0
        self.t = 0.0
        self.x0 = np.asarray(self.get_input("x0"))
        self.x1 = np.asarray(self.get_input("x1"))
        return NodeStatus.RUNNING

    def on_running(self):
        """Write the next sample to ``out``.

        Returns:
            ``NodeStatus.RUNNING`` after writing a sample, or
            ``NodeStatus.SUCCESS`` (writing nothing) once all
            ``NUM_STEPS + 1`` samples have been produced.
        """
        if self.step > self.NUM_STEPS:
            return NodeStatus.SUCCESS

        # Deriving t from the counter keeps t == 0.0 and t == 1.0 exact, so
        # the first and last samples equal x0 and x1 respectively.
        self.t = self.step / self.NUM_STEPS
        x = (1.0 - self.t) * self.x0 + self.t * self.x1
        self.set_output("out", x)
        self.step += 1
        return NodeStatus.RUNNING

    def on_halted(self):
        """Do nothing: there is no per-run state here that needs cleaning up."""
        pass
@ports(inputs=["value"])
class Print(SyncActionNode):
    """Write the ``value`` input to standard output when one is available."""

    def tick(self):
        """Print the current ``value`` input, if any, and report SUCCESS.

        A ``None`` input is skipped silently; the returned status is
        ``NodeStatus.SUCCESS`` in both cases.
        """
        payload = self.get_input("value")
        if payload is None:
            # Nothing to show; still succeed.
            return NodeStatus.SUCCESS
        print(payload)
        return NodeStatus.SUCCESS
# Register the two custom node types, build the tree from ``xml_text`` and
# drive it with tick_while_running().
factory = BehaviorTreeFactory()
for node_type in (Interpolate, Print):
    factory.register(node_type)

tree = factory.create_tree_from_text(xml_text)
tree.tick_while_running()