<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<!--HBase -->
<!-- 分离资源文件 -->
<!--这个插件就是把依赖的jar包复制出来放到编译后的target/lib目录,并且在打包时候排除内部依赖 -->
package com.unionpay.css.bigdata.dataCompare.hbase;
import com.unionpay.css.bigdata.dataCompare.Constant;
import com.unionpay.css.bigdata.dataCompare.util.KerberosCheckUtil;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.security.UserGroupInformation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
public class HBaseUtil {
public static Logger logger = LoggerFactory.getLogger(HBaseUtil.class);
public static Configuration getHbaseConfiguration(String cluster){
Configuration hbaseConf = HBaseConfiguration.create();
String hbaseIp = cluster;
hbaseConf.set("hbase.zookeeper.quorum", hbaseIp + ":2181");
hbaseConf.set("hbase.master", hbaseIp+":60000");
hbaseConf.set("hbase.rpc.timeout", "10000");//10s
hbaseConf.set("hbase.client.retries.number", "2");
hbaseConf.set("hbase.client.operation.timeout", "10000");
return hbaseConf;
public static void kerberosLogin(Configuration hbConf){
hbConf.set("hadoop.security.authentication", "Kerberos");
hbConf.set("hbase.security.authentication", "kerberos");
//hbaseConf.set("hbase.master", getHbaseIpByCluster(cluster) + ":16000");
hbConf.set("hbase.master.kerberos.principal", "hbase/_HOST@CVBG.COM");
hbConf.set("hbase.regionserver.kerberos.principal", "hbase/_HOST@CVBG.COM");
System.setProperty("javax.security.auth.useSubjectCredOnly", "false");
//System.setProperty("sun.security.krb5.debug", "true");
System.setProperty("java.security.krb5.conf", KerberosCheckUtil.getKrb5Conf());
if (UserGroupInformation.isLoginKeytabBased() && UserGroupInformation.getLoginUser().getUserName().equals(KerberosCheckUtil.principal)) {
logger.info("hbase:" + hbConf.get("hbase.master")+ ",user [{}] is login already!",KerberosCheckUtil.principal);
}else {
UserGroupInformation.loginUserFromKeytab(KerberosCheckUtil.principal, KerberosCheckUtil.getKeyTabFile());
logger.info("hbase:" + hbConf.get("hbase.master") + ",user [{}] login successed!",KerberosCheckUtil.principal);
}catch (IOException e){
logger.error("kerbose登录报错," + KerberosCheckUtil.getKeyTabFile());
public static User getAuthenticatedUser(){
User loginedUser = null;
try {
logger.info("=====put the logined userinfomation to user====");
loginedUser = User.create(UserGroupInformation.getLoginUser());
} catch (IOException e) {
logger.error("===fialed put the logined userinfomation to user===",e);
return loginedUser;
package com.unionpay.css.bigdata.dataCompare.util;
public class KerberosCheckUtil {
public static boolean isKerberosOn = true;
public static String principal="dw_hbkal@CVBG.COM";
public static String keyTabFileName="dw_hbkal";
public static String krb5Conf= "krb5.conf";
public static String getKeyTabFile() {
String runPath = KerberosCheckUtil.class.getResource("/").getPath();
return runPath + "dw_hbkal.keytab";
//return "file:///root/przhang/dw_hbkal.keytab";
public static String getKrb5Conf() {
String runPath = KerberosCheckUtil.class.getResource("/").getPath();
return runPath + "krb5.conf";
//return "fie:///root/przhang/krb5.conf";
package com.unionpay.css.bigdata.dataCompare.hbase;
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Locale;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.mapreduce.TableInputFormatBase;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.security.User;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.StringUtils;
* Convert HBase tabular data into a format that is consumable by Map/Reduce.
public class KerberosTableInputFormat extends TableInputFormatBase
implements Configurable {
private static final Logger LOG = LoggerFactory.getLogger(KerberosTableInputFormat.class);
/** Job parameter that specifies the input table. */
public static final String INPUT_TABLE = "hbase.mapreduce.inputtable";
* If specified, use start keys of this table to split.
* This is useful when you are preparing data for bulkload.
private static final String SPLIT_TABLE = "hbase.mapreduce.splittable";
/** Base-64 encoded scanner. All other SCAN_ confs are ignored if this is specified.
* See {@link TableMapReduceUtil#convertScanToString(Scan)} for more details.
public static final String SCAN = "hbase.mapreduce.scan";
/** Scan start row */
public static final String SCAN_ROW_START = "hbase.mapreduce.scan.row.start";
/** Scan stop row */
public static final String SCAN_ROW_STOP = "hbase.mapreduce.scan.row.stop";
/** Column Family to Scan */
public static final String SCAN_COLUMN_FAMILY = "hbase.mapreduce.scan.column.family";
/** Space delimited list of columns and column families to scan. */
public static final String SCAN_COLUMNS = "hbase.mapreduce.scan.columns";
/** The timestamp used to filter columns with a specific timestamp. */
public static final String SCAN_TIMESTAMP = "hbase.mapreduce.scan.timestamp";
/** The starting timestamp used to filter columns with a specific range of versions. */
public static final String SCAN_TIMERANGE_START = "hbase.mapreduce.scan.timerange.start";
/** The ending timestamp used to filter columns with a specific range of versions. */
public static final String SCAN_TIMERANGE_END = "hbase.mapreduce.scan.timerange.end";
/** The maximum number of version to return. */
public static final String SCAN_MAXVERSIONS = "hbase.mapreduce.scan.maxversions";
/** Set to false to disable server-side caching of blocks for this scan. */
public static final String SCAN_CACHEBLOCKS = "hbase.mapreduce.scan.cacheblocks";
/** The number of rows for caching that will be passed to scanners. */
public static final String SCAN_CACHEDROWS = "hbase.mapreduce.scan.cachedrows";
/** Set the maximum number of values to return for each call to next(). */
public static final String SCAN_BATCHSIZE = "hbase.mapreduce.scan.batchsize";
/** Specify if we have to shuffle the map tasks. */
public static final String SHUFFLE_MAPS = "hbase.mapreduce.inputtable.shufflemaps";
/** The configuration. */
private Configuration conf = null;
/** The kerberos authenticated user*/
private User user;
* Returns the current configuration.
* @return The current configuration.
* @see org.apache.hadoop.conf.Configurable#getConf()
public Configuration getConf() {
return conf;
* Sets the configuration. This is used to set the details for the table to
* be scanned.
* @param configuration The configuration to set.
* @see org.apache.hadoop.conf.Configurable#setConf(
* org.apache.hadoop.conf.Configuration)
public void setConf(Configuration configuration) {
this.conf = configuration;
//=========get kerberos authentication before create hbase connection==========
user = HBaseUtil.getAuthenticatedUser();
Scan scan = null;
if (conf.get(SCAN) != null) {
try {
scan = TableMapReduceUtil.convertStringToScan(conf.get(SCAN));
} catch (IOException e) {
LOG.error("An error occurred.", e);
} else {
try {
scan = createScanFromConfiguration(conf);
} catch (Exception e) {
* Sets up a {@link Scan} instance, applying settings from the configuration property
* constants defined in {@code TableInputFormat}. This allows specifying things such as:
* <ul>
* <li>start and stop rows</li>
* <li>column qualifiers or families</li>
* <li>timestamps or timerange</li>
* <li>scanner caching and batch size</li>
* </ul>
public static Scan createScanFromConfiguration(Configuration conf) throws IOException {
Scan scan = new Scan();
if (conf.get(SCAN_ROW_START) != null) {
if (conf.get(SCAN_ROW_STOP) != null) {
if (conf.get(SCAN_COLUMNS) != null) {
addColumns(scan, conf.get(SCAN_COLUMNS));
for (String columnFamily : conf.getTrimmedStrings(SCAN_COLUMN_FAMILY)) {
if (conf.get(SCAN_TIMESTAMP) != null) {
if (conf.get(SCAN_TIMERANGE_START) != null && conf.get(SCAN_TIMERANGE_END) != null) {
if (conf.get(SCAN_MAXVERSIONS) != null) {
if (conf.get(SCAN_CACHEDROWS) != null) {
if (conf.get(SCAN_BATCHSIZE) != null) {
// false by default, full table scans generate too much BC churn
scan.setCacheBlocks((conf.getBoolean(SCAN_CACHEBLOCKS, false)));
return scan;
protected void initialize(JobContext context) throws IOException {
// Do we have to worry about mis-matches between the Configuration from setConf and the one
// in this context?
TableName tableName = TableName.valueOf(conf.get(INPUT_TABLE));
try {
//====================add authenticated user ===================
initializeTable(ConnectionFactory.createConnection(new Configuration(conf),user), tableName);
} catch (Exception e) {
* Parses a combined family and qualifier and adds either both or just the
* family in case there is no qualifier. This assumes the older colon
* divided notation, e.g. "family:qualifier".
* @param scan The Scan to update.
* @param familyAndQualifier family and qualifier
* @throws IllegalArgumentException When familyAndQualifier is invalid.
private static void addColumn(Scan scan, byte[] familyAndQualifier) {
byte [][] fq = CellUtil.parseColumn(familyAndQualifier);
if (fq.length == 1) {
} else if (fq.length == 2) {
scan.addColumn(fq[0], fq[1]);
} else {
throw new IllegalArgumentException("Invalid familyAndQualifier provided.");
* Adds an array of columns specified using old format, family:qualifier.
* <p>
* Overrides previous calls to {@link Scan#addColumn(byte[], byte[])}for any families in the
* input.
* @param scan The Scan to update.
* @param columns array of columns, formatted as <code>family:qualifier</code>
* @see Scan#addColumn(byte[], byte[])
public static void addColumns(Scan scan, byte [][] columns) {
for (byte[] column : columns) {
addColumn(scan, column);
* Calculates the splits that will serve as input for the map tasks. The
* number of splits matches the number of regions in a table. Splits are shuffled if
* required.
* @param context The current job context.
* @return The list of input splits.
* @throws IOException When creating the list of splits fails.
* @see org.apache.hadoop.mapreduce.InputFormat#getSplits(
* org.apache.hadoop.mapreduce.JobContext)
public List<InputSplit> getSplits(JobContext context) throws IOException {
List<InputSplit> splits = super.getSplits(context);
if ((conf.get(SHUFFLE_MAPS) != null) && "true".equals(conf.get(SHUFFLE_MAPS).toLowerCase(Locale.ROOT))) {
return splits;
* Convenience method to parse a string representation of an array of column specifiers.
* @param scan The Scan to update.
* @param columns The columns to parse.
private static void addColumns(Scan scan, String columns) {
String[] cols = columns.split(" ");
for (String col : cols) {
addColumn(scan, Bytes.toBytes(col));
protected Pair<byte[][], byte[][]> getStartEndKeys() throws IOException {
if (conf.get(SPLIT_TABLE) != null) {
TableName splitTableName = TableName.valueOf(conf.get(SPLIT_TABLE));
//====================add authenticated user ===================
try (Connection conn = ConnectionFactory.createConnection(getConf(),user)) {
try (RegionLocator rl = conn.getRegionLocator(splitTableName)) {
return rl.getStartEndKeys();
return super.getStartEndKeys();
* Sets split table in map-reduce job.
public static void configureSplitTable(Job job, TableName tableName) {
job.getConfiguration().set(SPLIT_TABLE, tableName.getNameAsString());
package com.unionpay.css.bigdata.dataCompare;
import com.unionpay.css.bigdata.dataCompare.hbase.HBaseUtil;
import com.unionpay.css.bigdata.dataCompare.hbase.KerberosTableInputFormat;
import com.unionpay.css.bigdata.dataCompare.util.CompareUtil;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.spark.api.java.Optional;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.sql.SparkSession;
import scala.Tuple2;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
public class HBaseCompare {
private static Configuration getKerberosLoginConf(String cluster){
Configuration conf = HBaseUtil.getHbaseConfiguration(cluster);
return conf;
private static JavaPairRDD<String, Map<String,String>> getTableDataRDD(Configuration hconf,String tableName, JavaSparkContext sc) throws IOException {
String scanToString = TableMapReduceUtil.convertScanToString(new Scan());
hconf.set(KerberosTableInputFormat.SCAN, scanToString);
JavaPairRDD<ImmutableBytesWritable, Result> dataRDD = sc.newAPIHadoopRDD(hconf,KerberosTableInputFormat.class,ImmutableBytesWritable.class,Result.class);
JavaPairRDD<String, Map<String,String>> dataRowsRDD = dataRDD.mapToPair(new PairFunction<Tuple2<ImmutableBytesWritable, Result>, String, Map<String,String>>() {
public Tuple2<String, Map<String,String>> call(Tuple2<ImmutableBytesWritable, Result> immutableBytesWritableResultTuple2) throws Exception {
Result result = immutableBytesWritableResultTuple2._2;
HashMap<String,String> resultMap = new HashMap<String, String>();
for(Cell cell : result.rawCells()) {
resultMap.put(new String(CellUtil.cloneQualifier(cell)).toLowerCase(), new String(CellUtil.cloneValue(cell)));
return new Tuple2<>(Bytes.toString(result.getRow()),resultMap);
return dataRowsRDD;
public static void main(String[] args) {
String ip = args[0];
String table = args[1];
//SparkSession session = SparkSession.builder().appName("hbase example").master("local").getOrCreate();
SparkSession session = SparkSession.builder().appName("hbase example").getOrCreate();
JavaSparkContext sc = JavaSparkContext.fromSparkContext(session.sparkContext());
Configuration srcConf = getKerberosLoginConf(ip);
JavaPairRDD<String, Map<String,String>> srcRowsRDD = getTableDataRDD(srcConf,table,sc);
} catch (Exception e){
spark-submit --keytab ../conf/kerberos/dw_hbkal.keytab --principal dw_hbkal@CVBG.COM --files ../conf/kerberos/dw_hbkal.keytab,../conf/kerberos/krb5.conf --master yarn --jars ../lib/hbase-client-2.1.0-cdh6.2.1.jar,../lib/hbase-server-2.1.0-cdh6.2.1.jar,../lib/hbase-mapreduce-2.1.0-cdh6.2.1.jar --class com.unionpay.css.bigdata.dataCompare.HBaseCompare data-compare-1.0-SNAPSHOT.jar 172.xxx.xxx.xxx testtable
### 记录坑