gstreamer-pipelines/PyScripts/gstreamerAutotest.py
Artur Mukhamadiev f6fd5e50c8 autotest implementation
:Release Notes:
-

:Detailed Notes:
-

:Testing Performed:
-

:QA Notes:
-

:Issues Addressed:
-
2025-09-29 19:32:32 +03:00

216 lines
7.1 KiB
Python

#!/usr/bin/python
from itertools import product
import qa
from latencyParse import getLatencyTable
# Encoder property sweep: each encoder maps GStreamer properties to the list
# of candidate values; generate_combinations() expands these into the
# cartesian product of "key=value" strings tested by run_autotest().
options = {
    "x264enc": {
        "bitrate": ["10000", "20000", "5000"],
        "speed-preset": ["ultrafast", "fast", "medium"],
        "tune": ["zerolatency"],
        # Fixed: the x264enc property is "sliced-threads"; the previous
        # "slices-threads" spelling would make gst-launch reject the pipeline
        # with a "no property" error.
        "sliced-threads": ["true", "false"],
        "b-adapt": ["true", "false"],
        "rc-lookahead": ["40", "0"],
        "ref": ["3", "0"]
    },
    "nvh264enc": {
        "bitrate": ["10000", "20000", "5000"],
        "preset": ["4", "5", "1"],
        "rc-lookahead": ["0"],
        "rc-mode": ["2", "0", "5"],
        "zerolatency": ["true", "false"],
    }
    # ,
    # "nvv4l2h264enc": {
    #     "bitrate": ["10000000", "20000000", "5000000"],
    #     "profile": ["0", "1", "2"],
    #     "preset-id": ["1", "2", "3"],
    #     "control-id": ["1", "2"],
    #     "tuning-info-id": ["4", "2"]
    # }
}
# Source video file paths iterated by run_autotest().
# NOTE(review): video_info below is keyed by names like "video1", but this
# list holds "" — video_info[videos[idx]] in run_autotest() will KeyError
# until real entries are added; confirm intended contents.
videos = [""]
# Synthetic test source (not currently wired into the sweep).
testsource = "videotestsrc pattern=smpte"
# File-based source template: videosrc[0] + <path> + videosrc[1].
videosrc = ["filesrc location=", "! qtdemux ! h264parse ! avdec_h264"]
# Extra ffmpeg arguments per encoder for the PSNR/SSIM quality check.
psnr_check = {
    "x264enc": "-pixel_format yuv420p -color_range pc",
    "nvh264enc": "-pixel_format nv12 -color_range tv",
    "nvv4l2h264enc": "-pixel_format yuv420p -color_range tv"
}
# Raw-video caps format forced in front of each encoder (see Pipeline).
formats = {
    "x264enc": "I420",
    "nvh264enc": "NV12",
    "nvv4l2h264enc": "I420"
}
# H.264 profiles the encoded stream is constrained to via capsfilter.
profiles = ["baseline", "main"]
# ffmpeg geometry/framerate arguments per source video.
video_info = {
    "video1": "-video_size 1920x1080 -framerate 23.98"
}
# Destination of the GStreamer latency-tracer log (see Pipeline.add_tracing).
latency_filename = "latency-traces-autotest.log"
class Pipeline:
    """Fluent builder for a gst-launch-1.0 command line.

    Accumulates the shell command in self.pipeline and a human-readable
    summary of the chosen encoder settings in self.options.
    """

    def __init__(self):
        self.pipeline = "gst-launch-1.0 -e "
        self.options = ""

    def add_tracing(self):
        """Prefix env vars that enable latency tracing to latency_filename."""
        tracer_env = (
            "GST_DEBUG_COLOR_MODE=off "
            'GST_TRACERS="latency(flags=pipeline+element)" '
            f"GST_DEBUG=GST_TRACER:7 GST_DEBUG_FILE={latency_filename} "
        )
        self.pipeline = tracer_env + self.pipeline
        return self

    def add_source(self, source):
        """Append the raw-video source followed by a videoconvert element."""
        self.pipeline += f"{source} ! videoconvert ! "
        return self

    def __add_tee(self, encoder):
        # Force the raw caps this encoder consumes, then tee an unencoded
        # reference copy to disk for the later quality comparison.
        self.pipeline += f"capsfilter caps=video/x-raw,format={formats[encoder]} ! "
        self.pipeline += 'tee name=t t. ! queue ! filesink location="base-autotest.yuv" '

    def add_encoder(self, encoder, params):
        """Append the encoder element with its list of property=value tokens."""
        self.__add_tee(encoder)
        joined = " ".join(params)
        self.options += f"{joined} "
        self.pipeline += f"t. ! queue ! {encoder} {joined} "
        return self

    def add_profile(self, profile):
        """Constrain the encoded H.264 stream to the given profile."""
        self.pipeline += f'capsfilter caps="video/x-h264,profile={profile}" ! '
        self.options += f"profile={profile} "
        return self

    def to_file(self, filename):
        """Terminate the pipeline: parse, mux to MP4 and write to filename."""
        self.pipeline += f'h264parse ! mp4mux ! filesink location="{filename}"'
        return self
def makeVideoSrc(idx):
    """Build the filesrc-plus-decode source string for video index idx."""
    prefix, suffix = videosrc
    return f"{prefix}{videos[idx]}{suffix}"
def generateEncoderStrings():
    """Map each encoder name in `options` to its parameter-combination list."""
    return {
        encoder: generate_combinations(params)
        for encoder, params in options.items()
    }
def generate_combinations(config_dict):
    """Expand a {parameter: [values]} dict into "k=v k=v ..." strings.

    Args:
        config_dict (dict): parameter names mapped to lists of candidate values.

    Returns:
        list: one space-joined "key=value" string per element of the
        cartesian product of all value lists.
    """
    names = list(config_dict)
    pools = (config_dict[name] for name in names)
    return [
        " ".join(f"{name}={chosen}" for name, chosen in zip(names, combo))
        for combo in product(*pools)
    ]
# Step-by-step:
# 1. Generate all combinations for each encoder
# 2. For each combination, create a GStreamer pipeline string
# 3. Start each pipeline with latency tracing enabled
# 3.1 Monitor CPU, GPU and memory usage during each pipeline run (nah, later, maybe)
# 4. Start latency parsing script after each pipeline and store results in a pandas dataframe:
# - two key columns: encoder name, parameters string
# 5. Run PSNR check after each pipeline and add results in the dataframe
# 6. Save dataframe to CSV file
import pandas as pd
# Module-level accumulators filled in by run_autotest(): each tested
# combination contributes a column group keyed by a
# (encoder, profile, params, metric) MultiIndex.
qualityDataframe = pd.DataFrame()
latencyDataframe = pd.DataFrame()
def run_pipeline(pipeline):
    """Execute a gst-launch command line, logging its output to pipeline-log.txt.

    Args:
        pipeline (str): Full shell command (env-var prefixes + gst-launch-1.0 ...).

    Raises:
        RuntimeError: If the command exits with a non-zero return code.
    """
    import subprocess
    print("Running pipeline:")
    print(pipeline)
    # shell=True is required here: the command string carries env-var
    # prefixes (GST_TRACERS=..., see Pipeline.add_tracing) and quoted caps.
    # The strings are generated locally, not from untrusted input.
    with open("pipeline-log.txt", "w") as f:
        proc = subprocess.run(pipeline, shell=True, stdout=f, stderr=subprocess.STDOUT, text=True)
    print(f"Pipeline finished with return code: {proc.returncode}")
    with open("pipeline-log.txt", "r") as f:
        print(f.read())
    if proc.returncode != 0:
        # RuntimeError is a subclass of Exception, so existing
        # `except Exception` callers keep working; the code aids triage.
        raise RuntimeError(f"Pipeline failed with code {proc.returncode}, see log for details")
def run_autotest():
    """Sweep every encoder/parameter/profile/video combination.

    For each combination: build and run a latency-traced gst-launch
    pipeline, then collect quality metrics (via qa) and latency metrics
    (via latencyParse) into the module-level dataframes, keyed by an
    (encoder, profile, params, metric) column MultiIndex.
    """
    global qualityDataframe, latencyDataframe
    encoders = generateEncoderStrings()
    for encoder, combinations in encoders.items():
        for params in combinations:
            for profile in profiles:
                for idx in range(len(videos)):
                    # Output filename encodes the test identity for later inspection.
                    filename = "autotest-" + encoder + "-" + profile + "-test-" + str(idx) + ".mp4"
                    pipeline = Pipeline()
                    pipeline = (
                        pipeline.add_tracing()
                        .add_source(makeVideoSrc(idx))
                        .add_encoder(encoder, params.split(" "))
                        .add_profile(profile)
                        .to_file(filename)
                    )
                    print(pipeline.pipeline)
                    try:
                        run_pipeline(pipeline.pipeline)
                    except Exception as e:
                        # A failing pipeline skips metric collection for this
                        # combination but does not abort the whole sweep.
                        print(f"Error occurred: {e}")
                        continue
                    # NOTE(review): video_info is keyed by names like "video1",
                    # but videos currently holds [""] — video_info[videos[idx]]
                    # will raise KeyError unless videos is populated with
                    # matching names; confirm intended contents.
                    psnr_metrics, ssim_metrics = qa.run_quality_check(
                        videos[idx],
                        filename,
                        video_info[videos[idx]] + " " + psnr_check[encoder]
                    )
                    dfPsnr = qa.parse_quality_report(psnr_metrics, ssim_metrics)
                    print("-----")
                    # Parse the trace log written by add_tracing() for this run.
                    dfLatency = getLatencyTable(latency_filename)
                    # Tag every metric column with the full test identity so the
                    # wide dataframes can be concatenated across runs.
                    columnsQ = pd.MultiIndex.from_tuples(
                        [(encoder, profile, params, col) for col in dfPsnr.columns]
                    )
                    columnsLatency = pd.MultiIndex.from_tuples(
                        [(encoder, profile, params, col) for col in dfLatency.columns]
                    )
                    dfPsnr.columns = columnsQ
                    dfLatency.columns = columnsLatency
                    qualityDataframe = pd.concat([qualityDataframe, dfPsnr], axis=1)
                    latencyDataframe = pd.concat([latencyDataframe, dfLatency], axis=1)
                    print("=====")
                    print("Current results:")
                    print(dfPsnr)
                    print(dfLatency)
def run_timetracer():
    """Run the full autotest sweep and report its wall-clock duration."""
    import time
    started = time.time()
    run_autotest()
    elapsed_time = time.time() - started
    print(f"Total execution time: {elapsed_time} seconds")
run_timetracer()