diff --git a/engine/engine/processors/audio_processor.py b/engine/engine/processors/audio_processor.py index e39a62b..7347468 100644 --- a/engine/engine/processors/audio_processor.py +++ b/engine/engine/processors/audio_processor.py @@ -85,6 +85,7 @@ def denoise(audio_path: str) -> str: def process_audio(audio_path: str) -> str: if env.AUDIO_ENHANCE_ENABLED in ["yes", "on", "enabled"]: - return __AudioChainProcessor().add_filter(mp4_to_wav).add_filter(denoise).add_filter(remove_music).filter(audio_path) + return __AudioChainProcessor().add_filter(mp4_to_wav).add_filter(denoise).add_filter(remove_music).filter( + audio_path) else: return __AudioChainProcessor().add_filter(mp4_to_wav).filter(audio_path) diff --git a/engine/engine/server.py b/engine/engine/server.py index 2104e08..d1113af 100644 --- a/engine/engine/server.py +++ b/engine/engine/server.py @@ -2,7 +2,6 @@ import platform from json import dumps -from playhouse.shortcuts import model_to_dict from sanic import Sanic, Request, response from sanic import json, text from sanic.log import logger @@ -89,12 +88,11 @@ async def process_youtube_video(request: Request): url = request.json['url'] provider = request.json['provider'] youtube_service = YoutubeService(url) - video = await asyncio.create_task(youtube_service.fetch_video_data(provider)) - video = VideoService.analysis_video(video.id) + data = await asyncio.create_task(youtube_service.fetch_video_data(provider)) return json({ "status_code": 200, "message": "Successfully fetch video data, analyze video in processing.", - "payload": model_to_dict(video) + "payload": data }) @@ -106,11 +104,11 @@ async def opts_analysis_youtube_video(request: Request): @app.post("/api/video/analysis") async def analysis_youtube_video(request: Request): video_id: int = int(request.json['video_id']) - video = VideoService.analysis_video(video_id) + data = await asyncio.create_task(VideoService.analysis_video(video_id)) return json({ "status_code": 200, "message": "Analyze video in processing.", - "payload": model_to_dict(video) + "payload": data }) @@ -121,11 +119,11 @@ async def opts_detail(request: Request, video_id: int): @app.get("/api/video/detail/") async def get_video_detail(request: Request, video_id: int): - video = VideoService.find_video_by_id(video_id) + data = VideoService.get_video_detail(video_id) return json({ "status_code": 200, "message": "Successfully get video detail", - "payload": model_to_dict(video) + "payload": data }) @@ -140,11 +138,12 @@ async def summary(request: Request): lang_code = request.json['lang_code'] provider = request.json['provider'] model = request.json.get("model", None) - video = await asyncio.create_task(VideoService.summary_video(vid, lang_code, provider, model)) + data = await asyncio.create_task(VideoService.summary_video(vid, lang_code, provider, model)) + asyncio.create_task(VideoService.analysis_summary_video(vid, model, provider)) return json({ "status_code": 200, "message": "Successfully summary video", - "payload": video.summary + "payload": data }) @@ -169,13 +168,13 @@ async def opts_list_videos(request: Request, page: int): @app.get("/api/videos/") async def list_videos(request: Request, page: int): - total, videos = VideoService.get(page) + total, data = VideoService.get(page) return json({ "status_code": 200, "message": "Successfully", "payload": { "total": total, - "videos": videos + "videos": data } }) @@ -236,4 +235,4 @@ async def clear_chat(request: Request, video_id: int): logger.debug(' . . . "fork" will be explicit set') Sanic.START_METHOD_SET = True Sanic.start_method = "fork" - app.run(host="0.0.0.0", port=8000, access_log=True, debug=True, workers=5) + app.run(host="0.0.0.0", port=8000, access_log=True, dev=True, debug=True, workers=3) diff --git a/engine/engine/services/ai_service.py b/engine/engine/services/ai_service.py index b411c35..dbfdb43 100644 --- a/engine/engine/services/ai_service.py +++ b/engine/engine/services/ai_service.py @@ -71,7 +71,8 @@ def __get_local_embedding_encoder(): device = "cuda" else: device = "cpu" - logger.log(f"using {device} for embedding") + + logger.debug(f"using {device} for embedding") local_model_path: str = str(os.path.join(env.APP_DIR, env.LOCAL_EMBEDDING_MODEL)) if not os.path.exists(local_model_path): encoder = SentenceTransformer( diff --git a/engine/engine/services/video_service.py b/engine/engine/services/video_service.py index a0662d1..c0ff713 100644 --- a/engine/engine/services/video_service.py +++ b/engine/engine/services/video_service.py @@ -46,7 +46,14 @@ def find_video_by_id(vid: int): return Video.get_or_none(Video.id == vid) @staticmethod - def analysis_video(vid: int): + def get_video_detail(vid: int): + video = Video.get_or_none(Video.id == vid) + if video is None: + raise VideoError("video is not found") + return model_to_dict(video) + + @staticmethod + async def analysis_video(vid: int): """ Analyzes a video by its ID and updates its analysis status. @@ -64,28 +71,28 @@ def analysis_video(vid: int): Exception: If an error occurs during the analysis process. """ video: Video = VideoService.find_video_by_id(vid) + if video is None: + raise VideoError("video is not found") + asyncio.create_task(VideoService.__internal_analysis(video)) + return model_to_dict(video) + + @staticmethod + async def __internal_analysis(video): + if video.analysis_state in [constants.ANALYSIS_STAGE_COMPLETED, constants.ANALYSIS_STAGE_PROCESSING]: + return try: - if video is None: - raise VideoError("video is not found") - if video.analysis_state in [constants.ANALYSIS_STAGE_COMPLETED, constants.ANALYSIS_STAGE_PROCESSING]: - return video - asyncio.create_task(VideoService.__internal_analysis(video)) - return video + logger.debug("start analysis video") + video_chapters = VideoService.__get_video_chapters(video) + VideoService.__update_analysis_content_state(video, constants.ANALYSIS_STAGE_PROCESSING) + VideoService.__prepare_video_transcript(video, video_chapters) + video.total_parts = await VideoService.__analysis_chapters(video_chapters, video.embedding_provider) + video.analysis_state = constants.ANALYSIS_STAGE_COMPLETED + VideoService.save(video, video_chapters) + logger.debug("finish analysis video") except Exception as e: VideoService.__update_analysis_content_state(video, constants.ANALYSIS_STAGE_INITIAL) raise e - @staticmethod - async def __internal_analysis(video): - logger.debug("start analysis video") - video_chapters = VideoService.__get_video_chapters(video) - VideoService.__update_analysis_content_state(video, constants.ANALYSIS_STAGE_PROCESSING) - VideoService.__prepare_video_transcript(video, video_chapters) - video.total_parts = await VideoService.__analysis_chapters(video_chapters, video.embedding_provider) - video.analysis_state = constants.ANALYSIS_STAGE_COMPLETED - VideoService.save(video, video_chapters) - logger.debug("finish analysis video") - @staticmethod def __prepare_video_transcript(video: Video, video_chapters: list[VideoChapter]): """ @@ -112,14 +119,16 @@ def __prepare_video_transcript(video: Video, video_chapters: list[VideoChapter]) if not video.raw_transcript: logger.debug("start to recognize video transcript") with ThreadPoolExecutor(max_workers=4) as executor: - futures = [executor.submit(AiService.speech_to_text, chapter.audio_path, chapter.start_time) for chapter in video_chapters] + futures = [executor.submit(AiService.speech_to_text, chapter.audio_path, chapter.start_time) for chapter + in video_chapters] transcripts = [] predict_langs = [] for future in concurrent.futures.as_completed(futures): lang, transcript = future.result() transcripts.append(transcript) predict_langs.append(lang) - sorted_transcripts = sorted([item for sublist in transcripts for item in sublist], key=lambda x: x['start_time']) + sorted_transcripts = sorted([item for sublist in transcripts for item in sublist], + key=lambda x: x['start_time']) logger.debug("finish to recognize video transcript") video.raw_transcript = json.dumps(sorted_transcripts, ensure_ascii=False) predict_lang, count = Counter(predict_langs).most_common(1)[0] @@ -165,7 +174,8 @@ def __pair_video_chapters_with_transcripts(video: Video, video_chapters: list[Vi if chapter_transcript != "": chapter.transcript = chapter_transcript - video_transcript = "\n".join([f"## {ct.title}\n-----\n{ct.transcript}" for ct in video_chapters if ct.transcript]) + video_transcript = "\n".join( + [f"## {ct.title}\n-----\n{ct.transcript}" for ct in video_chapters if ct.transcript]) video.transcript = video_transcript @staticmethod @@ -208,15 +218,20 @@ async def __analysis_chapters(video_chapters: list[VideoChapter], provider: str) with ThreadPoolExecutor(max_workers=len(video_chapters)) as executor: if provider == "gemini": - futures = [executor.submit(VideoService.__analysis_video_with_gemini, chapter) for chapter in video_chapters] + futures = [executor.submit(VideoService.__analysis_video_with_gemini, chapter) for chapter in + video_chapters] elif provider == "local": - futures = [executor.submit(VideoService.__analysis_video_with_local, chapter) for chapter in video_chapters] + futures = [executor.submit(VideoService.__analysis_video_with_local, chapter) for chapter in + video_chapters] elif provider == "mistral": - futures = [executor.submit(VideoService.__analysis_video_with_mistral, chapter) for chapter in video_chapters] + futures = [executor.submit(VideoService.__analysis_video_with_mistral, chapter) for chapter in + video_chapters] elif provider == "openai": - futures = [executor.submit(VideoService.__analysis_video_with_openai, chapter) for chapter in video_chapters] + futures = [executor.submit(VideoService.__analysis_video_with_openai, chapter) for chapter in + video_chapters] elif provider == "voyageai": - futures = [executor.submit(VideoService.__analysis_video_with_voyageai, chapter) for chapter in video_chapters] + futures = [executor.submit(VideoService.__analysis_video_with_voyageai, chapter) for chapter in + video_chapters] else: logger.debug(f"selected provider: {provider}") raise AiError("unknown embedding provider") @@ -226,7 +241,8 @@ async def __analysis_chapters(video_chapters: list[VideoChapter], provider: str) @staticmethod def __get_video_chapters(video: Video) -> list[VideoChapter]: - video_chapters = list(VideoChapter.select().where(VideoChapter.video == video).order_by(VideoChapter.chapter_no)) + video_chapters = list( + VideoChapter.select().where(VideoChapter.video == video).order_by(VideoChapter.chapter_no)) for video_chapter in video_chapters: video_chapter.vid = video.id return video_chapters @@ -276,7 +292,7 @@ def __analysis_video_with_local(chapter: VideoChapter): return len(texts) @staticmethod - async def summary_video(vid: int, lang_code: str, provider: str, model: str = None): + async def summary_video(vid: int, lang_code: str, provider: str, model: str = None) -> str: """ Generates a summary of a video. @@ -298,9 +314,8 @@ async def summary_video(vid: int, lang_code: str, provider: str, model: str = No raise VideoError("video is not found") video.summary = VideoService.__summary_content(lang_code, model, provider, video) video.save() - asyncio.create_task(VideoService.__analysis_summary_video(model, provider, video)) logger.debug("finish summary video") - return video + return video.summary @staticmethod def __summary_content(lang_code: str, model: str, provider: str, video: Video) -> str: @@ -315,7 +330,10 @@ def __summary_content(lang_code: str, model: str, provider: str, video: Video) - return VideoService.__get_response_from_ai(model=model, prompt=prompt, provider=provider) @staticmethod - async def __analysis_summary_video(model: str, provider: str, video: Video): + async def analysis_summary_video(vid: int, model: str, provider: str): + video: Video = VideoService.find_video_by_id(vid) + if video is None: + raise VideoError("video is not found") if video.analysis_summary_state in [constants.ANALYSIS_STAGE_COMPLETED, constants.ANALYSIS_STAGE_PROCESSING]: return logger.debug("start analysis summary video") diff --git a/engine/engine/services/youtube_service.py b/engine/engine/services/youtube_service.py index e77f328..11f5671 100644 --- a/engine/engine/services/youtube_service.py +++ b/engine/engine/services/youtube_service.py @@ -6,6 +6,7 @@ import pytubefix from audio_extract import extract_audio +from playhouse.shortcuts import model_to_dict from pytubefix import YouTube, Caption from pytubefix.cli import on_progress from sanic.log import logger @@ -68,7 +69,7 @@ async def fetch_video_data(self, provider: str) -> Video: video.raw_transcript = json.dumps(transcript, ensure_ascii=False) if transcript else None video.amount_chapters = len(video_chapters) VideoService.save(video, video_chapters) - return video + return model_to_dict(video) def __extract_chapters(self) -> list[VideoChapter]: chapters: list[pytubefix.Chapter] = self.__agent.chapters diff --git a/web/pages/index.vue b/web/pages/index.vue index ec2d3bb..e526f2b 100644 --- a/web/pages/index.vue +++ b/web/pages/index.vue @@ -95,6 +95,14 @@ const process = async () => { videos.value.pop() } } + // try to fetch analysis immediately + try { + request(config.public.apiUrl + `/api/video/analysis`, 'POST', { + 'video_id': data.payload.id + }) + } catch (error) { + console.warn(error) + } } catch (e) { console.debug(e) } finally {