
能力说明:
了解变量作用域、Java类的结构,能够创建带main方法可执行的java应用,从命令行运行java程序;能够使用Java基本数据类型、运算符和控制结构、数组、循环结构书写和运行简单的Java程序。
能力说明:
具备数据库基础知识,了解数据库的分类,具备安装MySQL数据库的能力,掌握MySQL数据类型知识,基本了解常用SQL语句,对阿里云数据库产品有基本认知。
暂时未有相关云产品技术能力~
阿里云技能认证
详细说明背景:目前我们solr中存储的数据因为很多属性字段的不确定性,选择将数据在一个大字段中合并。如下格式中attr字段代表着一个不确定的属性字段,可能为身高、体重、性别等,也可能为页码、出版方、售价等{“id”:“1”“attr”:“180,80,男”“type”:“person”},{“id”:“2”“attr”:“50,人民出版社,50¥”“type”:“book”}针对这种近期产品提出一个要针对人类按照身高排序,书籍类按照售价排序。这种需求怎么搞呢?身高属性已经跟其他属性混为一个字段,常规排序怕是行不通。思路:既然常规思路行不通,我们想能不能通过函数截取的方式通过比对该字符串指定位置来实现排序呢?下面直接上代码代码:首先需要定义两个类SubStrValueSourceParserimport org.apache.commons.lang.StringUtils; import org.apache.lucene.queries.function.ValueSource; import org.apache.solr.schema.SchemaField; import org.apache.solr.search.FunctionQParser; import org.apache.solr.search.SyntaxError; import org.apache.solr.search.ValueSourceParser; public class SubStrValueSourceParser extends ValueSourceParser { public SubStrValueSourceParser() { super(); } @Override public ValueSource parse(FunctionQParser fp) throws SyntaxError { String fieldStr = fp.parseArg();//第一个参数 String bgnLocation = fp.parseArg();//第二个参数数组下标 ValueSource location = getValueSource(fp, fieldStr);//取得函数调用时指定字段值 //将参数及需要的文档的值传给自定义的ValueSource方法,打分规则在自定义的ValueSource中定制 SubStrValueSource stringFieldSource = new SubStrValueSource(location, bgnLocation); return stringFieldSource; } //该方法是根据字段名,从FunctionQParser得到文档该字段的相关信息 public ValueSource getValueSource(FunctionQParser fp, String arg) { if (arg == null) return null; SchemaField f = fp.getReq().getSchema().getField(arg); return f.getType().getValueSource(f, fp); } } import org.apache.commons.lang.StringUtils; import org.apache.lucene.index.LeafReaderContext; import org.apache.lucene.queries.function.FunctionValues; import org.apache.lucene.queries.function.ValueSource; import org.apache.lucene.queries.function.docvalues.DoubleDocValues; import org.apache.lucene.queries.function.docvalues.FloatDocValues; import org.apache.lucene.queries.function.docvalues.IntDocValues; import org.apache.lucene.queries.function.docvalues.StrDocValues; import org.codehaus.jackson.map.ObjectMapper; import org.codehaus.jackson.type.TypeReference; import java.io.IOException; import java.util.List; import java.util.Map; public class SubStrValueSource extends ValueSource { private ValueSource attr13;//需要进行格式转换的字段 private String location; //通过构造方法获取字段 public SubStrValueSource(ValueSource attr13, String location) { this.attr13 = attr13; this.location = location; } @Override public FunctionValues getValues(Map map, LeafReaderContext leafReaderContext) throws IOException { final FunctionValues fieldValues = attr13.getValues(map, leafReaderContext);//获取attr13的值 return new DoubleDocValues(this) { @Override public double doubleVal(int i) throws IOException { String fieldStr = null; Double retValue = 0.00; //若要被转化的值为空或者为空字符串则返回0 if (fieldValues.strVal(i) == null || "".equals(fieldValues.strVal(i))) { } else { fieldStr = fieldValues.strVal(i); String[] vList = fieldStr.split(","); //若实际数组长度小于下标则不处理直接返回0,若大于或等于下标则截取对应位置数据 if (vList.length < Integer.parseInt(location) + 1) { } else { String content = vList[Integer.parseInt(location)]; retValue = Double.parseDouble(content); } } return retValue; } }; } @Override public boolean equals(Object o) { return true; } @Override public int hashCode() { return 0; } @Override public String description() { return name(); } public String name() { return "subStr"; } } 发布:代码逻辑编写完毕后打成jar包并按照如下步骤操作在solr安装目录下dist目录中将打好的jar包放进去在solrconfig.xml中新增以下配置 <lib dir="${solr.install.dir:../../../..}/dist/" regex="com.tl.solr.*\.jar" /><valueSourceParser name="subStr" class="SubStrValueSourceParser" />搞定之后重启服务生效使用效果:
最近接到一个spark离线任务的需求,根据oracle关系数据库中配置表配置的信息。用hive查询大数据平台中的数据并将其同步至oracle一张指定的表中。 本地环境运行正常,在测试环境spark集群上运行时缺迟迟无法启动报java.sql.SQLException: No suitable driver。错误很明显是没有找到合适的驱动,于是按照以前的思维立马去查看代码中引用的包。依赖中是有oracle jdbc驱动的。然后又去查看了已经打好的包,发现里面也有驱动包!这就纳闷了,驱动都在为什么会报找不到合适的驱动呢? 解决办法:方法1:运行时指定jdbc路径,这个路径是spark集群目录下的路径--jars /path/to/ojdbc6.jar--driver-class-path /path/to/ojdbc6.jar方法2:配置conf/spark-env.sh文件中SPARK_CLASSPATH属性来设置driver的环境变量,这种方法需要修改配置文件,不如方法1方便。
springboot以其方便简单的使用各种中间件收到了广大开发者的欢迎,最近也是在用springboot开发定时器,因为不是第一次用@Schedule注解了所以完全没想到会在这个地方出问题。 本地开发完之后设置一个临近的时间点,测试完毕,一切正常。发布到服务器上傻眼了,设置的凌晨五点到点了却一个没执行。聪明的朋友可能已经看出来我写了不止一个。问题就出在了这个地方,下面记录一下原因和解决办法。 因为本地测试时是单个测试的,时间节点也不同所以没有发现问题。发布到服务器上因为定时任务较多并且时间设置的重复度比较高,大部分都是凌晨五点导致定时任务阻塞。因为定时任务默认是单线程执行,当一个任务较慢时多个任务都会阻塞。下面是两个解决办法:调整定时任务的执行时间,错峰执行。增加一个配置类,配置定时器为多线程执行,代码如下@Configurationpublic class ScheduleConfig implements SchedulingConfigurer{@Override public void configureTasks(ScheduleTaskRegistrar scheduledTaskRegistrar){ int maxPoolSize = 4; int corePoolSize =(scheduledTaskRegistrar.getCronTaskList().size() > maxPoolSize) ? maxPoolSize : scheduledTaskRegistrar.getCronTaskList().size() ScheduledExecutorService scheduledExecutorService = new ScheduledThreadPoolExecutor(corePoolSize,new BasicThreadFactory.Builder().namingPattern("******").daemon(true).build()); scheduledTaskRegistrar.setScheduler(scheduledExecutorService) }}
最近在对flink的运行任务进行优化,因为我们的计算方面业务上比较依赖组织架构比如总公司、子公司、子部门我们针对总公司要汇总一版全公司的数据,子公司要汇总一版子公司数据。但是实际上子公司的数据是属于总公司的,这样的话数据会出现多次重复参与计算的问题。为了减少这种冗余计算,我们利用创建临时表的思路搭配flink的table api以及sql api进行数据复用。从底层开始汇总,上层复用底层的轻度汇总结果进行进一步的汇总。这种思路类似与于我们学习hadoop时候mapreduce中的combine操作,即先合并后计算。并把中间计算结果保存成临时表供后续重复使用。这样调整后在flink管理页面查看ExecutionGraph执行图时会发现结构也变成了树状图,并且tasks数量也直接少了一半。但是调整后经过观察发现flink计算后的数据输出至kafka时出现数据跳动问题,导致前台展示忽大忽小,原因是使用中间表后因为中间数据也在作为sink输出,这样数据变化时会像一个传送带一样第一层数据变化引起第二层变化然后引起第三层变化。因为flink数据状态变更为先删后插这样传送带也会先删导致后续使用该中间状态的数据变小后插入时再引起数据变大从而产生数据的跳动动结果。解决方法:开启微批将中间数据跳动过滤掉,设置如下:table.exec.mini-batch.enabled 设置微truetable.exec.mini-batch.allow-latency 缓存数据最大的时间间隔,超过该间隔,将会强制触发已聚合数据写出给下游,默认-1毫秒即立即触发,可以根据需求自行调整。实时性要求较高可以适当调低,实时性要求不高可调高table.exec.mini-batch.size 黄村数据量最大条数,为防止oom做的双重保障与table.exec.mini-batch.allow-latency设置的时间满足一个就会触发
我们线上运行的flink任务通过flink->kafka->redis 路线,结果最终入redis供前端人员展示使用。最近在给前端人员对接展示的过程中发现了一个比较头疼的问题,那就是任务刚开始运行时数据是正常的,运行一段时间之后flink汇总出来的数据会比实际的多,并且汇总出来了很多目前没有的分类。可能听起来比较难理解。下面我举个例子来描述这种现象。我们的分类总共分为A、B、C、D四类,现在仅有A、B符合目前参与统计的条件。但是Flink汇总出来A、B、C三种数据。而且仅仅C的数据是多出来的,A、B分类的数据是正确的。这就让人头疼了,你说flink任务有问题吧它算出来的数据是准的,说没问题吧它算出来的又多了其他分类的数据!本着重启解决万难的想法我重启了flink任务,意料之外的是数据居然还是没对上。然后我开始分析数据看看多出来的数据都是些什么数,经过一阵脑细胞燃烧发现这些数据有两个共同点。其一是这些数据曾经符合统计条件现在不符合了,其二就是这些数据现在都应该是0。有了这两个条件组合我忽然想到了一个极大的可能那就是降到0之后没有触发flink的删除操作。按理说flink应该会触发。然后去排查代码发现我在最后输出结果的地方将flink删除的指令全部过滤掉了没有参与输出。至此真相大白。flink中sink的数据像日志一样,更新操作就是先false后true。也就是我们理解的先删后插,但是我利用了redis的覆盖更新特性,为了减少结果输出量一股脑全把false拦截了。导致真实需删除的数据没有被删除。
初学flink的人绕不开的东西就是窗口,为什么这么说呢?首先要从流和批的概念介绍起,flink我认为其能火起来的一个必不可少的原因就是流批一体,而窗口正是界分流批的重要概念。初学的时候我们可能会学习滑动窗口和滚动窗口。但实际上我们可能通过顶层的table api或者sql api直接实现窗口的部分功能。切入正题因为我们的需求是要针对每个月的数据进行分组聚合并展示,所以如果按照需求分析来说我们需要用到的是滑动窗口,也就是只针对本月的一个有限流。我们对数据的精准度要求比较高并且我们整个数据输入到flink的链路比较长中间肯定会存在延迟,针对这种情况我们需要设置的watermark就不好把控,容易造成数据丢失的现象。所以我们采用字符串匹配的方式,通过输入的时间字段与当前月时间字段进行匹配,用来做数据是否可以参与统计的标准。方案交代完毕,就来说说问题。flink任务运行一直很稳定且每天进行数据核验的时候都是比较准确的,但是在本月的一号打开电脑核对数却发现比实际的少了很多。当时的第一印象就是会不会是跟上篇一样的原因—状态过期了!但是很快就否认了这个想法,因为一号才刚刚开始计数怎么可能过期。然后通过比对数据以及分析flink结果推入消息队列的时间发现晚上24点至早上8点的数据根本就没有参与计数!之后上网查资料发现我们使用的函数存在时区差问题flink低版本无法设置时区,而我们使用的是东八区。下面直接上解决办法:1.CAST(CURRENT_TIMESTAMP AS BIGINT) * 1000 + 8 * 60 * 60 * 1000直接加八小时2.CAST(DATE_FORMAT(LOCALTIMESTAMP,'yyMM') AS VARCHAR) 使用LOCALTIMESTAMP获取当前系统时间使用CURRENT_DATE、CURRENT_TIMESTAMP均会存在时区问题,LOCALTIMESTAMP可以放心使用。以上观点为个人经验之谈,不正确的地方各位看官大神多多指正。
flink任务上线运行有一段时间了一直很稳定,这两天忽然发现有些“丢数据”的现象。这是怎么回事呢?用这篇文章记录一下踩坑之路。任务要求: 我们这个任务要达到的效果是汇总每月的业务量,需要按照具体分类分组。业务量绝对不会出现负数的问题,但是现在汇总出来的居然存在负数!肯定是某个环节出现了问题。排查思路: 我这个任务使用的是flink的table api操作比较简单。最先发现数据不正常是发现flink汇总出来的数据比实际要少,然后就开始去跟踪中间件kafka中的消息。意外的发现有些分类下的数据居然是负数!当时看到非常震惊。然后开始排查flink代码,这块计算就是简单分组后进行sum。sum怎么会出现负数呢?百思不得其解,最终经过大半天的天马行空的猜想得出个结论-玄学!当然了肯定不是玄学,所谓玄学往往出问题就在我们认为这个东西绝对不会有问题,但是现在有问题了。我们就将其归类为玄学。于是按照这个想法我开始慢慢回归问题本身。 问题本身就在于SUM出现了负数,我觉得很难理解。所以那就干脆不理解,就认为SUM不可能是负数,那出现负数的原因只能是因为在做减法的时候减到负数。有了这个思路之后我开始联想到flink的状态保存有个失效机制。如果该状态迟迟没有更新直到超过我们设置的过期时间,这个状态就会过期。当flink真正监听到该状态需要被删除时又会进行删除。如此一来一个状态被删了两次就会出现负数。解决办法:setIdleStateRetentionTime()使用此方法设定过期时间。将该时间设置大于任务所需时间即可解决负数问题。但是这样写有弊端就是会保存很多冗余的状态。不会实时的清除。这个我们后续在讨论。希望本篇文章可以对同是flink小白的你有所帮助。转载请注明出处。
2022年03月
+8是想解决本身函数中的时区问题吧,可以直接用localtimestamp 这个没有时区问题。这样就不用自定义函数了
https://developer.aliyun.com/article/842571?spm=a2c6h.13148508.0.0.49ed4f0e47UNIj
你是在yarn上运行的时候报错的吧,如果是那样你得在集群上搞个jar包,参数里面指定一下。
2022年01月
flink sqlapi用的语法是 apache的Calcite,你可以找一下https://calcite.apache.org/docs/reference.html
2021年12月
+8是想解决本身函数中的时区问题吧,可以直接用localtimestamp 这个没有时区问题。这样就不用自定义函数了
https://developer.aliyun.com/article/842571?spm=a2c6h.13148508.0.0.49ed4f0e47UNIj
写个转换类,不一定非得用tuple吧。转换成一个实体也挺好的
大音希声扫阴翳
肯定会的,id也会冲突
牛
牛啊牛啊
你是在yarn上运行的时候报错的吧,如果是那样你得在集群上搞个jar包,参数里面指定一下。
可用性的话自然是流畅度以及稳定性,易用性自然是很烦弹广告,像拼夕夕那种帮别人砍一刀就出不来了一层一层的弹
你要是自己为了排查went可以用kafka tool可视化工具
你的意思是你想更改之前的结果么
是你并发和分区数不一致导致的吧
这个跟你的checkpoint设置的时间也有关系
如果是消费的慢,消费者端应该会报超时的错误之类的。也没报错
格局大点,多学点转管理岗位。最主要得会卷,会沟通
刘全有回答的好
营销吧,有技术没营销就是空有力气使不上劲。技术决定上限,营销决定下限
加油
是的