Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.io.IOException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import lombok.CustomLog;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.LedgerHandle;
Expand All @@ -51,22 +52,20 @@
import org.apache.commons.cli.PosixParser;
import org.apache.commons.lang3.SystemUtils;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* A utility class used for benchmarking the performance of bookies.
*/
@CustomLog
public class BenchBookie {
static final Logger LOG = LoggerFactory.getLogger(BenchBookie.class);

static class LatencyCallback implements WriteCallback {
boolean complete;
@Override
public synchronized void writeComplete(int rc, long ledgerId, long entryId,
BookieId addr, Object ctx) {
if (rc != 0) {
LOG.error("Got error " + rc);
log.error().attr("rc", rc).log("Got error");
}
complete = true;
notifyAll();
Expand All @@ -88,7 +87,7 @@ static class ThroughputCallback implements WriteCallback {
public synchronized void writeComplete(int rc, long ledgerId, long entryId,
BookieId addr, Object ctx) {
if (rc != 0) {
LOG.error("Got error " + rc);
log.error().attr("rc", rc).log("Got error");
}
count++;
if (count >= waitingCount) {
Expand Down Expand Up @@ -163,7 +162,9 @@ public static void main(String[] args)
try {
eventLoop = new EpollEventLoopGroup();
} catch (Throwable t) {
LOG.warn("Could not use Netty Epoll event loop for benchmark {}", t.getMessage());
log.warn()
.attr("exceptionMessage", t.getMessage())
.log("Could not use Netty Epoll event loop for benchmark");
eventLoop = new NioEventLoopGroup();
}
} else {
Expand Down Expand Up @@ -196,11 +197,11 @@ public static void main(String[] args)
entry, ByteBufList.get(toSend), tc, null, BookieProtocol.FLAG_NONE,
false, WriteFlag.NONE);
}
LOG.info("Waiting for warmup");
log.info("Waiting for warmup");
tc.waitFor(warmUpCount);

ledger = getValidLedgerId(servers);
LOG.info("Benchmarking latency");
log.info("Benchmarking latency");
long startTime = System.nanoTime();
for (long entry = 0; entry < latencyCount; entry++) {
ByteBuf toSend = Unpooled.buffer(size);
Expand All @@ -216,10 +217,12 @@ public static void main(String[] args)
lc.waitForComplete();
}
long endTime = System.nanoTime();
LOG.info("Latency: " + (((double) (endTime - startTime)) / ((double) latencyCount)) / 1000000.0);
log.info()
.attr("latencyMs", (((double) (endTime - startTime)) / ((double) latencyCount)) / 1000000.0)
.log("Latency");

ledger = getValidLedgerId(servers);
LOG.info("Benchmarking throughput");
log.info("Benchmarking throughput");
startTime = System.currentTimeMillis();
tc = new ThroughputCallback();
for (long entry = 0; entry < throughputCount; entry++) {
Expand All @@ -235,7 +238,9 @@ public static void main(String[] args)
}
tc.waitFor(throughputCount);
endTime = System.currentTimeMillis();
LOG.info("Throughput: " + ((long) throughputCount) * 1000 / (endTime - startTime));
log.info()
.attr("throughput", ((long) throughputCount) * 1000 / (endTime - startTime))
.log("Throughput");

bc.close();
scheduler.shutdown();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import lombok.CustomLog;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.LedgerEntry;
import org.apache.bookkeeper.client.LedgerHandle;
Expand All @@ -45,14 +46,12 @@
import org.apache.commons.cli.PosixParser;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* A benchmark that benchmarks the read throughput and latency.
*/
@CustomLog
public class BenchReadThroughputLatency {
static final Logger LOG = LoggerFactory.getLogger(BenchReadThroughputLatency.class);

private static final Pattern LEDGER_PATTERN = Pattern.compile("L([0-9]+)$");

Expand All @@ -75,7 +74,7 @@ public int compare(String o1, String o2) {
};

private static void readLedger(ClientConfiguration conf, long ledgerId, byte[] passwd, int batchEntries) {
LOG.info("Reading ledger {}", ledgerId);
log.info().attr("ledgerId", ledgerId).log("Reading ledger");
BookKeeper bk = null;
long time = 0;
long entriesRead = 0;
Expand Down Expand Up @@ -117,7 +116,10 @@ private static void readLedger(ClientConfiguration conf, long ledgerId, byte[] p
entriesRead++;
lastRead = e.getEntryId();
if ((entriesRead % 10000) == 0) {
LOG.info("{} entries read from ledger {}", entriesRead, ledgerId);
log.info()
.attr("entriesRead", entriesRead)
.attr("ledgerId", ledgerId)
.log("entries read from ledger");
}
e.getEntryBuffer().release();
}
Expand All @@ -132,9 +134,12 @@ private static void readLedger(ClientConfiguration conf, long ledgerId, byte[] p
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
} catch (Exception e) {
LOG.error("Exception in reader", e);
log.error().exception(e).log("Exception in reader");
} finally {
LOG.info("Read {} in {}ms", entriesRead, time / 1000 / 1000);
log.info()
.attr("entriesRead", entriesRead)
.attr("timeMs", time / 1000 / 1000)
.log("Read complete");

try {
if (lh != null) {
Expand All @@ -144,7 +149,7 @@ private static void readLedger(ClientConfiguration conf, long ledgerId, byte[] p
bk.close();
}
} catch (Exception e) {
LOG.error("Exception closing stuff", e);
log.error().exception(e).log("Exception closing stuff");
}
}
}
Expand Down Expand Up @@ -185,7 +190,7 @@ public static void main(String[] args) throws Exception {
final int sockTimeout = Integer.parseInt(cmd.getOptionValue("sockettimeout", "5"));
final int batchentries = Integer.parseInt(cmd.getOptionValue("batchentries", "1000"));
if (cmd.hasOption("ledger") && cmd.hasOption("listen")) {
LOG.error("Cannot used -ledger and -listen together");
log.error("Cannot used -ledger and -listen together");
usage(options);
System.exit(-1);
}
Expand All @@ -197,7 +202,7 @@ public static void main(String[] args) throws Exception {
} else if (cmd.hasOption("listen")) {
numLedgers.set(Integer.parseInt(cmd.getOptionValue("listen")));
} else {
LOG.error("You must use -ledger or -listen");
log.error("You must use -ledger or -listen");
usage(options);
System.exit(-1);
}
Expand All @@ -215,7 +220,9 @@ public static void main(String[] args) throws Exception {
} else if ("longHierarchical".equals(ledgerManagerType)) {
nodepath = String.format("/ledgers%s", StringUtils.getLongHierarchicalLedgerPath(ledger.get()));
} else {
LOG.warn("Unknown ledger manager type: {}, use flat as the value", ledgerManagerType);
log.warn()
.attr("ledgerManagerType", ledgerManagerType)
.log("Unknown ledger manager type, use flat as the value");
nodepath = String.format("/ledgers/L%010d", ledger.get());
}

Expand Down Expand Up @@ -271,15 +278,17 @@ public void run() {
shutdownLatch.countDown();
}
} else {
LOG.error("Cant file ledger id in {}", ledger);
log.error()
.attr("ledger", ledger)
.log("Cant file ledger id");
}
}
}
} else {
LOG.warn("Unknown event {}", event);
log.warn().attr("event", event).log("Unknown event");
}
} catch (Exception e) {
LOG.error("Exception in watcher", e);
log.error().exception(e).log("Exception in watcher");
}
}
});
Expand All @@ -289,13 +298,13 @@ public void run() {
readLedger(conf, ledger.get(), passwd, batchentries);
shutdownLatch.countDown();
} else {
LOG.info("Watching for creation of" + nodepath);
log.info().attr("nodepath", nodepath).log("Watching for creation of znode");
}
} else {
zk.getChildren("/ledgers", true);
}
shutdownLatch.await();
LOG.info("Shutting down");
log.info("Shutting down");
}
}
}
Loading
Loading