Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat: biyooon, adding using zenohex #12

Merged
merged 13 commits into from
Dec 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions config/config.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
import Config

config :giocci_relay_zenoh, :system_variables,
my_node_name: "relay1",
engine_node_name: ["engine1", "engine2", "engine3", "engine4", "engine5"],
client_node_name: "client1"

config :giocci_relay, :system_variables,
node_name: "relay",
node_ipaddr: "127.0.0.1",
cookie: "idkp",
inet_dist_listen_min: "9100",
inet_dist_listen_max: "9155",
my_process_name: "{:global, :relay}",
node_engine_name: "{:global, :engine}",
rpc_engine_name: "engine@127.0.0.1"

# import_config "#{config_env()}.exs"
13 changes: 10 additions & 3 deletions lib/giocci_relay/application.ex
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,16 @@ defmodule GiocciRelay.Application do

@impl true
def start(_type, _args) do
{my_process_name, _} = System.get_env("MY_PROCESS_NAME") |> Code.eval_string()
{node_engine_name, _} = System.get_env("NODE_ENGINE_NAME") |> Code.eval_string()
rpc_engine_name = System.get_env("RPC_ENGINE_NAME") |> String.to_atom()
{my_process_name, _} =
Application.get_env(:giocci_relay, :system_variables)[:my_process_name]
|> Code.eval_string()

{node_engine_name, _} =
Application.get_env(:giocci_relay, :system_variables)[:node_engine_name]
|> Code.eval_string()

rpc_engine_name =
Application.get_env(:giocci_relay, :system_variables)[:rpc_engine_name] |> String.to_atom()

children = [
# Starts a worker by calling: GiocciRelay.Worker.start_link(arg)
Expand Down
160 changes: 160 additions & 0 deletions lib/giocci_relay_zenoh.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
defmodule GiocciRelayZenoh do
@moduledoc """
## Examples

iex> GiocciRelayZenoh.setup_relay()

"""

use GenServer
require Logger
use Application

@doc """
最初に指定された数のEngineノードとのZenohコネクションを作成する(clientは一個想定
"""
def setup_relay() do
create_session(Application.get_env(:giocci_relay_zenoh, :system_variables)[:engine_node_name])
end

def start_link(engine_name) do
relay_name = Application.get_env(:giocci_relay_zenoh, :system_variables)[:my_node_name]
client_name = Application.get_env(:giocci_relay_zenoh, :system_variables)[:client_node_name]
## RelayのZenohセッションを起動
{:ok, session} = Zenohex.open()
## pub,subそれぞれのキーをたてる
{:ok, subscriber1} =
Zenohex.Session.declare_subscriber(session, "from/" <> engine_name <> "/to/" <> relay_name)

{:ok, publisher1} =
Zenohex.Session.declare_publisher(session, "from/" <> relay_name <> "/to/" <> client_name)

{:ok, subscriber2} =
Zenohex.Session.declare_subscriber(session, "from/" <> client_name <> "/to/" <> relay_name)

{:ok, publisher2} =
Zenohex.Session.declare_publisher(session, "from/" <> relay_name <> "/to/" <> engine_name)

id_string = client_name <> engine_name
## 状態として次の状態をもつ
state = %{
publisher_relay2client: publisher1,
subscriber_engine2relay: subscriber1,
callback_engine2relay: &callback_fromengine/2,
publisher_relay2engine: publisher2,
subscriber_client2relay: subscriber2,
callback_client2relay: &callback_fromclient/2,
id: String.to_atom(id_string),
session: session
}

## 上記の状態を保存する用のGenServerの起動
GenServer.start_link(__MODULE__, state, name: String.to_atom(id_string))
Logger.info("from/" <> relay_name <> "/to/" <> engine_name)
## subの開始
subscriber_loop_engine2relay(state)
subscriber_loop_client2relay(state)
{:ok, state}
end

## Clientから送られたデータを解析して、やりたい動作ごとに割り振るコールバック関数
def callback_fromclient(state, message) do
%{
key_expr: erkey,
value: message_intermediate,
kind: kind,
reference: reference
} = message

## msgをバイナリからlistにもどす
message_readable =
message_intermediate
|> String.trim()
|> Base.decode64!()
|> :erlang.binary_to_term()

case message_readable do
## module_execの場合
[_, _, _, :module_exec] = message_readable ->
Zenohex.Publisher.put(state.publisher_relay2engine, message_intermediate)

## module_saveの場合
[_, :module_save] = message_readable ->
Zenohex.Publisher.put(state.publisher_relay2engine, message_intermediate)

_ = message_readable ->
Logger.error(inspect("no match"))
end
end

## Engineから送られたメッセージを抽出し、Clientに返送
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@fomfom 関数がどう動くかのコメントは @doc 使いましょう(他にも多数)

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

これはコールバック関数なので@docにしないものだと認識しています

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@fomfom あーーー 自分で定義したコールバック関数だったんですね

コールバック関数だから @doc にしないのでなく、できるなら @doc にしてください

自分定義のコールバック関数したことないのでやってみないと分からんのですけど、きっと自分定義の一意の関数名だったら @doc 使えるんじゃないかと思ってます

def callback_fromengine(state, message) do
%{
key_expr: erkey,
value: message_intermediate,
kind: kind,
reference: reference
} = message

Zenohex.Publisher.put(state.publisher_relay2client, message_intermediate)
end

## subをループするhandle info
def handle_info(:loop_engine2relay, state) do
subscriber_loop_engine2relay(state)
{:noreply, state}
end

## subをループするhandle info
def handle_info(:loop_client2relay, state) do
subscriber_loop_client2relay(state)
{:noreply, state}
end

defp create_session([]) do
:ok
end

## セッションを作る関数
defp create_session(engine_list) do
[engine_name | tail] = engine_list
start_link(engine_name)
create_session(tail)
end

## subを永続化する関数
defp subscriber_loop_engine2relay(state) do
case Zenohex.Subscriber.recv_timeout(state.subscriber_engine2relay, 10_000) do
{:ok, sample} ->
state.callback_engine2relay.(state, sample)
send(state.id, :loop_engine2relay)

{:error, :timeout} ->
send(state.id, :loop_engine2relay)

{:error, error} ->
Logger.error(inspect(error))

{_, _} ->
Logger.error("unexpected error")
end
end

## subを永続化する関数
defp subscriber_loop_client2relay(state) do
case Zenohex.Subscriber.recv_timeout(state.subscriber_client2relay, 10_000) do
{:ok, sample} ->
state.callback_client2relay.(state, sample)
send(state.id, :loop_client2relay)

{:error, :timeout} ->
send(state.id, :loop_client2relay)

{:error, error} ->
Logger.error(inspect(error))

{_, _} ->
Logger.error("unexpected error")
end
end
end
1 change: 1 addition & 0 deletions mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ defmodule GiocciRelay.MixProject do
# Run "mix help deps" to learn about dependencies.
defp deps do
[
{:zenohex, "~>0.3.2"}
# {:dep_from_hexpm, "~> 0.3.0"},
# {:dep_from_git, git: "https://github.com/elixir-lang/my_dep.git", tag: "0.1.0"}
]
Expand Down
6 changes: 6 additions & 0 deletions mix.lock
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
%{
"castore": {:hex, :castore, "1.0.10", "43bbeeac820f16c89f79721af1b3e092399b3a1ecc8df1a472738fd853574911", [:mix], [], "hexpm", "1b0b7ea14d889d9ea21202c43a4fa015eb913021cb535e8ed91946f4b77a8848"},
"rustler_precompiled": {:hex, :rustler_precompiled, "0.8.2", "5f25cbe220a8fac3e7ad62e6f950fcdca5a5a5f8501835d2823e8c74bf4268d5", [:mix], [{:castore, "~> 0.1 or ~> 1.0", [hex: :castore, repo: "hexpm", optional: false]}, {:rustler, "~> 0.23", [hex: :rustler, repo: "hexpm", optional: true]}], "hexpm", "63d1bd5f8e23096d1ff851839923162096364bac8656a4a3c00d1fff8e83ee0a"},
"toml": {:hex, :toml, "0.7.0", "fbcd773caa937d0c7a02c301a1feea25612720ac3fa1ccb8bfd9d30d822911de", [:mix], [], "hexpm", "0690246a2478c1defd100b0c9b89b4ea280a22be9a7b313a8a058a2408a2fa70"},
"zenohex": {:hex, :zenohex, "0.3.2", "d94f7ec91f8163508d63fe040f55178a6c4150d90bed9052ee8fb9da157d6175", [:mix], [{:rustler, "0.34.0", [hex: :rustler, repo: "hexpm", optional: true]}, {:rustler_precompiled, "~> 0.8.2", [hex: :rustler_precompiled, repo: "hexpm", optional: false]}, {:toml, "~> 0.7", [hex: :toml, repo: "hexpm", optional: false]}], "hexpm", "c0f44c0a46b5e8463c1de12339c0913f1fab903c1d1b844ff4fad1a9436a8dd7"},
}
Loading