Using Flink 1.7.1; the test code is:
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, _}
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.descriptors.{Elasticsearch, Json, Rowtime, Schema}
import org.apache.flink.table.api.Types
import org.apache.flink.types.Row

case class Person(id: Int, name: String, value: String, timestamp: Long)

object EsTableSinkExample extends App {
  val esSink: Elasticsearch = new Elasticsearch()
    .version("6")
    .host("192.168.1.160", 9200, "http")
    .index("test-myuser")
    .documentType("_doc") // for ES 6 the type must be set to "_doc"

  val json = new Json().jsonSchema(
    """
      |{
      |  "type": "object",
      |  "properties": {
      |    "id":        { "type": "number" },
      |    "name":      { "type": "string" },
      |    "value":     { "type": "string" },
      |    "timestamp": { "type": "number" }
      |  }
      |}
    """.stripMargin
  )

  val schema: Schema = new Schema()
    .field("id", Types.INT)
    .field("name", Types.STRING)
    .field("value", Types.STRING)
    .field("timestamp", Types.LONG)

  val env = StreamExecutionEnvironment.getExecutionEnvironment
  val tableEnv = TableEnvironment.getTableEnvironment(env)

  tableEnv
    .connect(esSink)
    .withFormat(json)
    .withSchema(schema)
    .inUpsertMode()
    .registerTableSink("my-user-tableSink")

  val source = env
    .fromElements("1,hb,100,1547121997000", "2,fsl,99,1547121999000")
    .map(_.split(","))
    .map(x => Person(x(0).toInt, x(1), x(2), x(3).toLong))
  source.print()

  tableEnv.registerDataStream("source", source)
  val t1 = tableEnv.sqlQuery("select * from source")
  t1.printSchema()
  //val t2 = tableEnv.toRetractStream[Row](t1)
  //t2.print()
  t1.insertInto("my-user-tableSink")
  env.execute()
}
Running this in IDEA succeeds with no errors.
But when I query the result in ES, I only see
"hits": {
"total": 2,
"max_score": 1,
"hits": [
{
"_index": "test-myuser",
"_type": "_doc",
"_id": "dzDWOmgBDxFAsMLp_YNp",
"_score": 1
},
{
"_index": "test-myuser",
"_type": "_doc",
"_id": "eTDXOmgBDxFAsMLpKoNJ",
"_score": 1
}
]
}
Only auto-generated random ids; no actual records were written.
Question:
How do I write the full rows into ES and specify a primary key, e.g. for the table in the code above, val t1 = tableEnv.sqlQuery("select * from source")?
Answer from 军长:
Hi,
1. Only random ids, no actual records written?
The screenshot doesn't include the query statement, so it's not clear how you searched, and the _source field doesn't appear in the result. See https://www.elastic.co/guide/en/elasticsearch/reference/current/search-request-body.html and fetch _source as well to check whether the documents actually contain your fields.
2. How to specify a primary key?
The community docs describe key extraction: "Key extraction: Flink automatically extracts valid keys from a query. For example, a query SELECT a, b, c FROM t GROUP BY a, b defines a composite key of the fields a and b. The Elasticsearch connector generates a document ID string for every row by concatenating all key fields in the order defined in the query using a key delimiter. A custom representation of null literals for key fields can be defined." (https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connect.html#elasticsearch-connector)
In other words, Flink analyzes the query and treats the GROUP BY fields as the key.
For example, if you want id as the primary key, the SQL needs to be written as select id, last_value(name), last_value(value) from source group by id, which marks id as the key. last_value is a user-defined aggregate function (UDAF) you need to implement yourself; it returns the latest value seen for each key. For how to write a UDAF, see https://ci.apache.org/projects/flink/flink-docs-master/dev/table/udfs.html#aggregation-functions, or look at the MAX implementation in the Flink code: MAX returns the maximum value, LAST_VALUE returns the last value, and the idea is similar.
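A last_value UDAF as described above might be sketched like this for Flink 1.7's Table API. This is an illustrative sketch, not code from the answer: the class names LastValue/LastValueAcc are my own, and a real version would also need variants (or a generic form) for non-String columns such as the Long timestamp field.

```scala
import org.apache.flink.table.functions.AggregateFunction

// Accumulator that just remembers the most recent value seen.
class LastValueAcc {
  var last: String = _
}

// Minimal "last value" UDAF: for each group, keep the value from the
// most recently accumulated row, analogous to how MAX keeps the maximum.
class LastValue extends AggregateFunction[String, LastValueAcc] {
  override def createAccumulator(): LastValueAcc = new LastValueAcc

  override def getValue(acc: LastValueAcc): String = acc.last

  // accumulate is resolved by reflection by the Table API runtime.
  def accumulate(acc: LastValueAcc, value: String): Unit = {
    acc.last = value
  }
}
```

After registering it with tableEnv.registerFunction("last_value", new LastValue), a query like select id, last_value(name) as name from source group by id should produce an upsert stream keyed by id, so the Elasticsearch connector uses id as the document id instead of generating random ones.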