久久福利_99r_国产日韩在线视频_直接看av的网站_中文欧美日韩_久久一

您的位置:首頁技術文章
文章詳情頁

Python RabbitMQ實現簡單的進程間通信示例

瀏覽:107日期:2022-07-18 18:36:11

RabbitMQ 消息隊列

PYthreading Queue進程Queue 父進程與子進程,或同一父進程下的多個子進程進行交互缺點:兩個不同Python文件不能通過上面兩個Queue進行交互

erlong基于這個語言創建的一種中間商win中需要先安裝erlong才能使用rabbitmq_server start

安裝 Python module

pip install pika

or

easy_install pika

or源碼

rabbit 默認端口15672查看當前時刻的隊列數rabbitmqctl.bat list_queue

exchange在定義的時候就是有類型的,決定到底哪些queue符合條件,可以接受消息fanout:所有bind到此exchange的queue都可以收到消息direct:通過routingkey和exchange決定唯一的queue可以接受消息topic: 所有符合routingkey(此時可以是一個表達式)的routingkey所bind的queue都可以接受消息 表達式符號說明: # 代表一個或多個字符 * 代表任何字符

RPCremote procedure call 雙向傳輸,指令<-------->指令執行結果實現方法:創建兩個隊列,一個隊列收指令,一個隊列發送執行結果

用rabbitmq實現簡單的生產者消費者模型

1) rabbit_producer.py

# Author : Xuefengimport pikaconnection = pika.BlockingConnection(pika.Connection.Parameters( 'localhost'))# statement a channelchannel = connection.channel()# create the queue, the name of queue is 'hello'# durable=True can make the queue be exist, although the service have stopped before.channel.queue_declare(queue='hello', durable=True)# n RabbitMQ a message can never be sent directly to queue,it always need to go throughchannel.basic_publish(exchange = ' ', routing_key = 'hello', body = 'Hello world!', properties = pika.BasicPropreties( delivery_mode=2, # make the message persistence ) )print('[x] sent ’Hello world!’')connection.close()

2) rabbit_consumer.py

# Author : Xuefengimport pikaconnection = pika.BlockingConnection(pika.Connection.Parameters( 'localhost'))# statement a channelchannel = connection.channel()channel.queue_declare(queue='hello', durable=True)def callback(ch, method, properties, body): ’’’ Handle the recieved data :param ch: The address of the channel :param method: Information about the connection :param properties: :param body: :return: ’’’ print('------>', ch, method, properties ) print('[x] Recieved %r' % body) # ack by ourself ch.basic_ack(delivery_tag = method.delivery_tag)# follow is for consumer to auto change with the abilitychannel.basic_qos(profetch_count=1)# no_ack = True represent that the message cannot be transfor to next consumer,# when the current consumer is stop by accident.channel.basic_consume(callback, # If have recieved message, enable the callback() function to handle the message. queue = 'hello', no_ack = True)print('[*] Waiting for messages. To Exit press CTRL+C')channel.start_consuming()

用rabbitmq中的fanout模式實現廣播模式

1) fanout_rabbit_publish.py

# Author : Xuefengimport pikaimport sys# 廣播模式:# 生產者發送一條消息,所有的開通鏈接的消費者都可以接收到消息connection = pika.BlockingConnection(pika.Connection.Parameters( 'localhost'))# statement a channelchannel = connection.channel()channel.exchange_declare(exchange='logs', type='fanout')message = ’ ’.join(sys.argv[1:]) or 'info:Hello world!'channel.basic_publish( exchange='logs', routing_key='', body=message)print('[x] Send %r' % message)connection.close()

2) fanout_rabbit_consumer.py

# Author : Xuefengimport pikaimport sysconnection = pika.BlockingConnection(pika.Connection.Parameters( 'localhost'))# statement a channelchannel = connection.channel()# exclusive 排他,唯一的 隨機生成queueresult = channel.queue_declare(exclusive=True)queue_name = result.method.queueprint('Random queue name:', queue_name)channel.queue_bind(exchange='logs', queue=queue_name)def callback(ch, method, properties, body): ’’’ Handle the recieved data :param ch: The address of the channel :param method: Information about the connection :param properties: :param body: :return: ’’’ print('------>', ch, method, properties ) print('[x] Recieved %r' % body) # ack by ourself ch.basic_ack(delivery_tag = method.delivery_tag)# no_ack = True represent that the message cannot be transfor to next consumer,# when the current consumer is stop by accident.channel.basic_consume(callback, # If have recieved message, enable the callback() function to handle the message. queue = 'hello', no_ack = True)print('[*] Waiting for messages. To Exit press CTRL+C')channel.start_consuming()

用rabbitmq中的direct模式實現消息過濾模式

1) direct_rabbit_publisher.py

# Author : Xuefengimport pikaimport sys# 消息過濾模式:# 生產者發送一條消息,通過severity優先級來確定是否可以接收到消息connection = pika.BlockingConnection(pika.Connection.Parameters( 'localhost'))# statement a channelchannel = connection.channel()channel.exchange_declare(exchange='direct_logs', type='direct')severity = sys.argv[1] if len(sys.argv) > 1 else 'info'message = ’ ’.join(sys.argv[2:]) or 'info:Hello world!'channel.basic_publish( exchange='direct_logs', routing_key=severity, body=message)print('[x] Send %r:%r' % (severity, message))connection.close()

2) direct_rabbit_consumer.py

# Author : Xuefengimport pikaimport sysconnection = pika.BlockingConnection(pika.Connection.Parameters( 'localhost'))# statement a channelchannel = connection.channel()channel.exchange_declare(exchange='direct_logs', type='direct')# exclusive 排他,唯一的 隨機生成queueresult = channel.queue_declare(exclusive=True)queue_name = result.method.queueprint('Random queue name:', queue_name)severities = sys.argv[1:]if not severities: sys.stderr.write('Usage:%s [info] [warning] [error]n' % sys.argv[0]) sys.exit(1)for severity in severities: channel.queue_bind(exchange='direct_logs', queue=queue_name, routing_key=severity) def callback(ch, method, properties, body): ’’’ Handle the recieved data :param ch: The address of the channel :param method: Information about the connection :param properties: :param body: :return: ’’’ print('------>', ch, method, properties ) print('[x] Recieved %r' % body) # ack by ourself ch.basic_ack(delivery_tag = method.delivery_tag)# no_ack = True represent that the message cannot be transfor to next consumer,# when the current consumer is stop by accident.channel.basic_consume(callback, # If have recieved message, enable the callback() function to handle the message. queue = 'hello', no_ack = True)print('[*] Waiting for messages. To Exit press CTRL+C')channel.start_consuming()

用rabbitmq中的topic模式實現細致消息過濾模式

1) topic_rabbit_publisher.py

# Author : Xuefengimport pikaimport sys# 消息細致過濾模式:# 生產者發送一條消息,通過運行腳本 *.info 等確定接收消息類型進行對應接收connection = pika.BlockingConnection(pika.Connection.Parameters( 'localhost'))# statement a channelchannel = connection.channel()channel.exchange_declare(exchange='topic_logs', type='topic')binding_key = sys.argv[1] if len(sys.argv) > 1 else 'info'message = ’ ’.join(sys.argv[2:]) or 'info:Hello world!'channel.basic_publish( exchange='topic_logs', routing_key=binding_key, body=message)print('[x] Send %r:%r' % (binding_key, message))connection.close()

2) topic_rabbit_consumer.py

# Author : Xuefengimport pikaimport sysconnection = pika.BlockingConnection(pika.Connection.Parameters( 'localhost'))# statement a channelchannel = connection.channel()channel.exchange_declare(exchange='topic_logs', type='topic')# exclusive 排他,唯一的 隨機生成queueresult = channel.queue_declare(exclusive=True)queue_name = result.method.queueprint('Random queue name:', queue_name)binding_keys = sys.argv[1:]if not binding_keys: sys.stderr.write('Usage:%s [info] [warning] [error]n' % sys.argv[0]) sys.exit(1)for binding_key in binding_keys: channel.queue_bind(exchange='topic_logs', queue=queue_name, routing_key=binding_key)def callback(ch, method, properties, body): ’’’ Handle the recieved data :param ch: The address of the channel :param method: Information about the connection :param properties: :param body: :return: ’’’ print('------>', ch, method, properties) print('[x] Recieved %r' % body) # ack by ourself ch.basic_ack(delivery_tag=method.delivery_tag)# no_ack = True represent that the message cannot be transfor to next consumer,# when the current consumer is stop by accident.channel.basic_consume(callback, # If have recieved message, enable the callback() function to handle the message. queue='hello', no_ack=True)print('[*] Waiting for messages. To Exit press CTRL+C')channel.start_consuming()

用rabbitmq實現rpc操作

1) Rpc_rabbit_client.py

# Author : Xuefengimport pikaimport timeimport uuidclass FibonacciRpcClient(object): def __init__(self): self.connection = pika.BlockingConnection(pika.Connection.Parameters( 'localhost')) self.channel = self.connection.channel() result = self.channel.queue_declare(exclusive=True) self.callback_queue = result.method.queue # 隨機的生成一個接收命令執行結果的隊列 self.channel.basic_consume(self.on_response, # 只要收到消息就調用 no_ack=True, queue=self.callback_queue) def on_response(self, ch, method, props, body): if self.corr_id == props.correlation_id: self.response = body def call(self,n): self.response = None self.corr_id = str(uuid.uuid4()) self.channel.basic_publish( exchange='', routing_key='rpc_queue', properties=pika.BasicPropreties( rely_to=self.callback_queue, correlation_id=self.corr_id # 通過隨機生成的ID來驗證指令執行結果與指令的匹配性 ), body=str(n) ) while self.response is None: self.connection.process_data_events() # 非阻塞版的start_consume,有沒有消息都繼續 print('no message...') time.sleep(0.5) return int(self.response)fibonacci_rcp = FibonacciRpcClient()print('[x] Requesting fib(30)')response = fibonacci_rcp.call(30)print('[x] Rec %r' % response)

2) Rpc_rabbit_server.py

# Author : Xuefengimport pikaimport sysconnection = pika.BlockingConnection(pika.Connection.Parameters( 'localhost'))# statement a channelchannel = connection.channel()channel.queue_declare(queue='rpc_queue')def fib(n): if n == 0: return 0 elif n == 1: return 1 else: return fib(n-1)+fib(n-2)def on_request(ch, method, props, body): n = int(body) print('[.] fib(%s)' % n) response = fib(n) ch.basic_publish( exchange='', routing_key=props.rely_to, properties=pika.BasicPropreties(correlation_id= props.correlation), body = str(body) ) ch.basic_ack(delivery_tag=method.delivery_tag)channel.basic_qos(prefetch_count=1)channel.basic_consume(on_request, queue='rpc_queue')print('[x] Awaiting RPC requests')channel.start_consumeing()channel.exchange_declare(exchange='direct_logs', type='direct')# exclusive 排他,唯一的 隨機生成queueresult = channel.queue_declare(exclusive=True)queue_name = result.method.queueprint('Random queue name:', queue_name)severities = sys.argv[1:]

到此這篇關于Python RabbitMQ實現簡單的進程間通信示例的文章就介紹到這了,更多相關Python RabbitMQ進程間通信內容請搜索好吧啦網以前的文章或繼續瀏覽下面的相關文章希望大家以后多多支持好吧啦網!

標簽: Python 編程
相關文章:
主站蜘蛛池模板: 国产噜噜噜噜噜久久久久久久久 | 天天澡天天狠天天天做 | aaa黄色片| 日韩av一区二区在线观看 | 日韩视频一区二区三区四区 | 日韩有码一区 | 国产精品一区二区精品 | 日韩欧美在线综合 | 亚洲精品午夜 | 久久99国产精品久久99大师 | 蜜月久综合久久综合国产 | 午夜私人影院 | 香蕉av在线 | 中文字幕一区二区三区乱码在线 | 国产嫩草91 | 欧美性一区二区三区 | 精品欧美一区二区三区 | 久久久国产精品一区 | 香蕉视频91 | www.久久精品 | 国产成人av一区 | 国产干干干 | 免费观看日韩av | 国产精品7 | 午夜久久 | 香蕉久久久 | 亚洲一区二区三区视频 | 国产无套一区二区三区久久 | 九色 在线 | 国产激情一区二区三区 | 亚洲国产二区 | 免费三片在线观看网站 | av大片在线 | 伊人久久国产 | 中文精品一区二区三区 | 国产欧美精品一区二区三区四区 | 成人在线免费 | aaa在线| 亚洲国产成人在线 | 一区二区三区四区精品 | 精品国产仑片一区二区三区 |