Java Autumn Recruitment Interviews: Databases and Caching

Chapter 8: Databases and Caching

Interview importance: ⭐⭐⭐⭐⭐

Common question angles: MySQL index optimization, Redis data structures, cache penetration

Estimated reading time: 50 minutes

Opening

Friends, databases and caching are absolutely the main event of backend interviews! I'd wager that 95% of Java interviews dig deep into MySQL and Redis. These are not just technical fundamentals; they also reveal how well you understand system performance optimization.

Today we'll work through the core topics of database optimization and cache design, so you can show solid data-storage fundamentals in your interviews.

🗄️ 8.1 MySQL Performance Optimization

Index Design Principles and the B+ Tree Structure

A near-guaranteed interview question:

面试官:"说说MySQL的索引原理,为什么使用B+树而不是B树或红黑树?"

Advantages of the B+ tree structure:

-- Characteristics of the B+ tree
-- 1. All data rows are stored in the leaf nodes
-- 2. Leaf nodes are linked by pointers, which makes range queries efficient
-- 3. Internal nodes store only keys, not data, so each node holds more index entries
-- 4. The tree is shallower, so each lookup needs fewer disk I/O operations
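-- A rough fan-out estimate (assumptions for illustration: 16 KB InnoDB pages,
-- 8-byte BIGINT keys plus roughly 6-byte child pointers, so about 14 bytes per entry):
-- one internal node holds about 16384 / 14 ≈ 1170 entries, so a 3-level B+ tree
-- can address on the order of 1170 * 1170 * ~15 rows per leaf ≈ 20 million rows
-- while touching only 2-3 pages per lookup.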

-- Example: index design for a user table
CREATE TABLE user (
    id BIGINT PRIMARY KEY AUTO_INCREMENT,  -- Primary key (clustered index)
    username VARCHAR(50) NOT NULL,         -- Secondary index
    email VARCHAR(100) NOT NULL,           -- Unique index
    age INT,                               -- Secondary index
    city VARCHAR(50),                      -- Part of a composite index
    create_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    
    -- Index definitions
    UNIQUE KEY uk_email (email),           -- Unique index
    KEY idx_username (username),           -- Single-column index
    KEY idx_age (age),                     -- Single-column index
    KEY idx_city_age (city, age),          -- Composite index
    KEY idx_create_time (create_time)      -- Time index
);

Index design best practices:

-- 1. The leftmost-prefix rule
-- The composite index idx_city_age (city, age) supports these queries:
SELECT * FROM user WHERE city = 'Beijing';                 -- ✅ uses the index
SELECT * FROM user WHERE city = 'Beijing' AND age = 25;    -- ✅ uses the index
SELECT * FROM user WHERE age = 25;                         -- ❌ skips the leftmost column, index not used
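
-- You can confirm this with EXPLAIN (key names match the table above; the exact
-- row estimates will vary with your data):
EXPLAIN SELECT * FROM user WHERE city = 'Beijing' AND age = 25;  -- key: idx_city_age
EXPLAIN SELECT * FROM user WHERE age = 25;                       -- key: NULL, type: ALL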

-- 2. Covering-index optimization
-- Create a covering index to avoid the extra lookup back to the clustered index
CREATE INDEX idx_username_email ON user(username, email);

-- This query is answered entirely from the index, with no table lookup
SELECT username, email FROM user WHERE username = 'zhangsan';

-- 3. Prefix indexes
-- For long string columns, index only a prefix to save space
CREATE INDEX idx_email_prefix ON user(email(10));

-- 4. Functional indexes (MySQL 8.0+)
-- Create a functional index for queries that apply a function to a column
CREATE INDEX idx_upper_username ON user((UPPER(username)));
SELECT * FROM user WHERE UPPER(username) = 'ZHANGSAN';

Analyzing Execution Plans

Interview focus:

面试官:"如何分析SQL的执行计划?各个字段代表什么意思?"

EXPLAIN in detail:

-- Analyze the execution plan of a complex query
EXPLAIN SELECT 
    u.username, 
    u.email, 
    o.order_no, 
    o.total_amount
FROM user u
INNER JOIN order_info o ON u.id = o.user_id
WHERE u.city = 'Beijing' 
  AND u.age BETWEEN 20 AND 30
  AND o.create_time >= '2024-01-01'
ORDER BY o.create_time DESC
LIMIT 10;

-- Execution plan fields explained:
/*
+----+-------------+-------+------------+-------+------------------+---------+---------+-------+------+----------+-------------+
| id | select_type | table | partitions | type  | possible_keys    | key     | key_len | ref   | rows | filtered | Extra       |
+----+-------------+-------+------------+-------+------------------+---------+---------+-------+------+----------+-------------+
|  1 | SIMPLE      | u     | NULL       | range | idx_city_age     | idx_... | 158     | NULL  | 1000 |   100.00 | Using where |
|  1 | SIMPLE      | o     | NULL       | ref   | idx_user_id_time | idx_... | 8       | u.id  |   10 |    33.33 | Using where |
+----+-------------+-------+------------+-------+------------------+---------+---------+-------+------+----------+-------------+

Field meanings:
- id: query sequence number; a larger id executes earlier, and equal ids execute top-down
- select_type: query type (SIMPLE, PRIMARY, SUBQUERY, etc.)
- table: the table being accessed
- type: access type, from best to worst (system > const > eq_ref > ref > range > index > ALL)
- possible_keys: indexes the optimizer considered
- key: the index actually chosen
- key_len: number of index bytes used
- ref: the columns or constants compared against the index
- rows: estimated number of rows to examine
- filtered: estimated percentage of examined rows that survive the remaining conditions
- Extra: additional information (e.g., Using index, Using filesort)
*/
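
On MySQL 8.0.18+ you can go one step further with EXPLAIN ANALYZE, which actually runs the query and reports measured timings alongside the plan estimates, which is often more convincing in an interview answer:

EXPLAIN ANALYZE
SELECT u.username, o.order_no
FROM user u
INNER JOIN order_info o ON u.id = o.user_id
WHERE u.city = 'Beijing';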

A performance optimization case study:

-- A slow-query optimization example
-- Original slow query (took about 3 seconds)
SELECT * FROM order_info 
WHERE status = 1 
  AND create_time >= '2024-01-01' 
  AND total_amount > 100
ORDER BY create_time DESC
LIMIT 20;

-- Diagnosis:
-- 1. No suitable index
-- 2. Full table scan
-- 3. Filesort for the ORDER BY

-- Optimization 1: create a composite index
CREATE INDEX idx_status_time_amount ON order_info(status, create_time, total_amount);

-- Optimization 2: select only the columns you need so the index can cover the query
-- (note: the order of WHERE conditions does not matter; the optimizer normalizes it.
--  What matters is that the conditions line up with the index.)
SELECT id, status, create_time, total_amount FROM order_info 
WHERE status = 1 
  AND create_time >= '2024-01-01' 
  AND total_amount > 100
ORDER BY create_time DESC
LIMIT 20;

-- Execution plan after optimization:
-- type: range (range scan)
-- key: idx_status_time_amount
-- rows: 100 (down from roughly 100,000)
-- Extra: Using index condition (index condition pushdown)

Slow Query Optimization

Slow query monitoring configuration:

-- Enable the slow query log
SET GLOBAL slow_query_log = 'ON';
SET GLOBAL long_query_time = 1;  -- log queries that take longer than 1 second
SET GLOBAL log_queries_not_using_indexes = 'ON';  -- also log queries that use no index

-- Inspect the slow-query settings
SHOW VARIABLES LIKE '%slow_query%';
SHOW VARIABLES LIKE 'long_query_time';
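
Once the log accumulates entries, the mysqldumpslow tool that ships with MySQL gives a quick summary of the worst offenders (the log path below is an assumption; check your slow_query_log_file setting):

# Top 10 statements sorted by total query time (-s t)
mysqldumpslow -s t -t 10 /var/lib/mysql/slow.log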

Common slow-query optimization techniques:

-- 1. Avoid SELECT *
-- ❌ Bad
SELECT * FROM user WHERE username = 'zhangsan';

-- ✅ Good
SELECT id, username, email FROM user WHERE username = 'zhangsan';

-- 2. Use LIMIT to cap large result sets
-- ❌ Bad
SELECT * FROM order_info ORDER BY create_time DESC;

-- ✅ Good
SELECT * FROM order_info ORDER BY create_time DESC LIMIT 20;

-- 3. Optimize deep pagination
-- ❌ Traditional pagination (very slow at large OFFSETs)
SELECT * FROM order_info ORDER BY id LIMIT 100000, 20;

-- ✅ Keyset (cursor) pagination: pass the last id seen on the previous page
SELECT * FROM order_info WHERE id > 100000 ORDER BY id LIMIT 20;

-- 4. Avoid functions on columns in the WHERE clause
-- ❌ Bad (the function prevents index use)
SELECT * FROM user WHERE YEAR(create_time) = 2024;

-- ✅ Good
SELECT * FROM user WHERE create_time >= '2024-01-01' AND create_time < '2025-01-01';

-- 5. Consider EXISTS instead of IN
-- ❌ Can be slow when the subquery result set is large (less of an issue on
--    MySQL 5.6+, where the optimizer often rewrites IN subqueries as semi-joins)
SELECT * FROM user WHERE id IN (SELECT user_id FROM order_info WHERE status = 1);

-- ✅ Often faster
SELECT * FROM user u WHERE EXISTS (SELECT 1 FROM order_info o WHERE o.user_id = u.id AND o.status = 1);

Master-Slave Replication and Read/Write Splitting

Setting up master-slave replication:

# Master configuration (my.cnf)
[mysqld]
server-id = 1
log-bin = mysql-bin
binlog-format = ROW
sync_binlog = 1
innodb_flush_log_at_trx_commit = 1

# Replica configuration (my.cnf)
[mysqld]
server-id = 2
relay-log = mysql-relay-bin
read_only = 1
super_read_only = 1

-- On the master: create a replication user
CREATE USER 'repl'@'%' IDENTIFIED BY 'password';
GRANT REPLICATION SLAVE ON *.* TO 'repl'@'%';

-- On the replica: point it at the master
CHANGE MASTER TO
    MASTER_HOST='192.168.1.100',
    MASTER_USER='repl',
    MASTER_PASSWORD='password',
    MASTER_LOG_FILE='mysql-bin.000001',
    MASTER_LOG_POS=154;

START SLAVE;
SHOW SLAVE STATUS\G
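
In the SHOW SLAVE STATUS output, three fields tell you most of what you need:

-- Slave_IO_Running: Yes      (I/O thread pulling binlog events from the master)
-- Slave_SQL_Running: Yes     (SQL thread replaying the relay log)
-- Seconds_Behind_Master: 0   (replication lag in seconds)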

Implementing read/write splitting:

// Data source configuration
@Configuration
public class DataSourceConfig {
    
    @Bean
    @Primary
    public DataSource masterDataSource() {
        HikariDataSource dataSource = new HikariDataSource();
        dataSource.setJdbcUrl("jdbc:mysql://master:3306/test");
        dataSource.setUsername("root");
        dataSource.setPassword("password");
        dataSource.setMaximumPoolSize(20);
        return dataSource;
    }
    
    @Bean
    public DataSource slaveDataSource() {
        HikariDataSource dataSource = new HikariDataSource();
        dataSource.setJdbcUrl("jdbc:mysql://slave:3306/test");
        dataSource.setUsername("root");
        dataSource.setPassword("password");
        dataSource.setMaximumPoolSize(20);
        return dataSource;
    }
    
    @Bean
    public DataSource routingDataSource() {
        Map<Object, Object> dataSourceMap = new HashMap<>();
        dataSourceMap.put("master", masterDataSource());
        dataSourceMap.put("slave", slaveDataSource());
        
        DynamicRoutingDataSource routingDataSource = new DynamicRoutingDataSource();
        routingDataSource.setTargetDataSources(dataSourceMap);
        routingDataSource.setDefaultTargetDataSource(masterDataSource());
        
        return routingDataSource;
    }
}

// Dynamic data-source router
public class DynamicRoutingDataSource extends AbstractRoutingDataSource {
    
    @Override
    protected Object determineCurrentLookupKey() {
        return DataSourceContextHolder.getDataSourceType();
    }
}

// Data-source context holder
public class DataSourceContextHolder {
    private static final ThreadLocal<String> contextHolder = new ThreadLocal<>();
    
    public static void setDataSourceType(String dataSourceType) {
        contextHolder.set(dataSourceType);
    }
    
    public static String getDataSourceType() {
        return contextHolder.get();
    }
    
    public static void clearDataSourceType() {
        contextHolder.remove();
    }
}

// Marker annotation for read-only (slave) routing
@Target({ElementType.METHOD, ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
public @interface ReadOnly {
}

// AOP aspect that performs the read/write routing
@Aspect
@Component
public class DataSourceAspect {
    
    @Before("@annotation(readOnly)")
    public void setReadDataSourceType(ReadOnly readOnly) {
        DataSourceContextHolder.setDataSourceType("slave");
    }
    
    @Before("execution(* com.example.service.*.select*(..)) || " +
            "execution(* com.example.service.*.find*(..)) || " +
            "execution(* com.example.service.*.get*(..))")
    public void setReadDataSourceTypeForQuery() {
        if (DataSourceContextHolder.getDataSourceType() == null) {
            DataSourceContextHolder.setDataSourceType("slave");
        }
    }
    
    @After("@annotation(readOnly) || " +
           "execution(* com.example.service.*.select*(..)) || " +
           "execution(* com.example.service.*.find*(..)) || " +
           "execution(* com.example.service.*.get*(..))")
    public void clearDataSourceType() {
        DataSourceContextHolder.clearDataSourceType();
    }
}
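
A minimal usage sketch (the service and mapper methods below are hypothetical, just to show how routing kicks in):

@Service
public class UserQueryService {
    
    @Autowired
    private UserMapper userMapper;
    
    // Explicitly routed to the slave via the @ReadOnly annotation
    @ReadOnly
    public User getUserProfile(Long id) {
        return userMapper.selectById(id);
    }
    
    // Matches the find* pointcut, so it is also routed to the slave;
    // anything that matches no rule defaults to the master data source
    public List<User> findUsersByCity(String city) {
        return userMapper.selectByCity(city);
    }
}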

🚀 8.2 Redis Core Data Structures

How String, Hash, List, Set, and ZSet Are Implemented

A high-frequency interview question:

面试官:"Redis的五种数据结构底层是如何实现的?各自的使用场景是什么?"

The String type:

// String is implemented as an SDS (Simple Dynamic String)
// Advantages over plain C strings:
// 1. O(1) length lookup
// 2. No buffer overflows
// 3. Fewer reallocations when the string is modified (space pre-allocation)
// 4. Binary safe

// Use cases and commands
@Service
public class RedisStringService {
    
    @Autowired
    private StringRedisTemplate redisTemplate;
    
    // 1. Caching an object
    public void cacheUser(User user) {
        String key = "user:" + user.getId();
        String value = JSON.toJSONString(user);
        redisTemplate.opsForValue().set(key, value, 30, TimeUnit.MINUTES);
    }
    
    // 2. Counter
    public Long incrementViewCount(String articleId) {
        String key = "article:view:" + articleId;
        return redisTemplate.opsForValue().increment(key);
    }
    
    // 3. Distributed lock (SET key value EX <ttl> NX)
    public boolean tryLock(String lockKey, String requestId, int expireTime) {
        // Note: in Spring Data Redis, connection.set(...) returns a Boolean, not "OK"
        Boolean result = redisTemplate.execute((RedisCallback<Boolean>) connection ->
            connection.set(lockKey.getBytes(), requestId.getBytes(),
                Expiration.seconds(expireTime), RedisStringCommands.SetOption.SET_IF_ABSENT));
        return Boolean.TRUE.equals(result);
    }
    
    // 4. Rate limiter (this is a fixed-window counter, not a token bucket)
    public boolean isAllowed(String userId, int limit, int windowSeconds) {
        // One key per user per window; the window index is derived from the clock
        String key = "rate_limit:" + userId + ":" + (System.currentTimeMillis() / (windowSeconds * 1000L));
        Long count = redisTemplate.opsForValue().increment(key);
        if (count != null && count == 1) {
            redisTemplate.expire(key, windowSeconds, TimeUnit.SECONDS);
        }
        return count != null && count <= limit;
    }
}

The Hash type:

// Hash is implemented as either:
// 1. ziplist: when the hash is small (by default at most 128 entries, each value
//    at most 64 bytes; see hash-max-ziplist-entries / hash-max-ziplist-value.
//    Redis 7 uses a listpack instead of a ziplist.)
// 2. hashtable: once the hash grows beyond those thresholds

@Service
public class RedisHashService {
    
    @Autowired
    private RedisTemplate<String, Object> redisTemplate;
    
    // 1. Storing object fields
    public void saveUserInfo(Long userId, Map<String, String> userInfo) {
        String key = "user:info:" + userId;
        redisTemplate.opsForHash().putAll(key, userInfo);
        redisTemplate.expire(key, 1, TimeUnit.HOURS);
    }
    
    // 2. Shopping cart
    public void addToCart(Long userId, Long productId, Integer quantity) {
        String key = "cart:" + userId;
        redisTemplate.opsForHash().put(key, productId.toString(), quantity.toString());
    }
    
    public Map<Object, Object> getCart(Long userId) {
        String key = "cart:" + userId;
        return redisTemplate.opsForHash().entries(key);
    }
    
    // 3. Statistics counters
    public void updateStatistics(String date, String metric, Long value) {
        String key = "stats:" + date;
        redisTemplate.opsForHash().increment(key, metric, value);
    }
}

The List type:

// List has historically been implemented as either:
// 1. ziplist: few elements with small values
// 2. linkedlist (doubly linked list): many elements or large values
// 3. quicklist (Redis 3.2+): a linked list of ziplist nodes that replaced the
//    two encodings above and combines their strengths

@Service
public class RedisListService {
    
    @Autowired
    private RedisTemplate<String, Object> redisTemplate;
    
    // 1. Message queue
    public void sendMessage(String queue, String message) {
        redisTemplate.opsForList().leftPush(queue, message);
    }
    
    public String receiveMessage(String queue) {
        return (String) redisTemplate.opsForList().rightPop(queue);
    }
    
    // 2. Latest-activity feed
    public void addActivity(Long userId, String activity) {
        String key = "user:activities:" + userId;
        redisTemplate.opsForList().leftPush(key, activity);
        // Keep only the newest 100 entries
        redisTemplate.opsForList().trim(key, 0, 99);
    }
    
    // 3. Blocking queue (BRPOP): waits up to `timeout` seconds for an element
    public String blockingReceive(String queue, int timeout) {
        // rightPop with a timeout returns the popped value directly (or null on timeout)
        Object result = redisTemplate.opsForList().rightPop(queue, timeout, TimeUnit.SECONDS);
        return result != null ? (String) result : null;
    }
}

The Set type:

// Set is implemented as either:
// 1. intset: when every element is an integer and the set is small
//    (by default at most 512 elements; see set-max-intset-entries)
// 2. hashtable: in all other cases

@Service
public class RedisSetService {
    
    @Autowired
    private RedisTemplate<String, Object> redisTemplate;
    
    // 1. Tagging system
    public void addUserTags(Long userId, String... tags) {
        String key = "user:tags:" + userId;
        redisTemplate.opsForSet().add(key, (Object[]) tags);
    }
    
    public Set<Object> getUserTags(Long userId) {
        String key = "user:tags:" + userId;
        return redisTemplate.opsForSet().members(key);
    }
    
    // 2. Mutual follows (set intersection)
    public Set<Object> getCommonFollows(Long userId1, Long userId2) {
        String key1 = "user:follows:" + userId1;
        String key2 = "user:follows:" + userId2;
        return redisTemplate.opsForSet().intersect(key1, key2);
    }
    
    // 3. Deduplicated counting (e.g., unique visitors)
    public void addUniqueVisitor(String date, Long userId) {
        String key = "unique:visitors:" + date;
        redisTemplate.opsForSet().add(key, userId);
    }
    
    public Long getUniqueVisitorCount(String date) {
        String key = "unique:visitors:" + date;
        return redisTemplate.opsForSet().size(key);
    }
    
    // 4. Lottery draw (random member)
    public Object randomDraw(String activityId) {
        String key = "lottery:" + activityId;
        return redisTemplate.opsForSet().randomMember(key);
    }
}

The ZSet (sorted set) type:

// ZSet is implemented as either:
// 1. ziplist: when the set is small (by default at most 128 elements, each member
//    at most 64 bytes; see zset-max-ziplist-entries / zset-max-ziplist-value)
// 2. skiplist + hashtable: for larger sets (the skiplist keeps score order,
//    the hashtable gives O(1) member-to-score lookup)

@Service
public class RedisZSetService {
    
    @Autowired
    private RedisTemplate<String, Object> redisTemplate;
    
    // 1. Leaderboard
    public void updateScore(String leaderboard, String player, double score) {
        redisTemplate.opsForZSet().add(leaderboard, player, score);
    }
    
    public Set<ZSetOperations.TypedTuple<Object>> getTopPlayers(String leaderboard, int count) {
        return redisTemplate.opsForZSet().reverseRangeWithScores(leaderboard, 0, count - 1);
    }
    
    public Long getPlayerRank(String leaderboard, String player) {
        return redisTemplate.opsForZSet().reverseRank(leaderboard, player);
    }
    
    // 2. Delayed queue (score = scheduled execution time)
    public void addDelayedTask(String queue, String task, long delaySeconds) {
        long executeTime = System.currentTimeMillis() + delaySeconds * 1000;
        redisTemplate.opsForZSet().add(queue, task, executeTime);
    }
    
    public Set<Object> getReadyTasks(String queue) {
        long now = System.currentTimeMillis();
        return redisTemplate.opsForZSet().rangeByScore(queue, 0, now);
    }
    
    // 3. Sliding-window statistics
    public void recordEvent(String event, String identifier) {
        String key = "events:" + event;
        long timestamp = System.currentTimeMillis();
        redisTemplate.opsForZSet().add(key, identifier, timestamp);
        
        // Evict entries older than one hour
        long oneHourAgo = timestamp - 3600 * 1000;
        redisTemplate.opsForZSet().removeRangeByScore(key, 0, oneHourAgo);
    }
    
    public Long countEventsInWindow(String event, long windowSeconds) {
        String key = "events:" + event;
        long now = System.currentTimeMillis();
        long windowStart = now - windowSeconds * 1000;
        return redisTemplate.opsForZSet().count(key, windowStart, now);
    }
}
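
One caveat with getReadyTasks above: two consumers can both read the same task between the range query and its removal. A common fix is to claim a task atomically with a small Lua script. A minimal sketch (this method is an illustration, not part of the original service):

// Could be added to RedisZSetService; DefaultRedisScript is from Spring Data Redis
public String pollReadyTask(String queue) {
    // Fetch at most one due task and remove it in the same atomic script
    String script =
        "local t = redis.call('ZRANGEBYSCORE', KEYS[1], 0, ARGV[1], 'LIMIT', 0, 1) " +
        "if #t > 0 then redis.call('ZREM', KEYS[1], t[1]) return t[1] end " +
        "return nil";
    return redisTemplate.execute(
        new DefaultRedisScript<>(script, String.class),
        Collections.singletonList(queue),
        String.valueOf(System.currentTimeMillis()));
}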

Skip Lists and Ziplists

How a skip list works:

// A simplified skip-list implementation
public class SkipList {
    private static final int MAX_LEVEL = 16;
    private static final double P = 0.5;
    
    private Node header;
    private int level;
    
    static class Node {
        String key;
        double score;
        Node[] forward;
        
        Node(int level) {
            forward = new Node[level + 1];
        }
    }
    
    public SkipList() {
        header = new Node(MAX_LEVEL);
        level = 0;
    }
    
    // Randomly choose a level for a new node (coin flips with probability P)
    private int randomLevel() {
        int level = 0;
        while (Math.random() < P && level < MAX_LEVEL) {
            level++;
        }
        return level;
    }
    
    // Insert a node
    public void insert(String key, double score) {
        Node[] update = new Node[MAX_LEVEL + 1];
        Node current = header;
        
        // Walk down from the highest level, recording the rightmost node before the insertion point at each level
        for (int i = level; i >= 0; i--) {
            while (current.forward[i] != null && 
                   current.forward[i].score < score) {
                current = current.forward[i];
            }
            update[i] = current;
        }
        
        // Create the new node
        int newLevel = randomLevel();
        if (newLevel > level) {
            for (int i = level + 1; i <= newLevel; i++) {
                update[i] = header;
            }
            level = newLevel;
        }
        
        Node newNode = new Node(newLevel);
        newNode.key = key;
        newNode.score = score;
        
        // Splice the new node into each level's forward chain
        for (int i = 0; i <= newLevel; i++) {
            newNode.forward[i] = update[i].forward[i];
            update[i].forward[i] = newNode;
        }
    }
    
    // Search for a node by score
    public Node search(double score) {
        Node current = header;
        
        for (int i = level; i >= 0; i--) {
            while (current.forward[i] != null && 
                   current.forward[i].score < score) {
                current = current.forward[i];
            }
        }
        
        current = current.forward[0];
        return (current != null && current.score == score) ? current : null;
    }
}
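
A quick sanity check of the sketch above:

public class SkipListDemo {
    public static void main(String[] args) {
        SkipList list = new SkipList();
        list.insert("alice", 95.0);
        list.insert("bob", 87.5);
        list.insert("carol", 99.0);
        
        SkipList.Node hit = list.search(87.5);
        System.out.println(hit != null ? hit.key : "not found"); // prints: bob
    }
}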

🔄 8.3 Redis High-Availability Options

Master-Slave Replication, Sentinel, and Cluster

Interview focus:

面试官:"Redis的高可用方案有哪些?各自的优缺点是什么?"

Master-slave replication configuration:

# Master configuration (redis.conf)
bind 0.0.0.0
port 6379
requirepass masterpassword

# Replica configuration (redis.conf)
bind 0.0.0.0
port 6380
slaveof 192.168.1.100 6379    # "replicaof" is the preferred spelling since Redis 5
masterauth masterpassword
slave-read-only yes

Sentinel configuration:

# sentinel.conf
port 26379
sentinel monitor mymaster 192.168.1.100 6379 2   # quorum of 2 sentinels to declare the master down
sentinel auth-pass mymaster masterpassword
sentinel down-after-milliseconds mymaster 5000
sentinel failover-timeout mymaster 10000
sentinel parallel-syncs mymaster 1

# Start a sentinel
redis-sentinel sentinel.conf

Sentinel configuration for a Java client:

@Configuration
public class RedisSentinelConfig {
    
    @Bean
    public LettuceConnectionFactory redisConnectionFactory() {
        // Sentinel configuration
        RedisSentinelConfiguration sentinelConfig = new RedisSentinelConfiguration()
            .master("mymaster")
            .sentinel("192.168.1.101", 26379)
            .sentinel("192.168.1.102", 26379)
            .sentinel("192.168.1.103", 26379);
        
        sentinelConfig.setPassword("masterpassword");
        
        // Connection pool configuration
        GenericObjectPoolConfig<Object> poolConfig = new GenericObjectPoolConfig<>();
        poolConfig.setMaxTotal(20);
        poolConfig.setMaxIdle(10);
        poolConfig.setMinIdle(5);
        poolConfig.setTestOnBorrow(true);
        
        LettucePoolingClientConfiguration clientConfig = LettucePoolingClientConfiguration.builder()
            .poolConfig(poolConfig)
            .build();
        
        return new LettuceConnectionFactory(sentinelConfig, clientConfig);
    }
    
    @Bean
    public RedisTemplate<String, Object> redisTemplate() {
        RedisTemplate<String, Object> template = new RedisTemplate<>();
        template.setConnectionFactory(redisConnectionFactory());
        
        // Serialization configuration
        Jackson2JsonRedisSerializer<Object> serializer = new Jackson2JsonRedisSerializer<>(Object.class);
        template.setDefaultSerializer(serializer);
        template.setKeySerializer(new StringRedisSerializer());
        template.setHashKeySerializer(new StringRedisSerializer());
        
        return template;
    }
}

Redis Cluster configuration:

# Per-node configuration (redis.conf)
port 7000
cluster-enabled yes
cluster-config-file nodes-7000.conf
cluster-node-timeout 5000
appendonly yes

# Create the cluster (3 masters + 3 replicas)
redis-cli --cluster create \
  192.168.1.100:7000 192.168.1.100:7001 192.168.1.100:7002 \
  192.168.1.101:7000 192.168.1.101:7001 192.168.1.101:7002 \
  --cluster-replicas 1
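
Once created, verify slot coverage and node health:

# Check that all 16384 slots are covered and replicas are assigned
redis-cli --cluster check 192.168.1.100:7000

# Or inspect from inside any node
redis-cli -p 7000 CLUSTER INFO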

Cluster configuration for a Java client:

@Configuration
public class RedisClusterConfig {
    
    @Bean
    public LettuceConnectionFactory redisConnectionFactory() {
        // Cluster node configuration
        RedisClusterConfiguration clusterConfig = new RedisClusterConfiguration();
        clusterConfig.clusterNode("192.168.1.100", 7000);
        clusterConfig.clusterNode("192.168.1.100", 7001);
        clusterConfig.clusterNode("192.168.1.100", 7002);
        clusterConfig.clusterNode("192.168.1.101", 7000);
        clusterConfig.clusterNode("192.168.1.101", 7001);
        clusterConfig.clusterNode("192.168.1.101", 7002);
        
        clusterConfig.setMaxRedirects(3);
        
        return new LettuceConnectionFactory(clusterConfig);
    }
}

Persistence Mechanisms (RDB vs AOF)

RDB persistence:

# RDB configuration (redis.conf)
save 900 1      # snapshot if at least 1 key changed within 900 seconds
save 300 10     # snapshot if at least 10 keys changed within 300 seconds
save 60 10000   # snapshot if at least 10000 keys changed within 60 seconds

stop-writes-on-bgsave-error yes
rdbcompression yes
rdbchecksum yes
dbfilename dump.rdb
dir /var/lib/redis

AOF persistence:

# AOF configuration (redis.conf)
appendonly yes
appendfilename "appendonly.aof"

# fsync policy
appendfsync everysec    # fsync once per second (recommended)
# appendfsync always    # fsync on every write (safest, slowest)
# appendfsync no        # let the OS decide (fastest, least safe)

# AOF rewrite triggers
auto-aof-rewrite-percentage 100
auto-aof-rewrite-min-size 64mb
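
Since Redis 4.0 the two mechanisms can also be combined: with the option below, an AOF rewrite stores an RDB snapshot as the file's preamble and appends incremental commands after it (this is the default from Redis 5 onward):

# Hybrid persistence: RDB preamble + AOF tail in one file
aof-use-rdb-preamble yes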

Comparing the two:

// RDB vs AOF comparison
/*
RDB pros:
1. Compact file, good for backups
2. Fast recovery
3. Little impact on runtime performance

RDB cons:
1. Data written after the last snapshot can be lost
2. Forking the child process can briefly stall the server on large datasets

AOF pros:
1. Better durability
2. Human-readable log that can be repaired by hand
3. Supports rewrite compaction

AOF cons:
1. Larger files and slower recovery
2. Higher runtime overhead
3. Replay bugs have occasionally made recovery difficult

Hybrid persistence (Redis 4.0+):
combines the strengths of both: the RDB part is the full snapshot,
and the AOF tail holds the incremental writes since that snapshot
*/

@Component
public class RedisBackupService {
    
    @Autowired
    private RedisTemplate<String, Object> redisTemplate;
    
    // Manually trigger an RDB snapshot (BGSAVE)
    public void createSnapshot() {
        redisTemplate.execute((RedisCallback<String>) connection -> {
            connection.bgSave();
            return "OK";
        });
    }
    
    // Manually trigger an AOF rewrite (BGREWRITEAOF)
    public void rewriteAOF() {
        redisTemplate.execute((RedisCallback<String>) connection -> {
            connection.bgReWriteAof();
            return "OK";
        });
    }
    
    // Fetch the persistence section of INFO
    public Properties getPersistenceInfo() {
        return redisTemplate.execute((RedisCallback<Properties>) connection -> {
            return connection.info("persistence");
        });
    }
}

🛡️ 8.4 Cache Design Patterns

Solutions for Cache Penetration, Breakdown, and Avalanche

A high-frequency interview question:

面试官:"什么是缓存穿透、缓存击穿、缓存雪崩?如何解决?"

Cache penetration solutions:

// Problem: requests for keys that exist in neither the cache nor the database
// hit the database every single time
// Solution 1: a Bloom filter in front of the cache
@Component
public class BloomFilterService {
    
    private BloomFilter<String> bloomFilter;
    
    @PostConstruct
    public void init() {
        // Guava Bloom filter sized for 1,000,000 expected keys with a 0.01% false-positive rate
        bloomFilter = BloomFilter.create(Funnels.stringFunnel(Charset.defaultCharset()), 1000000, 0.0001);
        
        // Seed the filter with every key that currently exists
        initBloomFilter();
    }
    
    private void initBloomFilter() {
        // Load all existing keys (placeholder: wire this up to your own DAO)
        List<String> existingKeys = loadAllKeysFromDatabase();
        for (String key : existingKeys) {
            bloomFilter.put(key);
        }
    }
    
    public boolean mightExist(String key) {
        return bloomFilter.mightContain(key);
    }
    
    public void addKey(String key) {
        bloomFilter.put(key);
    }
}

// Solution 2: cache null markers
@Service
public class UserCacheService {
    
    @Autowired
    private RedisTemplate<String, Object> redisTemplate;
    
    @Autowired
    private UserMapper userMapper;
    
    @Autowired
    private BloomFilterService bloomFilterService;
    
    public User getUserById(Long userId) {
        String key = "user:" + userId;
        
        // 1. Bloom filter pre-check
        if (!bloomFilterService.mightExist(key)) {
            return null; // definitely absent, skip both cache and database
        }
        
        // 2. Check the cache
        Object cached = redisTemplate.opsForValue().get(key);
        if (cached != null) {
            if ("NULL".equals(cached)) {
                return null; // cached null marker
            }
            return JSON.parseObject(cached.toString(), User.class);
        }
        
        // 3. Query the database
        User user = userMapper.selectById(userId);
        if (user != null) {
            // Cache the real row
            redisTemplate.opsForValue().set(key, JSON.toJSONString(user), 30, TimeUnit.MINUTES);
            bloomFilterService.addKey(key);
        } else {
            // Cache a short-lived null marker to absorb repeated misses
            redisTemplate.opsForValue().set(key, "NULL", 5, TimeUnit.MINUTES);
        }
        
        return user;
    }
}

Cache breakdown solutions:

// Problem: when a hot key expires, a burst of requests hits the database simultaneously
// Solution: a distributed lock plus double-checked reads
@Service
public class HotKeyCacheService {
    
    @Autowired
    private RedisTemplate<String, Object> redisTemplate;
    
    @Autowired
    private UserMapper userMapper;
    
    public User getHotUser(Long userId) {
        String key = "hot:user:" + userId;
        String lockKey = "lock:" + key;
        
        // 1. Check the cache
        Object cached = redisTemplate.opsForValue().get(key);
        if (cached != null) {
            return JSON.parseObject(cached.toString(), User.class);
        }
        
        // 2. Acquire the distributed lock
        String requestId = UUID.randomUUID().toString();
        boolean lockAcquired = tryLock(lockKey, requestId, 10);
        
        if (lockAcquired) {
            try {
                // 3. Double-check: another thread may have rebuilt the cache while we waited
                cached = redisTemplate.opsForValue().get(key);
                if (cached != null) {
                    return JSON.parseObject(cached.toString(), User.class);
                }
                
                // 4. Query the database
                User user = userMapper.selectById(userId);
                if (user != null) {
                    // 5. Rebuild the cache with a randomized TTL to avoid synchronized expiry
                    int randomExpire = 30 + new Random().nextInt(10); // 30-40 minutes
                    redisTemplate.opsForValue().set(key, JSON.toJSONString(user), randomExpire, TimeUnit.MINUTES);
                }
                
                return user;
                
            } finally {
                // 6. Release the lock
                releaseLock(lockKey, requestId);
            }
        } else {
            // 7. Failed to get the lock: back off briefly, then retry
            try {
                Thread.sleep(100);
                return getHotUser(userId); // recursive retry (a bounded loop is safer in production)
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return null;
            }
        }
    }
    
    private boolean tryLock(String lockKey, String requestId, int expireTime) {
        // SET key value EX expireTime NX; returns a Boolean in Spring Data Redis
        Boolean result = redisTemplate.execute((RedisCallback<Boolean>) connection ->
            connection.set(lockKey.getBytes(), requestId.getBytes(),
                Expiration.seconds(expireTime), RedisStringCommands.SetOption.SET_IF_ABSENT));
        return Boolean.TRUE.equals(result);
    }
    
    private void releaseLock(String lockKey, String requestId) {
        String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
        redisTemplate.execute(new DefaultRedisScript<>(script, Long.class), 
            Collections.singletonList(lockKey), requestId);
    }
}

Cache avalanche solutions:

// Problem: a large batch of keys expires at once and all the traffic lands on the database
// Solution: randomized TTLs + a multi-level cache + rate limiting and degradation
@Slf4j  // Lombok logger, used by warmUpCache below
@Service
public class CacheAvalancheService {
    
    @Autowired
    private RedisTemplate<String, Object> redisTemplate;
    
    @Autowired
    private UserMapper userMapper;
    
    // In-process Caffeine cache as a second cache level
    private final Cache<String, User> localCache = Caffeine.newBuilder()
        .maximumSize(1000)
        .expireAfterWrite(5, TimeUnit.MINUTES)
        .build();
    
    // Guava rate limiter as a last line of defense
    private final RateLimiter rateLimiter = RateLimiter.create(100); // 100 permits per second
    
    public User getUserWithMultiLevelCache(Long userId) {
        String key = "user:" + userId;
        
        // 1. Check the local cache
        User user = localCache.getIfPresent(key);
        if (user != null) {
            return user;
        }
        
        // 2. Check the Redis cache
        Object cached = redisTemplate.opsForValue().get(key);
        if (cached != null) {
            user = JSON.parseObject(cached.toString(), User.class);
            localCache.put(key, user);
            return user;
        }
        
        // 3. Rate-limit check
        if (!rateLimiter.tryAcquire()) {
            // When throttled, fall back to a default value (or throw)
            return getDefaultUser();
        }
        
        // 4. Query the database
        user = userMapper.selectById(userId);
        if (user != null) {
            // 5. Rebuild both cache levels with a randomized TTL
            int baseExpire = 30; // base TTL: 30 minutes
            int randomExpire = baseExpire + new Random().nextInt(10); // plus 0-10 random minutes
            
            redisTemplate.opsForValue().set(key, JSON.toJSONString(user), randomExpire, TimeUnit.MINUTES);
            localCache.put(key, user);
        }
        
        return user;
    }
    
    private User getDefaultUser() {
        // Return a default user (or other fallback data)
        return new User(-1L, "default user", "default@example.com");
    }
    
    // Warm up the cache on startup
    @EventListener(ApplicationReadyEvent.class)
    public void warmUpCache() {
        CompletableFuture.runAsync(() -> {
            List<Long> hotUserIds = getHotUserIds();
            for (Long userId : hotUserIds) {
                try {
                    getUserWithMultiLevelCache(userId);
                    Thread.sleep(10); // throttle the warm-up so it does not spike the load
                } catch (Exception e) {
                    log.error("预热缓存失败: userId={}", userId, e);
                }
            }
        });
    }
    
    private List<Long> getHotUserIds() {
        // Return the list of hot user ids (hard-coded here for illustration)
        return Arrays.asList(1L, 2L, 3L, 4L, 5L);
    }
}

Distributed Lock Implementation

A complete Redis distributed lock:

@Component
public class RedisDistributedLock {
    
    @Autowired
    private RedisTemplate<String, Object> redisTemplate;
    
    private static final String LOCK_PREFIX = "distributed_lock:";
    private static final String UNLOCK_SCRIPT = 
        "if redis.call('get', KEYS[1]) == ARGV[1] then " +
        "return redis.call('del', KEYS[1]) " +
        "else return 0 end";
    
    /**
     * Try to acquire the lock.
     * @param lockKey    lock key
     * @param requestId  request id identifying the lock holder
     * @param expireTime lock TTL in seconds
     * @return whether the lock was acquired
     */
    public boolean tryLock(String lockKey, String requestId, int expireTime) {
        String key = LOCK_PREFIX + lockKey;
        
        // SET key value EX expireTime NX; returns a Boolean in Spring Data Redis
        Boolean result = redisTemplate.execute((RedisCallback<Boolean>) connection ->
            connection.set(key.getBytes(), requestId.getBytes(),
                Expiration.seconds(expireTime), RedisStringCommands.SetOption.SET_IF_ABSENT));
        
        return Boolean.TRUE.equals(result);
    }
    
    /**
     * Release the lock (only if this requestId still holds it).
     * @param lockKey   lock key
     * @param requestId request id
     * @return whether the lock was released
     */
    public boolean releaseLock(String lockKey, String requestId) {
        String key = LOCK_PREFIX + lockKey;
        
        Long result = redisTemplate.execute(new DefaultRedisScript<>(UNLOCK_SCRIPT, Long.class), 
            Collections.singletonList(key), requestId);
        
        return Long.valueOf(1).equals(result);
    }
    
    /**
     * Acquire the lock with retries.
     * @param lockKey       lock key
     * @param requestId     request id
     * @param expireTime    lock TTL in seconds
     * @param retryTimes    number of attempts
     * @param retryInterval delay between attempts in milliseconds
     * @return whether the lock was acquired
     */
    public boolean tryLockWithRetry(String lockKey, String requestId, int expireTime, 
                                   int retryTimes, long retryInterval) {
        for (int i = 0; i < retryTimes; i++) {
            if (tryLock(lockKey, requestId, expireTime)) {
                return true;
            }
            
            if (i < retryTimes - 1) {
                try {
                    Thread.sleep(retryInterval);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return false;
                }
            }
        }
        
        return false;
    }
}

// Usage example
@Service
public class OrderService {
    
    @Autowired
    private RedisDistributedLock distributedLock;
    
    public boolean processOrder(String orderNo) {
        String lockKey = "order:" + orderNo;
        String requestId = UUID.randomUUID().toString();
        
        // Try to acquire the lock: up to 3 attempts, 100 ms apart, 30-second TTL
        boolean lockAcquired = distributedLock.tryLockWithRetry(lockKey, requestId, 30, 3, 100);
        
        if (lockAcquired) {
            try {
                // Run the business logic
                return doProcessOrder(orderNo);
            } finally {
                // Release the lock
                distributedLock.releaseLock(lockKey, requestId);
            }
        } else {
            throw new BusinessException("Order is already being processed, please retry later");
        }
    }
    
    private boolean doProcessOrder(String orderNo) {
        // Actual order-processing logic goes here
        return true;
    }
}

💡 A Real Alibaba Question: Paginating a Table with Tens of Millions of Rows

Interview scenario:

面试官:"有一张千万级的订单表,如何优化深分页查询?比如查询第100万页的数据。"

Optimization options:

-- Problem SQL: traditional LIMIT/OFFSET pagination (terrible at this depth)
SELECT * FROM order_info 
ORDER BY create_time DESC 
LIMIT 1000000, 20;  -- scans and discards 1,000,000 rows just to return 20

-- Option 1: keyset (cursor) pagination (recommended)
-- First page
SELECT * FROM order_info 
WHERE create_time <= '2024-01-01 23:59:59'
ORDER BY create_time DESC, id DESC 
LIMIT 20;

-- Next page (use the time and id of the last row on the previous page as the cursor)
SELECT * FROM order_info 
WHERE (create_time < '2024-01-01 20:30:15' 
       OR (create_time = '2024-01-01 20:30:15' AND id < 12345))
ORDER BY create_time DESC, id DESC 
LIMIT 20;

-- Option 2: deferred join via a subquery (for when OFFSET cannot be avoided)
SELECT o.* FROM order_info o
INNER JOIN (
    SELECT id FROM order_info 
    ORDER BY create_time DESC 
    LIMIT 1000000, 20
) t ON o.id = t.id
ORDER BY o.create_time DESC;

-- Option 3: two-step range lookup
-- (note: only valid when id grows monotonically with create_time,
--  e.g. auto-increment ids written in time order)
-- First find the id range for the page
SELECT MIN(id) as min_id, MAX(id) as max_id 
FROM (
    SELECT id FROM order_info 
    ORDER BY create_time DESC 
    LIMIT 1000000, 20
) t;

-- Then fetch the full rows by id range (min_id/max_id are the values from above)
SELECT * FROM order_info 
WHERE id BETWEEN min_id AND max_id 
ORDER BY create_time DESC;

Cursor pagination in Java:

@Service
public class OrderPaginationService {
    
    @Autowired
    private OrderMapper orderMapper;
    
    // Cursor-based page query
    public PageResult<Order> getOrdersByCursor(CursorPageRequest request) {
        List<Order> orders;
        
        if (request.isFirstPage()) {
            // First page
            orders = orderMapper.selectFirstPage(request.getPageSize());
        } else {
            // Subsequent pages, keyed off the cursor
            orders = orderMapper.selectNextPage(
                request.getCursorTime(), 
                request.getCursorId(), 
                request.getPageSize()
            );
        }
        
        // Build the cursor for the next page
        String nextCursor = null;
        if (!orders.isEmpty() && orders.size() == request.getPageSize()) {
            Order lastOrder = orders.get(orders.size() - 1);
            nextCursor = buildCursor(lastOrder.getCreateTime(), lastOrder.getId());
        }
        
        return new PageResult<>(orders, nextCursor, orders.size() == request.getPageSize());
    }
    
    private String buildCursor(LocalDateTime createTime, Long id) {
        // Encode the time and id into an opaque cursor string
        String timeStr = createTime.format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS"));
        return Base64.getEncoder().encodeToString((timeStr + "," + id).getBytes());
    }
    
    // CursorInfo is a simple value holder (createTime + id)
    private CursorInfo parseCursor(String cursor) {
        try {
            String decoded = new String(Base64.getDecoder().decode(cursor));
            String[] parts = decoded.split(",");
            LocalDateTime createTime = LocalDateTime.parse(parts[0], 
                DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS"));
            Long id = Long.parseLong(parts[1]);
            return new CursorInfo(createTime, id);
        } catch (Exception e) {
            throw new IllegalArgumentException("Invalid cursor: " + cursor);
        }
    }
}
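
The mapper methods referenced above are assumed; a MyBatis sketch might look like this (annotation style, with keyset predicates matching the SQL shown earlier):

@Mapper
public interface OrderMapper {
    
    @Select("SELECT * FROM order_info ORDER BY create_time DESC, id DESC LIMIT #{pageSize}")
    List<Order> selectFirstPage(@Param("pageSize") int pageSize);
    
    @Select("SELECT * FROM order_info " +
            "WHERE create_time < #{cursorTime} " +
            "   OR (create_time = #{cursorTime} AND id < #{cursorId}) " +
            "ORDER BY create_time DESC, id DESC LIMIT #{pageSize}")
    List<Order> selectNextPage(@Param("cursorTime") LocalDateTime cursorTime,
                               @Param("cursorId") Long cursorId,
                               @Param("pageSize") int pageSize);
}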

// Pagination request object (parses its own cursor, since the service's
// parseCursor helper is private to that class)
public class CursorPageRequest {
    private String cursor;
    private int pageSize = 20;
    
    public boolean isFirstPage() {
        return cursor == null || cursor.isEmpty();
    }
    
    public int getPageSize() {
        return pageSize;
    }
    
    // Decodes the Base64 cursor back into "yyyy-MM-dd HH:mm:ss.SSS,<id>"
    private String[] decodeCursor() {
        return new String(Base64.getDecoder().decode(cursor)).split(",");
    }
    
    public LocalDateTime getCursorTime() {
        if (isFirstPage()) return null;
        return LocalDateTime.parse(decodeCursor()[0],
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS"));
    }
    
    public Long getCursorId() {
        if (isFirstPage()) return null;
        return Long.parseLong(decodeCursor()[1]);
    }
}

// Pagination result object
public class PageResult<T> {
    private List<T> data;
    private String nextCursor;
    private boolean hasNext;
    
    // constructor, getters, setters
}

Summary

Databases and caches are core components of any backend system, and knowing how to tune them is essential for a Java developer. In interviews, this material is examined along these lines:

Key points recap:

  1. MySQL optimization: index design, execution plan analysis, slow-query tuning, master-slave replication
  2. Redis in practice: choosing data structures, high-availability options, persistence mechanisms
  3. Cache design: penetration/breakdown/avalanche solutions, distributed lock implementation
  4. Performance tuning: pagination optimization, batch operations, connection pool configuration

Interview tips:

  • Understand the underlying principles deeply; don't stop at the usage level
  • Tie your answers to real project experience to show problem-solving ability
  • Master performance-tuning techniques to demonstrate technical depth
  • Keep up with newer developments, such as MySQL 8.0 features and Redis 6.0 capabilities

Core takeaways from this chapter:

  • ✅ MySQL index internals and optimization strategies
  • ✅ The underlying implementations of Redis data structures
  • ✅ High-availability design and configuration
  • ✅ Solutions to the common caching failure modes
  • ✅ Distributed locks and pagination optimization in practice

Next chapter preview: Message Queues and Distributed Systems - RabbitMQ, Kafka, distributed transactions, and other core topics
