在本教程中,我们将使用Amazon Web Services(AWS),Slack和Raspberry Pi构建一个IoT项目原型。我们的项目简单演示了如何通过集成流行的产品和服务来创建自定义的,启用云的传感器系统。它来自Internet上的多种资源。
它是如何工作的?
使用DS18B20温度传感器,树莓派每分钟测量一次温度。它通过HTTP POST请求将测量数据(传感器名称、时间戳、摄氏温度和华氏温度)发送到AWS API网关端点。端点调用一个Lambda函数,该函数将数据插入到DynamoDB表中。
另外,AWS EventBridge每分钟调用一次第二个Lambda函数。 此函数在DynamoDB表中查询最近60秒内插入的所有项目,然后通过HTTP POST请求将它们发送到Slack通道。
为了安全起见,API Gateway端点使用存储在AWS Systems Manager中的授权令牌。
需求
此项目需要一个AWS账户,一个Slack账户,AWS命令行界面(CLI),AWS无服务器应用程序模型(SAM)CLI,Raspberry Pi,Linux的Raspbian发行版,DS18B20温度传感器和Python 3。
设置Slack
我们项目的第一个组件是带有传入的Webhooks的Slack应用。我们根据Slack网站上的教程(https://api.slack.com/messaging/webhooks)创建了该应用。我们记下Webhook的URL,因为在下面的部分中将需要它。
设置AWS
我们项目的第二个组件是使用API网关,DynamoDB,EventBridge,Lambda和Systems Manager服务的AWS无服务器应用程序。它具有以下文件夹结构:
project/
|__ template.yaml
|__ iot/
|__ app.py
|__ requirements.txt
template.yaml的内容是:
Transform: AWS::Serverless-2016-10-31
Globals:
Api:
Auth:
Authorizers:
TokenValidator:
FunctionArn: !GetAtt ValidateRequest.Arn
FunctionPayloadType: TOKEN
Identity:
Header: x-api-token
ReauthorizeEvery: 0
DefaultAuthorizer: TokenValidator
EndpointConfiguration: REGIONAL
Function:
Environment:
Variables:
DYNAMODB_TABLE: sensor-data
Runtime: python3.7 # Change as necessary.
Resources:
ValidateRequest:
Type: AWS::Serverless::Function
Properties:
CodeUri: ./iot
Handler: app.validate_request
Policies:
– Statement:
– Action:
– ssm:GetParameter
Effect: Allow
Resource: !Sub arn:aws:ssm:${AWS::Region}:${AWS::AccountId}:parameter/api-token
Version: ‘2012-10-17’
HandleSensorRequest:
Type: AWS::Serverless::Function
Properties:
CodeUri: ./iot
Handler: app.handle_sensor_request
Policies:
Statement:Action:dynamodb:PutItem
Effect: Allow
Resource: !GetAtt SensorData.Arn
Version: '2012-10-17'
Events:
SensorResource:
Type: Api
Properties:
Method: POST
Path: /sensor
MakeSlackRequest:
Type: AWS::Serverless::Function
Properties:
CodeUri: ./iot
Handler: app.make_slack_request
Policies:
Statement:Action:dynamodb:Query
Effect: Allow
Resource: !GetAtt SensorData.Arn
Version: '2012-10-17'
Statement:Action:ssm:GetParameter
Effect: Allow
Resource: !Sub arn:aws:ssm:${AWS::Region}:${AWS::AccountId}:parameter/slack-url
Events:
SlackSchedule:
Type: Schedule
Properties:
Schedule: rate(1 minute)
SensorData:
Type: AWS::DynamoDB::Table
Properties:
AttributeDefinitions:
AttributeName: sensor
AttributeType: SAttributeName: timestamp
AttributeType: N
KeySchema:
AttributeName: sensor
KeyType: HASHAttributeName: timestamp
KeyType: RANGE
ProvisionedThroughput:
ReadCapacityUnits: 5
WriteCapacityUnits: 5
TableName: sensor-data
Outputs:
SensorURL:
Value: !Sub 'https://${ServerlessRestApi}.execute-api.${AWS::Region}.amazonaws.com/${ServerlessRestApi.Stage}/sensor'
app.py的内容是:
import decimal
import json
import os
import time
import boto3
import boto3.dynamodb.conditions
import requests
DYNAMODB_TABLE = os.environ['DYNAMODB_TABLE']
SENSORS = ['1']
def get_stored_parameter(name):
ssm = boto3.client('ssm')
response = ssm.get_parameter(
Name=name,
WithDecryption=True
)
return response['Parameter']['Value']
def validate_request(event, context):
expected_token = get_stored_parameter('api-token')
if event['authorizationToken'] == expected_token:
effect = 'Allow'
else:
effect = 'Deny'
return {
'principalId': '*',
'policyDocument': {
'Version': '2012-10-17',
'Statement': [{
'Action': 'execute-api:Invoke',
'Effect': '{}'.format(effect),
'Resource': '{}'.format(event['methodArn'])
}]
}
}
def handle_sensor_request(event, context):
dynamodb = boto3.resource('dynamodb')
table = dynamodb.Table(DYNAMODB_TABLE)
table.put_item(
Item=json.loads(event['body'], parse_float=decimal.Decimal)
)
return {'body': event['body']}
def compute_timestamp(value):
time_in_seconds = time.time()
return decimal.Decimal(time_in_seconds - value)
def query_table(sensor, timestamp):
dynamodb = boto3.resource('dynamodb')
table = dynamodb.Table(DYNAMODB_TABLE)
response = table.query(
KeyConditionExpression=
boto3.dynamodb.conditions.Key('sensor').eq(sensor)
& boto3.dynamodb.conditions.Key('timestamp').gte(timestamp)
)
return response['Items']
def create_message(item):
gmt = time.gmtime(item['timestamp'])
datetime_format = time.strftime('%Y-%m-%d %H:%M:%S', gmt)
return '{} Sensor {} {}U00002103 {}U00002109n'.format(
datetime_format,
item['sensor'],
item['celsius'],
item['fahrenheit']
)
def make_slack_request(event, context):
timestamp = compute_timestamp(60)
status = {}
for sensor in SENSORS:
items = query_table(sensor, timestamp)
if items:
messages = [create_message(item) for item in items]
body = {'text': 'n'.join(messages)}
slack_url = get_stored_parameter('slack-url')
response = requests.post(slack_url, json=body)
status[sensor] = response.status_code
return status
Requirements.txt的内容是:
requests
设置AWS要求我们从Linux Shell运行几个命令。
首先,使用AWS CLI,我们将Slack应用程序Webhook的URL存储在Systems Manager中。 我们将VALUE替换为实际网址。
aws configure set cli_follow_urlparam false
aws ssm put-parameter --name 'slack-url' --value 'VALUE' --type 'SecureString'
其次,我们将API授权令牌存储在Systems Manager中。我们的令牌是一个字符串,用于验证对API Gateway的请求。 (创建安全令牌不在本文讨论范围之内。)同样,我们将VALUE替换为实际令牌。
aws ssm put-parameter --name 'api-token' --value 'VALUE' --type 'SecureString'
最后,我们使用AWS SAM CLI构建和部署无服务器应用程序。(我们从上述文件夹结构的项目目录中运行这些特定命令。)
sam build
sam deploy --guided
部署应用程序后,我们记下sam deploy --guided命令的输出中引用的SensorURL,因为在下面将需要它。
设置Raspberry Pi
我们项目的第三个也是最后一个组件是具有DS18B20温度传感器和简短Python程序的Raspberry Pi。 我们配置了Raspberry Pi,并根据Adafruit网站上的学习模块(https://learn.adafruit.com/adafruits-raspberry-pi-lesson-11-ds18b20-temperature-sensing)安装了温度传感器。
Python程序是一个名为ds18b20.py的文件。它很大程度上是对在同一Adafruit学习模块中找到的示例的重写。其内容是:
import logging
import os
import pathlib
import time
import requests
AWS_API_TOKEN = os.environ['AWS_API_TOKEN']
AWS_SENSOR_URL = os.environ['AWS_SENSOR_URL']
SENSOR_NAME = os.environ['SENSOR_NAME']
logging.getLogger(__name__)
def find_device_file():
base_directory = pathlib.Path('/sys/bus/w1/devices')
device_file = next(
base_directory.glob('28*/w1_slave')
)
return base_directory.joinpath(device_file)
def read_device_file(filename):
with open(filename, 'r') as device_file:
return device_file.read()
def compute_temperature(reading):
celsius = int(reading) / 1000
fahrenheit = celsius * 9 / 5 + 32
return celsius, fahrenheit
def send_temperature(celsius, fahrenheit):
header = {
'x-api-token': AWS_API_TOKEN
}
body = {
'sensor': SENSOR_NAME,
'timestamp': time.time(),
'celsius': celsius,
'fahrenheit': fahrenheit
}
response = requests.post(
AWS_SENSOR_URL,
json=body,
headers=header
)
if response.status_code != 200:
logging.warning(
'Status code {}'.format(response.status_code)
)
def main():
device_file = find_device_file()
while True:
data = read_device_file(device_file)
while 'YES' not in data:
time.sleep(1)
data = read_device_file(device_file)
else:
_, device_reading = data.split('t=')
celsius, fahrenheit = compute_temperature(device_reading)
send_temperature(celsius, fahrenheit)
time.sleep(60)
if name == '__main__':
main()
在运行程序之前,我们在Linux Shell中设置了三个环境变量。AWS_API_TOKEN是上一节中的API授权令牌。AWS_SENSOR_URL是Raspberry Pi向其发送请求的URL;这是上一节中提到的SensorURL。最后,SENSOR_NAME是我们分配给Raspberry Pi的名称。与往常一样,我们将VALUE替换为每个环境变量的实际值。
export AWS_API_TOKEN=VALUE
export AWS_SENSOR_URL=VALUE
export SENSOR_NAME=1
如果我们希望环境变量保持不变,则将它们添加到我们的.bashrc文件中。
echo export AWS_API_TOKEN=VALUE >> ~/.bashrc
echo export AWS_SENSOR_URL=VALUE >> ~/.bashrc
echo export SENSOR_NAME=1 >> ~/.bashrc
然后,我们运行程序。从现在开始,Raspberry Pi将每分钟一次将测量数据发送到AWS API Gateway端点。AWS EventBridge将以相同的频率从DynamoDB表检索数据,并将其发送到我们的Slack通道。
python3 ds18b20.py
(我们的程序需要第三方Python库requests。如果我们还没有安装它,那么我们可以通过运行来自Linux shell的sudo pip install requests
来安装它。在新的Raspberry Pi上,我们可能需要先运行sudo apt-get install python3-pip
。)
总结
我们的原型IoT项目集成了AWS,Slack和Raspberry Pi。它提供了一个示例,说明如何使用流行的产品和服务来构建自定义的、支持云计算的传感器系统。为了增强我们的原型,我们可以增加带有DS18B20温度传感器的Raspberry Pi的数量,或添加其他类型的传感器(例如湿度传感器),将Python程序转换为可安装的程序包,将Python程序作为Linux服务运行,以及创建传感器数据的可视化。创意是无限的!
原文链接