本文共 3044 字,大约阅读时间需要 10 分钟。
简单的写一下吧,用python实现的,因为代码量少,可以更清晰的看原理
redis是NIO+IO多路复用无锁模式,利用这一特性,我们可以保证setnx命令不会出现多个线程同时成功,保证我们确实获得了该锁.
代码中有一些幂等性的细节,并且为不可重入锁,如果递归你们自己改一改,用个标记记录ID之类的。
import redisimport timeimport uuidimport threadingconn = redis.StrictRedis(host='127.0.0.1', port=6379, db=0, password=123456)def acquire_lock_with_timeout(lock_name, timeout=10, lock_timeout=20): # 这里演示了 给锁加上过期时间,自动删除 # 这里的lock_timeout ,并不是申请锁的超时时间,而是自动删除锁的时间 # 一定要合理设置,确保这个timeout会比你所申请锁后处理业务的时间要长 identifier = str(uuid.uuid4()) timeout = time.time() + timeout lock_name = "lock:" + lock_name while time.time() < timeout: if conn.setnx(lock_name, identifier): conn.expire(lock_name, lock_timeout) return identifier elif not conn.ttl(lock_name): # 如果这个锁存在,但是没有被设置过期时间,那么我们设置过期时间 conn.expire(lock_name, lock_timeout) time.sleep(0.001) return False# 用watch的原因是为了适用于有过期时间的锁def release_lock(lock_name, identifier): pipe = conn.pipeline(True) lock_name = "lock:" + lock_name while True: try: pipe.watch(lock_name) # 监视lock_name 没有被篡改 if pipe.get(lock_name) == identifier: # 如果这个键的值还是我们申请锁时返回的标识符 pipe.multi() pipe.delete(lock_name) # 删除这个锁 pipe.execute() return True pipe.unwatch() # 如果没有达成上面的条件,说明锁已经被移除了 break # 停止监视,退出循环 最后会返回False except redis.exceptions.WatchError: # 如果锁被篡改过,pass 继续执行while 重试 pass return Falsedef test(): lock_res = acquire_lock_with_timeout("test") if lock_res: print(uuid.uuid4()) release_lock("test", lock_res) if __name__ == '__main__': t1 = threading.Thread(target=test) t2 = threading.Thread(target=test) t1.start() t2.start() time.sleep(10)
关于信号量的解释,总有人和多线程/进程的max数搞混
- 信号量:如果你有30个线程启动,那么同时就有30个线程运行了,只是这30个线程,被阻塞住,确保只有X个信号量个线程处理业务
- 递归锁:如果你有30个线程启动,那么同时就有30个线程运行了,只是每个线程中针对上锁的地方,都会有引用计数,当前线程 可以申请很多次这个锁,但是如果引用计数不为0,其他线程就永远无法进入
- 线程/进程池的最大数:不管你注册了多少个worker,但是同时运行的线程/进程数,不能超过设置的最大数,它和信号量总是被人搞混
1.实现信号量可以用string实现,递增递减就完事了,但是这样无法设置过期时间,因为计数是在这个string里完成的
2.还是用string,但是一个信号量占用一个string,这样可以设置过期时间,例如:setnx semcount:1 identifier,但是时间复杂度跟随信号量数量递增 3.使用有序集合,将多个信号量持有者的信息存储到同一个结构里,并且可以利用分值进行移除过期信号量. 4.使用无序集合,使用scard完成计数,但是无法设置过期时间。
import redisimport timeimport uuidimport threadingconn = redis.StrictRedis(host='192.168.0.6', port=6379, db=0, password=123456)# 下面这个方法,思路是OK的,但是分布式信号量每台服务器的时间戳不可能完全相同一丝不差。# 所造成的的结果就是,下面信号量的数量限制没有问题,但是公平性有问题# 例如:服务器A和服务器B 同时请求5个信号量,但是服务器A的系统时间比服务器B的系统时间快10毫秒# 那么最后,B服务器会获得更多的信号量,因为A服务器的排名永远会比B服务器的大(所以靠后)def acquire_semaphore(semaphore_name, limit, timeout=10): identifier = str(uuid.uuid4()) now = time.time() pipe = conn.pipeline(True) pipe.zremrangebyscore(semaphore_name, '-inf', now - timeout) # 清理过期的信号量持有者 pipe.zadd(semaphore_name, { identifier: now}) # 申请信号量 # 获取rank值,因为有序集合zrank是取排名,那么根据identifier对应的score(时间戳)取排名,就可以确定有多少个信号量了 pipe.zrank(semaphore_name, identifier) if pipe.execute()[-1] < limit: # 执行 取最后一个元素(rank值),如果rank值
转载地址:http://zxlsi.baihongyu.com/