1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
|
from multiprocessing import Pool, Manager
import os
import subprocess
import streamlit as st
import threading
import time
def update_progress(progress_bar):
while subprocess.poll() is None:
line = subprocess.stdout.readline()
if not line:
break
if "time=" in line:
time_str = line.strip().split("time=")[1].split()[0]
time_list = time_str.split(":")
time_in_seconds = int(
time_list[0]) * 3600 + int(time_list[1]) * 60 + float(time_list[2])
progress_bar.progress(int(time_in_seconds))
subprocess.stdout.close()
# 安装必要的库
st.title("批量加速音频文件")
st.write("选择一个文件夹,将文件夹中的所有音频文件加速")
# 让用户选择文件夹
folder_path = st.sidebar.selectbox(
label="选择一个文件夹",
options=os.listdir("."),
index=0
)
# 让用户输入加速倍数
speed = st.sidebar.number_input(
label="加速倍数",
value=1.35,
min_value=0.1,
max_value=10.0,
step=0.1
)
# 创建一个和所选文件夹同级的 output 文件夹
output_folder_path = os.path.join(os.path.dirname(folder_path), "output")
os.makedirs(output_folder_path, exist_ok=True)
# 添加一个“清空输出文件夹”按钮
if st.button("清空输出文件夹"):
for file_name in os.listdir(output_folder_path):
file_path = os.path.join(output_folder_path, file_name)
os.remove(file_path)
download_folder_path = os.path.join(
os.path.dirname(folder_path), "downloads")
for file_name in os.listdir(download_folder_path):
file_path = os.path.join(download_folder_path, file_name)
os.remove(file_path)
st.write("已清空下载和输出文件夹!")
def process_audio_file(file_name, folder_path, output_folder_path, speed, progress_dict):
# 构建输入文件和输出文件的路径
input_file_path = os.path.join(folder_path, file_name)
output_file_path = os.path.join(
output_folder_path, f"accelerated_{os.path.splitext(file_name)[0]}.mp3")
output_file_path_temp = f"{os.path.splitext(output_file_path)[0]}_temp.mp3"
output_file_path = os.path.abspath(output_file_path) # 获取绝对路径
output_file_path = output_file_path.replace("'", "\\\\'") # 转义单引号
output_file_path = output_file_path.replace('"', '\\\\"') # 转义双引号
# 构建 FFmpeg 命令行 - 调整音量
volume_command = f'ffmpeg -i "{input_file_path}" -vn -af "volume=1.5" "{output_file_path_temp}" -nostats -loglevel 0'
# 构建 FFmpeg 命令行 - 加速
speed_command = f'ffmpeg -i "{output_file_path_temp}" -filter:a "atempo={speed}" -acodec libmp3lame "{output_file_path}" -nostats -loglevel 0'
# 执行 FFmpeg 命令行 - 调整音量
execute_ffmpeg_command(volume_command, file_name, "音量调整", progress_dict)
# 执行 FFmpeg 命令行 - 加速
execute_ffmpeg_command(speed_command, file_name, "加速", progress_dict)
# 删除临时文件
os.remove(output_file_path_temp)
def execute_ffmpeg_command(command, file_name, process_name, progress_dict):
# 执行 FFmpeg 命令行,并将进度输出到 Streamlit 页面
st.write(f"正在处理文件 {file_name} - {process_name}...")
progress_bar = st.progress(0)
# 使用shell=True以便支持Windows上的命令行
process = subprocess.Popen(command, stdout=subprocess.PIPE,
stderr=subprocess.STDOUT, shell=True)
# 更新进度条
update_progress_thread = threading.Thread(
target=update_progress, args=(process, progress_bar))
update_progress_thread.start()
# 等待进程完成
process.wait()
# 等待更新进度条的线程完成
update_progress_thread.join()
# 更新进度字典
progress_dict[file_name] = 100
st.write(f"文件 {file_name} - {process_name}处理完成!")
# 处理文件夹中的所有音频文件
if folder_path:
st.write(f"已选择文件夹:{folder_path}")
audio_files = [
file_name for file_name in os.listdir(folder_path)
if file_name.endswith(".mp3") or file_name.endswith(".wav")
]
if len(audio_files) > 0:
if st.button("开始处理"):
with st.spinner("正在处理,请稍等..."):
# 创建进度字典
progress_dict = Manager().dict()
# 使用多进程处理文件
with Pool(processes=os.cpu_count()) as pool:
for file_name in audio_files:
pool.apply_async(
process_audio_file,
args=(file_name, folder_path,
output_folder_path, speed, progress_dict)
)
pool.close()
pool.join()
st.success("处理完成!")
# 下载处理后的文件
download_folder_path = os.path.join(
os.path.dirname(folder_path), "downloads")
os.makedirs(download_folder_path, exist_ok=True)
for file_name in os.listdir(output_folder_path):
file_path = os.path.join(output_folder_path, file_name)
new_file_path = os.path.join(
download_folder_path, file_name)
os.rename(file_path, new_file_path)
st.write("处理后的文件已准备好下载:")
for file_name in os.listdir(download_folder_path):
file_path = os.path.join(download_folder_path, file_name)
st.download_button(
label=file_name,
data=open(file_path, "rb").read(),
file_name=file_name
)
# 显示进度信息
st.write("处理进度:")
for file_name, progress in progress_dict.items():
st.write(f"{file_name}: {progress}%")
else:
st.write("点击“开始处理”按钮开始处理音频文件")
else:
st.warning("所选文件夹中没有音频文件")
else:
st.warning("请选择一个文件夹")
|