From 6b80cc68ceb39372237b3d24c06b7f6113614609 Mon Sep 17 00:00:00 2001 From: Julien Clement Date: Tue, 5 May 2020 20:43:35 +0200 Subject: [PATCH] Message parsing and serialization, connection to other legislators --- Makefile.am | 4 + src/error/connection-closed.hh | 2 +- src/error/connection-failed.hh | 2 +- src/error/parsing-error.hh | 2 +- src/events/send.cc | 50 +++++++++ src/events/send.hh | 4 + src/legislator/legislator.cc | 15 ++- src/legislator/legislator.hh | 2 +- src/main.cc | 6 +- src/message/message.cc | 185 +++++++++++++++++++++++++++++++++ src/message/message.hh | 74 +++++++++++++ src/misc/buffer.cc | 2 +- 12 files changed, 337 insertions(+), 11 deletions(-) create mode 100644 src/message/message.cc create mode 100644 src/message/message.hh diff --git a/Makefile.am b/Makefile.am index fecc8d3..e42ed11 100644 --- a/Makefile.am +++ b/Makefile.am @@ -57,6 +57,9 @@ MISC = \ src/misc/logger.hh \ src/misc/buffer.cc +MESSAGE = \ + src/message/message.cc + EVENTS = \ src/events/event-loop.hh \ src/events/event-loop.cc \ @@ -87,6 +90,7 @@ SOURCES = \ $(CONFIG) \ $(EVENTS) \ $(LEGISLATOR) \ + $(MESSAGE) \ $(MISC) \ $(SOCKET) \ $(SCHEDULER) \ diff --git a/src/error/connection-closed.hh b/src/error/connection-closed.hh index a8e7759..62c19b3 100644 --- a/src/error/connection-closed.hh +++ b/src/error/connection-closed.hh @@ -4,7 +4,7 @@ #include #include -namespace http +namespace paxos { struct ConnectionRecvClosedError: public std::system_error { diff --git a/src/error/connection-failed.hh b/src/error/connection-failed.hh index fb96959..a3b2021 100644 --- a/src/error/connection-failed.hh +++ b/src/error/connection-failed.hh @@ -4,7 +4,7 @@ #include #include -namespace http +namespace paxos { struct ConnectionFailed : public std::system_error { diff --git a/src/error/parsing-error.hh b/src/error/parsing-error.hh index 6dc4e75..a1e45f2 100644 --- a/src/error/parsing-error.hh +++ b/src/error/parsing-error.hh @@ -3,7 +3,7 @@ #include #include -namespace http +namespace paxos { struct ParsingError : public std::logic_error { diff --git a/src/events/send.cc b/src/events/send.cc index e51bf9b..36d0c17 100644 --- a/src/events/send.cc +++ b/src/events/send.cc @@ -3,6 +3,9 @@ #include "events/register.hh" #include "error/connection-closed.hh" #include "error/parsing-error.hh" +#include "misc/logger.hh" +#include "misc/addrinfo/addrinfo.hh" +#include "error/connection-failed.hh" namespace paxos { @@ -20,4 +23,51 @@ namespace paxos { } + + static shared_socket connect_to_ip_port(std::string ip, std::string port) + { + /* prepare hints structure */ + misc::AddrInfoHint hints; + hints.family(AF_UNSPEC); + hints.socktype(SOCK_STREAM); + misc::AddrInfo result = misc::getaddrinfo(ip.c_str(), port.c_str(), hints); + + for (auto it : result) + { + try + { + auto vhost_socket = std::make_shared + (it.ai_family, it.ai_socktype, it.ai_protocol); + vhost_socket->connect(it.ai_addr, it.ai_addrlen); + vhost_socket->fcntl_set_O_NONBLOCK(); + return vhost_socket; + } + catch (const std::exception& e) + { + continue; + } + } + throw ConnectionFailed(); + } + + + void SendEW::send_message(Message message, shared_legislator legislator) + { + std::string ip = legislator->config_.ip; + std::string port = legislator->config_.port; + shared_socket socket; + try + { + socket = connect_to_ip_port(ip, port); + } + catch (const ConnectionFailed& e) + { + log("Could not connect to " + legislator->config_.name, red); + } + + shared_connection connection = std::make_shared(socket); + message.fill_buffer(connection->get_buffer_out()); + + event_register.register_event(connection); + } } diff --git a/src/events/send.hh b/src/events/send.hh index 514b759..0696e13 100644 --- a/src/events/send.hh +++ b/src/events/send.hh @@ -4,6 +4,8 @@ #include "events/events.hh" #include "connection/connection.hh" +#include "message/message.hh" +#include "legislator/legislator.hh" namespace paxos { @@ -24,6 +26,8 @@ namespace paxos */ void operator()() final; + static void send_message(Message message, shared_legislator legislator); + private: shared_connection connection_; }; diff --git a/src/legislator/legislator.cc b/src/legislator/legislator.cc index fbe517e..e1950e7 100644 --- a/src/legislator/legislator.cc +++ b/src/legislator/legislator.cc @@ -1,6 +1,10 @@ +#include + #include "legislator.hh" #include "misc/logger.hh" -#include +#include "message/message.hh" +#include "events/register.hh" +#include "events/send.hh" namespace paxos { @@ -20,8 +24,13 @@ namespace paxos void Legislator::send_next_ballot(int ballot) { - ballot = ballot; - return; + std::string ballot_string = std::to_string(ballot); + Message message; + message.set_method("NextBallot"); + message.add_header("ballot", ballot_string); + + for (auto legislator : legislators) + SendEW::send_message(message, legislator.second); } void Legislator::receive_next_ballot(int ballot, std::string sender) diff --git a/src/legislator/legislator.hh b/src/legislator/legislator.hh index cbefdf8..7f6a1d8 100644 --- a/src/legislator/legislator.hh +++ b/src/legislator/legislator.hh @@ -27,5 +27,5 @@ namespace paxos using shared_legislator = std::shared_ptr; extern shared_legislator self; - extern std::vector legislators; + extern std::unordered_map legislators; } diff --git a/src/main.cc b/src/main.cc index a23a65e..8c13871 100644 --- a/src/main.cc +++ b/src/main.cc @@ -1,5 +1,5 @@ #include -#include +#include #include "config/config.hh" #include "legislator/legislator-factory.hh" @@ -7,7 +7,7 @@ #include "misc/logger.hh" paxos::EventWatcherRegistry paxos::event_register; -std::vector paxos::legislators; +std::unordered_map paxos::legislators; paxos::shared_legislator paxos::self; ev_signal sigint_watcher; @@ -38,7 +38,7 @@ int main(int, char **argv) for (auto config : server_config.legislators_) { - paxos::legislators.push_back(paxos::LegislatorFactory::Create(config)); + paxos::legislators.insert(std::pair(config.name, paxos::LegislatorFactory::Create(config))); } paxos::event_register.get_event_loop()(); diff --git a/src/message/message.cc b/src/message/message.cc new file mode 100644 index 0000000..4b84997 --- /dev/null +++ b/src/message/message.cc @@ -0,0 +1,185 @@ +#include +#include +#include + +#include "error/parsing-error.hh" +#include "message.hh" + + +namespace paxos +{ + static inline bool is_white_space(char c) + { + return c == ' ' || c == '\t' || c == '\n' || c == '\r'; + } + + static inline bool is_digit(const char c) + { + return c >= '0' && c <= '9'; + } + + static inline bool is_t_char(char c) + { + return c == '!' || c == '#' || c == '$' || c == '%' || c == '&' + || c == '\'' || c == '*' || c == '+' || c == '-' || c == '.' + || c == '^' || c == '_' || c == '`' || c == '|' || c == '~' + || is_digit(c) || (c >= 'a' && c <= 'z') || (c >= 'A' && c <= 'Z'); + } + + void strip_white_spaces(std::string& str) + { + int nb_leading_white_spaces = 0; + for (auto i = str.begin(); i != str.end() + && is_white_space(*i); ++i) + ++nb_leading_white_spaces; + str.erase(0, nb_leading_white_spaces); + + auto len = str.size(); + while (len && is_white_space(str[len - 1])) + { + str.pop_back(); + len = str.size(); + } + } + + static inline bool contains_only_tchars(const std::string& str) + { + for (auto c : str) + if (!is_t_char(c)) + return false; + return true; + } + + bool Message::parse_first_line(const std::string& line) + { + std::istringstream stream(line); + stream >> method_; + if (stream.eof()) + return false; + std::string crlf; + stream >> crlf; + return true; + } + + bool Message::parse_header(const std::string& header_line) + { + auto name_separator_index = header_line.find_first_of(':'); + if (name_separator_index == std::string::npos) + return false; + + auto header_name = header_line.substr(0, name_separator_index); + if (!contains_only_tchars(header_name)) + throw ParsingError("Forbidden whitespace in header name"); + + auto field_value = header_line.substr(name_separator_index + 1); + + strip_white_spaces(field_value); + + return parse_header(header_name, field_value); + } + + bool Message::parse_header(std::string& header_name, + const std::string& field_value) + { + if (headers_[header_name] == "") + headers_[header_name] = field_value; + else + headers_[header_name] += "," + field_value; + return true; + } + + + void Message::pop_buffer(misc::Buffer& buffer) + { + for (auto line = buffer.popline(); + line != "\r\n"; + line = buffer.popline()) + continue; + } + + std::string& Message::get_method() + { + return method_; + } + + void Message::set_method(std::string method) + { + method_ = method; + } + + Message::headers_map_t& Message::get_headers() + { + return headers_; + } + + std::optional Message::get_header(std::string header_name) const + { + auto res = headers_.find(header_name); + if (res == headers_.end()) + return std::nullopt; + + return res->second; + } + + + void Message::add_header(std::string header, + const std::string& header_value) + { + headers_[header] = header_value; + } + + void Message::remove_header(std::string header) + { + auto header_iter = headers_.find(header); + if (header_iter != headers_.end()) + headers_.erase(header_iter); + } + + void Message::fill_buffer(misc::Buffer& buffer) + { + std::string first_line = get_method() + "\r\n"; + buffer.append(first_line); + + std::string headers; + for (auto header: get_headers()) + headers += header.first + ": " + header.second + "\r\n"; + + buffer.append(headers); + buffer.append("\r\n"); + } + + bool Message::parse(misc::Buffer& buffer) + { + if (buffer.empty()) + return false; + size_t line_nb = 0; + if (!parse_first_line(buffer.getline(line_nb))) + { + if (buffer.get_nb_lines() == 0) + return false; + else + throw ParsingError("First line too short"); + } + ++line_nb; + for (auto line = buffer.getline(line_nb); + line_nb < buffer.get_nb_lines() && line != "\r\n"; + line = buffer.getline(line_nb)) + { + if (!parse_header(line)) + { + if (line_nb == buffer.get_nb_lines()) + return false; + else + throw ParsingError("Invalid header"); + } + ++line_nb; + } + if (line_nb >= buffer.get_nb_lines()) + return false; + + pop_buffer(buffer); + + return true; + } + +} diff --git a/src/message/message.hh b/src/message/message.hh new file mode 100644 index 0000000..f876e87 --- /dev/null +++ b/src/message/message.hh @@ -0,0 +1,74 @@ +#pragma once +#include + +#include "misc/buffer.hh" + +namespace paxos +{ + class Message + { + public: + //ALIASES + using headers_map_t = std::unordered_map; + + Message() = default; + Message(const Message&) = default; + Message& operator=(const Message&) = default; + Message(Message&&) = default; + Message& operator=(Message&&) = default; + ~Message() = default; + + //PARSING + bool parse(misc::Buffer& buffer); + + /** + ** \brief Parse the given header line + ** \param header_line the header line to parse + ** \return false if the line is incomplete, true if it is valid + ** throw a ParsingError if there is a whitespace between the + ** header name and the first ':' + ** this is sufficient request invalidity condition. + */ + bool parse_header(const std::string& header_line); + + /** + ** \brief Add the given header name and value in the headers map. + ** If the header name already exists, the new value is concatenated + ** to the old one with a separating ','. + ** \param header_name The name of the header to be added, it is case + ** insensitive + ** \param field_value the value of the field, white spaces need to + ** be stripped + ** \return true + */ + bool parse_header(std::string& header_name, + const std::string& field_value); + + bool parse_first_line(const std::string& line); + + + //GETTERS + + std::string& get_method(); + + headers_map_t& get_headers(); + std::optional get_header(std::string header_name) const; + + void pop_buffer(misc::Buffer& buffer); + + void add_header(std::string header, const std::string& header_value); + + void remove_header(std::string header); + + void set_method(std::string method); + + void fill_buffer(misc::Buffer& buffer); + + protected: + std::string method_; + + headers_map_t headers_; + }; + + void strip_white_spaces(std::string& str); +} diff --git a/src/misc/buffer.cc b/src/misc/buffer.cc index 2559a99..b18ee20 100644 --- a/src/misc/buffer.cc +++ b/src/misc/buffer.cc @@ -44,7 +44,7 @@ namespace misc char buf[BUFF_SIZE]; ssize_t n = socket.recv(buf, BUFF_SIZE); if (n == 0) - throw http::ConnectionRecvClosedError(); + throw paxos::ConnectionRecvClosedError(); if (n == -1) return 0;