在开发多线程程序的过程中,你是否发现使用同步关键字后,怎么不好使呢,还是出现不能同步的情况。
见下面的例子,这时一个连接MQ的程序,通过使用多线程,模拟多个客户端同时登录、连接MQ。首先创建多个线程,然后每个线程自己连接MQ服务器。
这里实现的主要功能是测试创建线程的时间(T1),测试所有线程都连接上MQ服务器的时间(T2)。
1. T1实现
直接在创建线程循环语句前后计时,然后算出所花时间。
2. T2实现
使用一个计数器,每个线程连接完成后,计数器加1,该计数器为全局计数器,所有线程均能访问。当计数达到500个时,获取当时时刻,然后减去开始时刻,求得所有线程连接完成的时间。
开始时刻在线程类的构造函数中传入。
3. 客户端编号的正确获取
由于采用多线程,获取编号需要同步,否则获取到的编号会出现紊乱情况(如编号重复),同时也会导致创建连接成功计数不准确。
这里给出的代码是能正常工作的,如果对代码进行一些修改,代码还能正常执行吗?修改如下:
原代码:
/** * 获得线程号 * @return */ synchronized public static int getThreadNum(){ return threadNum++; }
修改后代码:
/** * 获得线程号 * @return */ synchronized public int getThreadNum(){ return threadNum++; }
运行修改后的代码,发现出现了编号重复的现象,实际运行错误信息为:
javax.jms.InvalidClientIDException: Broker: Message_Bus - Client: client_255 already connected from tcp://127.0.0.1:40454
意即创建的客户编号有重复。
可以看到,代码的修改部分仅仅去掉了一个修饰词:static,结果就不正确了,这是为什么呢?
如果是静态方法(使用static)关键字,所有创建的线程均共享一个方法getThreadNum;如果使用非静态方法,则在创建新线程时,每个线程均拥有自己的getThreadNum()方法,即使加了同步方法,也只能在类实例的内部起作用,而不能对其他线程对象起作用。
所以,涉及到多线程的共享方法或变量同步,一般使用同步关键字(synchronized)和静态方法(使用static修饰)。
示例代码MsgSubscribetest.java
import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.JMSException; import javax.jms.Session; import javax.jms.Topic; import javax.jms.TopicSubscriber; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.ActiveMQSession; public class MsgSubscribeTest extends Thread{ private final static String JMS_HOST_IP = "127.0.0.1"; private final static String JMS_HOST_PORT = "61616"; private final static String USERNAME = "system"; private final static String PASSWORD = "manager2"; private final static String TEST_TOPIC = "my-topic"; public static int threadNum = 0; //线程完成计数 public static int iFinishedThread = 0; //测试开始时间 long startTestTime = 0; //JMS连接 Connection connection = null; /** * 构造函数,设置运行开始时间 * @param t1 */ public UapMsgSubscribeTest(long t1){ startTestTime = t1; } /** * 临时订阅消息 * @param msg */ void tempSubscribTopic(String clientid) { //System.out.println("Connection Count:"+ clientid + "," + iFinishedThread); try { ConnectionFactory connectionFactory = new ActiveMQConnectionFactory( USERNAME, PASSWORD, "tcp://" + JMS_HOST_IP + ":" + JMS_HOST_PORT +"?wireFormat.maxInactivityDurationInitalDelay=30000"); //JMS 客户端到JMS Provider 的连接 Connection connection = connectionFactory.createConnection(); connection.setClientID(clientid); connection.start(); // Session: 一个发送或接收消息的线程 //Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); ActiveMQSession session = (ActiveMQSession) connection .createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); Topic topic = session.createTopic(TEST_TOPIC); TopicSubscriber consumer = session.createSubscriber(topic); consumer.setMessageListener(new UapTextListener(connection, session)); //Thread.sleep(3*1000); //session.close(); //connection.stop(); //alConnMgr.add(connection); finishedCount(); System.out.println("订阅消息成功!"); } catch (Exception e) { e.printStackTrace(); } } /** * 检查连接是否创建完成 * @return */ boolean checkCreateConnectionFinished(){ // System.out.println("iFinishedThread=" + iFinishedThread + "," + startTestTime); if(iFinishedThread ==499){ long endTime = System.currentTimeMillis(); System.out.println("Finished Created All Connections Cost:" + (endTime-startTestTime)); return true; } return false; } /** * 创建完成线程计数 * @return */ synchronized public static int finishedCount(){ return iFinishedThread++; } /** * 获得线程号 * @return */ synchronized public static int getThreadNum(){ return threadNum++; } @Override public void run() { String clientId = "client_" + getThreadNum(); tempSubscribTopic(clientId); //检查是否完成 checkCreateConnectionFinished(); } /** * 主入口 * @param args */ public static void main(String[] args) { long t1 = System.currentTimeMillis(); long startTime = System.currentTimeMillis(); for (int i = 0; i < 500; i++) { new MsgSubscribeTest(t1).start(); } long endTime = System.currentTimeMillis(); System.out.println("Create all threads cost:" + (endTime-startTime)); }