使用函数计算(FC)通过Syslog协议投递日志

简介: 本文主要介绍一种使用函数计算(FC)通过Syslog协议投递日志的方法。

本文主要介绍一种使用函数计算(FC)通过Syslog协议投递日志的方法。

Syslog是一个常见的日志通道,几乎所有的SIEM(例如IBM Qradar, HP Arcsight)都支持通过Syslog渠道接收日志。本文主要介绍如何使用函数计算通过Syslog协议投递日志服务(SLS)中的日志。

1. 背景信息

  • Syslog主要是基于RFC5424和RFC3164定义相关格式规范,RFC3164协议是2001年发布的,RFC5424协议是2009年发布的升级版本。因为新版兼容旧版,且新版本解决了很多问题,因此推荐使用RFC5424协议。更多信息,请参见RFC5424RFC3164

  • Syslog over TCP/TLS:Syslog只规定日志格式,理论上TCP和UDP都支持Syslog,可以较好的保证数据传输稳定性。RFC5425协议也定义了TLS的安全传输层,如果您的SIEM支持TCP通道或者TLS通道,则建议优先使用。更多信息,请参见RFC5425

  • 日志服务:日志服务SLS是云原生观测与分析平台,为Log、Metric、Trace等数据提供大规模、低成本、实时的平台化服务。日志服务一站式提供数据采集、加工、查询与分析、可视化、告警、消费与投递等功能。

  • 函数计算:阿里云函数计算是事件驱动的全托管计算服务。通过函数计算,您无需管理服务器等基础设施,只需编写代码并上传。函数计算会为您准备好计算资源,以弹性、可靠的方式运行您的代码,并提供日志查询、性能监控、报警等功能。

  • 函数计算SLS触发器:日志服务ETL Job对应于函数计算的一个触发器,当创建日志服务ETL Job后,日志服务会根据该ETL Job的配置启动定时器,定时器轮询Logstore中的Shard信息,当发现有新的数据写入时,即生成三元组信息作为函数Event,并触发函数执行。

2. 投递流程

推荐使用日志服务消费组构建程序来进行实时消费,消费程序可以托管在函数计算,然后日志服务触发器通过Syslog over TCP/TLS来发送日志给SIEM,流程如下。

image.png

3. 投递示例

本文以SLS的Logstore为数据源,通过阿里云函数计算,并使用Syslog协议投递到SIEM平台。

image.png

4. 投递过程

本文主要参考函数计算SLS触发器官方文档和日志服务通过Syslog投递日志到SIEM文档的步骤。

前提条件

步骤一:在函数计算中创建函数

进入函数计算控制台

在服务与函数中,创建服务

image.png

创建函数

image.png

函数配置

  • 选择使用内置运行时创建

  • 请求处理程序类型选择:处理事件请求

  • 运行环境选择: Python 3.9

  • 代码上传方式选择:使用实例代码,选择日志服务SLS触发函数。

image.png

环境变量配置

  • SYSLOG_HOST:Syslog服务端的Host(数据接收端)

  • SYSLOG_PORT:Syslog服务端的端口,默认为514(数据接收端)

  • SYSLOG_PROTOCOL:Syslog的协议,默认为tcp.

image.png

触发器配置

  • 选择触发器类型:日志服务 SLS

  • 日志项目、日志库、触发器间隔、重试次数、触发器日志根据实际需要填写,其中触发日志会记录触发器的触发日志。

image.png

  • 调用参数,保持默认。

  • 角色名称选择默认的AliyunLogETLRole

image.png

点击创建。

image.png

配置函数代码

打开函数代码标签,在index.py中复制如下代码:

image.png

待复制代码如下,可以根据实际情况修改对应逻辑。

"""
本代码样例主要实现以下功能:
* 从 event 中解析出 SLS 事件触发相关信息
* 根据以上获取的信息,初始化 SLS 客户端
* 从源 log store 获取实时日志数据
* 将日志数据通过syslog协议输出


This sample code is mainly doing the following things:
* Get SLS processing related information from event
* Initiate SLS client
* Pull logs from source log store
* Send logs with syslog

"""
# !/usr/bin/env python
# -*- coding: utf-8 -*-

import json
from aliyun.log import LogClient
import os
import logging
import six
from datetime import datetime

from aliyun.log import PullLogResponse
from aliyun.log.ext import syslogclient
from pysyslogclient import SyslogClientRFC5424 as SyslogClient

logger = logging.getLogger()


def get_syslog_config():
    config = {
        'host': os.environ.get('SYSLOG_HOST', ''),
        'port': int(os.environ.get('SYSLOG_PORT', '514')),
        'protocol': os.environ.get('SYSLOG_PROTOCOL', 'tcp'),
        "facility": syslogclient.FAC_USER,  # 可选,可以参考其他syslogclient.FAC_*的值。
        "severity": syslogclient.SEV_INFO,  # 可选,可以参考其他syslogclient.SEV_*的值。
        "hostname": "aliyun.example.com",  # 可选,机器名,默认选择本机机器名。
        "tag": "tag"  # 可选,标签,默认是短划线(-)。
    }
    return config


def init_syslog_client(config):
    client = SyslogClient(config.get('host'), config.get('port'), config.get('protocol'))
    return client


def process(shard_id, log_groups, config):
    logs = PullLogResponse.loggroups_to_flattern_list(log_groups, time_as_str=True, decode_bytes=True)
    logger.info("Get data from shard {0}, log count: {1}".format(shard_id, len(logs)))
    try:
        client = init_syslog_client(config)
        for log in logs:
            # suppose we only care about audit log
            timestamp = datetime.fromtimestamp(int(log[u'__time__']))
            del log['__time__']

            io = six.StringIO()
            # 可以根据需要修改格式化内容,这里使用Key=Value传输,并使用默认的双竖线(||)进行分割。
            for k, v in six.iteritems(log):
                io.write("{0}{1}={2}".format('||', k, v))

            data = io.getvalue()

            # 可以根据需要修改facility或者severity。
            client.log(data,
                       facility=config.get("facility", None),
                       severity=config.get("severity", None),
                       timestamp=timestamp,
                       program=config.get("tag", None),
                       hostname=config.get("hostname", None))

    except Exception as err:
        logger.debug("Failed to connect to remote syslog server ({0}). Exception: {1}".format(config, err))
        # 需要添加一些错误处理的代码,例如重试或者通知等。
        raise err

    logger.info("Complete send data to remote")


def handler(event, context):
    # 可以通过 context.credentials 获取密钥信息
    # Access keys can be fetched through context.credentials
    print("The content in context entity is: \n")
    print(context)
    creds = context.credentials
    access_key_id = creds.access_key_id
    access_key_secret = creds.access_key_secret
    security_token = creds.security_token

    # 解析 event 参数至 object 格式
    # parse event in object
    event_obj = json.loads(event.decode())
    print("The content in event entity is: \n")
    print(event_obj)

    # 从 event.source 中获取日志项目名称、日志仓库名称、日志服务访问 endpoint、日志起始游标、日志终点游标以及分区 id
    # Get the name of log project, the name of log store, the endpoint of sls, begin cursor, end cursor and shardId from event.source
    source = event_obj['source']
    log_project = source['projectName']
    log_store = source['logstoreName']
    endpoint = source['endpoint']
    begin_cursor = source['beginCursor']
    end_cursor = source['endCursor']
    shard_id = source['shardId']

    # 初始化 sls 客户端
    # Initialize client of sls
    client = LogClient(endpoint=endpoint,
                       accessKeyId=access_key_id,
                       accessKey=access_key_secret,
                       securityToken=security_token)

    syslog_config = get_syslog_config()

    # 基于日志的游标从源日志库中读取日志,本示例中的游标范围包含了触发本次执行的所有日志内容
    # Read data from source logstore within cursor: [begin_cursor, end_cursor) in the example, which contains all the logs trigger the invocation
    while True:
        response = client.pull_logs(project_name=log_project,
                                    logstore_name=log_store,
                                    shard_id=shard_id,
                                    cursor=begin_cursor,
                                    count=100,
                                    end_cursor=end_cursor,
                                    compress=False)
        log_group_cnt = response.get_loggroup_count()
        if log_group_cnt == 0:
            break
        logger.info("get %d log group from %s" % (log_group_cnt, log_store))
        process(shard_id, response.get_loggroup_list(), syslog_config)

        begin_cursor = response.get_next_cursor()

    return 'success'

点击编辑层,添加自定义层。

image.png

创建自定义层

image.png

  • 层上传方式选择 在线构建依赖层

  • 构建环境:Python 3.9

  • requestments.txt中填入pysyslogclient依赖,点击创建

image.png

返回函数代码标签,选择自定义层:

image.png

测试并部署

选择测试函数,返回结果成功。

最后选择部署代码,完成发布。

image.png

后续可以通过syslog服务器查看日志接收情况。

同时也可以通过调用日志标签查看函数触发、调用情况。

5. 相关链接

作者介绍
目录