上篇写到,将设备数据改为传到 MongoDB,本篇博文记录一下具体过程。
Spring Boot 内置了 MongoDB 模块,提供 MongoTemplate,用法类似于 RedisTemplate。
1.添加依赖
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-mongodb</artifactId> </dependency>
2.增加配置
spring:
  data:
    mongodb:
      uri: mongodb://username:password@ip:port/database
3.配置连接信息(这里简单连接,如果需要配置连接池等信息,可自行查看源码,在URL后拼接参数)
com.mongodb.MongoClientSettings.Builder#applyConnectionString
/*
 * Field excerpt quoted from the MongoDB Java driver source. The enclosing class header is not
 * part of this excerpt (presumably com.mongodb.ConnectionString -- TODO confirm against the
 * driver version in use).
 *
 * The two *_PREFIX constants hold the "mongodb://" and "mongodb+srv://" scheme strings. The
 * nullable option fields that follow (connection-pool sizes, timeouts, SSL flags, replica-set
 * name, ...) appear to be the settings step 3 refers to as configurable by appending parameters
 * to the connection URL -- verify the exact parameter names in the driver documentation.
 */
private static final String MONGODB_PREFIX = "mongodb://"; private static final String MONGODB_SRV_PREFIX = "mongodb+srv://"; private static final Set<String> ALLOWED_OPTIONS_IN_TXT_RECORD = new HashSet<String>(asList("authsource", "replicaset")); private static final String UTF_8 = "UTF-8"; private static final Logger LOGGER = Loggers.getLogger("uri"); private final MongoCredential credential; private final boolean isSrvProtocol; private final List<String> hosts; private final String database; private final String collection; private final String connectionString; private ReadPreference readPreference; private WriteConcern writeConcern; private Boolean retryWrites; private Boolean retryReads; private ReadConcern readConcern; private Integer minConnectionPoolSize; private Integer maxConnectionPoolSize; private Integer maxWaitTime; private Integer maxConnectionIdleTime; private Integer maxConnectionLifeTime; private Integer connectTimeout; private Integer socketTimeout; private Boolean sslEnabled; private Boolean sslInvalidHostnameAllowed; private String requiredReplicaSetName; private Integer serverSelectionTimeout; private Integer localThreshold; private Integer heartbeatFrequency; private String applicationName; private List<MongoCompressor> compressorList; private UuidRepresentation uuidRepresentation;
4.去除_class,添加自动索引
/** * @program: * @description: 芒果DBhepper配置项 * @Author: Zhangyb * @CreateDate: 15:37 * @UpdateUser: * @UpdateDate * @UpdateRemark: * @Version: 1.0 */ @Configuration //@ComponentScan(basePackages = {"com.bysk.base.mongodb"}) // 聚合工程, public class MongoConfig { @Autowired private MongoDatabaseFactory mongoDatabaseFactory; @Autowired private MongoMappingContext mongoMappingContext; @Bean public MappingMongoConverter mappingMongoConverter() { mongoMappingContext.setAutoIndexCreation(true); mongoMappingContext.afterPropertiesSet(); DbRefResolver dbRefResolver = new DefaultDbRefResolver(mongoDatabaseFactory); MappingMongoConverter converter = new MappingMongoConverter(dbRefResolver, mongoMappingContext); // 此处是去除插入数据库的 _class 字段 converter.setTypeMapper(new DefaultMongoTypeMapper(null)); return converter; } }
5.创建集合实体类
/**
 * Device monitoring data, mapped to the {@code device_monitor_record} collection.
 *
 * @author Mark sunlightcs@gmail.com
 * @since 1.0.0 2020-11-06
 */
@ToString
@Data
@Document(collection = "device_monitor_record")
public class MgdbDeviceMonitorRecord {

    /**
     * Document identifier.
     * NOTE(review): package-private, unlike the other fields -- confirm whether that is intended.
     */
    @Id
    String id;

    /** Serial number of the device as uploaded over MQTT. */
    @ApiModelProperty(value = "mqtt上传的设备序列号")
    private String equipmentId;

    /** Monitoring time, held as text. */
    private String monitorTime;

    /** Air humidity. */
    @ApiModelProperty(value = "空气湿度")
    private BigDecimal airHumidity;

    /** Atmospheric temperature. */
    @ApiModelProperty(value = "大气温度")
    private BigDecimal airTemperature;
}
6.分页对象
import java.io.Serializable;
import java.util.List;

/**
 * Pagination holder for MongoDB query results.
 *
 * <p>Carries the requested page number and page size in, and the total record
 * count, total page count and result rows out. Page number and page size are
 * always kept at {@code >= 1}: non-positive values are replaced by {@code 1},
 * both in the constructor and in the setters, so that {@link #build(List)}
 * can never divide by zero.
 *
 * @param <T> element type of the result rows
 * @author johnny
 * @version 1.0
 */
public class Page<T> implements Serializable {

    private static final long serialVersionUID = 1L;

    /** Total number of matching records. */
    protected long total = 0;

    /** Records per page; defaults to 10. */
    protected long size = 10;

    /** Current page number, 1-based. */
    protected long current = 1;

    /** Total number of pages; never less than 1 after {@link #build(List)}. */
    protected long pages = 1;

    /** Rows of the current page. */
    private List<T> rows;

    /** Creates a page request for page 1 with the default size of 10. */
    public Page() {
        // Field initialisers already hold the defaults.
    }

    /**
     * Creates a page request.
     *
     * @param currentPage 1-based page number; values {@code <= 0} become 1
     * @param pageSize    records per page; values {@code <= 0} become 1
     */
    public Page(int currentPage, int pageSize) {
        this.current = atLeastOne(currentPage);
        this.size = atLeastOne(pageSize);
    }

    public long getSize() {
        return size;
    }

    /**
     * Sets the page size.
     *
     * @param pageSize records per page; values {@code <= 0} become 1, matching the constructor
     */
    public void setSize(long pageSize) {
        this.size = atLeastOne(pageSize);
    }

    public long getCurrent() {
        return this.current;
    }

    /**
     * Sets the current page number.
     *
     * @param current 1-based page number; values {@code <= 0} become 1, matching the constructor
     * @return this page, for chaining
     */
    public Page<T> setCurrent(long current) {
        this.current = atLeastOne(current);
        return this;
    }

    public long getTotal() {
        return this.total;
    }

    /**
     * Sets the total number of matching records.
     *
     * @param total total record count
     * @return this page, for chaining
     */
    public Page<T> setTotal(long total) {
        this.total = total;
        return this;
    }

    public void setPages(long pages) {
        this.pages = pages;
    }

    public long getPages() {
        return this.pages;
    }

    /**
     * Stores the result rows and derives the total page count from the
     * current total and page size. An empty result still counts as one page.
     *
     * @param rows rows of the current page
     */
    public void build(List<T> rows) {
        this.setRows(rows);
        // Guard again here: subclasses can write the protected field directly.
        long pageSize = Math.max(1L, this.getSize());
        long count = this.getTotal();
        long pageCount = count / pageSize;
        if (count % pageSize != 0) {
            pageCount++;
        }
        this.setPages(Math.max(1L, pageCount));
    }

    public List<T> getRows() {
        return rows;
    }

    public void setRows(List<T> rows) {
        this.rows = rows;
    }

    /** Replaces non-positive values by 1. */
    private static long atLeastOne(long value) {
        return value <= 0 ? 1 : value;
    }
}
7.CRUDbase接口
import com.bysk.base.mongdb.pojo.Page; import org.springframework.data.mongodb.core.MongoTemplate; import org.springframework.data.mongodb.core.query.Query; import org.springframework.data.mongodb.core.query.Update; import org.springframework.stereotype.Service; import java.util.List; /** * @program: bysk * @Description: 作用描述 * @Author: johnny * @CreateDate: 2021/1/8 16:20 * @UpdateUser: 更新者 * @UpdateDate: 2021/1/8 16:20 * @UpdateRemark: 更新说明 * @Version: 1.0 */ public interface IBaseMongoService<T> { /** * 保存一个对象到mongodb * * @param entity * @return */ public T save(T entity) ; /** * 根据id删除对象 */ public void deleteById(String id); /** * 根据对象的属性删除 * @param t */ public void deleteByCondition(T t); /** * 根据id进行更新 * @param id * @param t */ public void updateById(String id, T t); /** * 根据对象的属性查询 * @param t * @return */ public List<T> findByCondition(T t); /** * 通过条件查询实体(集合) * * @param query */ public List<T> find(Query query) ; /** * 通过一定的条件查询一个实体 * * @param query * @return */ public T findOne(Query query) ; /** * 通过一定的条件查询一个实体 * * @param t * @return */ public T findOne(T t) ; /** * 通过条件查询更新数据 * * @param query * @param update * @return */ public void update(Query query, Update update) ; /** * 通过ID获取记录 * * @param id * @return */ public T findById(String id) ; /** * 通过ID获取记录,并且指定了集合名(表的意思) * * @param id * @param collectionName * 集合名 * @return */ public T findById(String id, String collectionName) ; /** * 通过条件查询,查询分页结果 * @param page * @param query * @return */ public Page<T> findPage(Page<T> page, Query query); public Page<T> findPageByCondition(Page<T> page,T t); /** * 求数据总和 * @param query * @return */ public long count(Query query); /** * 获取MongoDB模板操作 * @return */ public MongoTemplate getMongoTemplate(); }
8.CRUDbase接口实现(这里 MongoDB 主键名称默认为 id,如果要灵活控制,可通过反射获取 @Id 注解所在的字段来设置)
import cn.craccd.mongoHelper.utils.FormatUtils; import cn.hutool.core.convert.Convert; import cn.hutool.core.util.StrUtil; import cn.hutool.json.JSONUtil; import com.bysk.base.mongdb.servicebase.IBaseMongoService; import com.bysk.base.mongdb.pojo.Page; import lombok.extern.slf4j.Slf4j; import org.bson.Document; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.data.mongodb.core.MongoTemplate; import org.springframework.data.mongodb.core.convert.MongoConverter; import org.springframework.data.mongodb.core.convert.QueryMapper; import org.springframework.data.mongodb.core.convert.UpdateMapper; import org.springframework.data.mongodb.core.mapping.MongoPersistentEntity; import org.springframework.data.mongodb.core.query.Criteria; import org.springframework.data.mongodb.core.query.Query; import org.springframework.data.mongodb.core.query.Update; import javax.annotation.PostConstruct; import java.lang.reflect.Field; import java.lang.reflect.ParameterizedType; import java.lang.reflect.Type; import java.util.List; /** * @program: bysk * @Description: 作用描述 * @Author: johnny * @CreateDate: 2021/1/8 16:21 * @UpdateUser: 更新者 * @UpdateDate: 2021/1/8 16:21 * @UpdateRemark: 更新说明 * @Version: 1.0 */ @Slf4j public abstract class BaseMongoServiceImpl<T> implements IBaseMongoService<T> { @Autowired protected MongoTemplate mongoTemplate; @Autowired MongoConverter mongoConverter; QueryMapper queryMapper; UpdateMapper updateMapper; @Value("${spring.profiles.active}") private String env; private Boolean print; @PostConstruct public void init() { queryMapper = new QueryMapper(mongoConverter); updateMapper = new UpdateMapper(mongoConverter); print = StrUtil.containsAny(env,"dev","test"); print=false; } /** * 保存一个对象到mongodb * @param bean * @return */ @Override public T save(T bean) { logSave(bean); mongoTemplate.save(bean); return bean; } /** * 根据id删除对象 */ @Override public void deleteById(String 
id) { Query query = new Query(); query.addCriteria(Criteria.where("id").is(id)); logDelete(query); mongoTemplate.remove(this.findById(id)); } /** * 根据对象的属性删除 * @param t */ @Override public void deleteByCondition(T t) { Query query = buildBaseQuery(t); logDelete(query); mongoTemplate.remove(query, getEntityClass()); } /** * 根据id进行更新 * @param id * @param t */ @Override public void updateById(String id, T t) { Query query = new Query(); query.addCriteria(Criteria.where("id").is(id)); Update update = buildBaseUpdate(t); logUpdate(query,update,false); update(query, update); } /** * 根据对象的属性查询 * @param t * @return */ @Override public List<T> findByCondition(T t) { Query query = buildBaseQuery(t); logQuery(query); return mongoTemplate.find(query, getEntityClass()); } /** * 通过条件查询实体(集合) * @param query * @return */ @Override public List<T> find(Query query) { logQuery(query); return mongoTemplate.find(query, this.getEntityClass()); } /** * 通过一定的条件查询一个实体 * @param query * @return */ @Override public T findOne(Query query) { logQuery(query); return mongoTemplate.findOne(query, this.getEntityClass()); } @Override public T findOne(T t) { Query query = buildBaseQuery(t); return findOne(query); } /** * 通过条件查询更新数据 * @param query * @param update */ @Override public void update(Query query, Update update) { logUpdate(query,update,false); mongoTemplate.updateMulti(query, update, this.getEntityClass()); } /** * 通过ID获取记录 * @param id * @return */ @Override public T findById(String id) { Class<T> entityClass = this.getEntityClass(); logQuery( new Query(Criteria.where("id").is(id))); return mongoTemplate.findById(id, entityClass); } /** * 通过ID获取记录,并且指定了集合名(表的意思) * @param id * @param collectionName * @return */ @Override public T findById(String id, String collectionName) { return mongoTemplate.findById(id, this.getEntityClass(), collectionName); } /** * 通过条件查询,查询分页结果 * @param page * @param query * @return */ @Override public Page<T> findPage(Page<T> page, Query query) { //如果没有条件 则所有全部 
query=query==null?new Query(Criteria.where("_id").exists(true)):query; long count = this.count(query); // 总数 page.setTotal(count); long currentPage = page.getCurrent(); long pageSize = page.getSize(); query.skip((currentPage - 1) * pageSize).limit(Convert.toInt(pageSize)); logQuery(query); List<T> rows = this.find(query); page.build(rows); return page; } @Override public Page<T> findPageByCondition(Page<T> page,T t){ Query query = buildBaseQuery(t); return findPage(page,query); } /** * 求数据总和 * @param query * @return */ @Override public long count(Query query){ return mongoTemplate.count(query, this.getEntityClass()); } /** * 根据vo构建查询条件Query * @param t * @return */ private Query buildBaseQuery(T t) { Query query = new Query(); Field[] fields = t.getClass().getDeclaredFields(); for (Field field : fields) { field.setAccessible(true); try { Object value = field.get(t); if (value != null) { query.addCriteria(Criteria.where(field.getName()).is(value)); } } catch (IllegalArgumentException e) { e.printStackTrace(); } catch (IllegalAccessException e) { e.printStackTrace(); } } return query; } /** * 根据vo构建更新条件Query * @param t * @return */ private Update buildBaseUpdate(T t) { Update update = new Update(); Field[] fields = t.getClass().getDeclaredFields(); for (Field field : fields) { field.setAccessible(true); try { Object value = field.get(t); if (value != null) { update.set(field.getName(), value); } } catch (Exception e) { e.printStackTrace(); } } return update; } /** * 获取需要操作的实体类class * @return */ @SuppressWarnings("unchecked") protected Class<T> getEntityClass() { return getSuperClassGenricType(getClass(),0); } /** * 获取MongoDB模板操作 * @return */ @Override public MongoTemplate getMongoTemplate() { return mongoTemplate; } private Class getSuperClassGenricType(final Class clazz, final int index){ Type genType = clazz.getGenericSuperclass(); if (!(genType instanceof ParameterizedType)) { log.warn(clazz.getSimpleName() + "'s superclass not ParameterizedType"); return 
Object.class; } Type[] params = ((ParameterizedType) genType).getActualTypeArguments(); if (index >= params.length || index < 0) { log.warn("Index: " + index + ", Size of " + clazz.getSimpleName() + "'s Parameterized Type: " + params.length); return Object.class; } if (!(params[index] instanceof Class)) { log.warn(clazz.getSimpleName() + " not set the actual class on superclass generic parameter"); return Object.class; } return (Class) params[index]; } /** * 打印查询语句 * * @param query */ private void logQuery( Query query) { if (print) { Class<?> clazz = this.getEntityClass(); MongoPersistentEntity<?> entity = mongoConverter.getMappingContext().getPersistentEntity(clazz); Document mappedQuery = queryMapper.getMappedObject(query.getQueryObject(), entity); Document mappedField = queryMapper.getMappedObject(query.getFieldsObject(), entity); Document mappedSort = queryMapper.getMappedObject(query.getSortObject(), entity); String logStr = "\ndb." + StrUtil.lowerFirst(clazz.getSimpleName()) + ".find("; logStr += FormatUtils.bson(mappedQuery.toJson()) + ")"; if (!query.getFieldsObject().isEmpty()) { logStr += ".projection("; logStr += FormatUtils.bson(mappedField.toJson()) + ")"; } if (query.isSorted()) { logStr += ".sort("; logStr += FormatUtils.bson(mappedSort.toJson()) + ")"; } if (query.getLimit() != 0l) { logStr += ".limit(" + query.getLimit() + ")"; } if (query.getSkip() != 0l) { logStr += ".skip(" + query.getSkip() + ")"; } logStr += ";"; log.info(logStr); } } /** * 打印查询语句 * * @param query */ private void logCount( Query query) { if (print) { Class<?> clazz = this.getEntityClass(); MongoPersistentEntity<?> entity = mongoConverter.getMappingContext().getPersistentEntity(clazz); Document mappedQuery = queryMapper.getMappedObject(query.getQueryObject(), entity); String logStr = "\ndb." 
+ StrUtil.lowerFirst(clazz.getSimpleName()) + ".find("; logStr += FormatUtils.bson(mappedQuery.toJson()) + ")"; logStr += ".count();"; log.info(logStr); } } /** * 打印查询语句 * * @param query */ private void logDelete(Query query) { if (print) { Class<?> clazz = this.getEntityClass(); MongoPersistentEntity<?> entity = mongoConverter.getMappingContext().getPersistentEntity(clazz); Document mappedQuery = queryMapper.getMappedObject(query.getQueryObject(), entity); String logStr = "\ndb." + StrUtil.lowerFirst(clazz.getSimpleName()) + ".remove("; logStr += FormatUtils.bson(mappedQuery.toJson()) + ")"; logStr += ";"; log.info(logStr); } } /** * 打印查询语句 * * @param query */ private void logUpdate( Query query, Update update, boolean multi) { if (print) { Class<?> clazz = this.getEntityClass(); MongoPersistentEntity<?> entity = mongoConverter.getMappingContext().getPersistentEntity(clazz); Document mappedQuery = queryMapper.getMappedObject(query.getQueryObject(), entity); Document mappedUpdate = updateMapper.getMappedObject(update.getUpdateObject(), entity); String logStr = "\ndb." + StrUtil.lowerFirst(clazz.getSimpleName()) + ".update("; logStr += FormatUtils.bson(mappedQuery.toJson()) + ","; logStr += FormatUtils.bson(mappedUpdate.toJson()) + ","; logStr += FormatUtils.bson("{multi:" + multi + "})"); logStr += ";"; log.info(logStr); } } /** * 打印查询语句 * * @param object * */ private void logSave(Object object) { if (print) { String logStr = "\ndb." + StrUtil.lowerFirst(object.getClass().getSimpleName()) + ".save("; logStr += JSONUtil.toJsonPrettyStr(object); logStr += ");"; log.info(logStr); } } /** * 打印查询语句 * */ private void logSave(List<?> list) { if (print && list.size() > 0) { Object object = list.get(0); String logStr = "\ndb." + StrUtil.lowerFirst(object.getClass().getSimpleName()) + ".save("; logStr += JSONUtil.toJsonPrettyStr(list); logStr += ");"; log.info(logStr); } } }
10.仿mybatisplus方式实现单表CRUD接口以及接口实现类
import com.bysk.base.mongdb.pojo.MgdbDeviceMonitorRecord;
import com.bysk.base.mongdb.servicebase.IBaseMongoService;

/**
 * CRUD service for {@link MgdbDeviceMonitorRecord} documents.
 *
 * <p>Declares no operations of its own; everything is inherited from
 * {@link IBaseMongoService}.
 */
public interface MgdbDeviceMonitorRecordService
        extends IBaseMongoService<MgdbDeviceMonitorRecord> {
}
import com.bysk.base.mongdb.pojo.MgdbDeviceMonitorRecord;
import com.bysk.base.mongdb.service.MgdbDeviceMonitorRecordService;
import com.bysk.base.mongdb.servicebase.impl.BaseMongoServiceImpl;
import org.springframework.stereotype.Service;

/**
 * Spring service implementing {@link MgdbDeviceMonitorRecordService}.
 *
 * <p>Adds nothing of its own: all operations come from
 * {@link BaseMongoServiceImpl}, bound here to {@link MgdbDeviceMonitorRecord}.
 */
@Service
public class MgdbDeviceMonitorRecordServiceImpl
        extends BaseMongoServiceImpl<MgdbDeviceMonitorRecord>
        implements MgdbDeviceMonitorRecordService {
}
11.使用
@Autowired MgdbDeviceMonitorRecordService mgdbDeviceMonitorRecordService; public void saveOrUpdateForMgDb(String equipmentId, DeviceMonitorRecordDTO bean) { //1.取得数据存储时间 LocalDateTime monitorTime = bean.getMonitorTime(); //2.计算时间偏移量 LocalDateTime frontTime = monitorTime.minusSeconds(dataRangeTime); LocalDateTime backTime = monitorTime.plusSeconds(dataRangeTime); //3.构建查询条件 Query query = new Query(); query.addCriteria(Criteria.where("equipmentId").is(equipmentId).and("monitorTime") .gte(LocalDateTimeUtil.format(frontTime, DatePattern.NORM_DATETIME_PATTERN)) .lte(LocalDateTimeUtil.format(backTime, DatePattern.NORM_DATETIME_PATTERN))); MgdbDeviceMonitorRecord oneByQuery1 = mgdbDeviceMonitorRecordService.findOne(query); // 5.存储逻辑 if (oneByQuery1 == null) { // 不存在 保存_ String beanStr = JSONUtil.toJsonStr(bean); MgdbDeviceMonitorRecord mgdbDeviceMonitorRecord = JSONUtil.toBean(beanStr, MgdbDeviceMonitorRecord.class); mgdbDeviceMonitorRecord.setMonitorTime(LocalDateTimeUtil.format(bean.getMonitorTime(), DatePattern.NORM_DATETIME_PATTERN)); mgdbDeviceMonitorRecord.setId(null); mgdbDeviceMonitorRecordService.save(mgdbDeviceMonitorRecord); log.info(monitorTime+"==数据存储芒果DB成功=="); } else { String beanStr = JSONUtil.toJsonStr(bean); MgdbDeviceMonitorRecord mgdbDeviceMonitorRecord = JSONUtil.toBean(beanStr, MgdbDeviceMonitorRecord.class); mgdbDeviceMonitorRecord.setMonitorTime(LocalDateTimeUtil.format(bean.getMonitorTime(), DatePattern.NORM_DATETIME_PATTERN)); mgdbDeviceMonitorRecord.setId(oneByQuery1.getId()); mgdbDeviceMonitorRecordService.updateById(oneByQuery1.getId(),mgdbDeviceMonitorRecord); log.info(monitorTime+"==数据更新芒果DB成功=="); } }