开发者社区> 柏辰爸爸> 正文
阿里云
为了无法计算的价值
打开APP
阿里云APP内打开

Hadoop Common源码分析之SerializationFactory、Serialization

简介:         SerializationFactory是Hadoop中的一个序列化器工厂,它除了用来存储序列化器种类、更新序列化器相关配置信息,还提供了根据指定待序列化类获取相匹配的序列化器和反序列化器的功能。
+关注继续查看

        SerializationFactory是Hadoop中的一个序列化器工厂,它除了用来存储序列化器种类、更新序列化器相关配置信息,还提供了根据指定待序列化类获取相匹配的序列化器和反序列化器的功能。

        SerializationFactory中的关键成员变量只有一个,如下:

private List<Serialization<?>> serializations = new ArrayList<Serialization<?>>();
        serializations是一个存储实现Serialization接口对象的列表,而Serialization代表了序列化器或反序列化器的公共行为,它实际上是对序列化器/反序列化器对的压缩,列表中的对象其实是(序列化器/反序列化器对)的结合体,提供了以下三个方法:

/**
 * <p>
 * Encapsulates a {@link Serializer}/{@link Deserializer} pair.
 * </p>
 * @param <T>
 */
@InterfaceAudience.LimitedPrivate({"HDFS", "MapReduce"})
@InterfaceStability.Evolving
public interface Serialization<T> {
  
  /**
   * Allows clients to test whether this {@link Serialization}
   * supports the given class.
   * 允许客户端去检测这个Serialization是否支持给定Class
   */
  boolean accept(Class<?> c);
  
  /**
   * 为给定Class获取一个序列化器Serializer
   * @return a {@link Serializer} for the given class.
   */
  Serializer<T> getSerializer(Class<T> c);

  /**
   * 为给定Class获取一个反序列化器Deserializer
   * @return a {@link Deserializer} for the given class.
   */
  Deserializer<T> getDeserializer(Class<T> c);
}
        很明显,accept(Class<?> c)方法允许客户端去检测这个Serialization是否支持给定Class,getSerializer(Class<T> c)方法为给定Class获取一个序列化器Serializer,而getDeserializer(Class<T> c)方法为给定Class获取一个反序列化器Deserializer。

        我们再看SerializationFactory的构造函数,它通过读取配置信息conf中的io.serializations参数来确定Serializations,这个参数是一个逗号隔开的类名列表,代码如下:

  /**
   * <p>
   * Serializations are found by reading the <code>io.serializations</code>
   * property from <code>conf</code>, which is a comma-delimited list of
   * classnames.
   * 通过读取配置信息conf中的io.serializations参数来确定Serializations,这个参数是一个逗号隔开的类名列表
   * </p>
   */
  public SerializationFactory(Configuration conf) {
    
	// 调用父类构造函数  
	super(conf);
    
    // 遍历参数io.serializations配置的序列器名称serializerName,
    // 参数未配置默认为WritableSerialization、AvroSpecificSerialization、AvroReflectSerialization三个
    for (String serializerName : conf.getStrings(
      CommonConfigurationKeys.IO_SERIALIZATIONS_KEY,
      new String[]{WritableSerialization.class.getName(),
        AvroSpecificSerialization.class.getName(),
        AvroReflectSerialization.class.getName()})) {
    	
      // 调用add(conf, serializerName)方法添加序列化器
      add(conf, serializerName);
    }
  }
        构造函数先调用父类构造函数,然后遍历参数io.serializations配置的序列器名称serializerName,参数未配置默认为WritableSerialization、AvroSpecificSerialization、AvroReflectSerialization三个,然后调用add(conf, serializerName)方法添加每个序列化器名称。继续追踪add()方法,代码如下:

  @SuppressWarnings("unchecked")
  private void add(Configuration conf, String serializationName) {
    try {
      // 通过类名serializationName获取Class,即serializionClass
      Class<? extends Serialization> serializionClass =
        (Class<? extends Serialization>) conf.getClassByName(serializationName);
      
      // 通过反射生成类serializionClass的对象,并将其加入到serializations列表
      serializations.add((Serialization)
      ReflectionUtils.newInstance(serializionClass, getConf()));
    } catch (ClassNotFoundException e) {
      LOG.warn("Serialization class not found: ", e);
    }
  }
        add()方法很简单,就做了两件事:

        1、通过类名serializationName获取Class,即serializionClass;

        2、通过反射生成类serializionClass的对象,这个对象其实是(序列化器/反序列化器对)的结合体,并将其加入到serializations列表。

        SerializationFactory还提供了为指定类获取序列化器、反序列化器的getSerializer(Class<T> c)和getDeserializer(Class<T> c)方法,代码如下:

  // 为指定类c获取序列化器Serializer
  public <T> Serializer<T> getSerializer(Class<T> c) {
	// 根据指定类c,调用getSerialization()方法,获取对应序列化器serializer,实际上是确定(序列化器/反序列化器对)对象serializer
    Serialization<T> serializer = getSerialization(c);
    if (serializer != null) {
      // 通过serializer的getSerializer()方法为指定类c获取序列化器的实例
      return serializer.getSerializer(c);
    }
    // 找不到则返回null
    return null;
  }

  // 为指定类c获取反序列化器Deserializer
  public <T> Deserializer<T> getDeserializer(Class<T> c) {
	// 根据指定类c,调用getSerialization()方法,获取对应序列化器serializer,实际上是确定(序列化器/反序列化器对)对象serializer
    Serialization<T> serializer = getSerialization(c);
    if (serializer != null) {
      // 通过serializer的getSerializer()方法为指定类c获取反序列化器的实例
      return serializer.getDeserializer(c);
    }
    // 找不到则返回null
    return null;
  }

  // 为指定类c获取(序列化器/反序列化器对)对象
  @SuppressWarnings("unchecked")
  public <T> Serialization<T> getSerialization(Class<T> c) {
	// 遍历serializations列表,获取其中每个Serialization对象serialization
    for (Serialization serialization : serializations) {
    	
      // 调用serialization的accept()方法,如果其与c相匹配,则返回该serialization
      if (serialization.accept(c)) {
        return (Serialization<T>) serialization;
      }
    }
    
    // 找不到则返回null
    return null;
  }
        两个方法的处理逻辑一致,大体如下:

        1、根据指定类c,调用getSerialization()方法,获取对应序列化器serializer,实际上是确定(序列化器/反序列化器对)对象serializer;

        2、通过serializer的getSerializer()方法或getDeserializer()方法为指定类c获取序列化器或发序列化器的实例,并返回;

        3、找不到则返回null。

        而两者共用的getSerialization()方法则是为指定类c获取(序列化器/反序列化器对)对象,处理逻辑大致为:遍历serializations列表,获取其中每个Serialization对象serialization,调用serialization的accept()方法,如果其与c相匹配,则返回该serialization,找不到则返回null。




        



版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。

相关文章
Hadoop之MapReduce04【客户端源码分析】
客户端源码分析 启动的客户端代码 public static void main(String[] args) throws Exception { // 创建配置文件对象 Configuration conf = new Configuration(true); // 获取Job对象 Job job = Job.getInstance(conf); // 设置相关类 job.setJarByClass(WcTest.class);
0 0
Hadoop Yarn事件处理框架源码分析
由于想在项目中使用类似yarn的事件处理机制,就看了实现。主要是由Dispatcher.java,EventHandler.java,Service.java这3个类撑起来的。
826 0
Hadoop Common源码分析之服务Service
        Service是定义Hadoop中服务生命周期的一个接口。Service内部定义了服务的状态及生命周期,在服务被构造后,其一个生命周期内的状态为NOTINITED未初始化--INITED已初始化--已启动STARTED--已停止STOPPED,而这一生命周期内服务状态的变化,...
675 0
+关注
柏辰爸爸
专注于HDFS、HBase、Yarn、Spark、Kafka等领域研发
文章
问答
文章排行榜
最热
最新
相关电子书
更多
Why is my Hadoop* job slow
立即下载
Hadoop存储与计算分离实践
立即下载
\"Hadoop的过去现在和未来——从阿里云梯到E-MapReduce \"
立即下载