工程目录结构
jenkinsrole.py
import requests
import json
class JenkinsRole:
    """Small HTTP client for the ``/role-strategy/strategy/*`` endpoints of a Jenkins server.

    Every call authenticates with HTTP basic auth using ``username`` and either
    ``password`` or ``token`` (see :attr:`pwd_or_token`).  Mutating calls return the
    HTTP status code of the response; read calls return the decoded JSON body.
    """

    # Role categories accepted by the role-strategy endpoints.
    ROLE_TYPES = ('globalRoles', 'projectRoles', 'slaveRoles')

    def __init__(self, host, username, port=8080, password=None, token=None, ssl=False, timeout=30):
        """
        Supply either ``password`` or ``token``, not both.

        :param host: Jenkins host name or IP address
        :param username: administrator user name
        :param port: Jenkins port
        :param password: administrator password
        :param token: administrator API token
        :param ssl: True if Jenkins is served over https
        :param timeout: per-request timeout in seconds (``None`` waits forever)
        """
        self.host = host
        self.username = username
        self.port = port
        self.password = password
        self.token = token
        self.ssl = ssl
        self.timeout = timeout

    @property
    def pwd_or_token(self):
        """Secret used for basic auth: the password if set, otherwise the token.

        :raises ConnectionError: if both ``password`` and ``token`` are set
        """
        if self.password and self.token:
            raise ConnectionError("password与token填写一个即可")
        return self.password if self.password else self.token

    @property
    def proto(self):
        """URL scheme: ``'https'`` when ``ssl`` is truthy, else ``'http'``."""
        return 'https' if self.ssl else 'http'

    # ------------------------------------------------------------------ helpers

    def _url(self, path):
        """Build an absolute URL for ``path`` (which must start with ``/``)."""
        return f'{self.proto}://{self.host}:{self.port}{path}'

    @classmethod
    def _check_role_type(cls, role_type):
        """Raise ``AttributeError`` unless ``role_type`` is one of ``ROLE_TYPES``."""
        if role_type not in cls.ROLE_TYPES:
            raise AttributeError("role_type必须是'globalRoles', 'projectRoles', 'slaveRoles' 其中一个")

    @staticmethod
    def _parse_crumb(text):
        """Turn a ``"<header-name>:<crumb>"`` response body into a one-entry header dict.

        :raises ValueError: if the body does not have that shape
        """
        field, sep, crumb = text.strip().partition(':')
        if not sep or not field:
            raise ValueError(f'unexpected crumb issuer response: {text[:80]!r}')
        return {field: crumb}

    def _post(self, endpoint, data):
        """POST form ``data`` to a role-strategy endpoint and return the HTTP status code."""
        # The crumb request and the POST share one session (and therefore its
        # cookies): recent Jenkins versions only accept a crumb inside the web
        # session that issued it.
        with requests.Session() as session:
            session.auth = (self.username, self.pwd_or_token)
            headers = self.get_crumb(session=session)
            res = session.post(self._url(f'/role-strategy/strategy/{endpoint}'), data=data,
                               headers=headers, timeout=self.timeout)
        return res.status_code

    def _get_json(self, endpoint, params):
        """GET a role-strategy endpoint and return the decoded JSON body."""
        res = requests.get(self._url(f'/role-strategy/strategy/{endpoint}'), params=params,
                           auth=(self.username, self.pwd_or_token), timeout=self.timeout)
        return res.json()

    # --------------------------------------------------------------- public API

    def get_crumb(self, session=None) -> dict:
        """Fetch the CSRF crumb as a ``{header_name: crumb}`` dict.

        Credentials are sent via basic auth instead of being embedded in the URL, so
        secrets containing ``@``, ``:`` or ``/`` work and never appear in the URL.

        :param session: optional ``requests.Session`` to issue the request on, so the
            crumb can be reused by a later request on the same session
        :return: header dict; empty if the crumb issuer answers 404
            (NOTE(review): taken to mean CSRF protection is disabled — confirm)
        :raises requests.HTTPError: on any other non-success status
        :raises ValueError: if the response body is not ``name:crumb``
        """
        http = session if session is not None else requests
        res = http.get(
            self._url('/crumbIssuer/api/xml'),
            params={'xpath': 'concat(//crumbRequestField,":",//crumb)'},
            auth=(self.username, self.pwd_or_token),
            timeout=self.timeout,
        )
        if res.status_code == 404:
            return {}
        res.raise_for_status()
        return self._parse_crumb(res.text)

    def add_role(self, role_type, role_name, permissions: str, role_pattern=None, overwrite=True):
        """
        Add a role.

        If the given permissions do not belong to the role type there are two cases:
        1. None of them belong to it: an empty role is still created.  For example,
           adding the view permission hudson.model.View.Create to projectRoles as
           ``p1`` creates ``p1`` in projectRoles without any permission.
        2. Only some of them belong to it: the matching permissions are added.

        :param role_type: one of globalRoles, projectRoles, slaveRoles
        :param role_name: name of the role
        :param permissions: permission IDs separated by ``,``, e.g.
            'hudson.model.Hudson.Read,hudson.model.Computer.Build'
        :param role_pattern: role pattern (regular expression); required for
            projectRoles and slaveRoles
        :param overwrite: whether to overwrite the role if it already exists; when
            False no error is reported even if it exists
        :return: HTTP status code of the response
        :raises AttributeError: on an invalid role_type or a missing role_pattern
        """
        self._check_role_type(role_type)
        if role_type in ('projectRoles', 'slaveRoles') and not role_pattern:
            raise AttributeError("如果增加项目权限或节点权限,必须指定role_pattern,否则将匹配 .* ")
        role_data = {
            "type": role_type,
            "roleName": role_name,
            "permissionIds": permissions,
            "overwrite": overwrite,
            "pattern": role_pattern,
        }
        return self._post('addRole', role_data)

    def get_role(self, role_type, role_name):
        """
        Get the details of one role.  Example result:
        {'permissionIds': {'hudson.model.Computer.Build': True}, 'sids': ['admin']}

        :param role_type: one of globalRoles, projectRoles, slaveRoles
        :param role_name: name of the role
        :return: decoded JSON response
        :raises AttributeError: on an invalid role_type
        """
        self._check_role_type(role_type)
        return self._get_json('getRole', {"type": role_type, "roleName": role_name})

    def remove_roles(self, role_type, role_names: str):
        """
        Delete roles.

        :param role_type: one of globalRoles, projectRoles, slaveRoles
        :param role_names: role names separated by ``,``
        :return: HTTP status code of the response
        :raises AttributeError: on an invalid role_type
        """
        self._check_role_type(role_type)
        return self._post('removeRoles', {'type': role_type, 'roleNames': role_names})

    def assign_role(self, role_type, role_name, sid):
        """
        Assign a role to a user.
        Note: assigning a role that does not exist does not produce an error.

        :param role_type: one of globalRoles, projectRoles, slaveRoles
        :param role_name: a single role name
        :param sid: a single user name
        :return: HTTP status code of the response
        :raises AttributeError: on an invalid role_type
        """
        self._check_role_type(role_type)
        return self._post('assignRole', {'type': role_type, 'roleName': role_name, 'sid': sid})

    def delete_roles_from_sid(self, role_type, sid):
        """
        Remove every role of the given type from a user.
        Note: naming a user that does not exist does not produce an error.

        :param role_type: one of globalRoles, projectRoles, slaveRoles
        :param sid: a single user name
        :return: HTTP status code of the response
        :raises AttributeError: on an invalid role_type
        """
        self._check_role_type(role_type)
        return self._post('deleteSid', {'type': role_type, 'sid': sid})

    def unassign_role(self, role_type, role_name, sid):
        """
        Remove one role from a user.
        Note: naming a user or role that does not exist does not produce an error.

        :param role_type: one of globalRoles, projectRoles, slaveRoles
        :param role_name: a single role name
        :param sid: a single user name
        :return: HTTP status code of the response
        :raises AttributeError: on an invalid role_type
        """
        self._check_role_type(role_type)
        return self._post('unassignRole', {'type': role_type, 'roleName': role_name, 'sid': sid})

    def get_all_roles(self, role_type):
        """
        Get all roles of one type together with the users assigned to each.
        Example result: {"p1":[],"p2":["zm"],"test":["zm"]}

        :param role_type: one of globalRoles, projectRoles, slaveRoles
        :return: decoded JSON response
        :raises AttributeError: on an invalid role_type
        """
        self._check_role_type(role_type)
        return self._get_json('getAllRoles', {"type": role_type})
ldapuser.py
from ldap3 import Server, Connection, SAFE_SYNC
import time
import re
def ldap_get_uid(ldap_server, ldap_user, ldap_pwd, ldap_search_base):
    """Return the numeric uids found in the DNs of all entries under a search base.

    Binds to ``ldap_server`` as ``ldap_user``, searches ``ldap_search_base`` with the
    filter ``(objectclass=*)`` and, for every returned entry whose DN contains
    ``uid=<digits>``, collects the digit string.

    :param ldap_server: address passed to ``ldap3.Server``
    :param ldap_user: bind DN
    :param ldap_pwd: bind password
    :param ldap_search_base: DN of the subtree to search
    :return: list of uid strings (digits only), in response order
    """
    # Raw string: '\d' in a normal literal is an invalid escape sequence.
    uid_pattern = re.compile(r'uid=(\d+)')

    server = Server(ldap_server)
    conn = Connection(server, ldap_user, ldap_pwd, client_strategy=SAFE_SYNC, auto_bind=True)
    try:
        # NOTE(review): only the DN is used below, so requesting all attributes
        # ('*') is probably unnecessary — confirm before narrowing.
        _status, _result, response, _request = conn.search(
            ldap_search_base, '(objectclass=*)', attributes=['*'])
    finally:
        # Release the bound connection even if the search raises.
        conn.unbind()

    uidlist = []
    for entry in response or ():
        # Entries without a 'dn' key (presumably search references) are skipped.
        match = uid_pattern.search(entry.get('dn') or '')
        if match:
            uidlist.append(match.group(1))
    return uidlist
jenkins_user_grant.py
import myjenkins
from myjenkins import jenkinsrole
import myldap
from myldap import ldapuser
import json
import re
# Print the details of a role
def get_role_info(role_type, role_name):
    """Fetch one role via the module-level client ``j`` and print it as indented, key-sorted JSON."""
    role_details = j.get_role(role_type, role_name)
    print(json.dumps(role_details, indent=4, sort_keys=True))
# Assign a role to a user
def role_to_user(role_type, role_name, username):
    """Assign ``role_name`` to ``username`` via the module-level client ``j`` and print the HTTP status."""
    status = j.assign_role(role_type, role_name, username)
    report = '%s to %s %d' % (role_name, username, status)
    print(report)
# Remove one role from a user
def role_cancel_user(role_type, role_name, username):
    """Unassign ``role_name`` from ``username`` via the module-level client ``j`` and print the HTTP status."""
    status = j.unassign_role(role_type, role_name, username)
    report = '%s del %s %d' % (username, role_name, status)
    print(report)
# SECURITY NOTE(review): the Jenkins API token and the LDAP bind password below are
# hard-coded in source. They should be loaded from the environment or a secrets
# store, and rotated if this file has ever been shared.

# Jenkins client used by the helper functions above (they read the global ``j``).
j = jenkinsrole.JenkinsRole('192.168.41.29', '5010905', port=8080,
                            token='1130c5e4c3968bfe4c64edb9f0a8ecc5c5')

# LDAP connection settings and the subtree whose users are collected.
ldap_server = '192.168.41.13'
ldap_user = 'cn=admin,cn=manager,dc=pre,dc=venusgroup,dc=com,dc=cn'
ldap_pwd = 'root@123'
ldap_search_base = 'ou=6334,ou=4474,ou=4260,ou=4259,ou=employee,dc=pre,dc=venusgroup,dc=com,dc=cn'
user_list = ldapuser.ldap_get_uid(ldap_server, ldap_user, ldap_pwd, ldap_search_base)

role_type = 'projectRoles'
role_name = 'item_other'

# Disabled variant: grant (or revoke) ``role_name`` for every LDAP user.
# for user in user_list:
#     role_to_user(role_type, role_name, user)
#     role_cancel_user(role_type, role_name, user)

role_list = j.get_all_roles(role_type)

# Disabled variant: every role except 'item_admin'.
# for role in role_list:
#     if not role == 'item_admin':
#         role_to_user(role_type, role, '5010958')
#         role_cancel_user(role_type, role, '5010958')

# Disabled variant: roles whose name contains 'xxl-job'.
# for role in role_list:
#     if re.search('xxl-job', role):
#         role_to_user(role_type, role, '5010958')
#         role_cancel_user(role_type, role, '5010958')

# Active run: grant user 5010958 every project role whose name contains
# 'alm', 'hrm' or 'common'.
role_filter = re.compile(r'alm|hrm|common')
for role in role_list:
    if role_filter.search(role):
        role_to_user(role_type, role, '5010958')
        # role_cancel_user(role_type, role, '5010958')