This repository was archived by the owner on Dec 5, 2024. It is now read-only.
Merged
Changes from 24 commits
Commits (25):
dd4bb88
Remove headers db when fastsync is finished w/o skipHistory
zilm13 May 2, 2018
c78e673
Fixed fast sync stall on backwards to pivot block download
zilm13 May 3, 2018
39e2816
Simplify FastSyncDownloader queue limit calculation
zilm13 May 4, 2018
857aa16
Added finish block number in SyncQueueReverseImpl
zilm13 May 4, 2018
24b7d3f
Logging improved in SyncPool.prepareActive()
zilm13 May 4, 2018
165c495
Added channel reputation to logging
zilm13 May 4, 2018
15c5f82
Merge branch 'develop' into fix/remove-headers-db
zilm13 May 4, 2018
539121c
Fixing case when during sync almost all slots are occupied by other p…
zilm13 May 4, 2018
082f9eb
Drop more peers when slots are needed during fast sync
zilm13 May 7, 2018
6394131
Added more debug temp logging + more strict disconnect
zilm13 May 9, 2018
15de110
Finished polishing of "Other peers" dropping during sync
zilm13 May 10, 2018
0baf916
Comment fixed
zilm13 May 10, 2018
ea63eaf
Logging message changed to be correct
zilm13 May 11, 2018
9e9e1a0
Merge branch 'develop' into fix/remove-headers-db
zilm13 May 11, 2018
4fa0441
Improved criteria for dropping other peers
zilm13 May 14, 2018
c499928
Polished drop other peers criteria
zilm13 May 14, 2018
518cd4d
Allow up to maxPeers-10 other peers on short sync
zilm13 May 14, 2018
a8a7e95
Some improvements to discard bad peers earlier
zilm13 May 15, 2018
0dd37ab
Updated drop rules
zilm13 May 15, 2018
db9c6cf
Fixed test
zilm13 May 15, 2018
6c16acb
Merge branch 'develop' into fix/remove-headers-db
zilm13 May 15, 2018
4926e8a
Logging improved for failed message decoding
zilm13 May 16, 2018
57d1dd3
Fxied incorrect Value usage for NODEDATA
zilm13 May 16, 2018
9bb6d67
Merge branch 'develop' into fix/remove-headers-db
zilm13 May 16, 2018
58f0322
Polishing peer drop strategy + NodeDataMessage fixed
zilm13 May 17, 2018
@@ -96,12 +96,12 @@ public DataWord deserialize(byte[] stream) {
public final static Serializer<Value, byte[]> TrieNodeSerializer = new Serializer<Value, byte[]>() {
@Override
public byte[] serialize(Value object) {
return object.encode();
return object.asBytes();
}

@Override
public Value deserialize(byte[] stream) {
return Value.fromRlpEncoded(stream);
return new Value(stream);
}
};
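For context, a minimal sketch of why the serializer switches from encode()/fromRlpEncoded() to asBytes()/new Value(): trie nodes fetched over NODEDATA are verified by the sha3 of their raw bytes, so they must round-trip byte-for-byte. This is illustrative only and not part of the diff; the helper class and its input are made up, while org.ethereum.util.Value is the class changed above.

```java
import java.util.Arrays;

import org.ethereum.util.Value;

// Sketch: the new scheme wraps the raw RLP bytes and hands the very same
// bytes back, so hash-based verification of the node stays intact.
class TrieNodeRoundTripSketch {
    static boolean roundTripsUnchanged(byte[] rawNodeRlp) {   // rawNodeRlp: hypothetical NODEDATA entry
        Value wrapped = new Value(rawNodeRlp);                // new Value(stream), as in the diff
        return Arrays.equals(wrapped.asBytes(), rawNodeRlp);  // expected to hold for raw bytes
    }
}
```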

@@ -43,13 +43,13 @@
/**
* @deprecated
* TODO: Remove after a few versions (current: 1.7.3) or with DB version update
* TODO: Make {@link FastSyncManager#removeHeadersDb(Logger)} private after removing
* Also remove CommonConfig.headerSource with it as no more used
*
* - Repairs Headers DB after FastSync with skipHistory to be usable
* a) Updates incorrect total difficulty
* b) Migrates headers without index to usable scheme with index
* - Removes headers DB otherwise as it's not needed
* TODO: move DB removal to main logic. Not done yet to prevent any conflicts
*/
@Deprecated
public class MigrateHeaderSourceTotalDiff implements Runnable {
@@ -85,19 +85,12 @@ public void run() {
}

logger.info("Fast Sync was used. Checking if migration required.");
if (blockStore.getBestBlock().getNumber() > 0 &&
blockStore.getChainBlockByNumber(1) != null) {
// Everything is cool but maybe we could remove unused DB?
Path headersDbPath = Paths.get(systemProperties.databaseDir(), "headers");
if (Files.exists(headersDbPath)) {
logger.info("Headers DB was used during FastSync but not required any more. Removing.");
FileUtil.recursiveDelete(headersDbPath.toString());
logger.info("Headers DB removed. Migration is over");
} else {
logger.info("No migration required.");
return;
}
} else if (blockStore.getBestBlock().getNumber() > 0) {
boolean dbRemoved = fastSyncManager.removeHeadersDb(logger);
if (dbRemoved) {
logger.info("Migration finished.");
return;
}
if (blockStore.getBestBlock().getNumber() > 0 && blockStore.getChainBlockByNumber(1) == null) {
// Maybe migration of headerStore and totalDifficulty is required?
HeaderStore headerStore = ctx.getBean(HeaderStore.class);
if (headerStore.getHeaderByNumber(1) != null) {
@@ -138,6 +131,8 @@ public void run() {
flushManager.commit();
flushManager.flush();
logger.info("headerStore migration finished. No more migrations required");
} else {
logger.info("No migration required.");
}
}
}
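The inline removal logic that used to live in this class (the Files.exists / FileUtil.recursiveDelete block shown as removed above) now sits behind FastSyncManager#removeHeadersDb(Logger), which is only referenced here. A rough sketch of what such a method could look like, assuming it simply mirrors the removed code; this is a guess at the shape, not the actual FastSyncManager implementation.

```java
// Hypothetical sketch only -- the real method lives in FastSyncManager.
boolean removeHeadersDb(Logger logger) {
    Path headersDbPath = Paths.get(systemProperties.databaseDir(), "headers");
    if (!Files.exists(headersDbPath)) {
        return false;  // nothing to remove; caller falls through and logs "No migration required."
    }
    logger.info("Headers DB was used during FastSync but is not required any more. Removing.");
    FileUtil.recursiveDelete(headersDbPath.toString());
    logger.info("Headers DB removed.");
    return true;
}
```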
@@ -878,11 +878,12 @@ public String getSyncStats() {
int waitResp = lastReqSentTime > 0 ? (int) (System.currentTimeMillis() - lastReqSentTime) / 1000 : 0;
long lifeTime = System.currentTimeMillis() - connectedTime;
return String.format(
"Peer %s: [ %s, %18s, ping %6s ms, difficulty %s, best block %s%s]: (idle %s of %s) %s",
"Peer %s: [ %s, %18s, ping %6s ms, rep: %s, difficulty %s, best block %s%s]: (idle %s of %s) %s",
getVersion(),
channel.getPeerIdShort(),
peerState,
(int)channel.getPeerStats().getAvgLatency(),
channel.getNodeStatistics().getReputation(),
getTotalDifficulty(),
getBestKnownBlock().getNumber(),
waitResp > 5 ? ", wait " + waitResp + "s" : " ",
@@ -33,6 +33,7 @@
import org.ethereum.net.eth.message.NodeDataMessage;
import org.ethereum.net.eth.message.ReceiptsMessage;

import org.ethereum.net.message.ReasonCode;
import org.ethereum.sync.PeerState;
import org.ethereum.util.ByteArraySet;
import org.ethereum.util.Value;
@@ -45,6 +46,7 @@
import java.util.List;
import java.util.Set;

import static org.ethereum.crypto.HashUtil.sha3;
import static org.ethereum.net.eth.EthVersion.V63;
import static org.ethereum.util.ByteUtil.toHexString;

@@ -186,19 +188,22 @@ protected synchronized void processNodeData(NodeDataMessage msg) {

List<Pair<byte[], byte[]>> ret = new ArrayList<>();
if(msg.getDataList().isEmpty()) {
String err = "Received NodeDataMessage contains empty node data. Dropping peer " + channel;
dropUselessPeer(err);
String err = String.format("Received NodeDataMessage contains empty node data. Dropping peer %s", channel);
logger.debug(err);
requestNodesFuture.setException(new RuntimeException(err));
// Not fatal but let us touch it later
channel.getChannelManager().disconnect(channel, ReasonCode.TOO_MANY_PEERS);
return;
}

for (Value nodeVal : msg.getDataList()) {
byte[] hash = nodeVal.hash();
byte[] hash = sha3(nodeVal.asBytes());
if (!requestedNodes.contains(hash)) {
String err = "Received NodeDataMessage contains non-requested node with hash :" + toHexString(hash) + " . Dropping peer " + channel;
dropUselessPeer(err);
return;
}
ret.add(Pair.of(hash, nodeVal.encode()));
ret.add(Pair.of(hash, nodeVal.asBytes()));
}
requestNodesFuture.set(ret);

@@ -49,7 +49,7 @@ private void parse() {
dataList = new ArrayList<>();
for (int i = 0; i < paramsList.size(); ++i) {
// Need it AS IS
dataList.add(Value.fromRlpEncoded(paramsList.get(i).getRLPData()));
dataList.add(new Value(paramsList.get(i).getRLPData()));
}
parsed = true;
}
@@ -58,7 +58,7 @@ private void encode() {
List<byte[]> dataListRLP = new ArrayList<>();
for (Value value: dataList) {
if (value == null) continue; // Bad sign
dataListRLP.add(value.getData());
dataListRLP.add(value.asBytes());
Reviewer comment (Contributor): It results in a list with non-encoded elements.

}
byte[][] encodedElementArray = dataListRLP.toArray(new byte[dataListRLP.size()][]);
this.encoded = RLP.encodeList(encodedElementArray);
@@ -96,7 +96,7 @@ protected void decode(ChannelHandlerContext ctx, Frame frame, List<Object> out)
Frame completeFrame = null;
if (frame.isChunked()) {
if (!supportChunkedFrames && frame.totalFrameSize > 0) {
throw new RuntimeException("Faming is not supported in this configuration.");
throw new RuntimeException("Framing is not supported in this configuration.");
}

Pair<? extends List<Frame>, AtomicInteger> frameParts = incompleteFrames.get(frame.contextId);
@@ -157,15 +157,15 @@ private Message decodeMessage(ChannelHandlerContext ctx, List<Frame> frames) thr
Message msg;
try {
msg = createMessage((byte) frameType, payload);

if (loggerNet.isDebugEnabled())
loggerNet.debug("From: {} Recv: {}", channel, msg.toString());
} catch (Exception ex) {
loggerNet.debug("Incorrectly encoded message from: \t{}, dropping peer", channel);
loggerNet.debug(String.format("Incorrectly encoded message from: \t%s, dropping peer", channel), ex);
channel.disconnect(ReasonCode.BAD_PROTOCOL);
return null;
}

if (loggerNet.isDebugEnabled())
loggerNet.debug("From: {} Recv: {}", channel, msg.toString());

ethereumListener.onRecvMessage(channel, msg);

channel.getNodeStatistics().rlpxInMessages.add();
@@ -59,7 +59,6 @@ protected void encode(ChannelHandlerContext ctx, FrameCodec.Frame msg, List<Obje
// stay consistent with decoding party
if (msg.size > MAX_SIZE) {
logger.info("{}: outgoing frame size exceeds the limit ({} bytes), disconnect", channel, msg.size);
channel.getNodeStatistics().nodeDisconnectedLocal(ReasonCode.USELESS_PEER);
channel.disconnect(ReasonCode.USELESS_PEER);
return;
}
@@ -81,7 +80,6 @@ protected void decode(ChannelHandlerContext ctx, FrameCodec.Frame msg, List<Obje
long uncompressedLength = Snappy.uncompressedLength(in) & 0xFFFFFFFFL;
if (uncompressedLength > MAX_SIZE) {
logger.info("{}: uncompressed frame size exceeds the limit ({} bytes), drop the peer", channel, uncompressedLength);
channel.getNodeStatistics().nodeDisconnectedLocal(ReasonCode.BAD_PROTOCOL);
channel.disconnect(ReasonCode.BAD_PROTOCOL);
return;
}
@@ -94,7 +92,6 @@ protected void decode(ChannelHandlerContext ctx, FrameCodec.Frame msg, List<Obje
// 5 - error code for framed snappy
if (detailMessage.startsWith("FAILED_TO_UNCOMPRESS") && detailMessage.contains("5")) {
logger.info("{}: Snappy frames are not allowed in DEVp2p protocol, drop the peer", channel);
channel.getNodeStatistics().nodeDisconnectedLocal(ReasonCode.BAD_PROTOCOL);
channel.disconnect(ReasonCode.BAD_PROTOCOL);
return;
} else {
@@ -29,6 +29,7 @@
import java.util.concurrent.atomic.AtomicLong;

import static java.lang.Math.min;
import static org.ethereum.net.server.ChannelManager.INBOUND_CONNECTION_BAN_TIMEOUT;

/**
* Handles all possible statistics related to a Node
@@ -42,7 +43,6 @@ public class NodeStatistics {
public final static int REPUTATION_HANDSHAKE = 3000;
public final static int REPUTATION_AUTH = 1000;
public final static int REPUTATION_DISCOVER_PING = 1;
public final static long TOO_MANY_PEERS_PENALIZE_TIMEOUT = 10 * 1000;

public class StatHandler {
AtomicLong count = new AtomicLong(0);
@@ -145,14 +145,14 @@ public int getReputation() {
return isReputationPenalized() ? 0 : persistedReputation / 2 + getSessionReputation();
}

private boolean isReputationPenalized() {
public boolean isReputationPenalized() {
if (wrongFork) return true;
if (wasDisconnected() && rlpxLastRemoteDisconnectReason == ReasonCode.TOO_MANY_PEERS &&
System.currentTimeMillis() - lastDisconnectedTime < TOO_MANY_PEERS_PENALIZE_TIMEOUT) {
System.currentTimeMillis() - lastDisconnectedTime < INBOUND_CONNECTION_BAN_TIMEOUT) {
return true;
}
if (wasDisconnected() && rlpxLastRemoteDisconnectReason == ReasonCode.DUPLICATE_PEER &&
System.currentTimeMillis() - lastDisconnectedTime < TOO_MANY_PEERS_PENALIZE_TIMEOUT) {
System.currentTimeMillis() - lastDisconnectedTime < INBOUND_CONNECTION_BAN_TIMEOUT) {
return true;
}
return rlpxLastLocalDisconnectReason == ReasonCode.NULL_IDENTITY ||
@@ -314,6 +314,7 @@ public ByteArrayWrapper getNodeIdWrapper() {
}

public void disconnect(ReasonCode reason) {
getNodeStatistics().nodeDisconnectedLocal(reason);
msgQueue.disconnect(reason);
}
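This one-line addition is the counterpart to the removals in the framing and Snappy codec hunks above: the local disconnect reason is now recorded in exactly one place. A short before/after sketch of the call pattern (channel and the reason are assumed from the surrounding handler code):

```java
// Before (as removed above): every caller had to record the statistic itself.
channel.getNodeStatistics().nodeDisconnectedLocal(ReasonCode.BAD_PROTOCOL);
channel.disconnect(ReasonCode.BAD_PROTOCOL);

// After: a single call; disconnect(reason) records nodeDisconnectedLocal(reason) internally.
channel.disconnect(ReasonCode.BAD_PROTOCOL);
```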

@@ -54,9 +54,11 @@ public class ChannelManager {
// If the inbound peer connection was dropped by us with a reason message
// then we ban that peer IP on any connections for some time to protect from
// too active peers
private static final int inboundConnectionBanTimeout = 10 * 1000;
public static final int INBOUND_CONNECTION_BAN_TIMEOUT = 120 * 1000;

private List<Channel> newPeers = new CopyOnWriteArrayList<>();
// Limiting number of new peers to avoid delays in processing
private static final int MAX_NEW_PEERS = 128;
private final Map<ByteArrayWrapper, Channel> activePeers = new ConcurrentHashMap<>();

private ScheduledExecutorService mainWorker = Executors.newSingleThreadScheduledExecutor();
@@ -192,7 +194,7 @@ private void processNewPeers() {
newPeers.removeAll(processed);
}

private void disconnect(Channel peer, ReasonCode reason) {
public void disconnect(Channel peer, ReasonCode reason) {
logger.debug("Disconnecting peer with reason " + reason + ": " + peer);
peer.disconnect(reason);
recentlyDisconnected.put(peer.getInetSocketAddress().getAddress(), new Date());
@@ -201,7 +203,7 @@ private void disconnect(Channel peer, ReasonCode reason) {
public boolean isRecentlyDisconnected(InetAddress peerAddr) {
Date disconnectTime = recentlyDisconnected.get(peerAddr);
if (disconnectTime != null &&
System.currentTimeMillis() - disconnectTime.getTime() < inboundConnectionBanTimeout) {
System.currentTimeMillis() - disconnectTime.getTime() < INBOUND_CONNECTION_BAN_TIMEOUT) {
return true;
} else {
recentlyDisconnected.remove(peerAddr);
@@ -343,10 +345,23 @@ public Collection<Channel> getActivePeers() {
return new ArrayList<>(activePeers.values());
}

/**
* Checks whether newPeers is not full
* newPeers are used to fill up active peers
* @return True if there are free slots for new peers
*/
public boolean acceptingNewPeers() {
return newPeers.size() < Math.max(config.maxActivePeers(), MAX_NEW_PEERS);
}
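A quick usage sketch of this guard as it is consumed by the channel initializer further down in the diff, with ch being the incoming NioSocketChannel; the maxActivePeers figure is an example value, not a project default:

```java
// With config.maxActivePeers() == 25 (example value), the cap is
// Math.max(25, MAX_NEW_PEERS) = 128: once 128 channels are queued in newPeers,
// acceptingNewPeers() turns false and further inbound connections are dropped
// before a Channel bean is even created (see EthereumChannelInitializer below).
if (!channelManager.acceptingNewPeers()) {
    ch.disconnect();
}
```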

public Channel getActivePeer(byte[] nodeId) {
return activePeers.get(new ByteArrayWrapper(nodeId));
}

public SyncManager getSyncManager() {
return syncManager;
}

public void close() {
try {
logger.info("Shutting down block and tx distribute threads...");
@@ -19,6 +19,8 @@

import io.netty.channel.*;
import io.netty.channel.socket.nio.NioSocketChannel;
import org.ethereum.net.rlpx.Node;
import org.ethereum.net.rlpx.discover.NodeManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@@ -42,6 +44,9 @@ public class EthereumChannelInitializer extends ChannelInitializer<NioSocketChan
@Autowired
ChannelManager channelManager;

@Autowired
NodeManager nodeManager;

private String remoteId;

private boolean peerDiscoveryMode = false;
@@ -57,11 +62,35 @@ public void initChannel(NioSocketChannel ch) throws Exception {
logger.debug("Open {} connection, channel: {}", isInbound() ? "inbound" : "outbound", ch.toString());
}

if (isInbound() && channelManager.isRecentlyDisconnected(ch.remoteAddress().getAddress())) {
// avoid too frequent connection attempts
logger.debug("Drop connection - the same IP was disconnected recently, channel: {}", ch.toString());
ch.disconnect();
return;
// For incoming connection drop if..
if (isInbound()) {
boolean needToDrop = false;
// Bad remote address
if (ch.remoteAddress() == null) {
logger.debug("Drop connection - bad remote address, channel: {}", ch.toString());
needToDrop = true;
}
// Avoid too frequent connection attempts
if (!needToDrop && channelManager.isRecentlyDisconnected(ch.remoteAddress().getAddress())) {
logger.debug("Drop connection - the same IP was disconnected recently, channel: {}", ch.toString());
needToDrop = true;
}
// Drop bad peers before creating channel
if (!needToDrop && nodeManager.getNodeStatistics(new Node(new byte[0], ch.remoteAddress().getHostString(),
Reviewer comment (Contributor): Requesting nodeManager.isReputationPenalized(InetSocketAddress addr), which would do all of this work.

ch.remoteAddress().getPort())).isReputationPenalized()) {
logger.debug("Drop connection - bad peer, channel: {}", ch.toString());
needToDrop = true;
}
// Drop if we have long waiting queue already
if (!needToDrop && !channelManager.acceptingNewPeers()) {
logger.debug("Drop connection - many new peers are not processed, channel: {}", ch.toString());
needToDrop = true;
}

if (needToDrop) {
ch.disconnect();
return;
}
}

final Channel channel = ctx.getBean(Channel.class);
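The inline review comment above asks for this check to move behind a nodeManager.isReputationPenalized(InetSocketAddress) helper. A rough sketch of what is being requested, assuming it just wraps the Node construction already used in the condition; this helper is not part of the diff:

```java
// Hypothetical NodeManager helper, per the review comment -- not in this change set.
public boolean isReputationPenalized(InetSocketAddress addr) {
    Node node = new Node(new byte[0], addr.getHostString(), addr.getPort());
    return getNodeStatistics(node).isReputationPenalized();
}
```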
@@ -178,7 +178,7 @@ private void headerRetrieveLoop() {
logger.debug("{} headerRetrieveLoop: No IDLE peers found", name);
break;
} else {
logger.debug("{} headerRetrieveLoop: request headers (" + headersRequest.getStart() + ") from " + any.getNode(), name);
logger.debug("{} headerRetrieveLoop: request headers (" + headersRequest.toString() + ") from " + any.getNode(), name);
ListenableFuture<List<BlockHeader>> futureHeaders = headersRequest.getHash() == null ?
any.getEthHandler().sendGetBlockHeaders(headersRequest.getStart(), headersRequest.getCount(), headersRequest.isReverse()) :
any.getEthHandler().sendGetBlockHeaders(headersRequest.getHash(), headersRequest.getCount(), headersRequest.getStep(), headersRequest.isReverse());