模仿Tomcat的BIO,NIO线程模型

模仿Tomcat的BIO模型,来一个消息,分配一个线程处理.
则主线程池代码如下
package com.guanjian;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**

  • Created by Administrator on 2018/7/10.
    */

public class ThreadPool {

private ExecutorService service;
private List<MessageTask> tasks;
private int fixedThreadNum = 0;
private List<String> messages;
private MessageHandler messageHandler;
public ThreadPool(int fixedThreadNum,List<String> messages,MessageHandler messageHandler) {
    this.fixedThreadNum = fixedThreadNum;
    this.messages = messages;
    this.messageHandler = messageHandler;
    service = Executors.newFixedThreadPool(fixedThreadNum);
    Runtime.getRuntime().addShutdownHook(new Thread() {
        public void run() {
            shutdownGracefully(service);
        }
    });
}
public void shutdownGracefully(ExecutorService ThreadPool) {
    ShutdownPool.shutdownThreadPool(ThreadPool, "main-pool");
}

public void startup() {
    tasks = new ArrayList<>();
    MessageTask messageTask = (fixedThreadNum == 0 ? new SequentialMessageTask(messageHandler,messages) : new ConcurrentMessageTask(messageHandler,messages));
    for (String message:messages) {
        tasks.add(messageTask);
        service.execute(messageTask);
    }
}

}
它是通过线程数fixedThreadNum来区分使用哪种线程模型.
package com.guanjian;

/**

  • Created by Administrator on 2018/7/10.
    */

public interface MessageHandler {

public void execute(String message);

}
package com.guanjian;

/**

  • Created by Administrator on 2018/7/10.
    */

public class MessageHandlerImpl implements MessageHandler {

@Override
public void execute(String message) {
    System.out.println(message);
}

}
以上是消息处理器的接口和实现类
package com.guanjian;

import java.util.List;

/**

  • Created by Administrator on 2018/7/10.
    */

public abstract class MessageTask implements Runnable {

protected MessageHandler messageHandler;
protected  List<String> messages;

MessageTask(MessageHandler messageHandler,List<String> messages) {
    this.messageHandler = messageHandler;
    this.messages = messages;
}
@Override
public void run() {
    for (String message:messages) {
        handlerMessage(message);
    }
}
protected abstract void handlerMessage(String message);

}
消息任务抽象类实现了Runnable线程接口,以不同的子类来实现BIO,NIO线程模型,具体在抽象方法handlerMessage中实现.
package com.guanjian;

import java.util.List;

/**

  • Created by Administrator on 2018/7/10.
    */

public class SequentialMessageTask extends MessageTask {

SequentialMessageTask(MessageHandler messageHandler, List<String> messages) {
    super(messageHandler, messages);
}

@Override
protected void handlerMessage(String message) {
    messageHandler.execute(message);
}

}
BIO线程模型子类,通过主线程池来分配线程处理.
package com.guanjian;

import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**

  • Created by Administrator on 2018/7/10.
    */

public class ConcurrentMessageTask extends MessageTask {

private ExecutorService asyncService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * 2);
ConcurrentMessageTask(MessageHandler messageHandler, List<String> messages) {
    super(messageHandler, messages);
}

@Override
protected void handlerMessage(String message) {
    asyncService.submit(new Runnable() {
        @Override
        public void run() {
            messageHandler.execute(message);
        }
    });
}
protected void shutdown() {
    ShutdownPool.shutdownThreadPool(asyncService,"async-pool-" + Thread.currentThread().getId());
}

}
NIO线程模型,不再使用主线程池来分配线程,而是异步线程池,类比于Netty中的Worker线程池,从BOSS线程池中接管消息处理.
package com.guanjian;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

/**

  • Created by Administrator on 2018/7/10.
    */

public class ShutdownPool {

private static Logger log = LoggerFactory.getLogger(ThreadPool.class);
/**
 * 优雅关闭线程池
 * @param threadPool
 * @param alias
 */
public static void shutdownThreadPool(ExecutorService threadPool, String alias) {
    log.info("Start to shutdown the thead pool: {}", alias);

    threadPool.shutdown(); // 使新任务无法提交.
    try {
        // 等待未完成任务结束
        if (!threadPool.awaitTermination(60, TimeUnit.SECONDS)) {
            threadPool.shutdownNow(); // 取消当前执行的任务
            log.warn("Interrupt the worker, which may cause some task inconsistent. Please check the biz logs.");

            // 等待任务取消的响应
            if (!threadPool.awaitTermination(60, TimeUnit.SECONDS))
                log.error("Thread pool can't be shutdown even with interrupting worker threads, which may cause some task inconsistent. Please check the biz logs.");
        }
    } catch (InterruptedException ie) {
        // 重新取消当前线程进行中断
        threadPool.shutdownNow();
        log.error("The current server thread is interrupted when it is trying to stop the worker threads. This may leave an inconcistent state. Please check the biz logs.");

        // 保留中断状态
        Thread.currentThread().interrupt();
    }

    log.info("Finally shutdown the thead pool: {}", alias);
}

}
最后是线程池的优雅关闭,无论是主线程池还是异步线程池皆调用该方法实现优雅关闭.

以上只是模型代码,具体可替换成具体需要的业务代码来达到业务性能的提升.

上一篇:函数式接口、默认方法、纯函数、函数的副作用、高阶函数、可变的和不可变的、函数式编程和 Lambda 表达式 - 响应式编程 [Android RxJava2](这到底是什么)第三部分


下一篇:Fabric go sdk开发入门