使用场景
一个调用请求(Request)到达应用(Application)后,应用处理完成并返回结果(Response)的这一时间段内,如在处理这一请求(Request)过程中出行如下两种情形:
- 调用了较多的方法,且大部分方法都使用到了相同的数据。
- 调用了较少的方法,但某些有循环的方法使用到了其他方法也用到的数据。
并且这些能够共用的数据需要通过远程调用(RPC)或者数据库查询等,则可以考虑使用ThreadLocal做线程缓存。
实战举例
随着业务量的增加,网站流量不断的提升,有那么一段时间,客户经常抱怨,我们的网站经常不能正常访问。针对用户的反馈,我们分析了API网关的请求日志,其中有一个现象就是:前端打开一个页面会大量向后台发送数据请求。
就拿个人中心举例,我们的个人中心包含了个人信息、我的订单、我的常旅客、我的优惠券、我的奖金、我的收藏等信息,并且这些模块的元信息都需要一目了然的展示给用户。我们的用户体系比较特殊,有一个合并用户和关联用户的概念,比如用户使用微信登录则会在我们系统生成一个userIdWeixin,用户针对第三方账号绑定邮箱或者手机号,如果输入的邮箱或者手机号已经在我们的用户系统中存在userIdPhone或userIdEmail,则会把这几个userId关联,用户无论使用微信、手机还是邮箱登录都能同时看到三个账号的相关数据。
原来的做法是每个小模块一个请求,所以打开一次个人中心总共就会有6个请求发到服务端。即使服务器的配置再好,在业务请求量大的情况下也比较容易出现系统繁忙,线程堵塞,最后Web容器的HTTP线程池被慢慢耗尽而出现宕机问题,无法对外提供服务。所以我们针对这种现象进行了减少HTTP请求次数优化,就是说个人中心只需要前端一个http请求,服务端组装6个模块的数据一并返回。
从网络方面而言,减少数据的交互就意味着减少了网络带宽的压力,能够提供更大并发量的服务,减少网络传输的时间。由于减少了请求次数,服务器的资源开销得到有效减少,后台服务器不用频繁的创建线程来处理外部的请求,由于操作系统创建和销毁线程的操作是非常消耗系统CPU等资源,因此减少请求次数也就大大减少了系统开销。
针对我们系统中“个人中心”的这种特殊情况,还会发现,各个独立的模块里面都跟用户相关。就算我们减少了前端http的请求数,网关层还是会针对各个更细粒度的模块远程调用用户服务查询用户的关联用户ID,各自再根据“关联用户ID集合”查询订单、优惠券、收藏等信息。如此一来,个人信息、我的订单、我的常旅客、我的优惠券、我的奖金、我的收藏各查一次就是6次,对底层服务的压力并没有减少。
你可以理解为查询一次个人中心,网关层需要调用个人信息、我的订单、我的常旅客、我的优惠券、我的奖金、我的收藏等6个方法,并且每个方法都需要“关联用户ID集合”。一种做法是查询一次用户获取到“关联用户ID集合”,然后每个方法都把这个“关联用户ID集合”作为参数传递进去,但是这样已经改变了这个方法而且依赖这个方法的地方都要随之改动。所以我们再不改变方法以及保持每个方法实现逻辑不变的前提下,引入ThreadLocal和AOP,仅针对获取“关联用户ID集合”这个方法进行一些改造就能达到我们的目的,并且性能大幅度提升。
上干货了
为了实现线程的缓存,我们定义一个线程上下文类ThreadContext,用于存储需要缓存的数据。具体实现如下:
package cn.lovecto.thread;
import java.util.HashMap;
import java.util.Map;
/**
* 线程上下文
*
*/
public final class ThreadContext {
private ThreadContext() {
}
/**
* 线程容器
*/
private static final ThreadLocal<Map<Object, Object>> local = new ThreadLocal$Map<Map<Object, Object>>();
/**
* 设置线程中的对象
*
* @param key
* @param value
*/
public static void put(Object key, Object value) {
Map<Object, Object> m = getThreadMap();
m.put(key, value);
}
/**
* 取得线程中的对象
*
* @param key
* @return
*/
public static Object get(Object key) {
Map<Object, Object> m = getThreadMap();
return m.get(key);
}
/**
* 删除线程中的对象
*
* @param key
* @return
*/
public static void remove(Object key) {
Map<Object, Object> m = getThreadMap();
m.remove(key);
}
/**
* 清空线程Map
*
* @return
*/
public static void clear() {
Map<Object, Object> m = getThreadMap();
m.clear();
}
/**
* 是否包含Key
*
* @param key
* @return
*/
public static boolean containsKey(Object key) {
Map<Object, Object> m = getThreadMap();
return m.containsKey(key);
}
/**
* 批量添加数据
*
* @param p
* @return
*/
public static void putAll(Map<Object, Object> p) {
Map<Object, Object> m = getThreadMap();
m.putAll(p);
}
/**
* 取得线程中的Map
*
* @return
*/
private static Map<Object, Object> getThreadMap() {
return (Map<Object, Object>) local.get();
}
/**
* 销毁线程变量Map
*/
public static void destory() {
local.remove();
}
/**
* 内部类实现
*
* @param <T>
*/
private static class ThreadLocal$Map<T> extends
ThreadLocal<Map<Object, Object>> {
@Override
protected Map<Object, Object> initialValue() {
return new HashMap<Object, Object>();
}
}
}
我们的目标是使用AOP和ThreadLocal针对方法的返回结果进行缓存,所以我们添加一个注解ThreadCacheable,作用在需要缓存结果的方法上。
package cn.lovecto.annotation;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
/**
* 线程缓存注解,作用在方法上,缓存map的key为:"类名.方法名(参数1,参数2,参数3,...)",参考
* {@link cn.lovecto.aop.ThreadCacheableAop}的实现,这个注解的目的是避免同一个线程,
* 多次调用底层同一个dubbo接口,提升性能,减少整个系统的dubbo调用次数.
*
* 此注解仅作用在调用频繁的方法上,使用者需注意
*
*/
@Retention(value = RetentionPolicy.RUNTIME)
@Target(value = ElementType.METHOD)
public @interface ThreadCacheable {
}
有了注解ThreadCacheable,我们还要针对这个注解实现相关的处理逻辑。下面是ThreadCacheableAop的基于Spring Aop的实现逻辑。
package cn.lovecto.aop;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import cn.lovecto.annotation.ThreadCacheable;
import cn.lovecto.thread.ThreadContext;
/**
* 线程缓存切面核心实现
*
*
*/
@Aspect
@Component
public class ThreadCacheableAop {
public static final Logger logger = LoggerFactory.getLogger(ThreadCacheableAop.class);
/**
* 方法缓存,缓存当前线程的变量
*
* @param joinPoint
* @param threadCacheable
* @return
* @throws Throwable
*/
@Around(value = "@annotation(threadCacheable)")
public Object threadCache(ProceedingJoinPoint joinPoint,
ThreadCacheable threadCacheable) throws Throwable {
return doCache(joinPoint, threadCacheable);
}
/**
* 缓存操作
*
* @param joinPoint
* @param threadCacheable
* @return
* @throws Throwable
*/
public Object doCache(ProceedingJoinPoint joinPoint,
ThreadCacheable threadCacheable) throws Throwable {
String key = getKey(joinPoint);
return getValue(key, joinPoint);
}
/**
* 获取值
*
* @param key
* @param joinPoint
* @param threadCacheable
* @return
* @throws Throwable
*/
private Object getValue(String key, ProceedingJoinPoint joinPoint) throws Throwable {
if (ThreadContext.containsKey(key)) {
return ThreadContext.get(key);
}
Object result = joinPoint.proceed();
if (result != null) {
ThreadContext.put(key, result);
}
return result;
}
/**
* 获取线程变量缓存key , key结构 : 类名+方法名+参数列表
*
* @param joinPoint
* @return
*/
private String getKey(ProceedingJoinPoint joinPoint) {
StringBuilder keyBuilder = new StringBuilder();
keyBuilder.append(joinPoint.getTarget().getClass().getTypeName())
.append(".").append(joinPoint.getSignature().getName())
.append("(");
if (joinPoint.getArgs().length > 0) {
for (int i = 0; i < joinPoint.getArgs().length; i++) {
keyBuilder.append(joinPoint.getArgs()[i]);
if (i != (joinPoint.getArgs().length - 1)) {
keyBuilder.append(",");
} else {
keyBuilder.append(")");
}
}
} else {
keyBuilder.append(")");
}
return keyBuilder.toString();
}
}
接下来一切工作就绪,是到我们使用的时候了,只需要像下面的queryUnionIds方法上加上@ThreadCacheable即可。
/**
* 根据用户ID查询关联的所有用户ID包括自己,使用@ThreadCacheable注解线程缓存
* @param userId
*/
@ThreadCacheable
public List<Integer> queryUnionIds(Integer userId) {
//查询逻辑
}
注意事项
使用ThreadLocal做线程内缓存是种有效手段,但需要考虑数据清除及有效性。我们的网关层是基于spring boot框架的,API网关提供http接口,所以我们在自定义的Filter中清除掉线程中缓存的数据,避免线程池中的线程后续被使用过程中出现不该出现的数据。
package cn.lovecto.Filter;
import java.io.IOException;
import javax.servlet.Filter;
import javax.servlet.FilterChain;
import javax.servlet.FilterConfig;
import javax.servlet.ServletException;
import javax.servlet.ServletRequest;
import javax.servlet.ServletResponse;
import cn.lovecto.thread.ThreadContext;
/**
* 线程上下文过滤器
*
*/
public class ThreadContextFilter implements Filter {
public void doFilter(ServletRequest request, ServletResponse response,
FilterChain chain) throws IOException, ServletException {
try {
chain.doFilter(request, response);
} finally {
// 清除线程上下文中的变量
ThreadContext.clear();
}
}
@Override
public void init(FilterConfig filterConfig) throws ServletException {
}
@Override
public void destroy() {
}
}
总结
这是一次ThreadLocal和AOP做线程缓存提高性能,缩短API网关响应时间的实践。文中的源代码是可以直接使用的,注解作用到任何方法上均可以达到线程缓存的效果,但需要注意ThreadLocal缓存的生命周期及合适的时间清除掉,避免对不同的请求造成干扰。