22import  logging 
33
44from  celery .exceptions  import  TimeoutError 
5- from  celery .result  import  AsyncResult 
6- from  celery .states  import  FAILURE , PENDING , SUCCESS 
5+ #  from celery.result import AsyncResult
6+ from  celery .states  import  FAILURE , PENDING , STARTED ,  SUCCESS 
77from  django .contrib .auth .decorators  import  login_required 
88from  django .core  import  serializers 
99from  django .http  import  (
2222
2323from  app .constants .str  import  PERMISSION_DENIED 
2424from  app .models  import  Item 
25- from  app .worker .app_celery  import  ATTEMPT_LIMIT ,  PROGRESS 
25+ from  app .worker .app_celery  import  ATTEMPT_LIMIT 
2626from  app .worker .tasks  import  receiptor 
2727from  app .worker .tasks .exporter  import  exporter 
2828from  app .worker .tasks .importers  import  historical_data_importer 
29+ from  reboot .celery  import  app 
2930
3031logger  =  logging .getLogger (__name__ )
31- 
32+ tasks_cache   =  {} 
3233
3334@require_GET  
3435@login_required (login_url = "/login" ) 
@@ -121,14 +122,16 @@ def poll_state(request: HttpRequest):
121122            request = request ,
122123            err_msg = "The task_id query parameter of the request was omitted." )
123124
124-     task  =  AsyncResult (task_id )
125+     task  =  app . AsyncResult (task_id )
125126    res  =  JsonResponse (_poll_state (PENDING , 0 , 200 ))
127+     print (f"!!! task id={ task_id } { task .state } { task .successful ()} { task .ready ()} { task .failed ()}  )
126128    if  task .state  ==  FAILURE  or  task .failed ():
127129        res  =  JsonResponse (_poll_state (FAILURE , 0 , 400 ))
128-     elif  task .state  ==  PROGRESS :
130+     elif  task .state  ==  STARTED :
129131        res  =  JsonResponse (task .result ) if  isinstance (
130132            task .result , dict ) else  HttpResponse (task .result )
131133    elif  task .state  ==  SUCCESS  or  task .successful () or  task .ready ():
134+         tasks_cache [task_id ] =  task 
132135        res  =  HttpResponse (SUCCESS )
133136    return  res 
134137
@@ -142,12 +145,17 @@ def download_file(request: HttpRequest):
142145        task_id  =  request .GET .get ("task_id" )
143146        task_name  =  request .GET .get ("task_name" , "task" )
144147        attempts  =  0 
148+         if  tasks_cache [task_id ]:
149+             result  =  tasks_cache [task_id ].get (timeout = 5 )
150+             del  tasks_cache [task_id ]
151+             return  result 
145152        # CloudAMQP free tier is unstable and must be circuit breakered 
146153        while  (attempts  <  ATTEMPT_LIMIT ):
147154            try :
148155                attempts  +=  1 
149-                 task  =  AsyncResult (task_id )
150-                 result  =  task .get (timeout = 0.5  *  attempts )
156+                 task  =  app .AsyncResult (task_id )
157+                 print (f"!!! task id={ task_id } { task .state } { task .successful ()} { task .ready ()} { task .failed ()}  )
158+                 result  =  task .get (timeout = 1.0  *  attempts )
151159                print (f"{ task } { task_name } { attempts } { result }  )
152160                break 
153161            except  TimeoutError :
0 commit comments