思路:
socket必须随项目启动而启动,所以需要用Spring自带的监听器;又因为要保持长连接、要用死循环等待客户端连接,所以必须另外起线程,不能阻塞主线程运行
1.在项目的web.xml中配置listener
<!-- Spring's ContextLoaderListener: bootstraps the root WebApplicationContext at startup. -->
<listener>
<listener-class>org.springframework.web.context.ContextLoaderListener</listener-class>
</listener>
<!-- Application listener that starts the socket server thread and the RabbitMQ consumer threads.
     NOTE(review): presumably declared after ContextLoaderListener so that the Spring context
     already exists when the worker threads look up beans; confirm before reordering. -->
<listener>
<listener-class>com.ra.car.utils.MyListener</listener-class>
</listener>
2.因为是一个独立的线程,所以需要调用的注入类不能通过@Resource或@Autowired注入,需要通过应用上下文获取
<?xml version="1.0" encoding="UTF-8"?>
<!-- Spring bean definitions for the service layer. The socket worker threads fetch beans from
     this context by id (for example getBean("DataCallBackService")) instead of using injection. -->
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:mvc="http://www.springframework.org/schema/mvc"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:aop="http://www.springframework.org/schema/aop"
xmlns:tx="http://www.springframework.org/schema/tx"
xmlns:task="http://www.springframework.org/schema/task"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
http://www.springframework.org/schema/mvc
http://www.springframework.org/schema/mvc/spring-mvc-4.0.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context-4.0.xsd
http://www.springframework.org/schema/aop
http://www.springframework.org/schema/aop/spring-aop-4.0.xsd
http://www.springframework.org/schema/tx
http://www.springframework.org/schema/tx/spring-tx-4.0.xsd
http://www.springframework.org/schema/task
http://www.springframework.org/schema/task/spring-task-4.0.xsd"> <!-- Scan the packages to load the Service implementation classes -->
<context:component-scan base-package="com.ra.*.service.impl"></context:component-scan>
<!-- Explicitly named service beans, so they can be fetched by id from the application context.
     NOTE(review): if these classes also carry stereotype annotations, the component scan above
     may register them a second time under a default name; confirm. -->
<bean id="DataCallBackService" class="com.ra.truck.service.impl.DataCallBackServiceImpl"/>
<bean id="RdTrackInfoService" class="com.ra.truck.service.impl.RdTrackInfoServiceImpl"/>
<bean id="OutInterfaceService" class="com.ra.truck.service.impl.OutInterfaceImpl"/>
<bean id="RdPhotoInfoService" class="com.ra.truck.service.impl.RdPhotoInfoServiceImpl"/>
<bean id="MessagePackegerService" class="com.ra.truck.service.impl.MessagePackegerServiceImpl"/>
<!--<bean id="redis" class="com.ra.redis.service.impl.JedisClientCluster"/>-->
</beans>
3.创建listener监听器类
package com.ra.car.utils; import javax.servlet.ServletContextEvent;
import javax.servlet.ServletContextListener; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import com.ra.car.rabbitMQ.PBWRabbitMQCustomer;
import com.ra.car.rabbitMQ.RabbitMQCustomer;

/**
 * Servlet context listener that launches the application's background workers when the web
 * application starts: the TCP socket server ({@code Mythread}) and the two RabbitMQ consumers.
 *
 * <p>Each worker runs on its own thread so that the blocking loops never hold up container
 * start-up. The threads are remembered so that {@link #contextDestroyed} can interrupt them;
 * without that they would outlive an undeploy/redeploy of the web application.
 */
public class MyListener implements ServletContextListener {

    protected static final Logger logge = LoggerFactory.getLogger(MyListener.class);

    /** Thread running the socket accept loop; {@code null} until the context is initialized. */
    private Thread socketServerThread;

    /** Thread running {@code RabbitMQCustomer}; {@code null} until the context is initialized. */
    private Thread rabbitConsumerThread;

    /** Thread running {@code PBWRabbitMQCustomer}; {@code null} until the context is initialized. */
    private Thread pbwRabbitConsumerThread;

    /**
     * Starts the socket server and both RabbitMQ consumers, each on a dedicated thread.
     *
     * @param arg0 the context event supplied by the container (not used)
     */
    @Override
    public void contextInitialized(ServletContextEvent arg0) {
        // The listener callback must return promptly, so every long-running loop gets its own thread.
        socketServerThread = startWorker(new Mythread(), "socket-server");
        rabbitConsumerThread = startWorker(new RabbitMQCustomer(), "rabbitmq-consumer");
        pbwRabbitConsumerThread = startWorker(new PBWRabbitMQCustomer(), "pbw-rabbitmq-consumer");
    }

    /**
     * Logs the shutdown and interrupts every worker started by {@link #contextInitialized}.
     *
     * <p>NOTE(review): interruption only stops a worker whose loop honors the interrupt flag;
     * the RabbitMQ consumer implementations are not visible here, so confirm they do.
     *
     * @param arg0 the context event supplied by the container (not used)
     */
    @Override
    public void contextDestroyed(ServletContextEvent arg0) {
        logge.info("进入ListenerUtil的contextDestroyed方法.........");
        stopWorker(socketServerThread);
        stopWorker(rabbitConsumerThread);
        stopWorker(pbwRabbitConsumerThread);
    }

    /** Starts {@code task} on a new named thread and returns that thread. */
    private static Thread startWorker(Runnable task, String name) {
        Thread worker = new Thread(task, name);
        worker.start();
        return worker;
    }

    /** Interrupts {@code worker} if it was ever started; a {@code null} worker is ignored. */
    private static void stopWorker(Thread worker) {
        if (worker != null) {
            worker.interrupt();
        }
    }
}
package com.ra.car.utils; import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.text.SimpleDateFormat;
import java.util.Date; import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Accept loop of the TCP socket server: listens on port {@value #PORT} and hands every accepted
 * connection to a {@code JavaTCPServer}.
 *
 * <p>The loop ends when the running thread is interrupted or when the server socket fails; in
 * both cases the server socket is closed so the port is released.
 */
public class Mythread implements Runnable {

    protected static final Logger logge = LoggerFactory.getLogger(Mythread.class);

    /** TCP port the server listens on. */
    private static final int PORT = 8888;

    /**
     * Upper bound for one blocking {@code accept()} call. {@code accept()} does not react to
     * {@link Thread#interrupt()}, so the loop wakes up at this interval to check the flag.
     */
    private static final int ACCEPT_POLL_MILLIS = 1000;

    /** Opens the server socket and dispatches incoming connections until interrupted. */
    @Override
    public void run() {
        logge.info("进入ListenerUtil的contextInitialized方法.........");
        ServerSocket serverSocket = null;
        try {
            serverSocket = new ServerSocket(PORT);
            serverSocket.setSoTimeout(ACCEPT_POLL_MILLIS);
            logge.info("socket通信服务端已启动,等待客户端连接.......");
            logge.info("我是111111111111111");
            while (!Thread.currentThread().isInterrupted()) {
                Socket socket;
                try {
                    // Blocks until a client connects or the poll interval elapses.
                    socket = serverSocket.accept();
                } catch (java.net.SocketTimeoutException noClientYet) {
                    continue; // nothing connected; re-check the interrupt flag
                }
                new JavaTCPServer(socket).run();
                try {
                    // Short pause between accepts, kept from the original; its comment reports
                    // that a stress test of 50000 connections ran without problems with it.
                    Thread.sleep(10);
                } catch (InterruptedException e) {
                    // Restore the flag so the loop condition ends the server.
                    Thread.currentThread().interrupt();
                }
            }
        } catch (IOException e) {
            logge.error("通信服务器启动失败!", e);
        } finally {
            closeQuietly(serverSocket);
        }
    }

    /** Closes the server socket if it was opened, logging instead of propagating a failure. */
    private static void closeQuietly(ServerSocket serverSocket) {
        if (serverSocket == null) {
            return;
        }
        try {
            serverSocket.close();
        } catch (IOException e) {
            logge.error("关闭ServerSocket出现异常", e);
        }
    }

    /**
     * Formats a Unix timestamp given in seconds as {@code yyyy-MM-dd HH:mm:ss} in the JVM's
     * default time zone.
     *
     * @param s decimal string holding the number of seconds since the epoch
     * @return the formatted date-time
     * @throws NumberFormatException if {@code s} is not a parsable {@code long}
     */
    public static String stampToDate(String s) {
        long millis = Long.parseLong(s) * 1000L;
        return new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date(millis));
    }
}
package com.ra.car.utils; import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.PrintWriter;
import java.net.Socket;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Hands one accepted client socket over to a {@code MyThread2} worker running on a shared
 * thread pool.
 */
public class JavaTCPServer {

    protected static final Logger logger = LoggerFactory.getLogger(JavaTCPServer.class);

    /**
     * One pool for all connections. Creating a pool per connection, as before, left every pool
     * un-shut-down; a cached pool already grows on demand and retires idle threads by itself.
     */
    private static final ExecutorService WORKER_POOL = Executors.newCachedThreadPool();

    private Socket socket;

    /**
     * @param socket the accepted client connection to serve
     */
    public JavaTCPServer(Socket socket) {
        this.socket = socket;
    }

    /**
     * Creates the worker for this connection and submits it to the pool. If the worker cannot
     * be created the failure is logged and the socket is closed; nothing is submitted.
     */
    public void run() {
        MyThread2 worker;
        try {
            worker = new MyThread2(socket);
        } catch (Exception e) {
            // Caught broadly on purpose: the constructor also performs a bean lookup, and an
            // unchecked failure escaping from here would terminate the caller's accept loop.
            logger.error("创建客户端连接处理线程失败", e);
            closeSocket();
            return;
        }
        WORKER_POOL.execute(worker);
    }

    /** Closes the client socket, logging instead of propagating a failure. */
    private void closeSocket() {
        try {
            socket.close();
        } catch (IOException e) {
            logger.error("关闭socket出现异常", e);
        }
    }
}
package com.ra.car.utils; import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.ra.truck.model.RdDeviceCallBackDataDomain;
import com.ra.truck.service.DataCallBackService;
import com.ra.truck.service.RdPhotoInfoService;
import com.ra.truck.service.RdTrackInfoService;
import com.ra.truck.service.outInterface.OutInterfaceService;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.web.context.ContextLoader; import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.PrintWriter;
import java.net.Socket;
import java.text.SimpleDateFormat;
import java.util.*; public class MyThread2 implements Runnable { protected static final Logger logger = LoggerFactory
.getLogger(MyThread2.class); private Socket socket;
private InputStream inputStream;
private OutputStream outputStream;
private PrintWriter printWriter; private int totalCount; //总数量 private int adasCount; // 传输的ADAS信号数量
private int gpsCount; // 传输的GPS信号数量
private DataCallBackService dataCallBackService;//数据回传private SimpleDateFormat df; public MyThread2(Socket socket) throws IOException {
this.socket = socket;
inputStream = socket.getInputStream();
outputStream = socket.getOutputStream();
printWriter = new PrintWriter(outputStream); dataCallBackService=(DataCallBackService)
ContextLoader.getCurrentWebApplicationContext().getBean("DataCallBackService");
df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
} @Override
public void run() {
// 根据输入输出流和客户端连接 // 得到一个输入流,接收客户端传递的信息
// InputStreamReader inputStreamReader = new InputStreamReader(
// inputStream);// 提高效率,将自己字节流转为字符流
// bufferedReader = new BufferedReader(inputStreamReader);// 加入缓冲区
Date timestart = new Date();
Date timeend = null;
long minuine = 0;
int count = 0;
while (true) {
try {
if (inputStream.available() > 0 == false) {
timeend = new Date();
minuine = timeend.getTime() - timestart.getTime();
if (minuine != 0 && (minuine / 1000) > 60) {
break;
}
continue;
} else {
timestart = new Date();
try {
Thread.sleep(200);
} catch (InterruptedException e) {
logger.error("*****线程休眠出现异常*****", e);
}
count = inputStream.available();
byte[] b = new byte[count];
int readCount = 0; // 已经成功读取的字节的个数
while (readCount < count) {
readCount += inputStream.read(b, readCount, count
- readCount);
}
logger.info("**********当前服务器正在被连接**********");
logger.info("正在连接的客户端IP为:"
+ socket.getInetAddress().getHostAddress()); logger.info("当前时间为:" + df.format(new Date()));
String data = new String(b, "utf-8");
logger.info("传输过来的info:" + data);
String id = jsonStringToObject(data);
Map<Object, Object> map = new HashMap<Object, Object>();
//心跳发送不带id的json数据
if (StringUtils.isNotBlank(id)) {
map.put("id", id);
}
map.put("resultCode", "1");
map.put("result", "success");
printWriter.print(JSON.toJSONString(map) + "\n");
printWriter.flush();
}
} catch (Exception e) {
logger.error("数据传输出现异常", e);
try {
outputStream = socket.getOutputStream();
} catch (IOException e1) {
logger.error("获取outputStream出现异常");
}
// 获取一个输出流,向服务端发送信息
// printWriter = new PrintWriter(outputStream);// 将输出流包装成打印流
Map<Object, Object> map = new HashMap<Object, Object>();
map.put("resultCode", "0");
map.put("result", "fail");
printWriter.print(JSON.toJSONString(map) + "\n");
printWriter.flush();
}
}
try {
printWriter.close();
outputStream.close();
inputStream.close();
logger.info("30s没有发送数据,服务端主动关闭连接");
logger.info("被断开的客户端IP为:"
+ socket.getInetAddress().getHostAddress());
SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
logger.info("被断开的时间为:" + df.format(new Date()));
socket.close();
} catch (IOException e) {
logger.error("关闭socket出现异常", e); } /*
* while ((temp = bufferedReader.readLine()) != null) { info += temp;
* logger.info(bufferedReader.readLine());
* logger.info("已接收到客户端连接!!!!!!"); logger.info("服务端接收到客户端信息:" +
* info + ",当前客户端ip为:" + socket.getInetAddress().getHostAddress());
* logger.info("服务端接收到客户端信息:" + info + ",当前客户端ip为:" +
* socket.getInetAddress().getHostAddress()); }
*/ /*
* logger.info("*****测试Redis*****"); JedisClient
* jedisClient=(JedisClient)
* ContextLoader.getCurrentWebApplicationContext().getBean("redis");
* jedisClient.set("testLanHao", "123456789"); String
* str=jedisClient.get("testLanHao");
* logger.info("从Redis中取得数据为:"+str);
* logger.info("*****测试Redis*****");
*/ // ApplicationContext applicationContext=new
// ClassPathXmlApplicationContext("classpath*:applicationContext-*.xml");
// RiskManageService
// riskManageService=applicationContext.getBean(RiskManageService.class);
// socket单独线程,需要重新加载上下文,扫描的类在applicationContext-service.xml配置
/*
* RiskManageService riskManageService=(RiskManageService)
* ContextLoader.getCurrentWebApplicationContext().getBean("risk");
* RdRiskEventInfo rdRiskEventInfo=new RdRiskEventInfo();
* rdRiskEventInfo.setId("10"); try { List<RdPhotoInfo>
* list=riskManageService.findPhotoInfoByEventId(rdRiskEventInfo);
* logger.info(list); } catch (ServiceException e) {
* e.printStackTrace(); }
*/
// outputStream = socket.getOutputStream();// 获取一个输出流,向服务端发送信息
// printWriter = new PrintWriter(outputStream);// 将输出流包装成打印流 } private String jsonStringToObject(String data) {
//数据解析方法return xx;
}
/**
 * Converts a Unix timestamp given in seconds into a {@link Date}.
 *
 * @param s decimal string holding the number of seconds since the epoch
 * @return the corresponding instant
 * @throws NumberFormatException if {@code s} is not a parsable {@code long}
 */
public static Date stampToDate(String s) {
    return new Date(Long.parseLong(s) * 1000);
}