|
1 |
| -#!/usr/bin/env python3 |
2 |
| -"""IRCStream — MediaWiki RecentChanges → IRC gateway. |
| 1 | +"""IRC Server component. |
3 | 2 |
|
4 |
| -IRCStream is a simple gateway to the MediaWiki recent changes feed, from the |
5 |
| -IRC protocol. It was written mainly for compatibility reasons, as there are a |
6 |
| -number of legacy clients in the wild relying on this interface. |
| 3 | +This implements an IRC Server, capable of broadcasting messages to clients |
| 4 | +through a "fake" IRC bot. |
| 5 | +
|
| 6 | +Implements all the relevant bits of the IRC protocol, including a few IRCv3 |
| 7 | +extensions. |
7 | 8 | """
|
8 | 9 |
|
9 |
| -# Copyright © Faidon Liambotis |
10 |
| -# Copyright © Wikimedia Foundation, Inc. |
11 |
| -# |
12 |
| -# Licensed under the Apache License, Version 2.0 (the "License"); |
13 |
| -# you may not use this file except in compliance with the License. |
14 |
| -# You may obtain a copy of the License at |
15 |
| -# |
16 |
| -# http://www.apache.org/licenses/LICENSE-2.0 |
17 |
| -# |
18 |
| -# Unless required by applicable law or agreed to in writing, software |
19 |
| -# distributed under the License is distributed on an "AS IS" BASIS, |
20 |
| -# WITHOUT WARRANTIES OR CONDITIONS OF ANY CODE, either express or implied. |
21 |
| -# See the License for the specific language governing permissions and |
22 |
| -# limitations under the License. |
23 |
| -# |
| 10 | +# SPDX-FileCopyrightText: Faidon Liambotis |
| 11 | +# SPDX-FileCopyrightText: Wikimedia Foundation |
24 | 12 | # SPDX-License-Identifier: Apache-2.0
|
25 | 13 |
|
26 | 14 | from __future__ import annotations
|
27 | 15 |
|
28 |
| -__version__ = "0.12.0.dev0" |
29 |
| - |
30 |
| -import argparse |
31 | 16 | import asyncio
|
32 | 17 | import configparser
|
33 | 18 | import dataclasses
|
34 | 19 | import datetime
|
35 | 20 | import enum
|
36 | 21 | import errno
|
37 |
| -import http.server |
38 |
| -import logging |
39 |
| -import pathlib |
40 | 22 | import re
|
41 | 23 | import socket
|
42 |
| -import sys |
43 | 24 | from collections.abc import Iterable, Sequence
|
44 | 25 | from typing import Any
|
45 | 26 |
|
46 | 27 | import prometheus_client
|
47 | 28 | import structlog
|
48 | 29 | from prometheus_client import Counter, Gauge
|
49 | 30 |
|
| 31 | +from ._version import __version__ |
| 32 | + |
50 | 33 |
|
51 | 34 | class IRCNumeric(enum.Enum):
|
52 | 35 | """Base class for IRC numeric enums."""
|
@@ -833,194 +816,3 @@ async def broadcast(self, target: str, msg: str) -> None:
|
833 | 816 | self.log.debug("Unable to broadcast", exc_info=True)
|
834 | 817 | continue # ignore exceptions, to catch corner cases
|
835 | 818 | self.metrics["messages"].inc()
|
836 |
| - |
837 |
| - |
838 |
| -class RC2UDPHandler(asyncio.Protocol): |
839 |
| - """A handler implementing the RC2UDP protocol, as used by MediaWiki.""" |
840 |
| - |
841 |
| - log = structlog.get_logger("ircstream.rc2udp") |
842 |
| - |
843 |
| - def __init__(self, server: RC2UDPServer) -> None: |
844 |
| - self.server = server |
845 |
| - self.running_tasks: set[asyncio.Task[Any]] = set() |
846 |
| - |
847 |
| - def datagram_received(self, data: bytes, _: tuple[str, int]) -> None: |
848 |
| - """Receive a new RC2UDP message and broadcast to all clients.""" |
849 |
| - try: |
850 |
| - decoded = data.decode("utf8") |
851 |
| - channel, text = decoded.split("\t", maxsplit=1) |
852 |
| - channel = channel.strip() |
853 |
| - text = text.lstrip().replace("\r", "").replace("\n", "") |
854 |
| - except Exception: # pylint: disable=broad-except |
855 |
| - self.server.ircserver.metrics["errors"].labels("rc2udp-parsing").inc() |
856 |
| - return |
857 |
| - |
858 |
| - self.log.debug("Broadcasting message", channel=channel, message=text) |
859 |
| - task = asyncio.create_task(self.server.ircserver.broadcast(channel, text)) |
860 |
| - self.running_tasks.add(task) |
861 |
| - task.add_done_callback(self.running_tasks.discard) |
862 |
| - |
863 |
| - |
864 |
| -class RC2UDPServer: # pylint: disable=too-few-public-methods |
865 |
| - """A server implementing the RC2UDP protocol, as used by MediaWiki.""" |
866 |
| - |
867 |
| - log = structlog.get_logger("ircstream.rc2udp") |
868 |
| - |
869 |
| - def __init__(self, config: configparser.SectionProxy, ircserver: IRCServer) -> None: |
870 |
| - self.ircserver = ircserver |
871 |
| - self.address = config.get("listen_address", fallback="::") |
872 |
| - self.port = config.getint("listen_port", fallback=9390) |
873 |
| - |
874 |
| - async def serve(self) -> None: |
875 |
| - """Create a new socket, listen to it and serve requests.""" |
876 |
| - loop = asyncio.get_running_loop() |
877 |
| - local_addr = (self.address, self.port) |
878 |
| - transport, _ = await loop.create_datagram_endpoint(lambda: RC2UDPHandler(self), local_addr=local_addr) |
879 |
| - local_addr = transport.get_extra_info("sockname")[:2] |
880 |
| - self.address, self.port = local_addr # update address/port based on what bind() returned |
881 |
| - self.log.info("Listening for RC2UDP broadcast", listen_address=self.address, listen_port=self.port) |
882 |
| - |
883 |
| - |
884 |
| -class PrometheusServer(http.server.ThreadingHTTPServer): |
885 |
| - """A Prometheus HTTP server.""" |
886 |
| - |
887 |
| - log = structlog.get_logger("ircstream.prometheus") |
888 |
| - daemon_threads = True |
889 |
| - allow_reuse_address = True |
890 |
| - |
891 |
| - def __init__( |
892 |
| - self, |
893 |
| - config: configparser.SectionProxy, |
894 |
| - registry: prometheus_client.CollectorRegistry = prometheus_client.REGISTRY, |
895 |
| - ) -> None: |
896 |
| - listen_address = config.get("listen_address", fallback="::") |
897 |
| - if ":" in listen_address: |
898 |
| - self.address_family = socket.AF_INET6 |
899 |
| - listen_port = config.getint("listen_port", fallback=9200) |
900 |
| - super().__init__((listen_address, listen_port), prometheus_client.MetricsHandler.factory(registry)) |
901 |
| - # update address/port based on what bind() returned |
902 |
| - self.address, self.port = str(self.server_address[0]), self.server_address[1] |
903 |
| - self.log.info("Listening for Prometheus HTTP", listen_address=self.address, listen_port=self.port) |
904 |
| - |
905 |
| - def server_bind(self) -> None: |
906 |
| - """Bind to an IP address. |
907 |
| -
|
908 |
| - Override to set an opt to listen to both IPv4/IPv6 on the same socket. |
909 |
| - """ |
910 |
| - if self.address_family == socket.AF_INET6: |
911 |
| - self.socket.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, 0) |
912 |
| - super().server_bind() |
913 |
| - |
914 |
| - |
915 |
| -def parse_args(argv: Sequence[str] | None) -> argparse.Namespace: |
916 |
| - """Parse and return the parsed command line arguments.""" |
917 |
| - parser = argparse.ArgumentParser( |
918 |
| - prog="ircstream", |
919 |
| - description="MediaWiki RecentChanges → IRC gateway", |
920 |
| - formatter_class=argparse.ArgumentDefaultsHelpFormatter, |
921 |
| - ) |
922 |
| - cfg_dflt = pathlib.Path("ircstream.conf") |
923 |
| - if not cfg_dflt.exists(): |
924 |
| - cfg_dflt = pathlib.Path("/etc/ircstream.conf") |
925 |
| - parser.add_argument("--config-file", "-c", type=pathlib.Path, default=cfg_dflt, help="Path to configuration file") |
926 |
| - |
927 |
| - log_levels = ("DEBUG", "INFO", "WARNING", "ERROR") # no public method to get a list from logging :( |
928 |
| - parser.add_argument("--log-level", default="INFO", choices=log_levels, type=str.upper, help="Log level") |
929 |
| - log_formats = ("plain", "console", "json") |
930 |
| - log_dflt = "console" if sys.stdout.isatty() else "plain" |
931 |
| - parser.add_argument("--log-format", default=log_dflt, choices=log_formats, help="Log format") |
932 |
| - return parser.parse_args(argv) |
933 |
| - |
934 |
| - |
935 |
| -def configure_logging(log_format: str = "plain") -> None: |
936 |
| - """Configure logging parameters.""" |
937 |
| - logging.basicConfig(format="%(message)s", level=logging.WARNING) |
938 |
| - |
939 |
| - processors: list[structlog.types.Processor] = [ |
940 |
| - structlog.stdlib.filter_by_level, |
941 |
| - structlog.processors.StackInfoRenderer(), |
942 |
| - structlog.processors.format_exc_info, |
943 |
| - ] |
944 |
| - if log_format == "plain": |
945 |
| - processors += [structlog.dev.ConsoleRenderer(colors=False)] |
946 |
| - elif log_format == "console": |
947 |
| - # >= 20.2.0 has this in the default config |
948 |
| - processors = [structlog.stdlib.add_log_level] + structlog.get_config()["processors"] |
949 |
| - elif log_format == "json": |
950 |
| - processors += [ |
951 |
| - structlog.stdlib.add_logger_name, # adds a "logger" key |
952 |
| - structlog.stdlib.add_log_level, # adds a "level" key (string, not int) |
953 |
| - structlog.processors.TimeStamper(fmt="iso"), |
954 |
| - structlog.processors.JSONRenderer(sort_keys=True), |
955 |
| - ] |
956 |
| - else: |
957 |
| - raise ValueError(f"Invalid logging format specified: {log_format}") |
958 |
| - |
959 |
| - structlog.configure( |
960 |
| - processors=processors, |
961 |
| - logger_factory=structlog.stdlib.LoggerFactory(), |
962 |
| - wrapper_class=structlog.stdlib.BoundLogger, |
963 |
| - ) |
964 |
| - |
965 |
| - |
966 |
| -async def start_servers(config: configparser.ConfigParser) -> None: |
967 |
| - """Start all servers in asyncio tasks or threads and then busy-loop.""" |
968 |
| - log = structlog.get_logger("ircstream.main") |
969 |
| - loop = asyncio.get_running_loop() |
970 |
| - |
971 |
| - try: |
972 |
| - if "irc" in config: |
973 |
| - ircserver = IRCServer(config["irc"]) |
974 |
| - irc_coro = ircserver.serve() |
975 |
| - irc_task = asyncio.create_task(irc_coro) |
976 |
| - else: |
977 |
| - log.critical('Invalid configuration, missing section "irc"') |
978 |
| - raise SystemExit(-1) |
979 |
| - |
980 |
| - if "rc2udp" in config: |
981 |
| - rc2udp_coro = RC2UDPServer(config["rc2udp"], ircserver).serve() |
982 |
| - rc2udp_task = asyncio.create_task(rc2udp_coro) # noqa: F841 pylint: disable=unused-variable |
983 |
| - else: |
984 |
| - log.warning("RC2UDP is not enabled in the config; server usefulness may be limited") |
985 |
| - |
986 |
| - if "prometheus" in config: |
987 |
| - prom_server = PrometheusServer(config["prometheus"], ircserver.metrics_registry) |
988 |
| - prom_server.socket.setblocking(False) |
989 |
| - loop.add_reader(prom_server.socket, prom_server.handle_request) |
990 |
| - |
991 |
| - await asyncio.wait_for(irc_task, timeout=None) # run forever |
992 |
| - except OSError as exc: |
993 |
| - log.critical(f"System error: {exc.strerror}", errno=errno.errorcode[exc.errno]) |
994 |
| - raise SystemExit(-2) from exc |
995 |
| - |
996 |
| - |
997 |
| -def main(argv: Sequence[str] | None = None) -> None: |
998 |
| - """Entry point.""" |
999 |
| - options = parse_args(argv) |
1000 |
| - |
1001 |
| - configure_logging(options.log_format) |
1002 |
| - # only set e.g. INFO or DEBUG for our own loggers |
1003 |
| - structlog.get_logger("ircstream").setLevel(options.log_level) |
1004 |
| - log = structlog.get_logger("ircstream.main") |
1005 |
| - log.info("Starting IRCStream", config_file=str(options.config_file), version=__version__) |
1006 |
| - |
1007 |
| - config = configparser.ConfigParser(strict=True) |
1008 |
| - try: |
1009 |
| - with options.config_file.open(encoding="utf-8") as config_fh: |
1010 |
| - config.read_file(config_fh) |
1011 |
| - except OSError as exc: |
1012 |
| - log.critical(f"Cannot open configuration file: {exc.strerror}", errno=errno.errorcode[exc.errno]) |
1013 |
| - raise SystemExit(-1) from exc |
1014 |
| - except configparser.Error as exc: |
1015 |
| - msg = repr(exc).replace("\n", " ") # configparser exceptions sometimes include newlines |
1016 |
| - log.critical(f"Invalid configuration, {msg}") |
1017 |
| - raise SystemExit(-1) from exc |
1018 |
| - |
1019 |
| - try: |
1020 |
| - asyncio.run(start_servers(config)) |
1021 |
| - except KeyboardInterrupt: |
1022 |
| - pass |
1023 |
| - |
1024 |
| - |
1025 |
| -if __name__ == "__main__": |
1026 |
| - main() |
0 commit comments