#!/usr/bin/python3.13

# Copyright © Universität Stuttgart
# LICENSE: MIT

from __future__ import annotations

import abc
import argparse
import atexit
import contextlib
import dataclasses
import datetime
import enum
import functools
import inspect
import io
import json
import os
import os.path
import pathlib
import shlex
import pyparsing
import re
import readline
import socket
import subprocess
import sys
import traceback
import trio
import typing
import uuid
import yaml

T = typing.TypeVar('T')


@dataclasses.dataclass(slots=True)
class Config:
    DEFAULT_FORWARD_NOTE: str = """\
The postmaster decided you should know about the following mails in
the queue, probably because you need to fix something in your setup.

The postmaster will probably delete the queued message afterwards,
or have them expire (i.e. send a bounce message about failed delivery
to the sender).
"""

    # if not remote_sources are configured -> use local queue
    remote_sources: dict[str, str] = dataclasses.field(default_factory=dict)
    forward_note: str = DEFAULT_FORWARD_NOTE
    forward_sender: str = 'MAILER-DAEMON'
    forward_sender_name: str = 'Mail Delivery System'

    @staticmethod
    def load(filename: str = '') -> Config:
        config = Config()

        if not filename:
            default_paths = [
                os.path.expanduser('~/.config/pqm.yaml'),
                '/etc/pqm.yaml',
            ]
            for path in default_paths:
                if os.path.exists(path):
                    filename = path
                    break
            else:
                # no configfile -> default config
                return config
        with open(filename) as file:
            data = yaml.safe_load(file)
        assert isinstance(data, dict), f"Config file (yaml) doesn't contain a dict: {data!r}"

        rs_list = data.pop('remote-sources', [])
        assert isinstance(rs_list, list), f"remote-source must be a list of strings: {rs_list!r}"
        for entry in rs_list:
            assert isinstance(entry, str), f"remote-source entries must be strings: {entry!r}"
            if ':' in entry:
                alias, target = entry.split(':', maxsplit=1)
            else:
                target = entry
                alias = entry.split('.', maxsplit=1)[0]
            assert not '/' in alias, f"Alias for entry must not contain /: {alias!r}"
            if alias in config.remote_sources:
                raise ValueError(
                    f'Duplicate alias {alias!r} in remote-sources'
                    f' (have: {config.remote_sources[alias]!r}, new: {target!r}',
                )
            config.remote_sources[alias] = target

        config.forward_note = data.pop('forward-note', config.forward_note)
        config.forward_sender = data.pop('forward-sender', config.forward_sender)
        config.forward_sender_name = data.pop('forward-sender-name', config.forward_sender_name)

        assert not data, f"Unknown config options: {data}"

        return config

    def forward_build_from(self, *, hostname: str) -> str:
        if '@' in self.forward_sender:
            sender = self.forward_sender
        else:
            sender = f'{self.forward_sender}@{hostname}'
        return f'{self.forward_sender_name} <{sender}>'

    def source(self) -> Source:
        if self.remote_sources:
            return RemoteSource(config=self, remotes=self.remote_sources)
        else:
            return LocalSource(config=self)


def input_ack(prompt: str) -> str:
    """Write `prompt` to stdout and read a short (max 16 chars) ack line from stdin.

    Deliberately bypasses input(), which always goes through readline —
    not wanted for simple acknowledgements.
    """
    stdout = sys.stdout
    stdout.write(prompt)
    stdout.flush()
    return sys.stdin.readline(16)


def parser_action(act: typing.Callable[[pyparsing.ParseResults], T]) -> typing.Callable[[pyparsing.ParseResults], T]:
    """Wrap a custom pyparsing parse action so any exception aborts parsing.

    Exceptions raised inside the action are converted to
    `ParseFatalException`, which stops parsing immediately instead of
    triggering backtracking.
    """
    @functools.wraps(act)
    def wrapped(tokens: pyparsing.ParseResults):
        try:
            return act(tokens)
        except Exception as e:
            # chain the original exception so tracebacks keep the real cause
            raise pyparsing.ParseFatalException(str(e)) from e

    return wrapped


def format_mail_forward(
    *,
    config: Config,
    mails: dict[str, tuple[Mail, bytes | None]],
    hostname: str,
    recipients: typing.Iterable[str],
) -> bytes:
    """Build a multipart/mixed message forwarding queued mails to `recipients`.

    `mails` maps queue id -> (Mail metadata, raw message bytes or None when
    the message couldn't be fetched).  Returns the complete message as
    bytes, ready to be piped into sendmail.
    """
    # text headers/parts go through `buf`; raw message bytes are written
    # directly to `bin_buf` (write_through=True keeps the text layer in sync)
    bin_buf = io.BytesIO()
    buf = io.TextIOWrapper(bin_buf, write_through=True)
    buf.write(f'From: {config.forward_build_from(hostname=hostname)}\n')
    # NOTE(review): emits one To: header per recipient; RFC 5322 expects a
    # single To: field with a comma-separated list — confirm the local MTA
    # accepts this.
    for recp in recipients:
        buf.write(f'To: <{recp}>\n')
    buf.write(f"Subject: Deferred mails on {hostname}\n")
    buf.write("MIME-Version: 1.0\n")
    # random boundary; collision with message content is considered unlikely
    boundary = f"postmaster-forward-{uuid.uuid4().hex}"
    buf.write(f"Content-Type: multipart/mixed; boundary=\"{boundary}\"\n")
    buf.write("\n")
    buf.write("This is a message with multiple parts in MIME format.\n")

    # part 1: the configured explanation note
    buf.write(f"--{boundary}\n")
    buf.write("Content-Type: text/plain\n")
    buf.write("\n")
    buf.write(config.forward_note)
    # ensure the note ends with a newline so the next boundary starts a line
    if bytes(bin_buf.getbuffer()[-1:]) != b'\n':
        buf.write("\n")

    # part 2: textual queue listing of all forwarded mails
    buf.write(f"--{boundary}\n")
    buf.write("Content-Type: text/plain\n")
    buf.write("Content-Disposition: inline; filename=\"queue-list.txt\"\n")
    buf.write("\n")
    for queue_id, (mail, _) in mails.items():
        mail.print(verbose=True, out=buf)

    # one attachment part per forwarded mail
    for queue_id, (_, mail_message) in mails.items():
        basename = queue_id.replace('/', '_')
        if mail_message is None:
            # this should be very unlikely unless you try to forward messages from the active queue
            buf.write(f"--{boundary}\n")
            buf.write("Content-Type: text/plain\n")
            buf.write(f"Content-Disposition: inline; filename=\"{basename}.txt\"\n")
            buf.write("\n")
            buf.write(f"Message {queue_id} couldn't be found (anymore); may have been sent.")
            # bugfix: terminate this part with a newline — previously the next
            # boundary was emitted on the same line, producing invalid MIME
            buf.write("\n")
            continue
        buf.write(f"--{boundary}\n")
        buf.write("Content-Type: message/rfc822\n")
        buf.write(f"Content-Disposition: inline; filename=\"{basename}.eml\"\n")
        buf.write("\n")
        # raw bytes bypass the text wrapper
        bin_buf.write(mail_message)
        # ensure parts are terminated by newline
        if bytes(bin_buf.getbuffer()[-1:]) != b'\n':
            buf.write("\n")

    # closing boundary
    buf.write(f"--{boundary}--\n")
    return bin_buf.getvalue()


class TrioParallelOrdered(typing.Generic[T]):
    # used by trio_parallel_ordered below
    #
    # Runs jobs concurrently (bounded by a CapacityLimiter of 8), but delivers
    # their results to `result_handler` strictly in the order the jobs were
    # started, independent of completion order.

    def __init__(self, nursery: trio.Nursery, result_handler: typing.Callable[[T], typing.Awaitable[None]]) -> None:
        # basic idea: for each task create a oneshot channel (capacity 1) to deliver result;
        # put receiving channel into self._tx (-> ordered by task start time)
        # in _handle_results: get result channel (still ordered), then wait for result
        self.nursery = nursery
        self.result_handler = result_handler
        # at most 8 jobs actually run concurrently
        self.limit = trio.CapacityLimiter(8)
        send_channel: trio.MemorySendChannel[trio.MemoryReceiveChannel[T]]
        receive_channel: trio.MemoryReceiveChannel[trio.MemoryReceiveChannel[T]]
        # queue of pending per-job result channels (up to 128 jobs "in flight")
        (send_channel, receive_channel) = trio.open_memory_channel(128)
        self._tx = send_channel

        async def _handle_results() -> None:
            # consume per-job channels in start order; loop ends once
            # self._tx is closed (see close()) and the buffer drains
            item: trio.MemoryReceiveChannel[T]
            async for item in receive_channel:
                try:
                    result = await item.receive()
                except trio.EndOfChannel:
                    # job's send side was closed without delivering a result
                    continue
                await self.result_handler(result)

        self.nursery.start_soon(_handle_results)

    async def close(self) -> None:
        # no more jobs will be started; lets _handle_results terminate
        await self._tx.aclose()

    async def _spawn(self, job, args, kwargs, *, task_status=trio.TASK_STATUS_IGNORED) -> None:
        tx: trio.MemorySendChannel[T]
        rx: trio.MemoryReceiveChannel[T]
        (tx, rx) = trio.open_memory_channel(1)
        # register the result channel before running -> preserves start order
        await self._tx.send(rx)
        async with self.limit:
            # only report "started" after acquiring a limiter slot, so
            # start() below backpressures the caller
            task_status.started()
            async with tx:
                result = await job(*args, **kwargs)
                await tx.send(result)

    async def start(self, job, *args, **kwargs) -> None:
        # waits until the job acquired a limiter slot (see _spawn)
        await self.nursery.start(self._spawn, job, args, kwargs)


# async def handle_result_of_some_job_function(result) -> None: ...
# async def some_job_function(foo) -> ResultType: ...
# async with trio_parallel_ordered(handle_result_of_some_job_function) as tpo:
#     for foo in biglist:
#         await tpo.start(some_job_function, foo)
@contextlib.asynccontextmanager
async def trio_parallel_ordered(
    result_handler: typing.Callable[[T], typing.Awaitable[None]],
) -> typing.AsyncGenerator[TrioParallelOrdered[T], None]:
    """Run jobs in parallel while delivering results to `result_handler` in start order."""
    async with trio.open_nursery() as nursery:
        tpo = TrioParallelOrdered(nursery=nursery, result_handler=result_handler)
        yield tpo
        # signal "no more jobs" so the internal result loop (and the nursery) can exit
        await tpo.close()


@dataclasses.dataclass(slots=True)
class Recipient:
    """Recipient in a postfix mail"""
    address: str
    # "error message" from last delivery attempt
    delay_reason: str

    @staticmethod
    def from_json(d: dict) -> Recipient:
        return Recipient(
            address=d['address'],
            delay_reason=d.get('delay_reason', ''),
        )


class QueueName(enum.Enum):
    """Names of the postfix queues (values as reported by `postqueue -j`)."""
    MAILDROP = "maildrop"
    HOLD = "hold"
    INCOMING = "incoming"
    ACTIVE = "active"
    DEFERRED = "deferred"
    CORRUPT = "corrupt"


# set of all queues; used as the "unrestricted" result of Filter.possible_queues()
ALL_QUEUE_NAMES: set[QueueName] = set(QueueName)


def json_decode_stream(data: str):
    """Yield each json document from a string of concatenated documents."""
    decoder = json.JSONDecoder()
    # ignore any trailing whitespace entirely
    end = len(data.rstrip())
    pos = 0
    while pos < end:
        if data[pos].isspace():
            # skip whitespace between documents
            pos += 1
            continue
        obj, pos = decoder.raw_decode(data, pos)
        yield obj


@dataclasses.dataclass(slots=True)
class Mail:
    """Metadata for mail in postfix queue"""
    # queue the mail currently sits in
    queue_name: QueueName
    # postfix queue id; RemoteSource prefixes it with "alias/"
    queue_id: str
    # naive local time (fromtimestamp() without tz)
    arrival_time: datetime.datetime
    message_size: int
    sender: str
    recipients: list[Recipient]
    # forced_expire: bool

    @staticmethod
    def from_json(d: dict) -> Mail:
        """Build a Mail from a single `postqueue -j` json object."""
        return Mail(
            queue_name=QueueName(d['queue_name']),
            queue_id=d['queue_id'],
            arrival_time=datetime.datetime.fromtimestamp(d['arrival_time']),
            message_size=d['message_size'],
            sender=d['sender'],
            recipients=[Recipient.from_json(recp) for recp in d['recipients']],
        )

    @staticmethod
    def read_postqueue_json(data: str, id_prefix: str = '') -> list[Mail]:
        """Parse the concatenated-json output of `postqueue -j` into Mails.

        `id_prefix` is prepended to every queue id (used to tag the origin host).
        """
        queue = []
        for obj in json_decode_stream(data):
            mail = Mail.from_json(obj)
            mail.queue_id = id_prefix + mail.queue_id
            queue.append(mail)
        return queue

    def print(self, *, verbose: bool, out: typing.TextIO = sys.stdout) -> None:
        """Write a queue listing entry for this mail to `out`.

        Verbose mode lists every recipient (plus delay reason, if any) on
        separate lines; otherwise a single line naming only the last
        recipient is printed.
        """
        # one-char queue marker appended to the queue id
        # (CLI.QUEUE_FLAGS is defined elsewhere in this file)
        flag = CLI.QUEUE_FLAGS.get(self.queue_name, ' ')
        if verbose:
            print(
                f"{self.queue_id + flag:<17s} {self.message_size:>8d} {self.arrival_time:%a %b %d %H:%M:%S} {self.sender:<60s}",
                file=out,
            )
            if not self.recipients:
                print(f"{'':21}No recipients listed for this mail?", file=out)
            for recpt in self.recipients:
                print(f"{'':21}{recpt.address}", file=out)
                if recpt.delay_reason:
                    print(f"{'':29}{recpt.delay_reason}", file=out)
        else:
            cnt_recpts = len(self.recipients)
            if cnt_recpts:
                last_recpt = self.recipients[-1].address
                print(
                    f"{self.queue_id + flag:<17s} {self.message_size:>8d} {self.arrival_time:%a %b %d %H:%M:%S} "
                    f"{self.sender:<60s} (Targets: {cnt_recpts}, last: {last_recpt})",
                    file=out,
                )
            else:
                print(
                    f"{self.queue_id + flag:<17s} {self.message_size:>8d} {self.arrival_time:%a %b %d %H:%M:%S} "
                    f"{self.sender:<60s} (No recipients listed for this mail?)",
                    file=out,
                )


# abstract collection/cluster of postfix nodes (or just a single one)
class Source(abc.ABC):
    """Abstract interface to the postfix queue(s) of one or many servers."""

    __slots__ = ('config',)

    def __init__(self, *, config: Config) -> None:
        self.config = config

    @abc.abstractmethod
    def server_list(self) -> list[str]:
        """Names of all servers in this collection."""
        raise NotImplementedError()

    @typing.final
    @staticmethod
    def chunks(data: typing.Iterable[T], *, chunk_size: int = 512) -> typing.Generator[list[T], None, None]:
        """Split `data` into lists of at most `chunk_size` elements each."""
        if isinstance(data, list):
            # fast path: slice the list directly
            for offset in range(0, len(data), chunk_size):
                yield data[offset:offset + chunk_size]
            return
        batch: list[T] = []
        for element in data:
            batch.append(element)
            if len(batch) >= chunk_size:
                yield batch
                batch = []
        if batch:
            yield batch

    @abc.abstractmethod
    async def get_list(self) -> list[Mail]:
        """Retrieve the list of mails in all queues."""
        raise NotImplementedError()

    @abc.abstractmethod
    async def get_mail(self, queue_id: str, *, flags: typing.Iterable[str] = ()) -> tuple[bytes | None, bytes]:
        """Fetch a single mail via postcat; `flags` are passed as postcat arguments.

        Returns (stdout, stderr) of postcat.
        """
        raise NotImplementedError()

    @abc.abstractmethod
    async def flush_all(self) -> None:
        """Flush the queue: attempt to deliver all queued mail."""
        # postqueue -f
        raise NotImplementedError()

    @abc.abstractmethod
    async def flush(self, queue_ids: typing.Iterable[str]) -> None:
        """Schedule immediate delivery of deferred mail with the specified queue IDs."""
        # postqueue -i MSG1 -i MSG2 -i ...
        raise NotImplementedError()

    @abc.abstractmethod
    async def _postsuper(
        self,
        action: str,
        queue_ids: typing.Iterable[str],
        *,
        from_queues: set[QueueName] | tuple[()] = (),
    ):
        """Call `postsuper`; `action` is the command line flag for the action to take."""
        raise NotImplementedError()

    async def delete(self, queue_ids: typing.Iterable[str], *, from_queues: set[QueueName] | tuple[()] = ()) -> None:
        """
        Delete one message with the named queue ID from the named mail queue(s) (default: hold, incoming, active and deferred).
        """
        await self._postsuper('-d', queue_ids=queue_ids, from_queues=from_queues)

    async def hold(self, queue_ids: typing.Iterable[str], from_queues: set[QueueName] | tuple[()] = ()) -> None:
        """
        Put mail "on hold" so that no attempt is made to deliver it.
        Move one message with the named queue ID from the named mail queue(s) (default: incoming, active and deferred) to the hold queue.
        """
        await self._postsuper('-h', queue_ids=queue_ids, from_queues=from_queues)

    async def release(self, queue_ids: typing.Iterable[str], from_queues: set[QueueName] | tuple[()] = ()) -> None:
        """
        Release mail that was put "on hold".
        Move one message with the named queue ID from the named mail queue(s) (default: hold) to the deferred queue.
        """
        await self._postsuper('-H', queue_ids=queue_ids, from_queues=from_queues)

    async def requeue(self, queue_ids: typing.Iterable[str], from_queues: set[QueueName] | tuple[()] = ()) -> None:
        """
        Requeue the message with the named queue ID from the named mail queue(s) (default: hold, incoming, active and deferred).
        """
        await self._postsuper('-r', queue_ids=queue_ids, from_queues=from_queues)

    async def expire(self, queue_ids: typing.Iterable[str], from_queues: set[QueueName] | tuple[()] = ()) -> None:
        """
        Expire the message with the named queue ID from the named mail queue(s) (default: hold, incoming, active and deferred).
        """
        await self._postsuper('-e', queue_ids=queue_ids, from_queues=from_queues)

    async def expire_release(self, queue_ids: typing.Iterable[str], from_queues: set[QueueName] | tuple[()] = ()) -> None:
        """
        Expire and release (if in hold) the message with the named queue ID from the named mail queue(s)
        (default queues: hold, incoming, active and deferred).
        """
        await self._postsuper('-f', queue_ids=queue_ids, from_queues=from_queues)

    @abc.abstractmethod
    async def forward_to(self, *, mails: typing.Iterable[Mail], recipients: list[str]) -> None:
        """Forward the given mails to `recipients` as one bundled message."""
        raise NotImplementedError()


class LocalSource(Source):
    """Queue source backed by the postfix instance running on this host."""

    __slots__ = ()

    def __init__(self, *, config: Config) -> None:
        super().__init__(config=config)

    def server_list(self) -> list[str]:
        return [socket.gethostname()]

    async def get_list(self) -> list[Mail]:
        """List all queued mails via `postqueue -j`."""
        res: subprocess.CompletedProcess = await trio.run_process(
            ['/usr/sbin/postqueue', '-j'],
            capture_stdout=True,
        )
        queue = Mail.read_postqueue_json(res.stdout.decode('utf-8'))
        queue = sorted(queue, key=lambda item: item.arrival_time)  # sort by incoming date, newest last
        return queue

    async def get_mail(self, queue_id: str, *, flags: typing.Iterable[str] = ()) -> tuple[bytes | None, bytes]:
        """stdout and stderr of postcat; stdout is None if we got non-zero status"""
        # NOTE(review): long queue ids (enable_long_queue_ids) may contain
        # lowercase letters; this pattern only accepts short-format ids — confirm.
        if not re.fullmatch(r'[A-Z0-9]+', queue_id):
            return (None, f"Invalid queue id: {queue_id!r} - expected /[A-Z0-9]+/".encode())
        # bugfix: flags used to be ' '.join()ed into one string and then split
        # via list(...), which produced single-character arguments ('-', 'h', ' ', ...)
        res: subprocess.CompletedProcess = await trio.run_process(
            ['/usr/sbin/postcat'] + list(flags) + ['-q', queue_id],
            capture_stdout=True,
            capture_stderr=True,
            check=False,
        )
        if res.returncode != 0:
            # expect some error was printed to stderr
            return (None, res.stderr)
        return (res.stdout, res.stderr)

    async def flush_all(self) -> None:
        """Attempt delivery of all queued mail (postqueue -f)."""
        await trio.run_process(['/usr/sbin/postqueue', '-f'])

    async def flush(self, queue_ids: typing.Iterable[str]) -> None:
        """Schedule immediate delivery for each of the given queue ids."""
        # postqueue only deals with single messages, because the flush daemon
        # only accepts a single (flush) request per connection...
        for msg in queue_ids:
            await trio.run_process(['/usr/sbin/postqueue', '-i', msg])

    async def _postsuper(
        self,
        action: str,
        queue_ids: typing.Iterable[str],
        *,
        from_queues: set[QueueName] | tuple[()] = (),
    ):
        """Run postsuper with `action`, feeding the queue ids on stdin."""
        cmd = ['/usr/sbin/postsuper', action, '-']
        if isinstance(from_queues, set):
            from_queues = from_queues - set((QueueName.CORRUPT,))  # remove "invalid" queue names
            if from_queues:
                cmd += [qn.value for qn in from_queues]
            else:
                # empty set of queues allowed -> no queue allowed
                return
        data = ''.join(f'{msg}\n' for msg in queue_ids)
        await trio.run_process(cmd, stdin=data.encode())

    async def forward_to(self, *, mails: typing.Iterable[Mail], recipients: list[str]) -> None:
        """Bundle the given mails into one multipart message and pipe it into sendmail."""
        mail_messages: dict[str, tuple[Mail, bytes | None]] = {}
        for mail in mails:
            (mail_message, err) = await self.get_mail(mail.queue_id, flags=['-h', '-b'])
            if mail_message is None:
                print(f"Failed to get mail {mail.queue_id}: {err.decode()}")
            mail_messages[mail.queue_id] = (mail, mail_message)
        if not mail_messages:
            print("No mails found to forward.")
            return
        message = format_mail_forward(config=self.config, mails=mail_messages, hostname=socket.getfqdn(), recipients=recipients)
        # -t: take recipients from the headers; '-f '': empty envelope sender
        cmd = ['sendmail', '-t', '-bm', '-f', '']
        await trio.run_process(cmd, stdin=message)


class RemoteSource(Source):
    """Queue source aggregating several postfix hosts, accessed via ssh as root.

    Queue ids are presented as "alias/QUEUEID", where alias is a key of `remotes`.
    """

    __slots__ = ('remotes',)

    def __init__(self, *, config: Config, remotes: dict[str, str]) -> None:
        # remotes: alias -> ssh hostname
        super().__init__(config=config)
        self.remotes = remotes

    def server_list(self) -> list[str]:
        return list(self.remotes)

    # split "remote mail id" into alias for remote and id on remote system
    @staticmethod
    def decode_queue_id(queue_id: str, *, allow_all: bool = False) -> tuple[str, str]:
        """Split "alias/QUEUEID" into (alias, local queue id).

        Raises ValueError for malformed ids; the special local id 'ALL' is
        only accepted with allow_all=True.
        """
        parts = queue_id.split('/', maxsplit=1)
        if len(parts) < 2:
            raise ValueError(f"Invalid mail id: {queue_id!r} - expected HOST/localid")
        host, queue_id = parts
        # NOTE(review): long queue ids (enable_long_queue_ids) may contain
        # lowercase letters; this pattern only accepts short-format ids — confirm.
        if not re.fullmatch(r'[A-Z0-9]+', queue_id):
            raise ValueError(f"Invalid queue id: {queue_id!r} - expected /[A-Z0-9]+/")
        if not allow_all and queue_id == 'ALL':
            raise ValueError(f"Invalid queue id: {queue_id!r} - ALL not allowed")
        return (host, queue_id)

    async def get_list(self) -> list[Mail]:
        """Fetch and merge the queues of all remotes (in parallel), sorted by arrival time."""
        queue: list[Mail] = []

        async def get_host(remote_id: str, remote_host: str) -> None:
            remote_id_prefix = remote_id + '/'
            res: subprocess.CompletedProcess = await trio.run_process(
                ['/usr/bin/ssh', '-oBatchMode=yes', 'root@' + remote_host, '/usr/sbin/postqueue -j'],
                capture_stdout=True,
            )
            queue.extend(Mail.read_postqueue_json(res.stdout.decode('utf-8'), id_prefix=remote_id_prefix))

        async with trio.open_nursery() as nursery:
            for remote_id, remote_host in self.remotes.items():
                nursery.start_soon(get_host, remote_id, remote_host)

        queue = sorted(queue, key=lambda item: item.arrival_time)  # sort by incoming date, newest last
        return queue

    async def get_mail(self, queue_id: str, *, flags: typing.Iterable[str] = ()) -> tuple[bytes | None, bytes]:
        """Run postcat on the remote owning `queue_id`; stdout is None on failure."""
        try:
            host, queue_id = self.decode_queue_id(queue_id)
        except ValueError as e:
            return (None, str(e).encode())
        if not host in self.remotes:
            return (None, f"Unknown remote host: {host!r}".encode())
        remote_host = self.remotes[host]
        cmd = ['/usr/sbin/postcat'] + list(flags) + ['-q', queue_id]
        res: subprocess.CompletedProcess = await trio.run_process(
            ['/usr/bin/ssh', '-oBatchMode=yes', 'root@' + remote_host, shlex.join(cmd)],
            capture_stdout=True,
            capture_stderr=True,
            check=False,
        )
        if res.returncode != 0:
            # expect some error was printed to stderr
            return (None, res.stderr)
        return (res.stdout, res.stderr)

    async def flush_all(self) -> None:
        """Run `postqueue -f` on every remote in parallel."""
        async def flush_host(remote_host: str) -> None:
            await trio.run_process(
                ['/usr/bin/ssh', '-oBatchMode=yes', 'root@' + remote_host, '/usr/sbin/postqueue -f'],
            )

        async with trio.open_nursery() as nursery:
            for _, remote_host in self.remotes.items():
                nursery.start_soon(flush_host, remote_host)

    # group "long ids" (with alias/ prefix) by alias (remote host)
    def aggregate_by_host(self, queue_ids: typing.Iterable[str], *, allow_all: bool) -> dict[str, list[str]]:
        """Group long queue ids by host alias; 'ALL' expands to every remote (if allowed).

        Raises ValueError on malformed ids or unknown aliases.
        """
        by_host: dict[str, list[str]] = {}
        for long_queue_id in queue_ids:
            if long_queue_id == 'ALL':
                if not allow_all:
                    raise ValueError(f"Invalid queue id: {long_queue_id!r} - ALL not allowed")
                for host in self.remotes:
                    by_host.setdefault(host, []).append('ALL')
                continue
            host, queue_id = self.decode_queue_id(long_queue_id, allow_all=allow_all)
            if not host in by_host:
                by_host[host] = [queue_id]
            else:
                by_host[host].append(queue_id)
        for host in list(by_host):
            if not host in self.remotes:
                ids = by_host.pop(host)
                raise ValueError(f"Unknown remote host {host!r}, used for {len(ids)} queue ids")
        return by_host

    async def flush(self, queue_ids: typing.Iterable[str]) -> None:
        """Schedule immediate delivery on each owning remote (hosts run in parallel)."""
        by_host = self.aggregate_by_host(queue_ids=queue_ids, allow_all=False)

        async def run_host(remote_host: str, ids: list[str]) -> None:
            for run in self.chunks(ids):
                # postqueue only deals with single messages, because the flush daemon
                # only accepts a single (flush) request per connection...
                cmd = 'for msg in ' + shlex.join(run) + '; do /usr/sbin/postqueue -i "${msg}"; done'
                await trio.run_process(['/usr/bin/ssh', '-oBatchMode=yes', 'root@' + remote_host, cmd])

        async with trio.open_nursery() as nursery:
            for host, ids in by_host.items():
                remote_host = self.remotes[host]
                nursery.start_soon(run_host, remote_host, ids)

    async def _postsuper(
        self,
        action: str,
        queue_ids: typing.Iterable[str],
        *,
        from_queues: set[QueueName] | tuple[()] = (),
    ) -> None:
        """Run postsuper on each owning remote, feeding its queue ids on stdin."""
        cmd = ['/usr/sbin/postsuper', action, '-']
        if isinstance(from_queues, set):
            from_queues = from_queues - set((QueueName.CORRUPT,))  # remove "invalid" queue names
            if from_queues:
                cmd += [qn.value for qn in from_queues]
            else:
                # empty set of queues allowed -> no queue allowed
                return
        by_host = self.aggregate_by_host(queue_ids=queue_ids, allow_all=True)

        async def run_host(remote_host: str, data: str) -> None:
            await trio.run_process(['/usr/bin/ssh', '-oBatchMode=yes', 'root@' + remote_host, shlex.join(cmd)], stdin=data.encode())

        async with trio.open_nursery() as nursery:
            for host, ids in by_host.items():
                remote_host = self.remotes[host]
                data = ''.join(f'{msg}\n' for msg in ids)
                nursery.start_soon(run_host, remote_host, data)

    async def forward_to(self, *, mails: typing.Iterable[Mail], recipients: list[str]) -> None:
        """Build one forward message per remote host and pipe it into sendmail there."""
        mails_by_id = {
            mail.queue_id: mail
            for mail in mails
        }

        if not mails_by_id:
            print("No mails found to forward.")
            # bugfix: previously fell through and started an empty nursery
            return

        async def run_host(host: str, remote_host: str, queue_ids: list[str]) -> None:
            mail_messages: dict[str, tuple[Mail, bytes | None]] = {}
            for queue_id in queue_ids:
                mail = mails_by_id[f'{host}/{queue_id}']
                (mail_message, err) = await self.get_mail(mail.queue_id, flags=['-h', '-b'])
                if mail_message is None:
                    print(f"Failed to get mail {mail.queue_id}: {err.decode()}")
                mail_messages[mail.queue_id] = (mail, mail_message)
            # bugfix: this used to test the (already consumed) `mails` iterable
            if not mail_messages:
                return
            message = format_mail_forward(config=self.config, mails=mail_messages, hostname=remote_host, recipients=recipients)
            cmd = ['sendmail', '-t', '-bm', '-f', '']
            cmd = ['/usr/bin/ssh', '-oBatchMode=yes', 'root@' + remote_host, shlex.join(cmd)]
            await trio.run_process(cmd, stdin=message)

        by_host = self.aggregate_by_host(queue_ids=mails_by_id.keys(), allow_all=True)

        async with trio.open_nursery() as nursery:
            for host, ids in by_host.items():
                remote_host = self.remotes[host]
                nursery.start_soon(run_host, host, remote_host, ids)


class UserActivityStats:
    """Per-address activity counters to find the most active addresses (for the `health` command)."""

    __slots__ = ('count_mails', 'related_mails', 'cut_off')

    # find most active address for `health` command

    def __init__(self, cut_off: int = 10) -> None:
        # cut_off: minimum activity count for an address to be kept/reported
        # count recipients individually for each sender, and recipients just once
        # address (lowercased) -> activity count
        self.count_mails: dict[str, int] = {}
        # address (lowercased) -> mails it was involved in
        self.related_mails: dict[str, list[Mail]] = {}
        self.cut_off = cut_off

    def count(self, item: Mail) -> None:
        """Account one mail: the sender gets one point per recipient, each distinct recipient one point."""
        # mails put on hold are deliberately ignored
        if QueueName.HOLD == item.queue_name:
            return
        sender = item.sender.lower()
        # bounces (MAILER-DAEMON sender) don't count against the sender
        if item.sender != 'MAILER-DAEMON':
            if not sender in self.count_mails:
                self.count_mails[sender] = 0
                self.related_mails[sender] = []
            self.count_mails[sender] += len(item.recipients)
            self.related_mails[sender].append(item)
        # deduplicate recipient addresses within this one mail
        recipients = {
            recp.address.lower()
            for recp in item.recipients
        }
        for recp in recipients:
            if sender == recp: continue  # count user just once per mail
            if not recp in self.count_mails:
                self.count_mails[recp] = 0
                self.related_mails[recp] = []
            self.count_mails[recp] += 1
            self.related_mails[recp].append(item)

    def prepare(self) -> None:
        """Drop all addresses below the cut_off threshold; call once after counting."""
        for (user, count) in list(self.count_mails.items()):
            if count < self.cut_off:
                self.count_mails.pop(user)

    def extract_highest_user(self) -> None | tuple[str, int, list[Mail]]:
        """Pop the currently most active address; returns (address, count, related mails) or None.

        The counts of all other addresses involved in the popped mails are
        decremented (mirroring count() above), so repeated calls don't
        attribute the same mails twice; addresses falling below cut_off
        are removed entirely.
        """
        max_count = 0
        if not self.count_mails:
            return None
        user = ''
        # linear scan for the maximum count
        for (loop_user, count) in list(self.count_mails.items()):
            if count > max_count:
                user = loop_user
                max_count = count
        activity = self.count_mails.pop(user)
        mails = self.related_mails.pop(user)
        # undo the contribution of the extracted user's mails for everyone else
        for item in mails:
            sender = item.sender.lower()
            if user != sender and sender in self.count_mails:
                self.count_mails[sender] -= len(item.recipients)
                if self.count_mails[sender] < self.cut_off:
                    self.count_mails.pop(sender)
            recipients = {
                recp.address.lower()
                for recp in item.recipients
            }
            for recp in recipients:
                if sender == recp: continue  # count user just once per mail
                if user != recp and recp in self.count_mails:
                    self.count_mails[recp] -= 1
                    if self.count_mails[recp] < self.cut_off:
                        self.count_mails.pop(recp)
        return (user, activity, mails)


class Filter(abc.ABC):
    """abstract base class for filter expressions"""

    __slots__ = ()

    def __init__(self) -> None:
        pass

    @abc.abstractmethod
    def matches(self, mail: Mail) -> bool:
        """Whether mail matches the filter"""
        raise NotImplementedError()

    @abc.abstractmethod
    def possible_queues(self) -> set[QueueName]:
        """Which queues matched mails are possible from (all unless explicitly restricted)"""
        raise NotImplementedError()

    @abc.abstractmethod
    def __repr__(self) -> str:
        raise NotImplementedError()

    @typing.final
    def filter(self, mails: typing.Iterable[Mail]) -> typing.Generator[Mail, None, None]:
        """Yield only the mails matching this filter."""
        yield from (candidate for candidate in mails if self.matches(candidate))

    @typing.final
    def filter_ids(self, mails: typing.Iterable[Mail]) -> typing.Generator[str, None, None]:
        """Yield the queue ids of the mails matching this filter."""
        yield from (candidate.queue_id for candidate in mails if self.matches(candidate))

    def __and__(self, other: typing.Any) -> Filter:
        """Conjunction: the result matches only mails that both operands match."""
        if not isinstance(other, Filter):
            return NotImplemented
        # identity shortcuts against the constant filters
        if other is TRUE_FILTER:
            return self
        if other is FALSE_FILTER:
            return other
        # flatten nested conjunctions
        tail = other.expressions if isinstance(other, AndFilter) else [other]
        return AndFilter([self] + tail)

    def __or__(self, other: typing.Any) -> Filter:
        """Disjunction: the result matches mails that at least one operand matches."""
        if not isinstance(other, Filter):
            return NotImplemented
        # identity shortcuts against the constant filters
        if other is TRUE_FILTER:
            return other
        if other is FALSE_FILTER:
            return self
        # flatten nested disjunctions
        tail = other.expressions if isinstance(other, OrFilter) else [other]
        return OrFilter([self] + tail)

    def __invert__(self) -> Filter:
        """Negation: matches exactly the mails the original filter rejects."""
        return NotFilter(self)


# abstract base class for filters that don't need (...) around in representations of members in combined filters
class SingleExprFilter(Filter):
    """Marker base for filters whose repr needs no parentheses inside and/or/not."""

    __slots__ = ()


class FalseFilter(SingleExprFilter):
    """Constant filter that rejects every mail; written as `0` in filter syntax."""

    __slots__ = ()

    def matches(self, mail: Mail) -> bool:
        # nothing ever matches
        return False

    def possible_queues(self) -> set[QueueName]:
        # no matches, so no queues are possible
        return set()

    def __repr__(self) -> str:
        return '0'

    def __and__(self, other: typing.Any) -> Filter:
        # FALSE is absorbing for "and"
        if isinstance(other, Filter):
            return self
        return NotImplemented

    def __or__(self, other: typing.Any) -> Filter:
        # FALSE is neutral for "or"
        if isinstance(other, Filter):
            return other
        return NotImplemented

    def __invert__(self) -> Filter:
        return TRUE_FILTER


class TrueFilter(SingleExprFilter):
    """Constant filter that accepts every mail; written as `1` in filter syntax."""

    __slots__ = ()

    def matches(self, mail: Mail) -> bool:
        # everything matches
        return True

    def possible_queues(self) -> set[QueueName]:
        # NOTE(review): returns the shared module-level set; callers must not mutate it
        return ALL_QUEUE_NAMES

    def __repr__(self) -> str:
        return '1'

    def __and__(self, other: typing.Any) -> Filter:
        # TRUE is neutral for "and"
        if isinstance(other, Filter):
            return other
        return NotImplemented

    def __or__(self, other: typing.Any) -> Filter:
        # TRUE is absorbing for "or"
        if isinstance(other, Filter):
            return self
        return NotImplemented

    def __invert__(self) -> Filter:
        return FALSE_FILTER


# shared singleton instances; Filter.__and__/__or__ compare against them by identity
FALSE_FILTER = FalseFilter()
TRUE_FILTER = TrueFilter()


class AndFilter(Filter):
    """Conjunction of sub-filters; matches a mail only if every sub-filter matches."""

    __slots__ = ('expressions',)

    def __init__(self, expressions: list[Filter] | tuple[()] = ()) -> None:
        super().__init__()
        # the passed-in list is stored as-is; the CLI mutates it in place
        self.expressions: list[Filter] = expressions or []

    def matches(self, mail: Mail) -> bool:
        # empty conjunction is trivially true
        return all(expr.matches(mail) for expr in self.expressions)

    def possible_queues(self) -> set[QueueName]:
        # intersection over all sub-filters, starting from "every queue"
        queues = set(ALL_QUEUE_NAMES)  # clone set
        for expr in self.expressions:
            queues &= expr.possible_queues()
        return queues

    def __repr__(self) -> str:
        exprs = self.expressions
        if not exprs:
            return '1'
        if len(exprs) == 1:
            return repr(exprs[0])
        # parenthesize members unless their repr is a single expression (or nested "and")
        parts = [
            repr(e) if isinstance(e, (SingleExprFilter, AndFilter)) else f'({e!r})'
            for e in exprs
        ]
        return ' and '.join(parts)

    def __and__(self, other: typing.Any) -> AndFilter:
        if not isinstance(other, Filter):
            return NotImplemented
        if isinstance(other, AndFilter):
            return AndFilter(self.expressions + other.expressions)
        return AndFilter(self.expressions + [other])

    @staticmethod
    @parser_action
    def from_parse_results(toks: pyparsing.ParseResults) -> Filter:
        """Fold the parsed `a and b and c` token list into one combined filter."""
        assert len(toks) == 1
        items = list(toks[0])
        assert len(items) % 2 == 1, f"need odd number of tokens in {toks}"
        result = items.pop(0)
        assert isinstance(result, Filter)
        while items:
            assert items.pop(0) == 'and', f"Missing 'and' in {toks}"
            rhs = items.pop(0)
            assert isinstance(rhs, Filter), f"Non-filter in {toks}"
            result = result & rhs
        return result


class OrFilter(Filter):
    """Disjunction of sub-filters; matches a mail if any sub-filter matches."""

    __slots__ = ('expressions',)

    def __init__(self, expressions: list[Filter] | tuple[()] = ()) -> None:
        super().__init__()
        # the passed-in list is stored as-is
        self.expressions: list[Filter] = expressions or []

    def matches(self, mail: Mail) -> bool:
        # empty disjunction is trivially false
        return any(expr.matches(mail) for expr in self.expressions)

    def possible_queues(self) -> set[QueueName]:
        # union over all sub-filters
        queues: set[QueueName] = set()
        for expr in self.expressions:
            queues |= expr.possible_queues()
        return queues

    def __repr__(self) -> str:
        exprs = self.expressions
        if not exprs:
            return '0'
        if len(exprs) == 1:
            return repr(exprs[0])
        # parenthesize members unless their repr is a single expression (or nested "or")
        parts = [
            repr(e) if isinstance(e, (SingleExprFilter, OrFilter)) else f'({e!r})'
            for e in exprs
        ]
        return ' or '.join(parts)

    def __or__(self, other: typing.Any) -> OrFilter:
        if not isinstance(other, Filter):
            return NotImplemented
        if isinstance(other, OrFilter):
            return OrFilter(self.expressions + other.expressions)
        return OrFilter(self.expressions + [other])

    @staticmethod
    @parser_action
    def from_parse_results(toks: pyparsing.ParseResults) -> Filter:
        """Fold the parsed `a or b or c` token list into one combined filter."""
        assert len(toks) == 1
        items = list(toks[0])
        assert len(items) % 2 == 1, f"need odd number of tokens in {toks}"
        result = items.pop(0)
        assert isinstance(result, Filter)
        while items:
            assert items.pop(0) == 'or', f"Missing 'or' in {toks}"
            rhs = items.pop(0)
            assert isinstance(rhs, Filter), f"Non-filter in {toks}"
            result = result | rhs
        return result


class NotFilter(SingleExprFilter):
    """Negation of another filter; matches exactly the mails the inner filter rejects."""

    __slots__ = ('expression',)

    def __init__(self, expression: Filter) -> None:
        super().__init__()
        self.expression = expression

    def matches(self, mail: Mail) -> bool:
        return not self.expression.matches(mail)

    def possible_queues(self) -> set[QueueName]:
        # negation could match mails from any filter!
        if isinstance(self.expression, QueueFilter):
            # in this case we actually know the negation
            return ALL_QUEUE_NAMES - self.expression.select
        # otherwise: everything is possible
        return ALL_QUEUE_NAMES

    def __repr__(self) -> str:
        # parenthesize unless the inner repr needs no parentheses (SingleExprFilter)
        if isinstance(self.expression, SingleExprFilter):
            return f'not {self.expression!r}'
        else:
            return f'not ({self.expression!r})'

    def __invert__(self) -> Filter:
        # double negation yields the original filter again
        return self.expression

    @staticmethod
    @parser_action
    def from_parse_results(toks: pyparsing.ParseResults) -> Filter:
        # expects a single group of the form ['not', <Filter>]
        assert len(toks) == 1
        toks = toks[0]
        assert len(toks) == 2
        assert toks[0] == 'not'
        assert isinstance(toks[1], Filter)
        return ~toks[1]


class QueueFilter(SingleExprFilter):
    """match mails based on queue they are in"""

    __slots__ = ('select',)

    def __init__(self, select: list[QueueName]) -> None:
        super().__init__()  # consistent with the other Filter subclasses
        # set of queues a mail must be in to match
        self.select = set(select)

    def matches(self, mail: Mail) -> bool:
        return mail.queue_name in self.select

    def possible_queues(self) -> set[QueueName]:
        # return a copy so callers (e.g. delete) can't mutate our internal selection
        return set(self.select)

    def __repr__(self) -> str:
        if not self.select:
            return '0'  # empty selection can never match
        return f'queue {",".join(qn.value for qn in self.select)}'

    def __and__(self, other: typing.Any) -> Filter:
        if not isinstance(other, Filter):
            return NotImplemented
        if isinstance(other, QueueFilter):
            # two queue filters combine by intersecting their selections
            select = self.select & other.select
            if select:
                return QueueFilter(list(select))
            else:
                return FALSE_FILTER
        return super().__and__(other)

    def __or__(self, other: typing.Any) -> Filter:
        if not isinstance(other, Filter):
            return NotImplemented
        if isinstance(other, QueueFilter):
            # two queue filters combine by uniting their selections
            select = self.select | other.select
            return QueueFilter(list(select))
        return super().__or__(other)

    # __invert__ is deliberately not overridden (it was left disabled by the
    # original author); NotFilter handles the complement and special-cases
    # QueueFilter in possible_queues().
    # TODO(review): confirm whether a direct set-complement override would be preferable.

    @staticmethod
    @parser_action
    def from_parse_results(toks: pyparsing.ParseResults) -> QueueFilter:
        """Build a QueueFilter from the parsed queue-name tokens."""
        return QueueFilter([QueueName(t) for t in toks])


class AddressSelector(enum.Enum):
    """Which addresses of a mail an address filter inspects."""

    SENDER = "from"
    RECIPIENT = "to"
    ANY = "address"

    def extract(self, mail: Mail) -> typing.Generator[str, None, None]:
        """Yield the sender and/or all recipient addresses, depending on the selector."""
        if self in (AddressSelector.SENDER, AddressSelector.ANY):
            yield mail.sender
        if self in (AddressSelector.RECIPIENT, AddressSelector.ANY):
            for recipient in mail.recipients:
                yield recipient.address


class BaseAddressPattern(abc.ABC):
    """Abstract pattern matched against a single mail address.

    Concrete subclasses match the full address, only the domain part,
    or a regular expression.
    """

    __slots__ = ()

    def __init__(self) -> None:
        pass

    @abc.abstractmethod
    def __repr__(self) -> str:
        raise NotImplementedError()

    @abc.abstractmethod
    def matches(self, address: str) -> bool:
        raise NotImplementedError()

    @staticmethod
    @parser_action
    def from_unquoted_parse_results(toks: pyparsing.ParseResults) -> list[BaseAddressPattern]:
        """Split an unquoted, comma separated pattern list into pattern objects.

        Prefix `~` selects a regex pattern, `@` a domain-only pattern; anything
        else is an exact address pattern. Empty entries are skipped.
        """
        raw: str = toks[0]
        patterns: list[BaseAddressPattern] = []
        for entry in raw.strip().split(','):
            entry = entry.strip()
            if not entry:
                continue
            if entry.startswith('~'):
                pattern: BaseAddressPattern = AddressRegexMatch(entry[1:])
            elif entry.startswith('@'):
                pattern = AddressDomainPattern(entry[1:])
            else:
                pattern = AddressPattern(entry)
            patterns.append(pattern)
        return patterns


class AddressPattern(BaseAddressPattern):
    """Pattern matching a complete mail address exactly (case-insensitive)."""

    __slots__ = ('address',)

    def __init__(self, address: str) -> None:
        super().__init__()
        # normalize: lowercase, strip one trailing dot
        self.address = address.lower().removesuffix('.')

    def matches(self, address: str) -> bool:
        normalized = address.lower().removesuffix('.')
        return normalized == self.address

    def __repr__(self) -> str:
        return repr(self.address)

    @staticmethod
    @parser_action
    def from_parse_results(toks: pyparsing.ParseResults) -> BaseAddressPattern:
        """Build a pattern from a quoted token; a leading `@` selects domain matching."""
        value: str = toks[0]
        if value.startswith('@'):
            return AddressDomainPattern(value[1:])
        return AddressPattern(value)


class AddressDomainPattern(BaseAddressPattern):
    """Pattern matching only the domain part of an address (case-insensitive)."""

    __slots__ = ('domain',)

    def __init__(self, domain: str) -> None:
        super().__init__()
        # normalize: lowercase, strip one trailing dot
        self.domain = domain.lower().removesuffix('.')

    def matches(self, address: str) -> bool:
        _local, sep, domain = address.rpartition('@')
        if not sep:
            # address without a domain part, e.g. "MAILER-DAEMON"
            return False
        return domain.lower().removesuffix('.') == self.domain

    def __repr__(self) -> str:
        return repr('@' + self.domain)


class AddressRegexMatch(BaseAddressPattern):
    """Pattern matching an address against a regular expression (via search)."""

    __slots__ = ('address',)

    def __init__(self, address: str | re.Pattern[str]) -> None:
        super().__init__()
        # accept both pre-compiled patterns and plain pattern strings
        self.address = address if isinstance(address, re.Pattern) else re.compile(address)

    def matches(self, address: str) -> bool:
        # NOTE(review): the address is lowercased (and a trailing dot stripped)
        # before searching, but the pattern is used as given — presumably
        # patterns are expected to be written in lowercase; confirm.
        return bool(self.address.search(address.lower().removesuffix('.')))

    def __repr__(self) -> str:
        return f'~{self.address.pattern!r}'

    @staticmethod
    @parser_action
    def from_parse_results(toks: pyparsing.ParseResults) -> AddressRegexMatch:
        return AddressRegexMatch(toks[0])


class AddressFilter(SingleExprFilter):
    """Match mails by comparing selected addresses against a list of patterns."""

    __slots__ = ('selector', 'patterns')

    def __init__(self, selector: AddressSelector, patterns: list[BaseAddressPattern]) -> None:
        self.selector = selector
        self.patterns = patterns

    def matches(self, mail: Mail) -> bool:
        # a single pattern hit on any selected address is enough
        return any(
            pattern.matches(address)
            for address in self.selector.extract(mail)
            for pattern in self.patterns
        )

    def possible_queues(self) -> set[QueueName]:
        # addresses say nothing about queues
        return ALL_QUEUE_NAMES

    def __repr__(self) -> str:
        if not self.patterns:
            return '0'
        patterns = ' '.join(repr(p) for p in self.patterns)
        return f'{self.selector.value} {patterns}'

    @staticmethod
    @parser_action
    def from_parse_results(toks: pyparsing.ParseResults) -> AddressFilter:
        """Build an AddressFilter from [selector-keyword, group-of-patterns]."""
        return AddressFilter(
            selector=AddressSelector(toks[0]),
            patterns=list(toks[1]),
        )


# create parser for filter expressions
def build_filter_parser() -> typing.Callable[[str], Filter]:
    """Build the pyparsing grammar for filter expressions and return a parse function.

    The grammar and its semantics are documented user-facing in the `select`
    command's docstring.
    """
    # address selector keyword: one of "from" / "to" / "address"
    addr_expr_selector = pyparsing.MatchFirst((
        pyparsing.Keyword(sel.value)
        for sel in AddressSelector
    ))
    # quoted patterns: single- or double-quoted, backslash escapes, quotes stripped
    squoted = pyparsing.QuotedString(quoteChar="'", escChar='\\', unquoteResults=True)
    dquoted = pyparsing.QuotedString(quoteChar='"', escChar='\\', unquoteResults=True)
    quoted = squoted | dquoted
    # ~"regex" -> regex pattern; plain quoted -> exact/domain pattern
    addr_expr_pattern_q_re = (pyparsing.Suppress('~') + quoted).setParseAction(AddressRegexMatch.from_parse_results)
    addr_expr_pattern_q = quoted.copy().setParseAction(AddressPattern.from_parse_results)
    # unquoted: one whitespace-free word; its parse action splits at commas
    addr_expr_pattern = pyparsing.Word(pyparsing.printables, excludeChars="'\"") \
        .setParseAction(BaseAddressPattern.from_unquoted_parse_results)
    # either one or more quoted patterns, or a single unquoted pattern list
    addr_expr_patterns = pyparsing.MatchFirst([
        pyparsing.OneOrMore(addr_expr_pattern_q_re | addr_expr_pattern_q),
        addr_expr_pattern,
    ])
    # e.g. `from alice@example.com,@example.net` (the `=` after the selector is optional)
    addr_expr = (
        addr_expr_selector + pyparsing.Optional(pyparsing.Suppress('=')) + pyparsing.Group(addr_expr_patterns)
    ).setParseAction(AddressFilter.from_parse_results)
    queue_expr_name = pyparsing.MatchFirst([pyparsing.Keyword(qn.value) for qn in QueueName])
    # a bare queue name, or `queue name1,name2,...`
    queue_expr_single = queue_expr_name
    queue_expr_mult = (
        pyparsing.Suppress(pyparsing.Keyword('queue'))
        + queue_expr_name
        + pyparsing.ZeroOrMore(pyparsing.Suppress(',') + queue_expr_name)
    )
    queue_expr = (queue_expr_single | queue_expr_mult).setParseAction(QueueFilter.from_parse_results)
    # constants `1` (match all) and `0` (match none)
    const_true = pyparsing.Keyword('1').setParseAction(lambda x: TRUE_FILTER)
    const_false = pyparsing.Keyword('0').setParseAction(lambda x: FALSE_FILTER)
    base_expr = addr_expr | queue_expr | const_true | const_false
    # NOTE: infixNotation lists operators highest-precedence first, so `or`
    # deliberately binds tighter than `and` here: `a and b or c` parses as
    # `a and (b or c)` — exactly as documented in the `select` command help.
    parser = pyparsing.infixNotation(base_expr, [
        ('not', 1, pyparsing.opAssoc.RIGHT, NotFilter.from_parse_results),
        ('or', 2, pyparsing.opAssoc.LEFT, OrFilter.from_parse_results),
        ('and', 2, pyparsing.opAssoc.LEFT, AndFilter.from_parse_results),
    ])

    def parse(input: str) -> Filter:
        # parseAll=True: reject trailing garbage instead of silently ignoring it
        return parser.parseString(input, parseAll=True)[0]

    return parse


FILTER_PARSER = build_filter_parser()


# TODO: proper type hinting for decorator builder?
# commands are methods in the CLI class and need to be decorated with this.
# the name defaults to the name of the function.
def register_command(*, name: str | tuple[()] = ()):
    """Decorator marking a CLI method as a user command.

    The command name defaults to the method's own name; pass ``name=`` to
    override it (the sentinel ``()`` means "use the method name").
    """
    def decorate(cmd: typing.Callable) -> typing.Callable:
        # CLI.__init__ scans methods for this marker attribute
        setattr(cmd, '__command', name)
        return cmd

    return decorate


class CLI:
    def __init__(self, source: Source) -> None:
        """Discover all @register_command methods and set up readline."""
        self.source = source
        self.current_filter: AndFilter = AndFilter()
        # commands callable without an argument / with a (mandatory or optional) argument
        self._simple_commands: dict[str, typing.Callable[[], typing.Awaitable[None]]] = {}
        self._long_commands: dict[str, typing.Callable[[str], typing.Awaitable[None]]] = {}
        for attr_name in dir(self):
            member = getattr(self, attr_name)
            if not callable(member):
                continue
            # marker attribute set by @register_command (string literal: no name mangling)
            command = getattr(member, '__command', None)
            if command is None:
                continue
            if command == ():
                cmd_name = attr_name  # default: use the method name
            else:
                assert isinstance(command, str)
                cmd_name = command
            params = inspect.signature(member).parameters
            takes_no_arg = False
            takes_arg = False
            if len(params) == 0:
                takes_no_arg = True
            elif len(params) == 1:
                takes_arg = True
                only_param = next(iter(params.values()))
                if only_param.default is not inspect.Parameter.empty:
                    # optional argument: usable both with and without one
                    takes_no_arg = True
            else:
                raise TypeError('Commands must take zero or one arguments')
            if takes_no_arg:
                self._simple_commands[cmd_name] = member
            if takes_arg:
                self._long_commands[cmd_name] = member
        self._known_commands = set(self._simple_commands) | set(self._long_commands)
        self._init_readline()
        self._completion_cache: list[str] = []

    def _readline_completer(self, text: str, state: int) -> str | None:
        """readline completer: only the first word (the command name) is completed."""
        stripped = text.lstrip()
        prefix = text[:len(text) - len(stripped)]  # keep any leading whitespace
        if len(stripped.split(maxsplit=1)) > 1:
            # already past the command word - no more completions
            return None
        if stripped != stripped.rstrip():
            # whitespace after the command word - no more completions
            return None
        if state == 0:
            # first call of a completion round: (re)build the candidate list
            self._completion_cache = [
                prefix + candidate
                for candidate in self._known_commands
                if candidate.startswith(stripped)
            ]
        if state < len(self._completion_cache):
            return self._completion_cache[state]
        return None

    def _init_readline(self) -> None:
        """Configure readline: tab completion and persistent history in ~/.pqm_history."""
        readline.read_init_file()
        # completer
        readline.parse_and_bind("tab: complete")
        readline.set_completer(self._readline_completer)
        readline.set_completer_delims("")  # want to see full buffer in completer
        # history handling:
        self._history_file = os.path.join(os.path.expanduser("~"), ".pqm_history")
        try:
            readline.read_history_file(self._history_file)
        except FileNotFoundError:
            # no history yet - the file is created on exit
            pass
        previous_history_len = readline.get_current_history_length()
        readline.set_auto_history(True)
        readline.set_history_length(100)

        def finish_readline() -> None:
            # append only the entries added during this session
            new_entries = readline.get_current_history_length() - previous_history_len
            if new_entries > 0:
                try:
                    readline.append_history_file(new_entries, self._history_file)
                except FileNotFoundError:
                    # append needs an existing file - write a fresh one instead
                    readline.write_history_file(self._history_file)

        atexit.register(finish_readline)

    @register_command()
    async def help(self) -> None:
        """Show all available commands"""
        # collect (display-name, docstring) pairs for every known command
        entries: list[tuple[str, str]] = []
        for cmd, func1 in self._simple_commands.items():
            if self._long_commands.get(cmd, None) is func1:
                # same handler with an optional argument - list it once as [<arg>]
                entries.append((f'{cmd} [<arg>]', func1.__doc__ or 'No help available'))
            else:
                entries.append((cmd, func1.__doc__ or 'No help available'))
        for cmd, func2 in self._long_commands.items():
            if self._simple_commands.get(cmd, None) is func2:
                continue  # already listed above
            entries.append((f'{cmd} <arg>', func2.__doc__ or 'No help available'))
        for cmd, doc in sorted(entries):
            first_line = doc.strip().splitlines()[0]
            print(f'{cmd:<30} {first_line}', file=sys.stderr)
        print(file=sys.stderr)

    @register_command(name="help")
    async def help_command(self, args: str) -> None:
        """Show detailed help for a command"""
        command = args.strip()
        if command not in self._known_commands:
            print(f'Command {command} not known, no help available', file=sys.stderr)
            return

        def emit(title: str, func: typing.Any) -> None:
            # print one help section: title, blank line, cleaned docstring, blank line
            print(title, file=sys.stderr)
            print(file=sys.stderr)
            print(inspect.cleandoc(func.__doc__ or 'No help available'), file=sys.stderr)
            print(file=sys.stderr)

        simple_command = self._simple_commands.get(command, None)
        long_command = self._long_commands.get(command, None)
        if simple_command is long_command:
            # one handler with an optional argument - single combined section
            emit(f'>>>> {command} [args]', simple_command)
            return
        if simple_command:
            emit(f'>>>> {command}', simple_command)
        if long_command:
            emit(f'>>>> {command} <args>', long_command)

    @register_command()
    async def exit(self) -> None:
        """Exit pqm (or press Ctrl-D on empty prompt)"""
        # leave main loop
        # NOTE(review): KeyboardInterrupt is presumably the main loop's regular
        # shutdown signal - confirm against the REPL driver (not in this chunk)
        raise KeyboardInterrupt()

    def prompt_if_empty_filter(self) -> bool:
        """Warn when no filter is configured; return False if the user aborts."""
        if self.current_filter.expressions:
            return True
        print(
            "You currently have no filters configured, which means this action will "
            "potentially run for a lot of mails",
            file=sys.stderr,
        )
        # continue only on an explicit "y..." answer
        return input_ack("Continue anyway (y/N)? ").lower()[:1] == "y"

    @register_command()
    async def health(self) -> None:
        """
        Show generic stats for queues and overly active addresses

        If the current filter isn't empty show queue counts for both matching and all mails.
        Only shows overly active addresses for mails matching the current filter (or all, if not set); never includes mails on hold.
        """
        # counts restricted by the current filter vs. totals over all mails
        count_q_other = 0
        count_q = {
            QueueName.ACTIVE: 0,
            QueueName.DEFERRED: 0,
            QueueName.HOLD: 0,
        }
        total_q_other = 0
        total_q = {
            QueueName.ACTIVE: 0,
            QueueName.DEFERRED: 0,
            QueueName.HOLD: 0,
        }
        stats = UserActivityStats()
        for item in await self.source.get_list():
            if item.queue_name in total_q:
                total_q[item.queue_name] += 1
            else:
                total_q_other += 1
            if self.current_filter.matches(item):
                if item.queue_name in count_q:
                    count_q[item.queue_name] += 1
                else:
                    count_q_other += 1
                stats.count(item)
        stats.prepare()
        servers = self.source.server_list()
        print(f"Servers: {len(servers)} ({', '.join(servers)})")
        if self.current_filter.expressions:
            print(f"Active: {count_q[QueueName.ACTIVE]} (total: {total_q[QueueName.ACTIVE]})")
            print(f"Deferred: {count_q[QueueName.DEFERRED]} (total: {total_q[QueueName.DEFERRED]})")
            print(f"Hold: {count_q[QueueName.HOLD]} (total: {total_q[QueueName.HOLD]})")
            if count_q_other:
                # bugfix: the closing parenthesis was missing in the output
                print(f"Other: {count_q_other} (total: {total_q_other})")
        else:
            print(f"Active: {count_q[QueueName.ACTIVE]}")
            print(f"Deferred: {count_q[QueueName.DEFERRED]}")
            print(f"Hold: {count_q[QueueName.HOLD]}")
            if count_q_other:
                print(f"Other: {count_q_other}")
        print()
        # fixed wording ("more many" -> "many")
        print("Looking for users with many open communications matching the current search criteria")
        while True:
            entry = stats.extract_highest_user()
            if not entry:
                break
            user, activity, _mails = entry  # the mail list itself is not shown here
            print(f"User {user} has {activity} open communications")

    # single-character marker per queue, matching the legend in the `list*`
    # command docstrings; presumably appended after the queue id when printing
    # mail list lines (usage is outside this chunk - verify)
    QUEUE_FLAGS = {
        QueueName.HOLD: '!',
        QueueName.ACTIVE: '*',
        QueueName.CORRUPT: '?',
        QueueName.INCOMING: '$',
        QueueName.MAILDROP: '~',
        QueueName.DEFERRED: ' ',
    }

    async def list_impl(self, *, args: str, verbose: bool, all: bool) -> None:
        """Shared implementation of the list* commands."""
        flt: AndFilter
        if not args:
            flt = self.current_filter
        else:
            # extra, non-persistent restrictions given on the command line
            try:
                parsed_flt = FILTER_PARSER(args)
            except pyparsing.ParseException as e:
                print(pyparsing.ParseException.explain(e, 0))
                return
            flt = self.current_filter & parsed_flt
        show_filter: Filter
        if all or flt.expressions:
            show_filter = flt
        else:
            # by default hide active and hold, unless -all variant is used or specific filters are used
            show_filter = NotFilter(QueueFilter([QueueName.ACTIVE, QueueName.HOLD]))
        for item in show_filter.filter(await self.source.get_list()):
            item.print(verbose=verbose)

    async def _print_mails_with_ids_for_ack(self, given_ids: typing.Iterable[str]) -> list[Mail]:
        """Print the mails for the given queue ids and return those actually found.

        IDs that are no longer in the queue are reported on stderr.
        """
        by_id = {mail.queue_id: mail for mail in await self.source.get_list()}
        missing: list[str] = []
        verified_list: list[Mail] = []
        for queue_id in given_ids:
            mail = by_id.get(queue_id, None)
            if mail:
                mail.print(verbose=False)
                verified_list.append(mail)
            else:
                missing.append(queue_id)
        if missing:
            print(f"Couldn't find mails with the following IDs: {' '.join(missing)}", file=sys.stderr)
        return verified_list

    @register_command()
    async def list(self, args: str = '') -> None:
        """
        List all mails (single line per mail); if no filter is configured hide active and hold queue

        Additional filters can be passed (see `help select` for syntax); the given filter is not remembered for other commands.
        Queue flags (after queue id): hold: !, active: *, incoming: $, maildrop: ~, corrupt: ?, deferred: none
        """
        # thin wrapper: delegate to the shared implementation
        await self.list_impl(args=args, verbose=False, all=False)

    @register_command(name="list-verbose")
    async def list_verbose(self, args: str = '') -> None:
        """
        List all mails (verbose output); if no filter is configured hide active and hold queue

        Additional filters can be passed (see `help select` for syntax); the given filter is not remembered for other commands.
        Queue flags (after queue id): hold: !, active: *, incoming: $, maildrop: ~, corrupt: ?, deferred: none
        """
        # thin wrapper: delegate to the shared implementation
        await self.list_impl(args=args, verbose=True, all=False)

    @register_command(name="list-all")
    async def list_all(self, args: str = '') -> None:
        """
        List all mails (including active + hold queue, single line per mail)

        Additional filters can be passed (see `help select` for syntax); the given filter is not remembered for other commands.
        Queue flags (after queue id): hold: !, active: *, incoming: $, maildrop: ~, corrupt: ?, deferred: none
        """
        # thin wrapper: delegate to the shared implementation
        await self.list_impl(args=args, verbose=False, all=True)

    @register_command(name="list-verbose-all")
    async def list_verbose_all(self, args: str = '') -> None:
        """
        List all mails (including active + hold queue, verbose output)

        Additional filters can be passed (see `help select` for syntax); the given filter is not remembered for other commands.
        Queue flags (after queue id): hold: !, active: *, incoming: $, maildrop: ~, corrupt: ?, deferred: none
        """
        # thin wrapper: delegate to the shared implementation
        await self.list_impl(args=args, verbose=True, all=True)

    @register_command(name="select")
    async def select(self, args: str) -> None:
        """
        Add filter for mails that other commands work on

        Filter syntax:
        - `from bob@example.com`, `from alice@example.com,bob@example.com,@example.net`, `from "alice@example.com" "bob@example.com"`
          Multiple address can be given to match any of them; either unquoted and comma separated, or quoted and whitespace
          separated. In the unquoted case the (comma separated) pattern must not include any whitespace.
          An address starting with `@` only checks the domain name and ignores the local part.
        - `from ~regex.*@example.(com|net)`, `from ~"regex.*@example.(com|net)" "other@example.com`
          To use regular expressions either put `~` in front of an unquoted pattern (need to repeat for each regex in a
          comma separated pattern list) or before the quotes of a quoted pattern.
        - `to ...` and `address ...` (matching both to and from) work the same as `from ...`
        - `queue hold,deferred`
          Comma separated list of queues a mail must be in. For single queue names `queue` can be omitted, e.g. `hold`.
        - `$expr1 and $expr2`
        - `$expr1 or $expr2`  (`a and b or c` is parsed as `a and (b or c)`)
        - `($expr)`
        - `not $expr`
        """
        try:
            flt = FILTER_PARSER(args)
        except pyparsing.ParseException as e:
            # show a human-readable parse error instead of a traceback
            print(pyparsing.ParseException.explain(e, 0))
            # print(e.explain(depth=0), file=sys.stderr)
            return
        # filters accumulate: all selected filters are combined as a conjunction
        self.current_filter.expressions.append(flt)

    @register_command()
    async def clear(self) -> None:
        """Clear all filters"""
        # replace the list wholesale (current_filter itself stays the same object)
        self.current_filter.expressions = []

    @register_command(name="clear")
    async def clear_ndx(self, args: str) -> None:
        """Clear filter with given index (zero based; first filter is at index 0)"""
        # robustness: report bad input on stderr (like `pop` does) instead of
        # letting ValueError/IndexError escape to the main loop
        try:
            ndx = int(args)
        except ValueError:
            print(f"Not a valid filter index: {args.strip()!r}", file=sys.stderr)
            return
        try:
            self.current_filter.expressions.pop(ndx)
        except IndexError:
            print(f"No filter at index {ndx}", file=sys.stderr)

    @register_command(name="pop")
    async def pop(self) -> None:
        """Remove last filter"""
        # guard clause: nothing to do without filters
        if not self.current_filter.expressions:
            print("No filters configured, nothing to pop", file=sys.stderr)
            return
        self.current_filter.expressions.pop()

    @register_command()
    async def current(self) -> None:
        """Show current filters"""
        # one line per filter in re-usable `select ...` syntax
        # (filters have no __str__, so str() falls back to the parseable __repr__)
        for e in self.current_filter.expressions:
            print(f'select {e}', file=sys.stderr)

    # argparse parser for the `show` command; add_help=False because `-h` is
    # repurposed to mean "show message header content"
    _SHOW_ARGS_PARSER = argparse.ArgumentParser('show', description="Show mails", add_help=False)
    _SHOW_ARGS_PARSER.add_argument('-b', dest='body', action='store_true', help="Show body content")
    _SHOW_ARGS_PARSER.add_argument('-e', dest='envelope', action='store_true', help="Show message envelope content.")
    _SHOW_ARGS_PARSER.add_argument('-h', dest='header', action='store_true', help="Show message header content.")
    _SHOW_ARGS_PARSER.add_argument('ID', nargs='+')

    @register_command()
    async def show(self, args: str) -> None:
        # no docstring here: __doc__ is assigned from the argparse help below
        ns_args = self._SHOW_ARGS_PARSER.parse_args(shlex.split(args))
        if ns_args.body or ns_args.envelope or ns_args.header:
            # forward the explicitly requested part flags
            flags = []
            if ns_args.body: flags += ['-b']
            if ns_args.envelope: flags += ['-e']
            if ns_args.header: flags += ['-h']
        else:
            # default to header only for now
            flags = ['-h']

        head = 16 * "-"

        async def print_mail(result: tuple[str, bytes, bytes]) -> None:
            # print one fetched mail under a separator line; flush around the
            # raw buffer writes so text and bytes output don't interleave
            msg, mail, stderr = result
            print(f'{head} {msg:^16} {head}')
            sys.stdout.flush()
            if stderr:
                sys.stderr.buffer.write(stderr)
                sys.stderr.flush()
            sys.stdout.buffer.write(mail)
            sys.stdout.flush()

        async def get_mail(msg: str) -> tuple[str, bytes, bytes]:
            # fetch one mail; normalize mail/stderr bytes to end with a newline
            mail, err = await self.source.get_mail(msg, flags=flags)
            if err and not err.endswith(b'\n'):
                err += b'\n'
            mail = mail or b''
            if mail and not mail.endswith(b'\n'):
                mail += b'\n'
            return (msg, mail, err)

        # fetch in parallel; trio_parallel_ordered presumably prints results in
        # submission order (name suggests so - defined outside this chunk)
        async with trio_parallel_ordered(print_mail) as tpo:
            for msg in ns_args.ID:
                await tpo.start(get_mail, msg)

        print(head + 18*"-" + head)
    show.__doc__ = _SHOW_ARGS_PARSER.format_help()  # argparse usage doubles as command help

    @register_command(name="flush-all")
    async def flush_all(self) -> None:
        """
        Flush the queue: attempt to deliver all queued mail.

        Ignores the current filters (but prompts if you have filters).
        """
        # safety net: flushing ignores filters entirely, so warn when filters are set
        if self.current_filter.expressions:
            print(
                "You have currently configured filters, but this command will flush all mails, whether they match the filter or not.\n"
                "You might want to use the `flush` command instead (or clear the filters before)",
                file=sys.stderr,
            )
            if input_ack("Continue anyway (y/N)? ").lower()[:1] != "y":
                return
        await self.source.flush_all()

    @register_command()
    async def flush(self) -> None:
        """Flush (deferred) selected mails in the queue: attempt to deliver them"""
        if not self.prompt_if_empty_filter():
            return
        # Only deferred mails can usefully be flushed.
        deferred_only = QueueFilter([QueueName.DEFERRED]) & self.current_filter
        ids = deferred_only.filter_ids(await self.source.get_list())
        await self.source.flush(ids)

    @register_command(name="flush")
    async def flush_args(self, args: str) -> None:
        """Flush (deferred) mails with given IDs: attempt to deliver them"""
        ids = args.strip().split()
        await self.source.flush(ids)

    async def delete_impl(self, flt: Filter) -> None:
        # Shared backend for the delete commands: show, confirm, then delete.
        mails = list(flt.filter(await self.source.get_list()))
        for mail in mails:
            mail.print(verbose=False)
        if input_ack("Really delete those mails (y/N)? ").lower()[:1] != "y":
            return
        # restrict to selected queues to prevent accidents
        await self.source.delete(
            (mail.queue_id for mail in mails),
            from_queues=flt.possible_queues(),
        )

    @register_command()
    async def delete(self) -> None:
        """Delete selected mails from the hold queue"""
        if not self.current_filter.expressions:
            print("Command not allowed without filter")
            return
        # restrict to hold queue
        await self.delete_impl(QueueFilter([QueueName.HOLD]) & self.current_filter)

    @register_command(name="delete-all")
    async def delete_all(self) -> None:
        """Delete selected mails from all queues"""
        if self.current_filter.expressions:
            # No queue restriction here: the filter alone decides what goes.
            await self.delete_impl(self.current_filter)
        else:
            print("Command not allowed without filter")

    @register_command(name="delete")
    async def delete_args(self, args: str) -> None:
        """Delete mails with given IDs from the queues"""
        mails = await self._print_mails_with_ids_for_ack(args.strip().split())
        delete_list = [mail.queue_id for mail in mails]
        if input_ack("Really delete those mails (y/N)? ").lower()[:1] != "y":
            return
        await self.source.delete(delete_list)

    @register_command()
    async def hold(self) -> None:
        """Put selected mails on hold"""
        if not self.prompt_if_empty_filter():
            return
        # basically: skip mails that are already on hold
        flt = QueueFilter(
            [QueueName.INCOMING, QueueName.ACTIVE, QueueName.DEFERRED]
        ) & self.current_filter
        ids = flt.filter_ids(await self.source.get_list())
        await self.source.hold(ids, from_queues=flt.possible_queues())

    @register_command(name="hold")
    async def hold_args(self, args: str) -> None:
        """Put mails with given IDs on hold"""
        ids = args.strip().split()
        await self.source.hold(ids)

    @register_command()
    async def release(self) -> None:
        """Release mail that was put "on hold"."""
        if not self.prompt_if_empty_filter():
            return
        # Only mails in the hold queue can be released.
        flt = QueueFilter([QueueName.HOLD]) & self.current_filter
        ids = flt.filter_ids(await self.source.get_list())
        await self.source.release(ids, from_queues=flt.possible_queues())

    @register_command(name="release")
    async def release_args(self, args: str) -> None:
        """Release mails with given IDs that were put "on hold"."""
        ids = args.strip().split()
        await self.source.release(ids)

    @register_command()
    async def requeue(self) -> None:
        """Requeue mail (move to "maildrop"; get reprocessed)"""
        # Requeueing is unusual enough to warrant an extra confirmation.
        ack = input_ack("Requeue is a rather unusual operation. Do you know what you are doing and want to continue (y/N)? ")
        if ack.lower()[:1] != "y":
            return
        if not self.prompt_if_empty_filter():
            return
        flt = self.current_filter
        ids = flt.filter_ids(await self.source.get_list())
        await self.source.requeue(ids, from_queues=flt.possible_queues())

    @register_command(name="requeue")
    async def requeue_args(self, args: str) -> None:
        """Requeue mail with given IDs (move to "maildrop"; get reprocessed)"""
        # Requeueing is unusual enough to warrant an extra confirmation.
        ack = input_ack("Requeue is a rather unusual operation. Do you know what you are doing and want to continue (y/N)? ")
        if ack.lower()[:1] != "y":
            return
        await self.source.requeue(args.strip().split())

    @register_command()
    async def expire(self) -> None:
        """Expire mails and flush them (return error to sender)."""
        if not self.prompt_if_empty_filter():
            return
        flt = self.current_filter
        mails = list(flt.filter(await self.source.get_list()))
        for mail in mails:
            mail.print(verbose=False)
        if input_ack("Really expire and flush (if in deferred queue) those mails (y/N)? ").lower()[:1] != "y":
            return
        await self.source.expire(
            (mail.queue_id for mail in mails),
            from_queues=flt.possible_queues(),
        )
        # Flushing is only meaningful for mails sitting in the deferred queue.
        deferred = [mail.queue_id for mail in mails if mail.queue_name == QueueName.DEFERRED]
        await self.source.flush(deferred)

    @register_command(name="expire")
    async def expire_args(self, args: str) -> None:
        """Expire mails (return error to sender) with given IDs and flush them."""
        mails = await self._print_mails_with_ids_for_ack(args.strip().split())
        if input_ack("Really expire and flush (if in deferred queue) those mails (y/N)? ").lower()[:1] != "y":
            return
        await self.source.expire((mail.queue_id for mail in mails))
        # Flushing is only meaningful for mails sitting in the deferred queue.
        deferred = [mail.queue_id for mail in mails if mail.queue_name == QueueName.DEFERRED]
        await self.source.flush(deferred)

    @register_command(name="expire-release")
    async def expire_release(self) -> None:
        """Expire mails and flush them (return error to sender); release if mails are on hold."""
        if not self.prompt_if_empty_filter():
            return
        flt = self.current_filter
        mails = list(flt.filter(await self.source.get_list()))
        for mail in mails:
            mail.print(verbose=False)
        if input_ack("Really expire, release and flush (if in deferred/hold queue) those mails (y/N)? ").lower()[:1] != "y":
            return
        await self.source.expire_release(
            (mail.queue_id for mail in mails),
            from_queues=flt.possible_queues(),
        )
        # Flush what can be delivered now: deferred mails and freshly released ones.
        flushable = [
            mail.queue_id
            for mail in mails
            if mail.queue_name in (QueueName.DEFERRED, QueueName.HOLD)
        ]
        await self.source.flush(flushable)

    @register_command(name="expire-release")
    async def expire_release_args(self, args: str) -> None:
        """Expire mails (return error to sender) with given IDs and flush them; release if mails are on hold."""
        mails = await self._print_mails_with_ids_for_ack(args.strip().split())
        if input_ack("Really expire, release and flush (if in deferred/hold queue) those mails (y/N)? ").lower()[:1] != "y":
            return
        await self.source.expire_release((mail.queue_id for mail in mails))
        # Flush what can be delivered now: deferred mails and freshly released ones.
        flushable = [
            mail.queue_id
            for mail in mails
            if mail.queue_name in (QueueName.DEFERRED, QueueName.HOLD)
        ]
        await self.source.flush(flushable)

    @register_command(name="expire-noflush")
    async def expire_noflush(self) -> None:
        """Expire mails (return error to sender on next delivery attempt)."""
        if not self.prompt_if_empty_filter():
            return
        flt = self.current_filter
        mails = list(flt.filter(await self.source.get_list()))
        for mail in mails:
            mail.print(verbose=False)
        if input_ack("Really expire those mails (y/N)? ").lower()[:1] != "y":
            return
        # No flush here: the bounce happens on the next regular delivery attempt.
        await self.source.expire(
            (mail.queue_id for mail in mails),
            from_queues=flt.possible_queues(),
        )

    @register_command(name="expire-noflush")
    async def expire_noflush_args(self, args: str) -> None:
        """Expire mails (return error to sender on next delivery attempt) with given IDs."""
        mails = await self._print_mails_with_ids_for_ack(args.strip().split())
        if input_ack("Really expire those mails (y/N)? ").lower()[:1] != "y":
            return
        # No flush here: the bounce happens on the next regular delivery attempt.
        await self.source.expire((mail.queue_id for mail in mails))

    @register_command(name="forward-to")
    async def forward_to(self, args: str) -> None:
        """
        Forward mails to given recipients.

        Specify recipients as simple mail addresses (e.g. `alice@example.com`, not `Alice <alice.example.com>`).
        """
        if not args:
            print("No recipients given")
            return
        if not self.prompt_if_empty_filter():
            return
        selected = list(self.current_filter.filter(await self.source.get_list()))
        for mail in selected:
            mail.print(verbose=False)
        if input_ack("Really forward those mails (y/N)? ").lower()[:1] != "y":
            return
        await self.source.forward_to(mails=selected, recipients=args.split())

    async def prompt(self) -> None:
        """
        Run the interactive read-eval loop of the CLI.

        Reads one line per iteration, curates the readline history, and
        dispatches to the registered command handlers. Returns on EOF
        (ctrl-d); ctrl-c only aborts the current input line.
        """
        # main loop of CLI
        while True:
            try:
                full_line = input('# ')
            except KeyboardInterrupt:  # ctrl-c
                print('^C', file=sys.stderr)
                continue
            except EOFError:  # ctrl-d
                print()  # newline after prompt
                return
            stripped_line = full_line.strip()
            if full_line.startswith(' '):
                # don't add space prefixed lines to history
                # (the line was already appended by readline; remove it again)
                readline.remove_history_item(readline.get_current_history_length() - 1)
            elif stripped_line != full_line:
                # don't store trailing spaces
                readline.replace_history_item(readline.get_current_history_length() - 1, stripped_line)
            # first word is the command name, the remainder (if any) is passed
            # verbatim as a single argument string
            parts = stripped_line.split(maxsplit=1)
            if not parts:
                continue
            command = parts[0]
            has_arg = len(parts) > 1
            try:
                # commands are registered in two tables: handlers that take an
                # argument string and handlers that take none
                if has_arg and command in self._long_commands:
                    await self._long_commands[command](parts[1])
                    continue
                elif not has_arg and command in self._simple_commands:
                    await self._simple_commands[command]()
                    continue
            except (Exception, KeyboardInterrupt):
                # keep the CLI alive on handler failure; show the traceback
                # TODO: skip first frame (only pointing to line above)
                traceback.print_exc()
                continue
            # unknown command / invalid invocation
            if command in self._simple_commands:
                print(f"Command {command} doesn't take any arguments", file=sys.stderr)
            elif command in self._long_commands:
                print(f"Command {command} requires arguments", file=sys.stderr)
            else:
                print(f"Unknown command: {command}", file=sys.stderr)


def main() -> None:
    """
    Entry point: parse command-line arguments, load the configuration, and
    run the interactive prompt loop under trio.
    """
    parser = argparse.ArgumentParser(description="postfix (cluster) queue manager")
    parser.add_argument('--config', '-c', type=pathlib.Path, help="Path to config file (default: try ~/.config/pqm.yaml, /etc/pqm.yaml)")
    args = parser.parse_args()
    # Config.load takes a str; '' triggers the default search paths.
    config = Config.load(str(args.config) if args.config else '')
    cli = CLI(source=config.source())

    try:
        trio.run(cli.prompt)
    except KeyboardInterrupt:
        # allow a clean exit via ctrl-c outside the prompt's own handling
        pass


if __name__ == '__main__':
    # Guard the entry point so importing this module (e.g. for testing)
    # does not start the interactive loop.
    main()
