Skip to content

Commit

Permalink
Merge pull request #2 from crystal-china/fix_log
Browse files Browse the repository at this point in the history
Fix log
  • Loading branch information
zw963 authored Oct 27, 2024
2 parents 7a61fa2 + 082d64c commit 1890494
Show file tree
Hide file tree
Showing 11 changed files with 70 additions and 55 deletions.
3 changes: 3 additions & 0 deletions .ameba.yml
Original file line number Diff line number Diff line change
Expand Up @@ -26,3 +26,6 @@ Metrics/CyclomaticComplexity:
MaxComplexity: 10
Enabled: false
Severity: Warning

Naming/BlockParameterName:
Enabled: false
4 changes: 4 additions & 0 deletions scripts/test_log.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
while true; do
echo "$(date) - Hello, World!" 1>&2
sleep 1
done
2 changes: 1 addition & 1 deletion shard.lock
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,5 @@ version: 2.0
shards:
ameba:
git: https://github.com/crystal-ameba/ameba.git
version: 1.4.3
version: 1.6.3

1 change: 0 additions & 1 deletion shard.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ targets:
development_dependencies:
ameba:
github: crystal-ameba/ameba
version: ~> 1.4.0

crystal: 1.6.2

Expand Down
5 changes: 0 additions & 5 deletions src/procodile/commands/start_command.cr
Original file line number Diff line number Diff line change
Expand Up @@ -108,11 +108,6 @@ module Procodile
if @options.start_supervisor == false
raise Error.new "Supervisor is not running and cannot be started because --no-supervisor is set"
else
# 这里调用了 Procodile::Supervisor.new, 并传入所有参数给 start 方法。
# Procodile::Supervisor.new(@config, @option).start do
# ...
# end

self.class.start_supervisor(@config, @options) do |supervisor|
unless @options.start_processes == false
supervisor.start_processes(
Expand Down
4 changes: 2 additions & 2 deletions src/procodile/commands/stop_command.cr
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,9 @@ module Procodile
if @options.wait_until_supervisor_stopped
puts "Waiting for supervisor to stop..."
loop do
sleep 1
sleep 1.second
if supervisor_running?
sleep 1
sleep 1.second
else
puts "Supervisor has stopped"
exit 0
Expand Down
38 changes: 24 additions & 14 deletions src/procodile/control_server.cr
Original file line number Diff line number Diff line change
Expand Up @@ -3,29 +3,39 @@ require "./control_session"

module Procodile
class ControlServer
def self.start(supervisor)
sock_path = supervisor.config.sock_path
@supervisor : Procodile::Supervisor

spawn do
server = UNIXServer.new(sock_path)
def initialize(@supervisor)
end

Procodile.log nil, "control", "Listening at #{sock_path}"
def listen
sock_path = @supervisor.config.sock_path

loop do
client = server.accept
session = ControlSession.new(supervisor, client)
server = UNIXServer.new(sock_path)

while (line = client.gets)
if (response = session.receive_data(line.strip))
client.puts response
end
end
Procodile.log nil, "control", "Listening at #{sock_path}"

loop do
client = server.accept
session = ControlSession.new(@supervisor, client)

client.close
while (line = client.gets)
if (response = session.receive_data(line.strip))
client.puts response
end
end

client.close
end
ensure
FileUtils.rm_rf(sock_path.not_nil!)
end

def self.start(supervisor)
spawn do
socket = self.new(supervisor)
socket.listen
end
end
end
end
3 changes: 2 additions & 1 deletion src/procodile/instance.cr
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,7 @@ module Procodile
@pid = process.pid

log_destination.close

File.write(pid_file_path, "#{@pid}\n")
@supervisor.add_instance(self, io)

Expand Down Expand Up @@ -270,7 +271,7 @@ module Procodile

spawn do
while running?
sleep 0.5
sleep 0.5.seconds
end
new_instance.start
end
Expand Down
4 changes: 2 additions & 2 deletions src/procodile/process.cr
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ module Procodile
#
# Increase the instance index and return
#
def get_instance_id : Int32
def instance_id : Int32
MUTEX.synchronize do
@instance_index = 0 if @instance_index == 10000
@instance_index += 1
Expand Down Expand Up @@ -170,7 +170,7 @@ module Procodile
# :pid => 410794,
# }

Instance.new(supervisor, self, get_instance_id)
Instance.new(supervisor, self, instance_id)
end

#
Expand Down
13 changes: 12 additions & 1 deletion src/procodile/signal_handler.cr
Original file line number Diff line number Diff line change
@@ -1,6 +1,13 @@
# 信号处理程序
#
# 当一个信号被拦截后,首先,这个信号会被加入一个 QUEUE

module Procodile
class SignalHandler
QUEUE = [] of Signal
# 保存用户发送的信号.
QUEUE = [] of Signal

# 允许的信号
SIGNALS = {
Signal::TERM,
Signal::USR1,
Expand Down Expand Up @@ -33,6 +40,9 @@ module Procodile
end
end

# 关联信号和处理函数
#
# 这个在 SignalHandler 对象创建之后,被手动调用
def register(signal : Signal, &block)
@handlers[signal] ||= [] of Proc(Nil)
@handlers[signal] << block
Expand All @@ -42,6 +52,7 @@ module Procodile
@pipe[:writer].write(".".to_slice)
end

# 运行拦截的信号对应的处理函数
def handle
if (signal = QUEUE.shift?)
Procodile.log nil, "system", "Supervisor received #{signal} signal"
Expand Down
48 changes: 20 additions & 28 deletions src/procodile/supervisor.cr
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,10 @@ module Procodile
class Supervisor
@tag : String?
getter config, processes, started_at, tag, run_options
property readers = {} of IO::FileDescriptor => Procodile::Instance

def initialize(@config : Procodile::Config, @run_options = Procodile::RunOptions.new)
@processes = {} of Procodile::Process => Array(Procodile::Instance)
@readers = {} of IO::FileDescriptor => Procodile::Instance

@signal_handler = SignalHandler.new
@signal_handler.register(Signal::TERM) { stop_supervisor }
Expand All @@ -35,19 +35,17 @@ module Procodile

ControlServer.start(self)

watch_for_output

@started_at = Time.local

after_start.call(self) # invoke supervisor.start_processes

watch_for_output

@started_at = Time.local
rescue e
Procodile.log nil, "system", "Error: #{e.class} (#{e.message})"
e.backtrace.each { |bt| Procodile.log nil, "system", "=> #{bt})" }
stop(SupervisorOptions.new(stop_supervisor: true))
ensure
loop { supervise; sleep 3 }
loop { supervise; sleep 3.seconds }
end

def start_processes(process_names : Array(String)?, options = SupervisorOptions.new) : Array(Procodile::Instance)
Expand Down Expand Up @@ -219,7 +217,7 @@ module Procodile
end

def add_reader(instance, io)
@readers[io] = instance
readers[io] = instance
@signal_handler.notice
end

Expand All @@ -234,49 +232,42 @@ module Procodile
def remove_instance(instance)
if @processes[instance.process]
@processes[instance.process].delete(instance)
@readers.delete(instance)
readers.delete(instance)
end
end

private def watch_for_output
sleep_chan = Channel(Nil).new
signal_handler_chan = Channel(Nil).new
listener_chan = Channel(Nil).new

spawn do
loop do
sleep 30
@signal_handler.handle
sleep_chan.send nil
end
end

spawn do
loop do
@signal_handler.handle
@signal_handler.pipe[:reader].read(Bytes.new(999)) rescue nil
signal_handler_chan.send nil
end
end

@readers.keys.each do |reader|
spawn do
buffer = {} of IO::FileDescriptor => String
buffer = {} of IO::FileDescriptor => String

readers.keys.each do |reader|
spawn do
loop do
@signal_handler.handle
Fiber.yield

if reader.read_byte.nil?
reader.close
if reader.closed?
buffer.delete(reader)
@readers.delete(reader)
readers.delete(reader)
else
str = reader.gets

next if str.nil?

buffer[reader] ||= ""
buffer[reader] += reader.gets_to_end
buffer[reader] += "#{str.chomp}\n"

while buffer[reader].index("\n")
line, buffer[reader] = buffer[reader].split("\n", 2)
if (instance = @readers[reader])
if (instance = readers[reader])
Procodile.log instance.process.log_color, instance.description, "=> ".color(instance.process.log_color) + line
else
Procodile.log nil, "unknown", buffer[reader]
Expand All @@ -291,8 +282,9 @@ module Procodile

spawn do
loop do
@signal_handler.handle

select
when sleep_chan.receive
when signal_handler_chan.receive
when listener_chan.receive
end
Expand Down

0 comments on commit 1890494

Please sign in to comment.