A typical way to use zk looks like this:
zk.Exists("/aaa", true);
zk.Create(...);
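However, these APIs throw ZooKeeper exceptions such as ConnectionLossException and NoNodeException, so every call has to be wrapped in a pile of try/catch blocks that catch the error and then handle it.
That is tedious to write.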
So I wrote a RetryHelper to encapsulate that try/catch behavior. It is fairly convenient to use:
RetryHelper helper = RetryHelper.Make();
helper.CreateNodeStructure = () => { Console.WriteLine("CreateNodeStructure"); };
helper.FixConnectionLossAction = () => { Console.WriteLine("FixConnectionLossAction"); };
helper.IfErrorThen = () => { Console.WriteLine("IfErrorThen"); };
helper.Execute(() => { this.zk.GetChildren(...); });
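This means that if the action passed to Execute throws, the helper checks the exception type: for ConnectionLoss it runs the FixConnectionLossAction delegate, for NoNode it runs the delegate that creates the nodes, and for any other exception it runs IfErrorThen. In every case it then retries the action.
In other words, it gives structure to the two most common ZooKeeper recovery actions: creating the node directory structure and reconnecting after the connection is lost.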
RetryHelper code:
using System;
using System.Threading;

public class RetryHelper
{
    private int retryDelay = 500;   // milliseconds to wait between retries
    private long signal = 0;        // greater than 0 while a reconnect is in progress

    public Action IfErrorThen;
    public Action CreateNodeStructure;
    public Action FixConnectionLossAction;

    public static RetryHelper Make()
    {
        return new RetryHelper();
    }

    public void Execute(Action action)
    {
        // Retry until the action completes without throwing.
        while (true)
        {
            try
            {
                action();
                break;
            }
            catch (ZooKeeperNet.KeeperException.NoNodeException ex)
            {
                // create node structure, then retry the action
                Console.WriteLine("retry helper NoNodeException: " + ex.Message);
                if (CreateNodeStructure != null)
                    RetryHelper.Make().Execute(CreateNodeStructure);
                continue;
            }
            catch (ZooKeeperNet.KeeperException.ConnectionLossException ex)
            {
                Console.WriteLine("retry helper ConnectionLossException: " + ex.Message);
                // Remember whether a reconnect was already running when we got here.
                long attempSignal = Interlocked.Read(ref signal);
                // Wait for any reconnect that is in progress to finish.
                while (Interlocked.Read(ref signal) > 0)
                    Thread.Sleep(retryDelay);
                // Only a thread that saw no reconnect in progress performs one.
                if (attempSignal == 0)
                {
                    Interlocked.Increment(ref signal);
                    if (FixConnectionLossAction != null)
                        RetryHelper.Make().Execute(FixConnectionLossAction);
                    Interlocked.Decrement(ref signal);
                }
                continue;
            }
            catch (Exception ex)
            {
                // Any other error: wait, run the generic handler, then retry.
                Console.WriteLine("retry helper catch: " + ex.Message);
                Thread.Sleep(retryDelay);
                if (IfErrorThen != null)
                    IfErrorThen();
                continue;
            }
        }
    }
}
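Readers who looked closely at the code above will have noticed that the catch block for the connection-loss exception uses Interlocked. The reason is this: in a multithreaded system, when the zk connection is lost, many places are attempting zk operations at the same time, so they enter the ConnectionLossException handler concurrently. If every one of those concurrent requests were handled without any check, the client would reconnect to zk many times over, which hurts performance badly. The intent of this code is therefore to merge the multiple reconnect requests into a single reconnect. Special thanks to my colleague for the code review here, haha.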
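One caveat about the listing above: reading signal and incrementing it are two separate steps, so two threads can both read 0 before either increments and both go on to reconnect. A minimal sketch of one way to make the claim atomic, using Interlocked.CompareExchange, is shown here. ReconnectGate, RunOnce and ReconnectGateDemo are hypothetical names invented for this illustration; they are not part of RetryHelper or ZooKeeperNet.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Hypothetical helper for illustration only.
public class ReconnectGate
{
    private int busy = 0; // 0 = idle, 1 = a reconnect is in progress

    // Returns true if this caller ran the reconnect itself,
    // false if it only waited for another caller's reconnect to finish.
    public bool RunOnce(Action reconnect, int pollDelayMs = 50)
    {
        // Atomically change 0 to 1. Only the caller that observes the old value 0 wins.
        if (Interlocked.CompareExchange(ref busy, 1, 0) == 0)
        {
            try
            {
                reconnect();
            }
            finally
            {
                // Always release the gate, even if reconnect throws.
                Interlocked.Exchange(ref busy, 0);
            }
            return true;
        }

        // Another caller holds the gate: poll until it is released.
        while (Volatile.Read(ref busy) != 0)
            Thread.Sleep(pollDelayMs);
        return false;
    }
}

public static class ReconnectGateDemo
{
    // Starts threadCount threads that all ask for a reconnect
    // and returns how many reconnects actually ran.
    public static int Run(int threadCount)
    {
        var gate = new ReconnectGate();
        int reconnects = 0;
        var threads = new List<Thread>();
        for (int i = 0; i < threadCount; i++)
        {
            threads.Add(new Thread(() => gate.RunOnce(() =>
            {
                Interlocked.Increment(ref reconnects);
                Thread.Sleep(200); // simulate a slow reconnect
            })));
        }
        threads.ForEach(t => t.Start());
        threads.ForEach(t => t.Join());
        return reconnects;
    }
}
```

Inside RetryHelper, the same idea would replace the read/wait/increment sequence: the thread that wins the CompareExchange runs FixConnectionLossAction, every other thread waits for the flag to clear, and all of them then retry their original action. A thread that arrives after the reconnect has already finished will claim the gate again, which matches the behavior of the original code in that situation.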
Below is a demo that tests how concurrent reconnects are eliminated. To keep the output clear, I commented out the Console.WriteLine calls in RetryHelper's catch blocks.
// requires: using System; using System.Collections.Generic;
//           using System.Threading; using System.Threading.Tasks;
static void Main(string[] args)
{
    RetryHelper helper = RetryHelper.Make();
    helper.CreateNodeStructure = () => { Console.WriteLine("CreateNodeStructure"); };
    helper.FixConnectionLossAction = () =>
    {
        // Simulate a reconnect that takes two seconds.
        Console.WriteLine(Thread.CurrentThread.ManagedThreadId + " FixConnectionLossAction BEGIN " + DateTime.Now.ToString());
        Thread.Sleep(2000);
        Console.WriteLine(Thread.CurrentThread.ManagedThreadId + " FixConnectionLossAction END " + DateTime.Now.ToString());
    };
    helper.IfErrorThen = () => { Console.WriteLine("IfErrorThen"); };

    // Ten tasks share one helper, and each of them always hits a connection loss.
    var tasks = new List<Task>();
    for (int i = 0; i < 10; i++)
    {
        var task = new Task(() =>
        {
            helper.Execute(() =>
            {
                throw new ZooKeeperNet.KeeperException.ConnectionLossException();
            });
        });
        tasks.Add(task);
    }
    tasks.ForEach(t => t.Start());
    Task.WaitAll(tasks.ToArray());
    Console.ReadKey();
}
Running it: