# -*- coding: utf-8 -*-
"""
This file is part of PyFrac.
Created by Haseeb Zia on 12.06.17.
Copyright (c) ECOLE POLYTECHNIQUE FEDERALE DE LAUSANNE, Switzerland, Geo-Energy Laboratory, 2016-2020.
All rights reserved. See the LICENSE.TXT file for more details.
"""
# local
import logging
import numpy as np
from scipy.interpolate import griddata
import dill
import os
import re
import sys
import json
from utility import ReadFracture
from HF_reference_solutions import HF_analytical_sol, get_fracture_dimensions_analytical
from labels import *
# import FractureInitialization
if 'win32' in sys.platform or 'win64' in sys.platform:
slash = '\\'
else:
slash = '/'
#-----------------------------------------------------------------------------------------------------------------------
[docs]def load_fractures(address=None, sim_name='simulation', time_period=0.0, time_srs=None, step_size=1):
"""
This function returns a list of the fractures. If address and simulation name are not provided, results from the
default address and having the default name will be loaded.
Args:
address (string): -- the folder address containing the saved files. If it is not provided,
simulation from the default folder (_simulation_data_PyFrac) will be loaded.
sim_name (string): -- the simulation name from which the fractures are to be loaded. If not
provided, simulation with the default name (Simulation) will be loaded.
time_period (float): -- time period between two successive fractures to be loaded. if not provided,
all fractures will be loaded.
time_srs (ndarray): -- if provided, the fracture stored at the closest time after the given times
will be loaded.
step_size (int): -- the number of time steps to skip before loading the next fracture. If not
provided, all of the fractures will be loaded.
Returns:
fracture_list(list): -- a list of fractures.
"""
log = logging.getLogger('PyFrac.load_fractures')
log.info('Returning fractures...')
if address is None:
address = '.' + slash + '_simulation_data_PyFrac'
if address[-1] != slash:
address = address + slash
if isinstance(time_srs, float) or isinstance(time_srs, int):
time_srs = np.array([time_srs])
elif isinstance(time_srs, list):
time_srs = np.array(time_srs)
if re.match('\d+-\d+-\d+__\d+_\d+_\d+', sim_name[-20:]):
sim_full_name = sim_name
else:
simulations = os.listdir(address)
time_stamps = []
for i in simulations:
if re.match(sim_name + '__\d+-\d+-\d+__\d+_\d+_\d+', i):
time_stamps.append(i[-20:])
if len(time_stamps) == 0:
raise ValueError('Simulation not found! The address might be incorrect.')
Tmst_sorted = sorted(time_stamps)
sim_full_name = sim_name + '__' + Tmst_sorted[-1]
sim_full_path = address + sim_full_name
properties_file = sim_full_path + slash + 'properties'
try:
with open(properties_file, 'rb') as inp:
properties = dill.load(inp)
except FileNotFoundError:
raise SystemExit('Data not found. The address might be incorrect')
fileNo = 0
next_t = 0.0
t_srs_indx = 0
fracture_list = []
t_srs_given = isinstance(time_srs, np.ndarray) #time series is given
if t_srs_given:
if len(time_srs) == 0:
return fracture_list
next_t = time_srs[t_srs_indx]
# time at wich the first fracture file was modified
while fileNo < 5000:
# trying to load next file. exit loop if not found
try:
ff = ReadFracture(sim_full_path + slash + sim_full_name + '_file_' + repr(fileNo))
except FileNotFoundError:
break
fileNo += step_size
if 1. - next_t / ff.time >= -1e-8:
# if the current fracture time has advanced the output time period
log.info('Returning fracture at ' + repr(ff.time) + ' s')
fracture_list.append(ff)
if t_srs_given:
if t_srs_indx < len(time_srs) - 1:
t_srs_indx += 1
next_t = time_srs[t_srs_indx]
if ff.time > max(time_srs):
break
else:
next_t = ff.time + time_period
if fileNo >= 5000:
raise SystemExit('too many files.')
if len(fracture_list) == 0:
raise ValueError("Fracture list is empty")
return fracture_list, properties
#-----------------------------------------------------------------------------------------------------------------------
[docs]def get_fracture_variable(fracture_list, variable, edge=4, return_time=False):
""" This function returns the required variable from a fracture list.
Args:
fracture_list (list): -- the fracture list from which the variable is to be extracted.
variable (string): -- the variable to be extracted. See :py:data:`labels.supported_variables` of the
:py:mod:`labels` module for a list of supported variables.
edge (int): -- the edge of the cell that will be plotted. This is for variables that
are evaluated on the cell edges instead of cell center. It can have a
value from 0 to 4 (0->left, 1->right, 2->bottom, 3->top, 4->average).
return_time (bool): -- if True, the times at which the fractures are stored will also be returned.
Returns:
- variable_list (list) -- a list containing the extracted variable from each of the fracture. The \
dimension and type of each member of the list depends upon the variable type.
- time_srs (list) -- a list of times at which the fractures are stored.
"""
variable_list = []
time_srs = []
if variable == 'time' or variable == 't':
for i in fracture_list:
variable_list.append(i.time)
time_srs.append(i.time)
elif variable == 'width' or variable == 'w' or variable == 'surface':
for i in fracture_list:
variable_list.append(i.w)
time_srs.append(i.time)
elif variable == 'fluid pressure' or variable == 'pf':
for i in fracture_list:
variable_list.append(i.pFluid)
time_srs.append(i.time)
elif variable == 'net pressure' or variable == 'pn':
for i in fracture_list:
variable_list.append(i.pNet)
time_srs.append(i.time)
elif variable == 'front velocity' or variable == 'v':
for i in fracture_list:
vel = np.full((i.mesh.NumberOfElts, ), np.nan)
vel[i.EltTip] = i.v
variable_list.append(vel)
time_srs.append(i.time)
elif variable == 'Reynolds number' or variable == 'Re':
if fracture_list[-1].ReynoldsNumber is None:
raise SystemExit(err_var_not_saved)
for i in fracture_list:
if edge < 0 or edge > 4:
raise ValueError('Edge can be an integer between and including 0 and 4.')
if edge < 4:
variable_list.append(i.ReynoldsNumber[edge])
time_srs.append(i.time)
elif i.ReynoldsNumber is not None:
variable_list.append(np.mean(i.ReynoldsNumber, axis=0))
time_srs.append(i.time)
else:
variable_list.append(np.full((i.mesh.NumberOfElts, ), np.nan))
elif variable == 'fluid flux' or variable == 'ff':
if fracture_list[-1].fluidFlux is None:
raise SystemExit(err_var_not_saved)
for i in fracture_list:
if edge < 0 or edge > 4:
raise ValueError('Edge can be an integer between and including 0 and 4.')
if edge < 4:
variable_list.append(i.fluidFlux[edge])
time_srs.append(i.time)
elif i.fluidFlux is not None:
variable_list.append(np.mean(i.fluidFlux, axis=0))
time_srs.append(i.time)
else:
variable_list.append(np.full((i.mesh.NumberOfElts,), np.nan))
elif variable == 'fluid velocity' or variable == 'fv':
if fracture_list[-1].fluidVelocity is None:
raise SystemExit(err_var_not_saved)
for i in fracture_list:
if edge < 0 or edge > 4:
raise ValueError('Edge can be an integer between and including 0 and 4.')
if edge < 4:
variable_list.append(i.fluidVelocity[edge])
time_srs.append(i.time)
elif i.fluidVelocity is not None:
variable_list.append(np.mean(i.fluidVelocity, axis=0))
time_srs.append(i.time)
else:
variable_list.append(np.full((i.mesh.NumberOfElts, ), np.nan))
elif variable == 'pressure gradient x' or variable == 'dpdx':
for i in fracture_list:
dpdxLft = (i.pNet[i.EltCrack] - i.pNet[i.mesh.NeiElements[i.EltCrack, 0]]) \
* i.InCrack[i.mesh.NeiElements[i.EltCrack, 0]]
dpdxRgt = (i.pNet[i.mesh.NeiElements[i.EltCrack, 1]] - i.pNet[i.EltCrack]) \
* i.InCrack[i.mesh.NeiElements[i.EltCrack, 1]]
dpdx = np.full((i.mesh.NumberOfElts, ),0.0)
dpdx[i.EltCrack] = np.mean([dpdxLft, dpdxRgt], axis=0)
variable_list.append(dpdx)
time_srs.append(i.time)
elif variable == 'pressure gradient y' or variable == 'dpdy':
for i in fracture_list:
dpdyBtm = (i.pNet[i.EltCrack] - i.pNet[i.mesh.NeiElements[i.EltCrack, 2]]) \
* i.InCrack[i.mesh.NeiElements[i.EltCrack, 2]]
dpdxtop = (i.pNet[i.mesh.NeiElements[i.EltCrack, 3]] - i.pNet[i.EltCrack]) \
* i.InCrack[i.mesh.NeiElements[i.EltCrack, 3]]
dpdy = np.full((i.mesh.NumberOfElts, ),0.0)
dpdy[i.EltCrack] = np.mean([dpdyBtm, dpdxtop], axis=0)
variable_list.append(dpdy)
time_srs.append(i.time)
elif variable == 'fluid flux as vector field' or variable == 'ffvf':
if fracture_list[-1].fluidFlux_components is None:
raise SystemExit(err_var_not_saved)
for i in fracture_list:
if edge < 0 or edge > 4:
raise ValueError('Edge can be an integer between and including 0 and 4.')
if edge < 4:
variable_list.append(i.fluidFlux_components[edge])
time_srs.append(i.time)
elif i.fluidFlux_components is not None:
variable_list.append(i.fluidFlux_components)
time_srs.append(i.time)
else:
variable_list.append(np.full((i.mesh.NumberOfElts,), np.nan))
elif variable == 'fluid velocity as vector field' or variable == 'fvvf':
if fracture_list[-1].fluidVelocity_components is None:
raise SystemExit(err_var_not_saved)
for i in fracture_list:
if edge < 0 or edge > 4:
raise ValueError('Edge can be an integer between and including 0 and 4.')
if edge < 4:
variable_list.append(i.fluidVelocity_components[edge])
time_srs.append(i.time)
elif i.fluidFlux_components is not None:
variable_list.append(i.fluidVelocity_components)
time_srs.append(i.time)
else:
variable_list.append(np.full((i.mesh.NumberOfElts, ), np.nan))
elif variable == 'effective viscosity' or variable == 'ev':
if fracture_list[-1].effVisc is None:
raise SystemExit(err_var_not_saved)
for i in fracture_list:
if edge < 0 or edge > 4:
raise ValueError('Edge can be an integer between and including 0 and 4.')
if edge < 4:
variable_list.append(i.effVisc[edge])
time_srs.append(i.time)
elif i.effVisc is not None:
variable_list.append(np.mean(i.effVisc, axis=0))
time_srs.append(i.time)
else:
variable_list.append(np.full((i.mesh.NumberOfElts, ), np.nan))
elif variable == 'yielded' or variable == 'y':
if fracture_list[-1].yieldRatio is None:
raise SystemExit(err_var_not_saved)
for i in fracture_list:
if edge < 0 or edge > 4:
raise ValueError('Edge can be an integer between and including 0 and 4.')
if edge < 4:
variable_list.append(i.yieldRatio[edge])
time_srs.append(i.time)
elif i.yieldRatio is not None:
variable_list.append(np.mean(i.yieldRatio, axis=0))
time_srs.append(i.time)
else:
variable_list.append(np.full((i.mesh.NumberOfElts, ), np.nan))
elif variable in ('front_dist_min', 'd_min', 'front_dist_max', 'd_max', 'front_dist_mean', 'd_mean'):
for i in fracture_list:
if len(i.source) != 0:
source_loc = i.mesh.CenterCoor[i.source[0]]
# coordinate of the zero vertex in the tip cells
front_intersect_dist = np.sqrt((i.Ffront[::, [0, 2]].flatten() - source_loc[0]) ** 2
+ (i.Ffront[::, [1, 3]].flatten() - source_loc[1]) ** 2)
if variable == 'front_dist_mean' or variable == 'd_mean':
variable_list.append(np.mean(front_intersect_dist))
elif variable == 'front_dist_max' or variable == 'd_max':
variable_list.append(np.max(front_intersect_dist))
elif variable == 'front_dist_min' or variable == 'd_min':
variable_list.append(np.min(front_intersect_dist))
time_srs.append(i.time)
elif variable == 'mesh':
for i in fracture_list:
variable_list.append(i.mesh)
time_srs.append(i.time)
elif variable == 'efficiency' or variable == 'ef':
for i in fracture_list:
variable_list.append(i.efficiency)
time_srs.append(i.time)
elif variable == 'volume' or variable == 'V':
for i in fracture_list:
variable_list.append(i.FractureVolume)
time_srs.append(i.time)
elif variable == 'leak off' or variable == 'lk':
for i in fracture_list:
variable_list.append(i.LkOff)
time_srs.append(i.time)
elif variable == 'leaked off volume' or variable == 'lkv':
for i in fracture_list:
variable_list.append(sum(i.LkOffTotal[i.EltCrack]))
time_srs.append(i.time)
elif variable == 'aspect ratio' or variable == 'ar':
for fr in fracture_list:
x_coords = np.hstack((fr.Ffront[:, 0], fr.Ffront[:, 2]))
x_len = np.max(x_coords) - np.min(x_coords)
y_coords = np.hstack((fr.Ffront[:, 1], fr.Ffront[:, 3]))
y_len = np.max(y_coords) - np.min(y_coords)
variable_list.append(x_len / y_len)
time_srs.append(fr.time)
elif variable == 'chi':
for i in fracture_list:
vel = np.full((i.mesh.NumberOfElts,), np.nan)
vel[i.EltTip] = i.v
variable_list.append(vel)
time_srs.append(i.time)
elif variable == 'regime':
legend_coord = []
if hasattr(fracture_list[0], 'regime_color'):
for i in fracture_list:
variable_list.append(i.regime_color)
time_srs.append(i.time)
else:
raise ValueError('The regime cannot be found. Saving of regime is most likely not enabled.\n'
' See the saveRegime falg of SimulationProperties class.')
elif variable == 'source elements' or variable == 'se':
for fr in fracture_list:
variable_list.append(fr.source)
time_srs.append(fr.time)
else:
raise ValueError('The variable type is not correct.')
if not return_time:
return variable_list
elif variable == 'regime':
return variable_list, legend_coord, time_srs
else:
return variable_list, time_srs
#-----------------------------------------------------------------------------------------------------------------------
[docs]def get_fracture_variable_at_point(fracture_list, variable, point, edge=4, return_time=True):
""" This function returns the required variable from a fracture list at the given point.
Args:
fracture_list (list): -- the fracture list from which the variable is to be extracted.
variable (string): -- the variable to be extracted. See :py:data:`supported_variables` of the
:py:mod:`Labels` module for a list of supported variables.
point (list or ndarray): -- the point at which the given variable is plotted against time [x, y].
edge (int): -- the edge of the cell that will be plotted. This is for variables that
are evaluated on the cell edges instead of cell center. It can have a
value from 0 to 4 (0->left, 1->right, 2->bottom, 3->top, 4->average).
return_time (bool): -- if True, the times at which the fractures are stored will also be returned.
Returns:
- variable_list (list) -- a list containing the extracted variable from each of the fracture. The \
dimension and type of each member of the list depends upon the variable type.
- time_srs (list) -- a list of times at which the fractures are stored.
"""
log = logging.getLogger('PyFrac.get_fracture_variable_at_point')
if variable not in supported_variables:
raise ValueError(err_msg_variable)
return_list = []
if variable in ['front intercepts', 'fi']:
return_list = get_front_intercepts(fracture_list, point)
if return_time:
return return_list, get_fracture_variable(fracture_list, 't')
else:
return return_list
else:
var_values, time_list = get_fracture_variable(fracture_list,
variable,
edge=edge,
return_time=True)
if variable in unidimensional_variables:
return_list = var_values
else:
for i in range(len(fracture_list)):
if variable in bidimensional_variables:
value_point = griddata(fracture_list[i].mesh.CenterCoor,
var_values[i],
point,
method='linear',
fill_value=np.nan)
if np.isnan(value_point):
log.warning('Point outside fracture.')
return_list.append(value_point[0])
if return_time:
return return_list, time_list
else:
return return_list
#-----------------------------------------------------------------------------------------------------------------------
[docs]def get_fracture_variable_slice_interpolated(var_value, mesh, point1=None, point2=None):
"""
This function returns the given fracture variable on a given slice of the domain. Two points are to be given that
will be joined to form the slice. The values on the slice are interpolated from the values available on the cell
centers.
Args:
var_value (ndarray): -- the value of the variable on each cell of the domain.
mesh (CartesianMesh): -- the CartesianMesh object describing the mesh.
point1 (list or ndarray): -- the left point from which the slice should pass [x, y].
point2 (list or ndarray): -- the right point from which the slice should pass [x, y].
Returns:
- value_samp_points (ndarray) -- the values of the variable at the sampling points given by sampling_line \
(see below).
- sampling_line (ndarray) -- the distance of the point where the value is provided from the center of\
the slice.
"""
if not isinstance(var_value, np.ndarray):
raise ValueError("Variable value should be provided in the form of numpy array with the size equal to the "
"number of elements in the mesh!")
elif var_value.size != mesh.NumberOfElts:
raise ValueError("Given array is not equal to the number of elements in mesh!")
if point1 is None:
point1 = np.array([-mesh.Lx, 0.])
if point2 is None:
point2 = np.array([mesh.Lx, 0.])
# the code below find the extreme points of the line joining the two given points with the current mesh
if point2[0] == point1[0]:
point1[1] = -mesh.Ly
point2[1] = mesh.Ly
elif point2[1] == point1[1]:
point1[0] = -mesh.Lx
point2[0] = mesh.Lx
else:
slope = (point2[1] - point1[1]) / (point2[0] - point1[0])
y_intrcpt_lft = slope * (-mesh.Lx - point1[0]) + point1[1]
y_intrcpt_rgt = slope * (mesh.Lx - point1[0]) + point1[1]
x_intrcpt_btm = (-mesh.Ly - point1[1]) / slope + point1[0]
x_intrcpt_top = (mesh.Ly - point1[1]) / slope + point1[0]
if abs(y_intrcpt_lft) < mesh.Ly:
point1[0] = -mesh.Lx
point1[1] = y_intrcpt_lft
if y_intrcpt_lft > mesh.Ly:
point1[0] = x_intrcpt_top
point1[1] = mesh.Ly
if y_intrcpt_lft < -mesh.Ly:
point1[0] = x_intrcpt_btm
point1[1] = -mesh.Ly
if abs(y_intrcpt_rgt) < mesh.Ly:
point2[0] = mesh.Lx
point2[1] = y_intrcpt_rgt
if y_intrcpt_rgt > mesh.Ly:
point2[0] = x_intrcpt_top
point2[1] = mesh.Ly
if y_intrcpt_rgt < -mesh.Ly:
point2[0] = x_intrcpt_btm
point2[1] = -mesh.Ly
sampling_points = np.hstack((np.linspace(point1[0], point2[0], 105).reshape((105, 1)),
np.linspace(point1[1], point2[1], 105).reshape((105, 1))))
value_samp_points = griddata(mesh.CenterCoor,
var_value,
sampling_points,
method='linear',
fill_value=np.nan)
sampling_line_lft = ((sampling_points[:52, 0] - sampling_points[52, 0]) ** 2 +
(sampling_points[:52, 1] - sampling_points[52, 1]) ** 2) ** 0.5
sampling_line_rgt = ((sampling_points[52:, 0] - sampling_points[52, 0]) ** 2 +
(sampling_points[52:, 1] - sampling_points[52, 1]) ** 2) ** 0.5
sampling_line = np.concatenate((-sampling_line_lft, sampling_line_rgt))
return value_samp_points, sampling_line
#-----------------------------------------------------------------------------------------------------------------------
[docs]def get_fracture_variable_slice_cell_center(var_value, mesh, point=None, orientation='horizontal'):
"""
This function returns the given fracture variable on a given slice of the domain. Two slice is constructed from the
given point and the orientation. The values on the slice are taken from the cell centers.
Args:
var_value (ndarray): -- the value of the variable on each cell of the domain.
mesh (CartesianMesh): -- the CartesianMesh object describing the mesh.
point (list or ndarray): -- the point from which the slice should pass [x, y]. If it does not lie on a cell
center, the closest cell center will be taken. By default, [0., 0.] will be
taken.
orientation (string): -- the orientation according to which the slice is made in the case the
plotted values are not interpolated and are taken at the cell centers.
Any of the four ('vertical', 'horizontal', 'ascending' and 'descending')
orientation can be used.
Returns:
- var_value (ndarray) -- the values of the variable at the sampling points given by sampling_line \
(see below).
- sampling_line (ndarray) -- the distance of the point where the value is provided from the center of\
the slice.
- sampling_cells (ndarray) -- the cells on the mesh along with the slice is made.
"""
if not isinstance(var_value, np.ndarray):
raise ValueError("Variable value should be provided in the form of numpy array with the size equal to the "
"number of elements in the mesh!")
elif var_value.size != mesh.NumberOfElts:
raise ValueError("Given array is not equal to the number of elements in mesh!")
if point is None:
point = np.array([0., 0.])
if orientation not in ('horizontal', 'vertical', 'increasing', 'decreasing'):
raise ValueError("Given orientation is not supported. Possible options:\n 'horizontal', 'vertical',"
" 'increasing', 'decreasing'")
zero_cell = mesh.locate_element(point[0], point[1])
if zero_cell == np.nan:
raise ValueError("The given point does not lie in the grid!")
if orientation == 'vertical':
sampling_cells = np.hstack((np.arange(zero_cell, 0, -mesh.nx)[::-1],
np.arange(zero_cell, mesh.NumberOfElts, mesh.nx)))
elif orientation == 'horizontal':
sampling_cells = np.arange(zero_cell // mesh.nx * mesh.nx, (zero_cell // mesh.nx + 1) * mesh.nx)
elif orientation == 'increasing':
bottom_half = np.arange(zero_cell, 0, -mesh.nx - 1)
bottom_half = np.delete(bottom_half, np.where(mesh.CenterCoor[bottom_half, 0] >
mesh.CenterCoor[zero_cell, 0])[0])
top_half = np.arange(zero_cell, mesh.NumberOfElts, mesh.nx + 1)
top_half = np.delete(top_half, np.where(mesh.CenterCoor[top_half, 0] <
mesh.CenterCoor[zero_cell, 0])[0])
sampling_cells = np.hstack((bottom_half[::-1], top_half))
elif orientation == 'decreasing':
bottom_half = np.arange(zero_cell, 0, -mesh.nx + 1)
bottom_half = np.delete(bottom_half, np.where(mesh.CenterCoor[bottom_half, 0] <
mesh.CenterCoor[zero_cell, 0])[0])
top_half = np.arange(zero_cell, mesh.NumberOfElts, mesh.nx - 1)
top_half = np.delete(top_half, np.where(mesh.CenterCoor[top_half, 0] >
mesh.CenterCoor[zero_cell, 0])[0])
sampling_cells = np.hstack((bottom_half[::-1], top_half))
sampling_len = ((mesh.CenterCoor[sampling_cells[0], 0] - mesh.CenterCoor[sampling_cells[-1], 0]) ** 2 + \
(mesh.CenterCoor[sampling_cells[0], 1] - mesh.CenterCoor[sampling_cells[-1], 1]) ** 2) ** 0.5
# making x-axis centered at zero for the 1D slice. Necessary to have same reference with different meshes and
# analytical solution plots.
sampling_line = np.linspace(0, sampling_len, len(sampling_cells)) - sampling_len / 2
return var_value[sampling_cells], sampling_line, sampling_cells
#-----------------------------------------------------------------------------------------------------------------------
[docs]def get_HF_analytical_solution(regime, variable, mat_prop, inj_prop, mesh=None, fluid_prop=None,
time_srs=None, length_srs=None, h=None, samp_cell=None, gamma=None):
if time_srs is None and length_srs is None:
raise ValueError('Either time series or lengths series is to be provided.')
if regime == 'E_K':
Kc_1 = mat_prop.Kc1
else:
Kc_1 = None
if regime == 'E_E':
Cij = mat_prop.Cij
else:
Cij = None
if regime == 'MDR':
density = fluid_prop.density
else:
density = None
if inj_prop.injectionRate.size > 2:
V0 = inj_prop.injectionRate[0, 1] * inj_prop.injectionRate[1, 0]
else:
V0 = None
if regime in ['M', 'MDR', 'Mt', 'PKN', 'Mp']:
if fluid_prop is None:
raise ValueError('Fluid properties required for ' + regime + ' type analytical solution')
muPrime = fluid_prop.muPrime
else:
muPrime = None
if samp_cell is None:
samp_cell = int(len(mat_prop.Kprime) / 2)
if time_srs is not None:
srs_length = len(time_srs)
else:
srs_length = len(length_srs)
mesh_list = []
return_list = []
for i in range(srs_length):
if length_srs is not None:
length = length_srs[i]
else:
length = None
if time_srs is not None:
time = time_srs[i]
else:
time = None
if variable in ['time', 't', 'width', 'w', 'net pressure', 'pn', 'front velocity', 'v']:
if mesh is None and variable in ['width', 'w', 'net pressure', 'pn']:
x_len, y_len = get_fracture_dimensions_analytical_with_properties(regime,
time_srs[i],
mat_prop,
inj_prop,
fluid_prop=fluid_prop,
h=h,
samp_cell=samp_cell,
gamma=gamma)
from mesh import CartesianMesh
mesh_i = CartesianMesh(x_len, y_len, 151, 151)
else:
mesh_i = mesh
t, r, p, w, v, actvElts = HF_analytical_sol(regime,
mesh_i,
mat_prop.Eprime,
inj_prop.injectionRate[1, 0],
inj_point=inj_prop.sourceCoordinates,
muPrime=muPrime,
Kprime=mat_prop.Kprime[samp_cell],
Cprime=mat_prop.Cprime[samp_cell],
length=length,
t=time,
Kc_1=Kc_1,
h=h,
density=density,
Cij=Cij,
gamma=gamma,
required=required_string[variable],
Vinj=V0)
mesh_list.append(mesh_i)
if variable == 'time' or variable == 't':
return_list.append(t)
elif variable == 'width' or variable == 'w':
return_list.append(w)
elif variable == 'net pressure' or variable == 'pn':
return_list.append(p)
elif variable == 'front velocity' or variable == 'v':
return_list.append(v)
elif variable in ['front_dist_min', 'd_min', 'front_dist_max', 'd_max', 'front_dist_mean', 'd_mean',
'radius', 'r']:
x_len, y_len = get_fracture_dimensions_analytical_with_properties(regime,
time,
mat_prop,
inj_prop,
fluid_prop=fluid_prop,
h=h,
samp_cell=samp_cell,
gamma=gamma)
if variable == 'radius' or variable == 'r':
return_list.append(x_len)
elif variable == 'front_dist_min' or variable == 'd_min':
return_list.append(y_len)
elif variable == 'front_dist_max' or variable == 'd_max':
return_list.append(x_len)
elif variable == 'front_dist_mean' or variable == 'd_mean':
if regime in ('E_K', 'E_E'):
raise ValueError('Mean distance not available.')
else:
return_list.append(x_len)
else:
raise ValueError('The variable type is not correct or the analytical solution not available. Select'
' one of the following:\n'
'-- \'r\' or \'radius\'\n'
'-- \'w\' or \'width\'\n'
'-- \'pn\' or \'net pressure\'\n'
'-- \'v\' or \'front velocity\'\n'
'-- \'d_min\' or \'front_dist_min\'\n'
'-- \'d_max\' or \'front_dist_max\'\n'
'-- \'d_mean\' or \'front_dist_mean\'\n' )
return return_list, mesh_list
#-----------------------------------------------------------------------------------------------------------------------
[docs]def get_HF_analytical_solution_at_point(regime, variable, point, mat_prop, inj_prop, fluid_prop=None, time_srs=None,
length_srs=None, h=None, samp_cell=None, gamma=None):
values_point = []
if time_srs is not None:
srs_length = len(time_srs)
else:
srs_length = len(length_srs)
from mesh import CartesianMesh
if point[0] == 0.:
mesh_Lx = 1.
else:
mesh_Lx = 2 * abs(point[0])
if point[1] == 0.:
mesh_Ly = 1.
else:
mesh_Ly = 2 * abs(point[1])
mesh = CartesianMesh(mesh_Lx, mesh_Ly, 5, 5)
for i in range(srs_length):
if time_srs is not None:
time = [time_srs[i]]
else:
time = None
if length_srs is not None:
length = [length_srs[i]]
else:
length = None
value_mesh, mesh_list = get_HF_analytical_solution(regime,
variable,
mat_prop,
inj_prop,
mesh=mesh,
fluid_prop=fluid_prop,
time_srs=time,
length_srs=length,
h=h,
samp_cell=samp_cell,
gamma=gamma)
if variable in ['front_dist_min', 'd_min', 'front_dist_max', 'd_max', 'front_dist_mean', 'd_mean',
'radius', 'r', 't', 'time']:
values_point.append(value_mesh[0])
elif point == [0., 0.]:
values_point.append(value_mesh[0][mesh_list[0].CenterElts])
else:
value_point = value_mesh[0][18]
values_point.append(value_point)
return values_point
#-----------------------------------------------------------------------------------------------------------------------
[docs]def get_fracture_dimensions_analytical_with_properties(regime, time_srs, mat_prop, inj_prop, fluid_prop=None,
h=None, samp_cell=None, gamma=None):
if regime == 'E_K':
Kc_1 = mat_prop.Kc1
else:
Kc_1 = None
if regime == 'MDR':
density = fluid_prop.density
else:
density = None
if regime in ('M', 'Mt', 'PKN', 'MDR', 'Mp', 'La'):
if fluid_prop is None:
raise ValueError('Fluid properties required to evaluate analytical solution')
muPrime = fluid_prop.muPrime
else:
muPrime = None
if samp_cell is None:
samp_cell = int(len(mat_prop.Kprime) / 2)
if inj_prop.injectionRate.size > 2:
V0 = inj_prop.injectionRate[0, 1] * inj_prop.injectionRate[1, 0]
else:
V0=None
Q0 = inj_prop.injectionRate[1, 0]
x_len, y_len = get_fracture_dimensions_analytical(regime,
np.max(time_srs),
mat_prop.Eprime,
Q0,
muPrime,
Kprime=mat_prop.Kprime[samp_cell],
Cprime=mat_prop.Cprime[samp_cell],
Kc_1=Kc_1,
h=h,
density=density,
gamma=gamma,
Vinj=V0)
return x_len, y_len
#-----------------------------------------------------------------------------------------------------------------------
[docs]def write_fracture_variable_csv_file(file_name, fracture_list, variable, point=None, edge=4):
""" This function writes fracture variable from each fracture in the list as a csv file. The variable from each of
the fracture in the list will saved in a row of the csv file. If a variable is bi-dimensional, a point can be
given at which the variable is to be saved.
Args:
file_name (string): -- the name of the file to be written.
fracture_list (list): -- the fracture list from which the variable is to be extracted.
variable (string): -- the variable to be saved. See :py:data:`supported_variables` of the
:py:mod:`Labels` module for a list of supported variables.
point (list or ndarray): -- the point in the mesh at which the given variable is saved [x, y]. If the
point is not given, the variable will be saved on the whole mesh.
edge (int): -- the edge of the cell that will be saved. This is for variables that
are evaluated on the cell edges instead of cell center. It can have a
value from 0 to 4 (0->left, 1->right, 2->bottom, 3->top, 4->average).
"""
log = logging.getLogger('PyFrac.write_fracture_variable_csv_file')
if variable not in supported_variables:
raise ValueError(err_msg_variable)
return_list = []
var_values, time_list = get_fracture_variable(fracture_list,
variable,
edge=edge,
return_time=True)
if point == None:
return_list = var_values
else:
for i in range(len(fracture_list)):
value_point = griddata(fracture_list[i].mesh.CenterCoor,
var_values[i],
point,
method='linear',
fill_value=np.nan)
if np.isnan(value_point):
log.warning('Point outside fracture.')
return_list.append(value_point[0])
if file_name[-4:] != '.csv':
file_name = file_name + '.csv'
return_list_np = np.asarray(return_list)
# np.savetxt(file_name, return_list_np, delimiter=',')
import csv
# write EltCrack file
file2 = open(file_name, 'w')
writer2 = csv.writer(file2)
writer2.writerows(return_list_np)
file2.close()
#-----------------------------------------------------------------------------------------------------------------------
[docs]def read_fracture_variable_csv_file(file_name):
""" This function returns the required variable from the csv file.
Args:
file_name (string): -- the name of the file to be written.
Returns:
- variable_list (list) -- a list containing the extracted variable from each of the fracture. The \
dimension and type of each member of the list depends upon the variable type.
"""
if file_name[-4:] != '.csv':
file_name = file_name + '.csv'
return np.genfromtxt(file_name, delimiter=',')
#-----------------------------------------------------------------------------------------------------------------------
[docs]def write_fracture_mesh_csv_file(file_name, mesh_list):
""" This function writes important data of a mesh as a csv file. The csv contains (in a row vector) the number of
elements, hx, hy, nx, ny, the flattened connectivity matrix and the flattened node coordinates. Each row of the csv
corresponds to an entry in the mesh_list
Args:
file_name (string): -- the name of the file to be written.
mesh_list (list): -- the fracture list from which the variable is to be extracted.
"""
return_list = []
for i in mesh_list:
export_mesh = np.array([i.NumberOfElts])
export_mesh = np.append(export_mesh, [i.hx, i.hy, i.nx, i.ny])
export_mesh = np.append(export_mesh, i.Connectivity.flatten())
export_mesh = np.append(export_mesh, i.VertexCoor.flatten())
return_list.append(export_mesh)
if file_name[-4:] != '.csv':
file_name = file_name + '.csv'
return_list_np = np.asarray(return_list)
# np.savetxt(file_name, return_list_np, delimiter=',')
import csv
# write EltCrack file
file2 = open(file_name, 'w')
writer2 = csv.writer(file2)
writer2.writerows(return_list_np)
file2.close()
#-----------------------------------------------------------------------------------------------------------------------
[docs]def append_to_json_file(file_name, content, action, key=None, delete_existing_filename=False):
""" This function appends data of a mesh as a json file.
Args:
file_name (string): -- the name of the file to be written.
key (string): -- a string that describes the information you are passing.
content (list): -- a list of some informations
action (string): -- action to take. Current options are:
'append2keyASnewlist'
This option means that you load the json file, you take the content of the key
and then you append the new content as a new list in a list of lists
if the existing content was not a list it will be put in a list
'append2keyAND2list'
This option means that you load the json file, you take the content of the key
and, supposing that it is a key, you append the new content to it
if the existing content was not a list it will be put in a list and the new content
will be appended to it
'dump_this_dictionary'
You will dump only the content of the dictionary
"""
log = logging.getLogger('PyFrac.append_to_json_file')
# 0)transform np.ndarray to list before output
if isinstance(content, np.ndarray):
content = np.ndarray.tolist(content)
# 1)check if the file_name is a Json file
if file_name[-5:] != '.json':
file_name = file_name + '.json'
# 3)check if the file already exist
if os.path.isfile(file_name) and delete_existing_filename:
os.remove(file_name)
log.warning("File " +file_name +" existed and it will be Removed!")
# 4)check if the file already exist
if os.path.isfile(file_name):
# The file exist
with open(file_name, "r+") as json_file:
data = json.load(json_file) # get the data
if action in ['append2keyASnewlist', 'append2keyAND2list'] and not key == None:
if key in data: # the key exist and we need just to add the value
if isinstance(data[key], list): # the data that is already there is a list and a key is provided
data[key].append(content)
elif action == 'append2keyAND2list':
data[key] = [data[key], content]
elif action == 'append2keyASnewlist':
data[key] = [[data[key]], [content]]
else:
if action == 'append2keyAND2list':
to_write = {key: content,}
elif action == 'append2keyASnewlist':
to_write = {key: [content],}
data.update(to_write)
json_file.seek(0)
return json.dump(data, json_file) # dump directly the content referenced by the key to the file
elif action == 'dump_this_dictionary':
return json.dump(content, json_file) # dump directly the dictionary to the file
elif action == 'extend_dictionary':
if isinstance(content, dict):
data.update(content)
json_file.seek(0) # return to the beginning of the file
return json.dump(data, json_file) # dump directly the dictionary to the file
else: raise SystemExit('DUMP TO JSON ERROR: You should provide a dictionary')
else: raise SystemExit('DUMP TO JSON ERROR: action not supported OR key not provided')
json_file.seek(0) # return to the beginning of the file
return json.dump(data, json_file) # dump the updated data
else:
# The file do not exist, create a new one
with open(file_name, "w") as json_file:
if action == 'append2keyAND2list' or action == 'append2keyASnewlist':
to_write = {key: content,}
return json.dump(to_write, json_file) # dump directly the content referenced by the key to the file
elif action == 'dump_this_dictionary':
return json.dump(content, json_file) # dump directly the dictionary to the file
else: raise SystemExit('DUMP TO JSON ERROR: action not supported')
#-----------------------------------------------------------------------------------------------------------------------
[docs]def get_extremities_cells(Fr_list):
"""
This function returns the extreme points for each of the fracture in the list.
Args:
Fr_list (list): -- the fracture list
Returns:
extremeties -- the [left, rigth, bottom, top] extremeties of the each of the fracture in the list.
"""
extremities = np.zeros((len(Fr_list), 4), dtype=int)
for indx, fracture in enumerate(Fr_list):
max_intrsct1_x = np.argmax(fracture.Ffront[:, 0])
max_intrsct2_x = np.argmax(fracture.Ffront[:, 2])
if fracture.Ffront[max_intrsct1_x, 0] > fracture.Ffront[max_intrsct2_x, 2]:
extremities[indx, 1] = fracture.EltTip[max_intrsct1_x]
else:
extremities[indx, 1] = fracture.EltTip[max_intrsct2_x]
min_intrsct1_x = np.argmin(fracture.Ffront[:, 0])
min_intrsct2_x = np.argmin(fracture.Ffront[:, 2])
if fracture.Ffront[min_intrsct1_x, 0] < fracture.Ffront[min_intrsct2_x, 2]:
extremities[indx, 0] = fracture.EltTip[min_intrsct1_x]
else:
extremities[indx, 0] = fracture.EltTip[min_intrsct2_x]
max_intrsct1_y = np.argmax(fracture.Ffront[:, 1])
max_intrsct2_y = np.argmax(fracture.Ffront[:, 3])
if fracture.Ffront[max_intrsct1_y, 1] > fracture.Ffront[max_intrsct2_y, 3]:
extremities[indx, 3] = fracture.EltTip[max_intrsct1_y]
else:
extremities[indx, 3] = fracture.EltTip[max_intrsct2_y]
min_intrsct1_y = np.argmin(fracture.Ffront[:, 1])
min_intrsct2_y = np.argmin(fracture.Ffront[:, 3])
if fracture.Ffront[min_intrsct1_x, 1] < fracture.Ffront[min_intrsct2_x, 3]:
extremities[indx, 2] = fracture.EltTip[min_intrsct1_y]
else:
extremities[indx, 2] = fracture.EltTip[min_intrsct2_y]
return extremities
#-----------------------------------------------------------------------------------------------------------------------
[docs]def get_front_intercepts(fr_list, point):
"""
This function returns the top, bottom, left and right intercepts on the front of the horizontal and vertical lines
drawn from the given point.
Arguments:
fr_list (list): -- the given fracture list.
point (list or ndarray) -- the point from the horizontal and vertical lines are drawn.
Returns:
intercepts (list): -- list of top, bottom, left and right intercepts for each fracture in the list
"""
log = logging.getLogger('PyFrac.get_front_intercepts')
intercepts = []
for fr in fr_list:
intrcp_top = intrcp_btm = intrcp_lft = intrcp_rgt = [np.nan] # set to nan if not available
pnt_cell = fr.mesh.locate_element(point[0], point[1]) # the cell in which the given point lie
if pnt_cell not in fr.EltChannel:
log.warning("Point is not inside fracture!")
else:
pnt_cell_y = fr.mesh.CenterCoor[pnt_cell, 1] # the y coordinate of the cell
cells_x_axis = np.where(fr.mesh.CenterCoor[:, 1] == pnt_cell_y)[0] # all the cells with the same y coord
tipCells_x_axis = np.intersect1d(fr.EltTip, cells_x_axis) # the tip cells with the same y coord
# the code bellow remove the tip cells which are directly at right and left of the cell containing the point
# but have the front line partially passing through them. For them, the horizontal line drawn from the given
# point will pass through the cell but not from the front line.
if len(tipCells_x_axis) > 2:
invalid_cell = np.full(len(tipCells_x_axis), True, dtype=bool)
for indx, cell in enumerate(tipCells_x_axis):
in_tip_cells = np.where(fr.EltTip == cell)[0]
if (point[1] > fr.Ffront[in_tip_cells, 1] and point[1] <= fr.Ffront[in_tip_cells, 3]) or (
point[1] < fr.Ffront[in_tip_cells, 1] and point[1] >= fr.Ffront[in_tip_cells, 3]):
invalid_cell[indx] = False
tipCells_x_axis = np.delete(tipCells_x_axis, np.where(invalid_cell)[0])
# find out the left and right cells
if len(tipCells_x_axis) == 2:
if fr.mesh.CenterCoor[tipCells_x_axis[0], 0] < point[0]:
lft_cell = tipCells_x_axis[0]
rgt_cell = tipCells_x_axis[1]
else:
lft_cell = tipCells_x_axis[1]
rgt_cell = tipCells_x_axis[0]
else:
lft_cell = np.nan
rgt_cell = np.nan
pnt_cell_x = fr.mesh.CenterCoor[pnt_cell, 0]
cells_y_axis = np.where(fr.mesh.CenterCoor[:, 0] == pnt_cell_x)[0]
tipCells_y_axis = np.intersect1d(fr.EltTip, cells_y_axis)
# the code bellow remove the tip cells which are directly at top and bottom of the cell containing the point
# but have the front line partially passing through them. For them, the vertical line drawn from the given
# point will pass through the cell but not from the front line.
if len(tipCells_y_axis) > 2:
invalid_cell = np.full(len(tipCells_y_axis), True, dtype=bool)
for indx, cell in enumerate(tipCells_y_axis):
in_tip_cells = np.where(fr.EltTip == cell)[0]
if (point[0] > fr.Ffront[in_tip_cells, 0] and point[0] <= fr.Ffront[in_tip_cells, 2]) or (
point[0] < fr.Ffront[in_tip_cells, 0] and point[0] >= fr.Ffront[in_tip_cells, 2]):
invalid_cell[indx] = False
tipCells_y_axis = np.delete(tipCells_y_axis, np.where(invalid_cell)[0])
if len(tipCells_y_axis) == 2:
if fr.mesh.CenterCoor[tipCells_y_axis[0], 1] < point[1]:
btm_cell = tipCells_y_axis[0]
top_cell = tipCells_y_axis[1]
else:
btm_cell = tipCells_y_axis[1]
top_cell = tipCells_y_axis[0]
else:
btm_cell = np.nan
top_cell = np.nan
top_in_tip = np.where(fr.EltTip == top_cell)[0]
btm_in_tip = np.where(fr.EltTip == btm_cell)[0]
lft_in_tip = np.where(fr.EltTip == lft_cell)[0]
rgt_in_tip = np.where(fr.EltTip == rgt_cell)[0]
# find the intersection using the equations of the front lines in the tip cells
if top_in_tip.size > 0:
intrcp_top = fr.Ffront[top_in_tip, 3] + \
(fr.Ffront[top_in_tip, 3] - fr.Ffront[top_in_tip, 1]) / (
fr.Ffront[top_in_tip, 2] - fr.Ffront[top_in_tip, 0]) * \
(point[0] - fr.Ffront[top_in_tip, 2])
if btm_in_tip.size > 0:
intrcp_btm = fr.Ffront[btm_in_tip, 3] + \
(fr.Ffront[btm_in_tip, 3] - fr.Ffront[btm_in_tip, 1]) / (
fr.Ffront[btm_in_tip, 2] - fr.Ffront[btm_in_tip, 0]) * \
(point[0] - fr.Ffront[btm_in_tip, 2])
if lft_in_tip.size > 0:
intrcp_lft = (point[1] - fr.Ffront[lft_in_tip, 3]) / \
(fr.Ffront[lft_in_tip, 3] - fr.Ffront[lft_in_tip, 1]) * (
fr.Ffront[lft_in_tip, 2] - fr.Ffront[lft_in_tip, 0]) + \
fr.Ffront[lft_in_tip, 2]
if rgt_in_tip.size > 0:
intrcp_rgt = (point[1] - fr.Ffront[rgt_in_tip, 3]) / \
(fr.Ffront[rgt_in_tip, 3] - fr.Ffront[rgt_in_tip, 1]) * (
fr.Ffront[rgt_in_tip, 2] - fr.Ffront[rgt_in_tip, 0]) + \
fr.Ffront[rgt_in_tip, 2]
intercepts.append([intrcp_top[0], intrcp_btm[0], intrcp_lft[0], intrcp_rgt[0]])
return intercepts
#-----------------------------------------------------------------------------------------------------------------------
[docs]def write_properties_csv_file(file_name, properties):
""" This function writes the properties of a simulatio as a csv file. The csv contains (in a row vector) Eprime, K1c
, Cl, mu, rho_f, Q and t_inj
Args:
file_name (string): -- the name of the file to be written.
properties (tuple): -- the properties of the fracture loaded
"""
if len(properties[2].injectionRate[0]) > 1:
output_list = [None] * 7
else:
output_list = [None] * 6
output_list[0] = properties[0].Eprime
output_list[1] = properties[0].K1c[0]
output_list[2] = properties[0].Cl
output_list[3] = properties[1].viscosity
output_list[4] = properties[1].density
if len(properties[2].injectionRate[0]) > 1:
output_list[5] = properties[2].injectionRate[1][0]
output_list[6] = properties[2].injectionRate[0][1]
else:
output_list[5] = properties[2].injectionRate[1][0]
if file_name[-4:] != '.csv':
file_name = file_name + '.csv'
output_list_np = np.asarray(output_list)
np.savetxt(file_name, output_list_np, delimiter=',')
#-----------------------------------------------------------------------------------------------------------------------
[docs]def get_fracture_geometric_parameters(fr_list, properties):
max_breadth = np.full((len(fr_list), 1), np.nan)
avg_breadth = np.full((len(fr_list), 1), np.nan)
height = np.full((len(fr_list), 1), np.nan)
iter = 0
for jk in fr_list:
left, right = get_Ffront_as_vector(jk, jk.mesh.CenterCoor[jk.source[0], ::])[1:]
if left.shape[0] == right.shape[0]:
breadth = np.vstack((np.abs(left - right)[::, 0], left[::, 1]))
else:
breadth = np.vstack((np.abs(left[:np.min((left.shape[0], right.shape[0])), :] -
right[:np.min((left.shape[0], right.shape[0])), :])[:, 0],
np.vstack((left[:np.min((left.shape[0], right.shape[0])), 1],
right[:np.min((left.shape[0], right.shape[0])), 1]))
[np.argmin((left.shape[0], right.shape[0])), :]))
max_breadth[iter] = np.max(breadth[0, ::])
avg_breadth[iter] = np.mean(breadth[0, ::])
height[iter] = np.abs(np.max(np.hstack((jk.Ffront[::, 1], jk.Ffront[::, 3]))) -
np.min(np.hstack((jk.Ffront[::, 1], jk.Ffront[::, 3]))))
iter = iter + 1
return height.flatten().flatten(), max_breadth.flatten().flatten(), avg_breadth.flatten().flatten()
#-----------------------------------------------------------------------------------------------------------------------
[docs]def get_Ffront_as_vector(frac, inj_p):
mask13 = np.all(np.asarray([frac.Ffront[::, 0] <= inj_p[0], frac.Ffront[::, 1] <= inj_p[1]]), axis=0)
mask24 = np.all(np.asarray([frac.Ffront[::, 2] <= inj_p[0], frac.Ffront[::, 3] <= inj_p[1]]), axis=0)
lowLef = np.concatenate((frac.Ffront[mask13, :2:], frac.Ffront[mask24, 2::]), axis=0)
lowLef = lowLef[np.flip(lowLef[:, 1].argsort()), ::]
mask13 = np.all(np.asarray([frac.Ffront[::, 0] >= inj_p[0], frac.Ffront[::, 1] <= inj_p[1]]), axis=0)
mask24 = np.all(np.asarray([frac.Ffront[::, 2] >= inj_p[0], frac.Ffront[::, 3] <= inj_p[1]]), axis=0)
lowRig = np.concatenate((frac.Ffront[mask13, :2:], frac.Ffront[mask24, 2::]), axis=0)
lowRig = lowRig[lowRig[:, 1].argsort(), ::]
mask13 = np.all(np.asarray([frac.Ffront[::, 0] <= inj_p[0], frac.Ffront[::, 1] >= inj_p[1]]), axis=0)
mask24 = np.all(np.asarray([frac.Ffront[::, 2] <= inj_p[0], frac.Ffront[::, 3] >= inj_p[1]]), axis=0)
upLef = np.concatenate((frac.Ffront[mask13, :2:], frac.Ffront[mask24, 2::]), axis=0)
upLef = upLef[np.flip(upLef[:, 1].argsort()), ::]
mask13 = np.all(np.asarray([frac.Ffront[::, 0] >= inj_p[0], frac.Ffront[::, 1] >= inj_p[1]]), axis=0)
mask24 = np.all(np.asarray([frac.Ffront[::, 2] >= inj_p[0], frac.Ffront[::, 3] >= inj_p[1]]), axis=0)
upRig = np.concatenate((frac.Ffront[mask13, :2:], frac.Ffront[mask24, 2::]), axis=0)
upRig = upRig[upRig[:, 1].argsort(), ::]
if len(lowLef) != 0:
Ffront = np.concatenate((lowLef, lowRig, upRig, upLef, np.asarray([lowLef[0, :]])), axis=0)
else:
Ffront = np.concatenate((upRig, upLef, np.asarray([upRig[0, :]])), axis=0)
left = np.concatenate((lowLef, upLef), axis=0)
left = np.unique(left, axis=0)[np.unique(left, axis=0)[::, 1].argsort(), ::]
right = np.concatenate((lowRig, upRig), axis=0)
right = np.unique(right, axis=0)[np.unique(right, axis=0)[::, 1].argsort(), ::]
return Ffront, left, right
#-----------------------------------------------------------------------------------------------------------------------
[docs]def get_fracture_fp(fr_list, properties):
fp_list = []
iter = 0
for jk in fr_list:
fp_list.append(get_Ffront_as_vector(jk, jk.mesh.CenterCoor[jk.source[0], ::])[0])
iter = iter + 1
return fp_list
#-----------------------------------------------------------------------------------------------------------------------
from elastohydrodynamic_solver import calculate_fluid_flow_characteristics_laminar
[docs]def get_velocity_as_vector(Solid, Fluid, Fr_list): #CP 2020
"""This function gets the velocity components of the fluid flux for a given list of fractures
:param Solid: Instance of the class MaterialProperties - see related documentation
:param Fluid: Instance of the class FluidProperties - see related documentation
:param Fr_list: List of Instances of the class Fracture - see related documentation
:return: List containing a matrix with the information about the fluid velocity for each of the edges of any mesh element,
List of time stations
"""
fluid_vel_list = []
time_srs = []
for i in Fr_list:
fluid_flux, \
fluid_vel, \
Rey_num, \
fluid_flux_components, \
fluid_vel_components = calculate_fluid_flow_characteristics_laminar(i.w,
i.pFluid,
Solid.SigmaO,
i.mesh,
i.EltCrack,
i.InCrack,
Fluid.muPrime,
Fluid.density)
# fluid_vel_components_for_one_elem = [fx left edge, fy left edge, fx right edge, fy right edge, fx bottom edge, fy bottom edge, fx top edge, fy top edge]
#
# 6 7
# (ux,uy)
# o---top edge---o
# 0 1 | | 2 3
# (ux,uy)left right(ux,uy)
# | |
# o-bottom edge--o
# (ux,uy)
# 4 5
#
fluid_vel_list.append(fluid_vel_components)
time_srs.append(i.time)
return fluid_vel_list, time_srs
#-----------------------------------------------------------------------------------------------------------------------
[docs]def get_velocity_slice(Solid, Fluid, Fr_list, initial_point, vel_direction = 'ux',orientation='horizontal'): #CP 2020
"""
This function returns, at each time station, the velocity component in x or y direction along a horizontal or vertical section passing
through a given point.
WARNING: ASSUMING NO MESH COARSENING OR REMESHING WITH DOMAIN COMPRESSION
:param Solid: Instance of the class MaterialProperties - see related documentation
:param Fluid: Instance of the class FluidProperties - see related documentation
:param Fr_list: List of Instances of the class Fracture - see related documentation
:param initial_point: coordinates of the point where to draw the slice
:param vel_direction: component of the velocity vector, it can be 'ux' or 'uy'
:param orientation: it can be 'horizontal' or 'vertical'
:return: set of velocities
set of times
set of points along the slice, where the velocity is given
"""
# initial_point - of the slice
fluid_vel_list, time_srs = get_velocity_as_vector(Solid, Fluid, Fr_list)
nOFtimes = len(time_srs)
# fluid_vel_list is a list and each entry contains a matrix with the information about the fluid velocity for each of the edges of any mesh element
# fluid_vel_components_for_one_elem = [fx left edge, fy left edge, fx right edge, fy right edge, fx bottom edge, fy bottom edge, fx top edge, fy top edge]
#
# 6 7
# (ux,uy)
# o---top edge---o
# 0 1 | | 2 3
# (ux,uy)left right(ux,uy)
# | |
# o-bottom edge--o
# (ux,uy)
# 4 5
#
list_of_sampling_lines = []
list_of_fluid_vel_lists = []
for i in range(nOFtimes): #each fr has its own mesh
# 1) get the coordinates of the points in the slices
vector_to_be_lost = np.zeros(Fr_list[i].mesh.NumberOfElts,dtype=np.int)
NotUsd_var_values, sampling_line_center, sampling_cells = get_fracture_variable_slice_cell_center(vector_to_be_lost,
Fr_list[i].mesh,
point = initial_point,
orientation = orientation)
hx = Fr_list[i].mesh.hx # element horizontal size
hy = Fr_list[i].mesh.hy # element vertical size
# get the coordinates along the slice where you are getting the values
if vel_direction == 'ux' and orientation == 'horizontal': # take ux on the vertical edges
indx1 = 0 #left
indx2 = 2 #right
sampling_line_center1=sampling_line_center-hx*.5
sampling_line_center2=sampling_line_center+hx*.5
elif vel_direction == 'ux' and orientation == 'vertical': # take ux on the horizontal edges
indx1 = 4 #bottom
indx2 = 6 #top
sampling_line_center1=sampling_line_center-hy*.5
sampling_line_center2=sampling_line_center+hy*.5
elif vel_direction == 'uy' and orientation == 'horizontal': # take uy on the vertical edges
indx1 = 1 #left
indx2 = 3 #rigt
sampling_line_center1=sampling_line_center-hx*.5
sampling_line_center2=sampling_line_center+hx*.5
elif vel_direction == 'uy' and orientation == 'vertical': # take uy on the horizontal edges
indx1 = 5 #bottom
indx2 = 7 #top
sampling_line_center1=sampling_line_center-hy*.5
sampling_line_center2=sampling_line_center+hy*.5
#combining the two list of locations where I get the velocity
sampling_line = [None] * (len(sampling_line_center1) + len(sampling_line_center2))
sampling_line[::2] = sampling_line_center1
sampling_line[1::2] = sampling_line_center2
list_of_sampling_lines.append(sampling_line)
# 2) get the velocity values
EltCrack_i = Fr_list[i].EltCrack
fluid_vel_list_i = fluid_vel_list[i]
vector_to_be_lost1 = np.zeros(Fr_list[i].mesh.NumberOfElts, dtype=np.float)
vector_to_be_lost1[EltCrack_i] = fluid_vel_list_i[indx1,:]
vector_to_be_lost2 = np.zeros(Fr_list[i].mesh.NumberOfElts, dtype=np.float)
vector_to_be_lost2[EltCrack_i] = fluid_vel_list_i[indx2,:]
fluid_vel_list_final_i = [None] * (len(vector_to_be_lost1[sampling_cells]) + len(vector_to_be_lost2[sampling_cells]))
fluid_vel_list_final_i[::2] = vector_to_be_lost1[sampling_cells]
fluid_vel_list_final_i[1::2] = vector_to_be_lost2[sampling_cells]
list_of_fluid_vel_lists.append(fluid_vel_list_final_i)
return list_of_fluid_vel_lists, time_srs, list_of_sampling_lines