Monitor and Respond with AWS Config

sh-4.2$ trap 'printf "\n"' DEBUG
sh-4.2$ export PS1="\n[\u@\h \W] $ "


[ssm-user@ip-10-0-0-43 ~] $ aws configservice put-configuration-recorder \
> --recording-group allSupported=false,includeGlobalResourceTypes=\
> false,resourceTypes=AWS::S3::Bucket \
> --configuration-recorder name=default,roleARN=arn:aws:iam::082574611135:role/ConfigRole


[ssm-user@ip-10-0-0-43 ~] $ aws configservice put-delivery-channel \
> --delivery-channel configSnapshotDeliveryProperties=\
> {deliveryFrequency=Twelve_Hours},name=default,\
> s3BucketName=qls-5241363-b1e1d6c90a2a0d1b-configbucket-1wccjqhy4gzkt,\
> snsTopicARN=arn:aws:sns:us-east-1:082574611135:qls-5241363-b1e1d6c90a2a0d1b-ConfigSNSTopic-WNIXYOSCP50T


[ssm-user@ip-10-0-0-43 ~] $ aws configservice start-configuration-recorder --configuration-recorder-name default


[ssm-user@ip-10-0-0-43 ~] $ cat <<EOF > S3ProhibitPublicReadAccess.json
> {
>   "ConfigRuleName": "S3PublicReadProhibited",
>   "Description": "Checks that your S3 buckets do not allow public read access. If an S3 bucket policy or bucket ACL allows public read access, the bucket is noncompliant.",
>   "Scope": {
>     "ComplianceResourceTypes": [
>       "AWS::S3::Bucket"
>     ]
>   },
>   "Source": {
>     "Owner": "AWS",
>     "SourceIdentifier": "S3_BUCKET_PUBLIC_READ_PROHIBITED"
>   }
> }
> EOF


[ssm-user@ip-10-0-0-43 ~] $ cat <<EOF > S3ProhibitPublicWriteAccess.json
> {
>   "ConfigRuleName": "S3PublicWriteProhibited",
>   "Description": "Checks that your S3 buckets do not allow public write access. If an S3 bucket policy or bucket ACL allows public write access, the bucket is noncompliant.",
>   "Scope": {
>     "ComplianceResourceTypes": [
>       "AWS::S3::Bucket"
>     ]
>   },
>   "Source": {
>     "Owner": "AWS",
>     "SourceIdentifier": "S3_BUCKET_PUBLIC_WRITE_PROHIBITED"
>   }
> }
> EOF


[ssm-user@ip-10-0-0-43 ~] $

[ssm-user@ip-10-0-0-43 ~] $

[ssm-user@ip-10-0-0-43 ~] $ aws configservice put-config-rule --config-rule file://S3ProhibitPublicReadAccess.json


[ssm-user@ip-10-0-0-43 ~] $ aws configservice put-config-rule --config-rule file://S3ProhibitPublicWriteAccess.json



[ssm-user@ip-10-0-0-43 ~] $

[ssm-user@ip-10-0-0-43 ~] $ cat lambda_function.py

import boto3
from botocore.exceptions import ClientError
import json
import os

# Annotation texts that lambda_handler compares, by exact string equality,
# against the "annotation" field of the evaluation it reads from the event.
# ACL_* values trigger an ACL reset; PLCY_* values trigger a policy notification.
ACL_RD_WARNING = "The S3 bucket ACL allows public read access."
PLCY_RD_WARNING = "The S3 bucket policy allows public read access."
ACL_WRT_WARNING = "The S3 bucket ACL allows public write access."
PLCY_WRT_WARNING = "The S3 bucket policy allows public write access."
# Combined forms: the ACL sentence immediately followed by the policy sentence.
# NOTE(review): the two sentences are concatenated with no separator (no space
# after the first period) - confirm this matches the annotation text that is
# actually delivered in the event.
RD_COMBO_WARNING = ACL_RD_WARNING + PLCY_RD_WARNING
WRT_COMBO_WARNING = ACL_WRT_WARNING + PLCY_WRT_WARNING

def policyNotifier(bucketName, s3client):
    """Publish an SNS alert asking for a review of a bucket's policy.

    Fetches the bucket policy through ``s3client`` and publishes it to the
    topic named by the ``TOPIC_ARN`` environment variable.

    Args:
        bucketName: Name of the S3 bucket whose policy should be reviewed.
        s3client: S3 client object exposing ``get_bucket_policy``.

    Returns:
        None. ``ClientError`` failures are logged with ``print`` and never
        propagate to the caller.

    Raises:
        KeyError: If the ``TOPIC_ARN`` environment variable is not set.
    """
    # Only the policy lookup is guarded here, so that "no policy" is not
    # confused with unrelated failures such as a rejected SNS publish.
    try:
        bucketPolicy = s3client.get_bucket_policy(Bucket = bucketName)
    except ClientError as e:
        if e.response.get("Error", {}).get("Code") == "NoSuchBucketPolicy":
            # error caught due to no bucket policy
            print("No bucket policy found; no alert sent.")
        else:
            print("Unable to read bucket policy for {}; no alert sent: {}".format(bucketName, e))
        return

    # notify that the bucket policy may need to be reviewed due to security concerns
    subject = "Potential compliance violation in " + bucketName + " bucket policy"
    if len(subject) >= 100:
        # SNS rejects subjects of 100 characters or more; long bucket names
        # would otherwise make the publish call fail.
        subject = subject[:96] + "..."
    # NOTE(review): "Policy" is presumably already a JSON string, so json.dumps
    # wraps it in an extra layer of quoting - kept as-is to preserve the message.
    message = "Potential bucket policy compliance violation. Please review: " + json.dumps(bucketPolicy["Policy"])

    # Resolve the topic before the try block so a missing variable surfaces
    # as KeyError rather than being mistaken for a service error.
    topicArn = os.environ["TOPIC_ARN"]
    try:
        # send SNS message with warning and bucket policy
        sns = boto3.client("sns")
        sns.publish(
            TopicArn = topicArn,
            Subject = subject,
            Message = message
        )
    except ClientError as e:
        print("Failed to publish policy alert for {}: {}".format(bucketName, e))

def checkBucketTags(tags):
    """Report whether a tag set marks the bucket as allowed to be public.

    Scans ``tags`` in order and stops at the first entry whose "Key" is
    "CanBePublic"; the result is True only when that entry's "Value" is the
    string "1". Returns False when no such entry exists. The tag set and each
    visited entry are printed for logging.
    """
    print("Tags: {}".format(tags))
    allowed = False
    for entry in tags:
        print(entry)
        if entry["Key"] != "CanBePublic":
            continue
        # The first matching key decides the outcome; later entries are ignored.
        allowed = entry["Value"] == "1"
        break
    return allowed

def lambda_handler(event, context):
    """Remediate or report a public-access finding for a single S3 bucket.

    Reads the first entry of
    ``event["detail"]["requestParameters"]["evaluations"]``, takes the bucket
    name from its "complianceResourceId", and unless the bucket is tagged
    ``CanBePublic = 1`` acts on the entry's "annotation":

    * ACL warning      -> reset the bucket ACL to "private"
    * policy warning   -> send a policy-review notification
    * combined warning -> do both

    A bucket with no tag set is treated as not allowed to be public. An
    evaluation without an "annotation", or with an unrecognized one, results
    in no action.

    Args:
        event: Invocation event carrying the evaluation described above.
        context: Lambda context object (unused).

    Returns:
        0 in all non-error cases.

    Raises:
        ClientError: If reading the bucket tags fails for any reason other
            than the tag set not existing, or if resetting the ACL fails.
    """
    # instantiate Amazon S3 client
    s3 = boto3.client("s3")
    print("{}".format(event))
    evaluation = event["detail"]["requestParameters"]["evaluations"][0]
    bucketName = evaluation["complianceResourceId"]

    # get_bucket_tagging raises (rather than returning None) when the bucket
    # has no tags; such a bucket carries no CanBePublic exemption.
    try:
        tagSet = s3.get_bucket_tagging(Bucket=bucketName)["TagSet"]
    except ClientError as e:
        if e.response.get("Error", {}).get("Code") != "NoSuchTagSet":
            raise
        tagSet = []

    if checkBucketTags(tagSet):
        return 0  # bucket is explicitly allowed to be public

    complianceFailure = evaluation.get("annotation")
    if complianceFailure in (ACL_RD_WARNING, ACL_WRT_WARNING):
        s3.put_bucket_acl(Bucket = bucketName, ACL = "private")
    elif complianceFailure in (PLCY_RD_WARNING, PLCY_WRT_WARNING):
        policyNotifier(bucketName, s3)
    elif complianceFailure in (RD_COMBO_WARNING, WRT_COMBO_WARNING):
        s3.put_bucket_acl(Bucket = bucketName, ACL = "private")
        policyNotifier(bucketName, s3)
    return 0  # done
上一篇:Linux基础(练习三)


下一篇:思科网络配置-ACL