# Python source, 93 lines, 3.0 KiB (file-viewer header captured during extraction).
import pandas as pd
|
|
import numpy as np
|
|
from interpolationConfiguration import InterpolConfig
|
|
from scipy.stats import norm
|
|
from scipy.optimize import fsolve
|
|
import numpy as np
|
|
import logging
|
|
import matplotlib.pyplot as plt
|
|
|
|
# Configure the root logger once for the whole script: INFO and above,
# prefixed with timestamp and level name.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
)
|
|
|
|
def mix_csv_column_pandas(filename, index_col=0, value_col=1, random_state=None):
    """Shuffle the values of one column of a header-less CSV file in place.

    The file is read with ``header=None``, the values of column ``value_col``
    are randomly permuted across the rows, and the result is written back to
    the same path without header or index. The other columns keep their
    original row order.

    Args:
        filename (str): Path to the CSV file. The file is overwritten.
        index_col (int): Column index for identifiers (default: 0). The
            implementation never reads this argument; it is kept so existing
            callers keep working.
        value_col (int): Column index for values to mix (default: 1).
        random_state: Optional seed (or NumPy random state) forwarded to
            ``Series.sample`` so a shuffle can be reproduced. ``None``
            (default) keeps the original non-deterministic behaviour.
    """
    df = pd.read_csv(filename, header=None)

    # sample(frac=1) returns every value once, in random order, but still
    # labelled with the old row index. Dropping those labels makes the
    # assignment below positional; otherwise index alignment would put every
    # value back where it came from.
    shuffled = df[value_col].sample(frac=1, random_state=random_state)
    df[value_col] = shuffled.reset_index(drop=True)

    # Save back to the same file, again without header or index column.
    df.to_csv(filename, index=False, header=False)
|
|
|
|
class Interpol:
    """Complete a partially observed distribution with synthetic samples.

    The observed samples come from the CSV file named by the configuration.
    The missing part is drawn from a normal distribution whose mean is
    estimated by :meth:`maximum_likelihood`; the combined data set is then
    plotted and written to the configured output file.
    """

    def __init__(self, config='shit.yml'):
        """Load the configuration and the observed samples.

        Args:
            config: Passed unchanged to ``InterpolConfig``.
        """
        self.config = InterpolConfig(config)
        # Only the second column of the CSV is used as data.
        self.available_data = pd.read_csv(self.config.available_data).iloc[:, 1]

    def save(self, res: np.ndarray):
        """Write ``res`` to ``config.out_file`` as a two-column CSV.

        The first column is the constant 1 for every row, the second column
        holds the values of ``res``. No header and no index are written.

        Args:
            res: One-dimensional sequence of values to store.
        """
        rows = [[1] * len(res), res]
        # Two rows (constants, values) transposed into two columns.
        df = pd.DataFrame(rows).transpose()
        df.to_csv(self.config.out_file, header=False, index=False)

    # To interpolate data we first must find the "true mean".
    # For this we would take the min value from the csv, the max value from
    # the image, and take as granted that we have len(data)/total_points
    # already there.
    @staticmethod
    def interpolate_part_of_data():
        """Placeholder for partial interpolation; does nothing yet.

        Declared as a static method because it takes no arguments: the
        original plain ``def`` without ``self`` raised ``TypeError`` when
        called on an instance.
        """
        return None

    @staticmethod
    def _show_hist(values, title):
        """Plot ``values`` as a histogram on its own figure and show it."""
        plt.figure(figsize=(10, 6), dpi=300)
        plt.hist(values)
        plt.title(title)
        plt.show()

    def normal_dist(self, scale=8, size=150):
        """Sample the missing part of the distribution, plot it and save it.

        Steps: estimate the mean with :meth:`maximum_likelihood`, draw
        ``size`` normal samples around it, keep only the samples at or above
        ``config.available_limit``, append the observed data, and pass the
        combined array to :meth:`save`. A histogram is shown before and after
        the combination.

        Args:
            scale: Standard deviation of the sampled normal distribution
                (default 8, the previously hard-coded value).
            size: Number of samples to draw (default 150, the previously
                hard-coded value).
        """
        mu = self.maximum_likelihood()
        res = np.random.normal(loc=mu, scale=scale, size=size)
        self._show_hist(res, 'Interpolated results')

        # maximum_likelihood() converts the limit with float(); do the same
        # here so both methods agree if the config stores it as text.
        limit = float(self.config.available_limit)
        res = res[res >= limit]
        res = np.append(res, self.available_data, axis=0)
        # The combined histogram gets its own figure instead of reusing
        # whatever figure happens to be current after the first show().
        self._show_hist(res, 'Combined results')
        self.save(res)

    def maximum_likelihood(self):
        """Estimate the mean of the full normal distribution.

        Numerically solves for ``mu`` such that the probability mass of a
        normal distribution with mean ``mu`` and standard deviation ``dAC``
        on the interval ``[A, B]`` equals ``n_known / total``.

        Returns:
            The first component of the ``fsolve`` solution. A warning is
            logged if the solver reports that it did not converge; the value
            is returned regardless.
        """
        def equation(mu, A, B, dAC, n_known, total):
            # Mass of N(mu, dAC) inside [A, B] minus the observed fraction.
            prob_AB = norm.cdf((B - mu) / dAC) - norm.cdf((A - mu) / dAC)
            return prob_AB - n_known / total

        A = self.available_data.min()  # left boundary
        B = float(self.config.available_limit)  # right boundary of known data
        # Right boundary of distribution; only logged, not used in the solve.
        C = np.max(self.config.image_data)
        dAC = self.config.std_source.iloc[:, 1].std()  # standard deviation
        logging.info(f"dAC={dAC}")
        n_known = len(self.available_data)
        # Hard-coded assumption: the observed samples are one third of the
        # full population.
        total = n_known * 3
        logging.info(f"A={A}; B={B}; C={C}")

        # Solve for mu, starting the search at the right boundary B.
        solution, _info, ier, mesg = fsolve(
            equation,
            x0=B,
            args=(A, B, dAC, n_known, total),
            full_output=True,
        )
        if ier != 1:
            # fsolve signals success with ier == 1; anything else means the
            # returned value is only the last iterate.
            logging.warning("fsolve did not converge: %s", mesg)
        mu_estimate = solution[0]
        logging.info(f"Estimated mean: {mu_estimate:.3f}")
        return mu_estimate
|
|
|
|
# Script entry: build the interpolator with its default configuration file
# and run the whole pipeline (estimate mean, sample, plot, save).
# NOTE(review): these statements also run when the module is imported;
# consider an ``if __name__ == "__main__":`` guard if that is not intended.
a = Interpol()
a.normal_dist()