Technique: Log collection under microservices

To view application logs distributed across multiple machines, it is usually necessary to aggregate them. The following is a summary of a personal implementation.

Common industry practice

  1. ELK (Elasticsearch, Logstash, Kibana): Logstash does log aggregation (in the form of an appender), Elasticsearch does storage, and Kibana does visualization

Simple implementation

Idea: put the log messages on MQ, consume them in order on the consumer side, and write out the log

  • Detailed description:
  1. At the web entry point, a servlet filter generates a TraceId that is unique to the whole call chain (it can be composed of the request URL and a business-side ID)
  2. Through Dubbo's filter, the TraceId is put into a ThreadLocal at the start of the call. When the consumer calls the provider, the TraceId is passed along as an attachment parameter; the provider-side filter puts it into its own ThreadLocal on receipt, so the same request is marked end to end
  3. A custom logback Appender reads the TraceId from the ThreadLocal; log messages with the same TraceId are put into the same RocketMQ queue, so sequential consumption keeps the log in order
  4. When the consumer prints the logs, it prints queue by queue. Because each queue holds the logs of one whole call chain, printing per queue preserves the original timing and makes the logs easier to read
  • Note:
  1. On the producer side, even though logs are sent in sequence, there is no guarantee that a message sent first reaches the queue first. For performance reasons we cannot send synchronously; RocketMQ's sendOneway is efficient, but it does not guarantee ordered arrival in the queue
  2. Do not wrap the custom appender with logback's AsyncAppender: the TraceId lives in a ThreadLocal, and AsyncAppender prints from a separate thread, which cannot see the TraceId stored in the original thread's ThreadLocal
  • Sample code
The main classes are published on GitHub:

https://github.com/normalHeFei/normal_try/tree/master/java/src/main/java/wk/clulog
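The TraceId propagation and queue-selection logic described above can be sketched in plain Java. This is a minimal illustration, not code from the linked repository; the class and method names are hypothetical:

```java
import java.util.UUID;

// Hypothetical sketch: a ThreadLocal carries the per-request TraceId, and the
// custom appender maps each TraceId to a fixed queue index, so every log line
// of one call chain lands in the same RocketMQ queue.
public class TraceContext {
    private static final ThreadLocal<String> TRACE_ID = new ThreadLocal<>();

    // Called by the servlet filter at the web entry point.
    public static String start(String requestUrl, String bizId) {
        String traceId = requestUrl + "-" + bizId + "-" + UUID.randomUUID();
        TRACE_ID.set(traceId);
        return traceId;
    }

    public static String get() {
        return TRACE_ID.get();
    }

    // Must be called when the request ends, to avoid leaking across pooled threads.
    public static void clear() {
        TRACE_ID.remove();
    }

    // Deterministic queue selection: the same TraceId always maps to the same
    // queue, which is what makes per-queue sequential consumption possible.
    public static int selectQueue(String traceId, int queueCount) {
        return (traceId.hashCode() & Integer.MAX_VALUE) % queueCount;
    }
}
```

In a real setup the Dubbo filter would copy this value into the RPC attachments on the consumer side and back into the provider's ThreadLocal, as described in step 2.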

logback.xml 

<?xml version="1.0" encoding="UTF-8"?>
<configuration>

    <appender name="mqAppender1" class="wk.clulog.RocketMqAppender">
        <param name="Tag" value="logTag" />
        <param name="Topic" value="logTopic" />
        <param name="ProducerGroup" value="logGroup" />
        <param name="NameServerAddress" value="192.168.103.3:9876"/>
        <layout class="ch.qos.logback.classic.PatternLayout">
            <pattern>%date %p %t - %m%n</pattern>
        </layout>
    </appender>

    <appender name="mqAsyncAppender1" class="ch.qos.logback.classic.AsyncAppender">
        <queueSize>1024</queueSize>
        <discardingThreshold>80</discardingThreshold>
        <maxFlushTime>2000</maxFlushTime>
        <neverBlock>true</neverBlock>
        <appender-ref ref="mqAppender1"/>
    </appender>

    <root level="INFO">
        <appender-ref ref="mqAppender1"/>
    </root>

</configuration>

Notes from reading the related RocketMQ implementation code

  • Implementation of the producer's sending methods
  1. sendOneway / sendAsync:

The broker is selected according to the load-balancing strategy, the channel is obtained, and the request is written directly. Although it is sendOneway, RocketMQ actually uses a semaphore to cap the number of concurrent in-flight sends. The relevant code is as follows:

public void invokeOnewayImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis)
        throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {
        request.markOnewayRPC();
        boolean acquired = this.semaphoreOneway.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);
        if (acquired) {
            // The semaphore release is wrapped with a CAS to avoid releasing
            // it more than once in a multi-threaded environment
            final SemaphoreReleaseOnlyOnce once = new SemaphoreReleaseOnlyOnce(this.semaphoreOneway);
            try {
                channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
                    @Override
                    public void operationComplete(ChannelFuture f) throws Exception {
                        once.release();
                        if (!f.isSuccess()) {
                            log.warn("send a request command to channel <" + channel.remoteAddress() + "> failed.");
                        }
                    }
                });
            } catch (Exception e) {
                once.release();
                log.warn("write send a request command to channel <" + channel.remoteAddress() + "> failed.");
                throw new RemotingSendRequestException(RemotingHelper.parseChannelRemoteAddr(channel), e);
            }
        } else {
            if (timeoutMillis <= 0) {
                throw new RemotingTooMuchRequestException("invokeOnewayImpl invoke too fast");
            } else {
                String info = String.format(
                    "invokeOnewayImpl tryAcquire semaphore timeout, %dms, waiting thread nums: %d semaphoreAsyncValue: %d",
                    timeoutMillis,
                    this.semaphoreOneway.getQueueLength(),
                    this.semaphoreOneway.availablePermits()
                );
                log.warn(info);
                throw new RemotingTimeoutException(info);
            }
        }
    }
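The release-only-once guard mentioned in the comment above can be reproduced with an AtomicBoolean CAS. The following is a simplified self-contained sketch of the idea, not RocketMQ's actual SemaphoreReleaseOnlyOnce class:

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;

// Sketch of the release-only-once pattern: even if release() is called from
// both the Netty listener and the catch block, the permit goes back exactly once.
public class ReleaseOnlyOnce {
    private final AtomicBoolean released = new AtomicBoolean(false);
    private final Semaphore semaphore;

    public ReleaseOnlyOnce(Semaphore semaphore) {
        this.semaphore = semaphore;
    }

    public void release() {
        // compareAndSet guarantees only the first caller actually releases the permit
        if (released.compareAndSet(false, true)) {
            semaphore.release();
        }
    }
}
```

Without this guard, a double release would silently grow the number of available permits and defeat the concurrency cap.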
  2. sendMessageSync

Synchronous return is realized through a CountDownLatch. The code is as follows:

 channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
        @Override
        public void operationComplete(ChannelFuture f) throws Exception {
            // Record the send result in the ResponseFuture
            if (f.isSuccess()) {
                responseFuture.setSendRequestOK(true);
                return;
            } else {
                responseFuture.setSendRequestOK(false);
            }

            responseTable.remove(opaque);
            responseFuture.setCause(f.cause());
            responseFuture.putResponse(null);
            log.warn("send a request command to channel <" + addr + "> failed.");
        }
});
RemotingCommand responseCommand = responseFuture.waitResponse(timeoutMillis);

// The caller blocks on the latch
public RemotingCommand waitResponse(final long timeoutMillis) throws InterruptedException {
        this.countDownLatch.await(timeoutMillis, TimeUnit.MILLISECONDS);
        return this.responseCommand;
    }
// When the callback delivers the result, the latch is released
public void putResponse(final RemotingCommand responseCommand) {
    this.responseCommand = responseCommand;
    this.countDownLatch.countDown();
}
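The sync-over-async pattern shown above can be boiled down to a small self-contained analogue of RocketMQ's ResponseFuture (the class name here is made up for illustration):

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Minimal analogue of ResponseFuture: an async callback thread fills in the
// response and opens the latch; the calling thread blocks in waitResponse.
public class MiniResponseFuture {
    private final CountDownLatch latch = new CountDownLatch(1);
    private volatile String response;

    public String waitResponse(long timeoutMillis) throws InterruptedException {
        // Returns when putResponse fires or the timeout elapses
        latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
        return response; // null on timeout, just like the original
    }

    public void putResponse(String response) {
        this.response = response;
        latch.countDown();
    }
}
```

The volatile field plus countDown/await pair gives the caller a happens-before guarantee on the response written by the callback thread.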
  • How consumers consume in an orderly manner

Going straight to the code:

  try {
        //processQueue is the processing snapshot of the queue; it records the offset of processed messages and other state. Sequential consumption of a single queue is realized by locking the processing queue
        this.processQueue.getLockConsume().lock();
        if (this.processQueue.isDropped()) {
            log.warn("consumeMessage, the message queue not be able to consume, because it's dropped. {}",
                this.messageQueue);
            break;
        }

        status = messageListener.consumeMessage(Collections.unmodifiableList(msgs), context);
    } catch (Throwable e) {
        log.warn("consumeMessage exception: {} Group: {} Msgs: {} MQ: {}",
            RemotingHelper.exceptionSimpleDesc(e),
            ConsumeMessageOrderlyService.this.consumerGroup,
            msgs,
            messageQueue);
        hasException = true;
    } finally {
        this.processQueue.getLockConsume().unlock();
    }
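The per-queue locking idea above can be reduced to a small self-contained sketch (class names hypothetical): an ordinary ReentrantLock serializes the listener callback for one queue, so even if several pool threads pick up the same queue, its messages are consumed one batch at a time, in order.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.locks.ReentrantLock;

// Sketch of per-queue orderly consumption: the lock plays the role of
// processQueue.getLockConsume() in the RocketMQ code above.
public class OrderlyQueueConsumer {
    private final ReentrantLock lockConsume = new ReentrantLock();
    private final List<Integer> consumed = new CopyOnWriteArrayList<>();

    public void consume(List<Integer> msgs) {
        lockConsume.lock(); // only one thread works on this queue at a time
        try {
            // the messageListener.consumeMessage(...) callback would run here
            consumed.addAll(msgs);
        } finally {
            lockConsume.unlock();
        }
    }

    public List<Integer> consumed() {
        return consumed;
    }
}
```

Combined with the fixed TraceId-to-queue mapping on the producer side, this is what keeps each call chain's log lines in sequence end to end.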

Tags: Microservices

Posted by charmedp3 on Tue, 03 May 2022 09:00:43 +0300