从log4j2到Disruptor详解

  • log4j2实现原理可查看://www.jb51.net/article/232602.htm
  • 文章同样基于log4j-2.7版本,disruptor-3.3.6

相信看过log4j2的源码后大家应该明白为什么第二代日志性能会提升那么多,这其中最大的功臣莫过于Disruptor并发编程框架。

下面我们就跟着log4j2来走进Disruptor这个神奇的框(wang)架(zhan)

log4j2异步日志简要回顾

从日志工厂(Log4jLoggerFactory)中获取日志Logger实例

从日志上下文工厂(Log4jContextFactory)获取日志上下文

启用日志上下文(AsyncLoggerContext)

启动Disruptor(AsyncLoggerDisruptor)

序列号屏障(ProcessingSequenceBarrier)等待序列号发布

等待策略(WaitStrategy)等待序列号

返回Logger等待序列号(即等待日志写入)

异步日志(AsyncLogger)写入

日志内容与转化者(RingBufferLogEventTranslator)绑定

Disruptor尝试发布转化者tryPublish

RingBuffer尝试发布事件tryPublishEvent

获取下一个可用序号

转化并发布序号,日志与序号对应的事件绑定,并发布序号

RingBuffer发布序号,MultiProducerSequencer发布

等待策略(waitStrategy)唤醒阻塞

Disruptor在log4j2中的应用

AsyncLoggerDisruptor

异步日志Disruptor启动

创建事件工厂EventFactory

计算ringBufferSize:AsyncLogger.RingBufferSize属性

创建等待策略:AsyncLogger.WaitStrategy属性

创建守护线程执行器executor

创建异步队列满时处理策略AsyncQueueFullPolicy(非Disruptor步骤)

创建Disruptor

  • 创建RingBuffer与Disruptor绑定
  • RingBuffer根据生产者类型创建对应的实例,例如多生产者:MultiProducerSequencer
  • 创建多生产者序号(bufferSize,waitStrategy)

绑定异常句柄(Disruptor.handleExceptionsWith)

绑定事件处理句柄(Disruptor.handleEventsWith)

  • 根据handle列表创建事件处理器createEventProcessors
  • RingBuffer为Sequence(MultiProducerSequencer)序列创建序列屏障ProcessingSequenceBarrier
  • 创建事件批处理器BatchEventProcessor
  • 为事件批处理器绑定异常处理句柄
  • 消费者仓库(consumerRepository)添加消费者,创建事件处理信息EventProcessorInfo添加至消费者信息列表consumerInfos
  • RingBuffer添加处理序列号列表processorSequences为序列号闸
  • 如果存在序列号屏障,从闸门中移除屏障序列号并标识endOfChain为false

启动Disruptor

  • 遍历消费者仓库放入执行器中执行消费者EventProcessorInfo
  • 启动事件批处理器BatchEventProcessor
  • 事件批处理器序列号自增1
  • 死循环
  • 序列号屏障ProcessingSequenceBarrier等待下个有效序列号,默认为超时等待策略,超时会继续下轮循环
  • 事件批处理器序列号如果小于等于有效序列号
  • 从RingBuffer中按照序列号获取event事件
  • 通知回调事件句柄eventHandler.onEvent如果当前消费下标等于有效序列号availableSequence说明是当前批次的最后一个消息,endOfBatch为true:eventHandler.onEvent(event, nextSequence, nextSequence == availableSequence);
  • 事件批处理器序列号设置为有效序列号

异步日志Disruptor写入

尝试发布tryPublish事件转化器EventTranslator:RingBufferLogEventTranslator

Disruptor获取RingBuffer尝试发布事件tryPublishEvent

序列号获取下个有效序号,步进为1,例如:MultiProducerSequencer.tryNext

游标按照步进移动

判断是否有足够的空间,没有则抛出InsufficientCapacityException异常

返回有效序列号

转化器转化消息为对应有效序列号的事件放入entries

发布序列号

  • 设置有效序列号至缓存availableBuffer
  • 等待策略唤醒阻塞waitStrategy.signalAllWhenBlocking

架构及流程

红色数字标识流程为获取logger时Disruptor创建消费者流程

黑色数字标识流程为logger写入日志时Disruptor创建事件并通知消费者流程

RingBuffer对于所有消费者、生产者是同一个实例

  • 环形队列,dataProvide,数据的存储与提供者

Sequencer:生产者

  • 对于所有消费者、生产者(可能是多生产者序列类型对于Multi类型)是同一个实例,包含一个游标序列号Sequence

SequenceBarrier:序列号屏障

  • 对于所有消费者、生产者也是同一个实例,序列号屏障包含一个等待策略、一个RingBuffer引用、一个游标序列号、一个依赖序列号(可能是组序列号类型)

BatchEventProcessor:消费者

  • 消费者包含一个RingBuffer引用
  • 一个序列号屏障,可以包含多个屏障序列号,默认为0个则使用RingBuffer的MultiProducerSequencer的游标序列号Sequence
  • 一个EventHandler:RingBufferLogEventHandler
  • 遍历EventHandler列表将其封装为BatchEventProcessor,将其与原始eventHandler、barrier屏障注册至消费者资源库consumerRepository。
  • 获取batchEventProcessor序列号默认为-1,将其缓存至processorSequences标识正在处理,并将processorSequences、disruptor、consumerRepository绑定至EventHandlerGroup。Disruptor启动遍历消费者资源库启动消费者:BatchEventProcessor

消费者入口

  • 消费者消费前先自增本地序列号(即-1+1=0序号),向序列号屏障申请该序列号的消费,默认为Timeout策略申请。
  • 屏障收到申请waitFor序列号,当前屏障游标序列号小于申请的消费序列号,等待生产者生产至当前序列号,如果超时则抛出异常(本地序列号不更新继续重试);如果没有超时,将屏障的dependentSequence序列号(如果不是非多序列号屏障类型,log4j2使用的是非多序列号屏障,则是屏障的本地游标)赋值为availableSequence返回。
  • 如果availableSequence有效的序列号(即屏障的游标序列号)小于申请要消费的序列号直接返回availableSequence(即消费超出的生产的速度,消费者申请的序列号向后回移至有效序列号)。否则getHighestPublishedSequence判断申请的序列号至availableSequence序列号之间的每个序列号对应的消息事件均是有效的则返回有效序列号(即生产者生产很快,消费者申请消费的序列号很小,向前移动至有效的,可能是本身也可能会跳跃多个下标),根据生产者的availableBuffer判断是否有效,因为生产者先发布序列号再写入数据,此处避免了读取数据异常,如果数据没有写入,有效序列号缓存标识没有写入(即无效),消费者会进行刚刚所说的“重试”,如果之间存在无效序列号则返回申请序列号-1(即回滚一个值,进入逻辑时增加了一个值,也就是回滚至申请前的点,可以理解为与超时相同,即重试)
  • 如果申请的序列号小于等于有效的序列号,则消费序列号对应的消息事件并更新本地BatchEventProcessor的序列号,按照下标去dataProvide(RingBuffer.entries)中提取对应位置的数据消费
  • 如果申请的序列号大于有效的序列号,则将消费者本地序列号设置为有效序列号(即消费超出的生产的速度,消费的序列号向后回移)
  • 如果期间出现任何未catch住的异常则会跳过当前下标,异常出现时的下标及对应的事件会交由exceptionHandler处理,默认为AsyncLoggerDefaultExceptionHandler异步处理,会将异常事件输出至系统的标准错误管道,虽然是异步也是会占用消费者线程池资源

Disruptor:生产者入口

  • 获取RingBuffer尝试发布消息,生产者(例如:MultiProducerSequencer)
  • 生产者游标序列号尝试自增,判断当前是否有足够的空间,当前游标+步进-bufferSize是否大于最小的闸门序列号(gatingSequences,即:所有消费者的本地游标序列号processorSequences列表),最小序列号会缓存至本地gatingSequenceCache用于下次判断减少进行所有闸门序列号的遍历次数,如果是说明已经没有空间(因为生产者生产申请的序列号已经追上了消费者消费序列号的最小值。RingBuffer是一个环形队列结构。上面已经讲到消费者序列号会与生产者序列号同步,同步指消费者申请序列号小于有效序列号时会前进至有效序列号,即使有延迟也保证了有大于等于buffer值的缓冲空间供生产者生产),如果没有空间返回false进入下一轮生产

在这里插入图片描述

  • 自增成功后,将消息转化为对应序列号下标位置的事件数据
  • Sequencer发布序列号,将当前序列号设置为有效(availableBuffer),并根据等待策略唤醒等待的消费者,被唤醒的消费者根据发布的序列号获取相应下标处事件数据进行处理

disruptor_new.jpg

Disruptor为什么这么快?

Disruptor采用无锁并发编程,框架中主要使用CAS与volatile关键字保证并发安全

使用环形数据结构(另一个典型的应用是时钟算法也是使用的环形数据结构),为环形结构添加序列号屏障来控制对环形队列读写操作,保证存储数据的并发安全

另一个点便是神奇的缓冲行填充了

Log4j2为什么这么快?

使用Disruptor并发编程框架

使用NIO写入日志数据

当然log4j2中有很多细节,如果我们想要获取线程栈信息,可以同样学习一下这样的写法

// LOG4J2-1029 new Throwable().getStackTrace is faster than Thread.currentThread().getStackTrace().
final StackTraceElement[] stackTrace = new Throwable().getStackTrace();
StackTraceElement last = null;
for (int i = stackTrace.length - 1; i > 0; i--) {
    final String className = stackTrace[i].getClassName();
    if (fqcnOfLogger.equals(className)) {
        return last;
    }
    last = stackTrace[i];
}

以上为个人经验,希望能给大家一个参考,也希望大家多多支持云海天教程。