00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017 #ifdef HAVE_CONFIG_H
00018 # include <dtn-config.h>
00019 #endif
00020
00021 #include <sys/types.h>
00022 #include <netinet/in.h>
00023
00024 #include <oasys/debug/DebugUtils.h>
00025 #include <oasys/util/StringUtils.h>
00026
00027 #include "BlockInfo.h"
00028 #include "BlockProcessor.h"
00029 #include "Bundle.h"
00030 #include "BundleProtocol.h"
00031 #include "BundleTimestamp.h"
00032 #include "MetadataBlockProcessor.h"
00033 #include "PayloadBlockProcessor.h"
00034 #include "PreviousHopBlockProcessor.h"
00035 #include "PrimaryBlockProcessor.h"
00036 #include "SDNV.h"
00037 #include "SessionBlockProcessor.h"
00038 #include "SequenceIDBlockProcessor.h"
00039 #include "UnknownBlockProcessor.h"
00040
00041 #ifdef BSP_ENABLED
00042 # include "security/SPD.h"
00043 #endif
00044
00045 namespace dtn {
00046
00047 static const char* LOG = "/dtn/bundle/protocol";
00048
00049 BlockProcessor* BundleProtocol::processors_[256];
00050
00051
00052 void
00053 BundleProtocol::register_processor(BlockProcessor* bp)
00054 {
00055
00056 ASSERT(processors_[bp->block_type()] == 0);
00057 processors_[bp->block_type()] = bp;
00058 }
00059
00060
00061 BlockProcessor*
00062 BundleProtocol::find_processor(u_int8_t type)
00063 {
00064 BlockProcessor* ret = processors_[type];
00065 if (ret == 0) {
00066 ret = UnknownBlockProcessor::instance();
00067 }
00068 return ret;
00069 }
00070
00071
00072 void
00073 BundleProtocol::init_default_processors()
00074 {
00075
00076 BundleProtocol::register_processor(new PrimaryBlockProcessor());
00077 BundleProtocol::register_processor(new PayloadBlockProcessor());
00078 BundleProtocol::register_processor(new PreviousHopBlockProcessor());
00079 BundleProtocol::register_processor(new MetadataBlockProcessor());
00080 BundleProtocol::register_processor(new SessionBlockProcessor());
00081 BundleProtocol::register_processor(
00082 new SequenceIDBlockProcessor(BundleProtocol::SEQUENCE_ID_BLOCK));
00083 BundleProtocol::register_processor(
00084 new SequenceIDBlockProcessor(BundleProtocol::OBSOLETES_ID_BLOCK));
00085 }
00086
00087
00088 void
00089 BundleProtocol::reload_post_process(Bundle* bundle)
00090 {
00091 BlockInfoVec* recv_blocks = bundle->mutable_recv_blocks();
00092
00093 for (BlockInfoVec::iterator iter = recv_blocks->begin();
00094 iter != recv_blocks->end();
00095 ++iter)
00096 {
00097
00098
00099 iter->owner()->reload_post_process(bundle, recv_blocks, &*iter);
00100 }
00101 }
00102
00103
00104 BlockInfoVec*
00105 BundleProtocol::prepare_blocks(Bundle* bundle, const LinkRef& link)
00106 {
00107
00108
00109
00110 BlockInfoVec* xmit_blocks = bundle->xmit_blocks()->create_blocks(link);
00111 const BlockInfoVec* recv_blocks = &bundle->recv_blocks();
00112 BlockInfoVec* api_blocks = bundle->api_blocks();
00113
00114 if (recv_blocks->size() > 0) {
00115
00116 ASSERT(recv_blocks->front().type() == PRIMARY_BLOCK);
00117
00118 for (BlockInfoVec::const_iterator iter = recv_blocks->begin();
00119 iter != recv_blocks->end();
00120 ++iter)
00121 {
00122
00123
00124
00125
00126
00127
00128
00129 if (bundle->fragmented_incoming()
00130 && xmit_blocks->find_block(BundleProtocol::PAYLOAD_BLOCK)) {
00131 continue;
00132 }
00133
00134 iter->owner()->prepare(bundle, xmit_blocks, &*iter, link,
00135 BlockInfo::LIST_RECEIVED);
00136 }
00137 }
00138 else {
00139 log_debug_p(LOG, "adding primary and payload block");
00140 BlockProcessor* bp = find_processor(PRIMARY_BLOCK);
00141 bp->prepare(bundle, xmit_blocks, NULL, link, BlockInfo::LIST_NONE);
00142 bp = find_processor(PAYLOAD_BLOCK);
00143 bp->prepare(bundle, xmit_blocks, NULL, link, BlockInfo::LIST_NONE);
00144 }
00145
00146
00147 for (BlockInfoVec::iterator iter = api_blocks->begin();
00148 iter != api_blocks->end();
00149 ++iter)
00150 {
00151 log_debug_p(LOG, "adding api_block");
00152 iter->owner()->prepare(bundle, xmit_blocks,
00153 &*iter, link, BlockInfo::LIST_API);
00154 }
00155
00156
00157
00158
00159
00160
00161
00162
00163 for (int i = 0; i < 256; ++i) {
00164 BlockProcessor* bp = find_processor(i);
00165 if (bp == UnknownBlockProcessor::instance()) {
00166 continue;
00167 }
00168
00169 if (! xmit_blocks->has_block(i)) {
00170 bp->prepare(bundle, xmit_blocks, NULL, link, BlockInfo::LIST_NONE);
00171 }
00172 }
00173
00174
00175 BlockProcessor* metadata_processor = find_processor(METADATA_BLOCK);
00176 ASSERT(metadata_processor != NULL);
00177 ASSERT(metadata_processor->block_type() == METADATA_BLOCK);
00178 ((MetadataBlockProcessor *)metadata_processor)->
00179 prepare_generated_metadata(bundle, xmit_blocks, link);
00180
00181 #ifdef BSP_ENABLED
00182
00183 SPD::prepare_out_blocks(bundle, link, xmit_blocks);
00184 #endif
00185
00186 return xmit_blocks;
00187 }
00188
00189
00190 size_t
00191 BundleProtocol::generate_blocks(Bundle* bundle,
00192 BlockInfoVec* blocks,
00193 const LinkRef& link)
00194 {
00195
00196
00197 ASSERT(blocks->size() >= 2);
00198 ASSERT(blocks->front().type() == PRIMARY_BLOCK);
00199
00200
00201
00202 BlockInfoVec::iterator last_block = blocks->end() - 1;
00203 for (BlockInfoVec::iterator iter = blocks->begin();
00204 iter != blocks->end();
00205 ++iter)
00206 {
00207 bool last = (iter == last_block);
00208 iter->owner()->generate(bundle, blocks, &*iter, link, last);
00209
00210 log_debug_p(LOG, "generated block (owner 0x%x type 0x%x) "
00211 "data_offset %u data_length %u contents length %zu",
00212 iter->owner()->block_type(), iter->type(),
00213 iter->data_offset(), iter->data_length(),
00214 iter->contents().len());
00215
00216 if (last) {
00217 ASSERT((iter->flags() & BLOCK_FLAG_LAST_BLOCK) != 0);
00218 } else {
00219 ASSERT((iter->flags() & BLOCK_FLAG_LAST_BLOCK) == 0);
00220 }
00221 }
00222
00223
00224
00225 PrimaryBlockProcessor* pbp =
00226 (PrimaryBlockProcessor*)find_processor(PRIMARY_BLOCK);
00227 ASSERT(blocks->front().owner() == pbp);
00228 pbp->generate_primary(bundle, blocks, &blocks->front());
00229
00230
00231
00232
00233
00234
00235 size_t total_len = 0;
00236 for (BlockInfoVec::reverse_iterator iter = blocks->rbegin();
00237 iter != blocks->rend();
00238 ++iter)
00239 {
00240 iter->owner()->finalize(bundle, blocks, &*iter, link);
00241 total_len += iter->full_length();
00242 }
00243
00244 return total_len;
00245 }
00246
00247
00248 void
00249 BundleProtocol::delete_blocks(Bundle* bundle, const LinkRef& link)
00250 {
00251 ASSERT(bundle != NULL);
00252
00253 bundle->xmit_blocks()->delete_blocks(link);
00254
00255 BlockProcessor* metadata_processor = find_processor(METADATA_BLOCK);
00256 ASSERT(metadata_processor != NULL);
00257 ASSERT(metadata_processor->block_type() == METADATA_BLOCK);
00258 ((MetadataBlockProcessor *)metadata_processor)->
00259 delete_generated_metadata(bundle, link);
00260 }
00261
00262
00263 size_t
00264 BundleProtocol::total_length(const BlockInfoVec* blocks)
00265 {
00266 size_t ret = 0;
00267 for (BlockInfoVec::const_iterator iter = blocks->begin();
00268 iter != blocks->end();
00269 ++iter)
00270 {
00271 ret += iter->full_length();
00272 }
00273
00274 return ret;
00275 }
00276
00277
00278 size_t
00279 BundleProtocol::payload_offset(const BlockInfoVec* blocks)
00280 {
00281 size_t ret = 0;
00282 for (BlockInfoVec::const_iterator iter = blocks->begin();
00283 iter != blocks->end();
00284 ++iter)
00285 {
00286 if (iter->type() == PAYLOAD_BLOCK) {
00287 ret += iter->data_offset();
00288 return ret;
00289 }
00290
00291 ret += iter->full_length();
00292 }
00293
00294 return ret;
00295 }
00296
00297
00298 size_t
00299 BundleProtocol::produce(const Bundle* bundle, const BlockInfoVec* blocks,
00300 u_char* data, size_t offset, size_t len, bool* last)
00301 {
00302 size_t origlen = len;
00303 *last = false;
00304
00305 if (len == 0)
00306 return 0;
00307
00308
00309 ASSERT(!blocks->empty());
00310 BlockInfoVec::const_iterator iter = blocks->begin();
00311 while (offset >= iter->full_length()) {
00312 log_debug_p(LOG, "BundleProtocol::produce skipping block type 0x%x "
00313 "since offset %zu >= block length %u",
00314 iter->type(), offset, iter->full_length());
00315
00316 offset -= iter->full_length();
00317 iter++;
00318 ASSERT(iter != blocks->end());
00319 }
00320
00321
00322 ASSERT(iter != blocks->end());
00323
00324
00325 while (1) {
00326 size_t remainder = iter->full_length() - offset;
00327 size_t tocopy = std::min(len, remainder);
00328 log_debug_p(LOG, "BundleProtocol::produce copying %zu/%zu bytes from "
00329 "block type 0x%x at offset %zu",
00330 tocopy, remainder, iter->type(), offset);
00331 iter->owner()->produce(bundle, &*iter, data, offset, tocopy);
00332
00333 len -= tocopy;
00334 data += tocopy;
00335 offset = 0;
00336
00337
00338
00339
00340
00341 if (len == 0) {
00342 if ((tocopy == remainder) &&
00343 (iter->flags() & BLOCK_FLAG_LAST_BLOCK))
00344 {
00345 ASSERT(iter + 1 == blocks->end());
00346 *last = true;
00347 }
00348
00349 break;
00350 }
00351
00352
00353
00354 ASSERT(tocopy == remainder);
00355 if (iter->flags() & BLOCK_FLAG_LAST_BLOCK) {
00356 ASSERT(iter + 1 == blocks->end());
00357 *last = true;
00358 break;
00359 }
00360
00361 ++iter;
00362 ASSERT(iter != blocks->end());
00363 }
00364
00365 log_debug_p(LOG, "BundleProtocol::produce complete: "
00366 "produced %zu bytes, bundle %s",
00367 origlen - len, *last ? "complete" : "not complete");
00368
00369 return origlen - len;
00370 }
00371
00372
00373 int
00374 BundleProtocol::consume(Bundle* bundle,
00375 u_char* data,
00376 size_t len,
00377 bool* last)
00378 {
00379 size_t origlen = len;
00380 *last = false;
00381
00382 BlockInfoVec* recv_blocks = bundle->mutable_recv_blocks();
00383
00384
00385
00386
00387 if (recv_blocks->empty()) {
00388 log_debug_p(LOG, "consume: got first block... "
00389 "creating primary block info");
00390 recv_blocks->append_block(find_processor(PRIMARY_BLOCK));
00391 }
00392
00393
00394 while (len != 0) {
00395 log_debug_p(LOG, "consume: %zu bytes left to process", len);
00396 BlockInfo* info = &recv_blocks->back();
00397
00398
00399
00400
00401
00402 if (info->complete()) {
00403 info = recv_blocks->append_block(find_processor(*data));
00404 log_debug_p(LOG, "consume: previous block complete, "
00405 "created new BlockInfo type 0x%x",
00406 info->owner()->block_type());
00407 }
00408
00409
00410
00411 log_debug_p(LOG, "consume: block processor 0x%x type 0x%x incomplete, "
00412 "calling consume (%zu bytes already buffered)",
00413 info->owner()->block_type(), info->type(),
00414 info->contents().len());
00415
00416 int cc = info->owner()->consume(bundle, info, data, len);
00417 if (cc < 0) {
00418 log_err_p(LOG, "consume: protocol error handling block 0x%x",
00419 info->type());
00420 return -1;
00421 }
00422
00423
00424
00425
00426 len -= cc;
00427 data += cc;
00428
00429 log_debug_p(LOG, "consume: consumed %u bytes of block type 0x%x (%s)",
00430 cc, info->type(),
00431 info->complete() ? "complete" : "not complete");
00432
00433 if (info->complete()) {
00434
00435 if (info->flags() & BLOCK_FLAG_LAST_BLOCK) {
00436 *last = true;
00437 break;
00438 }
00439
00440 } else {
00441 ASSERT(len == 0);
00442 }
00443 }
00444
00445 log_debug_p(LOG, "consume completed, %zu/%zu bytes consumed %s",
00446 origlen - len, origlen, *last ? "(completed bundle)" : "");
00447 return origlen - len;
00448 }
00449
00450
00451 bool
00452 BundleProtocol::validate(Bundle* bundle,
00453 status_report_reason_t* reception_reason,
00454 status_report_reason_t* deletion_reason)
00455 {
00456 int primary_blocks = 0, payload_blocks = 0;
00457 BlockInfoVec* recv_blocks = bundle->mutable_recv_blocks();
00458
00459
00460 if (recv_blocks->size() < 2) {
00461 log_err_p(LOG, "bundle fails to contain at least two blocks");
00462 *deletion_reason = BundleProtocol::REASON_BLOCK_UNINTELLIGIBLE;
00463 return false;
00464 }
00465
00466
00467 if (recv_blocks->front().type() != BundleProtocol::PRIMARY_BLOCK) {
00468 log_err_p(LOG, "bundle fails to contain a primary block");
00469 *deletion_reason = BundleProtocol::REASON_BLOCK_UNINTELLIGIBLE;
00470 return false;
00471 }
00472
00473
00474 BlockInfoVec::iterator last_block = recv_blocks->end() - 1;
00475 for (BlockInfoVec::iterator iter = recv_blocks->begin();
00476 iter != recv_blocks->end();
00477 ++iter)
00478 {
00479
00480 if (iter->data_offset() == 0) {
00481
00482
00483 if (iter != last_block) {
00484 log_err_p(LOG, "bundle block too short for the preamble");
00485 *deletion_reason = BundleProtocol::REASON_BLOCK_UNINTELLIGIBLE;
00486 return false;
00487 }
00488
00489 log_debug_p(LOG, "forgetting preamble-starved last block");
00490 recv_blocks->erase(iter);
00491 if (recv_blocks->size() < 2) {
00492 log_err_p(LOG, "bundle fails to contain at least two blocks");
00493 *deletion_reason = BundleProtocol::REASON_BLOCK_UNINTELLIGIBLE;
00494 return false;
00495 }
00496
00497
00498 iter = last_block = recv_blocks->end() - 1;
00499 }
00500 else {
00501 if (iter->type() == BundleProtocol::PRIMARY_BLOCK) {
00502 primary_blocks++;
00503 }
00504
00505 if (iter->type() == BundleProtocol::PAYLOAD_BLOCK) {
00506 payload_blocks++;
00507 }
00508 }
00509
00510 if (!iter->owner()->validate(bundle, recv_blocks, &*iter,
00511 reception_reason, deletion_reason)) {
00512 return false;
00513 }
00514
00515
00516
00517 if (iter == last_block) {
00518 if (!iter->last_block()) {
00519
00520 if (!bundle->fragmented_incoming()) {
00521 log_err_p(LOG, "bundle's last block not flagged");
00522 *deletion_reason
00523 = BundleProtocol::REASON_BLOCK_UNINTELLIGIBLE;
00524 return false;
00525 }
00526 else {
00527 log_debug_p(LOG, "bundle's last block not flagged, but "
00528 "it is a reactive fragment");
00529 }
00530 }
00531 } else {
00532 if (iter->last_block()) {
00533 log_err_p(LOG, "bundle block incorrectly flagged as last");
00534 *deletion_reason = BundleProtocol::REASON_BLOCK_UNINTELLIGIBLE;
00535 return false;
00536 }
00537 }
00538 }
00539
00540
00541 if (primary_blocks != 1) {
00542 log_err_p(LOG, "bundle contains %d primary blocks", primary_blocks);
00543 *deletion_reason = BundleProtocol::REASON_BLOCK_UNINTELLIGIBLE;
00544 return false;
00545 }
00546
00547
00548 if (payload_blocks > 1) {
00549 log_err_p(LOG, "bundle contains %d payload blocks", payload_blocks);
00550 *deletion_reason = BundleProtocol::REASON_BLOCK_UNINTELLIGIBLE;
00551 return false;
00552 }
00553
00554 #ifdef BSP_ENABLED
00555
00556
00557 if (! SPD::verify_in_policy(bundle)) {
00558 log_err_p(LOG, "bundle failed security policy verification");
00559 *deletion_reason = BundleProtocol::REASON_SECURITY_FAILED;
00560 return false;
00561 }
00562 #endif
00563
00564 return true;
00565 }
00566
00567
00568 int
00569 BundleProtocol::set_timestamp(u_char* ts, size_t len, const BundleTimestamp& tv)
00570 {
00571 int sec_size = SDNV::encode(tv.seconds_, ts, len);
00572 if (sec_size < 0)
00573 return -1;
00574
00575 int seqno_size = SDNV::encode(tv.seqno_, ts + sec_size, len - sec_size);
00576 if (seqno_size < 0)
00577 return -1;
00578
00579 return sec_size + seqno_size;
00580 }
00581
00582
00583 int
00584 BundleProtocol::get_timestamp(BundleTimestamp* tv, const u_char* ts, size_t len)
00585 {
00586 u_int64_t tmp;
00587
00588 int sec_size = SDNV::decode(ts, len, &tmp);
00589 if (sec_size < 0)
00590 return -1;
00591 ASSERT(tmp < 0xffffffff);
00592 tv->seconds_ = (u_int32_t)tmp;
00593
00594 int seqno_size = SDNV::decode(ts + sec_size, len - sec_size, &tmp);
00595 if (seqno_size < 0)
00596 return -1;
00597 ASSERT(tmp < 0xffffffff);
00598 tv->seqno_ = (u_int32_t)tmp;
00599
00600 return sec_size + seqno_size;
00601 }
00602
00603
00604 size_t
00605 BundleProtocol::ts_encoding_len(const BundleTimestamp& tv)
00606 {
00607 return SDNV::encoding_len(tv.seconds_) + SDNV::encoding_len(tv.seqno_);
00608 }
00609
00610
00611 bool
00612 BundleProtocol::get_admin_type(const Bundle* bundle, admin_record_type_t* type)
00613 {
00614 if (! bundle->is_admin()) {
00615 return false;
00616 }
00617
00618 u_char buf[16];
00619 const u_char* bp = bundle->payload().read_data(0, sizeof(buf), buf);
00620
00621 switch (bp[0] >> 4)
00622 {
00623 #define CASE(_what) case _what: *type = _what; return true;
00624
00625 CASE(ADMIN_STATUS_REPORT);
00626 CASE(ADMIN_CUSTODY_SIGNAL);
00627 CASE(ADMIN_ANNOUNCE);
00628
00629 #undef CASE
00630 default:
00631 return false;
00632 }
00633
00634 NOTREACHED;
00635 }
00636
00637 }