dune_core/lib.rs
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179
#![feature(let_chains)]
#![doc = include_str!("../../README.md")]
use std::collections::{BTreeMap, BTreeSet, HashMap};
use std::path::PathBuf;
use cfg::{Config, Endpoint, Interface, Link, Node, Phynodes};
// use graphrs::{Graph, GraphSpecs};
use serde::{Deserialize, Serialize};
pub mod cfg;
type NodeId = String;
pub enum SetupStep {
Pre(Vec<String>),
Nodes,
Links,
Post,
Processes,
PreDown,
Down,
}
pub struct ExperimentalSetup {
pub pre: Vec<String>,
pub nodes: Vec<String>,
pub links: Vec<String>,
pub post: Vec<String>,
pub processes: Vec<String>,
pub pre_down: Vec<String>,
pub down: Vec<String>,
}
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct Dune {
pub nodes: HashMap<NodeId, Node>,
// topo: Graph<NodeId, ()>,
pub infra: Phynodes,
allocated: bool,
}
impl Dune {
pub fn init(cfg: &PathBuf) -> Self {
let mut dune = Self::new(cfg);
dune.allocate();
dune
}
pub fn new(cfg: &PathBuf) -> Self {
fn load_interface(
nodes: &mut HashMap<String, Node>,
link: &Link,
cfg: &Config,
idx: usize,
) {
assert!(idx == 0 || idx == 1);
if let Some(node) = nodes.get_mut(&link.endpoints[idx].node) {
let iface = Interface::new(&cfg.topology.defaults.links, &link, idx);
// TODO: check that interface is not defined multiple times
node.interfaces
.get_or_insert_with(HashMap::new)
.insert(link.endpoints[idx].interface.clone(), iface);
}
}
// Load DUNE's configuration
let cfg = Config::new(cfg.to_str().unwrap());
// let mut topo = Graph::<NodeId, _>::new(GraphSpecs::multi_directed());
// Collect and expand Nodes data
let mut nodes = cfg
.topology
.nodes
.iter()
.map(|(name, config)| {
// topo.add_node(graphrs::Node::from_name(name.clone()));
(
name.clone(),
Node::new(&cfg.topology.defaults.nodes, &config, name),
)
})
.collect::<HashMap<String, Node>>();
// Collect and expand Links data
cfg.topology.links.iter().for_each(|link| {
(0..2).into_iter().for_each(|idx| {
load_interface(&mut nodes, link, &cfg, idx);
})
});
Self {
nodes,
// topo,
infra: cfg.infrastructure,
allocated: false,
}
}
/// Allocate requested cores to physical cores, if possible given the provided infrastructure.
pub fn allocate(&mut self) {
if !self.allocated {
self.allocated = true;
// Sort nodes by decreasing number of cores to allocate
let mut cores: BTreeMap<usize, BTreeSet<NodeId>> = BTreeMap::new();
self.nodes.iter().for_each(|(node_id, node)| {
cores
.entry(node.cores())
.and_modify(|entry| {
let _ = entry.insert(node_id.clone());
})
.or_insert(BTreeSet::from([node_id.clone()]));
});
assert!(
cores.iter().fold(0, |acc, (cores, _)| acc + cores) < self.infra.cores(),
"More core booked than available in the defined infrastructure. Please, fix your configuration file."
);
let mut core_pool = self.infra.clone();
cores.iter().rev().for_each(|(_, nodes)| {
// For each node, reserve the necessary amount of cores then allocate them
nodes.iter().for_each(|node_id| {
if let Some(node) = self.nodes.get_mut(node_id) {
let n = node.cores();
// Search for at least n cores located on the same NUMA node for locality.
// This ensures that every Pinned processes of a Node are located on the same NUMA node.
// The strategy is dummy: we fill servers in order.
for (name, phynode) in core_pool.nodes.iter_mut() {
if let Some(available) = phynode
.cores
.iter_mut()
.find(|available| available.len() >= n)
{
node.cores
.iter_mut()
.for_each(|(_, core)| *core = Some(available.pop().unwrap()));
node.phynode = Some(name.clone());
break;
}
}
}
});
});
}
}
pub fn phynodes(&self) -> Vec<NodeId> {
self.infra
.nodes
.iter()
.map(|(phynode, _)| phynode.clone())
.collect::<Vec<NodeId>>()
}
pub fn phynode_setup(&self, phynode: NodeId) {
self.nodes.iter().for_each(|(name, node)| {
if let Some(node_phynode) = &node.phynode
&& node_phynode == &phynode
{
node.setup();
}
})
}
// fn phynode(&mut self, node_id: NodeId) -> &String {
// if !self.allocated {
// self.allocate();
// }
// &self.nodes[&node_id].phynode
// }
// fn phynode_exec(&self, pynode: String, step: SetupStep, cmd: String) {}
// fn node_exec(&mut self, node_id: NodeId, step: SetupStep, cmd: String) {
// let phynode = self.phynode(node_id);
// }
}