Source code for opencmp.helpers.post_processing

########################################################################################################################
# Copyright 2021 the authors (see AUTHORS file for full list).                                                         #
#                                                                                                                      #
# This file is part of OpenCMP.                                                                                        #
#                                                                                                                      #
# OpenCMP is free software: you can redistribute it and/or modify it under the terms of the GNU Lesser General Public  #
# License as published by the Free Software Foundation, either version 2.1 of the License, or (at your option) any     #
# later version.                                                                                                       #
#                                                                                                                      #
# OpenCMP is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied        #
# warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more  #
# details.                                                                                                             #
#                                                                                                                      #
# You should have received a copy of the GNU Lesser General Public License along with OpenCMP. If not, see             #
# <https://www.gnu.org/licenses/>.                                                                                     #
########################################################################################################################

from ..config_functions import ConfigParser
from ngsolve import GridFunction, Mesh, Parameter, VTKOutput, H1
from ..models import get_model_class, Model
from pathlib import Path
from os import remove
from typing import Optional, Union
from multiprocessing.pool import Pool
from multiprocessing import cpu_count


class PhaseFieldModelMimic:
    """
    Lightweight stand-in for a Model that exposes only the attributes read while saving and post-processing.

    It exists so that saved phase field .sol files can be converted to .vtu through the same code path as the
    .sol files of a full model.
    """

    def __init__(self, model_to_copy: Model) -> None:
        """
        Initializer

        Args:
            model_to_copy: The model for the general simulation. Its interpolant order and mesh are reused here.
        """
        order = model_to_copy.interp_ord
        mesh = model_to_copy.mesh

        self.interp_ord = order
        # The phase field is the only variable that gets saved.
        self.save_names = ['phi']
        self.mesh = mesh
        # H1 space on the copied mesh, using the copied interpolant order.
        self.fes = H1(mesh, order=order)

    def construct_gfu(self) -> GridFunction:
        """
        Function to construct a phase field gridfunction.

        Returns:
            A new gridfunction defined on this object's finite element space.
        """
        phi_gfu = GridFunction(self.fes)
        return phi_gfu
def sol_to_vtu(config: ConfigParser, output_dir_path: str, model: Optional[Union[Model, PhaseFieldModelMimic]] = None,
               delete_sol_file: bool = False, allow_all_threads: bool = False) -> None:
    """
    Function to take the output .sol files and convert them into .vtu for visualization.

    Every .sol file found (recursively) under ``output_dir_path + 'sol/'`` is converted by a pool of worker
    processes. One entry per converted file, wrapped in a collection header and footer, is then appended to
    ``output_dir_path + 'transient.pvd'``.

    If no .sol files are found there is nothing to convert: the function returns without starting a pool and
    without touching the .pvd file.

    Args:
        config: The loaded config parser used by the model
        output_dir_path: The path to the folder in which the .sol files are, and where the .vtu files will be saved.
            It is joined to sub-paths by plain string concatenation, so it must end with a path separator.
        model: The model that generated the .sol files. If None, the model is constructed from the config.
        delete_sol_file: Bool to indicate if to delete the original .sol files after converting to .vtu,
            Default is False.
        allow_all_threads: Bool to indicate if to use all available threads (if there are enough files),
            Default is False.
    """
    # Being run outside of run.py, so have to create model
    if model is None:
        model_name = config.get_item(['OTHER', 'model'], str)
        model_class = get_model_class(model_name)
        model = model_class(config, [Parameter(0.0)])

    # Number of subdivisions per element
    subdivision = config.get_item(['VISUALIZATION', 'subdivision'], int)

    # NOTE: -1 is the value used whenever an int default is needed.
    if subdivision == -1:
        subdivision = model.interp_ord

    # Generate a list of all .sol files
    sol_path_list = [str(sol_path) for sol_path in Path(output_dir_path + 'sol/').rglob('*.sol')]

    # Number of files to convert
    n_files = len(sol_path_list)

    # Nothing to convert. Bail out here since Pool refuses to start with zero processes.
    if n_files == 0:
        return

    # Figure out the maximum number of threads at our disposal
    if allow_all_threads:
        # Number of threads on the machine that we have access to
        num_threads = cpu_count()
    else:
        # Number of threads to use as specified in the config file
        num_threads = config.get_item(['OTHER', 'num_threads'], int)

    # Number of threads to use
    # NOTE: No point of starting more threads than files, and also lets us depend on modulo math later.
    #       Clamped to at least one so that a non-positive configured value cannot make Pool raise.
    n_threads = max(1, min(n_files, num_threads))

    # Create gridfunctions, one per thread
    gfus = [model.construct_gfu() for _ in range(n_threads)]

    # NOTE: We HAVE to use Pool, and not ThreadPool. ThreadPool causes seg faults on the VTKOutput call.
    with Pool(processes=n_threads) as pool:
        # Queue one conversion per file. The pool automatically picks up the next entry when a worker is free.
        pending = [
            pool.apply_async(_sol_to_vtu, (gfus[i % n_threads], sol_path, output_dir_path, model.save_names,
                                           delete_sol_file, subdivision, model.mesh))
            for i, sol_path in enumerate(sol_path_list)
        ]

        # Collect the .pvd entries in submission order. This must happen while the pool is still open.
        entries = [result.get() for result in pending]

    header = '<?xml version=\"1.0\"?>\n<VTKFile type=\"Collection\" version=\"0.1\"\n' + \
             'byte_order=\"LittleEndian\"\ncompressor=\"vtkZLibDataCompressor\">\n<Collection>\n'
    footer = '</Collection>\n</VTKFile>'

    # NOTE(review): the file is opened in append mode, so any existing content of transient.pvd is kept and the
    #               new collection is written after it. Confirm that this is intended for repeated runs.
    with open(output_dir_path + 'transient.pvd', 'a+') as file:
        file.writelines([header, *entries, footer])
def _sol_to_vtu(gfu: GridFunction, sol_path_str: str, output_dir_path: str, save_names: list, delete_sol_file: bool,
                subdivision: int, mesh: Mesh) -> str:
    """
    Function that gets parallelized and does the actual sol-to-vtu conversion.

    Args:
        gfu: The grid function into which to load the .sol file
        sol_path_str: The path to the .sol file to load. The text after the last underscore in its file name
            (extension removed) is parsed as the timestep.
        output_dir_path: The path to the directory to save the .vtu into
        save_names: The names of the variables to save
        delete_sol_file: Whether or not to delete the sol file after
        subdivision: Number of subdivisions on each mesh element
        mesh: The mesh on which the gfu was solved.

    Returns:
        A string containing the entry for the .pvd file for this .vtu file.

    Raises:
        FileNotFoundError: If the expected .vtu file does not exist after the VTKOutput call.
        ValueError: If the timestep part of the file name cannot be converted to a float.
    """
    # File name without directory or extension. Path handles the platform's separator, unlike splitting on '/'.
    sol_name = Path(sol_path_str).stem

    # Get the timestep for this .sol file from its name
    time_str = sol_name.split('_')[-1]

    # Name for the .vtu (VTKOutput is given the name without an extension)
    filename = output_dir_path + 'vtu/' + sol_name

    # Load data into gfu
    gfu.Load(sol_path_str)

    # Convert gfu components into form needed for VTKOutput
    if len(gfu.components) > 0:
        coefs = list(gfu.components)
    else:
        coefs = [gfu]

    # Write to .vtu
    VTKOutput(ma=mesh, coefs=coefs, names=save_names, filename=filename, subdivision=subdivision).Do()

    # Check that the file was created
    if not Path(filename + '.vtu').exists():
        raise FileNotFoundError('Neither .vtk nor .vtu files are being generated. Something is wrong with _sol_to_vtu.')

    # Delete .sol
    if delete_sol_file:
        remove(sol_path_str)

    # Write timestep in .pvd, with the .vtu path given relative to the output directory
    return '<DataSet timestep=\"%e\" group=\"\" part=\"0\" file=\"%s\"/>\n' \
           % (float(time_str), 'vtu/' + sol_name + '.vtu')