|  | 
|  | 1 | +# EXAMPLE: query_agg | 
|  | 2 | +# HIDE_START | 
|  | 3 | +import json | 
|  | 4 | +import redis | 
|  | 5 | +from redis.commands.json.path import Path | 
|  | 6 | +from redis.commands.search import Search | 
|  | 7 | +from redis.commands.search.aggregation import AggregateRequest | 
|  | 8 | +from redis.commands.search.field import NumericField, TagField | 
|  | 9 | +from redis.commands.search.indexDefinition import IndexDefinition, IndexType | 
|  | 10 | +import redis.commands.search.reducers as reducers | 
|  | 11 | + | 
|  | 12 | +r = redis.Redis(decode_responses=True) | 
|  | 13 | + | 
|  | 14 | +# create index | 
|  | 15 | +schema = ( | 
|  | 16 | +    TagField("$.condition", as_name="condition"), | 
|  | 17 | +    NumericField("$.price", as_name="price"), | 
|  | 18 | +) | 
|  | 19 | + | 
|  | 20 | +index = r.ft("idx:bicycle") | 
|  | 21 | +index.create_index( | 
|  | 22 | +    schema, | 
|  | 23 | +    definition=IndexDefinition(prefix=["bicycle:"], index_type=IndexType.JSON), | 
|  | 24 | +) | 
|  | 25 | + | 
|  | 26 | +# load data | 
|  | 27 | +with open("data/query_em.json") as f: | 
|  | 28 | +    bicycles = json.load(f) | 
|  | 29 | + | 
|  | 30 | +pipeline = r.pipeline(transaction=False) | 
|  | 31 | +for bid, bicycle in enumerate(bicycles): | 
|  | 32 | +    pipeline.json().set(f'bicycle:{bid}', Path.root_path(), bicycle) | 
|  | 33 | +pipeline.execute() | 
|  | 34 | +# HIDE_END | 
|  | 35 | + | 
|  | 36 | +# STEP_START agg1 | 
|  | 37 | +search = Search(r, index_name="idx:bicycle") | 
|  | 38 | +aggregate_request = AggregateRequest(query='@condition:{new}') \ | 
|  | 39 | +    .load('__key', 'price') \ | 
|  | 40 | +    .apply(discounted='@price - (@price * 0.1)') | 
|  | 41 | +res = search.aggregate(aggregate_request) | 
|  | 42 | +print(len(res.rows)) # >>> 5 | 
|  | 43 | +print(res.rows) # >>> [['__key', 'bicycle:0', ... | 
|  | 44 | +#[['__key', 'bicycle:0', 'price', '270', 'discounted', '243'], | 
|  | 45 | +# ['__key', 'bicycle:5', 'price', '810', 'discounted', '729'], | 
|  | 46 | +# ['__key', 'bicycle:6', 'price', '2300', 'discounted', '2070'], | 
|  | 47 | +# ['__key', 'bicycle:7', 'price', '430', 'discounted', '387'], | 
|  | 48 | +# ['__key', 'bicycle:8', 'price', '1200', 'discounted', '1080']] | 
|  | 49 | +# REMOVE_START | 
|  | 50 | +assert len(res.rows) == 5 | 
|  | 51 | +# REMOVE_END | 
|  | 52 | +# STEP_END | 
|  | 53 | + | 
|  | 54 | +# STEP_START agg2 | 
|  | 55 | +search = Search(r, index_name="idx:bicycle") | 
|  | 56 | +aggregate_request = AggregateRequest(query='*') \ | 
|  | 57 | +    .load('price') \ | 
|  | 58 | +    .apply(price_category='@price<1000') \ | 
|  | 59 | +    .group_by('@condition', reducers.sum('@price_category').alias('num_affordable')) | 
|  | 60 | +res = search.aggregate(aggregate_request) | 
|  | 61 | +print(len(res.rows)) # >>> 3 | 
|  | 62 | +print(res.rows) # >>> | 
|  | 63 | +#[['condition', 'refurbished', 'num_affordable', '1'], | 
|  | 64 | +# ['condition', 'used', 'num_affordable', '1'], | 
|  | 65 | +# ['condition', 'new', 'num_affordable', '3']] | 
|  | 66 | +# REMOVE_START | 
|  | 67 | +assert len(res.rows) == 3 | 
|  | 68 | +# REMOVE_END | 
|  | 69 | +# STEP_END | 
|  | 70 | + | 
|  | 71 | +# STEP_START agg3 | 
|  | 72 | +search = Search(r, index_name="idx:bicycle") | 
|  | 73 | +aggregate_request = AggregateRequest(query='*') \ | 
|  | 74 | +    .apply(type="'bicycle'") \ | 
|  | 75 | +    .group_by('@type', reducers.count().alias('num_total')) | 
|  | 76 | +res = search.aggregate(aggregate_request) | 
|  | 77 | +print(len(res.rows)) # >>> 1 | 
|  | 78 | +print(res.rows) # >>> [['type', 'bicycle', 'num_total', '10']] | 
|  | 79 | +# REMOVE_START | 
|  | 80 | +assert len(res.rows) == 1 | 
|  | 81 | +# REMOVE_END | 
|  | 82 | +# STEP_END | 
|  | 83 | + | 
|  | 84 | +# STEP_START agg4 | 
|  | 85 | +search = Search(r, index_name="idx:bicycle") | 
|  | 86 | +aggregate_request = AggregateRequest(query='*') \ | 
|  | 87 | +    .load('__key') \ | 
|  | 88 | +    .group_by('@condition', reducers.tolist('__key').alias('bicycles')) | 
|  | 89 | +res = search.aggregate(aggregate_request) | 
|  | 90 | +print(len(res.rows)) # >>> 3 | 
|  | 91 | +print(res.rows) # >>> | 
|  | 92 | +#[['condition', 'refurbished', 'bicycles', ['bicycle:9']], | 
|  | 93 | +# ['condition', 'used', 'bicycles', ['bicycle:1', 'bicycle:2', 'bicycle:3', 'bicycle:4']], | 
|  | 94 | +# ['condition', 'new', 'bicycles', ['bicycle:5', 'bicycle:6', 'bicycle:7', 'bicycle:0', 'bicycle:8']]] | 
|  | 95 | +# REMOVE_START | 
|  | 96 | +assert len(res.rows) == 3 | 
|  | 97 | +# REMOVE_END | 
|  | 98 | +# STEP_END | 
|  | 99 | + | 
|  | 100 | +# REMOVE_START | 
|  | 101 | +# destroy index and data | 
|  | 102 | +r.ft("idx:bicycle").dropindex(delete_documents=True) | 
|  | 103 | +# REMOVE_END | 
0 commit comments