Skip to content

Commit

Permalink
LAB-1491 ray jobs (#992)
Browse files Browse the repository at this point in the history
Co-authored-by: Aakaash Meduri <aakaash.meduri@gmail.com>
  • Loading branch information
supraja-968 and acashmoney authored Jul 29, 2024
1 parent 40c8b76 commit 4f11c82
Show file tree
Hide file tree
Showing 24 changed files with 767 additions and 194 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,16 +24,16 @@ export default function ExperimentDetail() {

const experimentID = experiment.ID?.toString();

useEffect(() => {
if (["running", "queued"].includes(status.status) && experimentID) {
const interval = setInterval(() => {
console.log("Checking for new results");
dispatch(experimentDetailThunk(experimentID));
}, 15000);

return () => clearInterval(interval);
}
}, [dispatch, experimentID, status]);
// useEffect(() => {
// if (["running", "queued"].includes(status.status) && experimentID) {
// const interval = setInterval(() => {
// console.log("Checking for new results");
// dispatch(experimentDetailThunk(experimentID));
// }, 15000);

// return () => clearInterval(interval);
// }
// }, [dispatch, experimentID, status]);

return (
<div>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ export default function JobsAccordion({ experiment }: JobsAccordionProps) {
return (
<Accordion type="single" defaultValue={activeJobUUID} value={activeJobUUID} onValueChange={setActiveJobUUID} className="min-h-[600px]">
{[...experiment.Jobs]?.sort((a, b) => (a.ID || 0) - (b.ID || 0)).map((job, index) => {
const validStates = ["queued", "running", "failed", "completed"];
const status = (validStates.includes(job.JobStatus) ? job.JobStatus : "unknown") as "queued" | "running" | "failed" | "completed" | "unknown";
const validStates = ["queued", "processing", "pending", "running", "failed", "succeeded", "stopped"];
const status = (validStates.includes(job.JobStatus) ? job.JobStatus : "unknown") as "queued" | "processing" | "pending" | "running" | "failed" | "succeeded" | "stopped" | "unknown";

return (
<AccordionItem value={job.RayJobID} className="border-0 [&[data-state=open]>div]:shadow-lg" key={job.ID}>
Expand Down
42 changes: 21 additions & 21 deletions frontend/app/experiments/(experiment)/ExperimentStatus.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -7,32 +7,32 @@ export function aggregateJobStatus(jobs: JobDetail[]) {
let label;
const totalJobs = jobs.length;
const queuedJobs = jobs.filter((job) => job.JobStatus === "queued").length;
const processingJobs = jobs.filter((job) => job.JobStatus === "processing").length;
const pendingJobs = jobs.filter((job) => job.JobStatus === "pending").length;
const runningJobs = jobs.filter((job) => job.JobStatus === "running").length;
const failedJobs = jobs.filter((job) => job.JobStatus === "failed").length;
const completedJobs = jobs.filter((job) => job.JobStatus === "completed").length;
const succeededJobs = jobs.filter((job) => job.JobStatus === "succeeded").length;
const stoppedJobs = jobs.filter((job) => job.JobStatus === "stopped").length;

//These statuses may be deprecated
const errorJobs = jobs.filter((job) => job.Status === "error").length;
const newJobs = jobs.filter((job) => job.Status === "new").length;

const pendingJobs = queuedJobs + runningJobs + newJobs;
const progressingJobs = queuedJobs + runningJobs + processingJobs + pendingJobs;
const unsuccessfulJobs = failedJobs + stoppedJobs;

if (totalJobs === 0) {
status = "unknown";
label = "Status unknown";
} else if (totalJobs === completedJobs) {
status = "completed";
} else if (totalJobs === succeededJobs) {
status = "succeeded";
label = "All runs completed successfully";
} else if (failedJobs === totalJobs || errorJobs === totalJobs) {
} else if (unsuccessfulJobs === totalJobs) {
status = "failed";
label = "All runs failed";
} else if (queuedJobs === totalJobs || newJobs === totalJobs) {
label = "All runs failed or stopped";
} else if (queuedJobs === totalJobs || processingJobs === totalJobs || pendingJobs === totalJobs) {
status = "queued";
label = "Waiting to start";
} else if (pendingJobs > 0) {
} else if (progressingJobs > 0) {
status = "running";
label = "Running";
} else if (failedJobs + errorJobs > 0) {
} else if (failedJobs + stoppedJobs > 0) {
status = "partial-failure";
label = "Some runs completed successfully";
} else {
Expand All @@ -44,9 +44,9 @@ export function aggregateJobStatus(jobs: JobDetail[]) {
status,
label,
totalJobs,
pendingJobs,
failedJobs,
completedJobs,
progressingJobs,
unsuccessfulJobs,
succeededJobs,
};
}

Expand All @@ -59,7 +59,7 @@ export function JobStatusIcon({ size = 12, status }: JobStatusIconProps) {
return (
<span className={cn("relative z-0", (status === "queued" || status === "running") && "animate-pulse")}>
<svg xmlns="http://www.w3.org/2000/svg" viewBox="0 0 12 12" width={size} height={size} fill="none">
{(status === "failed" || status === "partial-failure" || status === "completed" || status === "unknown") && (
{(status === "failed" || status === "partial-failure" || status === "succeeded" || status === "unknown") && (
<circle cx={6} cy={6} r={6} fill={`url(#${status})`} />
)}
{status === "queued" && (
Expand Down Expand Up @@ -106,7 +106,7 @@ interface ExperimentStatusProps {
}

export function ExperimentStatus({ jobs, className }: ExperimentStatusProps) {
const { status, label, pendingJobs, failedJobs, completedJobs } = aggregateJobStatus(jobs);
const { status, label, progressingJobs, unsuccessfulJobs, succeededJobs } = aggregateJobStatus(jobs);
return (
<>
<TooltipProvider>
Expand All @@ -119,9 +119,9 @@ export function ExperimentStatus({ jobs, className }: ExperimentStatusProps) {
<TooltipContent>
<div className="text-xs">
<p className="mb-1 font-mono uppercase">{label}</p>
{pendingJobs > 0 && <p>{pendingJobs} pending</p>}
<p>{completedJobs} complete</p>
<p>{failedJobs} failed</p>
{progressingJobs > 0 && <p>{progressingJobs} pending</p>}
<p>{succeededJobs} complete</p>
<p>{unsuccessfulJobs} failed</p>
</div>
</TooltipContent>
</Tooltip>
Expand Down
24 changes: 21 additions & 3 deletions frontend/components/ui/badge.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -20,39 +20,57 @@ const badgeVariants = cva(
},
status: {
queued: "bg-gray-400/20",
processing: "bg-gray-400/20",
pending: "bg-gray-400/20",
running: "bg-sky-500/20",
completed: "bg-green-600/20",
succeeded: "bg-green-600/20",
failed: "bg-destructive/20",
stopped: "bg-destructive/20",
"partial-failure": "bg-amber-500/20",
unknown: "bg-gray-400/20",
},
},
compoundVariants: [
{
variant: "outline",
status: ["queued", "running", "completed", "failed", "partial-failure", "unknown"],
status: ["queued", "processing", "pending", "running", "failed", "succeeded", "stopped", "partial-failure", "unknown"],
class: "bg-transparent",
},
{
variant: "outline",
status: "queued",
class: "border-gray-400 text-gray-400",
},
{
variant: "outline",
status: "processing",
class: "border-gray-400 text-gray-400",
},
{
variant: "outline",
status: "pending",
class: "border-gray-400 text-gray-400",
},
{
variant: "outline",
status: "running",
class: "border-sky-500 text-sky-500",
},
{
variant: "outline",
status: "completed",
status: "succeeded",
class: "border-green-600 text-green-600",
},
{
variant: "outline",
status: "failed",
class: "border-destructive text-destructive",
},
{
variant: "outline",
status: "stopped",
class: "border-destructive text-destructive",
},
{
variant: "outline",
status: "partial-failure",
Expand Down
11 changes: 11 additions & 0 deletions gateway/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,17 @@ func ServeWebApp() {
}
}()

go func() {
for {
if err := utils.MonitorRunningJobs(db); err != nil {
fmt.Printf("unexpected error monitoring running jobs: %v\n", err)
time.Sleep(10 * time.Second) // wait for 5 seconds before retrying
} else {
break // exit the loop if no error (optional based on your use case)
}
}
}()

// Start the server with CORS middleware
fmt.Println("Server started on http://localhost:8080")
http.ListenAndServe(":8080", corsMiddleware.Handler(mux))
Expand Down
5 changes: 3 additions & 2 deletions gateway/handlers/experiments.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ func AddExperimentHandler(db *gorm.DB) http.HandlerFunc {
Inputs: datatypes.JSON(inputsJSON),
CreatedAt: time.Now().UTC(),
Public: false,
JobType: model.JobType,
}

result := db.Create(&job)
Expand Down Expand Up @@ -195,7 +196,7 @@ func AddExperimentHandler(db *gorm.DB) http.HandlerFunc {
JobID: job.ID,
RetryCount: 0,
EventTime: time.Now().UTC(),
EventType: models.EventTypeJobCreated,
EventType: models.EventTypeJobQueued,
}

result = db.Save(&inferenceEvent)
Expand Down Expand Up @@ -638,7 +639,7 @@ func AddJobToExperimentHandler(db *gorm.DB) http.HandlerFunc {
JobID: job.ID,
RetryCount: 0,
EventTime: time.Now().UTC(),
EventType: models.EventTypeJobCreated,
EventType: models.EventTypeJobQueued,
}

result = db.Save(&inferenceEvent)
Expand Down
7 changes: 5 additions & 2 deletions gateway/handlers/jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ type JobSummary struct {

type QueueSummary struct {
Queued JobSummary `json:"queued"`
Pending JobSummary `json:"pending"`
Running JobSummary `json:"running"`
}

Expand Down Expand Up @@ -106,10 +107,12 @@ func GetJobsQueueSummaryHandler(db *gorm.DB) http.HandlerFunc {
Count: data.Count,
}

if data.JobStatus == models.JobStateQueued {
summary.Ray.Queued = jobSummary
if data.JobStatus == models.JobStatePending {
summary.Ray.Pending = jobSummary
} else if data.JobStatus == models.JobStateRunning {
summary.Ray.Running = jobSummary
} else if data.JobStatus == models.JobStateQueued {
summary.Ray.Queued = jobSummary
}

}
Expand Down
30 changes: 19 additions & 11 deletions gateway/handlers/models.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,13 @@ func AddModelHandler(db *gorm.DB, s3c *s3.S3Client) http.HandlerFunc {
} else {
maxRunningTime = model.MaxRunningTime
}

var jobType models.JobType
if model.JobType == "service" {
jobType = models.JobTypeService
} else {
jobType = models.JobTypeJob
}
// Start transaction
tx := db.Begin()

Expand All @@ -153,17 +160,18 @@ func AddModelHandler(db *gorm.DB, s3c *s3.S3Client) http.HandlerFunc {
}

modelEntry := models.Model{
WalletAddress: user.WalletAddress,
Name: model.Name,
ModelJson: modelJSON,
CreatedAt: time.Now().UTC(),
Display: display,
TaskCategory: taskCategory,
DefaultModel: defaultModel,
MaxRunningTime: maxRunningTime,
RayServiceEndpoint: model.RayServiceEndpoint,
ComputeCost: model.ComputeCost,
S3URI: s3_uri,
WalletAddress: user.WalletAddress,
Name: model.Name,
ModelJson: modelJSON,
CreatedAt: time.Now().UTC(),
Display: display,
TaskCategory: taskCategory,
DefaultModel: defaultModel,
MaxRunningTime: maxRunningTime,
RayEndpoint: model.RayEndpoint,
ComputeCost: model.ComputeCost,
S3URI: s3_uri,
JobType: jobType,
}

result := tx.Create(&modelEntry)
Expand Down
2 changes: 2 additions & 0 deletions gateway/migrations/38_job_type_column.down.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
ALTER TABLE jobs DROP COLUMN IF EXISTS job_type;
ALTER TABLE experiments DROP COLUMN IF EXISTS experiment_uuid;
2 changes: 2 additions & 0 deletions gateway/migrations/38_job_type_column.up.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
ALTER TABLE jobs ADD COLUMN IF NOT EXISTS job_type VARCHAR;
ALTER TABLE experiments ADD COLUMN IF NOT EXISTS experiment_uuid VARCHAR;
15 changes: 8 additions & 7 deletions gateway/models/experiment.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,12 @@ package models
import "time"

type Experiment struct {
ID uint `gorm:"primaryKey;autoIncrement"`
Jobs []Job `gorm:"foreignKey:ExperimentID"`
Name string `gorm:"type:varchar(255);"`
Public bool `gorm:"type:boolean;not null;default:false"`
RecordCID string `gorm:"column:record_cid;type:varchar(255);"`
WalletAddress string `gorm:"type:varchar(42);not null"`
CreatedAt time.Time `gorm:"autoCreateTime"`
ID uint `gorm:"primaryKey;autoIncrement"`
Jobs []Job `gorm:"foreignKey:ExperimentID"`
Name string `gorm:"type:varchar(255);"`
Public bool `gorm:"type:boolean;not null;default:false"`
RecordCID string `gorm:"column:record_cid;type:varchar(255);"`
WalletAddress string `gorm:"type:varchar(42);not null"`
CreatedAt time.Time `gorm:"autoCreateTime"`
ExperimentUUID string `gorm:"type:varchar(255);"`
}
13 changes: 8 additions & 5 deletions gateway/models/inferenceevent.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,16 @@ import (

// event type can only be certain string values
const (
EventTypeJobCreated = "job_created"
EventTypeJobStarted = "job_started"
EventTypeJobCompleted = "job_completed"
EventTypeJobFailed = "job_failed"
EventTypeJobCancelled = "job_cancelled"
EventTypeJobQueued = "job_queued"
EventTypeJobProcessing = "job_processing"
EventTypeJobPending = "job_pending"
EventTypeJobRunning = "job_running"
EventTypeJobStopped = "job_stopped"
EventTypeJobSucceeded = "job_succeeded"
EventTypeJobFailed = "job_failed"
)

// retry default 0?
type InferenceEvent struct {
ID uint `gorm:"primaryKey;autoIncrement"`
JobID uint `gorm:"not null"`
Expand Down
21 changes: 8 additions & 13 deletions gateway/models/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,13 @@ import (
type JobState string

const (
JobStateQueued JobState = "queued"
JobStatePending JobState = "pending"
JobStateRunning JobState = "running"
JobStateFailed JobState = "failed"
JobStateCompleted JobState = "completed"
JobStateStopped JobState = "stopped"
JobStateQueued JobState = "queued"
JobStateProcessing JobState = "processing"
JobStatePending JobState = "pending"
JobStateRunning JobState = "running"
JobStateStopped JobState = "stopped"
JobStateSucceeded JobState = "succeeded"
JobStateFailed JobState = "failed"
)

type QueueType string
Expand All @@ -23,13 +24,6 @@ const (
QueueTypeRay QueueType = "ray"
)

type JobType string

const (
JobTypeBacalhau JobType = "bacalhau"
JobTypeRay JobType = "ray"
)

type Job struct {
ID uint `gorm:"primaryKey;autoIncrement"`
RayJobID string `gorm:"type:varchar(255)"`
Expand All @@ -49,4 +43,5 @@ type Job struct {
Inputs datatypes.JSON `gorm:"type:json"`
InputFiles []File `gorm:"many2many:job_input_files;foreignKey:ID;joinForeignKey:job_id;References:ID;JoinReferences:file_id"`
OutputFiles []File `gorm:"many2many:job_output_files;foreignKey:ID;references:ID"`
JobType JobType `gorm:"type:varchar(255);default:'job'"`
}
Loading

0 comments on commit 4f11c82

Please sign in to comment.