


Preface

The logging package is Python's built-in logging framework. To many Python developers it is at once familiar and foreign: it is certainly easy to use, and a single line of code is enough to print a log message

import logging

logging.debug("This is a debug log.")
logging.info("This is an info log.")
logging.warning("This is a warning log.")
logging.error("This is an error log.")
logging.critical("This is a critical log.")

# output
WARNING:root:This is a warning log.
ERROR:root:This is an error log.
CRITICAL:root:This is a critical log.

但是對(duì)于我們?cè)趯?shí)際項(xiàng)目中需要基于Logging來(lái)開發(fā)日志框架時(shí),常常會(huì)遇到各種各樣的問(wèn)題,例如性能方面的多進(jìn)程下滾動(dòng)日志記錄丟失、日志記錄效率低的問(wèn)題以及在二次開發(fā)時(shí)因?yàn)闆](méi)有理解Logging內(nèi)部流程導(dǎo)致沒(méi)有利用到Logging的核心機(jī)制等等。

Next, starting from that single line of code, let's dig into the secrets of the logging system.

1. Diving into the logging module's source

1.1 What does logging.info do?

For most people, logging.info is the first line of logging code they ever write. It hides a lot of implementation detail, so at first we only notice the functionality and ignore what happens inside.

import logging
logging.info("This is a info log.")

# logging/__init__.py
def info(msg, *args, **kwargs):
    """
    Log a message with severity 'INFO' on the root logger. If the logger has
    no handlers, call basicConfig() to add a console handler with a pre-defined
    format.
    """
    # If no handlers are bound yet, fall back to basicConfig()
    if len(root.handlers) == 0:
        basicConfig()
    root.info(msg, *args, **kwargs)

_lock = threading.RLock()

def _acquireLock():
    """
    Acquire the module-level lock for serializing access to shared data.

    This should be released with _releaseLock().
    """
    if _lock:
        _lock.acquire()

def _releaseLock():
    """
    Release the module-level lock acquired by calling _acquireLock().
    """
    if _lock:
        _lock.release()

def basicConfig(**kwargs):
    """
    One-shot convenience configuration: by default it creates a StreamHandler
    that prints logs to the console.
    This function does nothing if the root logger already has handlers
    configured. It is a convenience method intended for use by simple scripts
    to do one-shot configuration of the logging package.

    The default behaviour is to create a StreamHandler which writes to
    sys.stderr, set a formatter using the BASIC_FORMAT format string, and
    add the handler to the root logger.
    """
    # Add thread safety in case someone mistakenly calls
    # basicConfig() from multiple threads
    # (the standard threading-lock pattern shown above, to keep log
    # configuration thread-safe)
    _acquireLock()
    try:
        if len(root.handlers) == 0:
            # Handler selection: build a FileHandler when a filename is
            # given, otherwise a StreamHandler
            handlers = kwargs.pop("handlers", None)
            if handlers is None:
                if "stream" in kwargs and "filename" in kwargs:
                    raise ValueError("'stream' and 'filename' should not be "
                                     "specified together")
            else:
                if "stream" in kwargs or "filename" in kwargs:
                    raise ValueError("'stream' or 'filename' should not be "
                                     "specified together with 'handlers'")
            if handlers is None:
                filename = kwargs.pop("filename", None)
                mode = kwargs.pop("filemode", 'a')
                if filename:
                    h = FileHandler(filename, mode)
                else:
                    stream = kwargs.pop("stream", None)
                    h = StreamHandler(stream)
                handlers = [h]
            dfs = kwargs.pop("datefmt", None)
            style = kwargs.pop("style", '%')
            if style not in _STYLES:
                raise ValueError('Style must be one of: %s' % ','.join(
                                 _STYLES.keys()))
            fs = kwargs.pop("format", _STYLES[style][1])
            fmt = Formatter(fs, dfs, style)
            # Bind each handler to root
            for h in handlers:
                if h.formatter is None:
                    h.setFormatter(fmt)
                root.addHandler(h)
            level = kwargs.pop("level", None)
            if level is not None:
                root.setLevel(level)
            if kwargs:
                keys = ', '.join(kwargs.keys())
                raise ValueError('Unrecognised argument(s): %s' % keys)
    finally:
        _releaseLock()

So far we can see that logging.info first calls basicConfig() to initialize a handler before any real printing happens, and that basicConfig() is an initialize-handler-then-bind-to-root operation performed under the module lock. So what does this root represent?

# logging/__init__.py

root = RootLogger(WARNING)
Logger.root = root
Logger.manager = Manager(Logger.root)

class RootLogger(Logger):
    """
    A root logger is not that different to any other logger, except that
    it must have a logging level and there is only one instance of it in
    the hierarchy.
    """
    def __init__(self, level):
        """
        Initialize the logger with the name "root".
        """
        # Calls the parent initializer with "root" and WARNING, which is
        # why a bare logging.info prints nothing: the root logger's level
        # is WARNING from the moment it is created
        Logger.__init__(self, "root", level)

    def __reduce__(self):
        return getLogger, ()

class Logger(Filterer):
    """
    Instances of the Logger class represent a single logging channel. A
    "logging channel" indicates an area of an application. Exactly how an
    "area" is defined is up to the application developer. Since an
    application can have any number of areas, logging channels are identified
    by a unique string. Application areas can be nested (e.g. an area
    of "input processing" might include sub-areas "read CSV files", "read
    XLS files" and "read Gnumeric files"). To cater for this natural nesting,
    channel names are organized into a namespace hierarchy where levels are
    separated by periods, much like the Java or Python package namespace. So
    in the instance given above, channel names might be "input" for the upper
    level, and "input.csv", "input.xls" and "input.gnu" for the sub-levels.
    There is no arbitrary limit to the depth of nesting.
    """
    def __init__(self, name, level=NOTSET):
        """
        Initialize the logger with a name and an optional level.
        """
        Filterer.__init__(self)
        self.name = name                  # the name passed in, "root" above
        self.level = _checkLevel(level)   # the logger level
        self.parent = None
        self.propagate = True
        self.handlers = []
        self.disabled = False
        self._cache = {}

    def info(self, msg, *args, **kwargs):
        """
        Log 'msg % args' with severity 'INFO'.

        To pass exception information, use the keyword argument exc_info with
        a true value, e.g.

        logger.info("Houston, we have a %s", "interesting problem", exc_info=1)
        """
        # Check whether this logger's level lets INFO records through
        if self.isEnabledFor(INFO):
            self._log(INFO, msg, args, **kwargs)

    def getEffectiveLevel(self):
        """
        Get the effective level for this logger.

        Loop through this logger and its parents in the logger hierarchy,
        looking for a non-zero logging level. Return the first one found.
        """
        # Walk up the parent chain looking for an explicit level
        logger = self
        while logger:
            if logger.level:
                return logger.level
            logger = logger.parent
        return NOTSET

    def isEnabledFor(self, level):
        """
        Is this logger enabled for level 'level'?
        """
        try:
            return self._cache[level]
        except KeyError:
            # The module-level lock again
            _acquireLock()
            if self.manager.disable >= level:
                is_enabled = self._cache[level] = False
            else:
                is_enabled = self._cache[level] = level >= self.getEffectiveLevel()
            _releaseLock()

            return is_enabled

root is an instance of RootLogger, and RootLogger inherits from Logger. All records are ultimately written through _log, and before _log runs there is the logger-level check shown above.
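The getEffectiveLevel() walk above also explains level inheritance: a child logger created with NOTSET climbs the dot-separated hierarchy until it finds a non-zero level. A minimal sketch (the "app"/"app.db" names are just examples):

```python
import logging

parent = logging.getLogger("app")
parent.setLevel(logging.ERROR)
child = logging.getLogger("app.db")     # its own level stays NOTSET (0)

assert child.level == logging.NOTSET
assert child.getEffectiveLevel() == logging.ERROR   # inherited from "app"
assert not child.isEnabledFor(logging.WARNING)      # below ERROR: filtered
assert child.isEnabledFor(logging.CRITICAL)
```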

# Used to find the first caller frame on the stack
# _srcfile is used when walking the stack to check when we've got the first
# caller stack frame, by skipping frames whose filename is that of this
# module's source. It therefore should contain the filename of this module's
# source file.
#
# Ordinarily we would use __file__ for this, but frozen modules don't always
# have __file__ set, for some reason (see Issue #21736). Thus, we get the
# filename from a handy code object from a function defined in this module.
# (There's no particular reason for picking addLevelName.)
#

_srcfile = os.path.normcase(addLevelName.__code__.co_filename)

# _srcfile is only used in conjunction with sys._getframe().
# To provide compatibility with older versions of Python, set _srcfile
# to None if _getframe() is not available; this value will prevent
# findCaller() from being called. You can also do this if you want to avoid
# the overhead of fetching caller information, even when _getframe() is
# available.
#if not hasattr(sys, '_getframe'):
# _srcfile = None

def _log(self, level, msg, args, exc_info=None, extra=None, stack_info=False):
    """
    Low-level logging routine which creates a LogRecord and then calls
    all the handlers of this logger to handle the record.
    """
    sinfo = None
    if _srcfile:
        #IronPython doesn't track Python frames, so findCaller raises an
        #exception on some versions of IronPython. We trap it here so that
        #IronPython can use logging.
        try:
            fn, lno, func, sinfo = self.findCaller(stack_info)
        except ValueError: # pragma: no cover
            fn, lno, func = "(unknown file)", 0, "(unknown function)"
    else: # pragma: no cover
        fn, lno, func = "(unknown file)", 0, "(unknown function)"
    if exc_info:
        if isinstance(exc_info, BaseException):
            exc_info = (type(exc_info), exc_info, exc_info.__traceback__)
        elif not isinstance(exc_info, tuple):
            exc_info = sys.exc_info()
    # Build a single log record via makeRecord
    record = self.makeRecord(self.name, level, fn, lno, msg, args,
                             exc_info, func, extra, sinfo)
    # Hand it off for processing
    self.handle(record)

def findCaller(self, stack_info=False):
    """
    Walk the call stack to recover the caller's file name, line number
    and function name.
    Find the stack frame of the caller so that we can note the source
    file name, line number and function name.
    """
    f = currentframe()
    #On some versions of IronPython, currentframe() returns None if
    #IronPython isn't run with -X:Frames.
    if f is not None:
        f = f.f_back
    rv = "(unknown file)", 0, "(unknown function)", None
    while hasattr(f, "f_code"):
        co = f.f_code
        filename = os.path.normcase(co.co_filename)
        if filename == _srcfile:
            f = f.f_back
            continue
        sinfo = None
        if stack_info:
            sio = io.StringIO()
            sio.write('Stack (most recent call last):\n')
            traceback.print_stack(f, file=sio)
            sinfo = sio.getvalue()
            if sinfo[-1] == '\n':
                sinfo = sinfo[:-1]
            sio.close()
        rv = (co.co_filename, f.f_lineno, co.co_name, sinfo)
        break
    return rv
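findCaller() is what feeds %(filename)s, %(lineno)d and %(funcName)s: because frames belonging to the logging module itself are skipped, the record points at the caller's frame. A small sketch (logger name and function name are made up for the demo):

```python
import io
import logging

buf = io.StringIO()
handler = logging.StreamHandler(buf)
handler.setFormatter(logging.Formatter("%(funcName)s:%(message)s"))
log = logging.getLogger("caller-demo")
log.setLevel(logging.INFO)
log.addHandler(handler)

def business_logic():
    log.info("inside")      # the record reports THIS function, not logging's

business_logic()
assert buf.getvalue() == "business_logic:inside\n"
```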

def makeRecord(self, name, level, fn, lno, msg, args, exc_info,
               func=None, extra=None, sinfo=None):
    """
    A factory method which can be overridden in subclasses to create
    specialized LogRecords.
    """
    # Instantiate _logRecordFactory
    rv = _logRecordFactory(name, level, fn, lno, msg, args, exc_info, func,
                           sinfo)
    if extra is not None:
        for key in extra:
            if (key in ["message", "asctime"]) or (key in rv.__dict__):
                raise KeyError("Attempt to overwrite %r in LogRecord" % key)
            rv.__dict__[key] = extra[key]
    return rv
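The extra handling in makeRecord() can be exercised directly: extra keys are copied onto the LogRecord's __dict__, so a formatter can reference them, while reserved attributes are rejected. A small sketch (the request_id field is an assumed example, not part of logging itself):

```python
import io
import logging

buf = io.StringIO()
h = logging.StreamHandler(buf)
h.setFormatter(logging.Formatter("%(request_id)s %(message)s"))
log = logging.getLogger("extra-demo")
log.setLevel(logging.INFO)
log.addHandler(h)

# The extra key lands on the record and is available to the formatter
log.info("user login", extra={"request_id": "req-42"})
assert buf.getvalue() == "req-42 user login\n"

# Reserved attributes hit the KeyError branch in makeRecord()
raised = False
try:
    log.info("boom", extra={"message": "clash"})
except KeyError:
    raised = True
assert raised
```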

def handle(self, record):
    """
    Call the handlers for the specified record.

    This method is used for unpickled records received from a socket, as
    well as those created locally. Logger-level filtering is applied.
    """
    if (not self.disabled) and self.filter(record):
        self.callHandlers(record)

def callHandlers(self, record):
    """
    Pass a record to all relevant handlers.

    Loop through all handlers for this logger and its parents in the
    logger hierarchy. If no handler was found, output a one-off error
    message to sys.stderr. Stop searching up the hierarchy whenever a
    logger with the "propagate" attribute set to zero is found - that
    will be the last logger whose handlers are called.
    """
    # Iterate over the handlers bound to this logger and its parents, and
    # let each one handle() the record. This resembles the import flow,
    # where the importers on sys.meta_path are each asked to handle a
    # path. Recall the earlier RootLogger walkthrough: basicConfig() bound
    # a handler (e.g. a FileHandler) to root, then root.info() eventually
    # lands here, which calls that handler's handle() method
    c = self
    found = 0
    while c:
        for hdlr in c.handlers:
            found = found + 1
            if record.levelno >= hdlr.level:
                hdlr.handle(record)
        if not c.propagate:
            c = None    #break out
        else:
            c = c.parent
    if (found == 0):
        if lastResort:
            if record.levelno >= lastResort.level:
                lastResort.handle(record)
        elif raiseExceptions and not self.manager.emittedNoHandlerWarning:
            sys.stderr.write("No handlers could be found for logger"
                             " \"%s\"\n" % self.name)
            self.manager.emittedNoHandlerWarning = True
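The parent walk in callHandlers() is controlled by propagate. A minimal check (logger names "svc"/"svc.worker" are made up for the demo):

```python
import io
import logging

parent_buf, child_buf = io.StringIO(), io.StringIO()
parent = logging.getLogger("svc")
parent.setLevel(logging.INFO)
parent.addHandler(logging.StreamHandler(parent_buf))

child = logging.getLogger("svc.worker")
child.addHandler(logging.StreamHandler(child_buf))

child.info("hello")                     # handled by the child AND by "svc"
assert "hello" in child_buf.getvalue()
assert "hello" in parent_buf.getvalue()

child.propagate = False
child.info("again")                     # the walk now stops at the child
assert "again" in child_buf.getvalue()
assert "again" not in parent_buf.getvalue()
```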

Let's continue with FileHandler

class FileHandler(StreamHandler):
    def _open(self):
        """
        Open the current base file with the (original) mode and encoding.
        Return the resulting stream.
        """
        return open(self.baseFilename, self.mode, encoding=self.encoding)

    def emit(self, record):
        """
        Emit a record.

        If the stream was not opened because 'delay' was specified in the
        constructor, open it before calling the superclass's emit.
        """
        # Open the file handle (stream) on first use
        if self.stream is None:
            self.stream = self._open()
        # Delegate to StreamHandler.emit
        StreamHandler.emit(self, record)

class StreamHandler(Handler):
    def flush(self):
        """
        Flushes the stream.
        """
        # Flushing likewise happens under the lock
        self.acquire()
        try:
            if self.stream and hasattr(self.stream, "flush"):
                self.stream.flush()
        finally:
            self.release()

    def emit(self, record):
        """
        Emit a record.

        If a formatter is specified, it is used to format the record.
        The record is then written to the stream with a trailing newline. If
        exception information is present, it is formatted using
        traceback.print_exception and appended to the stream. If the stream
        has an 'encoding' attribute, it is used to determine how to do the
        output to the stream.
        """
        try:
            msg = self.format(record)
            stream = self.stream
            # issue 35046: merged two stream.writes into one.
            # Write into the stream's buffer, then flush
            stream.write(msg + self.terminator)
            self.flush()
        except RecursionError: # See issue 36272
            raise
        except Exception:
            self.handleError(record)

class Handler(Filterer):
    # FileHandler inherits handle() from its grandparent class Handler
    def filter(self, record):
        """
        Determine if a record is loggable by consulting all the filters.

        The default is to allow the record to be logged; any filter can veto
        this and the record is then dropped. Returns a zero value if a record
        is to be dropped, else non-zero.

        .. versionchanged:: 3.2

           Allow filters to be just callables.
        """
        rv = True
        for f in self.filters:
            if hasattr(f, 'filter'):
                result = f.filter(record)
            else:
                result = f(record) # assume callable - will raise if not
            if not result:
                rv = False
                break
        return rv

    def handle(self, record):
        """
        Conditionally emit the specified logging record.

        Emission depends on filters which may have been added to the handler.
        Wrap the actual emission of the record with acquisition/release of
        the I/O thread lock. Returns whether the filter passed the record for
        emission.
        """
        # Filter the record first, then call the subclass's emit() override
        # to do the real write
        rv = self.filter(record)
        if rv:
            self.acquire()
            try:
                self.emit(record)
            finally:
                self.release()
        return rv
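Handler.handle()'s filter step can be seen in action with a plain callable filter (allowed since 3.2, as the docstring notes); any filter returning a falsy value vetoes the record before emit() runs. The "secret"-redacting rule below is an assumed example:

```python
import io
import logging

buf = io.StringIO()
h = logging.StreamHandler(buf)
# A bare callable works as a filter: veto any record mentioning "secret"
h.addFilter(lambda record: "secret" not in record.getMessage())

log = logging.getLogger("filter-demo")
log.setLevel(logging.INFO)
log.addHandler(h)

log.info("public event")
log.info("secret token=abc")            # vetoed by the handler's filter
assert buf.getvalue() == "public event\n"
```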

We have now traced the entire flow behind logging.info. Next, let's combine the code above with the official Logging Flow diagram to map out the framework.

1.2 The Logging flow

The source lives under lib/logging/ in the Python standard library, with most of it in __init__.py. Based on the flow diagram and the source we just read, here are the four components plus the log record structure:

- Logger: the core component. It can mount any number of Handlers and Filters, and defines the namespace and log level it responds to.
- Handler: can mount one Formatter and any number of Filters, and defines the log level it responds to and the output destination.
- Filter: nominally a filter that inspects an incoming LogRecord and returns True/False to decide whether the mounting Logger or Handler processes the record; it can also act as middleware, applying custom rules to rewrite the LogRecord before passing it on to subsequent Filters/Handlers/Loggers.
- Formatter: the final formatting component.
- LogRecord: the structure representing a single log entry.

Following the flow diagram, the main flow is:

1. A logging request reaches a Logger, which first checks whether it handles this level at all; if not, the record is dropped (the first level check).
2. A LogRecord is created, packed with call-site information, and passed through the Logger's chain of Filters in order.
3. If any Filter returns False, the record is dropped.
4. Otherwise the record is handed to each Handler mounted on the Logger in turn (entering the Handler flow).
5. If the propagate attribute is set ("propagate upward"), the LogRecord is also handed to the parent Logger, resuming from step 4 (the first level check is not triggered again).

The Handler flow:

1. The Handler checks whether it handles this level; if not, the record is dropped (the second level check).
2. The LogRecord is passed through the Handler's chain of Filters in order.
3. Likewise, the Formatter is invoked and the record is emitted.

2. Logging under multiple processes

2.1 Analyzing multi-process logging problems

The logging module is thread-safe: we saw threading.RLock() used in many places in the code above, and in particular every info/error call eventually reaches the top-level Handler.handle, i.e.

# logging/__init__.py

def handle(self, record):
    # Filter the record first, then call the subclass's emit() to do the
    # real write
    rv = self.filter(record)
    if rv:
        self.acquire()
        try:
            self.emit(record)
        finally:
            self.release()
    return rv

This guarantees that in a multi-threaded environment only one thread at a time can be inside handle writing output.
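A minimal check of that guarantee: several threads share one handler, and every record arrives as an intact line because emit() runs under the handler lock. The thread/record counts below are arbitrary demo values:

```python
import io
import logging
import threading

buf = io.StringIO()
log = logging.getLogger("mt-demo")
log.setLevel(logging.INFO)
log.addHandler(logging.StreamHandler(buf))

def worker(n):
    for i in range(25):
        log.info("thread=%d msg=%d", n, i)

threads = [threading.Thread(target=worker, args=(t,)) for t in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()

lines = buf.getvalue().splitlines()
assert len(lines) == 100                # 4 threads x 25 records, none torn
assert all(line.startswith("thread=") for line in lines)
```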

However, an easily overlooked scenario is the set of problems that arise under multiple processes. Python web projects are usually deployed with several worker processes, which can lead to the following:

- Garbled logs: if two processes emit the lines xxxx and yyyy, the file may end up containing something like xxyxyxyy.
- Lost logs: although the log file is opened in O_APPEND mode, which keeps individual writes consistent, the user-space buffer (data is written to the buffer first and only flushed to disk later) means fwrite operations are not multi-process safe.
- Lost logs, another case: with RotatingFileHandler or TimedRotatingFileHandler, rotation leaves different processes holding different file handles, so the new file gets created repeatedly while data keeps going to the old file.

2.2 Solutions for multi-process logging

To deal with the situations above, here are several solutions:

2.2.1 The concurrent-log-handler module (file-lock mode)

# concurrent_log_handler/__init__.py

# Inherits from logging's BaseRotatingHandler
class ConcurrentRotatingFileHandler(BaseRotatingHandler):
    # The actual write logic
    def emit(self, record):
        try:
            msg = self.format(record)
            # Locking
            try:
                self._do_lock()
                # The usual shouldRollover/doRollover file-rotation steps
                try:
                    if self.shouldRollover(record):
                        self.doRollover()
                except Exception as e:
                    self._console_log("Unable to do rollover: %s" % (e,), stack=True)
                    # Continue on anyway
                self.do_write(msg)
            finally:
                self._do_unlock()
        except (KeyboardInterrupt, SystemExit):
            raise
        except Exception:
            self.handleError(record)

    def _do_lock(self):
        # Already locked?
        if self.is_locked:
            return   # already locked... recursive?
        # Take the file lock
        self._open_lockfile()
        if self.stream_lock:
            for i in range(10):
                # noinspection PyBroadException
                try:
                    # Call portalocker's lock()
                    lock(self.stream_lock, LOCK_EX)
                    self.is_locked = True
                    break
                except Exception:
                    continue
            else:
                raise RuntimeError("Cannot acquire lock after 10 attempts")
        else:
            self._console_log("No self.stream_lock to lock", stack=True)

    def _open_lockfile(self):
        """
        Open the lock file and adjust its permissions.
        """
        if self.stream_lock and not self.stream_lock.closed:
            self._console_log("Lockfile already open in this process")
            return
        lock_file = self.lockFilename
        self._console_log(
            "concurrent-log-handler %s opening %s" % (hash(self), lock_file), stack=False)

        with self._alter_umask():
            self.stream_lock = open(lock_file, "wb", buffering=0)

        self._do_chown_and_chmod(lock_file)

    def _do_unlock(self):
        if self.stream_lock:
            if self.is_locked:
                try:
                    unlock(self.stream_lock)
                finally:
                    self.is_locked = False
            self.stream_lock.close()
            self.stream_lock = None
        else:
            self._console_log("No self.stream_lock to unlock", stack=True)

# portalocker/portalocker.py
def lock(file_: typing.IO, flags: constants.LockFlags):
    if flags & constants.LockFlags.SHARED:
        ...  # shared-lock branch elided in the original excerpt
    else:
        if flags & constants.LockFlags.NON_BLOCKING:
            mode = msvcrt.LK_NBLCK
        else:
            mode = msvcrt.LK_LOCK

        # windows locks byte ranges, so make sure to lock from file start
        try:
            savepos = file_.tell()
            if savepos:
                # [ ] test exclusive lock fails on seek here
                # [ ] test if shared lock passes this point
                file_.seek(0)
                # [x] check if 0 param locks entire file (not documented in
                #     Python)
                # [x] fails with "IOError: [Errno 13] Permission denied",
                #     but -1 seems to do the trick
            try:
                msvcrt.locking(file_.fileno(), mode, lock_length)
            except IOError as exc_value:
                # [ ] be more specific here
                raise exceptions.LockException(
                    exceptions.LockException.LOCK_FAILED,
                    exc_value.strerror,
                    fh=file_)
            finally:
                if savepos:
                    file_.seek(savepos)
        except IOError as exc_value:
            raise exceptions.LockException(
                exceptions.LockException.LOCK_FAILED, exc_value.strerror,
                fh=file_)

"""
This relies on the C runtime's file-locking facility:
msvcrt.locking(fd, mode, nbytes)
Lock part of a file based on file descriptor fd from the C runtime. Raises OSError on failure. The locked region of the file extends from the current file position for nbytes bytes, and may continue beyond the end of the file. mode must be one of the LK_* constants listed below. Multiple regions in a file may be locked at the same time, but may not overlap. Adjacent regions are not merged; they must be unlocked individually.

Raises an auditing event msvcrt.locking with arguments fd, mode, nbytes.
"""

At bottom, this takes a file lock every time emit fires, forcing concurrent calls from multiple processes into sequential, single-process order and making log writes correct. Efficiency-wise, though, the frequent permission changes, locking, and lock contention are all costly.

2.2.2 Overriding doRollover and FileHandler for log-rotation scenarios

Of course, besides the file-locking approach above, we can also subclass TimedRotatingFileHandler or FileHandler ourselves and add simple cross-process locking, for example with fcntl.flock:
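Before reading the CPython implementation of fcntl.flock below, here is a sketch of such a subclass (Unix-only; the FlockFileHandler name and the rest are assumptions, not a library API): emit() holds an exclusive flock() on the log file so that writers in different processes are serialized.

```python
import fcntl
import logging
import os
import tempfile

class FlockFileHandler(logging.FileHandler):
    def emit(self, record):
        if self.stream is None:
            self.stream = self._open()
        # Exclusive lock: blocks until no other process holds the lock
        fcntl.flock(self.stream.fileno(), fcntl.LOCK_EX)
        try:
            super().emit(record)        # format + write + flush
        finally:
            fcntl.flock(self.stream.fileno(), fcntl.LOCK_UN)

logfile = os.path.join(tempfile.mkdtemp(), "app.log")
log = logging.getLogger("flock-demo")
log.setLevel(logging.INFO)
log.addHandler(FlockFileHandler(logfile))

log.info("locked write")
with open(logfile) as f:
    assert f.read() == "locked write\n"
```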

static PyObject *
fcntl_flock_impl(PyObject *module, int fd, int code)
/*[clinic end generated code: output=84059e2b37d2fc64 input=0bfc00f795953452]*/
{
    int ret;
    int async_err = 0;

    if (PySys_Audit("fcntl.flock", "ii", fd, code) < 0) {
        return NULL;
    }
/* Take the lock via the flock(2) system call when available */
#ifdef HAVE_FLOCK
    do {
        Py_BEGIN_ALLOW_THREADS
        ret = flock(fd, code);
        Py_END_ALLOW_THREADS
    } while (ret == -1 && errno == EINTR && !(async_err = PyErr_CheckSignals()));
#else

#ifndef LOCK_SH
#define LOCK_SH 1 /* shared lock */
#define LOCK_EX 2 /* exclusive lock */
#define LOCK_NB 4 /* don't block when locking */
#define LOCK_UN 8 /* unlock */
#endif
    {
        struct flock l;
        if (code == LOCK_UN)
            l.l_type = F_UNLCK;
        else if (code & LOCK_SH)
            l.l_type = F_RDLCK;
        else if (code & LOCK_EX)
            l.l_type = F_WRLCK;
        else {
            PyErr_SetString(PyExc_ValueError,
                            "unrecognized flock argument");
            return NULL;
        }
        l.l_whence = l.l_start = l.l_len = 0;
        do {
            Py_BEGIN_ALLOW_THREADS
            ret = fcntl(fd, (code & LOCK_NB) ? F_SETLK : F_SETLKW, &l);
            Py_END_ALLOW_THREADS
        } while (ret == -1 && errno == EINTR && !(async_err = PyErr_CheckSignals()));
    }
#endif /* HAVE_FLOCK */
    if (ret < 0) {
        return !async_err ? PyErr_SetFromErrno(PyExc_OSError) : NULL;
    }
    Py_RETURN_NONE;
}

2.2.3 Master/Worker log collection (Socket/Queue mode)

This is also the approach the official documentation recommends:

Although logging is thread-safe, and logging to a single file from multiple threads in a single process is supported, logging to a single file from multiple processes is not supported, because there is no standard way to serialize access to a single file across multiple processes in Python. If you need to log to a single file from multiple processes, one way of doing this is to have all the processes log to a SocketHandler, and have a separate process which implements a socket server which reads from the socket and logs to file. (If you prefer, you can dedicate one thread in one of the existing processes to perform this function.) This section documents this approach in more detail and includes a working socket receiver which can be used as a starting point for you to adapt in your own applications.

Alternatively, you can use a Queue and a QueueHandler to send all logging events to one of the processes in your multi-process application. The following example script demonstrates how you can do this; in the example a separate listener process listens for events sent by other processes and logs them according to its own logging configuration. Although the example only demonstrates one way of doing it (for example, you may want to use a listener thread rather than a separate listener process - the implementation would be analogous) it does allow for completely different logging configurations for the listener and the other processes in your application, and can be used as the basis for code meeting your own specific requirements:

Let's look at the QueueHandler example:

import logging
import logging.config
import logging.handlers
from multiprocessing import Process, Queue
import random
import threading
import time

def logger_thread(q):
    """
    A dedicated logging thread.
    """
    while True:
        record = q.get()
        if record is None:
            break
        # Look up the logger named in the record
        logger = logging.getLogger(record.name)
        # Let that logger's handle() process it
        logger.handle(record)

def worker_process(q):
    # A log-producing worker process
    qh = logging.handlers.QueueHandler(q)
    root = logging.getLogger()
    root.setLevel(logging.DEBUG)
    # Bind the QueueHandler to the root logger
    root.addHandler(qh)
    levels = [logging.DEBUG, logging.INFO, logging.WARNING, logging.ERROR,
              logging.CRITICAL]
    loggers = ['foo', 'foo.bar', 'foo.bar.baz',
               'spam', 'spam.ham', 'spam.ham.eggs']
    for i in range(100):
        lvl = random.choice(levels)
        logger = logging.getLogger(random.choice(loggers))
        logger.log(lvl, 'Message no. %d', i)

if __name__ == '__main__':
    q = Queue()
    # A large logging config dict `d` is omitted here
    workers = []
    # Multiple processes write log records into the Queue
    for i in range(5):
        wp = Process(target=worker_process, name='worker %d' % (i + 1), args=(q,))
        workers.append(wp)
        wp.start()
    logging.config.dictConfig(d)
    # A child thread collects the records and writes them out
    lp = threading.Thread(target=logger_thread, args=(q,))
    lp.start()
    # At this point, the main process could do some useful work of its own
    # Once it's done that, it can wait for the workers to terminate...
    for wp in workers:
        wp.join()
    # And now tell the logging thread to finish up, too
    q.put(None)
    lp.join()

And the SocketHandler example:

# Sender

import logging, logging.handlers

rootLogger = logging.getLogger('')
rootLogger.setLevel(logging.DEBUG)
# Point the handler at a host and port
socketHandler = logging.handlers.SocketHandler('localhost',
                    logging.handlers.DEFAULT_TCP_LOGGING_PORT)
# don't bother with a formatter, since a socket handler sends the event as
# an unformatted pickle
rootLogger.addHandler(socketHandler)

# Now, we can log to the root logger, or any other logger. First the root...
logging.info('Jackdaws love my big sphinx of quartz.')

# Now, define a couple of other loggers which might represent areas in your
# application:

logger1 = logging.getLogger('myapp.area1')
logger2 = logging.getLogger('myapp.area2')

logger1.debug('Quick zephyrs blow, vexing daft Jim.')
logger1.info('How quickly daft jumping zebras vex.')
logger2.warning('Jail zesty vixen who grabbed pay from quack.')
logger2.error('The five boxing wizards jump quickly.')

# Receiver

import pickle
import logging
import logging.handlers
import socketserver
import struct

class LogRecordStreamHandler(socketserver.StreamRequestHandler):
    """Handler for a streaming logging request.

    This basically logs the record using whatever logging policy is
    configured locally.
    """

    def handle(self):
        """
        Handle multiple requests - each expected to be a 4-byte length,
        followed by the LogRecord in pickle format. Logs the record
        according to whatever policy is configured locally.
        """
        while True:
            # Receive the length-prefixed frames
            chunk = self.connection.recv(4)
            if len(chunk) < 4:
                break
            slen = struct.unpack('>L', chunk)[0]
            chunk = self.connection.recv(slen)
            while len(chunk) < slen:
                chunk = chunk + self.connection.recv(slen - len(chunk))
            obj = self.unPickle(chunk)
            # Rebuild a LogRecord and pass it to handleLogRecord
            record = logging.makeLogRecord(obj)
            self.handleLogRecord(record)

    def unPickle(self, data):
        return pickle.loads(data)

    def handleLogRecord(self, record):
        # if a name is specified, we use the named logger rather than the one
        # implied by the record.
        if self.server.logname is not None:
            name = self.server.logname
        else:
            name = record.name
        logger = logging.getLogger(name)
        # N.B. EVERY record gets logged. This is because Logger.handle
        # is normally called AFTER logger-level filtering. If you want
        # to do filtering, do it at the client end to save wasting
        # cycles and network bandwidth!
        logger.handle(record)

class LogRecordSocketReceiver(socketserver.ThreadingTCPServer):
    """
    Simple TCP socket-based logging receiver suitable for testing.
    """

    allow_reuse_address = True

    def __init__(self, host='localhost',
                 port=logging.handlers.DEFAULT_TCP_LOGGING_PORT,
                 handler=LogRecordStreamHandler):
        socketserver.ThreadingTCPServer.__init__(self, (host, port), handler)
        self.abort = 0
        self.timeout = 1
        self.logname = None

    def serve_until_stopped(self):
        import select
        abort = 0
        while not abort:
            # Poll the socket with select()
            rd, wr, ex = select.select([self.socket.fileno()],
                                       [], [],
                                       self.timeout)
            if rd:
                # Dispatch to LogRecordStreamHandler.handle
                self.handle_request()
            abort = self.abort

def main():
    logging.basicConfig(
        format='%(relativeCreated)5d %(name)-15s %(levelname)-8s %(message)s')
    tcpserver = LogRecordSocketReceiver()
    print('About to start TCP server...')
    tcpserver.serve_until_stopped()

if __name__ == '__main__':
    main()

Those two patterns cover the Master/Worker approach: log-write throughput is unaffected by producer concurrency and ultimately depends on the writer thread. In addition, slow or blocking log writes can likewise be handled with QueueHandler and its companion QueueListener.
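The QueueHandler/QueueListener pairing mentioned above can be sketched in-process: producers only enqueue the record (cheap and non-blocking), while the listener's background thread does the slow formatting and I/O. Logger names and the format string below are demo assumptions:

```python
import io
import logging
import logging.handlers
import queue

q = queue.Queue()
buf = io.StringIO()
sink = logging.StreamHandler(buf)
sink.setFormatter(logging.Formatter("%(levelname)s %(message)s"))

# The listener thread drains the queue and drives the real handler
listener = logging.handlers.QueueListener(q, sink)
listener.start()

log = logging.getLogger("queue-demo")
log.setLevel(logging.INFO)
log.addHandler(logging.handlers.QueueHandler(q))

log.info("deferred write")
listener.stop()                         # flushes the queue, joins the thread
assert buf.getvalue() == "INFO deferred write\n"
```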

Author: 技術(shù)拆解官. Original (Chinese): https://juejin.cn/post/6945301448934719518

拓展知識(shí):

前沿拓展:


推薦學(xué)習(xí)22天試水Python社招,歷經(jīng)“百度+字節(jié)+天融”等6家 金三即過(guò),這300道python高頻面試都沒(méi)刷,銀四怎么闖? stream.sys(stream.sys iomanager 藍(lán)屏)

前言

Logging日志記錄框架是Python內(nèi)置打印模塊,它對(duì)很多Python開發(fā)者來(lái)說(shuō)是既熟悉又陌生,確實(shí),它使用起來(lái)很簡(jiǎn)單,只需要我們簡(jiǎn)單一行代碼,就可以打印出日志

import logging

logging.debug("This is a debug log.")
logging.info("This is a info log.")
logging.warning("This is a warning log.")
logging.error("This is a error log.")
logging.critical("This is a critical log.")

# output
WARNING:root:This is a warning log.
ERROR:root:This is a error log.
CRITICAL:root:This is a critical log.

但是對(duì)于我們?cè)趯?shí)際項(xiàng)目中需要基于Logging來(lái)開發(fā)日志框架時(shí),常常會(huì)遇到各種各樣的問(wèn)題,例如性能方面的多進(jìn)程下滾動(dòng)日志記錄丟失、日志記錄效率低的問(wèn)題以及在二次開發(fā)時(shí)因?yàn)闆](méi)有理解Logging內(nèi)部流程導(dǎo)致沒(méi)有利用到Logging的核心機(jī)制等等。

接下來(lái),我們就從一行代碼來(lái)深挖Logging System的秘密

1. 深入Logging模塊源碼1.1 logging.info做了什么?

logging.info作為大多數(shù)人使用logging模塊的第一行代碼,隱藏了很多實(shí)現(xiàn)細(xì)節(jié),以至于我們?cè)谧畛醵贾魂P(guān)注功能,而忽略了它的內(nèi)部細(xì)節(jié)

import logging
logging.info("This is a info log.")

# logging/__init__.py
def info(msg, *args, **kwargs):
"""
Log a message with severity 'INFO' on the root logger. If the logger has
no handlers, call basicConfig() to add a console handler with a pre-defined
format.
"""
# 沒(méi)有綁定handlers就調(diào)用basicConfig方法
if len(root.handlers) == 0:
basicConfig()
root.info(msg, *args, **kwargs)

_lock = threading.RLock()

def _acquireLock():
"""
Acquire the module-level lock for serializing access to shared data.

This should be released with _releaseLock().
"""
if _lock:
_lock.acquire()

def _releaseLock():
"""
Release the module-level lock acquired by calling _acquireLock().
"""
if _lock:
_lock.release()

def basicConfig(**kwargs):
"""
很方便的一步到位的配置方法創(chuàng)建一個(gè)StreamHandler打印日志到控制臺(tái)
This function does nothing if the root logger already has handlers
configured. It is a convenience method intended for use by simple scripts
to do one-shot configuration of the logging package.

The default behaviour is to create a StreamHandler which writes to
sys.stderr, set a formatter using the BASIC_FORMAT format string, and
add the handler to the root logger.
"""
# Add thread safety in case someone mistakenly calls
# basicConfig() from multiple threads
# 為了確保多線程安全的寫日志**作,做了加鎖處理(如上,標(biāo)準(zhǔn)的多線程鎖**作)
_acquireLock()
try:
if len(root.handlers) == 0:
# 對(duì)于handler的處理,有filename就新建FileHandler,沒(méi)有就選擇StreamHandler
handlers = kwargs.pop("handlers", None)
if handlers is None:
if "stream" in kwargs and "filename" in kwargs:
raise ValueError("'stream' and 'filename' should not be "
"specified together")
else:
if "stream" in kwargs or "filename" in kwargs:
raise ValueError("'stream' or 'filename' should not be "
"specified together with 'handlers'")
if handlers is None:
filename = kwargs.pop("filename", None)
mode = kwargs.pop("filemode", 'a')
if filename:
h = FileHandler(filename, mode)
else:
stream = kwargs.pop("stream", None)
h = StreamHandler(stream)
handlers = [h]
dfs = kwargs.pop("datefmt", None)
style = kwargs.pop("style", '%')
if style not in _STYLES:
raise ValueError('Style must be one of: %s' % ','.join(
_STYLES.keys()))
fs = kwargs.pop("format", _STYLES[style][1])
fmt = Formatter(fs, dfs, style)
# 綁定handler到root
for h in handlers:
if h.formatter is None:
h.setFormatter(fmt)
root.addHandler(h)
level = kwargs.pop("level", None)
if level is not None:
root.setLevel(level)
if kwargs:
keys = ', '.join(kwargs.keys())
raise ValueError('Unrecognised argument(s): %s' % keys)
finally:
_releaseLock()

到目前為止,可以看到logging.info通過(guò)調(diào)用basicConfig()來(lái)完成初始化handler之后才開始正式打印,而basicConfig()的邏輯是通過(guò)多線程鎖狀態(tài)下的一個(gè)初始化handler->綁定root的**作,那這個(gè)root代表了什么呢?

# logging/__init__.py

root = RootLogger(WARNING)
Logger.root = root
Logger.manager = Manager(Logger.root)

class RootLogger(Logger):
"""
A root logger is not that different to any other logger, except that
it must have a logging level and there is only one instance of it in
the hierarchy.
"""
def __init__(self, level):
"""
Initialize the logger with the name "root".
"""
# 調(diào)用父類的初始化,傳入了root和WARNING兩個(gè)參數(shù),所以我們直接調(diào)用logging.info時(shí)是不能輸出任何信息的,因?yàn)閘ogger level初始化時(shí)就被指定成了WARNING
Logger.__init__(self, "root", level)

def __reduce__(self):
return getLogger, ()

class Logger(Filterer):
"""
Instances of the Logger class represent a single logging channel. A
"logging channel" indicates an area of an application. Exactly how an
"area" is defined is up to the application developer. Since an
application can have any number of areas, logging channels are identified
by a unique string. Application areas can be nested (e.g. an area
of "input processing" might include sub-areas "read CSV files", "read
XLS files" and "read Gnumeric files"). To cater for this natural nesting,
channel names are organized into a namespace hierarchy where levels are
separated by periods, much like the Java or Python package namespace. So
in the instance given above, channel names might be "input" for the upper
level, and "input.csv", "input.xls" and "input.gnu" for the sub-levels.
There is no arbitrary limit to the depth of nesting.
"""
def __init__(self, name, level=NOTSET):
"""
Initialize the logger with a name and an optional level.
"""
Filterer.__init__(self)
self.name = name # the logger name, "root" in our case
self.level = _checkLevel(level) # the logger level
self.parent = None
self.propagate = True
self.handlers = []
self.disabled = False
self._cache = {}

def info(self, msg, *args, **kwargs):
"""
Log 'msg % args' with severity 'INFO'.

To pass exception information, use the keyword argument exc_info with
a true value, e.g.

logger.info("Houston, we have a %s", "interesting problem", exc_info=1)
"""
# Check whether this logger is enabled for INFO before doing any work
if self.isEnabledFor(INFO):
self._log(INFO, msg, args, **kwargs)

def getEffectiveLevel(self):
"""
Get the effective level for this logger.

Loop through this logger and its parents in the logger hierarchy,
looking for a non-zero logging level. Return the first one found.
"""
# Walk up the parent chain for the first non-zero level
logger = self
while logger:
if logger.level:
return logger.level
logger = logger.parent
return NOTSET

def isEnabledFor(self, level):
"""
Is this logger enabled for level 'level'?
"""
try:
return self._cache[level]
except KeyError:
# The same locking pattern again
_acquireLock()
if self.manager.disable >= level:
is_enabled = self._cache[level] = False
else:
is_enabled = self._cache[level] = level >= self.getEffectiveLevel()
_releaseLock()

return is_enabled

root is an instance of RootLogger, which inherits from Logger; all records are written through the single _log method, and a logger-level check runs before _log is ever reached
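The interplay of getEffectiveLevel and the isEnabledFor cache can be seen directly; a small check using only the standard library (the logger names are illustrative):

```python
import logging

# Child loggers default to NOTSET (0); getEffectiveLevel() walks up the
# dotted-name hierarchy until it finds a non-zero level.
parent = logging.getLogger("app")
child = logging.getLogger("app.db")
parent.setLevel(logging.ERROR)

assert child.level == logging.NOTSET
assert child.getEffectiveLevel() == logging.ERROR  # inherited from "app"
assert not child.isEnabledFor(logging.WARNING)

# isEnabledFor() caches its answer in self._cache; changing a level later
# has to invalidate it, which Logger.setLevel() does by clearing the cache.
parent.setLevel(logging.DEBUG)
assert child.isEnabledFor(logging.WARNING)
```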

# Used to find the first caller frame on the stack
# _srcfile is used when walking the stack to check when we've got the first
# caller stack frame, by skipping frames whose filename is that of this
# module's source. It therefore should contain the filename of this module's
# source file.
#
# Ordinarily we would use __file__ for this, but frozen modules don't always
# have __file__ set, for some reason (see Issue #21736). Thus, we get the
# filename from a handy code object from a function defined in this module.
# (There's no particular reason for picking addLevelName.)
#

_srcfile = os.path.normcase(addLevelName.__code__.co_filename)

# _srcfile is only used in conjunction with sys._getframe().
# To provide compatibility with older versions of Python, set _srcfile
# to None if _getframe() is not available; this value will prevent
# findCaller() from being called. You can also do this if you want to avoid
# the overhead of fetching caller information, even when _getframe() is
# available.
#if not hasattr(sys, '_getframe'):
# _srcfile = None

def _log(self, level, msg, args, exc_info=None, extra=None, stack_info=False):
"""
Low-level logging routine which creates a LogRecord and then calls
all the handlers of this logger to handle the record.
"""
sinfo = None
if _srcfile:
#IronPython doesn't track Python frames, so findCaller raises an
#exception on some versions of IronPython. We trap it here so that
#IronPython can use logging.
try:
fn, lno, func, sinfo = self.findCaller(stack_info)
except ValueError: # pragma: no cover
fn, lno, func = "(unknown file)", 0, "(unknown function)"
else: # pragma: no cover
fn, lno, func = "(unknown file)", 0, "(unknown function)"
if exc_info:
if isinstance(exc_info, BaseException):
exc_info = (type(exc_info), exc_info, exc_info.__traceback__)
elif not isinstance(exc_info, tuple):
exc_info = sys.exc_info()
# Build a log record via makeRecord
record = self.makeRecord(self.name, level, fn, lno, msg, args,
exc_info, func, extra, sinfo)
# Hand the record over for processing
self.handle(record)

def findCaller(self, stack_info=False):
"""
Walk the call stack for the file name, line number and caller.
Find the stack frame of the caller so that we can note the source
file name, line number and function name.
"""
f = currentframe()
#On some versions of IronPython, currentframe() returns None if
#IronPython isn't run with -X:Frames.
if f is not None:
f = f.f_back
rv = "(unknown file)", 0, "(unknown function)", None
while hasattr(f, "f_code"):
co = f.f_code
filename = os.path.normcase(co.co_filename)
if filename == _srcfile:
f = f.f_back
continue
sinfo = None
if stack_info:
sio = io.StringIO()
sio.write('Stack (most recent call last):\n')
traceback.print_stack(f, file=sio)
sinfo = sio.getvalue()
if sinfo[-1] == '\n':
sinfo = sinfo[:-1]
sio.close()
rv = (co.co_filename, f.f_lineno, co.co_name, sinfo)
break
return rv

def makeRecord(self, name, level, fn, lno, msg, args, exc_info,
func=None, extra=None, sinfo=None):
"""
A factory method which can be overridden in subclasses to create
specialized LogRecords.
"""
# Instantiate _logRecordFactory (LogRecord by default)
rv = _logRecordFactory(name, level, fn, lno, msg, args, exc_info, func,
sinfo)
if extra is not None:
for key in extra:
if (key in ["message", "asctime"]) or (key in rv.__dict__):
raise KeyError("Attempt to overwrite %r in LogRecord" % key)
rv.__dict__[key] = extra[key]
return rv

def handle(self, record):
"""
Call the handlers for the specified record.

This method is used for unpickled records received from a socket, as
well as those created locally. Logger-level filtering is applied.
"""
if (not self.disabled) and self.filter(record):
self.callHandlers(record)

def callHandlers(self, record):
"""
Pass a record to all relevant handlers.

Loop through all handlers for this logger and its parents in the
logger hierarchy. If no handler was found, output a one-off error
message to sys.stderr. Stop searching up the hierarchy whenever a
logger with the "propagate" attribute set to zero is found – that
will be the last logger whose handlers are called.
"""
c = self
found = 0
# Walk this logger's own handlers and let each one handle the record --
# similar in spirit to how the import machinery polls the importers on
# sys.meta_path against a path. Recall the earlier rootLogger: basicConfig()
# bound a handler (e.g. a FileHandler) to root, then root.info() was called,
# and we end up here invoking FileHandler.handle on the record
while c:
for hdlr in c.handlers:
found = found + 1
if record.levelno >= hdlr.level:
hdlr.handle(record)
if not c.propagate:
c = None #break out
else:
c = c.parent
if (found == 0):
if lastResort:
if record.levelno >= lastResort.level:
lastResort.handle(record)
elif raiseExceptions and not self.manager.emittedNoHandlerWarning:
sys.stderr.write("No handlers could be found for logger"
" \"%s\"\n" % self.name)
self.manager.emittedNoHandlerWarning = True

Next, let's look at FileHandler

class FileHandler(StreamHandler):
def _open(self):
"""
Open the current base file with the (original) mode and encoding.
Return the resulting stream.
"""
return open(self.baseFilename, self.mode, encoding=self.encoding)

def emit(self, record):
"""
Emit a record.

If the stream was not opened because 'delay' was specified in the
constructor, open it before calling the superclass's emit.
"""
# Open the file handle (stream) lazily
if self.stream is None:
self.stream = self._open()
# Delegate to StreamHandler.emit
StreamHandler.emit(self, record)

class StreamHandler(Handler):
def flush(self):
"""
Flushes the stream.
"""
# Flush under the same lock
self.acquire()
try:
if self.stream and hasattr(self.stream, "flush"):
self.stream.flush()
finally:
self.release()

def emit(self, record):
"""
Emit a record.

If a formatter is specified, it is used to format the record.
The record is then written to the stream with a trailing newline. If
exception information is present, it is formatted using
traceback.print_exception and appended to the stream. If the stream
has an 'encoding' attribute, it is used to determine how to do the
output to the stream.
"""
try:
msg = self.format(record)
stream = self.stream
# issue 35046: merged two stream.writes into one.
# Write into the stream buffer, then flush
stream.write(msg + self.terminator)
self.flush()
except RecursionError: # See issue 36272
raise
except Exception:
self.handleError(record)

class Handler(Filterer):
# FileHandler inherits handle() from its grandparent class Handler
def filter(self, record):
"""
Determine if a record is loggable by consulting all the filters.

The default is to allow the record to be logged; any filter can veto
this and the record is then dropped. Returns a zero value if a record
is to be dropped, else non-zero.

.. versionchanged:: 3.2

Allow filters to be just callables.
"""
rv = True
for f in self.filters:
if hasattr(f, 'filter'):
result = f.filter(record)
else:
result = f(record) # assume callable – will raise if not
if not result:
rv = False
break
return rv

def handle(self, record):
"""
Conditionally emit the specified logging record.

Emission depends on filters which may have been added to the handler.
Wrap the actual emission of the record with acquisition/release of
the I/O thread lock. Returns whether the filter passed the record for
emission.
"""
# Filter the record first, then call the subclass's emit() override, which does the actual writing
rv = self.filter(record)
if rv:
self.acquire()
try:
self.emit(record)
finally:
self.release()
return rv

That completes the walk through everything behind logging.info. Now let's combine the source above with the official Logging Flow diagram to map out the logging framework

1.2 The Logging flow framework

The source lives under Python's own lib/logging/, mostly in __init__.py. Using the flow diagram and the source we just analysed, here are the four components plus the log record structure:

Logger: the core component; it can mount several Handlers and several Filters, and defines the namespace and log level it responds to
Handler: can mount one Formatter and several Filters, and defines the log level it responds to and the output destination
Filter: nominally a filter that inspects an incoming LogRecord and returns True/False to decide whether the mounted Logger or Handler processes it; but it can also act as middleware, rewriting the LogRecord under custom rules before passing it on to subsequent Filters/Handlers/Loggers
Formatter: the component that finally formats the log
LogRecord: the structure holding a single log entry

Following the flow diagram, the main flow is:

1. When a logging call reaches a Logger, it first checks whether it handles this level at all; if not, the call is dropped (first level check)
2. A LogRecord is built, packed with caller information and the like, and passed in order through the chain of Filters mounted on the Logger
3. If any Filter returns False, the record is dropped
4. Otherwise the record is passed in turn to every Handler mounted on the Logger (entering the Handler flow)
5. If the propagate attribute is on ("propagate upwards"), the LogRecord is also handed to the parent Logger, resuming from step 4 (the first level check is not re-run)
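The Filter-as-middleware idea from the component list can be sketched in a few lines; RedactFilter and the logger name are illustrative, not part of the stdlib:

```python
import io
import logging

class RedactFilter(logging.Filter):
    # A filter may mutate the record before returning True,
    # which makes it usable as lightweight middleware.
    def filter(self, record):
        record.msg = str(record.msg).replace("secret", "***")
        return True

buf = io.StringIO()
handler = logging.StreamHandler(buf)
logger = logging.getLogger("demo.filter")
logger.setLevel(logging.INFO)
logger.addFilter(RedactFilter())
logger.addHandler(handler)
logger.propagate = False  # keep the record from reaching root's handlers

logger.info("the secret token")
assert buf.getvalue() == "the *** token\n"
```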

The Handler flow:

1. Check whether this Handler handles the record's level; if not, the record is dropped (second level check)
2. Pass the received LogRecord in order through the chain of Filters mounted on the Handler
3. Likewise, call the Formatter

2. Logging in multi-process scenarios

2.1 Analysing the problems with multi-process Logging

The Logging module is thread-safe: the code above uses threading.RLock() in many places, and in particular the info/error printing methods all end up in the top-level parent class's handle method, namely

# logging/__init__.py

def handle(self, record):
# Filter the record first, then call the subclass's emit() override, which does the actual writing
rv = self.filter(record)
if rv:
self.acquire()
try:
self.emit(record)
finally:
self.release()
return rv

This guarantees that in a multi-threaded environment only one thread at a time can enter handle() and write

However, a scenario that is easy to overlook is the trouble caused by a multi-process environment. Python web projects are usually deployed with multiple worker processes, which can lead to the following problems:

Interleaved logs: if two processes write the lines xxxx and yyyy, the file may end up with something like xxyxyxyy
Lost logs: although files are opened in O_APPEND mode, which keeps individual writes consistent, data first goes into a buffer and is only flushed to disk later, and fwrite is not multi-process safe
Lost logs, another case: with RotatingFileHandler or TimedRotatingFileHandler, processes end up holding different file handles at rollover time, so the new file gets created repeatedly while data keeps going into the old file

2.2 Solutions for multi-process Logging

To cope with the situations above, here are several solutions:

2.2.1 The concurrent-log-handler module (file-lock mode)

# concurrent_log_handler/__init__.py

# Subclasses logging's BaseRotatingHandler
class ConcurrentRotatingFileHandler(BaseRotatingHandler):
# The actual write logic
def emit(self, record):
try:
msg = self.format(record)
# Locking logic
try:
self._do_lock()
# The usual shouldRollover/doRollover steps for file rotation
try:
if self.shouldRollover(record):
self.doRollover()
except Exception as e:
self._console_log("Unable to do rollover: %s" % (e,), stack=True)
# Continue on anyway
self.do_write(msg)
finally:
self._do_unlock()
except (KeyboardInterrupt, SystemExit):
raise
except Exception:
self.handleError(record)

def _do_lock(self):
# Bail out if the lock is already held
if self.is_locked:
return # already locked… recursive?
# Take the file lock
self._open_lockfile()
if self.stream_lock:
for i in range(10):
# noinspection PyBroadException
try:
# Call portalocker's lock()
lock(self.stream_lock, LOCK_EX)
self.is_locked = True
break
except Exception:
continue
else:
raise RuntimeError("Cannot acquire lock after 10 attempts")
else:
self._console_log("No self.stream_lock to lock", stack=True)

def _open_lockfile(self):
"""
Open the lock file, adjusting umask/permissions.
"""
if self.stream_lock and not self.stream_lock.closed:
self._console_log("Lockfile already open in this process")
return
lock_file = self.lockFilename
self._console_log(
"concurrent-log-handler %s opening %s" % (hash(self), lock_file), stack=False)

with self._alter_umask():
self.stream_lock = open(lock_file, "wb", buffering=0)

self._do_chown_and_chmod(lock_file)

def _do_unlock(self):
if self.stream_lock:
if self.is_locked:
try:
unlock(self.stream_lock)
finally:
self.is_locked = False
self.stream_lock.close()
self.stream_lock = None
else:
self._console_log("No self.stream_lock to unlock", stack=True)

# portalocker/portalocker.py
def lock(file_: typing.IO, flags: constants.LockFlags):
if flags & constants.LockFlags.SHARED:
...  # (the shared-lock branch is elided in this excerpt)
else:
if flags & constants.LockFlags.NON_BLOCKING:
mode = msvcrt.LK_NBLCK
else:
mode = msvcrt.LK_LOCK

# windows locks byte ranges, so make sure to lock from file start
try:
savepos = file_.tell()
if savepos:
# [ ] test exclusive lock fails on seek here
# [ ] test if shared lock passes this point
file_.seek(0)
# [x] check if 0 param locks entire file (not documented in
# Python)
# [x] fails with "IOError: [Errno 13] Permission denied",
# but -1 seems to do the trick
try:
msvcrt.locking(file_.fileno(), mode, lock_length)
except IOError as exc_value:
# [ ] be more specific here
raise exceptions.LockException(
exceptions.LockException.LOCK_FAILED,
exc_value.strerror,
fh=file_)
finally:
if savepos:
file_.seek(savepos)
except IOError as exc_value:
raise exceptions.LockException(
exceptions.LockException.LOCK_FAILED, exc_value.strerror,
fh=file_)
"""
This calls into the C runtime's file-locking mechanism:
msvcrt.locking(fd, mode, nbytes)
Lock part of a file based on file descriptor fd from the C runtime. Raises OSError on failure. The locked region of the file extends from the current file position for nbytes bytes, and may continue beyond the end of the file. mode must be one of the LK_* constants listed below. Multiple regions in a file may be locked at the same time, but may not overlap. Adjacent regions are not merged; they must be unlocked individually.

Raises an auditing event msvcrt.locking with arguments fd, mode, nbytes.
"""

In essence, emit() takes a file lock when it fires, forcing concurrent calls from multiple processes into a single-process sequential order, which keeps log writes intact. Efficiency suffers, though: repeatedly changing file permissions, locking, and contending for the lock all make it slow.

2.2.2 Overriding doRollover and the FileHandler class for the rotation scenario

Of course, besides the file-locking approach above, we can also subclass TimedRotatingFileHandler or FileHandler ourselves and add simple cross-process locking logic, for example with fcntl.flock
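As a rough sketch of that idea (Unix-only, since it relies on fcntl; the class name is made up for illustration), a FileHandler subclass can take flock(2) around each emit. Note this alone does not fix the rollover race described earlier -- a rotating variant would also have to re-check the file after acquiring the lock:

```python
import fcntl
import logging

class FlockFileHandler(logging.FileHandler):
    """Minimal sketch: serialize writes across processes with flock(2)."""

    def emit(self, record):
        if self.stream is None:
            self.stream = self._open()
        # Exclusive lock for the duration of one write; other processes
        # block here until the lock is released.
        fcntl.flock(self.stream.fileno(), fcntl.LOCK_EX)
        try:
            logging.FileHandler.emit(self, record)
        finally:
            fcntl.flock(self.stream.fileno(), fcntl.LOCK_UN)
```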

static PyObject *
fcntl_flock_impl(PyObject *module, int fd, int code)
/*[clinic end generated code: output=84059e2b37d2fc64 input=0bfc00f795953452]*/
{
int ret;
int async_err = 0;

if (PySys_Audit("fcntl.flock", "ii", fd, code) < 0) {
return NULL;
}
/* take the lock via flock(2) on Linux */
#ifdef HAVE_FLOCK
do {
Py_BEGIN_ALLOW_THREADS
ret = flock(fd, code);
Py_END_ALLOW_THREADS
} while (ret == -1 && errno == EINTR && !(async_err = PyErr_CheckSignals()));
#else

#ifndef LOCK_SH
#define LOCK_SH 1 /* shared lock */
#define LOCK_EX 2 /* exclusive lock */
#define LOCK_NB 4 /* don't block when locking */
#define LOCK_UN 8 /* unlock */
#endif
{
struct flock l;
if (code == LOCK_UN)
l.l_type = F_UNLCK;
else if (code & LOCK_SH)
l.l_type = F_RDLCK;
else if (code & LOCK_EX)
l.l_type = F_WRLCK;
else {
PyErr_SetString(PyExc_ValueError,
"unrecognized flock argument");
return NULL;
}
l.l_whence = l.l_start = l.l_len = 0;
do {
Py_BEGIN_ALLOW_THREADS
ret = fcntl(fd, (code & LOCK_NB) ? F_SETLK : F_SETLKW, &l);
Py_END_ALLOW_THREADS
} while (ret == -1 && errno == EINTR && !(async_err = PyErr_CheckSignals()));
}
#endif /* HAVE_FLOCK */
if (ret < 0) {
return !async_err ? PyErr_SetFromErrno(PyExc_OSError) : NULL;
}
Py_RETURN_NONE;
}

2.2.3 Master/Worker log collection (Socket/Queue mode)

This is also the approach the official documentation recommends:

Although logging is thread-safe, and logging to a single file from multiple threads in a single process is supported, logging to a single file from multiple processes is not supported, because there is no standard way to serialize access to a single file across multiple processes in Python. If you need to log to a single file from multiple processes, one way of doing this is to have all the processes log to a SocketHandler, and have a separate process which implements a socket server which reads from the socket and logs to file. (If you prefer, you can dedicate one thread in one of the existing processes to perform this function.) This section documents this approach in more detail and includes a working socket receiver which can be used as a starting point for you to adapt in your own applications.

Alternatively, you can use a Queue and a QueueHandler to send all logging events to one of the processes in your multi-process application. The following example script demonstrates how you can do this; in the example a separate listener process listens for events sent by other processes and logs them according to its own logging configuration. Although the example only demonstrates one way of doing it (for example, you may want to use a listener thread rather than a separate listener process -- the implementation would be analogous) it does allow for completely different logging configurations for the listener and the other processes in your application, and can be used as the basis for code meeting your own specific requirements:

An example with QueueHandler:

import logging
import logging.config
import logging.handlers
from multiprocessing import Process, Queue
import random
import threading
import time

def logger_thread(q):
"""
A dedicated logging thread.
"""
while True:
record = q.get()
if record is None:
break
# Look up the logger named in the record
logger = logging.getLogger(record.name)
# Let that logger's handle() process it
logger.handle(record)

def worker_process(q):
# A log-producing worker process
qh = logging.handlers.QueueHandler(q)
root = logging.getLogger()
root.setLevel(logging.DEBUG)
# Bind the QueueHandler to the logger
root.addHandler(qh)
levels = [logging.DEBUG, logging.INFO, logging.WARNING, logging.ERROR,
logging.CRITICAL]
loggers = ['foo', 'foo.bar', 'foo.bar.baz',
'spam', 'spam.ham', 'spam.ham.eggs']
for i in range(100):
lvl = random.choice(levels)
logger = logging.getLogger(random.choice(loggers))
logger.log(lvl, 'Message no. %d', i)

if __name__ == '__main__':
q = Queue()
# (a large logging config dict d is omitted here)
workers = []
# Multiple processes write logs into the Queue
for i in range(5):
wp = Process(target=worker_process, name='worker %d' % (i + 1), args=(q,))
workers.append(wp)
wp.start()
logging.config.dictConfig(d)
# Start the thread that collects and writes the logs
lp = threading.Thread(target=logger_thread, args=(q,))
lp.start()
# At this point, the main process could do some useful work of its own
# Once it's done that, it can wait for the workers to terminate…
for wp in workers:
wp.join()
# And now tell the logging thread to finish up, too
q.put(None)
lp.join()

An example with SocketHandler:

# Sender side

import logging, logging.handlers

rootLogger = logging.getLogger('')
rootLogger.setLevel(logging.DEBUG)
# Point at the receiver's host and port
socketHandler = logging.handlers.SocketHandler('localhost',
logging.handlers.DEFAULT_TCP_LOGGING_PORT)
# don't bother with a formatter, since a socket handler sends the event as
# an unformatted pickle
rootLogger.addHandler(socketHandler)

# Now, we can log to the root logger, or any other logger. First the root…
logging.info('Jackdaws love my big sphinx of quartz.')

# Now, define a couple of other loggers which might represent areas in your
# application:

logger1 = logging.getLogger('myapp.area1')
logger2 = logging.getLogger('myapp.area2')

logger1.debug('Quick zephyrs blow, vexing daft Jim.')
logger1.info('How quickly daft jumping zebras vex.')
logger2.warning('Jail zesty vixen who grabbed pay from quack.')
logger2.error('The five boxing wizards jump quickly.')

# Receiver side

import pickle
import logging
import logging.handlers
import socketserver
import struct

class LogRecordStreamHandler(socketserver.StreamRequestHandler):
"""Handler for a streaming logging request.

This basically logs the record using whatever logging policy is
configured locally.
"""

def handle(self):
"""
Handle multiple requests – each expected to be a 4-byte length,
followed by the LogRecord in pickle format. Logs the record
according to whatever policy is configured locally.
"""
while True:
# Receive loop: a 4-byte length prefix, then the pickled record
chunk = self.connection.recv(4)
if len(chunk) < 4:
break
slen = struct.unpack('>L', chunk)[0]
chunk = self.connection.recv(slen)
while len(chunk) < slen:
chunk = chunk + self.connection.recv(slen - len(chunk))
obj = self.unPickle(chunk)
# Build a LogRecord and pass it to handleLogRecord
record = logging.makeLogRecord(obj)
self.handleLogRecord(record)

def unPickle(self, data):
return pickle.loads(data)

def handleLogRecord(self, record):
# if a name is specified, we use the named logger rather than the one
# implied by the record.
if self.server.logname is not None:
name = self.server.logname
else:
name = record.name
logger = logging.getLogger(name)
# N.B. EVERY record gets logged. This is because Logger.handle
# is normally called AFTER logger-level filtering. If you want
# to do filtering, do it at the client end to save wasting
# cycles and network bandwidth!
logger.handle(record)

class LogRecordSocketReceiver(socketserver.ThreadingTCPServer):
"""
Simple TCP socket-based logging receiver suitable for testing.
"""

allow_reuse_address = True

def __init__(self, host='localhost',
port=logging.handlers.DEFAULT_TCP_LOGGING_PORT,
handler=LogRecordStreamHandler):
socketserver.ThreadingTCPServer.__init__(self, (host, port), handler)
self.abort = 0
self.timeout = 1
self.logname = None

def serve_until_stopped(self):
import select
abort = 0
while not abort:
# Use select() to wait for data on the socket
rd, wr, ex = select.select([self.socket.fileno()],
[], [],
self.timeout)
if rd:
# Dispatches to LogRecordStreamHandler.handle
self.handle_request()
abort = self.abort

def main():
logging.basicConfig(
format='%(relativeCreated)5d %(name)-15s %(levelname)-8s %(message)s')
tcpserver = LogRecordSocketReceiver()
print('About to start TCP server…')
tcpserver.serve_until_stopped()

if __name__ == '__main__':
main()

Those two patterns make up the Master/Worker approach: write throughput is unaffected by concurrency and ultimately depends on the single writing thread. In addition, for slow or blocking log writes, the same QueueHandler together with its extension QueueListener can be used.
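For that slow/blocking case, the stdlib already pairs QueueHandler with QueueListener, which runs the drain loop from the example above in a background thread for you -- a minimal sketch (the logger name and StringIO stand-in are illustrative):

```python
import io
import logging
import logging.handlers
import queue

log_queue = queue.Queue()
buf = io.StringIO()
target = logging.StreamHandler(buf)   # stands in for a slow FileHandler

logger = logging.getLogger("app.async")
logger.setLevel(logging.INFO)
logger.addHandler(logging.handlers.QueueHandler(log_queue))
logger.propagate = False

# The listener thread drains the queue into the real handler(s).
listener = logging.handlers.QueueListener(log_queue, target)
listener.start()
logger.info("non-blocking from the caller's side")
listener.stop()  # enqueues a sentinel and joins, flushing pending records

assert buf.getvalue() == "non-blocking from the caller's side\n"
```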

Author: 技術(shù)拆解官 / Original post: https://juejin.cn/post/6945301448934719518

拓展知識(shí):

前沿拓展:


推薦學(xué)習(xí)22天試水Python社招,歷經(jīng)“百度+字節(jié)+天融”等6家 金三即過(guò),這300道python高頻面試都沒(méi)刷,銀四怎么闖? stream.sys(stream.sys iomanager 藍(lán)屏)

前言

Logging日志記錄框架是Python內(nèi)置打印模塊,它對(duì)很多Python開發(fā)者來(lái)說(shuō)是既熟悉又陌生,確實(shí),它使用起來(lái)很簡(jiǎn)單,只需要我們簡(jiǎn)單一行代碼,就可以打印出日志

import logging

logging.debug("This is a debug log.")
logging.info("This is a info log.")
logging.warning("This is a warning log.")
logging.error("This is a error log.")
logging.critical("This is a critical log.")

# output
WARNING:root:This is a warning log.
ERROR:root:This is a error log.
CRITICAL:root:This is a critical log.

但是對(duì)于我們?cè)趯?shí)際項(xiàng)目中需要基于Logging來(lái)開發(fā)日志框架時(shí),常常會(huì)遇到各種各樣的問(wèn)題,例如性能方面的多進(jìn)程下滾動(dòng)日志記錄丟失、日志記錄效率低的問(wèn)題以及在二次開發(fā)時(shí)因?yàn)闆](méi)有理解Logging內(nèi)部流程導(dǎo)致沒(méi)有利用到Logging的核心機(jī)制等等。

接下來(lái),我們就從一行代碼來(lái)深挖Logging System的秘密

1. 深入Logging模塊源碼1.1 logging.info做了什么?

logging.info作為大多數(shù)人使用logging模塊的第一行代碼,隱藏了很多實(shí)現(xiàn)細(xì)節(jié),以至于我們?cè)谧畛醵贾魂P(guān)注功能,而忽略了它的內(nèi)部細(xì)節(jié)

import logging
logging.info("This is a info log.")

# logging/__init__.py
def info(msg, *args, **kwargs):
"""
Log a message with severity 'INFO' on the root logger. If the logger has
no handlers, call basicConfig() to add a console handler with a pre-defined
format.
"""
# 沒(méi)有綁定handlers就調(diào)用basicConfig方法
if len(root.handlers) == 0:
basicConfig()
root.info(msg, *args, **kwargs)

_lock = threading.RLock()

def _acquireLock():
"""
Acquire the module-level lock for serializing access to shared data.

This should be released with _releaseLock().
"""
if _lock:
_lock.acquire()

def _releaseLock():
"""
Release the module-level lock acquired by calling _acquireLock().
"""
if _lock:
_lock.release()

def basicConfig(**kwargs):
"""
很方便的一步到位的配置方法創(chuàng)建一個(gè)StreamHandler打印日志到控制臺(tái)
This function does nothing if the root logger already has handlers
configured. It is a convenience method intended for use by simple scripts
to do one-shot configuration of the logging package.

The default behaviour is to create a StreamHandler which writes to
sys.stderr, set a formatter using the BASIC_FORMAT format string, and
add the handler to the root logger.
"""
# Add thread safety in case someone mistakenly calls
# basicConfig() from multiple threads
# 為了確保多線程安全的寫日志**作,做了加鎖處理(如上,標(biāo)準(zhǔn)的多線程鎖**作)
_acquireLock()
try:
if len(root.handlers) == 0:
# 對(duì)于handler的處理,有filename就新建FileHandler,沒(méi)有就選擇StreamHandler
handlers = kwargs.pop("handlers", None)
if handlers is None:
if "stream" in kwargs and "filename" in kwargs:
raise ValueError("'stream' and 'filename' should not be "
"specified together")
else:
if "stream" in kwargs or "filename" in kwargs:
raise ValueError("'stream' or 'filename' should not be "
"specified together with 'handlers'")
if handlers is None:
filename = kwargs.pop("filename", None)
mode = kwargs.pop("filemode", 'a')
if filename:
h = FileHandler(filename, mode)
else:
stream = kwargs.pop("stream", None)
h = StreamHandler(stream)
handlers = [h]
dfs = kwargs.pop("datefmt", None)
style = kwargs.pop("style", '%')
if style not in _STYLES:
raise ValueError('Style must be one of: %s' % ','.join(
_STYLES.keys()))
fs = kwargs.pop("format", _STYLES[style][1])
fmt = Formatter(fs, dfs, style)
# 綁定handler到root
for h in handlers:
if h.formatter is None:
h.setFormatter(fmt)
root.addHandler(h)
level = kwargs.pop("level", None)
if level is not None:
root.setLevel(level)
if kwargs:
keys = ', '.join(kwargs.keys())
raise ValueError('Unrecognised argument(s): %s' % keys)
finally:
_releaseLock()

到目前為止,可以看到logging.info通過(guò)調(diào)用basicConfig()來(lái)完成初始化handler之后才開始正式打印,而basicConfig()的邏輯是通過(guò)多線程鎖狀態(tài)下的一個(gè)初始化handler->綁定root的**作,那這個(gè)root代表了什么呢?

# logging/__init__.py

root = RootLogger(WARNING)
Logger.root = root
Logger.manager = Manager(Logger.root)

class RootLogger(Logger):
"""
A root logger is not that different to any other logger, except that
it must have a logging level and there is only one instance of it in
the hierarchy.
"""
def __init__(self, level):
"""
Initialize the logger with the name "root".
"""
# 調(diào)用父類的初始化,傳入了root和WARNING兩個(gè)參數(shù),所以我們直接調(diào)用logging.info時(shí)是不能輸出任何信息的,因?yàn)閘ogger level初始化時(shí)就被指定成了WARNING
Logger.__init__(self, "root", level)

def __reduce__(self):
return getLogger, ()

class Logger(Filterer):
"""
Instances of the Logger class represent a single logging channel. A
"logging channel" indicates an area of an application. Exactly how an
"area" is defined is up to the application developer. Since an
application can have any number of areas, logging channels are identified
by a unique string. Application areas can be nested (e.g. an area
of "input processing" might include sub-areas "read CSV files", "read
XLS files" and "read Gnumeric files"). To cater for this natural nesting,
channel names are organized into a namespace hierarchy where levels are
separated by periods, much like the Java or Python package namespace. So
in the instance given above, channel names might be "input" for the upper
level, and "input.csv", "input.xls" and "input.gnu" for the sub-levels.
There is no arbitrary limit to the depth of nesting.
"""
def __init__(self, name, level=NOTSET):
"""
Initialize the logger with a name and an optional level.
"""
Filterer.__init__(self)
self.name = name # 指定name,也就是之前的root
self.level = _checkLevel(level) # 指定logger level
self.parent = None
self.propagate = True
self.handlers = []
self.disabled = False
self._cache = {}

def info(self, msg, *args, **kwargs):
"""
Log 'msg % args' with severity 'INFO'.

To pass exception information, use the keyword argument exc_info with
a true value, e.g.

logger.info("Houston, we have a %s", "interesting problem", exc_info=1)
"""
# 這步判斷是為了判斷是否該logger level可以被輸出
if self.isEnabledFor(INFO):
self._log(INFO, msg, args, **kwargs)

def getEffectiveLevel(self):
"""
Get the effective level for this logger.

Loop through this logger and its parents in the logger hierarchy,
looking for a non-zero logging level. Return the first one found.
"""
# 尋找父級(jí)logger level
logger = self
while logger:
if logger.level:
return logger.level
logger = logger.parent
return NOTSET

def isEnabledFor(self, level):
"""
Is this logger enabled for level 'level'?
"""
try:
return self._cache[level]
except KeyError:
# 又出現(xiàn)的加鎖**作
_acquireLock()
if self.manager.disable >= level:
is_enabled = self._cache[level] = False
else:
is_enabled = self._cache[level] = level >= self.getEffectiveLevel()
_releaseLock()

return is_enabled

root是RootLogger的實(shí)例,RootLogger繼承于Logger,通過(guò)_log來(lái)統(tǒng)一記錄日志,而在_log之前會(huì)有個(gè)logger level的判斷

# 用來(lái)獲取堆棧中的第一個(gè)調(diào)用者
# _srcfile is used when walking the stack to check when we've got the first
# caller stack frame, by skipping frames whose filename is that of this
# module's source. It therefore should contain the filename of this module's
# source file.
#
# Ordinarily we would use __file__ for this, but frozen modules don't always
# have __file__ set, for some reason (see Issue #21736). Thus, we get the
# filename from a handy code object from a function defined in this module.
# (There's no particular reason for picking addLevelName.)
#

_srcfile = os.path.normcase(addLevelName.__code__.co_filename)

# _srcfile is only used in conjunction with sys._getframe().
# To provide compatibility with older versions of Python, set _srcfile
# to None if _getframe() is not available; this value will prevent
# findCaller() from being called. You can also do this if you want to avoid
# the overhead of fetching caller information, even when _getframe() is
# available.
#if not hasattr(sys, '_getframe'):
# _srcfile = None

def _log(self, level, msg, args, exc_info=None, extra=None, stack_info=False):
"""
Low-level logging routine which creates a LogRecord and then calls
all the handlers of this logger to handle the record.
"""
sinfo = None
if _srcfile:
#IronPython doesn't track Python frames, so findCaller raises an
#exception on some versions of IronPython. We trap it here so that
#IronPython can use logging.
try:
fn, lno, func, sinfo = self.findCaller(stack_info)
except ValueError: # pragma: no cover
fn, lno, func = "(unknown file)", 0, "(unknown function)"
else: # pragma: no cover
fn, lno, func = "(unknown file)", 0, "(unknown function)"
if exc_info:
if isinstance(exc_info, BaseException):
exc_info = (type(exc_info), exc_info, exc_info.__traceback__)
elif not isinstance(exc_info, tuple):
exc_info = sys.exc_info()
# 通過(guò)makeRecord生成一條日志記錄
record = self.makeRecord(self.name, level, fn, lno, msg, args,
exc_info, func, extra, sinfo)
# Dispatch it to the handlers
self.handle(record)

def findCaller(self, stack_info=False):
"""
Walk the call stack for the caller's file name, line number and function.
Find the stack frame of the caller so that we can note the source
file name, line number and function name.
"""
f = currentframe()
#On some versions of IronPython, currentframe() returns None if
#IronPython isn't run with -X:Frames.
if f is not None:
f = f.f_back
rv = "(unknown file)", 0, "(unknown function)", None
while hasattr(f, "f_code"):
co = f.f_code
filename = os.path.normcase(co.co_filename)
if filename == _srcfile:
f = f.f_back
continue
sinfo = None
if stack_info:
sio = io.StringIO()
sio.write('Stack (most recent call last):\n')
traceback.print_stack(f, file=sio)
sinfo = sio.getvalue()
if sinfo[-1] == '\n':
sinfo = sinfo[:-1]
sio.close()
rv = (co.co_filename, f.f_lineno, co.co_name, sinfo)
break
return rv

def makeRecord(self, name, level, fn, lno, msg, args, exc_info,
func=None, extra=None, sinfo=None):
"""
A factory method which can be overridden in subclasses to create
specialized LogRecords.
"""
# Instantiate _logRecordFactory (LogRecord by default)
rv = _logRecordFactory(name, level, fn, lno, msg, args, exc_info, func,
sinfo)
if extra is not None:
for key in extra:
if (key in ["message", "asctime"]) or (key in rv.__dict__):
raise KeyError("Attempt to overwrite %r in LogRecord" % key)
rv.__dict__[key] = extra[key]
return rv

def handle(self, record):
"""
Call the handlers for the specified record.

This method is used for unpickled records received from a socket, as
well as those created locally. Logger-level filtering is applied.
"""
if (not self.disabled) and self.filter(record):
self.callHandlers(record)

def callHandlers(self, record):
"""
Pass a record to all relevant handlers.

Loop through all handlers for this logger and its parents in the
logger hierarchy. If no handler was found, output a one-off error
message to sys.stderr. Stop searching up the hierarchy whenever a
logger with the "propagate" attribute set to zero is found - that
will be the last logger whose handlers are called.
"""
c = self
found = 0
# Walk the handlers bound to this logger and its parents, calling handle()
# on each record. This resembles the import machinery asking each importer
# on sys.meta_path to handle a path. Recall the rootLogger from earlier:
# basicConfig() bound a FileHandler to root, then root.info() was called,
# and it eventually lands here, where FileHandler.handle() processes the record
while c:
for hdlr in c.handlers:
found = found + 1
if record.levelno >= hdlr.level:
hdlr.handle(record)
if not c.propagate:
c = None #break out
else:
c = c.parent
if (found == 0):
if lastResort:
if record.levelno >= lastResort.level:
lastResort.handle(record)
elif raiseExceptions and not self.manager.emittedNoHandlerWarning:
sys.stderr.write("No handlers could be found for logger"
" \"%s\"\n" % self.name)
self.manager.emittedNoHandlerWarning = True
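The extra handling in makeRecord above is easy to exercise end to end. A minimal self-contained check (the logger name and the user field are arbitrary choices for illustration):

```python
import io
import logging

buf = io.StringIO()
handler = logging.StreamHandler(buf)
# The %(user)s field is filled in from the extra mapping by makeRecord()
handler.setFormatter(logging.Formatter("%(levelname)s %(user)s %(message)s"))

logger = logging.getLogger("extra-demo")
logger.setLevel(logging.INFO)
logger.addHandler(handler)
logger.propagate = False

logger.info("login ok", extra={"user": "alice"})

# Reserved keys are rejected by the check in makeRecord()
try:
    logger.info("boom", extra={"message": "clobbered"})
except KeyError:
    print("refused to overwrite 'message'")

print(buf.getvalue())
```

Note that the KeyError is raised before the record reaches any handler, so the second message never appears in the output.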

Next, let's look at FileHandler.

class FileHandler(StreamHandler):
def _open(self):
"""
Open the current base file with the (original) mode and encoding.
Return the resulting stream.
"""
return open(self.baseFilename, self.mode, encoding=self.encoding)

def emit(self, record):
"""
Emit a record.

If the stream was not opened because 'delay' was specified in the
constructor, open it before calling the superclass's emit.
"""
# Open the file handle (stream)
if self.stream is None:
self.stream = self._open()
# Delegate to StreamHandler.emit
StreamHandler.emit(self, record)

class StreamHandler(Handler):
def flush(self):
"""
Flushes the stream.
"""
# Flush is likewise done under the handler lock
self.acquire()
try:
if self.stream and hasattr(self.stream, "flush"):
self.stream.flush()
finally:
self.release()

def emit(self, record):
"""
Emit a record.

If a formatter is specified, it is used to format the record.
The record is then written to the stream with a trailing newline. If
exception information is present, it is formatted using
traceback.print_exception and appended to the stream. If the stream
has an 'encoding' attribute, it is used to determine how to do the
output to the stream.
"""
try:
msg = self.format(record)
stream = self.stream
# issue 35046: merged two stream.writes into one.
# Write into the stream's buffer, then flush
stream.write(msg + self.terminator)
self.flush()
except RecursionError: # See issue 36272
raise
except Exception:
self.handleError(record)

class Handler(Filterer):
# FileHandler inherits handle() from its grandparent class, Handler
def filter(self, record):
"""
Determine if a record is loggable by consulting all the filters.

The default is to allow the record to be logged; any filter can veto
this and the record is then dropped. Returns a zero value if a record
is to be dropped, else non-zero.

.. versionchanged:: 3.2

Allow filters to be just callables.
"""
rv = True
for f in self.filters:
if hasattr(f, 'filter'):
result = f.filter(record)
else:
result = f(record) # assume callable - will raise if not
if not result:
rv = False
break
return rv

def handle(self, record):
"""
Conditionally emit the specified logging record.

Emission depends on filters which may have been added to the handler.
Wrap the actual emission of the record with acquisition/release of
the I/O thread lock. Returns whether the filter passed the record for
emission.
"""
# Filter the record first, then call the subclass's emit() for the actual write
rv = self.filter(record)
if rv:
self.acquire()
try:
self.emit(record)
finally:
self.release()
return rv

We have now traced the entire flow behind logging.info. Next, let's combine the code above with the official Logging Flow diagram to map out the framework.

1.2 The Logging flow

The source lives in Python's own lib/logging/, mostly in __init__.py. Based on the flow diagram and the source we just analyzed, here are the four components plus the log record structure:

Logger: the core component. It can mount several Handlers and several Filters, and defines the namespace and log level it responds to.
Handler: can mount one Formatter and several Filters, and defines the log level it responds to and the output destination.
Filter: nominally a filter, it inspects each incoming LogRecord and returns True/False to decide whether the mounted Logger or Handler processes the record. It can also serve as middleware: custom rules may rewrite the LogRecord before passing it on to later Filters/Handlers/Loggers.
Formatter: the component that formats the final log text.
LogRecord: the structure holding a single log entry.
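These components can be wired together in a few lines; a minimal sketch (RequestIdFilter and the req-42 field are made-up names for illustration) that also shows both level gates and a record-rewriting Filter:

```python
import io
import logging

class RequestIdFilter(logging.Filter):
    """Used as middleware: rewrites the record instead of dropping it."""
    def filter(self, record):
        record.request_id = "req-42"  # attach a custom attribute
        return True                   # True means keep the record

buf = io.StringIO()
handler = logging.StreamHandler(buf)
handler.setFormatter(logging.Formatter("[%(request_id)s] %(levelname)s %(message)s"))
handler.addFilter(RequestIdFilter())

logger = logging.getLogger("demo.app")
logger.setLevel(logging.INFO)        # first level gate (Logger)
handler.setLevel(logging.WARNING)    # second level gate (Handler)
logger.addHandler(handler)
logger.propagate = False

logger.info("dropped by the handler's level")
logger.warning("kept")
print(buf.getvalue())  # only the warning, tagged with req-42
```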

According to the flow diagram, the main flow is:

1. When a print request reaches a Logger, it first checks whether this Logger handles that level; if not, the record is discarded (the first level gate).
2. A LogRecord is generated, packed with caller information and the like, and passed in turn through the Filter chain mounted on the Logger.
3. If any Filter returns False, the record is dropped.
4. Otherwise the record is handed to each Handler mounted on the Logger (entering the Handler flow).
5. If the propagate attribute is on ("propagate upward"), the LogRecord is also passed to the parent Logger, starting directly from step 4 (the first level gate is not re-applied).

The Handler flow:

1. Check whether this Handler handles the record's level; if not, discard it (the second level gate).
2. Pass the received LogRecord in turn through the Filter chain mounted on the Handler.
3. Likewise, call the Formatter before the record is written out.

2. Logging under multiple processes

2.1 Analyzing multi-process logging problems

The logging module is thread-safe; in the code above we saw threading.RLock() used in many places. In particular, printing via info/error and friends ultimately calls the top-level parent Handler.handle method:

# logging/__init__.py

def handle(self, record):
# Filter the record first, then call the subclass's emit() for the actual write
rv = self.filter(record)
if rv:
self.acquire()
try:
self.emit(record)
finally:
self.release()
return rv

This ensures that in a multi-threaded environment only one thread at a time can enter handle() and write.
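A quick self-contained way to see this serialization hold up (logger name and message counts are arbitrary): four threads hammer one handler, and every record still comes out as a whole line.

```python
import io
import logging
import threading

buf = io.StringIO()
handler = logging.StreamHandler(buf)   # default formatter: just the message
logger = logging.getLogger("mt-demo")
logger.setLevel(logging.INFO)
logger.addHandler(handler)
logger.propagate = False

def worker(n):
    for i in range(100):
        logger.info("thread %d msg %d", n, i)

threads = [threading.Thread(target=worker, args=(n,)) for n in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()

lines = buf.getvalue().splitlines()
# handle() wraps emit() in the handler lock, so no record is torn apart
print(len(lines))  # 400
```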

However, a scenario that is easy to overlook is the set of problems that arise under multiple processes. Python web projects are usually deployed with multiple worker processes, which can lead to the following issues:

Interleaved logs: if two processes write the lines xxxx and yyyy, the file may end up with something like xxyxyxyy.
Lost logs: although the file is opened in O_APPEND mode, which keeps individual writes consistent, data first goes into a buffer and is only flushed to disk later, and fwrite operations are not multi-process safe.
Lost logs, another case: with RotatingFileHandler or TimedRotatingFileHandler, processes end up holding different file handles when the file is rotated, so the new file is created repeatedly and data keeps landing in the old file.

2.2 Solutions for multi-process logging

To deal with the situations above, here are several solutions:

2.2.1 The concurrent-log-handler module (file-lock mode)

# concurrent_log_handler/__init__.py

# Inherits from logging's BaseRotatingHandler
class ConcurrentRotatingFileHandler(BaseRotatingHandler):
# The actual write logic
def emit(self, record):
try:
msg = self.format(record)
# Locking logic
try:
self._do_lock()
# The usual shouldRollover/doRollover calls take care of file rotation
try:
if self.shouldRollover(record):
self.doRollover()
except Exception as e:
self._console_log("Unable to do rollover: %s" % (e,), stack=True)
# Continue on anyway
self.do_write(msg)
finally:
self._do_unlock()
except (KeyboardInterrupt, SystemExit):
raise
except Exception:
self.handleError(record)

def _do_lock(self):
# Already holding the lock?
if self.is_locked:
return # already locked... recursive?
# Acquire the file lock
self._open_lockfile()
if self.stream_lock:
for i in range(10):
# noinspection PyBroadException
try:
# Call portalocker's lock()
lock(self.stream_lock, LOCK_EX)
self.is_locked = True
break
except Exception:
continue
else:
raise RuntimeError("Cannot acquire lock after 10 attempts")
else:
self._console_log("No self.stream_lock to lock", stack=True)

def _open_lockfile(self):
"""
Open the lock file, adjusting its permissions as needed.
"""
if self.stream_lock and not self.stream_lock.closed:
self._console_log("Lockfile already open in this process")
return
lock_file = self.lockFilename
self._console_log(
"concurrent-log-handler %s opening %s" % (hash(self), lock_file), stack=False)

with self._alter_umask():
self.stream_lock = open(lock_file, "wb", buffering=0)

self._do_chown_and_chmod(lock_file)

def _do_unlock(self):
if self.stream_lock:
if self.is_locked:
try:
unlock(self.stream_lock)
finally:
self.is_locked = False
self.stream_lock.close()
self.stream_lock = None
else:
self._console_log("No self.stream_lock to unlock", stack=True)

# portalocker/portalocker.py
def lock(file_: typing.IO, flags: constants.LockFlags):
if flags & constants.LockFlags.SHARED:
... # shared-lock branch elided in this excerpt
else:
if flags & constants.LockFlags.NON_BLOCKING:
mode = msvcrt.LK_NBLCK
else:
mode = msvcrt.LK_LOCK

# windows locks byte ranges, so make sure to lock from file start
try:
savepos = file_.tell()
if savepos:
# [ ] test exclusive lock fails on seek here
# [ ] test if shared lock passes this point
file_.seek(0)
# [x] check if 0 param locks entire file (not documented in
# Python)
# [x] fails with "IOError: [Errno 13] Permission denied",
# but -1 seems to do the trick
try:
msvcrt.locking(file_.fileno(), mode, lock_length)
except IOError as exc_value:
# [ ] be more specific here
raise exceptions.LockException(
exceptions.LockException.LOCK_FAILED,
exc_value.strerror,
fh=file_)
finally:
if savepos:
file_.seek(savepos)
except IOError as exc_value:
raise exceptions.LockException(
exceptions.LockException.LOCK_FAILED, exc_value.strerror,
fh=file_)
"""
This goes through the C runtime's file-locking mechanism:
msvcrt.locking(fd, mode, nbytes)
Lock part of a file based on file descriptor fd from the C runtime. Raises OSError on failure. The locked region of the file extends from the current file position for nbytes bytes, and may continue beyond the end of the file. mode must be one of the LK_* constants listed below. Multiple regions in a file may be locked at the same time, but may not overlap. Adjacent regions are not merged; they must be unlocked individually.

Raises an auditing event msvcrt.locking with arguments fd, mode, nbytes.
"""

At bottom, a file lock is taken whenever emit() fires, forcing concurrent calls from multiple processes into single-process sequential order and so keeping the log writes correct. Efficiency suffers, however: repeatedly changing file permissions, locking, and contending for the lock all add overhead.

2.2.2 Overriding doRollover or the FileHandler class for the rotation case

Besides the file-locking approach above, we can also subclass and override TimedRotatingFileHandler or FileHandler ourselves, adding simple multi-process locking logic such as fcntl.flock:
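In Python terms, a minimal Unix-only sketch of that idea might look like this (FlockFileHandler is a hypothetical name, not a stdlib class; a production version would also need to cover rotation):

```python
import fcntl
import logging
import os
import tempfile

class FlockFileHandler(logging.FileHandler):
    """Sketch: hold an exclusive flock() around each emit so that several
    processes appending to the same file cannot interleave one record."""
    def emit(self, record):
        if self.stream is None:
            self.stream = self._open()
        fcntl.flock(self.stream.fileno(), fcntl.LOCK_EX)  # blocks other processes
        try:
            super().emit(record)   # format + write + flush while holding the lock
        finally:
            fcntl.flock(self.stream.fileno(), fcntl.LOCK_UN)

path = os.path.join(tempfile.mkdtemp(), "app.log")
logger = logging.getLogger("flock-demo")
logger.setLevel(logging.INFO)
logger.addHandler(FlockFileHandler(path))
logger.propagate = False

logger.info("locked write %d", 1)
with open(path) as f:
    print(f.read())
```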

static PyObject *
fcntl_flock_impl(PyObject *module, int fd, int code)
/*[clinic end generated code: output=84059e2b37d2fc64 input=0bfc00f795953452]*/
{
int ret;
int async_err = 0;

if (PySys_Audit("fcntl.flock", "ii", fd, code) < 0) {
return NULL;
}
/* On Linux this issues the flock(2) call to take the lock */
#ifdef HAVE_FLOCK
do {
Py_BEGIN_ALLOW_THREADS
ret = flock(fd, code);
Py_END_ALLOW_THREADS
} while (ret == -1 && errno == EINTR && !(async_err = PyErr_CheckSignals()));
#else

#ifndef LOCK_SH
#define LOCK_SH 1 /* shared lock */
#define LOCK_EX 2 /* exclusive lock */
#define LOCK_NB 4 /* don't block when locking */
#define LOCK_UN 8 /* unlock */
#endif
{
struct flock l;
if (code == LOCK_UN)
l.l_type = F_UNLCK;
else if (code & LOCK_SH)
l.l_type = F_RDLCK;
else if (code & LOCK_EX)
l.l_type = F_WRLCK;
else {
PyErr_SetString(PyExc_ValueError,
"unrecognized flock argument");
return NULL;
}
l.l_whence = l.l_start = l.l_len = 0;
do {
Py_BEGIN_ALLOW_THREADS
ret = fcntl(fd, (code & LOCK_NB) ? F_SETLK : F_SETLKW, &l);
Py_END_ALLOW_THREADS
} while (ret == -1 && errno == EINTR && !(async_err = PyErr_CheckSignals()));
}
#endif /* HAVE_FLOCK */
if (ret < 0) {
return !async_err ? PyErr_SetFromErrno(PyExc_OSError) : NULL;
}
Py_RETURN_NONE;
}

2.2.3 Master/Worker log collection (Socket/Queue mode)

This approach is also the one the official documentation recommends:

Although logging is thread-safe, and logging to a single file from multiple threads in a single process is supported, logging to a single file from multiple processes is not supported, because there is no standard way to serialize access to a single file across multiple processes in Python. If you need to log to a single file from multiple processes, one way of doing this is to have all the processes log to a SocketHandler, and have a separate process which implements a socket server which reads from the socket and logs to file. (If you prefer, you can dedicate one thread in one of the existing processes to perform this function.) This section documents this approach in more detail and includes a working socket receiver which can be used as a starting point for you to adapt in your own applications.

Alternatively, you can use a Queue and a QueueHandler to send all logging events to one of the processes in your multi-process application. The following example script demonstrates how you can do this; in the example a separate listener process listens for events sent by other processes and logs them according to its own logging configuration. Although the example only demonstrates one way of doing it (for example, you may want to use a listener thread rather than a separate listener process - the implementation would be analogous) it does allow for completely different logging configurations for the listener and the other processes in your application, and can be used as the basis for code meeting your own specific requirements:

Let's look at the QueueHandler example:

import logging
import logging.config
import logging.handlers
from multiprocessing import Process, Queue
import random
import threading
import time

def logger_thread(q):
"""
Dedicated log-writing thread.
"""
while True:
record = q.get()
if record is None:
break
# Look up the logger named in the record
logger = logging.getLogger(record.name)
# Let that logger's handle() process it
logger.handle(record)

def worker_process(q):
# Log-producing worker process
qh = logging.handlers.QueueHandler(q)
root = logging.getLogger()
root.setLevel(logging.DEBUG)
# Attach the QueueHandler to the root logger
root.addHandler(qh)
levels = [logging.DEBUG, logging.INFO, logging.WARNING, logging.ERROR,
logging.CRITICAL]
loggers = ['foo', 'foo.bar', 'foo.bar.baz',
'spam', 'spam.ham', 'spam.ham.eggs']
for i in range(100):
lvl = random.choice(levels)
logger = logging.getLogger(random.choice(loggers))
logger.log(lvl, 'Message no. %d', i)

if __name__ == '__main__':
q = Queue()
# (a large configuration dict d is omitted here)
workers = []
# Multiple processes write logs into the Queue
for i in range(5):
wp = Process(target=worker_process, name='worker %d' % (i + 1), args=(q,))
workers.append(wp)
wp.start()
logging.config.dictConfig(d)
# Start a thread that collects and writes the logs
lp = threading.Thread(target=logger_thread, args=(q,))
lp.start()
# At this point, the main process could do some useful work of its own
# Once it's done that, it can wait for the workers to terminate...
for wp in workers:
wp.join()
# And now tell the logging thread to finish up, too
q.put(None)
lp.join()

And the SocketHandler example:

# Sender

import logging, logging.handlers

rootLogger = logging.getLogger('')
rootLogger.setLevel(logging.DEBUG)
# Specify the host and port
socketHandler = logging.handlers.SocketHandler('localhost',
logging.handlers.DEFAULT_TCP_LOGGING_PORT)
# don't bother with a formatter, since a socket handler sends the event as
# an unformatted pickle
rootLogger.addHandler(socketHandler)

# Now, we can log to the root logger, or any other logger. First the root...
logging.info('Jackdaws love my big sphinx of quartz.')

# Now, define a couple of other loggers which might represent areas in your
# application:

logger1 = logging.getLogger('myapp.area1')
logger2 = logging.getLogger('myapp.area2')

logger1.debug('Quick zephyrs blow, vexing daft Jim.')
logger1.info('How quickly daft jumping zebras vex.')
logger2.warning('Jail zesty vixen who grabbed pay from quack.')
logger2.error('The five boxing wizards jump quickly.')

# Receiver

import pickle
import logging
import logging.handlers
import socketserver
import struct

class LogRecordStreamHandler(socketserver.StreamRequestHandler):
"""Handler for a streaming logging request.

This basically logs the record using whatever logging policy is
configured locally.
"""

def handle(self):
"""
Handle multiple requests - each expected to be a 4-byte length,
followed by the LogRecord in pickle format. Logs the record
according to whatever policy is configured locally.
"""
while True:
# Receive the framed data
chunk = self.connection.recv(4)
if len(chunk) < 4:
break
slen = struct.unpack('>L', chunk)[0]
chunk = self.connection.recv(slen)
while len(chunk) < slen:
chunk = chunk + self.connection.recv(slen - len(chunk))
obj = self.unPickle(chunk)
# Build a LogRecord and hand it to handleLogRecord
record = logging.makeLogRecord(obj)
self.handleLogRecord(record)

def unPickle(self, data):
return pickle.loads(data)

def handleLogRecord(self, record):
# if a name is specified, we use the named logger rather than the one
# implied by the record.
if self.server.logname is not None:
name = self.server.logname
else:
name = record.name
logger = logging.getLogger(name)
# N.B. EVERY record gets logged. This is because Logger.handle
# is normally called AFTER logger-level filtering. If you want
# to do filtering, do it at the client end to save wasting
# cycles and network bandwidth!
logger.handle(record)

class LogRecordSocketReceiver(socketserver.ThreadingTCPServer):
"""
Simple TCP socket-based logging receiver suitable for testing.
"""

allow_reuse_address = True

def __init__(self, host='localhost',
port=logging.handlers.DEFAULT_TCP_LOGGING_PORT,
handler=LogRecordStreamHandler):
socketserver.ThreadingTCPServer.__init__(self, (host, port), handler)
self.abort = 0
self.timeout = 1
self.logname = None

def serve_until_stopped(self):
import select
abort = 0
while not abort:
# select() waits for data on the socket
rd, wr, ex = select.select([self.socket.fileno()],
[], [],
self.timeout)
if rd:
# Dispatch to LogRecordStreamHandler.handle
self.handle_request()
abort = self.abort

def main():
logging.basicConfig(
format='%(relativeCreated)5d %(name)-15s %(levelname)-8s %(message)s')
tcpserver = LogRecordSocketReceiver()
print('About to start TCP server...')
tcpserver.serve_until_stopped()

if __name__ == '__main__':
main()
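The wire format the receiver unpacks above is simply a 4-byte big-endian length followed by a pickled record dict. A simplified round-trip (SocketHandler.makePickle additionally pre-formats the message and strips unpicklable fields; the "myapp"/"app.py" values here are arbitrary):

```python
import logging
import pickle
import struct

# Sender side: frame one record as length-prefix + pickle
record = logging.LogRecord("myapp", logging.INFO, "app.py", 1,
                           "payload %s", ("x",), None)
data = pickle.dumps(record.__dict__, 1)
wire = struct.pack(">L", len(data)) + data

# Receiver side: undo the framing exactly as LogRecordStreamHandler does
slen = struct.unpack(">L", wire[:4])[0]
rebuilt = logging.makeLogRecord(pickle.loads(wire[4:4 + slen]))
print(rebuilt.getMessage())
```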

That covers the two Master/Worker patterns above: write throughput is unaffected by concurrency and ultimately depends on the single writing thread. The QueueHandler, together with its QueueListener extension, can likewise be used when log writes are slow or blocking.
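A minimal in-process sketch of that QueueHandler/QueueListener pairing (logger name and message are arbitrary): the producer only does a cheap queue put, while the listener thread owns the slow I/O.

```python
import io
import logging
import logging.handlers
import queue

log_queue = queue.Queue()
buf = io.StringIO()

logger = logging.getLogger("ql-demo")
logger.setLevel(logging.INFO)
logger.addHandler(logging.handlers.QueueHandler(log_queue))
logger.propagate = False

# The listener drains the queue in its own thread and fans out to handlers
listener = logging.handlers.QueueListener(log_queue, logging.StreamHandler(buf))
listener.start()
logger.info("non-blocking write")
listener.stop()   # drains the queue, then joins the listener thread

print(buf.getvalue())
```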

Author: 技術(shù)拆解官 | Original: https://juejin.cn/post/6945301448934719518

拓展知識(shí):

前沿拓展:


推薦學(xué)習(xí)22天試水Python社招,歷經(jīng)“百度+字節(jié)+天融”等6家 金三即過(guò),這300道python高頻面試都沒(méi)刷,銀四怎么闖? stream.sys(stream.sys iomanager 藍(lán)屏)

前言

Logging日志記錄框架是Python內(nèi)置打印模塊,它對(duì)很多Python開發(fā)者來(lái)說(shuō)是既熟悉又陌生,確實(shí),它使用起來(lái)很簡(jiǎn)單,只需要我們簡(jiǎn)單一行代碼,就可以打印出日志

import logging

logging.debug("This is a debug log.")
logging.info("This is a info log.")
logging.warning("This is a warning log.")
logging.error("This is a error log.")
logging.critical("This is a critical log.")

# output
WARNING:root:This is a warning log.
ERROR:root:This is a error log.
CRITICAL:root:This is a critical log.

但是對(duì)于我們?cè)趯?shí)際項(xiàng)目中需要基于Logging來(lái)開發(fā)日志框架時(shí),常常會(huì)遇到各種各樣的問(wèn)題,例如性能方面的多進(jìn)程下滾動(dòng)日志記錄丟失、日志記錄效率低的問(wèn)題以及在二次開發(fā)時(shí)因?yàn)闆](méi)有理解Logging內(nèi)部流程導(dǎo)致沒(méi)有利用到Logging的核心機(jī)制等等。

接下來(lái),我們就從一行代碼來(lái)深挖Logging System的秘密

1. 深入Logging模塊源碼1.1 logging.info做了什么?

logging.info作為大多數(shù)人使用logging模塊的第一行代碼,隱藏了很多實(shí)現(xiàn)細(xì)節(jié),以至于我們?cè)谧畛醵贾魂P(guān)注功能,而忽略了它的內(nèi)部細(xì)節(jié)

import logging
logging.info("This is a info log.")

# logging/__init__.py
def info(msg, *args, **kwargs):
"""
Log a message with severity 'INFO' on the root logger. If the logger has
no handlers, call basicConfig() to add a console handler with a pre-defined
format.
"""
# 沒(méi)有綁定handlers就調(diào)用basicConfig方法
if len(root.handlers) == 0:
basicConfig()
root.info(msg, *args, **kwargs)

_lock = threading.RLock()

def _acquireLock():
"""
Acquire the module-level lock for serializing access to shared data.

This should be released with _releaseLock().
"""
if _lock:
_lock.acquire()

def _releaseLock():
"""
Release the module-level lock acquired by calling _acquireLock().
"""
if _lock:
_lock.release()

def basicConfig(**kwargs):
"""
很方便的一步到位的配置方法創(chuàng)建一個(gè)StreamHandler打印日志到控制臺(tái)
This function does nothing if the root logger already has handlers
configured. It is a convenience method intended for use by simple scripts
to do one-shot configuration of the logging package.

The default behaviour is to create a StreamHandler which writes to
sys.stderr, set a formatter using the BASIC_FORMAT format string, and
add the handler to the root logger.
"""
# Add thread safety in case someone mistakenly calls
# basicConfig() from multiple threads
# 為了確保多線程安全的寫日志**作,做了加鎖處理(如上,標(biāo)準(zhǔn)的多線程鎖**作)
_acquireLock()
try:
if len(root.handlers) == 0:
# 對(duì)于handler的處理,有filename就新建FileHandler,沒(méi)有就選擇StreamHandler
handlers = kwargs.pop("handlers", None)
if handlers is None:
if "stream" in kwargs and "filename" in kwargs:
raise ValueError("'stream' and 'filename' should not be "
"specified together")
else:
if "stream" in kwargs or "filename" in kwargs:
raise ValueError("'stream' or 'filename' should not be "
"specified together with 'handlers'")
if handlers is None:
filename = kwargs.pop("filename", None)
mode = kwargs.pop("filemode", 'a')
if filename:
h = FileHandler(filename, mode)
else:
stream = kwargs.pop("stream", None)
h = StreamHandler(stream)
handlers = [h]
dfs = kwargs.pop("datefmt", None)
style = kwargs.pop("style", '%')
if style not in _STYLES:
raise ValueError('Style must be one of: %s' % ','.join(
_STYLES.keys()))
fs = kwargs.pop("format", _STYLES[style][1])
fmt = Formatter(fs, dfs, style)
# 綁定handler到root
for h in handlers:
if h.formatter is None:
h.setFormatter(fmt)
root.addHandler(h)
level = kwargs.pop("level", None)
if level is not None:
root.setLevel(level)
if kwargs:
keys = ', '.join(kwargs.keys())
raise ValueError('Unrecognised argument(s): %s' % keys)
finally:
_releaseLock()

到目前為止,可以看到logging.info通過(guò)調(diào)用basicConfig()來(lái)完成初始化handler之后才開始正式打印,而basicConfig()的邏輯是通過(guò)多線程鎖狀態(tài)下的一個(gè)初始化handler->綁定root的**作,那這個(gè)root代表了什么呢?

# logging/__init__.py

root = RootLogger(WARNING)
Logger.root = root
Logger.manager = Manager(Logger.root)

class RootLogger(Logger):
"""
A root logger is not that different to any other logger, except that
it must have a logging level and there is only one instance of it in
the hierarchy.
"""
def __init__(self, level):
"""
Initialize the logger with the name "root".
"""
# 調(diào)用父類的初始化,傳入了root和WARNING兩個(gè)參數(shù),所以我們直接調(diào)用logging.info時(shí)是不能輸出任何信息的,因?yàn)閘ogger level初始化時(shí)就被指定成了WARNING
Logger.__init__(self, "root", level)

def __reduce__(self):
return getLogger, ()

class Logger(Filterer):
"""
Instances of the Logger class represent a single logging channel. A
"logging channel" indicates an area of an application. Exactly how an
"area" is defined is up to the application developer. Since an
application can have any number of areas, logging channels are identified
by a unique string. Application areas can be nested (e.g. an area
of "input processing" might include sub-areas "read CSV files", "read
XLS files" and "read Gnumeric files"). To cater for this natural nesting,
channel names are organized into a namespace hierarchy where levels are
separated by periods, much like the Java or Python package namespace. So
in the instance given above, channel names might be "input" for the upper
level, and "input.csv", "input.xls" and "input.gnu" for the sub-levels.
There is no arbitrary limit to the depth of nesting.
"""
def __init__(self, name, level=NOTSET):
"""
Initialize the logger with a name and an optional level.
"""
Filterer.__init__(self)
self.name = name # 指定name,也就是之前的root
self.level = _checkLevel(level) # 指定logger level
self.parent = None
self.propagate = True
self.handlers = []
self.disabled = False
self._cache = {}

def info(self, msg, *args, **kwargs):
"""
Log 'msg % args' with severity 'INFO'.

To pass exception information, use the keyword argument exc_info with
a true value, e.g.

logger.info("Houston, we have a %s", "interesting problem", exc_info=1)
"""
# 這步判斷是為了判斷是否該logger level可以被輸出
if self.isEnabledFor(INFO):
self._log(INFO, msg, args, **kwargs)

def getEffectiveLevel(self):
"""
Get the effective level for this logger.

Loop through this logger and its parents in the logger hierarchy,
looking for a non-zero logging level. Return the first one found.
"""
# 尋找父級(jí)logger level
logger = self
while logger:
if logger.level:
return logger.level
logger = logger.parent
return NOTSET

def isEnabledFor(self, level):
"""
Is this logger enabled for level 'level'?
"""
try:
return self._cache[level]
except KeyError:
# 又出現(xiàn)的加鎖**作
_acquireLock()
if self.manager.disable >= level:
is_enabled = self._cache[level] = False
else:
is_enabled = self._cache[level] = level >= self.getEffectiveLevel()
_releaseLock()

return is_enabled

root是RootLogger的實(shí)例,RootLogger繼承于Logger,通過(guò)_log來(lái)統(tǒng)一記錄日志,而在_log之前會(huì)有個(gè)logger level的判斷

# 用來(lái)獲取堆棧中的第一個(gè)調(diào)用者
# _srcfile is used when walking the stack to check when we've got the first
# caller stack frame, by skipping frames whose filename is that of this
# module's source. It therefore should contain the filename of this module's
# source file.
#
# Ordinarily we would use __file__ for this, but frozen modules don't always
# have __file__ set, for some reason (see Issue #21736). Thus, we get the
# filename from a handy code object from a function defined in this module.
# (There's no particular reason for picking addLevelName.)
#

_srcfile = os.path.normcase(addLevelName.__code__.co_filename)

# _srcfile is only used in conjunction with sys._getframe().
# To provide compatibility with older versions of Python, set _srcfile
# to None if _getframe() is not available; this value will prevent
# findCaller() from being called. You can also do this if you want to avoid
# the overhead of fetching caller information, even when _getframe() is
# available.
#if not hasattr(sys, '_getframe'):
# _srcfile = None

def _log(self, level, msg, args, exc_info=None, extra=None, stack_info=False):
"""
Low-level logging routine which creates a LogRecord and then calls
all the handlers of this logger to handle the record.
"""
sinfo = None
if _srcfile:
#IronPython doesn't track Python frames, so findCaller raises an
#exception on some versions of IronPython. We trap it here so that
#IronPython can use logging.
try:
fn, lno, func, sinfo = self.findCaller(stack_info)
except ValueError: # pragma: no cover
fn, lno, func = "(unknown file)", 0, "(unknown function)"
else: # pragma: no cover
fn, lno, func = "(unknown file)", 0, "(unknown function)"
if exc_info:
if isinstance(exc_info, BaseException):
exc_info = (type(exc_info), exc_info, exc_info.__traceback__)
elif not isinstance(exc_info, tuple):
exc_info = sys.exc_info()
# 通過(guò)makeRecord生成一條日志記錄
record = self.makeRecord(self.name, level, fn, lno, msg, args,
exc_info, func, extra, sinfo)
# 處理
self.handle(record)

def findCaller(self, stack_info=False):
"""
通過(guò)調(diào)用堆棧獲取文件名、行數(shù)和調(diào)用者
Find the stack frame of the caller so that we can note the source
file name, line number and function name.
"""
f = currentframe()
#On some versions of IronPython, currentframe() returns None if
#IronPython isn't run with -X:Frames.
if f is not None:
f = f.f_back
rv = "(unknown file)", 0, "(unknown function)", None
while hasattr(f, "f_code"):
co = f.f_code
filename = os.path.normcase(co.co_filename)
if filename == _srcfile:
f = f.f_back
continue
sinfo = None
if stack_info:
sio = io.StringIO()
sio.write('Stack (most recent call last):n')
traceback.print_stack(f, file=sio)
sinfo = sio.getvalue()
if sinfo[-1] == 'n':
sinfo = sinfo[:-1]
sio.close()
rv = (co.co_filename, f.f_lineno, co.co_name, sinfo)
break
return rv

def makeRecord(self, name, level, fn, lno, msg, args, exc_info,
func=None, extra=None, sinfo=None):
"""
A factory method which can be overridden in subclasses to create
specialized LogRecords.
"""
# 生成_logRecordFactory的實(shí)例
rv = _logRecordFactory(name, level, fn, lno, msg, args, exc_info, func,
sinfo)
if extra is not None:
for key in extra:
if (key in ["message", "asctime"]) or (key in rv.__dict__):
raise KeyError("Attempt to overwrite %r in LogRecord" % key)
rv.__dict__[key] = extra[key]
return rv

def handle(self, record):
"""
Call the handlers for the specified record.

This method is used for unpickled records received from a socket, as
well as those created locally. Logger-level filtering is applied.
"""
if (not self.disabled) and self.filter(record):
self.callHandlers(record)

def callHandlers(self, record):
"""
Pass a record to all relevant handlers.

Loop through all handlers for this logger and its parents in the
logger hierarchy. If no handler was found, output a one-off error
message to sys.stderr. Stop searching up the hierarchy whenever a
logger with the "propagate" attribute set to zero is found – that
will be the last logger whose handlers are called.
"""
c = self
found = 0
# 輪詢自身綁定的handlers,并調(diào)用handle方法來(lái)處理該record實(shí)例,這里有點(diǎn)類似于import的流程,調(diào)用sys.meta_path的importer來(lái)處理path,也是同樣的道理,這里我們回憶之前的rootLogger,我們調(diào)用baseConfig來(lái)初始化使FileHandler綁定到root,第二調(diào)用root.info,最終就來(lái)到了這個(gè)方法,調(diào)用到了FileHandler.handle方法來(lái)處理
while c:
for hdlr in c.handlers:
found = found + 1
if record.levelno >= hdlr.level:
hdlr.handle(record)
if not c.propagate:
c = None #break out
else:
c = c.parent
if (found == 0):
if lastResort:
if record.levelno >= lastResort.level:
lastResort.handle(record)
elif raiseExceptions and not self.manager.emittedNoHandlerWarning:
sys.stderr.write("No handlers could be found for logger"
" "%s"n" % self.name)
self.manager.emittedNoHandlerWarning = True

繼續(xù)來(lái)看FileHandler

class FileHandler(StreamHandler):
def _open(self):
"""
Open the current base file with the (original) mode and encoding.
Return the resulting stream.
"""
return open(self.baseFilename, self.mode, encoding=self.encoding)

def emit(self, record):
"""
Emit a record.

If the stream was not opened because 'delay' was specified in the
constructor, open it before calling the superclass's emit.
"""
# 打開文件句柄(文件流)
if self.stream is None:
self.stream = self._open()
# 調(diào)用StreamHandler的emit方法
StreamHandler.emit(self, record)

class StreamHandler(Handler):
def flush(self):
"""
Flushes the stream.
"""
# 同樣時(shí)加鎖刷入
self.acquire()
try:
if self.stream and hasattr(self.stream, "flush"):
self.stream.flush()
finally:
self.release()

def emit(self, record):
"""
Emit a record.

If a formatter is specified, it is used to format the record.
The record is then written to the stream with a trailing newline. If
exception information is present, it is formatted using
traceback.print_exception and appended to the stream. If the stream
has an 'encoding' attribute, it is used to determine how to do the
output to the stream.
"""
try:
msg = self.format(record)
stream = self.stream
# issue 35046: merged two stream.writes into one.
# 寫入流的緩沖區(qū),執(zhí)行flush
stream.write(msg + self.terminator)
self.flush()
except RecursionError: # See issue 36272
raise
except Exception:
self.handleError(record)

class Handler(Filterer):
    # FileHandler inherits handle() from its grandparent class, Handler
    def filter(self, record):
        """
        Determine if a record is loggable by consulting all the filters.

        The default is to allow the record to be logged; any filter can veto
        this and the record is then dropped. Returns a zero value if a record
        is to be dropped, else non-zero.

        .. versionchanged:: 3.2

           Allow filters to be just callables.
        """
        rv = True
        for f in self.filters:
            if hasattr(f, 'filter'):
                result = f.filter(record)
            else:
                result = f(record)  # assume callable - will raise if not
            if not result:
                rv = False
                break
        return rv

    def handle(self, record):
        """
        Conditionally emit the specified logging record.

        Emission depends on filters which may have been added to the handler.
        Wrap the actual emission of the record with acquisition/release of
        the I/O thread lock. Returns whether the filter passed the record for
        emission.
        """
        # Filter the record first, then call the subclass's emit() to do the
        # actual write
        rv = self.filter(record)
        if rv:
            self.acquire()
            try:
                self.emit(record)
            finally:
                self.release()
        return rv

We have now traced the whole flow behind logging.info. Next, let's combine the code above with the official Logging Flow diagram to map out logging's overall framework.

1.2 The Logging Flow Framework

The source lives in Python's own lib/logging/ directory, mostly in __init__.py. Using the flow diagram and the source we just walked through, let's understand the four components below, plus the structure that represents a single log entry.

- Logger: the core component. It can mount several Handlers and several Filters, and defines the namespace and log level it responds to.
- Handler: can mount one Formatter and several Filters, and defines the log level it responds to and the output destination.
- Filter: nominally a filter that inspects an incoming LogRecord and returns True/False to decide whether the Logger or Handler it is mounted on should process the record. It can also act as middleware: custom rules may rewrite the LogRecord and pass it on to subsequent Filters/Handlers/Loggers.
- Formatter: the component that does the final formatting of a log entry.
- LogRecord: the structure representing a single log entry.
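As noted, a Filter can double as middleware and rewrite the LogRecord before passing it on. A minimal sketch using a plain callable as the filter (the redaction rule is made up for illustration):

```python
import io
import logging

def redact(record):
    # Middleware-style filter: rewrite the record in place,
    # then return True so it keeps flowing to the handlers.
    record.msg = record.msg.replace("secret", "***")
    return True

buf = io.StringIO()
handler = logging.StreamHandler(buf)
logger = logging.getLogger("demo.middleware")
logger.propagate = False
logger.addHandler(handler)
logger.addFilter(redact)   # since 3.2, a bare callable works as a filter

logger.warning("token=secret")
print(buf.getvalue())  # token=***
```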

According to the flow diagram, the main flow is as follows:

1. When a logging call reaches a Logger, it first checks whether it handles this level at all; if not, the call is discarded (the first layer of level control).
2. A LogRecord is created, packaging up the call site and other context, and is passed through the chain of Filters mounted on the Logger.
3. If any Filter returns False, the record is dropped.
4. Otherwise the record is handed to each Handler mounted on the Logger in turn (entering the Handler flow).
5. If the propagate attribute is on ("propagate upwards"), the LogRecord is also handed to the parent Logger for processing, starting directly from step 4 (the first layer of level control is not triggered again).
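Step 5 can be observed with a parent/child logger pair; note how the parent's own level is not re-checked during propagation (a minimal sketch, names illustrative):

```python
import io
import logging

buf = io.StringIO()
parent = logging.getLogger("app")
parent.setLevel(logging.ERROR)              # NOT re-checked during propagation
parent.addHandler(logging.StreamHandler(buf))
parent.propagate = False                    # keep the demo off the root logger

child = logging.getLogger("app.db")         # child has no handlers of its own
child.setLevel(logging.INFO)

child.info("reaches app's handler despite app's ERROR level")
print(buf.getvalue())
```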

The Handler flow:

1. Check whether this Handler handles the record's level; if not, discard it (the second layer of level control).
2. Pass the received LogRecord through the chain of Filters mounted on the Handler.
3. As before, call the Formatter.

2. Logging in Multi-Process Scenarios

2.1 Analyzing Logging Problems in Multi-Process Scenarios

The logging module is thread-safe: we saw threading.RLock() used in many places in the code above. In particular, when info/error and the other printing methods are called, they all eventually reach the top-level parent class's handle method:

# logging/__init__.py

def handle(self, record):
    # Filter the record first, then call the subclass's emit() to do the
    # actual write
    rv = self.filter(record)
    if rv:
        self.acquire()
        try:
            self.emit(record)
        finally:
            self.release()
    return rv

This guarantees that, in a multi-threaded environment, only one thread at a time can enter handle() and perform the write.
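A quick sketch of that guarantee: several threads hammer one handler, and every line still comes out intact because each `emit()` runs under the handler's lock:

```python
import io
import logging
import threading

buf = io.StringIO()
logger = logging.getLogger("demo.threads")
logger.propagate = False
logger.addHandler(logging.StreamHandler(buf))

def worker(tag):
    for i in range(100):
        logger.warning("%s-%03d", tag, i)

threads = [threading.Thread(target=worker, args=(t,)) for t in "abcd"]
for t in threads:
    t.start()
for t in threads:
    t.join()

# 400 lines, each written as a single locked stream.write -> none interleaved
lines = buf.getvalue().splitlines()
print(len(lines))  # 400
```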

A scenario that is easier to overlook, however, is the set of problems that arise in multi-process environments. Python web projects are usually deployed with multiple worker processes, which can lead to the following issues:

- Garbled logs: if two processes write the lines xxxx and yyyy, the file may end up with something like xxyxyxyy.
- Lost logs: although the log file is opened in O_APPEND mode, which keeps individual writes consistent, the buffer gets in the way (data is first written to the buffer and only flushed to disk later), so fwrite operations are not multi-process safe.
- Lost logs, another case: with RotatingFileHandler or TimedRotatingFileHandler, the processes end up holding different file handles when the file is rotated, so the new file gets created repeatedly and data keeps going into the old file.

2.2 Solutions for Logging in Multi-Process Scenarios

To cope with the situations above, here are several solutions:

2.2.1 The concurrent-log-handler module (file-lock mode)

# concurrent_log_handler/__init__.py

# Inherits from logging's BaseRotatingHandler
class ConcurrentRotatingFileHandler(BaseRotatingHandler):
    # The actual write logic
    def emit(self, record):
        try:
            msg = self.format(record)
            # Locking logic
            try:
                self._do_lock()
                # The usual shouldRollover/doRollover operations do the rotation
                try:
                    if self.shouldRollover(record):
                        self.doRollover()
                except Exception as e:
                    self._console_log("Unable to do rollover: %s" % (e,), stack=True)
                    # Continue on anyway
                self.do_write(msg)
            finally:
                self._do_unlock()
        except (KeyboardInterrupt, SystemExit):
            raise
        except Exception:
            self.handleError(record)

    def _do_lock(self):
        # Return if we already hold the lock
        if self.is_locked:
            return  # already locked... recursive?
        # Acquire the file lock
        self._open_lockfile()
        if self.stream_lock:
            for i in range(10):
                # noinspection PyBroadException
                try:
                    # Call portalocker's lock()
                    lock(self.stream_lock, LOCK_EX)
                    self.is_locked = True
                    break
                except Exception:
                    continue
            else:
                raise RuntimeError("Cannot acquire lock after 10 attempts")
        else:
            self._console_log("No self.stream_lock to lock", stack=True)

    def _open_lockfile(self):
        """
        Open the lock file, adjusting its permissions.
        """
        if self.stream_lock and not self.stream_lock.closed:
            self._console_log("Lockfile already open in this process")
            return
        lock_file = self.lockFilename
        self._console_log(
            "concurrent-log-handler %s opening %s" % (hash(self), lock_file), stack=False)

        with self._alter_umask():
            self.stream_lock = open(lock_file, "wb", buffering=0)

        self._do_chown_and_chmod(lock_file)

    def _do_unlock(self):
        if self.stream_lock:
            if self.is_locked:
                try:
                    unlock(self.stream_lock)
                finally:
                    self.is_locked = False
                    self.stream_lock.close()
                    self.stream_lock = None
        else:
            self._console_log("No self.stream_lock to unlock", stack=True)

# portalocker/portalocker.py
def lock(file_: typing.IO, flags: constants.LockFlags):
    if flags & constants.LockFlags.SHARED:
        ...  # (shared-lock branch elided from this excerpt)
    else:
        if flags & constants.LockFlags.NON_BLOCKING:
            mode = msvcrt.LK_NBLCK
        else:
            mode = msvcrt.LK_LOCK

        # windows locks byte ranges, so make sure to lock from file start
        try:
            savepos = file_.tell()
            if savepos:
                # [ ] test exclusive lock fails on seek here
                # [ ] test if shared lock passes this point
                file_.seek(0)
                # [x] check if 0 param locks entire file (not documented in
                #     Python)
                # [x] fails with "IOError: [Errno 13] Permission denied",
                #     but -1 seems to do the trick
            try:
                msvcrt.locking(file_.fileno(), mode, lock_length)
            except IOError as exc_value:
                # [ ] be more specific here
                raise exceptions.LockException(
                    exceptions.LockException.LOCK_FAILED,
                    exc_value.strerror,
                    fh=file_)
            finally:
                if savepos:
                    file_.seek(savepos)
        except IOError as exc_value:
            raise exceptions.LockException(
                exceptions.LockException.LOCK_FAILED, exc_value.strerror,
                fh=file_)

"""
This invokes the C runtime's file-locking mechanism:

msvcrt.locking(fd, mode, nbytes)
Lock part of a file based on file descriptor fd from the C runtime. Raises OSError on failure. The locked region of the file extends from the current file position for nbytes bytes, and may continue beyond the end of the file. mode must be one of the LK_* constants listed below. Multiple regions in a file may be locked at the same time, but may not overlap. Adjacent regions are not merged; they must be unlocked individually.

Raises an auditing event msvcrt.locking with arguments fd, mode, nbytes.
"""

At bottom, a file lock is taken whenever emit() fires, forcing concurrent calls from multiple processes into sequential single-process calls and keeping the log writes correct. Efficiency suffers, though: repeatedly changing file permissions, locking, and contending for the lock are all costly.

2.2.2 Overriding doRollover and FileHandler for log-rotation scenarios

Of course, besides the file-locking approach above, we can also subclass and rewrite TimedRotatingFileHandler or FileHandler ourselves, adding simple multi-process locking logic, for example with fcntl.flock:
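A minimal sketch of such a subclass (Unix-only, since it relies on `fcntl.flock`; the class name and locking placement are our own illustration, not a drop-in production handler — a real one would also lock around rollover):

```python
import fcntl
import logging
import os
import tempfile

class FlockFileHandler(logging.FileHandler):
    """Hypothetical handler: hold an exclusive flock() for each emit."""

    def emit(self, record):
        if self.stream is None:
            self.stream = self._open()
        # LOCK_EX serializes writers across *processes*, not just threads
        fcntl.flock(self.stream.fileno(), fcntl.LOCK_EX)
        try:
            logging.StreamHandler.emit(self, record)
        finally:
            fcntl.flock(self.stream.fileno(), fcntl.LOCK_UN)

path = os.path.join(tempfile.mkdtemp(), "flock.log")
lg = logging.getLogger("demo.flock")
lg.propagate = False
lg.addHandler(FlockFileHandler(path))
lg.warning("written under an flock")
```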

static PyObject *
fcntl_flock_impl(PyObject *module, int fd, int code)
/*[clinic end generated code: output=84059e2b37d2fc64 input=0bfc00f795953452]*/
{
    int ret;
    int async_err = 0;

    if (PySys_Audit("fcntl.flock", "ii", fd, code) < 0) {
        return NULL;
    }
/* If available, take the lock via the flock(2) call */
#ifdef HAVE_FLOCK
    do {
        Py_BEGIN_ALLOW_THREADS
        ret = flock(fd, code);
        Py_END_ALLOW_THREADS
    } while (ret == -1 && errno == EINTR && !(async_err = PyErr_CheckSignals()));
#else

#ifndef LOCK_SH
#define LOCK_SH 1   /* shared lock */
#define LOCK_EX 2   /* exclusive lock */
#define LOCK_NB 4   /* don't block when locking */
#define LOCK_UN 8   /* unlock */
#endif
    {
        struct flock l;
        if (code == LOCK_UN)
            l.l_type = F_UNLCK;
        else if (code & LOCK_SH)
            l.l_type = F_RDLCK;
        else if (code & LOCK_EX)
            l.l_type = F_WRLCK;
        else {
            PyErr_SetString(PyExc_ValueError,
                            "unrecognized flock argument");
            return NULL;
        }
        l.l_whence = l.l_start = l.l_len = 0;
        do {
            Py_BEGIN_ALLOW_THREADS
            ret = fcntl(fd, (code & LOCK_NB) ? F_SETLK : F_SETLKW, &l);
            Py_END_ALLOW_THREADS
        } while (ret == -1 && errno == EINTR && !(async_err = PyErr_CheckSignals()));
    }
#endif /* HAVE_FLOCK */
    if (ret < 0) {
        return !async_err ? PyErr_SetFromErrno(PyExc_OSError) : NULL;
    }
    Py_RETURN_NONE;
}

2.2.3 Master/Worker log collection (Socket/Queue mode)

This is also the approach recommended by the official documentation:

Although logging is thread-safe, and logging to a single file from multiple threads in a single process is supported, logging to a single file from multiple processes is not supported, because there is no standard way to serialize access to a single file across multiple processes in Python. If you need to log to a single file from multiple processes, one way of doing this is to have all the processes log to a SocketHandler, and have a separate process which implements a socket server which reads from the socket and logs to file. (If you prefer, you can dedicate one thread in one of the existing processes to perform this function.) This section documents this approach in more detail and includes a working socket receiver which can be used as a starting point for you to adapt in your own applications.

Alternatively, you can use a Queue and a QueueHandler to send all logging events to one of the processes in your multi-process application. The following example script demonstrates how you can do this; in the example a separate listener process listens for events sent by other processes and logs them according to its own logging configuration. Although the example only demonstrates one way of doing it (for example, you may want to use a listener thread rather than a separate listener process - the implementation would be analogous) it does allow for completely different logging configurations for the listener and the other processes in your application, and can be used as the basis for code meeting your own specific requirements:

Let's look at the QueueHandler example:

import logging
import logging.config
import logging.handlers
from multiprocessing import Process, Queue
import random
import threading
import time

def logger_thread(q):
    """
    A dedicated log-writing thread.
    """
    while True:
        record = q.get()
        if record is None:
            break
        # Look up the logger named in the record
        logger = logging.getLogger(record.name)
        # Let that logger's handle() process the record
        logger.handle(record)

def worker_process(q):
    # A worker process that produces log records
    qh = logging.handlers.QueueHandler(q)
    root = logging.getLogger()
    root.setLevel(logging.DEBUG)
    # Bind the QueueHandler to the logger
    root.addHandler(qh)
    levels = [logging.DEBUG, logging.INFO, logging.WARNING, logging.ERROR,
              logging.CRITICAL]
    loggers = ['foo', 'foo.bar', 'foo.bar.baz',
               'spam', 'spam.ham', 'spam.ham.eggs']
    for i in range(100):
        lvl = random.choice(levels)
        logger = logging.getLogger(random.choice(loggers))
        logger.log(lvl, 'Message no. %d', i)

if __name__ == '__main__':
    q = Queue()
    # (a large logging config dict `d` is omitted here)
    workers = []
    # Multiple processes write logs into the Queue
    for i in range(5):
        wp = Process(target=worker_process, name='worker %d' % (i + 1), args=(q,))
        workers.append(wp)
        wp.start()
    logging.config.dictConfig(d)
    # Start a child thread that collects and writes the logs
    lp = threading.Thread(target=logger_thread, args=(q,))
    lp.start()
    # At this point, the main process could do some useful work of its own
    # Once it's done that, it can wait for the workers to terminate...
    for wp in workers:
        wp.join()
    # And now tell the logging thread to finish up, too
    q.put(None)
    lp.join()

And the SocketHandler example:

# The sending side

import logging, logging.handlers

rootLogger = logging.getLogger('')
rootLogger.setLevel(logging.DEBUG)
# Point at the receiver's host and port
socketHandler = logging.handlers.SocketHandler('localhost',
                    logging.handlers.DEFAULT_TCP_LOGGING_PORT)
# don't bother with a formatter, since a socket handler sends the event as
# an unformatted pickle
rootLogger.addHandler(socketHandler)

# Now, we can log to the root logger, or any other logger. First the root...
logging.info('Jackdaws love my big sphinx of quartz.')

# Now, define a couple of other loggers which might represent areas in your
# application:

logger1 = logging.getLogger('myapp.area1')
logger2 = logging.getLogger('myapp.area2')

logger1.debug('Quick zephyrs blow, vexing daft Jim.')
logger1.info('How quickly daft jumping zebras vex.')
logger2.warning('Jail zesty vixen who grabbed pay from quack.')
logger2.error('The five boxing wizards jump quickly.')

# The receiving side

import pickle
import logging
import logging.handlers
import socketserver
import struct

class LogRecordStreamHandler(socketserver.StreamRequestHandler):
    """Handler for a streaming logging request.

    This basically logs the record using whatever logging policy is
    configured locally.
    """

    def handle(self):
        """
        Handle multiple requests - each expected to be a 4-byte length,
        followed by the LogRecord in pickle format. Logs the record
        according to whatever policy is configured locally.
        """
        while True:
            # Receive the length-prefixed payload
            chunk = self.connection.recv(4)
            if len(chunk) < 4:
                break
            slen = struct.unpack('>L', chunk)[0]
            chunk = self.connection.recv(slen)
            while len(chunk) < slen:
                chunk = chunk + self.connection.recv(slen - len(chunk))
            obj = self.unPickle(chunk)
            # Build a LogRecord instance and hand it to handleLogRecord
            record = logging.makeLogRecord(obj)
            self.handleLogRecord(record)

    def unPickle(self, data):
        return pickle.loads(data)

    def handleLogRecord(self, record):
        # if a name is specified, we use the named logger rather than the one
        # implied by the record.
        if self.server.logname is not None:
            name = self.server.logname
        else:
            name = record.name
        logger = logging.getLogger(name)
        # N.B. EVERY record gets logged. This is because Logger.handle
        # is normally called AFTER logger-level filtering. If you want
        # to do filtering, do it at the client end to save wasting
        # cycles and network bandwidth!
        logger.handle(record)

class LogRecordSocketReceiver(socketserver.ThreadingTCPServer):
    """
    Simple TCP socket-based logging receiver suitable for testing.
    """

    allow_reuse_address = True

    def __init__(self, host='localhost',
                 port=logging.handlers.DEFAULT_TCP_LOGGING_PORT,
                 handler=LogRecordStreamHandler):
        socketserver.ThreadingTCPServer.__init__(self, (host, port), handler)
        self.abort = 0
        self.timeout = 1
        self.logname = None

    def serve_until_stopped(self):
        import select
        abort = 0
        while not abort:
            # Use select() to wait for data on the listening socket
            rd, wr, ex = select.select([self.socket.fileno()],
                                       [], [],
                                       self.timeout)
            if rd:
                # Dispatch to LogRecordStreamHandler.handle
                self.handle_request()
            abort = self.abort

def main():
    logging.basicConfig(
        format='%(relativeCreated)5d %(name)-15s %(levelname)-8s %(message)s')
    tcpserver = LogRecordSocketReceiver()
    print('About to start TCP server...')
    tcpserver.serve_until_stopped()

if __name__ == '__main__':
    main()

Those two patterns make up the Master/Worker approach: write throughput is unaffected by producer concurrency and is ultimately bounded by the single writer thread. For slow or blocking writes, the same QueueHandler, together with its companion QueueListener, can also be used.
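For completeness, QueueListener packages up the consumer side that the earlier logger_thread implemented by hand; a minimal single-process sketch:

```python
import io
import logging
import logging.handlers
import queue

q = queue.Queue()
buf = io.StringIO()

# Producers only touch the fast, non-blocking QueueHandler...
logger = logging.getLogger("demo.queue")
logger.propagate = False
logger.addHandler(logging.handlers.QueueHandler(q))

# ...while a background thread inside QueueListener does the real I/O.
listener = logging.handlers.QueueListener(q, logging.StreamHandler(buf))
listener.start()
logger.warning("handed off to the listener thread")
listener.stop()  # drains the queue before returning

print(buf.getvalue())
```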

Author: 技術(shù)拆解官 | Original link: https://juejin.cn/post/6945301448934719518
