import asyncio
import logging
import time
from typing import Dict, List, Optional
import aiohttp
from .cache import Cache
from .exceptions import (
AmariServerError,
HTTPException,
InvalidToken,
NotFound,
RatelimitException,
)
from .objects import Leaderboard, Rewards, User, Users
__all__ = ("AmariClient",)
logger = logging.getLogger(__name__)
[docs]
class AmariClient:
"""
The client used to make requests to the Amari API.
Attributes
----------
useAntiRatelimit: bool
Whether to use the anti ratelimit or not.
IT IS VERY UNRECOMMENDED TO DISABLE THIS FEATURE
AS IT CAN LEAD TO RATELIMITS
session: aiohttp.ClientSession
The client session used to make requests to the Amari API.
max_requests: int
The number of requests that can be made per second
cache_ttl: int
The time to live for cache entries, in seconds.
cache: Cache
The cache instance used to store API responses.
maxbytes: int
The maximum total size of cached data in bytes.
"""
BASE_URL = "https://amaribot.com/api/v1/"
HTTP_response_errors = {
404: NotFound,
403: InvalidToken,
429: RatelimitException,
500: AmariServerError,
}
def __init__(
self,
token: str,
/,
*,
useAntirateLimit: bool = True,
session: Optional[aiohttp.ClientSession] = None,
max_requests: int = 60,
cache_ttl: int = 60,
maxbytes: int = 25 * 1024 * 1024, # 25 MiB
):
self.session = session or aiohttp.ClientSession()
self._default_headers = {"Authorization": token}
self.lock = asyncio.Lock()
# Anti Ratelimit section
self.use_anti_ratelimit = useAntirateLimit
self.requests = []
self.max_requests = max_requests
self.request_period = 60
self.cache = Cache(ttl=cache_ttl, maxbytes=maxbytes)
async def __aenter__(self):
return self
async def __aexit__(self, *exc) -> None:
await self.close()
[docs]
async def close(self):
"""
Closes the client resources.
This must be called once the client is no longer in use.
"""
await self.session.close()
async def check_ratelimit(self):
async with self.lock:
while len(self.requests) >= self.max_requests:
self.requests = [
request
for request in self.requests
if time.time() - request < self.request_period
]
if len(self.requests) >= self.max_requests:
await self.wait_for_ratelimit_end()
async def wait_for_ratelimit_end(self):
wait_amount = self.request_period - (time.time() - min(self.requests))
logger.warning(
f"You are about to be ratelimited! Waiting {round(wait_amount)} seconds."
)
await asyncio.sleep(wait_amount)
[docs]
async def fetch_user(
self, guild_id: int, user_id: int, cache: bool = False
) -> User:
"""
Fetches a user from the Amari API.
Parameters
----------
guild_id: int
The guild ID to fetch the user from.
user_id: int
The user's ID.
cache: bool
Whether to use caching for this request.
Returns
-------
User
The user object.
"""
if cache:
key = ("fetch_user", guild_id, user_id)
data = await self.cache.get(key)
if data:
return User(guild_id, data)
else:
data = await self.request(f"guild/{guild_id}/member/{user_id}")
await self.cache.set(key, data)
return User(guild_id, data)
else:
data = await self.request(f"guild/{guild_id}/member/{user_id}")
return User(guild_id, data)
[docs]
async def fetch_users(
self, guild_id: int, user_ids: List[int], cache: bool = False
) -> Users:
"""
Fetches multiple users from the Amari API.
Parameters
----------
guild_id: int
The guild ID to fetch the users from.
user_ids: List[int]
The IDs of the users you would like to fetch.
cache: bool
Whether to use caching for this request.
Returns
-------
Users
The users object containing the fetched users.
"""
if cache:
members = []
uncached_user_ids = []
for user_id in user_ids:
key = ("fetch_user", guild_id, user_id)
data = await self.cache.get(key)
if data:
members.append(data)
else:
uncached_user_ids.append(user_id)
if uncached_user_ids:
converted_user_ids = [str(user_id) for user_id in uncached_user_ids]
body = {"members": converted_user_ids}
fetched_data = await self.request(
f"guild/{guild_id}/members",
method="POST",
extra_headers={"Content-Type": "application/json"},
json=body,
)
returned_members = fetched_data.get("members") or []
returned_ids = {int(user["id"]) for user in returned_members}
missing_ids = set(uncached_user_ids) - returned_ids
# Users who are not in the bots database are not returned
# to prevent additional calls the cache retains a fake payload filled with 0's
for user_data in returned_members:
user_id = int(user_data["id"])
key = ("fetch_user", guild_id, user_id)
await self.cache.set(key, user_data)
members.append(user_data)
for user_id in missing_ids:
fake_payload = {
"id": str(user_id),
"username": '',
"exp": 0,
"level": 0,
"weeklyExp": 0,
}
key = ("fetch_user", guild_id, user_id)
await self.cache.set(key, fake_payload)
members.append(fake_payload)
data = {
"members": members,
"total_members": len(members),
"queried_members": len(user_ids),
}
return Users(guild_id, data)
else:
converted_user_ids = [str(user_id) for user_id in user_ids]
body = {"members": converted_user_ids}
data = await self.request(
f"guild/{guild_id}/members",
method="POST",
extra_headers={"Content-Type": "application/json"},
json=body,
)
return Users(guild_id, data)
[docs]
async def fetch_leaderboard(
self,
guild_id: int,
/,
*,
weekly: bool = False,
raw: bool = False,
page: Optional[int] = None,
limit: Optional[int] = None,
cache: bool = False,
) -> Leaderboard:
"""
Fetches a guild's leaderboard from the Amari API.
Parameters
----------
guild_id: int
The guild ID to fetch the leaderboard from.
weekly: bool
Choose either to fetch the weekly leaderboard or the regular leaderboard.
raw: bool
Whether or not to use the raw endpoint. Raw endpoints do not support pagination but
will return the entire leaderboard.
page: int
The leaderboard page to fetch.
limit: int
The amount of users to fetch per page.
cache: bool
Whether to use caching for this request.
Returns
-------
Leaderboard
The guild's leaderboard.
"""
if cache:
key = ("fetch_leaderboard", guild_id, weekly, raw, page, limit)
data = await self.cache.get(key)
if data:
return Leaderboard(guild_id, data)
if raw and page:
raise ValueError("raw endpoints do not support pagination")
params = {}
if page is not None:
params["page"] = page
if limit is not None:
params["limit"] = limit
lb_type = "weekly" if weekly else "leaderboard"
endpoint = ["guild", lb_type, str(guild_id)]
if raw:
endpoint.insert(1, "raw")
data = await self.request("/".join(endpoint), params=params)
if cache:
await self.cache.set(key, data)
return Leaderboard(guild_id, data)
[docs]
async def fetch_full_leaderboard(
self, guild_id: int, /, *, weekly: bool = False, cache: bool = False
) -> Leaderboard:
"""
Fetches a guild's full leaderboard from the Amari API.
Parameters
----------
guild_id: int
The guild ID to fetch the leaderboard from.
weekly: bool
Choose either to fetch the weekly leaderboard or the regular leaderboard.
cache: bool
Whether to use caching for this request.
Returns
-------
Leaderboard
The guild's leaderboard.
"""
if cache:
key = ("fetch_full_leaderboard", guild_id, weekly)
data = await self.cache.get(key)
if data:
return Leaderboard(guild_id, data)
lb_type = "weekly" if weekly else "leaderboard"
data = await self.request(f"guild/raw/{lb_type}/{guild_id}")
if cache:
await self.cache.set(key, data)
return Leaderboard(guild_id, data)
[docs]
async def fetch_rewards(
self, guild_id: int, /, *, page: int = 1, limit: int = 50, cache: bool = False
) -> Rewards:
"""
Fetches a guild's role rewards from the Amari API.
Parameters
----------
guild_id: int
The guild ID to fetch the role rewards from.
page: int
The rewards page to fetch.
limit: int
The amount of rewards to fetch per page.
cache: bool
Whether to use caching for this request.
Returns
-------
Rewards
The guild's role rewards.
"""
if cache:
key = ("fetch_rewards", guild_id, page, limit)
data = await self.cache.get(key)
if data:
return Rewards(guild_id, data)
params = {"page": page, "limit": limit}
data = await self.request(f"guild/rewards/{guild_id}", params=params)
if cache:
await self.cache.set(key, data)
return Rewards(guild_id, data)
@classmethod
async def check_response_for_errors(cls, response: aiohttp.ClientResponse):
if response.status > 399 or response.status < 200:
error = cls.HTTP_response_errors.get(response.status, HTTPException)
try:
message = (await response.json())["error"]
except Exception:
message = await response.text()
raise error(response, message)
async def request(
self,
endpoint: str,
*,
method: str = "GET",
params: Dict = {},
json: Dict = {},
extra_headers: Dict = {},
) -> Dict:
headers = dict(self._default_headers, **extra_headers)
if self.use_anti_ratelimit:
await self.check_ratelimit()
async with self.session.request(
method=method,
url=self.BASE_URL + endpoint,
json=json,
headers=headers,
params=params,
) as response:
if self.use_anti_ratelimit:
self.requests.append(time.time())
await self.check_response_for_errors(response)
return await response.json()