[Avro]Avro入门例子

简介: 版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/SunnyYoona/article/details/52573164 1.
版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/SunnyYoona/article/details/52573164
1. Maven配置

pom.xml

 
 
  1. <?xml version="1.0" encoding="UTF-8"?>
  2. <project xmlns="http://maven.apache.org/POM/4.0.0"
  3.         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  4.         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  5.    <parent>
  6.        <artifactId>open-diary</artifactId>
  7.        <groupId>com.sjf.open</groupId>
  8.        <version>1.0-SNAPSHOT</version>
  9.    </parent>
  10.    <modelVersion>4.0.0</modelVersion>
  11.    <artifactId>open.avro</artifactId>
  12.    <dependencies>
  13.        <dependency>
  14.            <groupId>org.apache.avro</groupId>
  15.            <artifactId>avro</artifactId>
  16.        </dependency>
  17.    </dependencies>
  18.    <build>
  19.        <plugins>
  20.            <plugin>
  21.                <groupId>org.apache.avro</groupId>
  22.                <artifactId>avro-maven-plugin</artifactId>
  23.                <version>1.8.1</version>
  24.                <executions>
  25.                    <execution>
  26.                        <phase>generate-sources</phase>
  27.                        <goals>
  28.                            <goal>schema</goal>
  29.                        </goals>
  30.                        <configuration>
  31.                            <sourceDirectory>${project.basedir}/src/main/avro/</sourceDirectory>
  32.                            <outputDirectory>${project.basedir}/src/main/java/</outputDirectory>
  33.                        </configuration>
  34.                    </execution>
  35.                </executions>
  36.            </plugin>
  37.            <plugin>
  38.                <groupId>org.apache.maven.plugins</groupId>
  39.                <artifactId>maven-compiler-plugin</artifactId>
  40.                <configuration>
  41.                    <source>1.6</source>
  42.                    <target>1.6</target>
  43.                </configuration>
  44.            </plugin>
  45.        </plugins>
  46.    </build>
  47. </project>

2. 定义模式(schema)

Avro的数据格式(schema)采用JSON定义,shcema由原始类型(null,boolean,int,long,float,double,bytes,string)和复杂类型(record,enum,array,map,union,fixed)组成,schema文件通常以.avsc结尾,代表avro schema。例如我们定义一个代表User的schema,文件名为user.avsc,放在src/main/avro目录下,这个必须与pom.xml文件配置的一致:

 
 
  1. <sourceDirectory>${project.basedir}/src/main/avro/</sourceDirectory>

下面来定义模式:

 
 
  1. {
  2.  "namespace":"com.sjf.open",
  3.  "type":"record",
  4.  "name":"User",
  5.  "fields":[
  6.    {"name":"name","type":"string"},
  7.    {"name":"age","type":["int","null"]},
  8.    {"name":"favorite","type":["string","null"]}
  9.  ]
  10. }

上述定义中,namespace和name共同组成schema 的全称: com.sjf.open.User,表示将会在com.sjf.open包下创建User对象。type指定类型为复杂类型record,fileds数组定义record的字段。字段中的type指定为数组,代表该字段可以是int或者null类型。每一种数据类型可以定义哪一些参数,由Avro规范定义,也就是规范了定义Scheme中JSON需要遵循的JSON Schema。

一个模式文件只能包含一个模式(schema)。

3. 编译模式

Avro允许我们基于我们之前定义的模式来创建相应的类。一旦我们已经定义了相关的类,则程序中就不需要直接使用模式。可以使用avro-tools jar来生成代码如下:

 
 
  1. java -jar /path/to/avro-tools-1.8.1.jar compile schema <schema file> <destination>

这将会在所提供的目标文件夹(<destination>)下基于之前配置模式的namespace字段,生成一个包,并在包中生成相应的类文件。在我们例子中,

 
 
  1. java -jar /home/xiaosi/lib/avro-tools-1.8.1.jar compile schema user.avsc .

这样,就在当前根目录下生成com/sjf/open/User.java类,其中com/sjf/open就是我们在模式中的namespace。

如果你直接使用Avro Maven plugin,那么就不需要手动编译模式,因为Avro Maven plugin会自动给你编译好。

下面为Avro Maven plugin配置:

 
 
  1.            <plugin>
  2.                <groupId>org.apache.avro</groupId>
  3.                <artifactId>avro-maven-plugin</artifactId>
  4.                <version>1.8.1</version>
  5.                <executions>
  6.                    <execution>
  7.                        <phase>generate-sources</phase>
  8.                        <goals>
  9.                            <goal>schema</goal>
  10.                        </goals>
  11.                        <configuration>
  12.                            <sourceDirectory>${project.basedir}/src/main/avro/</sourceDirectory>
  13.                            <outputDirectory>${project.basedir}/src/main/java/</outputDirectory>
  14.                        </configuration>
  15.                    </execution>
  16.                </executions>
  17.            </plugin>
  18.            <plugin>
  19.                <groupId>org.apache.maven.plugins</groupId>
  20.                <artifactId>maven-compiler-plugin</artifactId>
  21.                <configuration>
  22.                    <source>1.6</source>
  23.                    <target>1.6</target>
  24.                </configuration>
  25.            </plugin>

现在通过编译模式已经生成一个User.java类:

 
 
  1. /**
  2. * Autogenerated by Avro
  3. *
  4. * DO NOT EDIT DIRECTLY
  5. */
  6. package com.sjf.open;
  7. import org.apache.avro.specific.SpecificData;
  8. @SuppressWarnings("all")
  9. @org.apache.avro.specific.AvroGenerated
  10. public class User extends org.apache.avro.specific.SpecificRecordBase implements org.apache.avro.specific.SpecificRecord {
  11.  private static final long serialVersionUID = 3024021648306482970L;
  12.  public static final org.apache.avro.Schema SCHEMA$ = new org.apache.avro.Schema.Parser().parse("{\"type\":\"record\",\"name\":\"User\",\"namespace\":\"com.sjf.open\",\"fields\":[{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"age\",\"type\":[\"int\",\"null\"]},{\"name\":\"favorite\",\"type\":[\"string\",\"null\"]}]}");
  13.  public static org.apache.avro.Schema getClassSchema() { return SCHEMA$; }
  14.  @Deprecated public java.lang.CharSequence name;
  15.  @Deprecated public java.lang.Integer age;
  16.  @Deprecated public java.lang.CharSequence favorite;
  17.  /**
  18.   * Default constructor.  Note that this does not initialize fields
  19.   * to their default values from the schema.  If that is desired then
  20.   * one should use <code>newBuilder()</code>.
  21.   */
  22.  public User() {}
  23.  /**
  24.   * All-args constructor.
  25.   * @param name The new value for name
  26.   * @param age The new value for age
  27.   * @param favorite The new value for favorite
  28.   */
  29.  public User(java.lang.CharSequence name, java.lang.Integer age, java.lang.CharSequence favorite) {
  30.    this.name = name;
  31.    this.age = age;
  32.    this.favorite = favorite;
  33.  }
  34.  public org.apache.avro.Schema getSchema() { return SCHEMA$; }
  35.  // Used by DatumWriter.  Applications should not call.
  36.  public java.lang.Object get(int field$) {
  37.    switch (field$) {
  38.    case 0: return name;
  39.    case 1: return age;
  40.    case 2: return favorite;
  41.    default: throw new org.apache.avro.AvroRuntimeException("Bad index");
  42.    }
  43.  }
  44.  // Used by DatumReader.  Applications should not call.
  45.  @SuppressWarnings(value="unchecked")
  46.  public void put(int field$, java.lang.Object value$) {
  47.    switch (field$) {
  48.    case 0: name = (java.lang.CharSequence)value$; break;
  49.    case 1: age = (java.lang.Integer)value$; break;
  50.    case 2: favorite = (java.lang.CharSequence)value$; break;
  51.    default: throw new org.apache.avro.AvroRuntimeException("Bad index");
  52.    }
  53.  }
  54.  /**
  55.   * Gets the value of the 'name' field.
  56.   * @return The value of the 'name' field.
  57.   */
  58.  public java.lang.CharSequence getName() {
  59.    return name;
  60.  }
  61.  /**
  62.   * Sets the value of the 'name' field.
  63.   * @param value the value to set.
  64.   */
  65.  public void setName(java.lang.CharSequence value) {
  66.    this.name = value;
  67.  }
  68.  /**
  69.   * Gets the value of the 'age' field.
  70.   * @return The value of the 'age' field.
  71.   */
  72.  public java.lang.Integer getAge() {
  73.    return age;
  74.  }
  75.  /**
  76.   * Sets the value of the 'age' field.
  77.   * @param value the value to set.
  78.   */
  79.  public void setAge(java.lang.Integer value) {
  80.    this.age = value;
  81.  }
  82.  /**
  83.   * Gets the value of the 'favorite' field.
  84.   * @return The value of the 'favorite' field.
  85.   */
  86.  public java.lang.CharSequence getFavorite() {
  87.    return favorite;
  88.  }
  89.  /**
  90.   * Sets the value of the 'favorite' field.
  91.   * @param value the value to set.
  92.   */
  93.  public void setFavorite(java.lang.CharSequence value) {
  94.    this.favorite = value;
  95.  }
  96.  /**
  97.   * Creates a new User RecordBuilder.
  98.   * @return A new User RecordBuilder
  99.   */
  100.  public static com.sjf.open.User.Builder newBuilder() {
  101.    return new com.sjf.open.User.Builder();
  102.  }
  103.  /**
  104.   * Creates a new User RecordBuilder by copying an existing Builder.
  105.   * @param other The existing builder to copy.
  106.   * @return A new User RecordBuilder
  107.   */
  108.  public static com.sjf.open.User.Builder newBuilder(com.sjf.open.User.Builder other) {
  109.    return new com.sjf.open.User.Builder(other);
  110.  }
  111.  /**
  112.   * Creates a new User RecordBuilder by copying an existing User instance.
  113.   * @param other The existing instance to copy.
  114.   * @return A new User RecordBuilder
  115.   */
  116.  public static com.sjf.open.User.Builder newBuilder(com.sjf.open.User other) {
  117.    return new com.sjf.open.User.Builder(other);
  118.  }
  119.  /**
  120.   * RecordBuilder for User instances.
  121.   */
  122.  public static class Builder extends org.apache.avro.specific.SpecificRecordBuilderBase<User>
  123.    implements org.apache.avro.data.RecordBuilder<User> {
  124.    private java.lang.CharSequence name;
  125.    private java.lang.Integer age;
  126.    private java.lang.CharSequence favorite;
  127.    /** Creates a new Builder */
  128.    private Builder() {
  129.      super(SCHEMA$);
  130.    }
  131.    /**
  132.     * Creates a Builder by copying an existing Builder.
  133.     * @param other The existing Builder to copy.
  134.     */
  135.    private Builder(com.sjf.open.User.Builder other) {
  136.      super(other);
  137.      if (isValidValue(fields()[0], other.name)) {
  138.        this.name = data().deepCopy(fields()[0].schema(), other.name);
  139.        fieldSetFlags()[0] = true;
  140.      }
  141.      if (isValidValue(fields()[1], other.age)) {
  142.        this.age = data().deepCopy(fields()[1].schema(), other.age);
  143.        fieldSetFlags()[1] = true;
  144.      }
  145.      if (isValidValue(fields()[2], other.favorite)) {
  146.        this.favorite = data().deepCopy(fields()[2].schema(), other.favorite);
  147.        fieldSetFlags()[2] = true;
  148.      }
  149.    }
  150.    /**
  151.     * Creates a Builder by copying an existing User instance
  152.     * @param other The existing instance to copy.
  153.     */
  154.    private Builder(com.sjf.open.User other) {
  155.            super(SCHEMA$);
  156.      if (isValidValue(fields()[0], other.name)) {
  157.        this.name = data().deepCopy(fields()[0].schema(), other.name);
  158.        fieldSetFlags()[0] = true;
  159.      }
  160.      if (isValidValue(fields()[1], other.age)) {
  161.        this.age = data().deepCopy(fields()[1].schema(), other.age);
  162.        fieldSetFlags()[1] = true;
  163.      }
  164.      if (isValidValue(fields()[2], other.favorite)) {
  165.        this.favorite = data().deepCopy(fields()[2].schema(), other.favorite);
  166.        fieldSetFlags()[2] = true;
  167.      }
  168.    }
  169.    /**
  170.      * Gets the value of the 'name' field.
  171.      * @return The value.
  172.      */
  173.    public java.lang.CharSequence getName() {
  174.      return name;
  175.    }
  176.    /**
  177.      * Sets the value of the 'name' field.
  178.      * @param value The value of 'name'.
  179.      * @return This builder.
  180.      */
  181.    public com.sjf.open.User.Builder setName(java.lang.CharSequence value) {
  182.      validate(fields()[0], value);
  183.      this.name = value;
  184.      fieldSetFlags()[0] = true;
  185.      return this;
  186.    }
  187.    /**
  188.      * Checks whether the 'name' field has been set.
  189.      * @return True if the 'name' field has been set, false otherwise.
  190.      */
  191.    public boolean hasName() {
  192.      return fieldSetFlags()[0];
  193.    }
  194.    /**
  195.      * Clears the value of the 'name' field.
  196.      * @return This builder.
  197.      */
  198.    public com.sjf.open.User.Builder clearName() {
  199.      name = null;
  200.      fieldSetFlags()[0] = false;
  201.      return this;
  202.    }
  203.    /**
  204.      * Gets the value of the 'age' field.
  205.      * @return The value.
  206.      */
  207.    public java.lang.Integer getAge() {
  208.      return age;
  209.    }
  210.    /**
  211.      * Sets the value of the 'age' field.
  212.      * @param value The value of 'age'.
  213.      * @return This builder.
  214.      */
  215.    public com.sjf.open.User.Builder setAge(java.lang.Integer value) {
  216.      validate(fields()[1], value);
  217.      this.age = value;
  218.      fieldSetFlags()[1] = true;
  219.      return this;
  220.    }
  221.    /**
  222.      * Checks whether the 'age' field has been set.
  223.      * @return True if the 'age' field has been set, false otherwise.
  224.      */
  225.    public boolean hasAge() {
  226.      return fieldSetFlags()[1];
  227.    }
  228.    /**
  229.      * Clears the value of the 'age' field.
  230.      * @return This builder.
  231.      */
  232.    public com.sjf.open.User.Builder clearAge() {
  233.      age = null;
  234.      fieldSetFlags()[1] = false;
  235.      return this;
  236.    }
  237.    /**
  238.      * Gets the value of the 'favorite' field.
  239.      * @return The value.
  240.      */
  241.    public java.lang.CharSequence getFavorite() {
  242.      return favorite;
  243.    }
  244.    /**
  245.      * Sets the value of the 'favorite' field.
  246.      * @param value The value of 'favorite'.
  247.      * @return This builder.
  248.      */
  249.    public com.sjf.open.User.Builder setFavorite(java.lang.CharSequence value) {
  250.      validate(fields()[2], value);
  251.      this.favorite = value;
  252.      fieldSetFlags()[2] = true;
  253.      return this;
  254.    }
  255.    /**
  256.      * Checks whether the 'favorite' field has been set.
  257.      * @return True if the 'favorite' field has been set, false otherwise.
  258.      */
  259.    public boolean hasFavorite() {
  260.      return fieldSetFlags()[2];
  261.    }
  262.    /**
  263.      * Clears the value of the 'favorite' field.
  264.      * @return This builder.
  265.      */
  266.    public com.sjf.open.User.Builder clearFavorite() {
  267.      favorite = null;
  268.      fieldSetFlags()[2] = false;
  269.      return this;
  270.    }
  271.    @Override
  272.    public User build() {
  273.      try {
  274.        User record = new User();
  275.        record.name = fieldSetFlags()[0] ? this.name : (java.lang.CharSequence) defaultValue(fields()[0]);
  276.        record.age = fieldSetFlags()[1] ? this.age : (java.lang.Integer) defaultValue(fields()[1]);
  277.        record.favorite = fieldSetFlags()[2] ? this.favorite : (java.lang.CharSequence) defaultValue(fields()[2]);
  278.        return record;
  279.      } catch (Exception e) {
  280.        throw new org.apache.avro.AvroRuntimeException(e);
  281.      }
  282.    }
  283.  }
  284.  private static final org.apache.avro.io.DatumWriter
  285.    WRITER$ = new org.apache.avro.specific.SpecificDatumWriter(SCHEMA$);
  286.  @Override public void writeExternal(java.io.ObjectOutput out)
  287.    throws java.io.IOException {
  288.    WRITER$.write(this, SpecificData.getEncoder(out));
  289.  }
  290.  private static final org.apache.avro.io.DatumReader
  291.    READER$ = new org.apache.avro.specific.SpecificDatumReader(SCHEMA$);
  292.  @Override public void readExternal(java.io.ObjectInput in)
  293.    throws java.io.IOException {
  294.    READER$.read(this, SpecificData.getDecoder(in));
  295.  }
  296. }

4. 创建User对象

既然我们已经完成了代码自动生成,那我们创建一些用户,序列化到磁盘上的文件中,然后再读取文件,反序列化为用户对象。

 
 
  1. // 方式一
  2. User user1 = new User();
  3. user1.setName("yoona");
  4. user1.setAge(25);
  5. user1.setFavorite("football");
  6. // 方式二
  7. User user2 = new User("sjf", 24, "basketball");
  8. // 方式三
  9. User user3 = User.newBuilder().setName("Tom").setAge(26).setFavorite("badminton").build();

如这个例子所示,Avro对象可以通过调用构造函数直接创建用户或通过使用builder来创建用户。不同于构造函数,Builder的方式会把Schema设置的默认值赋给对象相应的属性(Unlike constructors, builders will automatically set any default values specified in the schema)。除此之外,Builder会校验数据判断是否符合schema,但是构造器不会做这些校验工作,直到对象被序列化的时候才会校验。但是通常使用构造函数会有更好的性能,因为Builder在写数据之前会创建数据的副本(However, using constructors directly generally offers better performance, as builders create a copy of the datastructure before it is written)。

5. 序列化操作

将上述几个User进行序列化操作,并将序列化的数据存放到user.avro文件中:

 
  
  1.    /**
  2.     * 序列化
  3.     */
  4.    public static void serialize(){
  5.        // 方式一
  6.        User user1 = new User();
  7.        user1.setName("yoona");
  8.        user1.setAge(25);
  9.        user1.setFavorite("football");
  10.        // 方式二
  11.        User user2 = new User("sjf", 24, "basketball");
  12.        // 方式三
  13.        User user3 = User.newBuilder().setName("Tom").setAge(26).setFavorite("football").build();
  14.        // 序列化
  15.        DatumWriter<User> userDatumWriter = new SpecificDatumWriter<User>(User.class);
  16.        DataFileWriter<User> dataFileWriter = new DataFileWriter<User>(userDatumWriter);
  17.        try {
  18.            dataFileWriter.create(user1.getSchema(), new File("user.avro"));
  19.            dataFileWriter.append(user1);
  20.            dataFileWriter.append(user2);
  21.            dataFileWriter.append(user3);
  22.            dataFileWriter.close();
  23.        } catch (IOException e) {
  24.            e.printStackTrace();
  25.        }
  26.    }

运行上述代码之后,将会在磁盘产生user.avro文件,里面是avro序列化user的数据。

我们创建一个DatumWriter,将Java对象转换成内存中的序列化格式。

6. 反序列化操作

我们可以对上述代码序列化之后的数据user.avro进行反序列化为user对象:

 
 
  1.   /**
  2.     * 反序列化
  3.     */
  4.    public static void deserialize(){
  5.        DatumReader<User> reader = new SpecificDatumReader<User>(User.class);
  6.        DataFileReader<User> fileReader = null;
  7.        try {
  8.            fileReader = new DataFileReader<User>(new File(
  9.                    "user.avro"), reader);
  10.            User user = null;
  11.            while (fileReader.hasNext()) {
  12.                // 复用user对象,避免重复分配内存和GC
  13.                user = fileReader.next(user);
  14.                System.out.println(user);
  15.            }
  16.        } catch (IOException e) {
  17.            e.printStackTrace();
  18.        }
  19.    }

输出内容:

 
 
  1. {"name": "yoona", "age": 25, "favorite": "football"}
  2. {"name": "sjf", "age": 24, "favorite": "basketball"}
  3. {"name": "Tom", "age": 26, "favorite": "football"}


7. 完整例子


 
 
  1. import org.apache.avro.file.DataFileReader;
  2. import org.apache.avro.file.DataFileWriter;
  3. import org.apache.avro.io.DatumReader;
  4. import org.apache.avro.io.DatumWriter;
  5. import org.apache.avro.specific.SpecificDatumReader;
  6. import org.apache.avro.specific.SpecificDatumWriter;
  7. import java.io.File;
  8. import java.io.IOException;
  9. /**
  10. * Created by xiaosi on 16-9-14.
  11. */
  12. public class Test {
  13.    /**
  14.     * 序列化
  15.     */
  16.    public static void serialize(){
  17.        // 方式一
  18.        User user1 = new User();
  19.        user1.setName("yoona");
  20.        user1.setAge(25);
  21.        user1.setFavorite("football");
  22.        // 方式二
  23.        User user2 = new User("sjf", 24, "basketball");
  24.        // 方式三
  25.        User user3 = User.newBuilder().setName("Tom").setAge(26).build();
  26.        // 序列化
  27.        DatumWriter<User> userDatumWriter = new SpecificDatumWriter<User>(User.class);
  28.        DataFileWriter<User> dataFileWriter = new DataFileWriter<User>(userDatumWriter);
  29.        try {
  30.            dataFileWriter.create(user1.getSchema(), new File("user.avro"));
  31.            dataFileWriter.append(user1);
  32.            dataFileWriter.append(user2);
  33.            dataFileWriter.append(user3);
  34.            dataFileWriter.close();
  35.        } catch (IOException e) {
  36.            e.printStackTrace();
  37.        }
  38.    }
  39.    /**
  40.     * 反序列化
  41.     */
  42.    public static void deserialize(){
  43.        DatumReader<User> reader = new SpecificDatumReader<User>(User.class);
  44.        DataFileReader<User> fileReader = null;
  45.        try {
  46.            fileReader = new DataFileReader<User>(new File(
  47.                    "user.avro"), reader);
  48.            User user = null;
  49.            while (fileReader.hasNext()) {
  50.                // 复用user对象,避免重复分配内存和GC
  51.                user = fileReader.next(user);
  52.                System.out.println(user);
  53.            }
  54.        } catch (IOException e) {
  55.            e.printStackTrace();
  56.        }
  57.    }
  58.    public static void main(String[] args) {
  59.        serialize();
  60.        deserialize();
  61.    }
  62. }






目录
相关文章
|
消息中间件 Java 中间件
秒懂消息队列MQ,万字总结带你全面了解消息队列MQ
消息队列是大型分布式系统不可缺少的中间件,也是高并发系统的基石中间件,所以掌握好消息队列MQ就变得极其重要。接下来我就将从零开始介绍什么是消息队列?消息队列的应用场景?如何进行选型?如何在Spring Boot项目中整合集成消息队列。
23893 10
秒懂消息队列MQ,万字总结带你全面了解消息队列MQ
|
存储 JSON SpringCloudAlibaba
Sentinel使用及规则配置
Sentinel使用及规则配置
2317 0
Sentinel使用及规则配置
|
缓存 Linux 开发工具
CentOS 7- 配置阿里镜像源
阿里镜像官方地址http://mirrors.aliyun.com/ 1、点击官方提供的相应系统的帮助 :2、查看不同版本的系统操作: 下载源1、安装wget yum install -y wget2、下载CentOS 7的repo文件wget -O /etc/yum.
253845 0
|
消息中间件 JSON Java
Spring Boot、Spring Cloud与Spring Cloud Alibaba版本对应关系
Spring Boot、Spring Cloud与Spring Cloud Alibaba版本对应关系
23434 0
|
数据采集 物联网 大数据
NiFi【部署 01】NiFi最新版本1.18.0下载安装配置启动及问题处理(一篇学会部署NiFi)
NiFi【部署 01】NiFi最新版本1.18.0下载安装配置启动及问题处理(一篇学会部署NiFi)
2158 0
|
NoSQL 前端开发 Redis
Windows 下安装和配置 Redis (图文教程)
Windows 下安装和配置 Redis (图文教程)
|
数据处理 Apache 流计算
实时计算引擎 Flink:从入门到深入理解
本篇详细介绍了Apache Flink实时计算引擎的基本概念和核心功能。从入门到深入,逐步介绍了Flink的数据源与接收、数据转换与计算、窗口操作以及状态管理等方面的内容,并附带代码示例进行实际操作演示。通过阅读本文,读者可以建立起对Flink实时计算引擎的全面理解,为实际项目中的实时数据处理提供了有力的指导和实践基础。
4786 2
|
监控 Kubernetes 网络协议
上云业务的k8s容器排障与思考
本文主要讲述了在完成业务上云后,面临因业务请求量激增导致的系统复杂故障和挑战。作者通过排查分析,发现了一个长时间处于“进行中”状态的异常任务,客户端(APP2)进程卡死,而服务端(APP3)进程正常结束。进一步分析发现,问题出在kube-proxy代理的会话超时机制,由于请求处理延迟,kube-proxy清理了会话记录,导致服务端回包异常,客户端无法识别,从而形成进程假死。 最后,作者强调了在成本控制背景下,通过分析流量增长原因、优化技术架构和调整运营策略来改善系统性能和稳定性的必要性。
714 5
上云业务的k8s容器排障与思考
|
存储 SQL 关系型数据库
ClickHouse(11)ClickHouse合并树MergeTree家族表引擎之SummingMergeTree详细解析
`SummingMergeTree`是`MergeTree`引擎的变种,它合并相同主键的行并计算数值列的总和,从而节省存储空间和加速查询。通常与`MergeTree`配合使用,存储聚合数据以避免数据丢失。创建`SummingMergeTree`表时,可选参数`columns`指定要汇总的数值列。未指定时,默认汇总所有非主键数值列。注意,聚合可能不完整,查询时需用`SUM`和`GROUP BY`。文章还介绍了建表语法、数据处理规则以及对嵌套数据结构和`AggregateFunction`列的处理。查阅更多ClickHouse相关内容可访问相关链接。
505 5