基于Keras的扩展性使用

基于Keras的扩展性使用

Keras是一个用于在python上搭神经网络模型的框架,语法和torch比较相似。我个人认为Keras最大的特点是包装很好,一些在训练过程中要输出的方法和常用的优化函数、目标函数都已经内置了,非常适合用来写大作业。Keras和python的哲学有些相似,那就是尽量不自己造轮子。

但是最近逛知乎,看到有答案说,Keras只能用来搭一些世面上已经普及的网络,和其它框架相比比较小白。换句话说,就是Keras的扩展性不好。作为一个试用过theano、tensorflow、torch、caffe等框架,最后定居在Keras的人,我对此不太同意。事实上,Keras拥有不错的扩展性,这一方面是因为设计时就留好的接口,另一方面是因为清晰的代码结构,让你可以有很多自定义的空间。所以下面用几个例子介绍在Keras中如何自定义层和各种方法。

0、backend

如果想在Keras中自定义各种层和函数,一定会用到的就是backend。一般导入的方法是

from keras import backend as K

这是因为Keras可以有两种后台,即theano和tensorflow,所以一些操作张量的函数可能是随后台的不同而不同的,

通过引入这个backend,就可以让Keras来处理兼容性。

比如求x的平均,就是K.mean(x)。backend文件本身在keras/backend文件夹下,可以通过阅读代码来了解backend都支持哪些操作。backend里面函数很多,一般都够用了。

1、Lambda 层

如果你只是想对流经该层的数据做个变换,而这个变换本身没有什么需要学习的参数,那么直接用Lambda Layer是最合适的了。

导入的方法是

from keras.layers.core import Lambda

Lambda函数接受两个参数,第一个是输入张量对输出张量的映射函数,第二个是输入的shape对输出的shape的映射函数。比如想构建这样一个层,流经该层的数据会被减去平均值,那么可以这样定义:

def sub_mean(x):
    x -= K.mean(x,axis=1,keepdims=True)
    return x
model.add( Lambda(sub_mean,output_shape=lambda input_shape:input_shape ))

因为输出的shape和输入的shape是一样的,第二个参数就直接用了恒等映射。

把模型完整地建立出来:

def get_submean_model():
    model = Sequential()
    model.add(Dense(5,input_dim=7))
    def sub_mean(x):
        x -= K.mean(x,axis=1,keepdims=True)
        return x
    model.add( Lambda(sub_mean,output_shape=lambda input_shape:input_shape))
    model.compile(optimizer="rmsprop",loss="mse")
    return model
model = get_submean_model()
res=model.predict(np.random.random((3,7)))

得到地res的平均值是[ 5.96046448e-08 -5.96046448e-08 0.00000000e+00],可见确实实现了减去均值的作用。

2、自定义非递归层

如果自己想定义的层中有需要学习的变量,那么就不能用lambda层了,需要自己写一个出来。

比如说我想定义一个层,它的效果是对张量乘一个正对角阵(换句话说,输入向量与一个要学习的向量逐元素相乘),那么可以这样写:

首先要导入基类

from keras.engine.topology import Layer

然后对MyLaber定义如下:

class MyLayer(Layer):
    def __init__(self,output_dim,**kw):
        self.output_dim = output_dim
        super(MyLayer,self).__init__(**kw)
    def build(self,input_shape):
        input_dim = input_shape[1]
        assert(input_dim == self.output_dim)
        inital_SCALER = np.ones((input_dim,))*1000
        self.SCALER = K.variable(inital_SCALER)
        self.trainable_weights = [self.SCALER]
        super(MyLayer,self).build(input_shape)
    def call(self,x,mask=None):
        #return x - K.mean(x,axis=1,keepdims=True)
        x *= self.SCALER
        return x
    def get_output_shape_for(self,input_shape):
        return input_shape

主要参照Keras内置的层的写法,比如Dense在keras/layers/core.py中,要把能学习的参数放在self.trainable_weights中。这里把初始值设成了1000是为了让该层的效果更显著。然后把模型写全来测试一下

def get_mylayer_model():
    model = Sequential()
    model.add(Dense(5,input_dim=7))
    model.add(MyLayer(5))
    model.compile(optimizer="rmsprop",loss="mse")
    return model
model = get_mylayer_model()
res=model.predict(np.random.random((3,7)))
print res

res如下:

[[ 271.2746582 -1053.31506348 147.17185974 -1120.33740234 609.54876709]

[ -263.69671631 -390.41921997 291.17721558 -594.58721924 615.97369385]

[ -46.58752823 -733.11328125 -21.9815979 -570.79351807 649.44158936]]

都是很大的数,而不加MyLayer时每个值一般也不超过+-2,这个层确实起了作用。

在fit之前调用model.get_weights(),看到该层的权重都是1000,随便随机出来个测试集,fit几千个epoch只后,loss变得很小,MyLayer的权重变成了997左右,而前面一层Dense的权重都成10^-4量级,说明MyLayer中的参数也确实是可学习的。

3、自定义损失函数

Keras内置的损失函数都在keras/objectives.py中,比如mse的定义是:

def mean_squared_error(y_true, y_pred):
    return K.mean(K.square(y_pred - y_true), axis=-1)

按照相同的格式,可以定义自己的损失函数。比如我们想要差值的4次方的平均作为损失函数:

def my_object(y_true,y_pred):
    return K.mean(K.square(K.square(y_pred-y_true)),axis=-1)

把模型写全:

def get_myobj_model():
    model = Sequential()
    model.add(Dense(5,input_dim=7))
    model.add(Dense(3))
    def my_object(y_true,y_pred):
        return K.mean(K.square(K.square(y_pred-y_true)),axis=-1)
    model.compile(optimizer="sgd",loss=my_object)
    return model
model = get_myobj_model()

能自定义损失函数是非常重要一环,它极大的扩展了网络的应用。例如希望用cnn训练出来一个前后景分割的滤波器,它的输出的像素在对应前景的位置是1,在对应后景的位置是0。不但希望网络输出的值的mse小,而且希望0和1分别都连在一起,不要出来雪花状的输出。那么自定义损失函数就能做到了,实际是把两个损失函数放到了一个损失函数中。

另外一些很有用的损失函数如warp-ctc,就可以在这里集成进模型。

4、自定义递归层

递归层的定义方法和非递归层不太一样。根据Keras内LSTM的写法,它还有一个reset_states函数和step函数,这是由递归的性质决定的。例子都在keras/layers/recurrent.py中。

之前看学长用lasagne写的LSTM的变体,看得我想哭,还不如在Keras中把LSTM得代码复制过来修修改改。不过LSTM也不能直接复制过来,还需要import几个依赖:

rom keras.layers.recurrent import LSTM,Recurrent,time_distributed_dense
from keras import initializations,regularizers,activations
from keras.engine import InputSpec

5、自定义优化函数

Keras的代码确实好,耦合度很低。Keras内置的优化函数在keras/optimizers.py中,基类Optimizer也在这个文件里。例如把它内置的SGD算法拷贝到自己的文件中,只要先from keras.optimizers import Optimizer就能编译通过。

有时候要得到state-of-the-art的结果,需要用sgd加动量法充分收敛。比如学习率0.01学习上100epoch,再把学习率减半,再学100epoch,依次类推。如果不自定义优化函数的话,就要分阶段调用fit函数,修改学习率,可能还要重新compile。这就不是很优美了。其它一些奇葩的学习策略,也可以通过自定义优化函数来得到。

6、后记

Keras确实非常强大,不但能用来写大作业,做一些研究也够用了。Yeah

补充:keras的扩展性:自定义keras

1. 自定义keras

keras是一种深度学习的API,能够快速实现你的实验。keras也集成了很多预训练的模型,可以实现很多常规的任务,如图像分类。TensorFlow 2.0之后tensorflow本身也变的很keras化。

另一方面,keras表现出高度的模块化和封装性,所以有的人会觉得keras不易于扩展, 比如实现一种新的Loss,新的网络层结构;其实可以通过keras的基础模块进行快速的扩展,实现更新的算法。

本文就keras的扩展性,总结了对layer,model和loss的自定义。

2. 自定义keras layers

layers是keras中重要的组成部分,网络结构中每一个组成都要以layers来表现。keras提供了很多常规的layer,如Convolution layers,pooling layers, activation layers, dense layers等, 我们可以通过继承基础layers来扩展自定义的layers。

2.1 base layer

layer实了输入tensor和输出tensor的操作类,以下为base layer的5个方法,自定义layer只要重写这些方法就可以了。

init(): 定义自定义layer的一些属性

build(self, input_shape):定义layer需要的权重weights

call(self, *args, **kwargs):layer具体的操作,会在调用自定义layer自动执行

get_config(self):layer初始化的配置,是一个字典dictionary。

compute_output_shape(self,input_shape):计算输出tensor的shape

2.2 例子

# 标准化层
class InstanceNormalize(Layer):
    def __init__(self, **kwargs):
        super(InstanceNormalize, self).__init__(**kwargs)
        self.epsilon = 1e-3
            
    def call(self, x, mask=None):
        mean, var = tf.nn.moments(x, [1, 2], keep_dims=True)
        return tf.div(tf.subtract(x, mean), tf.sqrt(tf.add(var, self.epsilon)))
                                                 
    def compute_output_shape(self,input_shape):
        return input_shape
# 调用
inputs = keras.Input(shape=(None, None, 3))
x = InstanceNormalize()(inputs)
# 可以通过add_weight() 创建权重
class SimpleDense(Layer):
  def __init__(self, units=32):
      super(SimpleDense, self).__init__()
      self.units = units
  def build(self, input_shape):
      self.w = self.add_weight(shape=(input_shape[-1], self.units),
                               initializer="random_normal",
                               trainable=True)
      self.b = self.add_weight(shape=(self.units,),
                               initializer="random_normal",
                               trainable=True)
  def call(self, inputs):
      return tf.matmul(inputs, self.w) + self.b
# 调用
inputs = keras.Input(shape=(None, None, 3))
x = SimpleDense(units=64)(inputs)

3. 自定义keras model

我们在定义完网络结构时,会把整个工作流放在 keras.Model, 进行 compile(), 然后通过 fit() 进行训练过程。执行 fit() 的时候,执行每个 batch size data 的时候,都会调用 Model 中train_step(self, data)

from keras.models import Sequential
from keras.layers import Dense, Activation
model = Sequential()
model.add(Dense(units=64, input_dim=100))
model.add(Activation("relu"))
model.add(Dense(units=10))
model.add(Activation("softmax"))
model.compile(loss="categorical_crossentropy", optimizer="sgd", metrics=["accuracy"])
model.fit(x_train, y_train, epochs=5, batch_size=32)

当你需要自己控制训练过程的时候,可以重写Model的train_step(self, data)方法

class CustomModel(keras.Model):
    def train_step(self, data):
        # Unpack the data. Its structure depends on your model and
        # on what you pass to `fit()`.
        x, y = data
        with tf.GradientTape() as tape:
            y_pred = self(x, training=True)  # Forward pass
            # Compute the loss value
            # (the loss function is configured in `compile()`)
            loss = self.compiled_loss(y, y_pred, regularization_losses=self.losses)
        # Compute gradients
        trainable_vars = self.trainable_variables
        gradients = tape.gradient(loss, trainable_vars)
        # Update weights
        self.optimizer.apply_gradients(zip(gradients, trainable_vars))
        # Update metrics (includes the metric that tracks the loss)
        self.compiled_metrics.update_state(y, y_pred)
        # Return a dict mapping metric names to current value
        return {m.name: m.result() for m in self.metrics}
import numpy as np
# Construct and compile an instance of CustomModel
inputs = keras.Input(shape=(32,))
outputs = keras.layers.Dense(1)(inputs)
model = CustomModel(inputs, outputs)
model.compile(optimizer="adam", loss="mse", metrics=["mae"])
# Just use `fit` as usual
x = np.random.random((1000, 32))
y = np.random.random((1000, 1))
model.fit(x, y, epochs=3)

4. 自定义keras loss

keras实现了交叉熵等常见的loss,自定义loss对于使用keras来说是比较常见,实现各种魔改loss,如focal loss。

我们来看看keras源码中对loss实现

def categorical_crossentropy(y_true, y_pred):
    return K.categorical_crossentropy(y_true, y_pred)
def mean_squared_error(y_true, y_pred):
    return K.mean(K.square(y_pred - y_true), axis=-1)

可以看出输入是groud true y_true和预测值y_pred, 返回为计算loss的函数。自定义loss可以参照如此模式即可。

def focal_loss(weights=None, alpha=0.25, gamma=2):
    r"""Compute focal loss for predictions.
        Multi-labels Focal loss formula:
            FL = -alpha * (z-p)^gamma * log(p) -(1-alpha) * p^gamma * log(1-p)
                 ,which alpha = 0.25, gamma = 2, p = sigmoid(x), z = target_tensor.
    # https://github.com/ailias/Focal-Loss-implement-on-Tensorflow/blob/master/focal_loss.py
    Args:
     prediction_tensor: A float tensor of shape [batch_size, num_anchors,
        num_classes] representing the predicted logits for each class
     target_tensor: A float tensor of shape [batch_size, num_anchors,
        num_classes] representing one-hot encoded classification targets
     weights: A float tensor of shape [batch_size, num_anchors]
     alpha: A scalar tensor for focal loss alpha hyper-parameter
     gamma: A scalar tensor for focal loss gamma hyper-parameter
    Returns:
        loss: A (scalar) tensor representing the value of the loss function
    """
    def _custom_loss(y_true, y_pred):
        sigmoid_p = tf.nn.sigmoid(y_pred)
        zeros = array_ops.zeros_like(sigmoid_p, dtype=sigmoid_p.dtype)
        # For poitive prediction, only need consider front part loss, back part is 0;
        # target_tensor > zeros <=> z=1, so poitive coefficient = z - p.
        pos_p_sub = array_ops.where(y_true > zeros, y_true - sigmoid_p, zeros)
        # For negative prediction, only need consider back part loss, front part is 0;
        # target_tensor > zeros <=> z=1, so negative coefficient = 0.
        neg_p_sub = array_ops.where(y_true > zeros, zeros, sigmoid_p)
        per_entry_cross_ent = - alpha * (pos_p_sub ** gamma) * tf.log(tf.clip_by_value(sigmoid_p, 1e-8, 1.0)) 
                              - (1 - alpha) * (neg_p_sub ** gamma) * tf.log(
            tf.clip_by_value(1.0 - sigmoid_p, 1e-8, 1.0))
        return tf.reduce_sum(per_entry_cross_ent)
    return _custom_loss

5. 总结

本文分享了keras的扩展功能,扩展功能其实也是实现Keras模块化的一种继承实现。

总结如下:

继承Layer实现自定义layer, 记住bulid() call()

继续Model实现train_step定义训练过程,记住梯度计算tape.gradient(loss, trainable_vars) ,权重更新optimizer.apply_gradients, 计算evaluate compiled_metrics.update_state(y, y_pred)

魔改loss,记住groud true y_true和预测值y_pred输入,返回loss function

以上为个人经验,希望能给大家一个参考,也希望大家多多支持云海天教程。