Flume 高可用 负载均衡问题

本文涉及的产品
传统型负载均衡 CLB,每月750个小时 15LCU
网络型负载均衡 NLB,每月750个小时 15LCU
应用型负载均衡 ALB,每月750个小时 15LCU
简介:

首先写一个普通的Java类
这个类主要就是向hostName iP的port节点发送信息

package flumeTest;

import java.nio.charset.Charset;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;

import org.apache.flume.Event;
import org.apache.flume.api.RpcClient;
import org.apache.flume.api.RpcClientFactory;
import org.apache.flume.event.EventBuilder;

//链接avro的flume source 发送event 到flume agent
public class FlumeClient {
    private RpcClient flumeClient;
    private String hostName;
    private int port;
    
    public FlumeClient(String hostname,int port){
        this.hostName =hostname;
        this.port=port;
        this.flumeClient=RpcClientFactory.getDefaultInstance(hostname, port);
    }
    //把字符串消息发送event到avro source
    public void sendEvent(String msg){
        Map<String, String> headers =new HashMap<String, String>();
        headers.put("timestamp", String.valueOf(new Date().getTime()));
        //构建event
        Event event =EventBuilder.withBody(msg, Charset.forName("UTF-8"), headers);
        
        try{
            flumeClient.append(event);
        }catch (Exception e) {
            e.printStackTrace();
            flumeClient.close();
            flumeClient=null;
            flumeClient=RpcClientFactory.getDefaultInstance(hostName, port);
        }
    }
    public void close(){
        flumeClient.close();
        
    }
    
    //这个类的作用就是向hostName的port端口输入Flume定义的RpcClient avro格式的内容
    public static void main(String[] args) {
        FlumeClient flumeClient =new FlumeClient("master", 5555);
        String bMsg="fromjava-msg";
        for(int i=0;i<100;i++){
            flumeClient.sendEvent(bMsg+i);
        }
        flumeClient.close();
    }
}

conf文件
不在展示 很简单 source为avro sink为logger channel为memory


高可用的方式

package flumeTest;

import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.Charset;
import java.util.Properties;

import org.apache.flume.Event;
import org.apache.flume.api.RpcClient;
import org.apache.flume.api.RpcClientFactory;
import org.apache.flume.event.EventBuilder;

//这个类主要是保证了Flume的高可用 通过properties文件定义Client类型是default_failover
//
public class FailoverClient {
    private Properties properties;
    private RpcClient failoverClient;
    
    //初始化rpcclient
    public FailoverClient() throws IOException{
        this.properties=new Properties();
        InputStream inputStream = new FileInputStream("E:\\new workspace\\flumeTest\\src\\main\\resources\\failover_client.conf");
        properties.load(inputStream);
        this.failoverClient=RpcClientFactory.getInstance(properties);
    }
    //发送消息
    public void sendEvent(String msg){
        
        Event event =EventBuilder.withBody(msg,Charset.forName("UTF-8"));
        try {
            failoverClient.append(event);
        } catch (Exception e) {
        e.printStackTrace();
        
        }
    }
    
    public void close(){
        failoverClient.close();
    }
    public static void main(String[] args) throws IOException, InterruptedException{
        FailoverClient failoverClient =new FailoverClient();
        String msg ="message_";
        for(int i=1;i<100;i++){
            
            failoverClient.sendEvent(msg+i);
            Thread.sleep(1000);
        }
        failoverClient.close();
    }
}

properties文件

client.type=default_failover

hosts=h1 h2

hosts.h1=master:8888
hosts.h2=slave1:8888

max-attempts=3

负载均衡的Java类和高可用的类几乎没有什么改变 变得只是properties文件

package flumeTest;

import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.charset.Charset;
import java.util.Properties;

import org.apache.flume.Event;
import org.apache.flume.api.RpcClient;
import org.apache.flume.api.RpcClientFactory;
import org.apache.flume.event.EventBuilder;

public class LoadBalanceClient {
    private RpcClient lbClient;
    private Properties properties;
    
    public LoadBalanceClient() throws FileNotFoundException, IOException{
        
        this.properties =new Properties();
        properties.load(new FileInputStream("src/main/resources/load_balance.conf"));
        this.lbClient =RpcClientFactory.getInstance(properties);
    }
    public void sentEvent(String msg){
        Event event =EventBuilder.withBody(msg, Charset.forName("UTF-8"));
        try {
            lbClient.append(event);
        } catch (Exception e) {
        e.printStackTrace();
        }
    }
    public void close(){
        lbClient.close();
    }
    
    public static void main(String[] args) throws FileNotFoundException, IOException {
        LoadBalanceClient loadBalanceClient =new LoadBalanceClient();
        String msg ="msg_";
        for (int i = 0; i < 100; i++) {
            loadBalanceClient.sentEvent(msg+i);
        }
        loadBalanceClient.close();
        
    }
}

properties

client.type=default_loadbalance

hosts=h1 h2

hosts.h1=master:8888
hosts.h2=slave1:8888

host-selector-random
相关实践学习
SLB负载均衡实践
本场景通过使用阿里云负载均衡 SLB 以及对负载均衡 SLB 后端服务器 ECS 的权重进行修改,快速解决服务器响应速度慢的问题
负载均衡入门与产品使用指南
负载均衡(Server Load Balancer)是对多台云服务器进行流量分发的负载均衡服务,可以通过流量分发扩展应用系统对外的服务能力,通过消除单点故障提升应用系统的可用性。 本课程主要介绍负载均衡的相关技术以及阿里云负载均衡产品的使用方法。
相关文章
|
27天前
|
存储 数据采集 分布式计算
Hadoop-17 Flume 介绍与环境配置 实机云服务器测试 分布式日志信息收集 海量数据 实时采集引擎 Source Channel Sink 串行复制负载均衡
Hadoop-17 Flume 介绍与环境配置 实机云服务器测试 分布式日志信息收集 海量数据 实时采集引擎 Source Channel Sink 串行复制负载均衡
37 1
|
4月前
|
负载均衡 NoSQL 应用服务中间件
搭建高可用及负载均衡的Redis
【7月更文挑战第10天】
128 1
|
4月前
|
负载均衡 安全 Cloud Native
云上负载均衡:构建高可用、高性能的网络应用架构
与云原生技术深度融合:随着云原生技术的普及和发展未来的云上负载均衡将更加紧密地与云原生技术相结合。例如与Kubernetes等容器编排平台集成实现自动化的服务发现和路由管理;与Serverless架构结合提供无缝的流量接入和请求处理能力。 安全性能提升:面对日益严峻的网络安全威胁云上负载均衡将更加注重安全性能的提升。通过引入加密传输、访问控制、DDoS防护等安全措施确保网络流量的安全性和隐私性;同时还将建立完善的安全监控和应急响应机制以应对各种安全事件和突发事件。 支持多协议和多场景:未来的云上负载均衡将支持更多种类的网络协议和应用场景以满足不同用户和业务的需求。例如支持HTTP/2、
222 0
|
4月前
|
负载均衡 算法 Java
实现高可用和可扩展的负载均衡系统的Java方法
实现高可用和可扩展的负载均衡系统的Java方法
|
5月前
|
负载均衡 应用服务中间件 开发工具
技术笔记:nginx和keeplive实现负载均衡高可用
技术笔记:nginx和keeplive实现负载均衡高可用
|
6月前
|
存储 缓存 运维
解密一致性哈希算法:实现高可用和负载均衡的秘诀
解密一致性哈希算法:实现高可用和负载均衡的秘诀
717 0
|
6月前
|
Kubernetes 负载均衡 监控
Kubernetes高可用集群二进制部署(一)主机准备和负载均衡器安装
Kubernetes高可用集群二进制部署(一)主机准备和负载均衡器安装
|
12月前
|
负载均衡 算法 网络协议
Keepalived+LVS搭建高可用负载均衡
Keepalived+LVS搭建高可用负载均衡
364 1
|
6月前
|
tengine Kubernetes Cloud Native
Tengine-Ingress 高性能高可用的云原生网关
Tengine-Ingress 高性能高可用的云原生网关
130 0
|
负载均衡 关系型数据库 PostgreSQL
Pgpool-II实现高可用+读写分离+负载均衡(八)---- 维护工具
Pgpool提供了一些维护工具,用于日常观察Pgpool运行状态、上线、下线节点等操作。主要有:pcp_stop_pgpool,pcp_node_count,pcp_node_info,pcp_health_check_stats,pcp_proc_count,pcp_proc_info,pcp_detach_node,pcp_attach_node,pcp_recovery_node,pcp_promote_node,pcp_pool_status,pcp_watchdog_info,pcp_reload_config