在本教程中,我将使用python和Paramiko库通过SSH在网络设备中自动化任务。 Paramiko是SSH的Python实现,使我们能够连接到设备并在其上执行命令,节省了时间并减少执行任务时的人为错误。
在此特定示例中,我们将在网络中的每个开关中配置DHCP侦探。
DHCP窥探
DHCP Snooping是一个第2层安全协议,该协议在未经授权的接口上删除DHCP流量,从而保护网络免受Rogue DHCP服务器的影响。在保护网络免受Rogue DHCP服务器的保护时,您也需要能够接受授权的DHCP服务器,因此您需要指定授权流量来自的可信接口。受信任的接口通常是每个开关的上行链路,但是网络中的每个开关都没有正确标记或识别的接口,因此,我们如何知道开关中的上行链接是什么端口?答案位于ARP Protocol。
设想
让我们看一下此图:
在几乎每个网络中(至少在我合作过的所有网络上),网络的核心是所有内容的网关(服务器,网络设备,VoIP设备等...),因此,这意味着如果我想要到达Internet或任何其他网络,流量将通过网络设备的上行链路接口流动。知道这一点,并且知道默认网关的IP,我们可以使用 arp协议。
来确定上行链路接口。我们将使用ARP协议查找默认网关的MAC地址,然后,您将找到从中学到MAC地址的端口,该端口是您的上行链路。一些网络设备在ARP表中提供端口信息,您可以节省Mac表中的查找时间。
让我们再次查看图:
现在,我们已经确定了上行链路接口,而且,我们没有授权的无线路由器,其中一些用户从家里带来了连接到我们的网络(非常常见的情况),该路由器将某些用户从家里使用连接一些无线设备到公司网络。
该设备将在我们的网络设备中注入DHCP,从而导致公司设备从无线路由器网络而不是公司中获取IPS。为了防止这种情况发生,我们只需要激活DHCP侦听并信任每个交换机中的上行链路接口,以便授权的DHCP服务器仍然可以将IPS分配给公司网络设备。
代码
从上一节中,我们知道我们需要什么才能在每个网络设备中正确配置DHCP:
- 它是上行链路端口:使用ARP表和Mac表(或某些网络设备中的ARP表)
- 激活DHCP Snooping:使用设备CLI命令(请咨询您的网络设备手册)
要获取上行链路端口并配置DHCP Snooping,我遵循以下步骤:
- 连接到设备
- 从ARP表获取默认网关的Mac
- 从使用Mac表 中获取默认网关的Mac的端口
- 执行命令激活DHCP侦听并将上行链路设置为可信接口。
为此,我创建了以下功能,以使代码更易于创建:
import paramiko
import time
import re
'''
This function creates a SSH connection to a Device.
host is a dictionary that has the following structure:
host = {'hostname': 'IP or HOSTNAME', 'port': '22', 'username': 'username', 'password': 'passwd'}
'''
def create_connection(host):
ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
ssh.connect(**host, look_for_keys=False, allow_agent=False)
cli = ssh.invoke_shell()
return ssh, cli
'''This function reads the output of the cli of a connection
I created this for the sole purpose of having a shorter way
to write the read command on the rest of the code.
'''
def read_output(cli):
message = cli.recv(1000000000000000000000000).decode('utf-8')
return message
'''
This function makes a lookup on the MAC Address Table
for the port where the provided mac address is learned from.
host is a dictionary that has the following structure:
host = {'hostname': 'IP or HOSTNAME', 'port': '22', 'username': 'username', 'password': 'passwd'}
You need to know how to find a mac addres in the target device.
The MAC address needs to be provided in the same format the switch supports.
The search command should be written as you would do on the console just without the MAC.
The port_regexp is a regluar expression that needs to match the names of all the possible interface
names on the switch.
'''
def get_port_mac(host, mac, search_command, port_regexp):
try:
ssh, cli = create_connection(host)
cli.send(f'{search_command} {mac}\n')
time.sleep(5)
message = read_output(cli)
port = re.findall(port_regexp, message, flags=re.IGNORECASE)
ssh.close()
except IndexError:
print(host['hostname'] + " not available\n")
return port[0]
'''
This function helps us to get the uplink of a network device using
the gateway IP and ARP for this.
host is a dictionary that has the following structure:
host = {'hostname': 'IP or HOSTNAME', 'port': '22', 'username': 'username', 'password': 'passwd'}
The arp_search_command should be the search command you type o the device console when
searching for an ARP record, just without the IP.
The mac_search_command should be written as you would do on the console just without the MAC.
The port_regexp is a regluar expression that needs to match the names of all the possible interface
names on the switch.
What happens here is basically we add 1 command to the search and reuse the get_mac_port function
'''
def get_uplink(host, gateway, arp_search_command, mac_search_command, port_regexp):
try:
ssh, cli = create_connection(host)
cli.send(f'{arp_search_command} {gateway}\n')
time.sleep(5)
arp = read_output(cli)
mac = re.findall('(?:[0-9A-Fa-f]{2}[:-]){5}(?:[0-9A-Fa-f]{2})', arp)
ssh.close()
port = get_port_mac(host, mac[0], mac_search_command, port_regexp)
except IndexError:
print(host['hostname'] + " not available\n")
return port
'''
This function executes a single command directly on the target device and returns
the console output.
host is a dictionary that has the following structure:
host = {'hostname': 'IP or HOSTNAME', 'port': '22', 'username': 'username', 'password': 'passwd'}
Command should be a string
'''
def execute_command(host, command):
try:
ssh, cli = create_connection(host)
cli.send(command + "\n")
time.sleep(5)
message = read_output(cli)
ssh.close()
except Exception as e:
print(f"Error at: {host} ------> {e}".format(hostname=host, error=e))
return message
'''
This function executes a list of commands on the target device.
host is a dictionary that has the following structure:
host = {'hostname': 'IP or HOSTNAME', 'port': '22', 'username': 'username', 'password': 'passwd'}
commands should be a list of strings
'''
def execute_bulk_commands(host, commands):
try:
ssh, cli = create_connection(host)
for command in commands:
cli.send(command + "\n")
time.sleep(2)
ssh.close()
except Exception as e:
print(f"Error at: {host} ------> {e}".format(hostname=host, error=e))
注意:您需要很好地了解网络设备的CLI,以便详细说明Regexp模式和搜索命令。
使用此功能,我创建了一个脚本来配置网络上的设备。就我而言,我的网络中有两种不同的开关。我有一些Fortiswitches和Cisco SF300,因此我的regexp模式和显示命令可能与您需要的东西不同。
我需求产生的代码如下:
import threading
import net_tools
'''
Creating all of the parameter for each device brand
in my case I had these but they could be different in your case.
'''
cisco_switches = ["cisco_switch_01", "cisco_switch_02"]
cisco_switches_arp_search = "show arp ip-address "
cisco_switches_mac_search = "show mac address-table address "
cisco_switches_port_regexp = "gi[0123456789].{1,5}|fa[0123456789].{1,5}"
fortiswitches = ["fortiswitch_01", "fotiswitch_02"]
fortiswitches_arp_search = "diagnose ip arp list | grep "
fortiswitches_mac_search = "diagnose switch mac-address list | grep "
fortiswitches_port_regexp = "port[0-9]{1,2}"
# Creating the connection information. We´ll substitute the hostname later
connection_info = {'hostname': "",
'port': '22',
'username': 'username',
'password': 'password'}
gateway = "172.20.90.1"
cisco_switches_threads = list()
for hostname in cisco_switches:
# We substitute the value of the hostname in the dictionary to match the target device
# and we create a thread for each switch so the commands can be executed faster using multi-threading
connection_info['hostname'] = hostname
uplink_port = net_tools.get_uplink(connection_info, gateway, cisco_switches_arp_search,
cisco_switches_mac_search, cisco_switches_port_regexp)
cisco_switches_commands = ["enable",
"configure terminal",
"ip dhcp snooping",
"ip dhcp snooping database",
"ip dhcp snooping vlan 1",
f"interface {uplink_port}",
"ip dhcp snooping trust",
"end",
"wr",
"Y"]
th = threading.Thread(target=net_tools.execute_bulk_commands, args=(connection_info, cisco_switches_commands))
cisco_switches_threads.append(th)
fortiswitches_threads = list()
for hostname in fortiswitches:
# We substitute the value of the hostname in the dictionary to match the target device
# and we create a thread for each switch so the commands can be executed faster using multi-threading
connection_info['hostname'] = hostname
uplink_port = net_tools.get_uplink(connection_info, gateway, fortiswitches_arp_search,
fortiswitches_mac_search, fortiswitches_port_regexp)
fortiswitches_commands = ["config switch vlan",
"edit 1",
"set dhcp-snooping enable",
"end",
"config switch interface",
f"edit {uplink_port}",
"set dhcp-snooping trusted",
"end"]
th = threading.Thread(target=net_tools.execute_bulk_commands, args=(connection_info, fortiswitches_commands))
fortiswitches_threads.append(th)
# We start our threads
for thread in cisco_switches_threads:
thread.start()
for thread in fortiswitches_threads:
thread.start()
# We wait for all of them to finish
for thread in cisco_switches_threads:
th.join()
for thread in fortiswitches_threads:
th.join()
,如果您想在此之前做其他事情,您可以在我的GitHub存储库中找到文件,基本上您需要知道的只是如何在目标设备中手动执行并自动化它!
。。如果您有任何疑问,请与我联系,ANS我会尽快回答您。
希望这篇文章帮助一些网络工程师困难!