From 04ec3e956e20ef539a74013c353359f050f85b22 Mon Sep 17 00:00:00 2001 From: virusdefender Date: Sun, 24 Apr 2016 16:46:05 +0800 Subject: [PATCH] =?UTF-8?q?=E5=B0=9D=E8=AF=95=E4=BD=BF=E7=94=A8=E6=9B=B4?= =?UTF-8?q?=E7=BB=86=E7=B2=92=E5=BA=A6=E7=9A=84=E6=95=B0=E6=8D=AE=E5=BA=93?= =?UTF-8?q?=E9=94=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- judge_dispatcher/models.py | 14 ---- judge_dispatcher/tasks.py | 132 ++++++++++++++++++++----------------- 2 files changed, 71 insertions(+), 75 deletions(-) diff --git a/judge_dispatcher/models.py b/judge_dispatcher/models.py index 15cfa2c86..7305eb26d 100644 --- a/judge_dispatcher/models.py +++ b/judge_dispatcher/models.py @@ -17,20 +17,6 @@ class JudgeServer(models.Model): status = models.BooleanField(default=True) create_time = models.DateTimeField(auto_now_add=True, blank=True, null=True) - def use_judge_instance(self): - # 因为use 和 release 中间是判题时间,可能这个 model 的数据已经被修改了,所以不能直接使用self.xxx,否则取到的是旧数据 - server = JudgeServer.objects.select_for_update().get(id=self.id) - server.left_instance_number -= 1 - server.workload = 100 - int(float(server.left_instance_number) / server.max_instance_number * 100) - server.save() - - def release_judge_instance(self): - # 使用原子操作 - server = JudgeServer.objects.select_for_update().get(id=self.id) - server.left_instance_number += 1 - server.workload = 100 - int(float(server.left_instance_number) / server.max_instance_number * 100) - server.save() - class Meta: db_table = "judge_server" diff --git a/judge_dispatcher/tasks.py b/judge_dispatcher/tasks.py index 14023dbef..32b24a08c 100644 --- a/judge_dispatcher/tasks.py +++ b/judge_dispatcher/tasks.py @@ -38,25 +38,35 @@ def __init__(self, submission_id, time_limit, memory_limit, test_case_id, spj, s self.spj_version = spj_version def choose_judge_server(self): - servers = JudgeServer.objects.filter(workload__lt=100, lock=False, status=True).order_by("-workload") - if servers.exists(): - return servers.first() + with transaction.atomic(): + servers = JudgeServer.objects.select_for_update().filter(workload__lt=100, lock=False, status=True).order_by("-workload") + if servers.exists(): + server = servers.first() + server.left_instance_number -= 1 + server.workload = 100 - int(float(server.left_instance_number) / server.max_instance_number * 100) + server.save() + return server + + def release_judge_instance(self, judge_server_id): + with transaction.atomic(): + # 使用原子操作, 同时因为use和release中间间隔了判题过程,需要重新查询一下 + server = JudgeServer.objects.select_for_update().get(id=judge_server_id) + server.left_instance_number += 1 + server.workload = 100 - int(float(server.left_instance_number) / server.max_instance_number * 100) + server.save() def judge(self): self.submission.judge_start_time = int(time.time() * 1000) - with transaction.atomic(): - judge_server = self.choose_judge_server() - - # 如果没有合适的判题服务器,就放入等待队列中等待判题 - if not judge_server: - JudgeWaitingQueue.objects.create(submission_id=self.submission.id, time_limit=self.time_limit, - memory_limit=self.memory_limit, test_case_id=self.test_case_id, - spj=self.spj, spj_language=self.spj_language, spj_code=self.spj_code, - spj_version=self.spj_version) - return + judge_server = self.choose_judge_server() - judge_server.use_judge_instance() + # 如果没有合适的判题服务器,就放入等待队列中等待判题 + if not judge_server: + JudgeWaitingQueue.objects.create(submission_id=self.submission.id, time_limit=self.time_limit, + memory_limit=self.memory_limit, test_case_id=self.test_case_id, + spj=self.spj, spj_language=self.spj_language, spj_code=self.spj_code, + spj_version=self.spj_version) + return try: s = TimeoutServerProxy("http://" + judge_server.ip + ":" + str(judge_server.port), @@ -82,8 +92,7 @@ def judge(self): self.submission.result = result["system_error"] self.submission.info = str(e) finally: - with transaction.atomic(): - judge_server.release_judge_instance() + self.release_judge_instance(judge_server.id) self.submission.judge_end_time = int(time.time() * 1000) self.submission.save(update_fields=["judge_start_time", "result", "info", "accepted_answer_time", "judge_end_time"]) @@ -111,32 +120,32 @@ def judge(self): spj_version=waiting_submission.spj_version) def update_problem_status(self): - problem = Problem.objects.get(id=self.submission.problem_id) - - # 更新普通题目的计数器 - problem.add_submission_number() - - # 更新用户做题状态 - user = User.objects.get(id=self.submission.user_id) - - problems_status = user.problems_status - if "problems" not in problems_status: - problems_status["problems"] = {} - - # 增加用户提交计数器 - user.userprofile.add_submission_number() - - # 之前状态不是ac, 现在是ac了 需要更新用户ac题目数量计数器,这里需要判重 - if problems_status["problems"].get(str(problem.id), -1) != 1 and self.submission.result == result["accepted"]: - user.userprofile.add_accepted_problem_number() - - if self.submission.result == result["accepted"]: - problem.add_ac_number() - problems_status["problems"][str(problem.id)] = 1 - else: - problems_status["problems"][str(problem.id)] = 2 - user.problems_status = problems_status - user.save(update_fields=["problem_status"]) + with transaction.atomic(): + problem = Problem.objects.select_for_update().get(id=self.submission.problem_id) + # 更新普通题目的计数器 + problem.add_submission_number() + + # 更新用户做题状态 + user = User.objects.select_for_update().get(id=self.submission.user_id) + + problems_status = user.problems_status + if "problems" not in problems_status: + problems_status["problems"] = {} + + # 增加用户提交计数器 + user.userprofile.add_submission_number() + + # 之前状态不是ac, 现在是ac了 需要更新用户ac题目数量计数器,这里需要判重 + if problems_status["problems"].get(str(problem.id), -1) != 1 and self.submission.result == result["accepted"]: + user.userprofile.add_accepted_problem_number() + + if self.submission.result == result["accepted"]: + problem.add_ac_number() + problems_status["problems"][str(problem.id)] = 1 + else: + problems_status["problems"][str(problem.id)] = 2 + user.problems_status = problems_status + user.save(update_fields=["problems_status"]) # 普通题目的话,到这里就结束了 def update_contest_problem_status(self): @@ -146,31 +155,32 @@ def update_contest_problem_status(self): logger.info("Contest debug mode, id: " + str(contest.id) + ", submission id: " + self.submission.id) return - contest_problem = ContestProblem.objects.get(contest=contest, id=self.submission.problem_id) - contest_problem.add_submission_number() + with transaction.atomic(): + contest_problem = ContestProblem.objects.select_for_update().get(contest=contest, id=self.submission.problem_id) + contest_problem.add_submission_number() - user = User.objects.get(id=self.submission.user_id) - problems_status = user.problems_status + user = User.objects.select_for_update().get(id=self.submission.user_id) + problems_status = user.problems_status - if "contest_problems" not in problems_status: - problems_status["contest_problems"] = {} + if "contest_problems" not in problems_status: + problems_status["contest_problems"] = {} - # 增加用户提交计数器 - user.userprofile.add_submission_number() + # 增加用户提交计数器 + user.userprofile.add_submission_number() - # 之前状态不是ac, 现在是ac了 需要更新用户ac题目数量计数器,这里需要判重 - if problems_status["contest_problems"].get(str(contest_problem.id), -1) != 1 and \ - self.submission.result == result["accepted"]: - user.userprofile.add_accepted_problem_number() + # 之前状态不是ac, 现在是ac了 需要更新用户ac题目数量计数器,这里需要判重 + if problems_status["contest_problems"].get(str(contest_problem.id), -1) != 1 and \ + self.submission.result == result["accepted"]: + user.userprofile.add_accepted_problem_number() - if self.submission.result == result["accepted"]: - contest_problem.add_ac_number() - problems_status["contest_problems"][str(contest_problem.id)] = 1 - else: - problems_status["contest_problems"][str(contest_problem.id)] = 2 + if self.submission.result == result["accepted"]: + contest_problem.add_ac_number() + problems_status["contest_problems"][str(contest_problem.id)] = 1 + else: + problems_status["contest_problems"][str(contest_problem.id)] = 2 - user.problems_status = problems_status - user.save(update_fields=["problem_status"]) + user.problems_status = problems_status + user.save(update_fields=["problems_status"]) self.update_contest_rank(contest)