import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;

import java.util.Properties;

/**
 * KafkaSink: the job acts as a Kafka producer, sinking the stream into a
 * topic for downstream consumers.
 *
 * Created by 王一宁 on 2018/10/23.
 */
public class StreamingKafkaSink {
    public static void main(String[] args) throws Exception {
        // Obtain the Flink execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Checkpoint configuration: the exactly-once Kafka sink below only
        // commits its transactions when a checkpoint completes, so
        // checkpointing must be enabled.
        env.enableCheckpointing(5000); // checkpoint every 5 seconds
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500); // at least 500 ms between checkpoints
        env.getCheckpointConfig().setCheckpointTimeout(60000); // discard a checkpoint if it takes longer than 60 s
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1); // at most one checkpoint in flight
        // Retain the externalized checkpoint when the job is cancelled,
        // so the job can later be restored from it
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
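        // A retained checkpoint can be used to restart the job after a
        // cancellation or failure. A minimal sketch with `flink run -s`; the
        // checkpoint path below is hypothetical (take the real one from the
        // JobManager log or web UI), reusing the HDFS location from the
        // commented-out state backend:
        //
        //   flink run -s hdfs://hadoop100:9000/flink/checkpoints/<job-id>/chk-42 streaming-kafka-sink.jar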
        // Configure the state backend (optional; RocksDBStateBackend requires
        // the flink-statebackend-rocksdb dependency on the classpath)
        //env.setStateBackend(new RocksDBStateBackend("hdfs://hadoop100:9000/flink/checkpoints", true));

        // Read newline-delimited text from a socket source
        DataStreamSource<String> text = env.socketTextStream("hadoop1", 9001, "\n");

        String brokerList = "hadoop1:9092";
        String topic = "t1";
        Properties prop = new Properties();
        prop.setProperty("bootstrap.servers", brokerList);
        // The exactly-once producer defaults to a 1-hour transaction timeout,
        // which exceeds the broker default transaction.max.timeout.ms of
        // 15 minutes and would make the job fail at startup. Two options:
        // Option 1: lower the producer's transaction timeout
        prop.setProperty("transaction.timeout.ms", String.valueOf(60000 * 15));
        // Option 2: raise transaction.max.timeout.ms on the Kafka brokers
        // instead (see the broker-side sketch below)
        // For comparison, the simpler constructor defaults to at-least-once semantics:
        //FlinkKafkaProducer011<String> myProducer = new FlinkKafkaProducer011<>(brokerList, topic, new SimpleStringSchema());
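        // Broker-side sketch for Option 2 (a change to your Kafka cluster, not
        // to this job; the value is an assumption sized to the producer's
        // 1-hour default): in each broker's server.properties set
        //
        //   transaction.max.timeout.ms=3600000
        //
        // so the broker accepts transaction timeouts up to that value.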
        // Kafka producer with exactly-once semantics
        FlinkKafkaProducer011<String> myProducer = new FlinkKafkaProducer011<>(topic, new KeyedSerializationSchemaWrapper<String>(new SimpleStringSchema()), prop, FlinkKafkaProducer011.Semantic.EXACTLY_ONCE);
        text.addSink(myProducer);

        env.execute("StreamingKafkaSink");
    }
}
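
// A minimal way to exercise the pipeline, assuming the hosts and topic used
// above (hadoop1:9001 for the socket source, hadoop1:9092 / topic t1 for Kafka):
//
//   1. Start the socket source on hadoop1:   nc -lk 9001
//   2. Submit this job, then type lines into the nc session.
//   3. Consume the sink's output; read_committed hides records from
//      transactions that no completed checkpoint has committed yet:
//
//        kafka-console-consumer.sh --bootstrap-server hadoop1:9092 --topic t1 \
//            --consumer-property isolation.level=read_committed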