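import os

# LDAP endpoints and the service account used for directory lookups.
# Each value can be overridden from the environment so that the bind
# credentials do not have to live in source control; the defaults keep the
# original placeholders. (The variable names below are this review's
# choice — adjust them to the deployment.)
Server = os.environ.get("LDAP_SERVER", "ldap://127.0.0.1:389")  # plaintext LDAP (lookups)
MServer = os.environ.get("LDAP_SERVER_TLS", "ldaps://127.0.0.1:636")  # LDAP over TLS (password changes)
baseDN = os.environ.get("LDAP_BASE_DN", "OU=xx,DC=xx,DC=xx")  # search base for all queries
username = os.environ.get("LDAP_BIND_USER", "xxx")  # bind account (service user)
password = os.environ.get("LDAP_BIND_PASSWORD", "xxx")  # bind account password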
import ldap
import ldap.filter
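class LdapModel():
    """Thin wrapper around python-ldap for user lookup, identity checks and
    password resets.

    Every method binds with the module-level service account
    (``username``/``password``) against ``Server`` for lookups or ``MServer``
    for password changes (the latter writes ``unicodePwd``, the Active
    Directory convention, which AD requires to be sent over an encrypted
    connection). Errors are printed and turned into ``None``/``False``
    results rather than raised, as in the original.
    """

    # Attributes kept in the dict returned by login_ldap().
    NEED_KEYS = ('sn', 'sAMAccountName', 'mobile', 'description')
    # Attributes requested from the server: NEED_KEYS plus 'name', which the
    # original used as the marker that the entry is a real user object.
    SEARCH_ATTRS = ['name', 'sn', 'sAMAccountName', 'mobile', 'description']
    # The original set OPT_X_TLS_REQUIRE_CERT=OPT_X_TLS_NEVER so self-signed
    # server certificates are accepted; False keeps that behaviour.
    # NOTE(review): with verification off the ldaps:// channel carrying the
    # new password is open to an active man-in-the-middle; set this to True
    # and point libldap at the CA (OPT_X_TLS_CACERTFILE) for production.
    VERIFY_TLS_CERT = False
    # libldap debug level for the password-change connection. The original
    # used 255; trace output at that level can include the bind credentials
    # and the unicodePwd payload on stderr, so it is off by default.
    DEBUG_LEVEL = 0

    def _connect(self, url, debug_level=0):
        """Open ``url``, apply connection options and bind the service account.

        Returns the bound connection; any LDAP error propagates to the caller.
        """
        conn = ldap.initialize(url)
        # The original marked this as essential for lookups: do not chase
        # referrals (AD returns them for other partitions, which are not
        # needed here). Applied to both the lookup and the password path.
        conn.set_option(ldap.OPT_REFERRALS, 0)
        conn.protocol_version = ldap.VERSION3
        if debug_level:
            conn.set_option(ldap.OPT_DEBUG_LEVEL, debug_level)
        conn.simple_bind_s(username, password)
        return conn

    def _find_user(self, conn, userId):
        """Return the first ``(dn, attrs)`` entry with sAMAccountName ``userId``,
        or None when the search returns no entry."""
        # Escape the caller-supplied value so that filter metacharacters
        # (``*``, ``(``, ``)``, ``\``, NUL) match literally instead of
        # rewriting the filter; the original concatenated it unescaped.
        search_filter = "(sAMAccountName=%s)" % ldap.filter.escape_filter_chars(userId)
        results = conn.search_s(baseDN, ldap.SCOPE_SUBTREE, search_filter,
                                list(self.SEARCH_ATTRS))
        for dn, attrs in results:
            # Search references come back with dn None; skip them (the
            # original filtered on RES_SEARCH_ENTRY for the same effect).
            if dn is not None:
                return (dn, attrs)
        return None

    def _pick_attributes(self, attrs):
        """Decode the byte values of the NEED_KEYS attributes into str lists.

        Values that fail to decode are printed and skipped (best effort, as
        in the original); attributes whose value is not a list are ignored.
        """
        result = {}
        for key, values in attrs.items():
            if key not in self.NEED_KEYS or not isinstance(values, list):
                continue
            decoded = []
            for item in values:
                try:
                    decoded.append(item.decode('utf-8'))
                except Exception as e:
                    print(values)
                    print(e)
            result[key] = decoded
        return result

    @staticmethod
    def _close(conn):
        """Unbind ``conn`` if it was opened; unbind failures are ignored so the
        caller's own result is not masked."""
        if conn is not None:
            try:
                conn.unbind_s()
            except Exception:
                pass

    def login_ldap(self, userId):
        """Look up ``userId`` (sAMAccountName) and return its attributes.

        Returns a dict ``{attribute: [str, ...]}`` restricted to NEED_KEYS, or
        None when ``userId`` is empty, no entry is found, the entry has no
        ``name`` attribute, or any LDAP error occurs (the error is printed).
        The connection is always released, which the original did not do.
        """
        if not userId:
            # An empty value would give the filter "(sAMAccountName=)";
            # treat it as "no such user" without querying the directory.
            return None
        conn = None
        try:
            print("开始执行")
            conn = self._connect(Server)
            print('ldap connect successfully')
            entry = self._find_user(conn, userId)
            if entry is None:
                print("in error")
                return None
            _dn, attrs = entry
            if 'name' not in attrs:
                print("in error")
                return None
            return self._pick_attributes(attrs)
        except Exception as ex:
            print(ex)
            return None
        finally:
            self._close(conn)

    def checkValue(self, name, userid, mobile):
        """Check that ``name`` and ``mobile`` match the directory entry of ``userid``.

        Returns ``(ok, message, attributes)``: ``ok`` is True only when
        ``name`` is among the entry's ``sn`` values and ``mobile`` among its
        ``mobile`` values. ``message`` describes the failure (empty on
        success); ``attributes`` is whatever login_ldap() returned.
        """
        rbool = False
        rMsg = ""
        resultMap = None
        try:
            resultMap = self.login_ldap(userid)
            # Attribute values are lists, e.g. {'sn': ['..'], 'mobile': ['..']}.
            if not resultMap:
                rMsg = "query ldap no data!"
            elif name not in resultMap.get("sn", []):
                # surname mismatch (the original had these two messages swapped)
                rMsg = "用户名不一致"
            elif mobile not in resultMap.get("mobile", []):
                # mobile number mismatch
                rMsg = "手机号码不一致"
            else:
                rbool = True
        except Exception as ex:
            print("ldap login_ldap")
            print(ex)
            rMsg = "ldap query error"
        return rbool, rMsg, resultMap

    # Change an LDAP (Active Directory) password.
    # userId: sAMAccountName of the account
    # newpwd: the new password
    def change_password(self, userId, newpwd):
        """Reset the password of ``userId`` to ``newpwd`` via ``unicodePwd``.

        Returns ``(True, msg)`` on success and ``(False, msg)`` when the user
        id is empty, the user is not found, or the bind/search/modify fails
        (the original reported success for a missing user and for a failed
        modify). The new password is never printed.
        """
        if not userId:
            return False, "Oops! Something wrong: empty user id"
        conn = None
        try:
            if not self.VERIFY_TLS_CERT:
                # Accept self-signed / unverifiable server certificates.
                # Process-wide libldap setting; must precede initialize().
                ldap.set_option(ldap.OPT_X_TLS_REQUIRE_CERT, ldap.OPT_X_TLS_NEVER)
            print("url", MServer)
            # NOTE(review): the original also called
            # conn.set_option(ldap.OPT_X_TLS_DEMAND, True). OPT_X_TLS_DEMAND is a
            # value for OPT_X_TLS_REQUIRE_CERT, not an option identifier, so
            # that call did not enforce TLS; transport security here comes
            # from the ldaps:// URL in MServer.
            conn = self._connect(MServer, self.DEBUG_LEVEL)
            print("connecting successfully............")
            entry = self._find_user(conn, userId)
            print("query successfully ..............")
            if entry is None:
                print("查询失败..........")
                return False, "Oops! Something wrong: user not found"
            user_dn = entry[0]
            # AD expects the new password as the double-quoted string in UTF-16-LE.
            new_password = ('"' + newpwd + '"').encode('utf-16-le')
            mod_attrs = [(ldap.MOD_REPLACE, 'unicodePwd', new_password)]
            print(user_dn)
            conn.modify_s(user_dn, mod_attrs)
            print("update password successfully............")
            return True, "Password changed!"
        except Exception as ex:
            return False, "Oops! Something wrong: %s" % ex
        finally:
            self._close(conn)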