jerkzhang
30s内行为只计算1次,python单进程,不想用memcached,想简单一点,于是选择了:
from cachetools import TTLCache
但是TTLCache超过最大使用量 MaxSize后会报错;
所以,考虑到删除最不常用的缓存来实现。
一开始用AI来写,AI提出了很多种方案,结合LRUCache等等,引入time进行计时,为了规避内存长期被高占用,又定时清理,但定时清理又会导致运算增加。
后来,我决定指导AI的思路,用了一种更巧妙的算法,不做过多的重新造轮子的方式去手动写很复杂的逻辑。那样太麻烦了,本来就是一个细节问题,没必要这么复杂。
于是我让AI的思路改变成,同时开两个TTLCache,尽可能利用TTLCache原生的过期自动清理的特性;为了规避TTLCache过量,那么当其中一个TTLCache过量的时候,就直接去使用另1个TTLCache即可,把另一个TTLCache清空后,直接用另一个TTLCache即可。
from cachetools import TTLCache
import time
class DualTTLCache:
def __init__(self, maxsize=1000, ttl=30):
self.maxsize = maxsize
self.ttl = ttl
self.cache_A = TTLCache(maxsize=maxsize, ttl=ttl)
self.cache_B = TTLCache(maxsize=maxsize, ttl=ttl)
self.current_active = 'A'
# 缓存映射字典
self.cache_map = {'A': self.cache_A, 'B': self.cache_B}
def should_count(self, key):
# 使用dict映射获取活跃缓存
active_cache = self.cache_map[self.current_active]
if key in active_cache:
return False
try:
active_cache[key] = True
return True
except KeyError:
# 切换缓存
self._switch_cache()
# 获取新的活跃缓存
new_active_cache = self.cache_map[self.current_active]
new_active_cache[key] = True
return True
def _switch_cache(self):
"""切换缓存"""
if self.current_active == 'A':
self.cache_B.clear()
self.current_active = 'B'
else:
self.cache_A.clear()
self.current_active = 'A'
print(f"缓存切换: {'A->B' if self.current_active == 'B' else 'B->A'}")
def get_stats(self):
return {
'active': self.current_active,
'cache_A_size': len(self.cache_A),
'cache_B_size': len(self.cache_B),
'max_size': self.maxsize
}
# 测试
# 全局缓存
dual_cache = DualTTLCache(maxsize=1000, ttl=30)
if dual_cache.should_count(user_id):
# 有效点击
print({"status": "counted", "cache_info": dual_cache.get_stats()})
else:
# 已经30s内重复
print({"status": "skipped", "cache_info": dual_cache.get_stats()})我自认为自己这方法太巧妙了,简直神来一笔。于是写下这篇笔记。
但是,我发现我错了,切换后,再次请求的数据,很可能就在以前的TTLCache中,这样不对呀!
于是补救方案:双缓存同时检查,如下所示。
from cachetools import TTLCache
import time
class DualTTLCache:
def __init__(self, maxsize=1000, ttl=30):
self.maxsize = maxsize
self.ttl = ttl
self.cache_A = TTLCache(maxsize=maxsize, ttl=ttl)
self.cache_B = TTLCache(maxsize=maxsize, ttl=ttl)
self.current_active = 'A'
self.cache_map = {'A': self.cache_A, 'B': self.cache_B}
def should_count(self, key):
# 同时检查两个缓存!
if key in self.cache_A or key in self.cache_B:
return False
# 写入当前活跃缓存
active_cache = self.cache_map[self.current_active]
try:
active_cache[key] = True
return True
except KeyError:
# 切换缓存
self._switch_cache()
new_active_cache = self.cache_map[self.current_active]
new_active_cache[key] = True
return True
def _switch_cache(self):
"""切换缓存,清空非活跃缓存"""
if self.current_active == 'A':
self.cache_B.clear() # 清空即将变为活跃的缓存
self.current_active = 'B'
else:
self.cache_A.clear() # 清空即将变为活跃的缓存
self.current_active = 'A'
print(f"缓存切换: {'A->B' if self.current_active == 'B' else 'B->A'}")
def get_stats(self):
return {
'active': self.current_active,
'cache_A_size': len(self.cache_A),
'cache_B_size': len(self.cache_B),
'total_unique_keys': len(set(list(self.cache_A.keys()) + list(self.cache_B.keys())))
}这样好了,稳妥了。
但是,有利必有弊,现在变成了两次检索内存。
等于运行效率的上也是双倍开销;但是,手动去定时清理,这类做法也是会有开销的。双缓存策略,至少开发上是优雅的,开发效率高、逻辑更清晰。
有利必然有弊,性能上整体还是弱于手动定时清理的那种,毕竟那就是1个缓存查询。但不会差别太多。好处,除了开发上更优雅,逻辑更清晰;它会有更少的概率去清除缓存,整体性能上更平稳平顺。
其实,这里是把一个细节问题复杂化。
实际上,开一个比较大的MaxSize,我认为就可以了;不需要这么复杂。而且这个问题不会是瓶颈问题。此外,因为不是瓶颈问题,使用上述的方案更稳健,消耗一点点运行效率,很值得。因此,上述的补救方案,是有意义的。