22import logging
33
44from celery .exceptions import TimeoutError
5- from celery .result import AsyncResult
6- from celery .states import FAILURE , PENDING , SUCCESS
5+ # from celery.result import AsyncResult
6+ from celery .states import FAILURE , PENDING , STARTED , SUCCESS
77from django .contrib .auth .decorators import login_required
88from django .core import serializers
99from django .http import (
2222
2323from app .constants .str import PERMISSION_DENIED
2424from app .models import Item
25- from app .worker .app_celery import ATTEMPT_LIMIT , PROGRESS
25+ from app .worker .app_celery import ATTEMPT_LIMIT
2626from app .worker .tasks import receiptor
2727from app .worker .tasks .exporter import exporter
2828from app .worker .tasks .importers import historical_data_importer
29+ from reboot .celery import app
2930
3031logger = logging .getLogger (__name__ )
31-
32+ tasks_cache = {}
3233
3334@require_GET
3435@login_required (login_url = "/login" )
@@ -121,14 +122,16 @@ def poll_state(request: HttpRequest):
121122 request = request ,
122123 err_msg = "The task_id query parameter of the request was omitted." )
123124
124- task = AsyncResult (task_id )
125+ task = app . AsyncResult (task_id )
125126 res = JsonResponse (_poll_state (PENDING , 0 , 200 ))
127+ print (f"!!! task id={ task_id } ,state={ task .state } ,successful={ task .successful ()} ,ready={ task .ready ()} ,failed={ task .failed ()} " )
126128 if task .state == FAILURE or task .failed ():
127129 res = JsonResponse (_poll_state (FAILURE , 0 , 400 ))
128- elif task .state == PROGRESS :
130+ elif task .state == STARTED :
129131 res = JsonResponse (task .result ) if isinstance (
130132 task .result , dict ) else HttpResponse (task .result )
131133 elif task .state == SUCCESS or task .successful () or task .ready ():
134+ tasks_cache [task_id ] = task
132135 res = HttpResponse (SUCCESS )
133136 return res
134137
@@ -142,12 +145,17 @@ def download_file(request: HttpRequest):
142145 task_id = request .GET .get ("task_id" )
143146 task_name = request .GET .get ("task_name" , "task" )
144147 attempts = 0
145- # CloudAMQP free tier is unstable and must be circuit breakered
146148 while (attempts < ATTEMPT_LIMIT ):
147149 try :
148150 attempts += 1
149- task = AsyncResult (task_id )
150- result = task .get (timeout = 0.5 * attempts )
151+ # if tasks_cache[task_id]:
152+ # task = tasks_cache[task_id]
153+ # del tasks_cache[task_id]
154+ # else:
155+ # task = app.AsyncResult(task_id)
156+ task = app .AsyncResult (task_id )
157+ print (f"!!! task id={ task_id } ,state={ task .state } ,successful={ task .successful ()} ,ready={ task .ready ()} ,failed={ task .failed ()} " )
158+ result = task .get (timeout = 1.0 * attempts )
151159 print (f"{ task } { task_name } success #{ attempts } : { result } " )
152160 break
153161 except TimeoutError :
@@ -158,6 +166,7 @@ def download_file(request: HttpRequest):
158166 err_msg = "Download exceeded max attempts" )
159167 return result
160168 except Exception as e :
169+ print (f"!!! error" , e )
161170 return _error (request = request , err_msg = f"Failed to download file: { e } " )
162171
163172
0 commit comments