diff --git a/pom.xml b/pom.xml index e65edca..02b4c3f 100644 --- a/pom.xml +++ b/pom.xml @@ -39,6 +39,7 @@ pulsar-auth-contrib pulsar-rpc-contrib pulsar-admin-mcp-contrib + pulsar-metadata-etcd-contrib diff --git a/pulsar-metadata-etcd-contrib/pom.xml b/pulsar-metadata-etcd-contrib/pom.xml new file mode 100644 index 0000000..066fb55 --- /dev/null +++ b/pulsar-metadata-etcd-contrib/pom.xml @@ -0,0 +1,175 @@ + + + + 4.0.0 + + org.apache + pulsar-java-contrib + 1.0.0-SNAPSHOT + + + pulsar-metadata-etcd-contrib + Pulsar Metadata Etcd Contrib + Etcd metadata store provider plugin for Apache Pulsar + + + 0.7.7 + 1.60.0 + 2.17.2 + 3.3.2 + 4.2.0-SNAPSHOT + + + + + + com.fasterxml.jackson.dataformat + jackson-dataformat-yaml + ${jackson-dataformat-yaml.version} + + + + dev.failsafe + failsafe + ${failsafe.version} + + + + io.etcd + jetcd-core + ${jetcd.version} + + + io.grpc + grpc-netty + + + io.netty + * + + + javax.annotation + javax.annotation-api + + + + + + io.etcd + jetcd-test + ${jetcd.version} + test + + + io.etcd + jetcd-api + + + io.etcd + jetcd-core + + + + + + io.grpc + grpc-netty-shaded + ${grpc.version} + + + + io.grpc + grpc-protobuf + ${grpc.version} + + + + io.grpc + grpc-stub + ${grpc.version} + + + org.apache.pulsar + pulsar-metadata + ${pulsar-metadata.version} + provided + + + + + + + org.apache.maven.plugins + maven-shade-plugin + 3.6.0 + + + + shade + + package + + true + true + false + + + io.etcd:* + io.vertx:* + + + + + io.vertx + org.apache.pulsar.jetcd.shaded.io.vertx + + + io.grpc.netty + io.grpc.netty.shaded.io.grpc.netty + + + io.netty + io.grpc.netty.shaded.io.netty + + + + + *:* + + META-INF/*.SF + META-INF/*.DSA + META-INF/*.RSA + + + + + + + true + + + + + + + + + + + + 2025 + diff --git a/pulsar-metadata-etcd-contrib/src/main/java/org/apache/pulsar/metadata/impl/EtcdMetadataStore.java b/pulsar-metadata-etcd-contrib/src/main/java/org/apache/pulsar/metadata/impl/EtcdMetadataStore.java new file mode 100644 index 0000000..e94e14c --- /dev/null +++ b/pulsar-metadata-etcd-contrib/src/main/java/org/apache/pulsar/metadata/impl/EtcdMetadataStore.java @@ -0,0 +1,539 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.pulsar.metadata.impl; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.dataformat.yaml.YAMLFactory; +import io.etcd.jetcd.ByteSequence; +import io.etcd.jetcd.Client; +import io.etcd.jetcd.ClientBuilder; +import io.etcd.jetcd.KV; +import io.etcd.jetcd.KeyValue; +import io.etcd.jetcd.Txn; +import io.etcd.jetcd.kv.DeleteResponse; +import io.etcd.jetcd.kv.GetResponse; +import io.etcd.jetcd.kv.PutResponse; +import io.etcd.jetcd.kv.TxnResponse; +import io.etcd.jetcd.lease.LeaseKeepAliveResponse; +import io.etcd.jetcd.op.Cmp; +import io.etcd.jetcd.op.CmpTarget; +import io.etcd.jetcd.op.Op; +import io.etcd.jetcd.options.DeleteOption; +import io.etcd.jetcd.options.GetOption; +import io.etcd.jetcd.options.PutOption; +import io.etcd.jetcd.options.WatchOption; +import io.etcd.jetcd.support.CloseableClient; +import io.etcd.jetcd.watch.WatchEvent; +import io.etcd.jetcd.watch.WatchResponse; +import io.grpc.Status; +import io.grpc.StatusRuntimeException; +import io.grpc.netty.shaded.io.grpc.netty.GrpcSslContexts; +import io.grpc.netty.shaded.io.netty.handler.ssl.SslContext; +import io.grpc.netty.shaded.io.netty.handler.ssl.SslProvider; +import io.grpc.stub.StreamObserver; +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.Collections; +import java.util.EnumSet; +import java.util.List; +import java.util.Optional; +import java.util.Set; +import java.util.TreeSet; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; +import org.apache.pulsar.metadata.api.GetResult; +import org.apache.pulsar.metadata.api.MetadataStoreConfig; +import org.apache.pulsar.metadata.api.MetadataStoreException; +import org.apache.pulsar.metadata.api.Notification; +import org.apache.pulsar.metadata.api.NotificationType; +import org.apache.pulsar.metadata.api.Stat; +import org.apache.pulsar.metadata.api.extended.CreateOption; +import org.apache.pulsar.metadata.api.extended.SessionEvent; +import org.apache.pulsar.metadata.impl.batching.AbstractBatchedMetadataStore; +import org.apache.pulsar.metadata.impl.batching.MetadataOp; +import org.apache.pulsar.metadata.impl.batching.OpDelete; +import org.apache.pulsar.metadata.impl.batching.OpGet; +import org.apache.pulsar.metadata.impl.batching.OpGetChildren; +import org.apache.pulsar.metadata.impl.batching.OpPut; + +@Slf4j +public class EtcdMetadataStore extends AbstractBatchedMetadataStore { + + static final String ETCD_SCHEME = "etcd"; + static final String ETCD_SCHEME_IDENTIFIER = "etcd:"; + + private final int leaseTTLSeconds; + private final Client client; + private final KV kv; + private volatile long leaseId; + private volatile CloseableClient leaseClient; + private final EtcdSessionWatcher sessionWatcher; + + public EtcdMetadataStore( + String metadataURL, MetadataStoreConfig conf, boolean enableSessionWatcher) + throws MetadataStoreException { + super(conf); + + this.leaseTTLSeconds = conf.getSessionTimeoutMillis() / 1000; + try { + this.client = newEtcdClient(metadataURL, conf); + this.kv = client.getKVClient(); + this.client + .getWatchClient() + .watch( + ByteSequence.from("/", StandardCharsets.UTF_8), + WatchOption.newBuilder().isPrefix(true).build(), + this::handleWatchResponse); + if (enableSessionWatcher) { + this.sessionWatcher = + new EtcdSessionWatcher( + client, conf.getSessionTimeoutMillis(), this::receivedSessionEvent); + + // Ensure the lease is created when we start + this.createLease(false).join(); + } else { + sessionWatcher = null; + } + } catch (Exception e) { + throw new MetadataStoreException(e); + } + } + + private Client newEtcdClient(String metadataURL, MetadataStoreConfig conf) throws IOException { + String etcdUrl = metadataURL.replaceFirst(ETCD_SCHEME_IDENTIFIER, ""); + ClientBuilder clientBuilder = Client.builder().endpoints(etcdUrl.split(",")); + + if (StringUtils.isNotEmpty(conf.getConfigFilePath())) { + try (InputStream inputStream = Files.newInputStream(Paths.get(conf.getConfigFilePath()))) { + EtcdConfig etcdConfig = + new ObjectMapper(new YAMLFactory()).readValue(inputStream, EtcdConfig.class); + if (etcdConfig.isUseTls()) { + File trustCertsFile = readFile(etcdConfig.getTlsTrustCertsFilePath()); + File keyFile = readFile(etcdConfig.getTlsKeyFilePath()); + File certFile = readFile(etcdConfig.getTlsCertificateFilePath()); + SslContext context = + GrpcSslContexts.forClient() + .trustManager(trustCertsFile) + .sslProvider(etcdConfig.getTlsProvider()) + .keyManager(certFile, keyFile) + .build(); + clientBuilder.sslContext(context); + } + + if (StringUtils.isNotEmpty(etcdConfig.getAuthority())) { + clientBuilder.authority(etcdConfig.getAuthority()); + } + } + } + + return clientBuilder.build(); + } + + private File readFile(String path) { + return StringUtils.isEmpty(path) ? null : new File(path); + } + + @Override + public void close() throws Exception { + if (isClosed.compareAndSet(false, true)) { + super.close(); + + if (sessionWatcher != null) { + sessionWatcher.close(); + } + + if (leaseClient != null) { + leaseClient.close(); + } + + if (leaseId != 0) { + client.getLeaseClient().revoke(leaseId); + } + + kv.close(); + client.close(); + } + } + + private static final GetOption EXISTS_GET_OPTION = + GetOption.newBuilder().withCountOnly(true).build(); + private static final GetOption SINGLE_GET_OPTION = GetOption.newBuilder().withLimit(1).build(); + + @Override + protected CompletableFuture existsFromStore(String path) { + return kv.get(ByteSequence.from(path, StandardCharsets.UTF_8), EXISTS_GET_OPTION) + .thenApply(gr -> gr.getCount() == 1); + } + + @Override + protected CompletableFuture storePut( + String path, byte[] data, Optional optExpectedVersion, EnumSet options) { + if (!options.contains(CreateOption.Sequential)) { + return super.storePut(path, data, optExpectedVersion, options); + } else { + // First get the version from parent + String parent = parent(path); + if (parent == null) { + parent = "/"; + } + return super.storePut( + parent, new byte[0], Optional.empty(), EnumSet.noneOf(CreateOption.class)) + // Then create the unique key with the version added in the path + .thenCompose( + stat -> super.storePut(path + stat.getVersion(), data, optExpectedVersion, options)); + } + } + + @Override + protected void batchOperation(List ops) { + try { + Txn txn = kv.txn(); + + // First, set all the conditions + for (int i = 0; i < ops.size(); i++) { + MetadataOp op = ops.get(i); + switch (op.getType()) { + case PUT: + { + OpPut put = op.asPut(); + ByteSequence key = ByteSequence.from(put.getPath(), StandardCharsets.UTF_8); + if (put.getOptExpectedVersion().isPresent()) { + long expectedVersion = put.getOptExpectedVersion().get(); + if (expectedVersion == -1L) { + // Check that key does not exist + txn.If(new Cmp(key, Cmp.Op.EQUAL, CmpTarget.createRevision(0))); + } else { + txn.If(new Cmp(key, Cmp.Op.EQUAL, CmpTarget.version(expectedVersion + 1))); + } + } + break; + } + case DELETE: + { + OpDelete del = op.asDelete(); + ByteSequence key = ByteSequence.from(del.getPath(), StandardCharsets.UTF_8); + if (del.getOptExpectedVersion().isPresent()) { + txn.If( + new Cmp( + key, + Cmp.Op.EQUAL, + CmpTarget.version(del.getOptExpectedVersion().get() + 1))); + } + break; + } + default: + break; + } + } + + // Then the requests + for (int i = 0; i < ops.size(); i++) { + MetadataOp op = ops.get(i); + switch (op.getType()) { + case GET: + { + txn.Then( + Op.get( + ByteSequence.from(op.asGet().getPath(), StandardCharsets.UTF_8), + SINGLE_GET_OPTION)); + break; + } + case PUT: + { + OpPut put = op.asPut(); + ByteSequence key = ByteSequence.from(put.getPath(), StandardCharsets.UTF_8); + if (!put.getFuture().isDone()) { + PutOption.Builder b = PutOption.newBuilder().withPrevKV(); + + if (put.isEphemeral()) { + b.withLeaseId(leaseId); + } + + txn.Then(Op.put(key, ByteSequence.from(put.getData()), b.build())); + } + break; + } + case DELETE: + { + OpDelete del = op.asDelete(); + ByteSequence key = ByteSequence.from(del.getPath(), StandardCharsets.UTF_8); + txn.Then(Op.delete(key, DeleteOption.DEFAULT)); + break; + } + case GET_CHILDREN: + { + OpGetChildren opGetChildren = op.asGetChildren(); + String path = opGetChildren.getPath(); + + ByteSequence prefix = + ByteSequence.from(path.equals("/") ? path : path + "/", StandardCharsets.UTF_8); + + txn.Then( + Op.get( + prefix, + GetOption.newBuilder() + .withKeysOnly(true) + .withSortField(GetOption.SortTarget.KEY) + .withSortOrder(GetOption.SortOrder.ASCEND) + .isPrefix(true) + .build())); + break; + } + default: + break; + } + } + + txn.commit() + .thenAccept( + txnResponse -> { + handleBatchOperationResult(txnResponse, ops); + }) + .exceptionally( + ex -> { + Throwable cause = ex.getCause(); + if (cause instanceof ExecutionException || cause instanceof CompletionException) { + cause = cause.getCause(); + } + if (ops.size() > 1 && cause instanceof StatusRuntimeException) { + Status.Code code = ((StatusRuntimeException) cause).getStatus().getCode(); + if (code == Status.Code.INVALID_ARGUMENT + || code == Status.Code.RESOURCE_EXHAUSTED) { + for (int i = 0; i < ops.size(); i++) { + batchOperation(Collections.singletonList(ops.get(i))); + } + } + } else { + log.warn("Failed to commit: {}", cause.getMessage()); + for (int i = 0; i < ops.size(); i++) { + ops.get(i).getFuture().completeExceptionally(ex); + } + } + return null; + }); + } catch (Throwable t) { + log.warn("Error in committing batch: {}", t.getMessage()); + } + } + + private void handleBatchOperationResult(TxnResponse txnResponse, List ops) { + safeExecuteCallbacks( + () -> { + if (!txnResponse.isSucceeded()) { + if (ops.size() > 1) { + // Retry individually + for (int i = 0; i < ops.size(); i++) { + batchOperation(Collections.singletonList(ops.get(i))); + } + } else { + ops.get(0) + .getFuture() + .completeExceptionally( + new MetadataStoreException.BadVersionException("Bad version")); + } + return; + } + + int getIdx = 0; + int deletedIdx = 0; + int putIdx = 0; + for (int i = 0; i < ops.size(); i++) { + MetadataOp op = ops.get(i); + switch (op.getType()) { + case GET: + { + OpGet get = op.asGet(); + GetResponse gr = txnResponse.getGetResponses().get(getIdx++); + if (gr.getCount() == 0) { + get.getFuture().complete(Optional.empty()); + } else { + KeyValue kv = gr.getKvs().get(0); + boolean isEphemeral = kv.getLease() != 0; + boolean createdBySelf = kv.getLease() == leaseId; + get.getFuture() + .complete( + Optional.of( + new GetResult( + kv.getValue().getBytes(), + new Stat( + get.getPath(), + kv.getVersion() - 1, + 0, + 0, + isEphemeral, + createdBySelf)))); + } + break; + } + case PUT: + { + OpPut put = op.asPut(); + PutResponse pr = txnResponse.getPutResponses().get(putIdx++); + KeyValue prevKv = pr.getPrevKv(); + if (prevKv == null) { + put.getFuture() + .complete(new Stat(put.getPath(), 0, 0, 0, put.isEphemeral(), true)); + } else { + put.getFuture() + .complete( + new Stat( + put.getPath(), prevKv.getVersion(), 0, 0, put.isEphemeral(), true)); + } + break; + } + case DELETE: + { + OpDelete del = op.asDelete(); + DeleteResponse dr = txnResponse.getDeleteResponses().get(deletedIdx++); + if (dr.getDeleted() == 0) { + del.getFuture() + .completeExceptionally(new MetadataStoreException.NotFoundException()); + } else { + del.getFuture().complete(null); + } + break; + } + case GET_CHILDREN: + { + OpGetChildren getChildren = op.asGetChildren(); + GetResponse gr = txnResponse.getGetResponses().get(getIdx++); + String basePath = + getChildren.getPath().equals("/") ? "/" : getChildren.getPath() + "/"; + + Set children = + gr.getKvs().stream() + .map(kv -> kv.getKey().toString(StandardCharsets.UTF_8)) + .map(p -> p.replaceFirst(basePath, "")) + // Only return first-level children + .map(k -> k.split("/", 2)[0]) + .collect(Collectors.toCollection(TreeSet::new)); + + getChildren.getFuture().complete(new ArrayList<>(children)); + } + } + } + }, + ops); + } + + private synchronized CompletableFuture createLease(boolean retryOnFailure) { + CompletableFuture future = + client + .getLeaseClient() + .grant(leaseTTLSeconds) + .thenAccept( + lease -> { + synchronized (this) { + this.leaseId = lease.getID(); + + if (leaseClient != null) { + leaseClient.close(); + } + this.leaseClient = + this.client + .getLeaseClient() + .keepAlive( + leaseId, + new StreamObserver() { + @Override + public void onNext( + LeaseKeepAliveResponse leaseKeepAliveResponse) { + if (log.isDebugEnabled()) { + log.debug("On next: {}", leaseKeepAliveResponse); + } + } + + @Override + public void onError(Throwable throwable) { + log.warn("Lease client error :", throwable); + receivedSessionEvent(SessionEvent.SessionLost); + } + + @Override + public void onCompleted() { + log.info("Etcd lease has expired"); + receivedSessionEvent(SessionEvent.SessionLost); + } + }); + } + }); + + if (retryOnFailure) { + future.exceptionally( + ex -> { + log.warn("Failed to create Etcd lease. Retrying later", ex); + scheduleDelayedTask(1, TimeUnit.SECONDS, () -> createLease(true)); + return null; + }); + } + + return future; + } + + private void handleWatchResponse(WatchResponse watchResponse) { + for (WatchEvent we : watchResponse.getEvents()) { + String path = we.getKeyValue().getKey().toString(StandardCharsets.UTF_8); + if (we.getEventType() == WatchEvent.EventType.PUT) { + if (we.getKeyValue().getVersion() == 1) { + receivedNotification(new Notification(NotificationType.Created, path)); + notifyParentChildrenChanged(path); + } else { + receivedNotification(new Notification(NotificationType.Modified, path)); + } + } else if (we.getEventType() == WatchEvent.EventType.DELETE) { + receivedNotification(new Notification(NotificationType.Deleted, path)); + notifyParentChildrenChanged(path); + } + } + } + + @Override + protected void receivedSessionEvent(SessionEvent event) { + if (event == SessionEvent.SessionReestablished) { + // Re-create the lease before notifying that we are reconnected + createLease(true).thenRun(() -> super.receivedSessionEvent(event)); + } else { + super.receivedSessionEvent(event); + } + } +} + +@AllArgsConstructor +@NoArgsConstructor +@Data +@Builder +class EtcdConfig { + private boolean useTls; + + private SslProvider tlsProvider; + private String tlsTrustCertsFilePath; + private String tlsKeyFilePath; + private String tlsCertificateFilePath; + + private String authority; +} diff --git a/pulsar-metadata-etcd-contrib/src/main/java/org/apache/pulsar/metadata/impl/EtcdMetadataStoreProvider.java b/pulsar-metadata-etcd-contrib/src/main/java/org/apache/pulsar/metadata/impl/EtcdMetadataStoreProvider.java new file mode 100644 index 0000000..f3ec39a --- /dev/null +++ b/pulsar-metadata-etcd-contrib/src/main/java/org/apache/pulsar/metadata/impl/EtcdMetadataStoreProvider.java @@ -0,0 +1,46 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.pulsar.metadata.impl; + +import org.apache.pulsar.metadata.api.MetadataStore; +import org.apache.pulsar.metadata.api.MetadataStoreConfig; +import org.apache.pulsar.metadata.api.MetadataStoreException; +import org.apache.pulsar.metadata.api.MetadataStoreProvider; + +/** + * Etcd metadata store provider plugin. + * + *

To use this provider, add the shaded jar to the Pulsar classpath and configure the system + * property: + * + *

+ * -Dpulsar.metadata.store.providers=org.apache.pulsar.metadata.impl.EtcdMetadataStoreProvider
+ * 
+ * + *

Then use {@code etcd:} URLs for your metadata store configuration. + */ +public class EtcdMetadataStoreProvider implements MetadataStoreProvider { + + @Override + public String urlScheme() { + return EtcdMetadataStore.ETCD_SCHEME; + } + + @Override + public MetadataStore create( + String metadataURL, MetadataStoreConfig metadataStoreConfig, boolean enableSessionWatcher) + throws MetadataStoreException { + return new EtcdMetadataStore(metadataURL, metadataStoreConfig, enableSessionWatcher); + } +} diff --git a/pulsar-metadata-etcd-contrib/src/main/java/org/apache/pulsar/metadata/impl/EtcdSessionWatcher.java b/pulsar-metadata-etcd-contrib/src/main/java/org/apache/pulsar/metadata/impl/EtcdSessionWatcher.java new file mode 100644 index 0000000..20b237f --- /dev/null +++ b/pulsar-metadata-etcd-contrib/src/main/java/org/apache/pulsar/metadata/impl/EtcdSessionWatcher.java @@ -0,0 +1,163 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.pulsar.metadata.impl; + +import static org.apache.pulsar.common.util.Runnables.catchingAndLoggingThrowables; +import io.etcd.jetcd.ByteSequence; +import io.etcd.jetcd.Client; +import io.netty.util.concurrent.DefaultThreadFactory; +import java.nio.charset.StandardCharsets; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executors; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.function.Consumer; +import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.metadata.api.extended.SessionEvent; + +/** Monitor the Etcd session state every few seconds and send notifications. */ +@Slf4j +public class EtcdSessionWatcher implements AutoCloseable { + private final Client client; + + private SessionEvent currentStatus; + private final Consumer sessionListener; + + // Maximum time to wait for Etcd lease to be re-connected to quorum (set to 5/6 of + // SessionTimeout) + private final long monitorTimeoutMillis; + + // Interval at which we check the state of the Etcd connection (set to 1/15 of SessionTimeout) + private final long tickTimeMillis; + + private final ScheduledExecutorService scheduler; + private final ScheduledFuture task; + + private long disconnectedAt = 0; + + public EtcdSessionWatcher( + Client client, long sessionTimeoutMillis, Consumer sessionListener) { + this.client = client; + this.monitorTimeoutMillis = sessionTimeoutMillis * 5 / 6; + this.tickTimeMillis = sessionTimeoutMillis / 15; + this.sessionListener = sessionListener; + + this.scheduler = + Executors.newSingleThreadScheduledExecutor( + new DefaultThreadFactory("metadata-store-etcd-session-watcher")); + this.task = + scheduler.scheduleAtFixedRate( + catchingAndLoggingThrowables(this::checkConnectionStatus), + tickTimeMillis, + tickTimeMillis, + TimeUnit.MILLISECONDS); + this.currentStatus = SessionEvent.SessionReestablished; + } + + @Override + public void close() throws Exception { + task.cancel(true); + scheduler.shutdownNow(); + scheduler.awaitTermination(10, TimeUnit.SECONDS); + } + + // task that runs every TICK_TIME to check Etcd connection + private synchronized void checkConnectionStatus() { + try { + CompletableFuture future = new CompletableFuture<>(); + client + .getKVClient() + .get(ByteSequence.from("/".getBytes(StandardCharsets.UTF_8))) + .thenRun( + () -> { + future.complete(SessionEvent.Reconnected); + }) + .exceptionally( + ex -> { + future.complete(SessionEvent.ConnectionLost); + return null; + }); + + SessionEvent etcdClientState; + try { + etcdClientState = future.get(tickTimeMillis, TimeUnit.MILLISECONDS); + } catch (TimeoutException e) { + // Consider etcd disconnection if etcd operation takes more than TICK_TIME + etcdClientState = SessionEvent.ConnectionLost; + } + + checkState(etcdClientState); + } catch (RejectedExecutionException | InterruptedException e) { + task.cancel(true); + } catch (Throwable t) { + log.warn("Error while checking Etcd connection status", t); + } + } + + synchronized void setSessionInvalid() { + currentStatus = SessionEvent.SessionLost; + } + + private void checkState(SessionEvent etcdClientState) { + switch (etcdClientState) { + case SessionLost: + if (currentStatus != SessionEvent.SessionLost) { + log.error("Etcd lease has expired"); + currentStatus = SessionEvent.SessionLost; + sessionListener.accept(currentStatus); + } + break; + + case ConnectionLost: + if (disconnectedAt == 0) { + disconnectedAt = System.nanoTime(); + } + + long timeRemainingMillis = + monitorTimeoutMillis + - TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - disconnectedAt); + if (timeRemainingMillis <= 0 && currentStatus != SessionEvent.SessionLost) { + log.error("Etcd lease keep-alive timeout. Notifying session is lost."); + currentStatus = SessionEvent.SessionLost; + sessionListener.accept(currentStatus); + } else if (currentStatus != SessionEvent.SessionLost) { + log.warn( + "Etcd client is disconnected. Waiting to reconnect, time remaining = {} seconds", + timeRemainingMillis / 1000.0); + if (currentStatus == SessionEvent.SessionReestablished) { + currentStatus = SessionEvent.ConnectionLost; + sessionListener.accept(currentStatus); + } + } + break; + + default: + if (currentStatus != SessionEvent.SessionReestablished) { + log.info( + "Etcd client reconnection with server quorum. Current status: {}", currentStatus); + disconnectedAt = 0; + + sessionListener.accept(SessionEvent.Reconnected); + if (currentStatus == SessionEvent.SessionLost) { + sessionListener.accept(SessionEvent.SessionReestablished); + } + currentStatus = SessionEvent.SessionReestablished; + } + break; + } + } +} diff --git a/pulsar-metadata-etcd-contrib/src/test/java/org/apache/pulsar/metadata/impl/EtcdMetadataStoreTest.java b/pulsar-metadata-etcd-contrib/src/test/java/org/apache/pulsar/metadata/impl/EtcdMetadataStoreTest.java new file mode 100644 index 0000000..ddb8714 --- /dev/null +++ b/pulsar-metadata-etcd-contrib/src/test/java/org/apache/pulsar/metadata/impl/EtcdMetadataStoreTest.java @@ -0,0 +1,172 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.pulsar.metadata.impl; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertTrue; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.dataformat.yaml.YAMLFactory; +import com.google.common.io.Resources; +import io.etcd.jetcd.launcher.EtcdCluster; +import io.etcd.jetcd.test.EtcdClusterExtension; +import java.net.URI; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.List; +import java.util.Optional; +import java.util.stream.Collectors; +import lombok.Cleanup; +import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.metadata.api.GetResult; +import org.apache.pulsar.metadata.api.MetadataStore; +import org.apache.pulsar.metadata.api.MetadataStoreConfig; +import org.apache.pulsar.metadata.api.MetadataStoreException; +import org.testng.annotations.Test; + +@Slf4j +public class EtcdMetadataStoreTest { + + private String getMetadataUrl(EtcdCluster etcdCluster) { + return "etcd:" + + etcdCluster.clientEndpoints().stream() + .map(URI::toString) + .collect(Collectors.joining(",")); + } + + private MetadataStore createStore(EtcdCluster etcdCluster) throws MetadataStoreException { + String metadataURL = getMetadataUrl(etcdCluster); + return new EtcdMetadataStore(metadataURL, MetadataStoreConfig.builder().build(), true); + } + + @Test + public void testBasicOperations() throws Exception { + @Cleanup + EtcdCluster etcdCluster = + EtcdClusterExtension.builder() + .withClusterName("test-basic") + .withNodes(1) + .withSsl(false) + .build() + .cluster(); + etcdCluster.start(); + + @Cleanup MetadataStore store = createStore(etcdCluster); + + // Test put and get + store.put("/test", "value".getBytes(StandardCharsets.UTF_8), Optional.empty()).join(); + assertTrue(store.exists("/test").join()); + + Optional result = store.get("/test").join(); + assertTrue(result.isPresent()); + assertEquals(new String(result.get().getValue(), StandardCharsets.UTF_8), "value"); + + // Test update + store + .put( + "/test", + "value2".getBytes(StandardCharsets.UTF_8), + Optional.of(result.get().getStat().getVersion())) + .join(); + result = store.get("/test").join(); + assertTrue(result.isPresent()); + assertEquals(new String(result.get().getValue(), StandardCharsets.UTF_8), "value2"); + + // Test delete + store.delete("/test", Optional.empty()).join(); + assertFalse(store.exists("/test").join()); + } + + @Test + public void testGetChildren() throws Exception { + @Cleanup + EtcdCluster etcdCluster = + EtcdClusterExtension.builder() + .withClusterName("test-children") + .withNodes(1) + .withSsl(false) + .build() + .cluster(); + etcdCluster.start(); + + @Cleanup MetadataStore store = createStore(etcdCluster); + + store.put("/parent/child1", "v1".getBytes(StandardCharsets.UTF_8), Optional.empty()).join(); + store.put("/parent/child2", "v2".getBytes(StandardCharsets.UTF_8), Optional.empty()).join(); + store.put("/parent/child3", "v3".getBytes(StandardCharsets.UTF_8), Optional.empty()).join(); + + List children = store.getChildren("/parent").join(); + assertEquals(children.size(), 3); + assertTrue(children.contains("child1")); + assertTrue(children.contains("child2")); + assertTrue(children.contains("child3")); + } + + @Test + public void testCluster() throws Exception { + @Cleanup + EtcdCluster etcdCluster = + EtcdClusterExtension.builder() + .withClusterName("test-cluster") + .withNodes(3) + .withSsl(false) + .build() + .cluster(); + etcdCluster.start(); + + @Cleanup MetadataStore store = createStore(etcdCluster); + + store.put("/test", "value".getBytes(StandardCharsets.UTF_8), Optional.empty()).join(); + assertTrue(store.exists("/test").join()); + } + + @Test + public void testClusterWithTls() throws Exception { + @Cleanup + EtcdCluster etcdCluster = + EtcdClusterExtension.builder() + .withClusterName("test-cluster-tls") + .withNodes(3) + .withSsl(true) + .build() + .cluster(); + etcdCluster.start(); + + EtcdConfig etcdConfig = + EtcdConfig.builder() + .useTls(true) + .tlsProvider(null) + .authority("etcd0") + .tlsTrustCertsFilePath(Resources.getResource("ssl/cert/ca.pem").getPath()) + .tlsKeyFilePath(Resources.getResource("ssl/cert/client-key-pk8.pem").getPath()) + .tlsCertificateFilePath(Resources.getResource("ssl/cert/client.pem").getPath()) + .build(); + + Path etcdConfigPath = Files.createTempFile("etcd_config_cluster_ssl", ".yml"); + new ObjectMapper(new YAMLFactory()).writeValue(etcdConfigPath.toFile(), etcdConfig); + + String metadataURL = getMetadataUrl(etcdCluster); + + @Cleanup + MetadataStore store = + new EtcdMetadataStore( + metadataURL, + MetadataStoreConfig.builder().configFilePath(etcdConfigPath.toString()).build(), + true); + + store.put("/test", "value".getBytes(StandardCharsets.UTF_8), Optional.empty()).join(); + assertTrue(store.exists("/test").join()); + } +} diff --git a/pulsar-metadata-etcd-contrib/src/test/resources/ssl/cert/ca.pem b/pulsar-metadata-etcd-contrib/src/test/resources/ssl/cert/ca.pem new file mode 100644 index 0000000..34a8593 --- /dev/null +++ b/pulsar-metadata-etcd-contrib/src/test/resources/ssl/cert/ca.pem @@ -0,0 +1,18 @@ +-----BEGIN CERTIFICATE----- +MIIC6jCCAdKgAwIBAgIUXHRN1WrCXk813Zz/mWdg3YAu9OIwDQYJKoZIhvcNAQEL +BQAwDTELMAkGA1UEAxMCQ0EwHhcNMjIwNDI5MDQyMjAwWhcNMjcwNDI4MDQyMjAw +WjANMQswCQYDVQQDEwJDQTCCASIwDQYJKoZIhvcNAQEBBQADggEPADCCAQoCggEB +AMlF30dWtxQLGGo55GGAElYLzgWPY4ha1njX4vR2bAxesbOft73pqWeRWMPbJsyQ +cUcFe6+oPEyW11uFdNw647pN2PVUmX5Obsmlk97c69LmFe4WEfZDN0cm4pjdXG7V +BCyuyyr3YN2PoB67xT00NyNIK1yOIHghnw4dO5j2BZh/EqrQGNtqiW/LBQWiKu+n +bLbipl+eWmxCVl0BuQdgl8bmGGrHsncwCTZPMvsxHyVWbOKVoDXAuAqM40HJdjbK +JF/DyBPzmgsSMGmwcNTjRRx3ApRcFu4p154qdX2BPknut+PFOrDlcArdsExmJwST +qi2cNb7mWWF1FMHYKdGPuqUCAwEAAaNCMEAwDgYDVR0PAQH/BAQDAgEGMA8GA1Ud +EwEB/wQFMAMBAf8wHQYDVR0OBBYEFB0oynraPRsiMaCbp4VFErPtqn79MA0GCSqG +SIb3DQEBCwUAA4IBAQAsOdb6+QjABRa7LMIERRy45V1l3lVy04WzLUWEGOeakakE +I58NDopYY2NDfGPi/gPkErAo8Jo8ruRKPccguHlTYGTwjUBjKGBSySMpalw5DhN0 +iygHdKh9qzqL1ChqcQsQPlmhKgsAYkZyQzD2Gh6c/GpNMe6/NUmjlg0KcnKY3+Vx +kzy3qWXxiNfSywJlbk6UVWOy0cCxHV+fE0gzy+DgNrcWm6euSPAD4Qz8kXvQKevB +syTpCxqsKNKG+aVZDoW2BTyrROnnk/lqZQ7k4sKZTcYgTrpNSgylbg7rxVqOIyBk +G+jAW18vmOlsZpXE306Pqsng6csUJ3IXoHqaBbrm +-----END CERTIFICATE----- diff --git a/pulsar-metadata-etcd-contrib/src/test/resources/ssl/cert/client-key-pk8.pem b/pulsar-metadata-etcd-contrib/src/test/resources/ssl/cert/client-key-pk8.pem new file mode 100644 index 0000000..a7fec9b --- /dev/null +++ b/pulsar-metadata-etcd-contrib/src/test/resources/ssl/cert/client-key-pk8.pem @@ -0,0 +1,28 @@ +-----BEGIN PRIVATE KEY----- +MIIEvgIBADANBgkqhkiG9w0BAQEFAASCBKgwggSkAgEAAoIBAQC/RbW6dJqk2+BN +SZKAs1M0RQt6PrhIHHhxBBPXFM3XOqKwacrQihuZQSGLgaxfzXbRejwQ9VBg5Ia/ +tOokwDY3tK9Kp4J0bsqgNXZKTRYtnJ4JYh9566jsD47Qpj6xuQZmo3b1QKLEh+Fh +9buWTEvvdUY8AQTASwvOGAWyzrZgaKuagQQNkZ49TM8gtsy9uJPWS2PZQpW/qk56 +5jRck8Xb8COQ0QV2wPiqwmEtA7/jEeQViQ49HLdfBTi90Hh4Q8XBv8TPW+sQpN5R +5fvZEz+iwrGm20kIrtVQ+hMCkbywTlA5rSGW7onnakDyqybt10dBzn609/wg5Dee +LjMZ3dEvAgMBAAECggEAR1r5NcsEWhZQ8mRNDEhZ9PkBFCTL2NMON5M+15FCTVXp +lYiSCgRL0XuTyRiiNsdO2U0RlX3+83atsl0KsJUoZNW6Q06Eg21FmEj5jTR+3ps7 +9eIuPeylgxM2wy4R23lcIvQ+j7YCQvEyKrpitepWtcl5Xy8+F4Knr8YUciVdsk8T +a2DxkjXRXG6RW71Hqdd4bpXszM2E22trjKgouy/PV4LOhSCzyDdwAq4fKLBmS7l5 +2j57Ag0/hDZ7L4nW6jtRG1NBonBk7FhPrd4UCGqxaYwfTYvDWDKpQkJgbkWSYCwc +pUVAqreYi5+udKhL0dXsKrZGE9I20tQlZkaoBBgreQKBgQDAmu73weinuMp9nC6M +gSXQxuWL02ybEQFsZioP8oGSP+wZDCrSn1HD52iSSZqEoNLoYm0u8u3D9FbWoiw/ +n8BPqTLjLXUozhDdVuvg6YkKxHG2So7ve04UOXBQqCd/pOuNXRUUYUQ+FMq8vmjD +dRqVo1gH/qUExa4SeTzoOWUXawKBgQD+Onb9+ExPweZjy+T0eAO1FMFP//KsNwSI +vlf22jUbCqiu6Pem1m/31Rju04AGydWhdHgd+7PG8/arVyLcgNQtTotXtD7zTCVe +jkr42R/zbG46c0/z8brUXpYOHOjdgFbY62vwWjykulFsruf36hit9/D+tKVNOgep +tduFS8DSTQKBgC+oJmj3efHGL5RVCM+LRSgbjsDCV6Um2AtSXCYGAzmEx46LDC2B +bmHi6GUKAUm/4W/OquVrBpnt427IQdqcVKFhZE4B+XNXSaT61PKZ1mbrpJdOa3+m +KvOmIrxSXzOeQwp/da/NQW17B48cLh/u4d0Uxbt0rrA3mZLInOF5EiJxAoGBAPGS +GHOntsuq0gNOQZbTW6J7wF0GNk/ST6qoQ+m62u+BJ1xc3sZXyTlT8kcuDd9ldmve +wiyred64/1E8kVG50OPkWJ/UFGUXnALHbxIbLzMde3hrDjQdJIyb/DYY3mVriBrD +SWOwOyPEL474fE+k0CKvEP7WJKTHWXS364ozu1uZAoGBAMBXRwDQZgs+Xn3lTohu +1NLlHhNIuUzcb+4ROn2StoxZS3mEv2W0KWRTzNJGRr4+kbLMcY7K1jOk0qm6fwcG +HAJnKkxu9E3hFQVUT14CIKPwi3TQCNjmZqj+LaEwe+75w4rXTajwl3WY58KngA5/ +XVn2W3KsXAKOAteL/07xtDEi +-----END PRIVATE KEY----- diff --git a/pulsar-metadata-etcd-contrib/src/test/resources/ssl/cert/client.pem b/pulsar-metadata-etcd-contrib/src/test/resources/ssl/cert/client.pem new file mode 100644 index 0000000..6e3d149 --- /dev/null +++ b/pulsar-metadata-etcd-contrib/src/test/resources/ssl/cert/client.pem @@ -0,0 +1,20 @@ +-----BEGIN CERTIFICATE----- +MIIDPDCCAiSgAwIBAgIUdYkf8VBmJYG6HvKxh84BHYaTIH8wDQYJKoZIhvcNAQEL +BQAwDTELMAkGA1UEAxMCQ0EwIBcNMjIwNDI5MDQyMjAwWhgPMjEyMjA0MDUwNDIy +MDBaMBExDzANBgNVBAMTBmNsaWVudDCCASIwDQYJKoZIhvcNAQEBBQADggEPADCC +AQoCggEBAL9Ftbp0mqTb4E1JkoCzUzRFC3o+uEgceHEEE9cUzdc6orBpytCKG5lB +IYuBrF/NdtF6PBD1UGDkhr+06iTANje0r0qngnRuyqA1dkpNFi2cngliH3nrqOwP +jtCmPrG5BmajdvVAosSH4WH1u5ZMS+91RjwBBMBLC84YBbLOtmBoq5qBBA2Rnj1M +zyC2zL24k9ZLY9lClb+qTnrmNFyTxdvwI5DRBXbA+KrCYS0Dv+MR5BWJDj0ct18F +OL3QeHhDxcG/xM9b6xCk3lHl+9kTP6LCsabbSQiu1VD6EwKRvLBOUDmtIZbuiedq +QPKrJu3XR0HOfrT3/CDkN54uMxnd0S8CAwEAAaOBjTCBijAOBgNVHQ8BAf8EBAMC +BaAwHQYDVR0lBBYwFAYIKwYBBQUHAwEGCCsGAQUFBwMCMAwGA1UdEwEB/wQCMAAw +HQYDVR0OBBYEFB/UlCHqPmkA3xcYzddO9kD1w8o1MB8GA1UdIwQYMBaAFB0oynra +PRsiMaCbp4VFErPtqn79MAsGA1UdEQQEMAKCADANBgkqhkiG9w0BAQsFAAOCAQEA +AneFIIHBZ21J24+lln995ofX+92Yeu528IVy1WJtTGIpHVN6Fc+jZbsAZqzdhDDd +q9pKawKlDR2bW6mg7ItF2coYprMoLtHeFAwSUIg5WcMFUgGHxITFDQlscD2mR54Y +I1otVWegrL2PDyKs2uk5B4Jwm+O/0fbyG+D3FIje9y++gh6Oqi/uwn8YUgnLl/4e +Yf8POmMrUcOmJn4tXX4y6HtacNR3n0leby8T1dBShNAESuBdmEo8bmgAADh/brfP +kzGHBdO+AyHEs87TRMZ4ofhspM6m1ZNyazy861xhoYjUcjIoD4vaVsEGIRT9/2HM +4l8oq0OyarhZRLIpJ11SJQ== +-----END CERTIFICATE-----