

Working Through a Flink Pitfall Caused by File Permissions

Summary: I have been following @Jeff Zhang (简锋)'s tutorial on real-time streaming reads and writes with Flink on Zeppelin, learning how to use Flink's streaming features inside Zeppelin. While setting up the environment and importing test data, however, I ran into a problem. What follows is how I climbed out of that hole.
I have been following @Jeff Zhang (简锋)'s tutorial on real-time streaming reads and writes with Flink on Zeppelin, learning how to use Flink's streaming features inside Zeppelin. While setting up the environment and importing test data, I hit a problem. It only shows up in a particular environment (CentOS), so it is mentioned neither in the tutorial nor in the GitHub repo of the kafka datagen connector's author. Below I record the investigation and the fix step by step, in the hope that it helps others working in the same environment.

First, set up Kafka. I followed https://mp.weixin.qq.com/s/k_0NgJinpK0VVTXw_Jd7ag to bring up Kafka with Docker.

With Kafka running, the next step is to generate test data for it.
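For reference, with the docker-compose.yml from that article saved in the working directory, bringing the stack up looks roughly like this (a sketch; the exact service list depends on that compose file):

docker-compose up -d   # start Kafka, ZooKeeper, Connect, etc. in the background
docker-compose ps      # confirm every service shows State "Up"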

Run:

curl -X POST http://localhost:8083/connectors -H 'Content-Type:application/json' -H 'Accept:application/json' -d @connect.source.datagen.json
This failed with:

{"error_code":500,"message":"Failed to find any class that implements Connector and which name matches com.github.xushiyan.kafka.connect.datagen.performance.DatagenConnector, available connectors are: PluginDesc{klass=class io.confluent.connect.activemq.ActiveMQSourceConnector, name='io.confluent.connect.activemq.ActiveMQSourceConnector', version='4.1.1', encodedVersion=4.1.1, type=source, typeName='source', location='file:/usr/share/java/kafka-connect-activemq/'}, PluginDesc{klass=class io.confluent.connect.elasticsearch.ElasticsearchSinkConnector, name='io.confluent.connect.elasticsearch.ElasticsearchSinkConnector', version='4.1.1', encodedVersion=4.1.1, type=sink, typeName='sink', location='file:/usr/share/java/kafka-connect-elasticsearch/'}, PluginDesc{klass=class io.confluent.connect.hdfs.HdfsSinkConnector, name='io.confluent.connect.hdfs.HdfsSinkConnector', version='4.1.1', encodedVersion=4.1.1, type=sink, typeName='sink', location='file:/usr/share/java/kafka-connect-hdfs/'}, PluginDesc{klass=class io.confluent.connect.hdfs.tools.SchemaSourceConnector, name='io.confluent.connect.hdfs.tools.SchemaSourceConnector', version='1.1.1-cp1', encodedVersion=1.1.1-cp1, type=source, typeName='source', location='file:/usr/share/java/kafka-connect-hdfs/'}, PluginDesc{klass=class io.confluent.connect.ibm.mq.IbmMQSourceConnector, name='io.confluent.connect.ibm.mq.IbmMQSourceConnector', version='4.1.1', encodedVersion=4.1.1, type=source, typeName='source', location='file:/usr/share/java/kafka-connect-ibmmq/'}, PluginDesc{klass=class io.confluent.connect.jdbc.JdbcSinkConnector, name='io.confluent.connect.jdbc.JdbcSinkConnector', version='4.1.1', encodedVersion=4.1.1, type=sink, typeName='sink', location='file:/usr/share/java/kafka-connect-jdbc/'}, PluginDesc{klass=class io.confluent.connect.jdbc.JdbcSourceConnector, name='io.confluent.connect.jdbc.JdbcSourceConnector', version='4.1.1', encodedVersion=4.1.1, type=source, typeName='source', location='file:/usr/share/java/kafka-connect-jdbc/'}, PluginDesc{klass=class io.confluent.connect.jms.JmsSourceConnector, name='io.confluent.connect.jms.JmsSourceConnector', version='4.1.1', encodedVersion=4.1.1, type=source, typeName='source', location='file:/usr/share/java/kafka-connect-activemq/'}, PluginDesc{klass=class io.confluent.connect.s3.S3SinkConnector, name='io.confluent.connect.s3.S3SinkConnector', version='4.1.1', encodedVersion=4.1.1, type=sink, typeName='sink', location='file:/usr/share/java/kafka-connect-s3/'}, PluginDesc{klass=class io.confluent.connect.storage.tools.SchemaSourceConnector, name='io.confluent.connect.storage.tools.SchemaSourceConnector', version='1.1.1-cp1', encodedVersion=1.1.1-cp1, type=source, typeName='source', location='file:/usr/share/java/kafka-connect-hdfs/'}, PluginDesc{klass=class org.apache.kafka.connect.file.FileStreamSinkConnector, name='org.apache.kafka.connect.file.FileStreamSinkConnector', version='1.1.1-cp1', encodedVersion=1.1.1-cp1, type=sink, typeName='sink', location='file:/usr/share/java/kafka/'}, PluginDesc{klass=class org.apache.kafka.connect.file.FileStreamSourceConnector, name='org.apache.kafka.connect.file.FileStreamSourceConnector', version='1.1.1-cp1', encodedVersion=1.1.1-cp1, type=source, typeName='source', location='file:/usr/share/java/kafka/'}, PluginDesc{klass=class org.apache.kafka.connect.tools.MockConnector, name='org.apache.kafka.connect.tools.MockConnector', version='1.1.1-cp1', encodedVersion=1.1.1-cp1, type=connector, typeName='connector', 
location='file:/usr/share/java/confluent-control-center/'}, PluginDesc{klass=class org.apache.kafka.connect.tools.MockSinkConnector, name='org.apache.kafka.connect.tools.MockSinkConnector', version='1.1.1-cp1', encodedVersion=1.1.1-cp1, type=sink, typeName='sink', location='file:/usr/share/java/confluent-control-center/'}, PluginDesc{klass=class org.apache.kafka.connect.tools.MockSourceConnector, name='org.apache.kafka.connect.tools.MockSourceConnector', version='1.1.1-cp1', encodedVersion=1.1.1-cp1, type=source, typeName='source', location='file:/usr/share/java/confluent-control-center/'}, PluginDesc{klass=class org.apache.kafka.connect.tools.SchemaSourceConnector, name='org.apache.kafka.connect.tools.SchemaSourceConnector', version='1.1.1-cp1', encodedVersion=1.1.1-cp1, type=source, typeName='source', location='file:/usr/share/java/confluent-control-center/'}, PluginDesc{klass=class org.apache.kafka.connect.tools.VerifiableSinkConnector, name='org.apache.kafka.connect.tools.VerifiableSinkConnector', version='1.1.1-cp1', encodedVersion=1.1.1-cp1, type=source, typeName='source', location='file:/usr/share/java/confluent-control-center/'}, PluginDesc{klass=class org.apache.kafka.connect.tools.VerifiableSourceConnector, name='org.apache.kafka.connect.tools.VerifiableSourceConnector', version='1.1.1-cp1', encodedVersion=1.1.1-cp1, type=source, typeName='source', location='file:/usr/share/java/confluent-control-center/'}"}
According to the error message, Kafka Connect cannot find the requested class. Next, a look at the JSON file being POSTed.

{
  "name": "connect.source.datagen",
  "config": {
    "connector.class": "com.github.xushiyan.kafka.connect.datagen.performance.DatagenConnector",
    "tasks.max": "1",
    "topic.name": "generated.events",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "key.converter.schemas.enable": false,
    "value.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter.schemas.enable": false,
    "poll.size": 10,
    "poll.interval.ms": 5000,
    "message.template": "{\"status\":\"foo\",\"direction\":\"up\"}",
    "random.fields": "status:foo|bar|baz, direction:up|down|left|right",
    "event.timestamp.field": "event_ts"
  }
}
A quick analysis: when curl POSTs this to the Connect REST port to create the data generator, the worker tries to instantiate "com.github.xushiyan.kafka.connect.datagen.performance.DatagenConnector" to produce the data.
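Judging from this config, each generated record should look something like the line below: the connector fills in message.template, overrides the fields listed in random.fields with random picks, and adds a timestamp under event.timestamp.field (this is my reading of the connector's behavior, and the concrete values are invented for illustration):

{"status":"bar","direction":"left","event_ts":1594372851000}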

According to the returned error, that class simply is not found.
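A quick way to see which connector classes the worker actually loaded is the plugin listing of the standard Kafka Connect REST API:

curl http://localhost:8083/connector-plugins   # lists every Connector implementation the worker can see

If DatagenConnector is missing from that list, the jar was never picked up, which points straight at the plugin directory.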

Does the class need to be copied into the Docker container by hand? With that question in mind, I checked the docker-compose.yml file.

And found this:

volumes:
  - ./plugins:/tmp/connect-plugins

This maps the local plugins directory into the container at /tmp/connect-plugins.

In other words, the local jar file does get into the Kafka container this way. So why can't the class be found?
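For the mapped jars to be scanned, the container path must also be on the worker's plugin.path. In the Confluent Connect images this is normally set through an environment variable; a minimal sketch of the relevant compose fragment follows (the service name and image tag are my assumptions, not copied from the tutorial's file):

connect:
  image: confluentinc/cp-kafka-connect:4.1.1
  environment:
    CONNECT_PLUGIN_PATH: /tmp/connect-plugins   # maps to the worker's plugin.path setting
  volumes:
    - ./plugins:/tmp/connect-plugins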

Time to get a shell inside the container and see what is wrong with that directory.

[root@localhost quickstart]# docker exec -it 6f723d718e3b bash
root@connect:/# ls /tmp
connect-plugins hsperfdata_root
The directory exists, so what is the problem?

root@connect:/# ls /tmp/connect-plugins
ls: cannot open directory /tmp/connect-plugins: Permission denied
So that was it: the files in the directory cannot be read because of a permission problem. No wonder the class was not found.

So how to fix it?

My first thought was to change the file permissions on the host with chmod, but that did not work; the directory was still inaccessible from the container.

Next I suspected the directory's location: it lived under /root on the host. Could that be the cause? But moving it out of /root did not help either.

Then it finally clicked: could SELinux be the troublemaker?

A few words about SELinux.

Security-Enhanced Linux, SELinux for short, is a Linux kernel module and one of Linux's security subsystems.

SELinux was developed mainly by the United States National Security Agency, and it has been integrated into the mainline kernel since version 2.6.

Its architecture and configuration are complex and concept-heavy, and it takes real effort to master; many Linux administrators find it a nuisance and simply turn it off.

As it happened, SELinux had not been turned off on this server.
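That is easy to confirm: getenforce prints the current SELinux mode, and the -Z flag of ls shows the security label on the mounted directory (both standard on CentOS):

getenforce          # prints Enforcing, Permissive, or Disabled
ls -ldZ ./plugins   # shows the directory's SELinux security context

A home-directory label such as admin_home_t (typical for files under /root), rather than a container-accessible type, is the telltale sign that SELinux, not classic file permissions, is blocking the container.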

Without further ado:

setenforce 0
Going back into the container to check: no more problem, the jar is now visible.

[root@localhost quickstart]# docker exec -it 6f723d718e3b bash
root@connect:/# ls /tmp/
connect-plugins/ hsperfdata_root/
root@connect:/# ls /tmp/connect-plugins/
kafka-connect-datagen-0.1.1.jar
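A side note: setenforce 0 switches SELinux to permissive mode only until the next reboot, and it drops SELinux protection for the whole machine. To keep SELinux enforcing instead, Docker can relabel the bind mount: appending :z to the volume makes the directory readable from containers (my suggestion, not part of the original tutorial):

volumes:
  - ./plugins:/tmp/connect-plugins:z   # :z asks Docker to apply a shared container-readable label

Alternatively, chcon -Rt container_file_t ./plugins (svirt_sandbox_file_t on older CentOS/Docker combinations) applies the label by hand.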
Re-run the data-injection command.

curl -X POST http://localhost:8083/connectors -H 'Content-Type:application/json' -H 'Accept:application/json' -d @connect.source.datagen.json
{"error_code":500,"message":"Failed to find any class that implements Connector and which name matches com.github.xushiyan.kafka.connect.datagen.performance.DatagenConnector, available connectors are: PluginDesc{klass=class io.confluent.connect.activemq.ActiveMQSourceConnector, name='io.confluent.connect.activemq.ActiveMQSourceConnector', version='4.1.1', encodedVersion=4.1.1, type=source, typeName='source', location='file:/usr/share/java/kafka-connect-activemq/'}, PluginDesc{klass=class io.confluent.connect.elasticsearch.ElasticsearchSinkConnector, name='io.confluent.connect.elasticsearch.ElasticsearchSinkConnector', version='4.1.1', encodedVersion=4.1.1, type=sink, typeName='sink', location='file:/usr/share/java/kafka-connect-elasticsearch/'}, PluginDesc{klass=class io.confluent.connect.hdfs.HdfsSinkConnector, name='io.confluent.connect.hdfs.HdfsSinkConnector', version='4.1.1', encodedVersion=4.1.1, type=sink, typeName='sink', location='file:/usr/share/java/kafka-connect-hdfs/'}, PluginDesc{klass=class io.confluent.connect.hdfs.tools.SchemaSourceConnector, name='io.confluent.connect.hdfs.tools.SchemaSourceConnector', version='1.1.1-cp1', encodedVersion=1.1.1-cp1, type=source, typeName='source', location='file:/usr/share/java/kafka-connect-hdfs/'}, PluginDesc{klass=class io.confluent.connect.ibm.mq.IbmMQSourceConnector, name='io.confluent.connect.ibm.mq.IbmMQSourceConnector', version='4.1.1', encodedVersion=4.1.1, type=source, typeName='source', location='file:/usr/share/java/kafka-connect-ibmmq/'}, PluginDesc{klass=class io.confluent.connect.jdbc.JdbcSinkConnector, name='io.confluent.connect.jdbc.JdbcSinkConnector', version='4.1.1', encodedVersion=4.1.1, type=sink, typeName='sink', location='file:/usr/share/java/kafka-connect-jdbc/'}, PluginDesc{klass=class io.confluent.connect.jdbc.JdbcSourceConnector, name='io.confluent.connect.jdbc.JdbcSourceConnector', version='4.1.1', encodedVersion=4.1.1, type=source, typeName='source', location='file:/usr/share/java/kafka-connect-jdbc/'}, PluginDesc{klass=class io.confluent.connect.jms.JmsSourceConnector, name='io.confluent.connect.jms.JmsSourceConnector', version='4.1.1', encodedVersion=4.1.1, type=source, typeName='source', location='file:/usr/share/java/kafka-connect-activemq/'}, PluginDesc{klass=class io.confluent.connect.s3.S3SinkConnector, name='io.confluent.connect.s3.S3SinkConnector', version='4.1.1', encodedVersion=4.1.1, type=sink, typeName='sink', location='file:/usr/share/java/kafka-connect-s3/'}, PluginDesc{klass=class io.confluent.connect.storage.tools.SchemaSourceConnector, name='io.confluent.connect.storage.tools.SchemaSourceConnector', version='1.1.1-cp1', encodedVersion=1.1.1-cp1, type=source, typeName='source', location='file:/usr/share/java/kafka-connect-hdfs/'}, PluginDesc{klass=class org.apache.kafka.connect.file.FileStreamSinkConnector, name='org.apache.kafka.connect.file.FileStreamSinkConnector', version='1.1.1-cp1', encodedVersion=1.1.1-cp1, type=sink, typeName='sink', location='file:/usr/share/java/kafka/'}, PluginDesc{klass=class org.apache.kafka.connect.file.FileStreamSourceConnector, name='org.apache.kafka.connect.file.FileStreamSourceConnector', version='1.1.1-cp1', encodedVersion=1.1.1-cp1, type=source, typeName='source', location='file:/usr/share/java/kafka/'}, PluginDesc{klass=class org.apache.kafka.connect.tools.MockConnector, name='org.apache.kafka.connect.tools.MockConnector', version='1.1.1-cp1', encodedVersion=1.1.1-cp1, type=connector, typeName='connector', 
location='file:/usr/share/java/confluent-control-center/'}, PluginDesc{klass=class org.apache.kafka.connect.tools.MockSinkConnector, name='org.apache.kafka.connect.tools.MockSinkConnector', version='1.1.1-cp1', encodedVersion=1.1.1-cp1, type=sink, typeName='sink', location='file:/usr/share/java/confluent-control-center/'}, PluginDesc{klass=class org.apache.kafka.connect.tools.MockSourceConnector, name='org.apache.kafka.connect.tools.MockSourceConnector', version='1.1.1-cp1', encodedVersion=1.1.1-cp1, type=source, typeName='source', location='file:/usr/share/java/confluent-control-center/'}, PluginDesc{klass=class org.apache.kafka.connect.tools.SchemaSourceConnector, name='org.apache.kafka.connect.tools.SchemaSourceConnector', version='1.1.1-cp1', encodedVersion=1.1.1-cp1, type=source, typeName='source', location='file:/usr/share/java/confluent-control-center/'}, PluginDesc{klass=class org.apache.kafka.connect.tools.VerifiableSinkConnector, name='org.apache.kafka.connect.tools.VerifiableSinkConnector', version='1.1.1-cp1', encodedVersion=1.1.1-cp1, type=source, typeName='source', location='file:/usr/share/java/confluent-control-center/'}, PluginDesc{klass=class org.apache.kafka.connect.tools.VerifiableSourceConnector, name='org.apache.kafka.connect.tools.VerifiableSourceConnector', version='1.1.1-cp1', encodedVersion=1.1.1-cp1, type=source, typeName='source', location='file:/usr/share/java/confluent-control-center/'}"}
Still failing. Why?

Because after the permissions are restored, the Connect service has to be restarted: the worker scans its plugin path only at startup, so jars that become readable afterwards are not picked up.

docker-compose down

docker-compose up -d
Then try again. This time everything works.

curl -X POST http://localhost:8083/connectors -H 'Content-Type:application/json' -H 'Accept:application/json' -d @connect.source.datagen.json
{"name":"connect.source.datagen","config":{"connector.class":"com.github.xushiyan.kafka.connect.datagen.performance.DatagenConnector","tasks.max":"1","topic.name":"generated.events","key.converter":"org.apache.kafka.connect.storage.StringConverter","key.converter.schemas.enable":"false","value.converter":"org.apache.kafka.connect.storage.StringConverter","value.converter.schemas.enable":"false","poll.size":"10","poll.interval.ms":"5000","message.template":"{"status":"foo","direction":"up"}","random.fields":"status:foo|bar|baz, direction:up|down|left|right","event.timestamp.field":"event_ts","name":"connect.source.datagen"},"tasks":[],"type":null}
