Coverage for mlair/data_handler/input_bootstraps.py: 29%

135 statements  

« prev     ^ index     » next       coverage.py v6.4.2, created at 2023-06-30 10:22 +0000

1""" 

2Collections of bootstrap methods and classes. 

3 

4How to use 

5---------- 

6 

7test 

8 

9""" 

10 

11__author__ = 'Felix Kleinert, Lukas Leufen' 

12__date__ = '2020-02-07' 

13 

14 

import os
from collections.abc import Iterator, Iterable
from itertools import chain
from typing import Union, List

import numpy as np
import xarray as xr

from mlair.data_handler.abstract_data_handler import AbstractDataHandler
from mlair.helpers.helpers import to_list

25 

26 

class BootstrapIterator(Iterator):
    """
    Shared base of all bootstrap iterators.

    The base class stores the bootstrap container, the bootstrap method and the iteration state, and offers
    helpers to convert and reshape data. Subclasses have to implement `create_collection` (the items to iterate
    over) and `__next__` (how a single bootstrapped sample is built).
    """

    _position: int = None

    def __init__(self, data: "Bootstraps", method, return_reshaped=False):
        """
        Prepare the iterator.

        :param data: the Bootstraps object this iterator belongs to
        :param method: object providing an `apply(data)` method that is used to bootstrap a variable
        :param return_reshaped: if enabled, `_reshape` folds the last axis of each array into the first axis
        """
        assert isinstance(data, Bootstraps)
        self._data = data
        self._dimension = data.bootstrap_dimension
        self.boot_dim = "boots"
        self._method = method
        self._return_reshaped = return_reshaped
        self._collection = self.create_collection(self._data.data, self._dimension)
        self._position = 0

    def __next__(self):
        """Return next element or stop iteration."""
        raise NotImplementedError

    @classmethod
    def create_collection(cls, data, dim):
        """Return the collection of items to iterate over (must be implemented by subclasses)."""
        raise NotImplementedError

    def _reshape(self, d):
        """
        Fold the last axis of `d` into its first axis if reshaping is enabled.

        Lists are processed element-wise. If `return_reshaped` is disabled, `d` is returned untouched.
        """
        if not self._return_reshaped:
            return d
        if isinstance(d, list):
            return [self._reshape(element) for element in d]
        dims = d.shape
        # move the last axis to the front, then merge it with the former first axis
        return np.rollaxis(d, -1, 0).reshape(dims[0] * dims[-1], *dims[1:-1])

    def _to_numpy(self, d):
        """Return the `.values` of `d`, or a list of `.values` if `d` is a list."""
        if isinstance(d, list):
            return [self._to_numpy(element) for element in d]
        return d.values

    def apply_bootstrap_method(self, data: np.ndarray) -> Union[np.ndarray, List[np.ndarray]]:
        """
        Apply predefined bootstrap method from given data.

        :param data: data to apply bootstrap method on; if a list is given, the method is applied on the
            `.values` of each element
        :return: processed data as numpy array (or list of arrays for list input)
        """
        if isinstance(data, list):
            return [self.apply_bootstrap_method(element.values) for element in data]
        return self._method.apply(data)

77 

78 

class BootstrapIteratorSingleInput(BootstrapIterator):
    """
    Bootstrap a single variable of a single input per iteration step.

    The collection consists of all (position of input in X, variable name) pairs. Each step returns X, Y and the
    pair that has been bootstrapped.
    """

    def __next__(self):
        """
        Return next element or stop iteration.

        :return: X, Y (converted by `_to_numpy` and `_reshape`) and the tuple (index, dimension) of the
            bootstrapped variable
        """
        # Only the lookup may signal exhaustion. An IndexError raised while building the sample is a real error
        # and must not be swallowed as a regular end of iteration.
        try:
            index, dimension = self._collection[self._position]
        except IndexError:
            raise StopIteration() from None

        nboot = self._data.number_of_bootstraps
        _X, _Y = self._data.data.get_data(as_numpy=False)
        # append a trailing bootstrap dimension of length nboot to all inputs and to the target
        _X = [x.expand_dims({self.boot_dim: range(nboot)}, axis=-1) for x in _X]
        _Y = _Y.expand_dims({self.boot_dim: range(nboot)}, axis=-1)

        # bootstrap the selected variable only and merge it back into the untouched remainder
        single_variable = _X[index].sel({self._dimension: [dimension]})
        bootstrapped_variable = self.apply_bootstrap_method(single_variable.values)
        bootstrapped_data = xr.DataArray(bootstrapped_variable, coords=single_variable.coords,
                                         dims=single_variable.dims)
        _X[index] = bootstrapped_data.combine_first(_X[index]).reindex_like(_X[index])
        self._position += 1

        _X, _Y = self._to_numpy(_X), self._to_numpy(_Y)
        return self._reshape(_X), self._reshape(_Y), (index, dimension)

    @classmethod
    def create_collection(cls, data, dim):
        """
        Collect all (input position, variable) pairs.

        :param data: data handler providing `get_X(as_numpy=False)`
        :param dim: name of the dimension whose index entries are bootstrapped
        :return: list of tuples (position of input in X, entry of `dim`)
        """
        return [(i, entry) for i, x in enumerate(data.get_X(as_numpy=False)) for entry in x.indexes[dim]]

110 

111 

class BootstrapIteratorVariable(BootstrapIterator):
    """
    Bootstrap one variable in all inputs per iteration step.

    The collection consists of all variable names found in any input. In each step, the current variable is
    bootstrapped in every input that contains it.
    """

    def __next__(self):
        """
        Return next element or stop iteration.

        :return: X, Y (converted by `_to_numpy` and `_reshape`) and the tuple (None, dimension) naming the
            bootstrapped variable
        """
        # Only the lookup may signal exhaustion. An IndexError raised while building the sample is a real error
        # and must not be swallowed as a regular end of iteration.
        try:
            dimension = self._collection[self._position]
        except IndexError:
            raise StopIteration() from None

        nboot = self._data.number_of_bootstraps
        _X, _Y = self._data.data.get_data(as_numpy=False)
        # append a trailing bootstrap dimension of length nboot to all inputs and to the target
        _X = [x.expand_dims({self.boot_dim: range(nboot)}, axis=-1) for x in _X]
        _Y = _Y.expand_dims({self.boot_dim: range(nboot)}, axis=-1)

        for index, x in enumerate(_X):
            # inputs that do not contain the variable stay untouched
            if dimension in x.coords[self._dimension]:
                single_variable = x.sel({self._dimension: [dimension]})
                bootstrapped_variable = self.apply_bootstrap_method(single_variable.values)
                bootstrapped_data = xr.DataArray(bootstrapped_variable, coords=single_variable.coords,
                                                 dims=single_variable.dims)
                _X[index] = bootstrapped_data.combine_first(x).transpose(*x.dims)
        self._position += 1

        _X, _Y = self._to_numpy(_X), self._to_numpy(_Y)
        return self._reshape(_X), self._reshape(_Y), (None, dimension)

    @classmethod
    def create_collection(cls, data, dim):
        """
        Collect the unique entries of `dim` over all inputs.

        Entries are returned in order of first appearance. A plain set would yield a different order on every
        interpreter run for string entries (hash randomisation), which makes bootstrap runs hard to compare.

        :param data: data handler providing `get_X(as_numpy=False)`
        :param dim: name of the dimension whose index entries are bootstrapped
        :return: list of unique entries of `dim`
        """
        unique_entries = {}
        for x in data.get_X(as_numpy=False):
            unique_entries.update(dict.fromkeys(x.indexes[dim].to_list()))
        return list(unique_entries)

144 

145 

class BootstrapIteratorBranch(BootstrapIterator):
    """
    Bootstrap all variables of a single input (branch) per iteration step.

    The collection consists of the positions of all inputs in X. In each step, every variable of the current
    input is bootstrapped while all other inputs stay untouched.
    """

    def __next__(self):
        """
        Return next element or stop iteration.

        :return: X, Y (converted by `_to_numpy` and `_reshape`) and the tuple (None, index) holding the position
            of the bootstrapped input
        """
        # Only the lookup may signal exhaustion. An IndexError raised while building the sample is a real error
        # and must not be swallowed as a regular end of iteration.
        try:
            index = self._collection[self._position]
        except IndexError:
            raise StopIteration() from None

        nboot = self._data.number_of_bootstraps
        _X, _Y = self._data.data.get_data(as_numpy=False)
        # append a trailing bootstrap dimension of length nboot to all inputs and to the target
        _X = [x.expand_dims({self.boot_dim: range(nboot)}, axis=-1) for x in _X]
        _Y = _Y.expand_dims({self.boot_dim: range(nboot)}, axis=-1)

        # bootstrap the variables of this input one after another, each time merging the result back
        for dimension in _X[index].coords[self._dimension].values:
            single_variable = _X[index].sel({self._dimension: [dimension]})
            bootstrapped_variable = self.apply_bootstrap_method(single_variable.values)
            bootstrapped_data = xr.DataArray(bootstrapped_variable, coords=single_variable.coords,
                                             dims=single_variable.dims)
            _X[index] = bootstrapped_data.combine_first(_X[index]).transpose(*_X[index].dims)
        self._position += 1

        _X, _Y = self._to_numpy(_X), self._to_numpy(_Y)
        return self._reshape(_X), self._reshape(_Y), (None, index)

    @classmethod
    def create_collection(cls, data, dim):
        """
        Collect the positions of all inputs.

        :param data: data handler providing `get_X(as_numpy=False)`
        :param dim: not used by this iterator, kept for a uniform interface
        :return: list with one position per input in X
        """
        return list(range(len(data.get_X(as_numpy=False))))

173 

174 

class ShuffleBootstraps:
    """Bootstrap method that resamples the given values randomly."""

    @staticmethod
    def apply(data):
        """
        Draw randomly from all values of `data`.

        :param data: array to draw from
        :return: array with the shape of `data`, filled with values drawn from the flattened `data`
        """
        flat_values = data.reshape(-1, )
        return np.random.choice(flat_values, size=data.shape)

181 

182 

class MeanBootstraps:
    """Bootstrap method that replaces all values by a constant."""

    def __init__(self, mean):
        """
        Store the constant.

        :param mean: value that is used to replace the data
        """
        self._mean = mean

    def apply(self, data):
        """
        Replace all values of `data` by the stored constant.

        :param data: array to process
        :return: array with the shape of `data` where each entry is one times the stored constant
        """
        ones = np.ones_like(data)
        return ones * self._mean

190 

191 

class Bootstraps(Iterable):
    """
    Main class to perform bootstrap operations.

    This class requires a data handler following the definition of the AbstractDataHandler, the number of bootstraps
    to create and the dimension along which bootstrapping is performed (default dimension is `variables`).

    When iterating on this class, it returns the bootstrapped X, Y and a tuple with (position of variable in X, name of
    this variable). The tuple is interesting if X consists of multiple input streams X_i (e.g. two or more stations)
    because it shows which variable of which input X_i has been bootstrapped. All bootstrap combinations can be
    retrieved by calling the .bootstraps() method. Furthermore, by calling the .get_orig_prediction() this class
    imitates according to the set number of bootstraps the original prediction.

    As bootstrap method, this class can currently make use of the ShuffleBootstraps class that uses drawing with
    replacement to destroy the variable's information by keeping its statistical properties. Use `bootstrap="shuffle"`
    to call this method. Another method is the zero mean bootstrapping triggered by `bootstrap="zero_mean"` and
    performed by the MeanBootstraps class. This method destroys the variable's information by a mode collapse to a
    constant value of zero. In case the variable is normalized with a zero mean, this is equivalent to a mode collapse
    to the variable's mean value. Statistics in general are not conserved in this case, but the mean value of course.
    A custom mean value for bootstrapping is currently not supported.
    """

    def __init__(self, data: AbstractDataHandler, number_of_bootstraps: int = 10,
                 bootstrap_dimension: str = "variables", bootstrap_type="singleinput", bootstrap_method="shuffle"):
        """
        Create iterable class to be ready to iter.

        :param data: a data generator object to get data / history
        :param number_of_bootstraps: the number of bootstrap realisations (only used for method "shuffle", any
            other method uses a single realisation)
        :param bootstrap_dimension: name of the dimension to bootstrap along
        :param bootstrap_type: one of "singleinput", "branch" or "variable"; any other value falls back to
            "singleinput"
        :param bootstrap_method: one of "shuffle" or "zero_mean"; for any other value no method is set (None)
        """
        self.data = data
        self.number_of_bootstraps = number_of_bootstraps if bootstrap_method == "shuffle" else 1
        self.bootstrap_dimension = bootstrap_dimension
        self.bootstrap_method = {"shuffle": ShuffleBootstraps(),
                                 "zero_mean": MeanBootstraps(mean=0)}.get(
            bootstrap_method)  # todo adjust number of bootstraps if mean bootstrapping
        self.BootstrapIterator = {"singleinput": BootstrapIteratorSingleInput,
                                  "branch": BootstrapIteratorBranch,
                                  "variable": BootstrapIteratorVariable}.get(bootstrap_type,
                                                                             BootstrapIteratorSingleInput)

    def __iter__(self):
        """Return a fresh iterator of the selected bootstrap type."""
        return self.BootstrapIterator(self, self.bootstrap_method)

    def __len__(self):
        """Return the number of bootstrap combinations."""
        return len(self.bootstraps())

    def bootstraps(self):
        """Return all bootstrap combinations as created by the selected iterator's `create_collection`."""
        return self.BootstrapIterator.create_collection(self.data, self.bootstrap_dimension)

    def get_orig_prediction(self, path: str, file_name: str, prediction_name: str = "CNN") -> np.ndarray:
        """
        Repeat predictions from given file(_name) in path by the number of boots.

        :param path: path to file
        :param file_name: file name
        :param prediction_name: name of the prediction to select from loaded file (default CNN)
        :return: repeated predictions, rows containing any NaN are removed
        """
        file = os.path.join(path, file_name)
        # use the data array as context manager so that the underlying file is closed again; np.tile creates an
        # in-memory copy before the file is released
        with xr.open_dataarray(file) as stored:
            prediction = stored.sel(type=prediction_name).squeeze()
            vals = np.tile(prediction.data, (self.number_of_bootstraps, 1))
        return vals[~np.isnan(vals).any(axis=1), :]