Redis基本使用-消息队列

目录

做消息队列

我们平时说到消息队列,一般都是指 RabbitMQ、RocketMQ、ActiveMQ 以及大数据里边的 Kafka,这些是我们比较常见的消息中间件,也是非常专业的消息中间件,作为专业的中间件,它里边提供了许多功能。

但是,当我们需要使用消息中间件的时候,并非每次都需要非常专业的消息中间件,假如我们只有一个消息队列,只有一个消费者,那就没有必要去使用上面这些专业的消息中间件,这种情况我们可以直接使用 Redis 来做消息队列。

Redis 的消息队列不是特别专业,他没有很多高级特性,适用简单的场景,如果对于消息可靠性有着极高的追求,那么不适合使用 Redis 做消息队列。

1 消息队列

Redis 做消息队列,使用它里边的List 数据结构 就可以实现,我们可以使用 lpush/rpush 操作来实现入队,然后使用 lpop/rpop 来实现出队。

127.0.0.1:6379> lpush xiaoxin-queue java php python vue js
(integer) 5
127.0.0.1:6379> llen xiaoxin-queue
(integer) 5
127.0.0.1:6379> lpop xiaoxin-queue
"js"
127.0.0.1:6379> lpop xiaoxin-queue
"vue"
127.0.0.1:6379> llen xiaoxin-queue
(integer) 3
127.0.0.1:6379> 

在客户端(例如 Java 端),我们会维护一个死循环来不停的从队列中读取消息,并处理,如果队列中有消息,则直接获取到,如果没有消息,就会陷入死循环,直到下一次有消息进入,这种死循环会造成大量的资源浪费,这个时候,我们可以使用之前讲的 blpop/brpop 。

2 延迟消息队列

延迟队列可以通过 zset 来实现,因为 zset 中有一个 score,我们可以把时间作为 score,将 value 存到redis 中,然后通过轮询的方式,去不断的读取消息出来。

首先,如果消息是一个字符串,直接发送即可,如果是一个对象,则需要对对象进行序列化,这里我们使用 JSON 来实现序列化和反序列化。

1.pom.xml配置

<dependency>
            <groupId>redis.clients</groupId>
            <artifactId>jedis</artifactId>
            <version>3.2.0</version>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
            <version>2.12.3</version>
        </dependency>

2.环境

public interface CallWithJedis {
    void call(Jedis jedis);
}
public class Redis {
    private JedisPool pool;

    public Redis() {
        GenericObjectPoolConfig config = new GenericObjectPoolConfig();
        //连接池最大空闲数
        config.setMaxIdle(300);
        //最大连接数
        config.setMaxTotal(1000);
        //连接最大等待时间,如果是-1表示没有限制
        config.setMaxWaitMillis(30000);
        //在空闲时检查有效性
        config.setTestOnBorrow(true);
        pool=new JedisPool(config,"120.79.82.106",6379,30000,"xiaozhong");
    }

    public void execute(CallWithJedis callWithJedis){
        try(Jedis jedis=pool.getResource()){
            callWithJedis.call(jedis);
        }
    }
}

3.构建消息对象

public class Message {
    private String id;
    private Object data;

    @Override
    public String toString() {
        return "Message{" +
                "id='" + id + '\'' +
                ", data=" + data +
                '}';
    }

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public Object getData() {
        return data;
    }

    public void setData(Object data) {
        this.data = data;
    }

}

4.封装一个消息队列

public class DelayQueue {
    private Jedis jedis;
    private String queue;

    public DelayQueue(Jedis jedis, String queue) {
        this.jedis = jedis;
        this.queue = queue;
    }

    //消息入队
    public void queue(Object data){
        //构造一个Message
        Message msg = new Message();
        msg.setId(UUID.randomUUID().toString());
        msg.setData(data);
        //序列化
        try {
            String s = new ObjectMapper().writeValueAsString(msg);
            System.out.println("msg publish:"+new Date());
            //消息发送 score 延迟5秒
            System.out.println("入队");
            jedis.zadd(queue,System.currentTimeMillis() +5000,s);
        } catch (JsonProcessingException e) {
            e.printStackTrace();
        }
    }

    //消息消费
    public void loop(){
    while (!Thread.interrupted()){
        //读取 score在0到当前时间戳之间的消息
        Set<String> zrange = jedis.zrangeByScore(queue, 0, System.currentTimeMillis(), 0, 1);
        if (zrange.isEmpty()){
            //如果消息是空的,则休息500毫秒然后继续
            try {
                System.out.println("消息为空,线程暂停");
                Thread.sleep(500);
            } catch (InterruptedException e) {
                break;
            }
            continue;
        }
        //如果读取到了消息,则直接读取消息出来
        String next = zrange.iterator().next();
        if (jedis.zrem(queue,next)>0){
            //抢到了,接下来处理业务
            try {
                Message msg = new ObjectMapper().readValue(next, Message.class);
                System.out.println("receive msg" + msg);
            } catch (JsonMappingException e) {
                e.printStackTrace();
            } catch (JsonProcessingException e) {
                e.printStackTrace();
            }
        }
    }
    }

}

5.测试

public class DelayMsgTest {
    public static void main(String[] args) {
        Redis redis = new Redis();
        redis.execute(jedis -> {
            //构造一个消息队列
            DelayQueue queue = new DelayQueue(jedis, "xiaoxin-delay-queue");
            //构造消息生产者
            Thread producer = new Thread() {
                @Override
                public void run() {
                    for (int i = 0; i < 5; i++) {
                        System.out.println("入队");
                        queue.queue("www.Ishton.org>>>>"+i);
                    }
                }
            };
            //构造一个消费者
            Thread consumer = new Thread() {
                @Override
                public void run() {
                    queue.loop();
                }
            };
            producer.start();
            consumer.start();
            try {
                Thread.sleep(7000);
                //中断线程
                //consumer.interrupt();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });


    }
}

上一篇:redis 简单整理——java 客户端jedis[十六]


下一篇:Redis-CentOS7安装