From b00c3aef603f517f7c2b83b68dfd4e55be094c02 Mon Sep 17 00:00:00 2001 From: Daniel Posada Date: Tue, 17 Apr 2018 10:30:37 -0500 Subject: [PATCH] Provides /usage sub-map with all pools in the default case (#806) --- integration/tests/cook/test_basic.py | 14 ++- integration/tests/cook/test_cli.py | 28 ++--- integration/tests/cook/test_multi_user.py | 2 +- scheduler/src/cook/mesos/api.clj | 82 ++++++++++---- scheduler/src/cook/mesos/util.clj | 17 +++ scheduler/test/cook/test/mesos/api.clj | 128 +++++++++++++++++++++- scheduler/test/cook/test/mesos/util.clj | 24 ++-- 7 files changed, 241 insertions(+), 54 deletions(-) diff --git a/integration/tests/cook/test_basic.py b/integration/tests/cook/test_basic.py index be692d6b62..6c8c625b71 100644 --- a/integration/tests/cook/test_basic.py +++ b/integration/tests/cook/test_basic.py @@ -1673,7 +1673,7 @@ def test_user_usage_basic(self): self.assertEqual(resp.status_code, 200, resp.content) usage_data = resp.json() # Check that the response structure looks as expected - self.assertEqual(list(usage_data.keys()), ['total_usage'], usage_data) + self.assertEqual(list(usage_data.keys()), ['total_usage', 'pools'], usage_data) self.assertEqual(len(usage_data['total_usage']), 4, usage_data) # Since we don't know what other test jobs are currently running, # we conservatively check current usage with the >= operation. @@ -1700,7 +1700,7 @@ def test_user_usage_grouped(self): self.assertEqual(resp.status_code, 200, resp.content) usage_data = resp.json() # Check that the response structure looks as expected - self.assertEqual(set(usage_data.keys()), {'total_usage', 'grouped', 'ungrouped'}, usage_data) + self.assertEqual(set(usage_data.keys()), {'total_usage', 'grouped', 'ungrouped', 'pools'}, usage_data) self.assertEqual(set(usage_data['ungrouped'].keys()), {'running_jobs', 'usage'}, usage_data) my_group_usage = next(x for x in usage_data['grouped'] if x['group']['uuid'] == group_uuid) self.assertEqual(set(my_group_usage.keys()), {'group', 'usage'}, my_group_usage) @@ -1731,6 +1731,13 @@ def test_user_usage_grouped(self): self.assertEqual(usage_data['total_usage']['gpus'], breakdowns_total['gpus'], usage_data) self.assertEqual(usage_data['total_usage']['jobs'], breakdowns_total['jobs'], usage_data) # Pool-specific checks + pools, _ = util.pools(self.cook_url) + for pool in pools: + # There should be a sub-map under pools for each pool in the + # system, since we didn't specify the pool in the usage request + pool_usage = usage_data['pools'][pool['name']] + self.assertEqual(set(pool_usage.keys()), {'total_usage', 'grouped', 'ungrouped'}, pool_usage) + self.assertEqual(set(pool_usage['ungrouped'].keys()), {'running_jobs', 'usage'}, pool_usage) default_pool = util.default_pool(self.cook_url) if default_pool: # If there is a default pool configured, make sure that our jobs, @@ -1747,7 +1754,6 @@ def test_user_usage_grouped(self): self.assertEqual(my_group_usage['usage']['jobs'], job_count, my_group_usage) # If there is a non-default pool, make sure that # our jobs don't appear under that pool's usage - pools, _ = util.pools(self.cook_url) non_default_pools = [p for p in pools if p['name'] != default_pool] if len(non_default_pools) > 0: pool = non_default_pools[0]['name'] @@ -1777,7 +1783,7 @@ def test_user_usage_ungrouped(self): self.assertEqual(resp.status_code, 200, resp.content) usage_data = resp.json() # Check that the response structure looks as expected - self.assertEqual(set(usage_data.keys()), {'total_usage', 'grouped', 'ungrouped'}, usage_data) + self.assertEqual(set(usage_data.keys()), {'total_usage', 'grouped', 'ungrouped', 'pools'}, usage_data) ungrouped_data = usage_data['ungrouped'] self.assertEqual(set(ungrouped_data.keys()), {'running_jobs', 'usage'}, ungrouped_data) # Our jobs should be included in the ungrouped breakdown diff --git a/integration/tests/cook/test_cli.py b/integration/tests/cook/test_cli.py index 95a1aeabfa..3b798cbfff 100644 --- a/integration/tests/cook/test_cli.py +++ b/integration/tests/cook/test_cli.py @@ -439,23 +439,23 @@ def test_list_by_state(self): self.assertEqual(0, cp.returncode, cp.stderr) running_uuid = uuids[0] - # Submit a successful job - cp, uuids = cli.submit('ls', self.cook_url, submit_flags='--name %s' % name) - self.assertEqual(0, cp.returncode, cp.stderr) - success_uuid = uuids[0] + try: + # Submit a successful job + cp, uuids = cli.submit('ls', self.cook_url, submit_flags='--name %s' % name) + self.assertEqual(0, cp.returncode, cp.stderr) + success_uuid = uuids[0] - # Submit a failed job - cp, uuids = cli.submit('exit 1', self.cook_url, submit_flags='--name %s' % name) - self.assertEqual(0, cp.returncode, cp.stderr) - failed_uuid = uuids[0] + # Submit a failed job + cp, uuids = cli.submit('exit 1', self.cook_url, submit_flags='--name %s' % name) + self.assertEqual(0, cp.returncode, cp.stderr) + failed_uuid = uuids[0] - # Wait for the desired states to be reached - util.wait_for_job(self.cook_url, waiting_uuid, 'waiting') - util.wait_for_job(self.cook_url, running_uuid, 'running') - util.wait_for_job(self.cook_url, success_uuid, 'completed') - util.wait_for_job(self.cook_url, failed_uuid, 'completed') + # Wait for the desired states to be reached + util.wait_for_job(self.cook_url, waiting_uuid, 'waiting') + util.wait_for_job(self.cook_url, running_uuid, 'running') + util.wait_for_job(self.cook_url, success_uuid, 'completed') + util.wait_for_job(self.cook_url, failed_uuid, 'completed') - try: # waiting cp, jobs = self.list_jobs(name, user, 'waiting') self.assertEqual(0, cp.returncode, cp.stderr) diff --git a/integration/tests/cook/test_multi_user.py b/integration/tests/cook/test_multi_user.py index e3b585efba..dff15eefa4 100644 --- a/integration/tests/cook/test_multi_user.py +++ b/integration/tests/cook/test_multi_user.py @@ -75,7 +75,7 @@ def test_multi_user_usage(self): self.assertEqual(resp.status_code, 200, resp.content) usage_data = resp.json() # Check that the response structure looks as expected - self.assertEqual(list(usage_data.keys()), ['total_usage'], usage_data) + self.assertEqual(list(usage_data.keys()), ['total_usage', 'pools'], usage_data) self.assertEqual(len(usage_data['total_usage']), 4, usage_data) # Check that each user's usage is as expected self.assertEqual(usage_data['total_usage']['mem'], job_resources['mem'] * i, usage_data) diff --git a/scheduler/src/cook/mesos/api.clj b/scheduler/src/cook/mesos/api.clj index f7fd4696e9..fedcbe4313 100644 --- a/scheduler/src/cook/mesos/api.clj +++ b/scheduler/src/cook/mesos/api.clj @@ -26,6 +26,7 @@ [compojure.api.middleware :as c-mw] [compojure.api.sweet :as c-api] [compojure.core :refer [ANY GET POST routes]] + [cook.config :as config] [cook.cors :as cors] [cook.datomic :as datomic] [cook.mesos.pool :as pool] @@ -2024,41 +2025,77 @@ "Helper-schema for :ungrouped jobs in UserUsageResponse." {:running_jobs [s/Uuid], :usage UsageInfo}) -(def UserUsageResponse - "Schema for a usage response." +(def UserUsageInPool + "Schema for a user's usage within a particular pool." {:total_usage UsageInfo (s/optional-key :grouped) [{:group UsageGroupInfo, :usage UsageInfo}] (s/optional-key :ungrouped) JobsUsageResponse}) +(def UserUsageResponse + "Schema for a usage response." + (assoc UserUsageInPool + (s/optional-key :pools) {s/Str UserUsageInPool})) + (def zero-usage "Resource usage map 'zero' value" (util/total-resources-of-jobs nil)) +(defn user-usage + "Given a collection of jobs, returns the usage information for those jobs." + [with-group-breakdown? jobs] + (merge + ; basic user usage response + {:total-usage (util/total-resources-of-jobs jobs)} + ; (optional) user's total usage with breakdown by job groups + (when with-group-breakdown? + (let [breakdowns (->> jobs + (group-by util/job-ent->group-uuid) + (map-vals (juxt #(mapv :job/uuid %) + util/total-resources-of-jobs + #(-> % first :group/_job first))))] + {:grouped (for [[guuid [job-uuids usage group]] breakdowns + :when guuid] + {:group {:uuid (:group/uuid group) + :name (:group/name group) + :running-jobs job-uuids} + :usage usage}) + :ungrouped (let [[job-uuids usage] (get breakdowns nil)] + {:running-jobs job-uuids + :usage (or usage zero-usage)})})))) + +(defn no-usage-map + "Returns a resource usage map showing no usage" + [with-group-breakdown?] + (cond-> {:total-usage zero-usage} + with-group-breakdown? (assoc :grouped [] + :ungrouped {:running-jobs [] + :usage zero-usage}))) + (defn get-user-usage "Query a user's current resource usage based on running jobs." [db ctx] (let [user (get-in ctx [:request :query-params :user]) with-group-breakdown? (get-in ctx [:request :query-params :group_breakdown]) - pool (get-in ctx [:request :query-params :pool]) - running-jobs (util/get-user-running-job-ents db user pool)] - (merge - ; basic user usage response - {:total-usage (util/total-resources-of-jobs running-jobs)} - ; (optional) user's total usage with breakdown by job groups - (when with-group-breakdown? - (let [breakdowns (->> running-jobs - (group-by util/job-ent->group-uuid) - (map-vals (juxt #(mapv :job/uuid %) - util/total-resources-of-jobs - #(-> % first :group/_job first))))] - {:grouped (for [[guuid [job-uuids usage group]] breakdowns - :when guuid] - {:group {:uuid (:group/uuid group) - :name (:group/name group) - :running-jobs job-uuids} - :usage usage}) - :ungrouped (let [[job-uuids usage] (get breakdowns nil)] - {:running-jobs job-uuids :usage (or usage zero-usage)})}))))) + pool (get-in ctx [:request :query-params :pool])] + (if pool + (let [jobs (util/get-user-running-job-ents-in-pool db user pool)] + (user-usage with-group-breakdown? jobs)) + (let [jobs (util/get-user-running-job-ents db user) + pools (pool/all-pools db)] + (if (pos? (count pools)) + (let [default-pool-name (config/default-pool) + pool-name->jobs (group-by + #(if-let [pool (:job/pool %)] + (:pool/name pool) + default-pool-name) + jobs) + no-usage (no-usage-map with-group-breakdown?) + pool-name->usage (map-vals (partial user-usage with-group-breakdown?) pool-name->jobs) + pool-name->no-usage (into {} (map (fn [{:keys [pool/name]}] [name no-usage]) pools)) + default-pool-usage (get pool-name->usage default-pool-name no-usage)] + (assoc default-pool-usage + :pools (merge pool-name->no-usage pool-name->usage))) + (user-usage with-group-breakdown? jobs)))))) (defn read-usage-handler "Handle GET requests for a user's current usage." @@ -2389,6 +2426,7 @@ JobResponse (partial map-keys ->snake_case) GroupResponse (partial map-keys ->snake_case) UserUsageResponse (partial map-keys ->snake_case) + UserUsageInPool (partial map-keys ->snake_case) UsageGroupInfo (partial map-keys ->snake_case) JobsUsageResponse (partial map-keys ->snake_case) s/Uuid str})] diff --git a/scheduler/src/cook/mesos/util.clj b/scheduler/src/cook/mesos/util.clj index e66910364e..bfcede3101 100644 --- a/scheduler/src/cook/mesos/util.clj +++ b/scheduler/src/cook/mesos/util.clj @@ -470,6 +470,23 @@ (= pool-name (config/default-pool))) (defn get-user-running-job-ents + "Returns all running job entities for a specific user." + ([db user] + (timers/time! + get-user-running-jobs-duration + (->> (q + '[:find [?j ...] + :in $ ?user + :where + ;; Note: We're assuming that many users will have significantly more + ;; completed jobs than there are jobs currently running in the system. + ;; If not, we might want to swap these two constraints. + [?j :job/state :job.state/running] + [?j :job/user ?user]] + db user) + (map (partial d/entity db)))))) + +(defn get-user-running-job-ents-in-pool "Returns all running job entities for a specific user and pool." [db user pool-name] (let [requesting-default-pool? (or (nil? pool-name) (default-pool? pool-name)) diff --git a/scheduler/test/cook/test/mesos/api.clj b/scheduler/test/cook/test/mesos/api.clj index 74c76933d1..350756152b 100644 --- a/scheduler/test/cook/test/mesos/api.clj +++ b/scheduler/test/cook/test/mesos/api.clj @@ -28,7 +28,11 @@ [cook.mesos.reason :as reason] [cook.mesos.scheduler :as sched] [cook.mesos.util :as util] - [cook.test.testutil :refer [restore-fresh-database! flush-caches! create-dummy-job create-dummy-instance]] + [cook.test.testutil :refer [create-dummy-instance + create-dummy-job + create-pool + flush-caches! + restore-fresh-database!]] [datomic.api :as d :refer [q db]] [mesomatic.scheduler :as msched] [schema.core :as s]) @@ -1632,3 +1636,125 @@ (is (nil? (:job/name job-entity))) (is (= job-uuid (:uuid job-map-for-api))) (is (= "cookjob" (:name job-map-for-api)))))) + +(deftest test-get-user-usage-no-usage + (let [conn (restore-fresh-database! "datomic:mem://test-get-user-usage") + request-context {:request {:query-params {:user "alice"}}}] + (is (= {:total-usage {:cpus 0.0 + :mem 0.0 + :gpus 0.0 + :jobs 0}} + (api/get-user-usage (d/db conn) request-context))) + + (create-pool conn "foo") + (create-pool conn "bar") + (create-pool conn "baz") + (is (= {:total-usage {:cpus 0.0 + :mem 0.0 + :gpus 0.0 + :jobs 0} + :pools {"foo" {:total-usage {:cpus 0.0 + :mem 0.0 + :gpus 0.0 + :jobs 0}} + "bar" {:total-usage {:cpus 0.0 + :mem 0.0 + :gpus 0.0 + :jobs 0}} + "baz" {:total-usage {:cpus 0.0 + :mem 0.0 + :gpus 0.0 + :jobs 0}}}} + (api/get-user-usage (d/db conn) request-context))))) + +(deftest test-get-user-usage-with-some-usage + (let [conn (restore-fresh-database! "datomic:mem://test-get-user-usage-with-some-usage") + request-context {:request {:query-params {:user "alice"}}}] + ; No pools in the database + (create-dummy-job conn + :user "alice" + :job-state :job.state/running + :ncpus 12 + :memory 34 + :gpus 56) + (is (= {:total-usage {:cpus 12.0 + :mem 34.0 + :gpus 56.0 + :jobs 1}} + (api/get-user-usage (d/db conn) request-context))) + + ; Jobs with no pool should show up in the default pool + (create-pool conn "foo") + (create-pool conn "bar") + (create-pool conn "baz") + (is (= {:total-usage {:cpus 12.0 + :mem 34.0 + :gpus 56.0 + :jobs 1} + :pools {"foo" {:total-usage {:cpus 0.0 + :mem 0.0 + :gpus 0.0 + :jobs 0}} + "bar" {:total-usage {:cpus 12.0 + :mem 34.0 + :gpus 56.0 + :jobs 1}} + "baz" {:total-usage {:cpus 0.0 + :mem 0.0 + :gpus 0.0 + :jobs 0}}}} + (with-redefs [config/default-pool (constantly "bar")] + (api/get-user-usage (d/db conn) request-context)))) + + ; Jobs with a pool should show up in that pool + (create-dummy-job conn + :user "alice" + :job-state :job.state/running + :ncpus 78 + :memory 910 + :gpus 1112 + :pool "baz") + (is (= {:total-usage {:cpus 12.0 + :mem 34.0 + :gpus 56.0 + :jobs 1} + :pools {"foo" {:total-usage {:cpus 12.0 + :mem 34.0 + :gpus 56.0 + :jobs 1}} + "bar" {:total-usage {:cpus 0.0 + :mem 0.0 + :gpus 0.0 + :jobs 0}} + "baz" {:total-usage {:cpus 78.0 + :mem 910.0 + :gpus 1112.0 + :jobs 1}}}} + (with-redefs [config/default-pool (constantly "foo")] + (api/get-user-usage (d/db conn) request-context)))) + + ; Asking for a specific pool should return that pool's + ; usage at the top level, and should not produce sub-map + (create-dummy-job conn + :user "alice" + :job-state :job.state/running + :ncpus 13 + :memory 14 + :gpus 15 + :pool "bar") + (with-redefs [config/default-pool (constantly "baz")] + (is (= {:total-usage {:cpus 0.0 + :mem 0.0 + :gpus 0.0 + :jobs 0}} + (api/get-user-usage (d/db conn) (assoc-in request-context [:request :query-params :pool] "foo")))) + (is (= {:total-usage {:cpus 13.0 + :mem 14.0 + :gpus 15.0 + :jobs 1}} + (api/get-user-usage (d/db conn) (assoc-in request-context [:request :query-params :pool] "bar")))) + (is (= {:total-usage {:cpus 90.0 + :mem 944.0 + :gpus 1168.0 + :jobs 2}} + (api/get-user-usage (d/db conn) (assoc-in request-context [:request :query-params :pool] "baz"))))))) diff --git a/scheduler/test/cook/test/mesos/util.clj b/scheduler/test/cook/test/mesos/util.clj index 2619458a7c..d920d2af48 100644 --- a/scheduler/test/cook/test/mesos/util.clj +++ b/scheduler/test/cook/test/mesos/util.clj @@ -100,26 +100,26 @@ :pool "pool-a") ; No default pool, specifying a specific pool - (is (= 2 (count (util/get-user-running-job-ents (db conn) "u1" "pool-a")))) - (is (= 1 (count (util/get-user-running-job-ents (db conn) "u2" "pool-b")))) - (is (= 0 (count (util/get-user-running-job-ents (db conn) "u3" "pool-c")))) + (is (= 2 (count (util/get-user-running-job-ents-in-pool (db conn) "u1" "pool-a")))) + (is (= 1 (count (util/get-user-running-job-ents-in-pool (db conn) "u2" "pool-b")))) + (is (= 0 (count (util/get-user-running-job-ents-in-pool (db conn) "u3" "pool-c")))) ; No default pool, not specifying a pool - (is (= 2 (count (util/get-user-running-job-ents (db conn) "u1" nil)))) - (is (= 1 (count (util/get-user-running-job-ents (db conn) "u2" nil)))) - (is (= 0 (count (util/get-user-running-job-ents (db conn) "u3" nil)))) + (is (= 2 (count (util/get-user-running-job-ents-in-pool (db conn) "u1" nil)))) + (is (= 1 (count (util/get-user-running-job-ents-in-pool (db conn) "u2" nil)))) + (is (= 0 (count (util/get-user-running-job-ents-in-pool (db conn) "u3" nil)))) ; Default pool defined, specifying a specific pool (with-redefs [config/default-pool (constantly "pool-a")] - (is (= 4 (count (util/get-user-running-job-ents (db conn) "u1" "pool-a")))) - (is (= 1 (count (util/get-user-running-job-ents (db conn) "u2" "pool-b")))) - (is (= 0 (count (util/get-user-running-job-ents (db conn) "u3" "pool-c"))))) + (is (= 4 (count (util/get-user-running-job-ents-in-pool (db conn) "u1" "pool-a")))) + (is (= 1 (count (util/get-user-running-job-ents-in-pool (db conn) "u2" "pool-b")))) + (is (= 0 (count (util/get-user-running-job-ents-in-pool (db conn) "u3" "pool-c"))))) ; Default pool defined, not specifying a pool (with-redefs [config/default-pool (constantly "pool-b")] - (is (= 3 (count (util/get-user-running-job-ents (db conn) "u1" nil)))) - (is (= 2 (count (util/get-user-running-job-ents (db conn) "u2" nil)))) - (is (= 0 (count (util/get-user-running-job-ents (db conn) "u3" nil))))))) + (is (= 3 (count (util/get-user-running-job-ents-in-pool (db conn) "u1" nil)))) + (is (= 2 (count (util/get-user-running-job-ents-in-pool (db conn) "u2" nil)))) + (is (= 0 (count (util/get-user-running-job-ents-in-pool (db conn) "u3" nil))))))) (deftest test-cache (let [cache (util/new-cache)