完成 Pipeline —— 代码源自虎哥的博客,在此表示感谢。

  当时想实践一下Pipeline的构建。

  当时未能实现,主要原因在于:

  1. ClickHouse架构认识不足。

  2. CMakeLists.txt 功力不足。

  两者各占一半。

  参见虎哥的博客: https://bohutang.me/2020/06/11/clickhouse-and-friends-processor/

 

1. Source

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
/// Source processor that produces the integers 0 .. end-1 in a single UInt64
/// column named "number". All rows are emitted in one chunk by the first
/// generate() call; every later call returns an empty Chunk.
class MySource : public ISource
{
public:
    String getName() const override { return "MySource"; }

    /// @param end_ Exclusive upper bound of the generated sequence (also the row count).
    MySource(UInt64 end_)
        : ISource(Block({ColumnWithTypeAndName{ColumnUInt64::create(), std::make_shared<DataTypeUInt64>(), "number"}})), end(end_)
    {
    }

private:
    UInt64 end;
    bool done = false;

    /// Builds the whole sequence on the first call and marks the source as done.
    /// NOTE(review): the empty Chunk returned afterwards is presumably how ISource
    /// learns that the source is exhausted -- confirm against ISource.
    Chunk generate() override
    {
        if (done)
            return Chunk();

        MutableColumns columns;
        columns.emplace_back(ColumnUInt64::create());

        /// The counter must be as wide as `end`: a 32-bit counter wraps around
        /// before reaching a bound above UINT32_MAX and the loop never ends.
        for (UInt64 i = 0; i < end; ++i)
            columns[0]->insert(i);

        done = true;
        return Chunk(std::move(columns), end);
    }
};

2. MyAddTransformer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
/// Processor with one input port and one output port, both carrying a single
/// UInt64 column named "number". Every value read from the input is written
/// to the output incremented by one.
class MyAddTransformer : public IProcessor
{
public:
    String getName() const override { return "MyAddTransformer"; }

    MyAddTransformer()
        : IProcessor(
            {Block({ColumnWithTypeAndName{ColumnUInt64::create(), std::make_shared<DataTypeUInt64>(), "number"}})},
            {Block({ColumnWithTypeAndName{ColumnUInt64::create(), std::make_shared<DataTypeUInt64>(), "number"}})})
        , input(inputs.front())
        , output(outputs.front())
    {
    }

    /// Decides the next scheduling state from the state of both ports:
    ///  - output finished          -> close the input, Finished;
    ///  - output cannot accept data -> mark input as not needed, PortFull;
    ///  - a processed chunk waits   -> push it to the output first;
    ///  - input finished            -> finish the output, Finished;
    ///  - input has no data         -> request data, NeedData;
    ///  - otherwise                 -> pull a chunk for work(), Ready.
    Status prepare() override
    {
        if (output.isFinished())
        {
            input.close();
            return Status::Finished;
        }

        if (!output.canPush())
        {
            input.setNotNeeded();
            return Status::PortFull;
        }

        /// Hand over the result of the previous work() call before pulling more input.
        if (has_process_data)
        {
            output.push(std::move(current_chunk));
            has_process_data = false;
        }

        if (input.isFinished())
        {
            output.finish();
            return Status::Finished;
        }

        if (!input.hasData())
        {
            input.setNeeded();
            return Status::NeedData;
        }

        current_chunk = input.pull(false);
        return Status::Ready;
    }

    /// Replaces the columns of current_chunk with freshly built ones in which
    /// every value of column 0 is incremented by one, then flags the chunk as
    /// ready to be pushed by the next prepare() call.
    void work() override
    {
        const auto num_rows = current_chunk.getNumRows();
        auto result_columns = current_chunk.cloneEmptyColumns();
        auto columns = current_chunk.detachColumns();

        /// size_t counter: a 32-bit counter compared against the row count
        /// would wrap around for chunks larger than UINT32_MAX rows.
        for (size_t i = 0; i < num_rows; ++i)
        {
            auto val = columns[0]->getUInt(i);
            result_columns[0]->insert(val + 1);
        }

        current_chunk.setColumns(std::move(result_columns), num_rows);
        has_process_data = true;
    }

    InputPort & getInputPort() { return input; }
    OutputPort & getOutputPort() { return output; }

protected:
    /// Not read or written anywhere in this class; kept so that subclasses
    /// referring to it keep compiling.
    bool has_input = false;
    /// True while current_chunk holds a processed result not yet pushed to the output.
    bool has_process_data = false;
    Chunk current_chunk;
    InputPort & input;
    OutputPort & output;
};

3. MySink

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
/// Sink that prints every incoming row to standard output as text: each line
/// starts with "prefix-", columns are separated by tabs.
class MySink : public ISink
{
public:
    String getName() const override { return "MySinker"; }

    MySink()
        : ISink(Block({ColumnWithTypeAndName{ColumnUInt64::create(), std::make_shared<DataTypeUInt64>(), "number"}}))
    {
    }

private:
    WriteBufferFromFileDescriptor out{STDOUT_FILENO};
    FormatSettings settings;

    /// Serializes the chunk row by row using the data types from the port header,
    /// then calls out.next() once the whole chunk has been written.
    void consume(Chunk chunk) override
    {
        const size_t total_rows = chunk.getNumRows();
        const size_t total_columns = chunk.getNumColumns();

        for (size_t row_idx = 0; row_idx != total_rows; ++row_idx)
        {
            writeString("prefix-", out);

            for (size_t col_idx = 0; col_idx != total_columns; ++col_idx)
            {
                /// Tab goes between columns, not before the first one.
                if (col_idx > 0)
                    writeChar('\t', out);

                getPort().getHeader().getByPosition(col_idx).type->serializeAsText(
                    *chunk.getColumns()[col_idx], row_idx, out, settings);
            }

            writeChar('\n', out);
        }

        out.next();
    }
};

4. DAG Scheduler

1
2
3
4
5
6
7
8
9
10
11
12
13
14
int main(int, char **)
{
auto source0 = std::make_shared<MySource>(5);
auto add0 = std::make_shared<MyAddTransformer>();
auto sinker0 = std::make_shared<MySink>();

/// Connect.
connect(source0->getPort(), add0->getInputPort());
connect(add0->getOutputPort(), sinker0->getPort());

std::vector<ProcessorPtr> processors = {source0, add0, sinker0};
PipelineExecutor executor(processors);
executor.execute(1);
}

 

上一篇:pwny


下一篇:Memcached缓存,深入分析解读MySQL锁,解决幻读问题