import os
from urllib import quote

from git import Repo

from elasticutils import (
    MappingType, Indexable, S as SBase,
    ObjectSearchResults, DictSearchResults, ListSearchResults)

from elasticgit.utils import introspect_properties
from import RemoteStorageManager

[docs]def index_name(prefix, name): """ Generate an Elasticsearch index name using given name and prefixing it with the given ``index_prefix``. The resulting generated index name is URL quoted. :param str prefix: The prefix to use for the index. :param str name: The name to use for the index. :returns: str """ return '-'.join(map(quote, [prefix, name]))
class ModelMappingTypeBase(MappingType): short_name = 'MappingType' @classmethod def get_mapping_type_name(cls): model_class = cls.model_class return '%s-%sType' % ( model_class.__module__.replace(".", "-"), model_class.__name__) @classmethod def get_model(self): return self.model_class def get_object(self): raise NotImplementedError def to_object(self): obj = self.model_class(self._results_dict, es_meta=self.es_meta) obj.set_read_only() # might not be in sync with Git return obj @classmethod def get_es(cls): raise NotImplementedError @classmethod def get_mapping(cls): return { 'properties': introspect_properties(cls.model_class) } @classmethod def subclass(cls, model_class, **attributes): attributes = attributes.copy() attributes['model_class'] = model_class return type( '%s%s' % (model_class.__name__, cls.short_name), (cls,), attributes) class ReadOnlyModelMappingType(ModelMappingTypeBase): short_name = 'ROMappingType' @classmethod def get_index(cls): return cls.s.get_repo_indexes() @classmethod def get_es(cls): return cls.s.get_es() @classmethod def subclass(cls, model_class, s): return super(ReadOnlyModelMappingType, cls).subclass( model_class, s=s) class ReadWriteModelMappingType(ModelMappingTypeBase, Indexable): short_name = 'RWMappingType' @classmethod def get_index(cls): return def get_object(self): return, self._id) @classmethod def get_es(cls): return @classmethod def extract_document(cls, obj_id, obj=None): if obj is None: obj =, obj_id) return dict(obj) @classmethod def get_indexable(cls): return @classmethod def subclass(cls, model_class, im, sm): return super(ReadWriteModelMappingType, cls).subclass( model_class, im=im, sm=sm) class SearchResultsMixin(object): def set_objects(self, results): """ Adds the index returned by Elasticsearch to :py:class:`Metadata` instances. See also: """ super(SearchResultsMixin, self).set_objects(results) for obj, result in zip(self.objects, self.results): obj.es_meta.index = result.get('_index') class CustomDictSearchResults(SearchResultsMixin, DictSearchResults): pass class CustomListSearchResults(SearchResultsMixin, ListSearchResults): pass class CustomObjectSearchResults(SearchResultsMixin, ObjectSearchResults): pass class S(SBase): def to_python(self, obj): """ Override `PythonMixin.to_python` to skip in-place datetime conversion. The original method's only function is to convert datetime-ish strings to datetime objects. This is done irrespective of mapping type and the timezone-aware ISO format is not recognized. """ return obj def get_results_class(self): """ Returns the custom results class to use """ results_class = super(S, self).get_results_class() return { DictSearchResults: CustomDictSearchResults, ListSearchResults: CustomListSearchResults, ObjectSearchResults: CustomObjectSearchResults, }.get(results_class) class RepoHelper(object): def __init__(self, repo_url): self.repo_url = repo_url if any([ repo_url.startswith('http://'), repo_url.startswith('https://')]): self.rsm = RemoteStorageManager(repo_url) self.repo = None else: self.rsm = None self.repo = Repo(repo_url) def active_branch_name(self): if self.repo: return return self.rsm.active_branch() def default_index_prefix(self): if self.repo: return os.path.basename(self.repo_url) return self.rsm.repo_name
[docs]class SM(S): """ A search interface similar to :py:class:`elasticutils.S` to retrieve :py:class:`` instances stored in Elasticsearch. These can be converted to :py:class:`elasticgit.model.Model` instances using :py:func:`ReadOnlyModelMappingType.to_object`. :param type model_class: A subclass of :py:class:`elasticgit.models.Model` for generating a mapping type. :param list in_: A list of :py:class:`git.Repo` instances, or a list of repo working dirs. :param list index_prefixes: An optional list of index prefixes corresponding to the repos in `in_`. """ def __init__(self, model_class, in_, index_prefixes=None): type_ = ReadOnlyModelMappingType.subclass( s=self, model_class=model_class) super(SM, self).__init__(type_=type_) self.repos = in_ self.index_prefixes = index_prefixes self.repos = map( lambda repo: repo if isinstance(repo, RepoHelper) else RepoHelper(repo), self.repos) if not self.index_prefixes: self.index_prefixes = map( lambda repo: repo.default_index_prefix(), self.repos)
[docs] def get_repo_indexes(self): """ Generate the indexes corresponding to the ``repos``. :returns: list """ if not self.repos: return [] return map( lambda (ip, r): index_name(ip, r.active_branch_name()), zip(self.index_prefixes, self.repos))
def _clone(self, next_step=None): # S._clone is re-implemented, because SM.__init__'s # signature differs from S.__init__. # Original method: # # noqa new = self.__class__( self.type.model_class, in_=self.repos, index_prefixes=self.index_prefixes) new.steps = list(self.steps) if next_step: new.steps.append(next_step) new.start = self.start new.stop = self.stop new.field_boosts = self.field_boosts.copy() return new
[docs]class ESManager(object): """ An interface to :py:class:`elasticgit.models.Model` instances stored in Git. :param elasticgit.workspace.Workspace workspace: The workspace to operate on. :param elasticsearch.Elasticsearch es: An Elasticsearch client instance. """ def __init__(self, storage_manager, es, index_prefix): = storage_manager = es self.index_prefix = index_prefix def get_mapping_type(self, model_class): return ReadWriteModelMappingType.subclass( im=self,, model_class=model_class)
[docs] def index_exists(self, name): """ Check if the index already exists in Elasticsearch :param str name: :returns: bool """ return
[docs] def create_index(self, name): """ Creates the index in Elasticsearch :param str name: """ return
[docs] def destroy_index(self, name): """ Destroys the index in Elasticsearch :param str name: """ return
[docs] def index_status(self, name): """ Get an index status :param str name: """ index_name = self.index_name(name) status = index_status = status['indices'][index_name] return index_status
[docs] def index_ready(self, name): """ Check if an index is ready for use. :param str name: :returns: bool """ status = self.index_status(name) # NOTE: ES returns a lot of nested info here, hence the complicated # generator in generator return any([ any([shard['state'] == 'STARTED' for shard in shard_slice]) for shard_slice in status['shards'].values() ])
[docs] def index(self, model, refresh_index=False): """ Index a :py:class:`elasticgit.models.Model` instance in Elasticsearch :param elasticgit.models.Model model: The model instance :param bool refresh_index: Whether or not to manually refresh the Elasticsearch index. Useful in testing. :returns: :py:class:`elasticgit.models.Model` """ model_class = model.__class__ MappingType = self.get_mapping_type(model_class) MappingType.index( MappingType.extract_document(model.uuid, model), id_=model.uuid) if refresh_index: MappingType.refresh_index() return model
[docs] def raw_unindex(self, model_class, uuid, refresh_index=False): """ Remove an entry from the Elasticsearch index. This differs from :py:func:`.unindex` because it does not require an instance of :py:class:`elasticgit.models.Model` because you're likely in a position where you don't have it if you're trying to unindex it. :param elasticgit.models.Model model_class: The model class :param str uuid: The model's UUID :param bool refresh_index: Whether or not to manually refresh the Elasticsearch index. Useful in testing. """ MappingType = self.get_mapping_type(model_class) MappingType.unindex(uuid) if refresh_index: MappingType.refresh_index()
[docs] def unindex(self, model, refresh_index=False): """ Remove a :py:class:`elasticgit.models.Model` instance from the Elasticsearch index. :param elasticgit.models.Model model: The model instance :param bool refresh_index: Whether or not to manually refresh the Elasticsearch index. Useful in testing. :returns: :py:class:`elasticgit.models.Model` """ model_class = model.__class__ self.raw_unindex(model_class, model.uuid, refresh_index=refresh_index) return model
[docs] def index_name(self, name): """ Generate an Elasticsearch index name using given name and prefixing it with the ``index_prefix``. The resulting generated index name is URL quoted. :param str name: The name to use for the index. """ return index_name(self.index_prefix, name)
[docs] def refresh_indices(self, name): """ Manually refresh the Elasticsearch index. In production this is not necessary but it is useful when running tests. :param str name: """ return
[docs] def setup_mapping(self, name, model_class): """ Specify a mapping for a model class in a specific index :param str name: :param elasticgit.models.Model model_class: :returns: dict """ MappingType = self.get_mapping_type(model_class) return self.setup_custom_mapping( name, model_class, MappingType.get_mapping())
[docs] def setup_custom_mapping(self, name, model_class, mapping): """ Specify a mapping for a model class in a specific index :param str name: :param elasticgit.models.Model model_class: :param dict mapping: The Elasticsearch mapping definition :returns: dict """ MappingType = self.get_mapping_type(model_class) return index=self.index_name(name), doc_type=MappingType.get_mapping_type_name(), body=mapping)
[docs] def get_mapping(self, name, model_class): """ Retrieve a mapping for a model class in a specific index :param str name: :param elasticgit.models.Model model_class: :returns: dict """ index_name = self.index_name(name) MappingType = self.get_mapping_type(model_class) data = index=index_name, doc_type=MappingType.get_mapping_type_name()) mappings = data[index_name]['mappings'] return mappings[MappingType.get_mapping_type_name()]