From 31671d7b30a9452e1374b614c5a8f2cec1bee30e Mon Sep 17 00:00:00 2001 From: Ian-Goodall-Halliwell <90027009+Ian-Goodall-Halliwell@users.noreply.github.com> Date: Mon, 13 Jan 2025 20:05:15 -0500 Subject: [PATCH] Update utils_analysis.py --- src/functions/utils_analysis.py | 129 +++++++++++++++++++++++++------- 1 file changed, 102 insertions(+), 27 deletions(-) diff --git a/src/functions/utils_analysis.py b/src/functions/utils_analysis.py index 60dc1c6..7db1e5c 100644 --- a/src/functions/utils_analysis.py +++ b/src/functions/utils_analysis.py @@ -885,6 +885,84 @@ def process_feature( return data_mahalanobis, log +def w_score(data_mahalanobis, cov_normative, px_demo): + # Initialize normative_data + normative_data = { + k: np.zeros([data_mahalanobis["data_px"][0].shape[0], 4]) + for k in data_mahalanobis["feat"] + } + + for e, feat in enumerate(data_mahalanobis["feat"]): + alldata = data_mahalanobis["data_cn"][e] + numvertex = alldata.shape[1] + + for i in range(numvertex): + data_vertex = alldata[:, i] + data_norms = data_mahalanobis["df_cn"][e][cov_normative].to_numpy() + for t in range(data_norms.shape[0]): + data_norms[t, :][data_norms[t, :] == "M"] = 1 + data_norms[t, :][data_norms[t, :] == "F"] = 0 + data_norms = np.asarray(data_norms, dtype=float) + # Create a boolean mask where True indicates rows without NaNs + mask = ~np.isnan(data_norms).any(axis=1) + + # Use the mask to filter out rows with NaNs + data_norms = data_norms[mask] + data_vertex = np.asarray(data_vertex[mask]) + + # Add a column of ones to data_norms for the intercept term + X = np.hstack([np.ones((data_norms.shape[0], 1)), data_norms]) + + # Compute the coefficients using the normal equation + coefficients = np.linalg.inv(X.T @ X) @ X.T @ data_vertex + + # Extract the intercept and coefficients + intercept = coefficients[0][0] + age_coefficient = coefficients[1][0] + sex_coefficient = coefficients[2][0] + + # Calculate the residuals and their standard deviation + residuals = data_vertex - X @ coefficients + std_residuals = np.std(residuals) + + # Append the results to normative_data + normative_data[feat][i] = np.array( + [intercept, age_coefficient, sex_coefficient, std_residuals] + ) + + normative_data[feat] = np.array(normative_data[feat]) + print("here") + + # Initialize w_scored_data + w_scored_data = { + k: np.zeros([data_mahalanobis["data_px"][0].shape[0]]) + for k in data_mahalanobis["feat"] + } + + for e, feat in enumerate(data_mahalanobis["feat"]): + data_ = data_mahalanobis["data_px"][e] + betas = normative_data[feat][:, :3] + std_residuals = normative_data[feat][:, 3] + px_demo_data = px_demo[cov_normative].to_numpy() + for t in range(px_demo_data.shape[0]): + px_demo_data[t, :][px_demo_data[t, :] == "M"] = 1 + px_demo_data[t, :][px_demo_data[t, :] == "F"] = 0 + px_demo_data = px_demo_data.squeeze() + # print("here") + dataoutput = np.zeros_like(data_) + for i in range(data_.shape[0]): + ns = data_[i] + ns_pred = ( + betas[i, 0] + + betas[i, 1] * px_demo_data[0] + + betas[i, 2] * px_demo_data[1] + ) + dataoutput[i] = (ns - ns_pred) / std_residuals[i] + w_scored_data[feat] = dataoutput + + return normative_data, w_scored_data + + def run_analysis( *, px_sid: str, @@ -1049,34 +1127,31 @@ def run_analysis( if len(data_mahalanobis["feat"]) < 2: continue if cov_normative: - for e, feat in enumerate(data_mahalanobis["feat"]): - alldata = data_mahalanobis["data_cn"][e] - numvertex = alldata.shape[1] - for i in range(numvertex): - data_vertex = alldata[:, i] - data_norms = data_mahalanobis["df_cn"][e][cov_normative].to_numpy() - for t in range(data_norms.shape[0]): - data_norms[t, :][data_norms[t, :] == "M"] = 1 - data_norms[t, :][data_norms[t, :] == "F"] = 0 - data_norms = np.asarray(data_norms, dtype=float) - # Create a boolean mask where True indicates rows without NaNs - mask = ~np.isnan(data_norms).any(axis=1) - - # Use the mask to filter out rows with NaNs - data_norms = data_norms[mask] - data_vertex = np.asarray(data_vertex[mask]) - - # Fit a linear regression model - model = LinearRegression() - model.fit(data_norms, data_vertex) - print("Intercept (β0):", model.intercept_) - print("Age coefficient (β1):", model.coef_[0][0]) - print("Sex coefficient (β2):", model.coef_[0][1]) - print( - "STD of residuals:", - np.std(model.predict(data_norms) - data_vertex), + data_mahalanobis_asymmety = copy.deepcopy(data_mahalanobis) + normative_data, w_scored_data = w_score( + data_mahalanobis_asymmety, cov_normative, px_demo + ) + + for en, x in enumerate(data_mahalanobis_asymmety["data_cn"]): + if x.shape[2] > 1: + previousshape = x.shape[1] + data_mahalanobis_asymmety["data_cn"][en] = ( + data_mahalanobis_asymmety["data_cn"][en].reshape(x.shape[0], -1) + ) + for en, x in enumerate(data_mahalanobis_asymmety["data_px"]): + if x.shape[1] > 1: + previousshape = x.shape[0] + + data_mahalanobis_asymmety["data_px"][en] = ( + data_mahalanobis_asymmety["data_px"][en].reshape(-1) ) - print("here") + + data_mahalanobis_asymmety["data_px"] = [ + compute_asymmetry(x.reshape(2, -1)[0, :], x.reshape(2, -1)[1, :]) + for x in data_mahalanobis_asymmety["data_px"] + ] + + print("here") # Analysis: mahalanobis distance res = _subject_mahalanobis(data=data_mahalanobis, analyses=analyses)