我有一个自定义AllWindowFunction类,它有一个负责数据库插入的数据。与数据库的连接是持久的,并在构造期间打开。
问题是,AllWindowFunction创建/打开连接的实例与在apply事件上调用的实例不同。解决方法是一个静态数据,但我想知道这是否是唯一的解决方法?
示例代码:
public class CustomWindowFunction implements AllWindowFunction {
private static Connection database;
CustomWindowFunction() {
database = new Connection();
}
@Override
public void apply(TimeWindow timeWindow, Iterable<String> trades, Collector<String> out) {
// process data
database.save(data);
out.collect(data.toString());
}
}
我找不到任何关于这种机制的内容,我所知道的是构造函数中的对象ID与调用的对象ID不同apply。
借楼问下, open方法在什么时候被调用?
RichAllWindowFunction 方法在apply应该怎样被是使用?
'var filteredstream = inputStream
.flatMap(buildHbaseMsg(_))
.filter(b => b.actType match {
case 0|1|2|3 => true
case _ => false
})
.countWindowAll(2L).apply(new CustomWindowFunction ())'
这样吗?
{
stream.apply((window, input, out: Collector[Unit]) => new CustomWindowFunction ().apply(window, input, out))
}
这样调用并不会触发open方法
应该怎么使用自定义的function类
这是因为每个函数都必须被序列化以分布在集群的节点上。您可以尝试使用RichAllWindowFunction,这就是所谓的“Rich”版本,您可以在其中使用open()方法,该方法将在每个并行运算符的开头调用。在此方法中,您可以创建连接
public class CustomWindowFunction implements RichAllWindowFunction {
private Connection database;
@Override
public void open(Configuration parameters) {
database = new Connection();
}
@Override
public void apply(TimeWindow timeWindow, Iterable<String> trades, Collector<String> out) {
// process data
database.save(data);
out.collect(data.toString());
}
}
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。