1. Introduction
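flume-ng is a distributed, highly reliable, and efficient log collection system. The name refers to the new generation of Flume: "ng" stands for "next generation". At the time of writing, flume-ng 1.4 is the latest release. flume-ng differs substantially from the original Flume. I had been running Flume 0.9 and never moved to flume-ng until a recent project required the upgrade. I ran into a few problems along the way and am recording them here to share.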
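2. Version notes
3. Installation steps
Downloading, unpacking, installing the JDK, and setting environment variables are already covered by plenty of introductory write-ups, so they are not repeated here. One point worth calling out: flume-ng does not require ZooKeeper, so there is nothing to set up for it.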
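4. The flume-ng bug
After installation, running flume-ng produces error messages. The cause is the flume-ng shell script itself. My modified flume-ng script is reproduced in full below; the line following each #zhangzl marker is a line that had to be changed. The complete script: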
#!/bin/bash
#
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
#

################################
# constants
################################

FLUME_AGENT_CLASS="org.apache.flume.node.Application"
FLUME_AVRO_CLIENT_CLASS="org.apache.flume.client.avro.AvroCLIClient"
FLUME_VERSION_CLASS="org.apache.flume.tools.VersionInfo"
FLUME_TOOLS_CLASS="org.apache.flume.tools.FlumeToolsMain"

CLEAN_FLAG=1
################################
# functions
################################

info() {
  if [ ${CLEAN_FLAG} -ne 0 ]; then
    local msg=$1
    echo "Info: $msg" >&2
  fi
}

warn() {
  if [ ${CLEAN_FLAG} -ne 0 ]; then
    local msg=$1
    echo "Warning: $msg" >&2
  fi
}

error() {
  local msg=$1
  local exit_code=$2

  echo "Error: $msg" >&2

  if [ -n "$exit_code" ] ; then
    exit $exit_code
  fi
}

# If avail, add Hadoop paths to the FLUME_CLASSPATH and to the
# FLUME_JAVA_LIBRARY_PATH env vars.
# Requires Flume jars to already be on FLUME_CLASSPATH.
add_hadoop_paths() {
  local HADOOP_IN_PATH=$(PATH="${HADOOP_HOME:-${HADOOP_PREFIX}}/bin:$PATH" \
      which hadoop 2>/dev/null)

  if [ -f "${HADOOP_IN_PATH}" ]; then
    info "Including Hadoop libraries found via ($HADOOP_IN_PATH) for HDFS access"

    # determine hadoop java.library.path and use that for flume
    local HADOOP_CLASSPATH=""
    local HADOOP_JAVA_LIBRARY_PATH=$(HADOOP_CLASSPATH="$FLUME_CLASSPATH" \
        ${HADOOP_IN_PATH} org.apache.flume.tools.GetJavaProperty \
        java.library.path)

    # look for the line that has the desired property value
    # (considering extraneous output from some GC options that write to stdout)
    # IFS = InternalFieldSeparator (set to recognize only newline char as delimiter)
    IFS=$'\n'
    for line in $HADOOP_JAVA_LIBRARY_PATH; do
      #if [[ $line =~ ^java\.library\.path=(.*)$ ]]; then
      if [[ "$line" =~ "^java\.library\.path=(.*)$" ]]; then
        HADOOP_JAVA_LIBRARY_PATH=${BASH_REMATCH[1]}
        break
      fi
    done
    unset IFS

    if [ -n "${HADOOP_JAVA_LIBRARY_PATH}" ]; then
      FLUME_JAVA_LIBRARY_PATH="$FLUME_JAVA_LIBRARY_PATH:$HADOOP_JAVA_LIBRARY_PATH"
    fi

    # determine hadoop classpath
    HADOOP_CLASSPATH=$($HADOOP_IN_PATH classpath)

    # hack up and filter hadoop classpath
    local ELEMENTS=$(sed -e 's/:/ /g' <<<${HADOOP_CLASSPATH})
    local ELEMENT
    for ELEMENT in $ELEMENTS; do
      local PIECE
      for PIECE in $(echo $ELEMENT); do
        #zhangzl
        if [[ $PIECE =~ "slf4j-(api|log4j12).*\.jar" ]]; then
          info "Excluding $PIECE from classpath"
          continue
        else
          FLUME_CLASSPATH="$FLUME_CLASSPATH:$PIECE"
        fi
      done
    done

  fi
}

add_HBASE_paths() {
  local HBASE_IN_PATH=$(PATH="${HBASE_HOME}/bin:$PATH" \
      which hbase 2>/dev/null)

  if [ -f "${HBASE_IN_PATH}" ]; then
    info "Including HBASE libraries found via ($HBASE_IN_PATH) for HBASE access"

    # determine HBASE java.library.path and use that for flume
    local HBASE_CLASSPATH=""
    local HBASE_JAVA_LIBRARY_PATH=$(HBASE_CLASSPATH="$FLUME_CLASSPATH" \
        ${HBASE_IN_PATH} org.apache.flume.tools.GetJavaProperty \
        java.library.path)

    # look for the line that has the desired property value
    # (considering extraneous output from some GC options that write to stdout)
    # IFS = InternalFieldSeparator (set to recognize only newline char as delimiter)
    IFS=$'\n'
    for line in $HBASE_JAVA_LIBRARY_PATH; do
      #zhangzl
      if [[ $line =~ "^java\.library\.path=(.*)$" ]]; then
        HBASE_JAVA_LIBRARY_PATH=${BASH_REMATCH[1]}
        break
      fi
    done
    unset IFS

    if [ -n "${HBASE_JAVA_LIBRARY_PATH}" ]; then
      FLUME_JAVA_LIBRARY_PATH="$FLUME_JAVA_LIBRARY_PATH:$HBASE_JAVA_LIBRARY_PATH"
    fi

    # determine HBASE classpath
    HBASE_CLASSPATH=$($HBASE_IN_PATH classpath)

    # hack up and filter HBASE classpath
    local ELEMENTS=$(sed -e 's/:/ /g' <<<${HBASE_CLASSPATH})
    local ELEMENT
    for ELEMENT in $ELEMENTS; do
      local PIECE
      for PIECE in $(echo $ELEMENT); do
        #zhangzl
        if [[ $PIECE =~ "slf4j-(api|log4j12).*\.jar" ]]; then
          info "Excluding $PIECE from classpath"
          continue
        else
          FLUME_CLASSPATH="$FLUME_CLASSPATH:$PIECE"
        fi
      done
    done
    FLUME_CLASSPATH="$FLUME_CLASSPATH:$HBASE_HOME/conf"

  fi
}

set_LD_LIBRARY_PATH(){
  #Append the FLUME_JAVA_LIBRARY_PATH to whatever the user may have specified in
  #flume-env.sh
  if [ -n "${FLUME_JAVA_LIBRARY_PATH}" ]; then
    export LD_LIBRARY_PATH="${LD_LIBRARY_PATH}:${FLUME_JAVA_LIBRARY_PATH}"
  fi
}

display_help() {
  cat <<EOF
Usage: $0 <command> [options]...

commands:
  help                  display this help text
  agent                 run a Flume agent
  avro-client           run an avro Flume client
  version               show Flume version info

global options:
  --conf,-c <conf>      use configs in <conf> directory
  --classpath,-C <cp>   append to the classpath
  --dryrun,-d           do not actually start Flume, just print the command
  --plugins-path <dirs> colon-separated list of plugins.d directories. See the
                        plugins.d section in the user guide for more details.
                        Default: \$FLUME_HOME/plugins.d
  -Dproperty=value      sets a Java system property value
  -Xproperty=value      sets a Java -X option

agent options:
  --conf-file,-f <file> specify a config file (required)
  --name,-n <name>      the name of this agent (required)
  --help,-h             display help text

avro-client options:
  --rpcProps,-P <file>   RPC client properties file with server connection params
  --host,-H <host>       hostname to which events will be sent
  --port,-p <port>       port of the avro source
  --dirname <dir>        directory to stream to avro source
  --filename,-F <file>   text file to stream to avro source (default: std input)
  --headerFile,-R <file> File containing event headers as key/value pairs on each new line
  --help,-h              display help text

  Either --rpcProps or both --host and --port must be specified.

Note that if <conf> directory is specified, then it is always included first
in the classpath.

EOF
}

run_flume() {
  local FLUME_APPLICATION_CLASS

  if [ "$#" -gt 0 ]; then
    FLUME_APPLICATION_CLASS=$1
    shift
  else
    error "Must specify flume application class" 1
  fi

  if [ ${CLEAN_FLAG} -ne 0 ]; then
    set -x
  fi
  $EXEC $JAVA_HOME/bin/java $JAVA_OPTS -cp "$FLUME_CLASSPATH" \
      -Djava.library.path=$FLUME_JAVA_LIBRARY_PATH "$FLUME_APPLICATION_CLASS" $*
}

################################
# main
################################

# set default params
FLUME_CLASSPATH=""
FLUME_JAVA_LIBRARY_PATH=""
JAVA_OPTS="-Xmx20m"
LD_LIBRARY_PATH=""

opt_conf=""
opt_classpath=""
opt_plugins_dirs=""
opt_java_props=""
opt_dryrun=""

mode=$1
shift

case "$mode" in
  help)
    display_help
    exit 0
    ;;
  agent)
    opt_agent=1
    ;;
  node)
    opt_agent=1
    warn "The \"node\" command is deprecated. Please use \"agent\" instead."
    ;;
  avro-client)
    opt_avro_client=1
    ;;
  tool)
    opt_tool=1
    ;;
  version)
    opt_version=1
    CLEAN_FLAG=0
    ;;
  *)
    error "Unknown or unspecified command '$mode'"
    echo
    display_help
    exit 1
    ;;
esac

args=""
while [ -n "$*" ] ; do
  arg=$1
  shift

  case "$arg" in
    --conf|-c)
      [ -n "$1" ] || error "Option --conf requires an argument" 1
      opt_conf=$1
      shift
      ;;
    --classpath|-C)
      [ -n "$1" ] || error "Option --classpath requires an argument" 1
      opt_classpath=$1
      shift
      ;;
    --dryrun|-d)
      opt_dryrun="1"
      ;;
    --plugins-path)
      opt_plugins_dirs=$1
      shift
      ;;
    -D*)
      opt_java_props="$opt_java_props $arg"
      ;;
    -X*)
      opt_java_props="$opt_java_props $arg"
      ;;
    *)
      args="$args $arg"
      ;;
  esac
done

# make opt_conf absolute
if [[ -n "$opt_conf" && -d "$opt_conf" ]]; then
  opt_conf=$(cd $opt_conf; pwd)
fi

# allow users to override the default env vars via conf/flume-env.sh
if [ -z "$opt_conf" ]; then
  warn "No configuration directory set! Use --conf <dir> to override."
elif [ -f "$opt_conf/flume-env.sh" ]; then
  info "Sourcing environment configuration script $opt_conf/flume-env.sh"
  source "$opt_conf/flume-env.sh"
fi

# append command-line java options to stock or env script JAVA_OPTS
if [ -n "${opt_java_props}" ]; then
  JAVA_OPTS="${JAVA_OPTS} ${opt_java_props}"
fi

# prepend command-line classpath to env script classpath
if [ -n "${opt_classpath}" ]; then
  if [ -n "${FLUME_CLASSPATH}" ]; then
    FLUME_CLASSPATH="${opt_classpath}:${FLUME_CLASSPATH}"
  else
    FLUME_CLASSPATH="${opt_classpath}"
  fi
fi

if [ -z "${FLUME_HOME}" ]; then
  FLUME_HOME=$(cd $(dirname $0)/..; pwd)
fi

# prepend $FLUME_HOME/lib jars to the specified classpath (if any)
if [ -n "${FLUME_CLASSPATH}" ] ; then
  FLUME_CLASSPATH="${FLUME_HOME}/lib/*:$FLUME_CLASSPATH"
else
  FLUME_CLASSPATH="${FLUME_HOME}/lib/*"
fi

# load plugins.d directories
PLUGINS_DIRS=""
if [ -n "${opt_plugins_dirs}" ]; then
  PLUGINS_DIRS=$(sed -e 's/:/ /g' <<<${opt_plugins_dirs})
else
  PLUGINS_DIRS="${FLUME_HOME}/plugins.d"
fi

unset plugin_lib plugin_libext plugin_native
for PLUGINS_DIR in $PLUGINS_DIRS; do
  if [[ -d ${PLUGINS_DIR} ]]; then
    for plugin in ${PLUGINS_DIR}/*; do
      if [[ -d "$plugin/lib" ]]; then
        plugin_lib="${plugin_lib}${plugin_lib+:}${plugin}/lib/*"
      fi
      if [[ -d "$plugin/libext" ]]; then
        plugin_libext="${plugin_libext}${plugin_libext+:}${plugin}/libext/*"
      fi
      if [[ -d "$plugin/native" ]]; then
        plugin_native="${plugin_native}${plugin_native+:}${plugin}/native"
      fi
    done
  fi
done

if [[ -n "${plugin_lib}" ]]
then
  FLUME_CLASSPATH="${FLUME_CLASSPATH}:${plugin_lib}"
fi

if [[ -n "${plugin_libext}" ]]
then
  FLUME_CLASSPATH="${FLUME_CLASSPATH}:${plugin_libext}"
fi

if [[ -n "${plugin_native}" ]]
then
  if [[ -n "${FLUME_JAVA_LIBRARY_PATH}" ]]
  then
    FLUME_JAVA_LIBRARY_PATH="${FLUME_JAVA_LIBRARY_PATH}:${plugin_native}"
  else
    FLUME_JAVA_LIBRARY_PATH="${plugin_native}"
  fi
fi

# find java
if [ -z "${JAVA_HOME}" ] ; then
  warn "JAVA_HOME is not set!"
  # Try to use Bigtop to autodetect JAVA_HOME if it's available
  if [ -e /usr/libexec/bigtop-detect-javahome ] ; then
    . /usr/libexec/bigtop-detect-javahome
  elif [ -e /usr/lib/bigtop-utils/bigtop-detect-javahome ] ; then
    . /usr/lib/bigtop-utils/bigtop-detect-javahome
  fi

  # Using java from path if bigtop is not installed or couldn't find it
  if [ -z "${JAVA_HOME}" ] ; then
    JAVA_DEFAULT=$(type -p java)
    [ -n "$JAVA_DEFAULT" ] || error "Unable to find java executable. Is it in your PATH?" 1
    JAVA_HOME=$(cd $(dirname $JAVA_DEFAULT)/..; pwd)
  fi
fi

# look for hadoop libs
add_hadoop_paths
add_HBASE_paths

# prepend conf dir to classpath
if [ -n "$opt_conf" ]; then
  FLUME_CLASSPATH="$opt_conf:$FLUME_CLASSPATH"
fi

set_LD_LIBRARY_PATH
# allow dryrun
EXEC="exec"
if [ -n "${opt_dryrun}" ]; then
  warn "Dryrun mode enabled (will not actually initiate startup)"
  EXEC="echo"
fi

# finally, invoke the appropriate command
if [ -n "$opt_agent" ] ; then
  run_flume $FLUME_AGENT_CLASS $args
elif [ -n "$opt_avro_client" ] ; then
  run_flume $FLUME_AVRO_CLIENT_CLASS $args
elif [ -n "${opt_version}" ] ; then
  run_flume $FLUME_VERSION_CLASS $args
elif [ -n "${opt_tool}" ] ; then
  run_flume $FLUME_TOOLS_CLASS $args
else
  error "This message should never appear" 1
fi

exit 0
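A note on the #zhangzl lines: each one puts the regular expression on the right of =~ inside double quotes. How bash treats that depends on its version. Since bash 3.2, a quoted right-hand side is matched as a literal string rather than as a regex, so the quoted form suits older bash releases. The sketch below is not the fix used in this post; it shows a commonly used alternative, assuming you want the same check to behave identically on old and new bash, which is to keep the pattern in a variable and leave the variable unquoted. The function names is_excluded_jar and extract_library_path and the sample jar paths are hypothetical and do not appear in the flume-ng script.

```shell
#!/bin/bash
# Sketch only: hypothetical helpers, not part of the flume-ng script.

# Returns 0 when the path names an slf4j-api or slf4j-log4j12 jar.
# The pattern lives in a variable, so no quoting is needed at the =~ site.
is_excluded_jar() {
  local re='slf4j-(api|log4j12).*\.jar'
  [[ $1 =~ $re ]]
}

# Reads lines on stdin and prints the value of the first
# "java.library.path=..." line, skipping unrelated output.
extract_library_path() {
  local re='^java\.library\.path=(.*)$'
  local line
  while IFS= read -r line; do
    if [[ $line =~ $re ]]; then
      printf '%s\n' "${BASH_REMATCH[1]}"
      return 0
    fi
  done
  return 1
}

# Example: filter two made-up classpath entries.
for piece in /opt/hadoop/lib/slf4j-api-1.6.1.jar /opt/hadoop/lib/guava-11.0.2.jar; do
  if is_excluded_jar "$piece"; then
    echo "exclude $piece"
  else
    echo "keep $piece"
  fi
done
```

If you apply this idea to the script itself, check the result with the --dryrun option first, which prints the java command instead of running it.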
5. Test configuration file
Create a file named example-conf.properties in the conf directory with the following properties:
# Describe the source
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = avro
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

# Describe the sink
# Write the data to the log
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
6. Run commands
6.1 Start the agent
[hadoop@hadoop1 conf]$ flume-ng agent -n a1 -f example-conf.properties
6.2 Start the avro-client to send data to the agent (run this in a separate terminal window)
[hadoop@hadoop1 conf]$ flume-ng avro-client -H localhost -p 44444 -F file01
7. Checking the result
14/01/16 22:26:34 INFO ipc.NettyServer: [id: 0x0100c7e4, /127.0.0.1:54289 => /127.0.0.1:44444] OPEN
14/01/16 22:26:34 INFO ipc.NettyServer: [id: 0x0100c7e4, /127.0.0.1:54289 => /127.0.0.1:44444] BOUND: /127.0.0.1:44444
14/01/16 22:26:34 INFO ipc.NettyServer: [id: 0x0100c7e4, /127.0.0.1:54289 => /127.0.0.1:44444] CONNECTED: /127.0.0.1:54289
14/01/16 22:26:36 INFO ipc.NettyServer: [id: 0x0100c7e4, /127.0.0.1:54289 :> /127.0.0.1:44444] DISCONNECTED
14/01/16 22:26:36 INFO ipc.NettyServer: [id: 0x0100c7e4, /127.0.0.1:54289 :> /127.0.0.1:44444] UNBOUND
14/01/16 22:26:36 INFO ipc.NettyServer: [id: 0x0100c7e4, /127.0.0.1:54289 :> /127.0.0.1:44444] CLOSED
14/01/16 22:26:36 INFO ipc.NettyServer: Connection to /127.0.0.1:54289 disconnected.
14/01/16 22:26:38 INFO sink.LoggerSink: Event: { headers:{} body: 68 65 6C 6C 6F 20 77 6F 72 6C 64 hello world }
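The last line is the one to look for: LoggerSink prints the event body as hex bytes followed by the readable text. The post does not show the contents of file01, so the sketch below assumes it held the single line "hello world", and reproduces the hex form with standard tools so you can compare it against the log. body_hex is a hypothetical helper name.

```shell
#!/bin/bash
# Sketch only: print a string as space-separated uppercase hex bytes,
# in the same layout as the "body:" field of the LoggerSink line.
body_hex() {
  printf '%s' "$1" \
    | od -An -tx1 \
    | tr -d '\n' \
    | tr 'a-f' 'A-F' \
    | sed -e 's/^ *//' -e 's/  */ /g' -e 's/ *$//'
}

body_hex 'hello world'   # prints: 68 65 6C 6C 6F 20 77 6F 72 6C 64
echo
```

If the hex in your agent log differs from what this prints for your input line, the event body is not what you sent, which usually points at the file passed with -F rather than at the agent.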
AI 代码解读