import itertools
import random
from difflib import SequenceMatcher
from pathlib import Path
import numpy as np
import torch.utils.data as tdata
from sklearn.preprocessing import LabelEncoder, OneHotEncoder
import mubind as mb
from scipy import sparse
import pandas as pd
import os
import pickle
import urllib.request
# Class for reading training/testing SELEX dataset files.
[docs]
class SelexDataset(tdata.Dataset):
[docs]
def __init__(self, df, n_rounds=None, enr_series=True, single_encoding_step=False, store_rev=False,
labels=None, index_type=str, use_sparse=False):
assert n_rounds is not None
self.n_rounds = n_rounds if not isinstance(n_rounds, int) else np.repeat(n_rounds, df.shape[0])
self.enr_series = enr_series
self.store_rev = store_rev
self.length = len(df)
self.index_type = index_type
# df = df.reset_index(drop=True)
# this only works if the columns are equal to the round names (partly obsolete)
# labels = [i for i in range(n_rounds + 1)]
# self.rounds = np.array(df[labels])
self.rounds = np.array(df) if labels is None else np.array(df[labels])
# this statement has to be done before sparsifing the matrix
# countsum ignoring -1 and nan
# print(self.rounds)
self.countsum = np.nansum(np.where(self.rounds == -1, 0, self.rounds), axis=1).astype(np.float32)
if "batch" not in df.columns:
df["batch"] = np.repeat(0, df.shape[0])
self.batch_names = {}
for i, name in enumerate(set(df["batch"])):
self.batch_names[i] = name
mask = df["batch"] == name
df.loc[mask, "batch"] = i
self.batch = np.array(df["batch"])
self.n_batches = len(set(df["batch"]))
self.use_sparse = use_sparse
if self.use_sparse:
self.rounds = sparse.csr_matrix(self.rounds)
if "batch" in df.columns:
del df['batch']
seq = df["seq"] if "seq" in df else df.index
self.seq = np.array(list(map(mb.tl.encoding.string2bin, seq)) if self.index_type == int else seq)
if single_encoding_step:
assert len(set(seq.str.len())) == 1
n_entries = df.shape[0]
single_seq = "".join(seq.head(n_entries))
df_single_entry = df.head(1).copy()
df_single_entry["seq"] = [single_seq]
le = LabelEncoder()
oe = OneHotEncoder(sparse=False)
# single encoding step
self.mononuc = np.array(
[mb.tl.onehot_mononuc(row["seq"], le, oe) for index, row in df_single_entry.iterrows()]
)
# splitting step
self.mononuc = np.array(np.split(self.mononuc, n_entries, axis=2)).squeeze(1)
else:
max_length = max(set(seq.str.len()))
self.mononuc = mb.tl.onehot_mononuc_multi(pd.Series(seq), max_length=max_length)
if store_rev:
self.mononuc_rev = mb.tl.revert_onehot_mononuc(self.mononuc)
def __getitem__(self, index):
# Return a single input/label pair from the dataset.
sample = {
"mononuc": self.mononuc[index],
"batch": self.batch[index],
"rounds": self.rounds[index] if not self.use_sparse else self.rounds[index].A,
"n_rounds": self.n_rounds[index],
"seq": self.seq[index],
"countsum": self.countsum[index],
}
if self.store_rev:
sample["mononuc_rev"] = self.mononuc_rev[index]
return sample
def __len__(self):
return self.length
# Class for reading training/testing PBM dataset files.
[docs]
class PBMDataset(tdata.Dataset):
[docs]
def __init__(self, df, single_encoding_step=False, store_rev=False, labels=None, index_type=str):
self.store_rev = store_rev
self.signal = np.array(df).astype(np.float32) if labels is None else np.array(df[labels]).astype(np.float32)
self.n_proteins = self.signal.shape[1]
self.length = self.signal.shape[0] * self.signal.shape[1]
self.index_type = index_type
seq = df["seq"] if "seq" in df else df.index
self.seq = np.array(list(map(mb.tl.encoding.string2bin, seq)) if self.index_type == int else seq)
if single_encoding_step:
assert len(set(seq.str.len())) == 1
n_entries = df.shape[0]
single_seq = "".join(seq.head(n_entries))
df_single_entry = df.head(1).copy()
df_single_entry["seq"] = [single_seq]
le = LabelEncoder()
oe = OneHotEncoder(sparse=False)
# single encoding step
self.mononuc = np.array(
[mb.tl.onehot_mononuc(row["seq"], le, oe) for index, row in df_single_entry.iterrows()]
)
# splitting step
self.mononuc = np.array(np.split(self.mononuc, n_entries, axis=2)).squeeze(1)
else:
max_length = max(set(seq.str.len()))
self.mononuc = mb.tl.onehot_mononuc_multi(pd.Series(seq), max_length=max_length)
if store_rev:
self.mononuc_rev = mb.tl.revert_onehot_mononuc(self.mononuc)
def __getitem__(self, index):
# split up the index to one position in the signal matrix
x = index % self.signal.shape[0]
y = int((index - x) / self.signal.shape[0])
# Return a single input/label pair from the dataset.
sample = {
"mononuc": self.mononuc[x],
"rounds": self.signal[x, y:(y + 1)],
"seq": self.seq[x],
"protein_id": y,
}
if self.store_rev:
sample["mononuc_rev"] = self.mononuc_rev[index]
return sample
def __len__(self):
return self.length
# Class for reading training/testing genomic datasets, e.g. ATAC data.
[docs]
class GenomicsDataset(tdata.Dataset):
[docs]
def __init__(self, df, single_encoding_step=False, store_rev=False, labels=None):
self.store_rev = store_rev
self.signal = np.array(df).astype(np.float32) if labels is None else np.array(df[labels]).astype(np.float32)
self.n_cells = self.signal.shape[1]
self.length = self.signal.shape[0] * self.signal.shape[1]
seq = df["seq"] if "seq" in df else df.index
self.seq = np.array(seq)
self.use_sparse = False
if single_encoding_step:
assert len(set(seq.str.len())) == 1
n_entries = df.shape[0]
single_seq = "".join(seq.head(n_entries))
df_single_entry = df.head(1).copy()
df_single_entry["seq"] = [single_seq]
le = LabelEncoder()
oe = OneHotEncoder(sparse=False)
# single encoding step
self.mononuc = np.array(
[mb.tl.onehot_mononuc(row["seq"], le, oe) for index, row in df_single_entry.iterrows()]
)
# splitting step
self.mononuc = np.array(np.split(self.mononuc, n_entries, axis=2)).squeeze(1)
else:
max_length = max(set(seq.str.len()))
self.mononuc = mb.tl.onehot_mononuc_multi(pd.Series(seq), max_length=max_length)
if store_rev:
self.mononuc_rev = mb.tl.revert_onehot_mononuc(self.mononuc)
def __getitem__(self, index):
# split up the index to one position in the signal matrix
x = index % self.signal.shape[0]
y = int((index - x) / self.signal.shape[0])
# Return a single input/label pair from the dataset.
sample = {
"mononuc": self.mononuc[x],
"rounds": self.signal[x, y:(y + 1)],
"seq": self.seq[x],
"protein_id": y,
}
if self.store_rev:
sample["mononuc_rev"] = self.mononuc_rev[index]
return sample
def __len__(self):
return self.length
# Class for reading training/testing PBM data with residue sequences.
[docs]
class ResiduePBMDataset(tdata.Dataset):
[docs]
def __init__(self, df, msa_onehot, single_encoding_step=False, store_rev=False):
self.n_rounds = 0
self.store_rev = store_rev
self.length = df.shape[0] * df.shape[1]
self.signal = np.array(df).astype(np.float32)
self.msa_onehot = np.stack(msa_onehot).transpose((0, 2, 1)).astype(np.float32)
delete_batch_col = False
if "batch" not in df.columns:
df["batch"] = np.repeat(0, df.shape[0])
delete_batch_col = True
self.batch_names = {}
for i, name in enumerate(set(df["batch"])):
self.batch_names[i] = name
mask = df["batch"] == name
df.loc[mask, "batch"] = i
self.batch = np.array(df["batch"])
self.n_batches = len(set(df["batch"]))
if delete_batch_col:
del df['batch']
seq = df["seq"] if "seq" in df else df.index
self.seq = np.array(seq)
if single_encoding_step:
assert len(set(seq.str.len())) == 1
n_entries = df.shape[0]
single_seq = "".join(seq.head(n_entries))
df_single_entry = df.head(1).copy()
df_single_entry["seq"] = [single_seq]
le = LabelEncoder()
oe = OneHotEncoder(sparse=False)
# single encoding step
self.mononuc = np.array(
[mb.tl.onehot_mononuc(row["seq"], le, oe) for index, row in df_single_entry.iterrows()]
)
# splitting step
self.mononuc = np.array(np.split(self.mononuc, n_entries, axis=2)).squeeze(1)
else:
max_length = max(set(seq.str.len()))
self.mononuc = mb.tl.onehot_mononuc_multi(pd.Series(seq), max_length=max_length)
if store_rev:
self.mononuc_rev = mb.tl.revert_onehot_mononuc(self.mononuc)
def __getitem__(self, index):
# split up the index to one position in the signal matrix
x = index % self.signal.shape[0]
y = int((index - x) / self.signal.shape[0])
# Return a single input/label pair from the dataset.
sample = {
"mononuc": self.mononuc[x],
"batch": self.batch[x],
"rounds": self.signal[x, y:(y+1)],
"seq": self.seq[x],
"residues": self.msa_onehot[y],
"protein_id": y,
}
if self.store_rev:
sample["mononuc_rev"] = self.mononuc_rev[index]
return sample
def __len__(self):
return self.length
def get_max_residue_length(self):
return self.msa_onehot.shape[1]
# Class for reading training/testing ChIPSeq dataset files.
class ChipSeqDataset(tdata.Dataset):
def __init__(self, data_frame, use_dinuc=False, batch=None):
self.batch = batch
self.target = data_frame["target"].astype(np.float32)
# self.rounds = self.data[[0, 1]].to_numpy()
self.le = LabelEncoder()
self.oe = OneHotEncoder(sparse=False)
self.length = len(data_frame)
self.mononuc = np.array(
[mb.tl.onehot_mononuc(row["seq"], self.le, self.oe) for index, row in data_frame.iterrows()]
)
self.dinuc = np.array(
[mb.tl.onehot_dinuc(row["seq"], self.le, self.oe) for index, row in data_frame.iterrows()]
)
self.is_count_data = np.array(data_frame.is_count_data)
def __getitem__(self, index):
# Return a single input/label pair from the dataset.
mononuc_sample = self.mononuc[index]
target_sample = self.target[index]
# print(self.batch)
# print(self.batch.shape)
batch = self.batch[index]
dinuc_sample = self.dinuc[index]
is_count_data = self.is_count_data[index]
sample = {
"mononuc": mononuc_sample,
"dinuc": dinuc_sample,
"target": target_sample,
"batch": batch,
"is_count_data": is_count_data,
}
return sample
def __len__(self):
return self.length
# Class for curating multi-source data (chip/selex/PBM).
class MultiDataset(tdata.Dataset):
def __init__(self, data_frame, use_dinuc=False, batch=None):
self.batch = batch
self.target = data_frame["target"].astype(np.float32)
# self.rounds = self.data[[0, 1]].to_numpy()
self.le = LabelEncoder()
self.oe = OneHotEncoder(sparse=False)
self.length = len(data_frame)
# mononuc = []
# for index, row in data_frame.iterrows():
# # print(row['seq'], self.le, self.oe)
# m = mb.tl.onehot_mononuc(row['seq'], self.le, self.oe)
# mononuc.append(m)
# # assert False
# self.mononuc = np.array(mononuc)
# print('prepare mononuc feats...')
self.mononuc = np.array(
[mb.tl.onehot_mononuc_with_gaps(row["seq"], self.le, self.oe) for index, row in data_frame.iterrows()]
)
# print('prepare dinuc feats...')
self.dinuc = np.array([mb.tl.onehot_dinuc_with_gaps(row["seq"]) for index, row in data_frame.iterrows()])
self.is_count_data = data_frame["is_count_data"].astype(int)
def __getitem__(self, index):
# Return a single input/label pair from the dataset.
mononuc_sample = self.mononuc[index]
target_sample = self.target[index]
# print(self.batch)l
# print(self.batch.shape)
batch = self.batch_one_hot[index]
dinuc_sample = self.dinuc[index]
is_count_data = self.is_count_data[index]
sample = {
"mononuc": mononuc_sample,
"dinuc": dinuc_sample,
"target": target_sample,
"batch": batch,
"is_count_data": is_count_data,
}
return sample
def __len__(self):
return self.length
def _get_random_sequence(seqlen=50, options="ACTG"):
return "".join(random.choice(options) for _ in range(seqlen))
def _similar(a, b):
return SequenceMatcher(None, a, b).ratio()
def simulate_data(motif, seqlen=50, n_trials=1000, random_seed=500):
seqs = [_get_random_sequence(seqlen) for i in range(n_trials)]
random.seed(random_seed)
for i in range(len(seqs)):
s = seqs[i]
p = random.randint(0, len(s) - len(motif) + 1)
s = s[:p] + motif + s[p + len(motif) :]
seqs[i] = s
return seqs
def _mismatch(word, letters, num_mismatches):
for locs in itertools.combinations(range(len(word)), num_mismatches):
this_word = [[char] for char in word]
for loc in locs:
orig_char = word[loc]
this_word[loc] = [l for l in letters if l != orig_char]
for poss in itertools.product(*this_word):
yield "".join(poss)
# Generate a seq of sequences with a seuquence embedded
def simulate_xy(motif, batch=100, n_trials=500, seqlen=50, max_mismatches=-1, min_count=1):
import numpy as np
x = np.array([_get_random_sequence(seqlen) + "ACGT" for k in range(n_trials)])
options = []
mismatch_values = list(range(0, len(motif)))
if max_mismatches > 0:
mismatch_values = mismatch_values[:max_mismatches]
for n_mismatches in mismatch_values:
next_options = np.random.choice(
list(_mismatch(motif, "ACGT", n_mismatches)),
int(n_trials / len(mismatch_values)),
)
# print(n_mismatches, len(next_options), next_options)
options += list(next_options)
y = []
for i, opt in zip(range(len(x)), options):
p = np.random.choice(range(len(x[0]) - 4 - len(opt)))
x[i] = x[i][:p] + opt + x[i][p + len(opt) :]
y.append(int(_similar(motif, opt) * batch))
y = np.array(y) + min_count
x = np.array(x)
return x, y
def gata_remap(n_sample=5000):
from os.path import join
import screg as scr
# this is the directory where we are saving the annotations at the moment (hard to distribute for multiple genomes, so only ICB at the moment)
mb.bindome.constants.ANNOTATIONS_DIRECTORY = "/mnt/znas/icb_zstore01/groups/ml01/datasets/annotations"
# get data for gata1 using remap2020
peaks = mb.bindome.datasets.REMAP2020.get_remap_peaks("GATA1")
peaks_cd34 = peaks[peaks[3].str.contains("GATA1:CD34")].sample(n_sample)
gen_path = join(mb.bindome.constants.ANNOTATIONS_DIRECTORY, "hg38/genome/hg38.fa")
import tempfile
fa_path = tempfile.mkstemp()[1]
seqs = scr.gen.get_sequences_from_bed(
peaks_cd34[["chr", "summit.start", "summit.end"]],
fa_path,
gen_path=gen_path,
uppercase=True,
)
# only shuffling, no dinucleotide content control
shuffled_seqs = [[h + "_rand", scr.gen.randomize_sequence(s)] for h, s in seqs]
x = np.array([s[1] for s in seqs] + [s[1] for s in shuffled_seqs])
y = np.array([int(i % 2 == 0) for i, s in enumerate([seqs, shuffled_seqs]) for yi in range(len(s))])
return x, y
def cisbp_hs(**kwargs):
# path to dir containing pwm txt files
base_path = Path(mb.bindome.constants.ANNOTATIONS_DIRECTORY + '/cisbp/hs/pwms_all_motifs')
print(base_path)
# collect paths to all pwms
motif_paths = []
for p in base_path.rglob('*'):
motif_paths.append(p)
if kwargs.get('stop_at') is not None and kwargs.get('stop_at') >= len(motif_paths):
break
print(len(motif_paths))
pwms = [pd.read_csv(p, sep='\t').drop(columns=['Pos'], axis=1).T for p in motif_paths]
return pwms
def genre(**kwargs):
# path to dir containing pwm txt files
base_path = Path(mb.bindome.constants.ANNOTATIONS_DIRECTORY + '/genre/genre_pwms.pkl')
print(base_path)
# collect paths to all pwms
pwms_by_module = pickle.load(open(base_path, 'rb'))
pwms = [pwms_by_module[k].T for k in pwms_by_module]
return pwms
def archetypes_anno(**kwargs):
if not 'url' in kwargs:
url = 'https://www.dropbox.com/scl/fi/odxcg72nj3djbfz6r9nq8/motif_annotations.xlsx?rlkey=qlbyx9m7dj6qqui9ct80q9ejc&dl=1'
kwargs['url'] = url
url = kwargs['url']
# read reference clusters
archetypes_dir = 'data/archetypes'
archetypes_path = os.path.join(archetypes_dir, 'motif_annotations.xlsx')
# save to avoid future redownloads
if not os.path.exists(archetypes_path):
if not os.path.exists(archetypes_dir):
os.makedirs(archetypes_dir)
urllib.request.urlretrieve(kwargs['url'], archetypes_path)
anno = pd.read_excel(archetypes_path, sheet_name='Archetype clusters')
return anno
def archetypes_clu(**kwargs):
if not 'url' in kwargs:
url = 'https://www.dropbox.com/scl/fi/odxcg72nj3djbfz6r9nq8/motif_annotations.xlsx?rlkey=qlbyx9m7dj6qqui9ct80q9ejc&dl=1'
kwargs['url'] = url
url = kwargs['url']
archetypes_dir = 'data/archetypes'
archetypes_path = os.path.join(archetypes_dir, 'motif_annotations.xlsx')
clu = pd.read_excel(archetypes_path, sheet_name='Motifs')
return clu
def archetypes_pickle(**kwargs):
# read reference clusters
archetypes_dir = 'data/archetypes'
archetypes_path = os.path.join(archetypes_dir, 'archetypes_data.pkl')
# save to avoid future redownloads
if not os.path.exists(archetypes_path):
if not os.path.exists(archetypes_dir):
os.makedirs(archetypes_dir)
# print('downloading...')
urllib.request.urlretrieve(kwargs['url'], archetypes_path)
ppm_by_name = pickle.load(open(archetypes_path, 'rb'))
return ppm_by_name
def archetypes_meme(**kwargs):
ppm_by_name = {}
archetypes_dir = 'data/archetypes'
# read PFM across meme files
for f in os.listdir(archetypes_dir):
if f.endswith('.meme'):
# print(f)
lines = [r.strip() for r in open(os.path.join(archetypes_dir, f))]
for i, e in enumerate(lines):
if e.startswith('letter-probability matrix'):
name = lines[i - 1]
if len(name) == 0:
name = lines[i - 2]
name = name.split(' ')[1]
w = e.split('w= ')[1].split(' ')[0]
# print(name, w)
a, b = i + 1, i + 1 + int(w)
rows = lines[a: b]
ppm = [list(map(float, v.replace(' ', '\t').replace('\t\t', '\t').split('\t'))) for v in rows]
ppm = pd.DataFrame(ppm).T
ppm.index = 'A', 'C', 'G', 'T'
ppm_by_name[name] = ppm
print('# motifs loaded %i' % (len(ppm_by_name)))
return ppm_by_name
def archetypes(**kwargs):
# annotation table
url = 'https://www.dropbox.com/scl/fi/odxcg72nj3djbfz6r9nq8/motif_annotations.xlsx?rlkey=qlbyx9m7dj6qqui9ct80q9ejc&dl=1'
kwargs['url'] = url
archetypes_dir = 'data/archetypes'
anno = archetypes_anno(**kwargs)
clu = archetypes_anno(**kwargs)
# PWM weights
url = 'https://www.dropbox.com/scl/fi/gytniua2uay1p6st0svh9/archetypes_data.pkl?rlkey=qe7mzhwaiqfpkjbdj31ijx193&dl=1'
kwargs['url'] = url
ppm_by_name = archetypes_pickle(**kwargs)
# print(clu)
# return non-redundant groups
reduced_groups = []
for k in anno['Seed_motif']:
reduced_groups.append(ppm_by_name[k])
return reduced_groups
from pathlib import Path
from typing import Optional, Union
def pancreas_rna(
file_path: Optional[
Union[str, Path]
] = "data/scatac/pancreas_multiome/pancreas_multiome_2022_processed_rna_velocities_2024.h5ad"
):
from scanpy import read
# rna
url = 'https://www.dropbox.com/scl/fi/ryb3q25n0kc2vw297f2xd/pancreas_multiome_2022_processed_rna_velocities_2024.h5ad?rlkey=in0qlpv038cn6wxrops1wsxgm&dl=1'
print(os.path.exists(file_path), file_path)
# print('reading RNA')
adata = read(file_path, backup_url=url, sparse=True, cache=True)
adata.var_names_make_unique()
# print('opening RNA successful')
return adata
def pancreas_rna_pytest(
file_path: Optional[
Union[str, Path]
] = "data/scatac/pancreas_multiome/pancreas_multiome_2022_processed_rna_velocities_2024_pytest.h5ad"
):
from scanpy import read
# rna
url = 'https://www.dropbox.com/scl/fi/93hw0wru56ljryo6m17d9/pancreas_multiome_2022_processed_rna_velocities_2024_pytest.h5ad?rlkey=x8r14un3gu8ahyipcylwxytns&dl=1'
print(os.path.exists(file_path), file_path)
# print('reading RNA')
adata = read(file_path, backup_url=url, sparse=True, cache=True)
adata.var_names_make_unique()
# print('opening RNA successful')
return adata
def pancreas_atac(
file_path: Optional[
Union[str, Path]
] = "data/scatac/pancreas_multiome/pancreas_multiome_2022_processed_atac.h5ad"
):
from scanpy import read
# atac
url = 'https://www.dropbox.com/scl/fi/53wv4v7tbnsmr12fbmea7/pancreas_multiome_2022_processed_atac.h5ad?rlkey=1kf352wya0pzffkn990wkbwmd&e=1&st=m6gv9hp5&dl=1'
print(os.path.exists(file_path), file_path)
print('reading ATAC')
adata = read(file_path, backup_url=url, sparse=True, cache=True)
print('opening ATAC successful')
adata.var_names_make_unique()
return adata
def pancreas_multiome(data_directory='./data'):
# atac_path = '../../../annotations/scatac/ancreas_multiome_2022_processed_sample_c16918_p50000.h5ad'
all_path = os.path.join(data_directory, 'pancreas_multiome_2022_processed.h5ad')
atac_path = os.path.join(data_directory, 'pancreas_multiome_2022_processed_atac.h5ad')
# rna_path = '../../../annotations/scatac/pancreas_multiome_2022_processed_rna.h5ad'
rna_path = os.path.join(data_directory, 'pancreas_multiome_2022_processed_rna_velocities_2024.h5ad')
rna = pancreas_rna()
atac = pancreas_atac()
return rna, atac