package kafkaproducer;

import com.alibaba.fastjson.JSON;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.awt.*;
import java.text.DecimalFormat;
import java.util.Properties;
import java.util.Random;

/**
 * Flink job that generates synthetic user-activity records and writes them to a Kafka topic.
 *
 * <p>Each record is a single space-separated line:
 * {@code <user> <activity> <timestampMillis> <pageview> <type> <city>}.
 */
public class Producer {

    /** Kafka topic the generated records are written to. */
    private static final String TOPIC = "test";

    /** Kafka brokers used by the producer sink. */
    private static final String BOOTSTRAP_SERVERS = "master:9092,slave1:9092,slave2:9092";

    /** Number of records to emit; change this to produce a different, countable amount of data. */
    private static final int MAX_RECORDS = 100;

    /**
     * Builds and runs the pipeline: generator source -> Kafka producer sink.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if the Flink job fails to execute
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Alternative: feed Kafka from a text file instead of the generator below.
        // DataStreamSource<String> lines = env.readTextFile("file:///d:/test.txt");
        DataStreamSource<String> lines = env.addSource(new SourceFunction<String>() {
            private static final long serialVersionUID = 1L;

            // Cleared by cancel(); volatile because cancel() may be invoked from another thread.
            private volatile boolean isRunning = true;

            // 1-based index of the next record to emit.
            int count = 1;

            DecimalFormat userDecimal = new DecimalFormat("000");
            DecimalFormat typeDecimal = new DecimalFormat("0");
            String[] typeList = {"pv", "pu", "cart"};
            String[] cityList = {"北京市", "天津市", "上海市", "深圳市", "重庆市"};

            /**
             * Emits up to {@code MAX_RECORDS} random records, stopping early if the source
             * is cancelled.
             *
             * @param out context used to emit the generated lines
             */
            @Override
            public void run(SourceContext<String> out) throws Exception {
                // Honour cancel(): the original loop ignored isRunning, so a cancelled job
                // kept emitting until the record limit was reached.
                // For an unbounded stream, drop the count condition and loop on isRunning only.
                while (isRunning && count <= MAX_RECORDS) {
                    // Math.round(random * k + 1) yields values in [1, k + 1]; the two endpoints
                    // are half as likely as the interior values.
                    int userId = (int) (Math.round(Math.random() * 9 + 1));
                    int activityId = (int) (Math.round(Math.random() * 4 + 1));
                    int typeIndex = (int) (Math.random() * typeList.length);
                    int cityIndex = (int) (Math.random() * cityList.length);

                    String user = "U" + userDecimal.format(userId);
                    String activity = "A" + typeDecimal.format(activityId);
                    long timestamp = System.currentTimeMillis();
                    int pageview = (int) (Math.round(Math.random() * 4 + 1));
                    String type = typeList[typeIndex];
                    String city = cityList[cityIndex];

                    out.collect(user + " " + activity + " " + timestamp + " " + pageview
                            + " " + type + " " + city);
                    count++;
                }
            }

            /** Requests that {@link #run} stop emitting records. */
            @Override
            public void cancel() {
                isRunning = false;
            }
        });

        Properties prop = new Properties();
        prop.setProperty("bootstrap.servers", BOOTSTRAP_SERVERS);

        // The first constructor argument is the target topic (not a consumer group id).
        lines.addSink(new FlinkKafkaProducer<>(
                TOPIC,
                new SimpleStringSchema(),
                prop
        ));

        env.execute("Producer");
    }
}