This repository has been archived by the owner on Apr 24, 2023. It is now read-only.

Provides /usage sub-map with all pools in the default case (#806)
dposada authored and pschorf committed Apr 17, 2018
1 parent d63bbd9 commit b00c3ae
Showing 7 changed files with 241 additions and 54 deletions.
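In the default case (no pool query parameter), the usage response now carries a pools sub-map with one entry per pool configured in the system, and the top-level totals mirror the default pool's usage; pools in which the user has nothing running report zero usage. A minimal sketch of the new payload shape, with made-up pool names and numbers (the grouped/ungrouped breakdown keys asserted in the tests below appear at both levels only when a group breakdown is requested):

# Illustrative shape only: 'alpha' is assumed to be the configured default pool, 'beta' a second pool.
expected_shape = {
    'total_usage': {'mem': 128.0, 'cpus': 1.0, 'gpus': 0.0, 'jobs': 1},  # mirrors the default pool
    'pools': {
        'alpha': {'total_usage': {'mem': 128.0, 'cpus': 1.0, 'gpus': 0.0, 'jobs': 1}},
        'beta': {'total_usage': {'mem': 0.0, 'cpus': 0.0, 'gpus': 0.0, 'jobs': 0}},  # no running jobs here
    },
}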
14 changes: 10 additions & 4 deletions integration/tests/cook/test_basic.py
@@ -1673,7 +1673,7 @@ def test_user_usage_basic(self):
self.assertEqual(resp.status_code, 200, resp.content)
usage_data = resp.json()
# Check that the response structure looks as expected
self.assertEqual(list(usage_data.keys()), ['total_usage'], usage_data)
self.assertEqual(list(usage_data.keys()), ['total_usage', 'pools'], usage_data)
self.assertEqual(len(usage_data['total_usage']), 4, usage_data)
# Since we don't know what other test jobs are currently running,
# we conservatively check current usage with the >= operation.
@@ -1700,7 +1700,7 @@ def test_user_usage_grouped(self):
self.assertEqual(resp.status_code, 200, resp.content)
usage_data = resp.json()
# Check that the response structure looks as expected
self.assertEqual(set(usage_data.keys()), {'total_usage', 'grouped', 'ungrouped'}, usage_data)
self.assertEqual(set(usage_data.keys()), {'total_usage', 'grouped', 'ungrouped', 'pools'}, usage_data)
self.assertEqual(set(usage_data['ungrouped'].keys()), {'running_jobs', 'usage'}, usage_data)
my_group_usage = next(x for x in usage_data['grouped'] if x['group']['uuid'] == group_uuid)
self.assertEqual(set(my_group_usage.keys()), {'group', 'usage'}, my_group_usage)
@@ -1731,6 +1731,13 @@ def test_user_usage_grouped(self):
self.assertEqual(usage_data['total_usage']['gpus'], breakdowns_total['gpus'], usage_data)
self.assertEqual(usage_data['total_usage']['jobs'], breakdowns_total['jobs'], usage_data)
# Pool-specific checks
pools, _ = util.pools(self.cook_url)
for pool in pools:
# There should be a sub-map under pools for each pool in the
# system, since we didn't specify the pool in the usage request
pool_usage = usage_data['pools'][pool['name']]
self.assertEqual(set(pool_usage.keys()), {'total_usage', 'grouped', 'ungrouped'}, pool_usage)
self.assertEqual(set(pool_usage['ungrouped'].keys()), {'running_jobs', 'usage'}, pool_usage)
default_pool = util.default_pool(self.cook_url)
if default_pool:
# If there is a default pool configured, make sure that our jobs,
@@ -1747,7 +1754,6 @@ def test_user_usage_grouped(self):
self.assertEqual(my_group_usage['usage']['jobs'], job_count, my_group_usage)
# If there is a non-default pool, make sure that
# our jobs don't appear under that pool's usage
pools, _ = util.pools(self.cook_url)
non_default_pools = [p for p in pools if p['name'] != default_pool]
if len(non_default_pools) > 0:
pool = non_default_pools[0]['name']
@@ -1777,7 +1783,7 @@ def test_user_usage_ungrouped(self):
self.assertEqual(resp.status_code, 200, resp.content)
usage_data = resp.json()
# Check that the response structure looks as expected
self.assertEqual(set(usage_data.keys()), {'total_usage', 'grouped', 'ungrouped'}, usage_data)
self.assertEqual(set(usage_data.keys()), {'total_usage', 'grouped', 'ungrouped', 'pools'}, usage_data)
ungrouped_data = usage_data['ungrouped']
self.assertEqual(set(ungrouped_data.keys()), {'running_jobs', 'usage'}, ungrouped_data)
# Our jobs should be included in the ungrouped breakdown
28 changes: 14 additions & 14 deletions integration/tests/cook/test_cli.py
@@ -439,23 +439,23 @@ def test_list_by_state(self):
self.assertEqual(0, cp.returncode, cp.stderr)
running_uuid = uuids[0]

# Submit a successful job
cp, uuids = cli.submit('ls', self.cook_url, submit_flags='--name %s' % name)
self.assertEqual(0, cp.returncode, cp.stderr)
success_uuid = uuids[0]
try:
# Submit a successful job
cp, uuids = cli.submit('ls', self.cook_url, submit_flags='--name %s' % name)
self.assertEqual(0, cp.returncode, cp.stderr)
success_uuid = uuids[0]

# Submit a failed job
cp, uuids = cli.submit('exit 1', self.cook_url, submit_flags='--name %s' % name)
self.assertEqual(0, cp.returncode, cp.stderr)
failed_uuid = uuids[0]
# Submit a failed job
cp, uuids = cli.submit('exit 1', self.cook_url, submit_flags='--name %s' % name)
self.assertEqual(0, cp.returncode, cp.stderr)
failed_uuid = uuids[0]

# Wait for the desired states to be reached
util.wait_for_job(self.cook_url, waiting_uuid, 'waiting')
util.wait_for_job(self.cook_url, running_uuid, 'running')
util.wait_for_job(self.cook_url, success_uuid, 'completed')
util.wait_for_job(self.cook_url, failed_uuid, 'completed')
# Wait for the desired states to be reached
util.wait_for_job(self.cook_url, waiting_uuid, 'waiting')
util.wait_for_job(self.cook_url, running_uuid, 'running')
util.wait_for_job(self.cook_url, success_uuid, 'completed')
util.wait_for_job(self.cook_url, failed_uuid, 'completed')

try:
# waiting
cp, jobs = self.list_jobs(name, user, 'waiting')
self.assertEqual(0, cp.returncode, cp.stderr)
2 changes: 1 addition & 1 deletion integration/tests/cook/test_multi_user.py
@@ -75,7 +75,7 @@ def test_multi_user_usage(self):
self.assertEqual(resp.status_code, 200, resp.content)
usage_data = resp.json()
# Check that the response structure looks as expected
self.assertEqual(list(usage_data.keys()), ['total_usage'], usage_data)
self.assertEqual(list(usage_data.keys()), ['total_usage', 'pools'], usage_data)
self.assertEqual(len(usage_data['total_usage']), 4, usage_data)
# Check that each user's usage is as expected
self.assertEqual(usage_data['total_usage']['mem'], job_resources['mem'] * i, usage_data)
82 changes: 60 additions & 22 deletions scheduler/src/cook/mesos/api.clj
@@ -26,6 +26,7 @@
[compojure.api.middleware :as c-mw]
[compojure.api.sweet :as c-api]
[compojure.core :refer [ANY GET POST routes]]
[cook.config :as config]
[cook.cors :as cors]
[cook.datomic :as datomic]
[cook.mesos.pool :as pool]
@@ -2024,41 +2025,77 @@
"Helper-schema for :ungrouped jobs in UserUsageResponse."
{:running_jobs [s/Uuid], :usage UsageInfo})

(def UserUsageResponse
"Schema for a usage response."
(def UserUsageInPool
"Schema for a user's usage within a particular pool."
{:total_usage UsageInfo
(s/optional-key :grouped) [{:group UsageGroupInfo, :usage UsageInfo}]
(s/optional-key :ungrouped) JobsUsageResponse})

(def UserUsageResponse
"Schema for a usage response."
(assoc UserUsageInPool
(s/optional-key :pools) {s/Str UserUsageInPool}))

(def zero-usage
"Resource usage map 'zero' value"
(util/total-resources-of-jobs nil))

(defn user-usage
"Given a collection of jobs, returns the usage information for those jobs."
[with-group-breakdown? jobs]
(merge
; basic user usage response
{:total-usage (util/total-resources-of-jobs jobs)}
; (optional) user's total usage with breakdown by job groups
(when with-group-breakdown?
(let [breakdowns (->> jobs
(group-by util/job-ent->group-uuid)
(map-vals (juxt #(mapv :job/uuid %)
util/total-resources-of-jobs
#(-> % first :group/_job first))))]
{:grouped (for [[guuid [job-uuids usage group]] breakdowns
:when guuid]
{:group {:uuid (:group/uuid group)
:name (:group/name group)
:running-jobs job-uuids}
:usage usage})
:ungrouped (let [[job-uuids usage] (get breakdowns nil)]
{:running-jobs job-uuids
:usage (or usage zero-usage)})}))))

(defn no-usage-map
"Returns a resource usage map showing no usage"
[with-group-breakdown?]
(cond-> {:total-usage zero-usage}
with-group-breakdown? (assoc :grouped []
:ungrouped {:running-jobs []
:usage zero-usage})))

(defn get-user-usage
"Query a user's current resource usage based on running jobs."
[db ctx]
(let [user (get-in ctx [:request :query-params :user])
with-group-breakdown? (get-in ctx [:request :query-params :group_breakdown])
pool (get-in ctx [:request :query-params :pool])
running-jobs (util/get-user-running-job-ents db user pool)]
(merge
; basic user usage response
{:total-usage (util/total-resources-of-jobs running-jobs)}
; (optional) user's total usage with breakdown by job groups
(when with-group-breakdown?
(let [breakdowns (->> running-jobs
(group-by util/job-ent->group-uuid)
(map-vals (juxt #(mapv :job/uuid %)
util/total-resources-of-jobs
#(-> % first :group/_job first))))]
{:grouped (for [[guuid [job-uuids usage group]] breakdowns
:when guuid]
{:group {:uuid (:group/uuid group)
:name (:group/name group)
:running-jobs job-uuids}
:usage usage})
:ungrouped (let [[job-uuids usage] (get breakdowns nil)]
{:running-jobs job-uuids :usage (or usage zero-usage)})})))))
pool (get-in ctx [:request :query-params :pool])]
(if pool
(let [jobs (util/get-user-running-job-ents-in-pool db user pool)]
(user-usage with-group-breakdown? jobs))
(let [jobs (util/get-user-running-job-ents db user)
pools (pool/all-pools db)]
(if (pos? (count pools))
(let [default-pool-name (config/default-pool)
pool-name->jobs (group-by
#(if-let [pool (:job/pool %)]
(:pool/name pool)
default-pool-name)
jobs)
no-usage (no-usage-map with-group-breakdown?)
pool-name->usage (map-vals (partial user-usage with-group-breakdown?) pool-name->jobs)
pool-name->no-usage (into {} (map (fn [{:keys [pool/name]}] [name no-usage]) pools))
default-pool-usage (get pool-name->usage default-pool-name no-usage)]
(assoc default-pool-usage
:pools (merge pool-name->no-usage pool-name->usage)))
(user-usage with-group-breakdown? jobs))))))

(defn read-usage-handler
"Handle GET requests for a user's current usage."
@@ -2389,6 +2426,7 @@
JobResponse (partial map-keys ->snake_case)
GroupResponse (partial map-keys ->snake_case)
UserUsageResponse (partial map-keys ->snake_case)
UserUsageInPool (partial map-keys ->snake_case)
UsageGroupInfo (partial map-keys ->snake_case)
JobsUsageResponse (partial map-keys ->snake_case)
s/Uuid str})]
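To make the new branch in get-user-usage concrete: when the request names a pool, the handler returns only that pool's usage at the top level (no pools sub-map); when it does not, it returns the default pool's usage at the top level plus a pools entry for every configured pool. A rough client-side sketch of the two modes — the /usage path is taken from the commit title, the user/pool/group_breakdown parameter names from the handler above, and the base URL, user name, and boolean encoding are assumptions:

import requests

cook_url = 'http://localhost:12321'  # assumed local Cook scheduler endpoint

# Default case: no pool specified, so the response includes a 'pools' sub-map
resp = requests.get('%s/usage' % cook_url, params={'user': 'alice', 'group_breakdown': 'true'})
usage = resp.json()
assert set(usage.keys()) == {'total_usage', 'grouped', 'ungrouped', 'pools'}
for pool_usage in usage['pools'].values():
    assert set(pool_usage.keys()) == {'total_usage', 'grouped', 'ungrouped'}

# Pool-specific case: that pool's usage comes back at the top level, with no 'pools' key
some_pool = next(iter(usage['pools']))
resp = requests.get('%s/usage' % cook_url, params={'user': 'alice', 'pool': some_pool})
pool_usage = resp.json()
assert 'pools' not in pool_usage
assert set(pool_usage['total_usage'].keys()) == {'mem', 'cpus', 'gpus', 'jobs'}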
17 changes: 17 additions & 0 deletions scheduler/src/cook/mesos/util.clj
@@ -470,6 +470,23 @@
(= pool-name (config/default-pool)))

(defn get-user-running-job-ents
"Returns all running job entities for a specific user."
([db user]
(timers/time!
get-user-running-jobs-duration
(->> (q
'[:find [?j ...]
:in $ ?user
:where
;; Note: We're assuming that many users will have significantly more
;; completed jobs than there are jobs currently running in the system.
;; If not, we might want to swap these two constraints.
[?j :job/state :job.state/running]
[?j :job/user ?user]]
db user)
(map (partial d/entity db))))))

(defn get-user-running-job-ents-in-pool
"Returns all running job entities for a specific user and pool."
[db user pool-name]
(let [requesting-default-pool? (or (nil? pool-name) (default-pool? pool-name))
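The two-arity get-user-running-job-ents added here returns a user's running jobs across all pools; get-user-usage then buckets those jobs by pool name, attributing jobs with no pool to the configured default pool (the group-by in the api.clj change above). A Python analogue of that bucketing, purely for illustration — the job dicts and pool names are hypothetical:

def bucket_jobs_by_pool(jobs, all_pool_names, default_pool_name):
    """Group running jobs by pool, attributing pool-less jobs to the default pool.
    Every configured pool gets an entry, even if the user has no running jobs in it."""
    buckets = {name: [] for name in all_pool_names}
    for job in jobs:
        buckets[job.get('pool') or default_pool_name].append(job)
    return buckets

# Hypothetical data: one job with an explicit pool, one legacy job without one
jobs = [{'uuid': 'job-1', 'pool': 'baz'}, {'uuid': 'job-2', 'pool': None}]
bucket_jobs_by_pool(jobs, ['foo', 'bar', 'baz'], default_pool_name='foo')
# => {'foo': [{'uuid': 'job-2', 'pool': None}], 'bar': [], 'baz': [{'uuid': 'job-1', 'pool': 'baz'}]}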
128 changes: 127 additions & 1 deletion scheduler/test/cook/test/mesos/api.clj
@@ -28,7 +28,11 @@
[cook.mesos.reason :as reason]
[cook.mesos.scheduler :as sched]
[cook.mesos.util :as util]
[cook.test.testutil :refer [restore-fresh-database! flush-caches! create-dummy-job create-dummy-instance]]
[cook.test.testutil :refer [create-dummy-instance
create-dummy-job
create-pool
flush-caches!
restore-fresh-database!]]
[datomic.api :as d :refer [q db]]
[mesomatic.scheduler :as msched]
[schema.core :as s])
@@ -1632,3 +1636,125 @@
(is (nil? (:job/name job-entity)))
(is (= job-uuid (:uuid job-map-for-api)))
(is (= "cookjob" (:name job-map-for-api))))))

(deftest test-get-user-usage-no-usage
(let [conn (restore-fresh-database! "datomic:mem://test-get-user-usage")
request-context {:request {:query-params {:user "alice"}}}]
(is (= {:total-usage {:cpus 0.0
:mem 0.0
:gpus 0.0
:jobs 0}}
(api/get-user-usage (d/db conn) request-context)))

(create-pool conn "foo")
(create-pool conn "bar")
(create-pool conn "baz")
(is (= {:total-usage {:cpus 0.0
:mem 0.0
:gpus 0.0
:jobs 0}
:pools {"foo" {:total-usage {:cpus 0.0
:mem 0.0
:gpus 0.0
:jobs 0}}
"bar" {:total-usage {:cpus 0.0
:mem 0.0
:gpus 0.0
:jobs 0}}
"baz" {:total-usage {:cpus 0.0
:mem 0.0
:gpus 0.0
:jobs 0}}}}
(api/get-user-usage (d/db conn) request-context)))))

(deftest test-get-user-usage-with-some-usage
(let [conn (restore-fresh-database! "datomic:mem://test-get-user-usage-with-some-usage")
request-context {:request {:query-params {:user "alice"}}}]
; No pools in the database
(create-dummy-job conn
:user "alice"
:job-state :job.state/running
:ncpus 12
:memory 34
:gpus 56)
(is (= {:total-usage {:cpus 12.0
:mem 34.0
:gpus 56.0
:jobs 1}}
(api/get-user-usage (d/db conn) request-context)))

; Jobs with no pool should show up in the default pool
(create-pool conn "foo")
(create-pool conn "bar")
(create-pool conn "baz")
(is (= {:total-usage {:cpus 12.0
:mem 34.0
:gpus 56.0
:jobs 1}
:pools {"foo" {:total-usage {:cpus 0.0
:mem 0.0
:gpus 0.0
:jobs 0}}
"bar" {:total-usage {:cpus 12.0
:mem 34.0
:gpus 56.0
:jobs 1}}
"baz" {:total-usage {:cpus 0.0
:mem 0.0
:gpus 0.0
:jobs 0}}}}
(with-redefs [config/default-pool (constantly "bar")]
(api/get-user-usage (d/db conn) request-context))))

; Jobs with a pool should show up in that pool
(create-dummy-job conn
:user "alice"
:job-state :job.state/running
:ncpus 78
:memory 910
:gpus 1112
:pool "baz")
(is (= {:total-usage {:cpus 12.0
:mem 34.0
:gpus 56.0
:jobs 1}
:pools {"foo" {:total-usage {:cpus 12.0
:mem 34.0
:gpus 56.0
:jobs 1}}
"bar" {:total-usage {:cpus 0.0
:mem 0.0
:gpus 0.0
:jobs 0}}
"baz" {:total-usage {:cpus 78.0
:mem 910.0
:gpus 1112.0
:jobs 1}}}}
(with-redefs [config/default-pool (constantly "foo")]
(api/get-user-usage (d/db conn) request-context))))

; Asking for a specific pool should return that pool's
    ; usage at the top level, and should not produce a pools sub-map
(create-dummy-job conn
:user "alice"
:job-state :job.state/running
:ncpus 13
:memory 14
:gpus 15
:pool "bar")
(with-redefs [config/default-pool (constantly "baz")]
(is (= {:total-usage {:cpus 0.0
:mem 0.0
:gpus 0.0
:jobs 0}}
(api/get-user-usage (d/db conn) (assoc-in request-context [:request :query-params :pool] "foo"))))
(is (= {:total-usage {:cpus 13.0
:mem 14.0
:gpus 15.0
:jobs 1}}
(api/get-user-usage (d/db conn) (assoc-in request-context [:request :query-params :pool] "bar"))))
(is (= {:total-usage {:cpus 90.0
:mem 944.0
:gpus 1168.0
:jobs 2}}
(api/get-user-usage (d/db conn) (assoc-in request-context [:request :query-params :pool] "baz")))))))