聊聊artemis的SlowConsumerReaperRunnable

SlowConsumerPolicy

activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/settings/impl/SlowConsumerPolicy.java

public enum SlowConsumerPolicy {
   KILL, NOTIFY;

   public static SlowConsumerPolicy getType(int type) {
      switch (type) {
         case 0:
            return KILL;
         case 1:
            return NOTIFY;
         default:
            return null;
      }
   }
}
  • SlowConsumerPolicy定义了KILL、NOTIFY两个枚举值

SlowConsumerReaperRunnable

activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java

   private final class SlowConsumerReaperRunnable implements Runnable {

      private final SlowConsumerPolicy policy;
      private final float threshold;
      private final long checkPeriod;

      private SlowConsumerReaperRunnable(long checkPeriod, float threshold, SlowConsumerPolicy policy) {
         this.checkPeriod = checkPeriod;
         this.policy = policy;
         this.threshold = threshold;
      }

      @Override
      public void run() {
         float queueRate = getRate();
         long queueMessages = getMessageCount();

         if (logger.isDebugEnabled()) {
            logger.debug(getAddress() + ":" + getName() + " has " + queueMessages + " message(s) and " + getConsumerCount() + " consumer(s) and is receiving messages at a rate of " + queueRate + " msgs/second.");
         }


         if (consumers.size() == 0) {
            logger.debug("There are no consumers, no need to check slow consumer"s rate");
            return;
         } else {
            float queueThreshold = threshold * consumers.size();

            if (queueRate < queueThreshold && queueMessages < queueThreshold) {
               if (logger.isDebugEnabled()) {
                  logger.debug("Insufficient messages received on queue "" + getName() + "" to satisfy slow-consumer-threshold. Skipping inspection of consumer.");
               }
               return;
            }
         }

         for (ConsumerHolder consumerHolder : consumers) {
            Consumer consumer = consumerHolder.consumer();
            if (consumer instanceof ServerConsumerImpl) {
               ServerConsumerImpl serverConsumer = (ServerConsumerImpl) consumer;
               float consumerRate = serverConsumer.getRate();
               if (consumerRate < threshold) {
                  RemotingConnection connection = null;
                  ActiveMQServer server = ((PostOfficeImpl) postOffice).getServer();
                  RemotingService remotingService = server.getRemotingService();

                  for (RemotingConnection potentialConnection : remotingService.getConnections()) {
                     if (potentialConnection.getID().toString().equals(serverConsumer.getConnectionID())) {
                        connection = potentialConnection;
                     }
                  }

                  serverConsumer.fireSlowConsumer();

                  if (connection != null) {
                     ActiveMQServerLogger.LOGGER.slowConsumerDetected(serverConsumer.getSessionID(), serverConsumer.getID(), getName().toString(), connection.getRemoteAddress(), threshold, consumerRate);
                     if (policy.equals(SlowConsumerPolicy.KILL)) {
                        connection.killMessage(server.getNodeID());
                        remotingService.removeConnection(connection.getID());
                        connection.fail(ActiveMQMessageBundle.BUNDLE.connectionsClosedByManagement(connection.getRemoteAddress()));
                     } else if (policy.equals(SlowConsumerPolicy.NOTIFY)) {
                        TypedProperties props = new TypedProperties();

                        props.putIntProperty(ManagementHelper.HDR_CONSUMER_COUNT, getConsumerCount());

                        props.putSimpleStringProperty(ManagementHelper.HDR_ADDRESS, address);

                        props.putSimpleStringProperty(ManagementHelper.HDR_REMOTE_ADDRESS, SimpleString.toSimpleString(connection.getRemoteAddress()));

                        if (connection.getID() != null) {
                           props.putSimpleStringProperty(ManagementHelper.HDR_CONNECTION_NAME, SimpleString.toSimpleString(connection.getID().toString()));
                        }

                        props.putLongProperty(ManagementHelper.HDR_CONSUMER_NAME, serverConsumer.getID());

                        props.putSimpleStringProperty(ManagementHelper.HDR_SESSION_NAME, SimpleString.toSimpleString(serverConsumer.getSessionID()));

                        Notification notification = new Notification(null, CoreNotificationType.CONSUMER_SLOW, props);

                        ManagementService managementService = ((PostOfficeImpl) postOffice).getServer().getManagementService();
                        try {
                           managementService.sendNotification(notification);
                        } catch (Exception e) {
                           ActiveMQServerLogger.LOGGER.failedToSendSlowConsumerNotification(notification, e);
                        }
                     }
                  }
               }
            }
         }
      }
   }
  • SlowConsumerReaperRunnable实现了Runnable接口,其run方法会遍历consumers,对于ServerConsumerImply在其consumerRate小于threshold时执行serverConsumer.fireSlowConsumer();之后对于connection不为null的根据policy进行不同的处理,若为SlowConsumerPolicy.KILL则执行connection.killMessage(server.getNodeID())、connection.fail(ActiveMQMessageBundle.BUNDLE.connectionsClosedByManagement(connection.getRemoteAddress())),若为SlowConsumerPolicy.NOTIFY则构建NotificationType为CoreNotificationType.CONSUMER_SLOW的notification执行managementService.sendNotification(notification)

fireSlowConsumer

activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java

public class ServerConsumerImpl implements ServerConsumer, ReadyListener {

   //......

   public void fireSlowConsumer() {
      if (slowConsumerListener != null) {
         slowConsumerListener.onSlowConsumer(this);
      }
   }

   //......
}
  • fireSlowConsumer执行的是slowConsumerListener.onSlowConsumer(this)方法

SlowConsumerDetection

activemq-artemis-2.11.0/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java

   class SlowConsumerDetection implements SlowConsumerDetectionListener {

      @Override
      public void onSlowConsumer(ServerConsumer consumer) {
         if (consumer.getProtocolData() != null && consumer.getProtocolData() instanceof AMQConsumer) {
            AMQConsumer amqConsumer = (AMQConsumer) consumer.getProtocolData();
            ActiveMQTopic topic = AdvisorySupport.getSlowConsumerAdvisoryTopic(amqConsumer.getOpenwireDestination());
            ActiveMQMessage advisoryMessage = new ActiveMQMessage();
            try {
               advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_CONSUMER_ID, amqConsumer.getId().toString());
               protocolManager.fireAdvisory(context, topic, advisoryMessage, amqConsumer.getId(), null);
            } catch (Exception e) {
               ActiveMQServerLogger.LOGGER.warn("Error during method invocation", e);
            }
         }
      }
   }
  • SlowConsumerDetection实现了SlowConsumerDetectionListener接口,其onSlowConsumer方法执行的是protocolManager.fireAdvisory方法

RemotingConnectionImpl

activemq-artemis-2.11.0/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/RemotingConnectionImpl.java

public class RemotingConnectionImpl extends AbstractRemotingConnection implements CoreRemotingConnection {

   //......

   public void killMessage(SimpleString nodeID) {
      if (channelVersion < DisconnectConsumerWithKillMessage.VERSION_INTRODUCED) {
         return;
      }
      Channel clientChannel = getChannel(1, -1);
      DisconnectConsumerWithKillMessage response = new DisconnectConsumerWithKillMessage(nodeID);

      clientChannel.send(response, -1);
   }

   public void fail(final ActiveMQException me, String scaleDownTargetNodeID) {
      synchronized (failLock) {
         if (destroyed) {
            return;
         }

         destroyed = true;
      }

      if (!(me instanceof ActiveMQRemoteDisconnectException)) {
         ActiveMQClientLogger.LOGGER.connectionFailureDetected(transportConnection.getRemoteAddress(), me.getMessage(), me.getType());
      }

      try {
         transportConnection.forceClose();
      } catch (Throwable e) {
         ActiveMQClientLogger.LOGGER.failedForceClose(e);
      }

      // Then call the listeners
      callFailureListeners(me, scaleDownTargetNodeID);

      callClosingListeners();

      internalClose();

      for (Channel channel : channels.values()) {
         channel.returnBlocking(me);
      }
   }

   //......
}   
  • killMessage方法构造DisconnectConsumerWithKillMessage并通过clientChannel.send方法;fail方法则执行transportConnection.forceClose()、callFailureListeners、callClosingListeners、internalClose以及channel.returnBlocking方法

sendNotification

activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/impl/ManagementServiceImpl.java

public class ManagementServiceImpl implements ManagementService {

   //......

   public void sendNotification(final Notification notification) throws Exception {
      if (logger.isTraceEnabled()) {
         logger.trace("Sending Notification = " + notification +
                         ", notificationEnabled=" + notificationsEnabled +
                         " messagingServerControl=" + messagingServerControl);
      }
      // This needs to be synchronized since we need to ensure notifications are processed in strict sequence
      synchronized (this) {
         if (messagingServerControl != null && notificationsEnabled) {
            // We also need to synchronize on the post office notification lock
            // otherwise we can get notifications arriving in wrong order / missing
            // if a notification occurs at same time as sendQueueInfoToQueue is processed
            synchronized (postOffice.getNotificationLock()) {

               // First send to any local listeners
               for (NotificationListener listener : listeners) {
                  try {
                     listener.onNotification(notification);
                  } catch (Exception e) {
                     // Exception thrown from one listener should not stop execution of others
                     ActiveMQServerLogger.LOGGER.errorCallingNotifListener(e);
                  }
               }

               // start sending notification *messages* only when server has initialised
               // Note at backup initialisation we don"t want to send notifications either
               // https://jira.jboss.org/jira/browse/HORNETQ-317
               if (messagingServer == null || !messagingServer.isActive()) {
                  if (logger.isDebugEnabled()) {
                     logger.debug("ignoring message " + notification + " as the server is not initialized");
                  }
                  return;
               }

               long messageID = storageManager.generateID();

               Message notificationMessage = new CoreMessage(messageID, 512);

               // Notification messages are always durable so the user can choose whether to add a durable queue to
               // consume them in
               notificationMessage.setDurable(true);
               notificationMessage.setAddress(managementNotificationAddress);

               if (notification.getProperties() != null) {
                  TypedProperties props = notification.getProperties();
                  props.forEach(notificationMessage::putObjectProperty);
               }

               notificationMessage.putStringProperty(ManagementHelper.HDR_NOTIFICATION_TYPE, new SimpleString(notification.getType().toString()));

               notificationMessage.putLongProperty(ManagementHelper.HDR_NOTIFICATION_TIMESTAMP, System.currentTimeMillis());

               if (notification.getUID() != null) {
                  notificationMessage.putStringProperty(new SimpleString("foobar"), new SimpleString(notification.getUID()));
               }

               postOffice.route(notificationMessage, false);
            }
         }
      }
   }

   //......
}
  • sendNotification方法会回调listeners的onNotification方法,之后通过postOffice.route(notificationMessage, false)发送notificationMessage

小结

SlowConsumerReaperRunnable实现了Runnable接口,其run方法会遍历consumers,对于ServerConsumerImply在其consumerRate小于threshold时执行serverConsumer.fireSlowConsumer();之后对于connection不为null的根据policy进行不同的处理,若为SlowConsumerPolicy.KILL则执行connection.killMessage(server.getNodeID())、connection.fail(ActiveMQMessageBundle.BUNDLE.connectionsClosedByManagement(connection.getRemoteAddress())),若为SlowConsumerPolicy.NOTIFY则构建NotificationType为CoreNotificationType.CONSUMER_SLOW的notification执行managementService.sendNotification(notification)

doc