Skip to content

Commit d1dd6d5

Browse files
committed
ADD: DBNStore.to_parquet
1 parent aef77da commit d1dd6d5

5 files changed

Lines changed: 168 additions & 16 deletions

File tree

CHANGELOG.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,12 @@
11
# Changelog
22

3+
## 0.26.0 - TBD
4+
5+
This release adds support for transcoding DBN data into Apache Parquet.
6+
7+
#### Enhancements
8+
- Added `DBNStore.to_parquet` for transcoding DBN data into Apache Parquet using `pyarrow`
9+
310
## 0.25.0 - 2024-01-09
411

512
#### Breaking changes

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ The minimum dependencies as found in the `pyproject.toml` are also listed below:
3535
- databento-dbn = "0.14.2"
3636
- numpy= ">=1.23.5"
3737
- pandas = ">=1.5.3"
38+
- pyarrow = ">=13.0.0"
3839
- requests = ">=2.24.0"
3940
- zstandard = ">=0.21.0"
4041

databento/common/dbnstore.py

Lines changed: 87 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
Any,
1616
BinaryIO,
1717
Callable,
18+
Final,
1819
Literal,
1920
Protocol,
2021
overload,
@@ -23,6 +24,8 @@
2324
import databento_dbn
2425
import numpy as np
2526
import pandas as pd
27+
import pyarrow as pa
28+
import pyarrow.parquet as pq
2629
import zstandard
2730
from databento_dbn import FIXED_PRICE_SCALE
2831
from databento_dbn import Compression
@@ -51,6 +54,8 @@
5154

5255
logger = logging.getLogger(__name__)
5356

57+
PARQUET_CHUNK_SIZE: Final = 2**16
58+
5459
if TYPE_CHECKING:
5560
from databento.historical.client import Historical
5661

@@ -791,18 +796,14 @@ def to_csv(
791796
compression : Compression or str, default `Compression.NONE`
792797
The output compression for writing.
793798
schema : Schema or str, optional
794-
The schema for the csv.
799+
The DBN schema for the csv.
795800
This is only required when reading a DBN stream with mixed record types.
796801
797802
Raises
798803
------
799804
ValueError
800805
If the schema for the array cannot be determined.
801806
802-
Notes
803-
-----
804-
Requires all the data to be brought up into memory to then be written.
805-
806807
"""
807808
compression = validate_enum(compression, Compression, "compression")
808809
schema = validate_maybe_enum(schema, Schema, "schema")
@@ -870,7 +871,7 @@ def to_df(
870871
a 'symbol' column, mapping the instrument ID to its requested symbol for
871872
every record.
872873
schema : Schema or str, optional
873-
The schema for the dataframe.
874+
The DBN schema for the dataframe.
874875
This is only required when reading a DBN stream with mixed record types.
875876
count : int, optional
876877
If set, instead of returning a single `DataFrame` a `DataFrameIterator`
@@ -887,7 +888,7 @@ def to_df(
887888
Raises
888889
------
889890
ValueError
890-
If the schema for the array cannot be determined.
891+
If the DBN schema is unspecified and cannot be determined.
891892
892893
"""
893894
schema = validate_maybe_enum(schema, Schema, "schema")
@@ -919,6 +920,81 @@ def to_df(
919920

920921
return df_iter
921922

923+
def to_parquet(
924+
self,
925+
path: Path | str,
926+
price_type: Literal["fixed", "float"] = "float",
927+
pretty_ts: bool = True,
928+
map_symbols: bool = True,
929+
schema: Schema | str | None = None,
930+
**kwargs: Any,
931+
) -> None:
932+
"""
933+
Write the data to a parquet file at the given path.
934+
935+
Parameters
936+
----------
937+
price_type : str, default "float"
938+
The price type to use for price fields.
939+
If "fixed", prices will have a type of `int` in fixed decimal format; each unit representing 1e-9 or 0.000000001.
940+
If "float", prices will have a type of `float`.
941+
The "decimal" price type is not supported at this time.
942+
pretty_ts : bool, default True
943+
If all timestamp columns should be converted from UNIX nanosecond
944+
`int` to tz-aware UTC `pyarrow.TimestampType`.
945+
map_symbols : bool, default True
946+
If symbology mappings from the metadata should be used to create
947+
a 'symbol' column, mapping the instrument ID to its requested symbol for
948+
every record.
949+
schema : Schema or str, optional
950+
The DBN schema for the parquet file.
951+
This is only required when reading a DBN stream with mixed record types.
952+
953+
Raises
954+
------
955+
ValueError
956+
If an incorrect price type is specified.
957+
If the DBN schema is unspecified and cannot be determined.
958+
959+
"""
960+
if price_type == "decimal":
961+
raise ValueError("the 'decimal' price type is not currently supported")
962+
963+
schema = validate_maybe_enum(schema, Schema, "schema")
964+
if schema is None:
965+
if self.schema is None:
966+
raise ValueError("a schema must be specified for mixed DBN data")
967+
schema = self.schema
968+
969+
dataframe_iter = self.to_df(
970+
price_type=price_type,
971+
pretty_ts=pretty_ts,
972+
map_symbols=map_symbols,
973+
schema=schema,
974+
count=PARQUET_CHUNK_SIZE,
975+
)
976+
977+
writer = None
978+
try:
979+
for frame in dataframe_iter:
980+
if writer is None:
981+
# Initialize the writer using the first DataFrame
982+
parquet_schema = pa.Schema.from_pandas(frame)
983+
writer = pq.ParquetWriter(
984+
where=path,
985+
schema=parquet_schema,
986+
**kwargs,
987+
)
988+
writer.write_table(
989+
pa.Table.from_pandas(
990+
frame,
991+
schema=parquet_schema,
992+
),
993+
)
994+
finally:
995+
if writer is not None:
996+
writer.close()
997+
922998
def to_file(self, path: Path | str) -> None:
923999
"""
9241000
Write the data to a DBN file at the given path.
@@ -972,18 +1048,14 @@ def to_json(
9721048
compression : Compression or str, default `Compression.NONE`
9731049
The output compression for writing.
9741050
schema : Schema or str, optional
975-
The schema for the json.
1051+
The DBN schema for the json.
9761052
This is only required when reading a DBN stream with mixed record types.
9771053
9781054
Raises
9791055
------
9801056
ValueError
9811057
If the schema for the array cannot be determined.
9821058
983-
Notes
984-
-----
985-
Requires all the data to be brought up into memory to then be written.
986-
9871059
"""
9881060
compression = validate_enum(compression, Compression, "compression")
9891061
schema = validate_maybe_enum(schema, Schema, "schema")
@@ -1030,7 +1102,7 @@ def to_ndarray(
10301102
Parameters
10311103
----------
10321104
schema : Schema or str, optional
1033-
The schema for the array.
1105+
The DBN schema for the array.
10341106
This is only required when reading a DBN stream with mixed record types.
10351107
count : int, optional
10361108
If set, instead of returning a single `np.ndarray` a `NDArrayIterator`
@@ -1047,7 +1119,7 @@ def to_ndarray(
10471119
Raises
10481120
------
10491121
ValueError
1050-
If the schema for the array cannot be determined.
1122+
If the DBN schema is unspecified and cannot be determined.
10511123
10521124
"""
10531125
schema = validate_maybe_enum(schema, Schema, "schema")
@@ -1329,8 +1401,7 @@ def _format_px(
13291401
if price_type == "decimal":
13301402
for field in px_fields:
13311403
df[field] = (
1332-
df[field].replace(INT64_NULL, np.nan).apply(decimal.Decimal)
1333-
/ FIXED_PRICE_SCALE
1404+
df[field].replace(INT64_NULL, np.nan).apply(decimal.Decimal) / FIXED_PRICE_SCALE
13341405
)
13351406
elif price_type == "float":
13361407
for field in px_fields:

pyproject.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ numpy = [
3838
{version = "^1.26.0", python = "^3.12"}
3939
]
4040
pandas = ">=1.5.3"
41+
pyarrow = ">=13.0.0"
4142
requests = ">=2.24.0"
4243
zstandard = ">=0.21.0"
4344

tests/test_historical_bento.py

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
from unittest.mock import MagicMock
1010

1111
import databento
12+
import databento.common.dbnstore
1213
import numpy as np
1314
import pandas as pd
1415
import pytest
@@ -439,6 +440,77 @@ def test_to_df_with_price_type_handles_null(
439440
assert all(np.isnan(df_pretty["strike_price"]))
440441

441442

443+
@pytest.mark.parametrize(
444+
"dataset",
445+
[
446+
Dataset.GLBX_MDP3,
447+
Dataset.XNAS_ITCH,
448+
Dataset.OPRA_PILLAR,
449+
Dataset.DBEQ_BASIC,
450+
],
451+
)
452+
@pytest.mark.parametrize(
453+
"schema",
454+
[pytest.param(schema, id=str(schema)) for schema in Schema.variants()],
455+
)
456+
@pytest.mark.parametrize(
457+
"price_type",
458+
[
459+
"fixed",
460+
"float",
461+
],
462+
)
463+
@pytest.mark.parametrize(
464+
"pretty_ts",
465+
[
466+
True,
467+
False,
468+
],
469+
)
470+
@pytest.mark.parametrize(
471+
"map_symbols",
472+
[
473+
True,
474+
False,
475+
],
476+
)
477+
def test_to_parquet(
478+
monkeypatch: pytest.MonkeyPatch,
479+
tmp_path: Path,
480+
test_data: Callable[[Dataset, Schema], bytes],
481+
dataset: Dataset,
482+
schema: Schema,
483+
price_type: Literal["fixed", "float"],
484+
pretty_ts: bool,
485+
map_symbols: bool,
486+
) -> None:
487+
# Arrange
488+
monkeypatch.setattr(databento.common.dbnstore, "PARQUET_CHUNK_SIZE", 1)
489+
stub_data = test_data(dataset, schema)
490+
data = DBNStore.from_bytes(data=stub_data)
491+
parquet_file = tmp_path / "test.parquet"
492+
493+
# Act
494+
expected = data.to_df(
495+
price_type=price_type,
496+
pretty_ts=pretty_ts,
497+
map_symbols=map_symbols,
498+
)
499+
data.to_parquet(
500+
parquet_file,
501+
price_type=price_type,
502+
pretty_ts=pretty_ts,
503+
map_symbols=map_symbols,
504+
)
505+
actual = pd.read_parquet(parquet_file)
506+
507+
# Replace None values with np.nan
508+
actual.fillna(value=np.nan)
509+
510+
# Assert
511+
pd.testing.assert_frame_equal(actual, expected)
512+
513+
442514
@pytest.mark.parametrize(
443515
"expected_schema",
444516
[pytest.param(schema, id=str(schema)) for schema in Schema.variants()],

0 commit comments

Comments
 (0)