20.1 Introduction to Apache Avro and the Experiment
- Apache Avro is a data serialization system with API bindings for Java, Python, C, C++, C#, and other languages; the Java example below demonstrates how Avro serializes and deserializes data. Avro provides:
- Rich data structures
- A compact, fast, binary data format
- A container file for storing persistent data
- Remote procedure call (RPC)
- Simple integration with dynamic languages
- Experiment overview
- This experiment shows how to generate Avro-format data with Java, and how to load the resulting Avro data files into Spark as a Dataset and a DataFrame for further processing.
20.2 Generating Data with Avro
- Define the schema file
- Download avro-tools-1.8.1.jar
Avro website: http://avro.apache.org/
Avro version: 1.8.1
Download the Avro tooling jar avro-tools-1.8.1.jar; it is mainly used to compile a schema definition file into the corresponding Java source file.
- Define a schema file named CustomerAddress.avsc:
{
  "namespace": "com.peach.arvo",
  "type": "record",
  "name": "CustomerAddress",
  "fields": [
    {"name": "ca_address_sk", "type": "long"},
    {"name": "ca_address_id", "type": "string"},
    {"name": "ca_street_number", "type": "string"},
    {"name": "ca_street_name", "type": "string"},
    {"name": "ca_street_type", "type": "string"},
    {"name": "ca_suite_number", "type": "string"},
    {"name": "ca_city", "type": "string"},
    {"name": "ca_county", "type": "string"},
    {"name": "ca_state", "type": "string"},
    {"name": "ca_zip", "type": "string"},
    {"name": "ca_country", "type": "string"},
    {"name": "ca_gmt_offset", "type": "double"},
    {"name": "ca_location_type", "type": "string"}
  ]
}
- Schema notes:
- namespace: the package of the generated Java class (its import path)
- type: one of Avro's complex types (record, enum, array, map, union, and fixed); a record is used here
- name: the class name of the generated Java file
- fields: the fields defined in the schema and their types
The same .avsc file can also be parsed at runtime instead of compiled, as the sketch below shows.
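A minimal sketch of loading this schema at runtime with org.apache.avro.Schema.Parser (the demo class name and file path here are illustrative assumptions, not part of the original project):
import java.io.File;
import org.apache.avro.Schema;

public class SchemaParseDemo { // hypothetical demo class
    public static void main(String[] args) throws Exception {
        // parse the schema file defined above; adjust the path to where it lives
        Schema schema = new Schema.Parser().parse(new File("CustomerAddress.avsc"));
        System.out.println(schema.getFullName()); // prints com.peach.arvo.CustomerAddress
        System.out.println(schema.getFields());   // the 13 fields declared above
    }
}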
- Generate the Java source file
- Use the avro-tools-1.8.1.jar downloaded in step 1 to generate the Java code:
java -jar avro-tools-1.8.1.jar compile schema CustomerAddress.avsc .
- The trailing "." means the Java code is generated into the current directory; following the namespace, it ends up at com/peach/arvo/CustomerAddress.java.
- Generate the Avro file with Java
- Create a Java project with Maven
- Add the following dependency to pom.xml:
<dependency>
    <groupId>org.apache.avro</groupId>
    <artifactId>avro</artifactId>
    <version>1.8.1</version>
</dependency>
- Create a Java class GenerateDataApp with the following code:
package com.peach;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.InputStreamReader;
import java.util.StringTokenizer;

import org.apache.avro.file.DataFileWriter;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.specific.SpecificDatumWriter;

import com.peach.arvo.CustomerAddress;

/**
 * @author peach
 * 2017-03-02
 * Generates an Avro data file from a "|"-delimited text file.
 */
public class GenerateDataApp {

    private static String source_data_path = "F:\\data\\customer_address.dat";    // path of the source data file
    private static String dest_avro_data_path = "F:\\data\\customeraddress.avro"; // path of the generated Avro data file

    public static void main(String[] args) {
        try {
            DatumWriter<CustomerAddress> caDatumWriter = new SpecificDatumWriter<>(CustomerAddress.class);
            DataFileWriter<CustomerAddress> dataFileWriter = new DataFileWriter<>(caDatumWriter);
            dataFileWriter.create(new CustomerAddress().getSchema(), new File(dest_avro_data_path));
            loadData(dataFileWriter);
            dataFileWriter.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Reads the source data file line by line and appends each parsed record to the Avro writer.
     * @param dataFileWriter an open writer targeting the destination file
     */
    private static void loadData(DataFileWriter<CustomerAddress> dataFileWriter) {
        File file = new File(source_data_path);
        if (!file.isFile()) {
            return;
        }
        try {
            InputStreamReader isr = new InputStreamReader(new FileInputStream(file));
            BufferedReader reader = new BufferedReader(isr);
            String line;
            CustomerAddress address;
            while ((line = reader.readLine()) != null) {
                address = getCustomerAddress(line);
                if (address != null) {
                    dataFileWriter.append(address);
                }
            }
            reader.close(); // closing the BufferedReader also closes the underlying InputStreamReader
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Parses a single "|"-delimited line into a CustomerAddress object.
     * @param line one row of the source file
     * @return the populated record, or null if the line could not be parsed
     */
    private static CustomerAddress getCustomerAddress(String line) {
        CustomerAddress ca = null;
        try {
            if (line != null && !line.isEmpty()) { // note: != "" would compare references, not content
                // StringTokenizer silently drops empty ("") tokens, so rows with
                // missing fields fail the countTokens() check below
                StringTokenizer token = new StringTokenizer(line, "|");
                if (token.countTokens() >= 13) {
                    ca = new CustomerAddress();
                    ca.setCaAddressSk(Long.parseLong(token.nextToken()));
                    ca.setCaAddressId(token.nextToken());
                    ca.setCaStreetNumber(token.nextToken());
                    ca.setCaStreetName(token.nextToken());
                    ca.setCaStreetType(token.nextToken());
                    ca.setCaSuiteNumber(token.nextToken());
                    ca.setCaCity(token.nextToken());
                    ca.setCaCounty(token.nextToken());
                    ca.setCaState(token.nextToken());
                    ca.setCaZip(token.nextToken());
                    ca.setCaCountry(token.nextToken());
                    ca.setCaGmtOffset(Double.parseDouble(token.nextToken()));
                    ca.setCaLocationType(token.nextToken());
                } else {
                    System.err.println(line);
                }
            }
        } catch (NumberFormatException e) {
            System.err.println(line);
        }
        return ca;
    }
}
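As a quick sanity check, the same avro-tools jar can dump the generated file back to JSON with its tojson command (the path is the one configured above):
java -jar avro-tools-1.8.1.jar tojson F:\data\customeraddress.avro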
- Generating the Avro file dynamically: instead of relying on a generated class, each row can be wrapped in a GenericRecord and written against the parsed Schema. The fragment below shows the loading side (sourcePath, logger, and SchemaUtil belong to the surrounding class); a sketch of the SchemaUtil.convertRecord helper follows after the fragment.
private static void loadData(DataFileWriter<GenericRecord> dataFileWriter, Schema schema) {
    File file = new File(sourcePath);
    if (!file.isFile()) { // new File() never returns null, so test the file itself
        logger.error("[peach], source data not found");
        return;
    }
    InputStreamReader inputStreamReader = null;
    BufferedReader bufferedReader = null;
    try {
        inputStreamReader = new InputStreamReader(new FileInputStream(file));
        bufferedReader = new BufferedReader(inputStreamReader);
        String line;
        GenericRecord genericRecord;
        while ((line = bufferedReader.readLine()) != null) {
            if (!line.isEmpty()) { // != "" would compare references, not content
                String[] values = line.split("\\|");
                genericRecord = SchemaUtil.convertRecord(values, schema);
                if (genericRecord != null) {
                    dataFileWriter.append(genericRecord);
                }
            }
        }
    } catch (Exception e) {
        e.printStackTrace();
    } finally {
        try {
            if (bufferedReader != null) {
                bufferedReader.close();
            }
            if (inputStreamReader != null) {
                inputStreamReader.close();
            }
        } catch (IOException e) {
            // ignore failures while closing
        }
    }
}
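The source of SchemaUtil.convertRecord is not shown in the original. A minimal sketch of what such a helper could look like, assuming columns arrive in the schema's declaration order and only the long, double, and string types used by this schema (the class and method names match the call above, but the body is an assumption):
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;

public final class SchemaUtil {

    /** Maps raw string values onto the schema's fields, in declaration order. */
    public static GenericRecord convertRecord(String[] values, Schema schema) {
        if (values == null || values.length < schema.getFields().size()) {
            return null; // not enough columns for this schema
        }
        GenericRecord record = new GenericData.Record(schema);
        int i = 0;
        for (Schema.Field field : schema.getFields()) {
            String raw = values[i++];
            switch (field.schema().getType()) {
                case LONG:   record.put(field.name(), Long.parseLong(raw));     break;
                case DOUBLE: record.put(field.name(), Double.parseDouble(raw)); break;
                default:     record.put(field.name(), raw);                     break; // string fields
            }
        }
        return record;
    }
}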
20.3 Reading Avro Files with Spark
- Create a Scala project with Maven
- Add the following dependencies to pom.xml:
<dependency>
    <groupId>com.peach</groupId>
    <artifactId>generatedata</artifactId>
    <version>1.0-SNAPSHOT</version>
</dependency>
<dependency>
    <groupId>com.databricks</groupId>
    <artifactId>spark-avro_2.10</artifactId>
    <version>2.1.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql_2.10 -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.10</artifactId>
    <version>2.1.0</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.10</artifactId>
    <version>2.1.0</version>
</dependency>
<dependency>
    <groupId>org.apache.avro</groupId>
    <artifactId>avro</artifactId>
    <version>1.8.1</version>
</dependency>
- Example code fragment (Scala), with the imports it relies on listed explicitly and wrapped in a hypothetical ReadAvroApp object so it compiles as-is:
import org.apache.avro.mapred.{AvroInputFormat, AvroWrapper}
import org.apache.hadoop.io.NullWritable
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
import com.peach.arvo.CustomerAddress

case class CustomerAddressData(ca_address_sk: Long,
                               ca_address_id: String,
                               ca_street_number: String,
                               ca_street_name: String,
                               ca_street_type: String,
                               ca_suite_number: String,
                               ca_city: String,
                               ca_county: String,
                               ca_state: String,
                               ca_zip: String,
                               ca_country: String,
                               ca_gmt_offset: Double,
                               ca_location_type: String)

object ReadAvroApp {
  // if the case class were defined inside a class (e.g. in the REPL), you would also need:
  // org.apache.spark.sql.catalyst.encoders.OuterScopes.addOuterScope(this)
  def main(args: Array[String]): Unit = {
    val path = "/Users/zoulihan/Desktop/customeraddress.avro"
    val conf = new SparkConf().setAppName("test").setMaster("local[2]")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    // brings the implicit Encoders into scope; without this import,
    // createDataset cannot serialize the CustomerAddressData case class
    import sqlContext.implicits._
    // read the Avro file as a Hadoop RDD of (AvroWrapper, NullWritable) pairs
    val rdd = sc.hadoopFile[AvroWrapper[CustomerAddress], NullWritable, AvroInputFormat[CustomerAddress]](path)
    // unwrap each Avro datum into the case class (Avro strings are Utf8, hence toString)
    val records = rdd.map(line => CustomerAddressData(
      line._1.datum().getCaAddressSk,
      line._1.datum().getCaAddressId.toString,
      line._1.datum().getCaStreetNumber.toString,
      line._1.datum().getCaStreetName.toString,
      line._1.datum().getCaStreetType.toString,
      line._1.datum().getCaSuiteNumber.toString,
      line._1.datum().getCaCity.toString,
      line._1.datum().getCaCounty.toString,
      line._1.datum().getCaState.toString,
      line._1.datum().getCaZip.toString,
      line._1.datum().getCaCountry.toString,
      line._1.datum().getCaGmtOffset,
      line._1.datum().getCaLocationType.toString
    ))
    val ds = sqlContext.createDataset(records)
    ds.show()
    val df = ds.toDF()
    df.createTempView("customer_address")
    // sqlContext.sql("select count(*) from customer_address").show()
    sqlContext.sql("select * from customer_address limit 10").show()
  }
}
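The pom above also declares com.databricks:spark-avro, which the snippet never actually uses. As an alternative to the hadoopFile route, the Avro file can be loaded straight into a DataFrame with no generated class at all. A minimal sketch, in Java to match the earlier examples (the class name is hypothetical; in Scala the equivalent call is simply sqlContext.read.format("com.databricks.spark.avro").load(path)). Note that for Spark 2.x you generally need the spark-avro 3.x line (e.g. 3.2.0) rather than the 2.1.0 shown above:
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class ReadAvroWithSparkAvro { // hypothetical class name
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("avro-read").master("local[2]").getOrCreate();
        // spark-avro infers the schema from the Avro file itself
        Dataset<Row> df = spark.read().format("com.databricks.spark.avro")
                .load("F:\\data\\customeraddress.avro");
        df.createOrReplaceTempView("customer_address");
        spark.sql("select * from customer_address limit 10").show();
        spark.stop();
    }
}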