Skip to content

SGD

Build_SGD_graph

Build_SGD_graph(data: Dict, n_neighbors: int = 6)

Spatial locations are represented as nodes in an undirected graph. Normal spots are isolated, while anomalous spots are connected to their k-nearest anomalous neighbors. Note that in the anomaly detection results, spots incorrectly identified as anomalies (false positives) become connected, and missed anomalies (false negatives) become isolated, which makes the predicted graph deviate from the local structures of the ground truth graph. Spots are divided into two regions: one includes the true positive plus false positive (TP+FP) anomalies, and the other includes the true positive plus false negative (TP+FN) anomalies. We perform bootstrap sampling of m sets of spots from these two regions.

Examples:

>>> g_pred_list, g_truth_list = Build_SGD_graph(data, n_neighbors = 6).build_graph()

Parameters:

Name Type Description Default
data Dict

Dictionary containing spatial data and labels.

required
n_neighbors int

Number of neighbors to consider for edge creation (default is 6).

6
Source code in src\stands\evaluate\SGD.py
def __init__(self, data: Dict, n_neighbors: int = 6):
    """
    Initialize the Build_SGD_graph class.

    Parameters:
        data (Dict): Dictionary containing spatial data and labels. The keys
            read here are 'spatial', 'y_true' and 'y_pred'.
        n_neighbors (int): Number of neighbors to consider for edge creation (default is 6).
    """
    self.data = data
    self.position = data['spatial']
    self.n_neighbors = n_neighbors
    # np.unique returns sorted labels; the first (smallest) one is dropped.
    # NOTE(review): presumably that label is 0 (normal spots) - confirm against callers.
    self.truth_list = np.delete(np.unique(data['y_true']), 0)
    # All distinct predicted labels are kept here.
    self.pred_list = np.unique(data['y_pred'])

build_graph

build_graph()

Build graphs for predicted and ground truth labels.

Returns:

Type Description
List[DGLGraph]

Two lists of DGLGraph objects: the predicted graphs and the ground truth graphs they are compared against.

Source code in src\stands\evaluate\SGD.py
def build_graph(self):
    """
    Build the paired predicted and ground truth graphs.

    Every graph starts from the same neighbor edges (see `get_edge`) plus
    self-loops, receives per-node 'anomaly' and 'position' features, and then
    has its edges pruned by `remove_edges`. Predicted graphs additionally
    carry a per-node 'classification' feature (see `get_classification`),
    computed against the ground truth graph they are paired with.

    The two returned lists are index-aligned: `g_truth_list[i]` is the ground
    truth graph that `g_pred_list[i]` was classified against. For each ground
    truth type, one pair is emitted per non-zero predicted label, in the
    order of `self.pred_list`; the same ground truth graph object is reused
    for all pairs of that type.

    Returns:
        (Tuple[List[dgl.DGLGraph], List[dgl.DGLGraph]]): `(g_pred_list, g_truth_list)`.
    """
    u, v = self.get_edge()

    def make_graph(anomaly):
        # Fresh graph on the shared edges, with self-loops and node features.
        graph = dgl.graph((u, v))
        graph = dgl.add_self_loop(graph)
        graph.ndata['anomaly'] = anomaly
        graph.ndata['position'] = torch.tensor(self.position, dtype=torch.float32)
        return graph

    g_pred_list = []
    g_truth_list = []

    for typeid in self.truth_list:
        if typeid == 0:
            # Label 0 is only evaluated when exactly two truth labels remain.
            if len(self.truth_list) != 2:
                continue

            # ground truth
            g_truth = make_graph(self.get_anomaly(is_truth=True, typeid=typeid))
            self.remove_edges(g_truth)

            # predict
            g_pred = make_graph(self.get_anomaly(is_truth=False, typeid=typeid))
            g_pred.ndata['classification'] = self.get_classification(g_pred, g_truth)
            self.remove_edges(g_pred)

            g_pred_list.append(g_pred)
            g_truth_list.append(g_truth)
            continue

        # ground truth: one-vs-rest indicator for this truth type
        g_truth = make_graph(self.get_anomaly(True, typeid, typeid))
        self.remove_edges(g_truth)

        for predid in self.pred_list:
            if predid == 0:
                continue

            # predict: one-vs-rest indicator for this predicted label
            g_pred = make_graph(self.get_anomaly(False, typeid, predid))
            g_pred.ndata['classification'] = self.get_classification(g_pred, g_truth)
            self.remove_edges(g_pred)

            # Append the truth graph together with each prediction so the two
            # lists stay aligned. Re-expanding the whole truth list inside the
            # type loop duplicated earlier types again on every iteration and
            # paired later predictions with the wrong truth graph.
            g_pred_list.append(g_pred)
            g_truth_list.append(g_truth)

    return g_pred_list, g_truth_list

classify_cells

classify_cells(pred: int, truth: int)

Classify cells based on prediction and ground truth.

Parameters:

Name Type Description Default
pred int

Predicted flag (1 for anomaly, 0 for normal).

required
truth int

Ground truth flag (1 for anomaly, 0 for normal).

required

Returns:

Type Description
str

Classification result (TP, FP, FN, or TN).

Source code in src\stands\evaluate\SGD.py
def classify_cells(self, pred: int, truth: int):
    """
    Map a (prediction, ground truth) flag pair to its confusion-matrix cell.

    Parameters:
        pred (int): Predicted flag (1 for anomaly, 0 for normal).
        truth (int): Ground truth flag (1 for anomaly, 0 for normal).

    Returns:
        (str): 'TP', 'FP' or 'FN' for the matching flag pair; 'TN' for every
            other combination.
    """
    if pred == 1:
        if truth == 1:
            return 'TP'
        if truth == 0:
            return 'FP'
        # pred is 1 but truth is neither 0 nor 1: falls through to the default
        return 'TN'

    if pred == 0 and truth == 1:
        return 'FN'

    return 'TN'

get_anomaly

get_anomaly(is_truth=True, typeid=None, binary_typeid=None)

Extract anomaly information for nodes.

Parameters:

Name Type Description Default
is_truth bool

If True, extract anomaly data from ground truth labels. If False, extract from predicted labels.

True
typeid Optional[int]

Type ID for filtering data.

None
binary_typeid Optional[int]

Binary Type ID for filtering data.

None

Returns:

Type Description
Tensor

Anomaly data as a tensor.

Source code in src\stands\evaluate\SGD.py
def get_anomaly(self, is_truth=True, typeid=None, binary_typeid=None):
    """
    Build the per-node anomaly feature from the stored labels.

    Parameters:
        is_truth (bool): Read `self.data['y_true']` when True, `self.data['y_pred']` otherwise.
        typeid (Optional[int]): When 0, the labels are used as they are (cast to int).
            For any other value the labels are turned into a 0/1 indicator.
        binary_typeid (Optional[int]): Label value that maps to 1 in the indicator;
            only used when `typeid` is not 0.

    Returns:
        (torch.Tensor): Anomaly data as a float32 tensor.
    """
    labels = self.data['y_true' if is_truth else 'y_pred']

    if typeid == 0:
        # Keep the label values themselves
        flags = np.array(labels).astype(int)
    else:
        # One-vs-rest indicator for the requested label
        flags = np.array(labels == binary_typeid).astype(int)

    return torch.tensor(flags, dtype=torch.float32)

get_classification

get_classification(
    pred_graph: DGLGraph, truth_graph: DGLGraph
)

Get classification labels for nodes in predicted and ground truth graphs.

Parameters:

Name Type Description Default
pred_graph DGLGraph

Predicted graph.

required
truth_graph DGLGraph

Ground truth graph.

required

Returns:

Type Description
Tensor

Classification labels as a tensor.

Source code in src\stands\evaluate\SGD.py
def get_classification(self, pred_graph: dgl.DGLGraph, truth_graph: dgl.DGLGraph):
    """
    Label every node by comparing its predicted and ground truth anomaly flags.

    The 'anomaly' node features of both graphs are walked in lockstep, each
    pair is classified with `classify_cells`, and the result is encoded as
    TP=1, FP=2, FN=3, TN=4.

    Parameters:
        pred_graph (dgl.DGLGraph): Predicted graph.
        truth_graph (dgl.DGLGraph): Ground truth graph.

    Returns:
        (torch.Tensor): Classification codes as a long tensor, one per node pair.
    """
    codes = {'TP': 1, 'FP': 2, 'FN': 3, 'TN': 4}
    node_pairs = zip(pred_graph.ndata['anomaly'], truth_graph.ndata['anomaly'])

    labels = [
        codes[self.classify_cells(int(pred_flag), int(truth_flag))]
        for pred_flag, truth_flag in node_pairs
    ]
    return torch.tensor(labels, dtype=torch.long)

get_edge

get_edge()

Generate edges based on spatial positions.

Returns:

Type Description
Tuple[ndarray]

Two arrays representing source and destination nodes for edges.

Source code in src\stands\evaluate\SGD.py
def get_edge(self):
    """
    Derive neighbor edges from the spatial positions.

    A nearest-neighbor search with `n_neighbors + 1` neighbors is fitted on
    and queried with `self.position`. The first returned column is used as
    the source node and the remaining `n_neighbors` columns as its
    destinations.

    Returns:
        (Tuple[np.ndarray]): Two arrays representing source and destination nodes for edges.
    """
    k = self.n_neighbors
    knn = NearestNeighbors(n_neighbors=k + 1)
    knn.fit(self.position)
    _, neighbor_idx = knn.kneighbors(self.position)

    # Column 0 is presumably the query point itself (distance 0) - it becomes
    # the source, repeated once per destination.
    sources = neighbor_idx[:, 0].repeat(k)
    targets = neighbor_idx[:, 1:].flatten()
    return sources, targets

remove_edges

remove_edges(g: DGLGraph)

Remove edges from the graph based on anomaly information.

Parameters:

Name Type Description Default
g DGLGraph

Input graph.

required
Source code in src\stands\evaluate\SGD.py
def remove_edges(self, g: dgl.DGLGraph):
    """
    Remove, in place, every edge that does not join two anomalous nodes.

    An edge is kept only when the 'anomaly' node feature equals 1 at both of
    its endpoints; all other edges (self-loops included) are removed.

    Parameters:
        g (dgl.DGLGraph): Input graph with an 'anomaly' node feature. Modified in place.
    """
    # form='all' also yields the edge ids, so no (u, v) -> id lookup is needed
    # and parallel edges are each addressed by their own id.
    src, dst, eid = g.edges(form='all')
    anomaly = g.ndata['anomaly']

    # Vectorised form of "source is not anomalous or destination is not anomalous".
    drop = (anomaly[src.long()] != 1) | (anomaly[dst.long()] != 1)

    if drop.any():
        g.remove_edges(eid[drop])

SGDEvaluator

SGDEvaluator(
    bins: int = 10,
    num_bootstrap_samples: int = 10,
    sigma: int = 1,
)

Evaluate SGD_degree or SGD_cc with the built predicted and ground truth SGD Graph.

Examples:

>>> evaluator = SGDEvaluator(bins = 10, num_bootstrap_samples = 50, sigma = 1)
>>> evaluator.evaluate_sgd(g_pred_list, g_truth_list, metric = 'degree')
1.01
>>> evaluator.evaluate_sgd(g_pred_list, g_truth_list, metric = 'cc')
0.34

Parameters:

Name Type Description Default
bins int

Number of equal-width bins in the given range when calculating SGD_cc.

10
num_bootstrap_samples int

Number of bootstrap samples for distribution estimation.

10
sigma int

Sigma parameter for Gaussian Earth Mover's Distance.

1
Source code in src\stands\evaluate\SGD.py
def __init__(self, bins: int = 10, num_bootstrap_samples: int = 10, sigma: int = 1):
    """
    Set up an SGDEvaluator with its evaluation hyper-parameters.

    Parameters:
        bins (int): Number of equal-width bins in the given range when calculating SGD_cc.
        num_bootstrap_samples (int): Number of bootstrap samples for distribution estimation.
        sigma (int): Sigma parameter for Gaussian Earth Mover's Distance.
    """
    # Hyper-parameters, stored as given
    self.bins, self.num_bootstrap_samples, self.sigma = bins, num_bootstrap_samples, sigma

    # Graph lists start out empty (two distinct list objects)
    self.g_pred_list = []
    self.g_truth_list = []

evaluate_sgd

evaluate_sgd(
    g_pred_list: List[DGLGraph],
    g_truth_list: List[DGLGraph],
    metric: Literal["degree", "cc"],
)

Evaluate SGD based on predicted and ground truth graphs.

Parameters:

Name Type Description Default
g_pred_list List[DGLGraph]

List of predicted DGLGraphs.

required
g_truth_list List[DGLGraph]

List of ground truth DGLGraphs.

required
metric Literal['degree', 'cc']

Metric to evaluate ('degree' or 'cc').

required

Returns:

Type Description
float

SGD score for the predicted and ground truth graphs.

Source code in src\stands\evaluate\SGD.py
def evaluate_sgd(self, g_pred_list: List[dgl.DGLGraph],
                 g_truth_list: List[dgl.DGLGraph],
                 metric: Literal['degree', 'cc']):
    """
    Evaluate SGD based on predicted and ground truth graphs.

    Graphs are consumed pairwise (`g_pred_list[i]` with `g_truth_list[i]`).
    For each pair the score is
    `weight * MMD(TP+FP region) + (1 - weight) * MMD(TP+FN region)` with
    `weight = TP / (TP + FN)` taken from the predicted graph's
    'classification' feature (1 = TP, 3 = FN). Pair scores fill a square
    matrix of side `int(sqrt(len(g_pred_list)))` in row-major order. A 1x1
    matrix is returned directly; otherwise the assignment problem over the
    matrix is solved and the assigned entries are averaged.

    Parameters:
        g_pred_list (List[dgl.DGLGraph]): List of predicted DGLGraphs.
        g_truth_list (List[dgl.DGLGraph]): List of ground truth DGLGraphs.
        metric (Literal['degree', 'cc']): Metric to evaluate ('degree' or 'cc').

    Returns:
        (float): SGD score for the predicted and ground truth graphs.

    Raises:
        ValueError: If `metric` is not 'degree' or 'cc', or if either graph list is empty.
    """
    # Validate up front so a bad metric fails before any bootstrap work is done.
    if metric == 'degree':
        suffix = 'Degree'
    elif metric == 'cc':
        suffix = 'Clustering'
    else:
        raise ValueError("Invalid metric!")

    if len(g_pred_list) == 0 or len(g_truth_list) == 0:
        raise ValueError("g_pred_list and g_truth_list must not be empty.")

    pred_tp_fp_key = f'Predicted TP+FP {suffix}'
    gt_tp_fp_key = f'Ground Truth TP+FP {suffix}'
    pred_tp_fn_key = f'Predicted TP+FN {suffix}'
    gt_tp_fn_key = f'Ground Truth TP+FN {suffix}'

    matrix_size = int(math.sqrt(len(g_pred_list)))
    sgd_matrix = [[0] * matrix_size for _ in range(matrix_size)]

    for idx, (g_pred, g_truth) in enumerate(zip(g_pred_list, g_truth_list)):
        pred_nx_graph = dgl_to_nx(g_pred, include_classification=True)
        true_nx_graph = dgl_to_nx(g_truth, include_classification=False)

        dist = get_distributions_for_subsets(pred_nx_graph, true_nx_graph,
                                             self.bins, self.num_bootstrap_samples)

        # weight = TP / (TP + FN), read from the classification codes
        tp_count = torch.sum(g_pred.ndata['classification'] == 1).item()
        fn_count = torch.sum(g_pred.ndata['classification'] == 3).item()
        weight = tp_count / (tp_count + fn_count)

        # TP&FP
        mmd_tp_fp = compute_mmd(dist[gt_tp_fp_key], dist[pred_tp_fp_key],
                                kernel=gaussian_emd, sigma=self.sigma)

        # TP&FN
        mmd_tp_fn = compute_mmd(dist[gt_tp_fn_key], dist[pred_tp_fn_key],
                                kernel=gaussian_emd, sigma=self.sigma)

        # save to matrix (row-major)
        row, col = divmod(idx, matrix_size)
        sgd_matrix[row][col] = weight * mmd_tp_fp + (1 - weight) * mmd_tp_fn

    if matrix_size == 1:
        # Single pair: read the score from the matrix, not a leaked loop variable.
        return sgd_matrix[0][0]

    assignment_matrix = self.solve_assignment_problems(sgd_matrix)
    SGD_list = self.get_assigned_values(sgd_matrix, assignment_matrix)
    return np.average(SGD_list)

get_assigned_values

get_assigned_values(
    sgd_matrix: List[List[float]],
    assignment_matrix: List[List[int]],
)

Get assigned values from the SGD matrix and assignment matrix.

Parameters:

Name Type Description Default
sgd_matrix List[List[float]]

Matrix of SGD values.

required
assignment_matrix List[List[int]]

Assignment matrix indicating optimal assignments.

required

Returns:

Type Description
List[float]

List of SGD values found at the assigned (row, column) positions.

Source code in src\stands\evaluate\SGD.py
def get_assigned_values(self, sgd_matrix: List[List[float]], assignment_matrix: List[List[int]]):
    """
    Pick the SGD values selected by an assignment matrix.

    Parameters:
        sgd_matrix (List[List[float]]): Matrix of SGD values.
        assignment_matrix (List[List[int]]): Assignment matrix indicating optimal assignments.

    Returns:
        (List[float]): The `sgd_matrix` entries at every position where
            `assignment_matrix` equals 1, in row-major order.
    """
    return [
        sgd_matrix[r][c]
        for r, flags in enumerate(assignment_matrix)
        for c, flag in enumerate(flags)
        if flag == 1
    ]

solve_assignment_problems

solve_assignment_problems(
    distance_matrix: List[List[float]],
)

Solve the Assignment Problem using the CBC solver.

Parameters:

Name Type Description Default
distance_matrix List[List[float]]

Matrix representing the costs of assignments (distance).

required

Returns:

Type Description
List[List[int]]

Assignment matrix indicating optimal assignments.

Source code in src\stands\evaluate\SGD.py
def solve_assignment_problems(self, distance_matrix: List[List[float]]):
    """
    Solve the Assignment Problem using the CBC solver.

    One binary variable is created per matrix cell, the total assigned cost
    is minimized, and every row and every column must contain exactly one
    assignment.

    Parameters:
        distance_matrix (List[List[float]]): Matrix representing the costs of assignments (distance).

    Returns:
        (List[List[int]]): Assignment matrix indicating optimal assignments
            (1 where a row is assigned to a column, 0 elsewhere).

    Raises:
        RuntimeError: If the solver does not report an optimal solution.
    """
    # No spaces in the name: PuLP rewrites them to '_' and emits a warning.
    problem = pulp.LpProblem("Assignment_Problem", pulp.LpMinimize)

    rows = len(distance_matrix)
    cols = len(distance_matrix[0])
    x = [[pulp.LpVariable(f'x_{i}_{j}', cat='Binary') for j in range(cols)] for i in range(rows)]

    # target function
    problem += pulp.lpSum(distance_matrix[i][j] * x[i][j] for i in range(rows) for j in range(cols))

    # constraints: exactly one assignment per row and per column
    for i in range(rows):
        problem += pulp.lpSum(x[i][j] for j in range(cols)) == 1

    for j in range(cols):
        problem += pulp.lpSum(x[i][j] for i in range(rows)) == 1

    # CBC solver
    problem.solve(pulp.PULP_CBC_CMD())

    # Without an optimal solution the variable values are not a valid assignment.
    if problem.status != pulp.LpStatusOptimal:
        status = pulp.LpStatus[problem.status]
        raise RuntimeError(f"Assignment problem was not solved to optimality (status: {status}).")

    # The solver reports floats; round so callers can compare against 1 exactly.
    return [[int(round(x[i][j].value())) for j in range(cols)] for i in range(rows)]