How to use the Pulsar message queue in the Quarkus framework

2023-10-21 05:33

Reference: https://cn.quarkus.io/guides/pulsar

1. Add the dependency

<dependency>
    <groupId>io.quarkus</groupId>
    <artifactId>quarkus-smallrye-reactive-messaging-pulsar</artifactId>
</dependency>

 

2. Configure the parameters

%prod.pulsar.client.serviceUrl=pulsar://pulsar:6650
mp.messaging.incoming.prices.connector=smallrye-pulsar

The %prod prefix means the property applies only when the application runs in production mode (not in dev or test mode). See the Profile documentation for more details.
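As a small illustration of how profile prefixes combine, the fragment below sets a default value and overrides it per profile. The host names are hypothetical placeholders; %dev and %prod are the standard Quarkus profile prefixes.

```properties
# Applies in every profile unless a profile-specific value overrides it
pulsar.client.serviceUrl=pulsar://localhost:6650
# Applies only in production mode (hypothetical host name)
%prod.pulsar.client.serviceUrl=pulsar://pulsar:6650
# Applies only in dev mode (hypothetical host name)
%dev.pulsar.client.serviceUrl=pulsar://dev-broker:6650
```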

3. Write the message consumer

import io.smallrye.mutiny.Multi;
import org.eclipse.microprofile.reactive.messaging.Channel;

import jakarta.inject.Inject;
import jakarta.ws.rs.GET;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.core.MediaType;
import org.jboss.resteasy.reactive.RestStreamElementType;

@Path("/prices")
public class PriceResource {

    @Inject
    @Channel("prices")
    Multi<Double> prices;

    @GET
    @Path("/prices")
    @RestStreamElementType(MediaType.TEXT_PLAIN)
    public Multi<Double> stream() {
        return prices;
    }
}

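When you consume messages with @Channel, the application code is responsible for the subscription. In the example above, the RESTEasy Reactive endpoint handles that for you.

The following types can be injected as channels: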

@Inject @Channel("prices") Multi<Double> streamOfPayloads;

@Inject @Channel("prices") Multi<Message<Double>> streamOfMessages;

@Inject @Channel("prices") Publisher<Double> publisherOfPayloads;

@Inject @Channel("prices") Publisher<Message<Double>> publisherOfMessages;

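Reactive Messaging invokes your method on an I/O thread. See the Quarkus Reactive Architecture documentation for more details on this topic. However, you often need to combine Reactive Messaging with blocking processing such as database interactions. For this, use the @Blocking annotation, which indicates that the processing is blocking and should not run on the caller thread.

There are two @Blocking annotations:

  1. io.smallrye.reactive.messaging.annotations.Blocking

  2. io.smallrye.common.annotation.Blocking

They have the same effect, so you can use either. The first one provides more fine-grained tuning, such as the worker pool to use and whether it preserves the order. The second one, which is also used with other reactive features of Quarkus, uses the default worker pool and preserves the order.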

@RunOnVirtualThread

For running the blocking processing on Java virtual threads, see the Quarkus Virtual Thread support with Reactive Messaging documentation.

 

@Transactional

If your method is annotated with @Transactional, it is automatically considered blocking, even if the method is not annotated with @Blocking.

4. Receive messages in batches

@Incoming("prices")
public CompletionStage<Void> consumeMessage(PulsarIncomingBatchMessage<Double> messages) {
    for (PulsarMessage<Double> msg : messages) {
        msg.getMetadata(PulsarIncomingMessageMetadata.class).ifPresent(metadata -> {
            String key = metadata.getKey();
            String topic = metadata.getTopicName();
            long timestamp = metadata.getEventTime();
            //... process messages
        });
    }
    // ack will commit the latest offsets (per partition) of the batch.
    return messages.ack();
}

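@Incoming("prices")
public void consumeRecords(org.apache.pulsar.client.api.Messages<Double> messages) {
    // Fully qualified names avoid a clash with the MicroProfile Message type
    for (org.apache.pulsar.client.api.Message<Double> msg : messages) {
        //... process messages
    }
}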

Or:

@Incoming("prices")
public void consume(List<Double> prices) {
    for (double price : prices) {
        // process price
    }
}

Batch receiving requires the following configuration: mp.messaging.incoming.$channel.batchReceive=true
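For the prices channel used in the snippets above, replacing $channel with the channel name would give a fragment like the following. This is a minimal sketch that only uses the attributes already shown in this post.

```properties
# Incoming channel "prices" backed by the Pulsar connector
mp.messaging.incoming.prices.connector=smallrye-pulsar
# Deliver messages to the @Incoming method in batches
mp.messaging.incoming.prices.batchReceive=true
```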

Sending messages:

Configure the Pulsar broker service URL for the outgoing channel (the channel name in the property must match the one used in @Outgoing):

mp.messaging.outgoing.prices-out.serviceUrl=pulsar://pulsar:6650

import io.smallrye.mutiny.Multi;
import org.eclipse.microprofile.reactive.messaging.Outgoing;

import jakarta.enterprise.context.ApplicationScoped;
import java.time.Duration;
import java.util.Random;

@ApplicationScoped
public class PulsarPriceProducer {

    private final Random random = new Random();

    @Outgoing("prices-out")
    public Multi<Double> generate() {
        // Build an infinite stream of random prices
        // It emits a price every second
        return Multi.createFrom().ticks().every(Duration.ofSeconds(1))
            .map(x -> random.nextDouble());
    }

}

To attach a key to each message, wrap the payload in an OutgoingMessage:

@Outgoing("out")
public Multi<OutgoingMessage<Double>> generate() {
    return Multi.createFrom().ticks().every(Duration.ofSeconds(1))
        .map(x -> OutgoingMessage.of("my-key", random.nextDouble()));
}

 

The payload can be wrapped in an org.eclipse.microprofile.reactive.messaging.Message to get more control over the written records:

@Outgoing("generated-price")
public Multi<Message<Double>> generate() {
    return Multi.createFrom().ticks().every(Duration.ofSeconds(1))
            .map(x -> Message.of(random.nextDouble())
                    .addMetadata(PulsarOutgoingMessageMetadata.builder()
                            .withKey("my-key")
                            .withProperties(Map.of("property-key", "value"))
                            .build()));
}

Examples of supported method signatures:

@Outgoing("prices-out") T generate(); // T excluding void

@Outgoing("prices-out") Message<T> generate();

@Outgoing("prices-out") Uni<T> generate();

@Outgoing("prices-out") Uni<Message<T>> generate();

@Outgoing("prices-out") CompletionStage<T> generate();

@Outgoing("prices-out") CompletionStage<Message<T>> generate();

 

Serialization and Pulsar Schema

To send Key/Value pairs to Pulsar, you can configure the Pulsar producer Schema with a KeyValue schema.

package pulsar.outbound;

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.inject.Produces;

import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.common.schema.KeyValue;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Outgoing;

import io.smallrye.common.annotation.Identifier;

@ApplicationScoped
public class PulsarKeyValueExample {

    @Identifier("out")
    @Produces
    Schema<KeyValue<String, Long>> schema = Schema.KeyValue(Schema.STRING, Schema.INT64);

    @Incoming("in")
    @Outgoing("out")
    public KeyValue<String, Long> process(long in) {
        return new KeyValue<>("my-key", in);
    }

}

If you need more control over the written records, use PulsarOutgoingMessageMetadata.

 

 

mp.messaging.incoming.prices.connector=smallrye-pulsar
mp.messaging.incoming.prices.schema=INT32

mp.messaging.outgoing.prices-out.connector=smallrye-pulsar
mp.messaging.outgoing.prices-out.schema=DOUBLE

 

For example, the following bean provides a JSON schema and a Key/Value schema:

package pulsar.configuration;

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.inject.Produces;

import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.common.schema.KeyValue;
import org.apache.pulsar.common.schema.KeyValueEncodingType;

import io.smallrye.common.annotation.Identifier;

@ApplicationScoped
public class PulsarSchemaProvider {

    @Produces
    @Identifier("user-schema")
    Schema<User> userSchema = Schema.JSON(User.class);

    @Produces
    @Identifier("a-channel")
    Schema<KeyValue<Integer, User>> keyValueSchema() {
        return Schema.KeyValue(Schema.INT32, Schema.JSON(User.class), KeyValueEncodingType.SEPARATED);
    }

    public static class User {
        String name;
        int age;

    }
}

 

mp.messaging.incoming.users.connector=smallrye-pulsar
mp.messaging.incoming.users.schema=user-schema

 

mp.messaging.incoming.data.connector=smallrye-pulsar
mp.messaging.incoming.data.serviceUrl=pulsar://localhost:6650
mp.messaging.incoming.data.topic=topic
mp.messaging.incoming.data.subscriptionInitialPosition=Earliest
mp.messaging.incoming.data.schema=INT32
mp.messaging.incoming.data.authPluginClassName=org.apache.pulsar.client.impl.auth.AuthenticationBasic
mp.messaging.incoming.data.authParams={"userId":"superuser","password":"admin"}

 

import jakarta.enterprise.inject.Produces;
import io.smallrye.common.annotation.Identifier;
import org.apache.pulsar.client.impl.DefaultCryptoKeyReader;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;

class PulsarConfig {

    @Produces
    @Identifier("my-consumer-options")
    public ConsumerConfigurationData<String> getConsumerConfig() {
        ConsumerConfigurationData<String> data = new ConsumerConfigurationData<>();
        data.setAckReceiptEnabled(true);
        data.setCryptoKeyReader(DefaultCryptoKeyReader.builder()
                //...
                .build());
        return data;
    }
}

 

mp.messaging.incoming.prices.consumer-configuration=my-consumer-options

 

If no [client|consumer|producer]-configuration is configured, the connector will look for instances identified with the channel name:

 

import jakarta.enterprise.inject.Produces;
import io.smallrye.common.annotation.Identifier;
import org.apache.pulsar.client.impl.AutoClusterFailover;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;

class PulsarConfig {

    @Produces
    @Identifier("prices")
    public ClientConfigurationData getClientConfig() {
        ClientConfigurationData data = new ClientConfigurationData();
        data.setEnableTransaction(true);
        data.setServiceUrlProvider(AutoClusterFailover.builder()
                // ...
                .build());
        return data;
    }
}
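
The attribute tables in this post note that the channel configuration can still override any attribute provided by such a configuration bean. The following plain-Java sketch illustrates that precedence with ordinary maps. It is only an illustration of the idea, not the connector's code: the class and method names are hypothetical, and it assumes the defaults and the channel settings can both be viewed as key/value maps.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical illustration of "bean defaults first, channel configuration wins". */
final class ChannelConfigMergeSketch {

    /** Returns the bean-provided defaults overlaid with the channel-level properties. */
    static Map<String, Object> effectiveConfig(Map<String, Object> beanDefaults,
                                               Map<String, Object> channelConfig) {
        Map<String, Object> merged = new HashMap<>(beanDefaults);
        // Channel-level values replace defaults that share the same key
        merged.putAll(channelConfig);
        return merged;
    }

    public static void main(String[] args) {
        Map<String, Object> defaults = new HashMap<>();
        defaults.put("ackReceiptEnabled", true);
        defaults.put("subscriptionInitialPosition", "Latest");

        Map<String, Object> channel = new HashMap<>();
        channel.put("subscriptionInitialPosition", "Earliest");

        System.out.println(effectiveConfig(defaults, channel));
    }
}
```

In this sketch the merged map keeps ackReceiptEnabled from the defaults and takes subscriptionInitialPosition from the channel configuration.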

 

Table 1. Incoming Attributes of the 'smallrye-pulsar' connector

| Attribute (alias) | Description | Type | Mandatory | Default |
| --- | --- | --- | --- | --- |
| ack-strategy | Specify the commit strategy to apply when a message produced from a record is acknowledged. Values can be ack, cumulative. | string | false | ack |
| ackTimeout.redeliveryBackoff | Comma separated values for configuring ack timeout MultiplierRedeliveryBackoff: min delay, max delay, multiplier. | string | false | |
| batchReceive | Whether batch receive is used to consume messages | boolean | false | false |
| client-configuration | Identifier of a CDI bean that provides the default Pulsar client configuration for this channel. The channel configuration can still override any attribute. The bean must have a type of Map<String, Object> and must use the @io.smallrye.common.annotation.Identifier qualifier to set the identifier. | string | false | |
| consumer-configuration | Identifier of a CDI bean that provides the default Pulsar consumer configuration for this channel. The channel configuration can still override any attribute. The bean must have a type of Map<String, Object> and must use the @io.smallrye.common.annotation.Identifier qualifier to set the identifier. | string | false | |
| deadLetterPolicy.deadLetterTopic | Name of the dead letter topic where the failing messages will be sent | string | false | |
| deadLetterPolicy.initialSubscriptionName | Name of the initial subscription of the dead letter topic | string | false | |
| deadLetterPolicy.maxRedeliverCount | Maximum number of times that a message will be redelivered before being sent to the dead letter topic | int | false | |
| deadLetterPolicy.retryLetterTopic | Name of the retry topic where the failing messages will be sent | string | false | |
| failure-strategy | Specify the failure strategy to apply when a message produced from a record is acknowledged negatively (nack). Values can be nack (default), fail, ignore or reconsume-later | string | false | nack |
| health-enabled | Whether health reporting is enabled (default) or disabled | boolean | false | true |
| negativeAck.redeliveryBackoff | Comma separated values for configuring negative ack MultiplierRedeliveryBackoff: min delay, max delay, multiplier. | string | false | |
| reconsumeLater.delay | Default delay for reconsume failure-strategy, in seconds | long | false | 3 |
| schema | The Pulsar schema type of this channel. When configured, a schema is built with the given SchemaType and used for the channel. When absent, the schema is resolved by searching for a CDI bean typed Schema qualified with @Identifier and the channel name. As a fallback, AUTO_CONSUME or AUTO_PRODUCE are used. | string | false | |
| serviceUrl | The service URL for the Pulsar service | string | false | pulsar://localhost:6650 |
| topic | The consumed / populated Pulsar topic. If not set, the channel name is used | string | false | |
| tracing-enabled | Whether tracing is enabled (default) or disabled | boolean | false | true |
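
The two redeliveryBackoff attributes in Table 1 take three comma-separated values: min delay, max delay and multiplier. The plain-Java sketch below shows how such a multiplier backoff can grow between redeliveries. It is an illustration under stated assumptions rather than Pulsar's MultiplierRedeliveryBackoff class itself: the class name is hypothetical, the delays are assumed to be in milliseconds, and the growth rule (min delay times multiplier to the power of the redelivery count, capped at max delay) is an assumption about how the values are combined.

```java
/** Hypothetical sketch of a multiplier-based redelivery backoff. */
final class RedeliveryBackoffSketch {

    /** Parses "minDelay,maxDelay,multiplier" and returns the delay for the given redelivery count. */
    static long delayMs(String config, int redeliveryCount) {
        String[] parts = config.split(",");
        long minDelayMs = Long.parseLong(parts[0].trim());
        long maxDelayMs = Long.parseLong(parts[1].trim());
        double multiplier = Double.parseDouble(parts[2].trim());
        // First delivery attempt, or a degenerate range: use the minimum delay
        if (redeliveryCount <= 0 || minDelayMs >= maxDelayMs) {
            return minDelayMs;
        }
        // Grow geometrically, but never beyond the configured maximum
        double grown = minDelayMs * Math.pow(multiplier, redeliveryCount);
        return (long) Math.min(grown, maxDelayMs);
    }

    public static void main(String[] args) {
        for (int i = 0; i <= 5; i++) {
            System.out.println("redelivery " + i + " -> " + delayMs("100,1000,2", i) + " ms");
        }
    }
}
```

With the value 100,1000,2 this sketch yields 100, 200, 400, 800 and then stays at 1000 for later redeliveries.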

You can also configure properties supported by the underlying Pulsar consumer.

These properties can also be globally configured using pulsar.consumer prefix:

 

 

pulsar.consumer.subscriptionInitialPosition=Earliest

Table 2. Pulsar consumer Attributes

| Attribute | Description | Type | Config file | Default |
| --- | --- | --- | --- | --- |
| topicNames | Topic name | Set | true | [] |
| topicsPattern | Topic pattern | Pattern | true | |
| subscriptionName | Subscription name | String | true | |
| subscriptionType | Subscription type. Four subscription types are available: Exclusive, Failover, Shared, Key_Shared | SubscriptionType | true | Exclusive |
| subscriptionProperties | | Map | true | |
| subscriptionMode | | SubscriptionMode | true | Durable |
| messageListener | | MessageListener | false | |
| consumerEventListener | | ConsumerEventListener | false | |
| negativeAckRedeliveryBackoff | Interface for a custom policy applied when a message is negatively acknowledged. You can specify RedeliveryBackoff for a consumer. | RedeliveryBackoff | false | |
| ackTimeoutRedeliveryBackoff | Interface for a custom policy applied when a message hits the ack timeout. You can specify RedeliveryBackoff for a consumer. | RedeliveryBackoff | false | |
| receiverQueueSize | Size of a consumer's receiver queue. For example, the number of messages accumulated by a consumer before an application calls Receive. A value higher than the default value increases consumer throughput, though at the expense of more memory utilization. | int | true | 1000 |
| acknowledgementsGroupTimeMicros | Group a consumer acknowledgment for a specified time. By default, a consumer uses 100ms grouping time to send out acknowledgments to a broker. Setting a group time of 0 sends out acknowledgments immediately. A longer ack group time is more efficient at the expense of a slight increase in message re-deliveries after a failure. | long | true | 100000 |
| maxAcknowledgmentGroupSize | Group a consumer acknowledgment for the number of messages. | int | true | 1000 |
| negativeAckRedeliveryDelayMicros | Delay to wait before redelivering messages that failed to be processed. When an application uses Consumer#negativeAcknowledge(Message), failed messages are redelivered after a fixed timeout. | long | true | 60000000 |
| maxTotalReceiverQueueSizeAcrossPartitions | The max total receiver queue size across partitions. This setting reduces the receiver queue size for individual partitions if the total receiver queue size exceeds this value. | int | true | 50000 |
| consumerName | Consumer name | String | true | |
| ackTimeoutMillis | Timeout of unacked messages | long | true | 0 |
| tickDurationMillis | Granularity of the ack-timeout redelivery. Using a higher tickDurationMillis reduces the memory overhead to track messages when setting ack-timeout to a bigger value (for example, 1 hour). | long | true | 1000 |
| priorityLevel | Priority level for a consumer to which a broker gives more priority while dispatching messages in Shared subscription type. The broker follows descending priorities, for example 0=max-priority, 1, 2,… In Shared subscription type, the broker first dispatches messages to the max priority level consumers if they have permits. Otherwise, the broker considers next priority level consumers. Example 1: if a subscription has consumerA with priorityLevel 0 and consumerB with priorityLevel 1, then the broker only dispatches messages to consumerA until it runs out of permits and then starts dispatching messages to consumerB. Example 2: with consumers (name, priority level, permits) C1, 0, 2; C2, 0, 1; C3, 0, 1; C4, 1, 2; C5, 1, 1, the order in which a broker dispatches messages to consumers is: C1, C2, C3, C1, C4, C5, C4. | int | true | 0 |
| maxPendingChunkedMessage | The maximum size of a queue holding pending chunked messages. When the threshold is reached, the consumer drops pending messages to optimize memory utilization. | int | true | 10 |
| autoAckOldestChunkedMessageOnQueueFull | Whether to automatically acknowledge pending chunked messages when the threshold of maxPendingChunkedMessage is reached. If set to false, these messages will be redelivered by their broker. | boolean | true | false |
| expireTimeOfIncompleteChunkedMessageMillis | The time interval to expire incomplete chunks if a consumer fails to receive all the chunks in the specified time period. The default value is 1 minute. | long | true | 60000 |
| cryptoKeyReader | | CryptoKeyReader | false | |
| messageCrypto | | MessageCrypto | false | |
| cryptoFailureAction | Action the consumer takes when it receives a message that cannot be decrypted. FAIL: this is the default option to fail messages until crypto succeeds. DISCARD: silently acknowledge and do not deliver the message to an application. CONSUME: deliver encrypted messages to applications; it is the application's responsibility to decrypt the message. With CONSUME, the decompression of the message fails, and if the message contains batch messages, a client is not able to retrieve individual messages in the batch. The delivered encrypted message contains an EncryptionContext with the encryption and compression information that the application can use to decrypt the consumed message payload. | ConsumerCryptoFailureAction | true | FAIL |
| properties | A name or value property of this consumer. properties is application defined metadata attached to a consumer. When getting topic stats, associate this metadata with the consumer stats for easier identification. | SortedMap | true | {} |
| readCompacted | If readCompacted is enabled, a consumer reads messages from a compacted topic rather than reading a full message backlog of a topic. A consumer only sees the latest value for each key in the compacted topic, up until reaching the point in the topic message when compacting backlog. Beyond that point, messages are sent as normal. Only enable readCompacted on subscriptions to persistent topics that have a single active consumer (like failover or exclusive subscriptions). Attempting to enable it on subscriptions to non-persistent topics or on shared subscriptions leads to a subscription call throwing a PulsarClientException. | boolean | true | false |
| subscriptionInitialPosition | Initial position at which to set the cursor when subscribing to a topic for the first time. | SubscriptionInitialPosition | true | Latest |
| patternAutoDiscoveryPeriod | Topic auto discovery period when using a pattern for topic's consumer. The default and minimum value is 1 minute. | int | true | 60 |
| regexSubscriptionMode | When subscribing to a topic using a regular expression, you can pick a certain type of topics. PersistentOnly: only subscribe to persistent topics. NonPersistentOnly: only subscribe to non-persistent topics. AllTopics: subscribe to both persistent and non-persistent topics. | RegexSubscriptionMode | true | PersistentOnly |
| deadLetterPolicy | Dead letter policy for consumers. By default, some messages are probably redelivered many times, even to the extent that it never stops. By using the dead letter mechanism, messages have a max redelivery count. When exceeding the maximum number of redeliveries, messages are sent to the Dead Letter Topic and acknowledged automatically. You can enable the dead letter mechanism by setting deadLetterPolicy. When specifying the dead letter policy while not specifying ackTimeoutMillis, you can set the ack timeout to 30000 milliseconds. | DeadLetterPolicy | true | |
| retryEnable | | boolean | true | false |
| batchReceivePolicy | | BatchReceivePolicy | false | |
| autoUpdatePartitions | If autoUpdatePartitions is enabled, a consumer subscribes to partition increases automatically. Note: this is only for partitioned consumers. | boolean | true | true |
| autoUpdatePartitionsIntervalSeconds | | long | true | 60 |
| replicateSubscriptionState | If replicateSubscriptionState is enabled, a subscription state is replicated to geo-replicated clusters. | boolean | true | false |
| resetIncludeHead | | boolean | true | false |
| keySharedPolicy | | KeySharedPolicy | false | |
| batchIndexAckEnabled | | boolean | true | false |
| ackReceiptEnabled | | boolean | true | false |
| poolMessages | | boolean | true | false |
| payloadProcessor | | MessagePayloadProcessor | false | |
| startPaused | | boolean | true | false |
| autoScaledReceiverQueueSizeEnabled | | boolean | true | false |
| topicConfigurations | | List | true | [] |
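
Example 2 in the priorityLevel description of Table 2 can be reproduced with a small simulation. The sketch below is a plain-Java model, not broker code: the class names are hypothetical, and it assumes the broker serves the lowest priority level first and rotates round-robin among the consumers of that level while they still have permits.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical model of priority-based dispatching in a Shared subscription. */
final class PriorityDispatchSketch {

    /** A consumer with a priority level (0 = highest) and a number of permits. */
    static final class Consumer {
        final String name;
        final int priorityLevel;
        final int permits;

        Consumer(String name, int priorityLevel, int permits) {
            this.name = name;
            this.priorityLevel = priorityLevel;
            this.permits = permits;
        }
    }

    /** Returns the consumer names in the order in which messages would be dispatched. */
    static List<String> dispatchOrder(List<Consumer> consumers) {
        // TreeMap iterates priority levels in ascending order, so level 0 comes first
        Map<Integer, List<Consumer>> byLevel = new TreeMap<>();
        for (Consumer c : consumers) {
            byLevel.computeIfAbsent(c.priorityLevel, k -> new ArrayList<>()).add(c);
        }
        List<String> order = new ArrayList<>();
        for (List<Consumer> level : byLevel.values()) {
            Map<String, Integer> remaining = new LinkedHashMap<>();
            for (Consumer c : level) {
                remaining.put(c.name, c.permits);
            }
            // Round-robin within the level until every consumer is out of permits
            boolean dispatched = true;
            while (dispatched) {
                dispatched = false;
                for (Map.Entry<String, Integer> e : remaining.entrySet()) {
                    if (e.getValue() > 0) {
                        order.add(e.getKey());
                        e.setValue(e.getValue() - 1);
                        dispatched = true;
                    }
                }
            }
        }
        return order;
    }

    public static void main(String[] args) {
        List<Consumer> consumers = Arrays.asList(
                new Consumer("C1", 0, 2), new Consumer("C2", 0, 1), new Consumer("C3", 0, 1),
                new Consumer("C4", 1, 2), new Consumer("C5", 1, 1));
        System.out.println(dispatchOrder(consumers)); // prints [C1, C2, C3, C1, C4, C5, C4]
    }
}
```

The printed order matches the one given in Example 2, which suggests the round-robin-per-level reading of the description is a reasonable mental model.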

Outgoing channel configuration (publishing to Pulsar)

Table 3. Outgoing Attributes of the 'smallrye-pulsar' connector

| Attribute (alias) | Description | Type | Mandatory | Default |
| --- | --- | --- | --- | --- |
| client-configuration | Identifier of a CDI bean that provides the default Pulsar client configuration for this channel. The channel configuration can still override any attribute. The bean must have a type of Map<String, Object> and must use the @io.smallrye.common.annotation.Identifier qualifier to set the identifier. | string | false | |
| health-enabled | Whether health reporting is enabled (default) or disabled | boolean | false | true |
| maxPendingMessages | The maximum size of a queue holding pending messages, i.e. messages waiting to receive an acknowledgment from a broker | int | false | 1000 |
| producer-configuration | Identifier of a CDI bean that provides the default Pulsar producer configuration for this channel. The channel configuration can still override any attribute. The bean must have a type of Map<String, Object> and must use the @io.smallrye.common.annotation.Identifier qualifier to set the identifier. | string | false | |
| schema | The Pulsar schema type of this channel. When configured, a schema is built with the given SchemaType and used for the channel. When absent, the schema is resolved by searching for a CDI bean typed Schema qualified with @Identifier and the channel name. As a fallback, AUTO_CONSUME or AUTO_PRODUCE are used. | string | false | |
| serviceUrl | The service URL for the Pulsar service | string | false | pulsar://localhost:6650 |
| topic | The consumed / populated Pulsar topic. If not set, the channel name is used | string | false | |
| tracing-enabled | Whether tracing is enabled (default) or disabled | boolean | false | true |
| waitForWriteCompletion | Whether the client waits for the broker to acknowledge the written record before acknowledging the message | boolean | false | true |

You can also configure properties supported by the underlying Pulsar producer.

These properties can also be globally configured using pulsar.producer prefix:

 

 

pulsar.producer.batchingEnabled=false
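
Among the producer attributes, hashingScheme selects the hash function that maps a message key to a partition, with JavaStringHash described as the equivalent of String.hashCode() in Java. The sketch below shows the general idea of key-based partition selection in plain Java. It is a hedged illustration, not the Pulsar client's code: the class name is hypothetical, and clearing the sign bit before taking the modulo is an assumption about how a negative hash would be handled.

```java
/** Hypothetical sketch of choosing a partition from a message key. */
final class KeyPartitionSketch {

    /** Maps a key to a partition index in the range [0, numPartitions). */
    static int partitionFor(String key, int numPartitions) {
        // String.hashCode() may be negative, so clear the sign bit first
        int hash = key.hashCode() & Integer.MAX_VALUE;
        return hash % numPartitions;
    }

    public static void main(String[] args) {
        int partitions = 4;
        for (String key : new String[] {"my-key", "a", "another-key"}) {
            System.out.println(key + " -> partition " + partitionFor(key, partitions));
        }
    }
}
```

The point of the design is that the same key always lands on the same partition, which preserves per-key ordering on a partitioned topic.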

Table 4. Pulsar producer Attributes

| Attribute | Description | Type | Config file | Default |
| --- | --- | --- | --- | --- |
| topicName | Topic name | String | true | |
| producerName | Producer name | String | true | |
| sendTimeoutMs | Message send timeout in ms. If a message is not acknowledged by a server before the sendTimeout expires, an error occurs. | long | true | 30000 |
| blockIfQueueFull | If it is set to true, when the outgoing message queue is full, the Send and SendAsync methods of producer block, rather than failing and throwing errors. If it is set to false, when the outgoing message queue is full, the Send and SendAsync methods of producer fail and ProducerQueueIsFullError exceptions occur. The MaxPendingMessages parameter determines the size of the outgoing message queue. | boolean | true | false |
| maxPendingMessages | The maximum size of a queue holding pending messages. For example, a message waiting to receive an acknowledgment from a broker. By default, when the queue is full, all calls to the Send and SendAsync methods fail unless you set BlockIfQueueFull to true. | int | true | 0 |
| maxPendingMessagesAcrossPartitions | The maximum number of pending messages across partitions. Use the setting to lower the max pending messages for each partition (#setMaxPendingMessages(int)) if the total number exceeds the configured value. | int | true | 0 |
| messageRoutingMode | Message routing logic for producers on partitioned topics. Apply the logic only when setting no key on messages. Available options are as follows: pulsar.RoundRobinDistribution: round robin; pulsar.UseSinglePartition: publish all messages to a single partition; pulsar.CustomPartition: a custom partitioning scheme | MessageRoutingMode | true | |
| hashingScheme | Hashing function determining the partition where you publish a particular message (partitioned topics only). Available options are as follows: pulsar.JavaStringHash: the equivalent of String.hashCode() in Java; pulsar.Murmur3_32Hash: applies the Murmur3 hashing function; pulsar.BoostHash: applies the hashing function from C++'s Boost library | HashingScheme | true | JavaStringHash |
| cryptoFailureAction | Action the producer takes when encryption fails. FAIL: if encryption fails, unencrypted messages fail to send. SEND: if encryption fails, unencrypted messages are sent. | ProducerCryptoFailureAction | true | FAIL |
| customMessageRouter | | MessageRouter | false | |
| batchingMaxPublishDelayMicros | Batching time period of sending messages. | long | true | 1000 |
| batchingPartitionSwitchFrequencyByPublishDelay | | int | true | 10 |
| batchingMaxMessages | The maximum number of messages permitted in a batch. | int | true | 1000 |
| batchingMaxBytes | | int | true | 131072 |
| batchingEnabled | Enable batching of messages. | boolean | true | true |
| batcherBuilder | | BatcherBuilder | false | |
| chunkingEnabled | Enable chunking of messages. | boolean | true | false |
| chunkMaxMessageSize | | int | true | -1 |
| cryptoKeyReader | | CryptoKeyReader | false | |
| messageCrypto | | MessageCrypto | false | |
| encryptionKeys | | Set | true | [] |
| compressionType | Message data compression type used by a producer. Available options: LZ4, ZLIB, ZSTD, SNAPPY | CompressionType | true | NONE |
| initialSequenceId | | Long | true | |
| autoUpdatePartitions | | boolean | true | true |
| autoUpdatePartitionsIntervalSeconds | | long | true | 60 |
| multiSchema | | boolean | true | true |
| accessMode | | ProducerAccessMode | true | Shared |
| lazyStartPartitionedProducers | | boolean | true | false |
| properties | | SortedMap | true | {} |
| initialSubscriptionName | Use this configuration to automatically create an initial subscription when creating a topic. If this field is not set, the initial subscription is not created. | String | true | |

Pulsar Client Configuration

mp.messaging.incoming.your-channel-name.numIoThreads=4
pulsar.client.serviceUrl=pulsar://pulsar:6650

Table 5. Pulsar client Attributes

| Attribute | Description | Type | Config file | Default |
| --- | --- | --- | --- | --- |
| serviceUrl | Pulsar cluster HTTP URL to connect to a broker. | String | true | |
| serviceUrlProvider | The implementation class of ServiceUrlProvider used to generate ServiceUrl. | ServiceUrlProvider | false | |
| authentication | Authentication settings of the client. | Authentication | false | |
| authPluginClassName | Class name of authentication plugin of the client. | String | true | |
| authParams | Authentication parameter of the client. | String | true | |
| authParamMap | Authentication map of the client. | Map | true | |
| operationTimeoutMs | Client operation timeout (in milliseconds). | long | true | 30000 |
| lookupTimeoutMs | Client lookup timeout (in milliseconds). | long | true | -1 |
| statsIntervalSeconds | Interval to print client stats (in seconds). | long | true | 60 |
| numIoThreads | Number of IO threads. | int | true | 10 |
| numListenerThreads | Number of consumer listener threads. | int | true | 10 |
| connectionsPerBroker | Number of connections established between the client and each Broker. A value of 0 means to disable connection pooling. | int | true | 1 |
| connectionMaxIdleSeconds | Release the connection if it is not used for more than [connectionMaxIdleSeconds] seconds. If [connectionMaxIdleSeconds] < 0, the automatic release of idle connections is disabled. | int | true | 180 |
| useTcpNoDelay | Whether to use TCP NoDelay option. | boolean | true | true |
| useTls | Whether to use TLS. | boolean | true | false |
| tlsKeyFilePath | Path to the TLS key file. | String | true | |
| tlsCertificateFilePath | Path to the TLS certificate file. | String | true | |
| tlsTrustCertsFilePath | Path to the trusted TLS certificate file. | String | true | |
| tlsAllowInsecureConnection | Whether the client accepts untrusted TLS certificates from the broker. | boolean | true | false |
| tlsHostnameVerificationEnable | Whether the hostname is validated when the client creates a TLS connection with brokers. | boolean | true | false |
| concurrentLookupRequest | The number of concurrent lookup requests that can be sent on each broker connection. Setting a maximum prevents overloading a broker. | int | true | 5000 |
| maxLookupRequest | Maximum number of lookup requests allowed on each broker connection to prevent overloading a broker. | int | true | 50000 |
| maxLookupRedirects | Maximum times of redirected lookup requests. | int | true | 20 |
| maxNumberOfRejectedRequestPerConnection | Maximum number of rejected requests of a broker in a certain time frame (60 seconds) after which the current connection is closed and the client creates a new connection to a different broker. | int | true | 50 |
| keepAliveIntervalSeconds | Seconds of keeping alive interval for each client broker connection. | int | true | 30 |
| connectionTimeoutMs | Duration of waiting for a connection to a broker to be established. If the duration passes without a response from a broker, the connection attempt is dropped. | int | true | 10000 |
| requestTimeoutMs | Maximum duration for completing a request. | int | true | 60000 |
| readTimeoutMs | Maximum read time of a request. | int | true | 60000 |
| autoCertRefreshSeconds | Seconds of auto refreshing certificate. | int | true | 300 |
| initialBackoffIntervalNanos | Initial backoff interval (in nanoseconds). | long | true | 100000000 |
| maxBackoffIntervalNanos | Max backoff interval (in nanoseconds). | long | true | 60000000000 |
| enableBusyWait | Whether to enable BusyWait for EpollEventLoopGroup. | boolean | true | false |
| listenerName | Listener name for lookup. Clients can use listenerName to choose one of the listeners as the service URL to create a connection to the broker as long as the network is accessible. "advertisedListeners" must be enabled on the broker side. | String | true | |
| useKeyStoreTls | Set TLS using KeyStore way. | boolean | true | false |
| sslProvider | The TLS provider used by an internal client to authenticate with other Pulsar brokers. | String | true | |
| tlsKeyStoreType | TLS KeyStore type configuration. | String | true | JKS |
| tlsKeyStorePath | Path of TLS KeyStore. | String | true | |
| tlsKeyStorePassword | Password of TLS KeyStore. | String | true | |
| tlsTrustStoreType | TLS TrustStore type configuration. You need to set this configuration when client authentication is required. | String | true | JKS |
| tlsTrustStorePath | Path of TLS TrustStore. | String | true | |
| tlsTrustStorePassword | Password of TLS TrustStore. | String | true | |
| tlsCiphers | Set of TLS Ciphers. | Set | true | [] |
| tlsProtocols | Protocols of TLS. | Set | true | [] |
| memoryLimitBytes | Limit of client memory usage (in bytes). The 64M default can guarantee a high producer throughput. | long | true | 67108864 |
| proxyServiceUrl | URL of proxy service. proxyServiceUrl and proxyProtocol must be mutually inclusive. | String | true | |
| proxyProtocol | Protocol of proxy service. proxyServiceUrl and proxyProtocol must be mutually inclusive. | ProxyProtocol | true | |
| enableTransaction | Whether to enable transaction. | boolean | true | false |
| clock | | Clock | false | |
| dnsLookupBindAddress | The Pulsar client DNS lookup bind address. The default behavior is to bind on 0.0.0.0 | String | true | |
| dnsLookupBindPort | The Pulsar client DNS lookup bind port. Takes effect when dnsLookupBindAddress is configured. The default value is 0. | int | true | 0 |
| socks5ProxyAddress | Address of SOCKS5 proxy. | InetSocketAddress | true | |
| socks5ProxyUsername | User name of SOCKS5 proxy. | String | true | |
| socks5ProxyPassword | Password of SOCKS5 proxy. | String | true | |
| description | The extra description of the client version. The length cannot exceed 64. | String | true | |

Going further

This guide has shown how you can interact with Pulsar using Quarkus. It utilizes SmallRye Reactive Messaging to build data streaming applications.

If you want to go further, have a look at SmallRye Reactive Messaging, the implementation used in Quarkus.
