先贴代码。这里利用了 Boost.Asio 协程,所以代码基本按同步的方式编写,但实际是异步执行的。
#define BOOST_ASIO_HAS_CO_AWAIT
#include <boost/asio/awaitable.hpp>
#include <boost/asio/buffer.hpp>
#include <boost/asio/co_spawn.hpp>
#include <boost/asio/connect.hpp>
#include <boost/asio/detached.hpp>
#include <boost/asio/io_context.hpp>
#include <boost/asio/ip/tcp.hpp>
#include <boost/asio/read.hpp>
#include <boost/asio/signal_set.hpp>
#include <boost/asio/this_coro.hpp>
#include <boost/asio/use_awaitable.hpp>
#include <boost/asio/write.hpp>
#include <chrono>
#include <csignal>
#include <cstddef>
#include <cstdint>
#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <exception>
#include <iostream>
#include <memory>
#include <string>
#include <utility>
#include <vector>
using boost::asio::awaitable;
using boost::asio::co_spawn;
using boost::asio::detached;
using boost::asio::use_awaitable;
using boost::asio::ip::tcp;
namespace this_coro = boost::asio::this_coro;
// Number of ASCII characters in the fixed-width decimal length header.
constexpr int32_t kHeadLen = 4;
// Largest body length Message::DecodeHeader() accepts.
constexpr int32_t kMaxBodyLen = 1024 * 1024;
// Port (as text) and address the clients connect to.
constexpr const char *kPort = "55555";
constexpr const char *kAddress = "127.0.0.1";

// A length-prefixed message: a kHeadLen-character, right-aligned decimal
// header holding the body length, followed by the body bytes.
//
// The header and body are exposed as raw buffers so that callers can read
// into / write from them directly.
class Message {
public:
  // Parses the header buffer into the body length.
  //
  // Returns true on success. Returns false (and resets the body length to 0)
  // when the header contains no digits, has trailing non-numeric characters,
  // is negative, or exceeds kMaxBodyLen.
  bool DecodeHeader() {
    // The header bytes come from the caller; make sure strtol sees a
    // terminated string regardless of what was written into the buffer.
    header_[kHeadLen] = '\0';
    char *end = nullptr;
    const long parsed = std::strtol(header_, &end, 10);
    const bool has_digits = (end != header_);
    if (!has_digits || *end != '\0' || parsed < 0 || parsed > kMaxBodyLen) {
      body_length_ = 0;
      return false;
    }
    body_length_ = static_cast<int32_t>(parsed);
    return true;
  }

  // Writes the current body length into the header buffer as a right-aligned
  // decimal of width kHeadLen.
  //
  // Returns true when the length fits in exactly kHeadLen characters. When it
  // does not fit, the output is truncated to the buffer size (never
  // overflowing it) and false is returned; the header must not be sent.
  bool EncodeHeader() {
    const int written =
        std::snprintf(header_, sizeof(header_), "%*d", kHeadLen, body_length_);
    return written == kHeadLen;
  }

  // Allocates a zero-filled body buffer of body_len bytes and records that
  // length. Negative lengths are treated as 0.
  void InitBodyBuffer(const int32_t body_len) {
    body_length_ = body_len > 0 ? body_len : 0;
    body_.assign(static_cast<std::size_t>(body_length_), '\0');
  }

  // Buffer of HeaderLen() writable bytes (plus a trailing terminator).
  char *GetHeaderBuffer() { return header_; }
  // Buffer of BodyLen() writable bytes once InitBodyBuffer() has been called.
  char *GetBodyBuffer() { return body_.data(); }
  int32_t HeaderLen() const { return kHeadLen; }
  int32_t BodyLen() const { return body_length_; }

private:
  int32_t body_length_ = 0;
  // One extra byte keeps the header usable as a C string.
  char header_[kHeadLen + 1] = {};
  std::vector<char> body_;
};
// Sends one length-prefixed message whose body is the decimal text of `index`
// over `s`, then reads back a reply of the same length and prints it.
//
// Any I/O failure is reported on stderr; because this coroutine is spawned
// with `detached`, an escaping exception would otherwise be dropped silently.
awaitable<void> client(tcp::socket s, const int32_t index) {
  try {
    Message message;
    const std::string body = std::to_string(index);
    message.InitBodyBuffer(static_cast<int32_t>(body.length()));
    // The body is raw bytes, not a C string, so copy exactly body.length().
    std::memcpy(message.GetBodyBuffer(), body.data(), body.length());
    message.EncodeHeader();

    co_await boost::asio::async_write(
        s, boost::asio::buffer(message.GetHeaderBuffer(), message.HeaderLen()),
        use_awaitable);
    co_await boost::asio::async_write(
        s, boost::asio::buffer(message.GetBodyBuffer(), message.BodyLen()),
        use_awaitable);

    // Size the reply buffer from the expected length instead of using a
    // fixed-size stack array that a longer body could overrun.
    std::string reply(static_cast<std::size_t>(message.BodyLen()), '\0');
    const std::size_t n = co_await boost::asio::async_read(
        s, boost::asio::buffer(reply), use_awaitable);

    std::cout << "Reply is: ";
    std::cout.write(reply.data(), static_cast<std::streamsize>(n));
    std::cout << "\n";
  } catch (const std::exception &e) {
    std::cerr << "Client " << index << " failed: " << e.what() << "\n";
  }
}
// Serves a single request on `socket`: reads the fixed-size header, decodes
// the body length, reads the body, prints it, and echoes it back.
//
// The coroutine returns (closing the socket) after one message or when the
// header cannot be decoded. I/O failures are reported on stderr; because this
// coroutine is spawned with `detached`, an escaping exception would otherwise
// be dropped silently.
awaitable<void> server_handler(tcp::socket socket) {
  try {
    Message message;
    co_await boost::asio::async_read(
        socket,
        boost::asio::buffer(message.GetHeaderBuffer(), message.HeaderLen()),
        use_awaitable);
    if (!message.DecodeHeader()) {
      std::cout << "Decode header fail.\n";
      co_return;
    }

    message.InitBodyBuffer(message.BodyLen());
    const std::size_t n = co_await boost::asio::async_read(
        socket,
        boost::asio::buffer(message.GetBodyBuffer(), message.BodyLen()),
        use_awaitable);

    std::cout << "Recieve is: ";
    std::cout.write(message.GetBodyBuffer(), static_cast<std::streamsize>(n));
    std::cout << "\n";

    // Echo back exactly the bytes that were received.
    co_await boost::asio::async_write(
        socket, boost::asio::buffer(message.GetBodyBuffer(), n),
        use_awaitable);
  } catch (const std::exception &e) {
    std::cerr << "Server handler failed: " << e.what() << "\n";
  }
}
// Connects `client_count` sockets to kAddress:kPort one after another and
// spawns a detached client() coroutine on each; client i sends the value i.
//
// The endpoint is resolved once, asynchronously, before the loop: the result
// does not change between iterations, and a blocking resolve() inside the
// loop would stall the io_context thread on every connection.
//
// A resolve or connect failure stops the loop and is reported on stderr.
awaitable<void> start_all_client(const int32_t client_count = 10000) {
  try {
    auto executor = co_await this_coro::executor;
    tcp::resolver resolver(executor);
    const auto endpoints =
        co_await resolver.async_resolve(kAddress, kPort, use_awaitable);

    for (int32_t i = 0; i < client_count; ++i) {
      tcp::socket s(executor);
      co_await boost::asio::async_connect(s, endpoints, use_awaitable);
      co_spawn(executor, client(std::move(s), i), detached);
    }
  } catch (const std::exception &e) {
    std::cerr << "Starting clients failed: " << e.what() << "\n";
  }
}
// Accepts IPv4 TCP connections forever and hands each accepted socket to a
// detached server_handler() coroutine.
//
// The listening port is derived from kPort so that the server and the
// clients cannot drift apart.
awaitable<void> listener() {
  auto executor = co_await this_coro::executor;
  const auto port = static_cast<std::uint16_t>(std::stoul(kPort));
  tcp::acceptor acceptor(executor, tcp::endpoint(tcp::v4(), port));
  while (true) {
    tcp::socket socket = co_await acceptor.async_accept(use_awaitable);
    co_spawn(executor, server_handler(std::move(socket)), detached);
  }
}
// Entry point. Expects exactly one argument selecting the role:
//   1 -> run the echo server, 0 -> run the clients.
// Prints usage and returns -1 when the argument is missing, not a number, or
// not one of the two roles. Otherwise runs the io_context until it has no
// more work or SIGINT/SIGTERM stops it.
int main(int argc, char **argv) {
  const auto print_usage = [] {
    std::cout << "[Role] 1:server 0:client\n";
    std::cout << "[Usage] ./test1 Role\n";
  };

  if (argc != 2) {
    print_usage();
    return -1;
  }

  int32_t role = -1;
  try {
    role = std::stoi(argv[1]);
  } catch (const std::exception &) {
    // std::stoi throws on non-numeric or out-of-range input; treat it as a
    // usage error instead of terminating with an uncaught exception.
    print_usage();
    return -1;
  }
  if (role != 0 && role != 1) {
    // Without this check nothing would be spawned and run() would block on
    // the signal set until the process is interrupted.
    print_usage();
    return -1;
  }

  boost::asio::io_context io_context(1);
  boost::asio::signal_set signals(io_context, SIGINT, SIGTERM);
  signals.async_wait([&](auto, auto) { io_context.stop(); });

  if (role == 1) {
    co_spawn(io_context, listener(), detached);
  } else {
    co_spawn(io_context, start_all_client(), detached);
  }

  io_context.run();
  return 0;
}
CMakeLists.txt 如下:
# CMAKE_CXX_STANDARD 20 is only understood by CMake 3.12 and newer.
cmake_minimum_required(VERSION 3.12)

# The compiler has to be selected before project() enables the language.
set(CMAKE_CXX_COMPILER "clang++")
project(echo CXX)

set(CMAKE_CXX_STANDARD 20)
set(CMAKE_CXX_STANDARD_REQUIRED ON)
set(CMAKE_EXPORT_COMPILE_COMMANDS ON)

# Build and link against libc++.
# NOTE(review): -fcoroutines-ts / -fconcepts-ts target the Clang release this
# was written for; newer Clang versions may reject them - confirm against the
# toolchain in use.
set(CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} -stdlib=libc++ -lc++abi")
set(CMAKE_CXX_FLAGS "-fcoroutines-ts --stdlib=libc++ -Xclang -fconcepts-ts")

# Fail at configure time if Boost is missing instead of at compile time.
find_package(Boost REQUIRED COMPONENTS system)

# FindBoost sets Boost_INCLUDE_DIRS; variable names are case-sensitive, so
# BOOST_INCLUDE_DIRS would expand to nothing.
include_directories(
  ${Boost_INCLUDE_DIRS}
)

add_executable(echo echo.cpp)
target_link_libraries(echo
  ${Boost_LIBRARIES}
  pthread
)