1: Sample data
{"change":[{"kind":"insert","schema":"public","table":"sample","columnnames":["xzqhbm","qylx","ybbq","ybbqdm","ybpssj","fbl","dtyxmc","whq","zjzt","sfyh","sfzd","sfyy","area","length","xcenter","ycenter","ybzxyxq","ybzdyxq","dem","slope","geometry"],"columntypes":["character varying(255)","character varying(255)","character varying(255)","character varying(255)","date","numeric","character varying(255)","bigint","bigint","bigint","bigint","bigint","double precision","double precision","numeric","numeric","date","date","bigint","bigint","public.geometry"],"columnvalues":["370303","平原","乔灌果园","YD010100","2020-05-12",1,"GF-2",0,0,0,0,0,0.00940095134656,0,0,0,null,null,0,0,"0106000000010000000103000000010000001500000090138B24A9265E4090596931FC454240881F4397B5265E400021E1D4F745424060C2D3F1B9265E402057AE1AF845424068E585DEBC265E4058B9B4DAF6454240A09FEB60BC265E4050131A81EB454240E84044A7BD269E4028CB3343EB45424020BFC614BD265E406809D346E1454240B06AA148BD265E4000F9D6EADF454240807380ACBC265E40B8E9315FD8454240C8F9D769B9265E40A8ADA1D9D745424068382007BA265E4038AD4E1ADF454240380E5AE7B6265E4040A64984E1454240F0AB13E6A9265E403877A5E6E445424050FC8C36A7265E40D871155BE645424068D89587A6265E40680C1980E745424000C0390FA6265E40482D6882EA45424008A88F11A7265E40B0963FA2F345424048CE2A82AB265E4098FEDC3FF2454240E80FFDE2AB265E40F026E831F545424018FC146CA8265E40580A4D21F645424090138B24A9265E4090596931FC454290","kafka2neo4j"]}]}
2: pom.xml dependencies
<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>1.10.0</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.11</artifactId>
        <version>1.10.0</version>
        <scope>provided</scope>
    </dependency>
    <!-- 0.11 connector, matching the FlinkKafkaConsumer011 used below -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka-0.11_2.11</artifactId>
        <version>1.10.0</version>
    </dependency>
    <dependency>
        <groupId>org.neo4j</groupId>
        <artifactId>neo4j</artifactId>
        <version>3.5.3</version>
    </dependency>
    <dependency>
        <groupId>org.neo4j.driver</groupId>
        <artifactId>neo4j-java-driver</artifactId>
        <!-- must be a 1.x release to match the org.neo4j.driver.v1 imports in the sink -->
        <version>${neo4j.driver.version}</version>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.66</version>
    </dependency>
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.12</version>
    </dependency>
    <dependency>
        <groupId>org.json</groupId>
        <artifactId>json</artifactId>
        <version>20090211</version>
    </dependency>
</dependencies>
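The ${neo4j.driver.version} property is never defined in this snippet; if your parent pom does not supply it, add a properties block. 1.7.5 below is an assumed value: any 1.x driver exposes the org.neo4j.driver.v1 package the sink imports.

<properties>
    <!-- assumed version: any 1.x driver matches the org.neo4j.driver.v1 API used below -->
    <neo4j.driver.version>1.7.5</neo4j.driver.version>
</properties>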
3: Wrapping a utility class (optional)
import java.util.HashMap;
import java.util.Map;

/**
 * A HashMap whose toString() renders entries as {key:value,key:value},
 * the format the Neo4j sink below parses back apart.
 */
public class HashMapHelper<K, T> extends HashMap<K, T> {
    @Override
    public String toString() {
        if (isEmpty()) {
            return "{}";  // guard: the setCharAt below would otherwise corrupt an empty map
        }
        StringBuilder sbf = new StringBuilder("{");
        for (Map.Entry<K, T> entry : entrySet()) {
            sbf.append(entry.getKey()).append(":").append(entry.getValue()).append(",");
        }
        // replace the trailing comma with the closing brace
        sbf.setCharAt(sbf.length() - 1, '}');
        return sbf.toString();
    }
}
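A quick check of the output format; the keys and values here are just two fields taken from the sample record:

HashMapHelper<String, String> map = new HashMapHelper<>();
map.put("xzqhbm", "370303;");
map.put("qylx", "平原;");
System.out.println(map);  // prints {qylx:平原;,xzqhbm:370303;} (HashMap iteration order is not guaranteed)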
4: Create the Flink execution environment, read from the Kafka source, and transform
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(5000);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

String topicId = "db01_topic";
Properties prop = new Properties();
prop.put("bootstrap.servers", broker);      // Kafka broker address, defined elsewhere
prop.put("group.id", "kafka2neo4j-group");  // required by the Kafka consumer; the name is arbitrary
prop.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
prop.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
prop.put("auto.offset.reset", "latest");
prop.put("max.poll.records", 2000);

// the topic is passed to the consumer constructor, not via a "subscribe" property
FlinkKafkaConsumer011<String> consumer = new FlinkKafkaConsumer011<>(topicId, new SimpleStringSchema(), prop);
consumer.setStartFromLatest();
DataStreamSource<String> dataStreamSource = env.addSource(consumer).setParallelism(2);

/**
 * Parse the wal2json record into a {name:value,...} string.
 */
SingleOutputStreamOperator<String> jsonSource = dataStreamSource.map(new MapFunction<String, String>() {
    @Override
    public String map(String value) throws Exception {
        HashMapHelper<String, String> map1 = new HashMapHelper<String, String>();
        JSONObject jsonData = new JSONObject(value);
        JSONArray changeArray = jsonData.getJSONArray("change");
        if (changeArray != null && changeArray.length() > 0) {
            JSONObject jsb = changeArray.getJSONObject(0);
            if ("insert".equals(jsb.get("kind"))) {
                JSONArray columnnames = jsb.getJSONArray("columnnames");
                JSONArray columnvalues = jsb.getJSONArray("columnvalues");
                JSONArray columntypes = jsb.getJSONArray("columntypes");
                for (int i = 0; i < columnnames.length(); i++) {
                    if (columntypes.get(i).toString().contains("geometry")) {
                        map1.put(columnnames.get(i).toString(), columnvalues.get(i).toString());
                    } else {
                        // a trailing ";" marks non-geometry values for the sink
                        map1.put(columnnames.get(i).toString(), columnvalues.get(i).toString() + ";");
                    }
                }
            }
        }
        return map1.toString();
    }
});
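The parsing can be smoke-tested without a Kafka cluster by feeding the sample record in as a literal element. A minimal sketch, assuming the anonymous MapFunction above has been extracted into its own class (hypothetically named ParseWal2Json) and the step-1 record is pasted into sample:

// local smoke test: drive the same parsing logic without Kafka
StreamExecutionEnvironment testEnv = StreamExecutionEnvironment.getExecutionEnvironment();
String sample = "...";              // the wal2json record from step 1
testEnv.fromElements(sample)
       .map(new ParseWal2Json())    // hypothetical name for the MapFunction above
       .print();                    // expected output shape: {xzqhbm:370303;,qylx:平原;,...}
testEnv.execute("parse-smoke-test");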
5: Custom sink
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.neo4j.driver.v1.*;

import java.util.Random;

public class Neo4jSink<T> extends RichSinkFunction<T> {

    public static final String username = "neo4j";
    public static final String password = "zhanyang";
    public static final String url = "bolt://192.168.32.102:7687";

    private Driver driver;

    public Driver getDriver() {
        driver = GraphDatabase.driver(url, AuthTokens.basic(username, password), Config.build().toConfig());
        if (driver == null) {
            System.out.println("****************A driver can't be created to " + url);
        }
        return driver;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        driver = getDriver();
    }

    /**
     * Prefer parameterized queries over string concatenation, e.g.:
     * run("MATCH (n) WHERE n.name = {myNameParam} RETURN (n)",
     *     Values.parameters("myNameParam", "Bob"))
     */
    @Override
    public void invoke(T line) throws Exception {
        try (Session session = driver.session()) {
            // the random id makes every MERGE create a fresh node
            StringBuilder cypher = new StringBuilder("MERGE (tuple2:Tuple {id:" + new Random().nextInt() + ",");
            for (String elems : line.toString().split(",", -1)) {
                String[] kv = elems.split(":");
                // guard against entries with no ":" or blank parts
                String key = StringUtils.isBlank(kv[0]) ? "null" : kv[0].replace("{", "");
                String value = (kv.length < 2 || StringUtils.isBlank(kv[1])) ? "null" : kv[1].replace("}", "");
                cypher.append(key).append(":'").append(value).append("',");
            }
            String stmt = cypher.substring(0, cypher.length() - 1) + "}) return tuple2";
            System.out.println("*******" + stmt);
            session.run(stmt);
        }
    }

    @Override
    public void close() throws Exception {
        // release the driver when the sink shuts down
        if (driver != null) {
            driver.close();
        }
        super.close();
    }
}
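String-built Cypher is fragile: a quote inside any value breaks the statement. As the JavaDoc comment above hints, the 1.x driver accepts parameters, and a whole property map can be assigned in one SET. A hedged sketch of an alternative write path, assuming the parsed key/value pairs have been collected into a Map:

import org.neo4j.driver.v1.Session;
import org.neo4j.driver.v1.Values;

import java.util.Map;

public class ParameterizedWrite {
    // a minimal parameterized alternative to the string-built MERGE above;
    // "SET t = {props}" assigns the whole map and lets the driver escape values safely
    static void writeTuple(Session session, Map<String, Object> props) {
        session.run("CREATE (t:Tuple) SET t = {props} RETURN t",
                Values.parameters("props", props));
    }
}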
6: Attach the sink and execute
jsonSource.addSink(new Neo4jSink<>());
env.execute("Flink add kafka DataSource");
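Once the job is running, the written nodes can be checked from any Neo4j client. A small sketch using the same 1.x driver and the connection constants from the sink; the class name VerifyTuples is hypothetical:

import org.neo4j.driver.v1.*;

public class VerifyTuples {
    public static void main(String[] args) {
        Driver driver = GraphDatabase.driver("bolt://192.168.32.102:7687",
                AuthTokens.basic("neo4j", "zhanyang"));
        try (Session session = driver.session()) {
            // count the Tuple nodes written by the sink
            StatementResult result = session.run("MATCH (t:Tuple) RETURN count(t) AS c");
            System.out.println("Tuple nodes: " + result.single().get("c").asLong());
        }
        driver.close();
    }
}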