Thingsboard源码分析(三)遥测数据获取

获取遥测数据

TelemetryController

  • 首先找到入口,比如获取最新遥测数据的方法 getLatestTimeseries,对应的请求路径为 /api/plugins/telemetry/DEVICE/{deviceId}/values/timeseries,位于 TelemetryController 中:

    @RestController
    @TbCoreComponent
    // TbUrlConstants.TELEMETRY_URL_PREFIX=="/api/plugins/telemetry"
    @RequestMapping(TbUrlConstants.TELEMETRY_URL_PREFIX)
    @Slf4j
    public class TelemetryController extends BaseController {
    
        // 这个组件负责获取设备遥测数据
        @Autowired
        private TimeseriesService tsService;
    
        // 这个组件时为了验证当前用户是否有权限去执行当前操作
        @Autowired
        private AccessValidator accessValidator;
    
        @Value("${transport.json.max_string_value_length:0}")
        private int maxStringValueLength;
    
        private ExecutorService executor;
        
        @PreAuthorize("hasAnyAuthority('SYS_ADMIN', 'TENANT_ADMIN', 'CUSTOMER_USER')")
        @RequestMapping(value = "/{entityType}/{entityId}/values/timeseries", method = RequestMethod.GET)
        @ResponseBody
        public DeferredResult<ResponseEntity> getLatestTimeseries(
                @PathVariable("entityType") String entityType, @PathVariable("entityId") String entityIdStr,
                @RequestParam(name = "keys", required = false) String keysStr,
                @RequestParam(name = "useStrictDataTypes", required = false, defaultValue = "false") Boolean useStrictDataTypes) throws ThingsboardException {
            SecurityUser user = getCurrentUser();
    
            return accessValidator.validateEntityAndCallback(getCurrentUser(), Operation.READ_TELEMETRY, entityType, entityIdStr,
                    (result, tenantId, entityId) -> getLatestTimeseriesValuesCallback(result, user, entityId, keysStr, useStrictDataTypes));
        }
        
        private void getLatestTimeseriesValuesCallback(@Nullable DeferredResult<ResponseEntity> result, SecurityUser user, EntityId entityId, String keys, Boolean useStrictDataTypes) {
            ListenableFuture<List<TsKvEntry>> future;
            if (StringUtils.isEmpty(keys)) {
                // 如果我们不传入键名列表,则返回所有遥测数据。
                future = tsService.findAllLatest(user.getTenantId(), entityId);
            } else {
                // 如果传入键名列表,则只查找相关的遥测数据。比如查经纬度要将longitude和latitude传入这个接口
                future = tsService.findLatest(user.getTenantId(), entityId, toKeysList(keys));
            }
            Futures.addCallback(future, getTsKvListCallback(result, useStrictDataTypes), MoreExecutors.directExecutor());
        }
        
    }
    

AccessValidator

  • 这个类主要是用于判断当前访问者是否有访问遥测数据的权限

  • validateEntityAndCallback 重载方法很多,一定要注意区分。这里使用的是第一个方法

    /**
     * Checks whether the current user may perform an operation on an entity and, depending
     * on the outcome, hands control to a success or failure handler (excerpt).
     *
     * <p>{@code validateEntityAndCallback} has several overloads; the telemetry controller
     * enters through the first one below, which delegates to the second and then the fourth.
     */
    @Component
    public class AccessValidator {
        ........
        /**
         * Overload 1: takes the entity type and id as strings and supplies a default failure
         * handler that calls {@code handleError} with {@code HttpStatus.INTERNAL_SERVER_ERROR}.
         */
        public DeferredResult<ResponseEntity> validateEntityAndCallback(SecurityUser currentUser, Operation operation, String entityType, String entityIdStr,
                                                                        ThreeConsumer<DeferredResult<ResponseEntity>, TenantId, EntityId> onSuccess) throws ThingsboardException {
            // Delegates to overload 2.
            return validateEntityAndCallback(currentUser, operation, entityType, entityIdStr, onSuccess,
                   // Default failure handler.
             	   (result, t) -> handleError(t, result, HttpStatus.INTERNAL_SERVER_ERROR));
        }
        
        /**
         * Overload 2: builds an {@code EntityId} from the entity type (e.g. DEVICE) and the id
         * string so that the entity can be looked up, then delegates to overload 4.
         */
        public DeferredResult<ResponseEntity> validateEntityAndCallback(SecurityUser currentUser, Operation operation, String entityType, String entityIdStr,
                                                                        ThreeConsumer<DeferredResult<ResponseEntity>, TenantId, EntityId> onSuccess,
                                                                        BiConsumer<DeferredResult<ResponseEntity>, Throwable> onFailure) throws ThingsboardException {
            // Delegates to overload 4.
            return validateEntityAndCallback(currentUser, operation,                                    
                   // Create the EntityId instance from type and id.
                   EntityIdFactory.getByTypeAndId(entityType, entityIdStr),onSuccess, onFailure);
        }
    	
        /**
         * Overload 3: same as overload 1 but takes a ready-made {@code EntityId}.
         * It is not invoked by the other overloads shown in this excerpt.
         */
        public DeferredResult<ResponseEntity> validateEntityAndCallback(SecurityUser currentUser, Operation operation, EntityId entityId,
                                                                        ThreeConsumer<DeferredResult<ResponseEntity>, TenantId, EntityId> onSuccess) throws ThingsboardException {
            return validateEntityAndCallback(currentUser, operation, entityId, onSuccess, (result, t) -> handleError(t, result, HttpStatus.INTERNAL_SERVER_ERROR));
        }
    
        /**
         * Overload 4: creates the deferred response, starts validation and returns the
         * response immediately; the handlers are invoked later through the callback.
         *
         * @param onSuccess called with the deferred response, the current user's tenant id and the entity id
         * @param onFailure called with the deferred response and the error
         * @return the deferred response handed to both handlers
         */
        public DeferredResult<ResponseEntity> validateEntityAndCallback(SecurityUser currentUser, Operation operation, EntityId entityId,
                                                                        ThreeConsumer<DeferredResult<ResponseEntity>, TenantId, EntityId> onSuccess,
                                                                        BiConsumer<DeferredResult<ResponseEntity>, Throwable> onFailure) throws ThingsboardException {
    
            final DeferredResult<ResponseEntity> response = new DeferredResult<>();
    		// validate() picks the entity-specific check based on the EntityType.
            // NOTE(review): HttpValidationCallback is defined elsewhere; presumably it maps the
            // ValidationResult onto the response or forwards to the inner callback - confirm.
            validate(currentUser, operation, entityId, new HttpValidationCallback(response,
                    new FutureCallback<DeferredResult<ResponseEntity>>() {
                        @Override
                        public void onSuccess(@Nullable DeferredResult<ResponseEntity> result) {
                            try {
                                // Here "onSuccess" is the method parameter of the enclosing method.
                                onSuccess.accept(response, currentUser.getTenantId(), entityId);
                            } catch (Exception e) {
                                // Calls this anonymous class's own onFailure(Throwable) below.
                                onFailure(e);
                            }
                        }
    
                        @Override
                        public void onFailure(Throwable t) {
                            // Here "onFailure" is the method parameter of the enclosing method.
                            onFailure.accept(response, t);
                        }
                    }));
    
            return response;
        }
        
        /**
         * Dispatches to the validation routine that matches the entity type.
         *
         * @throws IllegalStateException for entity types that have no case in the switch
         */
        public void validate(SecurityUser currentUser, Operation operation, EntityId entityId, FutureCallback<ValidationResult> callback) {
            switch (entityId.getEntityType()) {
                case DEVICE:
                    // DEVICE entities are checked by validateDevice, which reports through the callback.
                    validateDevice(currentUser, operation, entityId, callback);
                    return;
                case DEVICE_PROFILE:
                    validateDeviceProfile(currentUser, operation, entityId, callback);
                    return;
                /* ... remaining entity types omitted in this excerpt ... */
                default:
                    //TODO: add support of other entities
                    throw new IllegalStateException("Not Implemented!");
            }
        }
        
        /**
         * Validates access to a device: system administrators are rejected, otherwise the
         * device is loaded asynchronously and the permission check is applied to it.
         */
        private void validateDevice(final SecurityUser currentUser, Operation operation, EntityId entityId, FutureCallback<ValidationResult> callback) {
            // System administrators are not allowed to perform device operations; the
            // rejection is reported as an access-denied ValidationResult, not as an exception.
            if (currentUser.isSystemAdmin()) {
                callback.onSuccess(ValidationResult.accessDenied(SYSTEM_ADMINISTRATOR_IS_NOT_ALLOWED_TO_PERFORM_THIS_OPERATION));
            } else {
                // Look up the Device by tenant id and device id.
                ListenableFuture<Device> deviceFuture = deviceService.findDeviceByIdAsync(currentUser.getTenantId(), new DeviceId(entityId.getId()));
                // NOTE(review): getCallback is defined elsewhere; presumably it feeds the result
                // of the lambda into the callback - confirm.
                Futures.addCallback(deviceFuture, getCallback(callback, device -> {
                    if (device == null) {
                        return ValidationResult.entityNotFound(DEVICE_WITH_REQUESTED_ID_NOT_FOUND);
                    } else {
                        try {
                            // Check whether the current user may perform the operation on this
                            // device; a missing permission is signalled by ThingsboardException.
                            accessControlService.checkPermission(currentUser, Resource.DEVICE, operation, entityId, device);
                        } catch (ThingsboardException e) {
                            // Converted into an access-denied result carrying the exception message.
                            return ValidationResult.accessDenied(e.getMessage());
                        }
                        return ValidationResult.ok(device);
                    }
                }), executor);
            }
        }
        
        /** Callback taking three arguments and returning nothing. */
        public interface ThreeConsumer<A, B, C> {
            void accept(A a, B b, C c);
        }
    }
    
    
    

TimeseriesService

  • 这个类用于查找最新遥测数据
/**
 * Default {@code TimeseriesService} implementation (excerpt) covering the
 * "latest value" lookups, which are delegated to {@code TimeseriesLatestDao}.
 */
@Service
@Slf4j
public class BaseTimeseriesService implements TimeseriesService {

    // DAO that reads the latest telemetry values.
    @Autowired
    private TimeseriesLatestDao timeseriesLatestDao;

    /**
     * Looks up the latest value of each requested key.
     *
     * <p>Every key is validated before the first DAO call is issued.
     *
     * @param tenantId tenant passed through to the DAO
     * @param entityId entity whose telemetry is read; checked by {@code validate}
     *                 (defined outside this excerpt)
     * @param keys     telemetry keys to look up
     * @return a future combining the per-key lookups, in iteration order of {@code keys}
     */
    @Override
    public ListenableFuture<List<TsKvEntry>> findLatest(TenantId tenantId, EntityId entityId, Collection<String> keys) {
        validate(entityId);
        final List<ListenableFuture<TsKvEntry>> lookups = Lists.newArrayListWithExpectedSize(keys.size());
        // First pass: reject bad keys before touching the DAO.
        for (String name : keys) {
            Validator.validateString(name, "Incorrect key " + name);
        }
        // Second pass: start one lookup per key.
        for (String name : keys) {
            lookups.add(timeseriesLatestDao.findLatest(tenantId, entityId, name));
        }
        return Futures.allAsList(lookups);
    }

    /**
     * Returns the latest values of all telemetry keys of an entity.
     *
     * @param tenantId tenant passed through to the DAO
     * @param entityId entity whose telemetry is read; validated first
     * @return the future produced by the DAO
     */
    @Override
    public ListenableFuture<List<TsKvEntry>> findAllLatest(TenantId tenantId, EntityId entityId) {
        // Make sure the entity id is acceptable before querying.
        validate(entityId);
        // Delegate the lookup by tenant id and entity id to the DAO.
        final ListenableFuture<List<TsKvEntry>> latestValues = timeseriesLatestDao.findAllLatest(tenantId, entityId);
        return latestValues;
    }
}

TimeseriesLatestDao

  • TimeseriesLatestDao 是查找最新时序(遥测)数据的 Dao 层接口,下面是它基于 SQL 的实现类 SqlTimeseriesLatestDao
/**
 * SQL implementation of {@code TimeseriesLatestDao} (excerpt): reads the latest
 * telemetry values of an entity through {@code SearchTsKvLatestRepository}.
 */
@Slf4j
@Component
@SqlTsLatestAnyDao
public class SqlTimeseriesLatestDao extends BaseAbstractSqlTimeseriesDao implements TimeseriesLatestDao {

    @Autowired
    private SearchTsKvLatestRepository searchTsKvLatestRepository;

    /**
     * Returns the latest values of all telemetry keys of the given entity.
     *
     * @param tenantId not used by this implementation
     * @param entityId entity whose latest values are read
     * @return a future holding the converted entries
     */
    @Override
    public ListenableFuture<List<TsKvEntry>> findAllLatest(TenantId tenantId, EntityId entityId) {
        return getFindAllLatestFuture(entityId);
    }

    /**
     * Runs the repository query on the calling thread and wraps the converted result
     * in an already-completed future.
     *
     * @param entityId entity whose latest values are read
     * @return a completed future with the converted entries
     */
    protected ListenableFuture<List<TsKvEntry>> getFindAllLatestFuture(EntityId entityId) {
        return Futures.immediateFuture(DaoUtil.convertDataList(Lists.newArrayList(
                searchTsKvLatestRepository.findAllByEntityId(entityId.getId()))));
    }
}

SearchTsKvLatestRepository

  • 按实体 id 查询最新时序键值(遥测)数据的 Repository 类,在这里可以找到 sql 语句
/**
 * Repository that loads the latest timeseries key/value rows of an entity
 * through a named query.
 */
@SqlTsLatestAnyDao
@Repository
public class SearchTsKvLatestRepository {

    /** Name under which the query is looked up by {@link #findAllByEntityId(UUID)}. */
    public static final String FIND_ALL_BY_ENTITY_ID = "findAllByEntityId";

    /**
     * SQL text selecting the latest values of one entity, joined with the key dictionary
     * to resolve the string key.
     * NOTE(review): the place where this text is registered under
     * {@link #FIND_ALL_BY_ENTITY_ID} is not visible here - presumably a named native
     * query declared on the entity class; confirm.
     */
    public static final String FIND_ALL_BY_ENTITY_ID_QUERY = "SELECT ts_kv_latest.entity_id AS entityId, ts_kv_latest.key AS key, ts_kv_dictionary.key AS strKey, ts_kv_latest.str_v AS strValue," +
            " ts_kv_latest.bool_v AS boolValue, ts_kv_latest.long_v AS longValue, ts_kv_latest.dbl_v AS doubleValue, ts_kv_latest.json_v AS jsonValue, ts_kv_latest.ts AS ts FROM ts_kv_latest " +
            "INNER JOIN ts_kv_dictionary ON ts_kv_latest.key = ts_kv_dictionary.key_id WHERE ts_kv_latest.entity_id = cast(:id AS uuid)";

    @PersistenceContext
    private EntityManager entityManager;

    /**
     * Executes the named query for the given entity.
     *
     * @param entityId id bound to the {@code :id} parameter of the query
     * @return the rows returned by the query
     */
    public List<TsKvLatestEntity> findAllByEntityId(UUID entityId) {
        // The result class closes the createNamedQuery(...) call; the parameter is then
        // bound on the returned query, not on the Class object.
        return entityManager.createNamedQuery(FIND_ALL_BY_ENTITY_ID, TsKvLatestEntity.class)
                .setParameter("id", entityId)
                .getResultList();
    }
}

EntityManager

  • JPA 的持久化上下文接口(javax.persistence.EntityManager),用于与数据库交互,里面有很多查找(find)、创建查询(createNamedQuery 等)、加锁(lock)、刷新数据(refresh/flush)的方法

Futures

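  • Guava 提供的 ListenableFuture 工具类,用于处理异步结果:例如 addCallback 为 Future 注册回调,allAsList 把多个 Future 合并成一个,immediateFuture 把已有的结果包装成已完成的 Future。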
上一篇:阿里云ecs服务器如何购买---(2020图文教程)


下一篇:网络编程--TCP套接字(C/CC++)