import requests import json import os class ABSConnector: def __init__(self, abs_url, token=None): self.abs_url = abs_url self.requests = requests.Session() self.requests.headers = {"Authorization": f"Bearer {token}"} def get_library_ids(self): endpoint = f"{self.abs_url}/api/libraries" response = self.requests.get(endpoint) response.raise_for_status() data = response.json() return data["libraries"] def _get_library_page(self, library_id, page=0, page_size=100): endpoint = f"{self.abs_url}/api/libraries/{library_id}/series" response = self.requests.get( endpoint, params={ "limit": page_size, "page": page, "minified": 1, "sort": "name", }, ) response.raise_for_status() return response.json() def get_series_by_library_id(self, library_id, page_size=100): page = 0 while True: data = self._get_library_page(library_id, page, page_size) yield from data["results"] page += 1 if data["total"] < page_size * page: # Stop if no more data break class ABSConnectorMock(ABSConnector): def __init__(self, abs_url, token=None): super().__init__(abs_url, token) self.directory = "dumps/abs" if not os.path.exists(self.directory): os.makedirs(self.directory) def get_library_ids(self): path = f"{self.directory}/libraries.json" try: with open(path, "r") as f: data = json.load(f) return data["libraries"] except FileNotFoundError: data = ABSConnector.get_library_ids(self) with open(path, "w+") as f: json.dump({"libraries": data}, f, indent=4) return data def get_series_by_library_id(self, library_id, page_size=100): page = 0 while True: path = f"{self.directory}/library_{library_id}.page_{page}.json" try: with open(path, "r") as f: data = json.load(f) except FileNotFoundError: data = ABSConnector._get_library_page(self, library_id, page, page_size) with open(path, "w+") as f: json.dump(data, f, indent=4) yield from data["results"] page += 1 if data["total"] < page_size * page: # Stop if no more data break