记录我是如何把rsyslog做成docker镜像,获取nginx的accesslog并且转发到python的
关键点1 nginx日志配置
nginx日志要设置成json格式输出,nginx.conf如下图所示,这个可以在docker镜像中通过volume把nginx.conf挂载进去,然后把/var/log/nginx/access.log挂载到本地
user root; worker_processes 1; error_log /var/log/nginx/error.log warn; pid /var/run/nginx.pid; events { worker_connections 1024; } http { include /etc/nginx/mime.types; default_type application/octet-stream; #log_format main '$remote_addr - $remote_user [$time_local] "$request" ' # '$status $body_bytes_sent "$http_referer" ' # '"$http_user_agent" "$http_x_forwarded_for"'; log_format main '{"time_local": "$time_local", ' '"path": "$request_uri", ' '"ip": "$remote_addr", ' '"time": "$time_iso8601", ' '"user_agent": "$http_user_agent", ' '"user_id_got": "$uid_got", ' '"user_id_set": "$uid_set", ' '"remote_user": "$remote_user", ' '"request": "$request", ' '"status": "$status", ' '"body_bytes_sent": "$body_bytes_sent", ' '"request_time": "$request_time", ' '"http_referrer": "$http_referer" }'; access_log /var/log/nginx/access.log main; sendfile on; #tcp_nopush on; keepalive_timeout 65; #gzip on; include /etc/nginx/conf.d/*.conf; }
关键点2 rsyslog配置与Dockerfile
编写一个51-nginx-forward.conf文件放置在/etc/rsyslog.d/下即可
module(load="imfile") input(type="imfile" File="/var/log/nginx/access.log" Tag="mywebsite:") # omfwd module for forwarding the logs to another tcp server if( $syslogtag == 'mywebsite:') then { action(type="omfwd" target="python服务器IP地址" port="6000" protocol="tcp" action.resumeRetryCount="100" queue.type="linkedList" queue.size="10000") }
我们可以用一个Dockerfile来运行rsyslog,docker run的时候注意日志的挂载
FROM ubuntu:16.04 RUN apt-get update && apt-get install -y rsyslog; \ rm -rf /var/lib/apt/lists/* ADD 51-nginx-forward.conf /etc/rsyslog.d/. # RUN cat /dev/null> /var/log/mail.log CMD service rsyslog start && tail -f /var/log/syslog
关键点3 python程序通过tcp的方式读取rsyslog
python程序与rsyslog建立tcp连接,可以实时的进行数据库的插入语句
import asyncio import json import time import database_init class LogAnalyser: def __init__(self): pass def process(self, str_input): # print(str_input) str_input = str_input.decode("utf-8", errors="ignore") # Add your processing steps here # ... try: # Extract created_at from the log string str_splits = str_input.split("{", 1) json_text = "{" + str_splits[1] data = json.loads(json_text) created_at = data["time"] request_all = data["request"].split(" /", 1) http_type = request_all[0] path = data["path"] request_time = data["request_time"] if PREFIX in data["path"]: path = data["path"] return http_type, path, created_at,request_time # The order is relevant for INSERT query params except Exception as e: print("error in read_rsylog.py,Class LogAnalyser,function process") print(e) return None @asyncio.coroutine def handle_echo(reader, writer): log_filter = LogAnalyser() while True: line = yield from reader.readline() if not line: break params = log_filter.process(line) if params: # 进行一堆操作,例如进行数据库的插入 # execute_sql(params=params) if __name__ == '__main__': CURSOR = database_init.DBConnect().CURSOR CONN = database_init.DBConnect().CONN PREFIX = database_init.DBConnect().CONFIG["TEST_SWAGGER"]["PREFIX"] database_init.DBConnect().create_table() loop = asyncio.get_event_loop() coro = asyncio.start_server(handle_echo, None, 6000, loop=loop) server = loop.run_until_complete(coro) # Serve requests until Ctrl+C is pressed print('Serving on {}'.format(server.sockets[0].getsockname())) try: loop.run_forever() except KeyboardInterrupt: pass # Close the server print("Closing the server.") server.close() loop.run_until_complete(server.wait_closed()) loop.close() CURSOR.close() CONN.close()