/*
* To change this license header, choose License Headers in Project Properties.
* To change this template file, choose Tools | Templates
* and open the template in the editor.
*/
package bftsmart.tom.core;
import bftsmart.communication.ServerCommunicationSystem;
import bftsmart.consensus.Decision;
import bftsmart.consensus.Epoch;
import bftsmart.consensus.Consensus;
import bftsmart.consensus.TimestampValuePair;
import bftsmart.consensus.messages.ConsensusMessage;
import bftsmart.consensus.messages.MessageFactory;
import bftsmart.consensus.roles.Acceptor;
import bftsmart.reconfiguration.ServerViewController;
import bftsmart.statemanagement.StateManager;
import bftsmart.tom.core.messages.TOMMessage;
import bftsmart.tom.leaderchange.RequestsTimer;
import bftsmart.tom.leaderchange.CollectData;
import bftsmart.tom.leaderchange.LCManager;
import bftsmart.tom.leaderchange.LCMessage;
import bftsmart.tom.leaderchange.CertifiedDecision;
import bftsmart.tom.util.BatchBuilder;
import bftsmart.tom.util.BatchReader;
import bftsmart.tom.util.TOMUtil;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.security.MessageDigest;
import java.security.SignedObject;
import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
import org.apache.commons.codec.binary.Base64;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
*
* This class implements the synchronization phase described in
* Joao Sousa's 'From Byzantine Consensus to BFT state machine replication: a latency-optimal transformation' (May 2012)
*
* This class implements all optimizations described at the end of the paper
*
* @author joao
*/
public class Synchronizer {
private Logger logger = LoggerFactory.getLogger(this.getClass());
// out of context messages related to the leader change are stored here
private final HashSet<LCMessage> outOfContextLC;
// Manager of the leader change
private final LCManager lcManager;
//Total order layer
private final TOMLayer tom;
// Stuff from TOMLayer that this object needs
private final RequestsTimer requestsTimer;
private final ExecutionManager execManager;
private final ServerViewController controller;
private final BatchBuilder bb;
private final ServerCommunicationSystem communication;
private final StateManager stateManager;
private final Acceptor acceptor;
private final MessageDigest md;
// Attributes to temporarely store synchronization info
// if state transfer is required for synchronization
private int tempRegency = -1;
private CertifiedDecision tempLastHighestCID = null;
private HashSet<SignedObject> tempSignedCollects = null;
private byte[] tempPropose = null;
private int tempBatchSize = -1;
private boolean tempIAmLeader = false;
public Synchronizer(TOMLayer tom) {
this.tom = tom;
this.requestsTimer = this.tom.requestsTimer;
this.execManager = this.tom.execManager;
this.controller = this.tom.controller;
this.bb = this.tom.bb;
this.communication = this.tom.getCommunication();
this.stateManager = this.tom.stateManager;
this.acceptor = this.tom.acceptor;
this.md = this.tom.md;
this.outOfContextLC = new HashSet<>();
this.lcManager = new LCManager(this.tom,this.controller, this.md);
}
public LCManager getLCManager() {
return lcManager;
}
/**
* This method is called when there is a timeout and the request has already
* been forwarded to the leader
*
* @param requestList List of requests that the replica wanted to order but
* didn't manage to
*/
public void triggerTimeout(List<TOMMessage> requestList) {
ObjectOutputStream out = null;
ByteArrayOutputStream bos = new ByteArrayOutputStream();
int regency = lcManager.getNextReg();
requestsTimer.stopTimer();
requestsTimer.Enabled(false);
// still not in the leader change phase?
if (lcManager.getNextReg() == lcManager.getLastReg()) {
lcManager.setNextReg(lcManager.getLastReg() + 1); // define next timestamp
regency = lcManager.getNextReg(); // update variable
// store messages to be ordered
lcManager.setCurrentRequestTimedOut(requestList);
// store information about messages that I'm going to send
lcManager.addStop(regency, this.controller.getStaticConf().getProcessId());
//execManager.stop(); // stop consensus execution
//Get requests that timed out and the requests received in STOP messages
//and add those STOPed requests to the client manager
addSTOPedRequestsToClientManager();
List<TOMMessage> messages = getRequestsToRelay();
try { // serialize content to send in STOP message
out = new ObjectOutputStream(bos);
if (messages != null && messages.size() > 0) {
//TODO: If this is null, then there was no timeout nor STOP messages.
//What to do?
byte[] serialized = bb.makeBatch(messages, 0, 0, controller.getStaticConf().getUseSignatures() == 1);
out.writeBoolean(true);
out.writeObject(serialized);
} else {
out.writeBoolean(false);
logger.warn("Strange... did not include any request in my STOP message for regency " + regency);
}
out.flush();
bos.flush();
byte[] payload = bos.toByteArray();
out.close();
bos.close();
// send STOP-message
logger.info("Sending STOP message to install regency " + regency + " with " + (messages != null ? messages.size() : 0) + " request(s) to relay");
LCMessage stop = new LCMessage(this.controller.getStaticConf().getProcessId(), TOMUtil.STOP, regency, payload);
requestsTimer.setSTOP(regency, stop); // make replica re-transmit the stop message until a new regency is installed
communication.send(this.controller.getCurrentViewOtherAcceptors(), stop);
} catch (IOException ex) {
logger.error("Could not serialize STOP message", ex);
} finally {
try {
out.close();
bos.close();
} catch (IOException ex) {
logger.error("Could not serialize STOP message", ex);
}
}
}
processOutOfContextSTOPs(regency); // the replica might have received STOPs
// that were out of context at the time they
// were received, but now can be processed
startSynchronization(regency); // evaluate STOP messages
}
// Processes STOP messages that were not process upon reception, because they were
// ahead of the replica's expected regency
private void processOutOfContextSTOPs(int regency) {
logger.debug("Checking if there are out of context STOPs for regency " + regency);
Set<LCMessage> stops = getOutOfContextLC(TOMUtil.STOP, regency);
if (stops.size() > 0) {
logger.info("Processing " + stops.size() + " out of context STOPs for regency " + regency);
} else {
lo