add clear exception when serper index fails (#137)
Co-authored-by: Fadl <chaos@efqr.dev>
himynamesdave and fqrious authored Feb 26, 2025
1 parent 02e3528 commit 526bef3
Showing 2 changed files with 26 additions and 9 deletions.
9 changes: 7 additions & 2 deletions history4feed/h4fscripts/sitemap_helpers.py
@@ -1,4 +1,5 @@
import json
import logging
import os
import time
from collections import namedtuple
@@ -13,7 +14,8 @@
from dateparser import parse as parse_date
DEFAULT_USER_AGENT = "curl"


class SearchIndexError(FatalError):
pass

def fetch_posts_links_with_serper(site, from_time: dt, to_time: dt = None) -> dict[str, PostDict]:
s = requests.Session()
@@ -34,9 +36,10 @@ def fetch_posts_links_with_serper(site, from_time: dt, to_time: dt = None) -> dict[str, PostDict]:
while frame_start < to_time:
frame_end = frame_start + timedelta(days=100)
params.update(q=f"site:{site} after:{frame_start.date().isoformat()} before:{frame_end.date().isoformat()}", page=1)
frame_start = frame_end - timedelta(days=1)
while True:
resp = s.get("https://google.serper.dev/search", params=params)
if not resp.ok:
raise SearchIndexError(f"Serper Request GOT {resp.status_code}: {resp.text}")
data = resp.json()
credits_used += data['credits']
for d in data['organic']:
@@ -50,5 +53,7 @@ def fetch_posts_links_with_serper(site, from_time: dt, to_time: dt = None) -> dict[str, PostDict]:
params['page'] += 1
if len(data['organic']) < params['num']:
break
frame_start = frame_end - timedelta(days=1)
logging.info(f"got {len(entries)} posts between {from_time} and {to_time}, used {credits_used} credits")
return entries
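
The first file adds SearchIndexError, a FatalError subclass, so a non-2xx response from the Serper API now raises immediately with the HTTP status code and body instead of continuing to parse a failed response. A minimal usage sketch, assuming fetch_posts_links_with_serper and SearchIndexError are importable as defined above; the call site and site name are illustrative, not from the commit:

import logging
from datetime import datetime, timezone

try:
    # Hypothetical call: collect posts indexed for example.com since 2024-01-01.
    entries = fetch_posts_links_with_serper(
        "example.com", from_time=datetime(2024, 1, 1, tzinfo=timezone.utc)
    )
except SearchIndexError as exc:
    # The message carries the Serper status code and response body,
    # e.g. "Serper Request GOT 403: ...", so the failure reason is actionable.
    logging.error("search index fetch failed: %s", exc)
    raise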

26 changes: 19 additions & 7 deletions history4feed/h4fscripts/task_helper.py
@@ -45,7 +45,7 @@ def new_job(feed: models.Feed, include_remote_blogs):
if not queue_lock(feed, job_obj):
raise Throttled(detail={"message": "A job is already running for this feed", **cache.get(get_lock_id(feed))})

(start_job.s(job_obj.pk)| retrieve_posts_from_links.s(job_obj.pk) | wait_for_all_with_retry.s() | collect_and_schedule_removal.si(job_obj.pk)).apply_async(countdown=5)
(start_job.s(job_obj.pk)| retrieve_posts_from_links.s(job_obj.pk) | wait_for_all_with_retry.s() | collect_and_schedule_removal.si(job_obj.pk)).apply_async(countdown=5, link_error=error_handler.s(job_obj.pk))
return job_obj

def new_patch_posts_job(feed: models.Feed, posts: list[models.Post], include_remote_blogs=True):
@@ -60,7 +60,7 @@ def new_patch_posts_job(feed: models.Feed, posts: list[models.Post], include_remote_blogs=True):
link=post.link,
) for post in posts]
chain = celery.chain([retrieve_full_text.si(ft_job.pk) for ft_job in ft_jobs])
( start_post_job.si(job_obj.id) | chain | collect_and_schedule_removal.si(job_obj.pk)).apply_async()
( start_post_job.si(job_obj.id) | chain | collect_and_schedule_removal.si(job_obj.pk)).apply_async(link_error=error_handler.s(job_obj.pk))
return job_obj

@shared_task(bind=True, default_retry_delay=10)
@@ -109,7 +109,6 @@ def retrieve_posts_from_links(urls, job_id):
for index, url in enumerate(urls):
error = None
if feed.feed_type == models.FeedType.SEARCH_INDEX:
print(job.run_datetime, settings.EARLIEST_SEARCH_DATE, feed.freshness)
start_time = feed.freshness or settings.EARLIEST_SEARCH_DATE
if not start_time.tzinfo:
start_time = start_time.replace(tzinfo=UTC)
@@ -149,13 +148,16 @@ def retrieve_posts_from_links(urls, job_id):
def collect_and_schedule_removal(sender, job_id):
logger.print(f"===> {sender=}, {job_id=} ")
job = models.Job.objects.get(pk=job_id)
remove_lock(job)
if job.state == models.JobState.RUNNING:
job.state = models.JobState.SUCCESS
job.save()

def remove_lock(job):
if cache.delete(get_lock_id(job.feed)):
logger.debug("lock deleted")
else:
logger.debug("Failed to remove lock")
if job.state == models.JobState.RUNNING:
job.state = models.JobState.SUCCESS
job.save()

def retrieve_posts_from_url(url, db_feed: models.Feed, job: models.Job):
back_off_seconds = settings.WAYBACK_SLEEP_SECONDS
@@ -233,4 +235,14 @@ def retrieve_full_text(self, ftjob_pk):
from celery import signals
@signals.worker_ready.connect
def mark_old_jobs_as_failed(**kwargs):
models.Job.objects.filter(state=models.JobState.PENDING).update(state=models.JobState.FAILED, info="marked as failed on startup")
models.Job.objects.filter(state__in=[models.JobState.PENDING, models.JobState.RUNNING]).update(state=models.JobState.FAILED, info="marked as failed on startup")

@shared_task
def error_handler(request, exc: Exception, traceback, job_id):
job = models.Job.objects.get(pk=job_id)
job.state = models.JobState.FAILED
job.info = f"job failed: {exc}"
job.save()
remove_lock(job)
logger.error('Job {3} with task_id {0} raised exception: {1!r}\n{2!r}'.format(
request.id, exc, traceback, job_id))
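
The second file wires failures into the job lifecycle: both Celery chains now pass link_error=error_handler.s(job_obj.pk), and error_handler marks the job FAILED, records the exception message, and releases the feed lock. A minimal, self-contained sketch of the link_error pattern; the app name, broker URL, and task names are illustrative assumptions, not from the repo:

from celery import Celery

app = Celery("sketch", broker="redis://localhost:6379/0")  # assumed broker, illustrative only

@app.task
def flaky(x):
    raise ValueError(f"bad input: {x}")

@app.task
def on_error(request, exc, traceback, job_id):
    # Celery calls an errback with (request, exc, traceback); the argument
    # bound with .s(job_id) is appended after those three, which is why
    # error_handler above is declared as (request, exc, traceback, job_id).
    print(f"job {job_id}: task {request.id} failed with {exc!r}")

# The errback fires when the task (or any task in a chain it belongs to) fails;
# a running worker is assumed.
flaky.apply_async(args=(0,), link_error=on_error.s("job-123"))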
