-
Notifications
You must be signed in to change notification settings - Fork 12
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
185 changed files
with
11,742 additions
and
63,780 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Large diffs are not rendered by default.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,105 +1,119 @@ | ||
package agents | ||
|
||
import agentSystem.AgentTypes._ | ||
import agentSystem._ | ||
import types._ | ||
import types.Path | ||
import types.OdfTypes._ | ||
import types.OmiTypes._ | ||
import akka.util.Timeout | ||
import akka.actor.Cancellable | ||
import akka.pattern.ask | ||
import scala.util.{Success, Failure} | ||
import scala.collection.JavaConversions.{iterableAsScalaIterable, asJavaIterable } | ||
import scala.concurrent._ | ||
import types.OmiTypes.WriteRequest | ||
import akka.actor.{Cancellable, Props} | ||
import scala.concurrent.Promise | ||
import scala.concurrent.duration._ | ||
import java.sql.Timestamp; | ||
import java.util.Random; | ||
import java.util.Date; | ||
import java.util.Date | ||
import scala.util.{Random, Try} | ||
import scala.concurrent.ExecutionContext.Implicits._ | ||
import scala.collection.mutable.{Queue => MutableQueue} | ||
import com.typesafe.config.Config | ||
import java.util.concurrent.TimeUnit | ||
|
||
class BasicAgent extends InternalAgent{ | ||
//Path of owned O-DF InfoItem, Option because ugly mutable state | ||
var pathO: Option[Path] = None | ||
protected def configure(config: String ) : InternalAgentResponse = { | ||
pathO = Some( Path(config) ) | ||
CommandSuccessful("Successfully configured.") | ||
} | ||
object BasicAgent extends PropsCreator { | ||
|
||
def props( config: Config) : InternalAgentProps = { InternalAgentProps(new BasicAgent(config)) } | ||
|
||
} | ||
|
||
class BasicAgent( override val config: Config) extends InternalAgent{ | ||
|
||
protected val interval : FiniteDuration= config.getDuration("interval", TimeUnit.SECONDS).seconds | ||
|
||
protected val path : Path = Path(config.getString("path")) | ||
|
||
//Message for updating values | ||
case class Update() | ||
|
||
//Interval for scheluding generation of new values | ||
val interval : FiniteDuration = Duration(60, SECONDS) | ||
//Cancellable update of values, Option because ugly mutable state | ||
var updateSchelude : Option[Cancellable] = None | ||
protected def start = { | ||
//Cancellable update of values, "mutable Option" | ||
case class UpdateSchedule( var option: Option[Cancellable] = None) | ||
private val updateSchedule = UpdateSchedule( None ) | ||
|
||
protected def start : Try[InternalAgentSuccess ] = Try{ | ||
|
||
// Schelude update and save job, for stopping | ||
// Will send Update message to self every interval | ||
updateSchelude = Some(context.system.scheduler.schedule( | ||
Duration(0, SECONDS), | ||
interval, | ||
self, | ||
Update | ||
)) | ||
CommandSuccessful("Successfully started.") | ||
updateSchedule.option = Some( | ||
context.system.scheduler.schedule( | ||
Duration(0, SECONDS), | ||
interval, | ||
self, | ||
Update | ||
) | ||
) | ||
|
||
CommandSuccessful() | ||
} | ||
|
||
protected def stop = updateSchelude match{ | ||
protected def stop : Try[InternalAgentSuccess ] = Try{ | ||
updateSchedule.option match{ | ||
//If agent has scheluded update, cancel job | ||
case Some(job) => | ||
case Some(job: Cancellable) => | ||
|
||
job.cancel() | ||
|
||
//Check if job was cancelled | ||
job.isCancelled match { | ||
case true => | ||
CommandSuccessful("Successfully stopped.") | ||
case false => | ||
CommandFailed("Failed to stop agent.") | ||
if(job.isCancelled){ | ||
updateSchedule.option = None | ||
CommandSuccessful() | ||
}else throw CommandFailed("Failed to stop agent.") | ||
|
||
case None => throw CommandFailed("Failed to stop agent, no job found.") | ||
} | ||
case None => CommandFailed("Failed to stop agent, no job found.") | ||
} | ||
//Restart agent, first stop it and then start it | ||
protected def restart = { | ||
stop match{ | ||
case success : InternalAgentSuccess => start | ||
case error : InternalAgentFailure => error | ||
} | ||
} | ||
|
||
|
||
//Random number generator for generating new values | ||
val rnd: Random = new Random() | ||
def newValueStr = rnd.nextInt().toString | ||
protected val rnd: Random = new Random() | ||
protected def newValueStr = rnd.nextInt().toString | ||
|
||
//Helper function for current timestamps | ||
def currentTimestamp = new Timestamp( new java.util.Date().getTime() ) | ||
protected def currentTimestamp = new Timestamp( new java.util.Date().getTime() ) | ||
|
||
//Update values in paths | ||
def update() : Unit = { | ||
pathO.foreach{ //Only run if some path found | ||
path => | ||
val timestamp = currentTimestamp | ||
val typeStr = "xs:integer" | ||
//Generate new values and create O-DF | ||
val infoItem = OdfInfoItem(path,Vector(OdfValue(newValueStr,typeStr,timestamp))) | ||
//fromPath generate O-DF structure from a ode's path and retuns OdfObjects | ||
val objects : OdfObjects = fromPath(infoItem) | ||
//Updates interval as time to live | ||
val write = WriteRequest( interval, objects ) | ||
//PromiseResults contains Promise containing Iterable of Promises and has some helper methods. | ||
//First level Promise is used for getting answer from AgentSystem and second level Promises are | ||
//used to get results of actual writes and from agents that owned paths that this agent wanted to write. | ||
val result = PromiseResult() | ||
//Lets fire and forget our write, results will be received and handled hrougth promiseResult | ||
parent ! PromiseWrite( result, write ) | ||
//isSuccessful will return combined result or first failed write. | ||
val succ = result.isSuccessful | ||
succ.onSuccess{ | ||
case s: SuccessfulWrite => | ||
log.debug(s"$name pushed data successfully.") | ||
} | ||
succ.onFailure{ | ||
case e => | ||
log.warning(s"$name failed to write all data, error: $e") | ||
} | ||
protected def update() : Unit = { | ||
|
||
val timestamp = currentTimestamp | ||
val typeStr = "xs:integer" | ||
|
||
//Generate new values and create O-DF | ||
val infoItem = OdfInfoItem( path, Vector( OdfValue( newValueStr, typeStr, timestamp ) ) ) | ||
|
||
//createAncestors generate O-DF structure from a node's path and retuns OdfObjects | ||
val objects : OdfObjects = createAncestors( infoItem ) | ||
|
||
//interval as time to live | ||
val write = WriteRequest( interval, objects ) | ||
|
||
//PromiseResults contains Promise containing Iterable of Promises and has some helper methods. | ||
//First level Promise is used for getting answer from AgentSystem and second level Promises are | ||
//used to get results of actual writes and from agents that owned paths that this agent wanted to write. | ||
val result = PromiseResult() | ||
|
||
//Let's tell agentSystem about our write, results will be received and handled througth promiseResult | ||
agentSystem.tell( PromiseWrite( result, write ), self ) | ||
|
||
//isSuccessful will return combined result or first failed write. | ||
val succ = result.isSuccessful | ||
|
||
succ.onSuccess{ | ||
case s: SuccessfulWrite => | ||
log.debug(s"$name pushed data successfully.") | ||
} | ||
|
||
succ.onFailure{ | ||
case e: Throwable => | ||
log.warning(s"$name failed to write all data, error: $e") | ||
} | ||
} | ||
|
||
receiver{ | ||
override protected def receiver = { | ||
case Update => update | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,98 +1,20 @@ | ||
package agents | ||
|
||
package agents | ||
import agentSystem._ | ||
import agentSystem.AgentTypes._ | ||
//import agentSystem.InternalAgentExceptions.{AgentException, AgentInitializationException, AgentInterruption} | ||
import types._ | ||
import types.OdfTypes._ | ||
import types.OmiTypes._ | ||
import akka.util.Timeout | ||
import akka.actor.Cancellable | ||
import akka.pattern.ask | ||
import java.sql.Timestamp; | ||
import java.util.{Random, Date}; | ||
import scala.util.{Success, Failure} | ||
import scala.collection.JavaConversions.{iterableAsScalaIterable, asJavaIterable } | ||
import scala.concurrent._ | ||
import scala.concurrent.duration._ | ||
|
||
class BrokenAgent extends InternalAgent{ | ||
import scala.concurrent.ExecutionContext.Implicits._ | ||
case class Update() | ||
val rnd: Random = new Random() | ||
val interval : FiniteDuration = Duration(60, SECONDS) | ||
var pathOwned: Option[Path] = None | ||
var pathPublic: Option[Path] = None | ||
def date = new java.util.Date(); | ||
protected def configure(config: String ) : InternalAgentResponse = { | ||
pathOwned = Some( new Path(config ++ "Owned")) | ||
pathPublic = Some( new Path(config ++ "Public")) | ||
CommandSuccessful("Successfully configured.") | ||
} | ||
var updateSchelude : Option[Cancellable] = None | ||
protected def start = { | ||
updateSchelude = Some(context.system.scheduler.schedule( | ||
Duration(0, SECONDS), | ||
interval, | ||
self, | ||
Update | ||
)) | ||
CommandSuccessful("Successfully started.") | ||
} | ||
|
||
def update() : Unit = { | ||
val promiseResult = PromiseResult() | ||
for{ | ||
ownedPath <- pathOwned | ||
publicPath <- pathPublic | ||
ownedItem = fromPath(OdfInfoItem( | ||
ownedPath, | ||
Vector(OdfValue( | ||
rnd.nextInt().toString, | ||
"xs:integer", | ||
new Timestamp( date.getTime() ) | ||
)) | ||
)) | ||
publicItem = fromPath(OdfInfoItem( | ||
publicPath, | ||
Vector(OdfValue( | ||
rnd.nextInt().toString, | ||
"xs:integer", | ||
new Timestamp( date.getTime() ) | ||
)) | ||
)) | ||
objects = ownedItem.union(publicItem) | ||
write = WriteRequest( interval, objects ) | ||
u = context.parent ! PromiseWrite( promiseResult, write ) | ||
} yield write | ||
|
||
promiseResult.isSuccessful.onSuccess{ | ||
//Check if failed promises | ||
case s => | ||
log.debug(s"$name pushed data successfully.") | ||
} | ||
import types.OmiTypes.WriteRequest | ||
import scala.concurrent.Promise | ||
import com.typesafe.config.Config | ||
import akka.actor.Props | ||
object BrokenAgent extends PropsCreator{ | ||
def props(config: Config) : InternalAgentProps = { | ||
InternalAgentProps( new BrokenAgent(config) ) | ||
} | ||
} | ||
|
||
receiver{ | ||
case Update => update | ||
} | ||
protected def stop = updateSchelude match{ | ||
case Some(job) => | ||
job.cancel() | ||
job.isCancelled match { | ||
case true => | ||
CommandSuccessful("Successfully stopped.") | ||
case false => | ||
CommandFailed("Failed to stop agent.") | ||
} | ||
case None => CommandFailed("Failed to stop agent.") | ||
} | ||
protected def restart = { | ||
stop match{ | ||
case success : InternalAgentSuccess => start | ||
case error : InternalAgentFailure => error | ||
} | ||
} | ||
protected def handleWrite(promise:Promise[ResponsibleAgentResponse], write: WriteRequest) = { | ||
class BrokenAgent(config: Config) extends ResponsibleAgent(config){ | ||
override protected def handleWrite(promise:Promise[ResponsibleAgentResponse], write: WriteRequest) = { | ||
promise.failure(new Exception(s"Broken agent, could not write.")) | ||
} | ||
} |
Oops, something went wrong.