Source code for gratools.Gratools

import logging
import subprocess
import sys
from collections import defaultdict, OrderedDict
from pathlib import Path

from rich import get_console
from tqdm import tqdm
from dataclasses import dataclass, field
import gzip
from .Graph import GFA, SubGraph, human_sort_key, GratoolsBam
import pandas as pd
from Bio import SeqIO
import multiprocessing
from concurrent.futures import ProcessPoolExecutor, as_completed
from rich.progress import Progress, BarColumn, TextColumn
from rich.console import Console
from rich.table import Table
from shutil import rmtree

pd.set_option('display.max_rows', None)  # Display all rows
pd.set_option('display.max_columns', None)  # Display all columns
def flatten(xss):
    """Flatten a list of lists into a single list."""
    return [x for xs in xss for x in xs]
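# Example:
#   >>> flatten([[1, 2], [3], [4, 5]])
#   [1, 2, 3, 4, 5]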
@dataclass
class Gratools:
    """
    A class to parse and process GFA files, including handling BAM files and
    extracting segments and links.

    Attributes:
        gfa_path (Path): The path to the GFA file.
        threads (int): The number of worker processes to use.
        gfa_name (str): The name of the GFA file.
        bam_segments_file (Path): The path to the BAM segments file.
        dict_samples_chrom (defaultdict): A dictionary mapping sample names to chromosomes.
        works_path (Path): The path to the working directory.
        bed_path (Path): The path to the BED files directory.
        bam_path (Path): The path to the BAM files directory.
        samples_chrom_path (Path): The path to the file containing sample chromosome data.
    """
    gfa_path: Path
    threads: int = 1
    logger: logging.Logger = field(default_factory=lambda: logging.getLogger(__name__))
    gfa_name: str = None
    bam_segments_file: Path = None
    dict_samples_chrom: defaultdict = field(default_factory=lambda: defaultdict(OrderedDict), repr=False)
    works_path: Path = None
    bed_path: Path = None
    bam_path: Path = None
    samples_chrom_path: Path = None
    dict_gfa_graph_object: dict = field(default_factory=dict, repr=False)
    sample_name: str = None
    chromosome: str = None
    start: int = 0
    stop: int = None
    suffix: str = None
    build_fasta: bool = False
    gzip_gfa: bool = False

    def __post_init__(self):
        """
        Initialize paths and directories, then parse the GFA file and tag the BAM file.
        """
        suffixes = ''.join(self.gfa_path.suffixes)
        if ".gz" in suffixes:
            self.gfa_name = self.gfa_path.name.replace(''.join(self.gfa_path.suffixes[-2:]), '')
            self.gzip_gfa = True
        elif ".gfa" in suffixes:
            self.gfa_name = self.gfa_path.stem
        self.works_path = self.gfa_path.parent / f"{self.gfa_name}_GraTools"
        self.bed_path = self.works_path / "bed_files"
        self.bam_path = self.works_path / "bam_files"
        self.bam_segments_file = self.bam_path / f'{self.gfa_name}.bam'
        self.header_gfa_file = self.works_path / f'header_{self.gfa_name}.txt'
        self.samples_chrom_path = self.works_path / "samples_chrom.txt"
        self._check_bedtools_avail()
        self._check_gratools_files()
        self._read_samples_chrom()
        self._validate_bed_files()

    def _check_bedtools_avail(self):
        """Check that BEDtools is installed and available on PATH."""
        # Test whether the bedtools command line is available in the shell
        process = subprocess.run("bedtools", shell=True, check=False,
                                 stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
        if int(process.returncode) >= 1:
            raise NotImplementedError(
                "BEDtools is not installed or available on PATH. Please install/load BEDtools and try again.")

    def _check_gratools_files(self):
        """Check whether the Gratools working files exist; if not, create them."""
        if not self.works_path.exists():
            self.logger.debug(
                f"First run of Gratools on {self.gfa_path.name}.\nExtracting GFA information to {self.works_path}")
            try:
                # Constructing the GFA object writes the working files
                GFA(gfa_path=self.gfa_path, logger=self.logger)
            except Exception as e:
                rmtree(self.gfa_path.parent / f"{self.gfa_name}_GraTools")
                self.logger.error(f"Error: {e}", exc_info=True)
                raise
        else:
            self.logger.debug(f"Check that all Gratools files in {self.works_path} exist")
            # Check that each required file exists and is not empty
            if not self.bam_segments_file.exists() or self.bam_segments_file.stat().st_size == 0:
                raise FileNotFoundError(
                    f"BAM file {self.bam_segments_file.name} does not exist or is empty, "
                    f"please remove Gratools files: {self.works_path} and try again")
            elif not self.header_gfa_file.exists() or self.header_gfa_file.stat().st_size == 0:
                raise FileNotFoundError(
                    f"header file {self.header_gfa_file.name} does not exist or is empty, "
                    f"please remove Gratools files: {self.works_path} and try again")
            elif not self.samples_chrom_path.exists() or self.samples_chrom_path.stat().st_size == 0:
                raise FileNotFoundError(
                    f"samples_chrom file {self.samples_chrom_path.name} does not exist or is empty, "
                    f"please remove Gratools files: {self.works_path} and try again")
            elif not self.bed_path.exists():
                raise FileNotFoundError(
                    f"BED directory {self.bed_path} does not exist, "
                    f"please remove Gratools files: {self.works_path} and try again")

    def _read_samples_chrom(self):
        """Read samples and their corresponding chromosomes from the samples_chrom.txt file."""
        with open(self.samples_chrom_path, "r") as file:
            for line in file:
                sample, chrom, fragment1, fragment2 = line.strip().split("\t")
                if sample not in self.dict_samples_chrom:
                    self.dict_samples_chrom[sample] = {chrom: [(fragment1, fragment2)]}
                elif chrom not in self.dict_samples_chrom[sample]:
                    self.dict_samples_chrom[sample][chrom] = [(fragment1, fragment2)]
                else:
                    self.dict_samples_chrom[sample][chrom].append((fragment1, fragment2))

    def _validate_bed_files(self):
        """Validate the presence of a non-empty BED file for each sample."""
        self.logger.debug(f"Check that BED files in {self.bed_path} exist")
        for sample in self.dict_samples_chrom:
            bed_file = self.bed_path / f"{sample}.bed"
            if not bed_file.exists() or bed_file.stat().st_size == 0:
                raise FileNotFoundError(
                    f"BED file {bed_file} does not exist or is empty. Please check the BED files.")

    def _check_sample_in_GFA(self, sample):
        """Check that the sample is present in the GFA file."""
        if sample not in self.dict_samples_chrom:
            raise ValueError(
                f"Sample {sample} not found in {self.gfa_path.name}. "
                f"Available: {list(self.dict_samples_chrom.keys())}")

    def _check_samples_file(self, samples_list_path=None):
        """Return the list of samples to process.

        If samples_list_path is given, read it and check that every listed sample
        exists in the GFA file; otherwise return all samples in the GFA file
        except the current reference sample.
        """
        if not samples_list_path:
            samples_list = list(self.dict_samples_chrom.keys())
            if self.sample_name in samples_list:
                samples_list.remove(self.sample_name)
        else:
            with samples_list_path.open("r") as handle:
                samples_list = handle.read().splitlines()
        for smp in samples_list:
            self._check_sample_in_GFA(sample=smp)
        return samples_list

    def _check_chrom_in_GFA(self, sample, chrom):
        """Check that the chromosome is present in the given sample."""
        if chrom not in self.dict_samples_chrom[sample]:
            raise ValueError(
                f"Chromosome {chrom} not found in sample {sample}. "
                f"Available: {list(self.dict_samples_chrom[sample].keys())}")
    def get_chromosome_size(self, sample, chrom):
        """Return the size of a chromosome, i.e. the largest fragment stop position."""
        df = pd.read_csv(self.samples_chrom_path, sep="\t", header=None,
                         names=["SAMPLES", "CHROMOSOMES_LIST", "FRG_START", "FRG_STOP"])
        df.query("SAMPLES == @sample & CHROMOSOMES_LIST == @chrom", inplace=True)
        return max(df["FRG_STOP"])
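    # Example (chromosome name and returned size are hypothetical):
    #   >>> gfa = Gratools(gfa_path=Path("pangenome.gfa.gz"))
    #   >>> gfa.get_chromosome_size(sample="CG14", chrom="chr01")
    #   23450897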
    @property
    def get_sample_names(self):
        """
        Retrieve the unique sample names from the dataset.

        This property extracts the unique sample names from the
        `dict_samples_chrom` attribute, a dictionary whose keys are the sample
        names and whose values contain chromosome data.

        Returns:
            list: A list of unique sample names extracted from the dataset.

        Example:
            >>> sample_names = self.get_sample_names
            >>> print(sample_names)
            ['CG14', 'Og103', 'Og182', 'Og20', 'Tog5681']
        """
        # dict_samples_chrom holds the per-sample chromosome information
        return list(self.dict_samples_chrom.keys())
    def display_sample_names(self):
        """
        Display the list of sample names in a formatted table using rich.

        The table contains a single column labeled 'SAMPLES', with one row per
        sample name.

        Returns:
            None: The table is printed to the console; nothing is returned.
        """
        console = Console()
        sample_names = self.get_sample_names
        # Create a rich Table object with a title and one cyan 'SAMPLES' column
        table = Table(title=f"List of Samples in {self.gfa_path.name}")
        table.add_column("SAMPLES", style="cyan", no_wrap=True)
        # Add each sample name to the table as a row
        for sample in sample_names:
            table.add_row(sample)
        console.print(table)
    @property
    def get_chromosome_data(self):
        """
        Load the chromosome data from the samples_chrom file and return a DataFrame.

        Returns:
            pandas.DataFrame: DataFrame containing the chromosome data.
        """
        # Read the tab-separated file (structure: SAMPLES, CHROMOSOMES_LIST, START, END)
        df = pd.read_csv(self.samples_chrom_path, sep="\t", header=None,
                         names=["SAMPLES", "CHROMOSOMES_LIST", "START", "END"])
        # Drop duplicates, if any
        df = df.drop_duplicates(subset=['SAMPLES', 'CHROMOSOMES_LIST', 'START', 'END'])
        return df

    @property
    def get_chromosomes_by_sample(self):
        """
        Group the chromosomes by sample and return a grouped DataFrame.

        Returns:
            pandas.DataFrame: A DataFrame with chromosomes grouped by SAMPLES.
        """
        df = self.get_chromosome_data
        # Group chromosomes by SAMPLES and aggregate them into comma-separated strings
        grouped = df.groupby('SAMPLES')['CHROMOSOMES_LIST'].apply(
            lambda x: ', '.join(set(x))).reset_index()
        return grouped
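    # Example of the grouped result (chromosome names are hypothetical):
    #   >>> gfa.get_chromosomes_by_sample
    #     SAMPLES     CHROMOSOMES_LIST
    #   0    CG14  chr01, chr02, chr03
    #   1   Og103  chr01, chr02, chr03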
    def display_chromosome_names(self):
        """Display the SAMPLES and CHROMOSOMES_LIST columns using rich."""
        grouped_df = self.get_chromosomes_by_sample
        console = Console()
        # Create the table object with two columns: samples and chromosomes
        table = Table(title=f"Samples and Chromosomes in {self.gfa_path.name}", show_lines=True)
        table.add_column("SAMPLES", style="cyan", no_wrap=True)
        table.add_column("CHROMOSOMES_LIST", style="magenta")
        # Iterate over the grouped DataFrame and add one row per sample
        for _, row in grouped_df.iterrows():
            table.add_row(
                str(row['SAMPLES']),
                str(row['CHROMOSOMES_LIST'])
            )
        console.print(table)
    def display_full_chromosome_data(self):
        """Display the full chromosome data (SAMPLES, CHROMOSOMES_LIST, START, END) using rich."""
        df = self.get_chromosome_data
        console = Console()
        # Create the table object with the appropriate columns
        table = Table(title=f"Samples and Full Chromosomes in {self.gfa_path.name}", show_lines=True)
        table.add_column("SAMPLES", style="cyan", no_wrap=True)
        table.add_column("CHROMOSOMES (can be fragmented)", style="magenta")
        table.add_column("START", justify="right", style="green")
        table.add_column("END", justify="right", style="green")
        # Group by 'SAMPLES' and aggregate chromosome, start, and end info
        grouped = df.groupby('SAMPLES').agg({
            'CHROMOSOMES_LIST': lambda x: '\n'.join(x),    # concatenate chromosomes
            'START': lambda x: '\n'.join(map(str, x)),     # concatenate start positions
            'END': lambda x: '\n'.join(map(str, x))        # concatenate end positions
        }).reset_index()
        # Iterate over the grouped rows and add them to the table
        for _, row in grouped.iterrows():
            table.add_row(
                str(row['SAMPLES']),
                str(row['CHROMOSOMES_LIST']),
                str(row['START']),
                str(row['END'])
            )
        console.print(table)
    def extract_sub_graph(self, sample_name, chromosome, start=0, stop=None,
                          samples_list_path=None, build_fasta=False):
        """
        Extract a subgraph from the GFA file.

        Args:
            sample_name (str): The name of the sample.
            chromosome (str): The chromosome identifier.
            start (int): The starting position (default: 0).
            stop (int, optional): The stopping position (default: the chromosome size).
            samples_list_path (Path, optional): File with the list of samples to include in the subgraph.
            build_fasta (bool): Whether to also build a FASTA file for each sample (default: False).
        """
        self.sample_name = sample_name
        self.chromosome = chromosome
        self.start = start
        self.stop = stop
        self.build_fasta = build_fasta
        bed_file = self.bed_path / f"{sample_name}.bed"
        # Check that the sample and chromosome exist in the GFA file
        self._check_sample_in_GFA(sample=sample_name)
        self._check_chrom_in_GFA(sample=sample_name, chrom=chromosome)
        # Check the samples list file
        samples_list = self._check_samples_file(samples_list_path=samples_list_path)
        # Check that stop is not greater than the chromosome size
        chrom_size = self.get_chromosome_size(sample=sample_name, chrom=chromosome)
        if not self.stop:
            self.stop = chrom_size
            self.logger.info(f"Chromosome {chromosome} size: {self.stop}")
        elif self.stop > chrom_size:
            self.logger.warning(
                f"Requested stop {self.stop} is greater than {chromosome} size {chrom_size}; clamping to {chrom_size}")
            self.stop = chrom_size
        self.suffix = f"{self.sample_name}-{self.chromosome}-{self.start}-{self.stop}"
        self.dict_gfa_graph_object = {}
        self.logger.info(f"Extracting subgraph for sample {self.suffix}")
        self.sub_graph_ref = SubGraph(
            logger=self.logger,
            bam_path=self.bam_segments_file,
            bed_path=bed_file,
            chromosome=chromosome,
            start=self.start,
            stop=self.stop,
        )
        self.dict_gfa_graph_object[sample_name] = self.sub_graph_ref
        self.sub_graph_ref.compute_intersection()
        self.sub_graph_ref.build_walks()
        self.sub_graph_ref.build_segments()
        if self.build_fasta:
            self.sub_graph_ref.build_fasta()
        self.logger.info(f"Finished processing sample {sample_name}, extracting region on other samples")
        if self.threads > 1:
            self.parallelize_samples(samples_list)
        else:
            with tqdm(total=len(samples_list), desc="Processing samples", unit="samples", file=sys.stdout) as pbar:
                for sample in samples_list:
                    self.process_sample(sample)
                    pbar.update(1)
                    pbar.set_postfix(sample=sample)
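    # Example (chromosome name and coordinates are hypothetical; extracts the
    # region of sample CG14 and the matching regions in all other samples):
    #   >>> gfa = Gratools(gfa_path=Path("pangenome.gfa.gz"), threads=4)
    #   >>> gfa.extract_sub_graph(sample_name="CG14", chromosome="chr01",
    #   ...                       start=10000, stop=50000)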
    def process_sample(self, sample, shared_progress=None, task_id=None):
        """Process one sample in several steps, reporting progress through an
        optional shared dictionary (used by the parallel workers)."""
        if shared_progress is not None:
            shared_progress[task_id] = {"progress": 1, "total": 1,
                                        "description": f"[{sample}]: Start of extracting subgraph"}
        bed_file = self.bed_path / f"{sample}.bed"
        sub_graph = SubGraph(
            logger=self.logger,
            bam_path=self.bam_segments_file,
            bed_path=bed_file,
            sample_name=sample,
            segment_id_set=self.sub_graph_ref.segment_id_set,
            dict_segments_samples=self.sub_graph_ref.dict_segments_samples,
        )
        # Step 1: extract the chromosomal positions
        sub_graph.get_chr_pos(progress=shared_progress, task_id=task_id)
        # Step 2: build the FASTA file if requested
        if self.build_fasta:
            self.logger.debug(f"Building FASTA for sample {sample}")
            sub_graph.build_fasta()
        # Store the finalized SubGraph object
        self.dict_gfa_graph_object[sample] = sub_graph
        self.logger.debug(f"Completed processing for sample {sample}")
        # Also return (sample, sub_graph) so that parallel callers can collect
        # the result into dict_gfa_graph_object of the parent process
        return sample, sub_graph
    def parallelize_samples(self, samples_list):
        """Process samples in parallel, with one rich progress task per sample."""
        # Number of workers to use
        n_workers = self.threads
        with Progress(
                TextColumn("[progress.description]{task.description}"),  # task description
                BarColumn(),  # progress bar
                TextColumn("[progress.percentage]{task.percentage:>3.0f}%"),  # percentage
                TextColumn("[progress.completed]{task.completed}/{task.total}"),  # completed/total steps
                refresh_per_second=4,  # less frequent refreshes
                console=get_console(),
        ) as shared_progress:
            futures = []  # list used to track the submitted tasks
            with multiprocessing.Manager() as manager:
                # Shared dictionary used to track the progress of each task
                _progress = manager.dict()
                # Global task tracking the progress of all jobs
                overall_progress_task = shared_progress.add_task(
                    "[green]GLOBAL: Samples progression:", completed=0,
                    total=len(samples_list), visible=True)
                with ProcessPoolExecutor(max_workers=n_workers) as executor:
                    # Add one progress task per sample and submit the jobs
                    for sample in samples_list:
                        task_id = shared_progress.add_task(
                            f"[cyan]{sample}: Create processing", completed=0, total=5, visible=True)
                        futures.append(executor.submit(self.process_sample, sample, _progress, task_id))
                    # Poll the futures and refresh the progress display
                    while (n_finished := sum(future.done() for future in futures)) < len(futures):
                        # Update the global progress
                        shared_progress.update(overall_progress_task, completed=n_finished, total=len(futures))
                        for task_id, update_data in _progress.items():
                            latest = update_data["progress"]
                            total = update_data["total"]
                            description = update_data["description"]
                            # Update the progress of each individual task
                            shared_progress.update(
                                task_id,
                                completed=latest,
                                total=total,
                                description=description,
                                visible=latest <= total,  # only visible while the task is not finished
                            )
                    shared_progress.update(overall_progress_task, completed=len(futures), total=len(futures))
                    # Collect the (sample, sub_graph) results into
                    # self.dict_gfa_graph_object, raising any worker exception
                    for future in futures:
                        sample, sub_graph = future.result()
                        self.dict_gfa_graph_object.update({sample: sub_graph})
    def concat_generate_sub_graph(self):
        """Concatenate the per-sample subgraphs and write them as a single GFA file."""
        header_list = self.header_gfa_file.open("r").read().splitlines()
        gfa_segment_list = set()
        gfa_link_list = set()
        gfa_walk_list = set()
        self.logger.info(f"Concatenate subgraphs for {len(self.dict_gfa_graph_object)} samples")
        for sample, sub_graph in self.dict_gfa_graph_object.items():
            gfa_segment_list.update(sub_graph.gfa_segment_list)
            gfa_link_list.update(sub_graph.gfa_link_list)
            self.logger.info(f"Sample {sample} has {len(sub_graph.gfa_walk_list)} walks")
            gfa_walk_list.update(sub_graph.gfa_walk_list)
        # TODO: allow user suffix?
        sub_gfa_path = self.gfa_path.parent / f"{self.gfa_name}_subgraph_{self.suffix}.gfa"
        if self.gzip_gfa:
            sub_gfa_path = self.gfa_path.parent / f"{self.gfa_name}_subgraph_{self.suffix}.gfa.gz"
        open_fn = gzip.open if self.gzip_gfa else open
        with open_fn(sub_gfa_path, "wt") as f:
            # Write the original header lines, then a header line describing the
            # extracted region
            for head in header_list:
                f.write(f"{head}\n")
            f.write(f"H\tVN:Z:1.1\tRS:Z:{self.suffix}\n")
            for segment in sorted(gfa_segment_list, key=human_sort_key):
                f.write(f"{segment}\n")
            for link in sorted(gfa_link_list, key=human_sort_key):
                f.write(f"{link}\n")
            for walk in sorted(gfa_walk_list, key=human_sort_key):
                f.write(f"{walk}\n")
        self.logger.info(f"Generated GFA file {sub_gfa_path.name}")
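    # Example: with gfa_name "Og_cactus" and suffix "CG14-chr01-10000-50000"
    # (hypothetical region), the output is written next to the input graph as
    # "Og_cactus_subgraph_CG14-chr01-10000-50000.gfa" (".gfa.gz" if the input
    # graph was gzipped).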
    def generate_fasta(self):
        """Write the sequences of all extracted subgraphs to a single FASTA file."""
        fasta_out = self.gfa_path.parent / f"{self.gfa_name}_subgraph_{self.suffix}.fasta"
        sequences_list = list()
        for sample, sub_graph in self.dict_gfa_graph_object.items():
            sequences_list.append(sub_graph.sequences_list)
        with open(Path(fasta_out).resolve().as_posix(), "w") as output_handle:
            SeqIO.write(flatten(sequences_list), output_handle, "fasta")
        self.logger.info(f"Generated FASTA file {fasta_out.name}")
    def segments_info(self, shared_min=None, specific_max=None, filter_len=None):
        """Compute per-segment statistics from the BAM segments file."""
        bam = GratoolsBam(bam_path=self.bam_segments_file, threads=self.threads)
        bam.segments_info(output_base_path=self.gfa_path,
                          nb_samples_gfa=len(self.dict_samples_chrom),
                          shared_min=shared_min,
                          specific_max=specific_max,
                          filter_len=filter_len)
    def specific_and_shared_segments(self, sample_list_a_path=None, sample_list_b_path=None, filter_len=None):
        """Report segments specific to and shared between two groups of samples."""
        bam = GratoolsBam(bam_path=self.bam_segments_file, threads=self.threads)
        sample_list_A = self._check_samples_file(sample_list_a_path)
        sample_list_B = None
        if sample_list_b_path:
            sample_list_B = self._check_samples_file(sample_list_b_path)
        bam.specific_and_shared_segments(
            samples_list_A=sample_list_A,
            samples_list_B=sample_list_B,
            filter_len=filter_len)
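    # Example (hypothetical sample list files, one sample name per line):
    #   >>> gfa = Gratools(gfa_path=Path("pangenome.gfa.gz"))
    #   >>> gfa.specific_and_shared_segments(
    #   ...     sample_list_a_path=Path("group_A.txt"),
    #   ...     sample_list_b_path=Path("group_B.txt"),
    #   ...     filter_len=50)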
if __name__ == "__main__":
    gfa_path_input = Path("/shared/home/sravel/glp701/sandbox/toolbox/Og_cactus.gfa.gz")
    # gfa_path_input = Path("/home/christine/Documents/Dev/git/pangenomics/SuperToolBox/Og_cactus.gfa.gz")
    # gfa_path_input = Path("/shared/home/sravel/annotation_fijiensism2/2024/mashtree/pangenome/Pf_graphv0/Pf_graphv0.d2.gfa.gz")
    gfa = Gratools(gfa_path=gfa_path_input)
    gfa.display_sample_names()
    gfa.display_chromosome_names()
    # bam_path_input = Path("/shared/home/sravel/glp701/sandbox/toolbox/pangraphwizard_files/bam_files/segments.bam")
    # bam = Bam(bam_path=bam_path_input)
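    # Minimal end-to-end sketch (commented out; the sample name, chromosome and
    # coordinates below are hypothetical):
    # gfa.extract_sub_graph(sample_name="CG14", chromosome="chr01",
    #                       start=10000, stop=50000, build_fasta=True)
    # gfa.concat_generate_sub_graph()
    # gfa.generate_fasta()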