使用elysia出现的redis连接失败问题
近期我在开发我的服务端项目时,使用了 elysia 框架,但是在连接 redis 时出现了连接失败的问题。
发现官方内置的redis
插件连接不够稳定,时不时的出现了连接失败的问题。 于是在原基础上我换了一个ioredis
插件,俩者的对比如下图:
完整代码
ioredis 插件
import Redis from "ioredis";
// 自己的redis配置
import { redisConfig } from "@/config/redis";
// 日志工具
import { logger } from "@/utils/logger";
// 改进的Redis配置
const improvedRedisConfig = {
host: redisConfig.host,
port: redisConfig.port,
password: redisConfig.password,
username: redisConfig.username,
db: redisConfig.db,
// 连接配置
retryDelayOnFailover: 100,
maxRetriesPerRequest: 3,
lazyConnect: true,
// 重连配置
retryDelayOnClusterDown: 300,
// 超时配置
connectTimeout: 10000,
commandTimeout: 5000,
// 连接池配置
enableReadyCheck: true,
maxLoadingTimeout: 10000,
// 重连策略
retryStrategy: (times: number) => {
const delay = Math.min(times * 50, 2000);
return delay;
},
// 自动重连
reconnectOnError: (err: Error) => {
const targetError = "READONLY";
if (err.message.includes(targetError)) {
return true;
}
return false;
},
};
class RedisManager {
private client: Redis;
private isConnected: boolean = false;
private reconnectAttempts: number = 0;
private maxReconnectAttempts: number = 10;
private healthCheckTimer: NodeJS.Timeout | null = null;
constructor() {
this.client = new Redis(improvedRedisConfig);
this.setupEventHandlers();
this.startHealthCheck();
}
private setupEventHandlers() {
this.client.on("connect", () => {
logger.info("Redis 连接成功");
this.isConnected = true;
this.reconnectAttempts = 0;
});
this.client.on("ready", () => {
logger.info("Redis 准备就绪");
this.isConnected = true;
this.reconnectAttempts = 0;
});
this.client.on("error", error => {
logger.error("Redis 连接错误:" + error.message);
this.isConnected = false;
});
this.client.on("close", () => {
logger.warn("Redis 连接关闭");
this.isConnected = false;
});
this.client.on("reconnecting", () => {
logger.info("Redis 正在重连...");
this.reconnectAttempts++;
});
this.client.on("end", () => {
logger.warn("Redis 连接结束");
this.isConnected = false;
});
this.client.on("reconnect", () => {
logger.info("Redis 重连成功");
this.isConnected = true;
});
}
private startHealthCheck() {
// 每30秒检查一次连接状态
this.healthCheckTimer = setInterval(async () => {
if (this.isConnected) {
try {
await this.client.ping();
logger.info("Redis健康检查通过");
} catch {
logger.warn("Redis健康检查失败,标记为断开连接");
this.isConnected = false;
}
}
}, 30000);
}
// 健康检查
async ping(): Promise<string> {
return await this.client.ping();
}
// 获取连接状态
getConnectionStatus(): boolean {
return this.isConnected;
}
// 获取原始客户端
getClient(): Redis {
return this.client;
}
// 关闭连接
async close(): Promise<void> {
if (this.healthCheckTimer) {
clearInterval(this.healthCheckTimer);
}
await this.client.quit();
}
// 强制重连
async forceReconnect(): Promise<void> {
logger.info("强制重连Redis");
this.reconnectAttempts = 0;
await this.client.disconnect();
await this.client.connect();
}
}
// 创建Redis管理器实例
export const redisManager = new RedisManager();
// 导出便捷方法
export const client = redisManager.getClient();
// 包装Redis操作,添加重试机制
export async function withRetry<T>(
operation: () => Promise<T>,
maxRetries: number = 3,
delay: number = 1000
): Promise<T> {
for (let i = 0; i < maxRetries; i++) {
try {
return await operation();
} catch (error: any) {
if (i === maxRetries - 1) {
throw error;
}
logger.warn(`Redis操作失败,第${i + 1}次重试:` + error.message);
await new Promise(resolve => setTimeout(resolve, delay * Math.pow(2, i)));
}
}
throw new Error("Redis操作重试失败");
}
// 导出便捷的Redis操作方法
export const redisClient = {
// 基本操作
get: (key: string) => withRetry(() => client.get(key)),
set: (key: string, value: string) => withRetry(() => client.set(key, value)),
setex: (key: string, seconds: number, value: string) =>
withRetry(() => client.setex(key, seconds, value)),
del: (key: string) => withRetry(() => client.del(key)),
exists: (key: string) => withRetry(() => client.exists(key)),
expire: (key: string, seconds: number) =>
withRetry(() => client.expire(key, seconds)),
ttl: (key: string) => withRetry(() => client.ttl(key)),
// 批量操作
mset: (keyValues: Record<string, string>) =>
withRetry(() => client.mset(keyValues)),
mget: (keys: string[]) => withRetry(() => client.mget(keys)),
// 列表操作
lpush: (key: string, ...values: string[]) =>
withRetry(() => client.lpush(key, ...values)),
rpush: (key: string, ...values: string[]) =>
withRetry(() => client.rpush(key, ...values)),
lpop: (key: string) => withRetry(() => client.lpop(key)),
rpop: (key: string) => withRetry(() => client.rpop(key)),
lrange: (key: string, start: number, stop: number) =>
withRetry(() => client.lrange(key, start, stop)),
// 哈希操作
hset: (key: string, field: string, value: string) =>
withRetry(() => client.hset(key, field, value)),
hget: (key: string, field: string) =>
withRetry(() => client.hget(key, field)),
hgetall: (key: string) => withRetry(() => client.hgetall(key)),
hdel: (key: string, ...fields: string[]) =>
withRetry(() => client.hdel(key, ...fields)),
// 集合操作
sadd: (key: string, ...members: string[]) =>
withRetry(() => client.sadd(key, ...members)),
srem: (key: string, ...members: string[]) =>
withRetry(() => client.srem(key, ...members)),
smembers: (key: string) => withRetry(() => client.smembers(key)),
sismember: (key: string, member: string) =>
withRetry(() => client.sismember(key, member)),
// 有序集合操作
zadd: (key: string, score: number, member: string) =>
withRetry(() => client.zadd(key, score, member)),
zscore: (key: string, member: string) =>
withRetry(() => client.zscore(key, member)),
zrange: (key: string, start: number, stop: number, withScores?: boolean) =>
withRetry(() =>
withScores
? client.zrange(key, start, stop, "WITHSCORES")
: client.zrange(key, start, stop)
),
// 工具方法
ping: () => withRetry(() => client.ping()),
flushdb: () => withRetry(() => client.flushdb()),
info: () => withRetry(() => client.info()),
// 连接状态
getConnectionStatus: () => redisManager.getConnectionStatus(),
forceReconnect: () => redisManager.forceReconnect(),
close: () => redisManager.close(),
};
bun 内置的插件
我在原基础上进行了封装,添加了重试机制,连接状态管理,健康检查等功能。
// redis 配置
import { redisConfig } from "@/config/redis";
// 日志工具
import { logger } from "@/utils/logger";
// Redis连接状态管理
class RedisManager {
private client: any = null;
private isConnected: boolean = false;
private reconnectAttempts: number = 0;
private maxReconnectAttempts: number = 10;
private reconnectDelay: number = 1000;
private reconnectTimer: NodeJS.Timeout | null = null;
private healthCheckTimer: NodeJS.Timeout | null = null;
constructor() {
this.initRedis();
this.startHealthCheck();
}
private async initRedis() {
try {
// 动态导入Redis客户端
const { RedisClient } = await import("bun");
this.client = new RedisClient(redisConfig.url);
this.setupEventHandlers();
await this.connect();
} catch (error: any) {
logger.error("Redis初始化失败:" + error.message);
this.scheduleReconnect();
}
}
private setupEventHandlers() {
if (!this.client) return;
this.client.onconnect = () => {
logger.info("Redis 连接成功");
this.isConnected = true;
this.reconnectAttempts = 0;
};
this.client.onclose = () => {
logger.warn("Redis 连接关闭");
this.isConnected = false;
this.scheduleReconnect();
};
this.client.onerror = (error: any) => {
logger.error("Redis 连接错误:" + error.message);
this.isConnected = false;
};
}
private async connect() {
try {
if (this.client && !this.isConnected) {
await this.client.connect();
}
} catch (error: any) {
logger.error("Redis 连接失败:" + error.message);
this.scheduleReconnect();
}
}
private scheduleReconnect() {
if (this.reconnectTimer) {
clearTimeout(this.reconnectTimer);
}
if (this.reconnectAttempts < this.maxReconnectAttempts) {
this.reconnectAttempts++;
const delay =
this.reconnectDelay * Math.pow(2, this.reconnectAttempts - 1);
logger.info(
`Redis 将在 ${delay}ms 后尝试重连 (第 ${this.reconnectAttempts} 次)`
);
this.reconnectTimer = setTimeout(() => {
this.connect();
}, delay);
} else {
logger.error("Redis 重连次数已达上限,停止重连");
}
}
private startHealthCheck() {
// 每30秒检查一次连接状态
this.healthCheckTimer = setInterval(() => {
if (this.isConnected && this.client) {
this.ping().catch(() => {
logger.warn("Redis健康检查失败,标记为断开连接");
this.isConnected = false;
this.scheduleReconnect();
});
}
}, 30000);
}
private async ping(): Promise<string> {
if (!this.client) throw new Error("Redis客户端未初始化");
return await this.client.ping();
}
// 检查连接状态
private checkConnection(): boolean {
if (!this.client || !this.isConnected) {
logger.warn("Redis 未连接,尝试重连");
this.scheduleReconnect();
return false;
}
return true;
}
// 包装Redis操作,添加重试机制
private async executeWithRetry<T>(
operation: () => Promise<T>,
maxRetries: number = 3
): Promise<T> {
for (let i = 0; i < maxRetries; i++) {
try {
if (!this.checkConnection()) {
throw new Error("Redis未连接");
}
return await operation();
} catch (error: any) {
if (i === maxRetries - 1) {
throw error;
}
logger.warn(`Redis操作失败,第${i + 1}次重试:` + error.message);
await new Promise(resolve => setTimeout(resolve, 100 * (i + 1)));
}
}
throw new Error("Redis操作重试失败");
}
// Redis操作方法
async get(key: string): Promise<string | null> {
return this.executeWithRetry(async () => {
return await this.client.get(key);
});
}
async set(key: string, value: string): Promise<string | null> {
return this.executeWithRetry(async () => {
return await this.client.set(key, value);
});
}
async setex(
key: string,
seconds: number,
value: string
): Promise<string | null> {
return this.executeWithRetry(async () => {
return await this.client.setex(key, seconds, value);
});
}
async del(key: string): Promise<number> {
return this.executeWithRetry(async () => {
return await this.client.del(key);
});
}
async exists(key: string): Promise<number> {
return this.executeWithRetry(async () => {
return await this.client.exists(key);
});
}
async expire(key: string, seconds: number): Promise<number> {
return this.executeWithRetry(async () => {
return await this.client.expire(key, seconds);
});
}
async ttl(key: string): Promise<number> {
return this.executeWithRetry(async () => {
return await this.client.ttl(key);
});
}
// 获取连接状态
getConnectionStatus(): boolean {
return this.isConnected;
}
// 手动重连
async forceReconnect(): Promise<void> {
logger.info("强制重连Redis");
this.reconnectAttempts = 0;
if (this.reconnectTimer) {
clearTimeout(this.reconnectTimer);
}
await this.connect();
}
// 关闭连接
async close(): Promise<void> {
if (this.reconnectTimer) {
clearTimeout(this.reconnectTimer);
}
if (this.healthCheckTimer) {
clearInterval(this.healthCheckTimer);
}
if (this.client) {
try {
await this.client.quit();
} catch (error: any) {
logger.error("关闭Redis连接时出错:" + error.message);
}
}
}
}
// 创建Redis管理器实例
export const redisManager = new RedisManager();
// 导出便捷方法
export const client = {
get: (key: string) => redisManager.get(key),
set: (key: string, value: string) => redisManager.set(key, value),
setex: (key: string, seconds: number, value: string) =>
redisManager.setex(key, seconds, value),
del: (key: string) => redisManager.del(key),
exists: (key: string) => redisManager.exists(key),
expire: (key: string, seconds: number) => redisManager.expire(key, seconds),
ttl: (key: string) => redisManager.ttl(key),
getConnectionStatus: () => redisManager.getConnectionStatus(),
forceReconnect: () => redisManager.forceReconnect(),
close: () => redisManager.close(),
};