hdfs普通文本文件合并lzo压缩

环境准备

安装lzop

sudo apt-get install lzop

shell 主脚本

#! /bin/bash
localpath=/home/impdatahd/testData/hdfs_test
outpath=/home/impdatahd/testData/merged
hdfspath=/user/hive/warehouse/test/origin/behavior_log_andr_test
dt='2022-01-04'

function checkFileExist(){
`hadoop fs -test -e $1`
#echo "in func"+$1
if [ $? -eq 0 ] ;then
  return 0;
else
  return 1;
fi
}
function init(){
#init
rm  $localpath/*
rm  $outpath/*.log
rm  $outpath/*.log.lzo
}


function do_main(){
hdfs dfs -get $hdfspath/dt=$1/* $localpath
#merge
python $outpath/merge.py
#compression
lzop -Uv $outpath/*.log 

#delete and mv hfds old file
hdfs dfs -rm -r -f $hdfspath/dt=$1/*
hdfs dfs -put  $outpath/*.log.lzo $hdfspath/dt=$1

}
#index
function create_index(){
hadoop jar /home/impdatahd/hadoop-3.2.0/share/hadoop/common/hadoop-lzo-0.4.21.jar  com.hadoop.compression.lzo.DistributedLzoIndexer $hdfspath/dt=$1
}

for ((i=0;i<1;i++));
do
   cdt=`date -d "$dt $i days" +%Y-%m-%d`
   echo $cdt
   checkFileExist $hdfspath/dt=$cdt/*.lzo
   if [ $? == 0 ];then
   echo "$cdt 已完成压缩,跳过操作"
   continue;
   else
      checkFileExist $hdfspath/dt=$cdt/test-*
      if [ $? == 0 ];then
      init
      do_main $cdt
      create_index $cdt
      else
        echo "$cdt 没有目标文件,跳过操作!"
	continue;
      fi
   fi
done

python合并文件脚本

压缩文件不能直接合并,可以考虑,现将文件解压缩,然后再合并。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

# -------------------------------------------------------------------------------
# File Name   : merge.py
# Description :
# Author      : zhengkw
# Email       : zhengkaiwen@imprexion.com.cn
# Date        : 2022/1/04 14:18
# -------------------------------------------------------------------------------
import os
input_path = "/home/impdatahd/testData/hdfs_test/" #此处填好自己的路径,注意最后的"/"

#判断文件类型,如果是一般log文件,直接合并,如果是lzo文件,只合并lzo文件,不能合并index文件。



#使用os.listdir函数获取路径下的所有的文件名,并存在一个list中
#使用os.path.join函数,将文件名和路径拼成绝对路径
whole_file = [os.path.join(input_path,file) for file in os.listdir(input_path)]
content = []
#对于每一个路径,将其打开之后,使用readlines获取全部内容
for w in whole_file:
    with open(w,'rb') as f:
         content = content+f.readlines()
#构造输出的路径,和输入路径在同一个文件夹下,如果该文件夹内没有这个文件会自动创建
output_path = os.path.join('/home/impdatahd/testData/merged/','merged_andr.log')
 #将内容写入文件
with open(output_path,'wb') as f:
    f.writelines(content)

上一篇:初识Hadoop,Hadoop知识小结


下一篇:HDFS编程实践-HDFS常用命令实践操作