
Commit 75ec835

Merge pull request #287 from bjester/streaming-sync

Streaming sync serialization

2 parents 8874a49 + f9f6ac8, commit 75ec835

18 files changed

Lines changed: 1467 additions & 390 deletions


docs/architecture/index.rst

Lines changed: 42 additions & 1 deletion
@@ -132,6 +132,48 @@ In Kolibri, on the ``FacilityDataset`` model, we generate the certificate as a f
 There's flexibility in the application layer for determining the validity of a root certificate, and it's specified on a per-profile basis. For the ``facilitydata`` profile, Kolibri leverages its ``auth`` models for this.
 
 
+Streaming architecture
+----------------------
+
+Morango includes a streaming architecture for memory-efficient processing of sync data. This architecture is implemented in the ``morango.sync.stream`` module and provides a modular, ETL-like pipeline pattern for processing data records one-by-one, significantly reducing memory overhead compared to batch processing approaches.
+
+The streaming architecture is built around several core concepts:
+
+**Stream modules**
+Abstract base classes that form the foundation of the streaming pipeline:
+
+- ``Source``: The starting point of a pipeline that yields data items
+- ``OperatorModule``: Transform-like modules that process data items
+- ``Sink``: Terminal modules that consume data items without yielding further output
+- ``PipelineModule``: Modules that can be connected to other modules via the ``pipe()`` method
+
+**Pipeline composition**
+Modules are connected using a fluent interface via the ``pipe()`` method, creating a directed flow of data:
+
+.. code-block:: python
+
+    source.pipe(transform1).pipe(transform2).end(sink)
+
+**Key pipeline modules**
+Several specialized pipeline modules are provided:
+
+- ``Transform``: Applies a 1:1 transformation to each item
+- ``FlatMap``: Maps each item to zero or more output items
+- ``Buffer``: Collects items into fixed-size chunks for batch operations
+- ``Unbuffer``: Flattens chunks back into individual items
+
+**Serialization pipeline**
+The serialization process uses this streaming architecture through the ``serialize_into_store()`` function, which constructs a pipeline that:
+
+1. Reads dirty app models from the database (``AppModelSource``)
+2. Buffers records for efficient database lookups (``Buffer``)
+3. Looks up corresponding store records (``StoreLookup``)
+4. Updates store records with new data (``StoreUpdate``)
+5. Buffers by model type for efficient bulk operations (``ModelPartitionBuffer``)
+6. Writes changes to the database (``WriteSink``)
+
+This streaming approach ensures that memory usage remains constant regardless of dataset size, making Morango suitable for large-scale deployments with limited resources.
+
 Session controller, contexts, and operations
 --------------------------------------------
 
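The ``pipe()``/``end()`` composition described in the streaming-architecture section can be sketched with a few toy classes. These are illustrative stand-ins, not Morango's actual ``morango.sync.stream`` implementation; the point is that items are pulled lazily through the chain, so no intermediate list is ever materialized.

```python
class Source:
    """Starting point of a pipeline: yields data items one at a time."""

    def __init__(self, items):
        self._items = items

    def pipe(self, module):
        # connect a downstream module and return it, enabling fluent chaining
        module.upstream = self
        return module

    def iterate(self):
        return iter(self._items)


class Transform:
    """Operator module: applies a 1:1 transformation to each item."""

    def __init__(self, fn):
        self.fn = fn
        self.upstream = None

    pipe = Source.pipe  # same fluent connection behavior

    def end(self, sink):
        # terminate the pipeline with a sink
        sink.upstream = self
        return sink

    def iterate(self):
        # pull items lazily from upstream; nothing is materialized in bulk
        for item in self.upstream.iterate():
            yield self.fn(item)


class Sink:
    """Terminal module: consumes items without yielding further output."""

    def __init__(self):
        self.upstream = None
        self.results = []

    def run(self):
        for item in self.upstream.iterate():
            self.results.append(item)


source = Source(range(5))
sink = Sink()
source.pipe(Transform(lambda x: x * 2)).pipe(Transform(lambda x: x + 1)).end(sink).run()
print(sink.results)  # -> [1, 3, 5, 7, 9]
```

Because each ``iterate()`` is a generator over its upstream, memory usage is bounded by one item at a time, which is the property the real pipeline relies on.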
@@ -142,4 +184,3 @@ A unidirectional sync has several stages: ``INITIALIZING``, ``SERIALIZING``, ``Q
 .. image:: ./session-controller-seq.png
 
 The list of operations for each stage is configured through Django settings. The configuration key for each stage follows the pattern ``MORANGO_%STAGE%_OPERATIONS``, so the list/tuple of operations for the ``QUEUING`` stage is read from the ``MORANGO_QUEUING_OPERATIONS`` configuration value. Built-in operations implement a callable ``BaseOperation`` class by overriding a ``handle`` method. The ``BaseOperation`` class supports raising an ``AssertionError`` to defer responsibility to the next operation.
-
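The six serialization stages named in the documentation above can be approximated as a chain of generators. The stage names (``AppModelSource``, ``Buffer``, ``StoreLookup``, ``StoreUpdate``, ``WriteSink``) come from the docs; the bodies here are toy stand-ins over plain dicts, not the real database-backed modules.

```python
def app_model_source(dirty_models):
    # 1. AppModelSource: yield dirty app-model records one-by-one
    for record in dirty_models:
        yield record


def buffer(items, size):
    # 2./5. Buffer: collect items into fixed-size chunks for batch operations
    chunk = []
    for item in items:
        chunk.append(item)
        if len(chunk) >= size:
            yield chunk
            chunk = []
    if chunk:
        yield chunk


def store_lookup(chunks, store):
    # 3. StoreLookup: pair each record with any existing store record
    for chunk in chunks:
        yield [(rec, store.get(rec["id"])) for rec in chunk]


def store_update(chunks):
    # 4. StoreUpdate: merge new data over the (possibly missing) store record
    for chunk in chunks:
        yield [dict(existing or {}, **rec) for rec, existing in chunk]


def write_sink(chunks, store):
    # 6. WriteSink: bulk-write each chunk back to the store
    for chunk in chunks:
        for rec in chunk:
            store[rec["id"]] = rec


store = {1: {"id": 1, "value": "old"}}
dirty = [{"id": 1, "value": "new"}, {"id": 2, "value": "fresh"}]
pipeline = store_update(store_lookup(buffer(app_model_source(dirty), 100), store))
write_sink(pipeline, store)
print(store[1]["value"])  # -> new
```

Only one chunk is in flight at any time, so the memory footprint depends on the chunk size, not the number of dirty records.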

docs/syncing/index.rst

Lines changed: 1 addition & 2 deletions
@@ -15,7 +15,7 @@ Process
 
 Syncing is the actual exchange of data in a sync session. The general steps for syncing data are:
 
-1. **Serialization** - serializing data that is associated with Django models in the Application layer, and storing it in JSON format in a record in the Store
+1. **Serialization** - serializing data that is associated with Django models in the Application layer, and storing it in JSON format in a record in the Store. This process uses a streaming architecture that processes records one-by-one through a modular pipeline, ensuring constant memory usage regardless of dataset size.
 2. **Queuing/Buffering** - storing serialized records and their modification history to a separate Buffers data structure
 3. **Transfer/chunking of data** - the actual transfer of data over a request/response cycle in chunks of 500 records at a time
 4. **Dequeuing** - merging the data received in the receiving buffers to the receiving store and record-max counter
@@ -70,4 +70,3 @@ For a push or pull sync lifecycle, the order of the fired signals would be as fo
 7) Dequeuing started
 8) Dequeuing completed
 9) Session completed
-
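The transfer step above moves data in chunks of 500 records per request/response cycle. A minimal sketch of that chunking, with a plain iterable standing in for Morango's buffered sync records:

```python
# Sketch of chunked transfer: stream records in fixed-size batches so a
# request/response cycle never holds more than one chunk in memory.
# CHUNK_SIZE mirrors the 500-record chunk size mentioned above.

CHUNK_SIZE = 500


def chunked(records, size=CHUNK_SIZE):
    chunk = []
    for record in records:
        chunk.append(record)
        if len(chunk) == size:
            yield chunk
            chunk = []
    if chunk:  # final partial chunk
        yield chunk


chunks = list(chunked(range(1200)))
print([len(c) for c in chunks])  # -> [500, 500, 200]
```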

morango/registry.py

Lines changed: 10 additions & 0 deletions
@@ -5,7 +5,9 @@
 import inspect
 import sys
 from collections import OrderedDict
+from typing import Generator
 
+from django.db.models import QuerySet
 from django.db.models.fields.related import ForeignKey
 
 from morango.constants import transfer_stages
@@ -82,6 +84,14 @@ def get_models(self, profile):
         self.check_models_ready(profile)
         return list(self.profile_models.get(profile, {}).values())
 
+    def get_model_querysets(self, profile) -> Generator[QuerySet, None, None]:
+        """
+        Method for future enhancement to iterate over models and their querysets in a fashion
+        (particularly, an order) that is aware of FK dependencies.
+        """
+        for model in self.get_models(profile):
+            yield model.syncing_objects.all()
+
     def _insert_model_in_dependency_order(self, model, profile):
         # When we add models to be synced, we need to make sure
         # that models that depend on other models are synced AFTER
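The docstring of ``get_model_querysets`` anticipates yielding querysets in an order aware of FK dependencies, mirroring what ``_insert_model_in_dependency_order`` does at registration time. A hedged sketch of that ordering in plain Python, with a dict-based dependency map standing in for inspecting Django's ``_meta`` relations:

```python
def dependency_order(fk_deps):
    """Topologically sort model names so each model's FK targets come first."""
    ordered, seen = [], set()

    def visit(model):
        if model in seen:  # already placed (also guards self-referential FKs)
            return
        seen.add(model)
        for dep in fk_deps.get(model, ()):
            visit(dep)  # place dependencies before the model itself
        ordered.append(model)

    for model in fk_deps:
        visit(model)
    return ordered


# hypothetical models: Learner -> Classroom -> Facility via FKs
deps = {"Learner": ["Classroom"], "Classroom": ["Facility"], "Facility": []}
print(dependency_order(deps))  # -> ['Facility', 'Classroom', 'Facility'-free order: Facility first
```

Yielding querysets in this order would let a consumer write parents before children without violating FK constraints.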

morango/sync/controller.py

Lines changed: 7 additions & 19 deletions
@@ -6,47 +6,35 @@
 from morango.constants import transfer_statuses
 from morango.registry import session_middleware
 from morango.sync.operations import _deserialize_from_store
-from morango.sync.operations import _serialize_into_store
 from morango.sync.operations import OperationLogger
+from morango.sync.stream.serialize import serialize_into_store
 from morango.sync.utils import SyncSignalGroup
 from morango.utils import _assert
 
-
 logger = logging.getLogger(__name__)
 
 
-def _self_referential_fk(klass_model):
-    """
-    Return whether this model has a self ref FK, and the name for the field
-    """
-    for f in klass_model._meta.concrete_fields:
-        if f.related_model:
-            if issubclass(klass_model, f.related_model):
-                return f.attname
-    return None
-
-
 class MorangoProfileController(object):
     def __init__(self, profile):
         _assert(profile, "profile needs to be defined.")
         self.profile = profile
 
-    def serialize_into_store(self, filter=None):
+    def serialize_into_store(self, sync_filter=None):
         """
         Takes data from app layer and serializes the models into the store.
         """
         with OperationLogger("Serializing records", "Serialization complete"):
-            _serialize_into_store(self.profile, filter=filter)
+            serialize_into_store(self.profile, sync_filter=sync_filter)
 
-    def deserialize_from_store(self, skip_erroring=False, filter=None):
+    def deserialize_from_store(self, skip_erroring=False, sync_filter=None):
         """
         Takes data from the store and integrates into the application.
         """
         with OperationLogger("Deserializing records", "Deserialization complete"):
             # we first serialize to avoid deserialization merge conflicts
-            _serialize_into_store(self.profile, filter=filter)
+            serialize_into_store(self.profile, sync_filter=sync_filter)
             _deserialize_from_store(
-                self.profile, filter=filter, skip_erroring=skip_erroring
+                self.profile, filter=sync_filter, skip_erroring=skip_erroring
             )
 
     def create_network_connection(self, base_url, **kwargs):
@@ -217,7 +205,7 @@ def proceed_to_and_wait_for(
         if tries >= max_interval_tries:
             sleep(max_interval)
         else:
-            sleep(0.3 * (2 ** tries - 1))
+            sleep(0.3 * (2**tries - 1))
         result = self.proceed_to(target_stage, context=context)
         tries += 1
         if callable(callback):
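The ``proceed_to_and_wait_for`` hunk shows a capped exponential backoff: each retry waits ``0.3 * (2**tries - 1)`` seconds until ``max_interval_tries`` is reached, after which the wait plateaus at ``max_interval``. A sketch of that schedule (the ``max_interval`` and ``max_interval_tries`` values here are illustrative, not the library's defaults):

```python
def backoff_interval(tries, max_interval=5.0, max_interval_tries=5):
    # mirrors the branch in proceed_to_and_wait_for: plateau after enough tries
    if tries >= max_interval_tries:
        return max_interval
    return 0.3 * (2**tries - 1)


waits = [round(backoff_interval(t), 2) for t in range(7)]
print(waits)  # -> [0.0, 0.3, 0.9, 2.1, 4.5, 5.0, 5.0]
```

The ``- 1`` term makes the first retry immediate, which keeps fast transitions cheap while still backing off under sustained waiting.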

morango/sync/db.py

Lines changed: 39 additions & 0 deletions
@@ -0,0 +1,39 @@
+import logging
+from contextlib import contextmanager
+
+from django.db import connection
+from django.db import transaction
+
+from morango.sync.backends.utils import load_backend
+from morango.sync.utils import lock_partitions
+
+
+logger = logging.getLogger(__name__)
+
+DBBackend = load_backend(connection)
+
+
+@contextmanager
+def begin_transaction(sync_filter, isolated=False, shared_lock=False):
+    """
+    Starts a transaction, sets the transaction isolation level to repeatable read, and locks
+    affected partitions
+
+    :param sync_filter: The filter for filtering applicable records of the sync
+    :type sync_filter: morango.models.certificates.Filter|None
+    :param isolated: Whether to alter the transaction isolation to repeatable-read
+    :type isolated: bool
+    :param shared_lock: Whether the advisory lock should be exclusive or shared
+    :type shared_lock: bool
+    """
+    if isolated:
+        # when isolation is requested, we modify the transaction isolation of the connection for the
+        # duration of the transaction
+        with DBBackend._set_transaction_repeatable_read():
+            with transaction.atomic(savepoint=False):
+                lock_partitions(DBBackend, sync_filter=sync_filter, shared=shared_lock)
+                yield
+    else:
+        with transaction.atomic():
+            lock_partitions(DBBackend, sync_filter=sync_filter, shared=shared_lock)
+            yield
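The shape of ``begin_transaction`` is: optionally wrap the transaction in an isolation-level change, and always take the partition lock before yielding control to the caller. A Django-free sketch of that pattern, where the ``atomic`` and isolation managers are fakes that just record the order of events:

```python
from contextlib import contextmanager

events = []


@contextmanager
def fake(name):
    # stand-in for transaction.atomic() / _set_transaction_repeatable_read()
    events.append("enter " + name)
    yield
    events.append("exit " + name)


@contextmanager
def begin_transaction(isolated=False, shared_lock=False):
    if isolated:
        # the isolation change must wrap the whole transaction
        with fake("repeatable-read"):
            with fake("atomic"):
                events.append("lock partitions (shared=%s)" % shared_lock)
                yield
    else:
        with fake("atomic"):
            events.append("lock partitions (shared=%s)" % shared_lock)
            yield


with begin_transaction(isolated=True):
    events.append("do work")

print(events)
```

The recorded order shows why the nesting matters: the isolation change is entered first and exited last, so the entire transaction, including the partition lock, runs under repeatable read.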
