1 | """
2 | A structured logging utility supporting multiple data formats such as CSV, JSON,
3 | and YAML.
4 |
The main purpose of this script is to keep relevant information available
in a condensed, deserialized form.
7 |
8 | This script defines a protocol for different file handling strategies and provides
9 | implementations for CSV, JSON, and YAML formats. The main class, StructuredLogger,
10 | allows for easy interaction with log data, enabling users to load, save, increment,
11 | set, and append fields in the log. The script also includes context managers for
12 | file locking and editing log data to ensure data integrity and avoid race conditions.
13 | """
14 |
15 | import json
16 | import os
17 | from collections.abc import MutableMapping, MutableSequence
18 | from contextlib import contextmanager
19 | from datetime import datetime
20 | from pathlib import Path
21 | from typing import Any, Protocol
22 |
23 | import fire
24 | from filelock import FileLock
25 |
# Optional backends: CSV support needs polars and YAML support needs
# ruamel.yaml. Each *_LIB_EXCEPTION holds the ImportError when its library is
# unavailable and None otherwise; the strategy classes raise from it.
CSV_LIB_EXCEPTION: "ImportError | None" = None
try:
    import polars as pl
except ImportError as e:
    CSV_LIB_EXCEPTION = e

YAML_LIB_EXCEPTION: "ImportError | None" = None
try:
    from ruamel.yaml import YAML
except ImportError as e:
    YAML_LIB_EXCEPTION = e
39 |
40 |
class ContainerProxy:
    """
    Wrap a mutable container (such as a dict or a list) and call
    ``save_callback`` whenever the container or its contents are changed
    through the proxy.

    Item assignment, item deletion and calls to the container's methods
    trigger the callback. Methods that never modify built-in dicts/lists
    (see ``_READ_ONLY_METHODS``) are passed through without saving, so reads
    do not cause writes. Nested mutable containers obtained with ``[]`` are
    wrapped in a proxy sharing the same callback.
    """

    # Methods of the built-in dict/list types that never mutate the container.
    _READ_ONLY_METHODS = frozenset(
        {"copy", "count", "get", "index", "items", "keys", "values"}
    )

    def __init__(self, container, save_callback):
        self.container = container
        self.save_callback = save_callback

    def __getitem__(self, key):
        value = self.container[key]
        if isinstance(value, (MutableMapping, MutableSequence)):
            return ContainerProxy(value, self.save_callback)
        return value

    def __setitem__(self, key, value):
        self.container[key] = value
        self.save_callback()

    def __delitem__(self, key):
        del self.container[key]
        self.save_callback()

    def __getattr__(self, name):
        # ``container`` is missing only when __init__ was bypassed (copy,
        # unpickling); without this guard the lookup below recurses forever.
        if name == "container":
            raise AttributeError(name)

        attr = getattr(self.container, name)
        if not callable(attr) or name in self._READ_ONLY_METHODS:
            return attr

        def wrapper(*args, **kwargs):
            result = attr(*args, **kwargs)
            self.save_callback()
            return result

        return wrapper

    def __contains__(self, item):
        # Delegate so dict membership stays O(1) instead of iterating keys.
        return item in self.container

    def __iter__(self):
        return iter(self.container)

    def __len__(self):
        return len(self.container)

    def __repr__(self):
        return repr(self.container)
86 |
87 |
class AutoSaveDict(dict):
    """
    A dict that calls ``save_callback`` after every modification.

    When ``register_timestamp`` is true, the ISO-8601 time of the latest
    modification is kept under ``timestamp_key``. Nested mappings are stored
    as AutoSaveDict instances that share the callback but keep no timestamp of
    their own. Nested mutable containers read through ``[]`` are returned
    wrapped in a ContainerProxy, so in-place edits also trigger a save.
    """

    timestamp_key = "_timestamp"

    def __init__(self, *args, save_callback, register_timestamp=True, **kwargs):
        """
        Args:
            *args, **kwargs: Initial contents, as accepted by ``dict()``.
            save_callback: Zero-argument callable invoked after each change.
            register_timestamp: Whether to maintain ``timestamp_key``.
        """
        self.save_callback = save_callback
        self.__register_timestamp = register_timestamp
        # Seed the timestamp first so that a ``_timestamp`` present in the
        # initial contents takes precedence over "now".
        self.__heartbeat()
        super().__init__(*args, **kwargs)
        self.__wrap_dictionaries()

    def __heartbeat(self):
        """Record the current time under ``timestamp_key`` (does not save)."""
        if self.__register_timestamp:
            self[AutoSaveDict.timestamp_key] = datetime.now().isoformat()

    def __save(self):
        """Refresh the timestamp, then persist through ``save_callback``."""
        self.__heartbeat()
        self.save_callback()

    def __wrap_value(self, value):
        """Return ``value`` converted to an AutoSaveDict if it is a plain mapping."""
        if isinstance(value, MutableMapping) and not isinstance(value, AutoSaveDict):
            return AutoSaveDict(
                value, save_callback=self.save_callback, register_timestamp=False
            )
        return value

    def __wrap_dictionaries(self):
        """Convert plain mappings already stored in the dict, without saving."""
        # dict.__setitem__ on purpose: swapping a stored mapping for its
        # wrapper is not a user modification and must not trigger a save.
        for key, value in list(self.items()):
            wrapped = self.__wrap_value(value)
            if wrapped is not value:
                super().__setitem__(key, wrapped)

    def __setitem__(self, key, value):
        super().__setitem__(key, self.__wrap_value(value))

        # Writing the timestamp itself is bookkeeping, not a change to persist.
        if self.__register_timestamp and key == AutoSaveDict.timestamp_key:
            return
        self.__save()

    def __getitem__(self, key):
        value = super().__getitem__(key)
        if isinstance(value, (MutableMapping, MutableSequence)):
            return ContainerProxy(value, self.__save)
        return value

    def __delitem__(self, key):
        super().__delitem__(key)
        self.__save()

    def pop(self, *args, **kwargs):
        result = super().pop(*args, **kwargs)
        self.__save()
        return result

    def popitem(self):
        item = super().popitem()
        self.__save()
        return item

    def setdefault(self, key, default=None):
        """Like ``dict.setdefault``, but wraps and saves a newly inserted value.

        Returns the value actually stored (the AutoSaveDict wrapper when
        ``default`` is a plain mapping).
        """
        if key not in self:
            self[key] = default
        return super().__getitem__(key)

    def clear(self):
        """Remove all items (the timestamp is re-recorded) and save."""
        super().clear()
        self.__save()

    def update(self, *args, **kwargs):
        super().update(*args, **kwargs)
        self.__wrap_dictionaries()
        self.__save()

    def __ior__(self, other):
        # ``d |= other`` would otherwise use dict.__ior__ and skip saving.
        self.update(other)
        return self
151 |
152 |
class StructuredLoggerStrategy(Protocol):
    """Structural interface for the file formats StructuredLogger can persist to."""

    def load_data(self, file_path: Path) -> dict:
        """Read ``file_path`` and return its content as a dictionary."""

    def save_data(self, file_path: Path, data: dict) -> None:
        """Serialize ``data`` and write it to ``file_path``."""
159 |
160 |
class CSVStrategy:
    """Persist log data as CSV using polars (an optional dependency)."""

    def __init__(self) -> None:
        """
        Raises:
            RuntimeError: If importing polars failed at module load time.
        """
        # Look the import error up defensively: it is set when the optional
        # polars import failed and may be None or unbound otherwise.
        lib_error = globals().get("CSV_LIB_EXCEPTION")
        if lib_error is not None:
            raise RuntimeError(
                "Can't parse CSV files. Missing library"
            ) from lib_error

    def load_data(self, file_path: Path) -> dict:
        """
        Read ``file_path`` and fold its rows back into a single dictionary.

        A column appearing in several rows becomes a list of its values in row
        order; for the timestamp column only the last row's value is kept.
        """
        rows: list[dict[str, Any]] = pl.read_csv(
            file_path, try_parse_dates=True
        ).to_dicts()
        data = {}
        for row in rows:
            for key, value in row.items():
                if key == AutoSaveDict.timestamp_key or key not in data:
                    data[key] = value
                elif isinstance(data[key], list):
                    data[key].append(value)
                else:
                    data[key] = [data[key], value]
        return data

    def save_data(self, file_path: Path, data: dict) -> None:
        """Write ``data`` to ``file_path`` as CSV through a polars DataFrame."""
        pl.DataFrame(data).write_csv(file_path)
186 |
187 |
class JSONStrategy:
    """Persist log data as indented JSON (UTF-8, as required by RFC 8259)."""

    def load_data(self, file_path: Path) -> dict:
        """Parse ``file_path`` as JSON and return the decoded object."""
        return json.loads(file_path.read_text(encoding="utf-8"))

    def save_data(self, file_path: Path, data: dict) -> None:
        """Write ``data`` to ``file_path`` as JSON indented by two spaces.

        Raises:
            TypeError: If ``data`` contains a non-JSON-serializable value; the
                file is left untouched in that case.
        """
        # Serialize before opening the file: json.dump streams into the
        # already-truncated file, so a serialization error would leave a
        # half-written, unparsable log behind.
        payload = json.dumps(data, indent=2)
        file_path.write_text(payload, encoding="utf-8")
195 |
196 |
class YAMLStrategy:
    """Persist log data as block-style YAML using ruamel.yaml (optional dependency)."""

    def __init__(self):
        """
        Raises:
            RuntimeError: If importing ruamel.yaml failed at module load time.
        """
        # Look the import error up defensively: it is set when the optional
        # ruamel.yaml import failed and may be None or unbound otherwise.
        lib_error = globals().get("YAML_LIB_EXCEPTION")
        if lib_error is not None:
            raise RuntimeError(
                "Can't parse YAML files. Missing library"
            ) from lib_error
        self.yaml = YAML()
        self.yaml.indent(sequence=4, offset=2)
        self.yaml.default_flow_style = False
        # Serialize AutoSaveDict like a plain mapping.
        self.yaml.representer.add_representer(AutoSaveDict, self.represent_dict)

    @classmethod
    def represent_dict(cls, dumper, data):
        """Representer emitting ``data`` as a standard YAML map."""
        return dumper.represent_mapping("tag:yaml.org,2002:map", data)

    def load_data(self, file_path: Path) -> dict:
        """Parse ``file_path`` as YAML and return the loaded document."""
        return self.yaml.load(file_path.read_text())

    def save_data(self, file_path: Path, data: dict) -> None:
        """Write ``data`` to ``file_path`` as YAML."""
        import io

        # Dump to memory first so a representer error cannot leave a
        # truncated, half-written file behind.
        buffer = io.StringIO()
        self.yaml.dump(data, buffer)
        file_path.write_text(buffer.getvalue())
218 |
219 |
class StructuredLogger:
    """
    Keep a dictionary of log data in sync with a file on disk.

    The file format is handled by a StructuredLoggerStrategy, either passed in
    or guessed from the file extension (csv, json, yaml/yml). ``data`` starts
    out as an AutoSaveDict whose modifications are written to the file at once.
    """

    def __init__(
        self, file_name: str, strategy: StructuredLoggerStrategy = None, truncate=False
    ):
        """
        Args:
            file_name: Path of the log file. Missing parent directories are
                created.
            strategy: Serialization strategy; guessed from the file extension
                when None.
            truncate: If the file already exists, immediately overwrite it with
                the fresh (timestamp-only) log.

        Raises:
            ValueError: If ``strategy`` is None and the extension is unknown.
            RuntimeError: If the guessed strategy's optional library is missing.
        """
        self.file_name: str = file_name
        self.file_path = Path(self.file_name)
        self._data: AutoSaveDict = AutoSaveDict(save_callback=self.save_data)

        if strategy is None:
            self.strategy: StructuredLoggerStrategy = self.guess_strategy_from_file(
                self.file_path
            )
        else:
            self.strategy = strategy

        if not self.file_path.exists():
            # parents=True: the log may live in a directory tree that does
            # not exist yet.
            self.file_path.parent.mkdir(parents=True, exist_ok=True)
            self.save_data()
            return

        # NOTE(review): an existing file is not loaded here, so the first
        # auto-save through ``data`` overwrites it with the in-memory content
        # only; ``edit_context`` is the way to work on existing content.
        # Confirm this is intended.
        if truncate:
            with self.get_lock():
                os.truncate(self.file_path, 0)
                self.save_data()

    def load_data(self):
        """Replace ``data`` with the content read from the file."""
        # NOTE(review): the strategy's result replaces the AutoSaveDict (none
        # of the strategies in this file return one), so later changes are
        # only written by an explicit save_data() or on edit_context exit.
        self._data = self.strategy.load_data(self.file_path)

    def save_data(self):
        """Write ``data`` to the file through the strategy."""
        self.strategy.save_data(self.file_path, self._data)

    @property
    def data(self) -> AutoSaveDict:
        """The log content (see load_data for when it stops auto-saving)."""
        return self._data

    @contextmanager
    def get_lock(self):
        """Hold a FileLock on ``<file>.lock``, waiting at most 10 seconds."""
        with FileLock(f"{self.file_path}.lock", timeout=10):
            yield

    @contextmanager
    def edit_context(self):
        """
        Context manager that ensures proper loading and saving of log data when
        performing multiple modifications.

        Holds the file lock for the whole block, reloads ``data`` from disk
        before yielding and writes it back on exit, even if the block raises.
        """
        with self.get_lock():
            # Load outside the try: if reading fails, the finally below must
            # not overwrite the file with stale in-memory data.
            self.load_data()
            try:
                yield
            finally:
                self.save_data()

    @staticmethod
    def guess_strategy_from_file(file_path: Path) -> StructuredLoggerStrategy:
        """Pick a strategy from the (case-insensitive) file extension."""
        file_extension = file_path.suffix.lower().lstrip(".")
        return StructuredLogger.get_strategy(file_extension)

    @staticmethod
    def get_strategy(strategy_name: str) -> StructuredLoggerStrategy:
        """
        Instantiate the strategy registered under ``strategy_name``.

        Raises:
            ValueError: If the name is not one of csv, json, yaml, yml.
        """
        strategies = {
            "csv": CSVStrategy,
            "json": JSONStrategy,
            "yaml": YAMLStrategy,
            "yml": YAMLStrategy,
        }

        # Keep the try to the lookup only, so a KeyError raised inside a
        # strategy constructor is not misreported as an unknown strategy.
        try:
            strategy_cls = strategies[strategy_name]
        except KeyError as e:
            raise ValueError(f"Unknown strategy for: {strategy_name}") from e
        return strategy_cls()
291 |
292 |
def _main():
    """Expose StructuredLogger as a command-line tool through python-fire."""
    fire.Fire(StructuredLogger)


if __name__ == "__main__":
    _main()