一、简介

在本教程中,我们将继续探索 Java Kubernetes API。 这次,我们将展示如何使用 Watches 来高效监控集群事件。

2.什么是 Kubernetes 手表?

在我们之前介绍Kubernetes API的文章中,我们展示了如何恢复有关给定资源或其集合的信息。如果我们想要的只是获取这些资源在给定时间点的状态,那么这很好。然而,鉴于 Kubernetes 集群本质上是高度动态的,这通常还不够。

大多数情况下,我们还希望监视这些资源并跟踪发生的事件 。例如,我们可能对跟踪 Pod 生命周期事件或部署状态更改感兴趣。虽然我们可以使用轮询,但这种方法会受到一些限制。首先,随着监控资源数量的增加,它无法很好地扩展。其次,我们面临丢失轮询周期之间发生的事件的风险。

为了解决这些问题,Kubernetes 提出了 Watches 的概念, 它可用于通过 watch 查询参数进行所有资源收集 API 调用。当其值为 false 或省略时, GET 操作的行为与往常一样:服务器处理请求并返回与给定条件匹配的资源实例列表。 然而,传递 watch=true 会极大地改变它的行为:

  • 响应现在由一系列修改事件组成,包含修改类型和受影响的对象
  • 使用称为长轮询的技术发送第一批事件后,连接将保持打开状态

3. 创建 手表

Java Kubernetes API 通过 Watch 类支持 Watches ,该类有一个静态方法: createWatch。 该方法需要三个参数:

  • ApiClient ,处理对 Kubernetes API 服务器的实际 REST 调用
  • 描述要监视的资源集合的 Call 实例
  • 具有预期资源类型的 TypeToken

我们使用库中可用的任何 xxxApi 类的 listXXXCall() 方法之一创建 Call 实例。例如,要创建一个检测Pod事件的 Watch ,我们将使用 listPodForAllNamespacesCall()

CoreV1Api api = new CoreV1Api(client);
Call call = api.listPodForAllNamespacesCall(null, null, null, null, null, null, null, null, 10, true, null);
Watch<V1Pod> watch = Watch.createWatch(
  client, 
  call, 
  new TypeToken<Response<V1Pod>>(){}.getType()));

这里,我们对大多数参数使用 null ,意思是“使用默认值”,只有两个例外: timeoutwatch。 对于监视调用,后者必须设置为 true 。否则,这将是一次常规的休息通话。 在这种情况下, 超时 相当于手表的“生存时间”,这意味着一旦过期,服务器将停止发送事件并终止连接

超时 参数(以秒为单位表示)找到一个好的值需要进行一些尝试和错误,因为它取决于客户端应用程序的确切要求。此外,检查 Kubernetes 集群配置也很重要。通常,手表有 5 分钟的硬性限制,因此超过该时间将不会达到预期的效果。

4. 接收事件

仔细观察 Watch 类,我们可以看到它实现了标准 JRE 中的 IteratorIterable ,因此我们可以在 for-eachhasNext()-next() 循环中使用 createWatch() 返回的值:

for (Response<V1Pod> event : watch) {
    V1Pod pod = event.object;
    V1ObjectMeta meta = pod.getMetadata();
    switch (event.type) {
    case "ADDED":
    case "MODIFIED":
    case "DELETED":
        // ... process pod data
        break;
    default:
        log.warn("Unknown event type: {}", event.type);
    }
}

每个事件的 type 字段告诉我们对象(在我们的例子中是 Pod)发生了什么样的事件。一旦我们消耗了所有事件,我们必须对 Watch.createWatch() 进行新的调用以再次开始接收事件。在示例代码中,我们将 Watch 创建和结果处理放在 while 循环中。其他方法也是可能的,例如使用 ExecutorService 或类似的方法在后台接收更新。

5. 使用资源版本和书签

上面代码的一个问题是,每次我们创建一个新的 Watch 时, 都会有一个初始事件流,其中包含给定类型的所有现有资源实例。 发生这种情况是因为服务器假设我们没有任何有关它们的先前信息,因此它只是将它们全部发送

然而,这样做违背了有效处理事件的目的,因为我们只在初始加载后需要新事件。 为了防止再次接收所有数据,监视机制支持两个附加概念:资源版本和书签。

5.1.资源版本

Kubernetes 中的每个资源在其元数据中都包含一个 resourcesVersion 字段,该字段只是服务器每次发生更改时设置的不透明字符串。此外,由于资源集合也是一种资源,因此有一个与之关联的 资源版本 。当在集合中添加、删除和/或修改新资源时,此字段将相应更改。

当我们进行返回集合 包含 resourceVersion 参数的API调用时,服务器将使用其值作为查询的“起点”。 对于 Watch API 调用,这意味着仅包含在创建通知版本之后发生的事件。

但是,我们如何获取 资源版本 以包含在我们的调用中?简单:我们只需执行初始同步调用来检索初始资源列表,其中包括集合的 资源版本, 然后在后续 Watch 调用中使用它:

String resourceVersion = null;
while (true) {
    if (resourceVersion == null) {
        V1PodList podList = api.listPodForAllNamespaces(null, null, null, null, null, "false",
          resourceVersion, null, 10, null);
        resourceVersion = podList.getMetadata().getResourceVersion();
    }
    try (Watch<V1Pod> watch = Watch.createWatch(
      client,
      api.listPodForAllNamespacesCall(null, null, null, null, null, "false",
        resourceVersion, null, 10, true, null),
      new TypeToken<Response<V1Pod>>(){}.getType())) {
        
        for (Response<V1Pod> event : watch) {
            // ... process events
        }
    } catch (ApiException ex) {
        if (ex.getCode() == 504 || ex.getCode() == 410) {
            resourceVersion = extractResourceVersionFromException(ex);
        }
        else {
            resourceVersion = null;
        }
    }
}

在这种情况下,异常处理代码就相当重要了 。当由于某种原因请求的 资源版本 不存在时,Kubernetes 服务器将返回 504 或 410 错误代码。在这种情况下,返回的消息通常包含当前版本。不幸的是,这些信息不是以任何结构化方式出现的,而是作为错误消息本身的一部分。

提取代码(又名丑陋的黑客)使用正则表达式来实现此意图,但由于错误消息往往依赖于实现,因此代码会回退到 值。通过这样做,主循环返回到其起点,使用新的 资源版本 恢复新列表并恢复监视操作。

无论如何,即使有这个警告,关键是现在的事件列表不会在每块手表上从头开始。

5.2.书签

书签是一项可选功能,可在从 Watch 调用返回的事件流上启用特殊的 BOOKMARK 事件 。该事件在其元数据中包含一个 resourceVersion 值,我们可以在后续的 Watch 调用中将其用作新的起点。

由于这是一项选择性加入功能,因此我们必须通过在 API 调用上将 true 传递给 allowedWatchBookmarks 来显式启用它。该选项仅在创建 Watch 时有效,否则忽略。此外,服务器可能会完全忽略它,因此客户端根本不应该依赖于接收这些事件。

与之前单独使用 resourceVersion 的 方法相比,书签使我们能够避免昂贵的同步调用:

String resourceVersion = null;

while (true) {
    // Get a fresh list whenever we need to resync
    if (resourceVersion == null) {
        V1PodList podList = api.listPodForAllNamespaces(true, null, null, null, null,
          "false", resourceVersion, null, null, null);
        resourceVersion = podList.getMetadata().getResourceVersion();
    }

    while (true) {
        try (Watch<V1Pod> watch = Watch.createWatch(
          client,
          api.listPodForAllNamespacesCall(true, null, null, null, null, 
            "false", resourceVersion, null, 10, true, null),
          new TypeToken<Response<V1Pod>>(){}.getType())) {
              for (Response<V1Pod> event : watch) {
                  V1Pod pod = event.object;
                  V1ObjectMeta meta = pod.getMetadata();
                  switch (event.type) {
                      case "BOOKMARK":
                          resourceVersion = meta.getResourceVersion();
                          break;
                      case "ADDED":
                      case "MODIFIED":
                      case "DELETED":
                          // ... event processing omitted
                          break;
                      default:
                          log.warn("Unknown event type: {}", event.type);
                  }
              }
          }
        } catch (ApiException ex) {
            resourceVersion = null;
            break;
        }
    }
}

在这里,我们只需要在第一次传递时以及每当我们在内循环中得到 ApiException 时获取完整列表。请注意, BOOKMARK 事件与其他事件具有相同的对象类型,因此我们在这里不需要任何特殊的转换。然而,我们唯一关心的字段是 resourcesVersion ,我们将其保存以供下一次 Watch 调用。

六,结论

在本文中,我们介绍了使用 Java API 客户端创建 Kubernetes Watch 的不同方法。与往常一样,示例的完整源代码可以在 GitHub 上找到。