gstAutotest/latencyParse.py

#!/usr/bin/python3
import pandas as pd
import numpy as np
# The idea: for a set of experiments we compute per-element latency statistics
# (avg, std and max). The total pipeline latency is not computed, because that
# would require additional parsing of parallel branches (coming from tee).
# Ideally we would write the data to a table.
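#
# An illustrative (hypothetical) example of the latency tracer entries this
# script expects (exact structure name, field order and extra fields vary
# between GStreamer versions):
#   ... element-latency, element=(string)x264enc0, src=(string)src,
#       time=(guint64)13945000, ts=(guint64)977600256;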

# Cache of word indices per search prefix, so repeated log lines with the same
# layout are not rescanned from the start every time.
idxCache = dict()


def findWord(words, wordToSearch):
    """Return the first word in `words` that starts with `wordToSearch`, or ""."""
    global idxCache
    # Fast path: try the positions where this prefix was found before.
    if wordToSearch in idxCache:
        for idx in idxCache[wordToSearch]:
            if idx < len(words) and words[idx].startswith(wordToSearch):
                return words[idx]
            else:
                if idx >= len(words):
                    print(f"ERROR: trying to access index={idx} while: {words}")
    # Slow path: linear scan; remember the index for subsequent lines.
    for word in words:
        if word.startswith(wordToSearch):
            idx = words.index(word)
            if wordToSearch not in idxCache:
                idxCache[wordToSearch] = []
            idxCache[wordToSearch].append(idx)
            return words[idx]
    return ""


# taken with love from the GStreamerLatencyPlotter implementation
def readAndParse(filename):
    """Parse a latency tracer log into {element: {"latency": [ms], "ts": [s]}}."""
    result = dict()
    global idxCache
    with open(filename, "r") as latencyFile:
        lines = latencyFile.readlines()
        for line in lines:
            if line.find("new format string") != -1:
                continue
            words = line.split()
            # Only tracer records end with a "ts=..." field; skip everything
            # else (the `not words` check also skips empty lines).
            if not words or not words[len(words) - 1].startswith("ts="):
                continue

            def findAndRemove(wordToSearch):
                # Strip the "(type)" prefix and the trailing separator.
                res = findWord(words, wordToSearch)
                res = res[res.find(")") + 1:len(res) - 1]
                return res

            name = findWord(words, "element=(string)")
            if name == "":
                name = findWord(words, "src-element=(string)")
            if name == "":
                continue
            src = findAndRemove("src=(string)")  # pad name, currently unused
            name = name[name.find(")") + 1:len(name) - 1]
            if name not in result:
                result[name] = {"latency": [], "ts": []}
            timeWord = findAndRemove("time=(guint64)")
            tsWord = findAndRemove("ts=(guint64)")
            result[name]["latency"].append(int(timeWord) / 1e6)  # ns -> ms
            result[name]["ts"].append(int(tsWord) / 1e9)  # ns -> s
    # drop the cache so future runs on differently shaped logs start clean
    idxCache = dict()
    return result
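
# Illustrative shape of the dict returned by readAndParse() (the element name
# below is hypothetical); latency values are in milliseconds, ts in seconds:
#   {"x264enc0": {"latency": [12.3, 14.1, ...], "ts": [0.977, 1.010, ...]}}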


def getLatencyTable(filename):
    """Summarise per-element latency: dTmax, max, avg, median and std."""
    parsed = readAndParse(filename)
    df = pd.DataFrame(parsed)
    print(df)
    latency_row = df.loc['latency']
    ts_list = df.loc['ts']
    avg_latency = latency_row.apply(np.mean)
    median_latency = latency_row.apply(np.median)
    max_latency = latency_row.apply(np.max)
    std_latency = latency_row.apply(np.std)
    dt_max_latency = dict()
    min_timestamp = ts_list.apply(np.min)
    for column in df.columns:
        # dTmax: offset (in seconds) of the worst-latency sample from the
        # earliest timestamp seen in the whole trace.
        max_index = np.argmax(latency_row[column])
        dt = ts_list[column][max_index] - min_timestamp.min()
        dt_max_latency[column] = dt
    df_dt_max = pd.Series(dt_max_latency)
    resultDf = pd.concat(
        [df_dt_max, max_latency, avg_latency, median_latency, std_latency],
        axis=1)
    resultDf.columns = ['dTmax', 'max', 'avg', 'median', 'std']
    print(resultDf)
    return resultDf
# getLatencyTable("latency_traces-x264enc-kpop-test-10.log")
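
# Minimal CLI entry point, a sketch assuming the trace log path is passed as
# the first command-line argument (not part of the original workflow):
if __name__ == "__main__":
    import sys
    if len(sys.argv) > 1:
        getLatencyTable(sys.argv[1])
    else:
        print(f"usage: {sys.argv[0]} <latency_trace_log>")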