Skip to content

Commit 9f2e0a4

Browse files
authored
Merge pull request #321 from splitio/telemetry-storage-pluggable
Added telemetry pluggable storage with tests
2 parents 9f1c9b9 + c3802c6 commit 9f2e0a4

5 files changed

Lines changed: 330 additions & 25 deletions

File tree

splitio/models/telemetry.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
MAX_LATENCY = 7481828
1818
MAX_LATENCY_BUCKET_COUNT = 23
1919
MAX_STREAMING_EVENTS = 20
20+
MAX_TAGS = 10
2021

2122
class CounterConstants(Enum):
2223
"""Impressions and events counters constants"""

splitio/storage/pluggable.py

Lines changed: 183 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,12 @@
22

33
import logging
44
import json
5+
import threading
56

67
from splitio.models import splits, segments
78
from splitio.models.impressions import Impression
8-
from splitio.storage import SplitStorage, SegmentStorage, ImpressionStorage, EventStorage
9+
from splitio.models.telemetry import MethodExceptions, MethodLatencies, TelemetryConfig, MAX_TAGS, get_latency_bucket_index
10+
from splitio.storage import SplitStorage, SegmentStorage, ImpressionStorage, EventStorage, TelemetryStorage
911

1012
_LOGGER = logging.getLogger(__name__)
1113

@@ -15,7 +17,14 @@ class PluggableSplitStorage(SplitStorage):
1517
_SPLIT_NAME_LENGTH = 12
1618

1719
def __init__(self, pluggable_adapter, prefix=None):
18-
"""Constructor."""
20+
"""
21+
Class constructor.
22+
23+
:param pluggable_adapter: Storage client or compliant interface.
24+
:type pluggable_adapter: TBD
25+
:param prefix: optional, prefix to storage keys
26+
:type prefix: str
27+
"""
1928
self._pluggable_adapter = pluggable_adapter
2029
self._prefix = "SPLITIO.split.{split_name}"
2130
self._traffic_type_prefix = "SPLITIO.trafficType.{traffic_type_name}"
@@ -302,7 +311,14 @@ class PluggableSegmentStorage(SegmentStorage):
302311
_TILL_LENGTH = 4
303312

304313
def __init__(self, pluggable_adapter, prefix=None):
305-
"""Constructor."""
314+
"""
315+
Class constructor.
316+
317+
:param pluggable_adapter: Storage client or compliant interface.
318+
:type pluggable_adapter: TBD
319+
:param prefix: optional, prefix to storage keys
320+
:type prefix: str
321+
"""
306322
self._pluggable_adapter = pluggable_adapter
307323
self._prefix = "SPLITIO.segment.{segment_name}"
308324
self._segment_till_prefix = "SPLITIO.segment.{segment_name}.till"
@@ -475,6 +491,7 @@ def put(self, segment):
475491

476492

477493
class PluggableImpressionsStorage(ImpressionStorage):
494+
"""Pluggable Impressions storage class."""
478495

479496
IMPRESSIONS_KEY_DEFAULT_TTL = 3600
480497

@@ -486,6 +503,8 @@ def __init__(self, pluggable_adapter, sdk_metadata, prefix=None):
486503
:type pluggable_adapter: TBD
487504
:param sdk_metadata: SDK & Machine information.
488505
:type sdk_metadata: splitio.client.util.SdkMetadata
506+
:param prefix: optional, prefix to storage keys
507+
:type prefix: str
489508
"""
490509
self._pluggable_adapter = pluggable_adapter
491510
self._sdk_metadata = {
@@ -573,18 +592,20 @@ def clear(self):
573592

574593

575594
class PluggableEventsStorage(EventStorage):
576-
"""Redis based event storage class."""
595+
"""Pluggable Event storage class."""
577596

578597
_EVENTS_KEY_DEFAULT_TTL = 3600
579598

580599
def __init__(self, pluggable_adapter, sdk_metadata, prefix=None):
581600
"""
582601
Class constructor.
583602
584-
:param redis_client: Redis client or compliant interface.
585-
:type redis_client: splitio.storage.adapters.redis.RedisAdapter
603+
:param pluggable_adapter: Storage client or compliant interface.
604+
:type pluggable_adapter: TBD
586605
:param sdk_metadata: SDK & Machine information.
587606
:type sdk_metadata: splitio.client.util.SdkMetadata
607+
:param prefix: optional, prefix to storage keys
608+
:type prefix: str
588609
"""
589610
self._pluggable_adapter = pluggable_adapter
590611
self._sdk_metadata = {
@@ -657,3 +678,159 @@ def clear(self):
657678
Clear data.
658679
"""
659680
raise NotImplementedError('Not supported for redis.')
681+
682+
683+
class PluggableTelemetryStorage(TelemetryStorage):
684+
"""Pluggable telemetry storage class."""
685+
686+
_TELEMETRY_KEY_DEFAULT_TTL = 3600
687+
688+
def __init__(self, pluggable_adapter, sdk_metadata, prefix=None):
689+
"""
690+
Class constructor.
691+
692+
:param pluggable_adapter: Storage client or compliant interface.
693+
:type pluggable_adapter: TBD
694+
:param sdk_metadata: SDK & Machine information.
695+
:type sdk_metadata: splitio.client.util.SdkMetadata
696+
:param prefix: optional, prefix to storage keys
697+
:type prefix: str
698+
"""
699+
self._lock = threading.RLock()
700+
self._reset_config_tags()
701+
self._pluggable_adapter = pluggable_adapter
702+
self._sdk_metadata = sdk_metadata.sdk_version + '/' + sdk_metadata.instance_name + '/' + sdk_metadata.instance_ip
703+
self._method_latencies = MethodLatencies()
704+
self._method_exceptions = MethodExceptions()
705+
self._tel_config = TelemetryConfig()
706+
self._telemetry_config_key = 'SPLITIO.telemetry.init'
707+
self._telemetry_latencies_key = 'SPLITIO.telemetry.latencies'
708+
self._telemetry_exceptions_key = 'SPLITIO.telemetry.exceptions'
709+
if prefix is not None:
710+
self._telemetry_config_key = prefix + "." + self._telemetry_config_key
711+
self._telemetry_latencies_key = prefix + "." + self._telemetry_latencies_key
712+
self._telemetry_exceptions_key = prefix + "." + self._telemetry_exceptions_key
713+
714+
def _reset_config_tags(self):
715+
"""Reset config tags."""
716+
with self._lock:
717+
self._config_tags = []
718+
719+
def add_config_tag(self, tag):
720+
"""
721+
Record tag string.
722+
723+
:param tag: tag to be added
724+
:type tag: str
725+
"""
726+
with self._lock:
727+
if len(self._config_tags) < MAX_TAGS:
728+
self._config_tags.append(tag)
729+
730+
def record_config(self, config, extra_config):
731+
"""
732+
initilize telemetry objects
733+
734+
:param config: factory configuration parameters
735+
:type config: Dict
736+
:param extra_config: any extra configs
737+
:type extra_config: Dict
738+
"""
739+
self._tel_config.record_config(config, extra_config)
740+
741+
def pop_config_tags(self):
742+
"""Get and reset configs."""
743+
with self._lock:
744+
tags = self._config_tags
745+
self._reset_config_tags()
746+
return tags
747+
748+
def push_config_stats(self):
749+
"""push config stats to storage."""
750+
self._pluggable_adapter.set(self._telemetry_config_key + "::" + self._sdk_metadata, str(self._format_config_stats()))
751+
752+
def _format_config_stats(self):
753+
"""format only selected config stats to json"""
754+
config_stats = self._tel_config.get_stats()
755+
return json.dumps({
756+
'aF': config_stats['aF'],
757+
'rF': config_stats['rF'],
758+
'sT': config_stats['sT'],
759+
'oM': config_stats['oM'],
760+
't': self.pop_config_tags()
761+
})
762+
763+
def record_active_and_redundant_factories(self, active_factory_count, redundant_factory_count):
764+
"""
765+
Record active and redundant factories.
766+
767+
:param active_factory_count: active factory count
768+
:type active_factory_count: int
769+
:param redundant_factory_count: redundant factory count
770+
:type redundant_factory_count: int
771+
"""
772+
self._tel_config.record_active_and_redundant_factories(active_factory_count, redundant_factory_count)
773+
774+
def record_latency(self, method, latency):
775+
"""
776+
record latency data
777+
778+
:param method: method name
779+
:type method: string
780+
:param latency: latency
781+
:type latency: int64
782+
"""
783+
bucket = get_latency_bucket_index(latency)
784+
latency_key = self._telemetry_latencies_key + '::' + self._sdk_metadata + '/' + method.value + '/' + str(bucket)
785+
result = self._pluggable_adapter.increment(latency_key, 1)
786+
self.expire_keys(latency_key, self._TELEMETRY_KEY_DEFAULT_TTL, 1, result)
787+
788+
def record_exception(self, method):
789+
"""
790+
record an exception
791+
792+
:param method: method name
793+
:type method: string
794+
"""
795+
except_key = self._telemetry_exceptions_key + "::" + self._sdk_metadata + '/' + method.value
796+
result = self._pluggable_adapter.increment(except_key, 1)
797+
self.expire_keys(except_key, self._TELEMETRY_KEY_DEFAULT_TTL, 1, result)
798+
799+
def record_not_ready_usage(self):
800+
"""Not implemented"""
801+
pass
802+
803+
def record_bur_time_out(self):
804+
"""Not implemented"""
805+
pass
806+
807+
def record_impression_stats(self, data_type, count):
808+
"""Not implemented"""
809+
pass
810+
811+
def expire_latency_keys(self, total_keys, inserted):
812+
"""
813+
Set expire ttl for a latency key in storage
814+
815+
:param total_keys: length of keys.
816+
:type total_keys: int
817+
:param inserted: added keys.
818+
:type inserted: int
819+
"""
820+
self.expire_keys(self._telemetry_latencies_key, self._TELEMETRY_KEY_DEFAULT_TTL, total_keys, inserted)
821+
822+
def expire_keys(self, queue_key, key_default_ttl, total_keys, inserted):
823+
"""
824+
Set expire ttl for a key in storage if total keys equal inserted
825+
826+
:param queue_keys: key to be set
827+
:type queue_keys: str
828+
:param ey_default_ttl: ttl value
829+
:type ey_default_ttl: int
830+
:param total_keys: length of keys.
831+
:type total_keys: int
832+
:param inserted: added keys.
833+
:type inserted: int
834+
"""
835+
if total_keys == inserted:
836+
self._pluggable_adapter.expire(queue_key, key_default_ttl)

splitio/storage/redis.py

Lines changed: 4 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55

66
from splitio.models.impressions import Impression
77
from splitio.models import splits, segments
8-
from splitio.models.telemetry import MethodExceptions, MethodLatencies, TelemetryConfig
8+
from splitio.models.telemetry import MethodExceptions, MethodLatencies, TelemetryConfig, get_latency_bucket_index
99
from splitio.storage import SplitStorage, SegmentStorage, ImpressionStorage, EventStorage, \
1010
ImpressionPipelinedStorage, TelemetryStorage
1111
from splitio.storage.adapters.redis import RedisAdapterException
@@ -660,17 +660,9 @@ def add_latency_to_pipe(self, method, latency, pipe):
660660
:param pipe: Redis pipe.
661661
:type pipe: redis.pipe
662662
"""
663-
self._method_latencies.add_latency(method, latency)
664-
latencies = self._method_latencies.pop_all()['methodLatencies']
665-
values = latencies[method.value]
666-
total_keys = 0
667-
bucket_number = 0
668-
for bucket in values:
669-
if bucket > 0:
670-
pipe.hincrby(self._TELEMETRY_LATENCIES_KEY, self._sdk_metadata.sdk_version + '/' + self._sdk_metadata.instance_name + '/' + self._sdk_metadata.instance_ip + '/' +
671-
method.value + '/' + str(bucket_number), bucket)
672-
total_keys += 1
673-
bucket_number = bucket_number + 0
663+
bucket = get_latency_bucket_index(latency)
664+
pipe.hincrby(self._TELEMETRY_LATENCIES_KEY, self._sdk_metadata.sdk_version + '/' + self._sdk_metadata.instance_name + '/' + self._sdk_metadata.instance_ip + '/' +
665+
method.value + '/' + str(bucket), 1)
674666

675667
def record_latency(self, method, latency):
676668
"""

0 commit comments

Comments
 (0)