获取遥测数据
TelemetryController
-
首先找到入口,比如获取最新遥测数据的方法 getLatestTimeseries(GET /api/plugins/telemetry/DEVICE/{deviceId}/values/timeseries),它位于 TelemetryController 中:
@RestController @TbCoreComponent // TbUrlConstants.TELEMETRY_URL_PREFIX=="/api/plugins/telemetry" @RequestMapping(TbUrlConstants.TELEMETRY_URL_PREFIX) @Slf4j public class TelemetryController extends BaseController { // 这个组件负责获取设备遥测数据 @Autowired private TimeseriesService tsService; // 这个组件时为了验证当前用户是否有权限去执行当前操作 @Autowired private AccessValidator accessValidator; @Value("${transport.json.max_string_value_length:0}") private int maxStringValueLength; private ExecutorService executor; @PreAuthorize("hasAnyAuthority('SYS_ADMIN', 'TENANT_ADMIN', 'CUSTOMER_USER')") @RequestMapping(value = "/{entityType}/{entityId}/values/timeseries", method = RequestMethod.GET) @ResponseBody public DeferredResult<ResponseEntity> getLatestTimeseries( @PathVariable("entityType") String entityType, @PathVariable("entityId") String entityIdStr, @RequestParam(name = "keys", required = false) String keysStr, @RequestParam(name = "useStrictDataTypes", required = false, defaultValue = "false") Boolean useStrictDataTypes) throws ThingsboardException { SecurityUser user = getCurrentUser(); return accessValidator.validateEntityAndCallback(getCurrentUser(), Operation.READ_TELEMETRY, entityType, entityIdStr, (result, tenantId, entityId) -> getLatestTimeseriesValuesCallback(result, user, entityId, keysStr, useStrictDataTypes)); } private void getLatestTimeseriesValuesCallback(@Nullable DeferredResult<ResponseEntity> result, SecurityUser user, EntityId entityId, String keys, Boolean useStrictDataTypes) { ListenableFuture<List<TsKvEntry>> future; if (StringUtils.isEmpty(keys)) { // 如果我们不传入键名列表,则返回所有遥测数据。 future = tsService.findAllLatest(user.getTenantId(), entityId); } else { // 如果传入键名列表,则只查找相关的遥测数据。比如查经纬度要将longitude和latitude传入这个接口 future = tsService.findLatest(user.getTenantId(), entityId, toKeysList(keys)); } Futures.addCallback(future, getTsKvListCallback(result, useStrictDataTypes), MoreExecutors.directExecutor()); } }
AccessValidator
-
这个类主要是用于判断当前访问者是否有访问遥测数据的权限
-
validateEntityAndCallback 的重载方法很多,一定要注意区分。这里入口调用的是第一个方法,它会依次调用第二个、第四个重载方法(第三个在本流程中没有被用到)
@Component public class AccessValidator { ........ // 这是第一个 validateEntityAndCallback 方法,这一步加入了出错处理方法 public DeferredResult<ResponseEntity> validateEntityAndCallback(SecurityUser currentUser, Operation operation, String entityType, String entityIdStr, ThreeConsumer<DeferredResult<ResponseEntity>, TenantId, EntityId> onSuccess) throws ThingsboardException { // 调用了第二个 validateEntityAndCallback 方法 return validateEntityAndCallback(currentUser, operation, entityType, entityIdStr, onSuccess, // 加入出错处理方法 (result, t) -> handleError(t, result, HttpStatus.INTERNAL_SERVER_ERROR)); } // 这是第二个 validateEntityAndCallback 方法,这一步根据id和类型(DEVICE)产生EntityId,以便找到这个设备 public DeferredResult<ResponseEntity> validateEntityAndCallback(SecurityUser currentUser, Operation operation, String entityType, String entityIdStr, ThreeConsumer<DeferredResult<ResponseEntity>, TenantId, EntityId> onSuccess, BiConsumer<DeferredResult<ResponseEntity>, Throwable> onFailure) throws ThingsboardException { // 调用第四个 validateEntityAndCallback 方法 return validateEntityAndCallback(currentUser, operation, // 根据类型(DEVICE)和id产生EntityId实例,以便查找设备 EntityIdFactory.getByTypeAndId(entityType, entityIdStr),onSuccess, onFailure); } // 这是第三个 validateEntityAndCallback 方法,没有被调用 public DeferredResult<ResponseEntity> validateEntityAndCallback(SecurityUser currentUser, Operation operation, EntityId entityId, ThreeConsumer<DeferredResult<ResponseEntity>, TenantId, EntityId> onSuccess) throws ThingsboardException { return validateEntityAndCallback(currentUser, operation, entityId, onSuccess, (result, t) -> handleError(t, result, HttpStatus.INTERNAL_SERVER_ERROR)); } // 这是第四个 validateEntityAndCallback 方法 public DeferredResult<ResponseEntity> validateEntityAndCallback(SecurityUser currentUser, Operation operation, EntityId entityId, ThreeConsumer<DeferredResult<ResponseEntity>, TenantId, EntityId> onSuccess, BiConsumer<DeferredResult<ResponseEntity>, Throwable> onFailure) throws ThingsboardException { final 
DeferredResult<ResponseEntity> response = new DeferredResult<>(); // 调用validate方法,根据当前EntityType来确定使用哪个方法 validate(currentUser, operation, entityId, new HttpValidationCallback(response, new FutureCallback<DeferredResult<ResponseEntity>>() { @Override public void onSuccess(@Nullable DeferredResult<ResponseEntity> result) { try { onSuccess.accept(response, currentUser.getTenantId(), entityId); } catch (Exception e) { onFailure(e); } } @Override public void onFailure(Throwable t) { onFailure.accept(response, t); } })); return response; } public void validate(SecurityUser currentUser, Operation operation, EntityId entityId, FutureCallback<ValidationResult> callback) { switch (entityId.getEntityType()) { case DEVICE: // EntityType=='DEVICE',调用validateDevice方法,直接返回响应结果到http response validateDevice(currentUser, operation, entityId, callback); return; case DEVICE_PROFILE: validateDeviceProfile(currentUser, operation, entityId, callback); return; /*。。。。。。。。。。。。。。*/ default: //TODO: add support of other entities throw new IllegalStateException("Not Implemented!"); } } private void validateDevice(final SecurityUser currentUser, Operation operation, EntityId entityId, FutureCallback<ValidationResult> callback) { // 系统管理员没有查看设备遥测数据的权限 if (currentUser.isSystemAdmin()) { callback.onSuccess(ValidationResult.accessDenied(SYSTEM_ADMINISTRATOR_IS_NOT_ALLOWED_TO_PERFORM_THIS_OPERATION)); } else { // 根据TenantId和DeviceId来查找Device实例 ListenableFuture<Device> deviceFuture = deviceService.findDeviceByIdAsync(currentUser.getTenantId(), new DeviceId(entityId.getId())); Futures.addCallback(deviceFuture, getCallback(callback, device -> { if (device == null) { return ValidationResult.entityNotFound(DEVICE_WITH_REQUESTED_ID_NOT_FOUND); } else { try { // 查找权限列表判断当前用户有无读取设备遥测数据的权限。如果没有权限将直接报错,然后出错处理 accessControlService.checkPermission(currentUser, Resource.DEVICE, operation, entityId, device); } catch (ThingsboardException e) { return ValidationResult.accessDenied(e.getMessage()); } return 
ValidationResult.ok(device); } }), executor); } } public interface ThreeConsumer<A, B, C> { void accept(A a, B b, C c); } }
TimeseriesService
- 这个类用于查找最新遥测数据
/**
 * {@link TimeseriesService} implementation (excerpt) whose latest-value lookups
 * are delegated to {@link TimeseriesLatestDao}.
 */
@Service
@Slf4j
public class BaseTimeseriesService implements TimeseriesService {

    // DAO that performs the latest-value queries.
    @Autowired
    private TimeseriesLatestDao timeseriesLatestDao;

    /**
     * Looks up the latest entry for each of the given keys.
     *
     * @return a future combining the per-key lookups via {@code Futures.allAsList}
     */
    @Override
    public ListenableFuture<List<TsKvEntry>> findLatest(TenantId tenantId, EntityId entityId, Collection<String> keys) {
        validate(entityId);
        List<ListenableFuture<TsKvEntry>> lookups = Lists.newArrayListWithExpectedSize(keys.size());
        // Validate every key first so no query is issued if any key is rejected.
        for (String key : keys) {
            Validator.validateString(key, "Incorrect key " + key);
        }
        for (String key : keys) {
            lookups.add(timeseriesLatestDao.findLatest(tenantId, entityId, key));
        }
        return Futures.allAsList(lookups);
    }

    /**
     * Looks up the latest entries of all keys of the entity.
     */
    @Override
    public ListenableFuture<List<TsKvEntry>> findAllLatest(TenantId tenantId, EntityId entityId) {
        // Check that the entity id is valid before querying.
        validate(entityId);
        // Fetch the result for the given tenant id and entity id.
        return timeseriesLatestDao.findAllLatest(tenantId, entityId);
    }
}
TimeseriesLatestDao
- 查找最新遥测数据的 Dao 层接口,下面展示的是它的 SQL 实现类 SqlTimeseriesLatestDao
/**
 * SQL-backed {@link TimeseriesLatestDao} (excerpt); reads the latest values
 * through {@link SearchTsKvLatestRepository}.
 */
@Slf4j
@Component
@SqlTsLatestAnyDao
public class SqlTimeseriesLatestDao extends BaseAbstractSqlTimeseriesDao implements TimeseriesLatestDao {

    @Autowired
    private SearchTsKvLatestRepository searchTsKvLatestRepository;

    /**
     * Returns the latest entries of all keys of the entity. {@code tenantId} is
     * not used by this implementation.
     */
    @Override
    public ListenableFuture<List<TsKvEntry>> findAllLatest(TenantId tenantId, EntityId entityId) {
        return getFindAllLatestFuture(entityId);
    }

    /**
     * Runs the repository query on the calling thread and wraps the converted
     * rows in an already-completed future.
     */
    protected ListenableFuture<List<TsKvEntry>> getFindAllLatestFuture(EntityId entityId) {
        return Futures.immediateFuture(DaoUtil.convertDataList(Lists.newArrayList(
                searchTsKvLatestRepository.findAllByEntityId(entityId.getId()))));
    }
}
SearchTsKvLatestRepository
- 按实体 id 查找最新遥测键值数据的 Repository 类,在这里可以找到 sql 语句
/**
 * Repository that reads the latest time-series rows of an entity through a
 * named query.
 */
@SqlTsLatestAnyDao
@Repository
public class SearchTsKvLatestRepository {

    /** Name under which the query is looked up by {@link #findAllByEntityId(UUID)}. */
    public static final String FIND_ALL_BY_ENTITY_ID = "findAllByEntityId";

    // SQL text of the query: joins ts_kv_latest with ts_kv_dictionary to resolve the
    // string key. NOTE(review): presumably registered under FIND_ALL_BY_ENTITY_ID
    // elsewhere (not shown in this excerpt) - confirm.
    public static final String FIND_ALL_BY_ENTITY_ID_QUERY = "SELECT ts_kv_latest.entity_id AS entityId, ts_kv_latest.key AS key, ts_kv_dictionary.key AS strKey, ts_kv_latest.str_v AS strValue,"
            + " ts_kv_latest.bool_v AS boolValue, ts_kv_latest.long_v AS longValue, ts_kv_latest.dbl_v AS doubleValue, ts_kv_latest.json_v AS jsonValue, ts_kv_latest.ts AS ts FROM ts_kv_latest "
            + "INNER JOIN ts_kv_dictionary ON ts_kv_latest.key = ts_kv_dictionary.key_id WHERE ts_kv_latest.entity_id = cast(:id AS uuid)";

    @PersistenceContext
    private EntityManager entityManager;

    /**
     * Returns all latest time-series rows of the given entity.
     *
     * @param entityId value bound to the query's {@code :id} parameter
     * @return the rows produced by the named query
     */
    public List<TsKvLatestEntity> findAllByEntityId(UUID entityId) {
        // The parameter is bound on the query returned by createNamedQuery(...),
        // not on the entity class literal.
        return entityManager.createNamedQuery(FIND_ALL_BY_ENTITY_ID, TsKvLatestEntity.class)
                .setParameter("id", entityId)
                .getResultList();
    }
}
EntityManager
- JPA 的持久化上下文接口(并不是专门做并发查找的类),提供创建查询、查找、刷新数据、加锁等方法
Futures
- Guava 提供的 ListenableFuture 工具类,用于组合异步结果并注册回调(例如上文用到的 addCallback、allAsList、immediateFuture)。