Python分布式工具开发之分布式的使用

admin 2021年9月23日10:00:00评论63 views字数 2788阅读9分17秒阅读模式


Python分布式工具开发之分布式的使用

        点击上方蓝字关注我们       

0x01 分布式介绍以及优缺点

首先分布式是由一个管理节点、多个执行节点组成。由管理节点处理任务注册到网络上,执行节点连接管理节点后读取任务进行处理,然后结果存储起来(可在管理节点,也可以在执行节点写存储代码)。分布式的优点:对主机配置要求低、更稳定、适合大型工具、速度相对更快分布式的缺点:多台主机、主机适配性要求高

ps:与分布式相比,多线程协程对硬件的要求在慢慢淡化。

0x02 节点需要做什么

控制节点需要做什么:

1.管理任务2.注册队列3.接收结果4.处理结果/存储结果

执行节需要做什么:

1.连接控制节点2.获取数据3.执行设定程序4.返回结果(或存储结果)

0x03 相关代码细讲

使用python模块有哪些?

1.queue2.multiprocessing 

控制节点重点代码是什么?

1.from multiprocessing.managers import >BaseManager2.task=Queue.Queue()   创建任务队列3.result=Queue.Queue()   创建结果队列4.BaseManager.register("task_queue",callable=task)   将task这个队列注册到网络上,result也是。

ps:这里的task是一个方法名 具体看下文

5.manager= BaseManager(address=("",9000),authkey=b'key') 这里绑定ip端口,同时执行节点连接时需要带key才能访问6.manager.start()   启动服务7.task_put=manager.get_task()   获取task队列8.task.put  向task队列写入数据9.task.get   从task队列读取数据

执行节点重点代码是什么?

1.from multiprocessing.managers import BaseManager2.BaseManager.register(task_queue)   获取tasl_queue3.worker=BaseManager(address=("",9000),authkey=b'key')  保持一直4.worker=connect()  连接控制节点5.task_put=manager.get_task()  获取task队列

0x04 踩坑

Myltiprocessing模块安装问题

pip install multiprocessfrom multiprocessing import pool  验证是否下载成功

注册对象时间报错

BaseManager.register('get_task', callable=task)  这里的callable值不能直接给queue队列def get_task(self):  return self.tasksBaseManager.register('get_task', callable=get_task) 将值转化为对象后就可以了

打包exe后占满内存

这个问题是从百度上找到的答案,只需要在运行前加入

from multiprocessing import freeze_supportif __name__ == '__main__':    freeze_support()

全封装在类中会出现报错

本来想写规范一点,然后报错了。查了很多资料,Myltiprocessing下模块代码问题.所以在使用时尽量不要全在类中执行。

authkey密码报错

manager = BaseManager(address=('127.0.0.1', 5100), authkey=b'abc')authkey需要设置成二进制 

0x05 代码部分

管理节点代码:

from multiprocessing.managers import BaseManagerfrom multiprocessing import freeze_supportimport queuetasks = queue.Queue()results = queue.Queue()def get_task():    return tasksdef get_result():    return resultsBaseManager.register('get_task', callable=get_task)BaseManager.register('get_result', callable=get_result)manager = BaseManager(address=('127.0.0.1', 5100), authkey=b'abc')
class managers: def run_1(self): manager.start() self.open_txt(manager)
def open_txt(self,manager): task_put = manager.get_task() with open("./ceshi.txt", 'r') as i: for ce in i: task_put.put(ce)if __name__ == '__main__':
cheshi=managers() freeze_support() cheshi.run_1() result_get = manager.get_result() while True: print(result_get.get())

执行节点代码:

from multiprocessing.managers import BaseManagerimport timeBaseManager.register('get_task')BaseManager.register('get_result')worker=BaseManager(address=("127.0.0.1", 5100), authkey=b'abc')try:worker.connect()except :print("server not yet started")#服务器未开启时触发task=worker.get_task()result=worker.get_result()
while True: try: king=task.get() print("当前获取数据:"+task.get()) result.put("完成数据:"+king) except : print('[WinError 10054] 远程主机强迫关闭了一个现有的连接。')#服务器强制关闭连接时触发 exit()

0x06 运行效果

管理节点运行截图

执行节点运行截图

Python分布式工具开发之分布式的使用

0x07 结尾

分布式更适合用作大型工具或者数量庞大的数据处理,在海量数据的处理方面它比多线程以及协程更能在速度上体现出来。本文就是写了下分布式框架的基本代码,还有遇到的一些坑。如有什么问题大家可以一起交流。


☆ END ☆

Python分布式工具开发之分布式的使用

  • 左青龙
  • 微信扫一扫
  • weinxin
  • 右白虎
  • 微信扫一扫
  • weinxin
admin
  • 本文由 发表于 2021年9月23日10:00:00
  • 转载请保留本文链接(CN-SEC中文网:感谢原作者辛苦付出):
                   Python分布式工具开发之分布式的使用https://cn-sec.com/archives/554044.html

发表评论

匿名网友 填写信息