背景:
Symantec 的设备管理来管理员工设备的访问,如禁用摄像头、蓝牙设备、存储设备等。因为经常有人申请开通设备权限,所以要找到被禁用设备的devID ,说白了就是USB的地址及路径等信息。但是symantec 本身的设备访问报表过滤和查看起来太麻烦了,因此有想法用ELK分析下,因为最近也在研究如何使用JDBC 来从数据库索引数据到kibana,所以正好研究了下。
目前遇到的坑:
- JDBC的数据源一定要有一个KEY字段,导入elasticsearch时,以这个KEY字段作为ID,这样才不会重复导入数据,否则没有KEY字段,数据会反复导入。
- JDBC的日期字段导入到elasticsearch时会有转换错误(object java.string 之类的)好像和Logstash-input-jdbc的版本有关系,可惜自己手动升级jdbc插件总是报另外一个错,也没有升级成功,所以后面又把数据库里的日期字段转成字符串的形式才成功。
ELK on windows
- 下载安装JRE 或者JDK
- 设置JAVA_HOME 指向JRE或者JDK安装目录。
- 安装elasticsearch 服务,进入D:\ELK\elasticsearch-2.1.0\bin
- 输入service install Servicename
安装logstash 服务,需要下载NSSM ,进入NSSM的安装目录:执行nssm install logstash,配置如下。
LOGstash 如果和elasticsearch 一起使用,可以在logstash的dependencies 里面配置依赖服务elasticsearch
我们看看run.bat 的内容
logstash.bat agent -f logstash.conf
安装kibana的windows 服务。进入NSSM的安装目录:执行nssm install kibana,配置如下。
elasticsearch的初始配置,一般不需要配置,但是为了提升安全性,我把监听地址改为只有本机能访问,修改elasticsearch.yml如下图。
kibana的初始配置,同样,Kibana也修改成了只监听本地端口,修改config\kibana.yml 如下图:
Logstash的目录结构:
下面主要看logstash的配置:
input {
jdbc {
jdbc_driver_library => "sqljdbc42.jar"
jdbc_driver_class => "com.microsoft.sqlserver.jdbc.SQLServerDriver"
jdbc_connection_string => "jdbc:sqlserver://sep-bg:1433;databaseName=sem5"
jdbc_user => "logstash"
jdbc_password => "yourpassword"
schedule => "* * * * *"
statement_filepath => "semSqlQuery.sql"
type => "SEPDeviceLog"
}
}
filter {
if [type]=='SEPDeviceLog' {
grok {
patterns_dir => "../patterns"
match => { "event_desc" => "%{KEY:key1}:%{VALUE:deviceName} %{KEY:key2}:%{VALUE:devCategory} %{KEY:key3}:%{VALUE:devGUID} %{KEY:key4}:%{VALUE:devID}" }
remove_field => ["key1","key2","key3","key4"]
}
date {
match => ["datetime",
"yyyy-MM-dd HH:mm:ss.SSS",
"yyyy-MM-dd HH:mm:ss,SSS",
"yyyy-MM-dd HH:mm:ss",
"yyyy/MM/dd HH:mm:ss",
"MMM d HH:mm:ss",
"MMM dd HH:mm:ss",
"dd/MMM/yyyy:HH:mm:ss Z",
"yyyy-MM-dd HH:mm:ss.SSSZ",
"yyyy-MM-dd'T'HH:mm:ss.SSSZ",
"yyyy-MM-dd'T'HH:mm:ssZ",
"E MMM dd HH:mm:ss yyyy Z"
]
remove_field => ["datetime"]
}
}
}
output {
# stdout {
# codec => rubydebug
# }
elasticsearch {
hosts => ["localhost:9200"]
document_id => "%{uid}"
}
}
SemSQLQuery.sql 文件内容(注意我的SQL里面把agent_security_log_idx 作为了index 里面每个文档的ID,参考上面logstash的output 到elasticsearch时的document_id 设置,这样才不会重复往elasticsearch中导入数据,而且SQL中如果有更新(插入、Update等)Elasticsearch中也会对应更改(但是Delete操作不会同步)。
此处2016-02-04年有所更新,之前的bigint 表示的milliseconds 转换成datetime 后,发现时间不对,查了下资料,数据库中记录的是从1970-01-01年之后的milliseconds 秒数,
所以正确的semsqlquery.sql 内容应该如下(转换后的时间是UTC的格式,所以使用最新版本的logstash 可以自动转到对应的时区):
select
agent_security_log_idx as uid,
EVENT_TIME,
dateadd(MILLISECOND, EVENT_TIME % 1000 ,DATEADD(SECOND, EVENT_TIME / 1000, '19700101')) as datetime,
EVENT_ID,
EVENT_DESC,
SEVERITY,
HOST_NAME,
LOCAL_HOST_MAC,
user_name,
domain_name,
local_host_Ip_text
from V_AGENT_SECURITY_LOG
修正sql查询后,目前日志里面的时间显示正常,最新的日志时间和我计算机的时间基本一致。
Grok fitler的配置文件放在了Symantec_dev_log 文件里面,就定义了个KEY,VALUE的匹配规则。
说说JDBC的设置:
jdbc {
# 使用微软网站download 的SQL Server JDBC的压缩包。我放在了bin目录下。
jdbc_driver_library => "sqljdbc42.jar"
# SQL Server JDBC的Driver Class,可以从sqljdbc42.jar 文件中的services下的java.sql.Driver文件里面找到。
jdbc_driver_class => "com.microsoft.sqlserver.jdbc.SQLServerDriver"
# 下面这个可以参考MSDN,基本都一样。
jdbc_connection_string => "jdbc:sqlserver://sep-bg:1433;databaseName=sem5"
# 用户名,我的SQL配置了一个专用的SQL账号(非集成认证),仅给了限定的读取权限。
jdbc_user => "logstash"
jdbc_password => "yourpassword"
schedule => "* * * * *"
# SQL 查询的文件路径。
statement_filepath => "semSqlQuery.sql"
# 加了一个Type,好写过滤规则。
type => "SEPDeviceLog"
然后可以开启logstash直接导入数据到elasticsearch,测试时可以开启output 插件(我的配置文件里面已经注释掉了 )/
最后就是kibana的最终使用阶段了。
访问http://127.0.0.1:5601/
选择Severity 为7的事件(应该是被禁用的设备事件)。
其实可以点击上面Pie图里面的任意块来进行过滤和筛选。比如按机器名、按照设备Name,设备Class。