Coverage for mlprodict/onnxrt/ops_cpu/op_random.py: 88%

113 statements  

« prev     ^ index     » next       coverage.py v7.1.0, created at 2023-02-04 02:28 +0100

1# -*- encoding: utf-8 -*- 

2# pylint: disable=E0203,E1101,C0111 

3""" 

4@file 

5@brief Runtime operator. 

6""" 

7import numpy 

8from onnx.mapping import TENSOR_TYPE_TO_NP_TYPE 

9from ._op import OpRun 

10 

11 

class _CommonRandom(OpRun):
    """
    Common methods to all random operators.

    The helpers below read the members *dtype*, *numpy_type*
    and *seed* which the subclasses define.
    """

    def __init__(self, *args, **kwargs):
        OpRun.__init__(self, *args, **kwargs)

    def _dtype(self, *data, dtype_first=False):
        """
        Selects the numpy type of the generated tensor.

        @param      data            inputs of the operator,
                                    only the first one is looked at
        @param      dtype_first     if True, *self.numpy_type* is returned
                                    when *self.dtype* is not 0, otherwise
                                    the type of the first input is returned,
                                    if False, *self.numpy_type* has priority
                                    when there is no input or when it is
                                    not None, the type of the first input
                                    is the fallback
        @return                     numpy type

        A *RuntimeError* is raised when no type can be determined.
        """
        if dtype_first:
            if self.dtype != 0:
                return self.numpy_type
            if len(data) > 0:
                return data[0].dtype
            raise RuntimeError(  # pragma: no cover
                "dtype cannot be None for operator %s, "
                "self.numpy_type=%r, len(data)=%r."
                "" % (self.__class__.__name__,
                      self.numpy_type, len(data)))
        res = None
        if len(data) == 0 or self.numpy_type is not None:
            res = self.numpy_type
        elif hasattr(data[0], 'dtype'):
            res = data[0].dtype
        if res is None:
            # data may be empty here: do not index it blindly,
            # otherwise an IndexError hides the real error.
            first_type = type(data[0]) if len(data) > 0 else None
            raise RuntimeError(  # pragma: no cover
                "dtype cannot be None for operator %s, "
                "self.numpy_type=%r, type(data[0])=%r."
                "" % (self.__class__.__name__,
                      self.numpy_type, first_type))
        return res

    def _get_state(self, seed):
        """
        Creates the random generator used to draw the values.

        @param      seed        seed, None or NaN means no seed
        @return                 instance of *numpy.random.RandomState*

        The seed is stored as a float (its default value is NaN) but
        *RandomState* only accepts integers, it is converted
        with *int* before being used.
        """
        if seed is None or numpy.isnan(seed):
            return numpy.random.RandomState()
        return numpy.random.RandomState(seed=int(seed))

52 

53 

class Bernoulli(_CommonRandom):
    """
    Runtime for operator *Bernoulli*. Every output value is drawn
    with ``binomial(1, p)`` where *p* is the corresponding
    value of the input.
    """

    # dtype=0 means the output type is the type of the input,
    # seed=nan means the generator is not seeded.
    atts = {'dtype': 0,
            'seed': numpy.nan}

    def __init__(self, onnx_node, desc=None, **options):
        _CommonRandom.__init__(self, onnx_node, desc=desc,
                               expected_attributes=Bernoulli.atts,
                               **options)
        self.numpy_type = (
            TENSOR_TYPE_TO_NP_TYPE[self.dtype] if self.dtype > 0
            else None)

    def _run(self, x, attributes=None, verbose=0, fLOG=None):  # pylint: disable=W0221
        """
        Draws one binary value for every probability in *x*.

        @param      x       probabilities
        @return             tuple with one array, same shape as *x*,
                            its type is given by attribute *dtype* if it
                            is not 0, by the type of *x* otherwise
        """
        dtype = self._dtype(x, dtype_first=True)
        state = self._get_state(self.seed)
        # binomial returns a python integer, not an array, when x
        # has no dimension: asarray makes astype available in both cases.
        res = numpy.asarray(state.binomial(1, p=x))
        return (res.astype(dtype), )

    def to_python(self, inputs):
        """
        Returns the python code equivalent to this operator.

        @param      inputs      names of the inputs, the first one
                                is inserted into the generated code
        @return                 tuple *(imports, code)*

        NOTE(review): the generated code refers to variables *dtype*
        and *seed*, presumably defined by the caller of this method,
        to be confirmed.
        """
        lines = [
            'numpy_dtype = TENSOR_TYPE_TO_NP_TYPE[dtype]',
            'state = numpy.random.RandomState(seed=seed)',
            f'return state.binomial(1, {inputs[0]}).astype(numpy_dtype)']
        return ("import numpy\nfrom numpy import nan\n"
                "from onnx.mapping import TENSOR_TYPE_TO_NP_TYPE",
                "\n".join(lines))

81 

82 

class RandomUniform(_CommonRandom):
    """
    Runtime for operator *RandomUniform*. The operator has no input,
    it fills a tensor of shape *shape* with values drawn with
    ``RandomState.rand`` then scaled by ``high - low`` and shifted
    by ``low``.
    """

    atts = {
        'dtype': 1,
        'low': 0.,
        'high': 1.,
        'seed': numpy.nan,
        'shape': [],
    }

    def __init__(self, onnx_node, desc=None, **options):
        _CommonRandom.__init__(
            self, onnx_node, desc=desc,
            expected_attributes=RandomUniform.atts, **options)
        shape_is_empty = len(self.shape) == 0
        if shape_is_empty:
            raise ValueError(  # pragma: no cover
                f"shape cannot be empty for operator {self.__class__.__name__}.")
        self.numpy_type = TENSOR_TYPE_TO_NP_TYPE[self.dtype]

    def _run(self, *args, attributes=None, verbose=0, fLOG=None):  # pylint: disable=W0221
        """
        Generates the random tensor.

        @param      args        must be empty, the operator has no input
        @return                 tuple with one array of shape *self.shape*
        """
        if args:
            raise RuntimeError(  # pragma: no cover
                f"Operator {self.__class__.__name__} cannot have inputs.")
        out_type = self._dtype(*args)
        generator = self._get_state(self.seed)
        values = generator.rand(*self.shape).astype(out_type)
        # rand draws the values in [0, 1), they are moved into [low, high)
        values *= (self.high - self.low)
        values += self.low
        return (values.astype(out_type), )

    def to_python(self, inputs):
        """
        Returns the python code equivalent to this operator.

        @param      inputs      unused, the operator has no input
        @return                 tuple *(imports, code)*
        """
        pattern = 'return (state.rand(*%r).astype(numpy.%s) * (%f - %f)) + %f'
        values = (list(self.shape), self.numpy_type,
                  self.high, self.low, self.low)
        code = "\n".join([
            'numpy_dtype = TENSOR_TYPE_TO_NP_TYPE[dtype]',
            'state = numpy.random.RandomState(seed=seed)',
            pattern % values])
        header = "import numpy\nfrom onnx.mapping import TENSOR_TYPE_TO_NP_TYPE"
        return (header, code)

119 

120 

class RandomUniformLike(_CommonRandom):
    """
    Runtime for operator *RandomUniformLike*. It fills a tensor
    which has the same shape as the input with values drawn with
    ``RandomState.rand`` then scaled by ``high - low`` and shifted
    by ``low``.
    """

    # dtype=0 means the output type is the type of the input,
    # seed=nan means the generator is not seeded.
    atts = {'low': 0.,
            'high': 1.,
            'seed': numpy.nan,
            'dtype': 0}

    def __init__(self, onnx_node, desc=None, **options):
        _CommonRandom.__init__(self, onnx_node, desc=desc,
                               expected_attributes=RandomUniformLike.atts,
                               **options)
        self.numpy_type = (
            None if self.dtype == 0 else TENSOR_TYPE_TO_NP_TYPE[self.dtype])

    def _run(self, x, attributes=None, verbose=0, fLOG=None):  # pylint: disable=W0221
        """
        Generates a random tensor with the same shape as *x*.

        @param      x       tensor, only its shape and its type are used
        @return             tuple with one array, its type is
                            *self.numpy_type* if it is not None,
                            the type of *x* otherwise
        """
        dtype = self._dtype(x)
        state = self._get_state(self.seed)
        # rand() called without any dimension returns a python float,
        # asarray makes astype available when x has no dimension.
        res = numpy.asarray(state.rand(*x.shape)).astype(dtype)
        # In-place operations keep the type of res,
        # no other conversion is needed before returning.
        res *= (self.high - self.low)
        res += self.low
        return (res, )

    def to_python(self, inputs):
        """
        Returns the python code equivalent to this operator.

        @param      inputs      inputs, if the first one has an attribute
                                *dtype*, its type and its shape are
                                written in the generated code, otherwise
                                the code uses *self.numpy_type*
                                (float32 if None) and shape ``(1, )``
        @return                 tuple *(imports, code)*
        """
        if len(inputs) > 0 and hasattr(inputs[0], 'dtype'):
            dtype = inputs[0].dtype
            shape = inputs[0].shape
        else:
            # The type is inserted after ``numpy.`` with %s: it must be
            # a dtype instance (printed as ``float32``), a class such
            # as numpy.float32 is printed ``<class 'numpy.float32'>``.
            # The test uses ``is None`` so that it does not depend
            # on the truth value of a dtype.
            dtype = numpy.dtype(
                numpy.float32 if self.numpy_type is None
                else self.numpy_type)
            shape = (1, )
        lines = [
            'numpy_dtype = TENSOR_TYPE_TO_NP_TYPE[dtype]',
            'state = numpy.random.RandomState(seed=seed)',
            'return (state.rand(*%r).astype(numpy.%s) * (%f - %f)) + %f' % (
                shape, dtype, self.high, self.low, self.low)]
        return ("import numpy\nfrom onnx.mapping import TENSOR_TYPE_TO_NP_TYPE",
                "\n".join(lines))

157 

158 

class RandomNormal(_CommonRandom):
    """
    Runtime for operator *RandomNormal*. The operator has no input,
    it fills a tensor of shape *shape* with values drawn with
    ``RandomState.randn`` then multiplied by *scale* and shifted
    by *mean*.
    """

    atts = {
        'dtype': 1,
        'mean': 0.,
        'scale': 1.,
        'seed': numpy.nan,
        'shape': [],
    }

    def __init__(self, onnx_node, desc=None, **options):
        _CommonRandom.__init__(
            self, onnx_node, desc=desc,
            expected_attributes=RandomNormal.atts, **options)
        shape_is_empty = len(self.shape) == 0
        if shape_is_empty:
            raise ValueError(  # pragma: no cover
                f"shape cannot be empty for operator {self.__class__.__name__}.")
        self.numpy_type = TENSOR_TYPE_TO_NP_TYPE[self.dtype]

    def _run(self, *args, attributes=None, verbose=0, fLOG=None):  # pylint: disable=W0221
        """
        Generates the random tensor.

        @param      args        must be empty, the operator has no input
        @return                 tuple with one array of shape *self.shape*
                                and type *self.numpy_type*
        """
        if args:
            raise RuntimeError(  # pragma: no cover
                f"Operator {self.__class__.__name__} cannot have inputs.")
        generator = self._get_state(self.seed)
        values = generator.randn(*self.shape).astype(self.numpy_type)
        # randn draws the values, they are scaled then shifted
        values *= self.scale
        values += self.mean
        return (values.astype(self.numpy_type), )

    def to_python(self, inputs):
        """
        Returns the python code equivalent to this operator.

        @param      inputs      unused, the operator has no input
        @return                 tuple *(imports, code)*
        """
        pattern = 'return (state.randn(*%r).astype(numpy.%s) * %f) + %f'
        values = (list(self.shape), self.numpy_type, self.scale, self.mean)
        code = "\n".join([
            'numpy_dtype = TENSOR_TYPE_TO_NP_TYPE[dtype]',
            'state = numpy.random.RandomState(seed=seed)',
            pattern % values])
        header = "import numpy\nfrom onnx.mapping import TENSOR_TYPE_TO_NP_TYPE"
        return (header, code)

194 

195 

class RandomNormalLike(_CommonRandom):
    """
    Runtime for operator *RandomNormalLike*. It fills a tensor
    which has the same shape as the input with values drawn with
    ``RandomState.randn`` then multiplied by *scale* and shifted
    by *mean*.
    """

    # dtype=0 means the output type is the type of the input,
    # seed=nan means the generator is not seeded.
    atts = {'dtype': 0,
            'mean': 0.,
            'scale': 1.,
            'seed': numpy.nan}

    def __init__(self, onnx_node, desc=None, **options):
        _CommonRandom.__init__(self, onnx_node, desc=desc,
                               expected_attributes=RandomNormalLike.atts,
                               **options)
        self.numpy_type = (
            None if self.dtype == 0 else TENSOR_TYPE_TO_NP_TYPE[self.dtype])

    def _run(self, x, attributes=None, verbose=0, fLOG=None):  # pylint: disable=W0221
        """
        Generates a random tensor with the same shape as *x*.

        @param      x       tensor, only its shape and its type are used
        @return             tuple with one array, its type is
                            *self.numpy_type* if it is not None,
                            the type of *x* otherwise
        """
        dtype = self._dtype(x)
        state = self._get_state(self.seed)
        # randn() called without any dimension returns a python float,
        # asarray makes astype available when x has no dimension.
        res = numpy.asarray(state.randn(*x.shape)).astype(dtype)
        # In-place operations keep the type of res,
        # no other conversion is needed before returning.
        res *= self.scale
        res += self.mean
        return (res, )

    def to_python(self, inputs):
        """
        Returns the python code equivalent to this operator.

        @param      inputs      inputs, if the first one has an attribute
                                *dtype*, its type and its shape are
                                written in the generated code, otherwise
                                the code uses *self.numpy_type*
                                (float32 if None) and shape ``(1, )``
        @return                 tuple *(imports, code)*
        """
        if len(inputs) > 0 and hasattr(inputs[0], 'dtype'):
            dtype = inputs[0].dtype
            shape = inputs[0].shape
        else:
            # The type is inserted after ``numpy.`` with %s: it must be
            # a dtype instance (printed as ``float32``), a class such
            # as numpy.float32 is printed ``<class 'numpy.float32'>``.
            # The test uses ``is None`` so that it does not depend
            # on the truth value of a dtype.
            dtype = numpy.dtype(
                numpy.float32 if self.numpy_type is None
                else self.numpy_type)
            shape = (1, )
        lines = [
            'numpy_dtype = TENSOR_TYPE_TO_NP_TYPE[dtype]',
            'state = numpy.random.RandomState(seed=seed)',
            # randn expects the dimensions as separate integers,
            # the shape must be unpacked with * in the generated call.
            'return (state.randn(*%r).astype(numpy.%s) * %f) + %f' % (
                shape, dtype, self.scale, self.mean)]
        return ("import numpy\nfrom onnx.mapping import TENSOR_TYPE_TO_NP_TYPE",
                "\n".join(lines))