背景:

Symantec 的设备管理来管理员工设备的访问,如禁用摄像头、蓝牙设备、存储设备等。因为经常有人申请开通设备权限,所以要找到被禁用设备的devID ,说白了就是USB的地址及路径等信息。但是symantec 本身的设备访问报表过滤和查看起来太麻烦了,因此有想法用ELK分析下,因为最近也在研究如何使用JDBC 来从数据库索引数据到kibana,所以正好研究了下。

目前遇到的坑:

  1. JDBC的数据源一定要有一个KEY字段,导入elasticsearch时,以这个KEY字段作为ID,这样才不会重复导入数据,否则没有KEY字段,数据会反复导入。
  2. JDBC的日期字段导入到elasticsearch时会有转换错误(object java.string 之类的)好像和Logstash-input-jdbc的版本有关系,可惜自己手动升级jdbc插件总是报另外一个错,也没有升级成功,所以后面又把数据库里的日期字段转成字符串的形式才成功。

ELK on windows

  1. 下载安装JRE 或者JDK
  2. 设置JAVA_HOME 指向JRE或者JDK安装目录。
  3. 安装elasticsearch 服务,进入D:\ELK\elasticsearch-2.1.0\bin
  4. 输入service install Servicename

clip_image001

安装logstash 服务,需要下载NSSM ,进入NSSM的安装目录:执行nssm install logstash,配置如下。

clip_image002

LOGstash 如果和elasticsearch 一起使用,可以在logstash的dependencies 里面配置依赖服务elasticsearch

我们看看run.bat 的内容

logstash.bat agent -f logstash.conf

安装kibana的windows 服务。进入NSSM的安装目录:执行nssm install kibana,配置如下。

clip_image003

clip_image004

elasticsearch的初始配置,一般不需要配置,但是为了提升安全性,我把监听地址改为只有本机能访问,修改elasticsearch.yml如下图。

clip_image005

kibana的初始配置,同样,Kibana也修改成了只监听本地端口,修改config\kibana.yml 如下图:

clip_image006

Logstash的目录结构:

clip_image007

下面主要看logstash的配置:

input {

jdbc {

jdbc_driver_library => "sqljdbc42.jar"

jdbc_driver_class => "com.microsoft.sqlserver.jdbc.SQLServerDriver"

jdbc_connection_string => "jdbc:sqlserver://sep-bg:1433;databaseName=sem5"

jdbc_user => "logstash"

jdbc_password => "yourpassword"

schedule => "* * * * *"

statement_filepath => "semSqlQuery.sql"

type => "SEPDeviceLog"

}

}

filter {

if [type]=='SEPDeviceLog' {

grok {

patterns_dir => "../patterns"

match => { "event_desc" => "%{KEY:key1}:%{VALUE:deviceName} %{KEY:key2}:%{VALUE:devCategory} %{KEY:key3}:%{VALUE:devGUID} %{KEY:key4}:%{VALUE:devID}" }

remove_field => ["key1","key2","key3","key4"]

}

date {

match => ["datetime",

"yyyy-MM-dd HH:mm:ss.SSS",

"yyyy-MM-dd HH:mm:ss,SSS",

"yyyy-MM-dd HH:mm:ss",

"yyyy/MM/dd HH:mm:ss",

"MMM d HH:mm:ss",

"MMM dd HH:mm:ss",

"dd/MMM/yyyy:HH:mm:ss Z",

"yyyy-MM-dd HH:mm:ss.SSSZ",

"yyyy-MM-dd'T'HH:mm:ss.SSSZ",

"yyyy-MM-dd'T'HH:mm:ssZ",

"E MMM dd HH:mm:ss yyyy Z"

]

remove_field => ["datetime"]

}

}

}

output {

# stdout {

# codec => rubydebug

# }

elasticsearch {

hosts => ["localhost:9200"]

document_id => "%{uid}"

}

}

SemSQLQuery.sql 文件内容(注意我的SQL里面把agent_security_log_idx 作为了index 里面每个文档的ID,参考上面logstash的output 到elasticsearch时的document_id 设置,这样才不会重复往elasticsearch中导入数据,而且SQL中如果有更新(插入、Update等)Elasticsearch中也会对应更改(但是Delete操作不会同步)。

此处2016-02-04年有所更新,之前的bigint 表示的milliseconds 转换成datetime 后,发现时间不对,查了下资料,数据库中记录的是从1970-01-01年之后的milliseconds 秒数,

所以正确的semsqlquery.sql 内容应该如下(转换后的时间是UTC的格式,所以使用最新版本的logstash 可以自动转到对应的时区):

select 
agent_security_log_idx as uid, 
EVENT_TIME, 
dateadd(MILLISECOND,  EVENT_TIME % 1000 ,DATEADD(SECOND, EVENT_TIME / 1000, '19700101')) as datetime, 
EVENT_ID, 
EVENT_DESC, 
SEVERITY, 
HOST_NAME, 
LOCAL_HOST_MAC, 
user_name, 
domain_name, 
local_host_Ip_text 
from  V_AGENT_SECURITY_LOG

修正sql查询后,目前日志里面的时间显示正常,最新的日志时间和我计算机的时间基本一致。

image

Grok fitler的配置文件放在了Symantec_dev_log 文件里面,就定义了个KEY,VALUE的匹配规则。

clip_image009

说说JDBC的设置:

jdbc {

# 使用微软网站download 的SQL Server JDBC的压缩包。我放在了bin目录下。

jdbc_driver_library => "sqljdbc42.jar"

# SQL Server JDBC的Driver Class,可以从sqljdbc42.jar 文件中的services下的java.sql.Driver文件里面找到。

clip_image010

jdbc_driver_class => "com.microsoft.sqlserver.jdbc.SQLServerDriver"

# 下面这个可以参考MSDN,基本都一样。

jdbc_connection_string => "jdbc:sqlserver://sep-bg:1433;databaseName=sem5"

# 用户名,我的SQL配置了一个专用的SQL账号(非集成认证),仅给了限定的读取权限。

jdbc_user => "logstash"

jdbc_password => "yourpassword"

schedule => "* * * * *"

# SQL 查询的文件路径。

statement_filepath => "semSqlQuery.sql"

 

# 加了一个Type,好写过滤规则。

type => "SEPDeviceLog"

然后可以开启logstash直接导入数据到elasticsearch,测试时可以开启output 插件(我的配置文件里面已经注释掉了 )/

最后就是kibana的最终使用阶段了。

访问http://127.0.0.1:5601/

clip_image011

选择Severity 为7的事件(应该是被禁用的设备事件)。

clip_image012

clip_image013

其实可以点击上面Pie图里面的任意块来进行过滤和筛选。比如按机器名、按照设备Name,设备Class。