Source code for gratools.Gratools
import logging
import subprocess
import sys
from collections import defaultdict, OrderedDict
from pathlib import Path
from rich import get_console
from tqdm import tqdm
import click
from dataclasses import dataclass, field
import gzip
from .Graph import GFA, SubGraph, human_sort_key, GratoolsBam
import pandas as pd
from Bio import SeqIO
import multiprocessing
from concurrent.futures import ProcessPoolExecutor, as_completed
from rich.progress import Progress, BarColumn, TextColumn, SpinnerColumn
from rich.console import Console
from rich.table import Table
from shutil import rmtree
pd.set_option('display.max_rows', None) # Display all rows
pd.set_option('display.max_columns', None) # Display all columns
[docs]
@dataclass
class Gratools:
"""
A class to parse and process GFA files, including handling BAM files and extracting segments and links.
Attributes:
gfa_path (Path): The path to the GFA file.
gfa_name (str): The name of the GFA file.
bam_segments_file (Path): The path to the BAM segments file.
dict_samples_chrom (defaultdict): A dictionary mapping sample names to chromosomes.
works_path (Path): The path to the working directory.
bed_path (Path): The path to the BED files directory.
bam_path (Path): The path to the BAM files directory.
samples_chrom_path (Path): The path to the file containing sample chromosome data.
"""
gfa_path: Path
threads: int = 1
logger: logging.Logger = field(default_factory=lambda: logging.getLogger(__name__))
gfa_name: str = None
bam_segments_file: Path = None
dict_samples_chrom: defaultdict = field(default_factory=lambda: defaultdict(OrderedDict), repr=False)
works_path: Path = None
bed_path: Path = None
bam_path: Path = None
samples_chrom_path: Path = None
dict_gfa_graph_object: dict = field(default_factory=dict, repr=False)
sample_name: str = None
chromosome: str = None
start: int = 0
stop: int = None
suffix: str = None
build_fasta: bool = False
gzip_gfa: bool = False
def __post_init__(self):
"""
Initialize paths and directories, then parse the GFA file and tag the BAM file.
"""
suffixes = ''.join(self.gfa_path.suffixes)
if ".gz" in suffixes:
self.gfa_name = self.gfa_path.name.replace(''.join(self.gfa_path.suffixes[-2:]),'')
self.gzip_gfa = True
elif ".gfa" in suffixes:
self.gfa_name = self.gfa_path.stem
self.works_path = self.gfa_path.parent / f"{self.gfa_name}_GraTools"
self.bed_path = self.works_path / "bed_files"
self.bam_path = self.works_path / "bam_files"
self.bam_segments_file = self.bam_path / f'{self.gfa_name}.bam'
self.header_gfa_file = self.works_path / f'header_{self.gfa_name}.txt'
self.samples_chrom_path = self.works_path / "samples_chrom.txt"
self._check_bedtools_avail()
self._check_gratools_files()
self._read_samples_chrom()
self._validate_bed_files()
def _check_bedtools_avail(self):
"""
Check if BEDtools is installed.
"""
# test if bedtools tools command line is avail on shell command
process = subprocess.run("bedtools", shell=True, check=False, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
if int(process.returncode) >= 1:
raise NotImplementedError("BEDtools is not installed or avail on PATH. Please install/load BEDtools and try again.")
def _check_gratools_files(self):
"""
Check if Gratools files exist, if not create them.
"""
if not self.works_path.exists():
self.logger.debug(f"First run of Gratools on {self.gfa_path.name}.\nSo extracting GFA information to {self.works_path}")
try:
gfa = GFA(gfa_path=self.gfa_path, logger=self.logger)
except Exception as e:
rmtree(self.gfa_path.parent / f"{self.gfa_name}_GraTools")
self.logger.error(f"Error: {e}", exc_info=True)
else:
self.logger.debug(f"Check if all Gratools_files {self.works_path} exist")
# check if bam_segments_file file not exist, is file and is empty
if not self.bam_segments_file.exists() or self.bam_segments_file.stat().st_size == 0:
raise FileNotFoundError(f"bam file {self.bam_segments_file.name} does not exist or is empty, please remove Gratools_files: {self.works_path} and try again")
elif not self.header_gfa_file.exists() or self.header_gfa_file.stat().st_size == 0:
raise FileNotFoundError(f"header file {self.header_gfa_file.name} does not exist or is empty, please remove Gratools_files: {self.works_path} and try again")
elif not self.samples_chrom_path.exists() or self.samples_chrom_path.stat().st_size == 0:
raise FileNotFoundError(f"samples_chrom file {self.samples_chrom_path.name} does not exist or is empty, please remove Gratools_files: {self.works_path} and try again")
elif not self.bed_path.exists():
raise FileNotFoundError(f"bed directory {self.bed_path} does not exist, please remove Gratools_files: {self.works_path} and try again")
def _read_samples_chrom(self):
"""Read samples and their corresponding chromosomes from the samples_chrom.txt file."""
with open(self.samples_chrom_path, "r") as file:
for line in file:
sample, chrom, fragment1, fragment2 = line.strip().split("\t")
if sample not in self.dict_samples_chrom:
self.dict_samples_chrom[sample] = {chrom: [(fragment1, fragment2)]}
elif chrom not in self.dict_samples_chrom[sample]:
self.dict_samples_chrom[sample][chrom] = [(fragment1, fragment2)]
else:
self.dict_samples_chrom[sample][chrom].append((fragment1, fragment2))
def _validate_bed_files(self):
"""Validate the presence of BED files for each sample."""
self.logger.debug(f"Check if bed files {self.bed_path} exist")
for sample in self.dict_samples_chrom:
bed_file = self.bed_path / f"{sample}.bed"
if not bed_file.exists() or bed_file.stat().st_size == 0:
raise FileNotFoundError(f"BED file {bed_file} does not exist or is empty. Please check the BED files.")
def _check_sample_in_GFA(self, sample):
"""Check if the sample is in the GFA file."""
if sample not in self.dict_samples_chrom:
raise ValueError(f"Sample {sample} not found in {self.gfa_path.name}. Avail:{list(self.dict_samples_chrom.keys())}")
def _check_samples_file(self, samples_list_path=None):
"""Read and check if sample on file_path exists on GFA file if file_path is not None. and return list of samples
Else return list of samples in GFA file."""
if not samples_list_path:
samples_list = list(self.dict_samples_chrom.keys())
samples_list.remove(self.sample_name)
else:
samples_list = samples_list_path.open("r").read().splitlines()
for smp in samples_list:
self._check_sample_in_GFA(sample=smp)
return samples_list
def _check_chrom_in_GFA(self, sample, chrom):
""""""
if chrom not in self.dict_samples_chrom[sample]:
raise ValueError(
f"Chromosome {chrom} not found in sample {sample}.Avail: {list(self.dict_samples_chrom[sample].keys())}")
[docs]
def get_chromosome_size(self, sample, chrom):
df = pd.read_csv(self.samples_chrom_path, sep="\t", header=None,
names=["SAMPLES", "CHROMOSOMES_LIST", "FRG_START", "FRG_STOP"])
df.query(f"SAMPLES==@sample & CHROMOSOMES_LIST==@chrom", inplace=True)
return max(df["FRG_STOP"])
@property
def get_sample_names(self):
"""
Retrieve the unique sample names from the dataset.
This function extracts the unique sample names from the
`dict_samples_chrom` attribute, which is assumed to be a dictionary
where keys represent the sample names, and values contain chromosome data.
Returns:
list: A list of unique sample names extracted from the dataset.
Example:
>>> sample_names = self.get_sample_names()
>>> print(sample_names)
['CG14', 'Og103', 'Og182', 'Og20', 'Tog5681']
"""
# Assuming dict_samples_chrom holds sample information
sample_names = list(self.dict_samples_chrom.keys())
return sample_names
[docs]
def display_sample_names(self):
"""
Display a list of sample names in a formatted table using rich.
This function show a list of sample names visually appealing table format using the `rich` library.
The table will contain only one column labeled 'SAMPLES' where each row represents a sample name.
Returns:
None: This function only prints the table to the console; it does not return any value.
"""
# Initialize the console from the rich library
console = Console()
# get sample name list
sample_names = self.get_sample_names
# Create a rich Table object with a title
table = Table(title=f"List of Samples in {self.gfa_path.name}")
# Add one column labeled 'SAMPLES' with cyan text style
table.add_column("SAMPLES", style="cyan", no_wrap=True)
# Add each sample name to the table as a row
for sample in sample_names:
table.add_row(sample)
# Print the table to the console
console.print(table)
@property
def get_chromosome_data(self):
"""
Load the chromosome data from a CSV file and return a DataFrame.
Returns:
pandas.DataFrame: DataFrame containing the chromosome data.
"""
# Read the CSV file (assumed structure: SAMPLES, CHROMOSOMES_LIST, START, END)
df = pd.read_csv(self.samples_chrom_path, sep="\t", header=None,
names=["SAMPLES", "CHROMOSOMES_LIST", "START", "END"])
# Drop any duplicates if necessary
df = df.drop_duplicates(subset=['SAMPLES', 'CHROMOSOMES_LIST', 'START', 'END'])
return df
@property
def get_chromosomes_by_sample(self):
"""
Group the chromosomes by SAMPLES and return a grouped DataFrame.
Returns:
pandas.DataFrame: A DataFrame with chromosomes grouped by SAMPLES.
"""
df = self.get_chromosome_data
# Group chromosomes by SAMPLES and aggregate them into lists
grouped = df.groupby('SAMPLES')['CHROMOSOMES_LIST'].apply(lambda x: ', '.join(set(x))).reset_index()
return grouped
[docs]
def display_chromosome_names(self):
"""Display the SAMPLES and CHROMOSOMES_LIST using rich."""
grouped_df = self.get_chromosomes_by_sample
console = Console()
# Create the table object with two columns: Samples and Chromosomes
table = Table(title=f"Samples and Chromosomes in {self.gfa_path.name}", show_lines=True)
# Add columns to the table
table.add_column("SAMPLES", style="cyan", no_wrap=True)
table.add_column("CHROMOSOMES_LIST", style="magenta")
# Iterate over the grouped DataFrame and add rows to the table
for _, row in grouped_df.iterrows():
table.add_row(
str(row['SAMPLES']),
str(row['CHROMOSOMES_LIST'])
)
# Print the table using rich's console
console.print(table)
[docs]
def display_full_chromosome_data(self):
"""Display the full chromosome data (SAMPLES, CHROMOSOMES_LIST, START, END) using rich."""
df = self.get_chromosome_data
console = Console()
# Create the table object with appropriate columns
table = Table(title=f"Samples and Full Chromosomes in {self.gfa_path.name}", show_lines=True)
# Add columns to the table
table.add_column("SAMPLES", style="cyan", no_wrap=True)
table.add_column("CHROMOSOMES (can be fragmented)", style="magenta")
table.add_column("START", justify="right", style="green")
table.add_column("END", justify="right", style="green")
# Group by 'SAMPLES' and aggregate chromosome, start, and end info
grouped = df.groupby('SAMPLES').agg({
'CHROMOSOMES_LIST': lambda x: '\n'.join(x), # Concatenate chromosomes
'START': lambda x: '\n'.join(map(str, x)), # Concatenate start positions
'END': lambda x: '\n'.join(map(str, x)) # Concatenate end positions
}).reset_index()
# Iterate over the grouped rows and add them to the table
for _, row in grouped.iterrows():
table.add_row(
str(row['SAMPLES']),
str(row['CHROMOSOMES_LIST']),
str(row['START']),
str(row['END'])
)
# Print the table using rich's console
console.print(table)
[docs]
def extract_sub_graph(self, sample_name, chromosome, start=0, stop=None, samples_list_path=None, build_fasta=False):
"""
Extract a subgraph from the GFA file.
Args:
sample_name (str): The name of the sample.
chromosome (str): The chromosome identifier.
start (int): The starting position. default=0
stop (int): The stopping position.
samples_list_path (Path, optional): File with list of samples to include in the subgraph.
"""
self.sample_name = sample_name
self.chromosome = chromosome
self.start = start
self.stop = stop
self.build_fasta = build_fasta
# use Bed class need check if chromosome is on sample
bed_file = self.bed_path / f"{sample_name}.bed"
# test if sample in dict_samples_chrom
self._check_sample_in_GFA(sample=sample_name)
# test if chromosome in dict_samples_chrom
self._check_chrom_in_GFA(sample=sample_name, chrom=chromosome)
# test samples_list_path
samples_list = self._check_samples_file(samples_list_path=samples_list_path)
# chech if stop is not greater than chromosome size
chrom_size = self.get_chromosome_size(sample=sample_name, chrom=chromosome)
if not self.stop:
self.stop = chrom_size
self.logger.info(f"Chromosome {chromosome} size: {self.stop}")
elif self.stop > chrom_size:
self.stop = chrom_size
self.logger.warning(f"Chromosome stop {self.stop} is greater than {chromosome} size: {chrom_size}")
self.suffix = f"{self.sample_name}-{self.chromosome}-{self.start}-{self.stop}"
self.dict_gfa_graph_object = {}
self.logger.info(f"Extracting subgraph for sample {self.suffix}")
self.sub_graph_ref = SubGraph(
logger=self.logger,
bam_path=self.bam_segments_file,
bed_path=bed_file,
chromosome=chromosome,
start=self.start,
stop=self.stop,
)
self.dict_gfa_graph_object[sample_name] = self.sub_graph_ref
self.sub_graph_ref.compute_intersection()
self.sub_graph_ref.build_walks()
self.sub_graph_ref.build_segments()
if self.build_fasta:
self.sub_graph_ref.build_fasta()
self.logger.info(f"Finished processing sample {sample_name} extracting region on other samples")
if self.threads >= 1:
self.parallelize_samples(samples_list)
else:
with tqdm(total=len(samples_list), desc="Processing samples", unit="samples", file=sys.stdout) as pbar:
for sample in samples_list:
self.process_sample(sample)
pbar.update(1)
pbar.set_postfix(sample=sample)
[docs]
def process_sample(self, sample, shared_progress, task_id):
"""Traite un échantillon en plusieurs étapes avec mise à jour de la progression via un dictionnaire partagé."""
# self.logger.info(f"Extracting subgraph for sample {sample}")
shared_progress[task_id] = {"progress": 1,
"total": 1,
"description": f"[{sample}]: Start of extracting subgraph"}
# Étape 1 : Extraction des positions chromosomiques
bed_file = self.bed_path / f"{sample}.bed"
sub_graph = SubGraph(
logger=self.logger,
bam_path=self.bam_segments_file,
bed_path=bed_file,
sample_name=sample,
segment_id_set=self.sub_graph_ref.segment_id_set,
dict_segments_samples=self.sub_graph_ref.dict_segments_samples,
)
# Étape 1 : Extraction des positions chromosomiques
sub_graph.get_chr_pos(progress=shared_progress, task_id=task_id)
# Étape 2 : Construction du fichier FASTA si nécessaire
if self.build_fasta:
self.logger.debug(f"Building FASTA for sample {sample}")
# Construction du fichier FASTA
sub_graph.build_fasta()
# Stocker l'objet SubGraph finalisé
self.dict_gfa_graph_object[sample] = sub_graph
self.logger.debug(f"Completed processing for sample {sample}")
# return sub_graph to store value on self.dict_gfa_graph_object
return sample, sub_graph
[docs]
def parallelize_samples(self, samples_list):
# Configuration du nombre de workers à utiliser
n_workers = self.threads
with Progress(
TextColumn("[progress.description]{task.description}"), # Colonne pour afficher la description de la tâche
BarColumn(), # Barre de progression
TextColumn("[progress.percentage]{task.percentage:>3.0f}%"), # Affichage du pourcentage
TextColumn("[progress.completed]{task.completed}/{task.total}"), # Affichage du nombre d'étapes complétées
refresh_per_second=4, # Mises à jour moins fréquentes
console=get_console(),
) as shared_progress:
futures = [] # Liste pour suivre les futures tâches
with multiprocessing.Manager() as manager:
# Dictionnaire partagé pour garder trace de la progression des tâches
_progress = manager.dict()
# Tâche globale pour suivre le progrès de tous les jobs
overall_progress_task = shared_progress.add_task("[green]GLOBAL: Samples progression:", completed=0, total=len(samples_list),visible=True)
with ProcessPoolExecutor(max_workers=n_workers) as executor:
# Initialiser les tâches individuelles pour chaque sample
for sample in samples_list:
# Ajoute une tâche au gestionnaire de progression, initialement invisible
task_id = shared_progress.add_task(f"[cyan]{sample}: Create processing", completed=0, total=5, visible=True)
futures.append(executor.submit(self.process_sample, sample, _progress, task_id))
# Boucle pour surveiller la progression des tâches
while (n_finished := sum(future.done() for future in futures)) < len(futures):
# Mise à jour du progrès global
shared_progress.update(overall_progress_task, completed=n_finished, total=len(futures))
for task_id, update_data in _progress.items():
latest = update_data["progress"]
total = update_data["total"]
description = update_data["description"]
# Mise à jour de la progression pour chaque tâche individuelle
shared_progress.update(
task_id,
completed=latest,
total=total,
description=description,
visible=latest <= total, # Rend visible uniquement si la tâche n'est pas terminée
)
shared_progress.update(overall_progress_task, completed=len(futures), total=len(futures))
# # Vérifie et lève les erreurs potentielles
for future in futures:
# recupere les objets sub_graph de chaque tâche pour les ajouter a self.dict_gfa_graph_object
sample, sub_graph = future.result()
self.dict_gfa_graph_object.update({sample: sub_graph})
[docs]
def concat_generate_sub_graph(self,suffix):
"""
"""
# add header
self.logger.info(f"Concatenate subgraphs")
header_list = self.header_gfa_file.open("r").read().splitlines()
gfa_segment_list = set()
gfa_link_list = set()
gfa_walk_list = set()
self.logger.info(f"Concatenate subgraphs for {len(self.dict_gfa_graph_object)} samples")
for sample, sub_graph in self.dict_gfa_graph_object.items():
gfa_segment_list.update(sub_graph.gfa_segment_list)
gfa_link_list.update(sub_graph.gfa_link_list)
self.logger.info(f"Sample {sample} have {len(sub_graph.gfa_walk_list)} walks")
gfa_walk_list.update(sub_graph.gfa_walk_list)
# TODO: gzip file?
if suffix == '': #if the option suffix is default or empty use the self.suffix
used_suffix = self.suffix
else:
used_suffix = suffix
sub_gfa_path = self.gfa_path.parent / f"{self.gfa_name}_subgraph_{used_suffix}.gfa"
if self.gzip_gfa:
sub_gfa_path = self.gfa_path.parent / f"{self.gfa_name}_subgraph_{used_suffix}.gfa.gz"
open_fn = gzip.open if self.gzip_gfa else open
with open_fn(sub_gfa_path, "wt") as f:
for head in header_list:
f.write(f"{head}\n")
# add subgraph info to header?
f.write(f"H\tVN:Z:1.1\tRS: Z:{self.suffix}\n")
for segment in sorted(list(gfa_segment_list), key=human_sort_key):
f.write(f"{segment}\n")
for link in sorted(list(gfa_link_list), key=human_sort_key):
f.write(f"{link}\n")
for walk in sorted(list(gfa_walk_list), key=human_sort_key):
f.write(f"{walk}\n")
self.logger.info(f"Generate GFA file {sub_gfa_path.name}")
[docs]
def generate_fasta(self,suffix):
if suffix == '': #if the option suffix is default or empty use the self.suffix
used_suffix = self.suffix
else:
used_suffix = suffix
fasta_out = self.gfa_path.parent / f"{self.gfa_name}_subgraph_{used_suffix}.fasta"
sequences_list = list()
for sample, sub_graph in self.dict_gfa_graph_object.items():
sequences_list.append(sub_graph.sequences_list)
with open(Path(fasta_out).resolve().as_posix(), "w") as output_handle:
SeqIO.write(flatten(sequences_list), output_handle, "fasta")
self.logger.info(f"Generate fasta file {fasta_out.name}")
[docs]
def segments_info(self, shared_min=None, specific_max=None, filter_len=None):
bam = GratoolsBam(bam_path=self.bam_segments_file, threads=self.threads)
bam.segments_info(output_base_path=self.gfa_path, nb_samples_gfa=len(self.dict_samples_chrom), shared_min=shared_min, specific_max=specific_max, filter_len=filter_len)
if __name__ == "__main__":
gfa_path_input = Path("/shared/home/sravel/glp701/sandbox/toolbox/Og_cactus.gfa.gz")
# gfa_path_input = Path("/home/christine/Documents/Dev/git/pangenomics/SuperToolBox/Og_cactus.gfa.gz")
#gfa_path_input = Path("/shared/home/sravel/annotation_fijiensism2/2024/mashtree/pangenome/Pf_graphv0/Pf_graphv0.d2.gfa.gz")
#print(gfa_path_input)
gfa = Gratools(gfa_path=gfa_path_input)
gfa.show_samples_names
gfa.show_chroms_names
#bam_path_input = Path("/shared/home/sravel/glp701/sandbox/toolbox/pangraphwizard_files/bam_files/segments.bam")
#print(bam_path_input)
#bam = Bam(bam_path=bam_path_input)