#!/usr/bin/python from itertools import product from tqdm import tqdm import qa from latencyParse import getLatencyTable import os import stat import subprocess import pandas as pd from extra import log_args_decorator, sudo_check from autotestConfig import AutotestConfig from dotenv import load_dotenv import argparse import emoji import logging # Configure logging to show informational messages logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(levelname)s - %(message)s', filename=f"{os.path.basename(__file__).split('.')[0]}.log") config = None def parse_args(): parser = argparse.ArgumentParser(prog=__file__) parser.add_argument('-c', '--config', type=str, default='autotest-conf.yaml', help="Path to autotest configuration yaml file") return parser.parse_args() def get_config(): global config if config is None: config = AutotestConfig() return config # Step-by-step: # 1. Generate all combinations for each encoder # 2. For each combination, create a GStreamer pipeline string # 3. Start each pipeline with latency tracing enabled # 3.1 Monitor CPU, GPU and memory usage during each pipeline run (nah, later, maybe) # 4. Start latency parsing script after each pipeline and store results in a pandas dataframe: # - two key columns: encoder name, parameters string # 5. Run PSNR check after each pipeline and add results in the dataframe # 6. Save dataframe to CSV file class Pipeline: def __init__(self): self.pipeline = "gst-launch-1.0 -e " self.options = "" def add_tracing(self): self.pipeline = ( "GST_DEBUG_COLOR_MODE=off " + "GST_TRACERS=\"latency(flags=pipeline+element)\" " + "GST_DEBUG=GST_TRACER:7 GST_DEBUG_FILE=" + get_config().latency_filename + " " + self.pipeline ) return self def add_source(self, source): self.pipeline += source + " ! clocksync sync-to-first=true ! " return self def __add_tee(self, encoder): pass # self.pipeline += "tee name=t t. ! queue max-size-time=5000000000 max-size-bytes=100485760 max-size-buffers=1000 ! filesink location=\"base-autotest.yuv\" " def add_encoder(self, encoder, params): self.pipeline += get_config().videoconvert[encoder] + " ! " self.pipeline += "capsfilter caps=\"" + \ get_config().formats[encoder] + "\" ! " # self.__add_tee(encoder) self.options += " ".join(params) + " " # self.pipeline += "t. ! queue max-size-time=5000000000 max-size-bytes=100485760 max-size-buffers=1000 ! " self.pipeline += encoder + " " self.pipeline += " ".join(params) + " " return self def add_profile(self, profile): self.pipeline += "! capsfilter caps=\"video/x-h264,profile=" + profile + "\" ! " self.options += "profile=" + profile + " " return self def to_file(self, filename): self.pipeline += "h264parse ! mpegtsmux ! filesink location=\"" + filename + "\"" return self def makeVideoSrc(videoName): return ( get_config().videosrc["raw"][0] + get_config().videos[videoName] + get_config().videosrc["raw"][1] + get_config().gst_video_info[videoName] ) def generateEncoderStrings(): options = get_config().options result = dict() for encoder, value in options.items(): val = generate_combinations(value) logging.debug(len(val)) result[encoder] = val return result def generate_combinations(config_dict): """ Generate all combinations of values from a configuration dictionary. Args: config_dict (dict): Dictionary with parameter names as keys and lists of values as values Returns: list: List of strings containing all parameter combinations """ combinations = [] keys = list(config_dict.keys()) value_lists = [config_dict[key] for key in keys] for combo in product(*value_lists): param_strings = [] for key, value in zip(keys, combo): param_strings.append(f"{key}={value}") combinations.append(" ".join(param_strings)) return combinations def execPermissions(scriptFile="to_exec.sh"): current_permissions = os.stat(scriptFile).st_mode new_permissions = current_permissions | stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH os.chmod(scriptFile, new_permissions) def writeToExecFile(contents, file): with open(file, "w") as f: f.write(str(contents)) execPermissions(file) def is_docker(func): def wrapper(pipeline): script_name = "to_exec.sh" for encoder in get_config().with_docker: if encoder in pipeline: writeToExecFile(pipeline, script_name) pipeline = get_config().docker_run_string + f" {script_name}" func(pipeline) return wrapper def is_sudo(pipeline): if pipeline.startswith("sudo"): return True return False def passwordAuth(proc): password = os.getenv("UAUTH") if password is not None: proc.communicate(password) def printLog(file): with open(file, "r") as f: out = f.read() logging.debug(f"\n{out}") @is_docker @log_args_decorator def run_pipeline(pipeline): logfile = "pipeline-log.txt" with open(logfile, "w") as f: proc = subprocess.Popen(pipeline, shell=True, stdin=subprocess.PIPE, stdout=f, stderr=subprocess.STDOUT, text=True) if is_sudo(pipeline): passwordAuth(proc) code = proc.wait() printLog(logfile) if proc.returncode != 0: raise Exception(emoji.emojize( ":cross_mark: Pipeline failed, see log for details")) def time_trace(func): def wrapper(): import time start_time = time.time() func() end_time = time.time() elapsed_time = end_time - start_time logging.info(emoji.emojize( f":alarm_clock: Total execution time: {elapsed_time} seconds")) return wrapper @time_trace def run_autotest(): qualityDataframe = pd.DataFrame() latencyDataframe = pd.DataFrame() encoders = generateEncoderStrings() for encoder, combinations in tqdm(encoders.items(), desc="Encoder loop"): qualityDataframe = pd.DataFrame() latencyDataframe = pd.DataFrame() for params in tqdm(combinations, desc="params combination loop", leave=False): for profile in tqdm(get_config().profiles, desc="profile loop", leave=False): for videoName, videoPath in tqdm(get_config().videos.items(), desc="videos loop", leave=False): for _ in tqdm(range(get_config().repeats), desc="repeat loop", leave=False): filename = "autotest-" + encoder + "-" + \ profile + "-test-" + videoName + ".mp4" pipeline = Pipeline() pipeline = ( pipeline.add_tracing() .add_source(makeVideoSrc(videoName)) .add_encoder(encoder, params.split(" ")) .add_profile(profile) .to_file(filename) ) logging.debug(pipeline.pipeline) try: run_pipeline(pipeline.pipeline) except Exception as e: logging.error(emoji.emojize( f":cross_mark: Error occurred: {e}")) continue psnr_metrics, ssim_metrics = qa.run_quality_check( videoPath, filename, get_config().video_info[videoName] + " " + get_config().psnr_check[encoder] ) dfPsnr = qa.parse_quality_report( psnr_metrics, ssim_metrics) logging.info("-----") dfLatency = getLatencyTable( get_config().latency_filename) columnsQ = pd.MultiIndex.from_tuples( [(encoder, profile, videoName, params, col) for col in dfPsnr.columns] ) columnsLatency = pd.MultiIndex.from_tuples( [(encoder, profile, videoName, params, col) for col in dfLatency.columns] ) dfPsnr.columns = columnsQ dfLatency.columns = columnsLatency qualityDataframe = pd.concat( [qualityDataframe, dfPsnr], axis=1) latencyDataframe = pd.concat( [latencyDataframe, dfLatency], axis=1) logging.info("="*50) logging.info("Current results:") logging.info(f"\n{dfPsnr}") logging.info(f"\n{dfLatency}") qualityDataframe.to_csv( get_config().results_dir + f"qualityResults{encoder}.csv") latencyDataframe.to_csv( get_config().results_dir + f"latencyDataframe{encoder}.csv") if __name__ == "__main__": args = parse_args() config = AutotestConfig(args.config) logging.debug(f"yaml config path={args.config}") os.makedirs(get_config().results_dir, exist_ok=True) load_dotenv() sudo_check(__file__) run_autotest()