31
31
import android .os .Process ;
32
32
import android .os .RemoteException ;
33
33
import android .os .TransactionTooLargeException ;
34
+ import androidx .annotation .BinderThread ;
34
35
import com .google .common .annotations .VisibleForTesting ;
35
36
import com .google .common .base .Ticker ;
36
37
import com .google .common .base .Verify ;
107
108
* https://github.com/grpc/proposal/blob/master/L73-java-binderchannel/wireformat.md
108
109
*/
109
110
@ ThreadSafe
110
- public abstract class BinderTransport
111
- implements LeakSafeOneWayBinder .TransactionHandler , IBinder .DeathRecipient {
111
+ public abstract class BinderTransport implements IBinder .DeathRecipient {
112
112
113
113
private static final Logger logger = Logger .getLogger (BinderTransport .class .getName ());
114
114
@@ -212,9 +212,11 @@ protected enum TransportState {
212
212
private final FlowController flowController ;
213
213
214
214
/** The number of incoming bytes we've received. */
215
- private final AtomicLong numIncomingBytes ;
215
+ // Only read/written on @BinderThread.
216
+ private long numIncomingBytes ;
216
217
217
218
/** The number of incoming bytes we've told our peer we've received. */
219
+ // Only read/written on @BinderThread.
218
220
private long acknowledgedIncomingBytes ;
219
221
220
222
private BinderTransport (
@@ -227,10 +229,9 @@ private BinderTransport(
227
229
this .attributes = attributes ;
228
230
this .logId = logId ;
229
231
scheduledExecutorService = executorServicePool .getObject ();
230
- incomingBinder = new LeakSafeOneWayBinder (this );
232
+ incomingBinder = new LeakSafeOneWayBinder (this :: handleTransaction );
231
233
ongoingCalls = new ConcurrentHashMap <>();
232
234
flowController = new FlowController (TRANSACTION_BYTES_WINDOW );
233
- numIncomingBytes = new AtomicLong ();
234
235
}
235
236
236
237
// Override in child class.
@@ -425,8 +426,9 @@ final void sendOutOfBandClose(int callId, Status status) {
425
426
}
426
427
}
427
428
428
- @ Override
429
- public final boolean handleTransaction (int code , Parcel parcel ) {
429
+ @ BinderThread
430
+ @ VisibleForTesting
431
+ final boolean handleTransaction (int code , Parcel parcel ) {
430
432
try {
431
433
return handleTransactionInternal (code , parcel );
432
434
} catch (RuntimeException e ) {
@@ -442,6 +444,7 @@ public final boolean handleTransaction(int code, Parcel parcel) {
442
444
}
443
445
}
444
446
447
+ @ BinderThread
445
448
private boolean handleTransactionInternal (int code , Parcel parcel ) {
446
449
if (code < FIRST_CALL_ID ) {
447
450
synchronized (this ) {
@@ -485,11 +488,12 @@ private boolean handleTransactionInternal(int code, Parcel parcel) {
485
488
if (inbound != null ) {
486
489
inbound .handleTransaction (parcel );
487
490
}
488
- long nib = numIncomingBytes . addAndGet ( size ) ;
489
- if ((nib - acknowledgedIncomingBytes ) > TRANSACTION_BYTES_WINDOW_FORCE_ACK ) {
491
+ numIncomingBytes += size ;
492
+ if ((numIncomingBytes - acknowledgedIncomingBytes ) > TRANSACTION_BYTES_WINDOW_FORCE_ACK ) {
490
493
synchronized (this ) {
491
- sendAcknowledgeBytes (checkNotNull (outgoingBinder ));
494
+ sendAcknowledgeBytes (checkNotNull (outgoingBinder ), numIncomingBytes );
492
495
}
496
+ acknowledgedIncomingBytes = numIncomingBytes ;
493
497
}
494
498
return true ;
495
499
}
@@ -521,10 +525,8 @@ private final void handlePing(Parcel requestParcel) {
521
525
protected void handlePingResponse (Parcel parcel ) {}
522
526
523
527
@ GuardedBy ("this" )
524
- private void sendAcknowledgeBytes (OneWayBinderProxy iBinder ) {
528
+ private void sendAcknowledgeBytes (OneWayBinderProxy iBinder , long n ) {
525
529
// Send a transaction to acknowledge reception of incoming data.
526
- long n = numIncomingBytes .get ();
527
- acknowledgedIncomingBytes = n ;
528
530
try (ParcelHolder parcel = ParcelHolder .obtain ()) {
529
531
parcel .get ().writeLong (n );
530
532
iBinder .transact (ACKNOWLEDGE_BYTES , parcel );
0 commit comments