Akka Cluster Principles
Akka Cluster provides a decentralized, peer-to-peer cluster membership service with no single point of failure (SPOF), implemented primarily on top of the Gossip protocol. For cluster membership state, Akka provides a failure-detection mechanism that automatically discovers member nodes that have failed and left the cluster, and propagates that state to all other member nodes in an event-driven way.
Internally, Akka defines a finite set of states for cluster members (six states in total) together with a state-transition matrix, shown in the following code:
private[cluster] val allowedTransitions: Map[MemberStatus, Set[MemberStatus]] =
  Map(
    Joining -> Set(Up, Down, Removed),
    Up -> Set(Leaving, Down, Removed),
    Leaving -> Set(Exiting, Down, Removed),
    Down -> Set(Removed),
    Exiting -> Set(Removed, Down),
    Removed -> Set.empty[MemberStatus])
Every member node of an Akka cluster is in one of the states above, and certain events trigger state transitions. Note that from any state other than Down and Removed, a node may become Down, meaning the node has failed and can no longer provide service. Before a node becomes Down there is a virtual Unreachable state: during Gossip convergence, nodes in the Unreachable state can be neither reached nor routed through, and this state is detected by Akka's failure detector. A node in the Down state that wants to rejoin the cluster must be restarted and enter the Joining state before it can go through the subsequent transitions again. The member states and their transitions are shown in the figure below:
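To make the transition matrix concrete, here is a minimal runnable sketch that mirrors the map above (TransitionCheck and isAllowed are illustrative helpers introduced here; Akka's own map is private[cluster] and not accessible from application code):

import akka.cluster.MemberStatus
import akka.cluster.MemberStatus._

object TransitionCheck extends App {
  // a copy of the matrix shown above
  val transitions: Map[MemberStatus, Set[MemberStatus]] = Map(
    Joining -> Set(Up, Down, Removed),
    Up -> Set(Leaving, Down, Removed),
    Leaving -> Set(Exiting, Down, Removed),
    Down -> Set(Removed),
    Exiting -> Set(Removed, Down),
    Removed -> Set.empty[MemberStatus])

  def isAllowed(from: MemberStatus, to: MemberStatus): Boolean =
    transitions.getOrElse(from, Set.empty).contains(to)

  println(isAllowed(Joining, Up))      // true: a joining node can be promoted to Up
  println(isAllowed(Removed, Joining)) // false: a removed node must restart and re-join
}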
A word about Akka's failure-detection mechanism. In Akka, each member node M is monitored by a group G of other nodes in the cluster (five by default). G is not the set of all other nodes, only a subset of the full cluster. The nodes in G detect whether M is Unreachable by sending heartbeats to confirm that M is reachable; if it is not, the nodes in G propagate M's Unreachable state to the nodes outside G, until eventually every member node in the cluster knows that M has failed.
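This failure detector (an implementation of the phi-accrual algorithm) is tunable through configuration. As a sketch, the relevant settings live under akka.cluster.failure-detector; the values shown here are Akka's defaults, not recommendations:

akka.cluster.failure-detector {
  heartbeat-interval = 1 s            # how often monitoring nodes send heartbeats
  threshold = 8.0                     # phi value above which a member is marked unreachable
  acceptable-heartbeat-pause = 3 s    # tolerated heartbeat pause before phi rises sharply
  monitored-by-nr-of-members = 5      # size of the monitoring group G described above
}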
A state transition triggers a corresponding event, and we can handle each event type accordingly. To be able to capture the various events in detail, let us first look at the set of events Akka defines, as shown in the figure:
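The figure itself is not reproduced here; as a sketch, the main events under akka.cluster.ClusterEvent that a subscriber can receive look like this (ClusterEventListener is an illustrative name, not part of the article's system):

import akka.actor.{Actor, ActorLogging}
import akka.cluster.ClusterEvent._

class ClusterEventListener extends Actor with ActorLogging {
  def receive = {
    case MemberUp(member)                => log.info("Up: {}", member)
    case MemberExited(member)            => log.info("Exited (graceful leave): {}", member)
    case MemberRemoved(member, previous) => log.info("Removed: {}, was {}", member, previous)
    case UnreachableMember(member)       => log.info("Marked unreachable: {}", member)
    case ReachableMember(member)         => log.info("Reachable again: {}", member)
    case LeaderChanged(leader)           => log.info("Leader is now: {}", leader)
    case state: CurrentClusterState     => log.info("Initial snapshot: {}", state.members)
  }
}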
Typically, when implementing an Actor in an application based on Akka Cluster, you override the Actor's preStart method and subscribe to cluster events through Cluster, as in the following example:
val cluster = Cluster(context.system)

override def preStart(): Unit = {
  cluster.subscribe(self, initialStateMode = InitialStateAsEvents,
    classOf[MemberUp], classOf[MemberRemoved], classOf[UnreachableMember])
}
For example, on a MemberUp event we can obtain the ActorRef of the corresponding Actor, and then exchange messages with it to cooperate on a particular task.
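For instance, a member's address can be turned into an ActorPath and looked up with actorSelection. A minimal sketch (GreetingActor and the message text are illustrative; the actor name collectingActor anticipates the collector implemented later in this article):

import akka.actor.{Actor, ActorLogging, RootActorPath}
import akka.cluster.ClusterEvent.MemberUp

class GreetingActor extends Actor with ActorLogging {
  def receive = {
    case MemberUp(member) =>
      // derive the remote Actor's path from the member's address, then message it
      val path = RootActorPath(member.address) / "user" / "collectingActor"
      context.actorSelection(path) ! "hello"
  }
}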
Akka lets each member node declare its own roles when it joins the cluster. Through role partitioning, a system built on an Akka cluster can be divided into multiple subsystems with independent processing logic, each handling its own part of the business, while all of the subsystems remain members of one unified Akka cluster. Each subsystem therefore also inherits the properties of the Akka cluster, such as failure detection, state transitions, and state propagation.
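A member's roles are visible on the Member object via hasRole, so an Actor can react only to nodes of the subsystem it cares about. A small sketch (RoleAwareActor is an illustrative name):

import akka.actor.{Actor, ActorLogging}
import akka.cluster.ClusterEvent.MemberUp

class RoleAwareActor extends Actor with ActorLogging {
  def receive = {
    case MemberUp(member) if member.hasRole("collector") =>
      log.info("A collector node is up: {}", member.address)
    case MemberUp(member) =>
      log.info("Some other node is up: {}", member.address)
  }
}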
Akka Cluster in Practice
We built a simple cluster system on Akka that simulates real-time log processing. Data can come from any source, such as files, the Kafka message broker, a database, or even a remote call. We collect the data, pass it through an interceptor layer, parse it into a specific format, and finally write it to Kafka. The overall design is shown in the figure below:
In the figure, the real-time log processing system is divided into three subsystems using Akka roles: collector, interceptor, and processor. The nodes of all three subsystems are members of one Akka cluster. The data flow through the cluster is as follows: a collector receives data (or produces it by connecting directly to a data source; here we simulate this by sending Nginx log lines) and forwards it to an interceptor; the interceptor parses the real client IP address out of each log line it receives and intercepts requests whose IP is on a blacklist, forwarding the rest to a processor; the processor processes the whole log line and finally saves the result to Kafka.
We factor the cluster-event subscription logic into an abstract class, ClusterRoledWorker, implemented as follows:
package org.shirdrn.scala.akka.cluster

import akka.actor._
import akka.cluster.ClusterEvent.{InitialStateAsEvents, MemberEvent, MemberUp, UnreachableMember}
import akka.cluster.{Cluster, Member}

abstract class ClusterRoledWorker extends Actor with ActorLogging {

  val cluster = Cluster(context.system)
  // ActorRefs of the downstream workers registered with this Actor
  var workers = IndexedSeq.empty[ActorRef]

  override def preStart(): Unit = {
    // subscribe to cluster events
    cluster.subscribe(self, initialStateMode = InitialStateAsEvents,
      classOf[MemberUp], classOf[UnreachableMember], classOf[MemberEvent])
  }

  override def postStop(): Unit = cluster.unsubscribe(self)

  // send a Registration message to the Actor selected by the given path
  def register(member: Member, createPath: (Member) => ActorPath): Unit = {
    val actorPath = createPath(member)
    log.info("Actor path: " + actorPath)
    val actorSelection = context.actorSelection(actorPath)
    actorSelection ! Registration
  }
}
In addition, we define some case classes as messages, which makes sending and receiving between the Actors convenient:
package org.shirdrn.scala.akka.cluster

object Registration extends Serializable

trait EventMessage extends Serializable
case class RawNginxRecord(sourceHost: String, line: String) extends EventMessage
case class NginxRecord(sourceHost: String, eventCode: String, line: String) extends EventMessage
case class FilteredRecord(sourceHost: String, eventCode: String, line: String, logDate: String, realIp: String) extends EventMessage
Akka Cluster reads Actor-related settings from a configuration file; ours is application.conf, with the following contents:
akka {
  loglevel = INFO
  stdout-loglevel = INFO
  event-handlers = ["akka.event.Logging$DefaultLogger"]

  actor {
    provider = "akka.cluster.ClusterActorRefProvider"
  }

  remote {
    enabled-transports = ["akka.remote.netty.tcp"]
    log-remote-lifecycle-events = off
    netty.tcp {
      hostname = "127.0.0.1"
      port = 0
    }
  }

  cluster {
    seed-nodes = [
      "akka.tcp://event-cluster-system@127.0.0.1:2751",
      "akka.tcp://event-cluster-system@127.0.0.1:2752",
      "akka.tcp://event-cluster-system@127.0.0.1:2753"]
    seed-node-timeout = 60s
    auto-down-unreachable-after = 10s
  }
}
In this configuration, the Akka cluster we create is named event-cluster-system, and three seed nodes are specified up front; these are in fact the three collector-role nodes that gather the data.
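Seed nodes can also be supplied programmatically via Cluster.joinSeedNodes instead of through configuration. A sketch (ProgrammaticJoin is an illustrative name; the addresses are the same as the seed-nodes list above):

import akka.actor.{ActorSystem, AddressFromURIString}
import akka.cluster.Cluster

object ProgrammaticJoin extends App {
  val system = ActorSystem("event-cluster-system")
  // equivalent to the seed-nodes list in application.conf
  Cluster(system).joinSeedNodes(List(
    AddressFromURIString("akka.tcp://event-cluster-system@127.0.0.1:2751"),
    AddressFromURIString("akka.tcp://event-cluster-system@127.0.0.1:2752"),
    AddressFromURIString("akka.tcp://event-cluster-system@127.0.0.1:2753")))
}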
Next, we describe in turn the processing logic of the cluster nodes in each of the three roles: collector, interceptor, and processor.
Our collector implementation is EventCollector, an Actor that extends the ClusterRoledWorker abstract class:
package org.shirdrn.scala.akka.cluster

import akka.actor._
import akka.cluster.ClusterEvent._
import com.typesafe.config.ConfigFactory

import scala.concurrent.ExecutionContext
import scala.concurrent.duration._
import scala.concurrent.forkjoin.ForkJoinPool

class EventCollector extends ClusterRoledWorker {

  @volatile var recordCounter: Int = 0

  def receive = {
    case MemberUp(member) =>
      log.info("Member is Up: {}", member.address)
    case UnreachableMember(member) =>
      log.info("Member detected as Unreachable: {}", member)
    case MemberRemoved(member, previousStatus) =>
      log.info("Member is Removed: {} after {}", member.address, previousStatus)
    case _: MemberEvent => // ignore other member events

    case Registration => {
      // an interceptor announces itself; watch it so Terminated is delivered on failure
      context watch sender
      workers = workers :+ sender
      log.info("Interceptor registered: " + sender)
      log.info("Registered interceptors: " + workers.size)
    }
    case Terminated(interceptingActorRef) =>
      // remove the dead interceptor from the worker list
      workers = workers.filterNot(_ == interceptingActorRef)

    case RawNginxRecord(sourceHost, line) => {
      val eventCode = "eventcode=(\\d+)".r.findFirstIn(line).get
      log.info("Raw message: eventCode=" + eventCode + ", sourceHost=" + sourceHost + ", line=" + line)
      // round-robin the record across the registered interceptors
      if (workers.size > 0) {
        recordCounter += 1
        val interceptorIndex = (if (recordCounter < 0) 0 else recordCounter) % workers.size
        workers(interceptorIndex) ! NginxRecord(sourceHost, eventCode, line)
        log.info("Details: interceptorIndex=" + interceptorIndex + ", interceptors=" + workers.size)
      }
    }
  }
}

class EventClientActor extends Actor with ActorLogging {

  implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(new ForkJoinPool())

  // sample Nginx log lines, keyed by the port of the collector they are sent to
  val events = Map(
    "2751" -> List(
      """10.10.2.72 [21/Aug/2015:18:29:19 +0800] "GET /t.gif?installid=0000lAOX&udid=25371384b2eb1a5dc5643e14626ecbd4&sessionid=25371384b2eb1a5dc5643e14626ecbd41440152875362&imsi=460002830862833&operator=1&network=1&timestamp=1440152954&action=14&eventcode=300039&page=200002& HTTP/1.0" "-" 204 0 "-" "Dalvik/1.6.0 (Linux; U; Android 4.4.4; R8207 Build/KTU84P)" "121.25.190.146"""",
      """10.10.2.8 [21/Aug/2015:18:29:19 +0800] "GET /t.gif?installid=0000VACO&udid=f6b0520cbc36fda6f63a72d91bf305c0&imsi=460012927613645&operator=2&network=1&timestamp=1440152956&action=1840&eventcode=100003&type=1&result=0& HTTP/1.0" "-" 204 0 "-" "Dalvik/1.6.0 (Linux; U; Android 4.4.2; GT-I9500 Build/KOT49H)" "61.175.219.69""""),
    "2752" -> List(
      """10.10.2.72 [21/Aug/2015:18:29:19 +0800] "GET /t.gif?installid=0000gCo4&udid=636d127f4936109a22347b239a0ce73f&sessionid=636d127f4936109a22347b239a0ce73f1440150695096&imsi=460036010038180&operator=3&network=4&timestamp=1440152902&action=1566&eventcode=101010&playid=99d5a59f100cb778b64b5234a189e1f4&radioid=1100000048450&audioid=1000001535718&playtime=3& HTTP/1.0" "-" 204 0 "-" "Dalvik/1.6.0 (Linux; U; Android 4.4.4; R8205 Build/KTU84P)" "106.38.128.67"""",
      """10.10.2.72 [21/Aug/2015:18:29:19 +0800] "GET /t.gif?installid=0000kPSC&udid=2ee585cde388ac57c0e81f9a76f5b797&operator=0&network=1&timestamp=1440152968&action=6423&eventcode=100003&type=1&result=0& HTTP/1.0" "-" 204 0 "-" "Dalvik/v3.3.85 (Linux; U; Android L; P8 Build/KOT49H)" "202.103.133.112"""",
      """10.10.2.72 [21/Aug/2015:18:29:19 +0800] "GET /t.gif?installid=0000lABW&udid=face1161d739abacca913dcb82576e9d&sessionid=face1161d739abacca913dcb82576e9d1440151582673&operator=0&network=1&timestamp=1440152520&action=1911&eventcode=101010&playid=b07c241010f8691284c68186c42ab006&radioid=1100000000762&audioid=1000001751983&playtime=158& HTTP/1.0" "-" 204 0 "-" "Dalvik/1.6.0 (Linux; U; Android 4.1; H5 Build/JZO54K)" "221.232.36.250""""),
    "2753" -> List(
      """10.10.2.8 [21/Aug/2015:18:29:19 +0800] "GET /t.gif?installid=0000krJw&udid=939488333889f18e2b406d2ece8f938a&sessionid=939488333889f18e2b406d2ece8f938a1440137301421&imsi=460028180045362&operator=1&network=1&timestamp=1440152947&action=1431&eventcode=300030&playid=e1fd5467085475dc4483d2795f112717&radioid=1100000001123&audioid=1000000094911&playtime=951992& HTTP/1.0" "-" 204 0 "-" "Dalvik/1.6.0 (Linux; U; Android 4.0.4; R813T Build/IMM76D)" "5.45.64.205"""",
      """10.10.2.72 [21/Aug/2015:18:29:19 +0800] "GET /t.gif?installid=0000kcpz&udid=cbc7bbb560914c374cb7a29eef8c2144&sessionid=cbc7bbb560914c374cb7a29eef8c21441440152816008&imsi=460008782944219&operator=1&network=1&timestamp=1440152873&action=360&eventcode=200003&page=200003&radioid=1100000046018& HTTP/1.0" "-" 204 0 "-" "Dalvik/v3.3.85 (Linux; U; Android 4.4.2; MX4S Build/KOT49H)" "119.128.106.232"""",
      """10.10.2.8 [21/Aug/2015:18:29:19 +0800] "GET /t.gif?installid=0000juRL&udid=3f9a5ffa69a5cd5f0754d2ba98c0aeb2&imsi=460023744091238&operator=1&network=1&timestamp=1440152957&action=78&eventcode=100003&type=1&result=0& HTTP/1.0" "-" 204 0 "-" "Dalvik/v3.3.85 (Linux; U; Android 4.4.3; S?MSUNG. Build/KOT49H)" "223.153.72.78""""))

  val ports = Seq("2751", "2752", "2753")
  val actors = scala.collection.mutable.HashMap[String, ActorRef]()

  ports.foreach { port =>
    // one collector ActorSystem per port, joining the cluster with role "collector"
    val config = ConfigFactory.parseString("akka.remote.netty.tcp.port=" + port)
      .withFallback(ConfigFactory.parseString("akka.cluster.roles = [collector]"))
      .withFallback(ConfigFactory.load())
    val system = ActorSystem("event-cluster-system", config)
    actors(port) = system.actorOf(Props[EventCollector], name = "collectingActor")
  }

  def receive = {
    case _ =>
  }

  // replay the sample lines to every collector once every 5 seconds
  context.system.scheduler.schedule(0 millis, 5000 millis) {
    ports.foreach { port =>
      events(port).foreach { line =>
        println("RAW: port=" + port + ", line=" + line)
        actors(port) ! RawNginxRecord("host.me:" + port, line)
      }
    }
  }
}

object EventClient extends App {

  val system = ActorSystem("client")
  val clientActorRef = system.actorOf(Props[EventClientActor], name = "clientActor")
  system.log.info("Client actor started: " + clientActorRef)
}
In the code above, EventClientActor does not belong to the event-cluster-system cluster we created; it is a node outside the cluster that simulates sending messages to the collector-role nodes.
Similarly to the collector, the interceptor's Actor implementation is EventInterceptor:
package org.shirdrn.scala.akka.cluster

import akka.actor._
import akka.cluster.ClusterEvent._
import akka.cluster.{Member, MemberStatus}
import com.typesafe.config.ConfigFactory
import net.sf.json.JSONObject
import org.shirdrn.scala.akka.cluster.utils.DatetimeUtils

class EventInterceptor extends ClusterRoledWorker {

  @volatile var interceptedRecords: Int = 0
  val IP_PATTERN = "[^\\s]+\\s+\\[([^\\]]+)\\].+\"(\\d+\\.\\d+\\.\\d+\\.\\d+)\"".r
  val blackIpList = Array(
    "5.9.116.101", "103.42.176.138", "123.182.148.65", "5.45.64.205",
    "27.159.226.192", "76.164.228.218", "77.79.178.186", "104.200.31.117",
    "104.200.31.32", "104.200.31.238", "123.182.129.108", "220.161.98.39",
    "59.58.152.90", "117.26.221.236", "59.58.150.110", "123.180.229.156",
    "59.60.123.239", "117.26.222.6", "117.26.220.88", "59.60.124.227",
    "142.54.161.50", "59.58.148.52", "59.58.150.85", "202.105.90.142")

  log.info("Black IP count: " + blackIpList.size)
  blackIpList.foreach(log.info(_))

  def receive = {
    case MemberUp(member) =>
      log.info("Member is Up: {}", member.address)
      register(member, getCollectorPath)
    case state: CurrentClusterState =>
      // register with collectors that were already Up before this node joined
      state.members.filter(_.status == MemberStatus.Up) foreach (register(_, getCollectorPath))
    case UnreachableMember(member) =>
      log.info("Member detected as Unreachable: {}", member)
    case MemberRemoved(member, previousStatus) =>
      log.info("Member is Removed: {} after {}", member.address, previousStatus)
    case _: MemberEvent => // ignore other member events

    case Registration => {
      context watch sender
      workers = workers :+ sender
      log.info("Processor registered: " + sender)
      log.info("Registered processors: " + workers.size)
    }
    case Terminated(processingActorRef) =>
      workers = workers.filterNot(_ == processingActorRef)

    case NginxRecord(sourceHost, eventCode, line) => {
      val (isIpInBlackList, data) = checkRecord(eventCode, line)
      if (!isIpInBlackList) {
        interceptedRecords += 1
        // round-robin the record across the registered processors
        if (workers.size > 0) {
          val processorIndex = (if (interceptedRecords < 0) 0 else interceptedRecords) % workers.size
          workers(processorIndex) ! FilteredRecord(sourceHost, eventCode, line, data.getString("eventdate"), data.getString("realip"))
          log.info("Details: processorIndex=" + processorIndex + ", processors=" + workers.size)
        }
        log.info("Intercepted data: data=" + data)
      } else {
        log.info("Discarded: " + line)
      }
    }
  }

  def getCollectorPath(member: Member): ActorPath = {
    RootActorPath(member.address) / "user" / "collectingActor"
  }

  // extract the date and real IP from the log line and check the IP against the black list
  private def checkRecord(eventCode: String, line: String): (Boolean, JSONObject) = {
    val data: JSONObject = new JSONObject()
    var isIpInBlackList = false
    IP_PATTERN.findFirstMatchIn(line).foreach { m =>
      val rawDt = m.group(1)
      val dt = DatetimeUtils.format(rawDt)
      val realIp = m.group(2)

      data.put("eventdate", dt)
      data.put("realip", realIp)
      data.put("eventcode", eventCode)
      isIpInBlackList = blackIpList.contains(realIp)
    }
    (isIpInBlackList, data)
  }
}

object EventInterceptor extends App {

  Seq("2851", "2852").foreach { port =>
    val config = ConfigFactory.parseString("akka.remote.netty.tcp.port=" + port)
      .withFallback(ConfigFactory.parseString("akka.cluster.roles = [interceptor]"))
      .withFallback(ConfigFactory.load())
    val system = ActorSystem("event-cluster-system", config)
    val interceptingActor = system.actorOf(Props[EventInterceptor], name = "interceptingActor")
    system.log.info("Intercepting Actor: " + interceptingActor)
  }
}
The code above parses the IP address out of each Nginx log line and checks whether it appears in the IP blacklist; if it does, the record is discarded.
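As a quick check of the IP_PATTERN regex, here is a worked example against a shortened sample line (IpPatternDemo is an illustrative name; the ellipses stand for the elided request and user-agent text):

object IpPatternDemo extends App {
  val IP_PATTERN = "[^\\s]+\\s+\\[([^\\]]+)\\].+\"(\\d+\\.\\d+\\.\\d+\\.\\d+)\"".r
  val line = """10.10.2.8 [21/Aug/2015:18:29:19 +0800] "GET /t.gif?... HTTP/1.0" "-" 204 0 "-" "Dalvik/1.6.0 (...)" "5.45.64.205""""
  IP_PATTERN.findFirstMatchIn(line).foreach { m =>
    println(m.group(1)) // 21/Aug/2015:18:29:19 +0800, passed to DatetimeUtils.format
    println(m.group(2)) // 5.45.64.205, present in blackIpList, so this record is discarded
  }
}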
The implementation of EventProcessor is as follows:
package org.shirdrn.scala.akka.cluster

import java.util.Properties

import akka.actor._
import akka.cluster.ClusterEvent._
import akka.cluster.{Member, MemberStatus}
import com.typesafe.config.ConfigFactory
import kafka.producer.{KeyedMessage, Producer, ProducerConfig}
import net.sf.json.JSONObject

class EventProcessor extends ClusterRoledWorker {

  val topic = "app_events"
  val producer = KafkaUtils.createProducer()

  def receive = {
    case MemberUp(member) =>
      log.info("Member is Up: {}", member.address)
      register(member, getProcessorPath)
    case state: CurrentClusterState =>
      // register with interceptors that were already Up before this node joined
      state.members.filter(_.status == MemberStatus.Up).foreach(register(_, getProcessorPath))
    case UnreachableMember(member) =>
      log.info("Member detected as Unreachable: {}", member)
    case MemberRemoved(member, previousStatus) =>
      log.info("Member is Removed: {} after {}", member.address, previousStatus)
    case _: MemberEvent => // ignore other member events

    case FilteredRecord(sourceHost, eventCode, line, nginxDate, realIp) => {
      val data = process(eventCode, line, nginxDate, realIp)
      log.info("Processed: data=" + data)
      // write the processed record to Kafka, keyed by the source host
      producer.send(new KeyedMessage[String, String](topic, sourceHost, data.toString))
    }
  }

  // path of the interceptor Actor this processor registers with
  def getProcessorPath(member: Member): ActorPath = {
    RootActorPath(member.address) / "user" / "interceptingActor"
  }

  private def process(eventCode: String, line: String, eventDate: String, realIp: String): JSONObject = {
    val data: JSONObject = new JSONObject()
    "[\\?|&]{1}([^=]+)=([^&]+)&".r.findAllMatchIn(line) foreach { m =>
      val key = m.group(1)
      val value = m.group(2)
      data.put(key, value)
    }
    data.put("eventdate", eventDate)
    data.put("realip", realIp)
    data
  }
}

object KafkaUtils {

  val props = new Properties()
  val config = Map(
    "metadata.broker.list" -> "hadoop2:9092,hadoop3:9092",
    "serializer.class" -> "kafka.serializer.StringEncoder",
    "producer.type" -> "async")
  config.foreach(entry => props.put(entry._1, entry._2))
  val producerConfig = new ProducerConfig(props)

  def createProducer(): Producer[String, String] = {
    new Producer[String, String](producerConfig)
  }
}

object EventProcessor extends App {

  Seq("2951", "2952", "2953", "2954", "2955") foreach { port =>
    val config = ConfigFactory.parseString("akka.remote.netty.tcp.port=" + port)
      .withFallback(ConfigFactory.parseString("akka.cluster.roles = [processor]"))
      .withFallback(ConfigFactory.load())
    val system = ActorSystem("event-cluster-system", config)
    val processingActor = system.actorOf(Props[EventProcessor], name = "processingActor")
    system.log.info("Processing Actor: " + processingActor)
  }
}
The processor-role Actor implementation is EventProcessor; in its companion object we create five instances, each listening on a different port. The parsed Nginx log records are finally saved to Kafka in the following form:
{"installid":"0000VACO","imsi":"460012927613645","network":"1","action":"1840","type":"1","eventdate":"2015-08-21 18:29:19","realip":"61.175.219.69"}
{"installid":"0000kcpz","sessionid":"cbc7bbb560914c374cb7a29eef8c21441440152816008","operator":"1","timestamp":"1440152873","eventcode":"200003","radioid":"1100000046018","eventdate":"2015-08-21 18:29:19","realip":"119.128.106.232"}
{"installid":"0000lAOX","sessionid":"25371384b2eb1a5dc5643e14626ecbd41440152875362","operator":"1","timestamp":"1440152954","eventcode":"300039","eventdate":"2015-08-21 18:29:19","realip":"121.25.190.146"}