Drupal: Using QueueWorkers to Sync Multiple Data Sources

Profiles on our site can get their data from two external sources: a directory connected via LDAP to Active Directory for contact information, and a course catalog connected to Banner for the courses each instructor teaches. We want to keep this data fresh, but we don't want to load it on demand because we cache page content for performance. With roughly 2,500 profiles across our sites, fetching all of this data from the external services' APIs in a single cron run would overwhelm them.

Fortunately, Drupal 8 added QueueWorker plugins, which let us batch the updates easily. Combined with a custom hook, they also ensure that each entity in the batch gets updates from every external source, including any we might add later for event listings or other data.

As a note, our Profile content type in Drupal contains two text fields that aren't shown on the rendered page. The first, field_external_id, contains a 32-character hexadecimal string that is a unique, persistent identifier for the individual. Everyone at Middlebury has one, which allows quick access to their records across platforms. The second, field_profile_sync, contains the timestamp of the last sync, which we use to avoid hammering the remote services for the same data over and over again.
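To make the expected shape of field_external_id concrete, here is a small plain-PHP check. It is a sketch, not code from our module: the function name is_external_id() is hypothetical, and accepting both upper- and lower-case hex digits is an assumption.

```php
<?php

/**
 * Hypothetical helper: returns TRUE when $value has the shape described for
 * field_external_id, i.e. exactly 32 hexadecimal characters.
 *
 * \A and \z anchor the whole string, so a trailing newline is rejected
 * (a plain $ would allow one). The /i flag accepts A-F as well as a-f.
 */
function is_external_id(string $value): bool {
  return preg_match('/\A[0-9a-f]{32}\z/i', $value) === 1;
}

var_dump(is_external_id(str_repeat('3f', 16)));                 // bool(true)
var_dump(is_external_id("0123456789abcdef0123456789abcdef\n")); // bool(false)
var_dump(is_external_id('not-an-id'));                          // bool(false)
```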

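/**
 * Implements hook_cron().
 *
 * Queues every profile that has an external ID and has not been synced in
 * the last 24 hours.
 */
function middlebury_profile_sync_cron() {
  $queueFactory = \Drupal::service('queue');
  $queue = $queueFactory->get('middlebury_profile_update');

  // Profiles last synced before this moment are considered stale.
  $now = new \DateTime();
  $ttl = new \DateInterval('P1D');
  $cutOff = $now->sub($ttl);

  $storage = \Drupal::service('entity_type.manager')->getStorage('node');
  $query = $storage->getQuery()
    // Cron runs as the anonymous user, so skip access checks to include
    // unpublished profiles. Drupal 10 also requires this call to be explicit.
    ->accessCheck(FALSE)
    ->condition('type', 'profile')
    ->exists('field_external_id.value')
    ->condition('field_profile_sync', $cutOff->getTimestamp(), '<');
  $nids = $query->execute();

  // Each queue item only carries the node ID; the worker loads the node.
  foreach ($nids as $nid) {
    $item = new \stdClass();
    $item->nid = $nid;
    $queue->createItem($item);
  }
}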

Our main module's hook_cron() gets the queue that our QueueWorker processes, fetches all of the profiles that have an external ID and have not been synced in the last 24 hours, and adds them to the queue. This just stores their node IDs in a database table. During each cron run, Drupal works through the queued items for a predetermined number of seconds and leaves whatever remains for the next run.
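The selection rules in that query can be modeled in plain PHP to check the cutoff arithmetic. This is a sketch: the array shape of $profiles and the function name select_stale_profiles() are hypothetical stand-ins for the entity query, and an SplQueue stands in for Drupal's database-backed queue.

```php
<?php

/**
 * Plain-PHP model of the selection in hook_cron(). The $profiles array shape
 * and this function are hypothetical stand-ins for the entity query.
 */
function select_stale_profiles(array $profiles, int $now, int $ttlSeconds): array {
  $cutOff = $now - $ttlSeconds;
  $nids = [];
  foreach ($profiles as $nid => $profile) {
    // Roughly mirrors ->exists('field_external_id.value').
    if (empty($profile['external_id'])) {
      continue;
    }
    // Mirrors ->condition('field_profile_sync', $cutOff, '<'); a missing
    // timestamp never matches, like NULL in a SQL '<' comparison.
    if (!isset($profile['last_sync']) || $profile['last_sync'] >= $cutOff) {
      continue;
    }
    $nids[] = $nid;
  }
  return $nids;
}

$now = 1700000000;
$day = 86400;
$profiles = [
  10 => ['external_id' => str_repeat('a', 32), 'last_sync' => $now - 2 * $day], // stale
  11 => ['external_id' => str_repeat('b', 32), 'last_sync' => $now - 3600],     // synced an hour ago
  12 => ['external_id' => '', 'last_sync' => $now - 5 * $day],                  // no external ID
  13 => ['external_id' => str_repeat('c', 32)],                                 // never synced
];

$queue = new SplQueue();
foreach (select_stale_profiles($profiles, $now, $day) as $nid) {
  // Same item shape as hook_cron(): an object with a nid property.
  $item = new stdClass();
  $item->nid = $nid;
  $queue->enqueue($item);
}
echo count($queue), "\n"; // 1
```

In this model, profile 13 is never selected because it has no sync timestamp, which is one reason the hook_node_presave() implementation below stamps field_profile_sync whenever an editor saves a profile.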

<?php

namespace Drupal\middlebury_profile_sync\Plugin\QueueWorker;

use Drupal\Core\Cache\CacheTagsInvalidatorInterface;
use Drupal\Core\Entity\EntityStorageInterface;
use Drupal\Core\Extension\ModuleHandlerInterface;
use Drupal\Core\Plugin\ContainerFactoryPluginInterface;
use Drupal\Core\Queue\QueueWorkerBase;
use Symfony\Component\DependencyInjection\ContainerInterface;

/**
 * Updates Profile nodes from their external data sources.
 *
 * @QueueWorker(
 *   id = "middlebury_profile_update",
 *   title = @Translation("Middlebury Profile Update"),
 *   cron = {"time" = 15}
 * )
 */
class ProfileUpdate extends QueueWorkerBase implements ContainerFactoryPluginInterface {

  /**
   * The node storage.
   *
   * @var \Drupal\Core\Entity\EntityStorageInterface
   */
  protected $nodeStorage;

  /**
   * Cache invalidator service.
   *
   * @var \Drupal\Core\Cache\CacheTagsInvalidatorInterface
   */
  protected $cacheInvalidator;

  /**
   * The module handler.
   *
   * @var \Drupal\Core\Extension\ModuleHandlerInterface
   */
  protected $moduleHandler;

  /**
   * Creates a new ProfileUpdate object.
   *
   * @param array $configuration
   *   A configuration array containing information about the plugin instance.
   * @param string $plugin_id
   *   The plugin ID for the plugin instance.
   * @param mixed $plugin_definition
   *   The plugin implementation definition.
   * @param \Drupal\Core\Entity\EntityStorageInterface $node_storage
   *   The node storage.
   * @param \Drupal\Core\Cache\CacheTagsInvalidatorInterface $cache_invalidator
   *   The cache invalidator service, used to mark cached pages as invalid
   *   when profiles are updated.
   * @param \Drupal\Core\Extension\ModuleHandlerInterface $module_handler
   *   The module handler.
   */
  public function __construct(array $configuration, $plugin_id, $plugin_definition, EntityStorageInterface $node_storage, CacheTagsInvalidatorInterface $cache_invalidator, ModuleHandlerInterface $module_handler) {
    parent::__construct($configuration, $plugin_id, $plugin_definition);
    $this->nodeStorage = $node_storage;
    $this->cacheInvalidator = $cache_invalidator;
    $this->moduleHandler = $module_handler;
  }

  /**
   * {@inheritdoc}
   */
  public static function create(ContainerInterface $container, array $configuration, $plugin_id, $plugin_definition) {
    return new static(
      $configuration,
      $plugin_id,
      $plugin_definition,
      $container->get('entity_type.manager')->getStorage('node'),
      $container->get('cache_tags.invalidator'),
      $container->get('module_handler')
    );
  }

  /**
   * {@inheritdoc}
   */
  public function processItem($data) {
    $node = $this->nodeStorage->load($data->nid);
    if (!$node) {
      // The profile was deleted after it was queued; nothing to do.
      return;
    }

    // Invalidate the node's cache tag so visitors see the latest profile data.
    $this->cacheInvalidator->invalidateTags(['node:' . $data->nid]);

    // Each hook implementation appends the fields it changed to $changes.
    $changes = [];
    $this->moduleHandler->invokeAll('midd_update_profile_external', [
      $node,
      &$changes,
    ]);

    // Saving creates a new revision, so only save when something changed.
    if (count($changes) > 0) {
      $node->field_profile_sync = time();
      // Tells hook_node_presave() not to query the external sources again.
      $node->isSavedByCron = TRUE;
      $node->save();
    }
  }

}

The cron = {"time" = 15} key in the @QueueWorker annotation tells cron not to spend more than 15 seconds per run processing this queue. Further down, in processItem(), the Profile node is loaded using the node ID stored in the queue item. We then invalidate its cache tag to ensure that visitors see the latest version of the profile data.
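Conceptually, cron keeps processing queue items until the time budget runs out and leaves the rest in place. The following is a simplified plain-PHP model of that idea, not Drupal core's implementation: drain_queue() and the fake clock are hypothetical, and the clock is injected only so the result is deterministic.

```php
<?php

/**
 * Simplified model of draining a queue under a cron time budget, such as the
 * 15 seconds set by cron = {"time" = 15}. Hypothetical function; not Drupal's
 * implementation. $clock returns the current time in seconds.
 */
function drain_queue(SplQueue $queue, callable $processItem, int $budgetSeconds, callable $clock): int {
  $end = $clock() + $budgetSeconds;
  $processed = 0;
  // Keep going while there is time left and work to do.
  while ($clock() < $end && !$queue->isEmpty()) {
    $processItem($queue->dequeue());
    $processed++;
  }
  // Anything still queued waits for the next cron run.
  return $processed;
}

// Fake clock so the example is deterministic.
$now = 0;
$clock = function () use (&$now): int {
  return $now;
};
// Pretend every external API round-trip takes one second.
$worker = function ($nid) use (&$now): void {
  $now += 1;
};

$queue = new SplQueue();
foreach (range(1, 40) as $nid) {
  $queue->enqueue($nid);
}

$done = drain_queue($queue, $worker, 15, $clock);
echo $done, ' processed, ', count($queue), " left\n"; // 15 processed, 25 left
```

With each item taking one simulated second, a 15-second budget handles 15 of the 40 queued profiles and leaves 25 for the next cron run.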

The $changes array tracks whether any of the external data sources returned changes. Since we need to save the node to store those changes, and each save creates a new revision row in the database, we only want to save when course lists or contact information have actually changed, so we don't bloat the database. The $changes array is therefore passed by reference through the hook so that every external source can add its changed fields to it.
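The by-reference pattern can be seen in isolation with plain closures. This is a hypothetical sketch rather than Drupal's invokeAll(): invoke_update_hooks() and the two listeners are illustrative stand-ins for the hook_midd_update_profile_external() implementations, and the changed field names are made up.

```php
<?php

/**
 * Plain-PHP model of a hook whose implementations share one $changes array
 * passed by reference. invoke_update_hooks() and the listener closures are
 * hypothetical stand-ins for invokeAll() and the
 * hook_midd_update_profile_external() implementations.
 */
function invoke_update_hooks(array $listeners, array $node, array &$changes): void {
  foreach ($listeners as $listener) {
    // $changes is a reference here and in each listener's signature, so every
    // implementation appends to the caller's array rather than a copy.
    $listener($node, $changes);
  }
}

$listeners = [
  // Directory source: pretend the email address changed.
  function (array $node, array &$changes): void {
    $changes[] = 'email';
  },
  // Course source: pretend nothing changed.
  function (array $node, array &$changes): void {
  },
];

$changes = [];
invoke_update_hooks($listeners, ['nid' => 10], $changes);

// Save (and create a revision) only when some source reported a change.
$shouldSave = count($changes) > 0;
echo implode(', ', $changes), ' -> ', $shouldSave ? 'save' : 'skip', "\n"; // email -> save
```

If either the helper or a listener dropped the & from its $changes parameter, that function would work on a copy and the caller would see an empty array, so the node would never be saved.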

If changes have been made, we update the timestamp recording when the profile was last synced, set the isSavedByCron flag to note that cron is saving it, and then save the node. That flag is important because it lets us distinguish this save from a normal save done by an editor in the Drupal UI.

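/**
 * Implements hook_node_presave().
 *
 * Pulls fresh external data whenever an editor saves a profile. Saves made by
 * the ProfileUpdate queue worker set isSavedByCron and are skipped, since that
 * data was just fetched.
 */
function middlebury_profile_sync_node_presave($node) {
  if (empty($node->isSavedByCron) && $node->getType() == 'profile' && !empty($node->field_external_id->value)) {
    // The hook requires a $changes array, but the result is not needed here:
    // the node is about to be saved anyway.
    $changes = [];
    $node->field_profile_sync = time();
    \Drupal::moduleHandler()->invokeAll('midd_update_profile_external', [
      $node,
      &$changes,
    ]);
  }
}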

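Because we know whether a save is being triggered by cron, we can safely invoke our custom hook from hook_node_presave() as well, so that a profile saved by an editor is refreshed immediately. Without that flag, every cron-driven save would trigger presave to query the external sources a second time for data we had just fetched.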
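The effect of the flag can be checked with a small plain-PHP model that counts external calls. Everything here is hypothetical: FakeNode, ExternalSource, presave() and process_item() stand in for the node, the external APIs, hook_node_presave() and processItem() followed by save().

```php
<?php

/**
 * Minimal model of the isSavedByCron guard. FakeNode, ExternalSource and the
 * two functions are hypothetical stand-ins for the node, the external APIs,
 * hook_node_presave() and processItem() + save().
 */
final class FakeNode {
  public bool $isSavedByCron = false;
  public ?int $profileSync = null;
}

final class ExternalSource {
  public int $calls = 0;

  public function fetch(FakeNode $node): array {
    $this->calls++;
    return ['email']; // pretend the directory reported a changed email
  }
}

// Stand-in for middlebury_profile_sync_node_presave().
function presave(FakeNode $node, ExternalSource $source): void {
  if (empty($node->isSavedByCron)) {
    $node->profileSync = time();
    $source->fetch($node);
  }
}

// Stand-in for ProfileUpdate::processItem(); save() would fire presave().
function process_item(FakeNode $node, ExternalSource $source): void {
  $changes = $source->fetch($node);
  if (count($changes) > 0) {
    $node->profileSync = time();
    $node->isSavedByCron = true;
    presave($node, $source);
  }
}

$source = new ExternalSource();

process_item(new FakeNode(), $source);
$afterCron = $source->calls;   // 1: presave skipped the second fetch

presave(new FakeNode(), $source);
$afterEditor = $source->calls; // 2: an editor save still refreshes the data
```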

/**
 * Implements hook_midd_update_profile_external().
 */
function middlebury_profile_sync_midd_update_profile_external($node, &$changes) {
  // Load every configured profile_source entity and sync the node from each.
  $sources = \Drupal::entityTypeManager()->getStorage('profile_source')->loadMultiple();
  foreach ($sources as $source) {
    $changed = middlebury_profile_sync_sync_profile($source, $node);
    foreach ($changed as $change) {
      $changes[] = $change;
    }
  }
}

/**
 * Implements hook_midd_update_profile_external().
 */
function middlebury_courselist_midd_update_profile_external($node, &$changes) {
  if ($node->getType() == 'profile' && $node->field_show_courses->value) {
    $changed = \Drupal::service('middlebury_courselist.profile_sync')->syncNode($node);
    foreach ($changed as $change) {
      $changes[] = $change;
    }
  }
}

The implementations of our custom hook in their respective modules are fairly straightforward. They load the connection information of the external API(s) and then call their local function or service to sync the data. Changes are safely merged into the $changes[] array so we can know whether a save action needs to be performed on the node.

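use Drupal\middlebury_profile_sync\ProfileInterface;
use Drupal\node\Entity\Node;

/**
 * Updates a profile node with data from an external source.
 *
 * Fields that an editor has overridden on the node are left untouched.
 *
 * @param \Drupal\middlebury_profile_sync\ProfileInterface $profile
 *   The Profile data.
 * @param \Drupal\node\Entity\Node $node
 *   The profile node.
 *
 * @return array
 *   The (translatable) names of the fields whose values changed.
 */
function middlebury_profile_sync_update_profile(ProfileInterface $profile, Node $node) {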
  $changed_fields = [];

  // Email.
  if (empty($node->field_override_email->value) && $profile->hasEmail()) {
    if ($node->field_email->value != $profile->getEmail()) {
      $changed_fields[] = t('email');
    }
    $node->field_email = $profile->getEmail();
  }

  // ... snip ...

  // Job title.
  if (empty($node->field_override_job_title->value) && $profile->hasJobTitle()) {
    if ($node->field_job_title->value != $profile->getJobTitle()) {
      $changed_fields[] = t('job_title');
    }
    $node->field_job_title = $profile->getJobTitle();
  }

  return $changed_fields;
}

The actual implementations of fetching and comparing the field data are specific to our environment, but you can see the pattern here: for each field returned by the external source, the function skips fields an editor has overridden, compares the new value to the data currently stored in the node, and notes a change if they differ.
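As more fields are added, the repeated compare-and-assign blocks could be driven by a field map instead. The following is a hypothetical plain-PHP sketch of that pattern, not our module's code: arrays stand in for the node and the external profile, update_profile_fields() is an illustrative name, and deriving each override field as field_override_<key> is an assumption generalized from field_override_email and field_override_job_title.

```php
<?php

/**
 * Table-driven version of the compare-and-assign pattern. Hypothetical:
 * $node and $profile are plain arrays, and the override field for each key
 * is assumed to be named 'field_override_' . $key.
 */
function update_profile_fields(array &$node, array $profile, array $fieldMap): array {
  $changed = [];
  foreach ($fieldMap as $key => $field) {
    $override = 'field_override_' . $key;
    // Skip fields an editor has overridden, or that the source did not return.
    if (!empty($node[$override]) || !isset($profile[$key])) {
      continue;
    }
    // Loose comparison, matching the != used in the original function.
    if (($node[$field] ?? null) != $profile[$key]) {
      $changed[] = $key;
    }
    $node[$field] = $profile[$key];
  }
  return $changed;
}

$fieldMap = ['email' => 'field_email', 'job_title' => 'field_job_title'];
$node = [
  'field_email' => 'old@example.org',
  'field_job_title' => 'Professor',
  'field_override_job_title' => true,
];
$profile = ['email' => 'new@example.org', 'job_title' => 'Dean'];

$changed = update_profile_fields($node, $profile, $fieldMap);
// $changed is ['email']; field_job_title stays 'Professor' because it is overridden.
```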