博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Redis 分布式锁和分布式信号量(Python实现)
阅读量:4109 次
发布时间:2019-05-25

本文共 3044 字,大约阅读时间需要 10 分钟。

简单的写一下吧,用python实现的,因为代码量少,可以更清晰的看原理

redis是NIO+IO多路复用无锁模式,利用这一特性,我们可以保证setnx命令不会出现多个线程同时成功,保证我们确实获得了该锁.

代码中有一些幂等性的细节,并且为不可重入锁,如果递归你们自己改一改,用个标记记录ID之类的。

锁的实现

import redisimport timeimport uuidimport threadingconn = redis.StrictRedis(host='127.0.0.1', port=6379, db=0, password=123456)def acquire_lock_with_timeout(lock_name, timeout=10, lock_timeout=20):    # 这里演示了 给锁加上过期时间,自动删除    # 这里的lock_timeout ,并不是申请锁的超时时间,而是自动删除锁的时间    # 一定要合理设置,确保这个timeout会比你所申请锁后处理业务的时间要长    identifier = str(uuid.uuid4())    timeout = time.time() + timeout    lock_name = "lock:" + lock_name    while time.time() < timeout:        if conn.setnx(lock_name, identifier):             conn.expire(lock_name, lock_timeout)            return identifier        elif not conn.ttl(lock_name):  # 如果这个锁存在,但是没有被设置过期时间,那么我们设置过期时间            conn.expire(lock_name, lock_timeout)        time.sleep(0.001)    return False# 用watch的原因是为了适用于有过期时间的锁def release_lock(lock_name, identifier):    pipe = conn.pipeline(True)    lock_name = "lock:" + lock_name    while True:        try:            pipe.watch(lock_name)  # 监视lock_name 没有被篡改            if pipe.get(lock_name) == identifier:  # 如果这个键的值还是我们申请锁时返回的标识符                pipe.multi()                pipe.delete(lock_name)  # 删除这个锁                pipe.execute()                return True            pipe.unwatch()  # 如果没有达成上面的条件,说明锁已经被移除了            break   # 停止监视,退出循环  最后会返回False        except redis.exceptions.WatchError:  # 如果锁被篡改过,pass 继续执行while 重试            pass        return Falsedef test():    lock_res = acquire_lock_with_timeout("test")    if lock_res:        print(uuid.uuid4())        release_lock("test", lock_res)    if __name__ == '__main__':    t1 = threading.Thread(target=test)    t2 = threading.Thread(target=test)    t1.start()    t2.start()    time.sleep(10)

信号量实现

关于信号量的解释,总有人和多线程/进程的max数搞混

  • 信号量:如果你有30个线程启动,那么同时就有30个线程运行了,只是这30个线程,被阻塞住,确保只有X个信号量个线程处理业务
  • 递归锁:如果你有30个线程启动,那么同时就有30个线程运行了,只是每个线程中针对上锁的地方,都会有引用计数,当前线程
    可以申请很多次这个锁,但是如果引用计数不为0,其他线程就永远无法进入
  • 线程/进程池的最大数:不管你注册了多少个worker,但是同时运行的线程/进程数,不能超过设置的最大数,它和信号量总是被人搞混

1.实现信号量可以用string实现,递增递减就完事了,但是这样无法设置过期时间,因为计数是在这个string里完成的

2.还是用string,但是一个信号量占用一个string,这样可以设置过期时间,例如:setnx semcount:1 identifier,但是时间复杂度跟随信号量数量递增
3.使用有序集合,将多个信号量持有者的信息存储到同一个结构里,并且可以利用分值进行移除过期信号量.
4.使用无序集合,使用scard完成计数,但是无法设置过期时间。

import redisimport timeimport uuidimport threadingconn = redis.StrictRedis(host='192.168.0.6', port=6379, db=0, password=123456)# 下面这个方法,思路是OK的,但是分布式信号量每台服务器的时间戳不可能完全相同一丝不差。# 所造成的的结果就是,下面信号量的数量限制没有问题,但是公平性有问题# 例如:服务器A和服务器B 同时请求5个信号量,但是服务器A的系统时间比服务器B的系统时间快10毫秒# 那么最后,B服务器会获得更多的信号量,因为A服务器的排名永远会比B服务器的大(所以靠后)def acquire_semaphore(semaphore_name, limit, timeout=10):    identifier = str(uuid.uuid4())    now = time.time()    pipe = conn.pipeline(True)    pipe.zremrangebyscore(semaphore_name, '-inf', now - timeout)  # 清理过期的信号量持有者    pipe.zadd(semaphore_name, {
identifier: now}) # 申请信号量 # 获取rank值,因为有序集合zrank是取排名,那么根据identifier对应的score(时间戳)取排名,就可以确定有多少个信号量了 pipe.zrank(semaphore_name, identifier) if pipe.execute()[-1] < limit: # 执行 取最后一个元素(rank值),如果rank值

转载地址:http://zxlsi.baihongyu.com/

你可能感兴趣的文章
手绘VS码绘(一):静态图绘制(码绘使用P5.js)
查看>>
手绘VS码绘(二):动态图绘制(码绘使用Processing)
查看>>
基于P5.js的“绘画系统”
查看>>
《达芬奇的人生密码》观后感
查看>>
论文翻译:《一个包容性设计的具体例子:聋人导向可访问性》
查看>>
基于“分形”编写的交互应用
查看>>
《融入动画技术的交互应用》主题博文推荐
查看>>
链睿和家乐福合作推出下一代零售业隐私保护技术
查看>>
Unifrax宣布新建SiFAB™生产线
查看>>
艾默生纪念谷轮™在空调和制冷领域的百年创新成就
查看>>
NEXO代币持有者获得20,428,359.89美元股息
查看>>
Piper Sandler为EverArc收购Perimeter Solutions提供咨询服务
查看>>
RMRK筹集600万美元,用于在Polkadot上建立先进的NFT系统标准
查看>>
JavaSE_day12 集合
查看>>
JavaSE_day14 集合中的Map集合_键值映射关系
查看>>
Day_15JavaSE 异常
查看>>
异常 Java学习Day_15
查看>>
JavaSE_day_03 方法
查看>>
day-03JavaSE_循环
查看>>
Mysql初始化的命令
查看>>