redis7.x源码分析:(8) serverCron
serverCron函数是Redis中最核心的定时处理函数,它通过 aeCreateTimeEvent 注册到事件处理器中(关于它的实现见:redis7.x源码分析:(5) ae事件处理器(二)),触发时间间隔为:1000ms / server.hz ,server.hz 可以通过配置文件中 hz 选项进行修改,默认值为 10,也就是默认100毫秒执行一次。
serverCron函数的职责非常广泛,主要包括以下几个关键技术点:
1.过期键清理:采用主动和自适应策略,清理已过期的键,释放内存。
2.数据库渐进式 Rehash:在字典容量变化时,将 rehash 操作分摊到多次 serverCron 调用中执行,避免单次操作阻塞服务。
3.持久化相关:触发 RDB 快照的后台保存,以及将 AOF 缓冲区中的数据存盘(如果开启)。
4.连接管理:关闭超时闲置的客户端,并清理输出缓冲区过大的客户端。
5.状态统计与更新:更新服务器的时间缓存、内存占用、键空间命中率等统计信息,为 INFO 命令提供数据。
serverCron函数比较常用的一个定时宏是run_with_period,用来控制多久执行相应的代码段:
/* Guard that lets the statement (or braced block) following it run roughly
 * once every _ms_ milliseconds, given that serverCron ticks every
 * 1000/server.hz milliseconds:
 *   - if _ms_ is less than or equal to the tick period, the guarded code runs
 *     on every tick;
 *   - otherwise it runs only on ticks where server.cronloops is a multiple of
 *     _ms_ / (tick period).
 * The macro expands to a bare `if (...)`, so do not follow the guarded
 * statement with an `else` that is meant for an outer `if`. */
#define run_with_period(_ms_) if (((_ms_) <= 1000/server.hz) || !(server.cronloops%((_ms_)/(1000/server.hz))))
serverCron的代码实现如下:
/* Periodic housekeeping callback of the server.
 *
 * On each invocation it refreshes cached time and statistics, handles a
 * pending shutdown request, runs the client/database/replication/cluster/
 * sentinel/module cron helpers, and starts background RDB saves or AOF
 * rewrites when their conditions are met.
 *
 * The three parameters (eventLoop, id, clientData) are unused.
 *
 * Returns 1000/server.hz, i.e. the tick period in milliseconds.
 * NOTE(review): presumably the event loop uses this value as the delay
 * before the next invocation; confirm against the timer-event code. */
int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
int j;
UNUSED(eventLoop);
UNUSED(id);
UNUSED(clientData);
// If a watchdog period is configured, (re)schedule the watchdog signal.
// NOTE(review): the signal handler side is not shown here.
if (server.watchdog_period) watchdogScheduleSignal(server.watchdog_period);
/* Update the time cache. */
// Refresh the server's cached time.
updateCachedTime(1);
// Start from the configured frequency: hz is the number of ticks per
// second, so the tick period is 1000 / server.hz milliseconds.
server.hz = server.config_hz;
// With dynamic hz enabled, keep doubling hz while the number of clients per
// tick exceeds MAX_CLIENTS_PER_CLOCK_TICK, capping it at CONFIG_MAX_HZ.
if (server.dynamic_hz) {
while (listLength(server.clients) / server.hz >
MAX_CLIENTS_PER_CLOCK_TICK)
{
server.hz *= 2;
if (server.hz > CONFIG_MAX_HZ) {
server.hz = CONFIG_MAX_HZ;
break;
}
}
}
/* for debug purposes: skip actual cron work if pause_cron is on */
if (server.pause_cron) return 1000/server.hz;
// Every 100 ms, sample the command and network I/O counters for the
// instantaneous metrics.
run_with_period(100) {
long long stat_net_input_bytes, stat_net_output_bytes;
long long stat_net_repl_input_bytes, stat_net_repl_output_bytes;
atomicGet(server.stat_net_input_bytes, stat_net_input_bytes);
atomicGet(server.stat_net_output_bytes, stat_net_output_bytes);
atomicGet(server.stat_net_repl_input_bytes, stat_net_repl_input_bytes);
atomicGet(server.stat_net_repl_output_bytes, stat_net_repl_output_bytes);
trackInstantaneousMetric(STATS_METRIC_COMMAND,server.stat_numcommands);
trackInstantaneousMetric(STATS_METRIC_NET_INPUT,
stat_net_input_bytes + stat_net_repl_input_bytes);
trackInstantaneousMetric(STATS_METRIC_NET_OUTPUT,
stat_net_output_bytes + stat_net_repl_output_bytes);
trackInstantaneousMetric(STATS_METRIC_NET_INPUT_REPLICATION,
stat_net_repl_input_bytes);
trackInstantaneousMetric(STATS_METRIC_NET_OUTPUT_REPLICATION,
stat_net_repl_output_bytes);
}
// Refresh the cached LRU clock.
// NOTE(review): its resolution and bit width are defined by getLRUClock(),
// which is not shown here.
unsigned int lruclock = getLRUClock();
atomicSet(server.lruclock,lruclock);
// Update memory usage statistics (details live in cronUpdateMemoryStats).
cronUpdateMemoryStats();
/* We received a SIGTERM or SIGINT, shutting down here in a safe way, as it is
 * not ok doing so inside the signal handler. */
if (server.shutdown_asap && !isShutdownInitiated()) {
// First attempt after the shutdown request: pick the flags configured for
// the signal that was received, then try to shut down.
int shutdownFlags = SHUTDOWN_NOFLAGS;
if (server.last_sig_received == SIGINT && server.shutdown_on_sigint)
shutdownFlags = server.shutdown_on_sigint;
else if (server.last_sig_received == SIGTERM && server.shutdown_on_sigterm)
shutdownFlags = server.shutdown_on_sigterm;
if (prepareForShutdown(shutdownFlags) == C_OK) exit(0);
} else if (isShutdownInitiated()) {
// A shutdown is already in progress: finish it once the deadline has
// passed or the server reports it is ready.
if (server.mstime >= server.shutdown_mstime || isReadyToShutdown()) {
if (finishShutdown() == C_OK) exit(0);
/* Shutdown failed. Continue running. An error has been logged. */
}
}
/* Show some info about non-empty databases */
if (server.verbosity <= LL_VERBOSE) {
// Every 5 seconds, log key count, volatile key count and slot count for
// each database that is not empty.
run_with_period(5000) {
for (j = 0; j < server.dbnum; j++) {
long long size, used, vkeys;
size = dictSlots(server.db[j].dict);
used = dictSize(server.db[j].dict);
vkeys = dictSize(server.db[j].expires);
if (used || vkeys) {
serverLog(LL_VERBOSE,"DB %d: %lld keys (%lld volatile) in %lld slots HT.",j,used,vkeys,size);
}
}
}
}
/* Show information about connected clients */
if (!server.sentinel_mode) {
// Outside sentinel mode, emit a debug log line every 5 seconds with the
// client count, replica count and memory in use.
run_with_period(5000) {
serverLog(LL_DEBUG,
"%lu clients connected (%lu replicas), %zu bytes in use",
listLength(server.clients)-listLength(server.slaves),
listLength(server.slaves),
zmalloc_used_memory());
}
}
// Per-client housekeeping.
// NOTE(review): presumably idle-timeout handling and client buffer
// trimming; see clientsCron() to confirm.
clientsCron();
// Per-database housekeeping.
// NOTE(review): presumably key expiration plus hash table resize/rehash;
// see databasesCron() to confirm.
databasesCron();
// Start a previously scheduled AOF rewrite if no child process is active
// and rewrites are not currently rate-limited.
if (!hasActiveChildProcess() &&
server.aof_rewrite_scheduled &&
!aofRewriteLimited())
{
rewriteAppendOnlyFileBackground();
}
/* Check if a background saving or AOF rewrite in progress terminated. */
if (hasActiveChildProcess() || ldbPendingChildren())
{
// Call receiveChildInfo() once per second.
run_with_period(1000) receiveChildInfo();
// Check whether any child process has finished and handle its exit.
checkChildrenDone();
} else {
/* If there is not a background saving/rewrite in progress check if
 * we have to save/rewrite now. */
for (j = 0; j < server.saveparamslen; j++) {
// Evaluate each configured save point in order.
struct saveparam *sp = server.saveparams+j;
// Trigger a background save when at least sp->changes modifications have
// accumulated, more than sp->seconds seconds have passed since the last
// save, and either the last bgsave succeeded or more than
// CONFIG_BGSAVE_RETRY_DELAY seconds have passed since the last attempt.
if (server.dirty >= sp->changes &&
server.unixtime-server.lastsave > sp->seconds &&
(server.unixtime-server.lastbgsave_try >
CONFIG_BGSAVE_RETRY_DELAY ||
server.lastbgsave_status == C_OK))
{
serverLog(LL_NOTICE,"%d changes in %d seconds. Saving...",
sp->changes, (int)sp->seconds);
rdbSaveInfo rsi, *rsiptr;
rsiptr = rdbPopulateSaveInfo(&rsi);
rdbSaveBackground(SLAVE_REQ_NONE,server.rdb_filename,rsiptr);
break;
}
}
// Automatic AOF rewrite. Preconditions: AOF state is AOF_ON, no child
// process is active, the rewrite percentage is non-zero, and the current
// AOF size exceeds the minimum rewrite size.
if (server.aof_state == AOF_ON &&
!hasActiveChildProcess() &&
server.aof_rewrite_perc &&
server.aof_current_size > server.aof_rewrite_min_size)
{
long long base = server.aof_rewrite_base_size ?
server.aof_rewrite_base_size : 1;
// Growth of the AOF relative to the base size, in percent. A base size of
// 0 is treated as 1 to avoid dividing by zero.
long long growth = (server.aof_current_size*100/base) - 100;
// Rewrite when growth reaches the configured percentage and rewrites are
// not currently rate-limited.
if (growth >= server.aof_rewrite_perc && !aofRewriteLimited()) {
serverLog(LL_NOTICE,"Starting automatic rewriting of AOF on %lld%% growth",growth);
rewriteAppendOnlyFileBackground();
}
}
}
// Re-evaluate the dict resize policy on every tick.
// NOTE(review): presumably the policy depends on whether a child process
// is active, and this call covers the case where the server is otherwise
// idle; confirm against updateDictResizePolicy() and its other callers.
updateDictResizePolicy();
/* AOF postponed flush: Try at every cron cycle if the slow fsync
 * completed. */
if ((server.aof_state == AOF_ON || server.aof_state == AOF_WAIT_REWRITE) &&
server.aof_flush_postponed_start)
{
// A flush was postponed earlier: retry it on every tick.
flushAppendOnlyFile(0);
}
/* AOF write errors: in this case we have a buffer to flush as well and
 * clear the AOF error in case of success to make the DB writable again,
 * however to try every second is enough in case of 'hz' is set to
 * a higher frequency. */
run_with_period(1000) {
if ((server.aof_state == AOF_ON || server.aof_state == AOF_WAIT_REWRITE) &&
server.aof_last_write_status == C_ERR)
{
// After a write error, retry the flush once per second.
flushAppendOnlyFile(0);
}
}
/* Clear the paused clients state if needed. */
// The return value is intentionally ignored; the call is made for its
// side effect of ending an expired pause.
checkClientPauseTimeoutAndReturnIfPaused();
// Replication housekeeping: every 100 ms while a failover is in progress,
// otherwise once per second.
if (server.failover_state != NO_FAILOVER) {
run_with_period(100) replicationCron();
} else {
run_with_period(1000) replicationCron();
}
/* Run the Redis Cluster cron. */
// Every 100 ms, and only when cluster mode is enabled.
run_with_period(100) {
if (server.cluster_enabled) clusterCron();
}
/* Run the Sentinel timer if we are in sentinel mode. */
// Runs on every tick in sentinel mode.
if (server.sentinel_mode) sentinelTimer();
/* Cleanup expired MIGRATE cached sockets. */
// Once per second.
run_with_period(1000) {
migrateCloseTimedoutSockets();
}
/* Stop the I/O threads if we don't have enough pending work. */
// Checked on every tick.
stopThreadedIOIfNeeded();
// When there are tracking clients, enforce the limit on used slots of the
// key tracking table.
if (server.tracking_clients) trackingLimitUsedSlots();
/* Start a scheduled BGSAVE if the corresponding flag is set. This is
 * useful when we are forced to postpone a BGSAVE because an AOF
 * rewrite is in progress.
 *
 * Note: this code must be after the replicationCron() call above so
 * make sure when refactoring this file to keep this order. This is useful
 * because we want to give priority to RDB savings for replication. */
if (!hasActiveChildProcess() &&
server.rdb_bgsave_scheduled &&
(server.unixtime-server.lastbgsave_try > CONFIG_BGSAVE_RETRY_DELAY ||
server.lastbgsave_status == C_OK))
{
// A BGSAVE is scheduled, no child is running, and either the last bgsave
// succeeded or more than CONFIG_BGSAVE_RETRY_DELAY seconds have passed
// since the last attempt. Clear the flag only if the save starts.
rdbSaveInfo rsi, *rsiptr;
rsiptr = rdbPopulateSaveInfo(&rsi);
if (rdbSaveBackground(SLAVE_REQ_NONE,server.rdb_filename,rsiptr) == C_OK)
server.rdb_bgsave_scheduled = 0;
}
run_with_period(100) {
// Every 100 ms, run the modules cron if at least one module is loaded.
if (moduleCount()) modulesCron();
}
/* Fire the cron loop modules event. */
// Notify modules of REDISMODULE_EVENT_CRON_LOOP, passing the current hz.
RedisModuleCronLoopV1 ei = {REDISMODULE_CRON_LOOP_VERSION,server.hz};
moduleFireServerEvent(REDISMODULE_EVENT_CRON_LOOP,
0,
&ei);
// Count this tick; run_with_period() keys off this counter.
server.cronloops++;
// Return the tick period in milliseconds.
return 1000/server.hz;
}
Redis源码解析 文章被收录于专栏
基于redis7.x版本的源码分析
查看6道真题和解析