microsoft/onnxruntime-extensions

Public

mirrored fromhttps://github.com/microsoft/onnxruntime-extensionsAvailable

CodeCommitsIssuesPull requestsActionsInsightsSecurity
ba200b4a0e391b45c0df4f9b1a506f0a9f574dd4

Branches

Tags

  • No tags available.
0Branches0Tags
Go to file
Add file
Code

Clone

HTTPS

Download ZIP

test/test_pyops.py

238lines · modepreview

import os
import unittest
import numpy as np
from numpy.testing import assert_almost_equal
from onnx import helper, onnx_pb as onnx_proto
import onnxruntime as _ort
from onnxruntime_extensions import (
    onnx_op, PyCustomOpDef,
    get_library_path as _get_library_path)


def _create_test_model_test():
    nodes = []
    nodes.append(helper.make_node(
        'CustomOpOne', ['input_1', 'input_2'], ['output_1'],
        domain='ai.onnx.contrib'))
    nodes.append(helper.make_node(
        'CustomOpTwo', ['output_1'], ['output'],
        domain='ai.onnx.contrib'))

    input0 = helper.make_tensor_value_info(
        'input_1', onnx_proto.TensorProto.FLOAT, [3, 5])
    input1 = helper.make_tensor_value_info(
        'input_2', onnx_proto.TensorProto.FLOAT, [3, 5])
    output0 = helper.make_tensor_value_info(
        'output', onnx_proto.TensorProto.INT32, [3, 5])

    graph = helper.make_graph(nodes, 'test0', [input0, input1], [output0])
    model = helper.make_model(
        graph, opset_imports=[helper.make_operatorsetid('ai.onnx.contrib', 1)])
    return model


def _create_test_model():
    nodes = []
    nodes[0:] = [helper.make_node('Identity', ['input_1'], ['identity1'])]
    nodes[1:] = [helper.make_node('PyReverseMatrix',
                                  ['identity1'], ['reversed'],
                                  domain='ai.onnx.contrib')]

    input0 = helper.make_tensor_value_info(
        'input_1', onnx_proto.TensorProto.FLOAT, [None, 2])
    output0 = helper.make_tensor_value_info(
        'reversed', onnx_proto.TensorProto.FLOAT, [None, 2])

    graph = helper.make_graph(nodes, 'test0', [input0], [output0])
    model = helper.make_model(
        graph, opset_imports=[helper.make_operatorsetid('ai.onnx.contrib', 1)])
    return model


def _create_test_model_double(prefix, domain='ai.onnx.contrib'):
    nodes = []
    nodes[0:] = [helper.make_node('Identity', ['input_1'], ['identity1'])]
    nodes[1:] = [helper.make_node('%sAddEpsilon' % prefix,
                                  ['identity1'], ['customout'],
                                  domain=domain)]

    input0 = helper.make_tensor_value_info(
        'input_1', onnx_proto.TensorProto.DOUBLE, [None, None])
    output0 = helper.make_tensor_value_info(
        'customout', onnx_proto.TensorProto.DOUBLE, [None, None])

    graph = helper.make_graph(nodes, 'test0', [input0], [output0])
    model = helper.make_model(
        graph, opset_imports=[helper.make_operatorsetid(domain, 1)])
    return model


def _create_test_model_2outputs(prefix, domain='ai.onnx.contrib'):
    nodes = [
        helper.make_node('Identity', ['x'], ['identity1']),
        helper.make_node(
            '%sNegPos' % prefix, ['identity1'], ['neg', 'pos'],
            domain=domain)
    ]

    input0 = helper.make_tensor_value_info(
        'x', onnx_proto.TensorProto.FLOAT, [])
    output1 = helper.make_tensor_value_info(
        'neg', onnx_proto.TensorProto.FLOAT, [])
    output2 = helper.make_tensor_value_info(
        'pos', onnx_proto.TensorProto.FLOAT, [])

    graph = helper.make_graph(nodes, 'test0', [input0], [output1, output2])
    model = helper.make_model(
        graph, opset_imports=[helper.make_operatorsetid(domain, 1)])
    return model


def _create_test_join():
    nodes = []
    nodes[0:] = [helper.make_node('Identity', ['input_1'], ['identity1'])]
    nodes[1:] = [helper.make_node('PyOpJoin',
                                  ['identity1'], ['joined'],
                                  sep=';',
                                  domain='ai.onnx.contrib')]

    input0 = helper.make_tensor_value_info(
        'input_1', onnx_proto.TensorProto.STRING, [None, None])
    output0 = helper.make_tensor_value_info(
        'joined', onnx_proto.TensorProto.STRING, [None])

    graph = helper.make_graph(nodes, 'test0', [input0], [output0])
    model = helper.make_model(
        graph, opset_imports=[helper.make_operatorsetid('ai.onnx.contrib', 1)])
    return model


class TestPythonOp(unittest.TestCase):

    @classmethod
    def setUpClass(cls):

        @onnx_op(op_type="CustomOpOne",
                 inputs=[PyCustomOpDef.dt_float, PyCustomOpDef.dt_float])
        def custom_one_op(x, y):
            return np.add(x, y)

        @onnx_op(op_type="CustomOpTwo",
                 outputs=[PyCustomOpDef.dt_int32])
        def custom_two_op(f):
            return np.round(f).astype(np.int32)

        @onnx_op(op_type="PyReverseMatrix")
        def reverse_matrix(x):
            # The user custom op implementation here.
            return np.flip(x, axis=0).astype(np.float32)

        @onnx_op(op_type="PyAddEpsilon",
                 inputs=[PyCustomOpDef.dt_double],
                 outputs=[PyCustomOpDef.dt_double])
        def add_epsilon(x):
            # The user custom op implementation here.
            return x + 1e-3

        @onnx_op(op_type="PyNegPos",
                 inputs=[PyCustomOpDef.dt_float],
                 outputs=[PyCustomOpDef.dt_float, PyCustomOpDef.dt_float])
        def negpos(x):
            neg = x.copy()
            pos = x.copy()
            neg[x > 0] = 0
            pos[x < 0] = 0
            return neg, pos

        @onnx_op(op_type="PyOpJoin",
                 inputs=[PyCustomOpDef.dt_string],
                 outputs=[PyCustomOpDef.dt_string],
                 attrs=['sep'])
        def join(xs, **kwargs):
            sep = kwargs.get('sep', '')
            res = []
            for x in xs:
                res.append(sep.join(x))
            return np.array(res, dtype=np.object)

    def test_python_operator(self):
        so = _ort.SessionOptions()
        so.register_custom_ops_library(_get_library_path())
        onnx_model = _create_test_model()
        self.assertIn('op_type: "PyReverseMatrix"', str(onnx_model))
        sess = _ort.InferenceSession(onnx_model.SerializeToString(), so)
        input_1 = np.array(
            [1, 2, 3, 4, 5, 6]).astype(np.float32).reshape([3, 2])
        txout = sess.run(None, {'input_1': input_1})
        assert_almost_equal(txout[0], np.array([[5., 6.], [3., 4.], [1., 2.]]))

    def test_add_epsilon_python(self):
        so = _ort.SessionOptions()
        so.register_custom_ops_library(_get_library_path())
        onnx_model = _create_test_model_double('Py')
        self.assertIn('op_type: "PyAddEpsilon"', str(onnx_model))
        sess = _ort.InferenceSession(onnx_model.SerializeToString(), so)
        input_1 = np.array([[0., 1., 1.5], [7., 8., -5.5]])
        txout = sess.run(None, {'input_1': input_1})
        diff = txout[0] - input_1 - 1e-3
        assert_almost_equal(diff, np.zeros(diff.shape))

    def test_python_negpos(self):
        so = _ort.SessionOptions()
        so.register_custom_ops_library(_get_library_path())
        onnx_model = _create_test_model_2outputs('Py')
        self.assertIn('op_type: "PyNegPos"', str(onnx_model))
        sess = _ort.InferenceSession(onnx_model.SerializeToString(), so)
        x = np.array([[0., 1., 1.5], [7., 8., -5.5]]).astype(np.float32)
        neg, pos = sess.run(None, {'x': x})
        diff = x - (neg + pos)
        assert_almost_equal(diff, np.zeros(diff.shape))

    def test_cc_negpos(self):
        so = _ort.SessionOptions()
        so.register_custom_ops_library(_get_library_path())
        onnx_model = _create_test_model_2outputs("")
        self.assertIn('op_type: "NegPos"', str(onnx_model))
        sess = _ort.InferenceSession(onnx_model.SerializeToString(), so)
        x = np.array([[0., 1., 1.5], [7., 8., -5.5]]).astype(np.float32)
        neg, pos = sess.run(None, {'x': x})
        diff = x - (neg + pos)
        assert_almost_equal(diff, np.zeros(diff.shape))

    def test_check_saved_model(self):
        this = os.path.dirname(__file__)
        so = _ort.SessionOptions()
        so.register_custom_ops_library(_get_library_path())
        onnx_content = _create_test_model_test()
        onnx_bytes = onnx_content.SerializeToString()
        with open(os.path.join(this, 'data', 'custom_op_test.onnx'),
                  'rb') as f:
            saved = f.read()
        assert onnx_bytes == saved

    def test_cc_operator(self):
        so = _ort.SessionOptions()
        so.register_custom_ops_library(_get_library_path())
        onnx_content = _create_test_model_test()
        self.assertIn('op_type: "CustomOpOne"', str(onnx_content))
        ser = onnx_content.SerializeToString()
        sess0 = _ort.InferenceSession(ser, so)
        res = sess0.run(None, {
            'input_1': np.random.rand(3, 5).astype(np.float32),
            'input_2': np.random.rand(3, 5).astype(np.float32)})
        self.assertEqual(res[0].shape, (3, 5))

    def test_python_join(self):
        so = _ort.SessionOptions()
        so.register_custom_ops_library(_get_library_path())
        onnx_model = _create_test_join()
        self.assertIn('op_type: "PyOpJoin"', str(onnx_model))
        sess = _ort.InferenceSession(onnx_model.SerializeToString(), so)
        arr = np.array([["a", "b"]], dtype=np.object)
        txout = sess.run(None, {'input_1': arr})
        exp = np.array(["a;b"], dtype=np.object)
        assert txout[0][0] == exp[0]


if __name__ == "__main__":
    unittest.main()