Skip to content

Commit

Permalink
Merge pull request #12 from biyooon-ex/biyooon-keiya
Browse files Browse the repository at this point in the history
Feat: biyooon, adding using zenohex
  • Loading branch information
takasehideki authored Dec 30, 2024
2 parents 22c97eb + 8512b8c commit 0b3e6df
Show file tree
Hide file tree
Showing 5 changed files with 195 additions and 3 deletions.
18 changes: 18 additions & 0 deletions config/config.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
import Config

config :giocci_relay_zenoh, :system_variables,
my_node_name: "relay1",
engine_node_name: ["engine1", "engine2", "engine3", "engine4", "engine5"],
client_node_name: "client1"

config :giocci_relay, :system_variables,
node_name: "relay",
node_ipaddr: "127.0.0.1",
cookie: "idkp",
inet_dist_listen_min: "9100",
inet_dist_listen_max: "9155",
my_process_name: "{:global, :relay}",
node_engine_name: "{:global, :engine}",
rpc_engine_name: "engine@127.0.0.1"

# import_config "#{config_env()}.exs"
13 changes: 10 additions & 3 deletions lib/giocci_relay/application.ex
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,16 @@ defmodule GiocciRelay.Application do

@impl true
def start(_type, _args) do
{my_process_name, _} = System.get_env("MY_PROCESS_NAME") |> Code.eval_string()
{node_engine_name, _} = System.get_env("NODE_ENGINE_NAME") |> Code.eval_string()
rpc_engine_name = System.get_env("RPC_ENGINE_NAME") |> String.to_atom()
{my_process_name, _} =
Application.get_env(:giocci_relay, :system_variables)[:my_process_name]
|> Code.eval_string()

{node_engine_name, _} =
Application.get_env(:giocci_relay, :system_variables)[:node_engine_name]
|> Code.eval_string()

rpc_engine_name =
Application.get_env(:giocci_relay, :system_variables)[:rpc_engine_name] |> String.to_atom()

children = [
# Starts a worker by calling: GiocciRelay.Worker.start_link(arg)
Expand Down
160 changes: 160 additions & 0 deletions lib/giocci_relay_zenoh.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
defmodule GiocciRelayZenoh do
@moduledoc """
## Examples
iex> GiocciRelayZenoh.setup_relay()
"""

use GenServer
require Logger
use Application

@doc """
最初に指定された数のEngineノードとのZenohコネクションを作成する(clientは一個想定
"""
def setup_relay() do
create_session(Application.get_env(:giocci_relay_zenoh, :system_variables)[:engine_node_name])
end

def start_link(engine_name) do
relay_name = Application.get_env(:giocci_relay_zenoh, :system_variables)[:my_node_name]
client_name = Application.get_env(:giocci_relay_zenoh, :system_variables)[:client_node_name]
## RelayのZenohセッションを起動
{:ok, session} = Zenohex.open()
## pub,subそれぞれのキーをたてる
{:ok, subscriber1} =
Zenohex.Session.declare_subscriber(session, "from/" <> engine_name <> "/to/" <> relay_name)

{:ok, publisher1} =
Zenohex.Session.declare_publisher(session, "from/" <> relay_name <> "/to/" <> client_name)

{:ok, subscriber2} =
Zenohex.Session.declare_subscriber(session, "from/" <> client_name <> "/to/" <> relay_name)

{:ok, publisher2} =
Zenohex.Session.declare_publisher(session, "from/" <> relay_name <> "/to/" <> engine_name)

id_string = client_name <> engine_name
## 状態として次の状態をもつ
state = %{
publisher_relay2client: publisher1,
subscriber_engine2relay: subscriber1,
callback_engine2relay: &callback_fromengine/2,
publisher_relay2engine: publisher2,
subscriber_client2relay: subscriber2,
callback_client2relay: &callback_fromclient/2,
id: String.to_atom(id_string),
session: session
}

## 上記の状態を保存する用のGenServerの起動
GenServer.start_link(__MODULE__, state, name: String.to_atom(id_string))
Logger.info("from/" <> relay_name <> "/to/" <> engine_name)
## subの開始
subscriber_loop_engine2relay(state)
subscriber_loop_client2relay(state)
{:ok, state}
end

## Clientから送られたデータを解析して、やりたい動作ごとに割り振るコールバック関数
def callback_fromclient(state, message) do
%{
key_expr: erkey,
value: message_intermediate,
kind: kind,
reference: reference
} = message

## msgをバイナリからlistにもどす
message_readable =
message_intermediate
|> String.trim()
|> Base.decode64!()
|> :erlang.binary_to_term()

case message_readable do
## module_execの場合
[_, _, _, :module_exec] = message_readable ->
Zenohex.Publisher.put(state.publisher_relay2engine, message_intermediate)

## module_saveの場合
[_, :module_save] = message_readable ->
Zenohex.Publisher.put(state.publisher_relay2engine, message_intermediate)

_ = message_readable ->
Logger.error(inspect("no match"))
end
end

## Engineから送られたメッセージを抽出し、Clientに返送
def callback_fromengine(state, message) do
%{
key_expr: erkey,
value: message_intermediate,
kind: kind,
reference: reference
} = message

Zenohex.Publisher.put(state.publisher_relay2client, message_intermediate)
end

## subをループするhandle info
def handle_info(:loop_engine2relay, state) do
subscriber_loop_engine2relay(state)
{:noreply, state}
end

## subをループするhandle info
def handle_info(:loop_client2relay, state) do
subscriber_loop_client2relay(state)
{:noreply, state}
end

defp create_session([]) do
:ok
end

## セッションを作る関数
defp create_session(engine_list) do
[engine_name | tail] = engine_list
start_link(engine_name)
create_session(tail)
end

## subを永続化する関数
defp subscriber_loop_engine2relay(state) do
case Zenohex.Subscriber.recv_timeout(state.subscriber_engine2relay, 10_000) do
{:ok, sample} ->
state.callback_engine2relay.(state, sample)
send(state.id, :loop_engine2relay)

{:error, :timeout} ->
send(state.id, :loop_engine2relay)

{:error, error} ->
Logger.error(inspect(error))

{_, _} ->
Logger.error("unexpected error")
end
end

## subを永続化する関数
defp subscriber_loop_client2relay(state) do
case Zenohex.Subscriber.recv_timeout(state.subscriber_client2relay, 10_000) do
{:ok, sample} ->
state.callback_client2relay.(state, sample)
send(state.id, :loop_client2relay)

{:error, :timeout} ->
send(state.id, :loop_client2relay)

{:error, error} ->
Logger.error(inspect(error))

{_, _} ->
Logger.error("unexpected error")
end
end
end
1 change: 1 addition & 0 deletions mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ defmodule GiocciRelay.MixProject do
# Run "mix help deps" to learn about dependencies.
defp deps do
[
{:zenohex, "~>0.3.2"}
# {:dep_from_hexpm, "~> 0.3.0"},
# {:dep_from_git, git: "https://github.com/elixir-lang/my_dep.git", tag: "0.1.0"}
]
Expand Down
6 changes: 6 additions & 0 deletions mix.lock
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
%{
"castore": {:hex, :castore, "1.0.10", "43bbeeac820f16c89f79721af1b3e092399b3a1ecc8df1a472738fd853574911", [:mix], [], "hexpm", "1b0b7ea14d889d9ea21202c43a4fa015eb913021cb535e8ed91946f4b77a8848"},
"rustler_precompiled": {:hex, :rustler_precompiled, "0.8.2", "5f25cbe220a8fac3e7ad62e6f950fcdca5a5a5f8501835d2823e8c74bf4268d5", [:mix], [{:castore, "~> 0.1 or ~> 1.0", [hex: :castore, repo: "hexpm", optional: false]}, {:rustler, "~> 0.23", [hex: :rustler, repo: "hexpm", optional: true]}], "hexpm", "63d1bd5f8e23096d1ff851839923162096364bac8656a4a3c00d1fff8e83ee0a"},
"toml": {:hex, :toml, "0.7.0", "fbcd773caa937d0c7a02c301a1feea25612720ac3fa1ccb8bfd9d30d822911de", [:mix], [], "hexpm", "0690246a2478c1defd100b0c9b89b4ea280a22be9a7b313a8a058a2408a2fa70"},
"zenohex": {:hex, :zenohex, "0.3.2", "d94f7ec91f8163508d63fe040f55178a6c4150d90bed9052ee8fb9da157d6175", [:mix], [{:rustler, "0.34.0", [hex: :rustler, repo: "hexpm", optional: true]}, {:rustler_precompiled, "~> 0.8.2", [hex: :rustler_precompiled, repo: "hexpm", optional: false]}, {:toml, "~> 0.7", [hex: :toml, repo: "hexpm", optional: false]}], "hexpm", "c0f44c0a46b5e8463c1de12339c0913f1fab903c1d1b844ff4fad1a9436a8dd7"},
}

0 comments on commit 0b3e6df

Please sign in to comment.