Skip to content

Commit cfbcb75

Browse files
committed
Feat. Initial client api
1 parent d421e28 commit cfbcb75

6 files changed

Lines changed: 207 additions & 43 deletions

File tree

example/spawn_example.py

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,14 +2,26 @@
22
Copyright 2022 Eigr.
33
Licensed under the Apache License, Version 2.0.
44
"""
5+
from spawn.eigr.functions.actors.api.reference import ActorRef
56
from spawn.eigr.functions.actors.api.sdk import Spawn
67
from example.joe import actor as joe_actor
7-
from example.domain.domain_pb2 import Reply, Request
8+
from example.unnamed_actor import abstract
9+
from example.domain.domain_pb2 import Request
810

911
if __name__ == "__main__":
1012
request = Request()
1113
request.language = "erlang"
1214

1315
spawn = Spawn()
1416
spawn.port(8091).proxy_port(9003).actor_system(
15-
"spawn-system").add_actor(joe_actor).start()
17+
"spawn-system").add_actor(joe_actor).add_actor(abstract).start()
18+
19+
# Get abstract actor reference called mike
20+
mike_actor: ActorRef = Spawn.create_actor_ref(
21+
system="spawn-system",
22+
actor_name="mike",
23+
parent="abs_actor",
24+
state_revision=1
25+
)
26+
27+
# mike_actor.invoke(request)

example/unnamed_actor.py

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
"""
2+
Copyright 2022 Eigr.
3+
Licensed under the Apache License, Version 2.0.
4+
"""
5+
from domain.domain_pb2 import Request, Reply
6+
from spawn.eigr.functions.actors.api.actor import Actor
7+
from spawn.eigr.functions.actors.api.settings import ActorSettings, Kind
8+
from spawn.eigr.functions.actors.api.context import Context
9+
from spawn.eigr.functions.actors.api.value import Value
10+
11+
abstract = Actor(settings=ActorSettings(
12+
name="abs_actor", stateful=True, kind=Kind.UNNAMED))
13+
14+
15+
@abstract.action("setLanguage")
16+
def set_language(request: Request, ctx: Context) -> Value:
17+
reply = Reply()
18+
reply.response = "erlang"
19+
return Value().of(reply, ctx.state).reply()
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
2+
from spawn.eigr.functions.actors.internal.client import SpawnClient
3+
4+
from spawn.eigr.functions.protocol.actors.actor_pb2 import (
5+
Actor,
6+
ActorId,
7+
ActorState,
8+
Metadata,
9+
ActorSettings,
10+
Action,
11+
FixedTimerAction,
12+
ActorSnapshotStrategy,
13+
ActorDeactivationStrategy,
14+
ActorSystem,
15+
Kind,
16+
Registry,
17+
TimeoutStrategy,
18+
)
19+
20+
from spawn.eigr.functions.protocol.actors.protocol_pb2 import (
21+
SpawnRequest,
22+
SpawnResponse,
23+
InvocationRequest,
24+
InvocationResponse,
25+
RegistrationRequest,
26+
RegistrationResponse
27+
)
28+
29+
30+
def spawn(client: SpawnClient, system: str, actor: str, parent: str, revision: int = None):
31+
actor_id = ActorId()
32+
actor_id.name = actor
33+
actor_id.system = system
34+
35+
if parent != None:
36+
actor_id.parent = parent
37+
38+
request: SpawnRequest = SpawnRequest()
39+
request.actors.append(actor_id)
40+
spawn_response: SpawnResponse = client.spawn(system, request, revision)
41+
return spawn_response
42+
43+
44+
class ActorRef:
45+
def __init__(self, client: SpawnClient, system: str, actor: str, parent: str = None, revision: int = None):
46+
self.__spawn_client = client
47+
self.actor_system = system
48+
self.actor_name = actor
49+
self.actor_parent = parent
50+
self.revision = revision
51+
if parent != None:
52+
spawn(self.__spawn_client, self.actor_system,
53+
self.actor_name, self.actor_parent, self.revision)
54+
55+
def invoke(self, request: any):
56+
pass

spawn/eigr/functions/actors/api/sdk.py

Lines changed: 19 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@
88
from typing import MutableMapping
99

1010
from spawn.eigr.functions.actors.api.actor import Actor
11+
from spawn.eigr.functions.actors.api.reference import ActorRef
12+
from spawn.eigr.functions.actors.internal.client import SpawnClient
1113
from spawn.eigr.functions.actors.internal.controller import ActorController
1214

1315
from google.protobuf.any_pb2 import Any as ProtoAny
@@ -24,6 +26,7 @@ def create_app(controller: ActorController):
2426

2527
@app.route('/api/v1/actors/actions', methods=["POST"])
2628
def action():
29+
print(request)
2730
data = request.data
2831

2932
actor_invocation_response = controller.handle_invoke(data)
@@ -54,31 +57,33 @@ class Spawn:
5457
__actor_entities: MutableMapping[str,
5558
Actor] = field(default_factory=dict)
5659

57-
# @staticmethod
58-
# def invoke(name: str, command: str, arg: Any, output_type: Any) -> Any:
59-
# actorController = ActorController(
60-
# os.environ.get("PROXY_HOST", "localhost"),
61-
# os.environ.get("PROXY_PORT", "9002"),
62-
# )
63-
# actorController.invoke(name, command, arg, output_type)
60+
@staticmethod
61+
def create_actor_ref(system: str, actor_name: str, parent: str = None, state_revision: int = None) -> ActorRef:
62+
client = SpawnClient()
63+
return ActorRef(client, system, actor_name, parent, state_revision)
6464

6565
def host(self, address: str):
6666
"""Set the Network Host address."""
6767
self.__host = address
68+
os.environ["USER_FUNCTION_HOST"] = address
6869
return self
6970

7071
def port(self, port: int):
7172
"""Set the Network Port address."""
7273
self.__port = port
74+
os.environ["USER_FUNCTION_PORT"] = str(port)
7375
return self
7476

7577
def proxy_host(self, host: str):
7678
"""Set the Spawn Proxy Host Address"""
7779
self.__proxy_host = host
80+
os.environ["PROXY_HTTP_HOST"] = host
7881
return self
7982

8083
def proxy_port(self, port: int):
81-
self.__proxy_port = str(port)
84+
port_str: str = str(port)
85+
self.__proxy_port = port_str
86+
os.environ["PROXY_HTTP_PORT"] = port_str
8287
return self
8388

8489
def actor_system(self, system: str = None):
@@ -93,13 +98,16 @@ def add_actor(self, actor: Actor):
9398

9499
def start(self):
95100
"""Start the user function and HTTP Server."""
101+
import time
96102
if not self.__system:
97103
raise Exception(
98104
"ActorSystem cannot be None. Use actor_system function to set an ActorSystem")
99105

100106
address = "{}:{}".format(self.__host, self.__port)
107+
client = SpawnClient()
108+
101109
self.__controller = ActorController(
102-
self.__proxy_host, self.__proxy_port, self.__system, self.__actor_entities)
110+
client, self.__system, self.__actor_entities)
103111

104112
self.__app = create_app(controller=self.__controller)
105113

@@ -111,8 +119,8 @@ def start(self):
111119
try:
112120
server.start()
113121
client.start()
114-
server.join()
115-
client.join()
122+
# temporary
123+
time.sleep(2)
116124
except IOError as e:
117125
logging.error("Error on start Spawn %s", e.__cause__)
118126

Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
2+
import requests
3+
import os
4+
5+
from requests.adapters import HTTPAdapter, Retry
6+
7+
from spawn.eigr.functions.protocol.actors.protocol_pb2 import (
8+
SpawnRequest,
9+
SpawnResponse,
10+
InvocationRequest,
11+
InvocationResponse,
12+
RegistrationRequest,
13+
RegistrationResponse
14+
)
15+
16+
_DEFAULT_HEADERS = {
17+
"Accept": "application/octet-stream",
18+
"Content-Type": "application/octet-stream",
19+
}
20+
_DEFAULT_MAX_RETRIES = 100
21+
_DEFAULT_MAX_RETRIES_BACKOFF_FACTOR = 0.2
22+
23+
req = requests.Session()
24+
retries = Retry(total=_DEFAULT_MAX_RETRIES,
25+
backoff_factor=_DEFAULT_MAX_RETRIES_BACKOFF_FACTOR)
26+
req.mount('http://', HTTPAdapter(max_retries=retries))
27+
28+
29+
class SpawnClient:
30+
_instance = None
31+
32+
def __init__(self, host: str = None, port: str = None):
33+
self.host = os.environ.get(
34+
"PROXY_HTTP_HOST", "0.0.0.0") if not host else host
35+
self.port = os.environ.get(
36+
"PROXY_HTTP_PORT", "9001") if not port else port
37+
38+
def __new__(cls, *args, **kwargs):
39+
if not isinstance(cls._instance, cls):
40+
cls._instance = object.__new__(cls)
41+
42+
return cls._instance
43+
44+
def register(self, request: RegistrationRequest) -> RegistrationResponse:
45+
proxy_url = "http://{}:{}{}".format(self.host,
46+
self.port, "/api/v1/system")
47+
48+
binary_payload = request.SerializeToString()
49+
50+
resp = req.post(
51+
proxy_url, data=binary_payload, headers=_DEFAULT_HEADERS
52+
)
53+
54+
databytes = bytes(resp.content)
55+
response = RegistrationResponse()
56+
response.ParseFromString(databytes)
57+
return response
58+
59+
def spawn(self, system: str, request: SpawnRequest, revision: int = None) -> SpawnResponse:
60+
proxy_url: str = None
61+
if not revision:
62+
proxy_url = "http://{}:{}/api/v1/system/{}/actors/spawn".format(self.host,
63+
self.port, system)
64+
else:
65+
proxy_url = "http://{}:{}/api/v1/system/{}/actors/spawn?revision={}".format(self.host,
66+
self.port, system, revision)
67+
68+
binary_payload = request.SerializeToString()
69+
70+
resp = req.post(
71+
proxy_url, data=binary_payload, headers=_DEFAULT_HEADERS
72+
)
73+
74+
databytes = bytes(resp.content)
75+
response = SpawnResponse()
76+
response.ParseFromString(databytes)
77+
return response
78+
79+
def invoke(self, system: str, actor: str, request: InvocationRequest) -> InvocationResponse:
80+
proxy_url = "http://{}:{}/api/v1/system/{}/actors/{}/invoke".format(self.host,
81+
self.port, system, actor)
82+
83+
binary_payload = request.SerializeToString()
84+
85+
resp = req.post(
86+
proxy_url, data=binary_payload, headers=_DEFAULT_HEADERS
87+
)
88+
89+
databytes = bytes(resp.content)
90+
response = InvocationResponse()
91+
response.ParseFromString(databytes)
92+
return response

spawn/eigr/functions/actors/internal/controller.py

Lines changed: 7 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -5,13 +5,13 @@
55
"""
66
import logging
77
import platform
8-
import requests
98

109
from spawn.eigr.functions.actors.api.actor import Actor as ActorEntity
1110
from spawn.eigr.functions.actors.api.actor import ActorHandler
1211
from spawn.eigr.functions.actors.api.context import Context as ActorContext
1312
from spawn.eigr.functions.actors.api.value import Value, ReplyKind
1413
from spawn.eigr.functions.actors.api.settings import Kind as ActorKind
14+
from spawn.eigr.functions.actors.internal.client import SpawnClient
1515

1616
from spawn.eigr.functions.protocol.actors.actor_pb2 import (
1717
Actor,
@@ -42,26 +42,12 @@
4242
from google.protobuf import symbol_database as _symbol_database
4343
from google.protobuf.any_pb2 import Any as AnyProto
4444

45-
from requests.adapters import HTTPAdapter, Retry
46-
4745
from typing import MutableMapping
4846

4947
_sym_db = _symbol_database.Default()
5048

51-
_DEFAULT_HEADERS = {
52-
"Accept": "application/octet-stream",
53-
"Content-Type": "application/octet-stream",
54-
}
55-
_DEFAULT_MAX_RETRIES = 100
56-
_DEFAULT_MAX_RETRIES_BACKOFF_FACTOR = 0.2
57-
_REGISTER_URI = "/api/v1/system"
5849
TYPE_URL_PREFIX = 'type.googleapis.com/'
5950

60-
req = requests.Session()
61-
retries = Retry(total=_DEFAULT_MAX_RETRIES,
62-
backoff_factor=_DEFAULT_MAX_RETRIES_BACKOFF_FACTOR)
63-
req.mount('http://', HTTPAdapter(max_retries=retries))
64-
6551

6652
def get_payload(input):
6753
input_type: str = input.type_url
@@ -126,9 +112,8 @@ def handle_broadcast(value_broadcast):
126112
class ActorController:
127113
_instance = None
128114

129-
def __init__(self, host: str, port: str, system: str, actors: MutableMapping[str, ActorEntity]):
130-
self.host = host
131-
self.port = port
115+
def __init__(self, client: SpawnClient, system: str, actors: MutableMapping[str, ActorEntity]):
116+
self.client = client
132117
self.system = system
133118
self.actors = actors
134119

@@ -180,19 +165,16 @@ def handle_invoke(self, data) -> ActorInvocationResponse:
180165
return handle_response(actor_system, actor_name, result)
181166

182167
def register(self):
183-
logging.info("Registering Actors on the Proxy %s", self.actors)
168+
logging.info("Registering Actors on the Proxy")
184169
try:
185170

186-
proxy_url = "http://{}:{}{}".format(self.host,
187-
self.port, _REGISTER_URI)
188-
189171
registry = Registry()
190172
actor_system = ActorSystem()
191173
actor_system.name = self.system
192174

193175
for actor_name, entity in self.actors.items():
194-
logging.info("Registering Actor %s with Config %s",
195-
actor_name, entity)
176+
logging.debug("Registering Actor %s with Config %s",
177+
actor_name, entity)
196178

197179
# Create actor params via ActorEntity
198180
actor_template = Actor()
@@ -289,12 +271,7 @@ def register(self):
289271
request.service_info.CopyFrom(service_info)
290272
request.actor_system.CopyFrom(actor_system)
291273

292-
binary_payload = request.SerializeToString()
293-
294-
resp = req.post(
295-
proxy_url, data=binary_payload, headers=_DEFAULT_HEADERS
296-
)
297-
274+
resp = self.client.register(request)
298275
logging.info("Actors register response %s", resp)
299276
except Exception as e:
300277
logging.error("ERROR: %s", e)

0 commit comments

Comments
 (0)