一、简介

在我们之前的文章中,我们研究了使用DataStax Astra构建一个仪表板来查看复仇者联盟的当前状态,DataStax Astra 是一个由Apache Cassandra提供支持的 DBaaS,使用Stargate提供额外的 API 来使用它。

在本文中,我们将对此进行扩展以存储离散事件而不是汇总摘要。这将使我们能够在 UI 中查看这些事件。 我们将允许用户单击一张卡片并获取导致我们走到这一步的事件表。与摘要不同,这些事件将分别代表一名复仇者和一个离散的时间点。每次收到新事件时,它都会与所有其他事件一起附加到表中。

我们使用 Cassandra 来实现这一点,是因为它提供了一种非常有效的方式来存储时间序列数据,而我们写入的次数比读取的次数多得多。这里的目标是一个可以频繁更新(例如每 30 秒)的系统,然后可以让用户轻松查看已记录的最新事件。

2. 构建数据库架构

与我们在上一篇文章中使用的文档 API 不同,它将使用RESTGraphQL API 构建。 它们在 Cassandra 表之上工作,并且这些 API 可以完全相互协作以及与 CQL API 协作。

为了使用这些,我们需要为我们存储数据的表定义一个模式。我们使用的表旨在与特定模式配合使用 - 按事件发生的顺序查找给定 Avenger 的事件。

该架构如下所示:

CREATE TABLE events (
    avenger text,
    timestamp timestamp,
    latitude decimal,
    longitude decimal,
    status decimal,
    PRIMARY KEY (avenger, timestamp)
) WITH CLUSTERING ORDER BY (timestamp DESC);

数据与此类似:

复仇者

时间戳

纬度

经度

地位

2021-05-16 09:00:30.000000+0000

40.715255

-73.975353

0.999954

鹰眼

2021-05-16 09:00:30.000000+0000

40.714602

-73.975238

0.99986

鹰眼

2021-05-16 09:01:00.000000+0000

40.713572

-73.975289

0.999804

这将我们的表定义为具有多行分区,分区键为“avenger”,集群键为“timestamp”。 Cassandra 使用分区键来确定数据存储在哪个节点上。聚类键用于确定数据在分区内的存储顺序。

通过指示“avenger”是我们的分区键,它将确保同一 Avenger 的所有数据保存在一起。通过指示“时间戳”是我们的聚类键,它将以最有效的顺序存储该分区中的数据,以便我们检索。鉴于我们对此数据的核心查询是为单个 Avenger 选择每个事件(我们的分区键),按事件的时间戳排序(我们的集群键),Cassandra 可以让我们非常有效地访问它。

此外,应用程序的设计使用方式意味着我们以近乎连续的方式写入事件数据。例如,我们可能每 30 秒从每个 Avenger 那里收到一个新事件。以这种方式构建表可以非常有效地将新事件插入到正确分区中的正确位置。

为了方便起见,我们用于预填充数据库的脚本也将创建并填充此模式。

3. 使用 Astra、REST 和 GraphQL API 构建客户端层

我们将出于不同目的使用 REST 和 GraphQL API 与 Astra 进行交互 。 REST API 将用于将新事件插入表中。 GraphQL API 将用于再次检索它们。

为了最好地做到这一点,我们需要一个可以与 Astra 进行交互的客户端层。这些相当于我们在上一篇文章中为其他两个 API 构建的 DocumentClient 类。

3.1.休息客户端

首先,我们的 REST 客户端。 我们将使用它来插入新的完整记录 ,因此只需要一个方法来插入数据:

@Repository
public class RestClient {
  @Value("https://${ASTRA_DB_ID}-${ASTRA_DB_REGION}.apps.astra.datastax.com/api/rest/v2/keyspaces/${ASTRA_DB_KEYSPACE}")
  private String baseUrl;

  @Value("${ASTRA_DB_APPLICATION_TOKEN}")
  private String token;

  private RestTemplate restTemplate;

  public RestClient() {
    this.restTemplate = new RestTemplate();
    this.restTemplate.setRequestFactory(new HttpComponentsClientHttpRequestFactory());
  }

  public <T> void createRecord(String table, T record) {
    var uri = UriComponentsBuilder.fromHttpUrl(baseUrl)
      .pathSegment(table)
      .build()
      .toUri();
    var request = RequestEntity.post(uri)
      .header("X-Cassandra-Token", token)
      .body(record);
    restTemplate.exchange(request, Map.class);
  }
}

3.2. GraphQL 客户端

然后是我们的 GraphQL 客户端。 这次我们进行完整的 GraphQL 查询并返回它获取的数据

@Repository
public class GraphqlClient {
  @Value("https://${ASTRA_DB_ID}-${ASTRA_DB_REGION}.apps.astra.datastax.com/api/graphql/${ASTRA_DB_KEYSPACE}")
  private String baseUrl;

  @Value("${ASTRA_DB_APPLICATION_TOKEN}")
  private String token;

  private RestTemplate restTemplate;

  public GraphqlClient() {
    this.restTemplate = new RestTemplate();
    this.restTemplate.setRequestFactory(new HttpComponentsClientHttpRequestFactory());
  }

  public <T> T query(String query, Class<T> cls) {
    var request = RequestEntity.post(baseUrl)
      .header("X-Cassandra-Token", token)
      .body(Map.of("query", query));
    var response = restTemplate.exchange(request, cls);
  
    return response.getBody();
  }
}

和以前一样,我们的 baseUrltoken 字段是根据定义如何与 Astra 对话的属性进行配置的。这些客户端类都知道如何构建与数据库交互所需的完整 URL。我们可以使用它们发出正确的 HTTP 请求来执行所需的操作。

这就是与 Astra 交互所需的全部内容,因为这些 API 只需通过 HTTP 交换 JSON 文档即可工作。

4. 记录个人事件

为了显示事件,我们需要能够记录它们。这将建立在我们之前更新 状态 表的功能之上,并将另外将新记录插入到 事件 表中。

4.1.插入事件

我们首先需要的是该表中数据的表示。这将表示为 Java 记录:

public record Event(String avenger, 
  String timestamp,
  Double latitude,
  Double longitude,
  Double status) {}

这与我们之前定义的模式直接相关。 当我们实际进行 API 调用时,Jackson 会将其转换为 REST API 的正确 JSON。

接下来,我们需要我们的服务层来实际记录这些。这将从外部获取适当的详细信息,使用时间戳对其进行扩充,并调用我们的 REST 客户端来创建新记录:

@Service
public class EventsService {
  @Autowired
  private RestClient restClient;

  public void createEvent(String avenger, Double latitude, Double longitude, Double status) {
    var event = new Event(avenger, Instant.now().toString(), latitude, longitude, status);

    restClient.createRecord("events", event);
  }
}

4.2.更新API

最后,我们需要一个控制器来接收事件。 这是对我们在上一篇文章中编写的 UpdateController 的 扩展,以连接新的 EventsService ,然后从我们的 update 方法中调用它

@RestController
public class UpdateController {
  ......
  @Autowired
  private EventsService eventsService;

  @PostMapping("/update/{avenger}")
  public void update(@PathVariable String avenger, @RequestBody UpdateBody body) throws Exception {
    eventsService.createEvent(avenger, body.lat(), body.lng(), body.status());
    statusesService.updateStatus(avenger, lookupLocation(body.lat(), body.lng()), getStatus(body.status()));
  }
  ......
}

此时,调用我们的 API 来记录 Avenger 的状态将更新状态文档并将新记录插入事件表中。这将使我们能够记录发生的每个更新事件。

这意味着每次我们收到更新 Avenger 状态的电话时,我们都会向该表添加一条新记录。 实际上,我们需要通过修剪或添加额外的分区来支持存储的数据规模,但这超出了本文的范围。

5. 通过 GraphQL API 向用户提供事件

一旦我们的表中有事件,下一步就是将它们提供给用户。 我们将使用 GraphQL API 来实现这一点,一次检索给定 Avenger 的一页事件,并且始终按顺序排列,以便最新的事件排在前面

使用 GraphQL,我们还能够仅检索我们真正感兴趣的字段子集,而不是全部。如果我们要获取大量记录,那么这可以帮助降低有效负载大小,从而提高性能。

5.1.检索事件

我们需要的第一件事是我们正在检索的数据的表示。 这是表中存储的实际数据的子集。因此,我们需要一个不同的类来表示它:

public record EventSummary(String timestamp,
  Double latitude,
  Double longitude,
  Double status) {}

我们还需要一个类来表示这些列表的 GraphQL 响应。这将包括事件摘要列表和用于光标到下一页的页面状态:

public record Events(List<EventSummary> values, String pageState) {}

现在,我们可以在事件服务中创建一个新方法来实际执行搜索。

public class EventsService {
  ......
  @Autowired
  private GraphqlClient graphqlClient;

  public Events getEvents(String avenger, String offset) {
    var query = "query {" + 
      "  events(filter:{avenger:{eq:\"%s\"}}, orderBy:[timestamp_DESC], options:{pageSize:5, pageState:%s}) {" +
      "    pageState " +
      "    values {" +
      "     timestamp " +
      "     latitude " +
      "     longitude " +
      "     status" +
      "   }" +
      "  }" +
      "}";

    var fullQuery = String.format(query, avenger, offset == null ? "null" : "\"" + offset + "\"");

    return graphqlClient.query(fullQuery, EventsGraphqlResponse.class).data().events();
  }

  private static record EventsResponse(Events events) {}
  private static record EventsGraphqlResponse(EventsResponse data) {}
}

这里我们有几个内部类,它们的存在纯粹是为了表示 GraphQL API 返回的 JSON 结构,直到我们感兴趣的部分——这些完全是 GraphQL API 的产物。

然后,我们有一个方法可以为我们想要的详细信息构建 GraphQL 查询,按 avenger 字段进行过滤,并按 时间戳 字段降序排序。我们将其替换为实际的 Avenger ID 和要使用的页面状态,然后将其传递到 GraphQL 客户端以获取实际数据。

5.2.在 UI 中显示事件

现在我们可以从数据库中获取事件,然后我们可以将其连接到我们的 UI。

首先,我们将更新我们在上一篇文章中编写的 StatusesController 以支持用于获取事件的 UI 端点:

public class StatusesController {
  ......

  @Autowired
  private EventsService eventsService;

  @GetMapping("/avenger/{avenger}")
  public Object getAvengerStatus(@PathVariable String avenger, @RequestParam(required = false) String page) {
    var result = new ModelAndView("dashboard");
    result.addObject("avenger", avenger);
    result.addObject("statuses", statusesService.getStatuses());
    result.addObject("events", eventsService.getEvents(avenger, page));

    return result;
  }
}

然后我们需要更新模板来呈现事件表。我们将向 dashboard.html 文件添加一个新表,仅当 事件 对象存在于从控制器接收到的模型中时才会呈现该新表:

......
    <div th:if="${events}">
      <div class="row">
        <table class="table">
          <thead>
            <tr>
              <th scope="col">Timestamp</th>
              <th scope="col">Latitude</th>
              <th scope="col">Longitude</th>
              <th scope="col">Status</th>
            </tr>
          </thead>
          <tbody>
            <tr th:each="data, iterstat: ${events.values}">
              <th scope="row" th:text="${data.timestamp}">
                </td>
              <td th:text="${data.latitude}"></td>
              <td th:text="${data.longitude}"></td>
              <td th:text="${(data.status * 100) + '%'}"></td>
            </tr>
          </tbody>
        </table>
      </div>

      <div class="row" th:if="${events.pageState}">
        <div class="col position-relative">
          <a th:href="@{/avenger/{id}(id = ${avenger}, page = ${events.pageState})}"
            class="position-absolute top-50 start-50 translate-middle">Next
            Page</a>
        </div>
      </div>
    </div>
  </div>
......

这包括底部的一个链接,用于转到下一页,该链接通过我们的事件数据的页面状态和我们正在查看的复仇者的 ID。

最后,我们需要更新状态卡,以允许我们链接到该条目的事件表。这只是每个卡标题周围的超链接,在 status.html 中呈现:

......
  <a th:href="@{/avenger/{id}(id = ${data.avenger})}">
    <h5 class="card-title" th:text="${data.name}"></h5>
  </a>
......

我们现在可以启动应用程序,然后从卡片中单击以查看导致此状态的最新事件:

六、总结

在这里,我们了解了如何使用 Astra REST 和 GraphQL API 来处理基于行的数据,以及它们如何协同工作 。我们还开始了解 Cassandra 和这些 API 在海量数据集上的应用效果如何。

本文中的所有代码都可以在GitHub上找到。