Redis Heartbeat Mechanism - A High-Concurrency Distributed Design
Architecture
Clients (10,000+)
    ↓
Load balancer (Nginx/ALB)
    ↓
Application server cluster (multiple instances)
    ↓
Redis cluster (master-replica / Sentinel / Cluster)
    ↓
Message queue (Kafka/RabbitMQ) → asynchronous log processing
    ↓
Database (MySQL/PostgreSQL)
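The sections below implement the server side of this pipeline. For orientation, here is what the counterpart client loop might look like - a minimal sketch, assuming the /api/v1/heartbeat/{clientId} endpoint defined in section 4 and the 15-second interval the controller suggests; the server address and client ID are placeholders, not part of the original design.

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class HeartbeatClient {
    private static final String SERVER = "http://localhost:8080"; // assumed server address
    private static final String CLIENT_ID = "client-001";         // illustrative client ID

    public static void main(String[] args) {
        HttpClient http = HttpClient.newHttpClient();
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        // Send a heartbeat every 15 seconds, matching the interval the server suggests
        scheduler.scheduleAtFixedRate(() -> {
            try {
                HttpRequest request = HttpRequest.newBuilder()
                        .uri(URI.create(SERVER + "/api/v1/heartbeat/" + CLIENT_ID))
                        .POST(HttpRequest.BodyPublishers.noBody())
                        .build();
                HttpResponse<String> resp = http.send(request, HttpResponse.BodyHandlers.ofString());
                System.out.println("heartbeat status=" + resp.statusCode());
            } catch (Exception e) {
                // A real client would back off and retry; here we just keep the loop alive
                System.err.println("heartbeat failed: " + e.getMessage());
            }
        }, 0, 15, TimeUnit.SECONDS);
    }
}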
1. Core Implementation - Sorted Set + Pipeline
import org.springframework.beans.factory.DisposableBean;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.core.RedisCallback;
import org.springframework.data.redis.core.script.DefaultRedisScript;
import org.springframework.stereotype.Service;
import lombok.extern.slf4j.Slf4j;
import java.util.*;
import java.util.concurrent.*;

@Slf4j
@Service
public class HighPerformanceHeartbeatService implements DisposableBean {

    private final StringRedisTemplate redisTemplate;
    private final HeartbeatEventPublisher eventPublisher;

    // Redis keys
    private static final String HEARTBEAT_ZSET = "hb:clients";
    private static final String CLIENT_INFO_HASH = "hb:info:";
    private static final String ONLINE_COUNTER = "hb:online:count"; // reserved for a counter-based variant (unused below)

    // Configuration parameters
    private static final long HEARTBEAT_TIMEOUT_MS = 30000; // 30-second timeout
    private static final int BATCH_SIZE = 100;              // batch size
    private static final int CLEANUP_BATCH_SIZE = 500;      // cleanup batch size

    // Local cache - reduces Redis lookups
    private final ConcurrentHashMap<String, Long> localCache = new ConcurrentHashMap<>();
    private final ScheduledExecutorService cacheCleanScheduler =
            Executors.newScheduledThreadPool(1);

    public HighPerformanceHeartbeatService(
            StringRedisTemplate redisTemplate,
            HeartbeatEventPublisher eventPublisher) {
        this.redisTemplate = redisTemplate;
        this.eventPublisher = eventPublisher;
        // Evict stale local-cache entries every 5 seconds
        cacheCleanScheduler.scheduleAtFixedRate(
                this::cleanLocalCache, 5, 5, TimeUnit.SECONDS
        );
    }
    /**
     * Receive a heartbeat - high-performance path.
     * A Lua script keeps the Redis update atomic and saves network round trips.
     */
    public void receiveHeartbeat(String clientId, ClientInfo info) {
        long currentTime = System.currentTimeMillis();
        // 1. Check the local cache first to avoid hitting Redis on every call
        Long lastHeartbeat = localCache.get(clientId);
        boolean isNewClient = (lastHeartbeat == null);
        // 2. Update the local cache
        localCache.put(clientId, currentTime);
        // 3. Update Redis asynchronously (on the common ForkJoinPool) so the response returns quickly
        CompletableFuture.runAsync(() -> {
            try {
                updateHeartbeatInRedis(clientId, currentTime, info);
                // 4. Publish an online event for new clients or reconnects after a timeout.
                //    Note: the local cache is per instance, so behind a load balancer this
                //    may fire duplicate events; the Lua script's return value (below) can
                //    drive a cluster-wide decision instead.
                if (isNewClient || (currentTime - lastHeartbeat > HEARTBEAT_TIMEOUT_MS)) {
                    eventPublisher.publishOnlineEvent(clientId, info);
                }
            } catch (Exception e) {
                log.error("Failed to update heartbeat: clientId={}", clientId, e);
            }
        });
    }
    /**
     * Atomically update the heartbeat via a Lua script.
     * The script returns 1 for a newly seen client, 2 for a reconnect after timeout,
     * and 0 if the client was already online.
     */
    private void updateHeartbeatInRedis(String clientId, long currentTime, ClientInfo info) {
        String luaScript =
                "local zsetKey = KEYS[1] " +
                "local clientId = ARGV[1] " +
                "local currentTime = ARGV[2] " +
                "local timeout = ARGV[3] " +
                // Read the previous score (last heartbeat time); ZSCORE yields false in Lua when the member is absent
                "local oldScore = redis.call('ZSCORE', zsetKey, clientId) " +
                // Record the current heartbeat time in the sorted set
                "redis.call('ZADD', zsetKey, currentTime, clientId) " +
                // Return 1 = newly online, 2 = reconnect after timeout, 0 = already online
                "if oldScore == false then " +
                "  return 1 " +
                "elseif (tonumber(currentTime) - tonumber(oldScore)) > tonumber(timeout) then " +
                "  return 2 " +
                "else " +
                "  return 0 " +
                "end";
        Long result = redisTemplate.execute(
                new DefaultRedisScript<>(luaScript, Long.class),
                Collections.singletonList(HEARTBEAT_ZSET),
                clientId,
                String.valueOf(currentTime),
                String.valueOf(HEARTBEAT_TIMEOUT_MS)
        );
        log.debug("Heartbeat state transition for {}: {}", clientId, result);
        // Persist client details separately (Hash structure)
        if (info != null) {
            saveClientInfo(clientId, info);
        }
    }
    /**
     * Receive heartbeats in bulk - a Redis pipeline boosts throughput.
     * Intended for batch-reporting scenarios (e.g. a gateway aggregating clients).
     */
    public void batchReceiveHeartbeat(List<HeartbeatRequest> requests) {
        if (requests.isEmpty()) return;
        long currentTime = System.currentTimeMillis();
        // Execute all ZADDs over a single pipelined connection
        redisTemplate.executePipelined((RedisCallback<Object>) connection -> {
            byte[] zsetKey = HEARTBEAT_ZSET.getBytes();
            for (HeartbeatRequest request : requests) {
                String clientId = request.getClientId();
                // Update the local cache
                localCache.put(clientId, currentTime);
                // Queue the ZADD on the pipeline
                connection.zAdd(zsetKey, currentTime, clientId.getBytes());
            }
            return null;
        });
        log.info("Processed heartbeat batch: {} clients", requests.size());
    }
    /**
     * Count online clients - maps to ZCOUNT, O(log N).
     */
    public Long getOnlineCount() {
        long currentTime = System.currentTimeMillis();
        long minScore = currentTime - HEARTBEAT_TIMEOUT_MS;
        return redisTemplate.opsForZSet().count(HEARTBEAT_ZSET, minScore, currentTime);
    }

    /**
     * Page through the list of online clients.
     */
    public Set<String> getOnlineClients(int page, int size) {
        long currentTime = System.currentTimeMillis();
        long minScore = currentTime - HEARTBEAT_TIMEOUT_MS;
        long offset = (long) page * size;
        // reverseRangeByScore takes (offset, count), not (start, end) indices
        return redisTemplate.opsForZSet().reverseRangeByScore(
                HEARTBEAT_ZSET,
                minScore,
                currentTime,
                offset,
                size
        );
    }
    /**
     * Check whether a client is online - local cache first.
     */
    public boolean isOnline(String clientId) {
        // 1. Try the local cache
        Long lastHeartbeat = localCache.get(clientId);
        if (lastHeartbeat != null) {
            long elapsed = System.currentTimeMillis() - lastHeartbeat;
            if (elapsed <= HEARTBEAT_TIMEOUT_MS) {
                return true;
            }
        }
        // 2. Cache miss (or stale entry): fall back to Redis
        Double score = redisTemplate.opsForZSet().score(HEARTBEAT_ZSET, clientId);
        if (score == null) return false;
        long elapsed = System.currentTimeMillis() - score.longValue();
        return elapsed <= HEARTBEAT_TIMEOUT_MS;
    }
    /**
     * Clean up timed-out clients in batches to avoid blocking.
     */
    public void cleanTimeoutClients() {
        long startTime = System.currentTimeMillis();
        long maxScore = startTime - HEARTBEAT_TIMEOUT_MS;
        try {
            // 1. Fetch timed-out clients (bounded, to avoid loading too many at once)
            Set<String> timeoutClients = redisTemplate.opsForZSet().rangeByScore(
                    HEARTBEAT_ZSET,
                    0,
                    maxScore,
                    0,
                    CLEANUP_BATCH_SIZE
            );
            if (timeoutClients == null || timeoutClients.isEmpty()) {
                return;
            }
            log.info("Found {} timed-out clients", timeoutClients.size());
            // 2. Publish offline events (processed asynchronously) and drop local entries
            for (String clientId : timeoutClients) {
                eventPublisher.publishOfflineEvent(clientId);
                localCache.remove(clientId);
            }
            // 3. Remove exactly the clients fetched above. Removing the whole score range
            //    here could silently delete clients beyond the batch whose offline events
            //    were never published.
            redisTemplate.opsForZSet().remove(HEARTBEAT_ZSET, timeoutClients.toArray());
            long duration = System.currentTimeMillis() - startTime;
            log.info("Cleanup finished: {} clients in {} ms", timeoutClients.size(), duration);
        } catch (Exception e) {
            log.error("Failed to clean up timed-out clients", e);
        }
    }
    /**
     * Client-initiated disconnect.
     */
    public void clientDisconnect(String clientId) {
        localCache.remove(clientId);
        redisTemplate.opsForZSet().remove(HEARTBEAT_ZSET, clientId);
        redisTemplate.delete(CLIENT_INFO_HASH + clientId);
        eventPublisher.publishOfflineEvent(clientId);
    }

    /**
     * Save client details in a Redis Hash.
     */
    private void saveClientInfo(String clientId, ClientInfo info) {
        String key = CLIENT_INFO_HASH + clientId;
        Map<String, String> infoMap = new HashMap<>();
        // The IP and User-Agent headers are optional, so guard against nulls
        // (Redis rejects null hash values)
        if (info.getIpAddress() != null) infoMap.put("ip", info.getIpAddress());
        if (info.getUserAgent() != null) infoMap.put("userAgent", info.getUserAgent());
        infoMap.put("connectTime", String.valueOf(info.getConnectTime()));
        redisTemplate.opsForHash().putAll(key, infoMap);
        redisTemplate.expire(key, 1, TimeUnit.HOURS);
    }
    /**
     * Load client details.
     */
    public ClientInfo getClientInfo(String clientId) {
        String key = CLIENT_INFO_HASH + clientId;
        Map<Object, Object> entries = redisTemplate.opsForHash().entries(key);
        if (entries.isEmpty()) return null;
        ClientInfo info = new ClientInfo();
        info.setClientId(clientId);
        info.setIpAddress((String) entries.get("ip"));
        info.setUserAgent((String) entries.get("userAgent"));
        Object connectTime = entries.get("connectTime");
        if (connectTime != null) {
            info.setConnectTime(Long.parseLong((String) connectTime));
        }
        return info;
    }

    /**
     * Evict expired entries from the local cache.
     */
    private void cleanLocalCache() {
        long currentTime = System.currentTimeMillis();
        localCache.entrySet().removeIf(entry ->
                currentTime - entry.getValue() > HEARTBEAT_TIMEOUT_MS * 2
        );
    }
    /**
     * Service statistics snapshot.
     */
    public HeartbeatStats getStats() {
        HeartbeatStats stats = new HeartbeatStats();
        stats.setOnlineCount(getOnlineCount());
        stats.setLocalCacheSize(localCache.size());
        stats.setTimestamp(System.currentTimeMillis());
        return stats;
    }

    /**
     * Stop the cache-cleanup scheduler on shutdown so its thread does not leak.
     */
    @Override
    public void destroy() {
        cacheCleanScheduler.shutdown();
    }
}
// ===== Helper classes (each in its own file, with lombok.Data imported) =====

@Data
public class HeartbeatRequest {
    private String clientId;
    private ClientInfo info;
}

@Data
public class ClientInfo {
    private String clientId;
    private String ipAddress;
    private String userAgent;
    private Long connectTime;
    private String deviceType;
    private String appVersion;
}

@Data
public class HeartbeatStats {
    private Long onlineCount;
    private Integer localCacheSize;
    private Long timestamp;
}
2. Event Publisher - Asynchronous Online/Offline Events
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;

@Slf4j
@Component
public class HeartbeatEventPublisher {

    private final KafkaTemplate<String, String> kafkaTemplate;
    private final ObjectMapper objectMapper;

    private static final String TOPIC_ONLINE = "heartbeat.client.online";
    private static final String TOPIC_OFFLINE = "heartbeat.client.offline";

    public HeartbeatEventPublisher(
            KafkaTemplate<String, String> kafkaTemplate,
            ObjectMapper objectMapper) {
        this.kafkaTemplate = kafkaTemplate;
        this.objectMapper = objectMapper;
    }

    /**
     * Publish a client-online event.
     */
    public void publishOnlineEvent(String clientId, ClientInfo info) {
        try {
            HeartbeatEvent event = new HeartbeatEvent();
            event.setClientId(clientId);
            event.setEventType("ONLINE");
            event.setTimestamp(System.currentTimeMillis());
            event.setClientInfo(info);
            String json = objectMapper.writeValueAsString(event);
            kafkaTemplate.send(TOPIC_ONLINE, clientId, json);
            log.debug("Published online event: {}", clientId);
        } catch (Exception e) {
            log.error("Failed to publish online event: {}", clientId, e);
        }
    }

    /**
     * Publish a client-offline event.
     */
    public void publishOfflineEvent(String clientId) {
        try {
            HeartbeatEvent event = new HeartbeatEvent();
            event.setClientId(clientId);
            event.setEventType("OFFLINE");
            event.setTimestamp(System.currentTimeMillis());
            String json = objectMapper.writeValueAsString(event);
            kafkaTemplate.send(TOPIC_OFFLINE, clientId, json);
            log.debug("Published offline event: {}", clientId);
        } catch (Exception e) {
            log.error("Failed to publish offline event: {}", clientId, e);
        }
    }
}
@Data
class HeartbeatEvent {
    private String clientId;
    private String eventType;
    private Long timestamp;
    private ClientInfo clientInfo;
}
3. Event Consumer - Asynchronous Database Writes
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import java.time.LocalDateTime;

@Slf4j
@Component
public class HeartbeatEventConsumer {

    private final HeartbeatLogRepository logRepository;
    private final ObjectMapper objectMapper;

    public HeartbeatEventConsumer(HeartbeatLogRepository logRepository, ObjectMapper objectMapper) {
        this.logRepository = logRepository;
        this.objectMapper = objectMapper; // reuse one mapper instead of allocating per message
    }

    /**
     * Consume online events.
     */
    @KafkaListener(topics = "heartbeat.client.online", groupId = "heartbeat-log-group")
    @Transactional
    public void handleOnlineEvent(String message) {
        try {
            HeartbeatEvent event = objectMapper.readValue(message, HeartbeatEvent.class);
            // Write the log row off the request path
            // (named heartbeatLog so it does not shadow the Lombok logger field)
            ClientHeartbeatLog heartbeatLog = new ClientHeartbeatLog();
            heartbeatLog.setClientId(event.getClientId());
            heartbeatLog.setStatus("ONLINE");
            heartbeatLog.setTimestamp(LocalDateTime.now());
            if (event.getClientInfo() != null) {
                heartbeatLog.setIpAddress(event.getClientInfo().getIpAddress());
                heartbeatLog.setUserAgent(event.getClientInfo().getUserAgent());
            }
            logRepository.save(heartbeatLog);
            log.info("Recorded online log: {}", event.getClientId());
        } catch (Exception e) {
            log.error("Failed to handle online event", e);
        }
    }

    /**
     * Consume offline events.
     */
    @KafkaListener(topics = "heartbeat.client.offline", groupId = "heartbeat-log-group")
    @Transactional
    public void handleOfflineEvent(String message) {
        try {
            HeartbeatEvent event = objectMapper.readValue(message, HeartbeatEvent.class);
            ClientHeartbeatLog heartbeatLog = new ClientHeartbeatLog();
            heartbeatLog.setClientId(event.getClientId());
            heartbeatLog.setStatus("OFFLINE");
            heartbeatLog.setTimestamp(LocalDateTime.now());
            logRepository.save(heartbeatLog);
            log.info("Recorded offline log: {}", event.getClientId());
        } catch (Exception e) {
            log.error("Failed to handle offline event", e);
        }
    }
}
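ClientHeartbeatLog and HeartbeatLogRepository are referenced above but not defined in this article. A minimal JPA sketch might look like the following; the table name, column set, and ID strategy are assumptions rather than part of the original design, and the jakarta.persistence imports assume Spring Boot 3 (use javax.persistence on Boot 2).

import jakarta.persistence.*;
import org.springframework.data.jpa.repository.JpaRepository;
import lombok.Data;
import java.time.LocalDateTime;

// Hypothetical entity backing the heartbeat audit log
@Entity
@Table(name = "client_heartbeat_log")
@Data
public class ClientHeartbeatLog {
    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;
    private String clientId;
    private String status;           // "ONLINE" or "OFFLINE"
    private LocalDateTime timestamp;
    private String ipAddress;
    private String userAgent;
}

// Standard Spring Data repository; the consumer above only needs save()
interface HeartbeatLogRepository extends JpaRepository<ClientHeartbeatLog, Long> {
}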
4. REST Controller - Optimized Version
import org.springframework.web.bind.annotation.*;
import org.springframework.http.ResponseEntity;
import io.swagger.v3.oas.annotations.Operation;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.util.concurrent.RateLimiter;
import lombok.extern.slf4j.Slf4j;
import java.util.*;
import java.util.concurrent.TimeUnit;

@Slf4j
@RestController
@RequestMapping("/api/v1/heartbeat")
public class HeartbeatController {

    private final HighPerformanceHeartbeatService heartbeatService;

    // Per-client rate limiter (stops a single client from flooding the endpoint)
    private final LoadingCache<String, RateLimiter> rateLimiters = CacheBuilder.newBuilder()
            .maximumSize(10000)
            .expireAfterAccess(1, TimeUnit.HOURS)
            .build(new CacheLoader<String, RateLimiter>() {
                @Override
                public RateLimiter load(String key) {
                    return RateLimiter.create(2.0); // at most 2 requests per second per client
                }
            });

    public HeartbeatController(HighPerformanceHeartbeatService heartbeatService) {
        this.heartbeatService = heartbeatService;
    }

    /**
     * Receive a single client heartbeat.
     */
    @PostMapping("/{clientId}")
    @Operation(summary = "Receive heartbeat")
    public ResponseEntity<Map<String, Object>> heartbeat(
            @PathVariable String clientId,
            @RequestHeader(value = "X-Real-IP", required = false) String clientIp,
            @RequestHeader(value = "User-Agent", required = false) String userAgent) {
        // Rate-limit check
        try {
            RateLimiter limiter = rateLimiters.get(clientId);
            if (!limiter.tryAcquire(100, TimeUnit.MILLISECONDS)) {
                return ResponseEntity.status(429)
                        .body(Map.of("error", "Too many requests"));
            }
        } catch (Exception e) {
            log.warn("Rate-limit check failed", e);
        }
        // Build the client info
        ClientInfo info = new ClientInfo();
        info.setClientId(clientId);
        info.setIpAddress(clientIp);
        info.setUserAgent(userAgent);
        info.setConnectTime(System.currentTimeMillis());
        // Process the heartbeat (asynchronously)
        heartbeatService.receiveHeartbeat(clientId, info);
        // Respond immediately
        Map<String, Object> response = new HashMap<>();
        response.put("success", true);
        response.put("timestamp", System.currentTimeMillis());
        response.put("nextHeartbeat", 15000); // suggested interval before the next heartbeat (15 s)
        return ResponseEntity.ok(response);
    }
    /**
     * Receive heartbeats in bulk (for gateways reporting on behalf of many clients).
     */
    @PostMapping("/batch")
    @Operation(summary = "Receive heartbeat batch")
    public ResponseEntity<Map<String, Object>> batchHeartbeat(
            @RequestBody List<HeartbeatRequest> requests) {
        if (requests.size() > 100) {
            return ResponseEntity.badRequest()
                    .body(Map.of("error", "Batch size exceeds limit"));
        }
        heartbeatService.batchReceiveHeartbeat(requests);
        return ResponseEntity.ok(Map.of(
                "success", true,
                "processed", requests.size()
        ));
    }

    /**
     * Get the online client count.
     */
    @GetMapping("/stats/online-count")
    @Operation(summary = "Get online count")
    public ResponseEntity<Long> getOnlineCount() {
        return ResponseEntity.ok(heartbeatService.getOnlineCount());
    }

    /**
     * Page through online clients.
     */
    @GetMapping("/online")
    @Operation(summary = "List online clients")
    public ResponseEntity<Set<String>> getOnlineClients(
            @RequestParam(defaultValue = "0") int page,
            @RequestParam(defaultValue = "100") int size) {
        if (size > 500) size = 500; // cap the page size
        return ResponseEntity.ok(heartbeatService.getOnlineClients(page, size));
    }

    /**
     * Check a client's status.
     */
    @GetMapping("/{clientId}/status")
    @Operation(summary = "Check client status")
    public ResponseEntity<Map<String, Object>> getClientStatus(@PathVariable String clientId) {
        boolean online = heartbeatService.isOnline(clientId);
        ClientInfo info = heartbeatService.getClientInfo(clientId);
        Map<String, Object> response = new HashMap<>();
        response.put("clientId", clientId);
        response.put("online", online);
        response.put("info", info);
        return ResponseEntity.ok(response);
    }

    /**
     * Client-initiated disconnect.
     */
    @DeleteMapping("/{clientId}")
    @Operation(summary = "Client disconnect")
    public ResponseEntity<Void> disconnect(@PathVariable String clientId) {
        heartbeatService.clientDisconnect(clientId);
        return ResponseEntity.ok().build();
    }

    /**
     * Get service statistics.
     */
    @GetMapping("/stats")
    @Operation(summary = "Get statistics")
    public ResponseEntity<HeartbeatStats> getStats() {
        return ResponseEntity.ok(heartbeatService.getStats());
    }
}
5. Scheduled Task Configuration
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import lombok.extern.slf4j.Slf4j;

@Slf4j
@Component
public class HeartbeatScheduledTasks {

    private final HighPerformanceHeartbeatService heartbeatService;

    public HeartbeatScheduledTasks(HighPerformanceHeartbeatService heartbeatService) {
        this.heartbeatService = heartbeatService;
    }

    /**
     * Clean up timed-out clients every 10 seconds.
     */
    @Scheduled(fixedRate = 10000, initialDelay = 5000)
    public void cleanTimeoutClients() {
        try {
            heartbeatService.cleanTimeoutClients();
        } catch (Exception e) {
            log.error("Failed to clean up timed-out clients", e);
        }
    }

    /**
     * Log statistics once a minute.
     */
    @Scheduled(cron = "0 * * * * ?")
    public void logStats() {
        try {
            HeartbeatStats stats = heartbeatService.getStats();
            log.info("Heartbeat stats - online: {}, local cache: {}",
                    stats.getOnlineCount(),
                    stats.getLocalCacheSize());
        } catch (Exception e) {
            log.error("Failed to log statistics", e);
        }
    }
}
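The @Scheduled annotations above take effect only if scheduling is enabled; Spring Boot does not switch it on by default. A one-line configuration class is enough (the class name is arbitrary):

import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableScheduling;

// Turns on processing of @Scheduled annotations application-wide
@Configuration
@EnableScheduling
public class SchedulingConfig {
}

Note that in the multi-instance deployment from the architecture diagram, every instance runs the cleanup task concurrently. The member-wise removal in cleanTimeoutClients keeps this safe for the sorted set itself, but offline events may occasionally be published twice; a distributed lock (e.g. ShedLock) is a common refinement if exactly-once events matter.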
6. Redis Configuration - Cluster Mode
# application.yml
spring:
  redis:    # note: Spring Boot 3.x relocates this block to spring.data.redis
    # Standalone mode
    # host: localhost
    # port: 6379

    # Sentinel mode
    # sentinel:
    #   master: mymaster
    #   nodes:
    #     - 192.168.1.1:26379
    #     - 192.168.1.2:26379
    #     - 192.168.1.3:26379

    # Cluster mode (recommended for 10,000+ concurrent clients)
    cluster:
      nodes:
        - 192.168.1.1:6379
        - 192.168.1.2:6379
        - 192.168.1.3:6379
        - 192.168.1.4:6379
        - 192.168.1.5:6379
        - 192.168.1.6:6379
      max-redirects: 3
    password: your_redis_password
    timeout: 3000ms
    lettuce:
      pool:
        max-active: 200     # maximum connections
        max-idle: 50        # maximum idle connections
        min-idle: 10        # minimum idle connections
        max-wait: 1000ms    # maximum wait for a connection
      shutdown-timeout: 100ms
      cluster:
        refresh:
          adaptive: true    # adaptive topology refresh
          period: 60s       # periodic refresh interval
  # Kafka configuration
  kafka:
    bootstrap-servers:
      - kafka1:9092
      - kafka2:9092
      - kafka3:9092
    producer:
      acks: 1               # 0 = fire and forget, 1 = leader ack, all = all in-sync replicas ack
      retries: 3
      batch-size: 16384
      buffer-memory: 33554432
      compression-type: lz4
      # note: the String (de)serializers live in the Kafka client, not in spring-kafka
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      group-id: heartbeat-log-group
      auto-offset-reset: earliest
      enable-auto-commit: false
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      max-poll-records: 500
  # Database connection pool
  datasource:
    type: com.zaxxer.hikari.HikariDataSource
    hikari:
      maximum-pool-size: 50
      minimum-idle: 10
      connection-timeout: 30000
      idle-timeout: 600000
      max-lifetime: 1800000

  # Scheduling and async executor pools
  task:
    scheduling:
      pool:
        size: 4
    execution:
      pool:
        core-size: 8
        max-size: 16
        queue-capacity: 100

# Custom settings
heartbeat:
  timeout-ms: 30000             # heartbeat timeout
  cleanup-batch-size: 500       # cleanup batch size
  cache-expire-seconds: 60      # local cache expiry
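The heartbeat.* block above is custom, and nothing in the service code reads it yet (the service hardcodes the same values as constants). A small @ConfigurationProperties binding, sketched below, would let the YAML drive them; the class name and field defaults are illustrative.

import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;
import lombok.Data;

// Binds the custom heartbeat.* block from application.yml (relaxed binding
// maps timeout-ms to timeoutMs, and so on)
@Data
@Component
@ConfigurationProperties(prefix = "heartbeat")
public class HeartbeatProperties {
    private long timeoutMs = 30000;       // heartbeat.timeout-ms
    private int cleanupBatchSize = 500;   // heartbeat.cleanup-batch-size
    private int cacheExpireSeconds = 60;  // heartbeat.cache-expire-seconds
}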
7. Maven Dependencies
<dependencies>
    <!-- Spring Boot -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <!-- Redis -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-data-redis</artifactId>
    </dependency>
    <!-- Lettuce connection pool support -->
    <dependency>
        <groupId>org.apache.commons</groupId>
        <artifactId>commons-pool2</artifactId>
    </dependency>
    <!-- Kafka -->
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>
    <!-- Guava (rate limiter) -->
    <dependency>
        <groupId>com.google.guava</groupId>
        <artifactId>guava</artifactId>
        <version>32.1.3-jre</version>
    </dependency>
    <!-- Lombok -->
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <optional>true</optional>
    </dependency>
    <!-- Jackson -->
    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-databind</artifactId>
    </dependency>
    <!-- JPA -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-data-jpa</artifactId>
    </dependency>
    <!-- MySQL -->
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>8.0.33</version>
    </dependency>
</dependencies>
8. Performance Tuning
8.1 Redis Tuning
# redis.conf tuning
# Memory cap
maxmemory 4gb
# Caution: allkeys-lru may evict the heartbeat keys themselves under memory
# pressure; volatile-lru or noeviction is safer for this workload
maxmemory-policy allkeys-lru

# Persistence (choose based on durability needs)
# RDB: good for backups, better performance
save 900 1
save 300 10
save 60 10000
# AOF: stronger durability, slightly lower performance
# appendonly yes
# appendfsync everysec

# Network tuning
tcp-backlog 511
timeout 0
tcp-keepalive 300

# Slow query log
slowlog-log-slower-than 10000
slowlog-max-len 128

# Client connection limit
maxclients 10000
8.2 JVM Tuning
# Startup flags
# (-XX:+AggressiveOpts and -XX:+UseFastAccessorMethods were dropped from modern
# JDKs and make the JVM refuse to start, so they are omitted here)
java -Xms2g -Xmx2g \
  -XX:+UseG1GC \
  -XX:MaxGCPauseMillis=200 \
  -XX:+ParallelRefProcEnabled \
  -Djava.net.preferIPv4Stack=true \
  -jar your-app.jar
9. Monitoring Metrics
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Timer;
import org.springframework.stereotype.Component;

@Component
public class HeartbeatMetrics {

    private final MeterRegistry meterRegistry;
    private final Timer heartbeatTimer;

    public HeartbeatMetrics(MeterRegistry meterRegistry) {
        this.meterRegistry = meterRegistry;
        this.heartbeatTimer = Timer.builder("heartbeat.process.time")
                .description("Heartbeat processing time")
                .register(meterRegistry);
    }

    public void recordHeartbeat() {
        meterRegistry.counter("heartbeat.received").increment();
    }

    public void recordOnline() {
        meterRegistry.counter("heartbeat.online").increment();
    }

    public void recordOffline() {
        meterRegistry.counter("heartbeat.offline").increment();
    }

    public Timer getHeartbeatTimer() {
        return heartbeatTimer;
    }
}
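Nothing above wires these metrics into the request path; a typical use is to count each request and time the service call inside the controller. A minimal sketch, using a SimpleMeterRegistry as a stand-in for the Spring-managed registry:

import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;

public class MetricsUsageExample {
    public static void main(String[] args) {
        MeterRegistry registry = new SimpleMeterRegistry(); // in production, inject the registry
        HeartbeatMetrics metrics = new HeartbeatMetrics(registry);
        metrics.recordHeartbeat();
        // Time a unit of work, e.g. heartbeatService.receiveHeartbeat(clientId, info)
        metrics.getHeartbeatTimer().record(() -> {
            // ... heartbeat processing ...
        });
        System.out.println(registry.get("heartbeat.received").counter().count()); // 1.0
    }
}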
10. Performance Test Baseline
Expected performance targets (single server)
- Throughput: 10,000 QPS
- Latency: P99 < 50 ms
- Memory footprint: < 2 GB
- Redis connections: 50-100
- CPU utilization: < 50%
Load-test command (JMeter or wrk)
# Load test with wrk; heartbeat.lua (not shown) must issue POST requests and
# substitute the {id} placeholder per request - wrk does not expand it by itself
wrk -t12 -c1000 -d30s --latency \
  -s heartbeat.lua \
  http://localhost:8080/api/v1/heartbeat/test-client-{id}