We changed the historical service port to 18083, reduced druid.processing.buffer.sizeBytes to 250 MiB, and reduced maxSize in druid.segmentCache.locations to 20g; you can scale these parameters up according to your available compute resources.
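As a rule of thumb from Druid's basic cluster tuning guide, a historical needs direct memory of at least buffer.sizeBytes * (numThreads + numMergeBuffers + 1). A minimal sketch of that check in shell; the thread and merge-buffer counts below are hypothetical, so substitute the values from your own runtime.properties:

# Direct-memory rule of thumb for a historical (threads/merge_buffers are assumed values)
buffer_mib=250; threads=2; merge_buffers=4
echo "need >= $(( buffer_mib * (threads + merge_buffers + 1) )) MiB of MaxDirectMemorySize"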
Next, configure the middleManager:
vi /opt/druid/conf/druid/cluster/data/middleManager/jvm.config
Reference configuration:
-server
-Xms128m
-Xmx128m
-XX:+ExitOnOutOfMemoryError
-Duser.timezone=UTC
-Dfile.encoding=UTF-8
-Djava.io.tmpdir=var/tmp
-Djava.util.logging.manager=org.apache.logging.log4j.jul.LogManager
This is the default configuration.
vi /opt/druid/conf/druid/cluster/data/middleManager/runtime.properties
Reference configuration:
druid.service=druid/middleManager
druid.plaintextPort=18091

# Number of tasks per middleManager
druid.worker.capacity=4

# Task launch parameters
druid.indexer.runner.javaOpts=-server -Xms1g -Xmx1g -XX:MaxDirectMemorySize=1g -Duser.timezone=UTC -Dfile.encoding=UTF-8 -XX:+ExitOnOutOfMemoryError -Djava.util.logging.manager=org.apache.logging.log4j.jul.LogManager
druid.indexer.task.baseTaskDir=var/druid/task

# HTTP server threads
druid.server.http.numThreads=60

# Processing threads and buffers on Peons
druid.indexer.fork.property.druid.processing.numMergeBuffers=2
druid.indexer.fork.property.druid.processing.buffer.sizeBytes=100MiB
druid.indexer.fork.property.druid.processing.numThreads=1

# Hadoop indexing
druid.indexer.task.hadoopWorkingPath=var/druid/hadoop-tmp
We changed the middleManager service port to 18091; everything else stays at the defaults.
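Keep in mind that the peon memory adds up. With the values above (worker.capacity=4, each peon launched with a 1g heap and 1g of direct memory), a rough worst-case estimate, sketched in shell rather than an exact accounting, looks like this:

# worker.capacity (4) * (1g heap + 1g direct per peon), plus the 128m middleManager heap itself
capacity=4; heap_g=1; direct_g=1
echo "peon tasks may use up to $(( capacity * (heap_g + direct_g) )) GB on this node"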
4) Query node configuration
Log in to the datanode-3 node and configure it as the query node, which mainly runs the broker and router services. The broker dispatches query requests to the real-time and historical query services and merges the results; the router receives HTTP requests from clients, routing ingestion tasks to the overlord service and queries to the broker service, and it also provides a graphical web console.
Let's configure the broker first:
vi /opt/druid/conf/druid/cluster/query/broker/jvm.config
Reference configuration:
-server
-Xms512m
-Xmx2g
-XX:MaxDirectMemorySize=3g
-XX:+ExitOnOutOfMemoryError
-Duser.timezone=UTC
-Dfile.encoding=UTF-8
-Djava.io.tmpdir=var/tmp
-Djava.util.logging.manager=org.apache.logging.log4j.jul.LogManager
-Xms (minimum heap) is set to 512m, -Xmx (maximum heap) to 2g, and -XX:MaxDirectMemorySize (maximum direct, i.e. off-heap, memory) to 3g.
vi /opt/druid/conf/druid/cluster/query/broker/runtime.properties
Reference configuration:
druid.service=druid/broker
druid.plaintextPort=18082

# HTTP server settings
druid.server.http.numThreads=60

# HTTP client settings
druid.broker.http.numConnections=50
druid.broker.http.maxQueuedBytes=10MiB

# Processing threads and buffers
druid.processing.buffer.sizeBytes=250MiB
druid.processing.numMergeBuffers=6
druid.processing.numThreads=1
druid.processing.tmpDir=var/druid/processing

# Query cache disabled -- push down caching and merging instead
druid.broker.cache.useCache=false
druid.broker.cache.populateCache=false
We changed the broker port to 18082 and reduced druid.processing.buffer.sizeBytes to 250 MiB.
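With these values we can sanity-check that the 3g MaxDirectMemorySize set in jvm.config above is enough, using the same tuning-guide rule of thumb (a sketch, not an exact formula for every version):

# 250MiB * (numThreads=1 + numMergeBuffers=6 + 1) = 2000 MiB, which fits within 3g
buffer_mib=250; threads=1; merge_buffers=6
echo "need >= $(( buffer_mib * (threads + merge_buffers + 1) )) MiB of direct memory"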
Finally, configure the router:
vi /opt/druid/conf/druid/cluster/query/router/jvm.config
Reference configuration:
-server
-Xms256m
-Xmx512m
-XX:+UseG1GC
-XX:MaxDirectMemorySize=128m
-XX:+ExitOnOutOfMemoryError
-Duser.timezone=UTC
-Dfile.encoding=UTF-8
-Djava.io.tmpdir=var/tmp
-Djava.util.logging.manager=org.apache.logging.log4j.jul.LogManager
-Xms (minimum heap) is set to 256m and -Xmx (maximum heap) to 512m.
vi /opt/druid/conf/druid/cluster/query/router/runtime.properties
Reference configuration:
druid.service=druid/router
druid.plaintextPort=18888

# HTTP proxy
druid.router.http.numConnections=50
druid.router.http.readTimeout=PT5M
druid.router.http.numMaxThreads=100
druid.server.http.numThreads=100

# Service discovery
druid.router.defaultBrokerServiceName=druid/broker
druid.router.coordinatorServiceName=druid/coordinator

# Management proxy to coordinator / overlord: required for unified web console.
druid.router.managementProxy.enabled=true
We changed the router port to 18888; everything else stays at the defaults.
Starting the Druid cluster
Before starting the Druid cluster, make sure PostgreSQL and Hadoop are up and reachable from the Druid nodes.
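A quick way to verify this is a couple of pre-flight checks; the hostname and PostgreSQL port below are assumptions, so adjust them to your environment:

# Is the PostgreSQL metadata store reachable? (host/port assumed)
pg_isready -h datanode-1 -p 5432
# Is HDFS (deep storage) healthy?
hdfs dfsadmin -report | head -n 5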
On the datanode-1 node:
/opt/druid/bin/start-cluster-master-no-zk-server
On the datanode-2 node:
/opt/druid/bin/start-cluster-data-server
On the datanode-3 node:
/opt/druid/bin/start-cluster-query-server
If no error logs appear, startup has most likely succeeded; for details, check each service's logs under /opt/druid/var/sv.
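Each Druid process also exposes a /status/health endpoint, so you can confirm liveness over HTTP using the ports configured above:

curl http://datanode-2:18083/status/health   # historical
curl http://datanode-3:18082/status/health   # broker
curl http://datanode-3:18888/status/health   # router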
Note: these scripts run in the foreground; do not close the terminal, or the services will be terminated.
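If you want the services to survive the terminal closing, one option is to background them with nohup (the log path here is arbitrary, just a sketch):

nohup /opt/druid/bin/start-cluster-query-server > /opt/druid/var/query-server.log 2>&1 &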
Open the web console provided by the router service in Chrome:
http://datanode-3:18888/
From the menu bar, Load data -> Start a new spec lists the data sources Druid supports; Kafka and HDFS are among the most common ingestion sources.
On my HDFS, /example/simple1/sort/input/Rate_1000.csv is a CSV file; once we point Druid at it, Druid parses the file.
We can also see that, at the parse-time step, Druid automatically detects that the ImportDate column is a good candidate for the time-series timestamp __time.
The following steps cover column filtering, transforms, metric selection, rollup, partitioning, and so on; I won't go through them here (they deserve a dedicated walkthrough). Let's look at the JSON right before submission:
{ "type": "index_parallel", "spec": { "ioConfig": { "type": "index_parallel", "inputSource": { "type": "hdfs", "paths": "/example/simple1/sort/input/Rate_1000.csv" }, "inputFormat": { "type": "csv", "findColumnsFromHeader": true } }, "tuningConfig": { "type": "index_parallel", "partitionsSpec": { "type": "hashed" }, "forceGuaranteedRollup": true }, "dataSchema": { "timestampSpec": { "column": "ImportDate", "format": "auto" }, "dimensionsSpec": { "dimensions": [ "StateCode", "SourceName", "FederalTIN", "RateEffectiveDate", "RateExpirationDate", "PlanId", "RatingAreaId", "Tobacco", "Age", { "type": "string", "name": "IssuerId" }, { "type": "string", "name": "BusinessYear" } ] }, "granularitySpec": { "queryGranularity": "hour", "rollup": true, "segmentGranularity": "month" }, "metricsSpec": [ { "name": "count", "type": "count" }, { "name": "sum_IndividualRate", "type": "doubleSum", "fieldName": "IndividualRate" }, { "name": "sum_Couple", "type": "doubleSum", "fieldName": "Couple" }, { "name": "sum_PrimarySubscriberAndOneDependent", "type": "doubleSum", "fieldName": "PrimarySubscriberAndOneDependent" }, { "name": "sum_PrimarySubscriberAndTwoDependents", "type": "doubleSum", "fieldName": "PrimarySubscriberAndTwoDependents" }, { "name": "sum_PrimarySubscriberAndThreeOrMoreDependents", "type": "doubleSum", "fieldName": "PrimarySubscriberAndThreeOrMoreDependents" }, { "name": "sum_CoupleAndOneDependent", "type": "doubleSum", "fieldName": "CoupleAndOneDependent" }, { "name": "sum_CoupleAndTwoDependents", "type": "doubleSum", "fieldName": "CoupleAndTwoDependents" }, { "name": "sum_CoupleAndThreeOrMoreDependents", "type": "doubleSum", "fieldName": "CoupleAndThreeOrMoreDependents" } ], "dataSource": "rate1000" } } }
Apache Druid departs from the programmatic write model of most databases, where the database exposes a programming interface, the user implements the data-transformation logic, and writes ultimately happen through program calls to that interface.
Apache Druid instead follows a declarative ingestion model: you author a task spec, and Druid's ingestion service derives and executes the collection, filtering, transformation, and indexing logic for the data source from that spec.
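Once the task finishes, the rolled-up datasource can be queried through the broker's Druid SQL endpoint. A quick smoke test against the rate1000 datasource defined above (the query itself is just an example):

curl -X POST -H 'Content-Type: application/json' \
     -d '{"query": "SELECT __time, StateCode, SUM(sum_IndividualRate) AS total FROM rate1000 GROUP BY 1, 2 LIMIT 10"}' \
     http://datanode-3:18082/druid/v2/sql/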