跳到主要内容

分布式心跳服务 - 缓存一致性问题与解决方案

问题分析

1. 数据不一致的场景

场景一:客户端切换服务器
时间轴:
T1: client-001 → Server A (本地缓存: 在线)
T2: 负载均衡切换
T3: client-001 → Server B (本地缓存: 无数据/离线)
T4: Server A 本地缓存仍然认为 client-001 在线 ❌

场景二:Redis 数据被手动清理
T1: Server A 本地缓存: client-001 在线
T2: Redis 中 client-001 数据被清理
T3: Server A 本地缓存仍然认为 client-001 在线 ❌

场景三:服务器时钟不同步
Server A 时钟: 2025-01-13 10:00:00
Server B 时钟: 2025-01-13 09:59:50 (慢10秒)
导致超时判断不一致 ❌

2. 数据不一致的后果

| 后果 | 严重程度 | 影响 |
| --- | --- | --- |
| 误判客户端在线 | 🔴 高 | 向已离线客户端推送消息失败 |
| 误判客户端离线 | 🟡 中 | 重复发送上线通知,浪费资源 |
| 统计数据不准确 | 🟡 中 | 在线人数显示错误 |
| 重复事件发布 | 🟡 中 | Kafka 收到重复的上下线事件 |
| 清理任务冲突 | 🟢 低 | 多个实例重复清理同一客户端 |

解决方案

方案 1: 完全依赖 Redis (推荐 ⭐⭐⭐⭐⭐)

import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Service;
import lombok.extern.slf4j.Slf4j;

/**
 * Heartbeat service that keeps all liveness state in Redis and uses no local cache.
 *
 * <p>Pros: every instance reads the same sorted set, so instances cannot disagree
 * because of stale local copies. Cons: every call costs a Redis round trip.
 *
 * <p>State lives in the sorted set {@code hb:clients}: member = client id,
 * score = {@link System#currentTimeMillis()} of the instance that received the
 * last heartbeat.
 */
@Slf4j
@Service
public class RedisOnlyHeartbeatService {

    private static final String HEARTBEAT_ZSET = "hb:clients";
    private static final long HEARTBEAT_TIMEOUT_MS = 30000;

    /**
     * Reads the previous score, writes the new one, and reports the transition.
     *
     * <p>KEYS[1] = sorted-set key; ARGV[1] = client id; ARGV[2] = current time (ms);
     * ARGV[3] = timeout (ms). Returns 1 for a client with no previous score, 2 for a
     * client whose previous score is older than the timeout, 0 otherwise.
     *
     * <p>The statements are joined with newlines and carry no Lua comments on purpose:
     * a Lua {@code --} comment runs to the end of the line, so in a script flattened
     * onto one line the first comment would disable every statement after it.
     */
    private static final String HEARTBEAT_LUA = String.join("\n",
            "local key = KEYS[1]",
            "local clientId = ARGV[1]",
            "local currentTime = tonumber(ARGV[2])",
            "local timeout = tonumber(ARGV[3])",
            "local oldScore = redis.call('ZSCORE', key, clientId)",
            "redis.call('ZADD', key, currentTime, clientId)",
            "if oldScore == false then",
            "  return 1",
            "elseif (currentTime - tonumber(oldScore)) > timeout then",
            "  return 2",
            "else",
            "  return 0",
            "end");

    /** Built once and shared so the script object is not re-created per heartbeat. */
    private static final DefaultRedisScript<Long> HEARTBEAT_SCRIPT =
            new DefaultRedisScript<>(HEARTBEAT_LUA, Long.class);

    private final StringRedisTemplate redisTemplate;
    private final HeartbeatEventPublisher eventPublisher;

    public RedisOnlyHeartbeatService(
            StringRedisTemplate redisTemplate,
            HeartbeatEventPublisher eventPublisher) {
        this.redisTemplate = redisTemplate;
        this.eventPublisher = eventPublisher;
    }

    /**
     * Records a heartbeat and publishes an online event when the client is new or is
     * coming back after its previous heartbeat had timed out.
     *
     * @param clientId id of the client that sent the heartbeat
     * @param info     client details forwarded unchanged to the online event
     */
    public void receiveHeartbeat(String clientId, ClientInfo info) {
        long currentTime = System.currentTimeMillis();

        Long result = redisTemplate.execute(
                HEARTBEAT_SCRIPT,
                Collections.singletonList(HEARTBEAT_ZSET),
                clientId,
                String.valueOf(currentTime),
                String.valueOf(HEARTBEAT_TIMEOUT_MS));

        // 1 = first heartbeat, 2 = reconnect after timeout; 0 (regular heartbeat) is silent.
        if (result != null && result > 0) {
            eventPublisher.publishOnlineEvent(clientId, info);
            log.info("客户端上线: {} (状态码: {})", clientId, result);
        }
    }

    /**
     * Checks liveness by reading the client's score directly from Redis.
     *
     * @param clientId id of the client to check
     * @return {@code true} when a score exists and is at most the timeout old,
     *         measured against this JVM's clock
     */
    public boolean isOnline(String clientId) {
        Double score = redisTemplate.opsForZSet().score(HEARTBEAT_ZSET, clientId);
        if (score == null) {
            return false;
        }

        long elapsed = System.currentTimeMillis() - score.longValue();
        return elapsed <= HEARTBEAT_TIMEOUT_MS;
    }

    /**
     * Counts members whose score lies within the last timeout window.
     *
     * @return the value returned by the sorted-set count for
     *         {@code [now - timeout, now]}
     */
    public Long getOnlineCount() {
        long currentTime = System.currentTimeMillis();
        long minScore = currentTime - HEARTBEAT_TIMEOUT_MS;
        return redisTemplate.opsForZSet().count(HEARTBEAT_ZSET, minScore, currentTime);
    }
}

方案 2: 本地缓存 + 短过期时间 (推荐 ⭐⭐⭐⭐)

import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import org.springframework.stereotype.Service;
import java.util.concurrent.TimeUnit;

/**
 * Heartbeat service that fronts Redis with a short-lived local Caffeine cache.
 *
 * <p>Pros: most heartbeats are answered from memory. Cons: instances can still
 * disagree for a short time, because each one keeps its own copy.
 *
 * <p>Cache entries map client id to the last heartbeat timestamp (epoch millis) and
 * expire 5 seconds after they are written.
 */
@Slf4j
@Service
public class CaffeineCacheHeartbeatService {

    private static final String HEARTBEAT_ZSET = "hb:clients";
    private static final long HEARTBEAT_TIMEOUT_MS = 30000;

    private final StringRedisTemplate redisTemplate;
    private final HeartbeatEventPublisher eventPublisher;

    // Client id -> last heartbeat timestamp; bounded in size, expires 5 s after write.
    private final Cache<String, Long> localCache = Caffeine.newBuilder()
            .maximumSize(20000)
            .expireAfterWrite(5, TimeUnit.SECONDS)
            .recordStats()
            .build();

    public CaffeineCacheHeartbeatService(
            StringRedisTemplate redisTemplate,
            HeartbeatEventPublisher eventPublisher) {
        this.redisTemplate = redisTemplate;
        this.eventPublisher = eventPublisher;
    }

    /**
     * Records a heartbeat locally, writes it to Redis asynchronously, and publishes an
     * online event when the client is new or returning after a timeout.
     *
     * @param clientId id of the client that sent the heartbeat
     * @param info     client details forwarded unchanged to the online event
     */
    public void receiveHeartbeat(String clientId, ClientInfo info) {
        long currentTime = System.currentTimeMillis();

        // 1. Decide whether this heartbeat is a (re)connect: trust the local entry when
        //    there is one, otherwise ask Redis.
        Long lastHeartbeat = localCache.getIfPresent(clientId);
        boolean isNewOrReconnect;
        if (lastHeartbeat == null) {
            isNewOrReconnect = checkRedisForNewClient(clientId, currentTime);
        } else {
            isNewOrReconnect = currentTime - lastHeartbeat > HEARTBEAT_TIMEOUT_MS;
        }

        // 2. Refresh the local entry.
        localCache.put(clientId, currentTime);

        // 3. Write to Redis off the request thread. The future is otherwise discarded,
        //    so a failure is logged here instead of disappearing silently.
        CompletableFuture.runAsync(() -> updateRedis(clientId, currentTime))
                .exceptionally(ex -> {
                    log.warn("异步更新 Redis 心跳失败: {}", clientId, ex);
                    return null;
                });

        // 4. Announce the client.
        if (isNewOrReconnect) {
            eventPublisher.publishOnlineEvent(clientId, info);
        }
    }

    /**
     * Looks the client up in Redis to decide whether a heartbeat is a (re)connect.
     *
     * @return {@code true} when Redis has no score for the client, or the score is
     *         older than the timeout relative to {@code currentTime}
     */
    private boolean checkRedisForNewClient(String clientId, long currentTime) {
        Double score = redisTemplate.opsForZSet().score(HEARTBEAT_ZSET, clientId);
        if (score == null) {
            return true;
        }
        return (currentTime - score.longValue()) > HEARTBEAT_TIMEOUT_MS;
    }

    /** Stores {@code currentTime} as the client's score in the heartbeat sorted set. */
    private void updateRedis(String clientId, long currentTime) {
        redisTemplate.opsForZSet().add(HEARTBEAT_ZSET, clientId, currentTime);
    }

    /**
     * Checks liveness, consulting the local cache first and Redis on a miss.
     *
     * @param clientId id of the client to check
     * @return {@code true} when the newest known heartbeat is within the timeout
     */
    public boolean isOnline(String clientId) {
        // 1. A fresh local entry answers without touching Redis.
        Long lastHeartbeat = localCache.getIfPresent(clientId);
        if (lastHeartbeat != null) {
            long elapsed = System.currentTimeMillis() - lastHeartbeat;
            if (elapsed <= HEARTBEAT_TIMEOUT_MS) {
                return true;
            }
        }

        // 2. No entry, or the entry is too old: Redis decides.
        Double score = redisTemplate.opsForZSet().score(HEARTBEAT_ZSET, clientId);
        if (score == null) {
            return false;
        }

        long elapsed = System.currentTimeMillis() - score.longValue();
        boolean online = elapsed <= HEARTBEAT_TIMEOUT_MS;

        // 3. Remember a positive answer locally, keyed by the Redis timestamp.
        if (online) {
            localCache.put(clientId, score.longValue());
        }

        return online;
    }

    /**
     * Snapshots the local cache statistics.
     *
     * @return a map with the keys {@code hitRate}, {@code hitCount}, {@code missCount},
     *         {@code evictionCount} and {@code size}
     */
    public Map<String, Object> getCacheStats() {
        var stats = localCache.stats();
        Map<String, Object> result = new HashMap<>();
        result.put("hitRate", stats.hitRate());
        result.put("hitCount", stats.hitCount());
        result.put("missCount", stats.missCount());
        result.put("evictionCount", stats.evictionCount());
        result.put("size", localCache.estimatedSize());
        return result;
    }
}

方案 3: Redis Pub/Sub 同步缓存 (推荐 ⭐⭐⭐)

import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.data.redis.listener.ChannelTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.stereotype.Service;

/**
 * Heartbeat service that keeps a local cache per instance and uses a Redis Pub/Sub
 * channel to tell the other instances when an entry must be dropped.
 *
 * <p>Pros: invalidations reach other instances shortly after they happen.
 * Cons: extra Redis traffic for every invalidation.
 *
 * <p>The event publisher is a constructor dependency, like in the other heartbeat
 * services of this document; the methods below call it for online/offline events.
 */
@Slf4j
@Service
public class PubSubHeartbeatService {

    private static final String HEARTBEAT_ZSET = "hb:clients";
    private static final String CACHE_INVALIDATE_CHANNEL = "hb:cache:invalidate";
    private static final long HEARTBEAT_TIMEOUT_MS = 30000;

    /** Upper bound on how many timed-out clients one cleanup pass handles. */
    private static final int CLEAN_BATCH_SIZE = 500;

    private final StringRedisTemplate redisTemplate;
    private final RedisMessageListenerContainer listenerContainer;
    private final HeartbeatEventPublisher eventPublisher;

    // Client id -> last heartbeat timestamp (epoch millis) seen by this instance.
    private final ConcurrentHashMap<String, Long> localCache = new ConcurrentHashMap<>();

    public PubSubHeartbeatService(
            StringRedisTemplate redisTemplate,
            RedisMessageListenerContainer listenerContainer,
            HeartbeatEventPublisher eventPublisher) {
        this.redisTemplate = redisTemplate;
        this.listenerContainer = listenerContainer;
        this.eventPublisher = eventPublisher;

        subscribeCacheInvalidation();
    }

    /** Registers a listener that evicts the client id carried by each invalidation message. */
    private void subscribeCacheInvalidation() {
        listenerContainer.addMessageListener((message, pattern) -> {
            // Decode explicitly as UTF-8 rather than with the platform default charset.
            String clientId = new String(
                    message.getBody(), java.nio.charset.StandardCharsets.UTF_8);
            log.debug("收到缓存失效通知: {}", clientId);

            localCache.remove(clientId);
        }, new ChannelTopic(CACHE_INVALIDATE_CHANNEL));
    }

    /**
     * Records a heartbeat locally and in Redis, and publishes an online event when the
     * client is new or returning after a timeout.
     *
     * @param clientId id of the client that sent the heartbeat
     * @param info     client details forwarded unchanged to the online event
     */
    public void receiveHeartbeat(String clientId, ClientInfo info) {
        long currentTime = System.currentTimeMillis();

        // 1. Local entry decides when present; otherwise Redis does.
        Long lastHeartbeat = localCache.get(clientId);
        boolean isNewOrReconnect = false;

        if (lastHeartbeat == null) {
            isNewOrReconnect = checkRedisForNewClient(clientId, currentTime);
        } else if (currentTime - lastHeartbeat > HEARTBEAT_TIMEOUT_MS) {
            isNewOrReconnect = true;
        }

        // 2. Refresh the local entry.
        localCache.put(clientId, currentTime);

        // 3. Refresh Redis (synchronously).
        updateRedis(clientId, currentTime);

        // 4. Announce the client.
        if (isNewOrReconnect) {
            eventPublisher.publishOnlineEvent(clientId, info);
        }
    }

    /**
     * Handles an explicit disconnect: removes the client from Redis and from this
     * instance's cache, tells the other instances to evict it, and publishes an
     * offline event.
     *
     * @param clientId id of the client that disconnected
     */
    public void clientDisconnect(String clientId) {
        redisTemplate.opsForZSet().remove(HEARTBEAT_ZSET, clientId);
        localCache.remove(clientId);
        publishCacheInvalidation(clientId);
        eventPublisher.publishOfflineEvent(clientId);
    }

    /** Sends the client id on the invalidation channel. */
    private void publishCacheInvalidation(String clientId) {
        redisTemplate.convertAndSend(CACHE_INVALIDATE_CHANNEL, clientId);
    }

    /**
     * Cleans up one batch of clients whose last heartbeat is older than the timeout.
     *
     * <p>Only the clients fetched in this pass are evicted, announced offline and
     * removed from Redis. Timed-out clients beyond the batch limit stay in the sorted
     * set, so a later pass can announce them as well instead of deleting them silently.
     */
    public void cleanTimeoutClients() {
        long currentTime = System.currentTimeMillis();
        long maxScore = currentTime - HEARTBEAT_TIMEOUT_MS;

        Set<String> timeoutClients = redisTemplate.opsForZSet()
                .rangeByScore(HEARTBEAT_ZSET, 0, maxScore, 0, CLEAN_BATCH_SIZE);

        if (timeoutClients == null || timeoutClients.isEmpty()) {
            return;
        }

        log.info("清理超时客户端: {} 个", timeoutClients.size());

        for (String clientId : timeoutClients) {
            localCache.remove(clientId);
            publishCacheInvalidation(clientId);
            eventPublisher.publishOfflineEvent(clientId);
        }

        // Remove exactly the members handled above, not the whole score range.
        redisTemplate.opsForZSet().remove(HEARTBEAT_ZSET, timeoutClients.toArray());
    }

    /**
     * @return {@code true} when Redis has no score for the client, or the score is
     *         older than the timeout relative to {@code currentTime}
     */
    private boolean checkRedisForNewClient(String clientId, long currentTime) {
        Double score = redisTemplate.opsForZSet().score(HEARTBEAT_ZSET, clientId);
        return score == null || (currentTime - score.longValue()) > HEARTBEAT_TIMEOUT_MS;
    }

    /** Stores {@code currentTime} as the client's score in the heartbeat sorted set. */
    private void updateRedis(String clientId, long currentTime) {
        redisTemplate.opsForZSet().add(HEARTBEAT_ZSET, clientId, currentTime);
    }
}

方案 4: 分布式锁 + 本地缓存 (不推荐 ❌)

import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import java.util.concurrent.TimeUnit;

/**
 * Heartbeat service that serializes the read-then-write on a client's score with a
 * per-client distributed lock.
 *
 * <p>Pros: the check and the update for one client do not interleave across
 * instances. Cons: every heartbeat pays for a lock acquisition, which makes this
 * variant a poor fit for high concurrency.
 */
@Slf4j
@Service
public class DistributedLockHeartbeatService {

    private static final String LOCK_PREFIX = "hb:lock:";
    private static final String HEARTBEAT_ZSET = "hb:clients";
    private static final long HEARTBEAT_TIMEOUT_MS = 30000;

    private final RedissonClient redissonClient;
    private final StringRedisTemplate redisTemplate;
    private final HeartbeatEventPublisher eventPublisher;

    // Client id -> last heartbeat timestamp (epoch millis) written by this instance.
    private final ConcurrentHashMap<String, Long> localCache = new ConcurrentHashMap<>();

    public DistributedLockHeartbeatService(
            RedissonClient redissonClient,
            StringRedisTemplate redisTemplate,
            HeartbeatEventPublisher eventPublisher) {
        this.redissonClient = redissonClient;
        this.redisTemplate = redisTemplate;
        this.eventPublisher = eventPublisher;
    }

    /**
     * Records a heartbeat under the client's lock and publishes an online event when
     * the client is new or returning after a timeout. If the lock cannot be obtained
     * within 100 ms the heartbeat is dropped with a warning.
     *
     * @param clientId id of the client that sent the heartbeat
     * @param info     client details forwarded unchanged to the online event
     */
    public void receiveHeartbeat(String clientId, ClientInfo info) {
        RLock lock = redissonClient.getLock(LOCK_PREFIX + clientId);

        boolean acquired;
        try {
            // Wait up to 100 ms for the lock; the lease is released automatically after 1000 ms.
            acquired = lock.tryLock(100, 1000, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            log.error("获取锁被中断: {}", clientId, e);
            return;
        }

        if (!acquired) {
            log.warn("获取锁超时: {}", clientId);
            return;
        }

        try {
            long currentTime = System.currentTimeMillis();

            // Read the previous score before overwriting it.
            Double oldScore = redisTemplate.opsForZSet().score(HEARTBEAT_ZSET, clientId);

            redisTemplate.opsForZSet().add(HEARTBEAT_ZSET, clientId, currentTime);
            localCache.put(clientId, currentTime);

            boolean isNewOrReconnect = oldScore == null
                    || (currentTime - oldScore.longValue()) > HEARTBEAT_TIMEOUT_MS;
            if (isNewOrReconnect) {
                eventPublisher.publishOnlineEvent(clientId, info);
            }
        } finally {
            // The lease may have expired while the work was running; unlocking a lock
            // this thread no longer holds would throw, so check first.
            if (lock.isHeldByCurrentThread()) {
                lock.unlock();
            }
        }
    }
}

方案对比

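| 方案 | 一致性 | 性能 | 复杂度 | 推荐指数 |
| --- | --- | --- | --- | --- |
| 完全依赖 Redis | ⭐⭐⭐⭐⭐ 强一致 | ⭐⭐⭐⭐ 较好 | ⭐⭐⭐⭐⭐ 简单 | ⭐⭐⭐⭐⭐ |
| 本地缓存 + 短过期 | ⭐⭐⭐⭐ 最终一致 | ⭐⭐⭐⭐⭐ 最好 | ⭐⭐⭐⭐ 简单 | ⭐⭐⭐⭐ |
| Redis Pub/Sub | ⭐⭐⭐⭐ 准实时一致 | ⭐⭐⭐ 一般 | ⭐⭐⭐ 中等 | ⭐⭐⭐ |
| 分布式锁 | ⭐⭐⭐⭐⭐ 强一致 | ⭐⭐ 差 | ⭐⭐ 复杂 | ⭐ 不推荐 |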

最佳实践推荐

场景 1: 对一致性要求极高

// Uses scheme 1: rely on Redis only.
// Suited to domains such as finance or payments, where accuracy matters most.
@Service
public class HighConsistencyHeartbeatService extends RedisOnlyHeartbeatService {

    // The parent has no no-arg constructor, so its dependencies are forwarded explicitly.
    public HighConsistencyHeartbeatService(
            StringRedisTemplate redisTemplate,
            HeartbeatEventPublisher eventPublisher) {
        super(redisTemplate, eventPublisher);
    }
}

场景 2: 平衡性能和一致性 (推荐 ⭐)

// Uses scheme 2: local cache with a short (5 second) expiry.
// Suited to most workloads, including 10000+ concurrent clients.
@Service
public class BalancedHeartbeatService extends CaffeineCacheHeartbeatService {

    // The parent has no no-arg constructor, so its dependencies are forwarded explicitly.
    public BalancedHeartbeatService(
            StringRedisTemplate redisTemplate,
            HeartbeatEventPublisher eventPublisher) {
        super(redisTemplate, eventPublisher);
    }
}

场景 3: 需要实时通知

// Uses scheme 3: Redis Pub/Sub.
// Suited to scenarios where state must be synchronized across instances in real time.
// NOTE(review): the parent class declares only a parameterized constructor, so this
// subclass needs an explicit constructor forwarding to it before it can compile.
@Service
public class RealtimeHeartbeatService extends PubSubHeartbeatService {
// Cache invalidations are synchronized in real time (behavior inherited from the parent).
}

监控和告警

import io.micrometer.core.instrument.MeterRegistry;
import org.springframework.stereotype.Component;

@Component
public class CacheConsistencyMonitor {

private final MeterRegistry meterRegistry;
private final CaffeineCacheHeartbeatService heartbeatService;
private final StringRedisTemplate redisTemplate;

/**
* 定期检查缓存一致性
*/
@Scheduled(fixedRate = 60000) // 每分钟检查一次
public void checkCacheConsistency() {
// 随机抽样检查 100 个客户端
Set<String> sampleClients = getSampleClients(100);

int inconsistentCount = 0;
for (String clientId : sampleClients) {
boolean localResult = checkLocalCache(clientId);
boolean redisResult = checkRedis(clientId);

if (localResult != redisResult) {
inconsistentCount++;
log.warn("数据不一致: {} - 本地:{}, Redis:{}",
clientId, localResult, redisResult);
}
}

// 记录不一致率
double inconsistencyRate = (double) inconsistentCount / sampleClients.size();
meterRegistry.gauge("heartbeat.cache.inconsistency.rate", inconsistencyRate);

// 告警
if (inconsistencyRate > 0.05) { // 超过 5%
log.error("缓存不一致率过高: {}%", inconsistencyRate * 100);
// 发送告警
}
}

/**
* 获取缓存命中率
*/
@Scheduled(fixedRate = 30000)
public void recordCacheHitRate() {
Map<String, Object> stats = heartbeatService.getCacheStats();
double hitRate = (double) stats.get("hitRate");

meterRegistry.gauge("heartbeat.cache.hit.rate", hitRate);

log.info("缓存命中率: {}%, 大小: {}",
hitRate * 100, stats.get("size"));
}
}

配置建议

# application.yml
heartbeat:
  # 缓存配置
  cache:
    type: caffeine # redis / caffeine / pubsub
    max-size: 20000
    expire-after-write: 5s # 5秒过期,减少不一致窗口

  # 超时配置
  timeout-ms: 30000 # 30秒

  # 一致性检查
  consistency-check:
    enabled: true
    sample-size: 100
    interval: 60s

  # 告警阈值
  alert:
    inconsistency-rate-threshold: 0.05 # 5%