MediaWiki master
JobQueueDB.php
Go to the documentation of this file.
<?php

namespace MediaWiki\JobQueue;

use MappedIterator;
use MediaWiki\JobQueue\Exceptions\JobQueueConnectionError;
use MediaWiki\JobQueue\Exceptions\JobQueueError;
use MediaWiki\MediaWikiServices;
use Profiler;
use stdClass;
use UnexpectedValueException;
use Wikimedia\Rdbms\DBConnectionError;
use Wikimedia\Rdbms\DBError;
use Wikimedia\Rdbms\IDatabase;
use Wikimedia\Rdbms\IMaintainableDatabase;
use Wikimedia\Rdbms\IReadableDatabase;
use Wikimedia\Rdbms\RawSQLValue;
use Wikimedia\Rdbms\SelectQueryBuilder;
use Wikimedia\Rdbms\ServerInfo;
use Wikimedia\ScopedCallback;

/**
 * Database-backed job queue storage.
 */
class JobQueueDB extends JobQueue {
	/* seconds to cache info without re-validating */
	private const CACHE_TTL_SHORT = 30;
	/* seconds a job can live once claimed */
	private const MAX_AGE_PRUNE = 7 * 24 * 3600;
	/* highest safe 32-bit signed integer (2^31 - 1), used as the upper bound for job_random */
	private const MAX_JOB_RANDOM = 2_147_483_647;
	/* maximum number of rows to skip */
	private const MAX_OFFSET = 255;

	/** @var IMaintainableDatabase|DBError|null */
	protected $conn;

	/** @var array|null Server configuration array */
	protected $server;
	/** @var string|null Name of an external DB cluster or null for the local DB cluster */
	protected $cluster;

	/**
	 * Additional parameters include:
	 *   - server : server configuration array used to open the job database connection
	 *   - cluster : name of an external DB cluster to use via LBFactory
	 * @param array $params
	 */
	protected function __construct( array $params ) {
		parent::__construct( $params );

		if ( isset( $params['server'] ) ) {
			$this->server = $params['server'];
			// Always use autocommit mode, even if DBO_TRX is configured
			$this->server['flags'] ??= 0;
			$this->server['flags'] &= ~( IDatabase::DBO_TRX | IDatabase::DBO_DEFAULT );
		} elseif ( isset( $params['cluster'] ) && is_string( $params['cluster'] ) ) {
			$this->cluster = $params['cluster'];
		}
	}
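	// Example configuration (illustrative sketch): a wiki would normally select this backend
	// via $wgJobTypeConf rather than constructing it directly; the 'jobqueue-ext' cluster
	// name below is hypothetical.
	//
	//     $wgJobTypeConf['default'] = [
	//         'class' => JobQueueDB::class,
	//         'order' => 'random',
	//         'claimTTL' => 3600,
	//         'cluster' => 'jobqueue-ext', // optional; omit to use the local DB cluster
	//     ];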

	/**
	 * Get the allowed queue orders for configuration validation.
	 */
	protected function supportedOrders() {
		return [ 'random', 'timestamp', 'fifo' ];
	}

	/**
	 * Get the default queue order to use if configuration does not specify one.
	 */
	protected function optimalOrder() {
		return 'random';
	}

	protected function doIsEmpty() {
		$dbr = $this->getReplicaDB();
		try {
			// unclaimed job
			$found = (bool)$dbr->newSelectQueryBuilder()
				->select( '1' )
				->from( 'job' )
				->where( [ 'job_cmd' => $this->type, 'job_token' => '' ] )
				->caller( __METHOD__ )->fetchField();
		} catch ( DBError $e ) {
			throw $this->getDBException( $e );
		}

		return !$found;
	}

	protected function doGetSize() {
		$key = $this->getCacheKey( 'size' );

		$size = $this->wanCache->get( $key );
		if ( is_int( $size ) ) {
			return $size;
		}

		$dbr = $this->getReplicaDB();
		try {
			$size = $dbr->newSelectQueryBuilder()
				->from( 'job' )
				->where( [ 'job_cmd' => $this->type, 'job_token' => '' ] )
				->caller( __METHOD__ )->fetchRowCount();
		} catch ( DBError $e ) {
			throw $this->getDBException( $e );
		}
		$this->wanCache->set( $key, $size, self::CACHE_TTL_SHORT );

		return $size;
	}

	protected function doGetAcquiredCount() {
		if ( $this->claimTTL <= 0 ) {
			return 0; // no acknowledgements
		}

		$key = $this->getCacheKey( 'acquiredcount' );

		$count = $this->wanCache->get( $key );
		if ( is_int( $count ) ) {
			return $count;
		}

		$dbr = $this->getReplicaDB();
		try {
			$count = $dbr->newSelectQueryBuilder()
				->from( 'job' )
				->where( [
					'job_cmd' => $this->type,
					$dbr->expr( 'job_token', '!=', '' ),
				] )
				->caller( __METHOD__ )->fetchRowCount();
		} catch ( DBError $e ) {
			throw $this->getDBException( $e );
		}
		$this->wanCache->set( $key, $count, self::CACHE_TTL_SHORT );

		return $count;
	}

	protected function doGetAbandonedCount() {
		if ( $this->claimTTL <= 0 ) {
			return 0; // no acknowledgements
		}

		$key = $this->getCacheKey( 'abandonedcount' );

		$count = $this->wanCache->get( $key );
		if ( is_int( $count ) ) {
			return $count;
		}

		$dbr = $this->getReplicaDB();
		try {
			$count = $dbr->newSelectQueryBuilder()
				->from( 'job' )
				->where(
					[
						'job_cmd' => $this->type,
						$dbr->expr( 'job_token', '!=', '' ),
						$dbr->expr( 'job_attempts', '>=', $this->maxTries ),
					]
				)
				->caller( __METHOD__ )->fetchRowCount();
		} catch ( DBError $e ) {
			throw $this->getDBException( $e );
		}

		$this->wanCache->set( $key, $count, self::CACHE_TTL_SHORT );

		return $count;
	}

	protected function doBatchPush( array $jobs, $flags ) {
		// Silence expectations related to getting a primary DB, as we have to get a primary DB to insert the job.
		$transactionProfiler = Profiler::instance()->getTransactionProfiler();
		$scope = $transactionProfiler->silenceForScope();
		$dbw = $this->getPrimaryDB();
		ScopedCallback::consume( $scope );
		// In general, there will be two cases here:
		// a) sqlite; DB connection is probably a regular round-aware handle.
		// If the connection is busy with a transaction, then defer the job writes
		// until right before the main round commit step. Any errors that bubble
		// up will rollback the main commit round.
		// b) mysql/postgres; DB connection is generally a separate CONN_TRX_AUTOCOMMIT handle.
		// No transaction is active nor will be started by writes, so enqueue the jobs
		// now so that any errors will show up immediately as the interface expects. Any
		// errors that bubble up will rollback the main commit round.
		$fname = __METHOD__;
		$dbw->onTransactionPreCommitOrIdle(
			fn () => $this->doBatchPushInternal( $dbw, $jobs, $flags, $fname ),
			$fname
		);
	}
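	// Example (illustrative sketch): callers normally enqueue jobs through the JobQueueGroup
	// service, which routes to doBatchPush() for DB-backed queues; the actual row insert is
	// then deferred as described in the comment above. Assumes the 'refreshLinks' job type.
	//
	//     MediaWikiServices::getInstance()->getJobQueueGroup()->push(
	//         new JobSpecification( 'refreshLinks', [ 'namespace' => NS_MAIN, 'title' => 'Example' ] )
	//     );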

	/**
	 * This function should not be called outside of JobQueueDB.
	 *
	 * @param IDatabase $dbw
	 * @param array $jobs
	 * @param int $flags
	 * @param string $method
	 */
	public function doBatchPushInternal( IDatabase $dbw, array $jobs, $flags, $method ) {
		if ( $jobs === [] ) {
			return;
		}

		$rowSet = []; // (sha1 => job) map for jobs that are de-duplicated
		$rowList = []; // list of jobs for jobs that are not de-duplicated
		foreach ( $jobs as $job ) {
			$row = $this->insertFields( $job, $dbw );
			if ( $job->ignoreDuplicates() ) {
				$rowSet[$row['job_sha1']] = $row;
			} else {
				$rowList[] = $row;
			}
		}

		if ( $flags & self::QOS_ATOMIC ) {
			$dbw->startAtomic( $method ); // wrap all the job additions in one transaction
		}
		try {
			// Strip out any duplicate jobs that are already in the queue...
			if ( count( $rowSet ) ) {
				$res = $dbw->newSelectQueryBuilder()
					->select( 'job_sha1' )
					->from( 'job' )
					->where(
						[
							// No job_type condition since it's part of the job_sha1 hash
							'job_sha1' => array_map( 'strval', array_keys( $rowSet ) ),
							'job_token' => '' // unclaimed
						]
					)
					->caller( $method )->fetchResultSet();
				foreach ( $res as $row ) {
					wfDebug( "Job with hash '{$row->job_sha1}' is a duplicate." );
					unset( $rowSet[$row->job_sha1] ); // already enqueued
				}
			}
			// Build the full list of job rows to insert
			$rows = array_merge( $rowList, array_values( $rowSet ) );
			// Silence expectations related to inserting to the job table, because we have to
			// perform the inserts to track the job.
			$transactionProfiler = Profiler::instance()->getTransactionProfiler();
			$scope = $transactionProfiler->silenceForScope();
			// Insert the job rows in chunks to avoid replica DB lag...
			foreach ( array_chunk( $rows, 50 ) as $rowBatch ) {
				$dbw->newInsertQueryBuilder()
					->insertInto( 'job' )
					->rows( $rowBatch )
					->caller( $method )->execute();
			}
			ScopedCallback::consume( $scope );
			$this->incrStats( 'inserts', $this->type, count( $rows ) );
			$this->incrStats( 'dupe_inserts', $this->type,
				count( $rowSet ) + count( $rowList ) - count( $rows )
			);
		} catch ( DBError $e ) {
			throw $this->getDBException( $e );
		}
		if ( $flags & self::QOS_ATOMIC ) {
			$dbw->endAtomic( $method );
		}
	}

	protected function doPop() {
		$job = false; // job popped off
		try {
			$uuid = wfRandomString( 32 ); // pop attempt
			do { // retry when our row is invalid or deleted as a duplicate
				// Try to reserve a row in the DB...
				if ( in_array( $this->order, [ 'fifo', 'timestamp' ] ) ) {
					$row = $this->claimOldest( $uuid );
				} else { // random first
					$rand = mt_rand( 0, self::MAX_JOB_RANDOM ); // encourage concurrent UPDATEs
					$gte = (bool)mt_rand( 0, 1 ); // find rows with rand before/after $rand
					$row = $this->claimRandom( $uuid, $rand, $gte );
				}
				// Check if we found a row to reserve...
				if ( !$row ) {
					break; // nothing to do
				}
				$this->incrStats( 'pops', $this->type );

				// Get the job object from the row...
				$job = $this->jobFromRow( $row );
				break; // done
			} while ( true );

			if ( !$job || mt_rand( 0, 9 ) == 0 ) {
				// Handle jobs that need to be recycled/deleted;
				// any recycled jobs will be picked up next attempt
				$this->recycleAndDeleteStaleJobs();
			}
		} catch ( DBError $e ) {
			throw $this->getDBException( $e );
		}

		return $job;
	}
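	// Example (illustrative sketch): a job runner pops, runs, and acknowledges jobs roughly
	// as follows; ack() removes the claimed row via doAck() below.
	//
	//     $queue = MediaWikiServices::getInstance()->getJobQueueGroup()->get( 'refreshLinks' );
	//     $job = $queue->pop();
	//     if ( $job ) {
	//         $job->run();
	//         $queue->ack( $job );
	//     }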

	/**
	 * Reserve a row with a single UPDATE without holding row locks over RTTs...
	 */
	protected function claimRandom( $uuid, $rand, $gte ) {
		$dbw = $this->getPrimaryDB();
		// Check cache to see if the queue has <= OFFSET items
		$tinyQueue = $this->wanCache->get( $this->getCacheKey( 'small' ) );

		$invertedDirection = false; // whether one job_random direction was already scanned
		// This uses a replication safe method for acquiring jobs. One could use UPDATE+LIMIT
		// instead, but that either uses ORDER BY (in which case it deadlocks in MySQL) or is
		// not replication safe. Due to https://bugs.mysql.com/bug.php?id=6980, subqueries cannot
		// be used here with MySQL.
		do {
			if ( $tinyQueue ) { // queue has <= MAX_OFFSET rows
				// For small queues, using OFFSET will overshoot and return no rows more often.
				// Instead, this uses job_random to pick a row (possibly checking both directions).
				$row = $dbw->newSelectQueryBuilder()
					->select( self::selectFields() )
					->from( 'job' )
					->where(
						[
							'job_cmd' => $this->type,
							'job_token' => '', // unclaimed
							$dbw->expr( 'job_random', $gte ? '>=' : '<=', $rand )
						]
					)
					->orderBy(
						'job_random',
						$gte ? SelectQueryBuilder::SORT_ASC : SelectQueryBuilder::SORT_DESC
					)
					->caller( __METHOD__ )->fetchRow();
				if ( !$row && !$invertedDirection ) {
					$gte = !$gte;
					$invertedDirection = true;
					continue; // try the other direction
				}
			} else { // table *may* have >= MAX_OFFSET rows
				// T44614: "ORDER BY job_random" with a job_random inequality causes high CPU
				// in MySQL if there are many rows for some reason. This uses a small OFFSET
				// instead of job_random for reducing excess claim retries.
				$row = $dbw->newSelectQueryBuilder()
					->select( self::selectFields() )
					->from( 'job' )
					->where(
						[
							'job_cmd' => $this->type,
							'job_token' => '', // unclaimed
						]
					)
					->offset( mt_rand( 0, self::MAX_OFFSET ) )
					->caller( __METHOD__ )->fetchRow();
				if ( !$row ) {
					$tinyQueue = true; // we know the queue must have <= MAX_OFFSET rows
					$this->wanCache->set( $this->getCacheKey( 'small' ), 1, 30 );
					continue; // use job_random
				}
			}

			if ( !$row ) {
				break;
			}

			$dbw->newUpdateQueryBuilder()
				->update( 'job' ) // update by PK
				->set( [
					'job_token' => $uuid,
					'job_token_timestamp' => $dbw->timestamp(),
					'job_attempts' => new RawSQLValue( 'job_attempts+1' ),
				] )
				->where( [
					'job_cmd' => $this->type,
					'job_id' => $row->job_id,
					'job_token' => ''
				] )
				->caller( __METHOD__ )->execute();

			// This might get raced out by another runner when claiming the previously
			// selected row. The use of job_random should minimize this problem, however.
			if ( !$dbw->affectedRows() ) {
				$row = false; // raced out
			}
		} while ( !$row );

		return $row;
	}

	/**
	 * Reserve a row with a single UPDATE without holding row locks over RTTs...
	 */
	protected function claimOldest( $uuid ) {
		$dbw = $this->getPrimaryDB();

		$row = false; // the row acquired
		do {
			if ( $dbw->getType() === 'mysql' ) {
				// Per https://bugs.mysql.com/bug.php?id=6980, we can't use subqueries on the
				// same table being changed in an UPDATE query in MySQL (gives Error: 1093).
				// Postgres has no such limitation. However, MySQL offers an
				// alternative here by supporting ORDER BY + LIMIT for UPDATE queries.
				$dbw->query( "UPDATE {$dbw->tableName( 'job' )} " .
					"SET " .
						"job_token = {$dbw->addQuotes( $uuid )}, " .
						"job_token_timestamp = {$dbw->addQuotes( $dbw->timestamp() )}, " .
						"job_attempts = job_attempts+1 " .
					"WHERE ( " .
						"job_cmd = {$dbw->addQuotes( $this->type )} " .
						"AND job_token = {$dbw->addQuotes( '' )} " .
					") ORDER BY job_id ASC LIMIT 1",
					__METHOD__
				);
			} else {
				// Use a subquery to find the job, within an UPDATE to claim it.
				// This uses as much of the DB wrapper functions as possible.
				$qb = $dbw->newSelectQueryBuilder()
					->select( 'job_id' )
					->from( 'job' )
					->where( [ 'job_cmd' => $this->type, 'job_token' => '' ] )
					->orderBy( 'job_id', SelectQueryBuilder::SORT_ASC )
					->limit( 1 );

				$dbw->newUpdateQueryBuilder()
					->update( 'job' )
					->set( [
						'job_token' => $uuid,
						'job_token_timestamp' => $dbw->timestamp(),
						'job_attempts' => new RawSQLValue( 'job_attempts+1' ),
					] )
					->where( [ 'job_id' => new RawSQLValue( '(' . $qb->getSQL() . ')' ) ] )
					->caller( __METHOD__ )->execute();
			}

			if ( !$dbw->affectedRows() ) {
				break;
			}

			// Fetch any row that we just reserved...
			$row = $dbw->newSelectQueryBuilder()
				->select( self::selectFields() )
				->from( 'job' )
				->where( [ 'job_cmd' => $this->type, 'job_token' => $uuid ] )
				->caller( __METHOD__ )->fetchRow();
			if ( !$row ) { // raced out by duplicate job removal
				wfDebug( "Row deleted as duplicate by another process." );
			}
		} while ( !$row );

		return $row;
	}

	protected function doAck( RunnableJob $job ) {
		$id = $job->getMetadata( 'id' );
		if ( $id === null ) {
			throw new UnexpectedValueException( "Job of type '{$job->getType()}' has no ID." );
		}

		$dbw = $this->getPrimaryDB();
		try {
			// Delete a row with a single DELETE without holding row locks over RTTs...
			$dbw->newDeleteQueryBuilder()
				->deleteFrom( 'job' )
				->where( [ 'job_cmd' => $this->type, 'job_id' => $id ] )
				->caller( __METHOD__ )->execute();

			$this->incrStats( 'acks', $this->type );
		} catch ( DBError $e ) {
			throw $this->getDBException( $e );
		}
	}

	protected function doDeduplicateRootJob( IJobSpecification $job ) {
		// Callers should call JobQueueGroup::push() before this method so that if the
		// insert fails, the de-duplication registration will be aborted. Since the insert
		// is deferred till "transaction idle", do the same here, so that the ordering is
		// maintained. Having only the de-duplication registration succeed would cause
		// jobs to become no-ops without any actual jobs that made them redundant.
		$dbw = $this->getPrimaryDB();
		$dbw->onTransactionCommitOrIdle(
			function () use ( $job ) {
				parent::doDeduplicateRootJob( $job );
			},
			__METHOD__
		);

		return true;
	}
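	// Example (illustrative sketch): root job de-duplication is registered after the push,
	// so a failed insert cannot leave later jobs marked as redundant without a real root job.
	//
	//     $group = MediaWikiServices::getInstance()->getJobQueueGroup();
	//     $job = new JobSpecification( 'htmlCacheUpdate',
	//         [ 'title' => 'Example' ] + Job::newRootJobParams( 'htmlCacheUpdate:Example' ) );
	//     $group->push( $job );               // enqueue first (deferred to pre-commit)
	//     $group->deduplicateRootJob( $job ); // then register the root-job signature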

	protected function doDelete() {
		$dbw = $this->getPrimaryDB();
		try {
			$dbw->newDeleteQueryBuilder()
				->deleteFrom( 'job' )
				->where( [ 'job_cmd' => $this->type ] )
				->caller( __METHOD__ )->execute();
		} catch ( DBError $e ) {
			throw $this->getDBException( $e );
		}

		return true;
	}

	protected function doWaitForBackups() {
		if ( $this->server ) {
			return; // not using LBFactory instance
		}

		$lbFactory = MediaWikiServices::getInstance()->getDBLoadBalancerFactory();
		$lbFactory->waitForReplication();
	}

	protected function doFlushCaches() {
		foreach ( [ 'size', 'acquiredcount' ] as $type ) {
			$this->wanCache->delete( $this->getCacheKey( $type ) );
		}
	}

	public function getAllQueuedJobs() {
		return $this->getJobIterator( [ 'job_cmd' => $this->getType(), 'job_token' => '' ] );
	}

	public function getAllAcquiredJobs() {
		$dbr = $this->getReplicaDB();
		return $this->getJobIterator( [ 'job_cmd' => $this->getType(), $dbr->expr( 'job_token', '>', '' ) ] );
	}

	public function getAllAbandonedJobs() {
		$dbr = $this->getReplicaDB();
		return $this->getJobIterator( [
			'job_cmd' => $this->getType(),
			$dbr->expr( 'job_token', '>', '' ),
			$dbr->expr( 'job_attempts', '>=', intval( $this->maxTries ) ),
		] );
	}

	protected function getJobIterator( array $conds ) {
		$dbr = $this->getReplicaDB();
		$qb = $dbr->newSelectQueryBuilder()
			->select( self::selectFields() )
			->from( 'job' )
			->where( $conds );
		try {
			return new MappedIterator(
				$qb->caller( __METHOD__ )->fetchResultSet(),
				function ( $row ) {
					return $this->jobFromRow( $row );
				}
			);
		} catch ( DBError $e ) {
			throw $this->getDBException( $e );
		}
	}

	/**
	 * Do not use this function outside of JobQueue/JobQueueGroup.
	 */
	public function getCoalesceLocationInternal() {
		if ( $this->server ) {
			return null; // not using the LBFactory instance
		}

		return is_string( $this->cluster )
			? "DBCluster:{$this->cluster}:{$this->domain}"
			: "LBFactory:{$this->domain}";
	}

	protected function doGetSiblingQueuesWithJobs( array $types ) {
		$dbr = $this->getReplicaDB();
		// @note: this does not check whether the jobs are claimed or not.
		// This is useful so JobQueueGroup::pop() also sees queues that only
		// have stale jobs. This lets recycleAndDeleteStaleJobs() re-enqueue
		// failed jobs so that they can be popped again for that edge case.
		$res = $dbr->newSelectQueryBuilder()
			->select( 'job_cmd' )
			->distinct()
			->from( 'job' )
			->where( [ 'job_cmd' => $types ] )
			->caller( __METHOD__ )->fetchResultSet();

		$types = [];
		foreach ( $res as $row ) {
			$types[] = $row->job_cmd;
		}

		return $types;
	}

	protected function doGetSiblingQueueSizes( array $types ) {
		$dbr = $this->getReplicaDB();

		$res = $dbr->newSelectQueryBuilder()
			->select( [ 'job_cmd', 'count' => 'COUNT(*)' ] )
			->from( 'job' )
			->where( [ 'job_cmd' => $types ] )
			->groupBy( 'job_cmd' )
			->caller( __METHOD__ )->fetchResultSet();

		$sizes = [];
		foreach ( $res as $row ) {
			$sizes[$row->job_cmd] = (int)$row->count;
		}

		return $sizes;
	}

	/**
	 * Recycle or destroy any jobs that have been claimed for too long.
	 *
	 * @return int Number of jobs recycled/deleted
	 */
	public function recycleAndDeleteStaleJobs() {
		$now = time();
		$count = 0; // affected rows
		$dbw = $this->getPrimaryDB();

		try {
			if ( !$dbw->lock( "jobqueue-recycle-{$this->type}", __METHOD__, 1 ) ) {
				return $count; // already in progress
			}

			// Remove claims on jobs acquired for too long if enabled...
			if ( $this->claimTTL > 0 ) {
				$claimCutoff = $dbw->timestamp( $now - $this->claimTTL );
				// Get the IDs of jobs that have been claimed but not finished after too long.
				// These jobs can be recycled into the queue by expiring the claim. Selecting
				// the IDs first means that the UPDATE can be done by primary key (less deadlocks).
				$res = $dbw->newSelectQueryBuilder()
					->select( 'job_id' )
					->from( 'job' )
					->where(
						[
							'job_cmd' => $this->type,
							$dbw->expr( 'job_token', '!=', '' ), // was acquired
							$dbw->expr( 'job_token_timestamp', '<', $claimCutoff ), // stale
							$dbw->expr( 'job_attempts', '<', $this->maxTries ), // retries left
						]
					)
					->caller( __METHOD__ )->fetchResultSet();
				$ids = array_map(
					static function ( $o ) {
						return $o->job_id;
					}, iterator_to_array( $res )
				);
				if ( count( $ids ) ) {
					// Reset job_token for these jobs so that other runners will pick them up.
					// Set the timestamp to the current time, as it is useful to know that the job
					// was already tried before (the timestamp becomes the "released" time).
					$dbw->newUpdateQueryBuilder()
						->update( 'job' )
						->set( [
							'job_token' => '',
							'job_token_timestamp' => $dbw->timestamp( $now ) // time of release
						] )
						->where( [
							'job_id' => $ids,
							$dbw->expr( 'job_token', '!=', '' ),
						] )
						->caller( __METHOD__ )->execute();

					$affected = $dbw->affectedRows();
					$count += $affected;
					$this->incrStats( 'recycles', $this->type, $affected );
				}
			}

			// Just destroy any stale jobs...
			$pruneCutoff = $dbw->timestamp( $now - self::MAX_AGE_PRUNE );
			$qb = $dbw->newSelectQueryBuilder()
				->select( 'job_id' )
				->from( 'job' )
				->where(
					[
						'job_cmd' => $this->type,
						$dbw->expr( 'job_token', '!=', '' ), // was acquired
						$dbw->expr( 'job_token_timestamp', '<', $pruneCutoff ) // stale
					]
				);
			if ( $this->claimTTL > 0 ) { // only prune jobs attempted too many times...
				$qb->andWhere( $dbw->expr( 'job_attempts', '>=', $this->maxTries ) );
			}
			// Get the IDs of jobs that are considered stale and should be removed. Selecting
			// the IDs first means that the UPDATE can be done by primary key (less deadlocks).
			$res = $qb->caller( __METHOD__ )->fetchResultSet();
			$ids = array_map(
				static function ( $o ) {
					return $o->job_id;
				}, iterator_to_array( $res )
			);
			if ( count( $ids ) ) {
				$dbw->newDeleteQueryBuilder()
					->deleteFrom( 'job' )
					->where( [ 'job_id' => $ids ] )
					->caller( __METHOD__ )->execute();
				$affected = $dbw->affectedRows();
				$count += $affected;
				$this->incrStats( 'abandons', $this->type, $affected );
			}

			$dbw->unlock( "jobqueue-recycle-{$this->type}", __METHOD__ );
		} catch ( DBError $e ) {
			throw $this->getDBException( $e );
		}

		return $count;
	}

	/**
	 * @param IJobSpecification $job
	 * @param IReadableDatabase $db
	 * @return array
	 */
	protected function insertFields( IJobSpecification $job, IReadableDatabase $db ) {
		return [
			// Fields that describe the nature of the job
			'job_cmd' => $job->getType(),
			'job_namespace' => $job->getParams()['namespace'] ?? NS_SPECIAL,
			'job_title' => $job->getParams()['title'] ?? '',
			'job_params' => self::makeBlob( $job->getParams() ),
			// Additional job metadata
			'job_timestamp' => $db->timestamp(),
			'job_sha1' => \Wikimedia\base_convert(
				sha1( serialize( $job->getDeduplicationInfo() ) ),
				16, 36, 31
			),
			'job_random' => mt_rand( 0, self::MAX_JOB_RANDOM )
		];
	}

	protected function getReplicaDB() {
		try {
			return $this->getDB( DB_REPLICA );
		} catch ( DBConnectionError $e ) {
			throw new JobQueueConnectionError( "DBConnectionError:" . $e->getMessage() );
		}
	}

	protected function getPrimaryDB() {
		try {
			return $this->getDB( DB_PRIMARY );
		} catch ( DBConnectionError $e ) {
			throw new JobQueueConnectionError( "DBConnectionError:" . $e->getMessage() );
		}
	}

	protected function getDB( $index ) {
		if ( $this->server ) {
			if ( $this->conn instanceof IDatabase ) {
				return $this->conn;
			} elseif ( $this->conn instanceof DBError ) {
				throw $this->conn;
			}

			try {
				$this->conn = MediaWikiServices::getInstance()->getDatabaseFactory()->create(
					$this->server['type'],
					$this->server
				);
			} catch ( DBError $e ) {
				$this->conn = $e;
				throw $e;
			}

			return $this->conn;
		} else {
			$lbFactory = MediaWikiServices::getInstance()->getDBLoadBalancerFactory();
			$lb = is_string( $this->cluster )
				? $lbFactory->getExternalLB( $this->cluster )
				: $lbFactory->getMainLB( $this->domain );

			if ( $lb->getServerType( ServerInfo::WRITER_INDEX ) !== 'sqlite' ) {
				// Keep a separate connection to avoid contention and deadlocks;
				// However, SQLite has the opposite behavior due to DB-level locking.
				$flags = $lb::CONN_TRX_AUTOCOMMIT;
			} else {
				// Jobs insertion will be deferred until the PRESEND stage to reduce contention.
				$flags = 0;
			}

			return $lb->getMaintenanceConnectionRef( $index, [], $this->domain, $flags );
		}
	}

	private function getCacheKey( $property ) {
		$cluster = is_string( $this->cluster ) ? $this->cluster : 'main';

		return $this->wanCache->makeGlobalKey(
			'jobqueue',
			$this->domain,
			$cluster,
			$this->type,
			$property
		);
	}

	protected static function makeBlob( $params ) {
		if ( $params !== false ) {
			return serialize( $params );
		} else {
			return '';
		}
	}

	/**
	 * @param stdClass $row
	 * @return RunnableJob
	 */
	protected function jobFromRow( $row ) {
		$params = ( (string)$row->job_params !== '' ) ? unserialize( $row->job_params ) : [];
		if ( !is_array( $params ) ) { // this shouldn't happen
			throw new UnexpectedValueException(
				"Could not unserialize job with ID '{$row->job_id}'." );
		}

		$params += [ 'namespace' => $row->job_namespace, 'title' => $row->job_title ];
		$job = $this->factoryJob( $row->job_cmd, $params );
		$job->setMetadata( 'id', $row->job_id );
		$job->setMetadata( 'timestamp', $row->job_timestamp );

		return $job;
	}

	protected function getDBException( DBError $e ) {
		return new JobQueueError( get_class( $e ) . ": " . $e->getMessage() );
	}

	/**
	 * Return the list of job fields that should be selected.
	 *
	 * @return string[]
	 */
	public static function selectFields() {
		return [
			'job_id',
			'job_cmd',
			'job_namespace',
			'job_title',
			'job_timestamp',
			'job_params',
			'job_random',
			'job_attempts',
			'job_token',
			'job_token_timestamp',
			'job_sha1',
		];
	}
}

class_alias( JobQueueDB::class, 'JobQueueDB' );