5050import io .dapr .serializer .DaprObjectSerializer ;
5151import io .dapr .serializer .DefaultObjectSerializer ;
5252import io .dapr .utils .DefaultContentTypeConverter ;
53- import io .dapr .utils .NetworkUtils ;
5453import io .dapr .utils .TypeRef ;
5554import io .dapr .v1 .CommonProtos ;
5655import io .dapr .v1 .DaprGrpc ;
6968import reactor .core .publisher .MonoSink ;
7069import reactor .util .context .ContextView ;
7170
72- import java .io .Closeable ;
7371import java .io .IOException ;
7472import java .util .ArrayList ;
7573import java .util .Collections ;
@@ -92,7 +90,7 @@ public class DaprClientGrpc extends AbstractDaprClient {
9290 /**
9391 * The GRPC managed channel to be used.
9492 */
95- private Closeable channel ;
93+ private final GrpcChannelFacade channel ;
9694
9795 /**
9896 * The async gRPC stub.
@@ -102,19 +100,19 @@ public class DaprClientGrpc extends AbstractDaprClient {
102100 /**
103101 * Default access level constructor, in order to create an instance of this class use io.dapr.client.DaprClientBuilder
104102 *
105- * @param closeableChannel A closeable for a Managed GRPC channel
103+ * @param channel Facade for the managed GRPC channel
106104 * @param asyncStub async gRPC stub
107105 * @param objectSerializer Serializer for transient request/response objects.
108106 * @param stateSerializer Serializer for state objects.
109107 * @see DaprClientBuilder
110108 */
111109 DaprClientGrpc (
112- Closeable closeableChannel ,
110+ GrpcChannelFacade channel ,
113111 DaprGrpc .DaprStub asyncStub ,
114112 DaprObjectSerializer objectSerializer ,
115113 DaprObjectSerializer stateSerializer ) {
116114 super (objectSerializer , stateSerializer );
117- this .channel = closeableChannel ;
115+ this .channel = channel ;
118116 this .asyncStub = intercept (asyncStub );
119117 }
120118
@@ -145,13 +143,7 @@ private CommonProtos.StateOptions.StateConcurrency getGrpcStateConcurrency(State
145143 */
146144 @ Override
147145 public Mono <Void > waitForSidecar (int timeoutInMilliseconds ) {
148- return Mono .fromRunnable (() -> {
149- try {
150- NetworkUtils .waitForSocket (Properties .SIDECAR_IP .get (), Properties .GRPC_PORT .get (), timeoutInMilliseconds );
151- } catch (InterruptedException e ) {
152- throw new RuntimeException (e );
153- }
154- });
146+ return this .channel .waitForChannelReady (timeoutInMilliseconds );
155147 }
156148
157149 /**
@@ -193,7 +185,6 @@ public Mono<Void> publishEvent(PublishEventRequest request) {
193185 }
194186
195187 /**
196- *
197188 * {@inheritDoc}
198189 */
199190 @ Override
@@ -209,7 +200,7 @@ public <T> Mono<BulkPublishResponse<T>> publishEvents(BulkPublishRequest<T> requ
209200 throw new IllegalArgumentException ("pubsubName and topic name cannot be null or empty" );
210201 }
211202
212- for (BulkPublishEntry <?> entry : request .getEntries ()) {
203+ for (BulkPublishEntry <?> entry : request .getEntries ()) {
213204 Object event = entry .getEvent ();
214205 byte [] data ;
215206 String contentType = entry .getContentType ();
@@ -251,7 +242,7 @@ public <T> Mono<BulkPublishResponse<T>> publishEvents(BulkPublishRequest<T> requ
251242 }
252243
253244 Map <String , BulkPublishEntry <T >> entryMap = new HashMap <>();
254- for (BulkPublishEntry <T > entry : request .getEntries ()) {
245+ for (BulkPublishEntry <T > entry : request .getEntries ()) {
255246 entryMap .put (entry .getEntryId (), entry );
256247 }
257248 return Mono .deferContextual (
@@ -299,17 +290,17 @@ public <T> Mono<T> invokeMethod(InvokeMethodRequest invokeMethodRequest, TypeRef
299290 // gRPC to HTTP does not map correctly in Dapr runtime as per https://github.com/dapr/dapr/issues/2342
300291
301292 return Mono .deferContextual (
302- context -> this .<CommonProtos .InvokeResponse >createMono (
303- it -> intercept (context , asyncStub ).invokeService (envelope , it )
304- )
305- ).flatMap (
306- it -> {
307- try {
308- return Mono .justOrEmpty (objectSerializer .deserialize (it .getData ().getValue ().toByteArray (), type ));
309- } catch (IOException e ) {
310- throw DaprException .propagate (e );
311- }
312- }
293+ context -> this .<CommonProtos .InvokeResponse >createMono (
294+ it -> intercept (context , asyncStub ).invokeService (envelope , it )
295+ )
296+ ).flatMap (
297+ it -> {
298+ try {
299+ return Mono .justOrEmpty (objectSerializer .deserialize (it .getData ().getValue ().toByteArray (), type ));
300+ } catch (IOException e ) {
301+ throw DaprException .propagate (e );
302+ }
303+ }
313304 );
314305 } catch (Exception ex ) {
315306 return DaprException .wrapMono (ex );
@@ -346,17 +337,17 @@ public <T> Mono<T> invokeBinding(InvokeBindingRequest request, TypeRef<T> type)
346337 DaprProtos .InvokeBindingRequest envelope = builder .build ();
347338
348339 return Mono .deferContextual (
349- context -> this .<DaprProtos .InvokeBindingResponse >createMono (
350- it -> intercept (context , asyncStub ).invokeBinding (envelope , it )
351- )
352- ).flatMap (
353- it -> {
354- try {
355- return Mono .justOrEmpty (objectSerializer .deserialize (it .getData ().toByteArray (), type ));
356- } catch (IOException e ) {
357- throw DaprException .propagate (e );
358- }
359- }
340+ context -> this .<DaprProtos .InvokeBindingResponse >createMono (
341+ it -> intercept (context , asyncStub ).invokeBinding (envelope , it )
342+ )
343+ ).flatMap (
344+ it -> {
345+ try {
346+ return Mono .justOrEmpty (objectSerializer .deserialize (it .getData ().toByteArray (), type ));
347+ } catch (IOException e ) {
348+ throw DaprException .propagate (e );
349+ }
350+ }
360351 );
361352 } catch (Exception ex ) {
362353 return DaprException .wrapMono (ex );
@@ -442,12 +433,12 @@ public <T> Mono<List<State<T>>> getBulkState(GetBulkStateRequest request, TypeRe
442433 DaprProtos .GetBulkStateRequest envelope = builder .build ();
443434
444435 return Mono .deferContextual (
445- context -> this .<DaprProtos .GetBulkStateResponse >createMono (it -> intercept (context , asyncStub )
446- .getBulkState (envelope , it )
447- )
448- ).map (
449- it ->
450- it
436+ context -> this .<DaprProtos .GetBulkStateResponse >createMono (it -> intercept (context , asyncStub )
437+ .getBulkState (envelope , it )
438+ )
439+ ).map (
440+ it ->
441+ it
451442 .getItemsList ()
452443 .stream ()
453444 .map (b -> {
@@ -705,8 +696,8 @@ public Mono<Map<String, String>> getSecret(GetSecretRequest request) {
705696 }
706697
707698 DaprProtos .GetSecretRequest .Builder requestBuilder = DaprProtos .GetSecretRequest .newBuilder ()
708- .setStoreName (secretStoreName )
709- .setKey (key );
699+ .setStoreName (secretStoreName )
700+ .setKey (key );
710701
711702 if (metadata != null ) {
712703 requestBuilder .putAllMetadata (metadata );
@@ -740,18 +731,18 @@ public Mono<Map<String, Map<String, String>>> getBulkSecret(GetBulkSecretRequest
740731
741732 return Mono .deferContextual (
742733 context ->
743- this .<DaprProtos .GetBulkSecretResponse >createMono (
744- it -> intercept (context , asyncStub ).getBulkSecret (envelope , it )
745- )
734+ this .<DaprProtos .GetBulkSecretResponse >createMono (
735+ it -> intercept (context , asyncStub ).getBulkSecret (envelope , it )
736+ )
746737 ).map (it -> {
747738 Map <String , DaprProtos .SecretResponse > secretsMap = it .getDataMap ();
748739 if (secretsMap == null ) {
749740 return Collections .emptyMap ();
750741 }
751742 return secretsMap
752- .entrySet ()
753- .stream ()
754- .collect (Collectors .toMap (Map .Entry ::getKey , s -> s .getValue ().getSecretsMap ()));
743+ .entrySet ()
744+ .stream ()
745+ .collect (Collectors .toMap (Map .Entry ::getKey , s -> s .getValue ().getSecretsMap ()));
755746 });
756747 } catch (Exception ex ) {
757748 return DaprException .wrapMono (ex );
@@ -805,7 +796,7 @@ public <T> Mono<QueryStateResponse<T>> queryState(QueryStateRequest request, Typ
805796 try {
806797 return buildQueryStateKeyValue (v , type );
807798 } catch (Exception e ) {
808- throw DaprException .propagate (e );
799+ throw DaprException .propagate (e );
809800 }
810801 })
811802 .collect (Collectors .toList ());
@@ -900,7 +891,7 @@ private Mono<Map<String, ConfigurationItem>> getConfiguration(DaprProtos.GetConf
900891 Iterator <Map .Entry <String , CommonProtos .ConfigurationItem >> itr = it .getItems ().entrySet ().iterator ();
901892 while (itr .hasNext ()) {
902893 Map .Entry <String , CommonProtos .ConfigurationItem > entry = itr .next ();
903- configMap .put (entry .getKey (), buildConfigurationItem (entry .getValue (), entry .getKey ()));
894+ configMap .put (entry .getKey (), buildConfigurationItem (entry .getValue (), entry .getKey ()));
904895 }
905896 return Collections .unmodifiableMap (configMap );
906897 }
@@ -934,15 +925,15 @@ public Flux<SubscribeConfigurationResponse> subscribeConfiguration(SubscribeConf
934925 return this .<DaprProtos .SubscribeConfigurationResponse >createFlux (
935926 it -> intercept (asyncStub ).subscribeConfiguration (envelope , it )
936927 ).map (
937- it -> {
938- Map <String , ConfigurationItem > configMap = new HashMap <>();
939- Iterator <Map .Entry <String , CommonProtos .ConfigurationItem >> itr = it .getItemsMap ().entrySet ().iterator ();
940- while (itr .hasNext ()) {
941- Map .Entry <String , CommonProtos .ConfigurationItem > entry = itr .next ();
942- configMap .put (entry .getKey (), buildConfigurationItem (entry .getValue (), entry .getKey ()));
928+ it -> {
929+ Map <String , ConfigurationItem > configMap = new HashMap <>();
930+ Iterator <Map .Entry <String , CommonProtos .ConfigurationItem >> itr = it .getItemsMap ().entrySet ().iterator ();
931+ while (itr .hasNext ()) {
932+ Map .Entry <String , CommonProtos .ConfigurationItem > entry = itr .next ();
933+ configMap .put (entry .getKey (), buildConfigurationItem (entry .getValue (), entry .getKey ()));
934+ }
935+ return new SubscribeConfigurationResponse (it .getId (), Collections .unmodifiableMap (configMap ));
943936 }
944- return new SubscribeConfigurationResponse (it .getId (), Collections .unmodifiableMap (configMap ));
945- }
946937 );
947938 } catch (Exception ex ) {
948939 return DaprException .wrapFlux (ex );
@@ -990,8 +981,8 @@ public Mono<UnsubscribeConfigurationResponse> unsubscribeConfiguration(Unsubscri
990981 private ConfigurationItem buildConfigurationItem (
991982 CommonProtos .ConfigurationItem configurationItem , String key ) {
992983 return new ConfigurationItem (
993- key ,
994- configurationItem .getValue (),
984+ key ,
985+ configurationItem .getValue (),
995986 configurationItem .getVersion (),
996987 configurationItem .getMetadataMap ()
997988 );
0 commit comments