提要

在有动态公网IPV6的情况下,使用DDNS和自定义端口对外提供WEB服务并不难,但是面临两个重要问题:

  • 如果客户端不支持IPV6,怎么办?
  • 需要打开防火墙,暴漏端口,是否不太安全?

对外提供Web服务的方案很多,比如使用端口映射方案,我也曾使用过 fatedier/frp 这样的开源项目,但是还需要另外再维护一台VPS,其带宽和成本都是个问题,另外,端口映射的方案其实也并不安全,所以我选用了另一种更加适合我目前场景的方案:

腾讯云 Edge One

EdgeOne 的全称叫做边缘安全加速平台,其中的具体介绍可以查看腾讯云的产品介绍,这里就不再赘述,可以简单的把他看作一个更加安全的 CDN 。我选择

  • ¥9.9/月:1个站点,50GB安全加速流量,300万次安全加速请求,具有CDN功能、智能加速功能,并且提供了一些基本的DDoS防护、Web防护、基础 CC 防护。注意:这里的一个站点,指的是一个主域名,可以在这个站点里添加若干子域名。
  • 支持IPV6的自定义端口源站:也就是说,CDN可以直接从指定v6地址的指定端口回源,而客户端访问web时,不再需要输入端口,也不必关注客户端是否支持v6地址。
  • 多个子域名可以共用同一个源站组:可以配置一个源站组,然后在各个子域中引入这个源站组。这样,在IPV6地址变更时,只需要更新源站组的IP即可,不必操作多个源站。
  • 支持使用API更新源站组:可以在源站上使用自动化脚本来监测IPV6地址变更情况,并及时自动更新源站组。
  • 分大陆和海外提供了详细的节点IP地址的获取地址:可以自动化的更新防火墙,仅对 EdgeOne 节点放通入站请求,规避一些因开放防火墙导致的安全问题。

自动化更新防火墙配置

我的服务部署在一台 Windows Server 上,并使用 Python 脚本进行计划任务,来更新防火墙。

 
import requests
import subprocess
 
 
def get_ips(url):
    try:
        response = requests.get(url)
        response.raise_for_status()
        ip_list = response.text.strip().split('\n')
        return ','.join(ip_list)
    except requests.RequestException:
        return None
 
 
def read_previous_ips(filename):
    try:
        with open(filename, 'r') as file:
            return file.read().strip()
    except FileNotFoundError:
        return None
 
 
def write_ips(filename, ips):
    with open(filename, 'w') as file:
        file.write(ips)
 
 
def check_rule_exists(name):
    cmd = f"netsh advfirewall firewall show rule name=\"{name}\""
    result = subprocess.run(cmd, capture_output=True, text=True, shell=True)
    return name in result.stdout
 
 
def update_firewall_rule(name, ips, protocol="TCP", direction="in", action="allow"):
    delete_cmd = f"netsh advfirewall firewall delete rule name=\"{name}\""
    add_cmd = f"netsh advfirewall firewall add rule name=\"{name}\" dir={direction} action={action} protocol={protocol} remoteip={ips}"
    subprocess.run(delete_cmd, shell=True)  # 删除旧规则
    subprocess.run(add_cmd, shell=True)  # 添加新规则
 
 
def add_firewall_rule(name, ips, protocol="TCP", direction="in", action="allow"):
    cmd = f"netsh advfirewall firewall add rule name=\"{name}\" dir={direction} action={action} protocol={protocol} remoteip={ips}"
    subprocess.run(cmd, shell=True)  # 创建规则
 
 
def main(ip_versions, areas):
    changes_made = False
    for version in ip_versions:
        for area in areas:
            url = f"https://api.edgeone.ai/ips?version={version}&area={area}"
            ips = get_ips(url)
            if ips:
                filename = f"{version}_{area}_ips.txt"
                previous_ips = read_previous_ips(filename)
                if ips != previous_ips:
                    rule_name = f"@EdgeOne_{version}_{area}"
                    if check_rule_exists(rule_name):
                        update_firewall_rule(rule_name, ips)
                    else:
                        add_firewall_rule(rule_name, ips)
                    write_ips(filename, ips)
                    changes_made = True
 
    return changes_made
 
 
if __name__ == "__main__":
    main(['v4', 'v6'], ['mainland-china', 'overseas'])
 

腾讯云通过地址 https://api.edgeone.ai/ips 提供EdgeOne的节点IP,并接收参数 versionarea 来获取指定的节点IP。

我的脚本创建了四条防火墙规则,并在节点IP发生更新时,删除旧的防火墙规则并添加新的。该脚本示例中,并未对端口进行限制。

自动更新源站组IP

import hashlib
import hmac
import os
import logging
import time
from datetime import datetime, timezone
from logging.handlers import RotatingFileHandler
import socket
import json
import requests
 
 
def get_ipv6_addresses():
    """
    一般来说,240e开头是电信,2048开头是联通,2409开头是移动,所以仅筛选这些IP地址。
    """
    ipv6_addrs = []
    for addrinfo in socket.getaddrinfo(socket.gethostname(), None):
        if addrinfo[0] == socket.AF_INET6:
            addr = addrinfo[4][0]
            if addr.startswith("240e") or addr.startswith("2048") or addr.startswith("2409"):
                ipv6_addrs.append(addr)
    return ipv6_addrs
 
 
def read_old_ipv6_list(file_path):
    """
    从文件中读取旧的IPv6地址列表。
    """
    if os.path.exists(file_path):
        with open(file_path, 'r') as file:
            return file.read().splitlines()
    return []
 
 
def write_new_ipv6_list(file_path, ipv6_list):
    """
    将新的IPv6地址列表写入文件。
    """
    with open(file_path, 'w') as file:
        file.write('\n'.join(ipv6_list))
 
 
def modify_origin_group(iplist, secret_id, secret_key, group_id):
    # 请求信息
    service: str = 'teo'
    host: str = 'teo.tencentcloudapi.com'
    endpoint: str = 'https://' + host
    action: str = 'ModifyOriginGroup'
    version: str = '2022-09-01'
    algorithm: str = 'TC3-HMAC-SHA256'
    timestamp: int = int(time.time())
    date: str = datetime.fromtimestamp(timestamp, timezone.utc).strftime('%Y-%m-%d')
    content_type: str = 'application/json; charset=utf-8'
    http_request_method: str = 'POST'
    canonical_uri: str = '/'
    canonical_query_string: str = ''
    signed_headers: str = 'content-type;host;x-tc-action'
 
    records_list: list[dict] = []
 
    for ip in iplist:
        records_list.append({"Record": ip, "Type": "IP_DOMAIN", "Weight": 100})
 
    # 请求正文
    dict_payload = {"ZoneId": "zone-2stwsezgh8pe", "GroupId": group_id, "Records": records_list}
    payload = json.dumps(dict_payload)
    hashed_request_payload: str = hashlib.sha256(payload.encode('utf-8')).hexdigest()
    canonical_headers: str = f'content-type:{content_type}\nhost:{host}\nx-tc-action:{action.lower()}\n'
    canonical_request: str = (http_request_method + '\n' +
                              canonical_uri + '\n' +
                              canonical_query_string + '\n' +
                              canonical_headers + '\n' +
                              signed_headers + '\n' +
                              hashed_request_payload)
 
    # 拼接待签名字符串
    credential_scope = f'{date}/{service}/tc3_request'
    hashed_canonical_request = hashlib.sha256(canonical_request.encode('utf-8')).hexdigest()
    string_to_sign = f"{algorithm}\n{timestamp}\n{credential_scope}\n{hashed_canonical_request}"
 
    # 计算签名
    def sign(key, message):
        return hmac.new(key, message.encode('utf-8'), hashlib.sha256).digest()
 
    secret_date = sign(('TC3' + secret_key).encode('utf-8'), date)
    secret_service = sign(secret_date, service)
    secret_signing = sign(secret_service, 'tc3_request')
    signature = hmac.new(secret_signing, string_to_sign.encode('utf-8'), hashlib.sha256).hexdigest()
    authorization = (f'{algorithm} '
                     f'Credential={secret_id}/{credential_scope}, '
                     f'SignedHeaders={signed_headers}, '
                     f'Signature={signature}')
    # 发送请求
    headers = {
        'Authorization': authorization,
        'Content-Type': content_type,
        'Host': host,
        'X-TC-Action': action,
        'X-TC-Version': version,
        'X-TC-Timestamp': str(timestamp)
    }
 
    response = requests.post(endpoint, headers=headers, data=payload).json()
 
    return response
 
 
def setup_logging():
    """
    设置日志记录。
    """
    logger = logging.getLogger()
    logger.setLevel(logging.INFO)
    handler = RotatingFileHandler("ipv6_changes.log", maxBytes=200 * 1024, backupCount=1)
    formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
    handler.setFormatter(formatter)
    logger.addHandler(handler)
    return logger
 
 
def main(secret_id, secret_key, group_id):
    ipv6_list = get_ipv6_addresses()
    old_ipv6_list = read_old_ipv6_list("ipv6_list.txt")
 
    logger = setup_logging()
 
    if set(ipv6_list) == set(old_ipv6_list):
        logger.info(f"未变更.")
        return
 
    write_new_ipv6_list("ipv6_list.txt", ipv6_list)
    result = modify_origin_group(ipv6_list, secret_id, secret_key, group_id)
 
    error_msg = result.get("Response", {}).get("Error", {}).get("Message", "")
    error_code = result.get("Response", {}).get("Error", {}).get("Code", "")
 
    if "Retry your request" in error_msg:
        result = modify_origin_group(ipv6_list, secret_id, secret_key, group_id)
        error_msg = result.get("Response", {}).get("Error", {}).get("Message", "")
        error_code = result.get("Response", {}).get("Error", {}).get("Code", "")
 
    if not error_msg:
        message = "#EdgeOne\nIPv6地址已变更,更新EdgeOne源站组IP成功。"
        logger.info(f"Success.")
    else:
        message = f"#EdgeOne\nIPv6地址已变更,更新EdgeOne源站组IP失败。错误信息如下:\n{error_code}:{error_msg}"
        logger.error(f"{error_code}:{error_msg}")
 
    return message
 
 
if __name__ == "__main__":
    SecretId = ""
    SecretKey = ""
    GroupId = ""
    main(SecretId, SecretKey, GroupId)

该脚本需要使用腾讯云账号的 SecretIDSecretKey,以及源站组的 GroupId,可以在源站组页面查到。

结语

该方案和内网穿透相比,有一定局限性,比较依赖运行商是否开放IPV6,其流量也很少。但是不局限于内网穿透的vps带宽,并且仅仅用于web服务时,价格相对还算可接受范围。并且提供了一些基础防护和CDN,总的来说,我的使用体验还不错。

sequenceDiagram
    用户-->>+EdgeOne: ipv4发起请求
    EdgeOne-->>+服务器: 公网V6回源
    服务器-->>EdgeOne: 
    EdgeOne-->>+用户: 响应请求
    
    用户->>+EdgeOne: ipv6发起请求
    EdgeOne->>+用户: 返回302
    用户->>+服务器: ipv6发起请求
    服务器->>+用户: 响应请求