Skip to content

Commit

Permalink
Updating trh server queue
Browse files Browse the repository at this point in the history
  • Loading branch information
calvinp0 committed Jan 28, 2024
1 parent b9b40b7 commit 8f69754
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 16 deletions.
13 changes: 5 additions & 8 deletions arc/job/adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -499,7 +499,7 @@ def write_submit_script(self) -> None:
# Get the first item from the 'queue' dictionary if it is not empty, else default to None
default_queue, default_walltime = next(iter(settings_queues.items())) if settings_queues else (None, None)

# Add queue to the attempted_queus if not already there
# Add queue to the attempted_queues if not already there
if default_queue not in self.attempted_queues:
self.attempted_queues.append(default_queue)

Expand Down Expand Up @@ -1351,18 +1351,15 @@ def troubleshoot_queue(self) -> bool:
"""
queues, run_job = trsh_job_queue(job_name=self.job_name,
server=self.server,
max_time=self.max_job_time,
max_time=24,
attempted_queues = self.attempted_queues,
)

if queues is not None:
# We use self.max_job_time to determine which queues to troubleshoot.
filtered_queues = {queue_name: walltime for queue_name, walltime in queues.items() if convert_to_hours(walltime) >= self.max_job_time}

# Now we sort the queues by walltime, and choose the one with the longest walltime.
#sorted_queues = sorted(filtered_queues.items(), key=lambda x: convert_to_hours(x[1]), reverse=True)
# Sort queue time from shortest to longest
sorted_queues = sorted(filtered_queues.items(), key=lambda x: convert_to_hours(x[1]), reverse=False)
#filtered_queues = {queue_name: walltime for queue_name, walltime in queues.items() if convert_to_hours(walltime) >= self.max_job_time}

sorted_queues = sorted(queues.items(), key=lambda x: convert_to_hours(x[1]), reverse=False)

self.queue = sorted_queues[0][0]
if self.queue not in self.attempted_queues:
Expand Down
36 changes: 36 additions & 0 deletions arc/job/adapter_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import time
import shutil
import unittest
from unittest.mock import patch

import pandas as pd

Expand Down Expand Up @@ -203,8 +204,23 @@ def setUpClass(cls):
project='test',
project_directory=os.path.join(ARC_PATH, 'arc', 'testing', 'test_JobAdapter_ServerTimeLimit'),
species=[ARCSpecies(label='spc1', xyz=['O 0 0 1'])],
server='test_server',
testing=True,
)
cls.job_6 = GaussianAdapter(execution_type='queue',
job_name='spc1',
job_type='opt',
job_id='123456',
job_num=101,
job_server_name = 'server1',
level=Level(method='cbs-qb3'),
project='test',
project_directory=os.path.join(ARC_PATH, 'arc', 'testing', 'test_JobAdapter_ServerTimeLimit'),
species=[ARCSpecies(label='spc1', xyz=['O 0 0 1'])],
testing=True,
queue='short_queue',
attempted_queues=['short_queue']
)

def test_determine_job_array_parameters(self):
"""Test determining job array parameters"""
Expand Down Expand Up @@ -423,6 +439,26 @@ def test_determine_job_status(self):
self.assertEqual(self.job_5.job_status[1]['status'], 'errored')
self.assertEqual(self.job_5.job_status[1]['keywords'], ['ServerTimeLimit'])

@patch(
"arc.job.trsh.servers",
{
"local": {
"cluster_soft": "PBS",
"un": "test_user",
"queues": {"short_queue": "24:00:0","middle_queue": "48:00:00", "long_queue": "3600:00:00"},
}
},
)
def test_troubleshoot_queue(self):
"""Test troubleshooting a queue job"""
self.job_6.troubleshoot_queue()
self.assertEqual(self.job_6.queue, 'middle_queue')
# Assert that 'middle_queue' and 'short_queue' were attempted
# We do not do assert equal because a user may have different queues from the settings.py originally during cls
self.assertIn('short_queue', self.job_6.attempted_queues)
self.assertIn('middle_queue', self.job_6.attempted_queues)



@classmethod
def tearDownClass(cls):
Expand Down
13 changes: 5 additions & 8 deletions arc/job/trsh_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -716,23 +716,20 @@ def test_trsh_scan_job(self):
self.assertEqual(scan_trsh, "D 5 4 1 2 F\nD 1 2 3 6 F\nB 2 3 F\n")
self.assertEqual(scan_res, 4.0)

@patch(
"arc.common.get_logger"
)
@patch(
"arc.job.local.execute_command"
)
@patch(
"arc.job.trsh.servers",
{
"test_server": {
"cluster_soft": "PBS",
"un": "test_user",
"queue": {"short_queue": "24:00:0", "long_queue": "3600:00:00"},
"queues": {"short_queue": "24:00:0","middle_queue": "48:00:00", "long_queue": "3600:00:00"},
}
},
)
@patch(
"arc.job.trsh.execute_command"
)
def test_user_queue_setting_trsh(self, mock_logger, mock_execute_command):
def test_user_queue_setting_trsh(self, mock_execute_command):
""" Test the trsh_job_queue function with user specified queue """
# Mocking the groups and qstat command outputs
mock_execute_command.side_effect = [
Expand Down

0 comments on commit 8f69754

Please sign in to comment.