1 | # -*- coding: utf-8 -*-
2 | # $Id: schedulerbase.py 56295 2015-06-09 14:29:55Z vboxsync $
3 | # pylint: disable=C0302
4 |
5 |
6 | """
7 | Test Manager - Base class and utilities for the schedulers.
8 | """
9 |
10 | __copyright__ = \
11 | """
12 | Copyright (C) 2012-2015 Oracle Corporation
13 |
14 | This file is part of VirtualBox Open Source Edition (OSE), as
15 | available from http://www.alldomusa.eu.org. This file is free software;
16 | you can redistribute it and/or modify it under the terms of the GNU
17 | General Public License (GPL) as published by the Free Software
18 | Foundation, in version 2 as it comes in the "COPYING" file of the
19 | VirtualBox OSE distribution. VirtualBox OSE is distributed in the
20 | hope that it will be useful, but WITHOUT ANY WARRANTY of any kind.
21 |
22 | The contents of this file may alternatively be used under the terms
23 | of the Common Development and Distribution License Version 1.0
24 | (CDDL) only, as it comes in the "COPYING.CDDL" file of the
25 | VirtualBox OSE distribution, in which case the provisions of the
26 | CDDL are applicable instead of those of the GPL.
27 |
28 | You may elect to license modified versions of this file under the
29 | terms and conditions of either the GPL or the CDDL or both.
30 | """
31 | __version__ = "$Revision: 56295 $"
32 |
33 |
34 | # Standard python imports.
35 | import unittest;
36 |
37 | # Validation Kit imports.
38 | from common import utils, constants;
39 | from testmanager import config;
40 | from testmanager.core.build import BuildDataEx, BuildLogic;
41 | from testmanager.core.base import ModelDataBase, ModelDataBaseTestCase, TMExceptionBase;
42 | from testmanager.core.buildsource import BuildSourceData, BuildSourceLogic;
43 | from testmanager.core.globalresource import GlobalResourceLogic;
44 | from testmanager.core.schedgroup import SchedGroupData, SchedGroupLogic;
45 | from testmanager.core.systemlog import SystemLogData, SystemLogLogic;
46 | from testmanager.core.testbox import TestBoxData;
47 | from testmanager.core.testboxstatus import TestBoxStatusData, TestBoxStatusLogic;
48 | from testmanager.core.testcase import TestCaseLogic;
49 | from testmanager.core.testcaseargs import TestCaseArgsDataEx, TestCaseArgsLogic;
50 | from testmanager.core.testset import TestSetData, TestSetLogic;
51 |
52 |
class ReCreateQueueData(object):
    """
    Data object for recreating a scheduling queue.

    It's mostly a storage object, but has a few data checking operations
    associated with it.

    The constructor loads the test groups, testcases and testcase argument
    variations of a scheduling group and cross links them.  The check methods
    return lists of [sMessage, oRelatedObject] error entries, and
    deepTestGroupSort() orders groups and testcases by their prerequisites.
    """

    ## Number of IDs in a prerequisite chain at which the chain is reported as
    #  being too long.
    kcMaxPreReqChain = 10;

    def __init__(self, oDb, idSchedGroup):
        """
        Loads and cross links the data for the scheduling group idSchedGroup
        using the database connection oDb.
        """
        #
        # Load data from the database.
        #

        # Will extend the entries with aoTestCases and dTestCases members
        # further down.  checkForGroupDepCycles will add aidTestGroupPreReqs.
        self.aoTestGroups = SchedGroupLogic(oDb).getMembers(idSchedGroup);

        # aoTestCases entries are TestCaseData instance with iSchedPriority
        # and idTestGroup added for our purposes.
        # We will add oTestGroup and aoArgsVariations members to each further down.
        self.aoTestCases = SchedGroupLogic(oDb).getTestCasesForGroup(idSchedGroup, cMax = 4096);

        # Load dependencies.
        oTestCaseLogic = TestCaseLogic(oDb);
        for oTestCase in self.aoTestCases:
            oTestCase.aidPreReqs = oTestCaseLogic.getTestCasePreReqIds(oTestCase.idTestCase, cMax = 4096);

        # aoArgsVariations entries are TestCaseArgsData instance with
        # iSchedPriority and idTestGroup added for our purposes.
        # We will add oTestGroup and oTestCase members to each further down.
        self.aoArgsVariations = SchedGroupLogic(oDb).getTestCaseArgsForGroup(idSchedGroup, cMax = 65536);

        #
        # Generate global lookups.
        #

        # Generate a testcase lookup dictionary for use when working on
        # argument variations.
        self.dTestCases = dict();
        for oTestCase in self.aoTestCases:
            self.dTestCases[oTestCase.idTestCase] = oTestCase;
        assert len(self.dTestCases) <= len(self.aoTestCases); # Note! Can be shorter!

        # Generate a testgroup lookup dictionary.
        self.dTestGroups = dict();
        for oTestGroup in self.aoTestGroups:
            self.dTestGroups[oTestGroup.idTestGroup] = oTestGroup;
        assert len(self.dTestGroups) == len(self.aoTestGroups);

        #
        # Associate extra members with the base data.
        #
        if len(self.aoTestGroups) > 0:
            # Prep the test groups.
            for oTestGroup in self.aoTestGroups:
                oTestGroup.aoTestCases = list();
                oTestGroup.dTestCases  = dict();

            # Link testcases to their group, both directions.  Prep testcases
            # for argument variation association.
            for oTestCase in self.aoTestCases:
                oTestGroup = self.dTestGroups[oTestCase.idTestGroup];

                assert oTestCase.idTestCase not in oTestGroup.dTestCases;
                oTestGroup.dTestCases[oTestCase.idTestCase] = oTestCase;
                oTestGroup.aoTestCases.append(oTestCase);
                oTestCase.oTestGroup       = oTestGroup;
                oTestCase.aoArgsVariations = list();

            # Associate testcase argument variations with their testcases
            # (group) in both directions.
            for oArgVariation in self.aoArgsVariations:
                oTestGroup = self.dTestGroups[oArgVariation.idTestGroup];
                oTestCase  = oTestGroup.dTestCases[oArgVariation.idTestCase];

                oTestCase.aoArgsVariations.append(oArgVariation);
                oArgVariation.oTestCase  = oTestCase;
                oArgVariation.oTestGroup = oTestGroup;

        else:
            assert len(self.aoTestCases) == 0;
            assert len(self.aoArgsVariations) == 0;
        # done.

    @staticmethod
    def _addPreReqError(aoErrors, aidChain, oObj, sMsg):
        """
        Appends a [sMsg + dependency chain, oObj] entry to aoErrors.
        Returns aoErrors.
        """
        sMsg += ' Dependency chain: %s' % (' -> '.join(['%s' % (idCur,) for idCur in aidChain]),);
        aoErrors.append([sMsg, oObj]);
        return aoErrors;

    def checkForGroupDepCycles(self):
        """
        Checks for testgroup dependency cycles and any missing testgroup
        dependencies.

        A cycle is not detected as such; it is reported as a too long
        prerequisite chain once the chain reaches kcMaxPreReqChain IDs.

        Sets aidTestGroupPreReqs on each test group (the chain of prerequisite
        IDs walked, excluding the group itself).

        Returns array of errors (see SchedulerBase.recreateQueue()).
        """
        aoErrors = list();
        for oTestGroup in self.aoTestGroups:
            idPreReq = oTestGroup.idTestGroupPreReq;
            if idPreReq is None:
                oTestGroup.aidTestGroupPreReqs = list();
                continue;

            aidChain = [oTestGroup.idTestGroup,];
            while idPreReq is not None:
                aidChain.append(idPreReq);
                if len(aidChain) >= self.kcMaxPreReqChain:
                    self._addPreReqError(aoErrors, aidChain, oTestGroup,
                                         'TestGroup #%s prerequisite chain is too long!'
                                         % (oTestGroup.idTestGroup,));
                    break;

                oDep = self.dTestGroups.get(idPreReq, None);
                if oDep is None:
                    self._addPreReqError(aoErrors, aidChain, oTestGroup,
                                         'TestGroup #%s prerequisite #%s is not in the scheduling group!'
                                         % (oTestGroup.idTestGroup, idPreReq,));
                    break;

                idPreReq = oDep.idTestGroupPreReq;
            oTestGroup.aidTestGroupPreReqs = aidChain[1:];

        return aoErrors;

    def checkForMissingTestCaseDeps(self):
        """
        Checks that testcase dependencies stays within bounds.  We do not allow
        dependencies outside a testgroup, no dependency cycles or even remotely
        long dependency chains.

        Returns array of errors (see SchedulerBase.recreateQueue()).
        """
        aoErrors = list();
        for oTestGroup in self.aoTestGroups:
            for oTestCase in oTestGroup.aoTestCases:
                if len(oTestCase.aidPreReqs) == 0:
                    continue;

                # Depth first walk using explicit stacks.  aoStack entries are
                # mutable [testcase, next prerequisite index] pairs (lists, as
                # the index is advanced in place) and aidChain holds the
                # testcase IDs of the entries currently on aoStack.
                aoStack  = [[oTestCase, 0],];
                aidChain = [oTestCase.idTestCase,];
                while len(aoStack) > 0:
                    (oCur, i) = aoStack[-1];
                    if i >= len(oCur.aidPreReqs):
                        aoStack.pop();
                        aidChain.pop();
                        continue;

                    aoStack[-1][1] = i + 1; # whatever happens, we'll advance on the current level.

                    idPreReq = oCur.aidPreReqs[i];
                    oDep = oTestGroup.dTestCases.get(idPreReq, None);
                    if oDep is None:
                        self._addPreReqError(aoErrors, aidChain, oTestCase,
                                             'TestCase #%s prerequisite #%s is not in the scheduling group!'
                                             % (oTestCase.idTestCase, idPreReq));
                    elif idPreReq in aidChain:
                        self._addPreReqError(aoErrors, aidChain, oTestCase,
                                             'TestCase #%s prerequisite #%s creates a cycle!'
                                             % (oTestCase.idTestCase, idPreReq));
                    elif len(oDep.aidPreReqs) == 0:
                        pass;
                    elif len(aidChain) >= self.kcMaxPreReqChain:
                        self._addPreReqError(aoErrors, aidChain, oTestCase,
                                             'TestCase #%s prerequisite chain is too long!' % (oTestCase.idTestCase,));
                    else:
                        aoStack.append([oDep, 0]);
                        aidChain.append(idPreReq);

        return aoErrors;

    def deepTestGroupSort(self):
        """
        Sorts the testgroups and their testcases by priority and dependencies.
        Note! Don't call this before checking for dependency cycles!

        Returns True.  Raises TMExceptionBase if the groups or testcases are
        not in descending priority order.
        """
        if len(self.aoTestGroups) == 0:
            return True;

        #
        # ASSUMES groups as well as testcases are sorted by priority by the
        # database.  So we only have to concern ourselves with the dependency
        # sorting.
        #
        iGrpPrio = self.aoTestGroups[0].iSchedPriority;
        for oTestGroup in self.aoTestGroups:
            if oTestGroup.iSchedPriority > iGrpPrio:
                raise TMExceptionBase('Incorrectly sorted testgroups returned by database.');
            iGrpPrio = oTestGroup.iSchedPriority;

            if len(oTestGroup.aoTestCases) > 0:
                iTstPrio = oTestGroup.aoTestCases[0].iSchedPriority;
                for oTestCase in oTestGroup.aoTestCases:
                    if oTestCase.iSchedPriority > iTstPrio:
                        raise TMExceptionBase('Incorrectly sorted testcases returned by database.');
                    iTstPrio = oTestCase.iSchedPriority;

        #
        # Sort the testgroups by dependencies.
        #
        i = 0;
        while i < len(self.aoTestGroups):
            oTestGroup = self.aoTestGroups[i];
            if oTestGroup.idTestGroupPreReq is not None:
                iPreReq = self.aoTestGroups.index(self.dTestGroups[oTestGroup.idTestGroupPreReq]);
                if iPreReq > i:
                    # The prerequisite is after the current entry.  Move the
                    # current entry so that it's following its prereq entry,
                    # then re-examine whatever ended up at index i.
                    self.aoTestGroups.insert(iPreReq + 1, oTestGroup);
                    self.aoTestGroups.pop(i);
                    continue;
                assert iPreReq < i;
            i += 1; # Advance.

        #
        # Sort the testcases by dependencies.
        # Same algorithm as above, just more prerequisites.
        #
        for oTestGroup in self.aoTestGroups:
            i = 0;
            while i < len(oTestGroup.aoTestCases):
                oTestCase = oTestGroup.aoTestCases[i];
                fMoved    = False;
                for idPreReq in oTestCase.aidPreReqs:
                    iPreReq = oTestGroup.aoTestCases.index(oTestGroup.dTestCases[idPreReq]);
                    if iPreReq > i:
                        # The prerequisite is after the current entry.  Move the
                        # current entry so that it's following its prereq entry.
                        oTestGroup.aoTestCases.insert(iPreReq + 1, oTestCase);
                        oTestGroup.aoTestCases.pop(i);
                        fMoved = True;
                        break;
                    assert iPreReq < i;
                if not fMoved:
                    i += 1; # Advance.

        return True;
300 |
301 |
302 |
class SchedQueueData(ModelDataBase):
    """
    Scheduling queue data item (one SchedQueues row).
    """

    ksIdAttr = 'idSchedGroup';

    ksParam_idSchedGroup        = 'SchedQueueData_idSchedGroup';
    ksParam_idItem              = 'SchedQueueData_idItem';
    ksParam_offQueue            = 'SchedQueueData_offQueue';
    ksParam_idGenTestCaseArgs   = 'SchedQueueData_idGenTestCaseArgs';
    ksParam_idTestGroup         = 'SchedQueueData_idTestGroup';
    ksParam_aidTestGroupPreReqs = 'SchedQueueData_aidTestGroupPreReqs';
    ksParam_bmHourlySchedule    = 'SchedQueueData_bmHourlySchedule';
    ksParam_tsConfig            = 'SchedQueueData_tsConfig';
    ksParam_tsLastScheduled     = 'SchedQueueData_tsLastScheduled';
    ksParam_idTestSetGangLeader = 'SchedQueueData_idTestSetGangLeader';
    ksParam_cMissingGangMembers = 'SchedQueueData_cMissingGangMembers';

    kasAllowNullAttributes = [ 'idItem', 'offQueue', 'aidTestGroupPreReqs', 'bmHourlySchedule', 'idTestSetGangLeader',
                               'tsConfig', 'tsLastScheduled' ];

    def __init__(self):
        """
        Creates an item with every attribute set to None, except for
        cMissingGangMembers which starts out as 1.
        See the database for explanations of each of these fields.
        """
        ModelDataBase.__init__(self);

        for sAttr in ( 'idSchedGroup', 'idItem', 'offQueue', 'idGenTestCaseArgs', 'idTestGroup',
                       'aidTestGroupPreReqs', 'bmHourlySchedule', 'tsConfig', 'tsLastScheduled',
                       'idTestSetGangLeader', ):
            setattr(self, sAttr, None);
        self.cMissingGangMembers = 1;

    def initFromValues(self, idSchedGroup, idGenTestCaseArgs, idTestGroup, aidTestGroupPreReqs, # pylint: disable=R0913
                       bmHourlySchedule, cMissingGangMembers,
                       idItem = None, offQueue = None, tsConfig = None, tsLastScheduled = None, idTestSetGangLeader = None):
        """
        Reinitialize with all attributes potentially given as inputs.
        Return self.
        """
        atValues = (
            ( 'idSchedGroup',        idSchedGroup ),
            ( 'idItem',              idItem ),
            ( 'offQueue',            offQueue ),
            ( 'idGenTestCaseArgs',   idGenTestCaseArgs ),
            ( 'idTestGroup',         idTestGroup ),
            ( 'aidTestGroupPreReqs', aidTestGroupPreReqs ),
            ( 'bmHourlySchedule',    bmHourlySchedule ),
            ( 'tsConfig',            tsConfig ),
            ( 'tsLastScheduled',     tsLastScheduled ),
            ( 'idTestSetGangLeader', idTestSetGangLeader ),
            ( 'cMissingGangMembers', cMissingGangMembers ),
        );
        for sAttr, oValue in atValues:
            setattr(self, sAttr, oValue);
        return self;

    def initFromDbRow(self, aoRow):
        """
        Initialize from database row (SELECT * FROM SchedQueues).
        Returns self.
        Raises exception if no row is specified.
        """
        if aoRow is None:
            raise TMExceptionBase('SchedQueueData not found.');

        # Attribute names in row column order.
        asColumns = ( 'idSchedGroup', 'idItem', 'offQueue', 'idGenTestCaseArgs', 'idTestGroup',
                      'aidTestGroupPreReqs', 'bmHourlySchedule', 'tsConfig', 'tsLastScheduled',
                      'idTestSetGangLeader', 'cMissingGangMembers', );
        for iColumn, sAttr in enumerate(asColumns):
            setattr(self, sAttr, aoRow[iColumn]);
        return self;
386 |
387 |
388 |
389 |
390 |
391 |
392 | class SchedulerBase(object):
393 | """
394 | The scheduler base class.
395 |
396 | The scheduler classes have two functions:
397 | 1. Recreate the scheduling queue.
398 | 2. Pick the next task from the queue.
399 |
400 | The first is scheduler specific, the latter isn't.
401 | """
402 |
class BuildCache(object):
    """
    Build cache.

    Keeps the builds fetched so far in aoEntries and pulls further ones from
    the build cursor (oCursor) on demand while being iterated.
    """

    class BuildCacheIterator(object):
        """ Build cache iterator. """
        def __init__(self, oCache):
            self.oCache = oCache;
            self.iCur   = 0;

        def __iter__(self):
            """Returns self, required by the language."""
            return self;

        def next(self):
            """Returns the next build, raises StopIteration when the end has been reached."""
            while True:
                if self.iCur >= len(self.oCache.aoEntries):
                    # Cached entries exhausted, try to get another from the cursor.
                    oEntry = self.oCache.fetchFromCursor();
                    if oEntry is None:
                        raise StopIteration;
                else:
                    oEntry = self.oCache.aoEntries[self.iCur];
                self.iCur += 1;
                if not oEntry.fRemoved:
                    return oEntry;
            # end

        # Python 3 spells the iterator protocol method __next__.
        __next__ = next;

    class BuildCacheEntry(object):
        """ Build cache entry. """

        def __init__(self, oBuild, fMaybeBlacklisted):
            self.oBuild            = oBuild;
            # None means 'not known yet', see isBlacklisted().
            self._fBlacklisted     = None if fMaybeBlacklisted is True else False;
            self.fRemoved          = False;
            self._dPreReqDecisions = dict();

        def remove(self):
            """
            Marks the cache entry as removed.
            This doesn't actually remove it from the cache array, only marks
            it as removed.  It has no effect on open iterators.
            """
            self.fRemoved = True;

        def getPreReqDecision(self, sPreReqSet):
            """
            Retrieves a cached prerequisite decision.
            Returns boolean if found, None if not.
            """
            return self._dPreReqDecisions.get(sPreReqSet);

        def setPreReqDecision(self, sPreReqSet, fDecision):
            """
            Caches a prerequisite decision.
            Returns fDecision.
            """
            self._dPreReqDecisions[sPreReqSet] = fDecision;
            return fDecision;

        def isBlacklisted(self, oDb):
            """ Checks if the build is blacklisted, asking the database at most once. """
            if self._fBlacklisted is None:
                self._fBlacklisted = BuildLogic(oDb).isBuildBlacklisted(self.oBuild);
            return self._fBlacklisted;


    def __init__(self):
        self.aoEntries = [];
        self.oCursor   = None;

    def setupSource(self, oDb, idBuildSrc, sOs, sCpuArch, tsNow):
        """
        Configures the build cursor for the cache.
        Does nothing if the cache already has entries or a cursor.
        Returns True.
        """
        if len(self.aoEntries) == 0 and self.oCursor is None:
            oBuildSource = BuildSourceData().initFromDbWithId(oDb, idBuildSrc, tsNow);
            self.oCursor = BuildSourceLogic(oDb).openBuildCursor(oBuildSource, sOs, sCpuArch, tsNow);
        return True;

    def __iter__(self):
        """Return an iterator."""
        return self.BuildCacheIterator(self);

    def fetchFromCursor(self):
        """
        Fetches a build from the cursor and adds it to the cache.
        Returns the new cache entry, None if there is no cursor, no more rows
        or the fetch failed.
        """
        if self.oCursor is None:
            return None;

        try:
            aoRow = self.oCursor.fetchOne();
        except Exception: # pylint: disable=W0703
            # NOTE(review): a failing fetch is treated like the end of the
            # result set, presumably on purpose - confirm.
            return None;
        if aoRow is None:
            return None;

        oBuild = BuildDataEx().initFromDbRow(aoRow);
        oEntry = self.BuildCacheEntry(oBuild, aoRow[-1]); # Last column is the maybe-blacklisted indicator.
        self.aoEntries.append(oEntry);
        return oEntry;
499 |
500 | def __init__(self, oDb, oSchedGrpData, iVerbosity = 0, tsSecStart = None):
501 | self._oDb = oDb;
502 | self._oSchedGrpData = oSchedGrpData;
503 | self._iVerbosity = iVerbosity;
504 | self._asMessages = [];
505 | self._tsSecStart = tsSecStart if tsSecStart is not None else utils.timestampSecond();
506 | self.oBuildCache = self.BuildCache();
507 | self.dTestGroupMembers = dict();
508 |
@staticmethod
def _instantiate(oDb, oSchedGrpData, iVerbosity = 0, tsSecStart = None):
    """
    Creates the scheduler object selected by oSchedGrpData.enmScheduler.

    Returns the scheduler child class instance.  Raises the exception
    produced by oDb.integrityException() when the scheduler is unknown.
    """
    if oSchedGrpData.enmScheduler == SchedGroupData.ksScheduler_BestEffortContinousItegration:
        # Imported here rather than at file level.
        from testmanager.core.schedulerbeci import SchdulerBeci;
        return SchdulerBeci(oDb, oSchedGrpData, iVerbosity, tsSecStart);

    sDetails = (oSchedGrpData.enmScheduler, oSchedGrpData.idSchedGroup);
    raise oDb.integrityException('Invalid scheduler "%s", idSchedGroup=%d' % sDetails);
523 |
524 |
525 | #
526 | # Misc.
527 | #
528 |
529 | def msgDebug(self, sText):
530 | """Debug printing."""
531 | if self._iVerbosity > 1:
532 | self._asMessages.append('debug:' + sText);
533 | return None;
534 |
535 | def msgInfo(self, sText):
536 | """Info printing."""
537 | if self._iVerbosity > 1:
538 | self._asMessages.append('info: ' + sText);
539 | return None;
540 |
def dprint(self, sMsg):
    """
    Passes sMsg on to the database object's dprint() when scheduler
    debugging is enabled (config.g_kfSrvGlueDebugScheduler).  Returns None.
    """
    if not config.g_kfSrvGlueDebugScheduler:
        return None;
    self._oDb.dprint(sMsg);
    return None;
546 |
def getElapsedSecs(self):
    """
    Returns the number of seconds elapsed since the start timestamp of
    this scheduling task.
    """
    tsSecCurrent = utils.timestampSecond();
    if self._tsSecStart > tsSecCurrent:
        # Paranoia: current time is before the start time, so restart from now.
        self._tsSecStart = tsSecCurrent;
    cSecsElapsed = tsSecCurrent - self._tsSecStart;
    return cSecsElapsed;
553 |
554 |
555 | #
556 | # Create schedule.
557 | #
558 |
def _recreateQueueCancelGatherings(self):
    """
    Cancels all pending gang gatherings on the current queue.

    Every queue item of the scheduling group that has a gang leader gets
    its gang status updated to 'gang gathering timed out'.  Nothing is
    committed here.  Returns True.
    """
    idSchedGroup = self._oSchedGrpData.idSchedGroup;
    self._oDb.execute('SELECT idTestSetGangLeader\n'
                      'FROM SchedQueues\n'
                      'WHERE idSchedGroup = %s\n'
                      ' AND idTestSetGangLeader is not NULL\n'
                      , (idSchedGroup,));
    if self._oDb.getRowCount() <= 0:
        return True;

    oStatusLogic = TestBoxStatusLogic(self._oDb);
    for aoLeaderRow in self._oDb.fetchAll():
        oStatusLogic.updateGangStatus(aoLeaderRow[0],
                                      TestBoxStatusData.ksTestBoxState_GangGatheringTimedOut,
                                      fCommit = False);
    return True;
576 |
577 | def _recreateQueueItems(self, oData):
578 | """
579 | Returns an array of queue items (SchedQueueData).
580 | Child classes must override this.
581 | """
582 | _ = oData;
583 | return [];
584 |
def recreateQueueWorker(self):
    """
    Worker for recreateQueue.

    Loads and validates the scheduling group data and, if that produced no
    errors, lets the child class generate the queue items and replaces the
    SchedQueues rows of the group with them.  Does not commit.

    Returns (aoErrors, asMessages).
    """

    #
    # Collect the necessary data and validate it.
    #
    oData = ReCreateQueueData(self._oDb, self._oSchedGrpData.idSchedGroup);
    aoErrors = oData.checkForGroupDepCycles();
    aoErrors.extend(oData.checkForMissingTestCaseDeps());
    if len(aoErrors) == 0:
        oData.deepTestGroupSort();

        #
        # The creation of the scheduling queue is done by the child class.
        #
        # We will try guess where in queue we're currently at and rotate
        # the items such that we will resume execution in the approximately
        # same position.  The goal of the scheduler is to provide a 100%
        # deterministic result so that if we regenerate the queue when there
        # are no changes to the testcases, testgroups or scheduling groups
        # involved, test execution will be unchanged (save for maybe just a
        # little for gang gathering).
        #
        aoItems = list();
        if len(oData.aoArgsVariations) > 0:
            aoItems = self._recreateQueueItems(oData);
            self.msgDebug('len(aoItems)=%s' % (len(aoItems),));
            for i, oItem in enumerate(aoItems):
                self.msgDebug('aoItems[%2d]=%s' % (i, oItem));
        if len(aoItems) > 0:
            self._oDb.execute('SELECT offQueue FROM SchedQueues WHERE idSchedGroup = %s ORDER BY idItem LIMIT 1'
                              , (self._oSchedGrpData.idSchedGroup,));
            if self._oDb.getRowCount() > 0:
                offQueue = self._oDb.fetchOne()[0];
                self._oDb.execute('SELECT COUNT(*) FROM SchedQueues WHERE idSchedGroup = %s'
                                  , (self._oSchedGrpData.idSchedGroup,));
                cItems = self._oDb.fetchOne()[0];
                # Floor division: the result is used as a slice index and
                # must stay an integer also when '/' is true division.
                # NOTE(review): the scaling multiplies by the old item count
                # and divides by the new one - confirm this is intended.
                offQueueNew = (offQueue * cItems) // len(aoItems);
                if offQueueNew != 0:
                    aoItems = aoItems[offQueueNew:] + aoItems[:offQueueNew];

        #
        # Replace the scheduling queue.
        # Care need to be take to first timeout/abort any gangs in the
        # gathering state since these use the queue to set up the date.
        #
        self._recreateQueueCancelGatherings();
        self._oDb.execute('DELETE FROM SchedQueues WHERE idSchedGroup = %s\n', (self._oSchedGrpData.idSchedGroup,));
        for oItem in aoItems:
            self._oDb.execute('INSERT INTO SchedQueues (\n'
                              ' idSchedGroup,\n'
                              ' offQueue,\n'
                              ' idGenTestCaseArgs,\n'
                              ' idTestGroup,\n'
                              ' aidTestGroupPreReqs,\n'
                              ' bmHourlySchedule,\n'
                              ' cMissingGangMembers )\n'
                              'VALUES ( %s, %s, %s, %s, %s, %s, %s )\n'
                              , ( oItem.idSchedGroup,
                                  oItem.offQueue,
                                  oItem.idGenTestCaseArgs,
                                  oItem.idTestGroup,
                                  # Empty list and None both end up as NULL.
                                  oItem.aidTestGroupPreReqs if oItem.aidTestGroupPreReqs else None,
                                  oItem.bmHourlySchedule,
                                  oItem.cMissingGangMembers
                              ));
    return (aoErrors, self._asMessages);
654 |
@staticmethod
def recreateQueue(oDb, uidAuthor, idSchedGroup, iVerbosity = 1):
    """
    (Re-)creates the scheduling queue for the given group.

    Returns (aoErrors, asMessages).  On success the error array will be
    empty, on failure it will contain (sError, oRelatedObject) entries.
    The messages are for debugging and are simple strings.

    The work is committed only when there are no errors, otherwise it is
    rolled back.  Exceptions cause a rollback and are re-raised.
    """
    try:
        #
        # To avoid concurrency issues (SchedQueues) and inconsistent data (*),
        # we lock quite a few tables while doing this work.  We access more
        # data than scheduleNewTask so we lock some additional tables.
        #
        oDb.rollback();
        oDb.begin();
        for sLockStmt in ( 'LOCK TABLE SchedGroups, SchedGroupMembers, TestGroups, TestGroupMembers IN SHARE MODE',
                           'LOCK TABLE TestBoxes, TestCaseArgs, TestCases IN SHARE MODE',
                           'LOCK TABLE TestBoxStatuses, SchedQueues IN EXCLUSIVE MODE', ):
            oDb.execute(sLockStmt);

        #
        # Instantiate the scheduler and call the worker function.
        #
        oGroupData = SchedGroupData().initFromDbWithId(oDb, idSchedGroup);
        oWorker    = SchedulerBase._instantiate(oDb, oGroupData, iVerbosity);

        (aoErrors, asMessages) = oWorker.recreateQueueWorker();
        if len(aoErrors) != 0:
            oDb.rollback();
        else:
            SystemLogLogic(oDb).addEntry(SystemLogData.ksEvent_SchedQueueRecreate,
                                         'User #%d recreated sched queue #%d.' % (uidAuthor, idSchedGroup,));
            oDb.commit();

    except:
        oDb.rollback();
        raise;

    return (aoErrors, asMessages);
699 |
700 |
701 |
702 | #
703 | # Schedule Task.
704 | #
705 |
def _composeGangArguments(self, idTestSet):
    """
    Composes the gang specific testdriver arguments.

    The result holds the member number of the given test set, the gang
    size and one --gang-ipv4-<n> argument per gang member.

    Returns command line string, including a leading space.
    """

    oTestSet      = TestSetData().initFromDbWithId(self._oDb, idTestSet);
    aoGangMembers = TestSetLogic(self._oDb).getGang(oTestSet.idTestSetGangLeader);

    sArgs = ' --gang-member-no %s --gang-members %s' % (oTestSet.iGangMemberNo, len(aoGangMembers));
    for i, oGangMember in enumerate(aoGangMembers):
        # Append, so the arguments composed so far are kept.
        sArgs += ' --gang-ipv4-%s %s' % (i, oGangMember.ip); ## @todo IPv6

    return sArgs;
720 |
721 |
def composeExecResponseWorker(self, idTestSet, oTestEx, oTestBox, oBuild, oValidationKitBuild, sBaseUrl):
    """
    Given all the bits of data, compose an EXEC command response to the testbox.

    Returns a dictionary with the command, the test set ID as result ID,
    the script zips, the command line and the timeout in seconds (scaled
    by oTestBox.pctScaleTimeout percent).
    """
    # The validation kit build is needed when the testcase gives no zips
    # or refers to it via the @VALIDATIONKIT_ZIP@ placeholder.
    sScriptZips = oTestEx.oTestCase.sValidationKitZips;
    if sScriptZips is None or '@VALIDATIONKIT_ZIP@' in sScriptZips:
        assert oValidationKitBuild;
        if sScriptZips is None:
            sScriptZips = oValidationKitBuild.sBinaries;
        else:
            sScriptZips = sScriptZips.replace('@VALIDATIONKIT_ZIP@', oValidationKitBuild.sBinaries);
    sScriptZips = sScriptZips.replace('@DOWNLOAD_BASE_URL@', sBaseUrl + config.g_ksTmDownloadBaseUrlRel);

    sCmdLine = oTestEx.oTestCase.sBaseCmd + ' ' + oTestEx.sArgs;
    sCmdLine = sCmdLine.replace('@BUILD_BINARIES@', oBuild.sBinaries);
    sCmdLine = sCmdLine.strip();
    if oTestEx.cGangMembers > 1:
        sCmdLine += ' ' + self._composeGangArguments(idTestSet);

    # The argument variation timeout takes precedence over the testcase one.
    cSecTimeout = oTestEx.cSecTimeout if oTestEx.cSecTimeout is not None else oTestEx.oTestCase.cSecTimeout;
    # Floor division keeps the timeout a whole number of seconds also when
    # '/' is true division.
    cSecTimeout = cSecTimeout * oTestBox.pctScaleTimeout // 100;

    dResponse = \
    {
        constants.tbresp.ALL_PARAM_RESULT:           constants.tbresp.CMD_EXEC,
        constants.tbresp.EXEC_PARAM_RESULT_ID:       idTestSet,
        constants.tbresp.EXEC_PARAM_SCRIPT_ZIPS:     sScriptZips,
        constants.tbresp.EXEC_PARAM_SCRIPT_CMD_LINE: sCmdLine,
        constants.tbresp.EXEC_PARAM_TIMEOUT:         cSecTimeout,
    };
    return dResponse;
753 |
@staticmethod
def composeExecResponse(oDb, idTestSet, sBaseUrl, iVerbosity = 0):
    """
    Composes an EXEC response for a gang member (other than the last).
    Returns a EXEC response or raises an exception (DB/input error).
    """
    #
    # Gather the necessary data.  The validation kit build is only loaded
    # when the test set refers to one.
    #
    oTestSet = TestSetData().initFromDbWithId(oDb, idTestSet);
    oTestBox = TestBoxData().initFromDbWithGenId(oDb, oTestSet.idGenTestBox);
    oTestEx  = TestCaseArgsDataEx().initFromDbWithGenId(oDb, oTestSet.idGenTestCaseArgs);
    oBuild   = BuildDataEx().initFromDbWithId(oDb, oTestSet.idBuild);
    if oTestSet.idBuildTestSuite is None:
        oValidationKitBuild = None;
    else:
        oValidationKitBuild = BuildDataEx().initFromDbWithId(oDb, oTestSet.idBuildTestSuite);

    #
    # Instantiate the specified scheduler and let it do the rest.
    #
    oGroupData = SchedGroupData().initFromDbWithId(oDb, oTestBox.idSchedGroup, oTestSet.tsCreated);
    assert oGroupData.fEnabled is True;
    assert oGroupData.idBuildSrc is not None;
    oWorker = SchedulerBase._instantiate(oDb, oGroupData, iVerbosity);

    dResponse = oWorker.composeExecResponseWorker(idTestSet, oTestEx, oTestBox, oBuild, oValidationKitBuild, sBaseUrl);
    return dResponse;
780 |
781 |
def _updateTask(self, oTask, tsNow):
    """
    Updates a gang schedule task.

    Without a timestamp (tsNow is None) only the missing gang member count
    of the queue item is written, otherwise the gang leader and the last
    scheduled timestamp are written too.  Returns True.
    """
    assert oTask.cMissingGangMembers >= 1;
    assert oTask.idTestSetGangLeader is not None;
    assert oTask.idTestSetGangLeader >= 1;
    if tsNow is None:
        self._oDb.execute('UPDATE SchedQueues\n'
                          ' SET cMissingGangMembers = %s\n'
                          'WHERE idItem = %s\n'
                          , (oTask.cMissingGangMembers, oTask.idItem,) );
        return True;

    self._oDb.execute('UPDATE SchedQueues\n'
                      ' SET idTestSetGangLeader = %s,\n'
                      ' cMissingGangMembers = %s,\n'
                      ' tsLastScheduled = %s\n'
                      'WHERE idItem = %s\n'
                      , (oTask.idTestSetGangLeader, oTask.cMissingGangMembers, tsNow, oTask.idItem,) );
    return True;
802 |
def _moveTaskToEndOfQueue(self, oTask, cGangMembers, tsNow):
    """
    The task has been scheduled successfully, reset its data and move it
    to the end of the queue.

    The queue item gets a new idItem from the SchedQueueItemIdSeq sequence
    and its gang leader is cleared.  For non-gang tasks the last scheduled
    timestamp is set to tsNow, for gang tasks the missing member count is
    reset to cGangMembers.  Returns True.
    """
    if cGangMembers <= 1:
        self._oDb.execute('UPDATE SchedQueues\n'
                          ' SET idItem = NEXTVAL(\'SchedQueueItemIdSeq\'),\n'
                          ' idTestSetGangLeader = NULL,\n'
                          ' cMissingGangMembers = 1,\n'
                          ' tsLastScheduled = %s\n'
                          'WHERE idItem = %s\n'
                          , (tsNow, oTask.idItem,) );
        return True;

    self._oDb.execute('UPDATE SchedQueues\n'
                      ' SET idItem = NEXTVAL(\'SchedQueueItemIdSeq\'),\n'
                      ' idTestSetGangLeader = NULL,\n'
                      ' cMissingGangMembers = %s\n'
                      'WHERE idItem = %s\n'
                      , (cGangMembers, oTask.idItem,) );
    return True;
824 |
825 |
826 |
827 |
828 | def _createTestSet(self, oTask, oTestEx, oTestBoxData, oBuild, oValidationKitBuild, tsNow):
829 | """
830 | Creates a test set for using the given data.
831 | Will not commit, someone up the callstack will that later on.
832 | Returns the test set ID, may raise an exception on database error.
833 | """
834 | # Lazy bird doesn't want to write testset.py and does it all here.
835 |
836 | #
837 | # We're getting the TestSet ID first in order to include it in the base
838 | # file name (that way we can directly relate files on the disk to the
839 | # test set when doing batch work), and also for idTesetSetGangLeader.
840 | #
841 | self._oDb.execute('SELECT NEXTVAL(\'TestSetIdSeq\')');
842 | idTestSet = self._oDb.fetchOne()[0];
843 |
844 | sBaseFilename = '%04d/%02d/%02d/%02d/TestSet-%s' \
845 | % (tsNow.year, tsNow.month, tsNow.day, (tsNow.hour / 6) * 6, idTestSet);
846 |
847 | #
848 | # Gang scheduling parameters. Changes the oTask data for updating by caller.
849 | #
850 | iGangMemberNo = 0;
851 |
852 | if oTestEx.cGangMembers <= 1:
853 | assert oTask.idTestSetGangLeader is None;
854 | assert oTask.cMissingGangMembers <= 1;
855 | elif oTask.idTestSetGangLeader is None:
856 | assert oTask.cMissingGangMembers == oTestEx.cGangMembers;
857 | oTask.cMissingGangMembers = oTestEx.cGangMembers - 1;
858 | oTask.idTestSetGangLeader = idTestSet;
859 | else:
860 | assert oTask.cMissingGangMembers > 0 and oTask.cMissingGangMembers < oTestEx.cGangMembers;
861 | oTask.cMissingGangMembers -= 1;
862 |
863 | #
864 | # Do the database stuff.
865 | #
866 | self._oDb.execute('INSERT INTO TestSets (\n'
867 | ' idTestSet,\n'
868 | ' tsConfig,\n'
869 | ' tsCreated,\n'
870 | ' idBuild,\n'
871 | ' idBuildCategory,\n'
872 | ' idBuildTestSuite,\n'
873 | ' idGenTestBox,\n'
874 | ' idTestBox,\n'
875 | ' idTestGroup,\n'
876 | ' idGenTestCase,\n'
877 | ' idTestCase,\n'
878 | ' idGenTestCaseArgs,\n'
879 | ' idTestCaseArgs,\n'
880 | ' sBaseFilename,\n'
881 | ' iGangMemberNo,\n'
882 | ' idTestSetGangLeader )\n'
883 | 'VALUES ( %s,\n' # idTestSet
884 | ' %s,\n' # tsConfig
885 | ' %s,\n' # tsCreated
886 | ' %s,\n' # idBuild
887 | ' %s,\n' # idBuildCategory
888 | ' %s,\n' # idBuildTestSuite
889 | ' %s,\n' # idGenTestBox
890 | ' %s,\n' # idTestBox
891 | ' %s,\n' # idTestGroup
892 | ' %s,\n' # idGenTestCase
893 | ' %s,\n' # idTestCase
894 | ' %s,\n' # idGenTestCaseArgs
895 | ' %s,\n' # idTestCaseArgs
896 | ' %s,\n' # sBaseFilename
897 | ' %s,\n' # iGangMemberNo
898 | ' %s)\n' # idTestSetGangLeader
899 | , ( idTestSet,
900 | oTask.tsConfig,
901 | tsNow,
902 | oBuild.idBuild,
903 | oBuild.idBuildCategory,
904 | oValidationKitBuild.idBuild if oValidationKitBuild is not None else None,
905 | oTestBoxData.idGenTestBox,
906 | oTestBoxData.idTestBox,
907 | oTask.idTestGroup,
908 | oTestEx.oTestCase.idGenTestCase,
909 | oTestEx.oTestCase.idTestCase,
910 | oTestEx.idGenTestCaseArgs,
911 | oTestEx.idTestCaseArgs,
912 | sBaseFilename,
913 | iGangMemberNo,
914 | oTask.idTestSetGangLeader,
915 | ));
916 |
917 | self._oDb.execute('INSERT INTO TestResults (\n'
918 | ' idTestResultParent,\n'
919 | ' idTestSet,\n'
920 | ' tsCreated,\n'
921 | ' idStrName,\n'
922 | ' cErrors,\n'
923 | ' enmStatus,\n'
924 | ' iNestingDepth)\n'
925 | 'VALUES ( NULL,\n' # idTestResultParent
926 | ' %s,\n' # idTestSet
927 | ' %s,\n' # tsCreated
928 | ' 0,\n' # idStrName
929 | ' 0,\n' # cErrors
930 | ' \'running\'::TestStatus_T,\n'
931 | ' 0)\n' # iNestingDepth
932 | 'RETURNING idTestResult'
933 | , ( idTestSet, tsNow, ));
934 | idTestResult = self._oDb.fetchOne()[0];
935 |
936 | self._oDb.execute('UPDATE TestSets\n'
937 | ' SET idTestResult = %s\n'
938 | 'WHERE idTestSet = %s\n'
939 | , (idTestResult, idTestSet, ));
940 |
941 | return idTestSet;
942 |
def _tryFindValidationKitBit(self, oTestBoxData, tsNow):
    """
    Looks for a validation kit build the given testbox can use.

    The candidates come from the build source configured as
    idBuildSrcTestSuite in the scheduling group, restricted to the OS and
    CPU architecture of the testbox.  The first candidate the cursor yields
    which is not blacklisted wins.

    Returns BuildDataEx or None.  Raise exception on database error.

    Can be overridden by child classes to change the default build requirements.
    """
    oBuildLogic = BuildLogic(self._oDb);
    oBuildSource = BuildSourceData().initFromDbWithId(self._oDb, self._oSchedGrpData.idBuildSrcTestSuite, tsNow);
    oCursor = BuildSourceLogic(self._oDb).openBuildCursor(oBuildSource, oTestBoxData.sOs, oTestBoxData.sCpuArch, tsNow);

    # Walk the cursor row by row, skipping blacklisted candidates.
    cRowsLeft = oCursor.getRowCount();
    while cRowsLeft > 0:
        cRowsLeft -= 1;
        oCandidate = BuildDataEx().initFromDbRow(oCursor.fetchOne());
        if oBuildLogic.isBuildBlacklisted(oCandidate):
            continue;
        return oCandidate;

    return None;
958 |
959 | def _tryFindBuild(self, oTask, oTestEx, oTestBoxData, tsNow):
960 | """
961 | Tries to find a fitting build.
962 | Returns BuildDataEx or None. Raise exception on database error.
963 |
964 | Can be overridden by child classes to change the default build requirements.
965 | """
966 |
967 | #
968 | # Gather the set of prerequisites we have and turn them into a value
969 | # set for use in the loop below.
970 | #
971 | # Note! We're scheduling on testcase level and ignoring argument variation
972 | # selections in TestGroupMembers is intentional.
973 | #
974 | dPreReqs = {};
975 |
976 | # Direct prerequisites. We assume they're all enabled as this can be
977 | # checked at queue creation time.
978 | for oPreReq in oTestEx.aoTestCasePreReqs:
979 | dPreReqs[oPreReq.idTestCase] = 1;
980 |
981 | # Testgroup dependencies from the scheduling group config.
982 | if oTask.aidTestGroupPreReqs is not None:
983 | for iTestGroup in oTask.aidTestGroupPreReqs:
984 | # Make sure the _active_ test group members are in the cache.
985 | if iTestGroup not in self.dTestGroupMembers:
986 | self._oDb.execute('SELECT DISTINCT TestGroupMembers.idTestCase\n'
987 | 'FROM TestGroupMembers, TestCases\n'
988 | 'WHERE TestGroupMembers.idTestGroup = %s\n'
989 | ' AND TestGroupMembers.tsExpire > %s\n'
990 | ' AND TestGroupMembers.tsEffective <= %s\n'
991 | ' AND TestCases.idTestCase = TestGroupMembers.idTestCase\n'
992 | ' AND TestCases.tsExpire > %s\n'
993 | ' AND TestCases.tsEffective <= %s\n'
994 | ' AND TestCases.fEnabled is TRUE\n'
995 | , (iTestGroup, oTask.tsConfig, oTask.tsConfig, oTask.tsConfig, oTask.tsConfig,));
996 | aidTestCases = [];
997 | for aoRow in self._oDb.fetchAll():
998 | aidTestCases.append(aoRow[0]);
999 | self.dTestGroupMembers[iTestGroup] = aidTestCases;
1000 |
1001 | # Add the testgroup members to the prerequisites.
1002 | for idTestCase in self.dTestGroupMembers[iTestGroup]:
1003 | dPreReqs[idTestCase] = 1;
1004 |
1005 | # Create a SQL values table out of them.
1006 | sPreReqSet = ''
1007 | if len(dPreReqs) > 0:
1008 | for idPreReq in sorted(dPreReqs.keys()):
1009 | sPreReqSet += ', (' + str(idPreReq) + ')';
1010 | sPreReqSet = sPreReqSet[2:]; # drop the leading ', '.
1011 |
1012 | #
1013 | # Try the builds.
1014 | #
1015 | self.oBuildCache.setupSource(self._oDb, self._oSchedGrpData.idBuildSrc, oTestBoxData.sOs, oTestBoxData.sCpuArch, tsNow);
1016 | for oEntry in self.oBuildCache:
1017 | #
1018 | # Check build requirements set by the test.
1019 | #
1020 | if not oTestEx.matchesBuildProps(oEntry.oBuild):
1021 | continue;
1022 |
1023 | if oEntry.isBlacklisted(self._oDb):
1024 | oEntry.remove();
1025 | continue;
1026 |
1027 | #
1028 | # Check prerequisites. The default scheduler is satisfied if one
1029 | # argument variation has been executed successfully. It is not
1030 | # satisfied if there are any failure runs.
1031 | #
1032 | if len(sPreReqSet) > 0:
1033 | fDecision = oEntry.getPreReqDecision(sPreReqSet);
1034 | if fDecision is None:
1035 | ## @todo DB Tuning
1036 | # Check for missing prereqs.
1037 | self._oDb.execute('SELECT COUNT(*)\n'
1038 | 'FROM (VALUES ' + sPreReqSet + ') AS PreReqs(idTestCase)\n'
1040 | ' FROM TestSets\n'
1041 | ' WHERE enmStatus IN (%s, %s)\n'
1042 | ' AND idBuild = %s\n'
1043 | ' ) AS TestSets\n'
1044 | ' ON (PreReqs.idTestCase = TestSets.idTestCase)\n'
1045 | 'WHERE TestSets.idTestSet is NULL\n'
1046 | , ( TestSetData.ksTestStatus_Success, TestSetData.ksTestStatus_Skipped,
1047 | oEntry.oBuild.idBuild, ));
1048 | cMissingPreReqs = self._oDb.fetchOne()[0];
1049 | if cMissingPreReqs > 0:
1050 | self.dprint('build %s is missing %u prerequisites (out of %s)'
1051 | % (oEntry.oBuild.idBuild, cMissingPreReqs, sPreReqSet,));
1052 | oEntry.setPreReqDecision(sPreReqSet, False);
1053 | continue;
1054 |
1055 | # Check for failed prereq runs.
1056 | self._oDb.execute('SELECT COUNT(*)\n'
1057 | 'FROM (VALUES ' + sPreReqSet + ') AS PreReqs(idTestCase),\n'
1058 | ' TestSets\n'
1059 | 'WHERE PreReqs.idTestCase = TestSets.idTestCase\n'
1060 | ' AND TestSets.idBuild = %s\n'
1061 | ' AND TestSets.enmStatus IN (%s, %s, %s)\n'
1062 | , ( oEntry.oBuild.idBuild,
1063 | TestSetData.ksTestStatus_Failure,
1064 | TestSetData.ksTestStatus_TimedOut,
1065 | TestSetData.ksTestStatus_Rebooted,
1066 | )
1067 | );
1068 | cFailedPreReqs = self._oDb.fetchOne()[0];
1069 | if cFailedPreReqs > 0:
1070 | self.dprint('build %s is has %u prerequisite failures (out of %s)'
1071 | % (oEntry.oBuild.idBuild, cFailedPreReqs, sPreReqSet,));
1072 | oEntry.setPreReqDecision(sPreReqSet, False);
1073 | continue;
1074 |
1075 | oEntry.setPreReqDecision(sPreReqSet, True);
1076 | elif not fDecision:
1077 | continue;
1078 |
1079 | #
1080 | # If we can, check if the build files still exist.
1081 | #
1082 | if oEntry.oBuild.areFilesStillThere() is False:
1083 | self.dprint('build %s no longer exists' % (oEntry.oBuild.idBuild,));
1084 | oEntry.remove();
1085 | continue;
1086 |
1087 | self.dprint('found oBuild=%s' % (oEntry.oBuild,));
1088 | return oEntry.oBuild;
1089 | return None;
1090 |
def _tryFindMatchingBuild(self, oLeaderBuild, oTestBoxData, idBuildSrc):
    """
    Gang scheduling: looks up the build corresponding to the one the gang
    leader picked, but for the OS and CPU architecture of this testbox.

    The idBuildSrc parameter is not used by this default implementation.

    Returns BuildDataEx or None.  Raise exception on database error.

    Can be overridden by child classes to change the default build requirements.
    """
    #
    # Note! Should probably check build prerequisites if we get a different
    #       build back, so that we don't use a build which hasn't passed
    #       the smoke test.
    #
    _ = idBuildSrc;
    oBuildLogic = BuildLogic(self._oDb);
    sOs = oTestBoxData.sOs;
    sCpuArch = oTestBoxData.sCpuArch;
    return oBuildLogic.tryFindSameBuildForOsArch(oLeaderBuild, sOs, sCpuArch);
1105 |
1106 |
def _tryAsLeader(self, oTask, oTestEx, oTestBoxData, tsNow, sBaseUrl):
    """
    Try schedule the task as a gang leader (can be a gang of one).

    On success the test set has been created, the global resources are
    allocated, the queue item is updated and the testbox status is set, all
    without committing.  For a gang of one the response is an EXEC command,
    otherwise it is a WAIT command while the rest of the gang gathers.

    Returns response (dictionary) or None if the task cannot be scheduled on
    this testbox right now.  May raise exception on DB error.
    """

    # We don't wait for busy resources, we just try the next test.
    oTestArgsLogic = TestCaseArgsLogic(self._oDb);
    if not oTestArgsLogic.areResourcesFree(oTestEx):
        self.dprint('Cannot get global test resources!');
        return None;

    #
    # Find a matching build (this is the difficult bit).
    #
    oBuild = self._tryFindBuild(oTask, oTestEx, oTestBoxData, tsNow);
    if oBuild is None:
        self.dprint('No build!');
        return None;
    if oTestEx.oTestCase.needValidationKitBit():
        oValidationKitBuild = self._tryFindValidationKitBit(oTestBoxData, tsNow);
        if oValidationKitBuild is None:
            self.dprint('No validation kit build!');
            return None;
    else:
        oValidationKitBuild = None;

    #
    # Create a testset, allocate the resources and update the state.
    # Note! Since resource allocation may still fail, we create a nested
    #       transaction so we can roll back.  (Heed lock warning in docs!)
    #
    self._oDb.execute('SAVEPOINT tryAsLeader');
    idTestSet = self._createTestSet(oTask, oTestEx, oTestBoxData, oBuild, oValidationKitBuild, tsNow);

    if GlobalResourceLogic(self._oDb).allocateResources(oTestBoxData.idTestBox, oTestEx.aoGlobalRsrc, fCommit = False) \
       is not True:
        self._oDb.execute('ROLLBACK TO SAVEPOINT tryAsLeader');
        self.dprint('Failed to allocate global resources!');
        # Must be None, not False: the callers test the result with
        # 'is not None' and would otherwise treat False as a scheduled task
        # and commit the transaction.
        return None;

    if oTestEx.cGangMembers <= 1:
        # We're alone, put the task back at the end of the queue and issue EXEC cmd.
        self._moveTaskToEndOfQueue(oTask, oTestEx.cGangMembers, tsNow);
        dResponse = self.composeExecResponseWorker(idTestSet, oTestEx, oTestBoxData, oBuild, oValidationKitBuild, sBaseUrl);
        sTBState = TestBoxStatusData.ksTestBoxState_Testing;
    else:
        # We're missing gang members, issue WAIT cmd.
        self._updateTask(oTask, tsNow if idTestSet == oTask.idTestSetGangLeader else None);
        dResponse = { constants.tbresp.ALL_PARAM_RESULT: constants.tbresp.CMD_WAIT, };
        sTBState = TestBoxStatusData.ksTestBoxState_GangGathering;

    TestBoxStatusLogic(self._oDb).updateState(oTestBoxData.idTestBox, sTBState, idTestSet, fCommit = False);
    self._oDb.execute('RELEASE SAVEPOINT tryAsLeader');
    return dResponse;
1162 |
def _tryAsGangMember(self, oTask, oTestEx, oTestBoxData, tsNow, sBaseUrl):
    """
    Try schedule the task as a gang member, i.e. join a gang which already
    has a leader (oTask.idTestSetGangLeader is set).

    If this testbox completes the gang, the whole gang is switched to the
    gang-testing state and the response is an EXEC command.  Otherwise the
    response is a WAIT command.  Nothing is committed here.

    Returns response (dictionary) or None if no build matching the
    leader's could be found.  May raise exception on DB error.
    """

    #
    # The leader has chosen a build, we need to find a matching one for our platform.
    # (It's up to the scheduler decide upon how strict dependencies are to be enforced
    # upon subordinate group members.)
    #
    # The leader's test set ID is taken from the queue item, which is where
    # _createTestSet records it and what the dispatching in
    # scheduleNewTaskWorker is keyed on.
    #
    oLeaderTestSet = TestSetData().initFromDbWithId(self._oDb, oTask.idTestSetGangLeader);

    oLeaderBuild = BuildDataEx().initFromDbWithId(self._oDb, oLeaderTestSet.idBuild);
    oBuild = self._tryFindMatchingBuild(oLeaderBuild, oTestBoxData, self._oSchedGrpData.idBuildSrc);
    if oBuild is None:
        return None;

    # NOTE(review): if the leader uses a validation kit build and no matching
    # one is found for this platform, we carry on with None here, whereas
    # _tryAsLeader gives up in the equivalent situation - confirm that this
    # is intended.
    oValidationKitBuild = None;
    if oLeaderTestSet.idBuildTestSuite is not None:
        oLeaderValidationKitBit = BuildDataEx().initFromDbWithId(self._oDb, oLeaderTestSet.idBuildTestSuite);
        oValidationKitBuild = self._tryFindMatchingBuild(oLeaderValidationKitBit, oTestBoxData,
                                                         self._oSchedGrpData.idBuildSrcTestSuite);

    #
    # Create a testset and update the state(s).
    #
    idTestSet = self._createTestSet(oTask, oTestEx, oTestBoxData, oBuild, oValidationKitBuild, tsNow);

    oTBStatusLogic = TestBoxStatusLogic(self._oDb);
    if oTask.cMissingGangMembers < 1:
        # The whole gang is there, move the task to the end of the queue
        # and update the status on the other gang members.
        self._moveTaskToEndOfQueue(oTask, oTestEx.cGangMembers, tsNow);
        dResponse = self.composeExecResponseWorker(idTestSet, oTestEx, oTestBoxData, oBuild, oValidationKitBuild, sBaseUrl);
        sTBState = TestBoxStatusData.ksTestBoxState_GangTesting;
        oTBStatusLogic.updateGangStatus(oTask.idTestSetGangLeader, sTBState, fCommit = False);
    else:
        # We're still missing some gang members, issue WAIT cmd.
        self._updateTask(oTask, tsNow if idTestSet == oTask.idTestSetGangLeader else None);
        dResponse = { constants.tbresp.ALL_PARAM_RESULT: constants.tbresp.CMD_WAIT, };
        sTBState = TestBoxStatusData.ksTestBoxState_GangGathering;

    oTBStatusLogic.updateState(oTestBoxData.idTestBox, sTBState, idTestSet, fCommit = False);
    return dResponse;
1208 |
1209 |
def scheduleNewTaskWorker(self, oTestBoxData, tsNow, sBaseUrl):
    """
    Worker for scheduling a new task.

    Goes through the queue items of the scheduling group which are allowed
    to run at the current hour of the week, in idItem order, and returns the
    response for the first one this testbox can take on.  The search stops
    early once config.g_kcSecMaxNewTask seconds have elapsed.

    Returns response (dictionary) or None if no suitable task was found.
    May raise exception on DB error.
    """

    #
    # Iterate the scheduler queue (fetch all to avoid having to concurrent
    # queries), trying out each task to see if the testbox can execute it.
    #
    dRejected = {}; # variations we've already checked out and rejected.
    self._oDb.execute('SELECT *\n'
                      'FROM SchedQueues\n'
                      'WHERE idSchedGroup = %s\n'
                      ' AND ( bmHourlySchedule IS NULL\n'
                      ' OR get_bit(bmHourlySchedule, %s) = 1 )\n'
                      'ORDER BY idItem ASC\n'
                      , (self._oSchedGrpData.idSchedGroup, utils.getLocalHourOfWeek()) );
    aaoRows = self._oDb.fetchAll();
    for aoRow in aaoRows:
        # Don't loop forever.
        if self.getElapsedSecs() >= config.g_kcSecMaxNewTask:
            break;

        # Unpack the data and check if we've rejected the testcasevar/group variation already (they repeat).
        oTask = SchedQueueData().initFromDbRow(aoRow);
        if config.g_kfSrvGlueDebugScheduler:
            self.dprint('** Considering: idItem=%s idGenTestCaseArgs=%s idTestGroup=%s Deps=%s last=%s cfg=%s\n'
                        % ( oTask.idItem, oTask.idGenTestCaseArgs, oTask.idTestGroup, oTask.aidTestGroupPreReqs,
                            oTask.tsLastScheduled, oTask.tsConfig,));

        sRejectNm = '%s:%s' % (oTask.idGenTestCaseArgs, oTask.idTestGroup,);
        if sRejectNm in dRejected:
            self.dprint('Duplicate, already rejected! (%s)' % (sRejectNm,));
            continue;
        # Recorded up front: if the attempt below succeeds we return anyway.
        dRejected[sRejectNm] = 1;

        # Fetch all the test case info (too much, but who cares right now).
        oTestEx = TestCaseArgsDataEx().initFromDbWithGenIdEx(self._oDb, oTask.idGenTestCaseArgs,
                                                             tsConfigEff = oTask.tsConfig,
                                                             tsRsrcEff = oTask.tsConfig);
        if config.g_kfSrvGlueDebugScheduler:
            self.dprint('TestCase "%s": %s %s' % (oTestEx.oTestCase.sName, oTestEx.oTestCase.sBaseCmd, oTestEx.sArgs,));

        # This shouldn't happen, but just in case it does...
        if oTestEx.oTestCase.fEnabled is not True:
            self.dprint('Testcase is not enabled!!');
            continue;

        # Check if the testbox properties matches the test.
        if not oTestEx.matchesTestBoxProps(oTestBoxData):
            self.dprint('Testbox mismatch!');
            continue;

        # Try schedule it.
        if oTask.idTestSetGangLeader is None or oTestEx.cGangMembers <= 1:
            dResponse = self._tryAsLeader(oTask, oTestEx, oTestBoxData, tsNow, sBaseUrl);
        elif oTask.cMissingGangMembers >= 1:
            # A gathering gang with at least one open slot.  The last open
            # slot (count == 1) must be included, otherwise the gang can
            # never become complete.
            dResponse = self._tryAsGangMember(oTask, oTestEx, oTestBoxData, tsNow, sBaseUrl);
        else:
            dResponse = None; # Shouldn't happen!
        if dResponse is not None:
            self.dprint('Found a task! dResponse=%s' % (dResponse,));
            return dResponse;

    # Found no suitable task.
    return None;
1276 |
@staticmethod
def scheduleNewTask(oDb, oTestBoxData, sBaseUrl, iVerbosity = 0):
    """
    Schedules a new task for the given testbox.

    Starts a fresh transaction, locks the testbox status row and the
    scheduling queue rows of the testbox's scheduling group, re-reads the
    testbox and scheduling group configuration, and then hands over to the
    scheduler implementation configured for the group.

    The transaction is committed if a task was found.  Otherwise it is
    rolled back, which is also the case when the testbox or the scheduling
    group is disabled, when the testbox configuration changed since
    oTestBoxData was read, or when the group has no build source.

    Returns response (dictionary) or None if there is nothing to do.
    Database errors are re-raised after rolling back.
    """
    try:
        #
        # To avoid concurrency issues in SchedQueues we lock all the rows
        # related to our scheduling queue.  Also, since this is a very
        # expensive operation we lock the testbox status row to fend off
        # repeated retries by a faulty testbox script.
        #
        # The IDs are passed as bound parameters like everywhere else in
        # this file, rather than being formatted into the SQL text.
        #
        tsSecStart = utils.timestampSecond();
        oDb.rollback();
        oDb.begin();
        oDb.execute('SELECT idTestBox FROM TestBoxStatuses WHERE idTestBox = %s FOR UPDATE NOWAIT'
                    , (oTestBoxData.idTestBox,));
        oDb.execute('SELECT idSchedGroup FROM SchedQueues WHERE idSchedGroup = %s FOR UPDATE'
                    , (oTestBoxData.idSchedGroup,));

        # We need the current timestamp.
        tsNow = oDb.getCurrentTimestamp();

        # Re-read the testbox data ...
        oTestBoxDataCur = TestBoxData().initFromDbWithId(oDb, oTestBoxData.idTestBox, tsNow);
        if    oTestBoxDataCur.fEnabled \
          and oTestBoxDataCur.idGenTestBox == oTestBoxData.idGenTestBox \
          and oTestBoxDataCur.idSchedGroup == oTestBoxData.idSchedGroup: # (paranoia wrt idSchedGroup)

            # ... and schedule group data.
            oSchedGrpData = SchedGroupData().initFromDbWithId(oDb, oTestBoxDataCur.idSchedGroup, tsNow);
            if oSchedGrpData.fEnabled and oSchedGrpData.idBuildSrc is not None:

                #
                # Instantiate the specified scheduler and let it do the rest.
                #
                oScheduler = SchedulerBase._instantiate(oDb, oSchedGrpData, iVerbosity, tsSecStart);
                dResponse = oScheduler.scheduleNewTaskWorker(oTestBoxDataCur, tsNow, sBaseUrl);
                if dResponse is not None:
                    oDb.commit();
                    return dResponse;
    except:
        # Catch-all on purpose: whatever went wrong, drop the locks and
        # let the caller see the original exception.
        oDb.rollback();
        raise;

    # Not enabled or nothing to schedule, rollback and return no task.
    oDb.rollback();
    return None;
1325 |
@staticmethod
def tryCancelGangGathering(oDb, oStatusData):
    """
    Attempts to cancel the gang gathering the given testbox takes part in.

    When the testbox is still gathering once the tables are locked, the
    whole gang is flagged as timed out and the queue item is sent to the
    end of the queue with its gang bookkeeping reset.

    Returns True if the gathering was cancelled, either by us or because
    the testbox was already found in the gathering-timed-out state.
    Returns False if not (someone raced us to the SchedQueue table).
    Database errors are re-raised after rolling back.

    Note! oStatusData is re-initialized.
    """
    assert oStatusData.enmState == TestBoxStatusData.ksTestBoxState_GangGathering;
    try:
        #
        # We're racing both scheduleNewTask and other callers of this
        # method, so take exclusive locks on the tables we update.
        #
        oDb.rollback();
        oDb.begin();
        oDb.execute('LOCK TABLE TestBoxStatuses, SchedQueues IN EXCLUSIVE MODE');

        #
        # Refresh the status now that we hold the locks and see where we are.
        #
        oStatusData.initFromDbWithId(oDb, oStatusData.idTestBox);

        if oStatusData.enmState == TestBoxStatusData.ksTestBoxState_GangGathering:
            #
            # Still gathering.  The leader is found thru our test set, and
            # the state change applies to the whole gang.
            #
            oTestSetData = TestSetData().initFromDbWithId(oDb, oStatusData.idTestSet);
            idGangLeader = oTestSetData.idTestSetGangLeader;

            TestBoxStatusLogic(oDb).updateGangStatus(idGangLeader,
                                                     TestBoxStatusData.ksTestBoxState_GangGatheringTimedOut,
                                                     fCommit = False);

            #
            # Requeue the item at the end: new item ID, no leader, and
            # the full gang missing again.
            #
            oDb.execute('SELECT *\n'
                        'FROM SchedQueues\n'
                        'WHERE idTestSetGangLeader = %s\n'
                        , (idGangLeader,) );
            oQueueItem = SchedQueueData().initFromDbRow(oDb.fetchOne());
            oTestCaseArgs = TestCaseArgsDataEx().initFromDbWithGenId(oDb, oQueueItem.idGenTestCaseArgs);

            oDb.execute('UPDATE SchedQueues\n'
                        ' SET idItem = NEXTVAL(\'SchedQueueItemIdSeq\'),\n'
                        ' idTestSetGangLeader = NULL,\n'
                        ' cMissingGangMembers = %s\n'
                        'WHERE idItem = %s\n'
                        , (oTestCaseArgs.cGangMembers, oQueueItem.idItem,) );

            oDb.commit();
            return True;

        if oStatusData.enmState == TestBoxStatusData.ksTestBoxState_GangGatheringTimedOut:
            # Already in the timed-out state, nothing left for us to change.
            oDb.rollback();
            return True;
    except:
        oDb.rollback();
        raise;

    # The testbox is in some other state by now, rollback and report failure.
    oDb.rollback();
    return False;
1392 |
1393 |
1394 | #
1395 | # Unit testing.
1396 | #
1397 |
1398 | # pylint: disable=C0111
class SchedQueueDataTestCase(ModelDataBaseTestCase):
    """
    ModelDataBaseTestCase specialization for SchedQueueData.
    """

    def setUp(self):
        # A single default constructed instance is the only sample.
        oSample = SchedQueueData();
        self.aoSamples = [oSample];
1402 |
# Run the unit tests of this module when it is executed as a script.
if __name__ == '__main__':
    unittest.main();
    # not reached.
1406 |