@@ -211,7 +211,7 @@ def put(self, split):
211211
212212class PluggableSegmentStorage (SegmentStorage ):
213213 """Pluggable implementation of segment storage."""
214- _SEGMENT_NAME_LEMNGTH = 14
214+ _SEGMENT_NAME_LENGTH = 14
215215 _TILL_LENGTH = 4
216216
217217 def __init__ (self , pluggable_adapter , prefix = None ):
@@ -235,10 +235,12 @@ def update(self, segment_name, to_add, to_remove, change_number=None):
235235 :type to_remove: Set
236236 """
237237 try :
238- self ._pluggable_adapter .add_items (self ._prefix .format (segment_name = segment_name ), to_add )
239- self ._pluggable_adapter .remove_items (self ._prefix .format (segment_name = segment_name ), to_remove )
238+ if to_add is not None :
239+ self ._pluggable_adapter .add_items (self ._prefix .format (segment_name = segment_name ), to_add )
240+ if to_remove is not None :
241+ self ._pluggable_adapter .remove_items (self ._prefix .format (segment_name = segment_name ), to_remove )
240242 if change_number is not None :
241- self ._pluggable_adapter . set ( self . _segment_till_prefix . format ( segment_name = segment_name ) , change_number )
243+ self .set_change_number ( segment_name , change_number )
242244 except Exception :
243245 _LOGGER .error ('Error updating segment storage' )
244246 _LOGGER .debug ('Error: ' , exc_info = True )
@@ -284,31 +286,32 @@ def get_segment_names(self):
284286 """
285287 try :
286288 keys = []
287- for key in self ._pluggable_adapter .get_keys_by_prefix (self ._prefix [:- self ._SEGMENT_NAME_LEMNGTH ]):
289+ for key in self ._pluggable_adapter .get_keys_by_prefix (self ._prefix [:- self ._SEGMENT_NAME_LENGTH ]):
288290 if key [- self ._TILL_LENGTH :] != 'till' :
289- keys .append (key [len (self ._prefix [:- self ._SEGMENT_NAME_LEMNGTH ]):])
291+ keys .append (key [len (self ._prefix [:- self ._SEGMENT_NAME_LENGTH ]):])
290292 return keys
291293 except Exception :
292294 _LOGGER .error ('Error getting segments' )
293295 _LOGGER .debug ('Error: ' , exc_info = True )
294296 return None
295297
296- def get_keys (self , segment_name ):
297- """
298- Get keys of a segment.
299-
300- :param segment_name: segment name
301- :type segment_name: str
302-
303- :return: list of segment keys
304- :rtype: str[]
305- """
306- try :
307- return list (self ._pluggable_adapter .get (self ._prefix .format (segment_name = segment_name )))
308- except Exception :
309- _LOGGER .error ('Error getting segments keys' )
310- _LOGGER .debug ('Error: ' , exc_info = True )
311- return None
298+ # TODO: To be added in the future because this data is not being sent by telemetry in consumer/synchronizer mode
299+ # def get_keys(self, segment_name):
300+ # """
301+ # Get keys of a segment.
302+ #
303+ # :param segment_name: segment name
304+ # :type segment_name: str
305+ #
306+ # :return: list of segment keys
307+ # :rtype: str[]
308+ # """
309+ # try:
310+ # return list(self._pluggable_adapter.get(self._prefix.format(segment_name=segment_name)))
311+ # except Exception:
312+ # _LOGGER.error('Error getting segments keys')
313+ # _LOGGER.debug('Error: ', exc_info=True)
314+ # return None
312315
313316 def segment_contains (self , segment_name , key ):
314317 """
0 commit comments