Redis 简介



特点

  • C 语言编写
  • 基于内存的 Key-Value 数据存储
  • 不需要预定义数据结构
  • 支持多种类型的数据:string,hash,list,set,sorted set,bitmaps,hyperloglog,geospatial
  • 只支持简单的读写操作,不支持 SQL,操作都是原子的
  • 读写业务采用单线程模式,网络 IO 可以采用多线程(版本 6 引入,默认关闭)
  • 周期性的数据落盘(可以关闭,默认是启动的,每 15 分钟落盘)
  • 支持 TTL
  • 支持事务处理
  • 支持 Pipeline 批量处理
  • 支持消息的 Pub/Sub
  • Stream 消息队列(版本 5 引入),支持历史数据、分组、游标、阻塞/非阻塞消费
  • 安全机制:密码认证(默认关闭),版本 6 开始支持 SSL 证书认证、ACL 权限控制
  • 支持分布式部署(最终一致性)
  • 性能:每秒执行 10W+ 命令(单机、非批量模式)
  • 应用:数据库,缓存(LRU 算法),消息队列(Pub/Sub、Stream),分布式锁(setnx)

更多内容查看官网 https://redis.io/

基本配置

配置文件 redis.conf

# index(类似关系数据库的表,用数字表示,没有名字)的个数,默认 16
databases 16

# 数据落盘,比如每 300 秒如果有 10 条数据更新,将更新的数据落盘,可以同时配多个规则,默认打开
save 900 1
save 300 10
save 60 10000

# aof 机制,将每个写操作记录到磁盘,默认关闭
appendonly yes
appendfilename "appendonly.aof"
appendfsync everysec

# 允许使用的最大内存,默认不限制
maxmemory <bytes>

# 超过最大内存后如何处理
# 默认 noeviction,即不处理,直接报错
# 可以设置使用 LRU 算法 volatile-lru 将长时间没使用的数据删除,这样 redis 就变成传统意义上的缓存
maxmemory-policy noeviction       

# 密码(默认不配置)
requirepass foobared

查看该文件可以看到更多配置

单线程设计

Redis 的高性能和稳定性主要有以下原因

  • 基于内存,读写性能好
  • 只支持简单的操作和数据结构,从根本上保证用户不会因为复杂的操作影响性能
  • C 语言性能更好
  • 读写业务采用单线程设计,更简单,更稳定
  • 分布式部署时采用最终一致性,而不是强一致性,提高了响应速度和稳定性

这里讲一下单线程

Redis 的单线程设计,指的是读写业务由一个线程完成,并且没有使用异步编程,也就是说 Redis 同一时间只能操作一个命令,不支持并发操作,但这并不影响性能,因为基于内存、数据简单、命令简单,所以每个命令都是很快执行完的,实际上 CPU 并不是瓶颈,所以单个线程就足够了

其他需要通过多线程,或是通过异步编程,来提供高并发性能的场景,通常都是因为需要磁盘 IO 操作,或是命令执行起来本身就比较复杂比较耗时,所以才需要多线程、异步编程的方式来提高性能

由于单线程模式,要明显地比多线程模式、异步编程模式,要简单很多,稳定很多,所以也提高了 Redis 的稳定性

Redis 还有其他线程处理其他事情,比如周期性落盘需要自己的线程,但这和读写业务是独立的,互不影响的

从版本 6 开始,可以通过配置,将网络 IO 分出来,用多个线程处理,这是因为随着业务量越来越大,虽然 CPU 不是瓶颈,但网络能力会有一定的限制,为了提高并发能力,用多个线程处理网络 IO,但读写业务还是单线程的

基本命令

help
    127.0.0.1:6379> help
    redis-cli 3.0.6
    Type: "help @<group>" to get a list of commands in <group>
          "help <command>" for help on <command>
          "help <tab>" to get a list of possible help topics
          "quit" to exit
    127.0.0.1:6379> 
    127.0.0.1:6379> help @hash
    127.0.0.1:6379> 
    127.0.0.1:6379> help hget


@connection
    ping            测定连接是否存活
    select index    选择数据库,index 用数字值指定,以 0 作为起始索引值,默认使用 0 号数据库


@server
    info            获取服务器的信息和统计
    dbsize          返回当前数据库中 key 的数目
    flushdb         删除当前选择数据库中所有的 key
    flushall        删除所有数据库中的所有的 key
    config get p    获取服务器配置的信息
    config set p v  设置配置项信息
    bgsave          后台保存 rdb 快照
    save            保存 rdb 快照


@generic
    keys pattern    查找相应的 key,支持通配符 *、?、[],keys * 返回所有 key
    SCAN cursor     迭代获取数据库的 key
    exists key      判断某个 key 是否存在,不支持通配符
    type key        返回 key 存储的类型
    move key index  将 key 移动到另一个数据库
    del key         删除 key
    expire key      设置 key 的生命周期以秒为单位
    sort key        对 list,set 里面的数据排序


@string
    set key value [ex 秒][px 毫秒][nx/xx]   设置 key 的 value
                                            ex/px 设置超时时间
                                            nx 表示只有 key 不存在才执行,xx 表示只有 key 存在才执行
                                            默认不会超时,并且无论 key 是否存在都会执行

    setnx key value                         等于 set key value nx
    setex key time value                    等于 set key value ex time
    psetex key milliseconds value           等于 set key value px time

    mset key1 value1 key2 value2            一次性设置多个

    get key                                 
    mget key1 key2                          一次性读取多个

    getset key value                        返回旧值,设置新值

    strlen key                              取指定 key 的 value 值的长度
    append key value                        把 value 追加到 key 的原值上

    setrange key offset value               部分重写字符串
    getrange key start stop                 获取字符串中 [start, stop] 范围的值
  
    incr key                                加 1
    incrby key increment                    加 increment
    incrbyfloat by increment
    
    decr key
    decrby key decrement

    setbit key offset value                 设置 offset 对应位的值,返回该位上的旧值
    getbit key offset
    
    bitop operation destkey key1 [key2..]   对 key1 key2 做 operation (operation) 并将结果保存在 destkey

@list
    lpush key value                         把值插入到链表头部
    rpush key value                         把值插入到链表尾部
    lpop key                                返回并删除链表头部元素
    rpop key                                返回并删除链表尾部元素
    lrange key start stop                   返回链表中 [start, stop] 中的元素
    lrem key count value                    从链表中删除 value 值,最多删除数量为 count
    ltrim key start stop                    剪切 key 对应的链表,取 [start, stop] 一段,并把该段值重新赋给 key
    lindex key index                        返回列表中第 index 个元素
    llen key                                链表的元素个数
    linsert key after|before search value   在 key 链表中寻找 search,并在 search 值之前|之后插入 value
    rpoplpush source dest                   把 source 的末尾拿出,放到 dest 头部,并返回该元素

@set
    sadd key value1 value2            往集合里面添加元素
    smembers key                      获取集合所有的元素
    srem key value                    删除集合某个元素
    spop key                          随机返回并删除集合中1个元素
    srandmember key                   随机取一个元素
    sismember key value               判断集合是否有某个值
    scard key                         返回集合元素的个数
    smove source dest value           把 source 的 value 移动到 dest 集合中
    sinter key1 key2 key3             求交集
    sunion key1 key2                  求并集
    sdiff key1 key2                   求差集
    sinterstore res key1 key2         求交集并存在 res

@sorted_set
    zadd key score member [score member ...]    添加 member 到集合,按 score 排序,如果 member 已经存在则更改 score
    zcard key                                   返回集合的所有元素
    zcount key min max                          返回 score 在 [min, max] 区间内元素数量
    zrem key member [member ...]                删除元素
    zremrangebyscore key min max                删除 score 在 [min, max] 之间的元素
    zremrangebyrank key start end               删除次序在 [start, end] 之间的元素  
    zrank key member                            查询 member 排名,升序
    zrevrank key member                         查询 member 排名,降序
    zrange key start stop                       返回次序在 [start,stop] 的元素
    zrangebyscore key min max                   返回 score 在 [min, max] 内的元素

@hash
    hset key field value                        设置 field 的值
    hsetnx key field value                      只在 field 不存在的情况下设置
    hmset key field1 value1 field2 value2       同时设置多个 field
    hget key field                              获取指定的 field
    hmget key field1 field2                     获取多个 field
    hincrby key field increment                 指定的 field 加上给定的值
    hexists key field                           指定的 field 是否存在
    hlen key                                    返回 field 数量
    hdel key field                              删除指定的 field
    hkeys key                                   返回所有的 field
    hvals key                                   返回所有的 value
    hgetall key                                 获取全部的 field 及 value  

可以看到这些命令,以及数据结构,都比较简单

这些命令全都是原子操作

更多命令查看 help

Python 代码

import redis

pool = redis.ConnectionPool(host="localhost", port=6379, db=3)
redisConn = redis.Redis(connection_pool = pool)

redisConn.set("key_1", 123) 
redisConn.set("key_2", "value_2")

print redisConn.keys()
print redisConn.get("key_1")

需要先通过 pip install redis 安装 python 包

Java 代码(单独使用)

pom

            <dependency>
                <groupId>redis.clients</groupId>
                <artifactId>jedis</artifactId>
                <version>3.3.0</version>
            </dependency>

使用

        JedisPoolConfig config = new JedisPoolConfig();
        config.setMaxTotal(10);
        config.setMaxIdle(10);
        config.setMaxWaitMillis(10000);
        
        JedisPool jedisPool = new JedisPool(config, "localhost", 6379, 10000);
        Jedis jedis = jedisPool.getResource();
        
        jedis.set("key_3", "hello world");
        System.out.println(jedis.keys("*"));
        System.out.println(jedis.get("key_3"));

可以是 pool 也可以直接连接

Java 代码(结合 SpringBoot)

pom

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
        </dependency> 

配置

spring:
  redis:
    host: localhost
    port: 6379
    database: 0
    timeout: 10000
    pool:
      max-active: 200  

初始化

    @Bean
    public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) {
        RedisTemplate<String, Object> template = new RedisTemplate<String, Object>();
        
        template.setConnectionFactory(factory);

        ObjectMapper mapper = new ObjectMapper();
        mapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
        
        Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer =
                new Jackson2JsonRedisSerializer<Object>(Object.class);
        jackson2JsonRedisSerializer.setObjectMapper(mapper);
        
        StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();
        
        template.setKeySerializer(stringRedisSerializer);
        template.setHashKeySerializer(stringRedisSerializer);
        template.setValueSerializer(jackson2JsonRedisSerializer);
        template.setHashValueSerializer(jackson2JsonRedisSerializer);
        template.afterPropertiesSet();
        
        return template;
    }

使用

    @Autowired
    private RedisTemplate<String, Object> redisTemplate;

    @PostConstruct
    public void init() {
        redisTemplate.opsForValue().set("key_5", 123456);
        System.out.println(redisTemplate.keys("*"));
        System.out.println(redisTemplate.opsForValue().get("key_5"));
    }

不定义 redisTemplate 函数也可以,但那样系统自动初始化的会是 RedisTemplate<Object, Object>,也就是 key 和 value 都是 Object 的,需要自己再转,不好用,所以最好还是自己定义 redisTemplate 函数,自己初始化 RedisTemplate,把 key 类型设置成 String 的比较好

TTL 设置

redis> SET mykey "Hello"
"OK"
redis> EXPIRE mykey 10
(integer) 1
redis> TTL mykey
(integer) 10
redis> SET mykey "Hello World"
"OK"
redis> TTL mykey
(integer) -1
redis> 

EXPIRE 命令可以设置超时时间,单位是秒
TTL 命令查看还剩多少时间超时,返回 -1 表示没有设置超时时间,返回 -2 表示 key 不存在
如果重新设置 key 的值,会自动取消超时时间设置

Transactions 事务性

> MULTI
OK
> INCR foo
QUEUED
> INCR bar
QUEUED
> EXEC
1) (integer) 1
2) (integer) 1

Redis 的事务不做回滚,而且可能部分成功部分失败,只保证将整个事务作为一个原子操作,即 Redis 在处理事务的过程中,不会处理其他 client 的请求,要等事务处理完了才继续处理其他请求

Pipeline 流水线处理

# python
import redis

pool = redis.ConnectionPool(host="localhost", port=6379, db=3)
redisConn = redis.Redis(connection_pool = pool)

pipeline = redisConn.pipeline()
pipeline.set("key_1", 10)
pipeline.set("key_2", 20)
pipeline.set("key_3", 30)

pipeline.execute()

pipeline 功能貌似在命令行下不支持,必须通过代码实现

execute 的时候才真正执行
连续发送请求,但是中间不等待回复,而是最后一次性接收所有回复
不保证原子性,不会阻止其他 client 的请求

Pub-Sub

sub

# sub
import redis

pool = redis.ConnectionPool(host="localhost", port=6379, db=3)
redisConn = redis.Redis(connection_pool = pool)

pubsub = redisConn.pubsub()
pubsub.subscribe(["channel_1", "channel_2"])
# pubsub.psubscribe(["channel*", "*hi*"])

for item in pubsub.listen():
       if item[‘type‘] == ‘message‘:
           print item[‘channel‘]
           print item[‘data‘]

pub

# pub
import redis

pool = redis.ConnectionPool(host="localhost", port=6379, db=5)
redisConn = redis.Redis(connection_pool = pool)

redisConn.publish("channel_1", "Hello")
redisConn.publish("channel_2", "World")

通过 Pub-Sub 可以作为消息通知机制使用(类似于 Kafka)

分布式锁

SET key value EX 120 NX

该命令是原子操作,表示只有在 key 不存在的情况下,才会赋值成功,并且 120 秒后会自动删除,这样就实现了带超时时间的互斥锁功能,获得锁的程序删除 key 就是释放了锁,如果程序出错退出,达到超时时间后也会保证锁能被释放
??
这种方法在比较极端的情况下可能失效
??Redis 是分布式部署
??程序 A 获取锁
??master 将数据同步到 slave 之前 master 宕机了
??slave 被选为 master 且没有 A 的锁
??程序 B 获取锁成功,这时候程序 A 还在正常运行,导致两个程序同时获得锁

分布式部署

Redis HA



Redis 简介

上一篇:丢弃法


下一篇:利用cobbler实现系统自动化安装