package kafkaproducer; public class Data { public String user; public String activity; public long timeStramp; public int pageViews; public String typeP; public String city; public Data() { } public Data(String user, String activity, long timeStramp, int pageViews, String typeP, String city) { this.user = user; this.activity = activity; this.timeStramp = timeStramp; this.pageViews = pageViews; this.typeP = typeP; this.city = city; } @Override public String toString() { return "data{" + "user='" + user + '\'' + ", activity='" + activity + '\'' + ", timeStramp=" + timeStramp + ", pageViews=" + pageViews + ", typeP='" + typeP + '\'' + ", city='" + city + '\'' + '}'; } public String getUser() { return user; } public void setUser(String user) { this.user = user; } public String getActivity() { return activity; } public void setActivity(String activity) { this.activity = activity; } public long getTimeStramp() { return timeStramp; } public void setTimeStramp(long timeStramp) { this.timeStramp = timeStramp; } public int getPageViews() { return pageViews; } public void setPageViews(int pageViews) { this.pageViews = pageViews; } public String getTypeP() { return typeP; } public void setTypeP(String typeP) { this.typeP = typeP; } public String getCity() { return city; } public void setCity(String city) { this.city = city; } }
package kafkaproducer; import com.alibaba.fastjson.JSON; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; import java.text.DecimalFormat; import java.util.Properties; public class DataProducer { //定义broker public static final String broker_list = "master:9092,slave1:9092,slave2:9092"; //定义topic 和kafka正在用的一致 public static final String topic = "test"; public static void writerTest() throws Exception { Properties prop = new Properties(); prop.setProperty("bootstrap.servers", broker_list); prop.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); //key 序列化 prop.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); //value 序列化 KafkaProducer producer = new KafkaProducer<String, String>(prop); //位数不够,自动填充补0 DecimalFormat userDecimal = new DecimalFormat("000"); DecimalFormat typeDecimal = new DecimalFormat("0"); String[] typeList = {"pv", "pu", "cart"}; String[] cityList = {"北京市", "天津市", "上海市", "深圳市", "重庆市", "河北省", "湖北省", "河南省", "山东省"}; //获取1-10数 int r_user = (int)(Math.round(Math.random() * 9 + 1)); int r_activity = (int)(Math.round(Math.random() * 4 + 1)); int p_type = (int)(Math.random() * typeList.length); int t_city = (int)(Math.random() * cityList.length); //对用户进行组合 String user = "U" + userDecimal.format(r_user); String activity = "A" + typeDecimal.format(r_activity); //获取当前时间戳 long timeStramp = System.currentTimeMillis(); int pageview = (int) (Math.round(Math.random() * 4 + 1)); String typeP = typeList[p_type]; String city = cityList[t_city]; Data data = new Data(); data.setUser(user); data.setActivity(activity); data.setPageViews(pageview); data.setCity(city); data.setTypeP(typeP); data.setTimeStramp(timeStramp); //直接用数据写入到kafka ProducerRecord<String, String> record = new ProducerRecord<>(topic, null, null, (data.user + " " + data.activity + " " + data.pageViews + " " + data.city + " " + data.typeP + " " + data.timeStramp)); 
producer.send(record); System.out.println("发送数据: " + (data.user + " " + data.activity + " " + data.pageViews + " " + data.city + " " + data.typeP + " " + data.timeStramp)); producer.flush(); } public static void main(String[] args) throws Exception { //无限循环,保证数据源不断流 while (true) { Thread.sleep(300); writerTest(); } } }
在 pom.xml 中添加以下依赖：
<!-- Flink core APIs; scope "provided" keeps them out of the packaged artifact. -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>

<!-- Flink Kafka connector. -->
<!-- NOTE(review): Scala suffix (2.11) and version (1.9.3) are hard-coded here while the
     Flink artifacts above use ${scala.binary.version} and ${flink.version}; confirm they
     match the values of those properties. -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.11</artifactId>
    <version>1.9.3</version>
</dependency>

<!-- JSON library imported by DataProducer. -->
<!-- NOTE(review): 1.2.68 is an old release; check the fastjson security advisories
     before relying on it. -->
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>1.2.68</version>
</dependency>

<!-- Test-only dependency. -->
<!-- NOTE(review): JUnit 3.8.1 predates annotation-based tests; confirm this is intended. -->
<dependency>
    <groupId>junit</groupId>
    <artifactId>junit</artifactId>
    <version>3.8.1</version>
    <scope>test</scope>
</dependency>

<!-- Kafka client used by DataProducer (KafkaProducer / ProducerRecord). -->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.2.0</version>
</dependency>