Initial commit
This commit is contained in:
0
auth/__init__.py
Normal file
0
auth/__init__.py
Normal file
90
auth/admin.py
Normal file
90
auth/admin.py
Normal file
@@ -0,0 +1,90 @@
|
||||
from flaskbase.extensions import admin, db
|
||||
from flaskbase.admin import ModelView
|
||||
from auth.models import User, Role, Connection
|
||||
from jinja2 import Markup
|
||||
|
||||
from flask import flash, redirect
|
||||
from flask_admin.babel import gettext
|
||||
from flask_admin.base import expose
|
||||
from flask_admin.helpers import (get_redirect_target, flash_errors)
|
||||
from flask_security import login_user
|
||||
|
||||
|
||||
class UserModelView(ModelView):
|
||||
column_searchable_list = (
|
||||
'email', 'current_login_ip', 'last_login_ip',
|
||||
)
|
||||
|
||||
column_list = (
|
||||
'email', 'active', 'is_superuser', 'last_login_at', 'last_login_ip',
|
||||
'display_name'
|
||||
)
|
||||
|
||||
column_filters = column_list
|
||||
column_sortable_list = column_list
|
||||
|
||||
column_labels = {
|
||||
'is_superuser': 'SU',
|
||||
}
|
||||
|
||||
column_descriptions = {
|
||||
'is_superuser': 'Superuser',
|
||||
}
|
||||
|
||||
inline_models = (Connection,)
|
||||
|
||||
list_template = 'admin/user_list.html'
|
||||
can_impersonate = True
|
||||
|
||||
@expose('/impersonate/', methods=('POST',))
|
||||
def impersonate_view(self):
|
||||
"""
|
||||
Impersonate user view. Only POST method is allowed.
|
||||
"""
|
||||
return_url = get_redirect_target() or self.get_url('.index_view')
|
||||
|
||||
if not self.can_impersonate:
|
||||
return redirect(return_url)
|
||||
|
||||
form = self.delete_form()
|
||||
|
||||
if self.validate_form(form):
|
||||
# id is Required()
|
||||
id = form.id.data
|
||||
|
||||
model = self.get_one(id)
|
||||
|
||||
if model is None:
|
||||
return redirect(return_url)
|
||||
|
||||
if login_user(model):
|
||||
flash(gettext('You are now %s' % (model,)))
|
||||
return redirect('/')
|
||||
else:
|
||||
flash_errors(form, message='Failed to impersonate. %(error)s')
|
||||
|
||||
return redirect(return_url)
|
||||
|
||||
|
||||
class ConnectionModelView(ModelView):
|
||||
column_list = (
|
||||
'user',
|
||||
'provider_id',
|
||||
'full_name',
|
||||
'email',
|
||||
)
|
||||
|
||||
column_formatters = {
|
||||
'full_name':
|
||||
lambda v, c, m, n: Markup(
|
||||
'<a href="%s" target="_blank"><img src="%s" /> %s</a>') % (
|
||||
m.profile_url, m.image_url, m.full_name)
|
||||
}
|
||||
|
||||
column_filters = column_list
|
||||
column_sortable_list = column_list
|
||||
|
||||
|
||||
admin.add_view(UserModelView(User, db.session))
|
||||
admin.add_view(ModelView(Role, db.session))
|
||||
admin.add_view(ConnectionModelView(Connection, db.session))
|
||||
21
auth/commands.py
Normal file
21
auth/commands.py
Normal file
@@ -0,0 +1,21 @@
|
||||
from flask_script import Manager
|
||||
from auth.models import User
|
||||
from flaskbase.extensions import db
|
||||
import datetime
|
||||
|
||||
|
||||
manager = Manager()
|
||||
|
||||
|
||||
@manager.option('-e', '--email', help='User email', default='root')
|
||||
@manager.option('-p', '--password', help='Password', default='t00r')
|
||||
def create_superuser(email, password):
|
||||
try:
|
||||
u = User(email=email, password=password, active=True,
|
||||
is_superuser=True, confirmed_at=datetime.datetime.utcnow())
|
||||
db.session.add(u)
|
||||
db.session.commit()
|
||||
|
||||
print('User %s successfuly created.' % (email,))
|
||||
except Exception, e:
|
||||
print('User creation failed: %s' % (e,))
|
||||
12
auth/factories.py
Normal file
12
auth/factories.py
Normal file
@@ -0,0 +1,12 @@
|
||||
from flaskbase.factories import BaseFactory
|
||||
from factory import Sequence
|
||||
from auth.models import User
|
||||
|
||||
|
||||
class UserFactory(BaseFactory):
|
||||
email = Sequence(lambda n: 'user{0}@example.com'.format(n))
|
||||
password = Sequence(lambda n: 'UserPassword{0}'.format(n))
|
||||
active = True
|
||||
|
||||
class Meta:
|
||||
model = User
|
||||
6
auth/forms.py
Normal file
6
auth/forms.py
Normal file
@@ -0,0 +1,6 @@
|
||||
from flask_wtf import Form
|
||||
from flask_security.forms import UniqueEmailFormMixin
|
||||
|
||||
|
||||
class SocialLoginConfirmForm(Form, UniqueEmailFormMixin):
|
||||
pass
|
||||
120
auth/models.py
Normal file
120
auth/models.py
Normal file
@@ -0,0 +1,120 @@
|
||||
from flask import g, session, url_for
|
||||
from flask_security import RoleMixin, UserMixin
|
||||
from flask_login import user_logged_in
|
||||
from flaskbase.extensions import db, social
|
||||
from importlib import import_module
|
||||
|
||||
# Define models
|
||||
roles_users = db.Table(
|
||||
'roles_users',
|
||||
db.Column('user_id', db.Integer(), db.ForeignKey('users.id')),
|
||||
db.Column('role_id', db.Integer(), db.ForeignKey('roles.id'))
|
||||
)
|
||||
|
||||
|
||||
class Role(db.Model, RoleMixin):
|
||||
__tablename__ = 'roles'
|
||||
|
||||
id = db.Column(db.Integer(), primary_key=True)
|
||||
name = db.Column(db.String(80), unique=True)
|
||||
description = db.Column(db.String(255))
|
||||
|
||||
def __unicode__(self):
|
||||
return '{0.description} ({0.name})'.format(self)
|
||||
|
||||
|
||||
class User(db.Model, UserMixin):
|
||||
__tablename__ = 'users'
|
||||
|
||||
id = db.Column(db.Integer, primary_key=True)
|
||||
email = db.Column(db.String(255), unique=True)
|
||||
password = db.Column(db.String(255))
|
||||
|
||||
active = db.Column(db.Boolean())
|
||||
is_superuser = db.Column(db.Boolean(), default=False)
|
||||
|
||||
last_login_ip = db.Column(db.String(128))
|
||||
last_login_at = db.Column(db.DateTime())
|
||||
current_login_ip = db.Column(db.String(128))
|
||||
current_login_at = db.Column(db.DateTime())
|
||||
login_count = db.Column(db.Integer)
|
||||
|
||||
invited_by_id = db.Column(db.Integer, db.ForeignKey('users.id',
|
||||
ondelete='SET NULL'))
|
||||
invited_by = db.relationship('User', remote_side=[id],
|
||||
backref='invited_users')
|
||||
|
||||
confirmed_at = db.Column(db.DateTime())
|
||||
roles = db.relationship('Role', secondary=roles_users,
|
||||
backref=db.backref('users', lazy='dynamic'))
|
||||
|
||||
display_name = db.Column(db.String(128), info={
|
||||
'label': 'Display name',
|
||||
'description': 'Your email will be used instead if empty.',
|
||||
})
|
||||
|
||||
def __repr__(self):
|
||||
return '<User {0.email}>'.format(self)
|
||||
|
||||
def __unicode__(self):
|
||||
return self.display_name or self.email
|
||||
|
||||
@property
|
||||
def image_url(self):
|
||||
if self.connections:
|
||||
return self.connections[0].image_url
|
||||
|
||||
return None
|
||||
|
||||
def first_login(self):
|
||||
"""Checks if user has just logged in (used in base template)"""
|
||||
g.first_login = session.pop('first_login', False) or \
|
||||
g.get('first_login', False)
|
||||
return g.first_login
|
||||
|
||||
@property
|
||||
def profile_url(self):
|
||||
return url_for('profile.public', id=self.id, _external=True)
|
||||
|
||||
@property
|
||||
def is_ghost(self):
|
||||
return not self.confirmed_at
|
||||
|
||||
|
||||
@user_logged_in.connect
|
||||
def track_first_login(app, user):
|
||||
"""Tracks first user login"""
|
||||
if not user.login_count:
|
||||
session['first_login'] = True
|
||||
|
||||
|
||||
class Connection(db.Model):
|
||||
__tablename__ = 'connections'
|
||||
|
||||
id = db.Column(db.Integer, primary_key=True)
|
||||
user_id = db.Column(db.Integer, db.ForeignKey('users.id'))
|
||||
user = db.relationship('User', backref='connections')
|
||||
provider_id = db.Column(db.String(255))
|
||||
provider_user_id = db.Column(db.String(255))
|
||||
access_token = db.Column(db.String(255))
|
||||
secret = db.Column(db.String(255))
|
||||
email = db.Column(db.String(255))
|
||||
full_name = db.Column(db.String(255))
|
||||
display_name = db.Column(db.String(255))
|
||||
profile_url = db.Column(db.String(512))
|
||||
image_url = db.Column(db.String(512))
|
||||
rank = db.Column(db.Integer)
|
||||
|
||||
def __repr__(self):
|
||||
return '<Connection {0.provider_id}:{0.provider_user_id} {0.user}>' \
|
||||
.format(self)
|
||||
|
||||
def get_api(self):
|
||||
provider = social.providers.get(self.provider_id)
|
||||
if not provider:
|
||||
return None
|
||||
|
||||
module = import_module(provider.module)
|
||||
return module.get_api(connection=self,
|
||||
consumer_key=provider.consumer_key,
|
||||
consumer_secret=provider.consumer_secret)
|
||||
80
auth/views.py
Normal file
80
auth/views.py
Normal file
@@ -0,0 +1,80 @@
|
||||
from flask import Blueprint, render_template, \
|
||||
redirect, url_for, session, abort
|
||||
from flask import current_app as app
|
||||
from flask_security import login_user
|
||||
from flask_security.confirmable import confirm_user
|
||||
from flask_social import connection_created, login_completed, login_failed
|
||||
from flask_social.utils import get_connection_values_from_oauth_response, \
|
||||
get_provider_or_404
|
||||
from flask_social.views import connect_handler
|
||||
|
||||
from werkzeug.local import LocalProxy
|
||||
|
||||
from flaskbase.extensions import db
|
||||
|
||||
from auth.forms import SocialLoginConfirmForm
|
||||
|
||||
blueprint = Blueprint('auth', __name__)
|
||||
|
||||
_security = LocalProxy(lambda: app.extensions['security'])
|
||||
|
||||
|
||||
@connection_created.connect
|
||||
@login_completed.connect
|
||||
def on_connection_created(app, connection=None, user=None, provider=None):
|
||||
if not connection:
|
||||
connection = provider.get_connection()
|
||||
|
||||
if not connection.user and user:
|
||||
connection.user = user
|
||||
|
||||
db.session.commit()
|
||||
|
||||
|
||||
@login_failed.connect
|
||||
def on_login_failed(app, provider, oauth_response):
|
||||
session['failed_login_connection'] = \
|
||||
get_connection_values_from_oauth_response(provider, oauth_response)
|
||||
|
||||
abort(redirect(url_for('auth.confirm_social')))
|
||||
|
||||
|
||||
@blueprint.route('/confirm_social', methods=['GET', 'POST'])
|
||||
def confirm_social():
|
||||
connection_values = session.get('failed_login_connection', None)
|
||||
|
||||
if not connection_values:
|
||||
return redirect('/')
|
||||
|
||||
form = SocialLoginConfirmForm(
|
||||
email=connection_values.get('email', ''),
|
||||
)
|
||||
|
||||
if form.validate_on_submit():
|
||||
# Prevent Flask-Security form sending confirmation email
|
||||
_security.confirmable = False
|
||||
|
||||
kwargs = {'email': form.email.data}
|
||||
if connection_values.get('full_name'):
|
||||
kwargs['display_name'] = connection_values['full_name']
|
||||
|
||||
# Create and login user
|
||||
user = _security.datastore.create_user(**kwargs)
|
||||
confirm_user(user)
|
||||
_security.datastore.commit()
|
||||
|
||||
login_user(user)
|
||||
|
||||
# TODO: possibly move it to user_logged_in signal handler?
|
||||
|
||||
# Process pending social connection
|
||||
connection_values = session.pop('failed_login_connection', None)
|
||||
connection_values['user_id'] = user.id
|
||||
connect_handler(
|
||||
connection_values,
|
||||
get_provider_or_404(connection_values['provider_id']))
|
||||
|
||||
_security.datastore.commit()
|
||||
return redirect('/')
|
||||
|
||||
return render_template('security/confirm_social.html', form=form)
|
||||
Reference in New Issue
Block a user