Commit f9f6ac8

Add documentation for streaming arch
1 parent 53e5fb0 commit f9f6ac8

2 files changed

Lines changed: 43 additions & 3 deletions


docs/architecture/index.rst

Lines changed: 42 additions & 1 deletion
@@ -132,6 +132,48 @@ In Kolibri, on the ``FacilityDataset`` model, we generate the certificate as a f
There's flexibility in the application layer for determining the validity of a root certificate, and it's specified on a per-profile basis. For the ``facilitydata`` profile, Kolibri leverages its ``auth`` models for this.

Streaming architecture
----------------------

Morango includes a streaming architecture for memory-efficient processing of sync data. Implemented in the ``morango.sync.stream`` module, it provides a modular, ETL-like pipeline pattern that processes data records one by one, significantly reducing memory overhead compared to batch processing.

The streaming architecture is built around several core concepts:

**Stream modules**
Abstract base classes that form the foundation of the streaming pipeline (see the sketch after this list):

- ``Source``: The starting point of a pipeline that yields data items
- ``OperatorModule``: Transform-like modules that process data items
- ``Sink``: Terminal modules that consume data items without yielding further output
- ``PipelineModule``: Modules that can be connected to other modules via the ``pipe()`` method
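A minimal sketch of how such base classes could fit together follows; the ``pipe()`` chaining matches the example further below, but the method names ``items()``, ``process()``, and ``consume()`` are illustrative assumptions rather than Morango's actual API:

.. code-block:: python

   # Illustrative sketch only; not Morango's actual implementation.
   from abc import ABC, abstractmethod

   class PipelineModule(ABC):
       """A module that can be chained to a downstream module."""

       def __init__(self):
           self.downstream = None

       def pipe(self, module):
           # Connect this module's output to the next module's input,
           # returning the next module so calls can be chained fluently.
           self.downstream = module
           return module

   class Source(PipelineModule):
       """Starting point of a pipeline; yields data items."""

       @abstractmethod
       def items(self):  # method name assumed
           """Yield data items one at a time."""

   class OperatorModule(PipelineModule):
       """Transform-like module that processes incoming items."""

       @abstractmethod
       def process(self, item):  # method name assumed
           """Process one item, yielding zero or more results."""

   class Sink(PipelineModule):
       """Terminal module; consumes items without further output."""

       @abstractmethod
       def consume(self, item):  # method name assumed
           """Consume one item; produces no downstream output."""
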
**Pipeline composition**
Modules are connected using a fluent interface via the ``pipe()`` method, creating a directed flow of data:

.. code-block:: python

   source.pipe(transform1).pipe(transform2).end(sink)

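The fluent pattern can be illustrated with a self-contained toy version, using plain generators standing in for Morango's classes (the ``Stage`` class here is hypothetical):

.. code-block:: python

   # Toy stand-in for the fluent pipeline pattern; not Morango's code.
   class Stage:
       def __init__(self, fn=lambda x: x, source=None):
           self.fn = fn            # per-item transformation
           self.source = source    # iterable feeding the pipeline head
           self.upstream = None    # stage that feeds this one

       def pipe(self, stage):
           stage.upstream = self
           return stage

       def end(self, sink):
           # Drain the pipeline one item at a time into the sink.
           for item in self._items():
               sink.append(item)

       def _items(self):
           incoming = self.upstream._items() if self.upstream else self.source
           for item in incoming:
               yield self.fn(item)

   head = Stage(source=range(5))
   results = []
   head.pipe(Stage(lambda x: x * 2)).pipe(Stage(lambda x: x + 1)).end(results)
   print(results)  # [1, 3, 5, 7, 9]

Because each stage is a generator pulling from the one before it, only a single item is in flight at any time, which is where the memory savings come from.
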
**Key pipeline modules**
Several specialized pipeline modules are provided (the ``Buffer``/``Unbuffer`` pairing is illustrated after this list):

- ``Transform``: Applies a 1:1 transformation to each item
- ``FlatMap``: Maps each item to zero or more output items
- ``Buffer``: Collects items into fixed-size chunks for batch operations
- ``Unbuffer``: Flattens chunks back into individual items
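The chunking semantics of ``Buffer`` and ``Unbuffer`` can be sketched with plain generator functions (assumed behavior, not the actual classes):

.. code-block:: python

   # Assumed Buffer/Unbuffer semantics, written as plain generators.
   def buffer(items, chunk_size):
       """Collect a stream of items into fixed-size lists."""
       chunk = []
       for item in items:
           chunk.append(item)
           if len(chunk) == chunk_size:
               yield chunk
               chunk = []
       if chunk:  # emit any final partial chunk
           yield chunk

   def unbuffer(chunks):
       """Flatten chunks back into a stream of individual items."""
       for chunk in chunks:
           for item in chunk:
               yield item

   assert list(buffer(range(5), 2)) == [[0, 1], [2, 3], [4]]
   assert list(unbuffer(buffer(range(5), 2))) == [0, 1, 2, 3, 4]
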
**Serialization pipeline**
The serialization process uses this streaming architecture through the ``serialize_into_store()`` function, which constructs a pipeline that (see the sketch after this list):

1. Reads dirty app models from the database (``AppModelSource``)
2. Buffers records for efficient database lookups (``Buffer``)
3. Looks up corresponding store records (``StoreLookup``)
4. Updates store records with new data (``StoreUpdate``)
5. Buffers by model type for efficient bulk operations (``ModelPartitionBuffer``)
6. Writes changes to the database (``WriteSink``)
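Put together, the construction inside ``serialize_into_store()`` plausibly resembles the following (a hedged sketch; the constructor arguments and chunk size are assumptions, not Morango's actual signatures):

.. code-block:: python

   # Hedged sketch of the pipeline serialize_into_store() might build;
   # constructor arguments here are assumptions, not Morango's API.
   def serialize_into_store(profile):
       source = AppModelSource(profile)       # 1. read dirty app models
       (
           source
           .pipe(Buffer(chunk_size=100))      # 2. chunk for efficient lookups
           .pipe(StoreLookup())               # 3. fetch matching store records
           .pipe(StoreUpdate())               # 4. apply new data to store records
           .pipe(ModelPartitionBuffer())      # 5. group by model for bulk writes
           .end(WriteSink())                  # 6. persist changes to the database
       )
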
This streaming approach keeps memory usage bounded regardless of dataset size, making Morango suitable for large-scale deployments with limited resources.

Session controller, contexts, and operations
--------------------------------------------

@@ -142,4 +184,3 @@ A unidirectional sync has several stages: ``INITIALIZING``, ``SERIALIZING``, ``Q
.. image:: ./session-controller-seq.png

The list of operations for each stage is configured through Django settings. The configuration key for each stage follows the pattern ``MORANGO_%STAGE%_OPERATIONS``; for example, the operations for the ``QUEUING`` stage are read from the ``MORANGO_QUEUING_OPERATIONS`` setting. Built-in operations implement a callable ``BaseOperation`` class by overriding its ``handle`` method, and ``BaseOperation`` supports raising an ``AssertionError`` to defer responsibility to the next operation, as sketched below.
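For example, a custom operation might be wired in like this (a sketch based on the description above; the import path, the dotted-string registration format, and the ``context`` attribute are assumptions):

.. code-block:: python

   # settings.py (registration format assumed)
   MORANGO_QUEUING_OPERATIONS = (
       "myapp.sync_operations.CustomQueuingOperation",  # hypothetical
   )

   # myapp/sync_operations.py
   from morango.sync.operations import BaseOperation  # import path assumed

   class CustomQueuingOperation(BaseOperation):
       def handle(self, context):
           # Raising AssertionError defers to the next configured operation.
           assert context.is_push  # hypothetical attribute check
           ...  # perform the custom queuing work here
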

docs/syncing/index.rst

Lines changed: 1 addition & 2 deletions
@@ -15,7 +15,7 @@ Process
Syncing is the actual exchange of data in a sync session. The general steps for syncing data are:

-1. **Serialization** - serializing data that is associated with Django models in the Application layer, and storing it in JSON format in a record in the Store
+1. **Serialization** - serializing data that is associated with Django models in the Application layer, and storing it in JSON format in a record in the Store. This process uses a streaming architecture that processes records one by one through a modular pipeline, keeping memory usage bounded regardless of dataset size.
2. **Queuing/Buffering** - storing serialized records and their modification history to a separate Buffers data structure
3. **Transfer/chunking of data** - the actual transfer of data over a request/response cycle in chunks of 500 records at a time
4. **Dequeuing** - merging the data received in the receiving buffers to the receiving store and record-max counter
@@ -70,4 +70,3 @@ For a push or pull sync lifecycle, the order of the fired signals would be as fo
7) Dequeuing started
8) Dequeuing completed
9) Session completed
