diff --git a/src/legislator/ledger.cc b/src/legislator/ledger.cc index d9a57e3..18a8c19 100644 --- a/src/legislator/ledger.cc +++ b/src/legislator/ledger.cc @@ -12,9 +12,16 @@ namespace paxos base_path_ += "/"; } - std::vector Ledger::get_decrees() + Decree Ledger::get_decree() { - return std::vector(); + Decree decree; + decree.decree = read_file(base_path_ + "decree.txt"); + return decree; + } + + void Ledger::set_decree(Decree decree) + { + write_file(base_path_ + "decree.txt", decree.decree); } int Ledger::last_tried() diff --git a/src/legislator/ledger.hh b/src/legislator/ledger.hh index 0f069eb..7a7a768 100644 --- a/src/legislator/ledger.hh +++ b/src/legislator/ledger.hh @@ -12,7 +12,8 @@ namespace paxos public: Ledger(std::string path); - std::vector get_decrees(); + Decree get_decree(); + void set_decree(Decree decree); int last_tried(); void set_last_tried(int b); diff --git a/src/legislator/legislator.cc b/src/legislator/legislator.cc index eea7767..ba9a9e4 100644 --- a/src/legislator/legislator.cc +++ b/src/legislator/legislator.cc @@ -182,18 +182,62 @@ namespace paxos vote.decree = decree; ledger.set_prev_vote(vote); - send_voted(ballot, sender); + send_voted(ballot, decree, sender); } - void Legislator::send_voted(int ballot, std::string receiver) + void Legislator::send_voted(int ballot, Decree decree, std::string receiver) { Message message; message.set_method("Voted"); message.add_header("ballot", std::to_string(ballot)); message.add_header("sender", self->config_.name); + message.add_header("decree", std::to_string(decree.decree)); SendEW::send_message(message, legislators[receiver]); } + void Legislator::receive_voted(Message message) + { + std::string ballot_str = *message.get_header("ballot"); + std::string sender = *message.get_header("sender"); + std::string decree_str = *message.get_header("decree"); + Decree decree; + decree.decree = std::stoi(decree_str); + + log(config_.name + " has received Voted(" + + ballot_str + + ") from " + sender, cyan); + + receive_voted(std::stoi(ballot_str), decree, sender); + } + + void Legislator::receive_voted(int ballot, Decree decree, std::string voter) + { + if (ballot != ledger.last_tried()) + return; + quorum_previous_votes.erase(voter); + if (quorum_previous_votes.size() == 0) + receive_enough_voted(ballot, decree); + } + + void Legislator::receive_enough_voted(int ballot, Decree decree) + { + log(config_.name + " has received enough Voted(" + + std::to_string(ballot) + + "), saving decree: " + std::to_string(decree.decree), green); + ledger.set_decree(decree); + send_success(decree); + } + + void Legislator::send_success(Decree decree) + { + Message message; + message.set_method("Success"); + message.add_header("decree", std::to_string(decree.decree)); + + for (auto legislator : legislators) + SendEW::send_message(message, legislator.second); + } + void Legislator::handle_message(Message message) { std::string method = message.get_method(); @@ -203,5 +247,7 @@ namespace paxos receive_last_vote(message); else if (method == "BeginBallot") receive_begin_ballot(message); + else if (method == "Voted") + receive_voted(message); } } diff --git a/src/legislator/legislator.hh b/src/legislator/legislator.hh index dc35729..7fa5ebe 100644 --- a/src/legislator/legislator.hh +++ b/src/legislator/legislator.hh @@ -29,7 +29,12 @@ namespace paxos void receive_begin_ballot(Message message); void receive_begin_ballot(int ballot, int decree, std::string sender); - void send_voted(int ballot, std::string receiver); + void send_voted(int ballot, Decree decree, std::string receiver); + void receive_voted(Message message); + void receive_voted(int ballot, Decree decree, std::string voter); + void receive_enough_voted(int ballot, Decree decree); + + void send_success(Decree decree); void handle_message(Message message);