这个问题已经在这里有了答案: > Do not share same socket between two threads at the same time 7个
我有一个处理套接字的代码,我需要确保在两个线程之间不要共享同一套接字.在下面的代码中,我有一个后台线程,每60秒运行一次,并调用updateLiveSockets()方法.在updateLiveSockets()方法中,我迭代所有的套接字,然后开始通过调用SendToQueue类的send方法并根据响应将它们标记为有效或无效来对它们进行ping操作.
现在,所有读取器线程将同时调用getNextSocket()方法以获取下一个活动的可用套接字,因此它必须是线程安全的,并且我需要确保所有读取器线程应看到SocketHolder和Socket的相同状态.
下面是我的SocketManager类:
public class SocketManager {
private static final Random random = new Random();
private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
private final Map<Datacenters, List<SocketHolder>> liveSocketsByDatacenter =
new ConcurrentHashMap<>();
private final ZContext ctx = new ZContext();
// ...
private SocketManager() {
connectToZMQSockets();
scheduler.scheduleAtFixedRate(this::updateLiveSockets, 60, 60, TimeUnit.SECONDS);
}
// during startup, making a connection and populate once
private void connectToZMQSockets() {
Map<Datacenters, List<String>> socketsByDatacenter = Utils.SERVERS;
for (Map.Entry<Datacenters, List<String>> entry : socketsByDatacenter.entrySet()) {
List<SocketHolder> addedColoSockets = connect(entry.getValue(), ZMQ.PUSH);
liveSocketsByDatacenter.put(entry.getKey(), addedColoSockets);
}
}
private List<SocketHolder> connect(List<String> paddes, int socketType) {
List<SocketHolder> socketList = new ArrayList<>();
// ....
return socketList;
}
// this method will be called by multiple threads concurrently to get the next live socket
// is there any concurrency or thread safety issue or race condition here?
public Optional<SocketHolder> getNextSocket() {
for (Datacenters dc : Datacenters.getOrderedDatacenters()) {
Optional<SocketHolder> liveSocket = getLiveSocket(liveSocketsByDatacenter.get(dc));
if (liveSocket.isPresent()) {
return liveSocket;
}
}
return Optional.absent();
}
private Optional<SocketHolder> getLiveSocket(final List<SocketHolder> listOfEndPoints) {
if (!CollectionUtils.isEmpty(listOfEndPoints)) {
// The list of live sockets
List<SocketHolder> liveOnly = new ArrayList<>(listOfEndPoints.size());
for (SocketHolder obj : listOfEndPoints) {
if (obj.isLive()) {
liveOnly.add(obj);
}
}
if (!liveOnly.isEmpty()) {
// The list is not empty so we shuffle it an return the first element
return Optional.of(liveOnly.get(random.nextInt(liveOnly.size()))); // just pick one
}
}
return Optional.absent();
}
// runs every 60 seconds to ping all the socket to make sure whether they are alive or not
private void updateLiveSockets() {
Map<Datacenters, List<String>> socketsByDatacenter = Utils.SERVERS;
for (Map.Entry<Datacenters, List<String>> entry : socketsByDatacenter.entrySet()) {
List<SocketHolder> liveSockets = liveSocketsByDatacenter.get(entry.getKey());
List<SocketHolder> liveUpdatedSockets = new ArrayList<>();
for (SocketHolder liveSocket : liveSockets) {
Socket socket = liveSocket.getSocket();
String endpoint = liveSocket.getEndpoint();
Map<byte[], byte[]> holder = populateMap();
Message message = new Message(holder, Partition.COMMAND);
// pinging to see whether a socket is live or not
boolean status = SendToQueue.getInstance().send(message.getAddress(), message.getEncodedRecords(), socket);
boolean isLive = (status) ? true : false;
SocketHolder zmq = new SocketHolder(socket, liveSocket.getContext(), endpoint, isLive);
liveUpdatedSockets.add(zmq);
}
liveSocketsByDatacenter.put(entry.getKey(), Collections.unmodifiableList(liveUpdatedSockets));
}
}
}
这是我的SendToQueue类:
// this method will be called by multiple threads concurrently to send the data
public boolean sendAsync(final long address, final byte[] encodedRecords) {
Optional<SocketHolder> liveSockets = SocketManager.getInstance().getNextSocket();
PendingMessage m = new PendingMessage(address, encodedRecords, liveSockets.get().getSocket(), true);
cache.put(address, m);
return doSendAsync(m, socket);
}
private boolean doSendAsync(final PendingMessage pendingMessage, final Socket socket) {
ZMsg msg = new ZMsg();
msg.add(pendingMessage.getEncodedRecords());
try {
// send data on a socket LINE A
return msg.send(socket);
} finally {
msg.destroy();
}
}
public boolean send(final long address, final byte[] encodedRecords, final Socket socket) {
PendingMessage m = new PendingMessage(address, encodedRecords, socket, false);
cache.put(address, m);
try {
if (doSendAsync(m, socket)) {
return m.waitForAck();
}
return false;
} finally {
// Alternatively (checks that address points to m):
// cache.asMap().remove(address, m);
cache.invalidate(address);
}
}
问题陈述
现在您可以看到我正在两个线程之间共享同一套接字.似乎getNextSocket()可以将0MQ套接字返回到线程A.同时,计时器线程可以访问相同的0MQ套接字以对其执行ping操作.在这种情况下,线程A和计时器线程正在更改同一个0MQ套接字,这可能会导致问题.因此,我试图找到一种方法,以防止不同的线程同时将数据发送到同一套接字并破坏我的数据.
因此,我决定同步套接字,以便没有两个线程可以同时访问同一套接字.以下是我在updateLiveSockets方法中所做的更改.我通过以下方法在套接字上进行了同步:
// runs every 60 seconds to ping all the socket to make sure whether they are alive or not
private void updateLiveSockets() {
Map<Datacenters, List<String>> socketsByDatacenter = Utils.SERVERS;
for (Map.Entry<Datacenters, List<String>> entry : socketsByDatacenter.entrySet()) {
List<SocketHolder> liveSockets = liveSocketsByDatacenter.get(entry.getKey());
List<SocketHolder> liveUpdatedSockets = new ArrayList<>();
for (SocketHolder liveSocket : liveSockets) {
Socket socket = liveSocket.getSocket();
String endpoint = liveSocket.getEndpoint();
Map<byte[], byte[]> holder = populateMap();
Message message = new Message(holder, Partition.COMMAND);
// using the socket as its own lock
synchronized (socket) {
// pinging to see whether a socket is live or not
boolean status = SendToQueue.getInstance().execute(message.getAddress(), message.getEncodedRecords(), socket);
boolean isLive = (status) ? true : false;
SocketHolder zmq = new SocketHolder(socket, liveSocket.getContext(), endpoint, isLive);
liveUpdatedSockets.add(zmq);
}
}
liveSocketsByDatacenter.put(entry.getKey(), Collections.unmodifiableList(liveUpdatedSockets));
}
}
下面是我在doSendAsync方法中所做的更改.在这种情况下,我也在发送套接字之前在套接字上进行了同步.
private boolean doSendAsync(final PendingMessage pendingMessage, final Socket socket) {
ZMsg msg = new ZMsg();
msg.add(pendingMessage.getEncodedRecords());
try {
// send data on a socket LINE A by synchronizing on it
synchronized (socket) {
return msg.send(socket);
}
} finally {
msg.destroy();
}
}
确保不在两个线程之间共享同一套接字的最佳方法是什么?通常,我有大约60个套接字和20个线程访问这些套接字.
如果许多线程使用相同的套接字,则资源利用率不高.此外,如果msg.send(socket);被阻塞(从技术上讲不应该),所有等待此套接字的线程都被阻塞.因此,我想可能会有更好的方法来确保每个线程同时使用不同的单个活动套接字,而不是在特定套接字上进行同步.还有我错过的任何角落情况或边缘情况会导致某些错误吗?
解决方法:
首先,您需要一种让客户端通知您使用Socket完成的方法.您可以添加一种方法,使他们可以发出信号.这是合法的,并且会起作用,但是您必须依靠客户来表现良好.更确切地说,使用您的套接字的程序员不会忘记返回它.有一种模式可以解决这个问题:execute around模式.而不是给出一个Socket,您可以创建一个接受Consumer< Socket>的方法,然后执行消费者,并返回Socket本身.
public void useSocket(Consumer<Socket> socketUser) {
Socket socket = getSocket();
try {
socketUser.accept(socket);
} finally {
returnSocket(socket);
}
}
现在让我们看一下如何实现getSocket()和returnSocket().显然,这涉及从某种集合中获取它们,并将它们返回到该集合中.在这里,队列是一个不错的选择(正如其他人也提到的那样).它允许从一侧获取它,而从另一侧返回它,再加上有许多有效的线程安全实现,并且接收者和加法器通常不会互相争用.既然您事先知道套接字的数量,我就选择ArrayBlockingQueue.
这里的另一个问题是您的实现返回一个Optional.我不确定如果没有可用的套接字,客户端将如何处理,但是如果它正在等待并重试,建议您简单地使getSocket()阻塞队列.实际上,我会尊重您的方法的这一方面,并考虑到可能没有可用的Socket.对于环回执行方法,如果没有可用的Socket,它将转换为useSocket()方法并返回false.
private final BlockingQueue<Socket> queue;
public SocketPool(Set<Socket> sockets) {
queue = new ArrayBlockingQueue<>(sockets.size());
queue.addAll(sockets);
}
public boolean useSocket(Consumer<Socket> socketUser) throws InterruptedException {
Optional<Socket> maybeSocket = getSocket();
try {
maybeSocket.ifPresent(socketUser);
return maybeSocket.isPresent();
} finally {
maybeSocket.ifPresent(this::returnSocket);
}
}
private void returnSocket(Socket socket) {
queue.add(socket);
}
private Optional<Socket> getSocket() throws InterruptedException {
return Optional.ofNullable(queue.poll());
}
就是这样,这就是您的SocketPool.
啊,可惜的是:检查活动.之所以小气,是因为您的生活检查实际上与您的普通客户竞争.
为了解决这个问题,我提出以下建议:让您的客户报告他们获得的Socket是否处于活动状态.由于检查活动性取决于使用Socket,因此对于您的客户而言,这应该很简单.
因此,我们将使用Function< Socket,Boolean>来代替Consumer< Socket>.并且如果函数返回false,我们将认为Socket不再可用.在这种情况下,我们没有将其添加回常规队列中,而是将其添加到了无效套接字的集合中,并且我们将执行一个计划任务,该任务会间歇性地重新检查无效套接字.由于这发生在单独的集合上,因此计划的检查不再与常规客户竞争.
现在,您可以使用带有将数据中心映射到SocketPool实例的Map来创建SocketManager.该映射不需要更改,因此您可以使其最终化并在SocketManager的构造函数中对其进行初始化.
这是我的SocketPool初步代码(未经测试):
class SocketPool implements AutoCloseable {
private final BlockingQueue<Socket> queue;
private final Queue<Socket> deadSockets = new ConcurrentLinkedQueue<>();
private final ScheduledFuture<?> scheduledFuture;
public SocketPool(Set<Socket> sockets, ScheduledExecutorService scheduledExecutorService) {
queue = new ArrayBlockingQueue<>(sockets.size());
queue.addAll(sockets);
scheduledFuture = scheduledExecutorService.scheduleAtFixedRate(this::recheckDeadSockets, 60, 60, TimeUnit.SECONDS);
}
public boolean useSocket(Function<Socket, Boolean> socketUser) throws InterruptedException {
Optional<Socket> maybeSocket = getSocket();
boolean wasLive = true;
try {
wasLive = maybeSocket.map(socketUser).orElse(false);
return wasLive && maybeSocket.isPresent();
} finally {
boolean isLive = wasLive;
maybeSocket.ifPresent(socket -> {
if (isLive) {
returnSocket(socket);
} else {
reportDead(socket);
}
});
}
}
private void reportDead(Socket socket) {
deadSockets.add(socket);
}
private void returnSocket(Socket socket) {
queue.add(socket);
}
private Optional<Socket> getSocket() throws InterruptedException {
return Optional.ofNullable(queue.poll());
}
private void recheckDeadSockets() {
for (int i = 0; i < deadSockets.size(); i++) {
Socket socket = deadSockets.poll();
if (checkAlive(socket)) {
queue.add(socket);
} else {
deadSockets.add(socket);
}
}
}
private boolean checkAlive(Socket socket) {
// do actual live check with SendSocket class, or implement directly in this one
return true;
}
@Override
public void close() throws Exception {
scheduledFuture.cancel(true);
}
}