microsoft/onnxruntime-extensions

Public

mirrored fromhttps://github.com/microsoft/onnxruntime-extensionsAvailable

CodeCommitsIssuesPull requestsActionsInsightsSecurity
natke-patch-1

Branches

Tags

  • No tags available.
0Branches0Tags
Go to file
Add file
Code

Clone

HTTPS

Download ZIP

onnxruntime_extensions/tools/pre_post_processing/steps/vision.py

983lines · modecode

1# Copyright (c) Microsoft Corporation. All rights reserved.
2# Licensed under the MIT License.
3
4import onnx
5import numpy as np
6
7from typing import List, Optional, Tuple, Union
8from ..step import Step
9from .general import Transpose
10
11#
12# Image conversion
13#
14
15
class ConvertImageToBGR(Step):
    """
    Decode the bytes of an encoded image into BGR ordered uint8 values.
    Supported input formats: jpg, png
    Input shape: {num_encoded_bytes}
    Output shape: {input_image_height, input_image_width, 3}
    """

    def __init__(self, name: Optional[str] = None):
        """
        Args:
            name: Optional name of step. Defaults to 'ConvertImageToBGR'

        NOTE: Input image format is inferred and does not need to be specified.
        """
        super().__init__(["image"], ["bgr_data"], name)

    def _create_graph_for_step(self, graph: onnx.GraphProto, onnx_opset: int):
        """Create the graph containing the DecodeImage custom op applied to the uint8 input."""
        encoded_type, encoded_shape = self._get_input_type_and_shape_strs(graph, 0)
        assert encoded_type == "uint8"

        src = self.input_names[0]
        dst = self.output_names[0]

        # Decoded height/width are symbolic dims. The names include the step number to try and avoid clashes.
        dim_prefix = f"to_bgr_ppp_{self.step_num}"
        decoded_shape = f"{dim_prefix}_h, {dim_prefix}_w, 3"

        return onnx.parser.parse_graph(
            f"""\
            image_to_bgr (uint8[{encoded_shape}] {src})
                => (uint8[{decoded_shape}] {dst})
            {{
                {dst} = com.microsoft.extensions.DecodeImage({src})
            }}
            """
        )
49
50
class ConvertBGRToImage(Step):
    """
    Encode BGR ordered uint8 data as an image.
    Supported output formats: jpg, png
    Input shape: {input_image_height, input_image_width, 3}
    Output shape: {num_encoded_bytes}
    """

    def __init__(self, image_format: str = "jpg", name: Optional[str] = None):
        """
        Args:
            image_format: Format to encode to. jpg and png are supported.
            name: Optional step name. Defaults to 'ConvertBGRToImage'
        """
        super().__init__(["bgr_data"], ["image"], name)
        assert image_format in ("jpg", "png")
        self._format = image_format

    def _create_graph_for_step(self, graph: onnx.GraphProto, onnx_opset: int):
        """Create the graph containing the EncodeImage custom op with its `format` attribute set."""
        pixel_type, pixel_shape = self._get_input_type_and_shape_strs(graph, 0)
        assert pixel_type == "uint8"

        src = self.input_names[0]
        dst = self.output_names[0]
        encoded_shape = f"to_image_ppp_{self.step_num}_num_bytes"

        encoder_graph = onnx.parser.parse_graph(
            f"""\
            bgr_to_image (uint8[{pixel_shape}] {src})
                => (uint8[{encoded_shape}] {dst})
            {{
                {dst} = com.microsoft.extensions.EncodeImage ({src})
            }}
            """
        )

        # EncodeImage is a custom op so parse_graph has no schema for it and fails attempting to validate
        # the attribute. Add `format` to the node directly instead.
        encode_format = encoder_graph.node[0].attribute.add()
        encode_format.name = "format"
        encode_format.type = onnx.AttributeProto.AttributeType.STRING
        encode_format.s = self._format.encode("utf-8")

        return encoder_graph
92
93
class PixelsToYCbCr(Step):
    """
    Convert RGB or BGR pixel data to YCbCr format.
    Input shape: {height, width, 3}. Input must be uint8.
    Outputs: Y, Cb and Cr, each with shape {height, width}.
    Output data is float, but rounded and clipped to the range 0..255 as per the spec for YCbCr conversion.
    """

    def __init__(self, layout: str = "BGR", name: Optional[str] = None):
        """
        Args:
            layout: Input data layout. Can be 'BGR' or 'RGB'
            name: Optional step name. Defaults to 'PixelsToYCbCr'
        """
        super().__init__(["pixels"], ["Y", "Cb", "Cr"], name)
        assert layout == "RGB" or layout == "BGR"
        self._layout = layout

    def _create_graph_for_step(self, graph: onnx.GraphProto, onnx_opset: int):
        """
        Create the graph that converts uint8 HWC pixels to separate float Y, Cb and Cr planes.

        Args:
            graph: Graph the step is being added to. Used to look up the type and shape of this step's input.
            onnx_opset: ONNX opset in use. Split requires `num_outputs` from opset 18.

        Returns:
            onnx.GraphProto for the conversion.
        """
        input_type_str, input_shape_str = self._get_input_type_and_shape_strs(graph, 0)

        # input should be uint8 data HWC. strip whitespace so a shape string of 'h, w, 3' is handled as well as 'h,w,3'
        input_dims = [dim.strip() for dim in input_shape_str.split(",")]
        assert input_type_str == "uint8" and len(input_dims) == 3 and input_dims[2] == "3"

        # https://en.wikipedia.org/wiki/YCbCr
        # exact weights from https://www.itu.int/rec/T-REC-T.871-201105-I/en
        rgb_weights = np.array([[0.299, 0.587, 0.114],
                                [-0.299 / 1.772, -0.587 / 1.772, 0.500],
                                [0.500, -0.587 / 1.402, -0.114 / 1.402]],
                               dtype=np.float32)  # fmt: skip

        if self._layout == "RGB":
            layout_weights = rgb_weights
        else:
            layout_weights = rgb_weights[:, ::-1]  # reverse the order of the last dim for BGR input

        # Weights are transposed for usage in matmul.
        weights_shape = "3, 3"
        weights = ",".join(str(w) for w in layout_weights.T.flatten())

        bias_shape = "3"
        bias = ",".join(str(b) for b in [0.0, 128.0, 128.0])

        # each output is {h, w}. use symbolic names that include the step number for the output dims.
        output_shape_str = f"YCbCr_ppp_{self.step_num}_h, YCbCr_ppp_{self.step_num}_w"

        split_attr = "axis = -1"
        if onnx_opset >= 18:
            # Split now requires the number of outputs to be specified even though that can be easily inferred...
            split_attr += ", num_outputs = 3"

        # convert to float for MatMul
        # apply weights and bias
        # round and clip so it's in the range 0..255
        # split into channels. shape will be {h, w, 1}
        # remove the trailing '1' so output is {h, w}
        converter_graph = onnx.parser.parse_graph(
            f"""\
            pixels_to_YCbCr (uint8[{input_shape_str}] {self.input_names[0]})
                => (float[{output_shape_str}] {self.output_names[0]},
                    float[{output_shape_str}] {self.output_names[1]},
                    float[{output_shape_str}] {self.output_names[2]})
            {{
                kWeights = Constant <value = float[{weights_shape}] {{{weights}}}> ()
                kBias = Constant <value = float[{bias_shape}] {{{bias}}}> ()
                i64_neg1 = Constant <value = int64[1] {{-1}}> ()
                f_0 = Constant <value = float[1] {{0.0}}> ()
                f_255 = Constant <value = float[1] {{255.0}}> ()

                f_pixels = Cast <to = 1> ({self.input_names[0]})
                f_weighted = MatMul(f_pixels, kWeights)
                f_biased = Add(f_weighted, kBias)
                f_rounded = Round(f_biased)
                f_clipped = Clip (f_rounded, f_0, f_255)
                split_Y, split_Cb, split_Cr = Split <{split_attr}>(f_clipped)
                {self.output_names[0]} = Squeeze (split_Y, i64_neg1)
                {self.output_names[1]} = Squeeze (split_Cb, i64_neg1)
                {self.output_names[2]} = Squeeze (split_Cr, i64_neg1)
            }}
            """
        )

        return converter_graph
181
182
class YCbCrToPixels(Step):
    """
    Convert YCbCr input to RGB or BGR.

    Inputs are the separate Y, Cb and Cr planes, each with shape {height, width}.
    Input data can be uint8 or float but all inputs must use the same type.
    Output is uint8 with shape {height, width, 3}.
    """

    def __init__(self, layout: str = "BGR", name: Optional[str] = None):
        """
        Args:
            layout: Output layout. Can be 'BGR' or 'RGB'
            name: Optional step name. Defaults to 'YCbCrToPixels'
        """
        super().__init__(["Y", "Cb", "Cr"], ["bgr_data"], name)
        assert layout == "RGB" or layout == "BGR"
        self._layout = layout

    def _create_graph_for_step(self, graph: onnx.GraphProto, onnx_opset: int):
        """Create the graph that merges the Y, Cb and Cr planes and converts them to uint8 pixels."""
        (y_type, y_shape), (cb_type, cb_shape), (cr_type, cr_shape) = [
            self._get_input_type_and_shape_strs(graph, idx) for idx in range(3)
        ]

        # all three planes must share one type, and that type must be uint8 or float
        assert y_type in ("uint8", "float") and y_type == cb_type == cr_type

        # every plane is rank 2: {h, w}
        assert all(len(shape.split(",")) == 2 for shape in (y_shape, cb_shape, cr_shape))

        pixels_shape = f"{y_shape}, 3"

        # fmt: off
        # https://en.wikipedia.org/wiki/YCbCr
        # exact weights from https://www.itu.int/rec/T-REC-T.871-201105-I/en
        to_rgb = np.array([[1, 0, 1.402],
                           [1, -0.114*1.772/0.587, -0.299*1.402/0.587],
                           [1, 1.772, 0]],
                          dtype=np.float32)
        # fmt: on

        if self._layout == "BGR":
            # reverse first dim of weights for output to be bgr
            layout_weights = to_rgb[::-1, :]
        else:
            layout_weights = to_rgb

        # transpose weights for use in matmul
        weights_shape = "3, 3"
        weights_values = ",".join(str(w) for w in layout_weights.T.flatten())

        bias_shape = "3"
        bias_values = ",".join(str(b) for b in [0.0, 128.0, 128.0])

        y_name, cb_name, cr_name = self.input_names[0], self.input_names[1], self.input_names[2]
        pixels_name = self.output_names[0]

        # unsqueeze the {h, w} inputs to add channels dim. new shape is {h, w, 1}
        # merge Y, Cb, Cr data on the new channel axis
        # convert to float to apply weights etc.
        # remove bias
        # apply weights
        # round and clip to 0..255
        # convert to uint8.
        return onnx.parser.parse_graph(
            f"""\
            YCbCr_to_RGB ({y_type}[{y_shape}] {y_name},
                          {cb_type}[{cb_shape}] {cb_name},
                          {cr_type}[{cr_shape}] {cr_name})
                => (uint8[{pixels_shape}] {pixels_name})
            {{
                kWeights = Constant <value = float[{weights_shape}] {{{weights_values}}}> ()
                kBias = Constant <value = float[{bias_shape}] {{{bias_values}}}> ()
                f_0 = Constant <value = float[1] {{0.0}}> ()
                f_255 = Constant <value = float[1] {{255.0}}> ()
                i64_neg1 = Constant <value = int64[1] {{-1}}> ()

                Y1 = Unsqueeze({y_name}, i64_neg1)
                Cb1 = Unsqueeze({cb_name}, i64_neg1)
                Cr1 = Unsqueeze({cr_name}, i64_neg1)
                YCbCr = Concat <axis = -1> (Y1, Cb1, Cr1)
                f_YCbCr = Cast <to = 1> (YCbCr)
                f_unbiased = Sub (f_YCbCr, kBias)
                f_pixels = MatMul (f_unbiased, kWeights)
                f_rounded = Round (f_pixels)
                clipped = Clip (f_rounded, f_0, f_255)
                {pixels_name} = Cast <to = {onnx.TensorProto.UINT8}> (clipped)
            }}
            """
        )
275
276
277#
278# Pre-processing
279#
class Resize(Step):
    """
    Resize input data. Aspect ratio is maintained.
    e.g. with the default 'not_smaller' policy, if image is 1200 x 600 and 300 x 300 is requested the result
    will be 600 x 300
    """

    def __init__(self, resize_to: Union[int, Tuple[int, int]], layout: str = "HWC",
                 policy: str = "not_smaller", name: Optional[str] = None):
        """
        Args:
            resize_to: Target size. Can be a single value or a tuple with (target_height, target_width).
                       The aspect ratio will be maintained. With the default policy neither height or width in
                       the result will be smaller than the requested value.
            layout: Input layout. 'NCHW', 'NHWC', 'CHW', 'HWC' and 'HW' are supported.
            policy: not_smaller (default)
                        the sizes are adjusted so that no extent of the output is smaller than the specified size,
                        while keeping the original aspect ratio
                    not_larger
                        the sizes are adjusted so that no extent of the output is larger than the specified size,
                        while keeping the original aspect ratio.
                    Please refer to https://github.com/onnx/onnx/blob/main/docs/Operators.md#Resize for more details.
            name: Optional name. Defaults to 'Resize'
        """
        super().__init__(["image"], ["resized_image"], name)
        if isinstance(resize_to, int):
            self._height = self._width = resize_to
        else:
            assert isinstance(resize_to, tuple)
            self._height, self._width = resize_to

        self._layout = layout
        self.policy_ = policy

    def _create_graph_for_step(self, graph: onnx.GraphProto, onnx_opset: int):
        """
        Create the graph that computes the aspect-ratio preserving output size and runs Resize.

        Args:
            graph: Graph the step is being added to. Used to look up the type and shape of this step's input.
            onnx_opset: ONNX opset in use. From opset 18 antialiasing is enabled and Split needs `num_outputs`.

        Returns:
            onnx.GraphProto for the resize.

        Raises:
            ValueError: If the layout or the resize policy is not supported.
        """
        input_type_str, input_shape_str = self._get_input_type_and_shape_strs(graph, 0)
        dims = input_shape_str.split(",")

        # adjust for layout
        # resize will use the largest ratio so both sides won't necessarily match the requested height and width.
        # use symbolic names for the output dims as we have to provide values. prefix the names to try and
        # avoid any clashes.
        add_batch_dim = False

        if self._layout == "NHWC":
            assert len(dims) == 4
            split_str = "n, h, w, c"
            sizes_str = "n, h2, w2, c"
            output_shape_str = f"{dims[0]}, resize_ppp_{self.step_num}_h, resize_ppp_{self.step_num}_w, {dims[-1]}"
        elif self._layout == "NCHW":
            assert len(dims) == 4
            split_str = "n, c, h, w"
            sizes_str = "n, c, h2, w2"
            output_shape_str = f"{dims[0]}, {dims[1]}, resize_ppp_{self.step_num}_h, resize_ppp_{self.step_num}_w"
        elif self._layout == "HWC":
            assert len(dims) == 3
            add_batch_dim = True
            split_str = "h, w, c"
            sizes_str = "h2, w2, c"
            output_shape_str = f"resize_ppp_{self.step_num}_h, resize_ppp_{self.step_num}_w, {dims[-1]}"
        elif self._layout == "CHW":
            assert len(dims) == 3
            add_batch_dim = True
            split_str = "c, h, w"
            sizes_str = "c, h2, w2"
            output_shape_str = f"{dims[0]}, resize_ppp_{self.step_num}_h, resize_ppp_{self.step_num}_w"
        elif self._layout == "HW":
            assert len(dims) == 2
            split_str = "h, w"
            sizes_str = "h2, w2"
            output_shape_str = f"resize_ppp_{self.step_num}_h, resize_ppp_{self.step_num}_w"
        else:
            raise ValueError(f"Unsupported layout of {self._layout}")

        # Resize-18 has the attribute "not_larger/not_smaller" to specify the resize policy, however
        # we want to support older opsets as well.
        # NOTE: this was previously `assert (condition, message)`, which asserts a non-empty tuple and so could
        # never fail. Raise explicitly so an unknown policy isn't silently treated as 'not_smaller'.
        if self.policy_ not in ("not_smaller", "not_larger"):
            raise ValueError(
                f"Unsupported resize policy of {self.policy_}, must be 'not_smaller' or 'not_larger'")

        # not_smaller scales by the largest ratio, not_larger by the smallest.
        ratio_resize_func = "ReduceMin" if self.policy_ == "not_larger" else "ReduceMax"

        # TODO: Make this configurable. Matching PIL resize for now.
        resize_attributes = 'mode = "linear", nearest_mode = "floor"'
        if onnx_opset >= 18:
            # Resize matches PIL better if antialiasing is used, but that isn't available until ONNX opset 18.
            # Allow this to be used with older opsets as well.
            resize_attributes += ', antialias = 1'

        u64_1_str = ""

        # Rank 3 input uses trilinear interpolation, so if input is HWC or CHW we need to add a temporary batch dim
        # to make it rank 4, which will result in Resize using the desired bilinear interpolation.
        if add_batch_dim:
            u64_1_str = "u64_1 = Constant <value = int64[1] {1}> ()"
            sizes_str = "u64_1, " + sizes_str
            resize_str = \
                f"""\
                axes = Constant <value = int64[1] {{0}}> ()
                unsqueezed = Unsqueeze ({self.input_names[0]}, axes)
                resized = Resize <{resize_attributes}> (unsqueezed, , , sizes_resize)
                {self.output_names[0]} = Squeeze (resized, axes)
                """
        else:
            resize_str = \
                f"{self.output_names[0]} = Resize <{resize_attributes}> ({self.input_names[0]}, , , sizes_resize)"

        split_input_shape_attr = "axis = 0"
        split_new_sizes_attr = "axis = 0"
        if onnx_opset >= 18:
            # Split now requires the number of outputs to be specified even though that can be easily inferred...
            split_input_shape_attr += f", num_outputs = {len(dims)}"
            split_new_sizes_attr += ", num_outputs = 2"

        resize_graph = onnx.parser.parse_graph(
            f"""\
            resize ({input_type_str}[{input_shape_str}] {self.input_names[0]}) =>
                ({input_type_str}[{output_shape_str}] {self.output_names[0]})
            {{
                target_size = Constant <value = float[2] {{{float(self._height)}, {float(self._width)}}}> ()
                image_shape = Shape ({self.input_names[0]})
                {split_str} = Split <{split_input_shape_attr}> (image_shape)
                hw = Concat <axis = 0> (h, w)
                f_hw = Cast <to = 1> (hw)
                ratios = Div (target_size, f_hw)
                ratio_resize = {ratio_resize_func} (ratios)
                f_hw2_exact = Mul (f_hw, ratio_resize)
                f_hw2_round = Round (f_hw2_exact)
                hw2 = Cast <to = 7> (f_hw2_round)
                h2, w2 = Split <{split_new_sizes_attr}> (hw2)
                {u64_1_str}
                sizes_resize = Concat <axis = 0> ({sizes_str})
                {resize_str}
            }}
            """
        )

        return resize_graph
417
418
class CenterCrop(Step):
    """
    Take a centered crop of the requested dimensions from the input.
    Currently only HWC input is handled.
    """

    def __init__(self, height: int, width: int, name: Optional[str] = None):
        """
        Args:
            height: Height of area to crop.
            width: Width of area to crop.
            name: Optional step name. Defaults to 'CenterCrop'
        """
        super().__init__(["image"], ["cropped_image"], name)
        self._height = height
        self._width = width

    def _create_graph_for_step(self, graph: onnx.GraphProto, onnx_opset: int):
        """Create the graph that slices the centered {height, width} region out of the first two dims."""
        image_type, image_shape = self._get_input_type_and_shape_strs(graph, 0)
        src = self.input_names[0]
        dst = self.output_names[0]

        # the last (channels) dim is carried over from the input unchanged
        channels_dim = image_shape.split(",")[-1]
        cropped_shape = f"{self._height}, {self._width}, {channels_dim}"

        # start = (input hw - target hw) / 2, end = start + target hw, sliced on axes 0 and 1
        return onnx.parser.parse_graph(
            f"""\
            crop ({image_type}[{image_shape}] {src})
                => ({image_type}[{cropped_shape}] {dst})
            {{
                target_crop = Constant <value = int64[2] {{{self._height}, {self._width}}}> ()
                i64_2 = Constant <value = int64[1] {{2}}> ()
                axes = Constant <value = int64[2] {{0, 1}}> ()
                x_shape = Shape ({src})
                hw = Gather (x_shape, axes)
                hw_diff = Sub (hw, target_crop)
                start_xy = Div (hw_diff, i64_2)
                end_xy = Add (start_xy, target_crop)
                {dst} = Slice ({src}, start_xy, end_xy, axes)
            }}
            """
        )
460
461
class Normalize(Step):
    """
    Normalize input data on a per-channel basis.
        `x -> (x - mean) / stddev`
    Output is float with same shape as input.
    """

    def __init__(self, normalization_values: List[Tuple[float, float]], layout: str = "CHW", name: Optional[str] = None):
        """
        Args:
            normalization_values: List of (mean, stddev) tuples. One entry per channel.
                                  If single entry is provided it will be used for all channels.
                                  The list passed in is not modified.
            layout: Input layout. Can be 'CHW' or 'HWC'
            name: Optional step name. Defaults to 'Normalize'
        """
        super().__init__(["data"], ["normalized_data"], name)

        # duplicate for each channel if needed.
        # NOTE: `normalization_values *= 3` would extend the caller's list in place, so build a new sequence instead.
        if len(normalization_values) == 1:
            normalization_values = normalization_values * 3

        assert len(normalization_values) == 3
        self._normalization_values = normalization_values
        assert layout == "HWC" or layout == "CHW"
        self._hwc_layout = layout == "HWC"

    def _create_graph_for_step(self, graph: onnx.GraphProto, onnx_opset: int):
        """
        Create the graph that casts the input to float, subtracts the per-channel mean and divides by the stddev.

        Args:
            graph: Graph the step is being added to. Used to look up the type and shape of this step's input.
            onnx_opset: ONNX opset in use. Not used by this step.

        Returns:
            onnx.GraphProto for the normalization. It is validated with onnx.checker.check_graph before returning.
        """
        means = [channel_values[0] for channel_values in self._normalization_values]
        stddevs = [channel_values[1] for channel_values in self._normalization_values]

        input_type_str, input_shape_str = self._get_input_type_and_shape_strs(graph, 0)

        # HWC: channels are the last dim so a {3} constant broadcasts directly.
        # CHW: use {3, 1, 1} so the values broadcast across h and w.
        values_shape = "3" if self._hwc_layout else "3, 1, 1"

        normalize_graph = onnx.parser.parse_graph(
            f"""\
            normalize ({input_type_str}[{input_shape_str}] {self.input_names[0]})
                => (float[{input_shape_str}] {self.output_names[0]})
            {{
                kMean = Constant <value = float[{values_shape}] {{{means[0]}, {means[1]}, {means[2]}}}> ()
                kStddev = Constant <value = float[{values_shape}] {{{stddevs[0]}, {stddevs[1]}, {stddevs[2]}}}> ()
                f_input = Cast <to = 1> ({self.input_names[0]})
                f_sub_mean = Sub (f_input, kMean)
                {self.output_names[0]} = Div (f_sub_mean, kStddev)
            }}
            """
        )

        onnx.checker.check_graph(normalize_graph)
        return normalize_graph
515
516
517#
518# Utilities
519#
class ImageBytesToFloat(Step):
    """
    Scale uint8 or float values in range 0..255 to floating point values in range 0..1
    """

    def __init__(self, name: Optional[str] = None):
        """
        Args:
            name: Optional step name. Defaults to 'ImageBytesToFloat'
        """
        super().__init__(["data"], ["float_data"], name)

    def _create_graph_for_step(self, graph: onnx.GraphProto, onnx_opset: int):
        """Create the graph that converts the input to float (if needed) and divides it by 255."""
        data_type, data_shape = self._get_input_type_and_shape_strs(graph, 0)
        src = self.input_names[0]
        dst = self.output_names[0]

        if data_type == "uint8":
            to_float_node = f"input_f = Cast <to = 1> ({src})"
        else:
            # no-op that optimizer will remove
            to_float_node = f"input_f = Identity ({src})"

        scale_graph = onnx.parser.parse_graph(
            f"""\
            byte_to_float ({data_type}[{data_shape}] {src})
                => (float[{data_shape}] {dst})
            {{
                f_255 = Constant <value = float[1] {{255.0}}>()

                {to_float_node}
                {dst} = Div(input_f, f_255)
            }}
            """
        )

        onnx.checker.check_graph(scale_graph)
        return scale_graph
557
558
class FloatToImageBytes(Step):
    """
    Convert floating point values to uint8 values in range 0..255.
    Typically this reverses ImageBytesToFloat by converting input data in the range 0..1, but an optional multiplier
    can be specified if the input data has a different range.
    Values will be rounded prior to clipping and conversion to uint8.
    """

    def __init__(self, multiplier: float = 255.0, name: Optional[str] = None):
        """
        Args:
            multiplier: Optional multiplier. Currently, the expected values are 255 (input data is in range 0..1), or
                        1 (input data is in range 0..255).
            name: Optional step name. Defaults to 'FloatToImageBytes'
        """
        super().__init__(["float_data"], ["pixel_data"], name)
        self._multiplier = multiplier

    def _create_graph_for_step(self, graph: onnx.GraphProto, onnx_opset: int):
        """Create the graph that optionally scales the float input, then rounds, clips and casts it to uint8."""
        data_type, data_shape = self._get_input_type_and_shape_strs(graph, 0)
        assert data_type == "float"

        src = self.input_names[0]
        dst = self.output_names[0]

        if self._multiplier != 1.0:
            scale_nodes = \
                f"""\
                f_multiplier = Constant <value = float[1] {{{self._multiplier}}}> ()
                scaled_input = Mul ({src}, f_multiplier)
                """
            value_to_round = 'scaled_input'
        else:
            # a multiplier of 1 needs no Mul node. round the input directly.
            scale_nodes = ''
            value_to_round = src

        to_bytes_graph = onnx.parser.parse_graph(
            f"""\
            float_to_type (float[{data_shape}] {src})
                => (uint8[{data_shape}] {dst})
            {{
                f_0 = Constant <value = float[1] {{0.0}}> ()
                f_255 = Constant <value = float[1] {{255.0}}>()

                {scale_nodes}
                rounded = Round ({value_to_round})
                clipped = Clip (rounded, f_0, f_255)
                {dst} = Cast <to = {onnx.TensorProto.UINT8}> (clipped)
            }}
            """
        )

        onnx.checker.check_graph(to_bytes_graph)
        return to_bytes_graph
610
611
class ChannelsLastToChannelsFirst(Transpose):
    """
    Transpose channels last data to channels first.
    Input can be NHWC or HWC.
    """

    def __init__(self, has_batch_dim: bool = False, name: Optional[str] = None):
        """
        Args:
            has_batch_dim: Set to True if the input has a batch dimension (i.e. is NHWC)
            name: Optional step name. Defaults to 'ChannelsLastToChannelsFirst'
        """
        if has_batch_dim:
            perms = [0, 3, 1, 2]  # NHWC -> NCHW
        else:
            perms = [2, 0, 1]  # HWC -> CHW

        super().__init__(perms, name)
626
627
class DrawBoundingBoxes(Step):
    """
    Draw boxes on BGR image at given position, image is channel last and ordered by BGR.
    Input shape: <uint8_t>{height, width, 3<BGR>}
    boxes: <float>{num_boxes, 6<x, y, x/w, y/h, score, class>}
        The coordinates are absolute pixel values in the picture. How they are interpreted is determined by `mode`.
        There are different modes to represent the coordinates of the box: [XYXY, XYWH, CENTER_XYWH].
        Please refer to the following link for more details. https://keras.io/api/keras_cv/bounding_box/formats/
        **score** is the confidence of the box(object score * class probability) and **class** is the class of the box.

    Output shape: <uint8_t>{height, width, 3<BGR>}
    """

    def __init__(self, mode: str = "XYXY", thickness: int = 4, num_classes: int = 10,
                 colour_by_classes=False, name: Optional[str] = None):
        """
        Args:
            mode: The mode of the boxes,
                    "XYXY" (xmin ymin xmax ymax) All values in the XYXY format should be absolute pixel values.
                    "XYWH" (xmin ymin width height)
                    "CENTER_XYWH" (x_center, y_center, width, height)
                                  All values in the CENTER_XYWH format should be absolute pixel values.

            thickness: Thickness of the box edge
            num_classes: Number of colours to use
                         We support 10 predefined colours and the other classes more than 10 wouldn't be drawn.
                         colors are [Red, Yellow, Lime, Cyan, Blue, Magenta, Orange, Maroon, Green, Navy]
                         and are used in that order. i.e. result with best score will use red.
            colour_by_classes: Colour boxes by classes or by score.
                               If `True` we use a colour for each unique class, with all results from the top
                               `num_classes` classes displayed. A colour is only used for a single class.
                               If `False`, we draw boxes for the top `num_classes` results. A colour is used
                               for a single result, regardless of class.
            name: Optional name of step. Defaults to 'DrawBoundingBoxes'
        """
        super().__init__(["image", "boxes"], ["image_out"], name)
        self.thickness_ = thickness
        self.num_classes_ = num_classes
        self.colour_by_classes_ = colour_by_classes
        self.mode_ = mode

    def _create_graph_for_step(self, graph: onnx.GraphProto, onnx_opset: int):
        """
        Create the graph containing the DrawBoundingBoxes custom op with its attributes set.

        Args:
            graph: Graph the step is being added to. Used to look up the type and shape of this step's inputs.
            onnx_opset: ONNX opset in use. Not used by this step.

        Returns:
            onnx.GraphProto for the step. The output has the same shape as the input image.
        """
        input0_type_str, input0_shape_str = self._get_input_type_and_shape_strs(graph, 0)
        input1_type_str, input1_shape_str = self._get_input_type_and_shape_strs(graph, 1)
        assert input0_type_str == "uint8" and input1_type_str == "float"

        # each box must have 6 values. strip whitespace so a shape string of 'n, 6' is handled as well as 'n,6'
        assert input1_shape_str.split(",")[-1].strip() == "6"

        output_shape_str = input0_shape_str
        converter_graph = onnx.parser.parse_graph(
            f"""\
            bounding_box (uint8[{input0_shape_str}] {self.input_names[0]}, float[{input1_shape_str}] {self.input_names[1]})
                => (uint8[{output_shape_str}] {self.output_names[0]})
            {{
                {self.output_names[0]} = com.microsoft.extensions.DrawBoundingBoxes({self.input_names[0]}, {self.input_names[1]})
            }}
            """
        )

        # the attributes are added to the custom op node after parsing rather than in the graph text.
        # colour_by_classes is converted to int before being stored as an attribute.
        node_attrs = [
            onnx.helper.make_attribute("thickness", self.thickness_),
            onnx.helper.make_attribute("num_classes", self.num_classes_),
            onnx.helper.make_attribute("colour_by_classes", int(self.colour_by_classes_)),
            onnx.helper.make_attribute("mode", self.mode_),
        ]
        converter_graph.node[0].attribute.extend(node_attrs)

        return converter_graph
697
698
class LetterBox(Step):
    """
    Image is channel last and ordered by BGR.
    mainly used in object detection, it mostly follows behind resize operation.
    This step either add border or crop the image to satisfy network input.
    -----          bbbbbbbbb
    |img|  --- >   bb-----bb
    -----          bb|img|bb
                   bb-----bb
                   bbbbbbbbb
    If target_shape is less than the original image, it will crop the image in a center mode.
    And the padding values will be negative and the Pad op performs cropping.

    Input shape: <uint8_t>{height, width, 3<BGR>}
    target_shape: <uint8_t>{out_height, out_width, 3<BGR>}
    Output shape: specified by target_shape
    """

    def __init__(self, target_shape: Union[int, Tuple[int, int]], fill_value=0, name: Optional[str] = None):
        """
        Args:
            target_shape: the size of the output image. Either a single value that is used for both height and
                          width, or (height, width).
            fill_value: a constant value used to fill the border
            name: Optional name of step. Defaults to 'LetterBox'
        """
        super().__init__(["image"], ["image_pad"], name)

        self.target_shape_ = target_shape
        self.fill_value_ = fill_value

    def _create_graph_for_step(self, graph: onnx.GraphProto, onnx_opset: int):
        """
        Create the graph that pads (or, with negative pads, crops) the image to the target height and width.

        Args:
            graph: Graph the step is being added to. Used to look up the type and shape of this step's input.
            onnx_opset: ONNX opset in use. Split requires `num_outputs` from opset 18.

        Returns:
            onnx.GraphProto for the step.
        """
        input0_type_str, input0_shape_str = self._get_input_type_and_shape_strs(graph, 0)

        assert len(input0_shape_str.split(',')) == 3, " expected BGR image"

        # target_shape is documented as `Union[int, Tuple[int, int]]`. Indexing an int raises TypeError, so expand
        # a single value to (height, width) here.
        if isinstance(self.target_shape_, int):
            target_height = target_width = self.target_shape_
        else:
            target_height, target_width = self.target_shape_[0], self.target_shape_[1]

        target_shape_str = f"{target_height}, {target_width}, 3"

        split_input_shape_attr = "axis = 0"
        if onnx_opset >= 18:
            # Split now requires the number of outputs to be specified even though that can be easily inferred...
            split_input_shape_attr += ", num_outputs = 3"

        # pad_hw is the total padding needed for h and w. half goes before the data and the remainder after.
        # the channels dim gets zero padding on both sides.
        converter_graph = onnx.parser.parse_graph(
            f"""\
            LetterBox (uint8[{input0_shape_str}] {self.input_names[0]})
                => (uint8[{target_shape_str}] {self.output_names[0]})
            {{
                target_size = Constant <value = int64[2] {{{target_height}, {target_width}}}> ()
                i64_2 = Constant <value = int64[1] {{2}}>()
                i64_0 = Constant <value = int64[1] {{0}}>()
                const_val = Constant <value = uint8[1] {{{self.fill_value_}}}> ()
                image_shape = Shape ({self.input_names[0]})
                h,w,c = Split <{split_input_shape_attr}> (image_shape)
                hw = Concat <axis = 0> (h, w)
                pad_hw = Sub (target_size, hw)
                half_pad_hw = Div (pad_hw, i64_2)
                remainder_pad_hw = Sub (pad_hw, half_pad_hw)
                pad_value = Concat <axis = 0> (half_pad_hw, i64_0,remainder_pad_hw,i64_0)
                {self.output_names[0]} = Pad({self.input_names[0]}, pad_value, const_val)
            }}
            """
        )

        return converter_graph
763
764
class SplitOutBoxAndScore(Step):
    r"""
    Split the output of the model into boxes and scores. This step will also handle the optional object score.
    Input shape: <float>{num_boxes, 4/5+num_classes}
    Output shape: <float>{num_boxes, 4}, <float>{num_boxes, num_classes}
    |x1,x2,x3,x4, (obj), cls_1, ... cls_num|
            /\
           /  \
    |x1,x2,x3,x4|  |cls_1, ... clx_num|*(obj)
    obj is optional, if it is not present, it will be set to 1.0
    This is where 4/5 comes from, '4' represent coordinates and the fifth object probability.
    """

    def __init__(self, num_classes: int = 80, name: Optional[str] = None):
        """
        Args:
            num_classes: number of classes
            name: Optional name of step. Defaults to 'SplitOutBoxAndScore'
        """

        super().__init__(["box_and_score"], ["_pre_boxes", "_pre_scores"], name)
        self.num_classes_ = num_classes

    def _create_graph_for_step(self, graph: onnx.GraphProto, onnx_opset: int):
        """
        Create the graph that splits the input into boxes and (object score weighted) class scores.

        Args:
            graph: Graph the step is being added to. Used to look up the type and shape of this step's input.
            onnx_opset: ONNX opset in use. Not used by this step.

        Returns:
            onnx.GraphProto for the step.
        """
        input0_type_str, input0_shape_str = self._get_input_type_and_shape_strs(graph, 0)

        input_shape_list = input0_shape_str.split(',')
        assert len(input_shape_list) == 2, " expected [num_boxes, 4/5+num_classes]"

        target_shape_str_0 = f"{input_shape_list[0]}, 4"
        # use `self.step_num` for the symbolic dim name, as the other steps in this file do.
        # NOTE(review): this previously read `self._step_num`; presumably that is a different counter on Step -
        # confirm against the Step base class.
        target_shape_str_1 = f"{input_shape_list[0]}, _{self.step_num}_class"

        # The last dim is split into 4 box values, 0 or 1 object score values, and num_classes class scores.
        # The number of object score values is derived at runtime from the size of the last dim.
        converter_graph = onnx.parser.parse_graph(
            f"""\
            SplitOutBoxAndScore (float[{input0_shape_str}] {self.input_names[0]})
                => (float[{target_shape_str_0}] {self.output_names[0]}, float[{target_shape_str_1}] {self.output_names[1]})
            {{

                i64_neg1 = Constant <value = int64[1] {{-1}}>()
                i64_4 = Constant <value = int64[1] {{4}}>()
                i64_0 = Constant <value = int64[1] {{0}}>()
                fp32_1 = Constant <value = float[1] {{1.0}}>()
                i64_classes = Constant <value = int64[1] {{{self.num_classes_}}}>()
                out_shape = Shape ({self.input_names[0]})
                class_and_coor_dim = Gather (out_shape, i64_neg1)
                coor_and_obj = Sub (class_and_coor_dim, i64_classes)
                obj_0_or_1 = Sub (coor_and_obj, i64_4)
                bool_num_obj_0_or_1 = Cast<to=9>(obj_0_or_1)

                box_obj_class_concat = Concat <axis = 0> (i64_4, obj_0_or_1, i64_classes)
                boxes_o, scores_obj_o, scores_cls_o = Split <axis = -1> ({self.input_names[0]}, box_obj_class_concat)
                scores_obj_not_null = Concat <axis = -1> (scores_obj_o, boxes_o)
                coef_obj_cat = Where(bool_num_obj_0_or_1, scores_obj_not_null,fp32_1)
                coef_obj = Gather <axis=-1> (coef_obj_cat, i64_0)
                scores_o = Mul (scores_cls_o, coef_obj)
                {self.output_names[0]} = Identity (boxes_o)
                {self.output_names[1]} = Identity (scores_o)

            }}
            """
        )
        return converter_graph
826
827
class SelectBestBoundingBoxesByNMS(Step):
    """
    Non-maximum suppression (NMS) filters out redundant bounding boxes.
    This step wraps the boxes and scores into the ONNX NonMaxSuppression op.
    Boxes are passed with center_point_box=1, i.e. [x_center, y_center, width, height].
    Input:
        boxes:  float[num_boxes, 4]
        scores: float[num_boxes, num_classes]

    Output:
        nms_out: float[_few_num_boxes, 6<coordinate+score+class>]
    """

    def __init__(self, iou_threshold: float = 0.5, score_threshold: float = 0.67,
                 max_detections: int = 300, name: Optional[str] = None):
        """
        Args:
            Please refer to https://github.com/onnx/onnx/blob/main/docs/Operators.md#NonMaxSuppression
            for more details about the parameters.
            iou_threshold: same as the NonMaxSuppression op, intersection / union of boxes
            score_threshold: If this box's score is lower than score_threshold, it will be removed.
            max_detections: max number of boxes to be selected. It is passed to NonMaxSuppression
                            as `max_output_boxes_per_class`, so the limit applies per class.
            name: Optional name of step. Defaults to 'SelectBestBoundingBoxesByNMS'
        """
        super().__init__(["boxes", "scores"], ["nms_out"], name)
        self.iou_threshold_ = iou_threshold
        self.score_threshold_ = score_threshold
        self.max_detections_ = max_detections

    def _create_graph_for_step(self, graph: onnx.GraphProto, onnx_opset: int):
        """
        Build the graph that runs NonMaxSuppression and assembles [box, score, class] rows.

        Args:
            graph: graph used to look up the shapes of this step's two inputs.
            onnx_opset: target opset; selects how ReduceMax receives its axes.

        Returns:
            onnx.GraphProto with a single float[<selected>, 6] output.
        """
        # Element types are hard-coded as float in the graph text, so only shapes are needed.
        _, input0_shape_str = self._get_input_type_and_shape_strs(graph, 0)
        _, input1_shape_str = self._get_input_type_and_shape_strs(graph, 1)

        input0_shape_list = input0_shape_str.split(',')
        assert len(input0_shape_list) == 2, " expected [num_boxes, 4]"

        # Name the symbolic output dim with this step's own number (`step_num`), the same
        # attribute other steps in this file use for per-step symbolic names.
        target_shape_str = f"_{self.step_num}_nms_boxes, 6"

        # ReduceMax takes `axes` as an input from opset 18 and as an attribute before that.
        reduce_score = '(score_select_nm,i64_neg1)' if onnx_opset >= 18 else '<axes=[-1]>(score_select_nm)'

        # Graph notes:
        #   - NonMaxSuppression expects boxes as [batch, num_boxes, 4] and scores as
        #     [batch, num_classes, num_boxes], hence the Transpose and the Unsqueeze on axis 0.
        #   - Each row of its output is [batch_index, class_index, box_index]; columns 1 and 2
        #     are gathered to get the class and to look the box up again.
        #   - The reported score is the maximum over all classes for the selected box.
        #     NOTE(review): that equals the selected class's score only when the selected
        #     class is also the box's highest-scoring class - confirm this is intended.
        converter_graph = onnx.parser.parse_graph(
            f"""\
            SelectBestBoundingBoxesByNMS (float[{input0_shape_str}] {self.input_names[0]},float[{input1_shape_str}] {self.input_names[1]})
                => (float[{target_shape_str}] {self.output_names[0]})
            {{
                i64_2 = Constant <value = int64[1] {{2}}>()
                i64_0 = Constant <value = int64[1] {{0}}>()
                i64_1 = Constant <value = int64[1] {{1}}>()
                i64_max_obj = Constant <value = int64[1] {{{self.max_detections_}}}>()
                i64_neg1 = Constant <value = int64[1] {{-1}}>()
                fp32_iou_th = Constant <value = float[1] {{{self.iou_threshold_}}}>()
                fp32_score_th = Constant <value = float[1] {{{self.score_threshold_}}}>()

                boxes_i = Identity ({self.input_names[0]})
                scores_i = Identity({self.input_names[1]})
                scores_c_b = Transpose<perm=[1,0]>(scores_i)
                batch_boxes = Unsqueeze(boxes_i, i64_0)
                batch_scores = Unsqueeze(scores_c_b, i64_0)

                nmsbox = NonMaxSuppression<center_point_box =1>(batch_boxes, batch_scores, i64_max_obj,fp32_iou_th,fp32_score_th)
                classes_i64 = Gather <axis=-1>(nmsbox,i64_1)
                class_select = Cast <to = 1>(classes_i64)

                boxes_idx_us = Gather <axis=-1>(nmsbox,i64_2)
                boxes_idx = Squeeze(boxes_idx_us, i64_neg1)
                boxes_select = Gather <axis=0>(boxes_i, boxes_idx)

                score_select_nm = Gather <axis=0>(scores_i, boxes_idx)
                score_select = ReduceMax{reduce_score}

                {self.output_names[0]} = Concat <axis = -1> (boxes_select, score_select, class_select)
            }}
            """
        )
        return converter_graph
903
904
class ScaleBoundingBoxes(Step):
    """
    Map box coordinates back to the scale of the original image.
    Boxes produced by a detection model are relative to the network's input image, which
    was scaled and then padded/cropped, so a linear mapping is applied to recover
    coordinates in the original image.
    input:
        box_of_nms_out: output of NMS, shape [num_boxes, 6]
        original_image: original image decoded from jpg/png, <uint8_t>[H, W, 3<BGR>]
        scaled_image: scaled image, without padding/crop, <uint8_t>[H1, W1, 3<BGR>]
        letter_boxed_image: scaled image with padding/crop, <uint8_t>[H2, W2, 3<BGR>]

    output:
        scaled_box_out: shape [num_boxes, 6] with coordinates mapped to the original image.
    """

    def __init__(self, name: Optional[str] = None):
        """
        Args:
            name: Optional name of step. Defaults to 'ScaleBoundingBoxes'
        """
        step_inputs = ["box_of_nms_out", "original_image", "scaled_image", "letter_boxed_image"]
        super().__init__(step_inputs, ["scaled_box_out"], name)

    def _create_graph_for_step(self, graph: onnx.GraphProto, onnx_opset: int):
        """
        Build the graph that removes the letterbox offset and rescales the boxes.

        Only the shapes of the three image inputs are read. The scale factor is
        original height / scaled height, and the offset is half of the (integer)
        difference between the letterboxed and scaled width/height.
        """
        # One (type, shape, name) triple per declared input, in input order.
        typed_inputs = [
            (*self._get_input_type_and_shape_strs(graph, idx), input_name)
            for idx, input_name in enumerate(self.input_names)
        ]
        graph_input_param = ','.join(f"{dtype}[{shape}] {input_name}" for dtype, shape, input_name in typed_inputs)

        # The single output keeps the shape of the boxes input and is always float.
        boxes_shape = typed_inputs[0][1]
        graph_output_param = f"float[{boxes_shape}] {self.output_names[0]}"

        # From opset 18 the Split calls below pass `num_outputs` explicitly;
        # for earlier opsets no extra attribute is emitted.
        split_attr = ", num_outputs = 3" if onnx_opset >= 18 else ''

        # Graph notes:
        #   - Each image shape is split into its 3 dims; the channel dim is unused.
        #   - The boxes input is split into 3 equal parts along the last axis:
        #     first coordinate pair, second pair, and [score, class].
        #   - The half-padding offset is subtracted from the first pair only; the scale
        #     ratio is then applied to all four coordinate values.
        converter_graph = onnx.parser.parse_graph(
            f"""\
            ScaleBoundingBoxes ({graph_input_param})
                => ({graph_output_param})
            {{
                i64_2 = Constant <value = int64[1] {{2}}>()

                ori_shape = Shape ({self.input_names[1]})
                scaled_shape = Shape ({self.input_names[2]})
                lettered_shape = Shape ({self.input_names[3]})
                oh,ow,oc = Split <axis = 0 {split_attr}> (ori_shape)
                sh,sw,sc = Split <axis = 0 {split_attr}> (scaled_shape)
                lh,lw,lc = Split <axis = 0 {split_attr}> (lettered_shape)
                swh = Concat <axis = -1> (sw,sh)
                lwh = Concat <axis = -1> (lw,lh)

                f_oh = Cast <to = 1> (oh)
                f_sh = Cast <to = 1> (sh)
                ratios = Div (f_oh, f_sh)

                pad_wh = Sub (lwh, swh)
                half_pad_wh = Div (pad_wh, i64_2)
                f_half_pad_wh = Cast <to = 1> (half_pad_wh)

                boxes_xy,boxes_wh_orxy,boxes_score_class = Split <axis=-1 {split_attr}>({self.input_names[0]})
                offset_boxes_xy = Sub (boxes_xy, f_half_pad_wh)
                restored_boxes = Concat <axis=-1> (offset_boxes_xy, boxes_wh_orxy)
                scaled_boxes_coor = Mul (restored_boxes, ratios)
                restored_boxes_res = Concat <axis=-1> (scaled_boxes_coor, boxes_score_class)

                {self.output_names[0]} = Identity (restored_boxes_res)
            }}
            """
        )
        return converter_graph