1 | import argparse
2 | import logging
3 | from datetime import datetime
4 | from pathlib import Path
5 |
6 | from structured_logger import StructuredLogger
7 |
8 |
class CustomLogger:
    """
    Helper for recording DUT (Device Under Test) job information in a structured log.

    All state lives in ``self.logger.data`` (a mapping provided by
    ``StructuredLogger``). Every mutating method performs its changes inside
    ``self.logger.edit_context()``.
    NOTE(review): presumably ``edit_context`` persists the changes to the log
    file when the block exits -- confirm against ``StructuredLogger``.
    """

    def __init__(self, log_file):
        """
        Create a logger bound to a log file.

        Args:
            log_file: Location of the log file; forwarded to ``StructuredLogger``
                as ``file_name``.
        """
        self.log_file = log_file
        self.logger = StructuredLogger(file_name=self.log_file)

    def get_last_dut_job(self):
        """
        Gets the details of the most recent DUT job.

        Returns:
            dict: Details of the most recent DUT job (the last entry of
            ``data["dut_jobs"]``).

        Raises:
            ValueError: If the ``dut_jobs`` entry is missing or is empty.
        """
        try:
            # KeyError: no "dut_jobs" entry at all; IndexError: entry exists but is empty.
            return self.logger.data["dut_jobs"][-1]
        except (KeyError, IndexError) as err:
            raise ValueError(
                "No DUT jobs found. Please create a job via create_dut_job call."
            ) from err

    def update(self, **kwargs):
        """
        Updates the top level of the log with the provided key-value pairs.

        Args:
            **kwargs: Key-value pairs to be stored; existing keys are overwritten.
        """
        with self.logger.edit_context():
            for key, value in kwargs.items():
                self.logger.data[key] = value

    def create_dut_job(self, **kwargs):
        """
        Appends a new DUT job to ``data["dut_jobs"]``.

        The job starts with empty status/time fields, ``dut_state`` set to
        ``"pending"``, an empty phase list and ``submitter_start_time`` set to
        the current time.

        Args:
            **kwargs: Extra key-value pairs for the new DUT job. They are applied
                last, so they override the default fields of the same name.
        """
        with self.logger.edit_context():
            if "dut_jobs" not in self.logger.data:
                self.logger.data["dut_jobs"] = []
            new_job = {
                "status": "",
                "submitter_start_time": datetime.now().isoformat(),
                "dut_submit_time": "",
                "dut_start_time": "",
                "dut_end_time": "",
                "dut_name": "",
                "dut_state": "pending",
                "dut_job_phases": [],
                **kwargs,
            }
            self.logger.data["dut_jobs"].append(new_job)

    def update_dut_job(self, key, value):
        """
        Updates the last DUT job with a key-value pair.

        Args:
            key: The key to be updated.
            value: The value to be assigned.

        Raises:
            ValueError: If no DUT job exists.
        """
        with self.logger.edit_context():
            job = self.get_last_dut_job()
            job[key] = value

    def update_status_fail(self, reason=""):
        """
        Sets the status of the last DUT job to 'fail' and records the failure reason.

        Args:
            reason (str, optional): The reason for the failure, stored under
                ``dut_job_fail_reason``. Defaults to "".

        Raises:
            ValueError: If no DUT job exists.
        """
        with self.logger.edit_context():
            job = self.get_last_dut_job()
            job["status"] = "fail"
            job["dut_job_fail_reason"] = reason

    @staticmethod
    def _close_open_phase(job):
        """Stamp the current time on the job's last phase if its end time is still empty."""
        phases = job["dut_job_phases"]
        if phases and phases[-1]["end_time"] == "":
            phases[-1]["end_time"] = datetime.now().isoformat()

    def create_job_phase(self, phase_name):
        """
        Creates a new job phase for the last DUT job.

        If the previous phase is still open (empty end time) it is ended first,
        so at most one phase is open at a time.

        Args:
            phase_name: The name of the new job phase.

        Raises:
            ValueError: If no DUT job exists.
        """
        with self.logger.edit_context():
            job = self.get_last_dut_job()
            self._close_open_phase(job)
            job["dut_job_phases"].append(
                {
                    "name": phase_name,
                    "start_time": datetime.now().isoformat(),
                    "end_time": "",
                }
            )

    def check_dut_timings(self, job):
        """
        Check the timing sequence of a job and log an error for each inconsistency.

        Two checks are made, each only when both values involved are non-empty:
        - 'dut_start_time' must not be earlier than 'dut_submit_time'.
        - 'dut_end_time' must not be earlier than 'dut_start_time'.

        The values are compared exactly as stored.
        NOTE(review): this assumes both are ISO-8601 strings in the same format
        (so string order equals chronological order) -- confirm for custom times.

        Args:
            job (dict): Job record; the keys 'dut_start_time', 'dut_submit_time'
                and 'dut_end_time' are read when present.

        Returns:
            None: Problems are reported through ``logging.error`` only.
        """
        start_time = job.get("dut_start_time")
        submit_time = job.get("dut_submit_time")
        end_time = job.get("dut_end_time")

        if start_time and submit_time and start_time < submit_time:
            # NOTE(review): this fires when the job STARTS before it was submitted;
            # the message wording reads the other way round. Text kept unchanged
            # because external consumers may match on it.
            logging.error("Job submission is happening before job start.")

        if start_time and end_time and end_time < start_time:
            logging.error("Job ended before it started.")

    def update_dut_time(self, value, custom_time=None):
        """
        Updates the DUT start, submit, or end time of the last DUT job.

        Besides the timestamp, ``dut_state`` is moved to 'running' (start),
        'submitted' (submit) or 'finished' (end). Afterwards the job timings are
        sanity-checked with ``check_dut_timings``.

        Args:
            value: Specifies which DUT time to update. Options: 'start', 'submit', 'end'.
            custom_time: Custom time to set. If None (or otherwise falsy), the
                current time in ISO format is used.

        Raises:
            ValueError: If no DUT job exists or an invalid argument is provided for value.
        """
        with self.logger.edit_context():
            job = self.get_last_dut_job()
            timestamp = custom_time if custom_time else datetime.now().isoformat()
            if value == "start":
                job["dut_start_time"] = timestamp
                job["dut_state"] = "running"
            elif value == "submit":
                job["dut_submit_time"] = timestamp
                job["dut_state"] = "submitted"
            elif value == "end":
                job["dut_end_time"] = timestamp
                job["dut_state"] = "finished"
            else:
                raise ValueError(
                    "Error: Invalid argument provided for --update-dut-time. Use 'start', 'submit', 'end'."
                )
            # check the sanity of the partial structured log
            self.check_dut_timings(job)

    def close_dut_job(self):
        """
        Closes the most recent DUT (Device Under Test) job in the logger's data.

        If the last phase of the most recent DUT job has an empty end time, the
        end time is set to the current time. Nothing else is modified.

        Raises:
            ValueError: If no DUT jobs are found in the logger's data.
        """
        with self.logger.edit_context():
            self._close_open_phase(self.get_last_dut_job())

    def close(self):
        """
        Finalizes the log once all DUT attempts are done.

        The method performs the following operations:
        1. Determines the combined status of all DUT jobs: 'pass' if any job
           passed, 'fail' if statuses exist but none is 'pass', 'null' if no
           job carries a status.
        2. Updates the DUT attempt counter to reflect the total number of DUT jobs.
        3. Sets ``submitter_end_time`` of the last DUT job to the current time.

        A missing or empty ``dut_jobs`` entry is treated as "no jobs": the
        combined status becomes 'null', the counter 0, and no end time is written.
        """
        with self.logger.edit_context():
            data = self.logger.data
            jobs = data["dut_jobs"] if "dut_jobs" in data else []
            statuses = [job["status"] for job in jobs if "status" in job]

            if not statuses:
                combined_status = "null"
            elif "pass" in statuses:
                combined_status = "pass"
            else:
                combined_status = "fail"

            data["job_combined_status"] = combined_status
            data["dut_attempt_counter"] = len(jobs)
            if jobs:
                # Only the most recent job receives the submitter end time.
                jobs[-1]["submitter_end_time"] = datetime.now().isoformat()
226 |
227 |
def process_args(args):
    """
    Run the logger actions selected by the parsed command-line arguments.

    A ``CustomLogger`` is created for ``args.log_file`` and each requested
    option is applied in a fixed order: update, create-dut-job,
    update-dut-job, create-job-phase, update-status-fail, update-dut-time,
    close-dut-job, close. Options whose value is falsy (not given, or given
    with no values / an empty string) are skipped.

    Args:
        args: Namespace-like object with the attributes ``log_file``,
            ``update``, ``create_dut_job``, ``update_dut_job``,
            ``create_job_phase``, ``update_status_fail``, ``update_dut_time``,
            ``close_dut_job`` and ``close``.

    Raises:
        ValueError: If an option received a malformed value list or an
            unknown ``--update-dut-time`` action, or if a job-level action is
            requested while no DUT job exists.
    """

    def process_key_value_pairs(args_list, action_func):
        # Turn a flat [key1, value1, key2, value2, ...] list into keyword
        # arguments and hand them to action_func.
        option_name = action_func.__name__.replace("_", "-")
        if not args_list:
            raise ValueError(f"No key-value pairs provided for {option_name}")
        if len(args_list) % 2 != 0:
            raise ValueError(f"Incomplete key-value pairs for {option_name}")
        action_func(**dict(zip(args_list[::2], args_list[1::2])))

    # Create a CustomLogger object with the specified log file path
    custom_logger = CustomLogger(Path(args.log_file))

    if args.update:
        process_key_value_pairs(args.update, custom_logger.update)

    if args.create_dut_job:
        process_key_value_pairs(args.create_dut_job, custom_logger.create_dut_job)

    if args.update_dut_job:
        # Exactly one key and one value are expected; report anything else
        # explicitly instead of failing on tuple unpacking.
        if len(args.update_dut_job) != 2:
            raise ValueError("Invalid number of values for --update-dut-job")
        key, value = args.update_dut_job
        custom_logger.update_dut_job(key, value)

    if args.create_job_phase:
        custom_logger.create_job_phase(args.create_job_phase)

    # An empty reason string is falsy, so it is skipped like an absent option.
    if args.update_status_fail:
        custom_logger.update_status_fail(args.update_status_fail)

    if args.update_dut_time:
        if len(args.update_dut_time) == 2:
            action, custom_time = args.update_dut_time
        elif len(args.update_dut_time) == 1:
            # No explicit time given: the logger falls back to the current time.
            action, custom_time = args.update_dut_time[0], None
        else:
            raise ValueError("Invalid number of values for --update-dut-time")

        if action in ["start", "end", "submit"]:
            custom_logger.update_dut_time(action, custom_time)
        else:
            raise ValueError(
                "Error: Invalid argument provided for --update-dut-time. Use 'start', 'submit', 'end'."
            )

    if args.close_dut_job:
        custom_logger.close_dut_job()

    if args.close:
        custom_logger.close()
281 |
282 |
def main(argv=None):
    """
    Command-line entry point: parse the arguments and apply them to the log file.

    Args:
        argv (list[str], optional): Argument strings to parse. Defaults to
            None, in which case argparse reads ``sys.argv[1:]``.

    Raises:
        ValueError: Propagated from ``process_args`` for malformed option values.
    """
    parser = argparse.ArgumentParser(description="Custom Logger Command Line Tool")
    parser.add_argument("log_file", help="Path to the log file")
    parser.add_argument(
        "--update",
        nargs=argparse.ZERO_OR_MORE,
        metavar=("key", "value"),
        help="Update a key-value pair e.g., --update key1 value1 key2 value2)",
    )
    parser.add_argument(
        "--create-dut-job",
        nargs=argparse.ZERO_OR_MORE,
        metavar=("key", "value"),
        help="Create a new DUT job with key-value pairs (e.g., --create-dut-job key1 value1 key2 value2)",
    )
    parser.add_argument(
        "--update-dut-job",
        nargs=argparse.ZERO_OR_MORE,
        metavar=("key", "value"),
        help="Update a key-value pair in DUT job",
    )
    parser.add_argument(
        "--create-job-phase",
        help="Create a new job phase (e.g., --create-job-phase name)",
    )
    parser.add_argument(
        "--update-status-fail",
        help="Update fail as the status and log the failure reason (e.g., --update-status-fail reason)",
    )
    parser.add_argument(
        "--update-dut-time",
        nargs=argparse.ZERO_OR_MORE,
        metavar=("action", "custom_time"),
        help="Update DUT start and end time. Provide action ('start', 'submit', 'end') and custom_time (e.g., '2023-01-01T12:00:00')",
    )
    parser.add_argument(
        "--close-dut-job",
        action="store_true",
        help="Close the dut job by updating end time of last dut job)",
    )
    parser.add_argument(
        "--close",
        action="store_true",
        help="Updates combined status, submitter's end time and DUT attempt counter",
    )
    # parse_args(None) reads sys.argv[1:], so calling main() keeps its old behavior.
    args = parser.parse_args(argv)

    process_args(args)
331 |
332 |
# Run the command-line tool only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()