文章目录
ZooKeeper
一、ZooKeeper 简介
分布式协调服务
主要⽤用来解决分布式集群中应⽤用系统的一致性问题
-
ZooKeeper 本质上是⼀个分布式的⼩文件存储系统, 基于类似于⽂文件系统的⽬目录树⽅方式的数 据存储,并且可以对树中的节点进⾏行行有效管理理。
-
ZooKeeper 提供给客户端监控存储在zk内部数据的功能, 达到基于数据的集群管理理
统一命名服务(dubbo)、分布式配置管理理(solr的配置集中管理理)、分布式消息队列列 (sub/pub)、分布式锁、分布式协调
1.1 ZooKeeper 架构
Leader
Leader 不是手动指定的,而是 Follower 选举出来的
核心组件,事务请求(写操作) 的唯一处理者
Follower
处理客户端非事务(读操作) 请求
转发事务请求给 Leader
Observer
针对访问量大的 ZooKeeper 集群,可以增加 Observer
观察 ZooKeeper 集群的最新状态变化,并将这些状态同步
对于 非事务请求,可以独立处理
对于 事务请求,会转发给 Leader 服务器
不会参与任何形式的投票,只提供非事务服务
不影响群事务处理能力的前提下,提升集群的非事务处理能力
1.2 ZooKeeper 特点
-
Leader + 多个 Follower 的集群
-
Leader 负责发起投票和决议,更新系统状态
-
Follower 用于接受客户请求,并向 客户端返回结果,参与投票
-
集群中有 半数节点存活,集群就能正常服务
-
因为 全局数据一致,每个 server 保存一份相同的数据副本
-
更新请求顺序执行
-
数据更新原子性,一次数据更新,要么成功,要么失败
二、环境搭建
- 解压
tar -zxvf zookeeper-3.4.14.tar.gz -C ../servers/
- 修改配置文件, 创建 data 和 log 目录
-
添加 myid 配置
-
启动 zk
-
查看启动情况
-
集群启动停止脚本
三、数据结构 和 监听机制
ZooKeeper 数据模型 Znode
数据信息保存在若干数据节点 Znode 上,是 ZooKeeper 中最小的数据单位
Znode 组成 Znode 树的命名空间
3.1 Znode 类型
- persistent 持久性节点
- ephemeral 临时性节点
- sequential 顺序性节点
持久性节点,直到删除操作才会被清除
临时节点,不能有子节点,其生命周期和客户端会话绑定在一起,会话结束后被清除
持久顺序节点,持久的节点,其节点名会有一个表示顺序的数字后缀
临时顺序节点,临时的节点,其节点名会有一个表示顺序的数字后缀
- 事务 ID
事务 - 对物理和抽象的应用状态上的操作集合
通常指的是数据库事务,一般包含了⼀系列对数据库有序的读写操作
但是,事务是指能够改变 ZooKeeper 服务器器状态的操作
或更新操作,⼀般包括数据节点创建与删除、数据节点内容更新等操作
zk 中的事务指的是对zk服务器状态改变的操作(create, update data,更新字节点); zk 对这些事务操作都会编号,这个编号是⾃增长的被称为ZXID
3.2 Znode 状态信息
#使⽤用bin/zkCli.sh 连接到zk集群
[zk: localhost:2181(CONNECTED) 2] get /zookeeper
cZxid = 0x0
ctime = Wed Dec 31 19:00:00 EST 1969 mZxid = 0x0
mtime = Wed Dec 31 19:00:00 EST 1969 pZxid = 0x0
cversion = -1
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 0
numChildren = 1
ZNode 节点内容包括两部分: 节点数据内容 和 节点状态信息。此处数据内容是空
cZxid 就是 Create ZXID,表示节点被创建时的事务 ID。
ctime 就是 Create Time,表示节点创建时间。
mZxid 就是 Modified ZXID,表示节点最后⼀次被修改时的事务ID。
mtime 就是 Modified Time,表示节点最后⼀次被修改的时间。
pZxid 表示该节点的子节点列表最后⼀次被修改时的事务 ID。只有子节点列表变更才会更新 pZxid,⼦节点内容变更不会更新。
cversion 表示子节点的版本号。
dataVersion 表示内容版本号。
aclVersion 标识 acl 版本
ephemeralOwner 表示创建该临时节点时的会话 sessionID,如果是持久性节点那么值为 0
dataLength 表示数据⻓长度。
numChildren 表示直系⼦子节点数。
3.3 Watcher 机制
实现 分布式数据的发布/订阅 功能
一对多的订阅关系,让多个订阅者同时监听某个主题对象,当主题对象状态变化时,会通知所有订阅者,并作出相应的处理
- 客户端在向 Zookeeper 服务器注册的同时,会将 Watcher 对象存储在客户端的 WatcherManager 当中
- 当 Zookeeper 服务器触发 Watcher 事件后,会向客户端发送通知
- 客户端线程从 WatcherManager 中取出对应的 Watcher 对象来执行回调逻辑
四、基础应用
4.1 命令行操作
客户端命令行
bin/zkCli.sh 连接本地的 zk 服务器
bin/zkCli.sh -server ip:port(2181) 连接指定的服务器
查看可用命令
[zk: localhost:2181(CONNECTED) 3] help
ZooKeeper -server host:port cmd args
stat path [watch]
set path data [version]
ls path [watch]
delquota [-n|-b] path
ls2 path [watch]
setAcl path acl
setquota -n|-b val path
history
redo cmdno
printwatches on|off
delete path [version]
sync path
listquota path
rmr path
get path [watch]
create [-s] [-e] path data acl
addauth scheme auth
quit
getAcl path
close
connect host:port
创建节点
create [-s] [-e] path data acl
-s -e 分别指定节点特性(顺序 或 临时), 若不指定,则为 持久节点
- 创建顺序节点
create -s /zk-test 123
-s 顺序节点
/zk-test 节点名
123 节点内容
- 创建临时节点
create -e /zk-temp 123
- 创建持久节点
create /zk-permanent 123
读取节点
ls 查看节点
ls path
get 获取节点的数据内容和属性信息
get path
更新节点
set path data
如:
set /zk-permanent 456
删除节点
delete path
delete /zk-permanent
若删除节点存在子节点,则无法删除,需要先删除子节点
4.2 客户端操作
依赖:
<!-- https://mvnrepository.com/artifact/org.apache.zookeeper/zookeeper -->
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.14</version>
<type>pom</type>
</dependency>
<!-- https://mvnrepository.com/artifact/com.101tec/zkclient -->
<dependency>
<groupId>com.101tec</groupId>
<artifactId>zkclient</artifactId>
<version>0.2</version>
</dependency>
创建会话
package com.lagou.zk.demo;
import org.I0Itec.zkclient.ZkClient;
public class ZkDemo {
public static void main(String[] args) {
// 1. 获取 zkClient 对象
final ZkClient zkClient = new ZkClient("linux121:2181");
System.out.println("zkclient is ready");
}
}
创建节点
// 2. 创建节点
// 持久节点, 注意递归创建节点,需要 设置 true
zkClient.createPersistent("/lagou-client/lagou-c1", true);
System.out.println("node is created");
删除节点
// 3. 删除节点
// 递归删除
zkClient.deleteRecursive("/lagou-client");
System.out.println("node is deleted");
监听节点变化
注意 监听器 可用对 不存在的节点 进行监听
监听⽬录下子节点发生改变,可以接收到通知,携带数据有⼦节点列表
3 监听⽬录创建和删除本身也会被监听到
package com.lagou.zk.demo;
import org.I0Itec.zkclient.IZkChildListener;
import org.I0Itec.zkclient.ZkClient;
import java.util.List;
public class Get_ChildNode_Change {
public static void main(String[] args) throws InterruptedException {
// 1. 获取 zkClient
final ZkClient zkClient = new ZkClient("linux121:2181");
// 2. 对指定节点进行监听,指定收到通知后的处理逻辑
// 处理逻辑 通过重写 handleChildChange 方法实现
zkClient.subscribeChildChanges("/lg-client", new IZkChildListener() {
@Override
public void handleChildChange(String path, List<String> childs) throws Exception {
// 打印节点信息
System.out.println(path + " child node changes - current children " + childs);
}
});
// 3. 删除节点,验证监听是否有效
zkClient.createPersistent("/lg-client");
Thread.sleep(1000);
zkClient.createPersistent("/lg-client/c1");
Thread.sleep(1000);
zkClient.delete("/lg-client/c1");
Thread.sleep(1000);
zkClient.delete("/lg-client");
Thread.sleep(Integer.MAX_VALUE);
}
}
监听、获取数据
监听数据变化的监听器,需要监听节点是否被删除,以及监听数据是否改变
zkClient.subscribeDataChanges("/lg-client", new IZkDataListener() {
@Override
public void handleDataChange(String path, Object data) throws Exception {
// 定义接收通知之后的处理逻辑
System.out.println(path + " data is changed, new data: " + data);
}
// 数据删除时
@Override
public void handleDataDeleted(String path) throws Exception {
System.out.println(path + " path is deleted");
}
});
五、原理
5.1 Leader 选举
- 半数机制,集群中半数以上机器存活,集群可用
- Zookeeper 工作时,有一个节点为 Leader, 其他为 Follower
选举机制:
假设 5 个节点
-
服务器 1 启动,发出去的保文没有任何响应,所以其选举状态一直是 Looking 状态
-
服务器 2 启动,与 1 通信,互相交换自己的选举结果,但是由于两者都没有历史数据,所以 id 值较大的 2 胜出,但是由于没有达到超过半数以上的服务器的投票,所以其不能当选。此时 1 2 均保持 looking 状态
-
服务器 3 启动,根据 id 值较大的胜出,且超过半数投票,此时 3 为 Leader, 其余节点为 Follower
-
服务器 4 启动,id 值大小胜出,但是不超过半数投票,此时 4 为 Folloer
-
服务器 5 同 4, Folloer
5.2 ZAB 一致性协议
分布式数据会有一致性问题
- 数据复制到分布式部署的多台机器中
- 负载均衡技术,让分布在不同机器中的数据副本全都可以对外提供服务
数据的多副本,以及多副本同时读写,会导致一致性问题
ZAB(ZooKeeper Atomic Broadcast, 原子消息广播协议)
主备模式,保证一致性
所有的数据都由主进程处理,然后复制给副本进程
ZAB 会将服务器数据的状态变更以事务 Proposal 的形式广播到所有的副本进程上,ZAB协议能够保证了事务操作的一个全局的变更更序号(ZXID)
广播消息
ZAB 协议广播消息过程,类似于 二阶段提交过程
-
对于 client 发送的 写请求,全部由 Leader 接收, Leader 将请求封装为 事务 Proposal,分发给所有 Follower
Leader 会给这个事务分配一个全局递增的唯一 ID, 即 事务 ID (ZXID)
ZAB 协议要求保证事务有序 -
Follower 反馈 ACK
-
如果超过半数 Follower 反馈 ACK,则执行 Commit 操作
先提交自己,再发送 Commit 给所有 Follower
不能正常反馈 Follower 恢复正常后会进入数据同步阶段最终与 Leader 保持一致
Leader 宕机奔溃
Leader 宕机后,选举新 Leader
ZAB 协议的选举算法,能够确保 Leader 提交的事务被集体接收,并且丢弃还没有提交的事务
因此,选举机制保证选举出的新 Leader 拥有集群中所有节点最大事务编号(ZXID) 的事务
六、应用实践
6.1 服务器状态监听
应用:服务器动态上下线监听
分布式系统中,主节点会有多台
主节点因为任何原因出现宕机或者下线,客户端都要能实时感知到主节点服务器的状态
案例:
-
client 向服务器发送请求,获取时间
-
服务器为 client 请求创建 zk 节点
-
在节点内启动 main 函数,接收 client 请求后,返回时间
6.2 分布式锁
锁
对变量或者堆代码块做同步,即为 锁
目的是实现多个线程在一个时刻内,同一个代码块只能有一个线程可执行
单机程序中,多个线程可以同时改变某个变量时,为了保证线程安全,需要对变量 或 代码做同步,使其在改变变量时能够串行执行消除并发修改变量
分布式锁
对于分布式程序,多台机器上的多个线程,有了线程锁,但是由于操作的是同一个数据库, 分布式程序运行在不同机器的 JVM 里,线程锁无法作用与分布式系统
此时,需要分布式锁
在整个系统提供一个全局、唯一的锁
在分布式系统中的每个机器需要获取到该锁,才能执行后续的逻辑操作
zk 实现分布式锁
- 利用 zk 创建临时顺序节点,实现分布式锁
思路:
- 分布式锁,就是指定目录下序号最小的临时顺序节点
- 多个系统的多个线程都要在此目录下创建临时顺序节点
- 每个线程都先创建临时顺序节点,然后获取当前目录下最小序号的节点,判断最小节点是不是当前节点。如果是,则获取到锁,不是,则获取锁失败
- 获取锁失败的线程,获取当前节点上一个临时顺序节点,并对此节点进行监听,当该节点删除的时候,此线程会收到通知,代表获取到锁
七、Hadoop HA
7.1 High Available
高可用,消除单点故障
- HDFS 的 HA
- YARN 的 HA
搭建 两个 NN 的集群,消除单点故障 (Active NN / Standby NN)
7.2 HDFS - HA
-
元数据
两个 NN 各自保存一份元数据
Edits 日志只有 Active NN 可以写操作
两个 NN 都可以读取 Edits
-
需要一个 状态管理功能模块
实现一个 zkfailover 常驻在 两个 NN 节点上, 负责监控各自 NN 节点
利用 zk 进行状态标识,由 zkfailover 负责切换状态,切换过程需要防止 brain split 现象发生
集群中同时出现连个 Active NN
-
两个 NN 之间 ssh 免密登录
-
两个 NN 隔离,同一时刻只有一个 NN 对外提供服务
工作机制
HDFS - HA 的自动故障转移的实现是通过为 HDFS 部署两个新组件
- zookeeper, 维护少量协调数据,通知客户端数据的改变,监控客户端故障
- ZkFailoverController
自动故障转移
ZK 部分:
-
故障检测
-
现役 NN 选择
ZKC 部分:
是 ZooKeeper 的客户端
-
健康监测
-
ZK 会话管理
-
基于 ZK 选择
7.3 HDFS - HA 集群配置
集群规划
linux121 | linux122 | linux123 |
---|---|---|
NN | NN | |
JournalNode | JournalNode | JournalNode |
DN | DN | DN |
ZK | ZK | ZK |
ResourceManager | ||
NM | NM | NM |
启动 zk 集群
停止原先 HDFS 集群
stop-dfs.sh
创建 HA 目录,拷贝愿 hadoop 目录到 HA 目录,删除里面的 data 目录
mkdir /opt/lagou/servers/ha
cp -r /opt/lagou/servers/hadoop-2.9.2 /opt/lagou/servers/ha
rm -rf /opt/lagou/servers/ha/hadoop-2.9.2/data
配置 hdfs-site.xml
<property>
<name>dfs.nameservices</name>
<value>lagoucluster</value>
</property>
<property>
<name>dfs.ha.namenodes.lagoucluster</name>
<value>nn1,nn2</value>
</property>
<property>
<name>dfs.namenode.rpc-address.lagoucluster.nn1</name> <value>linux121:9000</value>
</property>
<property>
<name>dfs.namenode.rpc-address.lagoucluster.nn2</name>
<value>linux122:9000</value>
</property>
<property>
<name>dfs.namenode.http-address.lagoucluster.nn1</name> <value>linux121:50070</value>
</property>
<property>
<name>dfs.namenode.http-address.lagoucluster.nn2</name>
<value>linux122:50070</value>
</property>
<property>
<!-- edits 贡献目录 -->
<name>dfs.namenode.shared.edits.dir</name>
<value>qjournal://linux121:8485;linux122:8485;linux123:8485/lagou</value>
</property>
<property>
<name>dfs.client.failover.proxy.provider.lagoucluster</name>
<value>org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyPr ovider</value>
</property>
<property>
<name>dfs.ha.fencing.methods</name>
<value>sshfence</value>
</property>
<property>
<name>dfs.ha.fencing.ssh.private-key-files</name>
<value>/root/.ssh/id_rsa</value>
</property>
<property>
<name>dfs.journalnode.edits.dir</name>
<value>/opt/journalnode</value>
</property>
<property>
<name>dfs.ha.automatic-failover.enabled</name>
<value>true</value>
</property>
配置 core-site.xml
<property>
<name>fs.defaultFS</name>
<value>hdfs://lagoucluster</value>
</property>
<property>
<name>hadoop.tmp.dir</name>
<value>/opt/lagou/servers/ha/hadoop-2.9.2/data/tmp</value>
</property>
<property>
<name>ha.zookeeper.quorum</name>
<value>linux121:2181,linux122:2181,linux123:2181</value>
</property>
拷贝配置好的 hadoop ha 环境 到其他节点
启动 HDFS - HA 集群
各个 JournalNode 节点上,启动 JN 服务
/opt/lagou/servers/ha/hadoop-2.9.2/sbin/hadoop-daemon.sh start journalnode
格式化 NN 1, 并启动
/opt/lagou/servers/ha/hadoop-2.9.2/bin/hdfs namenode -format
/opt/lagou/servers/ha/hadoop-2.9.2/sbin/hadoop-daemon.sh start namenode
NN 2 同步 NN 1 的元数据信息
/opt/lagou/servers/ha/hadoop-2.9.2/bin/hdfs namenode -bootstrapStandby
NN 1 上初始化 ZKFC
/opt/lagou/servers/ha/hadoop-2.9.2/bin/hdfs zkfc -formatZK
NN 1 上启动集群
/opt/lagou/servers/ha/hadoop-2.9.2/sbin/start-dfs.sh
验证
kill Active NN 进程
7.4 Yarn - HA 配置
Yarn - HA 不需要像 HDFS 那样同步元数据
集群规划
linux121 | linux122 | linux123 |
---|---|---|
NN | NN | |
JournalNode | JournalNode | JournalNode |
DN | DN | DN |
ZK | ZK | ZK |
ResourceManager | ResourceManager | |
NM | NM | NM |
配置 yarn-site.xml
<configuration>
<property>
<name>yarn.nodemanager.aux-services</name>
<value>mapreduce_shuffle</value>
</property>
<!--启⽤用resourcemanager ha-->
<property>
<name>yarn.resourcemanager.ha.enabled</name>
<value>true</value>
</property>
<!--声明两台resourcemanager的地址-->
<property>
<name>yarn.resourcemanager.cluster-id</name>
<value>cluster-yarn</value>
</property>
<property>
<name>yarn.resourcemanager.ha.rm-ids</name>
<value>rm1,rm2</value>
</property>
<property>
<name>yarn.resourcemanager.hostname.rm1</name>
<value>linux122</value>
</property>
<property>
<name>yarn.resourcemanager.hostname.rm2</name>
<value>linux123</value>
</property>
<!--指定zookeeper集群的地址-->
<property>
<name>yarn.resourcemanager.zk-address</name>
<value>linux121:2181,linux122:2181,linux123:2181</value>
</property>
<!--启⽤用⾃自动恢复-->
<property>
<name>yarn.resourcemanager.recovery.enabled</name>
<value>true</value>
</property>
<!--指定resourcemanager的状态信息存储在zookeeper集群-->
<property>
<name>yarn.resourcemanager.store.class</name>
<value>org.apache.hadoop.yarn.server.resourcemanager.recovery.ZKRMStateStore</ value>
</property>
</configuration>
同步到其他节点
启动 HDFS
sbin/start-yarn.sh
HBase
一个分布式海量列式非关系型
数据库系统
提供超大规模数据集的实时随机读写
列式存储的优点:减少空值,减少存储空间占用
特点:
- 海量存储,底层是 HDFS
- 列式存储, 基于列族存储
- 易扩展
- 高并发
- 数据可以有多个版本值,根据版本号区分(版本号是插入数据的时间戳)
- 数据类型单一
应用:
交通、金融、电商、电信
适合海量明细数据的存储,并且需要很好的查询性能
一、HBase shell 操作
模糊查询用 FILTER 参数 => “函数名(参数)”, 多列族用 COLUMNS 参数
- 进入 shell
hbase shell
- 帮助命令
help
- 查看数据库中国呢有哪些表
list
- 创建 lagou 表,包含 base_info, extra-info 两个列族
create 'lagou', 'base_info', 'extra_info'
-- 或者指定 列族信息
create 'lagou', {NAME => 'base_info', VERSIONS => '3'}, {NAME => 'extra_info', VERSIONS => '3'}
VERSIONS 表示此单元格内的数据可以保留最近的 3 个版本
- 添加数据
-- row key 为 rk1, 列族 base_info 中添加 name 列标识符,值为 wang
put 'lagou', 'rk1', 'base_info:name', 'wang'
-- row key 为 rk1, 列族 base_info 中添加 age 列标识符,值为 30
put 'lagou', 'rk1', 'base_info:age', 30
-- row key 为 rk1, 列族 extra_info 中添加 address 列标识符,值为 shanghai
put 'lagou', 'rk1', 'extra_info:address', 'shanghai'
- 查询数据
-- 查询表中 row key 为 rk1 的所有信息
get 'lagou', 'rk1'
-- 查看 row key 下面的某个列族的信息
get 'lagou', 'rk1', 'base_info'
-- 查看 row key 指定列族中字段的值
get 'lagou', 'rk1', 'base_info:name', 'extra_info:address'
-- 或者
get 'lagou', 'rk1', {COLUMN => ['base_info:name', 'extra_info:address']}
-- 指定 row key 与 列族中列值,进行查询 binary 二分查找
get 'lagou', 'rk1', {FILTER => "ValueFilter(=, 'binary:wang')"}
-- 对 列名 模糊查询 substring
get 'lagou', 'rk1', {FILTER => "QualifierFilter(=, 'substring:a')"}
-- 查询表中所有信息
scan 'lagou'
-- 列族查询
scan 'lagou', {COLUMNS => 'base_info'}
-- scan 也可以指定 VERSIONS 参数, RAW 参数等
scan 'lagou', {COLUMNS => 'base_info', RAW => true, VERSIONS => 3}
-- 查询多个列族里,对列名模糊查询
scan 'lagou', {COLUMNS => ['base_info', 'extra_info'], FILTER => "QualifierFilter(=, 'substring:a')"}
-- 查询 rowkey
-- 顺序查询
scan 'lagou', {COLUMNS => 'base_info', STARTROW => 'rk1', ENDROW => 'rk3'}
-- 模糊查询
scan 'lagou', {FILTER => "PrefixFilter('rk')"}
-- 更新数据
-- 同插入数据
-- 更新数据值
put 'lagou', 'rk1', 'base_info:name', 'liang'
-- 删除数据和表
-- 指定 rowkey 以及 列名,删除
delete 'lagou', 'rk1', 'base_info:name'
-- 指定 rowkey 以及 列名 和 时间戳信息 删除
delete 'lagou', 'rk1', 'base_info:name', 1634301287482
-- 删除列族
alter 'lagou', 'delete' => 'base_info'
-- 清空表数据
truncate 'lagou'
-- 删除表
-- 先 disable, 再 drop
disable 'lagou'
drop 'lagou'
二、HBase 原理
2.1 HBase 读数据
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-9FWrsTxy-1634397879870)(HBase-readdata.png)]
HBase 读操作
-
首先从 zk 找到 meta 表的region位置,然后读取 meta 表中的数据,meta 表中存储了用户表的 region 信息
-
根据要查询的 namespace、表名和 rowkey 信息。找到写入数据对应的 region 信息
-
找到这个 region 对应的 regionServer,然后发送请求
-
查找对应的 region
-
先从 memstore 查找数据,如果没有,再从 BlockCache 上读取
HBase 上 Regionserver 的内存分为两个部分
-
一部分作为 Memstore 主要用来写
-
另一部分作为 BlockCache 主要用于读数据
-
-
如果 BlockCache 中也没有找到,再到 StoreFile 上进行读取
从 storeFile 中读取到数据之后,不是直接把结果数据返回给客户端,而是把数据先写入到 BlockCache 中,目的是为了加快后续的查询; 然后在返回结果给客户端。
2.2 HBase 写数据
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-OwzSbC0w-1634397879873)(HBase-writedata.png)]
- 首先从 zk 找到 meta 表的 region 位置,然后读取 meta 表中的数据,meta 表中存储了用户表的 region 信息
- 根据 namespace、表名 和 rowkey 信息。找到写入数据对应的 region 信息
- 找到这个 region 对应的 regionServer,然后发送请求
- 把数据分别写到 HLog(write ahead log) 和 memstore 各一份
- memstore 达到阈值后把数据刷到磁盘,生成 storeFile 文件
- 删除 HLog 中的历史数据
三、HBase API
package com.lagou.hbase.client;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
public class HbaseClientDemo {
Configuration conf = null;
Connection conn = null;
@Before
public void init() throws IOException {
// 获取一个配置文件对象
conf = HBaseConfiguration.create();
conf.set("hbase.zookeeper.quorum", "linux121,linux122,linux123");
conf.set("hbase.zookeeper.property.clientPort", "2181");
// 通过 conf 获取到 hbase 的集群链接
conn = ConnectionFactory.createConnection(conf);
}
// 创建一张 hbase 表
@Test
public void creatTable() throws IOException {
// 获取 HbaseAdmin 对象
HBaseAdmin admin = (HBaseAdmin) conn.getAdmin();
// 表描述器
HTableDescriptor teacher = new HTableDescriptor(TableName.valueOf("teacher"));
// 设置列族描述器
teacher.addFamily(new HColumnDescriptor("info"));
// 执行创建操作
admin.createTable(teacher);
System.out.println("table teacher is created ...");
}
// 插入数据
@Test
public void putData() throws IOException {
// 获取 table 对象
Table t1 = conn.getTable(TableName.valueOf("teacher"));
// 创建 put 对象,设定 rowkey
Put put = new Put(Bytes.toBytes("110"));
// 列族,列,值
put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("addr"), Bytes.toBytes("Beijing"));
// 插入数据到 表 t1
t1.put(put);
// 插入多条时,用 list<puts> 遍历批量插入
// 关闭对象
t1.close();
System.out.println("data is inserted to table ...");
}
// 删除数据
@Test
public void deleteData() throws IOException {
// 获取 table 对象
Table t1 = conn.getTable(TableName.valueOf("teacher"));
// 创建 delete 对象, 需要指定 rowkey
Delete delete = new Delete(Bytes.toBytes("110"));
t1.delete(delete);
// 关闭 table 对象
t1.close();
System.out.println("data is deleted ...");
}
// 查询某个列族数据
@Test
public void getDataByColumnFamily() throws IOException {
// 获取 table 对象
Table t1 = conn.getTable(TableName.valueOf("teacher"));
// 创建 get 对象,指定 rowkey
Get get = new Get(Bytes.toBytes("110"));
// 指定 列族信息
get.addFamily(Bytes.toBytes("info"));
// 查询
Result result = t1.get(get);
// 获取 result 中的所有 cell 对象
Cell[] cells = result.rawCells();
for (Cell cell : cells) {
String cf = Bytes.toString(CellUtil.cloneFamily(cell));
String column = Bytes.toString(CellUtil.cloneQualifier(cell));
String value = Bytes.toString(CellUtil.cloneValue(cell));
String rowkey = Bytes.toString(CellUtil.cloneRow(cell));
System.out.println(rowkey + "\t" + cf + "\t" + column + "\t" + value);
}
t1.close();
}
@Test
public void scanAllData() throws IOException {
// 获取表
Table t1 = conn.getTable(TableName.valueOf("teacher"));
// 创建 scan 对象
Scan scan = new Scan();
//
ResultScanner resultScanner = t1.getScanner(scan);
for (Result result : resultScanner) {
Cell[] cells = result.rawCells();
for (Cell cell : cells) {
String cf = Bytes.toString(CellUtil.cloneFamily(cell));
String column = Bytes.toString(CellUtil.cloneQualifier(cell));
String value = Bytes.toString(CellUtil.cloneValue(cell));
String rowkey = Bytes.toString(CellUtil.cloneRow(cell));
System.out.println(rowkey + "\t" + cf + "\t" + column + "\t" + value);
}
}
t1.close();
}
// 通过 startRowKey 和 endRowKey 进行扫描
@Test
public void scanRowKey() throws IOException {
// 获取表
Table t1 = conn.getTable(TableName.valueOf("teacher"));
// scan 对象
Scan scan = new Scan();
scan.setStartRow("001".getBytes());
scan.setStopRow("2".getBytes());
ResultScanner resultScanner = t1.getScanner(scan);
for (Result result : resultScanner) {
Cell[] cells = result.rawCells();
for (Cell cell : cells) {
String cf = Bytes.toString(CellUtil.cloneFamily(cell));
String column = Bytes.toString(CellUtil.cloneQualifier(cell));
String value = Bytes.toString(CellUtil.cloneValue(cell));
String rowkey = Bytes.toString(CellUtil.cloneRow(cell));
System.out.println(rowkey + "\t" + cf + "\t" + column + "\t" + value);
}
}
t1.close();
}
@After
public void release() {
if (conn != null) {
try {
conn.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
Azkaban
1. 工作流调度系统
⼀个完整的数据分析系统通常都是由⼤量任务单元组成
- shell 脚本程序
- java 程序
- mapreduce 程序
- hive 脚本等
各任务单元有前后依赖关系,组成工作流
如:某个业务系统每天产生 20 G 原始数据,每天都要进行处理,逻辑如下:
- 通过 hadoop 将原始数据同步到 HDFS
- MR 计算框架,对原始数据进行转换,生成的数据以分区表的形式存储到多张 hive 表中
- 对 hive 中多张表进行 join,得到一个明细数据大表
- 对明细数据进行各种统计分析,得到结果报表信息
- 将统计分析结果同步到业务系统中,供业务调用
对于这类复杂的工作流,crontab 定时任务无法满足需求,就需要一个调度系统来支持
2. Azkaban 使用
- 创建 job 描述文件
vi command.job
# 内容
type=command
command=echo 'hello'
- 打包 job 资源文件
zip command.job
-
在 azkaban 的 web 界面创建 project 并伤处 job 压缩包
-
执行 job
3. job 的类型
- 前后工作流依赖的 job
# 第二个 job 依赖 第一个 job
# 第一个 job, foo.job
type=command
command=echo 'foo'
# 第二个 job, bar.job
type=command
dependencies=foo
command=echo 'bar'
打包时,将这两个 job 文件打包到一起
- HDFS 任务调度
# 将 hdfs 命令写入 job 文件, fs.job
type=command
command=/opt/lagou/servers/hadoop-2.9.2/bin/hdfs dfs -mkdir /azkaban
- MR 任务调度
# 需要 mr 程序的 jar 包
# mrwc.job
type=command
command=/opt/lagou/servers/hadoop-2.9.2/bin/hadoop jar hadoop-mapreduce-examples-2.9.2.jar wordcount /wordcount/input /wordcount/azout
- Hive 脚本任务调度
# hive 脚本 test.hql
use default;
drop table aztest;
create table aztest(id int, name string) row format delimited fields terminated by ',';
# hivef.job
type=command
command=/opt/lagou/servers/hive-2.3.7/bin/hive -f 'test.hql'
- 定时任务调度
在 Azkaban 的 web 界面配置 project 的定时调度信息