Source code for kalfeat.tools.coreutils

# -*- coding: utf-8 -*-
#   author: KLaurent <etanoyau@gmail.com>
#   Licence:  GPL-3.0 

"""
`kalfeat`_ core functionalities 
===============================
Encompasses the main functionalities for class and methods  

.. _kalfeat: https://github.com/WEgeophysics/kalfeat/

"""
from __future__ import  annotations 
import os 
import warnings 
import copy 

import numpy as np 
import pandas as pd 
import matplotlib.pyplot as plt
 
from ..documentation import __doc__ 
from ..decorators import  (
    refAppender, 
    docSanitizer
    ) 
from ..property import P 
from ..typing import (
    Any, 
    List ,  
    Union, 
    Tuple,
    Dict,
    Optional,
    NDArray,
    DataFrame, 
    Series,
    Array, 
    DType, 
    Sub, 
    SP
)
from ..exceptions import ( 
    StationError, 
    HeaderError, 
    ResistivityError,
    ERPError,
    VESError
)
from .funcutils import (
    smart_format as smft,
    _isin , 
    _assert_all_types,
    accept_types,
    read_from_excelsheets
    ) 
from .gistools import (
    project_point_ll2utm, 
    project_point_utm2ll 
    )
 
def _is_readable (
        f:str, 
        readableformats : Tuple[str] = ('.csv', '.xlsx'),
        **kws
 ) -> DataFrame: 
    """ Specific files that can be read file throughout the packages 
    :param f: Path-like object -Should be a readable files. 
    :param readableformats: tuple -Specific readable files 
    
    :return: dataframe - A dataframe with head contents... 
    
    """
    if not os.path.isfile: 
        raise TypeError (
            f'Expected a Path-like object, got : {type(f).__name__!r}')

    if os.path.splitext(f)[1].lower() not in readableformats:
        raise TypeError(f'Can only parse the {smft(readableformats)} files'
                        )
    
    if f.endswith ('.csv'): 
        f = pd.read_csv (f,**kws) 
    elif f.endswith ('.xlsx'): 
        f = pd.read_excel(f, **kws )
        
    return f 
    
[docs]@refAppender(__doc__) def vesSelector( data:str | DataFrame[DType[float|int]] = None, *, rhoa: Array |Series | List [float] = None, AB :Array |Series = None, MN: Array|Series | List[float] =None, index_rhoa: Optional[int] = None, **kws ) -> DataFrame : """ Assert the validity of |VES| data and return a sanitize dataframe. :param rhoa: array-like - Apparent resistivities collected during the sounding. :param AB: array-like - Investigation distance between the current electrodes. Note that the `AB` is by convention equals to `AB/2`. It's taken as half-space of the investigation depth. :param MN: array-like - Potential electrodes distances at each investigation depth. Note by convention the values are half-space and equals to `MN/2`. :param f: Path-like object or sounding dataframe. If given, the others parameters could keep the ``None` values. :param index_rhoa: int - The index to retrieve the resistivity data of a specific sounding point. Sometimes the sounding data are composed of the different sounding values collected in the same survey area into different |ERP| line. For instance: +------+------+----+----+----+----+----+ | AB/2 | MN/2 |SE1 | SE2| SE3| ...|SEn | +------+------+----+----+----+----+----+ Where `SE` are the electrical sounding data values and `n` is the number of the sounding points selected. `SE1`, `SE2` and `SE3` are three points selected for |VES| i.e. 3 sounding points carried out either in the same |ERP| or somewhere else. These sounding data are the resistivity data with a specific numbers. Commonly the number are randomly chosen. It does not refer to the expected best fracture zone selected after the prior-interpretation. After transformation via the function `ves_selector`, the header of the data should hold the `resistivity`. For instance, refering to the table above, the data should be: +----+----+-------------+-------------+-------------+-----+ | AB | MN |resistivity | resistivity | resistivity | ... | +----+----+-------------+-------------+-------------+-----+ Therefore, the `index_rhoa` is used to select the specific resistivity values i.e. select the corresponding sounding number of the |VES| expecting to locate the drilling operations or for computation. For esample, ``index_rhoa=1`` should figure out: +------+------+----+--------+-----+----+------------+ | AB/2 | MN/2 |SE2 | --> | AB | MN |resistivity | +------+------+----+--------+-----+----+------------+ If `index_rhoa` is ``None`` and the number of sounding curves are more than one, by default the first sounding curve is selected ie `index_rhoa` equals to ``0``. :param kws: dict - Pandas dataframe reading additionals keywords arguments. :return: -dataframe -Sanitize |VES| dataframe with ` AB`, `MN` and `resistivity` as the column headers. :Example: >>> from kalfeat.tools.coreutils import vesSelector >>> df = vesSelector (data='data/ves/ves_gbalo.csv') >>> df.head(3) ... AB MN resistivity 0 1 0.4 943 1 2 0.4 1179 2 3 0.4 1103 >>> df = vesSelector ('data/ves/ves_gbalo.csv', index_rhoa=3 ) >>> df.head(3) ... AB MN resistivity 0 1 0.4 457 1 2 0.4 582 2 3 0.4 558 """ for arr in (AB , MN, rhoa): if arr is not None: _assert_all_types(arr, list, tuple, np.ndarray, pd.Series) try: index_rhoa = index_rhoa if index_rhoa is None else int(index_rhoa) except: raise TypeError ( f'Index is an integer, not {type(index_rhoa).__name__!r}') if data is not None: if isinstance(data, str): try : data = _is_readable(data, **kws) except TypeError as typError: raise VESError (str(typError)) data = _assert_all_types(data, pd.DataFrame ) # sanitize the dataframe pObj =P() ; ncols = pObj(hl = list(data.columns), kind ='ves') if ncols is None: raise HeaderError (f"Columns {smft(pObj.icpr)} are missing in " "the given dataset.") data.columns = ncols try : rhoa= data.resistivity except : raise ResistivityError( "Data validation aborted! Missing resistivity values.") else : # In the case, we got a multiple resistivity values # corresponding to the different sounding values if rhoa.ndim > 1 : if index_rhoa is None: index_rhoa = 0 elif index_rhoa >= len(rhoa.columns): warnings.warn(f'The index `{index_rhoa}` is out of the range' f' `{len(rhoa.columns)-1}` for selecting the' ' specific resistivity data. By default, we ' 'only keep the data at the index 0.' ) index_rhoa= 0 rhoa = rhoa.iloc[:, index_rhoa] if rhoa.ndim > 1 else rhoa if 'MN' in data.columns: MN = data.MN try: AB= data.AB except: raise VESError("Data validation aborted! Current electrodes values" " are missing. Specify the deep measurement!") if rhoa is None: raise ResistivityError( "Data validation aborted! Missing resistivity values.") if AB is None: raise VESError("Data validation aborted! Current electrodes values" " are missing. Specify the deep measurement!") AB = np.array(AB) ; MN = np.array(MN) ; rhoa = np.array(rhoa) if len(AB) !=len(rhoa): raise VESError(" Deep measurement from the current electrodes `AB` and" " the resistiviy values `rhoa` must have the same length" f'. But `{len(AB)}` and `{len(rhoa)}` were given.') sdata =pd.DataFrame( {'AB': AB, 'MN': MN, 'resistivity':rhoa},index =range(len(AB))) return sdata
[docs]@docSanitizer() def fill_coordinates( data: DataFrame =None, lon: Array = None, lat: Array = None, east: Array = None, north: Array = None, epsg: Optional[int] = None , utm_zone: Optional [str] = None, datum: str = 'WGS84' ) -> Tuple [DataFrame, str] : """ Recompute coordinates values Compute the couples (easting, northing) or (longitude, latitude ) and set the new calculated values into a dataframe. Parameters ----------- data : dataframe, Dataframe contains the `lat`, `lon` or `east` and `north`. All data dont need to be provided. If ('lat', 'lon') and (`east`, `north`) are given, ('`easting`, `northing`') should be overwritten. lat: array-like float or string (DD:MM:SS.ms) Values composing the `longitude` of point lon: array-like float or string (DD:MM:SS.ms) Values composing the `longitude` of point east : array-le float Values composing the northing coordinate in meters north : array-like float Values composing the northing coordinate in meters datum: string well known datum ex. WGS84, NAD27, etc. projection: string projected point in lat and lon in Datum `latlon`, as decimal degrees or 'UTM'. epsg: int epsg number defining projection (see http://spatialreference.org/ref/ for moreinfo) Overrides utm_zone if both are provided datum: string well known datum ex. WGS84, NAD27, etc. utm_zone : string zone number and 'S' or 'N' e.g. '55S'. Defaults to the centre point of the provided points Returns ------- - `data`: Dataframe with new coodinates values computed - `utm_zone`: zone number and 'S' or 'N' """ def _get_coordcomps (str_, df): """ Retrieve coordinate values and assert whether values are given. If ``True``, retunrs `array` of `given item` and valid type of the data. Note that if data equals to ``0``, we assume values are not provided. :param str_: str - item in the `df` columns :param df: DataFrame - dataframe expected containing the `str_` item. """ if str_ in df.columns: return df[str_] , np.all(df[str_])!=0 return None, None def _set_coordinate_values (x, y, *, func ): """ Iterate `x` and `y` and output new coordinates values computed from `func` . param x: iterable values :param y: iterabel values :param func: function F can be: - ``project_point_utm2ll`` for `UTM` to `latlon`` or - `` project_point_ll2utm`` for `latlon`` to `UTM` :retuns: - xx new calculated - yy new calculated - utm zone """ xx = np.zeros_like(x); yy = np.zeros_like(xx) for ii, (la, lo) in enumerate (zip(x, y)): e , n, uz = func ( la, lo, utm_zone = utm_zone, datum = datum, epsg =epsg ) xx [ii] = e ; yy[ii] = n return xx, yy , uz if data is None: data = pd.DataFrame ( dict ( longitude = lon , latitude = lat , easting = east, northing=north ), #pass index If using all scalar values index = range(4) ) if data is not None : data = _assert_all_types(data, pd.DataFrame) lon , lon_isvalid = _get_coordcomps( 'longitude', data ) lat , lat_isvalid = _get_coordcomps( 'latitude', data ) east , e_isvalid = _get_coordcomps( 'easting', data ) north, n_isvalid = _get_coordcomps( 'northing', data ) if lon_isvalid and lat_isvalid: try : east , north , uz = _set_coordinate_values( lat.values, lon.values, project_point_ll2utm, ) except :# pass if an error occurs pass else : data['easting'] = east ; data['northing'] = north elif e_isvalid and n_isvalid: if utm_zone is None: warnings.warn( 'Should provide the `UTM` for `latitute` and `longitude`' ' calculus. `NoneType` can not be used as UTM zone number.' ' Refer to the documentation.') try : lat , lon, utm_zone = _set_coordinate_values( east.values, north.values, func = project_point_utm2ll, ) except : pass else : data['longitude'] = lon ; data['latitude'] = lat return data, utm_zone
def _assert_data (data :DataFrame ): """ Assert the data and return the property dataframe """ data = _assert_all_types( data, list, tuple, np.ndarray, pd.Series, pd.DataFrame) if isinstance(data, pd.DataFrame): cold , ixc =list(), list() for i , ckey in enumerate(data.columns): for kp in P().isrll : if ckey.lower() .find(kp) >=0 : cold.append (kp); ixc.append(i) break if len (cold) ==0: raise ValueError (f'Expected {smft(P().isrll)} ' ' columns, but not found in the given dataframe.' ) dup = cold.copy() # filter and remove one by one duplicate columns. list(filter (lambda x: dup.remove(x), set(cold))) dup = set(dup) if len(dup) !=0 : raise HeaderError( f'Duplicate column{"s" if len(dup)>1 else ""}' f' {smft(dup)} found. It seems to be {smft(dup)}' f'column{"s" if len(dup)>1 else ""}. Please provide' ' the right column name in the dataset.' ) data_ = data [cold] col = list(data_.columns) for i, vc in enumerate (col): for k in P().isrll : if vc.lower().find(k) >=0 : col[i] = k ; break return data_
[docs]def is_erp_series ( data : Series , dipolelength : Optional [float] = None ) -> DataFrame : """ Validate the series. The `data` should be the resistivity values with the one of the following property index names ``resistivity`` or ``rho``. Will raises error if not detected. If a`dipolelength` is given, a data should include each station positions values. Parameters ----------- data : pandas Series object Object of resistivity values dipolelength: float Distance of dipole during the whole survey line. If it is is not given , the station location should be computed and filled using the default value of the dipole. The *default* value is set to ``10 meters``. Returns -------- A dataframe of the property indexes such as ``['station', 'easting','northing', 'resistivity']``. Raises ------ ResistivityError If name does not match the `resistivity` column name. Examples -------- >>> import numpy as np >>> import pandas as pd >>> from kalfeat.tools.coreutils imprt is_erp_series >>> data = pd.Series (np.abs (np.random.rand (42)), name ='res') >>> data = is_erp_series (data) >>> data.columns ... Index(['station', 'easting', 'northing', 'resistivity'], dtype='object') >>> data = pd.Series (np.abs (np.random.rand (42)), name ='NAN') >>> data = _is_erp_series (data) ... ResistivityError: Unable to detect the resistivity column: 'NAN'. """ data = _assert_all_types(data, pd.Series) is_valid = False for p in P().iresistivity : if data.name.lower().find(p) >=0 : data.name = p ; is_valid = True ; break if not is_valid : raise ResistivityError( f"Unable to detect the resistivity column: {data.name!r}." ) if is_valid: df = is_erp_dataframe (pd.DataFrame ( { data.name : data , 'NAN' : np.zeros_like(data ) } ), dipolelength = dipolelength, ) return df
[docs]def is_erp_dataframe ( data :DataFrame , dipolelength : Optional[float] = None ) -> DataFrame: """ Ckeck whether the dataframe contains the electrical resistivity profiling (ERP) index properties. DataFrame should be reordered to fit the order of index properties. Anyway it should he dataframe filled by ``0.`` where the property is missing. However, if `station` property is not given. station` property should be set by using the dipolelength default value equals to ``10.``. Parameters ---------- data : Dataframe object Dataframe object. The columns dataframe should match the property ERP property object such as ``['station','resistivity', 'longitude','latitude']`` or ``['station','resistivity', 'easting','northing']``. dipolelength: float Distance of dipole during the whole survey line. If the station is not given as `data` columns, the station location should be computed and filled the station columns using the default value of the dipole. The *default* value is set to ``10 meters``. Returns -------- A new data with index properties. Raises ------ - None of the column matches the property indexes. - Find duplicated values in the given data header. Examples -------- >>> import numpy as np >>> from kalfeat.tools.coreutils import is_erp_dataframe >>> df = pd.read_csv ('data/erp/testunsafedata.csv') >>> df.columns ... Index(['x', 'stations', 'resapprho', 'NORTH'], dtype='object') >>> df = _is_erp_dataframe (df) >>> df.columns ... Index(['station', 'easting', 'northing', 'resistivity'], dtype='object') """ data = _assert_all_types(data, pd.DataFrame) datac= data.copy() def _is_in_properties (h ): """ check whether the item header `h` is in the property values. Return `h` and it correspondence `key` in the property values. """ for key, values in P().idicttags.items() : for v in values : if h.lower().find (v)>=0 : return h, key return None, None def _check_correspondence (pl, dl): """ collect the duplicated name in the data columns """ return [ l for l in pl for d in dl if d.lower().find(l)>=0 ] cold , c = list(), list() # create property object pObj = P(data.columns) for i , ckey in enumerate(list(datac.columns)): h , k = _is_in_properties(ckey) cold.append (h) if h is not None else h c.append(k) if k is not None else k if len (cold) ==0: raise HeaderError ( f'Wrong column headers {list(data.columns)}.' f' Unable to find the expected {smft(pObj.isrll)}' ' column properties.' ) dup = cold.copy() # filter and remove one by one duplicate columns. list(filter (lambda x: dup.remove(x), set(cold))) dup = set(dup) ; ress = _check_correspondence( pObj() or pObj.idicttags.keys(), dup) if len(dup) !=0 : raise HeaderError( f'Duplicate column{"s" if len(dup)>1 else ""}' f' {smft(dup)} {"are" if len(dup)>1 else "is"} ' f'found. It seems correspond to {smft(ress)}. ' 'Please ckeck your data column names. ' ) # fetch the property column names and # replace by 0. the non existence column # reorder the column to match # ['station','resistivity', 'easting','northing', ] data_ = data[cold] data_.columns = c data_= data_.reindex (columns =pObj.idicttags.keys(), fill_value =0.) dipolelength = _assert_all_types( dipolelength , float, int) if dipolelength is not None else None if (np.all (data_.station) ==0. and dipolelength is None ): dipolelength = 10. data_.station = np.arange ( 0 , data_.shape[0] * dipolelength , dipolelength ) return data_
[docs]def erpSelector ( f: str | NDArray | Series | DataFrame , columns: str | List[str] = ..., **kws:Any ) -> DataFrame : """ Read and sanitize the data collected from the survey. `data` should be an array, a dataframe, series, or arranged in ``.csv`` or ``.xlsx`` formats. Be sure to provide the header of each columns in' the worksheet. In a file is given, header columns should be aranged as ``['station','resistivity' ,'longitude', 'latitude']``. Note that coordinates columns (`longitude` and `latitude`) are not compulsory. Parameters ---------- f: Path-like object, ndarray, Series or Dataframe, If a path-like object is given, can only parse `.csv` and `.xlsx` file formats. However, if ndarray is given and shape along axis 1 is greater than 4, the ndarray should be shrunked. columns: list list of the valuable columns. It can be used to fix along the axis 1 of the array the specific values. It should contain the prefix or the whole name of each item in ``['station','resistivity' ,'longitude', 'latitude']``. kws: dict Additional pandas `pd.read_csv` and `pd.read_excel` methods keyword arguments. Be sure to provide the right argument. when reading `f`. For instance, provide ``sep= ','`` argument when the file to read is ``xlsx`` format will raise an error. Indeed, `sep` parameter is acceptable for parsing the `.csv` file format only. Returns ------- DataFrame with valuable column(s). Notes ------ The length of acceptable columns is ``4``. If the size of the columns is higher than `4`, the data should be shrunked to match the expected columns. Futhermore, if the header is not specified in `f` , the defaut column arrangement should be used. Therefore, the second column should be considered as the ``resistivity`` column. Examples --------- >>> import numpy as np >>> from kalfeat.tools.coreutils import erpSelector >>> df = erpSelector ('data/erp/testsafedata.csv') >>> df.shape ... (45, 4) >>> list(df.columns) ... ['station','resistivity', 'longitude', 'latitude'] >>> df = erp_selector('data/erp/testunsafedata.xlsx') >>> list(df.columns) ... ['easting', 'station', 'resistivity', 'northing'] >>> df = erpSelector(np.random.randn(7, 7)) >>> df.shape ... (7, 4) >>> list(df.columns) ... ['station', 'resistivity', 'longitude', 'latitude'] """ if columns is ...: columns=None if columns is not None: if isinstance(columns, str): columns =columns.replace(':', ',').replace(';', ',') if ',' in columns: columns =columns.split(',') if isinstance(f, str): if os.path.isfile(f): try : f = _is_readable(f, **kws) except TypeError as typError: raise ERPError (str(typError)) if isinstance( f, np.ndarray): name = copy.deepcopy(columns) columns = P().isrll if columns is None else columns colnum = 1 if f.ndim ==1 else f.shape[1] if colnum==1: if isinstance (name, list) : if len(name) ==1: name = name[0] f = is_erp_series ( pd.Series (f, name = name or columns[1] ) ) elif colnum==2 : f= pd.DataFrame (f, columns = columns if columns is None else columns[:2] ) elif colnum==3: warnings.warn("One missing column `longitude|latitude` value." "If the `longitude` and `latitude` data are" f" not available. Use {smft(P().isrll[:2])} " "columns instead.", UserWarning) columns = name or columns [:colnum] f= pd.DataFrame (f[:, :len(columns)], columns =columns ) elif f.shape[1]==4: f =pd.DataFrame (f, columns =columns ) elif colnum > 4: # add 'none' columns for the remaining columns. f =pd.DataFrame ( f, columns = columns + [ 'none' for i in range(colnum-4)] ) if isinstance(f, pd.DataFrame): f = is_erp_dataframe( f) elif isinstance(f , pd.Series ): f = is_erp_series(f) else : amsg = smft(accept_types ( pd.Series, pd.DataFrame, np.ndarray) + ['*.xls', '*.csv']) raise ValueError (f" Unacceptable data. Accept only {amsg}." ) if np.all(f.resistivity)==0: raise ResistivityError('Resistivity values need to be supply.') return f
def _fetch_prefix_index ( arr:NDArray [DType[float]] = None, col: List[str] = None, df : DataFrame = None, prefixs: List [str ] =None ) -> Tuple [int | int]: """ Retrieve index at specific column. Use the given station positions collected on the field to compute the dipole length during the whole survey. :param arr: array. Ndarray of data where one colum must the positions values. :param col: list. The list should be considered as the head of array. Each position in the list sould fit the column data in the array. It raises an error if the number of item in the list is different to the size of array in axis=1. :param df: dataframe. When supply, the `arr` and `col` is not compulsory. :param prefixs: list. Contains specific column prefixs to fetch the corresponding data. For instance:: - Station prefix : ['pk','sta','pos'] - Easting prefix : ['east', 'x', 'long'] - Northing prefix: ['north', 'y', 'lat'] :returns: - index of the position columns in the data - station position array-like. :Example: >>> from numpy as np >>> from kalfeat.tools.coreutils import _assert_positions >>> array1 = np.c_[np.arange(0, 70, 10), np.random.randn (7,3)] >>> col = ['pk', 'x', 'y', 'rho'] >>> index, = _fetch_prefix_index (array1 , col = ['pk', 'x', 'y', 'rho'], ... prefixs = EASTPREFIX) ... 1 >>> index, _fetch_prefix_index (array1 , col = ['pk', 'x', 'y', 'rho'], ... prefixs = NOTHPREFIX ) ... 2 """ if prefixs is None: raise ValueError('Please specify the list of items to compose the ' 'prefix to fetch the columns data. For instance' f' `station prefix` can be `{P().istation}`.') if arr is None and df is None : raise TypeError ( 'Expected and array or a dataframe not' ' a Nonetype object.' ) elif df is None and col is None: raise StationError( 'Column list is missing.' ' Could not detect the position index.') if isinstance( df, pd.DataFrame): # collect the resistivity from the index # if a dataFrame is given arr, col = df.values, df.columns if arr.ndim ==1 : # Here return 0 as colIndex return 0, arr if isinstance(col, str): col =[col] if len(col) != arr.shape[1]: raise ValueError ( f'Column should match the array shape in axis =1 <{arr.shape[1]}>.' f' But {"was" if len(col)==1 else "were"} given') # convert item in column in lowercase comsg = col.copy() col = list(map(lambda x: x.lower(), col)) colIndex = [col.index (item) for item in col for pp in prefixs if item.find(pp) >=0] if len(colIndex) is None or len(colIndex) ==0: raise ValueError (f'Unable to detect the position in `{smft(comsg)}`' ' columns. Columns must contain at least' f' `{smft(prefixs)}`.') return colIndex[0], arr def _assert_station_positions( arr: SP = None, prefixs: List [str] =..., **kws ) -> Tuple [int, float]: """ Assert positions and compute dipole length. Use the given station positions collected on the field to detect the dipole length during the whole survey. :param arr: array. Ndarray of data where one column must the positions values. :param col: list. The list should be considered as the head of array. Each position in the list sould fit the column data in the array. It raises an error if the number of item in the list is different to the size of array in axis=1. :param df: dataframe. When supply, the `arr` and `col` are not needed. :param prefixs: list. Contains all the station column names prefixs to fetch the corresponding data. :returns: - positions: new positions numbering from station `S00` to ... - dipolelength: recomputed dipole value :Example: >>> from numpy as np >>> from kalfeat.tools.coreutils import _assert_station_positions >>> array1 = np.c_[np.arange(0, 70, 10), np.random.randn (7,3)] >>> col = ['pk', 'x', 'y', 'rho'] >>> _assert_positions(array1, col) ... (array([ 0, 10, 20, 30, 40, 50, 60]), 10) >>> array1 = np.c_[np.arange(30, 240, 30), np.random.randn (7,3)] ... (array([ 0, 30, 60, 90, 120, 150, 180]), 30) """ if prefixs is (None or ...): prefixs = P().istation colIndex, arr =_fetch_prefix_index( arr=arr, prefixs = prefixs, **kws ) positions = arr[:, colIndex] # assert the position is aranged from lower to higher # if there is not wrong numbering. fsta = np.argmin(positions) lsta = np.argmax (positions) if int(fsta) !=0 or int(lsta) != len(positions)-1: raise StationError( 'Wrong numbering! Please number the position from first station ' 'to the last station. Check your array positionning numbers.') dipoleLength = int(np.abs (positions.min() - positions.max () ) / (len(positions)-1)) # renamed positions positions = np.arange(0 , len(positions) *dipoleLength , dipoleLength ) return positions, dipoleLength
[docs]@refAppender(__doc__) def plotAnomaly( erp: Array | List[float], cz: Optional [Sub[Array], List[float]] = None, s: Optional [str] = None, figsize: Tuple [int, int] = (10, 4), fig_dpi: int = 300 , savefig: str | None = None, show_fig_title: bool = True, style: str = 'seaborn', fig_title_kws: Dict[str, str|Any] = ..., czkws: Dict [str , str|Any] = ... , legkws: Dict [Any , str|Any] = ... , **kws, ) -> None: """ Plot the whole |ERP| line and selected conductive zone. Conductive zone can be supplied nannualy as a subset of the `erp` or by specifyting the station expected for drilling location. For instance ``S07`` for the seventh station. Futhermore, for automatic detection, one should set the station argument `s` to ``auto``. However, it 's recommended to provide the `cz` or the `s` to have full control. The conductive zone is juxtaposed to the whole |ERP| survey. One can customize the `cz` plot by filling with `Matplotlib pyplot`_ additional keywords araguments thought the kewords argument `czkws`. :param sample: array_like - the |ERP| survey line. The line is an array of resistivity values. :param cz: array_like - the selected conductive zone. If ``None``, only the `erp` should be displayed. Note that `cz` is an subset of `erp` array. :param s: str - The station location given as string (e.g. ``s= "S10"``) or as a station number (indexing; e.g ``s =10``). If value is set to ``"auto"``, `s` should be find automatically and fetching `cz` as well. :param figsize: tuple- Tuple value of figure size. Refer to the web resources `Matplotlib figure`_. :param fig_dpi: int - figure resolution "dot per inch". Refer to `Matplotlib figure`_. :param savefig: str - save figure. Refer to `Matplotlib figure`_. :param show_fig_tile: bool - display the title of the figure. :param fig_title_kws: dict - Keywords arguments of figure suptile. Refer to `Matplotlib figsuptitle`_. :param style: str - the style for customizing visualization. For instance to get the first seven available styles in pyplot, one can run the script below:: plt.style.available[:7] Futher details can be foud in Webresources below or click on `GeekforGeeks`_. :param czkws: dict - keywords `Matplotlib pyplot`_ additional arguments to customize the `cz` plot. :param legkws: dict - keywords Matplotlib legend additional keywords arguments. :param kws: dict - additional keywords argument for `Matplotlib pyplot`_ to customize the `erp` plot. :Example: >>> import numpy as np >>> from kalfeat.tools.coreutils import ( ... plot_anomaly, _define_conductive_zone) >>> test_array = np.random.randn (10) >>> selected_cz ,*_ = _define_conductive_zone(test_array, 7) >>> plot_anomaly(test_array, selected_cz ) >>> plot_anomaly(tes_array, selected_cz , s= 5) >>> plot_anomaly(tes_array, s= 's02') >>> plot_anomaly(tes_array) .. note:: If `cz` is given, one does not need to worry about the station `s`. `s` can stay with it default value``None``. References ----------- See Matplotlib Axes: https://matplotlib.org/stable/api/_as_gen/matplotlib.axes.Axes.tick_params.html GeekforGeeks: https://www.geeksforgeeks.org/style-plots-using-matplotlib/#:~:text=Matplotlib%20is%20the%20most%20popular,without%20using%20any%20other%20GUIs. """ def format_thicks (value, tick_number): """ Format thick parameter with 'FuncFormatter(func)' rather than using:: axi.xaxis.set_major_locator (plt.MaxNLocator(3)) ax.xaxis.set_major_formatter (plt.FuncFormatter(format_thicks)) """ if value % 7 ==0: return 'S{:02}'.format(int(value)+ 1) else: None erp = _assert_all_types( erp, tuple, list , np.ndarray , pd.Series) if cz is not None: cz = _assert_all_types( cz, tuple, list , np.ndarray , pd.Series) cz = np.array (cz) erp =np.array (erp) plt.style.use (style) kws =dict ( color=P().frcolortags.get('fr1') if kws.get( 'color') is None else kws.get('color'), linestyle='-' if kws.get('ls') is None else kws.get('ls'), linewidth=2. if kws.get('lw') is None else kws.get('lw'), label = 'Electrical resistivity profiling' if kws.get( 'label') is None else kws.get('label') ) if czkws is ( None or ...) : czkws =dict (color=P().frcolortags.get('fr3'), linestyle='-', linewidth=3, label = 'Conductive zone' ) if czkws.get('color') is None: czkws['color']= P().frcolortags.get(czkws['color']) if (xlabel := kws.get('xlabel')) is not None : del kws['xlabel'] if (ylabel := kws.get('ylabel')) is not None : del kws['ylabel'] if (rotate:= kws.get ('rotate')) is not None: del kws ['rotate'] fig, ax = plt.subplots(1,1, figsize =figsize) leg =[] zl, = ax.plot(np.arange(len(erp)), erp, **kws ) leg.append(zl) if s =='' : s= None # for consistency if s is not None: auto =False ; keepindex =True if isinstance (s , str): auto = True if s.lower()=='auto' else s if 's' or 'pk' in s.upper(): # if provide the station. keepindex =False cz , _ , _, ix = defineConductiveZone( erp, s = s , auto = auto, keepindex=keepindex ) s = "S{:02}".format(ix +1) if s is not None else s if cz is not None: # construct a mask array with np.isin to check whether if not _isin (erp, cz ): raise ValueError ('Expected a conductive zone to be a subset of ' ' the resistivity profiling line.') # `cz` is subset array z = np.ma.masked_values (erp, np.isin(erp, cz )) # a masked value is constructed so we need # to get the attribute fill_value as a mask # However, we need to use np.invert or the tilde operator # to specify that other value except the `CZ` values mus be # masked. Note that the dtype must be changed to boolean sample_masked = np.ma.array( erp, mask = ~z.fill_value.astype('bool') ) czl, = ax.plot( np.arange(len(erp)), sample_masked, 'o', **czkws) leg.append(czl) ax.tick_params (labelrotation = 0. if rotate is None else rotate) ax.set_xticks(range(len(erp)), ) if len(erp ) >= 14 : ax.xaxis.set_major_formatter (plt.FuncFormatter(format_thicks)) else : ax.set_xticklabels( ['S{:02}'.format(int(i)+1) for i in range(len(erp))], rotation =0. if rotate is None else rotate ) if legkws is( None or ...): legkws =dict() ax.set_xlabel ('Stations') if xlabel is None else ax.set_xlabel (xlabel) ax.set_ylabel ('Resistivity (Ω.m)' ) if ylabel is None else ax.set_ylabel (ylabel) ax.legend( handles = leg, **legkws ) if show_fig_title: title = 'Plot ERP line with SVES = {0}'.format(s if s is not None else '') if fig_title_kws is ( None or ...): fig_title_kws = dict ( t = title if s is not None else title.replace ( 'with SVES =', ''), style ='italic', bbox =dict(boxstyle='round',facecolor ='lightgrey')) plt.tight_layout() fig.suptitle(**fig_title_kws, ) if savefig is not None : plt.savefig(savefig, dpi=fig_dpi, ) plt.show()
[docs]def defineConductiveZone( erp:Array| pd.Series | List[float] , s: Optional [str , int] = None, p: SP = None, auto: bool = False, **kws, ) -> Tuple [Array, int] : """ Define conductive zone as subset of the erp line. Indeed the conductive zone is a specific zone expected to hold the drilling location `s`. If drilling location is not provided, it would be by default the very low resistivity values found in the `erp` line. :param erp: array_like, the array contains the apparent resistivity values :param s: str or int, is the station position. :param auto: bool. If ``True``, the station position should be the position of the lower resistivity value in |ERP|. :returns: - conductive zone of resistivity values - conductive zone positionning - station position index in the conductive zone - station position index in the whole |ERP| line :Example: >>> import numpy as np >>> from kalfeat.tools.coreutils import _define_conductive_zone >>> test_array = np.random.randn (10) >>> selected_cz ,*_ = _define_conductive_zone(test_array, 's20') >>> shortPlot(test_array, selected_cz ) """ if isinstance(erp, pd.Series): erp = erp.values # conductive zone positioning pcz : Optional [Array] = None if s is None and auto is False: raise TypeError ('Expected the station position. NoneType is given.') elif s is None and auto: s, = np.where (erp == erp.min()) s=int(s) s, pos = _assert_stations(s, **kws ) # takes the last position if the position is outside # the number of stations. pos = len(erp) -1 if pos >= len(erp) else pos # frame the `sves` (drilling position) and define the conductive zone ir = erp[:pos][-3:] ; il = erp[pos:pos +3 +1 ] cz = np.concatenate((ir, il)) if p is not None: if len(p) != len(erp): raise StationError ( 'Array of position and conductive zone must have the same ' f'length: `{len(p)}` and `{len(cz)}` were given.') sr = p[:pos][-3:] ; sl = p[pos:pos +3 +1 ] pcz = np.concatenate((sr, sl)) # Get the new position in the selected conductive zone # from the of the whole erp pix, = np.where (cz == erp[pos]) return cz , pcz, int(pix), pos
def _assert_stations( s:Any , dipole:Any = None, keepindex:bool = False ) -> Tuple[str, int]: """ Sanitize stations and returns station name and index. ``pk`` and ``S`` can be used as prefix to define the station `s`. For instance ``S01`` and ``PK01`` means the first station. :param s: Station name :type s: str, int :param dipole: dipole_length in meters. :type dipole: float :param keepindex: bool - Stands for keeping the Python indexing. If set to ``True`` so the station should start by `S00` and so on. :returns: - station name - index of the station. .. note:: The defaut station numbering is from 1. SO if ``S00` is given, and the argument `keepindex` is still on its default value i.e ``False``, the station name should be set to ``S01``. Moreover, if `dipole` value is given, the station should named according to the value of the dipole. For instance for `dipole` equals to ``10m``, the first station should be ``S00``, the second ``S10`` , the third ``S30`` and so on. However, it is recommend to name the station using counting numbers rather than using the dipole position. :Example: >>> from kalfeat.tools.coreutils import _assert_stations >>> _assert_stations('pk01') ... ('S01', 0) >>> _assert_stations('S1') ... ('S01', 0) >>> _assert_stations('S1', keepindex =True) ... ('S01', 1) # station here starts from 0 i.e `S00` >>> _assert_stations('S00') ... ('S00', 0) >>> _assert_stations('S1000',dipole ='1km') ... ('S02', 1) # by default it does not keep the Python indexing >>> _assert_stations('S10', dipole ='10m') ... ('S02', 1) >>> _assert_stations(1000,dipole =1000) ... ('S02', 1) """ # in the case s is string: eg. "00", "pk01", "S001" ix = 0 s = _assert_all_types(s, str, int, float) if isinstance(s, str): s =s.lower().replace('pk', '').replace('s', '').replace('ta', '') try : s = int(s ) except : raise TypeError ('Unable to convert str to float.') else : # set index to 0 , is station `S00` is found for instance. if s ==0 : keepindex =True st = copy.deepcopy(s) if isinstance(s, int): msg = 'Station numbering must start'\ ' from {0!r} or set `keepindex` argument to {1!r}.' msg = msg.format('0', 'False') if keepindex else msg.format( '1', 'True') if not keepindex: # station starts from 1 if s <=0: raise ValueError (msg ) s , ix = "S{:02}".format(s), s - 1 elif keepindex: if s < 0: raise ValueError (msg) # for consistency s, ix = "S{:02}".format(s ), s # Recompute the station position if the dipole value are given if dipole is not None: if isinstance(dipole, str): #'10m' if dipole.find('km')>=0: dipole = dipole.lower().replace('km', '000') dipole = dipole.lower().replace('m', '') try : dipole = float(dipole) except : raise StationError( 'Invalid literal value for' f' dipole : {dipole!r}') # since the renamed from dipole starts at 0 # e.g. 0(S1)---10(S2)---20(S3) ---30(S4)etc .. ix = int(st//dipole) ; s= "S{:02}".format(ix +1) return s, ix def _parse_args ( args:Union[List | str ] )-> Tuple [ pd.DataFrame, List[str|Any]]: """ `Parse_args` function returns array of rho and coordinates values (X, Y). Arguments can be a list of data, a dataframe or a Path like object. If a Path-like object is set, it should be the priority of reading. :param args: arguments :return: ndarray or array-like arranged with apparent resistivity at the first index .. note:: If a list of arrays is given or numpy.ndarray is given, we assume that the columns at the first index fits the apparent resistivity values. :Example: >>> import numpy as np >>> from kalfeat.tools.coreutils import _parse_args >>> a, b = np.arange (1, 10 , 0.5), np.random.randn(9).reshape(3, 3) >>> _parse_args ([a, 'data/erp/l2_gbalo.xlsx', b]) ... array([[1.1010000e+03, 0.0000000e+00, 7.9075200e+05, 1.0927500e+06], [1.1470000e+03, 1.0000000e+01, 7.9074700e+05, 1.0927580e+06], [1.3450000e+03, 2.0000000e+01, 7.9074300e+05, 1.0927630e+06], [1.3690000e+03, 3.0000000e+01, 7.9073800e+05, 1.0927700e+06], [1.4060000e+03, 4.0000000e+01, 7.9073300e+05, 1.0927765e+06], [1.5430000e+03, 5.0000000e+01, 7.9072900e+05, 1.0927830e+06], [1.4800000e+03, 6.0000000e+01, 7.9072400e+05, 1.0927895e+06], [1.5170000e+03, 7.0000000e+01, 7.9072000e+05, 1.0927960e+06], [1.7540000e+03, 8.0000000e+01, 7.9071500e+05, 1.0928025e+06], [1.5910000e+03, 9.0000000e+01, 7.9071100e+05, 1.0928090e+06]]) """ keys= ['res', 'rho', 'app.res', 'appres', 'rhoa'] col=None if isinstance(args, list): args, isfile = _assert_file(args) # file to datafame if not isfile: # list of values # _assert _list of array_length args = np.array(args, dtype =np.float64).T if isinstance(args, pd.DataFrame): # firt drop all untitled items # if data is from xlsx sheets args.drop([ c for c in args.columns if c.find('untitle')>=0 ], axis =1, inplace =True) # get the index of items `resistivity` ixs = [ii for ii, name in enumerate(args.columns ) for item in keys if name.lower().find(item)>=0] if len(set(ixs))==0: raise ValueError( f"Column name `resistivity` not found in {list(args.columns)}" " Please provide the resistivity column.") elif len(set(ixs))>1: raise ValueError ( f"Expected 1 but got {len(ixs)} resistivity columns " f"{tuple([list(args.columns)[i] for i in ixs])}.") rc= args.pop(args.columns[ixs[0]]) args.insert(0, 'app.res', rc) col =list(args.columns ) args = args.values if isinstance(args, pd.Series): col =args.name args = args.values return args, col def _assert_file ( args: List[str, Any] )-> Tuple [List [str , pd.DataFrame] | Any , bool]: """ Check whether the data is gathering into a Excel sheet workbook file. If the workbook is detected, will read the data and grab all into a dataframe. :param args: argument into a list :returns: - dataframe - assert whether workbook was successful read. :Example: >>> import numpy as np >>> from kalfeat.tools.coreutils import _assert_file >>> a, b = np.arange (1, 10 , 0.5), np.random.randn(9).reshape(3, 3) >>> data = [a, 'data/erp/l2_gbalo', b] # collection of 03 objects >>> # but read only the Path-Like object >>> _assert_file([a, 'data/erp/l2_gbalo.xlsx', b]) ... ['l2_gbalo', pk x y rho 0 0 790752 1092750.0 1101 1 10 790747 1092758.0 1147 2 20 790743 1092763.0 1345 3 30 790738 1092770.0 1369 4 40 790733 1092776.5 1406 5 50 790729 1092783.0 1543 6 60 790724 1092789.5 1480 7 70 790720 1092796.0 1517 8 80 790715 1092802.5 1754 9 90 790711 1092809.0 1591] """ isfile =False file = [ item for item in args if isinstance(item, str) if os.path.isfile (item)] if len(file) > 1: raise ValueError ( f"Expected a single file but got {len(file)}. " "Please select the right file expected to contain the data.") if len(file) ==1 : _, args = read_from_excelsheets(file[0]) isfile =True return args , isfile