Apache ZooKeeper Watcher 机制总结

分布式系统从根本上来说就是不同节点上的进程并发执行,并且相互之间对进程的行为进行协调处理的过程。不同节点上的进程相互协调行为的过程叫做分布式同步。许多分布式系统需要一个进程作为任务的协调者,执行一些其他进程并不执行的特殊的操作,一般情况下哪个进程担当任务的协调者都无所谓,但是必须有一个进程作为协调者,自动选举出一个协调者的过程就是分布式选举。Zookeeper正式为了解决这一系列问题而生的,本文介绍Watcher机制,首先介绍一个监听示例,然后再聊聊Watcher机制原理。

Zookeeper Watcher机制

集群状态监控示例

为了确保集群能够正常运行,ZooKeeper 可以被用来监视集群状态,这样就可以提供集群高可用性。使用 ZooKeeper 的瞬时(ephemeral)节点概念可以设计一个集群机器状态检测机制:

1. 每一个运行了 ZooKeeper 客户端的生产环境机器都是一个终端进程,我们可以在它们连接到 ZooKeeper 服务端后在 ZooKeeper 服务端创建一系列对应的瞬时节点,可以用/hostname 来进行区分。

2. 这里还是采用监听(Watcher)方式来完成对节点状态的监视,通过对/hostname 节点的 NodeChildrenChanged 事件的监听来完成这一目标。监听进程是作为一个独立的服务或者进程运行的,它覆盖了 process 方法来实现应急措施。

3. 由于是一个瞬时节点,所以每次客户端断开时 znode 会立即消失,这样我们就可以监听到集群节点异常。

4. NodeChildrenChanged 事件触发后我们可以调用 getChildren 方法来知道哪台机器发生了异常。

清单1:ClusterMonitor类

import java.io.IOException;
import java.util.List;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;

public class ClusterMonitor implements Runnable {
    private static String membershipRoot = "/Members";
    private final Watcher connectionWatcher;
    private final Watcher childrenWatcher;
    private ZooKeeper zk;
    boolean alive = true;

    public ClusterMonitor(String HostPort) throws IOException, InterruptedException, KeeperException {
        connectionWatcher = new Watcher() {
            @Override
            public void process(WatchedEvent event) {
                // TODO Auto-generated method stub
                if (event.getType() == Watcher.Event.EventType.None
                        && event.getState() == Watcher.Event.KeeperState.SyncConnected) {
                    System.out.println("\nconnectionWatcher Event Received:%s" + event.toString());
                }
            }
        };

        childrenWatcher = new Watcher() {
            @Override
            public void process(WatchedEvent event) {
                // TODO Auto-generated method stub
                System.out.println("\nchildrenWatcher Event Received:%s" + event.toString());
                if (event.getType() == Event.EventType.NodeChildrenChanged) {
                    try {
                        // Get current list of child znode and reset the watch
                        List<String> children = zk.getChildren(membershipRoot, this);
                        System.out.println("Cluster Membership change,Members: " + children);
                    } catch (KeeperException ex) {
                        throw new RuntimeException(ex);
                    } catch (InterruptedException ex) {
                        Thread.currentThread().interrupt();
                        alive = false;
                        throw new RuntimeException(ex);
                    }
                }
            }
        };

        zk = new ZooKeeper(HostPort, 2000, connectionWatcher);
        // Ensure the parent znode exists
        if (zk.exists(membershipRoot, false) == null) {
            zk.create(membershipRoot, "ClusterMonitorRoot".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        }
        // Set a watch on the parent znode
        List<String> children = zk.getChildren(membershipRoot, childrenWatcher);
        System.err.println("Members:" + children);
    }

    public synchronized void close() {
        try {
            zk.close();
        } catch (InterruptedException ex) {
            ex.printStackTrace();
        }
    }

    @Override
    public void run() {
        // TODO Auto-generated method stub
        try {
            synchronized (this) {
                while (alive) {
                    wait();
                }
            }
        } catch (InterruptedException ex) {
            ex.printStackTrace();
            Thread.currentThread().interrupt();
        } finally {
            this.close();
        }
    }

    public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
        if (args.length != 1) {
            System.err.println("Usage:ClusterMonitor<Host:Port>");
            System.exit(0);
        }
        String hostPort = args[0];
        new ClusterMonitor(hostPort).run();
    }

}

清单2:ClusterClient类

import java.io.IOException;
import java.lang.management.ManagementFactory;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;

public class ClusterClient implements Watcher, Runnable {
    private static String membershipRoot = "/Members";
    ZooKeeper zk;

    public ClusterClient(String hostPort, Long pid) {
        String processId = pid.toString();
        try {
            zk = new ZooKeeper(hostPort, 2000, this);
        } catch (IOException ex) {
            ex.printStackTrace();
        }
        if (zk != null) {
            try {
                zk.create(membershipRoot + '/' + processId, processId.getBytes(), Ids.OPEN_ACL_UNSAFE,
                        CreateMode.EPHEMERAL);
            } catch (KeeperException | InterruptedException ex) {
                ex.printStackTrace();
            }
        }
    }

    public synchronized void close() {
        try {
            zk.close();
        } catch (InterruptedException ex) {
            ex.printStackTrace();
        }
    }

    @Override
    public void process(WatchedEvent event) {
        // TODO Auto-generated method stub
        System.out.println("\nEvent Received:%s" + event.toString());
    }

    @Override
    public void run() {
        // TODO Auto-generated method stub
        try {
            synchronized (this) {
                while (true) {
                    wait();
                }
            }
        } catch (InterruptedException ex) {
            ex.printStackTrace();
            Thread.currentThread().interrupt();
        } finally {
            this.close();
        }
    }

    public static void main(String[] args) {
        if (args.length != 1) {
            System.err.println("Usage:ClusterClient<Host:Port>");
            System.exit(0);
        }

        String hostPort = args[0];
        // Get the process id
        String name = ManagementFactory.getRuntimeMXBean().getName();
        int index = name.indexOf('@');
        Long processId = Long.parseLong(name.substring(0, index));
        new ClusterClient(hostPort, processId).run();
    }

}

清单3:Eclipse运行输出

childrenWatcher Event Received:%sWatchedEvent state:SyncConnected type:NodeChildrenChanged path:/Members
Cluster Membership change,Members: [dweref0000000009, test100000000003, dsdawqeqw0000000008, 
test111110000000004, test22220000000005, dsda32130000000007, dsda0000000006, test10000000002]

我们通过 zkCli 方式对被监听的/Members 这个 ZNODE 操作,增加一个子节点,您会在 zkCli 里看到如清单 4 所示输出。

清单4:ZKCli 创建 ZNode 子节点

[zk: localhost:2181(CONNECTED) 0] create -s /Members/dweref rew23rf
Created /Members/dweref0000000009 [zk: localhost:2181(CONNECTED) 4]

上面的示例我们演示了如何发起对于一个 ZNODE 的监听,当该 ZNODE 被改变后,我们会触发对应的方法进行处理,这类方式可以被用在数据监听、集群状态监听等用途。

回调函数

由于 Watcher 机制涉及到回调函数,所以我们先来介绍一下回调函数的基础知识。

打个比方,有一家旅馆提供叫醒服务,但是要求旅客自己决定叫醒的方法。可以是打客房电话,也可以是派服务员去敲门,睡得死怕耽误事的,还可以要求往自己头上浇盆水。这里,“叫醒”这个行为是旅馆提供的,相当于库函数,但是叫醒的方式是由旅客决定并告诉旅馆的,也就是回调函数。而旅客告诉旅馆怎么叫醒自己的动作,也就是把回调函数传入库函数的动作,称为登记回调函数(to register a callback function)。

乍看起来,回调似乎只是函数间的调用,但仔细一琢磨,可以发现两者之间的一个关键的不同:在回调中,我们利用某种方式,把回调函数像参数一样传入中间函数。可以这么理解,在传入一个回调函数之前,中间函数是不完整的。换句话说,程序可以在运行时,通过登记不同的回调函数,来决定、改变中间函数的行为。这就比简单的函数调用要灵活太多了。

回调实际上有两种:阻塞式回调和延迟式回调。两者的区别在于:阻塞式回调里,回调函数的调用一定发生在起始函数返回之前;而延迟式回调里,回调函数的调用有可能是在起始函数返回之后。我们来看一个简单的示例。

清单5:Caller类

public class Caller {
    public MyCallInterface mc;

    public void setCallfuc(MyCallInterface mc) {
        this.mc = mc;
    }

    public void call() {
        this.mc.method();
    }
}

清单6:MyCallInterface 接口

public interface MyCallInterface {
    public void method();
}

清单7:CallbackClass 类

public class CallbackClass implements MyCallInterface {
    public void method() {
        System.out.println("回调函数");
    }

    public static void main(String args[]) {
        Caller call = new Caller();
        call.setCallfuc(new CallbackClass());
        call.call();
    }
}

清单8:运行结果

回调函数


原理及源代码解释

实现原理

ZooKeeper 允许客户端向服务端注册一个 Watcher 监听,当服务端的一些指定事件触发了这个 Watcher,那么就会向指定客户端发送一个事件通知来实现分布式的通知功能。

ZooKeeper 的 Watcher 机制主要包括客户端线程、客户端 WatchManager 和 ZooKeeper 服务器三部分。在具体工作流程上,简单地讲,客户端在向 ZooKeeper 服务器注册 Watcher 的同时,会将 Watcher 对象存储在客户端的 WatchManager 中。当 ZooKeeper 服务器端触发 Watcher 事件后,会向客户端发送通知,客户端线程从 WatchManager 中取出对应的 Watcher 对象来执行回调逻辑。如清单 9 所示,WatchManager 创建了一个 HashMap,这个 HashMap 被用来存放 Watcher 对象。

清单9:WatchManager 类

public synchronized void addWatch(String path, Watcher watcher) {
    HashSet<Watcher> list = watchTable.get(path);
    if (list == null) {
        // don't waste memory if there are few watches on a node
        // rehash when the 4th entry is added, doubling size thereafter
        // seems like a good compromise
        list = new HashSet<Watcher>(4);
        watchTable.put(path, list);
    }
    list.add(watcher);

    HashSet<String> paths = watch2Paths.get(watcher);
    if (paths == null) {
        // cnxns typically have many watches, so use default cap here
        paths = new HashSet<String>();
        watch2Paths.put(watcher, paths);
    }
    paths.add(path);
}

整个Watcher注册和通知流程如下图所示:

Watcher 注册和通知流程图

Watcher 接口

Watcher 的理念是启动一个客户端去接收从 ZooKeeper 服务端发过来的消息并且同步地处理这些信息。ZooKeeper 的 Java API 提供了公共接口 Watcher,具体操作类通过实现这个接口相关的方法来实现从所连接的 ZooKeeper 服务端接收数据。如果要处理这个消息,需要为客户端注册一个 CallBack(回调)对象。Watcher 接口定义在 org.apache.zookeeper 包里面,代码如清单 10 所示。

清单10:Watcher接口

import org.apache.zookeeper.WatchedEvent;

public interface Watcher {
    abstract public void process(WatchedEvent event);
}

在 Watcher 接口里面,除了回调函数 process 以外,还包含 KeeperState 和 EventType 两个枚举类,分别代表了通知状态和事件类型,如下图所示。

Watcher通知状态和事件类型表

process 方法是 Watcher 接口中的一个回调方法,当 ZooKeeper 向客户端发送一个 Watcher 事件通知时,客户端就会对相应的 process 方法进行回调,从而实现对事件的处理。

process 方法包含 WatcherEvent 类型的参数,WatchedEvent 包含了每一个事件的三个基本属性:通知状态(KeeperState)、事件类型(EventType)和节点路径(Path),ZooKeeper 使用 WatchedEvent 对象来封装服务端事件并传递给 Watcher,从而方便回调方法 process 对服务端事件进行处理。

WatchedEvent 和 WatcherEvent 都表示的是同一个事物,都是对一个服务端事件的封装。不同的是,WatchedEvent 是一个逻辑事件,用于服务端和客户端程序执行过程中所需的逻辑对象,而 WatcherEvent 因为实现了序列化接口,因此可以用于网络传输。

服务端在线程 WatchedEvent 事件之后,会调用 getWrapper 方法将自己包装成一个可序列化的 WatcherEvent 事件,如清单 7 所示,以便通过网络传输到客户端。客户端在接收到服务端的这个事件对象后,首先会将 WatcherEvent 事件还原成一个 WatchedEvent 事件,并传递给 process 方法处理,回调方法 process 根据入参就能够解析出完整的服务端事件了。

清单11:可序列化的事件

public WatcherEvent getWrapper() {
    return new WatcherEvent(eventType.getIntValue(), keeperState.getIntValue(), path);
}

ZooKeeper Watcher 特性总结

1. 注册只能确保一次消费

无论是服务端还是客户端,一旦一个 Watcher 被触发,ZooKeeper 都会将其从相应的存储中移除。因此,开发人员在 Watcher 的使用上要记住的一点是需要反复注册。这样的设计有效地减轻了服务端的压力。如果注册一个 Watcher 之后一直有效,那么针对那些更新非常频繁的节点,服务端会不断地向客户端发送事件通知,这无论对于网络还是服务端性能的影响都非常大。

2. 客户端串行执行

客户端 Watcher 回调的过程是一个串行同步的过程,这为我们保证了顺序,同时,需要开发人员注意的一点是,千万不要因为一个 Watcher 的处理逻辑影响了整个客户端的 Watcher 回调。

3. 轻量级设计

WatchedEvent 是 ZooKeeper 整个 Watcher 通知机制的最小通知单元,这个数据结构中只包含三部分的内容:通知状态、事件类型和节点路径。也就是说,Watcher 通知非常简单,只会告诉客户端发生了事件,而不会说明事件的具体内容。例如针对 NodeDataChanged 事件,ZooKeeper 的 Watcher 只会通知客户指定数据节点的数据内容发生了变更,而对于原始数据以及变更后的新数据都无法从这个事件中直接获取到,而是需要客户端主动重新去获取数据,这也是 ZooKeeper 的 Watcher 机制的一个非常重要的特性。另外,客户端向服务端注册 Watcher 的时候,并不会把客户端真实的 Watcher 对象传递到服务端,仅仅只是在客户端请求中使用 boolean 类型属性进行了标记,同时服务端也仅仅只是保存了当前连接的 ServerCnxn 对象。这样轻量级的 Watcher 机制设计,在网络开销和服务端内存开销上都是非常廉价的。

参考资料

本文标题:Apache ZooKeeper Watcher 机制总结

本文链接:http://yedward.net/post/423.html

本文版权归作者所有,欢迎转载,转载请以文字链接的形式注明文章出处。

本博客不提供评论功能,有任何问题请发送邮件至:yedward92@qq.com

相关文章