flink cdc 连接elasticsearch 怎么连接 ?
我用这种 但是显示 connection refused
Flink CDC 连接 Elasticsearch 需要使用 Flink 的 Elasticsearch Sink。具体步骤如下:
在项目的 pom.xml 文件中添加以下依赖:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-elasticsearch_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
其中,${flink.version}
需要替换为实际使用的 Flink 版本号。
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink;
import org.apache.flink.streaming.connectors.elasticsearch6.RequestIndexer;
import org.apache.flink.streaming.connectors.elasticsearch6.config.ElasticsearchSinkConfig;
import org.apache.http.HttpHost;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;
import java.util.HashMap;
import java.util.Map;
public class FlinkCDCElasticsearch {
private static final String ES_HOST = "localhost"; // Elasticsearch 主机地址
private static final int ES_PORT = 9200; // Elasticsearch 端口号
private static final String ES_INDEX = "test"; // Elasticsearch Index名称
private static final String ES_TYPE = "doc"; // Elasticsearch Type名称
private static final String ES_SNAPSHOT_RETENTION_SETTING = "2d"; // Elasticsearch快照保留策略设置,可以根据需求进行修改。这里设置为2天。
private static final RequestIndexer indexer = new RequestIndexer(); // RequestIndexer对象,用于构建索引请求和文档请求等操作。需要根据实际情况创建该对象并设置相关参数和状态信息。
private static final SimpleStringSchema schema = new SimpleStringSchema(); // SimpleStringSchema对象,用于将数据序列化为字符串形式。需要根据实际情况创建该对象并设置相关参数和状态信息。
}
```java
// 创建Elasticsearch Sink配置对象,并设置相关参数和状态信息。需要根据实际情况创建该对象并设置相关参数和状态信息。
final Map config = new HashMap<>();
config.put(ElasticsearchSinkConfig.ES_HOSTS, ES_HOST + ":" + ES_PORT); // 设置Elasticsearch主机地址和端口号。如果需要指定多个Elasticsearch节点,可以使用逗号分隔的形式。例如:"localhost:9200,localhost:9300"。
config.put(ElasticsearchSinkConfig.ES_INDEX, ES_INDEX); // 设置Elasticsearch Index名称。如果需要指定多个Index,可以使用逗号分隔的形式。例如:"index1,index2"。
config.put(ElasticsearchSinkConfig.ES_TYPE, ES_TYPE); // 设置Elasticsearch Type名称。如果需要指定多个Type,可以使用逗号分隔的形式。例如:"type1,type2"。
config.put(ElasticsearchSinkConfig.ES_SNAPSHOT_RETENTION_SETTING, ES_SNAPSHOT_RETENTION_SETTING); // 设置Elasticsearch快照保留策略设置。可以根据实际情况进行修改。默认值为"none",表示不启用快照保留策略;也可以设置为"number_of_days",表示保留多少天的快照数据;还可以设置为其他自定义的值,例如:"2d"、"7d"等。
本文为您介绍如何使用Elasticsearch连接器。https://help.aliyun.com/zh/flink/developer-reference/elasticsearch-connector?spm=a2c4g.11186623.0.i68
Elasticsearch连接器支持的信息如下:
前提条件
已创建Elasticsearch索引,详情请参见创建示例。
已配置Elasticsearch公网或私网访问白名单,详情请参见配置实例公网或私网访问白名单。
使用限制
源表和维表仅支持Elasticsearch 5.5及以上版本。
结果表仅支持Elasticsearch 6.x和7.x版本。
仅Flink计算引擎VVR 2.0.0及以上版本支持Elasticsearch连接器。
仅支持全量Elasticsearch源表,不支持增量Elasticsearch源表。
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。
实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。