From 6fb6b3c03ce4fc29022ccc40138a9193a2561416 Mon Sep 17 00:00:00 2001 From: bytedream Date: Sun, 6 Mar 2022 01:01:25 +0100 Subject: [PATCH] Moved all download related stuff to downloader.go and fixed some errors in it --- downloader.go | 295 ++++++++++++++++++++++++++++++++++++++------------ format.go | 152 +------------------------- 2 files changed, 230 insertions(+), 217 deletions(-) diff --git a/downloader.go b/downloader.go index 828f684..d0bdf7c 100644 --- a/downloader.go +++ b/downloader.go @@ -1,29 +1,53 @@ package crunchyroll import ( + "bytes" "context" "crypto/aes" "crypto/cipher" "fmt" "github.com/grafov/m3u8" + "io" "io/ioutil" "math" "net/http" "os" + "os/exec" "path/filepath" + "regexp" "sync" "sync/atomic" "time" ) +var ffmpegInfoPattern = regexp.MustCompile(`Output #0, (.+),`) + +// NewDownloader creates a downloader with default settings which should +// fit the most needs +func NewDownloader(context context.Context, writer io.Writer, goroutines int, onSegmentDownload func(segment *m3u8.MediaSegment, current, total int, file *os.File) error) Downloader { + tmp, _ := os.MkdirTemp("", "crunchy_") + + return Downloader{ + Writer: writer, + TempDir: tmp, + DeleteTempAfter: true, + Context: context, + Goroutines: goroutines, + OnSegmentDownload: onSegmentDownload, + } +} + +// Downloader is used to download Format's type Downloader struct { - // Filename is the filename of the output file - Filename string - // TempDir is the directory where the temporary files should be stored + // The output is all written to Writer + Writer io.Writer + + // TempDir is the directory where the temporary segment files should be stored. + // The files will be placed directly into the root of the directory. + // If empty a random temporary directory on the system's default tempdir + // will be created. + // If the directory does not exist, it will be created TempDir string - // If IgnoreExisting is true, existing Filename's and TempDir's may be - // overwritten or deleted - IgnoreExisting bool // If DeleteTempAfter is true, the temp directory gets deleted afterwards. // Note that in case of a hard signal exit (os.Interrupt, ...) the directory // will NOT be deleted. In such situations try to catch the signal and @@ -41,7 +65,10 @@ type Downloader struct { // Goroutines is the number of goroutines to download segments with Goroutines int - // A method to call when a segment was downloaded + // A method to call when a segment was downloaded. + // Note that the segments are downloaded asynchronously (depending on the count of + // Goroutines) and the function gets called asynchronously too, so for example it is + // first called on segment 1, then segment 254, then segment 3 and so on OnSegmentDownload func(segment *m3u8.MediaSegment, current, total int, file *os.File) error // If LockOnSegmentDownload is true, only one OnSegmentDownload function can be called at // once. Normally (because of the use of goroutines while downloading) multiple could get @@ -50,51 +77,177 @@ type Downloader struct { // If FFmpegOpts is not nil, ffmpeg will be used to merge and convert files. // The given opts will be used as ffmpeg parameters while merging. - // Some opts are already used, see mergeSegmentsFFmpeg in format.go for more details + // + // If Writer is *os.File and -f (which sets the output format) is not specified, the output + // format will be retrieved by its file ending. If this is not the case and -f is not given, + // the output format will be mpegts / mpeg transport stream. + // Execute 'ffmpeg -muxers' to see all available output formats. FFmpegOpts []string } -// NewDownloader creates a downloader with default settings which should -// fit the most needs -func NewDownloader(context context.Context, filename string, goroutines int, onSegmentDownload func(segment *m3u8.MediaSegment, current, total int, file *os.File) error) Downloader { - tmp, _ := os.MkdirTemp("", "crunchy_") - - return Downloader{ - Filename: filename, - TempDir: tmp, - DeleteTempAfter: true, - Context: context, - Goroutines: goroutines, - OnSegmentDownload: onSegmentDownload, - } -} - -// download downloads every mpeg transport stream segment to a given directory (more information below). -// After every segment download onSegmentDownload will be called with: -// the downloaded segment, the current position, the total size of segments to download, the file where the segment content was written to an error (if occurred). -// The filename is always .ts -// -// Short explanation: -// The actual crunchyroll video is split up in multiple segments (or video files) which have to be downloaded and merged after to generate a single video file. -// And this function just downloads each of this segment into the given directory. -// See https://en.wikipedia.org/wiki/MPEG_transport_stream for more information -func download(context context.Context, format *Format, tempDir string, goroutines int, lockOnSegmentDownload bool, onSegmentDownload func(segment *m3u8.MediaSegment, current, total int, file *os.File) error) error { +// download's the given format +func (d Downloader) download(format *Format) error { if err := format.InitVideo(); err != nil { return err } + if _, err := os.Stat(d.TempDir); os.IsNotExist(err) { + if err := os.Mkdir(d.TempDir, 0700); err != nil { + return err + } + } + if d.DeleteTempAfter { + defer os.RemoveAll(d.TempDir) + } + + files, err := d.downloadSegments(format) + if err != nil { + return err + } + if d.FFmpegOpts == nil { + return d.mergeSegments(files) + } else { + return d.mergeSegmentsFFmpeg(files) + } +} + +// mergeSegments reads every file in tempDir and writes their content to Downloader.Writer. +// The given output file gets created or overwritten if already existing +func (d Downloader) mergeSegments(files []string) error { + for _, file := range files { + select { + case <-d.Context.Done(): + return d.Context.Err() + default: + f, err := os.Open(file) + if err != nil { + return err + } + if _, err = io.Copy(d.Writer, f); err != nil { + f.Close() + return err + } + f.Close() + } + } + return nil +} + +// mergeSegmentsFFmpeg reads every file in tempDir and merges their content to the outputFile +// with ffmpeg (https://ffmpeg.org/). +// The given output file gets created or overwritten if already existing +func (d Downloader) mergeSegmentsFFmpeg(files []string) error { + list, err := os.Create(filepath.Join(d.TempDir, "list.txt")) + if err != nil { + return err + } + + for _, file := range files { + if _, err = fmt.Fprintf(list, "file '%s'\n", file); err != nil { + list.Close() + return err + } + } + list.Close() + + // predefined options ... custom options ... predefined output filename + command := []string{ + "-y", + "-f", "concat", + "-safe", "0", + "-i", list.Name(), + "-c", "copy", + } + if d.FFmpegOpts != nil { + command = append(command, d.FFmpegOpts...) + var found bool + for _, opts := range d.FFmpegOpts { + if opts == "-f" { + found = true + break + } + } + if !found { + if file, ok := d.Writer.(*os.File); ok { + var outBuf bytes.Buffer + infoCmd := exec.Command("ffmpeg", file.Name()) + infoCmd.Stderr = &outBuf + + if infoCmd.Run(); err != nil { + return err + } + if parsed := ffmpegInfoPattern.FindStringSubmatch(outBuf.String()); parsed != nil { + command = append(command, "-f", parsed[1]) + } + } else { + command = append(command, "-f", "mpegts") + } + } + } + command = append(command, "pipe:1") + + var errBuf bytes.Buffer + cmd := exec.Command("ffmpeg", + command...) + cmd.Stderr = &errBuf + // io.Copy may be better but this uses less code so ¯\_(ツ)_/¯ + cmd.Stdout = d.Writer + + if err = cmd.Start(); err != nil { + return err + } + + cmdChan := make(chan error, 1) + go func() { + cmdChan <- cmd.Wait() + }() + + select { + case err = <-cmdChan: + if err != nil { + if errBuf.Len() > 0 { + return fmt.Errorf(errBuf.String()) + } else { + return err + } + } + return nil + case <-d.Context.Done(): + cmd.Process.Kill() + return d.Context.Err() + } +} + +// downloadSegments downloads every mpeg transport stream segment to a given +// directory (more information below). +// After every segment download onSegmentDownload will be called with: +// the downloaded segment, the current position, the total size of segments to download, +// the file where the segment content was written to an error (if occurred). +// The filename is always .ts +// +// Short explanation: +// The actual crunchyroll video is split up in multiple segments (or video files) which +// have to be downloaded and merged after to generate a single video file. +// And this function just downloads each of this segment into the given directory. +// See https://en.wikipedia.org/wiki/MPEG_transport_stream for more information +func (d Downloader) downloadSegments(format *Format) ([]string, error) { + if err := format.InitVideo(); err != nil { + return nil, err + } + var wg sync.WaitGroup var lock sync.Mutex - chunkSize := int(math.Ceil(float64(format.Video.Chunklist.Count()) / float64(goroutines))) + chunkSize := int(math.Ceil(float64(format.Video.Chunklist.Count()) / float64(d.Goroutines))) - // when a onSegmentDownload call returns an error, this channel will be set to true and stop all goroutines - quit := make(chan bool) + // when a onSegmentDownload call returns an error, this context will be set cancelled and stop all goroutines + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() // receives the decrypt block and iv from the first segment. // in my tests, only the first segment has specified this data, so the decryption data from this first segments will be used in every other segment too block, iv, err := getCrypt(format, format.Video.Chunklist.Segments[0]) if err != nil { - return err + return nil, err } var total int32 @@ -111,39 +264,42 @@ func download(context context.Context, format *Format, tempDir string, goroutine for j, segment := range format.Video.Chunklist.Segments[i:end] { select { - case <-context.Done(): - return - case <-quit: + case <-d.Context.Done(): + case <-ctx.Done(): return default: var file *os.File - k := 1 - for ; k < 4; k++ { - file, err = downloadSegment(context, format, segment, filepath.Join(tempDir, fmt.Sprintf("%d.ts", i+j)), block, iv) + for k := 0; k < 3; k++ { + filename := filepath.Join(d.TempDir, fmt.Sprintf("%d.ts", i+j)) + file, err = d.downloadSegment(format, segment, filename, block, iv) if err == nil { break } - // sleep if an error occurs. very useful because sometimes the connection times out - time.Sleep(5 * time.Duration(k) * time.Second) + if k == 2 { + cancel() + return + } + select { + case <-d.Context.Done(): + case <-ctx.Done(): + return + case <-time.After(5 * time.Duration(k) * time.Second): + // sleep if an error occurs. very useful because sometimes the connection times out + } } - if k == 4 { - quit <- true - return - } - if onSegmentDownload != nil { - if lockOnSegmentDownload { + if d.OnSegmentDownload != nil { + if d.LockOnSegmentDownload { lock.Lock() } - if err = onSegmentDownload(segment, int(atomic.AddInt32(&total, 1)), int(format.Video.Chunklist.Count()), file); err != nil { - quit <- true - if lockOnSegmentDownload { + if err = d.OnSegmentDownload(segment, int(atomic.AddInt32(&total, 1)), int(format.Video.Chunklist.Count()), file); err != nil { + if d.LockOnSegmentDownload { lock.Unlock() } file.Close() return } - if lockOnSegmentDownload { + if d.LockOnSegmentDownload { lock.Unlock() } } @@ -155,16 +311,21 @@ func download(context context.Context, format *Format, tempDir string, goroutine wg.Wait() select { - case <-context.Done(): - return context.Err() - case <-quit: - return err + case <-d.Context.Done(): + return nil, d.Context.Err() + case <-ctx.Done(): + return nil, err default: - return nil + var files []string + for i := 0; i < int(total); i++ { + files = append(files, filepath.Join(d.TempDir, fmt.Sprintf("%d.ts", i))) + } + + return files, nil } } -// getCrypt extracts the key and iv of a m3u8 segment and converts it into a cipher.Block block and a iv byte sequence +// getCrypt extracts the key and iv of a m3u8 segment and converts it into a cipher.Block and an iv byte sequence func getCrypt(format *Format, segment *m3u8.MediaSegment) (block cipher.Block, iv []byte, err error) { var resp *http.Response @@ -188,9 +349,9 @@ func getCrypt(format *Format, segment *m3u8.MediaSegment) (block cipher.Block, i } // downloadSegment downloads a segment, decrypts it and names it after the given index -func downloadSegment(context context.Context, format *Format, segment *m3u8.MediaSegment, filename string, block cipher.Block, iv []byte) (*os.File, error) { +func (d Downloader) downloadSegment(format *Format, segment *m3u8.MediaSegment, filename string, block cipher.Block, iv []byte) (*os.File, error) { // every segment is aes-128 encrypted and has to be decrypted when downloaded - content, err := decryptSegment(context, format.crunchy.Client, segment, block, iv) + content, err := d.decryptSegment(format.crunchy.Client, segment, block, iv) if err != nil { return nil, err } @@ -208,12 +369,12 @@ func downloadSegment(context context.Context, format *Format, segment *m3u8.Medi } // https://github.com/oopsguy/m3u8/blob/4150e93ec8f4f8718875a02973f5d792648ecb97/tool/crypt.go#L25 -func decryptSegment(context context.Context, client *http.Client, segment *m3u8.MediaSegment, block cipher.Block, iv []byte) ([]byte, error) { +func (d Downloader) decryptSegment(client *http.Client, segment *m3u8.MediaSegment, block cipher.Block, iv []byte) ([]byte, error) { req, err := http.NewRequest(http.MethodGet, segment.URI, nil) if err != nil { return nil, err } - req.WithContext(context) + req.WithContext(d.Context) resp, err := client.Do(req) if err != nil { @@ -229,13 +390,13 @@ func decryptSegment(context context.Context, client *http.Client, segment *m3u8. blockMode := cipher.NewCBCDecrypter(block, iv[:block.BlockSize()]) decrypted := make([]byte, len(raw)) blockMode.CryptBlocks(decrypted, raw) - raw = pkcs5UnPadding(decrypted) + raw = d.pkcs5UnPadding(decrypted) return raw, nil } // https://github.com/oopsguy/m3u8/blob/4150e93ec8f4f8718875a02973f5d792648ecb97/tool/crypt.go#L47 -func pkcs5UnPadding(origData []byte) []byte { +func (d Downloader) pkcs5UnPadding(origData []byte) []byte { length := len(origData) unPadding := int(origData[length-1]) return origData[:(length - unPadding)] diff --git a/format.go b/format.go index 9dfaf3e..ec94c16 100644 --- a/format.go +++ b/format.go @@ -1,18 +1,7 @@ package crunchyroll import ( - "bufio" - "bytes" - "context" - "fmt" "github.com/grafov/m3u8" - "io/ioutil" - "os" - "os/exec" - "path/filepath" - "sort" - "strconv" - "strings" ) type FormatType string @@ -56,144 +45,7 @@ func (f *Format) InitVideo() error { return nil } +// Download downloads the Format with the via Downloader specified options func (f *Format) Download(downloader Downloader) error { - if _, err := os.Stat(downloader.Filename); err == nil && !downloader.IgnoreExisting { - return fmt.Errorf("file %s already exists", downloader.Filename) - } - if _, err := os.Stat(downloader.TempDir); err != nil { - if os.IsNotExist(err) { - err = os.Mkdir(downloader.TempDir, 0755) - } - if err != nil { - return err - } - } else if !downloader.IgnoreExisting { - content, err := os.ReadDir(downloader.TempDir) - if err != nil { - return err - } - if len(content) > 0 { - return fmt.Errorf("directory %s is not empty", downloader.Filename) - } - } - - if downloader.DeleteTempAfter { - defer os.RemoveAll(downloader.TempDir) - } - if err := download(downloader.Context, f, downloader.TempDir, downloader.Goroutines, downloader.LockOnSegmentDownload, downloader.OnSegmentDownload); err != nil { - return err - } - - if downloader.FFmpegOpts != nil { - return mergeSegmentsFFmpeg(downloader.Context, downloader.TempDir, downloader.Filename, downloader.FFmpegOpts) - } else { - return mergeSegments(downloader.Context, downloader.TempDir, downloader.Filename) - } -} - -// mergeSegments reads every file in tempDir and writes their content to the outputFile. -// The given output file gets created or overwritten if already existing -func mergeSegments(context context.Context, tempDir string, outputFile string) error { - dir, err := os.ReadDir(tempDir) - if err != nil { - return err - } - file, err := os.OpenFile(outputFile, os.O_CREATE|os.O_WRONLY, 0755) - if err != nil { - return err - } - defer file.Close() - writer := bufio.NewWriter(file) - defer writer.Flush() - - // sort the directory files after their numeric names - sort.Slice(dir, func(i, j int) bool { - iNum, err := strconv.Atoi(strings.Split(dir[i].Name(), ".")[0]) - if err != nil { - return false - } - jNum, err := strconv.Atoi(strings.Split(dir[j].Name(), ".")[0]) - if err != nil { - return false - } - return iNum < jNum - }) - - for _, file := range dir { - select { - case <-context.Done(): - return context.Err() - default: - } - - bodyAsBytes, err := ioutil.ReadFile(filepath.Join(tempDir, file.Name())) - if err != nil { - return err - } - if _, err = writer.Write(bodyAsBytes); err != nil { - return err - } - } - return nil -} - -// mergeSegmentsFFmpeg reads every file in tempDir and merges their content to the outputFile -// with ffmpeg (https://ffmpeg.org/). -// The given output file gets created or overwritten if already existing -func mergeSegmentsFFmpeg(context context.Context, tempDir string, outputFile string, opts []string) error { - dir, err := os.ReadDir(tempDir) - if err != nil { - return err - } - f, err := os.Create(filepath.Join(tempDir, "list.txt")) - if err != nil { - return err - } - // -1 is the list.txt file - for i := 0; i < len(dir)-1; i++ { - fmt.Fprintf(f, "file '%s.ts'\n", filepath.Join(tempDir, strconv.Itoa(i))) - } - f.Close() - - // predefined options ... custom options ... predefined output filename - command := []string{ - "-y", - "-f", "concat", - "-safe", "0", - "-i", f.Name(), - "-c", "copy", - } - if opts != nil { - command = append(command, opts...) - } - command = append(command, outputFile) - - var errBuf bytes.Buffer - cmd := exec.Command("ffmpeg", - command...) - cmd.Stderr = &errBuf - - if err := cmd.Start(); err != nil { - return err - } - - cmdChan := make(chan error, 1) - go func() { - cmdChan <- cmd.Wait() - }() - - select { - case err = <-cmdChan: - if err != nil { - if errBuf.Len() > 0 { - return fmt.Errorf(errBuf.String()) - } else { - return err - } - } - return nil - case <-context.Done(): - cmd.Process.Kill() - return context.Err() - } + return downloader.download(f) }