AWS实现自动IP封禁

admin 2025年4月28日09:26:20评论2 views字数 13078阅读43分35秒阅读模式

一个人的安全,那就总得想一些办法来优化一下自己的工作,这个时候就不得不思恋国内的waf的易操作性,这aws玩了个遍,waf这块就有点小不和谐,咱就说不能整一个一键操作嘛,直接开启连锁功能多好,哈哈,开玩笑,有点不严瑾啦(也有可能是我太菜不太会玩)。

AWS实现自动IP封禁

因为业务都是global的,所以这被骚扰的频率就有点小高,总不能每次都自己上去分析然后block吧,所以有了下文,自动封禁。

AWS实现自动IP封禁

这里实现全部采用terraform代码完成(terraform是一款比较好用的开源IaC工具。具体可以看:https://registry.terraform.io/providers/hashicorp/aws/latest/docs,这里就不做赘叙啦)。

实现原理主要基于扫描行为即访问频率控制,使用waf设置rate规则,1分钟访问超过200次(这个可以根据具体业务情况进行实际调整)即为异常流量。然后使用awslambda来处理事件,lambda主要做2件事件,提取IP,与block ip list进行对比,如果IP为新的IP,则存入S3以及更新wafblock list

核心思路就是这样,然后lambda不会自动触发,需要再加一个规则让lambda持续触发,这里用的是awseventbridge。原理就讲到这里,直接上主体代码。

创建WAF规则

provider "aws" {    region ="ap-northeast-1"}provider "aws" {    alias  = "us-east-1"   region ="us-east-1"}module "lambda_function" {  source                    = "./modules/waf/lambda_function"  function_name             ="custom-rbr-lambda-ceshi-stg-test"  role_arn                  = module.iam_role.role_arn  handler                   ="index.lambda_handler"  runtime                   = "python3.9"  timeout                   = 10  ip_set_id_custom_v4      = aws_wafv2_ip_set.ipv4_ipset.id  ip_set_name_custom_v4     = aws_wafv2_ip_set.ipv4_ipset.name  ip_set_id_custom_v6      = aws_wafv2_ip_set.ipv6_ipset.id  ip_set_name_custom_v6     = aws_wafv2_ip_set.ipv6_ipset.name  config_log_bucket         =module.custom_rbr_log_bucket.bucket_name  config_log_key            = "blocked_ips_list.json"  rate_rule_name            = "rate-control"  web_acl_name              ="ceshi-stg-webacl-Sample-test"  web_acl_id                =aws_wafv2_web_acl.ceshi-stg-webacl.id  rate_rule_group_name      = "rate-group-control"  custom_block_period       = 10080}module "custom_rbr_log_bucket" {  source     = "./modules/waf/s3_bucket"  bucket_name ="custom-rbr-log-bucket-ceshi-stg-test"}module "iam_role" {  source    = "./modules/waf/iam_role"  role_name ="lambda_rbr_role-ceshi-stg-test"}module "eventbridge_rule" {  source              = "./modules/waf/eventbridge"  rule_name           ="eventbridge-rule-ceshi-stg-test"  lambda_function_arn =module.lambda_function.lambda_function_arn  lambda_function_name =module.lambda_function.lambda_function_name  schedule_expression ="rate(1 minute)"}# 创建ACLresource "aws_wafv2_web_acl" "ceshi-stg-webacl"{  provider = aws.us-east-1  name        ="ceshi-stg-webacl-Sample-test"  description ="ceshi-stg-webacl-test"  scope       = "CLOUDFRONT"  default_action {    allow {}    }   rule {    name     = "rate-group-control"    priority = 1    override_action {         none {}     }    statement {rule_group_reference_statement {           arn =aws_wafv2_rule_group.rategroup.arn      }    }    visibility_config {cloudwatch_metrics_enabled = true      metric_name                ="Sample-ceshi-stg-rate-control-group-test"sampled_requests_enabled    = true    }  }    rule {    name     = "block-ip"    priority = 0    action {      allow {}    }     statement {ip_set_reference_statement {        arn =aws_wafv2_ip_set.ipv4_ipset.arn      }    }    visibility_config {cloudwatch_metrics_enabled = true      metric_name                ="Sample-ceshi-stg-ip-block-test"      sampled_requests_enabled    = true    }  }  visibility_config {    cloudwatch_metrics_enabledtrue    metric_name                ="Sample-ceshi-stg-webacl-test"sampled_requests_enabled   = true  }}# 创建规则组resource "aws_wafv2_rule_group" "rategroup" {  provider = aws.us-east-1  name     = "rate-group-control"  scope    = "CLOUDFRONT"  capacity = 100  rule {    name     = "rate-control"    priority = 0    action {      block {}    }    statement {      rate_based_statement{          limit = 200evaluation_window_sec = 60          aggregate_key_type ="IP"      }    }    visibility_config {cloudwatch_metrics_enabled = true      metric_name                = "rate-control-test"sampled_requests_enabled   = true    }  }   visibility_config {cloudwatch_metrics_enabled = true      metric_name                ="rate-control-group-test"sampled_requests_enabled   = true    }}# 创建 IPv4 IPSetresource "aws_wafv2_ip_set" "ipv4_ipset" {  name               ="ipv4-ipset-ceshi-stg-test"  scope              = "CLOUDFRONT"  ip_address_version ="IPV4"  addresses          = []  provider = aws.us-east-1}# 创建 IPv6 IPSetresource "aws_wafv2_ip_set" "ipv6_ipset" {  name               ="ipv6-ipset-ceshi-stg-test"  scope              = "CLOUDFRONT"  ip_address_version ="IPV6"  addresses          = []  provider = aws.us-east-1}

lambda函数创建

resource "aws_lambda_function""custom_rbr_lambda" {  provider = aws.us-east-1  function_name =var.function_name  role          = var.role_arn  handler       = var.handler  runtime       = var.runtime  timeout       = var.timeout  environment {    variables = {      SCOPE                     = "CLOUDFRONT"      IP_SET_ID_CUSTOM_V4      = var.ip_set_id_custom_v4IP_SET_NAME_CUSTOM_V4     =var.ip_set_name_custom_v4      IP_SET_ID_CUSTOM_V6      = var.ip_set_id_custom_v6IP_SET_NAME_CUSTOM_V6     =var.ip_set_name_custom_v6      CONFIG_LOG_BUCKET         = var.config_log_bucket      CONFIG_LOG_KEY            = var.config_log_key        RATE_RULE_NAME            = var.rate_rule_name        WEB_ACL_NAME              = var.web_acl_name      WEB_ACL_ID                = var.web_acl_idRATE_RULE_GROUP_NAME      =var.rate_rule_group_name      CUSTOM_BLOCK_PERIOD       = var.custom_block_period    }  }  source_code_hash =filebase64sha256("modules/waf/lambda_function/lambda.zip")  filename         ="modules/waf/lambda_function/lambda.zip"}

执行操作的函数文件代码如下。

import jsonimport boto3import loggingimport datetimeimport osregion_name = 'us-east-1'wafv2_client = boto3.client('wafv2', region_name = region_name)s3_client = boto3.client('s3', region_name = region_name)def update_custom_ipset_and_config(log, latest_ipv4_blocked_list,latest_ipv6_blocked_list):    try:        # update the custom v4IP set        ipv4_lock_token =get_lock_token(            log, wafv2_client,os.getenv('IP_SET_ID_CUSTOM_V4'),            os.getenv('IP_SET_NAME_CUSTOM_V4')        )        update_ip_set(            log, wafv2_client,os.getenv('IP_SET_ID_CUSTOM_V4'),list(latest_ipv4_blocked_list.keys()),            ipv4_lock_token,os.getenv('IP_SET_NAME_CUSTOM_V4')        )        # update the custom v6IP set        ipv6_lock_token =get_lock_token(            log, wafv2_client,os.getenv('IP_SET_ID_CUSTOM_V6'),os.getenv('IP_SET_NAME_CUSTOM_V6')        )        update_ip_set(            log, wafv2_client,os.getenv('IP_SET_ID_CUSTOM_V6'),list(latest_ipv6_blocked_list.keys()),            ipv6_lock_token,os.getenv('IP_SET_NAME_CUSTOM_V6')        )    except Exception as e:        # log error messagelog.error("[update_custom_ipset_and_config] "                  "Errorupdating custom ipset.")        raise e    try:        # create json objectof the latest custom config        latest_custom_config ={            'IPv4':latest_ipv4_blocked_list,            'IPv6':latest_ipv6_blocked_list        }byte_latest_custom_config = json.dumps(latest_custom_config).encode()        # upload the config tos3        s3_client.put_object(Bucket=os.getenv('CONFIG_LOG_BUCKET'),Body=byte_latest_custom_config,            Key=os.getenv('CONFIG_LOG_KEY')        )log.error("[update_custom_ipset_and_config] "                 "Configuploaded to S3 successfully." + "---------------"+str(latest_ipv4_blocked_list))    except Exception as e:        # log error messagelog.error("[update_custom_ipset_and_config] "                  "Erroruploading config to S3.")        raise edef get_lock_token(log, wafv2_client, ip_set_id, name):    try:        ipv4_get_response =wafv2_client.get_ip_set(            Scope=os.getenv('SCOPE'),            Name=name,            Id=ip_set_id        )        returnipv4_get_response['LockToken']    except Exception as e:        log.error(f"Errorin get_lock_token: {e}")        raisedef update_ip_set(log, wafv2_client, ip_set_id, addresses,                  lock_token,name):    try:wafv2_client.update_ip_set(Scope=os.getenv('SCOPE'),            Name=name,            Id=ip_set_id,            Description='LastUpdate: ' +datetime.datetime.now(datetime.timezone.utc).strftime("%Y-%m-%d %H:%M:%S %Z%z"),Addresses=addresses,LockToken=lock_token        )    except Exception as e:        log.error("Errorin update_ip_set: {}".format(e))        raisedef sync_ip_from_rbr_to_custom_ipset(log, rbr_managed_ip_list,custom_managed_ip_config):    # Get the currenttimestamp in UTC format    utc_now_timestamp =datetime.datetime.now(        datetime.timezone.utc)    # Convert the timestamp tostring    utc_now_timestamp_str =utc_now_timestamp.strftime(        "%Y-%m-%d%H:%M:%S %Z%z")    # Iterate over the managedIPs in the RBR list    for managed_ip inrbr_managed_ip_list:        # If the IP is alreadyin the custom IP config        if managed_ip incustom_managed_ip_config.keys():            # Get thetimestamp when the IP was blocked in UTC format            utc_blocked_at =datetime.datetime.strptime(custom_managed_ip_config[managed_ip],                "%Y-%m-%d%H:%M:%S %Z%z").astimezone(datetime.timezone.utc)            # Calculate thedifference in minutes between now and when the IP            # was blocked            total_diff_min =((utc_now_timestamp - utc_blocked_at)                              .total_seconds()) / 60            # If thedifference is greater than block period, update the timestamp            ifround(total_diff_min) >= int(os.getenv('CUSTOM_BLOCK_PERIOD')):custom_managed_ip_config[managed_ip] = utc_now_timestamp_str        # If the IP is not inthe custom IP config, add it with the current        # timestamp        else:custom_managed_ip_config[managed_ip] = utc_now_timestamp_str    # Create a new dictionary tostore the latest blocked IPs    latest_ip_blocked_list ={}    # Iterate over the customIP config    for blocked_ip,blocked_at_str in custom_managed_ip_config.items():        # Get the timestampwhen the IP was blocked in UTC format        utc_blocked_at =datetime.datetime.strptime(custom_managed_ip_config[blocked_ip],            "%Y-%m-%d%H:%M:%S %Z%z").astimezone(datetime.timezone.utc)        # Calculate thedifference in minutes between now and when the IP        # was blocked        total_diff_min =((utc_now_timestamp - utc_blocked_at).total_seconds()) / 60        # If the difference isless than the custom block period        #then add it to thelatest blocked IPs list        ifround(total_diff_min) < int(os.getenv('CUSTOM_BLOCK_PERIOD')):latest_ip_blocked_list[blocked_ip] = blocked_at_str    returnlatest_ip_blocked_listdef get_custom_config_file(log):    try:        # Get the customconfig file from S3        s3_response =s3_client.get_object(Bucket=os.getenv('CONFIG_LOG_BUCKET'),Key=os.getenv('CONFIG_LOG_KEY')        )        # Load the customconfig file as a JSON objectcustom_managed_ip_config = json.loads(s3_response['Body'].read()        )    except Exception as e:log.error("[get_custom_config_file] Error to get the custom config"                  "filefrom S3")        log.error(e)        # If there is anerror, return an empty configcustom_managed_ip_config = {'IPv4': {}, 'IPv6': {}}    returncustom_managed_ip_configdef get_rbr_managed_ip_list(log, rule_name):    try:               # Get the list of IPsblocked by the rate based rule        wafv2_response =wafv2_client.get_rate_based_statement_managed_keys(          Scope=os.getenv('SCOPE'),WebACLName=os.getenv('WEB_ACL_NAME'),WebACLId=os.getenv('WEB_ACL_ID'),RuleGroupRuleName=os.getenv('RATE_RULE_GROUP_NAME'),          RuleName=rule_name        )        return wafv2_response    except Exception as e:log.error("[get_rbr_managed_ip_list] "                  "Errorto get the list of IP blocked by rate based rule")        log.error(e)        # If there is anerror, raise the exception        raise edef lambda_handler(event, context):    log = logging.getLogger()    try:        # Set Log Levellog.setLevel(logging.ERROR)        # Get the list of IPblocked by rate based rule        ipv4_addresses = set()        ipv6_addresses = set()        for rule_name inos.getenv('RATE_RULE_NAME').split(","):            rbr_managed_list =get_rbr_managed_ip_list(log, rule_name.strip())            for ipv4_addressin rbr_managed_list['ManagedKeysIPV4']['Addresses']:ipv4_addresses.add(ipv4_address)            for ipv6_addressin rbr_managed_list['ManagedKeysIPV6']['Addresses']:ipv6_addresses.add(ipv6_address)        # Get custom configfile from S3custom_managed_ip_config = get_custom_config_file(log)        # Update IP from ratebased rule list to custom listlatest_ipv4_blocked_list = sync_ip_from_rbr_to_custom_ipset(              log,list(ipv4_addresses),custom_managed_ip_config['IPv4'])latest_ipv6_blocked_list = sync_ip_from_rbr_to_custom_ipset(              log,list(ipv6_addresses),custom_managed_ip_config['IPv6'])        # Update latestblocked list to S3 and WAF IPsetupdate_custom_ipset_and_config(log, latest_ipv4_blocked_list,latest_ipv6_blocked_list)log.error(str(get_rbr_managed_ip_list(log, rule_name.strip())))        return {            'statusCode'200,            'body':json.dumps('Update Success!')        }    except Exception as e:        log.error(e)        return {            'statusCode'500,            'body': e        }

创建eventbridge规则

并关联上面创建的lambda

resource "aws_cloudwatch_event_rule""schedule_rule" {  provider = aws.us-east-1  name               = var.rule_name  schedule_expression =var.schedule_expression}resource "aws_cloudwatch_event_target""lambda_target" {  provider = aws.us-east-1  rule      = aws_cloudwatch_event_rule.schedule_rule.name  target_id ="custom_rbr_lambda"  arn       = var.lambda_function_arn}resource "aws_lambda_permission""allow_eventbridge" {  provider = aws.us-east-1  statement_id  = "AllowExecutionFromEventBridge"  action        = "lambda:InvokeFunction"  function_name =var.lambda_function_name  principal     = "events.amazonaws.com"  source_arn    =aws_cloudwatch_event_rule.schedule_rule.arn}

然后就是创建一个S3来存储blockIP。

resource "aws_s3_bucket" "custom_rbr_log_bucket" {  provider = aws.us-east-1  bucket = var.bucket_name  force_destroy = true}# 配置服务器端加密resource "aws_s3_bucket_server_side_encryption_configuration" "custom_rbr_log_bucket_encryption" {  provider = aws.us-east-1  bucket = aws_s3_bucket.custom_rbr_log_bucket.id  rule {    apply_server_side_encryption_by_default {      sse_algorithm = "AES256"    }  }}# 设置公共访问阻止配置resource "aws_s3_bucket_public_access_block" "custom_rbr_log_bucket_access_block" {  provider = aws.us-east-1  bucket = aws_s3_bucket.custom_rbr_log_bucket.id  block_public_acls       = true  block_public_policy     = true  ignore_public_acls      = true  restrict_public_buckets = true}

主要代码就是这些,当然其中还有一些涉及到aws资源之间访问的iam声明和定义,这里就不做赘叙。

原文始发于微信公众号(安全无界):AWS实现自动IP封禁

免责声明:文章中涉及的程序(方法)可能带有攻击性,仅供安全研究与教学之用,读者将其信息做其他用途,由读者承担全部法律及连带责任,本站不承担任何法律及连带责任;如有问题可邮件联系(建议使用企业邮箱或有效邮箱,避免邮件被拦截,联系方式见首页),望知悉。
  • 左青龙
  • 微信扫一扫
  • weinxin
  • 右白虎
  • 微信扫一扫
  • weinxin
admin
  • 本文由 发表于 2025年4月28日09:26:20
  • 转载请保留本文链接(CN-SEC中文网:感谢原作者辛苦付出):
                   AWS实现自动IP封禁https://cn-sec.com/archives/4008937.html
                  免责声明:文章中涉及的程序(方法)可能带有攻击性,仅供安全研究与教学之用,读者将其信息做其他用途,由读者承担全部法律及连带责任,本站不承担任何法律及连带责任;如有问题可邮件联系(建议使用企业邮箱或有效邮箱,避免邮件被拦截,联系方式见首页),望知悉.

发表评论

匿名网友 填写信息