运维开发网

同步钉钉通讯录

运维开发网 https://www.qedev.com 2020-10-11 08:26 出处:51CTO 作者:qinshixu
# -*- coding: utf-8 -*- # author: qinshixu import requests import logging class Dingding(object): """dingding controller include get_user_info send_message""" ding_api = "https://oapi.dingtalk.com/" def __init__(self, copy_i…
# -*- coding: utf-8 -*-
# author: qinshixu
import requests
import logging

class Dingding(object):
    """Small client for reading the DingTalk (dingding) address book.

    Exposes the department list, the users of a department, a flattened
    stream of all users, and a lookup of one user by the local part of
    their e-mail address.

    An access token is requested once, when the instance is created. Every
    API failure is logged on the "dingding" logger and reported to the
    caller as ``None`` / an empty iteration instead of an exception.
    """
    ding_api = "https://oapi.dingtalk.com/"
    # Seconds to wait for each HTTP call; without a timeout a stalled
    # connection would block the caller forever.
    request_timeout = 10

    def __init__(self, copy_id=None, copy_secret=None, agent_id=None):
        """Store the credentials and fetch an access token.

        Args:
            copy_id: sent to the API as ``corpid``.
            copy_secret: sent to the API as ``corpsecret``.
            agent_id: stored on the instance; not used by this class.
        """
        self.logger = logging.getLogger("dingding")
        self.copy_id = copy_id
        self.copy_secret = copy_secret
        self.agent_id = agent_id
        self._init_token()

    def _call_api(self, path, params, what, need_token=True):
        """GET ``ding_api + path`` and return the decoded JSON payload.

        Args:
            path: endpoint path relative to ``ding_api``.
            params: query parameters (URL-encoded by ``requests``).
            what: prefix for the error log line, e.g.
                "Get dingding department error".
            need_token: when true, ``access_token`` is added to the query
                and the call is skipped if no token is available.

        Returns:
            The payload dict when the HTTP status is 200 and ``errcode``
            is 0, otherwise ``None`` (the reason is logged).
        """
        query = dict(params)
        if need_token:
            if not self.access_token:
                self.logger.error("%s: msg:%s", what, "access_token is not available")
                return None
            query["access_token"] = self.access_token
        try:
            res = requests.get(self.ding_api + path, params=query,
                               timeout=self.request_timeout)
            if res.status_code != 200:
                self.logger.error("%s: msg:%s", what, res.text)
                return None
            payload = res.json()
        except (requests.RequestException, ValueError) as e:
            # Network failure or a body that is not valid JSON.
            self.logger.error(str(e))
            return None
        if not isinstance(payload, dict):
            self.logger.error("%s: msg:%s", what, res.text)
            return None
        if payload.get("errcode") != 0:
            self.logger.error("%s: msg:%s", what, payload.get("errmsg"))
            return None
        return payload

    def _init_token(self):
        """Fetch the access token; ``self.access_token`` is None on failure."""
        # Assign first so the attribute exists on every code path.
        self.access_token = None
        payload = self._call_api(
            "gettoken",
            {"corpid": self.copy_id, "corpsecret": self.copy_secret},
            "Get dingding access_token error",
            need_token=False)
        if payload is not None:
            self.access_token = payload.get("access_token")

    def department_list(self):
        """Yield one ``{"name", "id", "parentid"}`` dict per department.

        Yields nothing when the API call fails. Entries missing ``name``
        or ``id`` are logged and skipped.
        """
        payload = self._call_api(
            "department/list", {}, "Get dingding department error")
        if payload is None:
            return
        for dept in payload.get("department") or []:
            try:
                # NOTE(review): parentid is blanked whenever createDeptGroup
                # is truthy -- kept from the original code, intent unclear.
                department = {
                    "name": dept["name"],
                    "id": dept["id"],
                    "parentid": (None if dept.get("createDeptGroup")
                                 else dept.get("parentid")),
                }
            except (KeyError, TypeError) as e:
                self.logger.error(str(e))
                continue
            yield department

    def get_department_users(self, dep_id=None):
        """Return every user record of department ``dep_id``.

        Pages through ``user/listbypage`` 100 records at a time for as
        long as the response reports ``hasMore``.

        Returns:
            A list of user dicts as returned by the API, or ``None`` when
            any page request fails.
        """
        page_size = 100
        offset = 0
        dep_users = []
        while True:
            payload = self._call_api(
                "user/listbypage",
                {"department_id": dep_id, "offset": offset, "size": page_size},
                "Get depart users error")
            if payload is None:
                return None
            page = payload.get("userlist") or []
            dep_users.extend(page)
            # An empty page also ends the loop so a misreported hasMore
            # can never spin forever.
            if not page or not payload.get("hasMore"):
                return dep_users
            # assumes offset counts records, not pages -- TODO confirm
            offset += len(page)

    def get_all_users(self):
        """Yield a summary dict for every user that has an e-mail address.

        Walks every department; departments whose user listing failed are
        skipped. The organisation e-mail is preferred and the personal one
        is the fallback; users with neither are logged and skipped.
        ``department`` is the largest id in the user's department list, or
        ``None`` when that list is missing or empty.
        """
        for dept in self.department_list():
            # get_department_users returns None on failure.
            for user in self.get_department_users(dept["id"]) or []:
                email = user.get("orgEmail") or user.get("email")
                if not email:
                    self.logger.warning("%s no config email", user.get("name"))
                    continue
                departments = user.get("department") or []
                yield {
                    "mobile": user.get("mobile"),
                    "position": user.get("position"),
                    "userid": user.get("userid"),
                    "name": user.get("name"),
                    "isLeader": user.get("isLeader"),
                    "openId": user.get("openId"),
                    "department": max(departments) if departments else None,
                    "email": email,
                }

    def get_user_info(self, username):
        """Return the first user whose e-mail local part equals ``username``.

        Returns ``None`` when no user matches.
        """
        for userinfo in self.get_all_users():
            # userinfo is a dict, so the address is read by key.
            if userinfo["email"].partition("@")[0] == username:
                return userinfo
        return None

# example

# if __name__ == "__main__":
#     ding = Dingding("dingk*************", "****", "****")
#     for i in ding.get_all_users():
#         print(i)

扫码领视频副本.gif

0

精彩评论

暂无评论...
验证码 换一张
取 消

关注公众号