专栏
天翼云开发者社区

Redis Cluster的reshard功能

2024-04-08 13:38:57 24阅读

Redis Cluster的reshard功能


Redis Cluster是一个分布式的redis集群,通过基于KEY进行HASH分桶,把KEY拆成16,384个不同的slot。 Redis cluter支持动态扩缩容,这时意味着需要把slot和里面的数据从一个节点迁移到另一个节点上,实现数据的重平衡(reshard)功能。

背景

Redis Cluster是一个分布式的redis集群。通过基于KEY进行HASH分桶,把KEY拆成16,384个不同的slot。 这16,384个slot会分布在所有的redis集群中。 redis cluster客户端访问时, 会通过调用cluster nodes或cluster slots的命令,查询所有slot的分布信息,再计算KEY值对应slot,往对应的节点去发送请求。
由于redis是一个纯内存的KV存储, 数据都是放在内存上。受限于内存资源的有限性,为了解决这个问题,需要把多个机器的内存组成集群来使用。 而这个集群必须具备根据实际的使用需要,能够增加或减少节点的能力。
而扩缩容节点,核心的逻辑只有2个: 增加变更节点的元数据; 从原来的节点上迁移数据到新节点上 , 即redis cluster的重平衡(reshard)功能。


使用例子

参考官方文档

redis cluster的rebalance功能,有一个官方的开源实现实现 ,

可以利用redis-cli命令行去执行:
redis-cli --cluster reshard 127.0.0.1:7000


官方源码

可以参考官方的代码

核心的源码如下:
static int clusterManagerCommandReshard(int argc, char **argv) {
    int port = 0;
    char *ip = NULL;
    if (!getClusterHostFromCmdArgs(argc, argv, &ip, &port)) goto invalid_args;
    clusterManagerNode *node = clusterManagerNewNode(ip, port, 0);
    if (!clusterManagerLoadInfoFromNode(node)) return 0;
    clusterManagerCheckCluster(0);
    if (cluster_manager.errors && listLength(cluster_manager.errors) > 0) {
        fflush(stdout);
        fprintf(stderr,
                "*** Please fix your cluster problems before resharding\n");
        return 0;
    }
    int slots = config.cluster_manager_command.slots;
    if (!slots) {
        while (slots <= 0 || slots > CLUSTER_MANAGER_SLOTS) {
            printf("How many slots do you want to move (from 1 to %d)? ",
                   CLUSTER_MANAGER_SLOTS);
            fflush(stdout);
            char buf[6];
            int nread = read(fileno(stdin),buf,6);
            if (nread <= 0) continue;
            int last_idx = nread - 1;
            if (buf[last_idx] != '\n') {
                int ch;
                while ((ch = getchar()) != '\n' && ch != EOF) {}
            }
            buf[last_idx] = '\0';
            slots = atoi(buf);
        }
    }
    char buf[255];
    char *to = config.cluster_manager_command.to,
         *from = config.cluster_manager_command.from;
    while (to == NULL) {
        printf("What is the receiving node ID? ");
        fflush(stdout);
        int nread = read(fileno(stdin),buf,255);
        if (nread <= 0) continue;
        int last_idx = nread - 1;
        if (buf[last_idx] != '\n') {
            int ch;
            while ((ch = getchar()) != '\n' && ch != EOF) {}
        }
        buf[last_idx] = '\0';
        if (strlen(buf) > 0) to = buf;
    }
    int raise_err = 0;
    clusterManagerNode *target = clusterNodeForResharding(to, NULL, &raise_err);
    if (target == NULL) return 0;
    list *sources = listCreate();
    list *table = NULL;
    int all = 0, result = 1;
    if (from == NULL) {
        printf("Please enter all the source node IDs.\n");
        printf("  Type 'all' to use all the nodes as source nodes for "
               "the hash slots.\n");
        printf("  Type 'done' once you entered all the source nodes IDs.\n");
        while (1) {
            printf("Source node #%lu: ", listLength(sources) + 1);
            fflush(stdout);
            int nread = read(fileno(stdin),buf,255);
            if (nread <= 0) continue;
            int last_idx = nread - 1;
            if (buf[last_idx] != '\n') {
                int ch;
                while ((ch = getchar()) != '\n' && ch != EOF) {}
            }
            buf[last_idx] = '\0';
            if (!strcmp(buf, "done")) break;
            else if (!strcmp(buf, "all")) {
                all = 1;
                break;
            } else {
                clusterManagerNode *src =
                    clusterNodeForResharding(buf, target, &raise_err);
                if (src != NULL) listAddNodeTail(sources, src);
                else if (raise_err) {
                    result = 0;
                    goto cleanup;
                }
            }
        }
    } else {
        char *p;
        while((p = strchr(from, ',')) != NULL) {
            *p = '\0';
            if (!strcmp(from, "all")) {
                all = 1;
                break;
            } else {
                clusterManagerNode *src =
                    clusterNodeForResharding(from, target, &raise_err);
                if (src != NULL) listAddNodeTail(sources, src);
                else if (raise_err) {
                    result = 0;
                    goto cleanup;
                }
            }
            from = p + 1;
        }
        /* Check if there's still another source to process. */
        if (!all && strlen(from) > 0) {
            if (!strcmp(from, "all")) all = 1;
            if (!all) {
                clusterManagerNode *src =
                    clusterNodeForResharding(from, target, &raise_err);
                if (src != NULL) listAddNodeTail(sources, src);
                else if (raise_err) {
                    result = 0;
                    goto cleanup;
                }
            }
        }
    }
    listIter li;
    listNode *ln;
    if (all) {
        listEmpty(sources);
        listRewind(cluster_manager.nodes, &li);
        while ((ln = listNext(&li)) != NULL) {
            clusterManagerNode *n = ln->value;
            if (n->flags & CLUSTER_MANAGER_FLAG_SLAVE || n->replicate)
                continue;
            if (!sdscmp(n->name, target->name)) continue;
            listAddNodeTail(sources, n);
        }
    }
    if (listLength(sources) == 0) {
        fprintf(stderr, "*** No source nodes given, operation aborted.\n");
        result = 0;
        goto cleanup;
    }
    printf("\nReady to move %d slots.\n", slots);
    printf("  Source nodes:\n");
    listRewind(sources, &li);
    while ((ln = listNext(&li)) != NULL) {
        clusterManagerNode *src = ln->value;
        sds info = clusterManagerNodeInfo(src, 4);
        printf("%s\n", info);
        sdsfree(info);
    }
    printf("  Destination node:\n");
    sds info = clusterManagerNodeInfo(target, 4);
    printf("%s\n", info);
    sdsfree(info);
    table = clusterManagerComputeReshardTable(sources, slots);
    printf("  Resharding plan:\n");
    clusterManagerShowReshardTable(table);
    if (!(config.cluster_manager_command.flags &
          CLUSTER_MANAGER_CMD_FLAG_YES))
    {
        printf("Do you want to proceed with the proposed "
               "reshard plan (yes/no)? ");
        fflush(stdout);
        char buf[4];
        int nread = read(fileno(stdin),buf,4);
        buf[3] = '\0';
        if (nread <= 0 || strcmp("yes", buf) != 0) {
            result = 0;
            goto cleanup;
        }
    }
    int opts = CLUSTER_MANAGER_OPT_VERBOSE;
    listRewind(table, &li);
    while ((ln = listNext(&li)) != NULL) {
        clusterManagerReshardTableItem *item = ln->value;
        char *err = NULL;
        result = clusterManagerMoveSlot(item->source, target, item->slot,
                                        opts, &err);
        if (!result) {
            if (err != NULL) {
                clusterManagerLogErr("clusterManagerMoveSlot failed: %s\n", err);
                zfree(err);
            }
            goto cleanup;
        }
    }
cleanup:
    listRelease(sources);
    clusterManagerReleaseReshardTable(table);
    return result;
invalid_args:
    fprintf(stderr, CLUSTER_MANAGER_INVALID_HOST_ARG);
    return 0;
}

核心逻辑分析


下面是基于核心源码进行分析和说明。

1.  执行CLUSTER NODES命令,获取所有节点的信息,包括每个节点对应的redis节点的slot 信息。 下面是参考的结果:
d5060aff8ab03d3555b424a8d74d60bf7f46238c 127.0.0.1:8123@18123 myself,master - 0 1707036730000 3 connected 10923-16383
106e63c4f2cc529ee11b9d8b50abbc07b4f7b41d 127.0.0.1:6123@16123 master - 0 1707036732249 1 connected 0-5460
dc6cec6b9f566d7ca6ea9a8c704792ddb0c712e3 127.0.0.1:7123@17123 master - 0 1707036731231 2 connected 5461-10922

第一行最后的10923-16383, 说明的正是这个节点分配的slot范围。


2. 下面是核心的计算逻辑:
 
1. 计算reshard的迁移计划,每个节点要多少个slot迁移到新节点上
2.  循环执行每个SLOT的迁移: (对应函数clusterManagerMoveSlot)
      2.1. 在目标节点执行 CLUSTER SETSLOT {slot} importing {源节点的名字} : 设置目标节点SLOT的状态importing 
      2.2. 在源节点执行 CLUSTER SETSLOT {slot} migrating {目标节点的名字} : 设置源节点SLOT的状态migrating 
      2.3. 迁移数据:   存在大量KEY的情况,会重复执行下面步骤:
         2.3.1  在源节点执行CLUSTER GETKEYSINSLOT {slot} {pipeline}  : 获取这个SLOT的最多{pineline}个 KEY
         2.3.2  在源节点执行MIGRATE {host} {port}  0  {timeout} REPLACE AUTH  {paasword} KEYS  key [key ...]:  迁移数据
      2.4.  在目标节点执行 CLUSTER SETSLOT {slot} node  {目标节点的名字}  : 设置目标节点已经拥有这个SLOT
      2.5.  在源节点执行 CLUSTER SETSLOT {slot} node  {源节点的名字} :  设置源节点不拥有这个SLOT
      2.6.  对其他节点执行 CLUSTER SETSLOT {slot} node  {目标节点的名字}   通知其他节点

3. 迁移中的slot状态说明。

slot有importing和migrating 两种特殊状态, 代表这个slot正在迁移中
A是源节点,B是迁移的目标节点,执行下面命令更新slot状态
We send B: CLUSTER SETSLOT 8 IMPORTING A
We send A: CLUSTER SETSLOT 8 MIGRATING B

这时slot 8的key依然发向节点A。如果A存在目标KEY,则直接返回。 如果KEY不存在,这时会返回转发到B上处理。

参考下面例子: 一个请求被转发了两次
127.0.0.1:6123> get 1
-> Redirected to slot [9842] located at 127.0.0.1:9123
-> Redirected to slot [9842] located at 127.0.0.1:7123
(nil)

但让服务端处理IMPORTING的slot的请求, 需要客户端执行asking命令,打开对应的开关才行。  但这个行为一般客户端(比如JEDIS)都会自动执行 ,用户不需要额外关心。


总结

Redis Cluster提供的reshard功能,结合实际的使用需求,可以在线的扩缩容redis cluster集群的容量,不需要担心期间服务的不可用。
但需要注意的是, 由于迁移期间,为了不太多影响业务的正常执行, 每次只会迁移一个slot,这必然会导致最终迁移时间边长。 再者,迁移执行的MIGRATE命令,本质是批量的dump和restore,以及可选的del命令组合。必然还是会对迁移的相关节点有一定的性能影响。 建议如果选择扩缩容,也考虑在业务压力较低的时段再进行。
  • 2
  • 2
  • 0
0 评论
0/1000
评论(0) 发表评论
叶****伟

叶****伟

6 篇文章 0 粉丝
关注

Redis Cluster的reshard功能

2024-04-08 13:38:57 24阅读

Redis Cluster的reshard功能


Redis Cluster是一个分布式的redis集群,通过基于KEY进行HASH分桶,把KEY拆成16,384个不同的slot。 Redis cluter支持动态扩缩容,这时意味着需要把slot和里面的数据从一个节点迁移到另一个节点上,实现数据的重平衡(reshard)功能。

背景

Redis Cluster是一个分布式的redis集群。通过基于KEY进行HASH分桶,把KEY拆成16,384个不同的slot。 这16,384个slot会分布在所有的redis集群中。 redis cluster客户端访问时, 会通过调用cluster nodes或cluster slots的命令,查询所有slot的分布信息,再计算KEY值对应slot,往对应的节点去发送请求。
由于redis是一个纯内存的KV存储, 数据都是放在内存上。受限于内存资源的有限性,为了解决这个问题,需要把多个机器的内存组成集群来使用。 而这个集群必须具备根据实际的使用需要,能够增加或减少节点的能力。
而扩缩容节点,核心的逻辑只有2个: 增加变更节点的元数据; 从原来的节点上迁移数据到新节点上 , 即redis cluster的重平衡(reshard)功能。


使用例子

参考官方文档

redis cluster的rebalance功能,有一个官方的开源实现实现 ,

可以利用redis-cli命令行去执行:
redis-cli --cluster reshard 127.0.0.1:7000


官方源码

可以参考官方的代码

核心的源码如下:
static int clusterManagerCommandReshard(int argc, char **argv) {
    int port = 0;
    char *ip = NULL;
    if (!getClusterHostFromCmdArgs(argc, argv, &ip, &port)) goto invalid_args;
    clusterManagerNode *node = clusterManagerNewNode(ip, port, 0);
    if (!clusterManagerLoadInfoFromNode(node)) return 0;
    clusterManagerCheckCluster(0);
    if (cluster_manager.errors && listLength(cluster_manager.errors) > 0) {
        fflush(stdout);
        fprintf(stderr,
                "*** Please fix your cluster problems before resharding\n");
        return 0;
    }
    int slots = config.cluster_manager_command.slots;
    if (!slots) {
        while (slots <= 0 || slots > CLUSTER_MANAGER_SLOTS) {
            printf("How many slots do you want to move (from 1 to %d)? ",
                   CLUSTER_MANAGER_SLOTS);
            fflush(stdout);
            char buf[6];
            int nread = read(fileno(stdin),buf,6);
            if (nread <= 0) continue;
            int last_idx = nread - 1;
            if (buf[last_idx] != '\n') {
                int ch;
                while ((ch = getchar()) != '\n' && ch != EOF) {}
            }
            buf[last_idx] = '\0';
            slots = atoi(buf);
        }
    }
    char buf[255];
    char *to = config.cluster_manager_command.to,
         *from = config.cluster_manager_command.from;
    while (to == NULL) {
        printf("What is the receiving node ID? ");
        fflush(stdout);
        int nread = read(fileno(stdin),buf,255);
        if (nread <= 0) continue;
        int last_idx = nread - 1;
        if (buf[last_idx] != '\n') {
            int ch;
            while ((ch = getchar()) != '\n' && ch != EOF) {}
        }
        buf[last_idx] = '\0';
        if (strlen(buf) > 0) to = buf;
    }
    int raise_err = 0;
    clusterManagerNode *target = clusterNodeForResharding(to, NULL, &raise_err);
    if (target == NULL) return 0;
    list *sources = listCreate();
    list *table = NULL;
    int all = 0, result = 1;
    if (from == NULL) {
        printf("Please enter all the source node IDs.\n");
        printf("  Type 'all' to use all the nodes as source nodes for "
               "the hash slots.\n");
        printf("  Type 'done' once you entered all the source nodes IDs.\n");
        while (1) {
            printf("Source node #%lu: ", listLength(sources) + 1);
            fflush(stdout);
            int nread = read(fileno(stdin),buf,255);
            if (nread <= 0) continue;
            int last_idx = nread - 1;
            if (buf[last_idx] != '\n') {
                int ch;
                while ((ch = getchar()) != '\n' && ch != EOF) {}
            }
            buf[last_idx] = '\0';
            if (!strcmp(buf, "done")) break;
            else if (!strcmp(buf, "all")) {
                all = 1;
                break;
            } else {
                clusterManagerNode *src =
                    clusterNodeForResharding(buf, target, &raise_err);
                if (src != NULL) listAddNodeTail(sources, src);
                else if (raise_err) {
                    result = 0;
                    goto cleanup;
                }
            }
        }
    } else {
        char *p;
        while((p = strchr(from, ',')) != NULL) {
            *p = '\0';
            if (!strcmp(from, "all")) {
                all = 1;
                break;
            } else {
                clusterManagerNode *src =
                    clusterNodeForResharding(from, target, &raise_err);
                if (src != NULL) listAddNodeTail(sources, src);
                else if (raise_err) {
                    result = 0;
                    goto cleanup;
                }
            }
            from = p + 1;
        }
        /* Check if there's still another source to process. */
        if (!all && strlen(from) > 0) {
            if (!strcmp(from, "all")) all = 1;
            if (!all) {
                clusterManagerNode *src =
                    clusterNodeForResharding(from, target, &raise_err);
                if (src != NULL) listAddNodeTail(sources, src);
                else if (raise_err) {
                    result = 0;
                    goto cleanup;
                }
            }
        }
    }
    listIter li;
    listNode *ln;
    if (all) {
        listEmpty(sources);
        listRewind(cluster_manager.nodes, &li);
        while ((ln = listNext(&li)) != NULL) {
            clusterManagerNode *n = ln->value;
            if (n->flags & CLUSTER_MANAGER_FLAG_SLAVE || n->replicate)
                continue;
            if (!sdscmp(n->name, target->name)) continue;
            listAddNodeTail(sources, n);
        }
    }
    if (listLength(sources) == 0) {
        fprintf(stderr, "*** No source nodes given, operation aborted.\n");
        result = 0;
        goto cleanup;
    }
    printf("\nReady to move %d slots.\n", slots);
    printf("  Source nodes:\n");
    listRewind(sources, &li);
    while ((ln = listNext(&li)) != NULL) {
        clusterManagerNode *src = ln->value;
        sds info = clusterManagerNodeInfo(src, 4);
        printf("%s\n", info);
        sdsfree(info);
    }
    printf("  Destination node:\n");
    sds info = clusterManagerNodeInfo(target, 4);
    printf("%s\n", info);
    sdsfree(info);
    table = clusterManagerComputeReshardTable(sources, slots);
    printf("  Resharding plan:\n");
    clusterManagerShowReshardTable(table);
    if (!(config.cluster_manager_command.flags &
          CLUSTER_MANAGER_CMD_FLAG_YES))
    {
        printf("Do you want to proceed with the proposed "
               "reshard plan (yes/no)? ");
        fflush(stdout);
        char buf[4];
        int nread = read(fileno(stdin),buf,4);
        buf[3] = '\0';
        if (nread <= 0 || strcmp("yes", buf) != 0) {
            result = 0;
            goto cleanup;
        }
    }
    int opts = CLUSTER_MANAGER_OPT_VERBOSE;
    listRewind(table, &li);
    while ((ln = listNext(&li)) != NULL) {
        clusterManagerReshardTableItem *item = ln->value;
        char *err = NULL;
        result = clusterManagerMoveSlot(item->source, target, item->slot,
                                        opts, &err);
        if (!result) {
            if (err != NULL) {
                clusterManagerLogErr("clusterManagerMoveSlot failed: %s\n", err);
                zfree(err);
            }
            goto cleanup;
        }
    }
cleanup:
    listRelease(sources);
    clusterManagerReleaseReshardTable(table);
    return result;
invalid_args:
    fprintf(stderr, CLUSTER_MANAGER_INVALID_HOST_ARG);
    return 0;
}

核心逻辑分析


下面是基于核心源码进行分析和说明。

1.  执行CLUSTER NODES命令,获取所有节点的信息,包括每个节点对应的redis节点的slot 信息。 下面是参考的结果:
d5060aff8ab03d3555b424a8d74d60bf7f46238c 127.0.0.1:8123@18123 myself,master - 0 1707036730000 3 connected 10923-16383
106e63c4f2cc529ee11b9d8b50abbc07b4f7b41d 127.0.0.1:6123@16123 master - 0 1707036732249 1 connected 0-5460
dc6cec6b9f566d7ca6ea9a8c704792ddb0c712e3 127.0.0.1:7123@17123 master - 0 1707036731231 2 connected 5461-10922

第一行最后的10923-16383, 说明的正是这个节点分配的slot范围。


2. 下面是核心的计算逻辑:
 
1. 计算reshard的迁移计划,每个节点要多少个slot迁移到新节点上
2.  循环执行每个SLOT的迁移: (对应函数clusterManagerMoveSlot)
      2.1. 在目标节点执行 CLUSTER SETSLOT {slot} importing {源节点的名字} : 设置目标节点SLOT的状态importing 
      2.2. 在源节点执行 CLUSTER SETSLOT {slot} migrating {目标节点的名字} : 设置源节点SLOT的状态migrating 
      2.3. 迁移数据:   存在大量KEY的情况,会重复执行下面步骤:
         2.3.1  在源节点执行CLUSTER GETKEYSINSLOT {slot} {pipeline}  : 获取这个SLOT的最多{pineline}个 KEY
         2.3.2  在源节点执行MIGRATE {host} {port}  0  {timeout} REPLACE AUTH  {paasword} KEYS  key [key ...]:  迁移数据
      2.4.  在目标节点执行 CLUSTER SETSLOT {slot} node  {目标节点的名字}  : 设置目标节点已经拥有这个SLOT
      2.5.  在源节点执行 CLUSTER SETSLOT {slot} node  {源节点的名字} :  设置源节点不拥有这个SLOT
      2.6.  对其他节点执行 CLUSTER SETSLOT {slot} node  {目标节点的名字}   通知其他节点

3. 迁移中的slot状态说明。

slot有importing和migrating 两种特殊状态, 代表这个slot正在迁移中
A是源节点,B是迁移的目标节点,执行下面命令更新slot状态
We send B: CLUSTER SETSLOT 8 IMPORTING A
We send A: CLUSTER SETSLOT 8 MIGRATING B

这时slot 8的key依然发向节点A。如果A存在目标KEY,则直接返回。 如果KEY不存在,这时会返回转发到B上处理。

参考下面例子: 一个请求被转发了两次
127.0.0.1:6123> get 1
-> Redirected to slot [9842] located at 127.0.0.1:9123
-> Redirected to slot [9842] located at 127.0.0.1:7123
(nil)

但让服务端处理IMPORTING的slot的请求, 需要客户端执行asking命令,打开对应的开关才行。  但这个行为一般客户端(比如JEDIS)都会自动执行 ,用户不需要额外关心。


总结

Redis Cluster提供的reshard功能,结合实际的使用需求,可以在线的扩缩容redis cluster集群的容量,不需要担心期间服务的不可用。
但需要注意的是, 由于迁移期间,为了不太多影响业务的正常执行, 每次只会迁移一个slot,这必然会导致最终迁移时间边长。 再者,迁移执行的MIGRATE命令,本质是批量的dump和restore,以及可选的del命令组合。必然还是会对迁移的相关节点有一定的性能影响。 建议如果选择扩缩容,也考虑在业务压力较低的时段再进行。
文章来自专栏

中间件随笔

6 篇文章 1 订阅
0 评论
0/1000
评论(0) 发表评论
  • 2
    点赞
  • 2
    收藏
  • 0
    评论