netty在shardingsphere中的运用

在shardingsphere中会发现不少netty的代码,到底做了什么,来看下

ShardingProxy代理服务,上代码,典型的netty启动

    public void start(final int port) {
        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            bossGroup = createEventLoopGroup();
            if (bossGroup instanceof EpollEventLoopGroup) {
                groupsEpoll(bootstrap);
            } else {
                groupsNio(bootstrap);
            }
            ChannelFuture future = bootstrap.bind(port).sync();
            future.channel().closeFuture().sync();
        } finally {
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
            BackendExecutorContext.getInstance().getExecutorEngine().close();
        }
    }

具体配置了什么

bootstrap.group(bossGroup, workerGroup)
                .channel(EpollServerSocketChannel.class)
                .option(EpollChannelOption.SO_BACKLOG, 128)
                .option(EpollChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(8 * 1024 * 1024, 16 * 1024 * 1024))
                .option(EpollChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
                .childOption(EpollChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
                .childOption(EpollChannelOption.TCP_NODELAY, true)
                .handler(new LoggingHandler(LogLevel.INFO))
                .childHandler(new ServerHandlerInitializer());

handler做了啥呢

    @Override
    protected void initChannel(final SocketChannel socketChannel) {
        DatabaseProtocolFrontendEngine databaseProtocolFrontendEngine = DatabaseProtocolFrontendEngineFactory.newInstance(LogicSchemas.getInstance().getDatabaseType());
        ChannelPipeline pipeline = socketChannel.pipeline();
        pipeline.addLast(new PacketCodec(databaseProtocolFrontendEngine.getCodecEngine()));
        pipeline.addLast(new FrontendChannelInboundHandler(databaseProtocolFrontendEngine));
    }

见名知意,其实就是启动了一个netty服务器,解析了数据库的交互协议做通信支持

-----------------------------------------------------

ShardingScaling中Bootstrap典型的Netty启动

public static void main(final String[] args) {
        log.info("Init server config");
        initServerConfig();
        log.info("ShardingScaling Startup");
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.option(ChannelOption.SO_BACKLOG, 1024);
            bootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .handler(new LoggingHandler(LogLevel.INFO))
                    .childHandler(new HttpServerInitializer());
            int port = ScalingContext.getInstance().getServerConfiguration().getPort();
            Channel channel = bootstrap.bind(port).sync().channel();
            log.info("ShardingScaling is server on http://127.0.0.1:" + port + '/');
            channel.closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

到底做了啥? 作为典型的http服务端接收处理请求

    @Override
    protected void initChannel(final SocketChannel socketChannel) {
        ChannelPipeline channelPipeline = socketChannel.pipeline();
        channelPipeline.addLast(new HttpServerCodec());
        channelPipeline.addLast(new HttpObjectAggregator(65536));
        channelPipeline.addLast(new HttpServerHandler());
    }

大文章在HttpServerHandler,处理了啥? 作为扩展服务器,响应/shardingscaling/job/*请求

protected void channelRead0(final ChannelHandlerContext channelHandlerContext, final FullHttpRequest request) {
        String requestPath = request.uri();
        String requestBody = request.content().toString(CharsetUtil.UTF_8);
        HttpMethod method = request.method();
        if (!URL_PATTERN.matcher(requestPath).matches()) {
            response(GSON.toJson(ResponseContentUtil.handleBadRequest("Not support request!")),
                    channelHandlerContext, HttpResponseStatus.BAD_REQUEST);
            return;
        }
        if ("/shardingscaling/job/start".equalsIgnoreCase(requestPath) && method.equals(HttpMethod.POST)) {
            startJob(channelHandlerContext, requestBody);
            return;
        }
        if (requestPath.contains("/shardingscaling/job/progress/") && method.equals(HttpMethod.GET)) {
            getJobProgress(channelHandlerContext, requestPath);
            return;
        }
        if ("/shardingscaling/job/list".equalsIgnoreCase(requestPath) && method.equals(HttpMethod.GET)) {
            listAllJobs(channelHandlerContext);
            return;
        }
        if ("/shardingscaling/job/stop".equalsIgnoreCase(requestPath) && method.equals(HttpMethod.POST)) {
            stopJob(channelHandlerContext, requestBody);
            return;
        }
        response(GSON.toJson(ResponseContentUtil.handleBadRequest("Not support request!")),
                channelHandlerContext, HttpResponseStatus.BAD_REQUEST);
    }

还有后续 上面startjob --- SCALING_JOB_CONTROLLER.start(shardingScalingJob);

又启动了一个线程

public void run() {
        realtimeDataSyncTask.prepare();
        historyDataSyncTaskGroup.prepare();
        syncTaskControlStatus = SyncTaskControlStatus.MIGRATE_HISTORY_DATA;
        historyDataSyncTaskGroup.start(event -> {
            log.info("history data migrate task {} finished, execute result: {}", event.getTaskId(), event.getEventType().name());
            if (EventType.EXCEPTION_EXIT.equals(event.getEventType())) {
                stop();
                dataSourceManager.close();
                syncTaskControlStatus = SyncTaskControlStatus.MIGRATE_HISTORY_DATA_FAILURE;
            } else {
                executeRealTimeSyncTask();
            }
        });
    }

执行SyncTask,实时数据同步

private void executeRealTimeSyncTask() {
        if (!SyncTaskControlStatus.MIGRATE_HISTORY_DATA.equals(syncTaskControlStatus)) {
            dataSourceManager.close();
            syncTaskControlStatus = SyncTaskControlStatus.STOPPED;
            return;
        }
        realtimeDataSyncTask.start(event -> {
            log.info("realtime data sync task {} finished, execute result: {}", syncTaskId, event.getEventType().name());
            dataSourceManager.close();
            syncTaskControlStatus = EventType.FINISHED.equals(event.getEventType()) ? SyncTaskControlStatus.STOPPED : SyncTaskControlStatus.SYNCHRONIZE_REALTIME_DATA_FAILURE;
        });
        syncTaskControlStatus = SyncTaskControlStatus.SYNCHRONIZE_REALTIME_DATA;
    }

task能干啥,启动reader

    private void instanceSyncExecutors(final SyncExecutorGroup syncExecutorGroup) {
        reader = ReaderFactory.newInstanceLogReader(syncConfiguration.getReaderConfiguration(), logPositionManager.getCurrentPosition());
        List<Writer> writers = instanceWriters();
        DistributionChannel channel = instanceChannel(writers);
        reader.setChannel(channel);
        for (Writer each : writers) {
            each.setChannel(channel);
        }
        syncExecutorGroup.setChannel(channel);
        syncExecutorGroup.addSyncExecutor(reader);
        syncExecutorGroup.addAllSyncExecutor(writers);
    }

reader又干了啥

MySQLBinlogReader

public void read(final Channel channel) {
        JDBCDataSourceConfiguration jdbcDataSourceConfiguration = (JDBCDataSourceConfiguration) rdbmsConfiguration.getDataSourceConfiguration();
        final JdbcUri uri = new JdbcUri(jdbcDataSourceConfiguration.getJdbcUrl());
        MySQLClient client = new MySQLClient(123456, uri.getHostname(), uri.getPort(), jdbcDataSourceConfiguration.getUsername(), jdbcDataSourceConfiguration.getPassword());
        client.connect();
        client.subscribe(binlogPosition.getFilename(), binlogPosition.getPosition());
        while (isRunning()) {
            AbstractBinlogEvent event = client.poll();
            if (null == event) {
                sleep();
                continue;
            }
            handleEvent(channel, uri, event);
        }
        pushRecord(channel, new FinishedRecord(new NopLogPosition()));
    }

启动了client,订阅了binlog信息,Client里又干了啥,见名知意,处理了跟mysql之间的同步交互协议

    public synchronized void connect() {
        responseCallback = new DefaultPromise<>(eventLoopGroup.next());
        channel = new Bootstrap()
                .group(eventLoopGroup)
                .channel(NioSocketChannel.class)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(final SocketChannel socketChannel) {
                        socketChannel.pipeline().addLast(new PacketCodec(new MySQLPacketCodecEngine()));
                        socketChannel.pipeline().addLast(new MySQLCommandPacketDecoder());
                        socketChannel.pipeline().addLast(new MySQLNegotiateHandler(username, password, responseCallback));
                        socketChannel.pipeline().addLast(new MySQLCommandResponseHandler());
                    }
                })
                .option(ChannelOption.AUTO_READ, true)
                .connect(host, port).channel();
        serverInfo = waitExpectedResponse(ServerInfo.class);
    }
上一篇:实习经验总结


下一篇:项目实训报告-5 深入了解Pipeline