Notes on Artemis's maxDeliveryAttempts

maxDeliveryAttempts

activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/settings/impl/AddressSettings.java

public class AddressSettings implements Mergeable<AddressSettings>, Serializable, EncodingSupport {

   //......

   public static final int DEFAULT_MAX_DELIVERY_ATTEMPTS = 10;

   private Integer maxDeliveryAttempts = null;

   private SimpleString deadLetterAddress = null;

   //......

   public int getMaxDeliveryAttempts() {
      return maxDeliveryAttempts != null ? maxDeliveryAttempts : AddressSettings.DEFAULT_MAX_DELIVERY_ATTEMPTS;
   }

   public AddressSettings setMaxDeliveryAttempts(final int maxDeliveryAttempts) {
      this.maxDeliveryAttempts = maxDeliveryAttempts;
      return this;
   }

   public SimpleString getDeadLetterAddress() {
      return deadLetterAddress;
   }

   public AddressSettings setDeadLetterAddress(final SimpleString deadLetterAddress) {
      this.deadLetterAddress = deadLetterAddress;
      return this;
   }

   //......
}   
  • AddressSettings defines the maxDeliveryAttempts and deadLetterAddress properties. When maxDeliveryAttempts is null (that is, not configured), getMaxDeliveryAttempts returns AddressSettings.DEFAULT_MAX_DELIVERY_ATTEMPTS, which is 10.
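
The null-means-unset pattern above can be shown in isolation. This is a minimal sketch, not the real class: AddressSettingsSketch is a hypothetical stand-in that keeps only the one field and its two accessors, copied from the listing above.

```java
// Hypothetical stand-in for AddressSettings, reduced to the single field of interest.
class AddressSettingsSketch {
   static final int DEFAULT_MAX_DELIVERY_ATTEMPTS = 10;

   // null means "not configured", which is distinct from any explicit value
   private Integer maxDeliveryAttempts = null;

   int getMaxDeliveryAttempts() {
      return maxDeliveryAttempts != null ? maxDeliveryAttempts : DEFAULT_MAX_DELIVERY_ATTEMPTS;
   }

   AddressSettingsSketch setMaxDeliveryAttempts(final int maxDeliveryAttempts) {
      this.maxDeliveryAttempts = maxDeliveryAttempts;
      return this;
   }

   public static void main(String[] args) {
      // Unset: falls back to the default
      System.out.println(new AddressSettingsSketch().getMaxDeliveryAttempts());
      // Explicitly set: the configured value wins
      System.out.println(new AddressSettingsSketch().setMaxDeliveryAttempts(3).getMaxDeliveryAttempts());
   }
}
```

Using a boxed Integer instead of an int lets the class tell "never set" apart from "set to some value", which a primitive with a default of 10 could not do.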

checkRedelivery

activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java

public class QueueImpl extends CriticalComponentImpl implements Queue {

   //......

   public Pair<Boolean, Boolean> checkRedelivery(final MessageReference reference,
                                  final long timeBase,
                                  final boolean ignoreRedeliveryDelay) throws Exception {
      Message message = reference.getMessage();

      if (internalQueue) {
         if (logger.isTraceEnabled()) {
            logger.trace("Queue " + this.getName() + " is an internal queue, no checkRedelivery");
         }
         // no DLQ check on internal queues
         return new Pair<>(true, false);
      }

      if (!internalQueue && reference.isDurable() && isDurable() && !reference.isPaged()) {
         storageManager.updateDeliveryCount(reference);
      }

      AddressSettings addressSettings = addressSettingsRepository.getMatch(address.toString());

      int maxDeliveries = addressSettings.getMaxDeliveryAttempts();
      int deliveryCount = reference.getDeliveryCount();

      // First check DLA
      if (maxDeliveries > 0 && deliveryCount >= maxDeliveries) {
         if (logger.isTraceEnabled()) {
            logger.trace("Sending reference " + reference + " to DLA = " + addressSettings.getDeadLetterAddress() + " since ref.getDeliveryCount=" + reference.getDeliveryCount() + "and maxDeliveries=" + maxDeliveries + " from queue=" + this.getName());
         }
         boolean dlaResult = sendToDeadLetterAddress(null, reference, addressSettings.getDeadLetterAddress());

         return new Pair<>(false, dlaResult);
      } else {
         // Second check Redelivery Delay
         long redeliveryDelay = addressSettings.getRedeliveryDelay();
         if (!ignoreRedeliveryDelay && redeliveryDelay > 0) {
            redeliveryDelay = calculateRedeliveryDelay(addressSettings, deliveryCount);

            if (logger.isTraceEnabled()) {
               logger.trace("Setting redeliveryDelay=" + redeliveryDelay + " on reference=" + reference);
            }

            reference.setScheduledDeliveryTime(timeBase + redeliveryDelay);

            if (!reference.isPaged() && reference.isDurable() && isDurable()) {
               storageManager.updateScheduledDeliveryTime(reference);
            }
         }

         decDelivering(reference);

         return new Pair<>(true, false);
      }
   }

   //......
}
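  • QueueImpl's checkRedelivery method compares deliveryCount with maxDeliveries: when maxDeliveries is greater than 0 and deliveryCount is greater than or equal to maxDeliveries, it calls sendToDeadLetterAddress. Otherwise it applies the redelivery delay (if one is configured and not ignored) and returns the reference for redelivery. Internal queues skip the check entirely.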
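
The DLA condition is easy to misread, so here is a small sketch of just that boolean test. RedeliveryDecisionSketch and shouldSendToDla are hypothetical names; the expression itself is the one from checkRedelivery above.

```java
// Hypothetical helper isolating the DLA condition from checkRedelivery.
class RedeliveryDecisionSketch {

   // A non-positive maxDeliveries disables the DLA check altogether;
   // otherwise the message is dead-lettered once the count reaches the limit.
   static boolean shouldSendToDla(final int maxDeliveries, final int deliveryCount) {
      return maxDeliveries > 0 && deliveryCount >= maxDeliveries;
   }

   public static void main(String[] args) {
      System.out.println(shouldSendToDla(10, 9));    // below the limit
      System.out.println(shouldSendToDla(10, 10));   // limit reached
      System.out.println(shouldSendToDla(-1, 1000)); // limit disabled
   }
}
```

Note that it is maxDeliveries, not deliveryCount, that must be positive: with a value of 0 or below, the first branch is never taken no matter how many times the message has been delivered.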

sendToDeadLetterAddress

activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java

public class QueueImpl extends CriticalComponentImpl implements Queue {
   
   //......

   private boolean sendToDeadLetterAddress(final Transaction tx,
                                        final MessageReference ref,
                                        final SimpleString deadLetterAddress) throws Exception {
      if (deadLetterAddress != null) {
         Bindings bindingList = postOffice.lookupBindingsForAddress(deadLetterAddress);

         if (bindingList == null || bindingList.getBindings().isEmpty()) {
            ActiveMQServerLogger.LOGGER.messageExceededMaxDelivery(ref, deadLetterAddress);
            ref.acknowledge(tx, AckReason.KILLED, null);
         } else {
            ActiveMQServerLogger.LOGGER.messageExceededMaxDeliverySendtoDLA(ref, deadLetterAddress, name);
            move(tx, deadLetterAddress, null, ref, false, AckReason.KILLED, null);
            return true;
         }
      } else {
         ActiveMQServerLogger.LOGGER.messageExceededMaxDeliveryNoDLA(ref, name);

         ref.acknowledge(tx, AckReason.KILLED, null);
      }

      return false;
   }

   private void move(final Transaction originalTX,
                     final SimpleString address,
                     final Binding binding,
                     final MessageReference ref,
                     final boolean rejectDuplicate,
                     final AckReason reason,
                     final ServerConsumer consumer) throws Exception {
      Transaction tx;

      if (originalTX != null) {
         tx = originalTX;
      } else {
         // if no TX we create a new one to commit at the end
         tx = new TransactionImpl(storageManager);
      }

      Message copyMessage = makeCopy(ref, reason == AckReason.EXPIRED);

      copyMessage.setAddress(address);

      postOffice.route(copyMessage, tx, false, rejectDuplicate, binding);

      acknowledge(tx, ref, reason, consumer);

      if (originalTX == null) {
         tx.commit();
      }
   }

   //......
}
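  • sendToDeadLetterAddress calls move only when deadLetterAddress is set and has at least one binding; move routes a copy of the message to deadLetterAddress and acknowledges the original with AckReason.KILLED, and the method returns true. If no dead letter address is configured, or the address has no bindings, the reference is just acknowledged with AckReason.KILLED (so the message is dropped) and the method returns false.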
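
The three branches of sendToDeadLetterAddress can be summarised as a small decision function. This is a sketch under stated assumptions: DeadLetterOutcomeSketch, Outcome and decide are hypothetical names, the address is modelled as a plain String, and a binding count of 0 stands for both a null and an empty bindingList.

```java
// Hypothetical model of the branching in sendToDeadLetterAddress.
class DeadLetterOutcomeSketch {

   enum Outcome { MOVED_TO_DLA, DROPPED_NO_BINDINGS, DROPPED_NO_DLA }

   // bindingCount == 0 stands for "bindingList is null or empty" in the real method.
   static Outcome decide(final String deadLetterAddress, final int bindingCount) {
      if (deadLetterAddress == null) {
         return Outcome.DROPPED_NO_DLA;       // acknowledged with KILLED, nothing routed
      }
      if (bindingCount == 0) {
         return Outcome.DROPPED_NO_BINDINGS;  // acknowledged with KILLED, nothing routed
      }
      return Outcome.MOVED_TO_DLA;            // copy routed to the DLA, original acknowledged
   }

   // Only a successful move makes the real method return true.
   static boolean returnValue(final Outcome outcome) {
      return outcome == Outcome.MOVED_TO_DLA;
   }

   public static void main(String[] args) {
      System.out.println(decide(null, 0));
      System.out.println(decide("DLA", 0));
      System.out.println(decide("DLA", 1));
   }
}
```

In practice this means that setting a dead letter address is not enough on its own: if nothing is bound to that address, messages that exceed the limit are removed from the queue without being kept anywhere.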

incrementDeliveryCount

activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java

public class ServerConsumerImpl implements ServerConsumer, ReadyListener {

   //......

   public HandleStatus handle(final MessageReference ref) throws Exception {
      // available credits can be set back to null with a flow control option.
      AtomicInteger checkInteger = availableCredits;
      if (callback != null && !callback.hasCredits(this) || checkInteger != null && checkInteger.get() <= 0) {
         if (logger.isDebugEnabled()) {
            logger.debug(this + " is busy for the lack of credits. Current credits = " +
                            availableCredits +
                            " Can't receive reference " +
                            ref);
         }

         return HandleStatus.BUSY;
      }

      synchronized (lock) {
         // If the consumer is stopped then we don't accept the message, it
         // should go back into the
         // queue for delivery later.
         // TCP-flow control has to be done first than everything else otherwise we may lose notifications
         if ((callback != null && !callback.isWritable(this, protocolContext)) || !started || transferring) {
            return HandleStatus.BUSY;
         }

         // If there is a pendingLargeMessage we can't take another message
         // This has to be checked inside the lock as the set to null is done inside the lock
         if (largeMessageDeliverer != null) {
            if (logger.isDebugEnabled()) {
               logger.debug(this + " is busy delivering large message " +
                               largeMessageDeliverer +
                               ", can't deliver reference " +
                               ref);
            }
            return HandleStatus.BUSY;
         }
         final Message message = ref.getMessage();

         if (!message.acceptsConsumer(sequentialID())) {
            return HandleStatus.NO_MATCH;
         }

         if (filter != null && !filter.match(message)) {
            if (logger.isTraceEnabled()) {
               logger.trace("Reference " + ref + " is a noMatch on consumer " + this);
            }
            return HandleStatus.NO_MATCH;
         }

         if (logger.isTraceEnabled()) {
            logger.trace("ServerConsumerImpl::" + this + " Handling reference " + ref);
         }
         if (!browseOnly) {
            if (!preAcknowledge) {
               deliveringRefs.add(ref);
            }

            ref.handled();

            ref.setConsumerId(this.id);

            ref.incrementDeliveryCount();

            // If updateDeliveries = false (set by strict-update),
            // the updateDeliveryCountAfterCancel would still be updated after c
            if (strictUpdateDeliveryCount && !ref.isPaged()) {
               if (ref.getMessage().isDurable() && ref.getQueue().isDurable() &&
                  !ref.getQueue().isInternalQueue() &&
                  !ref.isPaged()) {
                  storageManager.updateDeliveryCount(ref);
               }
            }

            if (preAcknowledge) {
               if (message.isLargeMessage()) {
                  // we must hold one reference, or the file will be deleted before it could be delivered
                  ((LargeServerMessage) message).incrementDelayDeletionCount();
               }

               // With pre-ack, we ack *before* sending to the client
               ref.getQueue().acknowledge(ref, this);
               acks++;
            }

            if (message.isLargeMessage() && this.supportLargeMessage) {
               largeMessageDeliverer = new LargeMessageDeliverer((LargeServerMessage) message, ref);
            }

         }

         pendingDelivery.countUp();

         return HandleStatus.HANDLED;
      }
   }

   //......
}
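  • ServerConsumerImpl's handle method calls ref.incrementDeliveryCount() to increase deliveryCount whenever the consumer is not browseOnly. If strictUpdateDeliveryCount is set, the reference is not paged, the message and queue are both durable, and the queue is not internal, it also calls storageManager.updateDeliveryCount(ref).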
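
Putting handle and checkRedelivery together gives a rough picture of how many times a message is delivered before it is dead-lettered. The sketch below is a simplification built on assumptions not shown in the listings: it assumes the consumer never acknowledges, and that checkRedelivery runs once after every failed delivery. DeliveryLoopSketch, deliveriesBeforeDla and safetyCap are hypothetical names.

```java
// Hypothetical simulation: one increment per delivery, one DLA check per failed delivery.
class DeliveryLoopSketch {

   // Returns how many deliveries happen before the message goes to the DLA,
   // or -1 if it is still being redelivered after safetyCap deliveries.
   static int deliveriesBeforeDla(final int maxDeliveries, final int safetyCap) {
      int deliveryCount = 0;
      while (deliveryCount < safetyCap) {
         deliveryCount++; // handle(): ref.incrementDeliveryCount()

         // checkRedelivery(): same condition as in QueueImpl
         if (maxDeliveries > 0 && deliveryCount >= maxDeliveries) {
            return deliveryCount;
         }
      }
      return -1;
   }

   public static void main(String[] args) {
      System.out.println(deliveriesBeforeDla(10, 100)); // default limit
      System.out.println(deliveriesBeforeDla(3, 100));  // lowered limit
      System.out.println(deliveriesBeforeDla(-1, 100)); // limit disabled
   }
}
```

Because the count is incremented on delivery and compared with >=, a limit of N means the message is delivered N times in total under these assumptions, not N retries on top of a first attempt.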

updateDeliveryCount

activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java

public abstract class AbstractJournalStorageManager extends CriticalComponentImpl implements StorageManager {

   //......

   public void updateDeliveryCount(final MessageReference ref) throws Exception {
      // no need to store if it's the same value
      // otherwise the journal will get OME in case of lots of redeliveries
      if (ref.getDeliveryCount() == ref.getPersistedCount()) {
         return;
      }

      ref.setPersistedCount(ref.getDeliveryCount());
      DeliveryCountUpdateEncoding updateInfo = new DeliveryCountUpdateEncoding(ref.getQueue().getID(), ref.getDeliveryCount());

      readLock();
      try {
         messageJournal.appendUpdateRecord(ref.getMessage().getMessageID(), JournalRecordIds.UPDATE_DELIVERY_COUNT, updateInfo, syncNonTransactional, getContext(syncNonTransactional));
      } finally {
         readUnLock();
      }
   }

   //......
}
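  • AbstractJournalStorageManager's updateDeliveryCount method returns early when deliveryCount equals persistedCount; otherwise it sets persistedCount to deliveryCount and appends an UPDATE_DELIVERY_COUNT record to the message journal.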
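
The early return matters because updateDeliveryCount can be reached from more than one place (handle and checkRedelivery both call it in the listings above). A minimal sketch of the guard follows, with a plain counter standing in for messageJournal.appendUpdateRecord; DeliveryCountStoreSketch and journalWrites are hypothetical names.

```java
// Hypothetical model of the persistedCount guard in updateDeliveryCount.
class DeliveryCountStoreSketch {
   int deliveryCount;
   int persistedCount;
   int journalWrites; // stands in for messageJournal.appendUpdateRecord(...)

   void updateDeliveryCount() {
      // Nothing new to store: skip the journal write
      if (deliveryCount == persistedCount) {
         return;
      }
      persistedCount = deliveryCount;
      journalWrites++;
   }

   public static void main(String[] args) {
      DeliveryCountStoreSketch store = new DeliveryCountStoreSketch();
      store.deliveryCount = 1;
      store.updateDeliveryCount(); // writes
      store.updateDeliveryCount(); // skipped, count unchanged
      store.deliveryCount = 2;
      store.updateDeliveryCount(); // writes again
      System.out.println(store.journalWrites);
   }
}
```

With the guard in place, the number of journal records tracks the number of distinct delivery counts rather than the number of calls.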

Summary

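AddressSettings defines the maxDeliveryAttempts and deadLetterAddress properties; when maxDeliveryAttempts is null, getMaxDeliveryAttempts returns AddressSettings.DEFAULT_MAX_DELIVERY_ATTEMPTS, which is 10. QueueImpl's checkRedelivery method compares deliveryCount with maxDeliveries and calls sendToDeadLetterAddress when maxDeliveries is greater than 0 and deliveryCount is greater than or equal to maxDeliveries. sendToDeadLetterAddress calls move when the dead letter address is set and has bindings, routing a copy of the message to deadLetterAddress and acknowledging the original with AckReason.KILLED; otherwise the message is only acknowledged and is therefore dropped.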

doc