import logging
import sys
from collections import defaultdict, OrderedDict
from pathlib import Path
from tqdm import tqdm
from dataclasses import dataclass, field
import pysam
import gzip
from rich import print
from pysam import AlignedSegment
import re
import pybedtools
import pandas as pd
from Bio.Seq import Seq
from Bio.SeqRecord import SeqRecord
from rich.progress import Progress, BarColumn, TextColumn, TimeElapsedColumn, TimeRemainingColumn
# GratoolsBam is used below but was never imported; the module path is assumed from the project layout
from gratools.bam import GratoolsBam
# Enable tqdm for pandas
tqdm.pandas()
# configure display precision of floats
pd.set_option('display.precision', 2)
def human_sort_key(s):
"""
Generate a sorting key for human-like sorting.
"""
return [int(text) if text.isdigit() else text.lower() for text in re.split('([0-9]+)', s)]
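# Illustrative example (assumed inputs): chromosome names sort numerically
# rather than lexicographically, e.g.
#   sorted(["chr10", "chr2", "chr1"], key=human_sort_key)
#   -> ["chr1", "chr2", "chr10"]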
@dataclass
class LinkInfo:
"""Adjacency lists describing the GFA links attached to one segment."""
ID_plus: list
Orient_plus: list
OrientKey_plus: list
ID_minus: list
Orient_minus: list
OrientKey_minus: list
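# Each LinkInfo entry, keyed on a segment ID, records that segment's outgoing
# ("plus") and incoming ("minus") GFA links; see _parse_link_line below for
# how these lists are populated.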
@dataclass
class SubGraph:
"""Extract a per-sample sub-graph (walks, links, segments) over a genomic interval."""
bam_path: Path
bed_path: Path
logger: logging.Logger = field(default_factory=lambda: logging.getLogger("SubGraph"))
sample_name: str = None
chromosome: str = None
start: int = None
stop: int = None
intersect_bed: pybedtools.BedTool = None
segment_id_set: set = field(default_factory=set, repr=False)
segment_id_first: str = None
segment_id_last: str = None
gfa_walk_list: list = field(default_factory=list, repr=False)
gfa_link_list: list = field(default_factory=list, repr=False)
gfa_segment_list: list = field(default_factory=list, repr=False)
dict_segments_samples: defaultdict = field(default_factory=lambda: defaultdict(list), repr=False)
dict_segments_sequence: defaultdict = field(default_factory=lambda: defaultdict(), repr=False)
sequences_list: list = field(default_factory=list, repr=False)
# shared progress state, set later via get_chr_pos(); deliberately class-level
# attributes rather than dataclass fields
progress = None
task_id = None
def __post_init__(self):
"""Initialize paths and directories."""
suffixes = ''.join(self.bed_path.suffixes)
if ".bed.gz" in suffixes:
self.sample_name = self.bed_path.name.replace(''.join(self.bed_path.suffixes[-2:]),'')
elif ".bed" in suffixes:
self.sample_name = self.bed_path.stem
else:
raise ValueError(f"The file {self.bed_path} is not a BED file")
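# Illustrative example (assumed file names): "sampleA.bed.gz" and
# "sampleA.bed" both yield sample_name == "sampleA".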
def compute_intersection(self, list_chromosomes=None):
"""Compute the intersection of BED regions."""
# TODO: use list of chromosomes
sample_bed = pybedtools.BedTool(self.bed_path)
extract_bed = pybedtools.BedTool(f'{self.chromosome} {self.start} {self.stop}', from_string=True)
self.intersect_bed = sample_bed.intersect(extract_bed, wa=True)
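# Illustrative example (assumed values): with chromosome="chr1", start=100,
# stop=500, the region BedTool is built from the string "chr1 100 500" and
# intersect(..., wa=True) keeps the original BED entries overlapping it.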
def build_walks(self):
"""Build the walks from the intersected BED regions."""
self.logger.debug(f"{self.sample_name}:Building subgraph GFA walks and links")
walk_dict = defaultdict(lambda: defaultdict(lambda: {"seg_list": OrderedDict(), "start": float('inf'), "stop": float('-inf')}))
# OrderedDict used as an ordered set: preserves first-seen segment order
self.segment_id_set = OrderedDict()
if self.progress:
self.progress[self.task_id] = {
"progress": 4,
"total": 5,
"description": f"[cyan]{self.sample_name}: Building walks and links"
}
# for line in tqdm(self.intersect_bed, total=self.intersect_bed.count(), desc="Walks progress", unit="lines", disable=True, file=sys.stdout):
for i, line in enumerate(self.intersect_bed):
chromosome_name = line.chrom
segment_start = line.start
segment_stop = line.stop
segment_id = line.fields[3]
chromosome_fragment = line.fields[4]
haplotype_index = line.fields[5]
self.segment_id_set[segment_id[1:]] = ""  # ordered-set insert, leading orientation sign stripped
key = f"{chromosome_name}:{haplotype_index}"
fragment_data = walk_dict[key][chromosome_fragment]
fragment_data["seg_list"][segment_id] = ""
fragment_data["start"] = min(fragment_data["start"], segment_start)
fragment_data["stop"] = max(fragment_data["stop"], segment_stop)
for chr_name_haplo, fragments in walk_dict.items():
chr_name, haplotype_index = chr_name_haplo.split(':')
for fragment, data in fragments.items():
chromosome_start = data["start"]
chromosome_stop = data["stop"]
segments_order = data["seg_list"].keys()
walk_list_fragment = [
"W", self.sample_name, haplotype_index, chr_name,
f"{chromosome_start}", f"{chromosome_stop}"
]
self._build_links(segments_order)
seg_path = "".join(segments_order).replace("+", ">").replace("-", "<")
walk_list_fragment.append(seg_path)
txt_walk = "\t".join(walk_list_fragment)
self.gfa_walk_list.append(txt_walk)
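# Illustrative example (assumed values) of an emitted walk line, tab-separated:
#   W	sampleA	1	chr1	100	5000	>s1>s2<s3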
def _build_links(self, segments_order):
"""Build GFA link lines from ordered segments."""
segments_order = list(OrderedDict.fromkeys(segments_order))
for seg1, seg2 in zip(segments_order[:-1], segments_order[1:]):
orient1 = seg1[0]
orient2 = seg2[0]
txt_link = f"L\t{seg1[1:]}\t{orient1}\t{seg2[1:]}\t{orient2}\t0M"
self.gfa_link_list.append(txt_link)
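# Illustrative example (assumed values): segments_order = ["+s1", "-s2", "+s3"]
# emits the link lines
#   L	s1	+	s2	-	0M
#   L	s2	-	s3	+	0M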
def build_segments(self):
"""Build the GFA segments from BAM file segments."""
if self.progress:
self.progress[self.task_id] = {
"progress": 4,
"total": 5,
"description": f"[cyan]{self.sample_name}: Building segments"
}
self.logger.debug(f"{self.sample_name}: Building subgraph GFA segments")
bam = GratoolsBam(bam_path=self.bam_path, logger=self.logger)
self.gfa_segment_list, self.dict_segments_samples, self.dict_segments_sequence = bam.build_segments(list_segments=list(self.segment_id_set))
def get_chr_pos(self, progress, task_id):
"""Locate the chromosome and positions of the sample's first and last segments, then build the sub-graph."""
self.progress = progress
self.task_id = task_id
self.progress[self.task_id] = {
"progress": 2,
"total": 5,
"description": f"[cyan]{self.sample_name}: searching corresponding chromosome positions"
}
for segment in self.dict_segments_samples:
if self.sample_name in self.dict_segments_samples[segment]:
self.segment_id_first = segment
break
#click.secho(f"segment_id_first = {self.segment_id_first}", fg="red")
if self.segment_id_first:
#click.secho(f"Getting segment_id_last", fg="cyan")
reversed_segments = list(reversed(self.dict_segments_samples))
for segment in reversed_segments:
if self.sample_name in self.dict_segments_samples[segment]:
self.segment_id_last = segment
break
#click.secho(f"segment_id_last = {self.segment_id_last}", fg="red")
# check if segment_id_first and segment_id_last are not empty
if not self.segment_id_first and not self.segment_id_last:
self.progress[self.task_id] = {
"progress": 1,
"total": 1,
"description": f"[yellow]{self.sample_name}: not found in sub-graph"
}
self.logger.warning(f"Sample {self.sample_name} not found in sub-graph")
else:
# open the BED with pandas and look up segment_id_first and segment_id_last
# accumulate the matching rows in lists and concatenate once at the end
df_first_list = []
df_last_list = []
# count the number of lines (handle closed afterwards)
with open(self.bed_path) as bed_fh:
nb_lines = sum(1 for _ in bed_fh)
self.progress[self.task_id] = {
"progress": 3,
"total": 5,
"description": f"[cyan]{self.sample_name}: reading BED to locate first and last segments"
}
# iterate over the file in chunks to bound memory usage
df_iter = pd.read_csv(self.bed_path, sep="\t", header=None,
names=["chromosome", "start", "stop", "segment_id", "fragment", "haplotype_index"],
dtype={"chromosome": str, "start": int, "stop": int, "segment_id": str,
"fragment": str, "haplotype_index": str},
engine='c', iterator=True, low_memory=False, memory_map=True,
chunksize=1000000)  # adjust chunksize to the available memory
for chunk in df_iter:
# strip leading + / - signs from the segment_id
chunk["segment_id"] = chunk["segment_id"].str.lstrip('+-')
# keep only the segments of interest and append them to the lists
df_first_chunk = chunk[chunk["segment_id"] == self.segment_id_first]
df_last_chunk = chunk[chunk["segment_id"] == self.segment_id_last]
if not df_first_chunk.empty:
df_first_list.append(df_first_chunk)
if not df_last_chunk.empty:
df_last_list.append(df_last_chunk)
# concatenate the chunk results into the final DataFrames
df_first = pd.concat(df_first_list, ignore_index=True) if df_first_list else pd.DataFrame()
df_last = pd.concat(df_last_list, ignore_index=True) if df_last_list else pd.DataFrame()
if len(df_first) > 1 or len(df_last) > 1:
self.logger.warning("Duplicated fragments found")
print(df_first)
print(df_last)
chromosome_first = df_first["chromosome"].values[0]
chromosome_last = df_last["chromosome"].values[0]
# first and last segments on the same chromosome
if chromosome_first == chromosome_last:
self.chromosome = chromosome_first
self.start = min(df_first["start"].values[0], df_last["start"].values[0])
self.stop = max(df_first["stop"].values[0], df_last["stop"].values[0])
self.compute_intersection()
self.build_walks()
self.build_segments()
self.progress[self.task_id] = {
"progress": 5,
"total": 5,
"description": f"[cyan]{self.sample_name}: End processing"
}
# TODO: handle the case where the first and last segments lie on two different chromosomes
#elif df_first["chromosome"].values[0] != df_last["chromosome"].values[0]:
def build_fasta(self):
"""Build FASTA records by concatenating the oriented segment sequences of each walk."""
self.logger.info(f"{self.sample_name}: Build fasta")
# use tqdm for progress bar
total = len(self.gfa_walk_list)
# accumulate the sequences in a list for better performance
records = []
for walk in tqdm(self.gfa_walk_list, total=total, desc="Walks progress", unit="lines", disable=True, file=sys.stdout):
# extract the walk fields
_, sample_name, haplotype_index, chromosome_name, chromosome_start, chromosome_stop, segments_order = walk.split("\t")
# TODO: compute start/stop
# build the sequence identifier
seq_name = f"{self.sample_name}_{self.chromosome}_{self.start}_{self.stop}_{chromosome_start}-{chromosome_stop}"
record = SeqRecord(Seq(""), id=seq_name, name=seq_name, description="")
# process the segments
segments = segments_order.replace('>', ',+').replace('<', ',-').split(',')[1:]
nb_segments = len(segments)
seq_objs = []
# accumulate the sequences
for segment in tqdm(segments, total=nb_segments, desc="Segment to fasta progress", unit="lines", disable=True, file=sys.stdout):
segment_id = segment.strip('+-')
orient = segment[0]
seq = Seq(self.dict_segments_sequence[segment_id])
if orient == "-":
seq = seq.reverse_complement()
seq_objs.append(seq)
# concatenate all sequences in a single operation
record.seq = sum(seq_objs, Seq(""))
records.append(record)
# update the sequence list
self.sequences_list.extend(records)
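# Illustrative example (assumed sequences): a walk path ">s1<s2" yields
# seq(s1) + revcomp(seq(s2)); with s1="ACGT" and s2="AACC" the record
# sequence is "ACGT" + "GGTT" = "ACGTGGTT".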
@dataclass
class GFA:
"""
A class to parse and process GFA files, including handling BAM files and extracting segments and links.
Attributes:
gfa_path (Path): The path to the GFA file.
gfa_name (str): The name of the GFA file.
version (float): The version of the GFA file.
header_gfa (list): The header information from the GFA file.
sample_reference (str): The reference sample from the GFA file.
bam_segments_file (Path): The path to the BAM segments file.
dict_link_SA_SB (defaultdict): A dictionary of links between segments.
dict_samples_chrom (defaultdict): A dictionary of sample chromosomes.
dict_segments_size (defaultdict): A dictionary of segment sizes.
dict_segments_samples (defaultdict): A dictionary of segment samples.
dict_samples_bed (defaultdict): Per-sample open BED file handles.
works_path (Path): The path to the working directory.
bed_path (Path): The path to the BED files directory.
bam_path (Path): The path to the BAM files directory.
"""
gfa_path: Path
logger: logging.Logger = field(default_factory=lambda: logging.getLogger("GFA"))
gfa_name: str = None
version: float = None
header_gfa: list = field(default_factory=list, repr=True)
sample_reference: str = None
bam_segments_file: Path = None
dict_link_SA_SB: defaultdict = field(default_factory=lambda: defaultdict(), repr=False)
dict_samples_chrom: defaultdict = field(default_factory=lambda: defaultdict(OrderedDict), repr=True)
dict_segments_size: defaultdict = field(default_factory=lambda: defaultdict(int), repr=False)
dict_segments_samples: defaultdict = field(default_factory=lambda: defaultdict(list), repr=False)
dict_samples_bed: defaultdict = field(default_factory=lambda: defaultdict(OrderedDict), repr=True)
works_path: Path = None
bed_path: Path = None
bam_path: Path = None
def __post_init__(self):
"""Initialize paths and directories, then parse the GFA file and tag the BAM file."""
suffixes = ''.join(self.gfa_path.suffixes)
if ".gz" in suffixes:
self.gfa_name = self.gfa_path.name.replace(''.join(self.gfa_path.suffixes[-2:]),'')
elif ".gfa" in suffixes:
self.gfa_name = self.gfa_path.stem
else:
raise ValueError(f"The file {self.gfa_path} is not a GFA file")
self.works_path = self.gfa_path.parent / f"{self.gfa_name}_GraTools"
self.bed_path = self.works_path / "bed_files"
self.bed_path.mkdir(exist_ok=True, parents=True)
self.bam_path = self.works_path / "bam_files"
self.bam_path.mkdir(exist_ok=True, parents=True)
self.bam_segments_file = self.bam_path / f'{self.gfa_name}.bam'
self.header_gfa_file = self.works_path / f'header_{self.gfa_name}.txt'
# specific initialisation for dict_link_SA_SB with per-segment default structures
self.dict_link_SA_SB = defaultdict(lambda: LinkInfo([], [], [], [], [], []))
self.parse_gfa()
self.tag_bam()
self.save_samples_chrom()
self.save_header()
self.save_samples_bed()
def save_samples_chrom(self):
"""Save the samples and their corresponding chromosomes in a text file."""
self.logger.info(f"Saving samples and corresponding chromosomes")
with open(self.works_path / "samples_chrom.txt", "w") as f:
for sample in sorted(self.dict_samples_chrom.keys(), key=human_sort_key):
dict_chroms = self.dict_samples_chrom[sample]
for chrom in sorted(dict_chroms.keys(), key=human_sort_key):
fragments = dict_chroms[chrom]
for fragment in fragments:
txt = f"{sample}\t{chrom}\t{fragment}\n"
f.write(txt)
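# Illustrative example (assumed values) of samples_chrom.txt lines, tab-separated:
#   sampleA	chr1	0	1234567
#   sampleA	chr2	0	987654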
def save_samples_bed(self):
"""Sort each sample's segments and save them to a BED file."""
self.logger.info("Sorting samples' chromosomes and segments into BED files")
for sample, bed_file in self.dict_samples_bed.items():
# close the file before sorting
bed_file.close()
# sort the BED file with pybedtools
sample_bed = pybedtools.BedTool(bed_file.name)  # pass the file path
sorted_bed = sample_bed.sort()  # sort the intervals
# save the sorted file (overwrites the original; a new file could be created instead)
sorted_bed.saveas(bed_file.name)
def parse_gfa(self):
"""
Parse the GFA file to extract specific information. Reads the GFA file,
processes each line based on its type (Header, Segment, Link, Walk),
and performs specific actions for each line type.
The function updates the header information, writes to a BAM file, builds a dictionary,
and processes the walk line.
"""
self.logger.info(f"Creating Gratools files {self.works_path}")
self.logger.info(f"Reading GFA {self.gfa_path.name}")
header = {'HD': {'VN': '1.0'}}
if self.gfa_path.exists() and self.gfa_path.is_file():
open_fn = gzip.open if ".gz" in self.gfa_path.suffix else open
total_lines = sum(1 for _ in open_fn(self.gfa_path, "rt"))
with (open_fn(self.gfa_path, "rt") as gfa_file_in,
pysam.AlignmentFile(self.bam_segments_file, "wb", header=header) as bam_file_out,
Progress(
TextColumn("[bold blue]{task.description}"),  # task description
BarColumn(),  # progress bar
"[progress.percentage]{task.percentage:>3.1f}%",  # percentage completed
"{task.completed}/{task.total} lines",  # lines processed
TimeElapsedColumn(),  # elapsed time
TimeRemainingColumn(),  # estimated remaining time
refresh_per_second=4,  # less frequent refreshes
) as progress
):
# create the progress task
task_id = progress.add_task("GFA parsing progress...", total=total_lines, visible=True)
for line in gfa_file_in:
# advance the progress bar
progress.advance(task_id)
line = line.strip("\n")
# Select Header line
if line.startswith("H"):
self.header_gfa.append(line)
tab_line = line.split("\t")
self.version = tab_line[1].split(":")[-1]
self.sample_reference = tab_line[2].split(":")[-1]
# Select Segment line
elif line.startswith("S"):
self._write_bam(line, bam_file_out)
# Select Link line
elif line.startswith("L"):
self._parse_link_line(line)
# Select Walk line
elif line.startswith("W"):
self._parse_walk_line(line)
def _parse_walk_line(self, line):
"""Parse a walk line from a GFA file to extract relevant information."""
_, sample_name, haplotype_index, chromosome_name, chromosome_start, chromosome_stop, segments_order = line.split("\t")
# Ensure the sample and chromosome entries exist in the dictionary
if sample_name not in self.dict_samples_chrom:
self.dict_samples_chrom[sample_name] = {}
if chromosome_name not in self.dict_samples_chrom[sample_name]:
self.dict_samples_chrom[sample_name][chromosome_name] = []
# Add the chromosome range to the sample's chromosome dictionary
chrom_range = f"{chromosome_start}\t{chromosome_stop}"
self.dict_samples_chrom[sample_name][chromosome_name].append(chrom_range)
segment_start_chr = int(chromosome_start)
# Parse the segments order and process each segment
for seg_id in segments_order.replace('>', ',+').replace('<', ',-').split(',')[1:]:
is_positive_strand = '+' in seg_id
seg_id_only = seg_id.strip('+-')
# Ensure the segment entry exists in the segments dictionary
if seg_id_only not in self.dict_segments_samples:
self.dict_segments_samples[seg_id_only] = []
# Append the segment information to the relevant dictionaries
# TODO: fix segment duplicated on same walk => dict of dict?
sample_chromosome = f"{sample_name};{chromosome_name}"
self.dict_segments_samples[seg_id_only].append(sample_chromosome)
segment_size = self.dict_segments_size[seg_id_only]
segment_stop_chr = segment_start_chr + segment_size
line_bed = f"{chromosome_name}\t{segment_start_chr}\t{segment_stop_chr}\t{seg_id}\t{chromosome_start}:{chromosome_stop}\t{haplotype_index}"
# open the sample's BED file in append mode if it is not open yet
if sample_name not in self.dict_samples_bed:
self.dict_samples_bed[sample_name] = open(self.bed_path / f"{sample_name}.bed", "a")
# write the line to the sample's file
self.dict_samples_bed[sample_name].write(f"{line_bed}\n")
segment_start_chr = segment_stop_chr
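# Illustrative example (assumed sizes): for the walk
#   W	sampleA	1	chr1	0	8	>s1>s2   with len(s1)=5 and len(s2)=3,
# the emitted BED lines are:
#   chr1	0	5	+s1	0:8	1
#   chr1	5	8	+s2	0:8	1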
def _parse_link_line(self, line):
"""Build a dictionary to store the links between segments in a GFA file."""
_, seg_id_1, orient_seg_1, seg_id_2, orient_seg_2, *_ = line.split("\t")
# encode the orientation as 1 for "+" and -1 for "-"
orient_seg_1 = 1 if orient_seg_1 == "+" else -1
orient_seg_2 = 1 if orient_seg_2 == "+" else -1
# update the LinkInfo lists for seg_id_1
link_info_1 = self.dict_link_SA_SB[seg_id_1]
link_info_1.ID_plus.append(seg_id_2)
link_info_1.Orient_plus.append(orient_seg_2)
link_info_1.OrientKey_plus.append(orient_seg_1)
# update the LinkInfo lists for seg_id_2
link_info_2 = self.dict_link_SA_SB[seg_id_2]
link_info_2.ID_minus.append(seg_id_1)
link_info_2.Orient_minus.append(orient_seg_1)
link_info_2.OrientKey_minus.append(orient_seg_2)
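# Illustrative example (assumed IDs): the link line "L	s1	+	s2	-	0M" appends
#   dict_link_SA_SB["s1"]: ID_plus += ["s2"], Orient_plus += [-1], OrientKey_plus += [1]
#   dict_link_SA_SB["s2"]: ID_minus += ["s1"], Orient_minus += [1], OrientKey_minus += [-1]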
def _write_bam(self, line, bam_file_out):
"""
Write a line from a GFA file to a BAM file.
Args:
line (str): A line from a GFA file containing segment information.
bam_file_out (pysam.AlignmentFile): The BAM file to write the segment information to.
Returns:
None
This function takes a segment ("S") line from a GFA file and writes it to a BAM file. It splits the line into its components (`line_type`, `seg_id`, `sequence`, `supp`), records the segment size in a dictionary, builds an AlignedSegment carrying the segment information, and writes that object to the BAM file.
The function assumes that the BAM file has already been opened and is passed as the `bam_file_out` parameter.
Note:
- The `line_type` is not used in this function.
- The `sequence` is used as the query sequence for the AlignedSegment object.
- The `supp` is used as the value for the "SU" tag in the AlignedSegment object.
- The "SB" tag is commented out in the AlignedSegment object.
- The "SA" tag is commented out in the AlignedSegment object.
- The "SW" tag is commented out in the AlignedSegment object.
"""
_, seg_id, sequence, *supp = line.split("\t")
# add segment size to dict
self.dict_segments_size[seg_id] = len(sequence)
# Create segment to write
segment = AlignedSegment()
segment.query_name = seg_id
segment.flag = 4
segment.reference_name = "*"
segment.reference_start = 0
segment.mapping_quality = 255
segment.cigarstring = "1M"
segment.next_reference_name = "*"
segment.next_reference_start = 0
segment.template_length = 0
segment.query_sequence = sequence
segment.query_qualities = pysam.qualitystring_to_array("*" * self.dict_segments_size[seg_id])
segment.tags = (("SU", ','.join(supp)),)
bam_file_out.write(segment)
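# Illustrative example (assumed line): the GFA line "S	s1	ACGT" becomes an
# unmapped BAM record (flag 4) with query_name "s1", sequence "ACGT" and an
# "SU" tag holding the joined supplementary fields.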
def tag_bam(self):
"""
Tags the BAM file with the given dictionary of segment links and segment samples.
This function takes the `self.bam_segments_file` and uses the `Bam` class to tag it with the
`self.dict_link_SA_SB` and `self.dict_segments_samples` dictionaries. The resulting tagged BAM file
is then assigned to `self.bam_segments_file`.
Parameters:
self (object): The instance of the class.
Returns:
None
"""
bam = GratoolsBam(bam_path=self.bam_segments_file, logger=self.logger)
self.bam_segments_file = bam.tag(self.dict_link_SA_SB, self.dict_segments_samples)
if __name__ == "__main__":
bam_path_input = Path("/shared/home/sravel/glp701/sandbox/toolbox/Og_cactus_GraTools/bam_files/Og_cactus.bam")
#print(bam_path_input)
bam = GratoolsBam(bam_path=bam_path_input)
bam.specific_and_shared_segments()