package com.archive.plugin;
import java.util.List;
import java.util.Map;
import java.util.StringJoiner;
import java.util.regex.Pattern;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Service;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.transaction.support.TransactionTemplate;
/**
 * Moves rows from a live table into its {@code <table>_history} companion table.
 *
 * <p>Two strategies are offered:
 * <ul>
 *   <li>{@link #archiveByRename(String)} swaps the whole table out and puts an empty
 *       clone in its place;</li>
 *   <li>{@link #archiveByPagination(String, String)} copies and deletes matching rows
 *       page by page, persisting a resume cursor in Redis.</li>
 * </ul>
 *
 * <p>The SQL issued here ({@code RENAME TABLE}, {@code CREATE TABLE ... LIKE}, backtick
 * quoting) is MySQL syntax.
 */
@Slf4j
@Service
public class ArchivePlugin {

    /** Maximum number of rows selected and moved per page. */
    private static final int PAGE_SIZE = 1000;

    /** Suffix appended to the source table name to form the history table name. */
    private static final String HISTORY_SUFFIX = "_history";

    /** Suffix of the short-lived empty table used by the atomic rename swap. */
    private static final String STAGING_SUFFIX = "_archive_tmp";

    /**
     * Characters MySQL permits in an unquoted identifier (ASCII letters, digits,
     * {@code _}, {@code $} and U+0080..U+FFFF), with an optional {@code schema.} prefix.
     * Table names are concatenated into SQL, so anything else is rejected.
     */
    private static final Pattern TABLE_NAME_PATTERN = Pattern.compile(
            "[0-9A-Za-z_$\\u0080-\\uFFFF]+(\\.[0-9A-Za-z_$\\u0080-\\uFFFF]+)?");

    @Autowired
    private JdbcTemplate jdbcTemplate;

    @Autowired
    private StringRedisTemplate redisTemplate;

    /**
     * Used to open transactions programmatically: a call from one method of this bean to
     * another does not pass through the Spring proxy, so {@code @Transactional} alone
     * would not take effect for internal calls.
     */
    @Autowired
    private PlatformTransactionManager transactionManager;

    /**
     * Mode 1: archive by rename.
     *
     * <p>Creates an empty clone of {@code tableName}, then in a single
     * {@code RENAME TABLE} statement moves the current table to
     * {@code <tableName>_history} and the clone to {@code tableName}, so the live name
     * never disappears. If the swap fails (for example because the history table already
     * exists) the empty clone is dropped again and the error is rethrown.
     *
     * @param tableName name of the source table (optionally schema-qualified)
     * @throws IllegalArgumentException if {@code tableName} is not a plain identifier
     */
    public void archiveByRename(String tableName) {
        requireValidTableName(tableName);
        String historyTable = tableName + HISTORY_SUFFIX;
        String stagingTable = tableName + STAGING_SUFFIX;
        log.info("开始 RENAME 归档: {} -> {}", tableName, historyTable);

        // Build the replacement first; plain CREATE (no IF NOT EXISTS) so that a name
        // collision with an unrelated table fails loudly instead of being reused.
        jdbcTemplate.execute("CREATE TABLE " + stagingTable + " LIKE " + tableName);
        try {
            // Multi-table RENAME is a single statement, so both renames succeed or fail
            // together and the live name is always resolvable.
            jdbcTemplate.execute("RENAME TABLE " + tableName + " TO " + historyTable
                    + ", " + stagingTable + " TO " + tableName);
        } catch (RuntimeException e) {
            // The staging table was created by this call and is empty: safe to drop.
            try {
                jdbcTemplate.execute("DROP TABLE IF EXISTS " + stagingTable);
            } catch (RuntimeException cleanupFailure) {
                e.addSuppressed(cleanupFailure);
            }
            throw e;
        }
        log.info("表已重命名: {} -> {}", tableName, historyTable);
        log.info("新表已创建: {}", tableName);
    }

    /**
     * Mode 2: archive by paginated migration.
     *
     * <p>Repeatedly selects up to {@link #PAGE_SIZE} rows with {@code id} greater than
     * the cursor that satisfy {@code whereCondition}, moves each page to the history
     * table inside one transaction, then stores the last processed id in Redis under
     * {@code archive:cursor:<tableName>}. The cursor is removed once no rows remain.
     *
     * <p><strong>Security:</strong> {@code whereCondition} is embedded verbatim into the
     * SQL text. It must come from trusted configuration, never from end-user input.
     *
     * <p>NOTE(review): the cursor key depends only on the table name. A cursor left
     * behind by an interrupted run would also be applied to a later run that uses a
     * different condition, skipping lower ids; confirm whether callers rely on that.
     *
     * @param tableName      name of the source table (optionally schema-qualified)
     * @param whereCondition SQL WHERE condition, e.g. {@code "created_at < '2024-01-01'"}
     * @throws IllegalArgumentException if {@code tableName} is not a plain identifier or
     *                                  {@code whereCondition} is null or blank
     * @throws IllegalStateException    if a selected row has no numeric {@code id}
     */
    public void archiveByPagination(String tableName, String whereCondition) {
        requireValidTableName(tableName);
        if (whereCondition == null || whereCondition.trim().isEmpty()) {
            throw new IllegalArgumentException("whereCondition must not be null or blank");
        }
        String historyTable = tableName + HISTORY_SUFFIX;
        String cursorKey = "archive:cursor:" + tableName;
        log.info("开始分页归档: {} -> {}, 条件: {}", tableName, historyTable, whereCondition);

        ensureHistoryTableExists(tableName, historyTable);

        TransactionTemplate transactionTemplate = new TransactionTemplate(transactionManager);
        long lastId = getLastId(cursorKey);
        int totalArchived = 0;
        while (true) {
            String sql = String.format(
                    "SELECT * FROM %s WHERE id > %d AND (%s) ORDER BY id ASC LIMIT %d",
                    tableName, lastId, whereCondition, PAGE_SIZE);
            List<Map<String, Object>> rows = jdbcTemplate.queryForList(sql);
            if (rows.isEmpty()) {
                log.info("归档完成,共归档 {} 条记录", totalArchived);
                break;
            }

            // Run the batch inside an explicit transaction: calling archiveBatch directly
            // on "this" would skip the @Transactional proxy.
            Integer archived = transactionTemplate.execute(
                    status -> archiveBatch(tableName, historyTable, rows));
            totalArchived += (archived == null ? 0 : archived);

            // Advance the cursor only after the batch has committed.
            lastId = requireId(rows.get(rows.size() - 1)).longValue();
            saveLastId(cursorKey, lastId);
            log.info("已归档 {} 条,当前游标 lastId: {}", totalArchived, lastId);
        }
        // Finished: drop the resume cursor.
        redisTemplate.delete(cursorKey);
    }

    /**
     * Moves one batch of rows: each row is inserted into {@code historyTable} and then
     * deleted from {@code tableName} by {@code id}.
     *
     * <p>When invoked through the Spring proxy this runs in a transaction that rolls
     * back on any exception; {@link #archiveByPagination(String, String)} wraps its own
     * internal calls in a transaction explicitly.
     *
     * @param tableName    source table the rows are deleted from
     * @param historyTable destination table the rows are inserted into
     * @param rows         rows to move, keyed by column name; each must contain a
     *                     numeric {@code id}
     * @return number of rows processed ({@code rows.size()})
     * @throws IllegalArgumentException if either table name is not a plain identifier
     * @throws IllegalStateException    if a row has no numeric {@code id}
     */
    @Transactional(rollbackFor = Exception.class)
    public int archiveBatch(String tableName, String historyTable, List<Map<String, Object>> rows) {
        requireValidTableName(tableName);
        requireValidTableName(historyTable);
        String deleteSql = "DELETE FROM " + tableName + " WHERE id = ?";
        for (Map<String, Object> row : rows) {
            // Validate the key before writing anything for this row.
            Number id = requireId(row);
            insertIntoHistory(historyTable, row);
            // Bind the id as returned by the driver (Integer, Long, BigInteger, ...)
            // rather than narrowing it.
            jdbcTemplate.update(deleteSql, id);
        }
        return rows.size();
    }

    /**
     * Inserts a single row into the history table, using the row's keys as column names
     * and binding its values as statement parameters.
     */
    private void insertIntoHistory(String historyTable, Map<String, Object> row) {
        StringJoiner columns = new StringJoiner(", ");
        StringJoiner placeholders = new StringJoiner(", ");
        Object[] params = new Object[row.size()];
        int i = 0;
        for (Map.Entry<String, Object> entry : row.entrySet()) {
            // Quote column names so reserved words (order, key, ...) remain valid SQL.
            columns.add(quoteIdentifier(entry.getKey()));
            placeholders.add("?");
            params[i++] = entry.getValue();
        }
        String sql = "INSERT INTO " + historyTable + " (" + columns + ") VALUES (" + placeholders + ")";
        jdbcTemplate.update(sql, params);
    }

    /**
     * Makes sure the history table exists, cloning the source table's structure if it
     * does not. A single {@code CREATE TABLE IF NOT EXISTS} avoids both the
     * check-then-create race and {@code LIKE} pattern matching, in which {@code _}
     * is a wildcard.
     */
    private void ensureHistoryTableExists(String tableName, String historyTable) {
        jdbcTemplate.execute("CREATE TABLE IF NOT EXISTS " + historyTable + " LIKE " + tableName);
        log.info("历史表已就绪: {}", historyTable);
    }

    /**
     * Reads the resume cursor from Redis.
     *
     * @return the stored id, or {@code 0} when no cursor is stored
     */
    private long getLastId(String key) {
        String val = redisTemplate.opsForValue().get(key);
        return val == null ? 0L : Long.parseLong(val);
    }

    /** Stores the resume cursor in Redis as a decimal string. */
    private void saveLastId(String key, long lastId) {
        redisTemplate.opsForValue().set(key, String.valueOf(lastId));
    }

    /** Rejects table names that are not plain (optionally schema-qualified) identifiers. */
    private static void requireValidTableName(String tableName) {
        if (tableName == null || !TABLE_NAME_PATTERN.matcher(tableName).matches()) {
            throw new IllegalArgumentException("Invalid table name: " + tableName);
        }
    }

    /** Returns the row's {@code id} as a number, failing clearly when it is absent. */
    private static Number requireId(Map<String, Object> row) {
        Object id = row.get("id");
        if (!(id instanceof Number)) {
            throw new IllegalStateException("Row has no numeric 'id' column, found: " + id);
        }
        return (Number) id;
    }

    /** Wraps an identifier in backticks, doubling any backtick it contains. */
    private static String quoteIdentifier(String identifier) {
        return "`" + identifier.replace("`", "``") + "`";
    }
}