分布式心跳服务 - 缓存一致性问题与解决方案
问题分析
1. 数据不一致的场景
场景一:客户端切换服务器
时间轴:
T1: client-001 → Server A (本地缓存: 在线)
T2: 负载均衡切换
T3: client-001 → Server B (本地缓存: 无数据/离线)
T4: client-001 已不再向 Server A 发送心跳,但 Server A 的本地缓存仍然认为它在线 ❌
场景二:Redis 数据被手动清理
T1: Server A 本地缓存: client-001 在线
T2: Redis 中 client-001 数据被清理
T3: Server A 本地缓存仍然认为 client-001 在线 ❌
场景三:服务器时钟不同步
Server A 时钟: 2025-01-13 10:00:00
Server B 时钟: 2025-01-13 09:59:50 (慢10秒)
各实例使用各自的本地时钟计算 "当前时间 - 最后心跳时间",导致同一客户端在不同实例上的超时判断不一致 ❌
2. 数据不一致的后果
| 后果 | 严重程度 | 影响 |
|---|---|---|
| 误判客户端在线 | 🔴 高 | 向已离线客户端推送消息失败 |
| 误判客户端离线 | 🟡 中 | 重复发送上线通知,浪费资源 |
| 统计数据不准确 | 🟡 中 | 在线人数显示错误 |
| 重复事件发布 | 🟡 中 | Kafka 收到重复的上下线事件 |
| 清理任务冲突 | 🟢 低 | 多个实例重复清理同一客户端 |
解决方案
方案 1: 完全依赖 Redis (推荐 ⭐⭐⭐⭐⭐)
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Service;
import lombok.extern.slf4j.Slf4j;
/**
 * Heartbeat service that relies on Redis only, with no local cache.
 *
 * <p>Pros: every instance reads the same Redis ZSET, so there is no local-cache staleness.
 * Cons: every call is a Redis round trip (slightly slower, usually acceptable).
 *
 * <p>Note: timestamps come from this JVM's clock ({@code System.currentTimeMillis()}), so
 * clock skew between instances (scenario 3) still affects the timeout checks.
 */
@Slf4j
@Service
public class RedisOnlyHeartbeatService {

    private static final String HEARTBEAT_ZSET = "hb:clients";
    private static final long HEARTBEAT_TIMEOUT_MS = 30000;

    /**
     * Atomically records a heartbeat and classifies it.
     *
     * <p>KEYS[1] = heartbeat ZSET; ARGV[1] = clientId; ARGV[2] = current time (ms);
     * ARGV[3] = timeout (ms). Returns 1 for a new client, 2 for a reconnect after a timeout,
     * and 0 for a regular heartbeat.
     *
     * <p>Lines are joined with '\n' on purpose. The previous version joined them with spaces,
     * so the first Lua "--" comment ran to the end of the single-line script and disabled
     * everything after it: ZADD never ran and the script always returned nil.
     * Built once because the script text never changes.
     */
    private static final DefaultRedisScript<Long> HEARTBEAT_SCRIPT = new DefaultRedisScript<>(
            String.join("\n",
                    "local key = KEYS[1]",
                    "local clientId = ARGV[1]",
                    "local currentTime = tonumber(ARGV[2])",
                    "local timeout = tonumber(ARGV[3])",
                    "local oldScore = redis.call('ZSCORE', key, clientId)",
                    "redis.call('ZADD', key, currentTime, clientId)",
                    "if oldScore == false then",
                    "  return 1",
                    "elseif (currentTime - tonumber(oldScore)) > timeout then",
                    "  return 2",
                    "else",
                    "  return 0",
                    "end"),
            Long.class);

    private final StringRedisTemplate redisTemplate;
    private final HeartbeatEventPublisher eventPublisher;

    public RedisOnlyHeartbeatService(
            StringRedisTemplate redisTemplate,
            HeartbeatEventPublisher eventPublisher) {
        this.redisTemplate = redisTemplate;
        this.eventPublisher = eventPublisher;
    }

    /**
     * Records a heartbeat in Redis atomically and publishes an online event when the client
     * is new or is reconnecting after a timeout.
     *
     * @param clientId client identifier (ZSET member)
     * @param info     client details forwarded to the online event
     */
    public void receiveHeartbeat(String clientId, ClientInfo info) {
        long currentTime = System.currentTimeMillis();

        Long result = redisTemplate.execute(
                HEARTBEAT_SCRIPT,
                Collections.singletonList(HEARTBEAT_ZSET),
                clientId,
                String.valueOf(currentTime),
                String.valueOf(HEARTBEAT_TIMEOUT_MS));

        // 1 = new client, 2 = reconnect after timeout; both count as "came online".
        if (result != null && result > 0) {
            eventPublisher.publishOnlineEvent(clientId, info);
            log.info("客户端上线: {} (状态码: {})", clientId, result);
        }
    }

    /**
     * Returns whether the client's last heartbeat in Redis is within the timeout.
     *
     * @param clientId client identifier
     * @return {@code true} if a score exists and is at most {@code HEARTBEAT_TIMEOUT_MS} old
     */
    public boolean isOnline(String clientId) {
        Double score = redisTemplate.opsForZSet().score(HEARTBEAT_ZSET, clientId);
        if (score == null) {
            return false;
        }
        long elapsed = System.currentTimeMillis() - score.longValue();
        return elapsed <= HEARTBEAT_TIMEOUT_MS;
    }

    /**
     * Counts clients whose last heartbeat is within the timeout.
     *
     * <p>The upper bound is +infinity rather than local "now". A heartbeat written by an
     * instance whose clock is ahead of this one has a score in this instance's future.
     * {@link #isOnline} already treats such clients as online, so the count must include
     * them too.
     *
     * @return number of online clients, or {@code null} if the Redis call returns none
     */
    public Long getOnlineCount() {
        long minScore = System.currentTimeMillis() - HEARTBEAT_TIMEOUT_MS;
        return redisTemplate.opsForZSet().count(HEARTBEAT_ZSET, minScore, Double.POSITIVE_INFINITY);
    }
}
方案 2: 本地缓存 + 短过期时间 (推荐 ⭐⭐⭐⭐)
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import org.springframework.stereotype.Service;
import java.util.concurrent.TimeUnit;
/**
 * Heartbeat service backed by Redis, fronted by a short-lived Caffeine local cache.
 *
 * <p>Pros: hot-path reads are served locally, and the stale window is bounded by the
 * 5-second cache expiry. Cons: instances can still disagree for up to that window.
 */
@Slf4j
@Service
public class CaffeineCacheHeartbeatService {

    private static final String HEARTBEAT_ZSET = "hb:clients";
    private static final long HEARTBEAT_TIMEOUT_MS = 30000;

    private final StringRedisTemplate redisTemplate;
    private final HeartbeatEventPublisher eventPublisher;

    /** Last heartbeat time (epoch ms) per client; each entry expires 5 seconds after its write. */
    private final Cache<String, Long> localCache = Caffeine.newBuilder()
            .maximumSize(20000)
            .expireAfterWrite(5, TimeUnit.SECONDS)
            .recordStats()
            .build();

    public CaffeineCacheHeartbeatService(
            StringRedisTemplate redisTemplate,
            HeartbeatEventPublisher eventPublisher) {
        this.redisTemplate = redisTemplate;
        this.eventPublisher = eventPublisher;
    }

    /**
     * Records a heartbeat locally and (asynchronously) in Redis, and publishes an online
     * event when the client is new or reconnecting after a timeout.
     *
     * @param clientId client identifier
     * @param info     client details forwarded to the online event
     */
    public void receiveHeartbeat(String clientId, ClientInfo info) {
        long currentTime = System.currentTimeMillis();

        Long lastHeartbeat = localCache.getIfPresent(clientId);
        boolean isNewOrReconnect;
        if (lastHeartbeat == null) {
            // Local miss: Redis is the source of truth for "new or timed out".
            isNewOrReconnect = checkRedisForNewClient(clientId, currentTime);
        } else {
            // Entries expire after 5s, well below the 30s timeout, so this is only true
            // if those two settings are changed relative to each other.
            isNewOrReconnect = currentTime - lastHeartbeat > HEARTBEAT_TIMEOUT_MS;
        }

        localCache.put(clientId, currentTime);

        // The Redis write runs off the caller's thread (runAsync without an executor uses
        // ForkJoinPool.commonPool()). Previously a failure here was silently dropped. The
        // local cache then kept reporting a client as online that Redis never recorded.
        CompletableFuture.runAsync(() -> updateRedis(clientId, currentTime))
                .exceptionally(ex -> {
                    // Evict only if no newer heartbeat has replaced this entry meanwhile,
                    // so the next heartbeat re-checks Redis instead of trusting this value.
                    localCache.asMap().remove(clientId, currentTime);
                    log.warn("写入 Redis 心跳失败: {}", clientId, ex);
                    return null;
                });

        if (isNewOrReconnect) {
            eventPublisher.publishOnlineEvent(clientId, info);
        }
    }

    /**
     * Returns {@code true} if Redis has no score for the client, or its score is older than
     * the timeout.
     */
    private boolean checkRedisForNewClient(String clientId, long currentTime) {
        Double score = redisTemplate.opsForZSet().score(HEARTBEAT_ZSET, clientId);
        if (score == null) {
            return true;
        }
        return (currentTime - score.longValue()) > HEARTBEAT_TIMEOUT_MS;
    }

    /** Writes the heartbeat timestamp as the client's ZSET score. */
    private void updateRedis(String clientId, long currentTime) {
        redisTemplate.opsForZSet().add(HEARTBEAT_ZSET, clientId, currentTime);
    }

    /**
     * Returns whether the client is online.
     *
     * <p>Checks the local cache first, then falls back to Redis. An online result from Redis
     * is cached locally.
     */
    public boolean isOnline(String clientId) {
        Long lastHeartbeat = localCache.getIfPresent(clientId);
        if (lastHeartbeat != null) {
            long elapsed = System.currentTimeMillis() - lastHeartbeat;
            if (elapsed <= HEARTBEAT_TIMEOUT_MS) {
                return true;
            }
        }

        Double score = redisTemplate.opsForZSet().score(HEARTBEAT_ZSET, clientId);
        if (score == null) {
            return false;
        }
        long elapsed = System.currentTimeMillis() - score.longValue();
        boolean online = elapsed <= HEARTBEAT_TIMEOUT_MS;

        if (online) {
            localCache.put(clientId, score.longValue());
        }
        return online;
    }

    /**
     * Returns a snapshot of the local cache statistics.
     *
     * <p>Keys: hitRate, hitCount, missCount, evictionCount, size.
     */
    public Map<String, Object> getCacheStats() {
        var stats = localCache.stats();
        Map<String, Object> result = new HashMap<>();
        result.put("hitRate", stats.hitRate());
        result.put("hitCount", stats.hitCount());
        result.put("missCount", stats.missCount());
        result.put("evictionCount", stats.evictionCount());
        result.put("size", localCache.estimatedSize());
        return result;
    }
}
方案 3: Redis Pub/Sub 同步缓存 (推荐 ⭐⭐⭐)
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.data.redis.listener.ChannelTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.stereotype.Service;
/**
* 使用 Redis Pub/Sub 在多实例间同步缓存失效
* 优点: 实时同步,数据一致性好
* 缺点: 增加 Redis 网络开销
*/
@Slf4j
@Service
public class PubSubHeartbeatService {
private final StringRedisTemplate redisTemplate;
private final RedisMessageListenerContainer listenerContainer;
private final ConcurrentHashMap<String, Long> localCache = new ConcurrentHashMap<>();
private static final String HEARTBEAT_ZSET = "hb:clients";
private static final String CACHE_INVALIDATE_CHANNEL = "hb:cache:invalidate";
private static final long HEARTBEAT_TIMEOUT_MS = 30000;
public PubSubHeartbeatService(
StringRedisTemplate redisTemplate,
RedisMessageListenerContainer listenerContainer) {
this.redisTemplate = redisTemplate;
this.listenerContainer = listenerContainer;
// 订阅缓存失效消息
subscribeCacheInvalidation();
}
/**
* 订阅缓存失效消息
*/
private void subscribeCacheInvalidation() {
listenerContainer.addMessageListener(new MessageListener() {
@Override
public void onMessage(Message message, byte[] pattern) {
String clientId = new String(message.getBody());
log.debug("收到缓存失效通知: {}", clientId);
// 从本地缓存中移除
localCache.remove(clientId);
}
}, new ChannelTopic(CACHE_INVALIDATE_CHANNEL));
}
/**
* 接收心跳
*/
public void receiveHeartbeat(String clientId, ClientInfo info) {
long currentTime = System.currentTimeMillis();
// 1. 检查本地缓存
Long lastHeartbeat = localCache.get(clientId);
boolean isNewOrReconnect = false;
if (lastHeartbeat == null) {
isNewOrReconnect = checkRedisForNewClient(clientId, currentTime);
} else if (currentTime - lastHeartbeat > HEARTBEAT_TIMEOUT_MS) {
isNewOrReconnect = true;
}
// 2. 更新本地缓存
localCache.put(clientId, currentTime);
// 3. 更新 Redis
updateRedis(clientId, currentTime);
// 4. 发布上线事件
if (isNewOrReconnect) {
eventPublisher.publishOnlineEvent(clientId, info);
}
}
/**
* 客户端断开连接 - 通知所有实例清除缓存
*/
public void clientDisconnect(String clientId) {
// 1. 从 Redis 删除
redisTemplate.opsForZSet().remove(HEARTBEAT_ZSET, clientId);
// 2. 从本地缓存删除
localCache.remove(clientId);
// 3. 发布缓存失效消息,通知其他实例
publishCacheInvalidation(clientId);
// 4. 发布离线事件
eventPublisher.publishOfflineEvent(clientId);
}
/**
* 发布缓存失效消息
*/
private void publishCacheInvalidation(String clientId) {
redisTemplate.convertAndSend(CACHE_INVALIDATE_CHANNEL, clientId);
}
/**
* 清理超时客户端 - 通知所有实例
*/
public void cleanTimeoutClients() {
long currentTime = System.currentTimeMillis();
long maxScore = currentTime - HEARTBEAT_TIMEOUT_MS;
// 获取超时客户端
Set<String> timeoutClients = redisTemplate.opsForZSet()
.rangeByScore(HEARTBEAT_ZSET, 0, maxScore, 0, 500);
if (timeoutClients == null || timeoutClients.isEmpty()) {
return;
}
log.info("清理超时客户端: {} 个", timeoutClients.size());
// 批量删除并发布失效消息
for (String clientId : timeoutClients) {
localCache.remove(clientId);
publishCacheInvalidation(clientId); // 通知其他实例
eventPublisher.publishOfflineEvent(clientId);
}
// 批量删除 Redis 数据
redisTemplate.opsForZSet().removeRangeByScore(HEARTBEAT_ZSET, 0, maxScore);
}
private boolean checkRedisForNewClient(String clientId, long currentTime) {
Double score = redisTemplate.opsForZSet().score(HEARTBEAT_ZSET, clientId);
return score == null || (currentTime - score.longValue()) > HEARTBEAT_TIMEOUT_MS;
}
private void updateRedis(String clientId, long currentTime) {
redisTemplate.opsForZSet().add(HEARTBEAT_ZSET, clientId, currentTime);
}
}
方案 4: 分布式锁 + 本地缓存 (不推荐 ❌)
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import java.util.concurrent.TimeUnit;
/**
 * Heartbeat service that serializes per-client updates with a Redisson distributed lock.
 *
 * <p>Pros: the read-check-write sequence for a client is not interleaved across instances.
 * Cons: one lock round trip per heartbeat; a poor fit for high concurrency.
 */
@Slf4j
@Service
public class DistributedLockHeartbeatService {

    private static final String LOCK_PREFIX = "hb:lock:";
    private static final String HEARTBEAT_ZSET = "hb:clients";
    // Was referenced but never declared; same value as the other services in this document.
    private static final long HEARTBEAT_TIMEOUT_MS = 30000;
    /** How long to wait for the lock before giving up on this heartbeat. */
    private static final long LOCK_WAIT_MS = 100;
    /** Lease after which Redisson releases the lock automatically. */
    private static final long LOCK_LEASE_MS = 1000;

    private final RedissonClient redissonClient;
    private final StringRedisTemplate redisTemplate;
    private final HeartbeatEventPublisher eventPublisher;
    // Written on every heartbeat; nothing in this class reads it back.
    private final ConcurrentHashMap<String, Long> localCache = new ConcurrentHashMap<>();

    // The final fields previously had no constructor, so the class did not compile.
    public DistributedLockHeartbeatService(
            RedissonClient redissonClient,
            StringRedisTemplate redisTemplate,
            HeartbeatEventPublisher eventPublisher) {
        this.redissonClient = redissonClient;
        this.redisTemplate = redisTemplate;
        this.eventPublisher = eventPublisher;
    }

    /**
     * Records a heartbeat under the client's lock and publishes an online event when the
     * client is new or reconnecting after a timeout.
     *
     * <p>If the lock cannot be acquired within {@code LOCK_WAIT_MS}, the heartbeat is
     * dropped and logged.
     */
    public void receiveHeartbeat(String clientId, ClientInfo info) {
        RLock lock = redissonClient.getLock(LOCK_PREFIX + clientId);

        boolean acquired;
        try {
            acquired = lock.tryLock(LOCK_WAIT_MS, LOCK_LEASE_MS, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            log.error("获取锁被中断: {}", clientId, e);
            return;
        }
        if (!acquired) {
            log.warn("获取锁超时: {}", clientId);
            return;
        }

        try {
            long currentTime = System.currentTimeMillis();

            Double oldScore = redisTemplate.opsForZSet().score(HEARTBEAT_ZSET, clientId);
            redisTemplate.opsForZSet().add(HEARTBEAT_ZSET, clientId, currentTime);
            localCache.put(clientId, currentTime);

            if (oldScore == null
                    || (currentTime - oldScore.longValue()) > HEARTBEAT_TIMEOUT_MS) {
                eventPublisher.publishOnlineEvent(clientId, info);
            }
        } finally {
            // With an explicit lease the lock may already have expired if the work took
            // longer than LOCK_LEASE_MS. Unlocking it then throws IllegalMonitorStateException,
            // which would mask any exception thrown above.
            if (lock.isHeldByCurrentThread()) {
                lock.unlock();
            }
        }
    }
}
方案对比
| 方案 | 一致性 | 性能 | 复杂度 | 推荐指数 |
|---|---|---|---|---|
| 完全依赖 Redis | ⭐⭐⭐⭐⭐ 强一致 | ⭐⭐⭐⭐ 较好 | ⭐⭐⭐⭐⭐ 简单 | ⭐⭐⭐⭐⭐ |
| 本地缓存 + 短过期 | ⭐⭐⭐⭐ 最终一致 | ⭐⭐⭐⭐⭐ 最好 | ⭐⭐⭐⭐ 简单 | ⭐⭐⭐⭐ |
| Redis Pub/Sub | ⭐⭐⭐⭐ 准实时一致 | ⭐⭐⭐ 一般 | ⭐⭐⭐ 中等 | ⭐⭐⭐ |
| 分布式锁 | ⭐⭐⭐⭐⭐ 强一致 | ⭐⭐ 差 | ⭐⭐ 复杂 | ⭐ 不推荐 |
注: 表中星级越多表示该维度越好;"复杂度"一栏星级越多表示实现越简单。
最佳实践推荐
场景 1: 对一致性要求极高
// Uses solution 1: rely entirely on Redis.
// Suited to scenarios such as finance or payments that need highly accurate online state.
// NOTE(review): if both this class and RedisOnlyHeartbeatService are component-scanned
// @Service beans, injecting RedisOnlyHeartbeatService by type finds two candidates —
// mark one @Primary or drop @Service from the parent.
@Service
public class HighConsistencyHeartbeatService extends RedisOnlyHeartbeatService {

    /**
     * Forwards dependencies to the parent.
     *
     * <p>RedisOnlyHeartbeatService has no no-arg constructor, so without this the implicit
     * {@code super()} call does not compile.
     */
    public HighConsistencyHeartbeatService(
            StringRedisTemplate redisTemplate,
            HeartbeatEventPublisher eventPublisher) {
        super(redisTemplate, eventPublisher);
    }
}
场景 2: 平衡性能和一致性 (推荐 ⭐)
// Uses solution 2: local cache with a short (5-second) expiry.
// Suited to most scenarios, including 10000+ concurrent clients.
// NOTE(review): if both this class and CaffeineCacheHeartbeatService are component-scanned
// @Service beans, injecting CaffeineCacheHeartbeatService by type (as CacheConsistencyMonitor
// does) finds two candidates — mark one @Primary or drop @Service from the parent.
@Service
public class BalancedHeartbeatService extends CaffeineCacheHeartbeatService {

    /**
     * Forwards dependencies to the parent.
     *
     * <p>CaffeineCacheHeartbeatService has no no-arg constructor, so without this the
     * implicit {@code super()} call does not compile. The inherited 5-second cache expiry
     * keeps the inconsistency window small.
     */
    public BalancedHeartbeatService(
            StringRedisTemplate redisTemplate,
            HeartbeatEventPublisher eventPublisher) {
        super(redisTemplate, eventPublisher);
    }
}
场景 3: 需要实时通知
// Uses solution 3: Redis Pub/Sub.
// Suited to scenarios where cache invalidation must reach every instance in near real time.
// NOTE(review): PubSubHeartbeatService declares no no-arg constructor, so this subclass needs
// an explicit constructor that forwards the parent's constructor arguments to super(...);
// as written, the implicit super() call does not compile. Also, if both this class and its
// parent are component-scanned @Service beans, injecting PubSubHeartbeatService by type is
// ambiguous.
@Service
public class RealtimeHeartbeatService extends PubSubHeartbeatService {
    // Inherits the Pub/Sub-based cache invalidation unchanged.
}
监控和告警
import io.micrometer.core.instrument.MeterRegistry;
import java.util.Map;
import java.util.Set;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
/**
 * Periodically compares local-cache and Redis views of client liveness and exports the
 * inconsistency rate and cache hit rate as Micrometer gauges.
 */
@Slf4j
@Component
public class CacheConsistencyMonitor {

    private static final int SAMPLE_SIZE = 100;
    /** Inconsistency rate above which an error is logged (5%). */
    private static final double INCONSISTENCY_ALERT_THRESHOLD = 0.05;

    private final MeterRegistry meterRegistry;
    private final CaffeineCacheHeartbeatService heartbeatService;
    private final StringRedisTemplate redisTemplate;

    // Latest measured values, read by the gauges on every scrape. NaN means "not measured yet".
    private volatile double lastInconsistencyRate = Double.NaN;
    private volatile double lastHitRate = Double.NaN;

    // The final fields previously had no constructor, so the class did not compile.
    public CacheConsistencyMonitor(
            MeterRegistry meterRegistry,
            CaffeineCacheHeartbeatService heartbeatService,
            StringRedisTemplate redisTemplate) {
        this.meterRegistry = meterRegistry;
        this.heartbeatService = heartbeatService;
        this.redisTemplate = redisTemplate;
        // Register each gauge once, against this object. The previous code called
        // meterRegistry.gauge(name, double) on every run. Only the first call registers a
        // gauge, and it holds just a weak reference to that boxed Double, so later values
        // were never reported.
        meterRegistry.gauge("heartbeat.cache.inconsistency.rate", this, m -> m.lastInconsistencyRate);
        meterRegistry.gauge("heartbeat.cache.hit.rate", this, m -> m.lastHitRate);
    }

    /**
     * Every minute, samples clients and counts those whose local-cache and Redis
     * online/offline answers differ.
     */
    @Scheduled(fixedRate = 60000)
    public void checkCacheConsistency() {
        // NOTE(review): getSampleClients / checkLocalCache / checkRedis are not defined in
        // this snippet; they must be provided for the class to compile.
        Set<String> sampleClients = getSampleClients(SAMPLE_SIZE);
        if (sampleClients == null || sampleClients.isEmpty()) {
            // Nothing sampled: avoid computing 0/0 = NaN as the inconsistency rate.
            lastInconsistencyRate = 0.0;
            return;
        }

        int inconsistentCount = 0;
        for (String clientId : sampleClients) {
            boolean localResult = checkLocalCache(clientId);
            boolean redisResult = checkRedis(clientId);
            if (localResult != redisResult) {
                inconsistentCount++;
                log.warn("数据不一致: {} - 本地:{}, Redis:{}",
                        clientId, localResult, redisResult);
            }
        }

        double inconsistencyRate = (double) inconsistentCount / sampleClients.size();
        lastInconsistencyRate = inconsistencyRate;

        if (inconsistencyRate > INCONSISTENCY_ALERT_THRESHOLD) {
            log.error("缓存不一致率过高: {}%", inconsistencyRate * 100);
            // TODO: send an alert.
        }
    }

    /** Every 30 seconds, reads the local cache hit rate, updates the gauge and logs it. */
    @Scheduled(fixedRate = 30000)
    public void recordCacheHitRate() {
        Map<String, Object> stats = heartbeatService.getCacheStats();
        double hitRate = ((Number) stats.get("hitRate")).doubleValue();
        lastHitRate = hitRate;
        log.info("缓存命中率: {}%, 大小: {}",
                hitRate * 100, stats.get("size"));
    }
}
配置建议
# application.yml
heartbeat:
  # 缓存配置
  cache:
    type: caffeine # redis / caffeine / pubsub
    max-size: 20000
    expire-after-write: 5s # 5秒过期,减少不一致窗口
  # 超时配置
  timeout-ms: 30000 # 30秒
  # 一致性检查
  consistency-check:
    enabled: true
    sample-size: 100
    interval: 60s
  # 告警阈值
  alert:
    inconsistency-rate-threshold: 0.05 # 5%