Skip to content

AARS

The AARS class is the main entry point for the Aleph Active Record SDK. It provides versatile methods to create, update, delete and query records.

Initializes the SDK with an account and a channel.

Parameters:

Name Type Description Default
account Optional[Account]

Account with which to sign the messages. Defaults to the fallback account.

None
channel Optional[str]

Channel to which to send the messages. Defaults to ‘AARS_TEST’.

None
api_url Optional[str]

The API URL to use. Defaults to an official Aleph API host.

None
session Optional[AuthenticatedAlephClient]

An aiohttp session to use. Defaults to a new session with the given account.

None
cache Optional[VmCache]

An optional Aleph VM cache to cache messages.

None
retry_count Optional[int]

The number of times to retry a failed request. Defaults to 3.

None
Source code in aars/core.py
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
def __init__(
    self,
    account: Optional[Account] = None,
    channel: Optional[str] = None,
    api_url: Optional[str] = None,
    session: Optional[AuthenticatedAlephClient] = None,
    cache: Optional[VmCache] = None,
    retry_count: Optional[int] = None,
):
    """
    Initializes the SDK with an account and a channel.
    Args:
        account: Account with which to sign the messages. Defaults to the fallback account.
        channel: Channel to which to send the messages. Defaults to 'AARS_TEST'.
        api_url: The API URL to use. Defaults to an official Aleph API host.
        session: An aiohttp session to use. Defaults to a new session with the given account.
        cache: An optional Aleph VM cache to cache messages.
        retry_count: The number of times to retry a failed request. Defaults to 3.
    """
    AARS.account = account if account else get_fallback_account()
    AARS.channel = channel if channel else "AARS_TEST"
    AARS.api_url = api_url if api_url else settings.API_HOST
    AARS.session = (
        session
        if session
        else AuthenticatedAlephClient(
            account=AARS.account, api_server=settings.API_HOST
        )
    )
    AARS.cache = cache
    AARS.retry_count = retry_count if retry_count else 3

fetch_exact(record_type: Type[R], item_hash: str) -> R classmethod async

Retrieves the revision of an object by its item_hash of the message. The content will be exactly the same as in the referenced message, so no amendments will be applied.

Parameters:

Name Type Description Default
record_type Type[R]

The type of the object to retrieve.

required
item_hash str

item_hash of the message, whose content to fetch.

required

Returns:

Type Description
R

The record in the state it was when the message was created.

Source code in aars/core.py
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
@classmethod
async def fetch_exact(cls, record_type: Type[R], item_hash: str) -> R:
    """Retrieves the revision of an object by its item_hash of the message. The content will be exactly the same
    as in the referenced message, so no amendments will be applied.
    Args:
        record_type: The type of the object to retrieve.
        item_hash: item_hash of the message, whose content to fetch.
    Returns:
        The record in the state it was when the message was created.
    """
    if cls.cache:
        cache_resp = await cls._fetch_records_from_cache(record_type, [item_hash])
        if len(cache_resp) > 0:
            return cache_resp[0]
    aleph_resp = await cls.session.get_messages(hashes=[item_hash])
    if len(aleph_resp.messages) == 0:
        raise ValueError(f"Message with hash {item_hash} not found.")
    message: PostMessage = aleph_resp.messages[0]
    return await record_type.from_post(message)

fetch_records(record_type: Type[R], item_hashes: Optional[List[Union[str, ItemHash]]] = None, channel: Optional[str] = None, owner: Optional[str] = None, page_size: int = 50, page: int = 1) -> AsyncIterator[R] classmethod async

Retrieves posts as objects by its aleph item_hash.

Parameters:

Name Type Description Default
record_type Type[R]

The type of the objects to retrieve.

required
item_hashes Optional[List[Union[str, ItemHash]]]

Aleph item_hashes of the objects to fetch.

None
channel Optional[str]

Channel in which to look for it.

None
owner Optional[str]

Account that owns the object.

None
page_size int

Number of items to fetch per page.

50
page int

Page number to fetch, based on page_size.

1

Returns:

Type Description
AsyncIterator[R]

An iterator over the found records.

Source code in aars/core.py
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
@classmethod
async def fetch_records(
    cls,
    record_type: Type[R],
    item_hashes: Optional[List[Union[str, ItemHash]]] = None,
    channel: Optional[str] = None,
    owner: Optional[str] = None,
    page_size: int = 50,
    page: int = 1,
) -> AsyncIterator[R]:
    """
    Retrieves posts as objects by its aleph item_hash.
    Args:
        record_type: The type of the objects to retrieve.
        item_hashes: Aleph item_hashes of the objects to fetch.
        channel: Channel in which to look for it.
        owner: Account that owns the object.
        page_size: Number of items to fetch per page.
        page: Page number to fetch, based on page_size.
    Returns:
        An iterator over the found records.
    """
    assert issubclass(record_type, Record)
    channels = None if channel is None else [channel]
    owners = None if owner is None else [owner]
    if item_hashes is None and channels is None and owners is None:
        channels = [cls.channel]

    if cls.cache and item_hashes is not None:
        # TODO: Add some kind of caching for channels and owners or add recent item_hashes endpoint to the Aleph API
        records = await cls._fetch_records_from_cache(record_type, item_hashes)
        cached_ids = []
        for r in records:
            cached_ids.append(r.id_hash)
            yield r
        item_hashes = [h for h in item_hashes if h not in cached_ids]
        if len(item_hashes) == 0:
            return

    async for record in cls._fetch_records_from_api(
        record_type=record_type,
        item_hashes=item_hashes,
        channels=channels,
        owners=owners,
        page_size=page_size,
        page=page,
    ):
        yield record

fetch_revisions(record_type: Type[R], ref: str, channel: Optional[str] = None, owner: Optional[str] = None, page = 1) -> AsyncIterator[ItemHash] classmethod async

Retrieves posts of revisions of an object by its item_hash.

Parameters:

Name Type Description Default
record_type Type[R]

The type of the objects to retrieve.

required
ref str

item_hash of the object, whose revisions to fetch.

required
channel Optional[str]

Channel in which to look for it.

None
owner Optional[str]

Account that owns the object.

None
page

Page number to fetch.

1

Returns:

Type Description
AsyncIterator[ItemHash]

An iterator over the found records.

Source code in aars/core.py
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
@classmethod
async def fetch_revisions(
    cls,
    record_type: Type[R],
    ref: str,
    channel: Optional[str] = None,
    owner: Optional[str] = None,
    page=1,
) -> AsyncIterator[ItemHash]:
    """Retrieves posts of revisions of an object by its item_hash.
    Args:
        record_type: The type of the objects to retrieve.
        ref: item_hash of the object, whose revisions to fetch.
        channel: Channel in which to look for it.
        owner: Account that owns the object.
        page: Page number to fetch.
    Returns:
        An iterator over the found records.
    """
    owners = None if owner is None else [owner]
    channels = None if channel is None else [channel]
    if owners is None and channels is None:
        channels = [cls.channel]

    aleph_resp = None
    retries = cls.retry_count
    while aleph_resp is None:
        try:
            aleph_resp = await cls.session.get_messages(
                channels=channels,
                addresses=owners,
                refs=[ref],
                pagination=50,
                page=page,
            )
        except ServerDisconnectedError:
            retries -= 1
            if retries == 0:
                raise
    for message in aleph_resp.messages:
        yield message.item_hash

    if page == 1:
        # If there are more pages, fetch them
        total_items = aleph_resp.pagination_total
        per_page = aleph_resp.pagination_per_page
        if total_items > per_page:
            for next_page in range(2, math.ceil(total_items / per_page) + 1):
                async for item_hash in cls.fetch_revisions(
                    record_type=record_type,
                    ref=ref,
                    channel=channel,
                    owner=owner,
                    page=next_page,
                ):
                    yield ItemHash(item_hash)

forget_objects(objs: List[R], channel: Optional[str] = None) classmethod async

Forgets multiple objects from Aleph and local cache. All related revisions will be forgotten too.

Parameters:

Name Type Description Default
objs List[R]

The objects to forget.

required
channel Optional[str]

The channel to delete the object from. If None, will use the configured default channel.

None
Source code in aars/core.py
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
@classmethod
async def forget_objects(
    cls,
    objs: List[R],
    channel: Optional[str] = None,
):
    """
    Forgets multiple objects from Aleph and local cache. All related revisions will be forgotten too.
    Args:
        objs: The objects to forget.
        channel: The channel to delete the object from. If None, will use the configured default channel.
    """
    if channel is None:
        channel = cls.channel
    hashes = []
    for obj in objs:
        if obj.id_hash is None:
            raise ValueError("Cannot forget an object that has not been posted.")
        if obj.signer != cls.account.get_address():
            raise AlephPermissionError(
                obj.signer, obj.id_hash, cls.account.get_address()
            )
        hashes += [obj.id_hash] + obj.revision_hashes
    forget_task = cls.session.forget(
        hashes=hashes,
        reason=None,
        channel=channel,
    )
    if cls.cache:
        await asyncio.gather(forget_task, *[cls.cache.delete(h) for h in hashes])
    else:
        await forget_task

post_or_amend_object(obj: R, channel: Optional[str] = None) -> R classmethod async

Posts or amends an object to Aleph. If the object is already posted, it’s list of revision hashes is updated and the object receives the latest revision number.

Parameters:

Name Type Description Default
obj R

The object to post or amend.

required
channel Optional[str]

The channel to post the object to. If None, will use the configured default channel.

None

Returns:

Type Description
R

The object with the updated revision hashes and revision number.

Source code in aars/core.py
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
@classmethod
async def post_or_amend_object(cls, obj: R, channel: Optional[str] = None) -> R:
    """
    Posts or amends an object to Aleph. If the object is already posted, it's list of revision hashes is updated and
    the object receives the latest revision number.
    Args:
        obj: The object to post or amend.
        channel: The channel to post the object to. If None, will use the configured default channel.
    Returns:
        The object with the updated revision hashes and revision number.
    """
    if channel is None:
        channel = cls.channel
    assert isinstance(obj, Record)
    post_type = type(obj).__name__ if obj.id_hash is None else "amend"
    if obj.id_hash is not None and obj.signer is not None and obj.signer != cls.account.get_address():
        raise AlephPermissionError(
            cls.account.get_address(), obj.id_hash, obj.signer
        )
    message, status = await cls.session.create_post(
        post_content=obj.content,
        post_type=post_type,
        channel=channel,
        ref=obj.id_hash,
    )
    if obj.id_hash is None:
        obj.id_hash = message.item_hash
    obj.revision_hashes.append(message.item_hash)
    obj.current_revision = len(obj.revision_hashes) - 1
    obj.timestamp = message.time
    obj.signer = message.sender
    if cls.cache:
        await cls.cache.set(message.item_hash, obj.json())
    return obj

sync_indices() classmethod async

Synchronizes all the indices created so far, by iteratively fetching all the messages from the channel, having post_types of the Record subclasses that have been declared so far.

This can take quite some time on large databases.

Source code in aars/core.py
631
632
633
634
635
636
637
638
639
640
641
@classmethod
async def sync_indices(cls):
    """
    Synchronizes all the indices created so far, by iteratively fetching all the messages from the channel,
    having post_types of the Record subclasses that have been declared so far.

    !!! warning "This can take quite some time on large databases."
    """
    for record in Record.__subclasses__():
        if record.get_indices():
            await record.regenerate_indices()