1 | """
|
---|
2 | A structured logging utility supporting multiple data formats such as CSV, JSON,
|
---|
3 | and YAML.
|
---|
4 |
|
---|
5 | The main purpose of this script is to keep relevant information available
|
---|
6 | in a condensed form that is easy to deserialize.
|
---|
7 |
|
---|
8 | This script defines a protocol for different file handling strategies and provides
|
---|
9 | implementations for CSV, JSON, and YAML formats. The main class, StructuredLogger,
|
---|
10 | allows for easy interaction with log data, enabling users to load, save, increment,
|
---|
11 | set, and append fields in the log. The script also includes context managers for
|
---|
12 | file locking and editing log data to ensure data integrity and avoid race conditions.
|
---|
13 | """
|
---|
14 |
|
---|
15 | import json
|
---|
16 | import os
|
---|
17 | from collections.abc import MutableMapping, MutableSequence
|
---|
18 | from contextlib import contextmanager
|
---|
19 | from datetime import datetime
|
---|
20 | from pathlib import Path
|
---|
21 | from typing import Any, Protocol
|
---|
22 |
|
---|
23 | import fire
|
---|
24 | from filelock import FileLock
|
---|
25 |
|
---|
26 | try:
|
---|
27 | import polars as pl
|
---|
28 |
|
---|
29 | CSV_LIB_EXCEPTION = None
|
---|
30 | except ImportError as e:
|
---|
31 | CSV_LIB_EXCEPTION: ImportError = e
|
---|
32 |
|
---|
33 | try:
|
---|
34 | from ruamel.yaml import YAML
|
---|
35 |
|
---|
36 | YAML_LIB_EXCEPTION = None
|
---|
37 | except ImportError as e:
|
---|
38 | YAML_LIB_EXCEPTION: ImportError = e
|
---|
39 |
|
---|
40 |
|
---|
class ContainerProxy:
    """
    Wrap a mutable container and report every change through a callback.

    The proxy forwards item access, iteration, ``len`` and ``repr`` to the
    wrapped container (for example a dictionary or a list) and calls
    ``save_callback`` after an item is set or deleted, or after any callable
    attribute of the container has been invoked. Nested mutable mappings and
    sequences returned by item access are wrapped in a new proxy that shares
    the same callback.

    Args:
        container: The mutable container to wrap.
        save_callback: Zero-argument callable invoked after each change.
    """

    def __init__(self, container, save_callback):
        self.container = container
        self.save_callback = save_callback

    def __getitem__(self, key):
        value = self.container[key]
        # Keep nested containers under observation as well.
        if isinstance(value, (MutableMapping, MutableSequence)):
            return ContainerProxy(value, self.save_callback)
        return value

    def __setitem__(self, key, value):
        self.container[key] = value
        self.save_callback()

    def __delitem__(self, key):
        del self.container[key]
        self.save_callback()

    def __getattr__(self, name):
        # __getattr__ only runs when normal lookup fails. If ``container``
        # itself is missing (the instance was created without __init__, as
        # copy and pickle do), reading ``self.container`` here would re-enter
        # this method forever. Read it from the instance dict and report a
        # regular AttributeError instead.
        try:
            container = self.__dict__["container"]
        except KeyError:
            raise AttributeError(name) from None

        attr = getattr(container, name)

        if callable(attr):
            # The proxy cannot tell mutating methods from read-only ones, so
            # every method call is followed by a save.
            def wrapper(*args, **kwargs):
                result = attr(*args, **kwargs)
                self.save_callback()
                return result

            return wrapper
        return attr

    def __iter__(self):
        return iter(self.container)

    def __len__(self):
        return len(self.container)

    def __repr__(self):
        return repr(self.container)
86 |
|
---|
87 |
|
---|
class AutoSaveDict(dict):
    """
    A dict that invokes a save callback whenever it is modified.

    Every mutation made through this class (item assignment and deletion,
    ``pop``, ``popitem``, ``clear``, ``setdefault`` and ``update``) refreshes
    a timestamp entry stored under ``timestamp_key`` (unless disabled) and
    then calls ``save_callback``. Nested mutable mappings are converted into
    ``AutoSaveDict`` instances that share the same callback but do not keep
    their own timestamp. Mutable containers read through ``[]`` are returned
    wrapped in a ``ContainerProxy`` so in-place changes are saved too.

    Args:
        *args: Initial contents, as accepted by ``dict``.
        save_callback: Zero-argument callable invoked after each change.
        register_timestamp: When true, maintain the ``timestamp_key`` entry.
        **kwargs: Additional initial items, as accepted by ``dict``.
    """

    timestamp_key = "_timestamp"

    def __init__(self, *args, save_callback, register_timestamp=True, **kwargs):
        self.save_callback = save_callback
        self.__register_timestamp = register_timestamp
        # The initial contents are applied after the first heartbeat, so a
        # timestamp entry present in them overrides the fresh one.
        self.__heartbeat()
        super().__init__(*args, **kwargs)
        self.__wrap_dictionaries()

    def __heartbeat(self):
        """Store the current time under ``timestamp_key`` when enabled."""
        if self.__register_timestamp:
            self[AutoSaveDict.timestamp_key] = datetime.now().isoformat()

    def __save(self):
        """Refresh the timestamp and invoke the save callback."""
        self.__heartbeat()
        self.save_callback()

    def __wrap_dictionaries(self):
        """
        Convert plain nested mappings into ``AutoSaveDict`` instances.

        The values are replaced through ``dict.__setitem__`` on a snapshot of
        the items: wrapping is a representation change, not a user edit, so
        it must neither trigger the save callback nor touch the timestamp
        entry while the dictionary is being iterated.
        """
        for key, value in list(self.items()):
            if isinstance(value, MutableMapping) and not isinstance(
                value, AutoSaveDict
            ):
                super().__setitem__(
                    key,
                    AutoSaveDict(
                        value,
                        save_callback=self.save_callback,
                        register_timestamp=False,
                    ),
                )

    def __setitem__(self, key, value):
        if isinstance(value, MutableMapping) and not isinstance(value, AutoSaveDict):
            value = AutoSaveDict(
                value, save_callback=self.save_callback, register_timestamp=False
            )
        super().__setitem__(key, value)

        # Writing the timestamp is part of a save already in progress; saving
        # again here would recurse through __save/__heartbeat.
        if self.__register_timestamp and key == AutoSaveDict.timestamp_key:
            return
        self.__save()

    def __getitem__(self, key):
        value = super().__getitem__(key)
        if isinstance(value, (MutableMapping, MutableSequence)):
            return ContainerProxy(value, self.__save)
        return value

    def __delitem__(self, key):
        super().__delitem__(key)
        self.__save()

    def pop(self, *args, **kwargs):
        result = super().pop(*args, **kwargs)
        self.__save()
        return result

    def popitem(self):
        """Remove and return the last inserted item, then save."""
        result = super().popitem()
        self.__save()
        return result

    def clear(self):
        """Remove all items, then save (which restores the timestamp)."""
        super().clear()
        self.__save()

    def setdefault(self, key, default=None):
        """
        Return ``self[key]``, inserting ``default`` first when it is missing.

        The insertion goes through ``__setitem__`` so that it is saved and a
        nested mapping is wrapped like any other assigned value.
        """
        if key not in self:
            self[key] = default
        return self[key]

    def update(self, *args, **kwargs):
        super().update(*args, **kwargs)
        self.__wrap_dictionaries()
        self.__save()
151 |
|
---|
152 |
|
---|
class StructuredLoggerStrategy(Protocol):
    """Structural interface that every file-format strategy must provide."""

    def load_data(self, file_path: Path) -> dict:
        """Read ``file_path`` and return its contents as a dictionary."""
        ...

    def save_data(self, file_path: Path, data: dict) -> None:
        """Write ``data`` to ``file_path``."""
        ...
159 |
|
---|
160 |
|
---|
class CSVStrategy:
    """File strategy that stores the log as a CSV table using polars."""

    def __init__(self) -> None:
        # The polars import is optional at module level; fail here, with the
        # original ImportError attached, only when CSV support is requested.
        if CSV_LIB_EXCEPTION:
            message = "Can't parse CSV files. Missing library"
            raise RuntimeError(message) from CSV_LIB_EXCEPTION

    def load_data(self, file_path: Path) -> dict:
        """
        Read a CSV file and fold its rows into one dictionary keyed by column.

        A column that occurs in a single row keeps its scalar value; a column
        that occurs in several rows becomes a list of the values in row
        order. The timestamp column is never turned into a list: the value
        from the last row wins.
        """
        frame = pl.read_csv(file_path, try_parse_dates=True)
        rows: list[dict[str, Any]] = frame.to_dicts()

        merged = {}
        for row in rows:
            for column, cell in row.items():
                first_seen = column not in merged
                if column == AutoSaveDict.timestamp_key or first_seen:
                    merged[column] = cell
                    continue

                existing = merged[column]
                if isinstance(existing, list):
                    existing.append(cell)
                else:
                    # Second occurrence: promote the scalar to a list.
                    merged[column] = [existing, cell]
        return merged

    def save_data(self, file_path: Path, data: dict) -> None:
        """Build a polars DataFrame from ``data`` and write it as CSV."""
        frame = pl.DataFrame(data)
        frame.write_csv(file_path)
186 |
|
---|
187 |
|
---|
class JSONStrategy:
    """File strategy that stores the log as an indented JSON document."""

    def load_data(self, file_path: Path) -> dict:
        """Parse the JSON document stored at ``file_path`` and return it."""
        raw_text = file_path.read_text()
        return json.loads(raw_text)

    def save_data(self, file_path: Path, data: dict) -> None:
        """Serialize ``data`` to ``file_path`` with two-space indentation."""
        with open(file_path, mode="w") as stream:
            json.dump(data, stream, indent=2)
195 |
|
---|
196 |
|
---|
class YAMLStrategy:
    """File strategy that stores the log as YAML using ruamel.yaml."""

    def __init__(self):
        # The ruamel.yaml import is optional at module level; fail here, with
        # the original ImportError attached, only when YAML is requested.
        if YAML_LIB_EXCEPTION:
            message = "Can't parse YAML files. Missing library"
            raise RuntimeError(message) from YAML_LIB_EXCEPTION

        parser = YAML()
        parser.indent(sequence=4, offset=2)
        parser.default_flow_style = False
        # Register AutoSaveDict so it is dumped through represent_dict, i.e.
        # as an ordinary YAML mapping.
        parser.representer.add_representer(AutoSaveDict, self.represent_dict)
        self.yaml = parser

    @classmethod
    def represent_dict(cls, dumper, data):
        """Represent ``data`` with the standard YAML mapping tag."""
        mapping_tag = "tag:yaml.org,2002:map"
        return dumper.represent_mapping(mapping_tag, data)

    def load_data(self, file_path: Path) -> dict:
        """Parse the YAML document stored at ``file_path`` and return it."""
        document = file_path.read_text()
        return self.yaml.load(document)

    def save_data(self, file_path: Path, data: dict) -> None:
        """Dump ``data`` as YAML into ``file_path``."""
        with open(file_path, mode="w") as stream:
            self.yaml.dump(data, stream)
218 |
|
---|
219 |
|
---|
class StructuredLogger:
    """
    Keep a dictionary of log data synchronized with a file on disk.

    The on-disk format is handled by a strategy object, either passed in or
    chosen from the file extension (``csv``, ``json``, ``yaml``, ``yml``).

    Args:
        file_name: Path of the log file.
        strategy: Object providing ``load_data``/``save_data``. When ``None``
            the strategy is guessed from the extension of ``file_name``.
        truncate: When true and the file already exists, empty it and write
            the initial data instead of keeping the previous contents.

    Raises:
        ValueError: If no strategy is given and the extension is unknown.
        RuntimeError: If the library backing the chosen strategy is missing.
    """

    def __init__(
        self, file_name: str, strategy: StructuredLoggerStrategy = None, truncate=False
    ):
        self.file_name: str = file_name
        self.file_path = Path(self.file_name)
        # Any change to this dictionary triggers save_data().
        self._data: AutoSaveDict = AutoSaveDict(save_callback=self.save_data)

        if strategy is None:
            self.strategy: StructuredLoggerStrategy = self.guess_strategy_from_file(
                self.file_path
            )
        else:
            self.strategy = strategy

        if not self.file_path.exists():
            # parents=True so that a log path inside several missing
            # directory levels can still be created.
            self.file_path.parent.mkdir(parents=True, exist_ok=True)
            self.save_data()
            return

        if truncate:
            with self.get_lock():
                os.truncate(self.file_path, 0)
                self.save_data()

    def load_data(self):
        """Replace the in-memory data with what the strategy reads from disk."""
        # NOTE(review): the loaded object is whatever the strategy returns
        # (e.g. a plain dict for JSON), not an AutoSaveDict, so later changes
        # are presumably only persisted by an explicit save_data() - confirm.
        self._data = self.strategy.load_data(self.file_path)

    def save_data(self):
        """Write the in-memory data to the log file through the strategy."""
        self.strategy.save_data(self.file_path, self._data)

    @property
    def data(self) -> AutoSaveDict:
        """The in-memory log data."""
        return self._data

    @contextmanager
    def get_lock(self):
        """Hold the ``<file_path>.lock`` file lock (10 second timeout)."""
        with FileLock(f"{self.file_path}.lock", timeout=10):
            yield

    @contextmanager
    def edit_context(self):
        """
        Context manager that ensures proper loading and saving of log data when
        performing multiple modifications.

        The file lock is held for the whole block. The data is loaded from
        disk on entry and written back on exit, even if the block raises. If
        loading itself fails, nothing is written, so the file is not
        overwritten with stale in-memory data.
        """
        with self.get_lock():
            self.load_data()
            try:
                yield
            finally:
                self.save_data()

    @staticmethod
    def guess_strategy_from_file(file_path: Path) -> StructuredLoggerStrategy:
        """Pick a strategy from the lower-cased extension of ``file_path``."""
        file_extension = file_path.suffix.lower().lstrip(".")
        return StructuredLogger.get_strategy(file_extension)

    @staticmethod
    def get_strategy(strategy_name: str) -> StructuredLoggerStrategy:
        """
        Create the strategy registered under ``strategy_name``.

        Raises:
            ValueError: If ``strategy_name`` is not a known strategy.
        """
        strategies = {
            "csv": CSVStrategy,
            "json": JSONStrategy,
            "yaml": YAMLStrategy,
            "yml": YAMLStrategy,
        }

        # Only the lookup is guarded, so a KeyError raised inside a strategy
        # constructor is not misreported as an unknown strategy.
        try:
            strategy_class = strategies[strategy_name]
        except KeyError as e:
            raise ValueError(f"Unknown strategy for: {strategy_name}") from e
        return strategy_class()
291 |
|
---|
292 |
|
---|
# When executed as a script, hand StructuredLogger to python-fire, which
# builds the command-line interface from the class.
if __name__ == "__main__":
    fire.Fire(StructuredLogger)