Cisco ASA/FTD top traffic gebruikers

Stel je internetlijn wordt opeens vol gedrukt, dan wil je graag weten wie of wat daarvoor zorgt. Gek genoeg heeft Cisco FMC daar geen enkele grafiek of tabel voor terwijl dat toch cruciale informatie is.
Als je met een cisco ASA of FTD je sessies logt naar syslog dan is die data wel beschikbaar. Onderstaande scriptje leest alle regels met "teardown" er in en stopt elk nieuw gezien intern ip adres in een hash met het aantal bytes erbij. Elke keer dat dat adres weer wordt gezien telt hij het aantal bytes erbij op. Elke 30 seconden, of op te geven tijd met "-s", wordt de top 20 weer gegeven.
De logfile kan gepiped worden naar dit script, zo kun je bepalen wat je wilt bekijken, realtime door "tail -f logfile | asatoptraffic.py" op te geven of de hele logfile of een selectie.

Belangrijk is alleen even de interface naam te checken, als die niet "inside" is moet je de variabele aanpassen.

Code

#!/usr/bin/python3
# file    : asatoptraffic.py
# author  : Wouter Barendsen
# created : 2023-08-17 15:49
# modified: 2023-08-28 09:17
# remarks : 

import os,time,sys,re,itertools

insideInterfaceName="inside"
Ips={}
Epoch=time.time()
startEpoch=Epoch
progName=os.path.basename(sys.argv[0])
Delay=30

def help():
  print(f"{progName}  Wouter Barendsen  2023-08-25                                            ")
  print(f"  Syntax: cat asalog.log | {progName}                           # check hele log    ")
  print(f"          cat asalog.log | grep \"^Aug 10 08:\" | {progName}    # check van 8 tot 9 ")
  print(f"          tail -f asalog.log | {progName}                       # check realtime    ")
  print( "  Telt alle bytes van interne IP adressen op en print de top20                      ")
  print( "  Het interne interface moet 'inside' heten of pas de variabele aan.                ")
  print( "  Opties:                                                                           ")
  print( "     -s 10    Refresh time ipv 30s                                                  ")
  print("")
  sys.exit(1)

def showresult():
  deltaEpoch=Epoch-startEpoch
  minutes, seconds = divmod(deltaEpoch, 60)
  hours, minutes = divmod(minutes, 60)      
  deltaTime=str(int(hours))+":"+str(int(minutes))+":"+str(int(seconds))
  print(f'{"IP:":14} {"Bytes:":^17}      Tijd: {getTime()}      Sinds start: {deltaTime}')
  IpsSorted=dict(sorted(Ips.items(),key=lambda item: item[1], reverse=True))
  #print("DEBUG:",IpsSorted)
  Top20List=list(itertools.islice(IpsSorted.items(),20))
  if len(Top20List)<20:
    Max=len(Top20List)
  else:
    Max=20
  for I in range(0,Max):
    IP=Top20List[I][0]
    Bytes=format(Top20List[I][1],',d').replace(",",".")
    print(f'{IP:14} {Bytes:>17}')

def getepoch():
  return time.time()

def getTime():
  return time.strftime('%H:%M:%S',time.localtime(int(time.time())))

if len(sys.argv) >1:
  if sys.argv[1] == "-h": help()

if len(sys.argv) >2:
  if sys.argv[1] == "-s":
    Delay=int(sys.argv[2])      # Error als geen nummer opgegeven

try:
  for Line in sys.stdin:
    if "Teardown" in Line:
      if getepoch() - Epoch > Delay:
        Epoch=getepoch()
        os.system('clear')
        showresult()
      #print("DEBUG:",Line,end="")
      Match=re.search(r'.*'+insideInterfaceName+r':([0-9.]*)/.*bytes ([0-9]*) .*',Line)
      if Match:
        IP=Match.group(1)
        Bytes=Match.group(2)
        #print("DEBUG:",IP,Bytes)
        if IP in Ips:
          Ips[IP]=Ips[IP]+int(Bytes)
        else:
          Ips[IP]=int(Bytes)
except IOError as E:
  print("Error:",E)
except KeyboardInterrupt:
  pass

showresult()

Output

cat /var/log/asa5506.log | asatoptraffic.py 
IP:                 Bytes:            Tijd: 09:35:04      Sinds start: 0:0:0
192.168.7.29           2.742.159
192.168.7.79             532.819
192.168.2.106            154.555
192.168.2.103             41.521
192.168.4.119              9.971
192.168.2.240              1.612
192.168.7.10                 373


Website door Wouter Barendsen, 2005-2021