
ECnet Code Walkthrough (Part 3)

from typing import Any, Dict, List, Sequence, Tuple, Union

import numpy as np
import pandas as pd
import torch
from torch.utils.data import Dataset
from tape import TAPETokenizer


class DataFrameDataset(Dataset):
    def __init__(self, seq_df):
        # Cache one record per protein: ID, raw sequence, and sequence length.
        self._cache = []
        for seq_id, seq in zip(
            seq_df['ID'].values,
            seq_df['sequence'].values
        ):
            self._cache.append({
                'id': seq_id,
                'primary': seq,
                'protein_length': len(seq)
            })
        self._num_examples = len(self._cache)

    def __len__(self) -> int:
        return self._num_examples

    def __getitem__(self, index: int):
        if not 0 <= index < self._num_examples:
            raise IndexError(index)

        return self._cache[index]


def _pad_sequences(sequences: Sequence[np.ndarray], constant_value=0) -> np.ndarray:
    batch_size = len(sequences)
    shape = [batch_size] + np.max([seq.shape for seq in sequences], 0).tolist()
    array = np.zeros(shape, sequences[0].dtype) + constant_value

    for arr, seq in zip(array, sequences):
        arrslice = tuple(slice(dim) for dim in seq.shape)
        arr[arrslice] = seq

    return array


class EmbedDataset(Dataset):
    def __init__(self,
                 data,
                 tokenizer: Union[str, TAPETokenizer] = 'iupac',
                 ):
        super().__init__()

        if isinstance(tokenizer, str):
            tokenizer = TAPETokenizer(vocab=tokenizer)
        self.tokenizer = tokenizer
        if isinstance(data, pd.DataFrame):
            self.data = DataFrameDataset(data)

    def __len__(self) -> int:
        return len(self.data)

    def __getitem__(self, index: int):
        item = self.data[index]
        token_ids = self.tokenizer.encode(item['primary'])
        input_mask = np.ones_like(token_ids)
        return item['id'], token_ids, input_mask

    def collate_fn(self, batch: List[Tuple[Any, ...]]) -> Dict[str, torch.Tensor]:
        # Pad token IDs and masks to the longest sequence in the batch.
        ids, tokens, input_mask = zip(*batch)
        ids = list(ids)
        tokens = torch.from_numpy(_pad_sequences(tokens))
        input_mask = torch.from_numpy(_pad_sequences(input_mask))
        return {'ids': ids, 'input_ids': tokens, 'input_mask': input_mask}  # type: ignore


Training Data Preprocessing

  1. DataFrameDataset

    • Purpose: convert a data frame (typically containing protein IDs and the corresponding protein sequences) into a dataset format that is easy to access and iterate over.

    • Methods

      • __init__: initializes the dataset and stores the records in a cache.
      • __len__: returns the number of items in the dataset.
      • __getitem__: returns one item of the dataset by index.
  2. The _pad_sequences function

    • Purpose: given a set of sequences of unequal length, pad them with a constant value so that all sequences end up with the same length.

    • Parameters

      • sequences: the list of sequences to pad.
      • constant_value: the constant value used for padding.
    • Returns: a padded NumPy array in which every sequence has the same length.

  3. EmbedDataset

    • Purpose: tokenize the protein sequences and prepare the input format the model expects for training and evaluation.

    • Methods

      • __init__: initializes the dataset and sets up the tokenizer (which converts protein sequences into token IDs).
      • __len__: returns the number of items in the dataset.
      • __getitem__: returns one item of the dataset by index, with the protein sequence converted to token IDs plus an input mask.
      • collate_fn: a collate function for the data loader that combines a batch of examples into single padded input tensors, which handles protein sequences of different lengths (see the usage sketch below).
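
A minimal usage sketch of the two dataset classes, assuming the code above (and the `tape` package) is importable; the IDs and sequences are made up for illustration:

import pandas as pd
from torch.utils.data import DataLoader

# Hypothetical toy input: two proteins of different lengths.
seq_df = pd.DataFrame({
    'ID': ['prot1', 'prot2'],
    'sequence': ['MKTAYIAKQR', 'MKLVFF'],
})

dataset = EmbedDataset(seq_df)                      # tokenizes with the 'iupac' vocab
loader = DataLoader(dataset, batch_size=2,
                    collate_fn=dataset.collate_fn)  # pads to the longest sequence

batch = next(iter(loader))
print(batch['ids'])                # ['prot1', 'prot2']
print(batch['input_ids'].shape)    # (2, 12): 10 residues plus <cls> and <sep>
print(batch['input_mask'].shape)   # same shape; 1 = real token, 0 = padding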

TAPE Encoding

class TAPEEncoder(object):
    def __init__(self,
                 batch_size: int = 128,
                 model_config_file: Optional[str] = None,
                 full_sequence_embed: bool = True,
                 no_cuda: bool = False,
                 seed: int = 42,
                 tokenizer: str = 'iupac',
                 num_workers: int = 4,
                 log_level: Union[str, int] = logging.INFO,
                 progress_bar: bool = True
                 ):
        """
        Parameters
        ----------
        seq_df: pd.DataFrame object. Two columns are required: `ID` and `sequence`.
        """
        local_rank = -1  # TAPE does not support torch.distributed.launch for embedding
        device, n_gpu, is_master = utils.setup_distributed(local_rank, no_cuda)
        utils.setup_logging(local_rank, save_path=None, log_level=log_level)
        utils.set_random_seeds(seed, n_gpu)

        model = ProteinBertModel.from_pretrained('bert-base')
        model = model.to(device)
        runner = ForwardRunner(model, device, n_gpu)
        runner.initialize_distributed_model()
        runner.eval()
        self.local_rank = local_rank
        self.n_gpu = n_gpu
        self.batch_size = batch_size
        self.num_workers = num_workers
        self.runner = runner
        self.full_sequence_embed = full_sequence_embed
        self.progress_bar = progress_bar

    def encode(self, sequences: List[str]) -> np.ndarray:
        """
        Parameters
        ----------
        sequences: list of equal-length sequences

        Returns
        -------
        np.array with shape (#sequences, length of sequences, embedding dim)
        """
        seq_df = pd.DataFrame({'ID': sequences, 'sequence': sequences})
        embed_dict = self.tape_embed(seq_df)
        encoding = np.array([embed_dict[s].numpy() for s in sequences])
        return encoding

    def tape_embed(self, seq_df: pd.DataFrame) -> Dict[str, torch.Tensor]:
        """
        Parameters
        ----------
        seq_df: pd.DataFrame with at least two columns `ID` and `sequence`

        Returns
        -------
        dict with ID as keys and embeddings as values. Each embedding is a
        pytorch tensor with shape (length of sequence, embedding dim).
        """
        local_rank = self.local_rank
        n_gpu = self.n_gpu
        batch_size = self.batch_size
        num_workers = self.num_workers
        runner = self.runner
        full_sequence_embed = self.full_sequence_embed
        progress_bar = self.progress_bar

        dataset = EmbedDataset(seq_df)
        valid_loader = utils.setup_loader(dataset, batch_size, local_rank, n_gpu, 1, num_workers)

        embed_dict = {}
        with torch.no_grad():
            with utils.wrap_cuda_oom_error(local_rank, batch_size, n_gpu):
                for batch in (tqdm(valid_loader, total=len(valid_loader),
                                   desc='encode', leave=False) if progress_bar else valid_loader):
                    outputs = runner.forward(batch, no_loss=True)
                    ids = batch['ids']
                    sequence_embed = outputs[0]
                    pooled_embed = outputs[1]
                    sequence_lengths = batch['input_mask'].sum(1)
                    sequence_embed = sequence_embed.cpu()
                    sequence_lengths = sequence_lengths.cpu()

                    for seqembed, length, protein_id in zip(
                            sequence_embed, sequence_lengths, ids):
                        seqembed = seqembed[:length]
                        seqembed = seqembed[1:-1]  # remove <cls> <sep>
                        if not full_sequence_embed:
                            seqembed = seqembed.mean(0)
                        embed_dict[protein_id] = seqembed
        return embed_dict

1. Initialization method (__init__):

  • Parameters

    • batch_size: batch size.
    • model_config_file: model configuration file (if any).
    • full_sequence_embed: whether to return per-residue embeddings for the full sequence (otherwise the mean embedding).
    • no_cuda: whether to disable CUDA.
    • seed: random seed.
    • tokenizer: tokenizer type.
    • num_workers: number of worker processes for data loading.
    • log_level: logging level.
    • progress_bar: whether to display a progress bar.
  • Behavior

    • Sets up the compute environment (single process, local_rank = -1) and the random seeds.
    • Loads ProteinBertModel from the pretrained `bert-base` weights.
    • Initializes a ForwardRunner for model inference.
    • Stores the relevant parameters and objects.

2. Encoding method (encode):

  • Parameters: sequences, a list of equal-length sequences to encode.
  • Returns: a NumPy array of embeddings with shape (#sequences, sequence length, embedding dim).
  • Behavior: converts the given protein sequences into embedding vectors (see the sketch below).

3. TAPE embedding method (tape_embed):

  • Parameters: seq_df, a pandas DataFrame with at least the two columns `ID` and `sequence`.

  • Returns: a dict with IDs as keys and embeddings as values; each embedding is a PyTorch tensor of shape (sequence length, embedding dim), or (embedding dim,) when full_sequence_embed is False.

  • Behavior

    • Builds an EmbedDataset and sets up the data loader.
    • Runs a forward pass through TAPE's ProteinBertModel to generate embeddings.
    • Depending on full_sequence_embed, returns either the per-residue embeddings of the whole sequence or their mean over the sequence.
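
A minimal usage sketch, assuming the class above is available and the TAPE `bert-base` weights can be downloaded on first use; the equal-length sequences are hypothetical:

# Equal-length sequences, as required by TAPEEncoder.encode (e.g. point mutants).
sequences = ['MKTAYIAKQR', 'MKTAYIAKQW']

encoder = TAPEEncoder(batch_size=32, full_sequence_embed=True, no_cuda=True)
embeddings = encoder.encode(sequences)

# One 768-dim vector per residue; the <cls>/<sep> positions have been stripped.
print(embeddings.shape)   # (2, 10, 768) for TAPE's bert-base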

CCMPred Encoding (Local Features)

@numba.njit
def all_sequence_pairwise_profile(args):
    '''
    Parameters
    ----------
    e: 4-d (L, L, 21, 21) np.array where e(i, j, a, b) is the epsilon value in CCMPred for
        `a` at the i-th position and `b` at the j-th position.
        Amino acids are encoded by 0-20 using the same index as CCMPred.
    index_encoded: (N, L) array of index-encoded sequences using the CCMPred index

    Returns
    -------
    encoding: (N, L, L) array of sequence encoding. Each amino acid in a sequence
        is encoded by values in the pairwise epsilon value table of CCMPred.
    '''
    e, index_encoded = args
    N, L = index_encoded.shape
    encoding = np.zeros((N, L, L))
    for k in range(N):
        for i in range(L - 1):
            a = index_encoded[k, i]
            for j in range(i + 1, L):
                b = index_encoded[k, j]
                encoding[k, i, j] = e[i, j, a, b]
        encoding[k] += encoding[k].T
    return encoding


@numba.njit
def all_sequence_singleton_profile(args):
    '''
    Parameters
    ----------
    ei: 2-d (L, 20) np.array where e(i, a) is the epsilon value in CCMPred for
        `a` at the i-th position.
        Amino acids are encoded by 0-20 using the same index as CCMPred.
    index_encoded: (N, L) array of index-encoded sequences using the CCMPred index

    Returns
    -------
    encoding: (N, L, 1) array of sequence encoding. Each amino acid in a sequence
        is encoded by values in the singleton epsilon value table of CCMPred.
    '''
    e, index_encoded = args
    N, L = index_encoded.shape
    encoding = np.zeros((N, L, 1))
    for k in range(N):
        for i in range(L):
            a = index_encoded[k, i]
            encoding[k, i] = e[i, a]
    return encoding


class CCMPredEncoder(object):
    def __init__(self, ccmpred_output=None, seq_len=None):
        '''
        ccmpred_output: path to the msgpack file storing the (L, L, 21, 21) table of
            CCMPred epsilon values for a target sequence
        '''
        self.seq_len = seq_len
        self.vocab_index = vocab.CCMPRED_AMINO_ACID_INDEX
        brawfile = pathlib.Path(ccmpred_output)
        self.eij, self.ei = self.load_data(brawfile)

    def load_data(self, brawfile):
        '''
        Returns
        -------
        eij: 4-d (L, L, 21, 21) np.array where e(i, j, a, b) is the epsilon value in CCMPred for
            `a` at the i-th position and `b` at the j-th position.
        ei: 2-d (L, 20) np.array where e(i, a) is the epsilon value in CCMPred for
            `a` at the i-th position.
        Amino acids are encoded by 0-20 using the same index as CCMPred.
        '''
        if not brawfile.exists():
            raise FileNotFoundError(brawfile)
        data = msgpack.unpack(open(brawfile, 'rb'))
        L = self.seq_len
        V = len(self.vocab_index)
        eij = np.zeros((L, L, V, V))
        for i in range(L - 1):
            for j in range(i + 1, L):
                arr = np.array(data[b'x_pair'][b'%d/%d' % (i, j)][b'x']).reshape(V, V)
                eij[i, j] = arr
                eij[j, i] = arr.T

        ei = np.array(data[b'x_single']).reshape(L, V - 1)

        return eij, ei

    def index_encoding(self, sequences, letter_to_index_dict):
        '''
        Modified from https://github.com/openvax/mhcflurry/blob/master/mhcflurry/amino_acid.py#L110-L130

        Parameters
        ----------
        sequences: list of equal-length sequences
        letter_to_index_dict: char -> int

        Returns
        -------
        np.array with shape (#sequences, length of sequences)
        '''
        df = pd.DataFrame(iter(s) for s in sequences)
        encoding = df.replace(letter_to_index_dict)
        encoding = encoding.values.astype(int)
        return encoding

    def ccmpred_encoding(self, index_encoded, profile='pair'):
        if profile == 'pair':
            encoding = all_sequence_pairwise_profile((self.eij, index_encoded))
        elif profile == 'single':
            encoding = all_sequence_singleton_profile((self.ei, index_encoded))
        else:
            raise NotImplementedError
        return encoding

    def encode(self, sequences, mode='train'):
        """
        Returns
        -------
        encoding: (N, L, L + 1) array of sequence encoding. Each amino acid in a sequence
            is encoded by values in the singleton and pairwise epsilon value tables of CCMPred.
        """
        index_encoded = self.index_encoding(sequences, self.vocab_index)
        single = self.ccmpred_encoding(index_encoded, profile='single')
        pair = self.ccmpred_encoding(index_encoded, profile='pair')
        self.ccmpred_encoded = np.concatenate([single, pair], axis=2)
        return self.ccmpred_encoded

1. Function all_sequence_pairwise_profile

  • Parameters: a tuple containing the CCMPred pairwise ε-value table and the index-encoded sequences.
  • Returns: an encoding array of shape (N, L, L).
  • Behavior: computes the pairwise (coupling) profile for all sequences.

2. Function all_sequence_singleton_profile

  • Parameters: a tuple containing the CCMPred singleton ε-value table and the index-encoded sequences.
  • Returns: an encoding array of shape (N, L, 1).
  • Behavior: computes the singleton (single-site) profile for all sequences.

3. CCMPredEncoder

  • Method __init__
    • Initializes the class, loads the CCMPred output file, and sets the vocabulary index.
  • Method load_data
    • Loads the CCMPred data from the given file and returns two arrays holding the pairwise and singleton ε values, respectively.
  • Method index_encoding
    • Converts a list of sequences into integer index encodings.
  • Method ccmpred_encoding
    • Computes the sequence encoding from either the pairwise or the singleton CCMPred values.
  • Method encode
    • Returns the CCMPred encoding of the sequences by concatenating the singleton and pairwise features (see the sketch below).

The core goal of this part of the code is to generate feature encodings for protein sequences from the coevolution information produced by CCMPred. Coevolution information captures interactions and dependencies between protein residues and can be useful for many downstream analysis and prediction tasks (e.g., 3D structure prediction). The numerical loops are accelerated with Numba, a just-in-time compiler for Python.
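
A minimal usage sketch, assuming a CCMPred coupling file (msgpack format) for a hypothetical length-10 target protein is available at the path shown; both the file path and the sequences are placeholders for illustration:

# Hypothetical path to the CCMPred output and hypothetical mutant sequences.
encoder = CCMPredEncoder(ccmpred_output='target_protein.braw', seq_len=10)

sequences = ['MKTAYIAKQR', 'MKTAYIAKQW']   # same length as the target (L = 10)
features = encoder.encode(sequences)

# Column 0 holds the singleton values e_i(a); columns 1..L hold the pairwise values e_ij(a, b).
print(features.shape)   # (2, 10, 11) == (N, L, L + 1)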

Model Construction

import collections
import math

import torch
import torch.nn as nn


class PositionalEmbedding(nn.Module):
    '''
    Modified from the Annotated Transformer
    http://nlp.seas.harvard.edu/2018/04/03/attention.html
    '''
    def __init__(self, d_model, max_len=1024):
        super(PositionalEmbedding, self).__init__()
        pe = torch.zeros((max_len, d_model), requires_grad=False).float()
        position = torch.arange(0, max_len).float().unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * -(math.log(10000.0) / d_model))
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)
        pe = pe.unsqueeze(0)
        self.register_buffer('pe', pe)

    def forward(self, x):
        return self.pe[:, :x.size(1)]


class InputPositionEmbedding(nn.Module):
    def __init__(self, vocab_size=None, embed_dim=None, dropout=0.1,
                 init_weight=None, seq_len=None):
        super(InputPositionEmbedding, self).__init__()
        self.embed = nn.Embedding(vocab_size, embed_dim)
        self.dropout = nn.Dropout(dropout)
        self.position_embed = PositionalEmbedding(embed_dim, max_len=seq_len)
        self.reproject = nn.Identity()
        if init_weight is not None:
            self.embed = nn.Embedding.from_pretrained(init_weight)
            self.reproject = nn.Linear(init_weight.size(1), embed_dim)

    def forward(self, inputs):
        x = self.embed(inputs)
        x = x + self.position_embed(inputs)
        x = self.reproject(x)
        x = self.dropout(x)
        return x


class AggregateLayer(nn.Module):
    def __init__(self, d_model=None, dropout=0.1):
        super(AggregateLayer, self).__init__()
        self.attn = nn.Sequential(collections.OrderedDict([
            ('layernorm', nn.LayerNorm(d_model)),
            ('fc', nn.Linear(d_model, 1, bias=False)),
            ('dropout', nn.Dropout(dropout)),
            ('softmax', nn.Softmax(dim=1))
        ]))

    def forward(self, context):
        '''
        Parameters
        ----------
        context: token embeddings from the encoder (Transformer/LSTM),
            shape (batch_size, seq_len, embed_dim)
        '''
        weight = self.attn(context)
        # (batch_size, embed_dim, seq_len) x (batch_size, seq_len, 1)
        # -> (batch_size, embed_dim, 1)
        output = torch.bmm(context.transpose(1, 2), weight)
        output = output.squeeze(2)
        return output


class GlobalPredictor(nn.Module):
    def __init__(self, d_model=None, d_h=None, d_out=None, dropout=0.5):
        super(GlobalPredictor, self).__init__()
        self.predict_layer = nn.Sequential(collections.OrderedDict([
            ('batchnorm', nn.BatchNorm1d(d_model)),
            ('fc1', nn.Linear(d_model, d_h)),
            ('tanh', nn.Tanh()),
            ('dropout', nn.Dropout(dropout)),
            ('fc2', nn.Linear(d_h, d_out))
        ]))

    def forward(self, x):
        x = self.predict_layer(x)
        return x


class SequenceLSTM(nn.Module):
    """Container module with an encoder, a recurrent module, and a decoder."""

    def __init__(self, d_input=None, d_embed=20, d_model=128,
                 vocab_size=None, seq_len=None,
                 dropout=0.1, lstm_dropout=0,
                 nlayers=1, bidirectional=False,
                 proj_loc_config=None):
        super(SequenceLSTM, self).__init__()

        self.embed = InputPositionEmbedding(vocab_size=vocab_size,
                                            seq_len=seq_len, embed_dim=d_embed)

        self.lstm = nn.LSTM(input_size=d_input,
                            hidden_size=d_model // 2 if bidirectional else d_model,
                            num_layers=nlayers, dropout=lstm_dropout,
                            bidirectional=bidirectional)
        self.drop = nn.Dropout(dropout)
        self.proj_loc_layer = proj_loc_config['layer'](
            proj_loc_config['d_in'], proj_loc_config['d_out']
        )

    def forward(self, x, loc_feat=None):
        x = self.embed(x)
        if loc_feat is not None:
            loc_feat = self.proj_loc_layer(loc_feat)
            x = torch.cat([x, loc_feat], dim=2)
        x = x.transpose(0, 1).contiguous()
        x, _ = self.lstm(x)
        x = x.transpose(0, 1).contiguous()
        x = self.drop(x)
        return x


class LSTMPredictor(nn.Module):
    def __init__(self, d_embed=20, d_model=128, d_h=128, d_out=1,
                 vocab_size=None, seq_len=None,
                 dropout=0.1, lstm_dropout=0, nlayers=1, bidirectional=False,
                 use_loc_feat=True, use_glob_feat=True,
                 proj_loc_config=None, proj_glob_config=None):
        super(LSTMPredictor, self).__init__()
        self.seq_lstm = SequenceLSTM(
            d_input=d_embed + (proj_loc_config['d_out'] if use_loc_feat else 0),
            d_embed=d_embed, d_model=d_model,
            vocab_size=vocab_size, seq_len=seq_len,
            dropout=dropout, lstm_dropout=lstm_dropout,
            nlayers=nlayers, bidirectional=bidirectional,
            proj_loc_config=proj_loc_config)
        self.proj_glob_layer = proj_glob_config['layer'](
            proj_glob_config['d_in'], proj_glob_config['d_out']
        )
        self.aggragator = AggregateLayer(
            d_model=d_model + (proj_glob_config['d_out'] if use_glob_feat else 0))
        self.predictor = GlobalPredictor(
            d_model=d_model + (proj_glob_config['d_out'] if use_glob_feat else 0),
            d_h=d_h, d_out=d_out)

    def forward(self, x, glob_feat=None, loc_feat=None):
        x = self.seq_lstm(x, loc_feat=loc_feat)
        if glob_feat is not None:
            glob_feat = self.proj_glob_layer(glob_feat)
            x = torch.cat([x, glob_feat], dim=2)
        x = self.aggragator(x)
        output = self.predictor(x)
        return output

1. PositionalEmbedding

  • Purpose: builds a positional embedding matrix that adds a unique embedding to each position of the sequence.

  • Attributes

    • pe: the tensor storing the positional embeddings.
  • Principle: uses sine and cosine functions to generate the positional embeddings, so that every position gets a unique embedding.

2. InputPositionEmbedding

  • Purpose: maps the input vocabulary indices to continuous embedding vectors and adds the positional embeddings.

  • Attributes and components

    • embed: an embedding layer that maps vocabulary indices to continuous vectors.
    • position_embed: a PositionalEmbedding layer that adds position information.
    • reproject: a linear layer that re-projects the embedding space (used when pretrained weights are supplied).
    • dropout: a Dropout layer for regularization.

3. AggregateLayer

  • Purpose: aggregates the encoded sequence representation into a single vector.

  • Components

    • attn: a stack of layer normalization, a fully connected layer, Dropout, and Softmax that computes a weight for each position; the sequence is then aggregated by a weighted average over positions (attention pooling).

4. GlobalPredictor

  • Purpose: produces the final prediction from the aggregated representation.

  • Components

    • predict_layer: a stack of batch normalization, a fully connected layer, a Tanh activation, Dropout, and a final fully connected layer that outputs the prediction.

5. SequenceLSTM

  • Purpose: processes the input sequence with an LSTM and produces contextual sequence representations.

  • Attributes and components

    • embed: an InputPositionEmbedding layer for token and position embeddings.
    • lstm: the LSTM layer that encodes the sequence.
    • drop: a Dropout layer for regularization.
    • proj_loc_layer: an optional fully connected layer that projects the local features before they are concatenated to the token embeddings.

6. LSTMPredictor

  • Purpose: the container for the full LSTM prediction model (see the instantiation sketch below).

  • Attributes and components

    • seq_lstm: a SequenceLSTM layer for sequence encoding.
    • proj_glob_layer: a fully connected layer that projects the global features.
    • aggragator: an AggregateLayer that aggregates the encoded sequence representation.
    • predictor: a GlobalPredictor layer that produces the final prediction.
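
A minimal instantiation sketch showing how the pieces fit together; all dimensions and the projection configs below are hypothetical and only illustrate where the local (CCMPred) and global (TAPE) features enter the model:

import torch
import torch.nn as nn

seq_len, vocab_size = 10, 30           # hypothetical sizes
loc_dim, glob_dim = seq_len + 1, 768   # CCMPred (L + 1) and TAPE (768) feature dims

model = LSTMPredictor(
    d_embed=20, d_model=128, d_h=128, d_out=1,
    vocab_size=vocab_size, seq_len=seq_len, bidirectional=True,
    proj_loc_config={'layer': nn.Linear, 'd_in': loc_dim, 'd_out': 32},
    proj_glob_config={'layer': nn.Linear, 'd_in': glob_dim, 'd_out': 32},
)

x = torch.randint(0, vocab_size, (4, seq_len))   # index-encoded sequences
loc_feat = torch.randn(4, seq_len, loc_dim)      # e.g. CCMPredEncoder.encode output
glob_feat = torch.randn(4, seq_len, glob_dim)    # e.g. TAPEEncoder.encode output

out = model(x, glob_feat=glob_feat, loc_feat=loc_feat)
print(out.shape)   # torch.Size([4, 1]) -- one predicted value per sequence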

