How to design proper release of a boost::asio socket or wrapper thereof

Like I said, I fail to see how using smart pointers is “cheating, and cheating big”. I also do not think your assessment that “they do this for brevity” holds water.


Here’s a slightly redacted excerpt¹ from our code base that exemplifies how using shared_ptrs doesn’t preclude tracking connections.

It shows just the server side of things, with

  • a very simple connection object in connection.hpp; it derives from std::enable_shared_from_this

  • just the fixed-size connection_pool (we have dynamically resizing pools too, hence the locking primitives). Note how we can perform actions on all active connections.

    So you’d trivially write something like this to write to all clients, like on a timer:

    // Invoke the callback once for every connection in the pool that is still alive.
    _pool.for_each_active([] (auto const& conn) {
        // NOTE(review): send_message and hello_world_packet are not shown here;
        // presumably application-level helpers — confirm against the real code base.
        send_message(conn, hello_world_packet);
    });
    
  • a sample listener that shows how it ties in with the connection_pool (the listener has a sample close() method that shuts down all active connections)

Code Listings

  • connection.hpp

    #pragma once
    
    #include "xxx/net/rpc/protocol.hpp"
    #include "log.hpp"
    #include "stats_filer.hpp"
    #include <memory>
    
    namespace xxx { namespace net { namespace rpc {
    
        struct connection : std::enable_shared_from_this<connection>, protected LogSource {
            typedef std::shared_ptr<connection> ptr;
    
          private:
            friend struct io;
            friend struct listener;
    
            boost::asio::io_service& _svc;
            protocol::socket _socket;
            protocol::endpoint _ep;
            protocol::endpoint _peer;
          public:
    
            connection(boost::asio::io_service& svc, protocol::endpoint ep)
                : LogSource("rpc::connection"),
                  _svc(svc),
                  _socket(svc),
                  _ep(ep)
            {}
    
            void init() {
                _socket.set_option(protocol::no_delay(true));
                _peer = _socket.remote_endpoint();
                g_stats_filer_p->inc_value("asio." + _ep.address().to_string() + ".sockets_accepted");
                debug() << "New connection from " << _peer;
            }
    
            protocol::endpoint endpoint() const { return _ep;     } 
            protocol::endpoint peer() const     { return _peer;   } 
            protocol::socket&  socket()         { return _socket; } 
    
            // TODO encapsulation
            int handle() {
                return _socket.native_handle();
            }
    
            bool valid() const { return _socket.is_open(); }
    
            void cancel() {
                _svc.post([this] { _socket.cancel(); }); 
            }
    
            using shutdown_type = boost::asio::ip::tcp::socket::shutdown_type;
            void shutdown(shutdown_type what = shutdown_type::shutdown_both) {
                _svc.post([=] { _socket.shutdown(what); }); 
            }
    
            ~connection() {
                g_stats_filer_p->inc_value("asio." + _ep.address().to_string() + ".sockets_disconnected");
            }
        };
    
    } } }
    
  • connection_pool.hpp

    #pragma once
    
    #include <algorithm>
    #include <cstddef>
    #include <memory>
    #include <mutex>
    #include <string>
    #include <type_traits>
    #include <utility>
    #include <vector>
    #include "xxx/threads/null_mutex.hpp"
    #include "xxx/net/rpc/connection.hpp"
    #include "stats_filer.hpp"
    #include "log.hpp"
    
    namespace xxx { namespace net { namespace rpc {
    
        // not thread-safe by default, but pass e.g. std::mutex for `Mutex` if you need it
        //
        // Fixed-size pool of weak references to connections. The pool never
        // owns a connection: a slot becomes reusable as soon as the last
        // shared_ptr to its connection has been released elsewhere.
        template <typename Ptr = xxx::net::rpc::connection::ptr, typename Mutex = xxx::threads::null_mutex>
        struct basic_connection_pool : LogSource {
            using WeakPtr = std::weak_ptr<typename Ptr::element_type>;

            // Pool with `size` slots, logging under the default name.
            explicit basic_connection_pool(size_t size)
                : basic_connection_pool("connection_pool", size)
            { }

            // Pool with `size` slots, logging under `name`.
            // (A defaulted `name` followed by a non-defaulted `size` is
            // ill-formed C++, hence the separate one-argument overload above.)
            basic_connection_pool(std::string name, size_t size)
                : LogSource(std::move(name)), _pool(size)
            { }

            // Store `conn` in the first slot whose previous occupant has expired.
            // Returns false, after counting and logging the drop, when every
            // slot still refers to a live connection.
            bool try_insert(Ptr const& conn) {
                std::lock_guard<Mutex> lk(_mx);

                auto slot = std::find_if(_pool.begin(), _pool.end(),
                                         [](WeakPtr const& candidate) { return candidate.expired(); });

                if (slot == _pool.end()) {
                    g_stats_filer_p->inc_value("asio." + conn->endpoint().address().to_string() + ".connections_dropped");
                    error() << "dropping connection from " << conn->peer() << ": connection pool (" << _pool.size() << ") saturated";
                    return false;
                }

                *slot = conn;
                return true;
            }

            // Call `action(ptr)` for every connection that is alive right now.
            // The live pointers are collected while holding the lock, but the
            // actions run after it has been released, so an action may safely
            // call back into the pool.
            template <typename F>
            void for_each_active(F action) {
                std::vector<Ptr> alive;
                {
                    std::lock_guard<Mutex> lk(_mx);
                    alive.reserve(_pool.size());
                    for (auto const& slot : _pool) {
                        if (auto sp = slot.lock()) {
                            alive.push_back(std::move(sp));
                        }
                    }
                }

                for (auto const& p : alive)
                    action(p);
            }

            // True when `Mutex` is anything other than the no-op null_mutex.
            constexpr static bool synchronizing() {
                return !std::is_same<xxx::threads::null_mutex, Mutex>::value;
            }

          private:
            // Write a one-line usage summary to `tx`, classifying each slot by
            // the use_count of its weak reference (0 / 1 / more).
            void dump_stats(LogSource::LogTx tx) const {
                // lock is assumed!
                size_t empty = 0, busy = 0, idle = 0;

                for (auto const& p : _pool) {
                    switch (p.use_count()) {
                        case 0:  empty++; break;
                        case 1:  idle++;  break;
                        default: busy++;  break;
                    }
                }

                tx << "usage empty:" << empty << " busy:" << busy << " idle:" << idle;
            }

            Mutex _mx;
            std::vector<WeakPtr> _pool;
        };
    
        // Pool type used by the listener: shared connection pointers, guarded by a std::mutex.
        // TODO FIXME use null_mutex once growing is no longer required AND if
        // en-pooling still only happens from the single IO thread (XXX-2535)
        using server_connection_pool = basic_connection_pool<xxx::net::rpc::connection::ptr, std::mutex>;
    
    } } }
    
  • listener.hpp

    #pragma once
    
    #include "xxx/threads/null_mutex.hpp"
    #include <mutex>
    #include "xxx/net/rpc/connection_pool.hpp"
    #include "xxx/net/rpc/io_operations.hpp"
    
    namespace xxx { namespace net { namespace rpc {
    
        struct listener : std::enable_shared_from_this<listener>, LogSource {
            typedef std::shared_ptr<listener> ptr;
    
            protocol::acceptor _acceptor;
            protocol::endpoint _ep;
    
            listener(boost::asio::io_service& svc, protocol::endpoint ep, server_connection_pool& pool) 
                : LogSource("rpc::listener"), _acceptor(svc), _ep(ep), _pool(pool)
            {
                _acceptor.open(ep.protocol());
    
                _acceptor.set_option(protocol::acceptor::reuse_address(true));
                _acceptor.set_option(protocol::no_delay(true));
                ::fcntl(_acceptor.native(), F_SETFD, FD_CLOEXEC); // FIXME use non-racy socket factory?
                _acceptor.bind(ep);
    
                _acceptor.listen(32);
            }
    
            void accept_loop(std::function<void(connection::ptr conn)> on_accept) {
    
                auto self = shared_from_this();
                auto conn = std::make_shared<xxx::net::rpc::connection>(_acceptor.get_io_service(), _ep);
    
                _acceptor.async_accept(conn->_socket, [this,self,conn,on_accept](boost::system::error_code ec) {
                    if (ec) {
                        auto tx = ec == boost::asio::error::operation_aborted? debug() : warn();
                        tx << "failed accept " << ec.message();
                    } else {
                        ::fcntl(conn->_socket.native(), F_SETFD, FD_CLOEXEC); // FIXME use non-racy socket factory?
    
                        if (_pool.try_insert(conn)) {
                            on_accept(conn);
                        }
    
                        self->accept_loop(on_accept);
                    }
                });
            }
    
            void close() {
                _acceptor.cancel();
                _acceptor.close();
    
                _acceptor.get_io_service().post([=] {
                    _pool.for_each_active([] (auto const& sp) {
                        sp->shutdown(connection::shutdown_type::shutdown_both);
                        sp->cancel();
                    });
                });
    
                debug() << "shutdown";
            }
    
            ~listener() {
            }
    
          private:
            server_connection_pool& _pool;
        };
    
    } } }
    

¹ download as gist https://gist.github.com/sehe/979af25b8ac4fd77e73cdf1da37ab4c2

Leave a Comment