# 测试自动量化

In [3]:
import numpy as np

import tvm
from tvm import te
from tvm import relay
from tvm.relay import testing
from tvm.relay.expr import Call
from tvm.topi.utils import get_const_tuple


def quantize_and_build(out, skip_conv_layers=None):
    """Quantize a Relay expression and check that the quantized module builds.

    Parameters
    ----------
    out : relay expression
        Output expression of the network. Its free variables become the
        parameters of the function that wraps it.
    skip_conv_layers : list, optional
        Forwarded to ``relay.quantize.qconfig(skip_conv_layers=...)``.
        ``None`` (the default) is replaced by an empty list, which is the
        value the previous ``[]`` default supplied.

    Returns
    -------
    (mod, qmod)
        ``mod`` is the module returned by ``testing.create_workload`` and
        ``qmod`` is the result of ``relay.quantize.quantize(mod, params)``.
    """
    # Use a None sentinel rather than a mutable default list, so the default
    # object can never be shared (and accidentally modified) across calls.
    if skip_conv_layers is None:
        skip_conv_layers = []

    func = relay.Function(relay.analysis.free_vars(out), out)
    mod, params = testing.create_workload(func)

    with relay.quantize.qconfig(skip_conv_layers=skip_conv_layers):
        qmod = relay.quantize.quantize(mod, params)

    # The build artifact is discarded on purpose: the call only verifies that
    # the quantized module can be built for the "llvm" target.
    relay.build(qmod, "llvm", params=params)
    return mod, qmod

In [4]:
# IPython-only syntax (not valid plain Python): the trailing `??` asks IPython
# to display the signature and source of relay.transform.FuseOps.
relay.transform.FuseOps??

Signature: relay.transform.FuseOps(fuse_opt_level=-1)
Source:   
def FuseOps(fuse_opt_level=-1):
    """Fuse operators in an expr to a larger operator according to some rules.

    Parameters
    ----------
    fuse_opt_level : int
        The level of fuse optimization. -1 indicates that the level will be
        inferred from pass context.

    Returns
    -------
    ret : tvm.transform.Pass
        The registered pass for operator fusion.
    """
    return _ffi_api.FuseOps(fuse_opt_level)

## 乘法算子的右操作数不是常量

In [2]:
# Build conv -> relu once, then multiply the activation by two different
# non-constant right-hand operands and quantize each product.
data = relay.var("data", shape=(1, 16, 64, 64))
multiplier = relay.sigmoid(relay.var("data", shape=(1, 16, 1, 1)))
conv = relay.nn.conv2d(
    data,
    relay.var("weight"),
    kernel_size=(3, 3),
    padding=(1, 1),
    channels=16,
)
act = relay.nn.relu(data=conv)
pool = relay.nn.global_avg_pool2d(data=act)

# First the sigmoid output, then the globally pooled activation.
for rhs in (multiplier, pool):
    mod, qmod = quantize_and_build(act * rhs)
    mod.show()
    qmod.show()

One or more operators have not been tuned. Please tune your model for better performance. Use DEBUG logging level to see more details.
To print formatted TVM script, please install the formatter 'Black':
/media/pc/data/tmp/cache/conda/envs/tvmz/bin/python -m pip install "black==22.3.0" --upgrade --user


To print formatted TVM script, please install the formatter 'Black':
/media/pc/data/tmp/cache/conda/envs/tvmz/bin/python -m pip install "black==22.3.0" --upgrade --user


To print formatted TVM script, please install the formatter 'Black':
/media/pc/data/tmp/cache/conda/envs/tvmz/bin/python -m pip install "black==22.3.0" --upgrade --user


To print formatted TVM script, please install the formatter 'Black':
/media/pc/data/tmp/cache/conda/envs/tvmz/bin/python -m pip install "black==22.3.0" --upgrade --user


## 跳过卷积

In [None]:
data = relay.var("data", shape=(1, 16, 64, 64))
np_weight = np.random.rand(16, 16, 3, 3)
# Each convolution gets its own Constant node, both built from the same
# random array and cast to float32.
conv0_weight, conv1_weight = (
    relay.Constant(tvm.nd.array(np_weight)).astype("float32") for _ in range(2)
)
multiplier = relay.sigmoid(relay.var("data", shape=(1, 16, 1, 1)))

# Two stacked conv -> relu stages that share the same convolution settings.
conv_kwargs = dict(kernel_size=(3, 3), padding=(1, 1), channels=16)
conv0 = relay.nn.conv2d(data, conv0_weight, **conv_kwargs)
act0 = relay.nn.relu(data=conv0)
conv1 = relay.nn.conv2d(act0, conv1_weight, **conv_kwargs)
act1 = relay.nn.relu(data=conv1)

# Quantize with nothing skipped, then with each single index in the skip
# list; only the run with both indices listed keeps its modules.
for skipped in ([], [0], [1]):
    quantize_and_build(act1 * multiplier, skip_conv_layers=skipped)
mod, qmod = quantize_and_build(act1 * multiplier, skip_conv_layers=[0, 1])

## `stop_quantize`

In [None]:
data = relay.var("data", shape=(1, 16, 64, 64))

# Random weights: a 3x3 kernel for the first convolution and a 1x1 kernel for
# the second. The two draws stay in this order.
np_weight0 = np.random.rand(16, 16, 3, 3)
np_weight1 = np.random.rand(16, 16, 1, 1)
conv0_weight = relay.Constant(tvm.nd.array(np_weight0)).astype("float32")
conv1_weight = relay.Constant(tvm.nd.array(np_weight1)).astype("float32")
multiplier = relay.sigmoid(relay.var("data", shape=(1, 16, 1, 1)))

# conv(3x3) -> relu -> global average pool -> conv(1x1) -> relu
conv0 = relay.nn.conv2d(
    data, conv0_weight, kernel_size=(3, 3), padding=(1, 1), channels=16
)
act0 = relay.nn.relu(data=conv0)
pool = relay.nn.global_avg_pool2d(data=act0)
conv1 = relay.nn.conv2d(
    pool, conv1_weight, kernel_size=(1, 1), padding=(0, 0), channels=16
)
act1 = relay.nn.relu(data=conv1)

# Quantize the product and print both the original and the quantized module.
mod, qmod = quantize_and_build(act1 * multiplier)
for module in (mod, qmod):
    module.show()

## `batch_flatten`

In [None]:
# A single convolution whose output is flattened before quantization.
data = relay.var("data", shape=(1, 16, 64, 64), dtype="float32")
conv_out = relay.nn.conv2d(
    data,
    relay.var("weight"),
    kernel_size=(3, 3),
    padding=(1, 1),
    channels=16,
)
out = relay.nn.batch_flatten(conv_out)

mod, qmod = quantize_and_build(out)

def _check_batch_flatten(node):
    """Assert that every ``nn.batch_flatten`` call has an int8 checked type.

    Nodes that are not ``Call`` instances, and calls to any other operator,
    are ignored.
    """
    if not isinstance(node, Call) or node.op.name != "nn.batch_flatten":
        return
    assert node.checked_type.dtype == "int8"


# Visit every node of the quantized main function and verify that
# batch_flatten was quantized.
relay.analysis.post_order_visit(qmod["main"], _check_batch_flatten)

## `batch_matmul`

In [None]:
data = relay.var("data", shape=(1, 4, 16, 16))
data2 = relay.sigmoid(relay.var("data", shape=(4, 16, 64)))

# conv -> batch_flatten -> reshape to [1, 32, 64] -> batch_matmul with data2
conv_out = relay.nn.conv2d(
    data,
    relay.var("weight"),
    kernel_size=(3, 3),
    padding=(1, 1),
    channels=8,
)
flat = relay.nn.batch_flatten(conv_out)
reshaped = relay.reshape(flat, [1, 32, 64])
out = relay.nn.batch_matmul(reshaped, data2)

mod, qmod = quantize_and_build(out)

def _check_batch_matmul(node):
    """Assert the checked dtype of selected operator calls.

    ``nn.batch_matmul`` and ``nn.conv2d`` calls must be int32, and
    ``nn.batch_flatten`` calls must be int8. Any other node is ignored.
    """
    if not isinstance(node, Call):
        return

    expected_dtype = {
        "nn.batch_matmul": "int32",
        "nn.conv2d": "int32",
        "nn.batch_flatten": "int8",
    }.get(node.op.name)
    if expected_dtype is not None:
        assert node.checked_type.dtype == expected_dtype


# Visit every node of the quantized main function and verify that
# batch_matmul was quantized.
relay.analysis.post_order_visit(qmod["main"], _check_batch_matmul)

## `calibration_dataset`

In [None]:
def get_calibration_dataset(mod, input_name, num_samples=5):
    """Generate random calibration batches shaped like the model's first input.

    Parameters
    ----------
    mod : mapping with a ``"main"`` entry
        The input shape is read from
        ``mod["main"].checked_type.arg_types[0].shape``, i.e. the first
        argument of the main function.
    input_name : str
        Key under which each random array is stored in its batch dict.
    num_samples : int, optional
        Number of batches to generate. Defaults to 5, the value that was
        previously hard-coded.

    Returns
    -------
    list of dict
        ``num_samples`` dicts of the form ``{input_name: array}``, where each
        array comes from ``np.random.uniform(size=input_shape)``.
    """
    # Convert every dimension to a plain Python int before handing it to numpy.
    input_shape = [int(dim) for dim in mod["main"].checked_type.arg_types[0].shape]
    return [
        {input_name: np.random.uniform(size=input_shape)}
        for _ in range(num_samples)
    ]

In [None]:
# Calibrate with the "kl_divergence" mode, either inside an explicit llvm
# target scope or with no target scope at all.
mod, params = testing.synthetic.get_workload()
dataset = get_calibration_dataset(mod, "data")
create_target = True
with relay.quantize.qconfig(calibrate_mode="kl_divergence"):
    if not create_target:
        # No target scope is entered on this path.
        relay.quantize.quantize(mod, params, dataset)
    else:
        with tvm.target.Target("llvm"):
            relay.quantize.quantize(mod, params, dataset)

`calibrate_memory_bound`:

In [None]:
import multiprocessing

# Same kl_divergence calibration, with calibrate_chunk_by set to the number
# of CPUs reported by multiprocessing.
mod, params = testing.synthetic.get_workload()
dataset = get_calibration_dataset(mod, "data")
num_cpu = multiprocessing.cpu_count()

chunked_config = relay.quantize.qconfig(
    calibrate_mode="kl_divergence",
    calibrate_chunk_by=num_cpu,
)
with chunked_config:
    relay.quantize.quantize(mod, params, dataset)

`calibrate_percentile`:

In [None]:
# Quantize the synthetic workload using the "percentile" calibration mode.
mod, params = testing.synthetic.get_workload()
dataset = get_calibration_dataset(mod, "data")

percentile_config = relay.quantize.qconfig(calibrate_mode="percentile")
with percentile_config:
    relay.quantize.quantize(mod, params, dataset)