一、简介
在本教程中,我们将继续探索 Java Kubernetes API。 这次,我们将展示如何使用 Watches 来高效监控集群事件。
2.什么是 Kubernetes 手表?
在我们之前介绍Kubernetes API的文章中,我们展示了如何恢复有关给定资源或其集合的信息。如果我们想要的只是获取这些资源在给定时间点的状态,那么这很好。然而,鉴于 Kubernetes 集群本质上是高度动态的,这通常还不够。
大多数情况下,我们还希望监视这些资源并跟踪发生的事件 。例如,我们可能对跟踪 Pod 生命周期事件或部署状态更改感兴趣。虽然我们可以使用轮询,但这种方法会受到一些限制。首先,随着监控资源数量的增加,它无法很好地扩展。其次,我们面临丢失轮询周期之间发生的事件的风险。
为了解决这些问题,Kubernetes 提出了 Watches 的概念, 它可用于通过 watch 查询参数进行所有资源收集 API 调用。当其值为 false 或省略时, GET 操作的行为与往常一样:服务器处理请求并返回与给定条件匹配的资源实例列表。 然而,传递 watch=true 会极大地改变它的行为:
- 响应现在由一系列修改事件组成,包含修改类型和受影响的对象
- 使用称为长轮询的技术发送第一批事件后,连接将保持打开状态
3. 创建 手表
Java Kubernetes API 通过 Watch 类支持 Watches ,该类有一个静态方法: createWatch。 该方法需要三个参数:
- ApiClient ,处理对 Kubernetes API 服务器的实际 REST 调用
- 描述要监视的资源集合的 Call 实例
- 具有预期资源类型的 TypeToken
我们使用库中可用的任何 xxxApi 类的 listXXXCall() 方法之一创建 Call 实例。例如,要创建一个检测Pod事件的 Watch ,我们将使用 listPodForAllNamespacesCall() :
CoreV1Api api = new CoreV1Api(client);
Call call = api.listPodForAllNamespacesCall(null, null, null, null, null, null, null, null, 10, true, null);
Watch<V1Pod> watch = Watch.createWatch(
client,
call,
new TypeToken<Response<V1Pod>>(){}.getType()));
这里,我们对大多数参数使用 null ,意思是“使用默认值”,只有两个例外: timeout 和 watch。 对于监视调用,后者必须设置为 true 。否则,这将是一次常规的休息通话。 在这种情况下, 超时 相当于手表的“生存时间”,这意味着一旦过期,服务器将停止发送事件并终止连接 。
为 超时 参数(以秒为单位表示)找到一个好的值需要进行一些尝试和错误,因为它取决于客户端应用程序的确切要求。此外,检查 Kubernetes 集群配置也很重要。通常,手表有 5 分钟的硬性限制,因此超过该时间将不会达到预期的效果。
4. 接收事件
仔细观察 Watch 类,我们可以看到它实现了标准 JRE 中的 Iterator 和 Iterable ,因此我们可以在 for-each 或 hasNext()-next() 循环中使用 createWatch() 返回的值:
for (Response<V1Pod> event : watch) {
V1Pod pod = event.object;
V1ObjectMeta meta = pod.getMetadata();
switch (event.type) {
case "ADDED":
case "MODIFIED":
case "DELETED":
// ... process pod data
break;
default:
log.warn("Unknown event type: {}", event.type);
}
}
每个事件的 type 字段告诉我们对象(在我们的例子中是 Pod)发生了什么样的事件。一旦我们消耗了所有事件,我们必须对 Watch.createWatch() 进行新的调用以再次开始接收事件。在示例代码中,我们将 Watch 创建和结果处理放在 while 循环中。其他方法也是可能的,例如使用 ExecutorService 或类似的方法在后台接收更新。
5. 使用资源版本和书签
上面代码的一个问题是,每次我们创建一个新的 Watch 时, 都会有一个初始事件流,其中包含给定类型的所有现有资源实例。 发生这种情况是因为服务器假设我们没有任何有关它们的先前信息,因此它只是将它们全部发送 。
然而,这样做违背了有效处理事件的目的,因为我们只在初始加载后需要新事件。 为了防止再次接收所有数据,监视机制支持两个附加概念:资源版本和书签。
5.1.资源版本
Kubernetes 中的每个资源在其元数据中都包含一个 resourcesVersion 字段,该字段只是服务器每次发生更改时设置的不透明字符串。此外,由于资源集合也是一种资源,因此有一个与之关联的 资源版本 。当在集合中添加、删除和/或修改新资源时,此字段将相应更改。
当我们进行返回集合 并 包含 resourceVersion 参数的API调用时,服务器将使用其值作为查询的“起点”。 对于 Watch API 调用,这意味着仅包含在创建通知版本之后发生的事件。
但是,我们如何获取 资源版本 以包含在我们的调用中?简单:我们只需执行初始同步调用来检索初始资源列表,其中包括集合的 资源版本, 然后在后续 Watch 调用中使用它:
String resourceVersion = null;
while (true) {
if (resourceVersion == null) {
V1PodList podList = api.listPodForAllNamespaces(null, null, null, null, null, "false",
resourceVersion, null, 10, null);
resourceVersion = podList.getMetadata().getResourceVersion();
}
try (Watch<V1Pod> watch = Watch.createWatch(
client,
api.listPodForAllNamespacesCall(null, null, null, null, null, "false",
resourceVersion, null, 10, true, null),
new TypeToken<Response<V1Pod>>(){}.getType())) {
for (Response<V1Pod> event : watch) {
// ... process events
}
} catch (ApiException ex) {
if (ex.getCode() == 504 || ex.getCode() == 410) {
resourceVersion = extractResourceVersionFromException(ex);
}
else {
resourceVersion = null;
}
}
}
在这种情况下,异常处理代码就相当重要了 。当由于某种原因请求的 资源版本 不存在时,Kubernetes 服务器将返回 504 或 410 错误代码。在这种情况下,返回的消息通常包含当前版本。不幸的是,这些信息不是以任何结构化方式出现的,而是作为错误消息本身的一部分。
提取代码(又名丑陋的黑客)使用正则表达式来实现此意图,但由于错误消息往往依赖于实现,因此代码会回退到 空 值。通过这样做,主循环返回到其起点,使用新的 资源版本 恢复新列表并恢复监视操作。
无论如何,即使有这个警告,关键是现在的事件列表不会在每块手表上从头开始。
5.2.书签
书签是一项可选功能,可在从 Watch 调用返回的事件流上启用特殊的 BOOKMARK 事件 。该事件在其元数据中包含一个 resourceVersion 值,我们可以在后续的 Watch 调用中将其用作新的起点。
由于这是一项选择性加入功能,因此我们必须通过在 API 调用上将 true 传递给 allowedWatchBookmarks 来显式启用它。该选项仅在创建 Watch 时有效,否则忽略。此外,服务器可能会完全忽略它,因此客户端根本不应该依赖于接收这些事件。
与之前单独使用 resourceVersion 的 方法相比,书签使我们能够避免昂贵的同步调用:
String resourceVersion = null;
while (true) {
// Get a fresh list whenever we need to resync
if (resourceVersion == null) {
V1PodList podList = api.listPodForAllNamespaces(true, null, null, null, null,
"false", resourceVersion, null, null, null);
resourceVersion = podList.getMetadata().getResourceVersion();
}
while (true) {
try (Watch<V1Pod> watch = Watch.createWatch(
client,
api.listPodForAllNamespacesCall(true, null, null, null, null,
"false", resourceVersion, null, 10, true, null),
new TypeToken<Response<V1Pod>>(){}.getType())) {
for (Response<V1Pod> event : watch) {
V1Pod pod = event.object;
V1ObjectMeta meta = pod.getMetadata();
switch (event.type) {
case "BOOKMARK":
resourceVersion = meta.getResourceVersion();
break;
case "ADDED":
case "MODIFIED":
case "DELETED":
// ... event processing omitted
break;
default:
log.warn("Unknown event type: {}", event.type);
}
}
}
} catch (ApiException ex) {
resourceVersion = null;
break;
}
}
}
在这里,我们只需要在第一次传递时以及每当我们在内循环中得到 ApiException 时获取完整列表。请注意, BOOKMARK 事件与其他事件具有相同的对象类型,因此我们在这里不需要任何特殊的转换。然而,我们唯一关心的字段是 resourcesVersion ,我们将其保存以供下一次 Watch 调用。
六,结论
在本文中,我们介绍了使用 Java API 客户端创建 Kubernetes Watch 的不同方法。与往常一样,示例的完整源代码可以在 GitHub 上找到。