Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@

import io.dapr.client.DaprClient;
import io.dapr.client.DaprClientBuilder;
import io.dapr.client.DaprPreviewClient;
import io.dapr.client.domain.BulkPublishResponse;
import io.dapr.client.domain.BulkPublishResponseFailedEntry;
import io.dapr.examples.OpenTelemetryConfig;
Expand Down Expand Up @@ -59,13 +58,11 @@ public static void main(String[] args) throws Exception {
Tracer tracer = openTelemetrySdk.getTracer(BulkPublisher.class.getCanonicalName());
Span span = tracer.spanBuilder("Bulk Publisher's Main").setSpanKind(SpanKind.CLIENT).startSpan();

try (DaprPreviewClient client = (new DaprClientBuilder()).buildPreviewClient()) {
DaprClient c = (DaprClient) client;

c.waitForSidecar(10000);
try (DaprClient client = (new DaprClientBuilder()).build()) {
client.waitForSidecar(10000);

try (Scope scope = span.makeCurrent()) {
System.out.println("Using preview client...");
System.out.println("Using Dapr client...");

List<String> messages = new ArrayList<>();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@

package io.dapr.examples.pubsub;

import io.dapr.client.DaprClient;
import io.dapr.client.DaprClientBuilder;
import io.dapr.client.DaprPreviewClient;
import io.dapr.client.domain.BulkPublishEntry;
import io.dapr.client.domain.BulkPublishRequest;
import io.dapr.client.domain.BulkPublishResponse;
Expand Down Expand Up @@ -53,8 +53,8 @@ public class CloudEventBulkPublisher {
* @throws Exception any exception
*/
public static void main(String[] args) throws Exception {
try (DaprPreviewClient client = (new DaprClientBuilder()).buildPreviewClient()) {
System.out.println("Using preview client...");
try (DaprClient client = (new DaprClientBuilder()).build()) {
System.out.println("Using Dapr client...");
List<BulkPublishEntry<CloudEvent<Map<String, String>>>> entries = new ArrayList<>();
for (int i = 0; i < NUM_MESSAGES; i++) {
CloudEvent<Map<String, String>> cloudEvent = new CloudEvent<>();
Expand Down
33 changes: 17 additions & 16 deletions examples/src/main/java/io/dapr/examples/pubsub/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -275,13 +275,12 @@ Once running, the Subscriber should print the output as follows:

Messages have been retrieved from the topic.

### Bulk Publish Messages
> Note : This API is currently in Alpha stage in Dapr runtime, hence the API methods in SDK are part of the DaprPreviewClient class.
### Bulk Publish Messages

Another feature provided by the SDK is to allow users to publish multiple messages in a single call to the Dapr sidecar.
For this example, we have a simple Java application with a main method that uses the Dapr gPRC Preview Client to publish 10 messages to a specific topic in a single call.
For this example, we have a simple Java application with a main method that uses the Dapr Client to publish 10 messages to a specific topic in a single call.

In the `BulkPublisher.java` file, you will find the `BulkPublisher` class, containing the main method. The main method declares a Dapr Preview Client using the `DaprClientBuilder` class. Notice that this builder gets two serializer implementations in the constructor: one is for Dapr's sent and recieved objects, and the second is for objects to be persisted.
In the `BulkPublisher.java` file, you will find the `BulkPublisher` class, containing the main method. The main method declares a Dapr Client using the `DaprClientBuilder` class. Notice that this builder gets two serializer implementations in the constructor: one is for Dapr's sent and recieved objects, and the second is for objects to be persisted.
The client publishes messages using `publishEvents` method. The Dapr client is also within a try-with-resource block to properly close the client at the end. See the code snippet below:
Dapr's sidecar will automatically wrap the payload received into a CloudEvent object, which will later on be parsed by the subscriber.

Expand All @@ -296,11 +295,10 @@ public class BulkPublisher {
OpenTelemetry openTelemetry = OpenTelemetryConfig.createOpenTelemetry();
Tracer tracer = openTelemetry.getTracer(BulkPublisher.class.getCanonicalName());
Span span = tracer.spanBuilder("Bulk Publisher's Main").setSpanKind(Span.Kind.CLIENT).startSpan();
try (DaprPreviewClient client = (new DaprClientBuilder()).buildPreviewClient()) {
DaprClient c = (DaprClient)client;
c.waitForSidecar(10000);
try (DaprClient client = (new DaprClientBuilder()).build()) {
client.waitForSidecar(10000);
try (Scope scope = span.makeCurrent()) {
System.out.println("Using preview client...");
System.out.println("Using Dapr client...");
List<String> messages = new ArrayList<>();
System.out.println("Constructing the list of messages to publish");
for (int i = 0; i < NUM_MESSAGES; i++) {
Expand Down Expand Up @@ -333,7 +331,7 @@ public class BulkPublisher {
}
}
```
The code uses the `DaprPreviewClient` created by the `DaprClientBuilder` is used for the `publishEvents` (BulkPublish) preview API.
The code uses the `DaprClient` created by the `DaprClientBuilder` for the `publishEvents` (BulkPublish) API.

In this case, when the `publishEvents` call is made, one of the arguments to the method is the content type of data, this being `text/plain` in the example.
In this case, when parsing and printing the response, there is a concept of EntryID, which is automatically generated or can be set manually when using the `BulkPublishRequest` object.
Expand All @@ -350,7 +348,7 @@ In this case, the application **MUST** override the content-type parameter via `
public class CloudEventBulkPublisher {
///...
public static void main(String[] args) throws Exception {
try (DaprPreviewClient client = (new DaprClientBuilder()).buildPreviewClient()) {
try (DaprClient client = (new DaprClientBuilder()).build()) {
// Construct request
BulkPublishRequest<CloudEvent<Map<String, String>>> request = new BulkPublishRequest<>(PUBSUB_NAME, TOPIC_NAME);
List<BulkPublishRequestEntry<CloudEvent<Map<String, String>>>> entries = new ArrayList<>();
Expand Down Expand Up @@ -414,7 +412,7 @@ Once running, the BulkPublisher should print the output as follows:
```txt
✅ You're up and running! Both Dapr and your app logs will appear here.

== APP == Using preview client...
== APP == Using Dapr client...
== APP == Constructing the list of messages to publish
== APP == Going to publish message : This is message #0
== APP == Going to publish message : This is message #1
Expand Down Expand Up @@ -655,20 +653,23 @@ public class SubscriberGrpcService extends AppCallbackGrpc.AppCallbackImplBase {
///...
}
```
The `BulkSubscriberGrpcService.java` file is responsible for implementing the processing of bulk message subscriptions. When Dapr's sidecar successfully subscribes to bulk messages, it will call `onBulkTopicEventAlpha1` and pass them as a request parameter. You can refer to the example on how to handle bulk messages and respond correctly over gPRC.

The `SubscriberGrpcService.java` now handles both regular and bulk message subscriptions since bulk pubsub is now stable. When Dapr's sidecar successfully subscribes to messages, it will call either `onTopicEvent` for regular messages or `onBulkTopicEvent` for bulk messages. Below is an example of the bulk handler:

```java
public class BulkSubscriberGrpcService extends AppCallbackAlphaGrpc.AppCallbackAlphaImplBase {
public class SubscriberGrpcService extends AppCallbackGrpc.AppCallbackImplBase {
// ... onTopicEvent method shown above ...

@Override
public void onBulkTopicEventAlpha1(io.dapr.v1.DaprAppCallbackProtos.TopicEventBulkRequest request,
public void onBulkTopicEvent(io.dapr.v1.DaprAppCallbackProtos.TopicEventBulkRequest request,
io.grpc.stub.StreamObserver<io.dapr.v1.DaprAppCallbackProtos.TopicEventBulkResponse> responseObserver) {
try {
TopicEventBulkResponse.Builder responseBuilder = TopicEventBulkResponse.newBuilder();

if (request.getEntriesCount() == 0) {
responseObserver.onNext(responseBuilder.build());
responseObserver.onCompleted();
return;
}

System.out.println("Bulk Subscriber received " + request.getEntriesCount() + " messages.");
Expand Down Expand Up @@ -715,11 +716,11 @@ background: true
sleep: 15
-->
```bash
// stop http subscriber if you have started one.
# stop http subscriber if you have started one.
dapr stop --app-id subscriber


// start a grpc subscriber
# start a grpc subscriber
dapr run --resources-path ./components/pubsub --app-id subscriber --app-port 3000 --app-protocol grpc -- java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.pubsub.grpc.Subscriber -p 3000
```
<!-- END_STEP -->
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@ public static void main(String[] args) throws Exception {
Server server = ServerBuilder.forPort(port)
.intercept(new SubscriberGrpcService.MetadataInterceptor())
.addService(new SubscriberGrpcService())
.addService(new BulkSubscriberGrpcService())
.build();
server.start();
server.awaitTermination();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@
import com.google.protobuf.Empty;
import io.dapr.v1.AppCallbackGrpc;
import io.dapr.v1.DaprAppCallbackProtos;
import io.dapr.v1.DaprAppCallbackProtos.TopicEventBulkRequestEntry;
import io.dapr.v1.DaprAppCallbackProtos.TopicEventBulkResponse;
import io.dapr.v1.DaprAppCallbackProtos.TopicEventBulkResponseEntry;
import io.dapr.v1.DaprAppCallbackProtos.TopicEventResponse.TopicEventResponseStatus;
import io.grpc.Context;
import io.grpc.Contexts;
import io.grpc.Metadata;
Expand Down Expand Up @@ -103,6 +107,45 @@ public void onTopicEvent(DaprAppCallbackProtos.TopicEventRequest request,
}
}

@Override
public void onBulkTopicEvent(DaprAppCallbackProtos.TopicEventBulkRequest request,
StreamObserver<TopicEventBulkResponse> responseObserver) {
try {
TopicEventBulkResponse.Builder responseBuilder = TopicEventBulkResponse.newBuilder();

if (request.getEntriesCount() == 0) {
responseObserver.onNext(responseBuilder.build());
responseObserver.onCompleted();
return;
}

System.out.println("Bulk Subscriber received " + request.getEntriesCount() + " messages.");

for (TopicEventBulkRequestEntry entry : request.getEntriesList()) {
try {
System.out.printf("Bulk Subscriber message has entry ID: %s\n", entry.getEntryId());
System.out.printf("Bulk Subscriber got: %s\n", entry.getCloudEvent().getData().toStringUtf8());
TopicEventBulkResponseEntry.Builder responseEntryBuilder = TopicEventBulkResponseEntry
.newBuilder()
.setEntryId(entry.getEntryId())
.setStatusValue(TopicEventResponseStatus.SUCCESS_VALUE);
responseBuilder.addStatuses(responseEntryBuilder);
} catch (Throwable e) {
TopicEventBulkResponseEntry.Builder responseEntryBuilder = TopicEventBulkResponseEntry
.newBuilder()
.setEntryId(entry.getEntryId())
.setStatusValue(TopicEventResponseStatus.RETRY_VALUE);
responseBuilder.addStatuses(responseEntryBuilder);
}
}
TopicEventBulkResponse response = responseBuilder.build();
responseObserver.onNext(response);
responseObserver.onCompleted();
} catch (Throwable e) {
responseObserver.onError(e);
}
}

/**
* Add pubsub name and topic to topicSubscriptionList.
*
Expand All @@ -119,4 +162,3 @@ public void registerConsumer(String pubsubName, String topic, boolean isBulkMess
.build());
}
}

14 changes: 6 additions & 8 deletions sdk-tests/src/test/java/io/dapr/it/pubsub/http/PubSubIT.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import io.dapr.client.DaprClient;
import io.dapr.client.DaprClientBuilder;
import io.dapr.client.DaprPreviewClient;
import io.dapr.client.domain.BulkPublishEntry;
import io.dapr.client.domain.BulkPublishRequest;
import io.dapr.client.domain.BulkPublishResponse;
Expand Down Expand Up @@ -120,7 +119,7 @@ public void testBulkPublishPubSubNotFound() throws Exception {
this.getClass().getSimpleName(),
60000));

try (DaprPreviewClient client = daprRun.newDaprClientBuilder().buildPreviewClient()) {
try (DaprClient client = daprRun.newDaprClientBuilder().build()) {
assertThrowsDaprException(
"INVALID_ARGUMENT",
"INVALID_ARGUMENT: pubsub unknown pubsub is not found",
Expand Down Expand Up @@ -152,16 +151,15 @@ public String getContentType() {
return "application/json";
}
};
try (DaprClient client = daprRun.newDaprClientBuilder().withObjectSerializer(serializer).build();
DaprPreviewClient previewClient = daprRun.newDaprClientBuilder().withObjectSerializer(serializer).buildPreviewClient()) {
try (DaprClient client = daprRun.newDaprClientBuilder().withObjectSerializer(serializer).build()) {
// Only for the gRPC test
// Send a multiple messages on one topic in messagebus pubsub via publishEvents API.
List<String> messages = new ArrayList<>();
for (int i = 0; i < NUM_MESSAGES; i++) {
messages.add(String.format("This is message #%d on topic %s", i, TOPIC_BULK));
}
//Publishing 10 messages
BulkPublishResponse response = previewClient.publishEvents(PUBSUB_NAME, TOPIC_BULK, "", messages).block();
BulkPublishResponse response = client.publishEvents(PUBSUB_NAME, TOPIC_BULK, "", messages).block();
System.out.println(String.format("Published %d messages to topic '%s' pubsub_name '%s'",
NUM_MESSAGES, TOPIC_BULK, PUBSUB_NAME));
assertNotNull(response, "expected not null bulk publish response");
Expand All @@ -170,14 +168,14 @@ public String getContentType() {
//Publishing an object.
MyObject object = new MyObject();
object.setId("123");
response = previewClient.publishEvents(PUBSUB_NAME, TOPIC_BULK,
response = client.publishEvents(PUBSUB_NAME, TOPIC_BULK,
"application/json", Collections.singletonList(object)).block();
System.out.println("Published one object.");
assertNotNull(response, "expected not null bulk publish response");
assertEquals(0, response.getFailedEntries().size(), "expected no failures in the response");

//Publishing a single byte: Example of non-string based content published
previewClient.publishEvents(PUBSUB_NAME, TOPIC_BULK, "",
client.publishEvents(PUBSUB_NAME, TOPIC_BULK, "",
Collections.singletonList(new byte[]{1})).block();
System.out.println("Published one byte.");

Expand All @@ -197,7 +195,7 @@ public String getContentType() {
));

//Publishing a cloud event.
previewClient.publishEvents(req).block();
client.publishEvents(req).block();
assertNotNull(response, "expected not null bulk publish response");
assertEquals(0, response.getFailedEntries().size(), "expected no failures in the response");

Expand Down
Loading