() {
+ @Override
+ public void onNext(
+ LeaseKeepAliveResponse leaseKeepAliveResponse) {
+ if (log.isDebugEnabled()) {
+ log.debug("On next: {}", leaseKeepAliveResponse);
+ }
+ }
+
+ @Override
+ public void onError(Throwable throwable) {
+ log.warn("Lease client error :", throwable);
+ receivedSessionEvent(SessionEvent.SessionLost);
+ }
+
+ @Override
+ public void onCompleted() {
+ log.info("Etcd lease has expired");
+ receivedSessionEvent(SessionEvent.SessionLost);
+ }
+ });
+ }
+ });
+
+ if (retryOnFailure) {
+ future.exceptionally(
+ ex -> {
+ log.warn("Failed to create Etcd lease. Retrying later", ex);
+ scheduleDelayedTask(1, TimeUnit.SECONDS, () -> createLease(true));
+ return null;
+ });
+ }
+
+ return future;
+ }
+
+ private void handleWatchResponse(WatchResponse watchResponse) {
+ for (WatchEvent we : watchResponse.getEvents()) {
+ String path = we.getKeyValue().getKey().toString(StandardCharsets.UTF_8);
+ if (we.getEventType() == WatchEvent.EventType.PUT) {
+ if (we.getKeyValue().getVersion() == 1) {
+ receivedNotification(new Notification(NotificationType.Created, path));
+ notifyParentChildrenChanged(path);
+ } else {
+ receivedNotification(new Notification(NotificationType.Modified, path));
+ }
+ } else if (we.getEventType() == WatchEvent.EventType.DELETE) {
+ receivedNotification(new Notification(NotificationType.Deleted, path));
+ notifyParentChildrenChanged(path);
+ }
+ }
+ }
+
+ @Override
+ protected void receivedSessionEvent(SessionEvent event) {
+ if (event == SessionEvent.SessionReestablished) {
+ // Re-create the lease before notifying that we are reconnected
+ createLease(true).thenRun(() -> super.receivedSessionEvent(event));
+ } else {
+ super.receivedSessionEvent(event);
+ }
+ }
+}
+
+@AllArgsConstructor
+@NoArgsConstructor
+@Data
+@Builder
+class EtcdConfig {
+ private boolean useTls;
+
+ private SslProvider tlsProvider;
+ private String tlsTrustCertsFilePath;
+ private String tlsKeyFilePath;
+ private String tlsCertificateFilePath;
+
+ private String authority;
+}
diff --git a/pulsar-metadata-etcd-contrib/src/main/java/org/apache/pulsar/metadata/impl/EtcdMetadataStoreProvider.java b/pulsar-metadata-etcd-contrib/src/main/java/org/apache/pulsar/metadata/impl/EtcdMetadataStoreProvider.java
new file mode 100644
index 0000000..f3ec39a
--- /dev/null
+++ b/pulsar-metadata-etcd-contrib/src/main/java/org/apache/pulsar/metadata/impl/EtcdMetadataStoreProvider.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pulsar.metadata.impl;
+
+import org.apache.pulsar.metadata.api.MetadataStore;
+import org.apache.pulsar.metadata.api.MetadataStoreConfig;
+import org.apache.pulsar.metadata.api.MetadataStoreException;
+import org.apache.pulsar.metadata.api.MetadataStoreProvider;
+
+/**
+ * Etcd metadata store provider plugin.
+ *
+ * To use this provider, add the shaded jar to the Pulsar classpath and configure the system
+ * property:
+ *
+ *
+ * -Dpulsar.metadata.store.providers=org.apache.pulsar.metadata.impl.EtcdMetadataStoreProvider
+ *
+ *
+ * Then use {@code etcd:} URLs for your metadata store configuration.
+ */
+public class EtcdMetadataStoreProvider implements MetadataStoreProvider {
+
+ @Override
+ public String urlScheme() {
+ return EtcdMetadataStore.ETCD_SCHEME;
+ }
+
+ @Override
+ public MetadataStore create(
+ String metadataURL, MetadataStoreConfig metadataStoreConfig, boolean enableSessionWatcher)
+ throws MetadataStoreException {
+ return new EtcdMetadataStore(metadataURL, metadataStoreConfig, enableSessionWatcher);
+ }
+}
diff --git a/pulsar-metadata-etcd-contrib/src/main/java/org/apache/pulsar/metadata/impl/EtcdSessionWatcher.java b/pulsar-metadata-etcd-contrib/src/main/java/org/apache/pulsar/metadata/impl/EtcdSessionWatcher.java
new file mode 100644
index 0000000..20b237f
--- /dev/null
+++ b/pulsar-metadata-etcd-contrib/src/main/java/org/apache/pulsar/metadata/impl/EtcdSessionWatcher.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pulsar.metadata.impl;
+
+import static org.apache.pulsar.common.util.Runnables.catchingAndLoggingThrowables;
+import io.etcd.jetcd.ByteSequence;
+import io.etcd.jetcd.Client;
+import io.netty.util.concurrent.DefaultThreadFactory;
+import java.nio.charset.StandardCharsets;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executors;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.function.Consumer;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.metadata.api.extended.SessionEvent;
+
+/** Monitor the Etcd session state every few seconds and send notifications. */
+@Slf4j
+public class EtcdSessionWatcher implements AutoCloseable {
+ private final Client client;
+
+ private SessionEvent currentStatus;
+ private final Consumer sessionListener;
+
+ // Maximum time to wait for Etcd lease to be re-connected to quorum (set to 5/6 of
+ // SessionTimeout)
+ private final long monitorTimeoutMillis;
+
+ // Interval at which we check the state of the Etcd connection (set to 1/15 of SessionTimeout)
+ private final long tickTimeMillis;
+
+ private final ScheduledExecutorService scheduler;
+ private final ScheduledFuture> task;
+
+ private long disconnectedAt = 0;
+
+ public EtcdSessionWatcher(
+ Client client, long sessionTimeoutMillis, Consumer sessionListener) {
+ this.client = client;
+ this.monitorTimeoutMillis = sessionTimeoutMillis * 5 / 6;
+ this.tickTimeMillis = sessionTimeoutMillis / 15;
+ this.sessionListener = sessionListener;
+
+ this.scheduler =
+ Executors.newSingleThreadScheduledExecutor(
+ new DefaultThreadFactory("metadata-store-etcd-session-watcher"));
+ this.task =
+ scheduler.scheduleAtFixedRate(
+ catchingAndLoggingThrowables(this::checkConnectionStatus),
+ tickTimeMillis,
+ tickTimeMillis,
+ TimeUnit.MILLISECONDS);
+ this.currentStatus = SessionEvent.SessionReestablished;
+ }
+
+ @Override
+ public void close() throws Exception {
+ task.cancel(true);
+ scheduler.shutdownNow();
+ scheduler.awaitTermination(10, TimeUnit.SECONDS);
+ }
+
+ // task that runs every TICK_TIME to check Etcd connection
+ private synchronized void checkConnectionStatus() {
+ try {
+ CompletableFuture future = new CompletableFuture<>();
+ client
+ .getKVClient()
+ .get(ByteSequence.from("/".getBytes(StandardCharsets.UTF_8)))
+ .thenRun(
+ () -> {
+ future.complete(SessionEvent.Reconnected);
+ })
+ .exceptionally(
+ ex -> {
+ future.complete(SessionEvent.ConnectionLost);
+ return null;
+ });
+
+ SessionEvent etcdClientState;
+ try {
+ etcdClientState = future.get(tickTimeMillis, TimeUnit.MILLISECONDS);
+ } catch (TimeoutException e) {
+ // Consider etcd disconnection if etcd operation takes more than TICK_TIME
+ etcdClientState = SessionEvent.ConnectionLost;
+ }
+
+ checkState(etcdClientState);
+ } catch (RejectedExecutionException | InterruptedException e) {
+ task.cancel(true);
+ } catch (Throwable t) {
+ log.warn("Error while checking Etcd connection status", t);
+ }
+ }
+
+ synchronized void setSessionInvalid() {
+ currentStatus = SessionEvent.SessionLost;
+ }
+
+ private void checkState(SessionEvent etcdClientState) {
+ switch (etcdClientState) {
+ case SessionLost:
+ if (currentStatus != SessionEvent.SessionLost) {
+ log.error("Etcd lease has expired");
+ currentStatus = SessionEvent.SessionLost;
+ sessionListener.accept(currentStatus);
+ }
+ break;
+
+ case ConnectionLost:
+ if (disconnectedAt == 0) {
+ disconnectedAt = System.nanoTime();
+ }
+
+ long timeRemainingMillis =
+ monitorTimeoutMillis
+ - TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - disconnectedAt);
+ if (timeRemainingMillis <= 0 && currentStatus != SessionEvent.SessionLost) {
+ log.error("Etcd lease keep-alive timeout. Notifying session is lost.");
+ currentStatus = SessionEvent.SessionLost;
+ sessionListener.accept(currentStatus);
+ } else if (currentStatus != SessionEvent.SessionLost) {
+ log.warn(
+ "Etcd client is disconnected. Waiting to reconnect, time remaining = {} seconds",
+ timeRemainingMillis / 1000.0);
+ if (currentStatus == SessionEvent.SessionReestablished) {
+ currentStatus = SessionEvent.ConnectionLost;
+ sessionListener.accept(currentStatus);
+ }
+ }
+ break;
+
+ default:
+ if (currentStatus != SessionEvent.SessionReestablished) {
+ log.info(
+ "Etcd client reconnection with server quorum. Current status: {}", currentStatus);
+ disconnectedAt = 0;
+
+ sessionListener.accept(SessionEvent.Reconnected);
+ if (currentStatus == SessionEvent.SessionLost) {
+ sessionListener.accept(SessionEvent.SessionReestablished);
+ }
+ currentStatus = SessionEvent.SessionReestablished;
+ }
+ break;
+ }
+ }
+}
diff --git a/pulsar-metadata-etcd-contrib/src/test/java/org/apache/pulsar/metadata/impl/EtcdMetadataStoreTest.java b/pulsar-metadata-etcd-contrib/src/test/java/org/apache/pulsar/metadata/impl/EtcdMetadataStoreTest.java
new file mode 100644
index 0000000..ddb8714
--- /dev/null
+++ b/pulsar-metadata-etcd-contrib/src/test/java/org/apache/pulsar/metadata/impl/EtcdMetadataStoreTest.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pulsar.metadata.impl;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
+import com.google.common.io.Resources;
+import io.etcd.jetcd.launcher.EtcdCluster;
+import io.etcd.jetcd.test.EtcdClusterExtension;
+import java.net.URI;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.List;
+import java.util.Optional;
+import java.util.stream.Collectors;
+import lombok.Cleanup;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.metadata.api.GetResult;
+import org.apache.pulsar.metadata.api.MetadataStore;
+import org.apache.pulsar.metadata.api.MetadataStoreConfig;
+import org.apache.pulsar.metadata.api.MetadataStoreException;
+import org.testng.annotations.Test;
+
+@Slf4j
+public class EtcdMetadataStoreTest {
+
+ private String getMetadataUrl(EtcdCluster etcdCluster) {
+ return "etcd:"
+ + etcdCluster.clientEndpoints().stream()
+ .map(URI::toString)
+ .collect(Collectors.joining(","));
+ }
+
+ private MetadataStore createStore(EtcdCluster etcdCluster) throws MetadataStoreException {
+ String metadataURL = getMetadataUrl(etcdCluster);
+ return new EtcdMetadataStore(metadataURL, MetadataStoreConfig.builder().build(), true);
+ }
+
+ @Test
+ public void testBasicOperations() throws Exception {
+ @Cleanup
+ EtcdCluster etcdCluster =
+ EtcdClusterExtension.builder()
+ .withClusterName("test-basic")
+ .withNodes(1)
+ .withSsl(false)
+ .build()
+ .cluster();
+ etcdCluster.start();
+
+ @Cleanup MetadataStore store = createStore(etcdCluster);
+
+ // Test put and get
+ store.put("/test", "value".getBytes(StandardCharsets.UTF_8), Optional.empty()).join();
+ assertTrue(store.exists("/test").join());
+
+ Optional result = store.get("/test").join();
+ assertTrue(result.isPresent());
+ assertEquals(new String(result.get().getValue(), StandardCharsets.UTF_8), "value");
+
+ // Test update
+ store
+ .put(
+ "/test",
+ "value2".getBytes(StandardCharsets.UTF_8),
+ Optional.of(result.get().getStat().getVersion()))
+ .join();
+ result = store.get("/test").join();
+ assertTrue(result.isPresent());
+ assertEquals(new String(result.get().getValue(), StandardCharsets.UTF_8), "value2");
+
+ // Test delete
+ store.delete("/test", Optional.empty()).join();
+ assertFalse(store.exists("/test").join());
+ }
+
+ @Test
+ public void testGetChildren() throws Exception {
+ @Cleanup
+ EtcdCluster etcdCluster =
+ EtcdClusterExtension.builder()
+ .withClusterName("test-children")
+ .withNodes(1)
+ .withSsl(false)
+ .build()
+ .cluster();
+ etcdCluster.start();
+
+ @Cleanup MetadataStore store = createStore(etcdCluster);
+
+ store.put("/parent/child1", "v1".getBytes(StandardCharsets.UTF_8), Optional.empty()).join();
+ store.put("/parent/child2", "v2".getBytes(StandardCharsets.UTF_8), Optional.empty()).join();
+ store.put("/parent/child3", "v3".getBytes(StandardCharsets.UTF_8), Optional.empty()).join();
+
+ List children = store.getChildren("/parent").join();
+ assertEquals(children.size(), 3);
+ assertTrue(children.contains("child1"));
+ assertTrue(children.contains("child2"));
+ assertTrue(children.contains("child3"));
+ }
+
+ @Test
+ public void testCluster() throws Exception {
+ @Cleanup
+ EtcdCluster etcdCluster =
+ EtcdClusterExtension.builder()
+ .withClusterName("test-cluster")
+ .withNodes(3)
+ .withSsl(false)
+ .build()
+ .cluster();
+ etcdCluster.start();
+
+ @Cleanup MetadataStore store = createStore(etcdCluster);
+
+ store.put("/test", "value".getBytes(StandardCharsets.UTF_8), Optional.empty()).join();
+ assertTrue(store.exists("/test").join());
+ }
+
+ @Test
+ public void testClusterWithTls() throws Exception {
+ @Cleanup
+ EtcdCluster etcdCluster =
+ EtcdClusterExtension.builder()
+ .withClusterName("test-cluster-tls")
+ .withNodes(3)
+ .withSsl(true)
+ .build()
+ .cluster();
+ etcdCluster.start();
+
+ EtcdConfig etcdConfig =
+ EtcdConfig.builder()
+ .useTls(true)
+ .tlsProvider(null)
+ .authority("etcd0")
+ .tlsTrustCertsFilePath(Resources.getResource("ssl/cert/ca.pem").getPath())
+ .tlsKeyFilePath(Resources.getResource("ssl/cert/client-key-pk8.pem").getPath())
+ .tlsCertificateFilePath(Resources.getResource("ssl/cert/client.pem").getPath())
+ .build();
+
+ Path etcdConfigPath = Files.createTempFile("etcd_config_cluster_ssl", ".yml");
+ new ObjectMapper(new YAMLFactory()).writeValue(etcdConfigPath.toFile(), etcdConfig);
+
+ String metadataURL = getMetadataUrl(etcdCluster);
+
+ @Cleanup
+ MetadataStore store =
+ new EtcdMetadataStore(
+ metadataURL,
+ MetadataStoreConfig.builder().configFilePath(etcdConfigPath.toString()).build(),
+ true);
+
+ store.put("/test", "value".getBytes(StandardCharsets.UTF_8), Optional.empty()).join();
+ assertTrue(store.exists("/test").join());
+ }
+}
diff --git a/pulsar-metadata-etcd-contrib/src/test/resources/ssl/cert/ca.pem b/pulsar-metadata-etcd-contrib/src/test/resources/ssl/cert/ca.pem
new file mode 100644
index 0000000..34a8593
--- /dev/null
+++ b/pulsar-metadata-etcd-contrib/src/test/resources/ssl/cert/ca.pem
@@ -0,0 +1,18 @@
+-----BEGIN CERTIFICATE-----
+MIIC6jCCAdKgAwIBAgIUXHRN1WrCXk813Zz/mWdg3YAu9OIwDQYJKoZIhvcNAQEL
+BQAwDTELMAkGA1UEAxMCQ0EwHhcNMjIwNDI5MDQyMjAwWhcNMjcwNDI4MDQyMjAw
+WjANMQswCQYDVQQDEwJDQTCCASIwDQYJKoZIhvcNAQEBBQADggEPADCCAQoCggEB
+AMlF30dWtxQLGGo55GGAElYLzgWPY4ha1njX4vR2bAxesbOft73pqWeRWMPbJsyQ
+cUcFe6+oPEyW11uFdNw647pN2PVUmX5Obsmlk97c69LmFe4WEfZDN0cm4pjdXG7V
+BCyuyyr3YN2PoB67xT00NyNIK1yOIHghnw4dO5j2BZh/EqrQGNtqiW/LBQWiKu+n
+bLbipl+eWmxCVl0BuQdgl8bmGGrHsncwCTZPMvsxHyVWbOKVoDXAuAqM40HJdjbK
+JF/DyBPzmgsSMGmwcNTjRRx3ApRcFu4p154qdX2BPknut+PFOrDlcArdsExmJwST
+qi2cNb7mWWF1FMHYKdGPuqUCAwEAAaNCMEAwDgYDVR0PAQH/BAQDAgEGMA8GA1Ud
+EwEB/wQFMAMBAf8wHQYDVR0OBBYEFB0oynraPRsiMaCbp4VFErPtqn79MA0GCSqG
+SIb3DQEBCwUAA4IBAQAsOdb6+QjABRa7LMIERRy45V1l3lVy04WzLUWEGOeakakE
+I58NDopYY2NDfGPi/gPkErAo8Jo8ruRKPccguHlTYGTwjUBjKGBSySMpalw5DhN0
+iygHdKh9qzqL1ChqcQsQPlmhKgsAYkZyQzD2Gh6c/GpNMe6/NUmjlg0KcnKY3+Vx
+kzy3qWXxiNfSywJlbk6UVWOy0cCxHV+fE0gzy+DgNrcWm6euSPAD4Qz8kXvQKevB
+syTpCxqsKNKG+aVZDoW2BTyrROnnk/lqZQ7k4sKZTcYgTrpNSgylbg7rxVqOIyBk
+G+jAW18vmOlsZpXE306Pqsng6csUJ3IXoHqaBbrm
+-----END CERTIFICATE-----
diff --git a/pulsar-metadata-etcd-contrib/src/test/resources/ssl/cert/client-key-pk8.pem b/pulsar-metadata-etcd-contrib/src/test/resources/ssl/cert/client-key-pk8.pem
new file mode 100644
index 0000000..a7fec9b
--- /dev/null
+++ b/pulsar-metadata-etcd-contrib/src/test/resources/ssl/cert/client-key-pk8.pem
@@ -0,0 +1,28 @@
+-----BEGIN PRIVATE KEY-----
+MIIEvgIBADANBgkqhkiG9w0BAQEFAASCBKgwggSkAgEAAoIBAQC/RbW6dJqk2+BN
+SZKAs1M0RQt6PrhIHHhxBBPXFM3XOqKwacrQihuZQSGLgaxfzXbRejwQ9VBg5Ia/
+tOokwDY3tK9Kp4J0bsqgNXZKTRYtnJ4JYh9566jsD47Qpj6xuQZmo3b1QKLEh+Fh
+9buWTEvvdUY8AQTASwvOGAWyzrZgaKuagQQNkZ49TM8gtsy9uJPWS2PZQpW/qk56
+5jRck8Xb8COQ0QV2wPiqwmEtA7/jEeQViQ49HLdfBTi90Hh4Q8XBv8TPW+sQpN5R
+5fvZEz+iwrGm20kIrtVQ+hMCkbywTlA5rSGW7onnakDyqybt10dBzn609/wg5Dee
+LjMZ3dEvAgMBAAECggEAR1r5NcsEWhZQ8mRNDEhZ9PkBFCTL2NMON5M+15FCTVXp
+lYiSCgRL0XuTyRiiNsdO2U0RlX3+83atsl0KsJUoZNW6Q06Eg21FmEj5jTR+3ps7
+9eIuPeylgxM2wy4R23lcIvQ+j7YCQvEyKrpitepWtcl5Xy8+F4Knr8YUciVdsk8T
+a2DxkjXRXG6RW71Hqdd4bpXszM2E22trjKgouy/PV4LOhSCzyDdwAq4fKLBmS7l5
+2j57Ag0/hDZ7L4nW6jtRG1NBonBk7FhPrd4UCGqxaYwfTYvDWDKpQkJgbkWSYCwc
+pUVAqreYi5+udKhL0dXsKrZGE9I20tQlZkaoBBgreQKBgQDAmu73weinuMp9nC6M
+gSXQxuWL02ybEQFsZioP8oGSP+wZDCrSn1HD52iSSZqEoNLoYm0u8u3D9FbWoiw/
+n8BPqTLjLXUozhDdVuvg6YkKxHG2So7ve04UOXBQqCd/pOuNXRUUYUQ+FMq8vmjD
+dRqVo1gH/qUExa4SeTzoOWUXawKBgQD+Onb9+ExPweZjy+T0eAO1FMFP//KsNwSI
+vlf22jUbCqiu6Pem1m/31Rju04AGydWhdHgd+7PG8/arVyLcgNQtTotXtD7zTCVe
+jkr42R/zbG46c0/z8brUXpYOHOjdgFbY62vwWjykulFsruf36hit9/D+tKVNOgep
+tduFS8DSTQKBgC+oJmj3efHGL5RVCM+LRSgbjsDCV6Um2AtSXCYGAzmEx46LDC2B
+bmHi6GUKAUm/4W/OquVrBpnt427IQdqcVKFhZE4B+XNXSaT61PKZ1mbrpJdOa3+m
+KvOmIrxSXzOeQwp/da/NQW17B48cLh/u4d0Uxbt0rrA3mZLInOF5EiJxAoGBAPGS
+GHOntsuq0gNOQZbTW6J7wF0GNk/ST6qoQ+m62u+BJ1xc3sZXyTlT8kcuDd9ldmve
+wiyred64/1E8kVG50OPkWJ/UFGUXnALHbxIbLzMde3hrDjQdJIyb/DYY3mVriBrD
+SWOwOyPEL474fE+k0CKvEP7WJKTHWXS364ozu1uZAoGBAMBXRwDQZgs+Xn3lTohu
+1NLlHhNIuUzcb+4ROn2StoxZS3mEv2W0KWRTzNJGRr4+kbLMcY7K1jOk0qm6fwcG
+HAJnKkxu9E3hFQVUT14CIKPwi3TQCNjmZqj+LaEwe+75w4rXTajwl3WY58KngA5/
+XVn2W3KsXAKOAteL/07xtDEi
+-----END PRIVATE KEY-----
diff --git a/pulsar-metadata-etcd-contrib/src/test/resources/ssl/cert/client.pem b/pulsar-metadata-etcd-contrib/src/test/resources/ssl/cert/client.pem
new file mode 100644
index 0000000..6e3d149
--- /dev/null
+++ b/pulsar-metadata-etcd-contrib/src/test/resources/ssl/cert/client.pem
@@ -0,0 +1,20 @@
+-----BEGIN CERTIFICATE-----
+MIIDPDCCAiSgAwIBAgIUdYkf8VBmJYG6HvKxh84BHYaTIH8wDQYJKoZIhvcNAQEL
+BQAwDTELMAkGA1UEAxMCQ0EwIBcNMjIwNDI5MDQyMjAwWhgPMjEyMjA0MDUwNDIy
+MDBaMBExDzANBgNVBAMTBmNsaWVudDCCASIwDQYJKoZIhvcNAQEBBQADggEPADCC
+AQoCggEBAL9Ftbp0mqTb4E1JkoCzUzRFC3o+uEgceHEEE9cUzdc6orBpytCKG5lB
+IYuBrF/NdtF6PBD1UGDkhr+06iTANje0r0qngnRuyqA1dkpNFi2cngliH3nrqOwP
+jtCmPrG5BmajdvVAosSH4WH1u5ZMS+91RjwBBMBLC84YBbLOtmBoq5qBBA2Rnj1M
+zyC2zL24k9ZLY9lClb+qTnrmNFyTxdvwI5DRBXbA+KrCYS0Dv+MR5BWJDj0ct18F
+OL3QeHhDxcG/xM9b6xCk3lHl+9kTP6LCsabbSQiu1VD6EwKRvLBOUDmtIZbuiedq
+QPKrJu3XR0HOfrT3/CDkN54uMxnd0S8CAwEAAaOBjTCBijAOBgNVHQ8BAf8EBAMC
+BaAwHQYDVR0lBBYwFAYIKwYBBQUHAwEGCCsGAQUFBwMCMAwGA1UdEwEB/wQCMAAw
+HQYDVR0OBBYEFB/UlCHqPmkA3xcYzddO9kD1w8o1MB8GA1UdIwQYMBaAFB0oynra
+PRsiMaCbp4VFErPtqn79MAsGA1UdEQQEMAKCADANBgkqhkiG9w0BAQsFAAOCAQEA
+AneFIIHBZ21J24+lln995ofX+92Yeu528IVy1WJtTGIpHVN6Fc+jZbsAZqzdhDDd
+q9pKawKlDR2bW6mg7ItF2coYprMoLtHeFAwSUIg5WcMFUgGHxITFDQlscD2mR54Y
+I1otVWegrL2PDyKs2uk5B4Jwm+O/0fbyG+D3FIje9y++gh6Oqi/uwn8YUgnLl/4e
+Yf8POmMrUcOmJn4tXX4y6HtacNR3n0leby8T1dBShNAESuBdmEo8bmgAADh/brfP
+kzGHBdO+AyHEs87TRMZ4ofhspM6m1ZNyazy861xhoYjUcjIoD4vaVsEGIRT9/2HM
+4l8oq0OyarhZRLIpJ11SJQ==
+-----END CERTIFICATE-----