1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
|
from dataclasses import dataclass
from texture import ImageTextureImmichAsset
@dataclass
class Album:
    """One album together with the span of global asset indices it occupies.

    ``range_end - range_start`` is the number of assets in the album.
    """

    id: str  # album identifier as reported by the connector
    range_start: int  # global index of the album's first asset
    range_end: int  # range_start plus the album's asset count
    # Asset entries of the album; stays None until they are first loaded.
    assets_list: list[str] = None

    @property
    def assets_count(self):
        """Return the number of global indices covered by this album."""
        # The original referenced the undefined names ``end``/``start``.
        return self.range_end - self.range_start
class LazyCachingTextureList():
    """Read-only, list-like view over the assets of the selected albums.

    Textures are created on demand and kept in a fixed-size ring buffer that
    follows the most recently requested asset. The asset list is treated as
    circular: the window wraps from the last asset to the first.

    Window bookkeeping (offsets are relative to the current asset):

    * ``cache_items_ahead``  -- slots at offsets ``0 .. ahead - 1`` are valid.
    * ``cache_items_behind`` -- slots at offsets ``0 .. -(behind - 1)`` are
      valid.

    Both counters include the current slot, so the window occupies
    ``ahead + behind - 1`` ring slots, which never exceeds ``cache_length``.
    """

    def __init__(self, immich_connector, album_ids, max_cache_items=100):
        """Set up an empty ring buffer and read the album layout.

        Args:
            immich_connector: object providing ``load_all_albums()``,
                ``load_album_assets(album_id)`` and
                ``load_texture_async(texture_list, texture)``.
            album_ids: container of album ids to include.
            max_cache_items: number of ring buffer slots (at least 20).
        """
        self.immich_connector = immich_connector
        assert max_cache_items >= 20, "Minimum cache items is 20"  # Double small radius

        # Ring buffer
        self.cache_length = max_cache_items
        self.cache = [None] * max_cache_items
        self.radius_small = 10
        self.radius_large = max_cache_items // 2
        self.cache_items_behind = 0
        self.cache_items_ahead = 0
        self.cache_index = 0  # ring slot of the current asset
        self.asset_index = 0  # global index of the current asset
        self.asset_count = 0
        self.cached_items = 0  # number of ring slots holding a texture

        self.album_keys = album_ids
        self.albums = self._get_albums()

    def index_in_cache_range(self, index):
        """Return True when global ``index`` lies inside the cache window.

        The bounds are ``asset_index - cache_items_behind`` and
        ``asset_index + cache_items_ahead``, both inclusive and taken modulo
        the asset count.
        NOTE(review): these inclusive bounds are one index wider on each side
        than the slots actually filled; kept as-is for existing callers.
        """
        count = self.asset_count
        if not 0 <= index < count:
            # Also covers the empty list, avoiding a modulo by zero.
            return False

        behind = self.cache_items_behind
        ahead = self.cache_items_ahead
        if behind + ahead + 1 >= count:
            # The window wraps around the whole list: everything is in range.
            return True

        index_range_low = (self.asset_index - behind) % count
        index_range_high = (self.asset_index + ahead) % count
        if index_range_low > index_range_high:
            # Window straddles the end of the list.
            return index_range_low <= index or index <= index_range_high
        return index_range_low <= index <= index_range_high

    def _get_albums(self):
        """Build the ``Album`` table for the selected albums.

        Albums are laid out back to back as half-open ranges
        ``[range_start, range_end)`` so that global indices
        ``0 .. asset_count - 1`` map onto them without holes. Also sets
        ``self.asset_count`` to the total number of assets.
        """
        albums = []
        offset = 0
        for album_info in self.immich_connector.load_all_albums():
            album_id = album_info["id"]
            if album_id not in self.album_keys:
                continue
            album_size = album_info["assetCount"]
            albums.append(Album(album_id, offset, offset + album_size))
            offset += album_size
        self.asset_count = offset
        return albums

    def _get_album_asset_by_index(self, asset_index):
        """Return the asset entry stored at global ``asset_index``.

        Negative indices count from the end. The owning album's asset list is
        fetched from the connector on first use and kept on the album.

        Raises:
            IndexError: if no album covers the index.
        """
        if asset_index < 0:
            asset_index += self.asset_count
        for album in self.albums:
            if album.range_start <= asset_index < album.range_end:
                if album.assets_list is None:
                    album.assets_list = self.immich_connector.load_album_assets(album.id)
                return album.assets_list[asset_index - album.range_start]
        raise IndexError("Index out of bounds")

    def _fill_cache(self, current_radius, final_radius, step):
        """Load textures for offsets ``current_radius .. final_radius - 1``.

        Args:
            current_radius: number of slots already valid in this direction.
            final_radius: number of slots that should be valid afterwards.
            step: ``+1`` to fill ahead of the current slot, ``-1`` behind it.

        Returns:
            The number of valid slots in this direction after the call.
        """
        if current_radius >= final_radius:
            return current_radius

        for offset in range(current_radius * step, final_radius * step, step):
            slot = (self.cache_index + offset) % self.cache_length
            asset_index = (self.asset_index + offset) % self.asset_count
            # Resolve the asset first so a lookup failure leaves the slot intact.
            asset = self._get_album_asset_by_index(asset_index)

            evicted = self.cache[slot]
            if evicted is not None:
                # Since this is a ring buffer, textures outside of range can just get free'd here
                evicted.free()
            else:
                # Only a previously empty slot adds to the occupied count.
                self.cached_items += 1

            tex = ImageTextureImmichAsset(asset, asset_index)
            self.immich_connector.load_texture_async(self, tex)
            self.cache[slot] = tex
        return final_radius

    @staticmethod
    def _wrapped_movement(previous, current, count):
        """Return the shortest signed step from ``previous`` to ``current``.

        Both are indices into a circular list of ``count`` items. A tie
        between the forward and backward path resolves to the forward one.
        """
        forward = (current - previous) % count
        backward = forward - count
        return forward if forward <= -backward else backward

    @staticmethod
    def _shift_window(ahead, behind, movement):
        """Re-express the cache window relative to the new current slot.

        Returns ``(ahead, behind)``. When the move leaves the cached window
        entirely, nothing next to the new position is valid and ``(0, 0)`` is
        returned so everything gets reloaded.
        """
        ahead -= movement
        behind += movement
        if ahead < 0 or behind < 0:
            return 0, 0
        return ahead, behind

    def _update_cache_get_item(self, asset_index):
        """Make ``asset_index`` the current asset and return its texture.

        Moves the ring position by the shortest circular distance, then
        refills the window: the small radius ahead, the small radius behind,
        and finally the large radius ahead.

        Raises:
            IndexError: if the list holds no assets.
        """
        if self.asset_count <= 0:
            raise IndexError("Index out of bounds")

        prev_asset_index = self.asset_index
        self.asset_index = asset_index % self.asset_count

        movement = self._wrapped_movement(
            prev_asset_index, self.asset_index, self.asset_count)
        self.cache_index = (self.cache_index + movement) % self.cache_length
        ahead, behind = self._shift_window(
            self.cache_items_ahead, self.cache_items_behind, movement)

        # TODO if ahead is 0 then clear queue

        # ahead + behind - 1 slots are in use, so ahead + behind may be at
        # most cache_length + 1. Filling one side evicts the far end of the
        # other side, hence the clamp after every fill.
        slot_budget = self.cache_length + 1

        current_reloaded = ahead == 0
        ahead = self._fill_cache(ahead, self.radius_small, +1)  # Fill small radius ahead of cache_index
        if current_reloaded:
            # The forward pass just loaded the current slot; count it as
            # valid so the backward pass does not free and reload it.
            behind = max(behind, 1)
        behind = min(behind, slot_budget - ahead)

        behind = self._fill_cache(behind, self.radius_small, -1)  # Fill small radius behind cache_index
        ahead = min(ahead, slot_budget - behind)

        ahead = self._fill_cache(ahead, self.radius_large, +1)  # Fill large radius ahead of cache_index
        behind = min(behind, slot_budget - ahead)

        self.cache_items_ahead = ahead
        self.cache_items_behind = behind
        return self.cache[self.cache_index]

    def __len__(self):
        """Return the total number of assets across the selected albums."""
        return self.asset_count

    def __getitem__(self, index):
        """Return the texture for ``index`` and recentre the cache on it.

        Accepts ``-len(self) <= index < len(self)`` like a regular sequence.

        Raises:
            IndexError: if ``index`` is outside that range.
        """
        count = self.asset_count
        if not -count <= index < count:
            raise IndexError("Index out of bounds")
        return self._update_cache_get_item(index % count)
|