Redis笔记(七)Java实现Redis消息队列

简介:

封装一个消息对象

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public  class  Message  implements  Serializable{
 
private  static  final  long  serialVersionUID = 1L;
 
private  String titile;
private  String info;
 
public  Message(String titile,String info){
this .titile=titile;
this .info=info;
}
 
public  String getTitile() {
return  titile;
}
public  void  setTitile(String titile) {
this .titile = titile;
}
public  String getInfo() {
return  info;
}
public  void  setInfo(String info) {
this .info = info;
}
}

  

为这个消息对象提供序列化方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public  class  MessageUtil {
 
//convert To String
public  static  String convertToString(Object obj,String charset)  throws  IOException{
 
ByteArrayOutputStream bo =  new  ByteArrayOutputStream();
ObjectOutputStream oo =  new  ObjectOutputStream(bo);
oo.writeObject(obj);
String str = bo.toString(charset);
bo.close();
oo.close();
return  str;
}
 
//convert To Message
public  static  Object convertToMessage( byte [] bytes)  throws  Exception{
ByteArrayInputStream in =  new  ByteArrayInputStream(bytes);
ObjectInputStream sIn =  new  ObjectInputStream(in);
return  sIn.readObject();
 
}
}

  

从Jedis连接池中获取连接

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public  class  RedisUtil {
 
/**
* Jedis connection pool
* @Title: config
*/
public  static  JedisPool getJedisPool(){
ResourceBundle bundle=ResourceBundle.getBundle( "redis" );
String host=bundle.getString( "host" );
int  port=Integer.valueOf(bundle.getString( "port" ));
int  timeout=Integer.valueOf(bundle.getString( "timeout" ));
//  String password=bundle.getString("password");
 
JedisPoolConfig config= new  JedisPoolConfig();
config.setMaxActive(Integer.valueOf(bundle.getString( "maxActive" )));
config.setMaxWait(Integer.valueOf(bundle.getString( "maxWait" )));
config.setTestOnBorrow(Boolean.valueOf(bundle.getString( "testOnBorrow" )));
config.setTestOnReturn(Boolean.valueOf(bundle.getString( "testOnReturn" )));
 
JedisPool pool= new  JedisPool(config, host, port, timeout);
 
return  pool;
}
}

  

创建Provider类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public  class  Producer {
 
private  Jedis jedis;
private  JedisPool pool;
 
public  Producer(){
pool=RedisUtil.getJedisPool();
jedis = pool.getResource();
}
 
 
public  void  provide(String channel,Message message)  throws  IOException{
String str1=MessageUtil.convertToString(channel, "UTF-8" );
String str2=MessageUtil.convertToString(message, "UTF-8" );
jedis.publish(str1, str2);
}
 
//close the connection
public  void  close()  throws  IOException {
//将Jedis对象归还给连接池,关闭连接
pool.returnResource(jedis);
}
}

  

创建Consumer类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
public  class  Consumer {
 
private  Jedis jedis;
private  JedisPool pool;
 
public  Consumer(){
pool=RedisUtil.getJedisPool();
jedis = pool.getResource();
}
 
 
public  void  consum(String channel)  throws  IOException{
JedisPubSub jedisPubSub =  new  JedisPubSub() {
// 取得订阅的消息后的处理
public  void  onMessage(String channel, String message) {
System.out.println( "Channel:" +channel);
System.out.println( "Message:" +message.toString());
}
 
// 初始化订阅时候的处理
public  void  onSubscribe(String channel,  int  subscribedChannels) {
System.out.println( "onSubscribe:" +channel);
}
 
// 取消订阅时候的处理
public  void  onUnsubscribe(String channel,  int  subscribedChannels) {
System.out.println( "onUnsubscribe:" +channel);
}
 
// 初始化按表达式的方式订阅时候的处理
public  void  onPSubscribe(String pattern,  int  subscribedChannels) {
// System.out.println(pattern + "=" + subscribedChannels);
}
 
// 取消按表达式的方式订阅时候的处理
public  void  onPUnsubscribe(String pattern,  int  subscribedChannels) {
// System.out.println(pattern + "=" + subscribedChannels);
}
 
// 取得按表达式的方式订阅的消息后的处理
public  void  onPMessage(String pattern, String channel, String message) {
System.out.println(pattern +  "="  + channel +  "="  + message);
}
};
 
jedis.subscribe(jedisPubSub, channel);
}
 
//close the connection
public  void  close()  throws  IOException {
//将Jedis对象归还给连接池
pool.returnResource(jedis);
}
}

  

测试方法

1
2
3
4
5
6
7
8
9
10
11
12
13
public  static  void  main(String[] args){
 
Message msg= new  Message( "hello!" "this is the first message!" );
 
Producer producer= new  Producer();
Consumer consumer= new  Consumer();
try  {
producer.provide( "chn1" ,msg);
consumer.consum( "chn1" );
catch  (IOException e) {
e.printStackTrace();
}
}

  


本文转自邴越博客园博客,原文链接:http://www.cnblogs.com/binyue/p/4763352.html,如需转载请自行联系原作者

相关实践学习
基于Redis实现在线游戏积分排行榜
本场景将介绍如何基于Redis数据库实现在线游戏中的游戏玩家积分排行榜功能。
云数据库 Redis 版使用教程
云数据库Redis版是兼容Redis协议标准的、提供持久化的内存数据库服务,基于高可靠双机热备架构及可无缝扩展的集群架构,满足高读写性能场景及容量需弹性变配的业务需求。 产品详情:https://www.aliyun.com/product/kvstore     ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库 ECS 实例和一台目标数据库 RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
相关文章
|
2天前
|
canal 缓存 NoSQL
【Redis系列笔记】双写一致性
本文讨论了缓存不一致问题及其后果,如价格显示错误和订单计算错误。问题主要源于并发和双写操作的异常。解决方案包括使用分布式锁(但可能导致性能下降和复杂性增加)、延迟双删策略(通过延迟删除缓存来等待数据同步)以及异步同步方法,如通过Canal和MQ实现数据的最终一致性。面试中,可以提及这些策略来确保数据库和缓存数据的一致性。
26 1
【Redis系列笔记】双写一致性
|
8天前
|
人工智能 前端开发 Java
Java语言开发的AI智慧导诊系统源码springboot+redis 3D互联网智导诊系统源码
智慧导诊解决盲目就诊问题,减轻分诊工作压力。降低挂错号比例,优化就诊流程,有效提高线上线下医疗机构接诊效率。可通过人体画像选择症状部位,了解对应病症信息和推荐就医科室。
148 10
|
9天前
|
消息中间件 存储 安全
从零开始构建Java消息队列系统
【4月更文挑战第18天】构建一个简单的Java消息队列系统,包括`Message`类、遵循FIFO原则的`MessageQueue`(使用`LinkedList`实现)、`Producer`和`Consumer`类。在多线程环境下,`MessageQueue`的操作通过`synchronized`保证线程安全。测试代码中,生产者发送10条消息,消费者处理这些消息。实际应用中,可能需要考虑持久化、分布式队列和消息确认等高级特性,或者使用成熟的MQ系统如Kafka或RabbitMQ。
|
9天前
|
消息中间件 存储 Java
深度探索:使用Apache Kafka构建高效Java消息队列处理系统
【4月更文挑战第17天】本文介绍了在Java环境下使用Apache Kafka进行消息队列处理的方法。Kafka是一个分布式流处理平台,采用发布/订阅模型,支持高效的消息生产和消费。文章详细讲解了Kafka的核心概念,包括主题、生产者和消费者,以及消息的存储和消费流程。此外,还展示了Java代码示例,说明如何创建生产者和消费者。最后,讨论了在高并发场景下的优化策略,如分区、消息压缩和批处理。通过理解和应用这些策略,可以构建高性能的消息系统。
|
11天前
|
缓存 NoSQL Java
使用Redis进行Java缓存策略设计
【4月更文挑战第16天】在高并发Java应用中,Redis作为缓存中间件提升性能。本文探讨如何使用Redis设计缓存策略。Redis是开源内存数据结构存储系统,支持多种数据结构。Java中常用Redis客户端有Jedis和Lettuce。缓存设计遵循一致性、失效、雪崩、穿透和预热原则。常见缓存模式包括Cache-Aside、Read-Through、Write-Through和Write-Behind。示例展示了使用Jedis实现Cache-Aside模式。优化策略包括分布式锁、缓存预热、随机过期时间、限流和降级,以应对缓存挑战。
|
12天前
|
消息中间件 缓存 NoSQL
Redis stream 用做消息队列完美吗
Redis Stream 是 Redis 5.0 版本中引入的一种新的数据结构,它用于实现简单但功能强大的消息传递模式。 这篇文章,我们聊聊 Redis Stream 基本用法 ,以及如何在 SpringBoot 项目中应用 Redis Stream 。
Redis stream 用做消息队列完美吗
|
16天前
|
运维 NoSQL 算法
Java开发-深入理解Redis Cluster的工作原理
综上所述,Redis Cluster通过数据分片、节点发现、主从复制、数据迁移、故障检测和客户端路由等机制,实现了一个分布式的、高可用的Redis解决方案。它允许数据分布在多个节点上,提供了自动故障转移和读写分离的功能,适用于需要大规模、高性能、高可用性的应用场景。
16 0
|
19天前
|
存储 缓存 NoSQL
Java手撸一个缓存类似Redis
`LocalExpiringCache`是Java实现的一个本地缓存类,使用ConcurrentHashMap存储键值对,并通过ScheduledExecutorService定时清理过期的缓存项。类中包含`put`、`get`、`remove`等方法操作缓存,并有`clearCache`方法来清除过期的缓存条目。初始化时,会注册一个定时任务,每500毫秒检查并清理一次过期缓存。单例模式确保了类的唯一实例。
16 0
|
30天前
|
缓存 NoSQL Java
Java项目:支持并发的秒杀项目(基于Redis)
Java项目:支持并发的秒杀项目(基于Redis)
26 0
时间轮-Java实现篇
在前面的文章《[时间轮-理论篇](https://developer.aliyun.com/article/910513)》讲了时间轮的一些理论知识,然后根据理论知识。我们自己来实现一个简单的时间轮。

热门文章

最新文章