Skip to content

Commit 50ff10b

Browse files
Added async option
1 parent b28e279 commit 50ff10b

8 files changed

Lines changed: 236 additions & 29 deletions

File tree

LICENSE

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
MIT License
2+
3+
Copyright (c) 2020 interakt.ai
4+
5+
Permission is hereby granted, free of charge, to any person obtaining a copy
6+
of this software and associated documentation files (the "Software"), to deal
7+
in the Software without restriction, including without limitation the rights
8+
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
9+
copies of the Software, and to permit persons to whom the Software is
10+
furnished to do so, subject to the following conditions:
11+
12+
The above copyright notice and this permission notice shall be included in all
13+
copies or substantial portions of the Software.
14+
15+
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16+
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17+
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18+
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19+
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20+
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
21+
SOFTWARE.

setup.py

Lines changed: 7 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,8 @@
2323
readme = f.read()
2424

2525
install_requires = [
26-
"requests>=2.20,<3.0"
26+
"requests>=2.20,<3.0",
27+
"backoff==1.10.0"
2728
]
2829

2930
tests_require = [
@@ -36,16 +37,14 @@
3637

3738
setup(
3839
name='interakt-track-python',
39-
packages=['track'],
40+
packages=['track', 'track.tests'],
4041
version=VERSION,
41-
# TODO : Add repository url
42-
url='',
42+
url='https://github.com/interakt/track-python',
43+
download_url=f'https://github.com/interakt/track-python/archive/v{VERSION}.tar.gz',
4344
author="Amar Jaiswal",
4445
author_email="amar.j@cawstudios.com",
4546
maintainer="interakt.ai",
46-
# Chose a license from here: https://help.github.com/articles/licensing-a-repository
47-
# TODO : Add License
48-
license='',
47+
license='MIT License',
4948
description='The easy way to integrate track apis for interakt',
5049
keywords=['INTERAKT', 'KIWI'],
5150
install_requires=install_requires,
@@ -57,12 +56,8 @@
5756
'Development Status :: 3 - Alpha',
5857
'Intended Audience :: Developers',
5958
'Topic :: Software Development :: Build Tools',
60-
# TODO: Add license
61-
# 'License :: OSI Approved :: MIT License', # Again, pick a license
59+
'License :: OSI Approved :: MIT License',
6260
"Operating System :: OS Independent",
63-
'Programming Language :: Python :: 3',
64-
'Programming Language :: Python :: 3.4',
65-
'Programming Language :: Python :: 3.5',
6661
'Programming Language :: Python :: 3.6',
6762
'Programming Language :: Python :: 3.7',
6863
'Programming Language :: Python :: 3.8',

track/__init__.py

Lines changed: 31 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,27 +6,55 @@
66
write_key = None
77
default_client = None
88
host = None
9-
sync_mode = True
9+
sync_mode = False
1010
debug = False
1111
timeout = 10
12+
max_retries = 3
13+
on_error = None
14+
max_queue_size = 10000
1215

1316

1417
def identify(user_id=None, country_code='+91', phone_number=None, traits={}):
1518
"""Send an identify call for customer"""
16-
return _proxy('identify', user_id=user_id, country_code=country_code, phone_number=phone_number, traits=traits)
19+
return _proxy('identify', user_id=user_id, country_code=country_code,
20+
phone_number=phone_number, traits=traits)
1721

1822

1923
def event(user_id=None, event=None, traits={}):
2024
"""Send an event track call."""
2125
return _proxy('event', user_id=user_id, event=event, traits=traits)
2226

2327

28+
def flush():
29+
"""Tell the client to flush."""
30+
_proxy('flush')
31+
32+
33+
def join():
34+
"""Block program until the client clears the queue"""
35+
_proxy('join')
36+
37+
38+
def shutdown():
39+
"""Flush all messages and cleanly shutdown the client"""
40+
_proxy('flush')
41+
_proxy('join')
42+
43+
2444
def _proxy(method, *args, **kwargs):
2545
"""Create an analytics client if one doesn't exist and send to it."""
2646
global default_client
2747
if not default_client:
2848
default_client = Client(
29-
write_key, host=host, debug=debug, sync_mode=sync_mode, timeout=timeout)
49+
write_key=write_key,
50+
host=host,
51+
debug=debug,
52+
sync_mode=sync_mode,
53+
timeout=timeout,
54+
max_queue_size=max_queue_size,
55+
on_error=on_error,
56+
max_retries=max_retries
57+
)
3058

3159
fn = getattr(default_client, method)
3260
return fn(*args, **kwargs)

track/client.py

Lines changed: 69 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,25 +1,46 @@
1-
from track.utils import require, stringify
2-
from track.request import post
3-
from track.const import ApiPaths
41
import logging
52
import numbers
3+
4+
from track.const import ApiPaths
5+
from track.consumer import Consumer
6+
from track.request import post
7+
from track.utils import require, stringify
8+
9+
try:
10+
import queue
11+
except ImportError:
12+
import Queue as queue
13+
614
ID_TYPES = (numbers.Number, str)
715

816

917
class Client(object):
1018
logger = logging.getLogger('interakt')
1119

12-
def __init__(self, write_key=None, host=None, debug=False, sync_mode=True, timeout=10):
20+
def __init__(self, write_key=None, host=None, debug=False,
21+
sync_mode=True, timeout=10, max_queue_size=10000, on_error=None, max_retries=3):
1322
"""Create a new interakt client"""
1423
require('write_key', write_key, str)
24+
25+
self.queue = queue.Queue(maxsize=max_queue_size)
1526
self.write_key = write_key
1627
self.debug = debug
1728
self.host = host
1829
self.sync_mode = sync_mode
1930
self.timeout = timeout
31+
self.on_error = on_error
2032
if debug:
2133
self.logger.setLevel(logging.DEBUG)
2234

35+
if sync_mode:
36+
self.consumer = None
37+
else:
38+
self.consumer = Consumer(
39+
queue=self.queue, write_key=write_key,
40+
host=host, on_error=on_error, retries=max_retries,
41+
timeout=timeout
42+
)
43+
2344
def identify(self, user_id=None, country_code='+91', phone_number=None, traits={}):
2445
"""Tie a user to their actions and record traits about them."""
2546
if not user_id and not phone_number:
@@ -29,26 +50,64 @@ def identify(self, user_id=None, country_code='+91', phone_number=None, traits={
2950
if phone_number:
3051
require('phone_number', phone_number, str)
3152
require('traits', traits, dict)
32-
msg = {
53+
body = {
3354
'userId': stringify(val=user_id),
3455
'countryCode': country_code,
3556
'phoneNumber': phone_number,
3657
'traits': traits
3758
}
38-
return self.__send_request(path=ApiPaths.Identify.value, msg=msg)
59+
return self.__queue_request(path=ApiPaths.Identify.value, body=body)
3960

4061
def event(self, user_id=None, event=None, traits={}):
4162
"""To record user events"""
4263
traits = traits or {}
4364
require('user_id', user_id, ID_TYPES)
4465
require('traits', traits, dict)
4566
require('event', event, str)
46-
msg = {
67+
body = {
4768
'userId': stringify(val=user_id),
4869
'event': event,
4970
'traits': traits
5071
}
51-
return self.__send_request(path=ApiPaths.Event.value, msg=msg)
72+
return self.__queue_request(path=ApiPaths.Event.value, body=body)
73+
74+
def flush(self):
75+
"""Forces a flush from the internal queue to the server"""
76+
queue = self.queue
77+
size = queue.qsize()
78+
queue.join()
79+
# Note that this message may not be precise, because of threading.
80+
self.logger.debug('successfully flushed about %s items.', size)
5281

53-
def __send_request(self, path, msg):
54-
return post(self.write_key, host=self.host, path=path, body=msg, timeout=self.timeout)
82+
def join(self):
83+
"""Ends the consumer thread once the queue is empty.
84+
Blocks execution until finished
85+
"""
86+
self.consumer.pause()
87+
try:
88+
self.consumer.join()
89+
except RuntimeError:
90+
# consumer thread has not started
91+
pass
92+
93+
def shutdown(self):
94+
"""Flush all messages and cleanly shutdown the client"""
95+
self.flush()
96+
self.join()
97+
98+
def __queue_request(self, path, body):
99+
# Directly call api in sync mode and return response
100+
if self.sync_mode:
101+
return post(self.write_key, host=self.host, path=path, body=body, timeout=self.timeout)
102+
103+
queue_msg = {
104+
'path': path,
105+
'body': body
106+
}
107+
try:
108+
self.queue.put(queue_msg, block=False)
109+
self.logger.debug(f'Enqueued msg for {path}')
110+
return True, queue_msg
111+
except queue.Full:
112+
self.logger.warning('track-python queue is full')
113+
return False, queue_msg

track/consumer.py

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
import logging
2+
from threading import Thread
3+
from track.request import post, APIError
4+
import backoff
5+
6+
try:
7+
from queue import Empty
8+
except ImportError:
9+
from Queue import Empty
10+
11+
12+
class Consumer(Thread):
13+
"""Consumes the messages from the client's queue."""
14+
logger = logging.getLogger('interakt')
15+
16+
def __init__(self, queue, write_key, host=None,
17+
on_error=None, retries=3, timeout=10):
18+
Thread.__init__(self)
19+
self.running = True
20+
self.queue = queue
21+
self.write_key = write_key
22+
self.host = host
23+
self.on_error = on_error
24+
self.retries = retries
25+
self.timeout = timeout
26+
27+
def run(self):
28+
"""Runs the consumer."""
29+
self.logger.debug('consumer is running...')
30+
while self.running:
31+
self.upload()
32+
33+
self.logger.debug('consumer exited.')
34+
35+
def pause(self):
36+
"""Pause the consumer."""
37+
self.running = False
38+
39+
def upload(self):
40+
queue = self.queue
41+
queue_msg = None
42+
try:
43+
queue_msg = queue.get(block=True)
44+
except Empty:
45+
self.logger.debug("queue is empty now")
46+
47+
if not queue_msg:
48+
self.logger.debug("Nothing left in queue exiting")
49+
return False
50+
try:
51+
self.request(queue_msg=queue_msg)
52+
success = True
53+
except Exception as e:
54+
self.logger.error(f"Error uploading: {e}")
55+
success = False
56+
if self.on_error:
57+
self.on_error(e, queue_msg)
58+
finally:
59+
self.queue.task_done()
60+
return success
61+
62+
def request(self, queue_msg):
63+
"""Attempt to upload the queue_msg and retry before raising an error """
64+
65+
def fatal_exception(exc):
66+
if isinstance(exc, APIError):
67+
# retry on server errors and client errors
68+
# with 429 status code (rate limited),
69+
# don't retry on other client errors
70+
return (400 <= exc.status < 500) and exc.status != 429
71+
else:
72+
# retry on all other errors (eg. network)
73+
return False
74+
75+
@backoff.on_exception(
76+
backoff.expo,
77+
Exception,
78+
max_tries=self.retries + 1,
79+
giveup=fatal_exception)
80+
def send_request():
81+
post(write_key=self.write_key, host=self.host,
82+
path=queue_msg['path'], body=queue_msg['body'], timeout=self.timeout)
83+
84+
send_request()

track/request.py

Lines changed: 23 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,26 @@ def post(write_key, host=None, path=None, body=None, timeout=10):
2222
logger.debug(f'Making request: {body}')
2323
response = _session.post(url=url, headers=headers,
2424
json=body, timeout=timeout)
25-
payload = response.json()
26-
logger.debug(f'Received response: {payload}')
27-
return response
25+
if response.status_code == 200:
26+
logger.debug("Data uploaded successfully")
27+
return response
28+
29+
try:
30+
payload = response.json()
31+
logger.debug(f'Received response: {payload}')
32+
raise APIError(payload.get("result"),
33+
response.status_code, payload.get("message"))
34+
except ValueError:
35+
raise APIError('Unknown', response.status_code, response.text)
36+
37+
38+
class APIError(Exception):
39+
40+
def __init__(self, status, code, message):
41+
self.message = message
42+
self.status = status
43+
self.code = code
44+
45+
def __str__(self):
46+
msg = "[Segment] {0}: {1} ({2})"
47+
return msg.format(self.code, self.message, self.status)

track/tests/__init__.py

Whitespace-only changes.

track/version.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
VERSION = '0.0.4'
1+
VERSION = '0.0.4-alpha'

0 commit comments

Comments
 (0)