Source code for kalfeat.tools.exmath

# -*- coding: utf-8 -*-
#   author: KLaurent <etanoyau@gmail.com>
#   Licence:  GPL-3.0 

from __future__ import annotations 

import copy 
import inspect 
import warnings 

from scipy.signal import argrelextrema 
import scipy.integrate as integrate
from scipy.optimize import curve_fit
import numpy as np
import pandas as pd 
import  matplotlib.pyplot as plt
 
from .._kalfeatlog import kalfeatlog
from ..documentation import __doc__
from ..decorators import ( 
    deprecated, 
    refAppender, 
    docSanitizer
)
from .. import exceptions as Wex 
from ..property import P
from ..typing import (
    T, 
    F,
    List, 
    Tuple,
    Union,
    Array,
    DType,
    Optional,
    Sub, 
    SP, 
    Series, 
    DataFrame,
)

from .funcutils import (
    _assert_all_types, 
    smart_format,
                         
)
_logger =kalfeatlog.get_kalfeat_logger(__name__)

_msg= ''.join([
    'Note: need scipy version 0.14.0 or higher or interpolation,',
    ' might not work.']
)
_msg0 = ''.join([
    'Could not find scipy.interpolate, cannot use method interpolate'
     'check installation you can get scipy from scipy.org.']
)

try:
    import scipy
    scipy_version = [int(ss) for ss in scipy.__version__.split('.')]
    if scipy_version[0] == 0:
        if scipy_version[1] < 14:
            warnings.warn(_msg, ImportWarning)
            _logger.warning(_msg)
            
    import scipy.interpolate as spi

    interp_import = True
 # pragma: no cover
except ImportError: 
    
    warnings.warn(_msg0)
    _logger.warning(_msg0)
    
    interp_import = False
    

[docs]def dummy_basement_curve( func: F , ks: float , slope: float | int = 45, )-> Tuple[F, float]: """ Compute the pseudodepth from the search zone. :param f: callable - Polyfit1D function :param mz: array-zone - Expected Zone for groundwater search :param ks: float - The depth from which the expected fracture zone must starting looking for groundwater. :param slope: float - Degree angle for slope in linear function of the dummy curve :returns: - lambda function of basement curve `func45` - beta is intercept value compute for keysearch `ks` """ # Use kesearch (ks) to compute the beta value from the function f beta = func(ks) # note that 45 degree is used as the slope of the # imaginary basement curve # fdummy (x) = slope (45degree) * x + intercept (beta) slope = np.sin(np.deg2rad(slope)) func45 = lambda x: slope * x + beta return func45, beta
[docs]def find_limit_for_integration( ix_arr: Array[DType[int]], b0: List[T] =[] )-> List[T]: r""" Use the roots between f curve and basement curves to detect the limit of integration. :param ix_arr: array-like - Indexes array from masked array where the value are true i.e. where :math:` b-f > 0 \Rightarrow b> f` . :param b0: list - Empy list to hold the limit during entire loop .. note:: :math:`b > f \Longrightarrow` Curve b (basement) is above the fitting curve :math:`f` . :math:`b < f` otherwise. The pseudoarea is the area where :math:` b > f` . :return: list - integration bounds """ s = ix_arr.min() - 1 # 0 -1 =-1 oc = ix_arr.min() for jj, v in enumerate(ix_arr): s = v - s if s !=1: b0.append(oc); b0.append(ix_arr[jj-1]) oc= v s= v if v ==ix_arr[-1]: b0.append(oc); b0.append(v) return b0
[docs]def find_bound_for_integration( ix_arr: Array[DType[int]], b0: List[T] =[] )-> List[T]: r""" Recursive function. Use the roots between f curve and basement curves to detect the integration bounds. The function use entirely numpy for seaching integration bound. Since it is much faster than `find_limit_for_integration` although both did the same tasks. :param ix_arr: array-like - Indexes array from masked array where the value are true i.e. where :math:`b-f > 0 \Rightarrow b > f` . :param b0: list - Empy list to hold the limit during entire loop :return: list - integration bounds .. note:: :math:`b > f \Longrightarrow` Curve b (basement) is above the fitting curve :math:`f` . :math:`b < f` otherwise. The pseudoarea is the area where :math:`b > f` . """ # get the first index and arange this thin the end psdiff = np.arange(ix_arr.min(), len(ix_arr) + ix_arr.min(), 1) # make the difference to find the zeros values diff = ix_arr - psdiff index, = np.where(diff ==0) ; # take the min index and max index b0.append(min(ix_arr[index])) b0.append(max(ix_arr[index])) #now take the max index and add +1 and start by this part # retreived the values array_init = ix_arr[int(max(index)) +1:] return b0 if len( array_init)==0 else find_bound_for_integration(array_init, b0)
[docs]def fitfunc( x: Array[T], y: Array[T], deg: float | int =None, sample: int =1000 )-> Tuple[F, Array[T]]: """ Create polyfit function from a specifc sample data points. :param x: array-like of x-axis. :param y: array-like of y-axis. :param deg: polynomial degree. If ``None`` should compute using the length of extrema (local + global). :param sample: int - Number of data points should use for fitting function. Default is ``1000``. :returns: - Polynomial function `f` - new axis `x_new` generated from the samples. - projected sample values got from `f`. """ # generate a sample of values to cover the fit function # thus compute ynew (yn) from the poly function f minl, = argrelextrema(y, np.less) maxl, = argrelextrema(y,np.greater) # get the number of degrees degree = len(minl) + len(maxl) coeff = np.polyfit(x, y, deg if deg is not None else degree + 1 ) f = np.poly1d(coeff) xn = np.linspace(min(x), max(x), sample) yp = f(xn) return f, xn, yp
[docs]def vesDataOperator( AB : Array = None, rhoa: Array= None , data: DataFrame =None, typeofop: str = None, outdf: bool = False, )-> Tuple[Array] | DataFrame : """ Check the data in the given deep measurement and set the suitable operations for duplicated spacing distance of current electrodes `AB`. Sometimes at the potential electrodes (`MN`), the measurement of `AB` are collected twice after modifying the distance of `MN` a bit. At this point, two or many resistivity values are targetted to the same distance `AB` (`AB` still remains unchangeable while while `MN` is changed). So the operation consists whether to average (``mean``) the resistiviy values or to take the ``median`` values or to ``leaveOneOut`` (i.e. keep one value of resistivity among the different values collected at the same point`AB`) at the same spacing `AB`. Note that for the `LeaveOneOut``, the selected resistivity value is randomly chosen. :param AB: array-like - Spacing of the current electrodes when exploring in deeper. Units are in meters. :param rhoa: array-like - Apparent resistivity values collected in imaging in depth. Units are in :math:`\Omega {.m}` not :math:`log10(\Omega {.m})` :param data: DataFrame - It is composed of spacing values `AB` and the apparent resistivity values `rhoa`. If `data` is given, params `AB` and `rhoa` should be kept to ``None``. :param typeofop: str - Type of operation to apply to the resistivity values `rhoa` of the duplicated spacing points `AB`. The default operation is ``mean``. :param outdf: bool - Outpout a new dataframe composed of `AB` and `rhoa` data renewed. :returns: - Tuple of (AB, rhoa): New values computed from `typeofop` - DataFrame: New dataframe outputed only if ``outdf`` is ``True``. :note: By convention `AB` and `MN` are half-space dipole length which correspond to `AB/2` and `MN/2` respectively. :Example: >>> from kalfeat.tools.exmath import vesDataOperator >>> from kalfeat.tools.coreutils import vesSelector >>> data = vesSelector (f= 'data/ves/ves_gbalo.xlsx') >>> len(data) ... (32, 3) # include the potentiel electrode values `MN` >>> df= vesDataOperator(data.AB, data.resistivity, typeofop='leaveOneOut', outdf =True) >>> df.shape ... (26, 2) # exclude `MN` values and reduce(-6) the duplicated values. """ op = copy.deepcopy(typeofop) typeofop= str(typeofop).lower() if typeofop not in ('none', 'mean', 'median', 'leaveoneout'): raise ValueError( f'Unacceptable argument {op!r}. Use one of the following ' f'argument {smart_format([None,"mean", "median", "leaveOneOut"])}' ' instead.') typeofop ='mean' if typeofop =='none' else typeofop if data is not None: data = _assert_all_types(data, pd.DataFrame) rhoa = np.array(data.resistivity ) AB= np.array(data.AB) AB= np.array( _assert_all_types( AB, np.ndarray, list, tuple, pd.Series)) rhoa = np.array( _assert_all_types( rhoa, np.ndarray, list, tuple, pd.Series)) if len(AB)!= len(rhoa): raise Wex.VESError( 'Deep measurement `AB` must have the same size with ' ' the collected apparent resistivity `rhoa`.' f' {len(AB)} and {len(rhoa)} were given.') #----> When exploring in deeper, after changing the distance # of MN , measure are repeated at the same points. So, we will # selected these points and take the mean values of tyhe resistivity # make copies AB_ = AB.copy() ; rhoa_= rhoa.copy() # find the duplicated values # with np.errstate(all='ignore'): mask = np.zeros_like (AB_, dtype =bool) mask[np.unique(AB_, return_index =True)[1]]=True dup_values = AB_[~mask] indexes, = np.where(AB_==dup_values) #make a copy of unique values and filled the duplicated # values by their corresponding mean resistivity values X, rindex = np.unique (AB_, return_index=True); Y = rhoa_[rindex] d0= np.zeros_like(dup_values) for ii, d in enumerate(dup_values): index, = np.where (AB_==d) if typeofop =='mean': d0[ii] = rhoa_[index].mean() elif typeofop =='median': d0[ii] = np.median(rhoa_[index]) elif typeofop =='leaveoneout': d0[ii] = np.random.permutation(rhoa_[index])[0] maskr = np.isin(X, dup_values, assume_unique=True) Y[maskr] = d0 return (X, Y) if not outdf else pd.DataFrame ( {'AB': X,'resistivity':Y}, index =range(len(X)))
# XXXTODO
[docs]def invertVES (data: DataFrame[DType[float|int]] = None, rho0: float = None , h0 : float = None, typeof : str = 'HMCMC', **kwd)->Tuple [Array]: """ Invert the |VES| data collected in the exporation area. :param data: Dataframe pandas - contains the depth measurement AB from current electrodes, the potentials electrodes MN and the collected apparents resistivities. :param rho0: float - Value of the starting resistivity model. If ``None``, `rho0` should be the half minumm value of the apparent resistivity collected. Units is in Ω.m not log10(Ω.m) :param h0: float - Thickness in meter of the first layers in meters. If ``None``, it should be the minimum thickess as possible ``1.`` m. :param typeof: str - Type of inversion scheme. The defaut is Hybrid Monte Carlo (HMC) known as ``HMCMC`` . Another scheme is Bayesian neural network approach (``BNN``). :param kws: dict - Additionnal keywords arguments from |VES| data operations. See :func:`kalfeat.utils.exmath.vesDataOperator` for futher details. """ X, Y = vesDataOperator(data =data, **kwd) pass
[docs]def ohmicArea( data: DataFrame[DType[float|int]] = None, ohmSkey: float = 45., sum : bool = False, objective: str = 'ohmS', **kws ) -> float: r""" Compute the ohmic-area from the |VES| data collected in exploration area. Parameters ----------- * data: Dataframe pandas - contains the depth measurement AB from current electrodes, the potentials electrodes MN and the collected apparents resistivities. * ohmSkey: float - The depth in meters from which one expects to find a fracture zone outside of pollutions. Indeed, the `ohmSkey` parameter is used to speculate about the expected groundwater in the fractured rocks under the average level of water inrush in a specific area. For instance in `Bagoue region`_ , the average depth of water inrush is around ``45m``. So the `ohmSkey` can be specified via the water inrush average value. * objective: str - Type operation to outputs. By default, the function outputs the value of pseudo-area in :math:`$ \Omega .m^2 $`. However, for plotting purpose by setting the argument to ``view``, its gives an alternatively outputs of X and Y, recomputed and projected as weel as the X and Y values of the expected fractured zone. Where X is the AB dipole spacing when imaging to the depth and Y is the apparent resistivity computed kws: dict - Additionnal keywords arguments from |VES| data operations. See :func:`kalfeat.tools.exmath.vesDataOperator` for futher details. Returns -------- List of twice tuples: - Tuple(ohmS, error, roots): - `ohmS`is the pseudo-area computed expected to be a fractured zone - `error` is the integration error - `roots` is the integration boundaries of the expected fractured zone where the basement rocks is located above the resistivity transform function. At these points both curves values equal to null. - Tuple `(XY, fit XY,XYohmSarea)`: - `XY` is the ndarray(nvalues, 2) of the operated of `AB` dipole spacing and resistivity `rhoa` values. - `fit XY` is the fitting ndarray(nvalues, 2) uses to redraw the dummy resistivity transform function. - `XYohmSarea` is `ndarray(nvalues, 2)` of the dipole spacing and resistiviy values of the expected fracture zone. Raises ------- VESError If the `ohmSkey` is greater or equal to the maximum investigation depth in meters. Examples --------- >>> from kalfeat.tools.exmath import ohmicArea >>> from kalfeat.tools.coreutils import vesSelector >>> data = vesSelector (f= 'data/ves/ves_gbalo.xlsx') >>> (ohmS, err, roots), *_ = ohmicArea(data = data, ohmSkey =45, sum =True ) ... (13.46012197818152, array([5.8131967e-12]), array([45. , 98.07307307])) # pseudo-area is computed between the spacing point AB =[45, 98] depth. >>> _, (XY.shape, XYfit.shape, XYohms_area.shape) = ohmicArea( AB= data.AB, rhoa =data.resistivity, ohmSkey =45, objective ='plot') ... ((26, 2), (1000, 2), (8, 2)) See also --------- The `ohmS` value calculated from `pseudo-area` is a fully data-driven parameter and is used to evaluate a pseudo-area of the fracture zone from the depth where the basement rock is supposed to start. Usually, when exploring deeper using the |VES|, we are looking for groundwater in thefractured rock that is outside the anthropic pollution (Biemi, 1992). Since the VES is an indirect method, we cannot ascertain whether the presumed fractured rock contains water inside. However, we assume that the fracture zone could exist and should contain groundwater. Mathematically, based on the VES1D model proposed by `Koefoed, O. (1976)`_ , we consider a function :math:`$ \rho_T(l)$`, a set of reducing resistivity transform function to lower the boundary plane at half the current electrode spacing :math:`$(l)$`. From the sounding curve :math:`$\rho_T(l)$`, curve an imaginary basement rock :math:`$b_r (l)$` of slope equal to ``45°`` with the horizontal :math:`$h(l)$` was created. A pseudo-area :math:`$S(l)$` should be defined by extending from :math:`$h(l)$` the :math:`$b_r (l)$` curve when the sounding curve :math:`$\rho_T(l)$` is below :math:`$b_r(l)$`, otherwise :math:`$S(l)$` is equal to null. The computed area is called the ohmic-area :math:`$ohmS$` expressed in :math:`$\Omega .m^2$` and constitutes the expected *fractured zone*. Thus :math:`$ohmS$` ≠ :math:`0` confirms the existence of the fracture zone while of :math:`$Ohms=0$` raises doubts. The equation to determine the parameter is given as: .. math:: ohmS & = &\int_{ l_i}^{l_{i+1}} S(l)dl \quad {s.t.} S(l) & = & b_r (l) - \rho_T (l) \quad \text{if} \quad b_r (l) > \rho_T (l) \\ & = & 0. \quad \text{if} \quad b_r (l) \leq \rho_T (l) b_r(l) & = & l + h(l) \quad ; \quad h(l) = \beta \rho_T(l) & = & l^2 \int_{0}^{\infty} T_i( \lambda ) h_1( \lambda l) \lambda d\lambda where :math:`l_i \quad \text{and} \quad l_{i+1}` solve the equation :math:`S(l=0)`; :math:`$l$` is half the current electrode spacing :math:`$AB/2$`, and :math:`$h_1$` denotes the first-order of the Bessel function of the first kind, :math:`$ \beta $` is the coordinate value on y-axis direction of the intercept term of the :math:`$b_r(l)$` and :math:`$h(l)$`, :math:`$T_i(\lambda )$` resistivity transform function, :math:`$lamda$` denotes the integral variable, where n denotes the number of layers, :math:`$rho_i$` and :math:`$h_i$` are the resistivity and thickness of the :math:`$i-th$` layer, respectively. Get more explanations and cleareance of formula in the paper of `Kouadio et al 2022`_. References ---------- *Kouadio, K.L., Nicolas, K.L., Binbin, M., Déguine, G.S.P. & Serge*, *K.K. (2021, October)* Bagoue dataset-Cote d’Ivoire: Electrical profiling, electrical sounding and boreholes data, Zenodo. doi:10.5281/zenodo.5560937 *Koefoed, O. (1970)*. A fast method for determining the layer distribution from the raised kernel function in geoelectrical sounding. Geophysical Prospecting, 18(4), 564–570. https://doi.org/10.1111/j.1365-2478.1970.tb02129.x *Koefoed, O. (1976)*. Progress in the Direct Interpretation of Resistivity Soundings: an Algorithm. Geophysical Prospecting, 24(2), 233–240. https://doi.org/10.1111/j.1365-2478.1976.tb00921.x *Biemi, J. (1992)*. Contribution à l’étude géologique, hydrogéologique et par télédétection de bassins versants subsaheliens du socle précambrien d’Afrique de l’Ouest: hydrostructurale hydrodynamique, hydrochimie et isotopie des aquifères discontinus de sillons et aires gran. In Thèse de Doctorat (IOS journa, p. 493). Abidjan, Cote d'Ivoire .. _Kouadio et al 2022: https://doi.org/10.1029/2021WR031623 or refer to the paper `FlowRatePredictionWithSVMs <https://agupubs.onlinelibrary.wiley.com/doi/epdf/10.1029/2021WR031623>`_ .. _Koefoed, O. (1970): https://doi.org/10.1111/j.1365-2478.1970.tb02129.x .. _Koefoed, O. (1976): https://doi.org/10.1111/j.1365-2478.1976.tb00921.x .. _Bagoue region: https://en.wikipedia.org/wiki/Bagou%C3%A9 .. |VES| replace: Vertical Electrical Sounding """ objkeys = ( 'ohms','none','eval', 'area', 'ohmic','true', 'plot', 'mpl', 'false', 'graph','visual', 'view') objr = copy.deepcopy(objective) objective = str(objective).lower() compout, viewout = np.split(np.array(objkeys), 2) for oo, pp in zip(compout, viewout): if objective.find(oo)>=0 : objective ='ohms'; break elif objective.find(pp)>=0: objective ='graph'; break if objective not in list(objkeys)+ ['full', 'coverall']: raise ValueError(f"Unacceptable argument {str(objr)!r}. Objective" " argument can only be 'ohmS' for pseudo-area" " evaluation or 'graph' for visualization outputs." ) bound0=[] X, Y = vesDataOperator(data =data, **kws) try : ohmSkey = str(ohmSkey).lower().replace('m', '') if ohmSkey.find('none')>=0 : ohmSkey = X.max()/2 ohmSkey = float(ohmSkey) except: raise ValueError (f'Could not convert value {ohmSkey!r} to float') if ohmSkey >= X.max(): raise Wex.VESError(f"The startpoint 'ohmSkey={ohmSkey}m'is expected " f"to be less than the 'maxdepth={X.max()}m'.") #-------> construct the fitting curves for 1000 points # create the polyfit function fitting raw(f) from coefficents # (coefs) of the initial function f_rhotl, x_new, y_projected = fitfunc (X, Y) # Finding the intercepts between the fitting curve and the dummy # basement curves #--> e. g. start from 20m (oix) --> ... searching and find the index oIx = np.argmin (np.abs(X - ohmSkey)) # from this index (oIx) , read the remain depth. oB = X[int(oIx):] # from O-> end [OB] #--< construct the basement curve from the index of ohmSkey f_brl, beta = dummy_basement_curve( f_rhotl, ohmSkey) # 1000 points from OB (xx) xx = np.linspace(oB.min(), oB.max(), 1000) b45_projected= f_brl(xx) # create a fit function for b45 and find the limits # find the intersection between the b45_projected values and # fpartial projected values are the solution of equations f45 -fpartials diff_arr = b45_projected - f_rhotl(xx) #ypartial_projected # # if f-y < 0 => f< y so fitting curve is under the basement curve # # we keep the limit indexes for integral computation # # we want to keep the array_masked = np.ma.masked_where (diff_arr < 0 , diff_arr , copy =True) # get indexes of valid values indexes, = array_masked.nonzero() try : ib_indexes = find_bound_for_integration(indexes, b0=bound0) except : bound0=[] #initialize the bounds lists ib_indexes =find_limit_for_integration(indexes, b0= bound0) roots = xx[ib_indexes] f45, *_ = fitfunc(oB, Y[oIx:]) ff = f45 - f_rhotl pairwise_r = np.split(roots, len(roots)//2 ) if len( roots) > 2 else [np.array(roots)] ohmS = np.zeros((len(pairwise_r,))) err_ohmS = np.zeros((len(pairwise_r,))) for ii, (inf, sup) in enumerate(pairwise_r): values, err = integrate.quad(ff, a = inf, b = sup) ohmS[ii] = np.zeros((1,)) if values < 0 else values err_ohmS[ii] = err if sum: ohmS = ohmS.sum() rv =[ (ohmS, err_ohmS, roots), ( np.hstack((X[:, np.newaxis], Y[:, np.newaxis]) ), np.hstack((x_new[:, np.newaxis], y_projected[:, np.newaxis])), np.hstack((oB[:, np.newaxis], f_brl(oB)[:, np.newaxis]) ) ) ] for ii, ( obj , ix) in enumerate( zip(('ohms', 'graph'), [1, -1])): if objective ==obj : rv[ii + ix ]= (None, None, None) break return rv
def _type_mechanism ( cz: Array |List[float], dipolelength : float =10. ) -> Tuple[str, float]: """ Using the type mechanism helps to not repeat several time the same process during the `type` definition. :param cz: array-like - conductive zone; is a subset of the whole |ERP| survey line. .. note:: Here, the position absolutely refer to the global minimum resistivity value. :Example: >>> import numpy as np >>> from kalfeat.tools.exmath import _type_mechanism >>> rang = random.RandomState(42) >>> test_array2 = rang.randn (7) >>> _type_mechanism(np.abs(test_array2)) ... ('yes', 60.0) """ s_index = np.argmin(cz) lc , rc = cz[:s_index +1] , cz[s_index :] lm , rm = lc.max() , rc.max() # get the index of different values ixl, = np.where (lc ==lm) ; ixr, = np.where (rc ==rm) # take the far away value if the index is more than one ixl = ixl[0] if len(ixl) > 1 else ixl ixr =ixr [-1] + s_index if len(ixr) > 1 else ixr + s_index wcz = dipolelength * abs (int(ixl) - int(ixr)) status = 'yes' if wcz > 4 * dipolelength else 'no' return status, wcz
[docs]def type_ (erp: Array[DType[float]] ) -> str: """ Compute the type of anomaly. The type parameter is defined by the African Hydraulic Study Committee report (CIEH, 2001). Later it was implemented by authors such as (Adam et al., 2020; Michel et al., 2013; Nikiema, 2012). `Type` comes to help the differenciation of two or several anomalies with the same `shape`. For instance, two anomalies with the same shape ``W`` will differ from the order of priority of their types. The `type` depends on the lateral resistivity distribution of underground (resulting from the pace of the apparent resistivity curve) along with the whole |ERP| survey line. Indeed, four types of anomalies were emphasized: **"EC"**, **"CB2P"**, **"NC"** and **"CP"**. For more details refers to references. :param erp: array-like - Array of |ERP| line composed of apparent resistivity values. :return: str -The `type` of anomaly. :Example: >>> import numpy as np >>> from kalfeat.tools.exmath import type_ >>> rang = random.RandomState(42) >>> test_array2 = rang.randn (7) >>> type_(np.abs(test_array2)) ... 'EC' >>> long_array = np.abs (rang.randn(71)) >>> type(long_array) ... 'PC' References ----------- *Adam, B. M., Abubakar, A. H., Dalibi, J. H., Khalil Mustapha,M., & Abubakar,* *A. H. (2020)*. Assessment of Gaseous Emissions and Socio-Economic Impacts From Diesel Generators used in GSM BTS in Kano Metropolis. African Journal of Earth and Environmental Sciences, 2(1),517–523. https://doi.org/10.11113/ajees.v3.n1.104 *CIEH. (2001)*. L’utilisation des méthodes géophysiques pour la recherche d’eaux dans les aquifères discontinus. Série Hydrogéologie, 169. *Michel, K. A., Drissa, C., Blaise, K. Y., & Jean, B. (2013)*. Application de méthodes géophysiques à l ’ étude de la productivité des forages d ’eau en milieu cristallin : cas de la région de Toumodi ( Centre de la Côte d ’Ivoire). International Journal of Innovation and Applied Studies, 2(3), 324–334. *Nikiema, D. G. C. (2012)*. Essai d‘optimisation de l’implantation géophysique des forages en zone de socle : Cas de la province de Séno, Nord Est du Burkina Faso (IRD). (I. / I. Ile-de-France, Ed.). IST / IRD Ile-de-France, Ouagadougou, Burkina Faso, West-africa. Retrieved from http://documentation.2ie-edu.org/cdi2ie/opac_css/doc_num.php?explnum_id=148 .. |ERP| replace:: Electrical Resistivity Profiling """ # split array type_ ='PC' # initialize type erp = _assert_all_types(erp, tuple, list, np.ndarray, pd.Series) erp = np.array (erp) try : ssets = np.split(erp, len(erp)//7) except ValueError: # get_indices if len(erp) < 7: ssets =[erp ] else : remains = len(erp) % 7 indices = np.arange(7 , len(erp) - remains , 7) ssets = np.split(erp , indices ) status =list() for czx in ssets : sta , _ = _type_mechanism(czx) status.append(sta) if len(set (status)) ==1: if status [0] =='yes': type_= 'EC' elif status [0] =='no': type_ ='NC' elif len(set(status)) ==2: yes_ix , = np.where (np.array(status) =='yes') # take the remain index no_ix = np.array (status)[len(yes_ix):] # check whether all indexes are sorted sort_ix_yes = all(yes_ix[i] < yes_ix[i+1] for i in range(len(yes_ix) - 1)) sort_ix_no = all(no_ix[i] < no_ix[i+1] for i in range(len(no_ix) - 1)) # check whether their difference is 1 even sorted if sort_ix_no == sort_ix_yes == True: yes = set ([abs(yes_ix[i] -yes_ix[i+1]) for i in range(len(yes_ix)-1)]) no = set ([abs(no_ix[i] -no_ix[i+1]) for i in range(len(no_ix)-1)]) if yes == no == {1}: type_= 'CB2P' return type_
[docs]def shape ( cz : Array | List [float], s : Optional [str, int] = ..., p: SP = ..., ) -> str: """ Compute the shape of anomaly. The `shape` parameter is mostly used in the basement medium to depict the better conductive zone for the drilling location. According to Sombo et al. (2011; 2012), various shapes of anomalies can be described such as: **"V"**, **"U"**, **"W"**, **"M"**, **"K"**, **"C"**, and **"H"** The `shape` consists to feed the algorithm with the |ERP| resistivity values by specifying the station :math:`$(S_{VES})$`. Indeed, mostly, :math:`$S_{VES}$` is the station with a very low resistivity value expected to be the drilling location. :param cz: array-like - Conductive zone resistivity values :param s: int, str - Station position index or name. :param p: Array-like - Should be the position of the conductive zone. .. note:: If `s` is given, `p` should be provided. If `p` is missing an error will raises. :return: str - the shape of anomaly. :Example: >>> import numpy as np >>> rang = random.RandomState(42) >>> from kalfeat.tools.exmath import shape_ >>> test_array1 = np.arange(10) >>> shape_ (test_array1) ... 'C' >>> test_array2 = rang.randn (7) >>> _shape(test_array2) ... 'K' >>> test_array3 = np.power(10, test_array2 , dtype =np.float32) >>> _shape (test_array3) ... 'K' # does not change whatever the resistivity values. References ---------- *Sombo, P. A., Williams, F., Loukou, K. N., & Kouassi, E. G. (2011)*. Contribution de la Prospection Électrique à L’identification et à la Caractérisation des Aquifères de Socle du Département de Sikensi (Sud de la Côte d’Ivoire). European Journal of Scientific Research, 64(2), 206–219. *Sombo, P. A. (2012)*. Application des methodes de resistivites electriques dans la determination et la caracterisation des aquiferes de socle en Cote d’Ivoire. Cas des departements de Sikensi et de Tiassale (Sud de la Cote d’Ivoire). Universite Felix Houphouet Boigny. """ shape = 'V' # initialize the shape with the most common cz = _assert_all_types( cz , tuple, list, np.ndarray, pd.Series) cz = np.array(cz) # detect the staion position index if s is (None or ... ): s_index = np.argmin(cz) elif s is not None: if isinstance(s, str): try: s= int(s.lower().replace('s', '')) except: if p is ( None or ...): raise Wex.StationError( "Need the positions `p` of the conductive zone " "to be supplied.'NoneType' is given.") s_index,*_ = detect_station_position(s,p) else : s_index = s else : s_index= _assert_all_types(s, int) if s_index >= len(cz): raise Wex.StationError( f"Position should be less than '7': got '{s_index}'") lbound , rbound = cz[:s_index +1] , cz[s_index :] ls , rs = lbound[0] , rbound [-1] # left side and right side (s) lminls, = argrelextrema(lbound, np.less) lminrs, = argrelextrema(rbound, np.less) lmaxls, = argrelextrema(lbound, np.greater) lmaxrs, = argrelextrema(rbound, np.greater) # median helps to keep the same shape whatever # the resistivity values med = np.median(cz) if (ls >= med and rs < med ) or (ls < med and rs >= med ): if len(lminls) == 0 and len(lminrs) ==0 : shape = 'C' elif (len(lminls) ==0 and len(lminrs) !=0) or ( len(lminls) !=0 and len(lminrs)==0) : shape = 'K' elif (ls and rs) > med : if len(lminls) ==0 and len(lminrs) ==0 : shape = 'U' elif (len(lminls) ==0 and len(lminrs) ==1 ) or ( len(lminrs) ==0 and len(lminls) ==1): shape = 'H' elif len(lminls) >=1 and len(lminrs) >= 1 : return 'W' elif (ls < med ) and rs < med : if (len(lmaxls) >=1 and len(lmaxrs) >= 0 ) or ( len(lmaxls) <=0 and len(lmaxrs) >=1): shape = 'M' return shape
[docs]@refAppender(__doc__) @docSanitizer() def scalePosition( ydata: Array | SP | Series | DataFrame , xdata: Array| Series = None, func : Optional [F] = None , c_order: Optional[int|str] = 0, show: bool =False, **kws): """ Correct data location or position and return new corrected location Parameters ---------- ydata: array_like, series or dataframe The dependent data, a length M array - nominally ``f(xdata, ...)``. xdata: array_like or object The independent variable where the data is measured. Should usually be an M-length sequence or an (k,M)-shaped array for functions with k predictors, but can actually be any object. If ``None``, `xdata` is generated by default using the length of the given `ydata`. func: callable The model function, ``f(x, ...)``. It must take the independent variable as the first argument and the parameters to fit as separate remaining arguments. The default `func` is ``linear`` function i.e for ``f(x)= ax +b``. where `a` is slope and `b` is the intercept value. Setting your own function for better fitting is recommended. c_order: int or str The index or the column name if ``ydata`` is given as a dataframe to select the right column for scaling. show: bool Quick visualization of data distribution. kws: dict Additional keyword argument from `scipy.optimize_curvefit` parameters. Refer to `scipy.optimize.curve_fit`_. Returns -------- - ydata - array -like - Data scaled - popt - array-like Optimal values for the parameters so that the sum of the squared residuals of ``f(xdata, *popt) - ydata`` is minimized. - pcov - array like The estimated covariance of popt. The diagonals provide the variance of the parameter estimate. To compute one standard deviation errors on the parameters use ``perr = np.sqrt(np.diag(pcov))``. How the sigma parameter affects the estimated covariance depends on absolute_sigma argument, as described above. If the Jacobian matrix at the solution doesn’t have a full rank, then ‘lm’ method returns a matrix filled with np.inf, on the other hand 'trf' and 'dogbox' methods use Moore-Penrose pseudoinverse to compute the covariance matrix. Examples -------- >>> from kalfeat.tools import erpSelector, scalePosition >>> df = erpSelector('data/erp/l10_gbalo.xlsx') >>> df.columns ... Index(['station', 'resistivity', 'longitude', 'latitude', 'easting', 'northing'], dtype='object') >>> # correcting northing coordinates from easting data >>> northing_corrected, popt, pcov = scalePosition(ydata =df.northing , xdata = df.easting, show=True) >>> len(df.northing.values) , len(northing_corrected) ... (20, 20) >>> popt # by default popt =(slope:a ,intercept: b) ... array([1.01151734e+00, 2.93731377e+05]) >>> # corrected easting coordinates using the default x. >>> easting_corrected, *_= scalePosition(ydata =df.easting , show=True) >>> df.easting.values ... array([790284, 790281, 790277, 790270, 790265, 790260, 790254, 790248, ... 790243, 790237, 790231, 790224, 790218, 790211, 790206, 790200, ... 790194, 790187, 790181, 790175], dtype=int64) >>> easting_corrected ... array([790288.18571705, 790282.30300999, 790276.42030293, 790270.53759587, ... 790264.6548888 , 790258.77218174, 790252.88947468, 790247.00676762, ... 790241.12406056, 790235.2413535 , 790229.35864644, 790223.47593938, ... 790217.59323232, 790211.71052526, 790205.8278182 , 790199.94511114, ... 790194.06240407, 790188.17969701, 790182.29698995, 790176.41428289]) """ def linfunc (x, a, b): """ Set the simple linear function""" return a * x + b if str(func).lower() in ('none' , 'linear'): func = linfunc elif not hasattr(func, '__call__') or not inspect.isfunction (func): raise TypeError( f'`func` argument is a callable not {type(func).__name__!r}') ydata = _assert_all_types(ydata, list, tuple, np.ndarray, pd.Series, pd.DataFrame ) c_order = _assert_all_types(c_order, int, float, str) try : c_order = int(c_order) except: pass if isinstance(ydata, pd.DataFrame): if c_order ==0: warnings.warn("The first column of the data should be considered" " as the `y` target.") if c_order is None: raise TypeError('Dataframe is given. The `c_order` argument should ' 'be defined for column selection. Use column name' ' instead') if isinstance(c_order, str): # check whether the value is on the column name if c_order.lower() not in list(map( lambda x :x.lower(), ydata.columns)): raise ValueError ( f'c_order {c_order!r} not found in {list(ydata.columns)}' ' Use the index instead.') # if c_order exists find the index and get the # right column name ix_c = list(map( lambda x :x.lower(), ydata.columns) ).index(c_order.lower()) ydata = ydata.iloc [:, ix_c] # series elif isinstance (c_order, (int, float)): c_order =int(c_order) if c_order >= len(ydata.columns): raise ValueError( f"`c_order`'{c_order}' should be less than the number of " f"given columns '{len(ydata.columns)}'. Use column name instead.") ydata= ydata.iloc[:, c_order] ydata = np.array(ydata) if xdata is None: xdata = np.linspace(0, 4, len(ydata)) if len(xdata) != len(ydata): raise ValueError(" `x` and `y` arrays must have the same length." "'{len(xdata)}' and '{len(ydata)}' are given.") popt, pcov = curve_fit(func, xdata, ydata, **kws) ydata_new = func(xdata, *popt) if show: plt.plot(xdata, ydata, 'b-', label='data') plt.plot(xdata, func(xdata, *popt), 'r-', label='fit: a=%5.3f, b=%5.3f' % tuple(popt)) plt.xlabel('x') plt.ylabel('y') plt.legend() plt.show() return ydata_new, popt, pcov
def __sves__ ( s_index: int , cz: Array | List[float], ) -> Tuple[Array, Array]: """ Divide the conductive zone in leftzone and rightzone from the drilling location index . :param s_index - station location index expected for dilling location. It refers to the position of |VES|. :param cz: array-like - Conductive zone . :returns: - <--Sves: Left side of conductive zone from |VES| location. - --> Sves: Right side of conductive zone from |VES| location. .. note:: Both sides included the |VES| `Sves` position. .. |VES| replace:: Vertical Electrical Sounding """ try: s_index = int(s_index) except: raise TypeError( f'Expected integer value not {type(s_index).__name__}') s_index = _assert_all_types( s_index , int) cz = _assert_all_types(cz, np.ndarray, pd.Series, list, tuple ) rmax_ls , rmax_rs = max(cz[:s_index + 1]), max(cz[s_index :]) # detect the value of rho max (rmax_...) # from lower side bound of the anomaly. rho_ls= rmax_ls if rmax_ls < rmax_rs else rmax_rs side =... # find with positions for _, sid in zip((rmax_ls , rmax_rs ) , ('leftside', 'rightside')) : side = sid ; break return (rho_ls, side), (rmax_ls , rmax_rs )
[docs]def detect_station_position ( s : Union[str, int] , p: SP, ) -> Tuple [int, float]: """ Detect station position and return the index in positions :param s: str, int - Station location in the position array. It should be the positionning of the drilling location. If the value given is type string. It should be match the exact position to locate the drilling. Otherwise, if the value given is in float or integer type, it should be match the index of the position array. :param p: Array-like - Should be the conductive zone as array of station location values. :returns: - `s_index`- the position index location in the conductive zone. - `s`- the station position in distance. :Example: >>> import numpy as np >>> from kalfeat.utils.exmath import detect_station_position >>> pos = np.arange(0 , 50 , 10 ) >>> detect_station_position (s ='S30', p = pos) ... (3, 30.0) >>> detect_station_position (s ='40', p = pos) ... (4, 40.0) >>> detect_station_position (s =2, p = pos) ... (2, 20) >>> detect_station_position (s ='sta200', p = pos) ... kalfeatError_station: Station sta200 \ is out of the range; max position = 40 """ s = _assert_all_types( s, float, int, str) p = _assert_all_types( p, tuple, list, np.ndarray, pd.Series) S=copy.deepcopy(s) if isinstance(s, str): s =s.lower().replace('s', '').replace('pk', '').replace('ta', '') try : s=int(s) except : raise ValueError (f'could not convert string to float: {S}') p = np.array(p, dtype = np.int32) dl = (p.max() - p.min() ) / (len(p) -1) if isinstance(s, (int, float)): if s > len(p): # consider this as the dipole length position: # now let check whether the given value is module of the station if s % dl !=0 : raise Wex.kalfeatError_station ( f'Unable to detect the station position {S}') elif s % dl == 0 and s <= p.max(): # take the index s_index = s//dl return int(s_index), s_index * dl else : raise Wex.StationError ( f'Station {S} is out of the range; max position = {max(p)}' ) else : if s >= len(p): raise Wex.kalfeatError_station ( 'Location index must be less than the number of' f' stations = {len(p)}. {s} is gotten.') # consider it as integer index # erase the last variable # s_index = s # s = S * dl # find return s , p[s ] # check whether the s value is in the p if True in np.isin (p, s): s_index , = np.where (p ==s ) s = p [s_index] return int(s_index) , s
[docs]def sfi ( cz: Sub[Array[T, DType[T]]] | List[float] , p: Sub[SP[Array, DType [int]]] | List [int] = None, s: Optional [str] =None, dipolelength: Optional [float] = None, plot: bool = False, raw : bool = False, **plotkws ) -> float: r""" Compute the pseudo-fracturing index known as *sfi*. The sfi parameter does not indicate the rock fracturing degree in the underground but it is used to speculate about the apparent resistivity dispersion ratio around the cumulated sum of the resistivity values of the selected anomaly. It uses a similar approach of IF parameter proposed by `Dieng et al`_ (2004). Furthermore, its threshold is set to :math:`$sqrt{2}$` for symmetrical anomaly characterized by a perfect distribution of resistivity in a homogenous medium. The formula is given by: .. math:: sfi=\sqrt{(P_a^{*}/P_a )^2+(M_a^{*}/M_a )^2} where :math:`$P_a$` and :math:`$M_a$` are the anomaly power and the magnitude respectively. :math:`$P_a^{*}$` is and :math:`$M_a^{*}$` are the projected power and magnitude of the lower point of the selected anomaly. :param cz: array-like. Selected conductive zone :param p: array-like. Station positions of the conductive zone. :param dipolelength: float. If `p` is not given, it will be set automatically using the default value to match the ``cz`` size. The **default** value is ``10.``. :param plot: bool. Visualize the fitting curve. *Default* is ``False``. :param raw: bool. Overlaining the fitting curve with the raw curve from `cz`. :param plotkws: dict. `Matplotlib plot`_ keyword arguments. :Example: >>> from numpy as np >>> from kalfeat.properties import P >>> from kalfeat.tools.exmath import sfi >>> rang = np.random.RandomState (42) >>> condzone = np.abs(rang.randn (7)) >>> # no visualization and default value `s` with gloabl minimal rho >>> pfi = sfi (condzone) ... 3.35110143 >>> # visualize fitting curve >>> plotkws = dict (rlabel = 'Conductive zone (cz)', label = 'fitting model', color=f'{P().frcolortags.get("fr3")}', ) >>> sfi (condzone, plot= True , s= 5, figsize =(7, 7), **plotkws ) ... Out[598]: (array([ 0., 10., 20., 30.]), 1) References ---------- - See `Numpy Polyfit <https://numpy.org/doc/stable/reference/generated/numpy.polyfit.html>`_ - See `Stackoverflow <https://stackoverflow.com/questions/10457240/solving-polynomial-equations-in-python>`_ the answer of AkaRem edited by Tobu and Migilson. - See `Numpy Errorstate <https://numpy.org/devdocs/reference/generated/numpy.errstate.html>`_ and how to implement the context manager. """ # Determine the number of curve inflection # to find the number of degree to compose # cz fonction if p is None : dipolelength = 10. if dipolelength is None else dipolelength p = np.arange (0, len(cz) * dipolelength, dipolelength) if len(p) != len(cz): raise Wex.StationError ( 'Array of position and conductive zone must have the same length:' f' `{len(p)}` and `{len(cz)}` were given.') minl, = argrelextrema(cz, np.less) maxl, = argrelextrema(cz,np.greater) ixf = len(minl) + len(maxl) # create the polyfit function f from coefficents (coefs) coefs = np.polyfit(x=p, y=cz, deg =ixf + 1 ) f = np.poly1d(coefs ) # generate a sample of values to cover the fit function # for degree 2: eq => f(x) =ax2 +bx + c or c + bx + ax2 as # the coefs are aranged. # coefs are ranged for index0 =c, index1 =b and index 2=a # for instance for degree =2 # model (f)= [coefs[2] + coefs[1] * x + coefs [0]* x**2 for x in xmod] # where x_new(xn ) = 1000 points generated # thus compute ynew (yn) from the poly function f xn = np.linspace (min(p), max(p), 1000) yn = f(xn) # solve the system to find the different root # from the min resistivity value bound. # -> Get from each anomaly bounds (leftside and right side ) # the maximum resistivity and selected the minumum # value to project to the other side in order to get # its positions on the station location p. if s is not None : # explicity giving s s_ix , spos = detect_station_position(s , p ) (rho_side, side ), (rho_ls_max , rho_rs_max) = __sves__(s_ix , cz ) elif s is None: # take the index of min value of cz s_ix = np.argmin(cz) ; spos = p[s_ix] (rho_side, side ), (rho_ls_max , rho_rs_max) = __sves__(s_ix , cz ) # find the roots from rhoa_side: # f(x) =y => f (x) = rho_side fn = f - rho_side roots = np.abs(fn.r ) # detect the rho_side positions ppow = roots [np.where (roots > spos )] if side =='leftside' else roots[ np.where (roots < spos)] ppow = ppow [0] if len (ppow) > 1 else ppow # compute sfi pw = power(p) ma= magnitude(cz) pw_star = np.abs (p.min() - ppow) ma_star = np.abs(cz.min() - rho_side) with np.errstate(all='ignore'): # $\sqrt2# is the threshold sfi = np.sqrt ( (pw_star/pw)**2 + (ma_star / ma )**2 ) % np.sqrt(2) if sfi == np.inf : sfi = np.sqrt ( (pw/pw_star)**2 + (ma / ma_star )**2 ) % np.sqrt(2) if plot: plot_(p,cz,'-ok', xn, yn, raw = raw , **plotkws) return sfi
[docs]@refAppender(__doc__) def plot_ ( *args : List [Union [str, Array, ...]], figsize: Tuple[int] = None, raw : bool = False, style : str = 'seaborn', dtype: str ='erp', kind: Optional[str] = None , **kws ) -> None : """ Quick visualization for fitting model, |ERP| and |VES| curves. :param x: array-like - array of data for x-axis representation :param y: array-like - array of data for plot y-axis representation :param figsize: tuple - Mtplotlib (MPL) figure size; should be a tuple value of integers e.g. `figsize =(10, 5)`. :param raw: bool- Originally the `plot_` function is intended for the fitting |ERP| model i.e. the correct value of |ERP| data. However, when the `raw` is set to ``True``, it plots the both curves: The fitting model as well as the uncorrected model. So both curves are overlaining or supperposed. :param style: str - Pyplot style. Default is ``seaborn`` :param dtype: str - Kind of data provided. Can be |ERP| data or |VES| data. When the |ERP| data are provided, the common plot is sufficient to visualize all the data insignt i.e. the default value of `kind` is kept to ``None``. However, when the data collected is |VES| data, the convenient plot for visualization is the ``loglog`` for parameter `kind`` while the `dtype` can be set to `VES` to specify the labels into the x-axis. The default value of `dtype` is ``erp`` for common visualization. :param kind: str - Use to specify the kind of data provided. See the explanation of `dtype` parameters. By default `kind` is set to ``None`` i.e. its keep the normal plots. It can be ``loglog``, ``semilogx`` and ``semilogy``. :param kws: dict - Additional `Matplotlib plot`_ keyword arguments. To cus- tomize the plot, one can provide a dictionnary of MPL keyword additional arguments like the example below. :Example: >>> import numpy as np >>> from kalfeat.tools.exmath import plot_ >>> x, y = np.arange(0 , 60, 10) ,np.abs( np.random.randn (6)) >>> KWS = dict (xlabel ='Stations positions', ylabel= 'resistivity(ohm.m)', rlabel = 'raw cuve', rotate = 45 ) >>> plot_(x, y, '-ok', raw = True , style = 'seaborn-whitegrid', figsize = (7, 7) ,**KWS ) """ plt.style.use(style) # retrieve all the aggregated data from keywords arguments if (rlabel := kws.get('rlabel')) is not None : del kws['rlabel'] if (xlabel := kws.get('xlabel')) is not None : del kws['xlabel'] if (ylabel := kws.get('ylabel')) is not None : del kws['ylabel'] if (rotate:= kws.get ('rotate')) is not None: del kws ['rotate'] if (title:= kws.get ('title')) is not None: del kws ['title'] x , y, *args = args fig = plt.figure(1, figsize =figsize) plt.plot (x, y,*args, **kws) if raw: kind = kind.lower( ) if isinstance(kind, str) else kind if kind =='semilogx': plt.semilogx (x, y, color = '{}'.format(P().frcolortags.get("fr1")), label =rlabel, ) elif kind =='semilogy': plt.semilogy (x, y, color = '{}'.format(P().frcolortags.get("fr1")), label =rlabel, ) elif kind =='loglog': print('yes') plt.loglog (x, y, color = '{}'.format(P().frcolortags.get("fr1")), label =rlabel, ) else: plt.plot (x, y, color = '{}'.format(P().frcolortags.get("fr1")), label =rlabel, ) dtype = dtype.lower() if isinstance(dtype, str) else dtype if dtype is None: dtype ='erp' if dtype not in ('erp', 'ves'): kind ='erp' if dtype =='erp': plt.xticks (x, labels = ['S{:02}'.format(int(i)) for i in x ], rotation = 0. if rotate is None else rotate ) elif dtype =='ves': plt.xticks (x, rotation = 0. if rotate is None else rotate ) plt.xlabel ('Stations') if xlabel is None else plt.xlabel (xlabel) plt.ylabel ('Resistivity (Ω.m)' ) if ylabel is None else plt.ylabel (ylabel) fig_title_kws = dict ( t = 'Plot fit model' if dtype =='erp' else title, style ='italic', bbox =dict(boxstyle='round',facecolor ='lightgrey')) plt.tight_layout() fig.suptitle(**fig_title_kws) plt.legend () plt.show ()
[docs]def quickplot (arr: Array | List[float], dl:float =10)-> None: """Quick plot to see the anomaly""" plt.plot(np.arange(0, len(arr) * dl, dl), arr , ls ='-', c='k') plt.show()
[docs]def magnitude (cz:Sub[Array[float, DType[float]]] ) -> float: r""" Compute the magnitude of selected conductive zone. The magnitude parameter is the absolute resistivity value between the minimum :math:`\min \rho_a` and maximum :math:`\max \rho_a` value of selected anomaly: .. math:: magnitude=|\min\rho_a -\max\rho_a| :param cz: array-like. Array of apparent resistivity values composing the conductive zone. :return: Absolute value of anomaly magnitude in ohm.meters. """ return np.abs (cz.max()- cz.min())
[docs]def power (p:Sub[SP[Array, DType [int]]] | List[int] ) -> float : """ Compute the power of the selected conductive zone. Anomaly `power` is closely referred to the width of the conductive zone. The power parameter implicitly defines the width of the conductive zone and is evaluated from the difference between the abscissa :math:`$X_{LB}$` and the end :math:`$X_{UB}$` points of the selected anomaly: .. math:: power=|X_{LB} - X_{UB} | :param p: array-like. Station position of conductive zone. :return: Absolute value of the width of conductive zone in meters. """ return np.abs(p.min()- p.max())
def _find_cz_bound_indexes ( erp: Union[Array[float, DType[float]], List[float], pd.Series], cz: Union [Sub[Array], List[float]] )-> Tuple[int, int]: """ Fetch the limits 'LB' and 'UB' of the selected conductive zone. Indeed the 'LB' and 'UB' fit the lower and upper boundaries of the conductive zone respectively. :param erp: array-like. Apparent resistivities collected during the survey. :param cz: array-like. Array of apparent resistivies composing the conductive zone. :return: The index of boundaries 'LB' and 'UB'. .. note:: `cz` must be self-containing of `erp`. If ``False`` should raise and error. """ # assert whether cz is a subset of erp. if isinstance( erp, pd.Series): erp = erp.values if not np.isin(True, (np.isin (erp, cz))): raise ValueError ('Expected the conductive zone array being a ' 'subset of the resistivity array.') # find the indexes using np.argwhere cz_indexes = np.argwhere(np.isin(erp, cz)).ravel() return cz_indexes [0] , cz_indexes [-1]
[docs]def convert_distance_to_m( value:T , converter:float =1e3, unit:str ='km' )-> float: """ Convert distance from `km` to `m` or vice versa even a string value is given. :param value: value to convert. :paramm converter: Equivalent if given in ``km`` rather than ``m``. :param unit: unit to convert to. """ if isinstance(value, str): try: value = float(value.replace(unit, '') )*converter if value.find( 'km')>=0 else float(value.replace('m', '')) except: raise TypeError(f"Expected float not {type(value)!r}." ) return value
[docs]def get_station_number ( dipole:float, distance:float , from0:bool = False, **kws )-> float: """ Get the station number from dipole length and the distance to the station. :param distance: Is the distance from the first station to `s` in meter (m). If value is given, please specify the dipole length in the same unit as `distance`. :param dipole: Is the distance of the dipole measurement. By default the dipole length is in meter. :param kws: :func:`convert_distance_to_m` additional arguments """ dipole=convert_distance_to_m(dipole, **kws) distance =convert_distance_to_m(distance, **kws) return distance/dipole if from0 else distance/dipole + 1
[docs]@deprecated('Function is going to be removed for the next release ...') def define_conductive_zone ( erp: Array | List[float], stn: Optional [int] = None, sres:Optional [float] = None, *, distance:float | None = None , dipole_length:float | None = None, extent:int =7): """ Detect the conductive zone from `s`ves point. :param erp: Resistivity values of electrical resistivity profiling(ERP). :param stn: Station number expected for VES and/or drilling location. :param sres: Resistivity value at station number `stn`. If `sres` is given, the auto-search will be triggered to find the station number that fits the resistivity value. :param distance: Distance from the first station to `stn`. If given, be sure to provide the `dipole_length` :param dipole_length: Length of the dipole. Comonly the distante between two close stations. Since we use config AB/2 :param extent: Is the width to depict the anomaly. If provide, need to be consistent along all ERP line. Should keep unchanged for other parameters definitions. Default is ``7``. :returns: - CZ:Conductive zone including the station position - sres: Resistivity value of the station number - ix_stn: Station position in the CZ .. note:: If many stations got the same `sres` value, the first station is flagged. This may not correspond to the station number that is searching. Use `sres` only if you are sure that the resistivity value is unique on the whole ERP. Otherwise it's not recommended. :Example: >>> import numpy as np >>> from kalfeat.tools.exmath import define_conductive_zone >>> sample = np.random.randn(9) >>> cz, stn_res = define_conductive_zone(sample, 4, extent = 7) ... (array([ 0.32208638, 1.48349508, 0.6871188 , -0.96007639, -1.08735204,0.79811492, -0.31216716]), -0.9600763919368086, 3) """ try : iter(erp) except : raise Wex.ArgumentError ( f'`erp` must be a sequence of values not {type(erp)!r}') finally: erp = np.array(erp) # check the distance if stn is None: if (dipole_length and distance) is not None: stn = get_station_number(dipole_length, distance) elif sres is not None: snix, = np.where(erp==sres) if len(snix)==0: raise Wex.ParameterNumberError( "Could not find the resistivity value of the VES " "station. Please provide the right value instead.") elif len(snix)==2: stn = int(snix[0]) + 1 else : raise Wex.ArgumentError ( '`stn` is needed or at least provide the survey ' 'dipole length and the distance from the first ' 'station to the VES station. ') if erp.size < stn : raise Wex.StationError( f"Wrong station number =`{stn}`. Is larger than the " f" number of ERP stations = `{erp.size}` ") # now defined the anomaly boundaries from sn stn = 1 if stn == 0 else stn stn -=1 # start counting from 0. if extent %2 ==0: if len(erp[:stn]) > len(erp[stn:])-1: ub = erp[stn:][:extent//2 +1] lb = erp[:stn][len(ub)-int(extent):] elif len(erp[:stn]) < len(erp[stn:])-1: lb = erp[:stn][stn-extent//2 +1:stn] ub= erp[stn:][:int(extent)- len(lb)] else : lb = erp[:stn][-extent//2:] ub = erp[stn:][:int(extent//2)+ 1] # read this part if extent anomaly is not reached if len(ub) +len(lb) < extent: if len(erp[:stn]) > len(erp[stn:])-1: add = abs(len(ub)-len(lb)) # remain value to add lb = erp[:stn][-add -len(lb) - 1:] elif len(erp[:stn]) < len(erp[stn:])-1: add = abs(len(ub)-len(lb)) # remain value to add ub = erp[stn:][:len(ub)+ add -1] conductive_zone = np.concatenate((lb, ub)) # get the index of station number from the conductive zone. ix_stn, = np.where (conductive_zone == conductive_zone[stn]) ix_stn = int(ix_stn[0]) if len(ix_stn)> 1 else int(ix_stn) return conductive_zone, conductive_zone[stn], ix_stn
[docs]def shortPlot (sample, cz=None): """ Quick plot to visualize the `sample` line as well as the selected conductive zone if given. :param sample: array_like, the electrical profiling array :param cz: array_like, the selected conductive zone. If ``None``, `cz` should be plotted. :Example: >>> import numpy as np >>> from kalfeat.tools.exmath import shortPlot, define_conductive_zone >>> test_array = np.random.randn (10) >>> selected_cz ,*_ = define_conductive_zone(test_array, 7) >>> shortPlot(test_array, selected_cz ) """ import matplotlib.pyplot as plt fig, ax = plt.subplots(1,1, figsize =(10, 4)) leg =[] ax.scatter (np.arange(len(sample)), sample, marker ='.', c='b') zl, = ax.plot(np.arange(len(sample)), sample, c='r', label ='Electrical resistivity profiling') leg.append(zl) if cz is not None: # construct a mask array with np.isin to check whether # `cz` is subset array z = np.ma.masked_values (sample, np.isin(sample, cz )) # a masked value is constructed so we need # to get the attribute fill_value as a mask # However, we need to use np.invert or tilde operator # to specify that other value except the `CZ` values mus be # masked. Note that the dtype must be changed to boolean sample_masked = np.ma.array( sample, mask = ~z.fill_value.astype('bool') ) czl, = ax.plot( np.arange(len(sample)), sample_masked, ls='-', c='#0A4CEE', lw =2, label ='Conductive zone') leg.append(czl) ax.set_xticks(range(len(sample))) ax.set_xticklabels( ['S{0:02}'.format(i+1) for i in range(len(sample))]) ax.set_xlabel('Stations') ax.set_ylabel('app.resistivity (ohm.m)') ax.legend( handles = leg, loc ='best') plt.show()
[docs]def compute_anr ( sfi: float , rhoa_array: Array | List[float], pos_bound_indexes: Array[DType[int]] | List[int] )-> float: r""" Compute the select anomaly ratio (ANR) along with the whole profile from SFI. The standardized resistivity values`rhoa` of is averaged from :math:`X_{begin}` to :math:`X_{end}`. The ANR is a positive value and the equation is given as: .. math:: ANR= sfi * \frac{1}{N} * | \sum_{i=1}^{N} \frac{ \rho_{a_i} - \bar \rho_a}{\sigma_{\rho_a}} | where :math:`$\sigma_{rho_a}$` and :math:`\bar \rho_a` are the standard deviation and the mean of the resistivity values composing the selected anomaly. :param sfi: Is standard fracturation index. please refer to :func:`compute_sfi`. :param rhoa_array: Resistivity values of Electrical Resistivity Profiling line :type rhoa_array: array_like :param pos_bound_indexes: Select anomaly station location boundaries indexes. Refer to :func:`compute_power` of ``pos_bounds``. :return: Anomaly ratio :rtype: float :Example: >>> from kalfeat.tools.exmath import compute_anr >>> import pandas as pd >>> anr = compute_anr(sfi=sfi, ... rhoa_array=data = pd.read_excel( ... 'data/l10_gbalo.xlsx').to_numpy()[:, -1], ... pk_bound_indexes = [9, 13]) >>> anr """ stand = (rhoa_array - rhoa_array.mean())/np.std(rhoa_array) try: stand_rhoa =stand[int(min(pos_bound_indexes)): int(max(pos_bound_indexes))+1] except: stand_rhoa = np.array([0]) return sfi * np.abs(stand_rhoa.mean())
[docs]@deprecated('Deprecated function to `:func:`kalfeat.methods.erp.get_type`' ' more efficient using median and index computation. It will ' 'probably deprecate soon for neural network pattern recognition.') def get_type ( erp_array: Array | List [float], posMinMax:Tuple[int] | List[int], pk: float | int, pos_array: SP[DType[float]], dl: float )-> str: """ Find anomaly type from app. resistivity values and positions locations :param erp_array: App.resistivty values of all `erp` lines :type erp_array: array_like :param posMinMax: Selected anomaly positions from startpoint and endpoint :type posMinMax: list or tuple or nd.array(1,2) :param pk: Position of selected anomaly in meters :type pk: float or int :param pos_array: Stations locations or measurements positions :type pos_array: array_like :param dl: Distance between two receiver electrodes measurement. The same as dipole length in meters. :returns: - ``EC`` for Extensive conductive. - ``NC`` for narrow conductive. - ``CP`` for conductive plane - ``CB2P`` for contact between two planes. :Example: >>> from kalfeat.utils.exmath import get_type >>> x = [60, 61, 62, 63, 68, 65, 80, 90, 100, 80, 100, 80] >>> pos= np.arange(0, len(x)*10, 10) >>> ano_type= get_type(erp_array= np.array(x), ... posMinMax=(10,90), pk=50, pos_array=pos, dl=10) >>> ano_type ...CB2P """ # Get position index anom_type ='CP' index_pos = int(np.where(pos_array ==pk)[0]) # if erp_array [:index_pos +1].mean() < np.median(erp_array) or\ # erp_array[index_pos:].mean() < np.median(erp_array) : # anom_type ='CB2P' if erp_array [:index_pos+1].mean() < np.median(erp_array) and \ erp_array[index_pos:].mean() < np.median(erp_array) : anom_type ='CB2P' elif erp_array [:index_pos +1].mean() >= np.median(erp_array) and \ erp_array[index_pos:].mean() >= np.median(erp_array) : if dl <= (max(posMinMax)- min(posMinMax)) <= 5* dl: anom_type = 'NC' elif (max(posMinMax)- min(posMinMax))> 5 *dl: anom_type = 'EC' return anom_type
[docs]@deprecated('`Deprecated function. Replaced by :meth:~core.erp.get_shape` ' 'more convenient to recognize anomaly shape using ``median line``' 'rather than ``mean line`` below.') def get_shape ( rhoa_range: Array | List [float] )-> str : """ Find anomaly `shape` from apparent resistivity values framed to the best points using the mean line. :param rhoa_range: The apparent resistivity from selected anomaly bounds :attr:`~core.erp.ERP.anom_boundaries` :type rhoa_range: array_like or list :returns: - V - W - K - C - M - U :Example: >>> from kalfeat.tools.exmath import get_shape >>> x = [60, 70, 65, 40, 30, 31, 34, 40, 38, 50, 61, 90] >>> shape = get_shape (rhoa_range= np.array(x)) ... U """ minlocals = argrelextrema(rhoa_range, np.less) shape ='V' average_curve = rhoa_range.mean() if len (minlocals[0]) >1 : shape ='W' average_curve = rhoa_range.mean() minlocals_slices = rhoa_range[ int(minlocals[0][0]):int(minlocals[0][-1])+1] average_minlocals_slices = minlocals_slices .mean() if average_curve >= 1.2 * average_minlocals_slices: shape = 'U' if rhoa_range [-1] < average_curve and\ rhoa_range [-1]> minlocals_slices[ int(argrelextrema(minlocals_slices, np.greater)[0][0])]: shape ='K' elif rhoa_range [0] < average_curve and \ rhoa_range [-1] < average_curve : shape ='M' elif len (minlocals[0]) ==1 : if rhoa_range [0] < average_curve and \ rhoa_range [-1] < average_curve : shape ='M' elif rhoa_range [-1] <= average_curve : shape = 'C' return shape
[docs]def compute_power ( posMinMax:float =None, pk_min: Optional[float]=None , pk_max: Optional[float]=None, )-> float: """ Compute the power Pa of anomaly. :param pk_min: Min boundary value of anomaly. `pk_min` is min value (lower) of measurement point. It's the position of the site in meter. :type pk_min: float :param pk_max: Max boundary of the select anomaly. `pk_max` is the maximum value the measurement point in meter. It's the upper boundary position of the anomaly in the site in m. :type pk_max: float :return: The absolute value between the `pk_min` and `pk_max`. :rtype: float :Example: >>> from kalfeat.tools.exmath import compute_power >>> power= compute_power(80, 130) """ if posMinMax is not None: pk_min = np.array(posMinMax).min() pk_max= np.array(posMinMax).max() if posMinMax is None and (pk_min is None or pk_max is None) : raise Wex.kalfeatError_parameter_number( 'Could not compute the anomaly power. Provide at least' 'the anomaly position boundaries or the left(`pk_min`) ' 'and the right(`pk_max`) boundaries.') return np.abs(pk_max - pk_min)
[docs]def compute_magnitude( rhoa_max: float =None , rhoa_min: Optional[float]=None, rhoaMinMax:Optional [float] =None )-> float: """ Compute the magnitude `Ma` of selected anomaly expressed in Ω.m. ano :param rhoa_min: resistivity value of selected anomaly :type rhoa_min: float :param rhoa_max: Max boundary of the resistivity value of select anomaly. :type rhoa_max: float :return: The absolute value between the `rhoa_min` and `rhoa_max`. :rtype: float :Example: >>> from kalfeat.utils.exmath import compute_power >>> power= compute_power(80, 130) """ if rhoaMinMax is not None : rhoa_min = np.array(rhoaMinMax).min() rhoa_max= np.array(rhoaMinMax).max() if rhoaMinMax is None and (rhoa_min is None or rhoa_min is None) : raise Wex.ParameterNumberError( 'Could not compute the anomaly magnitude. Provide at least' 'the anomaly resistivy value boundaries or the buttom(`rhoa_min`)' 'and the top(`rhoa_max`) boundaries.') return np.abs(rhoa_max -rhoa_min)