PortalRunSelect函数在 PORTAL_ONE_SELECT策略下执行portal查询,在 PORTAL_ONE_RETURNING、PORTAL_ONE_MOD_WITH 和 PORTAL_UTIL_SELECT 情况下从已完成的 HoldStore 中提取数据时也适用。
forward:指定获取方向(true 表示向前,false 表示向后);本函数只处理简单的“向前或向后 N 行”的情况。
count <= 0 被解释为无操作:目标启动和关闭,但没有其他任何事情发生。 此外,count == FETCH_ALL 被解释为“所有行”。 (参见 FetchStmt.howMany)
返回处理的行数(适用于结果标签)
/*
 * PortalRunSelect
 *		Fetch up to "count" rows from a portal in one direction, reading
 *		either from the portal's hold store or from the executor.
 *
 * forward: true to move forward, false to move backward.
 * count:   rows to fetch; values <= 0 turn the call into a no-op fetch
 *          (the fetch step is still invoked, with no movement), and
 *          FETCH_ALL means "all remaining rows".
 * dest:    receiver for the fetched rows.
 *
 * Returns the number of rows processed.
 */
static uint64
PortalRunSelect(Portal portal, bool forward, long count, DestReceiver *dest)
{
	/* qdesc is NULL when reading held data; never touch it on that path. */
	QueryDesc  *qdesc = portal->queryDesc;
	ScanDirection scanDir;
	uint64		rowsDone;
	bool		ranOut;

	/* The caller must supply either a runnable query or held data. */
	Assert(qdesc || portal->holdStore);

	/*
	 * Re-point the query's destination on every fetch; callers such as MOVE
	 * pass a different receiver than the one the query was set up with.
	 */
	if (qdesc)
		qdesc->dest = dest;

	/* Backward motion is only legal for cursors not marked NO SCROLL. */
	if (!forward && (portal->cursorOptions & CURSOR_OPT_NO_SCROLL))
		ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), errmsg("cursor can only scan forward"), errhint("Declare it with SCROLL option to enable backward scan.")));

	/*
	 * Pick the scan direction.  If nothing was requested, or we already sit
	 * at the boundary in the requested direction, do not move at all; the
	 * count is zeroed so that a negative value never reaches the executor.
	 */
	if (count <= 0 || (forward ? portal->atEnd : portal->atStart))
	{
		scanDir = NoMovementScanDirection;
		count = 0;
	}
	else if (forward)
		scanDir = ForwardScanDirection;
	else
		scanDir = BackwardScanDirection;

	/* The executor spells "all rows" as a count of zero. */
	if (count == FETCH_ALL)
		count = 0;

	/* Do the fetch, from the hold store if there is one, else the executor. */
	if (portal->holdStore)
		rowsDone = RunFromStore(portal, scanDir, (uint64) count, dest);
	else
	{
		PushActiveSnapshot(qdesc->snapshot);
		ExecutorRun(qdesc, scanDir, (uint64) count, portal->run_once);
		rowsDone = qdesc->estate->es_processed;
		PopActiveSnapshot();
	}

	/* A no-movement fetch leaves the portal position state untouched. */
	if (ScanDirectionIsNoMovement(scanDir))
		return rowsDone;

	/* Fewer rows than requested (or "all rows") means we hit the boundary. */
	ranOut = (count == 0 || rowsDone < (uint64) count);

	if (forward)
	{
		if (rowsDone > 0)
			portal->atStart = false;	/* backward motion is possible now */
		if (ranOut)
			portal->atEnd = true;		/* everything has been retrieved */
		portal->portalPos += rowsDone;
	}
	else
	{
		if (rowsDone > 0 && portal->atEnd)
		{
			portal->atEnd = false;		/* forward motion is possible now */
			portal->portalPos++;		/* adjust for the endpoint case */
		}
		if (ranOut)
		{
			portal->atStart = true;		/* everything has been retrieved */
			portal->portalPos = 0;
		}
		else
			portal->portalPos -= rowsDone;
	}

	return rowsDone;
}
该函数按 forward 的取值分为两个分支。每个分支首先确定扫描方向,并检查在该方向上是否已经到达可用元组的末尾;如果是,则将方向设置为 NoMovement,以避免再去获取任何元组。(之所以需要这项检查,是因为并非所有计划节点类型在已经返回过一次 NULL 之后还能被再次调用。)然后调用执行器(这一步不能跳过,因为即使没有元组可用,目标接收器也需要经历启动和关闭过程)。最后,根据检索到的元组数量更新 portal 的位置状态。
如果forward为true时,方向为Forward,判定atEnd或count是否为负或0,主要是不能传输负值给执行器。如果是通过holdStore取值,则执行RunFromStore,否则执行ExecutorRun。
/*
 * Excerpt: the forward (forward == true) branch of PortalRunSelect.
 * Step 1: choose the scan direction; do not move if already at the end
 * or if no rows were requested.
 */
if (portal->atEnd || count <= 0){
direction = NoMovementScanDirection;
count = 0; /* don't pass negative count to executor */
}
else
direction = ForwardScanDirection;
/* In the executor, zero count processes all rows */
if (count == FETCH_ALL)
count = 0;
/* Step 2: fetch from the hold store when present, otherwise run the executor. */
if (portal->holdStore)
nprocessed = RunFromStore(portal, direction, (uint64) count, dest);
else
{
PushActiveSnapshot(queryDesc->snapshot);
ExecutorRun(queryDesc, direction, (uint64) count, portal->run_once);
nprocessed = queryDesc->estate->es_processed;
PopActiveSnapshot();
}
/* Step 3: update the portal position state, only if a real scan was attempted. */
if (!ScanDirectionIsNoMovement(direction))
{
if (nprocessed > 0)
portal->atStart = false; /* OK to go backward now */
/* count == 0 means "all rows"; fewer rows than requested also means the end was hit */
if (count == 0 || nprocessed < (uint64) count)
portal->atEnd = true; /* we retrieved 'em all */
portal->portalPos += nprocessed;
}
如果forward为false时,方向为Backward,判定atStart或count是否为负或0,主要是不能传输负值给执行器。如果是通过holdStore取值,则执行RunFromStore,否则执行ExecutorRun。
/*
 * Excerpt: the backward (forward == false) branch of PortalRunSelect.
 * Step 1: choose the scan direction; do not move if already at the start
 * or if no rows were requested.
 */
if (portal->atStart || count <= 0){
direction = NoMovementScanDirection;
count = 0; /* don't pass negative count to executor */
}else
direction = BackwardScanDirection;
/* In the executor, zero count processes all rows */
if (count == FETCH_ALL)
count = 0;
/* Step 2: fetch from the hold store when present, otherwise run the executor. */
if (portal->holdStore)
nprocessed = RunFromStore(portal, direction, (uint64) count, dest);
else{
PushActiveSnapshot(queryDesc->snapshot);
ExecutorRun(queryDesc, direction, (uint64) count, portal->run_once);
nprocessed = queryDesc->estate->es_processed;
PopActiveSnapshot();
}
/* Step 3: update the portal position state, only if a real scan was attempted. */
if (!ScanDirectionIsNoMovement(direction)){
if (nprocessed > 0 && portal->atEnd){
portal->atEnd = false; /* OK to go forward now */
portal->portalPos++; /* adjust for endpoint case */
}
/* count == 0 means "all rows"; fewer rows than requested also means the start was hit */
if (count == 0 || nprocessed < (uint64) count){
portal->atStart = true; /* we retrieved 'em all */
portal->portalPos = 0;
}
else
portal->portalPos -= nprocessed;
}
ExecutorRun函数
/*
 * ExecutorRun
 *		Entry point for running a query plan.  Dispatches to the function
 *		stored in ExecutorRun_hook when one is set; otherwise calls
 *		standard_ExecutorRun with the same arguments.
 */
void
ExecutorRun(QueryDesc *queryDesc, ScanDirection direction, uint64 count, bool execute_once)
{
	/* No hook set: take the standard path. */
	if (!ExecutorRun_hook)
	{
		standard_ExecutorRun(queryDesc, direction, count, execute_once);
		return;
	}

	(*ExecutorRun_hook) (queryDesc, direction, count, execute_once);
}
/*
 * standard_ExecutorRun
 *		Standard implementation behind ExecutorRun.
 *
 * Starts the destination receiver when the statement emits tuples, runs
 * the plan unless the direction is "no movement", then shuts the receiver
 * down again.  The receiver's startup and shutdown callbacks are invoked
 * even when the plan itself is not run.  estate->es_processed is reset to
 * zero before the plan runs.
 */
void
standard_ExecutorRun(QueryDesc *queryDesc, ScanDirection direction, uint64 count, bool execute_once)
{
	EState	   *es;
	MemoryContext callerCxt;
	CmdType		cmd;
	DestReceiver *receiver;
	bool		emitsTuples;

	/* Validate the descriptor and its executor state. */
	Assert(queryDesc != NULL);
	es = queryDesc->estate;
	Assert(es != NULL);
	Assert(!(es->es_top_eflags & EXEC_FLAG_EXPLAIN_ONLY));

	/* Work inside the per-query memory context for the rest of the call. */
	callerCxt = MemoryContextSwitchTo(es->es_query_cxt);

	/* Start the overall-runtime instrumentation, if it was requested. */
	if (queryDesc->totaltime)
		InstrStartNode(queryDesc->totaltime);

	/* Pull out what we need from the query descriptor. */
	cmd = queryDesc->operation;
	receiver = queryDesc->dest;

	/* Reset the row counter before anything is produced. */
	es->es_processed = 0;

	/* Tuples are emitted by SELECT and by statements that have RETURNING. */
	emitsTuples = (cmd == CMD_SELECT) ? true : queryDesc->plannedstmt->hasReturning;
	if (emitsTuples)
		receiver->rStartup(receiver, cmd, queryDesc->tupDesc);

	/* Run the plan, unless the caller asked for no movement. */
	if (!ScanDirectionIsNoMovement(direction))
	{
		/* A query flagged for single execution must not be run twice. */
		if (execute_once && queryDesc->already_executed)
			elog(ERROR, "can't re-execute query flagged for single execution");
		queryDesc->already_executed = true;

		ExecutePlan(es, queryDesc->planstate, queryDesc->plannedstmt->parallelModeNeeded, cmd, emitsTuples, count, direction, receiver, execute_once);
	}

	/* Shut the receiver down if, and only if, it was started above. */
	if (emitsTuples)
		receiver->rShutdown(receiver);

	if (queryDesc->totaltime)
		InstrStopNode(queryDesc->totaltime, es->es_processed);

	/* Return to the caller's memory context. */
	MemoryContextSwitchTo(callerCxt);
}