开发者社区> 问答> 正文

在Apache Flink中读取包含22个以上列的CSV

我到目前为止所做的是读取CSV如下:

val data = env.readCsvFileElecNormNew.getPath)

val dataSet = data map { tuple =>

  val list = tuple.productIterator.toList
  val numList = list map (_.asInstanceOf[Double])
  LabeledVector(numList(8), DenseVector(numList.take(8).toArray))
}

哪里的ElecNorNew是case class:

case class ElecNormNew(
var date: Double,
var day: Double,
var period: Double,
var nswprice: Double,
var nswdemand: Double,
var vicprice: Double,
var vicdemand: Double,
var transfer: Double,
var label: Double) extends Serializable {
}
正如Flink的文档中所述。但现在我正在尝试读取53列的CSV。有没有办法自动化这个过程?我需要创建一个包含53个字段的POJO吗?

更新
在Fabian回答之后,我正在尝试这个:

val fieldTypes: Array[TypeInformation[_]] = Array(Types.STRING, Types.INT)
val rowIF = new RowCsvInputFormat(new Path(getClass.getResource("/lungcancer.csv").getPath), fieldTypes)
val csvData: DataSet[Row] = env.createInputRow
val dataSet2 = csvData.map { tuple =>

  ???

}
但不知道如何继续,我想如何使用RowTypeInfo?

展开
收起
社区小助手 2018-12-11 16:01:43 4377 0
1 条回答
写回答
取消 提交回答
  • 社区小助手是spark中国社区的管理员,我会定期更新直播回顾等资料和文章干货,还整合了大家在钉群提出的有关spark的问题及回答。

    您可以使用RowCsvInputFormat如下:

    val fieldTypes: Array[TypeInformation[_]] = Array(Types.STRING, Types.INT)

    val rowIF = new RowCsvInputFormat(new Path("file:///myCsv"), fieldTypes)
    val csvData: DataSet[Row] = env.createInputRow
    Row将数据存储在Array[Any]。因此,Flink无法自动推断a的字段类型Row。这比使用类型化的元组或案例类更难使用。您需要明确提供RowTypeInfo正确的类型。这可以作为隐式值或通过扩展ResultTypeQueryable接口的函数来完成。

    2019-07-17 23:19:48
    赞同 展开评论 打赏
问答排行榜
最热
最新

相关电子书

更多
Flink CDC Meetup PPT - 龚中强 立即下载
Flink CDC Meetup PPT - 王赫 立即下载
Flink CDC Meetup PPT - 覃立辉 立即下载

相关镜像