最近在做一个利用微信公众号引流的活动,其中有一个需求点是当系统收到用户回复的任意关键词两分钟后,系统会给用户发送一张带有二维码的图片,用户长按识别二维码会跳转到活动的落地页。这是一个典型的“事件驱动型延时处理”需求,触发条件时收到用户消息,延时2分钟处理。对于这么一个常见的业务场景,对于java开发者来说,你会怎样设计你的功能呢?
java功底薄弱的程序员的致命错误
在谈具体方案之前,先来一个事故小插曲,我们先看看一位经验不是很丰富的java程序员设计这个功能的部分代码:
<!-- spring线程池配置 -->
<bean id="taskExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
<property name="threadNamePrefix" value="TASK"/>
<property name="queueCapacity" value="255" />
<property name="corePoolSize" value="10" />
<property name="maxPoolSize" value="100" />
<property name="keepAliveSeconds" value="3600" />
<property name="waitForTasksToCompleteOnShutdown" value="true"/>
</bean>
//注入spring线程池任务执行器
@Autowired
@Qualifier("taskExecutor")
private ThreadPoolTaskExecutor executor;
/**
* 异步延迟发送图片
*/
public void asyncDelaySendImage(final WxGZHReceiveMsg receiveMsg) {
executor.submit(new Runnable() {
@Override
public void run() {
// 延时两分钟执行
try {
Thread.sleep(2 * 60 * 1000);
} catch (InterruptedException e) {
LOG.error("发送图片延时执行失败");
}
//根据收到的消息内容发送图片
sendImage(receiveMsg);
}
});
}
这个代码上线后,运营人员开启了一波活动,立即有几千个粉丝回复了消息,这个时候我的系统大量的报类似下面的错误:
org.springframework.core.task.TaskRejectedException: Executor [java.util.concurrent.ThreadPoolExecutor@6d5b9673[Running, pool size = 100, active threads = 100, queued tasks = 255, completed tasks = 402]] did not accept task
从这个报错我们看出,当并发量上来后,executor.submit提交失败了,原因是线程池队列已经满了。我们在分析上面的代码,这个代码有两个致命的错误:
- 接受到一个用户的回复executor.submit一次,如果并发量超过线程池队列大小,后面的提交都会失败。
- submit任务后,run方法里居然有个sleep,这直接让线程池里的线程休眠了,线程不能空闲出来被其他任务使用,干等2分钟。
这样的代码会很快耗尽系统资源,大大降低系统的吞吐率,对于java程序员来说,犯这样的错误其实是很低级的,也是致命的,接下来我们来看有什么方案可以解决。
DelayQueue实现事件驱动延时处理
如果上面的这位程序员有了解过jdk的源码,有了解过DelayQueue的原理,我想他不会犯那么低级的错误。java.util.concurrent.DelayQueue
我们现在使用DelayQueue简单实现以下这个功能。我们针对上面出现的WxGZHReceiveMsg类,实现Delayed接口,并实现getDelay和compareTo方法,具体看注释:
package cn.lovecto.test.delay;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
/**
* 微信公众收到的消息类
*/
public class WxGZHReceiveMsg implements Delayed{
/**用户ID*/
private Integer userId;
/**用户回复的关键字*/
private String keyWord;
/**执行时间,单位ms*/
public Long executeTime;
public WxGZHReceiveMsg(Integer userId, String keyWord, Long executeTime) {
super();
this.userId = userId;
this.keyWord = keyWord;
this.executeTime = executeTime;
}
/**
* compareTo方法的作用即是判断队列中元素的顺序谁前谁后。当前元素比队列元素后执行时,返回一个正数,比它先执行时返回一个负数,否则返回0.
*/
@Override
public int compareTo(Delayed o) {
if(this.getDelay(TimeUnit.MILLISECONDS) > o.getDelay(TimeUnit.MILLISECONDS)) {
return 1;
}else if(this.getDelay(TimeUnit.MILLISECONDS) < o.getDelay(TimeUnit.MILLISECONDS)) {
return -1;
}
return 0;
}
/**
* getDelay的作用是计算当前时间到执行时间之间还有多少毫秒
*/
@Override
public long getDelay(TimeUnit unit) {
return unit.convert(executeTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
}
public Integer getUserId() {
return userId;
}
public void setUserId(Integer userId) {
this.userId = userId;
}
public String getKeyWord() {
return keyWord;
}
public void setKeyWord(String keyWord) {
this.keyWord = keyWord;
}
public Long getExecuteTime() {
return executeTime;
}
public void setExecuteTime(Long executeTime) {
this.executeTime = executeTime;
}
}
接下来我们模拟测试一下,新建一个ReplayWxGZHReceiveMsgExecutor任务执行类,使用java.util.concurrent.ScheduledExecutorService开启一个线程每秒扫描一次队列中是否有需要回复的消息,只要队列中有需要回复的消息则进行回复:
package cn.lovecto.test.delay;
import java.util.TimerTask;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
public class ReplayWxGZHReceiveMsgExecutor {
/** 接收到的微信公众号消息队列 */
private static DelayQueue<WxGZHReceiveMsg> queue = new DelayQueue<>();
/** 类初始化后就每秒检测队列进行任务处理 */
static {
ScheduledExecutorService executorService = Executors
.newScheduledThreadPool(1);
executorService.scheduleAtFixedRate(new TimerTask() {
@Override
public void run() {
while (!queue.isEmpty()) {
try {
WxGZHReceiveMsg msg = queue.take();
System.out.println(String.format(
"当前时间:%d,计划执行时间%d,准备给用户%d回复的\"%s\"回复图片",
System.currentTimeMillis(),
msg.getExecuteTime(), msg.getUserId(),
msg.getKeyWord()));
// TODO 发送逻辑,此处略
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}, 0, 1000, TimeUnit.MILLISECONDS);
}
/**
* 添加到队列
*
* @param msg
*/
public static void submit(WxGZHReceiveMsg msg) {
ReplayWxGZHReceiveMsgExecutor.queue.add(msg);
}
public static void main(String[] args) throws InterruptedException {
long delay = 5000;// 延迟5秒执行
for (int i = 1; i <= 10; i++) {
WxGZHReceiveMsg msg = new WxGZHReceiveMsg(i, "关键词" + i,
System.currentTimeMillis() + delay);
ReplayWxGZHReceiveMsgExecutor.submit(msg);
//这里sleep是为了模拟活动的后半段,用户参与度有所下降
if(i > 5){
Thread.sleep(2000);
}
}
}
}
运行main方法测试一下,对比下计划执行时间和真正的执行时间,相差在1毫秒左右,基本能满足功能性需求。测试结果如下:
当前时间:1534595607657,计划执行时间1534595607656,准备给用户1回复的"关键词1"回复图片
当前时间:1534595607673,计划执行时间1534595607656,准备给用户2回复的"关键词2"回复图片
当前时间:1534595607674,计划执行时间1534595607656,准备给用户3回复的"关键词3"回复图片
当前时间:1534595607674,计划执行时间1534595607657,准备给用户4回复的"关键词4"回复图片
当前时间:1534595607674,计划执行时间1534595607657,准备给用户5回复的"关键词5"回复图片
当前时间:1534595607674,计划执行时间1534595607657,准备给用户6回复的"关键词6"回复图片
当前时间:1534595609658,计划执行时间1534595609657,准备给用户7回复的"关键词7"回复图片
当前时间:1534595611658,计划执行时间1534595611657,准备给用户8回复的"关键词8"回复图片
当前时间:1534595613658,计划执行时间1534595613657,准备给用户9回复的"关键词9"回复图片
当前时间:1534595615658,计划执行时间1534595615657,准备给用户10回复的"关键词10"回复图片
使用DelayQueue能够“满足事件驱动延时处理”类需求,但要考虑单jvm内存大小的问题,如果并发量太高,消息队列将会较庞大,遍历一次耗时也会较长。此时可以考虑增加机器部署多个实例,实现集群,每个jvm还可以使用多个队列和线程池对任务进行处理从而提高回复效率,提升整个系统的性能。
使用mq实现事件驱动延时处理
使用DelayQueue的优点是JDK自身实现,使用方便,量小特别适用,但是整个队列处于jvm内存中,内容不能持久化,如果没有负载均衡机制,就不能支持分布式运行。如果你要考虑消息的持久化,那么mq是一个不错的选择,比如Rocketmq的延时队列,有兴趣可以参考官方实例:
http://rocketmq.apache.org/docs/schedule-example/
除了rocketmq,Rabbitmq延时队列(TTL+DLX实现)也是一个不错的选择。
使用redis缓存的zset实现事件驱动延时处理
redis缓存的zset是一个有序集合,根据score值进行排序。对于本文的例子,我们可以把score值设置为上面WxGZHReceiveMsg中的executeTime,java代码中使用定时任务扫描有序集合中在一定时间范围内(score值)的元素进行回复操作(使用ZRANGEBYSCORE命令),对已经处理的消息进行rem操作(ZREMRANGEBYSCORE )。如果并发量高,可以使用多个有序集合,采用线程池对集合进行扫描。如果应用是集群部署,要考虑好队列竞争的问题,可以使用分布式锁(此处可以对已经扫描的最小的时间值minScore加锁,扫描一次迭代一次,每次扫描都是当前时间到上一次的minScore值之间的一个范围)。基于redis的分布式锁可参考《springboot使用redisson作为分布式锁的一种实现方式》。
另外redis常用命令可参考:
http://doc.redisfans.com/index.html
总结
总结一下,java中关于”事件驱动型延时处理“业务需程序设计方案,本文列举了三种,第一种设计基于jdk的DelayQueue(java开发人员必须要会);第二种是基于mq的持久化消息队列,如使用Rocketmq、Rabbitmq等,适用于分布式系统;第三种是基于缓存的持久化队列,适用于分布式,但要考虑好集群中多个实例对同一个队列读取时的多次消费问题。总之提高系统吞吐率和性能无非就是多线程、集群、分布式、异步消息队列、缓存,根据实际使用情况选型即可。