redis?stream怎么实现消息队列


本篇内容主要讲解“redisstream怎么实现消息队列”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“redisstream怎么实现消息队列”吧!

    Redis5.0带来了Stream类型。从字面上看是流类型,但其实从功能上看,应该是Redis对消息队列(MQ,Message Queue)的完善实现。

    基于redis实现消息队列的方式有很多:

    • PUB/SUB,订阅/发布模式

    • 基于List的 LPUSH+BRPOP 的实现

    redis 实现消息对列4中方法

    发布订阅

    发布订阅优点: 典型的一对的,所有消费者都能同时消费到消息。主动通知订阅者而不是订阅者轮询去读。

    发布订阅缺点: 不支持多个消费者公平消费消息,消息没有持久化,不管订阅者是否收到消息,消息都会丢失。

    使用场景:微服务间的消息同步,如 分布式webSocker,数据同步等。

    list 队列

    生产者通过lpush生成消息,消费者通过blpop阻塞读取消息。

    **list队列优点:**支持多个消费者公平消费消息,对消息进行存储,可以通过lrange查询队列内的消息。

    **list队列缺点:**blpop仍然会阻塞当前连接,导致连接不可用。一旦blpop成功消息就丢弃了,期间如果服务器宕机消息会丢失,不支持一对多消费者。

    zset 队列

    生产者通过zadd 创建消息时指定分数,可以确定消息的顺序,消费者通过zrange获取消息后进行消费,消费完后通zrem删除消息。

    zset优点: 保证了消息的顺序,消费者消费失败后重新入队不会打乱消费顺序。

    zset缺点: 不支持一对多消费,多个消费者消费时可能出现读取同一条消息的情况,得通过加锁或其他方式解决消费的幂等性。

    zset使用场景:由于数据是有序的,常常被用于延迟队列,如 redisson的DelayQueue

    Stream 队列

    Redis5.0带来了Stream类型。从字面上看是流类型,但其实从功能上看,应该是Redis对消息队列(MQ,Message Queue)的完善实现。

    参考kafka的思想,通过多个消费者组和消费者支持一对多消费,公平消费,消费者内维护了pending列表防止消息丢失。

    提供消息ack机制。

    基本命令

    xadd 生产消息

    往 stream 内创建消息 语法为:

    XADD key ID field string [field string …]

    #*表示自动生成idredis会根据时间戳+序列号自动生成id,不建议我们自己指定idxaddstream1*namezsage23

    读取消息

    读取stream内的消息,这个并不是消费,只是提供了查看数据的功能,语法为:

    XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key …] ID [ID …]

    #表示从stream1内取出一条消息,从第0条消息读取(0表示最小的id)xreadcount1streamsstream10#表示从stream1内id=1649143363972-0开始读取一条消息,读取的是指定id的下一条消息xreadcount1streamsmsg1649143363972-0#表示一直阻塞读取最新的消息($表示获取下一个生成的消息)xreadcount1block0streamsstream1$xrangestream-+10

    XRANGE key startID endID count

    #表示从stream1内取10条消息起始位置为-(最小ID)结束位置为+(最大ID)xrangestream1-+10

    xgroup 消费者组

    redis stream 借鉴了kafka的设计,采用了消费者和消费者组的概念。允许多个消费者组消费stream的消息,每个消费者组都能收到完整的消息,例如:stream内有10条消息,消费者组A和消费者组B同时消费时,都能获取到这10条消息。

    每个消费者组内可以有多个消费者消费,消息会平均分摊给各个消费者,例如:stream有10条消息,消费者A,B,C同时在同一个组内消费,A接收到 1,4,7,10,B接收到 2,5,8,C接收到 3,6,9

    创建消费者组:

    #消费消息首先得创建消费者组#表示为队列stream1创建一个消费者组group1从消息id=0(第一条消息)开始读取消息xgroupcreatestream1group10#查询stream1内的所有消费者组信息xinfogroupsstream1

    xreadgroup 消费消息

    通过xreadgroup可以在消费者组内创建消费者消费消息

    XREADGROUP group groupName consumerName [COUNT count] [BLOCK milliseconds] STREAMS key [key …] ID [ID …]

    #创建消费者读取消息#在group1消费者组内通过consumer1消费stream1内的消息,消费1条未分配的消息(>表示未分配过消费者的消息)xreadgrupgroupgroup1consumer1count1streamsstream1>

    Pending 等待列表

    通过 xreadgroup 读取消息时消息会分配给对应的消费者,每个消费者内都维护了一个Pending列表用于保存接收到的消息,当消息ack后会从pending列表内移除,也就是说pending列表内维护的是所有未ack的消息id

    每个Pending的消息有4个属性:

    • 消息ID

    • 所属消费者

    • IDLE,已读取时长

    • delivery counter,消息被读取次数

    XPENDING key group [start end count] [consumer]

    #查看pending列表#查看group1组内的consumer1的pending列表-表示最小的消息id+表示最大的消息IDxpendingstream1group1-+10consumer1#查看group1组内的所有消费者pending类表xpendingstream1group1-+10

    消息确认

    当消费者消费了消息,需要通过 xack 命令确认消息,xack后的消息会从pending列表移除

    XACK key gruopName ID

    xackstream1group1xxx

    消息转移

    当消费者接收到消息却不能正确消费时(报错或其他原因),可以使用 XCLAIM 将消息转移给其他消费者消费,需要设置组、转移的目标消费者和消息ID,同时需要提供IDLE(已被读取时长),只有超过这个时长,才能被转移。

    通过xclaim转移的消息只是将消息移入另一个消费者的pending列表,消费者并不能通过xreadgroup读取到消息,只能通过xpending读取到。

    #表示将ID为1553585533795-1的消息转移到消费者B消费,前提是消费XCLAIMstream1group1consumer136000001553585533795-1

    信息监控

    redis提供了xinfo来查看stream的信息

    #查看sream信息xinfostreamsteam1#查询消费者组信息xinfogroupsgroup1#查询消费者信息xinfoconsumersconsumer1

    SpringBoot 整合

    1 引入依赖

    <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId></dependency>

    2 编写消费者

    @Slf4jponentpublicclassEmailConsumerimplementsStreamListener<String,MapRecord<String,String,String>>{publicfinalStringstreamName="emailStream";publicfinalStringgroupName="emailGroup";publicfinalStringconsumerName="emailConsumer";@AutowiredprivateStringRedisTemplatestringRedisTemplate;@OverridepublicvoidonMessage(MapRecord<String,String,String>message){//log.info("stream名称-->{}",message.getStream());//log.info("消息ID-->{}",message.getId());log.info("消息内容-->{}",message.getValue());Map<String,String>msgMap=message.getValue();if(msgMap.get("sID")!=null&&Integer.valueOf(msgMap.get("sID"))%3==0){//消费异常导致未能ack时,消息会进入pending列表,我们可以启动定时任务来读取pending列表处理失败的任务log.info("消费异常-->"+message);return;}StreamOperations<String,String,String>streamOperations=stringRedisTemplate.opsForStream();//消息应答streamOperations.acknowledge(streamName,groupName,message.getId());}//我们可以启动定时任务不断监听pending列表,处理死信消息}

    3 配置redis

    序列化配置

    @EnableCaching@ConfigurationpublicclassRedisConfig{/***设置redis序列化规则*/@BeanpublicJackson2JsonRedisSerializer<Object>jackson2JsonRedisSerializer(){Jackson2JsonRedisSerializer<Object>jackson2JsonRedisSerializer=newJackson2JsonRedisSerializer<>(Object.class);ObjectMapperom=newObjectMapper();om.setVisibility(PropertyAccessor.ALL,JsonAutoDetect.Visibility.ANY);jackson2JsonRedisSerializer.setObjectMapper(om);returnjackson2JsonRedisSerializer;}/***RedisTemplate配置*/@BeanpublicRedisTemplate<String,Object>redisTemplate(RedisConnectionFactoryredisConnectionFactory,Jackson2JsonRedisSerializerjackson2JsonRedisSerializer){//配置redisTemplateRedisTemplate<String,Object>redisTemplate=newRedisTemplate<>();redisTemplate.setConnectionFactory(redisConnectionFactory);RedisSerializer<?>stringSerializer=newStringRedisSerializer();//key序列化redisTemplate.setKeySerializer(stringSerializer);//value序列化redisTemplate.setValueSerializer(jackson2JsonRedisSerializer);//Hashkey序列化redisTemplate.setHashKeySerializer(stringSerializer);//Hashvalue序列化redisTemplate.setHashValueSerializer(jackson2JsonRedisSerializer);redisTemplate.afterPropertiesSet();returnredisTemplate;}}

    消费者组和消费者配置

    @Slf4j@ConfigurationpublicclassRedisStreamConfig{@AutowiredprivateEmailConsumeremailConsumer;@AutowiredprivateRedisTemplate<String,Object>redisTemplate;@BeanpublicStreamMessageListenerContainer.StreamMessageListenerContainerOptions<String,MapRecord<String,String,String>>emailListenerContainerOptions(){StringRedisSerializerstringRedisSerializer=newStringRedisSerializer();returnStreamMessageListenerContainer.StreamMessageListenerContainerOptions.builder()//block读取超时时间.pollTimeout(Duration.ofSeconds(3))//count数量(一次只获取一条消息).batchSize(1)//序列化规则.serializer(stringRedisSerializer).build();}/***开启监听器接收消息*/@BeanpublicStreamMessageListenerContainer<String,MapRecord<String,String,String>>emailListenerContainer(RedisConnectionFactoryfactory,StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String,MapRecord<String,String,String>>streamMessageListenerContainerOptions){StreamMessageListenerContainer<String,MapRecord<String,String,String>>listenerContainer=StreamMessageListenerContainer.create(factory,streamMessageListenerContainerOptions);//如果流不存在创建stream流if(!redisTemplate.hasKey(emailConsumer.streamName)){redisTemplate.opsForStream().add(emailConsumer.streamName,Collections.singletonMap("",""));log.info("初始化stream{}success",emailConsumer.streamName);}//创建消费者组try{redisTemplate.opsForStream().createGroup(emailConsumer.streamName,emailConsumer.groupName);}catch(Exceptione){log.info("消费者组{}已存在",emailConsumer.groupName);}//注册消费者消费者名称,从哪条消息开始消费,消费者类//>表示没消费过的消息//$表示最新的消息listenerContainer.receive(Consumer.from(emailConsumer.groupName,emailConsumer.consumerName),StreamOffset.create(emailConsumer.streamName,ReadOffset.lastConsumed()),emailConsumer);listenerContainer.start();returnlistenerContainer;}}

    4.生产者生产消息

    @GetMapping("/redis/ps")publicStringredisPublish(Stringcontent,Integercount){StreamOperationsstreamOperations=redisTemplate.opsForStream();for(inti=0;i<count;i++){AtomicIntegernum=newAtomicInteger(i);MapmsgMap=newHashMap();msgMap.put("count",i);msgMap.put("sID",num);//新增消息streamOperations.add("emailStream",msgMap);}return"success";}

    到此,相信大家对“redisstream怎么实现消息队列”有了更深的了解,不妨来实际操作一番吧!这里是主机评测网网站,更多相关内容可以进入相关频道进行查询,关注我们,继续学习!


    上一篇:Java怎么从服务端下载Excel模板文件

    下一篇:jQuery编程动画的基本方法实例分析


    Copyright © 2002-2019 测速网 www.inhv.cn 皖ICP备2023010105号
    测速城市 测速地区 测速街道 网速测试城市 网速测试地区 网速测试街道
    温馨提示:部分文章图片数据来源与网络,仅供参考!版权归原作者所有,如有侵权请联系删除!

    热门搜索 城市网站建设 地区网站制作 街道网页设计 大写数字 热点城市 热点地区 热点街道 热点时间 房贷计算器