fixed adding file problem
[c11concurrency-benchmarks.git] / gdax-orderbook-hpp / demo / dependencies / websocketpp-0.7.0 / websocketpp / transport / asio / connection.hpp
1 /*
2  * Copyright (c) 2015, Peter Thorson. All rights reserved.
3  *
4  * Redistribution and use in source and binary forms, with or without
5  * modification, are permitted provided that the following conditions are met:
6  *     * Redistributions of source code must retain the above copyright
7  *       notice, this list of conditions and the following disclaimer.
8  *     * Redistributions in binary form must reproduce the above copyright
9  *       notice, this list of conditions and the following disclaimer in the
10  *       documentation and/or other materials provided with the distribution.
11  *     * Neither the name of the WebSocket++ Project nor the
12  *       names of its contributors may be used to endorse or promote products
13  *       derived from this software without specific prior written permission.
14  *
15  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
16  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
17  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
18  * ARE DISCLAIMED. IN NO EVENT SHALL PETER THORSON BE LIABLE FOR ANY
19  * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
20  * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
21  * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
22  * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
23  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
24  * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
25  *
26  */
27
28 #ifndef WEBSOCKETPP_TRANSPORT_ASIO_CON_HPP
29 #define WEBSOCKETPP_TRANSPORT_ASIO_CON_HPP
30
31 #include <websocketpp/transport/asio/base.hpp>
32
33 #include <websocketpp/transport/base/connection.hpp>
34
35 #include <websocketpp/logger/levels.hpp>
36 #include <websocketpp/http/constants.hpp>
37
38 #include <websocketpp/base64/base64.hpp>
39 #include <websocketpp/error.hpp>
40 #include <websocketpp/uri.hpp>
41
42 #include <websocketpp/common/asio.hpp>
43 #include <websocketpp/common/chrono.hpp>
44 #include <websocketpp/common/cpp11.hpp>
45 #include <websocketpp/common/memory.hpp>
46 #include <websocketpp/common/functional.hpp>
47 #include <websocketpp/common/connection_hdl.hpp>
48
49 #include <istream>
50 #include <sstream>
51 #include <string>
52 #include <vector>
53
54 namespace websocketpp {
55 namespace transport {
56 namespace asio {
57
58 typedef lib::function<void(connection_hdl)> tcp_init_handler;
59
60 /// Asio based connection transport component
61 /**
62  * transport::asio::connection implements a connection transport component using
63  * Asio that works with the transport::asio::endpoint endpoint transport
64  * component.
65  */
66 template <typename config>
67 class connection : public config::socket_type::socket_con_type {
68 public:
69     /// Type of this connection transport component
70     typedef connection<config> type;
71     /// Type of a shared pointer to this connection transport component
72     typedef lib::shared_ptr<type> ptr;
73
74     /// Type of the socket connection component
75     typedef typename config::socket_type::socket_con_type socket_con_type;
76     /// Type of a shared pointer to the socket connection component
77     typedef typename socket_con_type::ptr socket_con_ptr;
78     /// Type of this transport's access logging policy
79     typedef typename config::alog_type alog_type;
80     /// Type of this transport's error logging policy
81     typedef typename config::elog_type elog_type;
82
83     typedef typename config::request_type request_type;
84     typedef typename request_type::ptr request_ptr;
85     typedef typename config::response_type response_type;
86     typedef typename response_type::ptr response_ptr;
87
88     /// Type of a pointer to the Asio io_service being used
89     typedef lib::asio::io_service * io_service_ptr;
90     /// Type of a pointer to the Asio io_service::strand being used
91     typedef lib::shared_ptr<lib::asio::io_service::strand> strand_ptr;
92     /// Type of a pointer to the Asio timer class
93     typedef lib::shared_ptr<lib::asio::steady_timer> timer_ptr;
94
95     // connection is friends with its associated endpoint to allow the endpoint
96     // to call private/protected utility methods that we don't want to expose
97     // to the public api.
98     friend class endpoint<config>;
99
100     // generate and manage our own io_service
101     explicit connection(bool is_server, alog_type & alog, elog_type & elog)
102       : m_is_server(is_server)
103       , m_alog(alog)
104       , m_elog(elog)
105     {
106         m_alog.write(log::alevel::devel,"asio con transport constructor");
107     }
108
109     /// Get a shared pointer to this component
110     ptr get_shared() {
111         return lib::static_pointer_cast<type>(socket_con_type::get_shared());
112     }
113
114     bool is_secure() const {
115         return socket_con_type::is_secure();
116     }
117
118     /// Set uri hook
119     /**
120      * Called by the endpoint as a connection is being established to provide
121      * the uri being connected to to the transport layer.
122      *
123      * This transport policy doesn't use the uri except to forward it to the 
124      * socket layer.
125      *
126      * @since 0.6.0
127      *
128      * @param u The uri to set
129      */
130     void set_uri(uri_ptr u) {
131         socket_con_type::set_uri(u);
132     }
133
134     /// Sets the tcp pre init handler
135     /**
136      * The tcp pre init handler is called after the raw tcp connection has been
137      * established but before any additional wrappers (proxy connects, TLS
138      * handshakes, etc) have been performed.
139      *
140      * @since 0.3.0
141      *
142      * @param h The handler to call on tcp pre init.
143      */
144     void set_tcp_pre_init_handler(tcp_init_handler h) {
145         m_tcp_pre_init_handler = h;
146     }
147
148     /// Sets the tcp pre init handler (deprecated)
149     /**
150      * The tcp pre init handler is called after the raw tcp connection has been
151      * established but before any additional wrappers (proxy connects, TLS
152      * handshakes, etc) have been performed.
153      *
154      * @deprecated Use set_tcp_pre_init_handler instead
155      *
156      * @param h The handler to call on tcp pre init.
157      */
158     void set_tcp_init_handler(tcp_init_handler h) {
159         set_tcp_pre_init_handler(h);
160     }
161
162     /// Sets the tcp post init handler
163     /**
164      * The tcp post init handler is called after the tcp connection has been
165      * established and all additional wrappers (proxy connects, TLS handshakes,
166      * etc have been performed. This is fired before any bytes are read or any
167      * WebSocket specific handshake logic has been performed.
168      *
169      * @since 0.3.0
170      *
171      * @param h The handler to call on tcp post init.
172      */
173     void set_tcp_post_init_handler(tcp_init_handler h) {
174         m_tcp_post_init_handler = h;
175     }
176
177     /// Set the proxy to connect through (exception free)
178     /**
179      * The URI passed should be a complete URI including scheme. For example:
180      * http://proxy.example.com:8080/
181      *
182      * The proxy must be set up as an explicit (CONNECT) proxy allowed to
183      * connect to the port you specify. Traffic to the proxy is not encrypted.
184      *
185      * @param uri The full URI of the proxy to connect to.
186      *
187      * @param ec A status value
188      */
189     void set_proxy(std::string const & uri, lib::error_code & ec) {
190         // TODO: return errors for illegal URIs here?
191         // TODO: should https urls be illegal for the moment?
192         m_proxy = uri;
193         m_proxy_data = lib::make_shared<proxy_data>();
194         ec = lib::error_code();
195     }
196
197     /// Set the proxy to connect through (exception)
198     void set_proxy(std::string const & uri) {
199         lib::error_code ec;
200         set_proxy(uri,ec);
201         if (ec) { throw exception(ec); }
202     }
203
204     /// Set the basic auth credentials to use (exception free)
205     /**
206      * The URI passed should be a complete URI including scheme. For example:
207      * http://proxy.example.com:8080/
208      *
209      * The proxy must be set up as an explicit proxy
210      *
211      * @param username The username to send
212      *
213      * @param password The password to send
214      *
215      * @param ec A status value
216      */
217     void set_proxy_basic_auth(std::string const & username, std::string const &
218         password, lib::error_code & ec)
219     {
220         if (!m_proxy_data) {
221             ec = make_error_code(websocketpp::error::invalid_state);
222             return;
223         }
224
225         // TODO: username can't contain ':'
226         std::string val = "Basic "+base64_encode(username + ":" + password);
227         m_proxy_data->req.replace_header("Proxy-Authorization",val);
228         ec = lib::error_code();
229     }
230
231     /// Set the basic auth credentials to use (exception)
232     void set_proxy_basic_auth(std::string const & username, std::string const &
233         password)
234     {
235         lib::error_code ec;
236         set_proxy_basic_auth(username,password,ec);
237         if (ec) { throw exception(ec); }
238     }
239
240     /// Set the proxy timeout duration (exception free)
241     /**
242      * Duration is in milliseconds. Default value is based on the transport
243      * config
244      *
245      * @param duration The number of milliseconds to wait before aborting the
246      * proxy connection.
247      *
248      * @param ec A status value
249      */
250     void set_proxy_timeout(long duration, lib::error_code & ec) {
251         if (!m_proxy_data) {
252             ec = make_error_code(websocketpp::error::invalid_state);
253             return;
254         }
255
256         m_proxy_data->timeout_proxy = duration;
257         ec = lib::error_code();
258     }
259
260     /// Set the proxy timeout duration (exception)
261     void set_proxy_timeout(long duration) {
262         lib::error_code ec;
263         set_proxy_timeout(duration,ec);
264         if (ec) { throw exception(ec); }
265     }
266
267     std::string const & get_proxy() const {
268         return m_proxy;
269     }
270
271     /// Get the remote endpoint address
272     /**
273      * The iostream transport has no information about the ultimate remote
274      * endpoint. It will return the string "iostream transport". To indicate
275      * this.
276      *
277      * TODO: allow user settable remote endpoint addresses if this seems useful
278      *
279      * @return A string identifying the address of the remote endpoint
280      */
281     std::string get_remote_endpoint() const {
282         lib::error_code ec;
283
284         std::string ret = socket_con_type::get_remote_endpoint(ec);
285
286         if (ec) {
287             m_elog.write(log::elevel::info,ret);
288             return "Unknown";
289         } else {
290             return ret;
291         }
292     }
293
294     /// Get the connection handle
295     connection_hdl get_handle() const {
296         return m_connection_hdl;
297     }
298
299     /// Call back a function after a period of time.
300     /**
301      * Sets a timer that calls back a function after the specified period of
302      * milliseconds. Returns a handle that can be used to cancel the timer.
303      * A cancelled timer will return the error code error::operation_aborted
304      * A timer that expired will return no error.
305      *
306      * @param duration Length of time to wait in milliseconds
307      *
308      * @param callback The function to call back when the timer has expired
309      *
310      * @return A handle that can be used to cancel the timer if it is no longer
311      * needed.
312      */
313     timer_ptr set_timer(long duration, timer_handler callback) {
314         timer_ptr new_timer = lib::make_shared<lib::asio::steady_timer>(
315             lib::ref(*m_io_service),
316             lib::asio::milliseconds(duration)
317         );
318
319         if (config::enable_multithreading) {
320             new_timer->async_wait(m_strand->wrap(lib::bind(
321                 &type::handle_timer, get_shared(),
322                 new_timer,
323                 callback,
324                 lib::placeholders::_1
325             )));
326         } else {
327             new_timer->async_wait(lib::bind(
328                 &type::handle_timer, get_shared(),
329                 new_timer,
330                 callback,
331                 lib::placeholders::_1
332             ));
333         }
334
335         return new_timer;
336     }
337
338     /// Timer callback
339     /**
340      * The timer pointer is included to ensure the timer isn't destroyed until
341      * after it has expired.
342      *
343      * TODO: candidate for protected status
344      *
345      * @param post_timer Pointer to the timer in question
346      * @param callback The function to call back
347      * @param ec The status code
348      */
349     void handle_timer(timer_ptr, timer_handler callback,
350         lib::asio::error_code const & ec)
351     {
352         if (ec) {
353             if (ec == lib::asio::error::operation_aborted) {
354                 callback(make_error_code(transport::error::operation_aborted));
355             } else {
356                 log_err(log::elevel::info,"asio handle_timer",ec);
357                 callback(make_error_code(error::pass_through));
358             }
359         } else {
360             callback(lib::error_code());
361         }
362     }
363
364     /// Get a pointer to this connection's strand
365     strand_ptr get_strand() {
366         return m_strand;
367     }
368
369     /// Get the internal transport error code for a closed/failed connection
370     /**
371      * Retrieves a machine readable detailed error code indicating the reason
372      * that the connection was closed or failed. Valid only after the close or
373      * fail handler is called.
374      *
375      * Primarily used if you are using mismatched asio / system_error
376      * implementations such as `boost::asio` with `std::system_error`. In these
377      * cases the transport error type is different than the library error type
378      * and some WebSocket++ functions that return transport errors via the 
379      * library error code type will be coerced into a catch all `pass_through`
380      * or `tls_error` error. This method will return the original machine 
381      * readable transport error in the native type.
382      *
383      * @since 0.7.0
384      *
385      * @return Error code indicating the reason the connection was closed or 
386      * failed
387      */
388     lib::asio::error_code get_transport_ec() const {
389         return m_tec;
390     }
391
392     /// Initialize transport for reading
393     /**
394      * init_asio is called once immediately after construction to initialize
395      * Asio components to the io_service
396      *
397      * The transport initialization sequence consists of the following steps:
398      * - Pre-init: the underlying socket is initialized to the point where
399      * bytes may be written. No bytes are actually written in this stage
400      * - Proxy negotiation: if a proxy is set, a request is made to it to start
401      * a tunnel to the final destination. This stage ends when the proxy is
402      * ready to forward the
403      * next byte to the remote endpoint.
404      * - Post-init: Perform any i/o with the remote endpoint, such as setting up
405      * tunnels for encryption. This stage ends when the connection is ready to
406      * read or write the WebSocket handshakes. At this point the original
407      * callback function is called.
408      */
409 protected:
410     void init(init_handler callback) {
411         if (m_alog.static_test(log::alevel::devel)) {
412             m_alog.write(log::alevel::devel,"asio connection init");
413         }
414
415         // TODO: pre-init timeout. Right now no implemented socket policies
416         // actually have an asyncronous pre-init
417
418         socket_con_type::pre_init(
419             lib::bind(
420                 &type::handle_pre_init,
421                 get_shared(),
422                 callback,
423                 lib::placeholders::_1
424             )
425         );
426     }
427
428     /// initialize the proxy buffers and http parsers
429     /**
430      *
431      * @param authority The address of the server we want the proxy to tunnel to
432      * in the format of a URI authority (host:port)
433      *
434      * @return Status code indicating what errors occurred, if any
435      */
436     lib::error_code proxy_init(std::string const & authority) {
437         if (!m_proxy_data) {
438             return websocketpp::error::make_error_code(
439                 websocketpp::error::invalid_state);
440         }
441         m_proxy_data->req.set_version("HTTP/1.1");
442         m_proxy_data->req.set_method("CONNECT");
443
444         m_proxy_data->req.set_uri(authority);
445         m_proxy_data->req.replace_header("Host",authority);
446
447         return lib::error_code();
448     }
449
450     /// Finish constructing the transport
451     /**
452      * init_asio is called once immediately after construction to initialize
453      * Asio components to the io_service.
454      *
455      * @param io_service A pointer to the io_service to register with this
456      * connection
457      *
458      * @return Status code for the success or failure of the initialization
459      */
460     lib::error_code init_asio (io_service_ptr io_service) {
461         m_io_service = io_service;
462
463         if (config::enable_multithreading) {
464             m_strand = lib::make_shared<lib::asio::io_service::strand>(
465                 lib::ref(*io_service));
466         }
467
468         lib::error_code ec = socket_con_type::init_asio(io_service, m_strand,
469             m_is_server);
470
471         return ec;
472     }
473
474     void handle_pre_init(init_handler callback, lib::error_code const & ec) {
475         if (m_alog.static_test(log::alevel::devel)) {
476             m_alog.write(log::alevel::devel,"asio connection handle pre_init");
477         }
478
479         if (m_tcp_pre_init_handler) {
480             m_tcp_pre_init_handler(m_connection_hdl);
481         }
482
483         if (ec) {
484             callback(ec);
485         }
486
487         // If we have a proxy set issue a proxy connect, otherwise skip to
488         // post_init
489         if (!m_proxy.empty()) {
490             proxy_write(callback);
491         } else {
492             post_init(callback);
493         }
494     }
495
496     void post_init(init_handler callback) {
497         if (m_alog.static_test(log::alevel::devel)) {
498             m_alog.write(log::alevel::devel,"asio connection post_init");
499         }
500
501         timer_ptr post_timer;
502         
503         if (config::timeout_socket_post_init > 0) {
504             post_timer = set_timer(
505                 config::timeout_socket_post_init,
506                 lib::bind(
507                     &type::handle_post_init_timeout,
508                     get_shared(),
509                     post_timer,
510                     callback,
511                     lib::placeholders::_1
512                 )
513             );
514         }
515
516         socket_con_type::post_init(
517             lib::bind(
518                 &type::handle_post_init,
519                 get_shared(),
520                 post_timer,
521                 callback,
522                 lib::placeholders::_1
523             )
524         );
525     }
526
527     /// Post init timeout callback
528     /**
529      * The timer pointer is included to ensure the timer isn't destroyed until
530      * after it has expired.
531      *
532      * @param post_timer Pointer to the timer in question
533      * @param callback The function to call back
534      * @param ec The status code
535      */
536     void handle_post_init_timeout(timer_ptr, init_handler callback,
537         lib::error_code const & ec)
538     {
539         lib::error_code ret_ec;
540
541         if (ec) {
542             if (ec == transport::error::operation_aborted) {
543                 m_alog.write(log::alevel::devel,
544                     "asio post init timer cancelled");
545                 return;
546             }
547
548             log_err(log::elevel::devel,"asio handle_post_init_timeout",ec);
549             ret_ec = ec;
550         } else {
551             if (socket_con_type::get_ec()) {
552                 ret_ec = socket_con_type::get_ec();
553             } else {
554                 ret_ec = make_error_code(transport::error::timeout);
555             }
556         }
557
558         m_alog.write(log::alevel::devel, "Asio transport post-init timed out");
559         cancel_socket_checked();
560         callback(ret_ec);
561     }
562
563     /// Post init timeout callback
564     /**
565      * The timer pointer is included to ensure the timer isn't destroyed until
566      * after it has expired.
567      *
568      * @param post_timer Pointer to the timer in question
569      * @param callback The function to call back
570      * @param ec The status code
571      */
572     void handle_post_init(timer_ptr post_timer, init_handler callback,
573         lib::error_code const & ec)
574     {
575         if (ec == transport::error::operation_aborted ||
576             (post_timer && lib::asio::is_neg(post_timer->expires_from_now())))
577         {
578             m_alog.write(log::alevel::devel,"post_init cancelled");
579             return;
580         }
581
582         if (post_timer) {
583             post_timer->cancel();
584         }
585
586         if (m_alog.static_test(log::alevel::devel)) {
587             m_alog.write(log::alevel::devel,"asio connection handle_post_init");
588         }
589
590         if (m_tcp_post_init_handler) {
591             m_tcp_post_init_handler(m_connection_hdl);
592         }
593
594         callback(ec);
595     }
596
597     void proxy_write(init_handler callback) {
598         if (m_alog.static_test(log::alevel::devel)) {
599             m_alog.write(log::alevel::devel,"asio connection proxy_write");
600         }
601
602         if (!m_proxy_data) {
603             m_elog.write(log::elevel::library,
604                 "assertion failed: !m_proxy_data in asio::connection::proxy_write");
605             callback(make_error_code(error::general));
606             return;
607         }
608
609         m_proxy_data->write_buf = m_proxy_data->req.raw();
610
611         m_bufs.push_back(lib::asio::buffer(m_proxy_data->write_buf.data(),
612                                            m_proxy_data->write_buf.size()));
613
614         m_alog.write(log::alevel::devel,m_proxy_data->write_buf);
615
616         // Set a timer so we don't wait forever for the proxy to respond
617         m_proxy_data->timer = this->set_timer(
618             m_proxy_data->timeout_proxy,
619             lib::bind(
620                 &type::handle_proxy_timeout,
621                 get_shared(),
622                 callback,
623                 lib::placeholders::_1
624             )
625         );
626
627         // Send proxy request
628         if (config::enable_multithreading) {
629             lib::asio::async_write(
630                 socket_con_type::get_next_layer(),
631                 m_bufs,
632                 m_strand->wrap(lib::bind(
633                     &type::handle_proxy_write, get_shared(),
634                     callback,
635                     lib::placeholders::_1
636                 ))
637             );
638         } else {
639             lib::asio::async_write(
640                 socket_con_type::get_next_layer(),
641                 m_bufs,
642                 lib::bind(
643                     &type::handle_proxy_write, get_shared(),
644                     callback,
645                     lib::placeholders::_1
646                 )
647             );
648         }
649     }
650
651     void handle_proxy_timeout(init_handler callback, lib::error_code const & ec)
652     {
653         if (ec == transport::error::operation_aborted) {
654             m_alog.write(log::alevel::devel,
655                 "asio handle_proxy_write timer cancelled");
656             return;
657         } else if (ec) {
658             log_err(log::elevel::devel,"asio handle_proxy_write",ec);
659             callback(ec);
660         } else {
661             m_alog.write(log::alevel::devel,
662                 "asio handle_proxy_write timer expired");
663             cancel_socket_checked();
664             callback(make_error_code(transport::error::timeout));
665         }
666     }
667
668     void handle_proxy_write(init_handler callback,
669         lib::asio::error_code const & ec)
670     {
671         if (m_alog.static_test(log::alevel::devel)) {
672             m_alog.write(log::alevel::devel,
673                 "asio connection handle_proxy_write");
674         }
675
676         m_bufs.clear();
677
678         // Timer expired or the operation was aborted for some reason.
679         // Whatever aborted it will be issuing the callback so we are safe to
680         // return
681         if (ec == lib::asio::error::operation_aborted ||
682             lib::asio::is_neg(m_proxy_data->timer->expires_from_now()))
683         {
684             m_elog.write(log::elevel::devel,"write operation aborted");
685             return;
686         }
687
688         if (ec) {
689             log_err(log::elevel::info,"asio handle_proxy_write",ec);
690             m_proxy_data->timer->cancel();
691             callback(make_error_code(error::pass_through));
692             return;
693         }
694
695         proxy_read(callback);
696     }
697
698     void proxy_read(init_handler callback) {
699         if (m_alog.static_test(log::alevel::devel)) {
700             m_alog.write(log::alevel::devel,"asio connection proxy_read");
701         }
702
703         if (!m_proxy_data) {
704             m_elog.write(log::elevel::library,
705                 "assertion failed: !m_proxy_data in asio::connection::proxy_read");
706             m_proxy_data->timer->cancel();
707             callback(make_error_code(error::general));
708             return;
709         }
710
711         if (config::enable_multithreading) {
712             lib::asio::async_read_until(
713                 socket_con_type::get_next_layer(),
714                 m_proxy_data->read_buf,
715                 "\r\n\r\n",
716                 m_strand->wrap(lib::bind(
717                     &type::handle_proxy_read, get_shared(),
718                     callback,
719                     lib::placeholders::_1, lib::placeholders::_2
720                 ))
721             );
722         } else {
723             lib::asio::async_read_until(
724                 socket_con_type::get_next_layer(),
725                 m_proxy_data->read_buf,
726                 "\r\n\r\n",
727                 lib::bind(
728                     &type::handle_proxy_read, get_shared(),
729                     callback,
730                     lib::placeholders::_1, lib::placeholders::_2
731                 )
732             );
733         }
734     }
735
736     /// Proxy read callback
737     /**
738      * @param init_handler The function to call back
739      * @param ec The status code
740      * @param bytes_transferred The number of bytes read
741      */
742     void handle_proxy_read(init_handler callback,
743         lib::asio::error_code const & ec, size_t)
744     {
745         if (m_alog.static_test(log::alevel::devel)) {
746             m_alog.write(log::alevel::devel,
747                 "asio connection handle_proxy_read");
748         }
749
750         // Timer expired or the operation was aborted for some reason.
751         // Whatever aborted it will be issuing the callback so we are safe to
752         // return
753         if (ec == lib::asio::error::operation_aborted ||
754             lib::asio::is_neg(m_proxy_data->timer->expires_from_now()))
755         {
756             m_elog.write(log::elevel::devel,"read operation aborted");
757             return;
758         }
759
760         // At this point there is no need to wait for the timer anymore
761         m_proxy_data->timer->cancel();
762
763         if (ec) {
764             m_elog.write(log::elevel::info,
765                 "asio handle_proxy_read error: "+ec.message());
766             callback(make_error_code(error::pass_through));
767         } else {
768             if (!m_proxy_data) {
769                 m_elog.write(log::elevel::library,
770                     "assertion failed: !m_proxy_data in asio::connection::handle_proxy_read");
771                 callback(make_error_code(error::general));
772                 return;
773             }
774
775             std::istream input(&m_proxy_data->read_buf);
776
777             m_proxy_data->res.consume(input);
778
779             if (!m_proxy_data->res.headers_ready()) {
780                 // we read until the headers were done in theory but apparently
781                 // they aren't. Internal endpoint error.
782                 callback(make_error_code(error::general));
783                 return;
784             }
785
786             m_alog.write(log::alevel::devel,m_proxy_data->res.raw());
787
788             if (m_proxy_data->res.get_status_code() != http::status_code::ok) {
789                 // got an error response back
790                 // TODO: expose this error in a programmatically accessible way?
791                 // if so, see below for an option on how to do this.
792                 std::stringstream s;
793                 s << "Proxy connection error: "
794                   << m_proxy_data->res.get_status_code()
795                   << " ("
796                   << m_proxy_data->res.get_status_msg()
797                   << ")";
798                 m_elog.write(log::elevel::info,s.str());
799                 callback(make_error_code(error::proxy_failed));
800                 return;
801             }
802
803             // we have successfully established a connection to the proxy, now
804             // we can continue and the proxy will transparently forward the
805             // WebSocket connection.
806
807             // TODO: decide if we want an on_proxy callback that would allow
808             // access to the proxy response.
809
810             // free the proxy buffers and req/res objects as they aren't needed
811             // anymore
812             m_proxy_data.reset();
813
814             // Continue with post proxy initialization
815             post_init(callback);
816         }
817     }
818
819     /// read at least num_bytes bytes into buf and then call handler.
820     void async_read_at_least(size_t num_bytes, char *buf, size_t len,
821         read_handler handler)
822     {
823         if (m_alog.static_test(log::alevel::devel)) {
824             std::stringstream s;
825             s << "asio async_read_at_least: " << num_bytes;
826             m_alog.write(log::alevel::devel,s.str());
827         }
828
829         // TODO: safety vs speed ?
830         // maybe move into an if devel block
831         /*if (num_bytes > len) {
832             m_elog.write(log::elevel::devel,
833                 "asio async_read_at_least error::invalid_num_bytes");
834             handler(make_error_code(transport::error::invalid_num_bytes),
835                 size_t(0));
836             return;
837         }*/
838
839         if (config::enable_multithreading) {
840             lib::asio::async_read(
841                 socket_con_type::get_socket(),
842                 lib::asio::buffer(buf,len),
843                 lib::asio::transfer_at_least(num_bytes),
844                 m_strand->wrap(make_custom_alloc_handler(
845                     m_read_handler_allocator,
846                     lib::bind(
847                         &type::handle_async_read, get_shared(),
848                         handler,
849                         lib::placeholders::_1, lib::placeholders::_2
850                     )
851                 ))
852             );
853         } else {
854             lib::asio::async_read(
855                 socket_con_type::get_socket(),
856                 lib::asio::buffer(buf,len),
857                 lib::asio::transfer_at_least(num_bytes),
858                 make_custom_alloc_handler(
859                     m_read_handler_allocator,
860                     lib::bind(
861                         &type::handle_async_read, get_shared(),
862                         handler,
863                         lib::placeholders::_1, lib::placeholders::_2
864                     )
865                 )
866             );    
867         }
868         
869     }
870
871     void handle_async_read(read_handler handler, lib::asio::error_code const & ec,
872         size_t bytes_transferred)
873     {
874         m_alog.write(log::alevel::devel, "asio con handle_async_read");
875
876         // translate asio error codes into more lib::error_codes
877         lib::error_code tec;
878         if (ec == lib::asio::error::eof) {
879             tec = make_error_code(transport::error::eof);
880         } else if (ec) {
881             // We don't know much more about the error at this point. As our
882             // socket/security policy if it knows more:
883             tec = socket_con_type::translate_ec(ec);
884             m_tec = ec;
885
886             if (tec == transport::error::tls_error ||
887                 tec == transport::error::pass_through)
888             {
889                 // These are aggregate/catch all errors. Log some human readable
890                 // information to the info channel to give library users some
891                 // more details about why the upstream method may have failed.
892                 log_err(log::elevel::info,"asio async_read_at_least",ec);
893             }
894         }
895         if (handler) {
896             handler(tec,bytes_transferred);
897         } else {
898             // This can happen in cases where the connection is terminated while
899             // the transport is waiting on a read.
900             m_alog.write(log::alevel::devel,
901                 "handle_async_read called with null read handler");
902         }
903     }
904
905     /// Initiate a potentially asyncronous write of the given buffer
906     void async_write(const char* buf, size_t len, write_handler handler) {
907         m_bufs.push_back(lib::asio::buffer(buf,len));
908
909         if (config::enable_multithreading) {
910             lib::asio::async_write(
911                 socket_con_type::get_socket(),
912                 m_bufs,
913                 m_strand->wrap(make_custom_alloc_handler(
914                     m_write_handler_allocator,
915                     lib::bind(
916                         &type::handle_async_write, get_shared(),
917                         handler,
918                         lib::placeholders::_1, lib::placeholders::_2
919                     )
920                 ))
921             );
922         } else {
923             lib::asio::async_write(
924                 socket_con_type::get_socket(),
925                 m_bufs,
926                 make_custom_alloc_handler(
927                     m_write_handler_allocator,
928                     lib::bind(
929                         &type::handle_async_write, get_shared(),
930                         handler,
931                         lib::placeholders::_1, lib::placeholders::_2
932                     )
933                 )
934             );
935         }
936     }
937
938     /// Initiate a potentially asyncronous write of the given buffers
939     void async_write(std::vector<buffer> const & bufs, write_handler handler) {
940         std::vector<buffer>::const_iterator it;
941
942         for (it = bufs.begin(); it != bufs.end(); ++it) {
943             m_bufs.push_back(lib::asio::buffer((*it).buf,(*it).len));
944         }
945
946         if (config::enable_multithreading) {
947             lib::asio::async_write(
948                 socket_con_type::get_socket(),
949                 m_bufs,
950                 m_strand->wrap(make_custom_alloc_handler(
951                     m_write_handler_allocator,
952                     lib::bind(
953                         &type::handle_async_write, get_shared(),
954                         handler,
955                         lib::placeholders::_1, lib::placeholders::_2
956                     )
957                 ))
958             );
959         } else {
960             lib::asio::async_write(
961                 socket_con_type::get_socket(),
962                 m_bufs,
963                 make_custom_alloc_handler(
964                     m_write_handler_allocator,
965                     lib::bind(
966                         &type::handle_async_write, get_shared(),
967                         handler,
968                         lib::placeholders::_1, lib::placeholders::_2
969                     )
970                 )
971             );
972         }
973     }
974
975     /// Async write callback
976     /**
977      * @param ec The status code
978      * @param bytes_transferred The number of bytes read
979      */
980     void handle_async_write(write_handler handler, lib::asio::error_code const & ec, size_t) {
981         m_bufs.clear();
982         lib::error_code tec;
983         if (ec) {
984             log_err(log::elevel::info,"asio async_write",ec);
985             tec = make_error_code(transport::error::pass_through);
986         }
987         if (handler) {
988             handler(tec);
989         } else {
990             // This can happen in cases where the connection is terminated while
991             // the transport is waiting on a read.
992             m_alog.write(log::alevel::devel,
993                 "handle_async_write called with null write handler");
994         }
995     }
996
997     /// Set Connection Handle
998     /**
999      * See common/connection_hdl.hpp for information
1000      *
1001      * @param hdl A connection_hdl that the transport will use to refer
1002      * to itself
1003      */
1004     void set_handle(connection_hdl hdl) {
1005         m_connection_hdl = hdl;
1006         socket_con_type::set_handle(hdl);
1007     }
1008
1009     /// Trigger the on_interrupt handler
1010     /**
1011      * This needs to be thread safe
1012      */
1013     lib::error_code interrupt(interrupt_handler handler) {
1014         if (config::enable_multithreading) {
1015             m_io_service->post(m_strand->wrap(handler));
1016         } else {
1017             m_io_service->post(handler);
1018         }
1019         return lib::error_code();
1020     }
1021
1022     lib::error_code dispatch(dispatch_handler handler) {
1023         if (config::enable_multithreading) {
1024             m_io_service->post(m_strand->wrap(handler));
1025         } else {
1026             m_io_service->post(handler);
1027         }
1028         return lib::error_code();
1029     }
1030
1031     /*void handle_interrupt(interrupt_handler handler) {
1032         handler();
1033     }*/
1034
1035     /// close and clean up the underlying socket
1036     void async_shutdown(shutdown_handler callback) {
1037         if (m_alog.static_test(log::alevel::devel)) {
1038             m_alog.write(log::alevel::devel,"asio connection async_shutdown");
1039         }
1040
1041         timer_ptr shutdown_timer;
1042         shutdown_timer = set_timer(
1043             config::timeout_socket_shutdown,
1044             lib::bind(
1045                 &type::handle_async_shutdown_timeout,
1046                 get_shared(),
1047                 shutdown_timer,
1048                 callback,
1049                 lib::placeholders::_1
1050             )
1051         );
1052
1053         socket_con_type::async_shutdown(
1054             lib::bind(
1055                 &type::handle_async_shutdown,
1056                 get_shared(),
1057                 shutdown_timer,
1058                 callback,
1059                 lib::placeholders::_1
1060             )
1061         );
1062     }
1063
1064     /// Async shutdown timeout handler
1065     /**
1066      * @param shutdown_timer A pointer to the timer to keep it in scope
1067      * @param callback The function to call back
1068      * @param ec The status code
1069      */
1070     void handle_async_shutdown_timeout(timer_ptr, init_handler callback, 
1071         lib::error_code const & ec)
1072     {
1073         lib::error_code ret_ec;
1074
1075         if (ec) {
1076             if (ec == transport::error::operation_aborted) {
1077                 m_alog.write(log::alevel::devel,
1078                     "asio socket shutdown timer cancelled");
1079                 return;
1080             }
1081
1082             log_err(log::elevel::devel,"asio handle_async_shutdown_timeout",ec);
1083             ret_ec = ec;
1084         } else {
1085             ret_ec = make_error_code(transport::error::timeout);
1086         }
1087
1088         m_alog.write(log::alevel::devel,
1089             "Asio transport socket shutdown timed out");
1090         cancel_socket_checked();
1091         callback(ret_ec);
1092     }
1093
1094     void handle_async_shutdown(timer_ptr shutdown_timer, shutdown_handler
1095         callback, lib::asio::error_code const & ec)
1096     {
1097         if (ec == lib::asio::error::operation_aborted ||
1098             lib::asio::is_neg(shutdown_timer->expires_from_now()))
1099         {
1100             m_alog.write(log::alevel::devel,"async_shutdown cancelled");
1101             return;
1102         }
1103
1104         shutdown_timer->cancel();
1105
1106         lib::error_code tec;
1107         if (ec) {
1108             if (ec == lib::asio::error::not_connected) {
1109                 // The socket was already closed when we tried to close it. This
1110                 // happens periodically (usually if a read or write fails
1111                 // earlier and if it is a real error will be caught at another
1112                 // level of the stack.
1113             } else {
1114                 // We don't know anything more about this error, give our
1115                 // socket/security policy a crack at it.
1116                 tec = socket_con_type::translate_ec(ec);
1117                 m_tec = ec;
1118
1119                 if (tec == transport::error::tls_short_read) {
1120                     // TLS short read at this point is somewhat expected if both
1121                     // sides try and end the connection at the same time or if
1122                     // SSLv2 is being used. In general there is nothing that can
1123                     // be done here other than a low level development log.
1124                 } else {
1125                     // all other errors are effectively pass through errors of
1126                     // some sort so print some detail on the info channel for
1127                     // library users to look up if needed.
1128                     log_err(log::elevel::info,"asio async_shutdown",ec);
1129                 }
1130             }
1131         } else {
1132             if (m_alog.static_test(log::alevel::devel)) {
1133                 m_alog.write(log::alevel::devel,
1134                     "asio con handle_async_shutdown");
1135             }
1136         }
1137         callback(tec);
1138     }
1139
1140     /// Cancel the underlying socket and log any errors
1141     void cancel_socket_checked() {
1142         lib::asio::error_code cec = socket_con_type::cancel_socket();
1143         if (cec) {
1144             if (cec == lib::asio::error::operation_not_supported) {
1145                 // cancel not supported on this OS, ignore and log at dev level
1146                 m_alog.write(log::alevel::devel, "socket cancel not supported");
1147             } else {
1148                 log_err(log::elevel::warn, "socket cancel failed", cec);
1149             }
1150         }
1151     }
1152
1153 private:
1154     /// Convenience method for logging the code and message for an error_code
1155     template <typename error_type>
1156     void log_err(log::level l, const char * msg, const error_type & ec) {
1157         std::stringstream s;
1158         s << msg << " error: " << ec << " (" << ec.message() << ")";
1159         m_elog.write(l,s.str());
1160     }
1161
1162     // static settings
1163     const bool m_is_server;
1164     alog_type& m_alog;
1165     elog_type& m_elog;
1166
1167     struct proxy_data {
1168         proxy_data() : timeout_proxy(config::timeout_proxy) {}
1169
1170         request_type req;
1171         response_type res;
1172         std::string write_buf;
1173         lib::asio::streambuf read_buf;
1174         long timeout_proxy;
1175         timer_ptr timer;
1176     };
1177
1178     std::string m_proxy;
1179     lib::shared_ptr<proxy_data> m_proxy_data;
1180
1181     // transport resources
1182     io_service_ptr  m_io_service;
1183     strand_ptr      m_strand;
1184     connection_hdl  m_connection_hdl;
1185
1186     std::vector<lib::asio::const_buffer> m_bufs;
1187
1188     /// Detailed internal error code
1189     lib::asio::error_code m_tec;
1190
1191     // Handlers
1192     tcp_init_handler    m_tcp_pre_init_handler;
1193     tcp_init_handler    m_tcp_post_init_handler;
1194
1195     handler_allocator   m_read_handler_allocator;
1196     handler_allocator   m_write_handler_allocator;
1197 };
1198
1199
1200 } // namespace asio
1201 } // namespace transport
1202 } // namespace websocketpp
1203
1204 #endif // WEBSOCKETPP_TRANSPORT_ASIO_CON_HPP