业务场景
Spark用fileStream实时从NFS获取一批文件,将文件中JSON结构里面的大小图二进制数据上传云存储获取url然后再将url以string回写到json中发送kafka,最早使用多线程并行发送内个线程创建一个KafkaProducer速度慢的惊人无法满足现场,接着在线程中共用一个KafkaProducer再测试发现没有改观。
问题分析
分析kafka日志发现,每次发送数据大部分时间在0-1ms,出现时延的情况时发现都是连续出现的,由于发送端只有一个producer实例,这样当一个message发送阻塞了,将会瞬间导致TPS急剧下降,正常情况下一个kafka实例在1秒内能够处理上千个发送请求(由于我们的消息每个在323970B左右,千兆网用尽上行io差不多也只能发送11710241024/323970 = 378),但出现1秒的时延将会导致1秒只能处理1个发送请求,这样会阻塞后面数据的处理。
问题原因
由于producer是线程安全的,所以采用单实例,但一次发送阻塞(因为使用同步发送,每次发送都会等待结果,这个过程是同步的),将会影响到后续的数据处理,那就只能缓存producer实例了。
实现方案
对象池工厂实现的代码实现
public class KafkaProducerPooledObjectFactory implements PooledObjectFactory<KafkaProducer<String, String>>, Serializable {
Properties props;
public KafkaProducerPooledObjectFactory(Properties props) {
this.props = props;
}
@Override
public PooledObject<KafkaProducer<String, String>> makeObject() throws Exception {
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(props);
return new DefaultPooledObject<KafkaProducer<String, String>>(kafkaProducer);
}
@Override
public void destroyObject(PooledObject<KafkaProducer<String, String>> p) throws Exception {
KafkaProducer<String, String> o = p.getObject();
o = null;
}
@Override
public boolean validateObject(PooledObject<KafkaProducer<String, String>> p) {
return false;
}
@Override
public void activateObject(PooledObject<KafkaProducer<String, String>> p) throws Exception {
//System.out.println("activateObject");
}
@Override
public void passivateObject(PooledObject<KafkaProducer<String, String>> p) throws Exception {
//System.out.println("passivateObject");
}
}
对象池工对外的代码实现
public class KafkaProducerPool implements Serializable {
private GenericObjectPool<KafkaProducer<String, String>> objectPool;
public KafkaProducerPool(Properties props) {
KafkaProducerPooledObjectFactory kafkaProducerPooledObjectFactory = new KafkaProducerPooledObjectFactory(props);
GenericObjectPoolConfig config = new GenericObjectPoolConfig(); // 池子配置文件
config.setMaxTotal(100); // 整个池最大值
config.setMaxIdle(10); // 最大空闲
config.setMinIdle(0); // 最小空闲
config.setMaxWaitMillis(5000); // 最大等待时间,-1表示一直等
config.setBlockWhenExhausted(true); // 当对象池没有空闲对象时,新的获取对象的请求是否阻塞。true阻塞。默认值是true
config.setTestOnBorrow(false); // 在从对象池获取对象时是否检测对象有效,true是;默认值是false
config.setTestOnReturn(false); // 在向对象池中归还对象时是否检测对象有效,true是,默认值是false
config.setTestWhileIdle(false); // 在检测空闲对象线程检测到对象不需要移除时,是否检测对象的有效性。true是,默认值是false
config.setMinEvictableIdleTimeMillis(60000L); // 可发呆的时间,10mins
config.setTestWhileIdle(true); // 发呆过长移除的时候是否test一下先
config.setTimeBetweenEvictionRunsMillis(3000); // 回收资源线程的执行周期 3s
config.setNumTestsPerEvictionRun(10);
objectPool = new GenericObjectPool<>(kafkaProducerPooledObjectFactory, config);
}
public static Properties getConfig(String hosts) {
Properties props = new Properties();
props.put("bootstrap.servers", hosts);
// procedure要求leader在考虑完成请求之前收到的确认数,用于控制发送记录在服务端的持久化,其值可以为如下:
// acks = 0 如果设置为零,则生产者将不会等待来自服务器的任何确认,该记录将立即添加到套接字缓冲区并视为已发送。在这种情况下,无法保证服务器已收到记录,并且重试配置将不会生效(因为客户端通常不会知道任何故障),为每条记录返回的偏移量始终设置为-1。
// acks = 1 这意味着leader会将记录写入其本地日志,但无需等待所有副本服务器的完全确认即可做出回应,在这种情况下,如果leader在确认记录后立即失败,但在将数据复制到所有的副本服务器之前,则记录将会丢失。
// acks = all 这意味着leader将等待完整的同步副本集以确认记录,这保证了只要至少一个同步副本服务器仍然存活,记录就不会丢失,这是最强有力的保证,这相当于acks = -1的设置。
// 可以设置的值为:all, -1, 0, 1
props.put("acks", "1");
props.put("retries", 0);
props.put("batch.size", 10000);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
return props;
}
public KafkaProducer<String, String> getProducer() {
try {
KafkaProducer<String, String> producer = objectPool.borrowObject();
return producer;
} catch (Exception e) {
throw new RuntimeException("获取KafkaProducer连接异常", e);
}
}
public void returnProducer(KafkaProducer<String, String> producer) {
try {
objectPool.returnObject(producer);// 将对象放回对象池
} catch (Exception e) {
throw new RuntimeException("释放KafkaProducer连接异常", e);
}
}
public static void main(String[] args) {
String topic = "TEST_1";
String hosts = "hdh109:9092";
Properties props = getConfig(hosts);
KafkaProducerPool pool = new KafkaProducerPool(props);
for (int i = 0; i < 10000; i++) {
KafkaProducer<String, String> producer = pool.getProducer();
pool.returnProducer(producer);
}
}
}
调用代码片段1
val pool = new KafkaProducerPool(KafkaProducerPool.getConfig(hosts))
val executors:ExecutorService = Executors.newFixedThreadPool(40)
while (i.hasNext) {
val item = i.next().toString
val exec = new Exec(ACCESS_KEY, SECRET_KEY, gateHost, gatePort, serialId, poolId, token, topic, hosts, item, pool)
val future = executors.submit(exec)
future.get()
}
executors.shutdown()
Exec中的代码片段2
public class Exec implements Runnable {
String msg;
String topic, hosts;
KafkaProducerPool pool;
public Exec(String topic, String hosts, String msg, KafkaProducerPool pool) {
this.msg = msg;
this.topic = topic;
this.hosts = hosts;
this.pool = pool;
}
public Exec(String topic, String hosts, String msg, KafkaProducerExample prod) {
this.msg = msg;
this.topic = topic;
this.hosts = hosts;
this.prod = prod;
}
@Override
public void run() {
//省略一些消息处理相关代码
KafkaProducer<String, String> producer = pool.getProducer();
ProducerRecord rec = new ProducerRecord<>(topic, UUID.randomUUID().toString(), jo.toJSONString());
producer.send(rec);
pool.returnProducer(producer);
} catch (IOException exec) {
exec.printStackTrace();
}
}
}