#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""Logging Handlers that send messages in Graylog Extended Log Format (GELF)"""
import abc
import datetime
import json
import logging
import math
import random
import socket
import ssl
import struct
import sys
import traceback
import zlib
from logging.handlers import DatagramHandler, SocketHandler
# Chunk size (in bytes) used as the default ``chunk_size`` of
# BaseGELFHandler; serialized messages at least this large are split into
# several GELF chunks by the UDP handler.
# NOTE(review): presumably chosen to fit a typical WAN MTU -- confirm.
WAN_CHUNK = 1420
# Larger chunk size (in bytes) that callers may pass as ``chunk_size``.
# NOTE(review): presumably intended for LAN links -- confirm.
LAN_CHUNK = 8154
# Python 2/3 compatibility aliases: ``data`` is the byte-string type and
# ``text`` is the unicode-string type of the running interpreter.
if sys.version_info[0] == 3:  # check if python3+
    data = bytes
    text = str
else:
    data = str
    text = unicode  # pylint: disable=undefined-variable

# fixes for using ABC: ``abc.ABC`` is only used on python3.4+, older
# interpreters get an equivalent base class built via ``abc.ABCMeta``.
if sys.version_info < (3, 4):
    ABC = abc.ABCMeta(str('ABC'), (), {})
else:
    ABC = abc.ABC

# Prefer the Python 2 module name and fall back to its Python 3 location.
try:
    import httplib
except ImportError:
    import http.client as httplib
# Translation table from python ``logging`` levels to the syslog numerical
# values placed in the GELF ``level`` field. Levels that are not listed
# here are looked up with ``.get(levelno, levelno)`` by the handlers and
# therefore passed through unchanged.
SYSLOG_LEVELS = {
    logging.CRITICAL: 2,
    logging.ERROR: 3,
    logging.WARNING: 4,
    logging.INFO: 6,
    logging.DEBUG: 7,
}
class BaseGELFHandler(logging.Handler, ABC):
    """Abstract class defining the basic functionality of converting a
    :obj:`logging.LogRecord` into a GELF log. Provides the boilerplate for
    all GELF handlers defined within graypy."""

    def __init__(self, chunk_size=WAN_CHUNK, debugging_fields=True,
                 extra_fields=True, fqdn=False, localname=None, facility=None,
                 level_names=False, compress=True):
        """Initialize the BaseGELFHandler.

        :param chunk_size: Message chunk size. Messages larger than this
            size will be sent to Graylog in multiple chunks.
        :type chunk_size: int

        :param debugging_fields: If :obj:`True` add debug fields from the
            log record into the GELF logs to be sent to Graylog.
        :type debugging_fields: bool

        :param extra_fields: If :obj:`True` add extra fields from the log
            record into the GELF logs to be sent to Graylog.
        :type extra_fields: bool

        :param fqdn: If :obj:`True` use the fully qualified domain name of
            localhost to populate the ``host`` GELF field.
        :type fqdn: bool

        :param localname: If specified and ``fqdn`` is :obj:`False`, use the
            specified hostname to populate the ``host`` GELF field.
        :type localname: str or None

        :param facility: If specified, replace the ``facility`` GELF field
            with the specified value. Also add an additional ``_logger``
            GELF field containing the ``LogRecord.name``.
        :type facility: str

        :param level_names: If :obj:`True` use python logging error level
            name strings instead of syslog numerical values.
        :type level_names: bool

        :param compress: If :obj:`True` compress the GELF message before
            sending it to the Graylog server.
        :type compress: bool

        :raises ValueError: If both ``fqdn`` and ``localname`` are given.
        """
        logging.Handler.__init__(self)
        self.debugging_fields = debugging_fields
        self.extra_fields = extra_fields
        self.chunk_size = chunk_size

        # the two options are alternative sources of the ``host`` field
        if fqdn and localname:
            raise ValueError(
                "cannot specify 'fqdn' and 'localname' arguments together")

        self.fqdn = fqdn
        self.localname = localname
        self.facility = facility
        self.level_names = level_names
        self.compress = compress

    def makePickle(self, record):
        """Convert a :class:`logging.LogRecord` into bytes representing
        a GELF log

        :param record: :class:`logging.LogRecord` to convert into a GELF log.
        :type record: logging.LogRecord

        :return: bytes representing a GELF log, zlib-compressed when
            ``self.compress`` is set.
        :rtype: bytes
        """
        gelf_dict = self._make_gelf_dict(record)
        packed = self._pack_gelf_dict(gelf_dict)
        return zlib.compress(packed) if self.compress else packed

    def _make_gelf_dict(self, record):
        """Create a dictionary representing a GELF log from a
        python :class:`logging.LogRecord`

        :param record: :class:`logging.LogRecord` to create a GELF log from.
        :type record: logging.LogRecord

        :return: dictionary representing a GELF log.
        :rtype: dict
        """
        # a configured formatter takes precedence over the plain message
        if self.formatter:
            short_message = self.formatter.format(record)
        else:
            short_message = record.getMessage()

        # construct the base GELF format
        gelf_dict = {
            'version': "1.0",
            'host': BaseGELFHandler._resolve_host(self.fqdn, self.localname),
            'short_message': short_message,
            'timestamp': record.created,
            # levels without a syslog equivalent are passed through as-is
            'level': SYSLOG_LEVELS.get(record.levelno, record.levelno),
            'facility': self.facility or record.name,
        }

        # add in specified optional extras
        self._add_full_message(gelf_dict, record)
        if self.level_names:
            self._add_level_names(gelf_dict, record)
        if self.facility is not None:
            self._set_custom_facility(gelf_dict, self.facility, record)
        if self.debugging_fields:
            self._add_debugging_fields(gelf_dict, record)
        if self.extra_fields:
            self._add_extra_fields(gelf_dict, record)
        return gelf_dict

    @staticmethod
    def _add_level_names(gelf_dict, record):
        """Add the ``level_name`` field to the ``gelf_dict`` which notes
        the logging level via the string error level names instead of
        numerical values

        :param gelf_dict: dictionary representing a GELF log.
        :type gelf_dict: dict

        :param record: :class:`logging.LogRecord` to extract a logging
            level from to insert into the given ``gelf_dict``.
        :type record: logging.LogRecord
        """
        gelf_dict['level_name'] = logging.getLevelName(record.levelno)

    @staticmethod
    def _set_custom_facility(gelf_dict, facility_value, record):
        """Set the ``gelf_dict``'s ``facility`` field to the specified value

        Also add an additional ``_logger`` field containing the
        ``LogRecord.name``.

        :param gelf_dict: dictionary representing a GELF log.
        :type gelf_dict: dict

        :param facility_value: Value to set as the ``gelf_dict``'s
            ``facility`` field.
        :type facility_value: str

        :param record: :class:`logging.LogRecord` to extract its record
            name to insert into the given ``gelf_dict`` as the ``_logger``
            field.
        :type record: logging.LogRecord
        """
        gelf_dict.update({"facility": facility_value, '_logger': record.name})

    @staticmethod
    def _add_full_message(gelf_dict, record):
        """Add the ``full_message`` field to the ``gelf_dict`` if any
        traceback information exists within the logging record

        :param gelf_dict: dictionary representing a GELF log.
        :type gelf_dict: dict

        :param record: :class:`logging.LogRecord` to extract a full
            logging message from to insert into the given ``gelf_dict``.
        :type record: logging.LogRecord
        """
        # if a traceback exists add it to the log as the full_message field
        full_message = None

        # format exception information if present
        if record.exc_info:
            # format_exception() returns lines that already end in a
            # newline; concatenating them directly avoids the blank line
            # that a '\n' separator would insert between every entry
            full_message = ''.join(
                traceback.format_exception(*record.exc_info))

        # use pre-formatted exception information in cases where the primary
        # exception information was removed, e.g. for LogRecord serialization
        if record.exc_text:
            full_message = record.exc_text

        if full_message:
            gelf_dict["full_message"] = full_message

    @staticmethod
    def _resolve_host(fqdn, localname):
        """Resolve the ``host`` GELF field

        :param fqdn: Boolean indicating whether to use :meth:`socket.getfqdn`
            to obtain the ``host`` GELF field.
        :type fqdn: bool

        :param localname: Use specified hostname as the ``host`` GELF field.
        :type localname: str or None

        :return: String representing the ``host`` GELF field.
        :rtype: str
        """
        if fqdn:
            return socket.getfqdn()
        if localname is not None:
            return localname
        return socket.gethostname()

    @staticmethod
    def _add_debugging_fields(gelf_dict, record):
        """Add debugging fields to the given ``gelf_dict``

        :param gelf_dict: dictionary representing a GELF log.
        :type gelf_dict: dict

        :param record: :class:`logging.LogRecord` to extract debugging
            fields from to insert into the given ``gelf_dict``.
        :type record: logging.LogRecord
        """
        gelf_dict.update({
            'file': record.pathname,
            'line': record.lineno,
            '_function': record.funcName,
            '_pid': record.process,
            '_thread_name': record.threadName,
        })
        # record.processName was added in Python 2.6.2
        process_name = getattr(record, 'processName', None)
        if process_name is not None:
            gelf_dict['_process_name'] = process_name

    @staticmethod
    def _add_extra_fields(gelf_dict, record):
        """Add extra fields to the given ``gelf_dict``

        However, this does not add additional fields in to ``gelf_dict``
        that are either duplicated from standard :class:`logging.LogRecord`
        attributes, duplicated from the python logging module source
        (e.g. ``exc_text``), or violate GELF format (i.e. ``id``).

        .. seealso::

            The list of standard :class:`logging.LogRecord` attributes can be
            found at:

            http://docs.python.org/library/logging.html#logrecord-attributes

        :param gelf_dict: dictionary representing a GELF log.
        :type gelf_dict: dict

        :param record: :class:`logging.LogRecord` to extract extra fields
            from to insert into the given ``gelf_dict``.
        :type record: logging.LogRecord
        """
        # skip_list is used to filter additional fields in a log message.
        skip_list = (
            'args', 'asctime', 'created', 'exc_info', 'exc_text', 'filename',
            'funcName', 'id', 'levelname', 'levelno', 'lineno', 'module',
            'msecs', 'message', 'msg', 'name', 'pathname', 'process',
            'processName', 'relativeCreated', 'thread', 'threadName')

        for key, value in record.__dict__.items():
            # private attributes (leading underscore) are never exported
            if key not in skip_list and not key.startswith('_'):
                gelf_dict['_%s' % key] = value

    @staticmethod
    def _pack_gelf_dict(gelf_dict):
        """Convert a given ``gelf_dict`` into JSON-encoded UTF-8 bytes, thus,
        creating an uncompressed GELF log ready for consumption by Graylog.

        Since we cannot be 100% sure of what is contained in the ``gelf_dict``
        we have to do some sanitation.

        :param gelf_dict: dictionary representing a GELF log.
        :type gelf_dict: dict

        :return: bytes representing an uncompressed GELF log.
        :rtype: bytes
        """
        gelf_dict = BaseGELFHandler._sanitize_to_unicode(gelf_dict)
        packed = json.dumps(
            gelf_dict,
            # explicit (item, key) separator pair producing compact JSON
            separators=(',', ':'),
            default=BaseGELFHandler._object_to_json
        )
        return packed.encode('utf-8')

    @staticmethod
    def _sanitize_to_unicode(obj):
        """Convert all byte strings contained in the object to unicode

        Dictionaries, lists and tuples are processed recursively; byte
        strings are decoded as UTF-8 with undecodable bytes replaced. Any
        other object is returned untouched.

        :param obj: object to sanitize to unicode.
        :type obj: object

        :return: The given object with every contained byte string decoded
            to unicode.
        :rtype: object
        """
        if isinstance(obj, dict):
            return dict(
                (BaseGELFHandler._sanitize_to_unicode(key),
                 BaseGELFHandler._sanitize_to_unicode(value))
                for key, value in obj.items())
        if isinstance(obj, (list, tuple)):
            # rebuild the container with its own type
            return obj.__class__(
                [BaseGELFHandler._sanitize_to_unicode(item) for item in obj])
        if isinstance(obj, data):
            obj = obj.decode('utf-8', errors='replace')
        return obj

    @staticmethod
    def _object_to_json(obj):
        """Convert objects that cannot be natively serialized into JSON
        into their string representation (for later JSON serialization).

        :class:`datetime.datetime` based objects will be converted into a
        ISO formatted timestamp string.

        :param obj: object to convert into a string representation.
        :type obj: object

        :return: String representing the given object.
        :rtype: str
        """
        if isinstance(obj, datetime.datetime):
            return obj.isoformat()
        return repr(obj)
class GELFUDPHandler(BaseGELFHandler, DatagramHandler):
    """GELF UDP handler"""

    def __init__(self, host, port=12202, **kwargs):
        """Initialize the GELFUDPHandler

        Any additional keyword arguments are forwarded to
        :class:`BaseGELFHandler`.

        :param host: GELF UDP input host.
        :type host: str

        :param port: GELF UDP input port.
        :type port: int
        """
        BaseGELFHandler.__init__(self, **kwargs)
        DatagramHandler.__init__(self, host, port)

    def send(self, s):
        """Send a serialized GELF log as one or more UDP datagrams

        Payloads shorter than ``self.chunk_size`` go out as a single
        datagram; anything else is split up by :class:`ChunkedGELF` and
        sent chunk by chunk.

        :param s: serialized GELF log to send.
        :type s: bytes
        """
        if len(s) >= self.chunk_size:
            datagrams = ChunkedGELF(s, self.chunk_size)
        else:
            datagrams = (s,)
        for datagram in datagrams:
            DatagramHandler.send(self, datagram)
class GELFTCPHandler(BaseGELFHandler, SocketHandler):
    """GELF TCP handler"""

    def __init__(self, host, port=12201, **kwargs):
        """Initialize the GELFTCPHandler

        :param host: GELF TCP input host.
        :type host: str

        :param port: GELF TCP input port.
        :type port: int

        .. attention::
            GELF TCP does not support compression due to the use of the null
            byte (``\\0``) as frame delimiter.

            Thus, :class:`.handler.GELFTCPHandler` does not support setting
            ``compress`` to :obj:`True` and is locked to :obj:`False`.
        """
        # compression is always disabled for the TCP transport
        BaseGELFHandler.__init__(self, compress=False, **kwargs)
        SocketHandler.__init__(self, host, port)

    def makePickle(self, record):
        """Add a null terminator to generated pickles as TCP frame objects
        need to be null terminated

        :param record: :class:`logging.LogRecord` to create a null
            terminated GELF log.
        :type record: logging.LogRecord

        :return: Null terminated bytes representing a GELF log.
        :rtype: bytes
        """
        frame = BaseGELFHandler.makePickle(self, record)
        return frame + b'\x00'
class GELFTLSHandler(GELFTCPHandler):
    """GELF TCP handler with TLS support"""

    def __init__(self, host, port=12204, validate=False, ca_certs=None,
                 certfile=None, keyfile=None, **kwargs):
        """Initialize the GELFTLSHandler

        :param host: GELF TLS input host.
        :type host: str

        :param port: GELF TLS input port.
        :type port: int

        :param validate: If :obj:`True`, validate the Graylog server's
            certificate. In this case specifying ``ca_certs`` is also
            required.
        :type validate: bool

        :param ca_certs: Path to CA bundle file.
        :type ca_certs: str

        :param certfile: Path to the client certificate file.
        :type certfile: str

        :param keyfile: Path to the client private key. If the private key is
            stored with the certificate, this parameter can be ignored.
        :type keyfile: str

        :raises ValueError: If ``validate`` is set without ``ca_certs`` or
            if ``keyfile`` is given without ``certfile``.
        """
        if validate and ca_certs is None:
            raise ValueError('CA bundle file path must be specified')

        if keyfile is not None and certfile is None:
            raise ValueError('certfile must be specified')

        GELFTCPHandler.__init__(self, host=host, port=port, **kwargs)

        self.ca_certs = ca_certs
        self.reqs = ssl.CERT_REQUIRED if validate else ssl.CERT_NONE
        self.certfile = certfile
        # the key is looked up in the certificate file when not given
        self.keyfile = keyfile if keyfile else certfile

    def makeSocket(self, timeout=1):
        """Create a TLS wrapped socket connected to the Graylog server

        The underlying socket is closed again if wrapping or connecting
        fails, so a failed attempt does not leave a socket open.

        :param timeout: Socket timeout in seconds.
        :type timeout: int or float

        :return: Connected TLS socket.
        :rtype: ssl.SSLSocket
        """
        plain_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            if hasattr(plain_socket, 'settimeout'):
                plain_socket.settimeout(timeout)
            wrapped_socket = self._wrap_socket(plain_socket)
        except Exception:
            plain_socket.close()
            raise

        try:
            wrapped_socket.connect((self.host, self.port))
        except Exception:
            wrapped_socket.close()
            raise
        return wrapped_socket

    def _wrap_socket(self, plain_socket):
        """Wrap ``plain_socket`` for TLS using the handler's certificates

        :param plain_socket: unconnected TCP socket to wrap.
        :type plain_socket: socket.socket

        :return: TLS wrapped, still unconnected socket.
        :rtype: ssl.SSLSocket
        """
        if not hasattr(ssl, 'SSLContext'):
            # interpreters predating SSLContext only offer the module-level
            # helper (which was removed again in Python 3.12)
            return ssl.wrap_socket(
                plain_socket,
                ca_certs=self.ca_certs,
                cert_reqs=self.reqs,
                keyfile=self.keyfile,
                certfile=self.certfile
            )

        protocol = getattr(ssl, 'PROTOCOL_TLS_CLIENT', None)
        if protocol is None:
            protocol = ssl.PROTOCOL_SSLv23
        context = ssl.SSLContext(protocol)

        # Same verification behaviour as the former ssl.wrap_socket() call:
        # the certificate chain is checked according to ``self.reqs`` but
        # the server hostname is not matched against the certificate.
        # NOTE(review): consider enabling hostname checking when validating.
        # check_hostname has to be disabled before verify_mode may be set
        # to CERT_NONE.
        if hasattr(context, 'check_hostname'):
            context.check_hostname = False
        context.verify_mode = self.reqs

        if self.ca_certs:
            context.load_verify_locations(self.ca_certs)
        if self.certfile:
            context.load_cert_chain(self.certfile, self.keyfile)
        return context.wrap_socket(plain_socket)
# TODO: add https?
class GELFHTTPHandler(BaseGELFHandler):
    """GELF HTTP handler"""

    def __init__(self, host, port=12203, compress=True, path='/gelf',
                 timeout=5, **kwargs):
        """Initialize the GELFHTTPHandler

        :param host: GELF HTTP input host.
        :type host: str

        :param port: GELF HTTP input port.
        :type port: int

        :param compress: If :obj:`True` compress the GELF message before
            sending it to the Graylog server.
        :type compress: bool

        :param path: Path of the HTTP input.
            (see http://docs.graylog.org/en/latest/pages/sending_data.html#gelf-via-http)
        :type path: str

        :param timeout: Number of seconds the HTTP client should wait before
            it discards the request if the Graylog server doesn't respond.
        :type timeout: int
        """
        BaseGELFHandler.__init__(self, compress=compress, **kwargs)

        self.host = host
        self.port = port
        self.path = path
        self.timeout = timeout
        self.headers = {}

        # announce the compressed body to the server
        if compress:
            self.headers['Content-Encoding'] = 'gzip,deflate'

    def emit(self, record):
        """Convert a :class:`logging.LogRecord` to GELF and emit it to Graylog
        via a HTTP POST request

        A new connection is opened for every record and is always closed
        again once the request was sent (or failed), instead of leaving the
        socket to be released by garbage collection. Errors raised while
        connecting or sending propagate to the caller.

        :param record: :class:`logging.LogRecord` to convert into a GELF log
            and emit to Graylog via a HTTP POST request.
        :type record: logging.LogRecord
        """
        payload = self.makePickle(record)
        connection = httplib.HTTPConnection(
            host=self.host,
            port=self.port,
            timeout=self.timeout
        )
        try:
            connection.request('POST', self.path, payload, self.headers)
        finally:
            connection.close()
class ChunkedGELF(object):
    """Class that chunks a message into a GELF compatible chunks"""

    def __init__(self, message, size):
        """Initialize the ChunkedGELF message class

        :param message: The message to chunk.
        :type message: bytes

        :param size: The size of the chunks.
        :type size: int
        """
        self.message = message
        self.size = size

        # total number of chunks, packed as a single unsigned byte
        chunk_count = int(math.ceil(len(message) * 1.0 / size))
        self.pieces = struct.pack('B', chunk_count)

        # random 8 byte identifier shared by all chunks of this message
        message_id = random.randint(0, 0xFFFFFFFFFFFFFFFF)
        self.id = struct.pack('Q', message_id)

    def message_chunks(self):
        """Split the message into consecutive slices of ``self.size`` bytes

        :return: generator over the message slices, in order.
        """
        offsets = range(0, len(self.message), self.size)
        return (self.message[start:start + self.size] for start in offsets)

    def encode(self, sequence, chunk):
        """Prefix a message slice with the chunk header

        The header consists of the two bytes ``0x1e 0x0f``, the message
        id, the sequence number and the total number of chunks.

        :param sequence: zero based index of the chunk.
        :type sequence: int

        :param chunk: message slice to wrap.
        :type chunk: bytes

        :return: the encoded chunk.
        :rtype: bytes
        """
        sequence_byte = struct.pack('B', sequence)
        parts = (b'\x1e\x0f', self.id, sequence_byte, self.pieces, chunk)
        return b''.join(parts)

    def __iter__(self):
        """Yield every encoded chunk of the message in sequence order"""
        sequence = 0
        for chunk in self.message_chunks():
            yield self.encode(sequence, chunk)
            sequence += 1