小序
到新公司不久,就接到一个任务:有个发送方,会通过udp发送一些信息,然后服务接收到信息后保存到数据库的一张表A,保存的这些数据在经过一系列处理,处理完成后累积到另一张表B,然后清空处理的表A的数据。目前发送方比较少,不久就要增加到100个。
方案
我采用netty5来进行udp的网络通讯,将接收到的数据保存到BlockingQueue中,然后读取BlockingQueue中的数据,取到100条就存到hbase数据库中。
部分代码
初始化netty
int DEFAULT_PORT = 6000;
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group).channel(NioDatagramChannel.class).option(ChannelOption.SO_BROADCAST, true)
.handler(new UdpServerHandler());
Channel channel = bootstrap.bind(DEFAULT_PORT).sync().channel();
channel.closeFuture().await();
LOGGER.info("netty初始化成功!");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
group.shutdownGracefully();
}
接收udp数据
public BlockingQueue<Map<String, Object>> queue =
new LinkedBlockingQueue<Map<String, Object>>(990000);
protected void messageReceived(ChannelHandlerContext ctx, DatagramPacket msg) throws Exception {
// 因为Netty对UDP进行了封装,所以接收到的是DatagramPacket对象。
String result = msg.content().toString(CharsetUtil.UTF_8); Map<String, Object> getMap = new HashMap<String, Object>();
//处理数据 queue.put(getMap); ctx.writeAndFlush(new DatagramPacket(
Unpooled.copiedBuffer("结果:", CharsetUtil.UTF_8), msg.sender()));
}
读取数据存hbase
public void getDate() {
LOGGER.info("开始取数据");
List<Map<String, Object>> jsonList = new ArrayList<Map<String, Object>>();
while (true) {
Map<String, Object> takeMap = null;
try {
takeMap = queue.take();
if (takeMap == null) {
continue;
}
jsonList.add(takeMap);
if (jsonList.size() == 100) {
String httpJson = HbaseUtil.toHttpJson(vo.getTableName(), jsonList);
LOGGER.info(httpJson);
List<HbaseDataEntity> hbaseDatas =ParseJson.getData(httpJson);
HbaseAPI.insertDataList(hbaseDatas);
jsonList.clear();
LOGGER.info("hbase存了100条");
}
} catch (Exception e) {
jsonList.clear();
continue;
}
} }
遇到的坑
- BlockingQueue一定要设置大小,不设置是int最大值,有可能会内存溢出;
- 从BlockingQueue取数据的时候一定要阻塞式取take(),负责会死循环,占CPU100%;
- hbase库连接时是阻塞式的,如果连接不上会一直阻塞。