package com.liujin.cms.kafka; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.kafka.listener.MessageListener; import com.alibaba.fastjson.JSON; import com.liujin.cms.dao.PlanDao; import com.liujin.cms.domain.Plan; public class ArticleListener implements MessageListener<String, String>{ @Autowired PlanDao planDao; @Override public void onMessage(ConsumerRecord<String, String> data) { // TODO Auto-generated method stub System.err.println("接收到了消息"); String value = data.value(); //json转成对象 Plan plan = JSON.parseObject(value, Plan.class); planDao.save(plan); System.err.println("保存成功"); } }
package com.bw.test; import java.io.File; import java.io.FileInputStream; import java.io.FileNotFoundException; import java.util.ArrayList; import java.util.List; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.test.context.ContextConfiguration; import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; import com.alibaba.fastjson.JSON; import com.bw.bean.Plan; import com.bw.utils.StreamUtil; @RunWith(SpringJUnit4ClassRunner.class) @ContextConfiguration("classpath:producer.xml") public class MyTest { @Autowired KafkaTemplate<String, String> kafkaTemplate; @SuppressWarnings("unchecked") @Test public void readtest() throws FileNotFoundException { ArrayList<Plan> list = new ArrayList<Plan>(); File file = new File("E:/a/data.txt"); FileInputStream inputStream = new FileInputStream(file); List<String> readLine = StreamUtil.readLine(inputStream); readLine.remove(0); for (String string : readLine) { Plan plan = new Plan(); String[] split = string.split("\\|\\|"); //System.out.println(line); //每个|都要转义 String name=split[0]; double amount=Double.parseDouble(split[1]); String manager=split[3]; String content=split[2]; Integer dept_id=null; if ("药厂".equals(split[4])) { dept_id=1; }else if ("准能选煤厂".equals(split[4])) { dept_id=2; }else if ("洗选车间".equals(split[4])) { dept_id=3; }else if ("生产服务中心".equals(split[4])) { dept_id=4; }else if ("矸电公司".contains(split[4])) { dept_id=5; }else if ("大准铁路公司".equals(split[4])) { dept_id=6; } plan.setName(name); plan.setAmount(amount); plan.setManager(manager); plan.setContent(content); plan.setDept_id(dept_id); String jsonString = JSON.toJSONString(plan); kafkaTemplate.send("zhunneng",jsonString); list.add(plan); } for (Plan plan : list) { System.out.println(plan); } } }