Version 2


Preface

Recently I was handed an odd requirement:
download every image stored on our UPYUN (又拍云) account, roughly 10 TB, around 300 million files.
I asked UPYUN whether we could simply mail them a hard drive and have the data copied onto it; the answer was no, but they did provide a download script,
download_file_with_iter.py
I gratefully downloaded it, only to find it pulls fewer than 3 images per second. That is far too slow, so I decided to write my own.

Version 1

This is the first version, written when I started; it uses asyncio queues directly (a minimal sketch of that pattern follows the package list below).
Environment: Python 3.6.2
Required packages (asyncio ships with the Python 3 standard library, so only aiohttp and aiofiles actually need installing):

  1. pip install asyncio
  2. pip install aiohttp
  3. pip install aiofiles
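
Before the full listing, here is a minimal sketch (illustration only, not part of the original script) of the asyncio producer/consumer pattern the crawler is built on: keys go into an asyncio.Queue, a fixed pool of worker coroutines drains it, and queue.join() plus cancellation shuts everything down.

import asyncio

async def worker(name, queue):
    while True:
        item = await queue.get()        # wait for the next key
        await asyncio.sleep(0.1)        # stand-in for the real HTTP request
        print(f'{name} handled {item}')
        queue.task_done()               # lets queue.join() know this item is finished

async def main():
    queue = asyncio.Queue()
    for i in range(20):                 # producer: enqueue some made-up keys
        queue.put_nowait(f'/demo/{i}.jpg')
    workers = [asyncio.ensure_future(worker(f'w{n}', queue)) for n in range(5)]
    await queue.join()                  # wait until every queued item is processed
    for w in workers:
        w.cancel()                      # the workers loop forever, so cancel them
    await asyncio.gather(*workers, return_exceptions=True)

if __name__ == '__main__':
    asyncio.get_event_loop().run_until_complete(main())

The full Version 1 script follows.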
import asyncio
import base64
import os
import urllib.parse

import aiofiles
import aiohttp

# -----------------------
bucket = 'bucket_name'        # bucket (space) name
username = 'username'         # operator account
password = 'password'         # operator password
# Public URL of the bucket. UPYUN's plain HTTP downloads are not rate limited,
# so images are fetched over HTTP instead of through the RESTful API.
hostname = "http://xxxxx"
# Local root path for saving; downloaded paths then mirror the paths inside the bucket.
base_save_path = 'f:'
# -----------------------
headers = {}
auth = base64.b64encode(f'{username}:{password}'.encode(encoding='utf-8'))
headers['Authorization'] = 'Basic ' + auth.decode('utf-8')   # UPYUN basic-auth header
headers['User-Agent'] = "UPYUN_DOWNLOAD_SCRIPT"
headers['x-list-limit'] = '300'
thread_sleep = 1


def is_dic(url):
    """Guess whether a key is a directory, based on whether it has a file extension"""
    url = url.replace('http://v0.api.upyun.com/', '')
    if len(url.split('.')) == 1:
        return True
    else:
        return False


class Crawler:
    def __init__(self, init_key, hostname, max_tasks=10, pic_tsak=50):
        """Initialise the crawler"""
        self.loop = asyncio.get_event_loop()
        self.max_tries = 4                                     # retries per request
        self.max_tasks = max_tasks                             # number of list-API worker tasks
        self.key_queue = asyncio.Queue(loop=self.loop)         # queue of directory keys
        self.pic_queue = asyncio.Queue(loop=self.loop)         # queue of image keys
        self.session = aiohttp.ClientSession(loop=self.loop)   # shared async HTTP session
        self.key_queue.put_nowait(
            {'key': init_key, 'x-list-iter': None, 'hostname': hostname})  # seed the key queue with the directory to crawl
        self.pic_tsak = pic_tsak                               # number of image-download worker tasks

    def close(self):
        """Release the http session"""
        self.session.close()

    async def work(self):
        """Consumer for the list-API queue"""
        try:
            while True:
                url = await self.key_queue.get()
                await self.handle(url)
                self.key_queue.task_done()
                await asyncio.sleep(thread_sleep)
        except asyncio.CancelledError:
            pass

    async def work_pic(self):
        """Consumer for the image queue"""
        try:
            while True:
                url = await self.pic_queue.get()
                await self.handle_pic(url)
                self.pic_queue.task_done()
                await asyncio.sleep(thread_sleep)
        except asyncio.CancelledError:
            pass

    async def handle_pic(self, key):
        """Download one image"""
        url = (lambda x: x[0] == '/' and x or '/' + x)(key['key'])
        url = url.encode('utf-8')
        url = urllib.parse.quote(url)
        pic_url = key['hostname'] + url + '!s400'   # '!s400' is a thumbnail style configured in the bucket
        tries = 0
        while tries < self.max_tries:
            try:
                print(f'Requesting image: {pic_url}')
                async with self.session.get(pic_url, timeout=60) as response:
                    async with aiofiles.open(key['save_path'], 'wb') as f:
                        await f.write(await response.read())
                break
            except aiohttp.ClientError:
                pass
            tries += 1

    async def handle(self, key):
        """Call the list API for one directory"""
        url = '/' + bucket + \
              (lambda x: x[0] == '/' and x or '/' + x)(key['key'])
        url = url.encode('utf-8')
        url = urllib.parse.quote(url)
        if key['x-list-iter'] is not None:
            if key['x-list-iter'] != 'g2gCZAAEbmV4dGQAA2VvZg':  # not the end-of-list marker
                headers['X-List-Iter'] = key['x-list-iter']
        tries = 0
        while tries < self.max_tries:
            try:
                reque_url = "http://v0.api.upyun.com" + url
                print(f'Requesting list API: {reque_url}')
                async with self.session.get(reque_url, headers=headers, timeout=60) as response:
                    content = await response.text()
                    try:
                        iter_header = response.headers.get('x-upyun-list-iter')
                    except Exception:
                        iter_header = 'g2gCZAAEbmV4dGQAA2VvZg'
                    list_json_param = content + "`" + \
                        str(response.status) + "`" + str(iter_header)
                    await self.do_file(self.get_list(list_json_param),
                                       key['key'], key['hostname'])
                break
            except aiohttp.ClientError:
                pass
            tries += 1

    def get_list(self, content):
        if content:
            content = content.split("`")
            items = content[0].split('\n')
            content = [dict(zip(['name', 'type', 'size', 'time'], x.split('\t')))
                       for x in items] + content[1].split() + \
                content[2].split()
            return content
        else:
            return None

    async def do_file(self, list_json, key, hostname):
        """Process one page of list-API results"""
        for i in list_json[:-2]:
            if not i['name']:
                continue
            new_key = key + i['name'] if key == '/' else key + '/' + i['name']
            try:
                if i['type'] == 'F':   # 'F' marks a folder: queue it for further listing
                    self.key_queue.put_nowait(
                        {'key': new_key, 'x-list-iter': None, 'hostname': hostname})
                else:
                    try:
                        if not os.path.exists(bucket + key):
                            os.makedirs(bucket + key)
                    except OSError as e:
                        print('Error creating directory: ' + str(e))
                    save_path = base_save_path + '/' + bucket + new_key
                    if not os.path.isfile(save_path):
                        self.pic_queue.put_nowait(
                            {'key': new_key, 'save_path': save_path,
                             'x-list-iter': None, 'hostname': hostname})
                    else:
                        print(f'File already exists: {save_path}')
            except Exception as e:
                print('Download error!: ' + str(e))
                async with aiofiles.open('download_err.txt', 'a') as f:
                    await f.write(new_key + '\n')
        if list_json[-1] != 'g2gCZAAEbmV4dGQAA2VvZg':
            # more pages in this directory: requeue it with the iter marker
            self.key_queue.put_nowait(
                {'key': key, 'x-list-iter': list_json[-1], 'hostname': hostname})

    async def run(self):
        """Start the worker tasks"""
        workers = [asyncio.Task(self.work(), loop=self.loop)
                   for _ in range(self.max_tasks)]
        workers_pic = [asyncio.Task(self.work_pic(), loop=self.loop)
                       for _ in range(self.pic_tsak)]
        await self.key_queue.join()
        await self.pic_queue.join()
        workers.extend(workers_pic)   # cancel both the key workers and the image workers
        for w in workers:
            w.cancel()


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    crawler = Crawler('/', hostname, max_tasks=5, pic_tsak=150)
    loop.run_until_complete(crawler.run())
    crawler.close()
    loop.close()
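
For reference, each line of the list-API response holds tab-separated name, type, size and mtime fields ('F' marks a folder; 'N' for a regular file is my assumption about UPYUN's legacy listing format), and the x-upyun-list-iter header carries the pagination marker. The snippet below runs get_list's parsing steps on a made-up response so the data flow is easier to follow.

# Made-up sample data; a real response comes from http://v0.api.upyun.com/<bucket>/<dir>.
raw = "photo1.jpg\tN\t102400\t1525942335\nalbum\tF\t0\t1525942335"
status = "200"
iter_marker = "g2gCZAAEbmV4dGQAA2VvZg"   # UPYUN's end-of-list marker

list_json_param = raw + "`" + status + "`" + iter_marker
parts = list_json_param.split("`")        # same steps as Crawler.get_list()
items = [dict(zip(['name', 'type', 'size', 'time'], x.split('\t')))
         for x in parts[0].split('\n')] + parts[1].split() + parts[2].split()
print(items)
# -> two dicts (one file, one folder), then '200', then the iter marker;
#    do_file() queues 'album' back onto key_queue and 'photo1.jpg' onto pic_queue.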

The crawler above can reach about 400 images per second, but then came the next problem.

Version 2

The script above was already pretty satisfying and could more or less saturate the bandwidth, but a new problem came up: the disks used to store the images are only 4 TB each, while one of the UPYUN buckets has already grown to 7 TB. That means two machines have to download at the same time, so I brought in a message queue (RabbitMQ, driven with asynqp). The directory crawler below walks every directory and pushes the keys into MQ:
import asyncio
import base64
import os
import urllib.parse

import aiofiles
import aiohttp
import asynqp

base_save_path = 'f'
mq_host = '192.168.199.13'
mq_user = 'admin'
mq_password = '123123'
bucket = 'bucket_name'
hostname = "http://xxxxxx"
username = 'username'
password = 'password'

auth = base64.b64encode(f'{username}:{password}'.encode(encoding='utf-8'))
headers = {}
headers['Authorization'] = 'Basic ' + auth.decode('utf-8')   # UPYUN basic-auth header
headers['User-Agent'] = "UPYUN_DOWNLOAD_SCRIPT"
headers['x-list-limit'] = '300'


class Spider:
    def __init__(self, max_task=10, max_tried=4):
        print(f'New spider! workers: {max_task}, max retries per request: {max_tried}')
        self.loop = asyncio.get_event_loop()
        self.max_tries = max_tried
        self.max_task = max_task
        self.session = aiohttp.ClientSession(loop=self.loop)

    def close(self):
        """Release the http session"""
        self.session.close()

    async def download_work(self):
        try:
            while True:
                received_message = await self.queue.get()
                if received_message is None:
                    await asyncio.sleep(1)
                    continue
                msg_json = received_message.json()
                await self.handle(msg_json)
                received_message.ack()
                await asyncio.sleep(500)   # the crawler outruns the downloaders, so a long delay was added here
        except asyncio.CancelledError:
            pass

    async def handle(self, key):
        """Call the list API for one directory"""
        url = '/' + key['bucket'] + \
              (lambda x: x[0] == '/' and x or '/' + x)(key['key'])
        url = url.encode('utf-8')
        url = urllib.parse.quote(url)
        if key['x-list-iter'] is not None:
            if key['x-list-iter'] != 'g2gCZAAEbmV4dGQAA2VvZg':  # not the end-of-list marker
                headers['X-List-Iter'] = key['x-list-iter']
        tries = 0
        while tries < self.max_tries:
            try:
                reque_url = "http://v0.api.upyun.com" + url
                print(f'Requesting list API: {reque_url}')
                async with self.session.get(reque_url, headers=headers, timeout=60) as response:
                    content = await response.text()
                    try:
                        iter_header = response.headers.get('x-upyun-list-iter')
                    except Exception:
                        iter_header = 'g2gCZAAEbmV4dGQAA2VvZg'
                    list_json_param = content + "`" + \
                        str(response.status) + "`" + str(iter_header)
                    await self.do_file(self.get_list(list_json_param),
                                       key['key'], key['hostname'], key['bucket'])
                break
            except aiohttp.ClientError:
                pass
            tries += 1

    def get_list(self, content):
        if content:
            content = content.split("`")
            items = content[0].split('\n')
            content = [dict(zip(['name', 'type', 'size', 'time'], x.split('\t')))
                       for x in items] + content[1].split() + \
                content[2].split()
            return content
        else:
            return None

    async def do_file(self, list_json, key, hostname, bucket):
        """Process one page of list-API results"""
        for i in list_json[:-2]:
            if not i['name']:
                continue
            new_key = key + i['name'] if key == '/' else key + '/' + i['name']
            try:
                if i['type'] == 'F':   # 'F' marks a folder: publish it for further listing
                    await self.put_key_queue({'key': new_key, 'x-list-iter': None,
                                              'hostname': hostname, 'bucket': bucket})
                else:
                    save_path = '/' + bucket + new_key
                    if not os.path.isfile(base_save_path + save_path):
                        await self.put_pic_queue({'key': new_key, 'save_path': save_path,
                                                  'x-list-iter': None,
                                                  'hostname': hostname, 'bucket': bucket})
                    # else:
                    #     print(f'File already exists: {base_save_path}{save_path}')
            except Exception as e:
                print('Download error!: ' + str(e))
                async with aiofiles.open('download_err.txt', 'a') as f:
                    await f.write(new_key + '\n')
        if list_json[-1] != 'g2gCZAAEbmV4dGQAA2VvZg':
            # more pages in this directory: publish it again with the iter marker
            await self.put_key_queue({'key': key, 'x-list-iter': list_json[-1],
                                      'hostname': hostname, 'bucket': bucket})

    async def put_key_queue(self, obj):
        msg = asynqp.Message(obj)
        self.exchange.publish(msg, f'{bucket}.routing.key.key')

    async def put_pic_queue(self, obj):
        msg = asynqp.Message(obj)
        self.pic_exchange.publish(msg, 'routing.pic.key')

    async def run(self):
        self.connection = await asynqp.connect(host=mq_host, username=mq_user, password=mq_password)
        self.channel = await self.connection.open_channel()
        self.exchange = await self.channel.declare_exchange('key.exchange', 'direct')
        self.queue = await self.channel.declare_queue(f'{bucket}.key.queue', durable=True)
        await self.queue.bind(self.exchange, f'{bucket}.routing.key.key')
        self.channel_pic = await self.connection.open_channel()
        self.pic_exchange = await self.channel_pic.declare_exchange('pic.exchange', 'direct')
        self.pic_queue = await self.channel_pic.declare_queue('pic.queue', durable=True)
        await self.pic_queue.bind(self.pic_exchange, 'routing.pic.key')
        # Only a brand-new bucket needs to start crawling from the root directory:
        # await self.put_key_queue({'key': '/', 'x-list-iter': None, 'hostname': hostname, 'bucket': bucket})
        for _ in range(self.max_task):
            asyncio.ensure_future(self.download_work())
            await asyncio.sleep(2.0)


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    spider = Spider(max_task=10)
    # asyncio.ensure_future(spider.run())
    loop.run_until_complete(spider.run())
    loop.run_forever()
    # print(f'Pending tasks at exit: {asyncio.Task.all_tasks(loop)}')
    spider.close()
    loop.close()
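
Both scripts agree on a simple message contract. The sketch below restates it with placeholder values (the paths and bucket name are made up); the field names mirror what put_key_queue and put_pic_queue above actually publish.

# A directory message: published to key.exchange with routing key
# '<bucket>.routing.key.key' and consumed by the crawler itself for further listing.
key_msg = {'key': '/2018/05', 'x-list-iter': None,
           'hostname': 'http://xxxxxx', 'bucket': 'bucket_name'}

# An image message: published to pic.exchange with routing key 'routing.pic.key'.
# Every download machine binds the same durable pic.queue, so RabbitMQ spreads
# these messages across however many consumers are running.
pic_msg = {'key': '/2018/05/001.jpg',
           'save_path': '/bucket_name/2018/05/001.jpg',
           'x-list-iter': None,
           'hostname': 'http://xxxxxx', 'bucket': 'bucket_name'}

# In the crawler each dict is wrapped as asynqp.Message(obj) and sent with
# exchange.publish(msg, routing_key), exactly as put_key_queue / put_pic_queue do.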

Below is the download service. It can resume after an interruption (files already on disk are skipped) and several machines can download at the same time:

import asyncio
import os
import time
import urllib.parse

import aiofiles
import aiohttp
import asynqp


class Spider:
    def __init__(self, base_save_path, max_tried=4, mq_host='192.168.199.13',
                 mq_user='admin', mq_password='123123'):
        self.loop = asyncio.get_event_loop()
        self.max_tries = max_tried
        self.session = aiohttp.ClientSession(loop=self.loop)
        self.mq_host = mq_host
        self.mq_user = mq_user
        self.mq_password = mq_password
        self.base_save_path = base_save_path

    def __del__(self):
        print('Worker finished, closing the aiohttp session and the MQ connection')
        self.session.close()

    async def download_work(self):
        print('Connecting to MQ')
        self.connection = await asynqp.connect(host=self.mq_host, username=self.mq_user,
                                               password=self.mq_password)
        self.channel = await self.connection.open_channel()
        self.exchange = await self.channel.declare_exchange('pic.exchange', 'direct')
        self.queue = await self.channel.declare_queue('pic.queue', durable=True)
        await self.queue.bind(self.exchange, 'routing.pic.key')
        print('Connected, queue channel established')
        try:
            for _ in range(10000):
                try:
                    received_message = await self.queue.get()
                    if received_message is None:
                        await asyncio.sleep(1)
                        continue
                    msg_json = received_message.json()
                    await self.handle_pic(msg_json)
                except Exception:
                    async with aiofiles.open('download_error.txt', 'a') as f:
                        await f.write(msg_json['hostname'] + msg_json['key'] + '\n')
                finally:
                    received_message.ack()
                    await asyncio.sleep(0.01)
                    del received_message, msg_json   # release the references
        except asyncio.CancelledError:
            # the task was cancelled, exit quietly
            pass

    async def handle_pic(self, key):
        """Download one image"""
        url = (lambda x: x[0] == '/' and x or '/' + x)(key['key'])
        url = url.encode('utf-8')
        url = urllib.parse.quote(url)
        pic_url = key['hostname'] + url + '!s400'
        tries = 0
        while tries < self.max_tries:
            try:
                # print(f'Requesting image: {pic_url}')
                async with self.session.get(pic_url, timeout=60) as response:
                    save_path = self.base_save_path + key['save_path']
                    dir_name = os.path.dirname(save_path)
                    try:
                        if not os.path.exists(dir_name):
                            os.makedirs(dir_name)
                    except OSError as e:
                        print('Error creating directory: ' + str(e))
                    if os.path.isfile(save_path):
                        break   # already downloaded: this is what makes restarts resumable
                    async with aiofiles.open(save_path, 'wb') as f:
                        print(f'Saving file: {save_path}')
                        await f.write(await response.read())
                    del save_path, dir_name
                break
            except aiohttp.ClientError:
                pass
            tries += 1
        del url, pic_url   # release the references


def restart_program():
    import sys
    python = sys.executable
    os.execl(python, python, *sys.argv)


def main():
    base_path = '/Users/luoda/Documents/project/pic_downloader'
    thread_count = 2
    loop = asyncio.get_event_loop()
    spider = Spider(base_path)
    works = []
    for _ in range(int(thread_count)):
        works.append(spider.download_work())
    loop.run_until_complete(asyncio.wait(works))
    print('All tasks finished')
    time.sleep(20)
    restart_program()


if __name__ == '__main__':
    main()
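
The listing hard-codes base_path and thread_count in main(). As a usage sketch (hypothetical launcher, not part of the original article), something like the wrapper below is one way to start the same downloader on each machine with its own disk path; because every machine consumes the same durable pic.queue and skips files that already exist, they share the backlog and can resume after a crash.

# run_downloader.py -- hypothetical launcher; assumes the listing above is saved as downloader.py.
# Machine A:  python run_downloader.py /mnt/disk_a 4
# Machine B:  python run_downloader.py /mnt/disk_b 4
import asyncio
import sys

from downloader import Spider

def main():
    base_path = sys.argv[1]                                    # local root for this machine's disk
    thread_count = int(sys.argv[2]) if len(sys.argv) > 2 else 2
    loop = asyncio.get_event_loop()
    spider = Spider(base_path)
    works = [spider.download_work() for _ in range(thread_count)]
    loop.run_until_complete(asyncio.wait(works))

if __name__ == '__main__':
    main()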

Results

[Screenshots: Version 2 in action]
