Hadoop积累---Hadoop判断job和map的开始和结束(带源码)

在Hadoop中,判断Job和Map任务的开始和结束可以通过多种方式实现。以下是几种常见的方法:

1. 使用Hadoop命令行工具

Hadoop提供了一些命令行工具来监控和查询Job的状态。

1.1 查看Job状态
hadoop job -status <job_id>

这条命令会显示Job的详细信息,包括Job的状态(如RUNNING, SUCCEEDED, FAILED等)。

1.2 查看Map任务状态
hadoop job -list

这条命令会列出所有正在运行的Job及其任务的状态。你可以通过Job ID进一步查询Map任务的状态。

2. 使用Web界面

Hadoop提供了Web UI来监控Job的状态。

2.1 访问JobTracker/ResourceManager Web UI
http://<jobtracker_or_resourcemanager_host>:50030/jobtracker.jsp

在Web UI中,你可以查看所有Job的列表,点击具体的Job可以查看其详细信息和Map任务的进度。

3. 使用API

如果你正在开发Hadoop应用程序,可以通过Hadoop API来监控Job和Map任务的状态。

3.1 使用JobClient
JobClient jobClient = new JobClient(new JobConf());
JobStatus[] jobStatuses = jobClient.getAllJobs();
for (JobStatus jobStatus : jobStatuses) {
    System.out.println("Job ID: " + jobStatus.getJobID() + ", Status: " + jobStatus.getRunState());
}
3.2 使用JobTracker
JobTracker jobTracker = JobTracker.getJobTracker();
JobStatus[] jobStatuses = jobTracker.getAllJobs();
for (JobStatus jobStatus : jobStatuses) {
    System.out.println("Job ID: " + jobStatus.getJobID() + ", Status: " + jobStatus.getRunState());
}

4. 使用Streaming或Pipes

如果你使用的是Hadoop Streaming或Pipes,可以通过标准输入输出流来获取Map任务的状态。

4.1 使用Streaming

Hadoop Streaming允许你使用脚本语言(如Python)编写Map和Reduce任务。你可以通过检查标准输入输出来判断任务的状态。

import sys

def mapper(line):
    # Map任务的逻辑
    pass

def main():
    for line in sys.stdin:
        mapper(line)

if __name__ == "__main__":
    main()

5. 日志文件

Hadoop会在各个节点上生成日志文件,你可以通过查看这些日志文件来判断Job和Map任务的状态。

5.1 访问HDFS上的日志

你可以通过HDFS命令行或HDFS Web UI访问日志文件。

hadoop fs -cat /path/to/logs/*

6. 使用监控工具

除了以上方法,你还可以使用一些第三方的监控工具来监控Hadoop集群的状态,如Ganglia、Nagios、Prometheus等。这些工具能够提供更详细的监控信息,包括Job和Map任务的状态。

7. 总结

  • 命令行工具:使用hadoop job -statushadoop job -list等命令查询Job和Map任务的状态。
  • Web UI:访问JobTracker/ResourceManager的Web界面查看Job和Map任务的进度。
  • API:在应用程序中使用Hadoop API监控Job和Map任务的状态。
  • 日志文件:查看Hadoop生成的日志文件以获取Job和Map任务的状态。
  • 监控工具:使用第三方监控工具提供更详细的监控信息。

通过这些方法,你可以有效地监控Hadoop中的Job和Map任务的开始和结束。

8. 源码

package com.mofang.data.hadoop.report;

import java.text.DateFormat;
import java.util.Date;
import java.util.UUID;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.NLineInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.springframework.context.support.AbstractApplicationContext;
import org.springframework.data.mongodb.core.MongoTemplate;

import com.mofang.data.hadoop.ReportHadoopMain;
import com.mofang.data.hadoop.domain.SpringContextFactory;
import com.mofang.domain.entity.report.ReportResultChangeLogTrace;

public class ReportJob extends Configured implements Tool {

	private AbstractApplicationContext springContenxt = null;
	private ReportResultChangeLogTrace reportResultChangeLogTrace = new ReportResultChangeLogTrace();

	public int run(String[] arg0) throws Exception {
		
		
		System.out.println("job运行开始"+DateFormat.getDateTimeInstance().format(new Date()));
		springContenxt = SpringContextFactory.initSpringContext();
		reportResultChangeLogTrace.setId(UUID.randomUUID().toString());
		reportResultChangeLogTrace.setCreateTime(new Date());
		reportResultChangeLogTrace.setChangeStart(new Date());

		Configuration conf = new Configuration();
		String[] otherArgs = new GenericOptionsParser(conf, arg0).getRemainingArgs();

		conf.addResource("hadoop.xml"); // 不需要加classpath
		
		conf.set("reportResultChangeLogTraceId",reportResultChangeLogTrace.getId());

		Job job = Job.getInstance(conf);
		job.setJarByClass(ReportHadoopMain.class);

		job.setMapperClass(ReportMapper.class);

		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(Text.class);

		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(Text.class);

		// job.setReducerClass(TraitRetioReudcer.class);
		// job.setNumReduceTasks(8);

		job.setInputFormatClass(NLineInputFormat.class);
		job.setOutputFormatClass(TextOutputFormat.class);

		FileInputFormat.addInputPath(job, new Path(otherArgs[1])); // 设置输入路径

		FileSystem fs = FileSystem.get(conf);
		fs.delete(new Path(otherArgs[2]), true);
		FileOutputFormat.setOutputPath(job, new Path(otherArgs[2])); // 设置输出路径

		NLineInputFormat.setNumLinesPerSplit(job, 5000); // 一个分片5000个barCode
		job.waitForCompletion(true);
		System.out.println("job运行结束"+DateFormat.getDateTimeInstance().format(new Date()));
    	reportResultChangeLogTrace.setChangeEnd(new Date());
    	MongoTemplate logMongoTemplate = (MongoTemplate) springContenxt.getBean("logMongoTemplate");
    	logMongoTemplate.save(reportResultChangeLogTrace);
        springContenxt.close();
		System.exit(0);
		return 0;
	}

}

package com.mofang.data.hadoop.report;


import java.io.IOException;
import java.util.Date;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.springframework.context.support.AbstractApplicationContext;
import org.springframework.data.mongodb.core.MongoTemplate;

import com.alibaba.fastjson.JSONObject;
import com.mofang.data.domain.service.LogMonitorService;
import com.mofang.data.domain.service.ReportDataService;
import com.mofang.data.hadoop.domain.SpringContextFactory;
import com.mofang.data.hadoop.util.Heartbeat;
import com.mofang.domain.entity.report.ReportResultChangeLogTraceBarcode;

public class ReportMapper extends Mapper<LongWritable, Text, Text,Text>{
    
    private AbstractApplicationContext  springContenxt=null;
    
    private  Heartbeat heartbeat=null;
    
//    private ReportResultChangeLogTrace reportResultChangeLogTrace=new ReportResultChangeLogTrace();
    
    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        springContenxt=SpringContextFactory.initSpringContext();
        heartbeat=Heartbeat.createHeartbeat(context);
//        reportResultChangeLogTrace.setId(UUID.randomUUID().toString());
//        reportResultChangeLogTrace.setCreateTime(new Date());
//        reportResultChangeLogTrace.setChangeStart(new Date());
    }
    
    @Override
    protected void map(LongWritable key, Text value,Context context)
        throws IOException, InterruptedException { 
        
        String barcode=value.toString().trim();
        JSONObject json = new JSONObject();
        
        json.put("barCode", barcode);
        ReportDataService reportService = springContenxt.getBean(ReportDataService.class);
        LogMonitorService logMonitorService = springContenxt.getBean(LogMonitorService.class);
        MongoTemplate logMongoTemplate = (MongoTemplate) springContenxt.getBean("logMongoTemplate");
        ReportResultChangeLogTraceBarcode reportResultChangeLogTraceBarcode=new ReportResultChangeLogTraceBarcode();
        reportResultChangeLogTraceBarcode.setBarcode(barcode);
        reportResultChangeLogTraceBarcode.setCreateTime(new Date());
        reportResultChangeLogTraceBarcode.setReportResultChangeLogTraceId(context.getConfiguration().get("reportResultChangeLogTraceId"));
        logMongoTemplate.save(reportResultChangeLogTraceBarcode);
        JSONObject jsonResult = reportService.parseResultTxt(json, barcode, "17");
        logMonitorService.printParseSnpError(jsonResult, barcode); //barCode报告生成状态
        context.write(new Text(barcode), new Text(((String)jsonResult.get("okflag"))));
        
    }
    
    //清空spring缓存,停止心跳服务
    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        springContenxt.close();
        if(heartbeat!=null)
            heartbeat.stopBeating();
    }
    
    
}
上一篇:解决部署RKE2或K3S-“docker.io/rancher/mirrored-pause:3.6“: -无法拉取镜像办法


下一篇:基于STM32的智能家居安防系统设计