Akka Cluster原理与应用

本文涉及的产品
全局流量管理 GTM,标准版 1个月
日志服务 SLS,月写入数据量 50GB 1个月
云解析 DNS,旗舰版 1个月
简介:

Akka集群原理

Akka集群支持去中心化的基于P2P的集群服务,没有单点故障(SPOF)问题,它主要是通过Gossip协议来实现。对于集群成员的状态,Akka提供了一种故障检测机制,能够自动发现出现故障而离开集群的成员节点,通过事件驱动的方式,将状态传播到整个集群的其它成员节点。

  • 状态转移与故障检测

Akka内部为集群成员定义了一组有限状态(6种状态),并给出了一个状态转移矩阵,代码如下所示:

1 private[cluster] val allowedTransitions: Map[MemberStatus, Set[MemberStatus]] =
2 Map(
3 Joining -> Set(Up, Down, Removed),
4 Up -> Set(Leaving, Down, Removed),
5 Leaving -> Set(Exiting, Down, Removed),
6 Down -> Set(Removed),
7 Exiting -> Set(Removed, Down),
8 Removed -> Set.empty[MemberStatus])
9 }

Akka集群中的每个成员节点,都有可能处于上面的一种状态,在发生某些事件以后,会发生状态转移。需要注意的是,除了Down和Removed状态以外,节点处于其它任何一个状态时都有可能变成Down状态,即节点故障而无法提供服务,而在变成Down状态之前有一个虚拟的Unreachable状态,因为在Gossip收敛过程中,是无法到达或者经由Unreachable状态的节点,这个状态是由Akka实现的故障探测器(Failure Detector)来检测到的。处于Down状态的节点如果想要再次加入Akka集群,需要重新启动,并进入Joining状态,然后才能进行后续状态的转移变化。Akka集群成员节点状态及其转移情况,如下图所示:

我们说明一下Akka中的故障检测机制。在Akka中,集群中每一个成员节点M会被集群中的其他另一组节点(默认是5个)G监控,这一组节点G并不是整个集群中的其他所有节点,只是整个集群全部节点的一个子集,组G中的节点会检测节点M是否处于Unreachable状态,这是通过发送心跳来确认节点M是否可达,如果不可达则组G中的节点会将节点M的Unreachable状态向集群中组G之外的其它节点传播,最终使得集群中的每个成员节点都知道节点M故障。

  • Akka事件集合

节点状态发生转移会触发某个事件,我们可以根据不同类型的事件来进行相应的处理,为了能够详细捕获到各种事件,我们先看一下Akka定义的事件集合,如图所示:

通常,在基于Akka Cluster的应用中实现Actor时,可以重写Actor的preStart方法,通过Cluster来订阅集群事件,代码示例如下所示:

1 val cluster = Cluster(context.system)
2
3 override def preStart(): Unit = {
4 cluster.subscribe(self, initialStateMode = InitialStateAsEvents,
5 classOf[MemberUp], classOf[MemberRemoved], classOf[UnreachableMember])
6 }

例如,对于MemberUp事件,我们可以获取到对应Actor的引用ActorRef,然后通过与其进行消息交换,一起协同完成特定任务。

  • Akka成员角色(Node Role)

Akka支持在每个成员节点加入集群的时候,设置成员自己的角色。通过角色划分,可以将使用Akka集群处理业务的系统划分为多个处理逻辑独立的子系统,每个子系统处理自己的业务逻辑,而且,划分得到的多个子系统都处于一个统一的Akka集群中。因此,每个子系统也具备了Akka集群所具有的特性,如故障检测、状态转移、状态传播等等。

Akka集群应用实践

我们基于Akka实现了一个简单的模拟日志实时处理的集群系统,可以从任何数据源输入数据,如文件、消息中间件Kafka、数据库,还可以是一个远程调用请求,我们收集数据,然数据经过一个拦截器层,最后解析处理数据为特定格式,最后数据写入Kafka。具体实现逻辑如下图所示:

上图中,我们将日志实时处理系统分为3个子系统,通过Akka的Role来进行划分,3个角色分别为collector、interceptor、processor,3个子系统中的节点都是整个Akka集群的成员。整个集群系统中的数据流向是:collector接收数据(或者直接对接特定数据源而产生数据),我们这里模式发送Nginx日志记录行,将数据发送到interceptor;interceptor收到collector发送的日志记录行,解析出请求的真是IP地址,拦截在黑名单IP列表中的请求,如果IP地址不在黑名单,则发送给processor去处理;processor对整个日志记录行进行处理,最后保存到Kakfa中。
我们抽象出用来订阅集群事件相关的逻辑,实现抽象类为ClusterRoledWorker,代码如下所示:

01 package org.shirdrn.scala.akka.cluster
02
03 import akka.actor._
04 import akka.cluster.ClusterEvent.{InitialStateAsEvents, MemberEvent, MemberUp, UnreachableMember}
05 import akka.cluster.{Cluster, Member}
06
07 abstract class ClusterRoledWorker extends Actor with ActorLogging {
08
09 // 创建一个Cluster实例
10 val cluster = Cluster(context.system)
11 // 用来缓存下游注册过来的子系统ActorRef
12 var workers = IndexedSeq.empty[ActorRef]
13
14 override def preStart(): Unit = {
15 // 订阅集群事件
16 cluster.subscribe(self, initialStateMode = InitialStateAsEvents,
17 classOf[MemberUp], classOf[UnreachableMember], classOf[MemberEvent])
18 }
19
20 override def postStop(): Unit = cluster.unsubscribe(self)
21
22 /**
23 * 下游子系统节点发送注册消息
24 */
25 def register(member: Member, createPath: (Member) => ActorPath): Unit = {
26 val actorPath = createPath(member)
27 log.info("Actor path: " + actorPath)
28 val actorSelection = context.actorSelection(actorPath)
29 actorSelection ! Registration
30 }
31 }

另外,定义了一些case class作为消息,方便在各个Actor之间进行发送/接收,代码如下所示:

1 package org.shirdrn.scala.akka.cluster
2
3 object Registration extends Serializable
4
5 trait EventMessage extends Serializable
6 case class RawNginxRecord(sourceHost: String, line: String) extends EventMessage
7 case class NginxRecord(sourceHost: String, eventCode: String, line: String) extendsEventMessage
8 case class FilteredRecord(sourceHost: String, eventCode: String, line: String, logDate: String, realIp: String) extends EventMessage

Akka Cluster使用一个配置文件,用来指定一些有关Actor的配置,我们使用的配置文件为application.conf,配置内容如下所示:

01 akka {
02 loglevel = INFO
03 stdout-loglevel = INFO
04 event-handlers = ["akka.event.Logging$DefaultLogger"]
05
06 actor {
07 provider = "akka.cluster.ClusterActorRefProvider"
08 }
09
10 remote {
11 enabled-transports = ["akka.remote.netty.tcp"]
12 log-remote-lifecycle-events = off
13 netty.tcp {
14 hostname = "127.0.0.1"
15 port = 0
16 }
17 }
18 cluster {
19 seed-nodes = [
20 "akka.tcp://event-cluster-system@127.0.0.1:2751",
21 "akka.tcp://event-cluster-system@127.0.0.1:2752",
22 "akka.tcp://event-cluster-system@127.0.0.1:2753"
23 ]
24 seed-node-timeout = 60s
25 auto-down-unreachable-after = 10s
26 }
27 }

上述配置中,我们创建的Akka Cluster的名称为event-cluster-system,初始指定了3个seed节点,实际上这3个节点是我们实现的collector角色的节点,用来收集数据。
下面,我们依次说明collector、interceptor、processor这3中角色的集群节点的处理逻辑:

  • collector实现

我们实现的collector实现类为EventCollector,它是一个Actor,该实现类继承自ClusterRoledWorker抽象类,具体实现代码如下所示:

001 package org.shirdrn.scala.akka.cluster
002
003 import akka.actor._
004 import akka.cluster.ClusterEvent._
005 import com.typesafe.config.ConfigFactory
006
007 import scala.concurrent.ExecutionContext
008 import scala.concurrent.duration._
009 import scala.concurrent.forkjoin.ForkJoinPool
010
011 class EventCollector extends ClusterRoledWorker {
012
013 @volatile var recordCounter : Int = 0
014
015 def receive = {
016 case MemberUp(member) =>
017 log.info("Member is Up: {}", member.address)
018 case UnreachableMember(member) =>
019 log.info("Member detected as Unreachable: {}", member)
020 case MemberRemoved(member, previousStatus) =>
021 log.info("Member is Removed: {} after {}", member.address, previousStatus)
022 case _: MemberEvent => // ignore
023
024 case Registration => {
025 // watch发送注册消息的interceptor,如果对应的Actor终止了,会发送一个Terminated消息
026 context watch sender
027 workers = workers :+ sender
028 log.info("Interceptor registered: " + sender)
029 log.info("Registered interceptors: " + workers.size)
030 }
031 case Terminated(interceptingActorRef) =>
032 // interceptor终止,更新缓存的ActorRef
033 workers = workers.filterNot(_ == interceptingActorRef)
034 case RawNginxRecord(sourceHost, line) => {
035 // 构造NginxRecord消息,发送到下游interceptor
036 val eventCode = "eventcode=(\\d+)".r.findFirstIn(line).get
037 log.info("Raw message: eventCode=" + eventCode + ", sourceHost=" + sourceHost +", line=" + line)
038 recordCounter += 1
039 if(workers.size > 0) {
040 // 模拟Roudrobin方式,将日志记录消息发送给下游一组interceptor中的一个
041 val interceptorIndex = (if(recordCounter < 0) 0 else recordCounter) %workers.size
042 workers(interceptorIndex) ! NginxRecord(sourceHost, eventCode, line)
043 log.info("Details: interceptorIndex=" + interceptorIndex + ", interceptors=" + workers.size)
044 }
045 }
046 }
047
048 }
049
050 /**
051 * 用来模拟发送日志记录消息的Actor
052 */
053 class EventClientActor extends Actor with ActorLogging {
054
055 implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(newForkJoinPool())
056
057 def receive = {
058 case _=>
059 }
060
061 val events = Map(
062 "2751" -> List(
063 """10.10.2.72 [21/Aug/2015:18:29:19 +0800] "GET /t.gif?installid=0000lAOX&udid=25371384b2eb1a5dc5643e14626ecbd4&sessionid=25371384b2eb1a5dc5643e14626ecbd41440152875362&imsi=460002830862833&operator=1&network=1&timestamp=1440152954&action=14&eventcode=300039&page=200002& HTTP/1.0" "-" 204 0 "-" "Dalvik/1.6.0 (Linux; U; Android 4.4.4; R8207 Build/KTU84P)" "121.25.190.146"""",
064 """10.10.2.8 [21/Aug/2015:18:29:19 +0800] "GET /t.gif?installid=0000VACO&udid=f6b0520cbc36fda6f63a72d91bf305c0&imsi=460012927613645&operator=2&network=1&timestamp=1440152956&action=1840&eventcode=100003&type=1&result=0& HTTP/1.0" "-" 204 0 "-" "Dalvik/1.6.0 (Linux; U; Android 4.4.2; GT-I9500 Build/KOT49H)" "61.175.219.69""""
065 ),
066 "2752" -> List(
067 """10.10.2.72 [21/Aug/2015:18:29:19 +0800] "GET /t.gif?installid=0000gCo4&udid=636d127f4936109a22347b239a0ce73f&sessionid=636d127f4936109a22347b239a0ce73f1440150695096&imsi=460036010038180&operator=3&network=4&timestamp=1440152902&action=1566&eventcode=101010&playid=99d5a59f100cb778b64b5234a189e1f4&radioid=1100000048450&audioid=1000001535718&playtime=3& HTTP/1.0" "-" 204 0 "-" "Dalvik/1.6.0 (Linux; U; Android 4.4.4; R8205 Build/KTU84P)" "106.38.128.67"""",
068 """10.10.2.72 [21/Aug/2015:18:29:19 +0800] "GET /t.gif?installid=0000kPSC&udid=2ee585cde388ac57c0e81f9a76f5b797&operator=0&network=1&timestamp=1440152968&action=6423&eventcode=100003&type=1&result=0& HTTP/1.0" "-" 204 0 "-" "Dalvik/v3.3.85 (Linux; U; Android L; P8 Build/KOT49H)" "202.103.133.112"""",
069 """10.10.2.72 [21/Aug/2015:18:29:19 +0800] "GET /t.gif?installid=0000lABW&udid=face1161d739abacca913dcb82576e9d&sessionid=face1161d739abacca913dcb82576e9d1440151582673&operator=0&network=1&timestamp=1440152520&action=1911&eventcode=101010&playid=b07c241010f8691284c68186c42ab006&radioid=1100000000762&audioid=1000001751983&playtime=158& HTTP/1.0" "-" 204 0 "-" "Dalvik/1.6.0 (Linux; U; Android 4.1; H5 Build/JZO54K)" "221.232.36.250""""
070 ),
071 "2753" -> List(
072 """10.10.2.8 [21/Aug/2015:18:29:19 +0800] "GET /t.gif?installid=0000krJw&udid=939488333889f18e2b406d2ece8f938a&sessionid=939488333889f18e2b406d2ece8f938a1440137301421&imsi=460028180045362&operator=1&network=1&timestamp=1440152947&action=1431&eventcode=300030&playid=e1fd5467085475dc4483d2795f112717&radioid=1100000001123&audioid=1000000094911&playtime=951992& HTTP/1.0" "-" 204 0 "-" "Dalvik/1.6.0 (Linux; U; Android 4.0.4; R813T Build/IMM76D)" "5.45.64.205"""",
073 """10.10.2.72 [21/Aug/2015:18:29:19 +0800] "GET /t.gif?installid=0000kcpz&udid=cbc7bbb560914c374cb7a29eef8c2144&sessionid=cbc7bbb560914c374cb7a29eef8c21441440152816008&imsi=460008782944219&operator=1&network=1&timestamp=1440152873&action=360&eventcode=200003&page=200003&radioid=1100000046018& HTTP/1.0" "-" 204 0 "-" "Dalvik/v3.3.85 (Linux; U; Android 4.4.2; MX4S Build/KOT49H)" "119.128.106.232"""",
074 """10.10.2.8 [21/Aug/2015:18:29:19 +0800] "GET /t.gif?installid=0000juRL&udid=3f9a5ffa69a5cd5f0754d2ba98c0aeb2&imsi=460023744091238&operator=1&network=1&timestamp=1440152957&action=78&eventcode=100003&type=1&result=0& HTTP/1.0" "-" 204 0 "-" "Dalvik/v3.3.85 (Linux; U; Android 4.4.3; S?MSUNG. Build/KOT49H)" "223.153.72.78""""
075 )
076 )
077
078 val ports = Seq("2751","2752", "2753")
079 val actors = scala.collection.mutable.HashMap[String, ActorRef]()
080
081 ports.foreach { port =>
082 // 创建一个Config对象
083 val config = ConfigFactory.parseString("akka.remote.netty.tcp.port=" + port)
084 .withFallback(ConfigFactory.parseString("akka.cluster.roles = [collector]"))
085 .withFallback(ConfigFactory.load())
086 // 创建一个ActorSystem实例
087 val system = ActorSystem("event-cluster-system", config)
088 actors(port) = system.actorOf(Props[EventCollector], name = "collectingActor")
089 }
090
091 Thread.sleep(30000)
092
093 context.system.scheduler.schedule(0 millis, 5000 millis) {
094 // 使用Akka的Scheduler,模拟定时发送日志记录消息
095 ports.foreach { port =>
096 events(port).foreach { line =>
097 println("RAW: port=" + port + ", line=" + line)
098 actors(port) ! RawNginxRecord("host.me:" + port, line)
099 }
100 }
101 }
102 }
103
104 object EventClient extends App {
105
106 val system = ActorSystem("client")
107 // 创建EventClientActor实例
108 val clientActorRef = system.actorOf(Props[EventClientActor], name = "clientActor")
109 system.log.info("Client actor started: " + clientActorRef)
110 }

上面代码中,EventClientActor并不是属于我们创建的Akka集群event-cluster-system,它是一个位于集群外部的节点,它模拟向各个collector角色的节点发送消息。

  • interceptor实现

与编写collector类似,实现的interceptor的Actor实现类为EventInterceptor,代码如下所示:

01 package org.shirdrn.scala.akka.cluster
02
03 import akka.actor._
04 import akka.cluster.ClusterEvent._
05 import akka.cluster.Member
06 import akka.cluster.protobuf.msg.ClusterMessages.MemberStatus
07 import com.typesafe.config.ConfigFactory
08 import net.sf.json.JSONObject
09 import org.shirdrn.scala.akka.cluster.utils.DatetimeUtils
10
11 class EventInterceptor extends ClusterRoledWorker {
12
13 @volatile var interceptedRecords : Int = 0
14 val IP_PATTERN = "[^\\s]+\\s+\\[([^\\]]+)\\].+\"(\\d+\\.\\d+\\.\\d+\\.\\d+)\"".r
15 val blackIpList = Array(
16 "5.9.116.101", "103.42.176.138", "123.182.148.65", "5.45.64.205",
17 "27.159.226.192", "76.164.228.218", "77.79.178.186", "104.200.31.117",
18 "104.200.31.32", "104.200.31.238", "123.182.129.108", "220.161.98.39",
19 "59.58.152.90", "117.26.221.236", "59.58.150.110", "123.180.229.156",
20 "59.60.123.239", "117.26.222.6", "117.26.220.88", "59.60.124.227",
21 "142.54.161.50", "59.58.148.52", "59.58.150.85", "202.105.90.142"
22 ).toSet
23
24 log.info("Black IP count: " + blackIpList.size)
25 blackIpList.foreach(log.info(_))
26
27 def receive = {
28 case MemberUp(member) =>
29 log.info("Member is Up: {}", member.address)
30 register(member, getCollectorPath)
31 case state: CurrentClusterState =>
32 // 如果加入Akka集群的成员节点是Up状态,并且是collector角色,则调用register向collector进行注册
33 state.members.filter(_.status == MemberStatus.Up) foreach(register(_, getCollectorPath))
34 case UnreachableMember(member) =>
35 log.info("Member detected as Unreachable: {}", member)
36 case MemberRemoved(member, previousStatus) =>
37 log.info("Member is Removed: {} after {}", member.address, previousStatus)
38 case _: MemberEvent => // ignore
39
40 case Registration => {
41 context watch sender
42 workers = workers :+ sender
43 log.info("Processor registered: " + sender)
44 log.info("Registered processors: " + workers.size)
45 }
46 case Terminated(processingActorRef) =>
47 workers = workers.filterNot(_ == processingActorRef)
48 case NginxRecord(sourceHost, eventCode, line) => {
49 val (isIpInBlackList, data) = checkRecord(eventCode, line)
50 if(!isIpInBlackList) {
51 interceptedRecords += 1
52 if(workers.size > 0) {
53 val processorIndex = (if (interceptedRecords < 0) 0 else interceptedRecords)% workers.size
54 workers(processorIndex) ! FilteredRecord(sourceHost, eventCode, line, data.getString("eventdate"), data.getString("realip"))
55 log.info("Details: processorIndex=" + processorIndex + ", processors=" + workers.size)
56 }
57 log.info("Intercepted data: data=" + data)
58 } else {
59 log.info("Discarded: " + line)
60 }
61 }
62 }
63
64 def getCollectorPath(member: Member): ActorPath = {
65 RootActorPath(member.address) / "user" / "collectingActor"
66 }
67
68 /**
69 * 检查collector发送的消息所对应的IP是否在黑名单列表中
70 */
71 private def checkRecord(eventCode: String, line: String): (Boolean, JSONObject) = {
72 val data: JSONObject = new JSONObject()
73 var isIpInBlackList = false
74 IP_PATTERN.findFirstMatchIn(line).foreach { m =>
75 val rawDt = m.group(1)
76 val dt = DatetimeUtils.format(rawDt)
77 val realIp = m.group(2)
78
79 data.put("eventdate", dt)
80 data.put("realip", realIp)
81 data.put("eventcode", eventCode)
82 isIpInBlackList = blackIpList.contains(realIp)
83 }
84 (isIpInBlackList, data)
85 }
86 }
87
88 object EventInterceptor extends App {
89
90 Seq("2851","2852").foreach { port =>
91 val config = ConfigFactory.parseString("akka.remote.netty.tcp.port=" + port)
92 .withFallback(ConfigFactory.parseString("akka.cluster.roles = [interceptor]"))
93 .withFallback(ConfigFactory.load())
94 val system = ActorSystem("event-cluster-system", config)
95 val processingActor = system.actorOf(Props[EventInterceptor], name ="interceptingActor")
96 system.log.info("Processing Actor: " + processingActor)
97 }
98 }

上述代码中,解析出Nginx日志记录中的IP地址,查看其是否在IP黑名单列表中,如果在内名单中则直接丢掉该记录数据。

  • processor实现

EventProcessor的实现代码,如下所示:

01 package org.shirdrn.scala.akka.cluster
02
03 import java.util.Properties
04
05 import akka.actor._
06 import akka.cluster.ClusterEvent._
07 import akka.cluster.Member
08 import akka.cluster.protobuf.msg.ClusterMessages.MemberStatus
09 import com.typesafe.config.ConfigFactory
10 import kafka.producer.{KeyedMessage, Producer, ProducerConfig}
11 import net.sf.json.JSONObject
12
13 class EventProcessor extends ClusterRoledWorker {
14
15 val topic = "app_events"
16 val producer = KakfaUtils.createProcuder
17
18 def receive = {
19 case MemberUp(member) =>
20 log.info("Member is Up: {}", member.address)
21 // 将processor注册到上游的collector中
22 register(member, getProcessorPath)
23 case state: CurrentClusterState =>
24 state.members.filter(_.status == MemberStatus.Up).foreach(register(_, getProcessorPath))
25 case UnreachableMember(member) =>
26 log.info("Member detected as Unreachable: {}", member)
27 case MemberRemoved(member, previousStatus) =>
28 log.info("Member is Removed: {} after {}", member.address, previousStatus)
29 case _: MemberEvent => // ignore
30
31 case FilteredRecord(sourceHost, eventCode, line, nginxDate, realIp) => {
32 val data = process(eventCode, line, nginxDate, realIp)
33 log.info("Processed: data=" + data)
34 // 将解析后的消息一JSON字符串的格式,保存到Kafka中
35 producer.send(new KeyedMessage[String, String](topic, sourceHost, data.toString))
36 }
37 }
38
39 def getProcessorPath(member: Member): ActorPath = {
40 RootActorPath(member.address) / "user" / "interceptingActor"
41 }
42
43 private def process(eventCode: String, line: String, eventDate: String, realIp:String): JSONObject = {
44 val data: JSONObject = new JSONObject()
45 "[\\?|&]{1}([^=]+)=([^&]+)&".r.findAllMatchIn(line) foreach { m =>
46 val key = m.group(1)
47 val value = m.group(2)
48 data.put(key, value)
49 }
50 data.put("eventdate", eventDate)
51 data.put("realip", realIp)
52 data
53 }
54 }
55
56 object KakfaUtils {
57 // bin/kafka-topics.sh --create -zookeeper zk1:2181,zk2:2181,zk3:2181/data-dept/kafka --replication-factor 2 --partitions 2 --topic app_events
58 val props = new Properties()
59 val config = Map(
60 "metadata.broker.list" -> "hadoop2:9092,hadoop3:9092",
61 "serializer.class" -> "kafka.serializer.StringEncoder",
62 "producer.type" -> "async"
63 )
64 config.foreach(entry => props.put(entry._1, entry._2))
65 val producerConfig = new ProducerConfig(props)
66
67 def createProcuder() : Producer[String, String] = {
68 new Producer[String, String](producerConfig)
69 }
70 }
71
72 object EventProcessor extends App {
73
74 // 启动了5个EventProcessor
75 Seq("2951","2952", "2953", "2954", "2955") foreach { port =>
76 val config = ConfigFactory.parseString("akka.remote.netty.tcp.port=" + port)
77 .withFallback(ConfigFactory.parseString("akka.cluster.roles = [processor]"))
78 .withFallback(ConfigFactory.load())
79 val system = ActorSystem("event-cluster-system", config)
80 val processingActor = system.actorOf(Props[EventProcessor], name ="processingActor")
81 system.log.info("Processing Actor: " + processingActor)
82 }
83 }

角色为processor的Actor的实现类为EventProcessor,我们在其伴生对象中创建了5个实例,分别对应不同的端口。解析的Nginx日志记录最后保存到Kafka,示例如下所示:

查看源代码打印帮助

1 {"installid":"0000VACO","imsi":"460012927613645","network":"1","action":"1840","type":"1","eventdate":"2015-08-21 18:29:19","realip":"61.175.219.69"}
2 {"installid":"0000kcpz","sessionid":"cbc7bbb560914c374cb7a29eef8c21441440152816008","operator":"1","timestamp":"1440152873","eventcode":"200003","radioid":"1100000046018","eventdate":"2015-08-21 18:29:19","realip":"119.128.106.232"}
3 {"installid":"0000lAOX","sessionid":"25371384b2eb1a5dc5643e14626ecbd41440152875362","operator":"1","timestamp":"1440152954","eventcode":"300039","eventdate":"2015-08-21 18:29:19","realip":"121.25.190.146"}
目录
相关文章
|
Kubernetes 流计算 容器
FLINK ON K8S 基于Zookeeper和基于K8S原生HA的区别
FLINK ON K8S 基于Zookeeper和基于K8S原生HA的区别
360 1
|
Apache 容器
Apache ZooKeeper - 集群中 Observer 的作用以及 与 Follow 的区别
Apache ZooKeeper - 集群中 Observer 的作用以及 与 Follow 的区别
582 0
|
存储 设计模式 监控
Apache ZooKeeper - Watch 机制的底层原理
Apache ZooKeeper - Watch 机制的底层原理
161 0
spark2.1.0之源码分析——RPC服务器TransportServer
版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/beliefer/article/details/81062342 提示:阅读本文前最好先阅读: 《Spark2.
1536 0
|
分布式计算 Spark
spark2.1.0之源码分析——RPC配置TransportConf
版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/beliefer/article/details/80888076       在《Spark2.1.0之内置RPC框架》提到TransportContext中的TransportConf给Spark的RPC框架提供配置信息,它有两个成员属性——配置提供者conf和配置的模块名称module。
1547 0

热门文章

最新文章