我们将会把重点放在一个无连接协议即用户数据报协议(UDP)上,它通常用在性能至关重要并且能够容忍一定的数据报丢失的情况下。
面向连接的传输(如TCP)管理了两个网络端点之间的连接的建立,在连接的生命周期内的有序和可靠的消息传输,以及最后,连接的有序终止。相比之下,在类似于UDP这样的无连接协议中,并没有持久化连接这样的概念,并且每个消息(一个UDP数据报)都是一个单独的传输单元。
此外,UDP也没有TCP的纠错机制,其中每个节点都将确认它们所接收到的包,而没有被确认的包将会被发送方重新传输。
通过类比,TCP连接就像打电话,其中一系列的有序消息将会在两个方法上流动,相反,UDP则类似于往邮箱中投入一叠明信片。你无法知道它们将以何种顺序到达它们的目的地,或者它们是否所有的都能够到达它们的目的地。
UDP的这些方面可能会让你感觉到严重的局限性,但是它们也解释了为何它会比TCP快那么多:所有的握手以及消息管理机制的开销已经被消除了。显然,UDP很适合那些能够处理或者容忍消息丢失的应用程序,但可能不适合那些处理金融交易的应用程序。
2、UDP广播
到目前为止,我们所有的例子采用的都是一种叫做单播的传输模式,定义为发送消息给一个由唯一的地址所标识的单一的网络目的地。面向连接的协议和无连接协议都支持这种模式。
UDP提供了向多个接收者发送消息的额外传输模式:
多播——传输到一个预定义的主机组
广播——传输到网络(或者子网)上的所有主机
示例应用程序将通过发送能够被同一个网络中的所有主机所接收的消息来演示UDP广播的使用。为此,我们将使用特殊的受限广播地址或者零网络地址255.255.255.255.发送到这个地址的消息都将会被定向给本地网络(0.0.0.0)上的所有主机,而不会被路由器转发给其他的网络。
3、UDP示例应用程序
我们的示例程序将打开一个文件,随后将会通过UDP把每一行都作为一个消息广播到一个指定的端口。如果你熟悉类UNIX操作系统,你可能会认识到这是标准的syslog实用程序的一个非常简化的版本。UDP非常适合于这样的应用程序,因为考虑到日志文件本身已经被存储在了文件系统中,因此,偶尔丢失日志文件中的一两行是可以容忍的。此外,该应用程序还提供了极具有价值的高效处理大量数据的能力。
接收方是怎么样呢?通过UDP广播,只需简单地通过在指定的端口上启动一个监听程序,便可以创建一个事件监视器来接收日志消息。需要注意的是,这样的轻松访问性也带来了潜在的安全隐患,这也就是为何在不安全的环境中并不倾向于使用UDP广播的原因之一。出于同样的原因,路由器通常也会阻止广播消息,并将它们限制在它们的来源网络上。
发布/订阅模式 : 类似于syslog这样的应用程序通常会被归类为发布/订阅模式:一个生产者或者服务发布事件,而多个客户端进行订阅以接收它们。
下图展示了整个系统的一个高级别试图,其由一个广播者以及一个或者多个事件监听器所组成。广播者将监听新内容的出现,当它出现时,则通过UDP将它作为一个广播消息进行传输。
所有的该UDP端口上监听的事件监听器都将会接收到广播消息。
为了简单起见,我们将不会为我们的示例程序添加身份认证、验证或者加密。但是,要加入这些功能并使得其成为一个健壮的、可用的实用程序应该也不难。
4、消息POJO:LogEvent
在消息处理应用程序中,数据通常由POJO表示,除了实际上的消息内容,其还包含配置或处理消息。在这个应用程序中,我们将会把消息作为事件处理,并且由于该数据来自于日志文件,所以我们将它称为LogEvent。以下代码展示了这个简单的POJO的详细信息。
public final class LogEvent { public static final byte SEPARATOR = (byte) ‘:‘; private final InetSocketAddress source; private final String logfile; private final String msg; private final long received; //用于传出消息的构造函数
public LogEvent(String logfile, String msg) { this(null,-1,logfile,msg);
} //用于传入消息的构造函数
public LogEvent(InetSocketAddress source,long received, String logfile, String msg) { this.source = source; this.logfile = logfile; this.msg = msg; this.received = received;
} public InetSocketAddress getSource() { return source;
} public String getLogfile() { return logfile;
} public String getMsg() { return msg;
} public long getReceived() { return received;
}
}
5、编写广播者
Netty提供了大量的类来支持UDP应用程序的编写。Netty的DatagramPacket是一个简单的消息容器,DatagramChannel实现用它来和远程节点通信。类似于在我们先前的类比中的明信片,它包含了接收者(和可选的发送者)的地址以及消息的有效负载本身。
要将LogEvent消息转换为DatagramPacket,我们将需要一个编码器。但是没有必要从头开始编写我们自己的。我们将扩展Netty的MessageToMessageEncoder。
下图展示了正在广播的3个日志条目,每一个都将通过一个专门的DatagramPacket进行广播。
下图呈现了该LogEventBroadcaster的ChannelPipeline的一个高级别试图,展示了LogEvent消息是如何流经它的。
正如你所看到的,所有的将要被传输的数据都被封装在了LogEvent消息中。LogEventBroadcaster将把这些写入到Channel中,并通过ChannelPipeline发送它们,在那里它们将会被转码为DatagramPacket消息。最后,它们都将通过UDP被广播,并由远程节点所捕获。
以下代码展示了我们自定义版本的MessageToMessageEncoder,其将执行刚才所描述的转换。
public class LogEventEncoder extends MessageToMessageEncoder<LogEvent>{
private final InetSocketAddress remoteAddress; //LogEventEncoder创建了即将被发送到指定的InetSocketAddress的DatagramPacket消息
public LogEventEncoder(InetSocketAddress remoteAddress){ this.remoteAddress = remoteAddress;
} @Override
protected void encode(ChannelHandlerContext channelHandlerContext, LogEvent logEvent, List<Object> out) throws Exception {
byte[] file = logEvent.getLogfile().getBytes(CharsetUtil.UTF_8);
byte[] msg = logEvent.getMsg().getBytes(CharsetUtil.UTF_8); ByteBuf buf = channelHandlerContext.alloc().buffer(file.length + msg.length + 1); //将文件名写入到ByteBuf中
buf.writeBytes(file); //添加一个SEPARATOR
buf.writeByte(LogEvent.SEPARATOR); //将日志消息写入ByteBuf中
buf.writeBytes(msg); //将一个拥有数据和目的地地址的新DatagramPacket添加到出站的消息列表中
out.add(new io.netty.channel.socket.DatagramPacket(buf,remoteAddress));
}
}
在LogEventEncoder被实现之后,我们已经准备好了引导该服务器,其包括设置各种各样的ChannelOption,以及在ChannelPipeline中安装所需要的ChannelHandler。这将通过主类LogEventBroadcaster完成。如下代码所示。
public class LogEventBroadcaster { private final EventLoopGroup group; private final Bootstrap bootstrap; private final File file; public LogEventBroadcaster(InetSocketAddress address, File file){ group = new NioEventLoopGroup();
bootstrap = new Bootstrap(); //引导该NioDatagramChannel(无连接)
bootstrap.group(group).channel(NioDatagramChannel.class) //设置SO_BROADCAST套接字选项
.option(ChannelOption.SO_BROADCAST,true)
.handler(new LogEventEncoder(address)); this.file = file;
} public void run() throws Exception{ //绑定Channel
Channel ch = bootstrap.bind(0).sync().channel(); long pointer = 0; //启动主处理循环
for (;;){ long len = file.length(); if (len < pointer){ //file was reset
//如果有必要,将文件指针设置到该文件的最后一个字符
pointer = len;
} else if (len > pointer){ //Content was added
RandomAccessFile raf = new RandomAccessFile(file,"r"); //设置当前的文件指针,以确保没有任何的旧日志被发送
raf.seek(pointer);
String line; while((line = raf.readLine()) != null){ //对于每条日志条目。,写入一个LogEvent到Channel中
ch.writeAndFlush(new LogEvent(null,-1,file.getAbsolutePath(),line));
} //存储其在文件中的当前位置
pointer = raf.getFilePointer();
raf.close();
} try { //休眠1秒,如果被中断,则退出循环,否则重新处理它
Thread.sleep(1000);
}catch (InterruptedException e){
Thread.interrupted(); break;
}
}
} public void stop(){ group.shutdownGracefully();
} public static void main(String[] args) throws Exception{ if (args.length != 2){ throw new IllegalArgumentException();
}
LogEventBroadcaster broadcaster = new LogEventBroadcaster( new InetSocketAddress("255.255.255.255",Integer.parseInt(args[0])),new File(args[1])); try {
broadcaster.run();
}finally {
broadcaster.stop();
}
}
}
6、编写监视器
目标是将netcat替换为一个更加完整的事件消费者,我们称之为LogEventMonitor。这个程序将:
(1)接收有LogEventBroadcaster广播的UDP DatagramPacket
(2)将它们解码为LogEvent消息
(3)将LogEvent消息写到System.out
和之前一样,该逻辑由一组自定义的ChannelHandler实现——对于我们的解码器来说,我们将扩展MessageToMessageDecoder。下图描绘LogEventMonitor的ChannelPipeline,并且展示了LogEvnet是如何流经它的。
ChannelPipeline中的第一个解码器LogEventDecoder负责传入的DatagramPacket解码为LogEvent消息(一个用于转换入站数据的任何Netty应用程序的典型设置)
public class LogEventDecoder extends MessageToMessageDecoder<DatagramPacket>{
@Override
protected void decode(ChannelHandlerContext channelHandlerContext, DatagramPacket datagramPacket, List<Object> out) throws Exception { //获取对DatagramPacket中的数据的引用
ByteBuf data = datagramPacket.content(); //获取该SEPARATOR的索引
int idx = data.indexOf(0,data.readableBytes(),LogEvent.SEPARATOR); //提取文件名
String fileName = data.slice(0,idx).toString(CharsetUtil.UTF_8); //提取日志消息
String logMsg = data.slice(idx + 1,data.readableBytes()).toString(CharsetUtil.UTF_8); //构建一个新的LogEvent对象,并且将它添加到列表中
LogEvent event = new LogEvent(datagramPacket.sender(),System.currentTimeMillis(),fileName,logMsg);
out.add(event);
}
}
第二个ChannelHandler的工作是对第一个ChannelHandler所创建的LogEvent消息执行一些处理。在这个场景下,它只是简单地将它们写到System.out。在真实世界的应用程序中,你可能需要聚合来源于不同日志文件的事件,或者将它们发布到数据库中。
public class LogEventHandler extends SimpleChannelInboundHandler<LogEvent>{
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { //当异常发生时,打印栈跟踪信息,并关闭对应的Channel
cause.printStackTrace();
ctx.close();
} @Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, LogEvent event) throws Exception { //创建StringBuilder,并且构建输出的字符串
StringBuilder builder = new StringBuilder();
builder.append(event.getReceived());
builder.append(" [");
builder.append(event.getSource().toString());
builder.append("] [");
builder.append(event.getLogfile());
builder.append("] : ");
builder.append(event.getMsg()); //打印LogEvent的数据
System.out.println(builder.toString());
}
}
LogEventHandler将以一种简单易读的格式打印LogEvent消息,现在我们需要将我们的LogEventDecoder和LogEventHandler安装到ChannelPipeline中。
public class LogEventMonitor { private final EventLoopGroup group; private final Bootstrap bootstrap; public LogEventMonitor(InetSocketAddress address){ group = new NioEventLoopGroup();
bootstrap = new Bootstrap();
bootstrap.group(group)
.channel(NioDatagramChannel.class)
.option(ChannelOption.SO_BROADCAST,true)
.handler(new ChannelInitializer<Channel>() {
@Override protected void initChannel(Channel channel) throws Exception {
ChannelPipeline pipeline = channel.pipeline();
pipeline.addLast(new LogEventDecoder());
pipeline.addLast(new LogEventHandler());
}
}).localAddress(address);
} public Channel bind(){ return bootstrap.bind().syncUninterruptibly().channel();
} public void stop(){ group.shutdownGracefully();
} public static void main(String[] args) throws Exception{ if (args.length != 1){ throw new IllegalArgumentException("Usage:LoEventMonitor <port>");
}
LogEventMonitor monitor = new LogEventMonitor(new InetSocketAddress(Integer.parseInt(args[0]))); try {
Channel channel = monitor.bind();
System.out.println("LogEventMonitor running");
channel.closeFuture().sync();
}finally {
monitor.stop();
}
}