Created
August 9, 2016 18:35
-
-
Save leonsas/d1a1bef0a065b295450863209aa2b64c to your computer and use it in GitHub Desktop.
Django AuditTrail
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
From https://code.djangoproject.com/wiki/AuditTrail | |
Modified to allow `exclude_fields` options. | |
""" | |
from django.dispatch import dispatcher | |
from django.db import models | |
from django.core.exceptions import ImproperlyConfigured | |
from django.contrib import admin | |
import copy | |
import re | |
import types | |
try: | |
import settings_audit | |
except ImportError: | |
settings_audit = None | |
value_error_re = re.compile("^.+'(.+)'$") | |
class AuditTrail(object): | |
def __init__(self, show_in_admin=False, save_change_type=True, audit_deletes=True, | |
track_fields=None, exclude_fields=None): | |
self.opts = {} | |
self.opts['show_in_admin'] = show_in_admin | |
self.opts['save_change_type'] = save_change_type | |
self.opts['audit_deletes'] = audit_deletes | |
if track_fields: | |
self.opts['track_fields'] = track_fields | |
else: | |
self.opts['track_fields'] = [] | |
if exclude_fields: | |
self.opts['exclude_fields'] = exclude_fields | |
else: | |
self.opts['exclude_fields'] = [] | |
def contribute_to_class(self, cls, name): | |
# This should only get added once the class is otherwise complete | |
def _contribute(sender, **kwargs): | |
model = create_audit_model(sender, **self.opts) | |
if self.opts['show_in_admin']: | |
# Enable admin integration | |
# If ModelAdmin needs options or different base class, find | |
# some way to make the commented code work | |
# cls_admin_name = cls.__name__ + 'Admin' | |
# clsAdmin = type(cls_admin_name, (admin.ModelAdmin,),{}) | |
# admin.site.register(cls, clsAdmin) | |
# Otherwise, register class with default ModelAdmin | |
admin.site.register(model) | |
descriptor = AuditTrailDescriptor(model._default_manager, sender._meta.pk.attname) | |
setattr(sender, name, descriptor) | |
def _audit_track(instance, field_arr, **kwargs): | |
field_name = field_arr[0] | |
try: | |
return getattr(instance, field_name) | |
except: | |
if len(field_arr) > 2: | |
if callable(field_arr[2]): | |
fn = field_arr[2] | |
return fn(instance) | |
else: | |
return field_arr[2] | |
def _audit(sender, instance, created, **kwargs): | |
# Write model changes to the audit model. | |
# instance is the current (non-audit) model. | |
kwargs = {} | |
for field in sender._meta.fields: | |
#kwargs[field.attname] = getattr(instance, field.attname) | |
if field.name in self.opts['exclude_fields']: | |
continue | |
kwargs[field.name] = getattr(instance, field.name) | |
if self.opts['save_change_type']: | |
if created: | |
kwargs['_audit_change_type'] = 'I' | |
else: | |
kwargs['_audit_change_type'] = 'U' | |
for field_arr in model._audit_track: | |
kwargs[field_arr[0]] = _audit_track(instance, field_arr) | |
model._default_manager.create(**kwargs) | |
## Uncomment this line for pre r8223 Django builds | |
#dispatcher.connect(_audit, signal=models.signals.post_save, sender=cls, weak=False) | |
## Comment this line for pre r8223 Django builds | |
models.signals.post_save.connect(_audit, sender=cls, weak=False) | |
if self.opts['audit_deletes']: | |
def _audit_delete(sender, instance, **kwargs): | |
# Write model changes to the audit model | |
kwargs = {} | |
for field in sender._meta.fields: | |
if field.name in self.opts['exclude_fields']: | |
continue | |
kwargs[field.name] = getattr(instance, field.name) | |
if self.opts['save_change_type']: | |
kwargs['_audit_change_type'] = 'D' | |
for field_arr in model._audit_track: | |
kwargs[field_arr[0]] = _audit_track(instance, field_arr) | |
model._default_manager.create(**kwargs) | |
## Uncomment this line for pre r8223 Django builds | |
#dispatcher.connect(_audit_delete, signal=models.signals.pre_delete, sender=cls, weak=False) | |
## Comment this line for pre r8223 Django builds | |
models.signals.pre_delete.connect(_audit_delete, sender=cls, weak=False) | |
## Uncomment this line for pre r8223 Django builds | |
#dispatcher.connect(_contribute, signal=models.signals.class_prepared, sender=cls, weak=False) | |
## Comment this line for pre r8223 Django builds | |
models.signals.class_prepared.connect(_contribute, sender=cls, weak=False) | |
class AuditTrailDescriptor(object): | |
def __init__(self, manager, pk_attribute): | |
self.manager = manager | |
self.pk_attribute = pk_attribute | |
def __get__(self, instance=None, owner=None): | |
if instance == None: | |
#raise AttributeError, "Audit trail is only accessible via %s instances." % type.__name__ | |
return create_audit_manager_class(self.manager) | |
else: | |
return create_audit_manager_with_pk(self.manager, self.pk_attribute, instance._get_pk_val()) | |
def __set__(self, instance, value): | |
raise AttributeError, "Audit trail may not be edited in this manner." | |
def create_audit_manager_with_pk(manager, pk_attribute, pk): | |
"""Create an audit trail manager based on the current object""" | |
class AuditTrailWithPkManager(manager.__class__): | |
def __init__(self, *arg, **kw): | |
super(AuditTrailWithPkManager, self).__init__(*arg, **kw) | |
self.model = manager.model | |
def get_query_set(self): | |
qs = super(AuditTrailWithPkManager, self).get_query_set().filter(**{pk_attribute: pk}) | |
if self._db is not None: | |
qs = qs.using(self._db) | |
return qs | |
return AuditTrailWithPkManager() | |
def create_audit_manager_class(manager): | |
"""Create an audit trail manager based on the current object""" | |
class AuditTrailManager(manager.__class__): | |
def __init__(self, *arg, **kw): | |
super(AuditTrailManager, self).__init__(*arg, **kw) | |
self.model = manager.model | |
return AuditTrailManager() | |
def create_audit_model(cls, **kwargs): | |
"""Create an audit model for the specific class""" | |
name = cls.__name__ + 'Audit' | |
class Meta: | |
db_table = '%s_audit' % cls._meta.db_table | |
app_label = cls._meta.app_label | |
verbose_name_plural = '%s audit trail' % cls._meta.verbose_name | |
ordering = ['-_audit_timestamp'] | |
# Set up a dictionary to simulate declarations within a class | |
attrs = { | |
'__module__': cls.__module__, | |
'Meta': Meta, | |
'_audit_id': models.AutoField(primary_key=True), | |
'_audit_timestamp': models.DateTimeField(auto_now_add=True, db_index=True), | |
'_audit__str__': cls.__str__.im_func, | |
'__str__': lambda self: '%s as of %s' % (self._audit__str__(), self._audit_timestamp), | |
'_audit_track': _track_fields(track_fields=kwargs['track_fields'], unprocessed=True) | |
} | |
if 'save_change_type' in kwargs and kwargs['save_change_type']: | |
attrs['_audit_change_type'] = models.CharField(max_length=1) | |
# Copy the fields from the existing model to the audit model | |
for field in cls._meta.fields: | |
if field.name in kwargs['exclude_fields']: | |
continue | |
#if field.attname in attrs: | |
if field.name in attrs: | |
raise ImproperlyConfigured, "%s cannot use %s as it is needed by AuditTrail." % (cls.__name__, field.attname) | |
if isinstance(field, models.AutoField): | |
# Audit models have a separate AutoField | |
attrs[field.name] = models.IntegerField(db_index=True, editable=False) | |
else: | |
attrs[field.name] = copy.copy(field) | |
# If 'unique' is in there, we need to remove it, otherwise the index | |
# is created and multiple audit entries for one item fail. | |
attrs[field.name]._unique = False | |
# If a model has primary_key = True, a second primary key would be | |
# created in the audit model. Set primary_key to false. | |
attrs[field.name].primary_key = False | |
# Rebuild and replace the 'rel' object to avoid foreign key clashes. | |
# Borrowed from the Basie project | |
# Basie is MIT and GPL dual licensed. | |
if isinstance(field, models.ForeignKey): | |
rel = copy.copy(field.rel) | |
rel.related_name = '_audit_' + field.related_query_name() | |
attrs[field.name].rel = rel | |
for track_field in _track_fields(kwargs['track_fields']): | |
if track_field['name'] in attrs: | |
raise NameError('Field named "%s" already exists in audit version of %s' % (track_field['name'], cls.__name__)) | |
attrs[track_field['name']] = copy.copy(track_field['field']) | |
return type(name, (models.Model,), attrs) | |
def _build_track_field(track_item): | |
track = {} | |
track['name'] = track_item[0] | |
if isinstance(track_item[1], models.Field): | |
track['field'] = track_item[1] | |
elif issubclass(track_item[1], models.Model): | |
track['field'] = models.ForeignKey(track_item[1]) | |
else: | |
raise TypeError('Track fields only support items that are Fields or Models.') | |
return track | |
def _track_fields(track_fields=None, unprocessed=False): | |
# Add in the fields from the Audit class "track" attribute. | |
tracks_found = [] | |
if settings_audit: | |
global_track_fields = getattr(settings_audit, 'GLOBAL_TRACK_FIELDS', []) | |
for track_item in global_track_fields: | |
if unprocessed: | |
tracks_found.append(track_item) | |
else: | |
tracks_found.append(_build_track_field(track_item)) | |
if track_fields: | |
for track_item in track_fields: | |
if unprocessed: | |
tracks_found.append(track_item) | |
else: | |
tracks_found.append(_build_track_field(track_item)) | |
return tracks_found |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment