VirtualBox

source: vbox/trunk/src/VBox/ValidationKit/testmanager/core/schedulerbase.py@ 56295

最後變更 在這個檔案從56295是 56295,由 vboxsync 提交於 10 年 前

ValidationKit: Updated (C) year.

  • 屬性 svn:eol-style 設為 native
  • 屬性 svn:keywords 設為 Author Date Id Revision
檔案大小: 61.8 KB
 
1# -*- coding: utf-8 -*-
2# $Id: schedulerbase.py 56295 2015-06-09 14:29:55Z vboxsync $
3# pylint: disable=C0302
4
5
6"""
7Test Manager - Base class and utilities for the schedulers.
8"""
9
10__copyright__ = \
11"""
12Copyright (C) 2012-2015 Oracle Corporation
13
14This file is part of VirtualBox Open Source Edition (OSE), as
15available from http://www.alldomusa.eu.org. This file is free software;
16you can redistribute it and/or modify it under the terms of the GNU
17General Public License (GPL) as published by the Free Software
18Foundation, in version 2 as it comes in the "COPYING" file of the
19VirtualBox OSE distribution. VirtualBox OSE is distributed in the
20hope that it will be useful, but WITHOUT ANY WARRANTY of any kind.
21
22The contents of this file may alternatively be used under the terms
23of the Common Development and Distribution License Version 1.0
24(CDDL) only, as it comes in the "COPYING.CDDL" file of the
25VirtualBox OSE distribution, in which case the provisions of the
26CDDL are applicable instead of those of the GPL.
27
28You may elect to license modified versions of this file under the
29terms and conditions of either the GPL or the CDDL or both.
30"""
31__version__ = "$Revision: 56295 $"
32
33
34# Standard python imports.
35import unittest;
36
37# Validation Kit imports.
38from common import utils, constants;
39from testmanager import config;
40from testmanager.core.build import BuildDataEx, BuildLogic;
41from testmanager.core.base import ModelDataBase, ModelDataBaseTestCase, TMExceptionBase;
42from testmanager.core.buildsource import BuildSourceData, BuildSourceLogic;
43from testmanager.core.globalresource import GlobalResourceLogic;
44from testmanager.core.schedgroup import SchedGroupData, SchedGroupLogic;
45from testmanager.core.systemlog import SystemLogData, SystemLogLogic;
46from testmanager.core.testbox import TestBoxData;
47from testmanager.core.testboxstatus import TestBoxStatusData, TestBoxStatusLogic;
48from testmanager.core.testcase import TestCaseLogic;
49from testmanager.core.testcaseargs import TestCaseArgsDataEx, TestCaseArgsLogic;
50from testmanager.core.testset import TestSetData, TestSetLogic;
51
52
class ReCreateQueueData(object):
    """
    Data object for recreating a scheduling queue.

    It is mostly a storage object, but has a few data checking operations
    associated with it.

    Attributes set up by the constructor:
      - aoTestGroups:     Result of SchedGroupLogic.getMembers(); each entry
                          gets aoTestCases and dTestCases members added here.
      - aoTestCases:      Result of SchedGroupLogic.getTestCasesForGroup();
                          each entry gets aidPreReqs, oTestGroup and
                          aoArgsVariations members added here.
      - aoArgsVariations: Result of SchedGroupLogic.getTestCaseArgsForGroup();
                          each entry gets oTestCase and oTestGroup members
                          added here.
      - dTestCases:       idTestCase -> testcase lookup dictionary.
      - dTestGroups:      idTestGroup -> testgroup lookup dictionary.
    """

    def __init__(self, oDb, idSchedGroup):
        #
        # Load data from the database.
        #

        # Will extend the entries with aoTestCases and dTestCases members
        # further down.  checkForGroupDepCycles will add aidTestGroupPreReqs.
        self.aoTestGroups = SchedGroupLogic(oDb).getMembers(idSchedGroup);

        # The code below relies on the aoTestCases entries carrying
        # idTestCase, idTestGroup and iSchedPriority attributes.
        # We will add oTestGroup and aoArgsVariations members to each further down.
        self.aoTestCases = SchedGroupLogic(oDb).getTestCasesForGroup(idSchedGroup, cMax = 4096);

        # Load dependencies.
        oTestCaseLogic = TestCaseLogic(oDb);
        for oTestCase in self.aoTestCases:
            oTestCase.aidPreReqs = oTestCaseLogic.getTestCasePreReqIds(oTestCase.idTestCase, cMax = 4096);

        # The code below relies on the aoArgsVariations entries carrying
        # idTestCase and idTestGroup attributes.
        # We will add oTestGroup and oTestCase members to each further down.
        self.aoArgsVariations = SchedGroupLogic(oDb).getTestCaseArgsForGroup(idSchedGroup, cMax = 65536);

        #
        # Generate global lookups.
        #

        # Generate a testcase lookup dictionary for use when working on
        # argument variations.
        self.dTestCases = dict();
        for oTestCase in self.aoTestCases:
            self.dTestCases[oTestCase.idTestCase] = oTestCase;
        assert len(self.dTestCases) <= len(self.aoTestCases); # Note! Can be shorter!

        # Generate a testgroup lookup dictionary.
        self.dTestGroups = dict();
        for oTestGroup in self.aoTestGroups:
            self.dTestGroups[oTestGroup.idTestGroup] = oTestGroup;
        assert len(self.dTestGroups) == len(self.aoTestGroups);

        #
        # Associate extra members with the base data.
        #
        if len(self.aoTestGroups) > 0:
            # Prep the test groups.
            for oTestGroup in self.aoTestGroups:
                oTestGroup.aoTestCases = list();
                oTestGroup.dTestCases  = dict();

            # Link testcases to their group, both directions.  Prep testcases
            # for argument variation association.
            for oTestCase in self.aoTestCases:
                oTestGroup = self.dTestGroups[oTestCase.idTestGroup];

                assert oTestCase.idTestCase not in oTestGroup.dTestCases;
                oTestGroup.dTestCases[oTestCase.idTestCase] = oTestCase;
                oTestGroup.aoTestCases.append(oTestCase);
                oTestCase.oTestGroup       = oTestGroup;
                oTestCase.aoArgsVariations = list();

            # Associate testcase argument variations with their testcases
            # (group) in both directions.
            for oArgVariation in self.aoArgsVariations:
                oTestGroup = self.dTestGroups[oArgVariation.idTestGroup];
                oTestCase  = oTestGroup.dTestCases[oArgVariation.idTestCase];

                oTestCase.aoArgsVariations.append(oArgVariation);
                oArgVariation.oTestCase  = oTestCase;
                oArgVariation.oTestGroup = oTestGroup;
        else:
            assert len(self.aoTestCases) == 0;
            assert len(self.aoArgsVariations) == 0;
        # done.

    @staticmethod
    def _addPreReqError(aoErrors, aidChain, oObj, sMsg):
        """
        Appends a [sMsg + dependency chain, oObj] error entry to aoErrors.
        Returns aoErrors.
        """
        sMsg += ' Dependency chain: ' + ' -> '.join(['%s' % (idCur,) for idCur in aidChain]);
        aoErrors.append([sMsg, oObj]);
        return aoErrors;

    def checkForGroupDepCycles(self):
        """
        Checks for testgroup dependency cycles and any missing testgroup
        dependencies.  Sets aidTestGroupPreReqs on every testgroup.

        A cycle is not detected as such, it ends up being reported as a too
        long prerequisite chain (10 or more entries).

        Returns array of errors (see SchedulerBase.recreateQueue()).
        """
        aoErrors = list();
        for oTestGroup in self.aoTestGroups:
            idPreReq = oTestGroup.idTestGroupPreReq;
            if idPreReq is None:
                oTestGroup.aidTestGroupPreReqs = list();
                continue;

            aidChain = [oTestGroup.idTestGroup,];
            while idPreReq is not None:
                aidChain.append(idPreReq);
                if len(aidChain) >= 10:
                    self._addPreReqError(aoErrors, aidChain, oTestGroup,
                                         'TestGroup #%s prerequisite chain is too long!'
                                         % (oTestGroup.idTestGroup,));
                    break;

                oDep = self.dTestGroups.get(idPreReq, None);
                if oDep is None:
                    self._addPreReqError(aoErrors, aidChain, oTestGroup,
                                         'TestGroup #%s prerequisite #%s is not in the scheduling group!'
                                         % (oTestGroup.idTestGroup, idPreReq,));
                    break;

                idPreReq = oDep.idTestGroupPreReq;

            # Everything but the group itself.
            oTestGroup.aidTestGroupPreReqs = aidChain[1:];

        return aoErrors;

    def checkForMissingTestCaseDeps(self):
        """
        Checks that testcase dependencies stay within bounds.  We do not allow
        dependencies outside a testgroup, no dependency cycles or even remotely
        long dependency chains.

        Returns array of errors (see SchedulerBase.recreateQueue()).
        """
        aoErrors = list();
        for oTestGroup in self.aoTestGroups:
            for oTestCase in oTestGroup.aoTestCases:
                if len(oTestCase.aidPreReqs) == 0:
                    continue;

                # Depth first walk using explicit stacks.  The stack entries
                # are [testcase, next prerequisite index] lists (must be
                # mutable, the index is advanced in place) and aidChain
                # mirrors the stack with the testcase IDs on the current path.
                aoStack  = [[oTestCase, 0],];
                aidChain = [oTestCase.idTestCase,];
                while len(aoStack) > 0:
                    (oCur, i) = aoStack[-1];
                    if i >= len(oCur.aidPreReqs):
                        aoStack.pop();
                        aidChain.pop();
                    else:
                        aoStack[-1][1] = i + 1; # whatever happens, we'll advance on the current level.

                        idPreReq = oCur.aidPreReqs[i];
                        oDep = oTestGroup.dTestCases.get(idPreReq, None);
                        if oDep is None:
                            self._addPreReqError(aoErrors, aidChain, oTestCase,
                                                 'TestCase #%s prerequisite #%s is not in the scheduling group!'
                                                 % (oTestCase.idTestCase, idPreReq));
                        elif idPreReq in aidChain:
                            self._addPreReqError(aoErrors, aidChain, oTestCase,
                                                 'TestCase #%s prerequisite #%s creates a cycle!'
                                                 % (oTestCase.idTestCase, idPreReq));
                        elif len(oDep.aidPreReqs) == 0:
                            pass; # Leaf, nothing further to check.
                        elif len(aidChain) >= 10:
                            self._addPreReqError(aoErrors, aidChain, oTestCase,
                                                 'TestCase #%s prerequisite chain is too long!' % (oTestCase.idTestCase,));
                        else:
                            aoStack.append([oDep, 0]);
                            aidChain.append(idPreReq);

        return aoErrors;

    def deepTestGroupSort(self):
        """
        Sorts the testgroups and their testcases by priority and dependencies.
        Note! Don't call this before checking for dependency cycles!

        Returns True, or None if there are no testgroups.
        Raises TMExceptionBase if the input isn't sorted by priority.
        """
        if len(self.aoTestGroups) == 0:
            return;

        #
        # ASSUMES groups as well as testcases are sorted by priority by the
        # database.  So we only have to concern ourselves with the dependency
        # sorting.
        #
        iGrpPrio = self.aoTestGroups[0].iSchedPriority;
        for oTestGroup in self.aoTestGroups:
            if oTestGroup.iSchedPriority > iGrpPrio:
                raise TMExceptionBase('Incorrectly sorted testgroups returned by database.');
            iGrpPrio = oTestGroup.iSchedPriority;

            if len(oTestGroup.aoTestCases) > 0:
                iTstPrio = oTestGroup.aoTestCases[0].iSchedPriority;
                for oTestCase in oTestGroup.aoTestCases:
                    if oTestCase.iSchedPriority > iTstPrio:
                        raise TMExceptionBase('Incorrectly sorted testcases returned by database.');
                    iTstPrio = oTestCase.iSchedPriority;

        #
        # Sort the testgroups by dependencies.
        #
        i = 0;
        while i < len(self.aoTestGroups):
            oTestGroup = self.aoTestGroups[i];
            if oTestGroup.idTestGroupPreReq is not None:
                iPreReq = self.aoTestGroups.index(self.dTestGroups[oTestGroup.idTestGroupPreReq]);
                if iPreReq > i:
                    # The prerequisite is after the current entry.  Move the
                    # current entry so that it's following its prereq entry.
                    # Inserting first is fine as iPreReq + 1 > i, so index i
                    # still refers to the original position.
                    self.aoTestGroups.insert(iPreReq + 1, oTestGroup);
                    self.aoTestGroups.pop(i);
                    continue; # Re-examine whatever is at index i now.
                assert iPreReq < i;
            i += 1; # Advance.

        #
        # Sort the testcases by dependencies.
        # Same algorithm as above, just more prerequisites.
        #
        for oTestGroup in self.aoTestGroups:
            aoTestCases = oTestGroup.aoTestCases;
            i = 0;
            while i < len(aoTestCases):
                oTestCase = aoTestCases[i];
                for idPreReq in oTestCase.aidPreReqs:
                    iPreReq = aoTestCases.index(oTestGroup.dTestCases[idPreReq]);
                    if iPreReq > i:
                        # The prerequisite is after the current entry.  Move the
                        # current entry so that it's following its prereq entry.
                        aoTestCases.insert(iPreReq + 1, oTestCase);
                        aoTestCases.pop(i);
                        i -= 1; # Don't advance.
                        break;
                    assert iPreReq < i;
                i += 1; # Advance.

        return True;
300
301
302
class SchedQueueData(ModelDataBase):
    """
    One item in a scheduling queue (a row in the SchedQueues table).
    """

    # Name of the attribute holding the ID.
    ksIdAttr = 'idSchedGroup';

    # Parameter names, one per attribute.
    ksParam_idSchedGroup        = 'SchedQueueData_idSchedGroup';
    ksParam_idItem              = 'SchedQueueData_idItem';
    ksParam_offQueue            = 'SchedQueueData_offQueue';
    ksParam_idGenTestCaseArgs   = 'SchedQueueData_idGenTestCaseArgs';
    ksParam_idTestGroup         = 'SchedQueueData_idTestGroup';
    ksParam_aidTestGroupPreReqs = 'SchedQueueData_aidTestGroupPreReqs';
    ksParam_bmHourlySchedule    = 'SchedQueueData_bmHourlySchedule';
    ksParam_tsConfig            = 'SchedQueueData_tsConfig';
    ksParam_tsLastScheduled     = 'SchedQueueData_tsLastScheduled';
    ksParam_idTestSetGangLeader = 'SchedQueueData_idTestSetGangLeader';
    ksParam_cMissingGangMembers = 'SchedQueueData_cMissingGangMembers';

    # Attributes that are allowed to be None/NULL.
    kasAllowNullAttributes = [ 'idItem', 'offQueue', 'aidTestGroupPreReqs', 'bmHourlySchedule', 'idTestSetGangLeader',
                               'tsConfig', 'tsLastScheduled' ];

    def __init__(self):
        ModelDataBase.__init__(self);

        # Defaults.  The fields mirror the SchedQueues columns in the order
        # initFromDbRow reads them; see the database for what each means.
        self.idSchedGroup           = None;
        self.idItem                 = None;
        self.offQueue               = None;
        self.idGenTestCaseArgs      = None;
        self.idTestGroup            = None;
        self.aidTestGroupPreReqs    = None;
        self.bmHourlySchedule       = None;
        self.tsConfig               = None;
        self.tsLastScheduled        = None;
        self.idTestSetGangLeader    = None;
        self.cMissingGangMembers    = 1;

    def initFromValues(self, idSchedGroup, idGenTestCaseArgs, idTestGroup, aidTestGroupPreReqs, # pylint: disable=R0913
                       bmHourlySchedule, cMissingGangMembers,
                       idItem = None, offQueue = None, tsConfig = None, tsLastScheduled = None, idTestSetGangLeader = None):
        """
        Sets every attribute from the given arguments; the optional ones
        default to None.
        Returns self.
        """
        # Mandatory values.
        self.idSchedGroup           = idSchedGroup;
        self.idGenTestCaseArgs      = idGenTestCaseArgs;
        self.idTestGroup            = idTestGroup;
        self.aidTestGroupPreReqs    = aidTestGroupPreReqs;
        self.bmHourlySchedule       = bmHourlySchedule;
        self.cMissingGangMembers    = cMissingGangMembers;

        # Optional values.
        self.idItem                 = idItem;
        self.offQueue               = offQueue;
        self.tsConfig               = tsConfig;
        self.tsLastScheduled        = tsLastScheduled;
        self.idTestSetGangLeader    = idTestSetGangLeader;
        return self;

    def initFromDbRow(self, aoRow):
        """
        Loads the attributes from a database row in SchedQueues column order
        (SELECT * FROM SchedQueues).
        Returns self.
        Raises TMExceptionBase if aoRow is None.
        """
        if aoRow is None:
            raise TMExceptionBase('SchedQueueData not found.');

        self.idSchedGroup           = aoRow[0];
        self.idItem                 = aoRow[1];
        self.offQueue               = aoRow[2];
        self.idGenTestCaseArgs      = aoRow[3];
        self.idTestGroup            = aoRow[4];
        self.aidTestGroupPreReqs    = aoRow[5];
        self.bmHourlySchedule       = aoRow[6];
        self.tsConfig               = aoRow[7];
        self.tsLastScheduled        = aoRow[8];
        self.idTestSetGangLeader    = aoRow[9];
        self.cMissingGangMembers    = aoRow[10];
        return self;
386
387
388
389
390
391
392class SchedulerBase(object):
393 """
394 The scheduler base class.
395
396 The scheduler classes have two functions:
397 1. Recreate the scheduling queue.
398 2. Pick the next task from the queue.
399
400 The first is scheduler specific, the latter isn't.
401 """
402
class BuildCache(object):
    """
    Build cache.

    Keeps the builds fetched so far in aoEntries and lazily pulls more from
    the build cursor (see setupSource) when an iterator runs past the end.
    """

    class BuildCacheIterator(object):
        """ Build cache iterator. """

        def __init__(self, oCache):
            self.oCache = oCache;
            self.iCur   = 0;

        def __iter__(self):
            """Returns self, required by the language."""
            return self;

        def __next__(self):
            """Returns the next build, raises StopIteration when the end has been reached."""
            while True:
                if self.iCur >= len(self.oCache.aoEntries):
                    # Past the cached entries, try to get one more from the cursor
                    # (fetchFromCursor appends it to the cache on success).
                    oEntry = self.oCache.fetchFromCursor();
                    if oEntry is None:
                        raise StopIteration;
                else:
                    oEntry = self.oCache.aoEntries[self.iCur];
                self.iCur += 1;
                if not oEntry.fRemoved:
                    return oEntry;
                # Skip removed entries.

        def next(self):
            """ Python 2.x spelling of __next__. """
            return self.__next__();

    class BuildCacheEntry(object):
        """ Build cache entry. """

        def __init__(self, oBuild, fMaybeBlacklisted):
            self.oBuild             = oBuild;
            # None means "don't know yet", isBlacklisted() resolves it lazily.
            self._fBlacklisted      = None if fMaybeBlacklisted is True else False;
            self.fRemoved           = False;
            self._dPreReqDecisions  = dict();

        def remove(self):
            """
            Marks the cache entry as removed.
            This doesn't actually remove it from the cache array, only marks
            it as removed.
            """
            self.fRemoved = True;

        def getPreReqDecision(self, sPreReqSet):
            """
            Retrieves a cached prerequisite decision.
            Returns the cached value if found, None if not.
            """
            return self._dPreReqDecisions.get(sPreReqSet);

        def setPreReqDecision(self, sPreReqSet, fDecision):
            """
            Caches a prerequisite decision.
            Returns fDecision.
            """
            self._dPreReqDecisions[sPreReqSet] = fDecision;
            return fDecision;

        def isBlacklisted(self, oDb):
            """ Checks if the build is blacklisted, caching the answer. """
            if self._fBlacklisted is None:
                self._fBlacklisted = BuildLogic(oDb).isBuildBlacklisted(self.oBuild);
            return self._fBlacklisted;

    def __init__(self):
        self.aoEntries = [];
        self.oCursor   = None;

    def setupSource(self, oDb, idBuildSrc, sOs, sCpuArch, tsNow):
        """
        Configures the build cursor for the cache.
        Does nothing if the cache already has entries or a cursor.
        Returns True.
        """
        if len(self.aoEntries) == 0 and self.oCursor is None:
            oBuildSource = BuildSourceData().initFromDbWithId(oDb, idBuildSrc, tsNow);
            self.oCursor = BuildSourceLogic(oDb).openBuildCursor(oBuildSource, sOs, sCpuArch, tsNow);
        return True;

    def __iter__(self):
        """Return an iterator."""
        return self.BuildCacheIterator(self);

    def fetchFromCursor(self):
        """
        Fetches a build from the cursor and adds it to the cache.
        Returns the new cache entry, None if there is no cursor, the fetch
        failed or there are no more rows.
        """
        if self.oCursor is None:
            return None;

        # Best effort: a failing fetch is treated like running out of rows.
        # Only Exception is caught so that KeyboardInterrupt and SystemExit
        # still propagate.
        try:
            aoRow = self.oCursor.fetchOne();
        except Exception:
            return None;
        if aoRow is None:
            return None;

        oBuild = BuildDataEx().initFromDbRow(aoRow);
        # The last column is passed on as the "maybe blacklisted" indicator.
        oEntry = self.BuildCacheEntry(oBuild, aoRow[-1]);
        self.aoEntries.append(oEntry);
        return oEntry;
499
500 def __init__(self, oDb, oSchedGrpData, iVerbosity = 0, tsSecStart = None):
501 self._oDb = oDb;
502 self._oSchedGrpData = oSchedGrpData;
503 self._iVerbosity = iVerbosity;
504 self._asMessages = [];
505 self._tsSecStart = tsSecStart if tsSecStart is not None else utils.timestampSecond();
506 self.oBuildCache = self.BuildCache();
507 self.dTestGroupMembers = dict();
508
@staticmethod
def _instantiate(oDb, oSchedGrpData, iVerbosity = 0, tsSecStart = None):
    """
    Creates the scheduler object selected by oSchedGrpData.enmScheduler.

    Returns an instance of the matching scheduler child class.
    Raises oDb.integrityException() if the scheduler type is unknown.
    """
    if oSchedGrpData.enmScheduler != SchedGroupData.ksScheduler_BestEffortContinousItegration:
        raise oDb.integrityException('Invalid scheduler "%s", idSchedGroup=%d' \
                                     % (oSchedGrpData.enmScheduler, oSchedGrpData.idSchedGroup));

    # Imported here rather than at the top of the file.
    from testmanager.core.schedulerbeci import SchdulerBeci;
    return SchdulerBeci(oDb, oSchedGrpData, iVerbosity, tsSecStart);
523
524
525 #
526 # Misc.
527 #
528
def msgDebug(self, sText):
    """
    Records a debug message when the verbosity level is above 1.
    Returns None.
    """
    if self._iVerbosity <= 1:
        return None;
    self._asMessages.append('debug:' + sText);
    return None;
534
def msgInfo(self, sText):
    """
    Records an info message when the verbosity level is above 1.
    Returns None.
    """
    if self._iVerbosity <= 1:
        return None;
    self._asMessages.append('info: ' + sText);
    return None;
540
def dprint(self, sMsg):
    """
    Passes a debug message on to the database object's dprint method when
    config.g_kfSrvGlueDebugScheduler is set (see config.py).
    Returns None.
    """
    if not config.g_kfSrvGlueDebugScheduler:
        return None;
    self._oDb.dprint(sMsg);
    return None;
546
def getElapsedSecs(self):
    """
    Returns the number of seconds this scheduling task has been running.

    Should the current time be before the recorded start time, the start
    time is pulled back to the current time, so the result is never negative.
    """
    tsSecCurrent = utils.timestampSecond();
    fClockWentBack = tsSecCurrent < self._tsSecStart;
    if fClockWentBack: # paranoia
        self._tsSecStart = tsSecCurrent;
    cSecsElapsed = tsSecCurrent - self._tsSecStart;
    return cSecsElapsed;
553
554
555 #
556 # Create schedule.
557 #
558
def _recreateQueueCancelGatherings(self):
    """
    Times out every gang gathering that is pending on this scheduling
    group's queue.  Nothing is committed here.
    Returns True.
    """
    oDb = self._oDb;
    oDb.execute('SELECT idTestSetGangLeader\n'
                'FROM SchedQueues\n'
                'WHERE idSchedGroup = %s\n'
                ' AND idTestSetGangLeader is not NULL\n'
                , (self._oSchedGrpData.idSchedGroup,));
    if not oDb.getRowCount() > 0:
        return True;

    oStatusLogic = TestBoxStatusLogic(oDb);
    for aoLeaderRow in oDb.fetchAll():
        # First (only) column is the gang leader test set ID.
        oStatusLogic.updateGangStatus(aoLeaderRow[0],
                                      TestBoxStatusData.ksTestBoxState_GangGatheringTimedOut,
                                      fCommit = False);
    return True;
576
577 def _recreateQueueItems(self, oData):
578 """
579 Returns an array of queue items (SchedQueueData).
580 Child classes must override this.
581 """
582 _ = oData;
583 return [];
584
def recreateQueueWorker(self):
    """
    Worker for recreateQueue.

    Validates the scheduling group data, lets the child class generate the
    new queue items and replaces the SchedQueues rows for the group.
    Doesn't commit.

    Returns (aoErrors, asMessages).
    """

    #
    # Collect the necessary data and validate it.
    #
    oData = ReCreateQueueData(self._oDb, self._oSchedGrpData.idSchedGroup);
    aoErrors = oData.checkForGroupDepCycles();
    aoErrors.extend(oData.checkForMissingTestCaseDeps());
    if len(aoErrors) == 0:
        oData.deepTestGroupSort();

    #
    # The creation of the scheduling queue is done by the child class.
    #
    # We will try guess where in queue we're currently at and rotate
    # the items such that we will resume execution in the approximately
    # same position.  The goal of the scheduler is to provide a 100%
    # deterministic result so that if we regenerate the queue when there
    # are no changes to the testcases, testgroups or scheduling groups
    # involved, test execution will be unchanged (save for maybe just a
    # little for gang gathering).
    #
    aoItems = list();
    if len(oData.aoArgsVariations) > 0:
        aoItems = self._recreateQueueItems(oData);
        self.msgDebug('len(aoItems)=%s' % (len(aoItems),));
        for i, oItem in enumerate(aoItems):
            self.msgDebug('aoItems[%2d]=%s' % (i, oItem));
    if len(aoItems) > 0:
        self._oDb.execute('SELECT offQueue FROM SchedQueues WHERE idSchedGroup = %s ORDER BY idItem LIMIT 1'
                          , (self._oSchedGrpData.idSchedGroup,));
        if self._oDb.getRowCount() > 0:
            offQueue = self._oDb.fetchOne()[0];
            self._oDb.execute('SELECT COUNT(*) FROM SchedQueues WHERE idSchedGroup = %s'
                              , (self._oSchedGrpData.idSchedGroup,));
            cItems = self._oDb.fetchOne()[0];
            # Must be an integer as it is used for slicing, so floor
            # division rather than '/'.
            offQueueNew = (offQueue * cItems) // len(aoItems);
            if offQueueNew != 0:
                aoItems = aoItems[offQueueNew:] + aoItems[:offQueueNew];

    #
    # Replace the scheduling queue.
    # Care needs to be taken to first timeout/abort any gangs in the
    # gathering state since these use the queue to set up the data.
    #
    self._recreateQueueCancelGatherings();
    self._oDb.execute('DELETE FROM SchedQueues WHERE idSchedGroup = %s\n', (self._oSchedGrpData.idSchedGroup,));
    for oItem in aoItems:
        self._oDb.execute('INSERT INTO SchedQueues (\n'
                          ' idSchedGroup,\n'
                          ' offQueue,\n'
                          ' idGenTestCaseArgs,\n'
                          ' idTestGroup,\n'
                          ' aidTestGroupPreReqs,\n'
                          ' bmHourlySchedule,\n'
                          ' cMissingGangMembers )\n'
                          'VALUES ( %s, %s, %s, %s, %s, %s, %s )\n'
                          , ( oItem.idSchedGroup,
                              oItem.offQueue,
                              oItem.idGenTestCaseArgs,
                              oItem.idTestGroup,
                              # Both None and an empty list become NULL.
                              oItem.aidTestGroupPreReqs if oItem.aidTestGroupPreReqs else None,
                              oItem.bmHourlySchedule,
                              oItem.cMissingGangMembers
                          ));
    return (aoErrors, self._asMessages);
654
@staticmethod
def recreateQueue(oDb, uidAuthor, idSchedGroup, iVerbosity = 1):
    """
    (Re-)creates the scheduling queue for the given group.

    Returns (aoErrors, asMessages).  On success the error array is empty
    and the work is committed; on failure it contains
    (sError, oRelatedObject) entries and the work is rolled back.  The
    messages are simple strings for debugging.

    Raises exception on database error (after rolling back).
    """
    aoFailures   = [];
    asDbgStrings = [];
    try:
        #
        # To avoid concurrency issues (SchedQueues) and inconsistent data (*),
        # we lock quite a few tables while doing this work.  We access more
        # data than scheduleNewTask so we lock some additional tables.
        #
        oDb.rollback();
        oDb.begin();
        oDb.execute('LOCK TABLE SchedGroups, SchedGroupMembers, TestGroups, TestGroupMembers IN SHARE MODE');
        oDb.execute('LOCK TABLE TestBoxes, TestCaseArgs, TestCases IN SHARE MODE');
        oDb.execute('LOCK TABLE TestBoxStatuses, SchedQueues IN EXCLUSIVE MODE');

        #
        # Instantiate the scheduler and call the worker function.
        #
        oGroupData = SchedGroupData().initFromDbWithId(oDb, idSchedGroup);
        oWorker    = SchedulerBase._instantiate(oDb, oGroupData, iVerbosity);

        (aoFailures, asDbgStrings) = oWorker.recreateQueueWorker();
        if len(aoFailures) != 0:
            oDb.rollback();
        else:
            SystemLogLogic(oDb).addEntry(SystemLogData.ksEvent_SchedQueueRecreate,
                                         'User #%d recreated sched queue #%d.' % (uidAuthor, idSchedGroup,));
            oDb.commit();

    except:
        oDb.rollback();
        raise;

    return (aoFailures, asDbgStrings);
699
700
701
702 #
703 # Schedule Task.
704 #
705
def _composeGangArguments(self, idTestSet):
    """
    Composes the gang specific testdriver arguments.

    The result holds the member number of the test set, the gang size and
    one --gang-ipv4-<n> argument per gang member.

    Returns command line string, including a leading space.
    """

    oTestSet      = TestSetData().initFromDbWithId(self._oDb, idTestSet);
    aoGangMembers = TestSetLogic(self._oDb).getGang(oTestSet.idTestSetGangLeader);

    sArgs = ' --gang-member-no %s --gang-members %s' % (oTestSet.iGangMemberNo, len(aoGangMembers));
    for i, oMember in enumerate(aoGangMembers):
        # Append (not assign), otherwise only the last member's argument survives.
        sArgs += ' --gang-ipv4-%s %s' % (i, oMember.ip); ## @todo IPv6

    return sArgs;
720
721
def composeExecResponseWorker(self, idTestSet, oTestEx, oTestBox, oBuild, oValidationKitBuild, sBaseUrl):
    """
    Given all the bits of data, compose an EXEC command response to the testbox.

    Returns a dictionary with the result, test set ID, script zips,
    command line and timeout parameters.
    """
    #
    # The script zips.  Defaults to the validation kit binaries, and may
    # reference them and the download base URL thru placeholders.
    #
    sScriptZips = oTestEx.oTestCase.sValidationKitZips;
    if sScriptZips is None or sScriptZips.find('@VALIDATIONKIT_ZIP@') >= 0:
        assert oValidationKitBuild;
        if sScriptZips is None:
            sScriptZips = oValidationKitBuild.sBinaries;
        else:
            sScriptZips = sScriptZips.replace('@VALIDATIONKIT_ZIP@', oValidationKitBuild.sBinaries);
    sScriptZips = sScriptZips.replace('@DOWNLOAD_BASE_URL@', sBaseUrl + config.g_ksTmDownloadBaseUrlRel);

    #
    # The command line, with gang arguments appended for gang tests.
    #
    sCmdLine = oTestEx.oTestCase.sBaseCmd + ' ' + oTestEx.sArgs;
    sCmdLine = sCmdLine.replace('@BUILD_BINARIES@', oBuild.sBinaries);
    sCmdLine = sCmdLine.strip();
    if oTestEx.cGangMembers > 1:
        sCmdLine += ' ' + self._composeGangArguments(idTestSet);

    #
    # The timeout: the argument variation overrides the testcase value and
    # the testbox scales it by a percentage.  Floor division keeps the
    # result an integer when the inputs are integers.
    #
    cSecTimeout = oTestEx.cSecTimeout if oTestEx.cSecTimeout is not None else oTestEx.oTestCase.cSecTimeout;
    cSecTimeout = cSecTimeout * oTestBox.pctScaleTimeout // 100;

    dResponse = \
    {
        constants.tbresp.ALL_PARAM_RESULT:              constants.tbresp.CMD_EXEC,
        constants.tbresp.EXEC_PARAM_RESULT_ID:          idTestSet,
        constants.tbresp.EXEC_PARAM_SCRIPT_ZIPS:        sScriptZips,
        constants.tbresp.EXEC_PARAM_SCRIPT_CMD_LINE:    sCmdLine,
        constants.tbresp.EXEC_PARAM_TIMEOUT:            cSecTimeout,
    };
    return dResponse;
753
@staticmethod
def composeExecResponse(oDb, idTestSet, sBaseUrl, iVerbosity = 0):
    """
    Composes an EXEC response for a gang member (other than the last).
    Returns an EXEC response or raises an exception (DB/input error).
    """
    #
    # Load everything the response is built from, starting with the test
    # set which references the rest.
    #
    oSet     = TestSetData().initFromDbWithId(oDb, idTestSet);
    oBox     = TestBoxData().initFromDbWithGenId(oDb, oSet.idGenTestBox);
    oArgsEx  = TestCaseArgsDataEx().initFromDbWithGenId(oDb, oSet.idGenTestCaseArgs);
    oBuild   = BuildDataEx().initFromDbWithId(oDb, oSet.idBuild);
    if oSet.idBuildTestSuite is None:
        oKitBuild = None;
    else:
        oKitBuild = BuildDataEx().initFromDbWithId(oDb, oSet.idBuildTestSuite);

    #
    # The scheduler of the testbox's scheduling group (as it was when the
    # test set was created) composes the actual response.
    #
    oGroupData = SchedGroupData().initFromDbWithId(oDb, oBox.idSchedGroup, oSet.tsCreated);
    assert oGroupData.fEnabled is True;
    assert oGroupData.idBuildSrc is not None;
    oWorker = SchedulerBase._instantiate(oDb, oGroupData, iVerbosity);

    return oWorker.composeExecResponseWorker(idTestSet, oArgsEx, oBox, oBuild, oKitBuild, sBaseUrl);
780
781
782 def _updateTask(self, oTask, tsNow):
783 """
784 Updates a gang schedule task.
785 """
786 assert oTask.cMissingGangMembers >= 1;
787 assert oTask.idTestSetGangLeader is not None;
788 assert oTask.idTestSetGangLeader >= 1;
789 if tsNow is not None:
790 self._oDb.execute('UPDATE SchedQueues\n'
791 ' SET idTestSetGangLeader = %s,\n'
792 ' cMissingGangMembers = %s,\n'
793 ' tsLastScheduled = %s\n'
794 'WHERE idItem = %s\n'
795 , (oTask.idTestSetGangLeader, oTask.cMissingGangMembers, tsNow, oTask.idItem,) );
796 else:
797 self._oDb.execute('UPDATE SchedQueues\n'
798 ' SET cMissingGangMembers = %s\n'
799 'WHERE idItem = %s\n'
800 , (oTask.cMissingGangMembers, oTask.idItem,) );
801 return True;
802
803 def _moveTaskToEndOfQueue(self, oTask, cGangMembers, tsNow):
804 """
805 The task has been scheduled successfully, reset it's data move it to
806 the end of the queue.
807 """
808 if cGangMembers > 1:
809 self._oDb.execute('UPDATE SchedQueues\n'
810 ' SET idItem = NEXTVAL(\'SchedQueueItemIdSeq\'),\n'
811 ' idTestSetGangLeader = NULL,\n'
812 ' cMissingGangMembers = %s\n'
813 'WHERE idItem = %s\n'
814 , (cGangMembers, oTask.idItem,) );
815 else:
816 self._oDb.execute('UPDATE SchedQueues\n'
817 ' SET idItem = NEXTVAL(\'SchedQueueItemIdSeq\'),\n'
818 ' idTestSetGangLeader = NULL,\n'
819 ' cMissingGangMembers = 1,\n'
820 ' tsLastScheduled = %s\n'
821 'WHERE idItem = %s\n'
822 , (tsNow, oTask.idItem,) );
823 return True;
824
825
826
827
828 def _createTestSet(self, oTask, oTestEx, oTestBoxData, oBuild, oValidationKitBuild, tsNow):
829 """
830 Creates a test set for using the given data.
831 Will not commit, someone up the callstack will that later on.
832 Returns the test set ID, may raise an exception on database error.
833 """
834 # Lazy bird doesn't want to write testset.py and does it all here.
835
836 #
837 # We're getting the TestSet ID first in order to include it in the base
838 # file name (that way we can directly relate files on the disk to the
839 # test set when doing batch work), and also for idTesetSetGangLeader.
840 #
841 self._oDb.execute('SELECT NEXTVAL(\'TestSetIdSeq\')');
842 idTestSet = self._oDb.fetchOne()[0];
843
844 sBaseFilename = '%04d/%02d/%02d/%02d/TestSet-%s' \
845 % (tsNow.year, tsNow.month, tsNow.day, (tsNow.hour / 6) * 6, idTestSet);
846
847 #
848 # Gang scheduling parameters. Changes the oTask data for updating by caller.
849 #
850 iGangMemberNo = 0;
851
852 if oTestEx.cGangMembers <= 1:
853 assert oTask.idTestSetGangLeader is None;
854 assert oTask.cMissingGangMembers <= 1;
855 elif oTask.idTestSetGangLeader is None:
856 assert oTask.cMissingGangMembers == oTestEx.cGangMembers;
857 oTask.cMissingGangMembers = oTestEx.cGangMembers - 1;
858 oTask.idTestSetGangLeader = idTestSet;
859 else:
860 assert oTask.cMissingGangMembers > 0 and oTask.cMissingGangMembers < oTestEx.cGangMembers;
861 oTask.cMissingGangMembers -= 1;
862
863 #
864 # Do the database stuff.
865 #
866 self._oDb.execute('INSERT INTO TestSets (\n'
867 ' idTestSet,\n'
868 ' tsConfig,\n'
869 ' tsCreated,\n'
870 ' idBuild,\n'
871 ' idBuildCategory,\n'
872 ' idBuildTestSuite,\n'
873 ' idGenTestBox,\n'
874 ' idTestBox,\n'
875 ' idTestGroup,\n'
876 ' idGenTestCase,\n'
877 ' idTestCase,\n'
878 ' idGenTestCaseArgs,\n'
879 ' idTestCaseArgs,\n'
880 ' sBaseFilename,\n'
881 ' iGangMemberNo,\n'
882 ' idTestSetGangLeader )\n'
883 'VALUES ( %s,\n' # idTestSet
884 ' %s,\n' # tsConfig
885 ' %s,\n' # tsCreated
886 ' %s,\n' # idBuild
887 ' %s,\n' # idBuildCategory
888 ' %s,\n' # idBuildTestSuite
889 ' %s,\n' # idGenTestBox
890 ' %s,\n' # idTestBox
891 ' %s,\n' # idTestGroup
892 ' %s,\n' # idGenTestCase
893 ' %s,\n' # idTestCase
894 ' %s,\n' # idGenTestCaseArgs
895 ' %s,\n' # idTestCaseArgs
896 ' %s,\n' # sBaseFilename
897 ' %s,\n' # iGangMemberNo
898 ' %s)\n' # idTestSetGangLeader
899 , ( idTestSet,
900 oTask.tsConfig,
901 tsNow,
902 oBuild.idBuild,
903 oBuild.idBuildCategory,
904 oValidationKitBuild.idBuild if oValidationKitBuild is not None else None,
905 oTestBoxData.idGenTestBox,
906 oTestBoxData.idTestBox,
907 oTask.idTestGroup,
908 oTestEx.oTestCase.idGenTestCase,
909 oTestEx.oTestCase.idTestCase,
910 oTestEx.idGenTestCaseArgs,
911 oTestEx.idTestCaseArgs,
912 sBaseFilename,
913 iGangMemberNo,
914 oTask.idTestSetGangLeader,
915 ));
916
917 self._oDb.execute('INSERT INTO TestResults (\n'
918 ' idTestResultParent,\n'
919 ' idTestSet,\n'
920 ' tsCreated,\n'
921 ' idStrName,\n'
922 ' cErrors,\n'
923 ' enmStatus,\n'
924 ' iNestingDepth)\n'
925 'VALUES ( NULL,\n' # idTestResultParent
926 ' %s,\n' # idTestSet
927 ' %s,\n' # tsCreated
928 ' 0,\n' # idStrName
929 ' 0,\n' # cErrors
930 ' \'running\'::TestStatus_T,\n'
931 ' 0)\n' # iNestingDepth
932 'RETURNING idTestResult'
933 , ( idTestSet, tsNow, ));
934 idTestResult = self._oDb.fetchOne()[0];
935
936 self._oDb.execute('UPDATE TestSets\n'
937 ' SET idTestResult = %s\n'
938 'WHERE idTestSet = %s\n'
939 , (idTestResult, idTestSet, ));
940
941 return idTestSet;
942
def _tryFindValidationKitBit(self, oTestBoxData, tsNow):
    """
    Picks a validation kit build the given testbox can use.

    The candidates come from the scheduling group's test suite build source
    (idBuildSrcTestSuite), filtered on the testbox OS and CPU architecture.
    They are tried in the order the build cursor hands them out (presumably
    newest first; the ordering is decided by openBuildCursor) and the first
    one which isn't blacklisted is returned.

    Returns BuildDataEx or None.  Raise exception on database error.

    Can be overridden by child classes to change the default build requirements.
    """
    oBlacklistLogic = BuildLogic(self._oDb);
    oSrcData        = BuildSourceData().initFromDbWithId(self._oDb, self._oSchedGrpData.idBuildSrcTestSuite, tsNow);
    oBuildCursor    = BuildSourceLogic(self._oDb).openBuildCursor(oSrcData, oTestBoxData.sOs,
                                                                  oTestBoxData.sCpuArch, tsNow);

    # Walk the cursor one row at a time so we stop fetching at the first hit.
    cRowsLeft = oBuildCursor.getRowCount();
    while cRowsLeft > 0:
        cRowsLeft -= 1;
        oCandidate = BuildDataEx().initFromDbRow(oBuildCursor.fetchOne());
        if oBlacklistLogic.isBuildBlacklisted(oCandidate):
            continue;
        return oCandidate;

    # Every candidate was blacklisted (or there were none).
    return None;
958
def _tryFindBuild(self, oTask, oTestEx, oTestBoxData, tsNow):
    """
    Searches the build cache for a build the test can be run against.

    A build qualifies when it matches the build properties of the test,
    isn't blacklisted, has all prerequisite testcases passed (or skipped)
    without any failed runs, and its files aren't known to be gone.

    Returns BuildDataEx or None.  Raise exception on database error.

    Can be overridden by child classes to change the default build requirements.
    """

    #
    # Collect the prerequisite testcase IDs.
    #
    # Note! We're scheduling on testcase level and ignoring argument variation
    #       selections in TestGroupMembers is intentional.
    #

    # Direct prerequisites.  We assume they're all enabled as this can be
    # checked at queue creation time.
    setPreReqIds = set();
    for oPreReq in oTestEx.aoTestCasePreReqs:
        setPreReqIds.add(oPreReq.idTestCase);

    # Testgroup dependencies from the scheduling group config.
    aidGroupPreReqs = oTask.aidTestGroupPreReqs if oTask.aidTestGroupPreReqs is not None else [];
    for idTestGroup in aidGroupPreReqs:
        # Load the _active_ and enabled members of the group on first use,
        # after that they are served from self.dTestGroupMembers.
        if idTestGroup not in self.dTestGroupMembers:
            self._oDb.execute('SELECT DISTINCT TestGroupMembers.idTestCase\n'
                              'FROM TestGroupMembers, TestCases\n'
                              'WHERE TestGroupMembers.idTestGroup = %s\n'
                              ' AND TestGroupMembers.tsExpire > %s\n'
                              ' AND TestGroupMembers.tsEffective <= %s\n'
                              ' AND TestCases.idTestCase = TestGroupMembers.idTestCase\n'
                              ' AND TestCases.tsExpire > %s\n'
                              ' AND TestCases.tsEffective <= %s\n'
                              ' AND TestCases.fEnabled is TRUE\n'
                              , (idTestGroup, oTask.tsConfig, oTask.tsConfig, oTask.tsConfig, oTask.tsConfig,));
            self.dTestGroupMembers[idTestGroup] = [aoRow[0] for aoRow in self._oDb.fetchAll()];

        # Every member of the group is a prerequisite.
        setPreReqIds.update(self.dTestGroupMembers[idTestGroup]);

    # Render them as the row list of a SQL VALUES table: '(1), (2), ...'.
    # The list is sorted so the string can double as a cache key below.
    sPreReqSet = ', '.join('(' + str(idPreReq) + ')' for idPreReq in sorted(setPreReqIds));

    #
    # Try the builds.
    #
    self.oBuildCache.setupSource(self._oDb, self._oSchedGrpData.idBuildSrc, oTestBoxData.sOs, oTestBoxData.sCpuArch, tsNow);
    for oEntry in self.oBuildCache:
        # Build requirements set by the test.
        if not oTestEx.matchesBuildProps(oEntry.oBuild):
            continue;

        # Blacklisted builds are dropped from the cache for good.
        if oEntry.isBlacklisted(self._oDb):
            oEntry.remove();
            continue;

        #
        # Check prerequisites.  The default scheduler is satisfied if one
        # argument variation has been executed successfully.  It is not
        # satisfied if there are any failure runs.
        #
        if sPreReqSet:
            fCachedDecision = oEntry.getPreReqDecision(sPreReqSet);
            if fCachedDecision is not None:
                # Already worked out for this build + prerequisite set.
                if not fCachedDecision:
                    continue;
            else:
                ## @todo DB Tuning
                # Count the prerequisites lacking a successful/skipped run on this build.
                self._oDb.execute('SELECT COUNT(*)\n'
                                  'FROM (VALUES ' + sPreReqSet + ') AS PreReqs(idTestCase)\n'
                                  'LEFT OUTER JOIN (SELECT *\n'
                                  ' FROM TestSets\n'
                                  ' WHERE enmStatus IN (%s, %s)\n'
                                  ' AND idBuild = %s\n'
                                  ' ) AS TestSets\n'
                                  ' ON (PreReqs.idTestCase = TestSets.idTestCase)\n'
                                  'WHERE TestSets.idTestSet is NULL\n'
                                  , ( TestSetData.ksTestStatus_Success, TestSetData.ksTestStatus_Skipped,
                                      oEntry.oBuild.idBuild, ));
                cMissing = self._oDb.fetchOne()[0];
                if cMissing > 0:
                    self.dprint('build %s is missing %u prerequisites (out of %s)'
                                % (oEntry.oBuild.idBuild, cMissing, sPreReqSet,));
                    oEntry.setPreReqDecision(sPreReqSet, False);
                    continue;

                # Count the prerequisite runs on this build that failed, timed out or rebooted.
                self._oDb.execute('SELECT COUNT(*)\n'
                                  'FROM (VALUES ' + sPreReqSet + ') AS PreReqs(idTestCase),\n'
                                  ' TestSets\n'
                                  'WHERE PreReqs.idTestCase = TestSets.idTestCase\n'
                                  ' AND TestSets.idBuild = %s\n'
                                  ' AND TestSets.enmStatus IN (%s, %s, %s)\n'
                                  , ( oEntry.oBuild.idBuild,
                                      TestSetData.ksTestStatus_Failure,
                                      TestSetData.ksTestStatus_TimedOut,
                                      TestSetData.ksTestStatus_Rebooted,
                                    )
                                 );
                cFailed = self._oDb.fetchOne()[0];
                if cFailed > 0:
                    self.dprint('build %s is has %u prerequisite failures (out of %s)'
                                % (oEntry.oBuild.idBuild, cFailed, sPreReqSet,));
                    oEntry.setPreReqDecision(sPreReqSet, False);
                    continue;

                # All prerequisites are satisfied, remember that.
                oEntry.setPreReqDecision(sPreReqSet, True);

        #
        # If we can, check if the build files still exist.  Only an explicit
        # False disqualifies the build; any other answer lets it pass.
        #
        if oEntry.oBuild.areFilesStillThere() is False:
            self.dprint('build %s no longer exists' % (oEntry.oBuild.idBuild,));
            oEntry.remove();
            continue;

        self.dprint('found oBuild=%s' % (oEntry.oBuild,));
        return oEntry.oBuild;

    # Ran out of builds.
    return None;
1090
def _tryFindMatchingBuild(self, oLeaderBuild, oTestBoxData, idBuildSrc):
    """
    Finds the build a gang member should use, given the build its leader uses.

    The lookup is delegated to BuildLogic.tryFindSameBuildForOsArch with the
    OS and CPU architecture of the testbox.  The build source ID is not used
    by this default implementation.

    Returns BuildDataEx or None.  Raise exception on database error.

    Can be overridden by child classes to change the default build requirements.
    """
    #
    # Note! Should probably check build prerequisites if we get a different
    #       build back, so that we don't use a build which hasn't passed
    #       the smoke test.
    #
    _ = idBuildSrc; # Deliberately unused here.
    oBuildLogic = BuildLogic(self._oDb);
    sOs         = oTestBoxData.sOs;
    sCpuArch    = oTestBoxData.sCpuArch;
    return oBuildLogic.tryFindSameBuildForOsArch(oLeaderBuild, sOs, sCpuArch);
1105
1106
def _tryAsLeader(self, oTask, oTestEx, oTestBoxData, tsNow, sBaseUrl):
    """
    Try schedule the task as a gang leader (can be a gang of one).

    oTask is the scheduling queue item, oTestEx the testcase + argument
    variation it refers to, oTestBoxData the testbox asking for work, tsNow
    the current database timestamp and sBaseUrl is passed on to
    composeExecResponseWorker.

    Returns response or None.  None means the task could not be scheduled
    on this testbox right now and the caller should try the next queue item.
    May raise exception on DB error.
    """

    # We don't wait for busy resources, we just try the next test.
    oTestArgsLogic = TestCaseArgsLogic(self._oDb);
    if not oTestArgsLogic.areResourcesFree(oTestEx):
        self.dprint('Cannot get global test resources!');
        return None;

    #
    # Find a matching build (this is the difficult bit).
    #
    oBuild = self._tryFindBuild(oTask, oTestEx, oTestBoxData, tsNow);
    if oBuild is None:
        self.dprint('No build!');
        return None;
    if oTestEx.oTestCase.needValidationKitBit():
        oValidationKitBuild = self._tryFindValidationKitBit(oTestBoxData, tsNow);
        if oValidationKitBuild is None:
            self.dprint('No validation kit build!');
            return None;
    else:
        oValidationKitBuild = None;

    #
    # Create a testset, allocate the resources and update the state.
    # Note! Since resource allocation may still fail, we create a nested
    #       transaction so we can roll back. (Heed lock warning in docs!)
    #
    self._oDb.execute('SAVEPOINT tryAsLeader');
    idTestSet = self._createTestSet(oTask, oTestEx, oTestBoxData, oBuild, oValidationKitBuild, tsNow);

    if GlobalResourceLogic(self._oDb).allocateResources(oTestBoxData.idTestBox, oTestEx.aoGlobalRsrc, fCommit = False) \
       is not True:
        self._oDb.execute('ROLLBACK TO SAVEPOINT tryAsLeader');
        self.dprint('Failed to allocate global resources!');
        # Must be None, not False: scheduleNewTaskWorker treats anything that
        # isn't None as a found task and would hand False out as the response
        # instead of moving on to the next queue item.
        return None;

    if oTestEx.cGangMembers <= 1:
        # We're alone, put the task back at the end of the queue and issue EXEC cmd.
        self._moveTaskToEndOfQueue(oTask, oTestEx.cGangMembers, tsNow);
        dResponse = self.composeExecResponseWorker(idTestSet, oTestEx, oTestBoxData, oBuild, oValidationKitBuild, sBaseUrl);
        sTBState = TestBoxStatusData.ksTestBoxState_Testing;
    else:
        # We're missing gang members, issue WAIT cmd.
        self._updateTask(oTask, tsNow if idTestSet == oTask.idTestSetGangLeader else None);
        dResponse = { constants.tbresp.ALL_PARAM_RESULT: constants.tbresp.CMD_WAIT, };
        sTBState = TestBoxStatusData.ksTestBoxState_GangGathering;

    TestBoxStatusLogic(self._oDb).updateState(oTestBoxData.idTestBox, sTBState, idTestSet, fCommit = False);
    self._oDb.execute('RELEASE SAVEPOINT tryAsLeader');
    return dResponse;
1162
def _tryAsGangMember(self, oTask, oTestEx, oTestBoxData, tsNow, sBaseUrl):
    """
    Try schedule the task as a gang member.

    oTask is the scheduling queue item of a gang that already has a leader
    (oTask.idTestSetGangLeader is the leader's test set), oTestEx the
    testcase + argument variation, oTestBoxData the testbox asking for work,
    tsNow the current database timestamp and sBaseUrl is passed on to
    composeExecResponseWorker.

    Returns response or None.  May raise exception on DB error.
    """

    #
    # The leader has chosen a build, we need to find a matching one for our platform.
    # (It's up to the scheduler decide upon how strict dependencies are to be enforced
    # upon subordinate group members.)
    #
    # The leader's test set ID is tracked by the queue item (it is what the
    # gang status updates below use as well), so take it from oTask rather
    # than from the testbox data.
    #
    oLeaderTestSet = TestSetData().initFromDbWithId(self._oDb, oTask.idTestSetGangLeader);

    oLeaderBuild = BuildDataEx().initFromDbWithId(self._oDb, oLeaderTestSet.idBuild);
    oBuild = self._tryFindMatchingBuild(oLeaderBuild, oTestBoxData, self._oSchedGrpData.idBuildSrc);
    if oBuild is None:
        return None;

    # NOTE(review): if the leader uses a validation kit build and no matching
    #               one is found for this testbox, we carry on without one,
    #               unlike _tryAsLeader which gives up.  Confirm this is intended.
    oValidationKitBuild = None;
    if oLeaderTestSet.idBuildTestSuite is not None:
        oLeaderValidationKitBit = BuildDataEx().initFromDbWithId(self._oDb, oLeaderTestSet.idBuildTestSuite);
        oValidationKitBuild = self._tryFindMatchingBuild(oLeaderValidationKitBit, oTestBoxData,
                                                         self._oSchedGrpData.idBuildSrcTestSuite);

    #
    # Create a testset and update the state(s).
    # (_createTestSet decrements oTask.cMissingGangMembers for us.)
    #
    idTestSet = self._createTestSet(oTask, oTestEx, oTestBoxData, oBuild, oValidationKitBuild, tsNow);

    oTBStatusLogic = TestBoxStatusLogic(self._oDb);
    if oTask.cMissingGangMembers < 1:
        # The whole gang is there, move the task to the end of the queue
        # and update the status on the other gang members.
        self._moveTaskToEndOfQueue(oTask, oTestEx.cGangMembers, tsNow);
        dResponse = self.composeExecResponseWorker(idTestSet, oTestEx, oTestBoxData, oBuild, oValidationKitBuild, sBaseUrl);
        sTBState = TestBoxStatusData.ksTestBoxState_GangTesting;
        oTBStatusLogic.updateGangStatus(oTask.idTestSetGangLeader, sTBState, fCommit = False);
    else:
        # We're still missing some gang members, issue WAIT cmd.
        self._updateTask(oTask, tsNow if idTestSet == oTask.idTestSetGangLeader else None);
        dResponse = { constants.tbresp.ALL_PARAM_RESULT: constants.tbresp.CMD_WAIT, };
        sTBState = TestBoxStatusData.ksTestBoxState_GangGathering;

    oTBStatusLogic.updateState(oTestBoxData.idTestBox, sTBState, idTestSet, fCommit = False);
    return dResponse;
1208
1209
def scheduleNewTaskWorker(self, oTestBoxData, tsNow, sBaseUrl):
    """
    Worker for scheduling a new task.

    Walks the scheduling queue of the group in idItem order and hands the
    testbox the first task that can be scheduled on it, either as gang
    leader (including gangs of one) or as a member of a gang being gathered.

    Returns the response of the task found, None if no suitable task was
    found (or the time budget config.g_kcSecMaxNewTask ran out).
    May raise exception on DB error.
    """

    #
    # Iterate the scheduler queue (fetch all to avoid having two concurrent
    # queries), trying out each task to see if the testbox can execute it.
    #
    dRejected = {}; # variations we've already checked out and rejected.
    self._oDb.execute('SELECT *\n'
                      'FROM SchedQueues\n'
                      'WHERE idSchedGroup = %s\n'
                      ' AND ( bmHourlySchedule IS NULL\n'
                      ' OR get_bit(bmHourlySchedule, %s) = 1 )\n'
                      'ORDER BY idItem ASC\n'
                      , (self._oSchedGrpData.idSchedGroup, utils.getLocalHourOfWeek()) );
    aaoRows = self._oDb.fetchAll();
    for aoRow in aaoRows:
        # Don't loop forever.
        if self.getElapsedSecs() >= config.g_kcSecMaxNewTask:
            break;

        # Unpack the data and check if we've rejected the testcasevar/group variation already (they repeat).
        oTask = SchedQueueData().initFromDbRow(aoRow);
        if config.g_kfSrvGlueDebugScheduler:
            self.dprint('** Considering: idItem=%s idGenTestCaseArgs=%s idTestGroup=%s Deps=%s last=%s cfg=%s\n'
                        % ( oTask.idItem, oTask.idGenTestCaseArgs, oTask.idTestGroup, oTask.aidTestGroupPreReqs,
                            oTask.tsLastScheduled, oTask.tsConfig,));

        sRejectNm = '%s:%s' % (oTask.idGenTestCaseArgs, oTask.idTestGroup,);
        if sRejectNm in dRejected:
            self.dprint('Duplicate, already rejected! (%s)' % (sRejectNm,));
            continue;
        dRejected[sRejectNm] = 1;

        # Fetch all the test case info (too much, but who cares right now).
        oTestEx = TestCaseArgsDataEx().initFromDbWithGenIdEx(self._oDb, oTask.idGenTestCaseArgs,
                                                             tsConfigEff = oTask.tsConfig,
                                                             tsRsrcEff = oTask.tsConfig);
        if config.g_kfSrvGlueDebugScheduler:
            self.dprint('TestCase "%s": %s %s' % (oTestEx.oTestCase.sName, oTestEx.oTestCase.sBaseCmd, oTestEx.sArgs,));

        # This shouldn't happen, but just in case it does...
        if oTestEx.oTestCase.fEnabled is not True:
            self.dprint('Testcase is not enabled!!');
            continue;

        # Check if the testbox properties matches the test.
        if not oTestEx.matchesTestBoxProps(oTestBoxData):
            self.dprint('Testbox mismatch!');
            continue;

        # Try schedule it.
        if oTask.idTestSetGangLeader is None or oTestEx.cGangMembers <= 1:
            dResponse = self._tryAsLeader(oTask, oTestEx, oTestBoxData, tsNow, sBaseUrl);
        elif oTask.cMissingGangMembers > 0:
            # A gang with a leader is still gathering.  The count is
            # cGangMembers - 1 once the leader has registered and drops by
            # one per member, so 1 means the last member is being awaited
            # and it must be let in or the gang can never be completed.
            dResponse = self._tryAsGangMember(oTask, oTestEx, oTestBoxData, tsNow, sBaseUrl);
        else:
            dResponse = None; # Shouldn't happen!
        if dResponse is not None:
            self.dprint('Found a task! dResponse=%s' % (dResponse,));
            return dResponse;

    # Found no suitable task.
    return None;
1276
@staticmethod
def scheduleNewTask(oDb, oTestBoxData, sBaseUrl, iVerbosity = 0):
    """
    Schedules a new task.

    Starts a fresh transaction on oDb, locks the testbox status row and the
    scheduling queue rows of the testbox's group, re-reads the testbox and
    scheduling group configuration, and if both are still enabled lets the
    group's scheduler look for a task.

    Returns the response from the scheduler (transaction committed), or None
    if nothing was scheduled (transaction rolled back).  Any exception causes
    a rollback before it is re-raised.
    """
    try:
        #
        # To avoid concurrency issues in SchedQueues we lock all the rows
        # related to our scheduling queue.  Also, since this is a very
        # expensive operation we lock the testbox status row to fend off
        # repeated retries by faulty testbox scripts.
        #
        tsSecStart = utils.timestampSecond();
        oDb.rollback();
        oDb.begin();
        # The IDs are passed as query parameters, like everywhere else in this
        # file, rather than being formatted into the SQL text.
        oDb.execute('SELECT idTestBox FROM TestBoxStatuses WHERE idTestBox = %s FOR UPDATE NOWAIT'
                    , (oTestBoxData.idTestBox,));
        oDb.execute('SELECT idSchedGroup FROM SchedQueues WHERE idSchedGroup = %s FOR UPDATE'
                    , (oTestBoxData.idSchedGroup,));

        # We need the current timestamp.
        tsNow = oDb.getCurrentTimestamp();

        # Re-read the testbox data ...
        oTestBoxDataCur = TestBoxData().initFromDbWithId(oDb, oTestBoxData.idTestBox, tsNow);
        if    oTestBoxDataCur.fEnabled \
          and oTestBoxDataCur.idGenTestBox == oTestBoxData.idGenTestBox \
          and oTestBoxDataCur.idSchedGroup == oTestBoxData.idSchedGroup: # (paranoia wrt idSchedGroup)

            # ... and schedule group data.
            oSchedGrpData = SchedGroupData().initFromDbWithId(oDb, oTestBoxDataCur.idSchedGroup, tsNow);
            if oSchedGrpData.fEnabled and oSchedGrpData.idBuildSrc is not None:

                #
                # Instantiate the specified scheduler and let it do the rest.
                #
                oScheduler = SchedulerBase._instantiate(oDb, oSchedGrpData, iVerbosity, tsSecStart);
                dResponse = oScheduler.scheduleNewTaskWorker(oTestBoxDataCur, tsNow, sBaseUrl);
                if dResponse is not None:
                    oDb.commit();
                    return dResponse;
    except:
        # Release the locks no matter what went wrong, then let the caller deal with it.
        oDb.rollback();
        raise;

    # Not enabled, configuration changed or no suitable task: rollback and return no task.
    oDb.rollback();
    return None;
1325
@staticmethod
def tryCancelGangGathering(oDb, oStatusData):
    """
    Try canceling a gang gathering.

    The whole gang is flagged as gang-gathering-timed-out and the scheduling
    queue item it was gathering for is reset and requeued with a new item ID.

    Returns True if successfully cancelled, or if the testbox was already
    found in the gang-gathering-timed-out state.
    Returns False if not (someone raced us to the SchedQueue table).

    Note! oStatusData is re-initialized.
    """
    assert oStatusData.enmState == TestBoxStatusData.ksTestBoxState_GangGathering;
    try:
        #
        # Lock the tables we're updating so we don't run into concurrency
        # issues (we're racing both scheduleNewTask and other callers of
        # this method).
        #
        oDb.rollback();
        oDb.begin();
        oDb.execute('LOCK TABLE TestBoxStatuses, SchedQueues IN EXCLUSIVE MODE');

        #
        # Re-read the testbox status now that we hold the locks and see what
        # state we're really in.
        #
        oStatusData.initFromDbWithId(oDb, oStatusData.idTestBox);
        sCurState = oStatusData.enmState;

        if sCurState == TestBoxStatusData.ksTestBoxState_GangGatheringTimedOut:
            # Already flagged as timed out, nothing for us to change.
            oDb.rollback();
            return True;

        if sCurState == TestBoxStatusData.ksTestBoxState_GangGathering:
            #
            # Get the leader thru the test set and change the state of the whole gang.
            #
            oTestSetData = TestSetData().initFromDbWithId(oDb, oStatusData.idTestSet);
            idGangLeader = oTestSetData.idTestSetGangLeader;

            oStatusLogic = TestBoxStatusLogic(oDb);
            oStatusLogic.updateGangStatus(idGangLeader,
                                          TestBoxStatusData.ksTestBoxState_GangGatheringTimedOut,
                                          fCommit = False);

            #
            # Look up the queue item the gang was gathering for ...
            #
            oDb.execute('SELECT *\n'
                        'FROM SchedQueues\n'
                        'WHERE idTestSetGangLeader = %s\n'
                        , (oTestSetData.idTestSetGangLeader,) );
            oQueueItem = SchedQueueData().initFromDbRow(oDb.fetchOne());
            oTestArgs  = TestCaseArgsDataEx().initFromDbWithGenId(oDb, oQueueItem.idGenTestCaseArgs);

            #
            # ... and move it to the end of the queue by giving it a fresh
            # item ID, dropping the leader and resetting the missing member
            # count to the full gang size.
            #
            oDb.execute('UPDATE SchedQueues\n'
                        ' SET idItem = NEXTVAL(\'SchedQueueItemIdSeq\'),\n'
                        ' idTestSetGangLeader = NULL,\n'
                        ' cMissingGangMembers = %s\n'
                        'WHERE idItem = %s\n'
                        , (oTestArgs.cGangMembers, oQueueItem.idItem,) );

            oDb.commit();
            return True;
    except:
        oDb.rollback();
        raise;

    # The testbox is in some other state by now, so there is nothing to cancel.
    oDb.rollback();
    return False;
1392
1393
1394#
1395# Unit testing.
1396#
1397
1398# pylint: disable=C0111
class SchedQueueDataTestCase(ModelDataBaseTestCase):
    """Supplies SchedQueueData samples to the ModelDataBaseTestCase test methods."""

    def setUp(self):
        """Uses a single default constructed SchedQueueData instance as sample data."""
        oSample = SchedQueueData();
        self.aoSamples = [oSample];
1402
if __name__ == '__main__':
    # Run the unit tests when executed as a script.  unittest.main() exits the
    # interpreter by default when the tests are done, so nothing after the call
    # is reached.
    unittest.main();
1406
注意: 瀏覽 TracBrowser 來幫助您使用儲存庫瀏覽器

© 2025 Oracle Support Privacy / Do Not Sell My Info Terms of Use Trademark Policy Automated Access Etiquette