Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cni support #891

Open
wants to merge 5 commits into
base: releasing-2.5
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion changelog.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
## Changes from 2.5.0 to 2.5.1

### Correctly handle `TASK_STARTING` status updates.


## Changes from 2.4.0 to 2.5.0

### Highlights of this Release
Expand All @@ -24,7 +29,7 @@ Check the [REST API documentation](https://mesos.github.io/chronos/docs/api.html
#### Changed the default framework name
The framework name doesn't include the version number anymore.

#### New API endpoints
#### New API endpoints
The new `/scheduler/leader` endpoint makes it possible to get the current leader.

It is now possible to mark a job as successful via the `/scheduler/job/success` API endpoint.
Expand Down
96 changes: 96 additions & 0 deletions docs/docs/api.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ All examples in this section assume that you've found a running leader at `chron
- [Adding a Scheduled Job](#adding-a-scheduled-job)
- [Adding a Dependent Job](#adding-a-dependent-job)
- [Adding a Docker Job](#adding-a-docker-job)
- [Adding a Containerized Job](#adding-a-containerized-job)
- [Using External Volumes](#using-external-volumes)
- [Updating Task Progress](#updating-task-progress)
- [Describing the Dependency Graph](#describing-the-dependency-graph)
- [Asynchronous Jobs](#asynchronous-jobs)
Expand All @@ -26,6 +28,7 @@ All examples in this section assume that you've found a running leader at `chron
- [Constraints](#constraints)



## Leaders

When you have multiple Chronos nodes running, only one of them will be elected as the leader.
Expand Down Expand Up @@ -278,6 +281,99 @@ There is also support for passing in arbitrary docker config options.
}
```

## Adding a Containerized Job

A containerized job takes the same format as a scheduled job or a dependency job and runs on a Mesos container.
To configure it, an additional `container` argument is required, which contains a type (required), an image (required), a network name (optional), mounted volumes (optional) and whether Mesos should always pull the latest image before executing or not (optional).

```json
{
"container": {
"type": "MESOS",
"forcePullImage": true,
"image": "debian",
"networkInfos": [
{
"name": "mynet"
}
],
"volumes": [
{
"containerPath": "/var/log/",
"hostPath": "/logs/",
"mode": "RW"
}
}
]
}
}
```
## Using External Volumes

Docker and Mesos Containerized jobs can use external volumes, typically volumes mounted using docker volume plugins.
To configure it, do not add a `hostPath` argument, instead add an `external` argument, which contain a `name`, `provider`, and `options` (optional).

```json
"volumes": [
{
"mode": "RW",
"containerPath": "/tmp",
"external": {
"name": "test",
"provider": "local-persist",
"options": [
{
"key": "mountpoint",
"value": "/tmp/test"
}
]
}
}
]
```

Here is a more elaborate example with a Mesos container, a network name, a mounted and an external volumes.
```json
{
"container": {
"type": "MESOS",
"forcePullImage": true,
"image": "debian",
"networkInfos": [
{
"name": "mynet",
"labels": [
{
"key": "service",
"value": "test"
}
]
}
],
"volumes": [
{
"containerPath": "/var/log/",
"hostPath": "/logs/",
"mode": "RW"
},
{
"mode": "RW",
"containerPath": "/tmp",
"external": {
"name": "test",
"provider": "local-persist",
"options": [
{
"key": "mountpoint",
"value": "/tmp/test"
}
]
}
}
]
}
}
```
## Updating Task Progress

Task progress can be updated by providing the number of additional elements processed. This will increment the existing count of elements processed.
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

<groupId>org.apache.mesos</groupId>
<artifactId>chronos</artifactId>
<version>2.5.0</version>
<version>2.5.2</version>
<inceptionYear>2012</inceptionYear>

<prerequisites>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,21 +12,58 @@ object VolumeMode extends Enumeration {
object NetworkMode extends Enumeration {
type NetworkMode = Value

// Bridged and Host
val BRIDGE, HOST = Value
// Bridged, Host and USER
val BRIDGE, HOST, USER = Value
}

object ContainerType extends Enumeration {
type ContainerType = Value

// Docker, Mesos
val DOCKER, MESOS = Value
}


object ProtocolType extends Enumeration {
type ProtocolType = Value

val IPv4, IPv6 = Value
}

import org.apache.mesos.chronos.scheduler.jobs.NetworkMode._
import org.apache.mesos.chronos.scheduler.jobs.VolumeMode._
import org.apache.mesos.chronos.scheduler.jobs.ContainerType._
import org.apache.mesos.chronos.scheduler.jobs.ProtocolType._

case class ExternalVolume(
@JsonProperty name: String,
@JsonProperty provider: String,
@JsonProperty options: Seq[Parameter])

case class Volume(
@JsonProperty hostPath: Option[String],
@JsonProperty containerPath: String,
@JsonProperty mode: Option[VolumeMode])
@JsonProperty mode: Option[VolumeMode],
@JsonProperty external: Option[ExternalVolume])

case class PortMapping(
@JsonProperty hostPort: Int,
@JsonProperty containerPort: Int,
@JsonProperty protocol: Option[String])

case class Network(
@JsonProperty name: String,
@JsonProperty protocol: Option[ProtocolType],
@JsonProperty labels: Seq[Label],
@JsonProperty portMappings: Seq[PortMapping])

case class DockerContainer(
case class Container(
@JsonProperty image: String,
@JsonProperty `type`: ContainerType = ContainerType.DOCKER,
@JsonProperty volumes: Seq[Volume],
@JsonProperty parameters: Seq[Parameter],
@JsonProperty network: NetworkMode = NetworkMode.HOST,
// DEPRECATED, "networkName" will be removed in a future version.
@JsonProperty networkName: Option[String],
@JsonProperty networkInfos: Seq[Network],
@JsonProperty forcePullImage: Boolean = false)
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ trait BaseJob {

def runAsUser: String = ""

def container: DockerContainer = null
def container: Container = null

def environmentVariables: Seq[EnvironmentVariable] = List()

Expand Down Expand Up @@ -114,7 +114,7 @@ case class ScheduleBasedJob(
@JsonProperty override val fetch: Seq[Fetch] = List(),
@JsonProperty override val highPriority: Boolean = false,
@JsonProperty override val runAsUser: String = "",
@JsonProperty override val container: DockerContainer = null,
@JsonProperty override val container: Container = null,
@JsonProperty scheduleTimeZone: String = "",
@JsonProperty override val environmentVariables: Seq[EnvironmentVariable] = List(),
@JsonProperty override val shell: Boolean = true,
Expand Down Expand Up @@ -152,7 +152,7 @@ case class DependencyBasedJob(
@JsonProperty override val fetch: Seq[Fetch] = List(),
@JsonProperty override val highPriority: Boolean = false,
@JsonProperty override val runAsUser: String = "",
@JsonProperty override val container: DockerContainer = null,
@JsonProperty override val container: Container = null,
@JsonProperty override val environmentVariables: Seq[EnvironmentVariable] = List(),
@JsonProperty override val shell: Boolean = true,
@JsonProperty override val arguments: Seq[String] = List(),
Expand Down
25 changes: 25 additions & 0 deletions src/main/scala/org/apache/mesos/chronos/scheduler/jobs/Label.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package org.apache.mesos.chronos.scheduler.jobs

import org.apache.mesos.{Protos => mesos}

/**
* Represents an environment variable definition for the job
*/
case class Label(
key: String,
value: String) {

def toProto(): mesos.Label =
mesos.Label.newBuilder
.setKey(key)
.setValue(value)
.build
}

object Label {
def apply(proto: mesos.Label): Label =
Label(
proto.getKey,
proto.getValue
)
}
Original file line number Diff line number Diff line change
Expand Up @@ -205,10 +205,7 @@ class MesosJobFramework @Inject()(

val (jobName, _, _, _) = TaskUtils.parseTaskId(taskStatus.getTaskId.getValue)
taskStatus.getState match {
case TaskState.TASK_RUNNING =>
scheduler.handleStartedTask(taskStatus)
updateRunningTask(jobName, taskStatus)
case TaskState.TASK_STAGING =>
case TaskState.TASK_RUNNING | TaskState.TASK_STAGING | TaskState.TASK_STARTING =>
scheduler.handleStartedTask(taskStatus)
updateRunningTask(jobName, taskStatus)
case _ =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ import com.google.protobuf.ByteString
import org.apache.mesos.Protos.ContainerInfo.DockerInfo
import org.apache.mesos.Protos.Environment.Variable
import org.apache.mesos.Protos._
import org.apache.mesos.chronos.scheduler.config.SchedulerConfiguration
import org.apache.mesos.chronos.scheduler.jobs.{BaseJob, ContainerType, Fetch, TaskUtils}

import scala.collection.JavaConverters._
import scala.collection.Map
Expand Down Expand Up @@ -173,16 +175,63 @@ class MesosTaskBuilder @Inject()(val conf: SchedulerConfiguration) {
v.mode.map { m =>
volumeBuilder.setMode(Volume.Mode.valueOf(m.toString.toUpperCase))
}
v.external.foreach { e =>
volumeBuilder.setSource(Volume.Source.newBuilder()
.setType(Volume.Source.Type.DOCKER_VOLUME)
.setDockerVolume(Volume.Source.DockerVolume.newBuilder()
.setDriver(e.provider)
.setName(e.name)
.setDriverOptions(Parameters.newBuilder()
.addAllParameter(e.options.map(_.toProto()).asJava).build()
).build()
).build()
).build()
}

volumeBuilder.build()
}.foreach(builder.addVolumes)
builder.setType(ContainerInfo.Type.DOCKER)
builder.setDocker(DockerInfo.newBuilder()
.setImage(job.container.image)
.setNetwork(DockerInfo.Network.valueOf(job.container.network.toString.toUpperCase))
.setForcePullImage(job.container.forcePullImage)
.addAllParameters(job.container.parameters.map(_.toProto).asJava)
.build()).build

job.container.`type` match {
case ContainerType.DOCKER =>
builder.setType(ContainerInfo.Type.DOCKER)
builder.setDocker(DockerInfo.newBuilder()
.setImage(job.container.image)
.setNetwork(DockerInfo.Network.valueOf(job.container.network.toString.toUpperCase))
.setForcePullImage(job.container.forcePullImage)
.addAllParameters(job.container.parameters.map(_.toProto()).asJava)
.build())
case ContainerType.MESOS =>
builder.setType(ContainerInfo.Type.MESOS)
builder.setMesos(ContainerInfo.MesosInfo.newBuilder()
.setImage(Image.newBuilder()
// TODO add APPC image support
.setType(Image.Type.DOCKER)
.setDocker(Image.Docker.newBuilder()
.setName(job.container.image)
// TODO add setCredential
.build())
.setCached(!job.container.forcePullImage)
.build())
.build())
}
job.container.networkName.foreach {
n => builder.addNetworkInfos(NetworkInfo.newBuilder()
.setName(n).build()
)
}

job.container.networkInfos.foreach {
n => builder.addNetworkInfos(NetworkInfo.newBuilder()
.setName(n.name)
.setLabels(Labels.newBuilder()
.addAllLabels(n.labels.map(_.toProto()).asJava).build()
)
// TODO add protocol, portMappings, requires mesos >= 1.1.0
.build()
)
}

builder.build
}

private def appendExecutorData(taskInfo: TaskInfo.Builder, job: BaseJob, environment: Environment.Builder, uriProtos: Seq[CommandInfo.URI]) {
Expand Down
Loading