BadVPN – Blame information for rev 1

Subversion Repositories:
Rev:
Rev Author Line No. Line
1 office 1 /**
2 * @file DataProto.c
3 * @author Ambroz Bizjak <ambrop7@gmail.com>
4 *
5 * @section LICENSE
6 *
7 * Redistribution and use in source and binary forms, with or without
8 * modification, are permitted provided that the following conditions are met:
9 * 1. Redistributions of source code must retain the above copyright
10 * notice, this list of conditions and the following disclaimer.
11 * 2. Redistributions in binary form must reproduce the above copyright
12 * notice, this list of conditions and the following disclaimer in the
13 * documentation and/or other materials provided with the distribution.
14 * 3. Neither the name of the author nor the
15 * names of its contributors may be used to endorse or promote products
16 * derived from this software without specific prior written permission.
17 *
18 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
19 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
20 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
21 * DISCLAIMED. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY
22 * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
23 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
24 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
25 * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
26 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
27 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
28 */
29  
30 #include <stdlib.h>
31 #include <string.h>
32 #include <limits.h>
33  
34 #include <protocol/dataproto.h>
35 #include <misc/byteorder.h>
36 #include <base/BLog.h>
37  
38 #include <client/DataProto.h>
39  
40 #include <generated/blog_channel_DataProto.h>
41  
42 static void monitor_handler (DataProtoSink *o);
43 static void refresh_up_job (DataProtoSink *o);
44 static void receive_timer_handler (DataProtoSink *o);
45 static void notifier_handler (DataProtoSink *o, uint8_t *data, int data_len);
46 static void up_job_handler (DataProtoSink *o);
47 static void flow_buffer_free (struct DataProtoFlow_buffer *b);
48 static void flow_buffer_attach (struct DataProtoFlow_buffer *b, DataProtoSink *sink);
49 static void flow_buffer_detach (struct DataProtoFlow_buffer *b);
50 static void flow_buffer_schedule_detach (struct DataProtoFlow_buffer *b);
51 static void flow_buffer_finish_detach (struct DataProtoFlow_buffer *b);
52 static void flow_buffer_qflow_handler_busy (struct DataProtoFlow_buffer *b);
53  
54 void monitor_handler (DataProtoSink *o)
55 {
56 DebugObject_Access(&o->d_obj);
57  
58 // send keep-alive
59 PacketRecvBlocker_AllowBlockedPacket(&o->ka_blocker);
60 }
61  
62 void refresh_up_job (DataProtoSink *o)
63 {
64 if (o->up != o->up_report) {
65 BPending_Set(&o->up_job);
66 } else {
67 BPending_Unset(&o->up_job);
68 }
69 }
70  
71 void receive_timer_handler (DataProtoSink *o)
72 {
73 DebugObject_Access(&o->d_obj);
74  
75 // consider down
76 o->up = 0;
77  
78 refresh_up_job(o);
79 }
80  
81 void notifier_handler (DataProtoSink *o, uint8_t *data, int data_len)
82 {
83 DebugObject_Access(&o->d_obj);
84 ASSERT(data_len >= sizeof(struct dataproto_header))
85  
86 int flags = 0;
87  
88 // if we are receiving keepalives, set the flag
89 if (BTimer_IsRunning(&o->receive_timer)) {
90 flags |= DATAPROTO_FLAGS_RECEIVING_KEEPALIVES;
91 }
92  
93 // modify existing packet here
94 struct dataproto_header header;
95 memcpy(&header, data, sizeof(header));
96 header.flags = hton8(flags);
97 memcpy(data, &header, sizeof(header));
98 }
99  
100 void up_job_handler (DataProtoSink *o)
101 {
102 DebugObject_Access(&o->d_obj);
103 ASSERT(o->up != o->up_report)
104  
105 o->up_report = o->up;
106  
107 o->handler(o->user, o->up);
108 return;
109 }
110  
111 void source_router_handler (DataProtoSource *o, uint8_t *buf, int recv_len)
112 {
113 DebugObject_Access(&o->d_obj);
114 ASSERT(buf)
115 ASSERT(recv_len >= 0)
116 ASSERT(recv_len <= o->frame_mtu)
117  
118 // remember packet
119 o->current_buf = buf;
120 o->current_recv_len = recv_len;
121  
122 // call handler
123 o->handler(o->user, buf + DATAPROTO_MAX_OVERHEAD, recv_len);
124 return;
125 }
126  
127 void flow_buffer_free (struct DataProtoFlow_buffer *b)
128 {
129 ASSERT(!b->sink)
130  
131 // free route buffer
132 RouteBuffer_Free(&b->rbuf);
133  
134 // free inactivity monitor
135 if (b->inactivity_time >= 0) {
136 PacketPassInactivityMonitor_Free(&b->monitor);
137 }
138  
139 // free connector
140 PacketPassConnector_Free(&b->connector);
141  
142 // free buffer structure
143 free(b);
144 }
145  
146 void flow_buffer_attach (struct DataProtoFlow_buffer *b, DataProtoSink *sink)
147 {
148 ASSERT(!b->sink)
149  
150 // init queue flow
151 PacketPassFairQueueFlow_Init(&b->sink_qflow, &sink->queue);
152  
153 // connect to queue flow
154 PacketPassConnector_ConnectOutput(&b->connector, PacketPassFairQueueFlow_GetInput(&b->sink_qflow));
155  
156 // set DataProto
157 b->sink = sink;
158 }
159  
160 void flow_buffer_detach (struct DataProtoFlow_buffer *b)
161 {
162 ASSERT(b->sink)
163 PacketPassFairQueueFlow_AssertFree(&b->sink_qflow);
164  
165 // disconnect from queue flow
166 PacketPassConnector_DisconnectOutput(&b->connector);
167  
168 // free queue flow
169 PacketPassFairQueueFlow_Free(&b->sink_qflow);
170  
171 // clear reference to this buffer in the sink
172 if (b->sink->detaching_buffer == b) {
173 b->sink->detaching_buffer = NULL;
174 }
175  
176 // set no DataProto
177 b->sink = NULL;
178 }
179  
180 void flow_buffer_schedule_detach (struct DataProtoFlow_buffer *b)
181 {
182 ASSERT(b->sink)
183 ASSERT(PacketPassFairQueueFlow_IsBusy(&b->sink_qflow))
184 ASSERT(!b->sink->detaching_buffer || b->sink->detaching_buffer == b)
185  
186 if (b->sink->detaching_buffer == b) {
187 return;
188 }
189  
190 // request cancel
191 PacketPassFairQueueFlow_RequestCancel(&b->sink_qflow);
192  
193 // set busy handler
194 PacketPassFairQueueFlow_SetBusyHandler(&b->sink_qflow, (PacketPassFairQueue_handler_busy)flow_buffer_qflow_handler_busy, b);
195  
196 // remember this buffer in the sink so it can handle us if it goes away
197 b->sink->detaching_buffer = b;
198 }
199  
200 void flow_buffer_finish_detach (struct DataProtoFlow_buffer *b)
201 {
202 ASSERT(b->sink)
203 ASSERT(b->sink->detaching_buffer == b)
204 PacketPassFairQueueFlow_AssertFree(&b->sink_qflow);
205  
206 // detach
207 flow_buffer_detach(b);
208  
209 if (!b->flow) {
210 // free
211 flow_buffer_free(b);
212 } else if (b->flow->sink_desired) {
213 // attach
214 flow_buffer_attach(b, b->flow->sink_desired);
215 }
216 }
217  
218 void flow_buffer_qflow_handler_busy (struct DataProtoFlow_buffer *b)
219 {
220 ASSERT(b->sink)
221 ASSERT(b->sink->detaching_buffer == b)
222 PacketPassFairQueueFlow_AssertFree(&b->sink_qflow);
223  
224 flow_buffer_finish_detach(b);
225 }
226  
227 int DataProtoSink_Init (DataProtoSink *o, BReactor *reactor, PacketPassInterface *output, btime_t keepalive_time, btime_t tolerance_time, DataProtoSink_handler handler, void *user)
228 {
229 ASSERT(PacketPassInterface_HasCancel(output))
230 ASSERT(PacketPassInterface_GetMTU(output) >= DATAPROTO_MAX_OVERHEAD)
231  
232 // init arguments
233 o->reactor = reactor;
234 o->handler = handler;
235 o->user = user;
236  
237 // set frame MTU
238 o->frame_mtu = PacketPassInterface_GetMTU(output) - DATAPROTO_MAX_OVERHEAD;
239  
240 // init notifier
241 PacketPassNotifier_Init(&o->notifier, output, BReactor_PendingGroup(o->reactor));
242 PacketPassNotifier_SetHandler(&o->notifier, (PacketPassNotifier_handler_notify)notifier_handler, o);
243  
244 // init monitor
245 PacketPassInactivityMonitor_Init(&o->monitor, PacketPassNotifier_GetInput(&o->notifier), o->reactor, keepalive_time, (PacketPassInactivityMonitor_handler)monitor_handler, o);
246 PacketPassInactivityMonitor_Force(&o->monitor);
247  
248 // init queue
249 if (!PacketPassFairQueue_Init(&o->queue, PacketPassInactivityMonitor_GetInput(&o->monitor), BReactor_PendingGroup(o->reactor), 1, 1)) {
250 BLog(BLOG_ERROR, "PacketPassFairQueue_Init failed");
251 goto fail1;
252 }
253  
254 // init keepalive queue flow
255 PacketPassFairQueueFlow_Init(&o->ka_qflow, &o->queue);
256  
257 // init keepalive source
258 DataProtoKeepaliveSource_Init(&o->ka_source, BReactor_PendingGroup(o->reactor));
259  
260 // init keepalive blocker
261 PacketRecvBlocker_Init(&o->ka_blocker, DataProtoKeepaliveSource_GetOutput(&o->ka_source), BReactor_PendingGroup(o->reactor));
262  
263 // init keepalive buffer
264 if (!SinglePacketBuffer_Init(&o->ka_buffer, PacketRecvBlocker_GetOutput(&o->ka_blocker), PacketPassFairQueueFlow_GetInput(&o->ka_qflow), BReactor_PendingGroup(o->reactor))) {
265 BLog(BLOG_ERROR, "SinglePacketBuffer_Init failed");
266 goto fail2;
267 }
268  
269 // init receive timer
270 BTimer_Init(&o->receive_timer, tolerance_time, (BTimer_handler)receive_timer_handler, o);
271  
272 // init handler job
273 BPending_Init(&o->up_job, BReactor_PendingGroup(o->reactor), (BPending_handler)up_job_handler, o);
274  
275 // set not up
276 o->up = 0;
277 o->up_report = 0;
278  
279 // set no detaching buffer
280 o->detaching_buffer = NULL;
281  
282 DebugCounter_Init(&o->d_ctr);
283 DebugObject_Init(&o->d_obj);
284 return 1;
285  
286 fail2:
287 PacketRecvBlocker_Free(&o->ka_blocker);
288 DataProtoKeepaliveSource_Free(&o->ka_source);
289 PacketPassFairQueueFlow_Free(&o->ka_qflow);
290 PacketPassFairQueue_Free(&o->queue);
291 fail1:
292 PacketPassInactivityMonitor_Free(&o->monitor);
293 PacketPassNotifier_Free(&o->notifier);
294 return 0;
295 }
296  
297 void DataProtoSink_Free (DataProtoSink *o)
298 {
299 DebugObject_Free(&o->d_obj);
300 DebugCounter_Free(&o->d_ctr);
301  
302 // allow freeing queue flows
303 PacketPassFairQueue_PrepareFree(&o->queue);
304  
305 // release detaching buffer
306 if (o->detaching_buffer) {
307 ASSERT(!o->detaching_buffer->flow || o->detaching_buffer->flow->sink_desired != o)
308 flow_buffer_finish_detach(o->detaching_buffer);
309 }
310  
311 // free handler job
312 BPending_Free(&o->up_job);
313  
314 // free receive timer
315 BReactor_RemoveTimer(o->reactor, &o->receive_timer);
316  
317 // free keepalive buffer
318 SinglePacketBuffer_Free(&o->ka_buffer);
319  
320 // free keepalive blocker
321 PacketRecvBlocker_Free(&o->ka_blocker);
322  
323 // free keepalive source
324 DataProtoKeepaliveSource_Free(&o->ka_source);
325  
326 // free keepalive queue flow
327 PacketPassFairQueueFlow_Free(&o->ka_qflow);
328  
329 // free queue
330 PacketPassFairQueue_Free(&o->queue);
331  
332 // free monitor
333 PacketPassInactivityMonitor_Free(&o->monitor);
334  
335 // free notifier
336 PacketPassNotifier_Free(&o->notifier);
337 }
338  
339 void DataProtoSink_Received (DataProtoSink *o, int peer_receiving)
340 {
341 ASSERT(peer_receiving == 0 || peer_receiving == 1)
342 DebugObject_Access(&o->d_obj);
343  
344 // reset receive timer
345 BReactor_SetTimer(o->reactor, &o->receive_timer);
346  
347 if (!peer_receiving) {
348 // peer reports not receiving, consider down
349 o->up = 0;
350 // send keep-alive to converge faster
351 PacketRecvBlocker_AllowBlockedPacket(&o->ka_blocker);
352 } else {
353 // consider up
354 o->up = 1;
355 }
356  
357 refresh_up_job(o);
358 }
359  
360 int DataProtoSource_Init (DataProtoSource *o, PacketRecvInterface *input, DataProtoSource_handler handler, void *user, BReactor *reactor)
361 {
362 ASSERT(PacketRecvInterface_GetMTU(input) <= INT_MAX - DATAPROTO_MAX_OVERHEAD)
363 ASSERT(handler)
364  
365 // init arguments
366 o->handler = handler;
367 o->user = user;
368 o->reactor = reactor;
369  
370 // remember frame MTU
371 o->frame_mtu = PacketRecvInterface_GetMTU(input);
372  
373 // init router
374 if (!PacketRouter_Init(&o->router, DATAPROTO_MAX_OVERHEAD + o->frame_mtu, DATAPROTO_MAX_OVERHEAD, input, (PacketRouter_handler)source_router_handler, o, BReactor_PendingGroup(reactor))) {
375 BLog(BLOG_ERROR, "PacketRouter_Init failed");
376 goto fail0;
377 }
378  
379 DebugCounter_Init(&o->d_ctr);
380 DebugObject_Init(&o->d_obj);
381 return 1;
382  
383 fail0:
384 return 0;
385 }
386  
387 void DataProtoSource_Free (DataProtoSource *o)
388 {
389 DebugObject_Free(&o->d_obj);
390 DebugCounter_Free(&o->d_ctr);
391  
392 // free router
393 PacketRouter_Free(&o->router);
394 }
395  
396 int DataProtoFlow_Init (DataProtoFlow *o, DataProtoSource *source, peerid_t source_id, peerid_t dest_id, int num_packets, int inactivity_time, void *user,
397 DataProtoFlow_handler_inactivity handler_inactivity)
398 {
399 DebugObject_Access(&source->d_obj);
400 ASSERT(num_packets > 0)
401 ASSERT(!(inactivity_time >= 0) || handler_inactivity)
402  
403 // init arguments
404 o->source = source;
405 o->source_id = source_id;
406 o->dest_id = dest_id;
407  
408 // set no desired sink
409 o->sink_desired = NULL;
410  
411 // allocate buffer structure
412 struct DataProtoFlow_buffer *b = (struct DataProtoFlow_buffer *)malloc(sizeof(*b));
413 if (!b) {
414 BLog(BLOG_ERROR, "malloc failed");
415 goto fail0;
416 }
417 o->b = b;
418  
419 // set parent
420 b->flow = o;
421  
422 // remember inactivity time
423 b->inactivity_time = inactivity_time;
424  
425 // init connector
426 PacketPassConnector_Init(&b->connector, DATAPROTO_MAX_OVERHEAD + source->frame_mtu, BReactor_PendingGroup(source->reactor));
427  
428 // init inactivity monitor
429 PacketPassInterface *buf_out = PacketPassConnector_GetInput(&b->connector);
430 if (b->inactivity_time >= 0) {
431 PacketPassInactivityMonitor_Init(&b->monitor, buf_out, source->reactor, b->inactivity_time, handler_inactivity, user);
432 buf_out = PacketPassInactivityMonitor_GetInput(&b->monitor);
433 }
434  
435 // init route buffer
436 if (!RouteBuffer_Init(&b->rbuf, DATAPROTO_MAX_OVERHEAD + source->frame_mtu, buf_out, num_packets)) {
437 BLog(BLOG_ERROR, "RouteBuffer_Init failed");
438 goto fail1;
439 }
440  
441 // set no sink
442 b->sink = NULL;
443  
444 DebugCounter_Increment(&source->d_ctr);
445 DebugObject_Init(&o->d_obj);
446 return 1;
447  
448 fail1:
449 if (b->inactivity_time >= 0) {
450 PacketPassInactivityMonitor_Free(&b->monitor);
451 }
452 PacketPassConnector_Free(&b->connector);
453 free(b);
454 fail0:
455 return 0;
456 }
457  
458 void DataProtoFlow_Free (DataProtoFlow *o)
459 {
460 DebugObject_Free(&o->d_obj);
461 DebugCounter_Decrement(&o->source->d_ctr);
462 ASSERT(!o->sink_desired)
463 struct DataProtoFlow_buffer *b = o->b;
464  
465 if (b->sink) {
466 if (PacketPassFairQueueFlow_IsBusy(&b->sink_qflow)) {
467 // schedule detach, free buffer after detach
468 flow_buffer_schedule_detach(b);
469 b->flow = NULL;
470  
471 // remove inactivity handler
472 if (b->inactivity_time >= 0) {
473 PacketPassInactivityMonitor_SetHandler(&b->monitor, NULL, NULL);
474 }
475 } else {
476 // detach and free buffer now
477 flow_buffer_detach(b);
478 flow_buffer_free(b);
479 }
480 } else {
481 // free buffer
482 flow_buffer_free(b);
483 }
484 }
485  
486 void DataProtoFlow_Route (DataProtoFlow *o, int more)
487 {
488 DebugObject_Access(&o->d_obj);
489 PacketRouter_AssertRoute(&o->source->router);
490 ASSERT(o->source->current_buf)
491 ASSERT(more == 0 || more == 1)
492 struct DataProtoFlow_buffer *b = o->b;
493  
494 // write header. Don't set flags, it will be set in notifier_handler.
495 struct dataproto_header header;
496 struct dataproto_peer_id id;
497 header.from_id = htol16(o->source_id);
498 header.num_peer_ids = htol16(1);
499 id.id = htol16(o->dest_id);
500 memcpy(o->source->current_buf, &header, sizeof(header));
501 memcpy(o->source->current_buf + sizeof(header), &id, sizeof(id));
502  
503 // route
504 uint8_t *next_buf;
505 if (!PacketRouter_Route(&o->source->router, DATAPROTO_MAX_OVERHEAD + o->source->current_recv_len, &b->rbuf,
506 &next_buf, DATAPROTO_MAX_OVERHEAD, (more ? o->source->current_recv_len : 0)
507 )) {
508 BLog(BLOG_NOTICE, "buffer full: %d->%d", (int)o->source_id, (int)o->dest_id);
509 return;
510 }
511  
512 // remember next buffer, or don't allow further routing if more==0
513 o->source->current_buf = (more ? next_buf : NULL);
514 }
515  
516 void DataProtoFlow_Attach (DataProtoFlow *o, DataProtoSink *sink)
517 {
518 DebugObject_Access(&o->d_obj);
519 DebugObject_Access(&sink->d_obj);
520 ASSERT(!o->sink_desired)
521 ASSERT(sink)
522 ASSERT(o->source->frame_mtu <= sink->frame_mtu)
523 struct DataProtoFlow_buffer *b = o->b;
524  
525 if (b->sink) {
526 if (PacketPassFairQueueFlow_IsBusy(&b->sink_qflow)) {
527 // schedule detach and reattach
528 flow_buffer_schedule_detach(b);
529 } else {
530 // detach and reattach now
531 flow_buffer_detach(b);
532 flow_buffer_attach(b, sink);
533 }
534 } else {
535 // attach
536 flow_buffer_attach(b, sink);
537 }
538  
539 // set desired sink
540 o->sink_desired = sink;
541  
542 DebugCounter_Increment(&sink->d_ctr);
543 }
544  
545 void DataProtoFlow_Detach (DataProtoFlow *o)
546 {
547 DebugObject_Access(&o->d_obj);
548 ASSERT(o->sink_desired)
549 struct DataProtoFlow_buffer *b = o->b;
550 ASSERT(b->sink)
551  
552 DataProtoSink *sink = o->sink_desired;
553  
554 if (PacketPassFairQueueFlow_IsBusy(&b->sink_qflow)) {
555 // schedule detach
556 flow_buffer_schedule_detach(b);
557 } else {
558 // detach now
559 flow_buffer_detach(b);
560 }
561  
562 // set no desired sink
563 o->sink_desired = NULL;
564  
565 DebugCounter_Decrement(&sink->d_ctr);
566 }