Skip to content

fediverse_pasture.server.verify_actor

ActorVerifier dataclass

Class implementing the logic to verify a remote actor

Parameters:

Name Type Description Default
actor_list List[ActorData]

List of actors to run verification with

required
remote_uri str

URI of the remote actor. If acct-uri, it is resolved using webfinger

required
domain str

The domain the verification is done from

required
message Message

Message object containing an event log

required
timeout int

The timeout for requests (in seconds)

20
Source code in fediverse_pasture/server/verify_actor.py
@dataclass
class ActorVerifier:
    """Class implementing the logic to verify a remote actor

    :param actor_list: List of actors to run verification with
    :param remote_uri: URI of the remote actor. If acct-uri,
        it is resolved using webfinger
    :param domain: The domain the verification is done from
    :param message: Message object containing an event log
    :param timeout: The timeout for requests (in seconds)"""

    actor_list: List[ActorData]
    remote_uri: str
    domain: str
    message: Message

    remote_actor_uri: str | None = None
    scheme: str = "http"

    timeout: int = 20

    async def verify(self, only: dict = {}):
        """Public interface, this method loops over the actors in `actor_list`
        and verifies using them if the remote_uri is accessible.

        Returns a dictionary with actor.actor_name as key and the
        result as value.

        This function creates it's own aiohttp.ClientSession"""
        async with aiohttp.ClientSession() as session:
            await self.determine_remote_actor_uri(session)
            result = {
                actor.actor_name: await self.verify_for_actor(session, actor)
                for actor in self.actor_list
                if actor.actor_name != "actor" and only.get(actor.actor_name, True)
            }
            result["webfinger"] = await self.check_webfinger_result(session)

            return result

    @property
    def main_actor(self) -> ActorData | None:
        for actor in self.actor_list:
            if actor.actor_name == "actor":
                return actor
        return None

    async def init_bovine_actor(self, actor, session):
        actor_id = f"{self.scheme}://{self.domain}/{actor.actor_name}"
        bovine_actor, _ = bovine_actor_for_actor_data(actor_id, actor)
        await bovine_actor.init(session=session)

        return bovine_actor

    async def check_webfinger_result(self, session):
        bovine_actor = await self.init_bovine_actor(self.main_actor, session)

        try:
            async with asyncio.timeout(self.timeout):
                actor = await bovine_actor.get(self.remote_actor_uri)
        except Exception as e:
            logger.exception(e)
            return False

        if actor is None:
            self.message.error("Failed to fetch actor")
            return False

        remote_actor_uri = actor.get("id", self.remote_actor_uri)

        preferred_username = actor.get("preferredUsername")

        self.message.add(f"Got preferredUsername {preferred_username}")
        if preferred_username is None:
            return False

        domain = urlparse(self.remote_actor_uri).netloc
        acct_uri = f"acct:{preferred_username}@{domain}"

        self.message.add(f"computed acct uri {acct_uri}")

        try:
            object_id, _ = await lookup_uri_with_webfinger(
                session, acct_uri, f"{self.scheme}://{domain}"
            )
        except Exception as e:
            logger.exception(e)
            return False
        self.message.add(f"Retrieved id {object_id} using webfinger")

        if object_id == remote_actor_uri:
            self.message.add("webfinger result matches expectations")

            return True

        return False

    async def verify_for_actor(self, session, actor):
        bovine_actor = await self.init_bovine_actor(actor, session)
        self.message.add(f"Running verification for {actor.actor_name}")

        if actor.requires_signed_post_for_inbox:
            return await self.fetch_remote_and_post_using_actor(bovine_actor)
        else:
            return await self.fetch_remote_and_post_using_session(session)

    async def fetch_remote_and_post_using_actor(self, bovine_actor):
        result = {"get_actor": False, "post_inbox": False}

        try:
            async with asyncio.timeout(self.timeout):
                actor = await bovine_actor.get(self.remote_actor_uri)
                if actor is None:
                    self.message.add(
                        f"Failed to retrieve actor {self.remote_actor_uri}"
                    )
                    return result

                inbox = actor.get("inbox")
                self.message.add(f"Got inbox {inbox}")

                if inbox:
                    result["get_actor"] = True
                    try:
                        try:
                            response = await bovine_actor.post(
                                inbox,
                                data={
                                    "@context": "https://www.w3.org/ns/activitystreams",
                                    "type": "Like",
                                    "actor": bovine_actor.actor_id,
                                    "id": bovine_actor.actor_id
                                    + secrets.token_urlsafe(8),
                                    "object": bovine_actor.actor_id,
                                },
                            )
                            self.message.add("Successfully posted to inbox with result")
                            self.message.add(response.status)
                            self.message.add((await response.text()))

                        except aiohttp.ClientResponseError as e:
                            if e.status > 400:
                                raise e
                            self.message.add(
                                "Successfully posted to inbox but remote server\
    indicated a bad request"
                            )

                        result["post_inbox"] = True
                    except Exception as e:
                        self.message.add("Failed to post to inbox")
                        self.message.add(repr(e))

        except Exception as e:
            self.message.add("Something went wrong")
            self.message.add(repr(e))

        return result

    async def resolve_inbox_using_session(self, session) -> str:
        async with asyncio.timeout(self.timeout):
            async with session.get(
                self.remote_actor_uri,
                headers={"accept": "application/activity+json"},
            ) as response:
                actor = json.loads(await response.text())
            inbox = actor.get("inbox")
            self.message.add(f"Got inbox {inbox}")
            return inbox

    async def fetch_remote_and_post_using_session(self, session):
        result = {"get_actor": False, "post_inbox": False}

        try:
            inbox = await self.resolve_inbox_using_session(session)
        except Exception as e:
            self.message.add("Something went wrong when fetching actor")
            self.message.add(repr(e))
            return result

        if inbox:
            result["get_actor"] = True
            try:
                async with asyncio.timeout(self.timeout):
                    async with session.post(
                        inbox,
                        data=json.dumps(
                            {
                                "@context": "https://www.w3.org/ns/activitystreams",
                                "type": "EchoRequest",
                            }
                        ),
                        headers={"content-type": "application/activity+json"},
                    ) as response:
                        self.message.add(f"Got {response.status} for unsigned POST")

                        if response.status < 400 and response.status > 100:
                            result["post_inbox"] = True
            except Exception as e:
                self.message.add("Something went wrong when posting to inbox")
                self.message.add(repr(e))
                return result

        return result

    async def determine_remote_actor_uri(self, session) -> None:
        """Used to resolve an acct-URI into the corresponding http/https-URI
        using webfinger"""
        if can_be_resolved(self.remote_uri):
            self.remote_actor_uri = self.remote_uri
            self.message.add(f"Can fetch actor from {self.remote_uri}")
            return

        self.message.add(f"Need to resolve {self.remote_uri} to actor object id")

        if self.remote_uri.startswith("acct:"):
            acct_uri = self.remote_uri
        else:
            self.message.add("Not in account uri format")

            if self.remote_uri[0] == "@":
                acct_uri = "acct:" + self.remote_uri[1:]
            else:
                acct_uri = "acct:" + self.remote_uri

        if "@" not in acct_uri:
            self.message.add(f"Computed invalid account URI {acct_uri}")
            return
        domain = acct_uri.split("@")[1]

        self.message.add(f"Resolving {acct_uri} using webfinger")

        try:
            object_id, _ = await lookup_uri_with_webfinger(
                session, acct_uri, f"{self.scheme}://{domain}"
            )
        except Exception as e:
            logger.exception(e)
            return

        self.message.add(f"Resolved to {object_id}")
        self.remote_actor_uri = object_id

determine_remote_actor_uri(session) async

Used to resolve an acct-URI into the corresponding http/https-URI using webfinger

Source code in fediverse_pasture/server/verify_actor.py
async def determine_remote_actor_uri(self, session) -> None:
    """Used to resolve an acct-URI into the corresponding http/https-URI
    using webfinger"""
    if can_be_resolved(self.remote_uri):
        self.remote_actor_uri = self.remote_uri
        self.message.add(f"Can fetch actor from {self.remote_uri}")
        return

    self.message.add(f"Need to resolve {self.remote_uri} to actor object id")

    if self.remote_uri.startswith("acct:"):
        acct_uri = self.remote_uri
    else:
        self.message.add("Not in account uri format")

        if self.remote_uri[0] == "@":
            acct_uri = "acct:" + self.remote_uri[1:]
        else:
            acct_uri = "acct:" + self.remote_uri

    if "@" not in acct_uri:
        self.message.add(f"Computed invalid account URI {acct_uri}")
        return
    domain = acct_uri.split("@")[1]

    self.message.add(f"Resolving {acct_uri} using webfinger")

    try:
        object_id, _ = await lookup_uri_with_webfinger(
            session, acct_uri, f"{self.scheme}://{domain}"
        )
    except Exception as e:
        logger.exception(e)
        return

    self.message.add(f"Resolved to {object_id}")
    self.remote_actor_uri = object_id

verify(only={}) async

Public interface, this method loops over the actors in actor_list and verifies using them if the remote_uri is accessible.

Returns a dictionary with actor.actor_name as key and the result as value.

This function creates it’s own aiohttp.ClientSession

Source code in fediverse_pasture/server/verify_actor.py
async def verify(self, only: dict = {}):
    """Public interface, this method loops over the actors in `actor_list`
    and verifies using them if the remote_uri is accessible.

    Returns a dictionary with actor.actor_name as key and the
    result as value.

    This function creates it's own aiohttp.ClientSession"""
    async with aiohttp.ClientSession() as session:
        await self.determine_remote_actor_uri(session)
        result = {
            actor.actor_name: await self.verify_for_actor(session, actor)
            for actor in self.actor_list
            if actor.actor_name != "actor" and only.get(actor.actor_name, True)
        }
        result["webfinger"] = await self.check_webfinger_result(session)

        return result