Channel

在BIO编程中,每一个客户端连接请求过来,对于输入流,必须有单独的线程监听,看是否有数据到达,对于输出流,可以采用一个线程池管理,这样服务端的线程数量至少为n

下面例子是NIO中采用Channel+线程池方式,有个缺点是不知道SocketChannel是否有数据到达了,必须迭代所有的SocketChannel,如果有数据到达,有就处理,否则就跳过,效率太低

public class TimeServer {
    private  BlockingQueue<SocketChannel> idleQueue =new LinkedBlockingQueue<SocketChannel>();
    private  BlockingQueue<Future<SocketChannel>> workingQueue=new LinkedBlockingQueue<Future<SocketChannel>>();
    private  ExecutorService executor = Executors.newSingleThreadExecutor();
   
     {
        new Thread(){
            @Override
            public void run() {
            try {
               while (true) {
                        //task1:迭代当前idleQueue中的SocketChannel,提交到线程池中执行任务,并将其移到workingQueue中
                        for (int i = 0; i < idleQueue.size(); i++) {
                            SocketChannel socketChannel = idleQueue.poll();
                            if (socketChannel != null) {
                                Future<SocketChannel> result = executor.submit(new TimeServerHandleTask(socketChannel), socketChannel);
                                workingQueue.put(result);
                            }
                        }
                        //task2:迭代当前workingQueue中的SocketChannel,如果任务执行完成,将其移到idleQueue中
                        for (int i = 0; i < workingQueue.size(); i++) {
                            Future<SocketChannel> future = workingQueue.poll();
                            if (!future.isDone()){
                                workingQueue.put(future);
                                continue;
                            }
                             SocketChannel channel  = null;
                             channel = future.get();
                             idleQueue.put(channel);
                        }
                    }
            } catch (Exception e) {
               e.printStackTrace();
            }
            }
        }.start();
    }
     
     /**
      * 1、在main线程中,当接受到一个新的连接时,我们将相应的SocketChannel放入idleQueue
      * 
      * 
      */
     
    public static void main(String[] args) throws IOException, InterruptedException {
        TimeServer timeServer = new TimeServer();
        ServerSocketChannel ssc=ServerSocketChannel.open();
        ssc.configureBlocking(false);
        ssc.socket().bind(new InetSocketAddress(8080));
        while (true){
            SocketChannel socketChannel = ssc.accept();
            if(socketChannel==null){
                continue;
            }else{
                socketChannel.configureBlocking(false);
                timeServer.idleQueue.add(socketChannel);
            }
        }
    }
}

 

public class TimeServerHandleTask implements Runnable {
     SocketChannel socketChannel;
       public TimeServerHandleTask(SocketChannel socketChannel) {
          this.socketChannel = socketChannel;
       }
       @Override
       public void run() {
          try {
             ByteBuffer requestBuffer = ByteBuffer.allocate("GET CURRENT TIME".length());
             //尝试读取数据,因为是非阻塞,所以如果没有数据会立即返回。
             int bytesRead = socketChannel.read(requestBuffer);
            //如果没有读取到数据,说明当前SocketChannel并没有接受到数据,不需要处理
             if (bytesRead <= 0) {
                return;
             }
             //如果读取到了数据,则需要考虑粘包、解包问题,这个while代码是为了读取一个完整的请求信息"GET CURRENT TIME",
             while (requestBuffer.hasRemaining()) {
                socketChannel.read(requestBuffer);
             }
             String requestStr = new String(requestBuffer.array());
             if (!"GET CURRENT TIME".equals(requestStr)) {
                String bad_request = "BAD_REQUEST";
                ByteBuffer responseBuffer = ByteBuffer.allocate(bad_request.length());
                    responseBuffer.put(bad_request.getBytes());
                    responseBuffer.flip();
                    socketChannel.write(responseBuffer);
             } else {
                    String timeStr = Calendar.getInstance().getTime().toLocaleString();
                    ByteBuffer responseBuffer = ByteBuffer.allocate(timeStr.length());
                    responseBuffer.put(timeStr.getBytes());
                    responseBuffer.flip();
                    socketChannel.write(responseBuffer);
             }
          } catch (Exception e) {
             throw new RuntimeException(e);
          }
       }
    
}

 

public class TimeClient {

    // 连接超时时间
    static int connectTimeOut = 3000;
    static ByteBuffer buffer = ByteBuffer.allocateDirect(1024);

    public static void main(String[] args) throws IOException, InterruptedException {
        SocketChannel socketChannel = SocketChannel.open(new InetSocketAddress(8080));
        socketChannel.configureBlocking(false);
        long start = System.currentTimeMillis();
        while (!socketChannel.finishConnect()) {
            if (System.currentTimeMillis() - start >= connectTimeOut) {
                throw new RuntimeException("尝试建立连接超过3秒");
            }
        }
        // 如果走到这一步,说明连接建立成功
        while (true) {
            buffer.put("GET CURRENT TIME".getBytes());
            buffer.flip();
            socketChannel.write(buffer);
            buffer.clear();
            if (socketChannel.read(buffer) > 0) {
                buffer.flip();
                byte[] response = new byte[buffer.remaining()];
                buffer.get(response);
                System.out.println("reveive response:" + new String(response));
                buffer.clear();
            }
            Thread.sleep(5000);
        }

    }
}

 

上一篇:mariaDB安装配置


下一篇:分布式架构基础(一)远程通信协议