#include "trans_mobs_operator_impl.h" namespace baidu { namespace ymir { TransMobsOperatorImpl::TransMobsOperatorImpl() { _process_node_num = 0; _pass_node_num = 0; } TransMobsOperatorImpl::~TransMobsOperatorImpl() { } bool TransMobsOperatorImpl::init(const comcfg::ConfigUnit& conf) { return data_parse_impl.init(conf); } int TransMobsOperatorImpl::process(OperatorData<::mobs::common::MobsMessage>& data){ ::mobs::common::MobsMessage mobs_msg = data.get_input(); std::string line; bool find_flag = false; for(auto &rec : mobs_msg.records()) { if (rec.key() == "data") { line = rec.data().value(); find_flag = true; break; } } if (find_flag) { ::mobs::common::MobsMessage mobs_msg_new; data_parse_impl.parse(line, &mobs_msg_new); data.copy_output(mobs_msg_new); } else { data.set_skip(); } return 0; } bool TransMobsOperatorImpl::wait_finish(uint64_t node_num) { return true; } void TransMobsOperatorImpl::async_process(OperatorData<::mobs::common::BatchMobsMessage>& data, int* ret, ::bthread::CountdownEvent* event) { uint64_t process_node_num = __sync_add_and_fetch(&_process_node_num, (uint64_t)1); DEBUG("TransMobsOperatorImpl process_node_num=%llu", process_node_num); ::mobs::common::BatchMobsMessage batch_mobs_msg_out; mobs::common::BatchInfo* batch_info = batch_mobs_msg_out.mutable_batch_info(); batch_info->set_trace_id(0); ::mobs::common::BatchMobsMessage batch_mobs_msg = data.get_input(); for (auto &mobs_msg : batch_mobs_msg.messages()) { bool find_flag = false; std::string line; for(auto &rec : mobs_msg.records()) { if (rec.key() == "data") { line = rec.data().value(); find_flag = true; break; } } DEBUG("TransMobsOperatorImpl line.length=%d, line:%s", line.length(), line.c_str()); if (find_flag && line.length() > 1) { ::mobs::common::MobsMessage* mobs_msg_out = batch_mobs_msg_out.add_messages(); data_parse_impl.parse(line, mobs_msg_out); } } std::string temp; ProtoMessageToJson(batch_mobs_msg_out, &temp); DEBUG("TransMobsOperatorImpl output size: %s", temp.c_str()); uint64_t pass_node_num = 0; if (batch_mobs_msg_out.messages().size() > 0) { pass_node_num = __sync_add_and_fetch(&_pass_node_num, (uint64_t)1); data.copy_output(batch_mobs_msg_out); DEBUG("TransMobsOperatorImpl batch_mobs_msg_out size: %d", batch_mobs_msg_out.messages().size()); } else { data.set_skip(); NOTICE("TransMobsOperatorImpl set_skip"); } DEBUG("TransMobsOperatorImpl pass_node_num=%llu", pass_node_num); *ret = 0; if (event != NULL) { event->signal(); } return; } int TransMobsOperatorImpl::process(OperatorData<::mobs::common::BatchMobsMessage>& data) { ::bthread::CountdownEvent* event = NULL; int ret = 0; async_process(data, &ret, event); return ret; } } }