11import logging
2+ import time
23
3- from splitio .api import APIException
4+ from splitio .api import APIException , FetchOptions
45from splitio .tasks .util import workerpool
56from splitio .models import segments
7+ from splitio .util .backoff import Backoff
68
79
810_LOGGER = logging .getLogger (__name__ )
911
1012
13+ _ON_DEMAND_FETCH_BACKOFF_BASE = 10 # backoff base starting at 10 seconds
14+ _ON_DEMAND_FETCH_BACKOFF_MAX_WAIT = 60 # don't sleep for more than 1 minute
15+ _ON_DEMAND_FETCH_BACKOFF_MAX_RETRIES = 10
16+
17+
1118class SegmentSynchronizer (object ):
1219 def __init__ (self , segment_api , split_storage , segment_storage ):
1320 """
@@ -28,6 +35,9 @@ def __init__(self, segment_api, split_storage, segment_storage):
2835 self ._segment_storage = segment_storage
2936 self ._worker_pool = workerpool .WorkerPool (10 , self .synchronize_segment )
3037 self ._worker_pool .start ()
38+ self ._backoff = Backoff (
39+ _ON_DEMAND_FETCH_BACKOFF_BASE ,
40+ _ON_DEMAND_FETCH_BACKOFF_MAX_WAIT )
3141
3242 def recreate (self ):
3343 """
@@ -44,27 +54,33 @@ def shutdown(self):
4454 """
4555 self ._worker_pool .stop ()
4656
47- def synchronize_segment (self , segment_name , till = None ):
57+ def _fetch_until (self , segment_name , fetch_options , till = None ):
4858 """
49- Update a segment from queue
59+ Hit endpoint, update storage and return when since==till.
5060
5161 :param segment_name: Name of the segment to update.
5262 :type segment_name: str
5363
54- :param till: ChangeNumber received.
64+ :param fetch_options Fetch options for getting segment definitions.
65+ :type fetch_options splitio.api.FetchOptions
66+
67+ :param till: Passed till from Streaming.
5568 :type till: int
5669
70+ :return: last change number
71+ :rtype: int
5772 """
58- while True :
73+ while True : # Fetch until since==till
5974 change_number = self ._segment_storage .get_change_number (segment_name )
6075 if change_number is None :
6176 change_number = - 1
6277 if till is not None and till < change_number :
6378 # the passed till is less than change_number, no need to perform updates
64- return
79+ return change_number
6580
6681 try :
67- segment_changes = self ._api .fetch_segment (segment_name , change_number )
82+ segment_changes = self ._api .fetch_segment (segment_name , change_number ,
83+ fetch_options )
6884 except APIException as exc :
6985 _LOGGER .error ('Exception raised while fetching segment %s' , segment_name )
7086 _LOGGER .debug ('Exception information: ' , exc_info = True )
@@ -82,7 +98,63 @@ def synchronize_segment(self, segment_name, till=None):
8298 )
8399
84100 if segment_changes ['till' ] == segment_changes ['since' ]:
85- return
101+ return segment_changes ['till' ]
102+
103+ def _attempt_segment_sync (self , segment_name , fetch_options , till = None ):
104+ """
105+ Hit endpoint, update storage and return True if sync is complete.
106+
107+ :param segment_name: Name of the segment to update.
108+ :type segment_name: str
109+
110+ :param fetch_options Fetch options for getting split definitions.
111+ :type fetch_options splitio.api.FetchOptions
112+
113+ :param till: Passed till from Streaming.
114+ :type till: int
115+
116+ :return: Flags to check if it should perform bypass or operation ended
117+ :rtype: bool, int, int
118+ """
119+ self ._backoff .reset ()
120+ remaining_attempts = _ON_DEMAND_FETCH_BACKOFF_MAX_RETRIES
121+ while True :
122+ remaining_attempts -= 1
123+ change_number = self ._fetch_until (segment_name , fetch_options , till )
124+ if till is None or till <= change_number :
125+ return True , remaining_attempts , change_number
126+ elif remaining_attempts <= 0 :
127+ return False , remaining_attempts , change_number
128+ how_long = self ._backoff .get ()
129+ time .sleep (how_long )
130+
131+ def synchronize_segment (self , segment_name , till = None ):
132+ """
133+ Update a segment from queue
134+
135+ :param segment_name: Name of the segment to update.
136+ :type segment_name: str
137+
138+ :param till: ChangeNumber received.
139+ :type till: int
140+
141+ """
142+ fetch_options = FetchOptions (True ) # Set Cache-Control to no-cache
143+ successful_sync , remaining_attempts , change_number = self ._attempt_segment_sync (segment_name , fetch_options , till )
144+ attempts = _ON_DEMAND_FETCH_BACKOFF_MAX_RETRIES - remaining_attempts
145+ if successful_sync : # succedeed sync
146+ _LOGGER .debug ('Refresh completed in %s attempts.' , attempts )
147+ return
148+ with_cdn_bypass = FetchOptions (True , change_number ) # Set flag for bypassing CDN
149+ without_cdn_successful_sync , remaining_attempts , change_number = self ._attempt_segment_sync (segment_name , with_cdn_bypass , till )
150+ without_cdn_attempts = _ON_DEMAND_FETCH_BACKOFF_MAX_RETRIES - remaining_attempts
151+ if without_cdn_successful_sync :
152+ _LOGGER .debug ('Refresh completed bypassing the CDN in %s attempts.' ,
153+ without_cdn_attempts )
154+ return
155+ else :
156+ _LOGGER .debug ('No changes fetched after %s attempts with CDN bypassed.' ,
157+ without_cdn_attempts )
86158
87159 def synchronize_segments (self ):
88160 """
0 commit comments