Source code for gratools.Graph

import logging
import sys
from collections import defaultdict, OrderedDict, Counter
from pathlib import Path
from tqdm import tqdm
from dataclasses import dataclass, field
import pysam
import gzip
from rich import print, get_console
from pysam import AlignedSegment
import tempfile
import re
import pybedtools
import pandas as pd
from Bio.Seq import Seq
from Bio.SeqRecord import SeqRecord
from rich.progress import Progress, BarColumn, TextColumn, TimeElapsedColumn, TimeRemainingColumn


# Enable tqdm for pandas
tqdm.pandas()
# configure display precision of floats
pd.set_option('display.precision', 2)



def human_sort_key(s):
    """Generate a sorting key for human-like sorting."""
    return [int(text) if text.isdigit() else text.lower() for text in re.split('([0-9]+)', s)]
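# A minimal doctest-style sketch of human_sort_key (values verified by hand):
#
#     >>> sorted(["chr10", "chr2", "chr1"], key=human_sort_key)
#     ['chr1', 'chr2', 'chr10']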
@dataclass
class LinkInfo:
    ID_plus: list
    Orient_plus: list
    OrientKey_plus: list
    ID_minus: list
    Orient_minus: list
    OrientKey_minus: list
@dataclass
class SubGraph:
    bam_path: Path
    bed_path: Path
    logger: logging.Logger = field(default_factory=lambda: logging.getLogger("SubGraph"))
    sample_name: str = None
    chromosome: str = None
    start: int = None
    stop: int = None
    intersect_bed: pybedtools.BedTool = None
    segment_id_set: set = field(default_factory=set, repr=False)
    segment_id_first: str = None
    segment_id_last: str = None
    gfa_walk_list: list = field(default_factory=list, repr=False)
    gfa_link_list: list = field(default_factory=list, repr=False)
    gfa_segment_list: list = field(default_factory=list, repr=False)
    dict_segments_samples: defaultdict = field(default_factory=lambda: defaultdict(list), repr=False)
    dict_segments_sequence: defaultdict = field(default_factory=lambda: defaultdict(), repr=False)
    sequences_list: list = field(default_factory=list, repr=False)
    progress = None
    task_id = None

    def __post_init__(self):
        """Derive the sample name from the BED file name."""
        suffixes = ''.join(self.bed_path.suffixes)
        if ".bed.gz" in suffixes:
            self.sample_name = self.bed_path.name.replace(''.join(self.bed_path.suffixes[-2:]), '')
        elif ".bed" in suffixes:
            self.sample_name = self.bed_path.stem
        else:
            raise ValueError(f"The file {self.bed_path} is not a BED file")
    def compute_intersection(self, list_chromosomes=None):
        """Compute the intersection of BED regions."""
        # TODO: use list of chromosomes
        sample_bed = pybedtools.BedTool(self.bed_path)
        extract_bed = pybedtools.BedTool(f'{self.chromosome} {self.start} {self.stop}', from_string=True)
        self.intersect_bed = sample_bed.intersect(extract_bed, wa=True)
    def build_walks(self):
        """Build the walks from the intersected BED regions."""
        self.logger.debug(f"{self.sample_name}: Building subgraph GFA walks and links")
        walk_dict = defaultdict(lambda: defaultdict(lambda: {"seg_list": OrderedDict(), "start": float('inf'), "stop": float('-inf')}))
        # An OrderedDict with empty values is used as an insertion-ordered set
        self.segment_id_set = OrderedDict()
        if self.progress:
            self.progress[self.task_id] = {
                "progress": 4,
                "total": 5,
                "description": f"[cyan]{self.sample_name}: Building walks and links"
            }
        for line in self.intersect_bed:
            chromosome_name = line.chrom
            segment_start = line.start
            segment_stop = line.stop
            segment_id = line.fields[3]
            chromosome_fragment = line.fields[4]
            haplotype_index = line.fields[5]
            self.segment_id_set[segment_id[1:]] = ""
            key = f"{chromosome_name}:{haplotype_index}"
            fragment_data = walk_dict[key][chromosome_fragment]
            fragment_data["seg_list"][segment_id] = ""
            fragment_data["start"] = min(fragment_data["start"], segment_start)
            fragment_data["stop"] = max(fragment_data["stop"], segment_stop)
        for chr_name_haplo, fragments in walk_dict.items():
            chr_name, haplotype_index = chr_name_haplo.split(':')
            for fragment, data in fragments.items():
                chromosome_start = data["start"]
                chromosome_stop = data["stop"]
                segments_order = data["seg_list"].keys()
                walk_list_fragment = [
                    "W",
                    self.sample_name,
                    haplotype_index,
                    chr_name,
                    f"{chromosome_start}",
                    f"{chromosome_stop}"
                ]
                self._build_links(segments_order)
                seg_path = "".join(segments_order).replace("+", ">").replace("-", "<")
                walk_list_fragment.append(seg_path)
                txt_walk = "\t".join(walk_list_fragment)
                self.gfa_walk_list.append(txt_walk)
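    # For reference, each walk is emitted as a tab-separated GFA v1.1 W-line:
    #
    #     W  <sample>  <haplotype>  <chromosome>  <start>  <stop>  >s1>s2<s3
    #
    # where ">"/"<" encode forward/reverse segment orientation.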
    def _build_links(self, segments_order):
        """Build GFA link lines from ordered segments."""
        segments_order = list(OrderedDict.fromkeys(segments_order))
        for seg1, seg2 in zip(segments_order[:-1], segments_order[1:]):
            orient1 = seg1[0]
            orient2 = seg2[0]
            txt_link = f"L\t{seg1[1:]}\t{orient1}\t{seg2[1:]}\t{orient2}\t0M"
            self.gfa_link_list.append(txt_link)
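    # Each consecutive pair of walk segments yields a GFA L-line with a 0M overlap, e.g.:
    #
    #     L  s1  +  s2  -  0M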
    def build_segments(self):
        """Build the GFA segments from BAM file segments."""
        if self.progress:
            self.progress[self.task_id] = {
                "progress": 4,
                "total": 5,
                "description": f"[cyan]{self.sample_name}: Building segments"
            }
        self.logger.debug(f"{self.sample_name}: Building subgraph GFA segments")
        bam = GratoolsBam(bam_path=self.bam_path, logger=self.logger)
        self.gfa_segment_list, self.dict_segments_samples, self.dict_segments_sequence = bam.build_segments(list_segments=list(self.segment_id_set))
    def get_chr_pos(self, progress, task_id):
        """Locate the chromosome positions of the sample's first and last segments, then build the subgraph."""
        self.progress = progress
        self.task_id = task_id
        self.progress[self.task_id] = {
            "progress": 2,
            "total": 5,
            "description": f"[cyan]{self.sample_name}: search corresponding chr pos"
        }
        for segment in self.dict_segments_samples:
            if self.sample_name in self.dict_segments_samples[segment]:
                self.segment_id_first = segment
                break
        if self.segment_id_first:
            reversed_segments = list(reversed(self.dict_segments_samples))
            for segment in reversed_segments:
                if self.sample_name in self.dict_segments_samples[segment]:
                    self.segment_id_last = segment
                    break
        # check that segment_id_first and segment_id_last were found
        if not self.segment_id_first and not self.segment_id_last:
            self.progress[self.task_id] = {
                "progress": 1,
                "total": 1,
                "description": f"[yellow]{self.sample_name}: not found in sub-graph"
            }
            self.logger.warning(f"Sample {self.sample_name} not found in sub-graph")
        else:
            # Read the BED file with pandas and search for segment_id_first and segment_id_last,
            # accumulating the matching rows in lists
            df_first_list = []
            df_last_list = []
            self.progress[self.task_id] = {
                "progress": 3,
                "total": 5,
                "description": f"[cyan]{self.sample_name}: read bed to search chr pos of first and last segment"
            }
            # Iterate over the BED file in chunks (adjust chunksize to the available memory)
            df_iter = pd.read_csv(self.bed_path, sep="\t", header=None,
                                  names=["chromosome", "start", "stop", "segment_id", "fragment", "haplotype_index"],
                                  dtype={"chromosome": str, "start": int, "stop": int, "segment_id": str, "fragment": str, "haplotype_index": str},
                                  engine='c', iterator=True, low_memory=False, memory_map=True,
                                  chunksize=1000000)
            for chunk in df_iter:
                # Strip the leading +/- orientation signs from the segment_id
                chunk["segment_id"] = chunk["segment_id"].str.lstrip('+-')
                # Keep only the segments of interest and append them to the lists
                df_first_chunk = chunk[chunk["segment_id"] == self.segment_id_first]
                df_last_chunk = chunk[chunk["segment_id"] == self.segment_id_last]
                if not df_first_chunk.empty:
                    df_first_list.append(df_first_chunk)
                if not df_last_chunk.empty:
                    df_last_list.append(df_last_chunk)
            # Concatenate the chunk results into the final DataFrames
            df_first = pd.concat(df_first_list, ignore_index=True) if df_first_list else pd.DataFrame()
            df_last = pd.concat(df_last_list, ignore_index=True) if df_last_list else pd.DataFrame()
            if len(df_first) > 1 or len(df_last) > 1:
                self.logger.warning("Warning: duplicated fragments found")
                print(df_first)
                print(df_last)
            chromosome_first = df_first["chromosome"].values[0]
            chromosome_last = df_last["chromosome"].values[0]
            # first and last segments on the same chromosome
            if chromosome_first == chromosome_last:
                self.chromosome = chromosome_first
                self.start = min(df_first["start"].values[0], df_last["start"].values[0])
                self.stop = max(df_first["stop"].values[0], df_last["stop"].values[0])
                self.compute_intersection()
                self.build_walks()
                self.build_segments()
                self.progress[self.task_id] = {
                    "progress": 5,
                    "total": 5,
                    "description": f"[cyan]{self.sample_name}: End processing"
                }
            # TODO: handle the case where the first and last segments lie on two different chromosomes
            # elif chromosome_first != chromosome_last:
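    # BED rows produced by GFA._parse_walk_line and read back here (tab separated):
    #
    #     chromosome  start  stop  segment_id(+/- prefixed)  fragment(walk_start:walk_stop)  haplotype_index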
    def build_fasta(self):
        """Build FASTA records from the subgraph walks."""
        self.logger.info(f"{self.sample_name}: Build fasta")
        total = len(self.gfa_walk_list)
        # Accumulate the records in a list for better performance
        records = []
        for walk in tqdm(self.gfa_walk_list, total=total, desc="Walks progress", unit="lines", disable=True, file=sys.stdout):
            # Extract the walk fields
            _, sample_name, haplotype_index, chromosome_name, chromosome_start, chromosome_stop, segments_order = walk.split("\t")
            # TODO: compute start/stop
            # Build the sequence identifier
            seq_name = f"{self.sample_name}_{self.chromosome}_{self.start}_{self.stop}_{chromosome_start}-{chromosome_stop}"
            record = SeqRecord(Seq(""), id=seq_name, name=seq_name, description="")
            # Split the walk path into oriented segments
            segments = segments_order.replace('>', ',+').replace('<', ',-').split(',')[1:]
            nb_segments = len(segments)
            seq_objs = []
            # Accumulate the segment sequences
            for segment in tqdm(segments, total=nb_segments, desc="Segment to fasta progress", unit="lines", disable=True, file=sys.stdout):
                segment_id = segment.strip('+-')
                orient = segment[0]
                seq = Seq(self.dict_segments_sequence[segment_id])
                if orient == "-":
                    seq = seq.reverse_complement()
                seq_objs.append(seq)
            # Concatenate all segment sequences in a single operation
            record.seq = sum(seq_objs, Seq(""))
            records.append(record)
        # Update the sequence list
        self.sequences_list.extend(records)
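# A minimal usage sketch for SubGraph (paths are hypothetical; the bam_files/bed_files
# layout matches what GFA.__post_init__ creates):
#
#     sub = SubGraph(bam_path=Path("graph_GraTools/bam_files/graph.bam"),
#                    bed_path=Path("graph_GraTools/bed_files/sample1.bed"),
#                    chromosome="chr1", start=10000, stop=20000)
#     sub.compute_intersection()
#     sub.build_walks()      # fills gfa_walk_list / gfa_link_list
#     sub.build_segments()   # fills gfa_segment_list and the sequence dictionaries
#     sub.build_fasta()      # fills sequences_list with SeqRecord objects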
@dataclass
class GFA:
    """
    A class to parse and process GFA files, including handling BAM files and extracting segments and links.

    Attributes:
        gfa_path (Path): The path to the GFA file.
        gfa_name (str): The name of the GFA file.
        version (float): The version of the GFA file.
        header_gfa (list): The header information from the GFA file.
        sample_reference (str): The reference sample from the GFA file.
        bam_segments_file (Path): The path to the BAM segments file.
        dict_link_SA_SB (defaultdict): A dictionary of links between segments.
        dict_samples_chrom (defaultdict): A dictionary of sample chromosomes.
        dict_segments_size (defaultdict): A dictionary of segment sizes.
        dict_segments_samples (defaultdict): A dictionary of segment samples.
        dict_samples_bed (defaultdict): A dictionary of per-sample BED file handles.
        works_path (Path): The path to the working directory.
        bed_path (Path): The path to the BED files directory.
        bam_path (Path): The path to the BAM files directory.
    """
    gfa_path: Path
    logger: logging.Logger = field(default_factory=lambda: logging.getLogger("GFA"))
    gfa_name: str = None
    version: float = None
    header_gfa: list = field(default_factory=list, repr=True)
    sample_reference: str = None
    bam_segments_file: Path = None
    dict_link_SA_SB: defaultdict = field(default_factory=lambda: defaultdict(), repr=False)
    dict_samples_chrom: defaultdict = field(default_factory=lambda: defaultdict(OrderedDict), repr=True)
    dict_segments_size: defaultdict = field(default_factory=lambda: defaultdict(int), repr=False)
    dict_segments_samples: defaultdict = field(default_factory=lambda: defaultdict(list), repr=False)
    dict_samples_bed: defaultdict = field(default_factory=lambda: defaultdict(OrderedDict), repr=True)
    works_path: Path = None
    bed_path: Path = None
    bam_path: Path = None

    def __post_init__(self):
        """Initialize paths and directories, then parse the GFA file and tag the BAM file."""
        suffixes = ''.join(self.gfa_path.suffixes)
        if ".gz" in suffixes:
            self.gfa_name = self.gfa_path.name.replace(''.join(self.gfa_path.suffixes[-2:]), '')
        elif ".gfa" in suffixes:
            self.gfa_name = self.gfa_path.stem
        else:
            raise ValueError(f"The file {self.gfa_path} is not a GFA file")
        self.works_path = self.gfa_path.parent / f"{self.gfa_name}_GraTools"
        self.bed_path = self.works_path / "bed_files"
        self.bed_path.mkdir(exist_ok=True, parents=True)
        self.bam_path = self.works_path / "bam_files"
        self.bam_path.mkdir(exist_ok=True, parents=True)
        self.bam_segments_file = self.bam_path / f'{self.gfa_name}.bam'
        self.header_gfa_file = self.works_path / f'header_{self.gfa_name}.txt'
        # dict_link_SA_SB needs a LinkInfo default factory
        self.dict_link_SA_SB = defaultdict(lambda: LinkInfo([], [], [], [], [], []))
        self.parse_gfa()
        self.tag_bam()
        self.save_samples_chrom()
        self.save_header()
        self.save_samples_bed()
    def save_header(self):
        """Save the header information in a text file."""
        self.logger.info(f"Saving header {self.header_gfa_file}")
        with open(self.header_gfa_file, "w") as f:
            # add name of GFA to check if same as bam and bed
            txt_header = "\n".join(self.header_gfa)
            f.write(txt_header)
    def save_samples_chrom(self):
        """Save the samples and their corresponding chromosomes in a text file."""
        self.logger.info("Saving samples and corresponding chromosomes")
        with open(self.works_path / "samples_chrom.txt", "w") as f:
            for sample in sorted(self.dict_samples_chrom.keys(), key=human_sort_key):
                dict_chroms = self.dict_samples_chrom[sample]
                for chrom in sorted(dict_chroms.keys(), key=human_sort_key):
                    fragments = dict_chroms[chrom]
                    for fragment in fragments:
                        txt = f"{sample}\t{chrom}\t{fragment}\n"
                        f.write(txt)
    def save_samples_bed(self):
        """Sort and save each sample's segments in its BED file."""
        self.logger.info("Sorting samples, chromosomes, and segments in a BED file")
        for sample, bed_file in self.dict_samples_bed.items():
            # Close the file handle before sorting
            bed_file.close()
            # Sort the BED file with pybedtools (BedTool takes the file path)
            sample_bed = pybedtools.BedTool(bed_file.name)
            sorted_bed = sample_bed.sort()
            # Save the sorted file, replacing the original
            sorted_bed.saveas(bed_file.name)
    def parse_gfa(self):
        """
        Parse the GFA file to extract specific information.

        Reads the GFA file, processes each line based on its type (Header, Segment, Link, Walk),
        and performs specific actions for each line type. The function updates the header
        information, writes to a BAM file, builds a dictionary, and processes the walk line.
        """
        self.logger.info(f"Creating Gratools files {self.works_path}")
        self.logger.info(f"Reading GFA {self.gfa_path.name}")
        header = {'HD': {'VN': '1.0'}}
        if self.gfa_path.exists() and self.gfa_path.is_file():
            open_fn = gzip.open if ".gz" in self.gfa_path.suffix else open
            total_lines = sum(1 for _ in open_fn(self.gfa_path, "rt"))
            with (open_fn(self.gfa_path, "rt") as gfa_file_in,
                  pysam.AlignmentFile(self.bam_segments_file, "wb", header=header) as bam_file_out,
                  Progress(
                      TextColumn("[bold blue]{task.description}"),      # task description
                      BarColumn(),                                      # progress bar
                      "[progress.percentage]{task.percentage:>3.1f}%",  # percentage completed
                      "{task.completed}/{task.total} lines",            # lines processed
                      TimeElapsedColumn(),                              # elapsed time
                      TimeRemainingColumn(),                            # estimated remaining time
                      refresh_per_second=4,                             # less frequent refreshes
                  ) as progress):
                # Create the progress task
                task_id = progress.add_task("GFA parsing progress...", total=total_lines, visible=True)
                for line in gfa_file_in:
                    # Update the progress bar
                    progress.advance(task_id)
                    line = line.strip("\n")
                    # Header line
                    if line.startswith("H"):
                        self.header_gfa.append(line)
                        tab_line = line.split("\t")
                        self.version = tab_line[1].split(":")[-1]
                        self.sample_reference = tab_line[2].split(":")[-1]
                    # Segment line
                    elif line.startswith("S"):
                        self._write_bam(line, bam_file_out)
                    # Link line
                    elif line.startswith("L"):
                        self._parse_link_line(line)
                    # Walk line
                    elif line.startswith("W"):
                        self._parse_walk_line(line)
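    # For reference, the four GFA line types handled above look roughly like this
    # (tab separated; the reference-sample header tag is shown as RS:Z by convention,
    # which is an assumption — the parser only takes the last ":"-separated field):
    #
    #     H  VN:Z:1.1  RS:Z:sample0
    #     S  s1  ACGT
    #     L  s1  +  s2  -  0M
    #     W  sample1  1  chr1  0  8  >s1<s2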
    def _parse_walk_line(self, line):
        """Parse a walk line from a GFA file to extract relevant information."""
        _, sample_name, haplotype_index, chromosome_name, chromosome_start, chromosome_stop, segments_order = line.split("\t")
        # Ensure the sample and chromosome entries exist in the dictionary
        if sample_name not in self.dict_samples_chrom:
            self.dict_samples_chrom[sample_name] = {}
        if chromosome_name not in self.dict_samples_chrom[sample_name]:
            self.dict_samples_chrom[sample_name][chromosome_name] = []
        # Add the chromosome range to the sample's chromosome dictionary
        chrom_range = f"{chromosome_start}\t{chromosome_stop}"
        self.dict_samples_chrom[sample_name][chromosome_name].append(chrom_range)
        segment_start_chr = int(chromosome_start)
        # Parse the segments order and process each segment
        for seg_id in segments_order.replace('>', ',+').replace('<', ',-').split(',')[1:]:
            seg_id_only = seg_id.strip('+-')
            # Ensure the segment entry exists in the segments dictionary
            if seg_id_only not in self.dict_segments_samples:
                self.dict_segments_samples[seg_id_only] = []
            # Append the segment information to the relevant dictionaries
            # TODO: fix segment duplicated on same walk => dict of dict?
            sample_chromosome = f"{sample_name};{chromosome_name}"
            self.dict_segments_samples[seg_id_only].append(sample_chromosome)
            segment_size = self.dict_segments_size[seg_id_only]
            segment_stop_chr = segment_start_chr + segment_size
            line_bed = f"{chromosome_name}\t{segment_start_chr}\t{segment_stop_chr}\t{seg_id}\t{chromosome_start}:{chromosome_stop}\t{haplotype_index}"
            # Open the sample's BED file in append mode if it is not already open
            if sample_name not in self.dict_samples_bed:
                self.dict_samples_bed[sample_name] = open(self.bed_path / f"{sample_name}.bed", "a")
            # Write the line to the file corresponding to the sample
            self.dict_samples_bed[sample_name].write(f"{line_bed}\n")
            segment_start_chr = segment_stop_chr

    def _parse_link_line(self, line):
        """Build a dictionary to store the links between segments in a GFA file."""
        _, seg_id_1, orient_seg_1, seg_id_2, orient_seg_2, *_ = line.split("\t")
        # Encode the orientation as 1 for "+" and -1 for "-"
        orient_seg_1 = 1 if orient_seg_1 == "+" else -1
        orient_seg_2 = 1 if orient_seg_2 == "+" else -1
        # Update the LinkInfo lists for seg_id_1
        link_info_1 = self.dict_link_SA_SB[seg_id_1]
        link_info_1.ID_plus.append(seg_id_2)
        link_info_1.Orient_plus.append(orient_seg_2)
        link_info_1.OrientKey_plus.append(orient_seg_1)
        # Update the LinkInfo lists for seg_id_2
        link_info_2 = self.dict_link_SA_SB[seg_id_2]
        link_info_2.ID_minus.append(seg_id_1)
        link_info_2.Orient_minus.append(orient_seg_1)
        link_info_2.OrientKey_minus.append(orient_seg_2)

    def _write_bam(self, line, bam_file_out):
        """
        Write a segment line from a GFA file to a BAM file.

        Args:
            line (str): A line from a GFA file containing segment information.
            bam_file_out (pysam.AlignmentFile): The BAM file to write the segment information to.

        Returns:
            None

        The line is split into `line_type` (unused), `seg_id`, `sequence`, and optional `supp`
        fields. The segment size is recorded in `dict_segments_size`, then an unmapped
        AlignedSegment is created with the segment ID as query name, the `sequence` as query
        sequence, and the `supp` fields joined into an "SU" tag. The segment is written to
        `bam_file_out`, which is assumed to be already open. The "SB", "SA", and "SW" tags
        are added later by GratoolsBam.tag.
        """
        _, seg_id, sequence, *supp = line.split("\t")
        # add segment size to dict
        self.dict_segments_size[seg_id] = len(sequence)
        # Create segment to write
        segment = AlignedSegment()
        segment.query_name = seg_id
        segment.flag = 4
        segment.reference_name = "*"
        segment.reference_start = 0
        segment.mapping_quality = 255
        segment.cigarstring = "1M"
        segment.next_reference_name = "*"
        segment.next_reference_start = 0
        segment.template_length = 0
        segment.query_sequence = sequence
        segment.query_qualities = pysam.qualitystring_to_array("*" * self.dict_segments_size[seg_id])
        segment.tags = (("SU", ','.join(supp)),)
        bam_file_out.write(segment)
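    # The replace/split idiom used in _parse_walk_line turns a W-line path into an
    # oriented segment list (doctest-style, values verified by hand):
    #
    #     >>> ">s1>s2<s3".replace('>', ',+').replace('<', ',-').split(',')[1:]
    #     ['+s1', '+s2', '-s3']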
    def tag_bam(self):
        """
        Tag the BAM file with the dictionaries of segment links and segment samples.

        Uses the GratoolsBam class to tag `self.bam_segments_file` with the
        `self.dict_link_SA_SB` and `self.dict_segments_samples` dictionaries, and assigns
        the resulting tagged BAM file back to `self.bam_segments_file`.

        Returns:
            None
        """
        bam = GratoolsBam(bam_path=self.bam_segments_file, logger=self.logger)
        self.bam_segments_file = bam.tag(self.dict_link_SA_SB, self.dict_segments_samples)
@dataclass
class GratoolsBam:
    """Class for handling BAM files."""
    bam_path: Path
    threads: int = 1
    logger: logging.Logger = field(default_factory=lambda: logging.getLogger("GratoolsBam"))

    def __post_init__(self):
        self.index_bam()
    def index_bam(self):
        """Index the BAM file specified by `self.bam_path` if no index exists yet."""
        bam_index_path = self.bam_path.with_suffix('.bam.bai')
        if self.bam_path.exists() and not bam_index_path.exists():
            self.logger.info(f"Indexing BAM file {self.bam_path.name}")
            pysam.index(self.bam_path.as_posix())
    def _nb_segments_total(self) -> int:
        """Count the number of segments in the BAM file."""
        with pysam.AlignmentFile(self.bam_path.as_posix(), "rb", check_sq=False, threads=self.threads) as bam_file:
            total_segments = sum(1 for _ in bam_file)
        return total_segments
    def build_segments(self, list_segments: list = None):
        """
        Extract segments from a BAM file.

        Parameters:
            list_segments (list): A list of segment IDs to extract from the BAM file.

        Returns:
            tuple: A tuple containing the GFA segments list, segment samples dictionary,
            and segment sequence dictionary.
        """
        gfa_segments_list = []
        dict_segments_samples = defaultdict(list)
        dict_segments_sequence = defaultdict()
        with (tempfile.NamedTemporaryFile(mode='wb', dir=self.bam_path.parent.as_posix(), suffix='.bam') as bam_temp,
              tempfile.NamedTemporaryFile(mode='w', dir=self.bam_path.parent.as_posix(), suffix='.txt') as list_seg_file):
            # write the segment list to the temporary file list_seg_file
            list_seg_file.writelines(f"{seg_id}\n" for seg_id in list_segments)
            list_seg_file.flush()
            # run samtools view via pysam to extract the listed segments into a temporary BAM
            bam = pysam.view('-hbN', list_seg_file.name, self.bam_path.as_posix())
            bam_temp.write(bam)
            bam_temp.flush()  # save to file
            # read the filtered BAM as an AlignmentFile
            with pysam.AlignmentFile(bam_temp.name, "rb", check_sq=False, threads=self.threads) as bam_file_filter:
                for segment in bam_file_filter.fetch(until_eof=True):
                    txt_segment = f"S\t{segment.query_name}\t{segment.query_sequence}\t{segment.get_tag('SU')}"
                    gfa_segments_list.append(txt_segment)
                    # get samples carrying this segment
                    txt_samples = segment.get_tag('SW')
                    dict_segments_samples[segment.query_name] = txt_samples.replace(',', ';').split(";")
                    # get sequence
                    dict_segments_sequence[segment.query_name] = segment.query_sequence
        return gfa_segments_list, dict_segments_samples, dict_segments_sequence
    def tag(self, dict_link_SA_SB: dict, dict_segments_samples: dict):
        """
        Add tags to the BAM file with the given dictionary data.

        Parameters:
            dict_link_SA_SB (dict): Dictionary containing link information for SA and SB.
            dict_segments_samples (dict): Dictionary containing segment samples.

        Returns:
            Path: The path to the tagged segments BAM file.
        """
        segment_tag_path = self.bam_path.parent / "segments_tag.bam"
        nb_segments = len(dict_segments_samples)
        self.logger.info(f"Tagging {nb_segments} segments in BAM file: {self.bam_path.name}")
        with (pysam.AlignmentFile(self.bam_path.as_posix(), "rb", check_sq=False, threads=self.threads) as bam_file_in,
              pysam.AlignmentFile(segment_tag_path.as_posix(), "wb", template=bam_file_in, threads=self.threads) as bam_file_out):
            for segment in bam_file_in.fetch(until_eof=True):
                if segment.query_name not in dict_link_SA_SB:
                    txt_before = txt_after = ""
                else:
                    link_info = dict_link_SA_SB[segment.query_name]
                    # Build the strings for the downstream segments
                    txt_after = ",".join(f"({C},{A},{B})" for A, B, C in zip(
                        link_info.ID_plus, link_info.Orient_plus, link_info.OrientKey_plus
                    ))
                    # Build the strings for the upstream segments
                    txt_before = ",".join(f"({C},{A},{B})" for A, B, C in zip(
                        link_info.ID_minus, link_info.Orient_minus, link_info.OrientKey_minus
                    ))
                # add SW tag
                txt_seg_samples = ",".join(dict_segments_samples[segment.query_name])
                # add list of tuples: (orientation of this segment, neighbour segment id, neighbour orientation)
                segment.set_tag("SB", f"{txt_before}", replace=False)  # segments before
                segment.set_tag("SA", f"{txt_after}", replace=False)   # segments after
                segment.set_tag("SW", f"{txt_seg_samples}", replace=False)  # samples walks
                bam_file_out.write(segment)
        segment_tag_path.rename(self.bam_path)
        # refresh the index: the previous one no longer matches the tagged BAM
        self.bam_path.with_suffix('.bam.bai').unlink(missing_ok=True)
        self.index_bam()
        return self.bam_path
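    # Resulting tag layout on each BAM record, as built above (illustrative values):
    #
    #     SA:Z:(1,s2,1),(1,s3,-1)          # links leaving this segment: (own orient, next id, next orient)
    #     SB:Z:(1,s1,1)                    # links entering this segment
    #     SW:Z:sample1;chr1,sample2;chr1   # sample;chromosome pairs whose walks use this segment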
    def segments_info(self, output_base_path, nb_samples_gfa, shared_min=1, specific_max=None, filter_len=1):
        """Compute shared/specific/filtered segment statistics and save them as CSV files."""
        # shared_min and specific_max given as floats are treated as fractions of the samples
        if isinstance(shared_min, float):
            shared_min = round(shared_min * nb_samples_gfa)
        if isinstance(specific_max, float):
            specific_max = round(specific_max * nb_samples_gfa)
        # Log initial info
        self.logger.info(f"Number of samples in GFA: {nb_samples_gfa}, shared_min={shared_min}, specific_max={specific_max}, filter_len={filter_len}")
        # Initialize counters and dictionaries
        counter = Counter()
        dico_count_samples = defaultdict(int)
        dico_count_samples_filtered = defaultdict(int)
        # Read BAM file
        total_segments = self._nb_segments_total()
        with pysam.AlignmentFile(self.bam_path, "rb", check_sq=False, threads=self.threads) as bam_file:
            for segment in tqdm(bam_file.fetch(until_eof=True), total=total_segments, desc="Processing segments", disable=False, file=sys.stdout):
                seg_length = len(segment.query_sequence)
                txt_samples = segment.get_tag('SW')
                samples_list = set(elm.split(";")[0] for elm in txt_samples.split(','))
                nb_samples_seg = len(samples_list)
                dico_count_samples[nb_samples_seg] += 1
                if nb_samples_seg >= shared_min:
                    counter['shared'] += 1
                    if seg_length >= filter_len:
                        counter['shared_filtered'] += 1
                elif specific_max is not None and nb_samples_seg <= specific_max:
                    counter['specific'] += 1
                    if seg_length >= filter_len:
                        counter['specific_filtered'] += 1
                if seg_length < filter_len:
                    counter['filtered'] += 1
                else:
                    dico_count_samples_filtered[nb_samples_seg] += 1
        # Calculate percentages
        filtered_segments = total_segments - counter['filtered']
        percent_shared = (counter['shared'] / total_segments) * 100
        percent_specific = (counter['specific'] / total_segments) * 100
        percent_shared_filtered = (counter['shared_filtered'] / filtered_segments) * 100
        percent_specific_filtered = (counter['specific_filtered'] / filtered_segments) * 100
        percent_filtered = (counter['filtered'] / total_segments) * 100
        # Create and save DataFrame for segment info
        data = {
            "Category": ["Shared", "Specific", "Filtered", "Shared Filtered", "Specific Filtered"],
            "Count Segments": [counter['shared'], counter['specific'], counter['filtered'], counter['shared_filtered'], counter['specific_filtered']],
            "Total Segments": [total_segments, total_segments, total_segments, filtered_segments, filtered_segments],
            "Percentage": [percent_shared, percent_specific, percent_filtered, percent_shared_filtered, percent_specific_filtered]
        }
        df_segments = pd.DataFrame(data)
        segments_info_file = output_base_path.parent.joinpath(f"{output_base_path.stem}_segments_info.csv")
        self.logger.info(f"Saving segments info to {segments_info_file}")
        df_segments.to_csv(segments_info_file, index=False)
        print(df_segments.to_string(index=False))
        # Create and save DataFrame for sample count info
        df_samples = pd.DataFrame(list(dico_count_samples.items()), columns=['Sample_number', 'Segments'])
        df_samples['Percentage'] = (df_samples['Segments'] / df_samples['Segments'].sum()) * 100
        df_samples_filtered = pd.DataFrame(list(dico_count_samples_filtered.items()), columns=['Sample_number', 'Filtered_Segments'])
        df_samples = df_samples.merge(df_samples_filtered, on='Sample_number', how='outer').fillna(0)
        df_samples['Filtered_Percentage'] = (df_samples['Filtered_Segments'] / df_samples['Filtered_Segments'].sum()) * 100
        df_samples_sorted = df_samples.sort_values(by='Sample_number').reset_index(drop=True)
        samples_info_file = output_base_path.parent.joinpath(f"{output_base_path.stem}_segments_info_samples.csv")
        self.logger.info(f"Saving segments samples info to {samples_info_file}")
        df_samples_sorted.to_csv(samples_info_file, index=False)
        print(df_samples_sorted.to_string(index=False))
    def specific_and_shared_segments(self, samples_list_A=None, samples_list_B=None, filter_len=None):
        """Count segments shared by all samples of list A and, optionally, absent from all samples of list B."""
        # Define sample sets
        samples_set_A = set(samples_list_A) if samples_list_A else set()
        if samples_list_B:
            samples_set_B = set(samples_list_B)
        # Counters for shared and specific segments
        counter = Counter()
        # Log initial info
        self.logger.info(f"Shared on samples: {samples_list_A}")
        self.logger.info(f"Missing on samples: {samples_list_B}")
        # Count the total number of segments in the BAM file
        total_segments = self._nb_segments_total()
        # Read the BAM file again with multiple threads for processing
        with pysam.AlignmentFile(self.bam_path, "rb", check_sq=False, threads=self.threads) as bam_file:
            for segment in tqdm(bam_file.fetch(until_eof=True), total=total_segments, desc="Processing segments", disable=False, file=sys.stdout):
                # Get segment information
                seg_length = len(segment.query_sequence)
                if filter_len is not None and seg_length <= filter_len:
                    continue  # Skip segments shorter than the filter length
                txt_samples = segment.get_tag('SW')
                samples_set_GFA_seg = set(elm.split(";")[0] for elm in txt_samples.split(','))
                # Check conditions for shared and specific segments
                # All elements of samples_list_A are in the segment's sample set
                if samples_set_A.issubset(samples_set_GFA_seg):
                    counter['counter_shared_A'] += 1
                    counter['counter_shared_A_length'] += seg_length
                    # No element of samples_list_B is in the segment's sample set
                    if samples_list_B and samples_set_B.isdisjoint(samples_set_GFA_seg):
                        counter['counter_specific'] += 1
                        counter['counter_specific_length'] += seg_length
        # Calculate percentages
        percent_shared_A = (counter['counter_shared_A'] / total_segments) * 100
        percent_specific = (counter['counter_specific'] / total_segments) * 100
        # Log results
        self.logger.info(f"Shared segments between {samples_list_A}: {counter['counter_shared_A']}/{total_segments} ({percent_shared_A:.2f}%)\tShared segments size: {counter['counter_shared_A_length']}")
        self.logger.info(f"Specific segments between {samples_list_A}: {counter['counter_specific']}/{total_segments} ({percent_specific:.2f}%)\tSpecific segments size: {counter['counter_specific_length']}")
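# A minimal usage sketch (hypothetical sample names and path): count segments carried by
# both sample1 and sample2 but absent from sample3, ignoring segments of 50 bp or less:
#
#     bam = GratoolsBam(bam_path=Path("graph_GraTools/bam_files/graph.bam"))
#     bam.specific_and_shared_segments(samples_list_A=["sample1", "sample2"],
#                                      samples_list_B=["sample3"], filter_len=50)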
if __name__ == "__main__":
    bam_path_input = Path("/shared/home/sravel/glp701/sandbox/toolbox/Og_cactus_GraTools/bam_files/Og_cactus.bam")
    bam = GratoolsBam(bam_path=bam_path_input)
    bam.specific_and_shared_segments()