本文档的目的是解释如何将Apache策展人用于分布式领导者选举。它的应用程序旨在满足大型数据集处理高可用性的需求。
这对于试图了解如何为分布式领导者选举实施解决方案的任何人都会有用。
问题
当前,我们将一个数据集分为分区,并且正在通过一个节点进行处理。为了增强系统的可用性,我们打算引入一个负责处理数据集的动态节点。每个节点将具有专门处理任何给定分区的能力。换句话说,在任何给定时间都只能分配一个节点来处理特定分区。
为了处理增加的工作量,我们有灵活性可以根据需要从池中添加或删除节点。这意味着,当添加或删除节点时,必须相应地将分区工人重新分配。但是,这种再分配可能会导致工作量失衡。为了解决这个问题,我们介绍了每个分区首选节点的概念。这意味着每当有节点可用时,每个分区都会在其首选节点上运行,有助于维护更平衡的工作负载分布。
每个节点将维护一组完整的分区工人,这些分区工人可以存在于两个状态:活动或潜在。只有活跃的工人进行实际工作,在节点中,每个工人的副本只能活跃。当添加新节点时,它将初始化以潜在状态设置的整个分区工作者。如果这些分区工人中的任何一个都将新节点作为他们的首选节点,那么他们最终将在该节点上变得活跃,同时在以前的节点上变得潜在。删除节点时,它托管的所有主动分区工人将过渡到其他节点上的活动状态。
解决方案
为了实现这一目标,我们将使用Apache Curator。对于活动/潜在状态,我们将利用LeaderSelector食谱,这将使我们可以独家访问分区。活跃的分区工作者将是领导者过程,而潜在的分区工人将是等待成为领导者的过程。public void start() {
this.selector = new LeaderSelector(client, "/partitions/" + partition.getID(),
new LeaderSelectorListenerAdapter() {
@Override
public void takeLeadership(CuratorFramework client) throws Exception {
try {
LOGGER.info("Node {} taking leadership of partition {}", node.getID(), partition.getID());
job.star(); // blocking call
LOGGER.info("Node {} gracefully finishing leadership of partition {}", node.getID(),
partition.getID());
} catch (Exception e) {
LOGGER.error(
"Error running worker process in node " + node.getID() + " as leader of partition "
+ partition.getID(), e);
throw e;
} finally {
relinquish();
}
}
});
client.getCuratorListenable().addListener((c, event) -> {
if (event.getType() == CuratorEventType.CLOSING) {
relinquish();
}
});
selector.autoRequeue();
selector.setId(node.getID());
selector.start();
LOGGER.info("Node {} started worker for partition {}", node.getID(), partition.getID());
}
,如果当前节点是分区的首选节点,那么我们将定期检查每个分区工作者的计划过程。如果不是偏爱的人,工人将放弃其在该节点中的领导地位。对于这一部分,我们将利用LeaderSelector participants知道什么节点还活着。
public boolean isLeaderNotPreferred() {
return isLeader() && isNotPreferred();
}
public boolean isLeader() {
return selector.hasLeadership();
}
private boolean isNotPreferred() {
return partition.getPreferredNodeID().map(preferredNodeID -> {
return !preferredNodeID.equals(this.node.getID()) && isNodeAlive(preferredNodeID);
}).orElse(false);
}
private boolean isNodeAlive(String nodeID) {
return getAliveNodeIDs().contains(nodeID);
}
public Set<String> getAliveNodeIDs() {
try {
return selector.getParticipants().stream()//
.map(Participant::getId)//
.collect(Collectors.toSet());
} catch (Exception e) {
throw new RuntimeException(e);
}
}
此设计可确保高可用性。该系统能够通过添加/删除节点来响应节点故障,并在需要时向上/向下扩展。上限是分区的数量。 OTOH需要对分区首选节点进行一些手动调整,以平衡节点之间的负载。