Here is the scenario: we need to consume HBase as a data stream, not as a data set. Flink's built-in flink-hbase connector only exposes HBase as a bounded (batch) DataSet, so we chose to write our own HBase source.
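For context, the built-in connector is used roughly like this in a batch job (a minimal sketch against the flink-hbase addon API of that era; the "Orders" table and the cf1:name column are placeholders):
import org.apache.flink.addons.hbase.TableInputFormat;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;

public class BatchHbaseRead {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // The addon only yields a bounded DataSet: one scan, then the job ends.
        env.createInput(new TableInputFormat<Tuple1<String>>() {
            @Override
            protected Scan getScanner() {
                Scan scan = new Scan();
                scan.addColumn(Bytes.toBytes("cf1"), Bytes.toBytes("name"));
                return scan;
            }

            @Override
            protected String getTableName() {
                return "Orders";
            }

            @Override
            protected Tuple1<String> mapResultToTuple(Result r) {
                return Tuple1.of(Bytes.toString(r.getValue(Bytes.toBytes("cf1"), Bytes.toBytes("name"))));
            }
        }).print();
    }
}
The streaming source below removes that limitation by polling HBase one time window at a time.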
package com.yjp.flink.demo11;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
/**
 * HBase as a streaming source.
 * Scans rows from HBase and emits their values as a stream.
 * Date : 9:50 2018/3/12
 */
public class HbaseSource implements SourceFunction<String> {
private static final Logger LOG = LoggerFactory.getLogger(HbaseSource.class);
private static final long serialVersionUID = 1;
private volatile boolean isRunning = true;
/**
 * Timestamp at which reading starts.
 */
private long startTime;
/**
 * Length of the time window queried on each pass (millis).
 */
private long interval;
/**
 * Columns to read, as family:qualifier.
 */
private ArrayList<String> columns;
/**
 * Name of the table to read.
 */
private String tableName;
public HbaseSource(long startTime, long interval, ArrayList<String> columns, String tableName) {
this.startTime = startTime;
this.interval = interval;
this.columns = columns;
this.tableName = tableName;
}
public HbaseSource() {
}
@Override
public void run(SourceContext<String> out) {
    // First pass: catch up from startTime to "now minus one interval" in a single scan.
    if (isRunning) {
        long endTime = System.currentTimeMillis() - interval;
        ResultScanner rs = getHbaseData(tableName, startTime, endTime - startTime, columns);
        transmitData(rs, out);
        startTime = endTime;
    }
    // Then poll one interval's worth of data per iteration.
    while (isRunning) {
        ResultScanner rs = getHbaseData(tableName, startTime, interval, columns);
        transmitData(rs, out);
        startTime += interval;
        try {
            Thread.sleep(interval);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException("Polling sleep interrupted", e);
        }
    }
}
@Override
public void cancel() {
    // Stop the polling loop in run(); Flink calls this on job cancellation.
    isRunning = false;
}
/**
 * Scans one time window of data from HBase.
 *
 * @param tableName table to scan
 * @param startTime start of the time range (epoch millis)
 * @param interval  length of the time range
 * @param columns   columns to read, as family:qualifier
 * @return the scanner over the matching rows
 */
private ResultScanner getHbaseData(String tableName, long startTime, long interval, List<String> columns) {
    Configuration conf = HBaseConfiguration.create();
    try {
        // NOTE: for brevity an HTable is created per scan and never closed;
        // in production, reuse a single table instance and close it on shutdown.
        HTable table = new HTable(conf, tableName);
        Scan scan = new Scan();
        scan.setTimeRange(startTime, startTime + interval);
        for (String column : columns) {
            String[] columnName = column.split(":");
            scan.addColumn(Bytes.toBytes(columnName[0]), Bytes.toBytes(columnName[1]));
        }
        return table.getScanner(scan);
    } catch (IOException e) {
        throw new RuntimeException("Failed to read from HBase", e);
    }
}
private void transmitData(ResultScanner rs, SourceContext<String> out) {
    Result result;
    try {
        while ((result = rs.next()) != null && isRunning) {
            for (KeyValue kv : result.raw()) {
                out.collect(new String(kv.getValue()));
            }
        }
    } catch (IOException e) {
        throw new RuntimeException("Failed to iterate over the scan results", e);
    } finally {
        rs.close();
    }
}
}
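Wiring the source into a job is straightforward (a sketch; "Orders" and the cf1:name column are placeholders for your own schema):
package com.yjp.flink.demo11;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.ArrayList;

public class HbaseSourceDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        ArrayList<String> columns = new ArrayList<>();
        // family:qualifier, the format getHbaseData() splits on ":"
        columns.add("cf1:name");
        DataStream<String> stream = env.addSource(new HbaseSource(0L, 2000L, columns, "Orders"));
        stream.print();
        env.execute("hbase-source-demo");
    }
}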
The stream is then processed and the results are written back into HBase.
package com.yjp.flink.hbase;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.List;
public class HbaseToHbase {
private static final Logger LOG = LoggerFactory.getLogger(HbaseToHbase.class);
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment sEnv = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.getTableEnvironment(sEnv);
sEnv.getConfig().disableSysoutLogging();
List<String> getColumns = new ArrayList<String>(3);
getColumns.add("cf1_name");
getColumns.add("cf2_amount");
getColumns.add("cf3_groupId");
List<String> columnFamily = new ArrayList<>(3);
columnFamily.add("cf1");
columnFamily.add("cf2");
columnFamily.add("cf3");
List<String> setColumns = new ArrayList<>(3);
setColumns.add("cf2:result");
DataStreamSource<Orders> orderDataStream = sEnv.addSource(
        new HbaseStreamDataSource("Orders", 0L, 2000L, getColumns, Orders.class));
DataStream<Tuple3<String, Double, Integer>> dataStream = orderDataStream.flatMap(
new FlatMapFunction<Orders, Tuple3<String, Double, Integer>>() {
@Override
public void flatMap(Orders value, Collector<Tuple3<String, Double, Integer>> out) throws Exception {
out.collect(new Tuple3<String, Double, Integer>(value.getCf1_name(),
value.getCf2_amount(), value.getCf3_groupId()));
}
});
dataStream.keyBy(2).sum(1).addSink(
new SinkHbase<Tuple3<String, Double, Integer>>(
"OrderResult", columnFamily, setColumns, "result"));
sEnv.execute("test Hbase");
}
}
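The Orders POJO referenced above is not shown in the original; a minimal version consistent with the getters used in the flatMap could look like this (the field types are inferred from the Tuple3<String, Double, Integer> and are an assumption):
package com.yjp.flink.hbase;

public class Orders {
    // Field names mirror the HBase columns (family_qualifier); types are inferred.
    private String cf1_name;
    private Double cf2_amount;
    private Integer cf3_groupId;

    public Orders() {
    }

    public String getCf1_name() {
        return cf1_name;
    }

    public void setCf1_name(String cf1_name) {
        this.cf1_name = cf1_name;
    }

    public Double getCf2_amount() {
        return cf2_amount;
    }

    public void setCf2_amount(Double cf2_amount) {
        this.cf2_amount = cf2_amount;
    }

    public Integer getCf3_groupId() {
        return cf3_groupId;
    }

    public void setCf3_groupId(Integer cf3_groupId) {
        this.cf3_groupId = cf3_groupId;
    }
}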
package com.yjp.flink.hbase;
import org.apache.flink.api.java.tuple.*;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Table;
import java.lang.reflect.Method;
import java.util.*;
/**
 * Custom sink that writes tuples into HBase.
 *
 * Date : 17:23 2018/3/12
 */
public class SinkHbase<T> extends RichSinkFunction<T> {
private static final long serialVersionUID = 1L;
/**
 * Table name.
 */
private String tableName;
/**
 * Column family names.
 */
private List<String> columnFamilies;
/**
 * Column names, passed as family:qualifier; each column maps positionally to a tuple field.
 */
private List<String> columns;
/**
 * Row key to write under.
 */
private String rowKey;
/**
 * @param tableName    table name
 * @param columnFamily column family names; only needed when the table may not exist yet
 * @param columns      columns to store, as family:qualifier
 * @param rowKey       row key to write under
 */
public SinkHbase(String tableName, List<String> columnFamily, List<String> columns, String rowKey) {
this.tableName = tableName;
this.columnFamilies = columnFamily;
this.columns = columns;
this.rowKey = rowKey;
}
/**
 * @param tableName table name
 * @param columns   columns to store, as family:qualifier
 * @param rowKey    row key to write under
 */
public SinkHbase(String tableName, List<String> columns, String rowKey) {
this.tableName = tableName;
this.columns = columns;
this.rowKey = rowKey;
}
public SinkHbase() {
}
/**
 * Opens the connection; if the table does not exist, creates it with the given column families.
 *
 * @param parameters passed through to the superclass
 * @throws Exception if the connection cannot be created
 */
@Override
public void open(Configuration parameters) throws Exception {
    super.open(parameters);
    Admin admin = FactoryConnect.getConnection().getAdmin();
    try {
        final TableName table = TableName.valueOf(tableName);
        if (!admin.tableExists(table)) {
            HTableDescriptor hTableDescriptor = new HTableDescriptor(table);
            // columnFamilies must be supplied (4-arg constructor) when the table is missing.
            for (String columnFamily : columnFamilies) {
                hTableDescriptor.addFamily(new HColumnDescriptor(columnFamily));
            }
            admin.createTable(hTableDescriptor);
        }
    } finally {
        admin.close();
    }
}
/**
 * Called per record; writes the tuple's fields into HBase.
 *
 * @param value the incoming record
 */
@Override
public void invoke(T value, Context context) throws Exception {
    Map<Class, Method> map = new HashMap<>(25);
    // Build the tuple-class -> handler map; this could be cached in open() instead of per record.
    initMap(map);
    Table table = FactoryConnect.getConnection().getTable(TableName.valueOf(tableName));
    try {
        Method handler = map.get(value.getClass());
        if (handler != null) {
            handler.invoke(new AssignmentTuple(), value, rowKey, columns, table);
        }
    } finally {
        table.close();
    }
}
private void initMap(Map<Class, Method> map) {
try {
map.put(Tuple1.class, AssignmentTuple.class.getMethod("setTuple1", Tuple1.class, String.class, ArrayList.class, Table.class));
map.put(Tuple2.class, AssignmentTuple.class.getMethod("setTuple2", Tuple2.class, String.class, ArrayList.class, Table.class));
map.put(Tuple3.class, AssignmentTuple.class.getMethod("setTuple3", Tuple3.class, String.class, ArrayList.class, Table.class));
map.put(Tuple4.class, AssignmentTuple.class.getMethod("setTuple4", Tuple4.class, String.class, ArrayList.class, Table.class));
map.put(Tuple5.class, AssignmentTuple.class.getMethod("setTuple5", Tuple5.class, String.class, ArrayList.class, Table.class));
map.put(Tuple6.class, AssignmentTuple.class.getMethod("setTuple6", Tuple6.class, String.class, ArrayList.class, Table.class));
map.put(Tuple7.class, AssignmentTuple.class.getMethod("setTuple7", Tuple7.class, String.class, ArrayList.class, Table.class));
map.put(Tuple8.class, AssignmentTuple.class.getMethod("setTuple8", Tuple8.class, String.class, ArrayList.class, Table.class));
map.put(Tuple9.class, AssignmentTuple.class.getMethod("setTuple9", Tuple9.class, String.class, ArrayList.class, Table.class));
map.put(Tuple10.class, AssignmentTuple.class.getMethod("setTuple10", Tuple10.class, String.class, ArrayList.class, Table.class));
map.put(Tuple11.class, AssignmentTuple.class.getMethod("setTuple11", Tuple11.class, String.class, ArrayList.class, Table.class));
map.put(Tuple12.class, AssignmentTuple.class.getMethod("setTuple12", Tuple12.class, String.class, ArrayList.class, Table.class));
map.put(Tuple13.class, AssignmentTuple.class.getMethod("setTuple13", Tuple13.class, String.class, ArrayList.class, Table.class));
map.put(Tuple14.class, AssignmentTuple.class.getMethod("setTuple14", Tuple14.class, String.class, ArrayList.class, Table.class));
map.put(Tuple15.class, AssignmentTuple.class.getMethod("setTuple15", Tuple15.class, String.class, ArrayList.class, Table.class));
map.put(Tuple16.class, AssignmentTuple.class.getMethod("setTuple16", Tuple16.class, String.class, ArrayList.class, Table.class));
map.put(Tuple17.class, AssignmentTuple.class.getMethod("setTuple17", Tuple17.class, String.class, ArrayList.class, Table.class));
map.put(Tuple18.class, AssignmentTuple.class.getMethod("setTuple18", Tuple18.class, String.class, ArrayList.class, Table.class));
map.put(Tuple19.class, AssignmentTuple.class.getMethod("setTuple19", Tuple19.class, String.class, ArrayList.class, Table.class));
map.put(Tuple20.class, AssignmentTuple.class.getMethod("setTuple20", Tuple20.class, String.class, ArrayList.class, Table.class));
map.put(Tuple21.class, AssignmentTuple.class.getMethod("setTuple21", Tuple21.class, String.class, ArrayList.class, Table.class));
map.put(Tuple22.class, AssignmentTuple.class.getMethod("setTuple22", Tuple22.class, String.class, ArrayList.class, Table.class));
map.put(Tuple23.class, AssignmentTuple.class.getMethod("setTuple23", Tuple23.class, String.class, ArrayList.class, Table.class));
map.put(Tuple24.class, AssignmentTuple.class.getMethod("setTuple24", Tuple24.class, String.class, ArrayList.class, Table.class));
map.put(Tuple25.class, AssignmentTuple.class.getMethod("setTuple25", Tuple25.class, String.class, ArrayList.class, Table.class));
} catch (NoSuchMethodException e) {
    throw new RuntimeException("Reflection lookup failed", e);
}
}
}
package com.yjp.flink.hbase;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import java.io.IOException;
import java.io.Serializable;
/**
 * Singleton that hands out a shared, thread-safe HBase Connection.
 *
 * Date : 16:45 2018/3/16
 */
public class FactoryConnect implements Serializable {
private static volatile Connection connection;
private FactoryConnect() {
}
public static Connection getConnection() throws IOException {
    if (null == connection) {
        synchronized (FactoryConnect.class) {
            if (null == connection) {
                // HBaseConfiguration.create() loads hbase-site.xml from the classpath.
                org.apache.hadoop.conf.Configuration conf = HBaseConfiguration.create();
                connection = ConnectionFactory.createConnection(conf);
            }
        }
    }
    return connection;
}
}
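HBaseConfiguration.create() picks up hbase-site.xml from the classpath, so nothing cluster-specific lives in the code. If you would rather configure the client programmatically, something like this should work (a sketch; "zk-host" is a placeholder for your ZooKeeper quorum):
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;

public class ConnectDemo {
    public static void main(String[] args) throws Exception {
        org.apache.hadoop.conf.Configuration conf = HBaseConfiguration.create();
        // "zk-host" is a placeholder; point these at your cluster.
        conf.set("hbase.zookeeper.quorum", "zk-host");
        conf.set("hbase.zookeeper.property.clientPort", "2181");
        try (Connection connection = ConnectionFactory.createConnection(conf)) {
            System.out.println("connected: " + !connection.isClosed());
        }
    }
}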
package com.yjp.flink.hbase;
import org.apache.flink.api.java.tuple.*;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
import java.io.IOException;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
/**
 * Writes the fields of a tuple into HBase.
 *
 * Date : 16:49 2018/3/12
 */
public class AssignmentTuple {
/**
 * Arity-1 tuple.
 *
 * @param tuple1  the tuple whose fields are written
 * @param rowKey  the row key to write under
 * @param columns the target columns, as family:qualifier
 * @param table   the Table to put into
 */
public void setTuple1(Tuple1<Object> tuple1, String rowKey, ArrayList<String> columns, Table table) {
    putData(tuple1, rowKey, columns, table);
}
public void setTuple2(Tuple2<Object, Object> tuple2, String rowKey, ArrayList<String> columns, Table table) {
    putData(tuple2, rowKey, columns, table);
}
public void setTuple3(Tuple3<Object, Object, Object> tuple3, String rowKey, ArrayList<String> columns, Table table) {
    putData(tuple3, rowKey, columns, table);
}
public void setTuple4(Tuple4<Object, Object, Object, Object> tuple4, String rowKey, ArrayList<String> columns, Table table) {
    putData(tuple4, rowKey, columns, table);
}
public void setTuple5(Tuple5<Object, Object, Object, Object, Object> tuple5, String rowKey, ArrayList<String> columns, Table table) {
    putData(tuple5, rowKey, columns, table);
}
public void setTuple6(Tuple6 tuple6, String rowKey, ArrayList<String> columns, Table table) {
    putData(tuple6, rowKey, columns, table);
}
public void setTuple7(Tuple7 tuple7, String rowKey, ArrayList<String> columns, Table table) {
    putData(tuple7, rowKey, columns, table);
}
public void setTuple8(Tuple8 tuple8, String rowKey, ArrayList<String> columns, Table table) {
    putData(tuple8, rowKey, columns, table);
}
public void setTuple9(Tuple9 tuple9, String rowKey, ArrayList<String> columns, Table table) {
    putData(tuple9, rowKey, columns, table);
}
public void setTuple10(Tuple10 tuple10, String rowKey, ArrayList<String> columns, Table table) {
    putData(tuple10, rowKey, columns, table);
}
public void setTuple11(Tuple11 tuple11, String rowKey, ArrayList<String> columns, Table table) {
    putData(tuple11, rowKey, columns, table);
}
public void setTuple12(Tuple12 tuple12, String rowKey, ArrayList<String> columns, Table table) {
    putData(tuple12, rowKey, columns, table);
}
public void setTuple13(Tuple13 tuple13, String rowKey, ArrayList<String> columns, Table table) {
    putData(tuple13, rowKey, columns, table);
}
public void setTuple14(Tuple14 tuple14, String rowKey, ArrayList<String> columns, Table table) {
    putData(tuple14, rowKey, columns, table);
}
public void setTuple15(Tuple15 tuple15, String rowKey, ArrayList<String> columns, Table table) {
    putData(tuple15, rowKey, columns, table);
}
public void setTuple16(Tuple16 tuple16, String rowKey, ArrayList<String> columns, Table table) {
    putData(tuple16, rowKey, columns, table);
}
public void setTuple17(Tuple17 tuple17, String rowKey, ArrayList<String> columns, Table table) {
    putData(tuple17, rowKey, columns, table);
}
public void setTuple18(Tuple18 tuple18, String rowKey, ArrayList<String> columns, Table table) {
    putData(tuple18, rowKey, columns, table);
}
public void setTuple19(Tuple19 tuple19, String rowKey, ArrayList<String> columns, Table table) {
    putData(tuple19, rowKey, columns, table);
}
public void setTuple20(Tuple20 tuple20, String rowKey, ArrayList<String> columns, Table table) {
    putData(tuple20, rowKey, columns, table);
}
public void setTuple21(Tuple21 tuple21, String rowKey, ArrayList<String> columns, Table table) {
    putData(tuple21, rowKey, columns, table);
}
public void setTuple22(Tuple22 tuple22, String rowKey, ArrayList<String> columns, Table table) {
    putData(tuple22, rowKey, columns, table);
}
public void setTuple23(Tuple23 tuple23, String rowKey, ArrayList<String> columns, Table table) {
    putData(tuple23, rowKey, columns, table);
}
public void setTuple24(Tuple24 tuple24, String rowKey, ArrayList<String> columns, Table table) {
    putData(tuple24, rowKey, columns, table);
}
public void setTuple25(Tuple25 tuple25, String rowKey, ArrayList<String> columns, Table table) {
    putData(tuple25, rowKey, columns, table);
}
/**
 * Assigns tuple fields to columns one-to-one, by position.
 *
 * @param tuple   the tuple carrying the values
 * @param rowKey  the row key to write under
 * @param columns the target columns, as family:qualifier
 * @param table   the Table to put into
 */
public void putData(Tuple tuple, String rowKey, List<String> columns, Table table) {
    Put put = new Put(Bytes.toBytes(rowKey));
    long timestamp = Instant.now().toEpochMilli();
    for (int i = 0; i < columns.size(); i++) {
        String[] split = columns.get(i).split(":");
        // Fields are matched to columns by position and must be non-null.
        put.addColumn(Bytes.toBytes(split[0]), Bytes.toBytes(split[1]), timestamp, Bytes.toBytes(tuple.getField(i).toString()));
    }
    try {
        table.put(put);
    } catch (IOException e) {
        throw new RuntimeException("Failed to write the row to HBase", e);
    }
}
}
To keep the source and the sink generic across record types, reflection is used: the sink inspects the concrete tuple class at runtime and dispatches to the matching handler.
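A stripped-down illustration of that dispatch pattern, independent of HBase (a self-contained sketch; Handler and its methods are toy stand-ins for AssignmentTuple's setTupleN methods):
package com.yjp.flink.hbase;

import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;

import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.Map;

public class DispatchDemo {
    // Toy stand-in for AssignmentTuple: one handler per tuple arity.
    public static class Handler {
        public void handle2(Tuple2 t) {
            System.out.println("arity 2: " + t);
        }

        public void handle3(Tuple3 t) {
            System.out.println("arity 3: " + t);
        }
    }

    public static void main(String[] args) throws Exception {
        Map<Class<?>, Method> handlers = new HashMap<>();
        handlers.put(Tuple2.class, Handler.class.getMethod("handle2", Tuple2.class));
        handlers.put(Tuple3.class, Handler.class.getMethod("handle3", Tuple3.class));

        Tuple value = Tuple3.of("a", 1.0, 7);
        // Dispatch on the runtime class of the record, exactly as SinkHbase.invoke() does.
        Method m = handlers.get(value.getClass());
        if (m != null) {
            m.invoke(new Handler(), value);
        }
    }
}
Note that because putData() already accepts the Tuple base class, the per-arity setTupleN methods mainly serve to make the class-to-handler mapping explicit.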