
Utils

This module contains utility functions and classes for the AARS library, most notably the IndexQuery class, which prepares a query for records using an Index.

It also contains the PageableResponse and PageableRequest classes, which simplify pagination of queries and allow large amounts of data to be retrieved efficiently in the asynchronous environment of AARS.

EmptyAsyncIterator

Bases: AsyncIterator[T]

An async iterator that can be returned when there are no results.
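
The source of EmptyAsyncIterator is not listed on this page. As a rough sketch of the idea only, an async iterator with no results can raise StopAsyncIteration on the very first `__anext__` call. The class name `EmptyAsyncIteratorSketch` and the helper `collect` are hypothetical and not part of AARS.

```python
import asyncio
from typing import AsyncIterator, List, TypeVar

T = TypeVar("T")


class EmptyAsyncIteratorSketch(AsyncIterator[T]):
    """Hypothetical stand-in: an async iterator that never yields anything."""

    def __aiter__(self) -> "EmptyAsyncIteratorSketch[T]":
        return self

    async def __anext__(self) -> T:
        # Signal exhaustion immediately, so `async for` runs zero times.
        raise StopAsyncIteration


async def collect(iterator: AsyncIterator[T]) -> List[T]:
    """Drain an async iterator into a list."""
    return [item async for item in iterator]


result = asyncio.run(collect(EmptyAsyncIteratorSketch()))
print(result)
```

Returning such an object lets callers keep a single `async for` code path whether or not a query matched anything.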

IndexQuery(record_type: Type[T], **kwargs)

Bases: OrderedDict, Generic[T]

A query for a specific index. The keys are the index keys, and the values are the values to query for. It is an ordered dict whose keys are sorted alphabetically, so the same query always has the same string representation, which is used to determine the index name. Keyword arguments whose value is None are dropped.

Create a new IndexQuery.

Parameters:

Name          Type      Description                                  Default
record_type   Type[T]   The type of record that this query is for.   required
**kwargs                The keys and values to query for.            {}
Source code in aars/utils.py
def __init__(self, record_type: Type[T], **kwargs):
    """
    Create a new IndexQuery.
    Args:
        record_type: The type of record that this query is for.
        **kwargs: The keys and values to query for.
    """
    super().__init__(
        {item[0]: item[1] for item in sorted(kwargs.items()) if item[1] is not None}
    )
    self.record_type = record_type

get_index_name() -> str

Get the name of the index that this query would use.

Returns:

Type   Description
str    The name of the index to query.

Source code in aars/utils.py
def get_index_name(self) -> str:
    """
    Get the name of the index that this query would use.
    Returns:
        The name of the index to query.
    """
    return self.record_type.__name__ + "." + ".".join(self.keys())
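
To illustrate why the alphabetical key order matters, the sketch below copies the two methods listed above into a stand-in class so that it runs without AARS installed. `IndexQuerySketch` and the record type `Book` are hypothetical names used only for this example.

```python
from collections import OrderedDict


class Book:
    """Hypothetical record type; only its class name is used here."""


class IndexQuerySketch(OrderedDict):
    """Stand-in copying the __init__ and get_index_name logic listed above."""

    def __init__(self, record_type, **kwargs):
        # Sort by key and drop None values, as in the listed constructor.
        super().__init__(
            {key: value for key, value in sorted(kwargs.items()) if value is not None}
        )
        self.record_type = record_type

    def get_index_name(self) -> str:
        return self.record_type.__name__ + "." + ".".join(self.keys())


# The keyword order differs, but both queries resolve to the same index name.
name_a = IndexQuerySketch(Book, title="Dune", author="Herbert").get_index_name()
name_b = IndexQuerySketch(Book, author="Herbert", title="Dune").get_index_name()
# A None value is dropped and therefore does not appear in the index name.
name_c = IndexQuerySketch(Book, author="Herbert", title=None).get_index_name()
print(name_a, name_b, name_c)
```

Because the name depends only on the record type and the sorted keys, the values being queried never influence which index is selected.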

get_subquery(keys: List[str]) -> IndexQuery

Get a subquery of this query, containing only the specified keys.

Example
query = IndexQuery(record_type=MyRecord, a=1, b=2, c=3)
subquery = query.get_subquery(["a", "c"])
assert subquery == IndexQuery(record_type=MyRecord, a=1, c=3)

Parameters:

Name   Type        Description                            Default
keys   List[str]   The keys to include in the subquery.   required

Returns:

Type         Description
IndexQuery   The subquery.

Source code in aars/utils.py
def get_subquery(self, keys: List[str]) -> "IndexQuery":
    """
    Get a subquery of this query, containing only the specified keys.
    Example:
        ```python
        query = IndexQuery(record_type=MyRecord, a=1, b=2, c=3)
        subquery = query.get_subquery(["a", "c"])
        assert subquery == IndexQuery(record_type=MyRecord, a=1, c=3)
        ```
    Args:
        keys: The keys to include in the subquery.
    Returns:
        The subquery.
    """
    return IndexQuery(
        self.record_type, **{key: arg for key, arg in self.items() if key in keys}
    )
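
Two details of the listing above are easy to miss: the order of `keys` does not affect the result, and keys that are not part of the query are silently ignored. The following sketch shows both with a stand-in class that mirrors the listed logic. `IndexQuerySketch` and `Record` are hypothetical names, not AARS classes.

```python
from collections import OrderedDict
from typing import List


class Record:
    """Hypothetical record type used only for illustration."""


class IndexQuerySketch(OrderedDict):
    """Stand-in mirroring the constructor and get_subquery listed above."""

    def __init__(self, record_type, **kwargs):
        super().__init__(
            {key: value for key, value in sorted(kwargs.items()) if value is not None}
        )
        self.record_type = record_type

    def get_subquery(self, keys: List[str]) -> "IndexQuerySketch":
        return IndexQuerySketch(
            self.record_type, **{key: arg for key, arg in self.items() if key in keys}
        )


query = IndexQuerySketch(Record, a=1, b=2, c=3)
# Keys are requested in reverse order; the subquery is still sorted.
reordered = query.get_subquery(["c", "a"])
# "z" is not part of the query, so only "a" survives.
partial = query.get_subquery(["a", "z"])
print(list(reordered.items()), list(partial.items()))
```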

PageableRequest(func: Callable[..., AsyncIterator[T]], *args, **kwargs)

Bases: AsyncIterator[T], Generic[T]

A wrapper around a request that returns a PageableResponse. It is useful when performance can be improved by passing page and page_size parameters to the request. It can be used like a PageableResponse.

Create a new PageableRequest. The request will be triggered when the first record is requested.

Parameters:

Name       Type                              Description                                         Default
func       Callable[..., AsyncIterator[T]]   The function to call to get the record generator.   required
*args                                        The arguments to pass to the function.              ()
**kwargs                                     The keyword arguments to pass to the function.      {}
Source code in aars/utils.py
def __init__(self, func: Callable[..., AsyncIterator[T]], *args, **kwargs):
    """
    Create a new PageableRequest. The request will be triggered when the first record is requested.
    Args:
        func: The function to call to get the record generator.
        *args: The arguments to pass to the function.
        **kwargs: The keyword arguments to pass to the function.
    """
    self.func = func
    self.args = args
    self.kwargs = kwargs

response property

The PageableResponse of this request.

all() -> List[T] async

Trigger the request and return all records.

Returns:

Type      Description
List[T]   A list of all records.

Source code in aars/utils.py
async def all(self) -> List[T]:
    """
    Trigger the request and return all records.
    Returns:
        A list of all records.
    """
    return await self.response.all()

first() -> Optional[T] async

Trigger the request and return the first record.

Returns:

Type          Description
Optional[T]   The first record, or None if there are no records.

Source code in aars/utils.py
async def first(self) -> Optional[T]:
    """
    Trigger the request and return the first record.
    Returns:
        The first record, or None if there are no records.
    """
    self._response = PageableResponse(
        self.func(*self.args, **self.kwargs, page=1, page_size=1)
    )
    return await self.response.first()

page(page: int, page_size: int) -> List[T] async

Trigger the request and return a page of records.

Parameters:

Name        Type   Description                       Default
page        int    The page number to fetch.         required
page_size   int    The number of records per page.   required

Returns:

Type      Description
List[T]   A list of records on the specified page.

Source code in aars/utils.py
async def page(self, page: int, page_size: int) -> List[T]:
    """
    Trigger the request and return a page of records.
    Args:
        page: The page number to fetch.
        page_size: The number of records per page.
    Returns:
        A list of records on the specified page.
    """
    self._response = PageableResponse(
        self.func(*self.args, **self.kwargs, page=page, page_size=page_size)
    )
    return await self.response.all()
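
The listings above forward `page` and `page_size` to the wrapped function, so that function must accept both as keyword arguments and yield only the requested page. The sketch below imitates this forwarding without AARS. The data source `fetch_records`, the list `DATA`, and the helper `request_page` are all hypothetical.

```python
import asyncio
from typing import AsyncIterator, Callable, List

DATA = [f"record-{i}" for i in range(10)]  # hypothetical backing store


async def fetch_records(page: int = 1, page_size: int = 20) -> AsyncIterator[str]:
    """Hypothetical paginated source: yields only the requested page."""
    start = (page - 1) * page_size
    for item in DATA[start:start + page_size]:
        await asyncio.sleep(0)  # stand-in for network latency
        yield item


async def request_page(
    func: Callable[..., AsyncIterator[str]], page: int, page_size: int
) -> List[str]:
    # Mirrors the forwarding in PageableRequest.page: pass page and page_size
    # to func, then collect everything it yields.
    return [item async for item in func(page=page, page_size=page_size)]


page_two = asyncio.run(request_page(fetch_records, 2, 3))
first_only = asyncio.run(request_page(fetch_records, 1, 1))
print(page_two, first_only)
```

In this arrangement the source skips unwanted records itself, instead of the caller iterating past them.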

PageableResponse(record_generator: AsyncIterator[T])

Bases: AsyncIterator[T], Generic[T]

A wrapper around an AsyncIterator that allows for easy pagination and iteration, while also preventing multiple iterations. It mainly offers a more convenient syntax for code that does not use the async generator syntax directly.

Iterate over all records
async for record in PageableResponse(record_generator):
    print(record)
Consuming the generator

Calling any of the methods consumes the generator, so the records cannot be iterated over afterwards.

Iteration vs. Fetching all records at once

If you plan to stop iterating over the records after a certain point, it is more efficient to use the async for syntax, as it makes fewer calls to the Aleph Message API. If you want to get all records, AARS will automatically use as few API calls as possible.
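
The effect of stopping early can be seen in a small sketch. It assumes a hypothetical generator `records` in which each yielded item stands for one fetched record. It does not model how AARS batches calls to the Aleph Message API.

```python
import asyncio
from typing import AsyncIterator, List

pulled: List[int] = []  # every item the generator actually produced


async def records() -> AsyncIterator[int]:
    """Hypothetical record source with 100 records."""
    for i in range(100):
        pulled.append(i)
        yield i


async def take_three() -> List[int]:
    taken: List[int] = []
    async for record in records():
        taken.append(record)
        if len(taken) == 3:
            break  # stop early; the generator is never asked for more
    return taken


taken = asyncio.run(take_three())
print(taken, len(pulled))
```

Only the three consumed items were ever produced, whereas collecting everything first would have driven the generator through all 100.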

Source code in aars/utils.py
def __init__(self, record_generator: AsyncIterator[T]):
    self.record_generator = record_generator

all() -> List[T] async

Fetch all records of this response.

Fetch all records
records = await PageableResponse(record_generator).all()

Returns:

Type      Description
List[T]   A list of all records.

Source code in aars/utils.py
async def all(self) -> List[T]:
    """
    Fetch all records of this response.

    Example: Fetch all records
        ```python
        records = await PageableResponse(record_generator).all()
        ```
    Returns:
        A list of all records.
    """
    if self.used:
        raise AlreadyUsedError()
    self.used = True
    return await async_iterator_to_list(self.record_generator)
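
The `used` guard in the listing above makes a response single-use: a second call raises instead of silently returning an empty list from the exhausted generator. The sketch below reproduces that behavior with stand-ins. `ResponseSketch` and the local `AlreadyUsedError` are hypothetical, and the assumption that the flag starts out as False is mine, since its initialization is not shown on this page.

```python
import asyncio
from typing import AsyncIterator, List, Tuple


class AlreadyUsedError(Exception):
    """Local stand-in for the AARS exception of the same name."""


class ResponseSketch:
    """Minimal stand-in for the `used` guard; not the AARS class."""

    def __init__(self, record_generator: AsyncIterator[int]):
        self.record_generator = record_generator
        self.used = False  # assumption: the flag starts out False

    async def all(self) -> List[int]:
        if self.used:
            raise AlreadyUsedError()
        self.used = True
        return [item async for item in self.record_generator]


async def numbers() -> AsyncIterator[int]:
    for i in range(3):
        yield i


async def main() -> Tuple[List[int], bool]:
    response = ResponseSketch(numbers())
    first_call = await response.all()
    try:
        await response.all()
        second_call_failed = False
    except AlreadyUsedError:
        second_call_failed = True
    return first_call, second_call_failed


first_call, second_call_failed = asyncio.run(main())
print(first_call, second_call_failed)
```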

first() -> Optional[T] async

Fetch the first record, which is usually the most recent record.

Fetch the first record
record = await PageableResponse(record_generator).first()

Returns:

Type          Description
Optional[T]   The first record, or None if there are no records.

Source code in aars/utils.py
async def first(self) -> Optional[T]:
    """
    Fetch the first record, which is usually the most recent record.

    Example: Fetch the first record
        ```python
        record = await PageableResponse(record_generator).first()
        ```
    Returns:
        The first record, or None if there are no records.
    """
    if self.used:
        raise AlreadyUsedError()
    self.used = True
    try:
        return await self.record_generator.__anext__()
    except StopAsyncIteration:
        return None

page(page: int, page_size: int) -> List[T] async

Fetch a page of records.

Fetch a page of records
records = await PageableResponse(record_generator).page(2, 10)

Parameters:

Name        Type   Description                       Default
page        int    The page number to fetch.         required
page_size   int    The number of records per page.   required

Returns:

Type      Description
List[T]   A list of records on the specified page.

Source code in aars/utils.py
async def page(self, page: int, page_size: int) -> List[T]:
    """
    Fetch a page of records.

    Example: Fetch a page of records
        ```python
        records = await PageableResponse(record_generator).page(2, 10)
        ```
    Args:
        page: The page number to fetch.
        page_size: The number of records per page.
    Returns:
        A list of records on the specified page.
    """
    if self.used:
        raise AlreadyUsedError()
    self.used = True
    return await async_iterator_to_list(
        self.record_generator, (page - 1) * page_size, page_size
    )
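
The listing above turns a page number into a skip offset of `(page - 1) * page_size`, which implies that pages are numbered from 1. As a worked example of that arithmetic, the sketch below applies it to a plain list. `page_window` is a hypothetical helper, and the 35-item list is made-up data.

```python
from typing import List, Tuple


def page_window(page: int, page_size: int) -> Tuple[int, int]:
    """Return (skip, count) as computed in the page() listing; pages are 1-based."""
    return (page - 1) * page_size, page_size


items: List[int] = list(range(35))  # hypothetical result set of 35 records

skip, count = page_window(2, 10)
second_page = items[skip:skip + count]  # items 10..19

skip_last, count_last = page_window(4, 10)
last_page = items[skip_last:skip_last + count_last]  # only 5 items remain

print(skip, count, second_page, last_page)
```

Note that a PageableResponse has to iterate past the skipped records to reach the requested page, which is where a PageableRequest that forwards `page` and `page_size` to the underlying function can save work.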

async_iterator_to_list(iterator: AsyncIterator[T], skip: int = 0, count: Optional[int] = None) -> List[T] async

Return a list from an async iterator.

Parameters:

Name       Type               Description                                Default
iterator   AsyncIterator[T]   The async iterator to convert to a list.   required
skip       int                The number of items to skip.               0
count      Optional[int]      The maximum number of items to return.     None

Returns:

Type      Description
List[T]   A list of items.

Source code in aars/utils.py
async def async_iterator_to_list(
    iterator: AsyncIterator[T], skip: int = 0, count: Optional[int] = None
) -> List[T]:
    """
    Return a list from an async iterator.
    Args:
        iterator: The async iterator to convert to a list.
        skip: The number of items to skip.
        count: The maximum number of items to return.
    Returns:
        A list of items.
    """
    if count is None and skip == 0:
        return [item async for item in iterator]
    else:
        items = []
        async for item in iterator:
            if skip > 0:
                skip -= 1
                continue

            items.append(item)
            if len(items) == count:
                break
        return items
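
To try the skip and count behavior in isolation, the function can be copied into a standalone script and fed a small async generator. In the sketch below the function body is taken from the listing above, while the generator `numbers` is a hypothetical test source.

```python
import asyncio
from typing import AsyncIterator, List, Optional, TypeVar

T = TypeVar("T")


async def async_iterator_to_list(
    iterator: AsyncIterator[T], skip: int = 0, count: Optional[int] = None
) -> List[T]:
    # Same logic as the listing above, copied so the snippet runs standalone.
    if count is None and skip == 0:
        return [item async for item in iterator]
    items: List[T] = []
    async for item in iterator:
        if skip > 0:
            skip -= 1
            continue
        items.append(item)
        if len(items) == count:
            break
    return items


async def numbers(n: int) -> AsyncIterator[int]:
    """Hypothetical source yielding 0..n-1."""
    for i in range(n):
        yield i


window = asyncio.run(async_iterator_to_list(numbers(10), skip=2, count=3))
tail = asyncio.run(async_iterator_to_list(numbers(5), skip=3))
zero_count = asyncio.run(async_iterator_to_list(numbers(4), count=0))
print(window, tail, zero_count)
```

With the logic as listed, `count=0` does not produce an empty list: the length check runs only after an item has been appended, so it never matches 0 and all remaining items are returned.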

possible_index_names(seq)

Return all possible index names for a sequence of properties.

Example
list(possible_index_names(['A', 'B', 'C'])) == ['A', 'A.B', 'A.B.C', 'B', 'B.C', 'C']
Source code in aars/utils.py
def possible_index_names(seq):
    """
    Return all possible index names for a sequence of properties.

    Example:
        ```python
        list(possible_index_names(['A', 'B', 'C'])) == ['A', 'A.B', 'A.B.C', 'B', 'B.C', 'C']
        ```
    """
    return map(".".join, subslices(seq))

subslices(seq)

Return all contiguous non-empty subslices of a sequence. Taken from more_itertools.

Example
list(subslices([1, 2, 3])) == [[1], [1, 2], [1, 2, 3], [2], [2, 3], [3]]
Source code in aars/utils.py
def subslices(seq):
    """
    Return all contiguous non-empty subslices of a sequence.
    Taken from more_itertools.

    Example:
        ```python
        list(subslices([1, 2, 3])) == [[1], [1, 2], [1, 2, 3], [2], [2, 3], [3]]
        ```
    """
    # Each pair (i, j) with i < j drawn from range(len(seq) + 1) defines the slice seq[i:j].
    slices = starmap(slice, combinations(range(len(seq) + 1), 2))
    return map(operator.getitem, repeat(seq), slices)
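
Both helpers return lazy `map` objects, so they need to be wrapped in `list` to inspect the results. The sketch below copies the two function bodies from the listings on this page, with the imports they rely on, so it runs without AARS. The sample inputs are arbitrary.

```python
import operator
from itertools import combinations, repeat, starmap


def subslices(seq):
    # Each pair (i, j) with i < j drawn from range(len(seq) + 1) is one slice seq[i:j].
    slices = starmap(slice, combinations(range(len(seq) + 1), 2))
    return map(operator.getitem, repeat(seq), slices)


def possible_index_names(seq):
    # Join every contiguous run of property names with dots.
    return map(".".join, subslices(seq))


slices_out = list(subslices([1, 2, 3]))
names_out = list(possible_index_names(["A", "B", "C"]))
print(slices_out)
print(names_out)
```

A sequence of n properties yields n * (n + 1) / 2 contiguous runs, so three properties produce six candidate names, each a dot-joined string.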