Source code for elasticgit.commands.avro

from __future__ import absolute_import

from jinja2 import Environment, PackageLoader
from functools import partial
import argparse
import imp
import json
import pprint

from datetime import datetime

import avro.schema

from elasticgit.models import (
    Model, IntegerField, TextField, FloatField,
    BooleanField, ListField, DictField, UUIDField,

from elasticgit.commands.base import (
    ToolCommand, ToolCommandError, CommandArgument)
from elasticgit.utils import load_class

[docs]def deserialize(data, field_mapping={}, module_name=None): """ Deserialize an Avro schema and define it within a module (if specified) :param dict data: The Avro schema :param dict field_mapping: Optional mapping to override the default mapping. :param str module_name: The name of the module to put this in. This module is dynamically generated with :py:func:`imp.new_module` and only available during code generation for setting the class' ``__module__``. :returns: :py:class:`elasticgit.models.Model` >>> from elasticgit.commands.avro import deserialize >>> schema = { ... 'name': 'Foo', ... 'type': 'record', ... 'fields': [{ ... 'name': 'some_field', ... 'type': 'int', ... }] ... } >>> deserialize(schema) <class 'Foo'> >>> """ schema_loader = SchemaLoader() schema = avro.schema.make_avsc_object(data, avro.schema.Names()).to_json() model_code = schema_loader.generate_model(schema) model_name = schema['name'] if module_name is not None: mod = imp.new_module(module_name) scope = mod.__dict__ else: scope = {} exec model_code in scope return scope.pop(model_name)
[docs]def serialize(model_class): """ Serialize a :py:class:`elasticgit.models.Model` to an Avro JSON schema :param elasticgit.models.Model model_class: :returns: str >>> from elasticgit.commands.avro import serialize >>> from elasticgit.tests.base import TestPerson >>> json_data = serialize(TestPerson) >>> import json >>> schema = json.loads(json_data) >>> sorted(schema.keys()) [u'fields', u'name', u'namespace', u'type'] >>> """ schema_dumper = SchemaDumper() return schema_dumper.dump_schema(model_class)
[docs]class FieldMapType(object): """ A custom type for providing mappings on the command line for the :py:class:`.SchemaLoader` tool. :param str mapping: A mapping of a key to a field type >>> from elasticgit.commands.avro import FieldMapType >>> mt = FieldMapType('uuid=elasticgit.models.UUIDField') >>> mt.key 'uuid' >>> mt.field_class <class 'elasticgit.models.UUIDField'> >>> """ def __init__(self, mapping): key, _, class_name = mapping.partition('=') self.key = key self.field_class = load_class(class_name)
[docs]class RenameType(object): """ A custom type for renaming things. :param str mapping: A mapping of an old name to a new name >>> from elasticgit.commands.avro import RenameType >>> rt = RenameType('OldName=NewName') >>> rt.old 'OldName' >>> 'NewName' >>> """ def __init__(self, mapping): self.old, _, = mapping.partition('=')
[docs]class SchemaLoader(ToolCommand): """ Load an Avro_ JSON schema and generate Elasticgit Model python code. :: python -m load-schema avro.json .. _Avro: """ command_name = 'load-schema' command_help_text = 'Dump an Avro schema as an Elasticgit model.' command_arguments = ( CommandArgument( 'schema_files', metavar='schema_file', help='path to Avro schema file.', nargs='+', type=argparse.FileType('r')), CommandArgument( '-m', '--map-field', help=( 'Manually map specific field names to Field classes. ' 'Formatted as ``field=IntegerField``' ), metavar='key=FieldType', dest='field_mappings', action='append', type=FieldMapType), CommandArgument( '-r', '--rename-model', help=( 'Manually rename a model.' 'Formatted as ``OldModelName=NewShiny``'), metavar='OldModelName=NewShiny', dest='model_renames', action='append', type=RenameType), ) core_mapping = { 'int': IntegerField, 'string': TextField, 'float': FloatField, 'boolean': BooleanField, }
[docs] def run(self, schema_files, field_mappings=None, model_renames=None): """ Inspect an Avro schema file and write the generated Python code to ``self.stdout`` :param list schema_files: The list of file pointers to load. :param list field_mappings: A list of :py:class:`.FieldMapType` types that allow overriding of field mappings. :param list model_renames: A list of :py:class:`.RenameType` types that allow renaming of model names """ field_mapping = dict((m.key, m.field_class) for m in field_mappings or []) model_renames = dict((m.old, for m in model_renames or []) schemas = [json.load(schema_fp) for schema_fp in schema_files] self.stdout.write(self.generate_models( schemas, field_mapping=field_mapping, model_renames=model_renames))
def model_class_for(self, model_name, model_renames): return model_renames.get(model_name, model_name) def field_class_for(self, field, field_mapping): field_type = field['type'] field_name = field['name'] if field_name in field_mapping: return field_mapping[field_name].__name__ if isinstance(field_type, dict): return self.field_class_for_complex_type(field) if isinstance(field_type, list): return self.field_class_for_core_type(field_type) return self.core_mapping[field_type].__name__ def field_class_for_core_type(self, core_types): [not_null_type] = [core_type for core_type in core_types if core_type != "null"] return self.core_mapping[not_null_type].__name__ def field_class_for_complex_type(self, field): field_type = field['type']['type'] if isinstance(field_type, list): [field_type] = [ft for ft in field_type if ft != "null"] handler = getattr( self, 'field_class_for_complex_%s_type' % (field_type,)) return handler(field) def field_class_for_complex_record_type(self, field): return DictField.__name__ def field_class_for_complex_array_type(self, field): return ListField.__name__ def default_value(self, field): return pprint.pformat(field['default'], indent=8)
[docs] def generate_models(self, schemas, field_mapping={}, model_renames={}): """ Generate Python code for the given Avro schemas :param list schemas: A list of Avro schema's :param dict field_mapping: An optional mapping of keys to field types that can be used to override the default mapping. :returns: str """ first, remainder = schemas[0], schemas[1:] first_chunk = self.generate_model(first, field_mapping, model_renames) remainder_chunk = u''.join([ self.generate_model(subsequent, field_mapping, model_renames, include_header=False) for subsequent in remainder]) return u'\n'.join([ first_chunk, remainder_chunk, ])
[docs] def generate_model(self, schema, field_mapping={}, model_renames={}, include_header=True): """ Generate Python code for the given Avro schema :param dict schema: The Avro schema :param dict field_mapping: An optional mapping of keys to field types that can be used to override the default mapping. :param dict model_renames: An optional mapping of model names that can be used to rename a model. :parak bool include_header: Whether or not to generate the header in the source code, this is useful of you're generating a list of model schema but don't want the header and import statements printed every time. :returns: str """ env = Environment(loader=PackageLoader('elasticgit', 'templates')) env.globals['model_class_for'] = partial( self.model_class_for, model_renames=model_renames) env.globals['field_class_for'] = partial( self.field_class_for, field_mapping=field_mapping) env.globals['default_value'] = self.default_value env.globals['is_complex'] = ( lambda field: isinstance(field['type'], dict)) env.globals['field_class_for_core_type'] = ( self.field_class_for_core_type) # env.globals['core_mapping'] = self.core_mapping template = env.get_template('') return template.render( datetime=datetime.utcnow(), schema=schema, include_header=include_header, version_info=version_info)
[docs]class SchemaDumper(ToolCommand): """ Dump an Avro_ JSON schema for an Elasticgit Model. :: python -m dump-schema elasticgit.tests.base.TestPerson .. _Avro: """ command_name = 'dump-schema' command_help_text = 'Dump model information as an Avro schema.' command_arguments = ( CommandArgument('class_path', help='python path to Class.'), ) # How model fields map to types core_field_mappings = { IntegerField: 'int', TextField: 'string', FloatField: 'float', BooleanField: 'boolean', UUIDField: 'string', }
[docs] def run(self, class_path): """ Introspect the given class path and print the schema to `self.stdout` :param str class_path: The path to the model file to introspect """ module_path, name = class_path.rsplit('.', 1) parent_module = __import__(module_path, fromlist=[name]) model_class = getattr(parent_module, name) if not issubclass(model_class, Model): raise ToolCommandError( '%r is not a subclass of %r' % (model_class, Model)) return self.stdout.write(self.dump_schema(model_class))
[docs] def dump_schema(self, model_class): """ Return the JSON schema for an :py:class:`elasticgit.models.Model`. :param elasticgit.models.Model model_class: :returns: str """ return json.dumps({ 'type': 'record', 'namespace': model_class.__module__, 'name': model_class.__name__, 'fields': [self.get_field_info(name, field) for name, field in model_class._fields.items()], }, indent=2)
def map_field_to_type(self, field): if field.__class__ in self.core_field_mappings: return ["null", self.core_field_mappings[field.__class__]] handler = getattr(self, 'map_%s_type' % (field.__class__.__name__,)) return handler(field) def map_ListField_type(self, field): return { 'type': 'array', 'name':, 'namespace': field.__class__.__module__, 'items': list(set(reduce( lambda a, b: a + b, [self.map_field_to_type(fld) for fld in field.fields], []))), } def map_DictField_type(self, field): return { 'type': 'record', 'name':, 'namespace': field.__class__.__module__, 'fields': [{ 'name':, 'type': self.map_field_to_type(fld), } for fld in field.fields], }
[docs] def get_field_info(self, name, field): """ Return the Avro field object for an :py:class:`elasticgit.models.Model` field. :param str name: The name of the field :param confmodel.fields.ConfigField field: The field :returns: dict """ return { 'name': name, 'type': self.map_field_to_type(field), 'doc': field.doc, 'default': field.default, 'aliases': [fallback.field_name for fallback in field.fallbacks] }