久久福利_99r_国产日韩在线视频_直接看av的网站_中文欧美日韩_久久一

您的位置:首頁技術文章
文章詳情頁

Spring Boot集群管理工具KafkaAdminClient使用方法解析

瀏覽:5日期:2023-09-20 11:19:50

原理介紹

在Kafka官網中這么描述AdminClient:The AdminClient API supports managing and inspecting topics, brokers, acls, and other Kafka objects. 具體的KafkaAdminClient包含了一下幾種功能(以Kafka1.0.0版本為準):

創建Topic:createTopics(Collection<NewTopic> newTopics) 刪除Topic:deleteTopics(Collection<String> topics) 羅列所有Topic:listTopics() 查詢Topic:describeTopics(Collection<String> topicNames) 查詢集群信息:describeCluster() 查詢ACL信息:describeAcls(AclBindingFilter filter) 創建ACL信息:createAcls(Collection<AclBinding> acls) 刪除ACL信息:deleteAcls(Collection<AclBindingFilter> filters) 查詢配置信息:describeConfigs(Collection<ConfigResource> resources) 修改配置信息:alterConfigs(Map<ConfigResource, Config> configs) 修改副本的日志目錄:alterReplicaLogDirs(Map<TopicPartitionReplica, String> replicaAssignment) 查詢節點的日志目錄信息:describeLogDirs(Collection<Integer> brokers) 查詢副本的日志目錄信息:describeReplicaLogDirs(Collection<TopicPartitionReplica> replicas) 增加分區:createPartitions(Map<String, NewPartitions> newPartitions)

其內部原理是使用Kafka自定義的一套二進制協議來實現,詳細可以參見Kafka協議。主要實現步驟:

客戶端根據方法的調用創建相應的協議請求,比如創建Topic的createTopics方法,其內部就是發送CreateTopicRequest請求??蛻舳税l送請求至Kafka Broker。

Kafka Broker處理相應的請求并回執,比如與CreateTopicRequest對應的是CreateTopicResponse??蛻舳私邮障鄳幕貓滩⑦M行解析處理。

和協議有關的請求和回執的類基本都在org.apache.kafka.common.requests包中,AbstractRequest和AbstractResponse是這些請求和回執類的兩個基本父類。

代碼如下

@Componentpublic class KafkaConfig{ // 配置Kafka public Properties getProps(){ Properties props = new Properties(); props.put('bootstrap.servers', 'localhost:9092');/* props.put('retries', 2); // 重試次數 props.put('batch.size', 16384); // 批量發送大小 props.put('buffer.memory', 33554432); // 緩存大小,根據本機內存大小配置 props.put('linger.ms', 1000); // 發送頻率,滿足任務一個條件發送*/ props.put('key.serializer', 'org.apache.kafka.common.serialization.StringSerializer'); props.put('value.serializer', 'org.apache.kafka.common.serialization.StringSerializer'); return props; }}

@RestControllerpublic class KafkaTopicManager { @Autowired private KafkaConfig kafkaConfig; @GetMapping('createTopic') public void createTopic(){ AdminClient adminClient = KafkaAdminClient.create(kafkaConfig.getProps()); NewTopic newTopic = new NewTopic('test1',4, (short) 1); Collection<NewTopic> newTopicList = new ArrayList<>(); newTopicList.add(newTopic); adminClient.createTopics(newTopicList); adminClient.close(); } @GetMapping('deleteTopic') public void deleteTopic(){ AdminClient adminClient = KafkaAdminClient.create(kafkaConfig.getProps()); adminClient.deleteTopics(Arrays.asList('test1')); adminClient.close(); } @GetMapping('listAllTopic') public void listAllTopic(){ AdminClient adminClient = KafkaAdminClient.create(kafkaConfig.getProps()); ListTopicsResult result = adminClient.listTopics(); KafkaFuture<Set<String>> names = result.names(); try { names.get().forEach((k)->{System.out.println(k); }); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } adminClient.close(); } @GetMapping('getTopic') public void getTopic(){ AdminClient adminClient = KafkaAdminClient.create(kafkaConfig.getProps()); DescribeTopicsResult describeTopics = adminClient.describeTopics(Arrays.asList('syn-test')); Collection<KafkaFuture<TopicDescription>> values = describeTopics.values().values(); if(values.isEmpty()){ System.out.println('找不到描述信息'); }else{ for (KafkaFuture<TopicDescription> value : values) {System.out.println(value); } } adminClient.close(); }}

以上就是本文的全部內容,希望對大家的學習有所幫助,也希望大家多多支持好吧啦網。

標簽: Spring
相關文章:
主站蜘蛛池模板: 免费大黄网站 | 日韩一区二区三区在线观看 | 精品国产精品 | 97国产精品 | 亚洲综合视频一区 | 日本一区二区三区四区不卡视频 | 6080yy午夜一二三区久久 | 欧美日韩专区 | 99re99 | 黄色片免费在线观看视频 | 亚洲精品国产综合区久久久久久久 | 成年人在线视频播放 | 国产精品99久久久久久www | 看黄网址 | 欧美日韩综合视频 | 在线成人av观看 | 国产视频一区二区在线 | 欧美视频一二 | 久久美女| 免费看国产一级特黄aaaa大片 | 国产精品免费在线 | 国产精品久久久久久久免费大片 | 99爱在线观看 | 国产高清在线a视频大全 | 最近的中文字幕在线看视频 | 亚洲一区二区在线看 | 性一级录像片片视频免费看 | 久久视频一区二区 | 91精品久久久久久久久久 | 91秦先生艺校小琴 | 欧美久久久久久久久久伊人 | 亚洲成av人片在线观看无码 | 一区二区免费看 | 久久免费精品 | 播放毛片 | 精品乱码久久久 | 波多野结衣 一区二区 | 狠狠操操| 亚洲人黄色片 | 成人高清 | 中文字幕在线观看 |