mirror of
https://github.com/dragen1860/TensorFlow-2.x-Tutorials.git
synced 2021-05-12 18:32:23 +03:00
159 lines
5.5 KiB
Python
159 lines
5.5 KiB
Python
import tensorflow as tf
|
|
import numpy as np
|
|
|
|
from utils import positional_encoding
|
|
from attlayer import EncoderLayer,DecoderLayer
|
|
|
|
|
|
|
|
class Encoder(tf.keras.layers.Layer):
    """Embed tokens, add positional encodings, and run a stack of encoder layers.

    Args:
        num_layers: Number of `EncoderLayer` instances to stack.
        d_model: Embedding width; its square root also scales the embeddings.
        num_heads: Forwarded unchanged to every `EncoderLayer`.
        dff: Forwarded unchanged to every `EncoderLayer`.
        input_vocab_size: Number of rows in the embedding table.
        rate: Dropout rate, used here and forwarded to every `EncoderLayer`.
        maximum_position_encoding: First argument passed to
            `positional_encoding`. When left as `None` it falls back to
            `input_vocab_size`, which is the value this class always used
            before the parameter existed.
            NOTE(review): presumably this is the number of positions in the
            table -- confirm against `utils.positional_encoding`.
    """

    def __init__(self, num_layers, d_model, num_heads, dff, input_vocab_size,
                 rate=0.1, maximum_position_encoding=None):
        super().__init__()

        self.d_model = d_model
        self.num_layers = num_layers

        # Keep the historical behaviour unless the caller asks for a
        # differently sized positional table.
        if maximum_position_encoding is None:
            maximum_position_encoding = input_vocab_size

        self.embedding = tf.keras.layers.Embedding(input_vocab_size, d_model)
        self.pos_encoding = positional_encoding(maximum_position_encoding,
                                                self.d_model)

        self.enc_layers = [EncoderLayer(d_model, num_heads, dff, rate)
                           for _ in range(num_layers)]

        self.dropout = tf.keras.layers.Dropout(rate)

    def call(self, x, training, mask):
        """Encode a batch of token sequences.

        Args:
            x: Batch of token sequences; axis 1 is taken as the sequence
                length.
            training: Passed to dropout and to every encoder layer.
            mask: Passed unchanged to every encoder layer.

        Returns:
            Tensor of shape (batch_size, input_seq_len, d_model).
        """
        seq_len = tf.shape(x)[1]

        # Embed, scale by sqrt(d_model), then add the positional encoding
        # truncated to the actual sequence length.
        x = self.embedding(x)  # (batch_size, input_seq_len, d_model)
        x *= tf.math.sqrt(tf.cast(self.d_model, tf.float32))
        x += self.pos_encoding[:, :seq_len, :]

        x = self.dropout(x, training=training)

        for enc_layer in self.enc_layers:
            x = enc_layer(x, training, mask)

        return x  # (batch_size, input_seq_len, d_model)
|
|
|
|
|
|
"""### Decoder
|
|
|
|
The `Decoder` consists of:
|
|
1. Output Embedding
|
|
2. Positional Encoding
|
|
3. N decoder layers
|
|
|
|
The target is put through an embedding which is summed with the positional encoding. The output of this summation is the input to the decoder layers. The output of the decoder is the input to the final linear layer.
|
|
"""
|
|
|
|
|
|
class Decoder(tf.keras.layers.Layer):
    """Embed target tokens, add positional encodings, and run decoder layers.

    Args:
        num_layers: Number of `DecoderLayer` instances to stack.
        d_model: Embedding width; its square root also scales the embeddings.
        num_heads: Forwarded unchanged to every `DecoderLayer`.
        dff: Forwarded unchanged to every `DecoderLayer`.
        target_vocab_size: Number of rows in the embedding table.
        rate: Dropout rate, used here and forwarded to every `DecoderLayer`.
        maximum_position_encoding: First argument passed to
            `positional_encoding`. When left as `None` it falls back to
            `target_vocab_size`, which is the value this class always used
            before the parameter existed.
            NOTE(review): presumably this is the number of positions in the
            table -- confirm against `utils.positional_encoding`.
    """

    def __init__(self, num_layers, d_model, num_heads, dff, target_vocab_size,
                 rate=0.1, maximum_position_encoding=None):
        super().__init__()

        self.d_model = d_model
        self.num_layers = num_layers

        # Keep the historical behaviour unless the caller asks for a
        # differently sized positional table.
        if maximum_position_encoding is None:
            maximum_position_encoding = target_vocab_size

        self.embedding = tf.keras.layers.Embedding(target_vocab_size, d_model)
        self.pos_encoding = positional_encoding(maximum_position_encoding,
                                                self.d_model)

        self.dec_layers = [DecoderLayer(d_model, num_heads, dff, rate)
                           for _ in range(num_layers)]
        self.dropout = tf.keras.layers.Dropout(rate)

    def call(self, x, enc_output, training,
             look_ahead_mask, padding_mask):
        """Decode a batch of target sequences against the encoder output.

        Args:
            x: Batch of target token sequences; axis 1 is taken as the
                sequence length.
            enc_output: Encoder result, passed unchanged to every decoder
                layer.
            training: Passed to dropout and to every decoder layer.
            look_ahead_mask: Passed unchanged to every decoder layer.
            padding_mask: Passed unchanged to every decoder layer.

        Returns:
            A pair `(x, attention_weights)` where `x` has shape
            (batch_size, target_seq_len, d_model) and `attention_weights`
            maps 'decoder_layer{i}_block1' / 'decoder_layer{i}_block2'
            (i starting at 1) to the two extra values each decoder layer
            returns.
        """
        seq_len = tf.shape(x)[1]
        attention_weights = {}

        # Embed, scale by sqrt(d_model), then add the positional encoding
        # truncated to the actual sequence length.
        x = self.embedding(x)  # (batch_size, target_seq_len, d_model)
        x *= tf.math.sqrt(tf.cast(self.d_model, tf.float32))
        x += self.pos_encoding[:, :seq_len, :]

        x = self.dropout(x, training=training)

        # Layer numbering in the dictionary keys is 1-based.
        for layer_no, dec_layer in enumerate(self.dec_layers, start=1):
            x, block1, block2 = dec_layer(x, enc_output, training,
                                          look_ahead_mask, padding_mask)

            attention_weights['decoder_layer{}_block1'.format(layer_no)] = block1
            attention_weights['decoder_layer{}_block2'.format(layer_no)] = block2

        # x.shape == (batch_size, target_seq_len, d_model)
        return x, attention_weights
|
|
|
|
|
|
|
|
"""## Create the Transformer
|
|
|
|
The Transformer consists of the encoder, the decoder, and a final linear layer. The decoder output is fed to the linear layer, and the linear layer's output is returned.
|
|
"""
|
|
|
|
|
|
class Transformer(tf.keras.Model):
    """Encoder, decoder, and a final dense projection to the target vocabulary.

    Args:
        num_layers: Passed to both the encoder and the decoder.
        d_model: Passed to both the encoder and the decoder.
        num_heads: Passed to both the encoder and the decoder.
        dff: Passed to both the encoder and the decoder.
        input_vocab_size: Vocabulary size handed to the encoder.
        target_vocab_size: Vocabulary size handed to the decoder; also the
            number of units of the final dense layer.
        rate: Dropout rate passed to both the encoder and the decoder.
    """

    def __init__(self, num_layers, d_model, num_heads, dff, input_vocab_size,
                 target_vocab_size, rate=0.1):
        super().__init__()

        # Both halves of the model share these four hyper-parameters.
        shared_args = (num_layers, d_model, num_heads, dff)

        self.encoder = Encoder(*shared_args, input_vocab_size, rate)
        self.decoder = Decoder(*shared_args, target_vocab_size, rate)
        self.final_layer = tf.keras.layers.Dense(target_vocab_size)

    def call(self, inp, tar, training, enc_padding_mask,
             look_ahead_mask, dec_padding_mask):
        """Run the full model.

        Args:
            inp: Input passed to the encoder.
            tar: Target passed to the decoder.
            training: Passed to the encoder and the decoder.
            enc_padding_mask: Mask passed to the encoder.
            look_ahead_mask: First mask passed to the decoder.
            dec_padding_mask: Second mask passed to the decoder.

        Returns:
            A pair `(final_output, attention_weights)`; `final_output` has
            shape (batch_size, tar_seq_len, target_vocab_size) and
            `attention_weights` is whatever the decoder returned.
        """
        # (batch_size, inp_seq_len, d_model)
        encoded = self.encoder(inp, training, enc_padding_mask)

        # decoded.shape == (batch_size, tar_seq_len, d_model)
        decoded, attention_weights = self.decoder(
            tar, encoded, training, look_ahead_mask, dec_padding_mask)

        # (batch_size, tar_seq_len, target_vocab_size)
        projected = self.final_layer(decoded)

        return projected, attention_weights
|
|
|
|
|
|
|
|
|
|
|
|
if __name__ == '__main__':
    # Smoke test: build each component with small random inputs and print the
    # resulting shapes. The embedding layers look rows up by index, so the
    # inputs are drawn as integer token ids rather than floats in [0, 1).
    temp_input = tf.random.uniform((64, 62), dtype=tf.int64,
                                   minval=0, maxval=200)
    temp_target = tf.random.uniform((64, 26), dtype=tf.int64,
                                    minval=0, maxval=200)

    sample_encoder = Encoder(num_layers=2, d_model=512, num_heads=8,
                             dff=2048, input_vocab_size=8500)

    sample_encoder_output = sample_encoder(temp_input,
                                           training=False, mask=None)

    print(sample_encoder_output.shape)  # (batch_size, input_seq_len, d_model)

    sample_decoder = Decoder(num_layers=2, d_model=512, num_heads=8,
                             dff=2048, target_vocab_size=8000)

    output, attn = sample_decoder(temp_target,
                                  enc_output=sample_encoder_output,
                                  training=False, look_ahead_mask=None,
                                  padding_mask=None)

    # A bare expression prints nothing outside a notebook, so print explicitly.
    print(output.shape, attn['decoder_layer2_block2'].shape)

    sample_transformer = Transformer(
        num_layers=2, d_model=512, num_heads=8, dff=2048,
        input_vocab_size=8500, target_vocab_size=8000)

    fn_out, _ = sample_transformer(temp_input, temp_target, training=False,
                                   enc_padding_mask=None,
                                   look_ahead_mask=None,
                                   dec_padding_mask=None)

    print(fn_out.shape)  # (batch_size, tar_seq_len, target_vocab_size)
|
|
|