Source code for ndtools.fun_binary_graph

from typing import Dict, Tuple, Any, Iterable, Optional, List
import networkx as nx

def _pairwise(seq: List[str]):
    for i in range(len(seq) - 1):
        yield seq[i], seq[i + 1]

def _edge_ids_on_path(G: nx.Graph, node_path: Optional[List[str]]) -> Optional[List[Optional[Any]]]:
    """Return [eid1, eid2, ...] along the node_path; None if no path or eid missing."""
    if not node_path:
        return None
    is_multi = isinstance(G, (nx.MultiGraph, nx.MultiDiGraph))
    eids: List[Optional[Any]] = []
    for u, v in _pairwise(node_path):
        eid = None
        if is_multi:
            data_dict = G.get_edge_data(u, v)  # {key: attrdict}
            if data_dict:
                _, d = next(iter(data_dict.items()))
                eid = d.get("eid")
        else:
            d = G.get_edge_data(u, v)
            if d:
                eid = d.get("eid")
        eids.append(eid)
    return eids

def _node_edge_chain(G: nx.Graph, node_path: Optional[List[str]]) -> Optional[List[Any]]:
    """['n1','n3','n6'] -> ['n1', eid(n1,n3), 'n3', eid(n3,n6), 'n6'] (uses 'eid', falls back to 'u->v')."""
    if not node_path:
        return None
    chain: List[Any] = [node_path[0]]
    is_multi = isinstance(G, (nx.MultiGraph, nx.MultiDiGraph))
    for u, v in _pairwise(node_path):
        eid = None
        if is_multi:
            data_dict = G.get_edge_data(u, v)
            if data_dict:
                _, d = next(iter(data_dict.items()))
                eid = d.get("eid")
        else:
            d = G.get_edge_data(u, v)
            if d:
                eid = d.get("eid")
        chain.append(eid if eid is not None else f"{u}->{v}")
        chain.append(v)
    return chain



[docs] def eval_global_conn_k( comps_state: Dict[str, int], G_base: nx.Graph ) -> Tuple[int, str, None]: """ Build subgraph H from G_base according to component states: - If comps_state[node_id] == 0: remove all edges incident to that node. - Only include edges whose comps_state[eid] == 1. - Nodes remain present; connectivity is determined by remaining edges. Returns: (k_value, k_value, None) """ # Collect node-off set and the set of edge IDs marked on/off node_off = {cid for cid, st in comps_state.items() if st == 0 and cid in G_base.nodes} edge_on = {cid for cid, st in comps_state.items() if st == 1} # keep only these edges # (unknown IDs in comps_state are ignored) # Build subgraph H = nx.Graph() H.add_nodes_from(G_base.nodes(data=True)) for u, v, data in G_base.edges(data=True): eid = data.get("eid") # skip if incident to a node that's off if u in node_off or v in node_off: continue # include only edges explicitly set to 1 if eid is not None and eid in edge_on: H.add_edge(u, v, **data) # Compute global vertex connectivity k_val = nx.node_connectivity(H) if H.number_of_nodes() > 1 else 0 return k_val, k_val, None
from typing import Dict, Tuple, Any, Iterable, Optional, List import networkx as nx
[docs] def _pairwise(seq: List[str]): for i in range(len(seq) - 1): yield seq[i], seq[i + 1]
[docs] def _edge_ids_on_path(G: nx.Graph, node_path: Optional[List[str]]) -> Optional[List[Optional[Any]]]: """Return [eid1, eid2, ...] along the node_path; None if no path or eid missing.""" if not node_path: return None is_multi = isinstance(G, (nx.MultiGraph, nx.MultiDiGraph)) eids: List[Optional[Any]] = [] for u, v in _pairwise(node_path): eid = None if is_multi: data_dict = G.get_edge_data(u, v) # {key: attrdict} if data_dict: _, d = next(iter(data_dict.items())) eid = d.get("eid") else: d = G.get_edge_data(u, v) if d: eid = d.get("eid") eids.append(eid) return eids
[docs] def _node_edge_chain(G: nx.Graph, node_path: Optional[List[str]]) -> Optional[List[Any]]: """['n1','n3','n6'] -> ['n1', eid(n1,n3), 'n3', eid(n3,n6), 'n6'] (uses 'eid', falls back to 'u->v').""" if not node_path: return None chain: List[Any] = [node_path[0]] is_multi = isinstance(G, (nx.MultiGraph, nx.MultiDiGraph)) for u, v in _pairwise(node_path): eid = None if is_multi: data_dict = G.get_edge_data(u, v) if data_dict: _, d = next(iter(data_dict.items())) eid = d.get("eid") else: d = G.get_edge_data(u, v) if d: eid = d.get("eid") chain.append(eid if eid is not None else f"{u}->{v}") chain.append(v) return chain
[docs] def eval_travel_time_to_nearest( comps_state: Dict[str, int], G_base: nx.Graph, origin: str, destinations: Iterable[str], *, avg_speed: float = 60.0, # distance units per hour (e.g., km/h) target_max: float = 0.5, # allowed extra time over baseline, in HOURS length_attr: str = "length", # edge length attribute (e.g., km) ) -> Tuple[Optional[float], str, Dict[str, Any]]: dest_set = set(destinations) if not dest_set: return None, 0, {"reason": "no destinations provided"} # ----- Baseline graph (all edges that have length_attr) ----- Hb = G_base.__class__() Hb.add_nodes_from(G_base.nodes(data=True)) for u, v, data in G_base.edges(data=True): if length_attr in data and data[length_attr] is not None: Hb.add_edge(u, v, **data) if not Hb.has_node(origin): return None, 0, {"reason": "origin_missing_in_baseline"} cand_b = [d for d in dest_set if Hb.has_node(d)] if not cand_b: return None, 0, {"reason": "no_destinations_in_baseline"} try: dist_b_map, paths_b = nx.single_source_dijkstra(Hb, source=origin, weight=length_attr) except nx.NetworkXNoPath: return None, 0, {"reason": "no_baseline_path"} reach_b = [(d, dist_b_map[d]) for d in cand_b if d in dist_b_map] if not reach_b: return None, 0, {"reason": "no_baseline_destination_reachable"} dest_b, dist_b = min(reach_b, key=lambda x: x[1]) time_b = dist_b / float(avg_speed) # hours path_b_nodes = paths_b.get(dest_b) path_b_chain = _node_edge_chain(Hb, path_b_nodes) path_b_edges = _edge_ids_on_path(Hb, path_b_nodes) # ----- Filtered graph (apply comps_state) ----- node_off = {cid for cid, st in comps_state.items() if st == 0 and cid in G_base.nodes} edge_on = {cid for cid, st in comps_state.items() if st == 1} if origin in node_off: return None, 0, { "reason": "origin_off", "baseline_time_hours": time_b, "baseline_path_nodes": path_b_nodes, "baseline_path_edges": path_b_edges, "baseline_path_chain": path_b_chain, } H = G_base.__class__() H.add_nodes_from(G_base.nodes(data=True)) for u, v, data in G_base.edges(data=True): if u in node_off or v in node_off: continue eid = data.get("eid") if eid is not None and eid in edge_on: if length_attr in data and data[length_attr] is not None: H.add_edge(u, v, **data) if not H.has_node(origin): return None, 0, { "reason": "origin_missing_in_filtered", "baseline_time_hours": time_b, "baseline_path_nodes": path_b_nodes, "baseline_path_edges": path_b_edges, "baseline_path_chain": path_b_chain, } cand_f = [d for d in dest_set if H.has_node(d) and d not in node_off] if not cand_f: return None, 0, { "reason": "no_destinations_in_filtered", "baseline_time_hours": time_b, "baseline_path_nodes": path_b_nodes, "baseline_path_edges": path_b_edges, "baseline_path_chain": path_b_chain, } try: dist_f_map, paths_f = nx.single_source_dijkstra(H, source=origin, weight=length_attr) except nx.NetworkXNoPath: return None, 0, { "reason": "no_path_filtered", "baseline_time_hours": time_b, "baseline_path_nodes": path_b_nodes, "baseline_path_edges": path_b_edges, "baseline_path_chain": path_b_chain, } reach_f = [(d, dist_f_map[d]) for d in cand_f if d in dist_f_map] if not reach_f: return None, 0, { "reason": "no_destination_reachable_filtered", "baseline_time_hours": time_b, "baseline_path_nodes": path_b_nodes, "baseline_path_edges": path_b_edges, "baseline_path_chain": path_b_chain, } dest_f, dist_f = min(reach_f, key=lambda x: x[1]) time_f = dist_f / float(avg_speed) # hours path_f_nodes = paths_f.get(dest_f) path_f_chain = _node_edge_chain(H, path_f_nodes) path_f_edges = _edge_ids_on_path(H, path_f_nodes) # ----- Threshold check ----- if isinstance(target_max, (int, float)): target_max = [target_max] time_threshold = [time_b + float(tm) for tm in target_max] sys_st = next((i for i, t in enumerate(time_threshold) if t < time_f), len(time_threshold)) info = { # filtered "dest_reached": dest_f, "dist_filtered": dist_f, "time_filtered_hours": time_f, "path_filtered_nodes": path_f_nodes, "path_filtered_edges": path_f_edges, "path_filtered_chain": path_f_chain, # baseline "baseline_dest": dest_b, "baseline_dist": dist_b, "baseline_time_hours": time_b, "baseline_path_nodes": path_b_nodes, "baseline_path_edges": path_b_edges, "baseline_path_chain": path_b_chain, # params "avg_speed_per_hour": avg_speed, "allowed_extra_time_hours": target_max, "time_threshold_hours": time_threshold, "reached_any": True, } return time_f, sys_st, info