1 #ifndef GDAX_ORDERBOOK_HPP
2 #define GDAX_ORDERBOOK_HPP
9 #include <cds/container/skip_list_map_hp.h>
10 #include <cds/gc/hp.h>
13 #include <rapidjson/document.h>
15 #include <websocketpp/client.hpp>
16 #include <websocketpp/concurrency/none.hpp>
17 #include <websocketpp/config/asio_client.hpp>
20 * A copy of the GDAX order book for the currency pair product given during
21 * construction, exposed as two maps, one for bids and one for offers, each
22 * mapping price levels to order quantities, continually updated in real time
23 * via the `level2` channel of the WebSocket feed of the GDAX API.
25 * Spawns a separate thread to receive updates from the GDAX WebSocket Feed and
26 * process them into the maps.
28 * To ensure high performance, implemented using concurrent data structures
29 * from libcds. The price->quantity maps are instances of
30 * cds::container::SkipListMap, whose doc says it is lock-free.
34 // libcds requires paired Initialize/Terminate calls
// Helper type that ties the pair to an object's lifetime: constructing it
// calls cds::Initialize(), destroying it calls cds::Terminate().
35 struct CDSInitializer {
// Start up libcds.
36 CDSInitializer() { cds::Initialize(); }
// Shut libcds down again.
// NOTE(review): no copy/move control is visible here; a copied instance
// would call cds::Terminate() a second time -- confirm it is never copied.
37 ~CDSInitializer() { cds::Terminate(); }
40 // a libcds garbage collector is required for our map structures
// Hazard-pointer collector instance; the constructor initializes it with
// 67*2 (67 hazard pointers per SkipListMap instance, per the note there).
41 cds::gc::HP m_cdsGarbageCollector;
45 * libcds requires each and every thread to first "attach" itself to the
46 * lib before using any data structures. This method is used internally,
47 * and is called during construction on behalf of the constructing thread,
48 * and should be called by any additional client threads that access the
49 * price->quanitity maps.
51 static void ensureThreadAttached()
53 if (cds::threading::Manager::isThreadAttached() == false)
54 cds::threading::Manager::attachThread();
// Constructs the order book for `product` (defaults to "BTC-USD") and does
// not return until the book-initialized promise has been fulfilled.
57 GDAXOrderBook(std::string const& product = "BTC-USD")
// 67*2: room for two SkipListMap instances at 67 hazard pointers each.
58 : m_cdsGarbageCollector(67*2),
59 // per SkipListMap doc, 67 hazard pointers per instance
// NOTE(review): the rest of this initializer is not visible here; it passes
// the handleUpdates member function -- presumably to launch the worker thread
// described in the class comment. Confirm against the full file.
63 &GDAXOrderBook::handleUpdates,
// Attach the constructing thread to libcds before any map access.
67 ensureThreadAttached();
// Block until m_bookInitialized is fulfilled; processSnapshot() does that
// once the initial snapshot has been loaded into the maps.
// NOTE(review): nothing visible fulfils the promise when the connection or
// subscription fails, so this wait may never return in that case -- confirm.
68 m_bookInitialized.get_future().wait();
// Key type of both maps: a price expressed in whole cents.
71 using Price = unsigned int; // cents
// Offers map: price -> quantity with the default SkipListMap ordering.
73 using offers_map_t = cds::container::SkipListMap<cds::gc::HP, Price, Size>;
// Second SkipListMap instantiation, identical except that its traits use
// std::greater<Price> as the "less" predicate (see the note below).
75 cds::container::SkipListMap<
79 // reverse map ordering so best (highest) bid is at begin()
80 typename cds::container::skip_list::make_traits<
81 cds::opt::less<std::greater<Price>>>::type>;
82 // *map_t::get(Price) returns an std::pair<Price, Size>*
86 ~GDAXOrderBook() { m_client.stop(); }
// websocketpp configuration: the stock asio TLS client config with the
// concurrency policy replaced by websocketpp::concurrency::none.
89 struct websocketppConfig
90 : public websocketpp::config::asio_tls_client
92 typedef websocketpp::concurrency::none concurrency_type;
93 // we only have one thread using the WebSocket
// Client type built from the configuration above, and the client instance
// that handleUpdates() configures and the destructor stops.
95 using websocketclient_t = websocketpp::client<websocketppConfig>;
96 websocketclient_t m_client;
// Fulfilled by processSnapshot(); the constructor waits on its future.
98 std::promise<void> m_bookInitialized; // to signal constructor to finish
100 std::future<void> m_threadTerminator; // for graceful thread destruction
103 * Initiates WebSocket connection, subscribes to order book updates for the
104 * given product, installs a message handler which will receive updates
105 * and process them into the maps, and starts the asio event loop.
// Configures m_client (logging, TLS, open and message handlers), then opens
// the connection to the feed. `product` is the product id placed in the
// subscription request.
107 void handleUpdates(std::string const& product)
// The calling thread touches the libcds maps, so attach it first.
109 ensureThreadAttached();
// One Document, captured by reference and re-parsed by the message handler
// for every incoming message.
111 rapidjson::Document json;
// Access log: only connect/disconnect events.
114 m_client.clear_access_channels(websocketpp::log::alevel::all);
115 m_client.set_access_channels(
116 websocketpp::log::alevel::connect |
117 websocketpp::log::alevel::disconnect);
// Error log: info, warn, rerror and fatal.
119 m_client.clear_error_channels(websocketpp::log::elevel::all);
120 m_client.set_error_channels(
121 websocketpp::log::elevel::info |
122 websocketpp::log::elevel::warn |
123 websocketpp::log::elevel::rerror |
124 websocketpp::log::elevel::fatal);
126 m_client.init_asio();
// TLS handler: builds the SSL context for the connection.
128 m_client.set_tls_init_handler(
129 [](websocketpp::connection_hdl)
131 websocketpp::lib::shared_ptr<boost::asio::ssl::context>
132 context = websocketpp::lib::make_shared<
133 boost::asio::ssl::context>(
// NOTE(review): `tlsv1` selects the TLS 1.0 method; confirm the endpoint
// still accepts it or switch to a method that negotiates newer versions.
134 boost::asio::ssl::context::tlsv1);
137 context->set_options(
138 boost::asio::ssl::context::default_workarounds |
139 boost::asio::ssl::context::no_sslv2 |
140 boost::asio::ssl::context::no_sslv3 |
141 boost::asio::ssl::context::single_dh_use);
// A failure to set the options is only logged.
142 } catch (std::exception& e) {
143 std::cerr << "set_tls_init_handler() failed to set"
144 " context options: " << e.what() << std::endl;
// Open handler: once connected, send the `level2` subscription for `product`.
// `product` is captured by reference, so the handler must not run after this
// function's argument has gone away.
149 m_client.set_open_handler(
150 [this, &product](websocketpp::connection_hdl handle)
152 // subscribe to updates to product's order book
153 websocketpp::lib::error_code errorCode;
154 this->m_client.send(handle,
156 "\"type\": \"subscribe\","
157 "\"product_ids\": [" "\""+product+"\"" "],"
158 "\"channels\": [" "\"level2\"" "]"
159 "}", websocketpp::frame::opcode::text, errorCode);
161 std::cerr << "error sending subscription: " +
162 errorCode.message() << std::endl;
// Message handler: parse the payload and dispatch on its "type" member.
166 m_client.set_message_handler(
167 [this, &json] (websocketpp::connection_hdl,
168 websocketppConfig::message_type::ptr msg)
// NOTE(review): no parse-error check and no check that "type" exists or is a
// string before it is read; a malformed or unexpected message is not handled.
170 json.Parse(msg->get_payload().c_str());
171 const char *const type = json["type"].GetString();
// "l2update": incremental changes to the book.
172 if ( strcmp(type, "l2update") == 0 )
174 processUpdates(json, bids, offers);
// "snapshot": initial book contents; also fulfils m_bookInitialized.
176 else if ( strcmp(type, "snapshot") == 0 )
178 processSnapshot(json, bids, offers, m_bookInitialized);
// Create the connection to the feed endpoint and start connecting.
182 websocketpp::lib::error_code errorCode;
184 m_client.get_connection("wss://ws-feed.gdax.com", errorCode);
// NOTE(review): on failure this only logs; m_bookInitialized is not
// fulfilled here, which the constructor is waiting on -- confirm.
186 std::cerr << "failed websocketclient_t::get_connection(): " <<
187 errorCode.message() << std::endl;
190 m_client.connect(connection);
// websocketpp exceptions are logged rather than propagated to the caller.
193 } catch (websocketpp::exception const & e) {
194 std::cerr << "handleUpdates() failed: " << e.what() << std::endl;
199 * Simply delegates snapshot processing to a helper function (different
200 * template instantiations of the same function, one for each type of map
201 * (bid, offer)), and signals when the snapshot has been processed.
203 static void processSnapshot(
204 rapidjson::Document & json,
206 offers_map_t & offers,
207 std::promise<void> & finished)
209 processSnapshotHalf(json, "bids", bids);
210 processSnapshotHalf(json, "asks", offers);
211 finished.set_value();
215 * Helper to permit code re-use on either type of map (bids or offers).
216 * Traverses already-parsed json document and inserts initial-price
217 * snapshots for entire half (bids or offers) of the order book.
219 template<typename map_t>
220 static void processSnapshotHalf(
221 rapidjson::Document const& json,
222 const char *const bidsOrOffers,
225 for (auto j = 0 ; j < json[bidsOrOffers].Size() ; ++j)
227 Price price = std::stod(json[bidsOrOffers][j][0].GetString())*100;
228 Size size = std::stod(json[bidsOrOffers][j][1].GetString());
230 map.insert(price, size);
235 * Traverses already-parsed json document, and, assuming it's a "l2update"
236 * document, updates price->quantity maps based on the order book changes
237 * that have occurred.
239 static void processUpdates(
240 rapidjson::Document & json,
242 offers_map_t & offers)
244 for (auto i = 0 ; i < json["changes"].Size() ; ++i)
246 const char* buyOrSell = json["changes"][i][0].GetString(),
247 * price = json["changes"][i][1].GetString(),
248 * size = json["changes"][i][2].GetString();
250 if ( strcmp(buyOrSell, "buy") == 0 )
252 updateMap(price, size, bids);
256 updateMap(price, size, offers);
/**
 * Helper to permit code re-use on either type of map (bids or offers).
 * Updates a single map entry with the specified price/size: a size of zero
 * removes the price level, any other size stores that size at the level.
 *
 * @param price  decimal price string; scaled by 100 to form the map key
 * @param size   decimal quantity string
 * @param map    map to modify
 *
 * NOTE(review): the key conversion truncates toward zero, matching the
 * snapshot path; a price whose scaled value lands just below a whole cent
 * ends up one cent low. Change both paths together if rounding is wanted.
 */
template<typename map_t>
static void updateMap(
    const char *const price,
    const char *const size,
    map_t & map)
{
    // Build the key exactly once so that erase and update address the same
    // price level. Erasing with the unscaled price could never match the
    // level that an earlier update stored under price*100.
    const typename map_t::key_type key =
        static_cast<typename map_t::key_type>(std::stod(price)*100);
    const double quantity = std::stod(size);

    if (quantity == 0)
    {
        map.erase(key);
    }
    else
    {
        map.update(
            key,
            [quantity](bool /* created */,
                       typename map_t::value_type & pair)
            {
                pair.second = quantity;
            });
    }
}
285 #endif // GDAX_ORDERBOOK_HPP