Flink standalone deployment

Recently we needed to use flink-cdc to replicate MySQL data to Elasticsearch in real time. Because the synchronization can be expressed in SQL, the difficulty and effort involved are relatively small, so I spent some time studying Flink. Since flink-cdc currently supports at most Flink 1.13, this article uses version 1.13.5 to demonstrate deploying a Flink cluster.

Flink is a big-data framework, and its documentation officially recommends three cluster deployment modes:

1.standalone

2.kubernetes

3.YARN

This article builds a Flink HA service using standalone mode plus ZooKeeper.

Project environment:

Operating system on all hosts: Ubuntu 20.1

master 192.168.1.101
worker1 192.168.1.102
worker2 192.168.1.103

Software versions:

flink flink-1.13.5
zookeeper apache-zookeeper-3.5.9-bin.tar.gz

The officially recommended JDK is 8 or 11.

jdk openjdk-11.0.2_linux-x64_bin.tar.gz

1. Install the JDK on all hosts

tar xf openjdk-11.0.2_linux-x64_bin.tar.gz -C /usr/local/
cd /usr/local
mv jdk-11.0.2 jdk
echo "export JAVA_HOME=/usr/local/jdk">>/etc/profile
echo 'export PATH=$PATH:$JAVA_HOME/bin'>>/etc/profile
source /etc/profile	
java -version
openjdk version "11.0.2" 2019-01-15
OpenJDK Runtime Environment 18.9 (build 11.0.2+9)
OpenJDK 64-Bit Server VM 18.9 (build 11.0.2+9, mixed mode)

2. Set up passwordless SSH (an important step)

Since most readers have set up passwordless SSH before, the detailed steps are not repeated here.
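For completeness, a minimal sketch of the setup, run on master (it assumes the root account is used on all three hosts):

ssh-keygen -t rsa -N "" -f ~/.ssh/id_rsa      # generate a key pair (skip if one already exists)
for host in 192.168.1.101 192.168.1.102 192.168.1.103; do
    ssh-copy-id root@$host                    # push the public key to every node, master included
done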

3. Install ZooKeeper

3.1 Install ZooKeeper on master

mkdir -p /data/zookeeper
echo 1 > /data/zookeeper/myid
tar xf apache-zookeeper-3.5.9-bin.tar.gz -C /data/
cd /data/apache-zookeeper-3.5.9-bin/conf
cp zoo_sample.cfg zoo.cfg
sed -i "s/tmp/data/" zoo.cfg
echo "server.1=192.168.1.101:2888:3888" >> zoo.cfg
echo "server.2=192.168.1.102:2888:3888" >> zoo.cfg
echo "server.3=192.168.1.103:2888:3888" >> zoo.cfg

3.2 Install ZooKeeper on worker1

mkdir -p /data/zookeeper
echo 2 > /data/zookeeper/myid
tar xf apache-zookeeper-3.5.9-bin.tar.gz -C /data/
cd /data/apache-zookeeper-3.5.9-bin/conf
cp zoo_sample.cfg zoo.cfg
sed -i "s/tmp/data/" zoo.cfg
echo "server.1=192.168.1.101:2888:3888" >> zoo.cfg
echo "server.2=192.168.1.102:2888:3888" >> zoo.cfg
echo "server.3=192.168.1.103:2888:3888" >> zoo.cfg

3.3 Install ZooKeeper on worker2

mkdir -p /data/zookeeper
echo 3 > /data/zookeeper/myid
tar xf apache-zookeeper-3.5.9-bin.tar.gz -C /data/
cd /data/apache-zookeeper-3.5.9-bin/conf
cp zoo_sample.cfg zoo.cfg
sed -i "s/tmp/data/" zoo.cfg
echo "server.1=192.168.1.101:2888:3888" >> zoo.cfg
echo "server.2=192.168.1.102:2888:3888" >> zoo.cfg
echo "server.3=192.168.1.103:2888:3888" >> zoo.cfg

The resulting zoo.cfg configuration file looks like this:

tickTime=2000
initLimit=10
syncLimit=5
dataDir=/data/zookeeper
clientPort=2181
server.1=192.168.1.101:2888:3888
server.2=192.168.1.102:2888:3888
server.3=192.168.1.103:2888:3888

 

3.4 Start ZooKeeper on each of the three nodes in turn

cd /data/apache-zookeeper-3.5.9-bin
./bin/zkServer.sh --config conf start
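After all three nodes have been started, you can check the quorum state; one node should report "leader" and the other two "follower":

./bin/zkServer.sh --config conf status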

4. Install Flink (make sure the Flink installation path is identical on all three hosts; /opt is recommended in general, or a /data directory on cloud servers, which is what the commands below use)

4.1 Run the following commands on each of the three nodes.

tar xf flink-1.13.5-bin-scala_2.12.tgz -C /data/
echo "192.168.1.101:8081" > /data/flink-1.13.5/conf/master
echo -e "192.168.1.102\n192.168.1.103" > /data/flink-1.13.5/conf/worker
 

4.2 Make sure flink-conf.yaml on all three servers has the following contents.

jobmanager.rpc.address: 192.168.1.101
jobmanager.rpc.port: 6123
jobmanager.memory.process.size: 8600m
taskmanager.memory.process.size: 8728m
taskmanager.numberOfTaskSlots: 32
parallelism.default: 1
high-availability: zookeeper
high-availability.storageDir: file:///flink_store/recovery
high-availability.zookeeper.quorum: 192.168.1.101:2181,192.168.1.102:2181,192.168.1.103:2181
state.backend: filesystem
state.checkpoints.dir: file:///flink-checkpoints
state.savepoints.dir: file:///flink-savepoints
state.backend.incremental: true
execution.checkpointing.interval: 100
jobmanager.execution.failover-strategy: region
taskmanager.memory.network.min: 64mb
taskmanager.memory.network.max: 1gb

Note that /flink_store/recovery, /flink-checkpoints, and /flink-savepoints are all NFS-mounted directories shared by the hosts.
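If those mounts are not in place yet, here is a minimal manual sketch for each host (the NFS server address 192.168.1.200 and the export path /flink are only assumptions; see also the mount_nfs function in the script below):

apt install -y nfs-common
mkdir -p /flink_store /flink-checkpoints /flink-savepoints
mount -t nfs4 192.168.1.200:/flink/flink_store       /flink_store
mount -t nfs4 192.168.1.200:/flink/flink-checkpoints /flink-checkpoints
mount -t nfs4 192.168.1.200:/flink/flink-savepoints  /flink-savepoints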

Run on master:

cd /data/flink-1.13.5
./bin/start-cluster.sh
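Once start-cluster.sh returns, a quick sanity check:

jps | grep StandaloneSessionClusterEntrypoint   # on master: the JobManager process
jps | grep TaskManagerRunner                    # on worker1/worker2: the TaskManager process
# the web UI should be reachable at http://192.168.1.101:8081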

To simplify the process, here is a simple (and not fully featured) shell script:

#!/bin/bash

cur_dir=`dirname $0`
root_dir=`cd $cur_dir && pwd`
echo $root_dir
source /etc/profile
flink_tar=flink-1.13.5-bin-scala_2.12.tgz
flink_dir=flink-1.13.5
zookeeper_tar=apache-zookeeper-3.5.9-bin.tar.gz
zookeeper_dir=apache-zookeeper-3.5.9-bin
nfs_host=    # your NFS server (fill in before running)
master=$1
host2=$2
host3=$3
id=$4


function jdk_install(){
   java -version > /dev/null 2>&1
   if [ $? -ne 0 ];then
	cd $root_dir
        tar xf openjdk-11.0.2_linux-x64_bin.tar.gz -C /usr/local/
        cd /usr/local
        mv jdk-11.0.2 jdk
        echo "export JAVA_HOME=/usr/local/jdk">>/etc/profile
        echo 'export PATH=$PATH:$JAVA_HOME/bin'>>/etc/profile	
   fi
}

function flink_install(){
   cd $root_dir
   tar xf $flink_tar -C /data/
   cp flink-sql-connector-elasticsearch6_2.11-1.13.5.jar /data/$flink_dir/lib/
   cp flink-sql-connector-mysql-cdc-2.1.1.jar /data/$flink_dir/lib/
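   # assumes a flink-conf.yaml template next to this script that uses {master}/{host2}/{host3} placeholders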
   sed -i -e "s/{master}/${master}/g" -e "s/{host2}/${host2}/" -e "s/{host3}/${host3}/" flink-conf.yaml
   cp -f flink-conf.yaml /data/$flink_dir/conf/
   echo "${master}:8081" >> /data/$flink_dir/conf/master
   echo "${host2}:8081" >> /data/$flink_dir/conf/master
   echo "${host2}" >> /data/$flink_dir/conf/worker
   echo "${host3}" >> /data/$flink_dir/conf/worker
}

function zookeeper_install(){
   cd $root_dir
   tar xf $zookeeper_tar -C /data/
   cd /data/$zookeeper_dir/conf
   mkdir /data/zookeeper
   echo $id > /data/zookeeper/myid
   cp zoo_sample.cfg zoo.cfg
   echo "server.1=${master}:2888:3888" >> zoo.cfg
   echo "server.2=${host2}:2888:3888" >> zoo.cfg
   echo "server.3=${host3}:2888:3888" >> zoo.cfg
   sed -i "s/tmp/data/" zoo.cfg
}

function mount_nfs(){
	apt install -y nfs-common
	mkdir /flink_job
	mkdir /flink_store
	mkdir /flink-checkpoints
	mkdir /flink-savepoints
	echo "${nfs_host}:/flink/flink_job         /flink_job nfs4 rw  0 0" >> /etc/fstab
	echo "${nfs_host}:/flink/flink-checkpoints /flink-checkpoints nfs4 rw 0 0" >> /etc/fstab
	echo "${nfs_host}:/flink/flink-savepoints /flink-savepoints nfs4 rw 0 0" >> /etc/fstab
	echo "${nfs_host}:/flink/flink_store /flink_store/  nfs4 rw 0 0" >> /etc/fstab
	mount -a
}

if [ -z "$master" ] || [ -z "$host2" ] || [ -z "$host3" ] || [ -z "$id" ];then
   echo "usage: $0 <master_ip> <host2_ip> <host3_ip> <zookeeper_myid>"
   exit 1
fi
jdk_install
flink_install
zookeeper_install
mount_nfs
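
A usage example, assuming the script is saved as install.sh (the name is only an example) and nfs_host has been filled in; run it on each node, passing that node's ZooKeeper myid as the last argument:

./install.sh 192.168.1.101 192.168.1.102 192.168.1.103 1   # on master  (myid 1)
./install.sh 192.168.1.101 192.168.1.102 192.168.1.103 2   # on worker1 (myid 2)
./install.sh 192.168.1.101 192.168.1.102 192.168.1.103 3   # on worker2 (myid 3)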

Learn more about Flink and flink-cdc:

flink: Docs | Apache Flink

flink-cdc: Welcome to Flink CDC — Flink CDC documentation
