2 changed files with 322 additions and 0 deletions
@ -0,0 +1,320 @@ |
|||||
|
from pydub import AudioSegment |
||||
|
import os |
||||
|
from datetime import datetime |
||||
|
import re |
||||
|
import sys |
||||
|
from PyQt6.QtWidgets import (QApplication, QMainWindow, QWidget, QVBoxLayout, QHBoxLayout, |
||||
|
QPushButton, QFileDialog, QListWidget, QLineEdit, QLabel, |
||||
|
QProgressBar, QMessageBox, QScrollArea) |
||||
|
from PyQt6.QtCore import Qt, QThread, pyqtSignal |
||||
|
from PyQt6.QtGui import QPainter, QColor |
||||
|
import subprocess |
||||
|
|
||||
|
class TimelineWidget(QWidget): |
||||
|
def __init__(self, file_info, total_duration, parent=None): |
||||
|
super().__init__(parent) |
||||
|
self.file_info = file_info # List of dicts with filename, timestamp, duration |
||||
|
self.total_duration = total_duration # Total duration in seconds |
||||
|
self.zoom_factor = 1.0 # Default zoom (1.0 = full timeline fits window) |
||||
|
self.setMinimumHeight(100) |
||||
|
|
||||
|
def set_zoom(self, zoom_factor): |
||||
|
self.zoom_factor = max(0.1, min(zoom_factor, 10.0)) # Limit zoom range |
||||
|
# Adjust widget size for scrolling |
||||
|
if self.total_duration > 0: |
||||
|
self.setMinimumWidth(int(self.total_duration * self.zoom_factor * 10)) # Pixels per second |
||||
|
self.update() |
||||
|
|
||||
|
def paintEvent(self, event): |
||||
|
painter = QPainter(self) |
||||
|
width = self.width() |
||||
|
height = self.height() |
||||
|
if not self.file_info or self.total_duration <= 0: |
||||
|
return |
||||
|
|
||||
|
# Scale factor: pixels per second, adjusted by zoom |
||||
|
visible_duration = self.total_duration / self.zoom_factor |
||||
|
scale = width / visible_duration if visible_duration > 0 else 1 |
||||
|
painter.setPen(Qt.PenStyle.SolidLine) |
||||
|
painter.setBrush(QColor(100, 150, 255)) |
||||
|
|
||||
|
# Draw each file as a rectangle |
||||
|
for info in self.file_info: |
||||
|
start_offset = info['start_offset'] |
||||
|
duration = info['duration'] |
||||
|
x = int(start_offset * scale) |
||||
|
w = max(int(duration * scale), 2) # Ensure minimum width |
||||
|
painter.drawRect(x, 20, w, 30) |
||||
|
|
||||
|
# Draw filename and timestamp |
||||
|
timestamp_str = f"{info['filename']} ({start_offset:.1f}s)" |
||||
|
painter.drawText(x, 15, timestamp_str) |
||||
|
|
||||
|
# Draw time markers (adjust interval based on zoom) |
||||
|
painter.setPen(QColor(0, 0, 0)) |
||||
|
marker_interval = max(60 / self.zoom_factor, 10) # Seconds between markers |
||||
|
for t in range(0, int(self.total_duration), int(marker_interval)): |
||||
|
x = int(t * scale) |
||||
|
painter.drawLine(x, 50, x, 60) |
||||
|
painter.drawText(x, 65, f"{t}s") |
||||
|
|
||||
|
class MergeThread(QThread): |
||||
|
progress = pyqtSignal(int) |
||||
|
status = pyqtSignal(str) |
||||
|
finished = pyqtSignal(str, list, float) # Includes file_info and total_duration |
||||
|
|
||||
|
def __init__(self, input_files, output_path): |
||||
|
super().__init__() |
||||
|
self.input_files = input_files |
||||
|
self.output_path = output_path |
||||
|
|
||||
|
def parse_timestamp(self, filename): |
||||
|
match = re.match(r'rec_(\d{2})-(\d{2})-(\d{4})_(\d{2})(\d{2})(\d{2})\.mp3', filename) |
||||
|
if not match: |
||||
|
raise ValueError(f"Invalid filename format: {filename}") |
||||
|
month, day, year, hour, minute, second = map(int, match.groups()) |
||||
|
return datetime(year, month, day, hour, minute, second) |
||||
|
|
||||
|
def get_audio_duration(self, file_path): |
||||
|
audio = AudioSegment.from_mp3(file_path) |
||||
|
return len(audio) / 1000.0 |
||||
|
|
||||
|
def run(self): |
||||
|
try: |
||||
|
if not self.input_files: |
||||
|
self.status.emit("Error: No MP3 files selected") |
||||
|
return |
||||
|
|
||||
|
# Parse and sort files |
||||
|
file_info = [] |
||||
|
for file_path in self.input_files: |
||||
|
filename = os.path.basename(file_path) |
||||
|
timestamp = self.parse_timestamp(filename) |
||||
|
duration = self.get_audio_duration(file_path) |
||||
|
file_info.append({ |
||||
|
'filename': filename, |
||||
|
'timestamp': timestamp, |
||||
|
'duration': duration, |
||||
|
'path': file_path |
||||
|
}) |
||||
|
file_info.sort(key=lambda x: x['timestamp']) |
||||
|
|
||||
|
# Calculate total duration and start offsets |
||||
|
earliest_time = file_info[0]['timestamp'] |
||||
|
total_duration = 0 |
||||
|
for info in file_info: |
||||
|
info['start_offset'] = (info['timestamp'] - earliest_time).total_seconds() |
||||
|
end_time = info['start_offset'] + info['duration'] |
||||
|
total_duration = max(total_duration, end_time) |
||||
|
|
||||
|
# Create output audio |
||||
|
total_duration_ms = int(total_duration * 1000) |
||||
|
output_audio = AudioSegment.silent(duration=total_duration_ms) |
||||
|
|
||||
|
# Overlay audio files with progress updates |
||||
|
total_files = len(file_info) |
||||
|
for i, info in enumerate(file_info): |
||||
|
audio = AudioSegment.from_mp3(info['path']) |
||||
|
offset_ms = int(info['start_offset'] * 1000) |
||||
|
output_audio = output_audio.overlay(audio, position=offset_ms) |
||||
|
self.progress.emit(int((i + 1) / total_files * 100)) |
||||
|
|
||||
|
# Save output |
||||
|
output_audio.export(self.output_path, format='mp3') |
||||
|
|
||||
|
# Generate .cue file |
||||
|
cue_path = os.path.splitext(self.output_path)[0] + '.cue' |
||||
|
with open(cue_path, 'w') as f: |
||||
|
f.write(f'FILE "{os.path.basename(self.output_path)}" MP3\n') |
||||
|
for i, info in enumerate(file_info): |
||||
|
minutes = int(info['start_offset'] // 60) |
||||
|
seconds = int(info['start_offset'] % 60) |
||||
|
frames = int((info['start_offset'] % 1) * 75) # CD frames (75 per second) |
||||
|
f.write(f'TRACK {i+1:02d} AUDIO\n') |
||||
|
f.write(f' TITLE "{info["filename"]}"\n') |
||||
|
f.write(f' INDEX 01 {minutes:02d}:{seconds:02d}:{frames:02d}\n') |
||||
|
|
||||
|
# Optional: Generate text file with timestamps |
||||
|
# txt_path = os.path.splitext(self.output_path)[0] + '.txt' |
||||
|
# with open(txt_path, 'w') as f: |
||||
|
# for info in file_info: |
||||
|
# f.write(f"{info['filename']}: {info['start_offset']:.1f}s\n") |
||||
|
|
||||
|
self.finished.emit(f"Merged MP3 saved as {self.output_path}\nCUE file saved as {cue_path}", file_info, total_duration) |
||||
|
|
||||
|
except Exception as e: |
||||
|
self.status.emit(f"Error: {str(e)}") |
||||
|
|
||||
|
class MP3MergerWindow(QMainWindow): |
||||
|
def __init__(self): |
||||
|
super().__init__() |
||||
|
self.setWindowTitle("MP3 Merger") |
||||
|
self.setMinimumSize(600, 500) # Allow resizing |
||||
|
self.resize(800, 600) # Default size |
||||
|
|
||||
|
# Check for ffmpeg |
||||
|
if not self.check_ffmpeg(): |
||||
|
QMessageBox.critical(self, "Error", "ffmpeg is not installed or not found. Please install it using 'sudo pacman -S ffmpeg'.") |
||||
|
sys.exit(1) |
||||
|
|
||||
|
# Main widget and layout |
||||
|
self.central_widget = QWidget() |
||||
|
self.setCentralWidget(self.central_widget) |
||||
|
self.layout = QVBoxLayout(self.central_widget) |
||||
|
|
||||
|
# Title |
||||
|
self.layout.addWidget(QLabel("<h2>MP3 Merger</h2>", alignment=Qt.AlignmentFlag.AlignCenter)) |
||||
|
|
||||
|
# File selection |
||||
|
self.select_files_btn = QPushButton("Select MP3 Files") |
||||
|
self.select_files_btn.clicked.connect(self.select_files) |
||||
|
self.layout.addWidget(self.select_files_btn) |
||||
|
|
||||
|
self.file_list = QListWidget() |
||||
|
self.layout.addWidget(self.file_list) |
||||
|
|
||||
|
# Output directory |
||||
|
self.output_dir_layout = QHBoxLayout() |
||||
|
self.output_dir_label = QLabel("Output Directory:") |
||||
|
self.output_dir_edit = QLineEdit(os.getcwd()) |
||||
|
self.output_dir_btn = QPushButton("Browse") |
||||
|
self.output_dir_btn.clicked.connect(self.select_output_dir) |
||||
|
self.output_dir_layout.addWidget(self.output_dir_label) |
||||
|
self.output_dir_layout.addWidget(self.output_dir_edit) |
||||
|
self.output_dir_layout.addWidget(self.output_dir_btn) |
||||
|
self.layout.addLayout(self.output_dir_layout) |
||||
|
|
||||
|
# Output filename |
||||
|
self.output_file_label = QLabel("Output Filename:") |
||||
|
self.output_file_edit = QLineEdit("merged_output.mp3") |
||||
|
self.layout.addWidget(self.output_file_label) |
||||
|
self.layout.addWidget(self.output_file_edit) |
||||
|
|
||||
|
# Zoom controls |
||||
|
self.zoom_layout = QHBoxLayout() |
||||
|
self.zoom_label = QLabel("Timeline Zoom:") |
||||
|
self.zoom_in_btn = QPushButton("+") |
||||
|
self.zoom_in_btn.clicked.connect(self.zoom_in) |
||||
|
self.zoom_out_btn = QPushButton("−") |
||||
|
self.zoom_out_btn.clicked.connect(self.zoom_out) |
||||
|
self.zoom_layout.addWidget(self.zoom_label) |
||||
|
self.zoom_layout.addWidget(self.zoom_in_btn) |
||||
|
self.zoom_layout.addWidget(self.zoom_out_btn) |
||||
|
self.zoom_layout.addStretch() |
||||
|
self.layout.addLayout(self.zoom_layout) |
||||
|
|
||||
|
# Timeline (initially empty) |
||||
|
self.timeline_scroll = QScrollArea() |
||||
|
self.timeline_scroll.setWidgetResizable(True) |
||||
|
self.timeline_widget = TimelineWidget([], 0) |
||||
|
self.timeline_scroll.setWidget(self.timeline_widget) |
||||
|
self.timeline_scroll.setMinimumHeight(120) |
||||
|
self.layout.addWidget(self.timeline_scroll) |
||||
|
|
||||
|
# Progress bar |
||||
|
self.progress_bar = QProgressBar() |
||||
|
self.progress_bar.setValue(0) |
||||
|
self.layout.addWidget(self.progress_bar) |
||||
|
|
||||
|
# Merge button |
||||
|
self.merge_btn = QPushButton("Merge MP3s") |
||||
|
self.merge_btn.clicked.connect(self.start_merge) |
||||
|
self.layout.addWidget(self.merge_btn) |
||||
|
|
||||
|
# Status label |
||||
|
self.status_label = QLabel("") |
||||
|
self.status_label.setWordWrap(True) |
||||
|
self.layout.addWidget(self.status_label) |
||||
|
|
||||
|
self.input_files = [] |
||||
|
self.merge_thread = MergeThread([], "") # Dummy thread for parsing in select_files |
||||
|
|
||||
|
def check_ffmpeg(self): |
||||
|
try: |
||||
|
subprocess.run(["ffmpeg", "-version"], stdout=subprocess.PIPE, stderr=subprocess.PIPE, check=True) |
||||
|
return True |
||||
|
except (subprocess.CalledProcessError, FileNotFoundError): |
||||
|
return False |
||||
|
|
||||
|
def select_files(self): |
||||
|
files, _ = QFileDialog.getOpenFileNames(self, "Select MP3 Files", "", "MP3 Files (*.mp3)") |
||||
|
if files: |
||||
|
self.input_files = files |
||||
|
self.file_list.clear() |
||||
|
for file in files: |
||||
|
self.file_list.addItem(os.path.basename(file)) |
||||
|
# Update timeline with preliminary file info |
||||
|
try: |
||||
|
file_info = [] |
||||
|
earliest_time = None |
||||
|
for file_path in files: |
||||
|
filename = os.path.basename(file_path) |
||||
|
timestamp = self.merge_thread.parse_timestamp(filename) |
||||
|
duration = self.merge_thread.get_audio_duration(file_path) |
||||
|
if earliest_time is None: |
||||
|
earliest_time = timestamp |
||||
|
file_info.append({ |
||||
|
'filename': filename, |
||||
|
'timestamp': timestamp, |
||||
|
'duration': duration, |
||||
|
'start_offset': (timestamp - earliest_time).total_seconds() |
||||
|
}) |
||||
|
total_duration = max(info['start_offset'] + info['duration'] for info in file_info) |
||||
|
self.timeline_widget = TimelineWidget(file_info, total_duration) |
||||
|
self.timeline_scroll.setWidget(self.timeline_widget) |
||||
|
except Exception as e: |
||||
|
self.status_label.setText(f"Error parsing files: {str(e)}") |
||||
|
|
||||
|
def select_output_dir(self): |
||||
|
directory = QFileDialog.getExistingDirectory(self, "Select Output Directory") |
||||
|
if directory: |
||||
|
self.output_dir_edit.setText(directory) |
||||
|
|
||||
|
def zoom_in(self): |
||||
|
self.timeline_widget.set_zoom(self.timeline_widget.zoom_factor * 1.5) |
||||
|
self.timeline_scroll.horizontalScrollBar().setValue(0) # Reset scroll position |
||||
|
|
||||
|
def zoom_out(self): |
||||
|
self.timeline_widget.set_zoom(self.timeline_widget.zoom_factor / 1.5) |
||||
|
self.timeline_scroll.horizontalScrollBar().setValue(0) # Reset scroll position |
||||
|
|
||||
|
def start_merge(self): |
||||
|
output_file = self.output_file_edit.text() |
||||
|
output_dir = self.output_dir_edit.text() |
||||
|
if not output_file.endswith('.mp3'): |
||||
|
QMessageBox.critical(self, "Error", "Output filename must end with .mp3") |
||||
|
return |
||||
|
if not os.path.isdir(output_dir): |
||||
|
QMessageBox.critical(self, "Error", "Invalid output directory") |
||||
|
return |
||||
|
output_path = os.path.join(output_dir, output_file) |
||||
|
self.status_label.setText("Merging MP3s...") |
||||
|
self.merge_btn.setEnabled(False) |
||||
|
|
||||
|
# Start merge thread |
||||
|
self.merge_thread = MergeThread(self.input_files, output_path) |
||||
|
self.merge_thread.progress.connect(self.progress_bar.setValue) |
||||
|
self.merge_thread.status.connect(self.handle_error) |
||||
|
self.merge_thread.finished.connect(self.handle_finished) |
||||
|
self.merge_thread.start() |
||||
|
|
||||
|
def handle_error(self, message): |
||||
|
self.status_label.setText(message) |
||||
|
QMessageBox.critical(self, "Error", message) |
||||
|
self.merge_btn.setEnabled(True) |
||||
|
self.progress_bar.setValue(0) |
||||
|
|
||||
|
def handle_finished(self, message, file_info, total_duration): |
||||
|
self.status_label.setText(message) |
||||
|
QMessageBox.information(self, "Success", message) |
||||
|
self.merge_btn.setEnabled(True) |
||||
|
self.progress_bar.setValue(0) |
||||
|
# Update timeline with final file info |
||||
|
self.timeline_widget = TimelineWidget(file_info, total_duration) |
||||
|
self.timeline_scroll.setWidget(self.timeline_widget) |
||||
|
|
||||
|
if __name__ == "__main__": |
||||
|
app = QApplication(sys.argv) |
||||
|
window = MP3MergerWindow() |
||||
|
window.show() |
||||
|
sys.exit(app.exec()) |
||||
@ -0,0 +1,2 @@ |
|||||
|
pydub==0.25.1 |
||||
|
PyQt6==6.7.0 |
||||
Loading…
Reference in new issue