最近上面想要通过flink-cdc来实现mysql数据实时同步至Elasticsearch,由于可以通过sql来实现数据同步,难度和投入都相对较小。于是自己研究了下flink,由于flink-cdc 现在最高支持flink1.13的版本,所以本文使用1.13.5的版本演示部署flink集群。
flink大数据框架,官方推荐了三种集群模式:
1.standalone
2.kubernetes
3.YARN
本文通过standalone+zookeeper构建flink HA Service
项目环境:
操作系统均为ubuntu20.1
master | 192.168.1.101 |
worker1 | 192.168.1.102 |
worker2 | 192.168.1.103 |
软件版本:
flink | flink-1.13.5 |
zookeeper | apache-zookeeper-3.5.9-bin.tar.gz |
官方推荐jdk是8或者11
jdk | openjdk-11.0.2_linux-x64_bin.tar.gz |
1.所有的主机安装好jdk
tar xf openjdk-11.0.2_linux-x64_bin.tar.gz -C /usr/local/
cd /usr/local
mv jdk-11.0.2 jdk
echo "export JAVA_HOME=/usr/local/jdk">>/etc/profile
echo 'export PATH=$PATH:$JAVA_HOME/bin'>>/etc/profile
source /etc/profile
java -version
openjdk version "11.0.2" 2019-01-15
OpenJDK Runtime Environment 18.9 (build 11.0.2+9)
OpenJDK 64-Bit Server VM 18.9 (build 11.0.2+9, mixed mode)
2.做免密登录(重要的步骤)
由于大部分人都会做免密,所以不再阐述详细的步骤。
3.安装zookeeper
3.1master主机上安装zookeeper
mkdir -p /data/zookeeper
echo 1 > /data/zookeeper/myid
tar xf apache-zookeeper-3.5.9-bin.tar.gz -C /data/
cd /data/apache-zookeeper-3.5.9-bin/conf
cp zoo_sample.cfg zoo.cfg
sed -i "s/tmp/data/" zoo.cfg
echo "server.1=192.168.1.101:2888:3888" >> zoo.cfg
echo "server.2=192.168.1.102:2888:3888" >> zoo.cfg
echo "server.3=192.168.1.103:2888:3888" >> zoo.cfg
3.2worker1上安装zookeeper
mkdir -p /data/zookeeper
echo 2 > /data/zookeeper/myid
tar xf apache-zookeeper-3.5.9-bin.tar.gz -C /data/
cd /data/apache-zookeeper-3.5.9-bin/conf
cp zoo_sample.cfg zoo.cfg
sed -i "s/tmp/data/" zoo.cfg
echo "server.1=192.168.1.101:2888:3888" >> zoo.cfg
echo "server.2=192.168.1.102:2888:3888" >> zoo.cfg
echo "server.3=192.168.1.103:2888:3888" >> zoo.cfg
3.3worker2上安装zookeeper
mkdir -p /data/zookeeper
echo 3 > /data/zookeeper/myid
tar xf apache-zookeeper-3.5.9-bin.tar.gz -C /data/
cd /data/apache-zookeeper-3.5.9-bin/conf
cp zoo_sample.cfg zoo.cfg
sed -i "s/tmp/data/" zoo.cfg
echo "server.1=192.168.1.101:2888:3888" >> zoo.cfg
echo "server.2=192.168.1.102:2888:3888" >> zoo.cfg
echo "server.3=192.168.1.103:2888:3888" >> zoo.cfg
zookeeper的配置文件如下:
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/data/zookeeper
clientPort=2181
server.1=192.168.1.101:2888:3888
server.2=192.168.1.102:2888:3888
server.3=192.168.1.103:2888:3888
3.4依次启动三个节点上的zookeeper
cd /data/apache-zookeeper-3.5.9-bin
./bin/zkServer.sh --config conf start
4.安装flink(切记三个主机上的flink的安装路径需要一致,推荐全部安装在/opt这个目录,如果是云服务器推荐安装在data目录)
4.1三个节点上面依次执行下面命令。
tar xf flink-1.13.5-bin-scala_2.12.tgz -C /data/
echo "192.168.1.101:8081" > /data/flink-1.13.5/conf/masters
echo -e "192.168.1.102\n192.168.1.103" > /data/flink-1.13.5/conf/workers
4.2 请保证三台服务器的配置文件如下。
jobmanager.rpc.address: 192.168.1.101
jobmanager.rpc.port: 6123
jobmanager.memory.process.size: 8600m
taskmanager.memory.process.size: 8728m
taskmanager.numberOfTaskSlots: 32
parallelism.default: 1
high-availability: zookeeper
high-availability.storageDir: file:///flink_store/recovery
high-availability.zookeeper.quorum: 192.168.1.101:2181,192.168.1.102:2181,192.168.1.103:2181
state.backend: filesystem
state.checkpoints.dir: file:///flink-checkpoints
state.savepoints.dir: file:///flink-savepoints
state.backend.incremental: true
execution.checkpointing.interval: 100
jobmanager.execution.failover-strategy: region
taskmanager.memory.network.min: 64mb
taskmanager.memory.network.max: 1gb
请注意/flink_store/recovery, /flink-checkpoints,/flink-savepoints都是nfs挂载目录。
在master上执行:
cd /data/flink-1.13.5
./bin/start-cluster.sh
为了简化操作,提供一个简易功能不完全的shell脚本
#!/bin/bash
# Install and configure JDK, Flink and ZooKeeper, and mount the shared NFS
# directories, for one node of a standalone+zookeeper Flink HA cluster.
# Usage: <script> <master_ip> <worker1_ip> <worker2_ip> <zookeeper_myid>
cur_dir=$(dirname "$0")
# BUG FIX: the original `cd $cur_dir|pwd` ran pwd in a separate pipeline
# stage, so it always printed the caller's cwd regardless of the cd;
# chain with && so pwd reflects the script's directory.
root_dir=$(cd "$cur_dir" && pwd)
echo "$root_dir"
source /etc/profile
flink_tar=flink-1.13.5-bin-scala_2.12.tgz
flink_dir=flink-1.13.5
zookeeper_tar=apache-zookeeper-3.5.9-bin.tar.gz
zookeeper_dir=apache-zookeeper-3.5.9-bin
nfs_host=            # your NFS server address — must be filled in before use
master=$1            # master node IP
host2=$2             # worker1 IP
host3=$3             # worker2 IP
id=$4                # ZooKeeper myid of this host (1..3)
function jdk_install(){
  # Install OpenJDK 11 into /usr/local/jdk unless a working java is on PATH,
  # and export JAVA_HOME/PATH via /etc/profile.
  # BUG FIX: `java -version` writes its banner to stderr, so the original
  # `> /dev/null` leaked the output; redirect both streams, and test the
  # command directly instead of a bare $? check.
  if ! java -version > /dev/null 2>&1; then
    cd "$root_dir" || return 1
    tar xf openjdk-11.0.2_linux-x64_bin.tar.gz -C /usr/local/
    cd /usr/local || return 1
    mv jdk-11.0.2 jdk
    echo "export JAVA_HOME=/usr/local/jdk">>/etc/profile
    echo 'export PATH=$PATH:$JAVA_HOME/bin'>>/etc/profile
  fi
}
function flink_install(){
  # Unpack Flink, drop in the CDC/Elasticsearch connector jars, and render
  # the cluster config from the flink-conf.yaml template next to this script.
  cd "$root_dir" || return 1
  tar xf "$flink_tar" -C /data/
  cp flink-sql-connector-elasticsearch6_2.11-1.13.5.jar "/data/$flink_dir/lib/"
  cp flink-sql-connector-mysql-cdc-2.1.1.jar "/data/$flink_dir/lib/"
  # Substitute the {master}/{host2}/{host3} placeholders in the template.
  sed -i -e "s/{master}/${master}/g" -e "s/{host2}/${host2}/" -e "s/{host3}/${host3}/" flink-conf.yaml
  cp -f flink-conf.yaml "/data/$flink_dir/conf/"
  # BUG FIX: Flink reads conf/masters and conf/workers (plural); the
  # original wrote conf/master and conf/worker, which Flink never reads.
  echo "${master}:8081" >> "/data/$flink_dir/conf/masters"
  echo "${host2}:8081" >> "/data/$flink_dir/conf/masters"
  echo "${host2}" >> "/data/$flink_dir/conf/workers"
  echo "${host3}" >> "/data/$flink_dir/conf/workers"
}
function zookeeper_install(){
  # Unpack ZooKeeper, write this node's myid, and generate zoo.cfg from the
  # shipped sample with the three-node ensemble appended.
  cd "$root_dir" || return 1
  tar xf "$zookeeper_tar" -C /data/
  cd "/data/$zookeeper_dir/conf" || return 1
  # BUG FIX: plain mkdir fails when the directory already exists (rerun of
  # the script); -p makes this idempotent.
  mkdir -p /data/zookeeper
  echo "$id" > /data/zookeeper/myid
  cp zoo_sample.cfg zoo.cfg
  echo "server.1=${master}:2888:3888" >> zoo.cfg
  echo "server.2=${host2}:2888:3888" >> zoo.cfg
  echo "server.3=${host3}:2888:3888" >> zoo.cfg
  # Rewrite the sample's dataDir=/tmp/zookeeper to /data/zookeeper.
  sed -i "s/tmp/data/" zoo.cfg
}
function mount_nfs(){
  # Mount the shared NFS directories used for HA recovery, checkpoints and
  # savepoints, and persist the mounts via /etc/fstab.
  # Guard: nfs_host defaults to empty, and an empty value would append
  # broken entries to /etc/fstab before mount -a fails.
  if [ -z "$nfs_host" ];then
    echo "nfs_host is not set, skipping NFS mounts" >&2
    return 1
  fi
  apt install -y nfs-common
  # -p: idempotent on rerun.
  mkdir -p /flink_job /flink_store /flink-checkpoints /flink-savepoints
  echo "${nfs_host}:/flink/flink_job /flink_job nfs4 rw 0 0" >> /etc/fstab
  echo "${nfs_host}:/flink/flink-checkpoints /flink-checkpoints nfs4 rw 0 0" >> /etc/fstab
  echo "${nfs_host}:/flink/flink-savepoints /flink-savepoints nfs4 rw 0 0" >> /etc/fstab
  echo "${nfs_host}:/flink/flink_store /flink_store/ nfs4 rw 0 0" >> /etc/fstab
  mount -a
}
# Validate the four required arguments before doing anything destructive.
# BUG FIX: the original exited with status 0 (success) on missing args and
# gave no hint why; quote the tests, print usage, and exit non-zero.
if [ -z "$master" ] || [ -z "$host2" ] || [ -z "$host3" ] || [ -z "$id" ];then
  echo "Usage: $0 <master_ip> <worker1_ip> <worker2_ip> <zookeeper_myid>" >&2
  exit 1
fi
jdk_install
flink_install
zookeeper_install
mount_nfs
了解更多关于flink和flink-cdc:
flink: Docs | Apache Flink
flink-cdc: Welcome to Flink CDC — Flink CDC documentation