logback 从 1.0.4 版本开始引入了 AsyncAppender 以支持异步写日志,异步处理日志对业务本身的性能有很大的提升作用,这种异步是将要写入的内容扔进阻塞队列里,由异步线程来处理日志处理。
配置
笔者示例中,以滚动策略收集日志(按天分割),并以 logstash 收集格式编码,然后将其引用套在 AsyncAppender 实例中,具体如下:
<!-- 访问日志 -->
<appender name="accessFileAppender" class="ch.qos.logback.core.rolling.RollingFileAppender">
<!-- logstash 编码,用于对接 elasticsearch -->
<encoder class="net.logstash.logback.encoder.LogstashEncoder"/>
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
<fileNamePattern>logs/access.%d{yyyyMMdd}.log</fileNamePattern>
<!-- 保留最近 5 天数据 -->
<maxHistory>5</maxHistory>
</rollingPolicy>
<filter class="ch.qos.logback.classic.filter.ThresholdFilter">
<level>INFO</level>
</filter>
</appender>
<!-- 访问日志变异步 -->
<appender name="accessFileAsyncAppender" class="ch.qos.logback.classic.AsyncAppender">
<!-- 队列的深度,该值会影响性能,默认 256 -->
<queueSize>256</queueSize>
<!-- 设为 0 表示队列达到 80%,也不丢弃任务-->
<discardingThreshold>0</discardingThreshold>
<!-- 日志上下文关闭后,AsyncAppender 继续执行写任务的时间,单位毫秒 -->
<maxFlushTime>1000</maxFlushTime>
<!-- 队列满了是否直接丢弃要写的消息,false、丢弃,true、不丢弃 -->
<neverBlock>false</neverBlock>
<!-- 是否包含调用方的信息,false 则无法打印类名方法名行号等 -->
<includeCallerData>false</includeCallerData>
<!--One and only one appender may be attached to AsyncAppender,添加多个的话后面的会被忽略-->
<appender-ref ref="accessFileAppender"/>
</appender>
<logger name="access_logger" additivity="false">
<appender-ref ref="accessFileAsyncAppender"/>
</logger>
配置项
- queueSize:阻塞队列的大小,默认是 256;该值首次建议设置大一些,后续根据自己业务的特点去调优。
- discardingThreshold:阻塞队列空间剩余大小触发丢弃的阈值,该设置项是为了避免阻塞队列触发阻塞所设计,默认情况下,该值为 -1,这时当可用空间小于容量的 20% 时,触发丢弃操作,当然也可以理解成当blockingQueue 的容量高于阈值时(80%),丢弃日志;如果不希望丢弃日志(即每次都是全量保存),那可以设置为 0,但这时,如果常常能把容量打满,这对生产者会产生阻塞作用,即扔不进去日志并阻塞,这对性能是有损伤的。
- neverBlock:队列满了是否直接丢弃要写的消息,默认是 false,即会阻塞等待队列腾出空间,底层是调用了 BlockingQueue 的
offer
方法,若设置 true,直接丢弃数据,不阻塞,底层是调用了 BlockingQueue 的put
方法。 - maxFlushTime:日志上下文关闭后,AsyncAppender 继续执行写任务的时间,单位毫秒,底层是调用 Thread 的 join 方法,默认是 1000 毫秒,即 1 秒。
配置项的相关变量都在 logback-core jar 包的 ch.qos.logback.core.AsyncAppenderBase
类中,相关代码片段如下:
public class AsyncAppenderBase<E> extends UnsynchronizedAppenderBase<E> implements AppenderAttachable<E> {
...
BlockingQueue<E> blockingQueue;
public static final int DEFAULT_QUEUE_SIZE = 256;
int queueSize = DEFAULT_QUEUE_SIZE;
int appenderCount = 0;
static final int UNDEFINED = -1;
int discardingThreshold = UNDEFINED;
boolean neverBlock = false;
Worker worker = new Worker();
public static final int DEFAULT_MAX_FLUSH_TIME = 1000;
int maxFlushTime = DEFAULT_MAX_FLUSH_TIME;
...
@Override
public void start() {
if (isStarted())
return;
if (appenderCount == 0) {
addError("No attached appenders found.");
return;
}
if (queueSize < 1) {
addError("Invalid queue size [" + queueSize + "]");
return;
}
blockingQueue = new ArrayBlockingQueue<E>(queueSize);
if (discardingThreshold == UNDEFINED)
discardingThreshold = queueSize / 5;
addInfo("Setting discardingThreshold to " + discardingThreshold);
worker.setDaemon(true);
worker.setName("AsyncAppender-Worker-" + getName());
// make sure this instance is marked as "started" before staring the worker Thread
super.start();
worker.start();
}
@Override
public void stop() {
if (!isStarted())
return;
// mark this appender as stopped so that Worker can also processPriorToRemoval if it is invoking
// aii.appendLoopOnAppenders
// and sub-appenders consume the interruption
super.stop();
// interrupt the worker thread so that it can terminate. Note that the interruption can be consumed
// by sub-appenders
worker.interrupt();
InterruptUtil interruptUtil = new InterruptUtil(context);
try {
interruptUtil.maskInterruptFlag();
worker.join(maxFlushTime);
// check to see if the thread ended and if not add a warning message
if (worker.isAlive()) {
addWarn("Max queue flush timeout (" + maxFlushTime + " ms) exceeded. Approximately " + blockingQueue.size()
+ " queued events were possibly discarded.");
} else {
addInfo("Queue flush finished successfully within timeout.");
}
} catch (InterruptedException e) {
int remaining = blockingQueue.size();
addError("Failed to join worker thread. " + remaining + " queued events may be discarded.", e);
} finally {
interruptUtil.unmaskInterruptFlag();
}
}
@Override
protected void append(E eventObject) {
if (isQueueBelowDiscardingThreshold() && isDiscardable(eventObject)) {
return;
}
preprocess(eventObject);
put(eventObject);
}
private boolean isQueueBelowDiscardingThreshold() {
return (blockingQueue.remainingCapacity() < discardingThreshold);
}
private void put(E eventObject) {
if (neverBlock) {
blockingQueue.offer(eventObject);
} else {
putUninterruptibly(eventObject);
}
}
private void putUninterruptibly(E eventObject) {
boolean interrupted = false;
try {
while (true) {
try {
blockingQueue.put(eventObject);
break;
} catch (InterruptedException e) {
interrupted = true;
}
}
} finally {
if (interrupted) {
Thread.currentThread().interrupt();
}
}
}
...
}