Implementing log collection with OpenResty + Lua + Kafka

This article is reposted from: https://www.cnblogs.com/gxyandwmm/p/11298912.html

********************* Deployment process **************************

1: Scenario description

High-traffic online services, and nginx services that need to report logs, generate a large volume of logs every day. These logs are valuable: they can be used for statistics and reporting, user behavior analysis, interface quality and performance monitoring, and other needs. However, the traditional way of recording nginx logs scatters the data across the individual nginx instances, and writing high-volume logs is itself a load on the disks.
We need to collect and aggregate these nginx logs. The collection process and its results need to meet the following requirements:
Different businesses can obtain the data, such as monitoring, data analysis and statistics, recommendation, and so on
Real-time data
Guaranteed high performance

2: Technical scheme

Thanks to the high performance of OpenResty and Kafka, we can meet these requirements in a very lightweight and efficient way. The architecture works as follows:
 
Scheme description:
1: After an online request reaches nginx, Lua is used to tidy up the log: unify the log format, filter out invalid requests, group entries, and so on.
2: The nginx logs of different businesses are split into different Kafka topics (a sketch of per-business topic routing follows this list).
3: Lua asynchronously produces the log messages to the Kafka cluster.
4: The business groups interested in particular logs consume them and obtain the log data in real time.
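For step 2, one simple way to split logs into per-business topics is to derive the topic name inside the same log_by_lua handler shown later in this article. A minimal sketch in Lua, where the host names, topic names and the mapping itself are purely illustrative:

-- hypothetical mapping from the request's Host header to a business topic
local topic_by_host = {
    ["api.example.com"]  = "nginx_log_api",
    ["shop.example.com"] = "nginx_log_shop",
}
local topic = topic_by_host[ngx.var.host] or "nginx_log_default"
-- the chosen topic is then passed to bp:send(topic, nil, message) as in section 5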

3: Related technology
openresty: http://openresty.org 
kafka: http://kafka.apache.org 
lua-resty-kafka: https://github.com/doujiang24/lua-resty-kafka

4: Installation and configuration
For simplicity, we use a stand-alone configuration and deployment; the cluster case is similar.
1) Prepare openresty dependencies:
apt-get install libreadline-dev libncurses5-dev libpcre3-dev libssl-dev perl make build-essential 
#Or
yum install readline-devel pcre-devel openssl-devel gcc

2) Install and compile openresty:
#1: Install openresty:
cd /opt/nginx/    # directory holding the installation files
wget https://openresty.org/download/openresty-1.9.7.4.tar.gz 
tar -xzf openresty-1.9.7.4.tar.gz -C /opt/nginx/

#Configuration:
#Install into /opt/openresty instead of the default location under /usr/local.
cd openresty-1.9.7.4
./configure --prefix=/opt/openresty \
            --with-luajit \
            --without-http_redis2_module \
            --with-http_iconv_module
make
make install
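A quick sanity check that the build landed under the chosen prefix (the path follows from --prefix=/opt/openresty):

/opt/openresty/nginx/sbin/nginx -V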

3) Install lua-resty-kafka

#Download Lua resty Kafka:
wget https://github.com/doujiang24/lua-resty-kafka/archive/master.zip -O lua-resty-kafka-master.zip 
unzip lua-resty-kafka-master.zip -d /opt/nginx/

#Copy lua-resty-kafka into openresty
mkdir /opt/openresty/lualib/kafka 
cp -rf /opt/nginx/lua-resty-kafka-master/lib/resty /opt/openresty/lualib/kafka/
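To verify that the library is reachable before going further, a hypothetical test location can be dropped into any server block; it assumes the lua_package_path setting used in section 5:

location /kafka_check {
    content_by_lua '
        local ok, mod = pcall(require, "resty.kafka.producer")
        if ok then
            ngx.say("lua-resty-kafka loaded")
        else
            ngx.say("failed to load lua-resty-kafka: ", mod)
        end
    ';
}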

4) Install stand-alone Kafka
cd /opt/nginx/ 
wget http://apache.fayea.com/kafka/0.9.0.1/kafka_2.10-0.9.0.1.tgz 
tar xvf kafka_2.10-0.9.0.1.tgz
cd kafka_2.10-0.9.0.1

#Start the stand-alone zookeeper
nohup sh bin/zookeeper-server-start.sh config/zookeeper.properties > ./zk.log 2>&1 & 
#Bind the broker IP (required):
#in config/server.properties, set host.name
host.name={your_server_ip} 
#Start kafka service
nohup sh bin/kafka-server-start.sh config/server.properties > ./server.log 2>&1 & 
#Create test topic
sh bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic test1 --partitions 1 --replication-factor 1
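Optionally, the new topic can be smoke-tested from the shell before wiring up nginx (run from the Kafka directory; the broker address is the host.name set above):

echo "hello kafka" | sh bin/kafka-console-producer.sh --broker-list {your_server_ip}:9092 --topic test1
sh bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test1 --from-beginning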

5: Configure and run

Edit /opt/openresty/nginx/conf/nginx.conf so that the nginx logs are recorded to Kafka. The source is as follows:
worker_processes 12;

events { 
    use epoll; 
    worker_connections 65535; 
}

http { 
    include mime.types; 
    default_type application/octet-stream; 
    sendfile on; 
    keepalive_timeout 0; 
    gzip on; 
    gzip_min_length 1k; 
    gzip_buffers 4 8k; 
    gzip_http_version 1.1; 
    gzip_types text/plain application/x-javascript text/css application/xml application/X-JSON; 
    charset UTF-8; 
    #Configure the backend proxy upstream
    upstream rc { 
        server 10.10.*.15:8080 weight=5 max_fails=3; 
        server 10.10.*.16:8080 weight=5 max_fails=3; 
        server 10.16.*.54:8080 weight=5 max_fails=3; 
        server 10.16.*.55:8080 weight=5 max_fails=3; 
        server 10.10.*.113:8080 weight=5 max_fails=3; 
        server 10.10.*.137:8080 weight=6 max_fails=3; 
        server 10.10.*.138:8080 weight=6 max_fails=3; 
        server 10.10.*.33:8080 weight=4 max_fails=3; 
        #Maximum number of idle keepalive connections
        keepalive 32; 
    }

    #Configure the lua dependency library path
    lua_package_path "/opt/openresty/lualib/kafka/?.lua;;";

server {  
    listen       80;  
    server_name  localhost;  
    location /favicon.ico {  
        root   html;  
            index  index.html index.htm;  
    }  
    location / {  
        proxy_connect_timeout 8;  
        proxy_send_timeout 8;  
        proxy_read_timeout 8;  
        proxy_buffer_size 4k;  
        proxy_buffers 512 8k;  
        proxy_busy_buffers_size 8k;  
        proxy_temp_file_write_size 64k;  
        proxy_next_upstream http_500 http_502  http_503 http_504  error timeout invalid_header;  
        root   html;  
        index  index.html index.htm;  
        proxy_pass http://rc;  
        proxy_http_version 1.1;  
        proxy_set_header Connection "";  
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;  
        # Use log_by_lua to embed the Lua code: the log_by_lua directive runs in the log phase at the end of the request and does not affect proxy_pass  
        log_by_lua '  
            -- load the required Lua modules  
            local cjson = require "cjson"  
            local producer = require "resty.kafka.producer"  
            -- define the kafka broker list; the IP must match kafka's host.name setting  
            local broker_list = {  
                { host = "10.10.78.52", port = 9092 },  
            }  
            -- build a json table to organize and collect the log fields  
            local log_json = {}  
            log_json["uri"]=ngx.var.uri  
            log_json["args"]=ngx.var.args  
            log_json["host"]=ngx.var.host  
            log_json["request_body"]=ngx.var.request_body  
            log_json["remote_addr"] = ngx.var.remote_addr  
            log_json["remote_user"] = ngx.var.remote_user  
            log_json["time_local"] = ngx.var.time_local  
            log_json["status"] = ngx.var.status  
            log_json["body_bytes_sent"] = ngx.var.body_bytes_sent  
            log_json["http_referer"] = ngx.var.http_referer  
            log_json["http_user_agent"] = ngx.var.http_user_agent  
            log_json["http_x_forwarded_for"] = ngx.var.http_x_forwarded_for  
            log_json["upstream_response_time"] = ngx.var.upstream_response_time  
            log_json["request_time"] = ngx.var.request_time  
            -- encode the json table as a string  
            local message = cjson.encode(log_json);  
            -- create an asynchronous kafka producer  
            local bp = producer:new(broker_list, { producer_type = "async" })  
            -- send the log message; the second argument of send() is the key, used for kafka partition routing:  
            -- when key is nil, data is written to the same partition for a period of time  
            -- when a key is given, the partition is chosen by the hash of the key  
            local ok, err = bp:send("test1", nil, message)  

            if not ok then  
                ngx.log(ngx.ERR, "kafka send err:", err)  
                return  
            end  
        ';  
    }  
    error_page   500 502 503 504  /50x.html;  
    location = /50x.html {  
        root   html;  
    }  
}  

 

}
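Note that with producer_type = "async" the messages are buffered in memory and flushed to Kafka in batches in the background, so the request itself does not wait on Kafka. If the defaults need tuning, lua-resty-kafka accepts further options in the second argument of producer:new; a sketch with illustrative values (not part of the original configuration):

local bp = producer:new(broker_list, {
    producer_type = "async",  -- buffer and send in the background
    flush_time = 1000,        -- flush interval in milliseconds
    batch_num = 200,          -- messages per batch
    max_buffering = 50000,    -- maximum number of buffered messages
    required_acks = 1,        -- wait for the partition leader to acknowledge
})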

6: Detection & operation

#Check the configuration. This only verifies the nginx configuration; Lua runtime errors go to nginx's error.log file (see the tail command below)
./nginx -t -c /opt/openresty/nginx/conf/nginx.conf 
#Start
./nginx -c /opt/openresty/nginx/conf/nginx.conf 
#Restart
./nginx -s reload
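Lua runtime errors (for example the Kafka send failures logged with ngx.log above) end up in nginx's error log; with the default error_log setting under this prefix that is:

tail -f /opt/openresty/nginx/logs/error.log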

7: Testing

1: Send any http request to the current nginx, such as:

http://10.10.78.52/m/personal/AC8E3BC7-6130-447B-A9D6-DF11CB74C3EF/rc/v1?passport=83FBC7337D681E679FFBA1B913E22A0D@qq.sohu.com&page=2&size=10

2: Check whether the upstream proxy is working properly.
3: Check whether the corresponding Kafka topic has received the log messages, as follows:

#Consume the topic data from the beginning
sh kafka-console-consumer.sh --zookeeper 10.10.78.52:2181 --topic test1 --from-beginning

Effect monitoring:

4:ab pressure test

#Single nginx+upstream test:
ab -n 10000 -c 100 -k "http://10.10.34.15/m/personal/AC8E3BC7-6130-447B-A9D6-DF11CB74C3EF/rc/v1?passport=83FBC7337D681E679FFBA1B913E22A0D@qq.sohu.com&page=2&size=10"

#Results
Server Software: nginx 
Server Hostname: 10.10.34.15 
Server Port: 80 
Document Path: /m/personal/AC8E3BC7-6130-447B-A9D6-DF11CB74C3EF/rc/v1?passport=83FBC7337D681E679FFBA1B913E22A0D@qq.sohu.com 
Document Length: 13810 bytes 
Concurrency Level: 100 
Time taken for tests: 2.148996 seconds 
Complete requests: 10000 
Failed requests: 9982 
(Connect: 0, Length: 9982, Exceptions: 0) 
Write errors: 0 
Keep-Alive requests: 0 
Total transferred: 227090611 bytes 
HTML transferred: 225500642 bytes 
Requests per second: 4653.34 [#/sec] (mean) 
Time per request: 21.490 [ms] (mean) 
Time per request: 0.215 [ms] (mean, across all concurrent requests) 
Transfer rate: 103196.10 [Kbytes/sec] received 
Connection Times (ms)
              min  mean[+/-sd] median   max
Connect:        0    0   0.1      0       2
Processing:     5   20  23.6     16     701
Waiting:        4   17  20.8     13     686
Total:          5   20  23.6     16     701
Percentage of the requests served within a certain time (ms) 
50% 16 
66% 20 
75% 22 
80% 25 
90% 33 
95% 41 
98% 48 
99% 69 
100% 701 (longest request)


#Single nginx+upstream+log_lua_kafka access test:
ab -n 10000 -c 100 -k "http://10.10.78.52/m/personal/AC8E3BC7-6130-447B-A9D6-DF11CB74C3EF/rc/v1?passport=83FBC7337D681E679FFBA1B913E22A0D@qq.sohu.com&page=2&size=10"

#Results
Server Software: openresty/1.9.7.4 
Server Hostname: 10.10.78.52 
Server Port: 80 
Document Path: /m/personal/AC8E3BC7-6130-447B-A9D6-DF11CB74C3EF/rc/v1?passport=83FBC7337D681E679FFBA1B913E22A0D@qq.sohu.com 
Document Length: 34396 bytes 
Concurrency Level: 100 
Time taken for tests: 2.234785 seconds 
Complete requests: 10000 
Failed requests: 9981 
(Connect: 0, Length: 9981, Exceptions: 0) 
Write errors: 0 
Keep-Alive requests: 0 
Total transferred: 229781343 bytes 
HTML transferred: 228071374 bytes 
Requests per second: 4474.70 [#/sec] (mean) 
Time per request: 22.348 [ms] (mean) 
Time per request: 0.223 [ms] (mean, across all concurrent requests) 
Transfer rate: 100410.10 [Kbytes/sec] received 
Connection Times (ms)
              min  mean[+/-sd] median   max
Connect:        0    0   0.2      0       3
Processing:     6   20  27.6     17    1504
Waiting:        5   15  12.0     14     237
Total:          6   20  27.6     17    1504
Percentage of the requests served within a certain time (ms) 
50% 17 
66% 19 
75% 21 
80% 23 
90% 28 
95% 34 
98% 46 
99% 67 
100% 1004 (longest request)

 

********************* The most important module **************************

The nginx configuration file is configured as follows:

#user  nobody;
worker_processes  1;

#error_log  logs/error.log;
#error_log  logs/error.log  notice;
#error_log  logs/error.log  info;

#pid        logs/nginx.pid;


events {
    worker_connections  1024;
}


http {
    include       mime.types;
    default_type  application/octet-stream;

    #log_format  main  '$remote_addr - $remote_user [$time_local] "$request" '
    #                  '$status $body_bytes_sent "$http_referer" '
    #                  '"$http_user_agent" "$http_x_forwarded_for"';

    #access_log  logs/access.log  main;

    sendfile        on;
    #tcp_nopush     on;

    #keepalive_timeout  0;
    keepalive_timeout  65;

    #gzip  on;

    upstream myServer {
        server 192.168.0.109:8080 weight=1;
    }

    lua_package_path "/opt/openresty/lualib/kafka/?.lua;;";
    lua_need_request_body on;

    server {
        listen       80;
        server_name  localhost;

        #charset koi8-r;

        #access_log  logs/host.access.log  main;

        location /test1 {
        # Proxy the request to the custom upstream server list
            proxy_pass http://myServer;
        }

    location /test2 {

        # Use log_by_lua to embed the Lua code: the log_by_lua directive runs in the log phase at the end of the request and does not affect proxy_pass  
        log_by_lua '  
            -- load the required Lua modules
            local topic = "test"
            local cjson = require "cjson"  
            local producer = require "resty.kafka.producer"  
            -- define the kafka broker list; the IPs must match kafka's host.name setting  
            local broker_list = {  
                { host = "192.168.0.109", port = 9092 },
        { host = "192.168.0.110", port = 9092 },
        { host = "192.168.0.101", port = 9092 }
            }  
            -- build a json table to organize and collect the log fields  
            local log_json = {}  
            log_json["uri"]=ngx.var.uri  
            log_json["args"]=ngx.req.get_uri_args()  
            log_json["host"]=ngx.var.host  
            log_json["request_body"]=ngx.var.request_body  
            log_json["remote_addr"] = ngx.var.remote_addr  
            log_json["remote_user"] = ngx.var.remote_user  
            log_json["time_local"] = ngx.var.time_local  
            log_json["status"] = ngx.var.status  
            log_json["body_bytes_sent"] = ngx.var.body_bytes_sent  
            log_json["http_referer"] = ngx.var.http_referer  
            log_json["http_user_agent"] = ngx.var.http_user_agent  
            log_json["http_x_forwarded_for"] = ngx.var.http_x_forwarded_for  
            log_json["upstream_response_time"] = ngx.var.upstream_response_time  
            log_json["request_time"] = ngx.var.request_time  
            -- encode the json table as a string  
            local message = cjson.encode(log_json);  
            -- create an asynchronous kafka producer  
            local bp = producer:new(broker_list, { producer_type = "async" })  
            -- send the log message; the second argument of send() is the key, used for kafka partition routing:  
            -- when key is nil, data is written to the same partition for a period of time  
            -- when a key is given, the partition is chosen by the hash of the key  
            local ok, err = bp:send(topic, nil, message)  

            if not ok then  
                ngx.log(ngx.ERR, "kafka send err:", err)  
                return  
            end  
        ';  
        }  


        #error_page  404              /404.html;

        # redirect server error pages to the static page /50x.html
        #
        error_page   500 502 503 504  /50x.html;
        location = /50x.html {
            root   html;
        }

        # proxy the PHP scripts to Apache listening on 127.0.0.1:80
        #
        #location ~ \.php$ {
        #    proxy_pass   http://127.0.0.1;
        #}

        # pass the PHP scripts to FastCGI server listening on 127.0.0.1:9000
        #
        #location ~ \.php$ {
        #    root           html;
        #    fastcgi_pass   127.0.0.1:9000;
        #    fastcgi_index  index.php;
        #    fastcgi_param  SCRIPT_FILENAME  /scripts$fastcgi_script_name;
        #    include        fastcgi_params;
        #}

        # deny access to .htaccess files, if Apache's document root
        # concurs with nginx's one
        #
        #location ~ /\.ht {
        #    deny  all;
        #}
    }


    # another virtual host using mix of IP-, name-, and port-based configuration
    #
    #server {
    #    listen       8000;
    #    listen       somename:8080;
    #    server_name  somename  alias  another.alias;

    #    location / {
    #        root   html;
    #        index  index.html index.htm;
    #    }
    #}


    # HTTPS server
    #
    #server {
    #    listen       443 ssl;
    #    server_name  localhost;

    #    ssl_certificate      cert.pem;
    #    ssl_certificate_key  cert.key;

    #    ssl_session_cache    shared:SSL:1m;
    #    ssl_session_timeout  5m;

    #    ssl_ciphers  HIGH:!aNULL:!MD5;
    #    ssl_prefer_server_ciphers  on;

    #    location / {
    #        root   html;
    #        index  index.html index.htm;
    #    }
    #}

}
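A quick way to exercise this configuration (the host and the ZooKeeper address are assumed to be 192.168.0.109, matching the upstream and broker list above). Note that /test2 has no content handler, so the HTTP response may be a 404, but the log_by_lua block still runs in the log phase and the message is still produced to Kafka:

curl "http://192.168.0.109/test2?foo=bar&baz=1"
sh bin/kafka-console-consumer.sh --zookeeper 192.168.0.109:2181 --topic test --from-beginning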

 

********************* Pitfalls encountered ***********************************

Problem overview:

When the Lua script in OpenResty/nginx on server1 wrote data to Kafka on server5, it failed with a host resolution error (no resolver defined to resolve "xxxxx"), where xxxxx is the hostname of one of the machines. After a day of digging, the cause was found.

Cause of problem:

It was eventually found that OpenResty does not use the machine's hosts mapping. The Kafka client first connects to a broker by IP and then fetches the broker cluster metadata from ZooKeeper, where the broker address is registered as a hostname (kafka236:1111). The Lua client therefore has to connect to kafka236, and since that hostname cannot be resolved, the "cannot resolve host" error is reported.

Solution:
If a DNS resolution service is available (for example on the router), configure the domain name resolution there directly, and then point nginx at that DNS server (if there is none, you need to set up your own DNS service). A minimal sketch of the nginx side is given below.

    nginx.conf configuration:
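A minimal sketch, assuming the DNS server is reachable at 10.10.0.2 (placeholder address). The resolver directive goes in the http{} block; the cosocket connections made by lua-resty-kafka use nginx's resolver when a hostname appears in the broker metadata:

    resolver 10.10.0.2 valid=30s;   #placeholder DNS server address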

 

DNS configuration: on the DNS server, add a record that maps the broker hostname (e.g. kafka236) to its IP.

 

Remarks:
1. If the Kafka server is configured with an IP or a domain name, the local Kafka client on that machine cannot connect via localhost (unless the server itself is configured with localhost).

2. If the Kafka server's listeners is configured with an IP, the IP address is recorded in ZooKeeper.

If the Kafka server's listeners is configured with a domain name, the domain name is recorded in ZooKeeper.

If the Kafka server's advertised.listeners is configured with a domain name, ZooKeeper records the domain name regardless of what listeners is set to.

 

It was later found that:
With the older openresty-1.7.10.2, Kafka can be reached whether it is configured with a domain name or an IP.

With the newer openresty-1.13.6.2, Kafka configured with a domain name cannot be reached; only an IP works, and it cannot be worked around by configuring a resolver.
