JAVA通信编程(四)——UDP通讯

经过TCP和串口通讯编程的了解,相信大家应该掌握CommBuff的套路了,这里首先展示的是通过UDP编程的方式实现CommBuff接口,之后通过简单工厂模式的应用说明如何屏蔽底层通讯差异。

UdpImpl类如下:

package com.zzh.comm;

import java.io.IOException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.net.SocketException;
import java.net.UnknownHostException;
import java.util.Map;

import org.apache.log4j.Logger;

public class UdpImpl implements CommBuff
{
	private Logger logger = Logger.getLogger(Object.class.getName());
	
	private int local_port;
	private int dest_port;
	private String ip;
	private int time_out;
	
	DatagramSocket client = null;
	
	private String fileName = "/udp.properties";
	public UdpImpl()
	{
		Map<String,String> map = new ReadProperties().getPropertiesMap(fileName);
		try
		{
			local_port = Integer.parseInt(map.get("udp_local_port"));
			dest_port = Integer.parseInt(map.get("udp_dest_port"));
			time_out = Integer.parseInt(map.get("udp_timeout"));
			ip = map.get("udp_dest_ip");
		}
		catch (Exception e)
		{
			logger.error(e.getMessage());
		}
	}
	
	@Override
	public byte[] readBuff()
	{
		if(client == null)
		{
			throw new RuntimeException("clinet is null!");
		}
		byte[] recvBuf = new byte[1024];
		DatagramPacket recvPacket = new DatagramPacket(recvBuf , recvBuf.length);
		try
		{
			client.receive(recvPacket);
		}
		catch (IOException e)
		{
			logger.info(e.getMessage());
			return new byte[0];
		}
		byte[] ans = new byte[recvPacket.getLength()];
		System.arraycopy(recvPacket.getData(), 0, ans, 0, recvPacket.getLength());
		logger.info("网口接收:"+CommUtil.bytesToHex(ans));
		return ans;
	}

	@Override
	public void writeBuff(byte[] message)
	{
		if(client == null)
		{
			throw new RuntimeException("clinet is null!");
		}
		
		try
		{
			InetAddress addr = InetAddress.getByName(ip);
			DatagramPacket sendPacket = new DatagramPacket(message,message.length,addr,dest_port);
			client.send(sendPacket);
			logger.info("发送成功: "+CommUtil.bytesToHex(message));
		}
		catch (UnknownHostException e)
		{
			logger.error(e.getMessage());
		}
		catch (IOException e)
		{
			logger.error(e.getMessage());
		}
		
	}

	@Override
	public void open() {
		try
		{
			client = new DatagramSocket(local_port);
			client.setSoTimeout(time_out);
			if(client != null)
			{
				logger.info("client open succeed!");
			}
		}
		catch (SocketException e)
		{
			logger.error(e.getMessage());
		}
	}

	@Override
	public void close() 
	{
		if(client != null)
		{
			client.close();
		}
	}

	@Override
	public Object getInfo()
	{
		return null;
	}

}
UdpImpl实现了CommBuff接口的各个方法。UDP Socket采用的数据包的方式进行通讯的,这个可以与TCP的方式区分开。

下面通过一个简单工厂模式,可以实现底层通讯的便利性。

package com.zzh.comm;

public class CommFactory
{
	public CommBuff getCommBuff(String properties) throws Exception
	{
		if(properties.equals("comm_serial"))
		{
			return new SerialImpl();
		}
		else if(properties.equals("comm_tcpServer"))
		{
			return new TcpServerImpl();
		}
		else if(properties.equals("comm_tcpClient"))
		{
			return new TcpClientImpl();
		}
		else if(properties.equals("comm_udp"))
		{
			return new UdpImpl();
		}
		else
		{
			throw new Exception("Communication para error: no found avaliable communication Object instance.");
		}
	}
}
上面的getCommBuff方法通过参数properties可以初始化不同的通讯接口实现类,这样上次应用只需调用Commbuff接口的方法,而无需与底层通讯的细节相融合,极大的降低了程序间的耦合性。

本篇就简单的阐述到这里。但是下面会附加一个程序,这个程序通过调用CommFactory的方法生成底层通讯的实例,程序的主要内容是电力行业的某个通讯规约(Modbus)的实现,如果非电力行业的通讯,可以不必了解程序中的细节,可以大概看一下怎么使用.

package com.zzh.protocol;

import java.util.Calendar;
import java.util.Map;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import com.zzh.comm.CommBuff;
import com.zzh.comm.CommFactory;
import com.zzh.comm.CommUtil;
import com.zzh.comm.ReadProperties;
import com.zzh.dao.ModbusDao;
import com.zzh.dao.ModbusDaoImpl;
import com.zzh.dao.pojo.ModbusPojo;

public class Modbus {
	private CommBuff comm;
	private int comm_timeout;
	private byte devAddr;
	
	private static int RECV_SIZE = 35;
	private static int RECV_INNER_SIZE = 30;
	private static int MINUTE=60000;
	private volatile boolean  refreshFlag = false;
	
	private ModbusPojo modbusPojo; 
	
	private ConcurrentLinkedDeque<Byte> deque = new ConcurrentLinkedDeque<Byte>();
	private String fileName = "/modbus.properties";
	
	public Modbus()
	{
		Map<String,String> map = new ReadProperties().getPropertiesMap(fileName);
		String comm_way = map.get("modbus_comm_way");
		String comm_timeouts = map.get("comm_timeout");
		comm_timeout = Integer.parseInt(comm_timeouts);
		String devAddrs = map.get("devAddr");
		devAddr = Byte.parseByte(devAddrs);
		if(comm_way!=null)
		{
			modbusPojo = new ModbusPojo(); 
			try
			{
				comm = new CommFactory().getCommBuff(comm_way);
			}
			catch (Exception e)
			{
				e.printStackTrace();
			}
			comm.open();
			
			ExecutorService pool = Executors.newFixedThreadPool(2);
			Thread thread1 = new Thread(new readThread());
	    	thread1.setDaemon(true);
	    	Thread thread2 = new Thread(new dbThread());
	    	thread2.setDaemon(true);
	    	pool.execute(thread1);
	    	pool.execute(thread2);
		}
		else
		{
			throw new RuntimeException("没有配置好合适的串口参数");
		}
	}
	
	private class readThread implements Runnable
	{
		@Override
		public void run()
		{
			while(true)
			{
				byte[] recvBuff = comm.readBuff();
				if(recvBuff.length>0)
				{
					for(int i=0;i<recvBuff.length;i++)
					{
						deque.add(recvBuff[i]);
					}
				}
				try
				{
					TimeUnit.MILLISECONDS.sleep(1000);
				}
				catch (InterruptedException e)
				{
					e.printStackTrace();
				}
			}
		}
	}
	
	private class dbThread implements Runnable
	{
		@Override
		public void run()
		{
			while(true)
			{
				if(refreshFlag == true)
				{
					Calendar now = Calendar.getInstance();
					if(now.get(Calendar.MINUTE)%5==0)
//					if(true)
					{
						synchronized (modbusPojo)
						{
							filterModbusPojo();
							modbusPojo.setNow(TimeUtil.getDateOfMM(now));
//							modbusPojo.setNow(new java.sql.Timestamp(new Date().getTime()));
							ModbusDao md = new ModbusDaoImpl();
							md.addModbus(modbusPojo);
						}
					}
				}
				try
				{
					TimeUnit.MILLISECONDS.sleep(MINUTE);
//					TimeUnit.MILLISECONDS.sleep(1000);
				}
				catch (InterruptedException e)
				{
					e.printStackTrace();
				}
			}
		}
		
	}
	
	public void filterModbusPojo()
	{
		modbusPojo.setQua(0);
		if(modbusPojo.getEnvTemperature()>ModbusUtil.TEMPERATURE_UP)
		{
			modbusPojo.setEnvTemperature(ModbusUtil.TEMPERATURE_UP);
			System.out.println("getEnvTemperature = "+modbusPojo.getEnvTemperature());
			modbusPojo.setQua(1);
		}
		if(modbusPojo.getEnvTemperature()<ModbusUtil.TEMPERATURE_LOW)
		{
			modbusPojo.setEnvTemperature(ModbusUtil.TEMPERATURE_LOW);
			System.out.println("getEnvTemperature = "+modbusPojo.getEnvTemperature());
			modbusPojo.setQua(1);
		}
		if(modbusPojo.getTemperature()>ModbusUtil.TEMPERATURE_UP)
		{
			modbusPojo.setTemperature(ModbusUtil.TEMPERATURE_UP);
			System.out.println("getTemperature = "+modbusPojo.getTemperature());
			modbusPojo.setQua(1);
		}
		if(modbusPojo.getTemperature()<ModbusUtil.TEMPERATURE_LOW)
		{
			modbusPojo.setTemperature(ModbusUtil.TEMPERATURE_LOW);
			System.out.println("getTemperature = "+modbusPojo.getTemperature());
			modbusPojo.setQua(1);
		}
		if(modbusPojo.getHumidity()>ModbusUtil.HUMIDITY_UP)
		{
			modbusPojo.setHumidity(ModbusUtil.HUMIDITY_UP);
			System.out.println("getHumidity = "+modbusPojo.getHumidity());
			modbusPojo.setQua(1);
		}
		if(modbusPojo.getHumidity()<ModbusUtil.HUMIDITY_LOW)
		{
			modbusPojo.setHumidity(ModbusUtil.HUMIDITY_LOW);
			System.out.println("getHumidity = "+modbusPojo.getHumidity());
			modbusPojo.setQua(1);
		}
		if(modbusPojo.getPressure()>ModbusUtil.PRESSURE_UP)
		{
			modbusPojo.setPressure(ModbusUtil.PRESSURE_UP);
			System.out.println("getPressure = "+modbusPojo.getPressure());
			modbusPojo.setQua(1);
		}
		if(modbusPojo.getPressure()<ModbusUtil.PRESSURE_LOW)
		{
			modbusPojo.setPressure(ModbusUtil.PRESSURE_LOW);
			System.out.println("getPressure = "+modbusPojo.getPressure());
			modbusPojo.setQua(1);
		}
		if(modbusPojo.getIrradiance()>ModbusUtil.IRRADIANCE_UP)
		{
			modbusPojo.setIrradiance(ModbusUtil.IRRADIANCE_UP);
			System.out.println("getIrradiance = "+modbusPojo.getIrradiance());
			modbusPojo.setQua(1);
		}
		if(modbusPojo.getIrradiance()<ModbusUtil.IRRADIANCE_LOW)
		{
			modbusPojo.setIrradiance(ModbusUtil.IRRADIANCE_LOW);
			System.out.println("getIrradiance = "+modbusPojo.getIrradiance());
			modbusPojo.setQua(1);
		}
		if(modbusPojo.getScaIrradiance()>ModbusUtil.IRRADIANCE_UP)
		{
			modbusPojo.setScaIrradiance(ModbusUtil.IRRADIANCE_UP);
			System.out.println("getScaIrradiance = "+modbusPojo.getScaIrradiance());
			modbusPojo.setQua(1);
		}
		if(modbusPojo.getScaIrradiance()<ModbusUtil.IRRADIANCE_LOW)
		{
			modbusPojo.setScaIrradiance(ModbusUtil.IRRADIANCE_LOW);
			System.out.println("getScaIrradiance = "+modbusPojo.getScaIrradiance());
			modbusPojo.setQua(1);
		}
		if(modbusPojo.getDirIrradiance()>ModbusUtil.IRRADIANCE_UP)
		{
			modbusPojo.setDirIrradiance(ModbusUtil.IRRADIANCE_UP);
			System.out.println("getDirIrradiance = "+modbusPojo.getDirIrradiance());
			modbusPojo.setQua(1);
		}
		if(modbusPojo.getDirIrradiance()<ModbusUtil.IRRADIANCE_LOW)
		{
			modbusPojo.setDirIrradiance(ModbusUtil.IRRADIANCE_LOW);
			System.out.println("getDirIrradiance = "+modbusPojo.getDirIrradiance());
			modbusPojo.setQua(1);
		}
		if(modbusPojo.getWindSpeed()>ModbusUtil.UAVG_UP)
		{
			modbusPojo.setWindSpeed(ModbusUtil.UAVG_UP);
			System.out.println("getWindSpeed = "+modbusPojo.getWindSpeed());
			modbusPojo.setQua(1);
		}
		if(modbusPojo.getWindSpeed()<ModbusUtil.UAVG_LOW)
		{
			modbusPojo.setWindSpeed(ModbusUtil.UAVG_LOW);
			System.out.println("getWindSpeed = "+modbusPojo.getWindSpeed());
			modbusPojo.setQua(1);
		}
		if(modbusPojo.getWindDir()>ModbusUtil.VAVG_UP)
		{
			modbusPojo.setWindDir(ModbusUtil.VAVG_UP);
			System.out.println("getWindDir = "+modbusPojo.getWindDir());
			modbusPojo.setQua(1);
		}
		if(modbusPojo.getWindDir()<ModbusUtil.VAVG_LOW)
		{
			modbusPojo.setWindDir(ModbusUtil.VAVG_LOW);
			System.out.println("getWindDir = "+modbusPojo.getWindDir());
			modbusPojo.setQua(1);
		}
	}
	
	public void process()
	{
		try
		{
			TimeUnit.MILLISECONDS.sleep(comm_timeout);
		}
		catch (InterruptedException e)
		{
			e.printStackTrace();
		}
		recvProcess();
		sendProcess();
	}

	public void recvProcess()
	{
		refreshFlag = false;
		byte[] recvBuff = new byte[RECV_INNER_SIZE];
		while(deque.size()>=RECV_SIZE)
		{
			Byte first = deque.pollFirst();
			if(first == devAddr)
			{
				Byte second = deque.pollFirst();
				if(second == 0x03)
				{
					Byte third = deque.pollFirst();
					if(third == RECV_INNER_SIZE)
					{
						for(int i=0;i<RECV_INNER_SIZE;i++)
						{
							recvBuff[i] = deque.pollFirst();
						}
						deque.pollFirst();
						deque.pollFirst();
						dealRecvBuff(recvBuff);
					}
				}
			}
		}
	}
	
	public void dealRecvBuff(byte[] recvBuff)
	{
		System.out.println(CommUtil.bytesToHex(recvBuff));
		refreshFlag = true;
		getModbusPojo(recvBuff);
//		modbusPojo.print();
	}
	
	public void getModbusPojo(byte[] recvBuff)
	{
		int temp;
		synchronized (modbusPojo)
		{
			for(int i=0;i<recvBuff.length;)
			{
				switch(i)
				{
					case 0:
						temp = ModbusUtil.getSignedAns(recvBuff, 0, 1);
						double envTemperature = temp*0.1;
						modbusPojo.setEnvTemperature(envTemperature);
						break;
					case 2:
						temp = ModbusUtil.getSignedAns(recvBuff, 2, 3);
						double temperature = temp*0.1;
						modbusPojo.setTemperature(temperature);
						break;
					case 4:
						temp = ModbusUtil.getUnsignedAns(recvBuff, 4, 5);
						double humidity = temp*0.1;
						modbusPojo.setHumidity(humidity);
						break;
					case 6:
						temp = ModbusUtil.getUnsignedAns(recvBuff, 6, 7);
						double pressure = temp*0.1;
						modbusPojo.setPressure(pressure);
						break;
					case 8:
						temp = ModbusUtil.getUnsignedAns(recvBuff, 8, 9);
						modbusPojo.setIrradiance(temp);
						break;
					case 10:
						temp = ModbusUtil.getUnsignedAns(recvBuff, 10, 11);
						modbusPojo.setScaIrradiance(temp);
						break;
					case 12:
						temp = ModbusUtil.getUnsignedAns(recvBuff, 12, 13);
						modbusPojo.setDirIrradiance(temp);
						break;
					case 14:
						temp = ModbusUtil.getUnsignedAns(recvBuff, 14, 15);
						modbusPojo.setWindDir(temp);
						break;
					case 16:
						temp = ModbusUtil.getUnsignedAns(recvBuff, 16, 17);
						double windSpeed = temp*0.1;
						modbusPojo.setWindSpeed(windSpeed);
						break;
					case 18:
						temp = ModbusUtil.getUnsignedAns(recvBuff, 18, 19);
						double windSpeedTwo = temp*0.1;
						modbusPojo.setWindSpeedTwo(windSpeedTwo);
						break;
					case 20:
						temp = ModbusUtil.getUnsignedAns(recvBuff, 20, 21);
						double windSpeedTen = temp*0.1;
						modbusPojo.setWindSpeedTen(windSpeedTen);
						break;
					case 22:
						temp = ModbusUtil.getUnsignedAns(recvBuff, 22, 23);
						modbusPojo.setDailyExposure(temp);
						break;
					case 24:
						temp = ModbusUtil.getUnsignedAns(recvBuff, 24, 25);
						double totalExposure = temp*0.001;
						modbusPojo.setTotalExposure(totalExposure);
						break;
					case 26:
						temp = ModbusUtil.getUnsignedAns(recvBuff, 26, 27);
						double scaExposure = temp*0.001;
						modbusPojo.setScaExposure(scaExposure);
						break;
					case 28:
						temp = ModbusUtil.getUnsignedAns(recvBuff, 28, 29);
						double dirExposure = temp*0.001;
						modbusPojo.setDirExposure(dirExposure);
						break;
				}
				i=i+2;
			}
		}
	}
	
	public void sendProcess()
	{
		byte[] message = new byte[8];
		int sendLen = 0;
		message[sendLen++] = devAddr;
		message[sendLen++] = 0x03;
		message[sendLen++] = 0x00;
		message[sendLen++] = 0x00;
		message[sendLen++] = 0x00;
		message[sendLen++] = 0x0F;
		byte[] crc = CommUtil.CRC16(message,6);
		message[sendLen++] = crc[0];
		message[sendLen++] = crc[1];
		comm.writeBuff(message);
	}

}


上一篇:阿里云双11优惠活动全攻略(不只是拼团哦)


下一篇:Cloud Toolkit 最佳实践之部署包备份