Skip to content

Commit dd7740a

Browse files
authored
Merge pull request #814 from powerapi-ng/feat/json-database-compression
feat(database/json): Add support for file compression
2 parents 05182ec + 24bfbce commit dd7740a

8 files changed

Lines changed: 338 additions & 19 deletions

src/powerapi/cli/common_cli_parsing_manager.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -159,6 +159,7 @@ def __init__(self):
159159
subparser_json_input.add_argument("n", "name", help_text="Name of the puller", default_value="puller_json")
160160
subparser_json_input.add_argument("m", "model", help_text="Expected report type")
161161
subparser_json_input.add_argument("f", "filepath", help_text="Input file path")
162+
subparser_json_input.add_argument("c", "compression", default_value='auto', help_text="Compression type")
162163
self.add_subgroup_parser("input", subparser_json_input)
163164

164165
subparser_mongo_output = SubgroupConfigParsingManager("mongodb")
@@ -252,6 +253,7 @@ def __init__(self):
252253
subparser_json_output.add_argument("n", "name", help_text="Name of the pusher", default_value="pusher_json")
253254
subparser_json_output.add_argument("m", "model", help_text="Report type to be exported")
254255
subparser_json_output.add_argument("f", "filepath", help_text="Output file path")
256+
subparser_json_output.add_argument("c", "compression", default_value="auto", help_text="Compression type")
255257
self.add_subgroup_parser("output", subparser_json_output)
256258

257259
subparser_opentsdb_output = SubgroupConfigParsingManager("opentsdb")

src/powerapi/cli/generator.py

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -90,14 +90,16 @@ def generate(self, main_config: dict) -> dict[str, Actor]:
9090
Generate an actor class and actor start message from config dict
9191
"""
9292
if self.component_group_name not in main_config:
93-
raise PowerAPIException('Configuration error : no ' + self.component_group_name + ' specified')
93+
raise PowerAPIException(f'Configuration error : Component {self.component_group_name} group is unknown')
9494

9595
actors = {}
9696
for component_name, component_config in main_config[self.component_group_name].items():
9797
try:
9898
actors[component_name] = self._gen_actor(component_config, main_config, component_name)
9999
except KeyError as exn:
100-
raise PowerAPIException('Configuration error: Missing "%s" argument for %s component', exn.args[0], component_name) from exn
100+
raise PowerAPIException(f'Configuration error: Missing "{exn.args[0]}" argument for {component_name} component') from exn
101+
except ValueError as exn:
102+
raise PowerAPIException(f'Configuration error: Invalid parameter for {component_name} component: {exn.args[0]}') from exn
101103

102104
return actors
103105

@@ -227,7 +229,7 @@ def _json_input_database_factory(conf: dict) -> ReadableDatabase:
227229
JSON Input database factory method.
228230
"""
229231
from powerapi.database.json import JsonInput
230-
return JsonInput(conf['model'], conf['filepath'])
232+
return JsonInput(conf['model'], conf['filepath'], conf['compression'])
231233

232234
@staticmethod
233235
def _socket_database_factory(conf: dict) -> ReadableDatabase:
@@ -291,7 +293,7 @@ def _json_output_database_factory(conf: dict) -> WritableDatabase:
291293
JSON Output database factory method.
292294
"""
293295
from powerapi.database.json import JsonOutput
294-
return JsonOutput(conf['model'], conf['filepath'])
296+
return JsonOutput(conf['model'], conf['filepath'], conf['compression'])
295297

296298
@staticmethod
297299
def _mongodb_database_factory(conf: dict) -> WritableDatabase:

src/powerapi/database/json/driver.py

Lines changed: 19 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -28,13 +28,14 @@
2828
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
2929

3030
from collections.abc import Iterable, Iterator
31-
from io import TextIOWrapper
3231
from os import fsync
3332
from pathlib import Path
33+
from typing import TextIO
3434

3535
from powerapi.database.driver import ReadableDatabase, WritableDatabase
3636
from powerapi.database.exceptions import ConnectionFailed, WriteFailed, ReadFailed
3737
from powerapi.database.json.codecs import ReportDecoders, ReportEncoders
38+
from powerapi.database.json.file_handlers import FileHandlerRegistry
3839
from powerapi.report import Report
3940

4041

@@ -45,19 +46,26 @@ class JsonInput(ReadableDatabase):
4546
The input **should** follow the JSON Lines text format. (https://jsonlines.org/)
4647
"""
4748

48-
def __init__(self, report_type: type[Report], input_filepath: str):
49+
def __init__(self, report_type: type[Report], input_filepath: str, compression: str):
4950
"""
5051
:param report_type: Type of the report handled by this database
5152
:param input_filepath: Path to the input file
53+
:param compression: Compression method to use for the file
5254
"""
5355
self.input_filepath = Path(input_filepath)
56+
self.compression = compression
5457

5558
self._report_decoder = ReportDecoders.get(report_type)
56-
self._file: TextIOWrapper | None = None
59+
self._file_handler = FileHandlerRegistry.get(compression, self.input_filepath)
60+
self._file: TextIO | None = None
5761

5862
def connect(self) -> None:
63+
"""
64+
Connect the JSON input database driver.
65+
:raise: ConnectionFailed if the operation fails
66+
"""
5967
try:
60-
self._file = open(self.input_filepath, encoding='utf-8')
68+
self._file = self._file_handler.open(self.input_filepath, 'r')
6169
except OSError as exn:
6270
raise ConnectionFailed(f'Failed to open input file: {exn}') from exn
6371

@@ -110,31 +118,31 @@ class JsonOutput(WritableDatabase):
110118
The output follows the JSON Lines text format. (https://jsonlines.org/)
111119
"""
112120

113-
def __init__(self, report_type: type[Report], output_filepath: str):
121+
def __init__(self, report_type: type[Report], output_filepath: str, compression: str):
114122
"""
115123
:param report_type: Type of the report handled by this database
116124
:param output_filepath: Path to the output file
117125
"""
118-
super().__init__()
119-
120126
self.output_filepath = Path(output_filepath)
127+
self.compression = compression
121128

122129
self._report_encoder = ReportEncoders.get(report_type)
123-
self._file: TextIOWrapper | None = None
130+
self._file_handler = FileHandlerRegistry.get(compression, self.output_filepath)
131+
self._file: TextIO | None = None
124132

125133
def connect(self) -> None:
126134
"""
127-
Connect the JSON input database driver.
135+
Connect the JSON output database driver.
128136
:raise: ConnectionFailed if the operation fails
129137
"""
130138
try:
131-
self._file = open(self.output_filepath, 'w', encoding='utf-8')
139+
self._file = self._file_handler.open(self.output_filepath, 'w')
132140
except OSError as exn:
133141
raise ConnectionFailed(f'Failed to open output file: {exn}') from exn
134142

135143
def disconnect(self) -> None:
136144
"""
137-
Disconnect the JSON input database driver.
145+
Disconnect the JSON output database driver.
138146
"""
139147
try:
140148
self._file.flush()
Lines changed: 165 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,165 @@
1+
# Copyright (c) 2026, Inria
2+
# All rights reserved.
3+
#
4+
# Redistribution and use in source and binary forms, with or without
5+
# modification, are permitted provided that the following conditions are met:
6+
#
7+
# * Redistributions of source code must retain the above copyright notice, this
8+
# list of conditions and the following disclaimer.
9+
#
10+
# * Redistributions in binary form must reproduce the above copyright notice,
11+
# this list of conditions and the following disclaimer in the documentation
12+
# and/or other materials provided with the distribution.
13+
#
14+
# * Neither the name of the copyright holder nor the names of its
15+
# contributors may be used to endorse or promote products derived from
16+
# this software without specific prior written permission.
17+
#
18+
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
19+
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
20+
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
21+
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
22+
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
23+
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
24+
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
25+
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
26+
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
27+
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
28+
29+
from abc import ABC, abstractmethod
30+
from io import TextIOWrapper
31+
from pathlib import Path
32+
from typing import Literal, TextIO, ClassVar
33+
34+
35+
_OPEN_MODES = Literal['r', 'w']
36+
37+
class FileHandler(ABC):
38+
"""
39+
Base class for JSON file opening strategies.
40+
"""
41+
compression_method: str = ''
42+
supported_suffixes: tuple[str, ...] = ()
43+
44+
@classmethod
45+
@abstractmethod
46+
def open(cls, filepath: Path, mode: _OPEN_MODES) -> TextIO:
47+
"""
48+
Open a file as a UTF-8 text stream using the handler strategy.
49+
:param filepath: Path to the file to open
50+
:param mode: Text mode used to open the file
51+
:return: Open text stream
52+
"""
53+
...
54+
55+
56+
class RawFileHandler(FileHandler):
57+
"""
58+
File handler for uncompressed JSON files.
59+
"""
60+
compression_method = 'none'
61+
supported_suffixes = ('.jsonl', '.jsonlines', '.ndjson', '.json', '')
62+
63+
@classmethod
64+
def open(cls, filepath: Path, mode: _OPEN_MODES) -> TextIO:
65+
return open(filepath, mode, encoding='utf-8')
66+
67+
68+
class GzipFileHandler(FileHandler):
69+
"""
70+
File handler for gzip-compressed JSON files.
71+
"""
72+
compression_method = 'gzip'
73+
supported_suffixes = ('.gz', '.gzip')
74+
75+
@classmethod
76+
def open(cls, filepath: Path, mode: _OPEN_MODES) -> TextIO:
77+
from gzip import GzipFile
78+
gzip_file = GzipFile(filepath, mode)
79+
text_handler = TextIOWrapper(gzip_file, encoding='utf-8')
80+
return text_handler
81+
82+
83+
class LzmaFileHandler(FileHandler):
84+
""""
85+
File handler for lzma-compressed JSON files.
86+
"""
87+
compression_method = 'lzma'
88+
supported_suffixes = ('.xz', '.lzma')
89+
90+
@classmethod
91+
def open(cls, filepath: Path, mode: _OPEN_MODES) -> TextIO:
92+
from lzma import LZMAFile
93+
lzma_file = LZMAFile(filepath, mode)
94+
text_handler = TextIOWrapper(lzma_file, encoding='utf-8')
95+
return text_handler
96+
97+
98+
class FileHandlerRegistry:
99+
"""
100+
Registry of JSON file handlers.
101+
"""
102+
_file_handlers: ClassVar[list[type[FileHandler]]] = []
103+
104+
@classmethod
105+
def register(cls, handler: type[FileHandler]) -> None:
106+
"""
107+
Register a file handler in lookup order.
108+
:param handler: Handler class to register
109+
"""
110+
cls._file_handlers.append(handler)
111+
112+
@classmethod
113+
def _get_from_compression_method(cls, compression_method: str) -> type[FileHandler]:
114+
"""
115+
Retrieve a handler from its compression method name.
116+
:param compression_method: Name of the compression method to resolve
117+
:return: File handler matching the requested compression method
118+
:raises ValueError: If the compression method is not recognized
119+
"""
120+
for handler in cls._file_handlers:
121+
if handler.compression_method == compression_method:
122+
return handler
123+
124+
raise ValueError(f'Unknown compression method: {compression_method}')
125+
126+
@classmethod
127+
def _get_from_file_extension(cls, filepath: Path) -> type[FileHandler]:
128+
"""
129+
Infer a handler from the suffix of the given filepath.
130+
:param filepath: Path to the file
131+
:return: Handler matching the filepath suffix
132+
:raises ValueError: If the file extension is not recognized
133+
"""
134+
suffix = filepath.suffix.casefold()
135+
for handler in cls._file_handlers:
136+
if suffix in handler.supported_suffixes:
137+
return handler
138+
139+
raise ValueError(f'Unknown file extension for: {filepath}')
140+
141+
@classmethod
142+
def get(cls, compression_method: str, filepath: Path) -> type[FileHandler]:
143+
"""
144+
Resolve the file handler for a JSON file.
145+
146+
When the ``compression_method`` parameter is ``auto``, the handler is inferred from the filepath suffix.
147+
Otherwise, the compression method is resolved explicitly.
148+
149+
:param compression_method: Compression method name or ``auto``
150+
:param filepath: Path to the file associated with the handler lookup
151+
:return: Matching file handler
152+
:raises ValueError: If no handler matches the requested method or file suffix
153+
"""
154+
method = compression_method.casefold()
155+
if method == 'auto':
156+
handler = cls._get_from_file_extension(filepath)
157+
else:
158+
handler = cls._get_from_compression_method(method)
159+
160+
return handler
161+
162+
163+
FileHandlerRegistry.register(RawFileHandler)
164+
FileHandlerRegistry.register(GzipFileHandler)
165+
FileHandlerRegistry.register(LzmaFileHandler)
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
# Copyright (c) 2026, Inria
2+
# All rights reserved.
3+
#
4+
# Redistribution and use in source and binary forms, with or without
5+
# modification, are permitted provided that the following conditions are met:
6+
#
7+
# * Redistributions of source code must retain the above copyright notice, this
8+
# list of conditions and the following disclaimer.
9+
#
10+
# * Redistributions in binary form must reproduce the above copyright notice,
11+
# this list of conditions and the following disclaimer in the documentation
12+
# and/or other materials provided with the distribution.
13+
#
14+
# * Neither the name of the copyright holder nor the names of its
15+
# contributors may be used to endorse or promote products derived from
16+
# this software without specific prior written permission.
17+
#
18+
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
19+
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
20+
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
21+
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
22+
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
23+
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
24+
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
25+
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
26+
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
27+
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

0 commit comments

Comments
 (0)