NIO 源码分析(02-2) BIO 源码分析 Socket
Netty 系列目录(https://www.cnblogs.com/binarylei/p/10117436.html)
在上一篇文章中详细分析了 ServerSocket 的源码,Socket 和 ServerSocket 一样也只是一个门面模式,真正的实现也是 SocksSocketImpl,所以关于 setImpl、createImpl、new、bind、listen 都是类似的,本文重点关注其 connect 和 IO 流的读取方法。
一、BIO 最简使用姿势
// 1. Connect to the server. try-with-resources closes the socket when the block exits,
//    so the connection is released even if sending or reading fails.
try (Socket socket = new Socket()) {
    // A timeout of 0 blocks until the connection is established.
    socket.connect(new InetSocketAddress(HOST, PORT), 0);
    BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
    // The second argument enables auto-flush, so println() pushes the data out immediately.
    PrintWriter out = new PrintWriter(socket.getOutputStream(), true);
    // 2. Send the request data.
    out.println("客户端发送请求数据...");
    // 3. Receive the data returned by the server (blocks until a full line arrives).
    String response = in.readLine();
    System.out.println("Client: " + response);
}
ok,代码已经完成!!!下面的源码分析都会基于这个 demo。
二、connect 方法
2.1 Socket.connect 方法
/**
 * Connects this socket to {@code endpoint}.
 *
 * <p>A {@code timeout} of 0 blocks indefinitely; a value greater than 0 is
 * used as the connect timeout.
 *
 * @param endpoint the remote address; cast to {@link InetSocketAddress}
 * @param timeout  the connect timeout, 0 meaning "wait forever"
 * @throws IOException if the underlying implementation fails to connect
 * @throws UnsupportedOperationException if an old-style impl is asked to
 *         connect with a non-zero timeout
 */
public void connect(SocketAddress endpoint, int timeout) throws IOException {
    InetSocketAddress epoint = (InetSocketAddress) endpoint;
    InetAddress addr = epoint.getAddress();
    int port = epoint.getPort();

    // 1. Lazily create the underlying socket.
    if (!created) {
        createImpl(true);
    }

    // 2. oldImpl is false by default, so the first branch is the normal path.
    //    checkOldImpl sets oldImpl depending on whether the impl declares
    //    connect(SocketAddress address, int port).
    if (!oldImpl) {
        impl.connect(epoint, timeout);
    } else if (timeout != 0) {
        // Old-style impls have no timeout-aware connect.
        throw new UnsupportedOperationException("SocketImpl.connect(addr, timeout)");
    } else if (epoint.isUnresolved()) {
        // NOTE(review): addr comes from epoint.getAddress(); confirm it cannot be
        // null on this unresolved path before it is dereferenced.
        impl.connect(addr.getHostName(), port);
    } else {
        impl.connect(addr, port);
    }

    connected = true;
    bound = true;
}
总结: Socket 首先和 ServerSocket 一样调用 createImpl 创建底层 socket 对象,然后委托给 impl 完成连接操作
2.2 AbstractPlainSocketImpl.connect 方法
/**
 * Records the remote port and address from {@code address} and connects to it.
 * If the attempt does not complete, for any reason, this impl is closed.
 *
 * @param address the remote endpoint; cast to {@link InetSocketAddress}
 * @param timeout the connect timeout, forwarded to {@code connectToAddress}
 * @throws IOException if the connection attempt fails
 */
protected void connect(SocketAddress address, int timeout) throws IOException {
    boolean succeeded = false;
    try {
        InetSocketAddress target = (InetSocketAddress) address;
        this.port = target.getPort();
        this.address = target.getAddress();
        connectToAddress(this.address, this.port, timeout);
        succeeded = true;
    } finally {
        // Reached on both normal and exceptional exit; only clean up on failure.
        if (!succeeded) {
            close();
        }
    }
}
/**
 * Connects to {@code address}, substituting the local host address when
 * {@code address} is the wildcard ("any local") address.
 *
 * @param address the remote address requested by the caller
 * @param port    the remote port
 * @param timeout the connect timeout, forwarded to {@code doConnect}
 * @throws IOException if resolving the local host or connecting fails
 */
private void connectToAddress(InetAddress address, int port, int timeout) throws IOException {
    InetAddress target = address.isAnyLocalAddress() ? InetAddress.getLocalHost() : address;
    doConnect(target, port, timeout);
}
总结: connect 的连接操作最终由 doConnect 完成。
/**
 * Performs the actual connect to {@code address}:{@code port}.
 *
 * <p>Runs the pre-connect hook when appropriate, acquires the file descriptor,
 * delegates to {@code socketConnect}, and then marks the owning socket as bound
 * and connected. Any {@link IOException} closes this impl before being rethrown.
 *
 * @param address the remote address to connect to
 * @param port    the remote port
 * @param timeout the connect timeout, forwarded to {@code socketConnect}
 * @throws IOException if connecting fails, or if a close became pending while
 *         the connect was in progress ("Socket closed")
 */
synchronized void doConnect(InetAddress address, int port, int timeout) throws IOException {
// Under fdLock: invoke the pre-connect hook only if no close is pending and
// the socket is either absent or not yet bound.
synchronized (fdLock) {
if (!closePending && (socket == null || !socket.isBound())) {
NetHooks.beforeTcpConnect(fd, address, port);
}
}
try {
// Paired with releaseFD() in the inner finally block.
acquireFD();
try {
socketConnect(address, port, timeout);
/* socket may have been closed during poll/select */
synchronized (fdLock) {
if (closePending) {
throw new SocketException ("Socket closed");
}
}
// Propagate the new state to the owning socket, if there is one.
if (socket != null) {
socket.setBound();
socket.setConnected();
}
} finally {
releaseFD();
}
} catch (IOException e) {
// A failed connect leaves the impl unusable: close it, then rethrow.
close();
throw e;
}
}
2.3 DualStackPlainSocketImpl.socketConnect 方法
/**
 * Connects the native socket to {@code address}:{@code port}.
 *
 * <p>With {@code timeout <= 0} a single blocking {@code connect0} call is made.
 * Otherwise the descriptor is switched to non-blocking mode, the connect is
 * started, and {@code waitForConnect} waits for completion when
 * {@code connect0} reports {@code WOULDBLOCK}; blocking mode is always restored
 * afterwards. Finally the local port is looked up if it is still 0.
 *
 * @param address the remote address; must not be {@code null}
 * @param port    the remote port
 * @param timeout the connect timeout; values {@code <= 0} select the blocking path
 * @throws IOException if the native connect fails
 * @throws NullPointerException if {@code address} is {@code null}
 */
void socketConnect(InetAddress address, int port, int timeout)
        throws IOException {
    final int nativefd = checkAndReturnNativeFD();

    if (address == null) {
        throw new NullPointerException("inet address argument is null.");
    }

    if (timeout > 0) {
        // Timed connect: go non-blocking, start the connect, wait if still in progress.
        configureBlocking(nativefd, false);
        try {
            int status = connect0(nativefd, address, port);
            if (status == WOULDBLOCK) {
                waitForConnect(nativefd, timeout);
            }
        } finally {
            // Restore blocking mode whether or not the connect succeeded.
            configureBlocking(nativefd, true);
        }
    } else {
        // No timeout requested: plain blocking connect.
        connect0(nativefd, address, port);
    }

    if (localport == 0) {
        localport = localPort0(nativefd);
    }
}
补充1:connect0 在 JVM 中的实现
/*
 * Native implementation of DualStackPlainSocketImpl.connect0.
 *
 * Converts the Java InetAddress (iaObj) and port into a socket address and
 * calls connect() on fd.
 *
 * Returns:
 *   - the result of connect() when it does not report SOCKET_ERROR;
 *   - java_net_DualStackPlainSocketImpl_WOULDBLOCK when the error is WSAEWOULDBLOCK;
 *   - -1 when the address conversion fails, or after a Java exception has been
 *     raised for any other connect error.
 */
JNIEXPORT jint JNICALL Java_java_net_DualStackPlainSocketImpl_connect0
(JNIEnv *env, jclass clazz, jint fd, jobject iaObj, jint port) {
SOCKETADDRESS sa;
int rv;
int sa_len = sizeof(sa);
/* Build the native socket address from the Java InetAddress + port. */
if (NET_InetAddressToSockaddr(env, iaObj, port, (struct sockaddr *)&sa,
&sa_len, JNI_TRUE) != 0) {
return -1;
}
/* The actual connection attempt. */
rv = connect(fd, (struct sockaddr *)&sa, sa_len);
if (rv == SOCKET_ERROR) {
int err = WSAGetLastError();
if (err == WSAEWOULDBLOCK) {
/* Not treated as a failure: reported to the caller as WOULDBLOCK. */
return java_net_DualStackPlainSocketImpl_WOULDBLOCK;
} else if (err == WSAEADDRNOTAVAIL) {
JNU_ThrowByName(env, JNU_JAVANETPKG "ConnectException",
"connect: Address is invalid on local machine, or port is not valid on remote machine");
} else {
/* Any other Winsock error is raised through NET_ThrowNew. */
NET_ThrowNew(env, err, "connect");
}
return -1; // return value not important.
}
return rv;
}
总结: 通过 rv = connect(fd, (struct sockaddr *)&sa, sa_len) 建立远程连接。
补充2:waitForConnect 在 JVM 中的实现
和 ServerSocket.waitForNewConnection 一样,也是通过 Winsock 库的 select 函数来实现超时的功能。
/*
 * Native implementation of DualStackPlainSocketImpl.waitForConnect.
 *
 * Waits with select() for an in-progress connect on fd to finish, for at most
 * `timeout` (split below into seconds and microseconds, i.e. a millisecond value).
 *
 * Outcomes:
 *   - select() returns 0: throws SocketTimeoutException("connect timed out")
 *     and shuts the socket down in both directions;
 *   - fd is not in the exception set: returns normally (connection established);
 *   - otherwise: reads SO_ERROR (up to 3 attempts) and throws either a generic
 *     SocketException or an exception built from the reported error code.
 */
JNIEXPORT void JNICALL Java_java_net_DualStackPlainSocketImpl_waitForConnect
(JNIEnv *env, jclass clazz, jint fd, jint timeout) {
int rv, retry;
int optlen = sizeof(rv);
fd_set wr, ex;
struct timeval t;
/* Watch fd in both the write set and the exception set. */
FD_ZERO(&wr);
FD_ZERO(&ex);
FD_SET(fd, &wr);
FD_SET(fd, &ex);
/* Split the timeout into whole seconds and the remaining microseconds. */
t.tv_sec = timeout / 1000;
t.tv_usec = (timeout % 1000) * 1000;
rv = select(fd+1, 0, &wr, &ex, &t);
/* A result of 0 is handled as a timeout. */
if (rv == 0) {
JNU_ThrowByName(env, JNU_JAVANETPKG "SocketTimeoutException",
"connect timed out");
shutdown( fd, SD_BOTH );
return;
}
if (!FD_ISSET(fd, &ex)) {
return; /* connection established */
}
/*
 * fd is in the exception set: fetch the socket error, retrying up to
 * 3 times (with Sleep(0) in between) until a non-zero code is reported.
 */
for (retry=0; retry<3; retry++) {
NET_GetSockOpt(fd, SOL_SOCKET, SO_ERROR,
(char*)&rv, &optlen);
if (rv) {
break;
}
Sleep(0);
}
if (rv == 0) {
/* No error code could be obtained; raise a generic failure. */
JNU_ThrowByName(env, JNU_JAVANETPKG "SocketException",
"Unable to establish connection");
} else {
NET_ThrowNew(env, rv, "connect");
}
}
总结: 通过 rv = select(fd+1, 0, &wr, &ex, &t) 等待连接完成,在连接建立、出错或超时之前会阻塞当前线程。
三、SocketInputStream
3.1 构造方法
/**
 * Creates an input stream on top of the given socket implementation.
 *
 * @param impl the socket implementation this stream reads from; its file
 *             descriptor is passed to the superclass constructor
 * @throws IOException declared by the constructor signature
 */
SocketInputStream(AbstractPlainSocketImpl impl) throws IOException {
// The stream shares the impl's file descriptor.
super(impl.getFileDescriptor());
this.impl = impl;
// Keep a reference to the socket returned by the impl.
socket = impl.getSocket();
}
总结: SocketInputStream 内部实现上也是对 impl 的封装。SocketInputStream.read 其实也是调用底层 socket 的 read 方法。
3.2 read 方法
/**
 * Reads up to {@code length} bytes from the socket into {@code b} starting at
 * {@code off}.
 *
 * <p>The checks run in this order: a previously seen EOF returns -1, a
 * previously recorded connection reset throws, and only then are the bounds
 * validated. If the native read reports a connection reset, one more read is
 * attempted so that any bytes still buffered on the socket are delivered
 * before the reset is surfaced.
 *
 * @param b       destination buffer
 * @param off     offset in {@code b} at which to start storing bytes
 * @param length  maximum number of bytes to read
 * @param timeout read timeout, forwarded unchanged to {@code socketRead}
 * @return the number of bytes read (&gt; 0), 0 if {@code length} is 0, or -1 at EOF
 * @throws SocketException if the connection was reset ("Connection reset") or
 *         the socket is closed or closing ("Socket closed")
 * @throws ArrayIndexOutOfBoundsException if {@code off}/{@code length} are
 *         negative or exceed the bounds of {@code b}
 * @throws IOException if the underlying read fails
 */
int read(byte b[], int off, int length, int timeout) throws IOException {
int n;
// EOF already encountered
if (eof) {
return -1;
}
// connection reset
if (impl.isConnectionReset()) {
throw new SocketException("Connection reset");
}
// bounds check
if (length <= 0 || off < 0 || off + length > b.length) {
// A zero-length read is allowed and reads nothing.
if (length == 0) {
return 0;
}
throw new ArrayIndexOutOfBoundsException();
}
boolean gotReset = false;
// acquire file descriptor and do the read
FileDescriptor fd = impl.acquireFD();
try {
n = socketRead(fd, b, off, length, timeout);
if (n > 0) {
return n;
}
} catch (ConnectionResetException rstExc) {
// Remember the reset; it is handled below, after the descriptor is released.
gotReset = true;
} finally {
impl.releaseFD();
}
/*
 * We receive a "connection reset" but there may be bytes still
 * buffered on the socket
 */
if (gotReset) {
impl.setConnectionResetPending();
// Re-acquire for the second attempt; the fd obtained above is reused for the call.
impl.acquireFD();
try {
n = socketRead(fd, b, off, length, timeout);
if (n > 0) {
return n;
}
} catch (ConnectionResetException rstExc) {
// Deliberately ignored: the reset is already marked pending and is reported below.
} finally {
impl.releaseFD();
}
}
/*
 * If we get here we are at EOF, the socket has been closed,
 * or the connection has been reset.
 */
if (impl.isClosedOrPending()) {
throw new SocketException("Socket closed");
}
// Promote a pending reset to a definitive one before reporting it.
if (impl.isConnectionResetPending()) {
impl.setConnectionReset();
}
if (impl.isConnectionReset()) {
throw new SocketException("Connection reset");
}
// Neither closed nor reset: record EOF so later calls return -1 immediately.
eof = true;
return -1;
}
/**
 * Delegates directly to {@code socketRead0}, passing every argument through
 * unchanged.
 *
 * @param fd      the file descriptor to read from
 * @param b       destination buffer
 * @param off     offset in {@code b} at which to start storing bytes
 * @param len     maximum number of bytes to read
 * @param timeout read timeout, forwarded as-is
 * @return whatever {@code socketRead0} returns
 * @throws IOException if the underlying read fails
 */
private int socketRead(FileDescriptor fd, byte b[], int off, int len,
int timeout) throws IOException {
return socketRead0(fd, b, off, len, timeout);
}
补充3:socketRead0 在 JVM 中的实现
// src/windows/native/java/net/SocketInputStream.c
/*
 * Native implementation of SocketInputStream.socketRead0.
 *
 * Reads up to `len` bytes from the socket behind fdObj with recv() and copies
 * them into the Java byte array `data` at offset `off`.
 *
 * Steps:
 *   1. Validate fdObj and its native descriptor.
 *   2. Choose a receive buffer: the stack buffer when len fits, otherwise a
 *      heap buffer capped at MAX_HEAP_BUFFER_LEN (falling back to the stack
 *      buffer if malloc fails).
 *   3. If a timeout is set and it is <= 5000 or isRcvTimeoutSupported is false,
 *      wait with NET_Timeout() first and re-check that the socket is still open.
 *   4. recv() the data and copy it into `data`, or translate the Winsock
 *      error into a Java exception.
 *
 * Returns the value returned by recv(), or -1 when an exception was raised
 * before recv() was reached.
 */
JNIEXPORT jint JNICALL Java_java_net_SocketInputStream_socketRead0(JNIEnv *env, jobject this,
jobject fdObj, jbyteArray data, jint off, jint len, jint timeout) {
char *bufP;
char BUF[MAX_BUFFER_LEN];
jint fd, newfd;
jint nread;
if (IS_NULL(fdObj)) {
JNU_ThrowByName(env, JNU_JAVANETPKG "SocketException", "socket closed");
return -1;
}
fd = (*env)->GetIntField(env, fdObj, IO_fd_fdID);
/* A descriptor value of -1 is treated as a closed socket. */
if (fd == -1) {
NET_ThrowSocketException(env, "Socket closed");
return -1;
}
/*
 * If the caller buffer is larger than our stack buffer then we allocate
 * from the heap (up to a limit). If memory is exhausted we always use
 * the stack buffer.
 */
if (len <= MAX_BUFFER_LEN) {
bufP = BUF;
} else {
if (len > MAX_HEAP_BUFFER_LEN) {
len = MAX_HEAP_BUFFER_LEN;
}
bufP = (char *)malloc((size_t)len);
if (bufP == NULL) {
/* allocation failed so use stack buffer */
bufP = BUF;
len = MAX_BUFFER_LEN;
}
}
if (timeout) {
/*
 * NOTE(review): for timeouts above 5000 with isRcvTimeoutSupported set, no
 * wait happens here; presumably the socket's own receive timeout applies
 * (see the WSAETIMEDOUT case below) - confirm against the full source.
 */
if (timeout <= 5000 || !isRcvTimeoutSupported) {
int ret = NET_Timeout (fd, timeout);
if (ret <= 0) {
if (ret == 0) {
JNU_ThrowByName(env, JNU_JAVANETPKG "SocketTimeoutException",
"Read timed out");
} else if (ret == JVM_IO_ERR) {
JNU_ThrowByName(env, JNU_JAVANETPKG "SocketException", "socket closed");
} else if (ret == JVM_IO_INTR) {
JNU_ThrowByName(env, JNU_JAVAIOPKG "InterruptedIOException",
"Operation interrupted");
}
/* Release the heap buffer (if one was allocated) before bailing out. */
if (bufP != BUF) {
free(bufP);
}
return -1;
}
/*check if the socket has been closed while we were in timeout*/
newfd = (*env)->GetIntField(env, fdObj, IO_fd_fdID);
if (newfd == -1) {
NET_ThrowSocketException(env, "Socket Closed");
if (bufP != BUF) {
free(bufP);
}
return -1;
}
}
}
// The key call: recv() reads the data from the socket descriptor.
nread = recv(fd, bufP, len, 0);
if (nread > 0) {
/* Copy the received bytes into the Java array at the requested offset. */
(*env)->SetByteArrayRegion(env, data, off, nread, (jbyte *)bufP);
} else {
if (nread < 0) {
// Check if the socket has been closed since we last checked.
// This could be a reason for recv failing.
if ((*env)->GetIntField(env, fdObj, IO_fd_fdID) == -1) {
NET_ThrowSocketException(env, "Socket closed");
} else {
/* Map the Winsock error code to the matching Java exception. */
switch (WSAGetLastError()) {
case WSAEINTR:
JNU_ThrowByName(env, JNU_JAVANETPKG "SocketException",
"socket closed");
break;
case WSAECONNRESET:
case WSAESHUTDOWN:
/*
 * Connection has been reset - Windows sometimes reports
 * the reset as a shutdown error.
 */
JNU_ThrowByName(env, "sun/net/ConnectionResetException", "");
break;
case WSAETIMEDOUT :
JNU_ThrowByName(env, JNU_JAVANETPKG "SocketTimeoutException", "Read timed out");
break;
default:
NET_ThrowCurrent(env, "recv failed");
}
}
}
}
/* Free the heap buffer if one was used; the stack buffer needs no cleanup. */
if (bufP != BUF) {
free(bufP);
}
return nread;
}
总结: socketRead0 的实现很长,其实我们只需关注核心的一行 nread = recv(fd, bufP, len, 0); 即可,毕竟我们不是专门做 C/C++ 开发的。
四、SocketOutputStream
和 SocketInputStream 类似,就不继续分析了。
每天用心记录一点点。内容也许不重要,但习惯很重要!