272 changes: 149 additions & 123 deletions cps/tasks/metadata_extract.py
@@ -1,205 +1,231 @@
 import os
 import re
 import requests
-import sqlite3
 from datetime import datetime
 from flask_babel import lazy_gettext as N_, gettext as _
 
-from cps.constants import XKLB_DB_FILE, MAX_VIDEOS_PER_DOWNLOAD
 from cps.services.worker import WorkerThread
 from cps.tasks.download import TaskDownload
 from cps.services.worker import CalibreTask, STAT_FINISH_SUCCESS, STAT_FAIL, STAT_STARTED, STAT_WAITING
-from cps.subproc_wrapper import process_open
+from cps.services.xb_utils import (
+    DatabaseService, format_media_url, format_original_url,
+    Settings, execute_subprocess
+)
+from cps.xb import XKLBDB, Media
+from sqlalchemy.orm import Session
 from .. import logger
 
 log = logger.create()

 class TaskMetadataExtract(CalibreTask):
     """Task class for metadata extraction."""
 
     def __init__(self, task_message, media_url, original_url, current_user_name):
         super(TaskMetadataExtract, self).__init__(task_message)
+        db = XKLBDB()
+        self.session = db.get_session()
+        self.db_service = DatabaseService(self.session)
         self.message = task_message
-        self.media_url = self._format_media_url(media_url)
+        self.media_url = format_media_url(media_url)
         self.media_url_link = f'<a href="{self.media_url}" target="_blank">{self.media_url}</a>'
-        self.original_url = self._format_original_url(original_url)
+        self.original_url = format_original_url(original_url)
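+        # The old private helpers moved to xb_utils: format_media_url drops extra
+        # query parameters and format_original_url rewrites /media to /meta.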
         self.is_playlist = None
         self.current_user_name = current_user_name
         self.start_time = self.end_time = datetime.now()
         self.stat = STAT_WAITING
         self.progress = 0
         self.columns = None
         self.shelf_title = None
         self.shelf_id = None
         self.unavailable = []

-    def _format_media_url(self, media_url):
-        return media_url.split("&")[0] if "&" in media_url else media_url
-
-    def _format_original_url(self, original_url):
-        # (?=...) is a "lookahead assertion" https://docs.python.org/3/library/re.html#regular-expression-syntax
-        return re.sub(r"/media(?=\?|$)", r"/meta", original_url)

     def _execute_subprocess(self, subprocess_args):
         """Executes a subprocess and handles its output."""
+        log.debug("Executing subprocess with args: %s", subprocess_args)
         try:
-            p = process_open(subprocess_args, newlines=True)
+            p = execute_subprocess(subprocess_args)
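+            # Read the wrapper's stdout line by line so a playlist banner
+            # can be spotted (and the scan stopped) as early as possible.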
             while p.poll() is None:
                 self.end_time = datetime.now()
                 line = p.stdout.readline()
                 if "[download] Downloading playlist:" in line:
                     self.is_playlist = True
                     self.shelf_title = line.split("Downloading playlist: ")[1].strip()
+                    log.info("Detected playlist: %s", self.shelf_title)
                     break
             p.wait()
             self.message = self.media_url_link + "..."
             return p
         except Exception as e:
             log.error("An error occurred during subprocess execution: %s", e)
             self.message = f"{self.media_url_link} failed: {e}"
             return None

-    def _remove_shorts_from_db(self, conn):
-        conn.execute("DELETE FROM media WHERE path LIKE '%shorts%'")
-        conn.commit()
-
-    def _fetch_requested_urls(self, conn):
-        try:
-            cursor = conn.execute("PRAGMA table_info(media)")
-            self.columns = [column[1] for column in cursor.fetchall()]
-            if "live_status" not in self.columns:
-                conn.execute("ALTER TABLE media ADD COLUMN live_status TEXT")
-            if "error" not in self.columns:
-                conn.execute("ALTER TABLE media ADD COLUMN error TEXT")
-            query = "SELECT path, duration, live_status FROM media WHERE path LIKE 'http%' AND (error IS NULL OR error = '')"
-            rows = conn.execute(query).fetchall()
-            requested_urls = {}
-            for path, duration, live_status in rows:
-                if duration is not None and duration > 0:
-                    requested_urls[path] = {"duration": duration, "live_status": live_status}
-                else:
-                    self.unavailable.append(path)
-            return requested_urls
-        except sqlite3.Error as db_error:
-            log.error("An error occurred while trying to connect to the database: %s", db_error)
-            self.message = f"{self.media_url_link} failed: An error occurred ({db_error}) while trying to connect to the database."
-            return {}

     def _send_shelf_title(self):
         """Sends the shelf title to the original URL."""
+        log.debug("Sending shelf title to original URL.")
         try:
-            response = requests.get(self.original_url, params={"current_user_name": self.current_user_name, "shelf_title": self.shelf_title})
+            response = requests.get(self.original_url, params={
+                "current_user_name": self.current_user_name,
+                "shelf_title": self.shelf_title
+            })
             if response.status_code == 200:
                 self.shelf_id = response.json()["shelf_id"]
                 self.shelf_title = response.json()["shelf_title"]
+                log.info("Shelf title sent successfully. Shelf ID: %s", self.shelf_id)
             else:
-                log.error("Received unexpected status code %s while sending the shelf title to %s", response.status_code, self.original_url)
+                log.error("Unexpected status code %s when sending shelf title.", response.status_code)
         except Exception as e:
             log.error("An error occurred during the shelf title sending: %s", e)

     def _update_metadata(self, requested_urls):
         """Updates metadata for requested URLs."""
+        log.debug("Updating metadata for requested URLs.")
         failed_urls = []
-        subprocess_args_list = [[os.getenv("LB_WRAPPER", "lb-wrapper"), "tubeadd", requested_url] for requested_url in requested_urls.keys()]
+        subprocess_args_list = [[Settings.LB_WRAPPER, "tubeadd", requested_url] for requested_url in requested_urls.keys()]
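+        # One "tubeadd" wrapper invocation per URL refreshes that entry's
+        # metadata in the xklb database.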

         for index, subprocess_args in enumerate(subprocess_args_list):
-            try:
-                p = self._execute_subprocess(subprocess_args)
-                if p is not None:
-                    self.progress = (index + 1) / len(subprocess_args_list)
-                else:
-                    failed_urls.append(subprocess_args[2])
-                p.wait()
-            except Exception as e:
-                log.error("An error occurred during updating the metadata of %s: %s", subprocess_args[2], e)
-                self.message = f"{subprocess_args[2]} failed: {e}"
+            p = self._execute_subprocess(subprocess_args)
+            if p is not None:
+                self.progress = (index + 1) / len(subprocess_args_list)
+                log.debug("Metadata updated for URL: %s", subprocess_args[2])
+                p.wait()
+            else:
+                failed_urls.append(subprocess_args[2])
+                log.error("Failed to update metadata for URL: %s", subprocess_args[2])
 
-        requested_urls = {url: requested_urls[url] for url in requested_urls.keys() if "shorts" not in url and url not in failed_urls}
-
-    def _calculate_views_per_day(self, requested_urls, conn):
-        now = datetime.now()
-        for requested_url in requested_urls.keys():
-            try:
-                view_count = conn.execute("SELECT view_count FROM media WHERE path = ?", (requested_url,)).fetchone()[0]
-                time_uploaded = datetime.utcfromtimestamp(conn.execute("SELECT time_uploaded FROM media WHERE path = ?", (requested_url,)).fetchone()[0])
-                days_since_publish = (now - time_uploaded).days or 1
-                requested_urls[requested_url]["views_per_day"] = view_count / days_since_publish
-            except Exception as e:
-                log.error("An error occurred during the calculation of views per day for %s: %s", requested_url, e)
-                self.message = f"{requested_url} failed: {e}"
+        # Remove failed URLs from requested_urls
+        for url in failed_urls:
+            requested_urls.pop(url, None)
+        log.info("Metadata update completed.")

     def _sort_and_limit_requested_urls(self, requested_urls):
-        return dict(sorted(requested_urls.items(), key=lambda item: item[1]["views_per_day"], reverse=True)[:min(MAX_VIDEOS_PER_DOWNLOAD, len(requested_urls))])
+        """Sorts and limits the requested URLs based on views per day."""
+        log.debug("Sorting and limiting requested URLs.")
+        sorted_urls = dict(sorted(requested_urls.items(), key=lambda item: item[1]["views_per_day"], reverse=True))
+        limited_urls = dict(list(sorted_urls.items())[:min(Settings.MAX_VIDEOS_PER_DOWNLOAD, len(sorted_urls))])
+        log.info("Requested URLs sorted and limited to %d entries.", len(limited_urls))
+        return limited_urls

     def _add_download_tasks_to_worker(self, requested_urls):
+        """Adds download tasks to the worker thread."""
+        log.debug("Adding download tasks to the worker thread.")
+        num_requested_urls = len(requested_urls)
+        total_duration = 0
         for index, (requested_url, url_data) in enumerate(requested_urls.items()):
-            task_download = TaskDownload(_("Downloading %(url)s...", url=requested_url),
-                                         requested_url, self.original_url,
-                                         self.current_user_name, self.shelf_id, duration=str(url_data["duration"]), live_status=url_data["live_status"])
+            duration = url_data["duration"]
+            total_duration += duration
+            live_status = url_data["live_status"]
+            task_download = TaskDownload(
+                _("Downloading %(url)s...", url=requested_url),
+                requested_url,
+                self.original_url,
+                self.current_user_name,
+                self.shelf_id,
+                str(duration),
+                live_status
+            )
             WorkerThread.add(self.current_user_name, task_download)
-            num_requested_urls = len(requested_urls)
-            total_duration = sum(url_data["duration"] for url_data in requested_urls.values())
-            self.message = self.media_url_link + f"<br><br>" \
-                           f"Number of Videos: {index + 1}/{num_requested_urls}<br>" \
-                           f"Total Duration: {datetime.utcfromtimestamp(total_duration).strftime('%H:%M:%S')}"
-        if self.shelf_title:
-            shelf_url = re.sub(r"/meta(?=\?|$)", r"/shelf", self.original_url) + f"/{self.shelf_id}"
-            self.message += f"<br><br>Shelf Title: <a href='{shelf_url}' target='_blank'>{self.shelf_title}</a>"
-        if self.unavailable:
-            self.message += "<br><br>Unavailable Video(s):<br>" + "<br>".join(f'<a href="{url}" target="_blank">{url}</a>' for url in self.unavailable)
-        upcoming_live_urls = [url for url, url_data in requested_urls.items() if url_data["live_status"] == "is_upcoming"]
-        live_urls = [url for url, url_data in requested_urls.items() if url_data["live_status"] == "is_live"]
-        if upcoming_live_urls:
-            self.message += "<br><br>Upcoming Live Video(s):<br>" + "<br>".join(f'<a href="{url}" target="_blank">{url}</a>' for url in upcoming_live_urls)
-        if live_urls:
-            self.message += "<br><br>Live Video(s):<br>" + "<br>".join(f'<a href="{url}" target="_blank">{url}</a>' for url in live_urls)

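+        # utcfromtimestamp formats total_duration as HH:MM:SS, which assumes
+        # the combined runtime stays under 24 hours.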
+        self.message = self.media_url_link + f"<br><br>" \
+                       f"Number of Videos: {num_requested_urls}/{num_requested_urls}<br>" \
+                       f"Total Duration: {datetime.utcfromtimestamp(total_duration).strftime('%H:%M:%S')}"
+
+        if self.shelf_title:
+            shelf_url = re.sub(r"/meta(?=\?|$)", r"/shelf", self.original_url) + f"/{self.shelf_id}"
+            self.message += f"<br><br>Shelf Title: <a href='{shelf_url}' target='_blank'>{self.shelf_title}</a>"
+
+        if self.unavailable:
+            self.message += "<br><br>Unavailable Video(s):<br>" + "<br>".join(
+                f'<a href="{url}" target="_blank">{url}</a>' for url in self.unavailable)
+        upcoming_live_urls = [url for url, url_data in requested_urls.items() if url_data["live_status"] == "is_upcoming"]
+        live_urls = [url for url, url_data in requested_urls.items() if url_data["live_status"] == "is_live"]
+        if upcoming_live_urls:
+            self.message += "<br><br>Upcoming Live Video(s):<br>" + "<br>".join(
+                f'<a href="{url}" target="_blank">{url}</a>' for url in upcoming_live_urls)
+        if live_urls:
+            self.message += "<br><br>Live Video(s):<br>" + "<br>".join(
+                f'<a href="{url}" target="_blank">{url}</a>' for url in live_urls)
+        log.info("Download tasks added to the worker thread.")

     def run(self, worker_thread):
+        """Runs the metadata extraction task."""
         self.worker_thread = worker_thread
-        log.info("Starting to fetch metadata for URL: %s", self.media_url)
         self.start_time = self.end_time = datetime.now()
+        log.info("Starting metadata extraction task for URL: %s", self.media_url)
         self.stat = STAT_STARTED
         self.progress = 0
 
-        lb_executable = os.getenv("LB_WRAPPER", "lb-wrapper")
+        lb_executable = Settings.LB_WRAPPER
         subprocess_args = [lb_executable, "tubeadd", self.media_url]

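+        # Run the wrapper once for the top-level URL; a None result means it
+        # could not be started, so fail fast before touching the database.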
         p = self._execute_subprocess(subprocess_args)
         if p is None:
             self.stat = STAT_FAIL
             return

-        with sqlite3.connect(XKLB_DB_FILE) as conn:
-            self._remove_shorts_from_db(conn)
-            requested_urls = self._fetch_requested_urls(conn)
-            if not requested_urls:
-                if self.unavailable:
-                    self.message = f"{self.media_url_link} failed: Video not available."
-                elif error_message := conn.execute("SELECT error FROM media WHERE ? LIKE '%' || extractor_id || '%'", (self.media_url,)).fetchone()[0]:
-                    self.message = f"{self.media_url_link} failed previously with this error: {error_message}<br><br>To force a retry, submit the URL again."
-                    media_id = conn.execute("SELECT id FROM media WHERE webpath = ?", (self.media_url,)).fetchone()[0]
-                    conn.execute("DELETE FROM media WHERE webpath = ?", (self.media_url,))
-                    conn.execute("DELETE FROM captions WHERE media_id = ?", (media_id,))
-                else:
-                    self.message = f"{self.media_url_link} failed: An error occurred while trying to fetch the requested URLs."
-                self.stat = STAT_FAIL
-
-            elif self.is_playlist:
-                self._send_shelf_title()
-                self._update_metadata(requested_urls)
-                self._calculate_views_per_day(requested_urls, conn)
-                requested_urls = self._sort_and_limit_requested_urls(requested_urls)
-                conn.execute("UPDATE playlists SET path = ? WHERE path = ?", (f"{self.media_url}&timestamp={int(datetime.now().timestamp())}", self.media_url))
-            else:
-                try:
-                    extractor_id = conn.execute("SELECT extractor_id FROM media WHERE ? LIKE '%' || extractor_id || '%'", (self.media_url,)).fetchone()[0]
-                    requested_urls = {url: requested_urls[url] for url in requested_urls.keys() if extractor_id in url}
-                except Exception as e:
-                    log.error("An error occurred during the selection of the extractor ID: %s", e)
-                    self.message = f"{self.media_url_link} failed: {e}"
+        try:
+            with self.session.begin():
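+                # session.begin() scopes everything below to one transaction;
+                # the except branch rolls it back on failure.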
+                # self.db_service.remove_shorts_from_db()
+                requested_urls = self.db_service.fetch_requested_urls(self.unavailable)
+
+                if not requested_urls:
+                    self._handle_no_requested_urls()
+                    self.stat = STAT_FAIL
+                    return
+
-        self._add_download_tasks_to_worker(requested_urls)
-        conn.close()
+                if self.is_playlist:
+                    self._send_shelf_title()
+                    self._update_metadata(requested_urls)
+                    self.db_service.calculate_views_per_day(requested_urls)
+                    requested_urls = self._sort_and_limit_requested_urls(requested_urls)
+                    self.db_service.update_playlist_path(self.media_url)
+                else:
+                    extractor_id = self.db_service.get_extractor_id(self.media_url)
+                    if extractor_id:
+                        requested_urls = {url: data for url, data in requested_urls.items() if extractor_id in url}
+                    else:
+                        self.message = f"{self.media_url_link} failed: Extractor ID not found."
+                        self.stat = STAT_FAIL
+                        return
+
+                self._add_download_tasks_to_worker(requested_urls)

-        self.progress = 1.0
-        self.stat = STAT_FINISH_SUCCESS
+            self.progress = 1.0
+            self.stat = STAT_FINISH_SUCCESS
+            self.end_time = datetime.now()
+            log.info("Metadata extraction task for %s completed successfully.", self.media_url)
+
+        except Exception as e:
+            self.session.rollback()
+            log.error("An error occurred during the metadata extraction task: %s", e)
+            self.message = f"{self.media_url_link} failed: {e}"
+            self.stat = STAT_FAIL
+        finally:
+            self.db_service.close_session()

+    def _handle_no_requested_urls(self):
+        """Handles the case when no requested URLs are found."""
+        log.debug("Handling no requested URLs.")
+        if self.unavailable:
+            self.message = f"{self.media_url_link} failed: Video not available."
+        else:
+            try:
+                extractor_id = self.db_service.get_extractor_id(self.media_url)
+                if extractor_id:
+                    media_entry = self.session.query(Media).filter(
+                        Media.extractor_id == extractor_id,
+                        Media.webpath == self.media_url
+                    ).first()
+                    if media_entry and media_entry.error:
+                        error_message = media_entry.error
+                        self.message = f"{self.media_url_link} failed previously with this error: {error_message}<br><br>To force a retry, submit the URL again."
+                        media_id = media_entry.id
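+                        # Dropping the cached media/captions rows is what lets a
+                        # resubmitted URL be fetched fresh instead of reusing the error.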
+                        # Delete media and captions entries
+                        self.db_service.delete_media_and_captions(media_id, self.media_url)
+                    else:
+                        self.message = f"{self.media_url_link} failed: An error occurred while trying to fetch the requested URLs."
+                else:
+                    self.message = f"{self.media_url_link} failed: Extractor ID not found."
+            except Exception as e:
+                log.error("Error while checking error message: %s", e)
+                self.message = f"{self.media_url_link} failed: An error occurred while checking the error message."

     @property
     def name(self):
@@ -210,4 +236,4 @@ def __str__(self):
 
     @property
     def is_cancellable(self):
-        return True # Change to True if the download task should be cancellable
+        return True