背景介绍
我所在的项目组,使用的技术一直是接近原始社会的:jdk1.6 + SpringMVC + hessian + Mybatis,当前最火的中间件技术Redis、MQ是完全没有用到,更别说什么分布式这种高大上的东西了。开始一直以为可能接下来的日子我都会在原始社会中渡过,直到我跳槽到另一家公司。
事情总是在最不抱希望的时候出现转机,最近老大指派我牵头做分布式改造。作为技术痴的我,刚接到这个消息的时候别提有多激动了。虽然平时空闲时间会去学习最新的一些技术,但总觉得没怎么学进去。一是年纪大了,对于一个东西不用就忘;二是都只是敲一些demo代码,相信各位大神都知道,真正项目中遇到的技术难题永远比demo中的那些小问题复杂的多。
废话不多说,先来说说这次分布式改造的预期:
- 应用的分布式:这一点很容易理解,就是希望以前的单节点应用能够部署成多节点,一个请求可以转发到多个节点中的任意一个
- 缓存的分布式:好在我们项目对缓存的依赖性不是特别高,项目中使用的缓存也大部分仅仅是为了提升效率。对于内存缓存(Ehcache),希望在不同节点间能够同步,对数据的实时性和一致性要求不是特别高
- 锁的分布式:业务互斥其实一直是我们项目的一个复杂所在。因为是金融行业,一旦业务互斥没有做好,就会出现严重的资金风险。对锁的可靠性要求特别高,对于互斥的业务锁,只要一个节点能够拿到,其他节点一定不能拿到。项目以前的实现是直接通过内存中的一个ConcurrentHashMap去实现的。如果多节点部署的话,很显然每个节点都会存在一个内存锁,原来的锁将完全不起作用
当然除了预期之外,考虑到部署环境的复杂性:一共几十套环境,后面可能上百。有的部署在腾讯云上,有的部署在客户自己内部系统中,一个微小的部署变动可能会被放大几十倍。所以领导除了给出预期之外,还给了以下两点要求:
- 尽量不要升级JDK
- 尽量不要引入新的中间件或者新的外部应用部署
好了,背景暂且交代到这里。我们基本对这次分布式改造剧集有了了解,下面开始进入正片…...
第一章:纯DIY分布式
第一集: 应用分布式改造
探索之路
对于当今的互联网企业码农,只要公司不处于原始社会。分析了上面的需求之后,对于这种请求多节点转发负载的,很自然地就会想到nginx。没错,我开始也想到了nginx,并且本地测试了,对于当前项目的转发只需下面简单的配置即可:
upstream apps {
server 127.0.0.1:8380;
server 127.0.0.1:8480;
server 127.0.0.1:8580;
}
server {
listen 9999;
server_name 127.0.0.1:9999;
#本机的hessian服务最终都发布成 */service/*.hs的形式
location /service/ {
proxy_pass http://apps/myapp-war/;
}
}
但是各位别忘了要求的第二点,好吧?忘了的话请再次回头看上面 。显然,引入nginx这种方式不行…...
既然不能引入这种转发的应用,那么只有DIY一个调用分发的实现了。那么,究竟如何实现呢?永远不要忘记没有什么问题是看源码解决不了的,如果有,那么请debug源码。
hessian与Spring的集成,客户端最终注入到Spring容器中的bean类为org.springframework.remoting.caucho.HessianProxyFactoryBean
,我们跟进这个类的源码,发现该类的上下级关系为:
HessianProxyFactoryBean extends HessianClientInterceptor implements FactoryBean<Object>
最终实际客户端的调用时通过HessianClientInterceptor
类的invoke方法来实现的,该类的主要代码如下:
public class HessianClientInterceptor extends UrlBasedRemoteAccessor implements MethodInterceptor {
private HessianProxyFactory proxyFactory = new HessianProxyFactory();
private Object hessianProxy;
@Override
public void afterPropertiesSet() {
super.afterPropertiesSet();
// 类初始化完成之后会调用prepare方法,对hessianProxy进行初始化
prepare();
}
/**
* Initialize the Hessian proxy for this interceptor.
* @throws RemoteLookupFailureException if the service URL is invalid
*/
public void prepare() throws RemoteLookupFailureException {
try {
this.hessianProxy = createHessianProxy(this.proxyFactory);
}
catch (MalformedURLException ex) {
throw new RemoteLookupFailureException("Service URL [" + getServiceUrl() + "] is invalid", ex);
}
}
/**
* Create the Hessian proxy that is wrapped by this interceptor.
* @param proxyFactory the proxy factory to use
* @return the Hessian proxy
* @throws MalformedURLException if thrown by the proxy factory
* @see com.caucho.hessian.client.HessianProxyFactory#create
*/
protected Object createHessianProxy(HessianProxyFactory proxyFactory) throws MalformedURLException {
Assert.notNull(getServiceInterface(), "'serviceInterface' is required");
// 根据配置文件中的配置创建代理类
return proxyFactory.create(getServiceInterface(), getServiceUrl(), getBeanClassLoader());
}
// 最终hessian调用时调用的方法
@Override
public Object invoke(MethodInvocation invocation) throws Throwable {
if (this.hessianProxy == null) {
throw new IllegalStateException("HessianClientInterceptor is not properly initialized - " +
"invoke 'prepare' before attempting any operations");
}
ClassLoader originalClassLoader = overrideThreadContextClassLoader();
try {
// 这一句特别关键,最终是使用前面初始化过的hessianProxy的对应方法,最终的hessian地址也存在该对象中
return invocation.getMethod().invoke(this.hessianProxy, invocation.getArguments());
}
catch (InvocationTargetException ex) {
Throwable targetEx = ex.getTargetException();
// Hessian 4.0 check: another layer of InvocationTargetException.
if (targetEx instanceof InvocationTargetException) {
targetEx = ((InvocationTargetException) targetEx).getTargetException();
}
if (targetEx instanceof HessianConnectionException) {
throw convertHessianAccessException(targetEx);
}
else if (targetEx instanceof HessianException || targetEx instanceof HessianRuntimeException) {
Throwable cause = targetEx.getCause();
throw convertHessianAccessException(cause != null ? cause : targetEx);
}
else if (targetEx instanceof UndeclaredThrowableException) {
UndeclaredThrowableException utex = (UndeclaredThrowableException) targetEx;
throw convertHessianAccessException(utex.getUndeclaredThrowable());
}
else {
throw targetEx;
}
}
catch (Throwable ex) {
throw new RemoteProxyFailureException(
"Failed to invoke Hessian proxy for remote service [" + getServiceUrl() + "]", ex);
}
finally {
resetThreadContextClassLoader(originalClassLoader);
}
}
仔细分析上面源码和我加的中文注释,不难发现解决问题的关键就在与在实际调用之前替换hessianProxy或者针对同一个hessianProxy替换其指向的url。即我们需要对原来注入到Spring容器中的org.springframework.remoting.caucho.HessianProxyFactoryBean
类做定制,替换成我们的类,然后在调用之前动态替换hessianProxy。一种方式是对于需要路由的服务接口xml声明做替换:
<bean id="channelAccTaskServiceFacade" class="com.rampage.distribute.factory.DistributeHessainProxyFactoryBean">
<!--hessian.rampage.server为分布式之前配置文件配置的固定服务器地址 -->
<property name="serviceUrl" value="${hessian.rampage.server}/services/helloService.hs" />
<property name="serviceInterface" value="com.rampage.demo.facade.cle.service.HelloService" />
</bean>
这种可能对代码的改动较大,而且如果不想实现路由的话又得替换回来。另外一种实现就像前面我写的服务定制一样,在Springbean定义加载完成初始化之前做拦截,对bean进行增强替换,这里我们采用第二种方式。因为这样更灵活,可以自定义替换规则。
实现Demo
我这里给出我的一个实现demo,大致步骤分为如下几步:
- 定义一个注解,用来添加到hessian服务接口上,表示该服务需要在客户端调用的时候进行分布式增强,分发到不同节点:
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
public @interface Distribute {
}
- 定义一个BFP, 在bean初始化之前根据是否增强为分布式bean策略,来决定是否对hessian服务进行增强:
// 定义一个BFP,在bean初始化之前进行增强
@Component
public class DistributeBeanFactoryPostProcessor implements BeanFactoryPostProcessor, Ordered {
private static final Logger LOGGER = LoggerFactory.getLogger(DistributeBeanFactoryPostProcessor.class);
private DistributeStrategy distributeStrategy;
private boolean openDistribute;
{
// 从配置文件加载是否开启分布式以及分布式分发策略
Properties properties = new Properties();
try {
properties.load(DistributeBeanFactoryPostProcessor.class.getClassLoader().getResource("app-config.properties").openStream());
} catch (IOException e) {
LOGGER.error("加载classPath下配置文件【app-config.properties】失败!", e);
}
openDistribute = Libs.toBoolean(properties.getProperty("open.hessian.client.distribute", "false"), false);
if (openDistribute) {
Class<?> clazz = null;
String strategyClassName = properties.getProperty("hessian.client.distribute.strategy", "com.ramapge.distribute.AnnotationDistributeStrategy");
try {
clazz = Class.forName(strategyClassName);
distributeStrategy = (DistributeStrategy) clazz.newInstance();
} catch (Exception e) {
openDistribute = false;
LOGGER.error("初始化分布式策略类失败!", e);
}
}
}
@Override
public int getOrder() {
return Integer.MAX_VALUE;
}
@Override
public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException {
// 未开启分布式策略,则直接返回
if (!openDistribute) {
LOGGER.error("未开启分布式分发策略, 跳过分布式BeanFactory后置处理......");
return;
}
LOGGER.info("进入分布式策略BeanFactory后置处理, 分布式策略类为【{}】......", distributeStrategy.getClass().getName());
String[] beanDefNames = beanFactory.getBeanDefinitionNames();
if (ArrayUtils.isEmpty(beanDefNames)) {
return;
}
BeanDefinition beanDef = null;
// 替换hessian客户端的实现类为分布式hessian支持类
for (String beanName : beanDefNames) {
beanDef = beanFactory.getBeanDefinition(beanName);
// 如果满足分布式分发策略,则替换hessian客户端工厂类 FIXME: 测试渠道日结分布式,后续删掉该判断条件
if (distributeStrategy.doDistribute(beanFactory, beanDef)) {
beanDef.setBeanClassName("com.rampage.distribute.DistributeHessainProxyFactoryBean");
}
}
}
}
// 是否转变成分布式策略
public interface DistributeStrategy {
boolean doDistribute(ConfigurableListableBeanFactory beanFactory, BeanDefinition beanDef);
}
// 注解转发策略,这里是如果有对应的Distribute注解,则将其变成分布式调用
public class AnnotationDistributeStrategy implements DistributeStrategy {
@Override
public boolean doDistribute(ConfigurableListableBeanFactory beanFactory, BeanDefinition beanDef) {
if (!"org.springframework.remoting.caucho.HessianProxyFactoryBean".equals(beanDef.getBeanClassName())) {
return false;
}
// 只分发有@Distribute注解的bean
MutablePropertyValues pv = beanDef.getPropertyValues();
if (!pv.contains("serviceInterface")) {
return false;
}
TypedStringValue interfaceName = (TypedStringValue) pv.getPropertyValue("serviceInterface").getValue();
try {
Class<?> hessianInterface = Thread.currentThread().getContextClassLoader().loadClass(interfaceName.getValue());
Distribute distribute = hessianInterface.getAnnotation(Distribute.class);
if (distribute == null) {
return false;
}
} catch (ClassNotFoundException e) {
return false;
}
return true;
}
}
- 定制的、hessianProxyFactoryBean实现:
/**
* 分布式Hessian Bean工厂
* @author ziyuqi
*
*/
public class DistributeHessainProxyFactoryBean extends HessianProxyFactoryBean {
/**
* 从hessian代理类列表
*/
private List<Object> slaveHessianProxies = new ArrayList<Object>();
/**
* 主hessian代理类
*/
private Object masterHessianProxy;
private HessianProxyFactory proxyFactory;
@Override
protected Object createHessianProxy(HessianProxyFactory proxyFactory) throws MalformedURLException {
// 将配置中的proxy设置为主代理类,并且返回null
this.masterHessianProxy = super.createHessianProxy(proxyFactory);
this.proxyFactory = proxyFactory;
return null;
}
@Override
public void afterPropertiesSet() {
super.afterPropertiesSet();
// TODO: 实现从节点可配置,动态读取当前配置信息进行转发
// 初始化从Hessian代理列表
String masterServiceUrl = getServiceUrl();
int suffixIndex = masterServiceUrl.lastIndexOf("/services/");
// 配置文件中配置的http://127.0.0.1:8580/myapps-war作为主节点,这里demo写死两个从节点
String[] slavePrefixes = new String[] {"http://127.0.0.1:8480/myapps-war", "http://127.0.0.1:8580/myapps-war"};
for (String slavePrefix : slavePrefixes) {
try {
Object slaveHessianProxy = this.proxyFactory.create(getServiceInterface(), slavePrefix + masterServiceUrl.substring(suffixIndex), getBeanClassLoader());
slaveHessianProxies.add(slaveHessianProxy);
} catch (MalformedURLException e) {
throw new RemoteLookupFailureException("Service URL [" + slavePrefix + getServiceUrl() + "] is invalid", e);
}
}
}
@Override
public Object invoke(MethodInvocation invocation) throws Throwable {
if (this.masterHessianProxy == null && this.slaveHessianProxies.isEmpty()) {
throw new IllegalStateException("HessianClientInterceptor is not properly initialized - " +
"invoke 'prepare' before attempting any operations");
}
ClassLoader originalClassLoader = overrideThreadContextClassLoader();
try {
return invocation.getMethod().invoke(routeHessianProxy(invocation), invocation.getArguments());
}
catch (InvocationTargetException ex) {
Throwable targetEx = ex.getTargetException();
// Hessian 4.0 check: another layer of InvocationTargetException.
if (targetEx instanceof InvocationTargetException) {
targetEx = ((InvocationTargetException) targetEx).getTargetException();
}
if (targetEx instanceof HessianConnectionException) {
throw convertHessianAccessException(targetEx);
}
else if (targetEx instanceof HessianException || targetEx instanceof HessianRuntimeException) {
Throwable cause = targetEx.getCause();
throw convertHessianAccessException(cause != null ? cause : targetEx);
}
else if (targetEx instanceof UndeclaredThrowableException) {
UndeclaredThrowableException utex = (UndeclaredThrowableException) targetEx;
throw convertHessianAccessException(utex.getUndeclaredThrowable());
}
else {
throw targetEx;
}
}
catch (Throwable ex) {
throw new RemoteProxyFailureException(
"Failed to invoke Hessian proxy for remote service [" + getServiceUrl() + "]", ex);
}
finally {
resetThreadContextClassLoader(originalClassLoader);
}
}
/**
* 路由hessian调用
* @param invocation 方法调用对象
* @return 路由hessian代理对象
*/
private Object routeHessianProxy(MethodInvocation invocation) {
if (this.slaveHessianProxies.isEmpty()) {
return this.masterHessianProxy;
}
// TODO: 修改随机选取算法
int totalCount = this.slaveHessianProxies.size() + 1;
int nextIndex = (new Random()).nextInt(totalCount);
if (nextIndex == 0) {
return this.masterHessianProxy;
}
return this.slaveHessianProxies.get(nextIndex - 1);
}
}
- 对于想增强为分布式服务调用的接口,在其上加上@Distribute注解:
@Distribute
public interface HelloService {
void sayHello(String name);
}
展望
前面的简单实现Demo,虽然简单,但其实也预留了一些可以扩展的点可供后续展望:
- DistributeStrategy可以定制,如果只针对特定路径下的服务接口做增强,可以只替换策略,不需要对原来代码做改动
- 如果使用Distribute注解可以考虑增加一个服务组的概念,增对不同的服务组进行不同的地址转发处理
- 可以考虑增加一个界面对于从节点进行增删该,可以增加权重的概念,根据不同的权重定制不同的路由规则
后续
这只是DIY分布式改造第一篇只应用分布式改造,后续还有缓存分布式及锁的分布式改造,出于篇幅和时间限制,今天暂时写到这里。后续再做更新。
黎明前最黑暗,成功前最绝望!