SenseVoice Small WebUI进阶:批量上传+队列管理+进度可视化功能开发

张开发
2026/4/16 13:20:19 15 分钟阅读

分享文章

SenseVoice Small WebUI进阶:批量上传+队列管理+进度可视化功能开发
SenseVoice Small WebUI进阶批量上传队列管理进度可视化功能开发1. 项目背景与需求分析SenseVoice Small 语音识别服务已经提供了优秀的单文件转写体验但在实际使用中用户经常需要处理大量音频文件。手动一个个上传、等待、下载结果的方式效率低下特别是在以下场景中会议录音批量整理一次会议可能有多个录音片段播客节目制作需要处理整期节目的多个音频段语音素材处理大量语音文件需要统一转写批量字幕生成为视频内容生成批量字幕文件原有的单文件处理模式无法满足这些批量需求用户需要等待当前文件处理完成才能上传下一个效率受到很大限制。基于这些实际需求我们为 SenseVoice Small WebUI 开发了三大进阶功能批量上传支持、任务队列管理、实时进度可视化。这些功能让用户能够一次性上传多个音频文件系统会自动排队处理并实时显示每个文件的处理进度。2. 批量上传功能实现2.1 多文件选择器改造原有的单文件上传器无法满足批量需求我们将其升级为支持多文件选择的上传组件import streamlit as st import tempfile import os # 多文件上传组件 uploaded_files st.file_uploader( 选择音频文件支持多选, type[wav, mp3, m4a, flac], accept_multiple_filesTrue, help可同时选择多个音频文件进行批量转写 ) if uploaded_files: st.success(f已选择 {len(uploaded_files)} 个文件准备处理)2.2 文件预处理与验证批量上传需要对每个文件进行格式验证和预处理def validate_audio_files(files): 验证音频文件格式和大小 valid_files [] for file in files: # 检查文件格式 if file.type not in [audio/wav, audio/mp3, audio/mp4, audio/flac]: st.warning(f文件 {file.name} 格式不支持已跳过) continue # 检查文件大小限制为50MB if file.size 50 * 1024 * 1024: st.warning(f文件 {file.name} 超过50MB限制已跳过) continue valid_files.append(file) return valid_files # 文件验证 valid_files validate_audio_files(uploaded_files)2.3 临时文件管理优化批量处理需要更高效的临时文件管理策略def create_batch_temp_files(files): 为批量文件创建临时存储 temp_dir tempfile.mkdtemp() file_paths [] for file in files: # 为每个文件创建临时路径 temp_path os.path.join(temp_dir, file.name) with open(temp_path, wb) as f: f.write(file.getbuffer()) file_paths.append(temp_path) return temp_dir, file_paths # 使用上下文管理器确保资源清理 contextlib.contextmanager def batch_file_manager(files): try: temp_dir, file_paths create_batch_temp_files(files) yield file_paths finally: # 清理临时文件 import shutil if os.path.exists(temp_dir): shutil.rmtree(temp_dir)3. 任务队列管理系统3.1 队列数据结构设计为了实现任务的有序处理我们设计了专门的任务队列系统from collections import deque import threading import time class TranscriptionQueue: def __init__(self): self.queue deque() self.lock threading.Lock() self.processing False self.current_task None self.progress_callbacks [] def add_task(self, file_path, languageauto, callbackNone): 添加转写任务到队列 task_id ftask_{int(time.time() * 1000)}_{len(self.queue)} task { id: task_id, file_path: file_path, language: language, status: pending, # pending, processing, completed, failed progress: 0, result: None, error: None, callback: callback, start_time: None, end_time: None } with self.lock: self.queue.append(task) return task_id def get_next_task(self): 获取下一个待处理任务 with self.lock: if not self.queue: return None # 查找第一个待处理任务 for task in self.queue: if task[status] pending: return task return None def update_task_progress(self, task_id, progress, statusNone): 更新任务进度 with self.lock: for task in self.queue: if task[id] task_id: task[progress] progress if status: task[status] status # 通知进度回调 for callback in self.progress_callbacks: callback(task) break def complete_task(self, task_id, result): 标记任务完成 with self.lock: for task in self.queue: if task[id] task_id: task[status] completed task[result] result task[end_time] time.time() task[progress] 100 if task[callback]: task[callback](task) break # 全局任务队列实例 transcription_queue TranscriptionQueue()3.2 异步处理工作线程为了避免阻塞主线程我们使用独立的工作线程处理队列任务def queue_worker(): 队列处理工作线程 while True: task transcription_queue.get_next_task() if not task: time.sleep(0.1) # 短暂休眠避免CPU占用过高 continue # 标记任务为处理中 transcription_queue.update_task_progress(task[id], 0, processing) task[start_time] time.time() try: # 模拟处理进度更新 for progress in range(0, 101, 10): transcription_queue.update_task_progress(task[id], progress) time.sleep(0.5) # 模拟处理时间 # 实际语音转写处理 result process_audio_transcription( task[file_path], task[language] ) # 标记任务完成 transcription_queue.complete_task(task[id], result) except Exception as e: # 处理失败 transcription_queue.update_task_progress( task[id], 100, failed ) transcription_queue.complete_task(task[id], None) # 启动工作线程 worker_thread threading.Thread(targetqueue_worker, daemonTrue) worker_thread.start()3.3 任务状态管理提供完整的任务状态查询和管理接口def get_queue_status(): 获取队列状态统计 with transcription_queue.lock: total len(transcription_queue.queue) pending sum(1 for t in transcription_queue.queue if t[status] pending) processing sum(1 for t in transcription_queue.queue if t[status] processing) completed sum(1 for t in transcription_queue.queue if t[status] completed) failed sum(1 for t in transcription_queue.queue if t[status] failed) return { total: total, pending: pending, processing: processing, completed: completed, failed: failed } def cancel_task(task_id): 取消指定任务 with transcription_queue.lock: for task in transcription_queue.queue: if task[id] task_id and task[status] pending: task[status] cancelled return True return False4. 进度可视化界面4.1 实时进度显示组件使用 Streamlit 组件构建直观的进度可视化界面import streamlit as st import time def display_queue_progress(): 显示队列进度面板 st.subheader( 处理进度) # 获取队列状态 status get_queue_status() # 总体进度统计 col1, col2, col3, col4 st.columns(4) with col1: st.metric(总任务数, status[total]) with col2: st.metric(等待中, status[pending]) with col3: st.metric(处理中, status[processing]) with col4: st.metric(已完成, status[completed]) # 进度条显示 if status[total] 0: overall_progress (status[completed] status[failed]) / status[total] st.progress(overall_progress, text总体进度) # 详细任务列表 st.subheader(任务详情) with transcription_queue.lock: for task in transcription_queue.queue: with st.expander(f{task[id]} - {os.path.basename(task[file_path])}): col1, col2 st.columns(2) with col1: st.write(f**状态**: {task[status]}) st.write(f**语言**: {task[language]}) with col2: if task[start_time]: elapsed time.time() - task[start_time] st.write(f**已用时间**: {elapsed:.1f}秒) if task[status] in [processing, completed]: st.progress(task[progress] / 100) # 显示处理结果 if task[status] completed and task[result]: st.text_area(转写结果, task[result], height150) # 操作按钮 if task[status] pending: if st.button(取消, keyfcancel_{task[id]}): cancel_task(task[id]) st.rerun()4.2 动态更新机制实现界面的实时动态更新让用户无需手动刷新import streamlit as st def auto_refresh_interval(): 根据队列状态自动调整刷新间隔 status get_queue_status() if status[processing] 0: # 有任务处理中快速刷新 return 2 # 每2秒刷新一次 elif status[pending] 0: # 有任务等待中中等刷新 return 5 # 每5秒刷新一次 else: # 无任务慢速刷新 return 10 # 每10秒刷新一次 # 在界面中实现自动刷新 if last_refresh not in st.session_state: st.session_state.last_refresh time.time() current_time time.time() refresh_interval auto_refresh_interval() if current_time - st.session_state.last_refresh refresh_interval: st.session_state.last_refresh current_time st.rerun()4.3 批量结果导出功能提供便捷的批量结果导出功能def export_batch_results(): 导出批量处理结果 completed_tasks [ t for t in transcription_queue.queue if t[status] completed and t[result] ] if not completed_tasks: st.warning(没有可导出的完成任务) return # 生成导出内容 export_content for task in completed_tasks: filename os.path.basename(task[file_path]) export_content f文件: {filename}\n export_content f转写结果:\n{task[result]}\n export_content - * 50 \n\n # 提供下载按钮 st.download_button( label 下载所有结果, dataexport_content, file_namebatch_transcription_results.txt, mimetext/plain ) # 在界面中添加导出按钮 if any(t[status] completed for t in transcription_queue.queue): export_batch_results()5. 性能优化与错误处理5.1 并发处理优化支持多个任务并发处理提高批量处理效率def start_concurrent_workers(num_workers2): 启动多个并发工作线程 for i in range(num_workers): thread threading.Thread( targetqueue_worker, daemonTrue, namefTranscriptionWorker-{i} ) thread.start() # 根据系统资源自动调整工作线程数量 def auto_adjust_workers(): 根据系统负载自动调整工作线程数 import psutil cpu_count psutil.cpu_count() memory_info psutil.virtual_memory() # 根据CPU和内存情况决定工作线程数 if memory_info.available 2 * 1024 * 1024 * 1024: # 小于2GB可用内存 return 1 elif cpu_count 2: return 1 else: return min(cpu_count - 1, 4) # 启动优化数量的工作线程 optimal_workers auto_adjust_workers() start_concurrent_workers(optimal_workers)5.2 错误处理与重试机制完善的错误处理和自动重试机制def process_with_retry(task, max_retries3): 带重试机制的任务处理 retries 0 while retries max_retries: try: result process_audio_transcription( task[file_path], task[language] ) return result except Exception as e: retries 1 if retries max_retries: raise e # 等待一段时间后重试 time.sleep(2 ** retries) # 指数退避 # 在工作线程中使用带重试的处理 try: result process_with_retry(task) transcription_queue.complete_task(task[id], result) except Exception as e: transcription_queue.update_task_progress( task[id], 100, failed ) transcription_queue.complete_task(task[id], None)5.3 资源监控与限制实时监控系统资源防止过载def check_system_resources(): 检查系统资源使用情况 import psutil cpu_percent psutil.cpu_percent(interval1) memory_info psutil.virtual_memory() disk_usage psutil.disk_usage(/) return { cpu_percent: cpu_percent, memory_percent: memory_info.percent, disk_percent: disk_usage.percent } def can_accept_new_task(): 检查是否可以接受新任务 resources check_system_resources() # 资源使用阈值 if resources[cpu_percent] 85: return False, CPU使用率过高 if resources[memory_percent] 90: return False, 内存使用率过高 if resources[disk_percent] 95: return False, 磁盘空间不足 return True, 系统正常6. 总结与使用指南通过批量上传、队列管理和进度可视化三大功能的开发SenseVoice Small WebUI 现在能够高效处理大量音频转写任务。这些改进显著提升了用户体验和处理效率。6.1 主要改进亮点批量处理能力支持一次性上传多个音频文件系统自动排队处理实时进度监控直观显示每个文件的处理状态和进度无需手动刷新智能队列管理自动调度任务支持并发处理最大化利用系统资源完善错误处理自动重试机制和友好的错误提示提高处理成功率便捷结果导出一键下载所有转写结果方便后续整理和使用6.2 使用步骤指南批量上传文件在文件选择器中一次选择多个音频文件设置识别语言选择适合的语音识别语言模式监控处理进度在进度面板实时查看每个文件的处理状态查看转写结果处理完成后可直接查看或复制转写文本导出所有结果使用下载按钮一次性获取所有转写结果6.3 最佳实践建议对于大量文件建议分批上传每次10-20个文件长时间处理时可以最小化浏览器窗口系统会在后台继续处理如果遇到处理失败系统会自动重试通常无需手动干预定期清理已完成的任务保持界面清爽这些进阶功能让 SenseVoice Small 不仅适用于单个文件的快速转写更能胜任批量的语音处理任务大大扩展了其应用场景和使用价值。获取更多AI镜像想探索更多AI镜像和应用场景访问 CSDN星图镜像广场提供丰富的预置镜像覆盖大模型推理、图像生成、视频生成、模型微调等多个领域支持一键部署。

更多文章