基于Redis实现分布式消息队列(4)
1、访问Redis的工具类
public class RedisManager {
private static Pool<Jedis> pool;
protected final static Logger logger = Logger.getLogger(RedisManager.class);
static{
try {
init();
} catch (Exception e) {
e.printStackTrace();
}
}
public static void init() throws Exception {
Properties props = ConfigManager.getProperties("redis");
logger.debug("初始化Redis连接池。");
if(props==null){
throw new RuntimeException("没有找到redis配置文件");
}
// 创建jedis池配置实例
JedisPoolConfig jedisPoolConfig = new JedisPoolConfig();
// 设置池配置项值
int poolMaxTotal = Integer.valueOf(props.getProperty("redis.pool.maxTotal").trim());
jedisPoolConfig.setMaxTotal(poolMaxTotal);
int poolMaxIdle = Integer.valueOf(props.getProperty("redis.pool.maxIdle").trim());
jedisPoolConfig.setMaxIdle(poolMaxIdle);
long poolMaxWaitMillis = Long.valueOf(props.getProperty("redis.pool.maxWaitMillis").trim());
jedisPoolConfig.setMaxWaitMillis(poolMaxWaitMillis);
logger.debug(String.format("poolMaxTotal: %s , poolMaxIdle : %s , poolMaxWaitMillis : %s ",
poolMaxTotal,poolMaxIdle,poolMaxWaitMillis));
// 根据配置实例化jedis池
String connectMode = props.getProperty("redis.connectMode");
String hostPortStr = props.getProperty("redis.hostPort");
logger.debug(String.format("host : %s ",hostPortStr));
logger.debug(String.format("mode : %s ",connectMode));
if(StringUtils.isEmpty(hostPortStr)){
throw new OptimusException("redis配置文件未配置主机-端口集");
}
String[] hostPortSet = hostPortStr.split(",");
if("single".equals(connectMode)){
String[] hostPort = hostPortSet[0].split(":");
pool = new JedisPool(jedisPoolConfig, hostPort[0], Integer.valueOf(hostPort[1].trim()));
}else if("sentinel".equals(connectMode)){
Set<String> sentinels = new HashSet<String>();
for(String hostPort : hostPortSet){
sentinels.add(hostPort);
}
pool = new JedisSentinelPool("mymaster", sentinels, jedisPoolConfig);
}
}
/**
* 使用完成后,必须调用 returnResource 还回。
* @return 获取Jedis对象
*/
public static Jedis getResource(){
Jedis jedis = pool.getResource();
if(logger.isDebugEnabled()){
logger.debug("获得链接:" + jedis);
}
return jedis;
}
/**
* 获取Jedis对象。
*
* 用完后,需要调用returnResource放回连接池。
*
* @param db 数据库序号
* @return
*/
public static Jedis getResource(int db){
Jedis jedis = pool.getResource();
jedis.select(db);
if(logger.isDebugEnabled()){
logger.debug("获得链接:" + jedis);
}
return jedis;
}
/**
* @param jedis
*/
public static void returnResource(Jedis jedis){
if(jedis!=null){
pool.returnResource(jedis);
if(logger.isDebugEnabled()){
logger.debug("放回链接:" + jedis);
}
}
}
/**
* 需要通过Spring确认这个方法被调用。
* @throws Exception
*/
public static void destroy() throws Exception {
pool.destroy();
}
}
这个类没有通过技术手段强制调用returnResource和destroy,需要想想办法。
2、队列接口
public interface TaskQueue {
/**
* 获取队列名
* @return
*/
String getName();
/**
* 往队列中添加任务
* @param task
*/
void pushTask(String task);
/**
* 从队列中取出一个任务
* @return
*/
String popTask();
}
用String类型描述任务,也可以考虑byte[],要求对每个任务描述的数据尽可能短。
3、队列的Redis实现类
/**
* 任务队列Redis实现。
*
* 采用每次获取Jedis并放回pool的方式。
* 如果获得Jedis后一直不放手,反复重用,两个操作耗时可以降低1/3。
* 暂时先忍受这种低性能,不明确Jedis是否线程安全。
*
*/
public class TaskQueueRedisImpl implements TaskQueue {
private final static int REDIS_DB_IDX = 9;
private final static Logger logger = Logger.getLogger(TaskQueueRedisImpl.class);
private final String name;
/**
* 构造函数。
*
* @param name
*/
public TaskQueueRedisImpl(String name) {
this.name = name;
}
/* (non-Javadoc)
* @see com.gwssi.common.mq.TaskQueue#getName()
*/
public String getName() {
return this.name;
}
/* (non-Javadoc)
* @see com.gwssi.common.mq.TaskQueue#pushTask(String)
*/
public void pushTask(String task) {
Jedis jedis = null;
try{
jedis = RedisManager.getResource(REDIS_DB_IDX);
jedis.lpush(this.name, task);
}catch(Throwable e){
logger.error(e.getMessage(),e);
}finally{
if(jedis!=null){
RedisManager.returnResource(jedis);
}
}
}
/* (non-Javadoc)
* @see com.gwssi.common.mq.TaskQueue#popTask()
*/
public String popTask() {
Jedis jedis = null;
String task = null;
try{
jedis = RedisManager.getResource(REDIS_DB_IDX);
task = jedis.rpop(this.name);
}catch(Throwable e){
logger.error(e.getMessage(),e);
}finally{
if(jedis!=null){
RedisManager.returnResource(jedis);
}
}
return task;
}
}
4、获取队列实例的工具类
/**
* <pre>
* // 获得队列
* TaskQueue tq = TaskQueueManager.get(TaskQueueManager.SMS_QUEUE);
*
* // 添加任务到队列
* String task = "task id";
* tq.pushTask(task);
*
* // 从队列中取出任务执行
* String taskToDo = tq.popTask();
* </pre>
* @author liuhailong
*/
public class TaskQueueManager {
protected final static Logger logger = Logger.getLogger(TaskQueueManager.class);
private static Map<String, TaskQueueRedisImpl> queneMap = new ConcurrentHashMap<String, TaskQueueRedisImpl>();
/**
* 短信队列名。
*/
public static final String SMS_QUEUE = "SMS_QUEUE";
/**
* 规则队列名。
*/
public static final String RULE_QUEUE = "RULE_QUEUE";
private static void initQueneMap() {
logger.debug("初始化任务队列...");
queneMap.put(RULE_QUEUE, new TaskQueueRedisImpl(RULE_QUEUE));
logger.debug("建立队列:"+RULE_QUEUE);
queneMap.put(SMS_QUEUE, new TaskQueueRedisImpl(SMS_QUEUE));
logger.debug("建立队列:"+SMS_QUEUE);
}
static {
initQueneMap();
}
public static TaskQueue get(String name){
return getRedisTaskQueue(name);
}
public static TaskQueue getRedisTaskQueue(String name){
return queneMap.get(name);
}
}
和具体的队列过于紧耦合,但简单好用。
先跑起来再说。
5、向队列中添加任务的代码
TaskQueue tq = TaskQueueManager.get(TaskQueueManager.SMS_QUEUE);
tq.pushTask(smsMessageId);
6、从队列中取出任务执行的代码
public class SmsSendTask{
protected final static Logger logger = Logger.getLogger(SmsSendTask.class);
protected static SmsSendService smsSendService = new SmsSendServiceUnicomImpl();
/**
* 入口方法。
*/
public void execute() {
TaskQueue taskQueue = null;
String task = null;
try {
taskQueue = TaskQueueManager.get(TaskQueueManager.SMS_QUEUE);
// 非线程安全
Set<Serializable> executedTaskSet = new HashSet<Serializable>();
task = taskQueue.popTask();
while(task!=null){
// 判断是否把所有任务都执行一遍了,避免死循环
if(executedTaskSet.contains(task)){
taskQueue.pushTask(task);
break;
}
executeSingleTask(taskQueue,task);
task = taskQueue.popTask();
}
}catch(Throwable e){
logger.error(e.getMessage(),e);
e.printStackTrace();
}
}
/**
* 发送单条短信。
*
* 取出任务并执行,如果失败,放回任务列表。
*
* @param taskQueue
* @param task
*/
@SuppressWarnings({ "rawtypes", "unchecked" })
private void executeSingleTask(TaskQueue taskQueue, String task) {
try {
// do the job
String smsId = task;
Map<String,String> sms = smsSendService.getSmsList(smsId);
smsSendService.send(sms);
smsSendService.updateSmsStatus(task,SmsSendService.STATUS_SENT);
String opType = "2";
TaskQueueUtil.taskLog(taskQueue.getName(), opType, task);
} catch (Throwable e) {
if(task!=null){
taskQueue.pushTask(task);
smsSendService.updateSmsStatus(task,SmsSendService.STATUS_WAIT);
if(logger.isDebugEnabled()){
logger.error(String.format("任务%s执行失败:%s,重新放回队列", task, e.getMessage()));
}
}else {
e.printStackTrace();
}
}
}
}
这部分代码是固定模式,而且不这样做存在重大缺陷,会有任务执行失败,被丢弃,这部分代码应该写到队列实现中。
有空再改。