From daa3c820878f8e189120cf9359caf8b2359d61ca Mon Sep 17 00:00:00 2001 From: ipkn Date: Sun, 17 Aug 2014 05:35:21 -0400 Subject: improve performance by 2x change to io_service per CPU model --- include/dumb_timer_queue.h | 12 ++++------ include/http_connection.h | 9 +++++-- include/http_server.h | 59 ++++++++++++++++++++++++++++++++++------------ 3 files changed, 55 insertions(+), 25 deletions(-) (limited to 'include') diff --git a/include/dumb_timer_queue.h b/include/dumb_timer_queue.h index ebb8bb1..86ec7c1 100644 --- a/include/dumb_timer_queue.h +++ b/include/dumb_timer_queue.h @@ -29,19 +29,17 @@ namespace crow k.first = nullptr; if (!self) return; - self->mutex_.lock(); + unsigned int index = (unsigned int)(k.second - self->step_); if (index < self->dq_.size()) self->dq_[index].second = nullptr; - self->mutex_.unlock(); } key add(std::function f) { - mutex_.lock(); dq_.emplace_back(std::chrono::steady_clock::now(), std::move(f)); int ret = step_+dq_.size()-1; - mutex_.unlock(); + CROW_LOG_DEBUG << "timer add inside: " << this << ' ' << ret ; return {this, ret}; } @@ -50,7 +48,7 @@ namespace crow { if (!io_service_) return; - mutex_.lock(); + auto now = std::chrono::steady_clock::now(); while(!dq_.empty()) { @@ -60,13 +58,12 @@ namespace crow if (x.second) { CROW_LOG_DEBUG << "timer call: " << this << ' ' << step_; - //io_service_->post(std::move(x.second)); + // we know that timer handlers are very simple currenty; call here x.second(); } dq_.pop_front(); step_++; } - mutex_.unlock(); } void set_io_service(boost::asio::io_service& io_service) @@ -82,7 +79,6 @@ namespace crow int tick{5}; boost::asio::io_service* io_service_{}; std::deque>> dq_; - std::mutex mutex_; int step_{}; }; } diff --git a/include/http_connection.h b/include/http_connection.h index 3232476..907485f 100644 --- a/include/http_connection.h +++ b/include/http_connection.h @@ -26,8 +26,8 @@ namespace crow class Connection { public: - Connection(tcp::socket&& socket, Handler* handler, const std::string& server_name) - : socket_(std::move(socket)), + Connection(boost::asio::io_service& io_service, Handler* handler, const std::string& server_name) + : socket_(io_service), handler_(handler), parser_(this), server_name_(server_name) @@ -48,6 +48,11 @@ namespace crow #endif } + tcp::socket& socket() + { + return socket_; + } + void start() { //auto self = this->shared_from_this(); diff --git a/include/http_server.h b/include/http_server.h index 73f62f0..6dc845a 100644 --- a/include/http_server.h +++ b/include/http_server.h @@ -1,5 +1,6 @@ #pragma once +#include #include #include #include @@ -23,53 +24,80 @@ namespace crow public: Server(Handler* handler, uint16_t port, uint16_t concurrency = 1) : acceptor_(io_service_, tcp::endpoint(asio::ip::address(), port)), - socket_(io_service_), signals_(io_service_, SIGINT, SIGTERM), handler_(handler), concurrency_(concurrency), port_(port) { - do_accept(); } void run() { + if (concurrency_ < 0) + concurrency_ = 1; + + for(int i = 0; i < concurrency_; i++) + io_service_pool_.emplace_back(new boost::asio::io_service()); + std::vector> v; for(uint16_t i = 0; i < concurrency_; i ++) v.push_back( - std::async(std::launch::async, [this]{ + std::async(std::launch::async, [this, i]{ auto& timer_queue = detail::dumb_timer_queue::get_current_dumb_timer_queue(); - timer_queue.set_io_service(io_service_); - while(!io_service_.stopped()) - { + timer_queue.set_io_service(*io_service_pool_[i]); + boost::asio::deadline_timer timer(*io_service_pool_[i]); + timer.expires_from_now(boost::posix_time::seconds(1)); + std::function handler; + handler = [&](const boost::system::error_code& ec){ + if (ec) + return; timer_queue.process(); - io_service_.poll_one(); - } + timer.expires_from_now(boost::posix_time::seconds(1)); + timer.async_wait(handler); + }; + timer.async_wait(handler); + io_service_pool_[i]->run(); })); - CROW_LOG_INFO << server_name_ << " server is running, local port " << port_; signals_.async_wait( [&](const boost::system::error_code& error, int signal_number){ - io_service_.stop(); + stop(); }); - + + do_accept(); + + v.push_back(std::async(std::launch::async, [this]{ + io_service_.run(); + CROW_LOG_INFO << "Exiting."; + })); } void stop() { io_service_.stop(); + for(auto& io_service:io_service_pool_) + io_service->stop(); } private: + asio::io_service& pick_io_service() + { + // TODO load balancing + roundrobin_index_++; + if (roundrobin_index_ >= io_service_pool_.size()) + roundrobin_index_ = 0; + return *io_service_pool_[roundrobin_index_]; + } + void do_accept() { - acceptor_.async_accept(socket_, - [this](boost::system::error_code ec) + auto p = new Connection(pick_io_service(), handler_, server_name_); + acceptor_.async_accept(p->socket(), + [this, p](boost::system::error_code ec) { if (!ec) { - auto p = new Connection(std::move(socket_), handler_, server_name_); p->start(); } do_accept(); @@ -78,13 +106,14 @@ namespace crow private: asio::io_service io_service_; + std::vector> io_service_pool_; tcp::acceptor acceptor_; - tcp::socket socket_; boost::asio::signal_set signals_; Handler* handler_; uint16_t concurrency_{1}; std::string server_name_ = "Crow/0.1"; uint16_t port_; + unsigned int roundrobin_index_{}; }; } -- cgit v1.2.3-54-g00ecf