分布式架构基础(一)远程通信协议

远程通信协议

一个http请求的整个流程

负责域名解析的DNS服务

首先,用户访问一个域名,会经过DNS解析。
DNS(Domain Name System),和HTTP协议一样是位于应用层的协议,主要提供域名到IP的解析服务。分布式架构基础(一)远程通信协议

加速静态内容速度的CDN

CDN(Content Delivery Network)表示的是内容分发网络。是一种网络缓存技术,能够把一些相对稳定的资源放到距离最终用户较近的地方。一方面可以节省整个广域网的带宽消耗,另一方面可以提升用户的访问速度,改进用户体验。

HTTP协议通信原理

域名被成功解析后,客户端和服务端之间建立通信的方式有TCP和UDP两种通信协议,以及建立连接的握手过程。HTTP协议的通信是基于TCP/IP协议之上的一个应用层协议。(应用层协议除了HTTP还有FTP、DNS、SMTP、TeInet等)
涉及到网络协议,我们一般知道OSI七层网络模型和TCP/IP四层概念模型。
OSI七层网络模型包括:应用层、表示层、会话层、传输层、网络层、数据链路层、物理层;
TCP/IP四层概念模型包括:应用层、传输层、网络层、数据链路层;

请求发起过程,在TCP/IP四层网络模型中所做的事情

当应用程序用TCP传送数据是,数据被送人协议栈中,然后逐个通过每一层知道被当做一串比特流送入网络。其中每一层对接受的数据都要增加一些首部信息(有时候要增加尾部信息)分布式架构基础(一)远程通信协议

客户端如何找到目标服务

在客户端发起请求的时候,我们会在数据链路层去组装目标机器的MAC地址。(目标机器的MAC是通过ARP协议获取:这个协议简单的说就是已知目标机器的ip,需要获得目标机器的MAC地址。发送一个广播消息,这个ip是谁的,请来认领。认领ip的机器会发送一个mac地址的响应)
有了这个目标MAC地址,数据包在链路上广播,MAC的网卡才能发现,这个包是给它的。MAC的网卡把包收进来,然后打开IP包,发现IP地址是自己的,再打开TCP包,发现端口是自己的也就是80端口,而这个时候机器上有一个nginx是监听80端口。于是将请求提交给nginx,nginx返回一个网页。然后将网页需要发回请求的机器,然后层层封装,最后到MAC层。因为来的时候有源MAC地址,返回的时候源MAC就变成了目标MAC再返给请求的机器。
为了避免每次ARP请求,机器本地会进行ARP缓存,当然机器会不断地上下线,IP也可能会变化,所以ARP的MAC地址缓存过一段时间就会过期。

接收端收到数据包以后的处理过程

当目的主机收到一个以太网数据帧时,数据就开始从协议栈中由底向上升,同事去掉各层协议加上的报文首部。每层协议都要去检查报文肘部中的协议标识,以确定接受数据的上层协议。分布式架构基础(一)远程通信协议

Java通过socket实现远程通信

TCP/IP、UDP/IP、Socket

分布式架构基础(一)远程通信协议
TCP/IP(Transmission Control Protocol/Internet Protocol)即传输控制协议/网间协议,是一个工业标准的协议集,它是为广域网(WANs)设计的。
UDP(User Data Protocol,用户数据报协议)是与TCP相对应的协议。它是属于TCP/IP协议族中的一种。
TCP/IP协议族包括运输层、网络层、链路层。
Socket是应用层与TCP/IP协议族通信的中间软件抽象层,它是一组接口。在设计模式中,Socket其实就是一个门面模式,它把复杂的TCP/IP协议族隐藏在Socket接口后面,对用户来说,一组简单的接口就是全部,让Socket去组织数据,以符合指定的协议。
Socket的使用:建立心跳机制的时候通过Socket进行通信。
分布式架构基础(一)远程通信协议

BIO SocketDemo

服务端:

public static void main(String[] args) {

	// 建立监听
	SocketService socketService = new SorcketService(8090);
	// 连接
	Socket socket = socketService.accept();
	// 从socket中获取输入流
	BufferRead bufferRead = new BufferRead(new InputStreamRead(socket.getInputSteam()));
	String strline = bufferRead.readLine();

	//将返回给客户端的信息写入输出流
	BufferWrite bufferWrite = new BuffferWrite(new OutputStreamWrite(socket.getOutputStream()));
	bufferWrite.write("Hello, this is Socket Service. \n")
	bufferWrite.flush();
}
客户端:
public static void main(String[] args) {
	
	//与服务器建立连接
	Socket socket = new Socket("localhost",8090);

	//向服务器发送消息
	BufferWriter bufferWriter = new BufferWriter(new OutputStreamWriter(socket.getOutputStream()));
	bufferWriter.writer("Hello, this is Socket Client /n");
	bufferWriter.flush();

	//接受服务器返回的消息
	BufferReader bufferReader = new BufferReader(new InputStream(socket.getInputStream()));
	bufferReader.readline();
}

注意:

  • accept():函数是连接阻塞的
    当socket的服务端启动的时候,如果客户端没有启动,那么服务端会进行阻塞,知道有客户端连接服务端为止。

  • Connection reset 异常
    java.net.SocketException: Connection reset //连接重置,出现该异常的原因:在进行数据传输的时候,没有在需要传输的数据末尾加上结束识别符,造成的结果就是一端已经写完并且关闭了流,而另一端一直在监听,直到流关闭。

  • 代码中使用流的时候,必须关闭,否则很容易造成内存泄露

BIO SocketDemo2

在SocketDemo中,存在连接阻塞(accept())和IO阻塞(bufferRead.readLine())
通过线程池对IO阻塞进行优化,在Tomcat 7 之前IO模型使用线程池进行优化,增加吞吐量。但是依然存在瓶颈问题:线程池的线程数量。

服务端:

public static void main (String[] args)  {

	//建立线程池
	static ExecutorService executorService = Executors.newFixedThrealTool(20);
	
	// 建立监听
	SocketService socketService = new SocketService(8090);
	// 监听服务端
	while(true) {
		Socket socket = socketService.accept();
		// 使用线程池处理IO阻塞
		executorService.execute(new SocketThread(socket));
	}
}


public class SocketThread implements Runnable {
	
	private Socket socket;

	public SocketThread (Socket socket) {
		this.socket = socket;
	}

	@Override
	public void run(){
		
		BufferReader bufferReader = new BufferReader(new InputStreamReader(socket.getInputStream()));
		String s  = bufferReader.readline();

		BufferWriter bufferWriter = new BufferWriter(new OutputStreamWriter(socket.getOutputStream()));
		bufferWriter.writer("Hello,this is Socket Service! \n");
		bufferWriter.flush();
	}
}

NIO SocketDemo3

Socket通信中存在两处阻塞,第一个是连接阻塞,即accept(),第二个是IO阻塞,即readline();如果要使用NIO技术,只需要把这两处修改即可。

NIO的核心:

  • Channel(管道)
  • Buffer(缓存区):NIO中所有的通信都是面向缓存区的
  • Selector(选择器,多路复用器)
服务端

main(){
    try{
        ServiceSocketChannel serviceSocketChannel = ServiceSocketChannel.open();
        
        //设置连接为非阻塞,改配置的参数默认为true,即默认连接阻塞
        serviceSocketChannel.configBlocking(false);
        
        serviceSocketChannel.bind(new InetSocketAddress(8080)); //绑定监听端口
        
        //持续监听
        while(true) {
            SocketChannel socketChannel = serviceSocketChannel.accept();
            
            if (socketChannel != null) {
                ByteBuffer byteBuffer = ByteBuffer.allocate(1024); //NIO的所有的通信都是面向缓存区的,分配缓存区
                socketChannel.read(byteBuffer); //将数据加载到缓存区
                sout(byteBuffer.array()); //从缓存区中输出
                
                //写出数据
                byteBuffer.filp();//buffer反转
                socketChannel.writed(byteBuffer);
                
            } else {
                // 连接时非阻塞的,如果没有监听到连接,socketChannel为空
                Thread.Sleep(1000);
                sout("连接未就绪");
            }
        }
        
    } catch(Execption e){
        
    }
}
客户端
    
main(){
    //创建连接
    SocketChannel socketChannel = SocketChannel.open();
    
    socketChannel.configBlocking(false);
    socketChannel.connect(new InetSocketAddress("localhost",8080));
    
    ByteBuffer byteBuffer = ByteBuffer.allocate(1024); //NIO的所有的通信都是面向缓存区的,分配缓存区
    byteBuffer.put("Hello I`m socketChannel client".getBytes());
    byteBuffer.flip();
    
    socketChannel.write(byteBuffer);
    
    //读取数据
    byteBuffer.clear();
    int i = socketChannel.read(byteBuffer);
    if (i > 0) {
        sout("收到了服务端数据:" + new String(byteBuffer.array()));
    } else {
        sout("没有收到数据")
    }
    
}

注意:

  • SocketServiceChannel、SocketChannel默认是阻塞的
    如果需要配置成非阻塞的:

    socketServiceChannel.configBlocking(false);
    
    socketChannel.configBlocking(false);
    
  • NotYetConnectedException异常
    Exception in thread “main” java.nio.channels.NotYetConnectedException
    客户端出现该异常是因为,客户端还没有与服务端建立连接,将其修改为:

    SocketChannel socketChannel = SocketChannel.open();
    
    socketChannel.configBlocking(false);
    socketChannel.connect(new InetSocketAddress("localhost",8080));
    
    //如果客户端与服务端还未建立连接,等待1s后再次连接判断是否连接
    while(!socketChannel.finishConnect()) {
      wait(1000);
    }
    
    //或者
    if (socketChannel.isConnectionPending) {
        socketChannel.finishConnect();
    }
    

    NIO通信中的connect()方法和finishConnect()方法

    • public boolean connect(SocketAddress remote) throws IOException
      使底层Socket建立远程连接,当SocketChannel处于非阻塞模式时,如果立即连接成功,该方法返回 true,如果不能立即连接成功,该方法返回false,程序过会必须通过调用finishConnect()方法完成连接。 当SocketChannel处于阻塞模式,如果立即连接成功,该方法返回true,如果不能立即连接成功,将进入 阻塞状态,直到连接成功,或者出现IO异常。

    • public boolean finishConnect() throws IOException
      试图完成连接远程服务器的操作。在非阻塞模式下,建立连接从调用SocketChannel的connect方法开始,到调用finishConnect()方法结束。如果finishConnect()方法顺利完成连接,或者在调用此方法之前连接已经建立,则finishConnect()方法立即返回true,如果连接操作还没有完成,则立即返回false;如果连接操作中遇到异常而失败,则抛出响应的IO异常。
      在阻塞模式下,如果连接操作还没有完成,则会进入阻塞状态,直到连接成功。

  • 远程主机强迫关闭了一个现有的连接
    java.io.IOException: 远程主机强迫关闭了一个现有的连接
    客户端在运行完后关闭服务,同时会关闭通道channel。
    而channel没有通过协议交互调用close方法。因此服务端会抛出异常

NIO SocketDemo4

服务端:
public class NIOSocketService{
	
	// 选择器
	private Selector selector;

	public static void main(String[] args) {
		
		//打开选择器
		selector = Selector.open();

		//建立监听
		ServiceSocketChannel serviceSocketChannel = ServiceSocketChannel.open();
		//将监听设置为非阻塞
		serviceSocketChannel.configBlocking(false);
		//设置监听端口
		serviceSocketChannel.socket().bind(new InterSocketAddress(8090));
		//将连接事件注册到选择器上
		serviceSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
		while(true) {
			selector.select();	//唯一阻塞的地方
			 Set<SelectionKey> selectionKeySet=selector.selectedKeys();
			 Iterator<SelectionKey> iterable=selectionKeySet.iterator();
			 while(iterable.haxNext()) {
			 	SelectionKey key = iterable.next();
			 	iterable.remove();
			 	if (key.isAcceptable())  { //连接事件
					 handleAccept(key);
				} else if (key.isReadable()) { //读的就绪事件
					handleRead(key);
				}
			 }
		}
	}

	/**
	* 监听连接事件
	**/
	private static void handleAccept(SelectionKey selectionKey){
		 
		ServerSocketChannel serverSocketChannel=(ServerSocketChannel) selectionKey.channel();
		try {
            SocketChannel socketChannel=serverSocketChannel.accept() ;//一定会有一个连接
            socketChannel.configureBlocking(false);
            socketChannel.write(ByteBuffer.wrap("Hello Client,I'm NIO Server".getBytes()));
            socketChannel.register(selector,SelectionKey.OP_READ); //注册读事件
        } catch (IOException e) {
            e.printStackTrace();
        }
	}

	/**
	* 监听读事件
	**/
	private static void handleRead(SelectionKey selectionKey){
        SocketChannel socketChannel=(SocketChannel)selectionKey.channel();
        ByteBuffer byteBuffer=ByteBuffer.allocate(1024);
        try {
            socketChannel.read(byteBuffer); //这里是不是一定有值
            System.out.println("server receive msg:"+new String(byteBuffer.array()));
        } catch (IOException e) {
            e.printStackTrace();
        }

    }
}
客户端
public class NewIOClient {

    static Selector selector;

    public static void main(String[] args) {
        try {
            selector=Selector.open();
            SocketChannel socketChannel=SocketChannel.open();
            socketChannel.configureBlocking(false);
            socketChannel.connect(new InetSocketAddress("localhost",8090));
            socketChannel.register(selector, SelectionKey.OP_CONNECT); //连接事件
            while(true){
                selector.select();
                Set<SelectionKey> selectionKeySet=selector.selectedKeys();
                Iterator<SelectionKey> iterator=selectionKeySet.iterator();
                while(iterator.hasNext()){
                    SelectionKey key=iterator.next();
                    iterator.remove();
                    if(key.isConnectable()){ //连接事件
                        handleConnect(key);
                    }else if(key.isReadable()){
                        handleRead(key);
                    }
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
    private static void handleConnect(SelectionKey selectionKey) throws IOException {
        SocketChannel socketChannel=(SocketChannel)selectionKey.channel();
        if(socketChannel.isConnectionPending()){
            //
            socketChannel.finishConnect();
        }
        socketChannel.configureBlocking(false);
        socketChannel.write(ByteBuffer.wrap("Hello Server,I'm NIo Client".getBytes()));
        socketChannel.register(selector,SelectionKey.OP_READ); //
    }

    private static void handleRead(SelectionKey selectionKey) throws IOException {
        SocketChannel socketChannel=(SocketChannel)selectionKey.channel();
        //TODO
        ByteBuffer byteBuffer=ByteBuffer.allocate(1024);
        socketChannel.read(byteBuffer);
        System.out.println("client receive msg:"+new String(byteBuffer.array()));

    }
}

服务端采用轮询的机制 “while(true)” 会造成性能消耗,因此引入多路复用的机制优化性能消耗问题。
NIO 是利用了单线程轮询事件的机制,通过高效地定位就绪的 Channel,来决定做什么,仅仅 select 阶段是阻塞的,可以有效避免大量客户端连接时,频繁线程切换带来的问题.

上一篇:Channel


下一篇:线程的Java开销.我应该使用套接字还是套接字通道?