// Pregel+ core source code, annotated

#ifndef WORKER_H
#define WORKER_H

#include <vector>
#include "../utils/global.h"
#include "MessageBuffer.h"
#include <string>
#include "../utils/communication.h"
#include "../utils/ydhdfs.h"
#include "../utils/Combiner.h"
#include "../utils/Aggregator.h"
using namespace std;

template <class VertexT, class AggregatorT = DummyAgg> //user-defined VertexT
class Worker {
    // Container of vertex pointers held by this worker (the pointees are
    // released with delete in the destructor) and its iterator type.
    typedef vector<VertexT*> VertexContainer;
    typedef typename VertexContainer::iterator VertexIter;

    // Types published by the user-defined vertex class.
    typedef typename VertexT::KeyType KeyT;
    typedef typename VertexT::MessageType MessageT;
    typedef typename VertexT::HashType HashT;

    // Types published by the message buffer instantiated for VertexT.
    // Map / MapIter are referenced only by the commented-out old
    // implementation further down in this class.
    typedef MessageBuffer<VertexT> MessageBufT;
    typedef typename MessageBufT::MessageContainerT MessageContainerT;
    typedef typename MessageBufT::Map Map;
    typedef typename MessageBufT::MapIter MapIter;

    // Aggregator result types: the per-worker partial value and the final value.
    typedef typename AggregatorT::PartialType PartialT;
    typedef typename AggregatorT::FinalType FinalT;

public:
    Worker()
    {
        //init_workers();//put to run.cpp
        message_buffer = new MessageBuffer<VertexT>;
        global_message_buffer = message_buffer;
        active_count = 0;
        combiner = NULL;
        global_combiner = NULL;
        aggregator = NULL;
        global_aggregator = NULL;
        global_agg = NULL;
    }

    // Installs the message combiner on this worker and publishes the same
    // pointer through global_combiner. The worker never deletes cb.
    void setCombiner(Combiner<MessageT>* cb)
    {
        global_combiner = combiner = cb;
    }

    // Installs the aggregator on this worker, publishes it through
    // global_aggregator, and allocates the FinalT object that global_agg
    // points to (released in the destructor).
    void setAggregator(AggregatorT* ag)
    {
        global_aggregator = aggregator = ag;
        global_agg = new FinalT;
    }

    // Releases every vertex still held, the message buffer, and the FinalT
    // object behind global_agg (if one was allocated), then waits on a
    // worker barrier.
    virtual ~Worker()
    {
        for (VertexIter it = vertexes.begin(); it != vertexes.end(); ++it)
            delete *it;
        delete message_buffer;
        if (getAgg() != NULL) {
            FinalT* final_value = (FinalT*)global_agg;
            delete final_value;
        }
        //worker_finalize();//put to run.cpp
        worker_barrier(); //newly added for ease of multi-job programming in run.cpp
    }

    //==================================
    //sub-functions
    void sync_graph()
    {
        //ResetTimer(4);
        //set send buffer
        vector<VertexContainer> _loaded_parts(_num_workers);//储存各几点的数组 
        for (int i = 0; i < vertexes.size(); i++) {
            VertexT* v = vertexes[i];
            _loaded_parts[hash(v->id)].push_back(v);//hash算法放到对应的part 
        }
        //exchange vertices to add  把各部分节点进行交换补充 
        all_to_all(_loaded_parts);

        //delete sent vertices  删除非本节点的顶点 
        for (int i = 0; i < vertexes.size(); i++) {
            VertexT* v = vertexes[i];
            if (hash(v->id) != _my_rank)
                delete v;
        }
        vertexes.clear();
        //collect vertices to add  对收集到的消息加入到vertex 
        for (int i = 0; i < _num_workers; i++) {
            vertexes.insert(vertexes.end(), _loaded_parts[i].begin(), _loaded_parts[i].end());
        }
        _loaded_parts.clear();
        //StopTimer(4);
        //PrintTimer("Reduce Time",4);
    };

    //old implementation
    /*
		void active_compute()
		{
			active_count=0;
			MessageBufT* mbuf=(MessageBufT*)get_message_buffer();
			Map & msgs=mbuf->get_messages();
			MessageContainerT empty;
			for(VertexIter it=vertexes.begin(); it!=vertexes.end(); it++)
			{
				KeyT vid=(*it)->id;
				MapIter mit=msgs.find(vid);
				if(mit->second->size()==0)
				{
					if((*it)->is_active())
					{
						(*it)->compute(empty);
						AggregatorT* agg=(AggregatorT*)get_aggregator();
						if(agg!=NULL) agg->stepPartial(*it);
						if((*it)->is_active()) active_count++;
					}
				}
				else
				{
					(*it)->activate();
					(*it)->compute(*(mit->second));
					mit->second->clear();//clear used msgs
					AggregatorT* agg=(AggregatorT*)get_aggregator();
					if(agg!=NULL) agg->stepPartial(*it);
					if((*it)->is_active()) active_count++;
				}
			}
		}

		void all_compute()
		{
			active_count=0;
			MessageBufT* mbuf=(MessageBufT*)get_message_buffer();
			Map & msgs=mbuf->get_messages();
			MessageContainerT empty;
			for(VertexIter it=vertexes.begin(); it!=vertexes.end(); it++)
			{
				KeyT vid=(*it)->id;
				MapIter mit=msgs.find(vid);
				(*it)->activate();
				if(mit->second->size()==0) (*it)->compute(empty);
				else{
					(*it)->compute(*(mit->second));
					mit->second->clear();//clear used msgs
				}
				AggregatorT* agg=(AggregatorT*)get_aggregator();
				if(agg!=NULL) agg->stepPartial(*it);
				if((*it)->is_active()) active_count++;
			}
		}
		*/

    void active_compute()
    {
        active_count = 0;
        MessageBufT* mbuf = (MessageBufT*)get_message_buffer();
        vector<MessageContainerT>& v_msgbufs = mbuf->get_v_msg_bufs();
        for (int i = 0; i < vertexes.size(); i++) {
            if (v_msgbufs[i].size() == 0) {
                if (vertexes[i]->is_active()) {//没收到消息但是激活了 
                    vertexes[i]->compute(v_msgbufs[i]);
                    AggregatorT* agg = (AggregatorT*)get_aggregator();
                    if (agg != NULL)
                        agg->stepPartial(vertexes[i]);
                    if (vertexes[i]->is_active())
                        active_count++;
                }
            } else {//收到消息激活 
                vertexes[i]->activate();
                vertexes[i]->compute(v_msgbufs[i]);
                v_msgbufs[i].clear(); //clear used msgs
                AggregatorT* agg = (AggregatorT*)get_aggregator();
                if (agg != NULL)
                    agg->stepPartial(vertexes[i]);
                if (vertexes[i]->is_active())
                    active_count++;
            }
        }
    }

    void all_compute()
    {
        active_count = 0;
        MessageBufT* mbuf = (MessageBufT*)get_message_buffer();
        vector<MessageContainerT>& v_msgbufs = mbuf->get_v_msg_bufs();//得到消息buf 
        for (int i = 0; i < vertexes.size(); i++) {//所有点进行计算 
            vertexes[i]->activate();
            vertexes[i]->compute(v_msgbufs[i]);
            v_msgbufs[i].clear(); //clear used msgs
            AggregatorT* agg = (AggregatorT*)get_aggregator();
            if (agg != NULL)
                agg->stepPartial(vertexes[i]);//对每个节点执行step 
            if (vertexes[i]->is_active())
                active_count++;//记录活跃点数 
        }
    }

    // Appends a vertex to this worker and counts it if it is active.
    inline void add_vertex(VertexT* vertex)
    {
        vertexes.push_back(vertex);
        active_count += vertex->is_active() ? 1 : 0;
    }

    // Synchronizes the aggregator across workers for the current superstep.
    // Does nothing when no aggregator is installed. Non-master ranks hand
    // their partial result to the master and then take part in the broadcast
    // of the final value; the master merges the partials, finalizes, copies
    // the result into *global_agg and broadcasts it. Both sides pick the
    // transfer strategy from the same all_sum() total compared to AGGSWITCH.
    void agg_sync()
    {
        AggregatorT* agg = (AggregatorT*)get_aggregator();
        if (agg != NULL) {
            if (_my_rank != MASTER_RANK) { //send partialT to aggregator (non-master side)
                //gathering PartialT
                PartialT* part = agg->finishPartial();//finalize this worker's partial result
                //------------------------ strategy choosing BEGIN ------------------------
                StartTimer(COMMUNICATION_TIMER);
                StartTimer(SERIALIZATION_TIMER);
                ibinstream m;
                m << part;
                int sendcount = m.size();
                StopTimer(SERIALIZATION_TIMER);
                int total = all_sum(sendcount);//sum of the serialized sizes (the master contributes 0)
                StopTimer(COMMUNICATION_TIMER);
                //------------------------ strategy choosing END ------------------------
                if (total <= AGGSWITCH)//at or below the threshold: use the gather path
                    slaveGather(*part);//non-master side of the gather
                else {
                    send_ibinstream(m, MASTER_RANK);//send the already serialized partial to the master
                }
                //scattering FinalT
                slaveBcast(*((FinalT*)global_agg));//non-master side of the broadcast into *global_agg
            } else {//master rank
                //------------------------ strategy choosing BEGIN ------------------------
                int total = all_sum(0);
                //------------------------ strategy choosing END ------------------------
                //gathering PartialT
                if (total <= AGGSWITCH) {//at or below the threshold: use the gather path
                    vector<PartialT*> parts(_num_workers);
                    masterGather(parts);//master side of the gather
                    for (int i = 0; i < _num_workers; i++) {
                        if (i != MASTER_RANK) {
                            PartialT* part = parts[i];
                            agg->stepFinal(part);//merge this worker's partial into the final result
                            delete part;
                        }
                    }
                } else {//above the threshold: receive one stream per non-master worker
                    for (int i = 0; i < _num_workers; i++) {
                        if (i != MASTER_RANK) {
                            obinstream um = recv_obinstream(i);
                            PartialT* part;
                            um >> part;//deserialize the received partial into part
                            agg->stepFinal(part);//merge this worker's partial into the final result
                            delete part;
                        }
                    }
                }
                //scattering FinalT
                FinalT* final = agg->finishFinal();//finalize the merged result
                //cannot set "global_agg=final" since MASTER_RANK works as a slave, and agg->finishFinal() may change
                *((FinalT*)global_agg) = *final; //deep copy
                masterBcast(*((FinalT*)global_agg));//master side of the broadcast
            }
        }
    }

    //user-defined graphLoader ==============================
    // User-supplied parser: turns one input line into a vertex. load_graph()
    // calls it once per line read and passes the returned pointer to load_vertex().
    virtual VertexT* toVertex(char* line) = 0; //this is what user specifies!!!!!!

    // Registers a freshly parsed vertex with this worker (called by load_graph).
    void load_vertex(VertexT* v)
    {
        this->add_vertex(v);
    }

    void load_graph(const char* inpath)
    {
        hdfsFS fs = getHdfsFS();
        hdfsFile in = getRHandle(inpath, fs);//读取文件 
        LineReader reader(fs, in);
        while (true) {
            reader.readLine();//读取行 
            if (!reader.eof())
                load_vertex(toVertex(reader.getLine()));//加入点 
            else
                break;
        }
        hdfsCloseFile(fs, in);
        hdfsDisconnect(fs);
        //cout<<"Worker "<<_my_rank<<": \""<<inpath<<"\" loaded"<<endl;//DEBUG !!!!!!!!!!
    }
    //=======================================================

    //user-defined graphDumper ==============================
    // User-supplied formatter: writes the output for vertex v through writer.
    // dump_partition() calls it once per vertex held by this worker.
    virtual void toline(VertexT* v, BufferedWriter& writer) = 0; //this is what user specifies!!!!!! (produces the output)

    void dump_partition(const char* outpath)//分区结果输出 
    {
        hdfsFS fs = getHdfsFS();
        BufferedWriter* writer = new BufferedWriter(outpath, fs, _my_rank);

        for (VertexIter it = vertexes.begin(); it != vertexes.end(); it++) {
            writer->check();
            toline(*it, *writer);
        }
        delete writer;
        hdfsDisconnect(fs);
    }
    //=======================================================

    // Runs one complete job on this worker: load the assigned input splits,
    // redistribute vertices by hash, iterate supersteps until termination,
    // then dump this worker's partition.
    //
    // params: job configuration; this overload reads input_path, output_path,
    //         force_write and native_dispatcher.
    //
    // NOTE(review): the body calls worker_barrier(), all_sum(), all_bor() and
    // the scatter/gather helpers, which look like collective operations, so
    // presumably every worker has to enter run() together - confirm.
    void run(const WorkerParams& params)
    {
        //check path + init (only the master checks the paths)
        if (_my_rank == MASTER_RANK) {
            if (dirCheck(params.input_path.c_str(), params.output_path.c_str(), _my_rank == MASTER_RANK, params.force_write) == -1)
                exit(-1);
        }
        init_timers();

        //dispatch splits
        ResetTimer(WORKER_TIMER);
        vector<vector<string> >* arrangement;
        if (_my_rank == MASTER_RANK) {
            //native_dispatcher selects dispatchLocality() over dispatchRan()
            arrangement = params.native_dispatcher ? dispatchLocality(params.input_path.c_str()) : dispatchRan(params.input_path.c_str());
            //reportAssignment(arrangement);//DEBUG !!!!!!!!!!
            masterScatter(*arrangement); //hand each worker its list of splits
            vector<string>& assignedSplits = (*arrangement)[0]; //entry 0 is the list the master reads itself
            //reading assigned splits (map)
            for (vector<string>::iterator it = assignedSplits.begin();
                 it != assignedSplits.end(); it++)
                load_graph(it->c_str());
            delete arrangement;
        } else {
            vector<string> assignedSplits;
            slaveScatter(assignedSplits); //receive the split list sent by the master
            //reading assigned splits (map)
            for (vector<string>::iterator it = assignedSplits.begin();
                 it != assignedSplits.end(); it++)
                load_graph(it->c_str());
        }

        //send vertices according to hash_id (reduce)
        sync_graph();

        message_buffer->init(vertexes);
        //barrier for data loading
        worker_barrier(); //@@@@@@@@@@@@@
        StopTimer(WORKER_TIMER);
        PrintTimer("Load Time", WORKER_TIMER);

        //=========================================================

        init_timers();
        ResetTimer(WORKER_TIMER);
        //supersteps
        global_step_num = 0;
        long long step_msg_num;
        long long step_vadd_num;
        long long global_msg_num = 0;
        long long global_vadd_num = 0;
        while (true) {
            global_step_num++; //superstep counter
            ResetTimer(4);
            //===================
            char bits_bor = all_bor(global_bor_bitmap);
            if (getBit(FORCE_TERMINATE_ORBIT, bits_bor) == 1) //forced termination requested
                break;
            get_vnum() = all_sum(vertexes.size()); //total number of vertices
            int wakeAll = getBit(WAKE_ALL_ORBIT, bits_bor); //whether every vertex must be woken up
            if (wakeAll == 0) {
                active_vnum() = all_sum(active_count); //total number of active vertices
                if (active_vnum() == 0 && getBit(HAS_MSG_ORBIT, bits_bor) == 0)
                    break; //all_halt AND no_msg
            } else
                active_vnum() = get_vnum();
            //===================
            AggregatorT* agg = (AggregatorT*)get_aggregator();
            if (agg != NULL)
                agg->init(); //reset the aggregator for this superstep
            //===================
            clearBits();
            if (wakeAll == 1)
                all_compute(); //compute on every vertex
            else
                active_compute(); //compute only on active / messaged vertices
            message_buffer->combine(); //NOTE(review): presumably applies the installed Combiner - confirm in MessageBuffer
            step_msg_num = master_sum_LL(message_buffer->get_total_msg());
            step_vadd_num = master_sum_LL(message_buffer->get_total_vadd());
            if (_my_rank == MASTER_RANK) {
                global_msg_num += step_msg_num;
                global_vadd_num += step_vadd_num;
            }
            vector<VertexT*>& to_add = message_buffer->sync_messages(); //exchange messages; returns vertices to add locally
            agg_sync();
            for (int i = 0; i < to_add.size(); i++)
                add_vertex(to_add[i]);
            to_add.clear();
            //===================
            worker_barrier();
            StopTimer(4);
            if (_my_rank == MASTER_RANK) {
                cout << "Superstep " << global_step_num << " done. Time elapsed: " << get_timer(4) << " seconds" << endl;
                cout << "#msgs: " << step_msg_num << ", #vadd: " << step_vadd_num << endl;
            }
        }
        worker_barrier();
        StopTimer(WORKER_TIMER);
        PrintTimer("Communication Time", COMMUNICATION_TIMER);
        PrintTimer("- Serialization Time", SERIALIZATION_TIMER);
        PrintTimer("- Transfer Time", TRANSFER_TIMER);
        PrintTimer("Total Computational Time", WORKER_TIMER);
        if (_my_rank == MASTER_RANK)
            cout << "Total #msgs=" << global_msg_num << ", Total #vadd=" << global_vadd_num << endl;

        // dump graph
        ResetTimer(WORKER_TIMER);
        dump_partition(params.output_path.c_str());
        //wait until every worker has finished dumping before stopping the timer,
        //as the num_phases and MultiInputParams overloads of run() do, so that
        //"Dump Time" is not just this worker's local dump time
        worker_barrier();
        StopTimer(WORKER_TIMER);
        PrintTimer("Dump Time", WORKER_TIMER);
    }

    //run the worker
    void run(const WorkerParams& params, int num_phases)
    {
        //check path + init
        if (_my_rank == MASTER_RANK) {
            if (dirCheck(params.input_path.c_str(), params.output_path.c_str(), _my_rank == MASTER_RANK, params.force_write) == -1)
                exit(-1);
        }
        init_timers();

        //dispatch splits
        ResetTimer(WORKER_TIMER);
        vector<vector<string> >* arrangement;
        if (_my_rank == MASTER_RANK) {
            arrangement = params.native_dispatcher ? dispatchLocality(params.input_path.c_str()) : dispatchRan(params.input_path.c_str());
            //reportAssignment(arrangement);//DEBUG !!!!!!!!!!
            masterScatter(*arrangement);
            vector<string>& assignedSplits = (*arrangement)[0];
            //reading assigned splits (map)
            for (vector<string>::iterator it = assignedSplits.begin();
                 it != assignedSplits.end(); it++)
                load_graph(it->c_str());
            delete arrangement;
        } else {
            vector<string> assignedSplits;
            slaveScatter(assignedSplits);
            //reading assigned splits (map)
            for (vector<string>::iterator it = assignedSplits.begin();
                 it != assignedSplits.end(); it++)
                load_graph(it->c_str());
        }

        //send vertices according to hash_id (reduce)
        sync_graph();
        message_buffer->init(vertexes);
        //barrier for data loading
        worker_barrier(); //@@@@@@@@@@@@@
        StopTimer(WORKER_TIMER);
        PrintTimer("Load Time", WORKER_TIMER);

        //=========================================================

        init_timers();
        ResetTimer(WORKER_TIMER);

        for (global_phase_num = 1; global_phase_num <= num_phases; global_phase_num++) {
            if (_my_rank == MASTER_RANK)
                cout << "################ Phase " << global_phase_num << " ################" << endl;

            //supersteps
            global_step_num = 0;
            long long step_msg_num;
            long long step_vadd_num;
            long long global_msg_num = 0;
            long long global_vadd_num = 0;

            while (true) {
                global_step_num++;
                ResetTimer(4);
                //===================
                if (step_num() == 1) {
                    get_vnum() = all_sum(vertexes.size());
                    if (phase_num() > 1)
                        active_vnum() = get_vnum();
                    else
                        active_vnum() = all_sum(active_count);
                    //===================
                    AggregatorT* agg = (AggregatorT*)get_aggregator();
                    if (agg != NULL)
                        agg->init();
                    //===================
                    clearBits();
                    if (phase_num() > 1)
                        all_compute();
                    else
                        active_compute();
                    message_buffer->combine();
                    step_msg_num = master_sum_LL(message_buffer->get_total_msg());
                    step_vadd_num = master_sum_LL(message_buffer->get_total_vadd());
                    if (_my_rank == MASTER_RANK) {
                        global_msg_num += step_msg_num;
                        global_vadd_num += step_vadd_num;
                    }
                    vector<VertexT*>& to_add = message_buffer->sync_messages();
                    agg_sync();
                    for (int i = 0; i < to_add.size(); i++)
                        add_vertex(to_add[i]);
                    to_add.clear();
                } else {
                    char bits_bor = all_bor(global_bor_bitmap);
                    if (getBit(FORCE_TERMINATE_ORBIT, bits_bor) == 1)
                        break;
                    get_vnum() = all_sum(vertexes.size());
                    int wakeAll = getBit(WAKE_ALL_ORBIT, bits_bor);
                    if (wakeAll == 0) {
                        active_vnum() = all_sum(active_count);
                        if (active_vnum() == 0 && getBit(HAS_MSG_ORBIT, bits_bor) == 0)
                            break; //all_halt AND no_msg
                    } else
                        active_vnum() = get_vnum();
                    //===================
                    AggregatorT* agg = (AggregatorT*)get_aggregator();
                    if (agg != NULL)
                        agg->init();
                    //===================
                    clearBits();
                    if (wakeAll == 1)
                        all_compute();
                    else if (phase_num() > 1 && step_num() == 1)
                        all_compute();
                    else
                        active_compute();
                    message_buffer->combine();
                    step_msg_num = master_sum_LL(message_buffer->get_total_msg());
                    step_vadd_num = master_sum_LL(message_buffer->get_total_vadd());
                    if (_my_rank == MASTER_RANK) {
                        global_msg_num += step_msg_num;
                        global_vadd_num += step_vadd_num;
                    }
                    vector<VertexT*>& to_add = message_buffer->sync_messages();
                    agg_sync();
                    for (int i = 0; i < to_add.size(); i++)
                        add_vertex(to_add[i]);
                    to_add.clear();
                }
                //===================
                worker_barrier();
                StopTimer(4);
                if (_my_rank == MASTER_RANK) {
                    cout << "Superstep " << global_step_num << " done. Time elapsed: " << get_timer(4) << " seconds" << endl;
                    cout << "#msgs: " << step_msg_num << ", #vadd: " << step_vadd_num << endl;
                }
            }
            if (_my_rank == MASTER_RANK) {
                cout << "************ Phase " << global_phase_num << " done. ************" << endl;
                cout << "Total #msgs=" << global_msg_num << ", Total #vadd=" << global_vadd_num << endl;
            }
        }
        worker_barrier();
        StopTimer(WORKER_TIMER);
        PrintTimer("Communication Time", COMMUNICATION_TIMER);
        PrintTimer("- Serialization Time", SERIALIZATION_TIMER);
        PrintTimer("- Transfer Time", TRANSFER_TIMER);
        PrintTimer("Total Computational Time", WORKER_TIMER);

        // dump graph
        ResetTimer(WORKER_TIMER);
        dump_partition(params.output_path.c_str());
        worker_barrier();
        StopTimer(WORKER_TIMER);
        PrintTimer("Dump Time", WORKER_TIMER);
    }

    // Runs one complete job whose input is given as several paths: load the
    // assigned input splits, redistribute vertices by hash, iterate
    // supersteps until termination, then dump this worker's partition.
    //
    // params: job configuration; this overload reads input_paths,
    //         output_path, force_write and native_dispatcher.
    void run(const MultiInputParams& params)
    {
        //check path + init (only the master checks the paths)
        if (_my_rank == MASTER_RANK) {
            if (dirCheck(params.input_paths, params.output_path.c_str(), _my_rank == MASTER_RANK, params.force_write) == -1)
                exit(-1);
        }
        init_timers();

        //dispatch splits
        ResetTimer(WORKER_TIMER);
        vector<vector<string> >* arrangement;
        if (_my_rank == MASTER_RANK) {
            //native_dispatcher selects dispatchLocality() over dispatchRan()
            arrangement = params.native_dispatcher ? dispatchLocality(params.input_paths) : dispatchRan(params.input_paths);
            //reportAssignment(arrangement);//DEBUG !!!!!!!!!!
            masterScatter(*arrangement);
            //entry 0 of the arrangement is the list the master loads itself
            vector<string>& assignedSplits = (*arrangement)[0];
            //reading assigned splits (map)
            for (vector<string>::iterator it = assignedSplits.begin();
                 it != assignedSplits.end(); it++)
                load_graph(it->c_str());
            delete arrangement;
        } else {
            vector<string> assignedSplits;
            slaveScatter(assignedSplits);
            //reading assigned splits (map)
            for (vector<string>::iterator it = assignedSplits.begin();
                 it != assignedSplits.end(); it++)
                load_graph(it->c_str());
        }

        //send vertices according to hash_id (reduce)
        sync_graph();
        message_buffer->init(vertexes);
        //barrier for data loading
        worker_barrier(); //@@@@@@@@@@@@@
        StopTimer(WORKER_TIMER);
        PrintTimer("Load Time", WORKER_TIMER);

        //=========================================================

        init_timers();
        ResetTimer(WORKER_TIMER);
        //supersteps
        global_step_num = 0;
        long long step_msg_num;
        long long step_vadd_num;
        long long global_msg_num = 0;
        long long global_vadd_num = 0;
        while (true) {
            global_step_num++;
            ResetTimer(4);
            //===================
            //termination check: stop on a forced-terminate bit, or when no
            //vertex is active and no worker reported pending messages
            char bits_bor = all_bor(global_bor_bitmap);
            if (getBit(FORCE_TERMINATE_ORBIT, bits_bor) == 1)
                break;
            get_vnum() = all_sum(vertexes.size());
            int wakeAll = getBit(WAKE_ALL_ORBIT, bits_bor);
            if (wakeAll == 0) {
                active_vnum() = all_sum(active_count);
                if (active_vnum() == 0 && getBit(HAS_MSG_ORBIT, bits_bor) == 0)
                    break; //all_halt AND no_msg
            } else
                active_vnum() = get_vnum();
            //===================
            AggregatorT* agg = (AggregatorT*)get_aggregator();
            if (agg != NULL)
                agg->init();
            //===================
            clearBits();
            //a set wake-all bit forces compute() on every vertex
            if (wakeAll == 1)
                all_compute();
            else
                active_compute();
            message_buffer->combine();
            //the per-step sums are only accumulated on the master
            step_msg_num = master_sum_LL(message_buffer->get_total_msg());
            step_vadd_num = master_sum_LL(message_buffer->get_total_vadd());
            if (_my_rank == MASTER_RANK) {
                global_msg_num += step_msg_num;
                global_vadd_num += step_vadd_num;
            }
            //sync_messages() returns the vertices to add to this worker
            vector<VertexT*>& to_add = message_buffer->sync_messages();
            agg_sync();
            for (int i = 0; i < to_add.size(); i++)
                add_vertex(to_add[i]);
            to_add.clear();
            //===================
            worker_barrier();
            StopTimer(4);
            if (_my_rank == MASTER_RANK) {
                cout << "Superstep " << global_step_num << " done. Time elapsed: " << get_timer(4) << " seconds" << endl;
                cout << "#msgs: " << step_msg_num << ", #vadd: " << step_vadd_num << endl;
            }
        }
        worker_barrier();
        StopTimer(WORKER_TIMER);
        PrintTimer("Communication Time", COMMUNICATION_TIMER);
        PrintTimer("- Serialization Time", SERIALIZATION_TIMER);
        PrintTimer("- Transfer Time", TRANSFER_TIMER);
        PrintTimer("Total Computational Time", WORKER_TIMER);
        if (_my_rank == MASTER_RANK)
            cout << "Total #msgs=" << global_msg_num << ", Total #vadd=" << global_vadd_num << endl;

        // dump graph
        ResetTimer(WORKER_TIMER);
        dump_partition(params.output_path.c_str());
        //wait for all workers before stopping the dump timer
        worker_barrier();
        StopTimer(WORKER_TIMER);
        PrintTimer("Dump Time", WORKER_TIMER);
    }

    //========================== reports machine-level msg# ===============================
    void run_report(const WorkerParams& params, const string reportPath)
    {
        //check path + init
        if (_my_rank == MASTER_RANK) {
            if (dirCheck(params.input_path.c_str(), params.output_path.c_str(), _my_rank == MASTER_RANK, params.force_write) == -1)
                exit(-1);
        }
        init_timers();

        //dispatch splits
        ResetTimer(WORKER_TIMER);
        vector<vector<string> >* arrangement;
        if (_my_rank == MASTER_RANK) {
            arrangement = params.native_dispatcher ? dispatchLocality(params.input_path.c_str()) : dispatchRan(params.input_path.c_str());
            //reportAssignment(arrangement);//DEBUG !!!!!!!!!!
            masterScatter(*arrangement);
            vector<string>& assignedSplits = (*arrangement)[0];
            //reading assigned splits (map)
            for (vector<string>::iterator it = assignedSplits.begin();
                 it != assignedSplits.end(); it++)
                load_graph(it->c_str());
            delete arrangement;
        } else {
            vector<string> assignedSplits;
            slaveScatter(assignedSplits);
            //reading assigned splits (map)
            for (vector<string>::iterator it = assignedSplits.begin();
                 it != assignedSplits.end(); it++)
                load_graph(it->c_str());
        }

        //send vertices according to hash_id (reduce)
        sync_graph();
        message_buffer->init(vertexes);
        //barrier for data loading
        worker_barrier(); //@@@@@@@@@@@@@
        StopTimer(WORKER_TIMER);
        PrintTimer("Load Time", WORKER_TIMER);

        //=========================================================
        vector<int> msgNumVec; //$$$$$$$$$$$$$$$$$$$$ added for per-worker msg counting

        init_timers();
        ResetTimer(WORKER_TIMER);
        //supersteps
        global_step_num = 0;
        long long step_msg_num;
        long long step_vadd_num;
        long long global_msg_num = 0;
        long long global_vadd_num = 0;
        while (true) {
            global_step_num++;
            ResetTimer(4);
            //===================
            char bits_bor = all_bor(global_bor_bitmap);
            if (getBit(FORCE_TERMINATE_ORBIT, bits_bor) == 1)
                break;
            get_vnum() = all_sum(vertexes.size());
            int wakeAll = getBit(WAKE_ALL_ORBIT, bits_bor);
            if (wakeAll == 0) {
                active_vnum() = all_sum(active_count);
                if (active_vnum() == 0 && getBit(HAS_MSG_ORBIT, bits_bor) == 0)
                    break; //all_halt AND no_msg
            } else
                active_vnum() = get_vnum();
            //===================
            AggregatorT* agg = (AggregatorT*)get_aggregator();
            if (agg != NULL)
                agg->init();
            //===================
            clearBits();
            if (wakeAll == 1)
                all_compute();
            else
                active_compute();
            message_buffer->combine();
            int my_msg_num = message_buffer->get_total_msg(); //$$$$$$$$$$$$$$$$$$$$ added for per-worker msg counting
            msgNumVec.push_back(my_msg_num); //$$$$$$$$$$$$$$$$$$$$ added for per-worker msg counting
            step_msg_num = master_sum_LL(my_msg_num); //$$$$$$$$$$$$$$$$$$$$ added for per-worker msg counting
            step_vadd_num = master_sum_LL(message_buffer->get_total_vadd());
            if (_my_rank == MASTER_RANK) {
                global_msg_num += step_msg_num;
                global_vadd_num += step_vadd_num;
            }
            vector<VertexT*>& to_add = message_buffer->sync_messages();
            agg_sync();
            for (int i = 0; i < to_add.size(); i++)
                add_vertex(to_add[i]);
            to_add.clear();
            //===================
            worker_barrier();
            StopTimer(4);
            if (_my_rank == MASTER_RANK) {
                cout << "Superstep " << global_step_num << " done. Time elapsed: " << get_timer(4) << " seconds" << endl;
                cout << "#msgs: " << step_msg_num << ", #vadd: " << step_vadd_num << endl;
            }
        }
        worker_barrier();
        StopTimer(WORKER_TIMER);
        PrintTimer("Communication Time", COMMUNICATION_TIMER);
        PrintTimer("- Serialization Time", SERIALIZATION_TIMER);
        PrintTimer("- Transfer Time", TRANSFER_TIMER);
        PrintTimer("Total Computational Time", WORKER_TIMER);
        if (_my_rank == MASTER_RANK)
            cout << "Total #msgs=" << global_msg_num << ", Total #vadd=" << global_vadd_num << endl;

        // dump graph
        ResetTimer(WORKER_TIMER);
        dump_partition(params.output_path.c_str());

        StopTimer(WORKER_TIMER);
        PrintTimer("Dump Time", WORKER_TIMER);

        //dump report
        if (_my_rank != MASTER_RANK) {
            slaveGather(msgNumVec);
        } else {
            vector<vector<int> > report(_num_workers);
            masterGather(report);
            report[MASTER_RANK].swap(msgNumVec);
            //
            //per line per worker: #msg for step1, #msg for step2, ...
            hdfsFS fs = getHdfsFS();
            hdfsFile out = getWHandle(reportPath.c_str(), fs);
            char buffer[100];
            for (int i = 0; i < _num_workers; i++) {
                for (int j = 0; j < report[i].size(); j++) {
                    sprintf(buffer, "%d ", report[i][j]);
                    hdfsWrite(fs, out, (void*)buffer, strlen(buffer));
                }
                sprintf(buffer, "\n");
                hdfsWrite(fs, out, (void*)buffer, strlen(buffer));
            }
            if (hdfsFlush(fs, out)) {
                fprintf(stderr, "Failed to 'flush' %s\n", reportPath.c_str());
                exit(-1);
            }
            hdfsCloseFile(fs, out);
            hdfsDisconnect(fs);
        }
    }

private:
    HashT hash; // maps a vertex id to the rank of the worker that holds it (see sync_graph)
    VertexContainer vertexes; // vertices held by this worker; deleted in the destructor
    int active_count; // number of local vertices currently active; rebuilt by active_compute()/all_compute(), bumped by add_vertex()

    MessageBuffer<VertexT>* message_buffer; // allocated in the constructor, deleted in the destructor
    Combiner<MessageT>* combiner; // set via setCombiner(); never deleted by this class
    AggregatorT* aggregator; // set via setAggregator(); never deleted by this class
};

#endif
// Previous post: [Notes] Pandas group-by aggregation


// Next post: python astype+groupby+sign+agg