VirtualBox

source: vbox/trunk/src/VBox/Additions/3D/mesa/mesa-24.0.2/bin/ci/gitlab_gql.py

Last change on this file was 103996, checked in by vboxsync, 12 months ago

Additions/3D/mesa: export mesa-24.0.2 to OSE. bugref:10606

File size: 19.2 KB
 
#!/usr/bin/env python3
# For the dependencies, see the requirements.txt

import logging
import re
import traceback
from argparse import ArgumentDefaultsHelpFormatter, ArgumentParser, Namespace
from collections import OrderedDict
from copy import deepcopy
from dataclasses import dataclass, field
from itertools import accumulate
from os import getenv
from pathlib import Path
from subprocess import check_output
from textwrap import dedent
from typing import Any, Iterable, Optional, Pattern, TypedDict, Union

import yaml
from filecache import DAY, filecache
from gql import Client, gql
from gql.transport.requests import RequestsHTTPTransport
from graphql import DocumentNode

class DagNode(TypedDict):
    needs: set[str]
    stage: str
    # `name` is redundant but is kept for backwards compatibility
    name: str


# see create_job_needs_dag function for more details
Dag = dict[str, DagNode]


StageSeq = OrderedDict[str, set[str]]
TOKEN_DIR = Path(getenv("XDG_CONFIG_HOME") or Path.home() / ".config")


def get_token_from_default_dir() -> str:
    token_file = TOKEN_DIR / "gitlab-token"
    try:
        # strict=True makes resolve() raise FileNotFoundError when the token file is missing
        return str(token_file.resolve(strict=True))
    except FileNotFoundError as ex:
        print(
            f"Could not find {token_file}, please provide a token file as an argument"
        )
        raise ex


def get_project_root_dir():
    root_path = Path(__file__).parent.parent.parent.resolve()
    gitlab_file = root_path / ".gitlab-ci.yml"
    assert gitlab_file.exists()

    return root_path


@dataclass
class GitlabGQL:
    _transport: Any = field(init=False)
    client: Client = field(init=False)
    url: str = "https://gitlab.freedesktop.org/api/graphql"
    token: Optional[str] = None

    def __post_init__(self) -> None:
        self._setup_gitlab_gql_client()

    def _setup_gitlab_gql_client(self) -> None:
        # Select your transport with a defined url endpoint
        headers = {}
        if self.token:
            headers["Authorization"] = f"Bearer {self.token}"
        self._transport = RequestsHTTPTransport(url=self.url, headers=headers)

        # Create a GraphQL client using the defined transport
        self.client = Client(transport=self._transport, fetch_schema_from_transport=True)

    def query(
        self,
        gql_file: Union[Path, str],
        params: dict[str, Any] = {},
        operation_name: Optional[str] = None,
        paginated_key_loc: Iterable[str] = [],
        disable_cache: bool = False,
    ) -> dict[str, Any]:
        def run_uncached() -> dict[str, Any]:
            if paginated_key_loc:
                return self._sweep_pages(gql_file, params, operation_name, paginated_key_loc)
            return self._query(gql_file, params, operation_name)

        if disable_cache:
            return run_uncached()

        try:
            # Delegate to the @filecache-decorated variants so the result is cached on disk
            if paginated_key_loc:
                result = self._sweep_pages_cached(
                    gql_file, params, operation_name, paginated_key_loc
                )
            else:
                result = self._query_cached(gql_file, params, operation_name)
            return result  # type: ignore
        except Exception as ex:
            logging.error(f"Cached query failed with {ex}")
            # print exception traceback
            traceback_str = "".join(traceback.format_exception(ex))
            logging.error(traceback_str)
            self.invalidate_query_cache()
            logging.error("Cache invalidated, retrying without cache")
            # Retry without the cache; returning from a finally block here would always
            # discard the cached result above
            return run_uncached()

    def _query(
        self,
        gql_file: Union[Path, str],
        params: dict[str, Any] = {},
        operation_name: Optional[str] = None,
    ) -> dict[str, Any]:
        # Provide a GraphQL query
        source_path: Path = Path(__file__).parent
        pipeline_query_file: Path = source_path / gql_file

        query: DocumentNode
        with open(pipeline_query_file, "r") as f:
            pipeline_query = f.read()
            query = gql(pipeline_query)

        # Execute the query on the transport
        return self.client.execute_sync(
            query, variable_values=params, operation_name=operation_name
        )

    @filecache(DAY)
    def _sweep_pages_cached(self, *args, **kwargs):
        return self._sweep_pages(*args, **kwargs)

    @filecache(DAY)
    def _query_cached(self, *args, **kwargs):
        return self._query(*args, **kwargs)

    def _sweep_pages(
        self, query, params, operation_name=None, paginated_key_loc: Iterable[str] = []
    ) -> dict[str, Any]:
        """
        Retrieve paginated data from a GraphQL API and concatenate the results into a single
        response.

        Args:
            query: represents a filepath with the GraphQL query to be executed.
            params: a dictionary that contains the parameters to be passed to the query. These
                parameters can be used to filter or modify the results of the query.
            operation_name: The `operation_name` parameter is an optional parameter that specifies
                the name of the GraphQL operation to be executed. It is used when making a GraphQL
                query to specify which operation to execute if there are multiple operations defined
                in the GraphQL schema. If not provided, the default operation will be executed.
            paginated_key_loc (Iterable[str]): The `paginated_key_loc` parameter is an iterable of
                strings that represents the location of the paginated field within the response. It
                is used to extract the paginated field from the response and append it to the final
                result. The node has to be a list of objects with a `pageInfo` field that contains
                at least the `hasNextPage` and `endCursor` fields.

        Returns:
            a dictionary containing the response from the query with the paginated field
            concatenated.
        """

        def fetch_page(cursor: str | None = None) -> dict[str, Any]:
            if cursor:
                params["cursor"] = cursor
                logging.info(
                    f"Found more than 100 elements, paginating. "
                    f"Current cursor at {cursor}"
                )

            return self._query(query, params, operation_name)

        # Execute the initial query
        response: dict[str, Any] = fetch_page()

        # Initialize an empty list to store the final result
        final_partial_field: list[dict[str, Any]] = []

        # Loop until all pages have been retrieved
        while True:
            # Get the partial field to be appended to the final result
            partial_field = response
            for key in paginated_key_loc:
                partial_field = partial_field[key]

            # Append the partial field to the final result
            final_partial_field += partial_field["nodes"]

            # Check if there are more pages to retrieve
            page_info = partial_field["pageInfo"]
            if not page_info["hasNextPage"]:
                break

            # Execute the query with the updated cursor parameter
            response = fetch_page(page_info["endCursor"])

        # Replace the "nodes" field in the original response with the final result
        partial_field["nodes"] = final_partial_field
        return response

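    # Illustrative sketch (not an actual GitLab payload): with
    # paginated_key_loc=["project", "pipeline", "jobs"], _sweep_pages expects each page to
    # look roughly like:
    #
    #   {"project": {"pipeline": {"jobs": {
    #       "pageInfo": {"hasNextPage": True, "endCursor": "<cursor>"},
    #       "nodes": [{"name": "job1", ...}, ...],
    #   }}}}
    #
    # and every page's "nodes" list is concatenated into the final response.
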
    def invalidate_query_cache(self) -> None:
        logging.warning("Invalidating query cache")
        try:
            # The @filecache decorator keeps its cache on the decorated *_cached wrappers
            self._sweep_pages_cached._db.clear()
            self._query_cached._db.clear()
        except AttributeError as ex:
            logging.warning(f"Could not invalidate cache, maybe it was not used in {ex.args}?")


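# A minimal usage sketch of GitlabGQL (assumes network access and a valid token; the
# parameter values are made up, "pipeline_details.gql" ships next to this script):
#
#   gl = GitlabGQL(token="<personal access token>")
#   data = gl.query(
#       "pipeline_details.gql",
#       params={"projectPath": "mesa/mesa", "iid": "12345"},
#       paginated_key_loc=["project", "pipeline", "jobs"],
#   )

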
def insert_early_stage_jobs(stage_sequence: StageSeq, jobs_metadata: Dag) -> Dag:
    pre_processed_dag: dict[str, set[str]] = {}
    jobs_from_early_stages = list(accumulate(stage_sequence.values(), set.union))
    for job_name, metadata in jobs_metadata.items():
        final_needs: set[str] = deepcopy(metadata["needs"])
        # Pre-process jobs that are not based on needs field
        # e.g. sanity job in mesa MR pipelines
        if not final_needs:
            job_stage: str = jobs_metadata[job_name]["stage"]
            stage_index: int = list(stage_sequence.keys()).index(job_stage)
            if stage_index > 0:
                final_needs |= jobs_from_early_stages[stage_index - 1]
            pre_processed_dag[job_name] = final_needs

    for job_name, needs in pre_processed_dag.items():
        jobs_metadata[job_name]["needs"] = needs

    return jobs_metadata


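# Illustrative sketch of insert_early_stage_jobs with made-up job names: a job without any
# explicit needs inherits every job from all earlier stages.
#
#   stage_sequence = OrderedDict(build={"build-a"}, test={"sanity", "test-a"})
#   jobs_metadata = {
#       "build-a": {"needs": set(), "stage": "build", "name": "build-a"},
#       "sanity": {"needs": set(), "stage": "test", "name": "sanity"},
#       "test-a": {"needs": {"build-a"}, "stage": "test", "name": "test-a"},
#   }
#   insert_early_stage_jobs(stage_sequence, jobs_metadata)
#   # "sanity" now needs {"build-a"}; "test-a" keeps its explicit needs untouched.

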
def traverse_dag_needs(jobs_metadata: Dag) -> None:
    created_jobs = set(jobs_metadata.keys())
    for job, metadata in jobs_metadata.items():
        final_needs: set = deepcopy(metadata["needs"]) & created_jobs
        # Post process jobs that are based on needs field
        partial = True

        while partial:
            next_depth: set[str] = {n for dn in final_needs for n in jobs_metadata[dn]["needs"]}
            partial: bool = not final_needs.issuperset(next_depth)
            final_needs = final_needs.union(next_depth)

        jobs_metadata[job]["needs"] = final_needs


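# Illustrative sketch of traverse_dag_needs with made-up names: direct needs are expanded
# transitively until a fixed point is reached.
#
#   jobs = {
#       "a": {"needs": set(), "stage": "build", "name": "a"},
#       "b": {"needs": {"a"}, "stage": "build", "name": "b"},
#       "c": {"needs": {"b"}, "stage": "test", "name": "c"},
#   }
#   traverse_dag_needs(jobs)
#   # jobs["c"]["needs"] is now {"a", "b"}

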
def extract_stages_and_job_needs(
    pipeline_jobs: dict[str, Any], pipeline_stages: dict[str, Any]
) -> tuple[StageSeq, Dag]:
    jobs_metadata = Dag()
    # Record the stage sequence to post process deps that are not based on needs
    # field, for example: sanity job
    stage_sequence: OrderedDict[str, set[str]] = OrderedDict()
    for stage in pipeline_stages["nodes"]:
        stage_sequence[stage["name"]] = set()

    for job in pipeline_jobs["nodes"]:
        stage_sequence[job["stage"]["name"]].add(job["name"])
        dag_job: DagNode = {
            "name": job["name"],
            "stage": job["stage"]["name"],
            "needs": set([j["node"]["name"] for j in job["needs"]["edges"]]),
        }
        jobs_metadata[job["name"]] = dag_job

    return stage_sequence, jobs_metadata


def create_job_needs_dag(gl_gql: GitlabGQL, params, disable_cache: bool = True) -> Dag:
    """
    This function creates a Directed Acyclic Graph (DAG) to represent a sequence of jobs, where
    each job has a set of jobs that it depends on (its "needs") and belongs to a certain "stage".
    The "name" of the job is used as the key in the dictionary.

    For example, consider the following DAG:

        1. build stage: job1 -> job2 -> job3
        2. test stage: job2 -> job4

    - The job needs for job3 are: job1, job2
    - The job needs for job4 are: job2
    - job2 needs to wait for all jobs from the build stage to finish.

    The resulting DAG would look like this:

        dag = {
            "job1": {"needs": set(), "stage": "build", "name": "job1"},
            "job2": {"needs": {"job1", "job3"}, "stage": "test", "name": "job2"},
            "job3": {"needs": {"job1", "job2"}, "stage": "build", "name": "job3"},
            "job4": {"needs": {"job2"}, "stage": "test", "name": "job4"},
        }

    To access the job needs, one can do:

        dag["job3"]["needs"]

    This will return the set of jobs that job3 needs: {"job1", "job2"}

    Args:
        gl_gql (GitlabGQL): The `gl_gql` parameter is an instance of the `GitlabGQL` class, which
            is used to make GraphQL queries to the GitLab API.
        params (dict): The `params` parameter is a dictionary that contains the necessary
            parameters for the GraphQL query. It is used to specify the details of the pipeline
            for which the job needs DAG is being created. The specific keys and values in the
            `params` dictionary will depend on the requirements of the GraphQL query being
            executed.
        disable_cache (bool): The `disable_cache` parameter is a boolean that specifies whether
            the query cache should be bypassed and the data fetched fresh from the API.

    Returns:
        The final DAG (Directed Acyclic Graph) representing the job dependencies sourced from the
        `needs` field or from the stage ordering.
    """
    stages_jobs_gql = gl_gql.query(
        "pipeline_details.gql",
        params=params,
        paginated_key_loc=["project", "pipeline", "jobs"],
        disable_cache=disable_cache,
    )
    pipeline_data = stages_jobs_gql["project"]["pipeline"]
    if not pipeline_data:
        raise RuntimeError(f"Could not find any pipelines for {params}")

    stage_sequence, jobs_metadata = extract_stages_and_job_needs(
        pipeline_data["jobs"], pipeline_data["stages"]
    )
    # Fill the DAG with the job needs from stages that don't have any needs but still need to wait
    # for previous stages
    final_dag = insert_early_stage_jobs(stage_sequence, jobs_metadata)
    # Now that each job has its direct needs filled correctly, update the "needs" field for each job
    # in the DAG by performing a topological traversal
    traverse_dag_needs(final_dag)

    return final_dag


def filter_dag(dag: Dag, regex: Pattern) -> Dag:
    jobs_with_regex: set[str] = {job for job in dag if regex.fullmatch(job)}
    return Dag({job: data for job, data in dag.items() if job in sorted(jobs_with_regex)})


def print_dag(dag: Dag) -> None:
    for job, data in dag.items():
        print(f"{job}:")
        print(f"\t{' '.join(data['needs'])}")
        print()


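# A minimal sketch tying the DAG helpers together (assumes network access, a valid token,
# and an existing pipeline; the project path and iid below are made up):
#
#   gl = GitlabGQL(token="<personal access token>")
#   dag = create_job_needs_dag(gl, {"projectPath": "mesa/mesa", "iid": "12345"})
#   dag = filter_dag(dag, re.compile(r"debian-.*"))
#   print_dag(dag)

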
def fetch_merged_yaml(gl_gql: GitlabGQL, params) -> dict[str, Any]:
    params["content"] = dedent("""\
    include:
      - local: .gitlab-ci.yml
    """)
    raw_response = gl_gql.query("job_details.gql", params)
    if merged_yaml := raw_response["ciConfig"]["mergedYaml"]:
        return yaml.safe_load(merged_yaml)

    gl_gql.invalidate_query_cache()
    raise ValueError(
        """
        Could not fetch any content for merged YAML,
        please verify if the git SHA exists in remote.
        Maybe you forgot to `git push`? """
    )


def recursive_fill(job, relationship_field, target_data, acc_data: dict, merged_yaml):
    if relatives := job.get(relationship_field):
        if isinstance(relatives, str):
            relatives = [relatives]

        for relative in relatives:
            parent_job = merged_yaml[relative]
            # Merge the parents' data first so the current job can override it below
            acc_data = recursive_fill(
                parent_job, relationship_field, target_data, acc_data, merged_yaml
            )

    acc_data |= job.get(target_data, {})

    return acc_data


def get_variables(job, merged_yaml, project_path, sha) -> dict[str, str]:
    p = get_project_root_dir() / ".gitlab-ci" / "image-tags.yml"
    image_tags = yaml.safe_load(p.read_text())

    variables = image_tags["variables"]
    variables |= merged_yaml["variables"]
    variables |= job["variables"]
    variables["CI_PROJECT_PATH"] = project_path
    variables["CI_PROJECT_NAME"] = project_path.split("/")[1]
    variables["CI_REGISTRY_IMAGE"] = "registry.freedesktop.org/${CI_PROJECT_PATH}"
    variables["CI_COMMIT_SHA"] = sha

    while recurse_among_variables_space(variables):
        pass

    return variables


# Based on: https://stackoverflow.com/a/2158532/1079223
def flatten(xs):
    for x in xs:
        if isinstance(x, Iterable) and not isinstance(x, (str, bytes)):
            yield from flatten(x)
        else:
            yield x


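# For example, nested GitLab script lists are flattened in order:
#   list(flatten(["echo a", ["echo b", ["echo c"]]])) == ["echo a", "echo b", "echo c"]

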
def get_full_script(job) -> list[str]:
    script = []
    for script_part in ("before_script", "script", "after_script"):
        script.append(f"# {script_part}")
        lines = flatten(job.get(script_part, []))
        script.extend(lines)
        script.append("")

    return script


def recurse_among_variables_space(var_graph) -> bool:
    updated = False
    for var, value in var_graph.items():
        value = str(value)
        dep_vars = []
        if match := re.findall(r"(\$[{]?[\w\d_]*[}]?)", value):
            all_dep_vars = [v.lstrip("${").rstrip("}") for v in match]
            # print(value, match, all_dep_vars)
            dep_vars = [v for v in all_dep_vars if v in var_graph]

        for dep_var in dep_vars:
            dep_value = str(var_graph[dep_var])
            new_value = var_graph[var]
            new_value = new_value.replace(f"${{{dep_var}}}", dep_value)
            new_value = new_value.replace(f"${dep_var}", dep_value)
            var_graph[var] = new_value
            updated |= dep_value != new_value

    return updated


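# Illustrative sketch of recurse_among_variables_space with made-up variables: it substitutes
# known ${VAR}/$VAR references in place and reports whether anything changed, so the caller
# loops until it returns False.
#
#   variables = {"A": "x", "B": "${A}/y", "C": "$B/z"}
#   while recurse_among_variables_space(variables):
#       pass
#   # variables == {"A": "x", "B": "x/y", "C": "x/y/z"}

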
def print_job_final_definition(job_name, merged_yaml, project_path, sha):
    job = merged_yaml[job_name]
    variables = get_variables(job, merged_yaml, project_path, sha)

    print("# --------- variables ---------------")
    for var, value in sorted(variables.items()):
        print(f"export {var}={value!r}")

    # TODO: Recurse into needs to get full script
    # TODO: maybe create an extra yaml file to avoid too much rework
    script = get_full_script(job)
    print()
    print()
    print("# --------- full script ---------------")
    print("\n".join(script))

    if image := variables.get("MESA_IMAGE"):
        print()
        print()
        print("# --------- container image ---------------")
        print(image)


def from_sha_to_pipeline_iid(gl_gql: GitlabGQL, params) -> str:
    result = gl_gql.query("pipeline_utils.gql", params)

    return result["project"]["pipelines"]["nodes"][0]["iid"]


def parse_args() -> Namespace:
    parser = ArgumentParser(
        formatter_class=ArgumentDefaultsHelpFormatter,
        description="CLI and library with utility functions to debug jobs via Gitlab GraphQL",
        epilog=f"""Example:
        {Path(__file__).name} --print-dag""",
    )
    parser.add_argument("-pp", "--project-path", type=str, default="mesa/mesa")
    parser.add_argument("--sha", "--rev", type=str, default='HEAD')
    parser.add_argument(
        "--regex",
        type=str,
        required=False,
        help="Regex pattern for the job name to be considered",
    )
    mutex_group_print = parser.add_mutually_exclusive_group()
    mutex_group_print.add_argument(
        "--print-dag",
        action="store_true",
        help="Print job needs DAG",
    )
    mutex_group_print.add_argument(
        "--print-merged-yaml",
        action="store_true",
        help="Print the resulting YAML for the specific SHA",
    )
    mutex_group_print.add_argument(
        "--print-job-manifest",
        metavar='JOB_NAME',
        type=str,
        help="Print the resulting job data"
    )
    parser.add_argument(
        "--gitlab-token-file",
        type=str,
        default=get_token_from_default_dir(),
        help="GitLab token file to use; by default it is read from $XDG_CONFIG_HOME/gitlab-token",
    )

    args = parser.parse_args()
    args.gitlab_token = Path(args.gitlab_token_file).read_text().strip()
    return args


def main():
    args = parse_args()
    gl_gql = GitlabGQL(token=args.gitlab_token)

    sha = check_output(['git', 'rev-parse', args.sha]).decode('ascii').strip()

    if args.print_dag:
        iid = from_sha_to_pipeline_iid(gl_gql, {"projectPath": args.project_path, "sha": sha})
        dag = create_job_needs_dag(
            gl_gql, {"projectPath": args.project_path, "iid": iid}, disable_cache=True
        )

        if args.regex:
            dag = filter_dag(dag, re.compile(args.regex))

        print_dag(dag)

    if args.print_merged_yaml or args.print_job_manifest:
        merged_yaml = fetch_merged_yaml(
            gl_gql, {"projectPath": args.project_path, "sha": sha}
        )

        if args.print_merged_yaml:
            print(yaml.dump(merged_yaml, indent=2))

        if args.print_job_manifest:
            print_job_final_definition(
                args.print_job_manifest, merged_yaml, args.project_path, sha
            )


if __name__ == "__main__":
    main()
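
# Example invocations (illustrative; they require network access and a readable token file,
# and the job name below is hypothetical):
#
#   ./gitlab_gql.py --print-dag --regex 'debian-.*'
#   ./gitlab_gql.py --print-merged-yaml --sha HEAD
#   ./gitlab_gql.py --print-job-manifest debian-clang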