Collecting data into Pulsar with Flume

Summary: collecting data into Pulsar with Flume

Flume overview

Flume is a top-level project of the Apache Software Foundation. It is a distributed, reliable, and highly available system for efficiently collecting, aggregating, and moving large amounts of log data from many different sources to a centralized data store.

Flume is not limited to log aggregation. Because data sources are customizable, Flume can be used to transport large volumes of event data, including but not limited to network traffic data, social media data, email messages, and almost any other conceivable data source.

Flume demo

Log collection frequently involves using Flume to ship data to Kafka, HDFS, Hive, ES, HBase, and similar targets. Here is a simple demo of the pipeline: file directory ---> Kafka ---> Hive.

# taildir-file-kafka-hive.conf
# Name the source, channel, and sink
a1.sources = r1
a1.channels = c1
a1.sinks = h1

# Configure the source: a TAILDIR source
# The source type is TAILDIR
a1.sources.r1.type = TAILDIR
# This file stores, in JSON form, the absolute path of each tailed file and its last read position
a1.sources.r1.positionFile = ../log/inodes/taildir_position.json
# Define the file groups
a1.sources.r1.filegroups = f1
# Files monitored by file group f1
# (relative paths start from $FLUME_HOME/conf)
a1.sources.r1.filegroups.f1 = ../log/access-.*\.log
# A header key/value pair can be used to tag events from a specific file group
a1.sources.r1.headers.f1.headerKey1 = value1

# Configure the channel: KafkaChannel
a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c1.kafka.bootstrap.servers = ip1:9092,ip2:9092,ip3:9092
a1.channels.c1.kafka.topic = access12
a1.channels.c1.parseAsFlumeEvent = false
a1.channels.c1.kafka.consumer.group.id = flume-consumer

# Configure the sink: Hive sink
a1.sinks.h1.type=hive
a1.sinks.h1.hive.metastore=thrift://localhost:9083
a1.sinks.h1.hive.database=hivedatabase
a1.sinks.h1.hive.table=hivetable
a1.sinks.h1.hive.txnsPerBatchAsk=2
a1.sinks.h1.batchSize=10
a1.sinks.h1.serializer=JSON
a1.sinks.h1.serializer.fieldnames=id,name,age

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.h1.channel = c1
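Since parseAsFlumeEvent is set to false, the KafkaChannel stages the raw log lines in the access12 topic, so one quick way to check that data is flowing is to read that topic directly. Below is a minimal sketch using the standard kafka-clients consumer; the class name ChannelTopicPeek and the group id channel-peek are made up for illustration, and the broker list is assumed to match the configuration above.

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class ChannelTopicPeek {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "ip1:9092,ip2:9092,ip3:9092");
        // hypothetical group id, separate from the flume-consumer group used by the channel
        props.put("group.id", "channel-peek");
        // read the topic from the beginning for this new group
        props.put("auto.offset.reset", "earliest");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("access12"));
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5));
            for (ConsumerRecord<String, String> record : records) {
                // parseAsFlumeEvent=false, so the value is the raw log line
                System.out.println(record.value());
            }
        }
    }
}

Using a consumer group other than flume-consumer keeps this check from competing with the channel's own consumer.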

A custom Pulsar sink for Flume

Flume supports all kinds of custom sources, channels, sinks, interceptors, partitioners, and so on. This article uses a custom PulsarSink as an example.

package org.apache.flume.sink.pulsar;
import com.google.common.base.Optional;
import org.apache.avro.specific.SpecificDatumWriter;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.EncoderFactory;
import org.apache.flume.*;
import org.apache.flume.conf.Configurable;
import org.apache.flume.conf.BatchSizeSupported;
import org.apache.flume.instrumentation.SinkCounter;
import org.apache.flume.sink.AbstractSink;
import org.apache.flume.source.avro.AvroFlumeEvent;
import org.apache.pulsar.client.api.*;
import org.apache.pulsar.client.impl.auth.AuthenticationTls;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import static org.apache.flume.source.SpoolDirectorySourceConfigurationConstants.BATCH_SIZE;
import static org.apache.flume.source.SpoolDirectorySourceConfigurationConstants.DEFAULT_BATCH_SIZE;
public class PulsarSink extends AbstractSink implements Configurable, BatchSizeSupported {
    private static final Logger log = LoggerFactory.getLogger(PulsarSink.class);
    private long batchSize;
    private boolean useAvroEventFormat;
    private SinkCounter counter = null;
    private Producer<byte[]> producer;
    private PulsarClient client;
    private Optional<ByteArrayOutputStream> tempOutStream = Optional.absent();
    private Optional<SpecificDatumWriter<AvroFlumeEvent>> writer = Optional.absent();
    private ProducerBuilder producerBuilder;
    private ClientBuilder clientBuilder;
    private String authPluginClassName;
    private String authParamsString;
    private String tlsCertFile;
    private String tlsKeyFile;
    private Boolean useTLS;
    private Integer operationTimeout;
    private Integer numIoThreads;
    private Integer connectionsPerBroker;
    private Map<String, Object> config = new HashMap<>();
    private String serviceUrl;
    private String topicName;
    private String producerName;
    private Integer sendTimeout;
    private Boolean blockIfQueueFull;
    private Boolean enableBatching;
    private Integer batchMessagesMaxMessagesPerBatch;
    private Long batchDelay;
    private Integer messageRoutingMode;
    private Integer hashingScheme;
    private Integer compressionType;
    private Boolean enableTcpNoDelay;
    private String tlsTrustCertsFilePath;
    private Boolean allowTlsInsecureConnection;
    private Boolean enableTlsHostnameVerification;
    private Integer statsInterval;
    private Integer maxConcurrentLookupRequests;
    private Integer maxLookupRequests;
    private Integer maxNumberOfRejectedRequestPerConnection;
    private Integer keepAliveIntervalSeconds;
    private Integer connectionTimeout;
    private Integer numListenerThreads;
    private Boolean syncMode;
    //Fine to use null for initial value, Avro will create new ones if this
    // is null
    private BinaryEncoder encoder = null;
    // Read the Pulsar-related configuration parameters from the Flume context
    @Override
    public void configure(Context context) {
        batchSize = context.getInteger(BATCH_SIZE, DEFAULT_BATCH_SIZE);
        useAvroEventFormat = context.getBoolean("useAvroEventFormat", false);
        // client options
        serviceUrl = context.getString("serviceUrl", "localhost:6650");
        authPluginClassName = context.getString("authPluginClassName", "");
        authParamsString = context.getString("authParamsString", "");
        tlsCertFile = context.getString("tlsCertFile", "");
        tlsKeyFile = context.getString("tlsKeyFile", "");
        useTLS = context.getBoolean("useTLS", false);
        operationTimeout = context.getInteger("operationTimeout", 0);
        numIoThreads = context.getInteger("numIoThreads", 0);
        connectionsPerBroker = context.getInteger("connectionsPerBroker", 0);
        enableTcpNoDelay = context.getBoolean("enableTcpNoDelay", false);
        tlsTrustCertsFilePath = context.getString("tlsTrustCertsFilePath", "");
        allowTlsInsecureConnection = context.getBoolean("allowTlsInsecureConnection", false);
        enableTlsHostnameVerification = context.getBoolean("enableTlsHostnameVerification", false);
        statsInterval = context.getInteger("statsInterval", 0);
        maxConcurrentLookupRequests = context.getInteger("maxConcurrentLookupRequests", 0);
        maxLookupRequests = context.getInteger("maxLookupRequests", 0);
        maxNumberOfRejectedRequestPerConnection = context.getInteger("maxNumberOfRejectedRequestPerConnection", 0);
        keepAliveIntervalSeconds = context.getInteger("keepAliveIntervalSeconds", 0);
        connectionTimeout = context.getInteger("connectionTimeout", 0);
        numListenerThreads = context.getInteger("numListenerThreads", 0);
        // producer options
        topicName = context.getString("topicName", "");
        producerName = context.getString("producerName", "");
        sendTimeout = context.getInteger("sendTimeout", 10);
        blockIfQueueFull = context.getBoolean("blockIfQueueFull", false);
        enableBatching = context.getBoolean("enableBatching", false);
        batchMessagesMaxMessagesPerBatch = context.getInteger("batchMessagesMaxMessagesPerBatch", 1000);
        batchDelay = context.getLong("batchDelay", 0L);
        messageRoutingMode = context.getInteger("messageRoutingMode", -1);
        hashingScheme = context.getInteger("hashingScheme", -1);
        compressionType = context.getInteger("compressionType", -1);
        // message options
        syncMode = context.getBoolean("syncMode", true);
    }
    @Override
    public long getBatchSize() {
        return batchSize;
    }
    // Core processing loop: drain events from the channel and publish them to Pulsar
    @Override
    public Status process() throws EventDeliveryException {
        Status result = Status.READY;
        Channel channel = getChannel();
        Transaction transaction = null;
        Event event = null;
        if (null == producer || null == client){
            initPulsarClient();
            initPulsarProducer();
        }
        try {
            transaction = channel.getTransaction();
            transaction.begin();
            long processedEvents = 0;
            for (; processedEvents < batchSize; processedEvents += 1) {
                event = channel.take();
                if (event == null) {
                    // no events available in the channel
                    if (processedEvents == 0) {
                        result = Status.BACKOFF;
                        counter.incrementBatchEmptyCount();
                    } else if (processedEvents < batchSize) {
                        counter.incrementBatchUnderflowCount();
                    } else {
                        counter.incrementBatchCompleteCount();
                    }
                    break;
                }
                TypedMessageBuilder<byte[]> newMessage = producer.newMessage();
                if (event.getHeaders() != null) {
                    if (event.getHeaders().get("key") != null) {
                        newMessage = newMessage.key(event.getHeaders().get("key"));
                    }
                    newMessage.value(serializeEvent(event, useAvroEventFormat)).properties(event.getHeaders());
                } else {
                    newMessage.value(serializeEvent(event, useAvroEventFormat));
                }
                if (syncMode) {
                    newMessage.send();
                } else {
                    newMessage.sendAsync();
                }
            }
            if (!syncMode) {
                producer.flush();
            }
            transaction.commit();
        } catch (Exception ex) {
            log.error("Failed to publish events", ex);
            counter.incrementEventWriteOrChannelFail(ex);
            result = Status.BACKOFF;
            if (transaction != null) {
                try {
                    // If the transaction wasn't committed before we got the exception, we
                    // need to rollback.
                    transaction.rollback();
                } catch (RuntimeException e) {
                    log.error("Transaction rollback failed: " + e.getLocalizedMessage());
                    log.debug("Exception follows.", e);
                } finally {
                    transaction.close();
                    transaction = null;
                }
            }
        } finally {
            if (transaction != null) {
                transaction.close();
            }
        }
        return result;
    }
    @Override
    public synchronized void start() {
        try{
            log.info("start pulsar producer");
            initPulsarClient();
            initPulsarProducer();
            this.counter = new SinkCounter("flume-sink");
            super.start();
        } catch (Exception e) {
            log.error("init pulsar failed:{}", e.getMessage());
        }
    }
    @Override
    public synchronized void stop() {
        try{
            log.info("stop pulsar producer");
            producer.close();
            client.close();
        } catch (Exception e) {
            log.error("stop pulsar failed");
        }
        super.stop();
    }
    // Initialize the Pulsar client from the configured parameters
    private void initPulsarClient() {
        try {
            clientBuilder = PulsarClient.builder();
            if (authPluginClassName.length() > 0 && authParamsString.length() > 0) {
                clientBuilder.authentication(authPluginClassName, authParamsString);
            }
            if (useTLS) {
                clientBuilder.serviceUrl("pulsar+ssl://" + serviceUrl);
            } else {
                clientBuilder.serviceUrl("pulsar://" + serviceUrl);
            }
            if (tlsCertFile.length() > 0 && tlsKeyFile.length() > 0) {
                Map<String, String> authParams = new HashMap<>();
                authParams.put("tlsCertFile", tlsCertFile);
                authParams.put("tlsKeyFile", tlsKeyFile);
                Authentication tlsAuth = AuthenticationFactory
                        .create(AuthenticationTls.class.getName(), authParams);
                clientBuilder.authentication(tlsAuth);
            }
            if (operationTimeout > 0) {
                clientBuilder.operationTimeout(operationTimeout, TimeUnit.SECONDS);
            }
            if (numIoThreads > 0) {
                clientBuilder.ioThreads(numIoThreads);
            }
            if (numListenerThreads > 0) {
                clientBuilder.listenerThreads(numListenerThreads);
            }
            if (connectionsPerBroker > 0) {
                clientBuilder.connectionsPerBroker(connectionsPerBroker);
            }
            if (enableTcpNoDelay) {
                clientBuilder.enableTcpNoDelay(enableTcpNoDelay);
            }
            if (tlsTrustCertsFilePath.length() > 0) {
                clientBuilder.tlsTrustCertsFilePath(tlsTrustCertsFilePath);
            }
            if (allowTlsInsecureConnection) {
                clientBuilder.allowTlsInsecureConnection(allowTlsInsecureConnection);
            }
            if (enableTlsHostnameVerification) {
                clientBuilder.enableTlsHostnameVerification(enableTlsHostnameVerification);
            }
            if (statsInterval > 0) {
                clientBuilder.statsInterval(statsInterval, TimeUnit.SECONDS);
            }
            if (maxConcurrentLookupRequests > 0) {
                clientBuilder.maxConcurrentLookupRequests(maxConcurrentLookupRequests);
            }
            if (maxLookupRequests > 0) {
                clientBuilder.maxLookupRequests(maxLookupRequests);
            }
            if (maxNumberOfRejectedRequestPerConnection > 0) {
                clientBuilder.maxNumberOfRejectedRequestPerConnection(maxNumberOfRejectedRequestPerConnection);
            }
            if (keepAliveIntervalSeconds > 0) {
                clientBuilder.keepAliveInterval(keepAliveIntervalSeconds, TimeUnit.SECONDS);
            }
            if (connectionTimeout > 0) {
                clientBuilder.connectionTimeout(connectionTimeout, TimeUnit.SECONDS);
            }
            client = clientBuilder.build();
        } catch(Exception e) {
            log.error("init pulsar client failed:{}", e.getMessage());
        }
    }
    // Initialize the Pulsar producer from the configured parameters
    private void initPulsarProducer() {
        try {
            producerBuilder = client.newProducer();
            if(topicName.length() > 0) {
                producerBuilder = producerBuilder.topic(topicName);
            }
            if (producerName.length() > 0) {
                producerBuilder = producerBuilder.producerName(producerName);
            }
            if (sendTimeout > 0) {
                producerBuilder.sendTimeout(sendTimeout, TimeUnit.SECONDS);
            } else {
                producerBuilder.sendTimeout(10, TimeUnit.SECONDS);
            }
            if (blockIfQueueFull) {
                producerBuilder.blockIfQueueFull(blockIfQueueFull);
            }
            if (enableBatching) {
                producerBuilder.enableBatching(enableBatching);
            }
            if (batchMessagesMaxMessagesPerBatch > 0){
                producerBuilder.batchingMaxMessages(batchMessagesMaxMessagesPerBatch);
            }
            if (batchDelay > 0) {
                producerBuilder.batchingMaxPublishDelay(batchDelay, TimeUnit.MILLISECONDS);
            }
            // messageRoutingMode / hashingScheme / compressionType are configured as
            // integers that match the ordinal of the corresponding Pulsar enum
            if (messageRoutingMode == MessageRoutingMode.SinglePartition.ordinal()) {
                producerBuilder.messageRoutingMode(MessageRoutingMode.SinglePartition);
            } else if (messageRoutingMode == MessageRoutingMode.CustomPartition.ordinal()) {
                producerBuilder.messageRoutingMode(MessageRoutingMode.CustomPartition);
            } else if (messageRoutingMode == MessageRoutingMode.RoundRobinPartition.ordinal()) {
                producerBuilder.messageRoutingMode(MessageRoutingMode.RoundRobinPartition);
            }
            if (hashingScheme == HashingScheme.JavaStringHash.ordinal()) {
                producerBuilder.hashingScheme(HashingScheme.JavaStringHash);
            } else if (hashingScheme == HashingScheme.Murmur3_32Hash.ordinal()) {
                producerBuilder.hashingScheme(HashingScheme.Murmur3_32Hash);
            }
            if (compressionType == CompressionType.LZ4.ordinal()) {
                producerBuilder.compressionType(CompressionType.LZ4);
            } else if (compressionType == CompressionType.ZLIB.ordinal()) {
                producerBuilder.compressionType(CompressionType.ZLIB);
            } else if (compressionType == CompressionType.ZSTD.ordinal()) {
                producerBuilder.compressionType(CompressionType.ZSTD);
            } else if (compressionType == CompressionType.NONE.ordinal()) {
                producerBuilder.compressionType(CompressionType.NONE);
            }
            producer = producerBuilder.create();
        } catch (Exception e) {
            log.error("init pulsar producer failed:{}", e.getMessage());
        }
    }
    // Serialize an event either as its raw body bytes or as an Avro-encoded AvroFlumeEvent
    private byte[] serializeEvent(Event event, boolean useAvroEventFormat) throws IOException {
        byte[] bytes;
        if (useAvroEventFormat) {
            if (!tempOutStream.isPresent()) {
                tempOutStream = Optional.of(new ByteArrayOutputStream());
            }
            if (!writer.isPresent()) {
                writer = Optional.of(new SpecificDatumWriter<AvroFlumeEvent>(AvroFlumeEvent.class));
            }
            tempOutStream.get().reset();
            AvroFlumeEvent e = new AvroFlumeEvent(toCharSeqMap(event.getHeaders()),
                    ByteBuffer.wrap(event.getBody()));
            encoder = EncoderFactory.get().directBinaryEncoder(tempOutStream.get(), encoder);
            writer.get().write(e, encoder);
            encoder.flush();
            bytes = tempOutStream.get().toByteArray();
        } else {
            bytes = event.getBody();
        }
        return bytes;
    }
    // Convert a Map<String, String> into the Map<CharSequence, CharSequence> required by Avro
    private static Map<CharSequence, CharSequence> toCharSeqMap(Map<String, String> stringMap) {
        Map<CharSequence, CharSequence> charSeqMap = new HashMap<CharSequence, CharSequence>();
        for (Map.Entry<String, String> entry : stringMap.entrySet()) {
            charSeqMap.put(entry.getKey(), entry.getValue());
        }
        return charSeqMap;
    }
}

Next come the usual steps of compiling, packaging, and installing (for example, building the jar and placing it, along with the pulsar-client dependency, under Flume's lib directory)...
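Before deploying the jar to an agent, the sink lifecycle can be exercised from a small standalone program. The sketch below is a hypothetical smoke test (the class name SinkSmokeTest is made up); it assumes a Pulsar broker is reachable at localhost:6650 and that flume-ng-core and pulsar-client are on the classpath.

import java.nio.charset.StandardCharsets;
import org.apache.flume.Context;
import org.apache.flume.Transaction;
import org.apache.flume.channel.MemoryChannel;
import org.apache.flume.conf.Configurables;
import org.apache.flume.event.EventBuilder;
import org.apache.flume.sink.pulsar.PulsarSink;

public class SinkSmokeTest {
    public static void main(String[] args) throws Exception {
        // In-memory channel that buffers the test event
        MemoryChannel channel = new MemoryChannel();
        Configurables.configure(channel, new Context());
        channel.start();

        // Configure the sink the same way the agent's .conf file would
        Context ctx = new Context();
        ctx.put("serviceUrl", "localhost:6650");          // assumed broker address
        ctx.put("topicName", "flume-test-topic");
        ctx.put("producerName", "flume-test-producer");

        PulsarSink sink = new PulsarSink();
        Configurables.configure(sink, ctx);
        sink.setChannel(channel);
        sink.start();

        // Put one event on the channel inside a transaction, then let the sink drain it
        Transaction tx = channel.getTransaction();
        tx.begin();
        channel.put(EventBuilder.withBody("hello pulsar", StandardCharsets.UTF_8));
        tx.commit();
        tx.close();

        sink.process();
        sink.stop();
        channel.stop();
    }
}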

flume-ng to Pulsar conf demo

# A single-node Flume configuration
#
# Source: netcat 
# Sink: pulsar
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 44445
## Describe the sink
a1.sinks.k1.type = org.apache.flume.sink.pulsar.PulsarSink
# Configure the pulsar service url (without `pulsar://`)
a1.sinks.k1.serviceUrl = pulsar-flume-standalone:6650
a1.sinks.k1.topicName = flume-test-topic
a1.sinks.k1.producerName = flume-test-producer
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 1000
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

Place the configuration file under the conf directory and launch the agent with the flume-ng command:

apache-flume-1.9.0-bin/bin/flume-ng agent --conf apache-flume-1.9.0-bin/conf/ -f apache-flume-1.9.0-bin/conf/flume-example.conf -n a1
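Once the agent is up, send a few lines to the netcat source (for example with nc localhost 44445) and read them back from Pulsar to confirm end-to-end delivery. A minimal sketch with the Pulsar Java client is shown below, assuming the same pulsar-flume-standalone:6650 address; the subscription name flume-verify is arbitrary.

import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClient;

public class VerifyTopic {
    public static void main(String[] args) throws Exception {
        PulsarClient client = PulsarClient.builder()
                .serviceUrl("pulsar://pulsar-flume-standalone:6650")
                .build();
        Consumer<byte[]> consumer = client.newConsumer()
                .topic("flume-test-topic")
                .subscriptionName("flume-verify")
                .subscribe();
        // Block until a message produced by the PulsarSink arrives, then print its payload
        Message<byte[]> msg = consumer.receive();
        System.out.println(new String(msg.getData()));
        consumer.acknowledge(msg);
        consumer.close();
        client.close();
    }
}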

