开源自动化网络设备备份工具——轻松实现千台设备备份

admin 2023年4月27日20:37:20评论134 views字数 15633阅读52分6秒阅读模式


转载知乎:网络小斐 

原文链接:https://zhuanlan.zhihu.com/p/603084870 

公众号:网络小斐 在IT江湖中狂飙

背景需求

首先,在等级保护测评要求中,应提供数据的本地备份机制,每天备份至本地,且场外存放,其中包括主要网络设备配置文件。

其次,日常网络运维,为了审计和防止设备故障导致设备配置丢失而造成长时间网络故障,需要对网络设备的配置做策略备份。

开源软件

  • oxidized - https://github.com/ytti/oxidized

  • Bacula - https://www.bacula.org/source-download-center

开源软件场景:跨平台,支持多厂商系统,UI界面友好,功能强大,备份策略可自定义。

自写脚本

通常现网中常用到的脚本为python脚本和GO脚本,而Python脚本的适用性更强

自写脚本场景:自定义强,适应度高,可根据自身的需求不断优化迭代。

Python脚本备份

需求说明:

公司内部企业网交换机和防火墙以及其他配套设备需要做配置备份,需要满足以下几点需求:

  • 支持华为、思科、华三、锐捷等主流厂商交换机系统

  • 支持密码加密和解密

  • 支持成功备份邮件通知

  • 支持TFTP和FTP登录上传备份文件

  • 支持可自定义备份策略

思路拓展:

1、首先是如何备份思考?

无外乎就是执行查看全局配置的命令,然后抓取回显,保存成配置文件。TFTP/FTP登录网元,拷贝配置文件,或者反过来网元作为客户端,FTP/TFTP搭建在Linux/Windows主机中。现在很多厂商新设备都是支持自动备份配置命令,这也是一种手段。

2、多厂商交换机系统支持,如果采用FTP就不需要考虑太多,因为基本上每家厂商都支持FTP。

3、每台设备的密码都不尽相同,需要针对密码做加密和解密操作,执行命令的时候不直接显示明文密码。

4、备份成功后,需通过阿里云邮箱发送邮件通知。

5、自定义备份策略,需要单独开启模块。

开源参考:

  • GitHub - BarryCui/NetDevOps: 网络设备配置备份。可以备份cisco ios交换机,华为quidway交换机以及fortigate防火墙的配置。

  • 蜗牛勇士/网络设备自动备份。

  • Feiinbeer:python脚本自动备份网络设备配置文件

自我实现:

1、前期完成基础的需求,并正式运用在测试环境中进行测试,并不断改善,后续应用在生产环境中。

2、中期思考如何结合Web UI进行监控和操作。

3、后期可以考虑形成一个比较完善的网络设备备份系统平台,进行总体闭环,并开源到Github和Gitee。

程序实例

网络设备配置备份程序

  • 网络设备的自动化实现方式:SNMP/NetConf + NAPALM/RestConf/OpenConfig

  • 本程序是通过最原始的ssh协议来备份网络设备的配置。

  • 使用场景为:不支持API的网络设备或运维人员不会调用API的场景。

  • 备份说明:备份方式是通过ssh协议结合ftp协议进行,读取网络配置文件,通过paramiko模块连接网络设备发送put命令,把配置文件上传FTP服务器,完成网络设备配置文件的备份。

  • 目前支持备份:目前只支持华为、华三系列设备,后续将进行适配支持更多其他厂商设备。

程序结构说明

当前目录依赖包生成:

pip install pipreqs
pipreqs . --encoding=utf8 --force

目录结构

  • AutoBakCfg.yml 是存放FTP服务器配置信息(登录用户、登录密码、FTP服务器IP、网络设备明文密码加密开关)

  • requirements.txt 当前程序python依赖库

  • dev_info.xlsx 网络设备信息(登录用户、登录密码、设备管理IP、配置文件路径)

  • password_aes.py 对明文密码进行AES加密和解密处理

  • configuration_rw.py 对YAML配置文件的读写处理

  • write_excels.py 对xlsx文本数据的读写处理

  • ftp_server.py 模拟FTP客户端进行FTP服务器验证

  • utils.py 本程序依赖的一些工具函数

  • run_backup.py 程序主入口

环境说明

  • Python 3.11.0

  • Windows 10、11系统

  • Pycharm 2022.1.2专业版

  • Linux发行版暂未测试,后续发出Linux版本上的运行环境说明。

网络设备支持

  • HUAWEI

  • H3C

  • 后续进行优化迭代,加入思科、锐捷和其他主流厂商的支持。

FTP服务器

Universal FTP Server(Microsoft Store可直接下载)

开源自动化网络设备备份工具——轻松实现千台设备备份

Universal FTP Server

程序运行结果说明

程序会自动把网络设备配置文件备份到服务器的根目录下

代码示例

开源自动化网络设备备份工具——轻松实现千台设备备份

示例

  • AutoBakCfg.yml 测试文件:FTP服务器信息和加密开关参数
base-info:
  ftp_password: '123456'
  ftp_server: 10.10.10.2
  ftp_user: python
  is_encrypt: '1'
  • requirements.txt 项目依赖库
pandas==1.5.3
paramiko==3.0.0
pycryptodome==3.17
PyYAML==6.0
  • dev_info.xlsx 网络设备信息

开源自动化网络设备备份工具——轻松实现千台设备备份

  • password_aes.py
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

# Linux下和Windows下安装pycryptodome 把读取的xlsx文档的密码进行加密和解密处理
# pip install pycryptodome
from Crypto.Cipher import AES
from Crypto.Util.Padding import pad
from Crypto.Util.Padding import unpad
from binascii import b2a_hex, a2b_hex
import re


class CryptPassword:
    """
    AES.new 初始化AES对象
    "
""

    def __init__(self, key, init_vector):
        """
        :param key: 初始化AES加密密钥 AES.new传入参数必须是bytes类型
        :param init_vector: 初始化iv偏移量 AES.new传入参数必须是bytes类型
        "
""
        self.key = key
        self.init_vector = init_vector
        self.mode = AES.MODE_CBC
        # self.cryptor = AES.new(self.key, self.mode, self.init_vector)

    # AES加密 待加密的字符串为str类型并返回str类型数据
    def encrypt_str(self, pass_str) -> str:
        # encode('ascii') 字符串转字节串,把字符串:abc,转为字节串:b'abc'
        cryptor = AES.new(self.key.encode('ascii'), self.mode, self.init_vector.encode('ascii'))
        # pass_str str类型 pad的data_to_pad必须为bytes类型 故pass_str需要转换为字节串
        padtext = pad(pass_str.encode('ascii'), 16, style='pkcs7')
        # 调用加密方法
        cipher_text = cryptor.encrypt(padtext)
        # 字符串 --> 十六进制
        str_cipher = b2a_hex(cipher_text)
        return str_cipher.decode('ascii')

    # AES解密 待解密的字符串为str类型并返回str类型数据
    def decode_str(self, text_str) -> str:
        if not CryptPassword.is_hex(text_str) or (len(text_str) < 32):
            return "错误的16进制加密字符串"
        else:
            # 十六进制 --> 字符串
            cipher_text = a2b_hex(text_str)
            decrypter = AES.new(self.key.encode('ascii'), self.mode, self.init_vector.encode('ascii'))
            # 调用解密方法
            plain_text = decrypter.decrypt(cipher_text)
            # 不够16位,补全到16位
            try:
                unpadtext = unpad(plain_text, 16, 'pkcs7')
                # decode('ascii') 字节串转字符串,把字节串:b'abc',转为字符串:abc
                return unpadtext.decode('ascii')
            except ValueError:
                return "请输入正确的加密字符串"

    # 16进制检查
    @staticmethod
    def is_hex(hex_str) -> bool:
        # 16进制的正则表达式
        regex = "[A-Fa-f0-9]+$"
        result = re.match(regex, hex_str)
        if result:
            return True
        else:
            return False


# 测试用例
if __name__ == '__main__':
    key_word = 'QWErtyuio@098765'
    init_vector_word = 'MEIYuanTian--Xia'
    password = 'T5fx2L3aFt'
    data = CryptPassword(key_word, init_vector_word)
    print(data.encrypt_str(password))
    print(CryptPassword.is_hex(data.encrypt_str(password)))
    text = 'c0cf24e87b328810c0b5711b2f326c8f'
    print(data.decode_str(text))

  • configuration_rw.py
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

"""
针对yaml/yml等配置文件进行读取和写入的实现类
"
""

import yaml
import os
import pathlib


class YamlReadWrite:
    # 初始化
    def __init__(self, filepath):
        self.filepath = filepath

    def yaml_read(self, file_name):
        # filename = str(file_name)
        # print(filename)
        if os.path.isfile(file_name):
            with open(self.filepath + '\' + file_name, 'r', encoding='utf-8') as fp:
                line_data = yaml.load(fp, Loader=yaml.FullLoader)
                return line_data
        else:
            print("配置文件不存在")

    def yaml_write(self, file_name, encrypt_str):
        # 读取yaml文件数据
        if os.path.isfile(file_name):
            old_file_data = YamlReadWrite(self.filepath).yaml_read(file_name)
            # 修改读取数据
            old_file_data['base-info']['is_encrypt'] = encrypt_str
            with open(self.filepath + '\' + file_name, 'w', encoding='utf-8') as fp:
                yaml.dump(old_file_data, fp)
                return "密码已加密,配置文件加密参数修改为0"
        else:
            print("配置文件不存在")


if __name__ == '__main__':
    path = os.path.dirname(__file__)
    read_data = YamlReadWrite(path).yaml_read(file_name='AutoBakCfg.yml')
    print(read_data['base-info']['ftp_password'])
    write_data = YamlReadWrite(path).yaml_write(file_name='AutoBakCfg.yml', encrypt_str='0')
    print(write_data)
    print("修改完成")

write_excels.py

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

"""
利用pandas模块读写excel文本数据,并对文本数据中的密码进行判断,如果不是16进制字符串,那就进行加密并重新写入,读取16进制字符串解密进行ftp登录
依赖模块: pandas,openpyxl
"
""
import pandas as pd
from password_aes import CryptPassword


class MachiningText:

    def __init__(self, xlsx_file_name):
        self.xlsx_file_name = xlsx_file_name

    # 更新格式化后的密码并写入xlsx文件中
    def write_xlsx_file(self, dev_name, dev_ip, dev_user, dev_pw, file_path):
        data_df = pd.DataFrame()
        data_df["设备名称"] = dev_name
        data_df["设备IP"] = dev_ip
        data_df["账号"] = dev_user
        data_df["密码"] = dev_pw
        data_df["文件路径"] = file_path
        try:
            writer = pd.ExcelWriter(self.xlsx_file_name)
            data_df.to_excel(writer, sheet_name='dev_info', index=False)
            writer.close()
        except PermissionError:
            print("请先关闭文档,再次尝试")

    def update_password(self, key, init_vector):
        dev_name = []
        dev_ip = []
        dev_user = []
        dev_pw = []
        file_path = []
        dev_pd = pd.read_excel(self.xlsx_file_name, sheet_name='dev_info')
        # 赋予i=0 密码已加密,i=1 把明文密码转为加密密码
        i = 0

        for line_str in dev_pd.values:
            dev_name = dev_name + [line_str[0].strip()]
            dev_ip = dev_ip + [line_str[1].strip()]
            dev_user = dev_user + [line_str[2].strip()]
            # try:
            #     dev_pw = dev_pw + [line_str[3].strip()]
            # except AttributeError:
            #     dev_pw = dev_pw + [line_str[3]]
            file_path = file_path + [line_str[4].strip()]

            # 如果密码是单纯的数字 len(123)会出错,需要转换为str类型
            try:
                if int(line_str[3]):
                    password_len = len(str(line_str[3]))
                    passwd_str = str(line_str[3])
                    if not CryptPassword.is_hex(passwd_str) or (password_len < 32):
                        new_password = CryptPassword(key, init_vector).encrypt_str(passwd_str)
                        # print(new_password)
                        # print(dev_pw)
                        dev_pw = dev_pw + [new_password]
                        i = 1
                elif str(line_str[3].strip()):
                    password_len = len(line_str[3].strip())
                    if not CryptPassword.is_hex(line_str[3].strip()) or (password_len < 32):
                        new_password = CryptPassword(key, init_vector).encrypt_str(line_str[3].encode('ascii'))
                        dev_pw = dev_pw + [new_password]
                        i = 1
                else:
                    return "不存在的类型"
            except ValueError:
                # 密码不是16进制,则进行加密
                password_len = len(line_str[3].strip())
                if (not CryptPassword.is_hex(line_str[3].strip())) or (password_len < 32):
                    new_password = CryptPassword(key, init_vector).encrypt_str(line_str[3].strip())
                    dev_pw = dev_pw + [new_password]
                    i = 1
                else:
                    new_password = [line_str[3]]
                    # 如果密码是16进制,new_password 取表中密码,赋予Dev_pw
                    dev_pw = dev_pw + new_password
        # 密码有加密,则重新写xls表,否则不需要重写
        # print(dev_ip)
        # print(dev_user)
        # print(dev_pw)
        # print(file_path)
        if i == 1:
            MachiningText(self.xlsx_file_name).write_xlsx_file(dev_name, dev_ip, dev_user, dev_pw, file_path)


if __name__ == '__main__':
    key_word = 'QWErtyuio@098765'
    init_vector_word = 'MEIYuanTian--Xia'
    MachiningText(r'./dev_info.xlsx').update_password(key_word, init_vector_word)
    print("完成")

  • ftp_server.py
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

# 模拟客户端进行FTP服务器验证

import ftplib
import utils


class FTPServer:
    def __init__(self, ftp_server):
        self.ftp_server = ftp_server

    def connect_ftp_server(self, ftp_user, ftp_password) -> bool:
        print("正在检测FTP服务器,请稍候......")
        try:
            ftp = ftplib.FTP(self.ftp_server)
            print(ftp.getwelcome())
            ftp.login(ftp_user, ftp_password)
            ftp.encoding = 'utf-8'
            # 230 Logged in 230代表登录成功
            login_response = ftp.login(ftp_user, ftp_password)
            # 通过指定分隔符对字符串进行切片 login_response.split(" ")[0]
            if int(login_response.split(" ")[0]) == 230:
                print("FTP服务器登陆成功!")
                utils.writelog("FTP服务器登陆成功!n")
                return True
            else:
                return False
        except ConnectionRefusedError:
            print("FTP服务器无法登录,请检查服务是否启动!")
            utils.writelog("FTP服务器无法登录,请检查服务是否启动!n")
            return False


if __name__ == '__main__':
    user = 'python'
    password = '123456'
    server = '10.10.10.2'
    FTPServer(server).connect_ftp_server(user, password)
    print("完成")

  • utils.py
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

# 代码运行日志生成器

import datetime
import msvcrt


def writelog(logstr):
    try:
        f = open('./result.log''a')
        out = f.write(str(datetime.datetime.now()) + ":-->" + logstr)
        f.close()
        return out
    except Exception as _result:
        return print(_result)

# 如果不打包exe不需要 借鉴前人经验
def prompt_msg(tmep_str):
    print("------------------------------------------------------------------")
    print("如发现程序有问题,请联系作者!")
    print("按任意键退出......")
    print("------------------------------------------------------------------")
    return ord(msvcrt.getch())

  • run_backup.py
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

# 利用paramiko进行网络设备ssh连接和备份命令操作
from configuration_rw import YamlReadWrite
from write_excels import MachiningText
from ftp_server import FTPServer
from password_aes import CryptPassword
import pandas as pd
import paramiko
import time
import utils
import os


def connect_device_ssh(dev_name, dev_ip, dev_username, dev_pw, filename_path):
    try:
        # 创建一个SSH客户端
        ssh_client = paramiko.SSHClient()
        # AutoAddPolicy: 自动添加主机名和主机密钥
        # set_missing_host_key_policy: 一个连接主机的策略(没有本地主机秘钥或者HostKeys对象时)
        ssh_client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        ssh_client.connect(hostname=dev_ip, port=22, username=dev_username, password=dev_pw)
        # 打开连接到的终端,使用ssh shell通道,相当于使用ssh远程到了主机上
        command = ssh_client.invoke_shell()
        # send发送内容,然后将字符串进行编码,编码后中文仍乱码
        command.send(("ftp " + _FTP_HOST).encode())
        command.send(b"n")
        command.send(_FTP_USER + "n")
        time.sleep(1)
        command.send(_FTP_PW + "n")
        time.sleep(1)
        command.send(b"n")
        time.sleep(1)
        command.send(b"n")

        # 备份flash:/vrpcfg.zip,并提取配置名称
        cfg_name = filename_path.split("/")[1].split(".")
        expanded_name = "." + cfg_name[1]

        # 拼接备份文件名称
        bak_file_name = cfg_name[0] + "-" + dev_ip + "-" + time.strftime("%Y-%m-%d") + expanded_name
        # print (Bak_File_Name)
        command.send("put " + filename_path + " " + bak_file_name + "n")
        # print("put " + filename_path + " " + Bak_File_Name + "n")
        time.sleep(1)
        command.send(b"n")

        received = command.recv(65535).decode()
        # 备份成功recv 包含“226 File received ok”
        if "226 File has been recieved" in received:
            print(dev_name + "_" + dev_ip + "-" + filename_path + " --> 备份成功!")
            utils.writelog(dev_name + "_" + dev_ip + "-" + filename_path + " --> 备份成功!n")
        ssh_client.close()
    except:
        print(dev_name + "_" + dev_ip + "-" + filename_path + " --> 备份失败。")
        utils.writelog(dev_name + "_" + dev_ip + "-" + filename_path + " --> 备份失败。######n")

    # recv = command.recv(65535).decode()  # recv接收响应,字节为65535,并进行解码
    # result = recv.splitlines()
    # print(recv)


if __name__ == '__main__':

    # key: 16或16的倍数
    key = 'QWErtyuio@098765'
    # init_vector: 16位
    init_vector = 'MEIYuanTian--Xia'
    # 读取程序当前目录路径
    path = os.path.dirname(__file__)
    read_data = YamlReadWrite(path).yaml_read(file_name='AutoBakCfg.yml')

    if os.path.exists('./result.log'):
        os.remove('./result.log')

    # 私有常量
    _IS_ENCRYPT = int(read_data['base-info']['is_encrypt'])
    _FTP_HOST = read_data['base-info']['ftp_server']
    _FTP_USER = read_data['base-info']['ftp_user']
    _FTP_PW = read_data['base-info']['ftp_password']

    #  _IS_ENCRYPT = 1, 把'./dev_info.xlsx中密码字段的明文密码改为密文(AES)
    if _IS_ENCRYPT == 1:
        xlsx_file_name = r'./Dev_info.xlsx'
        MachiningText(xlsx_file_name).update_password(key, init_vector)
        # 加密后把 _IS_ENCRYPT = 0
        YamlReadWrite(path).yaml_write(file_name='AutoBakCfg.yml', encrypt_str='0')
        utils.writelog("把当前明文密码修改成功为密文密码并更改配置文件为is_encrypt参数为0")
    else:
        utils.writelog("dev_info.xlsx文件中密码字段已加密!")
        print("dev_info.xlsx文件中密码字段已加密!")

    if FTPServer(_FTP_HOST).connect_ftp_server(_FTP_USER, _FTP_PW):
        dev_file_name = './dev_info.xlsx'
        dev_pd = pd.read_excel(dev_file_name, sheet_name='dev_info')
        dev_info = dev_pd.values
        # Dev_Info.head()
        for line_str in dev_info:
            # print(line_str[0], line_str[1], line_str[2], line_str[3], line_str[4])
            dev_name_str = line_str[0]
            dev_ip_str = line_str[1]
            dev_username_str = line_str[2]
            dev_pw_str = CryptPassword(key, init_vector).decode_str(line_str[3])
            # print (PassWord)
            filename_path_str = line_str[4]
            connect_device_ssh(dev_name_str, dev_ip_str, dev_username_str, dev_pw_str, filename_path_str)

        # prompt_msg("备份完成,备份文件保存在FTP服务器的目录下")
        utils.writelog("备份完成,备份文件保存在FTP服务器的目录下")
    else:
        # prompt_msg("连接FTP服务器失败,请检查FTP服务是否启动,服务器IP,账号、密码是否正确。")
        utils.writelog("连接FTP服务器失败,请检查FTP服务是否启动,服务器IP,账号、密码是否正确。")

利用paramiko进行连接,可以对华为和华三的网络设备配文件进行自动化备份。后续将使用netmiko进行重构,添加对其他厂商网络设备的支持。

开源自动化网络设备备份工具——轻松实现千台设备备份

开源自动化网络设备备份工具——轻松实现千台设备备份

开源自动化网络设备备份工具——轻松实现千台设备备份

原文始发于微信公众号(释然IT杂谈):开源自动化网络设备备份工具——轻松实现千台设备备份

免责声明:文章中涉及的程序(方法)可能带有攻击性,仅供安全研究与教学之用,读者将其信息做其他用途,由读者承担全部法律及连带责任,本站不承担任何法律及连带责任;如有问题可邮件联系(建议使用企业邮箱或有效邮箱,避免邮件被拦截,联系方式见首页),望知悉。
  • 左青龙
  • 微信扫一扫
  • weinxin
  • 右白虎
  • 微信扫一扫
  • weinxin
admin
  • 本文由 发表于 2023年4月27日20:37:20
  • 转载请保留本文链接(CN-SEC中文网:感谢原作者辛苦付出):
                   开源自动化网络设备备份工具——轻松实现千台设备备份https://cn-sec.com/archives/1695180.html
                  免责声明:文章中涉及的程序(方法)可能带有攻击性,仅供安全研究与教学之用,读者将其信息做其他用途,由读者承担全部法律及连带责任,本站不承担任何法律及连带责任;如有问题可邮件联系(建议使用企业邮箱或有效邮箱,避免邮件被拦截,联系方式见首页),望知悉.

发表评论

匿名网友 填写信息