
点击上方蓝字关注我们
0x01 分布式介绍以及优缺点
首先分布式是由一个管理节点、多个执行节点组成。由管理节点处理任务注册到网络上,执行节点连接管理节点后读取任务进行处理,然后结果存储起来(可在管理节点,也可以在执行节点写存储代码)。分布式的优点:对主机配置要求低、更稳定、适合大型工具、速度相对更快分布式的缺点:多台主机、主机适配性要求高
ps:与分布式相比,多线程协程对硬件的要求在慢慢淡化。
0x02 节点需要做什么
控制节点需要做什么:
1.管理任务2.注册队列3.接收结果4.处理结果/存储结果
执行节需要做什么:
1.连接控制节点2.获取数据3.执行设定程序4.返回结果(或存储结果)
0x03 相关代码细讲
使用python模块有哪些?
1.queue2.multiprocessing
控制节点重点代码是什么?
1.from multiprocessing.managers import >BaseManager2.task=Queue.Queue() 创建任务队列3.result=Queue.Queue() 创建结果队列4.BaseManager.register("task_queue",callable=task) 将task这个队列注册到网络上,result也是。
ps:这里的task是一个方法名 具体看下文
5.manager= BaseManager(address=("",9000),authkey=b'key') 这里绑定ip端口,同时执行节点连接时需要带key才能访问6.manager.start() 启动服务7.task_put=manager.get_task() 获取task队列8.task.put 向task队列写入数据9.task.get 从task队列读取数据
执行节点重点代码是什么?
1.from multiprocessing.managers import BaseManager2.BaseManager.register(task_queue) 获取tasl_queue3.worker=BaseManager(address=("",9000),authkey=b'key') 保持一直4.worker=connect() 连接控制节点5.task_put=manager.get_task() 获取task队列
0x04 踩坑
Myltiprocessing模块安装问题
pip install multiprocessfrom multiprocessing import pool 验证是否下载成功
注册对象时间报错
BaseManager.register('get_task', callable=task) 这里的callable值不能直接给queue队列def get_task(self): return self.tasksBaseManager.register('get_task', callable=get_task) 将值转化为对象后就可以了
打包exe后占满内存
这个问题是从百度上找到的答案,只需要在运行前加入
from multiprocessing import freeze_supportif __name__ == '__main__': freeze_support()
全封装在类中会出现报错
本来想写规范一点,然后报错了。查了很多资料,Myltiprocessing下模块代码问题.所以在使用时尽量不要全在类中执行。
authkey密码报错
manager = BaseManager(address=('127.0.0.1', 5100), authkey=b'abc')authkey需要设置成二进制
0x05 代码部分
管理节点代码:
from multiprocessing.managers import BaseManager
from multiprocessing import freeze_support
import queue
tasks = queue.Queue()
results = queue.Queue()
def get_task():
return tasks
def get_result():
return results
BaseManager.register('get_task', callable=get_task)
BaseManager.register('get_result', callable=get_result)
manager = BaseManager(address=('127.0.0.1', 5100), authkey=b'abc')
class managers:
def run_1(self):
manager.start()
self.open_txt(manager)
def open_txt(self,manager):
task_put = manager.get_task()
with open("./ceshi.txt", 'r') as i:
for ce in i:
task_put.put(ce)
if __name__ == '__main__':
cheshi=managers()
freeze_support()
cheshi.run_1()
result_get = manager.get_result()
while True:
print(result_get.get())
执行节点代码:
from multiprocessing.managers import BaseManager
import time
BaseManager.register('get_task')
BaseManager.register('get_result')
worker=BaseManager(address=("127.0.0.1", 5100), authkey=b'abc')
try:worker.connect()
except :print("server not yet started")#服务器未开启时触发
task=worker.get_task()
result=worker.get_result()
while True:
try:
king=task.get()
print("当前获取数据:"+task.get())
result.put("完成数据:"+king)
except :
print('[WinError 10054] 远程主机强迫关闭了一个现有的连接。')#服务器强制关闭连接时触发
exit()
0x06 运行效果
管理节点运行截图
执行节点运行截图
0x07 结尾
分布式更适合用作大型工具或者数量庞大的数据处理,在海量数据的处理方面它比多线程以及协程更能在速度上体现出来。本文就是写了下分布式框架的基本代码,还有遇到的一些坑。如有什么问题大家可以一起交流。
- 左青龙
- 微信扫一扫
-
- 右白虎
- 微信扫一扫
-
评论