一个人的安全,那就总得想一些办法来优化一下自己的工作,这个时候就不得不思恋国内的waf的易操作性,这aws玩了个遍,waf这块就有点小不和谐,咱就说不能整一个一键操作嘛,直接开启连锁功能多好,哈哈,开玩笑,有点不严瑾啦(也有可能是我太菜不太会玩)。
因为业务都是global的,所以这被骚扰的频率就有点小高,总不能每次都自己上去分析然后block吧,所以有了下文,自动封禁。
这里实现全部采用terraform代码完成(terraform是一款比较好用的开源IaC工具。具体可以看:https://registry.terraform.io/providers/hashicorp/aws/latest/docs,这里就不做赘叙啦)。
实现原理主要基于扫描行为即访问频率控制,使用waf设置rate规则,1分钟访问超过200次(这个可以根据具体业务情况进行实际调整)即为异常流量。然后使用aws的lambda来处理事件,lambda主要做2件事件,提取IP,与block ip list进行对比,如果IP为新的IP,则存入S3以及更新waf的block list。
核心思路就是这样,然后lambda不会自动触发,需要再加一个规则让lambda持续触发,这里用的是aws的eventbridge。原理就讲到这里,直接上主体代码。
创建WAF规则
provider "aws" {
region =
"ap-northeast-1"
}
provider "aws" {
alias = "us-east-1"
region =
"us-east-1"
}
module "lambda_function" {
source = "./modules/waf/lambda_function"
function_name =
"custom-rbr-lambda-ceshi-stg-test"
role_arn = module.iam_role.role_arn
handler =
"index.lambda_handler"
runtime = "python3.9"
timeout = 10
ip_set_id_custom_v4 = aws_wafv2_ip_set.ipv4_ipset.id
ip_set_name_custom_v4 = aws_wafv2_ip_set.ipv4_ipset.name
ip_set_id_custom_v6 = aws_wafv2_ip_set.ipv6_ipset.id
ip_set_name_custom_v6 = aws_wafv2_ip_set.ipv6_ipset.name
config_log_bucket =
module.custom_rbr_log_bucket.bucket_name
config_log_key = "blocked_ips_list.json"
rate_rule_name = "rate-control"
web_acl_name =
"ceshi-stg-webacl-Sample-test"
web_acl_id =
aws_wafv2_web_acl.ceshi-stg-webacl.id
rate_rule_group_name = "rate-group-control"
custom_block_period = 10080
}
module "custom_rbr_log_bucket" {
source = "./modules/waf/s3_bucket"
bucket_name =
"custom-rbr-log-bucket-ceshi-stg-test"
}
module "iam_role" {
source = "./modules/waf/iam_role"
role_name =
"lambda_rbr_role-ceshi-stg-test"
}
module "eventbridge_rule" {
source = "./modules/waf/eventbridge"
rule_name =
"eventbridge-rule-ceshi-stg-test"
lambda_function_arn =
module.lambda_function.lambda_function_arn
lambda_function_name =
module.lambda_function.lambda_function_name
schedule_expression =
"rate(1 minute)"
}
# 创建ACL
resource "aws_wafv2_web_acl" "ceshi-stg-webacl"
{
provider = aws.us-east-1
name =
"ceshi-stg-webacl-Sample-test"
description =
"ceshi-stg-webacl-test"
scope = "CLOUDFRONT"
default_action {
allow {}
}
rule {
name = "rate-group-control"
priority = 1
override_action {
none {}
}
statement {
rule_group_reference_statement {
arn =
aws_wafv2_rule_group.rategroup.arn
}
}
visibility_config {
cloudwatch_metrics_enabled = true
metric_name =
"Sample-ceshi-stg-rate-control-group-test"
sampled_requests_enabled = true
}
}
rule {
name = "block-ip"
priority = 0
action {
allow {}
}
statement {
ip_set_reference_statement {
arn =
aws_wafv2_ip_set.ipv4_ipset.arn
}
}
visibility_config {
cloudwatch_metrics_enabled = true
metric_name =
"Sample-ceshi-stg-ip-block-test"
sampled_requests_enabled = true
}
}
visibility_config {
cloudwatch_metrics_enabled
= true
metric_name =
"Sample-ceshi-stg-webacl-test"
sampled_requests_enabled = true
}
}
# 创建规则组
resource "aws_wafv2_rule_group" "rategroup" {
provider = aws.us-east-1
name = "rate-group-control"
scope = "CLOUDFRONT"
capacity = 100
rule {
name = "rate-control"
priority = 0
action {
block {}
}
statement {
rate_based_statement{
limit = 200
evaluation_window_sec = 60
aggregate_key_type =
"IP"
}
}
visibility_config {
cloudwatch_metrics_enabled = true
metric_name = "rate-control-test"
sampled_requests_enabled = true
}
}
visibility_config {
cloudwatch_metrics_enabled = true
metric_name =
"rate-control-group-test"
sampled_requests_enabled = true
}
}
# 创建 IPv4 IPSet
resource "aws_wafv2_ip_set" "ipv4_ipset" {
name =
"ipv4-ipset-ceshi-stg-test"
scope = "CLOUDFRONT"
ip_address_version =
"IPV4"
addresses = []
provider = aws.us-east-1
}
# 创建 IPv6 IPSet
resource "aws_wafv2_ip_set" "ipv6_ipset" {
name =
"ipv6-ipset-ceshi-stg-test"
scope = "CLOUDFRONT"
ip_address_version =
"IPV6"
addresses = []
provider = aws.us-east-1
}
lambda函数创建
resource "aws_lambda_function"
"custom_rbr_lambda" {
provider = aws.us-east-1
function_name =
var.function_name
role = var.role_arn
handler = var.handler
runtime = var.runtime
timeout = var.timeout
environment {
variables = {
SCOPE = "CLOUDFRONT"
IP_SET_ID_CUSTOM_V4 = var.ip_set_id_custom_v4
IP_SET_NAME_CUSTOM_V4 =
var.ip_set_name_custom_v4
IP_SET_ID_CUSTOM_V6 = var.ip_set_id_custom_v6
IP_SET_NAME_CUSTOM_V6 =
var.ip_set_name_custom_v6
CONFIG_LOG_BUCKET = var.config_log_bucket
CONFIG_LOG_KEY = var.config_log_key
RATE_RULE_NAME = var.rate_rule_name
WEB_ACL_NAME = var.web_acl_name
WEB_ACL_ID = var.web_acl_id
RATE_RULE_GROUP_NAME =
var.rate_rule_group_name
CUSTOM_BLOCK_PERIOD = var.custom_block_period
}
}
source_code_hash =
filebase64sha256("modules/waf/lambda_function/lambda.zip")
filename =
"modules/waf/lambda_function/lambda.zip"
}
执行操作的函数文件代码如下。
import json
import boto3
import logging
import datetime
import os
region_name = 'us-east-1'
wafv2_client = boto3.client('wafv2', region_name = region_name)
s3_client = boto3.client('s3', region_name = region_name)
def update_custom_ipset_and_config(log, latest_ipv4_blocked_list,
latest_ipv6_blocked_list):
try:
# update the custom v4
IP set
ipv4_lock_token =
get_lock_token(
log, wafv2_client,
os.getenv('IP_SET_ID_CUSTOM_V4'),
os.getenv('IP_SET_NAME_CUSTOM_V4')
)
update_ip_set(
log, wafv2_client,
os.getenv('IP_SET_ID_CUSTOM_V4'),
list(latest_ipv4_blocked_list.keys()),
ipv4_lock_token,
os.getenv('IP_SET_NAME_CUSTOM_V4')
)
# update the custom v6
IP set
ipv6_lock_token =
get_lock_token(
log, wafv2_client,
os.getenv('IP_SET_ID_CUSTOM_V6'),
os.getenv('IP_SET_NAME_CUSTOM_V6')
)
update_ip_set(
log, wafv2_client,
os.getenv('IP_SET_ID_CUSTOM_V6'),
list(latest_ipv6_blocked_list.keys()),
ipv6_lock_token,
os.getenv('IP_SET_NAME_CUSTOM_V6')
)
except Exception as e:
# log error message
log.error("[update_custom_ipset_and_config] "
"Error
updating custom ipset.")
raise e
try:
# create json object
of the latest custom config
latest_custom_config =
{
'IPv4':
latest_ipv4_blocked_list,
'IPv6':
latest_ipv6_blocked_list
}
byte_latest_custom_config = json.dumps(latest_custom_config).encode()
# upload the config to
s3
s3_client.put_object(
Bucket=os.getenv('CONFIG_LOG_BUCKET'),
Body=byte_latest_custom_config,
Key=os.getenv('CONFIG_LOG_KEY')
)
log.error("[update_custom_ipset_and_config] "
"Config
uploaded to S3 successfully." + "---------------"
+str(latest_ipv4_blocked_list))
except Exception as e:
# log error message
log.error("[update_custom_ipset_and_config] "
"Error
uploading config to S3.")
raise e
def get_lock_token(log, wafv2_client, ip_set_id, name):
try:
ipv4_get_response =
wafv2_client.get_ip_set(
Scope=os.getenv('SCOPE'),
Name=name,
Id=ip_set_id
)
return
ipv4_get_response['LockToken']
except Exception as e:
log.error(f"Error
in get_lock_token: {e}")
raise
def update_ip_set(log, wafv2_client, ip_set_id, addresses,
lock_token,
name):
try:
wafv2_client.update_ip_set(
Scope=os.getenv('SCOPE'),
Name=name,
Id=ip_set_id,
Description='Last
Update: ' +
datetime.datetime.now(datetime.timezone.utc).strftime(
"%Y-%m-%d %H:%M:%S %Z%z"),
Addresses=addresses,
LockToken=lock_token
)
except Exception as e:
log.error("Error
in update_ip_set: {}".format(e))
raise
def sync_ip_from_rbr_to_custom_ipset(log, rbr_managed_ip_list,
custom_managed_ip_config):
# Get the current
timestamp in UTC format
utc_now_timestamp =
datetime.datetime.now(
datetime.timezone.utc)
# Convert the timestamp to
string
utc_now_timestamp_str =
utc_now_timestamp.strftime(
"%Y-%m-%d
%H:%M:%S %Z%z")
# Iterate over the managed
IPs in the RBR list
for managed_ip in
rbr_managed_ip_list:
# If the IP is already
in the custom IP config
if managed_ip in
custom_managed_ip_config.keys():
# Get the
timestamp when the IP was blocked in UTC format
utc_blocked_at =
datetime.datetime.strptime(
custom_managed_ip_config[managed_ip],
"%Y-%m-%d
%H:%M:%S %Z%z").astimezone(
datetime.timezone.utc)
# Calculate the
difference in minutes between now and when the IP
# was blocked
total_diff_min =
((utc_now_timestamp - utc_blocked_at)
.total_seconds()) / 60
# If the
difference is greater than block period, update the timestamp
if
round(total_diff_min) >= int(os.getenv('CUSTOM_BLOCK_PERIOD')):
custom_managed_ip_config[managed_ip] = utc_now_timestamp_str
# If the IP is not in
the custom IP config, add it with the current
# timestamp
else:
custom_managed_ip_config[managed_ip] = utc_now_timestamp_str
# Create a new dictionary to
store the latest blocked IPs
latest_ip_blocked_list =
{}
# Iterate over the custom
IP config
for blocked_ip,
blocked_at_str in custom_managed_ip_config.items():
# Get the timestamp
when the IP was blocked in UTC format
utc_blocked_at =
datetime.datetime.strptime(
custom_managed_ip_config[blocked_ip],
"%Y-%m-%d
%H:%M:%S %Z%z").astimezone(datetime.timezone.utc)
# Calculate the
difference in minutes between now and when the IP
# was blocked
total_diff_min =
((utc_now_timestamp - utc_blocked_at)
.total_seconds()) / 60
# If the difference is
less than the custom block period
#then add it to the
latest blocked IPs list
if
round(total_diff_min) < int(os.getenv('CUSTOM_BLOCK_PERIOD')):
latest_ip_blocked_list[blocked_ip] = blocked_at_str
return
latest_ip_blocked_list
def get_custom_config_file(log):
try:
# Get the custom
config file from S3
s3_response =
s3_client.get_object(
Bucket=os.getenv('CONFIG_LOG_BUCKET'),
Key=os.getenv('CONFIG_LOG_KEY')
)
# Load the custom
config file as a JSON object
custom_managed_ip_config = json.loads(
s3_response['Body'].read()
)
except Exception as e:
log.error("[get_custom_config_file] Error to get the custom config
"
"file
from S3")
log.error(e)
# If there is an
error, return an empty config
custom_managed_ip_config = {'IPv4': {}, 'IPv6': {}}
return
custom_managed_ip_config
def get_rbr_managed_ip_list(log, rule_name):
try:
# Get the list of IPs
blocked by the rate based rule
wafv2_response =
wafv2_client.get_rate_based_statement_managed_keys(
Scope=os.getenv('SCOPE'),
WebACLName=os.getenv('WEB_ACL_NAME'),
WebACLId=os.getenv('WEB_ACL_ID'),
RuleGroupRuleName=os.getenv('RATE_RULE_GROUP_NAME'),
RuleName=rule_name
)
return wafv2_response
except Exception as e:
log.error("[get_rbr_managed_ip_list] "
"Error
to get the list of IP blocked by rate based rule")
log.error(e)
# If there is an
error, raise the exception
raise e
def lambda_handler(event, context):
log = logging.getLogger()
try:
# Set Log Level
log.setLevel(logging.ERROR)
# Get the list of IP
blocked by rate based rule
ipv4_addresses = set()
ipv6_addresses = set()
for rule_name in
os.getenv('RATE_RULE_NAME').split(","):
rbr_managed_list =
get_rbr_managed_ip_list(log, rule_name.strip())
for ipv4_address
in rbr_managed_list['ManagedKeysIPV4']['Addresses']:
ipv4_addresses.add(ipv4_address)
for ipv6_address
in rbr_managed_list['ManagedKeysIPV6']['Addresses']:
ipv6_addresses.add(ipv6_address)
# Get custom config
file from S3
custom_managed_ip_config = get_custom_config_file(log)
# Update IP from rate
based rule list to custom list
latest_ipv4_blocked_list = sync_ip_from_rbr_to_custom_ipset(
log,
list(ipv4_addresses),
custom_managed_ip_config['IPv4'])
latest_ipv6_blocked_list = sync_ip_from_rbr_to_custom_ipset(
log,
list(ipv6_addresses),
custom_managed_ip_config['IPv6'])
# Update latest
blocked list to S3 and WAF IPset
update_custom_ipset_and_config(log, latest_ipv4_blocked_list,
latest_ipv6_blocked_list)
log.error(str(get_rbr_managed_ip_list(log, rule_name.strip())))
return {
'statusCode': 200,
'body':
json.dumps('Update Success!')
}
except Exception as e:
log.error(e)
return {
'statusCode': 500,
'body': e
}
创建eventbridge规则
并关联上面创建的lambda。
resource "aws_cloudwatch_event_rule"
"schedule_rule" {
provider = aws.us-east-1
name = var.rule_name
schedule_expression =
var.schedule_expression
}
resource "aws_cloudwatch_event_target"
"lambda_target" {
provider = aws.us-east-1
rule = aws_cloudwatch_event_rule.schedule_rule.name
target_id =
"custom_rbr_lambda"
arn = var.lambda_function_arn
}
resource "aws_lambda_permission"
"allow_eventbridge" {
provider = aws.us-east-1
statement_id = "AllowExecutionFromEventBridge"
action = "lambda:InvokeFunction"
function_name =
var.lambda_function_name
principal = "events.amazonaws.com"
source_arn =
aws_cloudwatch_event_rule.schedule_rule.arn
}
然后就是创建一个S3来存储blockIP。
resource "aws_s3_bucket" "custom_rbr_log_bucket" {
provider = aws.us-east-1
bucket = var.bucket_name
force_destroy = true
}
# 配置服务器端加密
resource "aws_s3_bucket_server_side_encryption_configuration" "custom_rbr_log_bucket_encryption" {
provider = aws.us-east-1
bucket = aws_s3_bucket.custom_rbr_log_bucket.id
rule {
apply_server_side_encryption_by_default {
sse_algorithm = "AES256"
}
}
}
# 设置公共访问阻止配置
resource "aws_s3_bucket_public_access_block" "custom_rbr_log_bucket_access_block" {
provider = aws.us-east-1
bucket = aws_s3_bucket.custom_rbr_log_bucket.id
block_public_acls = true
block_public_policy = true
ignore_public_acls = true
restrict_public_buckets = true
}
主要代码就是这些,当然其中还有一些涉及到aws资源之间访问的iam声明和定义,这里就不做赘叙。
原文始发于微信公众号(安全无界):AWS实现自动IP封禁
- 左青龙
- 微信扫一扫
-
- 右白虎
- 微信扫一扫
-
评论