########################################################################################################################
# Copyright 2021 the authors (see AUTHORS file for full list). #
# #
# This file is part of OpenCMP. #
# #
# OpenCMP is free software: you can redistribute it and/or modify it under the terms of the GNU Lesser General Public #
# License as published by the Free Software Foundation, either version 2.1 of the License, or (at your option) any #
# later version. #
# #
# OpenCMP is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied #
# warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more #
# details. #
# #
# You should have received a copy of the GNU Lesser General Public License along with OpenCMP. If not, see #
# <https://www.gnu.org/licenses/>. #
########################################################################################################################
import ngsolve
from ..helpers.ngsolve_ import get_special_functions
from ..helpers.dg import avg, jump, grad_avg
from . import Model
from typing import Dict, List, Optional, Union
from ngsolve.comp import ProxyFunction
from ngsolve import Grad, HDiv, IfPos, InnerProduct, Norm, OuterProduct, Parameter, GridFunction, FESpace, BilinearForm, \
LinearForm, \
Preconditioner, div, dx
from ..helpers.error import norm, mean
[docs]class INS(Model):
"""
A single phase incompressible Navier-Stokes model.
"""
def _pre_init(self) -> None:
# Nothing needs to be done
pass
def _post_init(self) -> None:
# Load the solver parameters.
self.linearize = self.config.get_item(['SOLVER', 'linearization_method'], str)
if self.linearize not in ['Oseen', 'IMEX']:
raise TypeError('Don\'t recognize the linearization method.')
if self.linearize == 'Oseen':
nonlinear_tolerance = self.config.get_dict(['SOLVER', 'nonlinear_tolerance'], self.run_dir, None)
self.abs_nonlinear_tolerance = nonlinear_tolerance['absolute']
self.rel_nonlinear_tolerance = nonlinear_tolerance['relative']
self.nonlinear_max_iters = self.config.get_item(['SOLVER', 'nonlinear_max_iterations'], int)
if self.nonlinear_max_iters < 1:
raise ValueError('Nonlinear solve must involve at least one iteration. Set nonlinear_max_iterations to '
'at least 1.')
self.W = self._construct_linearization_terms()
def _define_model_components(self) -> Dict[str, Optional[int]]:
return {'u': 0,
'p': 1}
def _define_model_local_error_components(self) -> Dict[str, bool]:
return {'u': True,
'p': False}
def _define_time_derivative_components(self) -> List[Dict[str, bool]]:
return [
{'u': True,
'p': False}
]
def _define_num_weak_forms(self) -> int:
return 1
def _define_bc_types(self) -> List[str]:
return ['dirichlet', 'stress', 'parallel', 'pinned']
[docs] @staticmethod
def allows_explicit_schemes() -> bool:
# INS cannot work with explicit schemes
return False
def _set_model_parameters(self) -> None:
self.kv = self.model_functions.model_parameters_dict['kinematic_viscosity']['all']
self.f = self.model_functions.model_functions_dict['source']
def _construct_fes(self) -> FESpace:
return FESpace(self._contruct_fes_helper(), dgjumps=self.DG)
[docs] def _contruct_fes_helper(self) -> List[FESpace]:
"""
Helper function for creating the FESpace.
This function exists in order to allow multicomponent_ins (and any further additions) to simplify their FES
creation by being able to use this helper function to create the finite element space for velocity and pressure.
Return:
A list containing individual finite element spaces
"""
if not self.DG:
if self.element['u'] == 'HDiv' or self.element['u'] == 'RT':
print('We recommended that you NOT use HDIV spaces without DG due to numerical issues.')
if self.element['p'] == 'L2':
print('We recommended that you NOT use L2 spaces without DG due to numerical issues.')
if self.element['u'] == 'RT':
# Raviart-Thomas elements are a type of HDiv finite element.
fes_u = HDiv(self.mesh, order=self.interp_ord, dirichlet=self.dirichlet_names.get('u', ''),
dgjumps=self.DG, RT=True)
else:
fes_u = getattr(ngsolve, self.element['u'])(self.mesh, order=self.interp_ord,
dirichlet=self.dirichlet_names.get('u', ''), dgjumps=self.DG)
if self.element['p'] == 'L2' and 'p' in self.dirichlet_names:
raise ValueError('Not able to pin pressure at a point on L2 spaces.')
else:
fes_p = getattr(ngsolve, self.element['p'])(self.mesh, order=self.interp_ord - 1,
dirichlet=self.dirichlet_names.get('p', ''), dgjumps=self.DG)
return [fes_u, fes_p]
def _construct_linearization_terms(self) -> Optional[List[GridFunction]]:
tmp = GridFunction(self.fes.components[0])
tmp.vec.data = self.IC.components[0].vec
return [tmp]
[docs] def _get_wind(self, gfu: Union[List[ProxyFunction], List[GridFunction]], time_step: int) -> Optional[ProxyFunction]:
"""
Function to obtain the wind term used to linearize the convection term.
Args:
gfu: List of
time_step: What time step values to use for ex: boundary conditions.
The value corresponds to the index of the time step in t_param and dt_param.
Returns:
The wind/linearization term used for linearizing the convection term.
"""
if self.linearize == 'Oseen':
# gfu = None: intermediary step from adaptive_three_step requires w from W when time_step > 0
if time_step > 0 and gfu is not None:
# Using known values from a previous time step, so no need to iteratively solve for a wind.
w = gfu[self.model_components['u']]
else:
w = self.W[self.model_components['u']]
elif self.linearize == 'IMEX':
w = None
else:
raise ValueError('Linearization scheme \"{}\" is not implemented.'.format(self.linearize))
return w
[docs] def update_linearization_terms(self, gfu: GridFunction) -> None:
if self.linearize == 'Oseen':
# Update the velocity linearization term.
try:
# First try just updating the value of the term.
self.W[self.model_components['u']].vec.data = gfu.components[self.model_components['u']].vec
except:
# In some cases the finite element space of the model will have changed, so the linearization term needs
# to be completely reconstructed.
# E.g. during convergence testing.
self.W = self._construct_linearization_terms()
else:
# Do nothing, no linearization term to update.
pass
[docs] def construct_bilinear_time_ODE(self, U: Union[List[ProxyFunction], List[GridFunction]], V: List[ProxyFunction],
dt: Parameter = Parameter(1.0), time_step: int = 0) -> List[BilinearForm]:
w = self._get_wind(U, time_step)
# Define the special DG functions
n, _, alpha, I_mat = get_special_functions(self.mesh, self.nu)
# Separate out the trial and test function for velocity
u = U[self.model_components['u']]
v = V[self.model_components['u']]
# Domain integrals. Newtonian Stress
a = dt * (self.kv[time_step] * InnerProduct(Grad(u), Grad(v))) * dx
if self.linearize == 'Oseen':
# Linearized convection term.
a += -dt * InnerProduct(OuterProduct(u, w), Grad(v)) * dx
if self.DG:
# Penalty for dirichlet BCs
if self.dirichlet_names.get('u', None) is not None:
a += dt * (
self.kv[time_step] * alpha * u * v # 1/2 of penalty term for u=g on 𝚪_D from ∇u^
- self.kv[time_step] * InnerProduct(Grad(u), OuterProduct(v, n)) # ∇u^ = ∇u
- self.kv[time_step] * InnerProduct(Grad(v), OuterProduct(u, n)) # 1/2 of penalty for u=g on 𝚪_D
) * self._ds(self.dirichlet_names['u'])
if self.linearize == 'Oseen':
# Additional 1/2 of uw^ (convection term)
a += dt * v * (0.5 * w * n * u + 0.5 * Norm(w * n) * u) * self._ds(self.dirichlet_names['u'])
if self.linearize == 'Oseen':
# Stress needs a no-backflow component in the bilinear form.
for marker in self.BC.get('stress', {}).get('stress', {}):
if self.DG:
a += dt * v * (IfPos(w * n, w * n, 0.0) * u) * self._ds(marker)
else:
a += dt * v.Trace() * (IfPos(w * n, w * n, 0.0) * u.Trace()) * self._ds(marker)
# Parallel Flow BC
for marker in self.BC.get('parallel', {}).get('parallel', {}):
if self.DG:
a += dt * v * (u - n * InnerProduct(u, n)) * self._ds(marker)
else:
a += dt * v.Trace() * (u.Trace() - n * InnerProduct(u.Trace(), n)) * self._ds(marker)
return [a]
[docs] def construct_bilinear_time_coefficient(self, U: List[ProxyFunction], V: List[ProxyFunction], dt: Parameter,
time_step: int) -> List[BilinearForm]:
w = self._get_wind(U, time_step)
# Define the special DG functions.
n, _, alpha, I_mat = get_special_functions(self.mesh, self.nu)
# Separate out the trial and test functions for velocity and pressure.
u, p = U[self.model_components['u']], U[self.model_components['p']]
v, q = V[self.model_components['u']], V[self.model_components['p']]
# Domain integrals.
a = dt * (
- div(u) * q # Conservation of mass
- div(v) * p # Pressure
- 1e-10 * p * q # Stabilization term
) * dx
if self.DG:
avg_u = avg(u)
jump_u = jump(u)
avg_grad_u = grad_avg(u)
jump_v = jump(v)
avg_grad_v = grad_avg(v)
# Penalty for discontinuities
a += dt * (
self.kv[time_step] * alpha * InnerProduct(jump_u, jump_v) # Penalty term for u+=u- on 𝚪_I from ∇u^
- self.kv[time_step] * InnerProduct(avg_grad_u, OuterProduct(jump_v, n)) # Stress
- self.kv[time_step] * InnerProduct(avg_grad_v, OuterProduct(jump_u, n)) # U
) * dx(skeleton=True)
if self.linearize == 'Oseen':
# Additional penalty for the convection term.
a += dt * jump_v * (w * n * avg_u + 0.5 * Norm(w * n) * jump_u) * dx(skeleton=True)
return [a]
[docs] def construct_linear(self, V: List[ProxyFunction], gfu_0: Optional[List[GridFunction]],
dt: Parameter, time_step: int) -> List[LinearForm]:
w = self._get_wind(gfu_0, time_step)
# Define the special DG functions.
n, h, alpha, I_mat = get_special_functions(self.mesh, self.nu)
# Separate out the test function for velocity.
v = V[self.model_components['u']]
# Domain integrals.
L = dt * v * self.f['u'][time_step] * dx
# Dirichlet BC for u
if self.DG:
for marker in self.BC.get('dirichlet', {}).get('u', {}):
g = self.BC['dirichlet']['u'][marker][time_step]
L += dt * (
self.kv[time_step] * alpha * g * v # 1/2 of penalty for u=g from ∇u^ on 𝚪_D
- self.kv[time_step] * InnerProduct(Grad(v), OuterProduct(g, n)) # 1/2 of penalty for u=g
) * self._ds(marker)
if self.linearize == 'Oseen':
# Additional 1/2 of uw^ (convection)
L += dt * v * (-0.5 * w * n * g + 0.5 * Norm(w * n) * g) * self._ds(marker)
# Stress BC
for marker in self.BC.get('stress', {}).get('stress', {}):
h = self.BC['stress']['stress'][marker][time_step]
if self.DG:
L += dt * v * h * self._ds(marker)
else:
L += dt * v.Trace() * h * self._ds(marker)
return [L]
[docs] def construct_imex_explicit(self, V: List[ProxyFunction], gfu_0: Optional[List[GridFunction]],
dt: Parameter, time_step: int) -> List[LinearForm]:
# Separate out the test function for velocity.
v = V[self.model_components['u']]
# Velocity linearization term
gfu_u = gfu_0[self.model_components['u']]
# Linearized convection term.
L = -dt * InnerProduct(Grad(gfu_u) * gfu_u, v) * dx
return [L]
[docs] def solve_single_step(self, a_lst: List[BilinearForm], L_lst: List[LinearForm],
precond_lst: List[Preconditioner], gfu: GridFunction, time_step: int = 0) -> None:
if self.linearize == 'Oseen':
# The component index representing velocity
component = self.fes.components[self.model_components['u']]
comp_index = self.model_components['u']
# Number of linear iterations for this timestep
num_iteration = 1
# Boolean used to keep the while loop going
done_iterating = False
while not done_iterating:
self.apply_dirichlet_bcs_to(gfu, time_step=time_step)
a_lst[0].Assemble()
L_lst[0].Assemble()
if precond_lst[0] is not None:
precond_lst[0].Update()
self.construct_and_run_solver(a_lst[0], L_lst[0], precond_lst[0], gfu)
err = norm("l2_norm", self.W[comp_index], gfu.components[comp_index], self.mesh, component, average=False)
gfu_norm = mean(gfu.components[comp_index], self.mesh)
num_iteration += 1
if self.verbose > 0:
print(num_iteration, err)
self.W[comp_index].vec.data = gfu.components[comp_index].vec
done_iterating = (err < self.abs_nonlinear_tolerance + self.rel_nonlinear_tolerance * gfu_norm) or (num_iteration > self.nonlinear_max_iters)
elif self.linearize == 'IMEX':
self.construct_and_run_solver(a_lst[0], L_lst[0], precond_lst[0], gfu)
else:
raise ValueError('Linearization scheme \"{}\" is not implemented.'.format(self.linearize))
[docs] def solve_stationary_iteration(self, a_lst: List[BilinearForm], L_lst: List[LinearForm],
precond_lst: List[Preconditioner], gfu: GridFunction) -> None:
if self.linearize == 'Oseen':
# The component index representing velocity
component = self.fes.components[self.model_components['u']]
comp_index = self.model_components['u']
self.apply_dirichlet_bcs_to(gfu, time_step=time_step)
a_lst[0].Assemble()
L_lst[0].Assemble()
if precond_lst[0] is not None:
precond_lst[0].Update()
# solver linearized model
self.construct_and_run_solver(a_lst[0], L_lst[0], precond_lst[0], gfu)
# calculate error and norm of solution
err = norm("l2_norm", self.W[comp_index], gfu.components[comp_index], self.mesh, component, average=False)
gfu_norm = mean(gfu.components[comp_index], self.mesh)
self.W[comp_index].vec.data = gfu.components[comp_index].vec
return(err, gfu_norm)
elif self.linearize == 'IMEX':
raise ValueError('Linearization scheme \"{}\" is appropriate for stationary solve.'.format(self.linearize))
else:
raise ValueError('Linearization scheme \"{}\" is not implemented.'.format(self.linearize))
[docs] def accept_stationary_solution(self, gfu: GridFunction):
# The component index representing velocity
component = self.fes.components[self.model_components['u']]
comp_index = self.model_components['u']
self.W[comp_index].vec.data = gfu.components[comp_index].vec