你真的了解并发编程吗?
1、进程和线程a)进程:是系统进行资源分配和调度的基本单位
i.每一个php脚本被运行都会开启一个进程
1. 在nginx:php-fpm
2. 在apache:CGI
b)进程包含线程,线程是我们程序的一个执行单元,负责执行
复制代码2、PHP多线程开发a)概念:是指从软件或者硬件上实现多个线程并发执行的技术。具有多线程能力的计算机因有硬件支持而能够在同一时间执行多于一个线程,进而提升整体处理性能
b)原理:在一个php进程下,开启多个线程来执任务,提高执行效率
i.同一件事情,一个人做和十个人做效率是不一样的,多线程就是这个原理
1.例子:十件衣服,每件衣服洗干净平均需要5-10分钟,那么一个人去洗就需要一百分钟才能把所有的衣服洗完,那么如果十个人一起洗,每人一件,那么洗完这十件衣服的事件就是这十个人中洗的最慢的那个人所耗费的事件。
c)实现:php使用pthreads扩展,pthreads是基于c编写的php扩展,用于操作计算机的线程
i.创建一个类 并且继承 Thread 类,并实现以下方法
1.Run():编写多线程执行的业务代码
ii.Start():开启多线程,并执行run中的任务
iii.IsRunning():判断线程是否执行结束
iv.Join:用于获取线程执行的结果
d)缺点:
i.内存占用
ii.线程的开启执行关闭需要消耗系统资源
iii.线程默认共享内存中的数据,所以多线程是不安全的,这也是为什么php的pthreads扩展需要线程安全的版本,线程安全的版本会屏蔽线程之间的内存向
iv.内存共享就代表着内存的抢占,需要引入互斥锁的机制,实现内存的管理,当又一个线程在访问内存时,其他的线程不允许访问
e)应用场景:
i.多线程并发处理:
1.同时执行多个任务,提高执行效率,获取返回结果
2.场景:网络请求、本地文件I/O、大数据处理
ii.具体实现业务:爬虫采集、文件读写日志
复制代码3、PHP生成器a)概念:实现对象的迭代,减少内存消耗
b)原理:通过实现php中的Iterator接口并实现接口内的方法:
i.Rewind:重置对象指针
ii.Next:移动对象指针到下一位
iii.Current:获取当前指针的value
iv.Key:获取当前指针的key
v.Vaild:验证迭代的条件是否满足
vi.实现对象的迭代,程序获取的是这个生成器的对象,在内存中只占用一个对象的空间,通过迭代来更新对象中的值,从而减少内存消耗
c)应用:在函数中使用yield关键字,再调用这个函数时返回的不再函数的执行结果,而是生成器对象,我们可以通过foreach遍历这个对象,每一次遍历会获取到生成器中yield后面的value,并且生成器中在yield位置暂停,下一次循环从暂停位置继续执行
复制代码d)场景:
i.大文件读写,减少内存消耗
复制代码4、协程:a)原理:协程小于线程,不同的是协程不依靠系统的调度,而是通过代码实现多任务快速切换,这个切换的过程无法感知,感官上是同时进行的,其实是串行执行的,因为不需要系统的调蓄,所以切换速度特别快消耗资源特别少,又可以提高执行的效率
i.并行:一个单核的CPU同一之间只能执行一个任务,如果是多核可以同时执行多个任务,从而提高执行效率
ii.并发:单核通过快速切换任务,释放与接收CPU控制权,实现多任务切换并发执行
b)实现:通过php生成器+任务管理器+调度器实现协程
c)应用场景:
i.网络请求、I/O请求、提高系统吞吐量与执行效率
ii.因为协程占用系统资源小,可以处理更多请求,提高系统的吞吐量
复制代码5、Go的协程开发(gorountine)并发编程a)概念:gorountine原理就是协程,只不过这些协程的调度go已经在内部实现了,包括垃圾回收,任务管理,调度,所以我们开发不需要管协程的内部实现,只需要关注业务开发就可以,go从语言层面就实现了并发
b)原理:多任务快速切换,不需要操作系统调度,可以通过管道(channel)实现通讯
c)实现:
i.使用go的协程
1.Go func() {}()
2.Fun := func () {}
go fun()
ii.通讯:
1.Chan := make(chan string[, 管道长度(int)], )
a)默认不设置管道长度为无缓冲的管道,管道中只存储一个值
b)设置之后为有缓冲的管道
2.管道数据的存取:
a)Chan<- “value”
b)Value := <-chan
iii.sync实现等待协程执行结束
1.Var wg sync.WaitGrou
2.Wg.Add(1):添加一个任务,在内部自增1
3.wg.Done():结束任务,在wg内部减一
4.Wg.Wait():检测wg中的计数器值是否为0,为0则继续执行,否则阻塞等待
d)应用场景:
i.网络请求
ii.本地文件I/O, 日志处理
iii.通常用协程来实现异步的操作
简单且高效的6种防止数据重复提交的方法,学到了真的太香了
有位朋友,某天突然问磊哥:在 Java 中,防止重复提交最简单的方案是什么?这句话中包含了两个关键信息,第一:防止重复提交;第二:最简单。于是磊哥问他,是单机环境还是分布式环境?得到的反馈是单机环境,那就简单了,于是磊哥就开始装*了。话不多说,我们先来复现这个问题。模拟用户场景简化的模拟代码如下(基于 Spring Boot):importorg.springframework.web.bind.annotation.RequestMapping;
importorg.springframework.web.bind.annotation.RestController;
@RequestMapping("/user")
@RestController
publicclassUserController{
/**
* 被重复请求的方法
*/
@RequestMapping("/add")
publicString addUser(String id) {
// 业务代码...
System.out.println("添加用户ID:"+ id);return"执行成功!";
}
}
于是磊哥就想到:通过前、后端分别拦截的方式来解决数据重复提交的问题。前端拦截前端拦截是指通过 HTML 页面来拦截重复请求,比如在用户点击完“提交”按钮后,我们可以把按钮设置为不可用或者隐藏状态。执行效果如下图所示:前端拦截的实现代码:functionsubCli(){
// 按钮设置为不可用
document.getElementById("btn_sub").disabled="disabled";
document.getElementById("dv1").innerText ="按钮被点击了~";
}
但前端拦截有一个致命的问题,如果是懂行的程序员或非法用户可以直接绕过前端页面,通过模拟请求来重复提交请求,比如充值了 100 元,重复提交了 10 次变成了 1000 元(瞬间发现了一个致富的好办法)。所以除了前端拦截一部分正常的误操作之外,后端的拦截也是必不可少。后端拦截后端拦截的实现思路是在方法执行之前,先判断此业务是否已经执行过,如果执行过则不再执行,否则就正常执行。我们将请求的业务 ID 存储在内存中,并且通过添加互斥锁来保证多线程下的程序执行安全,大体实现思路如下图所示:然而,将数据存储在内存中,最简单的方法就是使用 HashMap 存储,或者是使用 Guava Cache 也是同样的效果,但很显然 HashMap 可以更快的实现功能,所以我们先来实现一个HashMap 的防重(防止重复)版本。1.基础版——HashMapimportorg.springframework.web.bind.annotation.RequestMapping;
importorg.springframework.web.bind.annotation.RestController;
importjava.util.HashMap;
importjava.util.Map;
/**
* 普通 Map 版本
*/
@RequestMapping("/user")
@RestControlle
publicclassUserController3{
// 缓存 ID 集合
privateMap reqCache = new HashMap<>();
@RequestMapping("/add")
publicString addUser(String id) {
// 非空判断(忽略)...
synchronized (this.getClass()) {
// 重复请求判断
if(reqCache.containsKey(id)) {
// 重复请求
System.out.println("请勿重复提交!!!"+ id);
return"执行失败";
}
// 存储请求 ID
reqCache.put(id,1);
}
// 业务代码...
System.out.println("添加用户ID:"+ id);
return"执行成功!";
}
}
实现效果如下图所示:存在的问题:此实现方式有一个致命的问题,因为 HashMap 是无限增长的,因此它会占用越来越多的内存,并且随着 HashMap 数量的增加查找的速度也会降低,所以我们需要实现一个可以自动“清除”过期数据的实现方案。2.优化版——固定大小的数组此版本解决了 HashMap 无限增长的问题,它使用数组加下标计数器(reqCacheCounter)的方式,实现了固定数组的循环存储。当数组存储到最后一位时,将数组的存储下标设置 0,再从头开始存储数据,实现代码如下:importorg.springframework.web.bind.annotation.RequestMapping;
importorg.springframework.web.bind.annotation.RestController;
importjava.util.Arrays;
@RequestMapping("/user")
@RestController
publicclassUserController{
privatestatic String[] reqCache = new String[100]; // 请求 ID 存储集合
privatestatic Integer reqCacheCounter =0; // 请求计数器(指示 ID 存储的位置)
@RequestMapping("/add")
publicString addUser(String id) {
// 非空判断(忽略)...
synchronized (this.getClass()) {
// 重复请求判断
if(Arrays.asList(reqCache).contains(id)) {
// 重复请求
System.out.println("请勿重复提交!!!"+ id);
return"执行失败";
}
// 记录请求 ID
if(reqCacheCounter >= reqCache.length) reqCacheCounter =0;
// 重置计数器
reqCache[reqCacheCounter] = id;
// 将 ID 保存到缓存
reqCacheCounter++;
// 下标往后移一位
}
// 业务代码...
System.out.println("添加用户ID:"+ id);
return"执行成功!";
}
}
3.扩展版——双重检测锁(DCL)上一种实现方法将判断和添加业务,都放入 synchronized 中进行加锁操作,这样显然性能不是很高,于是我们可以使用单例中著名的 DCL(Double Checked Locking,双重检测锁)来优化代码的执行效率,实现代码如下:import org .spring framework .web .bind .annotation .Request Mapping;
import org .spring framework .web .bind.annotation .Rest Controller;
import java.util.Arrays;
@Request Mapping("/user")
@Rest Controller
public class User Controller {
private static String[] reqCache = new String[100];
// 请求 ID 存储集合
private static Integer reqCacheCounter =0;
// 请求计数器(指示 ID 存储的位置)
@Request Mapping("/add")
public String addUser(String id) {
// 非空判断(忽略)...
// 重复请求判断
if(Arrays.asList(reqCache).contains(id)) {
// 重复请求
System.out.println("请勿重复提交!!!"+ id);
return"执行失败";
}
synchronized (this.getClass()) {
// 双重检查锁(DCL,double checked locking)提高程序的执行效率
if(Arrays.asList(reqCache).contains(id)) {
// 重复请求System.out.println("请勿重复提交!!!"+ id);
return"执行失败";
}
// 记录请求 ID
if(reqCacheCounter >= reqCache.length) reqCacheCounter =0;
// 重置计数器reqCache[reqCacheCounter] = id;
// 将 ID 保存到缓存
reqCacheCounter++;
// 下标往后移一位
}
// 业务代码...
System.out.println("添加用户ID:"+ id);
return"执行成功!";
}
}
注意:DCL 适用于重复提交频繁比较高的业务场景,对于相反的业务场景下 DCL 并不适用。4.完善版——LRUMap上面的代码基本已经实现了重复数据的拦截,但显然不够简洁和优雅,比如下标计数器的声明和业务处理等,但值得庆幸的是 Apache 为我们提供了一个 commons-collections 的框架,里面有一个非常好用的数据结构 LRUMap 可以保存指定数量的固定的数据,并且它会按照 LRU 算法,帮你清除最不常用的数据。小贴士:LRU 是 Least Recently Used 的缩写,即最近最少使用,是一种常用的数据淘汰算法,选择最近最久未使用的数据予以淘汰。首先,我们先来添加 Apache commons collections 的引用:<!-- 集合工具类 apache commons collections -->
<!-- https://mvnrepository.com/artifact/org.apache.commons/commons-collections4 -->
org.apache.commonscommons-collections44.4
实现代码如下:importorg.apache.commons.collections4.map.LRUMap;
importorg.springframework.web.bind.annotation.RequestMapping;
importorg.springframework.web.bind.annotation.RestController;
@RequestMapping("/user")
@RestController
publicclassUserController{
// 最大容量 100 个,根据 LRU 算法淘汰数据的 Map 集合
privateLRUMap reqCache = new LRUMap<>(100);
@RequestMapping("/add")
publicString addUser(String id) {
// 非空判断(忽略)...
synchronized (this.getClass()) {
// 重复请求判断
if(reqCache.containsKey(id)) {
// 重复请求
System.out.println("请勿重复提交!!!"+ id);
return"执行失败";
}
// 存储请求 ID
reqCache.put(id,1);
}
// 业务代码...
System.out.println("添加用户ID:"+ id);return"执行成功!";
}
}
使用了 LRUMap 之后,代码显然简洁了很多。5.最终版——封装以上都是方法级别的实现方案,然而在实际的业务中,我们可能有很多的方法都需要防重,那么接下来我们就来封装一个公共的方法,以供所有类使用:importorg.apache.commons.collections4.map.LRUMap;
/**
* 幂等性判断
*/
public class Idem potent Utils {
// 根据 LRU(Least Recently Used,最近最少使用)算法淘汰数据的 Map 集合,最大容量 100 个
privatestaticLRUMap reqCache =newLRUMap<>(100);
/**
* 幂等性判断
* @return
*/
publicstaticbooleanjudge(Stringid,ObjectlockClass) {
synchronized (lockClass) {
// 重复请求判断
if(reqCache.containsKey(id)) {
// 重复请求
System.out.println("请勿重复提交!!!"+ id);
returnfalse;
}
// 非重复请求,存储请求 ID
reqCache.put(id,1);
}
returntrue;
}
}
调用代码如下:importcom.example.idempote.util.IdempotentUtils;
importorg.springframework.web.bind.annotation.RequestMapping;
importorg.springframework.web.bind.annotation.RestController;
@RequestMapping("/user")
@RestController
publicclassUserController4{
@RequestMapping("/add")
publicString addUser(String id) {
// 非空判断(忽略)...
// -------------- 幂等性调用(开始) --------------
if(!IdempotentUtils.judge(id,this.getClass())) {
return"执行失败";
}
// -------------- 幂等性调用(结束) --------------
// 业务代码...
System.out.println("添加用户ID:"+ id);
return"执行成功!";
}
}
小贴士:一般情况下代码写到这里就结束了,但想要更简洁也是可以实现的,你可以通过自定义注解,将业务代码写到注解中,需要调用的方法只需要写一行注解就可以防止数据重复提交了,老铁们可以自行尝试一下(需要磊哥撸一篇的,评论区留言 666)。扩展知识——LRUMap 实现原理分析既然 LRUMap 如此强大,我们就来看看它是如何实现的。LRUMap 的本质是持有头结点的环回双链表结构,它的存储结构如下:AbstractLinkedMap.LinkEntryentry;当调用查询方法时,会将使用的元素放在双链表 header 的前一个位置,源码如下:publicVget(Object key, boolean updateToMRU) {
LinkEntry entry =this.getEntry(key);
if(entry ==null) {
returnnull;
}else{
if(updateToMRU) {
this.moveToMRU(entry);
}
returnentry.getValue();
}
}
protectedvoid moveToMRU(LinkEntry entry) {
if(entry.after !=this.header) {
++this.modCount;
if(entry.before ==null) {
thrownew IllegalStateException("Entry.before is null. This should not occur if your keys are immutable, and you have used synchronization properly.");
}
entry.before.after = entry.after;
entry.after.before = entry.before;
entry.after =this.header;
entry.before =this.header.before;
this.header.before.after = entry;
this.header.before = entry;
}else if(entry ==this.header) {
thrownew IllegalStateException("Can't move header to MRU This should not occur if your keys are immutable, and you have used synchronization properly.");
}
}
如果新增元素时,容量满了就会移除 header 的后一个元素,添加源码如下:protectedvoid addMapping(int hashIndex, int hashCode, K key, V value) {
// 判断容器是否已满
if(this.isFull()) {
LinkEntry reuse =this.header.after;
boolean removeLRUEntry =false;
if(!this.scanUntilRemovable) {
removeLRUEntry =this.removeLRU(reuse);
}else{
while(reuse !=this.header && reuse !=null) {
if(this.removeLRU(reuse)) {
removeLRUEntry =true;
break;
}
reuse = reuse.after;
}
if(reuse ==null) {
thrownew IllegalStateException("Entry.after=null, header.after="+this.header.after +" header.before="+this.header.before +" key="+ key +" value="+ value +" size="+this.size +" maxSize="+this.maxSize +" This should not occur if your keys are immutable, and you have used synchronization properly.");
}
}
if(removeLRUEntry) {
if(reuse ==null) {
thrownew IllegalStateException("reuse=null, header.after="+this.header.after +" header.before="+this.header.before +" key="+ key +" value="+ value +" size="+this.size +" maxSize="+this.maxSize +" This should not occur if your keys are immutable, and you have used synchronization properly.");
}
this.reuseMapping(reuse, hashIndex, hashCode, key, value);
}else{
super.addMapping(hashIndex, hashCode, key, value);
}
}else{
super.addMapping(hashIndex, hashCode, key, value);
}
}
判断容量的源码:publicbooleanisFull(){
returnsize >= maxSize;
}容量未满就直接添加数据:super.addMapping(hashIndex, hashCode, key, value);如果容量满了,就调用 reuseMapping 方法使用 LRU 算法对数据进行清除。综合来说:LRUMap 的本质是持有头结点的环回双链表结构,当使用元素时,就将该元素放在双链表 header 的前一个位置,在新增元素时,如果容量满了就会移除 header 的后一个元素。总结本文讲了防止数据重复提交的 6 种方法,首先是前端的拦截,通过隐藏和设置按钮的不可用来屏蔽正常操作下的重复提交。但为了避免非正常渠道的重复提交,我们又实现了 5 个版本的后端拦截:HashMap 版、固定数组版、双重检测锁的数组版、LRUMap 版和 LRUMap 的封装版。特殊说明:本文所有的内容仅适用于单机环境下的重复数据拦截,如果是分布式环境需要配合数据库或 Redis 来实现。
浅谈ActiveMQ
1.ActiveMQ是什么ActiveMQ是Apache出品,最流行的,能力强劲的开源消息总线。在ActiveMQ官网,版本已经更新到5.15.0版本。2.ActiveMQ的用途ActiveMQ用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。它主要用于在生产者和消费者之间进行消息传递,生产者负责产生消息,而消费者负责接收消息。把它应用到实际的业务需求中的话,我们可以在特定的时候利用生产者生成一消息,并进行发送,对应的消费者在接收到对应的消息后去完成对应的业务逻辑。3.ActiveMQ消息传递的分类消息的传递有两种类型:一种是点对点模式的,即一个生产者和一个消费者一一对应;一种是发布/订阅模式,即一个生产者产生消息并进行发送后,可以由多个消费者进行接收。点对点模式:需要一个生产者发送消息到队列版块(Queue)中,只能有一个消费者从该队列(Queue)中接受该消息。生产者发送消息时,消费者不需要处于运行状态。发布\订阅模式:需要一个生产者发送消息到主题版块(Topic)中,可以有多个消费者订阅该版块来接受消息。消费者接受消息时,必须处于运行状态,而且只能接受运行之后的消息。4.应用实例举例:(1)更新商品信息在电商[后台商品管理系统]中,维护商品信息,执行变更商品信息操作时,除了要将变更后的数据更新到数据库,同时也需要将变更后的数据,更新到索引库。功能分析:需要保证更新索引操作时,不能影响原有业务,所以使用activemq消息中间件进行异步通信。消息发送,选用发布\订阅模式,这种消息可以有多个消费者接收消息。因为在商品信息变更时,还需要进行缓存同步等其他操作。具体操作:在后台商品系统管理工程中的添加商品方法里面添加发送消息逻辑。a)消息目标:Topic。b)消息内容:商品ID,使用TextMessage(2)实时同步交易数据到结算系统电商交易系统产生的交易数据时,不仅需要将交易数据保存到电商交易系统对应的数据库中,还需要实时将产生的交易数据,同步到结算系统数据库中。功能分析:同步交易数据到结算系统的过程,不能影响电商交易系统原有业务,所以同样使用ActivityMQ消息中间件来进行异步通信.消息发送,选用点对点模式。结算系统采用分布式部署,只要有一个消费者接收消息即可。具体操作:消息目标:Queue;消息内容:json格式,交易数据,使用TextMessage。5.activeMQ消息消费不成功怎么办?Activemq有两种通信方式,点到点形式和发布订阅模式。点到点模式,如果消息发送不成功此消息默认会保存到activemq服务端直到有消费者将其消费,所以此时消息是不会丢失的。发布订阅模式,默认情况下只通知一次,如果接收不到此消息就没有了。这种场景只适用于对消息送达率要求不高的情况。如果要求消息必须送达不可以丢失的话,需要配置持久订阅。每个订阅端定义一个id,在订阅是向activemq注册。发布消息和接收消息时需要配置发送模式为持久化。此时如果客户端接收不到消息,消息会持久化到服务端,直到客户端正常接收后为止。
Spring Cloud Alibaba,Spring Cloud Stream 事件驱动(五)
1. 简介事件驱动架构(Event-driven 架构,简称 EDA)是软件设计领域内的一套程序设计模型。这套模型的意义是所有的操作通过事件的发送/接收来完成。举个例子,比如一个订单的创建在传统软件设计中服务端通过接口暴露创建订单的动作,然后客户端访问创建订单。在事件驱动设计里,订单的创建通过接收订单事件来完成,这个过程中有事件发送者和事件接受者这两个模块,事件发送者的作用是发送订单事件,事件接受者的作用的接收订单事件。Spring Cloud Stream 是一套基于消息的事件驱动开发框架,它提供了一套全新的消息编程模型,此模型屏蔽了底层具体消息中间件的使用方式。开发者们使用这套模型可以完成基于消息的事件驱动应用开发。2. 学习目标掌握 Spring 对消息的编程模型封装掌握 RocketMQ 整合 Spring Cloud Stream 完成消息的发送和接收掌握 RocketMQ 整合 Spring Cloud Bus 完成远程事件的发送和接收3. 详细内容概念理解:指导读者理解 Spring 的消息编程模型消息发送/接收:实战 Spring Cloud Steam RocketMQ Binder事件发送/接收: 实战 Spring Cloud Bus RocketMQ4. 理解 Spring 消息编程模型首先我们来看这个场景,不同的消息中间件发送消息的代码:每个消息中间件都有自己的消息模型编程,他们的代码编写方式都不一致。同样地,在消息的订阅方面,也是不同的代码。这个时候如果某天想把 Kafka 切换到 RocketMQ,必须得修改大量代码。Spring 生态里有两个消息相关的模块和项目,分别是 spring-messaging 模块和 Spring Integration 项目,它们对消息的编程模型进行了统一,不论是 Apache RocketMQ 的 Message,或者是 Apache Kafka 的 ProducerRecord,都被统一称为 org.springframework.messaging.Message 接口。Message 接口有两个方法,分别是 getPayload 以及 getHeaders 用于获取消息体以及消息头。如图所示,这也意味着一个消息 Message 由 Header 和 Payload 组成:Payload 是一个泛型,意味是消息体可以放任意数据类型。Header 是一个 MessageHeaders 类型的消息头。有了消息之后,这个消息被发送到哪里呢?Spring 提供了消息通道 MessageChannel 的概念。消息可以被发送到消息通道里,然后再通过消息处理器 MessageHandler 去处理消息通道里的消息:消息处理这里又会遇到一个问题。如果消息通道里只有 1 个消息,但是消息处理器有 N 个,这个时候要被哪个消息处理器处理呢?这里又涉及一个消息分发器的问题。UnicastingDispatcher 表示单播的处理方式,消息会通过负载均衡被分发到某一个消息处理器上,BroadcastingDispatcher 表示广播的方式,消息会被所有的消息处理器处理。5. Spring Cloud StreamSpring Cloud Stream 是一套基于消息的事件驱动开发框架。Spring Cloud Stream 在 Spring Integration 项目的基础上再进行了一些封装,提出一些新的概念,让开发者能够更简单地使用这套消息编程模型。如图所示,这是三者之间的关系:如下图所示,这是 Spring Cloud Stream 的编程模型。通过 RabbitMQ Binder 构建 input Binding 用于读取 RabbitMQ 上的消息,将 payload 内容转成大写再通过 Kafka Binder 构建的 output Binding 写入到 Kafka 中。图上中间的 4行非常简单的代码就可以完成从 RabbitMQ 读取消息再写入到 Kafka 的动作。以下代码是使用 Spring Cloud Stream 以最简单的方式完成消息的发送和接收:@SpringBootApplication@EnableBinding({Source.class, Sink.class}) // ①
public class SCSApplication {
public static void main(String[] args) {
new SpringApplicationBuilder().sources(SCSApplication.class)
.web(WebApplicationType.NONE).run(args);
}
@Autowired
Source source; // ②
@Bean
public CommandLineRunner runner() {
return (args) -> {
source.output().send(MessageBuilder.withPayload("custom payload").setHeader("k1", "v1").build()); // ③
};
}
@StreamListener(Sink.INPUT) // ④
@SendTo(Source.OUTPUT) // ⑤
public String receive(String msg) {
return msg.toUpperCase();
}
}使用 @EnableBinding 注解,注解里面有两个参数 Source 和 Sink,它们都是接口。Source 接口内部有个 MessageChannel 类型返回值的 output 方法,被 @Output 注解修饰表示这是一个 Output Binding;Sink 接口内部有个 SubscribableChannel 类型返回值的 intput 方法,被 @Input 注解修饰表示这是一个 Input Binding。@EnableBinding 注解会针对这两个接口生成动态代理。注入 @EnableBinding 注解对于 Source 接口生成的动态代理。使用 @EnableBinding 注解对于 Source 接口生成的动态代理内部的 MessageChannel 发送一条消息。最终消息会被发送到消息中间件对应的 topic 里。@StreamListener 注解订阅 @EnableBinding 注解对于 Sink 接口生成的动态代理内部的 SubscribableChannel 中的消息,这里会订阅到消息中间件对应的topic 和 group。消息处理结果发送到@EnableBinding 注解对于 Source 接口生成的动态代理内部的 MessageChannel。最终消息会被发送到消息中间件对应的topic 里。上述代码需要配置信息:spring.cloud.stream.bindings.input.destination=test-input
spring.cloud.stream.bindings.input.group=test-input-binder
spring.cloud.stream.bindings.input.binder=kafka
spring.cloud.stream.bindings.output.destination=test-output
spring.cloud.stream.bindings.output.binder=rocketmq这里的 Input Binding 对应的 topic 是 test-input,group 是 test-input-binder,对应的 MQ 是 Kafka,Output Binding 对应的 topic 是 test-output,对应的 MQ 是 RocketMQ。所以这段代码的意思是以 test-input-binder 这个 group 去 Kafka 上读取 test-input 这个 topic 下的消息,把消息的内容转换成大写再发送给 RocketMQ 的 test-output topic 上。
Spring Cloud Alibaba,分布式服务调用(四)(上)
1. 简介在《Spring Cloud Alibaba 服务注册与发现》篇中曾提到,Spring Cloud Alibaba Nacos Discovery 能无缝整合 Spring Cloud OpenFeign。换言之,Spring Cloud Alibaba 延续了 Spring Cloud 分布式服务调用的特性。除此之外,Spring Cloud Alibaba 引入了 Dubbo Spring Cloud,扩展了分布式服务调用能力,不仅能使 Apache Dubbo 和 OpenFeign 共存,还允许 Spring Cloud 标准调用底层通过 Dubbo 支持的通讯协议传输。无论开发人员是 Dubbo 用户还是 Spring Cloud 用户,都能轻松地驾驭,并以接近“零”成本的代价使应用向上迁移。Dubbo Spring Cloud 致力于简化 Cloud Native 开发成本,提高研发效能以及提升应用性能等目的。2. 学习目标使用 Dubbo Spring Cloud 实现 Spring Cloud 分布式服务调用使用 Dubbo Spring Cloud 替换 Spring Cloud 分布式服务调用底层协议理解 Dubbo Spring Cloud 高级特性:服务订阅、元数据、Actuator3. 详细内容快速上手:使用 Apache Dubbo适配整合:使用注解 @DubboTransported 适配 Spring Cloud OpenFeign 和 @LoadBalanced RestTemplate运维特性:演示服务订阅、元信息(服务、REST)以及 Actuator Endpoints4. 功能特性由于 Dubbo Spring Cloud 构建在原生的 Spring Cloud 之上,其服务治理方面的能力可认为是 Spring Cloud Plus,不仅完全覆盖 Spring Cloud 原生特性,而且提供更为稳定和成熟的实现,特性比对如下表所示:功能组件Spring CloudDubbo Spring Cloud分布式配置(Distributed configuration)Git、Zookeeper、Consul、JDBCSpring Cloud 分布式配置 + Dubbo 配置中心服务注册与发现(Service registration and discovery)Eureka、Zookeeper、ConsulSpring Cloud 原生注册中心?+ Dubbo 原生注册中心负载均衡(Load balancing)Ribbon(随机、轮询等算法)Dubbo 内建实现(随机、轮询等算法 + 权重等特性)服务熔断(Circuit Breakers)Spring Cloud HystrixSpring Cloud Hystrix + Alibaba Sentinel17 等服务调用(Service-to-service calls)Open Feign、RestTemplateSpring Cloud 服务调用 + Dubbo@Reference链路跟踪(Tracing)Spring Cloud Sleuth?+ ZipkinZipkin、opentracing 等4.1 高亮特性4.1.1 Dubbo 使用 Spring Cloud 服务注册与发现Dubbo Spring Cloud 基于 Spring Cloud Commons 抽象实现 Dubbo 服务注册与发现,无需添加任何外部化配置,就能轻松地桥接到所有原生 Spring Cloud 注册中心,包括:NacosEurekaZookeeperConsul注:Dubbo Spring Cloud 将在下个版本支持 Spring Cloud 注册中心与 Dubbo 注册中心并存,提供双注册机制,实现无缝迁移。4.1.2 Dubbo 作为 Spring Cloud 服务调用默认情况,Spring Cloud Open Feign 以及@LoadBalanced``RestTemplate作为 Spring Cloud 的两种服务调用方式。Dubbo Spring Cloud 为其提供了第三种选择,即 Dubbo 服务将作为 Spring Cloud 服务调用的同等公民出现,应用可通过 Apache Dubbo 注解@Service和@Reference暴露和引用 Dubbo 服务,实现服务间多种协议的通讯。同时,也可以利用 Dubbo 泛化接口轻松实现服务网关。4.1.3 Dubbo 服务自省Dubbo Spring Cloud 引入了全新的服务治理特性 - 服务自省(Service Introspection),其设计目的在于最大化减轻注册中心负载,去 Dubbo 注册元信息中心化。假设一个 Spring Cloud 应用引入 Dubbo Spring Boot Starter,并暴露 N 个 Dubbo 服务,以?Dubbo Nacos 注册中心?为例,当前应用将注册 N+1 个 Nacos 应用,除 Spring Cloud 应用本身之前,其余 N 个应用均来自于 Dubbo 服务,当 N 越大时,注册中心负载越重。因此,Dubbo Spring Cloud 应用对注册中心的负载相当于传统 Dubbo 的 N 分之一,在不增加基础设施投入的前提下,理论上,使其集群规模扩大 N 倍。当然,未来的 Dubbo 也将提供服务自省的能力。4.1.4 Dubbo 迁移 Spring Cloud 服务调用尽管 Dubbo Spring Cloud 完全地保留了原生 Spring Cloud 服务调用特性,不过 Dubbo 服务治理的能力是 Spring Cloud Open Feign 所不及的,如高性能、高可用以及负载均衡稳定性等方面。因此,建议开发人员将 Spring Cloud Open Feign 或者@LoadBalanced``RestTemplate迁移为 Dubbo 服务。考虑到迁移过程并非一蹴而就,因此,Dubbo Spring Cloud 提供了方案,即@DubboTransported注解。该注解能够帮助服务消费端的 Spring Cloud Open Feign 接口以及@LoadBalanced``RestTemplateBean 底层走 Dubbo 调用(可切换 Dubbo 支持的协议),而服务提供方则只需在原有@RestController类上追加 Dubbo@Servce注解(需要抽取接口)即可,换言之,在不调整 Feign 接口以及RestTemplateURL 的前提下,实现无缝迁移。如果迁移时间充分的话,建议使用 Dubbo 服务重构系统中的原生 Spring Cloud 服务的定义。5. 快速上手5.1 如何引入 Dubbo Spring CloudDubbo Spring Cloud 引入的方式通常有两种,由易到难分别为:Aliyun Java Initializr 引入和 Maven pom.xml 依赖。官方推荐使用 Aliyun Java Initializr 方式引入 Dubbo Spring Cloud,以便简化组件之间的依赖关系。5.1.1 [偷懒] 直接在沙箱里查看应用代码点击 链接,直接访问沙箱环境,这里会有为你准备好的案例代码^_^。5.1.2 [简单] 通过 Aliyun Java Initializr 创建工程并引入 Dubbo Spring Cloud(推荐)Dubbo Spring Cloud 组件的在整个 Spring Cloud Alibaba 版本和依赖最为复杂,推荐读者使用 Aliyun Java Initializr 构建应用工程。读者选择偏好的 Web 浏览器访问 Aliyun Java Initializr,其资源网址为:https://start.aliyun.com/bootstrap.html下文以 Google Chrome 浏览器为例,当网页加载后,首先,在 “项目基本信息” 部分输入 Group :“com.alibaba.cloud” 以及 Artifact:“dubbo-provider-sample”(见下图绿框部分)然而,“组件依赖” 输入框搜索:“Dubbo”(见下图红箭头部分),最后,选择 “Spring Cloud Alibaba Dubbo”(见下图红框部分),如下所示:继续依赖其他组件:Nacos Service Discovery - 服务注册与发现组件Spring Web - Spring Web MVC 组件Spring Boot Actuator - Spring Boot Actator 组件完整组件汇总如下图所示:点击 “生成” 高亮按钮,平台将生成一个名为 “dubbo-provider-sample.zip” 的压缩文件,将其保存到本地目录,并解压该文件,工程目录将随之生成。打开目录下的 pom.xml 文件,不难发现 Dubbo Spring Cloud Starter 声明其中(以下 XML 内容均来自于项目根路径中的 pom.xml 文件): <dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-dubbo</artifactId>
</dependency>不过该 starter 并未指定版本,具体的版本声明在 com.alibaba.cloud:spring-cloud-alibaba-dependencies 部分: <dependencyManagement>
<dependencies>
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-alibaba-dependencies</artifactId>
<version>${spring-cloud-alibaba.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-dependencies</artifactId>
<version>${spring-boot.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>其中,${spring-cloud-alibaba.version} 和 ${spring-boot.version} 分别为 Spring Cloud Alibaba 和 Spring Boot 组件依赖的版本,它们的版本定义在 <properties> 元素中,即 2.2.1.RELEASE 和 2.3.0.RELEASE: <properties>
<java.version>1.8</java.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<spring-boot.version>2.3.0.RELEASE</spring-boot.version>
<spring-cloud-alibaba.version>2.2.1.RELEASE</spring-cloud-alibaba.version>
</properties>如果读者非常熟悉 Maven 依赖管理的配置方式,可以考虑 Maven pom.xml 依赖 Dubbo Spring Cloud。5.1.3 [高级] 通过 Maven pom.xml 依赖 Dubbo Spring Cloud如果要在您 Dubbo Spring Cloud 的项目中使用 Nacos 来实现服务注册/发现,可将两者 Stater 同时依赖:<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-dubbo</artifactId>
</dependency>该声明方式同样需要声明 com.alibaba.cloud:spring-cloud-alibaba-dependencies,内容与上小节相同,在此不再赘述。下一节将讨论如何使用 Dubbo Spring Cloud 构建服务提供者。5.2 使用 Dubbo Spring Cloud 构建服务提供者按照传统的 Dubbo 开发模式,在构建服务提供者之前,第一个步骤是为服务提供者和服务消费者定义 Dubbo 服务接口。为了确保契约的一致性,推荐的做法是将 Dubbo 服务接口打包在第二方或者第三方的 artifact(jar)中,该 artifact 甚至无需添加任何依赖。对于服务提供方而言,不仅通过依赖 artifact 的形式引入 Dubbo 服务接口,而且需要将其实现。对应的服务消费端,同样地需要依赖该 artifact,并以接口调用的方式执行远程方法。接下来的步骤则是创建 artifact。5.2.1 创建 artifact - dubbo-sample-api选择合适的文件目录,通过 Maven 命令行工具构建 artifact dubbo-sample-api,如下所示:mvn archetype:generate -DgroupId=com.alibaba.cloud -DartifactId=dubbo-sample-api -Dversion=0.0.1-SNAPSHOT -DinteractiveMode=false命令执行后,名为“dubbo-sample-api” 的项目目录生成,切换至该目录,并使用 tree 命令(macOS 命令工具)预览器内部接口:% tree
.
├── pom.xml
└── src
├── main
│?? └── java
│?? └── com
│?? └── alibaba
│?? └── cloud
│?? └── App.java
└── test
└── java
└── com
└── alibaba
└── cloud
└── AppTest.java
11 directories, 3 files其中,App.java 和 AppTest.java 文件并非需要文件,可将其删除。除此之外,当然最重要的是 pom.xml<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.alibaba.cloud</groupId>
<artifactId>dubbo-sample-api</artifactId>
<packaging>jar</packaging>
<version>0.0.1-SNAPSHOT</version>
<name>dubbo-sample-api</name>
<url>http://maven.apache.org</url>
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>3.8.1</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>不难看出,Maven GAV 信息均按照之前的命令来设定,并没有依赖其他组件。接下来,为当前工程定义 Dubbo 服务接口。
试用体验
第一次真正的试用云服务器,在学校因为项目需要需要做一份大三需要试用的原型,而老师要求的效果要尽量贴合最终成品,因此自学了点php和ajax的内容,最后在电脑上搭建了一个简单的服务器,但是因为每次使用都需要手动开启apache,而且项目组的其他同学也必须配置一下才能使用,因此想到使用云服务器来当做后台,因为是第一次使用,linux的指令也不熟悉,跟着网上的教程走了一晚上最后成功的把后台的php和MySQL的数据库都移动到了云上,总体感觉阿里云服务器的上手难度不高,而且网上有着完整的教程即使不是计算机相关专业的自己网上查找也能很轻松的创建自己的个人服务器
【PHP】window下搭建php环境
php和apache的下载包:https://www.yuque.com/u30882/rx39g7/un9d6b# 注意:需要使用管理员身份运行命令行!!!httpd ‐k start# 重新启动 Apache 服务 httpd ‐k restart# 停止 Apache 服务httpd ‐k stop在c盘创建一个wamp,并把apache的安装包放在里边进入apache 这里会报一个错原因是默认的配置文件有问题,需要先调整一下配置文件 conf/httpd.conf ,才能启动找到 Apache 解压目录中的 conf 目录下的 httpd.conf 文件,定位到 37 行,将 c:/Apache24 改为解压目录,我这里解压到路径是 C:/Develop/Apache24 ,(http.conf文件中路径都为反斜杠/,而不是\)所以我这里修改 然后检测一下配置是否成功 通过错误信息得知,这里是因为另外一个地方配置的目录不存在导致的,所以接着调整 251 行的 DocumentRoot默认 Apache 的网站根目录是安装目录中的 htdocs 文件夹,为了方便对网站文件的管理,一般我们会将其设置在一个自定义目录中,这里我们设置在D:/Develop/www(如果你不介意其实不修改也无所谓)。同时,下面<Directory>为设置可访问的路径这里我们当然把www文件权限放开了
玩转Apache JMeter—测试HTTP接口篇
应一位粉丝的要求发此文章~1 什么是JMeter官网:https://jmeter.apache.orgApache JMeter是由Apache开源的、100% 纯 Java 的应用程序,旨在对程序性能进行测试 。它最初是为测试 Web 应用程序而设计的,但后来扩展到其他测试功能。使用JMeter能够测试哪些功能:Web接口(包括HTTP、HTTPS)SOAP / REST Web 服务FTP通过 JDBC 的数据库LDAP通过 JMS 的面向消息的中间件 (MOM)邮件 - SMTP(S)、POP3(S) 和 IMAP(S)本机命令或 shell 脚本TCPJava 对象2 为什么需要JMeter每个测试工程师,必须掌握的测试工具,熟练使用Jmeter能大大提高工作效率。熟练使用Jmeter后, 能用Jmeter搞定的事情,你就不会使用LoadRunner了。Jmeter 是一款使用Java开发的,主要用来做功能测试和性能测试(压力测试/负载测试). 而且用Jmeter 来测试 Restful API, 非常好用。3 下载JMeter下载地址:https://jmeter.apache.org/download_jmeter.cgi运行时需要环境:JDK8+下载后的目录:主要使用到的目录及其作用:bin:启动文件目录docs:相关文档4 初步使用总体使用流程:(1)新建测试计划(2)启动被测试服务(3)启动JMeter进行测试(4)获取测试结果4.1 使用JMeter的Hello World4.1.1 编写被测试程序package main
import (
"fmt"
"net/http"
)
// i 请求次数
var i int64
// RunHttp1 最简单的HTTP服务
func RunHttp1() {
http.HandleFunc("/hello", func(w http.ResponseWriter, r *http.Request) {
i += 1
fmt.Println("请求次数:",i)
w.Write([]byte("Hello Http!"))
})
http.ListenAndServe(":8080", nil)
}
func main() {
RunHttp1()
}4.1.2 新建测试计划(1)Windows下启动JMeterbin>jmeter.bat(2)新建测试计划(3)在测试计划下新建测试线程组(4)在线程组下新建HTTP请求请求HTTP接口时的配置设置线程数和循环次数4.1.3 启动JMeter进行测试4.1.4 观察控制台输出启动JMeter测试后我们观察控制台输出就能够清晰的发现JMeter的请求啦4.2 使用测试报告当然总看程序的控制台是很不专业的,因为我们这样只能用过控制台来观察,在实际的生产环境中是很难实现的,因此我们最好是有一个类似于测试报告的东西,当然JMeter也为我们准备了,他就是Listener,用于测试的监听和结果的显示,我们下面来实际演示下:(1)新建Listener(2)设置Lisenter名称(3)启动测试,观察变化4.3 性能和错误测试(1)程序package main
import (
"fmt"
"net/http"
"time"
)
var i int64
// RunHttp1 最简单的HTTP服务
func RunHttp1() {
http.HandleFunc("/hello", func(w http.ResponseWriter, r *http.Request) {
i += 1
fmt.Println("请求次数:", i)
w.Write([]byte("Hello Http!"))
})
http.HandleFunc("/hi", func(w http.ResponseWriter, r *http.Request) {
time.Sleep(time.Second * 1)
i += 1
fmt.Println("请求次数:", i)
w.Write([]byte("Hi Http!"))
})
http.HandleFunc("/err", func(w http.ResponseWriter, r *http.Request) {
i += 1
if i%10 == 0 {
http.Error(w, "err", 500)
} else {
w.Write([]byte("Err Http!"))
}
fmt.Println("请求次数:", i)
})
http.ListenAndServe(":8080", nil)
}
func main() {
RunHttp1()
}(2)测试结果:4.4 切换中文如果英文版的用起来不是那么的顺手,可以切换成中文哈5 小总结本文仅讲述了最常用也是最简单的JMeter测试HTTP接口的功能,我们只需要根据需求填写我们需要测试的HTTP接口信息即可,并且通过不同种类的Listener可以实时的获取测试报告,能够帮助我们更好的分析出我们的接口标准。最后给大家一个小提示哈:无论是测试自己的开发机器还是服务器,尽量谨慎测试,不要一上来就1W个线程直接跑,可能会给机器带来不太好的影响,请优雅一点。今天的分享就到这里咯~
探索Pulsar之使用Go和Java操作Pulsar
1 前言Pulsar官方支持的客户端库:C++PythonWebSocketGo clientNode.jsC#JavaGitHub中三方的客户端库:GoHaskellScalaRust.NETNode.js具体可参看:https://pulsar.apache.org/docs/zh-CN/next/client-libraries/本次仅演示Go和Java的客户端操作。2 单机模式运行Pulsar[root@iZ2ze4m2 bin]# pwd
/root/apache-pulsar-2.10.0/bin
[root@iZ2ze4m2 bin]# ./pulsar standalonePS:针对单机启动报错问题,如下面的:可以尝试使用该命令进行启动:./pulsar standalone -nss3 Go客户端操作Pulsar(1)添加依赖 go get -u "github.com/apache/pulsar-client-go/pulsar"(2)生产者package main
import (
"context"
"fmt"
"github.com/apache/pulsar-client-go/pulsar"
"log"
"time"
)
func main() {
client, err := pulsar.NewClient(pulsar.ClientOptions{
URL: "pulsar://192.168.71.143:6650", //支持:"pulsar://localhost:6650,localhost:6651,localhost:6652"
OperationTimeout: 60 * time.Second,
ConnectionTimeout: 60 * time.Second,
})
defer client.Close()
if err != nil {
log.Fatalf("Could not instantiate Pulsar client: %v", err)
}
producer, err := client.CreateProducer(pulsar.ProducerOptions{
Topic: "my-topic",
})
if err != nil {
log.Fatal(err)
}
_, err = producer.Send(context.Background(), &pulsar.ProducerMessage{
Payload: []byte("hello"),
})
defer producer.Close()
if err != nil {
fmt.Println("Failed to publish message", err)
}
fmt.Println("Published message")
}(3)消费者package main
import (
"context"
"fmt"
"github.com/apache/pulsar-client-go/pulsar"
"log"
"time"
)
func main() {
client, err := pulsar.NewClient(pulsar.ClientOptions{
URL: "pulsar://192.168.71.143:6650", //支持:"pulsar://localhost:6650,localhost:6651,localhost:6652"
OperationTimeout: 60 * time.Second,
ConnectionTimeout: 60 * time.Second,
})
defer client.Close()
if err != nil {
log.Fatalf("Could not instantiate Pulsar client: %v", err)
}
consumer, err := client.Subscribe(pulsar.ConsumerOptions{
Topic: "my-topic",
SubscriptionName: "my-sub",
Type: pulsar.Shared,
})
if err != nil {
log.Fatal(err)
}
defer consumer.Close()
for i := 0; i < 10; i++ {
msg, err := consumer.Receive(context.Background())
if err != nil {
log.Fatal(err)
}
fmt.Printf("Received message msgId: %#v -- content: '%s'\n",
msg.ID(), string(msg.Payload()))
consumer.Ack(msg)
}
if err := consumer.Unsubscribe(); err != nil {
log.Fatal(err)
}
}4 Java&Spring客户端操作Pulsar4.1 Java客户端(1)pom依赖<properties>
<pulsar.version>2.9.1</pulsar.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.pulsar</groupId>
<artifactId>pulsar-client</artifactId>
<version>${pulsar.version}</version>
</dependency>
</dependencies>(2)生产者和消费者class SbPursarApplicationTests {
private PulsarClient client;
private void init() throws PulsarClientException {
client = PulsarClient.builder()
.serviceUrl("pulsar://192.168.71.147:6650")
.build();
}
@Test
void producer() throws Exception {
init();
Producer<byte[]> producer = client.newProducer()
.topic("my-topic")
.create();
// 然后你就可以发送消息到指定的broker 和topic上:
producer.send("My message".getBytes());
client.close();
}
@Test
void consumer() throws PulsarClientException {
init();
Consumer consumer = client.newConsumer()
.topic("my-topic")
.subscriptionName("my-subscription")
.subscribe();
while (true) {
Message msg = consumer.receive();
try {
System.out.println("Message received: " + new String(msg.getData()));
//消息确认
consumer.acknowledge(msg);
} catch (Exception e) {
consumer.negativeAcknowledge(msg);
}
}
}
}4.2 Spring客户端(1)依赖<properties>
<java.version>1.8</java.version>
<!-- in your <properties> block -->
<pulsar.version>2.9.1</pulsar.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- in your <dependencies> block -->
<dependency>
<groupId>org.apache.pulsar</groupId>
<artifactId>pulsar-client</artifactId>
<version>${pulsar.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>(2)项目结构(3)配置类主要用于将自定义Bean放入Spring中@Configuration
public class PulsarConfig {
@Bean
public Producer pulsarProducer() throws PulsarClientException {
PulsarClient client = PulsarClient.builder()
.serviceUrl("pulsar://192.168.71.147:6650")
.build();
Producer<byte[]> producer = client
.newProducer()
.topic("my-topic")
.create();
return producer;
}
@Bean
public Consumer pulsarConsumer() throws PulsarClientException {
PulsarClient client = PulsarClient.builder()
.serviceUrl("pulsar://192.168.71.147:6650")
.build();
Consumer consumer = client.newConsumer()
.topic("my-topic")
.subscriptionName("my-subscription")
.subscribe();
return consumer;
}
}(4)控制器类:生产者@RestController
public class HelloPulsarController {
@Autowired
private Producer pulsarProducer;
@RequestMapping("/hello/{msg}")
public String hello(@PathVariable("msg") String msg) {
try {
pulsarProducer.send(msg.getBytes());
} catch (PulsarClientException e) {
return "发送失败";
}
return "发送成功";
}
}(3)消费者直接使用自定义Bean,并在Spring Boot启动后自动调用该方法@Service
public class PulsarConsumerService implements ApplicationRunner {
@Autowired
private Consumer pulsarConsumer;
public void consumer() throws PulsarClientException {
while (true) {
Message msg = pulsarConsumer.receive();
try {
System.out.println("Message received: " + new String(msg.getData()));
pulsarConsumer.acknowledge(msg);
} catch (Exception e) {
pulsarConsumer.negativeAcknowledge(msg);
}
}
}
@Override
public void run(ApplicationArguments args) throws Exception {
consumer();
}
}5 通过pulsar-manager搭建可视化管理界面5.1 下载链接https://github.com/apache/pulsar-manager#access-pulsar-manager或https://pulsar.apache.org/en/download/5.2 启动并配置启动$ wget https://dist.apache.org/repos/dist/release/pulsar/pulsar-manager/pulsar-manager-0.2.0/apache-pulsar-manager-0.2.0-bin.tar.gz
$ tar -zxvf apache-pulsar-manager-0.2.0-bin.tar.gz
$ cd pulsar-manager
$ tar -xvf pulsar-manager.tar
$ cd pulsar-manager
$ cp -r ../dist ui
$ ./bin/pulsar-manager配置账号密码$ CSRF_TOKEN=$(curl http://127.0.0.1:7750/pulsar-manager/csrf-token)
$ curl \
-H "X-XSRF-TOKEN: $CSRF_TOKEN" \
-H "Cookie: XSRF-TOKEN=$CSRF_TOKEN;" \
-H 'Content-Type: application/json' \
-X PUT http://127.0.0.1:7750/pulsar-manager/users/superuser \
-d '{"name": "admin", "password": "apachepulsar", "description": "test", "email": "username@test.org"}'5.3 使用访问http://localhost:9527就可以打开pulsar-manager界面:参考:https://pulsar.apache.org/docs/zh-CN/client-libraries-go/https://pulsar.apache.org/docs/zh-CN/client-libraries-java/https://blog.csdn.net/ycf921244819/article/details/120907372https://github.com/apache/pulsar-manager#access-pulsar-manager
php配置mongodb扩展、安装mongodb服务教程
安装mongodb服务 1、下载mongodb: mongodb 提供了可用于 32 位和 64 位系统的,你可以从mongodb官网下载安装. mongodb下载地址:https://www.mongodb.com/download-center#atlas 2、运行安装mongodb: 双击下载下来的文件,按操作提示安装即可。 ( 推荐选择自定义安装,自定义安装在自建的mongodb目录里。如:我安装在F盘我手动创建的mongodb目录下) 3、完成装成后:创建数据保存目录(dbpath) 1):在F盘的mongodb目录下新建data目录。 2):在cmd命令模式下运行命令:mongod --dbpath F:\mongodb\data ( 注意要切换到F:\mongodb\bin\下执行哦) 4、安装完成。 以后要使用mongodb时记得要启动mongodb服务(运行bin目录下的mongo.exe) 当然这样每次用都要去启动一下有点麻烦,推荐做成系统服务以后就都不用去启动了。 5、把mongodb安装成服务。以后就不需要每次手动去启动了 1):首先我们在F:\mongodb\下新建一个目录logs, 然后在里面新建一个空的文件(logs.txt) 2):cmd运行命令: mongod --logpath "F:\mongodb\logs\logs.txt" --logappend --dbpath "F:\mongodb\data" --directoryperdb --serviceName "MongoDB" --serviceDisplayName "MongoDB" --install 3):完成,去服务中启动一下,以后就不用再手机启动了。php配置mongodb扩展 1、通过查看phpinfo(),确认需要下载那个版本的php_mongodb.dll文件 1):查看PHP当然版本号是多少。 2):查看Architecture项,是x86 还是x64。 3):查看PHP Extension Build项,里面是支持TS还是不支持(也就是看是TS还是NTS)。 2、下载对应mongodb扩展文件: 下载地址: https://pecl.php.net/package/mongodb 选择一个版本下载,只要点进去支持我们当前的php版本。 点进去后的页面如下: 选择好后直接点击就会下载了。 3、解压下载的文件包,把里面的php_mongodb.dll放到对应的php版本目录中的ext中去! (如:我的是E:\phpStudy\php\php-5.5.38,那就放到E:\phpStudy\php\php-5.5.38\ext\下) 4、最后一步使用php的mongodb扩展生效能用(有两种处理方法): 这一步很多人会忽略,从而使mongodb扩展不生效。 1)不需要重启电脑: 复制E:\phpStudy\php\php-5.5.38\libsasl.dll这个文件放到Apache的bin目录中去。 (如:我的是E:\phpStudy\Apache\bin,然后重启Apache) 2)需要重启电脑: 把php版本目录添加到系统环境变量中去。 (如:我的是E:\phpStudy\php\php-5.5.38)方法: 计算机(右键) -》 属性(点击)-》 高级系统设置(点击)-》环境变量(点击) -》系统变量下的path变量名(点击)弹出添加目录到最后去,然后保存 -》 重启电脑Linux系统安装/usr/local/php/bin/pecl install mongodbmongodb可视化管理工具:Robo 3T Robo 3T 下载地址:https://robomongo.org/download 安装和使用都很简单,这里就不过多讲解了! 连接后的效果图: