#include <signal.h> #include <execinfo.h> #include <hadoop_mapred.h> #include <stdio.h> #include <string.h> #include <errno.h> #include <string> #include <algorithm> #include <stdio.h> #include <unistd.h> #include <gflags/gflags.h> #include <common/error.h> #include <common/help_lock.h> #include <common/utils.h> #include <common/data_collect.h> #include <graph/topology.h> #include <graph/mobsmsg_actuator.h> #include <graph/batchmobsmsg_actuator.h> #include <component/binary_operator_impl.h> #include <component/dll_operator_impl.h> #include <component/dubatch_operator_impl.h> #include <component/du_operator_impl.h> #include <component/script_operator_impl.h> #include <component/filter_operator_impl.h> #include <component/trans_mobs_operator_impl.h> #include <parse/data_parse_impl.h> #include <parse/export_parse_impl.h> #include <pb_to_json.h> #include <json_to_pb.h> #include <base64.h> DEFINE_string(conf_path, "./", "Conf Path"); DEFINE_string(conf_file, "mr.conf", "Conf File"); DEFINE_string(log_path, "./", "Conf Path"); DEFINE_string(log_file, "log.conf", "Log File"); DEFINE_bool(asyn_schema, true, "asyn_schema"); DEFINE_int32(max_concurrency, 1000, "max concurrency"); DEFINE_string(run_schema, "bistreaming", "Run Schema"); DEFINE_string(op_params, "", "Operator Params"); class mr_map_t : public hadoop_mapper_t { public : mr_map_t() : _mobs_actuator(FLAGS_max_concurrency, &_topology, true), _has_error(false) { } virtual int on_task_begin() { pthread_mutex_init(&(this->_mutex), NULL); _data_collect.init(); if (0 != com_loadlog(FLAGS_log_path.c_str(), FLAGS_log_file.c_str())) { FATAL("Config Load failed[%s/%s]", FLAGS_log_path.c_str(), FLAGS_log_file.c_str()); return -1; } if (this->_conf.load(FLAGS_conf_path.c_str(), FLAGS_conf_file.c_str()) != 0) { FATAL("Config Load failed[%s/%s]", FLAGS_conf_path.c_str(), FLAGS_conf_file.c_str()); return -1; } this->_pcall_aiao = ::boost::bind(&mr_map_t::_process_back_batch_aiao, this, _1); ::baidu::ymir::Topology::InsCreateFunc create_func = ::boost::bind(&mr_map_t::_create_ins, this, _1); if (this->_topology.init(this->_conf["topology"], create_func) == false) { FATAL("Topology init failed"); return -1; } this->_pcall = ::boost::bind(&mr_map_t::_process_back_batch, this, _1, _2, _3); return 0; } virtual int map(const hadoop_map_input_t& input) { if (this->_has_error == true) { return -1; } int32_t key_len; char *key = (char *)input.key(key_len); int32_t val_len; char *val = (char *)input.value(val_len); std::string s_key(key, key_len); std::string s_val(val, val_len); ::mobs::common::BatchMobsMessage batch_mobs_msg; mobs::common::BatchInfo* batch_info = batch_mobs_msg.mutable_batch_info(); batch_info->set_trace_id(0); ::mobs::common::MobsMessage* mobs_msg = batch_mobs_msg.add_messages(); ::mobs::common::KVRecord* kv_record_key = mobs_msg->add_records(); kv_record_key->set_key("key"); mobs::common::Data* m_data_key = kv_record_key->mutable_data(); m_data_key->set_value(s_key); ::mobs::common::KVRecord* kv_record_val = mobs_msg->add_records(); kv_record_val->set_key("data"); mobs::common::Data* m_data_val = kv_record_val->mutable_data(); m_data_val->set_value(s_val); mobs_msg->set_namespace_(""); mobs_msg->set_key(""); mobs_msg->set_trace_id(0); ::baidu::ymir::BatchMobsMessageData data; data.copy_input(batch_mobs_msg); bool flag = true; if (FLAGS_asyn_schema) { flag = this->_mobs_actuator.push(data, this->_pcall, true, false); } else { flag = this->_mobs_actuator.push(data, this->_pcall, true, true); } if (flag == false) { return -1; } return 0; } virtual int on_task_end() { uint64_t node_num = _mobs_actuator.get_node_num(); NOTICE("on_task_end node_num =%d", node_num); if (this->_has_error == true || this->_topology.wait_finish(node_num) == false) { FATAL("Process has error or Topology wait finish failed"); return -1; } _mobs_actuator.destroy(); pthread_mutex_destroy(&(this->_mutex)); return 0; } private: comcfg::Configure _conf; ::baidu::ymir::Topology _topology; //::baidu::ymir::MobsMessageActuator _mobs_actuator; //::baidu::ymir::MobsMessageActuator::ProcessCallBack _pcall; ::baidu::ymir::BatchMobsMessageActuator _mobs_actuator; ::baidu::ymir::BatchMobsMessageActuator::ProcessCallBack _pcall; ::baidu::ymir::OperatorImpl::BatchMobsMsgProcess _pcall_aiao; bool _has_error; pthread_mutex_t _mutex; ::baidu::ymir::DataCollect _data_collect; ::baidu::ymir::OperatorImpl* _create_ins(const std::string& name) { ::baidu::ymir::OperatorImpl* op_impl = NULL; char root_dir_temp[1024]; getcwd(root_dir_temp, sizeof(root_dir_temp)); std::string root_dir = root_dir_temp; if (strcasecmp(name.c_str(), "cmd") == 0) { op_impl = new ::baidu::ymir::BinaryOperatorImpl(FLAGS_op_params, root_dir); op_impl->set_bmobs_proc(this->_pcall_aiao); } else if (strcasecmp(name.c_str(), "so") == 0) { op_impl = new ::baidu::ymir::DllOperatorImpl(FLAGS_op_params); } else if (strcasecmp(name.c_str(), "du") == 0) { op_impl = new ::baidu::ymir::DuOperatorImpl(); } else if (strcasecmp(name.c_str(), "dubatch") == 0) { op_impl = new ::baidu::ymir::DuBatchOperatorImpl(); } else if (strcasecmp(name.c_str(), "script") == 0) { op_impl = new ::baidu::ymir::ScriptOperatorImpl(); } else if (strcasecmp(name.c_str(), "bridge_trans") == 0) { op_impl = new ::baidu::ymir::TransMobsOperatorImpl(); } else if (strcasecmp(name.c_str(), "bridge_filter") == 0) { op_impl = new ::baidu::ymir::FilterOperatorImpl(); } return op_impl; } void _process_back(const int ret, const base::IOBuf& log, const ::baidu::ymir::OperatorData<::mobs::common::MobsMessage>& data) { ::baidu::ymir::HelpLock lock(&(this->_mutex)); if (ret != ::baidu::ymir::ERR_OK) { this->_has_error = true; FATAL("%s", log.to_string().c_str()); } NOTICE("%s", log.to_string().c_str()); const ::mobs::common::MobsMessage msg = data.get_output(); emit_data(msg); } void _process_back_batch(const int ret, const base::IOBuf& log, const ::baidu::ymir::OperatorData<::mobs::common::BatchMobsMessage>& data) { ::baidu::ymir::HelpLock lock(&(this->_mutex)); if (ret != ::baidu::ymir::ERR_OK) { this->_has_error = true; FATAL("%s", log.to_string().c_str()); } NOTICE("%s", log.to_string().c_str()); const ::mobs::common::BatchMobsMessage batch_mobs_msg = data.get_output(); for (auto &msg : batch_mobs_msg.messages()) { emit_data(msg); } } void _process_back_batch_aiao(::baidu::ymir::OperatorData<::mobs::common::BatchMobsMessage>& data) { DEBUG("enter process_back_batch_aiao"); ::baidu::ymir::HelpLock lock(&(this->_mutex)); const ::mobs::common::BatchMobsMessage batch_mobs_msg = data.get_output(); for (auto &msg : batch_mobs_msg.messages()) { emit_data(msg); } } void emit_data(const ::mobs::common::MobsMessage& msg) { bool key_flag = false; std::string key = ""; std::string value = ""; for (auto &rec : msg.records()) { if (rec.key() == "key") { key_flag = true; key = rec.data().value(); key = ::baidu::ymir::string_trim(key, " "); } else if (rec.key() == "data") { value = rec.data().value(); value = ::baidu::ymir::string_trim(value, " "); } } if (FLAGS_run_schema == "bistreaming") { hadoop_err_t err_t = hadoop_err_t::E_NOERROR; if (key_flag) { int32_t key_len = key.length(); int32_t val_len = value.length(); err_t = emit(key.c_str(), key_len, value.c_str(), val_len); } else { int32_t val_len = value.length(); err_t = emit(value.c_str(), val_len); } if (err_t != E_NOERROR) { _has_error = true; FATAL("emit error=%d", err_t); } } else if (FLAGS_run_schema == "streaming") { if (key_flag) { int32_t key_len = key.length(); int32_t val_len = value.length(); fwrite(key.c_str(), key_len, 1, stdout); fwrite("\t", 1, 1, stdout); fwrite(value.c_str(), val_len, 1, stdout); fwrite("\n", 1, 1, stdout); } else { int32_t val_len = value.length(); fwrite(value.c_str(), val_len, 1, stdout); fwrite("\n", 1, 1, stdout); } } } }; USING_MAPPER(mr_map_t); int main(int argc, char *argv[]) { ::baidu::ymir::set_core_signal(); ::google::ParseCommandLineFlags(&argc, &argv, true); hadoop_framework_t framework; framework.init(argc, argv); return framework.run(); }