在前面的一篇文章《java中关于”事件驱动型延时处理“业务需求的几种程序设计方案》中讲到了使用redis的功能实现延迟事件处理的定时任务功能,本文主要讲下使用redisson的延迟队列实现定时功能。redisson相关文档及章节参考:
https://github.com/redisson/redisson/wiki/7.-%E5%88%86%E5%B8%83%E5%BC%8F%E9%9B%86%E5%90%88#715-%E5%BB%B6%E8%BF%9F%E9%98%9F%E5%88%97delayed-queue
官方文档中只是简单介绍了延迟队列的提交,但未涉及到使用的问题,今天我们就补充一下。
现在的使用场景是,电商系统会做促销,有时候每天会有多个促销场次,比如早上9点-11点一场,下午14点-16点一场,晚上20点-23点一场,每个场次的打折力度是不一样的,所以在不同的促销时间段内用户看到的价格也应该不一样,如果价格的计算过程比较复杂,那么产品的起价可能会缓存,所以缓存的更新就会依赖促销的场次变化。
促销规则都是预设的(活动都是有计划的),所以我们现在的解决思路是每天凌晨0点查询出当天需要进行的促销场次,然后把促销ID加入到延迟队列中,促销场次开始时触发起价计算逻辑,重新更起价缓存。
业务相关的就不提供了,此处给出添加队列和使用队列的相关代码吧。先来看添加队列:
/**
* 尝试添加到监控队列,满足条件是预订开始时间是当天
*
* @param promotionId
* 促销ID
* @param bookingStartTime
* 促销预订开始时间
* @return
*/
public boolean tryAddToQueue(Integer promotionId, Date bookingStartTime) {
Date today = DateUtil.getCurrenctDate();
Date tommorow = DateUtil.addDay(today, 1);
if (!DateUtil.between(bookingStartTime, today, tommorow)) {
return false;
}
// 过了当前时间的,不必再加入队列
Date now = new Date();
long delay = bookingStartTime.getTime() - now.getTime();
if (delay <= 0) {
return false;
}
RQueue<Integer> queue = redisson
.getQueue(PromotionConstants.KeyFormat.KEY_START_PRICE_PROMOTIONID_MONITOR_QUEUE);
// 使用延迟队列
RDelayedQueue<Integer> delayedQueue = redisson.getDelayedQueue(queue);
delayedQueue.offer(promotionId, delay, TimeUnit.MILLISECONDS);
// 使用后释放掉
delayedQueue.destroy();
return true;
}
提交到队列先使用redisson.getQueue获取到一个队列,再使用redisson.getDelayedQueue获取到一个延迟队列,最后使用delayedQueue.offer提交。再来看看使用队列的地方:
@PostConstruct
public void init() {
ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
// 每秒检测一次
executor.scheduleAtFixedRate(new TimerTask() {
@Override
public void run() {
RQueue<Integer> queue = redisson
.getQueue(PromotionConstants.KeyFormat.KEY_START_PRICE_PROMOTIONID_MONITOR_QUEUE);
Integer promotionId = queue.poll();
// 有促销ID,则进行起价更新
if (promotionId != null) {
updatePromotionProductStartPrice(promotionId);
}
}
}, 0, 1, TimeUnit.SECONDS);
}
使用redisson.getQueue获取到队列,使用queue.poll()从队列中获取到元素,如果获取成功,则直接更新促销产品起价,否则,一秒后继续检测。
使用redisson的延迟队列时,千万要注意的地方是放入队列是使用的RDelayedQueue,获取队列是使用RQueue而不是RDelayedQueue。有兴趣可参考redisson源码,尤其是下面这个方法内部的实现逻辑:
org.redisson.RedissonDelayedQueue.RedissonDelayedQueue(QueueTransferService, Codec, CommandAsyncExecutor, String)