diff --git a/Framework/Core/include/Framework/DataProcessingHelpers.h b/Framework/Core/include/Framework/DataProcessingHelpers.h index a9bd95b69f4c7..87aeeb8922da3 100644 --- a/Framework/Core/include/Framework/DataProcessingHelpers.h +++ b/Framework/Core/include/Framework/DataProcessingHelpers.h @@ -59,6 +59,8 @@ struct DataProcessingHelpers { /// Helper to route messages for forwarding static void routeForwardedMessages(FairMQDeviceProxy& proxy, std::span& currentSetOfInputs, std::vector& forwardedParts, bool copy, bool consume); + + static void cleanForwardedMessages(std::span& currentSetOfInputs, bool consume); }; } // namespace o2::framework #endif // O2_FRAMEWORK_DATAPROCESSINGHELPERS_H_ diff --git a/Framework/Core/src/DataProcessingDevice.cxx b/Framework/Core/src/DataProcessingDevice.cxx index f65477c573772..38c57c66c8a01 100644 --- a/Framework/Core/src/DataProcessingDevice.cxx +++ b/Framework/Core/src/DataProcessingDevice.cxx @@ -626,9 +626,12 @@ static auto cleanEarlyForward = [](ServiceRegistryRef registry, TimesliceSlot sl slot.index, oldestTimeslice.timeslice.value, copy ? "with copy" : "", copy && consume ? " and " : "", consume ? "with consume" : ""); // Always copy them, because we do not want to actually send them. // We merely need the side effect of the consume, if applicable. - auto forwardedParts = DataProcessingHelpers::routeForwardedMessageSet(proxy, currentSetOfInputs, true, consume); + for (size_t ii = 0, ie = currentSetOfInputs.size(); ii < ie; ++ii) { + auto span = std::span(currentSetOfInputs[ii].messages); + DataProcessingHelpers::cleanForwardedMessages(span, consume); + } - O2_SIGNPOST_END(forwarding, sid, "forwardInputs", "Forwarding done"); + O2_SIGNPOST_END(forwarding, sid, "forwardInputs", "Cleaning done"); }; extern volatile int region_read_global_dummy_variable; diff --git a/Framework/Core/src/DataProcessingHelpers.cxx b/Framework/Core/src/DataProcessingHelpers.cxx index 87e7c9bf8962f..334a0fc6045f6 100644 --- a/Framework/Core/src/DataProcessingHelpers.cxx +++ b/Framework/Core/src/DataProcessingHelpers.cxx @@ -338,6 +338,60 @@ void DataProcessingHelpers::routeForwardedMessages(FairMQDeviceProxy& proxy, std } } +void DataProcessingHelpers::cleanForwardedMessages(std::span& messages, bool consume) +{ + size_t pi = 0; + while (pi < messages.size()) { + auto& header = messages[pi]; + + // If is now possible that the record is not complete when + // we forward it, because of a custom completion policy. + // this means that we need to skip the empty entries in the + // record for being forwarded. + if (header->GetData() == nullptr || + o2::header::get(header->GetData()) || + o2::header::get(header->GetData())) { + pi += 2; + continue; + } + + auto dph = o2::header::get(header->GetData()); + auto dh = o2::header::get(header->GetData()); + + if (dph == nullptr || dh == nullptr) { + // Complain only if this is not an out-of-band message + LOGP(error, "Data is missing {}{}{}", + dph ? "DataProcessingHeader" : "", dph || dh ? "and" : "", dh ? "DataHeader" : ""); + pi += 2; + continue; + } + + // At least one payload. + auto& payload = messages[pi + 1]; + // Calculate the number of messages which should be handled together + // all in one go. + size_t numberOfMessages = 0; + if (dh->splitPayloadParts > 0 && dh->splitPayloadParts == dh->splitPayloadIndex) { + // Sequence of (header, payload[0], ... , payload[splitPayloadParts - 1]) pairs belonging together. + numberOfMessages = dh->splitPayloadParts + 1; // one is for the header + } else { + // Sequence of splitPayloadParts (header, payload) pairs belonging together. + // In case splitPayloadParts = 0, we consider this as a single message pair + numberOfMessages = (dh->splitPayloadParts > 0 ? dh->splitPayloadParts : 1) * 2; + } + + if (payload.get() == nullptr && consume == true) { + // If the payload is not there, it means we already + // processed it with ConsumeExisiting. Therefore we + // need to do something only if this is the last consume. + header.reset(nullptr); + } + + // Nothing to forward go to the next messageset + pi += numberOfMessages; + } +} + auto DataProcessingHelpers::routeForwardedMessageSet(FairMQDeviceProxy& proxy, std::vector& currentSetOfInputs, const bool copyByDefault, bool consume) -> std::vector