Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 21 additions & 34 deletions Framework/AnalysisSupport/src/AODReaderHelpers.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ struct Buildable {
std::vector<o2::soa::IndexRecord> records;
std::shared_ptr<arrow::Schema> outputSchema;

Buildable(InputSpec const& spec)
explicit Buildable(InputSpec const& spec)
: binding{spec.binding}
{
auto&& [origin_, description_, version_] = DataSpecUtils::asConcreteDataMatcher(spec);
Expand All @@ -58,9 +58,8 @@ struct Buildable {
}
outputSchema = std::make_shared<arrow::Schema>([](std::vector<o2::soa::IndexRecord> const& recs) {
std::vector<std::shared_ptr<arrow::Field>> fields;
for (auto& r : recs) {
fields.push_back(r.field());
}
fields.reserve(recs.size());
std::ranges::transform(recs, std::back_inserter(fields), [](auto& r) { return r.field(); });
return fields;
}(records))
->WithMetadata(std::make_shared<arrow::KeyValueMetadata>(std::vector{std::string{"label"}}, std::vector{std::string{binding}}));
Expand All @@ -87,19 +86,12 @@ AlgorithmSpec AODReaderHelpers::indexBuilderCallback(ConfigContext const& /*ctx*
{
return AlgorithmSpec::InitCallback{[](InitContext& ic) {
auto const& requested = ic.services().get<DanglingEdgesContext>().requestedIDXs;
std::vector<Buildable> buildables;
for (auto const& i : requested) {
buildables.emplace_back(i);
}
std::vector<Builder> builders;
for (auto& b : buildables) {
builders.push_back(b.createBuilder());
}
builders.reserve(requested.size());
std::ranges::transform(requested, std::back_inserter(builders), [](auto const& i) { return Buildable{i}.createBuilder(); });
return [builders](ProcessingContext& pc) mutable {
auto outputs = pc.outputs();
for (auto& builder : builders) {
outputs.adopt(Output{builder.origin, builder.description, builder.version}, builder.materialize(pc));
}
std::ranges::for_each(builders, [&pc, &outputs](auto& builder) { outputs.adopt(Output{builder.origin, builder.description, builder.version}, builder.materialize(pc)); });
};
}};
}
Expand All @@ -119,7 +111,7 @@ struct Spawnable {
header::DataDescription description;
header::DataHeader::SubSpecificationType version;

Spawnable(InputSpec const& spec)
explicit Spawnable(InputSpec const& spec)
: binding{spec.binding}
{
auto&& [origin_, description_, version_] = DataSpecUtils::asConcreteDataMatcher(spec);
Expand All @@ -144,16 +136,19 @@ struct Spawnable {
iws.str(json);
schemas.emplace_back(ArrowJSONHelpers::read(iws));
}
for (auto const& i : spec.metadata | views::filter_string_params_starts_with("input:") | std::ranges::views::transform([](auto const& param) {
return DataSpecUtils::fromMetadataString(param.defaultValue.template get<std::string>());
})) {
matchers.emplace_back(std::get<ConcreteDataMatcher>(i.matcher));
}
std::ranges::transform(spec.metadata |
views::filter_string_params_starts_with("input:") |
std::ranges::views::transform(
[](auto const& param) {
return DataSpecUtils::fromMetadataString(param.defaultValue.template get<std::string>());
}),
std::back_inserter(matchers), [](auto const& i) { return std::get<ConcreteDataMatcher>(i.matcher); });

std::vector<std::shared_ptr<arrow::Field>> fields;
for (auto& s : schemas) {
std::copy(s->fields().begin(), s->fields().end(), std::back_inserter(fields));
}
std::ranges::for_each(schemas,
[&fields](auto const& s) {
std::ranges::copy(s->fields(), std::back_inserter(fields));
});

inputSchema = std::make_shared<arrow::Schema>(fields);
expressions = expressions::materializeProjectors(projectors, inputSchema, outputSchema->fields());
Expand Down Expand Up @@ -194,20 +189,12 @@ AlgorithmSpec AODReaderHelpers::aodSpawnerCallback(ConfigContext const& /*ctx*/)
{
return AlgorithmSpec::InitCallback{[](InitContext& ic) {
auto const& requested = ic.services().get<DanglingEdgesContext>().spawnerInputs;
std::vector<Spawnable> spawnables;
for (auto const& i : requested) {
spawnables.emplace_back(i);
}
std::vector<Spawner> spawners;
for (auto& s : spawnables) {
spawners.push_back(s.createMaker());
}

spawners.reserve(requested.size());
std::ranges::transform(requested, std::back_inserter(spawners), [](auto const& i) { return Spawnable{i}.createMaker(); });
return [spawners](ProcessingContext& pc) mutable {
auto outputs = pc.outputs();
for (auto& spawner : spawners) {
outputs.adopt(Output{spawner.origin, spawner.description, spawner.version}, spawner.materialize(pc));
}
std::ranges::for_each(spawners, [&pc, &outputs](auto& spawner) { outputs.adopt(Output{spawner.origin, spawner.description, spawner.version}, spawner.materialize(pc)); });
};
}};
}
Expand Down
1 change: 1 addition & 0 deletions Framework/Core/include/Framework/ASoA.h
Original file line number Diff line number Diff line change
Expand Up @@ -1283,6 +1283,7 @@ struct TableIterator : IP, C... {
};

struct ArrowHelpers {
static std::shared_ptr<arrow::Table> joinTables(std::vector<std::shared_ptr<arrow::Table>>&& tables);
static std::shared_ptr<arrow::Table> joinTables(std::vector<std::shared_ptr<arrow::Table>>&& tables, std::span<const char* const> labels);
static std::shared_ptr<arrow::Table> joinTables(std::vector<std::shared_ptr<arrow::Table>>&& tables, std::span<const std::string> labels);
static std::shared_ptr<arrow::Table> concatTables(std::vector<std::shared_ptr<arrow::Table>>&& tables);
Expand Down
2 changes: 1 addition & 1 deletion Framework/Core/include/Framework/Expressions.h
Original file line number Diff line number Diff line change
Expand Up @@ -712,7 +712,7 @@ std::shared_ptr<gandiva::Projector> createProjectorHelper(size_t nColumns, expre
std::shared_ptr<arrow::Schema> schema,
std::vector<std::shared_ptr<arrow::Field>> const& fields);

std::vector<std::shared_ptr<gandiva::Expression>> materializeProjectors(std::vector<expressions::Projector> const& projectors, std::shared_ptr<arrow::Schema> const& inputSchema, std::vector<std::shared_ptr<arrow::Field>> outputFields);
std::vector<std::shared_ptr<gandiva::Expression>> materializeProjectors(std::vector<expressions::Projector> const& projectors, std::shared_ptr<arrow::Schema> const& inputSchema, std::vector<std::shared_ptr<arrow::Field>> const& outputFields);

template <typename... C>
std::shared_ptr<gandiva::Projector> createProjectors(framework::pack<C...>, std::vector<std::shared_ptr<arrow::Field>> const& fields, gandiva::SchemaPtr schema)
Expand Down
96 changes: 47 additions & 49 deletions Framework/Core/src/ASoA.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -62,71 +62,71 @@ SelectionVector sliceSelection(std::span<int64_t const> const& mSelectedRows, in
auto start_iterator = std::lower_bound(mSelectedRows.begin(), mSelectedRows.end(), start);
auto stop_iterator = std::lower_bound(start_iterator, mSelectedRows.end(), end);
SelectionVector slicedSelection{start_iterator, stop_iterator};
std::transform(slicedSelection.begin(), slicedSelection.end(), slicedSelection.begin(),
[&start](int64_t idx) {
return idx - static_cast<int64_t>(start);
});
std::ranges::transform(slicedSelection.begin(), slicedSelection.end(), slicedSelection.begin(),
[&start](int64_t idx) {
return idx - static_cast<int64_t>(start);
});
return slicedSelection;
}

std::shared_ptr<arrow::Table> ArrowHelpers::joinTables(std::vector<std::shared_ptr<arrow::Table>>&& tables, std::span<const char* const> labels)
std::shared_ptr<arrow::Table> ArrowHelpers::joinTables(std::vector<std::shared_ptr<arrow::Table>>&& tables)
{
if (tables.size() == 1) {
return tables[0];
}
for (auto i = 0U; i < tables.size() - 1; ++i) {
if (tables[i]->num_rows() != tables[i + 1]->num_rows()) {
throw o2::framework::runtime_error_f("Tables %s and %s have different sizes (%d vs %d) and cannot be joined!",
labels[i], labels[i + 1], tables[i]->num_rows(), tables[i + 1]->num_rows());
}
}
std::vector<std::shared_ptr<arrow::Field>> fields;
std::vector<std::shared_ptr<arrow::ChunkedArray>> columns;

for (auto& t : tables) {
auto tf = t->fields();
std::copy(tf.begin(), tf.end(), std::back_inserter(fields));
}

auto schema = std::make_shared<arrow::Schema>(fields);

if (tables[0]->num_rows() != 0) {
for (auto& t : tables) {
auto tc = t->columns();
std::copy(tc.begin(), tc.end(), std::back_inserter(columns));
bool notEmpty = (tables[0]->num_rows() != 0);
std::ranges::for_each(tables, [&fields, &columns, notEmpty](auto const& t) {
std::ranges::copy(t->fields(), std::back_inserter(fields));
if (notEmpty) {
std::ranges::copy(t->columns(), std::back_inserter(columns));
}
}
});
auto schema = std::make_shared<arrow::Schema>(fields);
return arrow::Table::Make(schema, columns);
}

std::shared_ptr<arrow::Table> ArrowHelpers::joinTables(std::vector<std::shared_ptr<arrow::Table>>&& tables, std::span<const std::string> labels)
namespace
{
template <typename T>
requires(std::same_as<T, std::string>)
auto makeString(T const& str)
{
return str.c_str();
}
template <typename T>
requires(std::same_as<T, const char*>)
auto makeString(T const& str)
{
return str;
}

template <typename T>
void canNotJoin(std::vector<std::shared_ptr<arrow::Table>> const& tables, std::span<T> labels)
{
if (tables.size() == 1) {
return tables[0];
}
for (auto i = 0U; i < tables.size() - 1; ++i) {
if (tables[i]->num_rows() != tables[i + 1]->num_rows()) {
throw o2::framework::runtime_error_f("Tables %s and %s have different sizes (%d vs %d) and cannot be joined!",
labels[i].c_str(), labels[i + 1].c_str(), tables[i]->num_rows(), tables[i + 1]->num_rows());
makeString(labels[i]), makeString(labels[i + 1]), tables[i]->num_rows(), tables[i + 1]->num_rows());
}
}
std::vector<std::shared_ptr<arrow::Field>> fields;
std::vector<std::shared_ptr<arrow::ChunkedArray>> columns;
}
} // namespace

for (auto& t : tables) {
auto tf = t->fields();
std::copy(tf.begin(), tf.end(), std::back_inserter(fields));
std::shared_ptr<arrow::Table> ArrowHelpers::joinTables(std::vector<std::shared_ptr<arrow::Table>>&& tables, std::span<const char* const> labels)
{
if (tables.size() == 1) {
return tables[0];
}
canNotJoin(tables, labels);
return joinTables(std::forward<std::vector<std::shared_ptr<arrow::Table>>>(tables));
}

auto schema = std::make_shared<arrow::Schema>(fields);

if (tables[0]->num_rows() != 0) {
for (auto& t : tables) {
auto tc = t->columns();
std::copy(tc.begin(), tc.end(), std::back_inserter(columns));
}
std::shared_ptr<arrow::Table> ArrowHelpers::joinTables(std::vector<std::shared_ptr<arrow::Table>>&& tables, std::span<const std::string> labels)
{
if (tables.size() == 1) {
return tables[0];
}
return arrow::Table::Make(schema, columns);
canNotJoin(tables, labels);
return joinTables(std::forward<std::vector<std::shared_ptr<arrow::Table>>>(tables));
}

std::shared_ptr<arrow::Table> ArrowHelpers::concatTables(std::vector<std::shared_ptr<arrow::Table>>&& tables)
Expand All @@ -135,7 +135,6 @@ std::shared_ptr<arrow::Table> ArrowHelpers::concatTables(std::vector<std::shared
return tables[0];
}
std::vector<std::shared_ptr<arrow::ChunkedArray>> columns;
assert(tables.size() > 1);
std::vector<std::shared_ptr<arrow::Field>> resultFields = tables[0]->schema()->fields();
auto compareFields = [](std::shared_ptr<arrow::Field> const& f1, std::shared_ptr<arrow::Field> const& f2) {
// Let's do this with stable sorting.
Expand Down Expand Up @@ -165,13 +164,12 @@ std::shared_ptr<arrow::Table> ArrowHelpers::concatTables(std::vector<std::shared
columns.push_back(std::make_shared<arrow::ChunkedArray>(chunks));
}

auto result = arrow::Table::Make(std::make_shared<arrow::Schema>(resultFields), columns);
return result;
return arrow::Table::Make(std::make_shared<arrow::Schema>(resultFields), columns);
}

arrow::ChunkedArray* getIndexFromLabel(arrow::Table* table, std::string_view label)
{
auto field = std::find_if(table->schema()->fields().begin(), table->schema()->fields().end(), [&](std::shared_ptr<arrow::Field> const& f) {
auto field = std::ranges::find_if(table->schema()->fields(), [&](std::shared_ptr<arrow::Field> const& f) {
auto caseInsensitiveCompare = [](const std::string_view& str1, const std::string& str2) {
return std::ranges::equal(
str1, str2,
Expand Down
37 changes: 11 additions & 26 deletions Framework/Core/src/AnalysisHelpers.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -46,44 +46,32 @@ void IndexBuilder::resetBuilders(std::vector<framework::IndexColumnBuilder>& bui
std::shared_ptr<arrow::Table> IndexBuilder::materialize(std::vector<framework::IndexColumnBuilder>& builders, std::vector<std::shared_ptr<arrow::Table>>&& tables, std::vector<soa::IndexRecord> const& records, std::shared_ptr<arrow::Schema> const& schema, bool exclusive)
{
auto size = tables[0]->num_rows();
if (builders.empty()) {
if (O2_BUILTIN_UNLIKELY(builders.empty())) {
builders = makeBuilders(std::move(tables), records);
} else {
resetBuilders(builders, std::move(tables));
}

std::vector<bool> finds;
finds.resize(builders.size());
for (int64_t counter = 0; counter < size; ++counter) {
int64_t idx = -1;
if (std::get<framework::SelfBuilder>(builders[0].builder).keyIndex == nullptr) {
idx = counter;
} else {
idx = std::get<framework::SelfBuilder>(builders[0].builder).keyIndex->valueAt(counter);
}
for (auto i = 0U; i < builders.size(); ++i) {
finds[i] = builders[i].find(idx);
}
if (exclusive) {
if (std::none_of(finds.begin(), finds.end(), [](bool const x) { return x == false; })) {
builders[0].fill(counter);
for (auto i = 1U; i < builders.size(); ++i) {
builders[i].fill(idx);
}
}
} else {

bool found = true;
std::ranges::for_each(builders, [&idx, &found](auto& builder) { found &= builder.find(idx); });

if (!exclusive || found) {
builders[0].fill(counter);
for (auto i = 1U; i < builders.size(); ++i) {
builders[i].fill(idx);
}
std::ranges::for_each(builders.begin() + 1, builders.end(), [&idx](auto& builder) { builder.fill(idx); });
}
}

std::vector<std::shared_ptr<arrow::ChunkedArray>> arrays;
arrays.reserve(builders.size());
for (auto& builder : builders) {
arrays.push_back(builder.result());
}
std::ranges::transform(builders, std::back_inserter(arrays), [](auto& builder) { return builder.result(); });

return arrow::Table::Make(schema, arrays);
}
Expand Down Expand Up @@ -142,9 +130,7 @@ std::shared_ptr<arrow::Table> spawnerHelper(std::shared_ptr<arrow::Table> const&
}

arrays.reserve(nColumns);
for (auto i = 0U; i < nColumns; ++i) {
arrays.push_back(std::make_shared<arrow::ChunkedArray>(chunks[i]));
}
std::ranges::transform(chunks, std::back_inserter(arrays), [](auto&& chunk) { return std::make_shared<arrow::ChunkedArray>(chunk); });

return arrow::Table::Make(newSchema, arrays);
}
Expand Down Expand Up @@ -188,9 +174,8 @@ std::string serializeIndexRecords(std::vector<o2::soa::IndexRecord>& irs)
std::vector<std::shared_ptr<arrow::Table>> extractSources(ProcessingContext& pc, std::vector<ConcreteDataMatcher> const& matchers)
{
std::vector<std::shared_ptr<arrow::Table>> tables;
for (auto const& matcher : matchers) {
tables.emplace_back(pc.inputs().get<TableConsumer>(matcher)->asArrowTable());
}
tables.reserve(matchers.size());
std::ranges::transform(matchers, std::back_inserter(tables), [&pc](auto const& matcher) { return pc.inputs().get<TableConsumer>(matcher)->asArrowTable(); });
return tables;
}

Expand Down
Loading