use UnexpectedValueException;
use Wikimedia\ScopedCallback;
/* seconds to cache info without re-validating */
private const CACHE_TTL_SHORT = 30;
/* seconds a job can live once claimed */
private const MAX_AGE_PRUNE = 7 * 24 * 3600;
private const MAX_JOB_RANDOM = 2_147_483_647;
/* maximum number of rows to skip */
private const MAX_OFFSET = 255;
parent::__construct( $params );

if ( isset( $params['server'] ) ) {
	$this->server = $params['server'];
	// Always use autocommit mode, even if DBO_TRX is configured
	$this->server['flags'] ??= 0;
	$this->server['flags'] &= ~( IDatabase::DBO_TRX | IDatabase::DBO_DEFAULT );
} elseif ( isset( $params['cluster'] ) && is_string( $params['cluster'] ) ) {
	$this->cluster = $params['cluster'];
}
return [ 'random', 'timestamp', 'fifo' ];
$found = (bool)$dbr->newSelectQueryBuilder()
	->from( 'job' )
	->where( [ 'job_cmd' => $this->type, 'job_token' => '' ] )
	->caller( __METHOD__ )->fetchField();
$key = $this->getCacheKey( 'size' );

$size = $this->wanCache->get( $key );
if ( is_int( $size ) ) {
	return $size;
}

$size = $dbr->newSelectQueryBuilder()
	->from( 'job' )
	->where( [ 'job_cmd' => $this->type, 'job_token' => '' ] )
	->caller( __METHOD__ )->fetchRowCount();

$this->wanCache->set( $key, $size, self::CACHE_TTL_SHORT );
if ( $this->claimTTL <= 0 ) {
	return 0; // no acknowledgements
}

$key = $this->getCacheKey( 'acquiredcount' );

$count = $this->wanCache->get( $key );
if ( is_int( $count ) ) {
	return $count;
}

$count = $dbr->newSelectQueryBuilder()
	->from( 'job' )
	->where( [
		'job_cmd' => $this->type,
		$dbr->expr( 'job_token', '!=', '' ),
	] )
	->caller( __METHOD__ )->fetchRowCount();

$this->wanCache->set( $key, $count, self::CACHE_TTL_SHORT );
if ( $this->claimTTL <= 0 ) {
	return 0; // no acknowledgements
}

$key = $this->getCacheKey( 'abandonedcount' );

$count = $this->wanCache->get( $key );
if ( is_int( $count ) ) {
	return $count;
}

$count = $dbr->newSelectQueryBuilder()
	->from( 'job' )
	->where( [
		'job_cmd' => $this->type,
		$dbr->expr( 'job_token', '!=', '' ),
		$dbr->expr( 'job_attempts', '>=', $this->maxTries ),
	] )
	->caller( __METHOD__ )->fetchRowCount();

$this->wanCache->set( $key, $count, self::CACHE_TTL_SHORT );
// Silence expectations related to getting a primary DB, as we have to get
// a primary DB to insert the job.
$scope = $transactionProfiler->silenceForScope();
ScopedCallback::consume( $scope );

// In general, there will be two cases here:
// a) sqlite; DB connection is probably a regular round-aware handle.
//    If the connection is busy with a transaction, then defer the job writes
//    until right before the main round commit step. Any errors that bubble
//    up will rollback the main commit round.
// b) mysql/postgres; DB connection is generally a separate CONN_TRX_AUTOCOMMIT handle.
//    No transaction is active nor will be started by writes, so enqueue the jobs
//    now so that any errors will show up immediately as the interface expects. Any
//    errors that bubble up will rollback the main commit round.
$dbw->onTransactionPreCommitOrIdle(
$rowSet = []; // (sha1 => job) map for jobs that are de-duplicated
$rowList = []; // list of rows for jobs that are not de-duplicated
foreach ( $jobs as $job ) {
	$row = $this->insertFields( $job, $dbw );
	if ( $job->ignoreDuplicates() ) {
		$rowSet[$row['job_sha1']] = $row;
	} else {
		$rowList[] = $row;
	}
}
if ( $flags & self::QOS_ATOMIC ) {
	$dbw->startAtomic( $method ); // wrap all the job additions in one transaction
}
// Strip out any duplicate jobs that are already in the queue...
if ( count( $rowSet ) ) {
	$res = $dbw->newSelectQueryBuilder()
		->select( 'job_sha1' )
		->from( 'job' )
		->where( [
			// No job_type condition since it's part of the job_sha1 hash
			'job_sha1' => array_map( 'strval', array_keys( $rowSet ) ),
			'job_token' => '' // unclaimed
		] )
		->caller( $method )->fetchResultSet();
	foreach ( $res as $row ) {
		wfDebug( "Job with hash '{$row->job_sha1}' is a duplicate." );
		unset( $rowSet[$row->job_sha1] ); // already enqueued
	}
}
// Build the full list of job rows to insert
$rows = array_merge( $rowList, array_values( $rowSet ) );
// Silence expectations related to inserting to the job table, because we have
// to perform the inserts to ...
$scope = $transactionProfiler->silenceForScope();
// Insert the job rows in chunks to avoid replica DB lag...
foreach ( array_chunk( $rows, 50 ) as $rowBatch ) {
	$dbw->newInsertQueryBuilder()
		->insertInto( 'job' )
		->rows( $rowBatch )
		->caller( $method )->execute();
}
ScopedCallback::consume( $scope );
$this->incrStats( 'inserts', $this->type, count( $rows ) );
$this->incrStats( 'dupe_inserts', $this->type,
	count( $rowSet ) + count( $rowList ) - count( $rows )
);
if ( $flags & self::QOS_ATOMIC ) {
	$dbw->endAtomic( $method );
}
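The batch push above hinges on keying de-duplicated jobs by job_sha1 before anything is written. A minimal stand-alone sketch of that partitioning step, assuming plain array-shaped jobs rather than the IJobSpecification objects the listing works with (the helper name and array keys are illustrative):

// Illustrative helper, not part of the class: split candidate jobs into a
// de-duplicated set keyed by a SHA-1 of their dedup info, plus a plain list.
function partitionJobRows( array $jobs ): array {
	$rowSet = [];  // sha1 => row, for jobs that collapse duplicates
	$rowList = []; // rows for jobs that must all be enqueued
	foreach ( $jobs as $job ) {
		$row = [
			'job_cmd'    => $job['type'],
			'job_params' => serialize( $job['params'] ),
			// Hash only the fields that define "sameness" for the job
			'job_sha1'   => sha1( serialize( $job['dedupInfo'] ) ),
		];
		if ( $job['ignoreDuplicates'] ) {
			$rowSet[ $row['job_sha1'] ] = $row; // later duplicates silently collapse
		} else {
			$rowList[] = $row;
		}
	}
	// The listing additionally unset()s any sha1 that is already enqueued and
	// unclaimed before merging, so re-pushing an identical pending job is a no-op.
	return array_merge( $rowList, array_values( $rowSet ) );
}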
$job = false; // job popped off
do { // retry when our row is invalid or deleted as a duplicate
	// Try to reserve a row in the DB...
	if ( in_array( $this->order, [ 'fifo', 'timestamp' ] ) ) {
		$row = $this->claimOldest( $uuid );
	} else { // random first
		$rand = mt_rand( 0, self::MAX_JOB_RANDOM ); // encourage concurrent UPDATEs
		$gte = (bool)mt_rand( 0, 1 ); // find rows with rand before/after $rand
		$row = $this->claimRandom( $uuid, $rand, $gte );
	}
	// Check if we found a row to reserve...
	if ( !$row ) {
		break; // nothing to do
	}
	// Get the job object from the row...

if ( !$job || mt_rand( 0, 9 ) == 0 ) {
	// Handled jobs that need to be recycled/deleted;
	// any recycled jobs will be picked up next attempt
	$this->recycleAndDeleteStaleJobs();
}

// Check cache to see if the queue has <= OFFSET items
$tinyQueue = $this->wanCache->get( $this->getCacheKey( 'small' ) );
$invertedDirection = false; // whether one job_random direction was already scanned
// This uses a replication safe method for acquiring jobs. One could use UPDATE+LIMIT
// instead, but that either uses ORDER BY (in which case it deadlocks in MySQL) or is
// not replication safe. Due to https://bugs.mysql.com/bug.php?id=6980, subqueries cannot
// be used here with MySQL.
if ( $tinyQueue ) { // queue has <= MAX_OFFSET rows
	// For small queues, using OFFSET will overshoot and return no rows more often.
	// Instead, this uses job_random to pick a row (possibly checking both directions).
	$row = $dbw->newSelectQueryBuilder()
		->select( self::selectFields() )
		->from( 'job' )
		->where( [
			'job_cmd' => $this->type,
			'job_token' => '', // unclaimed
			$dbw->expr( 'job_random', $gte ? '>=' : '<=', $rand )
		] )
		->orderBy(
			'job_random',
			$gte ? SelectQueryBuilder::SORT_ASC : SelectQueryBuilder::SORT_DESC
		)
		->caller( __METHOD__ )->fetchRow();
	if ( !$row && !$invertedDirection ) {
		$gte = !$gte;
		$invertedDirection = true;
		continue; // try the other direction
	}
} else { // table *may* have >= MAX_OFFSET rows
	// T44614: "ORDER BY job_random" with a job_random inequality causes high CPU
	// in MySQL if there are many rows for some reason. This uses a small OFFSET
	// instead of job_random for reducing excess claim retries.
	$row = $dbw->newSelectQueryBuilder()
		->select( self::selectFields() )
		->from( 'job' )
		->where( [
			'job_cmd' => $this->type,
			'job_token' => '', // unclaimed
		] )
		->offset( mt_rand( 0, self::MAX_OFFSET ) )
		->caller( __METHOD__ )->fetchRow();
	if ( !$row ) {
		$tinyQueue = true; // we know the queue must have <= MAX_OFFSET rows
		$this->wanCache->set( $this->getCacheKey( 'small' ), 1, 30 );
		continue; // use job_random
	}
}
// use job_random 426 $dbw->newUpdateQueryBuilder()
427 ->update(
'job' )
// update by PK 430'job_token_timestamp' => $dbw->timestamp(),
434'job_cmd' => $this->type,
435'job_id' => $row->job_id,
438 ->caller( __METHOD__ )->execute();
440// This might get raced out by another runner when claiming the previously 441// selected row. The use of job_random should minimize this problem, however. 442if ( !$dbw->affectedRows() ) {
443 $row =
false;
// raced out 459 $row =
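The pivot-and-invert selection that the job_random branch performs is easier to see outside SQL. A small pure-PHP sketch, assuming a map of job_random values to job IDs (pickByRandom() and its inputs are made up for illustration):

function pickByRandom( array $jobRandomToId, int $max ): ?int {
	$rand = mt_rand( 0, $max );   // random pivot, like the $rand above
	$gte = (bool)mt_rand( 0, 1 ); // scan >= or <= the pivot first
	for ( $tries = 0; $tries < 2; $tries++ ) {
		$candidates = array_filter(
			$jobRandomToId,
			static fn ( $r ) => $gte ? $r >= $rand : $r <= $rand,
			ARRAY_FILTER_USE_KEY
		);
		if ( $candidates ) {
			// Equivalent of ORDER BY job_random ASC/DESC LIMIT 1 in the SQL version
			$gte ? ksort( $candidates ) : krsort( $candidates );
			return reset( $candidates );
		}
		$gte = !$gte; // nothing on that side of the pivot: scan the other side once
	}
	return null; // queue is empty
}

With a non-empty map this never returns null, because inverting the direction guarantees the second pass sees every row on the other side of the pivot; different runners start from different pivots, which is what spreads their UPDATE targets apart.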
$row = false; // the row acquired

if ( $dbw->getType() === 'mysql' ) {
	// Per https://bugs.mysql.com/bug.php?id=6980, we can't use subqueries on the
	// same table being changed in an UPDATE query in MySQL (gives Error: 1093).
	// Postgres has no such limitation. However, MySQL offers an
	// alternative here by supporting ORDER BY + LIMIT for UPDATE queries.
	$dbw->query(
		"UPDATE {$dbw->tableName( 'job' )} " .
		"SET " .
			"job_token = {$dbw->addQuotes( $uuid )}, " .
			"job_token_timestamp = {$dbw->addQuotes( $dbw->timestamp() )}, " .
			"job_attempts = job_attempts+1 " .
		"WHERE ( " .
			"job_cmd = {$dbw->addQuotes( $this->type )} " .
			"AND job_token = {$dbw->addQuotes( '' )} " .
		") ORDER BY job_id ASC LIMIT 1",
		__METHOD__
	);
} else {
	// Use a subquery to find the job, within an UPDATE to claim it.
	// This uses as much of the DB wrapper functions as possible.
	$qb = $dbw->newSelectQueryBuilder()
		->select( 'job_id' )
		->from( 'job' )
		->where( [ 'job_cmd' => $this->type, 'job_token' => '' ] )
		->orderBy( 'job_id', SelectQueryBuilder::SORT_ASC )
		->limit( 1 );

	$dbw->newUpdateQueryBuilder()
		->update( 'job' )
		->set( [
			'job_token' => $uuid,
			'job_token_timestamp' => $dbw->timestamp(),
		] )
		->where( [ 'job_id' => new RawSQLValue( '(' . $qb->getSQL() . ')' ) ] )
		->caller( __METHOD__ )->execute();
}
if ( !$dbw->affectedRows() ) {
	return false;
}

// Fetch any row that we just reserved...
$row = $dbw->newSelectQueryBuilder()
	->select( self::selectFields() )
	->from( 'job' )
	->where( [ 'job_cmd' => $this->type, 'job_token' => $uuid ] )
	->caller( __METHOD__ )->fetchRow();
if ( !$row ) { // raced out by duplicate job removal
	wfDebug( "Row deleted as duplicate by another process." );
}
$id = $job->getMetadata( 'id' );
if ( $id === null ) {
	throw new UnexpectedValueException( "Job of type '{$job->getType()}' has no ID." );
}

// Delete a row with a single DELETE without holding row locks over RTTs...
$dbw->newDeleteQueryBuilder()
	->deleteFrom( 'job' )
	->where( [ 'job_cmd' => $this->type, 'job_id' => $id ] )
	->caller( __METHOD__ )->execute();
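A compact sketch of that ack path using PDO. The connection, the pre-existing job table, and the metadata array shape are assumptions for illustration; the class itself goes through its DB wrapper:

function ackJob( PDO $db, string $type, array $jobMeta ): void {
	$id = $jobMeta['id'] ?? null;
	if ( $id === null ) {
		throw new UnexpectedValueException( "Job of type '$type' has no ID." );
	}
	// One DELETE keyed by job_cmd + job_id: no SELECT ... FOR UPDATE first,
	// so no row lock is held across round trips to the database.
	$db->prepare( 'DELETE FROM job WHERE job_cmd = ? AND job_id = ?' )
		->execute( [ $type, $id ] );
}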
// Callers should call JobQueueGroup::push() before this method so that if the
// insert fails, the de-duplication registration will be aborted. Since the insert
// is deferred till "transaction idle", do the same here, so that the ordering is
// maintained. Having only the de-duplication registration succeed would cause
// jobs to become no-ops without any actual jobs that made them redundant.
$dbw->onTransactionCommitOrIdle(
	function () use ( $job ) {
		parent::doDeduplicateRootJob( $job );
	},
	__METHOD__
);
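A toy restatement of the ordering argument in that comment, with a plain callback array standing in for the deferred-update machinery: both steps run after commit in registration order, so a failed insert also skips the de-duplication marker.

$jobInserted = false;
$postCommit = [
	static function () use ( &$jobInserted ) {
		// push(): insert the job rows; a failure would throw here
		$jobInserted = true;
	},
	static function () use ( &$jobInserted ) {
		// doDeduplicateRootJob(): only reached once the insert callback completed,
		// so a marker can never exist without the job that made duplicates redundant
		assert( $jobInserted );
	},
];
foreach ( $postCommit as $callback ) {
	$callback(); // an exception here aborts the remaining callbacks
}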
$dbw->newDeleteQueryBuilder()
	->deleteFrom( 'job' )
	->where( [ 'job_cmd' => $this->type ] )
	->caller( __METHOD__ )->execute();
if ( $this->server ) {
	return; // not using LBFactory instance
}

$lbFactory->waitForReplication();
foreach ( [ 'size', 'acquiredcount' ] as $type ) {
	$this->wanCache->delete( $this->getCacheKey( $type ) );
}
$dbr->expr( 'job_token', '>', '' ),
$dbr->expr( 'job_attempts', '>=', intval( $this->maxTries ) ),

$qb = $dbr->newSelectQueryBuilder()
	->select( self::selectFields() )

$qb->caller( __METHOD__ )->fetchResultSet(),
if ( $this->server ) {
	return null; // not using the LBFactory instance
}

return is_string( $this->cluster )
	? "DBCluster:{$this->cluster}:{$this->domain}"
	: "LBFactory:{$this->domain}";
// @note: this does not check whether the jobs are claimed or not.
// This is useful so JobQueueGroup::pop() also sees queues that only
// have stale jobs. This lets recycleAndDeleteStaleJobs() re-enqueue
// failed jobs so that they can be popped again for that edge case.
$res = $dbr->newSelectQueryBuilder()
	->select( 'job_cmd' )
	->distinct()
	->from( 'job' )
	->where( [ 'job_cmd' => $types ] )
	->caller( __METHOD__ )->fetchResultSet();

foreach ( $res as $row ) {
	$types[] = $row->job_cmd;
}
$res = $dbr->newSelectQueryBuilder()
	->select( [ 'job_cmd', 'count' => 'COUNT(*)' ] )
	->from( 'job' )
	->where( [ 'job_cmd' => $types ] )
	->groupBy( 'job_cmd' )
	->caller( __METHOD__ )->fetchResultSet();

foreach ( $res as $row ) {
	$sizes[$row->job_cmd] = (int)$row->count;
}
$count = 0; // affected rows

if ( !$dbw->lock( "jobqueue-recycle-{$this->type}", __METHOD__, 1 ) ) {
	return $count; // already in progress
}

// Remove claims on jobs acquired for too long if enabled...
if ( $this->claimTTL > 0 ) {
	$claimCutoff = $dbw->timestamp( $now - $this->claimTTL );
	// Get the IDs of jobs that have been claimed but not finished after too long.
	// These jobs can be recycled into the queue by expiring the claim. Selecting
	// the IDs first means that the UPDATE can be done by primary key (less deadlocks).
	$res = $dbw->newSelectQueryBuilder()
		->select( 'job_id' )
		->from( 'job' )
		->where( [
			'job_cmd' => $this->type,
			$dbw->expr( 'job_token', '!=', '' ), // was acquired
			$dbw->expr( 'job_token_timestamp', '<', $claimCutoff ), // stale
			$dbw->expr( 'job_attempts', '<', $this->maxTries ), // retries left
		] )
		->caller( __METHOD__ )->fetchResultSet();
	$ids = array_map(
		static function ( $o ) {
			return $o->job_id;
		}, iterator_to_array( $res )
	);
	if ( count( $ids ) ) {
		// Reset job_token for these jobs so that other runners will pick them up.
		// Set the timestamp to the current time, as it is useful to know that the job
		// was already tried before (the timestamp becomes the "released" time).
		$dbw->newUpdateQueryBuilder()
			->update( 'job' )
			->set( [
				'job_token' => '',
				'job_token_timestamp' => $dbw->timestamp( $now ) // time of release
			] )
			->where( [
				'job_id' => $ids,
				$dbw->expr( 'job_token', '!=', '' ),
			] )
			->caller( __METHOD__ )->execute();

		$affected = $dbw->affectedRows();
		$count += $affected;
		$this->incrStats( 'recycles', $this->type, $affected );
	}
}
// Just destroy any stale jobs...
$pruneCutoff = $dbw->timestamp( $now - self::MAX_AGE_PRUNE );
$qb = $dbw->newSelectQueryBuilder()
	->select( 'job_id' )
	->from( 'job' )
	->where( [
		'job_cmd' => $this->type,
		$dbw->expr( 'job_token', '!=', '' ), // was acquired
		$dbw->expr( 'job_token_timestamp', '<', $pruneCutoff ) // stale
	] );
if ( $this->claimTTL > 0 ) { // only prune jobs attempted too many times...
	$qb->andWhere( $dbw->expr( 'job_attempts', '>=', $this->maxTries ) );
}
// Get the IDs of jobs that are considered stale and should be removed. Selecting
// the IDs first means that the DELETE can be done by primary key (less deadlocks).
$res = $qb->caller( __METHOD__ )->fetchResultSet();
$ids = array_map(
	static function ( $o ) {
		return $o->job_id;
	}, iterator_to_array( $res )
);
if ( count( $ids ) ) {
	$dbw->newDeleteQueryBuilder()
		->deleteFrom( 'job' )
		->where( [ 'job_id' => $ids ] )
		->caller( __METHOD__ )->execute();
	$affected = $dbw->affectedRows();
	$count += $affected;
	$this->incrStats( 'abandons', $this->type, $affected );
}

$dbw->unlock( "jobqueue-recycle-{$this->type}", __METHOD__ );
// Fields that describe the nature of the job
'job_cmd' => $job->getType(),
'job_title' => $job->getParams()['title'] ?? '',
// Additional job metadata
'job_sha1' => \Wikimedia\base_convert(
	sha1( serialize( $job->getDeduplicationInfo() ) ),
'job_random' => mt_rand( 0, self::MAX_JOB_RANDOM )
if ( $this->server ) {

} elseif ( $this->conn instanceof DBError ) {

$this->server['type'],

$lb = is_string( $this->cluster )
	? $lbFactory->getExternalLB( $this->cluster )
	: $lbFactory->getMainLB( $this->domain );

if ( $lb->getServerType( ServerInfo::WRITER_INDEX ) !== 'sqlite' ) {
	// Keep a separate connection to avoid contention and deadlocks;
	// However, SQLite has the opposite behavior due to DB-level locking.
	$flags = $lb::CONN_TRX_AUTOCOMMIT;
} else {
	// Jobs insertion will be deferred until the PRESEND stage to reduce contention.
	$flags = 0;
}

return $lb->getMaintenanceConnectionRef( $index, [], $this->domain, $flags );
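The flag decision above boils down to one branch. A stand-alone restatement, where CONN_TRX_AUTOCOMMIT is defined locally as a stand-in for the load balancer class constant used in the listing:

const CONN_TRX_AUTOCOMMIT = 1; // stand-in value for the LB class constant

function jobConnectionFlags( string $serverType ): int {
	if ( $serverType === 'sqlite' ) {
		// SQLite locks the whole database file, so a second writer handle only adds
		// contention; reuse the round-aware connection and defer writes instead.
		return 0;
	}
	// Elsewhere, claims and acks go through a separate autocommit handle so they
	// commit immediately and stay clear of the request's main transaction round.
	return CONN_TRX_AUTOCOMMIT;
}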
private function getCacheKey( $property ) {
	$cluster = is_string( $this->cluster ) ? $this->cluster : 'main';

	return $this->wanCache->makeGlobalKey(
if ( $params !== false ) {
	return serialize( $params );
}
$params = ( (string)$row->job_params !== '' ) ? unserialize( $row->job_params ) : [];
if ( !is_array( $params ) ) { // this shouldn't happen
	throw new UnexpectedValueException(
		"Could not unserialize job with ID '{$row->job_id}'." );
}

$params += [ 'namespace' => $row->job_namespace, 'title' => $row->job_title ];

$job->setMetadata( 'id', $row->job_id );
$job->setMetadata( 'timestamp', $row->job_timestamp );
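The params blob simply round-trips through serialize()/unserialize(); a pure-PHP sketch of that contract with hypothetical helper names:

function paramsToBlob( $params ): string {
	return $params !== false ? serialize( $params ) : '';
}

function blobToParams( string $blob, int $jobId ): array {
	$params = ( $blob !== '' ) ? unserialize( $blob ) : [];
	if ( !is_array( $params ) ) { // this shouldn't happen
		throw new UnexpectedValueException( "Could not unserialize job with ID '$jobId'." );
	}
	return $params;
}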
return new JobQueueError( get_class( $e ) . ": " . $e->getMessage() );
'job_token_timestamp',
class_alias( JobQueueDB::class, 'JobQueueDB' );
wfDebug( $text, $dest = 'all', array $context = [] ): Sends a line to the debug log if enabled or, optionally, to a comment in output.
wfRandomString( $length = 32 ): Get a random string containing a number of pseudo-random hex characters.
Convenience class for generating iterators from iterators.
Database-backed job queue storage.
array|null $server: Server configuration array.
IMaintainableDatabase|DBError|null $conn
doGetSiblingQueueSizes( array $types )
getCoalesceLocationInternal(): Do not use this function outside of JobQueue/JobQueueGroup.
doGetSiblingQueuesWithJobs( array $types )
claimRandom( $uuid, $rand, $gte ): Reserve a row with a single UPDATE without holding row locks over RTTs...
recycleAndDeleteStaleJobs(): Recycle or destroy any jobs that have been claimed for too long.
optimalOrder(): Get the default queue order to use if configuration does not specify one.
string|null $cluster: Name of an external DB cluster or null for the local DB cluster.
static selectFields(): Return the list of job fields that should be selected.
doBatchPushInternal( IDatabase $dbw, array $jobs, $flags, $method ): This function should not be called outside of JobQueueDB.
supportedOrders(): Get the allowed queue orders for configuration validation.
__construct( array $params ): Additional parameters include:
static makeBlob( $params )
doBatchPush( array $jobs, $flags )
getDBException( DBError $e )
getJobIterator( array $conds )
insertFields( IJobSpecification $job, IReadableDatabase $db )
claimOldest( $uuid ): Reserve a row with a single UPDATE without holding row locks over RTTs...
doDeduplicateRootJob( IJobSpecification $job )
Base class for queueing and running background jobs from a storage backend.
incrStats( $event, $type, $delta = 1 ): Call StatsFactory::incrementBy() for the queue overall and for the queue type.
factoryJob( $command, $params )
Service locator for MediaWiki core services.
static getInstance(): Returns the global default instance of the top level service locator.
Profiler base class that defines the interface and some shared functionality.
Database error base class.
Raw SQL value to be used in query builders.
Build SELECT queries with a fluent interface.
Container for accessing information about the database servers in a database cluster.
Interface for serializable objects that describe a job queue task.
Job that has a run() method and metadata accessors for JobQueue::pop() and JobQueue::ack().
Interface to a relational database.
endAtomic( $fname = __METHOD__ ): Ends an atomic section of SQL statements.
startAtomic( $fname = __METHOD__, $cancelable = self::ATOMIC_NOT_CANCELABLE ): Begin an atomic section of SQL statements.
newInsertQueryBuilder(): Get an InsertQueryBuilder bound to this connection.
Advanced database interface for IDatabase handles that include maintenance methods.
A database connection without write operations.
newSelectQueryBuilder(): Create an empty SelectQueryBuilder which can be used to run queries against this connection.
timestamp( $ts = 0 ): Convert a timestamp in one of the formats accepted by ConvertibleTimestamp to the format used for ins...