
Multithreaded Large File Uploads on the Frontend: A Complete Solution from Chunking to Instant Upload

A deep dive into Web Worker multithreading for high-performance chunked uploads, resumable transfers, and instant upload

September 22, 2024 · 20 min read

Uploading large files is a common yet complex requirement in modern web applications. With a traditional single-threaded approach, gigabyte-scale files cause the page to freeze and hash computation to drag on. This article explores how to use Web Worker multithreading to build a high-performance large-file upload solution.

Core Challenges and How to Tackle Them

The core challenges of uploading large files:

┌──────────────────────────────────────────────────────────────┐
│             Core challenges of large file uploads            │
├──────────────────────────────────────────────────────────────┤
│  1. Huge file size ──────► network timeouts, memory overflow │
│  2. Hash computation ────► blocked main thread, frozen page  │
│  3. Interrupted uploads ─► restart from scratch, poor UX     │
│  4. Duplicate uploads ───► wasted bandwidth, wasted storage  │
└──────────────────────────────────────────────────────────────┘

┌──────────────────────────────────────────────────────────────┐
│                     Solution architecture                    │
├──────────────────────────────────────────────────────────────┤
│                                                              │
│   ┌──────────┐    ┌─────────────┐    ┌─────────────────┐    │
│   │ Chunking │───►│ Worker hash │───►│ Parallel upload │    │
│   └──────────┘    └─────────────┘    └─────────────────┘    │
│        │                 │                    │              │
│        ▼                 ▼                    ▼              │
│   fixes memory use  fixes page freezes  boosts throughput   │
│                                                              │
│   ┌────────────┐    ┌────────────────┐                      │
│   │ Resumption │───►│ Instant upload │                      │
│   └────────────┘    └────────────────┘                      │
│         │                   │                                │
│         ▼                   ▼                                │
│   survives interruptions  skips duplicate files              │
│                                                              │
└──────────────────────────────────────────────────────────────┘

Web Worker Multithreading Basics

Why Web Workers

JavaScript is single-threaded, so compute-intensive tasks block the main thread:

typescript
// ❌ Hashing on the main thread - the page freezes
async function calculateHashInMainThread(file: File): Promise<string> {
  const buffer = await file.arrayBuffer()
  const hashBuffer = await crypto.subtle.digest('SHA-256', buffer)
  // While a large file is being hashed, the page is completely unresponsive
  return bufferToHex(hashBuffer)
}

// ✅ Hashing in a Web Worker - the page stays responsive
function calculateHashWithWorker(file: File): Promise<string> {
  return new Promise((resolve, reject) => {
    const worker = new Worker('/workers/hash-worker.js')
    worker.postMessage(file)
    worker.onmessage = (e) => {
      resolve(e.data.hash)
      worker.terminate()
    }
    worker.onerror = reject
  })
}
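
The bufferToHex helper referenced above is never defined in the article; a minimal sketch:

typescript
// Hypothetical helper: turn an ArrayBuffer digest into a lowercase hex string.
function bufferToHex(buffer: ArrayBuffer): string {
  return Array.from(new Uint8Array(buffer))
    .map(b => b.toString(16).padStart(2, '0'))
    .join('')
}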

The Web Worker Messaging Protocol

typescript
// Message protocol between the main thread and the worker
interface WorkerMessage<T = any> {
  type: 'START' | 'PROGRESS' | 'COMPLETE' | 'ERROR'
  payload: T
}

// Main-thread side
class WorkerManager {
  private worker: Worker
  private callbacks: Map<string, Function> = new Map()

  constructor(workerPath: string) {
    this.worker = new Worker(workerPath)
    this.setupListeners()
  }

  private setupListeners() {
    this.worker.onmessage = (e: MessageEvent<WorkerMessage>) => {
      const { type, payload } = e.data
      const callback = this.callbacks.get(type)
      callback?.(payload)
    }

    this.worker.onerror = (error) => {
      console.error('Worker error:', error)
      const errorCallback = this.callbacks.get('ERROR')
      errorCallback?.(error)
    }
  }

  send<T>(data: T, transfer?: Transferable[]) {
    this.worker.postMessage(data, transfer || [])
  }

  on(type: string, callback: Function) {
    this.callbacks.set(type, callback)
  }

  terminate() {
    this.worker.terminate()
  }
}
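
A quick usage sketch (the worker path and message types are illustrative assumptions; the worker is expected to post { type, payload } objects matching WorkerMessage):

typescript
// Hypothetical wiring of a hash worker through WorkerManager.
const manager = new WorkerManager('/workers/hash-worker.js')

manager.on('PROGRESS', (progress: number) => {
  console.log(`hashing: ${progress}%`)
})

manager.on('COMPLETE', (payload: { hash: string }) => {
  console.log('file hash:', payload.hash)
  manager.terminate()
})

// `file` is a File taken from an <input type="file"> change event
manager.send(file)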

File Chunking Strategy

Dynamic Chunking

Adjust the chunking parameters dynamically based on file size:

typescript
interface ChunkConfig {
  chunkSize: number      // chunk size in bytes
  chunkCount: number     // number of chunks
  parallelLimit: number  // max parallel uploads
}

function calculateChunkConfig(fileSize: number): ChunkConfig {
  // Tiered strategy by file size
  const strategies = [
    { maxSize: 100 * 1024 * 1024, chunkSize: 2 * 1024 * 1024, parallel: 3 },   // < 100MB
    { maxSize: 500 * 1024 * 1024, chunkSize: 5 * 1024 * 1024, parallel: 5 },   // < 500MB
    { maxSize: 1024 * 1024 * 1024, chunkSize: 10 * 1024 * 1024, parallel: 6 }, // < 1GB
    { maxSize: Infinity, chunkSize: 20 * 1024 * 1024, parallel: 8 }            // >= 1GB
  ]

  const strategy = strategies.find(s => fileSize <= s.maxSize)!
  
  return {
    chunkSize: strategy.chunkSize,
    chunkCount: Math.ceil(fileSize / strategy.chunkSize),
    parallelLimit: strategy.parallel
  }
}

// Generator that lazily yields file chunks
function* createFileChunks(
  file: File, 
  chunkSize: number
): Generator<{ chunk: Blob; index: number; start: number; end: number }> {
  let index = 0
  let start = 0

  while (start < file.size) {
    const end = Math.min(start + chunkSize, file.size)
    const chunk = file.slice(start, end)
    
    yield { chunk, index, start, end }
    
    index++
    start = end
  }
}
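
A usage sketch tying the two together (the input element is an assumption for illustration):

typescript
// For a 750MB file, calculateChunkConfig picks the "< 1GB" tier:
// 10MB chunks, 75 of them, with a parallelLimit of 6.
const input = document.querySelector<HTMLInputElement>('input[type="file"]')!
const file = input.files![0]
const config = calculateChunkConfig(file.size)

for (const { index, start, end } of createFileChunks(file, config.chunkSize)) {
  console.log(`chunk #${index}: bytes ${start}-${end}`)
}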

Chunk Data Structures

typescript
interface FileChunk {
  index: number          // chunk index
  chunk: Blob           // chunk data
  hash: string          // chunk hash
  size: number          // chunk size in bytes
  start: number         // start offset within the file
  end: number           // end offset within the file
  status: ChunkStatus   // upload status
  retries: number       // retry count
  progress: number      // upload progress (0-100)
}

enum ChunkStatus {
  PENDING = 'pending',
  UPLOADING = 'uploading',
  SUCCESS = 'success',
  ERROR = 'error',
  PAUSED = 'paused'
}

interface UploadFile {
  file: File
  fileHash: string           // hash of the whole file
  fileName: string
  fileSize: number
  chunks: FileChunk[]
  status: 'pending' | 'uploading' | 'paused' | 'success' | 'error'
  progress: number
  uploadedSize: number
  startTime: number
  speed: number              // upload speed in bytes/s
}

Multithreaded Hash Computation

The Hash Worker

Use the spark-md5 library to hash a large file incrementally inside a Worker:

typescript
// workers/hash-worker.ts
import SparkMD5 from 'spark-md5'

interface HashTask {
  file: File
  chunkSize: number
}

interface HashProgress {
  type: 'PROGRESS' | 'COMPLETE' | 'ERROR'
  progress?: number
  hash?: string
  chunkHashes?: string[]
  error?: string
}

self.onmessage = async (e: MessageEvent<HashTask>) => {
  const { file, chunkSize } = e.data
  
  try {
    const result = await calculateFileHash(file, chunkSize)
    self.postMessage({
      type: 'COMPLETE',
      hash: result.fileHash,
      chunkHashes: result.chunkHashes
    } as HashProgress)
  } catch (error) {
    self.postMessage({
      type: 'ERROR',
      error: (error as Error).message
    } as HashProgress)
  }
}

async function calculateFileHash(
  file: File, 
  chunkSize: number
): Promise<{ fileHash: string; chunkHashes: string[] }> {
  const spark = new SparkMD5.ArrayBuffer()
  const chunkHashes: string[] = []
  const totalChunks = Math.ceil(file.size / chunkSize)
  
  let currentChunk = 0

  while (currentChunk < totalChunks) {
    const start = currentChunk * chunkSize
    const end = Math.min(start + chunkSize, file.size)
    const chunk = file.slice(start, end)
    
    const buffer = await chunk.arrayBuffer()
    
    // Feed this chunk into the whole-file hash
    spark.append(buffer)
    
    // Hash the chunk on its own as well
    const chunkSpark = new SparkMD5.ArrayBuffer()
    chunkSpark.append(buffer)
    chunkHashes.push(chunkSpark.end())
    
    currentChunk++
    
    // Report progress back to the main thread
    self.postMessage({
      type: 'PROGRESS',
      progress: Math.round((currentChunk / totalChunks) * 100)
    } as HashProgress)
  }

  return {
    fileHash: spark.end(),
    chunkHashes
  }
}

Parallel Hashing with Multiple Workers

For very large files, spread the hashing work across several Workers:

typescript
class ParallelHashCalculator {
  private workerCount: number
  private workers: Worker[] = []

  constructor(workerCount = navigator.hardwareConcurrency || 4) {
    this.workerCount = Math.min(workerCount, 8)
  }

  async calculate(file: File, chunkSize: number): Promise<string[]> {
    const totalChunks = Math.ceil(file.size / chunkSize)
    const chunksPerWorker = Math.ceil(totalChunks / this.workerCount)
    
    const tasks: Promise<string[]>[] = []

    for (let i = 0; i < this.workerCount; i++) {
      const startChunk = i * chunksPerWorker
      const endChunk = Math.min(startChunk + chunksPerWorker, totalChunks)
      
      if (startChunk >= totalChunks) break
      
      tasks.push(this.calculateChunkRange(file, chunkSize, startChunk, endChunk))
    }

    const results = await Promise.all(tasks)
    return results.flat()
  }

  private calculateChunkRange(
    file: File,
    chunkSize: number,
    startChunk: number,
    endChunk: number
  ): Promise<string[]> {
    return new Promise((resolve, reject) => {
      const worker = new Worker('/workers/chunk-hash-worker.js')
      this.workers.push(worker)

      worker.postMessage({
        file,
        chunkSize,
        startChunk,
        endChunk
      })

      worker.onmessage = (e) => {
        if (e.data.type === 'COMPLETE') {
          resolve(e.data.hashes)
          worker.terminate()
        }
      }

      worker.onerror = (error) => {
        reject(error)
        worker.terminate()
      }
    })
  }

  terminate() {
    this.workers.forEach(w => w.terminate())
    this.workers = []
  }
}
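
MD5 state cannot be shared across workers, so this class only returns per-chunk hashes (the chunk-hash-worker.js it spawns is assumed to hash its assigned range). One common workaround, sketched here as an assumption rather than part of the original design, is to derive the file-level fingerprint from the ordered chunk hashes, merkle-style:

typescript
import SparkMD5 from 'spark-md5'

// Merkle-style fingerprint: hash the concatenation of the ordered chunk
// hashes. Identical files always yield identical fingerprints, which is
// all the instant-upload check requires.
async function fingerprintFile(file: File, chunkSize: number): Promise<string> {
  const calculator = new ParallelHashCalculator()
  try {
    const chunkHashes = await calculator.calculate(file, chunkSize)
    return SparkMD5.hash(chunkHashes.join(''))
  } finally {
    calculator.terminate()
  }
}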

Controlling Upload Concurrency

A Request Concurrency Pool

typescript
interface UploadTask {
  chunk: FileChunk
  fileHash: string
  fileName: string
  totalChunks: number
}

class ConcurrencyPool {
  private maxConcurrency: number
  private currentCount = 0
  private queue: Array<() => Promise<void>> = []
  private paused = false

  constructor(maxConcurrency: number) {
    this.maxConcurrency = maxConcurrency
  }

  async add<T>(task: () => Promise<T>): Promise<T> {
    return new Promise((resolve, reject) => {
      const wrappedTask = async () => {
        try {
          const result = await task()
          resolve(result)
        } catch (error) {
          reject(error)
        } finally {
          this.currentCount--
          this.runNext()
        }
      }

      if (this.paused) {
        this.queue.push(wrappedTask)
        return
      }

      if (this.currentCount < this.maxConcurrency) {
        this.currentCount++
        wrappedTask()
      } else {
        this.queue.push(wrappedTask)
      }
    })
  }

  private runNext() {
    if (this.paused || this.queue.length === 0) return
    if (this.currentCount >= this.maxConcurrency) return

    const task = this.queue.shift()
    if (task) {
      this.currentCount++
      task()
    }
  }

  pause() {
    this.paused = true
  }

  resume() {
    this.paused = false
    while (
      this.currentCount < this.maxConcurrency && 
      this.queue.length > 0
    ) {
      this.runNext()
    }
  }

  clear() {
    this.queue = []
  }

  get pending() {
    return this.queue.length
  }

  get running() {
    return this.currentCount
  }
}
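
A sketch of the pool in isolation, with timed mock tasks standing in for upload requests:

typescript
const pool = new ConcurrencyPool(3) // at most 3 tasks in flight

const sleep = (ms: number) => new Promise<void>(resolve => setTimeout(resolve, ms))

const jobs = Array.from({ length: 10 }, (_, i) =>
  pool.add(async () => {
    await sleep(500) // stand-in for a real request
    console.log(`task ${i} done, running=${pool.running}, queued=${pool.pending}`)
  })
)

await Promise.all(jobs) // resolves once all 10 tasks finish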

The Chunk Upload Request

typescript
interface UploadChunkResponse {
  success: boolean
  chunkIndex: number
  message?: string
}

async function uploadChunk(
  task: UploadTask,
  onProgress: (loaded: number) => void,
  signal?: AbortSignal
): Promise<UploadChunkResponse> {
  const formData = new FormData()
  formData.append('file', task.chunk.chunk)
  formData.append('hash', task.fileHash)
  formData.append('chunkHash', task.chunk.hash)
  formData.append('chunkIndex', String(task.chunk.index))
  formData.append('totalChunks', String(task.totalChunks))
  formData.append('fileName', task.fileName)

  return new Promise((resolve, reject) => {
    const xhr = new XMLHttpRequest()
    
    xhr.upload.onprogress = (e) => {
      if (e.lengthComputable) {
        onProgress(e.loaded)
      }
    }

    xhr.onload = () => {
      if (xhr.status >= 200 && xhr.status < 300) {
        resolve(JSON.parse(xhr.responseText))
      } else {
        reject(new Error(`Upload failed: ${xhr.status}`))
      }
    }

    xhr.onerror = () => reject(new Error('Network error'))
    xhr.onabort = () => reject(new Error('Upload aborted'))

    if (signal) {
      signal.addEventListener('abort', () => xhr.abort())
    }

    xhr.open('POST', '/api/upload/chunk')
    xhr.send(formData)
  })
}
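
A note on the transport choice: fetch still exposes no upload progress events in most browsers, which is why XMLHttpRequest, with its upload.onprogress hook, is used here.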

Implementing Resumable Uploads

Persisting Progress Locally

typescript
interface UploadProgress {
  fileHash: string
  fileName: string
  fileSize: number
  chunkSize: number
  uploadedChunks: number[]
  createdAt: number
  updatedAt: number
}

class UploadProgressStore {
  private db: IDBDatabase | null = null

  async init(): Promise<void> {
    return new Promise((resolve, reject) => {
      const request = indexedDB.open('FileUploadDB', 1)

      request.onerror = () => reject(request.error)
      request.onsuccess = () => {
        this.db = request.result
        resolve()
      }

      request.onupgradeneeded = (event) => {
        const db = (event.target as IDBOpenDBRequest).result
        if (!db.objectStoreNames.contains('progress')) {
          const store = db.createObjectStore('progress', { keyPath: 'fileHash' })
          store.createIndex('updatedAt', 'updatedAt')
        }
      }
    })
  }

  async save(progress: UploadProgress): Promise<void> {
    if (!this.db) await this.init()
    
    return new Promise((resolve, reject) => {
      const transaction = this.db!.transaction(['progress'], 'readwrite')
      const store = transaction.objectStore('progress')
      
      progress.updatedAt = Date.now()
      const request = store.put(progress)
      
      request.onsuccess = () => resolve()
      request.onerror = () => reject(request.error)
    })
  }

  async get(fileHash: string): Promise<UploadProgress | null> {
    if (!this.db) await this.init()
    
    return new Promise((resolve, reject) => {
      const transaction = this.db!.transaction(['progress'], 'readonly')
      const store = transaction.objectStore('progress')
      const request = store.get(fileHash)
      
      request.onsuccess = () => resolve(request.result || null)
      request.onerror = () => reject(request.error)
    })
  }

  async delete(fileHash: string): Promise<void> {
    if (!this.db) await this.init()
    
    return new Promise((resolve, reject) => {
      const transaction = this.db!.transaction(['progress'], 'readwrite')
      const store = transaction.objectStore('progress')
      const request = store.delete(fileHash)
      
      request.onsuccess = () => resolve()
      request.onerror = () => reject(request.error)
    })
  }

  async cleanup(maxAge = 7 * 24 * 60 * 60 * 1000): Promise<void> {
    if (!this.db) await this.init()
    
    const cutoff = Date.now() - maxAge
    
    return new Promise((resolve, reject) => {
      const transaction = this.db!.transaction(['progress'], 'readwrite')
      const store = transaction.objectStore('progress')
      const index = store.index('updatedAt')
      const range = IDBKeyRange.upperBound(cutoff)
      
      const request = index.openCursor(range)
      
      request.onsuccess = (event) => {
        const cursor = (event.target as IDBRequest).result
        if (cursor) {
          cursor.delete()
          cursor.continue()
        } else {
          resolve()
        }
      }
      
      request.onerror = () => reject(request.error)
    })
  }
}
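
How the store fits around an upload session (a sketch; fileHash comes from the hashing step above):

typescript
const store = new UploadProgressStore()
await store.init()

// Before uploading, look for a resumable session for this file.
const previous = await store.get(fileHash)
const alreadyUploaded = new Set(previous?.uploadedChunks ?? [])
console.log(`${alreadyUploaded.size} chunks can be skipped`)

// Periodically purge sessions untouched for 7 days.
await store.cleanup()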

Asking the Server Which Chunks Exist

typescript
interface VerifyResponse {
  uploaded: boolean           // whether the whole file already exists (instant upload)
  uploadedChunks: number[]    // indices of the chunks already uploaded
}

async function verifyUploadedChunks(
  fileHash: string,
  fileName: string,
  totalChunks: number
): Promise<VerifyResponse> {
  const response = await fetch('/api/upload/verify', {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify({ fileHash, fileName, totalChunks })
  })
  
  return response.json()
}

Putting It All Together: The Uploader

The FileUploader Class

typescript
interface UploaderOptions {
  chunkSize?: number
  maxConcurrency?: number
  maxRetries?: number
  onProgress?: (progress: UploadProgressEvent) => void
  onSuccess?: (response: UploadSuccessEvent) => void
  onError?: (error: UploadErrorEvent) => void
  onHashProgress?: (progress: number) => void
}

interface UploadProgressEvent {
  fileHash: string
  fileName: string
  progress: number
  uploadedSize: number
  totalSize: number
  speed: number
  remainingTime: number
}

interface UploadSuccessEvent {
  fileHash: string
  fileName: string
  url: string
  instantUpload: boolean
}

interface UploadErrorEvent {
  fileHash: string
  fileName: string
  error: Error
  chunk?: FileChunk
}

class FileUploader {
  private options: Required<UploaderOptions>
  private pool: ConcurrencyPool
  private store: UploadProgressStore
  private abortController: AbortController | null = null
  private uploadFile: UploadFile | null = null
  private speedSamples: number[] = []
  private lastUploadedSize = 0
  private lastSampleTime = 0

  constructor(options: UploaderOptions = {}) {
    this.options = {
      chunkSize: 5 * 1024 * 1024,
      maxConcurrency: 5,
      maxRetries: 3,
      onProgress: () => {},
      onSuccess: () => {},
      onError: () => {},
      onHashProgress: () => {},
      ...options
    }

    this.pool = new ConcurrencyPool(this.options.maxConcurrency)
    this.store = new UploadProgressStore()
  }

  async upload(file: File): Promise<void> {
    try {
      await this.store.init()
      this.abortController = new AbortController()

      // 1. Compute the file hash
      const { fileHash, chunkHashes } = await this.calculateHash(file)

      // 2. Ask the server which chunks it already has (instant-upload check)
      const config = calculateChunkConfig(file.size)
      const verifyResult = await verifyUploadedChunks(
        fileHash,
        file.name,
        config.chunkCount
      )

      // Instant upload: the file already exists on the server
      if (verifyResult.uploaded) {
        this.options.onSuccess({
          fileHash,
          fileName: file.name,
          url: `/files/${fileHash}`,
          instantUpload: true
        })
        return
      }

      // 3. Build the chunk list
      const chunks = this.createChunks(file, config, chunkHashes, verifyResult.uploadedChunks)

      // 4. Initialize upload state
      this.uploadFile = {
        file,
        fileHash,
        fileName: file.name,
        fileSize: file.size,
        chunks,
        status: 'uploading',
        progress: 0,
        uploadedSize: this.calculateUploadedSize(chunks),
        startTime: Date.now(),
        speed: 0
      }

      // 5. Persist progress
      await this.saveProgress()

      // 6. Upload the chunks
      await this.uploadChunks()

      // 7. Merge the chunks on the server
      await this.mergeChunks(fileHash, file.name, chunks.length)

      // 8. Clean up the saved progress
      await this.store.delete(fileHash)

      this.options.onSuccess({
        fileHash,
        fileName: file.name,
        url: `/files/${fileHash}`,
        instantUpload: false
      })

    } catch (error) {
      if ((error as Error).message !== 'Upload aborted') {
        this.options.onError({
          fileHash: this.uploadFile?.fileHash || '',
          fileName: this.uploadFile?.fileName || file.name,
          error: error as Error
        })
      }
    }
  }

  private async calculateHash(file: File): Promise<{
    fileHash: string
    chunkHashes: string[]
  }> {
    return new Promise((resolve, reject) => {
      const worker = new Worker('/workers/hash-worker.js')

      worker.postMessage({
        file,
        chunkSize: this.options.chunkSize
      })

      worker.onmessage = (e) => {
        if (e.data.type === 'PROGRESS') {
          this.options.onHashProgress(e.data.progress)
        } else if (e.data.type === 'COMPLETE') {
          resolve({
            fileHash: e.data.hash,
            chunkHashes: e.data.chunkHashes
          })
          worker.terminate()
        } else if (e.data.type === 'ERROR') {
          reject(new Error(e.data.error))
          worker.terminate()
        }
      }

      worker.onerror = (error) => {
        reject(error)
        worker.terminate()
      }
    })
  }

  private createChunks(
    file: File,
    config: ChunkConfig,
    chunkHashes: string[],
    uploadedChunks: number[]
  ): FileChunk[] {
    const chunks: FileChunk[] = []
    const uploadedSet = new Set(uploadedChunks)

    for (const { chunk, index, start, end } of createFileChunks(file, config.chunkSize)) {
      chunks.push({
        index,
        chunk,
        hash: chunkHashes[index],
        size: chunk.size,
        start,
        end,
        status: uploadedSet.has(index) ? ChunkStatus.SUCCESS : ChunkStatus.PENDING,
        retries: 0,
        progress: uploadedSet.has(index) ? 100 : 0
      })
    }

    return chunks
  }

  private async uploadChunks(): Promise<void> {
    if (!this.uploadFile) return

    const pendingChunks = this.uploadFile.chunks.filter(
      c => c.status === ChunkStatus.PENDING || c.status === ChunkStatus.ERROR
    )

    const tasks = pendingChunks.map(chunk => 
      this.pool.add(() => this.uploadSingleChunk(chunk))
    )

    await Promise.all(tasks)
  }

  private async uploadSingleChunk(chunk: FileChunk): Promise<void> {
    if (!this.uploadFile) return

    chunk.status = ChunkStatus.UPLOADING

    try {
      await uploadChunk(
        {
          chunk,
          fileHash: this.uploadFile.fileHash,
          fileName: this.uploadFile.fileName,
          totalChunks: this.uploadFile.chunks.length
        },
        (loaded) => {
          chunk.progress = (loaded / chunk.size) * 100
          this.updateProgress()
        },
        this.abortController?.signal
      )

      chunk.status = ChunkStatus.SUCCESS
      chunk.progress = 100
      await this.saveProgress()
      this.updateProgress()

    } catch (error) {
      if ((error as Error).message === 'Upload aborted') {
        chunk.status = ChunkStatus.PAUSED
        throw error
      }

      chunk.retries++
      
      if (chunk.retries < this.options.maxRetries) {
        chunk.status = ChunkStatus.PENDING
        await this.uploadSingleChunk(chunk)
      } else {
        chunk.status = ChunkStatus.ERROR
        throw error
      }
    }
  }

  private async mergeChunks(
    fileHash: string,
    fileName: string,
    totalChunks: number
  ): Promise<void> {
    const response = await fetch('/api/upload/merge', {
      method: 'POST',
      headers: { 'Content-Type': 'application/json' },
      body: JSON.stringify({ fileHash, fileName, totalChunks })
    })

    if (!response.ok) {
      throw new Error('Failed to merge chunks')
    }
  }

  private calculateUploadedSize(chunks: FileChunk[]): number {
    return chunks
      .filter(c => c.status === ChunkStatus.SUCCESS)
      .reduce((sum, c) => sum + c.size, 0)
  }

  private updateProgress(): void {
    if (!this.uploadFile) return

    const uploadedSize = this.uploadFile.chunks.reduce((sum, chunk) => {
      if (chunk.status === ChunkStatus.SUCCESS) {
        return sum + chunk.size
      }
      return sum + (chunk.size * chunk.progress / 100)
    }, 0)

    const now = Date.now()
    const timeDiff = (now - this.lastSampleTime) / 1000
    
    if (timeDiff >= 0.5) {
      const bytesUploaded = uploadedSize - this.lastUploadedSize
      const speed = bytesUploaded / timeDiff
      
      this.speedSamples.push(speed)
      if (this.speedSamples.length > 5) {
        this.speedSamples.shift()
      }
      
      this.lastUploadedSize = uploadedSize
      this.lastSampleTime = now
    }

    const avgSpeed = this.speedSamples.length > 0
      ? this.speedSamples.reduce((a, b) => a + b, 0) / this.speedSamples.length
      : 0

    const remainingSize = this.uploadFile.fileSize - uploadedSize
    const remainingTime = avgSpeed > 0 ? remainingSize / avgSpeed : 0

    this.uploadFile.uploadedSize = uploadedSize
    this.uploadFile.progress = (uploadedSize / this.uploadFile.fileSize) * 100
    this.uploadFile.speed = avgSpeed

    this.options.onProgress({
      fileHash: this.uploadFile.fileHash,
      fileName: this.uploadFile.fileName,
      progress: this.uploadFile.progress,
      uploadedSize,
      totalSize: this.uploadFile.fileSize,
      speed: avgSpeed,
      remainingTime
    })
  }

  private async saveProgress(): Promise<void> {
    if (!this.uploadFile) return

    await this.store.save({
      fileHash: this.uploadFile.fileHash,
      fileName: this.uploadFile.fileName,
      fileSize: this.uploadFile.fileSize,
      chunkSize: this.options.chunkSize,
      uploadedChunks: this.uploadFile.chunks
        .filter(c => c.status === ChunkStatus.SUCCESS)
        .map(c => c.index),
      createdAt: this.uploadFile.startTime,
      updatedAt: Date.now()
    })
  }

  pause(): void {
    this.pool.pause()
    this.abortController?.abort()
    
    if (this.uploadFile) {
      this.uploadFile.status = 'paused'
      this.uploadFile.chunks
        .filter(c => c.status === ChunkStatus.UPLOADING)
        .forEach(c => c.status = ChunkStatus.PAUSED)
    }
  }

  async resume(): Promise<void> {
    if (!this.uploadFile) return

    this.abortController = new AbortController()
    this.uploadFile.status = 'uploading'
    this.uploadFile.chunks
      .filter(c => c.status === ChunkStatus.PAUSED)
      .forEach(c => c.status = ChunkStatus.PENDING)

    // Drop tasks queued before the pause: uploadChunks() re-adds every
    // still-pending chunk, so keeping them would upload chunks twice
    this.pool.clear()
    this.pool.resume()
    await this.uploadChunks()

    // upload() bails out when paused, so merge and cleanup must run here too
    const { fileHash, fileName, chunks } = this.uploadFile
    await this.mergeChunks(fileHash, fileName, chunks.length)
    await this.store.delete(fileHash)
    this.options.onSuccess({ fileHash, fileName, url: `/files/${fileHash}`, instantUpload: false })
  }

  cancel(): void {
    this.pause()
    this.pool.clear()
    
    if (this.uploadFile) {
      this.store.delete(this.uploadFile.fileHash)
      this.uploadFile = null
    }
  }
}
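
Using the uploader directly, without any framework:

typescript
const uploader = new FileUploader({
  chunkSize: 5 * 1024 * 1024,
  maxConcurrency: 5,
  onHashProgress: p => console.log(`hashing: ${p}%`),
  onProgress: e => console.log(`${e.progress.toFixed(1)}%, ${Math.round(e.speed / 1024)} KB/s`),
  onSuccess: e => console.log(e.instantUpload ? 'instant upload!' : `uploaded: ${e.url}`),
  onError: e => console.error('upload failed:', e.error)
})

const input = document.querySelector<HTMLInputElement>('input[type="file"]')!
input.addEventListener('change', () => {
  const file = input.files?.[0]
  if (file) uploader.upload(file)
})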

Wrapping It in a React Hook

typescript
import { useState, useCallback, useRef } from 'react'

interface UseFileUploadOptions {
  chunkSize?: number
  maxConcurrency?: number
  maxRetries?: number
}

interface UploadState {
  status: 'idle' | 'hashing' | 'uploading' | 'paused' | 'success' | 'error'
  progress: number
  hashProgress: number
  speed: number
  remainingTime: number
  error: Error | null
  instantUpload: boolean
}

export function useFileUpload(options: UseFileUploadOptions = {}) {
  const [state, setState] = useState<UploadState>({
    status: 'idle',
    progress: 0,
    hashProgress: 0,
    speed: 0,
    remainingTime: 0,
    error: null,
    instantUpload: false
  })

  const uploaderRef = useRef<FileUploader | null>(null)

  const upload = useCallback(async (file: File) => {
    setState(prev => ({ ...prev, status: 'hashing', progress: 0, hashProgress: 0 }))

    uploaderRef.current = new FileUploader({
      ...options,
      onHashProgress: (hashProgress) => {
        setState(prev => ({ ...prev, hashProgress }))
      },
      onProgress: ({ progress, speed, remainingTime }) => {
        setState(prev => ({
          ...prev,
          status: 'uploading',
          progress,
          speed,
          remainingTime
        }))
      },
      onSuccess: ({ instantUpload }) => {
        setState(prev => ({
          ...prev,
          status: 'success',
          progress: 100,
          instantUpload
        }))
      },
      onError: ({ error }) => {
        setState(prev => ({
          ...prev,
          status: 'error',
          error
        }))
      }
    })

    await uploaderRef.current.upload(file)
  }, [options])

  const pause = useCallback(() => {
    uploaderRef.current?.pause()
    setState(prev => ({ ...prev, status: 'paused' }))
  }, [])

  const resume = useCallback(async () => {
    setState(prev => ({ ...prev, status: 'uploading' }))
    await uploaderRef.current?.resume()
  }, [])

  const cancel = useCallback(() => {
    uploaderRef.current?.cancel()
    setState({
      status: 'idle',
      progress: 0,
      hashProgress: 0,
      speed: 0,
      remainingTime: 0,
      error: null,
      instantUpload: false
    })
  }, [])

  return {
    ...state,
    upload,
    pause,
    resume,
    cancel
  }
}

An Upload Component

tsx
import { useFileUpload } from '@/hooks/useFileUpload'
import { formatBytes, formatTime } from '@/utils/format'

export function FileUploader() {
  const {
    status,
    progress,
    hashProgress,
    speed,
    remainingTime,
    error,
    instantUpload,
    upload,
    pause,
    resume,
    cancel
  } = useFileUpload({
    chunkSize: 5 * 1024 * 1024,
    maxConcurrency: 5,
    maxRetries: 3
  })

  const handleFileChange = async (e: React.ChangeEvent<HTMLInputElement>) => {
    const file = e.target.files?.[0]
    if (file) {
      await upload(file)
    }
  }

  return (
    <div className="p-6 max-w-md mx-auto bg-white rounded-xl shadow-lg">
      <input
        type="file"
        onChange={handleFileChange}
        disabled={status === 'uploading' || status === 'hashing'}
        className="mb-4"
      />

      {status === 'hashing' && (
        <div className="mb-4">
          <p className="text-sm text-gray-600 mb-2">正在计算文件指纹...</p>
          <div className="h-2 bg-gray-200 rounded-full overflow-hidden">
            <div
              className="h-full bg-blue-500 transition-all duration-300"
              style={{ width: `${hashProgress}%` }}
            />
          </div>
          <p className="text-xs text-gray-500 mt-1">{hashProgress}%</p>
        </div>
      )}

      {(status === 'uploading' || status === 'paused') && (
        <div className="mb-4">
          <div className="flex justify-between text-sm text-gray-600 mb-2">
            <span>Upload progress</span>
            <span>{progress.toFixed(1)}%</span>
          </div>
          <div className="h-2 bg-gray-200 rounded-full overflow-hidden">
            <div
              className="h-full bg-green-500 transition-all duration-300"
              style={{ width: `${progress}%` }}
            />
          </div>
          <div className="flex justify-between text-xs text-gray-500 mt-1">
            <span>Speed: {formatBytes(speed)}/s</span>
            <span>Remaining: {formatTime(remainingTime)}</span>
          </div>
        </div>
      )}

      {status === 'success' && (
        <div className="p-4 bg-green-50 rounded-lg">
          <p className="text-green-600">
            {instantUpload ? 'Instant upload succeeded!' : 'Upload complete!'}
          </p>
        </div>
      )}

      {status === 'error' && (
        <div className="p-4 bg-red-50 rounded-lg">
          <p className="text-red-600">上传失败: {error?.message}</p>
        </div>
      )}

      <div className="flex gap-2 mt-4">
        {status === 'uploading' && (
          <button
            onClick={pause}
            className="px-4 py-2 bg-yellow-500 text-white rounded hover:bg-yellow-600"
          >
            Pause
          </button>
        )}
        {status === 'paused' && (
          <button
            onClick={resume}
            className="px-4 py-2 bg-blue-500 text-white rounded hover:bg-blue-600"
          >
            Resume
          </button>
        )}
        {(status === 'uploading' || status === 'paused') && (
          <button
            onClick={cancel}
            className="px-4 py-2 bg-red-500 text-white rounded hover:bg-red-600"
          >
            Cancel
          </button>
        )}
      </div>
    </div>
  )
}
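
The component imports formatBytes and formatTime from '@/utils/format', which the article never shows; a plausible minimal implementation:

typescript
// utils/format.ts (sketch of the helpers the component assumes)
export function formatBytes(bytes: number): string {
  if (!Number.isFinite(bytes) || bytes <= 0) return '0 B'
  const units = ['B', 'KB', 'MB', 'GB', 'TB']
  const i = Math.min(Math.floor(Math.log(bytes) / Math.log(1024)), units.length - 1)
  return `${(bytes / 1024 ** i).toFixed(1)} ${units[i]}`
}

export function formatTime(seconds: number): string {
  if (!Number.isFinite(seconds) || seconds <= 0) return '--'
  const h = Math.floor(seconds / 3600)
  const m = Math.floor((seconds % 3600) / 60)
  const s = Math.round(seconds % 60)
  return h > 0 ? `${h}h ${m}m` : m > 0 ? `${m}m ${s}s` : `${s}s`
}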

Server-Side Implementation (Node.js)

typescript
// api/upload/chunk.ts
import { NextApiRequest, NextApiResponse } from 'next'
import formidable from 'formidable'
import fs from 'fs/promises'
import { createWriteStream } from 'fs'
import path from 'path'

export const config = {
  api: { bodyParser: false }
}

const UPLOAD_DIR = path.join(process.cwd(), 'uploads')

export default async function handler(
  req: NextApiRequest,
  res: NextApiResponse
) {
  if (req.method !== 'POST') {
    return res.status(405).json({ error: 'Method not allowed' })
  }

  const form = formidable({ multiples: false })

  const [fields, files] = await new Promise<[formidable.Fields, formidable.Files]>(
    (resolve, reject) => {
      form.parse(req, (err, fields, files) => {
        if (err) reject(err)
        else resolve([fields, files])
      })
    }
  )

  // Field shape assumes formidable v2; v3 wraps every value in an array
  const hash = fields.hash as string
  const chunkIndex = fields.chunkIndex as string
  const file = files.file as formidable.File

  // Create the per-file chunk directory
  const chunkDir = path.join(UPLOAD_DIR, hash)
  await fs.mkdir(chunkDir, { recursive: true })

  // Move the chunk into place; fs.rename fails across filesystems (EXDEV),
  // so use copyFile + unlink if the temp dir is on a different volume
  const chunkPath = path.join(chunkDir, chunkIndex)
  await fs.rename(file.filepath, chunkPath)

  res.json({ success: true, chunkIndex: Number(chunkIndex) })
}

// api/upload/merge.ts
export default async function handler(
  req: NextApiRequest,
  res: NextApiResponse
) {
  const { fileHash, fileName, totalChunks } = req.body

  const chunkDir = path.join(UPLOAD_DIR, fileHash)
  const filePath = path.join(UPLOAD_DIR, `${fileHash}_${fileName}`)

  // Merge the chunks in order (createWriteStream comes from 'fs',
  // not 'fs/promises')
  const writeStream = createWriteStream(filePath)

  for (let i = 0; i < totalChunks; i++) {
    const chunkPath = path.join(chunkDir, String(i))
    const chunkBuffer = await fs.readFile(chunkPath)
    // Respect backpressure: wait for a drain when the buffer fills up
    if (!writeStream.write(chunkBuffer)) {
      await new Promise(resolve => writeStream.once('drain', resolve))
    }
  }

  // Flush everything to disk before deleting the chunk directory
  await new Promise<void>((resolve, reject) => {
    writeStream.on('error', reject)
    writeStream.end(() => resolve())
  })

  // Remove the chunk directory
  await fs.rm(chunkDir, { recursive: true })

  res.json({ success: true, url: `/files/${fileHash}_${fileName}` })
}

// api/upload/verify.ts
export default async function handler(
  req: NextApiRequest,
  res: NextApiResponse
) {
  const { fileHash, totalChunks } = req.body

  const chunkDir = path.join(UPLOAD_DIR, fileHash)

  // Check whether a merged file already exists (instant upload)
  try {
    const files = await fs.readdir(UPLOAD_DIR)
    const exists = files.some(f => f.startsWith(fileHash + '_'))
    
    if (exists) {
      return res.json({ uploaded: true, uploadedChunks: [] })
    }
  } catch {}

  // Collect the chunks that have already been uploaded
  const uploadedChunks: number[] = []
  
  try {
    const chunks = await fs.readdir(chunkDir)
    chunks.forEach(chunk => {
      uploadedChunks.push(Number(chunk))
    })
  } catch {}

  res.json({ uploaded: false, uploadedChunks })
}
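
One limitation worth noting: the verify handler reports uploaded: true only after a merge has produced the final file; the totalChunks field it already receives could also be used to short-circuit straight to a merge when every chunk is present.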

Performance Takeaways

Optimization            | What it does                      | Effect
------------------------|-----------------------------------|---------------------------------------
Hash in a Web Worker    | keeps the main thread unblocked   | page stays responsive
Parallel hash workers   | uses multiple CPU cores           | 4-8x faster hashing
Dynamic chunk size      | scales chunks with file size      | balances request count vs. efficiency
Concurrency pool        | caps simultaneous uploads         | avoids bandwidth congestion
Resumable uploads       | progress persisted in IndexedDB   | interrupted uploads can recover
Instant-upload check    | server-side hash comparison       | duplicate files skip uploading
Per-chunk retry         | automatic retry on failure        | higher overall success rate

Summary

"The essence of large file uploading is breaking a hard problem apart: chunking solves the size problem, multithreading solves the performance problem, resumable uploads solve the stability problem, and instant upload solves the efficiency problem."

With Web Workers, file hashing no longer blocks the main thread; combined with chunked uploads, concurrency control, and resumable transfers, the result is a stable and efficient large-file upload pipeline. This approach suits cloud drives, video platforms, file sharing, and any other scenario that handles large files.
