Java 秋招面试之数据库与缓存话题
第8章 数据库与缓存
面试重要程度:⭐⭐⭐⭐⭐
常见提问方式:MySQL索引优化、Redis数据结构、缓存穿透
预计阅读时间:50分钟
开场白
兄弟,数据库和缓存绝对是后端面试的重头戏!我敢说,95%的Java面试都会深入考察MySQL和Redis。这不仅是技术基础,更能体现你对系统性能优化的理解深度。
今天我们就把数据库优化和缓存设计的核心知识点彻底搞透,让你在面试中展现出扎实的数据存储功底。
🗄️ 8.1 MySQL性能优化
索引设计原则与B+树结构
面试必问:
面试官:"说说MySQL的索引原理,为什么使用B+树而不是B树或红黑树?"
B+树结构优势:
-- B+树的特点分析 -- 1. 所有数据都存储在叶子节点 -- 2. 叶子节点之间有指针连接,便于范围查询 -- 3. 非叶子节点只存储键值,不存储数据,可以存储更多索引项 -- 4. 树的高度更低,减少磁盘IO次数 -- 示例:用户表索引设计 CREATE TABLE user ( id BIGINT PRIMARY KEY AUTO_INCREMENT, -- 主键索引(聚簇索引) username VARCHAR(50) NOT NULL, -- 普通索引 email VARCHAR(100) NOT NULL, -- 唯一索引 age INT, -- 普通索引 city VARCHAR(50), -- 组合索引的一部分 create_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP, -- 索引创建 UNIQUE KEY uk_email (email), -- 唯一索引 KEY idx_username (username), -- 单列索引 KEY idx_age (age), -- 单列索引 KEY idx_city_age (city, age), -- 组合索引 KEY idx_create_time (create_time) -- 时间索引 );
索引设计最佳实践:
-- 1. 最左前缀原则 -- 组合索引 idx_city_age (city, age) 可以支持以下查询: SELECT * FROM user WHERE city = '北京'; -- ✅ 使用索引 SELECT * FROM user WHERE city = '北京' AND age = 25; -- ✅ 使用索引 SELECT * FROM user WHERE age = 25; -- ❌ 不使用索引 -- 2. 覆盖索引优化 -- 创建覆盖索引,避免回表查询 CREATE INDEX idx_username_email ON user(username, email); -- 这个查询只需要访问索引,不需要回表 SELECT username, email FROM user WHERE username = 'zhangsan'; -- 3. 前缀索引 -- 对于长字符串字段,使用前缀索引节省空间 CREATE INDEX idx_email_prefix ON user(email(10)); -- 4. 函数索引(MySQL 8.0+) -- 为经常使用函数的查询创建函数索引 CREATE INDEX idx_upper_username ON user((UPPER(username))); SELECT * FROM user WHERE UPPER(username) = 'ZHANGSAN';
执行计划分析
面试重点:
面试官:"如何分析SQL的执行计划?各个字段代表什么意思?"
EXPLAIN详解:
-- 分析复杂查询的执行计划 EXPLAIN SELECT u.username, u.email, o.order_no, o.total_amount FROM user u INNER JOIN order_info o ON u.id = o.user_id WHERE u.city = '北京' AND u.age BETWEEN 20 AND 30 AND o.create_time >= '2024-01-01' ORDER BY o.create_time DESC LIMIT 10; -- 执行计划字段解析: /* +----+-------------+-------+------------+-------+------------------+---------+---------+-------+------+----------+-------------+ | id | select_type | table | partitions | type | possible_keys | key | key_len | ref | rows | filtered | Extra | +----+-------------+-------+------------+-------+------------------+---------+---------+-------+------+----------+-------------+ | 1 | SIMPLE | u | NULL | range | idx_city_age | idx_... | 158 | NULL | 1000 | 100.00 | Using where | | 1 | SIMPLE | o | NULL | ref | idx_user_id_time | idx_... | 8 | u.id | 10 | 33.33 | Using where | +----+-------------+-------+------------+-------+------------------+---------+---------+-------+------+----------+-------------+ 字段含义: - id: 查询序列号,数字越大越先执行 - select_type: 查询类型(SIMPLE、PRIMARY、SUBQUERY等) - table: 表名 - type: 访问类型(system > const > eq_ref > ref > range > index > ALL) - possible_keys: 可能使用的索引 - key: 实际使用的索引 - key_len: 索引长度 - ref: 索引的哪一列被使用了 - rows: 预估扫描的行数 - filtered: 过滤的百分比 - Extra: 额外信息 */
性能优化案例:
-- 慢查询优化实例 -- 原始慢查询(耗时3秒) SELECT * FROM order_info WHERE status = 1 AND create_time >= '2024-01-01' AND total_amount > 100 ORDER BY create_time DESC LIMIT 20; -- 问题分析: -- 1. 没有合适的索引 -- 2. 全表扫描 -- 3. 文件排序 -- 优化方案1:创建组合索引 CREATE INDEX idx_status_time_amount ON order_info(status, create_time, total_amount); -- 优化方案2:调整查询条件顺序 SELECT * FROM order_info WHERE status = 1 AND create_time >= '2024-01-01' AND total_amount > 100 ORDER BY create_time DESC LIMIT 20; -- 优化后执行计划: -- type: range(范围扫描) -- key: idx_status_time_amount -- rows: 100(从原来的10万行降到100行) -- Extra: Using index condition(使用索引条件下推)
慢查询优化
慢查询监控配置:
-- 开启慢查询日志 SET GLOBAL slow_query_log = 'ON'; SET GLOBAL long_query_time = 1; -- 超过1秒的查询记录到慢查询日志 SET GLOBAL log_queries_not_using_indexes = 'ON'; -- 记录未使用索引的查询 -- 查看慢查询配置 SHOW VARIABLES LIKE '%slow_query%'; SHOW VARIABLES LIKE 'long_query_time';
常见慢查询优化技巧:
-- 1. 避免SELECT * -- ❌ 不好的写法 SELECT * FROM user WHERE username = 'zhangsan'; -- ✅ 好的写法 SELECT id, username, email FROM user WHERE username = 'zhangsan'; -- 2. 使用LIMIT优化大结果集 -- ❌ 不好的写法 SELECT * FROM order_info ORDER BY create_time DESC; -- ✅ 好的写法 SELECT * FROM order_info ORDER BY create_time DESC LIMIT 20; -- 3. 优化深分页查询 -- ❌ 传统分页(OFFSET很大时性能差) SELECT * FROM order_info ORDER BY id LIMIT 100000, 20; -- ✅ 使用游标分页 SELECT * FROM order_info WHERE id > 100000 ORDER BY id LIMIT 20; -- 4. 避免在WHERE子句中使用函数 -- ❌ 不好的写法 SELECT * FROM user WHERE YEAR(create_time) = 2024; -- ✅ 好的写法 SELECT * FROM user WHERE create_time >= '2024-01-01' AND create_time < '2025-01-01'; -- 5. 使用EXISTS替代IN -- ❌ 性能较差(子查询结果集很大时) SELECT * FROM user WHERE id IN (SELECT user_id FROM order_info WHERE status = 1); -- ✅ 性能更好 SELECT * FROM user u WHERE EXISTS (SELECT 1 FROM order_info o WHERE o.user_id = u.id AND o.status = 1);
主从复制与读写分离
主从复制原理:
-- 主库配置(my.cnf) [mysqld] server-id = 1 log-bin = mysql-bin binlog-format = ROW sync_binlog = 1 innodb_flush_log_at_trx_commit = 1 -- 从库配置(my.cnf) [mysqld] server-id = 2 relay-log = mysql-relay-bin read_only = 1 super_read_only = 1 -- 主库创建复制用户 CREATE USER 'repl'@'%' IDENTIFIED BY 'password'; GRANT REPLICATION SLAVE ON *.* TO 'repl'@'%'; -- 从库配置主从关系 CHANGE MASTER TO MASTER_HOST='192.168.1.100', MASTER_USER='repl', MASTER_PASSWORD='password', MASTER_LOG_FILE='mysql-bin.000001', MASTER_LOG_POS=154; START SLAVE; SHOW SLAVE STATUS\G
读写分离实现:
// 数据源配置 @Configuration public class DataSourceConfig { @Bean @Primary public DataSource masterDataSource() { HikariDataSource dataSource = new HikariDataSource(); dataSource.setJdbcUrl("jdbc:mysql://master:3306/test"); dataSource.setUsername("root"); dataSource.setPassword("password"); dataSource.setMaximumPoolSize(20); return dataSource; } @Bean public DataSource slaveDataSource() { HikariDataSource dataSource = new HikariDataSource(); dataSource.setJdbcUrl("jdbc:mysql://slave:3306/test"); dataSource.setUsername("root"); dataSource.setPassword("password"); dataSource.setMaximumPoolSize(20); return dataSource; } @Bean public DataSource routingDataSource() { Map<Object, Object> dataSourceMap = new HashMap<>(); dataSourceMap.put("master", masterDataSource()); dataSourceMap.put("slave", slaveDataSource()); DynamicRoutingDataSource routingDataSource = new DynamicRoutingDataSource(); routingDataSource.setTargetDataSources(dataSourceMap); routingDataSource.setDefaultTargetDataSource(masterDataSource()); return routingDataSource; } } // 动态数据源路由 public class DynamicRoutingDataSource extends AbstractRoutingDataSource { @Override protected Object determineCurrentLookupKey() { return DataSourceContextHolder.getDataSourceType(); } } // 数据源上下文 public class DataSourceContextHolder { private static final ThreadLocal<String> contextHolder = new ThreadLocal<>(); public static void setDataSourceType(String dataSourceType) { contextHolder.set(dataSourceType); } public static String getDataSourceType() { return contextHolder.get(); } public static void clearDataSourceType() { contextHolder.remove(); } } // 读写分离注解 @Target({ElementType.METHOD, ElementType.TYPE}) @Retention(RetentionPolicy.RUNTIME) public @interface ReadOnly { } // AOP切面实现读写分离 @Aspect @Component public class DataSourceAspect { @Before("@annotation(readOnly)") public void setReadDataSourceType(ReadOnly readOnly) { DataSourceContextHolder.setDataSourceType("slave"); } @Before("execution(* com.example.service.*.select*(..)) || " + "execution(* com.example.service.*.find*(..)) || " + "execution(* com.example.service.*.get*(..))") public void setReadDataSourceTypeForQuery() { if (DataSourceContextHolder.getDataSourceType() == null) { DataSourceContextHolder.setDataSourceType("slave"); } } @After("@annotation(readOnly) || " + "execution(* com.example.service.*.select*(..)) || " + "execution(* com.example.service.*.find*(..)) || " + "execution(* com.example.service.*.get*(..))") public void clearDataSourceType() { DataSourceContextHolder.clearDataSourceType(); } }
🚀 8.2 Redis核心数据结构
String、Hash、List、Set、ZSet底层实现
面试高频:
面试官:"Redis的五种数据结构底层是如何实现的?各自的使用场景是什么?"
String类型:
// String的底层实现:SDS(Simple Dynamic String) // 相比C字符串的优势: // 1. O(1)时间复杂度获取字符串长度 // 2. 杜绝缓冲区溢出 // 3. 减少修改字符串时带来的内存重分配次数 // 4. 二进制安全 // 使用场景和命令 @Service public class RedisStringService { @Autowired private StringRedisTemplate redisTemplate; // 1. 缓存对象 public void cacheUser(User user) { String key = "user:" + user.getId(); String value = JSON.toJSONString(user); redisTemplate.opsForValue().set(key, value, 30, TimeUnit.MINUTES); } // 2. 计数器 public Long incrementViewCount(String articleId) { String key = "article:view:" + articleId; return redisTemplate.opsForValue().increment(key); } // 3. 分布式锁 public boolean tryLock(String lockKey, String requestId, int expireTime) { String result = redisTemplate.execute((RedisCallback<String>) connection -> { return connection.set(lockKey.getBytes(), requestId.getBytes(), Expiration.seconds(expireTime), RedisStringCommands.SetOption.SET_IF_ABSENT); }); return "OK".equals(result); } // 4. 限流器(令牌桶) public boolean isAllowed(String userId, int limit, int window) { String key = "rate_limit:" + userId + ":" + (System.currentTimeMillis() / window); Long count = redisTemplate.opsForValue().increment(key); if (count == 1) { redisTemplate.expire(key, window, TimeUnit.SECONDS); } return count <= limit; } }
Hash类型:
// Hash的底层实现: // 1. 压缩列表(ziplist):当元素数量少且值较小时使用 // 2. 哈希表(hashtable):当元素数量多或值较大时使用 @Service public class RedisHashService { @Autowired private RedisTemplate<String, Object> redisTemplate; // 1. 存储对象属性 public void saveUserInfo(Long userId, Map<String, String> userInfo) { String key = "user:info:" + userId; redisTemplate.opsForHash().putAll(key, userInfo); redisTemplate.expire(key, 1, TimeUnit.HOURS); } // 2. 购物车实现 public void addToCart(Long userId, Long productId, Integer quantity) { String key = "cart:" + userId; redisTemplate.opsForHash().put(key, productId.toString(), quantity.toString()); } public Map<Object, Object> getCart(Long userId) { String key = "cart:" + userId; return redisTemplate.opsForHash().entries(key); } // 3. 统计信息 public void updateStatistics(String date, String metric, Long value) { String key = "stats:" + date; redisTemplate.opsForHash().increment(key, metric, value); } }
List类型:
// List的底层实现: // 1. 压缩列表(ziplist):元素数量少且值较小时 // 2. 双向链表(linkedlist):元素数量多或值较大时 // 3. 快速列表(quicklist):Redis 3.2+,结合了ziplist和linkedlist的优点 @Service public class RedisListService { @Autowired private RedisTemplate<String, Object> redisTemplate; // 1. 消息队列 public void sendMessage(String queue, String message) { redisTemplate.opsForList().leftPush(queue, message); } public String receiveMessage(String queue) { return (String) redisTemplate.opsForList().rightPop(queue); } // 2. 最新动态列表 public void addActivity(Long userId, String activity) { String key = "user:activities:" + userId; redisTemplate.opsForList().leftPush(key, activity); // 只保留最新的100条 redisTemplate.opsForList().trim(key, 0, 99); } // 3. 阻塞队列 public String blockingReceive(String queue, int timeout) { List<Object> result = redisTemplate.opsForList().rightPop(queue, timeout, TimeUnit.SECONDS); return result != null && !result.isEmpty() ? (String) result.get(1) : null; } }
Set类型:
// Set的底层实现: // 1. 整数集合(intset):当所有元素都是整数且数量较少时 // 2. 哈希表(hashtable):其他情况 @Service public class RedisSetService { @Autowired private RedisTemplate<String, Object> redisTemplate; // 1. 标签系统 public void addUserTags(Long userId, String... tags) { String key = "user:tags:" + userId; redisTemplate.opsForSet().add(key, (Object[]) tags); } public Set<Object> getUserTags(Long userId) { String key = "user:tags:" + userId; return redisTemplate.opsForSet().members(key); } // 2. 共同关注 public Set<Object> getCommonFollows(Long userId1, Long userId2) { String key1 = "user:follows:" + userId1; String key2 = "user:follows:" + userId2; return redisTemplate.opsForSet().intersect(key1, key2); } // 3. 去重统计 public void addUniqueVisitor(String date, Long userId) { String key = "unique:visitors:" + date; redisTemplate.opsForSet().add(key, userId); } public Long getUniqueVisitorCount(String date) { String key = "unique:visitors:" + date; return redisTemplate.opsForSet().size(key); } // 4. 抽奖系统 public Object randomDraw(String activityId) { String key = "lottery:" + activityId; return redisTemplate.opsForSet().randomMember(key); } }
ZSet类型:
// ZSet的底层实现: // 1. 压缩列表(ziplist):元素数量少时 // 2. 跳跃表(skiplist)+ 哈希表:元素数量多时 @Service public class RedisZSetService { @Autowired private RedisTemplate<String, Object> redisTemplate; // 1. 排行榜 public void updateScore(String leaderboard, String player, double score) { redisTemplate.opsForZSet().add(leaderboard, player, score); } public Set<ZSetOperations.TypedTuple<Object>> getTopPlayers(String leaderboard, int count) { return redisTemplate.opsForZSet().reverseRangeWithScores(leaderboard, 0, count - 1); } public Long getPlayerRank(String leaderboard, String player) { return redisTemplate.opsForZSet().reverseRank(leaderboard, player); } // 2. 延时队列 public void addDelayedTask(String queue, String task, long delaySeconds) { long executeTime = System.currentTimeMillis() + delaySeconds * 1000; redisTemplate.opsForZSet().add(queue, task, executeTime); } public Set<Object> getReadyTasks(String queue) { long now = System.currentTimeMillis(); return redisTemplate.opsForZSet().rangeByScore(queue, 0, now); } // 3. 时间窗口统计 public void recordEvent(String event, String identifier) { String key = "events:" + event; long timestamp = System.currentTimeMillis(); redisTemplate.opsForZSet().add(key, identifier, timestamp); // 清理1小时前的数据 long oneHourAgo = timestamp - 3600 * 1000; redisTemplate.opsForZSet().removeRangeByScore(key, 0, oneHourAgo); } public Long countEventsInWindow(String event, long windowSeconds) { String key = "events:" + event; long now = System.currentTimeMillis(); long windowStart = now - windowSeconds * 1000; return redisTemplate.opsForZSet().count(key, windowStart, now); } }
跳跃表与压缩列表
跳跃表原理:
// 跳跃表的实现原理(简化版) public class SkipList { private static final int MAX_LEVEL = 16; private static final double P = 0.5; private Node header; private int level; static class Node { String key; double score; Node[] forward; Node(int level) { forward = new Node[level + 1]; } } public SkipList() { header = new Node(MAX_LEVEL); level = 0; } // 随机生成层数 private int randomLevel() { int level = 0; while (Math.random() < P && level < MAX_LEVEL) { level++; } return level; } // 插入节点 public void insert(String key, double score) { Node[] update = new Node[MAX_LEVEL + 1]; Node current = header; // 从最高层开始查找插入位置 for (int i = level; i >= 0; i--) { while (current.forward[i] != null && current.forward[i].score < score) { current = current.forward[i]; } update[i] = current; } // 创建新节点 int newLevel = randomLevel(); if (newLevel > level) { for (int i = level + 1; i <= newLevel; i++) { update[i] = header; } level = newLevel; } Node newNode = new Node(newLevel); newNode.key = key; newNode.score = score; // 更新指针 for (int i = 0; i <= newLevel; i++) { newNode.forward[i] = update[i].forward[i]; update[i].forward[i] = newNode; } } // 查找节点 public Node search(double score) { Node current = header; for (int i = level; i >= 0; i--) { while (current.forward[i] != null && current.forward[i].score < score) { current = current.forward[i]; } } current = current.forward[0]; return (current != null && current.score == score) ? current : null; } }
🔄 8.3 Redis高可用方案
主从复制、哨兵模式、集群模式
面试重点:
面试官:"Redis的高可用方案有哪些?各自的优缺点是什么?"
主从复制配置:
# 主节点配置(redis.conf) bind 0.0.0.0 port 6379 requirepass masterpassword # 从节点配置(redis.conf) bind 0.0.0.0 port 6380 slaveof 192.168.1.100 6379 masterauth masterpassword slave-read-only yes
哨兵模式配置:
# sentinel.conf port 26379 sentinel monitor mymaster 192.168.1.100 6379 2 sentinel auth-pass mymaster masterpassword sentinel down-after-milliseconds mymaster 5000 sentinel failover-timeout mymaster 10000 sentinel parallel-syncs mymaster 1 # 启动哨兵 redis-sentinel sentinel.conf
Java客户端哨兵配置:
@Configuration public class RedisSentinelConfig { @Bean public LettuceConnectionFactory redisConnectionFactory() { // 哨兵配置 RedisSentinelConfiguration sentinelConfig = new RedisSentinelConfiguration() .master("mymaster") .sentinel("192.168.1.101", 26379) .sentinel("192.168.1.102", 26379) .sentinel("192.168.1.103", 26379); sentinelConfig.setPassword("masterpassword"); // 连接池配置 GenericObjectPoolConfig<Object> poolConfig = new GenericObjectPoolConfig<>(); poolConfig.setMaxTotal(20); poolConfig.setMaxIdle(10); poolConfig.setMinIdle(5); poolConfig.setTestOnBorrow(true); LettucePoolingClientConfiguration clientConfig = LettucePoolingClientConfiguration.builder() .poolConfig(poolConfig) .build(); return new LettuceConnectionFactory(sentinelConfig, clientConfig); } @Bean public RedisTemplate<String, Object> redisTemplate() { RedisTemplate<String, Object> template = new RedisTemplate<>(); template.setConnectionFactory(redisConnectionFactory()); // 序列化配置 Jackson2JsonRedisSerializer<Object> serializer = new Jackson2JsonRedisSerializer<>(Object.class); template.setDefaultSerializer(serializer); template.setKeySerializer(new StringRedisSerializer()); template.setHashKeySerializer(new StringRedisSerializer()); return template; } }
Redis Cluster集群配置:
# 集群节点配置(redis.conf) port 7000 cluster-enabled yes cluster-config-file nodes-7000.conf cluster-node-timeout 5000 appendonly yes # 创建集群 redis-cli --cluster create \ 192.168.1.100:7000 192.168.1.100:7001 192.168.1.100:7002 \ 192.168.1.101:7000 192.168.1.101:7001 192.168.1.101:7002 \ --cluster-replicas 1
Java客户端集群配置:
@Configuration public class RedisClusterConfig { @Bean public LettuceConnectionFactory redisConnectionFactory() { // 集群节点配置 RedisClusterConfiguration clusterConfig = new RedisClusterConfiguration(); clusterConfig.clusterNode("192.168.1.100", 7000); clusterConfig.clusterNode("192.168.1.100", 7001); clusterConfig.clusterNode("192.168.1.100", 7002); clusterConfig.clusterNode("192.168.1.101", 7000); clusterConfig.clusterNode("192.168.1.101", 7001); clusterConfig.clusterNode("192.168.1.101", 7002); clusterConfig.setMaxRedirects(3); return new LettuceConnectionFactory(clusterConfig); } }
持久化机制(RDB vs AOF)
RDB持久化:
# RDB配置(redis.conf) save 900 1 # 900秒内至少1个key发生变化 save 300 10 # 300秒内至少10个key发生变化 save 60 10000 # 60秒内至少10000个key发生变化 stop-writes-on-bgsave-error yes rdbcompression yes rdbchecksum yes dbfilename dump.rdb dir /var/lib/redis
AOF持久化:
# AOF配置(redis.conf) appendonly yes appendfilename "appendonly.aof" # 同步策略 appendfsync everysec # 每秒同步(推荐) # appendfsync always # 每个写命令都同步(安全但慢) # appendfsync no # 由操作系统决定(快但不安全) # AOF重写 auto-aof-rewrite-percentage 100 auto-aof-rewrite-min-size 64mb
持久化对比:
// RDB vs AOF 对比分析 /* RDB优点: 1. 文件紧凑,适合备份 2. 恢复速度快 3. 对性能影响小 RDB缺点: 1. 可能丢失最后一次快照后的数据 2. fork子进程时可能阻塞 AOF优点: 1. 数据安全性高 2. 可读性好,可以手动修复 3. 支持重写压缩 AOF缺点: 1. 文件大,恢复慢 2. 对性能影响大 3. 可能出现bug导致无法恢复 混合持久化(Redis 4.0+): 结合RDB和AOF的优点,RDB做全量备份,AOF做增量备份 */ @Component public class RedisBackupService { @Autowired private RedisTemplate<String, Object> redisTemplate; // 手动触发RDB备份 public void createSnapshot() { redisTemplate.execute((RedisCallback<String>) connection -> { connection.bgSave(); return "OK"; }); } // 手动触发AOF重写 public void rewriteAOF() { redisTemplate.execute((RedisCallback<String>) connection -> { connection.bgReWriteAof(); return "OK"; }); } // 获取持久化信息 public Properties getPersistenceInfo() { return redisTemplate.execute((RedisCallback<Properties>) connection -> { return connection.info("persistence"); }); } }
🛡️ 8.4 缓存设计模式
缓存穿透、击穿、雪崩解决方案
面试高频:
面试官:"什么是缓存穿透、缓存击穿、缓存雪崩?如何解决?"
缓存穿透解决方案:
// 问题:查询不存在的数据,缓存和数据库都没有,导致每次都查询数据库 // 解决方案1:布隆过滤器 @Component public class BloomFilterService { private BloomFilter<String> bloomFilter; @PostConstruct public void init() { // 创建布隆过滤器,预期插入100万个元素,误判率0.01% bloomFilter = BloomFilter.create(Funnels.stringFunnel(Charset.defaultCharset()), 1000000, 0.0001); // 初始化时将所有存在的key加入布隆过滤器 initBloomFilter(); } private void initBloomFilter() { // 从数据库加载所有存在的key List<String> existingKeys = loadAllKeysFromDatabase(); for (String key : existingKeys) { bloomFilter.put(key); } } public boolean mightExist(String key) { return bloomFilter.mightContain(key); } public void addKey(String key) { bloomFilter.put(key); } } // 解决方案2:缓存空值 @Service public class UserCacheService { @Autowired private RedisTemplate<String, Object> redisTemplate; @Autowired private UserMapper userMapper; @Autowired private BloomFilterService bloomFilterService; public User getUserById(Long userId) { String key = "user:" + userId; // 1. 布隆过滤器预检查 if (!bloomFilterService.mightExist(key)) { return null; // 肯定不存在 } // 2. 查询缓存 Object cached = redisTemplate.opsForValue().get(key); if (cached != null) { if ("NULL".equals(cached)) { return null; // 缓存的空值 } return JSON.parseObject(cached.toString(), User.class); } // 3. 查询数据库 User user = userMapper.selectById(userId); if (user != null) { // 缓存真实数据 redisTemplate.opsForValue().set(key, JSON.toJSONString(user), 30, TimeUnit.MINUTES); bloomFilterService.addKey(key); } else { // 缓存空值,防止缓存穿透 redisTemplate.opsForValue().set(key, "NULL", 5, TimeUnit.MINUTES); } return user; } }
缓存击穿解决方案:
// 问题:热点key过期时,大量请求同时查询数据库 // 解决方案:分布式锁 + 双重检查 @Service public class HotKeyCacheService { @Autowired private RedisTemplate<String, Object> redisTemplate; @Autowired private UserMapper userMapper; public User getHotUser(Long userId) { String key = "hot:user:" + userId; String lockKey = "lock:" + key; // 1. 查询缓存 Object cached = redisTemplate.opsForValue().get(key); if (cached != null) { return JSON.parseObject(cached.toString(), User.class); } // 2. 获取分布式锁 String requestId = UUID.randomUUID().toString(); boolean lockAcquired = tryLock(lockKey, requestId, 10); if (lockAcquired) { try { // 3. 双重检查 cached = redisTemplate.opsForValue().get(key); if (cached != null) { return JSON.parseObject(cached.toString(), User.class); } // 4. 查询数据库 User user = userMapper.selectById(userId); if (user != null) { // 5. 更新缓存,设置随机过期时间防止雪崩 int randomExpire = 30 + new Random().nextInt(10); // 30-40分钟 redisTemplate.opsForValue().set(key, JSON.toJSONString(user), randomExpire, TimeUnit.MINUTES); } return user; } finally { // 6. 释放锁 releaseLock(lockKey, requestId); } } else { // 7. 获取锁失败,等待一段时间后重试 try { Thread.sleep(100); return getHotUser(userId); // 递归重试 } catch (InterruptedException e) { Thread.currentThread().interrupt(); return null; } } } private boolean tryLock(String lockKey, String requestId, int expireTime) { String result = redisTemplate.execute((RedisCallback<String>) connection -> { return connection.set(lockKey.getBytes(), requestId.getBytes(), Expiration.seconds(expireTime), RedisStringCommands.SetOption.SET_IF_ABSENT); }); return "OK".equals(result); } private void releaseLock(String lockKey, String requestId) { String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end"; redisTemplate.execute(new DefaultRedisScript<>(script, Long.class), Collections.singletonList(lockKey), requestId); } }
缓存雪崩解决方案:
// 问题:大量key同时过期,导致请求都打到数据库 // 解决方案:随机过期时间 + 多级缓存 + 限流降级 @Service public class CacheAvalancheService { @Autowired private RedisTemplate<String, Object> redisTemplate; @Autowired private UserMapper userMapper; // 本地缓存作为二级缓存 private final Cache<String, User> localCache = Caffeine.newBuilder() .maximumSize(1000) .expireAfterWrite(5, TimeUnit.MINUTES) .build(); // 限流器 private final RateLimiter rateLimiter = RateLimiter.create(100); // 每秒100个请求 public User getUserWithMultiLevelCache(Long userId) { String key = "user:" + userId; // 1. 查询本地缓存 User user = localCache.getIfPresent(key); if (user != null) { return user; } // 2. 查询Redis缓存 Object cached = redisTemplate.opsForValue().get(key); if (cached != null) { user = JSON.parseObject(cached.toString(), User.class); localCache.put(key, user); return user; } // 3. 限流检查 if (!rateLimiter.tryAcquire()) { // 限流时返回默认值或抛出异常 return getDefaultUser(); } // 4. 查询数据库 user = userMapper.selectById(userId); if (user != null) { // 5. 更新缓存,设置随机过期时间 int baseExpire = 30; // 基础过期时间30分钟 int randomExpire = baseExpire + new Random().nextInt(10); // 随机增加0-10分钟 redisTemplate.opsForValue().set(key, JSON.toJSONString(user), randomExpire, TimeUnit.MINUTES); localCache.put(key, user); } return user; } private User getDefaultUser() { // 返回默认用户或缓存的兜底数据 return new User(-1L, "默认用户", "default@example.com"); } // 预热缓存 @EventListener(ApplicationReadyEvent.class) public void warmUpCache() { CompletableFuture.runAsync(() -> { List<Long> hotUserIds = getHotUserIds(); for (Long userId : hotUserIds) { try { getUserWithMultiLevelCache(userId); Thread.sleep(10); // 避免瞬间压力过大 } catch (Exception e) { log.error("预热缓存失败: userId={}", userId, e); } } }); } private List<Long> getHotUserIds() { // 获取热点用户ID列表 return Arrays.asList(1L, 2L, 3L, 4L, 5L); } }
分布式锁实现
Redis分布式锁完整实现:
@Component public class RedisDistributedLock { @Autowired private RedisTemplate<String, Object> redisTemplate; private static final String LOCK_PREFIX = "distributed_lock:"; private static final String UNLOCK_SCRIPT = "if redis.call('get', KEYS[1]) == ARGV[1] then " + "return redis.call('del', KEYS[1]) " + "else return 0 end"; /** * 尝试获取锁 * @param lockKey 锁的key * @param requestId 请求ID,用于标识锁的持有者 * @param expireTime 锁的过期时间(秒) * @return 是否获取成功 */ public boolean tryLock(String lockKey, String requestId, int expireTime) { String key = LOCK_PREFIX + lockKey; String result = redisTemplate.execute((RedisCallback<String>) connection -> { return connection.set(key.getBytes(), requestId.getBytes(), Expiration.seconds(expireTime), RedisStringCommands.SetOption.SET_IF_ABSENT); }); return "OK".equals(result); } /** * 释放锁 * @param lockKey 锁的key * @param requestId 请求ID * @return 是否释放成功 */ public boolean releaseLock(String lockKey, String requestId) { String key = LOCK_PREFIX + lockKey; Long result = redisTemplate.execute(new DefaultRedisScript<>(UNLOCK_SCRIPT, Long.class), Collections.singletonList(key), requestId); return Long.valueOf(1).equals(result); } /** * 带重试的获取锁 * @param lockKey 锁的key * @param requestId 请求ID * @param expireTime 锁的过期时间(秒) * @param retryTimes 重试次数 * @param retryInterval 重试间隔(毫秒) * @return 是否获取成功 */ public boolean tryLockWithRetry(String lockKey, String requestId, int expireTime, int retryTimes, long retryInterval) { for (int i = 0; i < retryTimes; i++) { if (tryLock(lockKey, requestId, expireTime)) { return true; } if (i < retryTimes - 1) { try { Thread.sleep(retryInterval); } catch (InterruptedException e) { Thread.currentThread().interrupt(); return false; } } } return false; } } // 使用示例 @Service public class OrderService { @Autowired private RedisDistributedLock distributedLock; public boolean processOrder(String orderNo) { String lockKey = "order:" + orderNo; String requestId = UUID.randomUUID().toString(); // 尝试获取锁,最多重试3次,每次间隔100ms boolean lockAcquired = distributedLock.tryLockWithRetry(lockKey, requestId, 30, 3, 100); if (lockAcquired) { try { // 执行业务逻辑 return doProcessOrder(orderNo); } finally { // 释放锁 distributedLock.releaseLock(lockKey, requestId); } } else { throw new BusinessException("订单正在处理中,请稍后重试"); } } private boolean doProcessOrder(String orderNo) { // 具体的订单处理逻辑 return true; } }
💡 阿里真题:千万级数据分页优化
面试场景:
面试官:"有一张千万级的订单表,如何优化深分页查询?比如查询第100万页的数据。"
优化方案:
-- 问题SQL:传统LIMIT分页(性能极差) SELECT * FROM order_info ORDER BY create_time DESC LIMIT 1000000, 20; -- 需要扫描100万+20行数据 -- 优化方案1:使用游标分页(推荐) -- 第一页 SELECT * FROM order_info WHERE create_time <= '2024-01-01 23:59:59' ORDER BY create_time DESC, id DESC LIMIT 20; -- 下一页(使用上一页最后一条记录的时间和ID作为游标) SELECT * FROM order_info WHERE (create_time < '2024-01-01 20:30:15' OR (create_time = '2024-01-01 20:30:15' AND id < 12345)) ORDER BY create_time DESC, id DESC LIMIT 20; -- 优化方案2:子查询优化(适用于必须使用OFFSET的场景) SELECT o.* FROM order_info o INNER JOIN ( SELECT id FROM order_info ORDER BY create_time DESC LIMIT 1000000, 20 ) t ON o.id = t.id ORDER BY o.create_time DESC; -- 优化方案3:分段查询 -- 先查询ID范围 SELECT MIN(id) as min_id, MAX(id) as max_id FROM ( SELECT id FROM order_info ORDER BY create_time DESC LIMIT 1000000, 20 ) t; -- 再根据ID范围查询详细数据 SELECT * FROM order_info WHERE id BETWEEN min_id AND max_id ORDER BY create_time DESC;
Java实现游标分页:
@Service public class OrderPaginationService { @Autowired private OrderMapper orderMapper; // 游标分页查询 public PageResult<Order> getOrdersByCursor(CursorPageRequest request) { List<Order> orders; if (request.isFirstPage()) { // 第一页查询 orders = orderMapper.selectFirstPage(request.getPageSize()); } else { // 后续页查询 orders = orderMapper.selectNextPage( request.getCursorTime(), request.getCursorId(), request.getPageSize() ); } // 构建下一页游标 String nextCursor = null; if (!orders.isEmpty() && orders.size() == request.getPageSize()) { Order lastOrder = orders.get(orders.size() - 1); nextCursor = buildCursor(lastOrder.getCreateTime(), lastOrder.getId()); } return new PageResult<>(orders, nextCursor, orders.size() == request.getPageSize()); } private String buildCursor(LocalDateTime createTime, Long id) { // 将时间和ID编码为游标字符串 String timeStr = createTime.format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS")); return Base64.getEncoder().encodeToString((timeStr + "," + id).getBytes()); } private CursorInfo parseCursor(String cursor) { try { String decoded = new String(Base64.getDecoder().decode(cursor)); String[] parts = decoded.split(","); LocalDateTime createTime = LocalDateTime.parse(parts[0], DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS")); Long id = Long.parseLong(parts[1]); return new CursorInfo(createTime, id); } catch (Exception e) { throw new IllegalArgumentException("Invalid cursor: " + cursor); } } } // 分页请求对象 public class CursorPageRequest { private String cursor; private int pageSize = 20; public boolean isFirstPage() { return cursor == null || cursor.isEmpty(); } public LocalDateTime getCursorTime() { if (isFirstPage()) return null; return parseCursor(cursor).getCreateTime(); } public Long getCursorId() { if (isFirstPage()) return null; return parseCursor(cursor).getId(); } } // 分页结果对象 public class PageResult<T> { private List<T> data; private String nextCursor; private boolean hasNext; // 构造器、getter、setter }
总结
数据库和缓存是后端系统的核心组件,掌握其优化技巧对于Java开发者至关重要。面试中这部分内容的考察重点:
核心要点回顾:
- MySQL优化:索引设计、执行计划分析、慢查询优化、主从复制
- Redis应用:数据结构选择、高可用方案、持久化机制
- 缓存设计:穿透/击穿/雪崩解决方案、分布式锁实现
- 性能优化:分页优化、批量操作、连接池配置
面试建议:
- 深入理解底层原理,不要只停留在使用层面
- 结合实际项目经验,展现解决问题的能力
- 掌握性能调优技巧,体现技术深度
- 了解最新技术发展,如MySQL 8.0新特性、Redis 6.0新功能
本章核心要点:
- ✅ MySQL索引原理和优化策略
- ✅ Redis数据结构底层实现
- ✅ 高可用方案设计和配置
- ✅ 缓存常见问题解决方案
- ✅ 分布式锁和分页优化实战
下一章预告: 消息队列与分布式 - RabbitMQ、Kafka、分布式事务等核心技术
#java面试##秋招笔面试记录##面试##java速通##秋招投递攻略#Java面试圣经 文章被收录于专栏
Java面试圣经