凹凹凸曼_个人页

个人头像照片 凹凹凸曼
个人头像照片
21
6
0

个人介绍

暂无个人介绍

擅长的技术

获得更多能力
通用技术能力:

暂时未有相关通用技术能力~

云产品技术能力:

暂时未有相关云产品技术能力~

阿里云技能认证

详细说明
暂无更多信息

2020年06月

  • 发表了文章 2018-10-10

    Ubuntu 14.04 更换阿里云源

  • 发表了文章 2018-10-10

    初次使用阿里云服务器该如何搭建

  • 发表了文章 2018-10-10

    Laravel 使用阿里云 oss 存储对象

  • 发表了文章 2018-10-10

    利用阿里云搭建私有Git服务器

  • 发表了文章 2018-10-10

    阿里云裸机部署rails运用

  • 发表了文章 2018-10-10

    阿里云RDS恢复数据到本地上

  • 发表了文章 2018-09-26

    Linux 禁止某个IP地址访问的几种方法

  • 发表了文章 2018-09-26

    解决阿里云服务器下 ip:8080 不能访问的问题

  • 发表了文章 2018-09-26

    阿里云服务器ECS屏蔽某个IP的访问

  • 发表了文章 2018-09-26

    怎么在阿里云屏蔽一些IP?

  • 发表了文章 2018-09-26

    如何做好网站安全防护,防止被黑客攻击

  • 发表了文章 2018-09-26

    阿里云全球交付中心正式成立,打造一流的全球服务能力

  • 发表了文章 2018-09-12

    阿里云如何检查TCP80端口是否正常工作?

  • 发表了文章 2018-09-12

    浅谈使用VPS建网站的几大好处

  • 发表了文章 2018-09-12

    阿里云加密服务产品优势及使用场景

  • 发表了文章 2018-09-12

    阿里云 E-MapReduce产品优势及使用场景

  • 发表了文章 2018-09-05

    数据库表迁移到阿里云的方法有哪些

  • 发表了文章 2018-09-05

    讲讲自己亲身经历网站备案的前前后后

  • 发表了文章 2018-09-05

    个人怎样选择阿里云服务器?

  • 发表了文章 2018-09-05

    SEO不只是发发链,那么SEO到底是什么呢?

正在加载, 请稍后...
滑动查看更多
  • 提交了问题 2020-06-16

    Android ORM框架ActiveAndroid使用

  • 回答了问题 2020-06-16

    在执行flink sql的时候报错,有见过的吗 Flink1.9的 #Flink

    Checkpoint介绍 checkpoint机制是Flink可靠性的基石,可以保证Flink集群在某个算子因为某些原因(如 异常退出)出现故障时,能够将整个应用流图的状态恢复到故障之前的某一状态,保 证应用流图状态的一致性。Flink的checkpoint机制原理来自“Chandy-Lamport algorithm”算法。 每个需要checkpoint的应用在启动时,Flink的JobManager为其创建一个 CheckpointCoordinator,CheckpointCoordinator全权负责本应用的快照制作。 CheckpointCoordinator周期性的向该流应用的所有source算子发送barrier。 2.当某个source算子收到一个barrier时,便暂停数据处理过程,然后将自己的当前状 态制作成快照,并保存到指定的持久化存储中,最后向CheckpointCoordinator报告 自己快照制作情况,同时向自身所有下游算子广播该barrier,恢复数据处理 3.下游算子收到barrier之后,会暂停自己的数据处理过程,然后将自身的相关状态制作成快照,并保存到指定的持久化存储中,最后向CheckpointCoordinator报告自身 快照情况,同时向自身所有下游算子广播该barrier,恢复数据处理。 每个算子按照步骤3不断制作快照并向下游广播,直到最后barrier传递到sink算子,快照制作完成。 当CheckpointCoordinator收到所有算子的报告之后,认为该周期的快照制作成功; 否则,如果在规定的时间内没有收到所有算子的报告,则认为本周期快照制作失败 如果一个算子有两个输入源,则暂时阻塞先收到barrier的输入源,等到第二个输入源相 同编号的barrier到来时,再制作自身快照并向下游广播该barrier。具体如下图所示 两个输入源 checkpoint 过程 假设算子C有A和B两个输入源 在第i个快照周期中,由于某些原因(如处理时延、网络时延等)输入源A发出的 barrier先到来,这时算子C暂时将输入源A的输入通道阻塞,仅收输入源B的数据。 当输入源B发出的barrier到来时,算子C制作自身快照并向CheckpointCoordinator报 告自身的快照制作情况,然后将两个barrier合并为一个,向下游所有的算子广播。 当由于某些原因出现故障时,CheckpointCoordinator通知流图上所有算子统一恢复到某 个周期的checkpoint状态,然后恢复数据流处理。分布式checkpoint机制保证了数据仅被 处理一次(Exactly Once)。 持久化存储 目前,Checkpoint持久化存储可以使用如下三种: MemStateBackend 该持久化存储主要将快照数据保存到JobManager的内存中,仅适合作为测试以及 快照的数据量非常小时使用,并不推荐用作大规模商业部署。 FsStateBackend 该持久化存储主要将快照数据保存到文件系统中,目前支持的文件系统主要是 HDFS和本地文件。如果使用HDFS,则初始化FsStateBackend时,需要传入以 “hdfs://”开头的路径(即: new FsStateBackend('hdfs:///hacluster/checkpoint')), 如果使用本地文件,则需要传入以“file://”开头的路径(即:new FsStateBackend('file:///Data'))。在分布式情况下,不推荐使用本地文件。如果某 个算子在节点A上失败,在节点B上恢复,使用本地文件时,在B上无法读取节点 A上的数据,导致状态恢复失败。 RocksDBStateBackend RocksDBStatBackend介于本地文件和HDFS之间,平时使用RocksDB的功能,将数 据持久化到本地文件中,当制作快照时,将本地数据制作成快照,并持久化到 FsStateBackend中(FsStateBackend不必用户特别指明,只需在初始化时传入HDFS 或本地路径即可,如new RocksDBStateBackend('hdfs:///hacluster/checkpoint')或new RocksDBStateBackend('file:///Data'))。 如果用户使用自定义窗口(window),不推荐用户使用RocksDBStateBackend。在自 定义窗口中,状态以ListState的形式保存在StatBackend中,如果一个key值中有多 个value值,则RocksDB读取该种ListState非常缓慢,影响性能。用户可以根据应用 的具体情况选择FsStateBackend+HDFS或RocksStateBackend+HDFS。 语法 ​ val env = StreamExecutionEnvironment.getExecutionEnvironment() // start a checkpoint every 1000 ms env.enableCheckpointing(1000) // advanced options: // 设置checkpoint的执行模式,最多执行一次或者至少执行一次 env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE) // 设置checkpoint的超时时间 env.getCheckpointConfig.setCheckpointTimeout(60000) // 如果在只做快照过程中出现错误,是否让整体任务失败:true是 false不是 env.getCheckpointConfig.setFailTasksOnCheckpointingErrors(false) //设置同一时间有多少 个checkpoint可以同时执行 env.getCheckpointConfig.setMaxConcurrentCheckpoints(1) ​ 例子 需求 假定用户需要每隔1秒钟需要统计4秒中窗口中数据的量,然后对统计的结果值进行checkpoint处理 数据规划 使用自定义算子每秒钟产生大约10000条数据。 
 产生的数据为一个四元组(Long,String,String,Integer)—------(id,name,info,count)。 
 数据经统计后,统计结果打印到终端输出。 
 打印输出的结果为Long类型的数据。 
 开发思路 
 source算子每隔1秒钟发送10000条数据,并注入到Window算子中。 window算子每隔1秒钟统计一次最近4秒钟内数据数量。 每隔1秒钟将统计结果打印到终端 每隔6秒钟触发一次checkpoint,然后将checkpoint的结果保存到HDFS中。 //发送数据形式 case class SEvent(id: Long, name: String, info: String, count: Int) class SEventSourceWithChk extends RichSourceFunction[SEvent]{ private var count = 0L private var isRunning = true private val alphabet = 'abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWZYX0987654321' // 任务取消时调用 override def cancel(): Unit = { isRunning = false } //// source算子的逻辑,即:每秒钟向流图中注入10000个元组 override def run(sourceContext: SourceContext[SEvent]): Unit = { while(isRunning) { for (i /** 该段代码是流图定义代码,具体实现业务流程,另外,代码中窗口的触发时间使 用了event time。 */ object FlinkEventTimeAPIChkMain { def main(args: Array[String]): Unit ={ val env = StreamExecutionEnvironment.getExecutionEnvironment env.setStateBackend(new FsStateBackend('hdfs://hadoop01:9000/flink-checkpoint/checkpoint/')) env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE) env.getCheckpointConfig.setCheckpointInterval(6000) env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) // 应用逻辑 val source: DataStream[SEvent] = env.addSource(new SEventSourceWithChk) source.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks[SEvent] { // 设置watermark override def getCurrentWatermark: Watermark = { new Watermark(System.currentTimeMillis()) } // 给每个元组打上时间戳 override def extractTimestamp(t: SEvent, l: Long): Long = { System.currentTimeMillis() } }) .keyBy(0) .window(SlidingEventTimeWindows.of(Time.seconds(4), Time.seconds(1))) .apply(new WindowStatisticWithChk) .print() env.execute() } } //该数据在算子制作快照时用于保存到目前为止算子记录的数据条数。 // 用户自定义状态 class UDFState extends Serializable{ private var count = 0L // 设置用户自定义状态 def setState(s: Long) = count = s // 获取用户自定状态 def getState = count } //该段代码是window算子的代码,每当触发计算时统计窗口中元组数量。 class WindowStatisticWithChk extends WindowFunction[SEvent, Long, Tuple, TimeWindow] with ListCheckpointed[UDFState]{ private var total = 0L // window算子的实现逻辑,即:统计window中元组的数量 override def apply(key: Tuple, window: TimeWindow, input: Iterable[SEvent], out: Collector[Long]): Unit = { var count = 0L for (event // 制作自定义状态快照 override def snapshotState(checkpointId: Long, timestamp: Long): util.List[UDFState] = { val udfList: util.ArrayList[UDFState] = new util.ArrayList[UDFState] val udfState = new UDFState udfState.setState(total) udfList.add(udfState) udfList } } flink-SQL Table API和SQL捆绑在flink-table Maven工件中。必须将以下依赖项添加到你的项目才能使用Table API和SQL: org.apache.flink flink-table_2.11 1.5.0 另外,你需要为Flink的Scala批处理或流式API添加依赖项。对于批量查询,您需要添加: org.apache.flink flink-scala_2.11 1.5.0 Table API和SQL程序的结构 Flink的批处理和流处理的Table API和SQL程序遵循相同的模式; 所以我们只需要使用一种来演示即可 要想执行flink的SQL语句,首先需要获取SQL的执行环境: 两种方式(batch和streaming): // *************** // STREAMING QUERY // *************** val sEnv = StreamExecutionEnvironment.getExecutionEnvironment // create a TableEnvironment for streaming queries val sTableEnv = TableEnvironment.getTableEnvironment(sEnv) // *********** // BATCH QUERY // *********** val bEnv = ExecutionEnvironment.getExecutionEnvironment // create a TableEnvironment for batch queries val bTableEnv = TableEnvironment.getTableEnvironment(bEnv) 通过getTableEnvironment可以获取TableEnviromment;这个TableEnviromment是Table API和SQL集成的核心概念。它负责: 在内部目录中注册一个表 注册外部目录 执行SQL查询 注册用户定义的(标量,表格或聚合)函数 转换DataStream或DataSet成Table 持有一个ExecutionEnvironment或一个参考StreamExecutionEnvironment 在内部目录中注册一个表 TableEnvironment维护一个按名称注册的表的目录。有两种类型的表格,输入表格和输出表格。 输入表可以在Table API和SQL查询中引用并提供输入数据。输出表可用于将表API或SQL查询的结果发送到外部系统 输入表可以从各种来源注册: 现有Table对象,通常是表API或SQL查询的结果。 TableSource,它访问外部数据,例如文件,数据库或消息传递系统。 DataStream或DataSet来自DataStream或DataSet程序。 输出表可以使用注册TableSink。 注册一个表 // get a TableEnvironment val tableEnv = TableEnvironment.getTableEnvironment(env) // register the Table projTable as table 'projectedX' tableEnv.registerTable('projectedTable', projTable) // Table is the result of a simple projection query val projTable: Table = tableEnv.scan('projectedTable ').select(...) 注册一个tableSource TableSource提供对存储在诸如数据库(MySQL,HBase等),具有特定编码(CSV,Apache [Parquet,Avro,ORC],...)的文件的存储系统中的外部数据的访问或者消息传送系统(Apache Kafka,RabbitMQ,...) // get a TableEnvironment val tableEnv = TableEnvironment.getTableEnvironment(env) // create a TableSource val csvSource: TableSource = new CsvTableSource('/path/to/file', ...) // register the TableSource as table 'CsvTable' tableEnv.registerTableSource('CsvTable', csvSource) 注册一个tableSink 注册TableSink可用于将表API或SQL查询的结果发送到外部存储系统,如数据库,键值存储,消息队列或文件系统(使用不同的编码,例如CSV,Apache [Parquet ,Avro,ORC],...) // get a TableEnvironment val tableEnv = TableEnvironment.getTableEnvironment(env) // create a TableSink val csvSink: TableSink = new CsvTableSink('/path/to/file', ...) // define the field names and types val fieldNames: Array[String] = Array('a', 'b', 'c') val fieldTypes: Array[TypeInformation[_]] = Array(Types.INT, Types.STRING, Types.LONG) // register the TableSink as table 'CsvSinkTable' tableEnv.registerTableSink('CsvSinkTable', fieldNames, fieldTypes, csvSink) 例子 //创建batch执行环境 val env = ExecutionEnvironment.getExecutionEnvironment //创建table环境用于batch查询 val tableEnvironment = TableEnvironment.getTableEnvironment(env) //加载外部数据 val csvTableSource = CsvTableSource.builder() .path('data1.csv')//文件路径 .field('id' , Types.INT)//第一列数据 .field('name' , Types.STRING)//第二列数据 .field('age' , Types.INT)//第三列数据 .fieldDelimiter(',')//列分隔符,默认是',' .lineDelimiter('\n')//换行符 .ignoreFirstLine()//忽略第一行 .ignoreParseErrors()//忽略解析错误 .build() //将外部数据构建成表 tableEnvironment.registerTableSource('tableA' , csvTableSource) //TODO 1:使用table方式查询数据 val table = tableEnvironment.scan('tableA').select('id , name , age').filter('name == 'lisi'') //将数据写出去 table.writeToSink(new CsvTableSink('bbb' , ',' , 1 , FileSystem.WriteMode.OVERWRITE)) //TODO 2:使用sql方式 // val sqlResult = tableEnvironment.sqlQuery('select id,name,age from tableA where id > 0 order by id limit 2') //// //将数据写出去 // sqlResult.writeToSink(new CsvTableSink('aaaaaa.csv', ',', 1, FileSystem.WriteMode.OVERWRITE)) able和DataStream和DataSet的集成 1:将DataStream或DataSet转换为表格 在上面的例子讲解中,直接使用的是:registerTableSource注册表 对于flink来说,还有更灵活的方式:比如直接注册DataStream或者DataSet转换为一张表。 然后DataStream或者DataSet就相当于表,这样可以继续使用SQL来操作流或者批次的数据 语法: // get TableEnvironment // registration of a DataSet is equivalent Env:DataStream val tableEnv = TableEnvironment.getTableEnvironment(env) val stream: DataStream[(Long, String)] = ... // register the DataStream as Table 'myTable' with fields 'f0', 'f1' tableEnv.registerDataStream('myTable', stream) 例子 object SQLToDataSetAndStreamSet { def main(args: Array[String]): Unit = { // set up execution environment val env = StreamExecutionEnvironment.getExecutionEnvironment val tEnv = TableEnvironment.getTableEnvironment(env) //构造数据 val orderA: DataStream[Order] = env.fromCollection(Seq( Order(1L, 'beer', 3), Order(1L, 'diaper', 4), Order(3L, 'rubber', 2))) val orderB: DataStream[Order] = env.fromCollection(Seq( Order(2L, 'pen', 3), Order(2L, 'rubber', 3), Order(4L, 'beer', 1))) // 根据数据注册表 tEnv.registerDataStream('OrderA', orderA) tEnv.registerDataStream('OrderB', orderB) // union the two tables val result = tEnv.sqlQuery( 'SELECT * FROM OrderA WHERE amount > 2 UNION ALL ' + 'SELECT * FROM OrderB WHERE amount } } case class Order(user: Long, product: String, amount: Int) 将表转换为DataStream或DataSet A Table可以转换成a DataStream或DataSet。通过这种方式,可以在Table API或SQL查询的结果上运行自定义的DataStream或DataSet程序 1:将表转换为DataStream 有两种模式可以将 Table转换为DataStream: 1:Append Mode 将一个表附加到流上 2:Retract Mode 将表转换为流 语法格式: // get TableEnvironment. // registration of a DataSet is equivalent // ge val tableEnv = TableEnvironment.getTableEnvironment(env) // Table with two fields (String name, Integer age) val table: Table = ... // convert the Table into an append DataStream of Row val dsRow: DataStream[Row] = tableEnv.toAppendStreamRow // convert the Table into an append DataStream of Tuple2[String, Int] val dsTuple: DataStream[(String, Int)] dsTuple = tableEnv.toAppendStream(String, Int) // convert the Table into a retract DataStream of Row. // A retract stream of type X is a DataStream[(Boolean, X)]. // The boolean field indicates the type of the change. // True is INSERT, false is DELETE. val retractStream: DataStream[(Boolean, Row)] = tableEnv.toRetractStreamRow 例子: object TableTODataSet_DataStream { def main(args: Array[String]): Unit = { //构造数据,转换为table val data = List( Peoject(1L, 1, 'Hello'), Peoject(2L, 2, 'Hello'), Peoject(3L, 3, 'Hello'), Peoject(4L, 4, 'Hello'), Peoject(5L, 5, 'Hello'), Peoject(6L, 6, 'Hello'), Peoject(7L, 7, 'Hello World'), Peoject(8L, 8, 'Hello World'), Peoject(8L, 8, 'Hello World'), Peoject(20L, 20, 'Hello World')) val env = StreamExecutionEnvironment.getExecutionEnvironment env.setParallelism(1) val tEnv = TableEnvironment.getTableEnvironment(env) val stream = env.fromCollection(data) val table: Table = tEnv.fromDataStream(stream) //TODO 将table转换为DataStream----[数控等离子切割机](http://www.158cnc.com)[http://www.158cnc.com](http://www.158cnc.com)将一个表附加到流上Append Mode val appendStream: DataStream[Peoject] = tEnv.toAppendStream[Peoject](table) //TODO 将表转换为流Retract Mode true代表添加消息,false代表撤销消息 val retractStream: DataStream[(Boolean, Peoject)] = tEnv.toRetractStream[Peoject](table) retractStream.print() env.execute() } } case class Peoject(user: Long, index: Int, content: String) 将表转换为DataSet 语法格式 // get TableEnvironment // registration of a DataSet is equivalent val tableEnv = TableEnvironment.getTableEnvironment(env) // Table with two fields (String name, Integer age) val table: Table = ... // convert the Table into a DataSet of Row val dsRow: DataSet[Row] = tableEnv.toDataSetRow // convert the Table into a DataSet of Tuple2[String, Int] val dsTuple: DataSet[(String, Int)] = tableEnv.toDataSet(String, Int) 例子: case class Peoject(user: Long, index: Int, content: String) object TableTODataSet{ def main(args: Array[String]): Unit = { //构造数据,转换为table val data = List( Peoject(1L, 1, 'Hello'), Peoject(2L, 2, 'Hello'), Peoject(3L, 3, 'Hello'), Peoject(4L, 4, 'Hello'), Peoject(5L, 5, 'Hello'), Peoject(6L, 6, 'Hello'), Peoject(7L, 7, 'Hello World'), Peoject(8L, 8, 'Hello World'), Peoject(8L, 8, 'Hello World'), Peoject(20L, 20, 'Hello World')) //初始化环境,加载table数据 val env = ExecutionEnvironment.getExecutionEnvironment env.setParallelism(1) val tableEnvironment = TableEnvironment.getTableEnvironment(env) val collection: DataSet[Peoject] = env.fromCollection(data) val table: Table = tableEnvironment.fromDataSet(collection) //TODO 将table转换为dataSet val toDataSet: DataSet[Peoject] = tableEnvironment.toDataSet[Peoject](table) toDataSet.print() // env.execute() } }
    踩0 评论0
  • 回答了问题 2020-06-16

    为什么用pycharm总是报错呢??报错

    利用PyCharm用于一般IDE具备的功能,可不少人在Linux系统中安装PyCharm时会出现错误,提示Error: cannot start PyCharm,这是什么错误呢?我们又该如何来解决呢?下面小编就教大家Linux安装PyCharm提示错误的解决方法,有兴趣的网友们可以一起来学习下。。   Error: cannot start PyCharm   No JDK found to run PyCharm. Please validate either PYCHARM_JDK, JDK_HOME or JAVA_HOME光纤激光切割机 http://www.6618cnc.comenvironment variable points to valid JDK installation.   解决方法:   进入jdk目录, cd 。。/。。/jdk1.7.0_60/   执行 export JAVA_HOME= 。。/。。/jdk1.7.0_60/(按各自电脑的jdk目录设置)   然后重新安装pycharm即可。   上面就是小编总结的Linux安装PyCharm提示错误的解决方法,如果你在安装的过程中出现了错误提示,按照本文介绍的方法进行重新安装即可。
    踩0 评论0
  • 回答了问题 2020-06-16

    hibernate注解问题?报错

    异常: org.hibernate.MappingException: An AnnotationConfiguration instance is required to use 错误原因: Configuration cfg = new Configuration(); SessionFactory sf = cfg.configure().buildSessionFactory(); 解决方法: Hibernate配置文件中,若带有 ,则说明映射类时,采用了Annotation方式。在初始化数控等离子切割机http://www.158cnc.comConfiguation时,应使用AnnoationConfiguration,代码如下: Configuration cfg = new AnnotationConfiguration(); SessionFactory sf = cfg.configure().buildSessionFactory();
    踩0 评论0
  • 回答了问题 2020-06-16

    Mac上怎么装FastDFS?报错

    项目中用到了FastDFS来做文件存储。最近重构的时候,因为经常处于移动办公的状态,所以访问公司的服务器不是很方便,所以感觉有必要在本机上搭建一套FastDFS的测试环境。 FastDFS是什么? FastDFS是一个开源的轻量级分布式文件系统,它对文件进行管理,功能包括:文件存储、文件同步、文件访问(文件上传、文件下载)等,解决了大容量存储和负载均衡的问题。特别适合以文件为载体的在线服务,如相册网站、视频网站等等。 FastDFS为互联网量身定制,充分考虑了冗余备份、负载均衡、线性扩容等机制,并注重高可用、高性能等指标,使用FastDFS很容易搭建一套高性能的文件服务器集群提供文件上传、下载等服务。 FastDFS里面有两种角色:Tracker、Storage。Tracker主要做调度工作,在访问上起负载均衡的作用。Storage存储节点存储文件,完成文件管理的所有功能:存储、同步和提供存取接口,FastDFS同时对文件的meta data进行管理。所谓文件的meta data就是文件的相关属性,以键值对(key value pair)方式表示,如:width=1024,其中的key为width,value为1024。文件meta data是文件属性列表,可以包含多个键值对。 Docker方式 因为之前用Docker用的比较多,所以首先想到的是用Docker来运行FastDFS。上hub.docker.io上找了找镜像,最后选择了luhuiguo/fastdfs这个镜像。很快写了一个docker-compose脚本: 1version: '3.3' 2services: 3 tracker: 4 image: luhuiguo/fastdfs 5 ports: 6 - '22122:22122' 7 command: 8 - tracker 9 volumes: 10 # let container use same timezone as host 11 - /etc/localtime:/etc/localtime 12 - /opt/docker/fastdfs/tracker:/var/fdfs 13 storage: 14 image: luhuiguo/fastdfs 15 ports: 16 - '23000:23000' 17 links: 18 - tracker 19 command: 20 - storage 21 environment: 22 TRACKER_SERVER: 192.168.255.199:22122 23 GROUP_NAME: group1 24 volumes: 25 - /etc/localtime:/etc/localtime 26 - /opt/docker/fastdfs/storage:/var/fdfs 执行起来,看起来也是正常的。但是使用程序访问的时候却失败了。抛出了异常:java.net.SocketTimeoutException: connect timed out。通过跟踪代码发现了原因:使用docker的时候,storage向tracker汇报的地址是docker内部的地址,而这个地址在Mac本机是无法直接访问的。通过代码访问Tracker的时候返回的地址就是docker内部的地址,自然会出现SocketTimeoutException了。 1.1 network host模式 因为知道docker有host这一网络模式,可以使docker容器使用与宿主机一样的网络。感觉上这样或许可以解决上面的问题,而且镜像的说明中也提到了这一点。于是加上了network_mode: host。但是启动之后发现还是一样的错误。经过搜索找到了原因: **Mac上的Docker实际上是通过虚拟化方式运行在Mac系统上的。通过xhyve技术模拟出来一台Linux主机,然后在其中跑Docker进程。当使用host模式跑的时候,docker使用的是这个虚拟出来的Linux主机的网络作为Host网络,无法直接使用Mac主机的网络。因此依然无法连接是正常的。**也就是说,在Mac主机上,host这种网络是无法使用的。 本机编译执行 既然使用Docker无法运行FastDFS,那只能通过编译源码来运行了。最新版本的FastDFS已经可以直接在Mac上编译了(不需要修改源码)。因此这种方法也不难。主要步骤如下: 2.0 关闭系统保护 从OSX 10.11开始,Mac对关键目录进行了保护(例如:/bin, /usr/bin等)。而要编译FastDFS却是要安装文件到/usr/bin等目录下,所以首先需要禁用系统保护。关闭的方法如下: 重启系统,重启的过程中按住Command+R进入Recovery模式; 从菜单中选择“终端”或“Terminal”进入命令行模式; 输入命令csrutil disable关闭保护模式,然后输入reboot重启系统即可。 2.1 libfastcommon FastDFS依赖于libfastcommon,因此需要首先编译、安装libfastcommon,步骤如下: 1unzip libfastcommon-1.0.35.zip 2cd libfastcommon-1.0.35 3./make.sh 4./make.sh install 2.2 fastdfs FastDFS源码通过Github下载。下面以最新的5.10为例。编译、安装步骤如下: 1unzip fastdfs-5.10.zip 2cd fastdfs-5.10 3./make.sh 4./make.sh install 安装成功后,FastDFS相关的可执行文件被安装到/usr/bin目录下。相关的配置文件被放在/etc/fdfs下,这和在Linux上安装的结果是一样的。 2.3 启动tracker 下面首先启动Tracker服务。首先准备存储数据的目录,Tracker将会在该目录中保存运行时信息以及日志文件: 1mkdir -p /opt/tools/fastdfs/tracker 2chown -R cap:cap /opt/tools/fastdfs/tracker 然后准备配置文件,从sample文件复制一份以便修改: 1cd /etc/fdfs 数控等离子切割机 数控等离子切割机 2cp tracker.conf.sample tracker.conf 然后修改配置文件/etc/fdfs/tracker.conf,在Mac上单机安装只需要修改base_path,修改成上面刚刚创建的目录: 1base_path=/opt/tools/fastdfs/tracker 然后启动fastdfs,启动完成之后应该占用了22122端口: 1/usr/bin/fdfs_trackerd /etc/fdfs/tracker.conf 2netstat -nl | grep 22122 启动服务使用如下命令: 1/usr/bin/fdfs_trackerd /etc/fdfs/tracker.conf start 停止服务可以使用如下命令: 1/usr/bin/fdfs_trackerd /etc/fdfs/tracker.conf stop 2.4 启动storage 首先还是准备目录供Storage节点使用: 1mkdir -p /opt/tools/fastdfs/storage 2chown -R cap:cap /opt/tools/fastdfs/storage 准备配置文件: 1cp storage.conf.sample storage.conf 修改配置文件: 1# 基本路径,日志等存储在该目录下 2base_path=/opt/tools/fastdfs/storage 3# 存储目录,可以存在多个。FastDFS会按照调度方法使用这些不同的目录。默认使用Round Robin调度方法。 4store_path0=/opt/tools/fastdfs/storage 5# 如果部署集群的话,只要多写几个tracker_server就可以了。 6tracker_server=192.168.255.199:22122 启动服务可以使用如下命令: 1/usr/bin/fdfs_storaged /etc/fdfs/storage.conf start 停止服务命令: 1/usr/bin/fdfs_storaged /etc/fdfs/storage.conf stop
    踩0 评论0
  • 回答了问题 2019-07-17

    云服务器ECS【问答合集】

    好好学习天天向上!
    踩0 评论0
正在加载, 请稍后...
滑动查看更多
正在加载, 请稍后...
暂无更多信息