- TRex架构图
TRex-EMU can receive commands (RPC via JSON-RPC) from a Python client and send/receive packets via a ZMQ channel that connects it to the TRex server (RX core). Packets from the network (from a TRex server physical port) that match a filter (dynamic) are forwarded to the TRex-EMU process. Packets from the EMU process are packed into the ZMQ channel and sent directly to the TRex physical port.
TRex-EMU收到客户端发送的命令后(RPC via JSON-RPC),通过ZMQ通道发送、接受数据包到TRex 服务端。
Trex服务器端从物理接口上接收到了符合要求的数据报文后会转发给TRex-EMU进程.
EMU处理的数据报文会通过ZMQ通道直接走DPDK发包流程
是的
- TRex-EMU的ZMQ创建过程如下:
// Specifies the type of a socket, used by NewSocket()
type Type int
const (
// Constants for NewSocket()
// See: http://api.zeromq.org/4-1:zmq-socket#toc3
REQ = Type(C.ZMQ_REQ)
REP = Type(C.ZMQ_REP)
DEALER = Type(C.ZMQ_DEALER)
ROUTER = Type(C.ZMQ_ROUTER)
PUB = Type(C.ZMQ_PUB)
SUB = Type(C.ZMQ_SUB)
XPUB = Type(C.ZMQ_XPUB)
XSUB = Type(C.ZMQ_XSUB)
PUSH = Type(C.ZMQ_PUSH)
PULL = Type(C.ZMQ_PULL)
PAIR = Type(C.ZMQ_PAIR)
STREAM = Type(C.ZMQ_STREAM)
)
func (o *VethIFZmq) CreateSocket(socketStr string) (*zmq.Context, *zmq.Socket) {
context, err := zmq.NewContext()
if err != nil || context == nil {
panic(err)
}
socket, err := context.NewSocket(zmq.PAIR)
if err != nil || socket == nil {
panic(err)
}
if o.proxyMode {
err = socket.Bind(socketStr)
} else {
err = socket.Connect(socketStr)
}
if err != nil {
panic(err)
}
return context, socket
}
func (o *VethIFZmq) Create(ctx *CThreadCtx, port uint16, server string, tcp bool, proxyMode bool) {
var socketStrRx, socketStrTx string
if tcp {
socketStrRx = fmt.Sprintf("tcp://%s:%d", server, port)
socketStrTx = fmt.Sprintf("tcp://%s:%d", server, port+1)
} else {
socketStrRx = fmt.Sprintf("ipc://%s-%d.ipc", ZMQ_EMU_IPC_PATH, port)
socketStrTx = fmt.Sprintf("ipc://%s-%d.ipc", ZMQ_EMU_IPC_PATH, port+1)
}
o.proxyMode = proxyMode
if o.proxyMode {
o.rxCtx, o.rxSocket = o.CreateSocket(socketStrTx)
o.txCtx, o.txSocket = o.CreateSocket(socketStrRx)
o.rxPort = port + 1
o.txPort = port
} else {
o.rxCtx, o.rxSocket = o.CreateSocket(socketStrRx)
o.txCtx, o.txSocket = o.CreateSocket(socketStrTx)
o.rxPort = port
o.txPort = port + 1
}
...
}
- TRex ZMQ创建过程及收发包过程
CRxCore : public TrexRxCore,其中TrexRxCore是一个接口类
typedef std::vector<RXPortManager*> rx_port_mg_vec_t;
typedef std::map<uint8_t, RXPortManager*> rx_port_mg_map_t;
class CRxCore: public TrexRxCore {
protected:
....
rx_port_mg_map_t m_rx_port_mngr_map;
rx_port_mg_vec_t m_rx_port_mngr_vec;
...
void* m_zmq_ctx;
void* m_zmq_rx_socket; // in respect to TRex interface (rx->emu)
void* m_zmq_tx_socket; // in respect to TRex interface (emu->tx)
CZmqPacketWriter m_zmq_wr;
CZmqPacketReader m_zmq_rd;
}
void CRxCore::create(const CRxSlCfg &cfg) {
...
if (m_ex_zmq_enabled) {
create_zmq();
}
...
/* create per port manager */
for (auto &port : cfg.m_ports) {
RXPortManager *mngr = new RXPortManager();
mngr->create_async(port.first,
this,
port.second,
m_rfc2544,
&m_err_cntrs,
&m_cpu_dp_u);
m_rx_port_mngr_map[port.first] = mngr;
m_rx_port_mngr_vec[port.first] = mngr;
if (m_ex_zmq_enabled) {
mngr->set_zmq_cn(&m_zmq_wr,&m_zmq_rd);
mngr->set_feature(RXPortManager::EZMQ); /* enable the feature*/
}
}
....
}
void CRxCore::create_zmq() {
// create zmq ch
m_zmq_ctx = zmq_ctx_new();
CParserOption * po =&CGlobalInfo::m_options;
char buffer[100];
if (m_ezmq_use_tcp) {
sprintf(buffer,"tcp://*:%d",po->m_ezmq_ch_port);
} else {
sprintf(buffer,"ipc://%s-%d.ipc", po->m_emzq_ipc_file_path.c_str(), po->m_ezmq_ch_port);
}
create_zmq(m_zmq_rx_socket,buffer);
if (m_ezmq_use_tcp) {
sprintf(buffer,"tcp://*:%d",po->m_ezmq_ch_port+1);
} else {
sprintf(buffer,"ipc://%s-%d.ipc", po->m_emzq_ipc_file_path.c_str(), po->m_ezmq_ch_port + 1);
}
create_zmq(m_zmq_tx_socket,buffer);
m_zmq_wr.Create(m_zmq_rx_socket,&m_err_cntrs);
m_zmq_rd.Create(m_zmq_tx_socket,this,&m_err_cntrs);
}
bool CRxCore::create_zmq(void * &socket,std::string server) {
int rc;
socket = zmq_socket(m_zmq_ctx, ZMQ_PAIR);
if (!socket) {
printf("unable to create socket server at:%s \n", server.c_str());
return false;
}
int linger = 0;
rc = zmq_setsockopt(socket, ZMQ_LINGER, &linger, sizeof(linger));
if (rc != 0) {
printf("unable to set linger server at:%s \n", server.c_str());
return false;
}
rc = zmq_bind(socket, server.c_str());
if (rc != 0) {
printf("unable to bind ZMQ server at:%s \n", server.c_str());
return false;
}
return true;
}
其中CRxCore中的m_zmq_rd对象负责将数据发送给dpdk发包序列:
void CZmqPacketReader::tx_buffer(char *pkt,int pkt_size,uint8_t vport) {
uint8_t port_id = vport;
rte_mbuf_t *m;
uint8_t num_ports = CGlobalInfo::m_options.get_expected_ports();
if ( port_id >= num_ports ) {
m_cnt->m_ezmq_tx_fe_wrong_vport++;
return ;
}
socket_id_t socket = CGlobalInfo::m_socket.port_to_socket(vport);
if (pkt_size <= _2048_MBUF_SIZE) {
m = CGlobalInfo::pktmbuf_alloc_no_assert(socket, pkt_size);
if ( m ) {
/* allocate */
uint8_t *p = (uint8_t *)rte_pktmbuf_append(m, pkt_size);
assert(p);
/* copy */
memcpy(p, pkt, pkt_size);
}
}else{
/* creating chaning of mbuf in the size of pool */
rte_mempool *pool_2k = CGlobalInfo::pktmbuf_get_pool(socket, _2048_MBUF_SIZE);
m = utl_rte_pktmbuf_mem_to_pkt_no_assert(pkt, pkt_size, _2048_MBUF_SIZE, pool_2k);
}
if ( !m ) {
m_cnt->m_ezmq_tx_fe_dropped_no_mbuf++;
return;
}
if (!m_rx->tx_pkt(m, port_id)) {
m_cnt->m_ezmq_tx_fe_err_send++;
rte_pktmbuf_free(m);
}else{
m_cnt->m_ezmq_tx_fe_ok_send++;
}
}
m_rx是一个CRxCore对象,其tx_pkt函数为:
bool CRxCore::tx_pkt(rte_mbuf_t *m, uint8_t tx_port_id) {
return m_rx_port_mngr_vec[tx_port_id]->tx_pkt(m);
}
CRxCore 的实例对象通过ZMQ接收到TRex-EMU传送的数据
int CZmqPacketReader::pool_msg(void){
int cnt=1;
while ( cnt > 0 ){
int len = zmq_recv(m_socket, m_buf,ZMQ_BUFFER_SIZE,ZMQ_NOBLOCK);
if (len < 0 ){
return 0;
}else{
if (len > 0 ) {
if (len<=ZMQ_BUFFER_SIZE) {
parse_msg(m_buf,len);
}else{
...
}
}else{
...
}
}
cnt--;
}
return (0);
}
int CZmqPacketReader::parse_msg(char *buf,int size){
...
int i;
for (i=0; i<(int)pkts; i++){
...
tx_buffer(p,int(pkt_size),vport);
..
}
...
return 0;
}
start
-->_do_start
函数调用,(hot_state_loop|cold_state_loop) 调用work_tick, work_tick 调用process_all_pending_pkts
int CRxCore::process_all_pending_pkts(bool flush_rx) {
int total_pkts = 0;
for (auto &mngr_pair : m_rx_port_mngr_map) {
total_pkts += mngr_pair.second->process_all_pending_pkts(flush_rx);
}
if (m_ex_zmq_enabled){
if (m_zmq_wr.flush() ) {
restart_zmq();
}
m_zmq_rd.pool_msg();
}
return total_pkts;
}
- TRex如何启用ZMQ服务
/**
* abstract class (meant for derived implementation)
*
* dervied by STL, STF, ASTF and ASTF batch
*
* @author imarom (8/30/2017)
*/
class TrexSTX {
public:
/**
* create a STX object with configuration
*
*/
TrexSTX(const TrexSTXCfg &cfg);
/**
* pure virtual DTOR to enforce deriviation
*/
virtual ~TrexSTX();
...
protected:
...
/* RX */
TrexRxCore *m_rx;
}
class TrexStateless : public TrexSTX {
...
}
TrexStateless::TrexStateless(const TrexSTXCfg &cfg) : TrexSTX(cfg) {
/* API core version */
const int API_VER_MAJOR = 5;
const int API_VER_MINOR = 1;
/* init the RPC table */
TrexRpcCommandsTable::get_instance().init("STL", API_VER_MAJOR, API_VER_MINOR);
/* load the RPC components for stateless */
TrexRpcCommandsTable::get_instance().load_component(new TrexRpcCmdsCommon());
TrexRpcCommandsTable::get_instance().load_component(new TrexRpcCmdsSTL());
/* create stateless ports */
for (int i = 0; i < get_platform_api().get_port_count(); i++) {
if ( !CGlobalInfo::m_options.m_dummy_port_map[i] ) {
m_ports[i] = (TrexPort *)new TrexStatelessPort(i);
}
}
/* create RX core */
CRxCore *rx = new CRxCore();
rx->create(cfg.m_rx_cfg);
m_rx = rx;
m_stats = nullptr;
m_tpg_mgr = nullptr;
m_tpg_state = TPGState::DISABLED;
}
COLD_FUNC void CGlobalTRex::init_stl() {
for (int i = 0; i < get_cores_tx(); i++) {
m_cores_vif[i + 1] = &m_cores_vif_stl[i + 1];
}
if (get_dpdk_mode()->dp_rx_queues() ){
/* multi-queue mode */
for (int i = 0; i < get_cores_tx(); i++) {
int qid =(i/get_base_num_cores());
int rx_qid=get_dpdk_mode()->get_dp_rx_queues(qid); /* 0,1,2,3*/
m_cores_vif_stl[i+1].set_rx_queue_id(rx_qid,rx_qid);
}
}
init_vif_cores();
rx_interactive_conf();
m_stx = new TrexStateless(get_stx_cfg());
start_master_stateless();
init_stl_stats();
}
CGlobalTRex::Create() {
...
switch (get_op_mode()) {
case OP_MODE_STL:
init_stl();
break;
case OP_MODE_ASTF:
init_astf();
break;
case OP_MODE_STF:
init_stf();
break;
case OP_MODE_ASTF_BATCH:
init_astf_batch();
break;
default:
assert(0);
}
...
}