环境准备
安装lzop
sudo apt-get install lzop
shell 主脚本
#! /bin/bash
localpath=/home/impdatahd/testData/hdfs_test
outpath=/home/impdatahd/testData/merged
hdfspath=/user/hive/warehouse/test/origin/behavior_log_andr_test
dt='2022-01-04'
function checkFileExist(){
`hadoop fs -test -e $1`
#echo "in func"+$1
if [ $? -eq 0 ] ;then
return 0;
else
return 1;
fi
}
function init(){
#init
rm $localpath/*
rm $outpath/*.log
rm $outpath/*.log.lzo
}
function do_main(){
hdfs dfs -get $hdfspath/dt=$1/* $localpath
#merge
python $outpath/merge.py
#compression
lzop -Uv $outpath/*.log
#delete and mv hfds old file
hdfs dfs -rm -r -f $hdfspath/dt=$1/*
hdfs dfs -put $outpath/*.log.lzo $hdfspath/dt=$1
}
#index
function create_index(){
hadoop jar /home/impdatahd/hadoop-3.2.0/share/hadoop/common/hadoop-lzo-0.4.21.jar com.hadoop.compression.lzo.DistributedLzoIndexer $hdfspath/dt=$1
}
for ((i=0;i<1;i++));
do
cdt=`date -d "$dt $i days" +%Y-%m-%d`
echo $cdt
checkFileExist $hdfspath/dt=$cdt/*.lzo
if [ $? == 0 ];then
echo "$cdt 已完成压缩,跳过操作"
continue;
else
checkFileExist $hdfspath/dt=$cdt/test-*
if [ $? == 0 ];then
init
do_main $cdt
create_index $cdt
else
echo "$cdt 没有目标文件,跳过操作!"
continue;
fi
fi
done
python合并文件脚本
压缩文件不能直接合并,可以考虑,现将文件解压缩,然后再合并。
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# -------------------------------------------------------------------------------
# File Name : merge.py
# Description :
# Author : zhengkw
# Email : zhengkaiwen@imprexion.com.cn
# Date : 2022/1/04 14:18
# -------------------------------------------------------------------------------
import os
input_path = "/home/impdatahd/testData/hdfs_test/" #此处填好自己的路径,注意最后的"/"
#判断文件类型,如果是一般log文件,直接合并,如果是lzo文件,只合并lzo文件,不能合并index文件。
#使用os.listdir函数获取路径下的所有的文件名,并存在一个list中
#使用os.path.join函数,将文件名和路径拼成绝对路径
whole_file = [os.path.join(input_path,file) for file in os.listdir(input_path)]
content = []
#对于每一个路径,将其打开之后,使用readlines获取全部内容
for w in whole_file:
with open(w,'rb') as f:
content = content+f.readlines()
#构造输出的路径,和输入路径在同一个文件夹下,如果该文件夹内没有这个文件会自动创建
output_path = os.path.join('/home/impdatahd/testData/merged/','merged_andr.log')
#将内容写入文件
with open(output_path,'wb') as f:
f.writelines(content)