Message parsing and serialization, connection to other legislators
This commit is contained in:
parent
fd4de7091c
commit
6b80cc68ce
@ -57,6 +57,9 @@ MISC = \
|
||||
src/misc/logger.hh \
|
||||
src/misc/buffer.cc
|
||||
|
||||
MESSAGE = \
|
||||
src/message/message.cc
|
||||
|
||||
EVENTS = \
|
||||
src/events/event-loop.hh \
|
||||
src/events/event-loop.cc \
|
||||
@ -87,6 +90,7 @@ SOURCES = \
|
||||
$(CONFIG) \
|
||||
$(EVENTS) \
|
||||
$(LEGISLATOR) \
|
||||
$(MESSAGE) \
|
||||
$(MISC) \
|
||||
$(SOCKET) \
|
||||
$(SCHEDULER) \
|
||||
|
@ -4,7 +4,7 @@
|
||||
#include <string>
|
||||
#include <system_error>
|
||||
|
||||
namespace http
|
||||
namespace paxos
|
||||
{
|
||||
struct ConnectionRecvClosedError: public std::system_error
|
||||
{
|
||||
|
@ -4,7 +4,7 @@
|
||||
#include <string>
|
||||
#include <system_error>
|
||||
|
||||
namespace http
|
||||
namespace paxos
|
||||
{
|
||||
struct ConnectionFailed : public std::system_error
|
||||
{
|
||||
|
@ -3,7 +3,7 @@
|
||||
#include <stdexcept>
|
||||
#include <string>
|
||||
|
||||
namespace http
|
||||
namespace paxos
|
||||
{
|
||||
struct ParsingError : public std::logic_error
|
||||
{
|
||||
|
@ -3,6 +3,9 @@
|
||||
#include "events/register.hh"
|
||||
#include "error/connection-closed.hh"
|
||||
#include "error/parsing-error.hh"
|
||||
#include "misc/logger.hh"
|
||||
#include "misc/addrinfo/addrinfo.hh"
|
||||
#include "error/connection-failed.hh"
|
||||
|
||||
namespace paxos
|
||||
{
|
||||
@ -20,4 +23,51 @@ namespace paxos
|
||||
{
|
||||
|
||||
}
|
||||
|
||||
static shared_socket connect_to_ip_port(std::string ip, std::string port)
|
||||
{
|
||||
/* prepare hints structure */
|
||||
misc::AddrInfoHint hints;
|
||||
hints.family(AF_UNSPEC);
|
||||
hints.socktype(SOCK_STREAM);
|
||||
misc::AddrInfo result = misc::getaddrinfo(ip.c_str(), port.c_str(), hints);
|
||||
|
||||
for (auto it : result)
|
||||
{
|
||||
try
|
||||
{
|
||||
auto vhost_socket = std::make_shared<DefaultSocket>
|
||||
(it.ai_family, it.ai_socktype, it.ai_protocol);
|
||||
vhost_socket->connect(it.ai_addr, it.ai_addrlen);
|
||||
vhost_socket->fcntl_set_O_NONBLOCK();
|
||||
return vhost_socket;
|
||||
}
|
||||
catch (const std::exception& e)
|
||||
{
|
||||
continue;
|
||||
}
|
||||
}
|
||||
throw ConnectionFailed();
|
||||
}
|
||||
|
||||
|
||||
void SendEW::send_message(Message message, shared_legislator legislator)
|
||||
{
|
||||
std::string ip = legislator->config_.ip;
|
||||
std::string port = legislator->config_.port;
|
||||
shared_socket socket;
|
||||
try
|
||||
{
|
||||
socket = connect_to_ip_port(ip, port);
|
||||
}
|
||||
catch (const ConnectionFailed& e)
|
||||
{
|
||||
log("Could not connect to " + legislator->config_.name, red);
|
||||
}
|
||||
|
||||
shared_connection connection = std::make_shared<Connection>(socket);
|
||||
message.fill_buffer(connection->get_buffer_out());
|
||||
|
||||
event_register.register_event<SendEW>(connection);
|
||||
}
|
||||
}
|
||||
|
@ -4,6 +4,8 @@
|
||||
|
||||
#include "events/events.hh"
|
||||
#include "connection/connection.hh"
|
||||
#include "message/message.hh"
|
||||
#include "legislator/legislator.hh"
|
||||
|
||||
namespace paxos
|
||||
{
|
||||
@ -24,6 +26,8 @@ namespace paxos
|
||||
*/
|
||||
void operator()() final;
|
||||
|
||||
static void send_message(Message message, shared_legislator legislator);
|
||||
|
||||
private:
|
||||
shared_connection connection_;
|
||||
};
|
||||
|
@ -1,6 +1,10 @@
|
||||
#include <string>
|
||||
|
||||
#include "legislator.hh"
|
||||
#include "misc/logger.hh"
|
||||
#include <string>
|
||||
#include "message/message.hh"
|
||||
#include "events/register.hh"
|
||||
#include "events/send.hh"
|
||||
|
||||
namespace paxos
|
||||
{
|
||||
@ -20,8 +24,13 @@ namespace paxos
|
||||
|
||||
void Legislator::send_next_ballot(int ballot)
|
||||
{
|
||||
ballot = ballot;
|
||||
return;
|
||||
std::string ballot_string = std::to_string(ballot);
|
||||
Message message;
|
||||
message.set_method("NextBallot");
|
||||
message.add_header("ballot", ballot_string);
|
||||
|
||||
for (auto legislator : legislators)
|
||||
SendEW::send_message(message, legislator.second);
|
||||
}
|
||||
|
||||
void Legislator::receive_next_ballot(int ballot, std::string sender)
|
||||
|
@ -27,5 +27,5 @@ namespace paxos
|
||||
using shared_legislator = std::shared_ptr<Legislator>;
|
||||
|
||||
extern shared_legislator self;
|
||||
extern std::vector<shared_legislator> legislators;
|
||||
extern std::unordered_map<std::string, shared_legislator> legislators;
|
||||
}
|
||||
|
@ -1,5 +1,5 @@
|
||||
#include <iostream>
|
||||
#include <vector>
|
||||
#include <unordered_map>
|
||||
|
||||
#include "config/config.hh"
|
||||
#include "legislator/legislator-factory.hh"
|
||||
@ -7,7 +7,7 @@
|
||||
#include "misc/logger.hh"
|
||||
|
||||
paxos::EventWatcherRegistry paxos::event_register;
|
||||
std::vector<paxos::shared_legislator> paxos::legislators;
|
||||
std::unordered_map<std::string, paxos::shared_legislator> paxos::legislators;
|
||||
paxos::shared_legislator paxos::self;
|
||||
|
||||
ev_signal sigint_watcher;
|
||||
@ -38,7 +38,7 @@ int main(int, char **argv)
|
||||
|
||||
for (auto config : server_config.legislators_)
|
||||
{
|
||||
paxos::legislators.push_back(paxos::LegislatorFactory::Create(config));
|
||||
paxos::legislators.insert(std::pair(config.name, paxos::LegislatorFactory::Create(config)));
|
||||
}
|
||||
|
||||
paxos::event_register.get_event_loop()();
|
||||
|
185
src/message/message.cc
Normal file
185
src/message/message.cc
Normal file
@ -0,0 +1,185 @@
|
||||
#include <optional>
|
||||
#include <string>
|
||||
#include <sstream>
|
||||
|
||||
#include "error/parsing-error.hh"
|
||||
#include "message.hh"
|
||||
|
||||
|
||||
namespace paxos
|
||||
{
|
||||
static inline bool is_white_space(char c)
|
||||
{
|
||||
return c == ' ' || c == '\t' || c == '\n' || c == '\r';
|
||||
}
|
||||
|
||||
static inline bool is_digit(const char c)
|
||||
{
|
||||
return c >= '0' && c <= '9';
|
||||
}
|
||||
|
||||
static inline bool is_t_char(char c)
|
||||
{
|
||||
return c == '!' || c == '#' || c == '$' || c == '%' || c == '&'
|
||||
|| c == '\'' || c == '*' || c == '+' || c == '-' || c == '.'
|
||||
|| c == '^' || c == '_' || c == '`' || c == '|' || c == '~'
|
||||
|| is_digit(c) || (c >= 'a' && c <= 'z') || (c >= 'A' && c <= 'Z');
|
||||
}
|
||||
|
||||
void strip_white_spaces(std::string& str)
|
||||
{
|
||||
int nb_leading_white_spaces = 0;
|
||||
for (auto i = str.begin(); i != str.end()
|
||||
&& is_white_space(*i); ++i)
|
||||
++nb_leading_white_spaces;
|
||||
str.erase(0, nb_leading_white_spaces);
|
||||
|
||||
auto len = str.size();
|
||||
while (len && is_white_space(str[len - 1]))
|
||||
{
|
||||
str.pop_back();
|
||||
len = str.size();
|
||||
}
|
||||
}
|
||||
|
||||
static inline bool contains_only_tchars(const std::string& str)
|
||||
{
|
||||
for (auto c : str)
|
||||
if (!is_t_char(c))
|
||||
return false;
|
||||
return true;
|
||||
}
|
||||
|
||||
bool Message::parse_first_line(const std::string& line)
|
||||
{
|
||||
std::istringstream stream(line);
|
||||
stream >> method_;
|
||||
if (stream.eof())
|
||||
return false;
|
||||
std::string crlf;
|
||||
stream >> crlf;
|
||||
return true;
|
||||
}
|
||||
|
||||
bool Message::parse_header(const std::string& header_line)
|
||||
{
|
||||
auto name_separator_index = header_line.find_first_of(':');
|
||||
if (name_separator_index == std::string::npos)
|
||||
return false;
|
||||
|
||||
auto header_name = header_line.substr(0, name_separator_index);
|
||||
if (!contains_only_tchars(header_name))
|
||||
throw ParsingError("Forbidden whitespace in header name");
|
||||
|
||||
auto field_value = header_line.substr(name_separator_index + 1);
|
||||
|
||||
strip_white_spaces(field_value);
|
||||
|
||||
return parse_header(header_name, field_value);
|
||||
}
|
||||
|
||||
bool Message::parse_header(std::string& header_name,
|
||||
const std::string& field_value)
|
||||
{
|
||||
if (headers_[header_name] == "")
|
||||
headers_[header_name] = field_value;
|
||||
else
|
||||
headers_[header_name] += "," + field_value;
|
||||
return true;
|
||||
}
|
||||
|
||||
|
||||
void Message::pop_buffer(misc::Buffer& buffer)
|
||||
{
|
||||
for (auto line = buffer.popline();
|
||||
line != "\r\n";
|
||||
line = buffer.popline())
|
||||
continue;
|
||||
}
|
||||
|
||||
std::string& Message::get_method()
|
||||
{
|
||||
return method_;
|
||||
}
|
||||
|
||||
void Message::set_method(std::string method)
|
||||
{
|
||||
method_ = method;
|
||||
}
|
||||
|
||||
Message::headers_map_t& Message::get_headers()
|
||||
{
|
||||
return headers_;
|
||||
}
|
||||
|
||||
std::optional<std::string> Message::get_header(std::string header_name) const
|
||||
{
|
||||
auto res = headers_.find(header_name);
|
||||
if (res == headers_.end())
|
||||
return std::nullopt;
|
||||
|
||||
return res->second;
|
||||
}
|
||||
|
||||
|
||||
void Message::add_header(std::string header,
|
||||
const std::string& header_value)
|
||||
{
|
||||
headers_[header] = header_value;
|
||||
}
|
||||
|
||||
void Message::remove_header(std::string header)
|
||||
{
|
||||
auto header_iter = headers_.find(header);
|
||||
if (header_iter != headers_.end())
|
||||
headers_.erase(header_iter);
|
||||
}
|
||||
|
||||
void Message::fill_buffer(misc::Buffer& buffer)
|
||||
{
|
||||
std::string first_line = get_method() + "\r\n";
|
||||
buffer.append(first_line);
|
||||
|
||||
std::string headers;
|
||||
for (auto header: get_headers())
|
||||
headers += header.first + ": " + header.second + "\r\n";
|
||||
|
||||
buffer.append(headers);
|
||||
buffer.append("\r\n");
|
||||
}
|
||||
|
||||
bool Message::parse(misc::Buffer& buffer)
|
||||
{
|
||||
if (buffer.empty())
|
||||
return false;
|
||||
size_t line_nb = 0;
|
||||
if (!parse_first_line(buffer.getline(line_nb)))
|
||||
{
|
||||
if (buffer.get_nb_lines() == 0)
|
||||
return false;
|
||||
else
|
||||
throw ParsingError("First line too short");
|
||||
}
|
||||
++line_nb;
|
||||
for (auto line = buffer.getline(line_nb);
|
||||
line_nb < buffer.get_nb_lines() && line != "\r\n";
|
||||
line = buffer.getline(line_nb))
|
||||
{
|
||||
if (!parse_header(line))
|
||||
{
|
||||
if (line_nb == buffer.get_nb_lines())
|
||||
return false;
|
||||
else
|
||||
throw ParsingError("Invalid header");
|
||||
}
|
||||
++line_nb;
|
||||
}
|
||||
if (line_nb >= buffer.get_nb_lines())
|
||||
return false;
|
||||
|
||||
pop_buffer(buffer);
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
}
|
74
src/message/message.hh
Normal file
74
src/message/message.hh
Normal file
@ -0,0 +1,74 @@
|
||||
#pragma once
|
||||
#include <unordered_map>
|
||||
|
||||
#include "misc/buffer.hh"
|
||||
|
||||
namespace paxos
|
||||
{
|
||||
class Message
|
||||
{
|
||||
public:
|
||||
//ALIASES
|
||||
using headers_map_t = std::unordered_map<std::string, std::string>;
|
||||
|
||||
Message() = default;
|
||||
Message(const Message&) = default;
|
||||
Message& operator=(const Message&) = default;
|
||||
Message(Message&&) = default;
|
||||
Message& operator=(Message&&) = default;
|
||||
~Message() = default;
|
||||
|
||||
//PARSING
|
||||
bool parse(misc::Buffer& buffer);
|
||||
|
||||
/**
|
||||
** \brief Parse the given header line
|
||||
** \param header_line the header line to parse
|
||||
** \return false if the line is incomplete, true if it is valid
|
||||
** throw a ParsingError if there is a whitespace between the
|
||||
** header name and the first ':'
|
||||
** this is sufficient request invalidity condition.
|
||||
*/
|
||||
bool parse_header(const std::string& header_line);
|
||||
|
||||
/**
|
||||
** \brief Add the given header name and value in the headers map.
|
||||
** If the header name already exists, the new value is concatenated
|
||||
** to the old one with a separating ','.
|
||||
** \param header_name The name of the header to be added, it is case
|
||||
** insensitive
|
||||
** \param field_value the value of the field, white spaces need to
|
||||
** be stripped
|
||||
** \return true
|
||||
*/
|
||||
bool parse_header(std::string& header_name,
|
||||
const std::string& field_value);
|
||||
|
||||
bool parse_first_line(const std::string& line);
|
||||
|
||||
|
||||
//GETTERS
|
||||
|
||||
std::string& get_method();
|
||||
|
||||
headers_map_t& get_headers();
|
||||
std::optional<std::string> get_header(std::string header_name) const;
|
||||
|
||||
void pop_buffer(misc::Buffer& buffer);
|
||||
|
||||
void add_header(std::string header, const std::string& header_value);
|
||||
|
||||
void remove_header(std::string header);
|
||||
|
||||
void set_method(std::string method);
|
||||
|
||||
void fill_buffer(misc::Buffer& buffer);
|
||||
|
||||
protected:
|
||||
std::string method_;
|
||||
|
||||
headers_map_t headers_;
|
||||
};
|
||||
|
||||
void strip_white_spaces(std::string& str);
|
||||
}
|
@ -44,7 +44,7 @@ namespace misc
|
||||
char buf[BUFF_SIZE];
|
||||
ssize_t n = socket.recv(buf, BUFF_SIZE);
|
||||
if (n == 0)
|
||||
throw http::ConnectionRecvClosedError();
|
||||
throw paxos::ConnectionRecvClosedError();
|
||||
if (n == -1)
|
||||
return 0;
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user