Reference: https://cn.quarkus.io/guides/pulsar
1. Add the dependency
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-smallrye-reactive-messaging-pulsar</artifactId>
</dependency>
2. Configure the connector
%prod.pulsar.client.serviceUrl=pulsar://pulsar:6650
mp.messaging.incoming.prices.connector=smallrye-pulsar
The %prod prefix indicates that the property only applies when the application runs in prod mode (not in dev or test mode). See the Profile documentation for more details.
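As a small illustration of profile prefixes, the fragment below sets a different broker address per profile. This is only a sketch: the localhost address used for dev mode is an assumed value, not something taken from the guide.

```properties
# Applied in every profile
mp.messaging.incoming.prices.connector=smallrye-pulsar
# Applied only in prod mode
%prod.pulsar.client.serviceUrl=pulsar://pulsar:6650
# Applied only in dev mode (assumed local broker address)
%dev.pulsar.client.serviceUrl=pulsar://localhost:6650
```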
3. Write the message consumer
import io.smallrye.mutiny.Multi;
import org.eclipse.microprofile.reactive.messaging.Channel;
import jakarta.inject.Inject;
import jakarta.ws.rs.GET;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.core.MediaType;
import org.jboss.resteasy.reactive.RestStreamElementType;
@Path("/prices")
public class PriceResource {
    @Inject
    @Channel("prices")
    Multi<Double> prices;

    @GET
    @Path("/prices")
    @RestStreamElementType(MediaType.TEXT_PLAIN)
    public Multi<Double> stream() {
        return prices;
    }
}
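When consuming messages with @Channel, the application code is responsible for subscribing to the channel. In the example above, the RESTEasy Reactive endpoint handles that for you.
The following types can be injected as channels: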
@Inject @Channel("prices") Multi<Double> streamOfPayloads;
@Inject @Channel("prices") Multi<Message<Double>> streamOfMessages;
@Inject @Channel("prices") Publisher<Double> publisherOfPayloads;
@Inject @Channel("prices") Publisher<Message<Double>> publisherOfMessages;
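Reactive Messaging invokes your method on an I/O thread. See the Quarkus Reactive Architecture documentation for more details on this topic. However, you often need to combine Reactive Messaging with blocking processing such as database interactions. For this, use the @Blocking annotation to indicate that the processing is blocking and should not run on the caller thread.
There are two @Blocking annotations: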
io.smallrye.reactive.messaging.annotations.Blocking
io.smallrye.common.annotation.Blocking
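They have the same effect, so you can use either. The first one provides more fine-grained tuning, such as the worker pool to use and whether it preserves the order. The second one, also used with other reactive features of Quarkus, uses the default worker pool and preserves the order.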
@RunOnVirtualThread
For running the blocking processing on Java virtual threads, see the Quarkus Virtual Thread support with Reactive Messaging documentation.
@Transactional
If your method is annotated with @Transactional, it is automatically considered blocking, even if the method is not annotated with @Blocking.
4. Receive messages in batches
@Incoming("prices")
public CompletionStage<Void> consumeMessage(PulsarIncomingBatchMessage<Double> messages) {
    for (PulsarMessage<Double> msg : messages) {
        msg.getMetadata(PulsarIncomingMessageMetadata.class).ifPresent(metadata -> {
            String key = metadata.getKey();
            String topic = metadata.getTopicName();
            long timestamp = metadata.getEventTime();
            //... process messages
        });
    }
    // ack will commit the latest offsets (per partition) of the batch.
    return messages.ack();
}

@Incoming("prices")
public void consumeRecords(Messages<Double> messages) {
    for (Message<Double> msg : messages) {
        //... process messages
    }
}
Or:
@Incoming("prices")
public void consume(List<Double> prices) {
    for (double price : prices) {
        // process price
    }
}
Batch reception must be enabled on the channel: mp.messaging.incoming.$channel.batchReceive=true
Sending messages:
Configure the Pulsar broker service URL for the outgoing channel (prices-out in the producer below):
mp.messaging.outgoing.prices-out.serviceUrl=pulsar://pulsar:6650
import io.smallrye.mutiny.Multi;
import org.eclipse.microprofile.reactive.messaging.Outgoing;
import jakarta.enterprise.context.ApplicationScoped;
import java.time.Duration;
import java.util.Random;
@ApplicationScoped
public class PulsarPriceProducer {
    private final Random random = new Random();

    @Outgoing("prices-out")
    public Multi<Double> generate() {
        // Build an infinite stream of random prices
        // It emits a price every second
        return Multi.createFrom().ticks().every(Duration.ofSeconds(1))
                .map(x -> random.nextDouble());
    }
}
@Outgoing("out")
public Multi<OutgoingMessage<Double>> generate() {
    return Multi.createFrom().ticks().every(Duration.ofSeconds(1))
            .map(x -> OutgoingMessage.of("my-key", random.nextDouble()));
}
The payload can be wrapped in an org.eclipse.microprofile.reactive.messaging.Message to have more control over the written records:
@Outgoing("generated-price")
public Multi<Message<Double>> generate() {
    return Multi.createFrom().ticks().every(Duration.ofSeconds(1))
            .map(x -> Message.of(random.nextDouble())
                    .addMetadata(PulsarOutgoingMessageMetadata.builder()
                            .withKey("my-key")
                            .withProperties(Map.of("property-key", "value"))
                            .build()));
}
Examples:
@Outgoing("prices-out") T generate(); // T excluding void
@Outgoing("prices-out") Message<T> generate();
@Outgoing("prices-out") Uni<T> generate();
@Outgoing("prices-out") Uni<Message<T>> generate();
@Outgoing("prices-out") CompletionStage<T> generate();
@Outgoing("prices-out") CompletionStage<Message<T>> generate();
To send Key/Value pairs to Pulsar, you can configure the Pulsar producer Schema with a KeyValue schema.
package pulsar.outbound;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.inject.Produces;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.common.schema.KeyValue;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Outgoing;
import io.smallrye.common.annotation.Identifier;
@ApplicationScoped
public class PulsarKeyValueExample {
    @Identifier("out")
    @Produces
    Schema<KeyValue<String, Long>> schema = Schema.KeyValue(Schema.STRING, Schema.INT64);

    @Incoming("in")
    @Outgoing("out")
    public KeyValue<String, Long> process(long in) {
        return new KeyValue<>("my-key", in);
    }
}
If you need more control over the written records, use PulsarOutgoingMessageMetadata.
mp.messaging.incoming.prices.connector=smallrye-pulsar
mp.messaging.incoming.prices.schema=INT32
mp.messaging.outgoing.prices-out.connector=smallrye-pulsar
mp.messaging.outgoing.prices-out.schema=DOUBLE
For example, the following bean provides a JSON schema and a Key/Value schema:
package pulsar.configuration;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.inject.Produces;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.common.schema.KeyValue;
import org.apache.pulsar.common.schema.KeyValueEncodingType;
import io.smallrye.common.annotation.Identifier;
@ApplicationScoped
public class PulsarSchemaProvider {
    @Produces
    @Identifier("user-schema")
    Schema<User> userSchema = Schema.JSON(User.class);

    @Produces
    @Identifier("a-channel")
    Schema<KeyValue<Integer, User>> keyValueSchema() {
        return Schema.KeyValue(Schema.INT32, Schema.JSON(User.class), KeyValueEncodingType.SEPARATED);
    }

    public static class User {
        String name;
        int age;
    }
}
mp.messaging.incoming.users.connector=smallrye-pulsar
mp.messaging.incoming.users.schema=user-schema
mp.messaging.incoming.data.connector=smallrye-pulsar
mp.messaging.incoming.data.serviceUrl=pulsar://localhost:6650
mp.messaging.incoming.data.topic=topic
mp.messaging.incoming.data.subscriptionInitialPosition=Earliest
mp.messaging.incoming.data.schema=INT32
mp.messaging.incoming.data.authPluginClassName=org.apache.pulsar.client.impl.auth.AuthenticationBasic
mp.messaging.incoming.data.authParams={"userId":"superuser","password":"admin"}
import jakarta.enterprise.inject.Produces;
import io.smallrye.common.annotation.Identifier;
import org.apache.pulsar.client.impl.DefaultCryptoKeyReader;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
class PulsarConfig {
    @Produces
    @Identifier("my-consumer-options")
    public ConsumerConfigurationData<String> getConsumerConfig() {
        ConsumerConfigurationData<String> data = new ConsumerConfigurationData<>();
        data.setAckReceiptEnabled(true);
        data.setCryptoKeyReader(DefaultCryptoKeyReader.builder()
                //...
                .build());
        return data;
    }
}
mp.messaging.incoming.prices.consumer-configuration=my-consumer-options
If no [client|consumer|producer]-configuration is configured, the connector will look for instances identified with the channel name:
import jakarta.enterprise.inject.Produces;
import io.smallrye.common.annotation.Identifier;
import org.apache.pulsar.client.impl.AutoClusterFailover;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
class PulsarConfig {
    @Produces
    @Identifier("prices")
    public ClientConfigurationData getClientConfig() {
        ClientConfigurationData data = new ClientConfigurationData();
        data.setEnableTransaction(true);
        data.setServiceUrlProvider(AutoClusterFailover.builder()
                // ...
                .build());
        return data;
    }
}
Attribute (alias) | Description | Type | Mandatory | Default |
---|---|---|---|---|
ack-strategy | Specify the commit strategy to apply when a message produced from a record is acknowledged. Values can be | string | false | |
ackTimeout.redeliveryBackoff | Comma separated values for configuring ack timeout MultiplierRedeliveryBackoff, min delay, max delay, multiplier. | string | false | |
batchReceive | Whether batch receive is used to consume messages | boolean | false | |
client-configuration | Identifier of a CDI bean that provides the default Pulsar client configuration for this channel. The channel configuration can still override any attribute. The bean must have a type of Map<String, Object> and must use the @io.smallrye.common.annotation.Identifier qualifier to set the identifier. | string | false | |
consumer-configuration | Identifier of a CDI bean that provides the default Pulsar consumer configuration for this channel. The channel configuration can still override any attribute. The bean must have a type of Map<String, Object> and must use the @io.smallrye.common.annotation.Identifier qualifier to set the identifier. | string | false | |
deadLetterPolicy.deadLetterTopic | Name of the dead letter topic where the failing messages will be sent | string | false | |
deadLetterPolicy.initialSubscriptionName | Name of the initial subscription name of the dead letter topic | string | false | |
deadLetterPolicy.maxRedeliverCount | Maximum number of times that a message will be redelivered before being sent to the dead letter topic | int | false | |
deadLetterPolicy.retryLetterTopic | Name of the retry topic where the failing messages will be sent | string | false | |
failure-strategy | Specify the failure strategy to apply when a message produced from a record is acknowledged negatively (nack). Values can be | string | false | |
health-enabled | Whether health reporting is enabled (default) or disabled | boolean | false | |
negativeAck.redeliveryBackoff | Comma separated values for configuring negative ack MultiplierRedeliveryBackoff, min delay, max delay, multiplier. | string | false | |
reconsumeLater.delay | Default delay for reconsume failure-strategy, in seconds | long | false | |
schema | The Pulsar schema type of this channel. When configured a schema is built with the given SchemaType and used for the channel. When absent, the schema is resolved searching for a CDI bean typed | string | false | |
serviceUrl | The service URL for the Pulsar service | string | false | |
topic | The consumed / populated Pulsar topic. If not set, the channel name is used | string | false | |
tracing-enabled | Whether tracing is enabled (default) or disabled | boolean | false | |
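The ackTimeout.redeliveryBackoff and negativeAck.redeliveryBackoff attributes take three comma-separated values: min delay, max delay and multiplier. To give a feel for how such a multiplier backoff grows, here is a minimal, self-contained sketch. The class BackoffSketch and its methods are hypothetical and are not part of the connector or of the Pulsar client. It assumes delays expressed in milliseconds and a delay of min * multiplier^n capped at max, which may differ in detail from the real implementation.

```java
final class BackoffSketch {
    final long minDelayMs;
    final long maxDelayMs;
    final double multiplier;

    BackoffSketch(long minDelayMs, long maxDelayMs, double multiplier) {
        this.minDelayMs = minDelayMs;
        this.maxDelayMs = maxDelayMs;
        this.multiplier = multiplier;
    }

    /** Parses a "min,max,multiplier" value such as "1000,60000,2". */
    static BackoffSketch parse(String value) {
        String[] parts = value.split(",");
        if (parts.length != 3) {
            throw new IllegalArgumentException("expected min,max,multiplier but got: " + value);
        }
        return new BackoffSketch(
                Long.parseLong(parts[0].trim()),
                Long.parseLong(parts[1].trim()),
                Double.parseDouble(parts[2].trim()));
    }

    /** Delay before the given redelivery (0 = first one), capped at maxDelayMs. */
    long delayMs(int redeliveryCount) {
        if (redeliveryCount <= 0) {
            return minDelayMs;
        }
        double delay = minDelayMs * Math.pow(multiplier, redeliveryCount);
        return (long) Math.min(delay, (double) maxDelayMs);
    }

    public static void main(String[] args) {
        BackoffSketch backoff = parse("1000,60000,2");
        for (int i = 0; i <= 7; i++) {
            System.out.println("redelivery " + i + " -> " + backoff.delayMs(i) + " ms");
        }
    }
}
```

With the assumed value 1000,60000,2 the delay doubles on each redelivery until it reaches the 60000 ms cap.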
You can also configure properties supported by the underlying Pulsar consumer.
These properties can also be configured globally using the pulsar.consumer prefix:
pulsar.consumer.subscriptionInitialPosition=Earliest
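The same consumer properties can also be set on a single channel. The fragment below is a sketch for the prices channel used earlier; the subscription name is a made-up value, and the property names come from the consumer attribute table.

```properties
# Consumer properties set only for the "prices" channel
mp.messaging.incoming.prices.subscriptionName=prices-subscription
mp.messaging.incoming.prices.subscriptionType=Shared
mp.messaging.incoming.prices.receiverQueueSize=500
```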
Attribute | Description | Type | Config file | Default |
---|---|---|---|---|
topicNames | Topic name | Set | true | [] |
topicsPattern | Topic pattern | Pattern | true | |
subscriptionName | Subscription name | String | true | |
subscriptionType | Subscription type. | SubscriptionType | true | Exclusive |
subscriptionProperties | | Map | true | |
subscriptionMode | | SubscriptionMode | true | Durable |
messageListener | | MessageListener | false | |
consumerEventListener | | ConsumerEventListener | false | |
negativeAckRedeliveryBackoff | Interface for custom message is negativeAcked policy. You can specify | RedeliveryBackoff | false | |
ackTimeoutRedeliveryBackoff | Interface for custom message is ackTimeout policy. You can specify | RedeliveryBackoff | false | |
receiverQueueSize | Size of a consumer’s receiver queue. | int | true | 1000 |
acknowledgementsGroupTimeMicros | Group a consumer acknowledgment for a specified time. | long | true | 100000 |
maxAcknowledgmentGroupSize | Group a consumer acknowledgment for the number of messages. | int | true | 1000 |
negativeAckRedeliveryDelayMicros | Delay to wait before redelivering messages that failed to be processed. | long | true | 60000000 |
maxTotalReceiverQueueSizeAcrossPartitions | The max total receiver queue size across partitions. | int | true | 50000 |
consumerName | Consumer name | String | true | |
ackTimeoutMillis | Timeout of unacked messages | long | true | 0 |
tickDurationMillis | Granularity of the ack-timeout redelivery. | long | true | 1000 |
priorityLevel | Priority level for a consumer to which a broker gives more priority while dispatching messages in Shared subscription type. Order in which a broker dispatches messages to consumers is: C1, C2, C3, C1, C4, C5, C4. | int | true | 0 |
maxPendingChunkedMessage | The maximum size of a queue holding pending chunked messages. When the threshold is reached, the consumer drops pending messages to optimize memory utilization. | int | true | 10 |
autoAckOldestChunkedMessageOnQueueFull | Whether to automatically acknowledge pending chunked messages when the threshold of | boolean | true | false |
expireTimeOfIncompleteChunkedMessageMillis | The time interval to expire incomplete chunks if a consumer fails to receive all the chunks in the specified time period. The default value is 1 minute. | long | true | 60000 |
cryptoKeyReader | | CryptoKeyReader | false | |
messageCrypto | | MessageCrypto | false | |
cryptoFailureAction | Consumer should take action when it receives a message that can not be decrypted. The decompression of message fails. If messages contain batch messages, a client is not able to retrieve individual messages in batch. Delivered encrypted message contains | ConsumerCryptoFailureAction | true | FAIL |
properties | A name or value property of this consumer. When getting a topic stats, associate this metadata with the consumer stats for easier identification. | SortedMap | true | {} |
readCompacted | If enabling A consumer only sees the latest value for each key in the compacted topic, up until reaching the point in the topic message when compacting backlog. Beyond that point, send messages as normal. Only enabling Attempting to enable it on subscriptions to non-persistent topics or on shared subscriptions leads to a subscription call throwing a | boolean | true | false |
subscriptionInitialPosition | Initial position at which to set cursor when subscribing to a topic at first time. | SubscriptionInitialPosition | true | Latest |
patternAutoDiscoveryPeriod | Topic auto discovery period when using a pattern for topic’s consumer. The default and minimum value is 1 minute. | int | true | 60 |
regexSubscriptionMode | When subscribing to a topic using a regular expression, you can pick a certain type of topics. * PersistentOnly: only subscribe to persistent topics. | RegexSubscriptionMode | true | PersistentOnly |
deadLetterPolicy | Dead letter policy for consumers. By default, some messages are probably redelivered many times, even to the extent that it never stops. By using the dead letter mechanism, messages have the max redelivery count. When exceeding the maximum number of redeliveries, messages are sent to the Dead Letter Topic and acknowledged automatically. You can enable the dead letter mechanism by setting When specifying the dead letter policy while not specifying | DeadLetterPolicy | true | |
retryEnable | | boolean | true | false |
batchReceivePolicy | | BatchReceivePolicy | false | |
autoUpdatePartitions | If Note: this is only for partitioned consumers. | boolean | true | true |
autoUpdatePartitionsIntervalSeconds | | long | true | 60 |
replicateSubscriptionState | If | boolean | true | false |
resetIncludeHead | | boolean | true | false |
keySharedPolicy | | KeySharedPolicy | false | |
batchIndexAckEnabled | | boolean | true | false |
ackReceiptEnabled | | boolean | true | false |
poolMessages | | boolean | true | false |
payloadProcessor | | MessagePayloadProcessor | false | |
startPaused | | boolean | true | false |
autoScaledReceiverQueueSizeEnabled | | boolean | true | false |
topicConfigurations | | List | true | [] |
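The dead letter policy described in the table caps the number of redeliveries: once the limit is exceeded, the message goes to the dead letter topic instead of being redelivered again. The following self-contained simulation illustrates that idea only. DeadLetterSketch is a hypothetical class that models topics as in-memory lists and does not use the Pulsar client. Whether the first delivery counts toward the limit is an assumption here (it does not).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

final class DeadLetterSketch {
    final int maxRedeliverCount;
    final List<String> processed = new ArrayList<>();
    final List<String> deadLetterTopic = new ArrayList<>();

    DeadLetterSketch(int maxRedeliverCount) {
        this.maxRedeliverCount = maxRedeliverCount;
    }

    /**
     * Delivers one payload to the handler, redelivering it after each failure.
     * Returns the total number of delivery attempts.
     */
    int deliver(String payload, Predicate<String> handler) {
        int redeliveryCount = 0;
        int attempts = 0;
        while (true) {
            attempts++;
            if (handler.test(payload)) {
                processed.add(payload);        // acknowledged
                return attempts;
            }
            if (redeliveryCount >= maxRedeliverCount) {
                deadLetterTopic.add(payload);  // limit reached: park the message
                return attempts;
            }
            redeliveryCount++;                 // negative ack: redeliver
        }
    }

    public static void main(String[] args) {
        DeadLetterSketch sketch = new DeadLetterSketch(2);
        System.out.println("ok  -> attempts: " + sketch.deliver("ok", p -> true));
        System.out.println("bad -> attempts: " + sketch.deliver("bad", p -> false));
        System.out.println("dead letter topic: " + sketch.deadLetterTopic);
    }
}
```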
Attribute (alias) | Description | Type | Mandatory | Default |
---|---|---|---|---|
client-configuration | Identifier of a CDI bean that provides the default Pulsar client configuration for this channel. The channel configuration can still override any attribute. The bean must have a type of Map<String, Object> and must use the @io.smallrye.common.annotation.Identifier qualifier to set the identifier. | string | false | |
health-enabled | Whether health reporting is enabled (default) or disabled | boolean | false | |
maxPendingMessages | The maximum size of a queue holding pending messages, i.e. messages waiting to receive an acknowledgment from a broker | int | false | |
producer-configuration | Identifier of a CDI bean that provides the default Pulsar producer configuration for this channel. The channel configuration can still override any attribute. The bean must have a type of Map<String, Object> and must use the @io.smallrye.common.annotation.Identifier qualifier to set the identifier. | string | false | |
schema | The Pulsar schema type of this channel. When configured a schema is built with the given SchemaType and used for the channel. When absent, the schema is resolved searching for a CDI bean typed | string | false | |
serviceUrl | The service URL for the Pulsar service | string | false | |
topic | The consumed / populated Pulsar topic. If not set, the channel name is used | string | false | |
tracing-enabled | Whether tracing is enabled (default) or disabled | boolean | false | |
waitForWriteCompletion | Whether the client waits for the broker to acknowledge the written record before acknowledging the message | boolean | false | |
You can also configure properties supported by the underlying Pulsar producer.
These properties can also be configured globally using the pulsar.producer prefix:
pulsar.producer.batchingEnabled=false
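Producer properties can likewise be set on a single outgoing channel. The fragment below is a sketch for the prices-out channel used earlier; the values are illustrative and the property names come from the producer attribute table.

```properties
# Producer properties set only for the "prices-out" channel
mp.messaging.outgoing.prices-out.batchingMaxMessages=500
mp.messaging.outgoing.prices-out.compressionType=LZ4
mp.messaging.outgoing.prices-out.sendTimeoutMs=10000
```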
Attribute | Description | Type | Config file | Default |
---|---|---|---|---|
topicName | Topic name | String | true | |
producerName | Producer name | String | true | |
sendTimeoutMs | Message send timeout in ms. | long | true | 30000 |
blockIfQueueFull | If it is set to The | boolean | true | false |
maxPendingMessages | The maximum size of a queue holding pending messages. For example, a message waiting to receive an acknowledgment from a broker. By default, when the queue is full, all calls to the | int | true | 0 |
maxPendingMessagesAcrossPartitions | The maximum number of pending messages across partitions. Use the setting to lower the max pending messages for each partition ( | int | true | 0 |
messageRoutingMode | Message routing logic for producers on partitioned topics. | MessageRoutingMode | true | |
hashingScheme | Hashing function determining the partition where you publish a particular message (partitioned topics only). | HashingScheme | true | JavaStringHash |
cryptoFailureAction | Producer should take action when encryption fails. | ProducerCryptoFailureAction | true | FAIL |
customMessageRouter | | MessageRouter | false | |
batchingMaxPublishDelayMicros | Batching time period of sending messages. | long | true | 1000 |
batchingPartitionSwitchFrequencyByPublishDelay | | int | true | 10 |
batchingMaxMessages | The maximum number of messages permitted in a batch. | int | true | 1000 |
batchingMaxBytes | | int | true | 131072 |
batchingEnabled | Enable batching of messages. | boolean | true | true |
batcherBuilder | | BatcherBuilder | false | |
chunkingEnabled | Enable chunking of messages. | boolean | true | false |
chunkMaxMessageSize | | int | true | -1 |
cryptoKeyReader | | CryptoKeyReader | false | |
messageCrypto | | MessageCrypto | false | |
encryptionKeys | | Set | true | [] |
compressionType | Message data compression type used by a producer. | CompressionType | true | NONE |
initialSequenceId | | Long | true | |
autoUpdatePartitions | | boolean | true | true |
autoUpdatePartitionsIntervalSeconds | | long | true | 60 |
multiSchema | | boolean | true | true |
accessMode | | ProducerAccessMode | true | Shared |
lazyStartPartitionedProducers | | boolean | true | false |
properties | | SortedMap | true | {} |
initialSubscriptionName | Use this configuration to automatically create an initial subscription when creating a topic. If this field is not set, the initial subscription is not created. | String | true | |
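The batching attributes above (batchingMaxMessages, batchingMaxBytes, batchingMaxPublishDelayMicros) bound a batch by message count, size and age. The sketch below illustrates how these three limits could interact. BatchingSketch is a hypothetical class and does not use the Pulsar client. As a simplification, the publish delay is only checked when a message is added, whereas a real client would rely on a timer.

```java
final class BatchingSketch {
    final int maxMessages;
    final int maxBytes;
    final long maxPublishDelayMicros;

    private int messages;
    private int bytes;
    private long firstMessageMicros;
    int flushedBatches;

    BatchingSketch(int maxMessages, int maxBytes, long maxPublishDelayMicros) {
        this.maxMessages = maxMessages;
        this.maxBytes = maxBytes;
        this.maxPublishDelayMicros = maxPublishDelayMicros;
    }

    /** Adds a message at the given time; returns true if this call flushed the batch. */
    boolean add(int payloadBytes, long nowMicros) {
        if (messages == 0) {
            firstMessageMicros = nowMicros; // the batch age starts with its first message
        }
        messages++;
        bytes += payloadBytes;
        boolean full = messages >= maxMessages || bytes >= maxBytes;
        boolean expired = nowMicros - firstMessageMicros >= maxPublishDelayMicros;
        if (full || expired) {
            flushedBatches++;
            messages = 0;
            bytes = 0;
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        // Defaults taken from the table: 1000 messages, 131072 bytes, 1000 microseconds
        BatchingSketch batch = new BatchingSketch(1000, 131072, 1000);
        System.out.println("flushed by size:  " + batch.add(200_000, 0));
        System.out.println("still batching:   " + !batch.add(10, 100));
        System.out.println("flushed by delay: " + batch.add(10, 1_100));
    }
}
```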
mp.messaging.incoming.your-channel-name.numIoThreads=4
pulsar.client.serviceUrl=pulsar://pulsar:6650
Attribute | Description | Type | Config file | Default |
---|---|---|---|---|
serviceUrl | Pulsar cluster HTTP URL to connect to a broker. | String | true | |
serviceUrlProvider | The implementation class of ServiceUrlProvider used to generate ServiceUrl. | ServiceUrlProvider | false | |
authentication | Authentication settings of the client. | Authentication | false | |
authPluginClassName | Class name of authentication plugin of the client. | String | true | |
authParams | Authentication parameter of the client. | String | true | |
authParamMap | Authentication map of the client. | Map | true | |
operationTimeoutMs | Client operation timeout (in milliseconds). | long | true | 30000 |
lookupTimeoutMs | Client lookup timeout (in milliseconds). | long | true | -1 |
statsIntervalSeconds | Interval to print client stats (in seconds). | long | true | 60 |
numIoThreads | Number of IO threads. | int | true | 10 |
numListenerThreads | Number of consumer listener threads. | int | true | 10 |
connectionsPerBroker | Number of connections established between the client and each Broker. A value of 0 means to disable connection pooling. | int | true | 1 |
connectionMaxIdleSeconds | Release the connection if it is not used for more than [connectionMaxIdleSeconds] seconds. If [connectionMaxIdleSeconds] < 0, the automatic release of idle connections is disabled. | int | true | 180 |
useTcpNoDelay | Whether to use TCP NoDelay option. | boolean | true | true |
useTls | Whether to use TLS. | boolean | true | false |
tlsKeyFilePath | Path to the TLS key file. | String | true | |
tlsCertificateFilePath | Path to the TLS certificate file. | String | true | |
tlsTrustCertsFilePath | Path to the trusted TLS certificate file. | String | true | |
tlsAllowInsecureConnection | Whether the client accepts untrusted TLS certificates from the broker. | boolean | true | false |
tlsHostnameVerificationEnable | Whether the hostname is validated when the client creates a TLS connection with brokers. | boolean | true | false |
concurrentLookupRequest | The number of concurrent lookup requests that can be sent on each broker connection. Setting a maximum prevents overloading a broker. | int | true | 5000 |
maxLookupRequest | Maximum number of lookup requests allowed on each broker connection to prevent overloading a broker. | int | true | 50000 |
maxLookupRedirects | Maximum times of redirected lookup requests. | int | true | 20 |
maxNumberOfRejectedRequestPerConnection | Maximum number of rejected requests of a broker in a certain time frame (60 seconds) after the current connection is closed and the client creating a new connection to connect to a different broker. | int | true | 50 |
keepAliveIntervalSeconds | Seconds of keeping alive interval for each client broker connection. | int | true | 30 |
connectionTimeoutMs | Duration of waiting for a connection to a broker to be established. If the duration passes without a response from a broker, the connection attempt is dropped. | int | true | 10000 |
requestTimeoutMs | Maximum duration for completing a request. | int | true | 60000 |
readTimeoutMs | Maximum read time of a request. | int | true | 60000 |
autoCertRefreshSeconds | Seconds of auto refreshing certificate. | int | true | 300 |
initialBackoffIntervalNanos | Initial backoff interval (in nanoseconds). | long | true | 100000000 |
maxBackoffIntervalNanos | Max backoff interval (in nanoseconds). | long | true | 60000000000 |
enableBusyWait | Whether to enable BusyWait for EpollEventLoopGroup. | boolean | true | false |
listenerName | Listener name for lookup. Clients can use listenerName to choose one of the listeners as the service URL to create a connection to the broker as long as the network is accessible. "advertisedListeners" must be enabled on the broker side. | String | true | |
useKeyStoreTls | Set TLS using KeyStore way. | boolean | true | false |
sslProvider | The TLS provider used by an internal client to authenticate with other Pulsar brokers. | String | true | |
tlsKeyStoreType | TLS KeyStore type configuration. | String | true | JKS |
tlsKeyStorePath | Path of TLS KeyStore. | String | true | |
tlsKeyStorePassword | Password of TLS KeyStore. | String | true | |
tlsTrustStoreType | TLS TrustStore type configuration. You need to set this configuration when client authentication is required. | String | true | JKS |
tlsTrustStorePath | Path of TLS TrustStore. | String | true | |
tlsTrustStorePassword | Password of TLS TrustStore. | String | true | |
tlsCiphers | Set of TLS Ciphers. | Set | true | [] |
tlsProtocols | Protocols of TLS. | Set | true | [] |
memoryLimitBytes | Limit of client memory usage (in bytes). The 64M default can guarantee a high producer throughput. | long | true | 67108864 |
proxyServiceUrl | URL of proxy service. proxyServiceUrl and proxyProtocol must be mutually inclusive. | String | true | |
proxyProtocol | Protocol of proxy service. proxyServiceUrl and proxyProtocol must be mutually inclusive. | ProxyProtocol | true | |
enableTransaction | Whether to enable transaction. | boolean | true | false |
clock | | Clock | false | |
dnsLookupBindAddress | The Pulsar client dns lookup bind address, default behavior is bind on 0.0.0.0 | String | true | |
dnsLookupBindPort | The Pulsar client dns lookup bind port, takes effect when dnsLookupBindAddress is configured, default value is 0. | int | true | 0 |
socks5ProxyAddress | Address of SOCKS5 proxy. | InetSocketAddress | true | |
socks5ProxyUsername | User name of SOCKS5 proxy. | String | true | |
socks5ProxyPassword | Password of SOCKS5 proxy. | String | true | |
description | The extra description of the client version. The length cannot exceed 64. | String | true | |
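Several client defaults in the table are raw numbers in nanoseconds, milliseconds or bytes, which are easy to misread. The small sketch below converts a few of them into friendlier units using only the JDK. ClientUnitsSketch is a hypothetical helper, not part of Quarkus or Pulsar; the numeric values are the defaults listed in the table.

```java
import java.time.Duration;

final class ClientUnitsSketch {
    /** Interprets a value expressed in nanoseconds. */
    static Duration nanos(long value) {
        return Duration.ofNanos(value);
    }

    /** Interprets a value expressed in milliseconds. */
    static Duration millis(long value) {
        return Duration.ofMillis(value);
    }

    /** Converts a byte count to whole mebibytes (1 MiB = 1024 * 1024 bytes). */
    static long mebibytes(long bytes) {
        return bytes / (1024L * 1024L);
    }

    public static void main(String[] args) {
        System.out.println("initialBackoffIntervalNanos = " + nanos(100_000_000L).toMillis() + " ms");
        System.out.println("maxBackoffIntervalNanos     = " + nanos(60_000_000_000L).getSeconds() + " s");
        System.out.println("connectionTimeoutMs         = " + millis(10_000L).getSeconds() + " s");
        System.out.println("memoryLimitBytes            = " + mebibytes(67_108_864L) + " MiB");
    }
}
```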
This guide has shown how you can interact with Pulsar using Quarkus. It utilizes SmallRye Reactive Messaging to build data streaming applications.
If you want to go further, see the documentation of SmallRye Reactive Messaging, the implementation used in Quarkus.