#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""Logging Handler integrating RabbitMQ and
Graylog Extended Log Format (GELF)"""
import json
from logging import Filter
from logging.handlers import SocketHandler
from amqplib import client_0_8 as amqp # pylint: disable=import-error
from graypy.handler import BaseGELFHandler
try:
from urllib.parse import urlparse, unquote
except ImportError:
from urlparse import urlparse
from urllib import unquote
_ifnone = lambda v, x: x if v is None else v
[docs]class GELFRabbitHandler(BaseGELFHandler, SocketHandler):
"""RabbitMQ / GELF handler
.. note::
This handler ignores all messages logged by amqplib.
"""
[docs] def __init__(self, url, exchange='logging.gelf', exchange_type='fanout',
virtual_host='/', routing_key='', **kwargs):
"""Initialize the GELFRabbitHandler
:param url: RabbitMQ URL (ex: amqp://guest:guest@localhost:5672/)
:type url: str
:param exchange: RabbitMQ exchange. A queue binding must be defined
on the server to prevent GELF logs from being dropped.
:type exchange: str
:param exchange_type: RabbitMQ exchange type.
:type exchange_type: str
:param virtual_host:
:type virtual_host: str
:param routing_key:
:type routing_key: str
"""
self.url = url
parsed = urlparse(url)
if parsed.scheme != 'amqp':
raise ValueError('invalid URL scheme (expected "amqp"): %s' % url)
host = parsed.hostname or 'localhost'
port = _ifnone(parsed.port, 5672)
self.virtual_host = virtual_host if not unquote(
parsed.path[1:]) else unquote(parsed.path[1:])
self.cn_args = {
'host': '%s:%s' % (host, port),
'userid': _ifnone(parsed.username, 'guest'),
'password': _ifnone(parsed.password, 'guest'),
'virtual_host': self.virtual_host,
'insist': False,
}
self.exchange = exchange
self.exchange_type = exchange_type
self.routing_key = routing_key
BaseGELFHandler.__init__(
self,
**kwargs
)
SocketHandler.__init__(self, host, port)
self.addFilter(ExcludeFilter('amqplib'))
[docs] def makeSocket(self, timeout=1):
return RabbitSocket(self.cn_args, timeout, self.exchange,
self.exchange_type, self.routing_key)
[docs] def makePickle(self, record):
message_dict = self._make_gelf_dict(record)
return json.dumps(message_dict)
[docs]class RabbitSocket(object):
[docs] def __init__(self, cn_args, timeout, exchange, exchange_type, routing_key):
self.cn_args = cn_args
self.timeout = timeout
self.exchange = exchange
self.exchange_type = exchange_type
self.routing_key = routing_key
self.connection = amqp.Connection(
connection_timeout=timeout, **self.cn_args)
self.channel = self.connection.channel()
self.channel.exchange_declare(
exchange=self.exchange,
type=self.exchange_type,
durable=True,
auto_delete=False,
)
[docs] def sendall(self, data):
msg = amqp.Message(data, delivery_mode=2)
self.channel.basic_publish(
msg,
exchange=self.exchange,
routing_key=self.routing_key
)
[docs] def close(self):
"""Close the connection to the RabbitMQ socket"""
try:
self.connection.close()
except Exception:
pass
[docs]class ExcludeFilter(Filter):
"""A subclass of :class:`logging.Filter` which should be instantiated
with the name of the logger which, together with its children, will have
its events excluded (filtered out)"""
[docs] def __init__(self, name):
"""Initialize the ExcludeFilter
:param name: Name to match for within a :class:`logging.LogRecord`'s
``name`` field for filtering.
:type name: str
"""
if not name:
raise ValueError('ExcludeFilter requires a non-empty name')
Filter.__init__(self, name)
[docs] def filter(self, record):
return not (record.name.startswith(self.name) and (
len(record.name) == self.nlen or record.name[self.nlen] == "."))