概要
起因A3项目发展2年后,功能较为稳定后 ,准备合并进EagleEye主体项目,遇到了个问题,代码很难merge进EagleEye。暴露了一个问题,代码写得太差。模块化。重新认识一下,如何写代码
入门版Hello World
下面这段经典代码,开始学习的时候,觉得非常的优美。工作6年后,回头再看看这段代码,却发现漏洞很多。
1:逻辑通过静态对象和静态代码组织
2:难以分析依赖
3:代码散乱,扩展性差,无模块化,很难组织进大型项目
public class HelloWorld {
public static void main(String[] args){
System.out.println("Hello World");
}
}
学徒版Hello World
应该思考一下这段逻辑有哪些依赖?
1:将业务逻辑调用通过 handler 包装。
2:在handler执行后将返回值传递给 一个Result对象
3:将参数传递给一个Param对象,传递给业务逻辑
类似下面的伪代码
public class HelloWorldV2 {
Handler handler;
Param param;
Result result;
public static void main(String[] args){
param.setName("Hello World");
handler.run();
Object rtn = result.get();
}
}
实战HelloWorld
优化前的A3逻辑里的限流功能,通过几个静态方法组成限流模块,代码难以维护,主要是难以迁入EagleEye
通过学徒版的形式,重写这个静态方法的逻辑。
限流逻辑
限流模块,主要的逻辑,给各个api接口设置一个token身份,同一个身份最多同时只能保持N个请求上限,对重要的业务方,如aone发布平台不做身份限流上限,保证每次发布都能完整坚持,并且整个系统包含一个固定QPS水位的上限,防止系统崩溃。当系统达到上限后,对触发上限的token身份,触发熔断机制(在一定的时间内不接受这个token的请求)。
思考依赖
1:首先应该有个 Limiter对象,负责同一个身份的验证 Limiter
2:需要有一个可以管理身份的对象,可以判断身份,重要或者一般或者不合法 TokenChecker
3:需要有个固定QPS上限的对象,防止系统崩溃 RateLimiterGuave
4:需要有个熔断器,对高频调用身份做惩罚。RateLimiterPunish
抽出下面4个接口,然后由 Limiter依赖(TokenChecker,RateLimiterGuave,RateLimiterPunish)三个依赖
public interface Limiter {
boolean pass(String tokenId);
boolean pass(String tokenId, String start);
void unlock(String tokenId);
}
public interface TokenChecker {
boolean isBlackList(String tokenId);
boolean isWhiteList(String tokenId);
boolean exist(String tokenId);
}
public interface RateLimiterGuave {
public boolean pass();
}
public interface RateLimiterPunish {
boolean isPunish(String tokenId);
void punish(String tokenId);
}
配置依赖实现类
public class LimitModule extends AbstractModule {
@Override
protected void configure() {
bind(Limiter.class).to(LimiterImpl.class);
bind(RateLimiterGuave.class).to(RateLimiterGuaveImpl.class);
bind(RateLimiterPunish.class).to(RateLimiterPunishImpl.class);
bind(TokenChecker.class).to(TokenCheckerImpl.class);
}
核心限流逻辑
LimiterImpl 主要通过注入3个依赖。
@Singleton
public class LimiterImpl implements Limiter {
private final RateLimiterGuave rateLimiterGuave;
private final RateLimiterPunishImpl rateLimiterPunish;
private final TokenChecker tokenChecker;
@Inject
public LimiterImpl(
RateLimiterGuave rateLimiterGuave,
RateLimiterPunishImpl rateLimiterPunish,
TokenChecker tokenChecker) {
this.rateLimiterGuave = rateLimiterGuave;
this.rateLimiterPunish = rateLimiterPunish;
this.tokenChecker = tokenChecker;
boolean isProd = EnvUtils.isProd();
if (!isProd) MAX_FLOW_UPPER = 0l;
}
private static final Logger log = LoggerFactory.getLogger(LimiterImpl.class);
private final ConcurrentMap<String, AtomicInteger> flowControlTokens = new ConcurrentHashMap<String, AtomicInteger>();
private final String DEFAULT_TOKEN = "DEFAULT_TOKEN";
private static long MAX_FLOW_UPPER = 3l;//最多给 3 个查询
private final static long MAX_FLOW_WHITE_UPPER = 30l;//重要用户最多给 300 个查询
public int getCurrentConnectted(String tokenId) {
tokenId = getRealToken(tokenId);
AtomicInteger atomicInteger = flowControlTokens.get(tokenId);
int count = atomicInteger == null ? 0 : atomicInteger.intValue();
return count;
}
private StatLogger statErrorDetailLogger = EagleEye.statLoggerBuilder("a3_flow_token")
.intervalSeconds(10).buildSingleton();
private StatLogger tokenCountErrorDetailLogger = EagleEye.statLoggerBuilder("a3_flow_token_count")
.intervalSeconds(10).buildSingleton();
@Override
public boolean pass(String tokenId) {
return pass(tokenId, null);
}
/**
* @param tokenId
* @return
*/
@Override
public boolean pass(String tokenId, String start) {
tokenId = getRealToken(tokenId);
boolean passDiy = passInn(tokenId, start);
if (passDiy) {
return passGuava(tokenId);
}
log.warn("passInn fail . pass fail tokenId:" + tokenId);
return false;
}
private boolean passGuava(String tokenId) {
boolean isPunish = rateLimiterPunish.isPunish(tokenId);
if (isPunish) {
return limitByPunish(tokenId);
}
boolean queryPass = rateLimiterGuave.pass();
if (queryPass) {
log.info("pass success . pass guava tokenId:" + tokenId);
return true;
}
rateLimiterPunish.punish(tokenId);
return limit(tokenId);
}
private boolean limit(String tokenId) {
log.warn("guava limit . query fail pass guava tokenId:" + tokenId);
unlock(tokenId);
return false;
}
private boolean limitByPunish(String tokenId) {
log.warn("punish limit . query fail pass punish tokenId:" + tokenId);
unlock(tokenId);
return false;
}
/**
* @param tokenId
* @return
*/
public boolean passInn(String tokenId, String start) {
if (!EnvUtils.isProd()) {
return false;//日常不提供api服务了。
}
// tokenId = getRealToken(tokenId);
//2:
if (!flowControlTokens.containsKey(tokenId)) {
flowControlTokens.putIfAbsent(tokenId, new AtomicInteger(0));
}
tokenCountErrorDetailLogger.stat(tokenId).minMax(getCurrentConnectted(tokenId));
long max = MAX_FLOW_UPPER;
// max = 10;
if (tokenChecker.isWhiteList(tokenId)) {
if (EnvUtils.isProd()) {
//TODO:在这里加上通用令牌桶的限流
statErrorDetailLogger.stat(tokenId, "whiteInPro").count();
max = MAX_FLOW_WHITE_UPPER;
return true;//打开全流量
} else {//日常
statErrorDetailLogger.stat(tokenId, "white").count();
max = MAX_FLOW_WHITE_UPPER;
}
}
if (tokenChecker.isBlackList(tokenId) && limitByDate(tokenId, start)) return false;
AtomicInteger tokens = flowControlTokens.get(tokenId);
if (tokens.incrementAndGet() > max) {
tokens.decrementAndGet();
statErrorDetailLogger.stat(tokenId, "flowCont").count();
return false;//不让通过
} else {
return true;
}
}
@Override
public void unlock(String tokenId) {
tokenId = getRealToken(tokenId);
if (tokenChecker.isWhiteList(tokenId)) {
return;
}
AtomicInteger tokens = flowControlTokens.get(tokenId);
tokens.decrementAndGet();
statErrorDetailLogger.stat(tokenId, "success").count();
}
private boolean limitByDate(String tokenId, String date) {
if (date == null) {
return false;
}
statErrorDetailLogger.stat(tokenId, "limitByDate").count();
boolean limit = System.currentTimeMillis() - Long.parseLong(date)
> 3 * 24 * 60 * 60 * 1000;
return limit;
}
public Object getLock(String tokenId) {
tokenId = getRealToken(tokenId);
AtomicInteger tokens = flowControlTokens.get(tokenId);
return tokens;
}
private String getRealToken(String tokenId) {
if (tokenId == null || tokenId.trim().equals("")) {
tokenId = DEFAULT_TOKEN;
} else {//不存在的token乱写的话,也使用公用的流量
if (!tokenChecker.exist(tokenId)) tokenId = DEFAULT_TOKEN;
}
return tokenId;
}
}
@Singleton
public class RateLimiterGuaveImpl implements RateLimiterGuave {
RateLimiter rateLimiter = RateLimiter.create(100);//
/**
* @return
*/
@Override
public boolean pass() {
boolean acquire = rateLimiter.tryAcquire();
return acquire;
}
}
@Singleton
public class RateLimiterPunishImpl implements RateLimiterPunish {
static int PUNISH_MINUTES = 3;
private static final Logger log = LoggerFactory.getLogger(RateLimiterPunishImpl.class);
private static LoadingCache<String, Boolean> badGuys = CacheBuilder.newBuilder()
// .maximumSize(10000)
.expireAfterWrite(PUNISH_MINUTES, TimeUnit.MINUTES)
.concurrencyLevel(4)
.build(new CacheLoader<String, Boolean>() {
@Override
public Boolean load(String token) throws Exception {
return false;
}
});
/**
* @return
*/
@Override
public boolean isPunish(String tokenId) {
Boolean isPunish = badGuys.getUnchecked(tokenId);
if (isPunish) log.error("badGuy true.tokenId:" + tokenId);
return isPunish;
}
@Override
public void punish(String tokenId) {
badGuys.put(tokenId, true);
log.warn("badGuys false -> true.tokenId:" + tokenId);
}
}
public class TokenCheckerImpl implements TokenChecker {
private final List<String> vips;
private final List<String> allowTokens;
private final List<String> illegalTokens;
@Inject
TokenCheckerImpl(@VipToken List<String> vips,
@AllowToken List<String> allowTokens,
@IllegalToken List<String> illegalTokens){
this.vips = vips;
this.allowTokens = allowTokens;
this.illegalTokens = illegalTokens;
}
@Override
public boolean isBlackList(String tokenId) {
if (illegalTokens.contains(tokenId)) {
return true;
}
return false;
}
@Override
public boolean isWhiteList(String tokenId) {
if (vips.contains(tokenId)) {
return true;
}
return false;
}
@Override
public boolean exist(String tokenId) {
if (allowTokens.contains(tokenId)) {
return true;
}
return false;
}
}
使用限流模块
通过清理依赖,模块化后,就很容易集成到各个系统里了。
1:通过 install 将上面整个模块加载进另一个系统
2:通过injector获取对应的实现。(和Spring也有成熟的交互方式)
public class BModule extends AbstractModule{
@Override
protected void configure() {
install(new LimitModule());
}
}
static Limiter limiter;
static {
Injector injector = Guice.createInjector(new LimitModule());
limiter = injector.getInstance(Limiter.class);
}
public static boolean pass(String tokenId) {
return limiter.pass(tokenId);
}