1. 概述
我们经常将大量数据存储在 Amazon S3 中,但分析这些数据可能颇具挑战。传统方法需要移动数据或设置复杂系统(如数据仓库)。
Amazon Athena 提供了更简单的解决方案,允许我们直接使用 SQL 查询 S3 数据。
在本教程中,我们将探索如何使用 Amazon Athena 通过 Spring Boot 分析 S3 存储桶中的数据。我们将逐步完成必要的配置、以编程方式执行 Athena 查询并处理结果。
2. 理解 Amazon Athena
Amazon Athena 是一种无服务器查询服务,允许我们对存储在 S3 存储桶中的数据执行即席查询,无需设置任何基础设施。
使用 Athena 的主要优势之一是:我们只需为执行查询时消耗的数据量付费,这使其成为即席和偶尔数据分析的经济高效解决方案。
Athena 还使用"读取时架构"(schema-on-read)将 S3 数据动态转换为表状结构。具体来说,这意味着我们无需修改源数据或执行任何 ETL(提取、转换、加载)操作即可查询数据。我们在 Athena 中定义的表不像传统数据库那样包含实际数据,而是存储如何转换源数据以进行查询的指令。
S3 存储桶中的数据可能来自各种 AWS 服务,例如 CloudTrail 日志、VPC 流日志 和 ALB 访问日志,甚至是我们以 JSON、XML、Parquet 等格式存储在 S3 中的自定义数据。
3. 项目设置
在使用 Amazon Athena 之前,我们需要添加其依赖项并正确配置应用程序。
3.1. 依赖项
首先,将 Amazon Athena 依赖项 添加到项目的 pom.xml 文件中:
<dependencies>
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>athena</artifactId>
<version>2.26.0</version>
</dependency>
</dependencies>
此依赖项提供了 AthenaClient 和其他相关类,我们将使用它们与 Athena 服务交互。
3.2. 定义 Athena 配置属性
现在,为了与 Athena 服务交互并执行查询,我们需要配置 AWS 凭据进行身份验证、用于运行 SQL 查询的 Athena 数据库名称,以及查询结果位置(即 Athena 存储查询结果的 S3 存储桶)。
我们将这些属性存储在项目的 application.yaml 文件中,并使用 @ConfigurationProperties 将值映射到 POJO,服务层在与 Athena 交互时引用该 POJO:
@Getter
@Setter
@Validated
@ConfigurationProperties(prefix = "com.baeldung.aws")
class AwsConfigurationProperties {
@NotBlank
private String accessKey;
@NotBlank
private String secretKey;
@Valid
private Athena athena = new Athena();
@Getter
@Setter
public class Athena {
@Nullable
private String database = "default";
@NotBlank
private String s3OutputLocation;
}
}
s3OutputLocation 字段表示 Athena 存储查询结果的 S3 存储桶位置。这是必要的,因为 Athena 是无服务器的,本身不存储任何数据。相反,它执行查询并将结果写入指定的 S3 位置,然后我们的应用程序可以从中读取。
我们还添加了验证注解以确保所有必需的属性都正确配置。如果任何定义的验证失败,将导致 Spring ApplicationContext 启动失败。这使我们能够遵循快速失败模式。
以下是我们的 application.yaml 文件片段,它定义了将自动映射到 AwsConfigurationProperties 类的必需属性:
com:
baeldung:
aws:
access-key: ${AWS_ACCESS_KEY}
secret-key: ${AWS_SECRET_KEY}
athena:
database: ${AMAZON_ATHENA_DATABASE}
s3-output-location: ${AMAZON_ATHENA_S3_OUTPUT_LOCATION}
相应地,此设置允许我们将 Athena 属性外部化,并在应用程序中轻松访问它们。
4. 在 Spring Boot 中配置 Athena
现在我们已经定义了属性,让我们引用它们来配置与 Athena 交互所需的 bean。
4.1. 创建 AthenaClient Bean
AthenaClient 是与 Athena 服务交互的主要入口点。我们将创建一个 bean 来设置它:
@Bean
public AthenaClient athenaClient() {
String accessKey = awsConfigurationProperties.getAccessKey();
String secretKey = awsConfigurationProperties.getSecretKey();
AwsBasicCredentials awsCredentials = AwsBasicCredentials.create(accessKey, secretKey);
return AthenaClient.builder()
.credentialsProvider(StaticCredentialsProvider.create(awsCredentials))
.build();
}
这里,我们使用配置的 AWS 凭据创建 AthenaClient 实例。此客户端用于启动查询执行并从 S3 存储桶检索结果。
4.2. 定义 QueryExecutionContext Bean
接下来,我们需要告诉 Athena 在运行 SQL 查询时使用哪个数据库:
@Bean
public QueryExecutionContext queryExecutionContext() {
String database = awsConfigurationProperties.getAthena().getDatabase();
return QueryExecutionContext.builder()
.database(database)
.build();
}
我们创建一个 QueryExecutionContext bean 并指定用于查询的数据库。数据库名称从我们的配置属性中检索,如果未明确指定,则默认为 default 数据库。
4.3. 设置 ResultConfiguration Bean
最后,我们需要配置 Athena 应该存储 SQL 查询结果的位置:
@Bean
public ResultConfiguration resultConfiguration() {
String outputLocation = awsConfigurationProperties.getAthena().getS3OutputLocation();
return ResultConfiguration.builder()
.outputLocation(outputLocation)
.build();
}
⚠️ 重要提示:用于存储查询结果的 S3 存储桶应与包含源数据的存储桶不同。
这种分离可防止查询结果被解释为额外的源数据,从而导致意外的查询结果。此外,Athena 应对源存储桶具有只读访问权限以保持数据完整性,并且仅对我们为存储结果而预配的存储桶授予写入权限。
5. 执行 Athena 查询
在完成必要的配置后,让我们看看如何使用 Athena 执行查询。我们将创建一个 QueryService 类,自动装配我们创建的所有 bean,并公开一个公共的 execute() 方法来封装查询执行逻辑。
5.1. 启动查询执行
首先,我们将使用 AthenaClient 实例启动查询执行:
public <T> List<T> execute(String sqlQuery, Class<T> targetClass) {
String queryExecutionId;
try {
queryExecutionId = athenaClient.startQueryExecution(query ->
query.queryString(sqlQuery)
.queryExecutionContext(queryExecutionContext)
.resultConfiguration(resultConfiguration)
).queryExecutionId();
} catch (InvalidRequestException exception) {
log.error("Invalid SQL syntax detected in query {}", sqlQuery, exception);
throw new QueryExecutionFailureException();
}
// ... upcoming sections will show the rest of the implementation
}
我们提供 SQL 查询字符串、QueryExecutionContext 和 ResultConfiguration 来启动查询执行。startQueryExecution()* 方法返回一个唯一的 queryExecutionId,我们将使用它来跟踪查询状态并检索结果。
targetClass 参数指定我们将把查询结果映射到的 Java 类。
我们还处理 InvalidRequestException,如果提供的 SQL 查询包含语法错误,Athena SDK 会抛出该异常。我们捕获此异常,记录错误消息和无效查询,并抛出自定义的 QueryExecutionFailureException。
5.2. 等待查询完成
启动查询执行后,我们需要等待它完成,然后再尝试检索结果:
private static final long WAIT_PERIOD = 30;
private void waitForQueryToComplete(String queryExecutionId) {
QueryExecutionState queryState;
do {
GetQueryExecutionResponse response = athenaClient.getQueryExecution(request ->
request.queryExecutionId(queryExecutionId));
queryState = response.queryExecution().status().state();
switch (queryState) {
case FAILED:
case CANCELLED:
String error = response.queryExecution().status().athenaError().errorMessage();
log.error("Query execution failed: {}", error);
throw new QueryExecutionFailureException();
case QUEUED:
case RUNNING:
TimeUnit.MILLISECONDS.sleep(WAIT_PERIOD);
break;
case SUCCEEDED:
queryState = QueryExecutionState.SUCCEEDED;
return;
}
} while (queryState != QueryExecutionState.SUCCEEDED);
}
我们创建一个私有的 waitForQueryToComplete() 方法,并使用 getQueryExecution() 方法定期轮询查询状态,直到达到 SUCCEEDED 状态。
如果查询失败或被取消,我们记录错误消息并抛出自定义的 QueryExecutionFailureException。如果查询处于排队或运行状态,我们会等待一小段时间再检查。
我们从 execute() 方法调用 waitForQueryToComplete() 方法,并传入启动查询执行时收到的 queryExecutionId。
5.3. 处理查询结果
查询执行成功完成后,我们可以检索结果:
GetQueryResultsResponse queryResult = athenaClient.getQueryResults(request ->
request.queryExecutionId(queryExecutionId));
getQueryResults() 方法返回包含结果集的 GetQueryResultsResponse 对象。我们可以处理这些结果,并将它们转换为我们 execute() 方法的 targetClass 参数指定的类的实例:
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper().registerModule(new JsonOrgModule());
private <T> List<T> transformQueryResult(GetQueryResultsResponse queryResultsResponse, Class<T> targetClass) {
List<T> response = new ArrayList<T>();
List<Row> rows = queryResultsResponse.resultSet().rows();
List<String> headers = rows.get(0).data().stream().map(Datum::varCharValue).toList();
rows.stream()
.skip(1)
.forEach(row -> {
JSONObject element = new JSONObject();
List<Datum> data = row.data();
for (int i = 0; i < headers.size(); i++) {
String key = headers.get(i);
String value = data.get(i).varCharValue();
element.put(key, value);
}
T obj = OBJECT_MAPPER.convertValue(element, targetClass);
response.add(obj);
});
return response;
}
这里,我们从结果集的第一行提取标题,然后处理每一后续行,将其转换为 JSONObject,其中键是列名,值是对应的单元格值。然后,我们使用 ObjectMapper 将每个 JSONObject 转换为指定目标类的实例,表示域模型。这些域模型对象被添加到返回的列表中。
值得注意的是,我们的 transformQueryResult() 实现是通用的,适用于所有类型的读取查询,无论表或域模型如何。
5.4. 使用 execute() 方法执行 SQL 查询
在完全实现 execute() 方法后,我们现在可以轻松地对 S3 数据运行 SQL 查询,并将结果检索为域模型对象:
String query = "SELECT * FROM users WHERE age < 25;";
List<User> users = queryService.execute(query, User.class);
record User(Integer id, String name, Integer age, String city) {};
这里,我们定义了一个 SQL 查询,选择所有年龄小于 25 岁的用户。我们将此查询和 User 类传递给我们的 execute() 方法。User 类是一个简单的 record,表示我们期望检索的数据结构。
execute() 方法负责启动查询执行、等待其完成、检索结果并将它们转换为 User 对象列表*。这种抽象使我们能够专注于查询和域模型,而无需担心与 Athena 的底层交互。
5.5. 使用 Athena 的参数化语句
⚠️ 重要提示:当使用用户输入构建 SQL 查询时,我们应注意 SQL 注入攻击的风险。Athena 支持参数化语句,允许我们将 SQL 查询与参数值分开,为使用用户输入执行查询提供更安全的方式。虽然这里我们使用原始 SQL 查询进行演示,但在使用用户提供输入构建查询时,强烈建议使用参数化语句。
要使用参数化查询,我们可以修改 execute() 方法以接受可选的参数列表:
public <T> List<T> execute(String sqlQuery, List<String> parameters, Class<T> targetClass) {
// ... same as above
queryExecutionId = athenaClient.startQueryExecution(query ->
query.queryString(sqlQuery)
.queryExecutionContext(queryExecutionContext)
.resultConfiguration(resultConfiguration)
.executionParameters(parameters)
).queryExecutionId();
// ... same as above
}
我们向 execute() 方法添加了一个新的 parameters 参数,这是一个字符串值列表,将用于参数化查询。启动查询执行时,我们使用 executionParameters() 方法传递这些 parameters。
让我们看看如何使用更新后的 execute() 方法:
public List<User> getUsersByName(String name) {
String query = "SELECT * FROM users WHERE name = ?";
return queryService.execute(query, List.of(name), User.class);
}
此示例定义了一个带有占位符 '?' 的 SQL 查询,用于 name 参数。我们将名称值作为包含单个元素的列表传递给 execute() 方法,同时传递查询和目标类。
6. 自动化数据库和表创建
要使用 Athena 查询我们的 S3 数据,我们需要首先定义一个数据库和一个表,该表将映射到存储在 S3 存储桶中的数据。虽然我们可以使用 AWS 管理控制台手动创建这些,但将此过程作为应用程序启动的一部分自动化会更方便。
我们将设置必要数据库和表的 SQL 脚本放在一个新的 athena-init 目录中,该目录将在 src/main/resources 目录内创建。
要执行这些 SQL 脚本,我们将创建一个实现 ApplicationRunner 接口的 AthenaInitializer 类:
@Component
@RequiredArgsConstructor
class AthenaInitializer implements ApplicationRunner {
private final QueryService queryService;
private final ResourcePatternResolver resourcePatternResolver;
@Override
public void run(ApplicationArguments args) {
Resource[] initScripts = resourcePatternResolver.getResources("classpath:athena-init/*.sql");
for (Resource script : initScripts) {
String sqlScript = FileUtils.readFileToString(script.getFile(), StandardCharsets.UTF_8);
queryService.execute(sqlScript, Void.class);
}
}
}
使用 Lombok 的构造函数注入,我们注入 ResourcePatternResolver 和之前创建的 QueryService 实例。
我们使用 ResourcePatternResolver 定位 athena-init 目录中的所有 SQL 脚本。然后我们遍历这些脚本,使用 Apache Commons IO 读取其内容,并使用我们的 QueryService 执行它们。
我们首先创建一个 create-database.sql 脚本来创建自定义数据库:
CREATE DATABASE IF NOT EXISTS baeldung;
我们创建一个名为 baeldung 的自定义数据库(如果它尚不存在)。这里使用的数据库名称可以在 application.yaml 文件中配置,如本教程前面所述。
类似地,要在 baeldung 数据库中创建名为 users 的表,我们将创建另一个名为 create-users-table.sql 的脚本,内容如下:
CREATE EXTERNAL TABLE IF NOT EXISTS users (
id INT,
name STRING,
age INT,
city STRING
)
ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe'
LOCATION 's3://baeldung-athena-tutorial-bucket/';
此脚本创建一个名为 users 的外部表,其列对应于我们将存储在 S3 中的 JSON 数据中的字段。我们将 JsonSerDe 指定为行格式,并提供我们将存储 JSON 文件的 S3 位置。
✅ 关键点:为了正确使用 Athena 查询存储在 S3 中的数据,确保每个 JSON 记录完全位于一行文本中,键和值之间没有空格或换行符:
{"id":1,"name":"Homelander","age":41,"city":"New York"}
{"id":2,"name":"Black Noir","age":58,"city":"Los Angeles"}
{"id":3,"name":"Billy Butcher","age":46,"city":"London"}
7. IAM 权限
最后,为了使我们的应用程序正常运行,我们需要为应用程序中配置的 IAM 用户配置一些权限。
我们的策略应配置 Athena 和 S3 访问权限:
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "AllowAthenaQueryExecution",
"Effect": "Allow",
"Action": [
"athena:StartQueryExecution",
"athena:GetQueryExecution",
"athena:GetQueryResults"
],
"Resource": "arn:aws:athena:region:account-id:workgroup/primary"
},
{
"Sid": "AllowS3ReadAccessToSourceBucket",
"Effect": "Allow",
"Action": [
"s3:ListBucket",
"s3:GetObject"
],
"Resource": [
"arn:aws:s3:::baeldung-athena-tutorial-bucket",
"arn:aws:s3:::baeldung-athena-tutorial-bucket/*"
]
},
{
"Sid": "AllowS3AccessForAthenaQueryResults",
"Effect": "Allow",
"Action": [
"s3:GetBucketLocation",
"s3:GetObject",
"s3:ListBucket",
"s3:PutObject"
],
"Resource": [
"arn:aws:s3:::baeldung-athena-results-bucket",
"arn:aws:s3:::baeldung-athena-results-bucket/*"
]
},
{
"Sid": "AllowGlueCatalogAccessForAthena",
"Effect": "Allow",
"Action": [
"glue:CreateDatabase",
"glue:GetDatabase",
"glue:CreateTable",
"glue:GetTable"
],
"Resource": [
"arn:aws:glue:region:account-id:catalog",
"arn:aws:glue:region:account-id:database/baeldung",
"arn:aws:glue:region:account-id:table/baeldung/users"
]
}
]
}
IAM 策略包含四个关键语句,用于构建 Spring Boot 应用程序所需的权限:
- ✅ AllowAthenaQueryExecution:提供与 Athena 本身交互的必要权限,包括启动查询、检查状态和检索结果。
- ✅ AllowS3ReadAccessToSourceBucket:允许对我们包含要查询的源数据的 S3 存储桶进行读取访问。
- ✅ AllowS3AccessForAthenaQueryResults:专注于 Athena 存储查询结果的 S3 存储桶。它授予 Athena 将结果写入配置的 S3 存储桶以及我们的应用程序检索它们的权限。
- ✅ AllowGlueCatalogAccessForAthena:允许与 AWS Glue(Athena 用作其元数据存储)交互。它允许我们创建和检索数据库和表定义,这些定义对于 Athena 理解 S3 数据结构和执行 SQL 查询至关重要。
我们的 IAM 策略遵循最小权限原则,仅授予应用程序正常运行所需的必要权限。
8. 结论
在本文中,我们探讨了如何使用 Amazon Athena 和 Spring Boot 直接从 S3 存储桶查询数据,而无需设置任何复杂的基础设施。
我们讨论了启动查询执行、等待其完成以及通用处理查询结果。此外,我们还通过在应用程序启动期间执行的 SQL 脚本自动化了数据库和表的创建。
与往常一样,本文中使用的所有代码示例都可以在 GitHub 上找到。