DynamoDB简介
DynamoDB是Amazon提供的一种完全托管的 NoSQL 数据库服务,数据库的软硬件的管理和维护都由Amazon负责,相比传统数据库,免除了用户操作和扩展分布式数据库的管理工作负担,因而无需担心硬件预置、设置和配置、复制、软件修补或集群扩展等问题。DynamoDB具有以下特性:
- 高性能,可预测的性能,能够实现无缝扩展:自动将表的数据和流量分布到足够多的服务器,以满足吞吐量和存储需求,同时保持一致、快速的性能
- 高可用性:自动将数据在同一个地区(Region)的不同可用区(Availability Zone)进行复制
- 支持数据加密
- 备份功能:支持按需备份和按时间点备份
- 支持数据过期自动删除(TTL)
- 支持存储和检索任意量级的数据,并支持任何级别的请求流量
DynamoDB表的创建
DynamoDB表的创建有以下几种方式:
- AWS Cloud Formation:测试生产环境部署,方便CI/CD
- AWS Command Line Interface:开发环境
- AWS SDK:开发环境
- AWS Console:开发环境
实际开发过程中,测试环境和生产环境都是使用AWS Cloud Formation进行自动化部署,在开发环境会用到AWS Command Line Interface或者AWS Console进行表的创建,几乎没有使用过AWS SDK进行表的创建。AWS SDK主要应用为数据库的CRUD操作。
DynamoDB表的属性
- TableName:必要,表的名字在同一个地区(Region)内唯一
- AttributeDefinitions:必要,只需包含表和索引的分区键(Partition/Hash Key)和排序键(RangeKey)属性,其他属性无需在此定义
-
KeySchema:必要,指定表的分区键(Partition/Hash Key)和排序键(RangeKey)
- AttributeName:键的名称,必须在AttributeDefinitions中定义过
- KeyType:键的类型,可取值为:
-
HASH
- 分区键 -
RANGE
- 排序键
-
-
GlobalSecondaryIndexes:非必要,全局索引列表,表创建之后可以添加,最多可创建20个
- IndexName: 索引名称
- KeySchema: 索引的分区键(Partition/Hash Key)和排序键(RangeKey)
-
Projection:指定哪些属性需要从表中复制到索引中
- ProjectionType:类型,可取值为
-
KEYS_ONLY
- 只包含表和索引的分区键(Partition/Hash Key)和排序键(RangeKey) -
INCLUDE
- 除了包含表和索引的分区键(Partition/Hash Key)和排序键(RangeKey)之外,还包含NonKeyAttributes指定的非键属性 -
ALL
- 所有表属性
-
-
NonKeyAttributes: ProjectionType为
INCLUDE
时用于指定的非键属性,最多包含20个属性
- ProjectionType:类型,可取值为
-
ProvisionedThroughput:指定ReadCapacityUnits和WriteCapacityUnits
- ReadCapacityUnits: 数字,按需设置
- WriteCapacityUnits: 数字,按需设置
- ContributorInsightsSpecification:是否启用CloudWatch Contributor Insights
- Enabled: true/false
-
LocalSecondaryIndexes:非必要,局部索引列表,表创建之后不可添加,所以在创建表的时候要考虑好是否需要
- IndexName: 索引名称
- KeySchema: 索引的分区键(Partition/Hash Key)和排序键(RangeKey)
-
Projection:指定哪些属性需要从表中复制到索引中
- ProjectionType:类型,可取值为
-
KEYS_ONLY
- 只包含表和索引的分区键(Partition/Hash Key)和排序键(RangeKey) -
INCLUDE
- 除了包含表和索引的分区键(Partition/Hash Key)和排序键(RangeKey)之外,还包含NonKeyAttributes指定的非键属性 -
ALL
- 所有表属性
-
-
NonKeyAttributes: ProjectionType为
INCLUDE
时用于指定的非键属性,最多包含20个属性
- ProjectionType:类型,可取值为
-
TimeToLiveSpecification:非必要,设置是否开启TTL功能,并指定TTL属性
- AttributeName:TTL属性名称
- Enabled:true/false
-
BillingMode:计费方式,可取值为:
-
PROVISIONED
- 预留方式,设置为此方式时必须同时设置ProvisionedThroughput,适用于可预测负载的情况 -
PAY_PER_REQUEST
- 按需计费,适用于不可预测负载的情况
-
-
ProvisionedThroughput:指定ReadCapacityUnits和WriteCapacityUnits,
BillingMode
为PROVISIONED时必须设置
- ReadCapacityUnits: 数字,按需设置,
- WriteCapacityUnits: 数字,按需设置
-
PointInTimeRecoverySpecification:是否开启按时间点备份
- PointInTimeRecoveryEnabled: true/false
-
SSESpecification:是否开启服务端加密
- KMSMasterKeyId:加密秘钥id
- SSEEnabled:: true/false
-
SSEType:
加密类型,可取值为:KMS
-
KinesisStreamSpecification:指定Kinesis data stream的ARN
-
StreamArn:
String
-
StreamArn:
-
StreamSpecification:指定数据修改时有哪些信息需要写入stream
- StreamViewType:
可取值为:
-
KEYS_ONLY
- 只写入键属性 -
NEW_IMAGE
- 写入修改后的整条数据 -
OLD_IMAGE
- 写入修改前的整条数据 -
NEW_AND_OLD_IMAGES
- 修改前后的数据都写入
-
- StreamViewType:
- ContributorInsightsSpecification:是否启用CloudWatch Contributor Insights
- Enabled: true/false
- Tags:tag的列表
使用AWS Cloud Formation创建表
service: jessica-dto
provider:
name: aws
stage: develop
region: ap-southeast-1
stackName: ${self:provider.stage}-${self:service}
resources:
Resources:
TestTable:
Type: AWS::DynamoDB::Table
Properties:
TableName: ${self:provider.stage}.TestTableCloudFormation
AttributeDefinitions:
- AttributeName: pk
AttributeType: S
- AttributeName: sk
AttributeType: S
- AttributeName: gsiOnePk
AttributeType: S
- AttributeName: gsiOneSk
AttributeType: S
- AttributeName: gsiTwoPk
AttributeType: S
- AttributeName: gsiTwoSk
AttributeType: S
- AttributeName: lsiOneSk
AttributeType: S
- AttributeName: lsiTwoSk
AttributeType: S
KeySchema:
- AttributeName: pk
KeyType: HASH
- AttributeName: sk
KeyType: RANGE
ProvisionedThroughput:
ReadCapacityUnits: 1
WriteCapacityUnits: 1
BillingMode: PROVISIONED
LocalSecondaryIndexes:
- IndexName: test.lsiOne
KeySchema:
- AttributeName: pk
KeyType: HASH
- AttributeName: lsiOneSk
KeyType: RANGE
Projection:
ProjectionType: ALL
- IndexName: test.lsiTwo
KeySchema:
- AttributeName: pk
KeyType: HASH
- AttributeName: lsiTwoSk
KeyType: RANGE
Projection:
ProjectionType: ALL
GlobalSecondaryIndexes:
- IndexName: test.gsiOne
KeySchema:
- AttributeName: gsiOnePk
KeyType: HASH
- AttributeName: gsiOneSk
KeyType: RANGE
Projection:
ProjectionType: KEYS_ONLY
ProvisionedThroughput:
ReadCapacityUnits: 1
WriteCapacityUnits: 1
- IndexName: test.gsiTwo
KeySchema:
- AttributeName: gsiTwoPk
KeyType: HASH
- AttributeName: gsiTwoSk
KeyType: RANGE
Projection:
ProjectionType: KEYS_ONLY
ProvisionedThroughput:
ReadCapacityUnits: 1
WriteCapacityUnits: 1
TimeToLiveSpecification:
AttributeName: ttl
Enabled: true
PointInTimeRecoverySpecification:
PointInTimeRecoveryEnabled: true
SSESpecification:
SSEEnabled: false
ContributorInsightsSpecification:
Enabled: true
Tags:
- Key: product
Value: ${self:service}
使用sls deploy命令进行部署.
使用AWS Command Line Interface 创建表
执行命令aws dynamodb create-table 包含如下参数:
create-table --attribute-definitions <value> --table-name <value> --key-schema <value> [--local-secondary-indexes <value>] [--global-secondary-indexes <value>] [--billing-mode <value>] [--provisioned-throughput <value>] [--stream-specification <value>] [--sse-specification <value>] [--tags <value>] [--cli-input-json <value>] [--generate-cli-skeleton <value>]
aws cli不支持在创建表的同时设置TimeToLiveSpecification, PointInTimeRecoverySpecification, ContributorInsightsSpecification,只能在表创建之后通过更新来进行设置.
aws dynamodb create-table \
--table-name develop.TestTableCli \
--attribute-definitions \
AttributeName=pk,AttributeType=S \
AttributeName=sk,AttributeType=S \
AttributeName=gsiPk,AttributeType=S \
AttributeName=gsiSk,AttributeType=S \
AttributeName=gsiTwoPk,AttributeType=S \
AttributeName=gsiTwoSk,AttributeType=S \
AttributeName=lsiOneSk,AttributeType=S \
AttributeName=lsiTwoSk,AttributeType=S \
--key-schema \
AttributeName=pk,KeyType=HASH \
AttributeName=sk,KeyType=RANGE \
--billing-mode \
PROVISIONED \
--provisioned-throughput \
"{
\"ReadCapacityUnits\": 5,
\"WriteCapacityUnits\": 5
}" \
--local-secondary-indexes \
"[
{
\"IndexName\": \"test.lsiOne\",
\"KeySchema\": [{\"AttributeName\":\"pk\",\"KeyType\":\"HASH\"},
{\"AttributeName\":\"lsiOneSk\",\"KeyType\":\"RANGE\"}],
\"Projection\":{
\"ProjectionType\":\"ALL\"
}
},
{
\"IndexName\": \"test.lsiTwo\",
\"KeySchema\": [{\"AttributeName\":\"pk\",\"KeyType\":\"HASH\"},
{\"AttributeName\":\"lsiTwoSk\",\"KeyType\":\"RANGE\"}],
\"Projection\":{
\"ProjectionType\":\"ALL\"
}
}
]" \
--global-secondary-indexes \
"[
{
\"IndexName\": \"test.gsi\",
\"KeySchema\": [{\"AttributeName\":\"gsiPk\",\"KeyType\":\"HASH\"},
{\"AttributeName\":\"gsiSk\",\"KeyType\":\"RANGE\"}],
\"Projection\":{
\"ProjectionType\":\"ALL\"
},
\"ProvisionedThroughput\": {
\"ReadCapacityUnits\": 5,
\"WriteCapacityUnits\": 5
}
},
{
\"IndexName\": \"test.gsiTwo\",
\"KeySchema\": [{\"AttributeName\":\"gsiTwoPk\",\"KeyType\":\"HASH\"},
{\"AttributeName\":\"gsiTwoSk\",\"KeyType\":\"RANGE\"}],
\"Projection\":{
\"ProjectionType\":\"ALL\"
},
\"ProvisionedThroughput\": {
\"ReadCapacityUnits\": 5,
\"WriteCapacityUnits\": 5
}
}
]" \
--sse-specification \
Enabled=false \
--tags \
"[
{
\"Key\": \"product\",
\"Value\": \"social\"
},
{
\"Key\": \"Name\",
\"Value\": \"test.develop\"
}
]"
设置TimeToLiveSpecification
aws dynamodb update-time-to-live \
--table-name \
develop.TestTableCli \
--time-to-live-specification \
Enabled=true,AttributeName=ttl
设置PointInTimeRecoverySpecification
aws dynamodb update-continuous-backups \
--table-name \
develop.TestTableCli \
--point-in-time-recovery-specification \
PointInTimeRecoveryEnabled=true
设置ContributorInsightsSpecification
aws dynamodb update-contributor-insights \
--table-name \
develop.TestTableCli \
--contributor-insights-action ENABLE
使用AWS Java SDK 创建表
aws dynanodb java sdk不支持在创建表的同时设置TimeToLiveSpecification, PointInTimeRecoverySpecification, ContributorInsightsSpecification,只能在表创建之后通过更新来进行设置.
package com.jessica.dynamodb.table.create;
import java.util.ArrayList;
import java.util.List;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder;
import com.amazonaws.services.dynamodbv2.document.DynamoDB;
import com.amazonaws.services.dynamodbv2.document.Table;
import com.amazonaws.services.dynamodbv2.model.AttributeDefinition;
import com.amazonaws.services.dynamodbv2.model.BillingMode;
import com.amazonaws.services.dynamodbv2.model.ContributorInsightsAction;
import com.amazonaws.services.dynamodbv2.model.CreateTableRequest;
import com.amazonaws.services.dynamodbv2.model.GlobalSecondaryIndex;
import com.amazonaws.services.dynamodbv2.model.KeySchemaElement;
import com.amazonaws.services.dynamodbv2.model.KeyType;
import com.amazonaws.services.dynamodbv2.model.LocalSecondaryIndex;
import com.amazonaws.services.dynamodbv2.model.PointInTimeRecoverySpecification;
import com.amazonaws.services.dynamodbv2.model.Projection;
import com.amazonaws.services.dynamodbv2.model.ProjectionType;
import com.amazonaws.services.dynamodbv2.model.ProvisionedThroughput;
import com.amazonaws.services.dynamodbv2.model.SSESpecification;
import com.amazonaws.services.dynamodbv2.model.Tag;
import com.amazonaws.services.dynamodbv2.model.TimeToLiveSpecification;
import com.amazonaws.services.dynamodbv2.model.UpdateContinuousBackupsRequest;
import com.amazonaws.services.dynamodbv2.model.UpdateContributorInsightsRequest;
import com.amazonaws.services.dynamodbv2.model.UpdateTimeToLiveRequest;
import com.jessica.dynamodb.constant.DynamoDBConstant;
public class CreateTable {
static AmazonDynamoDB dynamoDBClient = AmazonDynamoDBClientBuilder.standard().build();
static String tableName = "develop.TestTableJava";
static DynamoDB dynamoDB = new DynamoDB(dynamoDBClient);
public static void main(String[] args) throws Exception {
createTable();
updateTableTTL();
updateTableBackeUpPolicy();
updateTableContributorInsights();
}
static void createTable() {
try {
// attributes
List<AttributeDefinition> attributeDefinitions = new ArrayList<AttributeDefinition>();
attributeDefinitions
.add(new AttributeDefinition().withAttributeName(DynamoDBConstant.HASH_KEY).withAttributeType("S"));
attributeDefinitions.add(
new AttributeDefinition().withAttributeName(DynamoDBConstant.RANGE_KEY).withAttributeType("S"));
attributeDefinitions.add(new AttributeDefinition().withAttributeName(DynamoDBConstant.GSI_ONE_HASH_KEY)
.withAttributeType("S"));
attributeDefinitions.add(new AttributeDefinition().withAttributeName(DynamoDBConstant.GSI_ONE_RANGE_KEY)
.withAttributeType("S"));
attributeDefinitions.add(new AttributeDefinition().withAttributeName(DynamoDBConstant.GSI_TWO_HASH_KEY)
.withAttributeType("S"));
attributeDefinitions.add(new AttributeDefinition().withAttributeName(DynamoDBConstant.GSI_TWO_RANGE_KEY)
.withAttributeType("S"));
attributeDefinitions.add(new AttributeDefinition().withAttributeName(DynamoDBConstant.LSI_ONE_RANGE_KEY)
.withAttributeType("S"));
attributeDefinitions.add(new AttributeDefinition().withAttributeName(DynamoDBConstant.LSI_TWO_RANGE_KEY)
.withAttributeType("S"));
// table key schema
List<KeySchemaElement> keySchema = new ArrayList<KeySchemaElement>();
keySchema
.add(new KeySchemaElement().withAttributeName(DynamoDBConstant.HASH_KEY).withKeyType(KeyType.HASH)); // partition
// key
keySchema.add(
new KeySchemaElement().withAttributeName(DynamoDBConstant.RANGE_KEY).withKeyType(KeyType.RANGE)); // range
// key
// provisioned Throughput
ProvisionedThroughput provisionedThroughput = new ProvisionedThroughput().withReadCapacityUnits(1L)
.withWriteCapacityUnits(1L);
// local secondary index
List<LocalSecondaryIndex> localSecondaryIndexs = new ArrayList<>();
List<KeySchemaElement> lsiOneKeySchema = new ArrayList<KeySchemaElement>();
lsiOneKeySchema.add(new KeySchemaElement().withAttributeName(DynamoDBConstant.HASH_KEY)
.withKeyType(KeyType.HASH));
lsiOneKeySchema.add(new KeySchemaElement().withAttributeName(DynamoDBConstant.LSI_ONE_RANGE_KEY)
.withKeyType(KeyType.RANGE));
List<KeySchemaElement> lsiTwoKeySchema = new ArrayList<KeySchemaElement>();
lsiTwoKeySchema.add(new KeySchemaElement().withAttributeName(DynamoDBConstant.HASH_KEY)
.withKeyType(KeyType.HASH));
lsiTwoKeySchema.add(new KeySchemaElement().withAttributeName(DynamoDBConstant.LSI_TWO_RANGE_KEY)
.withKeyType(KeyType.RANGE));
localSecondaryIndexs.add(new LocalSecondaryIndex().withIndexName(DynamoDBConstant.LSI_ONE_NAME)
.withKeySchema(lsiOneKeySchema)
.withProjection(new Projection().withProjectionType(ProjectionType.KEYS_ONLY)));
localSecondaryIndexs.add(new LocalSecondaryIndex().withIndexName(DynamoDBConstant.LSI_TWO_NAME)
.withKeySchema(lsiTwoKeySchema)
.withProjection(new Projection().withProjectionType(ProjectionType.KEYS_ONLY)));
// global secondary index
List<GlobalSecondaryIndex> globalSecondaryIndexs = new ArrayList<>();
List<KeySchemaElement> gsiOneKeySchema = new ArrayList<KeySchemaElement>();
gsiOneKeySchema.add(new KeySchemaElement().withAttributeName(DynamoDBConstant.GSI_ONE_HASH_KEY)
.withKeyType(KeyType.HASH));
gsiOneKeySchema.add(new KeySchemaElement().withAttributeName(DynamoDBConstant.GSI_ONE_RANGE_KEY)
.withKeyType(KeyType.RANGE));
List<KeySchemaElement> gsiTwoKeySchema = new ArrayList<KeySchemaElement>();
gsiTwoKeySchema.add(new KeySchemaElement().withAttributeName(DynamoDBConstant.GSI_TWO_HASH_KEY)
.withKeyType(KeyType.HASH));
gsiTwoKeySchema.add(new KeySchemaElement().withAttributeName(DynamoDBConstant.GSI_TWO_RANGE_KEY)
.withKeyType(KeyType.RANGE));
globalSecondaryIndexs.add(new GlobalSecondaryIndex().withIndexName(DynamoDBConstant.GSI_ONE_NAME)
.withKeySchema(gsiOneKeySchema)
.withProjection(new Projection().withProjectionType(ProjectionType.KEYS_ONLY))
.withProvisionedThroughput(provisionedThroughput));
globalSecondaryIndexs.add(new GlobalSecondaryIndex().withIndexName(DynamoDBConstant.GSI_TWO_NAME)
.withKeySchema(gsiTwoKeySchema)
.withProjection(new Projection().withProjectionType(ProjectionType.KEYS_ONLY))
.withProvisionedThroughput(provisionedThroughput));
SSESpecification sseSpecification = new SSESpecification().withEnabled(false);
Tag tag = new Tag().withKey("product").withValue("create-table");
CreateTableRequest request = new CreateTableRequest()
.withTableName(tableName)
.withAttributeDefinitions(attributeDefinitions)
.withKeySchema(keySchema)
.withBillingMode(BillingMode.PROVISIONED)
.withProvisionedThroughput(provisionedThroughput)
.withLocalSecondaryIndexes(localSecondaryIndexs)
.withGlobalSecondaryIndexes(globalSecondaryIndexs)
.withSSESpecification(sseSpecification)
.withTags(tag);
System.out.println("Issuing CreateTable request for " + tableName);
Table table = dynamoDB.createTable(request);
System.out.println("Waiting for " + tableName + " to be created...this may take a while...");
table.waitForActive();
System.out.println("Create " + tableName + " success");
} catch (Exception e) {
System.err.println("CreateTable request failed for " + tableName);
System.err.println(e.getMessage());
}
}
static void updateTableTTL() {
TimeToLiveSpecification timeToLiveSpecification = new TimeToLiveSpecification().withAttributeName("ttl")
.withEnabled(true);
UpdateTimeToLiveRequest updateTimeToLiveRequest = new UpdateTimeToLiveRequest().withTableName(tableName)
.withTimeToLiveSpecification(timeToLiveSpecification);
System.out.println("Update time to live for " + tableName);
try {
dynamoDBClient.updateTimeToLive(updateTimeToLiveRequest);
System.out.println("Waiting for " + tableName + " to update TimeToLive");
dynamoDB.getTable(tableName).waitForActive();
System.out.println("Update " + tableName + " TimeToLive success");
} catch (Exception e) {
System.err.println("Update time to live request failed for " + tableName);
System.err.println(e.getMessage());
}
}
static void updateTableBackeUpPolicy() {
PointInTimeRecoverySpecification pointInTimeRecoverySpecification = new PointInTimeRecoverySpecification()
.withPointInTimeRecoveryEnabled(true);
UpdateContinuousBackupsRequest updateContinuousBackupsRequest = new UpdateContinuousBackupsRequest()
.withTableName(tableName).withPointInTimeRecoverySpecification(pointInTimeRecoverySpecification);
try {
System.out.println("Update table back up policy for " + tableName);
dynamoDBClient.updateContinuousBackups(updateContinuousBackupsRequest);
System.out.println("Waiting for " + tableName + " to update ContinuousBackups");
dynamoDB.getTable(tableName).waitForActive();
System.out.println("Update " + tableName + " ContinuousBackups success");
} catch (Exception e) {
System.err.println("Update table back up policy failed for " + tableName);
System.err.println(e.getMessage());
}
}
static void updateTableContributorInsights() {
UpdateContributorInsightsRequest updateContributorInsightsRequest = new UpdateContributorInsightsRequest()
.withTableName(tableName).withContributorInsightsAction(ContributorInsightsAction.ENABLE);
try {
System.out.println("Update table contributor insights for " + tableName);
dynamoDBClient.updateContributorInsights(updateContributorInsightsRequest);
System.out.println("Waiting for " + tableName + " to update ContributorInsights");
dynamoDB.getTable(tableName).waitForActive();
System.out.println("Update " + tableName + " ContributorInsights success");
} catch (Exception e) {
System.err.println("Update table contributor insights failed for " + tableName);
System.err.println(e.getMessage());
}
}
}
完整代码
https://github.com/JessicaWin/dynamodb-in-action