import abc
from collections import UserDict
import pathlib
import re
import tempfile
from typing import Any, Callable, Sequence, TypeVar
from enum import Enum
from functools import lru_cache
from typing import Union, TextIO
from hikari.resources import cif_core_dict
from hikari.utility import make_abspath
# Generic type variable: links the return annotation of
# `CifBlock.get_as_type` to the result type of its `typ` converter.
T = TypeVar('T')
[docs]
class CifBlock(UserDict):
    """
    Container for all data stored inside a single block of a cif file.

    Being a `UserDict` subclass, it keeps insertion order in python3.7+
    and exposes individual cif items through the usual dict-like syntax.
    """

    def get_as_type(self, key: str, typ: Callable[[Any], T], default: Any = None) -> T:
        """
        Return `self[key]` passed through `typ`. A string is converted
        directly, whereas a list is converted element by element.

        :param key: key associated with accessed element
        :param typ: type/function applied to a value or its every element
        :param default: returned as-is (i.e. not converted) whenever
            `self.get(key)` yields None
        :return: converted value of `self[key]`, or unconverted `default`
        :raises TypeError: if the stored value is neither `str` nor `list`
        """
        raw = self.get(key)
        if raw is None:
            return default
        if isinstance(raw, str):
            return typ(raw)
        if isinstance(raw, list):
            return [typ(element) for element in raw]
        raise TypeError(f'Unknown value type of {raw}: {type(raw)}')

    def read(self, path: str, block: str) -> None:
        """
        Parse the .cif file located at `path` and copy the items of its
        data block called `block` into self; other blocks are discarded.

        :param path: Absolute or relative path to the .cif file.
        :param block: Name of the cif data block to be accessed
        """
        all_blocks = CifReader(cif_file_path=path).read()
        self.update(all_blocks[block])

    def write(self, path: str) -> None:
        """
        Save this block to the .cif file located at `path`. The block is
        wrapped in a one-element `CifFrame` under the name 'hikari'.

        :param path: Absolute or relative path to the .cif file.
        """
        single_block_frame = CifFrame({'hikari': self})
        CifWriter(cif_file_path=path).write(cif_frame=single_block_frame)
[docs]
class CifFrame(UserDict):
    """
    A master object which manages whole cif files. It relies on the other
    `Cif*` classes to handle multiple :class:`CifBlock`s with
    crystallographic information.

    Being a `UserDict` subclass, it keeps insertion order in python3.7+.
    Blocks, and items within them, are reachable or assignable using
    single- or nested- dict-like syntax.

    Similarly to other `Frame`s, `CifFrame` is designed to work in-place:
    create the object first and only then call :func:`read` or
    :func:`write` on it; both return None, so they can not be chained.
    """

    def read(self, path: str) -> None:
        """
        Parse the .cif file located at `path` and store every block found
        in self as a {block_name: CifBlock} pair.

        :param path: Absolute or relative path to the .cif file.
        """
        self.update(CifReader(cif_file_path=path).read())

    def write(self, path: str) -> None:
        """
        Save all blocks held by this `CifFrame` to the .cif file
        located at `path`.

        :param path: Absolute or relative path to the .cif file.
        """
        CifWriter(cif_file_path=path).write(cif_frame=self)
[docs]
class CifValidator(UserDict):
    """
    This object is used to validate individual cif keys when parsing cif files.
    It knows the metadata about each key based on its entry
    in the cif core dictionary v.2.4.5 packaged with the project.
    Since the specification itself is written in cif format,
    it is also read using the same `CifReader` (but without `CifValidator`).

    Upon initialization, `CifValidator` becomes a dictionary filled with
    the blocks of the core cif dictionary. Individual values are themselves
    dictionaries that store information about the key's contents:
    `_category`, `_type`, whether it is a `_list` etc.

    Cif keys should be looked up with :meth:`get` (or the `in` operator)
    instead of bracket notation: only `get` accepts names with a leading
    underscore and resolves names listed in the `_name` item of an entry.
    """

    def __init__(self) -> None:
        super().__init__()
        # `CifReader` takes a file path, so the packaged dictionary text
        # is first dumped to a short-lived temporary file.
        with tempfile.TemporaryDirectory() as temp_dir:
            temp_dic_path = str(pathlib.Path(temp_dir) / 'cif_core.dic')
            with open(temp_dic_path, 'w+') as f:
                f.write(cif_core_dict)
            reader = CifReader(cif_file_path=temp_dic_path, validate=False)
            self.update(reader.read())

    def __contains__(self, item) -> bool:
        """
        Return True only if :meth:`get` can resolve `item` to a non-empty
        entry. Items which are not strings are never contained.
        """
        # `get` swallows KeyError and returns its default (None) on a miss,
        # so the result must be inspected instead of waiting for an exception.
        if not isinstance(item, str):
            return False
        return self.get(item) is not None

    def get(self, key: str, default: Union[UserDict, None] = None) \
            -> Union[UserDict, None]:
        """
        Get the dictionary containing information about input cif `key`.

        :param key: cif item name, with or without the leading underscore
        :param default: returned if no non-empty entry matches `key`
        :return: entry describing `key`, or `default` if none was found
        """
        key, _key = (key[1:], key) if key.startswith('_') else (key, '_' + key)
        try:
            value = self[key]
        except KeyError:
            # No entry is stored under this exact name: fall back to entries
            # whose own name is a prefix of `key` and whose `_name` item
            # mentions the underscored key. The last such entry wins.
            value = UserDict()
            for self_key, self_value in self.items():
                if key.startswith(self_key) \
                        and _key in self_value.get('_name', []):
                    value = self_value
        if not value:
            value = default
        return value

    def get__category(self, key: str, default: str = None) -> str:
        """
        Close equivalent to `self.get(key).get('_category', default)`

        :param key: cif item name, with or without the leading underscore
        :param default: returned if `key` or its `_category` is unknown
        :return: value stored under `_category` of the entry, or `default`
        """
        value = self.get(key)
        if value is None:
            return default
        return value.get('_category', default)

    def get__list(self, key: str, default: bool = None) -> bool:
        """
        Close equivalent to `self.get(key).get('_list', default) == 'yes'`

        :param key: cif item name, with or without the leading underscore
        :param default: returned if `key` is unknown or if its `_list`
            item is neither 'yes' nor 'no'
        :return: True for `_list` 'yes', False for 'no', else `default`
        """
        value = self.get(key)
        if value is None:
            return default
        got = value.get('_list')
        if got == 'yes':
            return True
        if got == 'no':
            return False
        return default
[docs]
class CifIOBuffer(abc.ABC):
    """
    Abstract parent of the buffers used by the Cif reader and writer.

    It declares that data can be added and flushed and that names and
    values are kept in lists; the output ends up in the target
    (dict if reading, file if writing).
    """

    @abc.abstractmethod
    def __init__(self, target: Any) -> None:
        # `target` is not stored here; concrete buffers keep it themselves.
        self.names, self.values = [], []

    @abc.abstractmethod
    def add(self, data):
        """Accept a single piece of `data` into the buffer."""

    @abc.abstractmethod
    def flush(self):
        """Pass the data gathered hitherto on to the target."""
[docs]
class CifIO(abc.ABC):
    """
    A base class for the cif reader and writer. This class and its
    inheritors are based on the IUCr File Syntax version 1.1 Working
    specification, available at
    https://www.iucr.org/resources/cif/spec/version1.1/cifsyntax
    """

    # A "#..." comment running to the end of a line; it must either start
    # the line or be preceded by a whitespace character.
    COMMENT_REGEX = re.compile(
        r"(?<=\s)(#.*)(?=$)|(?<=^)(#.*)(?=$)",
        flags=re.M,
    )

    # Opening quote, quoted text and matching closing quote as three groups.
    MATCHING_QUOTES_REGEX = re.compile(
        r"(\B[\"'])((?:\\\1|(?!\1\s).)*.)(\1\B)"
    )

    # A whole string wrapped in a matching pair of quotes or semicolons.
    MATCHING_OUTER_DELIMITERS_REGEX = re.compile(
        r"(?<=^)([\"';])([\S\s]*)(\1)(?=$)"
    )

    # A text field opened by ";" at a line start and closed by "\n;".
    MULTILINE_QUOTE_REGEX = re.compile(
        r"(^;)([\S\s]*?)(\n;)",
        flags=re.M,
    )

    # Placeholders which stand in for whitespace inside protected text.
    WHITESPACE_SUBSTITUTES = {
        ' ': '█',
        '\t': '▄',
        '\n': '▀',
    }

    def __init__(self, cif_file_path, validate=True):
        """
        :param cif_file_path: path to the handled cif file; it is passed
            through `make_abspath` before being stored
        :param validate: flag stored as-is in `self.validate`
        """
        self.validate = validate
        self.file_path = make_abspath(cif_file_path)
        self.file_contents = ''
        self.file_lines = []
[docs]
class CifReaderBuffer(CifIOBuffer):
    """Buffer for reading data from cif file into `CifReader`"""

    def __init__(self, target: dict) -> None:
        super().__init__(target=target)
        self.target: dict = target

    def add(self, word: str) -> None:
        """
        Store `word` as a name if it starts with an underscore, else as
        a value with its delimiters and whitespace substitutes reverted.
        A name arriving after some values first flushes the buffer.
        """
        if not word.startswith('_'):
            restored = CifReader.revert_delimiters_and_whitespace(word)
            self.values.append(restored)
            return
        if self.values:
            self.flush()
        self.names.append(word)

    def flush(self) -> None:
        """
        Update the target dict with names and values stored hitherto,
        then empty the buffer.

        :raises IndexError: if values are stored without any name or if
            the number of values is not a multiple of the number of names
        """
        n_values, n_names = len(self.values), len(self.names)
        collected = UserDict()
        if n_names == 0:
            if n_values != 0:
                raise IndexError(f'Orphan values found while '
                                 f'flushing buffer: {self.values}')
        elif n_values % n_names != 0:
            raise IndexError(
                f'len(values) == {n_values} % len(names) == {n_names} '
                f'must be zero: {self.values} % {self.names}')
        else:
            # Values are stored row by row, so every `n_names`-th value
            # starting from position `i` belongs to the `i`-th name.
            for i, name in enumerate(self.names):
                collected[name] = self.values[i::n_names]
        self.target.update(collected)
        # Re-running the initializer resets both `names` and `values`.
        self.__init__(target=self.target)
[docs]
class CifReader(CifIO):
    """
    A helper class managing reading cif files into
    :class:`~.CifFrame` or :class:`~.CifBlock`.
    """

    @property
    def blocks(self) -> dict[str, int]:
        """A dict of block names:line numbers where they start in cif file."""
        return self._blocks(lines=tuple(self.file_lines))

    @staticmethod
    @lru_cache(maxsize=1)
    def _blocks(lines: Sequence[str]) -> dict[str, int]:
        """
        Map the name of every block to the index of its 'data_' line.

        Kept static so that the cache is keyed on `lines` only; caching
        a bound method would additionally keep its instance alive.

        :param lines: hashable sequence (e.g. a tuple) of file lines
        :return: dict of {block name: index of the line which opens it}
        """
        return {l[5:]: i for i, l in enumerate(lines) if l.startswith('data_')}

    class State(Enum):
        """This class stores current cif reading state (e.g. inside loop etc.)"""
        default = 0
        loop_keys = 1
        loop_values = 2

    def parse_lines(self, start: int, end: int) -> dict:
        """
        Read the data from `self.file_lines` numbered `start` to `end`,
        interpret it, and return it as an instance of a dict.

        :param start: number of the first line which data should be read from
        :param end: number of the first line which should not be read anymore
        :return: ordered dictionary with name: value pairs for all parsed lines
        """
        parsed_data = dict()
        buffer = CifReaderBuffer(target=parsed_data)
        state = self.State.default
        for line in self.file_lines[start:end]:
            if line.lstrip().startswith('loop_'):
                # A new loop closes whatever was buffered before it;
                # the rest of the line is parsed like any other line.
                buffer.flush()
                state = self.State.loop_keys
                line = line.lstrip()[5:]
            words = line.strip().split()
            if not words:
                # An empty line terminates the values of a loop.
                if state is self.State.loop_values:
                    state = self.State.default
                continue
            starts_with_name = words[0].startswith('_')
            if starts_with_name and state is not self.State.loop_keys:
                # Outside of a loop header every name opens a new item.
                buffer.flush()
            if not starts_with_name and state is self.State.loop_keys:
                state = self.State.loop_values
            for word in words:
                buffer.add(word)
        buffer.flush()
        # NOTE(review): `format_dictionary` is not defined in the visible
        # part of this class nor in `CifIO` - confirm where it comes from.
        formatted_data = self.format_dictionary(parsed_data)
        return formatted_data

    def read(self) -> dict:
        """
        Read the contents of cif currently pointed by :attr:`~.CifIO.file_path`
        and return them as a dict with one :class:`CifBlock` per data block.

        :return: A dictionary containing information read from .cif file.
        """
        with open(self.file_path, 'r') as cif_file:
            self.file_contents = cif_file.read()
        self.protect_multilines()
        self.protect_quotes()
        # NOTE(review): `remove_comments` is not defined in the visible
        # part of this class nor in `CifIO` - confirm where it comes from.
        self.remove_comments()
        self.file_lines = self.file_contents.splitlines()
        blocks = self.blocks  # evaluate the property only once
        block_starts = list(blocks.values())
        # Each block ends where the next one starts; the last has no end.
        block_ends = block_starts[1:] + [None]
        read_data = {}
        for n, s, e in zip(blocks, block_starts, block_ends):
            read_data[n] = CifBlock(self.parse_lines(s + 1, e))
        return read_data

    def protect_multilines(self) -> None:
        """
        Replace whitespace inside every semicolon-delimited text field
        of `self.file_contents` with substitutes, so that the field
        survives splitting on whitespace. The delimiters are kept, but
        the closing "\\n;" is reduced to ";".
        """
        split_string = self.MULTILINE_QUOTE_REGEX.split(self.file_contents)
        self.file_contents = self._protect_split(split_string)

    def protect_quotes(self) -> None:
        """
        Replace whitespace between every pair of matching quotation marks
        (single or double) in `self.file_contents` with substitutes.
        The quotation marks themselves are kept.
        See stack overflow /q/46967465/ for details.
        """
        split_string = self.MATCHING_QUOTES_REGEX.split(self.file_contents)
        self.file_contents = self._protect_split(split_string)

    @classmethod
    def _protect_split(cls, split_string: list[str]) -> str:
        """
        Substitute whitespace in the delimited parts of `split_string`
        and join it back into a single string.

        :param split_string: result of splitting a text with a regex of
            three groups (left delimiter, content, right delimiter), thus
            cycling through outer text, delimiter, content and delimiter;
            the list is modified in place
        :return: joined string with protected delimited content
        """
        quoted = split_string[2::4]
        for ws, sub in cls.WHITESPACE_SUBSTITUTES.items():
            quoted = [w.replace(ws, sub) for w in quoted]
        right_delimiters = [w.strip('\n') for w in split_string[3::4]]
        split_string[2::4] = quoted
        split_string[3::4] = right_delimiters
        return ''.join(split_string)

    @classmethod
    def revert_delimiters_and_whitespace(cls, string: str) -> str:
        """
        If present, remove outer delimiters (matching quotes or semicolons) from
        supplied string, revert `WHITESPACE_SUBSTITUTES` and return string.

        :param string: a single word, possibly delimited and protected
        :return: `string` without outer delimiters nor whitespace substitutes.
        """
        # The split yields ['', delim, content, delim, ''] on a match and
        # [string] otherwise, so every second piece skips the delimiters.
        s = ''.join(cls.MATCHING_OUTER_DELIMITERS_REGEX.split(string)[::2])
        for whitespace, substitute in cls.WHITESPACE_SUBSTITUTES.items():
            s = s.replace(substitute, whitespace)
        return s
[docs]
class CifWriterBuffer(CifIOBuffer):
    """Buffer for writing data from `CifWriter` into cif file"""
    MAX_NAME_LENGTH = 33
    MAX_LINE_LENGTH = 80
    MIN_STEP_LENGTH = 2
    WHITESPACE = {' ', '\t', '\n'}

    def __init__(self, target: TextIO) -> None:
        super().__init__(target=target)
        self.target: TextIO = target
        # Traits shared by all items which are currently buffered.
        self.current__category = ''
        self.current__list = False
        self.current_len = 0

    def add(self, data: tuple) -> None:
        """
        Buffer a (name, value) pair. The pair joins the items buffered
        hitherto if its value has the same length and either its category
        and list flag match theirs, or it is unknown to the validator and
        shares its first 12 characters with the previously added name.
        Otherwise the buffer is flushed and the pair starts a new group.

        :param data: two-element tuple of cif item name and its value
        """
        k_, v_ = data
        k__category = cif_core_validator.get__category(k_)
        k__list = cif_core_validator.get__list(k_) or isinstance(v_, list)
        v_len = len(v_) if isinstance(v_, list) else 0
        cat_match = self.current__category == k__category
        lis_match = self.current__list == k__list
        len_match = self.current_len == v_len
        # The leading '' stands in for the last name of an empty buffer.
        start_match = k_[:12] == ['', *self.names][-1][:12]
        entry_missing = cif_core_validator.get(k_) is None
        if len_match and ((cat_match and lis_match) or
                          (entry_missing and start_match)):
            self.names.append(k_)
            self.values.append(v_)
        else:
            self.flush()
            self.names = [k_]
            self.values = [v_]
            self.current__category = k__category
            self.current__list = k__list
            self.current_len = v_len

    def flush(self) -> None:
        """
        Write a newline followed by all buffered items to the target, as
        a table if they are lists and line by line otherwise; then empty
        the buffer. The newline is written even if the buffer is empty.
        """
        # NOTE(review): `format_table` and `format_line` are not defined
        # in the visible part of this class - confirm where they come from.
        if self.current__list is True:
            body = self.format_table()
        else:
            body = ''.join(self.format_line(n_, v_) + '\n'
                           for n_, v_ in zip(self.names, self.values))
        self.target.write('\n' + body)
        self.names = []
        self.values = []

    def enquote(self, text: str, force: bool = False) -> str:
        """
        Wrap `text` in delimiters if it is empty, contains whitespace, or
        if `force` is set. Single quotes are preferred, then double quotes,
        and a semicolon-delimited text field is the last resort; text with
        a newline is always written as a text field.

        :param text: string to be quoted
        :param force: if True, quote `text` even if it has no whitespace
        :return: `text`, delimited if necessary
        :raises ValueError: if `text` needs a text field, but it contains
            the "\\n;" sequence which would close this field prematurely
        """
        if text == '':
            return "''"
        if not force and not any(ws in text for ws in self.WHITESPACE):
            return text
        if '\n' not in text:
            if "'" not in text:
                return f"'{text}'"
            if '"' not in text:
                return f'"{text}"'
        # This check must precede building the text field: testing for
        # '\n' first would accept any multi-line text and never raise.
        if '\n;' in text:
            raise ValueError(f'Unable to quote text: {text}')
        return f';{text}\n;'
[docs]
class CifWriter(CifIO):
    """
    A helper class which manages writing :class:`~.CifFrame`
    or :class:`~.CifBlock` into cif files
    """

    def write(self, cif_frame: CifFrame) -> None:
        """
        Write every block of `cif_frame` to the file at `self.file_path`.
        Each block starts with its 'data_' header and consecutive blocks
        are separated by two newlines.

        :param cif_frame: frame whose blocks are to be written
        """
        with open(self.file_path, 'w') as cif_file:
            # One buffer serves all blocks; it is flushed after each one.
            buffer = CifWriterBuffer(target=cif_file)
            for index, (block_name, block) in enumerate(cif_frame.items()):
                if index > 0:
                    cif_file.write('\n\n')
                cif_file.write(f'data_{block_name}')
                for name_value_pair in block.items():
                    buffer.add(name_value_pair)
                buffer.flush()
# Module-level validator instance, created once when this module is imported.
# `CifWriterBuffer.add` queries it for the `_category` and `_list` of cif keys.
cif_core_validator = CifValidator()