280 lines
9.6 KiB
Python
Executable File
280 lines
9.6 KiB
Python
Executable File
#!/usr/bin/python
|
|
from itertools import product
|
|
from tqdm import tqdm
|
|
import qa
|
|
from latencyParse import getLatencyTable
|
|
import os
|
|
import stat
|
|
import subprocess
|
|
import pandas as pd
|
|
from extra import log_args_decorator, sudo_check
|
|
from autotestConfig import AutotestConfig
|
|
from dotenv import load_dotenv
|
|
import argparse
|
|
import emoji
|
|
import logging
|
|
|
|
# Configure the root logger: records of level DEBUG and above are written to
# a log file named after this script (its basename up to the first dot, plus
# ".log").
logging.basicConfig(level=logging.DEBUG,
                    format='%(asctime)s - %(levelname)s - %(message)s',
                    filename=f"{os.path.basename(__file__).split('.')[0]}.log")

# Module-wide configuration object. It stays None until get_config() or the
# __main__ entry point assigns an AutotestConfig instance to it.
config = None
|
|
|
|
|
|
def parse_args():
    """Parse the command-line options of the autotest script.

    Returns:
        argparse.Namespace: Holds ``config``, the path to the autotest
        configuration yaml file (``autotest-conf.yaml`` unless overridden
        with ``-c``/``--config``).
    """
    cli = argparse.ArgumentParser(prog=__file__)
    config_option = {
        "type": str,
        "default": 'autotest-conf.yaml',
        "help": "Path to autotest configuration yaml file",
    }
    cli.add_argument('-c', '--config', **config_option)
    return cli.parse_args()
|
|
|
|
|
|
def get_config():
    """Return the module-wide configuration, creating it on first use.

    If the global ``config`` is still ``None``, a default-constructed
    ``AutotestConfig()`` is stored in it; every later call returns that
    same object.
    """
    global config
    if config is not None:
        return config
    config = AutotestConfig()
    return config
|
|
|
|
# Step-by-step:
|
|
# 1. Generate all combinations for each encoder
|
|
# 2. For each combination, create a GStreamer pipeline string
|
|
# 3. Start each pipeline with latency tracing enabled
|
|
# 3.1 Monitor CPU, GPU and memory usage during each pipeline run (not implemented yet; possibly later)
|
|
# 4. Start latency parsing script after each pipeline and store results in a pandas dataframe:
|
|
# - two key columns: encoder name, parameters string
|
|
# 5. Run PSNR check after each pipeline and add results in the dataframe
|
|
# 6. Save dataframe to CSV file
|
|
|
|
|
|
class Pipeline:
    """Fluent builder for a ``gst-launch-1.0`` shell command line.

    ``pipeline`` holds the command string built so far and ``options``
    collects the encoder parameters and profile as a space-separated
    string. Every public method returns the builder itself so calls can
    be chained.
    """

    def __init__(self):
        self.options = ""
        self.pipeline = "gst-launch-1.0 -e "

    def add_tracing(self):
        """Prepend the environment variables that enable latency tracing.

        The trace file name comes from ``get_config().latency_filename``.
        """
        tracing_env = (
            "GST_DEBUG_COLOR_MODE=off "
            'GST_TRACERS="latency(flags=pipeline+element)" '
            "GST_DEBUG=GST_TRACER:7 GST_DEBUG_FILE="
        )
        trace_file = get_config().latency_filename
        self.pipeline = tracing_env + trace_file + " " + self.pipeline
        return self

    def add_source(self, source):
        """Append *source* followed by a ``clocksync`` stage."""
        source_stage = source + " ! clocksync sync-to-first=true ! "
        self.pipeline += source_stage
        return self

    def __add_tee(self, encoder):
        """Placeholder for a disabled tee branch; currently does nothing."""
        # Disabled stage kept for reference:
        # tee name=t t. ! queue max-size-time=5000000000 max-size-bytes=100485760 max-size-buffers=1000 ! filesink location="base-autotest.yuv"
        return None

    def add_encoder(self, encoder, params):
        """Append the conversion stage, caps filter and encoder element.

        Args:
            encoder: Encoder element name; also the key used to look up
                ``videoconvert`` and ``formats`` in the configuration.
            params: Sequence of parameter strings placed after the encoder
                name and also recorded in ``options``.
        """
        converter = get_config().videoconvert[encoder]
        self.pipeline += converter + " ! "
        raw_caps = get_config().formats[encoder]
        self.pipeline += 'capsfilter caps="' + raw_caps + '" ! '
        self.options += " ".join(params) + " "
        self.pipeline += encoder + " "
        self.pipeline += " ".join(params) + " "
        return self

    def add_profile(self, profile):
        """Append an H.264 caps filter for *profile* and record it in options."""
        caps_stage = '! capsfilter caps="video/x-h264,profile=' + profile + '" ! '
        self.pipeline += caps_stage
        self.options += "profile=" + profile + " "
        return self

    def to_file(self, filename):
        """Append the parse/mux stages and a file sink writing to *filename*."""
        sink_stage = 'h264parse ! mpegtsmux ! filesink location="' + filename + '"'
        self.pipeline += sink_stage
        return self
|
|
|
|
|
|
def makeVideoSrc(videoName):
    """Build the source part of a pipeline for a configured raw video.

    Concatenates, in order: the first ``videosrc["raw"]`` fragment, the
    ``videos`` entry for *videoName*, the second ``videosrc["raw"]``
    fragment and the ``gst_video_info`` entry for *videoName*.
    """
    prefix = get_config().videosrc["raw"][0]
    location = get_config().videos[videoName]
    suffix = get_config().videosrc["raw"][1]
    video_caps = get_config().gst_video_info[videoName]
    return prefix + location + suffix + video_caps
|
|
|
|
|
|
def generateEncoderStrings():
    """Expand the configured option space of every encoder.

    Returns:
        dict: Maps each key of ``get_config().options`` to the list of
        parameter strings produced by ``generate_combinations`` for it.
        The number of combinations per encoder is logged at DEBUG level.
    """
    per_encoder = {}
    for encoder_name, option_space in get_config().options.items():
        combos = generate_combinations(option_space)
        logging.debug(len(combos))
        per_encoder[encoder_name] = combos
    return per_encoder
|
|
|
|
|
|
def generate_combinations(config_dict):
    """Expand a parameter grid into ``key=value`` strings.

    Args:
        config_dict (dict): Parameter names mapped to the list of values
            each parameter may take.

    Returns:
        list: One string per element of the Cartesian product of the value
        lists, each made of space-separated ``key=value`` pairs in the
        dictionary's key order.
    """
    names = list(config_dict.keys())
    choices = [config_dict[name] for name in names]
    return [
        " ".join(f"{name}={picked}" for name, picked in zip(names, combo))
        for combo in product(*choices)
    ]
|
|
|
|
|
|
def execPermissions(scriptFile="to_exec.sh"):
    """Add execute permission for user, group and others to *scriptFile*.

    The file's existing mode bits are kept; only the three execute bits
    are OR-ed in.
    """
    exec_bits = stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH
    mode_now = os.stat(scriptFile).st_mode
    os.chmod(scriptFile, mode_now | exec_bits)
|
|
|
|
|
|
def writeToExecFile(contents, file):
    """Write ``str(contents)`` to *file* (overwriting it) and mark it executable."""
    with open(file, "w") as script:
        script.write(str(contents))
    # Execute bits are added after the file has been written and closed.
    execPermissions(file)
|
|
|
|
|
|
def is_docker(func):
    """Decorator that reroutes selected pipelines through a docker container.

    If the pipeline string contains any encoder name listed in
    ``get_config().with_docker``, the pipeline is written to the executable
    script ``to_exec.sh`` and replaced by ``get_config().docker_run_string``
    followed by the script name. Otherwise it is passed on unchanged.

    Args:
        func: Callable that takes the pipeline command string.

    Returns:
        The wrapping callable; it forwards whatever ``func`` returns.
    """
    from functools import wraps

    script_name = "to_exec.sh"

    @wraps(func)
    def wrapper(pipeline):
        needs_docker = any(
            encoder in pipeline for encoder in get_config().with_docker
        )
        if needs_docker:
            # Wrap exactly once: matching further encoders against the
            # already rewritten command could overwrite the script with the
            # docker command itself.
            writeToExecFile(pipeline, script_name)
            pipeline = get_config().docker_run_string + f" {script_name}"
        return func(pipeline)

    return wrapper
|
|
|
|
|
|
def is_sudo(pipeline):
    """Return True when the command string begins with ``sudo``."""
    return pipeline.startswith("sudo")
|
|
|
|
|
|
def passwordAuth(proc):
    """Send the ``UAUTH`` environment value to the process's stdin.

    Does nothing when ``UAUTH`` is not set. Otherwise the value is passed
    to ``proc.communicate``.
    """
    secret = os.getenv("UAUTH")
    if secret is None:
        return
    proc.communicate(secret)
|
|
|
|
|
|
def printLog(file):
    """Read *file* in full and log its contents at DEBUG level.

    The logged message is the file text preceded by a newline.
    """
    with open(file, "r") as log_file:
        contents = log_file.read()
    logging.debug("\n" + contents)
|
|
|
|
|
|
@is_docker
@log_args_decorator
def run_pipeline(pipeline):
    """Run a pipeline command through the shell and check its exit status.

    The command's stdout and stderr are captured in ``pipeline-log.txt``,
    which is then copied to the debug log via ``printLog``. When the
    command starts with ``sudo``, ``passwordAuth`` is given the chance to
    send a password to the process's stdin.

    Args:
        pipeline: Shell command line to execute.

    Raises:
        RuntimeError: If the command exits with a non-zero status.
    """
    logfile = "pipeline-log.txt"
    with open(logfile, "w") as log_sink:
        # Used as a context manager, Popen closes the stdin pipe (if it is
        # still open) and waits for the child on exit, so the pipe is not
        # leaked when no password is sent.
        with subprocess.Popen(pipeline, shell=True,
                              stdin=subprocess.PIPE, stdout=log_sink,
                              stderr=subprocess.STDOUT, text=True) as proc:
            if is_sudo(pipeline):
                passwordAuth(proc)
    printLog(logfile)
    if proc.returncode != 0:
        # RuntimeError is still caught by callers that catch Exception.
        raise RuntimeError(emoji.emojize(
            ":cross_mark: Pipeline failed, see log for details"))
|
|
|
|
|
|
def time_trace(func):
    """Decorator that logs how long each successful call of *func* took.

    The wrapper forwards all positional and keyword arguments to *func*
    and returns its result. The elapsed time is logged at INFO level after
    *func* returns; nothing is logged when *func* raises.

    Args:
        func: Callable to time.

    Returns:
        The wrapping callable.
    """
    import time
    from functools import wraps

    @wraps(func)
    def wrapper(*args, **kwargs):
        start_time = time.time()
        result = func(*args, **kwargs)
        elapsed_time = time.time() - start_time
        logging.info(emoji.emojize(
            f":alarm_clock: Total execution time: {elapsed_time} seconds"))
        return result

    return wrapper
|
|
|
|
|
|
def _tag_columns(frame, encoder, profile, videoName, params):
    """Replace *frame*'s columns by a MultiIndex prefixed with the run keys."""
    frame.columns = pd.MultiIndex.from_tuples(
        [(encoder, profile, videoName, params, col) for col in frame.columns]
    )
    return frame


def _run_single_test(encoder, params, profile, videoName, videoPath):
    """Run one pipeline and collect its quality and latency tables.

    Returns:
        tuple | None: ``(quality_frame, latency_frame)`` with run-tagged
        columns, or ``None`` when the pipeline failed (the error is logged).
    """
    filename = "autotest-" + encoder + "-" + \
        profile + "-test-" + videoName + ".mp4"
    pipeline = (
        Pipeline()
        .add_tracing()
        .add_source(makeVideoSrc(videoName))
        .add_encoder(encoder, params.split(" "))
        .add_profile(profile)
        .to_file(filename)
    )
    logging.debug(pipeline.pipeline)
    try:
        run_pipeline(pipeline.pipeline)
    except Exception as e:
        # Best effort: a failing combination is logged and skipped so the
        # remaining combinations still run.
        logging.error(emoji.emojize(
            f":cross_mark: Error occurred: {e}"))
        return None

    psnr_metrics, ssim_metrics = qa.run_quality_check(
        videoPath,
        filename,
        get_config().video_info[videoName] +
        " " + get_config().psnr_check[encoder]
    )
    dfPsnr = qa.parse_quality_report(psnr_metrics, ssim_metrics)
    logging.info("-----")
    dfLatency = getLatencyTable(get_config().latency_filename)
    return (
        _tag_columns(dfPsnr, encoder, profile, videoName, params),
        _tag_columns(dfLatency, encoder, profile, videoName, params),
    )


@time_trace
def run_autotest():
    """Run every configured encoder/parameter/profile/video combination.

    For each encoder, every combination is executed ``get_config().repeats``
    times. The quality and latency tables of each successful run are
    appended column-wise to per-encoder DataFrames, which are written to
    ``qualityResults<encoder>.csv`` and ``latencyDataframe<encoder>.csv``
    inside ``get_config().results_dir`` once the encoder's runs are done.
    """
    encoders = generateEncoderStrings()
    for encoder, combinations in tqdm(encoders.items(), desc="Encoder loop"):
        qualityDataframe = pd.DataFrame()
        latencyDataframe = pd.DataFrame()
        for params in tqdm(combinations, desc="params combination loop", leave=False):
            for profile in tqdm(get_config().profiles, desc="profile loop", leave=False):
                for videoName, videoPath in tqdm(get_config().videos.items(), desc="videos loop", leave=False):
                    for _ in tqdm(range(get_config().repeats), desc="repeat loop", leave=False):
                        outcome = _run_single_test(
                            encoder, params, profile, videoName, videoPath)
                        if outcome is None:
                            continue
                        dfPsnr, dfLatency = outcome
                        qualityDataframe = pd.concat(
                            [qualityDataframe, dfPsnr], axis=1)
                        latencyDataframe = pd.concat(
                            [latencyDataframe, dfLatency], axis=1)
                        logging.info("="*50)
                        logging.info("Current results:")
                        logging.info(f"\n{dfPsnr}")
                        logging.info(f"\n{dfLatency}")
        # os.path.join keeps the path valid whether or not results_dir ends
        # with a path separator.
        results_dir = get_config().results_dir
        qualityDataframe.to_csv(
            os.path.join(results_dir, f"qualityResults{encoder}.csv"))
        latencyDataframe.to_csv(
            os.path.join(results_dir, f"latencyDataframe{encoder}.csv"))
|
|
|
|
|
|
if __name__ == "__main__":
    args = parse_args()
    # Install the configuration built from the requested yaml file before
    # anything calls get_config(), which would otherwise create a
    # default-constructed AutotestConfig().
    config = AutotestConfig(args.config)
    logging.debug(f"yaml config path={args.config}")
    # Make sure the directory for the result files exists.
    os.makedirs(get_config().results_dir, exist_ok=True)
    # NOTE(review): presumably this loads UAUTH (read by passwordAuth) from a
    # .env file - confirm against the deployment setup.
    load_dotenv()
    sudo_check(__file__)
    run_autotest()
|