vertx的NetServer模块

 

启动

/**
 * Starts this server listening on {@code socketAddress} and routes accepted connections to {@code handler}.
 *
 * <p>Servers are looked up in {@code vertx.sharedNetServers()} by a {@code ServerID} built from the port and
 * the host (or path). If no server is registered under that id, or the requested port is 0, this instance
 * builds the Netty bootstrap and binds it. Otherwise it reuses the already registered server and only adds
 * its own handlers to that server's {@code handlerManager}.
 *
 * @param handler       handler invoked for each accepted socket; must not be null
 * @param socketAddress address to bind; its port, and host or path, form the shared-server key
 * @param listenHandler notified with the bind result on the listen context; may be null, in which case a
 *                      failure is only logged
 * @throws IllegalStateException if {@code handler} is null or listen has already been called
 */
public synchronized void listen(Handler<NetSocket> handler, SocketAddress socketAddress, Handler<AsyncResult<Void>> listenHandler) {
  // Precondition checks
    if (handler == null) {
      throw new IllegalStateException("Set connect handler first");
    }
    if (listening) {
      throw new IllegalStateException("Listen already called");
    }
    listening = true;

    listenContext = vertx.getOrCreateContext();
    registeredHandler = handler;// remember the caller-supplied connection handler

    synchronized (vertx.sharedNetServers()) {// lock the shared server map so concurrent listen() calls on different NetServers do not race
      this.actualPort = socketAddress.port();
      String hostOrPath = socketAddress.host() != null ? socketAddress.host() : socketAddress.path();
      id = new ServerID(actualPort, hostOrPath);
      NetServerImpl shared = vertx.sharedNetServers().get(id);
      if (shared == null || actualPort == 0) { // nothing registered under this id (or port 0): this instance becomes the actual server; sharing presumably lets several instances use multiple cores
        /**
          * First instance for this id
          */
        serverChannelGroup = new DefaultChannelGroup("vertx-acceptor-channels", GlobalEventExecutor.INSTANCE);

        // A single event loop group is passed to the bootstrap; the original author notes this differs from HttpServer
        ServerBootstrap bootstrap = new ServerBootstrap();
        bootstrap.group(availableWorkers);
        sslHelper.validate(vertx);// validate the SSL-related settings

        bootstrap.childHandler(new ChannelInitializer<Channel>() {
          @Override
           /**
             * Initializes each accepted child channel (per the original note: runs after the accepted
             * connection has been dispatched to its thread).
             */
          protected void initChannel(Channel ch) throws Exception {
           // Flow-control guard; original author's note: reads outpacing writes would grow memory without bound and end in OOM
            if (isPaused()) {
              ch.close();// server is paused: refuse the connection by closing it
              return;
            }
            // Choose the registered handlers that match this channel's event loop
            HandlerHolder<Handlers> handler = handlerManager.chooseHandler(ch.eventLoop());
            if (handler != null) {// only proceed when a handler was found
              if (sslHelper.isSSL()) {// TLS on top of TCP enabled?
                io.netty.util.concurrent.Future<Channel> handshakeFuture;
                if (options.isSni()) {// SNI enabled (one server, multiple certificates)?
                  VertxSniHandler sniHandler = new VertxSniHandler(sslHelper, vertx);
                  handshakeFuture = sniHandler.handshakeFuture();
                  ch.pipeline().addFirst("ssl", sniHandler);
                } else {
                  SslHandler sslHandler = new SslHandler(sslHelper.createEngine(vertx));
                  handshakeFuture = sslHandler.handshakeFuture();
                  ch.pipeline().addFirst("ssl", sslHandler);
                }
                // React to the outcome of the TLS handshake
                handshakeFuture.addListener(future -> {
                  if (future.isSuccess()) {
                    connected(handler, ch);
                  } else {
                    // Handshake failed: hand the cause to the exception handler if one is set, otherwise log it
                    Handler<Throwable> exceptionHandler = handler.handler.exceptionHandler;
                    if (exceptionHandler != null) {
                      handler.context.executeFromIO(() -> {
                        exceptionHandler.handle(future.cause());
                      });
                    } else {
                      log.error("Client from origin " + ch.remoteAddress() + " failed to connect over ssl: " + future.cause());
                    }
                  }
                });
              } else {
                connected(handler, ch);
              }
            }
          }
        });

        applyConnectionOptions(bootstrap);// apply the configured connection options to the bootstrap

        // Register this instance's handlers with the handlerManager under the listen context
        handlerManager.addHandler(new Handlers(handler, exceptionHandler), listenContext);

        try {
         // Bind to the requested address
          bindFuture = AsyncResolveConnectHelper.doBind(vertx, socketAddress, bootstrap);
          // Observe the bind result
          bindFuture.addListener(res -> {
            if (res.succeeded()) {
              Channel ch = res.result();
              log.trace("Net server listening on " + (hostOrPath) + ":" + ch.localAddress());
              
              if (NetServerImpl.this.actualPort != -1) {// NOTE(review): presumably -1 marks an address without a port (e.g. a path-based socket) — confirm
                NetServerImpl.this.actualPort = ((InetSocketAddress)ch.localAddress()).getPort();
              }
              NetServerImpl.this.id = new ServerID(NetServerImpl.this.actualPort, id.host);
              serverChannelGroup.add(ch);
              // Publish this server in vertx.sharedNetServers() under its (possibly updated) id
              vertx.sharedNetServers().put(id, NetServerImpl.this);
              // Create metrics when a metrics SPI is available
              VertxMetrics metrics = vertx.metricsSPI();
              if (metrics != null) {
                this.metrics = metrics.createMetrics(new SocketAddressImpl(id.port, id.host), options);
              }
            } else {
              // Bind failed: drop the entry registered for this id
              vertx.sharedNetServers().remove(id);
            }
          });

        } catch (Throwable t) {
          if (listenHandler != null) {
           // Report the failure to the caller
            vertx.runOnContext(v ->  listenHandler.handle(Future.failedFuture(t)));
          } else {
            log.error(t);
          }
          listening = false;
          return;
        }
        if (actualPort != 0) {// non-wildcard port: register before the bind completes; the bind listener above removes the entry on failure
          vertx.sharedNetServers().put(id, this);
        }
        actualServer = this;
      } else {// other instances: reuse the server already registered under this id
        actualServer = shared;
        this.actualPort = shared.actualPort();
        // Create metrics when a metrics SPI is available
        VertxMetrics metrics = vertx.metricsSPI();
        this.metrics = metrics != null ? metrics.createMetrics(new SocketAddressImpl(id.port, id.host), options) : null;
        // Add this instance's handlers to the shared actual server's handlerManager
        actualServer.handlerManager.addHandler(new Handlers(handler, exceptionHandler), listenContext);
      }

      // Observe the actual server's bind result
      actualServer.bindFuture.addListener(res -> {
        if (listenHandler != null) {
          AsyncResult<Void> ares;
          if (res.succeeded()) {
            ares = Future.succeededFuture();
          } else {
            listening = false;
            ares = Future.failedFuture(res.cause());
          }
          // Deliver the result to the caller on the listen context
          listenContext.runOnContext(v -> listenHandler.handle(ares));
        } else if (res.failed()) {
          log.error("Failed to listen", res.cause());
          listening = false;
        }
      });
    }
    return;
}

 

处理

/**
 * Completes the setup of an accepted channel and hands the resulting socket to the connection handler.
 *
 * <p>Steps, in order: set the current context to the holder's context, initialize the channel pipeline,
 * append a {@code VertxNetHandler} named "handler", then call the connection handler through the holder's
 * context (reporting the connection to metrics first when metrics are present).
 *
 * @param handler holder carrying the chosen handlers and the context they run on
 * @param ch      the accepted channel
 */
private void connected(HandlerHolder<Handlers> handler, Channel ch) {
    // Event loop of the channel; looked up here but not used further in this method.
    EventLoop channelLoop = ch.eventLoop();

    // Make the holder's context current. Original author's note: there can be fewer event loops than
    // verticles, so one event loop may serve N verticles.
    ContextImpl.setContext(handler.context);

    // Apply the server's pipeline initialization (properties and configuration).
    NetServerImpl.this.initChannel(ch.pipeline());

    VertxNetHandler netHandler = new VertxNetHandler(channelCtx -> new NetSocketImpl(vertx, channelCtx, handler.context, sslHelper, metrics)) {
      /**
        * Forwards every received message to the connection.
        *
        * @param connection the client connection
        * @param context    context passed in by the base handler (unused here)
        * @param chctx      Netty channel handler context (unused here)
        * @param msg        the received message object
        */
      @Override
      protected void handleMessage(NetSocketImpl connection, ContextImpl context, ChannelHandlerContext chctx, Object msg) throws Exception {
        connection.handleMessageReceived(msg);
      }
    };

    // Keep socketMap in step with the connection's lifetime; per the original note this is so that
    // close can release the socket resources.
    netHandler.addHandler(connection -> socketMap.put(ch, connection));
    netHandler.removeHandler(connection -> socketMap.remove(ch));

    // Install the message-processing handler at the tail of the pipeline.
    ch.pipeline().addLast("handler", netHandler);

    // Connection object created by the net handler for this channel.
    NetSocketImpl socket = netHandler.getConnection();
    handler.context.executeFromIO(() -> {
      if (metrics != null) {
        socket.metric(metrics.connected(socket.remoteAddress(), socket.remoteName()));
      }
      // Let the user's connection handler process the new client socket.
      handler.handler.connectionHandler.handle(socket);
    });
}
 
note: NetServer 与 HttpServer 的不同之处在于:HTTP 场景存在大量短连接请求,所以 HttpServer 采用单独的线程池处理 connection accept;
 而 NetServer 是单线程池模型,其中有一个 EventLoop(Thread)除了处理读写之外,还需额外处理 client accept 事件。
 切记不要在 EventLoop 中做阻塞操作:如果这个负责 client accept 的 EventLoop 处理时间过长,
 就会造成 client connection 建立变慢的现象。
上一篇:Bring JavaScript to your Java enterprise with Vert.x


下一篇:PHP 电子围栏算法-不依赖任何第三方接口