迁移播放记录到新数据结构,新增自动清理播放记录

This commit is contained in:
mtvpls
2025-12-30 21:00:10 +08:00
parent 6ad268d319
commit f43d9c7d0e
7 changed files with 381 additions and 29 deletions

View File

@@ -201,6 +201,13 @@ export async function POST(req: NextRequest) {
}
}
}
// 标记播放记录已迁移新导入的数据直接使用hash结构
const storage = (db as any).storage;
if (storage && typeof storage.client?.hSet === 'function') {
const userInfoKey = `user:${username}:info`;
await storage.client.hSet(userInfoKey, 'playrecord_migrated', 'true');
}
}
return NextResponse.json({

View File

@@ -26,6 +26,12 @@ export async function GET(request: NextRequest) {
if (userInfoV2.banned) {
return NextResponse.json({ error: '用户已被封禁' }, { status: 401 });
}
// 检查播放记录迁移标识,没有迁移标识时执行迁移
if (!userInfoV2.playrecord_migrated) {
console.log(`用户 ${authInfo.username} 播放记录未迁移,开始执行迁移...`);
await db.migratePlayRecords(authInfo.username);
}
}
const records = await db.getAllPlayRecords(authInfo.username);
@@ -92,6 +98,11 @@ export async function POST(request: NextRequest) {
await db.savePlayRecord(authInfo.username, source, id, finalRecord);
// 异步清理旧的播放记录(不阻塞响应)
(db as any).storage.cleanupOldPlayRecords(authInfo.username).catch((err: Error) => {
console.error('异步清理播放记录失败:', err);
});
return NextResponse.json({ success: true }, { status: 200 });
} catch (err) {
console.error('保存播放记录失败', err);

View File

@@ -182,6 +182,7 @@ export class DbManager {
oidcSub?: string;
enabledApis?: string[];
created_at: number;
playrecord_migrated?: boolean;
} | null> {
if (typeof (this.storage as any).getUserInfoV2 === 'function') {
return (this.storage as any).getUserInfoV2(userName);
@@ -259,6 +260,13 @@ export class DbManager {
return [];
}
// ---------- 播放记录迁移 ----------
async migratePlayRecords(userName: string): Promise<void> {
if (typeof (this.storage as any).migratePlayRecords === 'function') {
await (this.storage as any).migratePlayRecords(userName);
}
}
// ---------- 数据迁移 ----------
async migrateUsersFromConfig(adminConfig: AdminConfig): Promise<void> {
if (typeof (this.storage as any).createUserV2 !== 'function') {

View File

@@ -17,6 +17,9 @@ function ensureStringArray(value: any[]): string[] {
return value.map((item) => String(item));
}
// 内存锁:用于防止同一用户的并发播放记录操作(迁移、清理等)
const playRecordLocks = new Map<string, Promise<void>>();
// 连接配置接口
export interface RedisConnectionConfig {
url: string;
@@ -151,7 +154,12 @@ export abstract class BaseRedisStorage implements IStorage {
}
// ---------- 播放记录 ----------
private prKey(user: string, key: string) {
private prHashKey(user: string) {
return `u:${user}:pr`; // u:username:pr (hash结构)
}
// 旧版播放记录key用于迁移
private prOldKey(user: string, key: string) {
return `u:${user}:pr:${key}`; // u:username:pr:source+id
}
@@ -160,7 +168,7 @@ export abstract class BaseRedisStorage implements IStorage {
key: string
): Promise<PlayRecord | null> {
const val = await this.withRetry(() =>
this.client.get(this.prKey(userName, key))
this.client.hGet(this.prHashKey(userName), key)
);
return val ? (JSON.parse(val) as PlayRecord) : null;
}
@@ -171,32 +179,178 @@ export abstract class BaseRedisStorage implements IStorage {
record: PlayRecord
): Promise<void> {
await this.withRetry(() =>
this.client.set(this.prKey(userName, key), JSON.stringify(record))
this.client.hSet(this.prHashKey(userName), key, JSON.stringify(record))
);
}
async getAllPlayRecords(
userName: string
): Promise<Record<string, PlayRecord>> {
const pattern = `u:${userName}:pr:*`;
const keys: string[] = await this.withRetry(() => this.client.keys(pattern));
if (keys.length === 0) return {};
const values = await this.withRetry(() => this.client.mGet(keys));
const hashData = await this.withRetry(() =>
this.client.hGetAll(this.prHashKey(userName))
);
const result: Record<string, PlayRecord> = {};
keys.forEach((fullKey: string, idx: number) => {
const raw = values[idx];
if (raw) {
const rec = JSON.parse(raw) as PlayRecord;
// 截取 source+id 部分
const keyPart = ensureString(fullKey.replace(`u:${userName}:pr:`, ''));
result[keyPart] = rec;
for (const [key, value] of Object.entries(hashData)) {
if (value) {
result[key] = JSON.parse(value) as PlayRecord;
}
});
}
return result;
}
async deletePlayRecord(userName: string, key: string): Promise<void> {
await this.withRetry(() => this.client.del(this.prKey(userName, key)));
await this.withRetry(() => this.client.hDel(this.prHashKey(userName), key));
}
// 清理超出限制的旧播放记录
async cleanupOldPlayRecords(userName: string): Promise<void> {
// 检查是否已有正在进行的操作
const existingLock = playRecordLocks.get(userName);
if (existingLock) {
console.log(`用户 ${userName} 的播放记录操作正在进行中,跳过清理`);
await existingLock;
return;
}
// 创建新的操作Promise
const cleanupPromise = this.doCleanup(userName);
playRecordLocks.set(userName, cleanupPromise);
try {
await cleanupPromise;
} finally {
// 操作完成后清除锁
playRecordLocks.delete(userName);
}
}
// 实际执行清理的方法
private async doCleanup(userName: string): Promise<void> {
try {
// 获取配置的最大播放记录数默认100
const maxRecords = parseInt(process.env.MAX_PLAY_RECORDS_PER_USER || '100', 10);
const threshold = maxRecords + 10; // 超过最大值+10时才触发清理
// 获取所有播放记录
const allRecords = await this.getAllPlayRecords(userName);
const recordCount = Object.keys(allRecords).length;
// 如果记录数未超过阈值,不需要清理
if (recordCount <= threshold) {
return;
}
console.log(`用户 ${userName} 的播放记录数 ${recordCount} 超过阈值 ${threshold},开始清理...`);
// 将记录转换为数组并按 save_time 排序(从旧到新)
const sortedRecords = Object.entries(allRecords).sort(
([, a], [, b]) => a.save_time - b.save_time
);
// 计算需要删除的记录数
const deleteCount = recordCount - maxRecords;
// 删除最旧的记录
const recordsToDelete = sortedRecords.slice(0, deleteCount);
for (const [key] of recordsToDelete) {
await this.deletePlayRecord(userName, key);
}
console.log(`已删除用户 ${userName}${deleteCount} 条最旧播放记录`);
} catch (error) {
console.error(`清理用户 ${userName} 播放记录失败:`, error);
// 清理失败不影响主流程,只记录错误
}
}
// 迁移播放记录从旧的多key结构迁移到新的hash结构
async migratePlayRecords(userName: string): Promise<void> {
// 检查是否已有正在进行的迁移
const existingMigration = migrationLocks.get(userName);
if (existingMigration) {
console.log(`用户 ${userName} 的播放记录正在迁移中,等待完成...`);
await existingMigration;
return;
}
// 创建新的迁移Promise
const migrationPromise = this.doMigration(userName);
migrationLocks.set(userName, migrationPromise);
try {
await migrationPromise;
} finally {
// 迁移完成后清除锁
migrationLocks.delete(userName);
}
}
// 实际执行迁移的方法
private async doMigration(userName: string): Promise<void> {
console.log(`开始迁移用户 ${userName} 的播放记录...`);
// 1. 检查是否已经迁移过
const userInfo = await this.getUserInfoV2(userName);
if (userInfo?.playrecord_migrated) {
console.log(`用户 ${userName} 的播放记录已经迁移过,跳过`);
return;
}
// 2. 获取旧结构的所有播放记录key
const pattern = `u:${userName}:pr:*`;
const oldKeys: string[] = await this.withRetry(() => this.client.keys(pattern));
if (oldKeys.length === 0) {
console.log(`用户 ${userName} 没有旧的播放记录,标记为已迁移`);
// 即使没有数据也标记为已迁移
await this.withRetry(() =>
this.client.hSet(this.userInfoKey(userName), 'playrecord_migrated', 'true')
);
// 清除用户信息缓存
const { userInfoCache } = await import('./user-cache');
userInfoCache?.delete(userName);
return;
}
console.log(`找到 ${oldKeys.length} 条旧播放记录,开始迁移...`);
// 3. 批量获取旧数据
const oldValues = await this.withRetry(() => this.client.mGet(oldKeys));
// 4. 转换为hash格式
const hashData: Record<string, string> = {};
oldKeys.forEach((fullKey: string, idx: number) => {
const raw = oldValues[idx];
if (raw) {
// 提取 source+id 部分作为hash的field
const keyPart = ensureString(fullKey.replace(`u:${userName}:pr:`, ''));
hashData[keyPart] = raw;
}
});
// 5. 写入新的hash结构
if (Object.keys(hashData).length > 0) {
await this.withRetry(() =>
this.client.hSet(this.prHashKey(userName), hashData)
);
console.log(`成功迁移 ${Object.keys(hashData).length} 条播放记录到hash结构`);
}
// 6. 删除旧的key
await this.withRetry(() => this.client.del(oldKeys));
console.log(`删除了 ${oldKeys.length} 个旧的播放记录key`);
// 7. 标记迁移完成
await this.withRetry(() =>
this.client.hSet(this.userInfoKey(userName), 'playrecord_migrated', 'true')
);
// 8. 清除用户信息缓存,确保下次获取时能读取到最新的迁移标识
const { userInfoCache } = await import('./user-cache');
userInfoCache?.delete(userName);
console.log(`用户 ${userName} 的播放记录迁移完成`);
}
// ---------- 收藏 ----------
@@ -286,7 +440,10 @@ export abstract class BaseRedisStorage implements IStorage {
// 删除搜索历史
await this.withRetry(() => this.client.del(this.shKey(userName)));
// 删除播放记录
// 删除播放记录新hash结构
await this.withRetry(() => this.client.del(this.prHashKey(userName)));
// 删除旧的播放记录key如果有
const playRecordPattern = `u:${userName}:pr:*`;
const playRecordKeys = await this.withRetry(() =>
this.client.keys(playRecordPattern)
@@ -407,6 +564,7 @@ export abstract class BaseRedisStorage implements IStorage {
oidcSub?: string;
enabledApis?: string[];
created_at: number;
playrecord_migrated?: boolean;
} | null> {
const userInfo = await this.withRetry(() =>
this.client.hGetAll(this.userInfoKey(userName))
@@ -423,6 +581,7 @@ export abstract class BaseRedisStorage implements IStorage {
oidcSub: userInfo.oidcSub,
enabledApis: userInfo.enabledApis ? JSON.parse(userInfo.enabledApis) : undefined,
created_at: parseInt(userInfo.created_at || '0', 10),
playrecord_migrated: userInfo.playrecord_migrated === 'true',
};
}

View File

@@ -39,6 +39,8 @@ export interface IStorage {
): Promise<void>;
getAllPlayRecords(userName: string): Promise<{ [key: string]: PlayRecord }>;
deletePlayRecord(userName: string, key: string): Promise<void>;
// 清理超出限制的旧播放记录
cleanupOldPlayRecords(userName: string): Promise<void>;
// 收藏相关
getFavorite(userName: string, key: string): Promise<Favorite | null>;

View File

@@ -18,6 +18,9 @@ function ensureStringArray(value: any[]): string[] {
return value.map((item) => String(item));
}
// 内存锁:用于防止同一用户的并发播放记录操作(迁移、清理等)
const playRecordLocks = new Map<string, Promise<void>>();
// 添加Upstash Redis操作重试包装器
async function withRetry<T>(
operation: () => Promise<T>,
@@ -62,7 +65,12 @@ export class UpstashRedisStorage implements IStorage {
}
// ---------- 播放记录 ----------
private prKey(user: string, key: string) {
private prHashKey(user: string) {
return `u:${user}:pr`; // u:username:pr (hash结构)
}
// 旧版播放记录key用于迁移
private prOldKey(user: string, key: string) {
return `u:${user}:pr:${key}`; // u:username:pr:source+id
}
@@ -71,7 +79,7 @@ export class UpstashRedisStorage implements IStorage {
key: string
): Promise<PlayRecord | null> {
const val = await withRetry(() =>
this.client.get(this.prKey(userName, key))
this.client.hget(this.prHashKey(userName), key)
);
return val ? (val as PlayRecord) : null;
}
@@ -81,30 +89,174 @@ export class UpstashRedisStorage implements IStorage {
key: string,
record: PlayRecord
): Promise<void> {
await withRetry(() => this.client.set(this.prKey(userName, key), record));
await withRetry(() => this.client.hset(this.prHashKey(userName), { [key]: record }));
}
async getAllPlayRecords(
userName: string
): Promise<Record<string, PlayRecord>> {
const pattern = `u:${userName}:pr:*`;
const keys: string[] = await withRetry(() => this.client.keys(pattern));
if (keys.length === 0) return {};
const hashData = await withRetry(() =>
this.client.hgetall(this.prHashKey(userName))
);
if (!hashData || Object.keys(hashData).length === 0) return {};
const result: Record<string, PlayRecord> = {};
for (const fullKey of keys) {
const value = await withRetry(() => this.client.get(fullKey));
for (const [key, value] of Object.entries(hashData)) {
if (value) {
// 截取 source+id 部分
const keyPart = ensureString(fullKey.replace(`u:${userName}:pr:`, ''));
result[keyPart] = value as PlayRecord;
result[key] = value as PlayRecord;
}
}
return result;
}
async deletePlayRecord(userName: string, key: string): Promise<void> {
await withRetry(() => this.client.del(this.prKey(userName, key)));
await withRetry(() => this.client.hdel(this.prHashKey(userName), key));
}
// 清理超出限制的旧播放记录
async cleanupOldPlayRecords(userName: string): Promise<void> {
// 检查是否已有正在进行的操作
const existingLock = playRecordLocks.get(userName);
if (existingLock) {
console.log(`用户 ${userName} 的播放记录操作正在进行中,跳过清理`);
await existingLock;
return;
}
// 创建新的操作Promise
const cleanupPromise = this.doCleanup(userName);
playRecordLocks.set(userName, cleanupPromise);
try {
await cleanupPromise;
} finally {
// 操作完成后清除锁
playRecordLocks.delete(userName);
}
}
// 实际执行清理的方法
private async doCleanup(userName: string): Promise<void> {
try {
// 获取配置的最大播放记录数默认100
const maxRecords = parseInt(process.env.MAX_PLAY_RECORDS_PER_USER || '100', 10);
const threshold = maxRecords + 10; // 超过最大值+10时才触发清理
// 获取所有播放记录
const allRecords = await this.getAllPlayRecords(userName);
const recordCount = Object.keys(allRecords).length;
// 如果记录数未超过阈值,不需要清理
if (recordCount <= threshold) {
return;
}
console.log(`用户 ${userName} 的播放记录数 ${recordCount} 超过阈值 ${threshold},开始清理...`);
// 将记录转换为数组并按 save_time 排序(从旧到新)
const sortedRecords = Object.entries(allRecords).sort(
([, a], [, b]) => a.save_time - b.save_time
);
// 计算需要删除的记录数
const deleteCount = recordCount - maxRecords;
// 删除最旧的记录
const recordsToDelete = sortedRecords.slice(0, deleteCount);
for (const [key] of recordsToDelete) {
await this.deletePlayRecord(userName, key);
}
console.log(`已删除用户 ${userName}${deleteCount} 条最旧播放记录`);
} catch (error) {
console.error(`清理用户 ${userName} 播放记录失败:`, error);
// 清理失败不影响主流程,只记录错误
}
}
// 迁移播放记录从旧的多key结构迁移到新的hash结构
async migratePlayRecords(userName: string): Promise<void> {
// 检查是否已有正在进行的迁移
const existingMigration = playRecordLocks.get(userName);
if (existingMigration) {
console.log(`用户 ${userName} 的播放记录正在迁移中,等待完成...`);
await existingMigration;
return;
}
// 创建新的迁移Promise
const migrationPromise = this.doMigration(userName);
playRecordLocks.set(userName, migrationPromise);
try {
await migrationPromise;
} finally {
// 迁移完成后清除锁
playRecordLocks.delete(userName);
}
}
// 实际执行迁移的方法
private async doMigration(userName: string): Promise<void> {
console.log(`开始迁移用户 ${userName} 的播放记录...`);
// 1. 检查是否已经迁移过
const userInfo = await this.getUserInfoV2(userName);
if (userInfo?.playrecord_migrated) {
console.log(`用户 ${userName} 的播放记录已经迁移过,跳过`);
return;
}
// 2. 获取旧结构的所有播放记录key
const pattern = `u:${userName}:pr:*`;
const oldKeys: string[] = await withRetry(() => this.client.keys(pattern));
if (oldKeys.length === 0) {
console.log(`用户 ${userName} 没有旧的播放记录,标记为已迁移`);
// 即使没有数据也标记为已迁移
await withRetry(() =>
this.client.hset(this.userInfoKey(userName), { playrecord_migrated: true })
);
// 清除用户信息缓存
userInfoCache?.delete(userName);
return;
}
console.log(`找到 ${oldKeys.length} 条旧播放记录,开始迁移...`);
// 3. 批量获取旧数据并转换为hash格式
const hashData: Record<string, any> = {};
for (const fullKey of oldKeys) {
const value = await withRetry(() => this.client.get(fullKey));
if (value) {
// 提取 source+id 部分作为hash的field
const keyPart = ensureString(fullKey.replace(`u:${userName}:pr:`, ''));
hashData[keyPart] = value;
}
}
// 4. 写入新的hash结构
if (Object.keys(hashData).length > 0) {
await withRetry(() =>
this.client.hset(this.prHashKey(userName), hashData)
);
console.log(`成功迁移 ${Object.keys(hashData).length} 条播放记录到hash结构`);
}
// 5. 删除旧的key
await withRetry(() => this.client.del(...oldKeys));
console.log(`删除了 ${oldKeys.length} 个旧的播放记录key`);
// 6. 标记迁移完成
await withRetry(() =>
this.client.hset(this.userInfoKey(userName), { playrecord_migrated: true })
);
// 7. 清除用户信息缓存,确保下次获取时能读取到最新的迁移标识
userInfoCache?.delete(userName);
console.log(`用户 ${userName} 的播放记录迁移完成`);
}
// ---------- 收藏 ----------
@@ -322,6 +474,7 @@ export class UpstashRedisStorage implements IStorage {
oidcSub?: string;
enabledApis?: string[];
created_at: number;
playrecord_migrated?: boolean;
} | null> {
// 先从缓存获取
const cached = userInfoCache?.get(userName);
@@ -345,6 +498,16 @@ export class UpstashRedisStorage implements IStorage {
banned = userInfo.banned === 'true';
}
// 处理 playrecord_migrated 字段
let playrecord_migrated: boolean | undefined = undefined;
if (userInfo.playrecord_migrated !== undefined) {
if (typeof userInfo.playrecord_migrated === 'boolean') {
playrecord_migrated = userInfo.playrecord_migrated;
} else if (typeof userInfo.playrecord_migrated === 'string') {
playrecord_migrated = userInfo.playrecord_migrated === 'true';
}
}
// 安全解析 tags 字段
let tags: string[] | undefined = undefined;
if (userInfo.tags) {
@@ -382,6 +545,7 @@ export class UpstashRedisStorage implements IStorage {
oidcSub: userInfo.oidcSub as string | undefined,
enabledApis,
created_at: parseInt((userInfo.created_at as string) || '0', 10),
playrecord_migrated,
};
// 存入缓存

View File

@@ -8,6 +8,7 @@ interface CachedUserInfo {
oidcSub?: string;
enabledApis?: string[];
created_at: number;
playrecord_migrated?: boolean;
cachedAt: number;
}