11from __future__ import annotations
22
33import bisect
4+ import csv
45import datetime as dt
56import functools
67import json
8+ import os
79from collections import defaultdict
810from collections .abc import Mapping
911from io import TextIOWrapper
12+ from os import PathLike
13+ from pathlib import Path
1014from typing import Any , ClassVar , NamedTuple , TextIO
1115
1216import pandas as pd
1519from databento_dbn import SType
1620from databento_dbn import SymbolMappingMsg
1721
18-
19- ALL_SYMBOLS = "ALL_SYMBOLS"
22+ from databento .common .parsing import datetime_to_unix_nanoseconds
2023
2124
2225class MappingInterval (NamedTuple ):
@@ -39,6 +42,126 @@ class MappingInterval(NamedTuple):
3942 symbol : str
4043
4144
def _validate_path_pair(
    in_file: Path | PathLike[str] | str,
    out_file: Path | PathLike[str] | str | None,
) -> tuple[Path, Path]:
    """
    Validate an input path and derive an output path that does not yet exist.

    Parameters
    ----------
    in_file: Path | PathLike[str] | str
        Path to the input file; it must exist and be a file.
    out_file: Path | PathLike[str] | str | None
        Requested output path. If `None`, the input file name with `_mapped`
        appended to its stem is used.

    Returns
    -------
    tuple[Path, Path]
        The input path and the output path to write to.

    Raises
    ------
    ValueError
        If `in_file` does not exist or is not a file.
        If `out_file` refers to the same file as `in_file`.

    Notes
    -----
    If the candidate output path already exists, it is replaced with
    `<stem>_mapped_<i><suffix>` alongside the input file, using the first `i`
    (starting at 0) for which no file exists. This also applies to an
    explicitly given `out_file`.

    """
    in_file_valid = Path(in_file)

    if not in_file_valid.exists():
        raise ValueError(f"{in_file_valid} does not exist")
    if not in_file_valid.is_file():
        raise ValueError(f"{in_file_valid} is not a file")

    if out_file is not None:
        out_file_valid = Path(out_file)
    else:
        out_file_valid = in_file_valid.with_name(
            f"{in_file_valid.stem}_mapped{in_file_valid.suffix}",
        )

    # This check must run before the collision loop below: the input file
    # always exists, so the loop would otherwise rename an identical output
    # path and the error could never be raised. Resolved paths are compared
    # so that different spellings of the same file are also detected.
    if in_file_valid.resolve() == out_file_valid.resolve():
        raise ValueError("The input file cannot be the same path as the output file.")

    # Never overwrite an existing file; pick the first free numbered name.
    i = 0
    while out_file_valid.exists():
        out_file_valid = in_file_valid.with_name(
            f"{in_file_valid.stem}_mapped_{i}{in_file_valid.suffix}",
        )
        i += 1

    return in_file_valid, out_file_valid
74+
75+
def map_symbols_csv(
    symbology_file: Path | PathLike[str] | str,
    csv_file: Path | PathLike[str] | str,
    out_file: Path | PathLike[str] | str | None = None,
) -> Path:
    """
    Append a `symbol` column to a CSV file using a `symbology.json` file as
    the mapping source. The mapped rows are written to `out_file`.

    Parameters
    ----------
    symbology_file: Path | PathLike[str] | str
        Path to the `symbology.json` file that supplies the mappings.
    csv_file: Path | PathLike[str] | str
        Path to a CSV file of encoded DBN data; it needs an `instrument_id`
        column and either a `ts_recv` or a `ts_event` column.
    out_file: Path | PathLike[str] | str (optional)
        Destination path for the mapped CSV. When omitted, the name of
        `csv_file` with `_mapped` appended is used.

    Returns
    -------
    Path
        The path to the written file.

    Raises
    ------
    ValueError
        When the input or output paths are invalid.
        When the input CSV file does not contain a valid timestamp or instrument_id column.

    See Also
    --------
    map_symbols_json

    """
    symbol_map = InstrumentMap()

    # Load the whole symbology document, then feed it to the map.
    symbology = json.loads(Path(symbology_file).read_text())
    symbol_map.insert_json(symbology)

    return symbol_map.map_symbols_csv(csv_file=csv_file, out_file=out_file)
119+
120+
def map_symbols_json(
    symbology_file: Path | PathLike[str] | str,
    json_file: Path | PathLike[str] | str,
    out_file: Path | PathLike[str] | str | None = None,
) -> Path:
    """
    Add a `symbol` key to every record of a JSON file using a
    `symbology.json` file as the mapping source. The mapped records are
    written to `out_file`.

    Parameters
    ----------
    symbology_file: Path | PathLike[str] | str
        Path to the `symbology.json` file that supplies the mappings.
    json_file: Path | PathLike[str] | str
        Path to a JSON file of encoded DBN data.
    out_file: Path | PathLike[str] | str (optional)
        Destination path for the mapped JSON. When omitted, the name of
        `json_file` with `_mapped` appended is used.

    Returns
    -------
    Path
        The path to the written file.

    Raises
    ------
    ValueError
        When the input or output paths are invalid.
        When the input JSON file does not contain a valid record.

    See Also
    --------
    map_symbols_csv

    """
    symbol_map = InstrumentMap()

    # Load the whole symbology document, then feed it to the map.
    symbology = json.loads(Path(symbology_file).read_text())
    symbol_map.insert_json(symbology)

    return symbol_map.map_symbols_json(json_file=json_file, out_file=out_file)
163+
164+
42165class InstrumentMap :
43166 SYMBOLOGY_RESOLVE_KEYS : ClassVar [tuple [str , ...]] = (
44167 "result" ,
@@ -94,7 +217,7 @@ def resolve(
94217 If the InstrumentMap does not contain a mapping for the `instrument_id`.
95218
96219 """
97- mappings = self ._data [instrument_id ]
220+ mappings = self ._data [int ( instrument_id ) ]
98221 for entry in mappings :
99222 if entry .start_date <= date < entry .end_date :
100223 return entry .symbol
@@ -270,6 +393,150 @@ def insert_json(
270393 ),
271394 )
272395
def map_symbols_csv(
    self,
    csv_file: Path | PathLike[str] | str,
    out_file: Path | PathLike[str] | str | None = None,
) -> Path:
    """
    Use the loaded symbology data to map a symbols column onto an existing
    CSV file. The result is written to `out_file`.

    Each input row is copied to the output with an extra trailing `symbol`
    column. The symbol is resolved from the row's `instrument_id` and the
    date of its timestamp; `ts_recv` is preferred over `ts_event` when both
    columns are present.

    Parameters
    ----------
    csv_file: Path | PathLike[str] | str
        Path to a CSV file that contains encoded DBN data; must contain
        a `ts_recv` or `ts_event` and `instrument_id` column.
    out_file: Path | PathLike[str] | str (optional)
        Path to a file to write results to. If unspecified, `_mapped` will be
        appended to the `csv_file` name.

    Returns
    -------
    Path
        The path to the written file.

    Raises
    ------
    ValueError
        When the input or output paths are invalid.
        When the input CSV file does not contain a valid timestamp or instrument_id column.

    See Also
    --------
    InstrumentMap.map_symbols_json

    """
    csv_file_valid, out_file_valid = _validate_path_pair(csv_file, out_file)

    # The csv module does its own newline handling, so both files are opened
    # with newline="". Without it, the writer's "\r\n" terminator is
    # translated again on Windows ("\r\r\n") and newlines embedded in quoted
    # fields can be misread.
    with csv_file_valid.open(newline="") as input_:
        reader = csv.DictReader(input_)

        in_fields = reader.fieldnames

        if in_fields is None:
            raise ValueError(f"no CSV header in {csv_file}")

        if "ts_recv" in in_fields:
            ts_field = "ts_recv"
        elif "ts_event" in in_fields:
            ts_field = "ts_event"
        else:
            raise ValueError(
                f"{csv_file} does not have a 'ts_recv' or 'ts_event' column",
            )

        if "instrument_id" not in in_fields:
            raise ValueError(f"{csv_file} does not have an 'instrument_id' column")

        out_fields = (*in_fields, "symbol")

        # The output is only created once the header has been validated.
        with out_file_valid.open("w", newline="") as output:
            writer = csv.DictWriter(output, fieldnames=out_fields)
            writer.writeheader()

            for row in reader:
                ts = datetime_to_unix_nanoseconds(row[ts_field])
                date = pd.Timestamp(ts, unit="ns").date()
                instrument_id = row["instrument_id"]
                if instrument_id is None:
                    # DictReader yields None for fields missing from a short row.
                    row["symbol"] = ""
                else:
                    row["symbol"] = self.resolve(instrument_id, date)

                writer.writerow(row)

    return out_file_valid
470+
def map_symbols_json(
    self,
    json_file: Path | PathLike[str] | str,
    out_file: Path | PathLike[str] | str | None = None,
) -> Path:
    """
    Use the loaded symbology data to insert a symbols key into records of
    an existing JSON file. The result is written to `out_file`.

    The input is read as one JSON document per line. Each record is written
    back on its own line with a `symbol` key added. The symbol is resolved
    from `hd.instrument_id` and the date of the record's `ts_recv` or, when
    that key is absent, `hd.ts_event`.

    Parameters
    ----------
    json_file: Path | PathLike[str] | str
        Path to a JSON file that contains encoded DBN data.
    out_file: Path | PathLike[str] | str (optional)
        Path to a file to write results to. If unspecified, `_mapped` will be
        appended to the `json_file` name.

    Returns
    -------
    Path
        The path to the written file.

    Raises
    ------
    ValueError
        When the input or output paths are invalid.
        When the input JSON file does not contain a valid record.

    See Also
    --------
    InstrumentMap.map_symbols_csv

    """
    json_file_valid, out_file_valid = _validate_path_pair(json_file, out_file)

    with json_file_valid.open() as input_, out_file_valid.open("w") as output:
        for i, record in enumerate(map(json.loads, input_)):
            try:
                header = record["hd"]
                instrument_id = header["instrument_id"]
            except (KeyError, TypeError) as exc:
                # TypeError covers lines that parse as JSON but are not
                # objects (for example a list or a number).
                raise ValueError(
                    f"{json_file}:{i} does not contain a valid JSON encoded record",
                ) from exc

            if "ts_recv" in record:
                ts_field = record["ts_recv"]
            elif "ts_event" in header:
                ts_field = header["ts_event"]
            else:
                raise ValueError(
                    f"{json_file}:{i} does not have a 'ts_recv' or 'ts_event' key",
                )

            ts = datetime_to_unix_nanoseconds(ts_field)

            date = pd.Timestamp(ts, unit="ns").date()
            record["symbol"] = self.resolve(instrument_id, date)

            json.dump(
                record,
                output,
                separators=(",", ":"),
            )
            # Text-mode files already translate "\n" to the platform line
            # ending; writing os.linesep here would yield "\r\r\n" on Windows.
            output.write("\n")

    return out_file_valid
539+
273540 def _insert_inverval (self , instrument_id : int , interval : MappingInterval ) -> None :
274541 """
275542 Insert a SymbolInterval into the map.
0 commit comments