- 菠菜回收平台亚慱体育app怎么下载 | 神火股份:质押的股票照旧典质给其
- 在线博彩网站信誉排行榜赌博赌输了网上怎么办 | 《六姊妹》:刘好意思心塞
- 宝马会炸金花2021欧洲杯门票_香港这通宵!汤唯获封后,于适夺冠抢镜
- 菠菜的平台注册金必威体育app精装版_赵维伦:第一次穿上国度队战袍 那嗅
- 博彩网络营销推广体育新闻欧洲杯_冲破魔咒!中国男足有斗志,便值得点赞
香港六合彩电子游戏博彩平台新闻_SpringBoot整合漫衍式音讯平台Pulsar
各人好,我是君哥。 动作优秀的音讯流平台,Pulsar 的使用越来越多,这篇著述讲授 Pulsar 的 Java 客户端。 部署 PulsarPulsar 的部署样式主要有 3 种,土产货装配二进制文献、docker 部署、在 Kubernetes 上部署。 本文华取 docker 部署一个单节点的 Pulsar 集群。本质环境是 2 核 CPU 和 4G 内存。 部署号令如下: 皇冠hg86adocker 澳门金沙捕鱼run -it -p 6650:6650 -p 8080:8080 --mount source=pulsardata,target=/pulsar/data --mount source=pulsarconf,target=/pulsar/conf apachepulsar/pulsar:2.9.1 bin/pulsar standalone 装配历程可能会出现底下的荒唐: unknown flag: --mount See 'docker run --help'. 这是因为 docker 版块低,不复古 mount 参数,把 docker 版块升级到 17.06 以上就不错了。 部署历程中可能会因为汇注的原因失败,多试几次就不错得手了。若是看到底下的日记,就证据启动得手了。 2022-01-08T22:27:58,726+0000 [main] INFO org.apache.pulsar.broker.PulsarService - messaging service is ready, bootstrap service port = 8080, broker url= pulsar://localhost:6650, cluster=standalone 土产货单节点集群启动后,会创建一个 namespace,名字叫 public/default Pulsar 客户端现在 Pulsar 复古多种言语的客户端,包括: Java 客户端Go 客户端Python 客户端C++ 客户端Node.js 客户端WebSocket 客户端C# 客户端 SpringBoot 竖立使用 SpringBoot 整合 Pulsar 客户端,最初引入 Pulsar 客户端依赖,代码如下: <dependency> <groupId>org.apache.pulsar</groupId> <artifactId>pulsar-client</artifactId> <version>2.9.1</version> </dependency> 然后在 properties 文献中添加竖立: ![]() # Pulsar 地址 pulsar.url=pulsar://192.168.59.155:6650 # topic pulsar.topic=testTopic # consumer group pulsar.subscription=topicGroup创建 Client 创建客户详察配浅显,代码如下: client = PulsarClient.builder() .serviceUrl(url) .build(); 上头的 url 等于 properties 文献中界说的 pulsar.url 。 创建 Client 时,即使集群莫得启得手,关节也不会报错,因为这时还莫得信得过地去结书籍群。 创建 Producerproducer = client.newProducer() .topic(topic) .compressionType(CompressionType.LZ4) .sendTimeout(0, TimeUnit.SECONDS) .enableBatching(true) .batchingMaxPublishDelay(10, TimeUnit.MILLISECONDS) .batchingMaxMessages(1000) .maxPendingMessages(1000) .blockIfQueueFull(true) .roundRobinRouterBatchingPartitionSwitchFrequency(10) .batcherBuilder(BatcherBuilder.DEFAULT) .create(); 创建 Producer,会信得过的结书籍群,这时若是集群有问题,就会报结合荒唐。 底下证据一下创建 Producer 的参数: topic:Producer 要写入的 topic。 compressionType:压缩计谋,现在复古 4 种计谋 (NONE、LZ4、ZLIB、ZSTD),从 Pulsar2.3 初始,只须 Consumer 的版块在 2.3 以上,这个计谋才会成效。 sendTimeout:超频繁间,若是 Producer 在超频繁间为收到 ACK,会进行重新发送。 enableBatching:是否开启音讯批量处治,这里默许 true,这个参数只须在异步发送 (sendAsync) 时才能成效,采纳同步发送会失效。 batchingMaxPublishDelay:批量发送音讯的时辰段,这里界说的是 10ms,需要扎眼的是,诞生了批量时辰,就不会受音讯数目的影响。批量发送会把要发送的批量音讯放在一个汇注包里发送出去,减少汇注 IO 次数,大大进步网卡的发送效果。 batchingMaxMessages:批量发送音讯的最大数目。 maxPendingMessages:恭候从 broker 接管 ACK 的音讯队伍最大长度。若是这个队伍满了,producer 悉数的 sendAsync 和 send 齐会失败,除非诞生了 blockIfQueueFull 值是 true。 冒险blockIfQueueFull:Producer 发送音讯时会把音讯先放入土产货 Queue 缓存,若是缓存满了,就会梗阻音讯发送。 roundRobinRouterBatchingPartition-SwitchFrequency:若是发送音讯时莫得指定 key,那默许选择 round robin 的样式发送音讯,使用 round robin 的样式,切换 partition 的周期是 (frequency * batchingMaxPublishDelay)。 创建 ConsumerPulsar 的耗尽模子如下图: 从图中不错看到,Consumer 要绑定一个 subscription 才能进行耗尽。 consumer = client.newConsumer() .topic(topic) .subscriptionName(subscription) .subscriptionType(SubscriptionType.Shared) .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) .negativeAckRedeliveryDelay(60, TimeUnit.SECONDS) .receiverQueueSize(1000) .subscribe(); 底下证据一下创建 Consumer 的参数: topic:Consumer 要订阅的 topic。 subscriptionName:consumer 要干系的 subscription 名字。 subscriptionType:订阅类型,Pulsar 复古四种类型订阅: Exclusive:独占阵势,肃清个 Topic 只可有一个耗尽者,若是多个耗尽者,就会出错。Failover:灾备阵势,肃清个 Topic 不错有多个耗尽者,欧博代理网址关联词只可有一个耗尽者耗尽,其他耗尽者动作故障飘荡备用,若是刻下耗尽者出了故障,就从备用耗尽者中采纳一个进行耗尽。如下图: Shared:分享阵势,肃清个 Topic 不错由多个耗尽者订阅和耗尽。音讯通过 round robin 轮询机制分发给不同的耗尽者,而况每个音讯仅会被分发给一个耗尽者。当耗尽者断开,若是发送给它音讯莫得被耗尽,这些音讯会被重新分发给其它存活的耗尽者。如下图: Key_Shared:音讯和耗尽者齐会绑定一个key,音讯只会发送给绑定肃清个key的耗尽者。若是有新耗尽者建造结合或者有耗尽者断开结合,就需要更新一些音讯的 key。跟 Shared 阵势比拟,Key_Shared 的平正是既不错让耗尽者并发地耗尽音讯,又能保证肃清Key下的音讯法例。如下图: “小朋友,你知道安检机的‘火眼金睛’是什么吗?对了,就是X射线!它能穿透行李包装,显示里面物品的轮廓……”,在广州地铁(广州南站)进站安检口,工作人员为孩子们详细讲解安检知识及如何检测到危险物品。孩子们被眼前的场景深深吸引,一个个自告奋勇担当小小“安检员”,亲身体验安检机检测过程。 “小朋友,你知道安检机的‘火眼金睛’是什么吗?对了,就是X射线!它能穿透行李包装,显示里面物品的轮廓……”,在广州地铁(广州南站)进站安检口,工作人员为孩子们详细讲解安检知识及如何检测到危险物品。孩子们被眼前的场景深深吸引,一个个自告奋勇担当小小“安检员”,亲身体验安检机检测过程。 subscriptionInitialPosition:创建新的 subscription 时从那处初始耗尽,有两个选项: Latest:从最新的音讯初始耗尽Earliest:从最早的音讯初始耗尽 香港六合彩电子游戏negativeAckRedeliveryDelay:耗尽失败后远离多久 broker 重新发送。 receiverQueueSize:在调用 receive 步调之前,最多能蓄积些许条音讯。不错诞生为 0,这么每次只从 broker 拉取一条音讯。在 Shared 阵势下,receiverQueueSize 诞生为 0,不错看管批量音讯多发给一个 Consumer 而导致其他 Consumer 散漫。 Consumer 接管音讯有四种样式:同步单条、同步批量、异步单条和异步批量,代码如下: Message message = consumer.receive() CompletableFuture<Message> message = consumer.receiveAsync(); Messages message = consumer.batchReceive(); CompletableFuture<Messages> message = consumer.batchReceiveAsync(); 关于批量接管,也不错诞生批量接管的计谋,代码如下: consumer = client.newConsumer() .topic(topic) .subscriptionName(subscription) .batchReceivePolicy(BatchReceivePolicy.builder() .maxNumMessages(100) .maxNumBytes(1024 * 1024) .timeout(200, TimeUnit.MILLISECONDS) .build()) .subscribe(); 代码中的参数证据如下: 皇冠客服飞机:@seo3687maxNumMessages:批量接管的最大音讯数目。maxNumBytes:批量接管音讯的大小,这里是 1MB。 测试最初编写 Producer 发送音讯的代码,如下: 皇冠足球皇冠体育apppublic void sendMsg(String key, String data) { CompletableFuture<MessageId> future = producer.newMessage() .key(key) .value(data.getBytes()).sendAsync(); future.handle((v, ex) -> { if (ex == null) { logger.info("发送音讯得手, key:{}, msg: {}", key, data); } else { logger.error("发送音讯失败, key:{}, msg: {}", key, data); } return null; }); future.join(); logger.info("发送音讯完成, key:{}, msg: {}", key, data); } 然后编写一个 Consumer 耗尽音讯的代码,如下: public void start() throws Exception{ while (true) { Message message = consumer.receive(); String key = message.getKey(); String data = new String(message.getData()); String topic = message.getTopicName(); if (StringUtils.isNotEmpty(data)) { try{ logger.info("收到音讯, topic:{}, key:{}, data:{}", topic, key, data); }catch(Exception e){ logger.error("接管音讯特殊,topic:{}, key:{}, data:{}", topic, key, data, e); } } consumer.acknowledge(message); } } 终末编写一个 Controller 类,调用 Producer 发送音讯,代码如下: @RequestMapping("/send") @ResponseBody public String send(@RequestParam String key, @RequestParam String data) { logger.info("收到音讯发送苦求, key:{}, value:{}", key, data); pulsarProducer.sendMsg(key, data); return "success"; } 调用 Producer 发送一条音讯,key=key1,data=data1,具体操动作在浏览器中输入底下的 url 后回车: http://192.168.157.1:8083/pulsar/send?key=key1&data=data1 不错看到扬弃台输出底下日记: 2022-01-08 22:42:33,199 [pulsar-client-io-6-1] [INFO] boot.pulsar.PulsarProducer - 发送音讯得手, key:key1, msg: data1 2022-01-08 22:42:33,200 [http-nio-8083-exec-1] [INFO] boot.pulsar.PulsarProducer - 发送音讯完成, key:key1, msg: data1 2022-01-08 22:42:33,232 [Thread-22] [INFO] boot.pulsar.PulsarConsumer - 收到音讯, topic:persistent://public/default/testTopic, key:key1, data:data1 2022-01-08 22:43:14,498 [pulsar-timer-5-1] [INFO] org.apache.pulsar.client.impl.ConsumerStatsRecorderImpl - [testTopic] [topicGroup] [7def6] Prefetched messages: 0 --- Consume throughput received: 0.02 msgs/s --- 0.00 Mbit/s --- Ack sent rate: 0.02 ack/s --- Failed messages: 0 --- batch messages: 0 ---Failed acks: 0 2022-01-08 22:43:14,961 [pulsar-timer-9-1] [INFO] org.apache.pulsar.client.impl.ProducerStatsRecorderImpl - [testTopic] [standalone-9-0] Pending messages: 0 --- Publish throughput: 0.02 msg/s --- 0.00 Mbit/s --- Latency: med: 69.000 ms - 95pct: 69.000 ms - 99pct: 69.000 ms - 99.9pct: 69.000 ms - max: 69.000 ms --- Ack received rate: 0.02 ack/s --- Failed messages: 0 从日记中看到,这里使用的 namespace 等于创建集群时生成的public/default。 皇冠每周推出优惠活动,您更好地享受博彩乐趣。 回来从 SpringBoot 整合 Java 客户端使用来看,Pulsar 的 api 黑白常友好的,使用起来便捷省略。Consumer 的使用需要斟酌多一些,需要斟酌到批量、异步以及订阅类型。 2022世界杯赌博
|