"I am working on developing a Generative Adversarial Network (GAN) with the aim of generating new microstructures based on their characteristics. The objective is to create a microstructure using a given characteristic, provided to the GAN in vector form. This process is implemented using a database containing 40,000 pairs of microstructures and their corresponding characteristics. I have already coded a Wasserstein GAN (WGAN) that successfully generates coherent microstructures from the database, although it currently lacks a connection to the specified characteristics. Additionally, I have coded a conditional GAN (cGAN) that operates on the MNIST dataset. However, I require your assistance in merging these two code structures. Thank you very much for any help you can provide!
##cGAN
import numpy as np
import matplotlib.pyplot as plt
import torch
from torch import nn
from torch.nn import functional as F
from torch.utils.data import DataLoader
from torchvision import datasets, transforms
from torchvision.utils import make_grid
from tqdm import tqdm

# Common config
batch_size = 64

# Generator config
sample_size = 100  # Dimension of the random (latent) input vector
g_alpha = 0.01     # LeakyReLU negative slope
g_lr = 1.0e-4      # Learning rate

# Discriminator config
d_alpha = 0.01     # LeakyReLU negative slope
d_lr = 1.0e-4      # Learning rate

# Data loader for MNIST
transform = transforms.ToTensor()
dataset = datasets.MNIST(root='./data', train=True, download=True, transform=transform)
dataloader = DataLoader(dataset, batch_size=batch_size, drop_last=True)


class Condition(nn.Module):
    """Convert integer class labels (0-9) into 784-dim feature vectors."""

    def __init__(self, alpha: float):
        super().__init__()
        # From one-hot encoding to features: 10 => 784
        self.fc = nn.Sequential(
            nn.Linear(10, 784),
            nn.BatchNorm1d(784),
            nn.LeakyReLU(alpha))

    def forward(self, labels: torch.Tensor) -> torch.Tensor:
        # One-hot encode the labels, cast Long -> Float, project to features.
        x = F.one_hot(labels, num_classes=10).float()
        return self.fc(x)


class Reshape(nn.Module):
    """Reshape helper usable inside nn.Sequential (keeps the batch dim)."""

    def __init__(self, *shape):
        super().__init__()
        self.shape = shape

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        return x.reshape(-1, *self.shape)


class Generator(nn.Module):
    """Conditional generator: (latent noise + label features) -> 1x28x28 image."""

    def __init__(self, sample_size: int, alpha: float):
        super().__init__()
        # sample_size => 784
        self.fc = nn.Sequential(
            nn.Linear(sample_size, 784),
            nn.BatchNorm1d(784),
            nn.LeakyReLU(alpha))
        # 784 => 16 x 7 x 7
        self.reshape = Reshape(16, 7, 7)
        # 16 x 7 x 7 => 32 x 14 x 14
        self.conv1 = nn.Sequential(
            nn.ConvTranspose2d(16, 32, kernel_size=5, stride=2,
                               padding=2, output_padding=1, bias=False),
            nn.BatchNorm2d(32),
            nn.LeakyReLU(alpha))
        # 32 x 14 x 14 => 1 x 28 x 28
        self.conv2 = nn.Sequential(
            nn.ConvTranspose2d(32, 1, kernel_size=5, stride=2,
                               padding=2, output_padding=1, bias=False),
            nn.Sigmoid())
        # Random value sample size
        self.sample_size = sample_size
        # Converts labels into feature vectors
        self.cond = Condition(alpha)

    def forward(self, labels: torch.Tensor) -> torch.Tensor:
        # Labels as feature vectors
        c = self.cond(labels)
        # Batch size is the number of labels
        batch_size = len(labels)
        # Generate random inputs on the same device as the labels
        # (the original always allocated on CPU, which breaks on GPU runs)
        z = torch.randn(batch_size, self.sample_size, device=labels.device)
        # Conditioning: label features are ADDED to the projected noise
        x = self.fc(z)            # => 784
        x = self.reshape(x + c)   # => 16 x 7 x 7
        x = self.conv1(x)         # => 32 x 14 x 14
        x = self.conv2(x)         # => 1 x 28 x 28
        return x


class Discriminator(nn.Module):
    """Conditional discriminator: returns the BCE loss of (image, label) vs targets."""

    def __init__(self, alpha: float):
        super().__init__()
        # 1 x 28 x 28 => 32 x 14 x 14
        self.conv1 = nn.Sequential(
            nn.Conv2d(1, 32, kernel_size=5, stride=2, padding=2, bias=False),
            nn.LeakyReLU(alpha))
        # 32 x 14 x 14 => 16 x 7 x 7
        self.conv2 = nn.Sequential(
            nn.Conv2d(32, 16, kernel_size=5, stride=2, padding=2, bias=False),
            nn.BatchNorm2d(16),
            nn.LeakyReLU(alpha))
        # 16 x 7 x 7 => 784 => 1 (real/fake logit)
        self.fc = nn.Sequential(
            nn.Flatten(),
            nn.Linear(784, 784),
            nn.BatchNorm1d(784),
            nn.LeakyReLU(alpha),
            nn.Linear(784, 1))
        # Reshape label features to match conv2's output: 784 => 16 x 7 x 7
        self.cond = nn.Sequential(
            Condition(alpha),
            Reshape(16, 7, 7))

    def forward(self, images: torch.Tensor, labels: torch.Tensor,
                targets: torch.Tensor) -> torch.Tensor:
        # Label features
        c = self.cond(labels)
        # Image features + label features => real-or-fake logit
        x = self.conv1(images)          # => 32 x 14 x 14
        x = self.conv2(x)               # => 16 x 7 x 7
        prediction = self.fc(x + c)     # => 1
        return F.binary_cross_entropy_with_logits(prediction, targets)


def save_image_grid(epoch: int, images: torch.Tensor, ncol: int):
    """Save a grid of generated images as generated_<epoch>.jpg."""
    # NOTE: make_grid's second positional arg is nrow = images per row,
    # so `ncol` here is really "images per row" — kept for compatibility.
    image_grid = make_grid(images, ncol)
    image_grid = image_grid.permute(1, 2, 0)  # Channel last for imshow
    image_grid = image_grid.cpu().numpy()
    plt.imshow(image_grid)
    plt.xticks([])
    plt.yticks([])
    plt.savefig(f'generated_{epoch:03d}.jpg')
    plt.close()


# Real / fake targets (BCE labels)
real_targets = torch.ones(batch_size, 1)
fake_targets = torch.zeros(batch_size, 1)

# Generator and discriminator
generator = Generator(sample_size, g_alpha)
discriminator = Discriminator(d_alpha)

# Optimizers
d_optimizer = torch.optim.Adam(discriminator.parameters(), lr=d_lr)
g_optimizer = torch.optim.Adam(generator.parameters(), lr=g_lr)

# Train loop
for epoch in range(100):
    d_losses = []
    g_losses = []

    for images, labels in tqdm(dataloader):
        # ===============================
        # Discriminator Network Training
        # ===============================
        # Images from MNIST are considered real
        d_loss = discriminator(images, labels, real_targets)
        # Images from the generator are considered fake.
        # detach() stops gradients from flowing into the generator here:
        # the discriminator step never uses them, so computing them is waste.
        d_loss += discriminator(generator(labels).detach(), labels, fake_targets)
        # Discriminator parameter update
        d_optimizer.zero_grad()
        d_loss.backward()
        d_optimizer.step()

        # ===============================
        # Generator Network Training
        # ===============================
        # Generated images should look as real as the MNIST ones
        g_loss = discriminator(generator(labels), labels, real_targets)
        # Generator parameter update
        g_optimizer.zero_grad()
        g_loss.backward()
        g_optimizer.step()

        # Keep losses for logging
        d_losses.append(d_loss.item())
        g_losses.append(g_loss.item())

    # Print epoch mean losses
    print(epoch, np.mean(d_losses), np.mean(g_losses))

    # Save a sample grid: each digit 0-9 repeated on 8 rows.
    # No gradients are needed for visualisation.
    with torch.no_grad():
        labels = torch.arange(10).repeat(8)
        save_image_grid(epoch, generator(labels), ncol=10)
##WGAN
import os

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import tensorflow as tf
from tensorflow.keras import layers, models
from tensorflow.keras.optimizers import RMSprop
from tensorflow.keras.layers import Dropout
import vtk
from vtk import vtkStructuredPoints, vtkXMLImageDataWriter
from vtkmodules.util import numpy_support

# NOTE(review): the `utils` module imported here is the same code reproduced
# in the ##utils section at the bottom of this post.
from utils import (load_parquet_files, wasserstein_loss, generator_loss,
                   ClipConstraint, generate_batches, Conv2DCircularPadding)


class Generator(models.Model):
    """WGAN generator: latent vector -> 256x256x1 microstructure image."""

    def __init__(self, latent_dim):
        super(Generator, self).__init__()
        self.latent_dim = latent_dim
        self.model = self.build_model()

    def build_model(self):
        # 8x8x512 seed, upsampled five times: 8 -> 16 -> 32 -> 64 -> 128 -> 256
        model = models.Sequential()
        model.add(layers.Dense(8 * 8 * 512, input_dim=self.latent_dim, activation='relu'))
        model.add(layers.Reshape((8, 8, 512)))
        model.add(layers.Conv2DTranspose(256, kernel_size=4, strides=2, padding='same', activation='relu'))
        model.add(layers.BatchNormalization())
        model.add(Dropout(0.25))
        model.add(layers.Conv2DTranspose(128, kernel_size=4, strides=2, padding='same', activation='relu'))
        model.add(layers.BatchNormalization())
        model.add(layers.Conv2DTranspose(64, kernel_size=4, strides=2, padding='same', activation='relu'))
        model.add(layers.BatchNormalization())
        model.add(Dropout(0.25))
        model.add(layers.UpSampling2D())
        model.add(layers.Conv2D(32, kernel_size=3, padding='same', activation='relu'))
        model.add(layers.BatchNormalization())
        model.add(layers.UpSampling2D())
        # Sigmoid output: pixel values in [0, 1]
        model.add(layers.Conv2D(1, kernel_size=3, padding='same', activation='sigmoid'))
        return model

    def call(self, inputs):
        return self.model(inputs)


class Discriminator(models.Model):
    """WGAN critic. `circ_pad` selects circular padding (for periodic
    microstructures) instead of ordinary zero-padded convolutions."""

    def __init__(self, circ_pad):
        super(Discriminator, self).__init__()
        self.circ_pad = circ_pad
        self.model = self.build_model()

    def build_model(self):
        if not self.circ_pad:
            # Standard padding; WGAN weight clipping enforced via ClipConstraint.
            model = models.Sequential()
            model.add(layers.Conv2D(32, kernel_size=3, strides=1, padding='same',
                                    kernel_constraint=ClipConstraint(0.2),
                                    input_shape=(256, 256, 1)))
            model.add(layers.LeakyReLU(alpha=0.2))
            model.add(layers.Conv2D(64, kernel_size=5, strides=2, padding='same',
                                    kernel_constraint=ClipConstraint(0.2)))
            model.add(layers.LeakyReLU(alpha=0.2))
            model.add(layers.Conv2D(128, kernel_size=7, strides=2, padding='same',
                                    kernel_constraint=ClipConstraint(0.2)))
            model.add(layers.LeakyReLU(alpha=0.2))
            model.add(layers.Conv2D(256, kernel_size=7, strides=2, padding='same',
                                    kernel_constraint=ClipConstraint(0.2)))
            model.add(layers.LeakyReLU(alpha=0.2))
            model.add(layers.Conv2D(512, kernel_size=5, strides=2, padding='same',
                                    kernel_constraint=ClipConstraint(0.2)))
            model.add(layers.LeakyReLU(alpha=0.2))
            model.add(layers.Flatten())
            # Linear output: the critic score is unbounded in a WGAN.
            model.add(layers.Dense(1, activation='linear'))
            return model
        else:
            # Circular padding; these layers have no kernel_constraint, so the
            # training loop clips their weights manually (see train_wgan).
            model = models.Sequential()
            model.add(Conv2DCircularPadding(32, kernel_size=3, strides=1,
                                            input_shape=(256, 256, 1)))
            model.add(layers.LeakyReLU(alpha=0.2))
            model.add(Conv2DCircularPadding(64, kernel_size=5, strides=2))
            model.add(layers.LeakyReLU(alpha=0.2))
            model.add(Conv2DCircularPadding(128, kernel_size=7, strides=2))
            model.add(layers.LeakyReLU(alpha=0.2))
            model.add(Conv2DCircularPadding(256, kernel_size=9, strides=2))
            model.add(layers.LeakyReLU(alpha=0.2))
            model.add(Conv2DCircularPadding(512, kernel_size=5, strides=2))
            model.add(layers.LeakyReLU(alpha=0.2))
            model.add(layers.Flatten())
            model.add(layers.Dense(1, activation='linear'))
            return model

    def call(self, inputs):
        return self.model(inputs)


class GAN(models.Model):
    """Wires generator + critic into the combined model and owns the dataset."""

    def __init__(self, generator, discriminator, data_path):
        super(GAN, self).__init__()
        self.generator = generator
        self.discriminator = discriminator
        self.compile_discriminator()
        self.compile_gan()
        self.data_path = data_path
        self.data = load_parquet_files(self.data_path, test=False)

    def compile_discriminator(self):
        # The critic is trained standalone; it is then frozen so that the
        # combined model only updates the generator.
        # NOTE: `lr=` was removed from Keras optimizers — use `learning_rate=`.
        self.discriminator.compile(loss=wasserstein_loss,
                                   optimizer=RMSprop(learning_rate=0.000005),
                                   metrics=['accuracy'])
        self.discriminator.trainable = False

    def compile_gan(self):
        z = layers.Input(shape=(self.generator.latent_dim,))
        fake_image = self.generator(z)
        validity = self.discriminator(fake_image)
        self.model = models.Model(z, validity)
        self.model.compile(loss=generator_loss, optimizer=RMSprop(learning_rate=0.0005))

    def generate_latent_points(self, latent_dim, n_samples):
        """Gaussian latent batch of shape (n_samples, latent_dim)."""
        x_input = np.random.randn(latent_dim * n_samples)
        return x_input.reshape(n_samples, latent_dim)

    def generate_latent_points_uniform(self, latent_dim, n_samples):
        """Uniform [-1, 1] latent batch (alternative sampler, unused by default)."""
        return np.random.uniform(-1, 1, size=(n_samples, latent_dim))

    def generate_real_samples(self, n_samples):
        """Sample n_samples real microstructures; real label is -1 (WGAN convention)."""
        dfs_array = [df.to_numpy() for df in self.data]
        sampled_indices = np.random.choice(len(dfs_array), size=n_samples, replace=False)
        real_samples = np.stack([dfs_array[i] for i in sampled_indices], axis=0)
        real_samples = np.expand_dims(real_samples, axis=-1)  # add channel dim
        labels = -(np.ones((n_samples, 1)))
        return real_samples, labels

    def generate_and_save_samples(self, epoch, latent_dim, n_samples, output_dir):
        """Generate a batch and save the first 3 samples as .npy files."""
        z = self.generate_latent_points(latent_dim, n_samples)
        generated_samples = self.generator.predict(z)
        for i in range(3):
            np.save(os.path.join(output_dir, f'generated_example_epoch{epoch}_sample{i}.npy'),
                    generated_samples[i])


def train_wgan(generator, discriminator, gan, latent_dim, n_epochs, n_critic,
               batch_size, output_dir, circ_pad):
    """Standard WGAN loop: n_critic critic updates per generator update,
    weight clipping on the critic, Wasserstein losses logged per batch."""
    d_losses, g_losses = [], []
    current_epoch = 0

    for epoch in range(n_epochs):
        # Shuffle the data and cut it into full batches
        batches = generate_batches(gan.data, batch_size)
        current_epoch += 1

        for batch in batches:
            # Update the critic (discriminator) n_critic times per generator step
            for _ in range(n_critic):
                z = gan.generate_latent_points(latent_dim, batch_size)
                X_fake = generator.predict(z)
                # Add the channel dimension to each real sample
                real_sample_batch = np.array([np.expand_dims(sample, axis=-1)
                                              for sample in batch])
                # WGAN targets: -1 for real, +1 for fake
                X_real, y_real = real_sample_batch, -(np.ones((batch_size, 1)))
                d_loss_real = discriminator.train_on_batch(X_real, y_real)
                d_loss_fake = discriminator.train_on_batch(X_fake, np.ones((batch_size, 1)))
                # Wasserstein critic estimate (fake score minus real score)
                d_loss = np.mean(np.array(d_loss_fake) - np.array(d_loss_real))

                # The circular-padding critic has no kernel_constraint,
                # so clip its weights manually to enforce the Lipschitz bound.
                if circ_pad:
                    for layer in discriminator.model.layers:
                        weights = [np.clip(w, -0.2, 0.2) for w in layer.get_weights()]
                        layer.set_weights(weights)

            # Update the generator through the combined (critic-frozen) model
            z = gan.generate_latent_points(latent_dim, batch_size)
            y_gan = np.ones((batch_size, 1))
            g_loss = gan.model.train_on_batch(z, y_gan)

            # Record losses for visualization (negate the critic loss)
            d_losses.append(-d_loss)
            g_losses.append(g_loss)

        # Display results and save generated samples once per epoch
        print(f"Epoch {current_epoch}, [D loss: {d_losses[-1]}], [G loss: {g_loss}]")
        gan.generate_and_save_samples(current_epoch, latent_dim, batch_size, output_dir)

    # Display training curves
    plot_training_history(d_losses, g_losses)


def plot_training_history(d_losses, g_losses):
    """Plot critic and generator loss curves."""
    plt.figure(figsize=(10, 5))
    plt.plot(d_losses, label='Discriminator Loss', linestyle='--')
    plt.plot(g_losses, label='Generator Loss')
    plt.xlabel('Epoch')
    plt.ylabel('Loss')
    plt.legend()
    plt.title('WGAN Training History')
    plt.show()


# Padding choice
circ_pad = True

# Number of epochs, latent size and batch size
latent_dim = 100
n_epochs = 2
n_batch = 32

# Data and output paths
data_path = '/Users/mathys/Documents/Projet 3A/preprocessed_data'
output_directory = '/Users/mathys/Documents/Projet 3A/result_WGAN'
if not os.path.exists(output_directory):
    os.makedirs(output_directory)

# Build the models and the combined GAN
generator = Generator(latent_dim)
discriminator = Discriminator(circ_pad)
gan = GAN(generator, discriminator, data_path)
generator.summary()
discriminator.summary()

# Train the WGAN
train_wgan(generator, discriminator, gan, latent_dim, n_epochs, n_critic=1,
           batch_size=n_batch, output_dir=output_directory, circ_pad=circ_pad)

# Generate samples with the trained generator
z = gan.generate_latent_points(latent_dim=latent_dim, n_samples=n_batch)
generated_samples = generator(z)

# Weight histograms (sanity check on the clipping)
generator_weights = [layer.get_weights()[0].flatten()
                     for layer in generator.layers if len(layer.get_weights()) > 0]
discriminator_weights = [layer.get_weights()[0].flatten()
                         for layer in discriminator.layers if len(layer.get_weights()) > 0]

plt.figure(figsize=(10, 5))
for weights in generator_weights:
    plt.hist(weights, bins=50, alpha=0.5)
plt.title('Histogramme des poids du générateur')
plt.show()

plt.figure(figsize=(10, 5))
for weights in discriminator_weights:
    plt.hist(weights, bins=50, alpha=0.5)
plt.title('Histogramme des poids du discriminateur')
plt.show()


##utils
import os
import pandas as pd
import tensorflow as tf
import numpy as np
from tensorflow.keras import backend
from tensorflow.keras.constraints import Constraint
from tensorflow.keras import layers


def load_parquet_files(root_folder, test):
    """Load every .parquet file found in the immediate subfolders of root_folder.

    When `test` is True, stop after ~1000 files for a quick run.
    Returns a list of DataFrames.
    """
    dfs = []
    if test:
        k = 0
    # Walk each subfolder of root_folder
    for folder in os.listdir(root_folder):
        folder_path = os.path.join(root_folder, folder)
        if not os.path.isdir(folder_path):
            continue
        for filename in os.listdir(folder_path):
            if not filename.endswith(".parquet"):
                continue
            dfs.append(pd.read_parquet(os.path.join(folder_path, filename)))
            if test:
                k += 1
                if k > 1000:
                    # BUG FIX: the original `break` only exited the inner loop,
                    # so the 1000-file cap was ignored across folders.
                    return dfs
    return dfs


def generate_batches(data, batch_size):
    """Shuffle the DataFrames and return full batches of numpy arrays
    (the trailing incomplete batch is dropped)."""
    data_np = [df.to_numpy() for df in data]
    np.random.shuffle(data_np)
    batches = [data_np[i:i + batch_size] for i in range(0, len(data_np), batch_size)]
    # Guard against empty input before dropping the incomplete last batch
    if batches and len(batches[-1]) != batch_size:
        batches.pop()
    return batches


def wasserstein_loss(y_true, y_pred):
    """Wasserstein critic loss: mean of label * score (labels are +-1)."""
    return backend.mean(y_true * y_pred)


def generator_loss(y_true, y_pred):
    """Generator loss: maximize the critic score on fakes (y_true is unused)."""
    return -tf.reduce_mean(y_pred)


class ClipConstraint(Constraint):
    """Clip kernel weights to [-clip_value, clip_value] (WGAN Lipschitz bound)."""

    def __init__(self, clip_value):
        self.clip_value = clip_value

    def __call__(self, weights):
        return tf.clip_by_value(weights, -self.clip_value, self.clip_value)

    def get_config(self):
        return {'clip_value': self.clip_value}


class Conv2DCircularPadding(layers.Layer):
    """Conv2D with circular (wrap-around) padding on both spatial axes,
    for data with periodic boundary conditions."""

    def __init__(self, filters, kernel_size, strides=(1, 1), activation=None, **kwargs):
        super(Conv2DCircularPadding, self).__init__(**kwargs)
        # Keep constructor args so get_config can serialize the layer.
        self.filters = filters
        self.kernel_size = kernel_size
        self.strides = strides
        self.activation = activation
        self.conv = layers.Conv2D(filters, kernel_size, strides=strides,
                                  padding='valid', activation=activation)

    def call(self, input_tensor):
        # Padding size derived from the kernel size
        pad_size = self.conv.kernel_size[0] - 1
        half_pad = pad_size // 2
        # Circular padding: wrap rows, then columns
        padded_input = tf.concat([input_tensor[:, -half_pad:, :],
                                  input_tensor,
                                  input_tensor[:, :half_pad, :]], axis=1)
        padded_input = tf.concat([padded_input[:, :, -half_pad:],
                                  padded_input,
                                  padded_input[:, :, :half_pad]], axis=2)
        # Apply the valid convolution on the padded input
        return self.conv(padded_input)

    def get_config(self):
        # BUG FIX: the original stored the live Conv2D layer object here,
        # which is not serializable; store the constructor arguments instead.
        config = super(Conv2DCircularPadding, self).get_config()
        config.update({'filters': self.filters,
                       'kernel_size': self.kernel_size,
                       'strides': self.strides,
                       'activation': self.activation})
        return config
Top comments(0)
For further actions, you may consider blocking this person and/or reporting abuse.