关于spark进行实时日志解析,保存hbase与mysql

进行地域分析  rowkey=中国_上海_201901016  value=访问次数 
 areaStartAmt.foreachRDD(rdd => {
rdd.foreachPartition(partitionOfRecords => {
// /**
// * *&**********************************************************************
// *注意事项1:在各个分区内进行hbase设置,开启连接 每个分区连接一次 避免每条每条数据进行连接
// * 注意事项2:在外部创建hbase与connect 是在diver端的代码 需要注意在foreachRDD算子进行的操作是在executor的操作 会报序列化错误
// * 注意事项3:从中可以看出,直接把 int 型的参数传入 Bytes.toBytes() 函数中,编译不会报错,但数据的格式发生错误,再显示时就会出现乱码,
// * 因此,在调用 Bytes.toBytes() 函数时,需要先将 int, double 型数据转换成 String 类型,此时即可正常显示。
// * 查询会出现乱码 int double等 需要 put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("accountNum"), Bytes.toBytes(String.valueOf(record._2)))
// * 注意事项3:使用500条一个批次提交的sql代码执行 局部更新操作 ,数据更新不知是太慢 还是未达到500条 数据库数据不正确
// * 直接使用了 val sql1 = s"insert into area_user_amt (date,country,provence,amt)
// * values('${datekey}','${countrykey}','${provencekey}','${amt}') ON DUPLICATE KEY UPDATE `amt`= '${amt}'"
// * 未使用预编译 与批次提交 实时更新 在集群模式下所以的分区与机器都访问数据库的次数过多 造成结果??
// *********************************************************************
// */ val hbaseConf = HBaseConfiguration.create()
// hbaseConf.set("hbase.rootdir", "hdfs://hadoop01:9000/hbase")
// hbaseConf.set("hbase.zookeeper.quorum", "hadoop01:2181,hadoop02:2181,hadoop03:2181")
hbaseConf.addResource("hbase-site.xml")
val connection = ConnectionFactory.createConnection(hbaseConf)
// val admin=connection.getAdmin;
val table = connection.getTable(TableName.valueOf("test1"));
if (partitionOfRecords.isEmpty) {
println("This RDD is not null but partition is null")
} else {
partitionOfRecords.foreach(record => {
val put = new Put(Bytes.toBytes(record._1))
/*
从中可以看出,直接把 int 型的参数传入 Bytes.toBytes() 函数中,编译不会报错,但数据的格式发生错误,再显示时就会出现乱码,
因此,在调用 Bytes.toBytes() 函数时,需要先将 int, double 型数据转换成 String 类型,此时即可正常显示。
***********************************************************************
*/ put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("accountNum"), Bytes.toBytes(String.valueOf(record._2)))
table.put(put)
})
}
})
      //   HbaseUtil.scanDataFromHabse(table)
上一篇:HQueue:基于HBase的消息队列


下一篇:CUDA Memories--CUDA记忆体(翻译+整理+测试)