开发者学堂课程【NiFi 知识精讲与项目实战(第三阶段):Json 内容转换为 Hive 所支持的文本格式1】学习笔记,与课程紧密联系,让用户快速学习知识。
课程地址:https://developer.aliyun.com/learning/course/707/detail/12547
Json 内容转换为 Hive 所支持的文本格式1
内容介绍
一、课前介绍
二、案例二 Json 内容转换为 Hive 所支持的文本格式
三、操作演示
一、课前介绍
本节课进入 NiFi 的第二个典型案例,它主要是基于第一个案例的基础,再做处理。做的事情主要是把 json 内容转换为 Hive 所支持的文本格式来进行保存。在第一个案例当中,向 HDFS 同步的时候,保存的内容是 Json 格式,如图所示。
全部都是 Json 数据。 HDFS 这个文本数据需要被 Hive 的外部表的加载使用的时候,那这个 Json 格式它是不满足要求的,应该把 Json 格式转化为 Hive 所支持的文本格式。 接下来就进入到第二个案例的流程
二、案例二 Json 内容转换为 Hive 所支持的文本格式
1.处理器流程
这个处理器是在第一个步骤的基础上又增加两个新的处理器,一个是叫 EvaluateJsonPath 处理器,还有一个replace Text 处理器,这两个处理器主要是在 Json 被切割成每一个单独的对象以后,再去进行处理。这里的重点是 EvaluateJsonPath,提取 Json 当中的属性,然后提取属性以后,再通过 replace Text 的把这些内容给替换掉,用属性值替换掉的 FLOW File 的内容, 然后使被替换以后的内容能够被 hive 的外部表去进行加载。
第一步要把 Json 中的数据,把它的属性值提取出来,提取出来以后,然后再把数据转化,通过的 table 键进行分割,然后通过换行去进行分割行数据,table 键分割字段数据,回车键的分割行数据。
2.处理器说明
(1) EvaluateJsonPath 处理器,首先来看 EvaluateJsonPath 的描述信息。
属性名称 |
默认值 |
可选值 |
描述 |
Destination |
flowfile- |
flowfile- |
指示是否将]sonPath 计算结果写入 流文件内容或流文件属性;如果使用flowfile-attrbute,则必须指定属性名称属性。如果设置为 flowfile-content,则只能指定一个 |
Return Type |
auto- |
auto- |
指示 JSON 路径表达式的期望返回 类型。选择"auto-detect","flowfile-content"的诉回类型自动置为”json"“flowfile-attribute""的返回类型自动设置为"scalar""。 |
Path Not |
ignore |
warnignore |
指示在将 Destination 设置为"flowfile-attribute""时如何处理丢失的] SON 路径表达式。当没有找到 SON 路径表达式时,选择"warn"将生成一个警告。 |
EvaluateJsonPath,它主要是根据流文件当中的 CONtent 然后使用 EvaluateJsonPath 表达式去把它里面的内容提取出来,写入到新的 Flow File 属性当中去。 当然它也可以重新覆盖写入到 Flow File 他所本身的内容当中去。EvaluateJsonPath 表达式,在这里是用最简单的用法。在 EvaluateJsonPath 处理器如果想要去添加自定义的属性,就必须要先在这个处理器当中去添加自定义属性,最重要的就是这个自定义属性,它的属性值需要用到的主要是这第一个值需要去修改,也就是 destination。 写入流文件的时候,默认值里面是 flowfilecontent,需要把它修改,修改flowfile-attribute。然后再去添加新的自定义属性,就可以把这个自定义属性写入到的 flowfile 当中。这个就是新的的自定义属性,这个自定义属性的名称,就会是后来的 flowfile 当中新添加的名称,这个值就是可能 flowfile 当中将会要添加的值。
接下来 RepalceText 处理器。它的主要功能,就是通过正则表达式匹配到以后把它改变为相对应的值。
三、操作演示
接下来进行操作演示,实现转换文本的过程。首先要再新加一个,EvaluateJsonPath 处理器,添加到之前处理器后面,将原处理器连接到 EvaluateJsonPath 处理器,然后把EvaluateJsonPath处理器再去连接到 HDFS 数据库
连接后需要把 Json 字段读取出来,通过 Jsonpast 的形式,把它添加到的自定义属性当中。
在这个配置项目当中,首先第一个改成 flowflie-attribute。他的意思要把的数据写入到属性当中,而不是内容当中。然后下面这些都是自定义的属性,这些自定义属性左侧是它的名称,右侧是它的值,值到点就是典型的 Jsonpass 表达式。当然这个用法是最简单的用法,就是去使用这个对象哪个属性,那这正好就是从数据库读出来的这几个字段名。然后让它进行自连接,把它这个提示警告把它去掉,然后就可以启动。
运行之后来可以来观察它输出的结果是否已经不一样,是否已经有全新的自定义的属性,然后在这个基础上有属性之后,把这些属性最终变更写入到的新的 conent 的内容当中,把原来的 conent 的内容覆盖掉,但是这里并不是简单的复制,他还要把文本按照这个 Hive 所支持的有外部表的格式替换。需要使用的 replace text。这个处理器,然后添加完 eplace Text 到这个倒数第二个步骤,就是在的 EvaluateJsonPath 后面。
然后添加完之后,要也要去做一个自连接,然后再去进行设置。主要是的 替换以后的值。原始值肯定是然后不要。替换的新值就可以把刚才通过 EvaluateJsonPath,新添加的这些数直接通过 NIFI 表达式拿出来,然后中间通过空格进行分割,每一行数据的后面,再加上一个换行回车,通过这样的方式,把 conent 的中的原来的阶层把它给替换掉。 然后这个模式替换模式 line-by-line 也就是每一行都会进行这样配置以后保存,然后再运行,就可以查看到最终的结果,成功的变换。这样的一个格式,就是 Hive 表他所需要的结构。
最后查看展示结果