我眼中的服务提供和服务消费

本文涉及的产品
Redis 开源版,标准版 2GB
推荐场景:
搭建游戏排行榜
云数据库 Tair(兼容Redis),内存型 2GB
日志服务 SLS,月写入数据量 50GB 1个月
简介:

服务提供和消费脑图

服务提供和消费脑图

 

参见: 服务提供者服务消费者服务注册中心

v服务提供者

1.服务提供者启动,解析xml文件中配置的服务,这里使用Dom4j解析。

2.将服务的一些相关信息注册到 服务注册中心。

注:服务相关信息:服务中心接口url,接口名称,方法名称,参数信息。

3.提供一个接口,服务消费者通过调用这个接口url来调用相应的服务。

参见: 服务提供和消费脑图服务注册中心 (1.注册服务)服务消费者 (3.调用服务)

v服务消费者

1.服务消费者启动,使用dom4j解析xml获取要消费的服务相关接口。

2.根据接口信息去服务注册中心判断是否有对应的注册信息,如果有则通过jdk动态代理生成相应的代理类并注册到spring中(代理方法中会根据服务中心返回的信息(服务提供者的url)去调用服务提供者对应的服务)。

参见: 服务提供和消费脑图服务注册中心 (2.消费服务)服务提供者 (3.调用服务)

v服务注册中心

1.将来自服务提供者信息存储到redis。

2.将服务信息提供给服务消费者。

参见: 服务提供者 (1.注册服务)服务消费者 (2.消费服务)服务提供和消费脑图

v工程示例

  注:示例中为了简单,采用rest请求方式来代替socket连接

v  注册中心

复制代码
@RestController
@RequestMapping("index")
public class IndexController {

    @Autowired
    private RedisCacheTemplate redisCacheTemplate;
   //注册服务提供者信息,将信息放到redis中
    @RequestMapping(value = "register", method = RequestMethod.POST)
    public SimpleResponse register(@RequestBody RegisterMessage registerMessage) {
        try {
            Map<String, Object> map = new HashMap<>();
            for (InterfaceMessage interfaceMessage : registerMessage.getInterfaceMessageList()) {
                interfaceMessage.setProviderUrl(registerMessage.getProviderUrl());
                map.put(ToStringBuilder.reflectionToString(interfaceMessage, ToStringStyle.SHORT_PREFIX_STYLE), true);
            }
            redisCacheTemplate.batchPut(map);
            return SimpleResponse.success(map.size());
        } catch (Exception e) {
            e.printStackTrace();
            return SimpleResponse.error(e.getMessage());
        }
    }

   //消费者拿到配置的服务信息到注册中心来匹配,验证是否存在这个服务
    @RequestMapping(value = "contains", method = RequestMethod.POST)
    public SimpleResponse contains(@RequestBody InterfaceMessage interfaceMessage) {
        try {
            if(redisCacheTemplate.exist(ToStringBuilder.reflectionToString(interfaceMessage, ToStringStyle.SHORT_PREFIX_STYLE))) {
                return SimpleResponse.success(true);
            } else {
                return SimpleResponse.error(null);
            }
        } catch (Exception e) {
            e.printStackTrace();
            return SimpleResponse.error(e.getMessage());
        }
    }

    @RequestMapping(value = "test", method = {RequestMethod.GET, RequestMethod.POST})
    public SimpleResponse test(@RequestParam String providerUrl){
        return SimpleResponse.success(providerUrl);
    }
}
复制代码

v  服务提供者

<?xml version="1.0" encoding="UTF-8"?>
<services-provider>
    <service id="testService" interface="com.hjzgg.simulation.api.ITestService"/>
</services-provider>

  自定义xml,配置将要注册的服务id及对应的接口类。

复制代码
# 内置tomcat服务器配置
server.port=8088
server.context-path=/provider-server

#打印彩色日志
spring.output.ansi.enabled=always

# 日志打印级别
logging.level.root=debug

# service
service.xml.path=classpath:service-provider.xml 自定义服务提供者配置文件 位置
service.provider.path=http://localhost:8088/provider-server/index/provider  服务提供者执行相应服务接口
service.register.path=http://localhost:8090/register-server/index/register  调用注册中心 接口
复制代码
复制代码
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.hjzgg.simulation.common.node.InterfaceMessage;
import com.hjzgg.simulation.common.node.RegisterMessage;
import com.hjzgg.simulation.common.parsexml.BeanNode;
import com.hjzgg.simulation.common.parsexml.ParseServiceXML;
import com.hjzgg.simulation.common.response.ReturnCode;
import com.hjzgg.simulation.common.utils.RestTemplateUtils;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.support.BeanDefinitionRegistry;
import org.springframework.context.EnvironmentAware;
import org.springframework.context.annotation.ImportBeanDefinitionRegistrar;
import org.springframework.core.env.Environment;
import org.springframework.core.type.AnnotationMetadata;
import org.springframework.http.MediaType;
import org.springframework.util.CollectionUtils;

import java.util.ArrayList;
import java.util.List;

public class Registrar implements ImportBeanDefinitionRegistrar, EnvironmentAware {
    private static Logger logger = LoggerFactory.getLogger(Registrar.class);

    private String servicesXmlPath;
    private String serviceProviderPath;
    private String serviceRegisterPath;

    @Override
    public void registerBeanDefinitions(AnnotationMetadata importingClassMetadata, BeanDefinitionRegistry registry) {
        List<BeanNode> beanNodes = ParseServiceXML.getProviderServices(servicesXmlPath); 解析自定义服务提供配置文件
        List<InterfaceMessage> list = new ArrayList<>();
        for(BeanNode beanNode : beanNodes) { 根据服务对应id去 寻找实现的 bean
            if(!registry.containsBeanDefinition(beanNode.getBeanName())) {
                logger.error("接口" + beanNode.getBeanName() + " " + beanNode.getInterfaceCls().getTypeName() + " 没有对应的实现类");
            } else {
                InterfaceMessage interfaceMessage = new InterfaceMessage();
                interfaceMessage.setBeanName(beanNode.getBeanName());
                interfaceMessage.setInterfacType(beanNode.getInterfaceCls().getTypeName());
                list.add(interfaceMessage);
            }
        }
        if(!CollectionUtils.isEmpty(list)) { 将配置的服务信息发送的注册中心
            RegisterMessage registerMessage = new RegisterMessage();
            registerMessage.setProviderUrl(this.serviceProviderPath);
            registerMessage.setInterfaceMessageList(list);
            try {
                String result = RestTemplateUtils.post(this.serviceRegisterPath, (JSONObject) JSON.toJSON(registerMessage), MediaType.APPLICATION_JSON_UTF8);
                JSONObject retJson = JSONObject.parseObject(result);
                if(retJson.getInteger("code") == ReturnCode.SUCCESS.getValue()) {
                    logger.debug("服务注册成功...");
                } else {
                    logger.error("服务注册失败..." + retJson.getString("msg"));
                }
            } catch (Exception e) {
                e.printStackTrace();
                logger.error("服务注册失败..." + e.getMessage());
            }
        }
    }

    @Override
    public void setEnvironment(Environment environment) { 获取环境变量
        this.servicesXmlPath = environment.getProperty("service.xml.path");
        this.serviceProviderPath = environment.getProperty("service.provider.path");
        this.serviceRegisterPath = environment.getProperty("service.register.path");

        assert(StringUtils.isNotEmpty(this.serviceProviderPath) && StringUtils.isNotEmpty(serviceRegisterPath) &&
        StringUtils.isNotEmpty(this.servicesXmlPath));
    }
}
复制代码
复制代码
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;

/**
 * Created by hujunzheng on 2017/7/7.
 */

@Configuration
@Import(Registrar.class)
public class Config { 注册服务配置启动
}
复制代码
复制代码
import com.hjzgg.simulation.common.node.ServiceMessage;
import com.hjzgg.simulation.common.response.SimpleResponse;
import com.hjzgg.simulation.common.utils.ContextUtils;
import com.hjzgg.simulation.common.utils.SerializeUtil;
import org.apache.commons.codec.binary.Hex;
import org.apache.commons.lang3.builder.ToStringBuilder;
import org.springframework.util.ReflectionUtils;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;

/**
 * Created by hujunzheng on 2017/7/8.
 */
@RestController
@RequestMapping("index")
public class IndexController {

服务提供者执行相应服务接口 @RequestMapping(value
= "invoke", method = RequestMethod.POST) public Object invoke(@RequestParam String serviceMessageBody) { try {
根据消费者传递的服务信息 找到对应的服务bean以及方法,并利用反射执行方法,最后返回结果 ServiceMessage serviceMessage
= (ServiceMessage) SerializeUtil.unserialize(Hex.decodeHex(serviceMessageBody.toCharArray())); Object bean = null; if((bean = ContextUtils.getBean(serviceMessage.getBeanName(), serviceMessage.getRequireType())) != null) { List<Class<?>> classList = new ArrayList<>(); if(serviceMessage.getArgs() != null) { for (Object obj : serviceMessage.getArgs()) { classList.add(obj.getClass()); } } Method method = ReflectionUtils.findMethod(bean.getClass(), serviceMessage.getMethodName(), classList.toArray(new Class<?>[0])); if(method != null) { return method.invoke(bean, serviceMessage.getArgs()); } else { return SimpleResponse.error("服务" + serviceMessage.getRequireType().getTypeName() + "中没有对应参数" + ToStringBuilder.reflectionToString(classList) + "的" + serviceMessage.getMethodName() + "方法"); } } else { return SimpleResponse.error("没有名称为" + serviceMessage.getBeanName() + "且类型为" + serviceMessage.getRequireType().getTypeName() + "对应的bean"); } } catch (Exception e) { e.printStackTrace(); return SimpleResponse.error(e.getMessage()); } } }
复制代码

v  服务消费者

<?xml version="1.0" encoding="UTF-8"?>
<services-consumer>
    <service ref="testService" interface="com.hjzgg.simulation.api.ITestService" url="http://localhost:8088/provider-server/index/provider"/>
</services-consumer>

  自定义服务消费者配置,服务引用名称,接口类型,调用服务提供者URL

复制代码
# 内置tomcat服务器配置
server.port=8089
server.context-path=/consumer-server

#打印彩色日志
spring.output.ansi.enabled=always

# 日志打印级别
logging.level.root=debug

# service xml
service.xml.path=classpath:service-consumer.xml 自定义服务消费配置文件位置
service.contains.url=http://localhost:8090/register-server/index/contains 注册中心服务查询接口
service.invoke.url=http://localhost:8088/provider-server/index/invoke 服务提供者执行相应服务接口
复制代码
复制代码
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.hjzgg.simulation.common.dynamic.JdkDynamicProxy;
import com.hjzgg.simulation.common.node.InterfaceMessage;
import com.hjzgg.simulation.common.parsexml.BeanNode;
import com.hjzgg.simulation.common.parsexml.ParseServiceXML;
import com.hjzgg.simulation.common.register.SpringBeanRegister;
import com.hjzgg.simulation.common.response.ReturnCode;
import com.hjzgg.simulation.common.utils.RestTemplateUtils;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.support.BeanDefinitionRegistry;
import org.springframework.context.EnvironmentAware;
import org.springframework.context.annotation.ImportBeanDefinitionRegistrar;
import org.springframework.core.env.Environment;
import org.springframework.core.type.AnnotationMetadata;
import org.springframework.http.MediaType;

import java.util.Iterator;
import java.util.List;

public class Registrar implements ImportBeanDefinitionRegistrar, EnvironmentAware{

    private Logger logger = LoggerFactory.getLogger(Registrar.class);

    private String servicesXmlPath;
    private String serviceContainsPath;

    @Override
    public void registerBeanDefinitions(AnnotationMetadata importingClassMetadata, BeanDefinitionRegistry registry) {
解析自定义服务消费配置文件 List
<BeanNode> beanNodes = ParseServiceXML.getConsumerServices(servicesXmlPath); 判断注册中心 是否注册了这个服务了 for(Iterator<BeanNode> it = beanNodes.iterator(); it.hasNext(); ) { BeanNode beanNode = it.next(); InterfaceMessage interfaceMessage = new InterfaceMessage(); interfaceMessage.setProviderUrl(beanNode.getUrl()); interfaceMessage.setInterfacType(beanNode.getInterfaceCls().getTypeName()); interfaceMessage.setBeanName(beanNode.getBeanName()); try { String result = RestTemplateUtils.post(this.serviceContainsPath, (JSONObject) JSON.toJSON(interfaceMessage), MediaType.APPLICATION_JSON_UTF8); JSONObject retJson = JSON.parseObject(result); if (retJson.getInteger("code") == ReturnCode.FAILURE.getValue()) { it.remove(); logger.error(interfaceMessage.getBeanName() + "对应类型" + interfaceMessage.getInterfacType() + "的服务在" + interfaceMessage.getProviderUrl() + "上没有注册"); } } catch (Exception e) { e.printStackTrace(); logger.error("服务" + interfaceMessage.getBeanName() + "对应类型" + interfaceMessage.getInterfacType() + "查找失败..." + e.getMessage()); } }
将与注册中心一直的服务 以 动态代理的方式 注册到spring中 SpringBeanRegister.registerBean(importingClassMetadata, registry, beanNodes); } @Override
public void setEnvironment(Environment environment) { 设置环境变量 this.servicesXmlPath = environment.getProperty("service.xml.path"); this.serviceContainsPath = environment.getProperty("service.contains.url"); String serviceInvokePath = environment.getProperty("service.invoke.url"); assert(StringUtils.isNotEmpty(serviceContainsPath) && StringUtils.isNotEmpty(this.servicesXmlPath) && StringUtils.isNotEmpty(serviceInvokePath)); JdkDynamicProxy.setServerProviderInvokeUrl(serviceInvokePath); } }
复制代码
复制代码
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;

/**
 * Created by hujunzheng on 2017/7/7.
 */

@Configuration
@Import(Registrar.class)
public class Config {
}
复制代码

v   测试一下

  api接口

复制代码
import com.alibaba.fastjson.JSONObject;

/**
 * Created by hujunzheng on 2017/7/7.
 */
public interface ITestService {
    JSONObject testService();
}
复制代码

  服务提供者对应的实例

复制代码
import com.alibaba.fastjson.JSONObject;
import com.hjzgg.simulation.api.ITestService;
import org.springframework.stereotype.Service;

/**
 * Created by hujunzheng on 2017/7/8.
 */
@Service("testService")
public class TestServiceImpl implements ITestService {
    @Override
    public JSONObject testService() {
        JSONObject result = new JSONObject();
        result.put("name", "hujunzheng");
        result.put("age", 25);
        return result;
    }
}
复制代码

  消费者对应的测试

复制代码
import com.hjzgg.simulation.api.ITestService;
import com.hjzgg.simulation.common.response.SimpleResponse;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * Created by hujunzheng on 2017/7/9.
 */
@RestController
@RequestMapping("index")
public class ConsumerController {

    @Autowired
    private ITestService testService;


    @RequestMapping("test")
    public SimpleResponse test() {
        try {
            return SimpleResponse.success(testService.testService());
        } catch (Exception e) {
            e.printStackTrace();
            return SimpleResponse.error(e.getMessage());
        }
    }
}
复制代码

  这就是我的实现方式,就说到这里了。最后我想说,思路很重要,掌握的知识很重要,多积累,多思考,任重而道远。最后附上我从中积累的知识和经验。

v知识和经验

v  执行顺序及ProxyFactoryBean实现

  
 
  Public class ProxyFactoryBean implements FactoryBean, InitializingBean;
  方法执行顺序 getObjectType->afterPropertiesSet->getObject
  bean 的属性设置的 先于 getObjectType
 

v  Springboot 工程自定义jar包中获取上下文工具类

  
  要加上 @Component注解
 

v  实体类型(例如下面)网络传输方法,避免字符串编码格式问题

  发送请求
ServiceMessage serviceMessage =  new ServiceMessage() ;
。。。。。

JSONObject params =  new JSONObject() ;
params.put( "serviceMessageBody" Hex.encodeHexString(SerializeUtil.serialize(serviceMessage))) ;

Class<?> returnType = method.getReturnType() ;
return RestTemplateUtils. post( SERVER_PROVIDER_INVOKE_URL params MediaType. APPLICATION_FORM_URLENCODED returnType) ;
  接收请求
@RequestMapping(value = "invoke", method = RequestMethod.POST)
public Object invoke( @RequestParam String serviceMessageBody) {
    try {
        ServiceMessage serviceMessage = (ServiceMessage)  SerializeUtil.unserialize(Hex.decodeHex(serviceMessageBody.toCharArray()));
      。。。。。 
}
  参考工具类
<dependency>
    <groupId>commons-codec </groupId>
    <artifactId>commons-codec </artifactId>
</dependency>
  Hex实现十六进制字符串和byte[]之间的转换,另附 RestTemplateUtils工具链接









本文转自 小眼儿 博客园博客,原文链接:http://www.cnblogs.com/hujunzheng/p/7131212.html,如需转载请自行联系原作者
目录
相关文章
|
5月前
|
消息中间件 负载均衡 大数据
揭秘Kafka背后的秘密!再均衡如何上演一场消费者组的‘权力游戏’,让消息处理秒变高能剧情?
【8月更文挑战第24天】Kafka是一款在大数据处理领域备受推崇的产品,以其出色的性能和可扩展性著称。本文通过一个具体案例介绍其核心机制之一——再均衡(Rebalancing)。案例中,“user_activity”主题下10个分区被3个消费者均衡消费。当新消费者加入或原有消费者离开时,Kafka将自动触发再均衡过程,确保所有消费者能有效处理分配给它们的分区。
145 62
|
8月前
|
消息中间件 存储 运维
消费者组大观:5种状态,1场分布式奇迹
消费者组大观:5种状态,1场分布式奇迹
245 0
|
消息中间件 canal 运维
听叔一句劝,消息队列的水太深,你把握不住!
听叔一句劝,消息队列的水太深,你把握不住!
109 0
|
消息中间件 缓存 算法
阿里二面:RocketMQ 消息积压了,增加消费者有用吗?
阿里二面:RocketMQ 消息积压了,增加消费者有用吗?
268 0
阿里二面:RocketMQ 消息积压了,增加消费者有用吗?
|
消息中间件 缓存 算法
阿里二面:RocketMQ 消息积压了,增 加消费者有用吗?
面试官:RocketMQ 消息积压了,增 加消费者有用吗? 我:这个要看具体的场景,不同的场景下情况是不一样的。 面试官:可以详细说一下吗? 我:如果消费者的数量小于 MessageQueue 的数量,增加消费者可以加快消 息消费速度,减少消 息积压。比如一个 Topic 有 4 个 MessageQueue,2 个消费者进行消费,如果增加一个消费者,明细可以加快拉取消息的频率。如下图:
|
安全 搜索推荐 生物认证
AXON天机:誓要赢得消费者的心
8月18日的深圳,正在举行一场粉丝见面会。二十多位粉丝当中,坐着一位身穿黑色T恤的中年人,他就是中兴通讯执行副总裁、终端CEO曾学忠。此时,他正在满怀激情地介绍中兴上个月发布的AXON天机、AXON WATCH和Spro2智能微型投影仪三大新品。
146 0
AXON天机:誓要赢得消费者的心
|
小程序 云栖大会 双11
消费“集体出圈”,企业创未来必备都在这
消费主张也在逐渐转变为去中心化、去物质化。企业把握理想的营销,又该如何展开攻势?
消费“集体出圈”,企业创未来必备都在这
|
Java 程序员
求求你们,别消费程序员了!
最近一段时间,微博、朋友圈都被程序员刷屏了。 先是微博上充斥着各种程序员格子衫的段子,紧接着又有各种程序员穿搭指南被刷屏了,虽然比较幽默,但是幽默中暗示程序员没品、秃头,然后再加上邋遢、情商低、没女朋友等等都跟程序员联系在一起,被各个行业津津乐道,不了解的还以为程序员是这个世界奇葩的物种一样,今天我得好好说道说道。
1019 0