🔭 人间指南

🎶 批量加速音频文件:游泳中的播客时光 🏊‍♂️

嗨,亲爱的读者们!👋 最近我开始了一个有趣的项目,就是批量处理我那些 podcast 音频文件。为什么会有这样的奇怪念头呢?听我娓娓道来~ 😊

🏊‍♂️ 游泳池里的独特体验

首先,我必须提到我的爱好——游泳。🏊‍♂️ 沉浸在水中的感觉实在是太神奇了。但你知道吗?最近,我又为游泳加入了一个新的元素:听播客!

有一天,我试着用骨传导耳机边游泳边听播客,结果是…太棒了!🎉 我戴上耳塞,然后用骨传导耳机播放播客,仿佛整个世界只剩下我和那深沉、悠扬的声音。水声、人声统统被隔绝,我仿佛漂浮在知识的海洋中。

🎙 问题来了:为什么要制作这个工具?

虽然体验很棒,但我很快发现了一个问题:许多播客的音量太小了,而且有时候主播讲话的速度太慢,导致我需要更长的时间才能听完一个话题。而我的游泳时间有限,所以我想到了制作一个工具,可以批量调整音频文件的速度和音量!

🛠 实践过程:我的小小工程冒险之旅

1. 加速播客的速度

一个急脾气的小癖好:你懂的,有些播客主播真的喜欢慢悠悠地说话。😴 我需要的是信息,快一点!

我的神奇解决术:没错,就是那个超级酷炫的 FFmpeg! 这玩意儿就像是多媒体的瑞士军刀。一行命令,音频就被我魔法般地加速了!

1
speed_command = f'ffmpeg -i "{output_file_path_temp}" -filter:a "atempo={speed}" -acodec libmp3lame "{output_file_path}" -nostats -loglevel 0'

2. 让音量飙起来 🔊

小问题大困扰:骨传导耳机的音效是不错,但音量上真的有点小,尤其是在我潜入水下的时候。

如何打破寂静:再次召唤我的小助手 FFmpeg! 它不仅能让音频飞起来,还能让音量翻倍!

1
volume_command = f'ffmpeg -i "{input_file_path}" -vn -af "volume=1.5" "{output_file_path_temp}" -nostats -loglevel 0'

3. 并行处理的魔法 🌪

待办清单太长了:嗯,我承认,我是一个播客狂魔,我的文件简直多到数不过来!

多任务解决大计:多亏了 MultiprocessingThreading,我可以同时处理好多好多的文件。电脑的每一个核心都被我调动起来,为我效力!

1
with Pool(processes=os.cpu_count()) as pool:

每一个挑战,都变成了我前进的脚步。这,就是一个程序媛的日常!😎🤟

🎉 结果:完美的游泳体验

在完成这个工具后,我每次游泳都带上我的骨传导耳机和处理过的播客。那种身体和大脑同时得到锻炼的感觉,真的是太棒了!

最后附上我开发的小工具的截图和全部的代码:

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
from multiprocessing import Pool, Manager
import os
import subprocess
import streamlit as st
import threading
import time


def update_progress(progress_bar):
    while subprocess.poll() is None:
        line = subprocess.stdout.readline()
        if not line:
            break
        if "time=" in line:
            time_str = line.strip().split("time=")[1].split()[0]
            time_list = time_str.split(":")
            time_in_seconds = int(
                time_list[0]) * 3600 + int(time_list[1]) * 60 + float(time_list[2])
            progress_bar.progress(int(time_in_seconds))
    subprocess.stdout.close()


# 安装必要的库
st.title("批量加速音频文件")
st.write("选择一个文件夹,将文件夹中的所有音频文件加速")

# 让用户选择文件夹
folder_path = st.sidebar.selectbox(
    label="选择一个文件夹",
    options=os.listdir("."),
    index=0
)

# 让用户输入加速倍数
speed = st.sidebar.number_input(
    label="加速倍数",
    value=1.35,
    min_value=0.1,
    max_value=10.0,
    step=0.1
)

# 创建一个和所选文件夹同级的 output 文件夹
output_folder_path = os.path.join(os.path.dirname(folder_path), "output")
os.makedirs(output_folder_path, exist_ok=True)

# 添加一个“清空输出文件夹”按钮
if st.button("清空输出文件夹"):
    for file_name in os.listdir(output_folder_path):
        file_path = os.path.join(output_folder_path, file_name)
        os.remove(file_path)
    download_folder_path = os.path.join(
        os.path.dirname(folder_path), "downloads")
    for file_name in os.listdir(download_folder_path):
        file_path = os.path.join(download_folder_path, file_name)
        os.remove(file_path)
    st.write("已清空下载和输出文件夹!")


def process_audio_file(file_name, folder_path, output_folder_path, speed, progress_dict):
    # 构建输入文件和输出文件的路径
    input_file_path = os.path.join(folder_path, file_name)
    output_file_path = os.path.join(
        output_folder_path, f"accelerated_{os.path.splitext(file_name)[0]}.mp3")
    output_file_path_temp = f"{os.path.splitext(output_file_path)[0]}_temp.mp3"
    output_file_path = os.path.abspath(output_file_path)  # 获取绝对路径
    output_file_path = output_file_path.replace("'", "\\\\'")  # 转义单引号
    output_file_path = output_file_path.replace('"', '\\\\"')  # 转义双引号

    # 构建 FFmpeg 命令行 - 调整音量
    volume_command = f'ffmpeg -i "{input_file_path}" -vn -af "volume=1.5" "{output_file_path_temp}" -nostats -loglevel 0'

    # 构建 FFmpeg 命令行 - 加速
    speed_command = f'ffmpeg -i "{output_file_path_temp}" -filter:a "atempo={speed}" -acodec libmp3lame "{output_file_path}" -nostats -loglevel 0'

    # 执行 FFmpeg 命令行 - 调整音量
    execute_ffmpeg_command(volume_command, file_name, "音量调整", progress_dict)

    # 执行 FFmpeg 命令行 - 加速
    execute_ffmpeg_command(speed_command, file_name, "加速", progress_dict)

    # 删除临时文件
    os.remove(output_file_path_temp)


def execute_ffmpeg_command(command, file_name, process_name, progress_dict):
    # 执行 FFmpeg 命令行,并将进度输出到 Streamlit 页面
    st.write(f"正在处理文件 {file_name} - {process_name}...")
    progress_bar = st.progress(0)

    # 使用shell=True以便支持Windows上的命令行
    process = subprocess.Popen(command, stdout=subprocess.PIPE,
                               stderr=subprocess.STDOUT, shell=True)

    # 更新进度条
    update_progress_thread = threading.Thread(
        target=update_progress, args=(process, progress_bar))
    update_progress_thread.start()

    # 等待进程完成
    process.wait()

    # 等待更新进度条的线程完成
    update_progress_thread.join()

    # 更新进度字典
    progress_dict[file_name] = 100
    st.write(f"文件 {file_name} - {process_name}处理完成!")


# 处理文件夹中的所有音频文件
if folder_path:
    st.write(f"已选择文件夹:{folder_path}")

    audio_files = [
        file_name for file_name in os.listdir(folder_path)
        if file_name.endswith(".mp3") or file_name.endswith(".wav")
    ]

    if len(audio_files) > 0:
        if st.button("开始处理"):
            with st.spinner("正在处理,请稍等..."):
                # 创建进度字典
                progress_dict = Manager().dict()

                # 使用多进程处理文件
                with Pool(processes=os.cpu_count()) as pool:
                    for file_name in audio_files:
                        pool.apply_async(
                            process_audio_file,
                            args=(file_name, folder_path,
                                  output_folder_path, speed, progress_dict)
                        )

                    pool.close()
                    pool.join()

                st.success("处理完成!")

                # 下载处理后的文件
                download_folder_path = os.path.join(
                    os.path.dirname(folder_path), "downloads")
                os.makedirs(download_folder_path, exist_ok=True)
                for file_name in os.listdir(output_folder_path):
                    file_path = os.path.join(output_folder_path, file_name)
                    new_file_path = os.path.join(
                        download_folder_path, file_name)
                    os.rename(file_path, new_file_path)

                st.write("处理后的文件已准备好下载:")
                for file_name in os.listdir(download_folder_path):
                    file_path = os.path.join(download_folder_path, file_name)
                    st.download_button(
                        label=file_name,
                        data=open(file_path, "rb").read(),
                        file_name=file_name
                    )

                # 显示进度信息
                st.write("处理进度:")
                for file_name, progress in progress_dict.items():
                    st.write(f"{file_name}: {progress}%")

        else:
            st.write("点击“开始处理”按钮开始处理音频文件")
    else:
        st.warning("所选文件夹中没有音频文件")
else:
    st.warning("请选择一个文件夹")

启动命令:

1
streamlit run app.py

技术栈 & 类库

  1. Python:整个工具的主要编程语言。

  2. Multiprocessing:Python内置库,支持并发执行。

  3. OS:Python内置库,用于操作文件和目录。

  4. Subprocess:Python内置库,用于执行shell命令。

  5. Threading:Python内置库,支持并发线程执行。

  6. Streamlit:用于构建交互式web应用。

  7. FFmpeg:用于音频处理。

感谢你们的阅读,上面的代码都是我在 ChatGPT 的指导下完成的,如果你们对如何用 ChatGPT 做出更多有趣应用感兴趣,欢迎留言,我会找出最有趣的 idea 在下一期文章里实现一个原型!❤️