1 #ifndef GDAX_ORDERBOOK_HPP
2 #define GDAX_ORDERBOOK_HPP
10 #include <cds/container/skip_list_map_hp.h>
11 #include <cds/gc/hp.h>
14 #include <rapidjson/document.h>
16 #include <websocketpp/client.hpp>
17 #include <websocketpp/concurrency/none.hpp>
18 #include <websocketpp/config/asio_client.hpp>
19 #define FILENAME "example90.txt"
22 * A copy of the GDAX order book for the currency pair product given during
23 * construction, exposed as two maps, one for bids and one for offers, each
24 * mapping price levels to order quantities, continually updated in real time
25 * via the `level2` channel of the Websocket feed of the GDAX API.
27 * Spawns a separate thread to receive updates from the GDAX WebSocket Feed and
28 * process them into the maps.
30 * To ensure high performance, implemented using concurrent data structures
31 * from libcds. The price->quantity maps are instances of
32 * cds::container::SkipListMap, whose doc says it is lock-free.
36 // libcds requires paired Initialize/Terminate calls
// Ties the libcds Initialize/Terminate pair to an object's lifetime:
// construction calls cds::Initialize(), destruction calls cds::Terminate().
// NOTE(review): a copied instance would call cds::Terminate() a second time;
// consider deleting the copy operations.
37 struct CDSInitializer {
38 CDSInitializer() { cds::Initialize(); }
39 ~CDSInitializer() { cds::Terminate(); }
42 // a libcds garbage collector is required for our map structures
43 cds::gc::HP m_cdsGarbageCollector;
47 * libcds requires each and every thread to first "attach" itself to the
48 * lib before using any data structures. This method is used internally,
49 * and is called during construction on behalf of the constructing thread,
50 * and should be called by any additional client threads that access the
51 * price->quanitity maps.
53 static void ensureThreadAttached()
55 if (cds::threading::Manager::isThreadAttached() == false)
56 cds::threading::Manager::attachThread();
58 //std::ofstream myfile;
// Constructs the order book for `product` (default "BTC-USD") and returns
// only after the initial snapshot has been loaded.
60 GDAXOrderBook(std::string const& product = "BTC-USD")
// Hazard-pointer budget for the garbage collector: 67 per map instance,
// times the two maps (bids and offers).
61 : m_cdsGarbageCollector(67*2),
62 // per SkipListMap doc, 67 hazard pointers per instance
66 &GDAXOrderBook::handleUpdates,
// Attach the constructing thread to libcds before it touches the maps.
70 ensureThreadAttached();
// Block until m_bookInitialized is fulfilled, which processSnapshot() does
// once it has inserted both halves of the snapshot.
71 m_bookInitialized.get_future().wait();
// Map keys: prices held as whole cents in an unsigned integer.
76 using Price = unsigned int; // cents
// Offers map with the container's default key ordering.
// NOTE(review): presumably ascending, so the best (lowest) offer is at
// begin() -- confirm against the SkipListMap documentation.
78 using offers_map_t = cds::container::SkipListMap<cds::gc::HP, Price, Size>;
// Bids map: same container, but compared with std::greater<Price>.
80 cds::container::SkipListMap<
84 // reverse map ordering so best (highest) bid is at begin()
85 typename cds::container::skip_list::make_traits<
86 cds::opt::less<std::greater<Price>>>::type>;
87 // *map_t::get(Price) returns an std::pair<Price, Size>*
91 ~GDAXOrderBook() { m_client.stop(); myfile.close(); }
94 struct websocketppConfig
95 : public websocketpp::config::asio_tls_client
97 typedef websocketpp::concurrency::none concurrency_type;
98 // we only have one thread using the WebSocket
100 using websocketclient_t = websocketpp::client<websocketppConfig>;
101 websocketclient_t m_client;
103 std::promise<void> m_bookInitialized; // to signal constructor to finish
105 std::future<void> m_threadTerminator; // for graceful thread destruction
108 * Initiates WebSocket connection, subscribes to order book updates for the
109 * given product, installs a message handler which will receive updates
110 * and process them into the maps, and starts the asio event loop.
// Thread body for the update feed: configures the WebSocket client, installs
// the TLS/open/message handlers, and connects to the GDAX feed for `product`.
112 void handleUpdates(std::string const& product)
// This thread uses the libcds maps, so it must be attached first.
114 ensureThreadAttached();
// Single document reused for every message that gets parsed.
116 rapidjson::Document json;
// Access log: only connection open/close events.
119 m_client.clear_access_channels(websocketpp::log::alevel::all);
120 m_client.set_access_channels(
121 websocketpp::log::alevel::connect |
122 websocketpp::log::alevel::disconnect);
// Error log: info, warn, rerror and fatal levels.
124 m_client.clear_error_channels(websocketpp::log::elevel::all);
125 m_client.set_error_channels(
126 websocketpp::log::elevel::info |
127 websocketpp::log::elevel::warn |
128 websocketpp::log::elevel::rerror |
129 websocketpp::log::elevel::fatal);
131 m_client.init_asio();
// TLS handler: creates the SSL context and applies the workaround /
// no-SSLv2 / no-SSLv3 / single-DH-use options; failures go to stderr.
// NOTE(review): the context is created for TLSv1 -- confirm the feed still
// accepts it, or move to a newer protocol constant.
135 m_client.set_tls_init_handler(
136 [](websocketpp::connection_hdl)
138 websocketpp::lib::shared_ptr<boost::asio::ssl::context>
139 context = websocketpp::lib::make_shared<
140 boost::asio::ssl::context>(
141 boost::asio::ssl::context::tlsv1);
144 context->set_options(
145 boost::asio::ssl::context::default_workarounds |
146 boost::asio::ssl::context::no_sslv2 |
147 boost::asio::ssl::context::no_sslv3 |
148 boost::asio::ssl::context::single_dh_use);
149 } catch (std::exception& e) {
150 std::cerr << "set_tls_init_handler() failed to set"
151 " context options: " << e.what() << std::endl;
// Open handler: sends the subscribe request for `product` on the `level2`
// channel. `product` is captured by reference, so the referenced string must
// still be alive when the handler runs.
156 m_client.set_open_handler(
157 [this, &product](websocketpp::connection_hdl handle)
159 // subscribe to updates to product's order book
160 websocketpp::lib::error_code errorCode;
161 this->m_client.send(handle,
163 "\"type\": \"subscribe\","
164 "\"product_ids\": [" "\""+product+"\"" "],"
165 "\"channels\": [" "\"level2\"" "]"
166 "}", websocketpp::frame::opcode::text, errorCode);
168 std::cerr << "error sending subscription: " +
169 errorCode.message() << std::endl;
// Message handler: parses each payload into `json` and dispatches on its
// "type" member ("l2update" -> processUpdates, "snapshot" -> processSnapshot).
173 m_client.set_message_handler(
174 [this, &json] (websocketpp::connection_hdl,
175 websocketppConfig::message_type::ptr msg)
178 /*if(!myfile.is_open())
179 myfile.open(FILENAME);
182 myfile <<msg->get_payload().c_str()<<std::endl;
184 else std::cout << "Unable to open file";*/
// NOTE(review): neither the parse result nor the presence of a string
// "type" member is checked before GetString() is called.
186 json.Parse(msg->get_payload().c_str());
187 const char *const type = json["type"].GetString();
188 if ( strcmp(type, "l2update") == 0 )
190 processUpdates(json, bids, offers);
192 else if ( strcmp(type, "snapshot") == 0 )
194 processSnapshot(json, bids, offers, m_bookInitialized);
198 websocketpp::lib::error_code errorCode;
// Replay: every line of FILENAME is parsed and dispatched exactly like a
// live message.
// NOTE(review): the active declaration of `myfile` is not among the visible
// lines; std::getline() needs it to be an input stream -- confirm.
201 if(!myfile.is_open())
202 myfile.open(FILENAME);
204 while ( std::getline(myfile,line) )
206 json.Parse(line.c_str());
207 const char *const type = json["type"].GetString();
208 if ( strcmp(type, "l2update") == 0 )
210 processUpdates(json, bids, offers);
212 else if ( strcmp(type, "snapshot") == 0 )
214 processSnapshot(json, bids, offers, m_bookInitialized);
// Create the connection to the GDAX feed; a failure is reported on stderr.
219 m_client.get_connection("wss://ws-feed.gdax.com", errorCode);
221 std::cerr << "failed websocketclient_t::get_connection(): " <<
222 errorCode.message() << std::endl;
225 m_client.connect(connection);
// websocketpp exceptions raised above are logged to stderr.
229 } catch (websocketpp::exception const & e) {
230 std::cerr << "handleUpdates() failed: " << e.what() << std::endl;
238 * Simply delegates snapshot processing to a helper function (different
239 * template instantiations of the same function, one for each type of map
240 * (bid, offer)), and signals when the snapshot has been processed.
242 static void processSnapshot(
243 rapidjson::Document & json,
245 offers_map_t & offers,
246 std::promise<void> & finished)
248 processSnapshotHalf(json, "bids", bids);
249 processSnapshotHalf(json, "asks", offers);
250 finished.set_value();
254 * Helper to permit code re-use on either type of map (bids or offers).
255 * Traverses already-parsed json document and inserts initial-price
256 * snapshots for entire half (bids or offers) of the order book.
258 template<typename map_t>
259 static void processSnapshotHalf(
260 rapidjson::Document const& json,
261 const char *const bidsOrOffers,
264 for (auto j = 0 ; j < json[bidsOrOffers].Size() ; ++j)
266 Price price = std::stod(json[bidsOrOffers][j][0].GetString())*100;
267 Size size = std::stod(json[bidsOrOffers][j][1].GetString());
269 map.insert(price, size);
274 * Traverses already-parsed json document, and, assuming it's a "l2update"
275 * document, updates price->quantity maps based on the order book changes
276 * that have occurred.
278 static void processUpdates(
279 rapidjson::Document & json,
281 offers_map_t & offers)
283 for (auto i = 0 ; i < json["changes"].Size() ; ++i)
285 const char* buyOrSell = json["changes"][i][0].GetString(),
286 * price = json["changes"][i][1].GetString(),
287 * size = json["changes"][i][2].GetString();
289 if ( strcmp(buyOrSell, "buy") == 0 )
291 updateMap(price, size, bids);
295 updateMap(price, size, offers);
301 * Helper to permit code re-use on either type of map (bids or offers).
302 * Simply updates a single map entry with the specified price/size.
304 template<typename map_t>
305 static void updateMap(
306 const char *const price,
307 const char *const size,
310 if (std::stod(size) == 0) { map.erase(std::stod(price)); }
314 std::stod(price)*100,
316 std::pair<const Price, Size> & pair)
318 pair.second = std::stod(size);
324 #endif // GDAX_ORDERBOOK_HPP