from typing import Optional

import numpy as np
import torch
import torch.nn as nn
from DashAI.back.core.enums.metrics import LevelEnum, SplitEnum
from DashAI.back.core.schema_fields import (
BaseSchema,
enum_field,
int_field,
none_type,
optimizer_float_field,
optimizer_int_field,
schema_field,
)
from DashAI.back.core.utils import MultilingualString
from DashAI.back.dataloaders.classes.dashai_dataset import DashAIDataset
from DashAI.back.models.regression_model import RegressionModel
from DashAI.back.models.utils import DEVICE_ENUM, DEVICE_PLACEHOLDER, DEVICE_TO_IDX


class MLPRegressorSchema(BaseSchema):
"""PyTorch MLP Regressor Schema."""
hidden_size: schema_field(
optimizer_int_field(ge=1),
placeholder={
"optimize": False,
"fixed_value": 5,
"lower_bound": 1,
"upper_bound": 15,
},
description=MultilingualString(
en="Number of neurons in the hidden layer.",
es="Número de neuronas en la capa oculta.",
),
alias=MultilingualString(en="Hidden size", es="Tamaño oculto"),
) # type: ignore
activation: schema_field(
enum_field(enum=["relu", "tanh", "sigmoid", "identity"]),
placeholder="relu",
description=MultilingualString(
en="Activation function.",
es="Función de activación.",
),
alias=MultilingualString(en="Activation", es="Activación"),
) # type: ignore
learning_rate: schema_field(
optimizer_float_field(ge=1e-6, le=1.0),
placeholder={
"optimize": False,
"fixed_value": 0.001,
"lower_bound": 1e-6,
"upper_bound": 1.0,
},
description=MultilingualString(
en="Initial learning rate for the optimizer.",
es="Tasa de aprendizaje inicial para el optimizador.",
),
alias=MultilingualString(en="Learning rate", es="Tasa de aprendizaje"),
) # type: ignore
epochs: schema_field(
optimizer_int_field(ge=1),
placeholder={
"optimize": False,
"fixed_value": 5,
"lower_bound": 1,
"upper_bound": 15,
},
description=MultilingualString(
en="Total number of training passes over the dataset.",
es="Número total de pasadas de entrenamiento sobre el conjunto de datos.",
),
alias=MultilingualString(en="Epochs", es="Épocas"),
) # type: ignore
batch_size: schema_field(
none_type(int_field(ge=1)),
placeholder=32,
description=MultilingualString(
            en=(
                "Number of samples per gradient update during training. "
                "If None or greater than the dataset size, the full dataset "
                "is used."
            ),
            es=(
                "Número de muestras por actualización de gradiente durante el "
                "entrenamiento. Si es None o mayor que el tamaño del dataset, "
                "se usa el dataset completo."
            ),
),
alias=MultilingualString(en="Batch size", es="Tamaño de lote"),
) # type: ignore
device: schema_field(
enum_field(enum=DEVICE_ENUM),
placeholder=DEVICE_PLACEHOLDER,
description=MultilingualString(
en="Hardware device (CPU/GPU).",
es="Dispositivo de hardware (CPU/GPU).",
),
alias=MultilingualString(en="Device", es="Dispositivo"),
) # type: ignore
log_train_every_n_epochs: schema_field(
none_type(int_field(ge=1)),
placeholder=1,
description=MultilingualString(
en=(
"Log metrics for train split every n epochs during training. "
"If None, it won't log per epoch."
),
es=(
"Registrar métricas del split de entrenamiento cada n épocas. "
"Si es None, no registrará por época."
),
),
alias=MultilingualString(
en="Log train every N epochs", es="Registrar entrenamiento cada N épocas"
),
) # type: ignore
log_train_every_n_steps: schema_field(
none_type(int_field(ge=1)),
placeholder=None,
description=MultilingualString(
en=(
"Log metrics for train split every n steps during training. "
"If None, it won't log per step."
),
es=(
"Registrar métricas del split de entrenamiento cada n pasos. "
"Si es None, no registrará por paso."
),
),
alias=MultilingualString(
en="Log train every N steps", es="Registrar entrenamiento cada N pasos"
),
) # type: ignore
log_validation_every_n_epochs: schema_field(
none_type(int_field(ge=1)),
placeholder=1,
description=MultilingualString(
en=(
"Log metrics for validation split every n epochs during training. "
"If None, it won't log per epoch."
),
es=(
"Registrar métricas del split de validación cada n épocas. "
"Si es None, no registrará por época."
),
),
alias=MultilingualString(
en="Log validation every N epochs", es="Registrar validación cada N épocas"
),
) # type: ignore
log_validation_every_n_steps: schema_field(
none_type(int_field(ge=1)),
placeholder=None,
description=MultilingualString(
en=(
"Log metrics for validation split every n steps during training. "
"If None, it won't log per step."
),
es=(
"Registrar métricas del split de validación cada n pasos. "
"Si es None, no registrará por paso."
),
),
alias=MultilingualString(
en="Log validation every N steps", es="Registrar validación cada N pasos"
),
) # type: ignore


class MLP(nn.Module):
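    """Feed-forward network with one hidden layer and a scalar output head."""
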
def __init__(self, input_dim, hidden_size, activation_name):
super().__init__()
activations = {
"relu": nn.ReLU(),
"tanh": nn.Tanh(),
"sigmoid": nn.Sigmoid(),
"identity": nn.Identity(),
}
self.model = nn.Sequential(
nn.Linear(input_dim, hidden_size),
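            # Unknown activation names fall back to ReLU.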
activations.get(activation_name, nn.ReLU()),
nn.Linear(hidden_size, 1),
)

    def forward(self, x):
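        """Run the forward pass and return predictions of shape (N, 1)."""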
return self.model(x)


class MLPRegression(RegressionModel):
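    """DashAI regression model backed by a PyTorch multilayer perceptron."""
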
SCHEMA = MLPRegressorSchema
DISPLAY_NAME: str = MultilingualString(
en="Multi-layer Perceptron (MLP) Regression",
es="Perceptrón Multicapa (MLP) Regresión",
)
DESCRIPTION: str = MultilingualString(
en="Neural network with multiple hidden layers for regression.",
es="Red neuronal con múltiples capas ocultas para regresión.",
)
COLOR: str = "#FF7043"
ICON: str = "Psychology"

    def __init__(self, **kwargs) -> None:
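        """Store schema parameters and resolve the training device."""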
        self.params = kwargs
        # Resolve the device name to a CUDA index; fall back to CPU when no
        # GPU index is associated with it.
        device_idx = DEVICE_TO_IDX.get(kwargs.get("device"), -1)
        self.device = f"cuda:{device_idx}" if device_idx >= 0 else "cpu"
        # Logging cadences read in train(); defaults are assumed to mirror
        # the schema placeholders.
        self.log_train_every_n_epochs = kwargs.get("log_train_every_n_epochs", 1)
        self.log_train_every_n_steps = kwargs.get("log_train_every_n_steps")
        self.log_validation_every_n_epochs = kwargs.get("log_validation_every_n_epochs", 1)
        self.log_validation_every_n_steps = kwargs.get("log_validation_every_n_steps")
        self.model = None

    def train(
        self,
        x_train: DashAIDataset,
        y_train: DashAIDataset,
        x_validation: Optional[DashAIDataset] = None,
        y_validation: Optional[DashAIDataset] = None,
    ) -> "MLPRegression":
# 1. Prepare Data
x_values = self.prepare_dataset(x_train, is_fit=True).to_pandas().values
y_values = self.prepare_output(y_train, is_fit=True).to_pandas().values
X_tensor = torch.tensor(x_values, dtype=torch.float32).to(self.device)
y_tensor = (
torch.tensor(y_values, dtype=torch.float32).view(-1, 1).to(self.device)
)
# 2. Init Model & Optimizer
        self.model = MLP(
            input_dim=X_tensor.shape[1],
            # Default must match the fallback used in load() to rebuild the
            # same architecture.
            hidden_size=self.params.get("hidden_size", 5),
            activation_name=self.params.get("activation", "relu"),
        ).to(self.device)
optimizer = torch.optim.Adam(
self.model.parameters(), lr=self.params.get("learning_rate", 0.001)
)
criterion = nn.MSELoss()
        # 3. Training Loop
        total_epochs = self.params.get("epochs", 5)  # default mirrors schema
batch_size = self.params.get("batch_size")
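        # Fall back to full-batch updates when batch_size is unset or larger
        # than the dataset.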
if batch_size is None or batch_size > X_tensor.size(0):
batch_size = X_tensor.size(0)
global_step = 0
        for epoch in range(total_epochs):
            indices = torch.randperm(X_tensor.size(0))
            for i in range(0, X_tensor.size(0), batch_size):
                # Re-enter train mode; step-level logging below switches to eval
                self.model.train()
batch_idx = indices[i : i + batch_size]
train_loss = criterion(
self.model(X_tensor[batch_idx]), y_tensor[batch_idx]
)
optimizer.zero_grad()
train_loss.backward()
optimizer.step()
                # Increment the global step counter
                global_step += 1
                # Switch to eval mode for step-level metric calculation
                self.model.eval()
# Train metrics per step
if self.log_train_every_n_steps and (
global_step % self.log_train_every_n_steps == 0
):
self.calculate_metrics(
split=SplitEnum.TRAIN,
level=LevelEnum.STEP,
x_data=x_train,
y_data=y_train,
log_index=global_step,
)
# Validation metrics per step
                if (
                    self.log_validation_every_n_steps
                    and x_validation is not None
                    and global_step % self.log_validation_every_n_steps == 0
                ):
self.calculate_metrics(
split=SplitEnum.VALIDATION,
level=LevelEnum.STEP,
x_data=x_validation,
y_data=y_validation,
log_index=global_step,
)
            # Switch to eval mode for epoch-level metric calculation
            self.model.eval()
# Train metrics per epoch
if (
self.log_train_every_n_epochs
and (epoch + 1) % self.log_train_every_n_epochs == 0
):
self.calculate_metrics(
split=SplitEnum.TRAIN,
level=LevelEnum.EPOCH,
x_data=x_train,
y_data=y_train,
log_index=epoch + 1,
)
# Validation metrics per epoch
            if (
                self.log_validation_every_n_epochs
                and x_validation is not None
                and (epoch + 1) % self.log_validation_every_n_epochs == 0
            ):
self.calculate_metrics(
split=SplitEnum.VALIDATION,
level=LevelEnum.EPOCH,
x_data=x_validation,
y_data=y_validation,
log_index=epoch + 1,
)
return self

    def predict(self, x: DashAIDataset) -> np.ndarray:
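        """Return predictions for the given dataset as a flat NumPy array."""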
self.model.eval()
x_proc = self.prepare_dataset(x, is_fit=False).to_pandas().values
x_tensor = torch.tensor(x_proc, dtype=torch.float32).to(self.device)
with torch.no_grad():
return self.model(x_tensor).cpu().numpy().flatten()

    def save(self, filename: str) -> None:
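        """Serialize the model weights, schema params, and input dimension."""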
torch.save(
{
"state": self.model.state_dict(),
"params": self.params,
"input_dim": self.model.model[0].in_features,
},
filename,
)

    @staticmethod
def load(filename: str) -> "MLPRegression":
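        """Rebuild the saved architecture and restore the trained weights."""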
        # map_location lets checkpoints trained on GPU load on CPU-only hosts.
        data = torch.load(filename, map_location="cpu")
instance = MLPRegression(**data["params"])
# Rebuild the model architecture using saved input_dim
instance.model = MLP(
input_dim=data["input_dim"],
hidden_size=instance.params.get("hidden_size", 5),
activation_name=instance.params.get("activation", "relu"),
).to(instance.device)
# Load the trained weights
instance.model.load_state_dict(data["state"])
return instance