In an AI model production platform, file storage covers a wide mix of content: images, model files (.pt/.h5), training data, user datasets (CSV/ZIP), annotated datasets (COCO format), and more. Model files average 20-50GB each, and raw datasets commonly exceed 100GB even after compression. At this scale, traditional single-file upload schemes have clear pain points: a single network interruption forces the whole file to be re-uploaded, long-running requests are prone to timeouts, and there is no way to resume an interrupted transfer.
Technical Workflow of Chunked Upload for Large Files
Key Technical Characteristics
# Pick a chunk size from the measured bandwidth (assumed to be in Mbps); returns bytes.
def dynamic_chunk_size(bandwidth):
    if bandwidth < 10:
        return 5 * 1024 * 1024      # slow link: 5MB chunks
    elif bandwidth < 100:
        return 20 * 1024 * 1024     # medium link: 20MB chunks
    else:
        return 200 * 1024 * 1024    # fast link: 200MB chunks
// Slice the file on the front end into fixed-size chunks.
const sliceFile = async (file) => {
  // Files over 1GB use 200MB chunks; smaller files use 5MB chunks.
  const chunkSize = file.size > 1024 * 1024 * 1024 ? 200 * 1024 * 1024 : 5 * 1024 * 1024;
  const chunks = [];
  let offset = 0;
  while (offset < file.size) {
    const chunk = file.slice(offset, offset + chunkSize);
    chunks.push({
      index: Math.floor(offset / chunkSize),
      blob: chunk,                        // chunk payload to upload
      hash: await calculateMD5(chunk)     // per-chunk fingerprint for verification/dedup
    });
    offset += chunkSize;
  }
  return chunks;
};
CREATE TABLE upload_task (
    task_id VARCHAR(64) PRIMARY KEY COMMENT 'Unique task identifier (UUIDv4)',
    file_hash CHAR(40) NOT NULL COMMENT 'SHA-1 fingerprint of the whole file (uppercase hex string)',
    total_chunks INT NOT NULL COMMENT 'Total number of chunks (1-65535)',
    completed_chunks LONGTEXT NOT NULL COMMENT 'Indexes of uploaded chunks (JSON array, e.g. [1,2,3])',
    status ENUM('PENDING','UPLOADING','MERGING','COMPLETED') NOT NULL DEFAULT 'PENDING' COMMENT 'Task status: pending / uploading / merging / completed',
    created_at DATETIME(3) NOT NULL DEFAULT CURRENT_TIMESTAMP(3) COMMENT 'Task creation time (millisecond precision)',
    updated_at DATETIME(3) NOT NULL DEFAULT CURRENT_TIMESTAMP(3) ON UPDATE CURRENT_TIMESTAMP(3) COMMENT 'Last update time',
    expire_time DATETIME COMMENT 'Task expiry time (cleaned up 7 days after creation by default)',
    retry_count TINYINT UNSIGNED NOT NULL DEFAULT 0 COMMENT 'Merge retry count on failure (0-255)',
    storage_path VARCHAR(512) COMMENT 'Final storage path (set once the merge completes)',
    INDEX idx_file_hash (file_hash) USING BTREE COMMENT 'Index on file hash',
    INDEX idx_status (status) USING BTREE COMMENT 'Index on task status',
    INDEX idx_expire (expire_time) USING BTREE COMMENT 'Index on expiry time'
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin
COMMENT='Metadata table for chunked upload tasks';
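To make the schema concrete, the following is a minimal sketch (not from the original article) of how a service could register a task row when the client initiates a chunked upload; UploadTaskService, createTask, and the use of Spring's JdbcTemplate are all assumptions.
import org.springframework.jdbc.core.JdbcTemplate;
import java.util.UUID;

public class UploadTaskService {
    private final JdbcTemplate jdbc;

    public UploadTaskService(JdbcTemplate jdbc) {
        this.jdbc = jdbc;
    }

    /** Hypothetical sketch: create an upload_task row for a new chunked upload. */
    public String createTask(String fileHash, int totalChunks) {
        String taskId = UUID.randomUUID().toString();   // task_id is a UUIDv4
        jdbc.update(
            "INSERT INTO upload_task (task_id, file_hash, total_chunks, completed_chunks, expire_time) " +
            "VALUES (?, ?, ?, '[]', DATE_ADD(NOW(), INTERVAL 7 DAY))",   // status defaults to PENDING
            taskId, fileHash, totalChunks);
        return taskId;
    }
}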
-- Find tasks that are ready to merge; executed whenever a chunk upload completes.
SELECT * FROM upload_task
WHERE status = 'UPLOADING' AND task_id = #{taskId}
  AND JSON_LENGTH(completed_chunks) = total_chunks
  AND created_at >= DATE_SUB(NOW(), INTERVAL 2 HOUR);
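One way to act on this query is to fold it into a compare-and-set UPDATE, so that only one worker claims the merge even when several chunk callbacks race. The method below is a hedged sketch that could live in the hypothetical UploadTaskService above; it is not the article's implementation.
    /** Hypothetical sketch: atomically claim a fully uploaded task for merging. */
    public boolean tryTriggerMerge(String taskId) {
        int claimed = jdbc.update(
            "UPDATE upload_task SET status = 'MERGING' " +
            "WHERE status = 'UPLOADING' AND task_id = ? " +
            "  AND JSON_LENGTH(completed_chunks) = total_chunks " +
            "  AND created_at >= DATE_SUB(NOW(), INTERVAL 2 HOUR)",
            taskId);
        return claimed == 1;   // only one caller wins the UPLOADING -> MERGING transition
    }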
@PostMapping("/upload-chunk")
public ResponseEntity<?> uploadChunk(
        @RequestParam String fileHash,
        @RequestParam int chunkNumber,
        @RequestParam MultipartFile chunk) throws IOException {
    // Idempotency check: if this chunk was already stored, just acknowledge it.
    if (storageService.isChunkExists(fileHash, chunkNumber)) {
        return ResponseEntity.ok().build();
    }
    // Store the chunk under a temporary prefix in MinIO.
    String path = String.format("/temp/%s/%d", fileHash, chunkNumber);
    storageService.upload(path, chunk.getInputStream());
    // Record the uploaded chunk in the task's progress.
    taskService.updateChunkStatus(fileHash, chunkNumber);
    return ResponseEntity.ok().build();
}
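The taskService.updateChunkStatus call above is where the completed_chunks JSON array gets maintained. A minimal sketch of that method, under the same assumptions as the UploadTaskService fragments above, appends the chunk index only when it is not already present, so retried uploads stay idempotent:
    /** Hypothetical sketch: record chunkNumber in completed_chunks exactly once. */
    public void updateChunkStatus(String fileHash, int chunkNumber) {
        jdbc.update(
            "UPDATE upload_task " +
            "SET completed_chunks = JSON_ARRAY_APPEND(completed_chunks, '$', ?), status = 'UPLOADING' " +
            "WHERE file_hash = ? " +
            "  AND NOT JSON_CONTAINS(completed_chunks, ?)",
            chunkNumber, fileHash, String.valueOf(chunkNumber));
    }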
public void mergeFile(String fileHash) throws IOException {
    List<String> chunkPaths = getSortedChunkPaths(fileHash);
    Path finalPath = resolveFinalPath(fileHash);   // helper assumed, analogous to getSortedChunkPaths
    // Pre-compute each chunk's write offset so the copies can run in parallel.
    long[] offsets = new long[chunkPaths.size()];
    long total = 0;
    for (int i = 0; i < chunkPaths.size(); i++) {
        offsets[i] = total;
        total += Files.size(Paths.get(chunkPaths.get(i)));
    }
    ForkJoinPool customPool = new ForkJoinPool(Runtime.getRuntime().availableProcessors());
    try {
        customPool.submit(() ->
            IntStream.range(0, chunkPaths.size()).parallel().forEach(i -> {
                try (FileChannel source = FileChannel.open(Paths.get(chunkPaths.get(i)), StandardOpenOption.READ);
                     FileChannel dest = FileChannel.open(finalPath, StandardOpenOption.CREATE, StandardOpenOption.WRITE)) {
                    // Each chunk writes at its own offset, so the parallel copies never overlap.
                    source.transferTo(0, source.size(), dest.position(offsets[i]));
                } catch (IOException e) {
                    throw new RuntimeException(e);
                }
            })
        ).get();
    } catch (InterruptedException | ExecutionException e) {
        throw new IOException("Chunk merge failed", e);
    } finally {
        customPool.shutdown();
    }
    // Verify the merged file against the SHA-1 fingerprint stored in upload_task (CHAR(40)).
    try (InputStream in = Files.newInputStream(finalPath)) {
        String mergedHash = DigestUtils.sha1Hex(in);
        if (!mergedHash.equalsIgnoreCase(fileHash)) {
            throw new IOException("File integrity check failed!");
        }
    }
}
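After the merge and integrity check succeed, the task row still needs its status and storage_path updated, and the temporary chunks can be removed. The fragment below is a speculative sketch of that final step (markTaskCompleted is a hypothetical name; it reuses java.nio.file.Files for local chunk cleanup):
    /** Hypothetical sketch: finalize the task after a verified merge. */
    public void markTaskCompleted(String fileHash, String storagePath, List<String> chunkPaths) throws IOException {
        jdbc.update(
            "UPDATE upload_task SET status = 'COMPLETED', storage_path = ? WHERE file_hash = ?",
            storagePath, fileHash);
        // Delete the temporary chunk files now that the merged file has been verified.
        for (String chunkPath : chunkPaths) {
            Files.deleteIfExists(Paths.get(chunkPath));
        }
    }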
// Unoptimized in-browser hash calculation (may block the main thread).
async function calculateHash(chunk) {
  const buffer = await chunk.arrayBuffer();          // Blob -> ArrayBuffer
  return crypto.subtle.digest('SHA-256', buffer);    // resolves to an ArrayBuffer digest
}