基于zookeeper的分布式锁实现(监听临时节点的上一个节点)


声明:本文转载自https://my.oschina.net/u/3426682/blog/1540284,转载目的在于传递更多信息,仅供学习交流之用。如有侵权行为,请联系我,我会及时删除。

    1.父节点:/disLocks1(zk根目录下的disLocks1目录,CreateMode.PERSISTENT类型)

      2.所有需要获取锁的线程,都会在/disLocks1目录下建立一个临时顺序的子节点(CreateMode.EPHEMERAL_SEQUENTIAL类型)

           3.每次都是序号最小的节点获取锁,当最小的节点业务逻辑处理完毕后,断开本次连接(或者删除当前子节点),则临时顺序的节点自动删除,接着让其他没有获取锁的节点去获取锁

          每当一台机器去创建了一个临时顺序节点,去获取锁失败的时候(本节点不是序号最小的节点),则找出当前节点的前一个节点,并在该节点上注册一个监听事件,当前一个节点删除(获取锁成功),则后一个节点立即去获取锁。

         相比于监听父节点机制的优势:不用所有的子节点都去监听父节点,讲子节点的监听事件均匀的分布在了对应的兄弟子节点上,这样在高并发的情况下,避免了当子节点删除的时候,父节点的监听事件过多,父节点每次需要触发大量的监听事件。

  贴代码:

 

package zoo.com.zk.zoo.lock;  import java.io.IOException; import java.util.Collections; import java.util.List; import java.util.Objects; import java.util.concurrent.CountDownLatch;  import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.Watcher.Event.EventType; import org.apache.zookeeper.Watcher.Event.KeeperState; import org.apache.zookeeper.ZooDefs.Ids; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.data.Stat;   /**  *   * zk分布式锁实现  * 基于监听父节点下面的全部子节点实现,效率较低  * */ public class DistributedLock2 implements Watcher{ 	 	public static String host="127.0.0.1:2181";     //缓存时间       private static final int TIME_OUT  = 2000;           private static String FATHER_PATH = "/disLocks1";          private ZooKeeper zk;          private int threadId;          protected  CountDownLatch countDownLatch=new CountDownLatch(1);           private String lockPath;          public DistributedLock2(int threadId){     	this.threadId = threadId;     }          public void getZkClient(String host,int timeout) 	{     	try {     		if(null == zk){         		zk = new ZooKeeper(host, timeout, this);     		} 		} catch (IOException e) { 			e.printStackTrace(); 		} 	} 	/** 	 *  	 *  	 * */     public String createNode(){     	try {     		//检测节点是否存在 			Stat stat = zk.exists(FATHER_PATH, false); 			//父节点不存在,则创建父节点 			if(Objects.isNull(stat)){ 				synchronized (FATHER_PATH) { 					Stat stat2 = zk.exists(FATHER_PATH, false); 					if(Objects.isNull(stat2)){ 						//父节点是持久节点 						String path = zk.create(FATHER_PATH, null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); 						System.out.println("父节点创建成功,返回值【"+path+"】"); 					} 				} 			} 			//创建持久性父节点下面的临时顺序子节点,/父节点路径/0000000002 	        String lockPath = zk.create(FATHER_PATH+"/",null, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);           	System.out.println("线程【"+threadId+"】开始执行,子节点创建成功,返回值【"+lockPath+"】");         	return lockPath; 		} catch (KeeperException e1) { 			e1.printStackTrace(); 		} catch (InterruptedException e1) { 			e1.printStackTrace(); 		} 		return null;     }          //校验当前节点是否为序号最小的节点     public boolean checkLockPath(String lockPath){     	try {     		//注册父节点监听事件,当父节点下面的子节点有变化,就会触发Watcher事件 			List<String> nodeList = zk.getChildren(FATHER_PATH, false); 			Collections.sort(nodeList); 			int index = nodeList.indexOf( lockPath.substring(FATHER_PATH.length()+1));   		     switch (index){   		         case -1:{   		        	 System.out.println("本节点已不在了"+lockPath); 		             return false;   		         }   		         case 0:{   		        	 System.out.println("线程【"+threadId+"】获取锁成功,子节点序号【"+lockPath+"】"); 		             return true;   		         }   		         default:{   		             String waitPath = nodeList.get(index - 1);   		             zk.exists(FATHER_PATH+"/"+waitPath, this); 		             System.out.println(waitPath+"在"+nodeList.get(index)+"点前面,需要等待【"+nodeList.get(index)+"】"); 		             return false; 		         }  		     } 		} catch (KeeperException e) { 			e.printStackTrace(); 		} catch (InterruptedException e) { 			e.printStackTrace(); 		} 		return false;     }          public boolean getLock(){     	//创建获取锁的节点(顺序临时节点)     	String childPath = createNode();     	boolean flag = true;     	if(null != childPath){     		lockPath = childPath; 			try { 				//轮询等待zk获取锁的通知 				while(flag){ 					if(checkLockPath(childPath)){ 						//获取锁成功 						return true; 					}else{ 						//节点创建成功, 则等待zk通知 						countDownLatch.await(); 					} 				} 			} catch (InterruptedException e) { 				e.printStackTrace(); 			}     	}else{     		System.out.println("节点没有创建成功,获取锁失败");     	}     	return false;     }      	public void process(WatchedEvent event) { 		//成功连接zk,状态判断 		if(event.getState() == KeeperState.SyncConnected){ 			//子节点有变化 			if(event.getType() == EventType.NodeDeleted){ 				System.out.println("临时节点自动删除"); 				countDownLatch.countDown(); 			} 		}		 	} 	 	public void unlock(){ 		try { 			zk.delete(getLockPath(), -1); 			zk.close(); 		} catch (InterruptedException e) { 			e.printStackTrace(); 		} catch (KeeperException e) { 			e.printStackTrace(); 		} 	} 	 	public String getLockPath() { 		return lockPath; 	} 	public ZooKeeper getZooKeeper(){ 		return zk; 	} 	public static void main(String[] args) throws KeeperException, InterruptedException { 		 for(int i=0; i < 10; i++){               final int threadId = i+1;               new Thread(){                   @Override                   public void run() {                       try{                           DistributedLock2 dis = new DistributedLock2(threadId);                           dis.getZkClient(host,TIME_OUT);                         if(dis.getLock()){                         	Thread.sleep(200);                         	dis.unlock();                         }                     } catch (Exception e){                       	System.out.println("【第"+threadId+"个线程】 抛出的异常:");                         e.printStackTrace();                       }                   }               }.start();   	     } 	} } 

 

 

 

 

       

    

本文发表于2017年09月20日 14:37
(c)注:本文转载自https://my.oschina.net/u/3426682/blog/1540284,转载目的在于传递更多信息,并不代表本网赞同其观点和对其真实性负责。如有侵权行为,请联系我们,我们会及时删除.

阅读 2076 讨论 0 喜欢 0

抢先体验

扫码体验
趣味小程序
文字表情生成器

闪念胶囊

你要过得好哇,这样我才能恨你啊,你要是过得不好,我都不知道该恨你还是拥抱你啊。

直抵黄龙府,与诸君痛饮尔。

那时陪伴我的人啊,你们如今在何方。

不出意外的话,我们再也不会见了,祝你前程似锦。

这世界真好,吃野东西也要留出这条命来看看

快捷链接
网站地图
提交友链
Copyright © 2016 - 2021 Cion.
All Rights Reserved.
京ICP备2021004668号-1