diff --git a/Dockerfile b/Dockerfile index 2fb88c6..89418c7 100644 --- a/Dockerfile +++ b/Dockerfile @@ -14,6 +14,7 @@ ADD handlers /src/handlers ADD media /src/media ADD originals /src/originals Add playlists /src/playlists +ADD transcodes /src/transcodes Add ytdlp /src/ytdlp ADD go.mod /src/. diff --git a/handlers.go b/handlers.go index dead94d..c8f6a48 100644 --- a/handlers.go +++ b/handlers.go @@ -22,6 +22,7 @@ import ( "ytdlp-site/media" "ytdlp-site/originals" "ytdlp-site/playlists" + "ytdlp-site/transcodes" "ytdlp-site/ytdlp" ) @@ -461,21 +462,46 @@ func getAudioMeta(path string) (AudioMeta, error) { }, nil } -func addAudioTranscode(mediaId, originalId, bitrate uint, srcKind string) { - t := Transcode{ +func newAudioTranscode(mediaId, originalId, kbps uint, srcKind string) { + t := transcodes.Transcode{ SrcID: mediaId, OriginalID: originalId, SrcKind: srcKind, DstKind: "audio", - Rate: bitrate, + Kbps: kbps, TimeSubmit: time.Now(), Status: "pending", } db.Create(&t) + + if srcKind == "video" { + var srcVideo media.Video + err := db.First(&srcVideo, "id = ?", t.SrcID).Error + if err != nil { + fmt.Println("no such source video for video Transcode", t) + db.Delete(&t) + return + } + srcFilepath := filepath.Join(config.GetDataDir(), srcVideo.Filename) + go videoToAudio(sem, t.ID, srcFilepath) + } else if srcKind == "audio" { + var srcAudio media.Audio + err := db.First(&srcAudio, "id = ?", t.SrcID).Error + if err != nil { + log.Errorln("no such source audio for audio Transcode", t) + db.Delete(&t) + return + } + srcFilepath := filepath.Join(config.GetDataDir(), srcAudio.Filename) + go audioToAudio(sem, t.ID, srcFilepath) + } else { + fmt.Println("unexpected src/dst kinds for Transcode", t) + db.Delete(&t) + } } -func addVideoTranscode(videoId, originalId, targetHeight uint, targetFPS float64) { - t := Transcode{ +func newVideoTranscode(videoId, originalId, targetHeight uint, targetFPS float64) { + t := transcodes.Transcode{ SrcID: videoId, OriginalID: originalId, SrcKind: "video", @@ -486,6 +512,17 @@ func addVideoTranscode(videoId, originalId, targetHeight uint, targetFPS float64 Status: "pending", } db.Create(&t) + + var srcVideo media.Video + err := db.First(&srcVideo, "id = ?", t.SrcID).Error + if err != nil { + fmt.Println("no such source video for video Transcode", t) + db.Delete(&t) + return + } + srcFilepath := filepath.Join(config.GetDataDir(), srcVideo.Filename) + + go videoToVideo(sem, t.ID, srcFilepath) } func processOriginal(originalID uint) { @@ -514,14 +551,14 @@ func processOriginal(originalID uint) { } // create audio transcodes - for _, bitrate := range []uint{64 /*, 96, 128, 160, 192*/} { - addAudioTranscode(video.ID, originalID, bitrate, "video") + for _, kbps := range []uint{64 /*, 96, 128, 160, 192*/} { + newAudioTranscode(video.ID, originalID, kbps, "video") } // create video transcodes for _, targetHeight := range []uint{480, 240, 144} { if targetHeight <= video.Height { - addVideoTranscode(video.ID, originalID, targetHeight, video.FPS) + newVideoTranscode(video.ID, originalID, targetHeight, video.FPS) break } } @@ -536,8 +573,8 @@ func processOriginal(originalID uint) { } // create audio transcodes - for _, bitrate := range []uint{64 /*, 96, 128, 160, 192*/} { - addAudioTranscode(audio.ID, originalID, bitrate, "audio") + for _, kbps := range []uint{64 /*, 96, 128, 160, 192*/} { + newAudioTranscode(audio.ID, originalID, kbps, "audio") } } else { @@ -550,7 +587,7 @@ func startDownload(originalID uint, videoURL string, audioOnly bool) { log.Debugf("startDownload audioOnly=%t", audioOnly) // metadata phase - originals.SetStatus(db, originalID, originals.StatusMetadata) + originals.SetStatus(originalID, originals.StatusMetadata) var origMeta Meta var err error if audioOnly { @@ -560,7 +597,7 @@ func startDownload(originalID uint, videoURL string, audioOnly bool) { } if err != nil { log.Errorln("couldn't retrieve metadata:", err) - originals.SetStatus(db, originalID, originals.StatusFailed) + originals.SetStatus(originalID, originals.StatusFailed) return } log.Debugf("original metadata %v", origMeta) @@ -570,19 +607,19 @@ func startDownload(originalID uint, videoURL string, audioOnly bool) { }).Error if err != nil { log.Errorln("couldn't store metadata:", err) - originals.SetStatus(db, originalID, originals.StatusFailed) + originals.SetStatus(originalID, originals.StatusFailed) return } // download original - originals.SetStatus(db, originalID, originals.StatusDownloading) + originals.SetStatus(originalID, originals.StatusDownloading) // create temporary directory // do this in the data directory since /tmp is sometimes a different filesystem tempDir, err := os.MkdirTemp(config.GetDataDir(), "dl") if err != nil { log.Errorln("Error creating temporary directory:", err) - originals.SetStatus(db, originalID, originals.StatusFailed) + originals.SetStatus(originalID, originals.StatusFailed) return } defer os.RemoveAll(tempDir) @@ -603,7 +640,7 @@ func startDownload(originalID uint, videoURL string, audioOnly bool) { err = cmd.Run() if err != nil { log.Errorln("yt-dlp failed") - originals.SetStatus(db, originalID, originals.StatusFailed) + originals.SetStatus(originalID, originals.StatusFailed) return } @@ -611,7 +648,7 @@ func startDownload(originalID uint, videoURL string, audioOnly bool) { dirEnts, err := os.ReadDir(tempDir) if err != nil { log.Errorln("Error reading directory:", err) - originals.SetStatus(db, originalID, originals.StatusFailed) + originals.SetStatus(originalID, originals.StatusFailed) return } dlFilename := "" @@ -624,7 +661,7 @@ func startDownload(originalID uint, videoURL string, audioOnly bool) { } if dlFilename == "" { log.Errorln("couldn't find a downloaded file") - originals.SetStatus(db, originalID, originals.StatusFailed) + originals.SetStatus(originalID, originals.StatusFailed) } // move to data directory @@ -634,7 +671,7 @@ func startDownload(originalID uint, videoURL string, audioOnly bool) { err = os.Rename(srcPath, dlFilepath) if err != nil { log.Errorln("rename downloaded media error", srcPath, "->", dlFilepath, ":", err) - originals.SetStatus(db, originalID, originals.StatusFailed) + originals.SetStatus(originalID, originals.StatusFailed) return } @@ -642,7 +679,7 @@ func startDownload(originalID uint, videoURL string, audioOnly bool) { mediaMeta, err := getAudioMeta(dlFilepath) if err != nil { log.Errorln("couldn't get audio file metadata", err) - originals.SetStatus(db, originalID, originals.StatusFailed) + originals.SetStatus(originalID, originals.StatusFailed) return } @@ -658,14 +695,14 @@ func startDownload(originalID uint, videoURL string, audioOnly bool) { fmt.Println("create Audio", audio) if db.Create(&audio).Error != nil { fmt.Println("Couldn't create audio entry", err) - originals.SetStatus(db, originalID, originals.StatusFailed) + originals.SetStatus(originalID, originals.StatusFailed) return } } else { mediaMeta, err := getVideoMeta(dlFilepath) if err != nil { log.Errorln("couldn't get video file metadata", err) - originals.SetStatus(db, originalID, originals.StatusFailed) + originals.SetStatus(originalID, originals.StatusFailed) return } @@ -684,12 +721,12 @@ func startDownload(originalID uint, videoURL string, audioOnly bool) { log.Debugln("create Video", video) if db.Create(&video).Error != nil { log.Errorln("Couldn't create video entry", err) - originals.SetStatus(db, originalID, originals.StatusFailed) + originals.SetStatus(originalID, originals.StatusFailed) return } } - originals.SetStatus(db, originalID, originals.StatusDownloadCompleted) + originals.SetStatus(originalID, originals.StatusDownloadCompleted) processOriginal(originalID) } @@ -697,14 +734,14 @@ func startPlaylist(id uint, url string, audioOnly bool) { // retrieve playlist metadata pl, err := getYtdlpPlaylist(url) if err != nil { - playlists.SetStatus(db, id, playlists.StatusFailed) + playlists.SetStatus(id, playlists.StatusFailed) return } err = db.Model(&playlists.Playlist{}).Where("id = ?", id).Updates(map[string]interface{}{ "title": pl.Title, }).Error if err != nil { - playlists.SetStatus(db, id, playlists.StatusFailed) + playlists.SetStatus(id, playlists.StatusFailed) return } @@ -720,11 +757,11 @@ func startPlaylist(id uint, url string, audioOnly bool) { } err = db.Create(&original).Error if err != nil { - playlists.SetStatus(db, id, playlists.StatusFailed) + playlists.SetStatus(id, playlists.StatusFailed) return } } - playlists.SetStatus(db, id, playlists.StatusCompleted) + playlists.SetStatus(id, playlists.StatusCompleted) } func videosHandler(c echo.Context) error { @@ -898,7 +935,7 @@ func videoRestartHandler(c echo.Context) error { func deleteTranscodes(originalID uint) { log.Debugln("Delete Transcode entries for Original", originalID) - db.Delete(&Transcode{}, "original_id = ?", originalID) + db.Delete(&transcodes.Transcode{}, "original_id = ?", originalID) } func deleteTranscodedVideos(originalID uint) { @@ -1045,7 +1082,7 @@ func transcodeToVideoHandler(c echo.Context) error { if err == gorm.ErrRecordNotFound { log.Errorf("no video record for original %d: %v", originalId, err) } else { - addVideoTranscode(video.ID, uint(originalId), uint(height), fps) + newVideoTranscode(video.ID, uint(originalId), uint(height), fps) } return c.Redirect(http.StatusSeeOther, referrer) @@ -1074,9 +1111,9 @@ func transcodeToAudioHandler(c echo.Context) error { } if hasOriginalVideo { - addAudioTranscode(video.ID, uint(originalId), uint(kbps), "video") + newAudioTranscode(video.ID, uint(originalId), uint(kbps), "video") } else if hasOriginalAudio { - addAudioTranscode(audio.ID, uint(originalId), uint(kbps), "audio") + newAudioTranscode(audio.ID, uint(originalId), uint(kbps), "audio") } else { log.Errorln("no audio or video record for original", originalId) } @@ -1102,7 +1139,7 @@ func processHandler(c echo.Context) error { deleteAudiosWithSource(uint(id), "transcode") deleteTranscodedVideos(uint(id)) - err := originals.SetStatus(db, uint(id), originals.StatusDownloadCompleted) + err := originals.SetStatus(uint(id), originals.StatusDownloadCompleted) if err != nil { log.Errorf("error while setting original %d status: %v", id, err) } diff --git a/main.go b/main.go index e057a42..6348af1 100644 --- a/main.go +++ b/main.go @@ -23,6 +23,7 @@ import ( "ytdlp-site/media" "ytdlp-site/originals" "ytdlp-site/playlists" + "ytdlp-site/transcodes" "ytdlp-site/ytdlp" ) @@ -57,6 +58,8 @@ func main() { ffmpeg.Init(log) handlers.Init(log) ytdlp.Init(log) + originals.Init(log) + defer originals.Fini() gormLogger := logger.New( golog.New(os.Stdout, "\r\n", golog.LstdFlags), // io writer @@ -93,7 +96,7 @@ func main() { // Migrate the schema db.AutoMigrate(&originals.Original{}, &playlists.Playlist{}, &media.Video{}, - &media.Audio{}, &User{}, &TempURL{}, &Transcode{}) + &media.Audio{}, &User{}, &TempURL{}, &transcodes.Transcode{}) database.Init(db, log) defer database.Fini() @@ -168,8 +171,9 @@ func main() { Secure: secure, } - // start the transcode worker - go transcodeWorker() + // tidy up the transcodes database + log.Debug("tidy transcodes database...") + cleanupTranscodes() // Start server e.Logger.Fatal(e.Start(":8080")) diff --git a/models.go b/models.go index d8a6104..9c40fbc 100644 --- a/models.go +++ b/models.go @@ -12,25 +12,6 @@ import ( "gorm.io/gorm" ) -type Transcode struct { - gorm.Model - Status string // "pending", "running", "failed" - SrcID uint // Video.ID or Audio.ID of the source file - OriginalID uint // Original.ID - SrcKind string // "video", "audio" - DstKind string // "video", "audio" - TimeSubmit time.Time - TimeStart time.Time - - // video fields - Height uint // target height - Width uint // target width - FPS float64 // target FPS - - // audio & video fields - Rate uint -} - type User struct { gorm.Model Username string `gorm:"unique"` diff --git a/originals/init.go b/originals/init.go new file mode 100644 index 0000000..ebd7e36 --- /dev/null +++ b/originals/init.go @@ -0,0 +1,14 @@ +package originals + +import "github.com/sirupsen/logrus" + +var log *logrus.Logger + +func Init(logger *logrus.Logger) error { + log = logger.WithFields(logrus.Fields{ + "component": "originals", + }).Logger + return nil +} + +func Fini() {} diff --git a/originals/originals.go b/originals/originals.go index b63a14f..96afc2a 100644 --- a/originals/originals.go +++ b/originals/originals.go @@ -1,6 +1,11 @@ package originals -import "gorm.io/gorm" +import ( + "ytdlp-site/database" + "ytdlp-site/transcodes" + + "gorm.io/gorm" +) type Status string @@ -29,6 +34,29 @@ type Original struct { PlaylistID uint // Playlist.ID (if part of a playlist) } -func SetStatus(db *gorm.DB, id uint, status Status) error { +func SetStatus(id uint, status Status) error { + db := database.Get() + log.Debugln("original", id, "status -> ", status) return db.Model(&Original{}).Where("id = ?", id).Update("status", status).Error } + +// if there is an active transcode for this original, +// set the status to transcode. otherwise ,to completed +func SetStatusTranscodingOrCompleted(id uint) error { + db := database.Get() + + var count int64 + err := db.Model(&transcodes.Transcode{}).Where("original_id = ?", id).Count(&count).Error + if err != nil { + return err + } + + if count > 0 { + log.Debugln("found transcodes for original", id) + return SetStatus(id, StatusTranscoding) + } else { + log.Debugln("no transcodes for original", id) + return SetStatus(id, StatusCompleted) + } + +} diff --git a/playlists/model.go b/playlists/model.go index fa42363..0e4c654 100644 --- a/playlists/model.go +++ b/playlists/model.go @@ -1,6 +1,10 @@ package playlists -import "gorm.io/gorm" +import ( + "ytdlp-site/database" + + "gorm.io/gorm" +) type Status string @@ -21,6 +25,7 @@ const ( StatusFailed Status = "failed" ) -func SetStatus(db *gorm.DB, id uint, status Status) error { +func SetStatus(id uint, status Status) error { + db := database.Get() return db.Model(&Playlist{}).Where("id = ?", id).Update("status", status).Error } diff --git a/transcodes/transcode.go b/transcodes/transcode.go new file mode 100644 index 0000000..c429629 --- /dev/null +++ b/transcodes/transcode.go @@ -0,0 +1,26 @@ +package transcodes + +import ( + "time" + + "gorm.io/gorm" +) + +type Transcode struct { + gorm.Model + Status string // "pending", "running", "failed" + SrcID uint // Video.ID or Audio.ID of the source file + OriginalID uint // Original.ID + SrcKind string // "video", "audio" + DstKind string // "video", "audio" + TimeSubmit time.Time + TimeStart time.Time + + // video fields + Height uint // target height + Width uint // target width + FPS float64 // target FPS + + // audio & video fields + Kbps uint +} diff --git a/workers.go b/workers.go index e1a98f2..6b94320 100644 --- a/workers.go +++ b/workers.go @@ -4,23 +4,35 @@ import ( "fmt" "os" "path/filepath" - "time" "ytdlp-site/config" "ytdlp-site/ffmpeg" "ytdlp-site/media" "ytdlp-site/originals" + "ytdlp-site/transcodes" "github.com/google/uuid" "gorm.io/gorm" ) +const ( + maxConcurrent = 2 +) + +var sem = make(chan struct{}, maxConcurrent) + func ensureDirFor(path string) error { dir := filepath.Dir(path) log.Debugln("Create", dir) return os.MkdirAll(dir, 0700) } -func videoToVideo(transID uint, height uint, fps float64, srcFilepath string) { +func videoToVideo(sem chan struct{}, transID uint, srcFilepath string) { + sem <- struct{}{} // Acquire semaphore + defer func() { <-sem }() // release semaphore + + var trans transcodes.Transcode + db.First(&trans, "id = ?", transID) + originals.SetStatus(trans.OriginalID, originals.StatusTranscoding) // determine destination path dstFilename := uuid.Must(uuid.NewV7()).String() @@ -30,29 +42,28 @@ func videoToVideo(transID uint, height uint, fps float64, srcFilepath string) { err := ensureDirFor(dstFilepath) if err != nil { fmt.Println("Error: couldn't create dir for ", dstFilepath, err) - db.Model(&Transcode{}).Where("id = ?", transID).Update("status", "failed") + db.Model(&transcodes.Transcode{}).Where("id = ?", trans.ID).Update("status", "failed") return } // FIXME: ignoring any requested audio bitrate - // determine audio bitrate var audioBitrate uint = 160 - if height <= 144 { + if trans.Height <= 144 { audioBitrate = 64 - } else if height <= 480 { + } else if trans.Height <= 480 { audioBitrate = 96 - } else if height < 720 { + } else if trans.Height < 720 { audioBitrate = 128 } // start ffmpeg - db.Model(&Transcode{}).Where("id = ?", transID).Update("status", "running") + db.Model(&transcodes.Transcode{}).Where("id = ?", trans.ID).Update("status", "running") var vf string - if fps > 0 { - vf = fmt.Sprintf("scale=-2:%d,fps=%f", height, fps) + if trans.FPS > 0 { + vf = fmt.Sprintf("scale=-2:%d,fps=%f", trans.Height, trans.FPS) } else { - vf = fmt.Sprintf("scale=-2:%d", height) + vf = fmt.Sprintf("scale=-2:%d", trans.Height) } stdout, stderr, err := ffmpeg.Ffmpeg("-i", srcFilepath, "-vf", vf, "-c:v", "libx264", @@ -60,13 +71,11 @@ func videoToVideo(transID uint, height uint, fps float64, srcFilepath string) { dstFilepath) if err != nil { fmt.Println("Error: convert to video file", srcFilepath, "->", dstFilepath, string(stdout), string(stderr)) - db.Model(&Transcode{}).Where("id = ?", transID).Update("status", "failed") + db.Model(&transcodes.Transcode{}).Where("id = ?", trans.ID).Update("status", "failed") return } // look up original - var trans Transcode - db.First(&trans, transID) var orig originals.Original db.First(&orig, "id = ?", trans.OriginalID) @@ -94,9 +103,16 @@ func videoToVideo(transID uint, height uint, fps float64, srcFilepath string) { // complete transcode db.Delete(&trans) + originals.SetStatusTranscodingOrCompleted(trans.OriginalID) } -func videoToAudio(transID uint, kbps uint, videoFilepath string) { +func videoToAudio(sem chan struct{}, transID uint, videoFilepath string) { + sem <- struct{}{} // Acquire semaphore + defer func() { <-sem }() // release semaphore + + var trans transcodes.Transcode + db.First(&trans, "id = ?", transID) + originals.SetStatus(trans.OriginalID, originals.StatusTranscoding) // determine destination path audioFilename := uuid.Must(uuid.NewV7()).String() @@ -107,31 +123,30 @@ func videoToAudio(transID uint, kbps uint, videoFilepath string) { err := ensureDirFor(audioFilepath) if err != nil { fmt.Println("Error: couldn't create dir for ", audioFilepath, err) - db.Model(&Transcode{}).Where("id = ?", transID).Update("status", "failed") + db.Model(&transcodes.Transcode{}).Where("id = ?", transID).Update("status", "failed") return } - db.Model(&Transcode{}).Where("id = ?", transID).Update("status", "running") + db.Model(&transcodes.Transcode{}).Where("id = ?", transID).Update("status", "running") _, _, err = ffmpeg.Ffmpeg("-i", videoFilepath, "-vn", "-acodec", "mp3", "-b:a", - fmt.Sprintf("%dk", kbps), + fmt.Sprintf("%dk", trans.Kbps), audioFilepath) if err != nil { fmt.Println("Error: convert to audio file", videoFilepath, "->", audioFilepath) - db.Model(&Transcode{}).Where("id = ?", transID).Update("status", "failed") + db.Model(&transcodes.Transcode{}).Where("id = ?", transID).Update("status", "failed") return } // look up original - var trans Transcode - db.First(&trans, "id = ?", transID) + var orig originals.Original db.First(&orig, "id = ?", trans.OriginalID) // create audio record audio := media.Audio{OriginalID: orig.ID, Filename: audioFilename, - Bps: kbps * 1000, + Bps: trans.Kbps * 1000, Source: "transcode", } @@ -148,9 +163,17 @@ func videoToAudio(transID uint, kbps uint, videoFilepath string) { // complete transcode db.Delete(&trans) + originals.SetStatusTranscodingOrCompleted(trans.OriginalID) } -func audioToAudio(transID uint, kbps uint, srcFilepath string) { +func audioToAudio(sem chan struct{}, transID uint, srcFilepath string) { + sem <- struct{}{} // Acquire semaphore + defer func() { <-sem }() // release semaphore + + var trans transcodes.Transcode + db.First(&trans, "id = ?", transID) + + originals.SetStatus(trans.OriginalID, originals.StatusTranscoding) // determine destination path dstFilename := uuid.Must(uuid.NewV7()).String() @@ -161,24 +184,22 @@ func audioToAudio(transID uint, kbps uint, srcFilepath string) { err := ensureDirFor(dstFilepath) if err != nil { fmt.Println("Error: couldn't create dir for ", dstFilepath, err) - db.Model(&Transcode{}).Where("id = ?", transID).Update("status", "failed") + db.Model(&transcodes.Transcode{}).Where("id = ?", transID).Update("status", "failed") return } - db.Model(&Transcode{}).Where("id = ?", transID).Update("status", "running") + db.Model(&transcodes.Transcode{}).Where("id = ?", transID).Update("status", "running") _, _, err = ffmpeg.Ffmpeg("-i", srcFilepath, "-vn", "-acodec", "mp3", "-b:a", - fmt.Sprintf("%dk", kbps), + fmt.Sprintf("%dk", trans.Kbps), dstFilepath) if err != nil { fmt.Println("Error: convert to audio file", srcFilepath, "->", dstFilepath) - db.Model(&Transcode{}).Where("id = ?", transID).Update("status", "failed") + db.Model(&transcodes.Transcode{}).Where("id = ?", transID).Update("status", "failed") return } // look up original - var trans Transcode - db.First(&trans, "id = ?", transID) var orig originals.Original db.First(&orig, "id = ?", trans.OriginalID) @@ -186,7 +207,7 @@ func audioToAudio(transID uint, kbps uint, srcFilepath string) { audio := media.Audio{ OriginalID: orig.ID, Filename: dstFilename, - Bps: kbps * 1000, + Bps: trans.Kbps * 1000, Source: "transcode", } @@ -203,45 +224,44 @@ func audioToAudio(transID uint, kbps uint, srcFilepath string) { // complete transcode db.Delete(&trans) + originals.SetStatusTranscodingOrCompleted(trans.OriginalID) } -func transcodePending() { - log.Traceln("transcodePending...") +func cleanupTranscodes() { + log.Traceln("cleanupTranscode") // any running jobs here got stuck or dead in the midde, so reset them - db.Model(&Transcode{}).Where("status = ?", "running").Update("status", "pending") + db.Model(&transcodes.Transcode{}).Where("status = ?", "running").Update("status", "pending") - // loop until no more pending jobs + // find any originals with a transcode job -> transcoding + var originalsToUpdate []uint + db.Model(&originals.Original{}). + Select("id"). + Where("id IN (?)", + db.Model(&transcodes.Transcode{}). + Select("original_id"), + ). + Find(&originalsToUpdate) + db.Model(&originals.Original{}). + Where("id IN ?", originalsToUpdate). + Update("status", originals.StatusTranscoding) + + // originals marked transcoding that don't have a transcode job -> complete + db.Model(&originals.Original{}). + Select("id"). + Where("status = ? AND id NOT IN (?)", + originals.StatusTranscoding, + db.Model(&transcodes.Transcode{}). + Select("original_id"), + ). + Find(&originalsToUpdate) + db.Model(&originals.Original{}). + Where("id IN ? AND status = ?", originalsToUpdate, originals.StatusTranscoding). + Update("status", originals.StatusCompleted) + + // start any existing transcode jobs for { - - var originalsToUpdate []uint - - // find any originals with a transcode job and mark them as transcoding - db.Model(&originals.Original{}). - Select("id"). - Where("id IN (?)", - db.Model(&Transcode{}). - Select("original_id"), - ). - Find(&originalsToUpdate) - db.Model(&originals.Original{}). - Where("id IN ?", originalsToUpdate). - Update("status", originals.StatusTranscoding) - - // originals marked transcoding that don't have a transcode job -> complete - db.Model(&originals.Original{}). - Select("id"). - Where("status = ? AND id NOT IN (?)", - originals.StatusTranscoding, - db.Model(&Transcode{}). - Select("original_id"), - ). - Find(&originalsToUpdate) - db.Model(&originals.Original{}). - Where("id IN ? AND status = ?", originalsToUpdate, originals.StatusTranscoding). - Update("status", originals.StatusCompleted) - - var trans Transcode + var trans transcodes.Transcode err := db.Where("status = ?", "pending"). Order("CASE " + "WHEN dst_kind = 'video' AND height = 480 THEN 0 " + @@ -265,15 +285,14 @@ func transcodePending() { srcFilepath := filepath.Join(config.GetDataDir(), srcVideo.Filename) if trans.DstKind == "video" { - videoToVideo(trans.ID, trans.Height, trans.FPS, srcFilepath) + go videoToVideo(sem, trans.ID, srcFilepath) } else if trans.DstKind == "audio" { - videoToAudio(trans.ID, trans.Rate, srcFilepath) + go videoToAudio(sem, trans.ID, srcFilepath) } else { fmt.Println("unexpected src/dst kinds for Transcode", trans) db.Delete(&trans) } } else if trans.SrcKind == "audio" { - var srcAudio media.Audio err = db.First(&srcAudio, "id = ?", trans.SrcID).Error if err != nil { @@ -282,7 +301,7 @@ func transcodePending() { continue } srcFilepath := filepath.Join(config.GetDataDir(), srcAudio.Filename) - audioToAudio(trans.ID, trans.Rate, srcFilepath) + go audioToAudio(sem, trans.ID, srcFilepath) } else { fmt.Println("unexpected src kind for Transcode", trans) db.Delete(&trans) @@ -290,11 +309,3 @@ func transcodePending() { } } - -func transcodeWorker() { - transcodePending() - ticker := time.NewTicker(10 * time.Second) - for range ticker.C { - transcodePending() - } -}