使用并发 ssh 连接来提升捞日志脚本执行效率

问题背景

公司有个简单粗暴的日志服务,它部署在多台机器实例上,收集的日志记录在每台机器本地硬盘,写一个小时自动切换日志文件,硬盘空间写满了自动回卷,大约可以保存两三天的历史数据。为什么说它粗暴呢?原来它不提供任何查询日志的接口,想要获取日志唯一的办法就是直接查日志文件:

  • ssh 执行 grep 得到结果
  • scp 将结果复制到本地

最后将这些文件拼接在一起作为最终结果。有个前辈写过一个脚本,不过比较简单,基本就是一个 while 循环里串行查询每台实例。获取一次日志需要将近 1 个小时,严重拖慢了开发人员的节奏。作为一个资深 coder,时间是最富贵的财富,婶可忍叔不可忍,于是决定对脚本作一番改造以提升查询效率。

ssh 远程脚本

在开始改造前,先看下原脚本的执行逻辑:

#!/bin/sh

date=$1
type=$2

# 1st, prepare the grep script
echo "#! /bin/sh" > bin/work_${type}.sh
echo "grep type=${type} /home/log/update.log.${date} >/tmp/work_${type}.log.${date}" >> bin/work_${type}.sh

# 2nd, send the script to each machine and run it
for i in $(get_instance_by_service xxxxx.xxx-xxxx-xxxxxxx-xxxxxx-xxxxxxx-xxxxxx-xxxxxxx.xxx.xxx | sort | uniq)
do
    echo $i
    scp -o "StrictHostKeyChecking no"  -o "PasswordAuthentication=no" -o "ConnectTimeout=3" bin/work_${type}.sh $i:/tmp/
    ssh -o "StrictHostKeyChecking no" -q -xT -o "PasswordAuthentication=no" -o "ConnectTimeout=3" $i "nohup sh /tmp/work_${type}.sh >/dev/null 2>err.log &"
done

# wait 'grep' a while
sleep 30
rm bin/work_${type}.sh

# 3rd, get log from each machine
for i in $(get_instance_by_service xxxxx.xxx-xxxx-xxxxxxx-xxxxxx-xxxxxxx-xxxxxx-xxxxxxx.xxx.xxx | sort | uniq)
do
    ssh -o "StrictHostKeyChecking no" -q -xT -o "PasswordAuthentication=no" -o "ConnectTimeout=3" $i "cat /tmp/work_${type}.log.${date}" >> data/work_${type}.log.${date}
    # delete to free spaces
    ssh -o "StrictHostKeyChecking no" -q -xT -o "PasswordAuthentication=no" -o "ConnectTimeout=3" $i "rm /tmp/work_${type}.log.${date} &"
done

针对上面的脚本,做个简单说明:

  • 目标日志位于远程机器 /home/log/ 目录,命名规范为 update.log.yyyyddmmhh,变量名 date 需要精确到小时,例如 2021010208;
  • 本地生成的 grep 脚本命名规范为 work_xxxx.sh,其中 xxxx 表示日志类型,所有日志记录都包含一个 type=xxx 的字段,因此一般通过 type 来过滤无关日志;
  • 机器实例需要使用特殊的 get_instance_by_service 命令获取,这是平台提供的命令,它接收一个 group 参数,返回与日志相关的 500 多台实例;
  • 上行时将 grep 脚本上传到远程机器的 /tmp 目录,生成的结果放置在实例的 /tmp 目录,命名规范为 work_xxxx.log.yyyyddmmhh,xxxx 表示日志类型,后缀为日期;
  • 下行时将结果下载到 data 目录,命名规范为 work_xxxx.log.yyyyddmmhh,因为是串行执行,所以结果数据是直接追加到这个文件的

另外说明一下 ssh 与 scp 的几个参数:

  • -o 用于覆盖 ssh 配置文件的设置,这里主要用到的是:
    • StrictHostKeyChecking:no - 不进行 known-host 的严格检查,很多实例是第一次访问,设置这个可以避免交互式提问从而卡死 shell 脚本;
    • PasswordAuthentication:no - 不检验 ssh 密码,执行查询脚本的机器已经同各个实例建立了证书信任,所以可以这样设置,同样是为了防止卡死 shell 脚本的;
    • ConnectTimeout:3 - ssh 连接超时秒数,防止因机器宕机或网络不通导致长时间等待;
  • -q:打开 quiet 模式,减少 ssh 输出
  • -x:关闭 X11 Forwarding,一般用于 GUI 程序,这里是脚本所以不需要
  • -T:禁用伪终端分配,伪终端一般用于交互式 console,这里不需要

总结一下远程 ssh 执行脚本和 scp 的语法:

  • ssh host script
  • scp local-path host:remote-path
  • scp host:remote-path local-path

其中 scp 也可以将远程文件复制到本地 ,不过这里需要将数据追加到已有文件, 所以使用了 ssh+cat 的实现方式。其实 ssh 的那些选项都可以省略,因为机器之间已经预先建立好了证书信任关系,这里只是 in case。重点说明一下 ssh 执行位于远程机器的脚本时需要注意的点:

  • >/dev/null 2>err.log:重定向 stdout/stderr 到错误文件或 /dev/null;
  • nohup:ssh tty 退出后继续执行;
  • &:后台执行;

其中比较重要的是第 1 条和第 3 条,nohup 亲测可省略。关于这方面的验证,可以查看文末链接。大家记住这个结论即可,后面会用到。

性能瓶颈

为了验证脚本执行慢确实是由 ssh 串行引起的,这里做了一个实验,执行以下简单的脚本,并记录整体耗时:

$ time ssh xxxx-xxxxxxxx-xxx-xx-xxx.xxxx 'pwd'
/home/rd

real	0m2.145s
user	0m0.026s
sys	0m0.008s

ssh 的第一个参数是实例机器名,由 get_instance_by_service 返回的实例列表中随便选取一个;ssh 第二个参数是要远程执行的命令,为了测量 ssh 时间这里使用了 pwd 命令,它的耗时基本可以忽略。

time 输出证明一次 ssh 交互大概在 2 秒左右,参考上一节中的脚本,可以得出以下脚本耗时公式:

total=(ssh_time + scp_time) * instance_count + 30 + ssh_time * 2 * instance_count
     = ssh_time * 4 * instance_count + 30

这里出于计算方便考虑,忽略 scp 过程中文件传输的时间 (单个文件都比较小),将它的耗时约等于 ssh 耗时,经过推导得到了第二个等式。令 ssh_time = 2,instance_count = 500,那么执行一次脚本就需要 4030 ≈ 1.12 小时,看来光消耗在连接上的时间就超过 1 小时了,怪不得这么慢呢!

预读取实例列表

读取实例列表其实比较快,统计了一下也就在几十毫秒之间:

$ time get_instance_by_service xxxxx.xxx-xxxx-xxxxxxx-xxxxxx-xxxxxxx-xxxxxx-xxxxxxx.xxx.xxx
……
real	0m0.011s
user	0m0.003s
sys	0m0.006s

但当遇到连不通的实例时,一下就要耗掉 3 秒,统计了一下,500 多台实例中只有 300 多台可以连通 (amazing~),相当于每次有 200 * 4 * 3 = 2400 秒,大约 40 分钟的时间是浪费在无效的机器上了。当然可以将等待时间缩小到 1 秒,时间就可以降到 10 分钟。但我连一秒也不愿意浪费,何必傻等这 10 分钟呢?通过提前检查哪些机器是可以连通的,可以节约这 40 分钟,具体的做法就是运行 check_instance.sh 这个脚本:

#!/bin/sh

ret=0
start_time=$(date +%s)
if [ -f instance.txt ]; then 
    mv instance.txt instance_old.txt
fi

for host in `get_instance_by_service xxxxx.xxx-xxxx-xxxxxxx-xxxxxx-xxxxxxx-xxxxxx-xxxxxxx.xxx.xxx | sort | uniq`
do
    echo "check $host"
    ssh -o "StrictHostKeyChecking no" -q -xT -o "PasswordAuthentication=no" -o "ConnectTimeout=3" $host "pwd"
    ret=$?
    if [ $ret -eq 0 ]; then 
        # only record success instance
        echo "$host" >> instance.txt
    else 
        echo "error $ret"
    fi
done

end_time=$(date +%s)
cost_time=$(($end_time-$start_time))
echo "check done, total time spend: $(($cost_time/60))m $(($cost_time%60))s "

简单说明一下:

  • 脚本运行完成后将实例列表存储在本地 instance.txt 文件中
  • 如果之前已经存在这个文件,将备份到 instance.bak.txt,方便对比前后输出结果
  • 对每个机器实例,通过 ssh 运行 pwd 命令来检查它是否可以免密登录,ssh 相关选项和之间保持一致
  • 如果成功,记录在 instance.txt 文件;否则不记录,打印一条错误信息告知用户该实例无法连通
  • 最后输出本次检查耗时

注意,这个脚本仍然是串行的,下面是一次实际的运行耗时:

check done, total time spend: 60m 43s 

印证了之前的计算结果,光 ssh 连接就得耗费一个多小时。好在这个脚本很长时间才运行一次,耗时久还能接受。

预复制过滤脚本

第二块耗时就是上传 grep 过滤脚本到实例机器,平均消耗 ssh_time * instance_count = 600 秒 (有效实例按 300 计算)。照搬前面的思路,如果将这个脚本做成通用的并提前上传到各台实例,那么这一步的耗时也可以省下来了。来看下通用的过滤脚本 fetch_log.sh:

#!/bin/sh

if [ $# -ne 1 -a $# -ne 2 ]; then
    echo "Usage: fetch_log.sh keyword [date]"
    echo "       keyword: anything you think a keyword, support regular expression that grep accepts"
    echo "       date: 2021020308, if no date provide using log file currently writing"
    exit 1
fi

keyword=$1
# using keyword md5 as filename part to avoid file confliction
# and prevent keyword contain invalid character for path names...
keymd5=$(echo $keyword | md5sum | awk '{print $1}')

if [ $# -eq 1 ]; then 
    # if no date provide, using current log file 
    # one can only filter logs that not currently writing ago, 
    # now we can filter them by not providing date parameter.
    grep -a "${keyword}" /home/log/update.log >/tmp/work.${keymd5}.log
else 
    date=$2

    # using keyword md5 as filename part to avoid file confliction
    grep -a "${keyword}" /home/log/update.log.$date >/tmp/work.${date}.${keymd5}.log
fi

 简单说明一下:

  • 脚本接受两个参数,一个是 keyword,传递给 grep 的,不限于 type=xxx 的形式、可以指定任意过滤字符串;一个是精确到小时的日期,用于定位日志文件,如果不传递第二个参数,默认使用日志服务当前正在写的日志文件,它位于 /home/log/update.log,一般做一些实时的测试时可以利用这个特性,避免等待漫长的日志切换时间
  • 当多个用户同时执行脚本且指定了同样的时间时,这里使用 keyword 的 md5 作为文件名的一部分 (eg: /tmp/work.yyyymmddhh.xxxxxxxxxxxx),避免服务器文件冲突 (同时指定一个 keyword 的场景最好共享结果,另外这个不能防止本地文件冲突,因此不同用户最好有自己单独的脚本副本)
  • 脚本接收的参数由真正执行日志捞取的脚本传递,关于如何给远程脚本传递参数,稍后给出

干活的脚本有了,下面就来看一下负责上传脚本的 upload_fetch_log.sh:

#!/bin/sh

ret=0
start_time=$(date +%s)

if [ ! -f instance.txt ]; then 
    echo "generate reliable instance list first.."
    sh check_instance.sh
fi

# upload the fetch_log.sh to each machine
for host in `cat instance.txt`
do
    echo "uploading $host"
    scp -o "StrictHostKeyChecking no"  -o "PasswordAuthentication=no" -o "ConnectTimeout=3" fetch_log.sh $host:/tmp/
    ret=$?
    if [ $ret -ne 0 ]; then 
        echo "error $ret"
    fi
done

end_time=$(date +%s)
cost_time=$(($end_time-$start_time))
echo "upload done, total time spend: $(($cost_time/60))m $(($cost_time%60))s "

做个简单说明:

  • 如果没有检测到 instance.txt  的存在,会自动拉起 check_instance.sh 生成实例列表,防止后面遍历失败
  • 遍历实例列表,针对每个机器实例,运行 scp 拷贝 fetch_log.sh 到 /tmp/ 目录备用
  • 如果出错,打印错误实例信息
  • 最后输出本次上传耗时

有两点需要注意

  • 因为依赖 instance.txt,所以需要在执行 check_instance.sh 后执行此脚本,否则会自动执行 check_instance.sh
  • 这个脚本仍然是串行的,好在只有每次更新 instance.txt 文件时才执行,耗时久可以接受

下面是一次实际的运行耗时:

upload done, total time spend: 10m 14s

看起来比 check_instance.sh 快多了,主要是只需要对可连通的机器执行 ssh 连接,速度会快很多。

这样一套组合拳下来,新的脚本不光节省了执行时间,还得到了更强大的过滤表达式、更及时的日志捞取,比之前好用不止一点点。

ssh 并发

上面做了足够多的铺垫,可以开始本文的重头戏了 —— ssh 连接的并发执行。其实聪明的读者已经看出来了,上面一顿忙活,也只是解决了 1/4 的耗时问题,还有三大耗时在这儿摆着:

  • 执行过滤脚本
  • 回传过滤结果
  • 删除过滤结果

而且这已经是在远程机异步执行了,如果同步执行那就更慢了。所以问题的关键不是 grep 慢,而是启动远程 grep 慢 (之前有过统计,大概是 2 秒左右)。如何节约这个时间呢?第一个想到的方案是并行执行 ssh,将启动 ssh 的过程也后台化 (&),这样 2 秒内就可以并行启动多个连接了,如果一次能将 300 台实例全部启动,时间就可以直接缩短到 2 秒,是不是很厉害!

当然了,考虑到并发连接上限、对日志服务的冲击等因素,最好不要一次启动那么多连接,如果一次能启动 10 个并发连接,那么 300 台实例需要 60 (300 / 10 * 2 ) 秒,也相当快了。不过各个批次之间,需要有一个等待操作,以保证开启下个批次前上个批次的脚本都执行完毕了,这就增大了复杂性。

不过对于第一步 (过滤) 而言,还没有回传文件的问题,相对来说简单一点,来看一下 exec_fetch_log.sh 脚本:

#!/bin/sh

if [ $# -ne 1 -a $# -ne 2 ]; then
    echo "Usage: exec_fetch_log.sh keyword [date]"
    echo "       keyword: anything you think a keyword,support regular expression that grep accepts"
    echo "       date: 2021020308, if no date provide using log file currently writing"
    exit 1
fi

if [ ! -f instance.txt ]; then 
    echo "generate reliable instance list first.."
    sh check_instance.sh
fi

n=0
batch_size=10
batch_num=0
batch_no=0
keyword=$1
# using keyword md5 as filename part to avoid file confliction
# and prevent keyword contain invalid character for path names...
keymd5=$(echo "$keyword" | md5sum | awk '{print $1}')
date=""
if [ $# -eq 1 ]; then 
    date=`date "+%Y%m%d%H"`
else
    date=$2
fi

echo -e "=======================================================================\nkeyword=$keyword; date=$date; keymd5=$keymd5" | tee -a error.log
start_time=$(date +%s)
echo "start grep from each machine"
for host in `cat instance.txt`
do
    batch_no=$(($n % $batch_size))
    batch_num=$(($n / $batch_size))
    if [ $batch_no -eq 0 -a $n -gt 0 ]; then 
        echo "wait batch $batch_num"
        wait
    fi

    echo "$(($n+1)): $host"
    if [ $# -eq 1 ]; then 
        ssh -o "StrictHostKeyChecking no" -q -xT -o "PasswordAuthentication=no" -o "ConnectTimeout=3" $host "nohup sh /tmp/fetch_log.sh \"$keyword\" >/dev/null 2>err.log &" >> error.log 2>&1 &
    else 
        ssh -o "StrictHostKeyChecking no" -q -xT -o "PasswordAuthentication=no" -o "ConnectTimeout=3" $host "nohup sh /tmp/fetch_log.sh \"$keyword\" \"$date\" >/dev/null 2>err.log &" >> error.log 2>&1  &
    fi
    n=$(($n+1))
done

echo "wait last batch $(($batch_num+1))"
wait

……

做个简单说明:

  • 接收参数与 fetch_log.sh 保持一致,包括 Usage 打印的内容
  • 如果没有检测到 instance.txt  的存在,会自动拉起 check_instance.sh 生成实例列表,这个对于新同学会比较友好,自动建立依赖
  • batch_size 默认为 10,如果当前实例数 (n) 达到整批次,说明要开启一个新的批次了,此时需要先 wait 上个批次的所有进程,确保它们都完成了 ssh 执行,这一步很重要,否则会一次建立 300 多个连接,就不能实现之前说的分批运行目标
  • 分批运行每个实例的连接和之前非常像,唯一的区别在于末尾的 '>> error.log 2>&1 &', 这一句将 ssh 本身也异步执行了,是整个脚本的灵魂。异步执行会非常快的返回,不存在之前那 2 秒的排队问题了
  • 之前已经将过滤脚本上传到了远程实例的 /tmp 目录,所以这里直接调用 /tmp/fetch_log.sh
  • 远程 ssh 脚本传参比较直观,就是直接在脚本后面加相应的参数字符串,为防止参数中的空格中断参数解析,这里加了双引号 (在双引号中间需要 \ 释义)
  • 按是否有 date 参数做下区分,有的话会将 date 参数给到远程实例的 fetch_log.sh 脚本,没有的话不传递这样就会使用当前日志文件了
  • 从 while 循环结束时,通过 wait 等待最后一个批次的 ssh 执行完成,来保证所有连接都关闭了

ssh 并发的关键是批次控制,每个异步执行的 ssh 都将成为一个单独的子进程,通过 wait 等待子进程就可以完成批次的等待,不过这有一个前题 —— 并发脚本没有其它独立运行的子进程,换句话说,就是不能同时有其它异步执行的任务。当然了,也可以通过在数组中记录子进程的 pid 并挨个 wait 它们来实现,不过那样开发成本就太高了,这里没有采取,感兴趣的可以看下 man wait:

       wait [n ...]
              Wait  for  each  specified  process  and return its termination status.  Each n may be a process ID or a job
              specification; if a job spec is given, all processes in that job’s pipeline are waited for.   If  n  is  not
              given, all currently active child processes are waited for, and the return status is zero.  If n specifies a
              non-existent process or job, the return status is 127.  Otherwise, the return status is the exit  status  of
              the last process or job waited for.

老脚本的 ssh 异步其实只进行到了第一步 —— 远程执行,而 ssh 连接本身还是同步的,新脚本最大的改进是连 ssh 本身也异步了,并提供了并发数量控制,可以实现更极致的并发能力。如果有些人不在乎并发量,可以直接一个循环异步启动所有 ssh 连接,那样代码更简单。

并发数量也会随机器数量增多而增长,不过这里没有将这个参数暴露在外面,主要是防止一些人为了快而不择手段,对现在运行的日志服务造成影响 (当然了,只能防一些小白误操作)。

文件合并

有了上面的基础,再处理剩下的两大耗时操作也就不难了,与执行过滤和删除结果不同,回传结果要求脚本执行完成后将数据保存在本地,之前顺序执行时一个追加操作就能搞定的事情现在变复杂了,批量并行后如何处理同时返回的多个文件块成为一个问题。首先不能再简单的追加了,因为多进程追加有可能导致数据混乱,保险的方式是每个子进程写一个临时文件,最后再将它们合并起来,继续看主脚本 exec_fetch_log:

n=0
batch_num=0
dir=""
olddir=""
if [ ! -d data ]; then 
    mkdir data
fi

echo "fetch result logs from each machine"
for host in `cat instance.txt`
do
    batch_no=$(($n % $batch_size))
    batch_num=$(($n / $batch_size))
    dir="data/$date.$batch_num"
    if [ ! -d "$dir" ]; then 
        mkdir "$dir"
        echo "create data dir: $dir"
    fi
    if [ $batch_no -eq 0 -a $n -gt 0 ]; then 
        # splice previous batch files
        if [ $batch_num -gt 1 ]; then 
            # has previous batch
            echo "batch end, try to splice previous batch files..."
            olddir="data/$date.$(($batch_num-2))"
            for file in $olddir/work.*.log
            do
                echo "handle $file: $(stat -c \"%s\" $file)"
                cat "$file" >> data/work.$date.log
            done 

            rm -rf "$olddir"
            echo "delete dir: $olddir"
        fi

        echo "wait batch $batch_num"
        wait
    fi

    echo "$(($n+1)): $host"
    if [ $# -eq 1 ]; then 
        scp -o "StrictHostKeyChecking no"  -o "PasswordAuthentication=no" -o "ConnectTimeout=3" "$host:/tmp/work.$keymd5.log" "$dir/work.$host.log" >> error.log 2>&1 &
    else
        scp -o "StrictHostKeyChecking no"  -o "PasswordAuthentication=no" -o "ConnectTimeout=3" "$host:/tmp/work.$date.$keymd5.log" "$dir/work.$host.log" >> error.log 2>&1 &
    fi
    n=$(($n+1))
done

# splice previous batch files
echo "batch end, try to splice last batch files..."
olddir="data/$date.$(($batch_num-1))"
for file in $olddir/work.*.log
do
    echo "handle $file: $(stat -c \"%s\" $file)"
    cat "$file" >> data/work.$date.log
done 

rm -rf "$olddir"
echo "delete dir: $olddir"
echo "wait last batch $(($batch_num+1))"
wait

if [ $batch_no -ne 0 ]; then 
    olddir="data/$date.$batch_num"
    for file in $olddir/work.*.log
    do
        echo "handle $file: $(stat -c \"%s\" $file)"
        cat "$file" >> data/work.$date.log
    done 

    rm -rf "$olddir"
    echo "delete dir: $olddir"
fi

做个简单说明:

  • 所有数据存储于 data 目录,不存在时创建之
  • 大的 for 循环先看下面真正干活的 scp 语句,去远程机器上获取过滤后的日志:/tmp/work.yyyyddmmhh.xxxx.log,这是之前上传到各实例的过滤脚本默认的输出路径
  • 根据是否提供第二个参数 date,获取的文件名也有所区别,这一点和之前是一致的
  • scp 的异步执行也加了 '>> error.log 2>&1 &',这一点和之前 ssh 的异步执行异曲同工,可节约额外的 2 秒时间
  • 数据块是按批次保存的,在 data 下面建二级目录 0/1/2 等数字代码批次,例如第一批数据全部位于目录 0,第二批位于目录 1……以此类推
  • 批次目录下的数据通过实例名称区分,例如第一批的十块数据,命名为 work.xxxx-xxxxxx-xxxxxx-xxxxxxxx-xx-xxx.xxxx.log,中间长长的一串就是机器实例,不会冲突
  • 批次目录在该批次第一个实例进入循环后创建,开始新的批次前等待上一个批次全部子进程结束 (wait) 的逻辑和之前一致
  • 稍有不同的地方位于文件块合并处,wait 第一批次的时候,还没有文件块下载,所以是跳过的
  • 当 wait 后续批次时 (batch_num > 1),就可以伺机合并前一批次下载的数据块了,例如 wait 第二批次前合并第一批次的数据,wait 第三批次前合并第二批次的数据……以此类推,此时可以合并前一个批次是因为上个批次的进程已经全部 wait 到了,可以保证没有任何子进程在操作对应批次目录中的文件
  • 合并文件的逻辑相对简单,就是遍历批次目录,将其中的文件块追加到最终的文件 data/work.yyyymmddhh.log 中,合并成功后批次目录将被删除,释放临时文件占用的空间
  • 从大的 for 循环出来后的逻辑稍等复杂一些,首先是要有一个 wait 来保证最后一个批次结束,这个 wait 后有一个文件合并来保证最后一批数据块合入母文件
  • 比较难以理解的是 for 循环出来后、wait 前有一个文件合并操作,这个其实是合并的倒数第二批数据块。从前面的类推可以得知,for 循环中 wait 第 N-1 批次前合并了第 N-2 批次的数据,退出循环后第 N-1 批次还没合并呢,它只是进程都退出了但数据块还处于待合并状态,所以这里需要在 wait 前先补一个合并

其实在 wait 后合并是比较符合一般人思维习惯的,此时子进程都退出了,正好就把数据块合并了事,这样在 for 循环结束后就只需要一次 wait 和合并就可以了,代码看上去更清爽。为什么没有这样做呢?并发,还是并发!当一批子进程 wait 成功时,先去启动下一批的 ssh 连接,在 ssh 连接干活的空隙 (2 秒) 去合并数据块绰绰有余,等合并完了再回来一个 wait 可能还得等上个 1 秒多,这样是不是就省下了数据块合并的时间呢?哈哈,这才是真正的时间管理大师好伐

上一篇:IP Tunnel one-to-many用法


下一篇:自动做bond的脚本