Java与Avro序列化
#java #bigdata #serialization #avro

在以前的帖子中,我分析了Protocol BuffersFlatBuffers,使用JSON作为基线。在这篇文章中,我将分析Apache Avro并将其与先前研究的格式进行比较。

Apache Avro是作为Apache Hadoop项目的组成部分开发的,并于2009年根据Apache 2.0许可发行。

不应将Avro与Parquet混淆。通常,在搜索Parquet上的文档时,您最终会阅读有关Avro的信息,使它们感到困惑。 尽管AVRO库可用于生成镶木quet文件,并且两者都在大数据的世界中广泛使用,但格式与彼此无关。

作为协议缓冲区和扁平缓冲区,您可以预先定义一个模式(在JSON中),并生成不可读取的二进制内容。它还支持multiple languages

它不仅用于持续大量数据。例如,AVRO在Kafka中广泛用于序列化信息中的信息。

由于格式或架构可以包含在序列化数据中,因此代码生成是可选的,这使得易于构建一般处理其文件的系统。它提供了JSON的灵活性,但具有更紧凑,更有效的格式。

我将探索这两种方法:从模式中生成代码并以编程方式生成。


IDL和代码生成

上面文章中具有等效架构的文件是this one

{
  "name": "Organization",
  "type": "record",
  "namespace": "com.jerolba.xbuffers.avro",
  "fields": [
    { "name": "name", "type": "string" }, 
    { "name": "category", "type": "string"}, 
    { "name": "organizationType",
      "type": {
        "type": "enum",
        "name": "OrganizationType",
        "symbols": ["FOO", "BAR", "BAZ"]
      }
    }, 
    { "name": "country", "type": "string" }, 
    { "name": "attributes",
      "type": {
        "type": "array",
        "items": {
          "type": "record",
          "name": "Attribute",
          "fields": [
            { "name": "id", "type": "string" }, 
            { "name": "quantity", "type": "int"}, 
            { "name": "amount", "type": "int"}, 
            { "name": "size", "type": "int"}, 
            { "name": "percent", "type": "double"}, 
            { "name": "active", "type": "boolean"}
          ]
        }
      }
    }
  ]
}

要生成所有Java类,您需要下载avro-tools
并使用参数来运行它,该参数引用IDL文件所在的位置以及生成文件的目标路径:

 java -jar avro-tools-1.11.1.jar compile schema ./src/main/resources/organizations.avsc ./src/main/java/

或直接使用Docker与图像准备执行命令:

docker run --rm -v $(pwd)/src:/avro/src kpnnv/avro-tools:1.11.1 compile schema /avro/src/main/resources/organizations.avsc /avro/src/main/java/

使用生成的类

序列化

喜欢协议缓冲区,AVRO不会直接序列化您的pojos,您需要将信息复制到架构编译器生成的对象。

但是,在这种情况下,如果您坚持一个对象集合,则不需要将所有对象都放在内存中,因为您可以在实例化时一个一个一个序列化对象。

从Pojos开始序列化信息所需的代码看起来像this

DatumWriter<Organization> datumWriter = new SpecificDatumWriter<>(Organization.class);
var dataFileWriter = new DataFileWriter<>(datumWriter);
try (var os = new FileOutputStream("/tmp/organizations.avro")) {
  dataFileWriter.create(new Organization().getSchema(), os);
  for (var org : organizations) {
    List<Attribute> attrs = org.attributes().stream()
      .map(a -> Attribute.newBuilder()
        .setId(a.id())
        .setQuantity(a.quantity())
        .setAmount(a.amount())
        .setSize(a.size())
        .setPercent(a.percent())
        .setActive(a.active())
      .build())
    .toList();
    Organization organization = Organization.newBuilder()
      .setName(org.name())
      .setCategory(org.category())
      .setCountry(org.country())
      .setOrganizationType( OrganizationType.valueOf(org.type().name()))
      .setAttributes(attrs)
      .build();
    dataFileWriter.append(organization);
  }
  dataFileWriter.close();
}

我们可以转换并坚持一个Organization

  • 序列化时间:5,409 ms
  • File size: 846 MB
  • 压缩文件大小:530 MB
  • 需要内存:因为我们直接序列到OutputStream,所以它除了必要的内部IO缓冲区(和原始对象)
  • 之外,它不会消耗其他任何内容
  • 库大小(Avro-1.11.1.1.jar +依赖项):3,552,326字节
  • 生成类的大小:37,283字节

避免

由于数据的内部表示,AVRO需要围绕文件解析数据,您需要提供可寻求的输入流或直接的File。例如,您无法直接从HTTP请求中使用InputStream

使用few lines您可以阅读和处理整个对象图:

File file = new File("/tmp/organizations.avro");
DatumReader<Organization> datumReader = new SpecificDatumReader<>(Organization.class);
List<Organization> organizations = new ArrayList<>();
try (var dataFileReader = new DataFileReader<>(file, datumReader)) {
  while (dataFileReader.hasNext()) {
    organizations.add(dataFileReader.next());
  }
}

对象是从模式生成的类的实例,而不是原始记录。因为我们正在迭代读者,所以我们可以在必要时将每个实例转换为我们的表示形式,而无需在内存中重复两种表示。

  • 避免时间:8,197 MS
  • 需要内存:重建架构定义的所有对象结构,占2,520 MB

使用通用记录

在AVRO中,您可以使用架构embedded in the binary file,它允许您阅读序列化记录,而无需事先了解或同意模式。这使我们能够对任何文件的内容进行挑选并了解任何文件的内容,而无需代码生成

您可以在运行时定义模式,并根据您的需求,序列化信息的字段和结构来决定。

序列化

您可以通过AVRO GenericRecord将数据复制到生成的类中,该类似于地图。首先,您需要使用代码定义AVRO模式(或从静态JSON文件加载):

Schema attrSchema = SchemaBuilder.record("Attribute")
    .fields()
    .requiredString("id")
    .requiredInt("quantity")
    .requiredInt("amount")
    .requiredInt("size")
    .requiredDouble("percent")
    .requiredBoolean("active")
    .endRecord();
var enumSymbols = Stream.of(Type.values()).map(Type::name)
    .toArray(String[]::new);
Schema orgsSchema = SchemaBuilder.record("Organizations")
    .fields()
    .requiredString("name")
    .requiredString("category")
    .requiredString("country")
    .name("organizationType").type().enumeration("organizationType")
                             .symbols(enumSymbols).noDefault()
    .name("attributes").type().array().items(attrSchema).noDefault()
    .endRecord();
//Auxiliar Map to encode Enums
var typeField = orgsSchema.getField("organizationType").schema();
EnumMap<Type, EnumSymbol> enums = new EnumMap<>(Type.class);
enums.put(Type.BAR, new EnumSymbol(typeField, Type.BAR));
enums.put(Type.BAZ, new EnumSymbol(typeField, Type.BAZ));
enums.put(Type.FOO, new EnumSymbol(typeField, Type.FOO));

和序列化集合所需的代码看起来像this

DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(orgsSchema);
var dataFileWriter = new DataFileWriter<>(datumWriter);
try (var os = new FileOutputStream("/tmp/organizations.avro")) {
  dataFileWriter.create(orgsSchema, os);
  for (var org : organizations) {
    List<GenericRecord> attrs = new ArrayList<>();
    for (var attr : org.attributes()) {
      GenericRecord attrRecord = new GenericData.Record(attrSchema);
      attrRecord.put("id", attr.id());
      attrRecord.put("quantity", attr.quantity());
      attrRecord.put("amount", attr.amount());
      attrRecord.put("size", attr.size());
      attrRecord.put("percent", attr.percent());
      attrRecord.put("active", attr.active());
      attrs.add(attrRecord);
    }
    GenericRecord orgRecord = new GenericData.Record(orgsSchema);
    orgRecord.put("name", org.name());
    orgRecord.put("category", org.category());
    orgRecord.put("country", org.country());
    orgRecord.put("organizationType", enums.get(org.type()));
    orgRecord.put("attributes", attrs);
    dataFileWriter.append(orgRecord);
  }
  dataFileWriter.close();
}

因为我们使用的是相同的模式,而我们只是在更改序列化数据的方式,所使用的文件大小和内存相同。

序列化时间增长到 5,903 ms ,比使用生成的代码高10%。 GenericRecord的实现引入了一个轻微的开销。

避免

使用不同的Reader,避免数据的结果再次是一个未型的GenericRecord对象。在这种情况下,我们需要将每个实例转换为原始数据结构,mapping each type

List<Org> organizations = new ArrayList<>();
DatumReader<GenericRecord> datumReader = new GenericDatumReader<>();
try (var dataFileReader = new DataFileReader<>(file, datumReader)) {
  while (dataFileReader.hasNext()) {
    GenericRecord record = dataFileReader.next();
    List<GenericRecord> attrsRecords = (List<GenericRecord>) record.get("attributes");
    var attrs = attrsRecords.stream().map(attr -> new Attr(attr.get("id").toString(),
      ((Integer) attr.get("quantity")).byteValue(),
      ((Integer) attr.get("amount")).byteValue(),
      (boolean) attr.get("active"),
      (double) attr.get("percent"),
      ((Integer) attr.get("size")).shortValue())).toList();
    Utf8 name = (Utf8) record.get("name");
    Utf8 category = (Utf8) record.get("category");
    Utf8 country = (Utf8) record.get("country");
    Type type = Type.valueOf(record.get("organizationType").toString());
    organizations.add(new Org(name.toString(), category.toString(), country.toString(), type, attrs));
  }
}

代码是冗长的,是很多铸件。 AVRO不支持byteshort类型,并且它们持续为int,因此我们需要淡化其价值。作为优化,创建字符串,使用称为Utf8的内部表示。

避免时间增长到 8,471 ms ,与静态代码版本相比约为间接费用的5%。


使用优化的通用记录

如果您检查了GenericRecord类的实现,则可以看到get(String key)put(String key, Object value) access by that key to a Map to get the index in an array。由于每个属性在阅读文件时始终处于相同的位置,因此我们只能访问一次并使用变量重复使用其值,从而改善执行时间。

序列化

由于您需要在模式中存储每个字段的索引,因此代码更加详细。创建模式后,code是:

int idPos = attrSchema.getField("id").pos();
int quantityPos = attrSchema.getField("quantity").pos();
int amountPos = attrSchema.getField("amount").pos();
int activePos = attrSchema.getField("active").pos();
int percentPos = attrSchema.getField("percent").pos();
int sizePos = attrSchema.getField("size").pos();
int namePos = orgsSchema.getField("name").pos();
int categoryPos = orgsSchema.getField("category").pos();
int countryPos = orgsSchema.getField("country").pos();
int organizationTypePos = orgsSchema.getField("organizationType").pos();
int attributesPos = orgsSchema.getField("attributes").pos();

try (var os = new FileOutputStream("/tmp/organizations.avro")) {
  DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(orgsSchema);
  DataFileWriter<GenericRecord> dataFileWriter = new DataFileWriter<>(datumWriter);
  dataFileWriter.create(orgsSchema, os);
  for (var org : organizations) {
    List<GenericRecord> attrs = new ArrayList<>();
    for (var attr : org.attributes()) {
      GenericRecord attrRecord = new GenericData.Record(attrSchema);
      attrRecord.put(idPos, attr.id());
      attrRecord.put(quantityPos, attr.quantity());
      attrRecord.put(amountPos, attr.amount());
      attrRecord.put(sizePos, attr.size());
      attrRecord.put(percentPos, attr.percent());
      attrRecord.put(activePos, attr.active());
      attrs.add(attrRecord);
    }
    GenericRecord orgRecord = new GenericData.Record(orgsSchema);
    orgRecord.put(namePos, org.name());
    orgRecord.put(categoryPos, org.category());
    orgRecord.put(countryPos, org.country());
    orgRecord.put(organizationTypePos, enums.get(org.type()));
    orgRecord.put(attributesPos, attrs);
    dataFileWriter.append(orgRecord);
  }
  dataFileWriter.close();
}

序列化时间为 5,381 ms ,非常接近带有生成代码的版本中使用的时间。

避免

code非常相似,我们只添加变量才能知道每个字段的位置:

List<Org> organizations = new ArrayList<>();
DatumReader<GenericRecord> datumReader = new GenericDatumReader<>();
try (DataFileReader<GenericRecord> dataFileReader = new DataFileReader<>(file, datumReader)) {
  Schema attributes = dataFileReader.getSchema().getField("attributes").schema().getElementType();
  int idPos = attributes.getField("id").pos();
  int quantityPos = attributes.getField("quantity").pos();
  int amountPos = attributes.getField("amount").pos();
  int activePos = attributes.getField("active").pos();
  int percentPos = attributes.getField("percent").pos();
  int sizePos = attributes.getField("size").pos();
  Schema orgs = dataFileReader.getSchema();
  int namePos = orgs.getField("name").pos();
  int categoryPos = orgs.getField("category").pos();
  int countryPos = orgs.getField("country").pos();
  int organizationTypePos = orgs.getField("organizationType").pos();
  while (dataFileReader.hasNext()) {
    GenericRecord record = dataFileReader.next();
    List<GenericRecord> attrsRecords = (List<GenericRecord>) record.get("attributes");
    var attrs = attrsRecords.stream().map(attr -> new Attr(attr.get(idPos).toString(),
      ((Integer) attr.get(quantityPos)).byteValue(),
      ((Integer) attr.get(amountPos)).byteValue(),
      (boolean) attr.get(activePos),
      (double) attr.get(percentPos),
      ((Integer) attr.get(sizePos)).shortValue())).toList();
    Utf8 name = (Utf8) record.get(namePos);
    Utf8 category = (Utf8) record.get(categoryPos);
    Utf8 country = (Utf8) record.get(countryPos);
    Type type = Type.valueOf(record.get(organizationTypePos).toString());
    organizations.add(new Org(name.toString(), category.toString(), country.toString(), type, attrs));
  }
}

避免时间下降到 7,353 ms ,比生成的代码版本快10%。为什么?我对其内部的知识没有足够的知识来冒险一个答案,但是结果令我感到惊讶。

AVRO摘要

生成的代码 通用记录 优化
通用记录
序列化时间 5,409 MS 5,903 MS 5,381 MS
避免时间 8,197 MS 8,471 MS 7,353 MS

使用GenericRecord允许我们在过程中获得一些灵活性而不会丢失性能,但是由于字段的手动映射,代码更加详细,容易出现错误。

在所有情况下,随附的依赖项都是相同的,我们只能保存源代码的生成。

分析和印象

json 协议缓冲区 flatbuffer avro
序列化时间 11,718 ms 5,823 MS 3,803 MS 5,409 MS
文件大小 2,457 MB 1,044 MB 600 MB 846 MB
GZ文件大小 525 MB 448 MB 414 MB 530 MB
内存序列化 n/a 1.29 GB 0.6 GB -1 GB n/a
避免时间 20,410 MS 4,535 MS 202-1,876 MS 8,197 MS
内存挑战 2,193 MB 2,710 MB 0-600 MB 2,520 MB
jar库大小 1,910 kb 1,636 kb 64 kb 3,469 kb
生成类的大小 n/a 40 kb 9 kb 36 kb
  • 如果我们不考虑在flatbuffers中应用的优化,则AVRO文件占用更少的空间。
  • 在示例中,尽管需要所有字段,但您可以轻松使它们无效(消耗更多的空间)。
  • 对我来说,它的主要优势是,它不需要将所有数据记忆中的所有数据序列化。例如,您可以从数据库或文件中读取它,根据您的逻辑进行转换或丰富,同时将其持续使用某种类型的outputStream(由文件或HTTP Connection创建)。
  • 通过编程定义模式的可能性使我们可以选择根据业务逻辑并创建自己的序列化工具来修改输出格式。
  • Avro位于JSON和二进制格式之间的一半,例如协议缓冲区和FlatBuffer:
    • AVRO支持灵活的模式
    • 不必事先了解或同意模式以阅读AVRO文件
    • 在JSON中,您无法从文件本身中获得架构(不先完全解析)
    • Avro比JSON(但不可读)更紧凑
    • 序列化和避免时间比JSON更快
    • 您可以在循环/流中轻松地序列化/对所有对象进行序列化,而无需将其全部放在内存中