Source code for metachat.preprocessing._preprocess

# ============================================================
import numpy as np
import pandas as pd
import anndata
# ============================================================

def global_intensity_scaling(
    adata_ref: anndata.AnnData,
    adata_target: anndata.AnnData,
    method: str = 'tic',
    scales: float = 1e-5
):
    """
    Perform global intensity scaling of `adata_target` to match `adata_ref`,
    using either total ion current (TIC) or root-mean-square (RMS) normalization.

    A single global statistic is computed for each dataset; the target matrix
    is multiplied by ``(global_ref / global_tgt) * scales`` and the reference
    matrix by ``scales``. If the target statistic is exactly zero, the ratio
    falls back to ``1.0`` so that only ``scales`` is applied.

    Both inputs are modified in place (their ``.X`` is reassigned) and the
    same objects are returned.

    Parameters
    ----------
    adata_ref
        Reference dataset for scaling (e.g., negative ion mode).
    adata_target
        Target dataset to be scaled (e.g., positive ion mode).
    method
        Scaling method to use:

        - `'tic'`: scale by total ion current (sum of all intensities)
        - `'rms'`: scale by root-mean-square of intensities
    scales
        Global scaling factor applied to both datasets (default: 1e-5).

    Returns
    -------
    adata_ref : anndata.AnnData
        Scaled reference dataset.
    adata_target : anndata.AnnData
        Scaled target dataset.

    Raises
    ------
    ValueError
        If `method` is neither `'tic'` nor `'rms'`.
    """
    # Fail fast on a bad method, before touching potentially large matrices.
    if method not in ('tic', 'rms'):
        raise ValueError(f"Unknown method '{method}'. Choose 'tic' or 'rms'.")

    def _global_intensity(X):
        """Return the global TIC or RMS of `X` as a float."""
        if hasattr(X, "toarray"):
            # Sparse input: work on the sparse structure directly instead of
            # materialising a dense copy of the whole matrix.
            if method == 'tic':
                return float(X.sum())
            # The mean is taken over every entry, implicit zeros included.
            n_entries = X.shape[0] * X.shape[1]
            return float(np.sqrt(X.multiply(X).sum() / n_entries))
        dense = np.asarray(X)
        if method == 'tic':
            return float(np.sum(dense))
        return float(np.sqrt(np.mean(np.square(dense))))

    global_ref = _global_intensity(adata_ref.X)
    global_tgt = _global_intensity(adata_target.X)

    # Compute scale factor, avoid division by zero
    scale_factor = global_ref / global_tgt if global_tgt != 0 else 1.0

    # Apply constant scaling to the entire target matrix; sparse matrices
    # expose `.multiply`, dense arrays use plain broadcasting.
    if hasattr(adata_target.X, "multiply"):
        adata_target.X = adata_target.X.multiply(scale_factor * scales)
    else:
        adata_target.X = adata_target.X * scale_factor * scales

    if hasattr(adata_ref.X, "multiply"):
        adata_ref.X = adata_ref.X.multiply(scales)
    else:
        adata_ref.X = adata_ref.X * scales

    return adata_ref, adata_target
def load_barrier_segments(
    csv_path: str = None,
    coord_cols = ("axis-2", "axis-1"),
    close_polygons: bool = True,
    scale: float = None
):
    """
    Parse Napari shapes CSV and extract barrier line segments.

    This function converts a Napari shapes `.csv` file (usually exported from
    Napari's "Shapes" layer) into a list of 2D line segments represented as
    coordinate pairs. Each shape is grouped by its `index` and its vertices
    ordered by `vertex-index`; consecutive vertices form one segment.

    Parameters
    ----------
    csv_path : str
        Path to the Napari shapes CSV file (anything `pandas.read_csv`
        accepts).
    coord_cols : tuple of str, default=('axis-2', 'axis-1')
        Column names holding the two coordinates. The components of every
        returned point follow this order.
        NOTE(review): which Napari axis is X and which is Y depends on how
        the layer was exported -- confirm against the data.
    close_polygons : bool, default=True
        Whether to close shapes whose `shape-type` is ``polygon`` by adding a
        segment from the last vertex back to the first.
    scale : float, optional
        Scaling factor multiplied into all coordinates (for example a
        pixel-to-micrometer conversion factor). ``None`` leaves coordinates
        unchanged.

    Returns
    -------
    segs : list of tuple
        Line segments of the form
        ``[((a1, b1), (a2, b2)), ((a2, b2), (a3, b3)), ...]`` where each
        point is ``(coord_cols[0], coord_cols[1])``.

    Raises
    ------
    ValueError
        If `csv_path` is not provided.

    Notes
    -----
    The input CSV should contain at least the following columns:
    `['index', 'vertex-index', 'shape-type']` plus those named in
    `coord_cols`. Shapes with fewer than two vertices are skipped.
    """
    if csv_path is None:
        raise ValueError("`csv_path` must be provided.")

    # ==== Read and group CSV ====
    df = pd.read_csv(csv_path)
    segs = []

    # ==== Extract line segments ====
    for _, group in df.groupby("index", sort=True):
        group = group.sort_values("vertex-index")
        # str() guards against a missing (NaN) shape type.
        shape = str(group["shape-type"].iloc[0]).lower()
        vertices = group[list(coord_cols)].to_numpy(dtype=float)
        if len(vertices) < 2:
            continue

        # ==== Apply scaling ====
        if scale is not None:
            vertices = vertices * scale

        points = [tuple(row) for row in vertices]
        segs.extend(zip(points[:-1], points[1:]))

        # Close the outline with an edge from the LAST vertex to the first.
        # With only two vertices that edge would just repeat the single
        # segment already emitted, so it is skipped.
        if close_polygons and shape == "polygon" and len(points) > 2:
            segs.append((points[-1], points[0]))

    return segs