At the time I wanted to try building a Pipeline myself.

The main reasons it never got finished were:

1. Insufficient understanding of the ClickHouse architecture.
2. Insufficient CMakeLists.txt skills (a sketch of a possible CMakeLists.txt is at the end of this post).

Roughly half and half.

See 虎哥 (BohuTANG)'s blog post: https://bohutang.me/2020/06/11/clickhouse-and-friends-processor/
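The snippets below were hacked together inside the ClickHouse source tree, so they are shown without their surrounding boilerplate. As a rough sketch, a standalone version would need headers along these lines (the exact paths are my assumption, based on the ClickHouse tree of that era, and may differ between versions):

```cpp
/// Assumed ClickHouse headers for the example below; exact paths may vary by version.
#include <Columns/ColumnsNumber.h>
#include <Core/Block.h>
#include <DataTypes/DataTypesNumber.h>
#include <Formats/FormatSettings.h>
#include <IO/WriteBufferFromFileDescriptor.h>
#include <IO/WriteHelpers.h>
#include <Processors/ISource.h>
#include <Processors/ISink.h>
#include <Processors/IProcessor.h>
#include <Processors/Executors/PipelineExecutor.h>

using namespace DB;
```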
1. Source
```cpp
class MySource : public ISource
{
public:
    String getName() const override { return "MySource"; }

    MySource(UInt64 end_)
        : ISource(Block({ColumnWithTypeAndName{ColumnUInt64::create(), std::make_shared<DataTypeUInt64>(), "number"}}))
        , end(end_)
    {
    }

private:
    UInt64 end;
    bool done = false;

    /// Produce a single chunk with the numbers [0, end), then signal end-of-data with an empty chunk.
    Chunk generate() override
    {
        if (done)
            return Chunk();

        MutableColumns columns;
        columns.emplace_back(ColumnUInt64::create());
        for (auto i = 0U; i < end; i++)
            columns[0]->insert(i);

        done = true;
        return Chunk(std::move(columns), end);
    }
};
```
2. MyAddTransformer

```cpp
class MyAddTransformer : public IProcessor
{
public:
    String getName() const override { return "MyAddTransformer"; }

    MyAddTransformer()
        : IProcessor(
            {Block({ColumnWithTypeAndName{ColumnUInt64::create(), std::make_shared<DataTypeUInt64>(), "number"}})},
            {Block({ColumnWithTypeAndName{ColumnUInt64::create(), std::make_shared<DataTypeUInt64>(), "number"}})})
        , input(inputs.front())
        , output(outputs.front())
    {
    }

    /// prepare() only moves chunks between ports and reports the processor's state to the scheduler.
    Status prepare() override
    {
        if (output.isFinished())
        {
            input.close();
            return Status::Finished;
        }

        if (!output.canPush())
        {
            input.setNotNeeded();
            return Status::PortFull;
        }

        if (has_process_data)
        {
            output.push(std::move(current_chunk));
            has_process_data = false;
        }

        if (input.isFinished())
        {
            output.finish();
            return Status::Finished;
        }

        if (!input.hasData())
        {
            input.setNeeded();
            return Status::NeedData;
        }

        current_chunk = input.pull(false);
        return Status::Ready;
    }

    /// work() does the actual transformation: add 1 to every value of the "number" column.
    void work() override
    {
        auto num_rows = current_chunk.getNumRows();
        auto result_columns = current_chunk.cloneEmptyColumns();
        auto columns = current_chunk.detachColumns();
        for (auto i = 0U; i < num_rows; i++)
        {
            auto val = columns[0]->getUInt(i);
            result_columns[0]->insert(val + 1);
        }
        current_chunk.setColumns(std::move(result_columns), num_rows);
        has_process_data = true;
    }

    InputPort & getInputPort() { return input; }
    OutputPort & getOutputPort() { return output; }

protected:
    bool has_input = false;
    bool has_process_data = false;
    Chunk current_chunk;
    InputPort & input;
    OutputPort & output;
};
```
3. MySink
```cpp
class MySink : public ISink
{
public:
    String getName() const override { return "MySinker"; }

    MySink()
        : ISink(Block({ColumnWithTypeAndName{ColumnUInt64::create(), std::make_shared<DataTypeUInt64>(), "number"}}))
    {
    }

private:
    WriteBufferFromFileDescriptor out{STDOUT_FILENO};
    FormatSettings settings;

    /// Write every row of the chunk to stdout, prefixed with "prefix-" and tab-separated.
    void consume(Chunk chunk) override
    {
        size_t rows = chunk.getNumRows();
        size_t columns = chunk.getNumColumns();

        for (size_t row_num = 0; row_num < rows; ++row_num)
        {
            writeString("prefix-", out);
            for (size_t column_num = 0; column_num < columns; ++column_num)
            {
                if (column_num != 0)
                    writeChar('\t', out);
                getPort()
                    .getHeader()
                    .getByPosition(column_num)
                    .type->serializeAsText(*chunk.getColumns()[column_num], row_num, out, settings);
            }
            writeChar('\n', out);
        }

        out.next();
    }
};
```
4. DAG Scheduler
```cpp
int main(int, char **)
{
    auto source0 = std::make_shared<MySource>(5);
    auto add0 = std::make_shared<MyAddTransformer>();
    auto sinker0 = std::make_shared<MySink>();

    /// Connect.
    connect(source0->getPort(), add0->getInputPort());
    connect(add0->getOutputPort(), sinker0->getPort());

    std::vector<ProcessorPtr> processors = {source0, add0, sinker0};
    PipelineExecutor executor(processors);
    executor.execute(1);
}
```
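Since CMakeLists.txt was half of the trouble, here is a minimal sketch of how such an example might be wired into the ClickHouse build, assuming the source file sits in a directory the tree already descends into and that linking the internal dbms target is sufficient (the file name, the target name, and the dbms link are all assumptions on my part, not a verified setup):

```cmake
# Hypothetical wiring; adjust file/target names to wherever the example actually lives in the tree.
add_executable (my_pipeline_example my_pipeline_example.cpp)
target_link_libraries (my_pipeline_example PRIVATE dbms)
```

If it builds and runs, the sink should print five lines, prefix-1 through prefix-5: the source emits 0..4 in a single chunk and the transformer adds 1 to each value.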