}
/**
* Stake a claim on actions.
*
* @param int $max_actions Maximum number of action to include in claim.
* @param \DateTime $before_date Jobs must be schedule before this date. Defaults to now.
* @param array $hooks Hooks to filter for.
* @param string $group Group to filter for.
*
* @return ActionScheduler_ActionClaim
*/
public function stake_claim( $max_actions = 10, \DateTime $before_date = null, $hooks = array(), $group = '' ) {
$claim_id = $this->generate_claim_id();
$this->claim_before_date = $before_date;
$this->claim_actions( $claim_id, $max_actions, $before_date, $hooks, $group );
$action_ids = $this->find_actions_by_claim_id( $claim_id );
$this->claim_before_date = null;
return new ActionScheduler_ActionClaim( $claim_id, $action_ids );
}
/**
* Generate a new action claim.
*
* @return int Claim ID.
*/
protected function generate_claim_id() {
/** @var \wpdb $wpdb */
global $wpdb;
$now = as_get_datetime_object();
$wpdb->insert( $wpdb->actionscheduler_claims, array( 'date_created_gmt' => $now->format( 'Y-m-d H:i:s' ) ) );
return $wpdb->insert_id;
}
/**
* Set a claim filter.
*
* @param string $filter_name Claim filter name.
* @param mixed $filter_values Values to filter.
* @return void
*/
public function set_claim_filter( $filter_name, $filter_values ) {
if ( isset( $this->claim_filters[ $filter_name ] ) ) {
$this->claim_filters[ $filter_name ] = $filter_values;
}
}
/**
* Get the claim filter value.
*
* @param string $filter_name Claim filter name.
* @return mixed
*/
public function get_claim_filter( $filter_name ) {
if ( isset( $this->claim_filters[ $filter_name ] ) ) {
return $this->claim_filters[ $filter_name ];
}
return '';
}
/**
* Mark actions claimed.
*
* @param string $claim_id Claim Id.
* @param int $limit Number of action to include in claim.
* @param \DateTime $before_date Should use UTC timezone.
* @param array $hooks Hooks to filter for.
* @param string $group Group to filter for.
*
* @return int The number of actions that were claimed.
* @throws \InvalidArgumentException Throws InvalidArgumentException if group doesn't exist.
* @throws \RuntimeException Throws RuntimeException if unable to claim action.
*/
protected function claim_actions( $claim_id, $limit, \DateTime $before_date = null, $hooks = array(), $group = '' ) {
/** @var \wpdb $wpdb */
global $wpdb;
$now = as_get_datetime_object();
$date = is_null( $before_date ) ? $now : clone $before_date;
// can't use $wpdb->update() because of the <= condition.
$update = "UPDATE {$wpdb->actionscheduler_actions} SET claim_id=%d, last_attempt_gmt=%s, last_attempt_local=%s";
$params = array(
$claim_id,
$now->format( 'Y-m-d H:i:s' ),
current_time( 'mysql' ),
);
// Set claim filters.
if ( ! empty( $hooks ) ) {
$this->set_claim_filter( 'hooks', $hooks );
} else {
$hooks = $this->get_claim_filter( 'hooks' );
}
if ( ! empty( $group ) ) {
$this->set_claim_filter( 'group', $group );
} else {
$group = $this->get_claim_filter( 'group' );
}
$where = 'WHERE claim_id = 0 AND scheduled_date_gmt <= %s AND status=%s';
$params[] = $date->format( 'Y-m-d H:i:s' );
$params[] = self::STATUS_PENDING;
if ( ! empty( $hooks ) ) {
$placeholders = array_fill( 0, count( $hooks ), '%s' );
$where .= ' AND hook IN (' . join( ', ', $placeholders ) . ')';
$params = array_merge( $params, array_values( $hooks ) );
}
$group_operator = 'IN';
if ( empty( $group ) ) {
$group = $this->get_claim_filter( 'exclude-groups' );
$group_operator = 'NOT IN';
}
if ( ! empty( $group ) ) {
$group_ids = $this->get_group_ids( $group, false );
// throw exception if no matching group(s) found, this matches ActionScheduler_wpPostStore's behaviour.
if ( empty( $group_ids ) ) {
throw new InvalidArgumentException(
sprintf(
/* translators: %s: group name(s) */
_n(
'The group "%s" does not exist.',
'The groups "%s" do not exist.',
is_array( $group ) ? count( $group ) : 1,
'action-scheduler'
),
$group
)
);
}
$id_list = implode( ',', array_map( 'intval', $group_ids ) );
$where .= " AND group_id {$group_operator} ( $id_list )";
}
/**
* Sets the order-by clause used in the action claim query.
*
* @since 3.4.0
*
* @param string $order_by_sql
*/
$order = apply_filters( 'action_scheduler_claim_actions_order_by', 'ORDER BY priority ASC, attempts ASC, scheduled_date_gmt ASC, action_id ASC' );
$params[] = $limit;
$sql = $wpdb->prepare( "{$update} {$where} {$order} LIMIT %d", $params ); // phpcs:ignore WordPress.DB.PreparedSQL.InterpolatedNotPrepared, WordPress.DB.PreparedSQLPlaceholders
$rows_affected = $wpdb->query( $sql ); // phpcs:ignore WordPress.DB.PreparedSQL.NotPrepared, WordPress.DB.DirectDatabaseQuery.DirectQuery, WordPress.DB.DirectDatabaseQuery.NoCaching
if ( false === $rows_affected ) {
$error = empty( $wpdb->last_error )
? _x( 'unknown', 'database error', 'action-scheduler' )
: $wpdb->last_error;
throw new \RuntimeException(
sprintf(
/* translators: %s database error. */
__( 'Unable to claim actions. Database error: %s.', 'action-scheduler' ),
$error
)
);
}
return (int) $rows_affected;
}
/**
* Get the number of active claims.
*
* @return int
*/
public function get_claim_count() {
global $wpdb;
$sql = "SELECT COUNT(DISTINCT claim_id) FROM {$wpdb->actionscheduler_actions} WHERE claim_id != 0 AND status IN ( %s, %s)";
$sql = $wpdb->prepare( $sql, array( self::STATUS_PENDING, self::STATUS_RUNNING ) ); // phpcs:ignore WordPress.DB.PreparedSQL.NotPrepared
return (int) $wpdb->get_var( $sql ); // phpcs:ignore WordPress.DB.PreparedSQL.NotPrepared
}
/**
* Return an action's claim ID, as stored in the claim_id column.
*
* @param string $action_id Action ID.
* @return mixed
*/
public function get_claim_id( $action_id ) {
/** @var \wpdb $wpdb */
global $wpdb;
$sql = "SELECT claim_id FROM {$wpdb->actionscheduler_actions} WHERE action_id=%d";
$sql = $wpdb->prepare( $sql, $action_id ); // phpcs:ignore WordPress.DB.PreparedSQL.NotPrepared
return (int) $wpdb->get_var( $sql ); // phpcs:ignore WordPress.DB.PreparedSQL.NotPrepared
}
/**
* Retrieve the action IDs of action in a claim.
*
* @param int $claim_id Claim ID.
* @return int[]
*/
public function find_actions_by_claim_id( $claim_id ) {
/** @var \wpdb $wpdb */
global $wpdb;
$action_ids = array();
$before_date = isset( $this->claim_before_date ) ? $this->claim_before_date : as_get_datetime_object();
$cut_off = $before_date->format( 'Y-m-d H:i:s' );
$sql = $wpdb->prepare(
"SELECT action_id, scheduled_date_gmt FROM {$wpdb->actionscheduler_actions} WHERE claim_id = %d ORDER BY priority ASC, attempts ASC, scheduled_date_gmt ASC, action_id ASC",
$claim_id
);
// Verify that the scheduled date for each action is within the expected bounds (in some unusual
// cases, we cannot depend on MySQL to honor all of the WHERE conditions we specify).
foreach ( $wpdb->get_results( $sql ) as $claimed_action ) { // phpcs:ignore WordPress.DB.PreparedSQL.NotPrepared
if ( $claimed_action->scheduled_date_gmt <= $cut_off ) {
$action_ids[] = absint( $claimed_action->action_id );
}
}
return $action_ids;
}
/**
* Release actions from a claim and delete the claim.
*
* @param ActionScheduler_ActionClaim $claim Claim object.
* @throws \RuntimeException When unable to release actions from claim.
*/
public function release_claim( ActionScheduler_ActionClaim $claim ) {
/** @var \wpdb $wpdb */
global $wpdb;
/**
* Deadlock warning: This function modifies actions to release them from claims that have been processed. Earlier, we used to it in a atomic query, i.e. we would update all actions belonging to a particular claim_id with claim_id = 0.
* While this was functionally correct, it would cause deadlock, since this update query will hold a lock on the claim_id_.. index on the action table.
* This allowed the possibility of a race condition, where the claimer query is also running at the same time, then the claimer query will also try to acquire a lock on the claim_id_.. index, and in this case if claim release query has already progressed to the point of acquiring the lock, but have not updated yet, it would cause a deadlock.
*
* We resolve this by getting all the actions_id that we want to release claim from in a separate query, and then releasing the claim on each of them. This way, our lock is acquired on the action_id index instead of the claim_id index. Note that the lock on claim_id will still be acquired, but it will only when we actually make the update, rather than when we select the actions.
*/
$action_ids = $wpdb->get_col( $wpdb->prepare( "SELECT action_id FROM {$wpdb->actionscheduler_actions} WHERE claim_id = %d", $claim->get_id() ) );
$row_updates = 0;
if ( count( $action_ids ) > 0 ) {
$action_id_string = implode( ',', array_map( 'absint', $action_ids ) );
$row_updates = $wpdb->query( "UPDATE {$wpdb->actionscheduler_actions} SET claim_id = 0 WHERE action_id IN ({$action_id_string})" ); // phpcs:ignore WordPress.DB.PreparedSQL.InterpolatedNotPrepared
}
$wpdb->delete( $wpdb->actionscheduler_claims, array( 'claim_id' => $claim->get_id() ), array( '%d' ) );
if ( $row_updates < count( $action_ids ) ) {
throw new RuntimeException(
sprintf(
// translators: %d is an id.
__( 'Unable to release actions from claim id %d.', 'action-scheduler' ),
$claim->get_id()
)
);
}
}
/**
* Remove the claim from an action.
*
* @param int $action_id Action ID.
*
* @return void
*/
public function unclaim_action( $action_id ) {
/** @var \wpdb $wpdb */
global $wpdb;
$wpdb->update(
$wpdb->actionscheduler_actions,
array( 'claim_id' => 0 ),
array( 'action_id' => $action_id ),
array( '%s' ),
array( '%d' )
);
}
/**
* Mark an action as failed.
*
* @param int $action_id Action ID.
* @throws \InvalidArgumentException Throw an exception if action was not updated.
*/
public function mark_failure( $action_id ) {
/** @var \wpdb $wpdb */
global $wpdb;
$updated = $wpdb->update(
$wpdb->actionscheduler_actions,
array( 'status' => self::STATUS_FAILED ),
array( 'action_id' => $action_id ),
array( '%s' ),
array( '%d' )
);
if ( empty( $updated ) ) {
/* translators: %s is the action ID */
throw new \InvalidArgumentException( sprintf( __( 'Unidentified action %s: we were unable to mark this action as having failed. It may may have been deleted by another process.', 'action-scheduler' ), $action_id ) );
}
}
/**
* Add execution message to action log.
*
* @throws Exception If the action status cannot be updated to self::STATUS_RUNNING ('in-progress').
*
* @param int $action_id Action ID.
*
* @return void
*/
public function log_execution( $action_id ) {
/** @var \wpdb $wpdb */
global $wpdb;
$sql = "UPDATE {$wpdb->actionscheduler_actions} SET attempts = attempts+1, status=%s, last_attempt_gmt = %s, last_attempt_local = %s WHERE action_id = %d";
$sql = $wpdb->prepare( $sql, self::STATUS_RUNNING, current_time( 'mysql', true ), current_time( 'mysql' ), $action_id ); // phpcs:ignore WordPress.DB.PreparedSQL.NotPrepared
// phpcs:ignore WordPress.DB.PreparedSQL.NotPrepared
$status_updated = $wpdb->query( $sql );
if ( ! $status_updated ) {
throw new Exception(
sprintf(
/* translators: 1: action ID. 2: status slug. */
__( 'Unable to update the status of action %1$d to %2$s.', 'action-scheduler' ),
$action_id,
self::STATUS_RUNNING
)
);
}
}
/**
* Mark an action as complete.
*
* @param int $action_id Action ID.
*
* @return void
* @throws \InvalidArgumentException Throw an exception if action was not updated.
*/
public function mark_complete( $action_id ) {
/** @var \wpdb $wpdb */
global $wpdb;
$updated = $wpdb->update(
$wpdb->actionscheduler_actions,
array(
'status' => self::STATUS_COMPLETE,
'last_attempt_gmt' => current_time( 'mysql', true ),
'last_attempt_local' => current_time( 'mysql' ),
),
array( 'action_id' => $action_id ),
array( '%s' ),
array( '%d' )
);
if ( empty( $updated ) ) {
/* translators: %s is the action ID */
throw new \InvalidArgumentException( sprintf( __( 'Unidentified action %s: we were unable to mark this action as having completed. It may may have been deleted by another process.', 'action-scheduler' ), $action_id ) );
}
/**
* Fires after a scheduled action has been completed.
*
* @since 3.4.2
*
* @param int $action_id Action ID.
*/
do_action( 'action_scheduler_completed_action', $action_id );
}
/**
* Get an action's status.
*
* @param int $action_id Action ID.
*
* @return string
* @throws \InvalidArgumentException Throw an exception if not status was found for action_id.
* @throws \RuntimeException Throw an exception if action status could not be retrieved.
*/
public function get_status( $action_id ) {
/** @var \wpdb $wpdb */
global $wpdb;
$sql = "SELECT status FROM {$wpdb->actionscheduler_actions} WHERE action_id=%d";
$sql = $wpdb->prepare( $sql, $action_id ); // phpcs:ignore WordPress.DB.PreparedSQL.NotPrepared
$status = $wpdb->get_var( $sql ); // phpcs:ignore WordPress.DB.PreparedSQL.NotPrepared
if ( null === $status ) {
throw new \InvalidArgumentException( __( 'Invalid action ID. No status found.', 'action-scheduler' ) );
} elseif ( empty( $status ) ) {
throw new \RuntimeException( __( 'Unknown status found for action.', 'action-scheduler' ) );
} else {
return $status;
}
}
}