
常年在 javaWeb 大数据领域中 突破自我 ,希望可以有更多志同道合之士一起交流
暂时未有相关通用技术能力~
阿里云技能认证
详细说明CheckPoint 1. checkpoint 保留策略 默认情况下,checkpoint 不会被保留,取消程序时即会删除他们,但是可以通过配置保留定期检查点,根据配置 当作业失败或者取消的时候 ,不会自动清除这些保留的检查点 。 java : CheckpointConfig config = env.getCheckpointConfig(); config.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); ExternalizedCheckpointCleanup 可选项如下: ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION: 取消作业时保留检查点。请注意,在这种情况下,您必须在取消后手动清理检查点状态。 ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION: 取消作业时删除检查点。只有在作业失败时,检查点状态才可用。 2. Checkpoint 配置 与SavePoint 类似 ,checkpoint 保留的是元数据文件和一些数据文件 默认情况下checkpoint 只保留 一份最新数据,如果需要进行checkpoint数据恢复 ,可以通过全局设置的方式设置该集群默认的checkpoint 保留数,以保证后期可以从checkpoint 点进行恢复 。 同时为了 及时保存checkpoint状态 还需要在服务级别设置 checkpoint 检查点的 备份速度 。全局配置: flink-conf.yaml // 设置 checkpoint全局设置保存点 state.checkpoints.dir: hdfs:///checkpoints/ // 设置checkpoint 默认保留 数量 state.checkpoints.num-retained: 20 注意 如果将 checkpoint保存在hdfs 系统中 , 需要设置 hdfs 元数据信息 : fs.default-scheme: 服务级别设置: java: // 设置 checkpoint 保存目录 env.setStateBackend(new RocksDBStateBackend("hdfs:///checkpoints-data/"); // 设置checkpoint 检查点间隔时间 env.enableCheckpointing(5000); 提交任务之后 job 界面 和 hdfs 界面 通过页面可以看出 checkpoint 备份方式是每5秒执行一次 ,保存当前所有task 状态元信息 和状态信息 。 hdfs 信息 保存 jobId 为 0171897fa809692093b4a9b223cb35e4 最新的 20次 checkpoint 信息 3. Checkpoint 状态点恢复 因为 flink checkpoint 目录 分别对应的是 jobId , 每通过 flink run 方式 / 页面提交方式 都会重新生成 jobId ,那么如何通过checkpoint 恢复 失败任务或者重新执行保留时间点的 任务? flink 提供了 在启动 之时 通过设置 -s 参数指定checkpoint 目录 , 让新的jobId 读取该checkpoint 元文件信息和状态信息 ,从而达到指定时间节点启动 job 。启动方式如下 : ./bin/flink -s /flink/checkpoints/0171897fa809692093b4a9b223cb35e4/chk-50/_metadata -p @Parallelism -c @Mainclass @jar Savepoint Savepoint 介绍 Savepoint是通过Flink的检查点机制创建的流作业执行状态的一致图像。您可以使用Savepoints来停止和恢复,分叉或更新Flink作业。保存点由两部分组成:稳定存储(例如HDFS,S3,...)上的(通常是大的)二进制文件和(相对较小的)元数据文件的目录。稳定存储上的文件表示作业执行状态图像的净数据。Savepoint的元数据文件以(绝对路径)的形式包含(主要)指向作为Savepoint一部分的稳定存储上的所有文件的指针。 savepoint 和 checkpoint 区别 从概念上讲,Flink的Savepoints与Checkpoints的不同之处在于备份与传统数据库系统中的恢复日志不同。检查点的主要目的是在意外的作业失败时提供恢复机制。Checkpoint的生命周期由Flink管理,即Flink创建,拥有和发布Checkpoint - 无需用户交互。作为一种恢复和定期触发的方法,Checkpoint实现的两个主要设计目标是:i)being as lightweight to create (轻量级),ii)fast restore (快速恢复) 。针对这些目标的优化可以利用某些属性,例如,JobCode在执行尝试之间不会改变。 与此相反,Savepoints由用户创建,拥有和删除。他们的用例是planned (计划) 的,manual backup( 手动备份 ) 和 resume(恢复) 。例如,这可能是您的Flink版本的更新,更改您的Job graph ,更改 parallelism ,分配第二个作业,如红色/蓝色部署,等等。当然,Savepoints必须在终止工作后继续存在。从概念上讲,保存点的生成和恢复成本可能更高,并且更多地关注可移植性和对前面提到的作业更改的支持。 Assigning Operator IDs ( 分配 operator ids) 为了能够在将来升级你的程序在本节中描述。主要的必要更改是通过该uid(String)方法手动指定操作员ID 。这些ID用于确定每个运算符的状态。 java: DataStream<String> stream = env. // Stateful source (e.g. Kafka) with ID .addSource(new StatefulSource()) .uid("source-id") // ID for the source operator .shuffle() // Stateful mapper with ID .map(new StatefulMapper()) .uid("mapper-id") // ID for the mapper // Stateless printing sink .print(); // Auto-generated ID 如果您未手动指定ID,则会自动生成这些ID。只要这些ID不变,您就可以从保存点自动恢复。生成的ID取决于程序的结构,并且对程序更改很敏感。因此,强烈建议手动分配这些ID。 Savepoint State 触发保存点时,会创建一个新的保存点目录,其中将存储数据和元数据。可以通过配置默认目标目录或使用触发器命令指定自定义目标目录来控制此目录的位置 保存Savepoint $ bin/flink savepoint :jobId [:targetDirectory] 这将触发具有ID的作业的保存点:jobId,并返回创建的保存点的路径。您需要此路径来还原和部署保存点。 在yarn 集群中保存Savepoint $ bin/flink savepoint :jobId [:targetDirectory] -yid :yarnAppId 这将触发具有ID :jobId和YARN应用程序ID 的作业的保存点:yarnAppId,并返回创建的保存点的路径。 使用 Savepoint 取消job $ bin/flink cancel -s [:targetDirectory] :jobId 这将以原子方式触发具有ID的作业的保存点:jobid并取消作业。此外,您可以指定目标文件系统目录以存储保存点。该目录需要可由JobManager和TaskManager访问。 Resuming Savepoint $ bin/flink run -s :savepointPath [:runArgs] 这将提交作业并指定要从中恢复的保存点。您可以指定保存点目录或_metadata文件的路径。 允许未恢复状态启动 $ bin/flink run -s :savepointPath -n [:runArgs] 默认情况下,resume操作将尝试将保存点的所有状态映射回要恢复的程序。如果删除了运算符,则可以通过--allowNonRestoredState(short -n:)选项跳过无法映射到新程序的状态: 全局配置 您可以通过state.savepoints.dir 配置文件设置默认savepoint 位置 。触发保存点时,此目录将用于存储保存点。您可以通过使用触发器命令指定自定义目标目录来覆盖默认值(请参阅:targetDirectory参数)。 flink-conf.yaml # Default savepoint target directory state.savepoints.dir: hdfs:///flink/savepoints 参考地址: https://ci.apache.org/projects/flink/flink-docs-release-1.6/ops/state/state_backends.htmlhttps://ci.apache.org/projects/flink/flink-docs-release-1.6/ops/state/checkpoints.htmlhttps://ci.apache.org/projects/flink/flink-docs-release-1.6/ops/state/savepoints.htmlhttp://ju.outofmemory.cn/entry/370841
Flink 专题1 : 搭建Flink 及Flink 简介 图片来源于网络 Flink 简介 Apache Flink® - 基于数据流的有状态计算 Flink 的优势: 流场景使用案例 数据驱动的应用 批流数据分析 数据通道和ETL 正确性保证 Exactly-once状态一致性保证 事件时间处理 复杂的late date处理 更多 API分层体系统一SQL支持Stream和Batch数据处理 DataStream API & DataSet APIProcessFunction (Time & State) Operational Focus 部署灵活 高可用配置 Savepoints 适用于各种应用场景Scales to any use case# 架构可扩展 超大state支持 增量checkpointing 高性能 低延时 高吞吐 内存计算 Flink 安装 安装地址: flink : http://mirror.bit.edu.cn/apache/flink/flink-1.6.2/flink-1.6.2-bin-hadoop27-scala_2.11.tgzhadoop : https://archive.apache.org/dist/hadoop/common/hadoop-2.7.6/hadoop-2.7.6.tar.gz hadoop 安装略过 flink 安装步骤 flink 安装包含单点模式,集群模式,flink on yarn 模式 ,flink on k8s 等模式 ,flink 通过是基于jvm 进行操作, 通过代码可以在单机情况下模拟 集群模式数据 ,以此可以实现本地化的degug 操作。 下面介绍一下集群模式部署: flink 集群模式 结构 : Flink 集群模式 包含 JobManager /TaskManager 配置文件设置: flink-conf.yaml jobmanager.rpc.address: test-hadoop01 jobmanager.rpc.port: 6123 // 设置jobManager 的内存大小 jobmanager.heap.size: 2048m // 设置每个taskManager 的内存大小 taskmanager.heap.size: 3072m // 设置每个TaskManager 所占槽位 (最好和当前 机器的 可用核数相同(注意要排除预留给自己自身的核数)) taskmanager.numberOfTaskSlots: 8 parallelism.default: 3 // 默认并行度 // hdfs 地址 fs.default-scheme: hdfs://test-hadoop02:9000/ fs.hdfs.hadoopconf: hdfs:///flink/data/ state.checkpoints.dir: hdfs:///checkpoints/ //设置checkpoint 保留版本数量(选择) state.checkpoints.num-retained: 20 // 设置savepoint 地址 (选择 ) state.savepoints.dir: hdfs://namenode01.td.com/flink-1.5.3/flink-savepoints //该参数控制了 Flink 是否该重新分配失败的 TaskManager 容器。默认值:true (选择 ) yarn.reallocate-failed:true //ApplicationMaster 能接受最多的失败 container 数,直到 YARN 会话失败。默认:初始请求的 TaskManager 数(-n) (选择 ) yarn.maximum-failed-containers:10 //ApplicationMaster(以及 TaskManager containers)重试次数。此参数默认值为1,如果 Application master 失败,那么整个 YARN session 会失败。如果想增大 ApplicationMaster 重启次数,可以把该参数的值调大一些。 (选择 ) yarn.application-attempts:5 slaves 将集群的所有节点均写入该文件中 test-hadoop01 test-hadoop02 test-hadoop03 添加jobManager/TaskManager 可以使用 bin/jobmanager.sh 和 bin/taskmanager.sh 两个脚本把 JobManager 和 TaskManager 实例添加到正在运行的集群中。 添加 JobManager ./bin/jobmanager.sh ((start|start-foreground) [host] [webui-port])|stop|stop-all 添加 TaskManager ./bin/taskmanager.sh start|start-foreground|stop|stop-all 启动集群 1 集群模式启动 /bin/start-cluster.sh 2. yarn 模式启动 ./bin/yarn-session.sh Usage: Required -n,--container <arg> Number of YARN container to allocate (=Number of Task Managers) Optional -D <arg> Dynamic properties -d,--detached Start detached -jm,--jobManagerMemory <arg> Memory for JobManager Container with optional unit (default: MB) -nm,--name Set a custom name for the application on YARN -q,--query Display available YARN resources (memory, cores) -qu,--queue <arg> Specify YARN queue. -s,--slots <arg> Number of slots per TaskManager -tm,--taskManagerMemory <arg> Memory per TaskManager Container with optional unit (default: MB) -z,--zookeeperNamespace <arg> Namespace to create the Zookeeper sub-paths for HA mode Flink 基于 YARN 的恢复机制Flink 的 YARN 客户端通过下面的配置参数来控制容器的故障恢复。这些参数可以通过 conf/flink-conf.yaml 或者在启动 YARN session 的时候通过 -D 参数来指定。yarn.reallocate-failed:该参数控制了 Flink 是否该重新分配失败的 TaskManager 容器。默认值:trueyarn.maximum-failed-containers:ApplicationMaster 能接受最多的失败 container 数,直到 YARN 会话失败。默认:初始请求的 TaskManager 数(-n)yarn.application-attempts:ApplicationMaster(以及 TaskManager containers)重试次数。此参数默认值为1,如果 Application master 失败,那么整个 YARN session 会失败。如果想增大 ApplicationMaster 重启次数,可以把该参数的值调大一些。 参考地址: https://flink.apache.org/
[TOC] 引言 今天针对线上生产环境下单机 flume 拉取kafka数据并存储数据入Hdfs 出现大批量数据延迟. 在网上官网各种搜索数据,并结合官网数据,现进行以下总结 1. 线上单机存在问题简述 当前flume拉取kafa数据量并不大 ,根据flume客户端日志 ,每半分钟hdfs文件写入一次数据生成文件 发现问题: **拉取kafka数据过慢** 2. 解决思路 加大kafka拉取数据量 加大flume中channel,source,sink 各通道的单条数据量 将flume拉取数据单机版本改成多数据拉取,通过flume-avore-sink-> flume-avore-source 进行数据多数据采取并合并 3 加大kafka拉取数据量 3.1 kafka-source简述 flume 输入单线程拉取数据并将数据发送内置channel并通过sink组件进行数据转发和处理,故对于kafka集群多副本方式拉取数据的时候,应适当考虑多个flume节点拉取kafka多副本数据,以避免flume节点在多个kafka集群副本中轮询。加大flume拉取kafka数据的速率。 flume-kafka-source 是flume内置的kafka source数据组件,是为了拉取kafka数据,配置如下: agent.sources = r1 agent.sources.r1.type = org.apache.flume.source.kafka.KafkaSource agent.sources.r1.batchSize = 50000 agent.sources.r1.batchDurationMillis = 2000 agent.sources.r1.kafka.bootstrap.servers = test-hadoop01:9092 agent.sources.r1.kafka.topics = topicTest agent.sources.r1.kafka.consumer.group.id = groupTest flume-kafka-source 的offset是交由zk集群去维护offset 3.2 kafka-source配置详解 Kafka Source是一个Apache Kafka消费者,它从Kafka主题中读取消息。 如果您正在运行多个Kafka源,则可以使用相同的使用者组配置它们,以便每个源都读取一组唯一的主题分区。 Property Name Default Description channels – 配置的channels 可配置多个channels 后续文章会说到 type – org.apache.flume.source.kafka.KafkaSource kafka.bootstrap.servers – 配置kafka集群地址 kafka.consumer.group.id flume 唯一确定的消费者群体。 在多个源或代理中设置相同的ID表示它们是同一个使用者组的一部分 kafka.topics – 你需要消费的topic kafka.topics.regex – 正则表达式,用于定义源订阅的主题集。 此属性的优先级高于kafka.topics,如果存在则覆盖kafka.topics。 batchSize 1000 一批中写入Channel的最大消息数 (优化项) batchDurationMillis 1000 将批次写入通道之前的最长时间(以毫秒为单位)只要达到第一个大小和时间,就会写入批次。(优化项) backoffSleepIncrement 1000 Kafka主题显示为空时触发的初始和增量等待时间。 等待时间将减少对空kafka主题的激进ping操作。 一秒钟是摄取用例的理想选择,但使用拦截器的低延迟操作可能需要较低的值。 maxBackoffSleep 5000 Kafka主题显示为空时触发的最长等待时间。 5秒是摄取用例的理想选择,但使用拦截器的低延迟操作可能需要较低的值。 useFlumeEventFormat false 默认情况下,事件从Kafka主题直接作为字节直接进入事件主体。 设置为true以将事件读取为Flume Avro二进制格式。 与KafkaSink上的相同属性或Kafka Channel上的parseAsFlumeEvent属性一起使用时,这将保留在生成端发送的任何Flume标头。 setTopicHeader true 设置为true时,将检索到的消息的主题存储到标题中,该标题由topicHeader属性定义。 topicHeader topic 如果setTopicHeader属性设置为true,则定义用于存储接收消息主题名称的标题的名称。 如果与Kafka SinktopicHeader属性结合使用,应该小心,以避免在循环中将消息发送回同一主题。 migrateZookeeperOffsets true 如果找不到Kafka存储的偏移量,请在Zookeeper中查找偏移量并将它们提交给Kafka。 这应该是支持从旧版本的Flume无缝Kafka客户端迁移。 迁移后,可以将其设置为false,但通常不需要这样做。 如果未找到Zookeeper偏移量,则Kafka配置kafka.consumer.auto.offset.reset定义如何处理偏移量。 查看[Kafka文档](http://kafka.apache.org/documentation.html#newconsumerconfigs)了解详细信息 kafka.consumer.security.protocol PLAINTEXT 如果使用某种级别的安全性写入Kafka,则设置为SASL_PLAINTEXT,SASL_SSL或SSL。 Other Kafka Consumer Properties – 这些属性用于配置Kafka Consumer。 可以使用Kafka支持的任何消费者财产。 唯一的要求是在前缀为“kafka.consumer”的前缀中添加属性名称。 例如:kafka.consumer.auto.offset.reset 注意: Kafka Source会覆盖两个Kafka使用者参数:source.com将auto.commit.enable设置为“false”,并提交每个批处理。 Kafka源至少保证一次消息检索策略。 源启动时可以存在重复项。 Kafka Source还提供了key.deserializer(org.apache.kafka.common.serialization.StringSerializer)和value.deserializer(org.apache.kafka.common.serialization.ByteArraySerializer)的默认值。 不建议修改这些参数。官方配置示例: tier1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource tier1.sources.source1.channels = channel1 tier1.sources.source1.batchSize = 5000 tier1.sources.source1.batchDurationMillis = 2000 tier1.sources.source1.kafka.bootstrap.servers = localhost:9092 tier1.sources.source1.kafka.topics = test1, test2 tier1.sources.source1.kafka.consumer.group.id = custom.g.id Example for topic subscription by regex tier1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource tier1.sources.source1.channels = channel1 tier1.sources.source1.kafka.bootstrap.servers = localhost:9092 tier1.sources.source1.kafka.topics.regex = ^topic[0-9]$ # the default kafka.consumer.group.id=flume is used 本案例kafka-source配置 agent.sources = r1 agent.sources.r1.channels=c1 agent.sources.r1.type = org.apache.flume.source.kafka.KafkaSource agent.sources.r1.batchSize = 50000 agent.sources.r1.batchDurationMillis = 2000 agent.sources.r1.kafka.bootstrap.servers = test-hadoop01:9092 agent.sources.r1.kafka.topics = topicTest agent.sources.r1.kafka.consumer.group.id = groupTest 官网配置文件地址 : kafka-source 3.3 配置优化 主要是在放入flume-channels 的批量数据加大 更改参数: agent.sources.r1.batchSize = 50000 agent.sources.r1.batchDurationMillis = 2000 更改解释: **即每2秒钟拉取 kafka 一批数据 批数据大小为50000 放入到flume-channels 中 。即flume该节点 flume-channels 输入端数据已放大** 更改依据: 需要配置kafka单条数据 broker.conf 中配置 message.max.bytes 当前flume channel sink 组件最大消费能力如何? 4. 加大flume中channel,source,sink 各通道的单条数据量 4.1 source 发送至channels 数据量大小已配置 见 3.3 4.2 channel 配置 Property Name Default Description type – The component type name, needs to be memory capacity 100 通道中存储的最大事件数 (优化项) transactionCapacity 100 每个事务通道从源或提供给接收器的最大事件数 (优化项) keep-alive 3 添加或删除事件的超时(以秒为单位) byteCapacityBufferPercentage 20 定义byteCapacity与通道中所有事件的估计总大小之间的缓冲区百分比,以计算标头中的数据。 见下文。 byteCapacity see description 允许的最大总字节作为此通道中所有事件的总和。 实现只计算Eventbody,这也是提供byteCapacityBufferPercentage配置参数的原因。 默认为计算值,等于JVM可用的最大内存的80%(即命令行传递的-Xmx值的80%)。 请注意,如果在单个JVM上有多个内存通道,并且它们碰巧保持相同的物理事件(即,如果您使用来自单个源的复制通道选择器),那么这些事件大小可能会因为通道byteCapacity目的而被重复计算。 将此值设置为“0”将导致此值回退到大约200 GB的内部硬限制。 配置 capacity 和 transactionCapacity 值 。默认配置规则为: $$ channels.capacity >= channels.transactionCapacity >= source.batchSize $$ 官方channels配置示例 a1.channels = c1 a1.channels.c1.type = memory a1.channels.c1.capacity = 10000 a1.channels.c1.transactionCapacity = 10000 a1.channels.c1.byteCapacityBufferPercentage = 20 a1.channels.c1.byteCapacity = 800000 本案例修改之后的channels 配置 agent.channels.c1.type = memory agent.channels.c1.capacity=550000 agent.channels.c1.transactionCapacity=520000 5. 将flume拉取数据单机版本改成多数据拉取,通过flume-avore-sink-> flume-avore-source 进行数据多数据采取并合并 5.1 存在问题 通过上续修改会发现单机版本的flume会在多副本kafka轮询造成效率浪费 单机版本flume处理数据时会存在单机瓶颈,单机channels可能最多只能处理最大数据无法扩充单机flume配置多个数据源不方便,不能适合后续多需求开发 5.2 修改架构 5.3采集节点配置文件 收集节点配置(3台): agent.sources = r1 agent.channels = c1 agent.sinks = k1 agent.sources.r1.type = org.apache.flume.source.kafka.KafkaSource agent.sources.r1.batchSize = 50000 agent.sources.r1.batchDurationMillis = 2000 agent.sources.r1.kafka.bootstrap.servers = qcloud-test-hadoop03:9092 agent.sources.r1.kafka.topics = topicTest agent.sources.r1.kafka.consumer.group.id = groupTest agent.channels.c1.type = memory agent.channels.c1.capacity=550000 agent.channels.c1.transactionCapacity=520000 agent.sinks.k1.type = avro agent.sinks.k1.hostname = test-hadoop03 agent.sinks.k1.port=4545 agent.sources.r1.channels = c1 agent.sinks.k1.channel = c1 汇总节点配置(1台): agent.sources = r1 agent.channels = memoryChannel agent.sinks = hdfsSink agent.sources.r1.type = avro agent.sources.r1.bind = ip agent.sources.r1.port = 4545 agent.sources.r1.batchSize = 100000 agent.sources.r1.batchDurationMillis = 1000 agent.channels.memoryChannel.type=memory agent.channels.memoryChannel.keep-alive=30 agent.channels.memoryChannel.capacity=120000 agent.channels.memoryChannel.transactionCapacity=100000 agent.sinks.hdfsSink.type=hdfs agent.sinks.hdfsSink.hdfs.path=hdfs://nameser/data/hm2/%Y-%m-%d-%H agent.sinks.hdfsSink.hdfs.writeFormat=Text agent.sinks.hdfsSink.hdfs.rollCount = 0 agent.sinks.hdfsSink.hdfs.rollSize = 134217728 agent.sinks.hdfsSink.hdfs.rollInterval = 60 agent.sinks.hdfsSink.hdfs.fileType=DataStream agent.sinks.hdfsSink.hdfs.idleTimeout=65 agent.sinks.hdfsSink.hdfs.callTimeout=65000 agent.sinks.hdfsSink.hdfs.threadsPoolSize=300 agent.sinks.hdfsSink.channel = memoryChannel agent.sources.r1.channels = memoryChannel 5.4 架构注意点 当前架构需要保证聚合节点机器的性能 当前架构新的瓶颈可能会存在存储Hdfs数据时过慢 ,导致聚合节点Channels 占用率居高不下,导致堵塞 。 需要关注avro 自定义source sink 的发送效率 6.flume 监控工具(http) flume 监控工具总共有三种方式 ,我们这里为方便简单,使用内置http接口监控方式进行操作 6.1 配置 在启动脚本处设置 参数 -Dflume.monitoring.type=http -Dflume.monitoring.port=34545 即可 6.2 访问 地址 : http://flumeIp:34545 6.3 返回结果示例 和字段解释 : { "CHANNEL.memoryChannel": { "ChannelCapacity": "550000", "ChannelFillPercentage": "0.18181818181818182", "Type": "CHANNEL", "ChannelSize": "1000", "EventTakeSuccessCount": "33541400", "EventTakeAttemptCount": "33541527", "StartTime": "1536572886273", "EventPutAttemptCount": "33542500", "EventPutSuccessCount": "33542500", "StopTime": "0" }, "SINK.hdfsSink": { "ConnectionCreatedCount": "649", "ConnectionClosedCount": "648", "Type": "SINK", "BatchCompleteCount": "335414", "BatchEmptyCount": "27", "EventDrainAttemptCount": "33541500", "StartTime": "1536572886275", "EventDrainSuccessCount": "33541400", "BatchUnderflowCount": "0", "StopTime": "0", "ConnectionFailedCount": "0" }, "SOURCE.avroSource": { "EventReceivedCount": "33542500", "AppendBatchAcceptedCount": "335425", "Type": "SOURCE", "EventAcceptedCount": "33542500", "AppendReceivedCount": "0", "StartTime": "1536572886465", "AppendAcceptedCount": "0", "OpenConnectionCount": "3", "AppendBatchReceivedCount": "335425", "StopTime": "0" } } 参数定义: 字段名称 含义 备注 SOURCE.OpenConnectionCount 打开的连接数 SOURCE.TYPE 组件类型 SOURCE.AppendBatchAcceptedCount 追加到channel中的批数量 SOURCE.AppendBatchReceivedCount source端刚刚追加的批数量 SOURCE.EventAcceptedCount 成功放入channel的event数量 SOURCE.AppendReceivedCount source追加目前收到的数量 SOURCE.StartTime(StopTIme) 组件开始时间、结束时间 SOURCE.EventReceivedCount source端成功收到的event数量 SOURCE.AppendAcceptedCount source追加目前放入channel的数量 CHANNEL.EventPutSuccessCount 成功放入channel的event数量 CHANNEL.ChannelFillPercentage 通道使用比例 CHANNEL.EventPutAttemptCount 尝试放入将event放入channel的次数 CHANNEL.ChannelSize 目前在channel中的event数量 CHANNEL.EventTakeSuccessCount 从channel中成功取走的event数量 CHANNEL.ChannelCapacity 通道容量 CHANNEL.EventTakeAttemptCount 尝试从channel中取走event的次数 SINK.BatchCompleteCount 完成的批数量 SINK.ConnectionFailedCount 连接失败数 SINK.EventDrainAttemptCount 尝试提交的event数量 SINK.ConnectionCreatedCount 创建连接数 SINK.Type 组件类型 SINK.BatchEmptyCount 批量取空的数量 SINK.ConnectionClosedCount 关闭连接数量 SINK.EventDrainSuccessCount 成功发送event的数量 SINK.BatchUnderflowCount 正处于批量处理的batch数 参考地址flume-document : http://flume.apache.org/FlumeUserGuide.html
版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/qq_26654727/article/details/83473968 文章目录 利用java api 创建excel 内容并发送邮件 主要实现功能: 核心代码 (SendMailManager ) : 配置文件内容 : excel 邮件发送实例图: 简述: 最近使用 github 在归类自己平时使用的工具类 github 地址:https://github.com/auguszero/javaToolRepsitory 利用java api 创建excel 内容并发送邮件 主要实现功能: 1.通过配置文件设置发送邮件发送方,接收方,抄送方 2.目前实现了 自定义发送内容 3.实现了excel 邮件发送内容 核心代码 (SendMailManager ) : package com.javatool.email.proxy; import com.javatool.configer.DefaultConfigerContext; import com.javatool.email.model.MailAuthenticator; import com.javatool.email.model.SendMailConfigModel; import javax.mail.*; import javax.mail.internet.InternetAddress; import javax.mail.internet.MimeBodyPart; import javax.mail.internet.MimeMessage; import javax.mail.internet.MimeMultipart; import java.util.Date; import java.util.List; import java.util.Properties; /** * @author haisong * @create 2018/09/13 14:21 */ public class SendMailManager { /** * 发送邮件 * @param subject 邮件主题 * @param content 邮件内容 * @return success 发送成功 failure 发送失败 * @throws Exception */ public static String sendMail( String subject, String content) throws Exception { SendMailConfigModel sendMailConfigModel = DefaultConfigerContext.getInstance().getModelFromProperties(SendMailConfigModel.class); String to = sendMailConfigModel.getTo(); if (to != null){ Properties props = System.getProperties(); props.put("mail.smtp.host", sendMailConfigModel.getMail_smtp_host()); props.put("mail.smtp.auth", "true"); props.put("mail.transport.protocol", "smtp"); MailAuthenticator auth = new MailAuthenticator(); Session session = Session.getInstance(props, auth); session.setDebug(true); try { MimeMessage message = new MimeMessage(session); message.setFrom(new InternetAddress(sendMailConfigModel.getFrom())); if (!to.trim().equals("")) { message.addRecipient(Message.RecipientType.TO, new InternetAddress(to.trim())); } List<String> copyToList = sendMailConfigModel.getCopyToList(); if(copyToList!=null&&copyToList.size()>0) { Address[] addresses = new Address[copyToList.size()]; int i = 0; for (String copy : copyToList) { addresses[i] = new InternetAddress(copy); i++; } message.addRecipients(Message.RecipientType.CC, addresses); } message.setSubject(subject); MimeBodyPart mbp1 = new MimeBodyPart(); // 正文 mbp1.setContent(content, "text/html;charset=utf-8"); Multipart mp = new MimeMultipart(); // 整个邮件:正文+附件 mp.addBodyPart(mbp1); message.setContent(mp); message.setSentDate(new Date()); message.saveChanges(); Transport trans = session.getTransport("smtp"); // trans.connect("smtp.163.com", sendMailConfigModel.getUser(), sendMailConfigModel.getPassword()); trans.send(message); System.out.println(message.toString()); } catch (Exception e){ e.printStackTrace(); return "failure"; } return "success"; }else{ return "failure"; } } public static String SendExcelMail(String subject,List<List<String>> conteList) throws Exception { String htmlStr =""; for(int i=0;i<conteList.size();i++){ htmlStr = createHTML(htmlStr,conteList.get(i),i==0?true:false,i==conteList.size()-1?true:false); } String result = sendMail(subject,htmlStr); return result; } private static String createHTML(String originHtml, List<String> data,boolean headFlage,boolean endFlage) { String html_msg=""; if(headFlage){ html_msg = "<table border=\"1\" width='80%' height='80'>"; html_msg = html_msg+"<tr bgcolor='#B6DDE6'>"; for(int column=0;column<data.size();column++){ html_msg = html_msg +"<td width='12%'>"+data.get(column)+"</td>"; } html_msg = html_msg+"</tr>"; }else{ html_msg = html_msg+"<tr>"; for(int column=0;column<data.size();column++){ html_msg = html_msg +"<td>"+data.get(column)+"</td>"; } html_msg = html_msg+"</tr>"; } if(endFlage){ html_msg = html_msg + "</table>"; } return originHtml+html_msg; } } 配置文件内容 : sendMail.mail_smtp_host=smtp.163.com sendMail.mail_smtp_auth=login sendMail.user=***@163.com sendMail.password=****** sendMail.from=*****@163.com sendMail.to=auguszero@163.com excel 邮件发送实例图: 如果你觉得可以 可以直接在github 上将该项目打包好 进行引入 作为工具类 。 同时欢迎大家一起来完善这个github项目 。 欢迎一起交流:
版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/qq_26654727/article/details/82752988 文章目录 kafka指令 1.常用指令 1.1 [ Adding and removing topics](http://kafka.apache.org/090/documentation.html#basic_ops_add_topic) 1.2 [Modifying topics](http://kafka.apache.org/090/documentation.html#basic_ops_modify_topic) 1.3 balancing leadership 1.4 checking consumer position 1.5 [Expanding your cluster(水平扩展集群)](http://kafka.apache.org/090/documentation.html#basic_ops_cluster_expansion) 1.由于该工具接受主题的输入列表作为json文件,因此首先需要确定要移动的主题并创建json文件,如下所示 - 2.一旦json文件准备就绪,使用分区重新分配工具生成候选分配 - 3.该工具生成一个候选分配,将所有分区从主题foo1,foo2移动到代理5,6。 但请注意,此时分区移动尚未开始,它只是告诉您当前的分配和建议的新分配。 应保存当前分配,以防您想要回滚它。 新的分配应保存在json文件中(例如expand-cluster-reassignment.json),以便使用--execute选项输入工具,如下所示 - 4.检查状态 1.6 [自定义分区分配和迁移](http://kafka.apache.org/090/documentation.html#basic_ops_partitionassignment) 1.创建分区文件 custom-reassignment.json 2.使用json文件执行分区操作 3.确认状态 1.7 [Increasing replication factor(新增副本数)](http://kafka.apache.org/090/documentation.html#basic_ops_increase_replication_factor) 1.8 启动 消费 等指令 1.启动zookeeper 2.启动kafka 3.停止kafka 4.停止zookeeper 5.创建topic 6.展示topic 7.描述topic 8.生产者: 9.消费者: kafka指令 1.常用指令 1.1 Adding and removing topics add: bin/kafka-topics.sh --zookeeper zk_host:port/chroot --create --topic my_topic_name --partitions 20 --replication-factor 3 --config x=y remove: 1.2 Modifying topics add partition: bin/kafka-topics.sh --zookeeper zk_host:port/chroot --alter --topic my_topic_name --partitions 40 add configes: bin/kafka-topics.sh --zookeeper zk_host:port/chroot --alter --topic my_topic_name --config x=y To remove a config: > bin/kafka-topics.sh --zookeeper zk_host:port/chroot --alter --topic my_topic_name --deleteConfig x delete config (不使用): bin/kafka-topics.sh --zookeeper zk_host:port/chroot --delete --topic my_topic_name 1.3 balancing leadership bin/kafka-preferred-replica-election.sh --zookeeper zk_host:port/chroot 前提是需要加上配置 auto.leader.rebalance.enable=true 1.4 checking consumer position bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --zkconnect localhost:2181 --group test Group Topic Pid Offset logSize Lag Owner my-group my-topic 0 0 0 0 test_jkreps-mn-1394154511599-60744496-0 my-group my-topic 1 0 0 0 test_jkreps-mn-1394154521217-1a0be913-0 1.5 Expanding your cluster(水平扩展集群) –generate: 在此模式下,给定主题列表和代理列表,该工具会生成候选重新分配,以将指定主题的所有分区移动到新代理。此选项仅提供了一种方便的方法,可在给定主题和目标代理列表的情况下生成分区重新分配计划。 –execute:在此模式下,该工具将根据用户提供的重新分配计划启动分区的重新分配。 (使用–reassignment-json-file选项)。这可以是由管理员手工制作的自定义重新分配计划,也可以使用–generate选项提供 –verify: 在此模式下,该工具将验证最后一次–execute期间列出的所有分区的重新分配状态。状态可以是成功完成,失败或正在进行中 1.由于该工具接受主题的输入列表作为json文件,因此首先需要确定要移动的主题并创建json文件,如下所示 - > cat topics-to-move.json {"topics": [{"topic": "foo1"}, {"topic": "foo2"}], "version":1 } 2.一旦json文件准备就绪,使用分区重新分配工具生成候选分配 - > bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --topics-to-move-json-file topics-to-move.json --broker-list "5,6" --generate Current partition replica assignment {"version":1, "partitions":[{"topic":"foo1","partition":2,"replicas":[1,2]}, {"topic":"foo1","partition":0,"replicas":[3,4]}, {"topic":"foo2","partition":2,"replicas":[1,2]}, {"topic":"foo2","partition":0,"replicas":[3,4]}, {"topic":"foo1","partition":1,"replicas":[2,3]}, {"topic":"foo2","partition":1,"replicas":[2,3]}] } Proposed partition reassignment configuration {"version":1, "partitions":[{"topic":"foo1","partition":2,"replicas":[5,6]}, {"topic":"foo1","partition":0,"replicas":[5,6]}, {"topic":"foo2","partition":2,"replicas":[5,6]}, {"topic":"foo2","partition":0,"replicas":[5,6]}, {"topic":"foo1","partition":1,"replicas":[5,6]}, {"topic":"foo2","partition":1,"replicas":[5,6]}] } 3.该工具生成一个候选分配,将所有分区从主题foo1,foo2移动到代理5,6。 但请注意,此时分区移动尚未开始,它只是告诉您当前的分配和建议的新分配。 应保存当前分配,以防您想要回滚它。 新的分配应保存在json文件中(例如expand-cluster-reassignment.json),以便使用–execute选项输入工具,如下所示 - > bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file expand-cluster-reassignment.json --execute Current partition replica assignment {"version":1, "partitions":[{"topic":"foo1","partition":2,"replicas":[1,2]}, {"topic":"foo1","partition":0,"replicas":[3,4]}, {"topic":"foo2","partition":2,"replicas":[1,2]}, {"topic":"foo2","partition":0,"replicas":[3,4]}, {"topic":"foo1","partition":1,"replicas":[2,3]}, {"topic":"foo2","partition":1,"replicas":[2,3]}] } Save this to use as the --reassignment-json-file option during rollback Successfully started reassignment of partitions {"version":1, "partitions":[{"topic":"foo1","partition":2,"replicas":[5,6]}, {"topic":"foo1","partition":0,"replicas":[5,6]}, {"topic":"foo2","partition":2,"replicas":[5,6]}, {"topic":"foo2","partition":0,"replicas":[5,6]}, {"topic":"foo1","partition":1,"replicas":[5,6]}, {"topic":"foo2","partition":1,"replicas":[5,6]}] } 4.检查状态 > bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file expand-cluster-reassignment.json --verify Status of partition reassignment: Reassignment of partition [foo1,0] completed successfully Reassignment of partition [foo1,1] is in progress Reassignment of partition [foo1,2] is in progress Reassignment of partition [foo2,0] completed successfully Reassignment of partition [foo2,1] completed successfully Reassignment of partition [foo2,2] completed successfully 1.6 自定义分区分配和迁移 1.创建分区文件 custom-reassignment.json cat custom-reassignment.json { "version": 1, "partitions": [{ "topic": "foo1", "partition": 0, "replicas": [5, 6] }, { "topic": "foo2", "partition": 1, "replicas": [2, 3] }] } 2.使用json文件执行分区操作 > bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file custom-reassignment.json --execute Current partition replica assignment {"version":1, "partitions":[{"topic":"foo1","partition":0,"replicas":[1,2]}, {"topic":"foo2","partition":1,"replicas":[3,4]}] } Save this to use as the --reassignment-json-file option during rollback Successfully started reassignment of partitions {"version":1, "partitions":[{"topic":"foo1","partition":0,"replicas":[5,6]}, {"topic":"foo2","partition":1,"replicas":[2,3]}] } 3.确认状态 > bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file custom-reassignment.json --execute Current partition replica assignment {"version":1, "partitions":[{"topic":"foo1","partition":0,"replicas":[1,2]}, {"topic":"foo2","partition":1,"replicas":[3,4]}] } Save this to use as the --reassignment-json-file option during rollback Successfully started reassignment of partitions {"version":1, "partitions":[{"topic":"foo1","partition":0,"replicas":[5,6]}, {"topic":"foo2","partition":1,"replicas":[2,3]}] } 1.7 Increasing replication factor(新增副本数) 逻辑和上两个一致 通过json文件 手动修改副本数据位置 提供json文件: > cat increase-replication-factor.json { "version": 1, "partitions": [{ "topic": "foo", "partition": 0, "replicas": [5, 6, 7] }] } 1.8 启动 消费 等指令 1.启动zookeeper bin/zookeeper-server-start.sh config/zookeeper.properties & 2.启动kafka bin/kafka-server-start.sh config/server.properties & 3.停止kafka bin/kafka-server-stop.sh 4.停止zookeeper bin/zookeeper-server-stop.sh 5.创建topic bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test 6.展示topic bin/kafka-topics.sh --list --zookeeper localhost:2181 7.描述topic bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic 8.生产者: bin/kafka-console-producer.sh --broker-list 130.51.23.95:9092 --topic my-replicated-topic 9.消费者: bin/kafka-console-consumer.sh --zookeeper 130.51.23.95:2181 --topic test --from-beginnin
摘自 : Spark踩坑记——Spark Streaming+Kafka SpringStreaming+Kafka 1.SpringStreaming+Kafka 接受数据和发送数据 (1)SparkStreaming 接受kafka方式 (2)Spark 发送数据至Kafka中 2.Spark streaming+Kafka调优 2.1 批处理时间设置 2.2 合理的Kafka拉取量 2.3 缓存反复使用的Dstream(RDD) 2.4 设置合理的GC 2.5 设置合理的CPU资源数 2.6设置合理的parallelism 2.7使用高性能的算子 SpringStreaming+Kafka 1.SpringStreaming+Kafka 接受数据和发送数据 (1)SparkStreaming 接受kafka方式 基于Received的方式 基于DirectKafkaStreaming DirectKafkaStreaming 相比较 ReceiverKafkaStreaming - 简化的并行:在Receiver的方式中我们提到创建多个Receiver之后利用union来合并成一个Dstream的方式提高数据传输并行度。而在Direct方式中,Kafka中的partition与RDD中的partition是一一对应的并行读取Kafka数据,这种映射关系也更利于理解和优化。 - 高效:在Receiver的方式中,为了达到0数据丢失需要将数据存入Write Ahead Log中,这样在Kafka和日志中就保存了两份数据,浪费!而第二种方式不存在这个问题,只要我们Kafka的数据保留时间足够长,我们都能够从Kafka进行数据恢复。 - 精确一次:在Receiver的方式中,使用的是Kafka的高阶API接口从Zookeeper中获取offset值,这也是传统的从Kafka中读取数据的方式,但由于Spark Streaming消费的数据和Zookeeper中记录的offset不同步,这种方式偶尔会造成数据重复消费。而第二种方式,直接使用了简单的低阶Kafka API,Offsets则利用Spark Streaming的checkpoints进行记录,消除了这种不一致性。 (2)Spark 发送数据至Kafka中 一般处理方式 : 在RDD.forpartition进行操作 input.foreachRDD(rdd => // 不能在这里创建KafkaProducer rdd.foreachPartition(partition => partition.foreach{ case x:String=>{ val props = new HashMap[String, Object]() props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers) props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer") props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer") println(x) val producer = new KafkaProducer[String,String](props) val message=new ProducerRecord[String, String]("output",null,x) producer.send(message) } } ) ) 此方式的缺点在于每次foreach操作都需要重新创建一次kafkaProduce 主要花费时间都在 创建连接的时候. 基于此我们以以下方式进行操作 首先,我们需要将KafkaProducer利用lazy val的方式进行包装如下: import java.util.concurrent.Future import org.apache.kafka.clients.producer.{ KafkaProducer, ProducerRecord, RecordMetadata } class KafkaSink[K, V](createProducer: () => KafkaProducer[K, V]) extends Serializable { /* This is the key idea that allows us to work around running into NotSerializableExceptions. */ lazy val producer = createProducer() def send(topic: String, key: K, value: V): Future[RecordMetadata] = producer.send(new ProducerRecord[K, V](topic, key, value)) def send(topic: String, value: V): Future[RecordMetadata] = producer.send(new ProducerRecord[K, V](topic, value)) } object KafkaSink { import scala.collection.JavaConversions._ def apply[K, V](config: Map[String, Object]): KafkaSink[K, V] = { val createProducerFunc = () => { val producer = new KafkaProducer[K, V](config) sys.addShutdownHook { // Ensure that, on executor JVM shutdown, the Kafka producer sends // any buffered messages to Kafka before shutting down. producer.close() } producer } new KafkaSink(createProducerFunc) } def apply[K, V](config: java.util.Properties): KafkaSink[K, V] = apply(config.toMap) } 之后我们利用广播变量的形式,将KafkaProducer广播到每一个executor,如下: // 广播KafkaSink val kafkaProducer: Broadcast[KafkaSink[String, String]] = { val kafkaProducerConfig = { val p = new Properties() p.setProperty("bootstrap.servers", Conf.brokers) p.setProperty("key.serializer", classOf[StringSerializer].getName) p.setProperty("value.serializer", classOf[StringSerializer].getName) p } log.warn("kafka producer init done!") ssc.sparkContext.broadcast(KafkaSink[String, String](kafkaProducerConfig)) } 这样我们就能在每个executor中愉快的将数据输入到kafka当中: //输出到kafka segmentedStream.foreachRDD(rdd => { if (!rdd.isEmpty) { rdd.foreach(record => { kafkaProducer.value.send(Conf.outTopics, record._1.toString, record._2) // do something else }) } }) 2.Spark streaming+Kafka调优 2.1 批处理时间设置 参数设置: 2.2 合理的Kafka拉取量 参数设置: spark.streaming.kafka.maxRatePerPartition 2.3 缓存反复使用的Dstream(RDD) DStream.cache() 2.4 设置合理的GC 长期使用Java的小伙伴都知道,JVM中的垃圾回收机制,可以让我们不过多的关注与内存的分配回收,更加专注于业务逻辑,JVM都会为我们搞定。对JVM有些了解的小伙伴应该知道,在Java虚拟机中,将内存分为了初生代(eden generation)、年轻代(young generation)、老年代(old generation)以及永久代(permanent generation),其中每次GC都是需要耗费一定时间的,尤其是老年代的GC回收,需要对内存碎片进行整理,通常采用标记-清楚的做法。同样的在Spark程序中,JVM GC的频率和时间也是影响整个Spark效率的关键因素。在通常的使用中建议: --conf "spark.executor.extraJavaOptions=-XX:+UseConcMarkSweepGC" 2.5 设置合理的CPU资源数 CPU的core数量,每个executor可以占用一个或多个core,可以通过观察CPU的使用率变化来了解计算资源的使用情况,例如,很常见的一种浪费是一个executor占用了多个core,但是总的CPU使用率却不高(因为一个executor并不总能充分利用多核的能力),这个时候可以考虑让么个executor占用更少的core,同时worker下面增加更多的executor,或者一台host上面增加更多的worker来增加并行执行的executor的数量,从而增加CPU利用率。但是增加executor的时候需要考虑好内存消耗,因为一台机器的内存分配给越多的executor,每个executor的内存就越小,以致出现过多的数据spill over甚至out of memory的情况。 2.6设置合理的parallelism partition和parallelism,partition指的就是数据分片的数量,每一次task只能处理一个partition的数据,这个值太小了会导致每片数据量太大,导致内存压力,或者诸多executor的计算能力无法利用充分;但是如果太大了则会导致分片太多,执行效率降低。在执行action类型操作的时候(比如各种reduce操作),partition的数量会选择parent RDD中最大的那一个。而parallelism则指的是在RDD进行reduce类操作的时候,默认返回数据的paritition数量(而在进行map类操作的时候,partition数量通常取自parent RDD中较大的一个,而且也不会涉及shuffle,因此这个parallelism的参数没有影响)。所以说,这两个概念密切相关,都是涉及到数据分片的,作用方式其实是统一的。通过spark.default.parallelism可以设置默认的分片数量,而很多RDD的操作都可以指定一个partition参数来显式控制具体的分片数量。 在SparkStreaming+kafka的使用中,我们采用了Direct连接方式,前文阐述过Spark中的partition和Kafka中的Partition是一一对应的,我们一般默认设置为Kafka中Partition的数量。 2.7使用高性能的算子 使用reduceByKey/aggregateByKey替代groupByKey 使用mapPartitions替代普通map 使用foreachPartitions替代foreach 使用filter之后进行coalesce操作 使用repartitionAndSortWithinPartitions替代repartition与sort类操作
版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/qq_26654727/article/details/80123085 solr 数据结构及目录结构分析和记录 solr是什么呢? 一、Solr它是一种开放源码的、基于 Lucene Java 的搜索服务器,易于加入到 Web 应用程序中。 二、Solr 提供了层面搜索(就是统计)、命中醒目显示并且支持多种输出格式(包括XML/XSLT 和JSON等格式)。它易于安装和配置,而且附带了一个基于 HTTP 的管理界面。Solr已经在众多大型的网站中使用,较为成熟和稳定。 三、Solr 包装并扩展了 Lucene,所以Solr的基本上沿用了Lucene的相关术语。更重要的是,Solr 创建的索引与 Lucene 搜索引擎库完全兼容。 四、通过对Solr 进行适当的配置,某些情况下可能需要进行编码,Solr 可以阅读和使用构建到其他 Lucene 应用程序中的索引。 五、此外,很多 Lucene 工具(如Nutch、 Luke)也可以使用Solr 创建的索引。可以使用 Solr 的表现优异的基本搜索功能,也可以对它进行扩展从而满足企业的需要。 solr 下载地址 http://mirrors.hust.edu.cn/apache/lucene/solr/7.2.1/solr-7.2.1-src.tgz solr 解压目录结构 |--- contrib:第三方包存放的目录 |--- dist:编译打包后存放目录,即构建后的输出产物存放的目录 |--- docs:solr文档的存放目录 |--- example:示范例子的存放目录,这里展示了DIH,即数据导入处理的例子 |--- licenses:权限相关的
版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/qq_26654727/article/details/78456345 问题背景 版本介绍 jdk1.7 、springboot:1.3.1.RELEASE、tomcat8 、maven3 解决过程 1. 搭建Springboot+jsp项目。搭建Springboot项目之时,标准配置: <modelVersion>4.0.0</modelVersion> <artifactId>manage</artifactId> <!--<version>1.0-SNAPSHOT</version>--> <!-- 打war包,不使用springboot内置tomcat--> <packaging>war</packaging> <dependencyManagement> <dependencies> <dependency> <!-- Import dependency management from Spring Boot --> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-dependencies</artifactId> <version>1.3.1.RELEASE</version> <type>pom</type> <scope>import</scope> </dependency> </dependencies> </dependencyManagement> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> <!--移除内置tomcat,打war包--> <exclusions> <exclusion> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-tomcat</artifactId> </exclusion> <!--<exclusion>--> <!--<groupId>ch.qos.logback</groupId>--> <!--<artifactId>logback-classic</artifactId>--> <!--</exclusion>--> </exclusions> </dependency> <!--Springboot的监控--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-actuator</artifactId> </dependency> <!-- Spring Boot Test 依赖 --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <!--spring-boot-configuration:spring boot 配置处理器; --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-configuration-processor</artifactId> <optional>true</optional> </dependency> <!--关于embed的容器 使用jsp时,必须的内容--> <dependency> <groupId>org.apache.tomcat.embed</groupId> <artifactId>tomcat-embed-jasper</artifactId> <scope>provided</scope> </dependency> <properties> <!-- 这里指定项目编码 --> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> </properties> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> <version>1.4.0.RELEASE</version> </plugin> <plugin> <!-- maven打包的时候告诉maven不需要web.xml,否刚会报找不到web.xml错误 --> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-war-plugin</artifactId> <version>2.4</version> <configuration> <failOnMissingWebXml>false</failOnMissingWebXml> </configuration> </plugin> <plugin> <!--指定maven的编译器版本--> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <version>3.1</version> <configuration> <source>1.7</source> <target>1.7</target> </configuration> </plugin> </plugins> </build> 2. 在maven打包过程中出现了,有关ch.qos.logback版本的问题,而导致tomcat启动失败,故在pom文件中添加限制其版本 <!--logback版本问题导致tomcat启动失败--> <dependency> <groupId>ch.qos.logback</groupId> <artifactId>logback-classic</artifactId> <version>1.1.3</version> <scope>provided</scope> </dependency> <dependency> <groupId>ch.qos.logback</groupId> <artifactId>logback-core</artifactId> <version>1.1.3</version> <scope>provided</scope> </dependency> 3.本地启动测试没问题,放至centos6.5进行测试,手动启动linux上同版本tomcat,tomcat报错: org.apache.jasper.servlet.TldScanner.scanJars At least one JAR was scanned for TLDs yet contained no TLDs. Enable debug logging for this logger for a complete list of JARs that were scanned but no TLDs were found in them. Skipping unneeded JARs during scanning can improve startup time and JSP compilation time. 问题来源: 使用了CKEditor和CKFinder后,在lib里添加了很多jar包,打开相应页面出现以上问题。 org.apache.jasper.compiler.TldLocationsCache tldScanJar 信息: At least one JAR was scanned for TLDs yet contained no TLDs. Enable debug logging for this logger for a complete list of JARs that were scanned but no TLDs were found in them. Skipping unneeded JARs during scanning can improve startup time and JSP compilation time. (1)有人说是çé®é¢ï¼å¯æ¯æç页é¢éæ ¹æ¬å°±æ²¡æ。 (2)还有说,修改${TOMCAT_HOME}/bin/catalina.sh或${TOMCAT_HOME}/bin/catalina.bat文件,可是tomcat目录下没有这两个文件。 (3)还有说,调整${tomcat}/conf/catalina.properties,将提示的jar添加到不扫描清单中。没有试,而是通过下面的方法解决了。 最终解决: 修改$CATALINA_BASE/conf/catalina.properties文件,添加org.apache.el.parser.SKIP_IDENTIFIER_CHECK=true选项。 4.上述问题解决之后发现tomcat的catalina.out虽然正常运行,并且未出现任何问题。但任然无法访问页面,通过localhost_access_log.txt日志文件返回的结果都是404,但tomcat主界面却可以正常访问。 问题原因: linux系统jdk版本与本地版本不一致,本地版本在调试过程中无意中设置成1.8.并且将maven编译器jdk编辑版本设置成1.8.导致两地版本不一致导致。 <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <version>3.1</version> <configuration> <source>1.8</source> <target>1.8</target> </configuration> 5.本调试过程中最大的问题: 本地启动正常,linux启动报错如下: 06-Nov-2017 10:48:12.336 SEVERE [localhost-startStop-1] org.apache.catalina.core.ContainerBase.addChildInternal ContainerBase.addChild: start: org.apache.catalina.LifecycleException: Failed to start component [StandardEngine[Catalina].StandardHost[localhost].StandardContext[/nrsmanage]] at org.apache.catalina.util.LifecycleBase.start(LifecycleBase.java:162) at org.apache.catalina.core.ContainerBase.addChildInternal(ContainerBase.java:753) at org.apache.catalina.core.ContainerBase.addChild(ContainerBase.java:729) at org.apache.catalina.core.StandardHost.addChild(StandardHost.java:717) at org.apache.catalina.startup.HostConfig.deployWAR(HostConfig.java:976) at org.apache.catalina.startup.HostConfig$DeployWar.run(HostConfig.java:1853) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471) at java.util.concurrent.FutureTask.run(FutureTask.java:262) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) Caused by: java.lang.IllegalArgumentException: LoggerFactory is not a Logback LoggerContext but Logback is on the classpath. Either remove Logback or the competing implementation (class org.slf4j.helpers.NOPLoggerFactory loaded from file:/opt/nrs/apache-tomcat-8.0.47/webapps/nrsmanage/WEB-INF/lib/slf4j-api-1.7.21.jar). If you are using WebLogic you will need to add 'org.slf4j' to prefer-application-packages in WEB-INF/weblogic.xml Object of class [org.slf4j.helpers.NOPLoggerFactory] must be an instance of class ch.qos.logback.classic.LoggerContext 考虑到之前有关springboot对于不同版本日志文件冲突导致启动失败的问题,故一致在本地进行关于日志工具版本的调试,都未有结果,后静下心再考虑该日志文件所说内容,随后把该web项目lib包中有关logback相关的包进行删除,tomcat正常启动,并可以正常访问。 在本地服务中,可以在pom文件中的相关包设置 <scope>provided</scope> 该标签表示该包的级别,表示仅在编译级别支持。打包和运行过程中不含有此包。 总结 1.必须时刻记住版本的重要性,以及对于tomcat原理、maven原理、以及springboot原理的深究。 2.不要盲目的相信各种解决办法,因为不管是谁的解决方法都是在特定情况下发生的,我可以借鉴他的方法,但必须要考虑清楚本地问题的原因。想清楚解决方案,在进行操作,并注意做好备份,以及对自己各种操作过程要有个记录,可以做好 Rollback操作。
版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/qq_26654727/article/details/78360027 持续更新中………. 1.安装docker centos7 内核版本在3.10.0-693.el7.x86_64之上的直接执行: yum install docker -y 即可 本机版本3.10.0-693.el7.x86_64 直接执行yum install docker -y centos7 若centos版本在centos6,则需要更新内核版本才可以安装docker。 2.使用docker镜像 2.1 安装docker镜像 使用docker pull Name[:TAG] 安装指定镜像的名称和版本号 安装centos最近版本的镜像,从docker默认镜像库进行下载 命令: sudo docker pull centos Using default tag: latest Trying to pull repository docker.io/library/centos ... latest: Pulling from docker.io/library/centos d9aaf4d82f24: Pull complete Digest: sha256:eba772bac22c86d7d6e72421b4700c3f894ab6e35475a34014ff8de74c10872e 创建成功之后,我们就开始使用该镜像创建一个容器,并在该容器中进行操作。 docker run -it centos bash [root@localhost sysconfig]# docker run -it centos bash [root@fc99abc4213f /]# ls anaconda-post.log dev home lib64 media opt root sbin sys usr bin etc lib lost+found mnt proc run srv tmp var [root@fc99abc4213f /]# ping localhost PING localhost (127.0.0.1) 56(84) bytes of data. 64 bytes from localhost (127.0.0.1): icmp_seq=1 ttl=64 time=0.105 ms 64 bytes from localhost (127.0.0.1): icmp_seq=2 ttl=64 time=0.062 ms 64 bytes from localhost (127.0.0.1): icmp_seq=3 ttl=64 time=0.083 ms ^C --- localhost ping statistics --- 3 packets transmitted, 3 received, 0% packet loss, time 2000ms rtt min/avg/max/mdev = 0.062/0.083/0.105/0.019 ms [root@fc99abc4213f /]# exit exit 2.2 查看镜像信息 2.2.1 使用命令 docker images 查看当前镜像信息 [root@localhost sysconfig]# sudo docker images REPOSITORY TAG IMAGE ID CREATED SIZE docker.io/redis 3.2 4ae3b93617bd 2 weeks ago 99.67 MB docker.io/centos latest 196e0ce0c9fb 5 weeks ago 196.6 MB images 子命令主要支持如下的选项: (1) -a ,–all=true|false 列出所有的镜像文件 (2) –digests=true|false 列出镜像的数字摘要值,默认为否 (3) -f –filter=[] 过滤列出的镜像 … 2.2.2 使用tag命令添加镜像标签 [root@localhost sysconfig]# docker tag redis:3.2 myredis:0.1 [root@localhost sysconfig]# sudo docker images REPOSITORY TAG IMAGE ID CREATED SIZE docker.io/redis 3.2 4ae3b93617bd 2 weeks ago 99.67 MB myredis 0.1 4ae3b93617bd 2 weeks ago 99.67 MB docker.io/centos latest 196e0ce0c9fb 5 weeks ago 196.6 MB 2.2.3 使用inspect 命令查看详细信息 查看详细信息 [root@localhost sysconfig]# docker inspect myredis:0.1 [ { "Id": "sha256:4ae3b93617bdb7cc7559c021cd57fec2db465daf94e717b61282406b74493941", "RepoTags": [ "docker.io/redis:3.2", "myredis:0.1" ], "RepoDigests": [ "docker.io/redis@sha256:b15e3fabba806a6ee7f14774df0c2dc3036f752969bcdac022f0aa96d5cfc954" ], "Parent": "", "Comment": "", "Created": "2017-10-10T02:50:23.955973925Z", "Container": "13285f8661db6019c0916f3252c76c17bad6508905a319358c85f7acf7967365", "ContainerConfig": { "Hostname": "13285f8661db", "Domainname": "", "User": "", "AttachStdin": false, "AttachStdout": false, "AttachStderr": false, "ExposedPorts": { "6379/tcp": {} }, "Tty": false, "OpenStdin": false, "StdinOnce": false, "Env": [ "PATH=/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin", "GOSU_VERSION=1.10", "REDIS_VERSION=3.2.11", "REDIS_DOWNLOAD_URL=http://download.redis.io/releases/redis-3.2.11.tar.gz", "REDIS_DOWNLOAD_SHA=31ae927cab09f90c9ca5954aab7aeecc3bb4da6087d3d12ba0a929ceb54081b5" ], "Cmd": [ "/bin/sh", "-c", "#(nop) ", "CMD [\"redis-server\"]" ], "ArgsEscaped": true, "Image": "sha256:0d4214b1bf00a587daa1d4f36421bd99e4b2c316249280675bec34faaa6d6e0d", "Volumes": { "/data": {} }, "WorkingDir": "/data", "Entrypoint": [ "docker-entrypoint.sh" ], "OnBuild": [], "Labels": {} }, "DockerVersion": "17.06.2-ce", "Author": "", "Config": { "Hostname": "", "Domainname": "", "User": "", "AttachStdin": false, "AttachStdout": false, "AttachStderr": false, "ExposedPorts": { "6379/tcp": {} }, "Tty": false, "OpenStdin": false, "StdinOnce": false, "Env": [ "PATH=/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin", "GOSU_VERSION=1.10", "REDIS_VERSION=3.2.11", "REDIS_DOWNLOAD_URL=http://download.redis.io/releases/redis-3.2.11.tar.gz", "REDIS_DOWNLOAD_SHA=31ae927cab09f90c9ca5954aab7aeecc3bb4da6087d3d12ba0a929ceb54081b5" ], "Cmd": [ "redis-server" ], "ArgsEscaped": true, "Image": "sha256:0d4214b1bf00a587daa1d4f36421bd99e4b2c316249280675bec34faaa6d6e0d", "Volumes": { "/data": {} }, "WorkingDir": "/data", "Entrypoint": [ "docker-entrypoint.sh" ], "OnBuild": [], "Labels": null }, "Architecture": "amd64", "Os": "linux", "Size": 99668649, "VirtualSize": 99668649, "GraphDriver": { "Name": "devicemapper", "Data": { "DeviceId": "10", "DeviceName": "docker-253:0-33690427-dea7d2d0030dc44a3edf0fcc80077d715d5ccf4825a3db161da84436cf1dcdc7", "DeviceSize": "10737418240" } }, "RootFS": { "Type": "layers", "Layers": [ "sha256:29d71372a4920ec230739a9e2317e7e9b18644edb10f78cde85df85e6ab85fc2", "sha256:f5ccc3ab98cc45041bcf1f2cf49afb7e5046316af795c88ef6be50ed149cc3a4", "sha256:3fae9b7c819afb850f999670dc88cc3f646a146c379103c8947df99c03498ebe", "sha256:7044a5153c6481a7284e181703432930f392aa39fb12982c9a2d8cb2f2448cb0", "sha256:7768d1f84ecca49f4ca1005047f7d5d8a3a009dfb3e0213cdbbb3856f7e4c115", "sha256:327ce591d4be258dd33151003eebd5cc362fd6caed83f9a5512b7970a8f5facb" ] } } ] 查看某一个参数信息 “docker inspect -f {{“.Os”}} myredis:0.1 ” “` [root@localhost sysconfig]# docker inspect -f {{“.Os”}} myredis:0.1 linux [root@localhost sysconfig]# docker inspect -f {{“.Size”}} myredis:0.1 99668649 2.3搜索镜像 docker search redis -a 10 [root@localhost ~]# sudo docker search redis -s 10 Flag --stars has been deprecated, use --filter=stars=3 instead INDEX NAME DESCRIPTION STARS OFFICIAL AUTOMATED docker.io docker.io/redis Redis is an open source key-value store th... 4359 [OK] docker.io docker.io/bitnami/redis Bitnami Redis Docker Image 59 [OK] docker.io docker.io/sameersbn/redis 59 [OK] docker.io docker.io/tenstartups/redis-commander 29 [OK] docker.io docker.io/kubeguide/redis-master redis-master with "Hello World!" 19 docker.io docker.io/joshula/redis-sentinel A container for Redis Sentinel 18 docker.io docker.io/kubeguide/guestbook-redis-slave Guestbook redis slave 14 docker.io docker.io/tutum/redis Base docker image to run a Redis server 10 2.4删除镜像 删除镜像分为两种方式进行删除,分别为: a.使用标签进行删除 使用标签进行删除时,如果存在由该镜像创建的多个标签的镜像,删除其中的一个并不会删除该镜像,只有该镜像只有唯一的一个标签时,删除该标签将会导致该镜像彻底删除。 [root@localhost ~]# docker images REPOSITORY TAG IMAGE ID CREATED SIZE docker.io/redis 3.2 4ae3b93617bd 2 weeks ago 99.67 MB myredis 0.2 4ae3b93617bd 2 weeks ago 99.67 MB myredis 0.3 4ae3b93617bd 2 weeks ago 99.67 MB [root@localhost ~]# docker rmi myredis:0.2 Untagged: myredis:0.2 [root@localhost ~]# docker rmi myredis:0.3 Untagged: myredis:0.3 Untagged: docker.io/redis@sha256:b15e3fabba806a6ee7f14774df0c2dc3036f752969bcdac022f0aa96d5cfc954 [root@localhost ~]# docker images REPOSITORY TAG IMAGE ID CREATED SIZE docker.io/redis 3.2 4ae3b93617bd 2 weeks ago 99.67 MB [root@localhost ~]# docker rmi redis Error response from daemon: No such image: redis:latest [root@localhost ~]# docker rmi redis:3.2 Untagged: redis:3.2 Deleted: sha256:4ae3b93617bdb7cc7559c021cd57fec2db465daf94e717b61282406b74493941 Deleted: sha256:23434bfcd3a31cd975c6384253ba687fbcbd895b3e1a54af27824af9ed937591 Deleted: sha256:0d30a0d1a42e43f5dc11264673fd4ee56a03095dbcd3da72924870de4df577aa Deleted: sha256:3a6d079caad238a31ea4283d4fc3f443d6e75bb1d3ce199cd916dc49627c5931 Deleted: sha256:5b3c3d58e5f9d9460b356d01329f9016fbb1959bf1522f357fb81a2db362908b Deleted: sha256:a895b72388eb73dd9fb4406a318ee67fe48c7e70e01190c8f2ac4310dc529245 Deleted: sha256:29d71372a4920ec230739a9e2317e7e9b18644edb10f78cde85df85e6ab85fc2 [root@localhost ~]# docker images REPOSITORY TAG IMAGE ID CREATED SIZE b.使用id进行删除 在删除镜像时,若该镜像创建了一个容器。且该容器存在时,则无法对该镜像进行删除。如若希望强制删除,则使用命令docker rmi -f [image] [root@localhost ~]# docker rmi 196e0ce0c9fb Error response from daemon: conflict: unable to delete 196e0ce0c9fb (must be forced) - image is being used by stopped container fc99abc4213f [root@localhost ~]# docker rmi -f 196e0ce0c9fb Untagged: docker.io/centos:latest Deleted: sha256:196e0ce0c9fbb31da595b893dd39bc9fd4aa78a474bbdc21459a3ebe855b7768 [root@localhost ~]# 2.5创建镜像 创建镜像主要有三种方式 a.使用容器创建镜像。 [root@localhost ~]# docker run -it centos bash WARNING: IPv4 forwarding is disabled. Networking will not work. [root@1ed5befe7051 /]# useradd nrs [root@1ed5befe7051 /]# exit exit [root@localhost ~]# docker commit -m "add user nrs" -a "augus" 1ed5befe7051 mycentos:01 sha256:a154ae24d66d16201ba1bed9112e2cab11714644364b5bccef0d2ea2b1b3c443 [root@localhost ~]# b.使用本地模板创建镜像 c.使用dockerFile创建镜像 2.6镜像的载入和写出 镜像的写出 docker save -o centos.01.tar centos:01 镜像的载入 docker load –input centos.01.tar [augus@localhost ~]$ docker images REPOSITORY TAG IMAGE ID CREATED SIZE mycentos 01 a154ae24d66d 30 minutes ago 196.9 MB docker.io/centos latest 196e0ce0c9fb 6 weeks ago 196.6 MB [augus@localhost ~]$ docker save -o mycentos.01.tar mycentos:01 [augus@localhost ~]$ ls mycentos.01.tar precreated [augus@localhost ~]$ docker load --input mycentos.01.tar Loaded image: mycentos:01 [augus@localhost ~]$ docker images REPOSITORY TAG IMAGE ID CREATED SIZE mycentos 01 a154ae24d66d 32 minutes ago 196.9 MB docker.io/centos latest 196e0ce0c9fb 6 weeks ago 196.6 MB [augus@localhost ~]$ docker rmi mycentos:01 Untagged: mycentos:01 Deleted: sha256:a154ae24d66d16201ba1bed9112e2cab11714644364b5bccef0d2ea2b1b3c443 Deleted: sha256:5d58cde1df9c759667826244365052df54e7920391e72f8cdd1a3d1d35bc5bba [augus@localhost ~]$ docker images REPOSITORY TAG IMAGE ID CREATED SIZE docker.io/centos latest 196e0ce0c9fb 6 weeks ago 196.6 MB [augus@localhost ~]$ docker load --input mycentos.01.tar edfd6667dbbb: Loading layer [==================================================>] 312.8 kB/312.8 kB Loaded image: mycentos:01 [augus@localhost ~]$ docker images REPOSITORY TAG IMAGE ID CREATED SIZE mycentos 01 a154ae24d66d 33 minutes ago 196.9 MB docker.io/centos latest 196e0ce0c9fb 6 weeks ago 196.6 MB [augus@localhost ~]$
版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/qq_26654727/article/details/78013989 最近在尝试重新复习一段关于多线程的使用,同时尝试使用关于markdown编辑器的使用方法,会同步将自己整理的文档放上来。 线程创建方式 通过创建一个线程类的方式创建线程体,例如实现runnable接口创建一个实现类。或者是直接通过创建Thread方式,重写内部的runnable方法实现线程体的编写。 1. 接用Thread.start 的方式进行重写runnable的方式进行实现线程。 继承Thread创建线程 new Thread(new Runnable() { @Override public void run() { while(true){ System.out.println("just a test " + Thread.currentThread().getName()+ " "+new Date()); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } } }).start(); 输出结果: just a test Thread-0 Sun Sep 17 09:48:48 CST 2017 just a test Thread-0 Sun Sep 17 09:48:49 CST 2017 just a test Thread-0 Sun Sep 17 09:48:50 CST 2017 just a test Thread-0 Sun Sep 17 09:48:51 CST 2017 just a test Thread-0 Sun Sep 17 09:48:52 CST 2017 just a test Thread-0 Sun Sep 17 09:48:53 CST 2017 2.创建一个实现Runnable接口并重写run方式的实现类,来进行线程体逻辑的可重用。 public class CreatThread02 implements Runnable{ @Override public void run() { System.out.println("currentThread : "+ Thread.currentThread().getName()+ " say hello for you ... time --> " + new Date()); } public static void main(String[] args) { Runnable thread01 = new CreatThread02(); Runnable thread02 = new CreatThread02(); while (true){ try { thread01.run(); Thread.sleep(500); thread02.run(); Thread.sleep(400); } catch (InterruptedException e) { e.printStackTrace(); } } } } 输出结果: currentThread : main say hello for you ... time --> Sun Sep 17 10:02:45 CST 2017 currentThread : main say hello for you ... time --> Sun Sep 17 10:02:45 CST 2017 currentThread : main say hello for you ... time --> Sun Sep 17 10:02:46 CST 2017 currentThread : main say hello for you ... time --> Sun Sep 17 10:02:46 CST 2017 currentThread : main say hello for you ... time --> Sun Sep 17 10:02:46 CST 2017 currentThread : main say hello for you ... time --> Sun Sep 17 10:02:47 CST 2017 3.通过创建线程池的方式创建线程 public static void main(String[] args) { ExecutorService service = Executors.newFixedThreadPool(2); for(int i = 0 ; i< 100 ;i++){ RunClass run = new RunClass(i); service.execute(run); } service.shutdown(); } static class RunClass implements Runnable{ private int index ; public RunClass(int index ){ this.index = index; } @Override public void run() { long sleeptime = (long)(Math.random()*1000); System.out.println("the RunClassNum--> "+index +" cunrrentThread --> "+ Thread.currentThread().getName()+ " come back over ..." + " Sleep Time -->" +sleeptime); try { Thread.sleep(sleeptime); } catch (InterruptedException e) { e.printStackTrace(); } } } 输出结果: the RunClassNum--> 87 cunrrentThread --> pool-1-thread-2 come back over ... Sleep Time -->152 the RunClassNum--> 88 cunrrentThread --> pool-1-thread-2 come back over ... Sleep Time -->207 the RunClassNum--> 89 cunrrentThread --> pool-1-thread-2 come back over ... Sleep Time -->958 the RunClassNum--> 90 cunrrentThread --> pool-1-thread-1 come back over ... Sleep Time -->484 the RunClassNum--> 91 cunrrentThread --> pool-1-thread-1 come back over ... Sleep Time -->767 the RunClassNum--> 92 cunrrentThread --> pool-1-thread-2 come back over ... Sleep Time -->202 the RunClassNum--> 93 cunrrentThread --> pool-1-thread-2 come back over ... Sleep Time -->666 the RunClassNum--> 94 cunrrentThread --> pool-1-thread-1 come back over ... Sleep Time -->454 Executors类中存在多个线程池类型,具体分为以下几种: public static ExecutorService newFixedThreadPool(int nThreads) : 可创建指定数量线程的线程池,当有新的线程需要执行,同时线程池内有空余线程,则会直接取用当前空闲线程,来达到线程的利用率。 public static ThreadFactory defaultThreadFactory(); 返回线程池的默认线程创建工厂 public static ExecutorService newCachedThreadPool(); 创建一个线程池,根据需要适当的创建线程的方式, 4.线程的控制 sleep、join、interrupt sleep–>使当前线程暂停一段时间 join–>使当前的线程加入另一个线程 public class Controller4Thread01 extends Thread { // 1.使用sleep使当前线程暂停一段时间 // 2.使用join是当前线程加入另一个线程 public static int result; public Controller4Thread01(String name ){ super(name); } @Override public void run() { result = (int)( Math.random()*1000); System.out.println("currentThread name-->"+ this.getName() + " get result -->" + result); try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } } public static void main(String[] args) { Controller4Thread01 thread01 = new Controller4Thread01("测试线程1"); thread01.start(); long startTime = System.currentTimeMillis(); System.out.println("开始时间为--> " + startTime); System.out.println("thread 未加入之前 .. result-->"+result); try { thread01.join(); } catch (InterruptedException e) { e.printStackTrace(); } long endTime = System.currentTimeMillis(); System.out.println("结束时间为--> "+ endTime + " 时间间隔-->" + (endTime-startTime)); } } 输出结果: interrupt–>打断当前的线程 public class Controller4Thread01 extends Thread { // 1.使用sleep使当前线程暂停一段时间 // 2.使用join是当前线程加入另一个线程 // 3.使用interrupt打断线程 public static int result; public long time ; public Controller4Thread01(String name ){ super(name); } @Override public void run() { long starttime = System.currentTimeMillis(); try { result = (int)( Math.random()*1000); System.out.println("currentThread name-->"+ this.getName() + " get result -->" + result); Thread.sleep(4000); long endTime = System.currentTimeMillis(); time = endTime-starttime; System.out.println(this.getName() + "--> 线程正常运行,且当前线程已运行时间 --> "+ time + "ms"); } catch (InterruptedException e) { long endTime = System.currentTimeMillis(); time = endTime-starttime; System.out.println(this.getName() + "--> 线程被中断,且当前线程已运行时间 --> "+ time + "ms"); e.printStackTrace(); } } public static void main(String[] args) { Controller4Thread01 thread01 = new Controller4Thread01("测试线程1"); thread01.start(); long startTime = System.currentTimeMillis(); System.out.println("开始时间为--> " + startTime); System.out.println("thread 未加入之前 .. result-->"+result); try { thread01.join(2000); long endTime = System.currentTimeMillis(); thread01.interrupt(); System.out.println("结束时间为--> "+ endTime + " 时间间隔-->" + (endTime-startTime)); } catch (InterruptedException e) { e.printStackTrace(); } } } //输出结果: 开始时间为--> 1505642692997 thread 未加入之前 .. result-->0 currentThread name-->测试线程1 get result -->648 结束时间为--> 1505642694998 时间间隔-->2001 java.lang.InterruptedException: sleep interrupted 测试线程1--> 线程被中断,且当前线程已运行时间 --> 2000ms at java.lang.Thread.sleep(Native Method) at Thread.threadConcurrency01.Part02.Controller4Thread01.run(Controller4Thread01.java:22) 若有希望一起交流的朋友可以加我的有道云笔记的群,大家互相整理的技术栈。仅为交流 –> (群号:51920822) –> http://163.fm/QO0DihJw
版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/qq_26654727/article/details/70161808 1. 下载Maven * 官网下载 官网地址Maven官网 * CSDN资源库下载 地址:apache-maven-3.31下载 2.安装Maven 2.1 将解压好的Maven文件放置在没有汉字路径下,并复制路径 2.2 配置Maven环境变量 右击计算机选择属性–>选择高级系统设置–>环境变量–>系统变量 添加MAVEN_HOME,添加路径 添加PATH 2.3Eclipse中集成Maven 2.4 本地Maven配置文件与eclipse创建连接 2.5配置文件设置本地资源库地址 在本地设置建立一个本地资源库,并记住路径 Maven中配置文件设置 打开setting文件,设置当地资源库 将setting文件复制,并粘贴至C:\Users\[计算机名]\.m2下(与2.3中用户设置路径对应) 2.6更新Eclipse中Maven设置,按照2.4步骤点击,至界面User Settings ,点击Update Settings 按钮即可更新本地资源库内容和全局设置与用户设置. Maven配置完成,尽情体会java带来的乐趣吧,boys!!!
版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/qq_26654727/article/details/70161145 Spring+SpringMVC +MyBatis整合配置文件案例 针对Spring/SpringMVC/MyBatis三个框架的整合有很多的方式,经过最近的学习我来总结一下其配置文件的设置以及三大框架之间的一些关系.代码配置后面附上,仅作为建议. 三大框架之间的关系图如下: 配置文件配置的对应关系: 1.Spring配置文件 applicationContext.xml <context:component-scan base-package="service"/> <context:property-placeholder location="classpath:/c3p0.properties"/> <!-- 注册数据库的资源 --> <bean id="dataSource" class = "com.mchange.v2.c3p0.ComboPooledDataSource"> <property name="driverClass" value = "${c3p0.driver}"></property> <property name="jdbcUrl" value = "${c3p0.url}"></property> <property name="user" value = "${c3p0.user}"></property> <property name="password" value = "${c3p0.password}"></property> </bean> <!-- 声明式的事务处理 --> <bean id = "transactionManager" class = "org.springframework.jdbc.datasource.DataSourceTransactionManager"> <property name="dataSource" ref="datasource"></property> </bean> <!-- 创建通知 配置切面和切入点--> <tx:advice id="advice"> <tx:attributes> <tx:method name="add*" propagation="REQUIRED" /> <tx:method name="del*" propagation="REQUIRED" /> <tx:method name="update*" propagation="REQUIRED" /> <tx:method name="find*" propagation="SUPPORTS" read-only="true" /> <tx:method name="*" read-only="true" /> </tx:attributes> </tx:advice> <aop:config > <aop:pointcut expression="execution(* servlet..*.*(..))" id="pc"/> <aop:advisor advice-ref="advice" pointcut-ref="pc"/> </aop:config> <!-- 整合MyBatis --> <bean id = "SqlSessionFactory" class = "org.mybatis.spring.SqlSessionFactoryBean"> <property name="dataSource" ref = "dataSource"></property> <!-- 导入核心配置文件 --> <property name="configLocation" value = "classpath:/sqlMapConfig.xml"></property> <!-- 导入映射文件 --> <property name="mapperLocations" value = "classpath:/pojo/*.xml"></property> </bean> <!-- spring为mapper接口创建代理对象 --> <bean class = "org.mybatis.spring.mapper.MapperScannerConfigurer" > <property name="basePackage" value = "mapper"></property> </bean> 2.SpringMVC配置文件 applicationContext-mvc.xml 1. <!-- 开启mvc注解 --> <mvc:annotation-driven /> <context:component-scan base-package="controller"></context:component-scan> 2. <!-- 内部资源视图管理器 --> <bean class = "org.springframework.web.servlet.view.InternalResourceViewResolver"> <property name="prefix" value="/WEB-INF/"></property> <property name="suffix" value = ".jsp"></property> </bean> 3.MyBatis配置文件 sqlMapConfig.xml 1. MyBatis核心配置文件 <configuration> <!-- 可以设置其缓存和其他一些事务--> </configuration> UserMapper.xml <mapper namespace="mapper.UserMapper"> <!-- 映射配置文件指定开启二级缓存 --> <cache/> <!-- 复用sql语句 --> <sql id="selectUser"> select * from user </sql> <select id="findAll" resultType="pojo.User"> <include refid="selectUser"/> </select> 4.web.xml配置文件 web.xml 1. <!--配置过滤器--> <filter> <filter-name>filter</filter-name> <filter-class>org.springframework.web.filter.CharacterEncodingFilter</filter-class> <init-param> <param-name>Encoding</param-name> <param-value>UTF-8</param-value> </init-param> </filter> <filter-mapping> <filter-name>filter</filter-name> <url-pattern>/*</url-pattern> </filter-mapping> <servlet> <servlet-name>springmvc</servlet-name> <servlet-class>org.springframework.web.servlet.DispatcherServlet</servlet-class> <init-param> <param-name>contextConfigLocation</param-name> <param-value>classpath:/applicationContext*.xml</param-value> </init-param> </servlet> <servlet-mapping> <servlet-name>springmvc</servlet-name> <url-pattern>*.action</url-pattern> </servlet-mapping> 不太善于言辞,希望得到大家的支持,谢谢! 写于2017/04/13