sh-4.2$ trap 'printf "\n"' DEBUG
sh-4.2$ export PS1="\n[\u@\h \W] $ "
[ssm-user@ip-10-0-0-43 ~] $ aws configservice put-configuration-recorder \
> --recording-group allSupported=false,includeGlobalResourceTypes=\
> false,resourceTypes=AWS::S3::Bucket \
> --configuration-recorder name=default,roleARN=arn:aws:iam::082574611135:role/ConfigRole
[ssm-user@ip-10-0-0-43 ~] $ aws configservice put-delivery-channel \
> --delivery-channel configSnapshotDeliveryProperties=\
> {deliveryFrequency=Twelve_Hours},name=default,\
> s3BucketName=qls-5241363-b1e1d6c90a2a0d1b-configbucket-1wccjqhy4gzkt,\
> snsTopicARN=arn:aws:sns:us-east-1:082574611135:qls-5241363-b1e1d6c90a2a0d1b-ConfigSNSTopic-WNIXYOSCP50T
[ssm-user@ip-10-0-0-43 ~] $ aws configservice start-configuration-recorder --configuration-recorder-name default
[ssm-user@ip-10-0-0-43 ~] $ cat <<EOF > S3ProhibitPublicReadAccess.json
> {
> "ConfigRuleName": "S3PublicReadProhibited",
> "Description": "Checks that your S3 buckets do not allow public read access. If an S3 bucket policy or bucket ACL allows public read access, the bucket is noncompliant.",
> "Scope": {
> "ComplianceResourceTypes": [
> "AWS::S3::Bucket"
> ]
> },
> "Source": {
> "Owner": "AWS",
> "SourceIdentifier": "S3_BUCKET_PUBLIC_READ_PROHIBITED"
> }
> }
> EOF
[ssm-user@ip-10-0-0-43 ~] $ cat <<EOF > S3ProhibitPublicWriteAccess.json
> {
> "ConfigRuleName": "S3PublicWriteProhibited",
> "Description": "Checks that your S3 buckets do not allow public write access. If an S3 bucket policy or bucket ACL allows public write access, the bucket is noncompliant.",
> "Scope": {
> "ComplianceResourceTypes": [
> "AWS::S3::Bucket"
> ]
> },
> "Source": {
> "Owner": "AWS",
> "SourceIdentifier": "S3_BUCKET_PUBLIC_WRITE_PROHIBITED"
> }
> }
> EOF
[ssm-user@ip-10-0-0-43 ~] $
[ssm-user@ip-10-0-0-43 ~] $
[ssm-user@ip-10-0-0-43 ~] $ aws configservice put-config-rule --config-rule file://S3ProhibitPublicReadAccess.json
[ssm-user@ip-10-0-0-43 ~] $ aws configservice put-config-rule --config-rule file://S3ProhibitPublicWriteAccess.json
[ssm-user@ip-10-0-0-43 ~] $
[ssm-user@ip-10-0-0-43 ~] $ cat lambda_function.py
import boto3
from botocore.exceptions import ClientError
import json
import os
# Annotation strings that lambda_handler compares against the "annotation"
# field of the incoming evaluation to pick a remediation. They are matched
# with ==, so every character (including the trailing period) is significant.
ACL_RD_WARNING = "The S3 bucket ACL allows public read access."
PLCY_RD_WARNING = "The S3 bucket policy allows public read access."
ACL_WRT_WARNING = "The S3 bucket ACL allows public write access."
PLCY_WRT_WARNING = "The S3 bucket policy allows public write access."
# Combined forms: the ACL sentence immediately followed by the policy
# sentence, with no separator between them.
# NOTE(review): confirm this is the exact annotation emitted when both the ACL
# and the policy are public; a single space of difference would stop the
# combined branch from ever matching.
RD_COMBO_WARNING = ACL_RD_WARNING + PLCY_RD_WARNING
WRT_COMBO_WARNING = ACL_WRT_WARNING + PLCY_WRT_WARNING
def policyNotifier(bucketName, s3client):
    """Send an SNS alert asking for a review of a bucket's policy.

    Looks up the bucket policy for ``bucketName`` and, when one exists,
    publishes it to the SNS topic named by the ``TOPIC_ARN`` environment
    variable. This is best-effort: AWS ``ClientError`` failures are logged
    and swallowed so the caller's remaining work is not interrupted.

    Args:
        bucketName: Name of the S3 bucket whose policy should be reviewed.
        s3client: Object exposing ``get_bucket_policy(Bucket=...)``; the
            handler in this file passes a boto3 S3 client.

    Returns:
        None.

    Raises:
        KeyError: If a policy was found but ``TOPIC_ARN`` is not set in the
            environment.
    """
    # Only the policy lookup lives in this try block, so that an SNS failure
    # is never misreported as "no bucket policy".
    try:
        bucketPolicy = s3client.get_bucket_policy(Bucket=bucketName)
    except ClientError as e:
        errorCode = e.response.get("Error", {}).get("Code")
        if errorCode == "NoSuchBucketPolicy":
            # error caught due to no bucket policy
            print("No bucket policy found; no alert sent.")
        else:
            print("Could not read bucket policy ({}); no alert sent.".format(e))
        return

    # notify that the bucket policy may need to be reviewed due to security concerns
    sns = boto3.client("sns")
    subject = "Potential compliance violation in " + bucketName + " bucket policy"
    # SNS rejects subjects of 100 or more characters; the fixed text plus a
    # long bucket name can exceed that, so cap the subject instead of losing
    # the whole alert.
    subject = subject[:99]
    # NOTE(review): boto3 documents "Policy" as a JSON string, so json.dumps
    # re-encodes it as a quoted, escaped string. Kept to preserve the existing
    # message format for whoever consumes these alerts.
    message = "Potential bucket policy compliance violation. Please review: " + json.dumps(bucketPolicy["Policy"])

    # send SNS message with warning and bucket policy
    try:
        sns.publish(
            TopicArn=os.environ["TOPIC_ARN"],
            Subject=subject,
            Message=message,
        )
    except ClientError as e:
        print("Failed to publish bucket policy alert: {}".format(e))
def checkBucketTags(tags):
    """Report whether a tag set marks the bucket as allowed to be public.

    Args:
        tags: Iterable of tag dicts carrying "Key" and "Value" entries (the
            handler in this file passes the "TagSet" list returned by
            ``get_bucket_tagging``). ``None`` or an empty iterable is
            treated as "no tags".

    Returns:
        True when the first tag whose key is "CanBePublic" has the value
        "1"; False when that tag has any other value or is absent.
    """
    print("Tags: {}".format(tags))
    for tag in tags or ():
        print(tag)
        # The first CanBePublic tag decides the outcome; later ones are not
        # consulted.
        if tag.get("Key") == "CanBePublic":
            return tag.get("Value") == "1"
    # No CanBePublic tag at all: the bucket is not exempt.
    return False
def lambda_handler(event, context):
    """Remediate a public-access finding reported for an S3 bucket.

    Reads the first entry of ``event["detail"]["requestParameters"]
    ["evaluations"]``, takes its ``complianceResourceId`` as the bucket name
    and its ``annotation`` as the reason for the finding. Unless the bucket
    carries the tag ``CanBePublic`` = "1", the handler then:

    * resets the bucket ACL to "private" when the annotation is one of the
      ACL warnings,
    * calls ``policyNotifier`` when it is one of the policy warnings,
    * does both (ACL first) when it is one of the combined warnings.

    Any other annotation, or a missing one, results in no action.

    Args:
        event: Dict with the nested structure described above.
        context: Unused.

    Returns:
        0, always.

    Raises:
        ClientError: If reading the bucket tags fails for any reason other
            than the bucket having no tag set, or if resetting the ACL fails.
    """
    # instantiate Amazon S3 client
    s3 = boto3.client("s3")
    print("{}".format(event))

    evaluation = list(event["detail"]["requestParameters"]["evaluations"])[0]
    bucketName = evaluation["complianceResourceId"]

    # get_bucket_tagging signals "bucket has no tags" by raising ClientError
    # with code NoSuchTagSet rather than returning None. An untagged bucket
    # has no CanBePublic exemption, so treat it as an empty tag set and carry
    # on with remediation instead of crashing.
    try:
        bucketTags = s3.get_bucket_tagging(Bucket=bucketName)
    except ClientError as e:
        if e.response.get("Error", {}).get("Code") != "NoSuchTagSet":
            raise
        bucketTags = None

    tagSet = bucketTags.get("TagSet", []) if bucketTags is not None else []
    shouldBePublic = checkBucketTags(tagSet)

    if not shouldBePublic:
        complianceFailure = evaluation.get("annotation")
        aclWarnings = (ACL_RD_WARNING, ACL_WRT_WARNING)
        policyWarnings = (PLCY_RD_WARNING, PLCY_WRT_WARNING)
        comboWarnings = (RD_COMBO_WARNING, WRT_COMBO_WARNING)

        if complianceFailure in aclWarnings or complianceFailure in comboWarnings:
            s3.put_bucket_acl(Bucket=bucketName, ACL="private")
        if complianceFailure in policyWarnings or complianceFailure in comboWarnings:
            policyNotifier(bucketName, s3)

    return 0  # done