redis7.x源码分析:(8) serverCron

​serverCron函数是Redis中最核心的定时处理函数,它通过 aeCreateTimeEvent 注册到事件处理器中(关于它的实现见:redis7.x源码分析:(5) ae事件处理器(二)),触发时间间隔为:1000ms / server.hz ,server.hz 可以通过配置文件中 hz 选项进行修改,默认值为 10,也就是默认100毫秒执行一次。

serverCron函数的职责非常广泛,主要包括以下几个关键技术点:

1.过期键清理:采用主动和自适应策略,清理已过期的键,释放内存。

2.数据库渐进式 Rehash:在字典容量变化时,将 rehash 操作分摊到多次 serverCron 调用中执行,避免单次操作阻塞服务。

3.持久化相关:触发 RDB 快照的后台保存,以及将 AOF 缓冲区中的数据存盘(如果开启)。

4.连接管理:关闭超时闲置的客户端,并清理输出缓冲区过大的客户端。

5.状态统计与更新:更新服务器的时间缓存、内存占用、键空间命中率等统计信息,为 INFO 命令提供数据。

serverCron函数比较常用的一个定时宏是run_with_period,用来控制多久执行相应的代码段:​

// 定时时长 < 触发周期 或者 达到定时时长 就立即执行
#define run_with_period(_ms_) if ((_ms_ <= 1000/server.hz) || !(server.cronloops%((_ms_)/(1000/server.hz))))

serverCron的代码实现如下:

int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
    int j;
    UNUSED(eventLoop);
    UNUSED(id);
    UNUSED(clientData);

    // 如果配置开启了watchdog执行时间,则发送SIGALRM消息触发watchdogSignalHandler处理(具体处理代码没看)
    if (server.watchdog_period) watchdogScheduleSignal(server.watchdog_period);

    /* Update the time cache. */
    // 更新服务器时间
    updateCachedTime(1);

    // 每1000毫秒执行的次数(执行间隔就是 1000 / server.hz = xxx毫秒)
    server.hz = server.config_hz;
    // 如果开启了动态频率,则根据当前连接的客户端数量,动态调整执行频率,最大不超过500
    if (server.dynamic_hz) {
        while (listLength(server.clients) / server.hz >
               MAX_CLIENTS_PER_CLOCK_TICK)
        {
            server.hz *= 2;
            if (server.hz > CONFIG_MAX_HZ) {
                server.hz = CONFIG_MAX_HZ;
                break;
            }
        }
    }

    /* for debug purposes: skip actual cron work if pause_cron is on */
    if (server.pause_cron) return 1000/server.hz;

    // 100毫秒统计一次网络io数据量
    run_with_period(100) {
        long long stat_net_input_bytes, stat_net_output_bytes;
        long long stat_net_repl_input_bytes, stat_net_repl_output_bytes;
        atomicGet(server.stat_net_input_bytes, stat_net_input_bytes);
        atomicGet(server.stat_net_output_bytes, stat_net_output_bytes);
        atomicGet(server.stat_net_repl_input_bytes, stat_net_repl_input_bytes);
        atomicGet(server.stat_net_repl_output_bytes, stat_net_repl_output_bytes);

        trackInstantaneousMetric(STATS_METRIC_COMMAND,server.stat_numcommands);
        trackInstantaneousMetric(STATS_METRIC_NET_INPUT,
                stat_net_input_bytes + stat_net_repl_input_bytes);
        trackInstantaneousMetric(STATS_METRIC_NET_OUTPUT,
                stat_net_output_bytes + stat_net_repl_output_bytes);
        trackInstantaneousMetric(STATS_METRIC_NET_INPUT_REPLICATION,
                                 stat_net_repl_input_bytes);
        trackInstantaneousMetric(STATS_METRIC_NET_OUTPUT_REPLICATION,
                                 stat_net_repl_output_bytes);
    }

    // 取当前时间(秒)的后23bit
    unsigned int lruclock = getLRUClock();
    atomicSet(server.lruclock,lruclock);

    // 统计内存使用量包括内存分配峰值,进程的内存占用等
    cronUpdateMemoryStats();

    /* We received a SIGTERM or SIGINT, shutting down here in a safe way, as it is
     * not ok doing so inside the signal handler. */
    if (server.shutdown_asap && !isShutdownInitiated()) {
        // 第一次收到SIGTERM 或 SIGINT,尝试退出
        int shutdownFlags = SHUTDOWN_NOFLAGS;
        if (server.last_sig_received == SIGINT && server.shutdown_on_sigint)
            shutdownFlags = server.shutdown_on_sigint;
        else if (server.last_sig_received == SIGTERM && server.shutdown_on_sigterm)
            shutdownFlags = server.shutdown_on_sigterm;

        if (prepareForShutdown(shutdownFlags) == C_OK) exit(0);
    } else if (isShutdownInitiated()) {
        // 继续尝试退出
        if (server.mstime >= server.shutdown_mstime || isReadyToShutdown()) {
            if (finishShutdown() == C_OK) exit(0);
            /* Shutdown failed. Continue running. An error has been logged. */
        }
    }

    /* Show some info about non-empty databases */
    if (server.verbosity <= LL_VERBOSE) {
        // 根据日志等级5s输出一下各个数据库的用量
        run_with_period(5000) {
            for (j = 0; j < server.dbnum; j++) {
                long long size, used, vkeys;

                size = dictSlots(server.db[j].dict);
                used = dictSize(server.db[j].dict);
                vkeys = dictSize(server.db[j].expires);
                if (used || vkeys) {
                    serverLog(LL_VERBOSE,"DB %d: %lld keys (%lld volatile) in %lld slots HT.",j,used,vkeys,size);
                }
            }
        }
    }

    /* Show information about connected clients */
    if (!server.sentinel_mode) {
        // 非sentinel模式5s输出一次连接数的debug日志
        run_with_period(5000) {
            serverLog(LL_DEBUG,
                "%lu clients connected (%lu replicas), %zu bytes in use",
                listLength(server.clients)-listLength(server.slaves),
                listLength(server.slaves),
                zmalloc_used_memory());
        }
    }

    // 处理空闲客户端连接,释放客户端空闲内存等操作
    clientsCron();

    // 清理数据库过期key以及resize、rehash数据库
    databasesCron();

    // 没在aof重写, 并且符合重写限制, 则启动重写
    if (!hasActiveChildProcess() &&
        server.aof_rewrite_scheduled &&
        !aofRewriteLimited())
    {
        rewriteAppendOnlyFileBackground();
    }

    /* Check if a background saving or AOF rewrite in progress terminated. */
    if (hasActiveChildProcess() || ldbPendingChildren())
    {
        // 每1000毫秒执行一次 receiveChildInfo
        run_with_period(1000) receiveChildInfo();
        // 等待子进程退出并且获取退出消息(如果有子进程的话)
        checkChildrenDone();
    } else {
        /* If there is not a background saving/rewrite in progress check if
         * we have to save/rewrite now. */
        for (j = 0; j < server.saveparamslen; j++) {
            // 按照save配置执行rdb保存
            struct saveparam *sp = server.saveparams+j;

            // 默认的save条件:60秒内至少有10000个键被修改,300秒内至少有100个键被修改,900秒内至少有1个键被修改,并超过失败重试间隔5s
            if (server.dirty >= sp->changes &&
                server.unixtime-server.lastsave > sp->seconds &&
                (server.unixtime-server.lastbgsave_try >
                 CONFIG_BGSAVE_RETRY_DELAY ||
                 server.lastbgsave_status == C_OK))
            {
                serverLog(LL_NOTICE,"%d changes in %d seconds. Saving...",
                    sp->changes, (int)sp->seconds);
                rdbSaveInfo rsi, *rsiptr;
                rsiptr = rdbPopulateSaveInfo(&rsi);
                rdbSaveBackground(SLAVE_REQ_NONE,server.rdb_filename,rsiptr);
                break;
            }
        }

        // 满足重写条件, 则启动AOF重写
        // AOF重写条件:当前AOF状态为AOF_ON, 没有子进程, 重写百分比大于0, 当前AOF文件大小大于最小重写大小
        if (server.aof_state == AOF_ON &&
            !hasActiveChildProcess() &&
            server.aof_rewrite_perc &&
            server.aof_current_size > server.aof_rewrite_min_size)
        {
            long long base = server.aof_rewrite_base_size ?
                server.aof_rewrite_base_size : 1;
            // 计算相对于基准大小的增长百分比(rewrite完成后更新基准大小为当前AOF大小)
            long long growth = (server.aof_current_size*100/base) - 100;
            // 大于等于重写百分比, 并且没有被限制重写, 则启动AOF重写
            if (growth >= server.aof_rewrite_perc && !aofRewriteLimited()) {
                serverLog(LL_NOTICE,"Starting automatic rewriting of AOF on %lld%% growth",growth);
                rewriteAppendOnlyFileBackground();
            }
        }
    }

    // 更新字典策略是否可以resize(没有子进程的时候为true)
    // 这个函数会在每次执行命令时调用, 但是为了确保在完全空闲的时候也能执行一次, 所以这里再次调用
    updateDictResizePolicy();


    /* AOF postponed flush: Try at every cron cycle if the slow fsync
     * completed. */
    if ((server.aof_state == AOF_ON || server.aof_state == AOF_WAIT_REWRITE) &&
        server.aof_flush_postponed_start)
    {
        // 如果上次推迟了fsync, 则每次循环都去尝试执行flush
        flushAppendOnlyFile(0);
    }

    /* AOF write errors: in this case we have a buffer to flush as well and
     * clear the AOF error in case of success to make the DB writable again,
     * however to try every second is enough in case of 'hz' is set to
     * a higher frequency. */
    run_with_period(1000) {
        if ((server.aof_state == AOF_ON || server.aof_state == AOF_WAIT_REWRITE) &&
            server.aof_last_write_status == C_ERR) 
            {
                // 每秒执行一次flush
                flushAppendOnlyFile(0);
            }
    }

    /* Clear the paused clients state if needed. */
    // 按需清除暂停的客户端状态
    checkClientPauseTimeoutAndReturnIfPaused();

    // 处理主从复制相关的操作,如果处于failover状态, 则每100毫秒执行一次, 否则每1000毫秒执行一次
    if (server.failover_state != NO_FAILOVER) {
        run_with_period(100) replicationCron();
    } else {
        run_with_period(1000) replicationCron();
    }

    /* Run the Redis Cluster cron. */
    // 处理集群相关操作
    run_with_period(100) {
        if (server.cluster_enabled) clusterCron();
    }

    /* Run the Sentinel timer if we are in sentinel mode. */
    // 哨兵模式下处理哨兵定时器相关操作
    if (server.sentinel_mode) sentinelTimer();

    /* Cleanup expired MIGRATE cached sockets. */
    // 清理执行迁移时过期的socket
    run_with_period(1000) {
        migrateCloseTimedoutSockets();
    }

    /* Stop the I/O threads if we don't have enough pending work. */
    // 如果没有足够的io操作, 则停止io线程
    stopThreadedIOIfNeeded();

    // 启用了键跟踪功能, 则检查键跟踪表进行大小调整
    if (server.tracking_clients) trackingLimitUsedSlots();

    /* Start a scheduled BGSAVE if the corresponding flag is set. This is
     * useful when we are forced to postpone a BGSAVE because an AOF
     * rewrite is in progress.
     *
     * Note: this code must be after the replicationCron() call above so
     * make sure when refactoring this file to keep this order. This is useful
     * because we want to give priority to RDB savings for replication. */
    if (!hasActiveChildProcess() &&
        server.rdb_bgsave_scheduled &&
        (server.unixtime-server.lastbgsave_try > CONFIG_BGSAVE_RETRY_DELAY ||
         server.lastbgsave_status == C_OK))
    {
        // 设置了需要bgsave的标志, 且没有子进程在执行bgsave,并且距离上次bgsave超过了重试间隔5s, 或者上次bgsave成功,则进行bgsave
        rdbSaveInfo rsi, *rsiptr;
        rsiptr = rdbPopulateSaveInfo(&rsi);
        if (rdbSaveBackground(SLAVE_REQ_NONE,server.rdb_filename,rsiptr) == C_OK)
            server.rdb_bgsave_scheduled = 0;
    }

    run_with_period(100) {
        // 100ms执行一次模块周期性任务
        if (moduleCount()) modulesCron();
    }

    /* Fire the cron loop modules event. */
    // 通知模块执行 REDISMODULE_EVENT_CRON_LOOP 事件
    RedisModuleCronLoopV1 ei = {REDISMODULE_CRON_LOOP_VERSION,server.hz};
    moduleFireServerEvent(REDISMODULE_EVENT_CRON_LOOP,
                          0,
                          &ei);

    server.cronloops++;
    // 返回定时时长
    return 1000/server.hz;
}

Redis源码解析 文章被收录于专栏

基于redis7.x版本的源码分析

全部评论

相关推荐

昨天 18:02
已编辑
香港中文大学 golang
秋招有幸一开始就拿了淘天的笔面,并且美团转正的意向也顺利通过后续在淘天和字节两个&nbsp;9&nbsp;月主要流程都走到了&nbsp;hr&nbsp;面,国庆节后一个通过,一个横向挂了其他面过的包括:b&nbsp;站一面挂&nbsp;八股还行,最后手撕给了个笔试压轴限时&nbsp;15min...整段垮掉阿里控股&nbsp;kpi一面➕换部门走到二面,控股的都不喜欢开摄像头京东一面挂&nbsp;常规问题,但是疑似成都&nbsp;base&nbsp;hc&nbsp;很少,并且透露了已经转正,目前池子里无人捞腾讯正在二面&nbsp;一面体验不错,还指出了要改进的地方,提示二面不会再问问过的问题快手一面未知小红书一面未知字节换部门一面不喜欢业务,又回到了人才库大麦约面,准备拒掉虾皮一面&nbsp;无后续流程,面试聊的还行,感觉上海&nbsp;base&nbsp;池子满了---------------------------------------------------------------------------感觉秋招可以结束了,后续感觉走完这个腾讯流程就随缘面面&nbsp;t&nbsp;和&nbsp;b,主包家在南京,奈何南京没啥好的民营企业和互联网氛围,以及好国企又太难进,不知道淘天这个意向够不够直接结束秋招了...今天去深圳&nbsp;nip&nbsp;主场看了一下入围赛,主队不是这两家,还是觉得&nbsp;ig&nbsp;可惜了,有很好的机会没有抓住。感触和我字节&nbsp;hr&nbsp;面挂一样评论区有推荐的字节杭州上海base的业务线或者有字节&nbsp;hr&nbsp;uu&nbsp;可以捞一下吗?
我的求职进度条
点赞 评论 收藏
分享
评论
1
1
分享

创作者周榜

更多
牛客网
牛客网在线编程
牛客网题解
牛客企业服务