22import logging
33
44from celery .exceptions import TimeoutError
5- from celery .result import AsyncResult
6- from celery .states import FAILURE , PENDING , SUCCESS
5+ # from celery.result import AsyncResult
6+ from celery .states import FAILURE , PENDING , STARTED , SUCCESS
77from django .contrib .auth .decorators import login_required
88from django .core import serializers
99from django .http import (
2222
2323from app .constants .str import PERMISSION_DENIED
2424from app .models import Item
25- from app .worker .app_celery import ATTEMPT_LIMIT , PROGRESS
25+ from app .worker .app_celery import ATTEMPT_LIMIT
2626from app .worker .tasks import receiptor
2727from app .worker .tasks .exporter import exporter
2828from app .worker .tasks .importers import historical_data_importer
29+ from reboot .celery import app
2930
3031logger = logging .getLogger (__name__ )
31-
32+ tasks_cache = {}
33+ results_cache = {}
3234
3335@require_GET
3436@login_required (login_url = "/login" )
@@ -121,14 +123,21 @@ def poll_state(request: HttpRequest):
121123 request = request ,
122124 err_msg = "The task_id query parameter of the request was omitted." )
123125
124- task = AsyncResult (task_id )
126+ task = app . AsyncResult (task_id )
125127 res = JsonResponse (_poll_state (PENDING , 0 , 200 ))
128+ print (f"!!! task id={ task_id } ,state={ task .state } ,successful={ task .successful ()} ,ready={ task .ready ()} ,failed={ task .failed ()} " )
126129 if task .state == FAILURE or task .failed ():
127130 res = JsonResponse (_poll_state (FAILURE , 0 , 400 ))
128- elif task .state == PROGRESS :
131+ elif task .state == STARTED :
129132 res = JsonResponse (task .result ) if isinstance (
130133 task .result , dict ) else HttpResponse (task .result )
131134 elif task .state == SUCCESS or task .successful () or task .ready ():
135+ tasks_cache [task_id ] = task
136+ try :
137+ results_cache [task_id ] = task .get (timeout = 5 )
138+ print ("!!! saved" , results_cache [task_id ], task .result )
139+ except Exception as e :
140+ print (f"!!! error" , e )
132141 res = HttpResponse (SUCCESS )
133142 return res
134143
@@ -142,13 +151,22 @@ def download_file(request: HttpRequest):
142151 task_id = request .GET .get ("task_id" )
143152 task_name = request .GET .get ("task_name" , "task" )
144153 attempts = 0
145- # CloudAMQP free tier is unstable and must be circuit breakered
154+ # if task_id in results_cache:
155+ # return results_cache[task_id]
146156 while (attempts < ATTEMPT_LIMIT ):
147157 try :
148158 attempts += 1
149- task = AsyncResult (task_id )
150- result = task .get (timeout = 0.5 * attempts )
159+ # if tasks_cache[task_id]:
160+ # task = tasks_cache[task_id]
161+ # del tasks_cache[task_id]
162+ # else:
163+ # task = app.AsyncResult(task_id)
164+ task = tasks_cache [task_id ] if task_id in tasks_cache else app .AsyncResult (task_id )
165+ print (f"!!! task id={ task_id } ,state={ task .state } ,successful={ task .successful ()} ,ready={ task .ready ()} ,failed={ task .failed ()} " )
166+ result = task .get (timeout = 1.0 * attempts )
151167 print (f"{ task } { task_name } success #{ attempts } : { result } " )
168+ if task_id in tasks_cache :
169+ del tasks_cache [task_id ]
152170 break
153171 except TimeoutError :
154172 print (f"{ task } { task_name } failed #{ attempts } " )
@@ -158,6 +176,7 @@ def download_file(request: HttpRequest):
158176 err_msg = "Download exceeded max attempts" )
159177 return result
160178 except Exception as e :
179+ print (f"!!! error" , e )
161180 return _error (request = request , err_msg = f"Failed to download file: { e } " )
162181
163182
0 commit comments