on-demand transcoding with concurrency limit
This commit is contained in:
161
workers.go
161
workers.go
@@ -4,23 +4,35 @@ import (
|
||||
"fmt"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"time"
|
||||
"ytdlp-site/config"
|
||||
"ytdlp-site/ffmpeg"
|
||||
"ytdlp-site/media"
|
||||
"ytdlp-site/originals"
|
||||
"ytdlp-site/transcodes"
|
||||
|
||||
"github.com/google/uuid"
|
||||
"gorm.io/gorm"
|
||||
)
|
||||
|
||||
const (
|
||||
maxConcurrent = 2
|
||||
)
|
||||
|
||||
var sem = make(chan struct{}, maxConcurrent)
|
||||
|
||||
func ensureDirFor(path string) error {
|
||||
dir := filepath.Dir(path)
|
||||
log.Debugln("Create", dir)
|
||||
return os.MkdirAll(dir, 0700)
|
||||
}
|
||||
|
||||
func videoToVideo(transID uint, height uint, fps float64, srcFilepath string) {
|
||||
func videoToVideo(sem chan struct{}, transID uint, srcFilepath string) {
|
||||
sem <- struct{}{} // Acquire semaphore
|
||||
defer func() { <-sem }() // release semaphore
|
||||
|
||||
var trans transcodes.Transcode
|
||||
db.First(&trans, "id = ?", transID)
|
||||
originals.SetStatus(trans.OriginalID, originals.StatusTranscoding)
|
||||
|
||||
// determine destination path
|
||||
dstFilename := uuid.Must(uuid.NewV7()).String()
|
||||
@@ -30,29 +42,28 @@ func videoToVideo(transID uint, height uint, fps float64, srcFilepath string) {
|
||||
err := ensureDirFor(dstFilepath)
|
||||
if err != nil {
|
||||
fmt.Println("Error: couldn't create dir for ", dstFilepath, err)
|
||||
db.Model(&Transcode{}).Where("id = ?", transID).Update("status", "failed")
|
||||
db.Model(&transcodes.Transcode{}).Where("id = ?", trans.ID).Update("status", "failed")
|
||||
return
|
||||
}
|
||||
|
||||
// FIXME: ignoring any requested audio bitrate
|
||||
|
||||
// determine audio bitrate
|
||||
var audioBitrate uint = 160
|
||||
if height <= 144 {
|
||||
if trans.Height <= 144 {
|
||||
audioBitrate = 64
|
||||
} else if height <= 480 {
|
||||
} else if trans.Height <= 480 {
|
||||
audioBitrate = 96
|
||||
} else if height < 720 {
|
||||
} else if trans.Height < 720 {
|
||||
audioBitrate = 128
|
||||
}
|
||||
|
||||
// start ffmpeg
|
||||
db.Model(&Transcode{}).Where("id = ?", transID).Update("status", "running")
|
||||
db.Model(&transcodes.Transcode{}).Where("id = ?", trans.ID).Update("status", "running")
|
||||
var vf string
|
||||
if fps > 0 {
|
||||
vf = fmt.Sprintf("scale=-2:%d,fps=%f", height, fps)
|
||||
if trans.FPS > 0 {
|
||||
vf = fmt.Sprintf("scale=-2:%d,fps=%f", trans.Height, trans.FPS)
|
||||
} else {
|
||||
vf = fmt.Sprintf("scale=-2:%d", height)
|
||||
vf = fmt.Sprintf("scale=-2:%d", trans.Height)
|
||||
}
|
||||
stdout, stderr, err := ffmpeg.Ffmpeg("-i", srcFilepath,
|
||||
"-vf", vf, "-c:v", "libx264",
|
||||
@@ -60,13 +71,11 @@ func videoToVideo(transID uint, height uint, fps float64, srcFilepath string) {
|
||||
dstFilepath)
|
||||
if err != nil {
|
||||
fmt.Println("Error: convert to video file", srcFilepath, "->", dstFilepath, string(stdout), string(stderr))
|
||||
db.Model(&Transcode{}).Where("id = ?", transID).Update("status", "failed")
|
||||
db.Model(&transcodes.Transcode{}).Where("id = ?", trans.ID).Update("status", "failed")
|
||||
return
|
||||
}
|
||||
|
||||
// look up original
|
||||
var trans Transcode
|
||||
db.First(&trans, transID)
|
||||
var orig originals.Original
|
||||
db.First(&orig, "id = ?", trans.OriginalID)
|
||||
|
||||
@@ -94,9 +103,16 @@ func videoToVideo(transID uint, height uint, fps float64, srcFilepath string) {
|
||||
|
||||
// complete transcode
|
||||
db.Delete(&trans)
|
||||
originals.SetStatusTranscodingOrCompleted(trans.OriginalID)
|
||||
}
|
||||
|
||||
func videoToAudio(transID uint, kbps uint, videoFilepath string) {
|
||||
func videoToAudio(sem chan struct{}, transID uint, videoFilepath string) {
|
||||
sem <- struct{}{} // Acquire semaphore
|
||||
defer func() { <-sem }() // release semaphore
|
||||
|
||||
var trans transcodes.Transcode
|
||||
db.First(&trans, "id = ?", transID)
|
||||
originals.SetStatus(trans.OriginalID, originals.StatusTranscoding)
|
||||
|
||||
// determine destination path
|
||||
audioFilename := uuid.Must(uuid.NewV7()).String()
|
||||
@@ -107,31 +123,30 @@ func videoToAudio(transID uint, kbps uint, videoFilepath string) {
|
||||
err := ensureDirFor(audioFilepath)
|
||||
if err != nil {
|
||||
fmt.Println("Error: couldn't create dir for ", audioFilepath, err)
|
||||
db.Model(&Transcode{}).Where("id = ?", transID).Update("status", "failed")
|
||||
db.Model(&transcodes.Transcode{}).Where("id = ?", transID).Update("status", "failed")
|
||||
return
|
||||
}
|
||||
|
||||
db.Model(&Transcode{}).Where("id = ?", transID).Update("status", "running")
|
||||
db.Model(&transcodes.Transcode{}).Where("id = ?", transID).Update("status", "running")
|
||||
_, _, err = ffmpeg.Ffmpeg("-i", videoFilepath, "-vn", "-acodec",
|
||||
"mp3", "-b:a",
|
||||
fmt.Sprintf("%dk", kbps),
|
||||
fmt.Sprintf("%dk", trans.Kbps),
|
||||
audioFilepath)
|
||||
if err != nil {
|
||||
fmt.Println("Error: convert to audio file", videoFilepath, "->", audioFilepath)
|
||||
db.Model(&Transcode{}).Where("id = ?", transID).Update("status", "failed")
|
||||
db.Model(&transcodes.Transcode{}).Where("id = ?", transID).Update("status", "failed")
|
||||
return
|
||||
}
|
||||
|
||||
// look up original
|
||||
var trans Transcode
|
||||
db.First(&trans, "id = ?", transID)
|
||||
|
||||
var orig originals.Original
|
||||
db.First(&orig, "id = ?", trans.OriginalID)
|
||||
|
||||
// create audio record
|
||||
audio := media.Audio{OriginalID: orig.ID,
|
||||
Filename: audioFilename,
|
||||
Bps: kbps * 1000,
|
||||
Bps: trans.Kbps * 1000,
|
||||
Source: "transcode",
|
||||
}
|
||||
|
||||
@@ -148,9 +163,17 @@ func videoToAudio(transID uint, kbps uint, videoFilepath string) {
|
||||
|
||||
// complete transcode
|
||||
db.Delete(&trans)
|
||||
originals.SetStatusTranscodingOrCompleted(trans.OriginalID)
|
||||
}
|
||||
|
||||
func audioToAudio(transID uint, kbps uint, srcFilepath string) {
|
||||
func audioToAudio(sem chan struct{}, transID uint, srcFilepath string) {
|
||||
sem <- struct{}{} // Acquire semaphore
|
||||
defer func() { <-sem }() // release semaphore
|
||||
|
||||
var trans transcodes.Transcode
|
||||
db.First(&trans, "id = ?", transID)
|
||||
|
||||
originals.SetStatus(trans.OriginalID, originals.StatusTranscoding)
|
||||
|
||||
// determine destination path
|
||||
dstFilename := uuid.Must(uuid.NewV7()).String()
|
||||
@@ -161,24 +184,22 @@ func audioToAudio(transID uint, kbps uint, srcFilepath string) {
|
||||
err := ensureDirFor(dstFilepath)
|
||||
if err != nil {
|
||||
fmt.Println("Error: couldn't create dir for ", dstFilepath, err)
|
||||
db.Model(&Transcode{}).Where("id = ?", transID).Update("status", "failed")
|
||||
db.Model(&transcodes.Transcode{}).Where("id = ?", transID).Update("status", "failed")
|
||||
return
|
||||
}
|
||||
|
||||
db.Model(&Transcode{}).Where("id = ?", transID).Update("status", "running")
|
||||
db.Model(&transcodes.Transcode{}).Where("id = ?", transID).Update("status", "running")
|
||||
_, _, err = ffmpeg.Ffmpeg("-i", srcFilepath, "-vn", "-acodec",
|
||||
"mp3", "-b:a",
|
||||
fmt.Sprintf("%dk", kbps),
|
||||
fmt.Sprintf("%dk", trans.Kbps),
|
||||
dstFilepath)
|
||||
if err != nil {
|
||||
fmt.Println("Error: convert to audio file", srcFilepath, "->", dstFilepath)
|
||||
db.Model(&Transcode{}).Where("id = ?", transID).Update("status", "failed")
|
||||
db.Model(&transcodes.Transcode{}).Where("id = ?", transID).Update("status", "failed")
|
||||
return
|
||||
}
|
||||
|
||||
// look up original
|
||||
var trans Transcode
|
||||
db.First(&trans, "id = ?", transID)
|
||||
var orig originals.Original
|
||||
db.First(&orig, "id = ?", trans.OriginalID)
|
||||
|
||||
@@ -186,7 +207,7 @@ func audioToAudio(transID uint, kbps uint, srcFilepath string) {
|
||||
audio := media.Audio{
|
||||
OriginalID: orig.ID,
|
||||
Filename: dstFilename,
|
||||
Bps: kbps * 1000,
|
||||
Bps: trans.Kbps * 1000,
|
||||
Source: "transcode",
|
||||
}
|
||||
|
||||
@@ -203,45 +224,44 @@ func audioToAudio(transID uint, kbps uint, srcFilepath string) {
|
||||
|
||||
// complete transcode
|
||||
db.Delete(&trans)
|
||||
originals.SetStatusTranscodingOrCompleted(trans.OriginalID)
|
||||
}
|
||||
|
||||
func transcodePending() {
|
||||
log.Traceln("transcodePending...")
|
||||
func cleanupTranscodes() {
|
||||
log.Traceln("cleanupTranscode")
|
||||
|
||||
// any running jobs here got stuck or dead in the midde, so reset them
|
||||
db.Model(&Transcode{}).Where("status = ?", "running").Update("status", "pending")
|
||||
db.Model(&transcodes.Transcode{}).Where("status = ?", "running").Update("status", "pending")
|
||||
|
||||
// loop until no more pending jobs
|
||||
// find any originals with a transcode job -> transcoding
|
||||
var originalsToUpdate []uint
|
||||
db.Model(&originals.Original{}).
|
||||
Select("id").
|
||||
Where("id IN (?)",
|
||||
db.Model(&transcodes.Transcode{}).
|
||||
Select("original_id"),
|
||||
).
|
||||
Find(&originalsToUpdate)
|
||||
db.Model(&originals.Original{}).
|
||||
Where("id IN ?", originalsToUpdate).
|
||||
Update("status", originals.StatusTranscoding)
|
||||
|
||||
// originals marked transcoding that don't have a transcode job -> complete
|
||||
db.Model(&originals.Original{}).
|
||||
Select("id").
|
||||
Where("status = ? AND id NOT IN (?)",
|
||||
originals.StatusTranscoding,
|
||||
db.Model(&transcodes.Transcode{}).
|
||||
Select("original_id"),
|
||||
).
|
||||
Find(&originalsToUpdate)
|
||||
db.Model(&originals.Original{}).
|
||||
Where("id IN ? AND status = ?", originalsToUpdate, originals.StatusTranscoding).
|
||||
Update("status", originals.StatusCompleted)
|
||||
|
||||
// start any existing transcode jobs
|
||||
for {
|
||||
|
||||
var originalsToUpdate []uint
|
||||
|
||||
// find any originals with a transcode job and mark them as transcoding
|
||||
db.Model(&originals.Original{}).
|
||||
Select("id").
|
||||
Where("id IN (?)",
|
||||
db.Model(&Transcode{}).
|
||||
Select("original_id"),
|
||||
).
|
||||
Find(&originalsToUpdate)
|
||||
db.Model(&originals.Original{}).
|
||||
Where("id IN ?", originalsToUpdate).
|
||||
Update("status", originals.StatusTranscoding)
|
||||
|
||||
// originals marked transcoding that don't have a transcode job -> complete
|
||||
db.Model(&originals.Original{}).
|
||||
Select("id").
|
||||
Where("status = ? AND id NOT IN (?)",
|
||||
originals.StatusTranscoding,
|
||||
db.Model(&Transcode{}).
|
||||
Select("original_id"),
|
||||
).
|
||||
Find(&originalsToUpdate)
|
||||
db.Model(&originals.Original{}).
|
||||
Where("id IN ? AND status = ?", originalsToUpdate, originals.StatusTranscoding).
|
||||
Update("status", originals.StatusCompleted)
|
||||
|
||||
var trans Transcode
|
||||
var trans transcodes.Transcode
|
||||
err := db.Where("status = ?", "pending").
|
||||
Order("CASE " +
|
||||
"WHEN dst_kind = 'video' AND height = 480 THEN 0 " +
|
||||
@@ -265,15 +285,14 @@ func transcodePending() {
|
||||
srcFilepath := filepath.Join(config.GetDataDir(), srcVideo.Filename)
|
||||
|
||||
if trans.DstKind == "video" {
|
||||
videoToVideo(trans.ID, trans.Height, trans.FPS, srcFilepath)
|
||||
go videoToVideo(sem, trans.ID, srcFilepath)
|
||||
} else if trans.DstKind == "audio" {
|
||||
videoToAudio(trans.ID, trans.Rate, srcFilepath)
|
||||
go videoToAudio(sem, trans.ID, srcFilepath)
|
||||
} else {
|
||||
fmt.Println("unexpected src/dst kinds for Transcode", trans)
|
||||
db.Delete(&trans)
|
||||
}
|
||||
} else if trans.SrcKind == "audio" {
|
||||
|
||||
var srcAudio media.Audio
|
||||
err = db.First(&srcAudio, "id = ?", trans.SrcID).Error
|
||||
if err != nil {
|
||||
@@ -282,7 +301,7 @@ func transcodePending() {
|
||||
continue
|
||||
}
|
||||
srcFilepath := filepath.Join(config.GetDataDir(), srcAudio.Filename)
|
||||
audioToAudio(trans.ID, trans.Rate, srcFilepath)
|
||||
go audioToAudio(sem, trans.ID, srcFilepath)
|
||||
} else {
|
||||
fmt.Println("unexpected src kind for Transcode", trans)
|
||||
db.Delete(&trans)
|
||||
@@ -290,11 +309,3 @@ func transcodePending() {
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func transcodeWorker() {
|
||||
transcodePending()
|
||||
ticker := time.NewTicker(10 * time.Second)
|
||||
for range ticker.C {
|
||||
transcodePending()
|
||||
}
|
||||
}
|
||||
|
Reference in New Issue
Block a user