线程同步

在开发多线程程序的过程中,你是否发现使用同步关键字后,怎么不好使呢,还是出现不能同步的情况。

见下面的例子,这时一个连接MQ的程序,通过使用多线程,模拟多个客户端同时登录、连接MQ。首先创建多个线程,然后每个线程自己连接MQ服务器。

这里实现的主要功能是测试创建线程的时间(T1),测试所有线程都连接上MQ服务器的时间(T2)。

1. T1实现

直接在创建线程循环语句前后计时,然后算出所花时间。

2. T2实现

使用一个计数器,每个线程连接完成后,计数器加1,该计数器为全局计数器,所有线程均能访问。当计数达到500个时,获取当时时刻,然后减去开始时刻,求得所有线程连接完成的时间。

开始时刻在线程类的构造函数中传入。

3. 客户端编号的正确获取

由于采用多线程,获取编号需要同步,否则获取到的编号会出现紊乱情况(如编号重复),同时也会导致创建连接成功计数不准确。

这里给出的代码是能正常工作的,如果对代码进行一些修改,代码还能正常执行吗?修改如下:

原代码:

    /**
     * 获得线程号
     * @return
     */
    synchronized public static int getThreadNum(){
    	return threadNum++;
    }

修改后代码:

    /**
     * 获得线程号
     * @return
     */
    synchronized public int getThreadNum(){
    	return threadNum++;
    }

运行修改后的代码,发现出现了编号重复的现象,实际运行错误信息为:

javax.jms.InvalidClientIDException: Broker: Message_Bus - Client: client_255 already connected from tcp://127.0.0.1:40454

意即创建的客户编号有重复。

可以看到,代码的修改部分仅仅去掉了一个修饰词:static,结果就不正确了,这是为什么呢?

如果是静态方法(使用static)关键字,所有创建的线程均共享一个方法getThreadNum;如果使用非静态方法,则在创建新线程时,每个线程均拥有自己的getThreadNum()方法,即使加了同步方法,也只能在类实例的内部起作用,而不能对其他线程对象起作用。

所以,涉及到多线程的共享方法或变量同步,一般使用同步关键字(synchronized)和静态方法(使用static修饰)。


示例代码MsgSubscribetest.java

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Session;
import javax.jms.Topic;
import javax.jms.TopicSubscriber;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQSession;


public class MsgSubscribeTest extends Thread{
	private final static String JMS_HOST_IP = "127.0.0.1";
  	private final static String JMS_HOST_PORT = "61616";	
 	private final static String USERNAME = "system";	
	private final static String PASSWORD = "manager2";	
	
	private final static String TEST_TOPIC = "my-topic";	
	
	public static int threadNum = 0;
	
	
 	//线程完成计数
	 public static int iFinishedThread = 0;

	 //测试开始时间
  	long startTestTime = 0;
	
	 //JMS连接
	Connection connection = null;
	
 	/**
	 * 构造函数,设置运行开始时间
	 * @param t1
	  */
	public UapMsgSubscribeTest(long t1){
 		startTestTime = t1;
	 }
	

    /**
     * 临时订阅消息
     * @param msg
     */
    void tempSubscribTopic(String clientid) {
		//System.out.println("Connection Count:"+ clientid + "," + iFinishedThread);
        try {
    		ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
	                USERNAME,
	                 PASSWORD,
	                "tcp://" + JMS_HOST_IP + ":" + JMS_HOST_PORT +"?wireFormat.maxInactivityDurationInitalDelay=30000");
		//JMS 客户端到JMS Provider 的连接
 		Connection connection = connectionFactory.createConnection();
 		connection.setClientID(clientid);
     		connection.start(); 
			
    		// Session: 一个发送或接收消息的线程
    		//Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
		ActiveMQSession session = (ActiveMQSession) connection
			.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
    		Topic topic = session.createTopic(TEST_TOPIC);
    		TopicSubscriber consumer = session.createSubscriber(topic);
     		consumer.setMessageListener(new UapTextListener(connection, session));
                 
            //Thread.sleep(3*1000); 
            //session.close(); 
            //connection.stop();    
            
    	    //alConnMgr.add(connection);
     	    finishedCount();
    	    System.out.println("订阅消息成功!");
	} catch (Exception e) {
	    e.printStackTrace();
	}
    }    
       
    /**
     * 检查连接是否创建完成
     * @return
     */
    boolean checkCreateConnectionFinished(){
     //    	System.out.println("iFinishedThread=" + iFinishedThread + "," + startTestTime);
    	if(iFinishedThread ==499){
 		long endTime = System.currentTimeMillis();
    		System.out.println("Finished Created All Connections Cost:" + (endTime-startTestTime));
 		return true;
    	}
    	
    	return false;
    }
    
    
    /**
     * 创建完成线程计数
     * @return
     */
    synchronized public static int finishedCount(){
    	return iFinishedThread++;
    }
    
    /**
     * 获得线程号
     * @return
     */
    synchronized public static int getThreadNum(){
    	return threadNum++;
    }
  
    @Override
    public void run() {
        String clientId = "client_" + getThreadNum();
	tempSubscribTopic(clientId);
	//检查是否完成
	checkCreateConnectionFinished();
    }

    
    
    /**
     * 主入口
     * @param args
     */
    public static void main(String[] args) {	
		long t1 = System.currentTimeMillis();

		long startTime = System.currentTimeMillis();
		for (int i = 0; i < 500; i++) {
			new MsgSubscribeTest(t1).start();
		}
		long endTime = System.currentTimeMillis();
		System.out.println("Create all threads cost:" + (endTime-startTime));
	}


线程同步

上一篇:5 分钟的颈椎操


下一篇:运行捕食者算法的全过程(zk大神的跟踪算法)