mr_map

#include <signal.h>
#include <execinfo.h>
#include <hadoop_mapred.h>
#include <stdio.h>
#include <string.h>
#include <errno.h>
#include <string>
#include <algorithm>
#include <atomic>
#include <stdio.h>     
#include <unistd.h> 
#include <gflags/gflags.h>
#include <common/error.h>
#include <common/help_lock.h>
#include <common/utils.h>
#include <common/data_collect.h>
#include <graph/topology.h>
#include <graph/mobsmsg_actuator.h>
#include <graph/batchmobsmsg_actuator.h>
#include <component/binary_operator_impl.h>
#include <component/dll_operator_impl.h>
#include <component/dubatch_operator_impl.h>
#include <component/du_operator_impl.h>
#include <component/script_operator_impl.h>
#include <component/filter_operator_impl.h>
#include <component/trans_mobs_operator_impl.h>
#include <parse/data_parse_impl.h>
#include <parse/export_parse_impl.h>
#include <pb_to_json.h>
#include <json_to_pb.h>
#include <base64.h>

// Command-line flags (gflags), parsed in main() before the hadoop framework starts.

// Directory and file name of the mapper configuration loaded in on_task_begin().
DEFINE_string(conf_path, "./", "Conf Path");
DEFINE_string(conf_file, "mr.conf", "Conf File");
// Directory and file name of the logging configuration passed to com_loadlog().
DEFINE_string(log_path, "./", "Log Path");
DEFINE_string(log_file, "log.conf", "Log File");
// Selects the last argument of BatchMobsMessageActuator::push() for every record:
// true -> push(..., false), false -> push(..., true).
DEFINE_bool(asyn_schema, true, "asyn_schema");
// First constructor argument of the BatchMobsMessageActuator.
DEFINE_int32(max_concurrency, 1000, "max concurrency");
// Output mode: "bistreaming" emits through the hadoop emit() API, "streaming"
// writes "key\tvalue\n" (or "value\n") lines to stdout; other values emit nothing.
DEFINE_string(run_schema, "bistreaming", "Run Schema");
// Parameter string handed to the "cmd" and "so" operator implementations.
DEFINE_string(op_params, "", "Operator Params");

/**
 * Hadoop mapper that pushes every input record through a ymir operator
 * topology and emits whatever the topology hands back.
 *
 * Each (key, value) input pair is wrapped into a one-message BatchMobsMessage
 * carrying two KV records, "key" and "data", and pushed to a
 * BatchMobsMessageActuator. Results arrive through the _process_back_* callbacks.
 * emit_data() then writes them either through the hadoop emit() API or as lines
 * on stdout, depending on --run_schema.
 *
 * Concurrency: the callbacks serialize on _mutex. _has_error is atomic because
 * map() and on_task_end() read it without holding that mutex, while the callbacks
 * may presumably run on actuator threads (asynchronous push) — confirm.
 */
class mr_map_t : public hadoop_mapper_t {
public:
    mr_map_t() : _mobs_actuator(FLAGS_max_concurrency, &_topology, true), _has_error(false) {
    }

    /**
     * Loads the log and mapper configuration, then initializes the topology.
     *
     * @return 0 on success, -1 if a configuration file or the topology fails to load.
     */
    virtual int on_task_begin()
    {
        pthread_mutex_init(&(this->_mutex), NULL);
        _data_collect.init();

        if (0 != com_loadlog(FLAGS_log_path.c_str(), FLAGS_log_file.c_str())) {
            FATAL("Log Config Load failed[%s/%s]",
                FLAGS_log_path.c_str(), FLAGS_log_file.c_str());
            return -1;
        }

        if (this->_conf.load(FLAGS_conf_path.c_str(),
            FLAGS_conf_file.c_str()) != 0) {
            FATAL("Config Load failed[%s/%s]",
                FLAGS_conf_path.c_str(), FLAGS_conf_file.c_str());
            return -1;
        }

        // _create_ins() copies _pcall_aiao into "cmd" operators and is handed to
        // Topology::init() below, so this callback must be bound first.
        this->_pcall_aiao = ::boost::bind(&mr_map_t::_process_back_batch_aiao, this, _1);
        ::baidu::ymir::Topology::InsCreateFunc create_func = ::boost::bind(&mr_map_t::_create_ins, this, _1);
        if (this->_topology.init(this->_conf["topology"], create_func) == false) {
            FATAL("Topology init failed");
            return -1;
        }

        this->_pcall = ::boost::bind(&mr_map_t::_process_back_batch, this, _1, _2, _3);
        return 0;
    }

    /**
     * Wraps one input record into a BatchMobsMessage and pushes it to the actuator.
     *
     * @param input hadoop record; its key and value become the "key" and "data"
     *              KV records of the single message in the batch.
     * @return 0 if the record was accepted, -1 if an earlier callback reported an
     *         error or push() failed.
     */
    virtual int map(const hadoop_map_input_t& input)
    {
        // Stop accepting input once any callback or emit has failed.
        if (_has_error.load()) {
            return -1;
        }

        int32_t key_len = 0;
        char *key = (char *)input.key(key_len);

        int32_t val_len = 0;
        char *val = (char *)input.value(val_len);

        std::string s_key(key, key_len);
        std::string s_val(val, val_len);

        ::mobs::common::BatchMobsMessage batch_mobs_msg;
        mobs::common::BatchInfo* batch_info = batch_mobs_msg.mutable_batch_info();
        batch_info->set_trace_id(0);

        ::mobs::common::MobsMessage* mobs_msg = batch_mobs_msg.add_messages();

        ::mobs::common::KVRecord* kv_record_key = mobs_msg->add_records();
        kv_record_key->set_key("key");
        mobs::common::Data* m_data_key = kv_record_key->mutable_data();
        m_data_key->set_value(s_key);

        ::mobs::common::KVRecord* kv_record_val = mobs_msg->add_records();
        kv_record_val->set_key("data");
        mobs::common::Data* m_data_val = kv_record_val->mutable_data();
        m_data_val->set_value(s_val);

        mobs_msg->set_namespace_("");
        mobs_msg->set_key("");
        mobs_msg->set_trace_id(0);

        ::baidu::ymir::BatchMobsMessageData data;
        data.copy_input(batch_mobs_msg);

        // The last push() argument is the inverse of --asyn_schema.
        const bool pushed = this->_mobs_actuator.push(data, this->_pcall, true, !FLAGS_asyn_schema);
        return pushed ? 0 : -1;
    }

    /**
     * Waits for the topology to drain, then tears down the actuator and mutex.
     *
     * @return 0 on success, -1 if any record failed or Topology::wait_finish()
     *         failed (in which case nothing is torn down).
     */
    virtual int on_task_end()
    {
        const uint64_t node_num = _mobs_actuator.get_node_num();
        // %llu with an explicit cast: passing uint64_t to %d is undefined behavior.
        NOTICE("on_task_end node_num =%llu", static_cast<unsigned long long>(node_num));

        if (_has_error.load() || !this->_topology.wait_finish(node_num)) {
            FATAL("Process has error or Topology wait finish failed");
            return -1;
        }

        _mobs_actuator.destroy();
        pthread_mutex_destroy(&(this->_mutex));
        return 0;
    }

private:
    comcfg::Configure _conf;
    ::baidu::ymir::Topology _topology;
    //::baidu::ymir::MobsMessageActuator _mobs_actuator;
    //::baidu::ymir::MobsMessageActuator::ProcessCallBack _pcall;
    // Declared after _topology: the constructor passes &_topology to it.
    ::baidu::ymir::BatchMobsMessageActuator _mobs_actuator;
    // Completion callback passed to every push() in map().
    ::baidu::ymir::BatchMobsMessageActuator::ProcessCallBack _pcall;
    // Output callback installed on "cmd" operators by _create_ins().
    ::baidu::ymir::OperatorImpl::BatchMobsMsgProcess _pcall_aiao;
    // Sticky failure flag; atomic because it is read outside _mutex.
    std::atomic<bool> _has_error;
    // Serializes the _process_back* callbacks (and therefore emit_data()).
    pthread_mutex_t _mutex;
    ::baidu::ymir::DataCollect _data_collect;

    /**
     * Factory handed to Topology::init(): builds the operator named by @p name
     * (case-insensitive).
     *
     * @return a newly allocated operator, or NULL if @p name is not a supported
     *         type or (for "cmd") the working directory cannot be determined.
     *         Ownership presumably passes to the topology — confirm.
     */
    ::baidu::ymir::OperatorImpl* _create_ins(const std::string& name) {
        const char* type = name.c_str();
        ::baidu::ymir::OperatorImpl* op_impl = NULL;
        if (strcasecmp(type, "cmd") == 0) {
            // Only the binary operator needs the working directory. Check getcwd()
            // so the buffer is never read uninitialized on failure.
            char cwd_buf[4096];
            if (getcwd(cwd_buf, sizeof(cwd_buf)) == NULL) {
                FATAL("getcwd failed: %s", strerror(errno));
                return NULL;
            }
            const std::string root_dir = cwd_buf;
            op_impl = new ::baidu::ymir::BinaryOperatorImpl(FLAGS_op_params, root_dir);
            op_impl->set_bmobs_proc(this->_pcall_aiao);
        } else if (strcasecmp(type, "so") == 0) {
            op_impl = new ::baidu::ymir::DllOperatorImpl(FLAGS_op_params);
        } else if (strcasecmp(type, "du") == 0) {
            op_impl = new ::baidu::ymir::DuOperatorImpl();
        } else if (strcasecmp(type, "dubatch") == 0) {
            op_impl = new ::baidu::ymir::DuBatchOperatorImpl();
        } else if (strcasecmp(type, "script") == 0) {
            op_impl = new ::baidu::ymir::ScriptOperatorImpl();
        } else if (strcasecmp(type, "bridge_trans") == 0) {
            op_impl = new ::baidu::ymir::TransMobsOperatorImpl();
        } else if (strcasecmp(type, "bridge_filter") == 0) {
            op_impl = new ::baidu::ymir::FilterOperatorImpl();
        }
        return op_impl;
    }

    /**
     * Single-message completion callback. Not bound to any callback in this file;
     * it is kept alongside the commented-out MobsMessageActuator members.
     * Records the error (if any), logs @p log, and emits the output message.
     */
    void _process_back(const int ret, const base::IOBuf& log,
        const ::baidu::ymir::OperatorData<::mobs::common::MobsMessage>& data) {
        ::baidu::ymir::HelpLock lock(&(this->_mutex));
        if (ret != ::baidu::ymir::ERR_OK) {
            _has_error = true;
            FATAL("%s", log.to_string().c_str());
        }
        NOTICE("%s", log.to_string().c_str());

        // Bind by reference: avoids copying the message (lifetime-extended if
        // get_output() returns by value).
        const auto& msg = data.get_output();
        emit_data(msg);
    }

    /**
     * Completion callback for every push() in map(). Records the error (if any),
     * logs @p log, and emits each message of the output batch. Output is emitted
     * even when @p ret signals an error.
     */
    void _process_back_batch(const int ret, const base::IOBuf& log,
        const ::baidu::ymir::OperatorData<::mobs::common::BatchMobsMessage>& data) {
        ::baidu::ymir::HelpLock lock(&(this->_mutex));
        if (ret != ::baidu::ymir::ERR_OK) {
            _has_error = true;
            FATAL("%s", log.to_string().c_str());
        }
        NOTICE("%s", log.to_string().c_str());

        const auto& batch_mobs_msg = data.get_output();
        for (const auto& msg : batch_mobs_msg.messages()) {
            emit_data(msg);
        }
    }

    /**
     * Output callback installed on "cmd" operators; emits each message of the
     * batch produced by the operator.
     */
    void _process_back_batch_aiao(::baidu::ymir::OperatorData<::mobs::common::BatchMobsMessage>& data) {
        DEBUG("enter process_back_batch_aiao");
        ::baidu::ymir::HelpLock lock(&(this->_mutex));

        const auto& batch_mobs_msg = data.get_output();
        for (const auto& msg : batch_mobs_msg.messages()) {
            emit_data(msg);
        }
    }

    /**
     * Writes one message as mapper output. Must be called with _mutex held.
     *
     * The "key" and "data" records (trimmed of spaces) become the output key and
     * value; the last occurrence of each wins. Without a "key" record only the
     * value is emitted. In "bistreaming" mode output goes through emit(), and a
     * failure sets _has_error. In "streaming" mode it is written to stdout as
     * "key\tvalue\n" or "value\n".
     */
    void emit_data(const ::mobs::common::MobsMessage& msg) {
        bool key_flag = false;
        std::string key = "";
        std::string value = "";
        for (const auto& rec : msg.records()) {
            if (rec.key() == "key") {
                key_flag = true;
                key = rec.data().value();
                key = ::baidu::ymir::string_trim(key, " ");
            } else if (rec.key() == "data") {
                value = rec.data().value();
                value = ::baidu::ymir::string_trim(value, " ");
            }
        }

        if (FLAGS_run_schema == "bistreaming") {
            hadoop_err_t err = hadoop_err_t::E_NOERROR;
            if (key_flag) {
                int32_t key_len = key.length();
                int32_t val_len = value.length();
                err = emit(key.c_str(), key_len, value.c_str(), val_len);
            } else {
                int32_t val_len = value.length();
                err = emit(value.c_str(), val_len);
            }
            if (err != E_NOERROR) {
                _has_error = true;
                FATAL("emit error=%d", static_cast<int>(err));
            }
        } else if (FLAGS_run_schema == "streaming") {
            if (key_flag) {
                int32_t key_len = key.length();
                int32_t val_len = value.length();
                fwrite(key.c_str(), key_len, 1, stdout);
                fwrite("\t", 1, 1, stdout);
                fwrite(value.c_str(), val_len, 1, stdout);
                fwrite("\n", 1, 1, stdout);
            } else {
                int32_t val_len = value.length();
                fwrite(value.c_str(), val_len, 1, stdout);
                fwrite("\n", 1, 1, stdout);
            }
        }
    }
};

// Registers mr_map_t as this job's mapper class with the hadoop framework that
// main() runs (macro presumably provided by hadoop_mapred.h — confirm).
USING_MAPPER(mr_map_t);

int main(int argc, char *argv[])
{
    ::baidu::ymir::set_core_signal();
    ::google::ParseCommandLineFlags(&argc, &argv, true);
    hadoop_framework_t framework;    
    framework.init(argc, argv);
    return framework.run();
}

 

上一篇:MR之排序


下一篇:idea配置注释