久久福利_99r_国产日韩在线视频_直接看av的网站_中文欧美日韩_久久一

您的位置:首頁技術(shù)文章
文章詳情頁

SpringBoot整合RabbitMQ, 實(shí)現(xiàn)生產(chǎn)者與消費(fèi)者的功能

瀏覽:71日期:2023-03-21 11:12:50

自然,依賴是少不了的。除了spring-boot-starter-web依賴外。就這個(gè)是最主要的依賴了,其他的看著辦就是了。我用的是gradle,用maven的看著弄也一樣的。無非就是包+包名+版本

//AMQPcompile(’org.springframework.boot:spring-boot-starter-amqp:2.0.4.RELEASE’)

這里有一個(gè)坑。導(dǎo)致我后來發(fā)送消息時(shí)一直連不上去。報(bào)錯(cuò): java.net.SocketException: socket closed。我去網(wǎng)上尋找了許多方案。大致都是一個(gè)意思。沒有設(shè)置遠(yuǎn)程連接權(quán)限。讓我添加一個(gè)用戶,并且設(shè)置最大權(quán)限。 下面是添加rabbitmq用戶的命令

#rabbitmqctl add_user 賬號(hào) 密碼rabbitmqctl add_user admin 614#分配用戶標(biāo)簽(admin為要賦予administrator權(quán)限的剛創(chuàng)建的那個(gè)賬號(hào)的名字)rabbitmqctl set_user_tags admin administrator#設(shè)置權(quán)限,開啟遠(yuǎn)程訪問rabbitmqctl set_permissions -p '/' admin '.*' '.*' '.*'

我用完之后去管控臺(tái)(http://ip:15672)看了一下用戶列表。確實(shí)已經(jīng)添加上去了,也是最大權(quán)限。然鵝并沒有什么卵用后來強(qiáng)行摸索出來了,原來是版本差異的原因。我SpringBoot本來是使用的是2.0.3版本,然后AMQP我使用的是2.0.4。可能有什么不兼容的地方。把Springboot和AMQP的版本給同步成一個(gè)就好了。別的版本差一點(diǎn)根本沒啥問題,就AMQP特殊,也是醉了。 使用SpriongBoot的yml配置:重點(diǎn)是rabbitmq那一欄設(shè)置好登錄用戶、密碼、地址端口、虛擬地址、超時(shí)時(shí)間就可以了

server: port: 8080 servlet: context-path: /spring: http: encoding: charset: UTF-8 jackson: #前端頁面?zhèn)鱀ate值時(shí)格式化 date-format: yyyy-MM-dd HH:mm:ss time-zone: GMT+8 datasource: driver-class-name: com.mysql.cj.jdbc.Driver url: jdbc:mysql://192.168.194.128:3306/mysql?serverTimezone=Asia/Shanghai username: root password: 614 rabbitmq: port: 5672 host: 192.168.194.128 username: admin password: 614 virtual-host: / connection-timeout: 15s#Redis配置 redis: host: 192.168.194.128 port: 6379 #Redis連接池配置 jedis: pool: min-idle: 0 max-idle: 8 max-active: 8 max-wait: -1ms

這里又有個(gè)小坑,這個(gè)rabbitmq的超時(shí)時(shí)間(connection-timeout)配的我真的是醉了,我看的教程里寫的是15000,表示15秒,我一輸之后IDEA直接報(bào)紅線啊。網(wǎng)上一找,全特么用毫秒值配的,行吧,應(yīng)該我們用的不是一個(gè)版本。點(diǎn)開看下這參數(shù)接受一個(gè)java.time.Duration對(duì)象,百思不得其解。這玩意咋配?我不會(huì)啊。找了二十分鐘的攻略才知道是這樣子配的,使用數(shù)字+時(shí)間標(biāo)志。比如1h、1M、1m、1d、1s、1ms這種格式就行了。

咳咳,配置文件弄好后也就差不多可以使用rabbitmq發(fā)消息了。生產(chǎn)端發(fā)消息。只需要使用 RabbitTemplate 類就夠了,看到這個(gè)名字,有沒有一種很熟悉的感覺?Redis也有個(gè)這玩意 叫 RedisTemplate 關(guān)于發(fā)消息,在這兒最好還是先指定好exchange和routingKey,即交換機(jī)和路由鍵。這樣發(fā)過去的消息才能被發(fā)到指定的交換機(jī)上,然后交換機(jī)在通過你的routingKey來發(fā)送給綁定了該routingKey的所有隊(duì)列。所以首先登陸管控臺(tái)(http://ip:15672),到Exchanges和Queues菜單下,創(chuàng)建好交換機(jī)和隊(duì)列,還有他們之間的routingKey。這個(gè)步驟我就不詳細(xì)描述了。單靠語言不怎么能夠描述清楚。估計(jì)得配很多圖,有需要的自行g(shù)oogle把。 萬事俱備。正式開始發(fā)送消息。先準(zhǔn)備一個(gè)要發(fā)的玩意。根據(jù)業(yè)務(wù)需求自己創(chuàng)個(gè)model就行。我這隨便寫一個(gè)。關(guān)于這個(gè)messageId,及消息唯一ID。他的作用是將該條消息數(shù)據(jù)和RabbitMQ發(fā)送的消息綁定起來。不要也不是不行。只是最好還是設(shè)置一個(gè)這個(gè)參數(shù)。

package com.skypyb.rabbitmq.entity;import java.io.Serializable;public class User1 implements Serializable{ private Long id; private String name; private String messageId;//儲(chǔ)存消息發(fā)送的唯一標(biāo)識(shí) public User1() { } public User1(Long id, String name, String messageId) { this.id = id; this.name = name; this.messageId = messageId; } public Long getId() { return id; } public void setId(Long id) { this.id = id; } public String getName() { return name; } public void setName(String name) { this.name = name; } public String getMessageId() { return messageId; } public void setMessageId(String messageId) { this.messageId = messageId; }}

要發(fā)送的數(shù)據(jù)模型已經(jīng)準(zhǔn)備好,接下來這個(gè)類是一個(gè)重點(diǎn)。即發(fā)送消息的類。注入RabbbitTemplate,然后就可以通過他的 convertSendAndReceive() 方法進(jìn)行消息的發(fā)送。他有很多種重載,最好是選用我這種,比較可控。交換機(jī)、路由鍵、消息唯一ID全部指定好。

package com.skypyb.rabbitmq.producer;import com.skypyb.rabbitmq.entity.User1;import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.amqp.rabbit.support.CorrelationData;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.stereotype.Component;@Component('user1Sender')public class User1Sender { @Autowired private RabbitTemplate rabbitTemplate;//操作rabbitmq的模板 public void send(User1 user1){ CorrelationData correlationData= new CorrelationData(); correlationData.setId(user1.getMessageId()); rabbitTemplate.convertSendAndReceive('user1-exchange',//exchange'user1.key1',//routingKeyuser1,//消息體內(nèi)容correlationData//消息唯一ID ); }}

emmmm,是不是感覺還是挺簡(jiǎn)單的。一個(gè)方法調(diào)用,消息就過去了。就發(fā)送到指定的交換機(jī)了。交換機(jī)再通過你的routingKey轉(zhuǎn)發(fā)給綁定在上邊的隊(duì)列。生產(chǎn)端這邊就完事了。 寫個(gè)測(cè)試類測(cè)試一下。

package com.skypyb.test;import com.skypyb.rabbitmq.Application;import com.skypyb.rabbitmq.entity.User1;import com.skypyb.rabbitmq.producer.User1Sender;import org.junit.Test;import org.junit.runner.RunWith;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.boot.test.context.SpringBootTest;import org.springframework.test.context.junit4.SpringRunner;import javax.annotation.Resource;import java.util.UUID;@RunWith(SpringRunner.class)@SpringBootTest(classes = Application.class)public class TestOne { @Autowired private User1Sender user1Sender; @Test public void testSend1(){ User1 user1 = new User1(); user1.setId(1L); user1.setName('測(cè)試用戶1'); user1.setMessageId('user1$'+System.currentTimeMillis()+'$'+ UUID.randomUUID().toString()); user1Sender.send(user1); }}

運(yùn)行完畢后。登陸管控臺(tái)(http://ip:15672),進(jìn)入Queues菜單。即可發(fā)現(xiàn)消息隊(duì)列中已接收到一條消息,會(huì)是一個(gè)等待消費(fèi)的狀態(tài)。至于到底是哪個(gè)消息隊(duì)列來處理嘛,那就得看你的exchange通過你的routingKey具體把消息轉(zhuǎn)發(fā)到哪兒了。這個(gè)都是在管控臺(tái)里邊配置的。 生產(chǎn)端準(zhǔn)備完畢。接下來是消費(fèi)端。消費(fèi)端也很簡(jiǎn)單,yml需要添加消費(fèi)端的配置。簽收模式最好選擇手動(dòng)簽收。可控。

server: port: 8081 servlet: context-path: /spring: http: encoding: charset: UTF-8 jackson: #前端頁面?zhèn)鱀ate值時(shí)格式化 date-format: yyyy-MM-dd HH:mm:ss time-zone: GMT+8 datasource: driver-class-name: com.mysql.cj.jdbc.Driver url: jdbc:mysql://192.168.194.128:3306/mysql?serverTimezone=Asia/Shanghai username: root password: 614 #rabbitmq基本配置 rabbitmq: addresses: 192.168.194.128:5672 username: admin password: 614 virtual-host: / connection-timeout: 15s #rabbitmq消費(fèi)端配置 listener: simple: #并發(fā)數(shù) concurrency: 5 #最大并發(fā)數(shù) max-concurrency: 10 #簽收模式:手工簽收、自動(dòng)簽收 acknowledge-mode: manual #限流,在此消費(fèi)端同一時(shí)間只有一條消息消費(fèi) prefetch: 1#Redis配置 redis: host: 192.168.194.128 port: 6379 #Redis連接池配置 jedis: pool: min-idle: 0 max-idle: 8 max-active: 8 max-wait: -1ms

具體的消費(fèi)者,具體解釋都寫在注釋中了。 關(guān)于@Exchange注解中設(shè)置的交換機(jī)的type屬性,主要是用這些值:

fanout:會(huì)把所有發(fā)到Exchange的消息路由到所有和它綁定的Queue direct:會(huì)把消息路由到routing key和binding key完全相同的Queue,不相同的丟棄 topic:direct是嚴(yán)格匹配,那么topic就算模糊匹配,routing key和binding key都用.來區(qū)分單詞串,比如A.B.C,*匹配任意單詞,#匹配任意多個(gè)或0個(gè)單詞,比如。A.B.*可以匹配到A.B.C headers:不依賴routing key和binding key,通過對(duì)比消息屬性中的headers屬性,對(duì)比Exchange和Queue綁定時(shí)指定的鍵值對(duì),相同就路由過來

basicAck()方法可以確認(rèn)消息消費(fèi)。執(zhí)行后,消息隊(duì)列中這條消息就沒了。multiple參數(shù)表示是否批量消費(fèi),一般都選false。

package com.skypyb.rabbitmq.controller;import com.rabbitmq.client.Channel;import com.skypyb.rabbitmq.entity.User1;import org.springframework.amqp.rabbit.annotation.*;import org.springframework.amqp.support.AmqpHeaders;import org.springframework.messaging.handler.annotation.Headers;import org.springframework.messaging.handler.annotation.Payload;import org.springframework.stereotype.Component;import java.io.IOException;import java.util.Map;@Componentpublic class User1Receiver { /** * @param user1 消息體,使用 @Payload 注解 * @param headers 消息頭,使用 @Headers 注解 * @param channel */ /*@RabbitListener表示監(jiān)聽的具體隊(duì)列. bindings屬性代表綁定。里邊有幾個(gè)值填寫,填寫好綁定的隊(duì)列名字和交換機(jī)名字 指定好routingKey。若指定的這些參數(shù)不存在的話。則會(huì)自行給你創(chuàng)建好 durable代表是否持久化 */ @RabbitListener(bindings = @QueueBinding( value = @Queue(value = 'user1-queue', durable = 'true'), exchange = @Exchange(name = 'user1-exchange', durable = 'true', type = 'topic'), key = 'user1.#' ) ) @RabbitHandler//標(biāo)識(shí)這個(gè)方法用于消費(fèi)消息 public void onUser1Message(@Payload User1 user1,@Headers Map<String, Object> headers,Channel channel) throws IOException { //消費(fèi)者操作 System.out.println('-------收到消息辣!-----'); System.out.println('發(fā)過來的用戶名為:' + user1.getName()); //basicAck()表示確認(rèn)已經(jīng)消費(fèi)消息。通知一下mq,需要先得到 delivery tag //delivery tag可以從消息頭里邊get出來 Long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG); channel.basicAck(deliveryTag, false); }}

把消費(fèi)端的服務(wù)打開后,就已經(jīng)在監(jiān)聽了。若監(jiān)聽的隊(duì)列中已有消息,則會(huì)立即處理。直到隊(duì)列中沒消息為止。若隊(duì)列為空,他就不會(huì)動(dòng),這個(gè)時(shí)候我啟動(dòng)一下生產(chǎn)者那邊的測(cè)試,消息一發(fā)出去,立馬就被消費(fèi)。非常完美。就是這個(gè)效果。 呼,偶爾也不想咸魚了啊,今天一天大概把RabbitMQ搞明白一些了,配置也會(huì)配了,消息也會(huì)發(fā)了。踩了一萬個(gè)坑,有不少是那種比較SB的采坑方式,一般人應(yīng)該踩不到,我就不打出來了。還是感覺有很多收獲的。就是累成麻瓜了。

以上就是SpringBoot整合RabbitMQ, 實(shí)現(xiàn)生產(chǎn)者與消費(fèi)者的功能的詳細(xì)內(nèi)容,更多關(guān)于SpringBoot整合RabbitMQ, 實(shí)現(xiàn)生產(chǎn)者與消費(fèi)者的功能的資料請(qǐng)關(guān)注好吧啦網(wǎng)其它相關(guān)文章!

標(biāo)簽: Spring
相關(guān)文章:
主站蜘蛛池模板: 久久久久久久国产精品 | 一级毛片免费观看 | 亚洲毛片在线观看 | 久久久久国产精品免费免费搜索 | 综合久久网 | 国产一级毛片在线视频 | 成人激情免费视频 | 中文在线一区二区 | 国产精品久久久精品 | 视频一区免费观看 | av在线天堂 | 国精产品一区二区三区 | 日韩av一区二区在线观看 | 成人免费看片 | 国产一区二区三区久久久久久 | 91精品国产综合久久婷婷香蕉 | 亚洲精品国产第一综合99久久 | 成人av免费观看 | 99视频在线播放 | 在线免费毛片 | 国产成人福利在线观看 | 欧美一区二区三区在线 | 一级片av| 久久国产久| 欧美国产日韩一区 | 亚洲综合在 | 精品久久久久久久久久久 | 欧美色综合一区二区三区 | 国产精品不卡视频 | av不卡电影在线观看 | 在线观看亚洲精品 | 日本色网址 | 午夜精品久久久久久久星辰影院 | 日韩欧美在线观看 | 国产精品美女久久久久久久久久久 | 国产精品乱码一二三区的特点 | 国产一区二区三区免费 | 中文字幕在线播放不卡 | 日本淫视频 | 日韩视频在线观看视频 | 国产一区二区视频在线观看 |