11from __future__ import absolute_import
22import json
3+ import copy
34import threading
45from datetime import datetime , timedelta
56from mixpanel import BufferedConsumer as SynchronousBufferedConsumer
@@ -78,6 +79,8 @@ def __init__(self, flush_after=timedelta(0, 10), flush_first=True, max_size=20,
7879 self .flush_after = flush_after
7980 self .flush_first = flush_first
8081
82+ self ._async_buffers = copy .deepcopy (self ._buffers )
83+
8184 if not self .flush_first :
8285 self .last_flushed = datetime .now ()
8386 else :
@@ -106,7 +109,7 @@ def _should_flush(self, endpoint=None):
106109 full = False
107110
108111 if endpoint :
109- full = len (self ._buffers [endpoint ]) >= self ._max_size
112+ full = len (self ._async_buffers [endpoint ]) >= self ._max_size
110113
111114 # always flush the first event
112115 stale = self .last_flushed is None
@@ -145,10 +148,10 @@ def send(self, endpoint, json_message):
145148 :type json_message: str
146149 :raises: MixpanelException
147150 '''
148- if endpoint not in self ._buffers :
149- raise MixpanelException ('No such endpoint "{0}". Valid endpoints are one of {1}' .format (self ._buffers .keys ()))
151+ if endpoint not in self ._async_buffers :
152+ raise MixpanelException ('No such endpoint "{0}". Valid endpoints are one of {1}' .format (self ._async_buffers .keys ()))
150153
151- buf = self ._buffers [endpoint ]
154+ buf = self ._async_buffers [endpoint ]
152155 buf .append (json_message )
153156
154157 should_flush = self ._should_flush (endpoint )
@@ -186,6 +189,8 @@ def flush(self, endpoint=None, async=True):
186189 with self .flush_lock :
187190 if self ._flush_thread_is_free ():
188191
192+ self .transfer_buffers (endpoint = endpoint )
193+
189194 self .flushing_thread = FlushThread (self , endpoint = endpoint )
190195 self .flushing_thread .start ()
191196
@@ -206,7 +211,8 @@ def flush(self, endpoint=None, async=True):
206211 flushing = False
207212
208213 else :
209- self ._sync_flush ()
214+ self .transfer_buffers (endpoint = endpoint )
215+ self ._sync_flush (endpoint = endpoint )
210216 flushing = True
211217
212218 if flushing :
@@ -215,6 +221,25 @@ def flush(self, endpoint=None, async=True):
215221 return flushing
216222
217223
224+ def transfer_buffers (self , endpoint = None ):
225+ """
226+ Transfer events from the `_async_buffers` where they are stored to the
227+ `_buffers` where they will be flushed from by the flushing thread.
228+
229+ :param endpoint (str): One of 'events' or 'people, the Mixpanel endpoint
230+ that is about to be flushed
231+ """
232+ if endpoint :
233+ keys = [endpoint ]
234+ else :
235+ keys = self ._async_buffers .keys ()
236+
237+ for key in keys :
238+ buf = self ._async_buffers [key ]
239+ while buf :
240+ self ._buffers [key ].append (buf .pop (0 ))
241+
242+
218243 def _flush_endpoint (self , endpoint , async = True ):
219244 # we override flush with endpoint so as to keep all the
220245 # threading logic in one place, while still allowing individual
0 commit comments