前言
接上篇文章 [原创]django+ldap实现统一认证部分一(django-auth-ldap实践) 继续实现我们的统一认证
python-ldap
我在sso项目的backend/lib/common/下添加一个ldaphelper.py文件,其中定义一个类
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import ldap
import ldap.modlist as modlist
# 加载log配置
import logging
logger = logging.getLogger()
import sys
reload(sys)
sys.setdefaultencoding('utf8')
'''
实现LDAP用户登录验证,首先获取用户的dn,然后再验证用户名和密码
'''
from SSOadmin import settings
# 登陆 地址
LDAP_URI = settings.AUTH_LDAP_SERVER_URI
# 登陆 账户
LDAP_USER = settings.AUTH_LDAP_BIND_DN
# 登陆 密码
LDAP_PASS = settings.AUTH_LDAP_BIND_PASSWORD
# 默认 区域
BASE_DN = settings.base_dn
class LDAPTool(object):
def __init__(self,
ldap_uri=None,
base_dn=None,
user=None,
password=None):
"""
初始化
:param ldap_uri: ldap_uri
:param base_dn: 区域
:param user: 默认用户
:param password: 默认密码
:return:
"""
if not ldap_uri:
ldap_uri = LDAP_URI
if not base_dn:
self.base_dn = BASE_DN
if not user:
self.admin_user = LDAP_USER
if not password:
self.admin_password = LDAP_PASS
try:
self.ldapconn = ldap.initialize(ldap_uri) # 老版本使用open方法
self.ldapconn.simple_bind(self.admin_user, self.admin_password) # 绑定用户名、密码
except ldap.LDAPError, e:
logger.error('ldap conn失败,原因为: %s' % str(e))
def ldap_search_dn(self, value=None, value_type='uid'):
"""
# 根据表单提交的用户名,检索该用户的dn,一条dn就相当于数据库里的一条记录。
# 在ldap里类似cn=username,ou=users,dc=gccmx,dc=cn,验证用户密码,必须先检索出该DN
:param value: 用户 uid或 组cn
:param value_type: 用户 uid|cn
:return: search result
"""
obj = self.ldapconn
obj.protocal_version = ldap.VERSION3
searchScope = ldap.SCOPE_SUBTREE
retrieveAttributes = None
if value_type == 'cn':
searchFilter = "cn=" + value
else:
searchFilter = "uid=" + value
try:
ldap_result_id = obj.search(
base=self.base_dn,
scope=searchScope,
filterstr=searchFilter,
attrlist=retrieveAttributes
)
result_type, result_data = obj.result(ldap_result_id, 0)
if result_type == ldap.RES_SEARCH_ENTRY:
return result_data
else:
return None
except ldap.LDAPError, e:
logger.error('ldap search %s 失败,原因为: %s' % (value, str(e)))
def ldap_get_user(self, uid=None):
"""
通过查询用户uid,从ldap_search_dn进一步提取所需数据,search到的是全部信息
:param uid:
:return: {‘uid’:'zhangsan','mail':'zhangsan@xxx.com','cn':'张三'}
"""
result = None
try:
search = self.ldap_search_dn(value=uid, value_type=uid)
if search is None:
raise ldap.LDAPError('未查询到相应 id')
for user in search:
if user[1]['uid'][0] == uid:
result = {
'uid': uid,
'mail': user[1]['mail'][0],
'cn': user[1]['cn'][0],
}
except Exception, e:
logger.error('获取用户%s 失败,原因为: %s' % (uid, str(e)))
return result
def __ldap_getgid(self, cn="员工"):
"""
查询 组cn对应的gid
:param cn: 组cn
:return: 对应cn的gidNumber
"""
obj = self.ldapconn
obj.protocal_version = ldap.VERSION3
searchScope = ldap.SCOPE_SUBTREE
retrieveAttributes = None
searchFilter = "cn=" + cn
try:
ldap_result_id = obj.search(
base="ou=Group,%s" % self.base_dn,
scope=searchScope,
filterstr=searchFilter,
attrlist=retrieveAttributes
)
result_type, result_data = obj.result(ldap_result_id, 0)
if result_type == ldap.RES_SEARCH_ENTRY:
return result_data[0][1].get('gidNumber')[0]
else:
return None
except ldap.LDAPError, e:
logger.error('获取gid失败,原因为: %s' % str(e))
def __get_max_uidNumber(self):
"""
查询 当前最大的uid,这个是在添加用户时,用于自增uid
:param: None
:return: max uidNumber
"""
obj = self.ldapconn
obj.protocal_version = ldap.VERSION3
searchScope = ldap.SCOPE_SUBTREE
retrieveAttributes = ['uidNumber']
searchFilter = "uid=*"
try:
ldap_result = obj.search(
base="ou=People,%s" % self.base_dn,
scope=searchScope,
filterstr=searchFilter,
attrlist=retrieveAttributes
)
result_set = []
while True:
result_type, result_data = obj.result(ldap_result, 0)
if not result_data:
break
else:
if result_type == ldap.RES_SEARCH_ENTRY:
result_set.append(int(result_data[0][1].get('uidNumber')[0]))
return max(result_set) + 1
except ldap.LDAPError, e:
logger.error('获取最大uid失败,原因为: %s' % str(e))
def ldap_add_user(self, cn, mail, username, password):
"""
添加ldap用户
:param cn: 中文名, mail: 邮箱, username: 用户名, password: 密码
:return: True/None
"""
result = None
try:
obj = self.ldapconn
obj.protocal_version = ldap.VERSION3
addDN = "uid=%s,ou=People,%s" % (username, BASE_DN)
attrs = {}
attrs['objectclass'] = ['top', 'person', 'inetOrgPerson', 'posixAccount', 'organizationalPerson']
attrs['cn'] = str(cn)
attrs['homeDirectory'] = str('/home/%s' % username)
attrs['loginShell'] = '/bin/bash'
attrs['mail'] = str(mail)
attrs['sn'] = str(username)
attrs['uid'] = str(username)
attrs['userPassword'] = str(password)
attrs['uidNumber'] = str(self.__get_max_uidNumber())
attrs['gidNumber'] = self.__ldap_getgid(cn='员工')
ldif = ldap.modlist.addModlist(attrs)
obj.add_s(addDN, ldif)
obj.unbind_s()
result = True
except ldap.LDAPError, e:
logger.error("生成用户%s 失败,原因为: %s" % (username, str(e)))
return result
def check_user_belong_to_group(self, uid, group_cn='员工'):
"""
查询 用户 是否归属于某个组
:param uid: 用户uid , Ex: 'ssoadmin'
:param group_cn: 归属组cn , Ex: '黑名单'
:return: True|None
"""
result = None
try:
search = self.ldap_search_dn(value=group_cn, value_type='cn')
if search is None:
raise ldap.LDAPError('未查询到相应 id')
member_list = search[0][1].get('memberUid', [])
if uid in member_list:
result = True
except ldap.LDAPError, e:
logger.error('获取用户%s与组%s关系失败,原因为: %s' % (uid, group_cn, str(e)))
return result
def check_user_status(self, uid):
"""
验证用户状态
:param uid: 用户uid
:return: 200: 用户可用
404: 用户不存在
403: 用户被禁用
"""
result = 404
try:
target_cn = self.ldap_get_user(uid=uid)
if target_cn is None: # 如未查到用户,记录日志,但不算错误,后边有很多地方会验证用户是否存在
result = 404
logger.debug("%s uid未查询到" % uid)
else:
if self.check_user_belong_to_group(uid=uid, group_cn='黑名单'):
result = 403
else:
result = 200
except ldap.LDAPError, e:
logger.error("%s 检查用户状态失败,原因为: %s" % (uid, str(e)))
return result
def ldap_update_password(self, uid, new_password):
"""
更新密码
:param uid: 用户uid,新password
:return: True|None
"""
result = None
try:
obj = self.ldapconn
obj.protocal_version = ldap.VERSION3
modifyDN = "uid=%s,ou=People,%s" % (uid, BASE_DN)
# 因为是更新密码,如用passwd_s方法需要oldpassword,如果用下边方法,是增加一个新密码,而不是替换,而我们的需求是重置密码
# old_password = {'userPassword': ''}
# new_password = {'userPassword': new_password}
# ldif = modlist.modifyModlist(old_password, new_password)
# obj = modlist.modifyModlist(modifyDN, ldif)
# 以下方法实现密码替换的效果,第二个参数就是要替换的属性名,可以变更其他属性
obj.modify_s(modifyDN, [(ldap.MOD_REPLACE, 'userPassword', [str(new_password)])])
obj.unbind_s()
result = True
except ldap.LDAPError, e:
logger.error("%s 密码更新失败,原因为: %s" % (uid, str(e)))
return result
# for test
def main():
# print type(LDAPTool().get_max_uidNumber())
# print(LDAPTool().ldap_search_dn(value='qudong'))
# print(LDAPTool().check_user_belong_to_group('qudong', '黑名单'))
# print LDAPTool().ldap_get_user(uid='qudong')
# print(LDAPTool().check_user_and_email(uid='qudong', email='qudong@ssotest.com'))
# s=LDAPTool()
# print s.ldap_add_user('哈喽2','test222@ssotest.com','test222','test222')
# print(s._LDAPTool__ldap_getgid('黑名单'))
# print(LDAPTool().ldap_update_password('qudong','111111'))
pass
if __name__ == '__main__':
main()
view调用方面,就没什么太多说的了,就是把原先对本地数据库的user表的增删改查,移植到ldaphelper上即可,下边以我的修改密码的逻辑为例做以说明,其他的包括注册、登录都大同小异了。修改密码还涉及到一个验证码发送和验证的逻辑,可以用于参考
models.py,这里自定义了user表,参考Django admin定制化,User字段扩展[原创],自定义扩展user表,额外就多了一张验证码表
class MyUser(AbstractUser):
name = models.CharField(u'中文名', max_length=32, blank=False, null=False)
class Meta:
verbose_name = u'用户详情'
verbose_name_plural = u"用户详情"
class User_ex(models.Model):
"""User models ex"""
email = models.EmailField(unique=True, blank=False, null=False)
valid_code = models.CharField(max_length=24) #验证码
valid_time = models.DateTimeField(auto_now=True) #验证码有效时间
class Meta:
verbose_name = u'验证码'
verbose_name_plural = u"验证码"
def __unicode__(self):
return u'%s' % self.valid_code
获取验证码的view,生成验证码,入库,发送验证码邮件,发送邮件的方法send_mail在其他地方写好了
from web_sso.forms.account import LoginForm, RegisterForm, ForgetPwdForm
from web_sso import models
from backend.lib.common.sendemail import send_mail
from backend.lib.common.ldaphelper import LDAPTool
import logging
......
def get_email_code(request):
"""get email code"""
email = request.GET.get('email', '')
type = request.GET.get('type', '')
code = ''.join(random.sample(string.digits + string.letters, 4))
data = {}
data['success'] = False
data['message'] = ''
try:
# 检查邮箱
username = email.rsplit('@', 1)[0]
user_count = LDAPTool().check_user_status(uid=username)
# user_count = models.MyUser.objects.filter(email=email).count()
if type == "register":
if user_count != 404:
data['success'] = False
data['message'] = u'此用户%s已被注册过' % username
raise Exception(data['message'])
elif type == "forget_pwd":
if user_count == 404:
data['success'] = False
data['message'] = u'此邮箱未被注册过'
raise Exception(data['message'])
# 检查短时间内是否有生成过验证码
user_ex = models.User_ex.objects.filter(email=email)
if len(user_ex) > 0:
user_ex = user_ex[0]
# 两个datetime相减,得到datetime.timedelta类型
create_time = user_ex.valid_time
td = timezone.now() - create_time
if td.seconds < 60:
data['message'] = u'1分钟内发送过一次验证码'
raise Exception(data['message'])
# 发送邮件
subject = ""
if type == "register":
subject = u'[sa.ssotest.net]激活您的帐号'
elif type == "forget_pwd":
subject = u'[sa.ssotest.net]重置密码'
message = u"""
<h2>运维平台(<a href='http://sa.ssotest.net/' target=_blank>sa.ssotest.net</a>)<h2><br />
<table border="1px" cellpadding="3" cellspacing="0">
<thead></thead>
<tr bgcolor="#d3d3d3">
<th>邮箱</th>
<th>验证码</th>
</tr>
<tbody>
<tr>
<td>%s</td>
<td>%s</td>
</tr>
</tbody>
</table>
<br/><span style="color: red;font-size: medium">请保管好您的验证,有效期10分钟</span>
""" % (email, code)
mail_status = send_mail(subject, message, email, email)
if mail_status == 200:
# 邮件发送成功才入库验证码
models.User_ex.objects.filter(email=email).delete()
models.User_ex.objects.create(email=email, valid_code=code)
data['success'] = True
data['message'] = 'OK'
else:
data['success'] = False
data['message'] = "邮件发送失败,错误码:%s" % mail_status
raise Exception(data['message'])
except Exception:
pass
finally:
return HttpResponse(json.dumps(data), content_type="application/json")
验证码验证成功后,通过ldaphelper修改ldap中的对应用户密码,我这边注释的部分就是未使用ldap,使用数据库入库的方式,可作参考
def forget_pwd(request):
error = ''
forgetpwd_form = ForgetPwdForm(request.POST)
try:
if request.method == 'POST':
check = request.POST.get('checkcode', None).lower()
email = request.POST.get('email', None)
if not email:
error = '请输入邮箱.'
raise Exception('请输入邮箱.')
username = email.rsplit('@', 1)[0]
email_obj = models.User_ex.objects.get(email=email)
email_check = email_obj.valid_code.lower()
# 10分钟有效期
email_check_time = email_obj.valid_time
if (timezone.now() - email_check_time).seconds >= 600:
error = '验证码失效.'
elif check != email_check:
error = '验证码错误.'
else:
if not forgetpwd_form.is_valid():
error = '格式错误.'
else:
data = forgetpwd_form.clean()
ldap_obj = LDAPTool()
# if not models.MyUser.objects.filter(Q(username=username) | Q(email=email)):
if ldap_obj.check_user_status(uid=username) == 404:
error = '此邮箱未被注册过.'
else: # 重置密码成功
ldap_obj.ldap_update_password(uid=username, new_password=data.get('password', ''))
# user_obj = models.MyUser.objects.get(username=username,
# email=data.get('email', ''), )
# user_obj.set_password(data.get('password', ''))
# user_obj.save()
target = '%s?from=%s&username=%s' % ('/account/login/', 'forget_pwd', username)
return redirect(target)
except Exception, e:
logger.error(str(e))
return render(request, 'account/forget_pwd.html', {'model': forgetpwd_form, 'error': error})
综上,我们已经可以和ldap结合的比较好了,其中django-auth-ldap实现的功能,一般像一些成熟的软件,如csvn、jira、openvpn都可以通过配置来实现,python-ldap实现的部分是这个系统的核心所在,通过我们的平台对ldap进行操作,当然也可以使用ldap管理后台(我这边用的lam)来实现类似功能。
后边还有单点登录的实现部分,请看后续文章。
[原创]django+ldap实现单点登录(装饰器和缓存)
参考资料
http://www.vpsee.com/2012/11/use-python-ldap-to-create-read-delete-upgrade-ldap-entries/
http://www.grotan.com/ldap/python-ldap-samples.html#bind