__author__ = "Lukas Leufen"
__date__ = '2021-02-'

from functools import reduce, partial
from typing import Union

from mlair.model_modules import AbstractModelClass
from mlair.helpers import select_from_dict, to_list
from mlair.model_modules.loss import var_loss, custom_loss
from mlair.model_modules.advanced_paddings import PadUtils, Padding2D, SymmetricPadding2D

import tensorflow.keras as keras


class CNNfromConfig(AbstractModelClass):
    _activation = {"relu": keras.layers.ReLU, "tanh": partial(keras.layers.Activation, "tanh"),
                   "sigmoid": partial(keras.layers.Activation, "sigmoid"),
                   "linear": partial(keras.layers.Activation, "linear"),
                   "prelu": partial(keras.layers.PReLU, alpha_initializer=keras.initializers.constant(value=0.25)),
                   "leakyrelu": keras.layers.LeakyReLU}
    _initializer = {"tanh": "glorot_uniform", "sigmoid": "glorot_uniform", "linear": "glorot_uniform",
                    "relu": keras.initializers.he_normal(), "prelu": keras.initializers.he_normal()}
    _optimizer = {"adam": keras.optimizers.Adam, "sgd": keras.optimizers.SGD}
    _regularizer = {"l1": keras.regularizers.l1, "l2": keras.regularizers.l2, "l1_l2": keras.regularizers.l1_l2}
    _requirements = ["lr", "beta_1", "beta_2", "epsilon", "decay", "amsgrad", "momentum", "nesterov", "l1", "l2"]

    """
    Use this class as in the example below. Note that each key must match the corresponding tf/keras argument of
    the selected layer type.

    ```python
    input_shape = [(65, 1, 9)]
    output_shape = [(4, )]
    layer_configuration = [
        {"type": "Conv2D", "activation": "relu", "kernel_size": (1, 1), "filters": 8},
        {"type": "Dropout", "rate": 0.2},
        {"type": "Conv2D", "activation": "relu", "kernel_size": (5, 1), "filters": 16},
        {"type": "Dropout", "rate": 0.2},
        {"type": "MaxPooling2D", "pool_size": (8, 1), "strides": (1, 1)},
        {"type": "Conv2D", "activation": "relu", "kernel_size": (1, 1), "filters": 16},
        {"type": "Dropout", "rate": 0.2},
        {"type": "Conv2D", "activation": "relu", "kernel_size": (5, 1), "filters": 32},
        {"type": "Dropout", "rate": 0.2},
        {"type": "MaxPooling2D", "pool_size": (8, 1), "strides": (1, 1)},
        {"type": "Conv2D", "activation": "relu", "kernel_size": (1, 1), "filters": 32},
        {"type": "Dropout", "rate": 0.2},
        {"type": "Conv2D", "activation": "relu", "kernel_size": (5, 1), "filters": 64},
        {"type": "Dropout", "rate": 0.2},
        {"type": "MaxPooling2D", "pool_size": (8, 1), "strides": (1, 1)},
        {"type": "Conv2D", "activation": "relu", "kernel_size": (1, 1), "filters": 64},
        {"type": "Dropout", "rate": 0.2},
        {"type": "Flatten"},
        # {"type": "Dense", "units": 128, "activation": "relu"}
    ]
    model = CNNfromConfig(input_shape, output_shape, layer_configuration)
    ```
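
    Optimizer settings, batch normalization, and per-layer weight regularization are controlled via kwargs. A
    minimal sketch (all parameter values below are illustrative, not a reference configuration):

    ```python
    model = CNNfromConfig(input_shape, output_shape, layer_configuration,
                          optimizer="sgd", lr=0.01, momentum=0.9, nesterov=True,  # consumed by _set_optimizer
                          batch_normalization=True,  # BatchNormalization before/after each activation, by type
                          activation_output="linear",  # activation of the final output layer
                          l2=0.01)  # weight factor used when a layer sets {"kernel_regularizer": "l2"}
    ```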

    """

    def __init__(self, input_shape: list, output_shape: list, layer_configuration: list, optimizer="adam",
                 batch_normalization=False, **kwargs):

        assert len(input_shape) == 1
        assert len(output_shape) == 1
        super().__init__(input_shape[0], output_shape[0])

        self.conf = layer_configuration
        activation_output = kwargs.pop("activation_output", "linear")
        self.activation_output = self._activation.get(activation_output)
        self.activation_output_name = activation_output
        self.kwargs = kwargs
        self.bn = batch_normalization
        self.optimizer = self._set_optimizer(optimizer, **kwargs)
        self._layer_save = []

        # apply to model
        self.set_model()
        self.set_compile_options()
        self.set_custom_objects(loss=self.compile_options["loss"][0], var_loss=var_loss)

    def set_model(self):
        x_input = keras.layers.Input(shape=self._input_shape)
        x_in = x_input

        for pos, layer_opts in enumerate(self.conf):
            print(layer_opts)
            layer, layer_kwargs, follow_up_layer = self._extract_layer_conf(layer_opts)
            layer_name = self._get_layer_name(layer, layer_kwargs, pos)
            x_in = layer(**layer_kwargs, name=layer_name)(x_in)
            if follow_up_layer is not None:
                # apply the activation and/or batch normalization layers returned for this entry
                for follow_up in to_list(follow_up_layer):
                    layer_name = self._get_layer_name(follow_up, None, pos)
                    x_in = follow_up(name=layer_name)(x_in)
            self._layer_save.append({"layer": layer, **layer_kwargs, "follow_up_layer": follow_up_layer})

        x_in = keras.layers.Dense(self._output_shape)(x_in)
        out = self.activation_output(name=f"{self.activation_output_name}_output")(x_in)
        self.model = keras.Model(inputs=x_input, outputs=[out])
        print(self.model.summary())

    @staticmethod
    def _get_layer_name(layer: keras.layers, layer_kwargs: Union[dict, None], pos: int, *args):
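        """
        Build a unique layer name from the layer class, its kernel or pool size, and its position. For example, a
        Conv2D layer with kernel_size=(5, 1) at position 0 becomes "Conv2D_5x1_1", and a MaxPooling2D layer with
        pool_size=(8, 1) at position 4 becomes "MaxPooling2D_8x1_5".
        """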
        if isinstance(layer, partial):
            name = layer.args[0] if layer.func.__name__ == "Activation" else layer.func.__name__
        else:
            name = layer.__name__
        if "Conv" in name and isinstance(layer_kwargs, dict) and "kernel_size" in layer_kwargs:
            name = name + "_" + "x".join(map(str, layer_kwargs["kernel_size"]))
        if "Pooling" in name and isinstance(layer_kwargs, dict) and "pool_size" in layer_kwargs:
            name = name + "_" + "x".join(map(str, layer_kwargs["pool_size"]))
        name += f"_{pos + 1}"
        return name

    def _set_optimizer(self, optimizer, **kwargs):
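        """
        Map an optimizer name to its keras class and forward the matching kwargs; unrelated kwargs are not passed
        on. As a sketch (parameter values are illustrative only), _set_optimizer("adam", lr=1e-3, amsgrad=True)
        should return a keras.optimizers.Adam instance configured with those values.
        """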
        try:
            opt_name = optimizer.lower()
            opt = self._optimizer[opt_name]  # index access so that unknown names raise KeyError
            opt_kwargs = {}
            if opt_name == "adam":
                opt_kwargs = select_from_dict(kwargs, ["lr", "beta_1", "beta_2", "epsilon", "decay", "amsgrad"])
            elif opt_name == "sgd":
                opt_kwargs = select_from_dict(kwargs, ["lr", "momentum", "decay", "nesterov"])
            return opt(**opt_kwargs)
        except KeyError:
            raise AttributeError(f"Given optimizer {optimizer} is not supported in this model class.")

    def _set_regularizer(self, regularizer, **kwargs):
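        """
        Map a regularizer name ("l1", "l2", "l1_l2", or None) to a keras regularizer with factors taken from
        kwargs. For instance (factor values are illustrative only), _set_regularizer("l1_l2", l1=0.01, l2=0.01)
        should return keras.regularizers.l1_l2(l1=0.01, l2=0.01).
        """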
        if regularizer is None or (isinstance(regularizer, str) and regularizer.lower() == "none"):
            return None
        try:
            reg_name = regularizer.lower()
            reg = self._regularizer[reg_name]  # index access so that unknown names raise KeyError
            reg_kwargs = {}
            if reg_name in ["l1", "l2"]:
                reg_kwargs = select_from_dict(kwargs, reg_name, remove_none=True)
                if reg_name in reg_kwargs:
                    reg_kwargs["l"] = reg_kwargs.pop(reg_name)
            elif reg_name == "l1_l2":
                reg_kwargs = select_from_dict(kwargs, ["l1", "l2"], remove_none=True)
            return reg(**reg_kwargs)
        except KeyError:
            raise AttributeError(f"Given regularizer {regularizer} is not supported in this model class.")

    def set_compile_options(self):
        # self.compile_options = {"loss": [custom_loss([keras.losses.mean_squared_error, var_loss])],
        #                         "metrics": ["mse", "mae", var_loss]}
        self.compile_options = {"loss": [keras.losses.mean_squared_error],
                                "metrics": ["mse", "mae", var_loss]}

    def _extract_layer_conf(self, layer_opts):
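        """
        Translate a single layer configuration dict into a keras layer class, its kwargs, and optional follow-up
        layers. The "type" key selects the class from keras.layers; an "activation" key is popped and returned as
        a follow-up layer (combined with BatchNormalization if enabled); a "kernel_regularizer" key is resolved
        via _set_regularizer. All remaining keys are passed to the layer unchanged.
        """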
        follow_up_layer = None
        layer_type = layer_opts.pop("type")
        layer = getattr(keras.layers, layer_type, None)
        activation_type = layer_opts.pop("activation", None)
        if activation_type is not None:
            activation = self._activation.get(activation_type)
            kernel_initializer = self._initializer.get(activation_type, "glorot_uniform")
            layer_opts["kernel_initializer"] = kernel_initializer
            follow_up_layer = activation
            if self.bn is True:
                another_layer = keras.layers.BatchNormalization
                if activation_type in ["relu", "linear", "prelu", "leakyrelu"]:
                    follow_up_layer = (another_layer, follow_up_layer)
                else:
                    follow_up_layer = (follow_up_layer, another_layer)
        regularizer_type = layer_opts.pop("kernel_regularizer", None)
        if regularizer_type is not None:
            layer_opts["kernel_regularizer"] = self._set_regularizer(regularizer_type, **self.kwargs)
        return layer, layer_opts, follow_up_layer


class CNN(AbstractModelClass):  # pragma: no cover

    _activation = {"relu": keras.layers.ReLU, "tanh": partial(keras.layers.Activation, "tanh"),
                   "sigmoid": partial(keras.layers.Activation, "sigmoid"),
                   "linear": partial(keras.layers.Activation, "linear"),
                   "selu": partial(keras.layers.Activation, "selu"),
                   "prelu": partial(keras.layers.PReLU, alpha_initializer=keras.initializers.constant(value=0.25)),
                   "leakyrelu": partial(keras.layers.LeakyReLU)}
    _initializer = {"tanh": "glorot_uniform", "sigmoid": "glorot_uniform", "linear": "glorot_uniform",
                    "relu": keras.initializers.he_normal(), "selu": keras.initializers.lecun_normal(),
                    "prelu": keras.initializers.he_normal()}
    _optimizer = {"adam": keras.optimizers.Adam, "sgd": keras.optimizers.SGD}
    _regularizer = {"l1": keras.regularizers.l1, "l2": keras.regularizers.l2, "l1_l2": keras.regularizers.l1_l2}
    _requirements = ["lr", "beta_1", "beta_2", "epsilon", "decay", "amsgrad", "momentum", "nesterov", "l1", "l2"]
    _dropout = {"selu": keras.layers.AlphaDropout}
    _pooling = {"max": keras.layers.MaxPooling2D, "average": keras.layers.AveragePooling2D,
                "mean": keras.layers.AveragePooling2D}

    """
    Define a CNN model as in the following examples:

    * use the same kernel size for all layers and a total of 3 conv layers; no dropout or pooling is applied

    ```python
    model=CNN,
    kernel_size=5,
    n_layer=3,
    dense_layer_configuration=[128, 64],
    ```

    * specify individual kernel sizes; make sure the length of the kernel_size parameter matches the number of
      layers

    ```python
    model=CNN,
    kernel_size=[3, 7, 11],
    n_layer=3,
    dense_layer_configuration=[128, 64],
    ```

    * use a different number of filters in each layer (can be combined with either a fixed or individual kernel
      sizes); make sure the lengths match. Using layer_configuration always overrides any value given to the
      n_layer parameter.

    ```python
    model=CNN,
    kernel_size=[3, 7, 11],
    layer_configuration=[24, 48, 48],
    ```

    * now specify individual kernel sizes and numbers of filters for each layer

    ```python
    model=CNN,
    layer_configuration=[(16, 3), (32, 7), (64, 11)],
    dense_layer_configuration=[128, 64],
    ```

    * additionally apply dropout and pooling every 2nd layer; dropout is applied after the conv layer, pooling
      before it. Note that pooling is not used in the first layer, whereas dropout is already applied there.

    ```python
    model=CNN,
    dropout_freq=2,
    dropout=0.3,
    pooling_type="max",
    pooling_freq=2,
    pooling_size=3,
    layer_configuration=[(16, 3), (32, 7), (64, 11)],
    dense_layer_configuration=[128, 64],
    ```
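
    * weight regularization can be requested in the same way; the factors are read from kwargs. A sketch with
      illustrative values (regularizer and l2 are resolved by _set_regularizer):

    ```python
    model=CNN,
    regularizer="l2",
    l2=0.01,
    layer_configuration=[(16, 3), (32, 7), (64, 11)],
    dense_layer_configuration=[128, 64],
    ```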

    """

    def __init__(self, input_shape: list, output_shape: list, activation="relu", activation_output="linear",
                 optimizer="adam", regularizer=None, kernel_size=7, dropout=None, dropout_freq=None,
                 pooling_freq=None, pooling_type="max", n_layer=1, n_filter=10, layer_configuration=None,
                 pooling_size=None, dense_layer_configuration=None, **kwargs):

        assert len(input_shape) == 1
        assert len(output_shape) == 1
        super().__init__(input_shape[0], output_shape[0])

        # settings
        self.activation = self._set_activation(activation)
        self.activation_name = activation
        self.activation_output = self._set_activation(activation_output)
        self.activation_output_name = activation_output
        self.kernel_initializer = self._initializer.get(activation, "glorot_uniform")
        self.kernel_regularizer = self._set_regularizer(regularizer, **kwargs)
        self.kernel_size = kernel_size
        self.optimizer = self._set_optimizer(optimizer, **kwargs)
        self.layer_configuration = (n_layer, n_filter, self.kernel_size) if layer_configuration is None \
            else layer_configuration
        self.dense_layer_configuration = dense_layer_configuration or []
        self.pooling = self._set_pooling(pooling_type)
        self.pooling_size = pooling_size
        self.dropout, self.dropout_rate = self._set_dropout(activation, dropout)
        self.dropout_freq = self._set_layer_freq(dropout_freq)
        self.pooling_freq = self._set_layer_freq(pooling_freq)

        # apply to model
        self.set_model()
        self.set_compile_options()
        # self.set_custom_objects(loss=custom_loss([keras.losses.mean_squared_error, var_loss]), var_loss=var_loss)
        self.set_custom_objects(loss=self.compile_options["loss"][0], var_loss=var_loss)

    def _set_pooling(self, pooling):
        try:
            return self._pooling[pooling.lower()]  # index access so that unknown names raise KeyError
        except KeyError:
            raise AttributeError(f"Given pooling {pooling} is not supported in this model class.")

    def _set_layer_freq(self, param):
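        """Normalize a frequency parameter: None becomes 0, which disables the corresponding dropout/pooling."""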
        param = 0 if param is None else param
        assert isinstance(param, int)  # check the type first so that a non-int fails here, not at the comparison
        assert 0 <= param
        return param

    def _set_activation(self, activation):
        try:
            return self._activation[activation.lower()]  # index access so that unknown names raise KeyError
        except KeyError:
            raise AttributeError(f"Given activation {activation} is not supported in this model class.")

    def _set_optimizer(self, optimizer, **kwargs):
        try:
            opt_name = optimizer.lower()
            opt = self._optimizer[opt_name]  # index access so that unknown names raise KeyError
            opt_kwargs = {}
            if opt_name == "adam":
                opt_kwargs = select_from_dict(kwargs, ["lr", "beta_1", "beta_2", "epsilon", "decay", "amsgrad"])
            elif opt_name == "sgd":
                opt_kwargs = select_from_dict(kwargs, ["lr", "momentum", "decay", "nesterov"])
            return opt(**opt_kwargs)
        except KeyError:
            raise AttributeError(f"Given optimizer {optimizer} is not supported in this model class.")

    def _set_regularizer(self, regularizer, **kwargs):
        if regularizer is None or (isinstance(regularizer, str) and regularizer.lower() == "none"):
            return None
        try:
            reg_name = regularizer.lower()
            reg = self._regularizer[reg_name]  # index access so that unknown names raise KeyError
            reg_kwargs = {}
            if reg_name in ["l1", "l2"]:
                reg_kwargs = select_from_dict(kwargs, reg_name, remove_none=True)
                if reg_name in reg_kwargs:
                    reg_kwargs["l"] = reg_kwargs.pop(reg_name)
            elif reg_name == "l1_l2":
                reg_kwargs = select_from_dict(kwargs, ["l1", "l2"], remove_none=True)
            return reg(**reg_kwargs)
        except KeyError:
            raise AttributeError(f"Given regularizer {regularizer} is not supported in this model class.")

    def _set_dropout(self, activation, dropout_rate):
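        """
        Return the dropout layer class and rate. For "selu" activations, AlphaDropout is used instead of plain
        Dropout to preserve the self-normalizing property of selu networks.
        """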
        if dropout_rate is None:
            return None, None
        assert 0 <= dropout_rate < 1
        return self._dropout.get(activation, keras.layers.Dropout), dropout_rate

    def set_model(self):
        """
        Build the model: resolve the layer configuration into a list of (n_filter, kernel_size) tuples, then
        stack conv, pooling, and dropout layers followed by the dense tail.
        """
        if isinstance(self.layer_configuration, tuple):
            n_layer, n_hidden, kernel_size = self.layer_configuration
            if isinstance(kernel_size, list):
                assert len(kernel_size) == n_layer  # use individual kernel sizes for each layer
                conf = [(n_hidden, kernel_size[i]) for i in range(n_layer)]
            else:
                assert isinstance(kernel_size, int)  # use the same kernel size for all layers
                conf = [(n_hidden, kernel_size) for _ in range(n_layer)]
        else:
            assert isinstance(self.layer_configuration, list)
            if not isinstance(self.layer_configuration[0], tuple):
                if isinstance(self.kernel_size, list):
                    assert len(self.kernel_size) == len(self.layer_configuration)  # individual kernel sizes
                    conf = [(n_filter, self.kernel_size[i]) for i, n_filter in enumerate(self.layer_configuration)]
                else:
                    assert isinstance(self.kernel_size, int)  # use the same kernel size for all layers
                    conf = [(n_filter, self.kernel_size) for n_filter in self.layer_configuration]
            else:
                assert len(self.layer_configuration[0]) == 2
                conf = self.layer_configuration

        x_input = keras.layers.Input(shape=self._input_shape)
        x_in = x_input
        for layer, (n_filter, kernel_size) in enumerate(conf):
            if self.pooling_size is not None and self.pooling_freq > 0 and layer % self.pooling_freq == 0 and layer > 0:
                # pooling is applied before the conv layer, but never in front of the first one
                x_in = self.pooling((self.pooling_size, 1), strides=(1, 1), padding='valid')(x_in)
            x_in = keras.layers.Conv2D(filters=n_filter, kernel_size=(kernel_size, 1),
                                       kernel_initializer=self.kernel_initializer,
                                       kernel_regularizer=self.kernel_regularizer)(x_in)
            x_in = self.activation()(x_in)
            if self.dropout is not None and self.dropout_freq > 0 and layer % self.dropout_freq == 0:
                # dropout is applied after the activation, starting already in the first layer
                x_in = self.dropout(self.dropout_rate)(x_in)

        x_in = keras.layers.Flatten()(x_in)
        for layer, n_hidden in enumerate(self.dense_layer_configuration):
            if n_hidden < self._output_shape:
                # stop adding dense layers once they would be narrower than the output
                break
            x_in = keras.layers.Dense(n_hidden, name=f"Dense_{len(conf) + layer + 1}",
                                      kernel_initializer=self.kernel_initializer)(x_in)
            x_in = self.activation(name=f"{self.activation_name}_{len(conf) + layer + 1}")(x_in)
            if self.dropout is not None:
                x_in = self.dropout(self.dropout_rate)(x_in)

        x_in = keras.layers.Dense(self._output_shape)(x_in)
        out = self.activation_output(name=f"{self.activation_output_name}_output")(x_in)
        self.model = keras.Model(inputs=x_input, outputs=[out])
        print(self.model.summary())

    def set_compile_options(self):
        # self.compile_options = {"loss": [custom_loss([keras.losses.mean_squared_error, var_loss])],
        #                         "metrics": ["mse", "mae", var_loss]}
        self.compile_options = {"loss": [keras.losses.mean_squared_error],
                                "metrics": ["mse", "mae", var_loss]}


class CNN_16_32_64(CNN):
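    """
    Fixed CNN variant: three conv layers with 16, 32, and 64 filters (kernel sizes 73x1, 49x1, and 13x1), one
    25x1 max-pooling step, and two dense layers with 128 and 32 units. All other settings are inherited from CNN.
    """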

    def set_model(self):
        """
        Build the model.
        """
        x_input = keras.layers.Input(shape=self._input_shape)
        x_in = keras.layers.Conv2D(filters=16, kernel_size=(73, 1),
                                   kernel_initializer=self.kernel_initializer,
                                   kernel_regularizer=self.kernel_regularizer)(x_input)
        x_in = self.activation()(x_in)
        x_in = keras.layers.Conv2D(filters=32, kernel_size=(49, 1),
                                   kernel_initializer=self.kernel_initializer,
                                   kernel_regularizer=self.kernel_regularizer)(x_in)
        x_in = self.activation()(x_in)
        if self.dropout is not None:
            x_in = self.dropout(self.dropout_rate)(x_in)
        x_in = keras.layers.MaxPooling2D((25, 1), strides=(1, 1), padding='valid')(x_in)
        x_in = keras.layers.Conv2D(filters=64, kernel_size=(13, 1),
                                   kernel_initializer=self.kernel_initializer,
                                   kernel_regularizer=self.kernel_regularizer)(x_in)
        x_in = self.activation()(x_in)
        if self.dropout is not None:
            x_in = self.dropout(self.dropout_rate)(x_in)
        x_in = keras.layers.Flatten()(x_in)
        x_in = keras.layers.Dense(128, kernel_initializer=self.kernel_initializer,
                                  kernel_regularizer=self.kernel_regularizer)(x_in)
        x_in = self.activation()(x_in)
        x_in = keras.layers.Dense(32, kernel_initializer=self.kernel_initializer,
                                  kernel_regularizer=self.kernel_regularizer)(x_in)
        x_in = self.activation()(x_in)
        x_in = keras.layers.Dense(self._output_shape)(x_in)
        out = self.activation_output(name=f"{self.activation_output_name}_output")(x_in)
        self.model = keras.Model(inputs=x_input, outputs=[out])
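

if __name__ == "__main__":
    # Minimal smoke test (a sketch: shapes and hyperparameters below are illustrative, mirroring the docstring
    # examples above, and are not a reference configuration).
    input_shape = [(65, 1, 9)]
    output_shape = [(4, )]
    model = CNN(input_shape, output_shape, activation="relu", kernel_size=[3, 7, 11],
                layer_configuration=[24, 48, 48], dense_layer_configuration=[128, 64],
                dropout=0.3, dropout_freq=2, pooling_type="max", pooling_freq=2, pooling_size=3)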