在ShardingSphere的源码中可以看到不少Netty相关的代码，它们到底做了什么？下面逐个来看。
先看ShardingProxy代理服务，直接上代码，这是一段典型的Netty服务端启动流程：
/**
 * Starts the Netty server on the given port and blocks until the server channel is closed.
 *
 * <p>The boss group is created here; the bootstrap is then wired through either
 * {@code groupsEpoll} or {@code groupsNio} depending on the type of that group.
 * When the method exits (normally or exceptionally) the event loop groups that exist
 * are shut down and the backend executor engine is closed.
 *
 * @param port TCP port to bind the server to
 */
public void start(final int port) {
    try {
        ServerBootstrap bootstrap = new ServerBootstrap();
        bossGroup = createEventLoopGroup();
        if (bossGroup instanceof EpollEventLoopGroup) {
            groupsEpoll(bootstrap);
        } else {
            groupsNio(bootstrap);
        }
        ChannelFuture future = bootstrap.bind(port).sync();
        // Block the calling thread until the server channel is closed.
        future.channel().closeFuture().sync();
    } finally {
        // Either group can still be null when start-up failed early: workerGroup is not
        // assigned in this method (presumably groupsEpoll/groupsNio create it), and bossGroup
        // stays unset if createEventLoopGroup() throws. Guarding here keeps a
        // NullPointerException from masking the original failure and from skipping the
        // executor engine shutdown below.
        if (null != workerGroup) {
            workerGroup.shutdownGracefully();
        }
        if (null != bossGroup) {
            bossGroup.shutdownGracefully();
        }
        BackendExecutorContext.getInstance().getExecutorEngine().close();
    }
}
具体配置了什么
bootstrap.group(bossGroup, workerGroup) .channel(EpollServerSocketChannel.class) .option(EpollChannelOption.SO_BACKLOG, 128) .option(EpollChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(8 * 1024 * 1024, 16 * 1024 * 1024)) .option(EpollChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) .childOption(EpollChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) .childOption(EpollChannelOption.TCP_NODELAY, true) .handler(new LoggingHandler(LogLevel.INFO)) .childHandler(new ServerHandlerInitializer());
handler做了啥呢
/**
 * Populates the pipeline of the given socket channel with two handlers, in order:
 * a {@code PacketCodec} built from the frontend engine's codec engine, followed by a
 * {@code FrontendChannelInboundHandler} bound to the same frontend engine.
 *
 * <p>The frontend engine is looked up from the database type reported by {@code LogicSchemas}.
 *
 * @param socketChannel channel whose pipeline is being initialized
 */
@Override
protected void initChannel(final SocketChannel socketChannel) {
    DatabaseProtocolFrontendEngine frontendEngine =
        DatabaseProtocolFrontendEngineFactory.newInstance(LogicSchemas.getInstance().getDatabaseType());
    socketChannel.pipeline()
        .addLast(new PacketCodec(frontendEngine.getCodecEngine()))
        .addLast(new FrontendChannelInboundHandler(frontendEngine));
}
见名知意：其实就是启动了一个Netty服务器，通过编解码器解析数据库的交互协议，为客户端连接提供通信支持。
-----------------------------------------------------
ShardingScaling中Bootstrap典型的Netty启动
/**
 * Entry point: initializes the server configuration, starts a Netty server on the
 * configured port and blocks until the server channel is closed.
 *
 * <p>Both event loop groups are shut down when the method exits.
 *
 * @param args command line arguments (not read by this method)
 */
public static void main(final String[] args) {
    log.info("Init server config");
    initServerConfig();
    log.info("ShardingScaling Startup");
    // One thread for the first (boss) group; default sizing for the second (worker) group.
    EventLoopGroup acceptorGroup = new NioEventLoopGroup(1);
    EventLoopGroup ioGroup = new NioEventLoopGroup();
    try {
        ServerBootstrap serverBootstrap = new ServerBootstrap();
        serverBootstrap.option(ChannelOption.SO_BACKLOG, 1024);
        serverBootstrap.group(acceptorGroup, ioGroup);
        serverBootstrap.channel(NioServerSocketChannel.class);
        serverBootstrap.handler(new LoggingHandler(LogLevel.INFO));
        serverBootstrap.childHandler(new HttpServerInitializer());
        int port = ScalingContext.getInstance().getServerConfiguration().getPort();
        // Wait for the bind to complete before announcing the address.
        Channel serverChannel = serverBootstrap.bind(port).sync().channel();
        log.info("ShardingScaling is server on http://127.0.0.1:" + port + '/');
        // Block here until the server channel is closed.
        serverChannel.closeFuture().sync();
    } finally {
        acceptorGroup.shutdownGracefully();
        ioGroup.shutdownGracefully();
    }
}
它到底做了什么？就是作为一个典型的HTTP服务端来接收并处理请求：
/**
 * Sets up the HTTP pipeline of the given socket channel with three handlers, in order:
 * an {@code HttpServerCodec}, an {@code HttpObjectAggregator} constructed with 65536,
 * and the {@code HttpServerHandler}.
 *
 * @param socketChannel channel whose pipeline is being initialized
 */
@Override
protected void initChannel(final SocketChannel socketChannel) {
    socketChannel.pipeline()
        .addLast(new HttpServerCodec())
        .addLast(new HttpObjectAggregator(65536))
        .addLast(new HttpServerHandler());
}
重点在HttpServerHandler：它处理了什么？作为Scaling（数据迁移/扩容）服务端，它负责响应 /shardingscaling/job/* 请求：
/**
 * Routes an aggregated HTTP request to the matching job operation.
 *
 * <p>Routing rules, evaluated in order:
 * <ol>
 *   <li>URI not matching {@code URL_PATTERN}: 400 response with "Not support request!"</li>
 *   <li>POST {@code /shardingscaling/job/start}: {@code startJob} with the request body</li>
 *   <li>GET with a URI containing {@code /shardingscaling/job/progress/}: {@code getJobProgress} with the URI</li>
 *   <li>GET {@code /shardingscaling/job/list}: {@code listAllJobs}</li>
 *   <li>POST {@code /shardingscaling/job/stop}: {@code stopJob} with the request body</li>
 *   <li>anything else: 400 response with "Not support request!"</li>
 * </ol>
 * The fixed paths are compared case-insensitively; the body is decoded as UTF-8.
 *
 * @param channelHandlerContext context used to write the response
 * @param request the full HTTP request
 */
protected void channelRead0(final ChannelHandlerContext channelHandlerContext, final FullHttpRequest request) {
    String uri = request.uri();
    String body = request.content().toString(CharsetUtil.UTF_8);
    HttpMethod httpMethod = request.method();
    if (!URL_PATTERN.matcher(uri).matches()) {
        response(GSON.toJson(ResponseContentUtil.handleBadRequest("Not support request!")), channelHandlerContext, HttpResponseStatus.BAD_REQUEST);
    } else if ("/shardingscaling/job/start".equalsIgnoreCase(uri) && httpMethod.equals(HttpMethod.POST)) {
        startJob(channelHandlerContext, body);
    } else if (uri.contains("/shardingscaling/job/progress/") && httpMethod.equals(HttpMethod.GET)) {
        getJobProgress(channelHandlerContext, uri);
    } else if ("/shardingscaling/job/list".equalsIgnoreCase(uri) && httpMethod.equals(HttpMethod.GET)) {
        listAllJobs(channelHandlerContext);
    } else if ("/shardingscaling/job/stop".equalsIgnoreCase(uri) && httpMethod.equals(HttpMethod.POST)) {
        stopJob(channelHandlerContext, body);
    } else {
        // URI matched the pattern but no path/method combination is handled.
        response(GSON.toJson(ResponseContentUtil.handleBadRequest("Not support request!")), channelHandlerContext, HttpResponseStatus.BAD_REQUEST);
    }
}
还有后续：上面的startJob最终会调用 SCALING_JOB_CONTROLLER.start(shardingScalingJob);
又启动了一个线程
/**
 * Prepares both sync tasks, marks the status as {@code MIGRATE_HISTORY_DATA} and starts the
 * history data task group.
 *
 * <p>The callback passed to the group logs the finished task; on {@code EXCEPTION_EXIT} it
 * stops this controller, closes the data source manager and records
 * {@code MIGRATE_HISTORY_DATA_FAILURE}, otherwise it continues with the realtime sync task.
 */
public void run() {
    realtimeDataSyncTask.prepare();
    historyDataSyncTaskGroup.prepare();
    syncTaskControlStatus = SyncTaskControlStatus.MIGRATE_HISTORY_DATA;
    historyDataSyncTaskGroup.start(finishedEvent -> {
        log.info("history data migrate task {} finished, execute result: {}", finishedEvent.getTaskId(), finishedEvent.getEventType().name());
        if (!EventType.EXCEPTION_EXIT.equals(finishedEvent.getEventType())) {
            // Any event type other than EXCEPTION_EXIT moves on to realtime synchronization.
            executeRealTimeSyncTask();
            return;
        }
        // Failure path: stop, release data sources, then record the failure status.
        stop();
        dataSourceManager.close();
        syncTaskControlStatus = SyncTaskControlStatus.MIGRATE_HISTORY_DATA_FAILURE;
    });
}
历史数据迁移正常结束后，接着执行实时数据同步任务：
/**
 * Starts the realtime data sync task, provided the controller is still in the
 * {@code MIGRATE_HISTORY_DATA} state.
 *
 * <p>If the status has changed in the meantime, the data source manager is closed and the
 * status becomes {@code STOPPED} without starting anything. Otherwise the status is set to
 * {@code SYNCHRONIZE_REALTIME_DATA} and the task is started; its callback closes the data
 * source manager and records {@code STOPPED} for a {@code FINISHED} event or
 * {@code SYNCHRONIZE_REALTIME_DATA_FAILURE} for any other event type.
 */
private void executeRealTimeSyncTask() {
    if (!SyncTaskControlStatus.MIGRATE_HISTORY_DATA.equals(syncTaskControlStatus)) {
        dataSourceManager.close();
        syncTaskControlStatus = SyncTaskControlStatus.STOPPED;
        return;
    }
    // Record the running status BEFORE starting the task. If it were assigned after start(),
    // a callback that has already run by then would have its terminal status
    // (STOPPED / SYNCHRONIZE_REALTIME_DATA_FAILURE) overwritten with SYNCHRONIZE_REALTIME_DATA.
    syncTaskControlStatus = SyncTaskControlStatus.SYNCHRONIZE_REALTIME_DATA;
    realtimeDataSyncTask.start(event -> {
        log.info("realtime data sync task {} finished, execute result: {}", syncTaskId, event.getEventType().name());
        dataSourceManager.close();
        syncTaskControlStatus = EventType.FINISHED.equals(event.getEventType())
            ? SyncTaskControlStatus.STOPPED : SyncTaskControlStatus.SYNCHRONIZE_REALTIME_DATA_FAILURE;
    });
}
task具体做了什么：创建reader和writers，并用同一个channel把它们连接起来，然后一起交给执行器组
/**
 * Creates the log reader and the writers, connects all of them to one distribution channel
 * and registers channel, reader and writers with the given executor group.
 *
 * <p>The reader is created from the reader configuration and the current position reported
 * by {@code logPositionManager}, and is stored in the {@code reader} field.
 *
 * @param syncExecutorGroup group that receives the channel, the reader and the writers
 */
private void instanceSyncExecutors(final SyncExecutorGroup syncExecutorGroup) {
    reader = ReaderFactory.newInstanceLogReader(
        syncConfiguration.getReaderConfiguration(), logPositionManager.getCurrentPosition());
    List<Writer> writerList = instanceWriters();
    DistributionChannel sharedChannel = instanceChannel(writerList);
    // Reader and every writer are attached to the same channel instance.
    reader.setChannel(sharedChannel);
    writerList.forEach(writer -> writer.setChannel(sharedChannel));
    syncExecutorGroup.setChannel(sharedChannel);
    syncExecutorGroup.addSyncExecutor(reader);
    syncExecutorGroup.addAllSyncExecutor(writerList);
}
reader又干了啥
MySQLBinlogReader
/**
 * Connects a {@code MySQLClient} to the host and port taken from the configured JDBC URL,
 * subscribes from the stored binlog file name and position, and forwards every polled event
 * to {@code handleEvent} for as long as {@code isRunning()} is true.
 *
 * <p>When polling yields no event the loop calls {@code sleep()} before polling again. After
 * the loop ends, a {@code FinishedRecord} with a {@code NopLogPosition} is pushed to the channel.
 *
 * @param channel channel that receives the handled events and the final finished record
 */
public void read(final Channel channel) {
    JDBCDataSourceConfiguration dataSourceConfig =
        (JDBCDataSourceConfiguration) rdbmsConfiguration.getDataSourceConfiguration();
    final JdbcUri uri = new JdbcUri(dataSourceConfig.getJdbcUrl());
    MySQLClient client = new MySQLClient(
        123456,
        uri.getHostname(),
        uri.getPort(),
        dataSourceConfig.getUsername(),
        dataSourceConfig.getPassword());
    client.connect();
    client.subscribe(binlogPosition.getFilename(), binlogPosition.getPosition());
    while (isRunning()) {
        AbstractBinlogEvent polledEvent = client.poll();
        if (null != polledEvent) {
            handleEvent(channel, uri, polledEvent);
        } else {
            // Nothing available right now; back off before polling again.
            sleep();
        }
    }
    pushRecord(channel, new FinishedRecord(new NopLogPosition()));
}
这里启动了client并订阅了binlog；那么Client里又做了什么？见名知意：它负责处理与MySQL之间的协议交互（编解码、握手协商、命令响应等）
/**
 * Opens a client channel to {@code host}:{@code port} and waits for the server info response.
 *
 * <p>A fresh {@code responseCallback} promise is created first. Each new channel gets four
 * handlers, in order: a {@code PacketCodec} over a {@code MySQLPacketCodecEngine}, a
 * {@code MySQLCommandPacketDecoder}, a {@code MySQLNegotiateHandler} (given username,
 * password and the response promise) and a {@code MySQLCommandResponseHandler}.
 *
 * <p>The future returned by {@code connect(host, port)} is not awaited in this method; the
 * method blocks in {@code waitExpectedResponse(ServerInfo.class)} and stores the result in
 * {@code serverInfo}.
 */
public synchronized void connect() {
    responseCallback = new DefaultPromise<>(eventLoopGroup.next());
    Bootstrap clientBootstrap = new Bootstrap();
    clientBootstrap.group(eventLoopGroup);
    clientBootstrap.channel(NioSocketChannel.class);
    clientBootstrap.handler(new ChannelInitializer<SocketChannel>() {

        @Override
        protected void initChannel(final SocketChannel socketChannel) {
            socketChannel.pipeline()
                .addLast(new PacketCodec(new MySQLPacketCodecEngine()))
                .addLast(new MySQLCommandPacketDecoder())
                .addLast(new MySQLNegotiateHandler(username, password, responseCallback))
                .addLast(new MySQLCommandResponseHandler());
        }
    });
    clientBootstrap.option(ChannelOption.AUTO_READ, true);
    channel = clientBootstrap.connect(host, port).channel();
    serverInfo = waitExpectedResponse(ServerInfo.class);
}