Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Approach 2 for Marathon 1.9 #357

Draft
wants to merge 19 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 18 commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,8 @@ class ApplicationController(cc: ControllerComponents, metricsModule: MetricsModu
}

def showMetrics = Action {
val metricsJsonString = metricsModule.snapshot() match {
case Left(_) =>
// Kamon snapshot
throw new IllegalArgumentException("Only Dropwizard format is supported, cannot render metrics from Kamon snapshot. Make sure your metrics are configured correctly.")
case Right(dropwizardRegistry) => Json.stringify(Json.toJson(Raml.toRaml(dropwizardRegistry)))
}
val snapshot = Raml.toRaml(metricsModule.snapshot())
val metricsJsonString = Json.stringify(Json.toJson(snapshot))
Ok(metricsJsonString).as(ContentTypes.JSON)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ package object models {
(__ \ "run").format[JobRunSpec])(JobSpec.apply(_, _, _, Seq.empty, _), s => (s.id, s.description, s.labels, s.run))

implicit lazy val TaskIdFormat: Format[Task.Id] = Format(
Reads.of[String](Reads.minLength[String](3)).map(Task.Id(_)),
Reads.of[String](Reads.minLength[String](3)).map(Task.Id.parse),
Writes[Task.Id] { id => JsString(id.idString) })

implicit lazy val TaskStateFormat: Format[TaskState] = new Format[TaskState] {
Expand Down
18 changes: 15 additions & 3 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -173,24 +173,36 @@ lazy val jobs = (project in file("jobs"))
}, projectSettings)
.settings(pbSettings)
.settings(
libraryDependencies ++= Seq(
libraryDependencies ++=
(
Dependencies.Curator.all ++
Dependencies.DropwizardMetrics.all ++
Seq(
Dependencies.asyncAwait,
Dependencies.playJson,
Dependencies.marathon,
// Dependencies.marathon,
Dependencies.marathonPlugin,
Dependencies.macWireMacros,
Dependencies.macWireUtil,
Dependencies.macWireProxy,
Dependencies.cronUtils,
Dependencies.akka,
Dependencies.akkaStream, // We need to include this, even if we don't use it to overwrite indirect dependencies
Dependencies.akkaSlf4j,
Dependencies.caffeine,
Dependencies.scallop,
Dependencies.uuidGenerator,
Dependencies.jGraphT,
Dependencies.java8Compat,
Dependencies.mesos,
Dependencies.guice,
Dependencies.Test.scalatest,
Dependencies.Test.akkaTestKit,
Dependencies.Test.akkaSlf4j,
Dependencies.Test.mockito,
Dependencies.Test.scalatest,
Dependencies.Test.scalaCheck,
).map(
)) .map(
_.excludeAll(excludeSlf4jLog4j12)
.excludeAll(excludeLog4j)
.excludeAll(excludeJCL)
Expand Down
Binary file added jobs/lib/marathon_2.12-1.9.99.jar
Binary file not shown.
2 changes: 1 addition & 1 deletion jobs/src/main/scala/dcos/metronome/JobsModule.scala
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,6 @@ class JobsModule(
jobRunModule.jobRunService,
jobHistoryModule.jobHistoryService)

val queueModule = new LaunchQueueModule(schedulerModule.launchQueueModule.launchQueue)
val queueModule = new LaunchQueueModule(schedulerModule.instanceTrackerModule.instanceTracker)
}

2 changes: 1 addition & 1 deletion jobs/src/main/scala/dcos/metronome/MarathonBuildInfo.scala
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import scala.util.control.NonFatal

case object MarathonBuildInfo {
private val marathonJar = """\/mesosphere\/marathon\/marathon_2.12\/[0-9.]+""".r
val DefaultBuildVersion = SemVer(1, 7, 0, Some("SNAPSHOT"))
val DefaultBuildVersion = SemVer(1, 9, 0, Some("SNAPSHOT"))

/**
* sbt-native-package provides all of the files as individual JARs. By default, `getResourceAsStream` returns the
Expand Down
4 changes: 2 additions & 2 deletions jobs/src/main/scala/dcos/metronome/jobrun/JobRunModule.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import mesosphere.marathon.metrics.Metrics
import scala.concurrent.Promise

class JobRunModule(
config: JobRunConfig,
config: JobsConfig,
actorSystem: ActorSystem,
clock: Clock,
jobRunRepository: Repository[JobRunId, JobRun],
Expand All @@ -32,7 +32,7 @@ class JobRunModule(
val persistenceActorFactory = (id: JobRunId, context: ActorContext) =>
context.actorOf(JobRunPersistenceActor.props(id, jobRunRepository, metrics))
JobRunExecutorActor.props(jobRun, promise, persistenceActorFactory,
launchQueue, instanceTracker, driverHolder, clock)(actorSystem.scheduler)
launchQueue, instanceTracker, driverHolder, config.scallopConf, clock)(actorSystem.scheduler)
}

val jobRunServiceActor = leadershipModule.startWhenLeader(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,16 @@ import dcos.metronome.eventbus.TaskStateChangedEvent
import dcos.metronome.jobrun.StartedJobRun
import dcos.metronome.model.{ JobResult, JobRun, JobRunId, JobRunStatus, JobRunTask, RestartPolicy }
import dcos.metronome.scheduler.TaskState
import mesosphere.marathon.core.task.tracker.InstanceTracker
import mesosphere.marathon.{ MarathonSchedulerDriverHolder, StoreCommandFailedException }
import dcos.metronome.utils.glue.MarathonImplicits
import mesosphere.marathon.core.launchqueue.LaunchQueue
import mesosphere.marathon.core.task.Task
import mesosphere.marathon.core.task.tracker.InstanceTracker
import mesosphere.marathon.{ AllConf, MarathonSchedulerDriverHolder, StoreCommandFailedException }
import org.apache.zookeeper.KeeperException.NodeExistsException
import scala.async.Async.{ async, await }

import scala.concurrent.duration.{ Duration, FiniteDuration }
import scala.concurrent.{ Await, Promise }
import scala.async.Async.async
import scala.concurrent.Promise
import scala.concurrent.duration.FiniteDuration

/**
* Handles one job run from start until the job either completes successful or failed.
Expand All @@ -31,6 +32,7 @@ class JobRunExecutorActor(
launchQueue: LaunchQueue,
instanceTracker: InstanceTracker,
driverHolder: MarathonSchedulerDriverHolder,
config: AllConf,
clock: Clock)(implicit scheduler: Scheduler) extends Actor with Stash with ActorLogging {
import JobRunExecutorActor._
import JobRunPersistenceActor._
Expand Down Expand Up @@ -112,8 +114,8 @@ class JobRunExecutorActor(
log.info(s"Job run ${jobRun.id} already exists in LaunchQueue - not adding")
} else {
log.info("addTaskToLaunchQueue")
import dcos.metronome.utils.glue.MarathonImplicits._
launchQueue.add(jobRun.toRunSpec, count = 1)
val runSpec = MarathonImplicits.toRunSpec(jobRun, config.mesosRole())
launchQueue.add(runSpec, count = 1)
}
}

Expand All @@ -140,8 +142,7 @@ class JobRunExecutorActor(
}

def existsInLaunchQueue(): Boolean = {
// timeout is enforced on LaunchQueue implementation side
Await.result(launchQueue.get(runSpecId), Duration.Inf).exists(i => i.finalInstanceCount > 0)
instanceTracker.instancesBySpecSync.specInstances(runSpecId).nonEmpty
}

def updatedTasks(update: TaskStateChangedEvent): Map[Task.Id, JobRunTask] = {
Expand All @@ -165,7 +166,6 @@ class JobRunExecutorActor(
}

def becomeFinishing(updatedJobRun: JobRun): Unit = {
Await.result(launchQueue.purge(runSpecId), Duration.Inf) // there is already timeout enforced in Marathon
jobRun = updatedJobRun
context.parent ! JobRunUpdate(StartedJobRun(jobRun, promise.future))
persistenceActor ! Delete(jobRun)
Expand Down Expand Up @@ -196,7 +196,6 @@ class JobRunExecutorActor(

// FIXME: compare to becomeFinishing, there's lots of DRY violation
def becomeFailing(updatedJobRun: JobRun): Unit = {
Await.result(launchQueue.purge(runSpecId), Duration.Inf) // there is already timeout enforced in Marathon
jobRun = updatedJobRun
context.parent ! JobRunUpdate(StartedJobRun(jobRun, promise.future))
persistenceActor ! Delete(jobRun)
Expand All @@ -212,7 +211,6 @@ class JobRunExecutorActor(
jobRun.tasks.values.filter(t => isActive(t.status)).foreach { t =>
driverHolder.driver.foreach(_.killTask(t.id.mesosTaskId))
}
Await.result(launchQueue.purge(runSpecId), Duration.Inf) // there is already timeout enforced in Marathon

// Abort the jobRun
jobRun = jobRun.copy(
Expand Down Expand Up @@ -403,9 +401,10 @@ object JobRunExecutorActor {
launchQueue: LaunchQueue,
instanceTracker: InstanceTracker,
driverHolder: MarathonSchedulerDriverHolder,
config: AllConf,
clock: Clock)(implicit scheduler: Scheduler): Props = Props(
new JobRunExecutorActor(run, promise, persistenceActorRefFactory,
launchQueue, instanceTracker, driverHolder, clock))
launchQueue, instanceTracker, driverHolder, config, clock))
}

object TaskStates {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,9 @@
package dcos.metronome
package jobrun.impl

import dcos.metronome.Protos.JobSpec.RunSpec
import dcos.metronome.model._
import mesosphere.marathon.Protos.Constraint
import mesosphere.marathon.core.launchqueue.LaunchQueue.QueuedInstanceInfo
import mesosphere.marathon.state.Container.MesosDocker
import mesosphere.marathon.state.Container.{ Docker, MesosDocker }
import mesosphere.marathon.state.{ AppDefinition, Container }
import org.slf4j.LoggerFactory

Expand All @@ -30,7 +28,8 @@ object QueuedJobRunConverter {

implicit class MarathonContainerToDockerSpec(val container: Option[Container]) extends AnyVal {

def toDockerModel: Option[DockerSpec] = container.flatMap(c => c.docker).map(d => DockerSpec(d.image, d.forcePullImage))
def toDockerModel: Option[DockerSpec] = container.map{ case d: Docker => DockerSpec(d.image, d.forcePullImage) }

def toUcrModel: Option[UcrSpec] = container.collect {
case ucr: MesosDocker =>
val image = ImageSpec(id = ucr.image, forcePull = ucr.forcePullImage)
Expand Down Expand Up @@ -66,20 +65,4 @@ object QueuedJobRunConverter {
}
}
}

implicit class QueuedTaskInfoToQueuedJobRunInfo(val instanceInfo: QueuedInstanceInfo) extends AnyVal {

def toModel: QueuedJobRunInfo = {
val jobRunSpec = instanceInfo.runSpec match {
case app: AppDefinition => app.toModel
case runSpec =>
throw new IllegalArgumentException(s"Unexpected runSpec type - jobs are translated to Apps on Marathon level, got $runSpec")
}
QueuedJobRunInfo(
id = instanceInfo.runSpec.id,
backOffUntil = instanceInfo.backOffUntil,
run = jobRunSpec,
acceptedResourceRoles = instanceInfo.runSpec.acceptedResourceRoles)
}
}
}
6 changes: 3 additions & 3 deletions jobs/src/main/scala/dcos/metronome/model/JobId.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,13 @@ package model
import com.wix.accord._
import com.wix.accord.dsl._
import mesosphere.marathon.plugin
import mesosphere.marathon.state.PathId
import mesosphere.marathon.state.AbsolutePathId

case class JobId(path: Seq[String]) extends plugin.PathId {

def safePath: String = path.mkString("_")

def toPathId: PathId = PathId(path)
def toPathId: AbsolutePathId = AbsolutePathId(path)

override lazy val toString: String = path.mkString(".")
}
Expand All @@ -21,7 +21,7 @@ object JobId {
JobId(in.split("[.]").filter(_.nonEmpty).toList)
}

def apply(pathId: PathId): JobId = {
def apply(pathId: AbsolutePathId): JobId = {
JobId(pathId.path)
}

Expand Down
8 changes: 4 additions & 4 deletions jobs/src/main/scala/dcos/metronome/model/JobRunId.scala
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
package dcos.metronome
package model

import java.time.{ Clock, ZoneId }
import java.time.format.DateTimeFormatter
import java.time.{ Clock, ZoneId }

import mesosphere.marathon.state.PathId
import mesosphere.marathon.state.AbsolutePathId

case class JobRunId(jobId: JobId, value: String) {
override def toString: String = s"${jobId.path.mkString(".")}.$value"
def toPathId: PathId = jobId.toPathId / value
def toPathId: AbsolutePathId = jobId.toPathId / value
}

object JobRunId {
Expand All @@ -21,7 +21,7 @@ object JobRunId {
JobRunId(job.id, s"$date$random")
}

def apply(runSpecId: PathId): JobRunId = {
def apply(runSpecId: AbsolutePathId): JobRunId = {
JobRunId(JobId(runSpecId.parent), runSpecId.path.last)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@ package queue

import com.softwaremill.macwire.wire
import dcos.metronome.queue.impl.LaunchQueueServiceImpl
import mesosphere.marathon.core.launchqueue.LaunchQueue
import mesosphere.marathon.core.task.tracker.InstanceTracker

class LaunchQueueModule(launchQueue: LaunchQueue) {
class LaunchQueueModule(instanceTracker: InstanceTracker) {

def launchQueueService = wire[LaunchQueueServiceImpl]
}
Original file line number Diff line number Diff line change
@@ -1,18 +1,44 @@
package dcos.metronome
package queue.impl

import dcos.metronome.jobrun.impl.QueuedJobRunConverter.QueuedTaskInfoToQueuedJobRunInfo
import dcos.metronome.model.QueuedJobRunInfo
import dcos.metronome.queue.LaunchQueueService
import mesosphere.marathon.core.launchqueue.LaunchQueue
import mesosphere.marathon.core.task.tracker.InstanceTracker
import mesosphere.marathon.core.task.tracker.InstanceTracker.SpecInstances
import mesosphere.marathon.state.{ AbsolutePathId, AppDefinition }

import scala.concurrent.Await
import scala.concurrent.duration.Duration

class LaunchQueueServiceImpl(launchQueue: LaunchQueue) extends LaunchQueueService {
class LaunchQueueServiceImpl(instanceTracker: InstanceTracker) extends LaunchQueueService {

override def list(): Seq[QueuedJobRunInfo] = {
// timeout is enforced in LaunchQueue
Await.result(launchQueue.list, Duration.Inf).filter(_.inProgress).map(_.toModel)
toModel(instanceTracker.instancesBySpecSync.instancesMap)
}

private[this] def toModel(instanceMap: Map[AbsolutePathId, SpecInstances]): Seq[QueuedJobRunInfo] = {
instanceMap.flatMap{ case (id, instances) => mapRunSpecs(id, instances) }.toIndexedSeq
}

private[this] def mapRunSpecs(id: AbsolutePathId, instances: SpecInstances): Seq[QueuedJobRunInfo] = {
import dcos.metronome.jobrun.impl.QueuedJobRunConverter.RunSpecToJobRunSpec

instances.instanceMap.values.map{ instance =>
val jobRunSpec = instance.runSpec match {
case app: AppDefinition => app.toModel
case runSpec =>
throw new IllegalArgumentException(s"Unexpected runSpec type - jobs are translated to Apps on Marathon level, got $runSpec")
}

// val configRef = RunSpecConfigRef
// launchQueue.getDelay(instance.runSpec.configRef).delay.get.deadline

// TODO AN: This is wrong, but at the moment we don't have a backoff anyway.
val backoffUntil = instance.state.since

QueuedJobRunInfo(
id = id,
backOffUntil = backoffUntil,
run = jobRunSpec,
acceptedResourceRoles = instance.runSpec.acceptedResourceRoles)
}.toIndexedSeq
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -22,25 +22,23 @@ class SchedulerRepositoriesModule(metrics: Metrics, config: SchedulerConfig, lif

private[this] val zkRootPath = config.zkStatePath

lazy val curatorFramework: Option[RichCuratorFramework] = StorageConfig.curatorFramework(config.scallopConf, crashStrategy, lifecycleState)
// Initialize Apache Curator Framework (wrapped in [[RichCuratorFramework]] and connect/sync with the storage
// for an underlying Zookeeper storage.
val curatorFramework: RichCuratorFramework = StorageConfig.curatorFramework(config.scallopConf, crashStrategy, lifecycleState)

val zkStore: ZKStore = new ZKStore(
curatorFramework.get,
private[this] val zkStore: ZKStore = new ZKStore(
curatorFramework,
zkRootPath,
CompressionConf(config.zkCompressionEnabled, config.zkCompressionThreshold))

def jobSpecRepository: Repository[JobId, JobSpec] = new ZkJobSpecRepository(zkStore, ec)

def jobRunRepository: Repository[JobRunId, JobRun] = new ZkJobRunRepository(zkStore, ec)

def jobHistoryRepository: Repository[JobId, JobHistory] = new ZkJobHistoryRepository(zkStore, ec)

lazy val storageConfig = StorageConfig(config.scallopConf, curatorFramework)
lazy val storageModule: StorageModule = StorageModule(metrics, storageConfig, config.scallopConf.mesosBridgeName())(actorsModule.materializer, ExecutionContext.global, actorSystem.scheduler, actorSystem)
lazy val storageModule: StorageModule = StorageModule(metrics, config.scallopConf, curatorFramework)(actorsModule.materializer, ExecutionContext.global, actorSystem.scheduler, actorSystem)

lazy val instanceRepository: InstanceRepository = storageModule.instanceRepository
lazy val groupRepository: GroupRepository = storageModule.groupRepository

lazy val frameworkIdRepository: FrameworkIdRepository = storageModule.frameworkIdRepository

lazy val migration: Migration = new MigrationImpl(zkStore)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ object JobHistoryConversions {
id = proto.getJobRunId.toModel,
createdAt = Instant.ofEpochMilli(proto.getCreatedAt),
finishedAt = Instant.ofEpochMilli(proto.getFinishedAt),
tasks = proto.getTasksList.asScala.map(Task.Id(_)).to[Seq])
tasks = proto.getTasksList.asScala.map(Task.Id.parse).to[Seq])
}.toList
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ object JobRunConversions {

implicit class ProtoToJobRunTask(val proto: Protos.JobRun.JobRunTask) extends AnyVal {
def toModel: JobRunTask = JobRunTask(
Task.Id(proto.getId),
Task.Id.parse(proto.getId),
Instant.ofEpochMilli(proto.getStartedAt),
if (proto.hasCompletedAt) Some(Instant.ofEpochMilli(proto.getCompletedAt)) else None,
proto.getStatus.toModel)
Expand Down
Loading