[autotest] added sudo access check

This commit is contained in:
Artur Mukhamadiev 2025-10-12 17:34:51 +03:00
parent 76f852f856
commit 92c785a29a
4 changed files with 131 additions and 51 deletions

View File

@ -1,5 +1,6 @@
import yaml
class AutotestConfig:
def __init__(self, path='autotest-conf.yaml'):
with open(path, 'r') as file:
@ -60,3 +61,7 @@ class AutotestConfig:
@property
def docker_run_string(self):
return self.data["docker_run_string"]
@property
def results_dir(self):
    """Directory where result CSV files are written; falls back to "results/"
    when the loaded config has no "results_dir" key."""
    return self.data.get("results_dir", "results/")

View File

@ -1,5 +1,6 @@
from functools import wraps
def log_args_decorator(func):
"""
A decorator that logs the arguments passed to a function.
@ -15,3 +16,40 @@ def log_args_decorator(func):
print(f"Function '{func.__name__}' returned: {result}")
return result
return wrapper
def sudo_check(file):
    """Verify sudo access using the password stored in the UAUTH env variable.

    Reads the sudo password from the UAUTH environment variable (expected to
    be loaded from a .env file via python-dotenv before this is called) and
    runs a trivial `sudo -S ls` to confirm that the password is accepted.

    Args:
        file: Name of the calling script; used only in the error message.

    Raises:
        Exception: If UAUTH is unset or sudo rejects the password.
    """
    import pipelineExec as pe
    import subprocess
    import logging
    import emoji
    import os
    uauth_pass = os.getenv("UAUTH")
    if uauth_pass is None:
        # logging.critical is the non-deprecated spelling of logging.fatal.
        logging.critical(emoji.emojize(f"""
        :warning: Please, create .env file with UAUTH variable. Before running {file}.
        UAUTH variable should match your password for sudo access to the docker.
        example:
        {os.getcwd()}$ cat .env
        UAUTH=123
        """)
        )
        raise Exception(emoji.emojize(
            ":cross_mark: Password isn't set properly"))
    else:
        # SECURITY: never log the plaintext sudo password; only record that
        # it is present. (The previous version leaked it at DEBUG level.)
        logging.debug(emoji.emojize(":warning: UAUTH password is set :warning:"))
        proc = subprocess.Popen('sudo -S ls', shell=True,
                                stdin=subprocess.PIPE, stdout=subprocess.PIPE,
                                stderr=subprocess.STDOUT, text=True)
        # Feed the password to sudo's stdin via the shared helper.
        pe.passwordAuth(proc)
        proc.wait()  # return code is read from proc.returncode below
        if proc.returncode != 0:
            logging.critical(emoji.emojize(f"""
            :cross_mark: Password isn't correct in UAUTH
            error={proc.returncode}
            """))
            raise Exception(emoji.emojize(":cross_mark: Password isn't correct"))
        logging.info(emoji.emojize(":check_mark_button: Sudo access verified"))

93
gstreamerAutotest.py Normal file → Executable file
View File

@ -2,16 +2,23 @@
from itertools import product
import qa
from latencyParse import getLatencyTable
import os, stat, subprocess
import os
import stat
import subprocess
import pandas as pd
from extra import log_args_decorator
from extra import log_args_decorator, sudo_check
from autotestConfig import AutotestConfig
from dotenv import load_dotenv
import emoji
import logging
# Configure logging to show informational messages
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(levelname)s - %(message)s')
logging.basicConfig(level=logging.DEBUG,
format='%(asctime)s - %(levelname)s - %(message)s')
config = None
def get_config():
global config
if config is None:
@ -27,6 +34,8 @@ def get_config():
# - two key columns: encoder name, parameters string
# 5. Run PSNR check after each pipeline and add results in the dataframe
# 6. Save dataframe to CSV file
class Pipeline:
def __init__(self):
self.pipeline = "gst-launch-1.0 -e "
@ -47,14 +56,15 @@ class Pipeline:
def __add_tee(self, encoder):
pass
#self.pipeline += "tee name=t t. ! queue max-size-time=5000000000 max-size-bytes=100485760 max-size-buffers=1000 ! filesink location=\"base-autotest.yuv\" "
# self.pipeline += "tee name=t t. ! queue max-size-time=5000000000 max-size-bytes=100485760 max-size-buffers=1000 ! filesink location=\"base-autotest.yuv\" "
def add_encoder(self, encoder, params):
self.pipeline += get_config().videoconvert[encoder] + " ! "
self.pipeline += "capsfilter caps=video/x-raw,format=" + get_config().formats[encoder] + " ! "
#self.__add_tee(encoder)
self.pipeline += "capsfilter caps=video/x-raw,format=" + \
get_config().formats[encoder] + " ! "
# self.__add_tee(encoder)
self.options += " ".join(params) + " "
#self.pipeline += "t. ! queue max-size-time=5000000000 max-size-bytes=100485760 max-size-buffers=1000 ! "
# self.pipeline += "t. ! queue max-size-time=5000000000 max-size-bytes=100485760 max-size-buffers=1000 ! "
self.pipeline += encoder + " "
self.pipeline += " ".join(params) + " "
return self
@ -75,7 +85,7 @@ def makeVideoSrc(videoName):
get_config().videos[videoName] +
get_config().videosrc["raw"][1] +
get_config().gst_video_info[videoName]
)
)
def generateEncoderStrings():
@ -110,19 +120,19 @@ def generate_combinations(config_dict):
return combinations
qualityDataframe = pd.DataFrame()
latencyDataframe = pd.DataFrame()
def execPermissions(scriptFile="to_exec.sh"):
    """Add the execute bit for user, group and other to *scriptFile*."""
    exec_bits = stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH
    os.chmod(scriptFile, os.stat(scriptFile).st_mode | exec_bits)
def writeToExecFile(contents, file):
    """Write *contents* (stringified) to *file* and mark it executable."""
    with open(file, "w") as out:
        out.write(str(contents))
    execPermissions(file)
def is_docker(func):
def wrapper(pipeline):
script_name = "to_exec.sh"
@ -134,35 +144,41 @@ def is_docker(func):
func(pipeline)
return wrapper
def is_sudo(pipeline):
    """Return True when the pipeline command line is invoked through sudo."""
    return pipeline.startswith("sudo")
def passwordAuth(proc):
    """Feed the sudo password from the UAUTH env variable to *proc*'s stdin.

    Does nothing when UAUTH is unset; the subprocess will then fail on its
    own if it actually required a password.

    Args:
        proc: A subprocess.Popen opened with text=True and stdin=PIPE.
    """
    password = os.getenv("UAUTH")
    if password is not None:
        # sudo -S requires the password to be terminated by a newline
        # (see sudo(8)); communicate() then closes stdin.
        proc.communicate(password + "\n")
def printLog(file):
    """Dump the full contents of *file* to stdout."""
    with open(file, "r") as handle:
        print(handle.read())
@is_docker
@log_args_decorator
def run_pipeline(pipeline):
logfile = "pipeline-log.txt"
with open(logfile, "w") as f:
proc = subprocess.Popen(pipeline, shell=True,
stdin=subprocess.PIPE, stdout=f,
stderr=subprocess.STDOUT, text=True)
stdin=subprocess.PIPE, stdout=f,
stderr=subprocess.STDOUT, text=True)
if is_sudo(pipeline):
passwordAuth(proc)
code = proc.wait()
printLog(logfile)
if proc.returncode != 0:
raise Exception("Pipeline failed, see log for details")
raise Exception(emoji.emojize(
":cross_mark: Pipeline failed, see log for details"))
def time_trace(func):
def wrapper():
@ -171,11 +187,16 @@ def time_trace(func):
func()
end_time = time.time()
elapsed_time = end_time - start_time
print(f"Total execution time: {elapsed_time} seconds")
print(emoji.emojize(
f":alarm_clock: Total execution time: {elapsed_time} seconds"))
return wrapper
@time_trace
def run_autotest():
qualityDataframe = pd.DataFrame()
latencyDataframe = pd.DataFrame()
encoders = generateEncoderStrings()
for encoder, combinations in encoders.items():
qualityDataframe = pd.DataFrame()
@ -184,7 +205,8 @@ def run_autotest():
for profile in get_config().profiles:
for videoName, videoPath in get_config().videos.items():
for _ in range(get_config().repeats):
filename = "autotest-" + encoder + "-" + profile + "-test-" + videoName + ".mp4"
filename = "autotest-" + encoder + "-" + \
profile + "-test-" + videoName + ".mp4"
pipeline = Pipeline()
pipeline = (
pipeline.add_tracing()
@ -193,36 +215,49 @@ def run_autotest():
.add_profile(profile)
.to_file(filename)
)
print(pipeline.pipeline)
logging.debug(pipeline.pipeline)
try:
run_pipeline(pipeline.pipeline)
except Exception as e:
print(f"Error occurred: {e}")
logging.error(emoji.emojize(
f":cross_mark: Error occurred: {e}"))
continue
psnr_metrics, ssim_metrics = qa.run_quality_check(
videoPath,
filename,
get_config().video_info[videoName] + " " + get_config().psnr_check[encoder]
get_config().video_info[videoName] +
" " + get_config().psnr_check[encoder]
)
dfPsnr = qa.parse_quality_report(psnr_metrics, ssim_metrics)
dfPsnr = qa.parse_quality_report(
psnr_metrics, ssim_metrics)
print("-----")
dfLatency = getLatencyTable(get_config().latency_filename)
dfLatency = getLatencyTable(
get_config().latency_filename)
columnsQ = pd.MultiIndex.from_tuples(
[(encoder, profile, videoName, params, col) for col in dfPsnr.columns]
[(encoder, profile, videoName, params, col)
for col in dfPsnr.columns]
)
columnsLatency = pd.MultiIndex.from_tuples(
[(encoder, profile, videoName, params, col) for col in dfLatency.columns]
[(encoder, profile, videoName, params, col)
for col in dfLatency.columns]
)
dfPsnr.columns = columnsQ
dfLatency.columns = columnsLatency
qualityDataframe = pd.concat([qualityDataframe, dfPsnr], axis=1)
latencyDataframe = pd.concat([latencyDataframe, dfLatency], axis=1)
print("=====")
qualityDataframe = pd.concat(
[qualityDataframe, dfPsnr], axis=1)
latencyDataframe = pd.concat(
[latencyDataframe, dfLatency], axis=1)
print("="*50)
print("Current results:")
print(dfPsnr)
print(dfLatency)
qualityDataframe.to_csv(f"qualityResults{encoder}.csv")
latencyDataframe.to_csv(f"latencyDataframe{encoder}.csv")
qualityDataframe.to_csv(
get_config().results_dir + f"qualityResults{encoder}.csv")
latencyDataframe.to_csv(
get_config().results_dir + f"latencyDataframe{encoder}.csv")
if __name__ == "__main__":
load_dotenv()
sudo_check(__file__)
run_autotest()

View File

@ -1,5 +1,6 @@
contourpy==1.3.3
cycler==0.12.1
emoji==2.15.0
fonttools==4.60.1
kiwisolver==1.4.9
matplotlib==3.10.7
@ -9,6 +10,7 @@ pandas==2.3.3
pillow==11.3.0
pyparsing==3.2.5
python-dateutil==2.9.0.post0
python-dotenv==1.1.1
pytz==2025.2
PyYAML==6.0.3
six==1.17.0