use chrono::{DateTime, Utc}; use global_placeholders::global; use macros_rs::{string, ternary, then}; use pmc::{file, helpers, process::Runner}; use prometheus::{Encoder, TextEncoder}; use psutil::process::{MemoryInfo, Process}; use serde::{Deserialize, Serialize}; use serde_json::json; use std::convert::Infallible; use utoipa::ToSchema; use crate::daemon::{ api::{HTTP_COUNTER, HTTP_REQ_HISTOGRAM}, pid, }; use warp::{ hyper::body::Body, reject, reply::{self, json, Response}, Rejection, Reply, }; use std::{ env, fs::{self, File}, io::{self, BufRead, BufReader}, }; #[allow(dead_code)] #[derive(ToSchema)] #[schema(as = MemoryInfo)] pub(crate) struct DocMemoryInfo { rss: u64, vms: u64, #[cfg(target_os = "linux")] shared: u64, #[cfg(target_os = "linux")] text: u64, #[cfg(target_os = "linux")] data: u64, #[cfg(target_os = "macos")] page_faults: u64, #[cfg(target_os = "macos")] pageins: u64, } #[derive(Deserialize, ToSchema)] pub(crate) struct ActionBody { #[schema(example = "restart")] method: String, } #[derive(Serialize, ToSchema)] pub(crate) struct ActionResponse<'a> { #[schema(example = true)] done: bool, #[schema(example = "name")] action: &'a str, } #[derive(Serialize, ToSchema)] pub(crate) struct LogResponse { logs: Vec, } #[derive(Serialize, ToSchema)] pub struct MetricsRoot<'a> { pub version: Version<'a>, pub daemon: Daemon, } #[derive(Serialize, ToSchema)] pub struct Version<'a> { pub pkg: String, pub hash: &'a str, pub build_date: &'a str, pub target: &'a str, } #[derive(Serialize, ToSchema)] pub struct Daemon { pub pid: String, pub running: bool, pub uptime: String, pub process_count: usize, pub daemon_type: String, pub stats: Stats, } #[derive(Serialize, ToSchema)] pub struct Stats { pub memory_usage: String, pub cpu_percent: String, } #[inline] fn attempt(done: bool, method: &str) -> reply::Json { let data = json!(ActionResponse { done, action: ternary!(done, method, "DOES_NOT_EXIST") }); json(&data) } #[inline] #[utoipa::path(get, tag = "Daemon", path = "/prometheus", responses((status = 200, description = "Get prometheus metrics", body = String)))] pub async fn prometheus_handler() -> Result { let encoder = TextEncoder::new(); let mut buffer = Vec::::new(); let metric_families = prometheus::gather(); encoder.encode(&metric_families, &mut buffer).unwrap(); Ok(format!("{}", String::from_utf8(buffer.clone()).unwrap())) } #[inline] #[utoipa::path(get, path = "/list", tag = "Process", responses((status = 200, description = "List processes successfully", body = [ProcessItem])))] pub async fn list_handler() -> Result { let timer = HTTP_REQ_HISTOGRAM.with_label_values(&["list"]).start_timer(); let data = Runner::new().json(); HTTP_COUNTER.inc(); timer.observe_duration(); Ok(json(&data)) } #[inline] #[utoipa::path(get, tag = "Process", path = "/process/{id}/logs/{kind}", params( ("id" = usize, Path, description = "Process id to get logs for", example = 0), ("kind" = String, Path, description = "Log output type", example = "out") ), responses( (status = 200, description = "Process logs of {type} fetched", body = LogResponse), (status = NOT_FOUND, description = "Process was not found", body = ErrorMessage) ) )] pub async fn log_handler(id: usize, kind: String) -> Result { let timer = HTTP_REQ_HISTOGRAM.with_label_values(&["log"]).start_timer(); HTTP_COUNTER.inc(); match Runner::new().info(id) { Some(item) => { let log_file = match kind.as_str() { "out" | "stdout" => global!("pmc.logs.out", item.name.as_str()), "error" | "stderr" => global!("pmc.logs.error", item.name.as_str()), _ => global!("pmc.logs.out", item.name.as_str()), }; match File::open(log_file) { Ok(data) => { let reader = BufReader::new(data); let logs: Vec = reader.lines().collect::>().unwrap(); timer.observe_duration(); Ok(json(&json!(LogResponse { logs }))) } Err(_) => Ok(json(&json!(LogResponse { logs: vec![] }))), } } None => { timer.observe_duration(); Err(reject::not_found()) } } } #[inline] #[utoipa::path(get, tag = "Process", path = "/process/{id}/logs/{kind}/raw", params( ("id" = usize, Path, description = "Process id to get logs for", example = 0), ("kind" = String, Path, description = "Log output type", example = "out") ), responses( (status = 200, description = "Process logs of {type} fetched raw", body = String), (status = NOT_FOUND, description = "Process was not found", body = ErrorMessage) ) )] pub async fn log_handler_raw(id: usize, kind: String) -> Result { let timer = HTTP_REQ_HISTOGRAM.with_label_values(&["log"]).start_timer(); HTTP_COUNTER.inc(); match Runner::new().info(id) { Some(item) => { let log_file = match kind.as_str() { "out" | "stdout" => global!("pmc.logs.out", item.name.as_str()), "error" | "stderr" => global!("pmc.logs.error", item.name.as_str()), _ => global!("pmc.logs.out", item.name.as_str()), }; let data = match fs::read_to_string(log_file) { Ok(data) => data, Err(err) => err.to_string(), }; timer.observe_duration(); Ok(Response::new(Body::from(data))) } None => { timer.observe_duration(); Err(reject::not_found()) } } } #[inline] #[utoipa::path(get, tag = "Process", path = "/process/{id}/info", params(("id" = usize, Path, description = "Process id to get information for", example = 0)), responses( (status = 200, description = "Current process info retrieved", body = ItemSingle), (status = NOT_FOUND, description = "Process was not found", body = ErrorMessage) ) )] pub async fn info_handler(id: usize) -> Result { let timer = HTTP_REQ_HISTOGRAM.with_label_values(&["info"]).start_timer(); HTTP_COUNTER.inc(); match Runner::new().info(id) { Some(item) => { timer.observe_duration(); Ok(json(&item.clone().json())) } None => { timer.observe_duration(); Err(reject::not_found()) } } } #[inline] #[utoipa::path(post, tag = "Process", path = "/process/{id}/rename", request_body(content = String), params(("id" = usize, Path, description = "Process id to rename", example = 0)), responses( (status = 200, description = "Rename process successful", body = ActionResponse), (status = NOT_FOUND, description = "Process was not found", body = ErrorMessage) ) )] pub async fn rename_handler(id: usize, body: String) -> Result { let timer = HTTP_REQ_HISTOGRAM.with_label_values(&["rename"]).start_timer(); let mut runner = Runner::new(); HTTP_COUNTER.inc(); if runner.exists(id) { let item = runner.get(id); item.rename(body.trim().replace("\n", "")); then!(item.running, item.restart()); timer.observe_duration(); Ok(attempt(true, "rename")) } else { timer.observe_duration(); Err(reject::not_found()) } } #[inline] #[utoipa::path(get, tag = "Process", path = "/process/{id}/env", params(("id" = usize, Path, description = "Process id to fetch env from", example = 0)), responses( (status = 200, description = "Current process env", body = HashMap), (status = NOT_FOUND, description = "Process was not found", body = ErrorMessage) ) )] pub async fn env_handler(id: usize) -> Result { let timer = HTTP_REQ_HISTOGRAM.with_label_values(&["env"]).start_timer(); HTTP_COUNTER.inc(); match Runner::new().info(id) { Some(item) => { timer.observe_duration(); Ok(json(&item.clone().env)) } None => { timer.observe_duration(); Err(reject::not_found()) } } } #[inline] #[utoipa::path(post, tag = "Process", path = "/process/{id}/action", request_body = ActionBody, params(("id" = usize, Path, description = "Process id to run action on", example = 0)), responses( (status = 200, description = "Run action on process successful", body = ActionResponse), (status = NOT_FOUND, description = "Process/action was not found", body = ErrorMessage) ) )] pub async fn action_handler(id: usize, body: ActionBody) -> Result { let timer = HTTP_REQ_HISTOGRAM.with_label_values(&["action"]).start_timer(); let mut runner = Runner::new(); let method = body.method.as_str(); HTTP_COUNTER.inc(); match method { "start" | "restart" => { runner.get(id).restart(); timer.observe_duration(); Ok(attempt(true, method)) } "stop" | "kill" => { runner.get(id).stop(); timer.observe_duration(); Ok(attempt(true, method)) } "remove" | "delete" => { runner.remove(id); timer.observe_duration(); Ok(attempt(true, method)) } _ => { timer.observe_duration(); Err(reject::not_found()) } } } #[inline] #[utoipa::path(get, tag = "Daemon", path = "/metrics", responses((status = 200, description = "Get daemon metrics", body = MetricsRoot)))] pub async fn metrics_handler() -> Result { let timer = HTTP_REQ_HISTOGRAM.with_label_values(&["metrics"]).start_timer(); let mut pid: Option = None; let mut cpu_percent: Option = None; let mut uptime: Option> = None; let mut memory_usage: Option = None; let mut runner: Runner = file::read_rmp(global!("pmc.dump")); HTTP_COUNTER.inc(); if pid::exists() { if let Ok(process_id) = pid::read() { if let Ok(mut process) = Process::new(process_id as u32) { pid = Some(process_id); uptime = Some(pid::uptime().unwrap()); memory_usage = process.memory_info().ok(); cpu_percent = process.cpu_percent().ok(); } } } let memory_usage = match memory_usage { Some(usage) => helpers::format_memory(usage.rss()), None => string!("0b"), }; let cpu_percent = match cpu_percent { Some(percent) => format!("{:.2}%", percent), None => string!("0%"), }; let uptime = match uptime { Some(uptime) => helpers::format_duration(uptime), None => string!("none"), }; let pid = match pid { Some(pid) => string!(pid), None => string!("n/a"), }; let response = json!(MetricsRoot { version: Version { pkg: format!("v{}", env!("CARGO_PKG_VERSION")), hash: env!("GIT_HASH"), build_date: env!("BUILD_DATE"), target: env!("PROFILE"), }, daemon: Daemon { pid: pid, running: pid::exists(), uptime: uptime, process_count: runner.count(), daemon_type: global!("pmc.daemon.kind"), stats: Stats { memory_usage, cpu_percent } } }); timer.observe_duration(); Ok(json(&response)) }