Skip to content

Commit

Permalink
Merge branch 'development'
Browse files Browse the repository at this point in the history
  • Loading branch information
TK009 committed Sep 7, 2016
2 parents 94b1175 + eaa9920 commit 4a7030c
Show file tree
Hide file tree
Showing 3 changed files with 96 additions and 97 deletions.
21 changes: 12 additions & 9 deletions O-MI Node/src/main/scala/agentSystem/DBPusher.scala
Original file line number Diff line number Diff line change
Expand Up @@ -205,15 +205,7 @@ trait DBPusher extends BaseAgentSystem{

val updatedStaticItems = metas ++ iiDescriptions ++ newItems ++ objectMetadatas

// Update our hierarchy data structures if needed
if (updatedStaticItems.nonEmpty) {

// aggregate all updates to single odf tree
val updateTree: OdfObjects =
(updatedStaticItems map createAncestors).foldLeft(OdfObjects())(_ union _)

singleStores.hierarchyStore execute Union(updateTree)
}

// DB + Poll Subscriptions
val infosToBeWrittenInDB: Seq[OdfInfoItem] =
Expand All @@ -227,12 +219,23 @@ trait DBPusher extends BaseAgentSystem{
val writeFuture = dbConnection.writeMany(infosToBeWrittenInDB)

writeFuture.onSuccess{
case _ =>
case _ =>{
// Update our hierarchy data structures if needed

if (updatedStaticItems.nonEmpty) {
// aggregate all updates to single odf tree
val updateTree: OdfObjects =
(updatedStaticItems map createAncestors).foldLeft(OdfObjects())(_ union _)

singleStores.hierarchyStore execute Union(updateTree)
}

triggeringEvents.foreach(iie =>
iie.infoItem.values.headOption.map(newValue=>
singleStores.latestStore execute SetSensorData(iie.infoItem.path, newValue)
)
)
}
}
writeFuture.onFailure{
case t: Throwable => log.error(t, "Error when writing values for paths $paths")
Expand Down
170 changes: 83 additions & 87 deletions O-MI Node/src/main/scala/http/NodeCLI.scala
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@

/*+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
+ Copyright (c) 2015 Aalto University. +
+ +
+ Licensed under the 4-clause BSD (the "License"); +
+ you may not use this file except in compliance with the License. +
+ You may obtain a copy of the License at top most directory of project. +
+ +
+ Unless required by applicable law or agreed to in writing, software +
+ distributed under the License is distributed on an "AS IS" BASIS, +
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +
+ See the License for the specific language governing permissions and +
+ limitations under the License. +
+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++*/
+ Copyright (c) 2015 Aalto University. +
+ +
+ Licensed under the 4-clause BSD (the "License"); +
+ you may not use this file except in compliance with the License. +
+ You may obtain a copy of the License at top most directory of project. +
+ +
+ Unless required by applicable law or agreed to in writing, software +
+ distributed under the License is distributed on an "AS IS" BASIS, +
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +
+ See the License for the specific language governing permissions and +
+ limitations under the License. +
+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++*/

package http

Expand Down Expand Up @@ -51,35 +51,35 @@ object OmiNodeCLI{
removeHandler: RemoveHandlerT,
agentSystem: ActorRef,
subscriptionManager: ActorRef
)(
) : Props = Props(
new OmiNodeCLI(
sourceAddress,
removeHandler,
agentSystem,
subscriptionManager
)
)
)(
) : Props = Props(
new OmiNodeCLI(
sourceAddress,
removeHandler,
agentSystem,
subscriptionManager
)
)
}
/** Command Line Interface for internal agent management.
*
*/
*
*/
class OmiNodeCLI(
protected val sourceAddress: InetSocketAddress,
protected val removeHandler: RemoveHandlerT,
protected val agentSystem: ActorRef,
protected val subscriptionManager: ActorRef
) extends Actor with ActorLogging {
) extends Actor with ActorLogging {

val commands = """Current commands:
start <agent classname>
stop <agent classname>
list agents
list subs
showSub <id>
remove <subsription id>
remove <path>
"""
start <agent classname>
stop <agent classname>
list agents
list subs
showSub <id>
remove <subsription id>
remove <path>
"""
val ip = sourceAddress.toString.tail
implicit val timeout : Timeout = 1.minute

Expand Down Expand Up @@ -109,52 +109,52 @@ remove <path>
agentsStrChart( agents.sortBy{ info => info.name} )
case _ => ""
}
.recover[String]{
case a : Throwable =>
log.warning(s"Failed to get list of Agents. Sending error message. " + a.toString)
"Something went wrong. Could not get list of Agents.\n"
}
Await.result(result, commandTimeout)
.recover[String]{
case a : Throwable =>
log.warning(s"Failed to get list of Agents. Sending error message. " + a.toString)
"Something went wrong. Could not get list of Agents.\n"
}
Await.result(result, commandTimeout)
}

def subsStrChart (intervals: Set[IntervalSub @unchecked],
events: Set[EventSub] @unchecked,
polls: Set[PolledSub] @unchecked) : String = {

val (idS, intervalS, startTimeS, endTimeS, callbackS, lastPolledS) =
("ID", "INTERVAL", "START TIME", "END TIME", "CALLBACK", "LAST POLLED")
def subsStrChart (
intervals: Set[IntervalSub @unchecked],
events: Set[EventSub] @unchecked,
polls: Set[PolledSub] @unchecked) : String = {

val intMsg= "Interval subscriptions:\n" + f"$idS%-10s | $intervalS%-20s | $startTimeS%-30s | $endTimeS%-30s | $callbackS\n" +
intervals.map{ sub=>
f"${sub.id}%-10s | ${sub.interval}%-20s | ${sub.startTime}%-30s | ${sub.endTime}%-30s | ${ sub.callback.address }"
}.mkString("\n")
val (idS, intervalS, startTimeS, endTimeS, callbackS, lastPolledS) =
("ID", "INTERVAL", "START TIME", "END TIME", "CALLBACK", "LAST POLLED")

val eventMsg = "Event subscriptions:\n" + f"$idS%-10s | $endTimeS%-30s | $callbackS\n" + events.map{ sub=>
f"${sub.id}%-10s | ${sub.endTime}%-30s | ${ sub.callback.address}"
}.mkString("\n")
val intMsg= "Interval subscriptions:\n" + f"$idS%-10s | $intervalS%-20s | $startTimeS%-30s | $endTimeS%-30s | $callbackS\n" +
intervals.map{ sub=>
f"${sub.id}%-10s | ${sub.interval}%-20s | ${sub.startTime}%-30s | ${sub.endTime}%-30s | ${ sub.callback.address }"
}.mkString("\n")

val pollMsg = "Poll subscriptions:\n" + f"$idS%-10s | $startTimeS%-30s | $endTimeS%-30s | $lastPolledS\n" +
polls.map{ sub=>
f"${sub.id}%-10s | ${sub.startTime}%-30s | ${sub.endTime}%-30s | ${ sub.lastPolled }"
}.mkString("\n")
val eventMsg = "Event subscriptions:\n" + f"$idS%-10s | $endTimeS%-30s | $callbackS\n" + events.map{ sub=>
f"${sub.id}%-10s | ${sub.endTime}%-30s | ${ sub.callback.address}"
}.mkString("\n")

s"$intMsg\n$eventMsg\n$pollMsg\n"
val pollMsg = "Poll subscriptions:\n" + f"$idS%-10s | $startTimeS%-30s | $endTimeS%-30s | $lastPolledS\n" +
polls.map{ sub=>
f"${sub.id}%-10s | ${sub.startTime}%-30s | ${sub.endTime}%-30s | ${ sub.lastPolled }"
}.mkString("\n")

s"$intMsg\n$eventMsg\n$pollMsg\n"
}
private def listSubs(): String = {
log.info(s"Got list subs command from $ip")
val result = (subscriptionManager ? ListSubsCmd())
.map{
case (intervals: Set[IntervalSub @unchecked],
events: Set[EventSub] @unchecked,
polls: Set[PolledSub] @unchecked) => // type arguments cannot be checked
events: Set[EventSub] @unchecked,
polls: Set[PolledSub] @unchecked) => // type arguments cannot be checked
log.info("Received list of Subscriptions. Sending ...")

subsStrChart( intervals, events, polls)
}
.recover{
case a: Throwable =>
log.info("Failed to get list of Subscriptions.\n Sending ...")
"Failed to get list of subscriptions.\n"
}.recover{
case a: Throwable =>
log.info("Failed to get list of Subscriptions.\n Sending ...")
"Failed to get list of subscriptions.\n"
}
Await.result(result, commandTimeout)
}
Expand Down Expand Up @@ -186,24 +186,22 @@ remove <path>
case None =>
log.info(s"Subscription with id $id not found.\n Sending ...")
s"Subscription with id $id not found.\n"
}
.recover{
}.recover{
case a: Throwable =>
log.info(s"Failed to get subscription with $id.\n Sending ...")
s"Failed to get subscription with $id.\n"
}
Await.result(result, commandTimeout)
}

private def startAgent(agent: AgentName): String = {
log.info(s"Got start command from $ip for $agent")
val result = (agentSystem ? StartAgentCmd(agent)).mapTo[Future[String]]
.flatMap{ case future : Future[String] => future }
.map{
case msg: String =>
msg +"\n"
}
.recover{
}.recover{
case a : Throwable =>
"Command failure unknown.\n"
}
Expand All @@ -217,8 +215,7 @@ remove <path>
.map{
case msg:String =>
msg +"\n"
}
.recover{
}.recover{
case a : Throwable =>
"Command failure unknown.\n"
}
Expand All @@ -238,20 +235,19 @@ remove <path>
s"Removed subscription with $id successfully.\n"
case false =>
s"Failed to remove subscription with $id. Subscription does not exist or it is already expired.\n"
}
.recover{
}.recover{
case a : Throwable =>
"Command failure unknown.\n"
}
Await.result(result, commandTimeout)
} else {
log.info(s"Trying to remove path $pathOrId")
if (removeHandler.handlePathRemove(Path(pathOrId))) {
log.info(s"Successfully removed path")
s"Successfully removed path $pathOrId\n"
log.info(s"Successfully removed path")
s"Successfully removed path $pathOrId\n"
} else {
log.info(s"Given path does not exist")
s"Given path does not exist\n"
log.info(s"Given path does not exist")
s"Given path does not exist\n"
}
} //requestHandler isn't actor

Expand Down Expand Up @@ -287,20 +283,20 @@ remove <path>
}

class OmiNodeCLIListener(
protected val system: ActorSystem,
protected val agentSystem: ActorRef,
protected val subscriptionManager: ActorRef,
protected val singleStores: SingleStores,
protected val dbConnection: DBReadWrite
) extends Actor with ActorLogging{
protected val system: ActorSystem,
protected val agentSystem: ActorRef,
protected val subscriptionManager: ActorRef,
protected val singleStores: SingleStores,
protected val dbConnection: DBReadWrite

) extends Actor with ActorLogging{

import Tcp._

def receive : Actor.Receive={
case Bound(localAddress) =>
// TODO: do something?
// It seems that this branch was not executed?
// TODO: do something?
// It seems that this branch was not executed?

case CommandFailed(b: Bind) =>
log.warning(s"CLI connection failed: $b")
Expand All @@ -313,8 +309,8 @@ class OmiNodeCLIListener(

val cli = context.system.actorOf(
OmiNodeCLI.props(remote,remover,agentSystem, subscriptionManager),
"cli-" + remote.toString.tail)
connection ! Register(cli)
"cli-" + remote.toString.tail)
connection ! Register(cli)
case _ => //noop?
}

Expand Down
2 changes: 1 addition & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ addCommandAlias("systemTest", "omiNode/testOnly http.SystemTest")

def commonSettings(moduleName: String) = Seq(
name := s"O-MI-$moduleName",
version := "0.6.1",
version := "0.6.2",
scalaVersion := "2.11.8",
scalacOptions := Seq("-unchecked", "-feature", "-deprecation", "-encoding", "utf8", "-Xlint"),
scalacOptions in (Compile,doc) ++= Seq("-groups", "-deprecation", "-implicits", "-diagrams", "-diagrams-debug", "-encoding", "utf8"),
Expand Down

0 comments on commit 4a7030c

Please sign in to comment.