// trans_mobs_operator_impl.cpp

#include "trans_mobs_operator_impl.h"

namespace baidu {
namespace ymir {

/**
 * @brief Construct the operator with both node counters reset to zero.
 *
 * _process_node_num and _pass_node_num are later incremented by
 * async_process() through __sync_add_and_fetch.
 */
TransMobsOperatorImpl::TransMobsOperatorImpl() {
    // The two counters are independent, so the order of the resets is irrelevant.
    _pass_node_num = 0;
    _process_node_num = 0;
}

/// Destructor: performs no work of its own in this translation unit.
TransMobsOperatorImpl::~TransMobsOperatorImpl() = default;

/**
 * @brief Initialise the operator from configuration.
 *
 * The configuration is forwarded unchanged to data_parse_impl.init().
 *
 * @param conf  Configuration unit handed to the underlying parser.
 * @return Whatever data_parse_impl.init() reports, converted to bool.
 */
bool TransMobsOperatorImpl::init(const comcfg::ConfigUnit& conf) {
    const bool parser_ready = data_parse_impl.init(conf);
    return parser_ready;
}

/**
 * @brief Transform a single MobsMessage.
 *
 * Looks for the first record of the input message whose key is "data".
 * When found, its value is passed to data_parse_impl.parse(), which fills a
 * fresh MobsMessage that is then published via data.copy_output(). When no
 * such record exists, the data is marked as skipped via data.set_skip().
 *
 * @param data  Operator payload; its input is read and either its output is
 *              set or it is flagged as skipped.
 * @return Always 0.
 */
int TransMobsOperatorImpl::process(OperatorData<::mobs::common::MobsMessage>& data) {
    // Bind by const reference: the input is only read, so copying the whole
    // message is unnecessary. (If get_input() returns by value, the temporary's
    // lifetime is extended to this scope.)
    const ::mobs::common::MobsMessage& mobs_msg = data.get_input();

    std::string line;
    bool has_data_record = false;
    for (const auto& rec : mobs_msg.records()) {
        if (rec.key() == "data") {
            line = rec.data().value();
            has_data_record = true;
            break;  // only the first "data" record is used
        }
    }

    if (!has_data_record) {
        data.set_skip();
        return 0;
    }

    ::mobs::common::MobsMessage mobs_msg_new;
    // NOTE(review): any status reported by parse() is not inspected here, so the
    // message is published even if parsing failed -- confirm this is intended.
    data_parse_impl.parse(line, &mobs_msg_new);
    data.copy_output(mobs_msg_new);
    return 0;
}

/**
 * @brief Report completion of outstanding work.
 *
 * This implementation does not wait on anything.
 *
 * @param node_num  Ignored.
 * @return Always true.
 */
bool TransMobsOperatorImpl::wait_finish(uint64_t node_num) {
    (void)node_num;  // intentionally unused
    return true;
}

/**
 * @brief Transform every message of an input batch and publish the result.
 *
 * For each message of the input batch, the value of the first record whose
 * key is "data" is extracted. If that value is longer than one character, a
 * new message is appended to the output batch and filled by
 * data_parse_impl.parse(). The output batch's batch_info trace id is set to 0.
 *
 * If the output batch ends up with at least one message it is published via
 * data.copy_output() and _pass_node_num is incremented; otherwise the data is
 * flagged via data.set_skip(). _process_node_num is incremented on every call.
 * Both counters are updated with __sync_add_and_fetch.
 *
 * @param data   Operator payload holding the input batch; receives the output
 *               batch or the skip flag.
 * @param ret    Receives 0 when processing is done. May be NULL, in which case
 *               nothing is written.
 * @param event  If non-NULL, signalled once after processing has finished.
 */
void TransMobsOperatorImpl::async_process(OperatorData<::mobs::common::BatchMobsMessage>& data,
                                          int* ret, ::bthread::CountdownEvent* event) {
    const uint64_t process_node_num = __sync_add_and_fetch(&_process_node_num, (uint64_t)1);
    // uint64_t is not guaranteed to be `unsigned long long`; cast so the
    // argument really matches the %llu conversion.
    DEBUG("TransMobsOperatorImpl process_node_num=%llu",
            static_cast<unsigned long long>(process_node_num));

    ::mobs::common::BatchMobsMessage batch_mobs_msg_out;
    batch_mobs_msg_out.mutable_batch_info()->set_trace_id(0);

    // Bind by const reference: the input batch is only read, so a deep copy of
    // every contained message is unnecessary.
    const ::mobs::common::BatchMobsMessage& batch_mobs_msg = data.get_input();
    for (const auto& mobs_msg : batch_mobs_msg.messages()) {
        bool find_flag = false;
        std::string line;
        for (const auto& rec : mobs_msg.records()) {
            if (rec.key() == "data") {
                line = rec.data().value();
                find_flag = true;
                break;  // only the first "data" record is used
            }
        }

        // line.length() is a size_t; passing it to %d is a varargs type
        // mismatch, so it is widened explicitly and printed with %llu.
        DEBUG("TransMobsOperatorImpl line.length=%llu, line:%s",
                static_cast<unsigned long long>(line.length()), line.c_str());
        if (find_flag && line.length() > 1) {
            ::mobs::common::MobsMessage* mobs_msg_out = batch_mobs_msg_out.add_messages();
            // NOTE(review): any status reported by parse() is not inspected, so
            // the appended message stays in the batch even if parsing failed --
            // confirm this is intended.
            data_parse_impl.parse(line, mobs_msg_out);
        }
    }

    // NOTE(review): this JSON serialisation runs on every call, independent of
    // whether the DEBUG line below is actually emitted.
    std::string temp;
    ProtoMessageToJson(batch_mobs_msg_out, &temp);
    DEBUG("TransMobsOperatorImpl output size: %s", temp.c_str());

    uint64_t pass_node_num = 0;
    if (batch_mobs_msg_out.messages().size() > 0) {
        pass_node_num = __sync_add_and_fetch(&_pass_node_num, (uint64_t)1);
        data.copy_output(batch_mobs_msg_out);
        DEBUG("TransMobsOperatorImpl batch_mobs_msg_out size: %d",
                static_cast<int>(batch_mobs_msg_out.messages().size()));
    } else {
        data.set_skip();
        NOTICE("TransMobsOperatorImpl set_skip");
    }

    // pass_node_num stays 0 in the log when this batch was skipped.
    DEBUG("TransMobsOperatorImpl pass_node_num=%llu",
            static_cast<unsigned long long>(pass_node_num));

    // Guard the out-parameter the same way the event pointer is guarded.
    if (ret != NULL) {
        *ret = 0;
    }
    if (event != NULL) {
        event->signal();
    }
}

/**
 * @brief Synchronous batch entry point.
 *
 * Delegates to async_process() with no completion event, so nothing is
 * signalled, and hands back the status that async_process() stored.
 *
 * @param data  Operator payload holding the input batch.
 * @return The status written by async_process().
 */
int TransMobsOperatorImpl::process(OperatorData<::mobs::common::BatchMobsMessage>& data) {
    int status = 0;
    ::bthread::CountdownEvent* const no_event = NULL;  // synchronous call: nobody to notify
    async_process(data, &status, no_event);
    return status;
}

}
}

 

// (Web page navigation residue below; not part of the source file.)
// 上一篇:PyTorch深度学习(3)Transforms CIFAR10
//
// 下一篇:第1章 开始