gstAutotest/gstreamerAutotest.py
2025-10-13 11:58:21 +03:00

280 lines
9.6 KiB
Python
Executable File

#!/usr/bin/python
import argparse
import logging
import os
import stat
import subprocess
import time
from functools import wraps
from itertools import product

import emoji
import pandas as pd
from dotenv import load_dotenv
from tqdm import tqdm

import qa
from autotestConfig import AutotestConfig
from extra import log_args_decorator, sudo_check
from latencyParse import getLatencyTable
# Configure logging: DEBUG level, timestamped records, written to a log file
# named after this script (e.g. "gstreamerAutotest.log") instead of stderr,
# so tqdm progress bars on the console stay readable.
logging.basicConfig(level=logging.DEBUG,
                    format='%(asctime)s - %(levelname)s - %(message)s',
                    filename=f"{os.path.basename(__file__).split('.')[0]}.log")
# Lazily-initialized module-level AutotestConfig singleton; read via
# get_config() and populated either there or in the __main__ block below.
config = None
def parse_args():
    """Parse the command line.

    Returns:
        argparse.Namespace with a single attribute, ``config``: path to the
        autotest configuration YAML file (default: ``autotest-conf.yaml``).
    """
    arg_parser = argparse.ArgumentParser(prog=__file__)
    arg_parser.add_argument(
        '-c', '--config',
        type=str,
        default='autotest-conf.yaml',
        help="Path to autotest configuration yaml file",
    )
    return arg_parser.parse_args()
def get_config():
    """Return the shared AutotestConfig, constructing a default one on first use.

    The __main__ block normally pre-populates the module-level ``config``
    from the CLI-supplied YAML path; this fallback builds a default instance
    only if nothing did so earlier.
    """
    global config
    config = AutotestConfig() if config is None else config
    return config
# Step-by-step:
# 1. Generate all combinations for each encoder
# 2. For each combination, create a GStreamer pipeline string
# 3. Start each pipeline with latency tracing enabled
# 3.1 (TODO, not implemented yet) Monitor CPU, GPU and memory usage during each pipeline run
# 4. Start latency parsing script after each pipeline and store results in a pandas dataframe:
# - two key columns: encoder name, parameters string
# 5. Run PSNR check after each pipeline and add results in the dataframe
# 6. Save dataframe to CSV file
class Pipeline:
    """Incremental builder for a ``gst-launch-1.0`` shell command line.

    Attributes:
        pipeline: the accumulated shell command string.
        options: a human-readable summary of encoder parameters, used as a
            column label in the result DataFrames.

    All builder methods return ``self`` so calls can be chained.
    """

    def __init__(self):
        # -e forces an EOS on shutdown so the muxer can finalize the file.
        self.pipeline = "gst-launch-1.0 -e "
        self.options = ""

    def add_tracing(self):
        """Prefix the command with env vars that enable GStreamer latency tracing.

        Trace output is redirected to the config's ``latency_filename``,
        which ``getLatencyTable()`` parses after the run.
        """
        self.pipeline = (
            "GST_DEBUG_COLOR_MODE=off "
            "GST_TRACERS=\"latency(flags=pipeline+element)\" "
            f"GST_DEBUG=GST_TRACER:7 GST_DEBUG_FILE={get_config().latency_filename} "
            f"{self.pipeline}"
        )
        return self

    def add_source(self, source):
        """Append the source element, synced to the first buffer via clocksync."""
        self.pipeline += f"{source} ! clocksync sync-to-first=true ! "
        return self

    def add_encoder(self, encoder, params):
        """Append format conversion, a caps filter and the encoder element.

        Args:
            encoder: GStreamer encoder element name (also the config key).
            params: list of ``key=value`` strings passed to the encoder.
        """
        cfg = get_config()
        joined = " ".join(params)
        self.pipeline += f"{cfg.videoconvert[encoder]} ! "
        self.pipeline += f"capsfilter caps=\"{cfg.formats[encoder]}\" ! "
        self.options += joined + " "
        self.pipeline += f"{encoder} {joined} "
        return self

    def add_profile(self, profile):
        """Constrain the encoder's output to an H.264 profile via a caps filter."""
        self.pipeline += f"! capsfilter caps=\"video/x-h264,profile={profile}\" ! "
        self.options += f"profile={profile} "
        return self

    def to_file(self, filename):
        """Terminate the pipeline: parse H.264, mux into MPEG-TS, write to file."""
        self.pipeline += f"h264parse ! mpegtsmux ! filesink location=\"{filename}\""
        return self
def makeVideoSrc(videoName):
    """Assemble the raw-video source fragment of a pipeline for *videoName*.

    The config's ``videosrc["raw"]`` pair wraps the video file path; the
    per-video ``gst_video_info`` string supplies the caps description.
    """
    cfg = get_config()
    wrapper = cfg.videosrc["raw"]
    return wrapper[0] + cfg.videos[videoName] + wrapper[1] + cfg.gst_video_info[videoName]
def generateEncoderStrings():
    """Expand the config's per-encoder option grids.

    Returns:
        dict mapping each encoder name to the list of all parameter-combination
        strings produced by generate_combinations().
    """
    result = {}
    for encoder, option_grid in get_config().options.items():
        combos = generate_combinations(option_grid)
        logging.debug(len(combos))
        result[encoder] = combos
    return result
def generate_combinations(config_dict):
    """Generate all parameter combinations from a configuration dictionary.

    Args:
        config_dict (dict): parameter names mapped to lists of candidate values.

    Returns:
        list[str]: one "key=value key=value ..." string per combination in the
        Cartesian product of all value lists. An empty dict yields [""].
    """
    keys = list(config_dict)
    value_lists = (config_dict[key] for key in keys)
    return [
        " ".join(f"{key}={value}" for key, value in zip(keys, combo))
        for combo in product(*value_lists)
    ]
def execPermissions(scriptFile="to_exec.sh"):
    """Add execute permission for user, group and others to *scriptFile*."""
    exec_bits = stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH
    os.chmod(scriptFile, os.stat(scriptFile).st_mode | exec_bits)
def writeToExecFile(contents, file):
    """Write *contents* (stringified) to *file* and mark the file executable."""
    text = str(contents)
    with open(file, "w") as script:
        script.write(text)
    execPermissions(file)
def is_docker(func):
    """Decorator: reroute the pipeline through Docker when its encoder needs it.

    If the pipeline string mentions any encoder listed in the config's
    ``with_docker``, the original command is written to a small shell script
    and the command passed to *func* becomes the configured ``docker run``
    invocation that executes that script. Otherwise the pipeline is passed
    through unchanged.
    """
    @wraps(func)  # preserve the wrapped function's name/docstring for logging
    def wrapper(pipeline):
        script_name = "to_exec.sh"
        for encoder in get_config().with_docker:
            if encoder in pipeline:
                writeToExecFile(pipeline, script_name)
                pipeline = get_config().docker_run_string + f" {script_name}"
                # BUG FIX: stop after the first match; without this, later
                # encoder names were matched against the rewritten docker
                # command and could re-wrap it / overwrite the script.
                break
        func(pipeline)
    return wrapper
def is_sudo(pipeline):
    """Return True when the command line is meant to run through sudo."""
    return pipeline.startswith("sudo")
def passwordAuth(proc):
    """Feed the UAUTH environment variable to *proc*'s stdin, if it is set.

    No-op when UAUTH is absent (e.g. .env not loaded); otherwise the value is
    sent via communicate(), which also waits for the process to finish.
    """
    password = os.getenv("UAUTH")
    if password is None:
        return
    proc.communicate(password)
def printLog(file):
    """Dump the full contents of *file* into the debug log."""
    with open(file, "r") as fh:
        contents = fh.read()
    logging.debug(f"\n{contents}")
@is_docker
@log_args_decorator
def run_pipeline(pipeline):
    """Execute a GStreamer shell command, capturing and logging its output.

    stdout+stderr are redirected to pipeline-log.txt and then echoed into the
    debug log. If the command starts with "sudo", the UAUTH password from the
    environment is piped to stdin.

    Raises:
        Exception: when the process exits with a non-zero return code.
    """
    logfile = "pipeline-log.txt"
    # NOTE: shell=True is required because `pipeline` is a full shell command
    # (env-var prefixes, quoting). It must only ever come from trusted config.
    with open(logfile, "w") as f:
        # Context manager ensures the pipes are closed even if waiting fails.
        with subprocess.Popen(pipeline, shell=True,
                              stdin=subprocess.PIPE, stdout=f,
                              stderr=subprocess.STDOUT, text=True) as proc:
            if is_sudo(pipeline):
                passwordAuth(proc)
            proc.wait()  # fix: dropped the unused `code` local
    printLog(logfile)
    if proc.returncode != 0:
        raise Exception(emoji.emojize(
            ":cross_mark: Pipeline failed, see log for details"))
def time_trace(func):
    """Decorator: log the wall-clock duration of a call to *func*.

    Fixes over the original: the wrapper forwards positional/keyword
    arguments and returns the wrapped function's result (the old wrapper
    accepted no arguments and swallowed the return value), uses the
    monotonic ``time.perf_counter`` for elapsed time, and preserves the
    wrapped function's metadata via ``functools.wraps``. Backward compatible
    with the existing no-argument usage on run_autotest().
    """
    @wraps(func)
    def wrapper(*args, **kwargs):
        start_time = time.perf_counter()
        result = func(*args, **kwargs)
        elapsed_time = time.perf_counter() - start_time
        logging.info(emoji.emojize(
            f":alarm_clock: Total execution time: {elapsed_time} seconds"))
        return result
    return wrapper
@time_trace
def run_autotest():
    """Run the full encoder test matrix and write per-encoder CSV results.

    For every (encoder, parameter combination, H.264 profile, input video,
    repeat) tuple: build and run a traced GStreamer pipeline, then collect
    PSNR/SSIM quality metrics and latency-tracer statistics into two wide
    DataFrames whose MultiIndex columns are
    (encoder, profile, video, params, metric).
    """
    # NOTE(review): these two assignments are dead — both frames are
    # re-created at the top of the encoder loop below.
    qualityDataframe = pd.DataFrame()
    latencyDataframe = pd.DataFrame()
    encoders = generateEncoderStrings()
    for encoder, combinations in tqdm(encoders.items(), desc="Encoder loop"):
        # Fresh accumulators per encoder; results are saved per encoder below.
        qualityDataframe = pd.DataFrame()
        latencyDataframe = pd.DataFrame()
        for params in tqdm(combinations, desc="params combination loop", leave=False):
            for profile in tqdm(get_config().profiles, desc="profile loop", leave=False):
                for videoName, videoPath in tqdm(get_config().videos.items(), desc="videos loop", leave=False):
                    for _ in tqdm(range(get_config().repeats), desc="repeat loop", leave=False):
                        # Output file name does not include the repeat index,
                        # so each repeat overwrites the previous encode.
                        filename = "autotest-" + encoder + "-" + \
                            profile + "-test-" + videoName + ".mp4"
                        pipeline = Pipeline()
                        pipeline = (
                            pipeline.add_tracing()
                            .add_source(makeVideoSrc(videoName))
                            .add_encoder(encoder, params.split(" "))
                            .add_profile(profile)
                            .to_file(filename)
                        )
                        logging.debug(pipeline.pipeline)
                        try:
                            run_pipeline(pipeline.pipeline)
                        except Exception as e:
                            # A failed pipeline skips metrics for this run but
                            # does not abort the rest of the matrix.
                            logging.error(emoji.emojize(
                                f":cross_mark: Error occurred: {e}"))
                            continue
                        # Compare the encoded file against the original video.
                        psnr_metrics, ssim_metrics = qa.run_quality_check(
                            videoPath,
                            filename,
                            get_config().video_info[videoName] +
                            " " + get_config().psnr_check[encoder]
                        )
                        dfPsnr = qa.parse_quality_report(
                            psnr_metrics, ssim_metrics)
                        logging.info("-----")
                        # Parse the latency trace produced via add_tracing().
                        dfLatency = getLatencyTable(
                            get_config().latency_filename)
                        # Tag each metric column with its full test context.
                        columnsQ = pd.MultiIndex.from_tuples(
                            [(encoder, profile, videoName, params, col)
                             for col in dfPsnr.columns]
                        )
                        columnsLatency = pd.MultiIndex.from_tuples(
                            [(encoder, profile, videoName, params, col)
                             for col in dfLatency.columns]
                        )
                        dfPsnr.columns = columnsQ
                        dfLatency.columns = columnsLatency
                        # Grow the per-encoder result frames column-wise.
                        qualityDataframe = pd.concat(
                            [qualityDataframe, dfPsnr], axis=1)
                        latencyDataframe = pd.concat(
                            [latencyDataframe, dfLatency], axis=1)
                        logging.info("="*50)
                        logging.info("Current results:")
                        logging.info(f"\n{dfPsnr}")
                        logging.info(f"\n{dfLatency}")
        # Persist one quality CSV and one latency CSV per encoder.
        # NOTE(review): original indentation was lost in this copy; the
        # per-encoder reinit of the frames and the {encoder} file names imply
        # these writes belong at encoder-loop level — confirm against VCS.
        qualityDataframe.to_csv(
            get_config().results_dir + f"qualityResults{encoder}.csv")
        latencyDataframe.to_csv(
            get_config().results_dir + f"latencyDataframe{encoder}.csv")
if __name__ == "__main__":
    args = parse_args()
    # Populate the module-level singleton from the CLI-provided YAML path so
    # every later get_config() call reuses it instead of building a default.
    config = AutotestConfig(args.config)
    logging.debug(f"yaml config path={args.config}")
    os.makedirs(get_config().results_dir, exist_ok=True)
    load_dotenv()  # pulls UAUTH (sudo password) etc. from a .env file
    sudo_check(__file__)
    run_autotest()