Skip to content

Commit

Permalink
Merge pull request #112 from Burning1020/bugfix-task
Browse files Browse the repository at this point in the history
task: some bugfixs
  • Loading branch information
flyflypeng authored Jan 24, 2024
2 parents 9877880 + 256a492 commit ac2c98a
Show file tree
Hide file tree
Showing 4 changed files with 177 additions and 47 deletions.
8 changes: 4 additions & 4 deletions vmm/sandbox/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 4 additions & 4 deletions vmm/task/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

191 changes: 154 additions & 37 deletions vmm/task/src/container.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@ limitations under the License.
*/

use std::{
convert::TryFrom, os::unix::prelude::ExitStatusExt, path::Path, process::ExitStatus, sync::Arc,
convert::TryFrom, io::SeekFrom, os::unix::prelude::ExitStatusExt, path::Path,
process::ExitStatus, sync::Arc,
};

use async_trait::async_trait;
Expand Down Expand Up @@ -47,8 +48,8 @@ use oci_spec::runtime::{LinuxResources, Process, Spec};
use runc::{options::GlobalOpts, Runc, Spawner};
use serde::Deserialize;
use tokio::{
fs::File,
io::{AsyncBufReadExt, AsyncRead, AsyncReadExt, BufReader},
fs::{remove_file, File},
io::{AsyncBufReadExt, AsyncRead, AsyncReadExt, AsyncSeekExt, BufReader},
process::Command,
sync::Mutex,
};
Expand Down Expand Up @@ -245,28 +246,67 @@ impl KuasarFactory {
}

// runtime_error will read the OCI runtime logfile retrieving OCI runtime error
pub async fn runtime_error(bundle: &str, e: runc::error::Error, msg: &str) -> Error {
pub async fn runtime_error(bundle: &str, r_err: runc::error::Error, msg: &str) -> Error {
match get_last_runtime_error(bundle).await {
Err(e) => other!(
"{}: unable to retrieve OCI runtime error ({}): {}",
msg,
e,
r_err
),
Ok(rt_msg) => {
if rt_msg.is_empty() {
other!("{}: empty msg in log file: {}", msg, r_err)
} else {
other!("{}: {}", msg, rt_msg)
}
}
}
}

async fn get_last_runtime_error(bundle: &str) -> Result<String> {
let log_path = Path::new(bundle).join("log.json");
let mut rt_msg = String::new();
match File::open(Path::new(bundle).join("log.json")).await {
Err(err) => other!("{}: unable to open OCI runtime log file){}", msg, err),
match File::open(log_path).await {
Err(e) => Err(other!("unable to open OCI runtime log file: {}", e)),
Ok(file) => {
let mut lines = BufReader::new(file).lines();
while let Ok(Some(line)) = lines.next_line().await {
// Retrieve the last runtime error
match serde_json::from_str::<Log>(&line) {
Err(err) => return other!("{}: unable to parse log msg: {}", msg, err),
Ok(log) => {
if log.level == "error" {
rt_msg = log.msg.trim().to_string();
let mut reader = BufReader::new(file);
let file_size = reader
.seek(SeekFrom::End(0))
.await
.map_err(other_error!(e, "error seek from start"))?;

let mut pre_buffer: Option<Vec<u8>> = None;
let mut buffer = Vec::new();

for offset in (0..file_size).rev() {
if offset == 0 {
break;
}
reader
.seek(SeekFrom::Start(offset))
.await
.map_err(other_error!(e, "error seek"))?;
let result = reader
.read_until(b'\n', &mut buffer)
.await
.map_err(other_error!(e, "reading from cursor fail"))?;
if result == 1 && pre_buffer.is_some() {
let line = String::from_utf8_lossy(&pre_buffer.unwrap()).into_owned();
match serde_json::from_str::<Log>(&line) {
Err(e) => return Err(other!("unable to parse log msg({}): {}", line, e)),
Ok(log) => {
if log.level == "error" {
rt_msg = log.msg.trim().to_string();
break;
}
}
}
}
pre_buffer = Some(buffer.clone());
buffer.clear();
}
if !rt_msg.is_empty() {
other!("{}: {}", msg, rt_msg)
} else {
other!("{}: (no OCI runtime error in logfile) {}", msg, e)
}
Ok(rt_msg)
}
}
}
Expand Down Expand Up @@ -298,17 +338,17 @@ impl ProcessFactory<ExecProcess> for KuasarExecFactory {
spec: p,
exit_signal: Default::default(),
}),
stdin: Arc::new(Mutex::new(None)),
})
}
}

#[async_trait]
impl ProcessLifecycle<InitProcess> for KuasarInitLifecycle {
async fn start(&self, p: &mut InitProcess) -> containerd_shim::Result<()> {
self.runtime
.start(p.id.as_str())
.await
.map_err(other_error!(e, "failed start"))?;
if let Err(e) = self.runtime.start(p.id.as_str()).await {
return Err(runtime_error(&p.lifecycle.bundle, e, "OCI runtime start failed").await);
}
p.state = Status::RUNNING;
Ok(())
}
Expand All @@ -319,31 +359,37 @@ impl ProcessLifecycle<InitProcess> for KuasarInitLifecycle {
signal: u32,
all: bool,
) -> containerd_shim::Result<()> {
self.runtime
if let Err(r_err) = self
.runtime
.kill(
p.id.as_str(),
signal,
Some(&runc::options::KillOpts { all }),
)
.await
.map_err(|e| check_kill_error(e.to_string()))
{
let e = runtime_error(&p.lifecycle.bundle, r_err, "OCI runtime kill failed").await;

return Err(check_kill_error(e.to_string()));
}
Ok(())
}

async fn delete(&self, p: &mut InitProcess) -> containerd_shim::Result<()> {
self.runtime
if let Err(e) = self
.runtime
.delete(
p.id.as_str(),
Some(&runc::options::DeleteOpts { force: true }),
)
.await
.or_else(|e| {
if !e.to_string().to_lowercase().contains("does not exist") {
Err(e)
} else {
Ok(())
}
})
.map_err(other_error!(e, "failed delete"))?;
{
if !e.to_string().to_lowercase().contains("does not exist") {
return Err(
runtime_error(&p.lifecycle.bundle, e, "OCI runtime delete failed").await,
);
}
}
self.exit_signal.signal();
Ok(())
}
Expand Down Expand Up @@ -416,7 +462,8 @@ impl KuasarInitLifecycle {
impl ProcessLifecycle<ExecProcess> for KuasarExecLifecycle {
async fn start(&self, p: &mut ExecProcess) -> containerd_shim::Result<()> {
rescan_pci_bus().await?;
let pid_path = Path::new(self.bundle.as_str()).join(format!("{}.pid", &p.id));
let bundle = self.bundle.to_string();
let pid_path = Path::new(&bundle).join(format!("{}.pid", &p.id));
let mut exec_opts = runc::options::ExecOpts {
io: None,
pid_file: Some(pid_path.to_owned()),
Expand All @@ -441,7 +488,7 @@ impl ProcessLifecycle<ExecProcess> for KuasarExecLifecycle {
if let Some(s) = socket {
s.clean().await;
}
return Err(other!("failed to start runc exec: {}", e));
return Err(runtime_error(&bundle, e, "OCI runtime exec failed").await);
}
copy_io_or_console(p, socket, pio, p.lifecycle.exit_signal.clone()).await?;
let pid = read_file_to_str(pid_path).await?.parse::<i32>()?;
Expand Down Expand Up @@ -472,8 +519,10 @@ impl ProcessLifecycle<ExecProcess> for KuasarExecLifecycle {
}
}

async fn delete(&self, _p: &mut ExecProcess) -> containerd_shim::Result<()> {
async fn delete(&self, p: &mut ExecProcess) -> Result<()> {
self.exit_signal.signal();
let exec_pid_path = Path::new(self.bundle.as_str()).join(format!("{}.pid", p.id));
remove_file(exec_pid_path).await.unwrap_or_default();
Ok(())
}

Expand Down Expand Up @@ -613,3 +662,71 @@ pub fn check_kill_error(emsg: String) -> Error {
other!("unknown error after kill {}", emsg)
}
}

#[cfg(test)]
mod tests {
use std::path::Path;

use containerd_shim::util::{mkdir, write_str_to_file};
use tokio::fs::remove_dir_all;

use crate::container::runtime_error;

#[tokio::test]
async fn test_runtime_error_with_logfile() {
let empty_err = runc::error::Error::NotFound;
let log_json = "\
{\"level\":\"info\",\"msg\":\"hello word\",\"time\":\"2022-11-25\"}\n\
{\"level\":\"error\",\"msg\":\"failed error\",\"time\":\"2022-11-26\"}\n\
{\"level\":\"error\",\"msg\":\"panic\",\"time\":\"2022-11-27\"}\n\
{\"level\":\"info\",\"msg\":\"program exit\",\"time\":\"2024-1-24\"}\n\
";
let test_dir = "/tmp/kuasar-test_runtime_error_with_logfile";
let _ = mkdir(test_dir, 0o711).await;
let test_log_file = Path::new(test_dir).join("log.json");
write_str_to_file(test_log_file.as_path(), log_json)
.await
.expect("write log json should not be error");

let expected_msg = "panic";
let actual_err = runtime_error(
test_dir,
empty_err,
"test_runtime_error_with_logfile failed",
)
.await;
remove_dir_all(test_dir).await.expect("remove test dir");
assert!(
actual_err.to_string().contains(expected_msg),
"actual error \"{}\" should contains \"{}\"",
actual_err.to_string(),
expected_msg
);
}

#[tokio::test]
async fn test_runtime_error_without_logfile() {
let empty_err = runc::error::Error::NotFound;
let test_dir = "/tmp/kuasar-test_runtime_error_without_logfile";
let _ = remove_dir_all(test_dir).await;
assert!(
!Path::new(test_dir).exists(),
"{} should not exist",
test_dir
);

let expected_msg = "Unable to locate the runc";
let actual_err = runtime_error(
test_dir,
empty_err,
"test_runtime_error_without_logfile failed",
)
.await;
assert!(
actual_err.to_string().contains(expected_msg),
"actual error \"{}\" should contains \"{}\"",
actual_err.to_string(),
expected_msg
);
}
}
17 changes: 15 additions & 2 deletions vmm/task/src/io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ pub(crate) async fn copy_io_or_console<P>(
) -> Result<()> {
if p.stdio.terminal {
if let Some(console_socket) = socket {
let console_result = copy_console(&console_socket, &p.stdio, exit_signal).await;
let console_result = copy_console(p, &console_socket, &p.stdio, exit_signal).await;
console_socket.clean().await;
match console_result {
Ok(c) => {
Expand All @@ -144,7 +144,8 @@ pub(crate) async fn copy_io_or_console<P>(
Ok(())
}

async fn copy_console(
async fn copy_console<P>(
p: &ProcessTemplate<P>,
console_socket: &ConsoleSocket,
stdio: &Stdio,
exit_signal: Arc<ExitSignal>,
Expand All @@ -155,6 +156,18 @@ async fn copy_console(
let f = unsafe { File::from_raw_fd(fd) };
if !stdio.stdin.is_empty() {
debug!("copy_console: pipe stdin to console");

let stdin_clone = stdio.stdin.clone();
let stdin_w = p.stdin.clone();
// open the write side to make sure read side unblock, as open write side
// will block too, open it in another thread
tokio::spawn(async move {
if let Ok(stdin_file) = OpenOptions::new().write(true).open(stdin_clone).await {
let mut lock_guard = stdin_w.lock().await;
*lock_guard = Some(stdin_file);
}
});

let console_stdin = f
.try_clone()
.await
Expand Down

0 comments on commit ac2c98a

Please sign in to comment.