Skip to content

Commit a0cebf3

Browse files
committed
Basic result handler for actions
1 parent ce437a1 commit a0cebf3

7 files changed

Lines changed: 171 additions & 87 deletions

File tree

example/joe.py

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -16,19 +16,19 @@
1616
actor = Actor(settings=ActorSettings(name="joe", stateful=True))
1717

1818

19-
@actor.action("setLanguage")
20-
def set_language(request: Request, ctx: Context) -> Value:
19+
@actor.timer_action(every=1000)
20+
def hi(ctx: Context) -> Value:
21+
print("Context {}".format(ctx))
22+
new_state = JoeState()
23+
new_state.languages.append("portuguese")
2124
return Value()\
2225
.of("test")\
23-
.broadcast(Broadcast())\
24-
.effect(Effect())\
25-
.metada(Metadata())\
26-
.state({})\
26+
.state(new_state)\
2727
.reply()
2828

2929

30-
@actor.timer_action(every=1000)
31-
def hi(request: Request, ctx: Context) -> Value:
30+
@actor.action("setLanguage")
31+
def set_language(request: Request, ctx: Context) -> Value:
3232
return Value()\
3333
.of("test")\
3434
.broadcast(Broadcast())\

spawn/eigr/functions/actors/api/actor.py

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@ def register_timer_action_handler(function):
6363

6464

6565
def invoke(function, parameters):
66+
print("Parameters {}".format(parameters))
6667
ordered_parameters = []
6768
for parameter_definition in inspect.signature(function).parameters.values():
6869
annotation = parameter_definition.annotation
@@ -85,7 +86,12 @@ class ActorHandler:
8586
entity: Actor
8687

8788
def handle_action(self, action_name, input, ctx: Context):
88-
if action_name not in self.entity.action_handlers:
89-
raise Exception("Missing action handler function for Actor {} and action {}".format(
90-
self.entity.settings.name, action_name))
91-
return invoke(self.entity.action_handlers[action_name], [input, ctx])
89+
if action_name in self.entity.action_handlers:
90+
return invoke(self.entity.action_handlers[action_name], [input, ctx])
91+
elif action_name in self.entity.timer_action_handlers:
92+
action = self.entity.timer_action_handlers[action_name].action
93+
return invoke(action, [input, ctx])
94+
else:
95+
error = "Missing action handler function for Actor [{}] and Action [{}]".format(
96+
self.entity.settings.name, action_name)
97+
raise Exception(error)
Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,21 @@
11

2+
from dataclasses import dataclass, field
3+
from typing import MutableMapping
4+
5+
6+
@dataclass
27
class Metadata:
3-
def __init__(self):
4-
pass
8+
__map: MutableMapping[str, str] = field(default_factory=dict)
9+
__tags: MutableMapping[str, str] = field(default_factory=dict)
10+
11+
def put_metadata(self, key: str, value: str):
12+
self.__map[key] = value
13+
14+
def put_tag(self, key: str, value: str):
15+
self.__tags[key] = value
16+
17+
def get_metadata(self):
18+
return self.__map
19+
20+
def get_tags(self):
21+
return self.__tags

spawn/eigr/functions/actors/api/settings.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,8 @@ def __init__(
1818
stateful: bool = True,
1919
state_type: Generic[T] = None,
2020
channel: str = None,
21-
deactivate_timeout: int = 30,
22-
snapshot_timeout: int = 10):
21+
deactivate_timeout: int = 30000,
22+
snapshot_timeout: int = 2000):
2323
self.name = name
2424
self.kind = kind
2525
self.stateful = stateful
Lines changed: 64 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
# pylint: disable=too-few-public-methods
22
# pylint: disable=arguments-differ
3-
from abc import ABCMeta, abstractmethod
3+
from dataclasses import dataclass
4+
from enum import Enum
45

56
from spawn.eigr.functions.actors.api.metadata import Metadata
67
from spawn.eigr.functions.actors.api.workflows.broadcast import Broadcast
@@ -9,84 +10,86 @@
910
from spawn.eigr.functions.actors.api.workflows.pipe import Pipe
1011

1112

12-
class IValue(metaclass=ABCMeta):
13-
"The Builder Interface"
13+
class ReplyKind(str, Enum):
14+
REPLY = 'REPLY'
15+
NO_REPLY = 'NO_REPLY'
1416

15-
@staticmethod
16-
@abstractmethod
17-
def of(response, state=None):
18-
"Build value from response and state"
1917

20-
@staticmethod
21-
@abstractmethod
22-
def state(state):
23-
"Build value only from state"
18+
@dataclass
19+
class Value():
20+
"The Concrete Builder."
21+
__state = None
22+
__response = None
23+
__metadata: Metadata = None
24+
__broadcast: Broadcast = None
25+
__effect: Effect = None
26+
__forward: Forward = None
27+
__pipe: Pipe = None
28+
__reply_kind: ReplyKind = ReplyKind.REPLY
2429

25-
@staticmethod
26-
@abstractmethod
27-
def metadata(metadata: Metadata):
28-
"Build value with metadata"
30+
def get_state(self):
31+
return self.__state
2932

30-
@staticmethod
31-
@abstractmethod
32-
def value(response):
33-
"Build value from response"
33+
def get_response(self):
34+
return self.__response
3435

35-
@staticmethod
36-
@abstractmethod
37-
def broadcast(broadcast: Broadcast):
38-
"Create Broadcast"
36+
def get_metadata(self):
37+
return self.__metadata
3938

40-
@staticmethod
41-
@abstractmethod
42-
def effect(effect: Effect):
43-
"Create Effect"
39+
def get_broadcast(self):
40+
return self.__broadcast
4441

45-
@staticmethod
46-
@abstractmethod
47-
def forward(forward: Forward):
48-
"Create Forward"
42+
def get_broadcast(self):
43+
return self.__broadcast
4944

50-
@staticmethod
51-
@abstractmethod
52-
def pipe(pipe: Pipe):
53-
"Create Pipe"
45+
def get_effect(self):
46+
return self.__effect
5447

55-
@staticmethod
56-
@abstractmethod
57-
def reply():
58-
"Create Value response"
48+
def get_forward(self):
49+
return self.__forward
5950

51+
def get_pipe(self):
52+
return self.__pipe
6053

61-
class Value(IValue):
62-
"The Concrete Builder."
54+
def get_reply_kind(self):
55+
return self.__reply_kind
6356

64-
def __init__(self):
65-
pass
57+
def of(self, value, state=None):
58+
self.__response = value
59+
self.__state = state
60+
return self
6661

67-
def of(response, state=None):
68-
pass
62+
def state(self, state):
63+
self.__state = state
64+
return self
6965

70-
def state(state):
71-
pass
66+
def metadata(self, metadata: Metadata):
67+
self.__metadata = metadata
68+
return self
7269

73-
def metadata(metadata: Metadata):
74-
pass
70+
def value(self, value):
71+
self.__response = value
72+
return self
7573

76-
def value(response):
77-
pass
74+
def broadcast(self, broadcast: Broadcast):
75+
self.__broadcast = broadcast
76+
return self
7877

79-
def broadcast(broadcast: Broadcast):
80-
pass
78+
def effect(self, effect: Effect):
79+
self.__effect = effect
80+
return self
8181

82-
def effect(effect: Effect):
83-
pass
82+
def forward(self, forward: Forward):
83+
self.__forward = forward
84+
return self
8485

85-
def forward(forward: Forward):
86-
pass
86+
def pipe(self, pipe: Pipe):
87+
self.__pipe = pipe
88+
return self
8789

88-
def pipe(pipe: Pipe):
89-
pass
90+
def reply(self):
91+
return self
9092

91-
def reply():
92-
pass
93+
def noreply(self):
94+
self.__reply_kind = ReplyKind.NO_REPLY
95+
return self
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
11

22
class Effect:
33
def __init__(self):
4-
pass
4+
self

spawn/eigr/functions/actors/internal/controller.py

Lines changed: 67 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,9 @@
66

77
from spawn.eigr.functions.actors.api.actor import Actor as ActorEntity
88
from spawn.eigr.functions.actors.api.actor import ActorHandler
9+
from spawn.eigr.functions.actors.api.context import Context as ActorContext
10+
from spawn.eigr.functions.actors.api.value import Value, ReplyKind
911
from spawn.eigr.functions.actors.api.settings import Kind as ActorKind
10-
from spawn.eigr.functions.actors.api.actor import TimerFunction
1112

1213
from spawn.eigr.functions.protocol.actors.actor_pb2 import (
1314
Actor,
@@ -29,24 +30,75 @@
2930
ActorInvocation,
3031
ActorInvocationResponse,
3132
Context,
33+
Noop,
3234
RegistrationRequest,
3335
RegistrationResponse,
3436
ServiceInfo,
3537
)
3638

39+
from google.protobuf import symbol_database as _symbol_database
40+
from google.protobuf.any_pb2 import Any as AnyProto
41+
3742
import logging
3843
import platform
3944
import requests
4045

4146
from typing import Any, MutableMapping
4247

48+
_sym_db = _symbol_database.Default()
49+
4350
_DEFAULT_HEADERS = {
4451
"Accept": "application/octet-stream",
4552
"Content-Type": "application/octet-stream",
4653
}
4754

4855
_REGISTER_URI = "/api/v1/system"
4956

57+
TYPE_URL_PREFIX = 'type.googleapis.com/'
58+
59+
60+
def get_payload(input):
61+
input_type: str = input.payload.type_url
62+
if input_type.startswith(TYPE_URL_PREFIX):
63+
input_type = input_type[len(TYPE_URL_PREFIX):]
64+
input_class = _sym_db.GetSymbol(input_type)
65+
input = input_class()
66+
input.ParseFromString(input.payload.value)
67+
return input
68+
69+
70+
def pack(input):
71+
any = AnyProto()
72+
any.Pack(input)
73+
return any
74+
75+
76+
def handle_response(system, actor_name, result):
77+
print("Result ----- {}".format(result))
78+
actor_invocation_response = ActorInvocationResponse()
79+
actor_invocation_response.actor_name = actor_name
80+
actor_invocation_response.actor_system = system
81+
82+
updated_context = Context()
83+
84+
if result.get_metadata() != None and len(result.get_metadata().get_metadata()) > 0:
85+
print("Metadata -----------".format(result.metadata))
86+
updated_context.metadata = result.get_metadata.get_metadata()
87+
88+
if result.get_metadata() != None and len(result.get_metadata().get_tags()) > 0:
89+
updated_context.tags = result.get_metadata().get_tags()
90+
91+
updated_context.state.CopyFrom(pack(result.get_state()))
92+
93+
actor_invocation_response.updated_context.CopyFrom(updated_context)
94+
95+
if result.get_reply_kind() == ReplyKind.NO_REPLY:
96+
actor_invocation_response.noop = Noop()
97+
elif result.get_reply_kind == ReplyKind.REPLY:
98+
actor_invocation_response.value = pack(result.get_response())
99+
100+
return actor_invocation_response
101+
50102

51103
class ActorController:
52104
_instance = None
@@ -81,17 +133,23 @@ def handle_invoke(self, data) -> ActorInvocationResponse:
81133
entity = self.actors[actor_name] if not actor_parent else self.actors[actor_parent]
82134

83135
handler = ActorHandler(entity)
136+
current_context = actor_invocation.current_context
137+
138+
input = None if actor_invocation.WhichOneof(
139+
"payload") == "noop" else get_payload(actor_invocation.value)
140+
141+
ctx = ActorContext(state=None, caller=actor_invocation.caller.name,
142+
metadata=current_context.metadata, tags=current_context.tags)
84143

85-
# Update Context
86-
updated_context = Context()
144+
result = handler.handle_action(
145+
action_name=actor_invocation.action_name, input=input, ctx=ctx)
87146

88-
# Then send ActorInvocationResponse back to the caller
89-
actor_invocation_response = ActorInvocationResponse()
90-
actor_invocation_response.actor_name = actor_name
91-
actor_invocation_response.actor_system = actor_system
92-
actor_invocation_response.updated_context.CopyFrom(updated_context)
147+
if not isinstance(result, Value):
148+
raise Exception(
149+
"Action did not return a valid type in its response. Valid Value found {}".format(type(result)))
93150

94-
return actor_invocation_response
151+
# Handle result value
152+
return handle_response(actor_system, actor_name, result)
95153

96154
def register(self):
97155
logging.info("Registering Actors on the Proxy %s", self.actors)

0 commit comments

Comments
 (0)