把HDFS里的json数据转换成csv格式

本文涉及的产品
云原生网关 MSE Higress,422元/月
注册配置 MSE Nacos/ZooKeeper,182元/月
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
简介:

1. 全景图

NewImage

2. 用ListHDFS获取所有文件名

NewImage

 

如果想重新再取一次,右健view state:

NewImage

 

点击 clear state, 再运行,即可再次采集数据了。

 

3. 用FetchHDFS 取出json 数据

NewImage

 

4. 用ExecuteScript 转换

NewImage

import org.apache.commons.io.IOUtils import java.nio.charset.* import java.text.SimpleDateFormat import groovy.json.*

def flowFile = session.get()

flowFile = session.write(flowFile, {inputStream, outputStream ->

def js = IOUtils.toString(inputStream, StandardCharsets.UTF_8) def data = new JsonSlurper().parseText( js ) def columns = data.data*.keySet().flatten().unique()

// Wrap strings in double quotes, and remove nulls def encode = { e -> e == null ? '' : e instanceof String ? /"$e"/ : "$e" }

// Print all the column names def columnName = columns.collect { c -> encode( c ) }.join( ',' )

// Then create all the rows def columnData = data.data.collect { row -> // A row at a time columns.collect { colName -> encode( row[ colName ] ) }.join( ',' ) }.join( '\n' )

StringBuilder cd = new StringBuilder() cd.append(columnName + "\n") cd.append(columnData)

outputStream.write(cd.toString().getBytes(StandardCharsets.UTF_8)) }as StreamCallback)

session.transfer(flowFile, REL_SUCCESS)

 

参考:http://stackoverflow.com/questions/21576162/groovy-code-to-convert-json-to-csv-file

 

5. 用PutHDFS 插入

NewImage

 

问题:

最近加了cluster,发现listhdfs不能取到数据了:

NewImage

 

查看日志:

NewImage

 

发现日志里提到了zookeeper导致connection refused

 

nifi设置成cluster必须走zookeeper来调度资源,所以必须要连上我们的zookeeper server,有一个配置要加

conf/state-management.xml里面有个配置

<cluster-provider> <id>zk-provider</id> <class>org.apache.nifi.controller.state.providers.zookeeper.ZooKeeperStateProvider</class> <property name="Connect String">wdp.xxx.cn:2181</property> <property name="Root Node">/nifi</property> <property name="Session Timeout">30 seconds</property> <property name="Access Control">CreatorOnly</property> <property name="Username">nifi</property> <property name="Password">nifi</property> </cluster-provider>

 

 本文转自疯吻IT博客园博客,原文链接:http://www.cnblogs.com/fengwenit/p/5795576.html,如需转载请自行联系原作者


 

目录
相关文章
|
22天前
|
JSON API 数据格式
淘宝/天猫图片搜索API接口,json返回数据。
淘宝/天猫平台虽未开放直接的图片搜索API,但可通过阿里妈妈淘宝联盟或天猫开放平台接口实现类似功能。本文提供基于淘宝联盟的图片关联商品搜索Curl示例及JSON响应说明,适用于已获权限的开发者。如需更高精度搜索,可选用阿里云视觉智能API。
|
20天前
|
JSON API 数据安全/隐私保护
深度分析淘宝卖家订单详情API接口,用json返回数据
淘宝卖家订单详情API(taobao.trade.fullinfo.get)是淘宝开放平台提供的重要接口,用于获取单个订单的完整信息,包括订单状态、买家信息、商品明细、支付与物流信息等,支撑订单管理、ERP对接及售后处理。需通过appkey、appsecret和session认证,并遵守调用频率与数据权限限制。本文详解其使用方法并附Python调用示例。
|
30天前
|
JSON 缓存 API
淘宝店铺所有商品API,json数据返回
淘宝店铺所有商品API的JSON数据返回通常包含商品的基本信息、动态数据以及分页信息等。以下是一个详细的JSON数据返回示例,以及相关字段的说明
|
1月前
|
JSON 算法 API
淘宝商品评论API接口核心解析,json数据返回
淘宝商品评论API是淘宝开放平台提供的数据服务接口,允许开发者通过编程方式获取指定商品的用户评价数据,包括文字、图片、视频评论及评分等。其核心价值在于:
|
2月前
|
JSON API 数据格式
Python采集京东商品评论API接口示例,json数据返回
下面是一个使用Python采集京东商品评论的完整示例,包括API请求、JSON数据解析
|
29天前
|
设计模式 JSON Unix
微店商品详情API接口,json数据返回
微店商品详情API接口的典型JSON返回数据结构说明,基于公开的微店开放平台API文档和常见电商API设计模式整理。实际使用时请以微店官方最新文档为准
|
22天前
|
JSON 安全 API
Python处理JSON数据的最佳实践:从基础到进阶的实用指南
JSON作为数据交换通用格式,广泛应用于Web开发与API交互。本文详解Python处理JSON的10个关键实践,涵盖序列化、复杂结构处理、性能优化与安全编程,助开发者高效应对各类JSON数据挑战。
103 1
|
22天前
|
JSON 数据挖掘 API
淘宝详情API接口与高级详情API接口用json返回数据区别
淘宝“商品详情API”与“高级商品API”主要区别在于数据深度、字段丰富度及适用场景。前者适用于轻量级导购展示,后者支持详情页展示与深度分析,需根据业务需求选择使用。
|
1月前
|
JSON API 数据格式
淘宝关键词搜索API接口,json数据返回
淘宝关键词搜索API接口允许开发者通过关键词检索商品,并返回符合条件的商品信息,这些信息通常以JSON格式呈现。以下是一个淘宝关键词搜索API接口返回的JSON数据示例及关键字段说明
|
1月前
|
JSON 安全 数据安全/隐私保护
查询 Restful/JSON 数据--SPL 轻量级多源混算实践 3
本教程介绍如何通过 Restful 接口获取 JSON 格式的订单数据,并进行筛选、汇总与客户等级划分等计算操作,同时讲解访问安全控制中的 Session/Cookie 与 Token 认证方式,适用于数据处理与分析场景。