1、向后端Redis Server发送SLOTSMGRTTAGSLOT命令随机迁移一个key到目标Redis Server中,这个过程会一贯持续;
2、如果在迁移过程中如果要针对某个key进行修正,这个key恰好在迁移,同步迁移的逻辑是先调用SLOTSMGRTTAGONE 将这个key迁移完成才能处理要求,以担保数据的同等性。
本日我们来剖析下什么时候是同步/异步迁移如何设置,异步迁移的流程是怎么样的。

一、迁移办法是在哪里设置的
迁移是同步还是异步是保存在Slot的method字段中:
type Slot struct { id int lock struct { hold bool sync.RWMutex } refs sync.WaitGroup switched bool backend, migrate struct { id int bc sharedBackendConn } replicaGroups [][]sharedBackendConnmethodforwardMethod}
可以设置为forwardSync和forwardSemiAsync,前者对应同步,后者对应异步,Proxy在初始化时会设置为同步:
func NewRouter(config Config) Router { s := &Router{config: config} s.pool.primary = newSharedBackendConnPool(config, config.BackendPrimaryParallel) s.pool.replica = newSharedBackendConnPool(config, config.BackendReplicaParallel) for i := range s.slots { s.slots[i].id = i//默认同步转发 s.slots[i].method = &forwardSync{} } return s}
在context的toSlot方法中会根据ctx.method设置ForwardMethod:
func (ctx context) toSlot(m models.SlotMapping, p models.Proxy) models.Slot { slot := &models.Slot{ Id: m.Id, Locked: ctx.isSlotLocked(m), ForwardMethod: ctx.method, }
而ctx.method字段在context初始化时根据配置MigrationMethod天生:
func(sTopom)newContext()(context,error){ if s.online { if err := s.refillCache(); err != nil { return nil, err } else { ctx := &context{} ctx.slots = s.cache.slots//读取配置 ctx.method, _ = models.ParseForwardMethod(s.config.MigrationMethod) return ctx, nil } } else { return nil, ErrNotOnline }}
这个对应dashboard配置文件中的migration_method:
# Set arguments for data migration (only accept 'sync' & 'semi-async'). 25 migration_method = "semi-async"
二、同步和异步的处理逻辑的差异
1、处理客户端要求的差别
上面剖析了同步还是异步转发取决于配置文件,这个配置是在Slot一级,关于Slot干系操作,包括如何转发后端命令都是有差异的,为了详细地剖析差别, 我们看两个实现:forwardSync和forwardSemiAsync的差异,两者的差异如下:
1、forwardSemiAsync增加了重试,由于须要异步等待key迁移完成,对应的是Forward方法;
2、最紧张的是process方法,这个方法都由Forward调用;
再回顾下前面讲的内容,一个正常要求的调用链大概如下:
Session::loopReaderSession::handleRequestRouter::dispatchslot.forwardslot.method.Forward
即要求首先处理读事宜,读取客户发送过来的要求数据,按Redis协议编码、解码;然后将给Session的handleRequest,后者再转给Router,Router再交给Slot,而Slot末了交由上面说的两个转发方法。
关于同步的处理上一篇文章 Codis源码剖析之Slots迁移篇 已经剖析了会检讨当前Slot是否在迁移中,如果是则调用SLOTSMGRTTAGONE命令迁移当前key,并且必须等待迁移完成才往下处理要求,以担保数据的同等性:
func (d forwardSync) process(s Slot, r Request, hkey []byte) (BackendConn, error) {//如果正在迁移,查询这个key是否迁移完成 if s.migrate.bc != nil && len(hkey) != 0 { if err := d.slotsmgrt(s, hkey, r.Database, r.Seed16()); err != nil { log.Debugf("slot-%04d migrate from = %s to %s failed: hash key = '%s', database = %d, error = %s", s.id, s.migrate.bc.Addr(), s.backend.bc.Addr(), hkey, r.Database, err) return nil, err } } r.Group = &s.refs r.Group.Add(1) return d.forward2(s, r), nil}
再来看异步的处理方法:
func(dforwardSemiAsync)process(sSlot,rRequest,hkey[]byte)(_BackendConn,retrybool,_error){ if s.migrate.bc != nil && len(hkey) != 0 { resp, moved, err := d.slotsmgrtExecWrapper(s, hkey, r.Database, r.Seed16(), r.Multi) switch { case err != nil: log.Debugf("slot-%04d migrate from = %s to %s failed: hash key = '%s', error = %s", s.id, s.migrate.bc.Addr(), s.backend.bc.Addr(), hkey, err) return nil, false, err case !moved: switch { case resp != nil: r.Resp = resp return nil, false, nil } return nil, true, nil } } r.Group = &s.refs r.Group.Add(1) return d.forward2(s, r), false, nil}
调用slotsmgrtExecWrapper来处理命令:
func (d forwardHelper) slotsmgrtExecWrapper(s Slot, hkey []byte, database int32, seed uint, multi []redis.Resp) (_ redis.Resp, moved bool, _ error) { m := &Request{} m.Multi = make([]redis.Resp, 0, 2+len(multi)) m.Multi = append(m.Multi, redis.NewBulkBytes([]byte("SLOTSMGRT-EXEC-WRAPPER")), redis.NewBulkBytes(hkey), )//省略代码}
可以看到是调用redis命令SLOTSMGRT-EXEC-WRAPPER来处理要求,再看下这个命令的C实现:
再跟进去,看slotsmgrtExecWrapperCommand函数
voidslotsmgrtExecWrapperCommand(client c) {//查找命令是否存在 struct redisCommand cmd = lookupCommand(c->argv[2]->ptr); if (cmd == NULL) { addReplyLongLong(c, -1); addReplyErrorFormat(c,"invalid command specified (%s)", (char )c->argv[2]->ptr); return; } if ((cmd->arity > 0 && cmd->arity != c->argc - 2) || (c->argc - 2 < -cmd->arity)) { addReplyLongLong(c, -1); addReplyErrorFormat(c, "wrong number of arguments for command (%s)", (char )c->argv[2]->ptr); return; } if (lookupKeyWrite(c->db, c->argv[1]) == NULL) { addReplyLongLong(c, 0); addReplyError(c, "the specified key doesn't exist"); return; }//如果正在迁移并且当前命令是写命令则返回缺点 if (!(cmd->flags & CMD_READONLY) && getSlotsmgrtAsyncClientMigrationStatusOrBlock(c, c->argv[1], 0) != 0) { addReplyLongLong(c, 1); addReplyError(c, "the specified key is being migrated"); return; } else { addReplyLongLong(c, 2); robj argv = zmalloc(sizeof(robj ) (c->argc - 2)); for (int i = 2; i < c->argc; i ++) { argv[i - 2] = c->argv[i]; incrRefCount(c->argv[i]); } for (int i = 0; i < c->argc; i ++) { decrRefCount(c->argv[i]); } zfree(c->argv); c->argc = c->argc - 2; c->argv = argv; c->cmd = cmd; call(c, CMD_CALL_FULL & ~CMD_CALL_PROPAGATE); }}
可以看到SLOTSMGRT-EXEC-WRAPPER会判断当前操作的命令是否为写命令,并且这个key是否在迁移或壅塞中,如果是则返回缺点,这种情形下须要Proxy进行重试。
2、迁移数据的差别
前面剖析了同步和异步迁移数据调用的方法不同:
switchmethod{ case models.ForwardSync: do = func() (int, error) { return c.MigrateSlot(sid, dest) } case models.ForwardSemiAsync: var option = &redis.MigrateSlotAsyncOption{ MaxBulks: s.config.MigrationAsyncMaxBulks, MaxBytes: s.config.MigrationAsyncMaxBytes.AsInt(), NumKeys: s.config.MigrationAsyncNumKeys, Timeout: math2.MinDuration(time.Second5, s.config.MigrationTimeout.Duration()), } do = func() (int, error) { return c.MigrateSlotAsync(sid, dest, option) }
同步调用的方法前面已经剖析过了,看下异步迁移的方法,即MigrateSlotAsync:
func (c Client) MigrateSlotAsync(slot int, target string, option MigrateSlotAsyncOption) (int, error) { host, port, err := net.SplitHostPort(target) if err != nil { return 0, errors.Trace(err) } if reply, err := c.Do("SLOTSMGRTTAGSLOT-ASYNC", host, port, int(option.Timeout/time.Millisecond), option.MaxBulks, option.MaxBytes, slot, option.NumKeys); err != nil { return 0, errors.Trace(err) } else { // }}
可以看到是调用SLOTSMGRTTAGSLOT-ASYNC命令进行迁移,Redis Server实现的逻辑比较繁芜这里就不详细剖析了,大概过程如下:
1)源Redis对key进行序列化异步发送给目标Redis;
2)目标Redis通过Restore还原后回答给源Redis;
3)源Redis收到目标Redis确认后标记这个key迁移完成,迁移下一个key;
其余说下大key的处理,对付大key,如一个长度为1W的list,Codis会将key分拆成多个命令,由于通过不断的rpush终极的结果一样;
Codis会在每一个拆分后的指令中加上一个临时TTL;
等全部拆分的指令实行成功才会删除本地的key;
因此纵然中途迁移失落败,已迁移成功的key也会超时自动删除,终极效果就好比迁移没有发生一样。
三、总结
1、同步还是异步迁移取决于dashboard配置文件migration_method;
2、同步和异步有两个差异:
一是处理要求的不同,如果当前要操作的key所属Slot正在迁移,同步处理会发送命令等待后端迁移完成才往下操作,异步则是将当前要求封装成一次SLOTSMGRT-EXEC-WRAPPER调用,并且将操作命令及参数都发送过去,后者会判断这个key是否在迁移或壅塞,如果是并且当前为写命令则直接返回失落败,由Proxy重试。
二是迁移逻辑不同,同步会调用SLOTSMGRTTAGSLOT迁移,异步则是调用SLOTSMGRTTAGSLOT-ASYNC,前者每次随机迁移一个key,异步的过程则繁芜得多,对付小key须要确认才算迁移完成,对付大key还会分拆成多条命令,以担保不壅塞主流程,并且在拆分后的命令都加上TTL,以担保如果中途失落败目标Redis的key会及时清掉而不会产生脏数据。
Codis源码剖析之环境篇
Codis源码剖析之Slots迁移篇
Codis Proxy初始化篇
Codis Proxy是如何处理一个要求的
从一次线上故障来看redis删除机制