注解方式
/**
 * Immutable postal address holding a city, a street and a pincode.
 *
 * <p>The class is annotated with {@code @DefaultSchema(JavaFieldSchema.class)} and its
 * constructor with {@code @SchemaCreate}; the constructor parameter names are kept identical
 * to the field names.
 */
@DefaultSchema(JavaFieldSchema.class)
public class Address {
  /** City name. */
  public final String city;

  /** Street name. */
  public final String street;

  /** Numeric postal code. */
  public final int pincode;

  /**
   * Creates an address from its three components.
   *
   * @param city city name
   * @param street street name
   * @param pincode numeric postal code
   */
  @SchemaCreate
  public Address(String city, String street, int pincode) {
    // The implicit superclass constructor call is sufficient here.
    this.city = city;
    this.street = street;
    this.pincode = pincode;
  }
}
使用 AvroIO 读取文件时,会自动推断出 Schema 信息
通过Row.withSchema设置Schema
// Row layout: a string field "string1" followed by a 32-bit integer field "int1".
Schema appSchema =
    Schema.builder()
        .addStringField("string1")
        .addInt32Field("int1")
        .build();

// Three in-memory rows that follow appSchema (values are given in field order).
Row row1 = Row.withSchema(appSchema).addValues("aaa,bbb", 1).build();
Row row2 = Row.withSchema(appSchema).addValues("ccc,ddd", 2).build();
Row row3 = Row.withSchema(appSchema).addValues("ddd,eee", 3).build();

// Turn the rows into a PCollection and attach the same schema to it.
// `p` is not declared in this snippet -- presumably the enclosing Pipeline; confirm at the call site.
PCollection<Row> inputTable =
    PBegin.in(p)
        .apply(Create.of(row1, row2, row3).withRowSchema(appSchema));
通过PCollection.setSchema/PCollection.setRowSchema设置Schema
// Build the pipeline and read raw elements from the custom source.
Pipeline p = Pipeline.create(options);
PCollection<Object> objs = p.apply(new CustomSource());

// Schema with three fields: c1 (int32), c2 (string), c3 (double).
Schema type =
    Schema.builder()
        .addInt32Field("c1")
        .addStringField("c2")
        .addDoubleField("c3")
        .build();

// Expose `objs` to SQL under the name "somedata", run the query, then set the schema
// (with identity conversion functions) on the PCollection returned by the query.
// NOTE(review): setSchema is applied to the query result, not to `objs` -- confirm this is intended.
PCollectionTuple.of(new TupleTag<>("somedata"), objs)
    .apply(SqlTransform.query("SELECT c1 FROM somedata"))
    .setSchema(type, SerializableFunctions.identity(), SerializableFunctions.identity());

// Execute the pipeline and block until it completes.
p.run().waitUntilFinish();
|
通过AvroUtils.schemaCoder设置Schema
/**
 * Sets an Avro-derived schema coder on {@code input} when it does not already have a schema.
 *
 * @param input the collection of Avro generic records to inspect
 * @param schema the Avro schema describing the records
 * @return the same {@code input} instance, with its coder set if it had no schema
 */
private static PCollection<GenericRecord> inferSchema(
    PCollection<GenericRecord> input, org.apache.avro.Schema schema) {
  // NOTE(review): the converted schema is never read below; the call is kept so that
  // any effect of the conversion itself is preserved -- confirm whether it can be dropped.
  org.apache.beam.sdk.schemas.Schema convertedSchema = AvroUtils.toBeamSchema(schema);

  // Nothing to do when a schema is already attached.
  if (input.hasSchema()) {
    return input;
  }

  input.setCoder(AvroUtils.schemaCoder(schema));
  return input;
}