diff --git a/README.md b/README.md
index dcf47607..2b752600 100644
--- a/README.md
+++ b/README.md
@@ -1,7 +1,7 @@
# TAK Server Development
*Requires Java 11*
-* Linux / MacOS is recommended for development. If using Windows, replace "gradlew" with "gradlew.bat" in commands below.
+* Linux or MacOS is recommended for development. If using Windows, replace "gradlew" with "gradlew.bat" in commands below. An x86-64 architecture CPU is required to build from source, including on MacOS. M1 or M2 Apple silicon is not supported.
Links:
* [Test Execution](src/takserver-takcl-core/docs/testing.md)
@@ -33,7 +33,7 @@ docker run -it -d --rm --name TakserverServer0DB \
--env POSTGRES_HOST_AUTH_METHOD=trust \
--env POSTGRES_USER=martiuser \
--env POSTGRES_DB=cot \
- -p 5432 postgis/postgis:10-3.1
+ -p 5432 postgis/postgis:15-3.3
echo SQL SERVER IP: `docker inspect --format='{{range .NetworkSettings.Networks}}{{.IPAddress}}{{end}}' TakserverServer0DB`
```
@@ -48,7 +48,9 @@ Setup Local Database. If the postgis container was used, only the last two lines
```
Configure Local CoreConfig and Certs
-```cd takserver-core/example```
+```
+cd takserver-core/example
+```
This is the CoreConfig that takserver war will look for when running from the takserver-core/example directory. From this point, just follow the instructions at takserver/src/docs/TAK_Server_Configuration_Guide.pdf to set up the CoreConfig and Certs. Make sure that the CoreConfig now points to the directory where the certs were generated locally.
@@ -113,16 +115,24 @@ java -Xmx -Dspring.profiles.active=messaging -jar ../build/libs/takserver
```
turn down log level of all logs:
-```java -jar takserver.war $@ --logging.level.root=ERROR```
+```
+java -jar takserver.war $@ --logging.level.root=ERROR
+```
turn down log level for subscriptions:
-```java -jar takserver.war $@ --logging.level.com.bbn.marti.service.Subscription=ERROR```
+```
+java -jar takserver.war $@ --logging.level.com.bbn.marti.service.Subscription=ERROR
+```
turn off logs just for subscriptions:
-```java -jar takserver.war $@ --logging.level.com.bbn.marti.service.Subscription=OFF```
+```
+java -jar takserver.war $@ --logging.level.com.bbn.marti.service.Subscription=OFF
+```
entirely disable most logging:
-```java -jar takserver.war $@ --logging.level.root=OFF```
+```
+java -jar takserver.war $@ --logging.level.root=OFF
+```
The default log level for most things is INFO. Possible levels are INFO, WARN, ERROR, OFF (in order of decreasing log frequency)
@@ -135,6 +145,13 @@ i.e.
```--logging.level.root=ERROR```
+The TAK Server log files can be found in the _logs_ subdirectory:
+
+1. _takserver-messaging.log_ - Execution-level information about the messaging process, including client connection events, error messages and warnings.
+2. _takserver-api.log_ - Execution-level information about the API process, including error messages and warnings.
+3. _takserver-messaging-console.log_ - Java Virtual Machine (JVM) informational messages and errors, for the messaging process.
+4. _takserver-api-console.log_ - Java Virtual Machine (JVM) informational messages and errors, for the API process.
+
## Swagger
https://localhost:8443/swagger-ui.html
diff --git a/src/.licenses/config/allowed-licenses.json b/src/.licenses/config/allowed-licenses.json
new file mode 100644
index 00000000..acb71e07
--- /dev/null
+++ b/src/.licenses/config/allowed-licenses.json
@@ -0,0 +1,35 @@
+{
+ "allowedLicenses": [
+ {"moduleLicense": "Apache License, Version 2.0"},
+ {"moduleLicense": "Apache License"},
+ {"moduleLicense": "Apache Software License, version 1.1"},
+ {"moduleLicense": "The MIT License"},
+ {"moduleLicense": "The 3-Clause BSD License"},
+ {"moduleLicense": "The 2-Clause BSD License"},
+
+ {"moduleLicense": "BSD"},
+ {"moduleLicense": "New BSD License"},
+ {"moduleLicense": "BSD New license"},
+ {"moduleLicense": "The New BSD License"},
+ {"moduleLicense": "The BSD License"},
+ {"moduleLicense": "Revised BSD"},
+ {"moduleLicense": "BSD-style"},
+
+ {"moduleLicense": "Public Domain"},
+ {"moduleLicense": "PUBLIC DOMAIN"},
+ {"moduleLicense": "Common Public License Version 1.0"},
+
+ {"moduleLicense": "CDDL + GPLv2 with classpath exception"},
+ {"moduleLicense": "LGPL, version 2.1"},
+ {"moduleLicense": "COMMON DEVELOPMENT AND DISTRIBUTION LICENSE (CDDL) Version 1.0"},
+ {"moduleLicense": "GPL2 w/ CPE"},
+ {"moduleLicense": "CDDL 1.1"},
+ {"moduleLicense": "LGPL"},
+ {"moduleLicense": "Google Cloud Software License"},
+ {"moduleLicense": "GNU Lesser General Public License"},
+ {"moduleLicense": "CDDL+GPL License"},
+ {"moduleLicense": "ASL"},
+ {"moduleLicense": "COMMON DEVELOPMENT AND DISTRIBUTION LICENSE (CDDL) Version 1.1"},
+ {"moduleLicense": null}
+ ]
+}
\ No newline at end of file
diff --git a/src/.quality/config/pmd-ruleset.xml b/src/.quality/config/pmd-ruleset.xml
new file mode 100644
index 00000000..2e441a86
--- /dev/null
+++ b/src/.quality/config/pmd-ruleset.xml
@@ -0,0 +1,110 @@
+
+
+
+
+ My custom rules
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+ Static final fields should have names in all upper case
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+ Normal variables should start with lower case letters
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+ Non-final variables should not have underscores in the name
+
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/src/build.gradle b/src/build.gradle
index 6cf80e75..7e10b870 100644
--- a/src/build.gradle
+++ b/src/build.gradle
@@ -1,3 +1,7 @@
+//import se.bjurr.violations.gradle.plugin.ViolationsTask
+//import com.github.jk1.license.render.InventoryHtmlReportRenderer
+//import com.github.jk1.license.render.XmlReportRenderer
+
buildscript {
repositories {
@@ -9,12 +13,13 @@ buildscript {
dependencies {
classpath 'org.ajoberstar:grgit:1.7.2'
+// classpath 'gradle.plugin.se.bjurr.violations:violations-gradle-plugin:1.52.2'
+// classpath 'com.github.jk1:gradle-license-report:1.17'
}
}
apply plugin: 'eclipse'
apply plugin: 'idea'
-
apply from: 'gradle/license.gradle', to: project
ext {
@@ -35,7 +40,7 @@ ext {
takreleaserpm = takrel1 + takrel2
def (tvmaj, tvmin) = tv.tokenize('.')
-
+
verMajor = tvmaj
verMinor = tvmin
verPatch = takrel2
@@ -49,6 +54,8 @@ allprojects {
apply plugin: 'java'
apply plugin: 'eclipse'
apply plugin: 'idea'
+// apply plugin: 'pmd'
+
sourceCompatibility = 1.8
targetCompatibility = 1.8
@@ -94,13 +101,54 @@ subprojects {
dependencies {
classpath 'com.github.jengelman.gradle.plugins:shadow:' + gradle_shadow_version
classpath 'gradle.plugin.org.openrepose:gradle-jaxb-plugin:2.5.0'
+// classpath 'gradle.plugin.se.bjurr.violations:violations-gradle-plugin:1.52.2'
+// classpath 'com.github.jk1:gradle-license-report:1.17'
}
}
- clean {
- delete 'bin/'
- }
+// apply plugin: 'se.bjurr.violations.violations-gradle-plugin' // Normalizes quality reports for Gitlab CI
+// apply plugin: 'pmd'
+// // Define a new task to merge quality reports and convert them into a format GitLab can consume
+// tasks.create('violations', ViolationsTask) {
+// maxReporterColumnWidth = 0 // 0 means "no limit"
+// maxRuleColumnWidth = 60
+// maxSeverityColumnWidth = 0
+// maxLineColumnWidth = 0
+// maxMessageColumnWidth = 50
+// codeClimateFile = new File(project.buildDir.path + '/code-climate-file.json')
+// // Will create a CodeClimate JSON report.
+// violations = [
+// ["PMD", buildDir.path, ".*/pmd/.*\\.xml\$", "pmd"]
+// ]
+// }
+// pmd {
+// ignoreFailures = true
+// toolVersion = '6.47.0'
+// ruleSetFiles = files("${rootDir}/.quality/config/pmd-ruleset.xml")
+// // Note: The ruleSets array is explicitly set to empty to avoid using the default configuration.
+// ruleSets = []
+// }
+//
+// apply plugin: 'com.github.jk1.dependency-license-report'
+// licenseReport {
+// // Don't include artifacts of project's own group in the report
+// excludeOwnGroup = true
+//
+// // Don't exclude bom dependencies.
+// // If set to true, then all boms will be excluded from the report
+// excludeBoms = false
+//
+// // Set custom report renderer, implementing ReportRenderer.
+// // Yes, you can write your own to support any format necessary.
+// renderers = [new XmlReportRenderer()]
+// renderers = [new InventoryHtmlReportRenderer()]
+//
+// // This is for the allowed-licenses-file in checkLicense Task
+// // Accepts File, URL or String path to local or remote file
+// allowedLicensesFile = new File("${rootDir}/.licenses/config/allowed-licenses.json")
+// }
+//
+// clean {
+// delete 'bin/'
+// }
}
-
-
-
diff --git a/src/docs/INSTALL.txt b/src/docs/INSTALL.txt
deleted file mode 100644
index 4722dff6..00000000
--- a/src/docs/INSTALL.txt
+++ /dev/null
@@ -1,25 +0,0 @@
-TAK Server Installation Quick Start
-===================================
-Version 1.3.6
-
-----
-Distribution Statement A: Approved for public release; distribution is unlimited.
-----
-
-Perform a fresh install of CentOS 7
-
-sudo yum install epel-release
-
-sudo yum install takserver--.noarch.rpm
-
-sudo /opt/tak/db-utils/takserver-setup-db.sh
-
-Start TAK Server:
-sudo systemctl start takserver
-
-Set TAK Server to run at startup:
-sudo systemctl enable takserver
-
-Refer to the configuration guide for additional information, including generating certificates, and setting up admin users.
-
-/opt/tak/docs/TAK_Server_Configuration_Guide.pdf
diff --git a/src/docs/TAK_Server_Configuration_Guide.odt b/src/docs/TAK_Server_Configuration_Guide.odt
index 645f9d35..59a1b923 100644
Binary files a/src/docs/TAK_Server_Configuration_Guide.odt and b/src/docs/TAK_Server_Configuration_Guide.odt differ
diff --git a/src/docs/TAK_Server_Configuration_Guide.pdf b/src/docs/TAK_Server_Configuration_Guide.pdf
index cb17a1f0..5ef5a276 100644
Binary files a/src/docs/TAK_Server_Configuration_Guide.pdf and b/src/docs/TAK_Server_Configuration_Guide.pdf differ
diff --git a/src/docs/TAK_Server_Installation_QuickStart_CentOS_RHEL_6.docx b/src/docs/TAK_Server_Installation_QuickStart_CentOS_RHEL_6.docx
deleted file mode 100644
index 5a763dae..00000000
Binary files a/src/docs/TAK_Server_Installation_QuickStart_CentOS_RHEL_6.docx and /dev/null differ
diff --git a/src/docs/TAK_protobuf_protocol.txt b/src/docs/TAK_protobuf_protocol.txt
deleted file mode 100644
index 02c82f99..00000000
--- a/src/docs/TAK_protobuf_protocol.txt
+++ /dev/null
@@ -1,269 +0,0 @@
-*** Traditional Protocol - "Protocol Version 0"
-
-Clients send and receive XML CoT messages.
-"Mesh" network participants announce via "SA" messages via UDP datagrams
-over multicast to a well known address and port. Each UDP datagram contains
-one (and only one) CoT XML message as its payload.
-
-Messages directed only to specific network participants are send by making
-TCP connection to the remote recipient, sending the CoT XML, then closing
-the connection.
-
-
-Streaming connections (to TAK servers) send the same XML-based CoT payloads
-over TCP sockets. The TCP stream is comprised of one CoT after
-another. Messages are delimited and broken apart by searching for the token
-"" and breaking apart immediately after that token.
-When sending, messages must be prefaced by XML header (),
-followed by a newline, followed by the complete XML . TAK servers
-require that no arbitrary newlines follow the end of message and
-that the next character immediate commences the next header.
-
-
-
-*** TAK Protocol - Design Goals
-
-The goal of the new TAK Protocol design is to allow interoperation with
-other legacy clients and TAK server, as well as to strongly identify
-what rendition of communication will be used in a session. This is to allow
-for future expansion or complete revision of the protocol while allowing
-an opportunity to support mixed client versions (and varying versions of TAK
-servers).
-
-
-
-*** TAK Protocol - Generic Framework - Mesh Networks
-
-Mesh networks broadcasts (SA announces, etc) will reuse the existing UDP
-datagram-based networking already in place. Directed (unicasted) TCP
-messages will reuse the existing connect, send 1 message, disconnect
-networking.
-
-For both TCP and UDP, instead of sending CoT as XML, clients will send data
-packets whose payloads contain one message complying with the new "TAK
-Protocol".
-Both types of messages will utilize a data payload that begins with the "TAK
-Protocol Header" followed by the "TAK Protocol Payload". The header
-serves to self-identify as a TAK Protocol message, as well as indicate a
-particular version number to which the subsequent Payload comforms.
-
-TAK Protocol Message:
-
-
-The "TAK Protocol Header" is nothing more than a set of "magic numbers" to
-identify the message header as such, and a version identifier to indicate
-what TAK Protocol version the remainder of the payload is comprised of.
-
-TAK Protocol Header:
-Where....
- is the single byte 0xbf
- is the version number of the TAK Protocol the payload
-in the remainder of the message conforms to. This is encoded as a "varint".
-See "TAK Protocol Varint Encoding".
-
-
-
-*** TAK Protocol - Generic Framework - Streaming Connections
-
-Steaming connections (TAK server connections) use a different message style
-as the repeating protocol version information in every message that is done
-in mesh TAK Protocol Messages would be a waste of resources in the streaming
-environment (since all messages will use the same Version).
-The TAK Protocol Stream Message is instead defined to provide the length of
-the streaming message (necessary to break apart the message from its
-neighbors to avoid need to scan for special tokens).
-
-In a streaming connection, "TAK Protocol Streaming Messages" are sent one
-after another (with no intervening data) over the streaming connection.
-
-TAK Protocol Stream Message:
-
-
-Important to note here is that the "TAK Protocol Payload" is precisely the
-same in form and content to that which is used for mesh network messages for
-a given protocol version.
-
-
-
-The "TAK Protocol Streaming Header" is as follows:
-
-TAK Protocol Streaming Header:
-
-Where...
- is the single byte 0xbf
- is the number of bytes in the "TAK Protocol Payload" which
- follows the header. This is encoded as a "varint".
-
-As mentioned prior, the version identification for the message's payload
-format is omitted from the streaming header. Protocol version negotiation
-is expected to occur outside of core TAK Protocol message exchange.
-See "Streaming Connection Protocol Negotiation".
-
-
-
-
-*** TAK Protocol Payload - Version 1
-
-Version 1 of the TAK Protocol Payload is a Google Protocol Buffer based
-payload. Each Payload consists of one (and only one)
-atakmap::commoncommo::v1::TakMessage message which is serialized using
-Google protocol buffers version 3.
-
-See the .proto files for more information on the specific messages and their
-fields, as well as the mapping to/from CoT XML.
-
-Revising the messages used by Version 1 may be done in accordance with the
-following rules:
-
-1. Additional message fields MAY be added to the end of existing messages
- following normal google protobuf rules if and only if
- ignorance of the new fields on decoding is 100% irrelevant to correct
- semantic operation at the TAK application level of ALL TAK applications.
-2. Otherwise, any and all changes must be tied to a protocol version change.
-
-
-This version of TAK Protocol does not define any additional attributes to be
-used during Streaming Connection Protocol Negotiation.
-
-
-
-*** Streaming Connection Protocol Negotiation
-
-TAK clients often connect to a variety of TAK servers, each of which may be
-a different version of software capable of different versions of the TAK
-Protocol (or indeed not capable of the TAK Protocol and simply only
-supporting traditional streaming CoT as XML).
-
-Because of the desire to allow operation of various client and server
-versions, and the desire to keep the traditional XML encoding available, the
-following negotiation is performed when connecting to a TAK server with a
-client that supports the TAK Protocol.
-
-1. Once the connection is established, if authentication is required for
- this server, the authentication message is sent by the client to the server.
- 1a. If the server accepts the auth, proceed to 2.
- 1b. If the server denies the auth, the connection is closed.
-2. Client and server expect to exchange traditional CoT XML messages per
- "Traditional Protocol" section.
-3. A server which supports the TAK Protocol MAY send the following CoT XML
- message to indicate this support (whitespace added, xml header omitted):
-
-
-
-
-
-
-
-
-
- ... where the version attribute is an integer number specifying
- a version of the TAK Protocol the server supports. This message
- may contain one or more TakProtocolSupport elements inside the single
- detail, each specifying a supported version.
- The TAK server MUST send this message no more than once per connection.
-
- To allow for ancillary information in the negotiation, the
- TakProtocolSupport element MAY contain additional attributes compliant
- with the Protocol version indicated.
-4. Client and server continue to expect to exchange traditional CoT XML
- messages per "Traditional Protocol" section.
-5. If the client wishes to initiate a transfer to TAK Protocol encoding, it
- selects one of the supported versions advertised in the server's message
- from step 3. It then sends the following CoT XML:
-
-
-
-
-
-
-
-
-
- ... where the version attribute is the integer version chosen above.
- Only ONE TakRequest element is allowed.
-
- To allow for ancillary information in the negotiation, the
- TakRequest element MAY contain additional attributes compliant
- with the Protocol version indicated.
-
- Clients SHALL NOT send this message unless they have observed the
- message from step #3, above, first.
-
-6. Once the client sends the message in #5, it MUST NOT send additional
- CoT XML to the server. Client also MUST still process incoming CoT XML
- from the server. The client MUST wait in this state for a response per
- the following for at least 30 seconds.
- The server MAY still send CoT XML messages up until it notices the
- control request from the client (from step #5) and is ready to respond.
- The server MUST then respond as soon as possible to the client with the
- following message to indicate either acceptance or denial of the request:
-
-
-
-
-
-
-
-
-
- ... where the status attribute is either true (to indicate the server
- accepts the requested version) or false (to indicate that the server
- denies the request).
- Only ONE TakResponse element is allowed.
-
- To allow for ancillary information in the negotiation, the
- TakResponse element MAY contain additional attributes compliant
- with the Protocol version selected in the request that this response
- applies to.
-
- If no response is received by the client before its timeout elapses,
- the client SHALL disconnect as the entire negotiation is in an
- indeterminate state. The client SHOULD reconnect and begin again at step
- 1, possibly with a longer timeout or alternate protocol version choice.
-
-7. Operation at this point depends on the response send in #6:
-7a. If status was true: The server MUST NOT send additiona CoT XML after the
- "true" response in #6. Instead, the server SHALL send all future data
- in accordance with the TAK Protocol Streaming Connection framework
- and containing TAK Payloads of the negotiated version.
- The client MAY resume sending messages at this time but MUST immediately
- send said messages in accordance with the TAK Protocol Streaming
- Connection framework and containing TAK Payloads of the negotiated
- version. NOTE: the negotiated version SHALL be the same for both
- directions of the streaming connection!
-7b. If status was false: Both client and server resume operation as though
- they were back at step #4. The client may attempt a new negotiation
- if it wishes, or may simply continue to exchange traditional XML-based
- CoT messages.
-
-In the messages in 3, 5, and 6 above, the following common rules apply:
-a. "protouid" is any valid UID representing the negotiation transaction.
- The server generates this when offering protocol versions. The client
- re-uses it when placing request(s) and the server re-uses it when
- issuing the response to a request.
- The UID SHALL be unique from UIDs used for other messages and purposes.
-b. "TIME" is filled with a valid time representation per the CoT schemas.
- The TIME values may be different from each other as needed.
-
-
-*** TAK Protocol Varint Encoding
-
-The varints used in the headers of the TAK Protocol are encoded in
-accordance with the UNSIGNED varint rules for Google protocol buffers.
-This encoding is summarized here:
-
-1. The value must be UNSIGNED. Only values equal to or greater than zero
- are allowed.
-2. The value to be encoded is taken 7 bits at a time, starting with the
- least significant 7 bits (bits 7 -> 0), then the next least significant bits
- (14 -> 8), etc. This repeats over all 7 bit values that are significant
- (that is, up to and including the most significant '1' bit).
-3. For each 7 bit group:
- 3a. Let S = 0 if this is the the last 7 bit group, else let S = 1
- 3b. Output a byte that is (S << 7) | (the 7 bits)
-
-The TAK Protocol use of Varints limits use to 64-bit values. This
-effectively limits the range as [ 0, (2^63 - 1) ] and the varint coded value
-to be limited to 10 bytes.
-
-
diff --git a/src/docs/WebCop-API.docx b/src/docs/WebCop-API.docx
deleted file mode 100644
index 7212b2a4..00000000
Binary files a/src/docs/WebCop-API.docx and /dev/null differ
diff --git a/src/federation-common/docker/Dockerfile.fedhub b/src/federation-common/docker/Dockerfile.fedhub
new file mode 100644
index 00000000..9348eddb
--- /dev/null
+++ b/src/federation-common/docker/Dockerfile.fedhub
@@ -0,0 +1,5 @@
+FROM openjdk:11-jdk-bullseye
+RUN apt update && \
+ apt-get install -y emacs-nox net-tools netcat vim
+
+ENTRYPOINT ["/bin/bash", "-c", "/opt/tak/federation-hub/scripts/configureInDocker.sh init"]
diff --git a/src/federation-common/scripts/configureInDocker.sh b/src/federation-common/scripts/configureInDocker.sh
new file mode 100755
index 00000000..2c855255
--- /dev/null
+++ b/src/federation-common/scripts/configureInDocker.sh
@@ -0,0 +1,20 @@
+!/bin/sh
+if [ $# -eq 0 ]
+ then
+ ps -ef | grep 'federation-hub-policy' | grep -v grep | awk '{print $2}' | xargs kill
+ ps -ef | grep 'federation-hub-ui' | grep -v grep | awk '{print $2}' | xargs kill
+ ps -ef | grep 'federation-hub-broker' | grep -v grep | awk '{print $2}' | xargs kill
+fi
+
+cd /opt/tak/federation-hub/scripts/
+sleep 1
+sh federation-hub-policy.sh &
+sleep 2
+sh federation-hub-broker.sh &
+sleep 3
+sh federation-hub-ui.sh &
+
+if ! [ $# -eq 0 ]
+ then
+ tail -f /dev/null
+fi
\ No newline at end of file
diff --git a/src/federation-common/src/main/java/io/grpc/internal/ServerCallImpl.java b/src/federation-common/src/main/java/io/grpc/internal/ServerCallImpl.java
new file mode 100644
index 00000000..e9cb6ce5
--- /dev/null
+++ b/src/federation-common/src/main/java/io/grpc/internal/ServerCallImpl.java
@@ -0,0 +1,413 @@
+/*
+ * Copyright 2015 The gRPC Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.grpc.internal;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
+import static io.grpc.internal.GrpcAttributes.ATTR_SECURITY_LEVEL;
+import static io.grpc.internal.GrpcUtil.ACCEPT_ENCODING_SPLITTER;
+import static io.grpc.internal.GrpcUtil.CONTENT_LENGTH_KEY;
+import static io.grpc.internal.GrpcUtil.MESSAGE_ACCEPT_ENCODING_KEY;
+import static io.grpc.internal.GrpcUtil.MESSAGE_ENCODING_KEY;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Throwables;
+import com.google.common.util.concurrent.MoreExecutors;
+import io.grpc.Attributes;
+import io.grpc.Codec;
+import io.grpc.Compressor;
+import io.grpc.CompressorRegistry;
+import io.grpc.Context;
+import io.grpc.DecompressorRegistry;
+import io.grpc.InternalDecompressorRegistry;
+import io.grpc.Metadata;
+import io.grpc.MethodDescriptor;
+import io.grpc.SecurityLevel;
+import io.grpc.ServerCall;
+import io.grpc.Status;
+import io.perfmark.PerfMark;
+import io.perfmark.Tag;
+import java.io.InputStream;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * TAKServer Update instructions:
+ *
+ * See Method: sendMessageInternal for notes about what versions work properly,
+ * and what functionality we need within TAK Server
+ *
+ */
+
+final class ServerCallImpl extends ServerCall {
+
+ private static final Logger log = Logger.getLogger(ServerCallImpl.class.getName());
+
+ @VisibleForTesting
+ static final String TOO_MANY_RESPONSES = "Too many responses";
+ @VisibleForTesting
+ static final String MISSING_RESPONSE = "Completed without a response";
+
+ private final ServerStream stream;
+ private final MethodDescriptor method;
+ private final Tag tag;
+ private final Context.CancellableContext context;
+ private final byte[] messageAcceptEncoding;
+ private final DecompressorRegistry decompressorRegistry;
+ private final CompressorRegistry compressorRegistry;
+ private CallTracer serverCallTracer;
+
+ // state
+ private volatile boolean cancelled;
+ private boolean sendHeadersCalled;
+ private boolean closeCalled;
+ private Compressor compressor;
+ private boolean messageSent;
+
+ ServerCallImpl(ServerStream stream, MethodDescriptor method,
+ Metadata inboundHeaders, Context.CancellableContext context,
+ DecompressorRegistry decompressorRegistry, CompressorRegistry compressorRegistry,
+ CallTracer serverCallTracer, Tag tag) {
+ this.stream = stream;
+ this.method = method;
+ this.context = context;
+ this.messageAcceptEncoding = inboundHeaders.get(MESSAGE_ACCEPT_ENCODING_KEY);
+ this.decompressorRegistry = decompressorRegistry;
+ this.compressorRegistry = compressorRegistry;
+ this.serverCallTracer = serverCallTracer;
+ this.serverCallTracer.reportCallStarted();
+ this.tag = tag;
+ }
+
+ @Override
+ public void request(int numMessages) {
+ PerfMark.startTask("ServerCall.request", tag);
+ try {
+ stream.request(numMessages);
+ } finally {
+ PerfMark.stopTask("ServerCall.request", tag);
+ }
+ }
+
+ @Override
+ public void sendHeaders(Metadata headers) {
+ PerfMark.startTask("ServerCall.sendHeaders", tag);
+ try {
+ sendHeadersInternal(headers);
+ } finally {
+ PerfMark.stopTask("ServerCall.sendHeaders", tag);
+ }
+ }
+
+ private void sendHeadersInternal(Metadata headers) {
+ checkState(!sendHeadersCalled, "sendHeaders has already been called");
+ checkState(!closeCalled, "call is closed");
+
+ headers.discardAll(CONTENT_LENGTH_KEY);
+ headers.discardAll(MESSAGE_ENCODING_KEY);
+ if (compressor == null) {
+ compressor = Codec.Identity.NONE;
+ } else {
+ if (messageAcceptEncoding != null) {
+ // TODO(carl-mastrangelo): remove the string allocation.
+ if (!GrpcUtil.iterableContains(
+ ACCEPT_ENCODING_SPLITTER.split(new String(messageAcceptEncoding, GrpcUtil.US_ASCII)),
+ compressor.getMessageEncoding())) {
+ // resort to using no compression.
+ compressor = Codec.Identity.NONE;
+ }
+ } else {
+ compressor = Codec.Identity.NONE;
+ }
+ }
+
+ // Always put compressor, even if it's identity.
+ headers.put(MESSAGE_ENCODING_KEY, compressor.getMessageEncoding());
+
+ stream.setCompressor(compressor);
+
+ headers.discardAll(MESSAGE_ACCEPT_ENCODING_KEY);
+ byte[] advertisedEncodings =
+ InternalDecompressorRegistry.getRawAdvertisedMessageEncodings(decompressorRegistry);
+ if (advertisedEncodings.length != 0) {
+ headers.put(MESSAGE_ACCEPT_ENCODING_KEY, advertisedEncodings);
+ }
+
+ // Don't check if sendMessage has been called, since it requires that sendHeaders was already
+ // called.
+ sendHeadersCalled = true;
+ stream.writeHeaders(headers);
+ }
+
+ @Override
+ public void sendMessage(RespT message) {
+ PerfMark.startTask("ServerCall.sendMessage", tag);
+ try {
+ sendMessageInternal(message);
+ } finally {
+ PerfMark.stopTask("ServerCall.sendMessage", tag);
+ }
+ }
+
+ private void sendMessageInternal(RespT message) {
+ checkState(sendHeadersCalled, "sendHeaders has not been called");
+ checkState(!closeCalled, "call is closed");
+
+ if (method.getType().serverSendsOneMessage() && messageSent) {
+ internalClose(Status.INTERNAL.withDescription(TOO_MANY_RESPONSES));
+ return;
+ }
+
+
+ messageSent = true;
+
+ // GRPC BUG FIX: version 1.48-1.50 (current as of this fix) allows 0 messages through for client streaming.
+ // this means we can never send a return value.. to fix that, allow message flow the same as version <= 1.47.
+ try {
+ InputStream resp = method.streamResponse(message);
+ stream.writeMessage(resp);
+// if (!getMethodDescriptor().getType().serverSendsOneMessage()) {
+// stream.flush();
+// }
+ stream.flush();
+ } catch (RuntimeException e) {
+ close(Status.fromThrowable(e), new Metadata());
+ } catch (Error e) {
+ close(
+ Status.CANCELLED.withDescription("Server sendMessage() failed with Error"),
+ new Metadata());
+ throw e;
+ }
+ }
+
+ @Override
+ public void setMessageCompression(boolean enable) {
+ stream.setMessageCompression(enable);
+ }
+
+ @Override
+ public void setCompression(String compressorName) {
+ // Added here to give a better error message.
+ checkState(!sendHeadersCalled, "sendHeaders has been called");
+
+ compressor = compressorRegistry.lookupCompressor(compressorName);
+ checkArgument(compressor != null, "Unable to find compressor by name %s", compressorName);
+ }
+
+ @Override
+ public boolean isReady() {
+ if (closeCalled) {
+ return false;
+ }
+ return stream.isReady();
+ }
+
+ @Override
+ public void close(Status status, Metadata trailers) {
+ PerfMark.startTask("ServerCall.close", tag);
+ try {
+ closeInternal(status, trailers);
+ } finally {
+ PerfMark.stopTask("ServerCall.close", tag);
+ }
+ }
+
+ private void closeInternal(Status status, Metadata trailers) {
+ checkState(!closeCalled, "call already closed");
+ try {
+ closeCalled = true;
+
+ if (status.isOk() && method.getType().serverSendsOneMessage() && !messageSent) {
+ internalClose(Status.INTERNAL.withDescription(MISSING_RESPONSE));
+ return;
+ }
+
+ stream.close(status, trailers);
+ } finally {
+ serverCallTracer.reportCallEnded(status.isOk());
+ }
+ }
+
+ @Override
+ public boolean isCancelled() {
+ return cancelled;
+ }
+
+ ServerStreamListener newServerStreamListener(ServerCall.Listener listener) {
+ return new ServerStreamListenerImpl<>(this, listener, context);
+ }
+
+ @Override
+ public Attributes getAttributes() {
+ return stream.getAttributes();
+ }
+
+ @Override
+ public String getAuthority() {
+ return stream.getAuthority();
+ }
+
+ @Override
+ public MethodDescriptor getMethodDescriptor() {
+ return method;
+ }
+
+ @Override
+ public SecurityLevel getSecurityLevel() {
+ final Attributes attributes = getAttributes();
+ if (attributes == null) {
+ return super.getSecurityLevel();
+ }
+ final SecurityLevel securityLevel = attributes.get(ATTR_SECURITY_LEVEL);
+ return securityLevel == null ? super.getSecurityLevel() : securityLevel;
+ }
+
+ /**
+ * Close the {@link ServerStream} because an internal error occurred. Allow the application to
+ * run until completion, but silently ignore interactions with the {@link ServerStream} from now
+ * on.
+ */
+ private void internalClose(Status internalError) {
+ log.log(Level.WARNING, "Cancelling the stream with status {0}", new Object[] {internalError});
+ stream.cancel(internalError);
+ serverCallTracer.reportCallEnded(internalError.isOk()); // error so always false
+ }
+
+ /**
+ * All of these callbacks are assumed to called on an application thread, and the caller is
+ * responsible for handling thrown exceptions.
+ */
+ @VisibleForTesting
+ static final class ServerStreamListenerImpl implements ServerStreamListener {
+ private final ServerCallImpl call;
+ private final ServerCall.Listener listener;
+ private final Context.CancellableContext context;
+
+ public ServerStreamListenerImpl(
+ ServerCallImpl call, ServerCall.Listener listener,
+ Context.CancellableContext context) {
+ this.call = checkNotNull(call, "call");
+ this.listener = checkNotNull(listener, "listener must not be null");
+ this.context = checkNotNull(context, "context");
+ // Wire ourselves up so that if the context is cancelled, our flag call.cancelled also
+ // reflects the new state. Use a DirectExecutor so that it happens in the same thread
+ // as the caller of {@link Context#cancel}.
+ this.context.addListener(
+ new Context.CancellationListener() {
+ @Override
+ public void cancelled(Context context) {
+ // If the context has a cancellation cause then something exceptional happened
+ // and we should also mark the call as cancelled.
+ if (context.cancellationCause() != null) {
+ ServerStreamListenerImpl.this.call.cancelled = true;
+ }
+ }
+ },
+ MoreExecutors.directExecutor());
+ }
+
+ @Override
+ public void messagesAvailable(MessageProducer producer) {
+ PerfMark.startTask("ServerStreamListener.messagesAvailable", call.tag);
+ try {
+ messagesAvailableInternal(producer);
+ } finally {
+ PerfMark.stopTask("ServerStreamListener.messagesAvailable", call.tag);
+ }
+ }
+
+ @SuppressWarnings("Finally") // The code avoids suppressing the exception thrown from try
+ private void messagesAvailableInternal(final MessageProducer producer) {
+ if (call.cancelled) {
+ GrpcUtil.closeQuietly(producer);
+ return;
+ }
+
+ InputStream message;
+ try {
+ while ((message = producer.next()) != null) {
+ try {
+ listener.onMessage(call.method.parseRequest(message));
+ } catch (Throwable t) {
+ GrpcUtil.closeQuietly(message);
+ throw t;
+ }
+ message.close();
+ }
+ } catch (Throwable t) {
+ GrpcUtil.closeQuietly(producer);
+ Throwables.throwIfUnchecked(t);
+ throw new RuntimeException(t);
+ }
+ }
+
+ @Override
+ public void halfClosed() {
+ PerfMark.startTask("ServerStreamListener.halfClosed", call.tag);
+ try {
+ if (call.cancelled) {
+ return;
+ }
+
+ listener.onHalfClose();
+ } finally {
+ PerfMark.stopTask("ServerStreamListener.halfClosed", call.tag);
+ }
+ }
+
+ @Override
+ public void closed(Status status) {
+ PerfMark.startTask("ServerStreamListener.closed", call.tag);
+ try {
+ closedInternal(status);
+ } finally {
+ PerfMark.stopTask("ServerStreamListener.closed", call.tag);
+ }
+ }
+
+ private void closedInternal(Status status) {
+ try {
+ if (status.isOk()) {
+ listener.onComplete();
+ } else {
+ call.cancelled = true;
+ listener.onCancel();
+ }
+ } finally {
+ // Cancel context after delivering RPC closure notification to allow the application to
+ // clean up and update any state based on whether onComplete or onCancel was called.
+ // Note that in failure situations JumpToApplicationThreadServerStreamListener has already
+ // closed the context. In these situations this cancel() call will be a no-op.
+ context.cancel(null);
+ }
+ }
+
+ @Override
+ public void onReady() {
+ PerfMark.startTask("ServerStreamListener.onReady", call.tag);
+ try {
+ if (call.cancelled) {
+ return;
+ }
+ listener.onReady();
+ } finally {
+ PerfMark.stopTask("ServerCall.closed", call.tag);
+ }
+ }
+ }
+}
diff --git a/src/federation-common/src/main/java/tak/server/federation/hub/FederationHubCache.java b/src/federation-common/src/main/java/tak/server/federation/hub/FederationHubCache.java
new file mode 100644
index 00000000..1dab43b5
--- /dev/null
+++ b/src/federation-common/src/main/java/tak/server/federation/hub/FederationHubCache.java
@@ -0,0 +1,26 @@
+package tak.server.federation.hub;
+
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.configuration.CacheConfiguration;
+
+import tak.server.federation.FederationPolicyGraph;
+
+public class FederationHubCache {
+
+ public static final String POLICY_GRAPH_CACHE_KEY = "policyGraph";
+ private static IgniteCache configurationCache;
+ public static IgniteCache getFederationHubPolicyStoreCache(Ignite ignite) {
+ if (configurationCache == null) {
+ CacheConfiguration cfg = new CacheConfiguration();
+
+ cfg.setName("FederationHubPolicyStore");
+ cfg.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL);
+ configurationCache = ignite.getOrCreateCache(cfg);
+ }
+
+ return configurationCache;
+ }
+
+}
diff --git a/src/federation-common/src/main/java/tak/server/federation/hub/broker/HubConnectionStore.java b/src/federation-common/src/main/java/tak/server/federation/hub/broker/HubConnectionStore.java
index eb4ba5a2..1291e3b5 100644
--- a/src/federation-common/src/main/java/tak/server/federation/hub/broker/HubConnectionStore.java
+++ b/src/federation-common/src/main/java/tak/server/federation/hub/broker/HubConnectionStore.java
@@ -26,6 +26,7 @@ public class HubConnectionStore {
private final Map sessionMap = new ConcurrentHashMap<>();
private final Map connectionInfos = new ConcurrentHashMap<>();
+
public Collection getConnectionInfos() {
return connectionInfos.values();
}
diff --git a/src/federation-common/src/main/java/tak/server/federation/hub/policy/FederationHubPolicyManager.java b/src/federation-common/src/main/java/tak/server/federation/hub/policy/FederationHubPolicyManager.java
index acbf24aa..92599b94 100644
--- a/src/federation-common/src/main/java/tak/server/federation/hub/policy/FederationHubPolicyManager.java
+++ b/src/federation-common/src/main/java/tak/server/federation/hub/policy/FederationHubPolicyManager.java
@@ -1,20 +1,19 @@
package tak.server.federation.hub.policy;
+import java.util.Collection;
+import java.util.List;
+
import tak.server.federation.Federate;
import tak.server.federation.FederateGroup;
import tak.server.federation.FederationException;
import tak.server.federation.FederationPolicyGraph;
-
import tak.server.federation.hub.ui.graph.FederationPolicyModel;
import tak.server.federation.hub.ui.graph.PolicyObjectCell;
-import java.util.Collection;
-import java.util.List;
-
public interface FederationHubPolicyManager {
void addCaGroup(FederateGroup federateGroup);
Collection getCaGroups();
- void addCaFederate(Federate federate, List federateCaNames);
+ FederationPolicyGraph addCaFederate(Federate federate, List federateCaNames);
FederationPolicyGraph getPolicyGraph();
void setPolicyGraph(FederationPolicyModel newPolicyModel, Object updateFile) throws FederationException;
Collection getPolicyCells();
diff --git a/src/federation-common/src/main/java/tak/server/federation/hub/policy/FederationHubPolicyManagerImpl.java b/src/federation-common/src/main/java/tak/server/federation/hub/policy/FederationHubPolicyManagerImpl.java
index 303816fa..1c7d50d9 100644
--- a/src/federation-common/src/main/java/tak/server/federation/hub/policy/FederationHubPolicyManagerImpl.java
+++ b/src/federation-common/src/main/java/tak/server/federation/hub/policy/FederationHubPolicyManagerImpl.java
@@ -10,6 +10,7 @@
import java.util.Map;
import java.util.Set;
+import org.apache.ignite.Ignite;
import org.apache.ignite.services.Service;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -27,15 +28,17 @@
import tak.server.federation.FederationException;
import tak.server.federation.FederationNode;
import tak.server.federation.FederationPolicyGraph;
+import tak.server.federation.hub.FederationHubCache;
import tak.server.federation.hub.ui.graph.FederationPolicyModel;
import tak.server.federation.hub.ui.graph.PolicyObjectCell;
public class FederationHubPolicyManagerImpl implements FederationHubPolicyManager, Service {
private static final long serialVersionUID = 1012094988435086891L;
+
+ private final Ignite ignite;
- private static final String DEFAULT_POLICY_PATH = "/opt/tak/federation-hub/";
- private static final String DEFAULT_POLICY_FILENAME = "ui_generated_policy.json";
+ private static String DEFAULT_POLICY_FILE = "/opt/tak/federation-hub/ui_generated_policy.json";
/* JSON file constants. */
private static final String POLICY_NAME = "name";
@@ -58,39 +61,59 @@ public class FederationHubPolicyManagerImpl implements FederationHubPolicyManage
private Set groupCas;
private Set dynamicallyAddedFederates;
private Collection cells = new ArrayList<>();
+
+ private FederationPolicyGraph federationPolicyGraph;
+
+ public FederationHubPolicyManagerImpl(Ignite ignite, String DEFAULT_POLICY_FILE) {
+ this.ignite = ignite;
+
+ if (!Strings.isNullOrEmpty(DEFAULT_POLICY_FILE))
+ FederationHubPolicyManagerImpl.DEFAULT_POLICY_FILE = DEFAULT_POLICY_FILE;
+ }
@Override
public FederationPolicyGraph getPolicyGraph() {
- return FederationHubPolicyStore.getInstance().getPolicyGraph();
- }
+ return federationPolicyGraph;
+ }
+
+ public void cachePolicyGraph() {
+ // cache events will alert other services of the change so they don't have to constantly ask for the
+ // most up to date policy
+ FederationHubCache.getFederationHubPolicyStoreCache(ignite).put(FederationHubCache.POLICY_GRAPH_CACHE_KEY, federationPolicyGraph);
+ }
@Override
- public void addCaFederate(Federate federate, List federateCaNames) {
+ public FederationPolicyGraph addCaFederate(Federate federate, List federateCaNames) {
for (String caName : federateCaNames) {
- if (getPolicyGraph().getGroup(caName) != null) {
+ if (federationPolicyGraph.getGroup(caName) != null) {
federate.addGroupIdentity(new FederateIdentity(caName));
- getPolicyGraph().getGroup(caName).addFederateToGroup(federate);
+ federationPolicyGraph.getGroup(caName).addFederateToGroup(federate);
}
}
- if (getPolicyGraph().getNode(federate.getFederateIdentity().getFedId()) == null) {
+ if (federationPolicyGraph.getNode(federate.getFederateIdentity().getFedId()) == null) {
// Add iff federate had certs that matched CA groups in the policy graph
if (!federate.getGroupIdentities().isEmpty()) {
- getPolicyGraph().addFederate(federate);
+ federationPolicyGraph.addFederate(federate);
dynamicallyAddedFederates.add(federate);
}
} else {
for (String caName : federateCaNames) {
- if (getPolicyGraph().getGroup(caName) != null) {
- getPolicyGraph().getFederate(federate.getFederateIdentity().getFedId()).addGroupIdentity(new FederateIdentity(caName));
+ if (federationPolicyGraph.getGroup(caName) != null) {
+ federationPolicyGraph.getFederate(federate.getFederateIdentity().getFedId()).addGroupIdentity(new FederateIdentity(caName));
}
}
}
+
+ cachePolicyGraph();
+
+ return federationPolicyGraph;
}
@Override
public void addCaGroup(FederateGroup federateGroup) {
groupCas.add(federateGroup);
+ cachePolicyGraph();
}
@Override
@@ -99,7 +122,7 @@ public Collection getCaGroups() {
}
private void updateFederate(Federate node) {
- Federate currentFederate = getPolicyGraph().getFederate(node.getFederateIdentity().getFedId());
+ Federate currentFederate = federationPolicyGraph.getFederate(node.getFederateIdentity().getFedId());
// Add objects to current federate first, then add combined list to new federate
// This lets new objects supersede old ones. We don't just update the current federate
@@ -108,11 +131,11 @@ private void updateFederate(Federate node) {
currentFederate.getAttributes().putAll(node.getAttributes());
node.getGroupIdentities().addAll(currentFederate.getGroupIdentities());
node.getAttributes().putAll(currentFederate.getAttributes());
- getPolicyGraph().addNode(node);
+ federationPolicyGraph.addNode(node);
}
private void updateGroup(FederateGroup node) {
- FederateGroup currentGroup = getPolicyGraph().getGroup(node.getFederateIdentity());
+ FederateGroup currentGroup = federationPolicyGraph.getGroup(node.getFederateIdentity());
// Add objects to current group first, then add combined list to new group
// This lets new objects supersede old ones. We don't just update the current federate
@@ -121,7 +144,7 @@ private void updateGroup(FederateGroup node) {
currentGroup.getAttributes().putAll(node.getAttributes());
node.getFederatesInGroup().addAll(currentGroup.getFederatesInGroup());
node.getAttributes().putAll(currentGroup.getAttributes());
- getPolicyGraph().addNode(node);
+ federationPolicyGraph.addNode(node);
}
private void updateNode(FederationNode node) {
@@ -134,22 +157,22 @@ private void updateNode(FederationNode node) {
private void updateNodes(Collection federationNodes) {
for (FederationNode node : federationNodes) {
- if (getPolicyGraph().getNode(node.getFederateIdentity().getFedId()) != null) {
+ if (federationPolicyGraph.getNode(node.getFederateIdentity().getFedId()) != null) {
updateNode(node);
} else {
- getPolicyGraph().addNode(node);
+ federationPolicyGraph.addNode(node);
}
}
}
private void updateEdges(FederationPolicyGraph federationPolicyGraph) throws FederationException {
- getPolicyGraph().getEdgeSet().clear();
- getPolicyGraph().getEdgeSet().addAll(federationPolicyGraph.getEdgeSet());
+ federationPolicyGraph.getEdgeSet().clear();
+ federationPolicyGraph.getEdgeSet().addAll(federationPolicyGraph.getEdgeSet());
}
private void updateFile(Object updateFile) {
/* TODO allow policy file to be specified via configuration. */
- String policyFilePath = DEFAULT_POLICY_PATH + DEFAULT_POLICY_FILENAME;
+ String policyFilePath = DEFAULT_POLICY_FILE;
if (policyFilePath.contains(".yml"))
policyFilePath = policyFilePath.replace(".yml", ".json");
ObjectMapper mapper = new ObjectMapper();
@@ -163,8 +186,8 @@ private void updateFile(Object updateFile) {
@Override
public void updatePolicyGraph(FederationPolicyModel federationPolicyModel,
Object updateFile) throws FederationException {
- getPolicyGraph().getNodes().clear();
- getPolicyGraph().getEdgeSet().clear();
+ federationPolicyGraph.getNodes().clear();
+ federationPolicyGraph.getEdgeSet().clear();
FederationPolicyGraph federationPolicyGraph = federationPolicyModel.getPolicyGraphFromModel();
updateNodes(federationPolicyGraph.getNodes());
@@ -175,13 +198,15 @@ public void updatePolicyGraph(FederationPolicyModel federationPolicyModel,
if (updateFile != null) {
updateFile(updateFile);
}
+ cachePolicyGraph();
}
@Override
public void setPolicyGraph(FederationPolicyModel newPolicyModel,
Object updateFile) throws FederationException {
FederationPolicyGraph newPolicyGraph = newPolicyModel.getPolicyGraphFromModel();
- FederationHubPolicyStore.getInstance().setPolicyGraph(newPolicyGraph);
+ this.federationPolicyGraph = newPolicyGraph;
+ cachePolicyGraph();
if (newPolicyModel.getCells() != null)
cells = newPolicyModel.getCells();
@@ -366,13 +391,12 @@ private boolean initializePolicyGraphFromFile(String policyFile) {
} catch (Exception e) {
logger.error("err",e);
}
-
- FederationHubPolicyStore.getInstance().setPolicyGraph(newPolicyGraph);
+ this.federationPolicyGraph = newPolicyGraph;
+ cachePolicyGraph();
} catch (IOException | RuntimeException e) {
logger.error("Could not load policy graph from file: " + e);
return false;
}
-
return true;
}
@@ -388,10 +412,12 @@ public void init() throws Exception {
}
/* TODO allow policy file to be specified via configuration. */
- String policyFilename = DEFAULT_POLICY_PATH + DEFAULT_POLICY_FILENAME;
+ String policyFilename = DEFAULT_POLICY_FILE;
File policyFile = new File(policyFilename);
- if (!policyFile.exists() || !initializePolicyGraphFromFile(policyFilename))
- FederationHubPolicyStore.getInstance().setPolicyGraph(getEmptyPolicy());
+ if (!policyFile.exists() || !initializePolicyGraphFromFile(policyFilename)) {
+ this.federationPolicyGraph = getEmptyPolicy();
+ cachePolicyGraph();
+ }
groupCas = new HashSet<>();
dynamicallyAddedFederates = new HashSet<>();
diff --git a/src/federation-common/src/main/java/tak/server/federation/hub/policy/FederationHubPolicyStore.java b/src/federation-common/src/main/java/tak/server/federation/hub/policy/FederationHubPolicyStore.java
deleted file mode 100644
index 81546721..00000000
--- a/src/federation-common/src/main/java/tak/server/federation/hub/policy/FederationHubPolicyStore.java
+++ /dev/null
@@ -1,25 +0,0 @@
-package tak.server.federation.hub.policy;
-
-import tak.server.federation.FederationPolicyGraph;
-
-public class FederationHubPolicyStore {
-
- private static FederationHubPolicyStore instance;
- public static synchronized FederationHubPolicyStore getInstance() {
- if (instance == null)
- instance = new FederationHubPolicyStore();
-
- return instance;
- }
-
- private FederationPolicyGraph policyGraph;
-
- public FederationPolicyGraph getPolicyGraph() {
- return policyGraph;
- }
-
- public void setPolicyGraph(FederationPolicyGraph policyGraph) {
- this.policyGraph = policyGraph;
- }
-
-}
diff --git a/src/federation-common/src/main/java/tak/server/federation/hub/policy/FederationPolicyGraphImpl.java b/src/federation-common/src/main/java/tak/server/federation/hub/policy/FederationPolicyGraphImpl.java
index 2770318c..41aac493 100644
--- a/src/federation-common/src/main/java/tak/server/federation/hub/policy/FederationPolicyGraphImpl.java
+++ b/src/federation-common/src/main/java/tak/server/federation/hub/policy/FederationPolicyGraphImpl.java
@@ -57,11 +57,11 @@ public Set allReachableFederates(FederationNode source) throws Federat
@Override
public Set allReachableFederates(String sourceUid) throws FederationException {
- FederationNode FederationNode = getNode(sourceUid);
- if (FederationNode == null) {
+ FederationNode federationNode = getNode(sourceUid);
+ if (federationNode == null) {
throw new FederationException("The passed sourceID " + sourceUid + " was not found in the policy graph.");
}
- return allReachableFederates(FederationNode);
+ return allReachableFederates(federationNode);
}
@Override
@@ -224,7 +224,7 @@ public void addEdge(FederateEdge federateEdge) throws FederationException {
}
@Override
- public List getFiltersAlongPath(FederationNode FederationNode, FederationNode FederationNode1) {
+ public List getFiltersAlongPath(FederationNode federationNode, FederationNode federationNode1) {
return null;
}
@@ -328,10 +328,6 @@ private Set findReachableFederates(FederationNode source, Set, which will only work for in-process plugins
+ * Currently expected to be a Map{@literal <}String,Object{@literal >}, which will only work for in-process plugins
*/
public static final String ParentMetadata = "parentMetadata";
public static final String NumChildrenCreated = "numChildrenCreated";
@@ -48,7 +48,7 @@ public class MetadataConstants {
public static final String NumSendAttempts = "numSendAttempts";
public static final String HttpRequestURI = "http request uri";
-
+
/** Expected to be a String[] */
public static final String HttpHeaders = "http_headers";
public static final String SingleShotMessage = "single_shot_message";
@@ -58,4 +58,4 @@ public class MetadataConstants {
public static final String Callsign = "callsign";
-}
\ No newline at end of file
+}
diff --git a/src/federation-common/src/main/java/tak/server/federation/message/MetadataUtils.java b/src/federation-common/src/main/java/tak/server/federation/message/MetadataUtils.java
index c7073c6a..dba517dc 100644
--- a/src/federation-common/src/main/java/tak/server/federation/message/MetadataUtils.java
+++ b/src/federation-common/src/main/java/tak/server/federation/message/MetadataUtils.java
@@ -16,10 +16,10 @@
/**
- *
+ *
* Currently, this class provides a mapping from file extensions to valid Mime Type Strings.
- * We may want to extend it do offer more functionality.
- *
+ * We may want to extend it do offer more functionality.
+ *
* TODO Discuss putting in the PluginContext for use by all plugins
*
*/
@@ -29,17 +29,17 @@ public final class MetadataUtils {
private static MetadataUtils instance = null;
private static final String DEFAULT_FILE_NAME = "mimetypes.csv";
//private static final Logger LOGGER = LoggerFactory.getLogger(MetadataUtils.class);
-
-
+
+
private MetadataUtils() throws IOException{
// file only contains "image/jpeg"
mimeTypeToFileExtension.put("image/jpg", ".jpg");
-
- // Compatibility for ATAK. Since this is an old MIME Type,
+
+ // Compatibility for ATAK. Since this is an old MIME Type,
// we only want to recognize it, not use it based on the file extension
mimeTypeToFileExtension.put("application/x-zip-compressed", ".zip");
-
- InputStream inputStream = MetadataUtils.class.getClassLoader().getResourceAsStream(DEFAULT_FILE_NAME);
+
+ InputStream inputStream = MetadataUtils.class.getClassLoader().getResourceAsStream(DEFAULT_FILE_NAME);
BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream, "UTF-8"));
String fileLine = reader.readLine();
while (fileLine != null){
@@ -47,7 +47,7 @@ private MetadataUtils() throws IOException{
// index 0 is description, don't need for now
fileExtensionToMimeTypeString.put(mimeTypeInfo[2], mimeTypeInfo[1]);
mimeTypeToFileExtension.put(mimeTypeInfo[1], mimeTypeInfo[2]);
-
+
fileLine = reader.readLine();
}
reader.close();
@@ -60,48 +60,48 @@ public static synchronized MetadataUtils getInstance() throws IOException{
}
return instance;
}
-
+
public String getMimeTypeFromFileName(String fileName){
-
+
int lastPeriod = fileName.lastIndexOf('.');
-
+
if (lastPeriod == -1){
return null;
}
String extension = fileName.substring(lastPeriod);
return getMimeType(extension);
}
-
+
/**
- *
+ *
* @param fileExtension, including period
* @return MIME Type in String format
*/
public String getMimeType(String fileExtension){
return fileExtensionToMimeTypeString.get(fileExtension.toLowerCase(Locale.ENGLISH));
}
-
+
/**
- *
+ *
* @param mimeType Type in String format
* @return fileExtension, including period
*/
public String getFileExtension(String mimeType){
return mimeTypeToFileExtension.get(mimeType.toLowerCase(Locale.ENGLISH));
}
-
+
/**
- *
+ *
* @param possibleMime
* @return true if possibleMime is the string representation of a recognized MIME Type according to ROGER
*/
public boolean isValidMimeType(String possibleMime){
return mimeTypeToFileExtension.containsKey(possibleMime);
}
-
+
/**
- * Replaces file extension, where file extension is the characters after the last period in fileName, according to the mimeType provided.
- * Appends file extension if none is found.
+ * Replaces file extension, where file extension is the characters after the last period in fileName, according to the mimeType provided.
+ * Appends file extension if none is found.
* @param fileName
* @return
*/
@@ -109,9 +109,9 @@ public String changeFileExtension(String fileName, String mimeType){
String coreName = removeFileExtension(fileName);
return coreName + getFileExtension(mimeType);
}
-
+
/**
- * Remove file extension, where file extension is the characters after the last period in fileName
+ * Remove file extension, where file extension is the characters after the last period in fileName
* @param fileName
* @return a new string with no extension
*/
@@ -123,7 +123,7 @@ public String removeFileExtension(String fileName){
}
return coreName;
}
-
+
private int lastIndexOf(String string, char character){
//if index = -1 right away, return -1 because the character isn't there
int lastIndex = -1;
@@ -134,13 +134,13 @@ private int lastIndexOf(String string, char character){
} // now index is -1
return lastIndex;
}
-
-
+
+
/**
- * This method makes a "deep clone" of any Java object it is given.
+ * This method makes a "deep clone" of any Java object it is given.
* Returns null for null object
- * @throws IOException
- * @throws ClassNotFoundException
+ * @throws IOException
+ * @throws ClassNotFoundException
*/
public static Object deepClone(Object object) throws IOException, ClassNotFoundException {
if (object == null) {
@@ -156,12 +156,11 @@ public static Object deepClone(Object object) throws IOException, ClassNotFoundE
ObjectInputStream ois = new ObjectInputStream(bais);
return ois.readObject();
}
-
-
+
+
/**
* Creates parent metadata by combining metadata from child messages (does not use parentMetadata from metadataConstants)
* @param messages
- * @param metadataUtils
* @return
*/
public Map createParentMetadata(Message... messages){
diff --git a/src/federation-common/src/main/java/tak/server/federation/message/Payload.java b/src/federation-common/src/main/java/tak/server/federation/message/Payload.java
index fed83564..b4aedd01 100644
--- a/src/federation-common/src/main/java/tak/server/federation/message/Payload.java
+++ b/src/federation-common/src/main/java/tak/server/federation/message/Payload.java
@@ -2,40 +2,39 @@
/**
* This interface exports the methods which are common to kinds of message payloads.
- *
- * The payload can be accessed either accessed via its binary representation (byte array), or as a generic object (parameterized type X).
- *
+ *
+ * The payload can be accessed either accessed via its binary representation (byte array), or as a generic object (parameterized type X).
+ *
* Important note: Concrete implementations of this interface are responsible for managing the relationship between the binary form and the generic form.
- *
+ *
*
Serialization
- * In order for a payload class that implements this interface to be serializable by the default PayloadSerializationPlugin,
- * the payload must have a zero-argument constructor and be able to set its payload through setBytes.
- * @see PayloadSerializationPlugin
- *
+ * In order for a payload class that implements this interface to be serializable by the default PayloadSerializationPlugin,
+ * the payload must have a zero-argument constructor and be able to set its payload through setBytes.
+ *
*/
public interface Payload {
-
+
/*
* Access the payload in binary form
*/
byte[] getBytes();
-
+
/*
* Set the payload in binary form
- *
+ *
* @param bytes The binary form of the payload
*/
@SuppressWarnings("PMD.ArrayIsStoredDirectly")
void setBytes(byte[] bytes);
-
+
/*
* Access the payload as a parameterized type
*/
X getContent();
-
+
/*
* Mutate the paramaterized type form of the payload
- *
+ *
* @param X The parameterized type form of the payload
*/
void setContent(X content);
diff --git a/src/federation-hub-broker/build.gradle b/src/federation-hub-broker/build.gradle
index 003db12c..4c5e1b6c 100644
--- a/src/federation-hub-broker/build.gradle
+++ b/src/federation-hub-broker/build.gradle
@@ -43,15 +43,14 @@ dependencies {
compile group: 'org.apache.ignite', name: 'ignite-slf4j', version: ignite_version
compile group: 'io.netty', name: 'netty-handler', version: netty_handler_version
- compile group: 'io.netty', name: 'netty-tcnative-boringssl-static', version: netty_tcnative_version
compile group: 'io.netty', name: 'netty-tcnative', version: netty_tcnative_version
+ compile group: 'io.netty', name: 'netty-tcnative-classes', version: netty_tcnative_version
compile group: 'io.netty', name: 'netty-transport-native-epoll', version: netty_version, classifier: 'linux-x86_64'
-
- implementation('io.netty:netty-tcnative-classes') {
- version {
- strictly '2.0.53.Final'
- }
- }
+ compile "io.netty:netty-tcnative-boringssl-static:$netty_tcnative_version:linux-x86_64"
+ compile "io.netty:netty-tcnative-boringssl-static:$netty_tcnative_version:linux-aarch_64"
+ compile "io.netty:netty-tcnative-boringssl-static:$netty_tcnative_version:osx-x86_64"
+ compile "io.netty:netty-tcnative-boringssl-static:$netty_tcnative_version:osx-aarch_64"
+ compile "io.netty:netty-tcnative-boringssl-static:$netty_tcnative_version:windows-x86_64"
compile group: 'org.springframework.boot', name: 'spring-boot-starter-actuator'
compile group: 'org.springframework.boot', name: 'spring-boot-loader', version: spring_boot_version
diff --git a/src/federation-hub-broker/scripts/federation-hub b/src/federation-hub-broker/scripts/federation-hub
index a88c5ab9..4ced581c 100755
--- a/src/federation-hub-broker/scripts/federation-hub
+++ b/src/federation-hub-broker/scripts/federation-hub
@@ -1,4 +1,9 @@
#!/bin/bash
+### BEGIN INIT INFO
+# Default-Start: 2 3 4 5
+# Default-Stop: 0 1 6
+# Short-Description: federation-hub-init init script
+### END INIT INFO
#
# /etc/rc.d/init.d/federation-hub-init
#
@@ -17,7 +22,9 @@
# description: Federation Hub for Team Awareness Kit (TAK)
# Source function library
-. /etc/rc.d/init.d/functions
+if [ -f /etc/rc.d/init.d/functions ]; then
+ . /etc/rc.d/init.d/functions
+fi
SERVICE="Federation Hub"
FEDERATION_HUB_HOME=/opt/tak/federation-hub
diff --git a/src/federation-hub-broker/scripts/federation-hub-broker b/src/federation-hub-broker/scripts/federation-hub-broker
index d4acb7e1..d167f371 100755
--- a/src/federation-hub-broker/scripts/federation-hub-broker
+++ b/src/federation-hub-broker/scripts/federation-hub-broker
@@ -1,4 +1,9 @@
#!/bin/bash
+### BEGIN INIT INFO
+# Default-Start: 2 3 4 5
+# Default-Stop: 0 1 6
+# Short-Description: federation-hub-broker init script
+### END INIT INFO
#
# /etc/rc.d/init.d/federation-hub-broker
#
@@ -16,7 +21,9 @@
# description: Federation Hub broker service.
# Source function library
-. /etc/rc.d/init.d/functions
+if [ -f /etc/rc.d/init.d/functions ]; then
+ . /etc/rc.d/init.d/functions
+fi
SERVICE="Federation Hub Broker"
FEDERATION_HUB_HOME=/opt/tak/federation-hub
diff --git a/src/federation-hub-broker/src/main/java/tak/server/federation/hub/broker/FederationHubBrokerService.java b/src/federation-hub-broker/src/main/java/tak/server/federation/hub/broker/FederationHubBrokerService.java
index 48eaadf5..868c2bfd 100644
--- a/src/federation-hub-broker/src/main/java/tak/server/federation/hub/broker/FederationHubBrokerService.java
+++ b/src/federation-hub-broker/src/main/java/tak/server/federation/hub/broker/FederationHubBrokerService.java
@@ -38,6 +38,7 @@
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
+import javax.cache.event.CacheEntryEvent;
import javax.net.ssl.KeyManagerFactory;
import javax.net.ssl.SSLException;
import javax.net.ssl.SSLPeerUnverifiedException;
@@ -48,6 +49,9 @@
import org.antlr.v4.runtime.BailErrorStrategy;
import org.antlr.v4.runtime.CommonTokenStream;
import org.antlr.v4.runtime.tree.ParseTree;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.binary.BinaryObjectException;
+import org.apache.ignite.cache.query.ContinuousQuery;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.ApplicationListener;
@@ -118,6 +122,7 @@
import tak.server.federation.FederationException;
import tak.server.federation.FederationPolicyGraph;
import tak.server.federation.GuardedStreamHolder;
+import tak.server.federation.hub.FederationHubCache;
import tak.server.federation.hub.broker.events.BrokerServerEvent;
import tak.server.federation.hub.broker.events.HubClientDisconnectEvent;
import tak.server.federation.hub.broker.events.RestartServerEvent;
@@ -127,6 +132,8 @@
import tak.server.federation.hub.ui.graph.PolicyObjectCell;
public class FederationHubBrokerService implements ApplicationListener {
+
+ private final Ignite ignite;
private static final String SSL_SESSION_ID = "sslSessionId";
private static final String FEDERATED_ID_KEY = "federatedIdentity";
@@ -171,14 +178,37 @@ public class FederationHubBrokerService implements ApplicationListener continuousConfigurationQuery = new ContinuousQuery<>();
- public FederationHubBrokerService(SSLConfig sslConfig, FederationHubServerConfig fedHubConfig, FederationHubPolicyManager fedHubPolicyManager, HubConnectionStore hubConnectionStore) {
+ public FederationHubBrokerService(Ignite ignite, SSLConfig sslConfig, FederationHubServerConfig fedHubConfig, FederationHubPolicyManager fedHubPolicyManager, HubConnectionStore hubConnectionStore) {
instance = this;
+ this.ignite = ignite;
this.sslConfig = sslConfig;
this.fedHubConfig = fedHubConfig;
this.fedHubPolicyManager = fedHubPolicyManager;
this.hubConnectionStore = hubConnectionStore;
setupFederationServers();
+
+ // rather than hitting ignite every time we need the policy graph,
+ // use event driven approach to always have the updated graph
+ // available here for instant access
+ continuousConfigurationQuery.setLocalListener((evts) -> {
+ for (CacheEntryEvent extends String, ? extends FederationPolicyGraph> e : evts) {
+ federationPolicyGraph = e.getValue();
+ }
+ });
+
+ FederationHubCache.getFederationHubPolicyStoreCache(ignite).query(continuousConfigurationQuery);
+ }
+
+ private FederationPolicyGraph federationPolicyGraph;
+
+ public FederationPolicyGraph getFederationPolicyGraph() {
+ if (federationPolicyGraph == null)
+ federationPolicyGraph = fedHubPolicyManager.getPolicyGraph();
+
+ return federationPolicyGraph;
}
private void removeInactiveClientStreams() {
@@ -270,7 +300,7 @@ private SslContext buildServerSslContext(FederationHubServerConfig fedHubConfig)
}
public void sendContactMessagesV1(NioNettyFederationHubServerHandler handler) {
- FederationPolicyGraph fpg = fedHubPolicyManager.getPolicyGraph();
+ FederationPolicyGraph fpg = getFederationPolicyGraph();
if (fpg == null) {
logger.error("Cannot send contact messages; policy manager is null");
return;
@@ -497,7 +527,7 @@ public void onApplicationEvent(BrokerServerEvent event) {
addFederateToGroupPolicyIfMissingV2(hubConnectionStore.getSessionMap().get(entry.getKey()) ,entry.getValue());
// check if the currently connected spoke is still allowed to be connected after the policy change
- FederationPolicyGraph fpg = fedHubPolicyManager.getPolicyGraph();
+ FederationPolicyGraph fpg = getFederationPolicyGraph();
Federate clientNode = checkFederateExistsInPolicy(entry.getValue(), session, fpg);
if (clientNode == null) {
logger.info("Permission Denied. Federate/CA Group not found in the policy graph: " + entry.getValue().getFederateIdentity());
@@ -514,7 +544,6 @@ public void onApplicationEvent(BrokerServerEvent event) {
hubConnectionStore.removeSession(entry.getKey());
}
});
-
updateOutgoingConnections(((UpdatePolicy) event).getOutgoings());
}
}
@@ -693,13 +722,15 @@ public void addCaFederateToPolicyGraph(FederateIdentity federateIdentity, Certif
}
Federate federate = new Federate(federateIdentity);
- fedHubPolicyManager.addCaFederate(federate, caCertNames);
+ synchronized (federationPolicyGraph) {
+ federationPolicyGraph = fedHubPolicyManager.addCaFederate(federate, caCertNames);
+ }
}
public void addFederateToGroupPolicyIfMissingV1(Certificate[] certArray,
FederateIdentity federateIdentity) {
String fedId = federateIdentity.getFedId();
- if (fedHubPolicyManager.getPolicyGraph().getNode(fedId) == null) {
+ if (getFederationPolicyGraph().getNode(fedId) == null) {
addCaFederateToPolicyGraph(federateIdentity, certArray);
}
}
@@ -707,7 +738,10 @@ public void addFederateToGroupPolicyIfMissingV1(Certificate[] certArray,
public void addFederateToGroupPolicyIfMissingV2(SSLSession session, GuardedStreamHolder holder) {
hubConnectionStore.addSession(new BigInteger(session.getId()).toString(), session);
String fedId = holder.getFederateIdentity().getFedId();
- if (fedHubPolicyManager.getPolicyGraph().getNode(fedId) == null) {
+
+ FederationPolicyGraph fpg = getFederationPolicyGraph();
+
+ if (fpg.getNode(fedId) == null) {
try {
Certificate[] certArray = session.getPeerCertificates();
addCaFederateToPolicyGraph(holder.getFederateIdentity(), certArray);
@@ -731,6 +765,8 @@ private class FederatedChannelService extends FederatedChannelGrpc.FederatedChan
private final FederationHubBrokerService broker = FederationHubBrokerService.this;
AtomicReference start = new AtomicReference<>();
+
+
@Override
public void serverFederateGroupsStream(Subscription request, StreamObserver responseObserver) {
@@ -744,7 +780,6 @@ public void serverFederateGroupsStream(Subscription request, StreamObserver streamHolder = new GuardedStreamHolder(
responseObserver, request.getIdentity().getName(),
FederationUtils.getBytesSHA256(clientCertArray[0].getEncoded()), session, request,
@@ -754,13 +789,12 @@ public int compare(FederateGroups a, FederateGroups b) {
return ComparisonChain.start().compare(a.hashCode(), b.hashCode()).result();
}
}, true);
-
- if (fedHubConfig.isUseCaGroups()) {
- addFederateToGroupPolicyIfMissingV2(session, streamHolder);
- }
-
- FederationPolicyGraph fpg = fedHubPolicyManager.getPolicyGraph();
- requireNonNull(fpg, "federation policy graph object");
+
+ addFederateToGroupPolicyIfMissingV2(session, streamHolder);
+
+ FederationPolicyGraph fpg = getFederationPolicyGraph();
+
+ requireNonNull(fpg, "federation policy graph object");
Federate clientNode = checkFederateExistsInPolicy(streamHolder, session, fpg);
if (clientNode == null) {
@@ -794,16 +828,17 @@ public StreamObserver clientFederateGroupsStream(StreamObserver<
public void onNext(FederateGroups fedGroups) {
SSLSession session = (SSLSession)sslSessionKey.get(Context.current());
String id = new BigInteger(session.getId()).toString();
- if (fedHubConfig.isUseCaGroups()) {
- GuardedStreamHolder holder = hubConnectionStore.getClientStreamMap().get(id);
- if (holder != null) {
- addFederateToGroupPolicyIfMissingV2(session, hubConnectionStore.getClientStreamMap().get(id));
- }
- GuardedStreamHolder groupHolder = hubConnectionStore.getClientGroupStreamMap().get(id);
- if (groupHolder != null) {
- addFederateToGroupPolicyIfMissingV2(session, hubConnectionStore.getClientGroupStreamMap().get(id));
- }
- }
+
+ GuardedStreamHolder holder = hubConnectionStore.getClientStreamMap().get(id);
+ if (holder != null) {
+ addFederateToGroupPolicyIfMissingV2(session, hubConnectionStore.getClientStreamMap().get(id));
+ }
+
+ GuardedStreamHolder groupHolder = hubConnectionStore.getClientGroupStreamMap().get(id);
+ if (groupHolder != null) {
+ addFederateToGroupPolicyIfMissingV2(session, hubConnectionStore.getClientGroupStreamMap().get(id));
+ }
+
addFederateGroups(id, fedGroups);
}
@@ -844,9 +879,7 @@ public void onNext(BinaryBlob value) {
value.getSerializedSize() + " bytes (serialized) latency: " +
latency + " ms");
- if (fedHubConfig.isUseCaGroups()) {
- addFederateToGroupPolicyIfMissingV2(session, hubConnectionStore.getClientStreamMap().get(new BigInteger(session.getId()).toString()));
- }
+ addFederateToGroupPolicyIfMissingV2(session, hubConnectionStore.getClientStreamMap().get(new BigInteger(session.getId()).toString()));
FederationHubBrokerService.this.handleRead(value, new BigInteger(session.getId()).toString());
}
@@ -946,11 +979,9 @@ public int compare(FederatedEvent a, FederatedEvent b) {
}, true
);
- if (fedHubConfig.isUseCaGroups()) {
- addFederateToGroupPolicyIfMissingV2(session, streamHolder);
- }
+ addFederateToGroupPolicyIfMissingV2(session, streamHolder);
- FederationPolicyGraph fpg = fedHubPolicyManager.getPolicyGraph();
+ FederationPolicyGraph fpg = getFederationPolicyGraph();
requireNonNull(fpg, "federation policy graph object");
Federate clientNode = checkFederateExistsInPolicy(streamHolder, session, fpg);
@@ -1061,11 +1092,9 @@ public int compare(ROL a, ROL b) {
}, true
);
- if (fedHubConfig.isUseCaGroups()) {
- addFederateToGroupPolicyIfMissingV2(session, rolStreamHolder);
- }
+ addFederateToGroupPolicyIfMissingV2(session, rolStreamHolder);
- FederationPolicyGraph fpg = fedHubPolicyManager.getPolicyGraph();
+ FederationPolicyGraph fpg = getFederationPolicyGraph();
requireNonNull(fpg, "federation policy graph object");
Federate clientNode = checkFederateExistsInPolicy(rolStreamHolder, session, fpg);
@@ -1102,9 +1131,7 @@ public void onNext(ROL clientROL) {
SSLSession session = (SSLSession) sslSessionKey.get(Context.current());
String sessionId = new BigInteger(session.getId()).toString();
- if (fedHubConfig.isUseCaGroups()) {
- addFederateToGroupPolicyIfMissingV2(session, hubConnectionStore.getClientStreamMap().get(sessionId));
- }
+ addFederateToGroupPolicyIfMissingV2(session, hubConnectionStore.getClientStreamMap().get(sessionId));
parseRol(clientROL, sessionId);
} catch (Exception e) {
@@ -1141,7 +1168,7 @@ public StreamObserver serverEventStream(StreamObserver() {
@Override
@@ -1150,12 +1177,10 @@ public void onNext(FederatedEvent fe) {
clientByteAccumulator.addAndGet(fe.getSerializedSize());
// Add federate to group in case policy was updated during connection
- if (fedHubConfig.isUseCaGroups()) {
- GuardedStreamHolder holder = hubConnectionStore.getClientStreamMap().get(id);
- if (holder != null) {
- addFederateToGroupPolicyIfMissingV2(session, hubConnectionStore.getClientStreamMap().get(id));
- }
- }
+ GuardedStreamHolder holder = hubConnectionStore.getClientStreamMap().get(id);
+ if (holder != null) {
+ addFederateToGroupPolicyIfMissingV2(session, hubConnectionStore.getClientStreamMap().get(id));
+ }
// submit to orchestrator
FederationHubBrokerService.this.handleRead(fe, new BigInteger(session.getId()).toString());
}
@@ -1258,8 +1283,7 @@ public ServerCall.Listener interceptCall(
public void assignMessageSourceAndDestinationsFromPolicy(Message message,
FederateIdentity federateIdentity)
throws FederationException {
- assignMessageSourceAndDestinationsFromPolicy(message, federateIdentity,
- fedHubPolicyManager.getPolicyGraph());
+ assignMessageSourceAndDestinationsFromPolicy(message, federateIdentity, getFederationPolicyGraph());
}
private void assignMessageSourceAndDestinationsFromPolicy(Message message,
@@ -1303,7 +1327,7 @@ private void sendRolMessage(Message message) {
FederateIdentity src = (FederateIdentity)message.getSource().getEntity();
FederateIdentity dest = (FederateIdentity)entity.getEntity();
- FederationPolicyGraph policyGraph = fedHubPolicyManager.getPolicyGraph();
+ FederationPolicyGraph policyGraph = getFederationPolicyGraph();
Federate srcNode = policyGraph.getFederate(src.getFedId());
Federate destNode = policyGraph.getFederate(dest.getFedId());
@@ -1407,7 +1431,7 @@ public void sendFederatedEventV1(Message message) {
FederateIdentity src = (FederateIdentity)message.getSource().getEntity();
FederateIdentity dest = (FederateIdentity)entity.getEntity();
- FederationPolicyGraph policyGraph = fedHubPolicyManager.getPolicyGraph();
+ FederationPolicyGraph policyGraph = getFederationPolicyGraph();
Federate srcNode = policyGraph.getFederate(src.getFedId());
Federate destNode = policyGraph.getFederate(dest.getFedId());
@@ -1442,7 +1466,7 @@ private void sendFederatedEvent(Message message) {
FederateIdentity src = (FederateIdentity)message.getSource().getEntity();
FederateIdentity dest = (FederateIdentity)entity.getEntity();
- FederationPolicyGraph policyGraph = fedHubPolicyManager.getPolicyGraph();
+ FederationPolicyGraph policyGraph = getFederationPolicyGraph();
Federate srcNode = policyGraph.getFederate(src.getFedId());
Federate destNode = policyGraph.getFederate(dest.getFedId());
@@ -1483,7 +1507,7 @@ private void sendFederatedGroup(Message message) {
FederateIdentity dest = (FederateIdentity)entity.getEntity();
FederateGroups groups = (FederateGroups)message.getPayload().getContent();
- FederationPolicyGraph policyGraph = fedHubPolicyManager.getPolicyGraph();
+ FederationPolicyGraph policyGraph = getFederationPolicyGraph();
Federate srcNode = policyGraph.getFederate(src.getFedId());
Federate destNode = policyGraph.getFederate(dest.getFedId());
@@ -1546,7 +1570,7 @@ public void handleRead(BinaryBlob event, String streamKey) {
try {
assignMessageSourceAndDestinationsFromPolicy(federatedMessage,
streamHolder.getFederateIdentity(),
- fedHubPolicyManager.getPolicyGraph());
+ getFederationPolicyGraph());
sendMessage(federatedMessage);
} catch (FederationException e) {
@@ -1574,7 +1598,7 @@ public void handleRead(ROL event, String streamKey) {
try {
assignMessageSourceAndDestinationsFromPolicy(federatedMessage,
streamHolder.getFederateIdentity(),
- fedHubPolicyManager.getPolicyGraph());
+ getFederationPolicyGraph());
sendMessage(federatedMessage);
} catch (FederationException e) {
logger.error("Could not get destinations from policy graph", e);
@@ -1604,7 +1628,7 @@ public void handleRead(FederatedEvent event, String streamKey) {
try {
assignMessageSourceAndDestinationsFromPolicy(federatedMessage,
streamHolder.getFederateIdentity(),
- fedHubPolicyManager.getPolicyGraph());
+ getFederationPolicyGraph());
sendMessage(federatedMessage);
if (event != null && event.hasContact()) {
@@ -1680,7 +1704,7 @@ public void addFederateGroups(String sourceId, FederateGroups groups) {
try {
assignMessageSourceAndDestinationsFromPolicy(federatedMessage,
ident,
- fedHubPolicyManager.getPolicyGraph());
+ getFederationPolicyGraph());
sendMessage(federatedMessage);
} catch (FederationException e) {
logger.error("Could not get destinations from policy graph", e);
diff --git a/src/federation-hub-broker/src/main/java/tak/server/federation/hub/broker/FederationHubServer.java b/src/federation-hub-broker/src/main/java/tak/server/federation/hub/broker/FederationHubServer.java
index 9046bc7c..ed57f880 100644
--- a/src/federation-hub-broker/src/main/java/tak/server/federation/hub/broker/FederationHubServer.java
+++ b/src/federation-hub-broker/src/main/java/tak/server/federation/hub/broker/FederationHubServer.java
@@ -43,11 +43,14 @@ public class FederationHubServer implements CommandLineRunner {
private static String configFile;
public static void main(String[] args) {
+
if (args.length > 1) {
System.err.println("Usage: java -jar federation-hub-broker.jar [CONFIG_FILE_PATH]");
return;
} else if (args.length == 1) {
configFile = args[0];
+ } else if (!Strings.isNullOrEmpty(System.getProperty("FEDERATION_HUB_BROKER_CONFIG"))) {
+ configFile = System.getProperties().getProperty("FEDERATION_HUB_BROKER_CONFIG");
} else {
configFile = DEFAULT_CONFIG_FILE;
}
@@ -118,8 +121,8 @@ public FederationHubServerConfig getFedHubConfig()
@Bean
@Order(Ordered.LOWEST_PRECEDENCE)
- public FederationHubBrokerService FederationHubBrokerService(SSLConfig getSslConfig, FederationHubServerConfig fedHubConfig, FederationHubPolicyManager fedHubPolicyManager, HubConnectionStore hubConnectionStore) {
- return new FederationHubBrokerService(getSslConfig, fedHubConfig, fedHubPolicyManager, hubConnectionStore);
+ public FederationHubBrokerService FederationHubBrokerService(Ignite ignite, SSLConfig getSslConfig, FederationHubServerConfig fedHubConfig, FederationHubPolicyManager fedHubPolicyManager, HubConnectionStore hubConnectionStore) {
+ return new FederationHubBrokerService(ignite, getSslConfig, fedHubConfig, fedHubPolicyManager, hubConnectionStore);
}
private FederationHubServerConfig loadConfig(String configFile)
diff --git a/src/federation-hub-broker/src/main/java/tak/server/federation/hub/broker/HubFigClient.java b/src/federation-hub-broker/src/main/java/tak/server/federation/hub/broker/HubFigClient.java
index 5f0feb16..eabc22da 100644
--- a/src/federation-hub-broker/src/main/java/tak/server/federation/hub/broker/HubFigClient.java
+++ b/src/federation-hub-broker/src/main/java/tak/server/federation/hub/broker/HubFigClient.java
@@ -65,7 +65,6 @@
import tak.server.federation.GuardedStreamHolder;
import tak.server.federation.hub.FederationHubDependencyInjectionProxy;
import tak.server.federation.hub.broker.events.HubClientDisconnectEvent;
-import tak.server.federation.hub.broker.events.UpdatePolicy;
import tak.server.federation.hub.ui.graph.FederationOutgoingCell;
/*
@@ -185,26 +184,6 @@ public void onError(Throwable t) {
@Override
public void onCompleted() {}
});
-
- asyncFederatedChannel.clientROLStream(
- Subscription.newBuilder().setFilter("")
- .setIdentity(Identity.newBuilder().setName(fedName).setUid(clientUid).build()).build(),
- new StreamObserver() {
-
- @Override
- public void onNext(ROL value) {
- FederationHubBrokerService.getInstance().parseRol(value, fedName);
- }
-
- @Override
- public void onError(Throwable t) {
- logger.error("ROL Stream Error: ", t);
- processDisconnect();
- }
-
- @Override
- public void onCompleted() {}
- });
asyncFederatedChannel.serverFederateGroupsStream(
Subscription.newBuilder().setFilter("")
@@ -243,6 +222,8 @@ public void onError(Throwable t) {
@Override
public void onCompleted() {}
});
+
+ final AtomicBoolean initROLStream = new AtomicBoolean(false);
healthScheduler = scheduler.scheduleWithFixedDelay(() -> {
ClientHealth clientHealth = ClientHealth.newBuilder().setStatus(ClientHealth.ServingStatus.SERVING).build();
@@ -263,6 +244,31 @@ public void onNext(ServerHealth value) {
processDisconnect();
throw new RuntimeException("Not Healthy");
}
+
+ if (initROLStream.compareAndSet(false, true)) {
+ // open the client ROL stream only after getting a health check back. This will trigger transmission of federated mission changes.
+ // Subscription / Stream to receive ROL messages from server
+ asyncFederatedChannel.clientROLStream(
+ Subscription.newBuilder().setFilter("")
+ .setIdentity(Identity.newBuilder().setName(fedName).setUid(clientUid).build()).build(),
+ new StreamObserver() {
+
+ @Override
+ public void onNext(ROL value) {
+ FederationHubBrokerService.getInstance().parseRol(value, fedName);
+ }
+
+ @Override
+ public void onError(Throwable t) {
+ logger.error("ROL Stream Error: ", t);
+ processDisconnect();
+ }
+
+ @Override
+ public void onCompleted() {}
+ });
+ }
+
}
@Override
@@ -311,12 +317,11 @@ public X509Certificate[] propogate(X509Certificate[] certs) {
}
}
- FederationPolicyGraph fpg = FederationHubDependencyInjectionProxy.getInstance().fedHubPolicyManager().getPolicyGraph();
+ FederationPolicyGraph fpg = FederationHubBrokerService.getInstance().getFederationPolicyGraph();
requireNonNull(fpg, "federation policy graph object");
- Federate clientNode = FederationHubDependencyInjectionProxy.getInstance()
- .fedHubPolicyManager()
- .getPolicyGraph()
+ Federate clientNode = FederationHubBrokerService.getInstance()
+ .getFederationPolicyGraph()
.getFederate(new FederateIdentity(fedName));
requireNonNull(clientNode, "federation policy node for newly connected client");
@@ -335,13 +340,14 @@ public X509Certificate[] propogate(X509Certificate[] certs) {
}
private void setupGroupStreamSender() {
- groupsCall = channel.newCall(
- io.grpc.MethodDescriptor.create(MethodDescriptor.MethodType.CLIENT_STREAMING,
- generateFullMethodName("com.atakmap.FederatedChannel", "ClientFederateGroupsStream"),
- io.grpc.protobuf.ProtoUtils.marshaller(com.atakmap.Tak.FederateGroups.getDefaultInstance()),
- io.grpc.protobuf.ProtoUtils.marshaller(com.atakmap.Tak.Subscription.getDefaultInstance())),
- asyncFederatedChannel.getCallOptions());
-
+ MethodDescriptor methodDescripton = MethodDescriptor.newBuilder()
+ .setType(MethodDescriptor.MethodType.CLIENT_STREAMING)
+ .setFullMethodName(generateFullMethodName("com.atakmap.FederatedChannel", "ClientFederateGroupsStream"))
+ .setRequestMarshaller(io.grpc.protobuf.ProtoUtils.marshaller(com.atakmap.Tak.FederateGroups.getDefaultInstance()))
+ .setResponseMarshaller(io.grpc.protobuf.ProtoUtils.marshaller(com.atakmap.Tak.Subscription.getDefaultInstance()))
+ .build();
+
+ groupsCall = channel.newCall(methodDescripton, asyncFederatedChannel.getCallOptions());
// use listener to respect flow control, and send messages to the server when it
// is ready
groupsCall.start(new ClientCall.Listener() {
@@ -350,6 +356,7 @@ private void setupGroupStreamSender() {
public void onMessage(Subscription response) {
// Notify gRPC to receive one additional response.
groupsCall.request(1);
+
}
@Override
@@ -371,14 +378,38 @@ public int compare(FederateGroups a, FederateGroups b) {
);
FederationHubDependencyInjectionProxy.getInstance().hubConnectionStore().addGroupStream(fedName, groupStreamHolder);
}
+
+ private void setServerSubscriptionForConnection() {
+ if (eventStreamHolder != null && serverSubscription != null) {
+ eventStreamHolder.setSubscription(serverSubscription);
+
+ List existingConnectionsFromRemoteServer = FederationHubDependencyInjectionProxy.getInstance()
+ .hubConnectionStore()
+ .getConnectionInfos()
+ .stream()
+ .filter(i -> i.getRemoteServerId().equals(serverSubscription.getIdentity().getServerId()))
+ .collect(Collectors.toList());
+
+ // if we already have a connection to/from this server, don't allow another. force close without reconnect.
+ if (existingConnectionsFromRemoteServer.size() > 0) {
+ logger.info("Error: Connection to/from " + fedName + " already exists. Disallowing duplicate");
+ processDisconnectWithoutRetry();
+ } else {
+ FederationHubDependencyInjectionProxy.getInstance().hubConnectionStore().addConnectionInfo(fedName, info);
+ }
+ logger.info("Outgoing connection for {} established ", fedName);
+ }
+ }
public void setupEventStreamSender() {
- clientCall = channel.newCall(
- io.grpc.MethodDescriptor.create(io.grpc.MethodDescriptor.MethodType.CLIENT_STREAMING,
- generateFullMethodName("com.atakmap.FederatedChannel", "ServerEventStream"),
- io.grpc.protobuf.ProtoUtils.marshaller(com.atakmap.Tak.FederatedEvent.getDefaultInstance()),
- io.grpc.protobuf.ProtoUtils.marshaller(com.atakmap.Tak.Subscription.getDefaultInstance())),
- asyncFederatedChannel.getCallOptions());
+ MethodDescriptor methodDescripton = MethodDescriptor.newBuilder()
+ .setType(MethodDescriptor.MethodType.CLIENT_STREAMING)
+ .setFullMethodName(generateFullMethodName("com.atakmap.FederatedChannel", "ServerEventStream"))
+ .setRequestMarshaller(io.grpc.protobuf.ProtoUtils.marshaller(com.atakmap.Tak.FederatedEvent.getDefaultInstance()))
+ .setResponseMarshaller(io.grpc.protobuf.ProtoUtils.marshaller(com.atakmap.Tak.Subscription.getDefaultInstance()))
+ .build();
+
+ clientCall = channel.newCall(methodDescripton, asyncFederatedChannel.getCallOptions());
// use listener to respect flow control, and send messages to the server when it
// is ready
@@ -387,25 +418,7 @@ public void setupEventStreamSender() {
@Override
public void onMessage(Subscription response) {
serverSubscription = response;
- if (eventStreamHolder != null) {
- eventStreamHolder.setSubscription(serverSubscription);
-
- List existingConnectionsFromRemoteServer = FederationHubDependencyInjectionProxy.getInstance()
- .hubConnectionStore()
- .getConnectionInfos()
- .stream()
- .filter(i -> i.getRemoteServerId().equals(serverSubscription.getIdentity().getServerId()))
- .collect(Collectors.toList());
-
- // if we already have a connection to/from this server, don't allow another. force close without reconnect.
- if (existingConnectionsFromRemoteServer.size() > 0) {
- logger.info("Error: Connection to/from " + fedName + " already exists. Disallowing duplicate");
- processDisconnectWithoutRetry();
- } else {
- FederationHubDependencyInjectionProxy.getInstance().hubConnectionStore().addConnectionInfo(fedName, info);
- }
- logger.info("Outgoing connection for {} established ", fedName);
- }
+ setServerSubscriptionForConnection();
// Notify gRPC to receive one additional response.
clientCall.request(1);
}
@@ -428,14 +441,12 @@ public int compare(FederatedEvent a, FederatedEvent b) {
}, true
);
- if (serverSubscription != null) {
- eventStreamHolder.setSubscription(serverSubscription);
- }
+ setServerSubscriptionForConnection();
eventStreamHolder.send(FederatedEvent.newBuilder().build());
FederationHubDependencyInjectionProxy.getInstance().hubConnectionStore().addClientStreamHolder(fedName, eventStreamHolder);
- FederationPolicyGraph fpg = FederationHubDependencyInjectionProxy.getInstance().fedHubPolicyManager().getPolicyGraph();
+ FederationPolicyGraph fpg = FederationHubBrokerService.getInstance().getFederationPolicyGraph();
String fedId = eventStreamHolder.getFederateIdentity().getFedId();
Federate clientNode = fpg.getFederate(fedId);
@@ -466,14 +477,15 @@ public int compare(FederatedEvent a, FederatedEvent b) {
}
public void setupRolStreamSender() {
- // open a channel to the FIG server for the purpose of sending ROL messages
- rolCall = channel.newCall(io.grpc.MethodDescriptor.create(io.grpc.MethodDescriptor.MethodType.CLIENT_STREAMING,
- generateFullMethodName("com.atakmap.FederatedChannel", "ServerROLStream"),
- io.grpc.protobuf.ProtoUtils.marshaller(com.atakmap.Tak.ROL.getDefaultInstance()),
- io.grpc.protobuf.ProtoUtils
- .marshaller(com.atakmap.Tak.Subscription.getDefaultInstance())),
- asyncFederatedChannel.getCallOptions());
-
+ MethodDescriptor methodDescripton = MethodDescriptor.newBuilder()
+ .setType(MethodDescriptor.MethodType.CLIENT_STREAMING)
+ .setFullMethodName(generateFullMethodName("com.atakmap.FederatedChannel", "ServerROLStream"))
+ .setRequestMarshaller(io.grpc.protobuf.ProtoUtils.marshaller(com.atakmap.Tak.ROL.getDefaultInstance()))
+ .setResponseMarshaller(io.grpc.protobuf.ProtoUtils.marshaller(com.atakmap.Tak.Subscription.getDefaultInstance()))
+ .build();
+
+ rolCall = channel.newCall(methodDescripton, asyncFederatedChannel.getCallOptions());
+
rolCall.start(new ClientCall.Listener() {
@Override
diff --git a/src/federation-hub-broker/src/main/resources/logback-broker.xml b/src/federation-hub-broker/src/main/resources/logback-broker.xml
index 84237958..6db26fca 100644
--- a/src/federation-hub-broker/src/main/resources/logback-broker.xml
+++ b/src/federation-hub-broker/src/main/resources/logback-broker.xml
@@ -1,4 +1,9 @@
+
+
+ 1
+
+ /opt/tak/federation-hub/logs/federation-hub-broker.log
diff --git a/src/federation-hub-policy/scripts/federation-hub-policy b/src/federation-hub-policy/scripts/federation-hub-policy
index e229fdc0..de0d692e 100755
--- a/src/federation-hub-policy/scripts/federation-hub-policy
+++ b/src/federation-hub-policy/scripts/federation-hub-policy
@@ -1,4 +1,9 @@
#!/bin/bash
+### BEGIN INIT INFO
+# Default-Start: 2 3 4 5
+# Default-Stop: 0 1 6
+# Short-Description: federation-hub-policy init script
+### END INIT INFO
#
# /etc/rc.d/init.d/federation-hub-policy
#
@@ -16,7 +21,9 @@
# description: Federation Hub policy manager service.
# Source function library
-. /etc/rc.d/init.d/functions
+if [ -f /etc/rc.d/init.d/functions ]; then
+ . /etc/rc.d/init.d/functions
+fi
SERVICE="Federation Hub Policy Manager"
FEDERATION_HUB_HOME=/opt/tak/federation-hub
diff --git a/src/federation-hub-policy/src/main/java/tak/server/federation/hub/policy/FederationHubPolicyManagerService.java b/src/federation-hub-policy/src/main/java/tak/server/federation/hub/policy/FederationHubPolicyManagerService.java
index bbbbd092..d7fed286 100644
--- a/src/federation-hub-policy/src/main/java/tak/server/federation/hub/policy/FederationHubPolicyManagerService.java
+++ b/src/federation-hub-policy/src/main/java/tak/server/federation/hub/policy/FederationHubPolicyManagerService.java
@@ -1,17 +1,17 @@
package tak.server.federation.hub.policy;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import org.apache.ignite.Ignite;
import org.apache.ignite.Ignition;
import org.apache.ignite.cluster.ClusterGroup;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
-import org.springframework.context.annotation.Bean;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ApplicationContext;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.boot.CommandLineRunner;
+import org.springframework.context.annotation.Bean;
+
+import com.google.common.base.Strings;
import tak.server.federation.hub.FederationHubConstants;
import tak.server.federation.hub.FederationHubUtils;
@@ -24,7 +24,7 @@ public class FederationHubPolicyManagerService implements CommandLineRunner {
private static final Logger logger = LoggerFactory.getLogger(FederationHubPolicyManagerService.class);
private static Ignite ignite = null;
-
+
public static void main(String[] args) {
SpringApplication application = new SpringApplication(FederationHubPolicyManagerService.class);
@@ -45,7 +45,12 @@ public Ignite getIgnite() {
@Override
public void run(String... args) throws Exception {
- FederationHubPolicyManagerImpl hpm = new FederationHubPolicyManagerImpl();
+ String defaultUiPolicyFile = null;
+ if (!Strings.isNullOrEmpty(System.getProperty("FEDERATION_HUB_POLICY_CONFIG"))) {
+ defaultUiPolicyFile = System.getProperties().getProperty("FEDERATION_HUB_POLICY_CONFIG");
+ }
+
+ FederationHubPolicyManagerImpl hpm = new FederationHubPolicyManagerImpl(ignite, defaultUiPolicyFile);
ClusterGroup cg = ignite.cluster().forAttribute(
FederationHubConstants.FEDERATION_HUB_IGNITE_PROFILE_KEY,
FederationHubConstants.FEDERATION_HUB_POLICY_IGNITE_PROFILE);
diff --git a/src/federation-hub-policy/src/main/resources/logback-policy.xml b/src/federation-hub-policy/src/main/resources/logback-policy.xml
index dbcf7833..da3c4a1c 100644
--- a/src/federation-hub-policy/src/main/resources/logback-policy.xml
+++ b/src/federation-hub-policy/src/main/resources/logback-policy.xml
@@ -1,4 +1,9 @@
+
+
+ 1
+
+ /opt/tak/federation-hub/logs/federation-hub-policy.log
diff --git a/src/federation-hub-ui/scripts/federation-hub-ui b/src/federation-hub-ui/scripts/federation-hub-ui
index f6a6d9d8..3728fb29 100755
--- a/src/federation-hub-ui/scripts/federation-hub-ui
+++ b/src/federation-hub-ui/scripts/federation-hub-ui
@@ -1,4 +1,9 @@
#!/bin/bash
+### BEGIN INIT INFO
+# Default-Start: 2 3 4 5
+# Default-Stop: 0 1 6
+# Short-Description: federation-hub-ui init script
+### END INIT INFO
#
# /etc/rc.d/init.d/federation-hub-ui
#
@@ -16,7 +21,9 @@
# description: Federation Hub UI service.
# Source function library
-. /etc/rc.d/init.d/functions
+if [ -f /etc/rc.d/init.d/functions ]; then
+ . /etc/rc.d/init.d/functions
+fi
SERVICE="Federation Hub UI"
FEDERATION_HUB_HOME=/opt/tak/federation-hub
diff --git a/src/federation-hub-ui/src/main/java/tak/server/federation/hub/ui/FederationHubUIServer.java b/src/federation-hub-ui/src/main/java/tak/server/federation/hub/ui/FederationHubUIServer.java
index 7b6a16e8..c405edfb 100644
--- a/src/federation-hub-ui/src/main/java/tak/server/federation/hub/ui/FederationHubUIServer.java
+++ b/src/federation-hub-ui/src/main/java/tak/server/federation/hub/ui/FederationHubUIServer.java
@@ -131,7 +131,7 @@ public AuthorizationFileWatcher authFileWatcher(FederationHubUIConfig fedHubConf
@Bean
@Order(Ordered.LOWEST_PRECEDENCE)
- public FederationHubUIService FederationHubBrokerService() {
+ public FederationHubUIService federationHubUIService() {
return new FederationHubUIService();
}
}
diff --git a/src/federation-hub-ui/src/main/resources/logback-ui.xml b/src/federation-hub-ui/src/main/resources/logback-ui.xml
index 89267383..967e7b0d 100644
--- a/src/federation-hub-ui/src/main/resources/logback-ui.xml
+++ b/src/federation-hub-ui/src/main/resources/logback-ui.xml
@@ -1,4 +1,9 @@
+
+
+ 1
+
+ /opt/tak/federation-hub/logs/federation-hub-ui.log
diff --git a/src/gradle.properties b/src/gradle.properties
index 5dc18afc..bd5bf6a3 100644
--- a/src/gradle.properties
+++ b/src/gradle.properties
@@ -10,7 +10,9 @@ httpcomponents_version = 4.5.13
slf4j_version = 1.7.32
logback_version = 1.2.11
log4j_api_version = 2.17.1
-postgres_version = 42.2.5
+# Switched versions of of postgres to address xray reported vulnerability
+#postgres_version = 42.2.5
+postgres_version = 42.3.3
# Current 5.3.3 released 01/2021
# Candidate 5.3.19 released 04/2022
@@ -20,7 +22,9 @@ spring_version = 5.3.21
# Candidate 5.3.13.RELEASE released 12/2021
# Candidate 5.5.6 released 04/2022
# Candidate 5.6.3 released 04/2022
-spring_security_version = 5.7.2
+#spring_security_version = 5.7.2
+# Version recommended by xray
+spring_security_version = 5.7.5
# Current 2.5.0.RELEASE released 05/2020
# Candidate 2.5.1.RELEASE released 04/2021
@@ -83,6 +87,7 @@ commons_collections_version = 4.2
commons_fileupload_version = 1.4
commons_io_version = 2.11.0
commons_pool_version = 2.6.0
+commons_validator_version = 1.7
concurrent_hashmap_version = 1.0
esapi_version = 2.2.0.0
@@ -140,14 +145,15 @@ xerces_version = 2.12.2
xpp3_version = 1.1.4c
hamcrest_version = 1.3
junit_version = 4.12
-flyway_version = 6.4.0
+flyway_version = 9.8.3
# Current 2.1.4 released 10/2008 with no known vulnberabilities as of 06/2022
# Candidate 2.1.7 released 04/2009 with no known vulnberabilities as of 06/2022
# Candidate 2.2.10 released 01/2015 with no known vulnberabilities as of 06/2022
# Candidate 2.3.5 released 08/2021 with no known vulnberabilities as of 06/2022
# Candidate 3.0.2 released 08/2021 with no known vulnberabilities as of 06/2022
-jaxwsrt_version = 2.1.4
+#jaxwsrt_version = 2.1.4 # Attempting version update to 2.3.5 to accommodate Raspberry Pi OS LITE installations
+jaxwsrt_version = 2.3.5
# Current 2.9.0 released 02/2022 with no known vulnberabilities as of 06/2022
gson_version = 2.9.0
@@ -162,18 +168,16 @@ json_org_version = 20180813
opencsv_version = 4.4
# Netty and GRPC must be valid with one another
-#Go to github.com/grpc-java in SECURITY.md file look for the table of known working versions
-
-# Current 4.1.76.Final released 04/2022
-netty_version = 4.1.76.Final
-
-# try this:
-netty_handler_version = 4.1.58.Final
-netty_tcnative_version = 2.0.36.Final
-
-
-grpc_version = 1.35.0
+# These four are the lastest in sync versions according to
+# https://github.com/grpc/grpc-java/blob/master/SECURITY.md
+# Other than netty_version being slightly newer
+netty_version = 4.1.77.Final
+netty_handler_version = 4.1.77.Final
+netty_tcnative_version = 2.0.53.Final
+grpc_version = 1.49.1
+# keep this up to date with the version gRPC is expecting
+perfmark_api_version = 0.25.0
docker_plugin_version = 1.2
gradle_ospackage_version = 8.3.0
@@ -202,3 +206,4 @@ annotation_api_version = 1.3.2
snake_yaml_version = 1.30
protobuf_java_version = 3.16.1
springdoc_version = 1.6.9
+okhttp3_version = 4.10.0
diff --git a/src/takserver-cluster/deployments/helm/Chart.yaml b/src/takserver-cluster/deployments/helm/Chart.yaml
index 826f6541..6eefa475 100644
--- a/src/takserver-cluster/deployments/helm/Chart.yaml
+++ b/src/takserver-cluster/deployments/helm/Chart.yaml
@@ -6,7 +6,7 @@ type: application
dependencies:
- name: postgresql
- version: "10.16.2"
+ version: "12.1.6"
repository: "https://charts.bitnami.com/bitnami"
condition: postgresql.enabled
tags:
diff --git a/src/takserver-cluster/docker-files/Dockerfile.ca b/src/takserver-cluster/docker-files/Dockerfile.ca
index f04d288e..f9a47794 100644
--- a/src/takserver-cluster/docker-files/Dockerfile.ca
+++ b/src/takserver-cluster/docker-files/Dockerfile.ca
@@ -1,5 +1,5 @@
# need java for keytool
-FROM openjdk:11-jdk-stretch
+FROM openjdk:11-jdk-bullseye
COPY takserver-core/certs/* /
ARG ARG_CA_NAME
ENV CA_NAME=$ARG_CA_NAME
@@ -14,4 +14,4 @@ RUN apt update && \
apt install -y openssl && \
apt install -y vim && \
./generateClusterCerts.sh
-CMD ["bash"]
\ No newline at end of file
+CMD ["bash"]
diff --git a/src/takserver-cluster/docker-files/Dockerfile.database-setup b/src/takserver-cluster/docker-files/Dockerfile.database-setup
index f3fedae0..6b7a6485 100644
--- a/src/takserver-cluster/docker-files/Dockerfile.database-setup
+++ b/src/takserver-cluster/docker-files/Dockerfile.database-setup
@@ -1,4 +1,4 @@
-FROM openjdk:11-jdk-stretch
+FROM openjdk:11-jdk-bullseye
COPY takserver-schemamanager/SchemaManager.jar ./
COPY takserver-schemamanager/generic-cluster-database-configuration.sh ./
COPY CoreConfig.xml opt/tak/CoreConfig.xml
@@ -6,4 +6,4 @@ COPY takserver-schemamanager/db-connection-configuration.sh .
RUN chmod +x generic-cluster-database-configuration.sh
RUN chmod +x db-connection-configuration.sh
RUN ./db-connection-configuration.sh
-CMD ["./generic-cluster-database-configuration.sh"]
\ No newline at end of file
+CMD ["./generic-cluster-database-configuration.sh"]
diff --git a/src/takserver-cluster/docker-files/Dockerfile.rds b/src/takserver-cluster/docker-files/Dockerfile.rds
index 5f3c144c..c5263bee 100644
--- a/src/takserver-cluster/docker-files/Dockerfile.rds
+++ b/src/takserver-cluster/docker-files/Dockerfile.rds
@@ -1,4 +1,4 @@
-FROM openjdk:11-stretch
+FROM openjdk:11-jdk-bullseye
RUN apt-get update && \
apt-get install -y \
python \
diff --git a/src/takserver-cluster/docker-files/Dockerfile.takserver-base b/src/takserver-cluster/docker-files/Dockerfile.takserver-base
index 44b83cb7..333c9899 100644
--- a/src/takserver-cluster/docker-files/Dockerfile.takserver-base
+++ b/src/takserver-cluster/docker-files/Dockerfile.takserver-base
@@ -1,4 +1,4 @@
-FROM openjdk:11-jdk-stretch
+FROM openjdk:11-jdk-bullseye
COPY takserver-core/takserver-core*.war takserver.war
COPY takserver-usermanager/UserManager.jar .
COPY CoreConfig.xml .
diff --git a/src/takserver-cluster/scripts/build-eks.py b/src/takserver-cluster/scripts/build-eks.py
index b32d2988..b9a7a2b1 100644
--- a/src/takserver-cluster/scripts/build-eks.py
+++ b/src/takserver-cluster/scripts/build-eks.py
@@ -196,8 +196,8 @@ def setupDBParameterGroups():
except botocore.exceptions.ClientError as e:
create_db_param_group_res = boto3.client('rds', region_name=TAK_CLUSTER_REGION).create_db_parameter_group(
DBParameterGroupName='takserver-rds-pg',
- DBParameterGroupFamily='postgres10',
- Description='Takserver RDS parameter group for postgres10'
+ DBParameterGroupFamily='postgres15',
+ Description='Takserver RDS parameter group for postgres15'
)
print('\nCreating RDS Parameter Group')
diff --git a/src/takserver-cluster/scripts/build-kops-gossip.py b/src/takserver-cluster/scripts/build-kops-gossip.py
index 8196b72a..82c17069 100755
--- a/src/takserver-cluster/scripts/build-kops-gossip.py
+++ b/src/takserver-cluster/scripts/build-kops-gossip.py
@@ -185,8 +185,8 @@ def setupDBParameterGroups():
except botocore.exceptions.ClientError as e:
create_db_param_group_res = boto3.client('rds', region_name=TAK_CLUSTER_REGION).create_db_parameter_group(
DBParameterGroupName='takserver-rds-pg',
- DBParameterGroupFamily='postgres10',
- Description='Takserver RDS parameter group for postgres10'
+ DBParameterGroupFamily='postgres15',
+ Description='Takserver RDS parameter group for postgres15'
)
print('\nCreating RDS Parameter Group')
diff --git a/src/takserver-cluster/scripts/build-kops.py b/src/takserver-cluster/scripts/build-kops.py
index 004b63ba..ea248296 100755
--- a/src/takserver-cluster/scripts/build-kops.py
+++ b/src/takserver-cluster/scripts/build-kops.py
@@ -185,8 +185,8 @@ def setupDBParameterGroups():
except botocore.exceptions.ClientError as e:
create_db_param_group_res = boto3.client('rds', region_name=TAK_CLUSTER_REGION).create_db_parameter_group(
DBParameterGroupName='takserver-rds-pg',
- DBParameterGroupFamily='postgres10',
- Description='Takserver RDS parameter group for postgres10'
+ DBParameterGroupFamily='postgres15',
+ Description='Takserver RDS parameter group for postgres15'
)
print('\nCreating RDS Parameter Group')
diff --git a/src/takserver-common/build.gradle b/src/takserver-common/build.gradle
index 11f3a11b..e092f90d 100644
--- a/src/takserver-common/build.gradle
+++ b/src/takserver-common/build.gradle
@@ -27,7 +27,11 @@ dependencies {
exclude group: 'xpp'
} */
- compile group: 'io.netty', name: 'netty-tcnative-boringssl-static', version: netty_tcnative_version
+ compile "io.netty:netty-tcnative-boringssl-static:$netty_tcnative_version:linux-x86_64"
+ compile "io.netty:netty-tcnative-boringssl-static:$netty_tcnative_version:linux-aarch_64"
+ compile "io.netty:netty-tcnative-boringssl-static:$netty_tcnative_version:osx-x86_64"
+ compile "io.netty:netty-tcnative-boringssl-static:$netty_tcnative_version:osx-aarch_64"
+ compile "io.netty:netty-tcnative-boringssl-static:$netty_tcnative_version:windows-x86_64"
// compile group: 'io.netty', name: 'netty-tcnative', version: netty_tcnative_version
compile group: 'io.netty', name: 'netty-transport-native-epoll', version: netty_version, classifier: 'linux-x86_64'
diff --git a/src/takserver-common/src/main/java/com/bbn/marti/remote/FederationManager.java b/src/takserver-common/src/main/java/com/bbn/marti/remote/FederationManager.java
index 44ad44ba..a2104e9a 100644
--- a/src/takserver-common/src/main/java/com/bbn/marti/remote/FederationManager.java
+++ b/src/takserver-common/src/main/java/com/bbn/marti/remote/FederationManager.java
@@ -41,6 +41,7 @@ public interface FederationManager {
void removeFederateInboundGroupsMap(@NotNull String federateUID, @NotNull String remoteGroup, @NotNull String localGroup);
void removeInboundGroupFromCA(@NotNull String caID, @NotNull Set localGroupNames);
void removeOutboundGroupFromCA(@NotNull String caID, @NotNull Set localGroupNames);
+ void updateFederateMissionSettings(@NotNull String federateUID, @NotNull boolean missionFederateDefault, @NotNull List federateMissions);
// Get outgoing config object by address and port
List getOutgoingConnections(@NotNull String address, int port);
@@ -74,10 +75,12 @@ public interface FederationManager {
// Send ROL to messaging process, to be federated subject to group filtering.
// Attach outbound groups to the ROL for federates using group mapping
void submitFederateROL(ROL rol, NavigableSet groups);
+ void submitMissionFederateROL(ROL rol, NavigableSet groups, String missionName);
// Send ROL to messaging process, to be federated subject to group filtering.
- // Attach outbound groups to the ROL for federates using group mapping
+ // Attach outbound groups to the ROL for federates using group mapping
void submitFederateROL(ROL rol, NavigableSet groups, String fileHash);
+ void submitMissionFederateROL(ROL rol, NavigableSet groups, String fileHash, String missionName);
void reconfigureFederation();
diff --git a/src/takserver-common/src/main/java/com/bbn/marti/remote/groups/ConnectionModifyResult.java b/src/takserver-common/src/main/java/com/bbn/marti/remote/groups/ConnectionModifyResult.java
index 8d37c9cc..459383ad 100644
--- a/src/takserver-common/src/main/java/com/bbn/marti/remote/groups/ConnectionModifyResult.java
+++ b/src/takserver-common/src/main/java/com/bbn/marti/remote/groups/ConnectionModifyResult.java
@@ -5,9 +5,9 @@
/**
* An enum used to contain all the possible results from manipulating an input or static subscription.
- *
+ *
* If the provided change matches the current state, a SUCCESS is expected to be returned.
- *
+ *
* Created on 2/26/16.
*/
diff --git a/src/takserver-common/src/main/java/com/bbn/marti/remote/groups/FileUserManagementInterface.java b/src/takserver-common/src/main/java/com/bbn/marti/remote/groups/FileUserManagementInterface.java
index 76581932..2f2d7a48 100644
--- a/src/takserver-common/src/main/java/com/bbn/marti/remote/groups/FileUserManagementInterface.java
+++ b/src/takserver-common/src/main/java/com/bbn/marti/remote/groups/FileUserManagementInterface.java
@@ -22,15 +22,15 @@
/**
* Abstract interface used to define what can be done for user management
- *
+ *
* Created on 9/21/15.
*/
public interface FileUserManagementInterface {
-
+
UserAuthenticationFile getUserAuthenticationFile();
boolean userExists(@NotNull String userIdentifier);
-
+
FileAuthenticatorControl addOrUpdateUser(@NotNull String userIdentifier, @Nullable String userPassword, boolean wasPasswordAlreadyHashed);
FileAuthenticatorControl addOrUpdateUserFromCertificate(@NotNull X509Certificate certificate);
@@ -73,9 +73,9 @@ public interface FileUserManagementInterface {
String getUserFingerprint(@NotNull String userIdentifier);
User getFirstUser(String userIdentifier);
-
+
SimpleGroupWithUsersModel getUsersInGroup(String groupName);
-
+
@NotNull
Set getGroupNames();
}
diff --git a/src/takserver-common/src/main/java/com/bbn/marti/remote/groups/GroupManager.java b/src/takserver-common/src/main/java/com/bbn/marti/remote/groups/GroupManager.java
index 0af6f4d5..a754d451 100644
--- a/src/takserver-common/src/main/java/com/bbn/marti/remote/groups/GroupManager.java
+++ b/src/takserver-common/src/main/java/com/bbn/marti/remote/groups/GroupManager.java
@@ -17,8 +17,8 @@
/*
* Operations for keeping track of users and groups.
- *
- *
+ *
+ *
*/
public interface GroupManager {
@@ -26,37 +26,37 @@ public interface GroupManager {
* Get all groups of which a user is a member.
*/
NavigableSet getGroups(User user);
-
+
/*
* Get all users.
*/
Collection getAllUsers();
-
+
/*
* Get all connection ids.
*/
Set getAllConnectionIds();
-
+
/*
* Get all groups.
*/
Collection getAllGroups();
-
+
/*
* Add user to group. Create user and group if they don't exist.
*/
void addUserToGroup(User user, Group group);
-
+
/*
* Add user, creating if it doesn't exist.
*/
void addUser(User user);
-
+
/*
* Get group by id and direction
*/
Group getGroup(String id, Direction direction);
-
+
/*
* Find a User object by connectionId
*/
@@ -66,7 +66,7 @@ public interface GroupManager {
* Get all groups by connection id
*/
NavigableSet getGroupsByConnectionId(String connectionId);
-
+
/*
* Get cached group vector by connection id
*/
@@ -84,31 +84,31 @@ public interface GroupManager {
* handler is an Object, not a ConnectionInfo, to keep this interface decoupled from the NIO code.
*/
Reachability getReachability(Object connectionInfo);
-
+
/*
* Remove a user from a group.
- *
+ *
*/
void removeUserFromGroup(User user, Group group);
-
+
/*
* Remove a user and associated group memberships
- *
+ *
*/
- void removeUser(User user);
-
+ void removeUser(User user);
+
/*
- * Update group membership for a user, comparing the current group membership set with the provided group set.
- *
+ * Update group membership for a user, comparing the current group membership set with the provided group set.
+ *
*/
void updateGroups(User user, Set groups);
-
+
/*
* Get the set of groups that set a but not in set b
- *
+ *
*/
Set getGroupDiff(Set a, Set b);
-
+
/*
* Generate a Set from a list of group names.
*/
@@ -118,12 +118,12 @@ public interface GroupManager {
* Finds a Set from a list of existing groups.
*/
Set findGroups(List groupNames);
-
+
/*
* Explicity track user by connectionId
*/
void putUserByConnectionId(User user, String connectionId);
-
+
/*
* Set subscription for a user
*/
@@ -133,7 +133,7 @@ public interface GroupManager {
* Get subscription for a user
*/
RemoteSubscription getSubscriptionForUser(User user);
-
+
/*
* Register an authenticator
*/
@@ -151,10 +151,9 @@ public interface GroupManager {
/**
* Searches ldap groups (e.g., to help user configure items that require a group distinguished name reference)
- *
+ *
* @param groupNameFilter String optional filter applied to the LDAP cn attribute
- * @return List instances
- * @throws RemoteException
+ * @return List of @See LdapGroup instances
*/
List searchGroups(String groupNameFilter, boolean exactMatch);
@@ -165,7 +164,7 @@ public interface GroupManager {
LdapUser searchUser(String username);
String getGroupPrefix();
-
+
/*
* Make a copy of this user and its current group membership, so that it be independently managed. The copied user will be assigned a randomly generated connection id.
*/
@@ -173,13 +172,13 @@ public interface GroupManager {
/*
* Fill in Group details using the configured cache / DB
- *
+ *
*/
Group hydrateGroup(Group group);
-
+
/*
* make an LDAP connection to the configured server
- *
+ *
*/
DirContext connectLdap();
@@ -194,10 +193,10 @@ public interface GroupManager {
*
*/
boolean testLdap();
-
+
/*
- * Get the representation of a group vector, as a NavigableSet data structure
- *
+ * Get the representation of a group vector, as a NavigableSet data structure
+ *
*/
NavigableSet groupVectorToGroupSet(String groupVector);
diff --git a/src/takserver-common/src/main/java/com/bbn/marti/remote/service/RetentionQueryService.java b/src/takserver-common/src/main/java/com/bbn/marti/remote/service/RetentionQueryService.java
index bca0dafe..7c865f7c 100644
--- a/src/takserver-common/src/main/java/com/bbn/marti/remote/service/RetentionQueryService.java
+++ b/src/takserver-common/src/main/java/com/bbn/marti/remote/service/RetentionQueryService.java
@@ -5,6 +5,8 @@
import org.dom4j.Element;
+import com.bbn.marti.sync.model.Mission;
+
public interface RetentionQueryService {
void deleteMissionByExpiration(Long ttl);
@@ -17,4 +19,6 @@ public interface RetentionQueryService {
boolean restoreMission(Map files, Map properties, List groups, String defaultRole, List defaultPermissions);
void restoreCoT(String missionName, List files, List groups);
void restoreContent(String missionName, byte[] file, Element missionContent, List groups) throws Exception;
+ public List getAllMissions(boolean passwordProtected, boolean defaultRole, String tool) throws Exception;
+
}
diff --git a/src/takserver-common/src/main/java/tak/server/cache/TakIgniteSpringCacheManager.java b/src/takserver-common/src/main/java/tak/server/cache/TakIgniteSpringCacheManager.java
index 6b034d1f..5a32a499 100644
--- a/src/takserver-common/src/main/java/tak/server/cache/TakIgniteSpringCacheManager.java
+++ b/src/takserver-common/src/main/java/tak/server/cache/TakIgniteSpringCacheManager.java
@@ -5,6 +5,9 @@
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
+import javax.cache.expiry.Duration;
+import javax.cache.expiry.TouchedExpiryPolicy;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
@@ -80,7 +83,11 @@ public TakIgniteSpringCacheManager(Ignite ignite) {
if (cache == null) {
CacheConfiguration