>

日志记录

- 编辑:正版管家婆马报彩图 -

日志记录

Python开发【Django】:日志记录、API认证

日志记录:

  调用同一个对象,分别记录错误日志和运行日志

自定义日志类:

class Logger(object):
    """Singleton that writes run messages and error messages to separate files.

    The two file paths are read from ``settings.RUN_LOG_FILE`` and
    ``settings.ERROR_LOG_FILE``.  Every ``Logger()`` expression returns the
    same object, and the log files are opened only once.
    """
    __instance = None           # the single shared instance, created on first use

    def __new__(cls, *args, **kwargs):
        """Return the shared instance, creating it on the first call."""
        # Extra arguments are deliberately not forwarded: object.__new__()
        # raises TypeError when it receives any.
        if cls.__instance is None:
            cls.__instance = super(Logger, cls).__new__(cls)
        return cls.__instance

    def __init__(self):
        # Python runs __init__ on every ``Logger()`` call even though __new__
        # hands back the same object.  Without this guard each call would
        # attach one more FileHandler and every message would be written
        # multiple times.
        if getattr(self, '_initialized', False):
            return

        self.run_log_file = settings.RUN_LOG_FILE
        self.error_log_file = settings.ERROR_LOG_FILE
        self.run_logger = None
        self.error_logger = None

        self.initialize_run_log()
        self.initialize_error_log()
        self._initialized = True

    @staticmethod
    def check_path_exist(log_abs_file):
        """Create the directory that will contain *log_abs_file* if needed.

        :param log_abs_file: path of the log file (not of the directory)
        """
        log_path = os.path.split(log_abs_file)[0]
        # An empty directory part means "current directory": nothing to create.
        if log_path:
            # makedirs handles nested missing directories and, with
            # exist_ok, a directory that appears between check and creation.
            os.makedirs(log_path, exist_ok=True)

    def initialize_run_log(self):
        """Attach a file handler for the run log to the 'run_log' logger."""
        self.check_path_exist(self.run_log_file)
        run_logger = logging.getLogger('run_log')       # the name itself is arbitrary
        run_logger.setLevel(logging.INFO)

        # logging.getLogger() returns a process-wide object, so a handler for
        # this file may already be attached; adding a second one would
        # duplicate every line.
        target = os.path.abspath(self.run_log_file)
        already_attached = any(
            isinstance(handler, logging.FileHandler)
            and handler.baseFilename == target
            for handler in run_logger.handlers
        )
        if not already_attached:
            file_handler = logging.FileHandler(self.run_log_file, 'a', encoding='utf-8')
            file_handler.setFormatter(
                logging.Formatter(fmt="%(asctime)s - %(levelname)s :  %(message)s")
            )
            run_logger.addHandler(file_handler)
        self.run_logger = run_logger

    def initialize_error_log(self):
        """Create the standalone error logger and attach its file handler."""
        self.check_path_exist(self.error_log_file)
        file_handler = logging.FileHandler(self.error_log_file, 'a', encoding='utf-8')
        file_handler.setFormatter(
            logging.Formatter(fmt="%(asctime)s  - %(levelname)s :  %(message)s")
        )
        # Instantiated directly, so it is not registered under a global name.
        # The label was 'run_log' (copy/paste slip); it does not appear in the
        # output format, so renaming it leaves the log lines unchanged.
        error_logger = logging.Logger('error_log', level=logging.ERROR)
        error_logger.addHandler(file_handler)
        self.error_logger = error_logger

    def log(self, message, mode=True):
        """
        Write one message to the log.

        :param message: text to record
        :param mode: True writes an INFO line to the run log,
                     False writes an ERROR line to the error log
        :return: None
        """
        if mode:
            self.run_logger.info(message)
        else:
            self.error_logger.error(message)

调用方式: 

# Code 1000 means the update succeeded: announce it on stdout and record the
# message in the run log.  Any other code sends the message to the error log.
succeeded = ret['code'] == 1000
if succeeded:
    print(IP,'更新成功')
Logger().log(ret['message'], succeeded)

  

日志记录:

  调用同一个对象,分别记录错误日志和运行日志

自定义日志类:

class Logger(object):
    """Singleton that writes run messages and error messages to separate files.

    The two file paths are read from ``settings.RUN_LOG_FILE`` and
    ``settings.ERROR_LOG_FILE``.  Every ``Logger()`` expression returns the
    same object, and the log files are opened only once.
    """
    __instance = None           # the single shared instance, created on first use

    def __new__(cls, *args, **kwargs):
        """Return the shared instance, creating it on the first call."""
        # Extra arguments are deliberately not forwarded: object.__new__()
        # raises TypeError when it receives any.
        if cls.__instance is None:
            cls.__instance = super(Logger, cls).__new__(cls)
        return cls.__instance

    def __init__(self):
        # Python runs __init__ on every ``Logger()`` call even though __new__
        # hands back the same object.  Without this guard each call would
        # attach one more FileHandler and every message would be written
        # multiple times.
        if getattr(self, '_initialized', False):
            return

        self.run_log_file = settings.RUN_LOG_FILE
        self.error_log_file = settings.ERROR_LOG_FILE
        self.run_logger = None
        self.error_logger = None

        self.initialize_run_log()
        self.initialize_error_log()
        self._initialized = True

    @staticmethod
    def check_path_exist(log_abs_file):
        """Create the directory that will contain *log_abs_file* if needed.

        :param log_abs_file: path of the log file (not of the directory)
        """
        log_path = os.path.split(log_abs_file)[0]
        # An empty directory part means "current directory": nothing to create.
        if log_path:
            # makedirs handles nested missing directories and, with
            # exist_ok, a directory that appears between check and creation.
            os.makedirs(log_path, exist_ok=True)

    def initialize_run_log(self):
        """Attach a file handler for the run log to the 'run_log' logger."""
        self.check_path_exist(self.run_log_file)
        run_logger = logging.getLogger('run_log')       # the name itself is arbitrary
        run_logger.setLevel(logging.INFO)

        # logging.getLogger() returns a process-wide object, so a handler for
        # this file may already be attached; adding a second one would
        # duplicate every line.
        target = os.path.abspath(self.run_log_file)
        already_attached = any(
            isinstance(handler, logging.FileHandler)
            and handler.baseFilename == target
            for handler in run_logger.handlers
        )
        if not already_attached:
            file_handler = logging.FileHandler(self.run_log_file, 'a', encoding='utf-8')
            file_handler.setFormatter(
                logging.Formatter(fmt="%(asctime)s - %(levelname)s :  %(message)s")
            )
            run_logger.addHandler(file_handler)
        self.run_logger = run_logger

    def initialize_error_log(self):
        """Create the standalone error logger and attach its file handler."""
        self.check_path_exist(self.error_log_file)
        file_handler = logging.FileHandler(self.error_log_file, 'a', encoding='utf-8')
        file_handler.setFormatter(
            logging.Formatter(fmt="%(asctime)s  - %(levelname)s :  %(message)s")
        )
        # Instantiated directly, so it is not registered under a global name.
        # The label was 'run_log' (copy/paste slip); it does not appear in the
        # output format, so renaming it leaves the log lines unchanged.
        error_logger = logging.Logger('error_log', level=logging.ERROR)
        error_logger.addHandler(file_handler)
        self.error_logger = error_logger

    def log(self, message, mode=True):
        """
        Write one message to the log.

        :param message: text to record
        :param mode: True writes an INFO line to the run log,
                     False writes an ERROR line to the error log
        :return: None
        """
        if mode:
            self.run_logger.info(message)
        else:
            self.error_logger.error(message)

调用方式: 

# Code 1000 means the update succeeded: announce it on stdout and record the
# message in the run log.  Any other code sends the message to the error log.
succeeded = ret['code'] == 1000
if succeeded:
    print(IP,'更新成功')
Logger().log(ret['message'], succeeded)

  

API认证: 

  密钥串+时间戳发送进行验证,超过超时时间则验证失败,认证过的密钥过期后清空

客户端:

import hashlib
import time
import requests

class Client(object):
    """Client that signs its requests to the asset API with a shared key."""

    def __init__(self):
        # NOTE(review): the shared secret is hard-coded here; for real use it
        # should come from configuration rather than from source code.
        self.key = '299095cc-1330-11e5-b06a-a45e60bec08b'
        self.key_name = 'auth-key'      # name of the HTTP header carrying the token
        self.asset_api = 'http://127.0.0.1:8000/api/'

    def auth_key(self):
        """
        Build the authentication header for one request.

        The token is ``"<md5 hex digest>|<timestamp>"``.  The digest is an MD5
        seeded with the key and then updated with ``"<key>|<timestamp>"``; the
        timestamp is the current time formatted with ``%f``.

        :return: dict with a single item, ``{self.key_name: token}``
        """
        time_span = time.time()
        ha = hashlib.md5(self.key.encode('utf-8'))      # the key doubles as the salt
        ha.update(bytes("%s|%f" % (self.key, time_span), encoding='utf-8'))
        encryption = ha.hexdigest()
        result = "%s|%f" % (encryption, time_span)
        return {self.key_name: result}

    def get_asset(self, timeout=5):
        """
        Send an authenticated GET request to the asset API and print the reply.

        :param timeout: seconds to wait for the server before giving up;
                        without a timeout ``requests`` can block indefinitely
        :return: None
        """
        headers = {}
        headers.update(self.auth_key())
        response = requests.get(
            url=self.asset_api,
            headers=headers,
            timeout=timeout,
        )

        print(response.text)

# Only fire the request when this file is executed as a script; importing it
# must not trigger a network call.
if __name__ == '__main__':
    Client().get_asset()

服务端:  

from django.views import View
import time
import hashlib

# NOTE(review): the shared secret is hard-coded; for real use it should be
# loaded from configuration instead of living in source code.
ASSET_AUTH_KEY = '299095cc-1330-11e5-b06a-a45e60bec08b'     # shared key used to sign requests
ASSET_AUTH_HEADER_NAME = 'HTTP_AUTH_KEY'        # request.META entry that carries the token
ASSET_AUTH_TIME = 2             # seconds a token stays valid
ENCRYPT_LIST  = []   # tokens already accepted: {'encrypt': digest, 'time': timestamp}

def api_auth(request):
    """
    Check the ``"<md5 hex digest>|<timestamp>"`` token sent with *request*.

    A token is accepted when all of the following hold:
    the header is present and has exactly two ``|``-separated parts, the
    timestamp parses as a number and is not older than ASSET_AUTH_TIME
    seconds, the digest matches the one computed from ASSET_AUTH_KEY and the
    timestamp, and the same digest has not been accepted before within the
    validity window (replay protection).

    :param request: object exposing the request headers through ``request.META``
    :return: True when the token is accepted, False otherwise
    """
    import hmac  # function-scope import: only needed for compare_digest

    auth_key = request.META.get(ASSET_AUTH_HEADER_NAME)
    if not auth_key:        # header missing or empty
        return False
    sp = auth_key.split('|')
    if len(sp) != 2:        # malformed token
        return False
    encrypt, timestamp = sp
    try:
        timestamp = float(timestamp)
    except ValueError:      # non-numeric timestamp: reject instead of crashing
        return False
    limit_timestamp = time.time() - ASSET_AUTH_TIME

    if limit_timestamp > timestamp:     # token is older than the allowed window
        return False
    ha = hashlib.md5(ASSET_AUTH_KEY.encode('utf-8'))
    ha.update(bytes("%s|%f" % (ASSET_AUTH_KEY, timestamp), encoding='utf-8'))
    result = ha.hexdigest()
    # Constant-time comparison; both sides are encoded because compare_digest
    # only accepts str arguments when they are pure ASCII.
    if not hmac.compare_digest(encrypt.encode('utf-8'), result.encode('utf-8')):
        return False

    # Drop expired records.  The list is rebuilt in place: deleting by index
    # while the remaining indexes shift would remove the wrong entries or
    # raise IndexError.
    ENCRYPT_LIST[:] = [
        record for record in ENCRYPT_LIST if record['time'] >= limit_timestamp
    ]

    # A digest that is still on record was already used inside the window.
    if any(record['encrypt'] == encrypt for record in ENCRYPT_LIST):
        return False
    ENCRYPT_LIST.append({'encrypt': encrypt, 'time': timestamp})
    return True


class Api(View):
    """View that answers GET requests according to the api_auth() check."""

    def get(self, request):
        """
        Authenticate the request and report the outcome.

        :param request: incoming Django request
        :return: HttpResponse with the success text, or the rejection text
                 with status 403 when authentication fails
        """
        # Imported locally because the surrounding import block does not
        # bring HttpResponse into scope.
        from django.http import HttpResponse

        if api_auth(request):
            return HttpResponse('认证成功')
        # Rejections carry 403 instead of the default 200 so that clients can
        # detect the failure from the status code.
        return HttpResponse('去你的吧', status=403)

 

  

 

API认证: 

  密钥串+时间戳发送进行验证,超过超时时间则验证失败,认证过的密钥过期后清空

客户端:

import hashlib
import time
import requests

class Client(object):
    """Client that signs its requests to the asset API with a shared key."""

    def __init__(self):
        # NOTE(review): the shared secret is hard-coded here; for real use it
        # should come from configuration rather than from source code.
        self.key = '299095cc-1330-11e5-b06a-a45e60bec08b'
        self.key_name = 'auth-key'      # name of the HTTP header carrying the token
        self.asset_api = 'http://127.0.0.1:8000/api/'

    def auth_key(self):
        """
        Build the authentication header for one request.

        The token is ``"<md5 hex digest>|<timestamp>"``.  The digest is an MD5
        seeded with the key and then updated with ``"<key>|<timestamp>"``; the
        timestamp is the current time formatted with ``%f``.

        :return: dict with a single item, ``{self.key_name: token}``
        """
        time_span = time.time()
        ha = hashlib.md5(self.key.encode('utf-8'))      # the key doubles as the salt
        ha.update(bytes("%s|%f" % (self.key, time_span), encoding='utf-8'))
        encryption = ha.hexdigest()
        result = "%s|%f" % (encryption, time_span)
        return {self.key_name: result}

    def get_asset(self, timeout=5):
        """
        Send an authenticated GET request to the asset API and print the reply.

        :param timeout: seconds to wait for the server before giving up;
                        without a timeout ``requests`` can block indefinitely
        :return: None
        """
        headers = {}
        headers.update(self.auth_key())
        response = requests.get(
            url=self.asset_api,
            headers=headers,
            timeout=timeout,
        )

        print(response.text)

# Only fire the request when this file is executed as a script; importing it
# must not trigger a network call.
if __name__ == '__main__':
    Client().get_asset()

服务端:  

from django.views import View
import time
import hashlib

# NOTE(review): the shared secret is hard-coded; for real use it should be
# loaded from configuration instead of living in source code.
ASSET_AUTH_KEY = '299095cc-1330-11e5-b06a-a45e60bec08b'     # shared key used to sign requests
ASSET_AUTH_HEADER_NAME = 'HTTP_AUTH_KEY'        # request.META entry that carries the token
ASSET_AUTH_TIME = 2             # seconds a token stays valid
ENCRYPT_LIST  = []   # tokens already accepted: {'encrypt': digest, 'time': timestamp}

def api_auth(request):
    """
    Check the ``"<md5 hex digest>|<timestamp>"`` token sent with *request*.

    A token is accepted when all of the following hold:
    the header is present and has exactly two ``|``-separated parts, the
    timestamp parses as a number and is not older than ASSET_AUTH_TIME
    seconds, the digest matches the one computed from ASSET_AUTH_KEY and the
    timestamp, and the same digest has not been accepted before within the
    validity window (replay protection).

    :param request: object exposing the request headers through ``request.META``
    :return: True when the token is accepted, False otherwise
    """
    import hmac  # function-scope import: only needed for compare_digest

    auth_key = request.META.get(ASSET_AUTH_HEADER_NAME)
    if not auth_key:        # header missing or empty
        return False
    sp = auth_key.split('|')
    if len(sp) != 2:        # malformed token
        return False
    encrypt, timestamp = sp
    try:
        timestamp = float(timestamp)
    except ValueError:      # non-numeric timestamp: reject instead of crashing
        return False
    limit_timestamp = time.time() - ASSET_AUTH_TIME

    if limit_timestamp > timestamp:     # token is older than the allowed window
        return False
    ha = hashlib.md5(ASSET_AUTH_KEY.encode('utf-8'))
    ha.update(bytes("%s|%f" % (ASSET_AUTH_KEY, timestamp), encoding='utf-8'))
    result = ha.hexdigest()
    # Constant-time comparison; both sides are encoded because compare_digest
    # only accepts str arguments when they are pure ASCII.
    if not hmac.compare_digest(encrypt.encode('utf-8'), result.encode('utf-8')):
        return False

    # Drop expired records.  The list is rebuilt in place: deleting by index
    # while the remaining indexes shift would remove the wrong entries or
    # raise IndexError.
    ENCRYPT_LIST[:] = [
        record for record in ENCRYPT_LIST if record['time'] >= limit_timestamp
    ]

    # A digest that is still on record was already used inside the window.
    if any(record['encrypt'] == encrypt for record in ENCRYPT_LIST):
        return False
    ENCRYPT_LIST.append({'encrypt': encrypt, 'time': timestamp})
    return True


class Api(View):
    """View that answers GET requests according to the api_auth() check."""

    def get(self, request):
        """
        Authenticate the request and report the outcome.

        :param request: incoming Django request
        :return: HttpResponse with the success text, or the rejection text
                 with status 403 when authentication fails
        """
        # Imported locally because the surrounding import block does not
        # bring HttpResponse into scope.
        from django.http import HttpResponse

        if api_auth(request):
            return HttpResponse('认证成功')
        # Rejections carry 403 instead of the default 200 so that clients can
        # detect the failure from the status code.
        return HttpResponse('去你的吧', status=403)

 

  

 

日志记录: 调用同一个对象,分别记录错误日志和运行日志 自定义日志类: class Logge...

本文由编程应用发布,转载请注明来源:日志记录