Elastisearch查询最近一年消费金额排名前五的用户
实现这一需求的主要思路是通过 Elasticsearch 的聚合功能来实现。
- 首先,我们需要从索引中筛选出过去一年内的订单记录;
- 然后,对这些记录进行聚合,按用户ID分组,并计算每个用户的总消费金额;
- 接着,根据总消费金额对用户进行排序,并限制结果为前五名;
- 最后,对于这前五名用户,我们还需要获取他们的完整姓名或者其他信息。
使用 range
查询来过滤出过去一年内的订单记录。
"range": {"order_date": {"gte": "now-1y/d","lt": "now/d"}
}
使用 terms
聚合按 customer_id
字段分组,并在每个分组内计算 taxful_total_price
字段的总和。在 terms
聚合中设置 order
参数,按照 total_spent
的降序排列,并设置 size
为 5 来限制结果数量。
"aggs": {"customers": {"terms": {"field": "customer_id","size": 5,"order": {"total_spent": "desc"}},"aggs": {"total_spent": {"sum": {"field": "taxful_total_price"}}}}
}
为了获取用户的完整姓名,我们在 terms
聚合中嵌入一个 top_hits
子聚合,top_hits
聚合可以用来获取每个分组中的部分文档数据。
"aggs": {"customers": {"terms": {"field": "customer_id","size": 5,"order": {"total_spent": "desc"}},"aggs": {"total_spent": {"sum": {"field": "taxful_total_price"}},"customer_details": {"top_hits": {"_source": {"includes": ["customer_full_name", "email", "phone_number"]},"size": 1}}}}
}
完整的查询语句
{"size": 0,"query": {"range": {"order_date": {"gte": "now-1y/d","lt": "now/d"}}},"aggs": {"customers": {"terms": {"field": "customer_id","size": 5,"order": {"total_spent": "desc"}},"aggs": {"total_spent": {"sum": {"field": "taxful_total_price"}},"customer_details": {"top_hits": {"_source": {"includes": ["customer_full_name", "email", "phone_number"]},"size": 1}}}}}
}
再加一个限定条件,收货地址是爱荷华州的,需要在 query
部分添加一个 term
查询来匹配 shipping_address_state
字段。
{"size": 0,"query": {"bool": {"must": [{"term": {"shipping_address_state": "IA"}},{"range": {"order_date": {"gte": "now-1y/d","lt": "now/d"}}}]}},"aggs": {"customers": {"terms": {"field": "customer_id","size": 5,"order": {"total_spent": "desc"}},"aggs": {"total_spent": {"sum": {"field": "taxful_total_price"}},"customer_details": {"top_hits": {"_source": {"includes": ["customer_full_name", "email", "phone_number"]},"size": 1}}}}}
}
must
子句中的第一个term
查询确保了只有那些shipping_address_state
字段值为 “IA” 的文档才会被包含。- 第二个
range
查询保持不变,依然用于筛选过去一年内的订单记录。
现在这个查询已经满足我们的需求了,能够得到我们想要的结果,接下来我们尝试优化下它。首先是数据模型方面,需要确保 shipping_address_state
字段被映射为 keyword
类型,order_date
被映射为 date
类型。然后是将 range
查询和 term
查询放在 filter
上下文中,因为它们是用于过滤数据的,这可以提高性能并允许 Elasticsearch 使用缓存。query
部分可以改造为:
"query": {"bool": {"filter": [{"term": {"shipping_address_state": "IA"}},{"range": {"order_date": {"gte": "now-1y/d","lt": "now/d"}}}]}}
在 Spring Boot 项目中使用 Spring Data Elasticsearch 实现上述查询
-
添加依赖
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-elasticsearch</artifactId> </dependency>
-
定义一个与索引结构相对应的实体类
import org.springframework.data.annotation.Id; import org.springframework.data.elasticsearch.annotations.Document; import org.springframework.data.elasticsearch.annotations.Field; import org.springframework.data.elasticsearch.annotations.FieldType;@Data @Document(indexName = "orders") public class Order {@Idprivate String id;@Field(type = FieldType.Keyword)private String customer_id;@Field(type = FieldType.Date, format = DateFormat.basic_date_time)private String order_date;@Field(type = FieldType.Float)private float taxful_total_price;@Field(type = FieldType.Keyword)private String shipping_address_state;}
-
定义一个接口继承
ElasticsearchRepository
import org.springframework.data.elasticsearch.repository.ElasticsearchRepository;public interface OrderRepository extends ElasticsearchRepository<Order, String> { }
-
定义聚合结果的 DTO 类
@Data public class CustomerAggregationResult {private String customerId;private String customerFullName;private String email;private String phoneNumber;private double totalSpent;public CustomerAggregationResult(String customerId, String customerFullName, String email, String phoneNumber, double totalSpent) {this.customerId = customerId;this.customerFullName = customerFullName;this.email = email;this.phoneNumber = phoneNumber;this.totalSpent = totalSpent;} }
-
实现业务逻辑并构建查询
import org.elasticsearch.index.query.BoolQueryBuilder; import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.search.aggregations.AggregationBuilders; import org.elasticsearch.search.aggregations.bucket.terms.Terms; import org.elasticsearch.search.aggregations.metrics.Sum; import org.elasticsearch.search.builder.SearchSourceBuilder; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.domain.PageRequest; import org.springframework.data.elasticsearch.core.ElasticsearchRestTemplate; import org.springframework.data.elasticsearch.core.SearchHit; import org.springframework.data.elasticsearch.core.SearchHits; import org.springframework.data.elasticsearch.core.query.NativeSearchQuery; import org.springframework.data.elasticsearch.core.query.NativeSearchQueryBuilder; import org.springframework.stereotype.Service;import java.util.ArrayList; import java.util.List; import java.util.Map;@Service public class CustomerOrderService {@Autowiredprivate ElasticsearchRestTemplate elasticsearchTemplate;/*** 获取过去一年内在美国爱荷华州下单并且消费金额排名前五的用户及其详细信息。** @return 一个包含用户信息和消费总额的列表*/public List<CustomerAggregationResult> getTopCustomersBySpendingInIowa() {// 创建布尔查询构建器BoolQueryBuilder boolQuery = QueryBuilders.boolQuery();// 添加过滤条件:shipping_address_state 必须是 "IA"boolQuery.filter(QueryBuilders.termQuery("shipping_address_state", "IA"));// 添加过滤条件:order_date 在过去一年内boolQuery.filter(QueryBuilders.rangeQuery("order_date").gte("now-1y/d").lt("now/d"));// 构建 NativeSearchQueryNativeSearchQuery searchQuery = new NativeSearchQueryBuilder().withQuery(boolQuery) // 设置布尔查询.addAggregation(AggregationBuilders.terms("customers") // 添加 terms 聚合.field("customer_id") // 按 customer_id 分组.size(5) // 返回前 5 个分组.order(BucketOrder.aggregation("total_spent", false)) // 按 total_spent 降序排序.subAggregation(AggregationBuilders.sum("total_spent").field("taxful_total_price")) // 计算每个分组的总消费额.subAggregation(AggregationBuilders.topHits("customer_details") // 获取每个分组的 top hits.setSize(1) // 只获取每个分组的一个 top hit.setFetchSource(new String[]{"customer_full_name", "email", "phone_number"}, null))) // 指定需要返回的字段.withPageable(PageRequest.of(0, 0)) // 设置 size 为 0,因为我们只关心聚合结果.build();// 执行查询SearchHits<?> searchHits = elasticsearchTemplate.search(searchQuery, Object.class);Aggregations aggregations = searchHits.getAggregations();// 如果没有聚合结果,返回空列表if (aggregations == null) {return new ArrayList<>();}// 获取 customers 聚合Terms customers = aggregations.get("customers");List<CustomerAggregationResult> results = new ArrayList<>();// 遍历每个分组for (Terms.Bucket entry : customers.getBuckets()) {// 获取 total_spent 聚合结果Sum totalSpent = entry.getAggregations().get("total_spent");// 获取 top_hits 聚合的第一个结果SearchHit<?> topHit = ((InternalTopHits) entry.getAggregations().get("customer_details")).getHits().getAt(0);// 将 top_hit 的内容转换为 MapMap<String, Object> sourceAsMap = topHit.getContentAsMap();// 从 Map 中提取客户信息String customerFullName = (String) sourceAsMap.get("customer_full_name");String email = (String) sourceAsMap.get("email");String phoneNumber = (String) sourceAsMap.get("phone_number");// 创建 CustomerAggregationResult 对象并添加到结果列表results.add(new CustomerAggregationResult(entry.getKeyAsString(), // 客户 IDcustomerFullName, // 客户全名email, // 电子邮件phoneNumber, // 电话号码totalSpent.getValue() // 总消费额));}// 返回结果列表return results;} }