Source code for drvi.utils.tools.interpretability._differential_vars

from __future__ import annotations

import warnings
from typing import TYPE_CHECKING

import numpy as np
import pandas as pd
import scipy

from drvi.model import DRVI
from drvi.utils.tools.interpretability._latent_traverse import get_dimensions_of_traverse_data, traverse_latent

if TYPE_CHECKING:
    from typing import Literal

    from anndata import AnnData


def find_differential_effects(
    traverse_adata: AnnData,
    method: Literal["max_possible", "min_possible"] = "max_possible",
    key_added: str = "effect",
    add_to_counts: float = 0.1,
    relax_max_by: float = 0.0,
) -> None:
    """Find differential effects in latent space traversal data.

    This function analyzes the differential effects between control and effect
    conditions in traversal data to identify genes that respond to latent
    dimension changes. It supports two methods for calculating differential effects.

    The "max_possible" method is the simple log-fold-change effect between effect and control.
    The "min_possible" method is more conservative and considers the maximum possible effect of
    other dimensions to normalize the log-fold-change effect of the current dimension.

    Parameters
    ----------
    traverse_adata
        AnnData object created by `traverse_latent` or `make_traverse_adata`.
        Must contain `.layers['control']` and `.layers['effect']`.
    method
        Method for calculating differential effects:
        - "max_possible": Simple log-fold-change between effect and control conditions.
          This is the direct difference in log-space and represents the maximum
          possible effect a latent dimension can have on gene expression.
        - "min_possible": Conservative estimate that normalizes the effect by considering
          the maximum possible effects from other dimensions. Although the effect in count space
          is deterministic, the effect in log-space is not. This accounts for that, so changes in
          one dimension may constrain the possible change in other dimensions.
    key_added
        Prefix for the keys added to `traverse_adata.uns` and `traverse_adata.varm`.
        Results will be stored with keys like `{key_added}_traverse_effect_stepwise`.
    add_to_counts
        Small value added to counts to avoid log(0) issues in log-space calculations.
        This pseudo-count ensures numerical stability when computing log-fold-changes.
    relax_max_by
        Relaxation factor for the maximum possible effect calculation
        (only used with "min_possible" method).

    Returns
    -------
    None
        Results are stored in `traverse_adata`:
        - `.uns[f"{key_added}_traverse_effect_stepwise"]`: Stepwise effects for each
          latent dimension, step, and gene (shape: n_latent × n_steps × n_vars)
        - `.varm[f"{key_added}_traverse_effect_pos"]`: Maximum positive effects per
          dimension and gene (DataFrame with genes as rows, dimensions as columns)
        - `.varm[f"{key_added}_traverse_effect_neg"]`: Maximum negative effects per
          dimension and gene (DataFrame with genes as rows, dimensions as columns)
        - `.uns[f"{key_added}_traverse_effect_pos_dim_ids"]`: Array of dimension IDs
          for positive effects
        - `.uns[f"{key_added}_traverse_effect_neg_dim_ids"]`: Array of dimension IDs
          for negative effects

    Raises
    ------
    AssertionError
        If `method` is not one of the allowed values.
    ValueError
        If required data is missing from `traverse_adata` (e.g., missing layers).
    KeyError
        If required columns are missing from `traverse_adata.obs`.

    Notes
    -----
    The function performs the following steps:
    1. Calculates differential effects using the specified method:
       - "max_possible": Direct difference between effect and control conditions
       - "min_possible": Normalized difference that considers the maximum possible
         effect from other dimensions
    2. Identifies maximum effects in positive and negative directions
    3. Stores results in the AnnData object for further analysis

    **Method Details:**

    - **max_possible**: Computes the direct log-fold-change between effect and control
      conditions. This represents the maximum possible effect a latent dimension can
      have on gene expression, assuming no constraints from other dimensions.

    - **min_possible**: More conservative approach that normalizes the effect by
      considering the maximum possible effects from other dimensions:
      ```
      normalized_effect = log(exp(effect) + pseudo_count + baseline) - baseline
      ```
      where baseline is the maximum possible effect from other dimensions.

    Examples
    --------
    >>> # Using max_possible method (default)
    >>> find_differential_effects(traverse_adata, method="max_possible")
    >>>
    >>> # Using min_possible method with custom parameters
    >>> find_differential_effects(
    ...     traverse_adata, method="min_possible", key_added="conservative", add_to_counts=0.05, relax_max_by=0.1
    ... )
    >>> # Access results
    >>> stepwise_effects = traverse_adata.uns["effect_traverse_effect_stepwise"]
    >>> positive_effects = traverse_adata.varm["effect_traverse_effect_pos"]
    >>> negative_effects = traverse_adata.varm["effect_traverse_effect_neg"]
    """
    assert method in ["max_possible", "min_possible"]

    # Reorder the traverse_adata to original order
    original_traverse_adata = traverse_adata
    traverse_adata = traverse_adata[traverse_adata.obs.sort_values(["original_order"]).index].copy()
    traverse_adata = traverse_adata[:, traverse_adata.var.sort_values(["original_order"]).index].copy()

    # Get the number of latent dimensions, steps, samples, and vars
    n_latent, n_steps, n_samples, n_vars = get_dimensions_of_traverse_data(traverse_adata)

    # Get the dim_id and span values
    span_values = traverse_adata.obs["span_value"].values.reshape(n_latent, n_steps, n_samples)
    assert np.allclose(span_values, span_values.max(axis=-1, keepdims=True))
    span_values = span_values[:, :, 0]  # n_latent x n_steps
    dim_ids = traverse_adata.obs["dim_id"].values.reshape(n_latent, n_steps, n_samples)[:, 0, 0]  # n_latent

    # Get the output mean parameters in 4D format
    control_mean_param = traverse_adata.layers["control"].reshape(n_latent, n_steps, n_samples, n_vars)
    effect_mean_param = traverse_adata.layers["effect"].reshape(n_latent, n_steps, n_samples, n_vars)

    # Helper functions
    average_over_samples = lambda x: x.mean(axis=2)
    add_eps_in_count_space = lambda x: scipy.special.logsumexp(
        np.stack([x, np.log(add_to_counts) * np.ones_like(x)]), axis=0
    )
    find_relative_effect = lambda x, baseline: (
        scipy.special.logsumexp(np.stack([x, np.log(add_to_counts) * np.ones_like(x), baseline]), axis=0) - baseline
    )

    # Find DE for each sample and average over samples
    if method == "max_possible":
        diff_considering_small_values = average_over_samples(
            add_eps_in_count_space(effect_mean_param) - add_eps_in_count_space(control_mean_param)
        )  # n_latent x n_steps x n_vars
    elif method == "min_possible":
        reduce_dims = (
            1,
            2,
        )
        max_of_two = np.maximum(
            effect_mean_param.max(axis=reduce_dims, keepdims=True),
            control_mean_param.max(axis=reduce_dims, keepdims=True),
        )  # n_latent x 1 x n_samples, n_vars
        max_cumulative_possible_all = scipy.special.logsumexp(
            max_of_two, axis=0, keepdims=True
        )  # 1 x 1 x n_samples, n_vars
        max_cumulative_possible_other_dims = (
            np.log(np.exp(max_cumulative_possible_all) - np.exp(max_of_two)) - relax_max_by
        )  # n_latent x 1 x n_samples, n_vars
        max_cumulative_possible_other_dims = max_cumulative_possible_other_dims + np.zeros_like(
            effect_mean_param
        )  # n_latent x n_steps x n_samples, n_vars
        normalized_effect_mean_param = find_relative_effect(
            effect_mean_param, max_cumulative_possible_other_dims
        )  # n_latent x n_steps x n_samples, n_vars
        normalized_control_mean_param = find_relative_effect(
            control_mean_param, max_cumulative_possible_other_dims
        )  # n_latent x n_steps x n_samples, n_vars
        diff_considering_small_values = average_over_samples(
            normalized_effect_mean_param - normalized_control_mean_param
        )  # n_latent x n_steps x n_vars
    else:
        raise NotImplementedError()

    original_traverse_adata.uns[f"{key_added}_traverse_effect_stepwise"] = diff_considering_small_values

    # Find DE vars in positive and negative directions
    for effect_sign in ["pos", "neg"]:
        mask = np.where(span_values >= 0, 1, 0) if effect_sign == "pos" else np.where(span_values <= 0, 1, 0)
        max_effect = np.max(np.expand_dims(mask, axis=-1) * diff_considering_small_values, axis=1)  # n_latent x n_vars
        max_effect = pd.DataFrame(max_effect.T, index=traverse_adata.var_names, columns=dim_ids)
        original_traverse_adata.varm[f"{key_added}_traverse_effect_{effect_sign}"] = max_effect.loc[
            original_traverse_adata.var_names
        ].copy()
        original_traverse_adata.uns[f"{key_added}_traverse_effect_{effect_sign}_dim_ids"] = (
            original_traverse_adata.varm[f"{key_added}_traverse_effect_{effect_sign}"].columns.values
        )
        original_traverse_adata.varm[
            f"{key_added}_traverse_effect_{effect_sign}"
        ].columns = original_traverse_adata.varm[f"{key_added}_traverse_effect_{effect_sign}"].columns.astype(str)


def combine_differential_effects(
    traverse_adata: AnnData,
    keys: list[str],
    key_added: str,
    combine_function: callable,
) -> None:
    """Combine differential effects from multiple analyses.

    This function combines differential effects calculated using different methods
    or parameters to create a unified score. It applies a custom combination
    function to merge the stepwise effects and recalculates the positive/negative
    direction effects.

    Parameters
    ----------
    traverse_adata
        AnnData object containing differential effects from `find_differential_effects`.
        Must have `.uns[f"{key}_traverse_effect_stepwise"]` for each key in `keys`.
    keys
        List of keys corresponding to existing differential effect analyses.
        Each key should have been used in a previous call to `find_differential_effects`.
    key_added
        Prefix for the keys added to store the combined results.
        Results will be stored with keys like `{key_added}_traverse_effect_stepwise`.
    combine_function
        Function to combine the stepwise effects. Should take multiple arrays
        (one for each key) as positional arguments and return a single combined
        array with the same shape as the input arrays.

    Returns
    -------
    None
        Combined results are stored in `traverse_adata`:
        - `.uns[f"{key_added}_traverse_effect_stepwise"]`: Combined stepwise effects
          (shape: n_latent × n_steps × n_vars)
        - `.varm[f"{key_added}_traverse_effect_pos"]`: Combined positive direction effects
          (DataFrame with genes as rows, dimensions as columns)
        - `.varm[f"{key_added}_traverse_effect_neg"]`: Combined negative direction effects
          (DataFrame with genes as rows, dimensions as columns)
        - `.uns[f"{key_added}_traverse_effect_pos_dim_ids"]`: Array of dimension IDs
          for positive effects
        - `.uns[f"{key_added}_traverse_effect_neg_dim_ids"]`: Array of dimension IDs
          for negative effects

    Raises
    ------
    KeyError
        If any key in `keys` is missing from `traverse_adata.uns`.
    ValueError
        If the combine_function returns an array with unexpected shape.

    Notes
    -----
    The function performs the following steps:
    1. Reorders the data to original order for consistent processing
    2. Extracts stepwise effects for each key in `keys`
    3. Applies the `combine_function` to merge the effects
    4. Recalculates positive and negative direction effects from the combined data
    5. Stores the results with the new `key_added` prefix

    The `combine_function` should be designed to work with the specific types of
    differential effects being combined. Common approaches include:
    - Taking the maximum/minimum of multiple analyses
    - Computing weighted averages
    - Applying logical operations (AND/OR) for binary effects

    Examples
    --------
    >>> # Combine max_possible and min_possible effects using maximum
    >>> def combine_max_min(max_effects, min_effects):
    ...     return np.maximum(max_effects, min_effects)
    >>> combine_differential_effects(
    ...     traverse_adata,
    ...     keys=["max_possible", "min_possible"],
    ...     key_added="combined",
    ...     combine_function=combine_max_min,
    ... )
    """
    # Reorder the traverse_adata to original order
    original_traverse_adata = traverse_adata
    traverse_adata = traverse_adata[traverse_adata.obs.sort_values(["original_order"]).index].copy()
    traverse_adata = traverse_adata[:, traverse_adata.var.sort_values(["original_order"]).index].copy()

    # Get the number of latent dimensions, steps, samples, and vars
    n_latent, n_steps, n_samples, n_vars = get_dimensions_of_traverse_data(traverse_adata)

    # Get the dim_id and span values
    span_values = traverse_adata.obs["span_value"].values.reshape(n_latent, n_steps, n_samples)
    assert np.allclose(span_values, span_values.max(axis=-1, keepdims=True))
    span_values = span_values[:, :, 0]  # n_latent x n_steps
    dim_ids = traverse_adata.obs["dim_id"].values.reshape(n_latent, n_steps, n_samples)[:, 0, 0]  # n_latent

    # Combine effects
    combined_traverse_effect_stepwise = combine_function(
        *[traverse_adata.uns[f"{key}_traverse_effect_stepwise"] for key in keys]
    )

    original_traverse_adata.uns[f"{key_added}_traverse_effect_stepwise"] = combined_traverse_effect_stepwise

    # Find DE vars in positive and negative directions
    for effect_sign in ["pos", "neg"]:
        mask = np.where(span_values >= 0, 1, 0) if effect_sign == "pos" else np.where(span_values <= 0, 1, 0)
        max_effect = np.max(
            np.expand_dims(mask, axis=-1) * combined_traverse_effect_stepwise, axis=1
        )  # n_latent x n_vars
        max_effect = pd.DataFrame(max_effect.T, index=traverse_adata.var_names, columns=dim_ids)
        original_traverse_adata.varm[f"{key_added}_traverse_effect_{effect_sign}"] = max_effect.loc[
            original_traverse_adata.var_names
        ].copy()
        original_traverse_adata.uns[f"{key_added}_traverse_effect_{effect_sign}_dim_ids"] = (
            original_traverse_adata.varm[f"{key_added}_traverse_effect_{effect_sign}"].columns.values
        )
        original_traverse_adata.varm[
            f"{key_added}_traverse_effect_{effect_sign}"
        ].columns = original_traverse_adata.varm[f"{key_added}_traverse_effect_{effect_sign}"].columns.astype(str)


[docs] def calculate_differential_vars(traverse_adata: AnnData, **kwargs) -> None: """Calculate differential variables based on a combination of max_possible and min_possible effects. This function performs a comprehensive differential variable analysis by calculating effects using both "max_possible" and "min_possible" methods, then combining them into a unified score that considers both approaches. Parameters ---------- traverse_adata AnnData object created by `traverse_latent` or `make_traverse_adata`. Must contain `.layers['control']` and `.layers['effect']`. **kwargs Additional keyword arguments passed to `find_differential_effects`. Returns ------- None Results are stored in `traverse_adata`: - `.uns["max_possible_traverse_effect_stepwise"]`: Max possible stepwise effects - `.uns["min_possible_traverse_effect_stepwise"]`: Min possible stepwise effects - `.uns["combined_score_traverse_effect_stepwise"]`: Combined stepwise effects - `.varm["max_possible_traverse_effect_pos/neg"]`: Max possible direction effects - `.varm["min_possible_traverse_effect_pos/neg"]`: Min possible direction effects - `.varm["combined_score_traverse_effect_pos/neg"]`: Combined direction effects Notes ----- The function performs the following steps: 1. Calculates differential effects using "max_possible" method 2. Calculates differential effects using "min_possible" method 3. Combines the results using a custom scoring function **Scoring Logic:** The combined score is designed to identify genes that show consistent differential effects across both calculation methods. The filtering criteria are: 1. **Base threshold**: max_possible >= 1.0 (ensures substantial effect) 2. **Relative thresholds**: Either: - max_possible > 50% of its maximum across all dimensions and steps, OR - min_possible > 10% of its maximum across all dimensions and steps 3. **Final score**: For genes passing the filters, score = max_possible × min_possible This approach provides a robust way to identify genes that show consistent differential effects across both calculation methods, reducing false positives while maintaining sensitivity to real biological effects. **Biological Interpretation:** - **High combined scores**: Genes with strong specific effects - **High max_possible, low min_possible**: These genes are usually shared with another program, with the other program having a larger effect. - **Low max_possible, high min_possible**: Not possible. - **Low scores in both**: Genes with weak or inconsistent effects Examples -------- >>> # Basic differential variable calculation >>> calculate_differential_vars(traverse_adata) >>> # With custom parameters >>> calculate_differential_vars(traverse_adata, add_to_counts=0.05, relax_max_by=0.1) >>> # Access results >>> max_effects = traverse_adata.varm["max_possible_traverse_effect_pos"] >>> min_effects = traverse_adata.varm["min_possible_traverse_effect_pos"] >>> combined_effects = traverse_adata.varm["combined_score_traverse_effect_pos"] """ # Raise deprecation warning in favor of model.calculate_interpretability_scores warnings.warn( "calculate_differential_vars is deprecated and will be removed soon; use model.calculate_interpretability_scores(embed, ...) instead.", category=DeprecationWarning, stacklevel=2, ) print("Finding differential variables per latent dimension ...") find_differential_effects(traverse_adata, method="max_possible", key_added="max_possible", **kwargs) find_differential_effects(traverse_adata, method="min_possible", key_added="min_possible", **kwargs) def combine_function(min_possible, max_possible): # min_possible and max_possible dimensions: n_latent x n_steps x n_vars keep = (max_possible >= 1.0) & ( (max_possible > max_possible.max(axis=(0, 1), keepdims=True) / 2) | (min_possible > min_possible.max(axis=(0, 1), keepdims=True) / 10) ) score = np.where(keep, max_possible * min_possible, 0) return score # Combine scores with product combine_differential_effects( traverse_adata, keys=["min_possible", "max_possible"], key_added="combined_score", combine_function=combine_function, )
[docs] def get_split_effects( model: DRVI, embed: AnnData, n_steps: int = 20, n_samples: int = 100, traverse_kwargs: dict | None = None, de_kwargs: dict | None = None, ) -> AnnData: """Get split effects by performing latent traversal and differential analysis. This is a high-level function that combines latent space traversal with differential variable analysis. It performs the complete pipeline from generating traversal data to calculating differential effects. Parameters ---------- model Trained DRVI model for decoding latent representations. embed AnnData object containing latent dimension statistics in `.var`. Must have columns: `original_dim_id`, `min`, `max`, `std`, `title`, `vanished`, `order`. n_steps Number of steps in the traversal. Must be even (half negative, half positive). n_samples Number of samples to generate for each step. traverse_kwargs Additional arguments passed to `traverse_latent`. Common options include: - `copy_adata_var_info`: Whether to copy variable information - `noise_formula`: Custom noise generation function - `max_noise_std`: Maximum noise standard deviation. de_kwargs Additional arguments passed to `calculate_differential_vars`. Common options include: - `add_to_counts`: Pseudo-count for log calculations - `relax_max_by`: Relaxation factor for min_possible method. Returns ------- AnnData AnnData object containing both traversal data and differential analysis results. Includes all outputs from `traverse_latent` and `calculate_differential_vars`: **Traversal Data:** - `.X`: Difference between effect and control conditions - `.layers['control']`: Control condition gene expression - `.layers['effect']`: Effect condition gene expression - `.obs`: Metadata including dim_id, sample_id, step_id, span_value, title, vanished, order **Differential Analysis Results:** - `.uns["max_possible_traverse_effect_stepwise"]`: Max possible stepwise effects - `.uns["min_possible_traverse_effect_stepwise"]`: Min possible stepwise effects - `.uns["combined_score_traverse_effect_stepwise"]`: Combined stepwise effects - `.varm["*_traverse_effect_pos/neg"]`: Direction-specific effects for all methods Raises ------ ValueError If required columns are missing from `embed.var`. KeyError If required data is missing from the model or embed objects. Notes ----- This function is a convenience wrapper that performs the complete analysis pipeline in one call. It's equivalent to: ```python traverse_adata = traverse_latent(model, embed, n_steps, n_samples, **traverse_kwargs) calculate_differential_vars(traverse_adata, **de_kwargs) return traverse_adata ``` **Functionality:** 1. **Traversal Generation**: Creates systematic traversals through latent space 2. **Decoding**: Converts latent traversals to gene expression predictions 3. **Differential Analysis**: Calculates effects using max_possible and min_possible methods 4. **Combination**: Creates unified scores for robust gene identification **Use Cases:** - **Gene Discovery**: Identify genes associated with specific latent dimensions - **Pathway Analysis**: Understand biological processes captured by latent factors - **Model Validation**: Verify that latent dimensions have interpretable biological meaning - **Comparative Analysis**: Compare effects across different models or conditions Examples -------- >>> # Basic split effects analysis >>> split_effects = get_split_effects(model, embed) >>> # With custom parameters >>> split_effects = get_split_effects(model, embed, n_steps=30, n_samples=50) >>> # Access results >>> combined_effects = split_effects.varm["combined_score_traverse_effect_pos"] >>> stepwise_effects = split_effects.uns["combined_score_traverse_effect_stepwise"] """ if traverse_kwargs is None: traverse_kwargs = {} if de_kwargs is None: de_kwargs = {} traverse_adata = traverse_latent(model, embed, n_steps=n_steps, n_samples=n_samples, **traverse_kwargs) calculate_differential_vars(traverse_adata, **de_kwargs) return traverse_adata
[docs] def iterate_on_top_differential_vars( traverse_adata: AnnData, key: str, title_col: str = "title", order_col: str = "order", gene_symbols: str | None = None, score_threshold: float = 0.0, ) -> list[tuple[str, pd.Series]]: """Create an iterator of top differential variables per latent dimension. This function processes differential analysis results to create an organized list of top differentially expressed genes for each latent dimension, sorted by their effect scores and organized by dimension. Parameters ---------- traverse_adata AnnData object with differential analysis results from `calculate_differential_vars`. Must contain differential effect data for the specified `key`. key Key prefix for the differential variables in `traverse_adata`. Should correspond to a key used in `find_differential_effects` or `calculate_differential_vars`. Common value: "combined_score". title_col Column name in `traverse_adata.obs` containing dimension titles. These titles will be used in the output dimension names. order_col Column name in `traverse_adata.obs` containing dimension ordering. gene_symbols Column name in `traverse_adata.var` containing gene symbols. If None, uses the index of `traverse_adata.var` (usually gene IDs). Useful for converting between gene IDs and readable gene names. score_threshold Minimum score threshold to include genes in the results. Only genes with scores above this threshold will be included. Returns ------- list[tuple[str, pd.Series]] List of tuples, where each tuple contains: - str: Dimension title with direction indicator (e.g., "Cell Cycle+", "Cell Cycle-") - pd.Series: Series of gene scores for that dimension/direction, sorted descending The list is sorted by dimension order, with each dimension appearing at most twice (once for positive effects, once for negative effects). Raises ------ KeyError If required columns or differential effect data are missing. ValueError If the specified key doesn't exist in the AnnData object. Notes ----- The function performs the following steps: 1. Extracts positive and negative differential effects for the specified key 2. Maps gene names to symbols if `gene_symbols` is provided 3. Filters genes by score threshold 4. Organizes results by dimension and direction (positive/negative) 5. Returns a list sorted by dimension order **Output Structure:** Each dimension appears twice in the results - once for positive effects and once for negative effects. The direction is indicated by "+" or "-" appended to the dimension title. Only dimensions with at least one gene above the threshold are included. Examples -------- >>> # Basic iteration over top differential variables >>> top_vars = iterate_on_top_differential_vars(traverse_adata, "combined_score") >>> for dim_title, gene_scores in top_vars: ... print(f"{dim_title}: {len(gene_scores)} genes") ... print(f"Top genes: {gene_scores.head().index.tolist()}") >>> # With custom parameters and gene symbols >>> top_vars = iterate_on_top_differential_vars( ... traverse_adata, "max_possible", gene_symbols="gene_symbol", score_threshold=1.0 ... ) >>> # Create a summary of results >>> for dim_title, gene_scores in top_vars: ... print(f"{dim_title}: {gene_scores.head().index.tolist()}") """ df_pos = traverse_adata.varm[f"{key}_traverse_effect_pos"].copy() df_neg = traverse_adata.varm[f"{key}_traverse_effect_neg"].copy() if gene_symbols is not None: gene_name_mapping = dict(zip(traverse_adata.var.index, traverse_adata.var[gene_symbols], strict=False)) else: gene_name_mapping = dict(zip(traverse_adata.var.index, traverse_adata.var.index, strict=False)) df_pos.index = df_pos.index.map(gene_name_mapping) df_neg.index = df_neg.index.map(gene_name_mapping) de_info = dict( **{ (str(k) + "+"): v.sort_values(ascending=False).where(lambda x: x > score_threshold).dropna() for k, v in df_pos.to_dict(orient="series").items() }, **{ (str(k) + "-"): v.sort_values(ascending=False).where(lambda x: x > score_threshold).dropna() for k, v in df_neg.to_dict(orient="series").items() }, ) return [ (f"{row[title_col]}{direction}", de_info[f"{row['dim_id']}{direction}"]) for i, row in traverse_adata.obs[["dim_id", order_col, title_col]] .drop_duplicates() .sort_values(order_col) .iterrows() for direction in ["-", "+"] if len(de_info[f"{row['dim_id']}{direction}"]) > 0 ]