from dataclasses import dataclass, asdict from texture import ImageTextureImmichAsset @dataclass class Album: id: str range_start: int range_end: int assets_list: list[str] = None @property def assets_count(self): return end - start @dataclass class CallbackStateData: asset_index: int movement: int assets: list[str] @classmethod def from_lctl(cls, l): si = (l.asset_index - 5) % l.asset_count ei = (l.asset_index + 6) % l.asset_count sa = l.assets[si:ei] if si < ei else l.assets[si:] + l.assets[:ei] assets = [ a["id"] for a in sa ] return cls( asset_index=l.asset_index, movement=l.last_movement, assets=assets, ) class LazyCachingTextureList(): def __init__(self, immich_connector, album_ids, max_cache_items=100, change_callback=None): self.immich_connector = immich_connector assert max_cache_items >= 20, "Minimum cache items is 20" # Double small radius # Ring buffer self.cache_length = max_cache_items self.cache = [None] * max_cache_items self.radius_small = 10 self.radius_large = int(max_cache_items / 2) self.cache_items_behind = 0 self.cache_items_ahead = 0 self.cache_index = 0 self.asset_index = 0 self.asset_count = 0 self.last_movement = 0 self.cached_items = 0 self.album_keys = album_ids # TODO simplify album handling, dont need classes, etc. self.albums = self._get_albums() self.assets = self._get_album_assets() self.change_callback = change_callback def index_in_cache_range(self, index): index_range_low = (self.asset_index - self.cache_items_behind) % self.asset_count index_range_high = (self.asset_index + self.cache_items_ahead ) % self.asset_count if index_range_low > index_range_high: return index_range_low <= index or index <= index_range_high return index_range_low <= index <= index_range_high def _get_albums(self): albums = [] self.asset_count = i = 0 albums_info = self.immich_connector.load_all_albums() for album_info in albums_info: id = album_info["id"] if id not in self.album_keys: continue asset_count = album_info["assetCount"] albums.append(Album(id, i, i + asset_count)) i += asset_count + 1 self.asset_count += asset_count return albums def _get_album_assets(self): assets = [] for album in self.albums: assets += self.immich_connector.load_album_assets(album.id) return assets def _fill_cache(self, current_radius, final_radius, step): if current_radius >= final_radius: return current_radius for i in range(current_radius * step, final_radius * step, step): cache_index = (self.cache_index + i) % self.cache_length asset_index = (self.asset_index + i) % self.asset_count if self.cache[cache_index]: self.cache[cache_index].free() # Since this is a ring buffer, textures outside of range can just get free'd here asset = self.assets[asset_index] tex = ImageTextureImmichAsset(asset, asset_index) self.immich_connector.load_texture_async(self, tex) self.cache[cache_index] = tex self.cached_items += 1 return max(current_radius, final_radius) def _update_cache_get_item(self, asset_index): prev_asset_index = self.asset_index self.asset_index = asset_index % self.asset_count #if prev_asset_index == asset_index: # return self.cache[self.cache_index] # Movement is the distance between the previous and current index in the assets list # Since the list wraps around, fastest method is just to get the abs min of the 3 cases movement = min( asset_index - prev_asset_index, # No list wrap asset_index - self.asset_count, # Wrap backwards (0 -> -1) self.asset_count - prev_asset_index, # Wrap forwards (-1 -> 0) key=abs) self.last_movement = movement self.cache_index = (self.cache_index + movement) % self.cache_length ahead = max(0, self.cache_items_ahead - movement) behind = max(0, self.cache_items_behind + movement) if ahead + behind > self.cache_length: if movement < 0: ahead += movement else: behind -= movement #print("AHEAD/BEHIND/CACHE_I/ASSET_I/MOVEMENT:", ahead, behind, self.cache_index, self.asset_index, movement) # TODO if ahead is 0 then clear queue ahead = self._fill_cache(ahead, self.radius_small, +1) # Fill small radius ahead of cache_index behind = self._fill_cache(behind, self.radius_small, -1) # Fill small radius behind cache_index ahead = self._fill_cache(ahead, self.radius_large, +1) # Fill large radius ahead of cache_index self.cache_items_ahead = ahead self.cache_items_behind = behind # Perform callback if prev_asset_index != asset_index and self.change_callback: self.change_callback(asdict(CallbackStateData.from_lctl(self))) return self.cache[self.cache_index] def __len__(self): return self.asset_count def __getitem__(self, index): i = index % self.asset_count if abs(index) > i: raise IndexError("Index out of bounds") return self._update_cache_get_item(index)