microsoft/onnxruntime-extensions
Public · mirrored from https://github.com/microsoft/onnxruntime-extensions · Available
onnxruntime_extensions/tools/pre_post_processing/steps/vision.py
984 lines · mode: code
| 1 | # Copyright (c) Microsoft Corporation. All rights reserved. |
| 2 | # Licensed under the MIT License. |
| 3 | |
| 4 | import onnx |
| 5 | import numpy as np |
| 6 | |
| 7 | from typing import List, Optional, Tuple, Union |
| 8 | from ..step import Step |
| 9 | from .general import Transpose |
| 10 | |
| 11 | # |
| 12 | # Image conversion |
| 13 | # |
| 14 | |
| 15 | |
class ConvertImageToBGR(Step):
    """
    Decode the bytes of an encoded image into BGR ordered uint8 values.
    Supported input formats: jpg, png
    Input shape: {num_encoded_bytes}
    Output shape: {input_image_height, input_image_width, 3}
    """

    def __init__(self, name: Optional[str] = None):
        """
        Args:
            name: Optional name of step. Defaults to 'ConvertImageToBGR'

        NOTE: Input image format is inferred and does not need to be specified.
        """
        super().__init__(["image"], ["bgr_data"], name)

    def _create_graph_for_step(self, graph: onnx.GraphProto, onnx_opset: int):
        """
        Create the graph for this step: a single DecodeImage custom op node.

        Args:
            graph: Graph the step input is looked up in to get its type and shape.
            onnx_opset: ONNX opset in use. Not used by this step.

        Returns:
            GraphProto for the step.
        """
        encoded_type, encoded_shape = self._get_input_type_and_shape_strs(graph, 0)
        assert encoded_type == "uint8"

        src = self.input_names[0]
        dst = self.output_names[0]

        # symbolic names are used for the decoded height and width. the names include the step number.
        decoded_shape = f"to_bgr_ppp_{self.step_num}_h, to_bgr_ppp_{self.step_num}_w, 3"

        graph_text = f"""\
            image_to_bgr (uint8[{encoded_shape}] {src})
                => (uint8[{decoded_shape}] {dst})
            {{
                {dst} = com.microsoft.extensions.DecodeImage({src})
            }}
            """

        return onnx.parser.parse_graph(graph_text)
| 49 | |
| 50 | |
class ConvertBGRToImage(Step):
    """
    Encode BGR ordered uint8 data as an image.
    Supported output formats: jpg, png
    Input shape: {input_image_height, input_image_width, 3}
    Output shape: {num_encoded_bytes}
    """

    def __init__(self, image_format: str = "jpg", name: Optional[str] = None):
        """
        Args:
            image_format: Format to encode to. jpg and png are supported.
            name: Optional step name. Defaults to 'ConvertBGRToImage'
        """
        super().__init__(["bgr_data"], ["image"], name)
        assert image_format in ("jpg", "png")
        self._format = image_format

    def _create_graph_for_step(self, graph: onnx.GraphProto, onnx_opset: int):
        """
        Create the graph for this step: a single EncodeImage custom op node with a `format` attribute.

        Args:
            graph: Graph the step input is looked up in to get its type and shape.
            onnx_opset: ONNX opset in use. Not used by this step.

        Returns:
            GraphProto for the step.
        """
        pixel_type, pixel_shape = self._get_input_type_and_shape_strs(graph, 0)
        assert pixel_type == "uint8"

        src = self.input_names[0]
        dst = self.output_names[0]
        encoded_shape = f"to_image_ppp_{self.step_num}_num_bytes"

        encoder_graph = onnx.parser.parse_graph(
            f"""\
            bgr_to_image (uint8[{pixel_shape}] {src})
                => (uint8[{encoded_shape}] {dst})
            {{
                {dst} = com.microsoft.extensions.EncodeImage ({src})
            }}
            """
        )

        # the `format` attribute has to be added directly to the node as this is a custom op.
        # parse_graph doesn't have a schema for the operator and fails attempting to validate the attribute.
        encode_node = encoder_graph.node[0]
        format_attr = encode_node.attribute.add()
        format_attr.name = "format"
        format_attr.type = onnx.AttributeProto.STRING
        format_attr.s = self._format.encode("utf-8")

        return encoder_graph
| 92 | |
| 93 | |
class PixelsToYCbCr(Step):
    """
    Convert RGB or BGR pixel data to YCbCr format.
    Input shape: {height, width, 3}
    Outputs: Y, Cb and Cr as three separate outputs, each with shape {height, width}.
    Output data is float, but rounded and clipped to the range 0..255 as per the spec for YCbCr conversion.
    """

    def __init__(self, layout: str = "BGR", name: Optional[str] = None):
        """
        Args:
            layout: Input data layout. Can be 'BGR' or 'RGB'
            name: Optional step name. Defaults to 'PixelsToYCbCr'
        """
        super().__init__(["pixels"], ["Y", "Cb", "Cr"], name)
        assert layout in ("RGB", "BGR")
        self._layout = layout

    def _create_graph_for_step(self, graph: onnx.GraphProto, onnx_opset: int):
        """
        Create the graph that converts uint8 HWC pixels to float Y, Cb and Cr outputs.

        Args:
            graph: Graph the step input is looked up in to get its type and shape.
            onnx_opset: ONNX opset in use. Split needs `num_outputs` to be specified from opset 18.

        Returns:
            GraphProto for the step.
        """
        pixel_type, pixel_shape = self._get_input_type_and_shape_strs(graph, 0)

        # input must be uint8 data with 3 dims where the last one (channels) is 3. i.e. HWC
        pixel_dims = pixel_shape.split(",")
        assert pixel_type == "uint8" and len(pixel_dims) == 3 and pixel_dims[2] == "3"

        # https://en.wikipedia.org/wiki/YCbCr
        # exact weights from https://www.itu.int/rec/T-REC-T.871-201105-I/en
        # each row produces one output (Y, Cb, Cr) from R, G, B ordered input.
        rgb_to_ycbcr = np.array(
            [
                [0.299, 0.587, 0.114],
                [-0.299 / 1.772, -0.587 / 1.772, 0.500],
                [0.500, -0.587 / 1.402, -0.114 / 1.402],
            ],
            dtype=np.float32,
        )

        # for BGR input the order of the last dim is reversed so each weight lines up with its channel
        coefficients = rgb_to_ycbcr if self._layout == "RGB" else rgb_to_ycbcr[:, ::-1]

        # the weights are transposed as they are the second input of the MatMul
        weights_text = ",".join(str(value) for value in coefficients.T.flatten())
        bias_text = ",".join(str(value) for value in [0.0, 128.0, 128.0])

        # each output is {h, w}. symbolic names that include the step number are used for both dims.
        plane_shape = f"YCbCr_ppp_{self.step_num}_h, YCbCr_ppp_{self.step_num}_w"

        # Split requires the number of outputs to be specified from opset 18
        split_attr = "axis = -1" + (", num_outputs = 3" if onnx_opset >= 18 else "")

        src = self.input_names[0]
        outputs = self.output_names

        # Graph steps:
        #   convert to float for MatMul
        #   apply weights and bias
        #   round and clip so it's in the range 0..255
        #   split into channels. shape will be {h, w, 1}
        #   remove the trailing '1' so each output is {h, w}
        graph_text = f"""\
            pixels_to_YCbCr (uint8[{pixel_shape}] {src})
                => (float[{plane_shape}] {outputs[0]},
                    float[{plane_shape}] {outputs[1]},
                    float[{plane_shape}] {outputs[2]})
            {{
                kWeights = Constant <value = float[3, 3] {{{weights_text}}}> ()
                kBias = Constant <value = float[3] {{{bias_text}}}> ()
                i64_neg1 = Constant <value = int64[1] {{-1}}> ()
                f_0 = Constant <value = float[1] {{0.0}}> ()
                f_255 = Constant <value = float[1] {{255.0}}> ()

                f_pixels = Cast <to = 1> ({src})
                f_weighted = MatMul(f_pixels, kWeights)
                f_biased = Add(f_weighted, kBias)
                f_rounded = Round(f_biased)
                f_clipped = Clip (f_rounded, f_0, f_255)
                split_Y, split_Cb, split_Cr = Split <{split_attr}>(f_clipped)
                {outputs[0]} = Squeeze (split_Y, i64_neg1)
                {outputs[1]} = Squeeze (split_Cb, i64_neg1)
                {outputs[2]} = Squeeze (split_Cr, i64_neg1)
            }}
            """

        return onnx.parser.parse_graph(graph_text)
| 181 | |
| 182 | |
class YCbCrToPixels(Step):
    """
    Convert YCbCr input to RGB or BGR.

    Input data can be uint8 or float but all inputs must use the same type.
    Inputs: Y, Cb and Cr as three separate inputs, each with shape {height, width}.
    Output shape: {height, width, 3}. Output data is uint8.
    """

    def __init__(self, layout: str = "BGR", name: Optional[str] = None):
        """
        Args:
            layout: Output layout. Can be 'BGR' or 'RGB'
            name: Optional step name. Defaults to 'YCbCrToPixels'
        """
        super().__init__(["Y", "Cb", "Cr"], ["bgr_data"], name)
        assert layout in ("RGB", "BGR")
        self._layout = layout

    def _create_graph_for_step(self, graph: onnx.GraphProto, onnx_opset: int):
        """
        Create the graph that merges the Y, Cb and Cr inputs and converts them to uint8 pixels.

        Args:
            graph: Graph the step inputs are looked up in to get their types and shapes.
            onnx_opset: ONNX opset in use. Not used by this step.

        Returns:
            GraphProto for the step.
        """
        type_and_shape = [self._get_input_type_and_shape_strs(graph, idx) for idx in range(3)]
        (y_type, y_shape), (cb_type, cb_shape), (cr_type, cr_shape) = type_and_shape

        # all inputs must have the same type, and that type must be uint8 or float
        assert y_type == cb_type == cr_type and y_type in ("uint8", "float")

        # every input must be 2D. i.e. {h, w}
        assert all(len(shape.split(",")) == 2 for shape in (y_shape, cb_shape, cr_shape))

        pixel_shape = f"{y_shape}, 3"

        # https://en.wikipedia.org/wiki/YCbCr
        # exact weights from https://www.itu.int/rec/T-REC-T.871-201105-I/en
        # each row produces one output channel (R, G, B) from Y, Cb, Cr ordered input.
        to_rgb = np.array(
            [
                [1, 0, 1.402],
                [1, -0.114 * 1.772 / 0.587, -0.299 * 1.402 / 0.587],
                [1, 1.772, 0],
            ],
            dtype=np.float32,
        )

        # reversing the first dim of the weights makes the output BGR
        coefficients = to_rgb[::-1, :] if self._layout == "BGR" else to_rgb

        # the weights are transposed as they are the second input of the MatMul
        weights_text = ",".join(str(value) for value in coefficients.T.flatten())
        bias_text = ",".join(str(value) for value in [0.0, 128.0, 128.0])

        inputs = self.input_names
        dst = self.output_names[0]

        # Graph steps:
        #   unsqueeze the {h, w} inputs to add channels dim. new shape is {h, w, 1}
        #   merge Y, Cb, Cr data on the new channel axis
        #   convert to float to apply weights etc.
        #   remove bias
        #   apply weights
        #   round and clip to 0..255
        #   convert to uint8.
        graph_text = f"""\
            YCbCr_to_RGB ({y_type}[{y_shape}] {inputs[0]},
                          {cb_type}[{cb_shape}] {inputs[1]},
                          {cr_type}[{cr_shape}] {inputs[2]})
                => (uint8[{pixel_shape}] {dst})
            {{
                kWeights = Constant <value = float[3, 3] {{{weights_text}}}> ()
                kBias = Constant <value = float[3] {{{bias_text}}}> ()
                f_0 = Constant <value = float[1] {{0.0}}> ()
                f_255 = Constant <value = float[1] {{255.0}}> ()
                i64_neg1 = Constant <value = int64[1] {{-1}}> ()

                Y1 = Unsqueeze({inputs[0]}, i64_neg1)
                Cb1 = Unsqueeze({inputs[1]}, i64_neg1)
                Cr1 = Unsqueeze({inputs[2]}, i64_neg1)
                YCbCr = Concat <axis = -1> (Y1, Cb1, Cr1)
                f_YCbCr = Cast <to = 1> (YCbCr)
                f_unbiased = Sub (f_YCbCr, kBias)
                f_pixels = MatMul (f_unbiased, kWeights)
                f_rounded = Round (f_pixels)
                clipped = Clip (f_rounded, f_0, f_255)
                {dst} = Cast <to = {onnx.TensorProto.UINT8}> (clipped)
            }}
            """

        return onnx.parser.parse_graph(graph_text)
| 275 | |
| 276 | |
| 277 | # |
| 278 | # Pre-processing |
| 279 | # |
class Resize(Step):
    """
    Resize input data. Aspect ratio is maintained.
    e.g. with the default 'not_smaller' policy, if image is 1200 x 600 and 300 x 300 is requested
    the result will be 600 x 300
    """

    def __init__(self, resize_to: Union[int, Tuple[int, int]], layout: str = "HWC",
                 policy: str = "not_smaller", name: Optional[str] = None):
        """
        Args:
            resize_to: Target size. Can be a single value or a tuple with (target_height, target_width).
                       The aspect ratio will be maintained. How the result relates to the requested size is
                       determined by `policy`.
            layout: Input layout. 'NCHW', 'NHWC', 'CHW', 'HWC' and 'HW' are supported.
            policy: not_smaller (default)
                        the sizes are adjusted so that no extent of the output is smaller than the specified size,
                        while keeping the original aspect ratio
                    not_larger
                        the sizes are adjusted so that no extent of the output is larger than the specified size,
                        while keeping the original aspect ratio.
                    Please refer to https://github.com/onnx/onnx/blob/main/docs/Operators.md#Resize for more details.
            name: Optional name. Defaults to 'Resize'
        """
        super().__init__(["image"], ["resized_image"], name)
        if isinstance(resize_to, int):
            self._height = self._width = resize_to
        else:
            assert isinstance(resize_to, tuple)
            self._height, self._width = resize_to

        self._layout = layout
        self.policy_ = policy

    def _create_graph_for_step(self, graph: onnx.GraphProto, onnx_opset: int):
        """
        Create the graph that calculates the new height and width at runtime and resizes the input to them.

        Args:
            graph: Graph the step input is looked up in to get its type and shape.
            onnx_opset: ONNX opset in use. From opset 18 antialiasing is enabled and Split is given `num_outputs`.

        Returns:
            GraphProto for the step.

        Raises:
            ValueError: If the layout or the resize policy is not supported.
        """
        input_type_str, input_shape_str = self._get_input_type_and_shape_strs(graph, 0)
        dims = input_shape_str.split(",")

        layout = self._layout
        if layout not in ("NHWC", "NCHW", "HWC", "CHW", "HW"):
            raise ValueError(f"Unsupported layout of {self._layout}")

        # the input must have one dim for each letter in the layout
        assert len(dims) == len(layout)

        # Resize-18 has the attribute "not_larger/not_smaller" to specify the resize policy, however
        # we want to support older opsets as well, so the ratio to use is selected with a Reduce* operator.
        # NOTE: this was previously `assert (condition, message)`, which asserts a non-empty tuple and can never
        # fail, so an unknown policy was silently treated as 'not_smaller'.
        if self.policy_ not in ("not_smaller", "not_larger"):
            raise ValueError(
                f"Unsupported resize policy of {self.policy_}, must be 'not_smaller' or 'not_larger'"
            )

        # largest ratio -> neither side is smaller than requested. smallest ratio -> neither side is larger.
        ratio_resize_func = "ReduceMin" if self.policy_ == "not_larger" else "ReduceMax"

        # adjust for layout
        # both sides of the result won't necessarily match the requested height and width as one ratio is applied
        # to both. use symbolic names for the output dims as we have to provide values. prefix the names to try and
        # avoid any clashes.
        #   split_str: names for the individual values of the input shape. e.g. "h, w, c"
        #   sizes_str: same names with the new height and width substituted. e.g. "h2, w2, c"
        #   output_shape_str: input dims with the height and width replaced by the symbolic names.
        resized_dim_names = {"H": f"resize_ppp_{self.step_num}_h", "W": f"resize_ppp_{self.step_num}_w"}
        split_str = ", ".join(axis.lower() for axis in layout)
        sizes_str = ", ".join(f"{axis.lower()}2" if axis in "HW" else axis.lower() for axis in layout)
        output_shape_str = ", ".join(resized_dim_names.get(axis, dim) for axis, dim in zip(layout, dims))

        # TODO: Make this configurable. Matching PIL resize for now.
        resize_attributes = 'mode = "linear", nearest_mode = "floor"'
        if onnx_opset >= 18:
            # Resize matches PIL better if antialiasing is used, but that isn't available until ONNX opset 18.
            # Older opsets are still supported, just without antialiasing.
            resize_attributes += ', antialias = 1'

        # Rank 3 input uses trilinear interpolation, so if input is HWC or CHW we need to add a temporary batch dim
        # to make it rank 4, which will result in Resize using the desired bilinear interpolation.
        add_batch_dim = layout in ("HWC", "CHW")

        if add_batch_dim:
            u64_1_str = "u64_1 = Constant <value = int64[1] {1}> ()"
            sizes_str = "u64_1, " + sizes_str
            resize_str = f"""\
                axes = Constant <value = int64[1] {{0}}> ()
                unsqueezed = Unsqueeze ({self.input_names[0]}, axes)
                resized = Resize <{resize_attributes}> (unsqueezed, , , sizes_resize)
                {self.output_names[0]} = Squeeze (resized, axes)
                """
        else:
            u64_1_str = ""
            resize_str = \
                f"{self.output_names[0]} = Resize <{resize_attributes}> ({self.input_names[0]}, , , sizes_resize)"

        split_input_shape_attr = "axis = 0"
        split_new_sizes_attr = "axis = 0"
        if onnx_opset >= 18:
            # Split now requires the number of outputs to be specified even though that can be easily inferred...
            split_input_shape_attr += f", num_outputs = {len(dims)}"
            split_new_sizes_attr += ", num_outputs = 2"

        # Graph steps:
        #   split the input shape into individual values and get the current height and width
        #   calculate the ratio between the target size and the current size for each of height and width
        #   select the ratio to use based on the policy and apply it to both height and width
        #   round and convert to int64 (7 == int64) to get the new height and width
        #   create the `sizes` input for Resize and run it
        resize_graph = onnx.parser.parse_graph(
            f"""\
            resize ({input_type_str}[{input_shape_str}] {self.input_names[0]}) =>
                ({input_type_str}[{output_shape_str}] {self.output_names[0]})
            {{
                target_size = Constant <value = float[2] {{{float(self._height)}, {float(self._width)}}}> ()
                image_shape = Shape ({self.input_names[0]})
                {split_str} = Split <{split_input_shape_attr}> (image_shape)
                hw = Concat <axis = 0> (h, w)
                f_hw = Cast <to = 1> (hw)
                ratios = Div (target_size, f_hw)
                ratio_resize = {ratio_resize_func} (ratios)
                f_hw2_exact = Mul (f_hw, ratio_resize)
                f_hw2_round = Round (f_hw2_exact)
                hw2 = Cast <to = 7> (f_hw2_round)
                h2, w2 = Split <{split_new_sizes_attr}> (hw2)
                {u64_1_str}
                sizes_resize = Concat <axis = 0> ({sizes_str})
                {resize_str}
            }}
            """
        )

        return resize_graph
| 417 | |
| 418 | |
class CenterCrop(Step):
    """
    Crop the input to the requested height and width, taking the crop from the center of the input.
    Currently only HWC input is handled.
    """

    def __init__(self, height: int, width: int, name: Optional[str] = None):
        """
        Args:
            height: Height of area to crop.
            width: Width of area to crop.
            name: Optional step name. Defaults to 'CenterCrop'
        """
        super().__init__(["image"], ["cropped_image"], name)
        self._height = height
        self._width = width

    def _create_graph_for_step(self, graph: onnx.GraphProto, onnx_opset: int):
        """
        Create the graph that slices the requested area out of the first two dims of the input.

        Args:
            graph: Graph the step input is looked up in to get its type and shape.
            onnx_opset: ONNX opset in use. Not used by this step.

        Returns:
            GraphProto for the step.
        """
        data_type, data_shape = self._get_input_type_and_shape_strs(graph, 0)

        # the last dim (channels for HWC input) is unchanged by the crop
        channels = data_shape.split(",")[-1]
        cropped_shape = f"{self._height}, {self._width}, {channels}"

        src = self.input_names[0]
        dst = self.output_names[0]

        # Graph steps:
        #   get the current height and width from the input shape (dims 0 and 1)
        #   the start of the crop is half the difference between the current size and the crop size
        #   the end of the crop is the start plus the crop size
        #   slice dims 0 and 1 using those values
        graph_text = f"""\
            crop ({data_type}[{data_shape}] {src})
                => ({data_type}[{cropped_shape}] {dst})
            {{
                target_crop = Constant <value = int64[2] {{{self._height}, {self._width}}}> ()
                i64_2 = Constant <value = int64[1] {{2}}> ()
                axes = Constant <value = int64[2] {{0, 1}}> ()
                x_shape = Shape ({src})
                hw = Gather (x_shape, axes)
                hw_diff = Sub (hw, target_crop)
                start_xy = Div (hw_diff, i64_2)
                end_xy = Add (start_xy, target_crop)
                {dst} = Slice ({src}, start_xy, end_xy, axes)
            }}
            """

        return onnx.parser.parse_graph(graph_text)
| 460 | |
| 461 | |
class Normalize(Step):
    """
    Normalize input data on a per-channel basis.
        `x -> (x - mean) / stddev`
    Output is float with same shape as input.
    """

    def __init__(self, normalization_values: List[Tuple[float, float]], layout: str = "CHW", name: Optional[str] = None):
        """
        Args:
            normalization_values: Tuple with (mean, stddev). One entry per channel.
                                  If single entry is provided it will be used for all channels.
                                  The list that is passed in is not modified.
            layout: Input layout. Can be 'CHW' or 'HWC'
            name: Optional step name. Defaults to 'Normalize'
        """
        super().__init__(["data"], ["normalized_data"], name)

        # work on a copy. `normalization_values *= 3` on the argument itself would extend the caller's list
        # in place when a list with a single entry is provided.
        values = list(normalization_values)

        # duplicate for each channel if needed
        if len(values) == 1:
            values = values * 3

        assert len(values) == 3
        self._normalization_values = values
        assert layout == "HWC" or layout == "CHW"
        self._hwc_layout = layout == "HWC"

    def _create_graph_for_step(self, graph: onnx.GraphProto, onnx_opset: int):
        """
        Create the graph that converts the input to float, subtracts the mean and divides by the stddev.

        Args:
            graph: Graph the step input is looked up in to get its type and shape.
            onnx_opset: ONNX opset in use. Not used by this step.

        Returns:
            GraphProto for the step. The graph is validated with onnx.checker.check_graph before being returned.
        """
        means = ", ".join(str(entry[0]) for entry in self._normalization_values)
        stddevs = ", ".join(str(entry[1]) for entry in self._normalization_values)

        input_type_str, input_shape_str = self._get_input_type_and_shape_strs(graph, 0)

        # the constants are shaped so they broadcast against the channels dim.
        # channels are last for HWC so {3} is sufficient. for CHW they need to be {3, 1, 1}.
        values_shape = "3" if self._hwc_layout else "3, 1, 1"

        normalize_graph = onnx.parser.parse_graph(
            f"""\
            normalize ({input_type_str}[{input_shape_str}] {self.input_names[0]})
                => (float[{input_shape_str}] {self.output_names[0]})
            {{
                kMean = Constant <value = float[{values_shape}] {{{means}}}> ()
                kStddev = Constant <value = float[{values_shape}] {{{stddevs}}}> ()
                f_input = Cast <to = 1> ({self.input_names[0]})
                f_sub_mean = Sub (f_input, kMean)
                {self.output_names[0]} = Div (f_sub_mean, kStddev)
            }}
            """
        )

        onnx.checker.check_graph(normalize_graph)
        return normalize_graph
| 515 | |
| 516 | |
| 517 | # |
| 518 | # Utilities |
| 519 | # |
class ImageBytesToFloat(Step):
    """
    Convert uint8 or float values in range 0..255 to floating point values in range 0..1
    The output range assumes the default rescale factor of 1/255 is used.
    """

    def __init__(self, rescale_factor: float = 1/255, name: Optional[str] = None):
        """
        Args:
            rescale_factor: Value the input is multiplied by. Defaults to 1/255.
            name: Optional step name. Defaults to 'ImageBytesToFloat'
        """
        super().__init__(["data"], ["float_data"], name)
        self.rescale_factor_ = rescale_factor

    def _create_graph_for_step(self, graph: onnx.GraphProto, onnx_opset: int):
        """
        Create the graph that converts the input to float if needed and multiplies it by the rescale factor.

        Args:
            graph: Graph the step input is looked up in to get its type and shape.
            onnx_opset: ONNX opset in use. Not used by this step.

        Returns:
            GraphProto for the step. The graph is validated with onnx.checker.check_graph before being returned.
        """
        data_type, data_shape = self._get_input_type_and_shape_strs(graph, 0)
        src = self.input_names[0]
        dst = self.output_names[0]

        # uint8 input is cast to float (1 == float).
        # any other input is passed through a no-op Identity that the optimizer will remove.
        to_float_op = "Cast <to = 1>" if data_type == "uint8" else "Identity"
        optional_cast = f"input_f = {to_float_op} ({src})"

        graph_text = f"""\
            byte_to_float ({data_type}[{data_shape}] {src})
                => (float[{data_shape}] {dst})
            {{
                f_scale = Constant <value = float[1] {{{self.rescale_factor_}}}>()

                {optional_cast}
                {dst} = Mul(input_f, f_scale)
            }}
            """

        scaling_graph = onnx.parser.parse_graph(graph_text)
        onnx.checker.check_graph(scaling_graph)
        return scaling_graph
| 558 | |
| 559 | |
class FloatToImageBytes(Step):
    """
    Convert floating point values to uint8 values in range 0..255.
    Typically this reverses ImageBytesToFloat by converting input data in the range 0..1, but an optional multiplier
    can be specified if the input data has a different range.
    Values will be rounded prior to clipping and conversion to uint8.
    """

    def __init__(self, multiplier: float = 255.0, name: Optional[str] = None):
        """
        Args:
            multiplier: Optional multiplier. Currently, the expected values are 255 (input data is in range 0..1), or
                        1 (input data is in range 0..255).
            name: Optional step name. Defaults to 'FloatToImageBytes'
        """
        super().__init__(["float_data"], ["pixel_data"], name)
        self._multiplier = multiplier

    def _create_graph_for_step(self, graph: onnx.GraphProto, onnx_opset: int):
        """
        Create the graph that scales (if needed), rounds, clips and converts the input to uint8.

        Args:
            graph: Graph the step input is looked up in to get its type and shape.
            onnx_opset: ONNX opset in use. Not used by this step.

        Returns:
            GraphProto for the step. The graph is validated with onnx.checker.check_graph before being returned.
        """
        data_type, data_shape = self._get_input_type_and_shape_strs(graph, 0)
        assert data_type == "float"

        src = self.input_names[0]
        dst = self.output_names[0]

        if self._multiplier != 1.0:
            scale_nodes = f"""\
                f_multiplier = Constant <value = float[1] {{{self._multiplier}}}> ()
                scaled_input = Mul ({src}, f_multiplier)
                """
            value_to_round = 'scaled_input'
        else:
            # a multiplier of 1 would be a no-op so the input is used directly
            scale_nodes = ''
            value_to_round = src

        graph_text = f"""\
            float_to_type (float[{data_shape}] {src})
                => (uint8[{data_shape}] {dst})
            {{
                f_0 = Constant <value = float[1] {{0.0}}> ()
                f_255 = Constant <value = float[1] {{255.0}}>()

                {scale_nodes}
                rounded = Round ({value_to_round})
                clipped = Clip (rounded, f_0, f_255)
                {dst} = Cast <to = {onnx.TensorProto.UINT8}> (clipped)
            }}
            """

        to_bytes_graph = onnx.parser.parse_graph(graph_text)
        onnx.checker.check_graph(to_bytes_graph)
        return to_bytes_graph
| 611 | |
| 612 | |
class ChannelsLastToChannelsFirst(Transpose):
    """
    Transpose channels last data so the channels come first.
    Input can be NHWC or HWC.
    """

    def __init__(self, has_batch_dim: bool = False, name: Optional[str] = None):
        """
        Args:
            has_batch_dim: Set to True if the input has a batch dimension (i.e. is NHWC)
            name: Optional step name. Defaults to 'ChannelsLastToChannelsFirst'
        """
        if has_batch_dim:
            # NHWC -> NCHW
            perms = [0, 3, 1, 2]
        else:
            # HWC -> CHW
            perms = [2, 0, 1]

        super().__init__(perms, name)
| 627 | |
| 628 | |
class DrawBoundingBoxes(Step):
    """
    Draw boxes on a BGR image at the given positions. The image is channels last and ordered by BGR.
    Input shape: <uint8_t>{height, width, 3<BGR>}
    boxes: <float>{num_boxes, 6<x, y, x/w, y/h, score, class>}
        The coordinates are absolute pixel values in the picture. Their meaning is determined by `mode`.
        There are different modes to represent the coordinates of the box: [XYXY, XYWH, CENTER_XYWH].
        Please refer to the following link for more details. https://keras.io/api/keras_cv/bounding_box/formats/
        **score** is the confidence of the box (object score * class probability) and **class** is the class of
        the box.

    Output shape: <uint8_t>{height, width, 3<BGR>}
    """

    def __init__(self, mode: str = "XYXY", thickness: int = 4, num_classes: int = 10,
                 colour_by_classes=False, name: Optional[str] = None):
        """
        Args:
            mode: The mode of the boxes,
                    "XYXY" (xmin ymin xmax ymax) All values in the XYXY format should be absolute pixel values.
                    "XYWH" (xmin ymin width height)
                    "CENTER_XYWH" (x_center, y_center, width, height)
                                  All values in the CENTER_XYWH format should be absolute pixel values.
            thickness: Thickness of the box edge
            num_classes: Number of colours to use
                         We support 10 predefined colours and the other classes more than 10 wouldn't be drawn.
                         colors are [Red, Yellow, Lime, Cyan, Blue, Magenta, Orange, Maroon, Green, Navy]
                         and are used in that order. i.e. result with best score will use red.
            colour_by_classes: Colour boxes by classes or by score.
                               If `True` we use a colour for each unique class, with all results from the top
                               `num_classes` classes displayed. A colour is only used for a single class.
                               If `False`, we draw boxes for the top `num_classes` results. A colour is used
                               for a single result, regardless of class.
            name: Optional name of step. Defaults to 'DrawBoundingBoxes'
        """
        super().__init__(["image", "boxes"], ["image_out"], name)
        self.thickness_ = thickness
        self.num_classes_ = num_classes
        self.colour_by_classes_ = colour_by_classes
        self.mode_ = mode

    def _create_graph_for_step(self, graph: onnx.GraphProto, onnx_opset: int):
        """
        Create the graph for this step: a single DrawBoundingBoxes custom op node with its attributes set.

        Args:
            graph: Graph the step inputs are looked up in to get their types and shapes.
            onnx_opset: ONNX opset in use. Not used by this step.

        Returns:
            GraphProto for the step.
        """
        image_type, image_shape = self._get_input_type_and_shape_strs(graph, 0)
        boxes_type, boxes_shape = self._get_input_type_and_shape_strs(graph, 1)
        assert image_type == "uint8" and boxes_type == "float"

        # each box must have 6 values
        assert boxes_shape.split(",")[-1] == "6"

        image_in, boxes_in = self.input_names[0], self.input_names[1]
        image_out = self.output_names[0]

        # drawing on the image doesn't change its shape so the output shape is the image input shape
        graph_text = f"""\
            bounding_box (uint8[{image_shape}] {image_in}, float[{boxes_shape}] {boxes_in})
                => (uint8[{image_shape}] {image_out})
            {{
                {image_out} = com.microsoft.extensions.DrawBoundingBoxes({image_in}, {boxes_in})
            }}
            """
        draw_graph = onnx.parser.parse_graph(graph_text)

        # the attributes are added to the node after the graph has been parsed
        draw_graph.node[0].attribute.extend(
            [
                onnx.helper.make_attribute("thickness", self.thickness_),
                onnx.helper.make_attribute("num_classes", self.num_classes_),
                onnx.helper.make_attribute("colour_by_classes", int(self.colour_by_classes_)),
                onnx.helper.make_attribute("mode", self.mode_),
            ]
        )

        return draw_graph
| 698 | |
| 699 | |
class LetterBox(Step):
    """
    Image is channel last and ordered by BGR.
    Mainly used in object detection, where it mostly follows a resize operation.
    This step either adds a border or crops the image to satisfy the network input.
        -----          bbbbbbbbb
        |img|  --- >   bb-----bb
        -----          bb|img|bb
                       bb-----bb
                       bbbbbbbbb
    If target_shape is less than the original image, it will crop the image in a center mode.
    And the padding values will be negative and the Pad op performs cropping.

    Input shape: <uint8_t>{height, width, 3<BGR>}
    Output shape: <uint8_t>{out_height, out_width, 3<BGR>} where out_height and out_width come from target_shape
    """

    def __init__(self, target_shape: Union[int, Tuple[int, int]], fill_value=0, name: Optional[str] = None):
        """
        Args:
            target_shape: the size of the output image. Can be a single value, which is used for both height and
                          width, or a tuple with (target_height, target_width).
            fill_value: a constant value used to fill the border. It is used as the value of a uint8 Constant.
            name: Optional name of step. Defaults to 'LetterBox'
        """
        super().__init__(["image"], ["image_pad"], name)

        # the signature allows a single int, but the graph creation needs a height and a width.
        # a single value is expanded to a square size, matching how Resize treats a single value.
        if isinstance(target_shape, int):
            target_shape = (target_shape, target_shape)

        self.target_shape_ = target_shape
        self.fill_value_ = fill_value

    def _create_graph_for_step(self, graph: onnx.GraphProto, onnx_opset: int):
        """
        Create the graph that calculates the padding at runtime and pads (or crops) the input to the target size.

        Args:
            graph: Graph the step input is looked up in to get its type and shape.
            onnx_opset: ONNX opset in use. Split needs `num_outputs` to be specified from opset 18.

        Returns:
            GraphProto for the step.
        """
        input0_type_str, input0_shape_str = self._get_input_type_and_shape_strs(graph, 0)

        assert len(input0_shape_str.split(',')) == 3, " expected BGR image"

        target_height = self.target_shape_[0]
        target_width = self.target_shape_[1]
        target_shape_str = f"{target_height}, {target_width}, 3"

        split_input_shape_attr = "axis = 0"
        if onnx_opset >= 18:
            # Split now requires the number of outputs to be specified even though that can be easily inferred...
            split_input_shape_attr += ", num_outputs = 3"

        # Graph steps:
        #   get the current height and width from the input shape
        #   the total padding for each of height and width is the target size minus the current size
        #   half of that is added at the start, and the remainder is added at the end
        #   the `pads` input is all the start values followed by all the end values. channels are not padded.
        converter_graph = onnx.parser.parse_graph(
            f"""\
            LetterBox (uint8[{input0_shape_str}] {self.input_names[0]})
                => (uint8[{target_shape_str}] {self.output_names[0]})
            {{
                target_size = Constant <value = int64[2] {{{target_height}, {target_width}}}> ()
                i64_2 = Constant <value = int64[1] {{2}}>()
                i64_0 = Constant <value = int64[1] {{0}}>()
                const_val = Constant <value = uint8[1] {{{self.fill_value_}}}> ()
                image_shape = Shape ({self.input_names[0]})
                h,w,c = Split <{split_input_shape_attr}> (image_shape)
                hw = Concat <axis = 0> (h, w)
                pad_hw = Sub (target_size, hw)
                half_pad_hw = Div (pad_hw, i64_2)
                remainder_pad_hw = Sub (pad_hw, half_pad_hw)
                pad_value = Concat <axis = 0> (half_pad_hw, i64_0,remainder_pad_hw,i64_0)
                {self.output_names[0]} = Pad({self.input_names[0]}, pad_value, const_val)
            }}
            """
        )

        return converter_graph
| 764 | |
| 765 | |
class SplitOutBoxAndScore(Step):
    r"""
    Split the output of the model into boxes and scores. This step will also handle the optional object score.
    Input shape: <float>{num_boxes, 4/5+num_classes}
    Output shape: <float>{num_boxes, 4}, <float>{num_boxes, num_classes}
    |x1,x2,x3,x4, (obj), cls_1, ... cls_num|
                    /\
                   /  \
    |x1,x2,x3,x4|  |cls_1, ... cls_num|*(obj)
    obj is optional, if it is not present, it will be set to 1.0
    This is where 4/5 comes from, '4' represent coordinates and the fifth object probability.
    """
    def __init__(self, num_classes:int = 80, name: Optional[str] = None):
        """
        Args:
            num_classes: number of classes
            name: Optional name of step. Defaults to 'SplitOutBoxAndScore'
        """

        super().__init__(["box_and_score"], ["_pre_boxes", "_pre_scores"], name)
        self.num_classes_ = num_classes

    def _create_graph_for_step(self, graph: onnx.GraphProto, onnx_opset: int):
        """
        Build the ONNX graph that separates box coordinates from per-class scores.

        Args:
            graph: graph being built so far; used to look up the type/shape of this step's input.
            onnx_opset: target ONNX opset (not used by this step).

        Returns:
            onnx.GraphProto with one float input {num_boxes, 4/5+num_classes} and two float outputs:
            boxes {num_boxes, 4} and class scores (multiplied by the object score when it is present).
        """
        input0_type_str, input0_shape_str = self._get_input_type_and_shape_strs(graph, 0)

        input_shape_list = input0_shape_str.split(',')
        assert len(input_shape_list) == 2, " expected [num_boxes, 4/5+num_classes]"

        target_shape_str_0 = f"{input_shape_list[0]}, 4"
        # Use this step's own number for the symbolic dim name, matching how other steps in this
        # file build their unique dim names (e.g. ConvertImageToBGR uses self.step_num).
        target_shape_str_1 = f"{input_shape_list[0]}, _{self.step_num}_class"

        # Graph outline:
        #   obj_0_or_1 = last_dim - num_classes - 4, i.e. the width of the optional object-score column.
        #   Split cuts the last axis into [4, obj_0_or_1, num_classes] columns, so the middle piece is
        #   empty when there is no object score.
        #   scores_obj_o is concatenated with boxes_o only so that Gather(index 0) has a column to read
        #   in that case; Where then replaces the value with 1.0 when obj_0_or_1 is 0.
        converter_graph = onnx.parser.parse_graph(
            f"""\
            SplitOutBoxAndScore (float[{input0_shape_str}] {self.input_names[0]})
            => (float[{target_shape_str_0}] {self.output_names[0]}, float[{target_shape_str_1}] {self.output_names[1]})
            {{

                i64_neg1 = Constant <value = int64[1] {{-1}}>()
                i64_4 = Constant <value = int64[1] {{4}}>()
                i64_0 = Constant <value = int64[1] {{0}}>()
                fp32_1 = Constant <value = float[1] {{1.0}}>()
                i64_classes = Constant <value = int64[1] {{{self.num_classes_}}}>()
                out_shape = Shape ({self.input_names[0]})
                class_and_coor_dim = Gather (out_shape, i64_neg1)
                coor_and_obj = Sub (class_and_coor_dim, i64_classes)
                obj_0_or_1 = Sub (coor_and_obj, i64_4)
                bool_num_obj_0_or_1 = Cast<to=9>(obj_0_or_1)

                box_obj_class_concat = Concat <axis = 0> (i64_4, obj_0_or_1, i64_classes)
                boxes_o, scores_obj_o, scores_cls_o = Split <axis = -1> ({self.input_names[0]}, box_obj_class_concat)
                scores_obj_not_null = Concat <axis = -1> (scores_obj_o, boxes_o)
                coef_obj_cat = Where(bool_num_obj_0_or_1, scores_obj_not_null,fp32_1)
                coef_obj = Gather <axis=-1> (coef_obj_cat, i64_0)
                scores_o = Mul (scores_cls_o, coef_obj)
                {self.output_names[0]} = Identity (boxes_o)
                {self.output_names[1]} = Identity (scores_o)

            }}
            """
        )
        return converter_graph
| 827 | |
| 828 | |
class SelectBestBoundingBoxesByNMS(Step):
    """
    Non-maximum suppression (NMS) is to filter out redundant bounding boxes.
    This step is used to wrap the boxes and scores into the ONNX NonMaxSuppression op.
    Input:
        boxes:  float[num_boxes, 4]
        scores:  shape float[num_boxes, num_classes]

    Output:
        nms_out: float[_few_num_boxes, 6<coordinate+score+class>]
    """

    def __init__(self, iou_threshold:float = 0.5, score_threshold:float = 0.67,
                 max_detections:int = 300, name: Optional[str] = None):
        """
        Args:
            Please refer to https://github.com/onnx/onnx/blob/main/docs/Operators.md#NonMaxSuppression
            for more details about the parameters.
            iou_threshold: same as the NonMaxSuppression op, intersection / union of boxes
            score_threshold: If this box's score is lower than score_threshold, it will be removed.
            max_detections: max number of boxes to be selected
            name: Optional name of step. Defaults to 'SelectBestBoundingBoxesByNMS'
        """
        super().__init__(["boxes", "scores"], ["nms_out"], name)
        self.iou_threshold_ = iou_threshold
        self.score_threshold_ = score_threshold
        self.max_detections_ = max_detections

    def _create_graph_for_step(self, graph: onnx.GraphProto, onnx_opset: int):
        """
        Build the ONNX graph that runs NonMaxSuppression and gathers the surviving boxes.

        Args:
            graph: graph being built so far; used to look up the type/shape of this step's inputs.
            onnx_opset: target ONNX opset. From opset 18 onwards ReduceMax takes its axes as an input
                        instead of an attribute.

        Returns:
            onnx.GraphProto with two float inputs (boxes, scores) and one float output whose rows are
            the 4 box values, a score and the class index (as float) concatenated on the last axis.
        """
        input0_type_str, input0_shape_str = self._get_input_type_and_shape_strs(graph, 0)
        input1_type_str, input1_shape_str = self._get_input_type_and_shape_strs(graph, 1)

        input0_shape_list = input0_shape_str.split(',')
        assert len(input0_shape_list) == 2, " expected [num_boxes, 4]"

        # Use this step's own number for the symbolic dim name, matching how other steps in this
        # file build their unique dim names (e.g. ConvertImageToBGR uses self.step_num).
        target_shape_str = f"_{self.step_num}_nms_boxes, 6"

        reduce_score = '(score_select_nm,i64_neg1)' if onnx_opset >= 18 else '<axes=[-1]>(score_select_nm)'

        # Graph outline:
        #   scores are transposed to [num_classes, num_boxes] and both inputs get a leading batch axis of 1
        #   before being handed to NonMaxSuppression.
        #   Column 1 of the NMS result is read as the class index and column 2 as the box index.
        #   NOTE(review): the emitted score is the ReduceMax over all classes of the selected box's row,
        #   which may differ from the score of the class NMS selected for that row - confirm this is intended.
        converter_graph = onnx.parser.parse_graph(
            f"""\
            SelectBestBoundingBoxesByNMS (float[{input0_shape_str}] {self.input_names[0]},float[{input1_shape_str}] {self.input_names[1]})
            => (float[{target_shape_str}] {self.output_names[0]})
            {{
                i64_2 = Constant <value = int64[1] {{2}}>()
                i64_0 = Constant <value = int64[1] {{0}}>()
                i64_1 = Constant <value = int64[1] {{1}}>()
                i64_max_obj = Constant <value = int64[1] {{{self.max_detections_}}}>()
                i64_neg1 = Constant <value = int64[1] {{-1}}>()
                fp32_iou_th = Constant <value = float[1] {{{self.iou_threshold_}}}>()
                fp32_score_th = Constant <value = float[1] {{{self.score_threshold_}}}>()

                boxes_i = Identity ({self.input_names[0]})
                scores_i = Identity({self.input_names[1]})
                scores_c_b = Transpose<perm=[1,0]>(scores_i)
                batch_boxes = Unsqueeze(boxes_i, i64_0)
                batch_scores = Unsqueeze(scores_c_b, i64_0)

                nmsbox = NonMaxSuppression<center_point_box =1>(batch_boxes, batch_scores, i64_max_obj,fp32_iou_th,fp32_score_th)
                classes_i64 = Gather <axis=-1>(nmsbox,i64_1)
                class_select = Cast <to = 1>(classes_i64)

                boxes_idx_us = Gather <axis=-1>(nmsbox,i64_2)
                boxes_idx = Squeeze(boxes_idx_us, i64_neg1)
                boxes_select = Gather <axis=0>(boxes_i, boxes_idx)

                score_select_nm = Gather <axis=0>(scores_i, boxes_idx)
                score_select = ReduceMax{reduce_score}

                {self.output_names[0]} = Concat <axis = -1> (boxes_select, score_select, class_select)
            }}
            """
        )
        return converter_graph
| 904 | |
| 905 | |
class ScaleBoundingBoxes(Step):
    """
    Map box coordinates back onto the original image.
    Boxes produced by a detection model are expressed relative to the network input image, which has been
    scaled and then padded/cropped. A linear mapping recovers the coordinates in the original image.
    input:
        box_of_nms_out: output of NMS, shape [num_boxes, 6]
        original_image: original image decoded from jpg/png<uint8_t>[H, W, 3<BGR>]
        scaled_image: scaled image, but without padding/crop[<uint8_t>[H1, W1, 3<BGR>]
        letter_boxed_image: scaled image and with padding/crop[<uint8_t>[H2, W3, 3<BGR>]

    output:
        scaled_box_out: shape [num_boxes, 6] with coordinate mapped to original image.
    """

    def __init__(self, name: Optional[str] = None):
        """
        Args:
            name: Optional name of step. Defaults to 'ScaleBoundingBoxes'
        """
        step_inputs = ["box_of_nms_out", "original_image", "scaled_image", "letter_boxed_image"]
        step_outputs = ["scaled_box_out"]
        super().__init__(step_inputs, step_outputs, name)

    def _create_graph_for_step(self, graph: onnx.GraphProto, onnx_opset: int):
        """
        Build the ONNX graph that removes the letterbox offset from the boxes and rescales them.

        Args:
            graph: graph being built so far; used to look up the type/shape of each input of this step.
            onnx_opset: target ONNX opset. From opset 18 onwards `num_outputs` is passed to Split.

        Returns:
            onnx.GraphProto whose single float output has the same shape string as the boxes input.
        """
        # Look up (type, shape) for every input, in input order.
        type_and_shape = [
            self._get_input_type_and_shape_strs(graph, position)
            for position, _ in enumerate(self.input_names)
        ]
        graph_inputs = ','.join(
            f"{type_str}[{shape_str}] {input_name}"
            for (type_str, shape_str), input_name in zip(type_and_shape, self.input_names)
        )

        # Only the first input (the boxes) provides an output shape.
        output_shapes = [shape_str for _, shape_str in type_and_shape][:1]
        graph_outputs = ','.join(
            f"float[{output_shapes[position]}] {output_name}"
            for position, output_name in enumerate(self.output_names)
        )

        # Every Split in this graph has three outputs; the attribute is only emitted for opset 18+.
        num_outputs_attr = ", num_outputs = 3" if onnx_opset >= 18 else ''

        converter_graph = onnx.parser.parse_graph(
            f"""\
            ScaleBoundingBoxes ({graph_inputs})
            => ({graph_outputs})
            {{
                i64_2 = Constant <value = int64[1] {{2}}>()

                ori_shape = Shape ({self.input_names[1]})
                scaled_shape = Shape ({self.input_names[2]})
                lettered_shape = Shape ({self.input_names[3]})
                oh,ow,oc = Split <axis = 0 {num_outputs_attr}> (ori_shape)
                sh,sw,sc = Split <axis = 0 {num_outputs_attr}> (scaled_shape)
                lh,lw,lc = Split <axis = 0 {num_outputs_attr}> (lettered_shape)
                swh = Concat <axis = -1> (sw,sh)
                lwh = Concat <axis = -1> (lw,lh)

                f_oh = Cast <to = 1> (oh)
                f_sh = Cast <to = 1> (sh)
                ratios = Div (f_oh, f_sh)

                pad_wh = Sub (lwh, swh)
                half_pad_wh = Div (pad_wh, i64_2)
                f_half_pad_wh = Cast <to = 1> (half_pad_wh)

                boxes_xy,boxes_wh_orxy,boxes_score_class = Split <axis=-1 {num_outputs_attr}>({self.input_names[0]})
                offset_boxes_xy = Sub (boxes_xy, f_half_pad_wh)
                restored_boxes = Concat <axis=-1> (offset_boxes_xy, boxes_wh_orxy)
                scaled_boxes_coor = Mul (restored_boxes, ratios)
                restored_boxes_res = Concat <axis=-1> (scaled_boxes_coor, boxes_score_class)

                {self.output_names[0]} = Identity (restored_boxes_res)
            }}
            """
        )
        return converter_graph