利用gdb对greenplum的segment源码进行跟踪的方法

版本号

Linux version 3.10.0-862.3.2.el7.x86_64 (gcc version 4.8.5 20150623 (Red Hat 4.8.5-28) (GCC) )
PostgreSQL 8.2.15 (Greenplum Database 4.3.99.00 build dev)


方法

原理

1. psql启动

2. master与segment通信


方法
  • psql进入master
psql -d database -p 2345
  • 获取master上的后端进程号
ps -ef --forest | grep postgres

（此处原文为截图：在 ps 输出中找到该 psql 会话在 master 上对应的后端进程，本例进程号为 34046）

  • 用gdb附加(attach)到master上的后端进程，并在cdbdisp_dispatchPlan处设置断点
gdb
attach 34046
b cdbdisp_dispatchPlan
c
  • 在psql中执行查询（master会停在cdbdisp_dispatchPlan断点处；此前AssignGangs已通过createGang连接各segment，因此segment上的后端进程此时已经存在）
SELECT * from test order by id;
  • 获取segment上的后端进程号
ps -ef --forest | grep postgres

（此处原文为截图：在 ps 输出中找到 segment 上为该查询创建的后端进程，本例进程号为 35905）

  • 用gdb附加(attach)到segment上的后端进程，并在exec_mpp_query处设置断点
gdb
attach 35905
b exec_mpp_query
c
  • 回到master的gdb中继续执行(c)，使查询计划分发到segment
c

接下来就可以对segment进行代码跟踪调试了


原理（以下代码为节选，并非完整复制，请自行对照源码）

1. psql启动

函数PostmasterMain
/*
 * Postmaster main entry point.
 *
 * Abridged excerpt: only the connection-listening path is shown, so some
 * declarations (e.g. `status`) and the rest of postmaster startup are omitted.
 *
 * For every host listed in listen_addresses a listening socket is opened
 * with StreamServerPort(); the postmaster then enters ServerLoop() to accept
 * incoming connections on those sockets.
 */
int
PostmasterMain(int argc, char *argv[])
{
    // ListenAddresses holds the listen_addresses parameter from postgresql.conf.
    if (ListenAddresses)
    {
        char       *rawstring;
        List       *elemlist;
        ListCell   *l;
        int            success = 0;   /* number of hosts that got a listen socket */

        /* Need a modifiable copy of ListenAddresses */
        rawstring = pstrdup(ListenAddresses);

        /* Parse string into list of identifiers */
        if (!SplitIdentifierString(rawstring, ',', &elemlist))
        {
            /* syntax error in list */
            ereport(FATAL,
                    (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
                     errmsg("invalid list syntax for \"listen_addresses\"")));
        }

        foreach(l, elemlist)
        {
            char       *curhost = (char *) lfirst(l);

            /*
             * StreamServerPort creates, binds and listens on the socket(s)
             * for this host. "*" is passed on as a NULL host name (per the
             * listen_addresses documentation, "*" means all interfaces).
             * A failure for one host only logs a WARNING and the loop moves
             * on to the next host.
             */
            if (strcmp(curhost, "*") == 0)
                status = StreamServerPort(AF_UNSPEC, NULL,
                                          (unsigned short) PostPortNumber,
                                          UnixSocketDir,
                                          ListenSocket, MAXLISTEN);
            else
                status = StreamServerPort(AF_UNSPEC, curhost,
                                          (unsigned short) PostPortNumber,
                                          UnixSocketDir,
                                          ListenSocket, MAXLISTEN);
            if (status == STATUS_OK)
                success++;
            else
                ereport(WARNING,
                        (errmsg("could not create listen socket for \"%s\"",
                                curhost)));
        }
    }
    
    // ServerLoop loops forever, accepting connection requests on the sockets
    // stored in the file-level static ListenSocket array filled in above.
    status = ServerLoop();
}

函数StreamServerPort
/*
 * StreamServerPort -- open a "listening" port to accept connections.
 *
 * Successfully opened sockets are added to the ListenSocket[] array,
 * at the first free slot (i.e. the first position still holding
 * PGINVALID_SOCKET).
 *
 * RETURNS: STATUS_OK or STATUS_ERROR
 *
 * Abridged excerpt: the declarations, the code that builds the `addrs`
 * address list, the checks on `err` after bind()/listen(), storing fd into
 * ListenSocket[] and the return statement are omitted. For each address in
 * `addrs` it creates a stream (SOCK_STREAM) socket, binds it and listens.
 */

int
StreamServerPort(int family, char *hostName, unsigned short portNumber,
                 char *unixSocketName,
                 pgsocket ListenSocket[], int MaxListen)
{
    for (addr = addrs; addr; addr = addr->ai_next)
    {
        // Create a stream socket for this address; on failure log it and try the next address.
        if ((fd = socket(addr->ai_family, SOCK_STREAM, 0)) < 0)
        {
            ereport(LOG,
                    (errcode_for_socket_access(),
            /* translator: %s is IPv4, IPv6, or Unix */
                     errmsg("could not create %s socket: %m",
                            familyDesc)));
            continue;
        }
        
        // Bind the socket to this address (err is not examined in this excerpt).
        err = bind(fd, addr->ai_addr, addr->ai_addrlen);
        
        // Start listening, with a backlog of maxconn pending connections.
        err = listen(fd, maxconn);
    }
}

函数ServerLoop
/*
 * Main idle loop of postmaster
 *
 * Abridged excerpt. Each iteration waits in select() on the listening
 * sockets; for every socket that is readable it accepts the connection,
 * starts a backend for it with BackendStartup(), and then releases its own
 * copy of the accepted socket and Port structure.
 */
static int
ServerLoop(void)
{
    for (;;)
    {
        int            selres;

        if (acceptNewConnections)
        {
            // select() monitors the sockets in rmask; a result > 0 means at
            // least one of them has a pending connection request.
            // (Setup of rmask, nSockets and timeout is omitted from this excerpt.)
            selres = select(nSockets, &rmask, NULL, NULL, &timeout);
        }
        // NOTE(review): as excerpted, selres is left unassigned when
        // acceptNewConnections is false; the full source presumably handles
        // that case -- confirm before reusing this snippet.
        
        /*
         * New connection pending on any of our sockets? If so, fork a child
         * process to deal with it.
         */
        if (selres > 0)
        {
            int            i;

            for (i = 0; i < MAXLISTEN; i++)
            {
                /* Open sockets fill the leading slots; -1 marks the first unused one. */
                if (ListenSocket[i] == -1)
                    break;
                if (FD_ISSET(ListenSocket[i], &rmask))
                {
                    Port       *port;

                    // ConnCreate -> StreamConnection -> accept(): accept the pending connection.
                    port = ConnCreate(ListenSocket[i]);
                    if (port)
                    {
                        // Start (fork) a backend process for this connection.
                        BackendStartup(port);

                        /*
                         * We no longer need the open socket or port structure
                         * in this process
                         */
                        StreamClose(port->sock);
                        ConnFree(port);
                    }
                }
            }
        }
    }
}

函数BackendStartup
/*
 * BackendStartup -- start backend process
 *
 * returns: STATUS_ERROR if the fork failed, STATUS_OK otherwise.
 *
 * Note: if you change this code, also consider StartAutovacuumWorker.
 *
 * Abridged excerpt: only the child side of the fork is shown; the parent
 * branch, fork-failure handling and the return statement are omitted.
 */
static int
BackendStartup(Port *port)
{
    pid = fork_process();

    if (pid == 0)                /* child */
    { // the forked child handles all further communication with the psql client
        /* Close the postmaster's sockets */
        ClosePostmasterPorts(false);
        
        /* And run the backend; its return value is passed to proc_exit() */
        proc_exit(BackendRun(port));
    }
}

函数BackendRun
/*
 * BackendRun -- set up the backend's argument list and invoke PostgresMain()
 *
 * returns:
 *        Shouldn't return at all.
 *        If PostgresMain() fails, return status.
 *
 * Abridged excerpt: construction of the ac/av argument list is omitted.
 * The database and user names passed on are taken from the connection's
 * Port structure.
 */
static int
BackendRun(Port *port)
{
    return PostgresMain(ac, av, port->database_name, port->user_name);
}

函数PostgresMain
/* ----------------------------------------------------------------
 * PostgresMain
 *       postgres main loop -- all backends, interactive or otherwise start here
 *
 * argc/argv are the command line arguments to be used.  (When being forked
 * by the postmaster, these are not the original argv array of the process.)
 * username is the (possibly authenticated) PostgreSQL user name to be used
 * for the session.
 *
 * Abridged excerpt: only the ReadyForQuery / ReadCommand part of the main
 * loop is shown; the handling of the returned message type (firstchar) is
 * omitted.
 * ----------------------------------------------------------------
 */
int
PostgresMain(int argc, char *argv[],
             const char *dbname, const char *username)
{
    /*
     * Non-error queries loop here.
     */

    for (;;)
    {
        if (send_ready_for_query)
        {
            /* 
             * Tell the client that the backend is ready for the next query.
             * 1. ReadyForQuery->pq_endmessage->pq_putmessage->internal_putbytes:
             *    internal_putbytes copies the message into pqcomm.c's static
             *    send buffer (PqSendBuffer; PqSendBufferSize only holds that
             *    buffer's size -- confirm the names against your source tree).
             * 2. ReadyForQuery->pq_flush->internal_flush->secure_write->send:
             *    send() delivers the buffered message to psql. For a new
             *    session, psql's startup is complete at this point.
             * A QE writer (Gp_role == GP_ROLE_EXECUTE && Gp_is_writer) uses
             * ReadyForQuery_QEWriter instead of ReadyForQuery.
             */
            if (Gp_role == GP_ROLE_EXECUTE && Gp_is_writer)
            {
                ReadyForQuery_QEWriter(whereToSendOutput);
            }
            else
                ReadyForQuery(whereToSendOutput);
            send_ready_for_query = false;
        }
        
        /* 
         * Receive the next message from psql here:
         * ReadCommand->SocketBackend->pq_getbyte->pq_recvbuf->secure_read->recv.
         * recv() reads what psql sent (message type, length and the SQL text);
         * SocketBackend then calls pq_getmessage to store the SQL in
         * input_message. ReadCommand returns the message type.
         */
        firstchar = ReadCommand(&input_message);
    }
}

2. master与segment通信

函数ExecutorStart

调用关系: exec_simple_query->PortalStart->ExecutorStart

/*
 * ExecutorStart -- abridged excerpt.
 * Call path: exec_simple_query -> PortalStart -> ExecutorStart.
 *
 * Only the MPP-specific part is shown. InitPlan builds the plan state tree
 * (reaching ExecInitMotion for Motion nodes). On the QD (GP_ROLE_DISPATCH),
 * for a parallel plan, gangs of segment workers are assigned (unless this is
 * EXPLAIN-only), and when shouldDispatch is set the plan is sent out with
 * cdbdisp_dispatchPlan.
 *
 * NOTE(excerpt): the declarations (plannedStmt, estate, shouldDispatch,
 * needDtxTwoPhase), the PG_CATCH()/PG_END_TRY() that must follow PG_TRY(),
 * and (presumably) the matching END_MEMORY_ACCOUNT() are omitted here.
 */
void
ExecutorStart(QueryDesc *queryDesc, int eflags)
{
    START_MEMORY_ACCOUNT(plannedStmt->memoryAccount);
    {
        /* If the interconnect has been set up; we need to catch any
         * errors to shut it down -- so we have to wrap InitPlan in a PG_TRY() block. */
        PG_TRY();
        {
            // InitPlan -> ExecInitNode -> ExecInitMotion (for Motion nodes)
            InitPlan(queryDesc, eflags);
        
            if (Gp_role == GP_ROLE_DISPATCH &&
                queryDesc->plannedstmt->planTree->dispatch == DISPATCH_PARALLEL)
            {
                /* Assign gang descriptions to the root slices of the slice forest. */
                // Observed while debugging: for a very simple SELECT this call changed nothing.
                InitRootSlices(queryDesc);
                
                if (!(eflags & EXEC_FLAG_EXPLAIN_ONLY))
                {
                    // AssignGangs -> allocateWriterGang (when primaryWriterGang == NULL)
                    // -> createGang, which opens connections to the segments.
                    AssignGangs(queryDesc, gp_singleton_segindex);
                }
            }
            
            if (shouldDispatch)
            {
                // cdbdisp_dispatchPlan -> cdbdisp_dispatchX -> cdbdisp_dispatchToGang:
                // send the plan to the segment gangs.
                cdbdisp_dispatchPlan(queryDesc, needDtxTwoPhase, true, estate->dispatcherState);
            }
        }
    }
}

函数ExecInitMotion
/*
 * ExecInitMotion -- abridged excerpt.
 *
 * Looks up the sending slice for this Motion node (indexed by motionID) in
 * the executor's slice table. On the QD (GP_ROLE_DISPATCH) it also records
 * the slice tree: the sending slice becomes a child of the receiving
 * (parent) slice, inherits its root index, and is marked to run in a
 * primary reader gang.
 *
 * NOTE(excerpt): creation/initialisation of motionstate, the use of tupDesc
 * and the return statement are omitted here.
 */
MotionState *
ExecInitMotion(Motion * node, EState *estate, int eflags)
{
    MotionState *motionstate = NULL;
    TupleDesc    tupDesc;
    Slice       *sendSlice = NULL;
    Slice      *recvSlice = NULL;
    SliceTable *sliceTable = estate->es_sliceTable;

    int            parentSliceIndex = estate->currentSliceIdInPlan;
    
    /* Look up the sending gang's slice table entry. */
    sendSlice = (Slice *)list_nth(sliceTable->slices, node->motionID);
    
    /* QD must fill in the global slice table. */
    if (Gp_role == GP_ROLE_DISPATCH)
    {   // Make sendSlice a child of recvSlice and fill in sendSlice's parent index, root index and gang type.
        /* Look up the receiving (parent) gang's slice table entry. */
        recvSlice = (Slice *)list_nth(sliceTable->slices, parentSliceIndex);
        
        /* Sending slice become a children of recv slice */
        recvSlice->children = lappend_int(recvSlice->children, sendSlice->sliceIndex);
        sendSlice->parentIndex = parentSliceIndex;
        sendSlice->rootIndex = recvSlice->rootIndex;

        /* The gang beneath a Motion will be a reader. */
        sendSlice->gangType = GANGTYPE_PRIMARY_READER;
    }
}

函数createGang
/*
 * creates a new gang by logging on a session to each segDB involved
 *
 * Abridged excerpt: threadCount connection threads (thread_DoConnect) are
 * started and then all joined; failing to join a thread is FATAL. Setup of
 * doConnectParmsAr/threadCount and the code that builds and returns the
 * Gang are omitted.
 */
static Gang *
createGang(GangType type, int gang_id, int size, int content, char *portal_name)
{
    for (i = 0; i < threadCount; i++)
    {
        /* 
         * thread_DoConnect -> cdbconn_doConnect -> PQconnectdbParams connects
         * to a segment database, much like psql connecting to a postgres
         * database. Presumably each such connection is accepted by that
         * segment's postmaster, which forks a backend for it.
         */
        pthread_err = gp_pthread_create(&pParms->thread, thread_DoConnect, pParms, "createGang");
    }
    
    /*
     * wait for all of the DoConnect threads to complete.
     */
    for (i = 0; i < threadCount; i++)
    {
        DoConnectParms *pParms = &doConnectParmsAr[i];

        if (0 != pthread_join(pParms->thread, NULL))
        {
            elog(FATAL, "could not create segworker group");
        }
    }
}

函数cdbdisp_dispatchToGang

/*
 * cdbdisp_dispatchToGang -- abridged excerpt.
 *
 * Dispatches a command to gang `gp`. For each of the newThreads parameter
 * slots: if gp_connections_per_thread is 0, the command is sent from the
 * calling thread via thread_DispatchOut; otherwise a thread running
 * thread_DispatchCommand is created to send it.
 *
 * NOTE(excerpt): the computation of newThreads, the pthread_err check and
 * the thread-count bookkeeping are omitted here.
 */
void
cdbdisp_dispatchToGang(struct CdbDispatcherState *ds,
                       GpDispatchCommandType        mppDispatchCommandType,
                       void                           *commandTypeParms,
                       struct Gang                 *gp,
                       int                          sliceIndex,
                       unsigned int                 maxSlices,
                       CdbDispatchDirectDesc        *disp_direct)
{
    /*
     * Create the threads. (which also starts the dispatching).
     */
    for (i = 0; i < newThreads; i++)
    {
        DispatchCommandParms *pParms = &(ds->dispatchThreads->dispatchCommandParmsAr + ds->dispatchThreads->threadCount)[i];
        
        if (gp_connections_per_thread==0)
        {
            // No dispatch threads: send directly from this thread (at most one slot expected).
            Assert(newThreads <= 1);
            thread_DispatchOut(pParms);
        }
        else
        {
            int        pthread_err = 0;

            pParms->thread_valid = true;
            /*
             * thread_DispatchCommand -> thread_DispatchOut -> dispatchCommandQuery
             * -> PQsendGpQuery_shared: sends the query to the segment, much like
             * psql sending a query to the server.
             */
            pthread_err = gp_pthread_create(&pParms->thread, thread_DispatchCommand, pParms, "dispatchToGang");
        }
    }
}
上一篇:网关设备是一台安卓来管理子设备


下一篇:UVaLive5059 Playing With Stones