1. 自定义 Source:继承 RichSourceFunction
public class TestSource extends RichSourceFunction { private volatile boolean isRunning = true; @Override public void open(Configuration parameters) throws Exception { } @Override public void run(SourceContext sourceContext) throws Exception { Random random = new Random(); while (isRunning) { Thread.sleep((getRuntimeContext().getIndexOfThisSubtask() + 1) * 1000 * 5); //String key = "类别" + (char) ('A' + random.nextInt(3)); int sku = random.nextInt(10); int value = random.nextInt(10) + 1; String str = sku + "," + value; System.out.println(str); sourceContext.collect(str); } } @Override public void cancel() { isRunning = false; } }
2. 自定义 Sink:继承 RichSinkFunction
/**
 * Skeleton Flink sink for {@code String} records.
 *
 * <p>Both hooks are currently empty: nothing is initialised in {@link #open} and every
 * record passed to {@link #invoke} is discarded. Fill them in to write records to a real
 * destination.
 */
public class SinkTest extends RichSinkFunction<String> {

    /**
     * Lifecycle hook; this sink has nothing to initialise yet.
     *
     * @param parameters configuration passed in by the runtime (unused)
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        // Intentionally empty.
    }

    /**
     * Receives one record from the stream. Currently a no-op.
     *
     * <p>Marked {@code @Override} so the compiler rejects the method if its signature does
     * not match the sink interface, rather than leaving a method that is never called.
     *
     * @param value the incoming record
     * @param context per-record context supplied by the runtime (unused)
     */
    @Override
    public void invoke(String value, Context context) throws Exception {
        // Intentionally empty: the record is dropped.
    }
}