Big Data Tools: Flume 1.4 Installation and Deployment Guide


1. Introduction

  flume-ng is a distributed, highly reliable, and efficient log collection system. The "ng" stands for "next generation"; at the time of writing, flume-ng 1.4 is the latest release. flume-ng changed substantially from the original Flume. I had been running Flume 0.9 and never upgraded, but a recent project required it; the upgrade surfaced a few problems, which I am recording here to share.

2. Version Notes

  flume-ng 1.4.0

3. Installation Steps

  Downloading, extracting, installing the JDK, and setting environment variables have been covered in many other articles, so they are not repeated here. One point worth calling out: flume-ng does not depend on ZooKeeper, so no ZooKeeper setup is needed.
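
For reference, a minimal sketch of the usual steps; the archive name and install path here are examples, not prescriptions:

tar -zxvf apache-flume-1.4.0-bin.tar.gz -C /usr/local

# e.g. in ~/.bashrc (adjust paths to your environment)
export FLUME_HOME=/usr/local/apache-flume-1.4.0-bin
export PATH=$FLUME_HOME/bin:$PATH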

4. The flume-ng Script Bug

  Running flume-ng after installation produces errors, caused by the launcher shell script itself. My corrected flume-ng script is reproduced in full below; the lines under each #zhangzl marker are the modified parts. The complete script follows:


#!/bin/bash
#
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
#

################################
# constants
################################

FLUME_AGENT_CLASS="org.apache.flume.node.Application"
FLUME_AVRO_CLIENT_CLASS="org.apache.flume.client.avro.AvroCLIClient"
FLUME_VERSION_CLASS="org.apache.flume.tools.VersionInfo"
FLUME_TOOLS_CLASS="org.apache.flume.tools.FlumeToolsMain"

CLEAN_FLAG=1
################################
# functions
################################

info() {
  if [ ${CLEAN_FLAG} -ne 0 ]; then
    local msg=$1
    echo "Info: $msg" >&2
  fi
}

warn() {
  if [ ${CLEAN_FLAG} -ne 0 ]; then
    local msg=$1
    echo "Warning: $msg" >&2
  fi
}

error() {
  local msg=$1
  local exit_code=$2

  echo "Error: $msg" >&2

  if [ -n "$exit_code" ] ; then
    exit $exit_code
  fi
}

# If avail, add Hadoop paths to the FLUME_CLASSPATH and to the
# FLUME_JAVA_LIBRARY_PATH env vars.
# Requires Flume jars to already be on FLUME_CLASSPATH.
add_hadoop_paths() {
  local HADOOP_IN_PATH=$(PATH="${HADOOP_HOME:-${HADOOP_PREFIX}}/bin:$PATH" \
      which hadoop 2>/dev/null)

  if [ -f "${HADOOP_IN_PATH}" ]; then
    info "Including Hadoop libraries found via ($HADOOP_IN_PATH) for HDFS access"

    # determine hadoop java.library.path and use that for flume
    local HADOOP_CLASSPATH=""
    local HADOOP_JAVA_LIBRARY_PATH=$(HADOOP_CLASSPATH="$FLUME_CLASSPATH" \
        ${HADOOP_IN_PATH} org.apache.flume.tools.GetJavaProperty \
        java.library.path)

    # look for the line that has the desired property value
    # (considering extraneous output from some GC options that write to stdout)
    # IFS = InternalFieldSeparator (set to recognize only newline char as delimiter)
    IFS=$'\n'
    for line in $HADOOP_JAVA_LIBRARY_PATH; do
      # NOTE: the quoted pattern below works around a parse error seen in
      # older bash; bash >= 3.2 matches a quoted pattern literally, in
      # which case the original unquoted form should be restored.
      #if [[ $line =~ ^java\.library\.path=(.*)$ ]]; then
      if [[ "$line" =~ "^java\.library\.path=(.*)$" ]]; then
        HADOOP_JAVA_LIBRARY_PATH=${BASH_REMATCH[1]}
        break
      fi
    done
    unset IFS

    if [ -n "${HADOOP_JAVA_LIBRARY_PATH}" ]; then
      FLUME_JAVA_LIBRARY_PATH="$FLUME_JAVA_LIBRARY_PATH:$HADOOP_JAVA_LIBRARY_PATH"
    fi

    # determine hadoop classpath
    HADOOP_CLASSPATH=$($HADOOP_IN_PATH classpath)

    # hack up and filter hadoop classpath
    local ELEMENTS=$(sed -e 's/:/ /g' <<<${HADOOP_CLASSPATH})
    local ELEMENT
    for ELEMENT in $ELEMENTS; do
      local PIECE
      for PIECE in $(echo $ELEMENT); do
          #zhangzl: regex quoted (same bash-version caveat as noted above)
        if [[ $PIECE =~ "slf4j-(api|log4j12).*\.jar" ]]; then
          info "Excluding $PIECE from classpath"
          continue
        else
          FLUME_CLASSPATH="$FLUME_CLASSPATH:$PIECE"
        fi
      done
    done

  fi
}
add_HBASE_paths() {
  local HBASE_IN_PATH=$(PATH="${HBASE_HOME}/bin:$PATH" \
      which hbase 2>/dev/null)

  if [ -f "${HBASE_IN_PATH}" ]; then
    info "Including HBASE libraries found via ($HBASE_IN_PATH) for HBASE access"

    # determine HBASE java.library.path and use that for flume
    local HBASE_CLASSPATH=""
    local HBASE_JAVA_LIBRARY_PATH=$(HBASE_CLASSPATH="$FLUME_CLASSPATH" \
        ${HBASE_IN_PATH} org.apache.flume.tools.GetJavaProperty \
        java.library.path)

    # look for the line that has the desired property value
    # (considering extraneous output from some GC options that write to stdout)
    # IFS = InternalFieldSeparator (set to recognize only newline char as delimiter)
    IFS=$'\n'
    for line in $HBASE_JAVA_LIBRARY_PATH; do
    #zhangzl: regex quoted (same bash-version caveat as noted above)
      if [[ $line =~ "^java\.library\.path=(.*)$" ]]; then
        HBASE_JAVA_LIBRARY_PATH=${BASH_REMATCH[1]}
        break
      fi
    done
    unset IFS

    if [ -n "${HBASE_JAVA_LIBRARY_PATH}" ]; then
      FLUME_JAVA_LIBRARY_PATH="$FLUME_JAVA_LIBRARY_PATH:$HBASE_JAVA_LIBRARY_PATH"
    fi

    # determine HBASE classpath
    HBASE_CLASSPATH=$($HBASE_IN_PATH classpath)

    # hack up and filter HBASE classpath
    local ELEMENTS=$(sed -e 's/:/ /g' <<<${HBASE_CLASSPATH})
    local ELEMENT
    for ELEMENT in $ELEMENTS; do
      local PIECE
      for PIECE in $(echo $ELEMENT); do
      #zhangzl: regex quoted (same bash-version caveat as noted above)
        if [[ $PIECE =~ "slf4j-(api|log4j12).*\.jar" ]]; then
          info "Excluding $PIECE from classpath"
          continue
        else
          FLUME_CLASSPATH="$FLUME_CLASSPATH:$PIECE"
        fi
      done
    done
    FLUME_CLASSPATH="$FLUME_CLASSPATH:$HBASE_HOME/conf"

  fi
}

set_LD_LIBRARY_PATH(){
#Append the FLUME_JAVA_LIBRARY_PATH to whatever the user may have specified in
#flume-env.sh
  if [ -n "${FLUME_JAVA_LIBRARY_PATH}" ]; then
    export LD_LIBRARY_PATH="${LD_LIBRARY_PATH}:${FLUME_JAVA_LIBRARY_PATH}"
  fi
}

display_help() {
  cat <<EOF
Usage: $0 <command> [options]...

commands:
  help                  display this help text
  agent                 run a Flume agent
  avro-client           run an avro Flume client
  version               show Flume version info

global options:
  --conf,-c <conf>      use configs in <conf> directory
  --classpath,-C <cp>   append to the classpath
  --dryrun,-d           do not actually start Flume, just print the command
  --plugins-path <dirs> colon-separated list of plugins.d directories. See the
                        plugins.d section in the user guide for more details.
                        Default: \$FLUME_HOME/plugins.d
  -Dproperty=value      sets a Java system property value
  -Xproperty=value      sets a Java -X option

agent options:
  --conf-file,-f <file> specify a config file (required)
  --name,-n <name>      the name of this agent (required)
  --help,-h             display help text

avro-client options:
  --rpcProps,-P <file>   RPC client properties file with server connection params
  --host,-H <host>       hostname to which events will be sent
  --port,-p <port>       port of the avro source
  --dirname <dir>        directory to stream to avro source
  --filename,-F <file>   text file to stream to avro source (default: std input)
  --headerFile,-R <file> File containing event headers as key/value pairs on each new line
  --help,-h              display help text

  Either --rpcProps or both --host and --port must be specified.

Note that if <conf> directory is specified, then it is always included first
in the classpath.

EOF
}

run_flume() {
  local FLUME_APPLICATION_CLASS

  if [ "$#" -gt 0 ]; then
    FLUME_APPLICATION_CLASS=$1
    shift
  else
    error "Must specify flume application class" 1
  fi

  if [ ${CLEAN_FLAG} -ne 0 ]; then
    set -x
  fi
  $EXEC $JAVA_HOME/bin/java $JAVA_OPTS -cp "$FLUME_CLASSPATH" \
      -Djava.library.path=$FLUME_JAVA_LIBRARY_PATH "$FLUME_APPLICATION_CLASS" $*
}

################################
# main
################################

# set default params
FLUME_CLASSPATH=""
FLUME_JAVA_LIBRARY_PATH=""
JAVA_OPTS="-Xmx20m"
LD_LIBRARY_PATH=""

opt_conf=""
opt_classpath=""
opt_plugins_dirs=""
opt_java_props=""
opt_dryrun=""

mode=$1
shift

case "$mode" in
  help)
    display_help
    exit 0
    ;;
  agent)
    opt_agent=1
    ;;
  node)
    opt_agent=1
    warn "The \"node\" command is deprecated. Please use \"agent\" instead."
    ;;
  avro-client)
    opt_avro_client=1
    ;;
  tool)
    opt_tool=1
    ;;
  version)
   opt_version=1
   CLEAN_FLAG=0
   ;;
  *)
    error "Unknown or unspecified command '$mode'"
    echo
    display_help
    exit 1
    ;;
esac

args=""
while [ -n "$*" ] ; do
  arg=$1
  shift

  case "$arg" in
    --conf|-c)
      [ -n "$1" ] || error "Option --conf requires an argument" 1
      opt_conf=$1
      shift
      ;;
    --classpath|-C)
      [ -n "$1" ] || error "Option --classpath requires an argument" 1
      opt_classpath=$1
      shift
      ;;
    --dryrun|-d)
      opt_dryrun="1"
      ;;
    --plugins-path)
      opt_plugins_dirs=$1
      shift
      ;;
    -D*)
      opt_java_props="$opt_java_props $arg"
      ;;
    -X*)
      opt_java_props="$opt_java_props $arg"
      ;;
    *)
      args="$args $arg"
      ;;
  esac
done

# make opt_conf absolute
if [[ -n "$opt_conf" && -d "$opt_conf" ]]; then
  opt_conf=$(cd $opt_conf; pwd)
fi

# allow users to override the default env vars via conf/flume-env.sh
if [ -z "$opt_conf" ]; then
  warn "No configuration directory set! Use --conf <dir> to override."
elif [ -f "$opt_conf/flume-env.sh" ]; then
  info "Sourcing environment configuration script $opt_conf/flume-env.sh"
  source "$opt_conf/flume-env.sh"
fi

# append command-line java options to stock or env script JAVA_OPTS
if [ -n "${opt_java_props}" ]; then
  JAVA_OPTS="${JAVA_OPTS} ${opt_java_props}"
fi

# prepend command-line classpath to env script classpath
if [ -n "${opt_classpath}" ]; then
  if [ -n "${FLUME_CLASSPATH}" ]; then
    FLUME_CLASSPATH="${opt_classpath}:${FLUME_CLASSPATH}"
  else
    FLUME_CLASSPATH="${opt_classpath}"
  fi
fi

if [ -z "${FLUME_HOME}" ]; then
  FLUME_HOME=$(cd $(dirname $0)/..; pwd)
fi

# prepend $FLUME_HOME/lib jars to the specified classpath (if any)
if [ -n "${FLUME_CLASSPATH}" ] ; then
  FLUME_CLASSPATH="${FLUME_HOME}/lib/*:$FLUME_CLASSPATH"
else
  FLUME_CLASSPATH="${FLUME_HOME}/lib/*"
fi

# load plugins.d directories
PLUGINS_DIRS=""
if [ -n "${opt_plugins_dirs}" ]; then
  PLUGINS_DIRS=$(sed -e 's/:/ /g' <<<${opt_plugins_dirs})
else
  PLUGINS_DIRS="${FLUME_HOME}/plugins.d"
fi

unset plugin_lib plugin_libext plugin_native
for PLUGINS_DIR in $PLUGINS_DIRS; do
  if [[ -d ${PLUGINS_DIR} ]]; then
    for plugin in ${PLUGINS_DIR}/*; do
      if [[ -d "$plugin/lib" ]]; then
        plugin_lib="${plugin_lib}${plugin_lib+:}${plugin}/lib/*"
      fi
      if [[ -d "$plugin/libext" ]]; then
        plugin_libext="${plugin_libext}${plugin_libext+:}${plugin}/libext/*"
      fi
      if [[ -d "$plugin/native" ]]; then
        plugin_native="${plugin_native}${plugin_native+:}${plugin}/native"
      fi
    done
  fi
done

if [[ -n "${plugin_lib}" ]]
then
  FLUME_CLASSPATH="${FLUME_CLASSPATH}:${plugin_lib}"
fi

if [[ -n "${plugin_libext}" ]]
then
  FLUME_CLASSPATH="${FLUME_CLASSPATH}:${plugin_libext}"
fi

if [[ -n "${plugin_native}" ]]
then
  if [[ -n "${FLUME_JAVA_LIBRARY_PATH}" ]]
  then
    FLUME_JAVA_LIBRARY_PATH="${FLUME_JAVA_LIBRARY_PATH}:${plugin_native}"
  else
    FLUME_JAVA_LIBRARY_PATH="${plugin_native}"
  fi
fi

# find java
if [ -z "${JAVA_HOME}" ] ; then
  warn "JAVA_HOME is not set!"
  # Try to use Bigtop to autodetect JAVA_HOME if it's available
  if [ -e /usr/libexec/bigtop-detect-javahome ] ; then
    . /usr/libexec/bigtop-detect-javahome
  elif [ -e /usr/lib/bigtop-utils/bigtop-detect-javahome ] ; then
    . /usr/lib/bigtop-utils/bigtop-detect-javahome
  fi

  # Using java from path if bigtop is not installed or couldn't find it
  if [ -z "${JAVA_HOME}" ] ; then
    JAVA_DEFAULT=$(type -p java)
    [ -n "$JAVA_DEFAULT" ] || error "Unable to find java executable. Is it in your PATH?" 1
    JAVA_HOME=$(cd $(dirname $JAVA_DEFAULT)/..; pwd)
  fi
fi

# look for hadoop libs
add_hadoop_paths
add_HBASE_paths

# prepend conf dir to classpath
if [ -n "$opt_conf" ]; then
  FLUME_CLASSPATH="$opt_conf:$FLUME_CLASSPATH"
fi

set_LD_LIBRARY_PATH
# allow dryrun
EXEC="exec"
if [ -n "${opt_dryrun}" ]; then
  warn "Dryrun mode enabled (will not actually initiate startup)"
  EXEC="echo"
fi

# finally, invoke the appropriate command
if [ -n "$opt_agent" ] ; then
  run_flume $FLUME_AGENT_CLASS $args
elif [ -n "$opt_avro_client" ] ; then
  run_flume $FLUME_AVRO_CLIENT_CLASS $args
elif [ -n "${opt_version}" ] ; then
  run_flume $FLUME_VERSION_CLASS $args
elif [ -n "${opt_tool}" ] ; then
  run_flume $FLUME_TOOLS_CLASS $args
else
  error "This message should never appear" 1
fi

exit 0
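
For context, the failure comes from bash's =~ operator: some older bash releases reject an unquoted pattern containing characters such as ( and |, while bash 3.2 and later treat a quoted right-hand side as a literal string rather than a regex. A minimal probe, independent of Flume and assuming only a bash shell, shows which behavior your system has:

#!/bin/bash
# Probe how this bash handles quoted vs. unquoted regex with [[ =~ ]].
line='java.library.path=/usr/lib/hadoop/native'

if [[ $line =~ ^java\.library\.path=(.*)$ ]]; then
  echo "unquoted pattern matched: ${BASH_REMATCH[1]}"
fi

# On bash >= 3.2 the quoted pattern is compared as a literal string,
# so this branch normally does not fire there.
if [[ "$line" =~ "^java\.library\.path=(.*)$" ]]; then
  echo "quoted pattern matched: ${BASH_REMATCH[1]}"
fi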

5. Test Configuration File

  Create a file named example-conf.properties in the conf directory; it wires an avro source listening on localhost:44444 through an in-memory channel to a logger sink:


# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = avro
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

# Describe the sink
# write the events to the log
a1.sinks.k1.type = logger


# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

6. Running the Commands

  6.1 Start the agent


[hadoop@hadoop1 conf]$ flume-ng agent -n a1 -f example-conf.properties
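
Note that without --conf the launcher prints the warning defined in the script above ("No configuration directory set!"). Passing the configuration directory also lets flume-env.sh be sourced; since the command is run from the conf directory itself, for example:

[hadoop@hadoop1 conf]$ flume-ng agent -c . -n a1 -f example-conf.properties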

6.2 Start the avro-client in a separate terminal window to send data to the agent


[hadoop@hadoop1 conf]$ flume-ng avro-client -H localhost -p 44444 -F file01
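
Here file01 is assumed to be a plain text file in the current directory; per the --filename option in the script's help text, each of its lines is streamed to the avro source as one event. For example:

[hadoop@hadoop1 conf]$ echo "hello world" > file01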

7. Checking the Results

  On the agent console, the avro source logs the client connection on port 44444, and the logger sink prints the received event, with the body shown both in hex and as text:

14/01/16 22:26:34 INFO ipc.NettyServer: [id: 0x0100c7e4, /127.0.0.1:54289 => /127.0.0.1:44444] OPEN
14/01/16 22:26:34 INFO ipc.NettyServer: [id: 0x0100c7e4, /127.0.0.1:54289 => /127.0.0.1:44444] BOUND: /127.0.0.1:44444
14/01/16 22:26:34 INFO ipc.NettyServer: [id: 0x0100c7e4, /127.0.0.1:54289 => /127.0.0.1:44444] CONNECTED: /127.0.0.1:54289
14/01/16 22:26:36 INFO ipc.NettyServer: [id: 0x0100c7e4, /127.0.0.1:54289 :> /127.0.0.1:44444] DISCONNECTED
14/01/16 22:26:36 INFO ipc.NettyServer: [id: 0x0100c7e4, /127.0.0.1:54289 :> /127.0.0.1:44444] UNBOUND
14/01/16 22:26:36 INFO ipc.NettyServer: [id: 0x0100c7e4, /127.0.0.1:54289 :> /127.0.0.1:44444] CLOSED
14/01/16 22:26:36 INFO ipc.NettyServer: Connection to /127.0.0.1:54289 disconnected.
14/01/16 22:26:38 INFO sink.LoggerSink: Event: { headers:{} body: 68 65 6C 6C 6F 20 77 6F 72 6C 64                hello world }

