
Record

Bases: BaseModel, ABC

A basic record which is persisted on Aleph decentralized storage.

Records can be updated: revision numbers begin at 0 (original upload) and increment for each .save() call.

Previous revisions can be restored by calling

await record.fetch_revision(rev_no=0)
or
await record.fetch_revision(rev_hash="abc123")

They can also be forgotten: Aleph will ask the network to forget the given item, which allows for GDPR-compliant applications.

Records have an indices class attribute, which allows one to select an index and query it with a key.

It uses TypeVar("R", bound="Record") to allow for type hinting of subclasses.
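
The bound `TypeVar` pattern mentioned above can be illustrated with a minimal sketch. `ToyRecord`, `Book` and `create` are hypothetical stand-ins rather than aars classes or methods; the sketch only shows how annotating `cls` as `Type[R]` lets a type checker infer the subclass as the return type.

```python
from typing import Type, TypeVar

R = TypeVar("R", bound="ToyRecord")


class ToyRecord:
    """Hypothetical stand-in for a record base class (not the aars Record)."""

    def __init__(self, **fields):
        self.fields = fields

    @classmethod
    def create(cls: Type[R], **fields) -> R:
        # Because cls is typed as Type[R], the return type follows the subclass.
        return cls(**fields)


class Book(ToyRecord):
    pass


book = Book.create(title="Dune")  # a type checker infers Book, not ToyRecord
```

Without the `TypeVar`, a type checker would see `Book.create(...)` as returning the base class, and subclass-specific attributes would not be recognized.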

content: Dict[str, Any] property

Returns:

| Type | Description |
| --- | --- |
| Dict[str, Any] | A dictionary of the object’s content, as it is to be stored on Aleph inside the content property of the POST message. |

add_index(index: Index) -> None classmethod

Adds an index to the class. This allows the index to be used for queries and will be automatically updated when records are created or updated.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| index | Index | The index to add. | required |

Source code in aars/core.py
@classmethod
def add_index(cls: Type[R], index: "Index") -> None:
    """
    Adds an index to the class. This allows the index to be used for queries and will be automatically updated
    when records are created or updated.
    Args:
        index: The index to add.
    """
    cls.__indices[repr(index)] = index

fetch(hashes: Union[Union[str, ItemHash], List[Union[str, ItemHash]]]) -> PageableResponse[R] classmethod

Fetches one or more objects of given type by its/their item_hash[es].

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| hashes | Union[Union[str, ItemHash], List[Union[str, ItemHash]]] | The item_hash[es] of the objects to fetch. | required |

Returns:

| Type | Description |
| --- | --- |
| PageableResponse[R] | A pageable response object, which can be asynchronously iterated over. |

Source code in aars/core.py
@classmethod
def fetch(
    cls: Type[R], hashes: Union[Union[str, ItemHash], List[Union[str, ItemHash]]]
) -> PageableResponse[R]:
    """
    Fetches one or more objects of given type by its/their item_hash[es].
    Args:
        hashes: The item_hash[es] of the objects to fetch.
    Returns:
        A pageable response object, which can be asynchronously iterated over.
    """
    if not isinstance(hashes, list):
        hashes = [hashes]
    items = AARS.fetch_records(cls, list(hashes))
    return PageableResponse(items)
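
Consuming a response that "can be asynchronously iterated over" generally means using `async for`. The sketch below uses a hypothetical async generator, `fake_response`, in place of a real pageable response, so it runs without network access and makes no claim about the actual `PageableResponse` interface beyond asynchronous iteration.

```python
import asyncio
from typing import AsyncIterator, List


async def fake_response() -> AsyncIterator[str]:
    # Hypothetical stand-in for a pageable response; yields items one by one.
    for item in ["record-a", "record-b", "record-c"]:
        await asyncio.sleep(0)  # yield control, as a network-backed iterator would
        yield item


async def collect() -> List[str]:
    # `async for` is the general way to consume any asynchronous iterable.
    return [item async for item in fake_response()]


records = asyncio.run(collect())
```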

fetch_objects() -> PageableRequest[R] classmethod

Fetches all objects of given type.

Returns:

| Type | Description |
| --- | --- |
| PageableRequest[R] | A pageable request object, which can be asynchronously iterated over. |

Source code in aars/core.py
@classmethod
def fetch_objects(cls: Type[R]) -> PageableRequest[R]:
    """
    Fetches all objects of given type.

    Returns:
        A pageable request object, which can be asynchronously iterated over.
    """
    return PageableRequest(AARS.fetch_records, record_type=cls)

fetch_revision(rev_no: Optional[int] = None, rev_hash: Optional[str] = None) -> R async

Fetches a Record’s revision by its revision number (0 => original record) or revision hash.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| rev_no | Optional[int] | The revision number of the revision to fetch. Negative numbers are allowed and count from the end. | None |
| rev_hash | Optional[str] | The hash of the revision to fetch. | None |

Returns:

| Type | Description |
| --- | --- |
| R | The object with the given revision. |

Source code in aars/core.py
async def fetch_revision(
    self: R, rev_no: Optional[int] = None, rev_hash: Optional[str] = None
) -> R:
    """
    Fetches a Record's revision by its revision number (0 => original record) or revision hash.

    Args:
        rev_no: The revision number of the revision to fetch. Negative numbers are allowed and count from the end.
        rev_hash: The hash of the revision to fetch.

    Returns:
        The object with the given revision.
    """
    if self.id_hash is None:
        raise NotStoredError(self)

    if rev_no is not None:
        # Normalize negative revision numbers so that -1 refers to the latest revision.
        if rev_no < 0:
            rev_no = len(self.revision_hashes) + rev_no
        if self.current_revision == rev_no:
            return self
        # Valid revision numbers are 0 .. len(revision_hashes) - 1.
        if not 0 <= rev_no < len(self.revision_hashes):
            raise IndexError(f"No revision no. {rev_no} found for {self}")
        self.current_revision = rev_no
    elif rev_hash is not None:
        rev_item_hash = ItemHash(rev_hash)
        # Compare against None explicitly, since revision 0 is falsy.
        if (
            self.current_revision is not None
            and rev_item_hash == self.revision_hashes[self.current_revision]
        ):
            return self
        try:
            self.current_revision = self.revision_hashes.index(rev_item_hash)
        except ValueError:
            raise IndexError(f"{rev_hash} is not a revision of {self}") from None
    else:
        raise ValueError("Either rev_no or rev_hash must be provided")

    # Load the selected revision and copy its content and metadata onto this object.
    resp = await AARS.fetch_exact(
        type(self), self.revision_hashes[self.current_revision]
    )
    self.__dict__.update(resp.content)
    self.timestamp = resp.timestamp
    self.signer = resp.signer

    return self
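
The handling of negative revision numbers can be isolated into a small, standalone sketch. `resolve_revision_index` is a hypothetical helper, not part of aars, and the hashes are made up; it only shows how a revision number that "counts from the end" maps onto a list index.

```python
from typing import List


def resolve_revision_index(rev_no: int, revision_hashes: List[str]) -> int:
    """Map a possibly negative revision number to a valid list index."""
    count = len(revision_hashes)
    index = rev_no + count if rev_no < 0 else rev_no
    if not 0 <= index < count:
        raise IndexError(f"No revision no. {rev_no} among {count} revisions")
    return index


hashes = ["hash0", "hash1", "hash2"]  # made-up hashes; index 0 is the original
first = resolve_revision_index(0, hashes)
latest = resolve_revision_index(-1, hashes)
```

With three revisions, `0` resolves to the original record and `-1` to the latest one, mirroring Python's own list indexing.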

forget() -> R async

Orders Aleph to forget a specific object with all its revisions. Will remove the object from all indices. The content of all POST messages will be deleted, but the hashes and timestamps will remain.

The forgotten object should be deleted afterwards, as it is useless now.

Raises:

| Type | Description |
| --- | --- |
| NotStoredError | If the object is not stored on Aleph. |
| AlephPermissionError | If the object is not owned by the current account. |
| AlreadyForgottenError | If the object was already forgotten. |

Source code in aars/core.py
async def forget(self: R) -> R:
    """
    Orders Aleph to forget a specific object with all its revisions. Will remove the object from all indices.
    The content of all POST messages will be deleted, but the hashes and timestamps will remain.
    !!! note "The forgotten object should be deleted afterwards, as it is useless now."
    Raises:
        NotStoredError: If the object is not stored on Aleph.
        AlephPermissionError: If the object is not owned by the current account.
        AlreadyForgottenError: If the object was already forgotten.
    """

    if not self.forgotten:
        if self.id_hash is None:
            raise NotStoredError(self)
        if self.signer != AARS.account.get_address():
            raise AlephPermissionError(AARS.account.get_address(), self.id_hash, self.signer)
        await AARS.forget_objects([self])
        # Remove the record from every index of its class.
        for index in self.get_indices():
            index.remove_record(self)
        self.forgotten = True
        return self
    else:
        raise AlreadyForgottenError(self)

forget_all() -> List[ItemHash] classmethod async

Forgets all records of the given type that were signed by the authorized user; records signed by other accounts are skipped. If invoked on Record, will try to fetch all objects of the current channel and forget them.

Returns:

| Type | Description |
| --- | --- |
| List[ItemHash] | A list of all item hashes that were forgotten. |

Source code in aars/core.py
@classmethod
async def forget_all(cls: Type[R]) -> List[ItemHash]:
    """
    Forgets all Records of given type of the authorized user. If invoked on Record, will try to fetch all objects
    of the current channel and forget them.
    Returns:
        A list of all item hashes that were forgotten.
    """
    response = cls.fetch_objects()

    item_hashes = []
    record_batch = []
    i = 0
    async for record in response:
        if record.signer != AARS.account.get_address():
            continue
        record_batch.append(record)
        i += 1
        if i % 50 == 0:
            item_hashes.extend([record.id_hash for record in record_batch])
            await AARS.forget_objects(record_batch)
            record_batch = []
    if record_batch:
        item_hashes.extend([record.id_hash for record in record_batch])
        await AARS.forget_objects(record_batch)

    return item_hashes
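
The batching used above (collect records, flush every 50, then flush the remainder) can be sketched independently of Aleph. `fake_records` and `batched` are hypothetical names, and plain integers stand in for records.

```python
import asyncio
from typing import AsyncIterator, List

BATCH_SIZE = 50  # same batch size as in the method above


async def fake_records(n: int) -> AsyncIterator[int]:
    # Hypothetical stand-in for the records yielded by fetch_objects().
    for i in range(n):
        yield i


async def batched(n: int) -> List[List[int]]:
    batches: List[List[int]] = []
    batch: List[int] = []
    async for record in fake_records(n):
        batch.append(record)
        if len(batch) == BATCH_SIZE:
            batches.append(batch)  # a real implementation would forget this batch here
            batch = []
    if batch:
        batches.append(batch)  # flush the final, partially filled batch
    return batches


batches = asyncio.run(batched(120))
batch_sizes = [len(b) for b in batches]
```

Checking `len(batch)` instead of keeping a separate counter is one possible simplification; the observable batching is the same.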

from_dict(post: Dict[str, Any]) -> R classmethod async

Initializes a record object from its raw Aleph data.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| post | Dict[str, Any] | The raw Aleph data to initialize from. | required |

Returns:

| Type | Description |
| --- | --- |
| R | The initialized object. |

Source code in aars/core.py
@classmethod
async def from_dict(cls: Type[R], post: Dict[str, Any]) -> R:
    """
    Initializes a record object from its raw Aleph data.
    Args:
        post: The raw Aleph data to initialize from.
    Returns:
        The initialized object.
    """
    obj = cls(**post["content"])
    if post.get("ref") is None:
        obj.id_hash = ItemHash(post["item_hash"])
    else:
        obj.id_hash = ItemHash(post["ref"])
    await obj.update_revision_hashes()
    assert obj.id_hash is not None
    obj.current_revision = obj.revision_hashes.index(obj.id_hash)
    obj.timestamp = post["time"]
    obj.signer = post["sender"]
    return obj
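
How the record's identity is derived from a raw post can be shown with plain dictionaries. `resolve_id_hash` is a hypothetical helper and the hashes are made up; the sketch only mirrors the `ref` / `item_hash` branch above.

```python
from typing import Any, Dict


def resolve_id_hash(post: Dict[str, Any]) -> str:
    # A post with a "ref" points at the original post's hash; a post
    # without "ref" is identified by its own "item_hash".
    ref = post.get("ref")
    return post["item_hash"] if ref is None else ref


original = {"item_hash": "aaa", "content": {"title": "Dune"}}
amendment = {"item_hash": "bbb", "ref": "aaa", "content": {"title": "Dune 2"}}

original_id = resolve_id_hash(original)
amendment_id = resolve_id_hash(amendment)
```

Both posts resolve to the same identifier, which is what lets all revisions of a record be grouped under one `id_hash`.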

from_post(post: PostMessage) -> R classmethod async

Initializes a record object from its PostMessage.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| post | PostMessage | The PostMessage to initialize from. | required |

Returns:

| Type | Description |
| --- | --- |
| R | The initialized object. |

Source code in aars/core.py
@classmethod
async def from_post(cls: Type[R], post: PostMessage) -> R:
    """
    Initializes a record object from its PostMessage.
    Args:
        post: The PostMessage to initialize from.
    Returns:
        The initialized object.
    """
    obj: R = cls(**post.content.content)
    if post.content.ref is None:
        obj.id_hash = post.item_hash
    else:
        if isinstance(post.content.ref, str):
            obj.id_hash = ItemHash(post.content.ref)
        elif isinstance(post.content.ref, ChainRef):
            obj.id_hash = post.content.ref.item_hash
        else:
            raise TypeError(f"Unknown type of ref: {type(post.content.ref)}")
    await obj.update_revision_hashes()
    assert obj.id_hash is not None
    obj.current_revision = obj.revision_hashes.index(obj.id_hash)
    obj.timestamp = post.time
    obj.signer = post.sender
    return obj

get_index(index_name: str) -> Index[R] classmethod

Returns an index or any of its subindices by its name. The name is defined as "<object_class>.[<object_properties>.]" with the properties being sorted alphabetically. For example, "Book.author.title" is a valid index name, while "Book.title.author" is not.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| index_name | str | The name of the index to fetch. | required |

Returns:

| Type | Description |
| --- | --- |
| Index[R] | The index instance or a subindex. |

Source code in aars/core.py
@classmethod
def get_index(cls: Type[R], index_name: str) -> "Index[R]":
    """
    Returns an index or any of its subindices by its name. The name is defined as
    `"<object_class>.[<object_properties>.]"` with the properties being sorted alphabetically. For example,
    `"Book.author.title"` is a valid index name, while `"Book.title.author"` is not.
    Args:
        index_name: The name of the index to fetch.
    Returns:
        The index instance or a subindex.
    """
    index = cls.__indices.get(index_name)
    if index is None:
        # Collect all plausible combinations of keys, longest (most specific) first.
        key_subslices = subslices(list(index_name.split(".")[1:]))
        key_subslices = sorted(key_subslices, key=len, reverse=True)
        for keys in key_subslices:
            name = cls.__name__ + "." + ".".join(keys)
            if cls.__indices.get(name):
                warnings.warn(f"No index {index_name} found. Using {name} instead.")
                return cls.__indices[name]
        raise IndexError(f"No index or subindex for {index_name} found.")
    return index
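
The subindex fallback can be sketched without aars. The behavior of the `subslices` helper is an assumption here: `contiguous_subslices` below returns every contiguous, non-empty slice of the key list, which may differ from the real helper. `find_index_name` and the plain-dict registry are likewise hypothetical.

```python
from typing import Dict, List, Sequence


def contiguous_subslices(keys: Sequence[str]) -> List[List[str]]:
    # Assumed behavior of `subslices`: every contiguous, non-empty slice.
    return [
        list(keys[i:j])
        for i in range(len(keys))
        for j in range(i + 1, len(keys) + 1)
    ]


def find_index_name(registry: Dict[str, object], class_name: str, index_name: str) -> str:
    if index_name in registry:
        return index_name
    keys = index_name.split(".")[1:]
    # Try the most specific (longest) candidates first.
    for candidate in sorted(contiguous_subslices(keys), key=len, reverse=True):
        name = class_name + "." + ".".join(candidate)
        if name in registry:
            return name
    raise IndexError(f"No index or subindex for {index_name} found.")


registry = {"Book.author": object()}  # only a single-property index exists
resolved = find_index_name(registry, "Book", "Book.author.title")
```

Here the requested two-property index does not exist, so the lookup falls back to the single-property index on `author`.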

get_indices() -> List[Index] classmethod

Returns all indices of a given Record subclass. If invoked on Record itself, returns all registered indices regardless of record type.

Returns:

| Type | Description |
| --- | --- |
| List[Index] | A list of existing indices. |

Source code in aars/core.py
@classmethod
def get_indices(cls: Type[R]) -> List["Index"]:
    """
    Returns all indices of a given Record subclass.
    Returns:
        A list of existing indices.
    """
    if cls == Record:
        return list(cls.__indices.values())
    return [index for index in cls.__indices.values() if index.record_type == cls]
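
The filter on `record_type` suggests that the index registry is shared between the base class and its subclasses. The toy classes below (`ToyRecord`, `ToyIndex`, `Book`, `Author`, all hypothetical and without pydantic) sketch that situation: Python's name mangling makes a double-underscore class attribute resolve to the same dictionary from every subclass.

```python
from typing import ClassVar, Dict, List


class ToyIndex:
    """Hypothetical stand-in for an index; only remembers its record type."""

    def __init__(self, record_type: type, name: str):
        self.record_type = record_type
        self.name = name


class ToyRecord:
    # Name mangling turns this into ToyRecord._ToyRecord__indices, so methods
    # defined here see one shared dictionary, whichever subclass calls them.
    __indices: ClassVar[Dict[str, ToyIndex]] = {}

    @classmethod
    def add_index(cls, index: ToyIndex) -> None:
        cls.__indices[index.name] = index

    @classmethod
    def get_indices(cls) -> List[ToyIndex]:
        if cls is ToyRecord:
            return list(cls.__indices.values())
        return [i for i in cls.__indices.values() if i.record_type is cls]


class Book(ToyRecord):
    pass


class Author(ToyRecord):
    pass


Book.add_index(ToyIndex(Book, "Book.title"))
Author.add_index(ToyIndex(Author, "Author.name"))
book_index_names = [i.name for i in Book.get_indices()]
all_index_names = sorted(i.name for i in ToyRecord.get_indices())
```

Calling `get_indices()` on a subclass returns only that subclass's indices, while calling it on the base class returns everything in the shared registry.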

is_indexed(id_hash: Union[ItemHash, str]) -> bool classmethod

Checks if a given object is indexed.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| id_hash | Union[ItemHash, str] | The hash of the object to check. | required |

Returns:

| Type | Description |
| --- | --- |
| bool | True if the object is indexed, False otherwise. |

Source code in aars/core.py
@classmethod
def is_indexed(cls: Type[R], id_hash: Union[ItemHash, str]) -> bool:
    """
    Checks if a given object is indexed.
    Args:
        id_hash: The hash of the object to check.
    Returns:
        True if the object is indexed, False otherwise.
    """
    return id_hash in cls.__indexed_items

regenerate_indices() -> List[R] classmethod async

Regenerates all indices of the given Record subtype. If invoked on Record, will try to fetch all objects of the current channel and index them.

This can take quite some time, depending on the number of records to be fetched.

Returns:

| Type | Description |
| --- | --- |
| List[R] | A list of all records that were indexed. |

Source code in aars/core.py
@classmethod
async def regenerate_indices(cls: Type[R]) -> List[R]:
    """
    Regenerates all indices of given Record subtype.
    If invoked on Record, will try to fetch all objects of the current channel and index them.

    !!! warning "This can take quite some time, depending on the amount of records to be fetched."

    Returns:
        A list of all records that were indexed.
    """
    response = cls.fetch_objects()

    records = []
    async for record in response:
        record._index()
        records.append(record)
    return records

remove_index(index: Index) -> None classmethod

Removes an index from the class. This stops the index from being used for queries or updates.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| index | Index | The index to remove. | required |

Source code in aars/core.py
@classmethod
def remove_index(cls: Type[R], index: "Index") -> None:
    """
    Removes an index from the class. This stops the index from being used for queries or updates.
    Args:
        index: The index to remove.
    """
    del cls.__indices[repr(index)]

save() -> R async

Posts a new item to Aleph, or amends it if it was already posted. New items are added to the local indices. For indices to be persisted on Aleph, you need to call save() on the index itself or cls.save_indices().

Returns:

| Type | Description |
| --- | --- |
| R | The updated object itself. |

Source code in aars/core.py
async def save(self: R) -> R:
    """
    Posts a new item to Aleph or amends it, if it was already posted. Will add new items to local indices.
    For indices to be persisted on Aleph, you need to call `save()` on the index itself or `cls.save_indices()`.
    Returns:
        The updated object itself.
    """
    await AARS.post_or_amend_object(self)
    if self.current_revision == 0:
        self._index()
    return self

save_indices() -> None classmethod async

Updates all indices of the given Record subclass by calling save() on each of them concurrently.

Source code in aars/core.py
@classmethod
async def save_indices(cls: Type[R]) -> None:
    """Updates all indices of given Record subclass."""
    tasks = [index.save() for index in cls.get_indices()]
    await asyncio.gather(*tasks)
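
The `asyncio.gather` call above runs all index saves concurrently and waits until every one has finished. A minimal sketch with a hypothetical `save_one` coroutine in place of `Index.save()`:

```python
import asyncio
from typing import List


async def save_one(name: str, saved: List[str]) -> None:
    # Hypothetical stand-in for Index.save(); records which index was saved.
    await asyncio.sleep(0)
    saved.append(name)


async def save_all(names: List[str]) -> List[str]:
    saved: List[str] = []
    # gather() schedules all coroutines concurrently and waits for all of them.
    await asyncio.gather(*(save_one(name, saved) for name in names))
    return saved


saved = asyncio.run(save_all(["Book.author", "Book.title"]))
```

The completion order of concurrently running coroutines is not something callers should rely on, so the result is best treated as an unordered collection.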

update_revision_hashes() -> R async

Updates the list of available revision hashes, in order to fetch these.

Returns:

| Type | Description |
| --- | --- |
| R | The object with the updated revision hashes. |

Source code in aars/core.py
async def update_revision_hashes(self: R) -> R:
    """
    Updates the list of available revision hashes, in order to fetch these.

    Returns:
        The object with the updated revision hashes.
    """
    assert self.id_hash is not None
    self.revision_hashes = [self.id_hash] + await async_iterator_to_list(
        AARS.fetch_revisions(type(self), ref=self.id_hash)
    )
    if self.current_revision is None:
        # latest revision
        self.current_revision = len(self.revision_hashes) - 1
    return self
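
The shape of the revision list can be sketched with plain strings. `build_revisions` is a hypothetical helper and the hashes are made up; it mirrors only the list construction and the default to the latest revision shown above.

```python
from typing import List, Optional, Tuple


def build_revisions(
    id_hash: str, amendment_hashes: List[str], current: Optional[int] = None
) -> Tuple[List[str], int]:
    # The original post always comes first; amendments follow in the order given.
    revision_hashes = [id_hash] + amendment_hashes
    if current is None:
        current = len(revision_hashes) - 1  # default to the latest revision
    return revision_hashes, current


revisions, current = build_revisions("aaa", ["bbb", "ccc"])
unamended, only = build_revisions("aaa", [])
```

A record that was never amended has exactly one revision, so its current revision is 0.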

where_eq(**kwargs) -> PageableResponse[R] classmethod

Queries an object by given properties through an index, in order to fetch applicable records. An index name is defined as

"Class.property1.property2"
and is initialized by creating an Index instance that targets a Record subclass with a list of properties.

Index(MyRecord, ['property1', 'property2'])

This will create an index named ‘MyRecord.property1.property2’ which can be queried with:

MyRecord.where_eq(property1='value1', property2='value2')

If no index is defined for the given properties, an IndexError is raised.

If only some of the queried keys are indexed, a fallback index is used and its results are filtered locally.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `**kwargs` | | The properties to query for. | {} |

Returns:

| Type | Description |
| --- | --- |
| PageableResponse[R] | A pageable response object, which can be asynchronously iterated over. |

Source code in aars/core.py
@classmethod
def where_eq(cls: Type[R], **kwargs) -> PageableResponse[R]:
    """
    Queries an object by given properties through an index, in order to fetch applicable records.
    An index name is defined as
    ```python
    "Class.property1.property2"
    ```
    and is initialized by creating an `Index` instance, targeting a BaseRecord class with a list of properties.

    ```python
    Index(MyRecord, ['property1', 'property2'])
    ```

    This will create an index named 'MyRecord.property1.property2' which can be queried with:

    ```python
    MyRecord.where_eq(property1='value1', property2='value2')
    ```

    If no index is defined for the given properties, an IndexError is raised.

    If only a part of the keys is indexed for the given query, a fallback index is used and locally filtered.

    Args:
        **kwargs: The properties to query for.
    Returns:
        A pageable response object, which can be asynchronously iterated over.
    """
    query = IndexQuery(cls, **kwargs)
    index = cls.get_index(query.get_index_name())
    generator = index.lookup(query)
    return PageableResponse(generator)
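
How query keywords could map to an index name, and what an equality lookup amounts to, can be sketched with plain dictionaries. Both helpers are hypothetical: `index_name` assumes the alphabetical ordering described for `get_index()`, and `lookup` is a naive filter, not the actual `Index.lookup` implementation.

```python
from typing import Any, Dict, List


def index_name(class_name: str, **query: Any) -> str:
    # Property names are sorted alphabetically, as described for get_index().
    return class_name + "." + ".".join(sorted(query))


def lookup(records: List[Dict[str, Any]], **query: Any) -> List[Dict[str, Any]]:
    # Naive equality filter standing in for an index lookup.
    return [r for r in records if all(r.get(k) == v for k, v in query.items())]


books = [
    {"title": "Dune", "author": "Herbert"},
    {"title": "Emma", "author": "Austen"},
]
name = index_name("Book", title="Dune", author="Herbert")
matches = lookup(books, title="Dune", author="Herbert")
```

The keyword order in the call does not matter for the derived name, because the keys are sorted before joining.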