Coverage for mlair/model_modules/recurrent_networks.py: 0%

__author__ = "Lukas Leufen"
__date__ = '2021-05-25'

from functools import reduce, partial
from typing import Union

from mlair.model_modules import AbstractModelClass
from mlair.helpers import select_from_dict
from mlair.model_modules.loss import var_loss, custom_loss

import tensorflow.keras as keras


class RNN(AbstractModelClass):  # pragma: no cover
    """
    Recurrent neural network (RNN) model class built from either LSTM or GRU layers.

    The network stacks one or more recurrent layers, optionally followed by dense layers, and closes with a
    dense output layer. See :meth:`__init__` for all customization options and the usage sketch at the end of
    this module for an example instantiation.
    """

    _activation = {"relu": keras.layers.ReLU, "tanh": partial(keras.layers.Activation, "tanh"),
                   "sigmoid": partial(keras.layers.Activation, "sigmoid"),
                   "linear": partial(keras.layers.Activation, "linear"),
                   "selu": partial(keras.layers.Activation, "selu"),
                   "prelu": partial(keras.layers.PReLU, alpha_initializer=keras.initializers.constant(value=0.25)),
                   "leakyrelu": partial(keras.layers.LeakyReLU)}
    _initializer = {"tanh": "glorot_uniform", "sigmoid": "glorot_uniform", "linear": "glorot_uniform",
                    "relu": keras.initializers.he_normal(), "selu": keras.initializers.lecun_normal(),
                    "prelu": keras.initializers.he_normal()}
    _optimizer = {"adam": keras.optimizers.Adam, "sgd": keras.optimizers.SGD}
    _regularizer = {"l1": keras.regularizers.l1, "l2": keras.regularizers.l2, "l1_l2": keras.regularizers.l1_l2}
    _requirements = ["lr", "beta_1", "beta_2", "epsilon", "decay", "amsgrad", "momentum", "nesterov", "l1", "l2"]
    _dropout = {"selu": keras.layers.AlphaDropout}
    _rnn = {"lstm": keras.layers.LSTM, "gru": keras.layers.GRU}

    def __init__(self, input_shape: list, output_shape: list, activation="relu", activation_output="linear",
                 activation_rnn="tanh", dropout_rnn=0,
                 optimizer="adam", n_layer=1, n_hidden=10, regularizer=None, dropout=None, layer_configuration=None,
                 batch_normalization=False, rnn_type="lstm", add_dense_layer=False, dense_layer_configuration=None,
                 kernel_regularizer=None, **kwargs):
        """
        Sets model and loss depending on the given arguments.

        :param input_shape: list of input shapes (expect len=1 with shape=(window_hist, station, variables))
        :param output_shape: list of output shapes (expect len=1 with shape=(window_forecast))

        Customize this RNN model via the following parameters:

        :param activation: set your desired activation function for the appended dense layers (only used if
            add_dense_layer=True). Choose from relu, tanh, sigmoid, linear, selu, prelu, leakyrelu. (Default relu)
        :param activation_rnn: set your desired activation function for the rnn output. Choose from relu, tanh,
            sigmoid, linear, selu, prelu, leakyrelu. To use the fast cuDNN implementation, tensorflow requires tanh
            as activation. Note that this is not the recurrent activation (which cannot be changed in this class)
            but the activation of the cell output. (Default tanh)
        :param activation_output: same as the activation parameter but applied to the output layer only. (Default
            linear)
        :param optimizer: set the optimizer method. Can be either adam or sgd. (Default adam)
        :param n_layer: define the number of hidden layers in the network. The given number of hidden units is used
            in each layer. (Default 1)
        :param n_hidden: define the number of hidden units per layer. This number is used in each hidden layer.
            (Default 10)
        :param layer_configuration: alternative formulation of the network's architecture. This will overwrite the
            settings from n_layer and n_hidden. Provide a list where each element represents the number of units in
            the corresponding hidden layer; the number of hidden layers is equal to the length of this list.
        :param dropout: use dropout with the given rate. If no value is provided, no dropout layers are added to the
            network at all. (Default None)
        :param dropout_rnn: use recurrent dropout with the given rate. This is applied along the recursion and not
            after an rnn layer. Be aware that tensorflow can only use the fast cuDNN implementation if recurrent
            dropout is disabled. (Default 0)
        :param batch_normalization: if enabled, add batch normalization layers to the network. These layers are
            inserted between the linear part of a layer (the nn part) and the non-linear part (activation function).
            No BN layer is added if set to False. (Default False)
        :param rnn_type: define which kind of recurrent network should be applied. Choose from either lstm or gru.
            All recurrent units will be of this kind. (Default lstm)
        :param add_dense_layer: set to True to add dense layers between the last recurrent layer and the output
            layer. If no further specification is made via dense_layer_configuration, a single layer is added with n
            neurons, where n is equal to min(n_previous_layer, n_output**2). If set to False, the output layer
            directly follows the last recurrent layer. (Default False)
        :param dense_layer_configuration: specify the dense layers as a list where each element gives the number of
            neurons of one layer to add; the length of the list therefore defines the number of dense layers. The
            last dense layer is followed by the output layer. As soon as an element is smaller than the number of
            output neurons, the addition of dense layers is stopped immediately.
        """

        assert len(input_shape) == 1
        assert len(output_shape) == 1
        super().__init__(input_shape[0], output_shape[0])

        # settings
        self.activation = self._set_activation(activation.lower())
        self.activation_name = activation
        self.activation_rnn = self._set_activation(activation_rnn.lower())
        self.activation_rnn_name = activation_rnn
        self.activation_output = self._set_activation(activation_output.lower())
        self.activation_output_name = activation_output
        self.optimizer = self._set_optimizer(optimizer.lower(), **kwargs)
        self.bn = batch_normalization
        self.add_dense_layer = add_dense_layer
        self.dense_layer_configuration = dense_layer_configuration or []
        self.layer_configuration = (n_layer, n_hidden) if layer_configuration is None else layer_configuration
        self.RNN = self._rnn.get(rnn_type.lower())
        self._update_model_name(rnn_type)
        self.kernel_initializer = self._initializer.get(activation, "glorot_uniform")
        self.kernel_regularizer, self.kernel_regularizer_opts = self._set_regularizer(kernel_regularizer, **kwargs)
        self.dropout, self.dropout_rate = self._set_dropout(activation, dropout)
        assert 0 <= dropout_rnn <= 1
        self.dropout_rnn = dropout_rnn

        # apply to model
        self.set_model()
        self.set_compile_options()
        self.set_custom_objects(loss=self.compile_options["loss"][0], var_loss=var_loss)

    def set_model(self):
        """
        Build the model.
        """
        if isinstance(self.layer_configuration, tuple) is True:
            n_layer, n_hidden = self.layer_configuration
            conf = [n_hidden for _ in range(n_layer)]
        else:
            assert isinstance(self.layer_configuration, list) is True
            conf = self.layer_configuration

        x_input = keras.layers.Input(shape=self._input_shape)
        x_in = keras.layers.Reshape((self._input_shape[0], reduce((lambda x, y: x * y), self._input_shape[1:])))(
            x_input)

        for layer, n_hidden in enumerate(conf):
            return_sequences = (layer < len(conf) - 1)
            x_in = self.RNN(n_hidden, return_sequences=return_sequences, recurrent_dropout=self.dropout_rnn,
                            kernel_regularizer=self.kernel_regularizer)(x_in)
            if self.bn is True:
                x_in = keras.layers.BatchNormalization()(x_in)
            x_in = self.activation_rnn(name=f"{self.activation_rnn_name}_{layer + 1}")(x_in)
            if self.dropout is not None:
                x_in = self.dropout(self.dropout_rate)(x_in)

        if self.add_dense_layer is True:
            if len(self.dense_layer_configuration) == 0:
                x_in = keras.layers.Dense(min(self._output_shape ** 2, conf[-1]), name=f"Dense_{len(conf) + 1}",
                                          kernel_initializer=self.kernel_initializer, )(x_in)
                x_in = self.activation(name=f"{self.activation_name}_{len(conf) + 1}")(x_in)
                if self.dropout is not None:
                    x_in = self.dropout(self.dropout_rate)(x_in)
            else:
                for layer, n_hidden in enumerate(self.dense_layer_configuration):
                    if n_hidden < self._output_shape:
                        break
                    x_in = keras.layers.Dense(n_hidden, name=f"Dense_{len(conf) + layer + 1}",
                                              kernel_initializer=self.kernel_initializer, )(x_in)
                    x_in = self.activation(name=f"{self.activation_name}_{len(conf) + layer + 1}")(x_in)
                    if self.dropout is not None:
                        x_in = self.dropout(self.dropout_rate)(x_in)

        x_in = keras.layers.Dense(self._output_shape)(x_in)
        out = self.activation_output(name=f"{self.activation_output_name}_output")(x_in)
        self.model = keras.Model(inputs=x_input, outputs=[out])
        print(self.model.summary())

        # x_in = keras.layers.LSTM(32)(x_in)
        # if self.dropout is not None:
        #     x_in = self.dropout(self.dropout_rate)(x_in)
        # x_in = keras.layers.RepeatVector(self._output_shape)(x_in)
        # x_in = keras.layers.LSTM(32, return_sequences=True)(x_in)
        # if self.dropout is not None:
        #     x_in = self.dropout(self.dropout_rate)(x_in)
        # out = keras.layers.TimeDistributed(keras.layers.Dense(1))(x_in)
        # out = keras.layers.Flatten()(out)

    def _set_dropout(self, activation, dropout_rate):
        if dropout_rate is None:
            return None, None
        assert 0 <= dropout_rate < 1
        return self._dropout.get(activation, keras.layers.Dropout), dropout_rate

    def _set_activation(self, activation):
        try:
            return self._activation[activation.lower()]
        except KeyError:
            raise AttributeError(f"Given activation {activation} is not supported in this model class.")

    def set_compile_options(self):
        self.compile_options = {"loss": [keras.losses.mean_squared_error],
                                "metrics": ["mse", "mae", var_loss]}

    def _set_optimizer(self, optimizer, **kwargs):
        try:
            opt_name = optimizer.lower()
            opt = self._optimizer[opt_name]
            opt_kwargs = {}
            if opt_name == "adam":
                opt_kwargs = select_from_dict(kwargs, ["lr", "beta_1", "beta_2", "epsilon", "decay", "amsgrad"])
            elif opt_name == "sgd":
                opt_kwargs = select_from_dict(kwargs, ["lr", "momentum", "decay", "nesterov"])
            return opt(**opt_kwargs)
        except KeyError:
            raise AttributeError(f"Given optimizer {optimizer} is not supported in this model class.")

    def _set_regularizer(self, regularizer: Union[None, str], **kwargs):
        if regularizer is None or (isinstance(regularizer, str) and regularizer.lower() == "none"):
            return None, None
        try:
            reg_name = regularizer.lower()
            reg = self._regularizer[reg_name]
            reg_kwargs = {}
            if reg_name in ["l1", "l2"]:
                reg_kwargs = select_from_dict(kwargs, reg_name, remove_none=True)
                if reg_name in reg_kwargs:
                    reg_kwargs["l"] = reg_kwargs.pop(reg_name)
            elif reg_name == "l1_l2":
                reg_kwargs = select_from_dict(kwargs, ["l1", "l2"], remove_none=True)
            return reg(**reg_kwargs), reg_kwargs
        except KeyError:
            raise AttributeError(f"Given regularizer {regularizer} is not supported in this model class.")

    def _update_model_name(self, rnn_type):
        n_input = str(reduce(lambda x, y: x * y, self._input_shape))
        n_output = str(self._output_shape)
        self.model_name = rnn_type.upper()
        if isinstance(self.layer_configuration, tuple) and len(self.layer_configuration) == 2:
            n_layer, n_hidden = self.layer_configuration
            self.model_name += "_".join(["", n_input, *[f"{n_hidden}" for _ in range(n_layer)], n_output])
        else:
            self.model_name += "_".join(["", n_input, *[f"{n}" for n in self.layer_configuration], n_output])
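

if __name__ == "__main__":
    # Minimal usage sketch for the RNN class above. All shapes and hyperparameter values below are assumed
    # example values (they are not prescribed by this module); replace them with the shapes of your own data.
    # Following the __init__ docstring, inputs are given as [(window_hist, stations, variables)] and outputs
    # as [(window_forecast,)].
    example_input_shape = [(65, 1, 9)]
    example_output_shape = [(4,)]

    # default architecture: a single LSTM layer with 10 hidden units, no dropout, no additional dense layers
    model = RNN(example_input_shape, example_output_shape)

    # explicit architecture: two GRU layers (64 and 32 units), dropout, and two additional dense layers
    model = RNN(example_input_shape, example_output_shape, rnn_type="gru", layer_configuration=[64, 32],
                dropout=0.2, add_dense_layer=True, dense_layer_configuration=[32, 16])

    # optimizer and regularizer options are picked from **kwargs, e.g. a learning rate and an l2 penalty
    model = RNN(example_input_shape, example_output_shape, optimizer="adam", lr=1e-3,
                kernel_regularizer="l2", l2=0.01)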