转载知乎:网络小斐
原文链接:https://zhuanlan.zhihu.com/p/603084870
公众号:网络小斐 在IT江湖中狂飙
背景需求
首先,在等级保护测评要求中,应提供数据的本地备份机制,每天备份至本地,且场外存放,其中包括主要网络设备配置文件。
其次,日常网络运维,为了审计和防止设备故障导致设备配置丢失而造成长时间网络故障,需要对网络设备的配置做策略备份。
开源软件
-
oxidized - https://github.com/ytti/oxidized
-
Bacula - https://www.bacula.org/source-download-center
开源软件场景:跨平台,支持多厂商系统,UI界面友好,功能强大,备份策略可自定义。
自写脚本
通常现网中常用到的脚本为python脚本和GO脚本,而Python脚本的适用性更强
自写脚本场景:自定义强,适应度高,可根据自身的需求不断优化迭代。
Python脚本备份
需求说明:
公司内部企业网交换机和防火墙以及其他配套设备需要做配置备份,需要满足以下几点需求:
-
支持华为、思科、华三、锐捷等主流厂商交换机系统
-
支持密码加密和解密
-
支持成功备份邮件通知
-
支持TFTP和FTP登录上传备份文件
-
支持可自定义备份策略
思路拓展:
1、首先是如何备份思考?
2、多厂商交换机系统支持,如果采用FTP就不需要考虑太多,因为基本上每家厂商都支持FTP。
3、每台设备的密码都不尽相同,需要针对密码做加密和解密操作,执行命令的时候不直接显示明文密码。
4、备份成功后,需通过阿里云邮箱发送邮件通知。
5、自定义备份策略,需要单独开启模块。
开源参考:
-
GitHub - BarryCui/NetDevOps: 网络设备配置备份。可以备份cisco ios交换机,华为quidway交换机以及fortigate防火墙的配置。
-
蜗牛勇士/网络设备自动备份。
-
Feiinbeer:python脚本自动备份网络设备配置文件
自我实现:
1、前期完成基础的需求,并正式运用在测试环境中进行测试,并不断改善,后续应用在生产环境中。
2、中期思考如何结合Web UI进行监控和操作。
3、后期可以考虑形成一个比较完善的网络设备备份系统平台,进行总体闭环,并开源到Github和Gitee。
程序实例
网络设备配置备份程序
-
网络设备的自动化实现方式:SNMP/NetConf + NAPALM/RestConf/OpenConfig
-
本程序是通过最原始的ssh协议来备份网络设备的配置。
-
使用场景为:不支持API的网络设备或运维人员不会调用API的场景。
-
备份说明:备份方式是通过ssh协议结合ftp协议进行,读取网络配置文件,通过paramiko模块连接网络设备发送put命令,把配置文件上传FTP服务器,完成网络设备配置文件的备份。
-
目前支持备份:目前只支持华为、华三系列设备,后续将进行适配支持更多其他厂商设备。
程序结构说明
当前目录依赖包生成:
pip install pipreqs
pipreqs . --encoding=utf8 --force
目录结构
-
AutoBakCfg.yml 是存放FTP服务器配置信息(登录用户、登录密码、FTP服务器IP、网络设备明文密码加密开关)
-
requirements.txt 当前程序python依赖库
-
dev_info.xlsx 网络设备信息(登录用户、登录密码、设备管理IP、配置文件路径)
-
password_aes.py 对明文密码进行AES加密和解密处理
-
configuration_rw.py 对YAML配置文件的读写处理
-
write_excels.py 对xlsx文本数据的读写处理
-
ftp_server.py 模拟FTP客户端进行FTP服务器验证
-
utils.py 本程序依赖的一些工具函数
-
run_backup.py 程序主入口
环境说明
-
Python 3.11.0
-
Windows 10、11系统
-
Pycharm 2022.1.2专业版
-
Linux发行版暂未测试,后续发出Linux版本上的运行环境说明。
网络设备支持
-
HUAWEI
-
H3C
-
后续进行优化迭代,加入思科、锐捷和其他主流厂商的支持。
FTP服务器
Universal FTP Server(Microsoft Store可直接下载)
Universal FTP Server
程序运行结果说明
程序会自动把网络设备配置文件备份到服务器的根目录下
代码示例
示例
-
AutoBakCfg.yml 测试文件:FTP服务器信息和加密开关参数
base-info:
ftp_password: '123456'
ftp_server: 10.10.10.2
ftp_user: python
is_encrypt: '1'
-
requirements.txt 项目依赖库
pandas==1.5.3
paramiko==3.0.0
pycryptodome==3.17
PyYAML==6.0
-
dev_info.xlsx 网络设备信息
-
password_aes.py
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Linux下和Windows下安装pycryptodome 把读取的xlsx文档的密码进行加密和解密处理
# pip install pycryptodome
from Crypto.Cipher import AES
from Crypto.Util.Padding import pad
from Crypto.Util.Padding import unpad
from binascii import b2a_hex, a2b_hex
import re
class CryptPassword:
"""
AES.new 初始化AES对象
"""
def __init__(self, key, init_vector):
"""
:param key: 初始化AES加密密钥 AES.new传入参数必须是bytes类型
:param init_vector: 初始化iv偏移量 AES.new传入参数必须是bytes类型
"""
self.key = key
self.init_vector = init_vector
self.mode = AES.MODE_CBC
# self.cryptor = AES.new(self.key, self.mode, self.init_vector)
# AES加密 待加密的字符串为str类型并返回str类型数据
def encrypt_str(self, pass_str) -> str:
# encode('ascii') 字符串转字节串,把字符串:abc,转为字节串:b'abc'
cryptor = AES.new(self.key.encode('ascii'), self.mode, self.init_vector.encode('ascii'))
# pass_str str类型 pad的data_to_pad必须为bytes类型 故pass_str需要转换为字节串
padtext = pad(pass_str.encode('ascii'), 16, style='pkcs7')
# 调用加密方法
cipher_text = cryptor.encrypt(padtext)
# 字符串 --> 十六进制
str_cipher = b2a_hex(cipher_text)
return str_cipher.decode('ascii')
# AES解密 待解密的字符串为str类型并返回str类型数据
def decode_str(self, text_str) -> str:
if not CryptPassword.is_hex(text_str) or (len(text_str) < 32):
return "错误的16进制加密字符串"
else:
# 十六进制 --> 字符串
cipher_text = a2b_hex(text_str)
decrypter = AES.new(self.key.encode('ascii'), self.mode, self.init_vector.encode('ascii'))
# 调用解密方法
plain_text = decrypter.decrypt(cipher_text)
# 不够16位,补全到16位
try:
unpadtext = unpad(plain_text, 16, 'pkcs7')
# decode('ascii') 字节串转字符串,把字节串:b'abc',转为字符串:abc
return unpadtext.decode('ascii')
except ValueError:
return "请输入正确的加密字符串"
# 16进制检查
@staticmethod
def is_hex(hex_str) -> bool:
# 16进制的正则表达式
regex = "[A-Fa-f0-9]+$"
result = re.match(regex, hex_str)
if result:
return True
else:
return False
# 测试用例
if __name__ == '__main__':
key_word = 'QWErtyuio@098765'
init_vector_word = 'MEIYuanTian--Xia'
password = 'T5fx2L3aFt'
data = CryptPassword(key_word, init_vector_word)
print(data.encrypt_str(password))
print(CryptPassword.is_hex(data.encrypt_str(password)))
text = 'c0cf24e87b328810c0b5711b2f326c8f'
print(data.decode_str(text))
-
configuration_rw.py
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
针对yaml/yml等配置文件进行读取和写入的实现类
"""
import yaml
import os
import pathlib
class YamlReadWrite:
# 初始化
def __init__(self, filepath):
self.filepath = filepath
def yaml_read(self, file_name):
# filename = str(file_name)
# print(filename)
if os.path.isfile(file_name):
with open(self.filepath + '\' + file_name, 'r', encoding='utf-8') as fp:
line_data = yaml.load(fp, Loader=yaml.FullLoader)
return line_data
else:
print("配置文件不存在")
def yaml_write(self, file_name, encrypt_str):
# 读取yaml文件数据
if os.path.isfile(file_name):
old_file_data = YamlReadWrite(self.filepath).yaml_read(file_name)
# 修改读取数据
old_file_data['base-info']['is_encrypt'] = encrypt_str
with open(self.filepath + '\' + file_name, 'w', encoding='utf-8') as fp:
yaml.dump(old_file_data, fp)
return "密码已加密,配置文件加密参数修改为0"
else:
print("配置文件不存在")
if __name__ == '__main__':
path = os.path.dirname(__file__)
read_data = YamlReadWrite(path).yaml_read(file_name='AutoBakCfg.yml')
print(read_data['base-info']['ftp_password'])
write_data = YamlReadWrite(path).yaml_write(file_name='AutoBakCfg.yml', encrypt_str='0')
print(write_data)
print("修改完成")
write_excels.py
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
利用pandas模块读写excel文本数据,并对文本数据中的密码进行判断,如果不是16进制字符串,那就进行加密并重新写入,读取16进制字符串解密进行ftp登录
依赖模块: pandas,openpyxl
"""
import pandas as pd
from password_aes import CryptPassword
class MachiningText:
def __init__(self, xlsx_file_name):
self.xlsx_file_name = xlsx_file_name
# 更新格式化后的密码并写入xlsx文件中
def write_xlsx_file(self, dev_name, dev_ip, dev_user, dev_pw, file_path):
data_df = pd.DataFrame()
data_df["设备名称"] = dev_name
data_df["设备IP"] = dev_ip
data_df["账号"] = dev_user
data_df["密码"] = dev_pw
data_df["文件路径"] = file_path
try:
writer = pd.ExcelWriter(self.xlsx_file_name)
data_df.to_excel(writer, sheet_name='dev_info', index=False)
writer.close()
except PermissionError:
print("请先关闭文档,再次尝试")
def update_password(self, key, init_vector):
dev_name = []
dev_ip = []
dev_user = []
dev_pw = []
file_path = []
dev_pd = pd.read_excel(self.xlsx_file_name, sheet_name='dev_info')
# 赋予i=0 密码已加密,i=1 把明文密码转为加密密码
i = 0
for line_str in dev_pd.values:
dev_name = dev_name + [line_str[0].strip()]
dev_ip = dev_ip + [line_str[1].strip()]
dev_user = dev_user + [line_str[2].strip()]
# try:
# dev_pw = dev_pw + [line_str[3].strip()]
# except AttributeError:
# dev_pw = dev_pw + [line_str[3]]
file_path = file_path + [line_str[4].strip()]
# 如果密码是单纯的数字 len(123)会出错,需要转换为str类型
try:
if int(line_str[3]):
password_len = len(str(line_str[3]))
passwd_str = str(line_str[3])
if not CryptPassword.is_hex(passwd_str) or (password_len < 32):
new_password = CryptPassword(key, init_vector).encrypt_str(passwd_str)
# print(new_password)
# print(dev_pw)
dev_pw = dev_pw + [new_password]
i = 1
elif str(line_str[3].strip()):
password_len = len(line_str[3].strip())
if not CryptPassword.is_hex(line_str[3].strip()) or (password_len < 32):
new_password = CryptPassword(key, init_vector).encrypt_str(line_str[3].encode('ascii'))
dev_pw = dev_pw + [new_password]
i = 1
else:
return "不存在的类型"
except ValueError:
# 密码不是16进制,则进行加密
password_len = len(line_str[3].strip())
if (not CryptPassword.is_hex(line_str[3].strip())) or (password_len < 32):
new_password = CryptPassword(key, init_vector).encrypt_str(line_str[3].strip())
dev_pw = dev_pw + [new_password]
i = 1
else:
new_password = [line_str[3]]
# 如果密码是16进制,new_password 取表中密码,赋予Dev_pw
dev_pw = dev_pw + new_password
# 密码有加密,则重新写xls表,否则不需要重写
# print(dev_ip)
# print(dev_user)
# print(dev_pw)
# print(file_path)
if i == 1:
MachiningText(self.xlsx_file_name).write_xlsx_file(dev_name, dev_ip, dev_user, dev_pw, file_path)
if __name__ == '__main__':
key_word = 'QWErtyuio@098765'
init_vector_word = 'MEIYuanTian--Xia'
MachiningText(r'./dev_info.xlsx').update_password(key_word, init_vector_word)
print("完成")
-
ftp_server.py
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# 模拟客户端进行FTP服务器验证
import ftplib
import utils
class FTPServer:
def __init__(self, ftp_server):
self.ftp_server = ftp_server
def connect_ftp_server(self, ftp_user, ftp_password) -> bool:
print("正在检测FTP服务器,请稍候......")
try:
ftp = ftplib.FTP(self.ftp_server)
print(ftp.getwelcome())
ftp.login(ftp_user, ftp_password)
ftp.encoding = 'utf-8'
# 230 Logged in 230代表登录成功
login_response = ftp.login(ftp_user, ftp_password)
# 通过指定分隔符对字符串进行切片 login_response.split(" ")[0]
if int(login_response.split(" ")[0]) == 230:
print("FTP服务器登陆成功!")
utils.writelog("FTP服务器登陆成功!n")
return True
else:
return False
except ConnectionRefusedError:
print("FTP服务器无法登录,请检查服务是否启动!")
utils.writelog("FTP服务器无法登录,请检查服务是否启动!n")
return False
if __name__ == '__main__':
user = 'python'
password = '123456'
server = '10.10.10.2'
FTPServer(server).connect_ftp_server(user, password)
print("完成")
-
utils.py
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# 代码运行日志生成器
import datetime
import msvcrt
def writelog(logstr):
try:
f = open('./result.log', 'a')
out = f.write(str(datetime.datetime.now()) + ":-->" + logstr)
f.close()
return out
except Exception as _result:
return print(_result)
# 如果不打包exe不需要 借鉴前人经验
def prompt_msg(tmep_str):
print("------------------------------------------------------------------")
print("如发现程序有问题,请联系作者!")
print("按任意键退出......")
print("------------------------------------------------------------------")
return ord(msvcrt.getch())
-
run_backup.py
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# 利用paramiko进行网络设备ssh连接和备份命令操作
from configuration_rw import YamlReadWrite
from write_excels import MachiningText
from ftp_server import FTPServer
from password_aes import CryptPassword
import pandas as pd
import paramiko
import time
import utils
import os
def connect_device_ssh(dev_name, dev_ip, dev_username, dev_pw, filename_path):
try:
# 创建一个SSH客户端
ssh_client = paramiko.SSHClient()
# AutoAddPolicy: 自动添加主机名和主机密钥
# set_missing_host_key_policy: 一个连接主机的策略(没有本地主机秘钥或者HostKeys对象时)
ssh_client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
ssh_client.connect(hostname=dev_ip, port=22, username=dev_username, password=dev_pw)
# 打开连接到的终端,使用ssh shell通道,相当于使用ssh远程到了主机上
command = ssh_client.invoke_shell()
# send发送内容,然后将字符串进行编码,编码后中文仍乱码
command.send(("ftp " + _FTP_HOST).encode())
command.send(b"n")
command.send(_FTP_USER + "n")
time.sleep(1)
command.send(_FTP_PW + "n")
time.sleep(1)
command.send(b"n")
time.sleep(1)
command.send(b"n")
# 备份flash:/vrpcfg.zip,并提取配置名称
cfg_name = filename_path.split("/")[1].split(".")
expanded_name = "." + cfg_name[1]
# 拼接备份文件名称
bak_file_name = cfg_name[0] + "-" + dev_ip + "-" + time.strftime("%Y-%m-%d") + expanded_name
# print (Bak_File_Name)
command.send("put " + filename_path + " " + bak_file_name + "n")
# print("put " + filename_path + " " + Bak_File_Name + "n")
time.sleep(1)
command.send(b"n")
received = command.recv(65535).decode()
# 备份成功recv 包含“226 File received ok”
if "226 File has been recieved" in received:
print(dev_name + "_" + dev_ip + "-" + filename_path + " --> 备份成功!")
utils.writelog(dev_name + "_" + dev_ip + "-" + filename_path + " --> 备份成功!n")
ssh_client.close()
except:
print(dev_name + "_" + dev_ip + "-" + filename_path + " --> 备份失败。")
utils.writelog(dev_name + "_" + dev_ip + "-" + filename_path + " --> 备份失败。######n")
# recv = command.recv(65535).decode() # recv接收响应,字节为65535,并进行解码
# result = recv.splitlines()
# print(recv)
if __name__ == '__main__':
# key: 16或16的倍数
key = 'QWErtyuio@098765'
# init_vector: 16位
init_vector = 'MEIYuanTian--Xia'
# 读取程序当前目录路径
path = os.path.dirname(__file__)
read_data = YamlReadWrite(path).yaml_read(file_name='AutoBakCfg.yml')
if os.path.exists('./result.log'):
os.remove('./result.log')
# 私有常量
_IS_ENCRYPT = int(read_data['base-info']['is_encrypt'])
_FTP_HOST = read_data['base-info']['ftp_server']
_FTP_USER = read_data['base-info']['ftp_user']
_FTP_PW = read_data['base-info']['ftp_password']
# _IS_ENCRYPT = 1, 把'./dev_info.xlsx中密码字段的明文密码改为密文(AES)
if _IS_ENCRYPT == 1:
xlsx_file_name = r'./Dev_info.xlsx'
MachiningText(xlsx_file_name).update_password(key, init_vector)
# 加密后把 _IS_ENCRYPT = 0
YamlReadWrite(path).yaml_write(file_name='AutoBakCfg.yml', encrypt_str='0')
utils.writelog("把当前明文密码修改成功为密文密码并更改配置文件为is_encrypt参数为0")
else:
utils.writelog("dev_info.xlsx文件中密码字段已加密!")
print("dev_info.xlsx文件中密码字段已加密!")
if FTPServer(_FTP_HOST).connect_ftp_server(_FTP_USER, _FTP_PW):
dev_file_name = './dev_info.xlsx'
dev_pd = pd.read_excel(dev_file_name, sheet_name='dev_info')
dev_info = dev_pd.values
# Dev_Info.head()
for line_str in dev_info:
# print(line_str[0], line_str[1], line_str[2], line_str[3], line_str[4])
dev_name_str = line_str[0]
dev_ip_str = line_str[1]
dev_username_str = line_str[2]
dev_pw_str = CryptPassword(key, init_vector).decode_str(line_str[3])
# print (PassWord)
filename_path_str = line_str[4]
connect_device_ssh(dev_name_str, dev_ip_str, dev_username_str, dev_pw_str, filename_path_str)
# prompt_msg("备份完成,备份文件保存在FTP服务器的目录下")
utils.writelog("备份完成,备份文件保存在FTP服务器的目录下")
else:
# prompt_msg("连接FTP服务器失败,请检查FTP服务是否启动,服务器IP,账号、密码是否正确。")
utils.writelog("连接FTP服务器失败,请检查FTP服务是否启动,服务器IP,账号、密码是否正确。")
利用paramiko进行连接,可以对华为和华三的网络设备配文件进行自动化备份。后续将使用netmiko进行重构,添加对其他厂商网络设备的支持。
原文始发于微信公众号(释然IT杂谈):开源自动化网络设备备份工具——轻松实现千台设备备份
- 左青龙
- 微信扫一扫
-
- 右白虎
- 微信扫一扫
-
评论