BadVPN – Blame information for rev 1

Subversion Repositories:
Rev:
Rev Author Line No. Line
1 office 1 /**
2 * @file BConnection_win.c
3 * @author Ambroz Bizjak <ambrop7@gmail.com>
4 *
5 * @section LICENSE
6 *
7 * Redistribution and use in source and binary forms, with or without
8 * modification, are permitted provided that the following conditions are met:
9 * 1. Redistributions of source code must retain the above copyright
10 * notice, this list of conditions and the following disclaimer.
11 * 2. Redistributions in binary form must reproduce the above copyright
12 * notice, this list of conditions and the following disclaimer in the
13 * documentation and/or other materials provided with the distribution.
14 * 3. Neither the name of the author nor the
15 * names of its contributors may be used to endorse or promote products
16 * derived from this software without specific prior written permission.
17 *
18 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
19 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
20 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
21 * DISCLAIMED. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY
22 * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
23 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
24 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
25 * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
26 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
27 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
28 */
29  
30 #include <stdlib.h>
31 #include <limits.h>
32  
33 #include <base/BLog.h>
34  
35 #include "BConnection.h"
36  
37 #include <generated/blog_channel_BConnection.h>
38  
39 #define LISTEN_BACKLOG 128
40  
41 struct sys_addr {
42 int len;
43 union {
44 struct sockaddr generic;
45 struct sockaddr_in ipv4;
46 struct sockaddr_in6 ipv6;
47 } addr;
48 };
49  
50 static void addr_socket_to_sys (struct sys_addr *out, BAddr addr);
51 static void addr_any_to_sys (struct sys_addr *out, int family);
52 static void addr_sys_to_socket (BAddr *out, struct sys_addr addr);
53 static void listener_next_job_handler (BListener *o);
54 static void listener_olap_handler (BListener *o, int event, DWORD bytes);
55 static void connector_olap_handler (BConnector *o, int event, DWORD bytes);
56 static void connector_abort (BConnector *o);
57 static void connection_report_error (BConnection *o);
58 static void connection_abort (BConnection *o);
59 static void connection_send_iface_handler_send (BConnection *o, uint8_t *data, int data_len);
60 static void connection_recv_iface_handler_recv (BConnection *o, uint8_t *data, int data_len);
61 static void connection_send_olap_handler (BConnection *o, int event, DWORD bytes);
62 static void connection_recv_olap_handler (BConnection *o, int event, DWORD bytes);
63  
64 static void addr_socket_to_sys (struct sys_addr *out, BAddr addr)
65 {
66 switch (addr.type) {
67 case BADDR_TYPE_IPV4: {
68 out->len = sizeof(out->addr.ipv4);
69 memset(&out->addr.ipv4, 0, sizeof(out->addr.ipv4));
70 out->addr.ipv4.sin_family = AF_INET;
71 out->addr.ipv4.sin_port = addr.ipv4.port;
72 out->addr.ipv4.sin_addr.s_addr = addr.ipv4.ip;
73 } break;
74  
75 case BADDR_TYPE_IPV6: {
76 out->len = sizeof(out->addr.ipv6);
77 memset(&out->addr.ipv6, 0, sizeof(out->addr.ipv6));
78 out->addr.ipv6.sin6_family = AF_INET6;
79 out->addr.ipv6.sin6_port = addr.ipv6.port;
80 out->addr.ipv6.sin6_flowinfo = 0;
81 memcpy(out->addr.ipv6.sin6_addr.s6_addr, addr.ipv6.ip, 16);
82 out->addr.ipv6.sin6_scope_id = 0;
83 } break;
84  
85 default: ASSERT(0);
86 }
87 }
88  
89 static void addr_any_to_sys (struct sys_addr *out, int family)
90 {
91 switch (family) {
92 case BADDR_TYPE_IPV4: {
93 out->len = sizeof(out->addr.ipv4);
94 memset(&out->addr.ipv4, 0, sizeof(out->addr.ipv4));
95 out->addr.ipv4.sin_family = AF_INET;
96 out->addr.ipv4.sin_port = 0;
97 out->addr.ipv4.sin_addr.s_addr = INADDR_ANY;
98 } break;
99  
100 case BADDR_TYPE_IPV6: {
101 out->len = sizeof(out->addr.ipv6);
102 memset(&out->addr.ipv6, 0, sizeof(out->addr.ipv6));
103 out->addr.ipv6.sin6_family = AF_INET6;
104 out->addr.ipv6.sin6_port = 0;
105 out->addr.ipv6.sin6_flowinfo = 0;
106 struct in6_addr any = IN6ADDR_ANY_INIT;
107 out->addr.ipv6.sin6_addr = any;
108 out->addr.ipv6.sin6_scope_id = 0;
109 } break;
110  
111 default: ASSERT(0);
112 }
113 }
114  
115 static void addr_sys_to_socket (BAddr *out, struct sys_addr addr)
116 {
117 switch (addr.addr.generic.sa_family) {
118 case AF_INET: {
119 ASSERT(addr.len == sizeof(struct sockaddr_in))
120 BAddr_InitIPv4(out, addr.addr.ipv4.sin_addr.s_addr, addr.addr.ipv4.sin_port);
121 } break;
122  
123 case AF_INET6: {
124 ASSERT(addr.len == sizeof(struct sockaddr_in6))
125 BAddr_InitIPv6(out, addr.addr.ipv6.sin6_addr.s6_addr, addr.addr.ipv6.sin6_port);
126 } break;
127  
128 default: {
129 BAddr_InitNone(out);
130 } break;
131 }
132 }
133  
134 static void listener_next_job_handler (BListener *o)
135 {
136 DebugObject_Access(&o->d_obj);
137 ASSERT(!o->busy)
138  
139 // free ready socket
140 if (o->ready) {
141 BLog(BLOG_ERROR, "discarding connection");
142  
143 // close new socket
144 if (closesocket(o->newsock) == SOCKET_ERROR) {
145 BLog(BLOG_ERROR, "closesocket failed");
146 }
147  
148 // set not ready
149 o->ready = 0;
150 }
151  
152 // create new socket
153 if ((o->newsock = WSASocket(o->sys_family, SOCK_STREAM, 0, NULL, 0, WSA_FLAG_OVERLAPPED)) == INVALID_SOCKET) {
154 BLog(BLOG_ERROR, "WSASocket failed");
155 goto fail0;
156 }
157  
158 // start accept operation
159 while (1) {
160 memset(&o->olap.olap, 0, sizeof(o->olap.olap));
161 DWORD bytes;
162 BOOL res = o->fnAcceptEx(o->sock, o->newsock, o->addrbuf, 0, sizeof(struct BListener_addrbuf_stub), sizeof(struct BListener_addrbuf_stub), &bytes, &o->olap.olap);
163 if (res == FALSE && WSAGetLastError() != ERROR_IO_PENDING) {
164 BLog(BLOG_ERROR, "AcceptEx failed");
165 continue;
166 }
167 break;
168 }
169  
170 // set busy
171 o->busy = 1;
172  
173 return;
174  
175 fail0:
176 return;
177 }
178  
179 static void listener_olap_handler (BListener *o, int event, DWORD bytes)
180 {
181 DebugObject_Access(&o->d_obj);
182 ASSERT(o->busy)
183 ASSERT(!o->ready)
184 ASSERT(event == BREACTOR_IOCP_EVENT_SUCCEEDED || event == BREACTOR_IOCP_EVENT_FAILED)
185  
186 // set not busy
187 o->busy = 0;
188  
189 // schedule next accept
190 BPending_Set(&o->next_job);
191  
192 if (event == BREACTOR_IOCP_EVENT_FAILED) {
193 BLog(BLOG_ERROR, "accepting failed");
194  
195 // close new socket
196 if (closesocket(o->newsock) == SOCKET_ERROR) {
197 BLog(BLOG_ERROR, "closesocket failed");
198 }
199  
200 return;
201 }
202  
203 BLog(BLOG_INFO, "connection accepted");
204  
205 // set ready
206 o->ready = 1;
207  
208 // call handler
209 o->handler(o->user);
210 return;
211 }
212  
213 static void connector_olap_handler (BConnector *o, int event, DWORD bytes)
214 {
215 DebugObject_Access(&o->d_obj);
216 ASSERT(o->sock != INVALID_SOCKET)
217 ASSERT(o->busy)
218 ASSERT(!o->ready)
219 ASSERT(event == BREACTOR_IOCP_EVENT_SUCCEEDED || event == BREACTOR_IOCP_EVENT_FAILED)
220  
221 // set not busy
222 o->busy = 0;
223  
224 if (event == BREACTOR_IOCP_EVENT_FAILED) {
225 BLog(BLOG_ERROR, "connection failed");
226 } else {
227 // set ready
228 o->ready = 1;
229 }
230  
231 // call handler
232 o->handler(o->user, !o->ready);
233 return;
234 }
235  
236 static void connector_abort (BConnector *o)
237 {
238 if (o->sock != INVALID_SOCKET) {
239 // cancel I/O
240 if (o->busy) {
241 if (!CancelIo((HANDLE)o->sock)) {
242 BLog(BLOG_ERROR, "CancelIo failed");
243 }
244 }
245  
246 // close socket
247 if (closesocket(o->sock) == SOCKET_ERROR) {
248 BLog(BLOG_ERROR, "closesocket failed");
249 }
250 }
251  
252 // wait for connect operation to finish
253 if (o->busy) {
254 BReactorIOCPOverlapped_Wait(&o->olap, NULL, NULL);
255 }
256  
257 // free olap
258 BReactorIOCPOverlapped_Free(&o->olap);
259 }
260  
261 static void connection_report_error (BConnection *o)
262 {
263 DebugError_AssertNoError(&o->d_err);
264 ASSERT(o->handler)
265  
266 // report error
267 DEBUGERROR(&o->d_err, o->handler(o->user, BCONNECTION_EVENT_ERROR));
268 return;
269 }
270  
271 static void connection_abort (BConnection *o)
272 {
273 ASSERT(!o->aborted)
274  
275 // cancel I/O
276 if ((o->recv.inited && o->recv.busy) || (o->send.inited && o->send.busy)) {
277 if (!CancelIo((HANDLE)o->sock)) {
278 BLog(BLOG_ERROR, "CancelIo failed");
279 }
280 }
281  
282 // close socket
283 if (closesocket(o->sock) == SOCKET_ERROR) {
284 BLog(BLOG_ERROR, "closesocket failed");
285 }
286  
287 // wait for receiving to complete
288 if (o->recv.inited && o->recv.busy) {
289 BReactorIOCPOverlapped_Wait(&o->recv.olap, NULL, NULL);
290 }
291  
292 // wait for sending to complete
293 if (o->send.inited && o->send.busy) {
294 BReactorIOCPOverlapped_Wait(&o->send.olap, NULL, NULL);
295 }
296  
297 // free recv olap
298 BReactorIOCPOverlapped_Free(&o->recv.olap);
299  
300 // free send olap
301 BReactorIOCPOverlapped_Free(&o->send.olap);
302  
303 // set aborted
304 o->aborted = 1;
305 }
306  
307 static void connection_send_iface_handler_send (BConnection *o, uint8_t *data, int data_len)
308 {
309 DebugObject_Access(&o->d_obj);
310 DebugError_AssertNoError(&o->d_err);
311 ASSERT(!o->aborted)
312 ASSERT(o->send.inited)
313 ASSERT(!o->send.busy)
314 ASSERT(data_len > 0)
315  
316 if (data_len > ULONG_MAX) {
317 data_len = ULONG_MAX;
318 }
319  
320 WSABUF buf;
321 buf.buf = (char *)data;
322 buf.len = data_len;
323  
324 memset(&o->send.olap.olap, 0, sizeof(o->send.olap.olap));
325  
326 // send
327 int res = WSASend(o->sock, &buf, 1, NULL, 0, &o->send.olap.olap, NULL);
328 if (res == SOCKET_ERROR && WSAGetLastError() != WSA_IO_PENDING) {
329 BLog(BLOG_ERROR, "WSASend failed (%d)", WSAGetLastError());
330 connection_report_error(o);
331 return;
332 }
333  
334 // set busy
335 o->send.busy = 1;
336 o->send.busy_data_len = data_len;
337 }
338  
339 static void connection_recv_iface_handler_recv (BConnection *o, uint8_t *data, int data_len)
340 {
341 DebugObject_Access(&o->d_obj);
342 DebugError_AssertNoError(&o->d_err);
343 ASSERT(!o->recv.closed)
344 ASSERT(!o->aborted)
345 ASSERT(o->recv.inited)
346 ASSERT(!o->recv.busy)
347 ASSERT(data_len > 0)
348  
349 if (data_len > ULONG_MAX) {
350 data_len = ULONG_MAX;
351 }
352  
353 WSABUF buf;
354 buf.buf = (char *)data;
355 buf.len = data_len;
356  
357 memset(&o->recv.olap.olap, 0, sizeof(o->recv.olap.olap));
358  
359 // recv
360 DWORD flags = 0;
361 int res = WSARecv(o->sock, &buf, 1, NULL, &flags, &o->recv.olap.olap, NULL);
362 if (res == SOCKET_ERROR && WSAGetLastError() != WSA_IO_PENDING) {
363 BLog(BLOG_ERROR, "WSARecv failed (%d)", WSAGetLastError());
364 connection_report_error(o);
365 return;
366 }
367  
368 // set busy
369 o->recv.busy = 1;
370 o->recv.busy_data_len = data_len;
371 }
372  
373 static void connection_send_olap_handler (BConnection *o, int event, DWORD bytes)
374 {
375 DebugObject_Access(&o->d_obj);
376 DebugError_AssertNoError(&o->d_err);
377 ASSERT(!o->aborted)
378 ASSERT(o->send.inited)
379 ASSERT(o->send.busy)
380 ASSERT(event == BREACTOR_IOCP_EVENT_SUCCEEDED || event == BREACTOR_IOCP_EVENT_FAILED)
381  
382 // set not busy
383 o->send.busy = 0;
384  
385 if (event == BREACTOR_IOCP_EVENT_FAILED) {
386 BLog(BLOG_ERROR, "sending failed");
387 connection_report_error(o);
388 return;
389 }
390  
391 ASSERT(bytes > 0)
392 ASSERT(bytes <= o->send.busy_data_len)
393  
394 // done
395 StreamPassInterface_Done(&o->send.iface, bytes);
396 }
397  
398 static void connection_recv_olap_handler (BConnection *o, int event, DWORD bytes)
399 {
400 DebugObject_Access(&o->d_obj);
401 DebugError_AssertNoError(&o->d_err);
402 ASSERT(!o->recv.closed)
403 ASSERT(!o->aborted)
404 ASSERT(o->recv.inited)
405 ASSERT(o->recv.busy)
406 ASSERT(event == BREACTOR_IOCP_EVENT_SUCCEEDED || event == BREACTOR_IOCP_EVENT_FAILED)
407  
408 // set not busy
409 o->recv.busy = 0;
410  
411 if (event == BREACTOR_IOCP_EVENT_FAILED) {
412 BLog(BLOG_ERROR, "receiving failed");
413 connection_report_error(o);
414 return;
415 }
416  
417 if (bytes == 0) {
418 // set closed
419 o->recv.closed = 1;
420  
421 // report recv closed
422 o->handler(o->user, BCONNECTION_EVENT_RECVCLOSED);
423 return;
424 }
425  
426 ASSERT(bytes > 0)
427 ASSERT(bytes <= o->recv.busy_data_len)
428  
429 // done
430 StreamRecvInterface_Done(&o->recv.iface, bytes);
431 }
432  
433 int BConnection_AddressSupported (BAddr addr)
434 {
435 BAddr_Assert(&addr);
436  
437 return (addr.type == BADDR_TYPE_IPV4 || addr.type == BADDR_TYPE_IPV6);
438 }
439  
440 int BListener_InitFrom (BListener *o, struct BLisCon_from from,
441 BReactor *reactor, void *user,
442 BListener_handler handler)
443 {
444 ASSERT(from.type == BLISCON_FROM_ADDR)
445 ASSERT(handler)
446 BNetwork_Assert();
447  
448 // init arguments
449 o->reactor = reactor;
450 o->user = user;
451 o->handler = handler;
452  
453 // check address
454 if (!BConnection_AddressSupported(from.u.from_addr.addr)) {
455 BLog(BLOG_ERROR, "address not supported");
456 goto fail0;
457 }
458  
459 // convert address
460 struct sys_addr sysaddr;
461 addr_socket_to_sys(&sysaddr, from.u.from_addr.addr);
462  
463 // remember family
464 o->sys_family = sysaddr.addr.generic.sa_family;
465  
466 // init socket
467 if ((o->sock = WSASocket(sysaddr.addr.generic.sa_family, SOCK_STREAM, 0, NULL, 0, WSA_FLAG_OVERLAPPED)) == INVALID_SOCKET) {
468 BLog(BLOG_ERROR, "WSASocket failed");
469 goto fail0;
470 }
471  
472 // associate with IOCP
473 if (!CreateIoCompletionPort((HANDLE)o->sock, BReactor_GetIOCPHandle(o->reactor), 0, 0)) {
474 BLog(BLOG_ERROR, "CreateIoCompletionPort failed");
475 goto fail1;
476 }
477  
478 // bind
479 if (bind(o->sock, &sysaddr.addr.generic, sysaddr.len) < 0) {
480 BLog(BLOG_ERROR, "bind failed");
481 goto fail1;
482 }
483  
484 // listen
485 if (listen(o->sock, LISTEN_BACKLOG) < 0) {
486 BLog(BLOG_ERROR, "listen failed");
487 goto fail1;
488 }
489  
490 DWORD out_bytes;
491  
492 // obtain AcceptEx
493 GUID guid1 = WSAID_ACCEPTEX;
494 if (WSAIoctl(o->sock, SIO_GET_EXTENSION_FUNCTION_POINTER, &guid1, sizeof(guid1), &o->fnAcceptEx, sizeof(o->fnAcceptEx), &out_bytes, NULL, NULL) != 0) {
495 BLog(BLOG_ERROR, "faild to obtain AcceptEx");
496 goto fail1;
497 }
498  
499 // obtain GetAcceptExSockaddrs
500 GUID guid2 = WSAID_GETACCEPTEXSOCKADDRS;
501 if (WSAIoctl(o->sock, SIO_GET_EXTENSION_FUNCTION_POINTER, &guid2, sizeof(guid2), &o->fnGetAcceptExSockaddrs, sizeof(o->fnGetAcceptExSockaddrs), &out_bytes, NULL, NULL) != 0) {
502 BLog(BLOG_ERROR, "faild to obtain GetAcceptExSockaddrs");
503 goto fail1;
504 }
505  
506 // init olap
507 BReactorIOCPOverlapped_Init(&o->olap, o->reactor, o, (BReactorIOCPOverlapped_handler)listener_olap_handler);
508  
509 // init next job
510 BPending_Init(&o->next_job, BReactor_PendingGroup(o->reactor), (BPending_handler)listener_next_job_handler, o);
511  
512 // set not busy
513 o->busy = 0;
514  
515 // set not ready
516 o->ready = 0;
517  
518 // set next job
519 BPending_Set(&o->next_job);
520  
521 DebugObject_Init(&o->d_obj);
522 return 1;
523  
524 fail1:
525 if (closesocket(o->sock) == SOCKET_ERROR) {
526 BLog(BLOG_ERROR, "closesocket failed");
527 }
528 fail0:
529 return 0;
530 }
531  
532 void BListener_Free (BListener *o)
533 {
534 DebugObject_Free(&o->d_obj);
535  
536 // cancel I/O
537 if (o->busy) {
538 if (!CancelIo((HANDLE)o->sock)) {
539 BLog(BLOG_ERROR, "CancelIo failed");
540 }
541 }
542  
543 // close socket
544 if (closesocket(o->sock) == SOCKET_ERROR) {
545 BLog(BLOG_ERROR, "closesocket failed");
546 }
547  
548 // wait for accept operation to finish
549 if (o->busy) {
550 BReactorIOCPOverlapped_Wait(&o->olap, NULL, NULL);
551 }
552  
553 // close new socket
554 if (o->busy || o->ready) {
555 if (closesocket(o->newsock) == SOCKET_ERROR) {
556 BLog(BLOG_ERROR, "closesocket failed");
557 }
558 }
559  
560 // free next job
561 BPending_Free(&o->next_job);
562  
563 // free olap
564 BReactorIOCPOverlapped_Free(&o->olap);
565 }
566  
567 int BConnector_InitFrom (BConnector *o, struct BLisCon_from from, BReactor *reactor, void *user,
568 BConnector_handler handler)
569 {
570 ASSERT(from.type == BLISCON_FROM_ADDR)
571 ASSERT(handler)
572 BNetwork_Assert();
573  
574 // init arguments
575 o->reactor = reactor;
576 o->user = user;
577 o->handler = handler;
578  
579 // check address
580 if (!BConnection_AddressSupported(from.u.from_addr.addr)) {
581 BLog(BLOG_ERROR, "address not supported");
582 goto fail0;
583 }
584  
585 // convert address
586 struct sys_addr sysaddr;
587 addr_socket_to_sys(&sysaddr, from.u.from_addr.addr);
588  
589 // create local any address
590 struct sys_addr local_sysaddr;
591 addr_any_to_sys(&local_sysaddr, from.u.from_addr.addr.type);
592  
593 // init socket
594 if ((o->sock = WSASocket(sysaddr.addr.generic.sa_family, SOCK_STREAM, 0, NULL, 0, WSA_FLAG_OVERLAPPED)) == INVALID_SOCKET) {
595 BLog(BLOG_ERROR, "WSASocket failed");
596 goto fail0;
597 }
598  
599 // associate with IOCP
600 if (!CreateIoCompletionPort((HANDLE)o->sock, BReactor_GetIOCPHandle(o->reactor), 0, 0)) {
601 BLog(BLOG_ERROR, "CreateIoCompletionPort failed");
602 goto fail1;
603 }
604  
605 // bind socket
606 if (bind(o->sock, &local_sysaddr.addr.generic, local_sysaddr.len) < 0) {
607 BLog(BLOG_ERROR, "bind failed");
608 goto fail1;
609 }
610  
611 // obtain ConnectEx
612 GUID guid = WSAID_CONNECTEX;
613 DWORD out_bytes;
614 if (WSAIoctl(o->sock, SIO_GET_EXTENSION_FUNCTION_POINTER, &guid, sizeof(guid), &o->fnConnectEx, sizeof(o->fnConnectEx), &out_bytes, NULL, NULL) != 0) {
615 BLog(BLOG_ERROR, "faild to get ConnectEx");
616 goto fail1;
617 }
618  
619 // init olap
620 BReactorIOCPOverlapped_Init(&o->olap, o->reactor, o, (BReactorIOCPOverlapped_handler)connector_olap_handler);
621  
622 // start connect operation
623 BOOL res = o->fnConnectEx(o->sock, &sysaddr.addr.generic, sysaddr.len, NULL, 0, NULL, &o->olap.olap);
624 if (res == FALSE && WSAGetLastError() != ERROR_IO_PENDING) {
625 BLog(BLOG_ERROR, "ConnectEx failed (%d)", WSAGetLastError());
626 goto fail2;
627 }
628  
629 // set busy
630 o->busy = 1;
631  
632 // set not ready
633 o->ready = 0;
634  
635 DebugObject_Init(&o->d_obj);
636 return 1;
637  
638 fail2:
639 BReactorIOCPOverlapped_Free(&o->olap);
640 fail1:
641 if (closesocket(o->sock) == SOCKET_ERROR) {
642 BLog(BLOG_ERROR, "closesocket failed");
643 }
644 fail0:
645 return 0;
646 }
647  
648 void BConnector_Free (BConnector *o)
649 {
650 DebugObject_Free(&o->d_obj);
651  
652 if (o->sock != INVALID_SOCKET) {
653 connector_abort(o);
654 }
655 }
656  
657 int BConnection_Init (BConnection *o, struct BConnection_source source, BReactor *reactor, void *user,
658 BConnection_handler handler)
659 {
660 switch (source.type) {
661 case BCONNECTION_SOURCE_TYPE_LISTENER: {
662 BListener *listener = source.u.listener.listener;
663 DebugObject_Access(&listener->d_obj);
664 ASSERT(BPending_IsSet(&listener->next_job))
665 ASSERT(!listener->busy)
666 ASSERT(listener->ready)
667 } break;
668 case BCONNECTION_SOURCE_TYPE_CONNECTOR: {
669 BConnector *connector = source.u.connector.connector;
670 DebugObject_Access(&connector->d_obj);
671 ASSERT(connector->reactor == reactor)
672 ASSERT(connector->sock != INVALID_SOCKET)
673 ASSERT(!connector->busy)
674 ASSERT(connector->ready)
675 } break;
676 default: ASSERT(0);
677 }
678 ASSERT(handler)
679 BNetwork_Assert();
680  
681 // init arguments
682 o->reactor = reactor;
683 o->user = user;
684 o->handler = handler;
685  
686 switch (source.type) {
687 case BCONNECTION_SOURCE_TYPE_LISTENER: {
688 BListener *listener = source.u.listener.listener;
689  
690 // grab new socket from listener
691 o->sock = listener->newsock;
692 listener->ready = 0;
693  
694 // associate with IOCP
695 if (!CreateIoCompletionPort((HANDLE)o->sock, BReactor_GetIOCPHandle(o->reactor), 0, 0)) {
696 BLog(BLOG_ERROR, "CreateIoCompletionPort failed");
697 goto fail1;
698 }
699  
700 // return address
701 if (source.u.listener.out_addr) {
702 struct sockaddr *addr_local;
703 struct sockaddr *addr_remote;
704 int len_local;
705 int len_remote;
706 listener->fnGetAcceptExSockaddrs(listener->addrbuf, 0, sizeof(struct BListener_addrbuf_stub), sizeof(struct BListener_addrbuf_stub),
707 &addr_local, &len_local, &addr_remote, &len_remote);
708  
709 struct sys_addr sysaddr;
710  
711 ASSERT_FORCE(len_remote >= 0)
712 ASSERT_FORCE(len_remote <= sizeof(sysaddr.addr))
713  
714 memcpy((uint8_t *)&sysaddr.addr, (uint8_t *)addr_remote, len_remote);
715 sysaddr.len = len_remote;
716  
717 addr_sys_to_socket(source.u.listener.out_addr, sysaddr);
718 }
719 } break;
720  
721 case BCONNECTION_SOURCE_TYPE_CONNECTOR: {
722 BConnector *connector = source.u.connector.connector;
723  
724 // grab fd from connector
725 o->sock = connector->sock;
726 connector->sock = INVALID_SOCKET;
727  
728 // release connector resources
729 connector_abort(connector);
730 } break;
731 }
732  
733 // set not aborted
734 o->aborted = 0;
735  
736 // init send olap
737 BReactorIOCPOverlapped_Init(&o->send.olap, o->reactor, o, (BReactorIOCPOverlapped_handler)connection_send_olap_handler);
738  
739 // set send not inited
740 o->send.inited = 0;
741  
742 // init recv olap
743 BReactorIOCPOverlapped_Init(&o->recv.olap, o->reactor, o, (BReactorIOCPOverlapped_handler)connection_recv_olap_handler);
744  
745 // set recv not closed
746 o->recv.closed = 0;
747  
748 // set recv not inited
749 o->recv.inited = 0;
750  
751 DebugError_Init(&o->d_err, BReactor_PendingGroup(o->reactor));
752 DebugObject_Init(&o->d_obj);
753 return 1;
754  
755 fail1:
756 if (closesocket(o->sock) == SOCKET_ERROR) {
757 BLog(BLOG_ERROR, "closesocket failed");
758 }
759 return 0;
760 }
761  
762 void BConnection_Free (BConnection *o)
763 {
764 DebugObject_Free(&o->d_obj);
765 DebugError_Free(&o->d_err);
766 ASSERT(!o->recv.inited)
767 ASSERT(!o->send.inited)
768  
769 if (!o->aborted) {
770 connection_abort(o);
771 }
772 }
773  
774 void BConnection_SetHandlers (BConnection *o, void *user, BConnection_handler handler)
775 {
776 DebugObject_Access(&o->d_obj);
777  
778 // set handlers
779 o->user = user;
780 o->handler = handler;
781 }
782  
783 int BConnection_SetSendBuffer (BConnection *o, int buf_size)
784 {
785 DebugObject_Access(&o->d_obj);
786  
787 if (setsockopt(o->sock, SOL_SOCKET, SO_SNDBUF, (char *)&buf_size, sizeof(buf_size)) < 0) {
788 BLog(BLOG_ERROR, "setsockopt failed");
789 return 0;
790 }
791  
792 return 1;
793 }
794  
795 int BConnection_GetLocalAddress (BConnection *o, BAddr *local_addr)
796 {
797 DebugObject_Access(&o->d_obj);
798  
799 struct sys_addr sysaddr;
800 socklen_t addr_size = sizeof(sysaddr.addr.generic);
801 if (getsockname(o->sock, &sysaddr.addr.generic, &addr_size) != 0) {
802 BLog(BLOG_ERROR, "BConnection_GetLocalAddress: getsockname failed");
803 return 0;
804 }
805 sysaddr.len = addr_size;
806  
807 BAddr addr;
808 addr_sys_to_socket(&addr, sysaddr);
809  
810 if (addr.type == BADDR_TYPE_NONE) {
811 BLog(BLOG_ERROR, "BConnection_GetLocalAddress: Unsupported address family "
812 "from getsockname: %d", sysaddr.addr.generic.sa_family);
813 return 0;
814 }
815  
816 *local_addr = addr;
817 return 1;
818 }
819  
820 void BConnection_SendAsync_Init (BConnection *o)
821 {
822 DebugObject_Access(&o->d_obj);
823 DebugError_AssertNoError(&o->d_err);
824 ASSERT(!o->aborted)
825 ASSERT(!o->send.inited)
826  
827 // init interface
828 StreamPassInterface_Init(&o->send.iface, (StreamPassInterface_handler_send)connection_send_iface_handler_send, o, BReactor_PendingGroup(o->reactor));
829  
830 // set not busy
831 o->send.busy = 0;
832  
833 // set inited
834 o->send.inited = 1;
835 }
836  
837 void BConnection_SendAsync_Free (BConnection *o)
838 {
839 DebugObject_Access(&o->d_obj);
840 ASSERT(o->send.inited)
841  
842 // abort if busy
843 if (o->send.busy && !o->aborted) {
844 connection_abort(o);
845 }
846  
847 // free interface
848 StreamPassInterface_Free(&o->send.iface);
849  
850 // set not inited
851 o->send.inited = 0;
852 }
853  
854 StreamPassInterface * BConnection_SendAsync_GetIf (BConnection *o)
855 {
856 DebugObject_Access(&o->d_obj);
857 ASSERT(o->send.inited)
858  
859 return &o->send.iface;
860 }
861  
862 void BConnection_RecvAsync_Init (BConnection *o)
863 {
864 DebugObject_Access(&o->d_obj);
865 DebugError_AssertNoError(&o->d_err);
866 ASSERT(!o->recv.closed)
867 ASSERT(!o->aborted)
868 ASSERT(!o->recv.inited)
869  
870 // init interface
871 StreamRecvInterface_Init(&o->recv.iface, (StreamRecvInterface_handler_recv)connection_recv_iface_handler_recv, o, BReactor_PendingGroup(o->reactor));
872  
873 // set not busy
874 o->recv.busy = 0;
875  
876 // set inited
877 o->recv.inited = 1;
878 }
879  
880 void BConnection_RecvAsync_Free (BConnection *o)
881 {
882 DebugObject_Access(&o->d_obj);
883 ASSERT(o->recv.inited)
884  
885 // abort if busy
886 if (o->recv.busy && !o->aborted) {
887 connection_abort(o);
888 }
889  
890 // free interface
891 StreamRecvInterface_Free(&o->recv.iface);
892  
893 // set not inited
894 o->recv.inited = 0;
895 }
896  
897 StreamRecvInterface * BConnection_RecvAsync_GetIf (BConnection *o)
898 {
899 DebugObject_Access(&o->d_obj);
900 ASSERT(o->recv.inited)
901  
902 return &o->recv.iface;
903 }