microsoft/onnxruntime-extensions

Public

mirrored fromhttps://github.com/microsoft/onnxruntime-extensionsAvailable

CodeCommitsIssuesPull requestsActionsInsightsSecurity
rel-0.9

Branches

Tags

  • No tags available.
0Branches0Tags
Go to file
Add file
Code

Clone

HTTPS

Download ZIP

onnxruntime_extensions/tools/pre_post_processing/steps/vision.py

984lines · modecode

1# Copyright (c) Microsoft Corporation. All rights reserved.
2# Licensed under the MIT License.
3
4import onnx
5import numpy as np
6
7from typing import List, Optional, Tuple, Union
8from ..step import Step
9from .general import Transpose
10
11#
12# Image conversion
13#
14
15
class ConvertImageToBGR(Step):
    """
    Decode the bytes of an encoded image into BGR ordered uint8 values.
      Supported input formats: jpg, png
      Input shape: {num_encoded_bytes}
      Output shape: {input_image_height, input_image_width, 3}
    """

    def __init__(self, name: Optional[str] = None):
        """
        Args:
            name: Optional name of step. Defaults to 'ConvertImageToBGR'

        NOTE: The format of the input image is inferred, so it is not a parameter of this step.
        """
        super().__init__(["image"], ["bgr_data"], name)

    def _create_graph_for_step(self, graph: onnx.GraphProto, onnx_opset: int):
        """Return a single node graph that applies the DecodeImage custom op to the step input."""
        encoded_type, encoded_shape = self._get_input_type_and_shape_strs(graph, 0)
        assert encoded_type == "uint8"

        # symbolic names for the output height and width, made unique to this step via step_num.
        dim_prefix = f"to_bgr_ppp_{self.step_num}"
        decoded_shape = f"{dim_prefix}_h, {dim_prefix}_w, 3"

        src = self.input_names[0]
        dst = self.output_names[0]

        graph_text = f"""\
            image_to_bgr (uint8[{encoded_shape}] {src})
                => (uint8[{decoded_shape}] {dst})
            {{
                {dst} = com.microsoft.extensions.DecodeImage({src})
            }}
            """

        return onnx.parser.parse_graph(graph_text)
49
50
class ConvertBGRToImage(Step):
    """
    Encode BGR ordered uint8 data as an image.
      Supported output formats: jpg, png
      Input shape: {input_image_height, input_image_width, 3}
      Output shape: {num_encoded_bytes}
    """

    def __init__(self, image_format: str = "jpg", name: Optional[str] = None):
        """
        Args:
            image_format: Format to encode to. jpg and png are supported.
            name: Optional step name. Defaults to 'ConvertBGRToImage'
        """
        super().__init__(["bgr_data"], ["image"], name)
        assert image_format in ("jpg", "png")
        self._format = image_format

    def _create_graph_for_step(self, graph: onnx.GraphProto, onnx_opset: int):
        """Return a single node graph that applies the EncodeImage custom op to the step input."""
        pixel_type, pixel_shape = self._get_input_type_and_shape_strs(graph, 0)
        assert pixel_type == "uint8"

        # symbolic name for the number of encoded bytes, made unique to this step via step_num.
        encoded_shape = f"to_image_ppp_{self.step_num}_num_bytes"

        src = self.input_names[0]
        dst = self.output_names[0]

        encoder_graph = onnx.parser.parse_graph(
            f"""\
            bgr_to_image (uint8[{pixel_shape}] {src})
                => (uint8[{encoded_shape}] {dst})
            {{
                {dst} = com.microsoft.extensions.EncodeImage ({src})
            }}
            """
        )

        # EncodeImage is a custom op, so parse_graph has no schema for it and fails when it tries to validate
        # an attribute. The `format` attribute is therefore attached to the parsed node directly.
        encode_node = encoder_graph.node[0]
        format_attr = encode_node.attribute.add()
        format_attr.name = "format"
        format_attr.type = onnx.AttributeProto.AttributeType.STRING
        format_attr.s = self._format.encode("utf-8")

        return encoder_graph
92
93
class PixelsToYCbCr(Step):
    """
    Convert RGB or BGR pixel data to YCbCr format.
      Input shape: {height, width, 3}
      Outputs: Y, Cb and Cr, each with shape {height, width}.
      Output data is float, but rounded and clipped to the range 0..255 as per the spec for YCbCr conversion.
    """

    def __init__(self, layout: str = "BGR", name: Optional[str] = None):
        """
        Args:
            layout: Input data layout. Can be 'BGR' or 'RGB'
            name: Optional step name. Defaults to 'PixelsToYCbCr'
        """
        super().__init__(["pixels"], ["Y", "Cb", "Cr"], name)
        assert layout in ("RGB", "BGR")
        self._layout = layout

    def _create_graph_for_step(self, graph: onnx.GraphProto, onnx_opset: int):
        """Return a graph that converts the uint8 HWC input into separate float Y, Cb and Cr planes."""
        pixel_type, pixel_shape = self._get_input_type_and_shape_strs(graph, 0)

        # input should be uint8 data HWC
        pixel_dims = pixel_shape.split(",")
        assert pixel_type == "uint8" and len(pixel_dims) == 3 and pixel_dims[2] == "3"

        # https://en.wikipedia.org/wiki/YCbCr
        # exact weights from https://www.itu.int/rec/T-REC-T.871-201105-I/en
        rgb_weights = np.array(
            [
                [0.299, 0.587, 0.114],
                [-0.299 / 1.772, -0.587 / 1.772, 0.500],
                [0.500, -0.587 / 1.402, -0.114 / 1.402],
            ],
            dtype=np.float32,
        )

        # for BGR input the order of the last dim of the weights is reversed
        channel_weights = rgb_weights if self._layout == "RGB" else rgb_weights[:, ::-1]

        # Weights are transposed for usage in matmul.
        weights_text = ",".join(str(w) for w in channel_weights.T.flatten())
        bias_text = ",".join(str(b) for b in (0.0, 128.0, 128.0))

        # each output is {h, w}; symbolic dim names are made unique to this step via step_num.
        dim_prefix = f"YCbCr_ppp_{self.step_num}"
        plane_shape = f"{dim_prefix}_h, {dim_prefix}_w"

        # From opset 18 Split requires the number of outputs to be specified.
        split_attr = "axis = -1, num_outputs = 3" if onnx_opset >= 18 else "axis = -1"

        src = self.input_names[0]
        y_out = self.output_names[0]
        cb_out = self.output_names[1]
        cr_out = self.output_names[2]

        # Graph steps:
        #   convert to float for MatMul
        #   apply weights and bias
        #   round and clip so it's in the range 0..255
        #   split into channels. shape will be {h, w, 1}
        #   remove the trailing '1' so output is {h, w}
        graph_text = f"""\
            pixels_to_YCbCr (uint8[{pixel_shape}] {src})
                => (float[{plane_shape}] {y_out},
                    float[{plane_shape}] {cb_out},
                    float[{plane_shape}] {cr_out})
            {{
                kWeights = Constant <value = float[3, 3] {{{weights_text}}}> ()
                kBias = Constant <value = float[3] {{{bias_text}}}> ()
                i64_neg1 = Constant <value = int64[1] {{-1}}> ()
                f_0 = Constant <value = float[1] {{0.0}}> ()
                f_255 = Constant <value = float[1] {{255.0}}> ()

                f_pixels = Cast <to = 1> ({src})
                f_weighted = MatMul(f_pixels, kWeights)
                f_biased = Add(f_weighted, kBias)
                f_rounded = Round(f_biased)
                f_clipped = Clip (f_rounded, f_0, f_255)
                split_Y, split_Cb, split_Cr = Split <{split_attr}>(f_clipped)
                {y_out} = Squeeze (split_Y, i64_neg1)
                {cb_out} = Squeeze (split_Cb, i64_neg1)
                {cr_out} = Squeeze (split_Cr, i64_neg1)
            }}
            """

        return onnx.parser.parse_graph(graph_text)
181
182
class YCbCrToPixels(Step):
    """
    Convert YCbCr input to RGB or BGR.

    Input data can be uint8 or float but all inputs must use the same type.
      Inputs: Y, Cb and Cr, each with shape {height, width}
      Output shape: {height, width, 3}, type uint8
    """

    def __init__(self, layout: str = "BGR", name: Optional[str] = None):
        """
        Args:
            layout: Output layout. Can be 'BGR' or 'RGB'
            name: Optional step name. Defaults to 'YCbCrToPixels'
        """
        super().__init__(["Y", "Cb", "Cr"], ["bgr_data"], name)
        assert layout in ("RGB", "BGR")
        self._layout = layout

    def _create_graph_for_step(self, graph: onnx.GraphProto, onnx_opset: int):
        """Return a graph that merges the Y, Cb and Cr planes and converts them to uint8 pixels."""
        types_and_shapes = [self._get_input_type_and_shape_strs(graph, idx) for idx in range(3)]
        plane_types = [entry[0] for entry in types_and_shapes]
        plane_shapes = [entry[1] for entry in types_and_shapes]

        # all three planes must share one type, either uint8 or float
        assert all(t == "uint8" for t in plane_types) or all(t == "float" for t in plane_types)
        # each plane is {h, w}
        assert all(len(shape.split(",")) == 2 for shape in plane_shapes)

        pixels_shape = f"{plane_shapes[0]}, 3"

        # https://en.wikipedia.org/wiki/YCbCr
        # exact weights from https://www.itu.int/rec/T-REC-T.871-201105-I/en
        to_rgb_weights = np.array(
            [
                [1, 0, 1.402],
                [1, -0.114 * 1.772 / 0.587, -0.299 * 1.402 / 0.587],
                [1, 1.772, 0],
            ],
            dtype=np.float32,
        )

        # reverse first dim of weights for output to be bgr
        channel_weights = to_rgb_weights[::-1, :] if self._layout == "BGR" else to_rgb_weights

        # transpose weights for use in matmul
        weights_text = ",".join(str(w) for w in channel_weights.T.flatten())
        bias_text = ",".join(str(b) for b in (0.0, 128.0, 128.0))

        y_in = self.input_names[0]
        cb_in = self.input_names[1]
        cr_in = self.input_names[2]
        dst = self.output_names[0]

        # Graph steps:
        #   unsqueeze the {h, w} inputs to add channels dim. new shape is {h, w, 1}
        #   merge Y, Cb, Cr data on the new channel axis
        #   convert to float to apply weights etc.
        #   remove bias
        #   apply weights
        #   round and clip to 0..255
        #   convert to uint8.
        graph_text = f"""\
            YCbCr_to_RGB ({plane_types[0]}[{plane_shapes[0]}] {y_in},
                          {plane_types[1]}[{plane_shapes[1]}] {cb_in},
                          {plane_types[2]}[{plane_shapes[2]}] {cr_in})
                => (uint8[{pixels_shape}] {dst})
            {{
                kWeights = Constant <value = float[3, 3] {{{weights_text}}}> ()
                kBias = Constant <value = float[3] {{{bias_text}}}> ()
                f_0 = Constant <value = float[1] {{0.0}}> ()
                f_255 = Constant <value = float[1] {{255.0}}> ()
                i64_neg1 = Constant <value = int64[1] {{-1}}> ()

                Y1 = Unsqueeze({y_in}, i64_neg1)
                Cb1 = Unsqueeze({cb_in}, i64_neg1)
                Cr1 = Unsqueeze({cr_in}, i64_neg1)
                YCbCr = Concat <axis = -1> (Y1, Cb1, Cr1)
                f_YCbCr = Cast <to = 1> (YCbCr)
                f_unbiased = Sub (f_YCbCr, kBias)
                f_pixels = MatMul (f_unbiased, kWeights)
                f_rounded = Round (f_pixels)
                clipped = Clip (f_rounded, f_0, f_255)
                {dst} = Cast <to = {onnx.TensorProto.UINT8}> (clipped)
            }}
            """

        return onnx.parser.parse_graph(graph_text)
275
276
277#
278# Pre-processing
279#
class Resize(Step):
    """
    Resize input data. Aspect ratio is maintained.
    e.g. if image is 1200 x 600 and 300 x 300 is requested the result will be 600 x 300
    (with the default 'not_smaller' policy).
    """

    def __init__(self, resize_to: Union[int, Tuple[int, int]], layout: str = "HWC",
                 policy: str = "not_smaller", name: Optional[str] = None):
        """
        Args:
            resize_to: Target size. Can be a single value or a tuple with (target_height, target_width).
                       The aspect ratio will be maintained, so with the default policy neither height or width
                       in the result will be smaller than the requested value.
            layout: Input layout. 'NCHW', 'NHWC', 'CHW', 'HWC' and 'HW' are supported.
            policy: not_smaller (default)
                        the sizes are adjusted so that no extent of the output is smaller than the specified size,
                        while keeping the original aspect ratio
                    not_larger
                        the sizes are adjusted so that no extent of the output is larger than the specified size,
                        while keeping the original aspect ratio.
                    Please refer to https://github.com/onnx/onnx/blob/main/docs/Operators.md#Resize for more details.
            name: Optional name. Defaults to 'Resize'
        """
        super().__init__(["image"], ["resized_image"], name)
        if isinstance(resize_to, int):
            self._height = self._width = resize_to
        else:
            assert isinstance(resize_to, tuple)
            self._height, self._width = resize_to

        self._layout = layout
        self.policy_ = policy

    def _create_graph_for_step(self, graph: onnx.GraphProto, onnx_opset: int):
        """
        Return a graph that computes the output size from the input shape at runtime and resizes to it.

        Raises:
            ValueError: if the layout or the resize policy is not one of the supported values.
        """
        input_type_str, input_shape_str = self._get_input_type_and_shape_strs(graph, 0)
        dims = input_shape_str.split(",")

        # adjust for layout
        # resize uses a single ratio so both sides won't necessarily match the requested height and width.
        # use symbolic names for the output dims as we have to provide values. prefix the names to try and
        # avoid any clashes.
        out_h = f"resize_ppp_{self.step_num}_h"
        out_w = f"resize_ppp_{self.step_num}_w"
        add_batch_dim = False

        if self._layout == "NHWC":
            assert len(dims) == 4
            split_str = "n, h, w, c"
            sizes_str = "n, h2, w2, c"
            output_shape_str = f"{dims[0]}, {out_h}, {out_w}, {dims[-1]}"
        elif self._layout == "NCHW":
            assert len(dims) == 4
            split_str = "n, c, h, w"
            sizes_str = "n, c, h2, w2"
            output_shape_str = f"{dims[0]}, {dims[1]}, {out_h}, {out_w}"
        elif self._layout == "HWC":
            assert len(dims) == 3
            add_batch_dim = True
            split_str = "h, w, c"
            sizes_str = "h2, w2, c"
            output_shape_str = f"{out_h}, {out_w}, {dims[-1]}"
        elif self._layout == "CHW":
            assert len(dims) == 3
            add_batch_dim = True
            split_str = "c, h, w"
            sizes_str = "c, h2, w2"
            output_shape_str = f"{dims[0]}, {out_h}, {out_w}"
        elif self._layout == "HW":
            assert len(dims) == 2
            split_str = "h, w"
            sizes_str = "h2, w2"
            output_shape_str = f"{out_h}, {out_w}"
        else:
            raise ValueError(f"Unsupported layout of {self._layout}")

        # TODO: Make this configurable. Matching PIL resize for now.
        resize_attributes = 'mode = "linear", nearest_mode = "floor"'
        if onnx_opset >= 18:
            # Resize matches PIL better if antialiasing is used, but that isn't available until ONNX opset 18.
            # Older opsets are still supported, just without antialiasing.
            resize_attributes += ', antialias = 1'

        u64_1_str = ""

        # Rank 3 input uses trilinear interpolation, so if input is HWC or CHW we need to add a temporary batch dim
        # to make it rank 4, which will result in Resize using the desired bilinear interpolation.
        if add_batch_dim:
            u64_1_str = "u64_1 = Constant <value = int64[1] {1}> ()"
            sizes_str = "u64_1, " + sizes_str
            resize_str = f"""\
                axes = Constant <value = int64[1] {{0}}> ()
                unsqueezed = Unsqueeze ({self.input_names[0]}, axes)
                resized = Resize <{resize_attributes}> (unsqueezed, , , sizes_resize)
                {self.output_names[0]} = Squeeze (resized, axes)
                """
        else:
            resize_str = \
                f"{self.output_names[0]} = Resize <{resize_attributes}> ({self.input_names[0]}, , , sizes_resize)"

        split_input_shape_attr = "axis = 0"
        split_new_sizes_attr = "axis = 0"
        if onnx_opset >= 18:
            # Split now requires the number of outputs to be specified even though that can be easily inferred...
            split_input_shape_attr += f", num_outputs = {len(dims)}"
            split_new_sizes_attr += ", num_outputs = 2"

        # Resize-18 has the attribute "not_larger/not_smaller" to specify the resize policy, however
        # we want to support older opsets as well, so the ratio is computed manually in the graph.
        #
        # This must be an explicit check. `assert (condition, message)` asserts a non-empty tuple, which is
        # always true, so an invalid policy would silently be treated as 'not_smaller'.
        if self.policy_ not in ("not_smaller", "not_larger"):
            raise ValueError(
                f"Unsupported resize policy of {self.policy_}, must be 'not_smaller' or 'not_larger'"
            )

        # not_smaller: use the larger of the two ratios so neither side ends up below the target.
        # not_larger: use the smaller of the two ratios so neither side ends up above the target.
        ratio_resize_func = "ReduceMin" if self.policy_ == "not_larger" else "ReduceMax"

        resize_graph = onnx.parser.parse_graph(
            f"""\
            resize ({input_type_str}[{input_shape_str}] {self.input_names[0]}) =>
                ({input_type_str}[{output_shape_str}] {self.output_names[0]})
            {{
                target_size = Constant <value = float[2] {{{float(self._height)}, {float(self._width)}}}> ()
                image_shape = Shape ({self.input_names[0]})
                {split_str} = Split <{split_input_shape_attr}> (image_shape)
                hw = Concat <axis = 0> (h, w)
                f_hw = Cast <to = 1> (hw)
                ratios = Div (target_size, f_hw)
                ratio_resize = {ratio_resize_func} (ratios)
                f_hw2_exact = Mul (f_hw, ratio_resize)
                f_hw2_round = Round (f_hw2_exact)
                hw2 = Cast <to = 7> (f_hw2_round)
                h2, w2 = Split <{split_new_sizes_attr}> (hw2)
                {u64_1_str}
                sizes_resize = Concat <axis = 0> ({sizes_str})
                {resize_str}
            }}
            """
        )

        return resize_graph
417
418
class CenterCrop(Step):
    """
    Crop the input to the requested height and width, taking the crop from the center.
    Currently only HWC input is handled.
    """

    def __init__(self, height: int, width: int, name: Optional[str] = None):
        """
        Args:
            height: Height of area to crop.
            width: Width of area to crop.
            name: Optional step name. Defaults to 'CenterCrop'
        """
        super().__init__(["image"], ["cropped_image"], name)
        self._height = height
        self._width = width

    def _create_graph_for_step(self, graph: onnx.GraphProto, onnx_opset: int):
        """Return a graph that slices a centered height x width window out of the first two axes."""
        image_type, image_shape = self._get_input_type_and_shape_strs(graph, 0)

        # the last dim of the input is carried through to the output unchanged
        last_dim = image_shape.split(",")[-1]
        cropped_shape = f"{self._height}, {self._width}, {last_dim}"

        src = self.input_names[0]
        dst = self.output_names[0]

        # start = (input_hw - target_hw) / 2 and end = start + target_hw, applied on axes 0 and 1.
        graph_text = f"""\
            crop ({image_type}[{image_shape}] {src})
                => ({image_type}[{cropped_shape}] {dst})
            {{
                target_crop = Constant <value = int64[2] {{{self._height}, {self._width}}}> ()
                i64_2 = Constant <value = int64[1] {{2}}> ()
                axes = Constant <value = int64[2] {{0, 1}}> ()
                x_shape = Shape ({src})
                hw = Gather (x_shape, axes)
                hw_diff = Sub (hw, target_crop)
                start_xy = Div (hw_diff, i64_2)
                end_xy = Add (start_xy, target_crop)
                {dst} = Slice ({src}, start_xy, end_xy, axes)
            }}
            """

        return onnx.parser.parse_graph(graph_text)
460
461
class Normalize(Step):
    """
    Normalize input data on a per-channel basis.
        `x -> (x - mean) / stddev`
    Output is float with same shape as input.
    """

    def __init__(self, normalization_values: List[Tuple[float, float]], layout: str = "CHW", name: Optional[str] = None):
        """
        Args:
            normalization_values: Tuple with (mean, stddev). One entry per channel.
                                  If single entry is provided it will be used for all channels.
            layout: Input layout. Can be 'CHW' or 'HWC'
            name: Optional step name. Defaults to 'Normalize'
        """
        super().__init__(["data"], ["normalized_data"], name)

        # Work on a copy. `normalization_values *= 3` extends a list in place, which would silently change
        # the list object owned by the caller.
        values = list(normalization_values)

        # duplicate for each channel if needed
        if len(values) == 1:
            values = values * 3

        assert len(values) == 3
        self._normalization_values = values
        assert layout == "HWC" or layout == "CHW"
        self._hwc_layout = layout == "HWC"

    def _create_graph_for_step(self, graph: onnx.GraphProto, onnx_opset: int):
        """Return a graph that casts the input to float, subtracts the mean and divides by the stddev."""
        # each entry is (mean, stddev) for one channel
        means = [entry[0] for entry in self._normalization_values]
        stddevs = [entry[1] for entry in self._normalization_values]

        input_type_str, input_shape_str = self._get_input_type_and_shape_strs(graph, 0)

        # shape the constants so they broadcast against the channel dim: last for HWC, first for CHW.
        values_shape = "3" if self._hwc_layout else "3, 1, 1"

        normalize_graph = onnx.parser.parse_graph(
            f"""\
            normalize ({input_type_str}[{input_shape_str}] {self.input_names[0]})
                => (float[{input_shape_str}] {self.output_names[0]})
            {{
                kMean = Constant <value = float[{values_shape}] {{{means[0]}, {means[1]}, {means[2]}}}> ()
                kStddev = Constant <value = float[{values_shape}] {{{stddevs[0]}, {stddevs[1]}, {stddevs[2]}}}> ()
                f_input = Cast <to = 1> ({self.input_names[0]})
                f_sub_mean = Sub (f_input, kMean)
                {self.output_names[0]} = Div (f_sub_mean, kStddev)
            }}
            """
        )

        onnx.checker.check_graph(normalize_graph)
        return normalize_graph
515
516
517#
518# Utilities
519#
class ImageBytesToFloat(Step):
    """
    Convert uint8 or float values in range 0..255 to floating point values in range 0..1
    (when the default rescale factor of 1/255 is used).
    """

    def __init__(self, rescale_factor: float = 1/255, name: Optional[str] = None):
        """
        Args:
            rescale_factor: Value the input is multiplied by after conversion to float. Defaults to 1/255.
            name: Optional step name. Defaults to 'ImageBytesToFloat'
        """
        super().__init__(["data"], ["float_data"], name)
        self.rescale_factor_ = rescale_factor

    def _create_graph_for_step(self, graph: onnx.GraphProto, onnx_opset: int):
        """Return a graph that converts the input to float if needed and multiplies it by the rescale factor."""
        data_type, data_shape = self._get_input_type_and_shape_strs(graph, 0)

        src = self.input_names[0]
        dst = self.output_names[0]

        # uint8 input needs a Cast to float. any other input goes through a no-op Identity that the
        # optimizer will remove.
        to_float_op = "Cast <to = 1>" if data_type == "uint8" else "Identity"
        to_float_node = f"input_f = {to_float_op} ({src})"

        graph_text = f"""\
            byte_to_float ({data_type}[{data_shape}] {src})
                => (float[{data_shape}] {dst})
            {{
                f_scale = Constant <value = float[1] {{{self.rescale_factor_}}}>()

                {to_float_node}
                {dst} = Mul(input_f, f_scale)
            }}
            """

        scaled_graph = onnx.parser.parse_graph(graph_text)
        onnx.checker.check_graph(scaled_graph)
        return scaled_graph
558
559
class FloatToImageBytes(Step):
    """
    Convert floating point values to uint8 values in range 0..255.
    Typically this reverses ImageBytesToFloat by converting input data in the range 0..1, but an optional multiplier
    can be specified if the input data has a different range.
    Values will be rounded prior to clipping and conversion to uint8.
    """

    def __init__(self, multiplier: float = 255.0, name: Optional[str] = None):
        """
        Args:
            multiplier: Optional multiplier. Currently, the expected values are 255 (input data is in range 0..1), or
                        1 (input data is in range 0..255).
            name: Optional step name. Defaults to 'FloatToImageBytes'
        """
        super().__init__(["float_data"], ["pixel_data"], name)
        self._multiplier = multiplier

    def _create_graph_for_step(self, graph: onnx.GraphProto, onnx_opset: int):
        """Return a graph that optionally scales the float input, then rounds, clips and casts it to uint8."""
        data_type, data_shape = self._get_input_type_and_shape_strs(graph, 0)
        assert data_type == "float"

        src = self.input_names[0]
        dst = self.output_names[0]

        if self._multiplier != 1.0:
            # scale first; the rest of the graph then reads from the scaled tensor
            scaling_nodes = f"""\
                f_multiplier = Constant <value = float[1] {{{self._multiplier}}}> ()
                scaled_input = Mul ({src}, f_multiplier)
                """
            round_input = 'scaled_input'
        else:
            # a multiplier of 1 needs no scaling nodes, so round the step input directly
            scaling_nodes = ''
            round_input = src

        graph_text = f"""\
            float_to_type (float[{data_shape}] {src})
                => (uint8[{data_shape}] {dst})
            {{
                f_0 = Constant <value = float[1] {{0.0}}> ()
                f_255 = Constant <value = float[1] {{255.0}}>()

                {scaling_nodes}
                rounded = Round ({round_input})
                clipped = Clip (rounded, f_0, f_255)
                {dst} = Cast <to = {onnx.TensorProto.UINT8}> (clipped)
            }}
            """

        bytes_graph = onnx.parser.parse_graph(graph_text)
        onnx.checker.check_graph(bytes_graph)
        return bytes_graph
611
612
class ChannelsLastToChannelsFirst(Transpose):
    """
    Transpose channels last data so the channels dim comes first.
    Input can be NHWC or HWC.
    """

    def __init__(self, has_batch_dim: bool = False, name: Optional[str] = None):
        """
        Args:
            has_batch_dim: Set to True if the input has a batch dimension (i.e. is NHWC)
            name: Optional step name. Defaults to 'ChannelsLastToChannelsFirst'
        """
        if has_batch_dim:
            # NHWC -> NCHW
            perms = [0, 3, 1, 2]
        else:
            # HWC -> CHW
            perms = [2, 0, 1]

        super().__init__(perms, name)
627
628
class DrawBoundingBoxes(Step):
    """
    Draw boxes on a BGR image at the given positions. The image is channels last and ordered by BGR.
    Input shape: <uint8_t>{height, width, 3<BGR>}
    boxes: <float>{num_boxes, 6<x, y, x/w, y/h, score, class>}
        The coordinates are absolute pixel values in the picture. Their meaning is determined by `mode`.
        There are different modes to represent the coordinates of the box: [XYXY, XYWH, CENTER_XYWH].
        Please refer to the following link for more details. https://keras.io/api/keras_cv/bounding_box/formats/
        **score** is the confidence of the box (object score * class probability) and **class** is the class of
        the box.

    Output shape: <uint8_t>{height, width, 3<BGR>}
    """

    def __init__(self, mode: str = "XYXY", thickness: int = 4, num_classes: int = 10,
                 colour_by_classes=False, name: Optional[str] = None):
        """
        Args:
            mode: The mode of the boxes,
                    "XYXY" (xmin ymin xmax ymax) All values in the XYXY format should be absolute pixel values.
                    "XYWH" (xmin ymin width height)
                    "CENTER_XYWH" (x_center, y_center, width, height)
                                  All values in the CENTER_XYWH format should be absolute pixel values.
            thickness: Thickness of the box edge
            num_classes: Number of colours to use
                         We support 10 predefined colours and the other classes more than 10 wouldn't be drawn.
                         colors are [Red, Yellow, Lime, Cyan, Blue, Magenta, Orange, Maroon, Green, Navy]
                         and are used in that order. i.e. result with best score will use red.
            colour_by_classes: Colour boxes by classes or by score.
                               If `True` we use a colour for each unique class, with all results from the top
                               `num_classes` classes displayed. A colour is only used for a single class.
                               If `False`, we draw boxes for the top `num_classes` results. A colour is used
                               for a single result, regardless of class.
            name: Optional name of step. Defaults to 'DrawBoundingBoxes'
        """
        super().__init__(["image", "boxes"], ["image_out"], name)
        self.thickness_ = thickness
        self.num_classes_ = num_classes
        self.colour_by_classes_ = colour_by_classes
        self.mode_ = mode

    def _create_graph_for_step(self, graph: onnx.GraphProto, onnx_opset: int):
        """Return a single node graph that applies the DrawBoundingBoxes custom op to the image and boxes."""
        image_type, image_shape = self._get_input_type_and_shape_strs(graph, 0)
        boxes_type, boxes_shape = self._get_input_type_and_shape_strs(graph, 1)
        assert image_type == "uint8" and boxes_type == "float"

        # each box must have 6 values
        assert str(boxes_shape.split(",")[-1]) == "6"

        image_in = self.input_names[0]
        boxes_in = self.input_names[1]
        image_out = self.output_names[0]

        # the output image has the same shape as the input image
        drawing_graph = onnx.parser.parse_graph(
            f"""\
            bounding_box (uint8[{image_shape}] {image_in}, float[{boxes_shape}] {boxes_in})
                => (uint8[{image_shape}] {image_out})
            {{
                {image_out} = com.microsoft.extensions.DrawBoundingBoxes({image_in}, {boxes_in})
            }}
            """
        )

        # attributes are added to the parsed node directly. the bool is passed to the op as an int.
        attribute_values = {
            "thickness": self.thickness_,
            "num_classes": self.num_classes_,
            "colour_by_classes": int(self.colour_by_classes_),
            "mode": self.mode_,
        }
        node_attributes = [onnx.helper.make_attribute(key, value) for key, value in attribute_values.items()]
        drawing_graph.node[0].attribute.extend(node_attributes)

        return drawing_graph
698
699
class LetterBox(Step):
    """
    Image is channel last and ordered by BGR.
    Mainly used in object detection, where it mostly follows a resize operation.
    This step either adds a border or crops the image to satisfy the network input.
    -----          bbbbbbbbb
    |img|  --- >   bb-----bb
    -----          bb|img|bb
                   bb-----bb
                   bbbbbbbbb
    If target_shape is less than the original image, it will crop the image in a center mode.
    And the padding values will be negative and the Pad op performs cropping.

    Input shape: <uint8_t>{height, width, 3<BGR>}
    Output shape: <uint8_t>{target_height, target_width, 3<BGR>}, as specified by target_shape
    """

    def __init__(self, target_shape: Union[int, Tuple[int, int]], fill_value=0, name: Optional[str] = None):
        """
        Args:
            target_shape: the size of the output image. Can be a single value, which is used for both height
                          and width, or a tuple with (target_height, target_width).
            fill_value: a constant value used to fill the border
            name: Optional name of step. Defaults to 'LetterBox'
        """
        super().__init__(["image"], ["image_pad"], name)

        # The signature allows a single int, but the graph creation indexes into target_shape_.
        # Expand an int to (height, width) here so both documented forms work.
        if isinstance(target_shape, int):
            target_shape = (target_shape, target_shape)

        self.target_shape_ = target_shape
        self.fill_value_ = fill_value

    def _create_graph_for_step(self, graph: onnx.GraphProto, onnx_opset: int):
        """Return a graph that pads (or crops, via negative pads) the HWC input to the target height and width."""
        input0_type_str, input0_shape_str = self._get_input_type_and_shape_strs(graph, 0)

        assert len(input0_shape_str.split(',')) == 3, " expected BGR image"

        target_h, target_w = self.target_shape_[0], self.target_shape_[1]
        target_shape_str = f"{target_h}, {target_w}, 3"

        split_input_shape_attr = "axis = 0"
        if onnx_opset >= 18:
            # Split now requires the number of outputs to be specified even though that can be easily inferred...
            split_input_shape_attr += ", num_outputs = 3"

        # The total padding for each of h and w is split into a leading half (integer division) and the
        # remainder. Pad takes all the begin values followed by all the end values, with 0 for the channels dim.
        converter_graph = onnx.parser.parse_graph(
            f"""\
            LetterBox (uint8[{input0_shape_str}] {self.input_names[0]})
                => (uint8[{target_shape_str}] {self.output_names[0]})
            {{
                target_size = Constant <value = int64[2] {{{target_h}, {target_w}}}> ()
                i64_2 = Constant <value = int64[1] {{2}}>()
                i64_0 = Constant <value = int64[1] {{0}}>()
                const_val = Constant <value = uint8[1] {{{self.fill_value_}}}> ()
                image_shape = Shape ({self.input_names[0]})
                h,w,c = Split <{split_input_shape_attr}> (image_shape)
                hw = Concat <axis = 0> (h, w)
                pad_hw = Sub (target_size, hw)
                half_pad_hw = Div (pad_hw, i64_2)
                remainder_pad_hw = Sub (pad_hw, half_pad_hw)
                pad_value = Concat <axis = 0> (half_pad_hw, i64_0,remainder_pad_hw,i64_0)
                {self.output_names[0]} = Pad({self.input_names[0]}, pad_value, const_val)
            }}
            """
        )

        return converter_graph
764
765
class SplitOutBoxAndScore(Step):
    r"""
    Split the output of the model into boxes and scores. This step also handles the optional object score.
    Input shape: <float>{num_boxes, 4/5+num_classes}
    Output shape: <float>{num_boxes, 4}, <float>{num_boxes, num_classes}
    |x1,x2,x3,x4, (obj), cls_1, ... cls_num|
                    /\
                   /  \
    |x1,x2,x3,x4|       |cls_1, ... clx_num|*(obj)
    obj is optional, if it is not present, it will be set to 1.0
    This is where 4/5 comes from, '4' represent coordinates and the fifth object probability.
    """

    def __init__(self, num_classes:int = 80, name: Optional[str] = None):
        """
        Args:
            num_classes: number of classes
            name: Optional name of step. Defaults to 'SplitOutBoxAndScore'
        """
        super().__init__(["box_and_score"], ["_pre_boxes", "_pre_scores"], name)
        self.num_classes_ = num_classes

    def _create_graph_for_step(self, graph: onnx.GraphProto, onnx_opset: int):
        """Return a graph that splits the input into boxes and class scores scaled by the object score."""
        merged_type, merged_shape = self._get_input_type_and_shape_strs(graph, 0)

        merged_dims = merged_shape.split(',')
        assert len(merged_dims) == 2, " expected [num_boxes, 4/5+num_classes]"

        num_boxes_dim = merged_dims[0]
        boxes_shape = f"{num_boxes_dim}, 4"
        # NOTE(review): other steps in this file build their symbolic dim names from `self.step_num`;
        # this one reads `self._step_num`. Confirm against the Step base class that this is intended.
        scores_shape = f"{num_boxes_dim}, _{self._step_num}_class"

        src = self.input_names[0]
        boxes_out = self.output_names[0]
        scores_out = self.output_names[1]

        # The size of the last input dim minus num_classes minus 4 gives how many object score columns are
        # present. That value is used both as the middle split size and, cast to bool, to choose between
        # the object score and a constant 1.0 as the coefficient applied to the class scores.
        graph_text = f"""\
            SplitOutBoxAndScore (float[{merged_shape}] {src})
                => (float[{boxes_shape}] {boxes_out}, float[{scores_shape}] {scores_out})
            {{

                i64_neg1 = Constant <value = int64[1] {{-1}}>()
                i64_4 = Constant <value = int64[1] {{4}}>()
                i64_0 = Constant <value = int64[1] {{0}}>()
                fp32_1 = Constant <value = float[1] {{1.0}}>()
                i64_classes = Constant <value = int64[1] {{{self.num_classes_}}}>()
                out_shape = Shape ({src})
                class_and_coor_dim = Gather (out_shape, i64_neg1)
                coor_and_obj = Sub (class_and_coor_dim, i64_classes)
                obj_0_or_1 = Sub (coor_and_obj, i64_4)
                bool_num_obj_0_or_1 = Cast<to=9>(obj_0_or_1)

                box_obj_class_concat = Concat <axis = 0> (i64_4, obj_0_or_1, i64_classes)
                boxes_o, scores_obj_o, scores_cls_o = Split <axis = -1> ({src}, box_obj_class_concat)
                scores_obj_not_null = Concat <axis = -1> (scores_obj_o, boxes_o)
                coef_obj_cat = Where(bool_num_obj_0_or_1, scores_obj_not_null,fp32_1)
                coef_obj = Gather <axis=-1> (coef_obj_cat, i64_0)
                scores_o = Mul (scores_cls_o, coef_obj)
                {boxes_out} = Identity (boxes_o)
                {scores_out} = Identity (scores_o)

            }}
            """

        return onnx.parser.parse_graph(graph_text)
827
828
class SelectBestBoundingBoxesByNMS(Step):
    """
    Non-maximum suppression (NMS) is to filter out redundant bounding boxes.
    This step wraps the boxes and scores with the ONNX NonMaxSuppression op.
    Input:
        boxes:  float[num_boxes, 4]
        scores:  float[num_boxes, num_classes]

    Output:
        nms_out: float[_few_num_boxes, 6<coordinate+score+class>]
    """

    def __init__(self, iou_threshold: float = 0.5, score_threshold: float = 0.67,
                 max_detections: int = 300, name: Optional[str] = None):
        """
        Args:
            Please refer to https://github.com/onnx/onnx/blob/main/docs/Operators.md#NonMaxSuppression
            for more details about the parameters.
            iou_threshold: same as the NonMaxSuppression op, intersection/union of boxes
            score_threshold: If this box's score is lower than score_threshold, it will be removed.
            max_detections: max number of boxes to be selected; passed to NonMaxSuppression as its
                            third input (max_output_boxes_per_class)
            name: Optional name of step. Defaults to 'SelectBestBoundingBoxesByNMS'
        """
        super().__init__(["boxes", "scores"], ["nms_out"], name)
        self.iou_threshold_ = iou_threshold
        self.score_threshold_ = score_threshold
        self.max_detections_ = max_detections

    def _create_graph_for_step(self, graph: onnx.GraphProto, onnx_opset: int):
        """
        Build the ONNX graph that runs NonMaxSuppression and gathers the selected boxes.

        Args:
            graph: graph whose outputs feed this step; used to look up the input shapes.
            onnx_opset: target ONNX opset; selects the ReduceMax calling convention.

        Returns:
            onnx.GraphProto producing float[num_selected, 6]: the 4 box values, the max score
            across classes for the selected box, and the selected class index cast to float.
        """
        _, boxes_shape_str = self._get_input_type_and_shape_strs(graph, 0)
        _, scores_shape_str = self._get_input_type_and_shape_strs(graph, 1)

        assert len(boxes_shape_str.split(',')) == 2, " expected [num_boxes, 4]"

        # Name the symbolic output dim after this step's own `step_num`, matching how other steps
        # in this file (e.g. ConvertImageToBGR) build their symbolic dim names.
        target_shape_str = f"_{self.step_num}_nms_boxes, 6"

        # For opset >= 18 the reduction axes are passed as a second input, otherwise as an attribute.
        reduce_score = '(score_select_nm,i64_neg1)' if onnx_opset >= 18 else '<axes=[-1]>(score_select_nm)'

        # Graph outline:
        #   * scores are transposed to [num_classes, num_boxes] and both inputs get a leading
        #     batch dim of 1 before being passed to NonMaxSuppression.
        #   * column 1 of the NMS result is used as the class index, column 2 as the box index.
        #   * the box index gathers rows from the original boxes and scores.
        converter_graph = onnx.parser.parse_graph(
            f"""\
            SelectBestBoundingBoxesByNMS (float[{boxes_shape_str}] {self.input_names[0]},float[{scores_shape_str}] {self.input_names[1]})
                => (float[{target_shape_str}] {self.output_names[0]})
            {{
                i64_2 = Constant <value = int64[1] {{2}}>()
                i64_0 = Constant <value = int64[1] {{0}}>()
                i64_1 = Constant <value = int64[1] {{1}}>()
                i64_max_obj = Constant <value = int64[1] {{{self.max_detections_}}}>()
                i64_neg1 = Constant <value = int64[1] {{-1}}>()
                fp32_iou_th = Constant <value = float[1] {{{self.iou_threshold_}}}>()
                fp32_score_th = Constant <value = float[1] {{{self.score_threshold_}}}>()

                boxes_i = Identity ({self.input_names[0]})
                scores_i = Identity({self.input_names[1]})
                scores_c_b = Transpose<perm=[1,0]>(scores_i)
                batch_boxes = Unsqueeze(boxes_i, i64_0)
                batch_scores = Unsqueeze(scores_c_b, i64_0)

                nmsbox = NonMaxSuppression<center_point_box =1>(batch_boxes, batch_scores, i64_max_obj,fp32_iou_th,fp32_score_th)
                classes_i64 = Gather <axis=-1>(nmsbox,i64_1)
                class_select = Cast <to = 1>(classes_i64)

                boxes_idx_us = Gather <axis=-1>(nmsbox,i64_2)
                boxes_idx = Squeeze(boxes_idx_us, i64_neg1)
                boxes_select = Gather <axis=0>(boxes_i, boxes_idx)

                score_select_nm = Gather <axis=0>(scores_i, boxes_idx)
                score_select = ReduceMax{reduce_score}

                {self.output_names[0]} = Concat <axis = -1> (boxes_select, score_select, class_select)
            }}
            """
        )
        return converter_graph
904
905
class ScaleBoundingBoxes(Step):
    """
    Map box coordinates back to the scale of the original image.
    The boxes produced by a detection model are relative to the network's input image, which has been
    scaled and padded/cropped, so a linear mapping is needed to recover coordinates in the original image.
    input:
        box_of_nms_out: output of NMS, shape [num_boxes, 6]
        original_image: original image decoded from jpg/png, <uint8_t>[H, W, 3<BGR>]
        scaled_image: scaled image, but without padding/crop, <uint8_t>[H1, W1, 3<BGR>]
        letter_boxed_image: scaled image with padding/crop, <uint8_t>[H2, W2, 3<BGR>]

    output:
        scaled_box_out: shape [num_boxes, 6] with coordinates mapped to the original image.
    """

    def __init__(self, name: Optional[str] = None):
        """
        Args:
            name: Optional name of step. Defaults to 'ScaleBoundingBoxes'
        """
        step_inputs = ["box_of_nms_out", "original_image", "scaled_image", "letter_boxed_image"]
        super().__init__(step_inputs, ["scaled_box_out"], name)

    def _create_graph_for_step(self, graph: onnx.GraphProto, onnx_opset: int):
        """
        Build the ONNX graph that removes the letterbox padding offset and rescales the boxes.

        Args:
            graph: graph whose outputs feed this step; used to look up input types and shapes.
            onnx_opset: target ONNX opset; for opset >= 18 the Split nodes get a `num_outputs` attribute.

        Returns:
            onnx.GraphProto with a single float output that has the same shape as the boxes input.
        """
        input_decls = []
        input_shapes = []
        for idx, input_name in enumerate(self.input_names):
            type_str, shape_str = self._get_input_type_and_shape_strs(graph, idx)
            input_decls.append(f"{type_str}[{shape_str}] {input_name}")
            input_shapes.append(shape_str)
        graph_input_param = ','.join(input_decls)

        # The output reuses the shape of the first input (the boxes); the image shapes are not needed.
        output_shapes = input_shapes[:1]
        graph_output_param = ','.join(
            f"float[{output_shapes[idx]}] {output_name}" for idx, output_name in enumerate(self.output_names)
        )

        # Every Split below produces 3 outputs; the attribute is only emitted for opset >= 18.
        num_outputs_attr = ", num_outputs = 3" if onnx_opset >= 18 else ''

        # Graph outline:
        #   * ratio = original height / scaled height, applied to all four box values.
        #   * half of (letterboxed w,h - scaled w,h) is subtracted from the first two box values.
        #   * the last two columns of the input are passed through unchanged.
        converter_graph = onnx.parser.parse_graph(
            f"""\
            ScaleBoundingBoxes ({graph_input_param})
                => ({graph_output_param})
            {{
                i64_2 = Constant <value = int64[1] {{2}}>()

                ori_shape = Shape ({self.input_names[1]})
                scaled_shape = Shape ({self.input_names[2]})
                lettered_shape = Shape ({self.input_names[3]})
                oh,ow,oc = Split <axis = 0 {num_outputs_attr}> (ori_shape)
                sh,sw,sc = Split <axis = 0 {num_outputs_attr}> (scaled_shape)
                lh,lw,lc = Split <axis = 0 {num_outputs_attr}> (lettered_shape)
                swh = Concat <axis = -1> (sw,sh)
                lwh = Concat <axis = -1> (lw,lh)

                f_oh = Cast <to = 1> (oh)
                f_sh = Cast <to = 1> (sh)
                ratios = Div (f_oh, f_sh)

                pad_wh = Sub (lwh, swh)
                half_pad_wh = Div (pad_wh, i64_2)
                f_half_pad_wh = Cast <to = 1> (half_pad_wh)

                boxes_xy,boxes_wh_orxy,boxes_score_class = Split <axis=-1 {num_outputs_attr}>({self.input_names[0]})
                offset_boxes_xy = Sub (boxes_xy, f_half_pad_wh)
                restored_boxes = Concat <axis=-1> (offset_boxes_xy, boxes_wh_orxy)
                scaled_boxes_coor = Mul (restored_boxes, ratios)
                restored_boxes_res = Concat <axis=-1> (scaled_boxes_coor, boxes_score_class)

                {self.output_names[0]} = Identity (restored_boxes_res)
            }}
            """
        )
        return converter_graph