Coverage for mlair/model_modules/convolutional_networks.py: 0%

122 statements  

« prev     ^ index     » next       coverage.py v6.4.2, created at 2023-12-18 17:51 +0000

1__author__ = "Lukas Leufen" 

2__date__ = '2021-02-' 

3 

4from functools import reduce, partial 

5from typing import Union 

6 

7from mlair.model_modules import AbstractModelClass 

8from mlair.helpers import select_from_dict, to_list 

9from mlair.model_modules.loss import var_loss, custom_loss 

10from mlair.model_modules.advanced_paddings import PadUtils, Padding2D, SymmetricPadding2D 

11 

12import tensorflow.keras as keras 

13 

14 

15class CNNfromConfig(AbstractModelClass): 

16 _activation = {"relu": keras.layers.ReLU, "tanh": partial(keras.layers.Activation, "tanh"), 

17 "sigmoid": partial(keras.layers.Activation, "sigmoid"), 

18 "linear": partial(keras.layers.Activation, "linear"), 

19 "prelu": partial(keras.layers.PReLU, alpha_initializer=keras.initializers.constant(value=0.25)), 

20 "leakyrelu": keras.layers.LeakyReLU} 

21 _initializer = {"tanh": "glorot_uniform", "sigmoid": "glorot_uniform", "linear": "glorot_uniform", 

22 "relu": keras.initializers.he_normal(), "prelu": keras.initializers.he_normal()} 

23 _optimizer = {"adam": keras.optimizers.Adam, "sgd": keras.optimizers.SGD} 

24 _regularizer = {"l1": keras.regularizers.l1, "l2": keras.regularizers.l2, "l1_l2": keras.regularizers.l1_l2} 

25 _requirements = ["lr", "beta_1", "beta_2", "epsilon", "decay", "amsgrad", "momentum", "nesterov", "l1", "l2"] 

26 

27 """ 

28 Use this class like the following. Note that all keys must match the corresponding tf/keras keys of the layer 

29  

30 ```python 

31 input_shape = [(65,1,9)] 

32 output_shape = [(4, )] 

33 layer_configuration=[ 

34 {"type": "Conv2D", "activation": "relu", "kernel_size": (1, 1), "filters": 8},  

35 {"type": "Dropout", "rate": 0.2}, 

36 {"type": "Conv2D", "activation": "relu", "kernel_size": (5, 1), "filters": 16},  

37 {"type": "Dropout", "rate": 0.2}, 

38 {"type": "MaxPooling2D", "pool_size": (8, 1), "strides": (1, 1)}, 

39 {"type": "Conv2D", "activation": "relu", "kernel_size": (1, 1), "filters": 16},  

40 {"type": "Dropout", "rate": 0.2}, 

41 {"type": "Conv2D", "activation": "relu", "kernel_size": (5, 1), "filters": 32},  

42 {"type": "Dropout", "rate": 0.2}, 

43 {"type": "MaxPooling2D", "pool_size": (8, 1), "strides": (1, 1)}, 

44 {"type": "Conv2D", "activation": "relu", "kernel_size": (1, 1), "filters": 32},  

45 {"type": "Dropout", "rate": 0.2}, 

46 {"type": "Conv2D", "activation": "relu", "kernel_size": (5, 1), "filters": 64},  

47 {"type": "Dropout", "rate": 0.2}, 

48 {"type": "MaxPooling2D", "pool_size": (8, 1), "strides": (1, 1)}, 

49 {"type": "Conv2D", "activation": "relu", "kernel_size": (1, 1), "filters": 64},  

50 {"type": "Dropout", "rate": 0.2}, 

51 {"type": "Flatten"}, 

52 # {"type": "Dense", "units": 128, "activation": "relu"} 

53 ] 

54 model = CNNfromConfig(input_shape, output_shape, layer_configuration) 

55 ``` 

56 

57 """ 

58 

59 def __init__(self, input_shape: list, output_shape: list, layer_configuration: list, optimizer="adam", 

60 batch_normalization=False, **kwargs): 

61 

62 assert len(input_shape) == 1 

63 assert len(output_shape) == 1 

64 super().__init__(input_shape[0], output_shape[0]) 

65 

66 self.conf = layer_configuration 

67 activation_output = kwargs.pop("activation_output", "linear") 

68 self.activation_output = self._activation.get(activation_output) 

69 self.activation_output_name = activation_output 

70 self.kwargs = kwargs 

71 self.bn = batch_normalization 

72 self.optimizer = self._set_optimizer(optimizer, **kwargs) 

73 self._layer_save = [] 

74 

75 # apply to model 

76 self.set_model() 

77 self.set_compile_options() 

78 self.set_custom_objects(loss=self.compile_options["loss"][0], var_loss=var_loss) 

79 

80 def set_model(self): 

81 x_input = keras.layers.Input(shape=self._input_shape) 

82 x_in = x_input 

83 

84 for pos, layer_opts in enumerate(self.conf): 

85 print(layer_opts) 

86 layer, layer_kwargs, follow_up_layer = self._extract_layer_conf(layer_opts) 

87 layer_name = self._get_layer_name(layer, layer_kwargs, pos) 

88 x_in = layer(**layer_kwargs, name=layer_name)(x_in) 

89 if follow_up_layer is not None: 

90 for follow_up in to_list(follow_up_layer): 

91 layer_name = self._get_layer_name(follow_up, None, pos) 

92 x_in = follow_up(name=layer_name)(x_in) 

93 self._layer_save.append({"layer": layer, **layer_kwargs, "follow_up_layer": follow_up_layer}) 

94 

95 x_in = keras.layers.Dense(self._output_shape)(x_in) 

96 out = self.activation_output(name=f"{self.activation_output_name}_output")(x_in) 

97 self.model = keras.Model(inputs=x_input, outputs=[out]) 

98 print(self.model.summary()) 

99 

100 @staticmethod 

101 def _get_layer_name(layer: keras.layers, layer_kwargs: Union[dict, None], pos: int, *args): 

102 if isinstance(layer, partial): 

103 name = layer.args[0] if layer.func.__name__ == "Activation" else layer.func.__name__ 

104 else: 

105 name = layer.__name__ 

106 if "Conv" in name and isinstance(layer_kwargs, dict) and "kernel_size" in layer_kwargs: 

107 name = name + "_" + "x".join(map(str, layer_kwargs["kernel_size"])) 

108 if "Pooling" in name and isinstance(layer_kwargs, dict) and "pool_size" in layer_kwargs: 

109 name = name + "_" + "x".join(map(str, layer_kwargs["pool_size"])) 

110 name += f"_{pos + 1}" 

111 return name 

112 

113 def _set_optimizer(self, optimizer, **kwargs): 

114 try: 

115 opt_name = optimizer.lower() 

116 opt = self._optimizer.get(opt_name) 

117 opt_kwargs = {} 

118 if opt_name == "adam": 

119 opt_kwargs = select_from_dict(kwargs, ["lr", "beta_1", "beta_2", "epsilon", "decay", "amsgrad"]) 

120 elif opt_name == "sgd": 

121 opt_kwargs = select_from_dict(kwargs, ["lr", "momentum", "decay", "nesterov"]) 

122 return opt(**opt_kwargs) 

123 except KeyError: 

124 raise AttributeError(f"Given optimizer {optimizer} is not supported in this model class.") 

125 

126 def _set_regularizer(self, regularizer, **kwargs): 

127 if regularizer is None or (isinstance(regularizer, str) and regularizer.lower() == "none"): 

128 return None 

129 try: 

130 reg_name = regularizer.lower() 

131 reg = self._regularizer.get(reg_name) 

132 reg_kwargs = {} 

133 if reg_name in ["l1", "l2"]: 

134 reg_kwargs = select_from_dict(kwargs, reg_name, remove_none=True) 

135 if reg_name in reg_kwargs: 

136 reg_kwargs["l"] = reg_kwargs.pop(reg_name) 

137 elif reg_name == "l1_l2": 

138 reg_kwargs = select_from_dict(kwargs, ["l1", "l2"], remove_none=True) 

139 return reg(**reg_kwargs) 

140 except KeyError: 

141 raise AttributeError(f"Given regularizer {regularizer} is not supported in this model class.") 

142 

143 def set_compile_options(self): 

144 # self.compile_options = {"loss": [custom_loss([keras.losses.mean_squared_error, var_loss])], 

145 # "metrics": ["mse", "mae", var_loss]} 

146 self.compile_options = {"loss": [keras.losses.mean_squared_error], 

147 "metrics": ["mse", "mae", var_loss]} 

148 

149 def _extract_layer_conf(self, layer_opts): 

150 follow_up_layer = None 

151 layer_type = layer_opts.pop("type") 

152 layer = getattr(keras.layers, layer_type, None) 

153 activation_type = layer_opts.pop("activation", None) 

154 if activation_type is not None: 

155 activation = self._activation.get(activation_type) 

156 kernel_initializer = self._initializer.get(activation_type, "glorot_uniform") 

157 layer_opts["kernel_initializer"] = kernel_initializer 

158 follow_up_layer = activation 

159 if self.bn is True: 

160 another_layer = keras.layers.BatchNormalization 

161 if activation_type in ["relu", "linear", "prelu", "leakyrelu"]: 

162 follow_up_layer = (another_layer, follow_up_layer) 

163 else: 

164 follow_up_layer = (follow_up_layer, another_layer) 

165 regularizer_type = layer_opts.pop("kernel_regularizer", None) 

166 if regularizer_type is not None: 

167 layer_opts["kernel_regularizer"] = self._set_regularizer(regularizer_type, **self.kwargs) 

168 return layer, layer_opts, follow_up_layer 

169 

170 

171class CNN(AbstractModelClass): # pragma: no cover 

172 

173 _activation = {"relu": keras.layers.ReLU, "tanh": partial(keras.layers.Activation, "tanh"), 

174 "sigmoid": partial(keras.layers.Activation, "sigmoid"), 

175 "linear": partial(keras.layers.Activation, "linear"), 

176 "selu": partial(keras.layers.Activation, "selu"), 

177 "prelu": partial(keras.layers.PReLU, alpha_initializer=keras.initializers.constant(value=0.25)), 

178 "leakyrelu": partial(keras.layers.LeakyReLU)} 

179 _initializer = {"tanh": "glorot_uniform", "sigmoid": "glorot_uniform", "linear": "glorot_uniform", 

180 "relu": keras.initializers.he_normal(), "selu": keras.initializers.lecun_normal(), 

181 "prelu": keras.initializers.he_normal()} 

182 _optimizer = {"adam": keras.optimizers.Adam, "sgd": keras.optimizers.SGD} 

183 _regularizer = {"l1": keras.regularizers.l1, "l2": keras.regularizers.l2, "l1_l2": keras.regularizers.l1_l2} 

184 _requirements = ["lr", "beta_1", "beta_2", "epsilon", "decay", "amsgrad", "momentum", "nesterov", "l1", "l2"] 

185 _dropout = {"selu": keras.layers.AlphaDropout} 

186 _pooling = {"max": keras.layers.MaxPooling2D, "average": keras.layers.AveragePooling2D, 

187 "mean": keras.layers.AveragePooling2D} 

188 

189 """ 

190 Define CNN model as in the following examples: 

191  

192 * use same kernel for all layers and use in total 3 conv layers, no dropout or pooling is applied 

193  

194 ```python 

195 model=CNN, 

196 kernel_size=5, 

197 n_layer=3, 

198 dense_layer_configuration=[128, 64],  

199 ``` 

200  

201 * specify the kernel sizes, make sure len of kernel size parameter matches number of layers 

202  

203 ```python 

204 model=CNN, 

205 kernel_size=[3, 7, 11], 

206 n_layer=3, 

207 dense_layer_configuration=[128, 64],  

208 ``` 

209  

210 * use different number of filters in each layer (can be combined either with fixed or individual kernel sizes),  

211 make sure that lengths match. Using layer_configuration always overwrites any value given to n_layers parameter. 

212  

213 ```python 

214 model=CNN, 

215 kernel_size=[3, 7, 11], 

216 layer_configuration=[24, 48, 48], 

217 ``` 

218  

219 * now specify individual kernel sizes and number of filters for each layer 

220  

221 ```python 

222 model=CNN, 

223 layer_configuration=[(16, 3), (32, 7), (64, 11)], 

224 dense_layer_configuration=[128, 64],  

225 ``` 

226  

227 * add also some dropout and pooling every 2nd layer, dropout is applied after the conv layer, pooling before. Note  

228 that pooling will not used in the init layer whereas dropout is already applied there. 

229  

230 ```python 

231 model=CNN, 

232 dropout_freq=2, 

233 dropout=0.3, 

234 pooling_type="max", 

235 pooling_freq=2, 

236 pooling_size=3, 

237 layer_configuration=[(16, 3), (32, 7), (64, 11)], 

238 dense_layer_configuration=[128, 64],  

239 ``` 

240 """ 

241 

242 def __init__(self, input_shape: list, output_shape: list, activation="relu", activation_output="linear", 

243 optimizer="adam", regularizer=None, kernel_size=7, dropout=None, dropout_freq=None, pooling_freq=None, 

244 pooling_type="max", 

245 n_layer=1, n_filter=10, layer_configuration=None, pooling_size=None, 

246 dense_layer_configuration=None, **kwargs): 

247 

248 assert len(input_shape) == 1 

249 assert len(output_shape) == 1 

250 super().__init__(input_shape[0], output_shape[0]) 

251 

252 # settings 

253 self.activation = self._set_activation(activation) 

254 self.activation_name = activation 

255 self.activation_output = self._set_activation(activation_output) 

256 self.activation_output_name = activation_output 

257 self.kernel_initializer = self._initializer.get(activation, "glorot_uniform") 

258 self.kernel_regularizer = self._set_regularizer(regularizer, **kwargs) 

259 self.kernel_size = kernel_size 

260 self.optimizer = self._set_optimizer(optimizer, **kwargs) 

261 self.layer_configuration = (n_layer, n_filter, self.kernel_size) if layer_configuration is None else layer_configuration 

262 self.dense_layer_configuration = dense_layer_configuration or [] 

263 self.pooling = self._set_pooling(pooling_type) 

264 self.pooling_size = pooling_size 

265 self.dropout, self.dropout_rate = self._set_dropout(activation, dropout) 

266 self.dropout_freq = self._set_layer_freq(dropout_freq) 

267 self.pooling_freq = self._set_layer_freq(pooling_freq) 

268 

269 # apply to model 

270 self.set_model() 

271 self.set_compile_options() 

272 # self.set_custom_objects(loss=custom_loss([keras.losses.mean_squared_error, var_loss]), var_loss=var_loss) 

273 self.set_custom_objects(loss=self.compile_options["loss"][0], var_loss=var_loss) 

274 

275 def _set_pooling(self, pooling): 

276 try: 

277 return self._pooling.get(pooling.lower()) 

278 except KeyError: 

279 raise AttributeError(f"Given pooling {pooling} is not supported in this model class.") 

280 

281 def _set_layer_freq(self, param): 

282 param = 0 if param is None else param 

283 assert 0 <= param 

284 assert isinstance(param, int) 

285 return param 

286 

287 def _set_activation(self, activation): 

288 try: 

289 return self._activation.get(activation.lower()) 

290 except KeyError: 

291 raise AttributeError(f"Given activation {activation} is not supported in this model class.") 

292 

293 def _set_optimizer(self, optimizer, **kwargs): 

294 try: 

295 opt_name = optimizer.lower() 

296 opt = self._optimizer.get(opt_name) 

297 opt_kwargs = {} 

298 if opt_name == "adam": 

299 opt_kwargs = select_from_dict(kwargs, ["lr", "beta_1", "beta_2", "epsilon", "decay", "amsgrad"]) 

300 elif opt_name == "sgd": 

301 opt_kwargs = select_from_dict(kwargs, ["lr", "momentum", "decay", "nesterov"]) 

302 return opt(**opt_kwargs) 

303 except KeyError: 

304 raise AttributeError(f"Given optimizer {optimizer} is not supported in this model class.") 

305 

306 def _set_regularizer(self, regularizer, **kwargs): 

307 if regularizer is None or (isinstance(regularizer, str) and regularizer.lower() == "none"): 

308 return None 

309 try: 

310 reg_name = regularizer.lower() 

311 reg = self._regularizer.get(reg_name) 

312 reg_kwargs = {} 

313 if reg_name in ["l1", "l2"]: 

314 reg_kwargs = select_from_dict(kwargs, reg_name, remove_none=True) 

315 if reg_name in reg_kwargs: 

316 reg_kwargs["l"] = reg_kwargs.pop(reg_name) 

317 elif reg_name == "l1_l2": 

318 reg_kwargs = select_from_dict(kwargs, ["l1", "l2"], remove_none=True) 

319 return reg(**reg_kwargs) 

320 except KeyError: 

321 raise AttributeError(f"Given regularizer {regularizer} is not supported in this model class.") 

322 

323 def _set_dropout(self, activation, dropout_rate): 

324 if dropout_rate is None: 

325 return None, None 

326 assert 0 <= dropout_rate < 1 

327 return self._dropout.get(activation, keras.layers.Dropout), dropout_rate 

328 

329 def set_model(self): 

330 """ 

331 Build the model. 

332 """ 

333 if isinstance(self.layer_configuration, tuple) is True: 

334 n_layer, n_hidden, kernel_size = self.layer_configuration 

335 if isinstance(kernel_size, list): 

336 assert len(kernel_size) == n_layer # use individual filter sizes for each layer 

337 conf = [(n_hidden, kernel_size[i]) for i in range(n_layer)] 

338 else: 

339 assert isinstance(kernel_size, int) # use same filter size for all layers 

340 conf = [(n_hidden, kernel_size) for _ in range(n_layer)] 

341 else: 

342 assert isinstance(self.layer_configuration, list) is True 

343 if not isinstance(self.layer_configuration[0], tuple): 

344 if isinstance(self.kernel_size, list): 

345 assert len(self.kernel_size) == len(self.layer_configuration) # use individual filter sizes for each layer 

346 conf = [(n_filter, self.kernel_size[i]) for i, n_filter in enumerate(self.layer_configuration)] 

347 else: 

348 assert isinstance(self.kernel_size, int) # use same filter size for all layers 

349 conf = [(n_filter, self.kernel_size) for n_filter in self.layer_configuration] 

350 else: 

351 assert len(self.layer_configuration[0]) == 2 

352 conf = self.layer_configuration 

353 

354 x_input = keras.layers.Input(shape=self._input_shape) 

355 x_in = x_input 

356 for layer, (n_filter, kernel_size) in enumerate(conf): 

357 if self.pooling_size is not None and self.pooling_freq > 0 and layer % self.pooling_freq == 0 and layer > 0: 

358 x_in = self.pooling((self.pooling_size, 1), strides=(1, 1), padding='valid')(x_in) 

359 x_in = keras.layers.Conv2D(filters=n_filter, kernel_size=(kernel_size, 1), 

360 kernel_initializer=self.kernel_initializer, 

361 kernel_regularizer=self.kernel_regularizer)(x_in) 

362 x_in = self.activation()(x_in) 

363 if self.dropout is not None and self.dropout_freq > 0 and layer % self.dropout_freq == 0: 

364 x_in = self.dropout(self.dropout_rate)(x_in) 

365 

366 x_in = keras.layers.Flatten()(x_in) 

367 for layer, n_hidden in enumerate(self.dense_layer_configuration): 

368 if n_hidden < self._output_shape: 

369 break 

370 x_in = keras.layers.Dense(n_hidden, name=f"Dense_{len(conf) + layer + 1}", 

371 kernel_initializer=self.kernel_initializer, )(x_in) 

372 x_in = self.activation(name=f"{self.activation_name}_{len(conf) + layer + 1}")(x_in) 

373 if self.dropout is not None: 

374 x_in = self.dropout(self.dropout_rate)(x_in) 

375 

376 x_in = keras.layers.Dense(self._output_shape)(x_in) 

377 out = self.activation_output(name=f"{self.activation_output_name}_output")(x_in) 

378 self.model = keras.Model(inputs=x_input, outputs=[out]) 

379 print(self.model.summary()) 

380 

381 def set_compile_options(self): 

382 # self.compile_options = {"loss": [custom_loss([keras.losses.mean_squared_error, var_loss])], 

383 # "metrics": ["mse", "mae", var_loss]} 

384 self.compile_options = {"loss": [keras.losses.mean_squared_error], 

385 "metrics": ["mse", "mae", var_loss]} 

386 

387 

388class CNN_16_32_64(CNN): 

389 

390 def set_model(self): 

391 """ 

392 Build the model. 

393 """ 

394 x_input = keras.layers.Input(shape=self._input_shape) 

395 x_in = keras.layers.Conv2D(filters=16, kernel_size=(73, 1), 

396 kernel_initializer=self.kernel_initializer, 

397 kernel_regularizer=self.kernel_regularizer)(x_input) 

398 x_in = self.activation()(x_in) 

399 x_in = keras.layers.Conv2D(filters=32, kernel_size=(49, 1), 

400 kernel_initializer=self.kernel_initializer, 

401 kernel_regularizer=self.kernel_regularizer)(x_in) 

402 x_in = self.activation()(x_in) 

403 if self.dropout is not None: 

404 x_in = self.dropout(self.dropout_rate)(x_in) 

405 x_in = keras.layers.MaxPooling2D((25, 1), strides=(1, 1), padding='valid')(x_in) 

406 x_in = keras.layers.Conv2D(filters=64, kernel_size=(13, 1), 

407 kernel_initializer=self.kernel_initializer, 

408 kernel_regularizer=self.kernel_regularizer)(x_in) 

409 x_in = self.activation()(x_in) 

410 if self.dropout is not None: 

411 x_in = self.dropout(self.dropout_rate)(x_in) 

412 x_in = keras.layers.Flatten()(x_in) 

413 x_in = keras.layers.Dense(128, kernel_initializer=self.kernel_initializer, 

414 kernel_regularizer=self.kernel_regularizer)(x_in) 

415 x_in = self.activation()(x_in) 

416 x_in = keras.layers.Dense(32, kernel_initializer=self.kernel_initializer, 

417 kernel_regularizer=self.kernel_regularizer)(x_in) 

418 x_in = self.activation()(x_in) 

419 x_in = keras.layers.Dense(self._output_shape)(x_in) 

420 out = self.activation_output(name=f"{self.activation_output_name}_output")(x_in) 

421 self.model = keras.Model(inputs=x_input, outputs=[out])