Scoket通信--------这是一个例子,可以在这个例子的基础上进行相应的拓展,核心也是在多线程任务上进行修改
package cn.itcast.bigdata.socket; import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket; public class ServiceServer { public static void main(String[] args) throws Exception { // 创建一个serversocket,绑定到本机的8899端口上
ServerSocket server = new ServerSocket();
server.bind(new InetSocketAddress("localhost", 8899)); // 接受客户端的连接请求;accept是一个阻塞方法,会一直等待,到有客户端请求连接才返回
while (true) {
Socket socket = server.accept();
new Thread(new ServiceServerTask(socket)).start();
}
} }
package cn.itcast.bigdata.socket; import java.io.BufferedReader;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.PrintWriter;
import java.net.Socket; public class ServiceClient { public static void main(String[] args) throws Exception { /*ServiceIterface service = ProxyUtils.getProxy(ServiceIterface.class,"methodA",hostname,port);
Result = service.methodA(parameters);*/ // 向服务器发出请求建立连接
Socket socket = new Socket("localhost", 8899);
// 从socket中获取输入输出流
InputStream inputStream = socket.getInputStream();
OutputStream outputStream = socket.getOutputStream(); PrintWriter pw = new PrintWriter(outputStream);
pw.println("hello");
pw.flush(); BufferedReader br = new BufferedReader(new InputStreamReader(inputStream));
String result = br.readLine();
System.out.println(result); inputStream.close();
outputStream.close();
socket.close(); }
}
package cn.itcast.bigdata.socket; import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.PrintWriter;
import java.net.Socket; public class ServiceServerTask implements Runnable{
Socket socket ;
InputStream in=null;
OutputStream out = null; public ServiceServerTask(Socket socket) {
this.socket = socket;
} //业务逻辑:跟客户端进行数据交互
@Override
public void run() {
try {
//从socket连接中获取到与client之间的网络通信输入输出流
in = socket.getInputStream();
out = socket.getOutputStream(); BufferedReader br = new BufferedReader(new InputStreamReader(in));
//从网络通信输入流中读取客户端发送过来的数据
//注意:socketinputstream的读数据的方法都是阻塞的
String param = br.readLine();// 按理说这里应该改为多行的就是while(true) /**
* 作业:
* 将以下业务调用逻辑写成更加通用的:可以根据客户端发过来的调用类名、调用方法名、调用该参数来灵活调用
*
* 《反射》
*
*/ GetDataServiceImpl getDataServiceImpl = new GetDataServiceImpl();
String result = getDataServiceImpl.getData(param); //将调用结果写到sokect的输出流中,以发送给客户端
PrintWriter pw = new PrintWriter(out);
pw.println(result);
pw.flush(); } catch (IOException e) { e.printStackTrace();
}finally{
try {
in.close();
out.close();
socket.close();
} catch (IOException e) {
e.printStackTrace();
} } } }
package cn.itcast.bigdata.socket; public class GetDataServiceImpl { public String getData(String param){
return "ok-"+param;
}
} PS:这是传统的调用方式,当线程多了以后就不一样了。 NIO 非阻塞IO,是新的 但是不知道怎么使用,现在知道了 NIO就是使用Netty框架,性能高很多
PS : RPC是一种远程调用的协议,有很多的实现,WebService就是一种实现。
-------------------------------------自定义RPC的框架-
PS:1.正常一个应用,蓝线上面是用户需要配置的。配置好怎样交换呢?首先spring启动好后,会扫描注解,然后把javabean放在hashmap结构中。
2.当有客户端有请求过来时,传统是用socket通信,但是这种方式性能会出现问题。现在通常使用Netty来解决这个问题(而不是使用传统的socket通信)。
3.客户端也是注解,他是通过动态代理包装整合向外暴漏接口。然后两者通过socket通信。
---------------
4.如果不应用zookeeper的话,这就是一个完成的sp ring的使用。使用zookeeper以后就可以 远程调用了。(因为注册了服务器的地址和端口)
PS:在实现上,服务器端通过spring自定义自定义注解 获取相应的 service, 然后通过 netty (NIO)进行异步传输,使用zookeeper进行 使用RPC服务器配置。
PS :netty是为了解决传统的阻塞的问题,netty是nio的一种实现。
----------------------------------------------------------------------------
PS:传统的要在内核之间交互
PS :Netty是一个很庞大的体系, 如果精通netty的话,薪水是普通屌丝的好几倍。hadoop、spark都是有用netty
PS:
0.首先从操作系统的底层是支持异步IO的,但是之前的异步IO是C++的多,Java的异步IO少,源生的socket是同步的性能不好,急需异步框架出现
1.传统BIO模式是这样的,来一个请求,创建一个线程;
后来出现了线程池或者使用消息队列来实现一个线程或多个线程处理N个请求的模型,其实底层还是同步IO,称之为“伪异步IO ,这样其实还是会出现问题,如果队列满的话,还是会出现连接超时。只能用NIO出手了。
NIO
PS : 低负载、低并发应该使用同步阻塞; 高并发进行 NIO
1.类库的简介: buffer、channel、selector
2.看了源码以后,发现NIO的源码比较复杂
总结: 因为原声NIO java代码编写起来比较费劲,所以不太鼓励用java原声代码(需要对多线程比较熟悉),
而是推荐使用netty,因为netty就是高并发的解决方案
PS : Netty打包部署也很简单就是一个jar文件,具体代码看下面
PS:
PS :
PS:平常Java网络传输数据的时候比较慢,后来出现了几种框架
PS : 上面是netty的位置
PS :WebSocket(二)-WebSocket、Socket、TCP、HTTP区别
PS:WebSocket他是基于Tcp的新的,比传统的更好
PS : 案例-----看慕课网的培训教程代码
package com.imooc.netty; import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.util.concurrent.GlobalEventExecutor; /**
* 存储整个工程的全局配置
* @author liuyazhuang
*
*/
public class NettyConfig { /**
* 存储每一个客户端接入进来时的channel对象
*/
public static ChannelGroup group = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); }
package com.imooc.netty; import java.util.Date; import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
import io.netty.handler.codec.http.websocketx.PingWebSocketFrame;
import io.netty.handler.codec.http.websocketx.PongWebSocketFrame;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketServerHandshaker;
import io.netty.handler.codec.http.websocketx.WebSocketServerHandshakerFactory;
import io.netty.util.CharsetUtil; /**
* 接收/处理/响应客户端websocket请求的核心业务处理类
* @author liuyazhuang
*
*/
public class MyWebSocketHandler extends SimpleChannelInboundHandler<Object> { private WebSocketServerHandshaker handshaker;
private static final String WEB_SOCKET_URL = "ws://localhost:8888/websocket";
//客户端与服务端创建连接的时候调用
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
NettyConfig.group.add(ctx.channel());//group像是集合类
System.out.println("客户端与服务端连接开启...");
} //客户端与服务端断开连接的时候调用
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
NettyConfig.group.remove(ctx.channel());
System.out.println("客户端与服务端连接关闭...");
} //服务端接收客户端发送过来的数据结束之后调用
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.flush();
} //工程出现异常的时候调用
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
} //服务端处理客户端websocket请求的核心方法
@Override
protected void messageReceived(ChannelHandlerContext context, Object msg) throws Exception {
//处理客户端向服务端发起http握手请求的业务
if (msg instanceof FullHttpRequest) {
handHttpRequest(context, (FullHttpRequest)msg);
}else if (msg instanceof WebSocketFrame) { //处理websocket连接业务
handWebsocketFrame(context, (WebSocketFrame)msg);
}
} /**
* 处理客户端与服务端之前的websocket业务
* @param ctx
* @param frame
*/
private void handWebsocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame){
//判断是否是关闭websocket的指令
if (frame instanceof CloseWebSocketFrame) {
handshaker.close(ctx.channel(), (CloseWebSocketFrame)frame.retain());
}
//判断是否是ping消息
if (frame instanceof PingWebSocketFrame) {
ctx.channel().write(new PongWebSocketFrame(frame.content().retain()));
return;
} //判断是否是二进制消息,如果是二进制消息,抛出异常
if( ! (frame instanceof TextWebSocketFrame) ){
System.out.println("目前我们不支持二进制消息");
throw new RuntimeException("【"+this.getClass().getName()+"】不支持消息");
}
//返回应答消息
//获取客户端向服务端发送的消息
String request = ((TextWebSocketFrame) frame).text();
System.out.println("服务端收到客户端的消息====>>>" + request);
if(request.equals("CXLL")){
request = "正在办理查询流量";
}
TextWebSocketFrame tws = new TextWebSocketFrame(new Date().toString()
+ ctx.channel().id()
+ " ===>>> "
+ request);
//群发,服务端向每个连接上来的客户端 群发 消息
NettyConfig.group.writeAndFlush(tws);//响应到浏览器中的内容!!!!
}
/**
* 处理客户端向服务端发起http握手请求的业务
* @param ctx
* @param req
*/
private void handHttpRequest(ChannelHandlerContext ctx, FullHttpRequest req){
if (!req.getDecoderResult().isSuccess()
|| ! ("websocket".equals(req.headers().get("Upgrade")))) {//不是websocket的请求
sendHttpResponse(ctx, req,
new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.BAD_REQUEST));
return;
}
WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory(
WEB_SOCKET_URL, null, false);
handshaker = wsFactory.newHandshaker(req);
if (handshaker == null) {
WebSocketServerHandshakerFactory.sendUnsupportedWebSocketVersionResponse(ctx.channel());
}else{
handshaker.handshake(ctx.channel(), req);
}
} /**
* 服务端向客户端响应消息
* @param ctx
* @param req
* @param res
*/
private void sendHttpResponse(ChannelHandlerContext ctx, FullHttpRequest req,
DefaultFullHttpResponse res){
if (res.getStatus().code() != 200) {
ByteBuf buf = Unpooled.copiedBuffer(res.getStatus().toString(), CharsetUtil.UTF_8);
res.content().writeBytes(buf);
buf.release();
}
//服务端向客户端发送数据
ChannelFuture f = ctx.channel().writeAndFlush(res);
if (res.getStatus().code() != 200) {
f.addListener(ChannelFutureListener.CLOSE);
}
}
}
package com.imooc.netty; import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.stream.ChunkedWriteHandler; /**
* 初始化连接时候的各个组件
* @author liuyazhuang
*
*/
public class MyWebSocketChannelHandler extends ChannelInitializer<SocketChannel> { @Override
protected void initChannel(SocketChannel e) throws Exception {
e.pipeline().addLast("http-codec", new HttpServerCodec());
e.pipeline().addLast("aggregator", new HttpObjectAggregator(65536));
e.pipeline().addLast("http-chunked", new ChunkedWriteHandler());
e.pipeline().addLast("handler", new MyWebSocketHandler());
} }
package com.imooc.netty; import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel; /**
* 程序的入口,负责启动应用
* @author liuyazhuang
*
*/
public class Main {
public static void main(String[] args) {
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workGroup);
b.channel(NioServerSocketChannel.class);
b.childHandler(new MyWebSocketChannelHandler());
System.out.println("服务端开启等待客户端连接....");
Channel ch = b.bind(8888).sync().channel();
ch.closeFuture().sync(); } catch (Exception e) {
e.printStackTrace();
}finally{
//优雅的退出程序
bossGroup.shutdownGracefully();
workGroup.shutdownGracefully();
}
}
}
<html>
<head>
<meta http-equiv="Content-Type" content="text/html; charset = utf-8"/>
<title>WebSocket客户端</title>
<script type="text/javascript">
var socket;
if(!window.WebSocket){
window.WebSocket = window.MozWebSocket;
} if(window.WebSocket){
socket = new WebSocket("ws://localhost:8888/websocket");
socket.onmessage = function(event){
var ta = document.getElementById('responseContent');
ta.value += event.data + "\r\n";
}; socket.onopen = function(event){
var ta = document.getElementById('responseContent');
ta.value = "你当前的浏览器支持WebSocket,请进行后续操作\r\n";
}; socket.onclose = function(event){
var ta = document.getElementById('responseContent');
ta.value = "";
ta.value = "WebSocket连接已经关闭\r\n";
};
}else{
alert("您的浏览器不支持WebSocket");
} function send(message){
if(!window.WebSocket){
return;
}
if(socket.readyState == WebSocket.OPEN){
socket.send(message);
}else{
alert("WebSocket连接没有建立成功!!");
}
}
</script>
</head>
<body>
<form onSubmit="return false;">
<input type = "text" name = "message" value = ""/>
<br/><br/>
<input type = "button" value = "发送WebSocket请求消息" onClick = "send(this.form.message.value)"/>
<hr color="red"/>
<h2>客户端接收到服务端返回的应答消息</h2>
<textarea id = "responseContent" style = "width:1024px; height:300px"></textarea>
</form>
</body>
</html>
------------------------------------------------------------------------------------------------------
Dubbo底层用的就是Netty
3.2. netty的helloworld
3.2.1. 下载netty包
• 下载netty包,下载地址http://netty.io/
3.2.2. 服务端启动类
package com.netty.demo.server; import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel; /**
* • 配置服务器功能,如线程、端口 • 实现服务器处理程序,它包含业务逻辑,决定当有一个请求连接或接收数据时该做什么
*
* @author wilson
*
*/
public class EchoServer { private final int port; public EchoServer(int port) {
this.port = port;
} public void start() throws Exception {
EventLoopGroup eventLoopGroup = null;
try {
//创建ServerBootstrap实例来引导绑定和启动服务器
ServerBootstrap serverBootstrap = new ServerBootstrap();
//创建NioEventLoopGroup对象来处理事件,如接受新连接、接收数据、写数据等等
eventLoopGroup = new NioEventLoopGroup();
//指定通道类型为NioServerSocketChannel,设置InetSocketAddress让服务器监听某个端口已等待客户端连接。
serverBootstrap.group(eventLoopGroup).channel(NioServerSocketChannel.class).localAddress("localhost",port).childHandler(new ChannelInitializer<Channel>() {
//设置childHandler执行所有的连接请求
@Override
protected void initChannel(Channel ch) throws Exception {
ch.pipeline().addLast(new EchoServerHandler());
}
});
// 最后绑定服务器等待直到绑定完成,调用sync()方法会阻塞直到服务器完成绑定,然后服务器等待通道关闭,因为使用sync(),所以关闭操作也会被阻塞。
ChannelFuture channelFuture = serverBootstrap.bind().sync();//主要用于异步操作通知回调
System.out.println("开始监听,端口为:" + channelFuture.channel().localAddress());
channelFuture.channel().closeFuture().sync();//方法柱塞,等待服务端链路关闭后,Main函数才退出
} finally {
eventLoopGroup.shutdownGracefully().sync(); //优雅的退出,释放所有的资源
}
} public static void main(String[] args) throws Exception {
new EchoServer(20000).start();
}
}
3.2.3. 服务端回调方法
package com.netty.demo.server; import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter; import java.util.Date; public class EchoServerHandler extends ChannelInboundHandlerAdapter {// channelInboundHandlerAdapter 对于网络事件进行读写操作 @Override
public void channelRead(ChannelHandlerContext ctx, Object msg)
throws Exception {
System.out.println("server 读取数据……");
//读取数据
ByteBuf buf = (ByteBuf) msg;
byte[] req = new byte[buf.readableBytes()];
buf.readBytes(req);
String body = new String(req, "UTF-8");
System.out.println("接收客户端数据:" + body);
//向客户端写数据
System.out.println("server向client发送数据");
String currentTime = new Date(System.currentTimeMillis()).toString();
ByteBuf resp = Unpooled.copiedBuffer(currentTime.getBytes());
ctx.write(resp);
} @Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
System.out.println("server 读取数据完毕..");
ctx.flush();//刷新后才将数据发出到SocketChannel
} @Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
throws Exception {
cause.printStackTrace();
ctx.close();
} }
3.2.4. 客户端启动类
package com.netty.demo.client; import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel; import java.net.InetSocketAddress; /**
* • 连接服务器 • 写数据到服务器 • 等待接受服务器返回相同的数据 • 关闭连接
*
* @author wilson
*
*/
public class EchoClient { private final String host;
private final int port; public EchoClient(String host, int port) {
this.host = host;
this.port = port;
} public void start() throws Exception {
EventLoopGroup nioEventLoopGroup = null;
try {
//创建Bootstrap对象用来引导启动客户端
Bootstrap bootstrap = new Bootstrap();
//创建EventLoopGroup对象并设置到Bootstrap中,EventLoopGroup可以理解为是一个线程池,这个线程池用来处理连接、接受数据、发送数据;专门用来网络数据的处理
nioEventLoopGroup = new NioEventLoopGroup();
//创建InetSocketAddress并设置到Bootstrap中,InetSocketAddress是指定连接的服务器地址
bootstrap.group(nioEventLoopGroup).channel(NioSocketChannel.class).remoteAddress(new InetSocketAddress(host, port))
.handler(new ChannelInitializer<SocketChannel>() {
//添加一个ChannelHandler,客户端成功连接服务器后就会被执行
@Override
protected void initChannel(SocketChannel ch)
throws Exception {
ch.pipeline().addLast(new EchoClientHandler());
}
});
// • 调用Bootstrap.connect()来连接服务器
ChannelFuture f = bootstrap.connect().sync();
// • 最后关闭EventLoopGroup来释放资源
f.channel().closeFuture().sync();
} finally {
nioEventLoopGroup.shutdownGracefully().sync();
}
} public static void main(String[] args) throws Exception {
new EchoClient("localhost", 20000).start();
}
}
3.2.5. 客户端回调方法
package com.netty.demo.client; import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler; public class EchoClientHandler extends SimpleChannelInboundHandler<ByteBuf> {
//客户端连接服务器后被调用
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("客户端连接服务器,开始发送数据……");
byte[] req = "QUERY TIME ORDER".getBytes();
ByteBuf firstMessage = Unpooled.buffer(req.length);
firstMessage.writeBytes(req);
ctx.writeAndFlush(firstMessage);
}
//• 从服务器接收到数据后调用
@Override
protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
System.out.println("client 读取server数据..");
//服务端返回消息后
ByteBuf buf = (ByteBuf) msg;
byte[] req = new byte[buf.readableBytes()];
buf.readBytes(req);
String body = new String(req, "UTF-8");
System.out.println("服务端数据为 :" + body);
}
//• 发生异常时被调用
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.out.println("client exceptionCaught..");
// 释放资源
ctx.close();
}
}
PS:谈谈个人的理解
图上面是传统的IO交互方式,会出现io阻塞。然后又下图NIO,我也看不太明白具体是怎样的交互方式,但是在netty引入的地方我知道和传统IO流差不多,感觉更方便一些。
速度应该也更快速一些。 后面的框架都是根据这个完成的。 还有就是很多hadoop的框架并不难,都是业务逻辑。
---------------------------------
PS : 这些实现是为了 讲解其实就是dubbo的简单实现,