diff --git a/ethereumj-core/src/main/java/org/ethereum/datasource/Serializers.java b/ethereumj-core/src/main/java/org/ethereum/datasource/Serializers.java index 72359e73fb..2e66372a13 100644 --- a/ethereumj-core/src/main/java/org/ethereum/datasource/Serializers.java +++ b/ethereumj-core/src/main/java/org/ethereum/datasource/Serializers.java @@ -96,12 +96,12 @@ public DataWord deserialize(byte[] stream) { public final static Serializer TrieNodeSerializer = new Serializer() { @Override public byte[] serialize(Value object) { - return object.encode(); + return object.asBytes(); } @Override public Value deserialize(byte[] stream) { - return Value.fromRlpEncoded(stream); + return new Value(stream); } }; diff --git a/ethereumj-core/src/main/java/org/ethereum/db/migrate/MigrateHeaderSourceTotalDiff.java b/ethereumj-core/src/main/java/org/ethereum/db/migrate/MigrateHeaderSourceTotalDiff.java index e0f7776de9..893184023d 100644 --- a/ethereumj-core/src/main/java/org/ethereum/db/migrate/MigrateHeaderSourceTotalDiff.java +++ b/ethereumj-core/src/main/java/org/ethereum/db/migrate/MigrateHeaderSourceTotalDiff.java @@ -43,13 +43,13 @@ /** * @deprecated * TODO: Remove after a few versions (current: 1.7.3) or with DB version update + * TODO: Make {@link FastSyncManager#removeHeadersDb(Logger)} private after removing * Also remove CommonConfig.headerSource with it as no more used * * - Repairs Headers DB after FastSync with skipHistory to be usable * a) Updates incorrect total difficulty * b) Migrates headers without index to usable scheme with index * - Removes headers DB otherwise as it's not needed - * TODO: move DB removal to main logic. Not done yet to prevent any conflicts */ @Deprecated public class MigrateHeaderSourceTotalDiff implements Runnable { @@ -85,19 +85,12 @@ public void run() { } logger.info("Fast Sync was used. Checking if migration required."); - if (blockStore.getBestBlock().getNumber() > 0 && - blockStore.getChainBlockByNumber(1) != null) { - // Everything is cool but maybe we could remove unused DB? - Path headersDbPath = Paths.get(systemProperties.databaseDir(), "headers"); - if (Files.exists(headersDbPath)) { - logger.info("Headers DB was used during FastSync but not required any more. Removing."); - FileUtil.recursiveDelete(headersDbPath.toString()); - logger.info("Headers DB removed. Migration is over"); - } else { - logger.info("No migration required."); - return; - } - } else if (blockStore.getBestBlock().getNumber() > 0) { + boolean dbRemoved = fastSyncManager.removeHeadersDb(logger); + if (dbRemoved) { + logger.info("Migration finished."); + return; + } + if (blockStore.getBestBlock().getNumber() > 0 && blockStore.getChainBlockByNumber(1) == null) { // Maybe migration of headerStore and totalDifficulty is required? HeaderStore headerStore = ctx.getBean(HeaderStore.class); if (headerStore.getHeaderByNumber(1) != null) { @@ -138,6 +131,8 @@ public void run() { flushManager.commit(); flushManager.flush(); logger.info("headerStore migration finished. No more migrations required"); + } else { + logger.info("No migration required."); } } } diff --git a/ethereumj-core/src/main/java/org/ethereum/net/eth/handler/Eth62.java b/ethereumj-core/src/main/java/org/ethereum/net/eth/handler/Eth62.java index a87959bc65..d0deb570cb 100644 --- a/ethereumj-core/src/main/java/org/ethereum/net/eth/handler/Eth62.java +++ b/ethereumj-core/src/main/java/org/ethereum/net/eth/handler/Eth62.java @@ -878,11 +878,12 @@ public String getSyncStats() { int waitResp = lastReqSentTime > 0 ? (int) (System.currentTimeMillis() - lastReqSentTime) / 1000 : 0; long lifeTime = System.currentTimeMillis() - connectedTime; return String.format( - "Peer %s: [ %s, %18s, ping %6s ms, difficulty %s, best block %s%s]: (idle %s of %s) %s", + "Peer %s: [ %s, %18s, ping %6s ms, rep: %s, difficulty %s, best block %s%s]: (idle %s of %s) %s", getVersion(), channel.getPeerIdShort(), peerState, (int)channel.getPeerStats().getAvgLatency(), + channel.getNodeStatistics().getReputation(), getTotalDifficulty(), getBestKnownBlock().getNumber(), waitResp > 5 ? ", wait " + waitResp + "s" : " ", diff --git a/ethereumj-core/src/main/java/org/ethereum/net/eth/handler/Eth63.java b/ethereumj-core/src/main/java/org/ethereum/net/eth/handler/Eth63.java index 8d84cd0f0b..acde14d363 100644 --- a/ethereumj-core/src/main/java/org/ethereum/net/eth/handler/Eth63.java +++ b/ethereumj-core/src/main/java/org/ethereum/net/eth/handler/Eth63.java @@ -33,6 +33,7 @@ import org.ethereum.net.eth.message.NodeDataMessage; import org.ethereum.net.eth.message.ReceiptsMessage; +import org.ethereum.net.message.ReasonCode; import org.ethereum.sync.PeerState; import org.ethereum.util.ByteArraySet; import org.ethereum.util.Value; @@ -45,6 +46,7 @@ import java.util.List; import java.util.Set; +import static org.ethereum.crypto.HashUtil.sha3; import static org.ethereum.net.eth.EthVersion.V63; import static org.ethereum.util.ByteUtil.toHexString; @@ -186,19 +188,22 @@ protected synchronized void processNodeData(NodeDataMessage msg) { List> ret = new ArrayList<>(); if(msg.getDataList().isEmpty()) { - String err = "Received NodeDataMessage contains empty node data. Dropping peer " + channel; - dropUselessPeer(err); + String err = String.format("Received NodeDataMessage contains empty node data. Dropping peer %s", channel); + logger.debug(err); + requestNodesFuture.setException(new RuntimeException(err)); + // Not fatal but let us touch it later + channel.getChannelManager().disconnect(channel, ReasonCode.TOO_MANY_PEERS); return; } for (Value nodeVal : msg.getDataList()) { - byte[] hash = nodeVal.hash(); + byte[] hash = sha3(nodeVal.asBytes()); if (!requestedNodes.contains(hash)) { String err = "Received NodeDataMessage contains non-requested node with hash :" + toHexString(hash) + " . Dropping peer " + channel; dropUselessPeer(err); return; } - ret.add(Pair.of(hash, nodeVal.encode())); + ret.add(Pair.of(hash, nodeVal.asBytes())); } requestNodesFuture.set(ret); diff --git a/ethereumj-core/src/main/java/org/ethereum/net/eth/message/NodeDataMessage.java b/ethereumj-core/src/main/java/org/ethereum/net/eth/message/NodeDataMessage.java index bfb1ba870b..3020e0c5c9 100644 --- a/ethereumj-core/src/main/java/org/ethereum/net/eth/message/NodeDataMessage.java +++ b/ethereumj-core/src/main/java/org/ethereum/net/eth/message/NodeDataMessage.java @@ -49,7 +49,7 @@ private void parse() { dataList = new ArrayList<>(); for (int i = 0; i < paramsList.size(); ++i) { // Need it AS IS - dataList.add(Value.fromRlpEncoded(paramsList.get(i).getRLPData())); + dataList.add(new Value(paramsList.get(i).getRLPData())); } parsed = true; } @@ -58,7 +58,7 @@ private void encode() { List dataListRLP = new ArrayList<>(); for (Value value: dataList) { if (value == null) continue; // Bad sign - dataListRLP.add(value.getData()); + dataListRLP.add(RLP.encodeElement(value.asBytes())); } byte[][] encodedElementArray = dataListRLP.toArray(new byte[dataListRLP.size()][]); this.encoded = RLP.encodeList(encodedElementArray); diff --git a/ethereumj-core/src/main/java/org/ethereum/net/rlpx/MessageCodec.java b/ethereumj-core/src/main/java/org/ethereum/net/rlpx/MessageCodec.java index f4b08feffb..536357b52a 100644 --- a/ethereumj-core/src/main/java/org/ethereum/net/rlpx/MessageCodec.java +++ b/ethereumj-core/src/main/java/org/ethereum/net/rlpx/MessageCodec.java @@ -96,7 +96,7 @@ protected void decode(ChannelHandlerContext ctx, Frame frame, List out) Frame completeFrame = null; if (frame.isChunked()) { if (!supportChunkedFrames && frame.totalFrameSize > 0) { - throw new RuntimeException("Faming is not supported in this configuration."); + throw new RuntimeException("Framing is not supported in this configuration."); } Pair, AtomicInteger> frameParts = incompleteFrames.get(frame.contextId); @@ -157,15 +157,15 @@ private Message decodeMessage(ChannelHandlerContext ctx, List frames) thr Message msg; try { msg = createMessage((byte) frameType, payload); + + if (loggerNet.isDebugEnabled()) + loggerNet.debug("From: {} Recv: {}", channel, msg.toString()); } catch (Exception ex) { - loggerNet.debug("Incorrectly encoded message from: \t{}, dropping peer", channel); + loggerNet.debug(String.format("Incorrectly encoded message from: \t%s, dropping peer", channel), ex); channel.disconnect(ReasonCode.BAD_PROTOCOL); return null; } - if (loggerNet.isDebugEnabled()) - loggerNet.debug("From: {} Recv: {}", channel, msg.toString()); - ethereumListener.onRecvMessage(channel, msg); channel.getNodeStatistics().rlpxInMessages.add(); diff --git a/ethereumj-core/src/main/java/org/ethereum/net/rlpx/SnappyCodec.java b/ethereumj-core/src/main/java/org/ethereum/net/rlpx/SnappyCodec.java index 9def7a83c2..5aa91236fd 100644 --- a/ethereumj-core/src/main/java/org/ethereum/net/rlpx/SnappyCodec.java +++ b/ethereumj-core/src/main/java/org/ethereum/net/rlpx/SnappyCodec.java @@ -59,7 +59,6 @@ protected void encode(ChannelHandlerContext ctx, FrameCodec.Frame msg, List MAX_SIZE) { logger.info("{}: outgoing frame size exceeds the limit ({} bytes), disconnect", channel, msg.size); - channel.getNodeStatistics().nodeDisconnectedLocal(ReasonCode.USELESS_PEER); channel.disconnect(ReasonCode.USELESS_PEER); return; } @@ -81,7 +80,6 @@ protected void decode(ChannelHandlerContext ctx, FrameCodec.Frame msg, List MAX_SIZE) { logger.info("{}: uncompressed frame size exceeds the limit ({} bytes), drop the peer", channel, uncompressedLength); - channel.getNodeStatistics().nodeDisconnectedLocal(ReasonCode.BAD_PROTOCOL); channel.disconnect(ReasonCode.BAD_PROTOCOL); return; } @@ -94,7 +92,6 @@ protected void decode(ChannelHandlerContext ctx, FrameCodec.Frame msg, List newPeers = new CopyOnWriteArrayList<>(); + // Limiting number of new peers to avoid delays in processing + private static final int MAX_NEW_PEERS = 128; private final Map activePeers = new ConcurrentHashMap<>(); private ScheduledExecutorService mainWorker = Executors.newSingleThreadScheduledExecutor(); @@ -192,7 +194,7 @@ private void processNewPeers() { newPeers.removeAll(processed); } - private void disconnect(Channel peer, ReasonCode reason) { + public void disconnect(Channel peer, ReasonCode reason) { logger.debug("Disconnecting peer with reason " + reason + ": " + peer); peer.disconnect(reason); recentlyDisconnected.put(peer.getInetSocketAddress().getAddress(), new Date()); @@ -201,7 +203,7 @@ private void disconnect(Channel peer, ReasonCode reason) { public boolean isRecentlyDisconnected(InetAddress peerAddr) { Date disconnectTime = recentlyDisconnected.get(peerAddr); if (disconnectTime != null && - System.currentTimeMillis() - disconnectTime.getTime() < inboundConnectionBanTimeout) { + System.currentTimeMillis() - disconnectTime.getTime() < INBOUND_CONNECTION_BAN_TIMEOUT) { return true; } else { recentlyDisconnected.remove(peerAddr); @@ -343,10 +345,23 @@ public Collection getActivePeers() { return new ArrayList<>(activePeers.values()); } + /** + * Checks whether newPeers is not full + * newPeers are used to fill up active peers + * @return True if there are free slots for new peers + */ + public boolean acceptingNewPeers() { + return newPeers.size() < Math.max(config.maxActivePeers(), MAX_NEW_PEERS); + } + public Channel getActivePeer(byte[] nodeId) { return activePeers.get(new ByteArrayWrapper(nodeId)); } + public SyncManager getSyncManager() { + return syncManager; + } + public void close() { try { logger.info("Shutting down block and tx distribute threads..."); diff --git a/ethereumj-core/src/main/java/org/ethereum/net/server/EthereumChannelInitializer.java b/ethereumj-core/src/main/java/org/ethereum/net/server/EthereumChannelInitializer.java index 6f7f527b41..de62a15ae2 100644 --- a/ethereumj-core/src/main/java/org/ethereum/net/server/EthereumChannelInitializer.java +++ b/ethereumj-core/src/main/java/org/ethereum/net/server/EthereumChannelInitializer.java @@ -19,6 +19,8 @@ import io.netty.channel.*; import io.netty.channel.socket.nio.NioSocketChannel; +import org.ethereum.net.rlpx.Node; +import org.ethereum.net.rlpx.discover.NodeManager; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -42,6 +44,9 @@ public class EthereumChannelInitializer extends ChannelInitializer> futureHeaders = headersRequest.getHash() == null ? any.getEthHandler().sendGetBlockHeaders(headersRequest.getStart(), headersRequest.getCount(), headersRequest.isReverse()) : any.getEthHandler().sendGetBlockHeaders(headersRequest.getHash(), headersRequest.getCount(), headersRequest.getStep(), headersRequest.isReverse()); diff --git a/ethereumj-core/src/main/java/org/ethereum/sync/FastSyncDownloader.java b/ethereumj-core/src/main/java/org/ethereum/sync/FastSyncDownloader.java index 3e19bf531a..82701be0fe 100644 --- a/ethereumj-core/src/main/java/org/ethereum/sync/FastSyncDownloader.java +++ b/ethereumj-core/src/main/java/org/ethereum/sync/FastSyncDownloader.java @@ -17,6 +17,7 @@ */ package org.ethereum.sync; +import org.ethereum.core.BlockHeader; import org.ethereum.core.BlockHeaderWrapper; import org.ethereum.core.BlockWrapper; import org.ethereum.db.IndexedBlockStore; @@ -44,6 +45,8 @@ public class FastSyncDownloader extends BlockDownloader { @Autowired IndexedBlockStore blockStore; + private SyncQueueReverseImpl syncQueueReverse; + int counter; int maxCount; long t; @@ -53,12 +56,12 @@ public FastSyncDownloader(BlockHeaderValidator headerValidator) { super(headerValidator); } - public void startImporting(byte[] fromHash, int count) { + public void startImporting(BlockHeader start, int count) { this.maxCount = count <= 0 ? Integer.MAX_VALUE : count; setHeaderQueueLimit(maxCount); setBlockQueueLimit(maxCount); - SyncQueueReverseImpl syncQueueReverse = new SyncQueueReverseImpl(fromHash); + syncQueueReverse = new SyncQueueReverseImpl(start.getHash(), start.getNumber() - count); init(syncQueueReverse, syncPool, "FastSync"); } @@ -70,7 +73,8 @@ protected void pushBlocks(List blockWrappers) { blockStore.saveBlock(blockWrapper.getBlock(), BigInteger.ZERO, true); counter++; if (counter >= maxCount) { - logger.info("FastSync: All requested " + counter + " blocks are downloaded. (last " + blockWrapper.getBlock().getShortDescr() + ")"); + logger.info("FastSync: All requested " + counter + " blocks are downloaded. (last " + + blockWrapper.getBlock().getShortDescr() + ")"); stop(); break; } @@ -79,7 +83,8 @@ protected void pushBlocks(List blockWrappers) { long c = System.currentTimeMillis(); if (c - t > 5000) { t = c; - logger.info("FastSync: downloaded " + counter + " blocks so far. Last: " + blockWrappers.get(0).getBlock().getShortDescr()); + logger.info("FastSync: downloaded " + counter + " blocks so far. Last: " + + blockWrappers.get(blockWrappers.size() - 1).getBlock().getShortDescr()); blockStore.flush(); } } @@ -90,12 +95,12 @@ protected void pushHeaders(List headers) {} @Override protected int getBlockQueueFreeSize() { - return getBlockQueueLimit(); + return Math.max(maxCount - counter, MAX_IN_REQUEST); } @Override protected int getMaxHeadersInQueue() { - return getHeaderQueueLimit(); + return Math.max(maxCount - syncQueueReverse.getValidatedHeadersCount(), 0); } // TODO: receipts loading here diff --git a/ethereumj-core/src/main/java/org/ethereum/sync/FastSyncManager.java b/ethereumj-core/src/main/java/org/ethereum/sync/FastSyncManager.java index 2e26f12d6b..e2224a6549 100644 --- a/ethereumj-core/src/main/java/org/ethereum/sync/FastSyncManager.java +++ b/ethereumj-core/src/main/java/org/ethereum/sync/FastSyncManager.java @@ -49,6 +49,9 @@ import org.springframework.stereotype.Component; import java.math.BigInteger; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; import java.util.*; import java.util.concurrent.*; import java.util.stream.Collectors; @@ -409,11 +412,14 @@ public void onSuccess(List> result) { try { synchronized (FastSyncManager.this) { logger.trace("Received " + result.size() + " nodes (of " + hashes.size() + ") from peer: " + idle); + idle.getNodeStatistics().eth63NodesRequested.add(hashes.size()); + idle.getNodeStatistics().eth63NodesRetrieveTime.add(System.currentTimeMillis() - reqTime); for (Pair pair : result) { TrieNodeRequest request = pendingNodes.get(pair.getKey()); if (request == null) { long t = System.currentTimeMillis(); logger.debug("Received node which was not requested: " + toHexString(pair.getKey()) + " from " + idle); + idle.disconnect(ReasonCode.TOO_MANY_PEERS); // We need better peers for this stage return; } Set intersection = request.requestIdsSnapshot(); @@ -428,10 +434,7 @@ public void onSuccess(List> result) { } FastSyncManager.this.notifyAll(); - - idle.getNodeStatistics().eth63NodesRequested.add(hashes.size()); idle.getNodeStatistics().eth63NodesReceived.add(result.size()); - idle.getNodeStatistics().eth63NodesRetrieveTime.add(System.currentTimeMillis() - reqTime); } } catch (Exception e) { logger.error("Unexpected error processing nodes", e); @@ -441,6 +444,8 @@ public void onSuccess(List> result) { @Override public void onFailure(Throwable t) { logger.warn("Error with Trie Node request: " + t); + idle.getNodeStatistics().eth63NodesRequested.add(hashes.size()); + idle.getNodeStatistics().eth63NodesRetrieveTime.add(System.currentTimeMillis() - reqTime); synchronized (FastSyncManager.this) { for (byte[] hash : hashes) { final TrieNodeRequest request = pendingNodes.get(hash); @@ -541,7 +546,7 @@ private void syncUnsecure(BlockHeader pivot) { logger.info("FastSync: downloading 256 blocks prior to pivot block (" + pivot.getShortDescr() + ")"); FastSyncDownloader downloader = applicationContext.getBean(FastSyncDownloader.class); - downloader.startImporting(pivot.getHash(), 260); + downloader.startImporting(pivot, 260); downloader.waitForStop(); logger.info("FastSync: complete downloading 256 blocks prior to pivot block (" + pivot.getShortDescr() + ")"); @@ -647,6 +652,7 @@ private void syncBlocksReceipts() { blockchainDB.delete(FASTSYNC_DB_KEY_PIVOT); dbFlushManager.commit(); dbFlushManager.flush(); + removeHeadersDb(logger); } /** @@ -668,6 +674,26 @@ private void fixTotalDiff() { blockchain.updateBlockTotDifficulties(firstFullBlockNum + 1); } + /** + * Physically removes headers DB if fast sync was performed without skipHistory + */ + public boolean removeHeadersDb(Logger logger) { + if (blockStore.getBestBlock().getNumber() > 0 && + blockStore.getChainBlockByNumber(1) != null) { + // Everything is cool but maybe we could remove unused DB? + Path headersDbPath = Paths.get(config.databaseDir(), "headers"); + if (Files.exists(headersDbPath)) { + logger.info("Headers DB was used during FastSync but not required any more. Removing."); + DbSource headerSource = (DbSource) applicationContext.getBean("headerSource"); + headerSource.close(); + FileUtil.recursiveDelete(headersDbPath.toString()); + logger.info("Headers DB removed."); + return true; + } + } + return false; + } + public void main() { if (blockchain.getBestBlock().getNumber() == 0 || getSyncStage() == SECURE || getSyncStage() == COMPLETE) { diff --git a/ethereumj-core/src/main/java/org/ethereum/sync/SyncPool.java b/ethereumj-core/src/main/java/org/ethereum/sync/SyncPool.java index fd8dc125b4..ebed0b490d 100644 --- a/ethereumj-core/src/main/java/org/ethereum/sync/SyncPool.java +++ b/ethereumj-core/src/main/java/org/ethereum/sync/SyncPool.java @@ -20,6 +20,7 @@ import org.ethereum.config.SystemProperties; import org.ethereum.core.Blockchain; import org.ethereum.listener.EthereumListener; +import org.ethereum.net.message.ReasonCode; import org.ethereum.net.rlpx.Node; import org.ethereum.net.rlpx.discover.NodeHandler; import org.ethereum.net.rlpx.discover.NodeManager; @@ -38,8 +39,10 @@ import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Predicate; +import static java.lang.Math.max; import static java.lang.Math.min; import static org.ethereum.util.BIUtil.isIn20PercentRange; import static org.ethereum.util.ByteUtil.toHexString; @@ -98,8 +101,8 @@ public void init(final ChannelManager channelManager, final Blockchain blockchai try { heartBeat(); updateLowerUsefulDifficulty(); - fillUp(); prepareActive(); + fillUp(); cleanupActive(); } catch (Throwable t) { logger.error("Unhandled exception", t); @@ -283,6 +286,8 @@ private void fillUp() { private synchronized void prepareActive() { List managerActive = new ArrayList<>(channelManager.getActivePeers()); + if (logger.isTraceEnabled()) + logger.trace("Preparing active peers from {} channelManager peers", managerActive.size()); // Filtering out with nodeSelector because server-connected nodes were not tested NodeSelector nodeSelector = new NodeSelector(BigInteger.ZERO); @@ -293,6 +298,8 @@ private synchronized void prepareActive() { } } + if (logger.isTraceEnabled()) + logger.trace("After filtering out with node selector, {} peers remaining", active.size()); if (active.isEmpty()) return; // filtering by 20% from top difficulty @@ -310,14 +317,33 @@ private synchronized void prepareActive() { List filtered = active.subList(0, thresholdIdx + 1); - // sorting by latency in asc order - filtered.sort(Comparator.comparingDouble(c -> c.getPeerStats().getAvgLatency())); + // Dropping other peers to free up slots for active + // Act more aggressive until sync is done + int cap = channelManager.getSyncManager().isSyncDone() ? + // 10 peers are enough for variance in data on short sync + Math.max(config.maxActivePeers() / 2, config.maxActivePeers() - 10) : config.maxActivePeers() / 6; + int otherCount = managerActive.size() - filtered.size(); + int killCount = max(0, otherCount - cap); + if (killCount > 0) { + AtomicInteger dropped = new AtomicInteger(0); + for (Channel channel : managerActive) { + if (!filtered.contains(channel)) { + if (channel.isIdle()) { + channelManager.disconnect(channel, ReasonCode.TOO_MANY_PEERS); + if (dropped.incrementAndGet() >= killCount) break; + } + } + } + logger.debug("Dropped {} other peers to free up sync slots", dropped.get()); + } for (Channel channel : filtered) { if (!activePeers.contains(channel)) { ethereumListener.onPeerAddedToSyncPool(channel); } } + if (logger.isTraceEnabled()) + logger.trace("{} peers set to be active in SyncPool", filtered.size()); activePeers.clear(); activePeers.addAll(filtered); diff --git a/ethereumj-core/src/main/java/org/ethereum/sync/SyncQueueReverseImpl.java b/ethereumj-core/src/main/java/org/ethereum/sync/SyncQueueReverseImpl.java index 28f6fb1eea..779bbd83bb 100644 --- a/ethereumj-core/src/main/java/org/ethereum/sync/SyncQueueReverseImpl.java +++ b/ethereumj-core/src/main/java/org/ethereum/sync/SyncQueueReverseImpl.java @@ -31,9 +31,10 @@ public class SyncQueueReverseImpl implements SyncQueueIfc { byte[] curHeaderHash; -// List headers = new ArrayList<>(); + MinMaxMap headers = new MinMaxMap<>(); long minValidated = -1; + long finishValidated = 0; ByteArrayMap blocks = new ByteArrayMap<>(); @@ -43,6 +44,11 @@ public SyncQueueReverseImpl(byte[] startHash) { this.curHeaderHash = startHash; } + public SyncQueueReverseImpl(byte[] startHash, long finishValidated) { + this.curHeaderHash = startHash; + this.finishValidated = finishValidated; + } + public SyncQueueReverseImpl(byte[] startHash, boolean headersOnly) { this.curHeaderHash = startHash; this.headersOnly = headersOnly; @@ -51,9 +57,14 @@ public SyncQueueReverseImpl(byte[] startHash, boolean headersOnly) { @Override public synchronized List requestHeaders(int maxSize, int maxRequests, int maxTotalHeaders) { List ret = new ArrayList<>(); + if (maxTotalHeaders == 0) return ret; + int totalHeaders = 0; + if (minValidated < 0) { ret.add(new SyncQueueImpl.HeadersRequestImpl(curHeaderHash, maxSize, true, maxSize - 1)); - } else if (minValidated == 0) { + totalHeaders += maxSize; + if (totalHeaders >= maxTotalHeaders) return ret; + } else if (minValidated == finishValidated) { // genesis reached return null; } else { @@ -61,21 +72,23 @@ public synchronized List requestHeaders(int maxSize, int maxRequ ret.add(new SyncQueueImpl.HeadersRequestImpl( headers.get(headers.getMin()).getHash(), maxSize, true, maxSize - 1)); maxRequests--; + totalHeaders += maxSize; } Set> entries = headers.descendingMap().subMap(minValidated, true, headers.getMin(), true).entrySet(); Iterator> it = entries.iterator(); BlockHeaderWrapper prevEntry = it.next().getValue(); - while(maxRequests > 0 && it.hasNext()) { + while(maxRequests > 0 && totalHeaders < maxTotalHeaders && it.hasNext()) { BlockHeaderWrapper entry = it.next().getValue(); if (prevEntry.getNumber() - entry.getNumber() > 1) { ret.add(new SyncQueueImpl.HeadersRequestImpl(prevEntry.getHash(), maxSize, true)); + totalHeaders += maxSize; maxRequests--; } prevEntry = entry; } - if (maxRequests > 0) { + if (maxRequests > 0 && totalHeaders < maxTotalHeaders) { ret.add(new SyncQueueImpl.HeadersRequestImpl(prevEntry.getHash(), maxSize, true)); } } @@ -96,7 +109,7 @@ public synchronized List addHeaders(Collection addHeaders(Collection= headers.getMin() ; minValidated--) { + for (; minValidated >= headers.getMin() && minValidated >= finishValidated; minValidated--) { BlockHeaderWrapper header = headers.get(minValidated); BlockHeaderWrapper parent = headers.get(minValidated - 1); if (parent == null) { // Some peers doesn't return 0 block header - if (minValidated == 1) minValidated = 0; + if (minValidated == 1 && finishValidated == 0) minValidated = 0; break; } if (!FastByteComparisons.equal(header.getHeader().getParentHash(), parent.getHash())) { @@ -164,4 +175,8 @@ public synchronized List addBlocks(Collection newBlocks) { public synchronized int getHeadersCount() { return headers.size(); } + + public synchronized int getValidatedHeadersCount() { + return headers.getMax() == null ? 0 : (int) (headers.getMax() - minValidated + 1); + } } diff --git a/ethereumj-core/src/test/java/org/ethereum/net/eth/MessagesTest.java b/ethereumj-core/src/test/java/org/ethereum/net/eth/MessagesTest.java new file mode 100644 index 0000000000..2cca7e20c1 --- /dev/null +++ b/ethereumj-core/src/test/java/org/ethereum/net/eth/MessagesTest.java @@ -0,0 +1,47 @@ +/* + * Copyright (c) [2016] [ ] + * This file is part of the ethereumJ library. + * + * The ethereumJ library is free software: you can redistribute it and/or modify + * it under the terms of the GNU Lesser General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * The ethereumJ library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with the ethereumJ library. If not, see . + */ +package org.ethereum.net.eth; + +import org.ethereum.net.eth.message.NodeDataMessage; +import org.junit.Test; +import org.spongycastle.util.encoders.Hex; + +import static org.junit.Assert.assertArrayEquals; + +/** + * Testing different kind of net messages objects, + * for example, to clarify encode/parse reversal match + */ +public class MessagesTest { + + @Test + public void testNodeDataMessage() { + byte[] data = Hex.decode(""); + NodeDataMessage msg = new NodeDataMessage(data); + NodeDataMessage msg2 = new NodeDataMessage(msg.getDataList()); + assertArrayEquals(msg.getEncoded(), msg2.getEncoded()); + } + + @Test + public void testNodeDataMessageEmpty() { + byte[] data = Hex.decode("c0"); + NodeDataMessage msg = new NodeDataMessage(data); + NodeDataMessage msg2 = new NodeDataMessage(msg.getDataList()); + assertArrayEquals(msg.getEncoded(), msg2.getEncoded()); + } +} diff --git a/ethereumj-core/src/test/java/org/ethereum/net/rlpx/SnappyCodecTest.java b/ethereumj-core/src/test/java/org/ethereum/net/rlpx/SnappyCodecTest.java index 6111172a8d..b0aa543938 100644 --- a/ethereumj-core/src/test/java/org/ethereum/net/rlpx/SnappyCodecTest.java +++ b/ethereumj-core/src/test/java/org/ethereum/net/rlpx/SnappyCodecTest.java @@ -23,6 +23,8 @@ import org.ethereum.net.rlpx.discover.NodeStatistics; import org.ethereum.net.server.Channel; import org.junit.Test; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; import org.spongycastle.util.encoders.Hex; import org.springframework.core.io.ClassPathResource; import org.springframework.core.io.Resource; @@ -32,6 +34,9 @@ import static com.google.common.collect.Lists.newArrayList; import static org.ethereum.net.message.ReasonCode.BAD_PROTOCOL; import static org.junit.Assert.*; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.doCallRealMethod; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -60,6 +65,10 @@ public void testFramedDecodeDisconnect() throws Exception { Channel shouldBeDropped = mock(Channel.class); when(shouldBeDropped.getNodeStatistics()) .thenReturn(new NodeStatistics(new Node(new byte[0], "", 0))); + doAnswer(invocation -> { + shouldBeDropped.getNodeStatistics().nodeDisconnectedLocal(invocation.getArgument(0)); + return null; + }).when(shouldBeDropped).disconnect(any()); snappyDecode(frameBytes, shouldBeDropped); diff --git a/ethereumj-core/src/test/java/org/ethereum/util/ValueTest.java b/ethereumj-core/src/test/java/org/ethereum/util/ValueTest.java index 2888ae0191..e94680cd43 100644 --- a/ethereumj-core/src/test/java/org/ethereum/util/ValueTest.java +++ b/ethereumj-core/src/test/java/org/ethereum/util/ValueTest.java @@ -74,5 +74,14 @@ public void longListRLPBug_1() { assertEquals(testRlp, Hex.toHexString(val.encode())); } - + /** + * Shouldn't fail with correct TrieNode CODE data + * as opposed to Value.fromRlpEncoded + */ + @Test + public void testToString() { + Value val = new Value(Hex.decode("fe")); + assertEquals("fe", val.toString()); + assertEquals("fe", Hex.toHexString(val.asBytes())); + } }