Coverage for mlprodict/onnxrt/validate/validate_latency.py: 88%

96 statements  

« prev     ^ index     » next       coverage.py v7.1.0, created at 2023-02-04 02:28 +0100

1""" 

2@file 

3@brief Command line about validation of prediction runtime. 

4""" 

5import os 

6from collections import OrderedDict 

7import json 

8import numpy 

9from onnx import TensorProto 

10from pandas import DataFrame 

11from .. import OnnxInference 

12from ..ops_whole.session import OnnxWholeSession 

13 

14 

15def _random_input(typ, shape, batch): 

16 if typ in ('tensor(double)', TensorProto.DOUBLE): # pylint: disable=E1101 

17 dtype = numpy.float64 

18 elif typ in ('tensor(float)', TensorProto.FLOAT): # pylint: disable=E1101 

19 dtype = numpy.float32 

20 else: 

21 raise NotImplementedError( 

22 f"Unable to guess dtype from {typ!r}.") 

23 

24 if len(shape) <= 1: 

25 new_shape = shape 

26 elif shape[0] in (None, 0): 

27 new_shape = tuple([batch] + list(shape[1:])) 

28 else: 

29 new_shape = shape 

30 return numpy.random.randn(*new_shape).astype(dtype) 

31 

32 

33def random_feed(inputs, batch=10, empty_dimension=1): 

34 """ 

35 Creates a dictionary of random inputs. 

36 

37 :param batch: dimension to use as batch dimension if unknown 

38 :param empty_dimension: if a dimension is null, replaces it by this value 

39 :return: dictionary 

40 """ 

41 res = OrderedDict() 

42 for inp in inputs: 

43 name = inp.name 

44 if hasattr(inp.type, 'tensor_type'): 

45 typ = inp.type.tensor_type.elem_type 

46 shape = tuple(getattr(d, 'dim_value', batch) 

47 for d in inp.type.tensor_type.shape.dim) 

48 shape = (shape[0], ) + tuple( 

49 b if b > 0 else empty_dimension for b in shape[1:]) 

50 else: 

51 typ = inp.type 

52 shape = inp.shape 

53 res[name] = _random_input(typ, shape, batch) 

54 return res 

55 

56 

57def latency(model, law='normal', size=1, number=10, repeat=10, max_time=0, 

58 runtime="onnxruntime", device='cpu', profiling=None): 

59 """ 

60 Measures the latency of a model (python API). 

61 

62 :param model: ONNX graph 

63 :param law: random law used to generate fake inputs 

64 :param size: batch size, it replaces the first dimension 

65 of every input if it is left unknown 

66 :param number: number of calls to measure 

67 :param repeat: number of times to repeat the experiment 

68 :param max_time: if it is > 0, it runs as many time during 

69 that period of time 

70 :param runtime: available runtime 

71 :param device: device, `cpu`, `cuda:0` 

72 :param profiling: if True, profile the execution of every 

73 node, if can be sorted by name or type, 

74 the value for this parameter should e in `(None, 'name', 'type')`, 

75 :return: dictionary or a tuple (dictionary, dataframe) 

76 if the profiling is enable 

77 

78 .. cmdref:: 

79 :title: Measures model latency 

80 :cmd: -m mlprodict latency --help 

81 :lid: l-cmd-latency 

82 

83 The command generates random inputs and call many times the 

84 model on these inputs. It returns the processing time for one 

85 iteration. 

86 

87 Example:: 

88 

89 python -m mlprodict latency --model "model.onnx" 

90 """ 

91 from cpyquickhelper.numbers import measure_time # delayed import 

92 

93 if isinstance(model, str) and not os.path.exists(model): 

94 raise FileNotFoundError( # pragma: no cover 

95 f"Unable to find model {model!r}.") 

96 if profiling not in (None, '', 'name', 'type'): 

97 raise ValueError( 

98 f"Unexpected value for profiling: {profiling!r}.") 

99 size = int(size) 

100 number = int(number) 

101 repeat = int(repeat) 

102 if max_time in (None, 0, ""): 

103 max_time = None 

104 else: 

105 max_time = float(max_time) 

106 if max_time <= 0: 

107 max_time = None 

108 

109 if law != "normal": 

110 raise ValueError( 

111 f"Only law='normal' is supported, not {law!r}.") 

112 

113 if device in ('cpu', 'CPUExecutionProviders'): 

114 providers = ['CPUExecutionProviders'] 

115 elif device in ('cuda:0', 'CUDAExecutionProviders'): 

116 if runtime != 'onnxruntime': 

117 raise NotImplementedError( # pragma: no cover 

118 "Only runtime 'onnxruntime' supports this device or provider " 

119 "%r." % device) 

120 providers = ['CUDAExecutionProviders'] 

121 elif ',' in device: 

122 from onnxruntime import get_all_providers # delayed import 

123 if runtime != 'onnxruntime': 

124 raise NotImplementedError( # pragma: no cover 

125 "Only runtime 'onnxruntime' supports this device or provider " 

126 "%r." % device) 

127 providers = device.split(',') 

128 allp = set(get_all_providers()) 

129 for p in providers: 

130 if p not in allp: 

131 raise ValueError( 

132 f"One device or provider {p!r} is not supported among {allp!r}.") 

133 else: 

134 raise ValueError( # pragma no cover 

135 f"Device {device!r} not supported.") 

136 

137 if runtime in ("onnxruntime", "onnxruntime-cuda"): 

138 from onnxruntime import InferenceSession, SessionOptions # delayed import 

139 providers = ['CPUExecutionProvider'] 

140 if runtime == "onnxruntime-cuda": 

141 providers = ['CUDAExecutionProvider'] + providers 

142 if profiling in ('name', 'type'): 

143 so = SessionOptions() 

144 so.enable_profiling = True 

145 sess = InferenceSession( 

146 model, sess_options=so, providers=providers) 

147 else: 

148 sess = InferenceSession(model, providers=providers) 

149 fct = lambda feeds: sess.run(None, feeds) 

150 inputs = sess.get_inputs() 

151 else: 

152 if profiling in ('name', 'type'): 

153 runtime_options = {"enable_profiling": True} 

154 if runtime != 'onnxruntime1': 

155 raise NotImplementedError( # pragma: no cover 

156 f"Profiling is not implemented for runtime={runtime!r}.") 

157 else: 

158 runtime_options = None 

159 oinf = OnnxInference(model, runtime=runtime, 

160 runtime_options=runtime_options) 

161 fct = lambda feeds: oinf.run(feeds) 

162 inputs = oinf.obj.graph.input 

163 

164 feeds = random_feed(inputs, size) 

165 res = measure_time( 

166 lambda: fct(feeds), number=number, repeat=repeat, context={}, 

167 max_time=max_time, div_by_number=True) 

168 for k, v in feeds.items(): 

169 res[f"shape({k})"] = "x".join(map(str, v.shape)) 

170 if profiling in ('name', 'type'): 

171 if runtime == 'onnxruntime': 

172 profile_name = sess.end_profiling() 

173 with open(profile_name, 'r', encoding='utf-8') as f: 

174 js = json.load(f) 

175 js = OnnxWholeSession.process_profiling(js) 

176 df = DataFrame(js) 

177 else: 

178 df = oinf.get_profiling(as_df=True) 

179 if profiling == 'name': 

180 gr = df[['dur', "args_op_name", "name"]].groupby( 

181 ["args_op_name", "name"]).sum().sort_values('dur') 

182 else: 

183 gr = df[['dur', "args_op_name"]].groupby( 

184 "args_op_name").sum().sort_values('dur') 

185 return res, gr 

186 

187 return res