#include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include bool debug_ipc_is_root = false; FILE *debug_ipc_file = nullptr; int rank, cluster_size; std::map start_indices_map; // maps indices to MPI ranks std::vector start_indices_list; // maps ranks to start indices std::vector recv_counts; std::vector buffer; std::vector float_buffer; constexpr size_t INITIAL_BUFFER_SIZE = 5 * 1024 * 1024; // 5 MiB void print_backtrace(); void debug_ipc_init() { int mpi_initialized; MPI_Initialized(&mpi_initialized); if (mpi_initialized) { MPI_Comm_rank(MPI_COMM_WORLD, &rank); MPI_Comm_size(MPI_COMM_WORLD, &cluster_size); } if (rank == 0) { const char *root_env = std::getenv("IPC_DEBUG_ROOT"); const char *file_env = std::getenv("IPC_DEBUG_FILE"); if (file_env == nullptr) { return; } if (root_env != nullptr && 0 == strcmp(root_env, "1")) { debug_ipc_is_root = true; printf("[IPCDBG] I am host\n"); } else { printf("[IPCDBG] I am client\n"); } if (debug_ipc_is_root) { debug_ipc_file = fopen(file_env, "r"); } else { debug_ipc_file = fopen(file_env, "w"); } if (debug_ipc_file == nullptr) { printf("[IPCDBG] Error, could not open named pipe: %s", strerror(errno)); exit(-1); } buffer.resize(INITIAL_BUFFER_SIZE); } } template void debug_ipc_assert_equal(T value) { if (debug_ipc_file == nullptr) { return; } const size_t expected_size = sizeof(T); if (debug_ipc_is_root) { T other_value; size_t read = fread(&other_value, expected_size, 1, debug_ipc_file); if (read != 1) { printf("[IPCDBG] Could not read enough bytes. Error: %s\n", strerror(errno)); exit(-1); } if (other_value != value) { std::cout << "[IPCDBG] Assertion failed!" << " Master has " << value << " but client has " << other_value; print_backtrace(); std::cout << "Entering endless loop, attach debugger to PID " << getpid(); fflush(stdout); while (1) { sleep(1); } } else { #ifdef TRACE std::cout << "[IPCDBG] Assertion passed, value = " << value << std::endl; #endif } } else { size_t written = fwrite(&value, expected_size, 1, debug_ipc_file); if (written != 1) { printf("[IPCDBG] Could not write enough bytes. Error: %s\n", strerror(errno)); exit(-1); } } } template void debug_ipc_assert_equal_vector(std::vector value) { if (debug_ipc_file == nullptr) { return; } debug_ipc_assert_equal(value.size()); const size_t array_byte_length = sizeof(T) * value.size(); if (debug_ipc_is_root) { buffer.resize(array_byte_length); size_t read = fread(buffer.data(), 1, array_byte_length, debug_ipc_file); if (read != array_byte_length) { printf("[IPCDBG] Could not read enough bytes. Error: %s\n", strerror(errno)); exit(-1); } assert(reinterpret_cast(buffer.data()) % 8 == 0); // Make sure the array is properly aligned T *local_array = value.data(); T *other_array = reinterpret_cast(buffer.data()); for (size_t i = 0; i < value.size(); ++i) { if (local_array[i] != other_array[i]) { std::cout << "[IPCDBG] Assertion failed in vector at index " << i << ". Root has " << local_array[i] << " but client has " << other_array[i] << std::endl; print_backtrace(); printf("Entering endless loop, attach debugger to PID %i \n", getpid()); fflush(stdout); while (1) { sleep(1); } } } } else { fwrite(value.data(), 1, array_byte_length, debug_ipc_file); } } // Explicit template instantiation template void debug_ipc_assert_equal(double); template void debug_ipc_assert_equal(float); template void debug_ipc_assert_equal(uint32_t); template void debug_ipc_assert_equal(int32_t); template void debug_ipc_assert_equal(uint64_t); template void debug_ipc_assert_equal(int64_t); template void debug_ipc_assert_equal(bool); template void debug_ipc_assert_equal(std::string); void debug_ipc_assert_equal_double(double value) { debug_ipc_assert_equal(value); } void debug_ipc_assert_equal_uint(uint32_t value) { debug_ipc_assert_equal(value); } void debug_ipc_assert_equal_int(int32_t value) { debug_ipc_assert_equal(value); } void debug_ipc_assert_equal_int64(int64_t value) { debug_ipc_assert_equal(value); } void debug_ipc_assert_equal_array(void *value, size_t size) { static_assert(sizeof(char) == 1); if (debug_ipc_file == nullptr) { return; } debug_ipc_assert_equal(size); // Make sure arrays are the same size char *array = reinterpret_cast(value); if (debug_ipc_is_root) { buffer.resize(size); char *other_array = buffer.data(); size_t read = fread(other_array, 1, size, debug_ipc_file); if (read != size) { printf("[IPCDBG] Could not read enough bytes. Error: %s\n", strerror(errno)); exit(-1); } for (size_t i = 0; i < size; i++) { if (array[i] != other_array[i]) { printf("[IPCDBG] Assertion failed in byte %lu!\n", i); print_backtrace(); printf("Entering endless loop, attach debugger to PID %i \n", getpid()); while (1) { sleep(1); } } } } else { size_t written = fwrite(value, 1, size, debug_ipc_file); if (written != size) { printf("[IPCDBG] Could not write enough bytes. Error: %s\n", strerror(errno)); exit(-1); } } } void debug_ipc_assert_source_location(const char *source_file, const long int line_number) { std::string fname = source_file; debug_ipc_assert_equal_array(fname.data(), fname.size()); debug_ipc_assert_equal(line_number); } #include void print_backtrace() { backward::StackTrace st; st.load_here(32); backward::Printer p; p.print(st); } void debug_ipc_mpi_set_data_distribution(int start_index, uint64_t local_length) { if (rank == 0) { start_indices_map.clear(); start_indices_list.resize(cluster_size + 1); recv_counts.resize(cluster_size); } unsigned long total_length = 0UL; unsigned long length = local_length; MPI_Reduce(&length, &total_length, 1, MPI_UNSIGNED_LONG, MPI_SUM, 0, MPI_COMM_WORLD); MPI_Gather(&start_index, 1, MPI_INT, start_indices_list.data(), 1, MPI_INT, 0, MPI_COMM_WORLD); if (rank != 0) { // Only root needs to now the sendcounts and displacements return; } start_indices_map[total_length] = -1; // sentinel element start_indices_list[cluster_size] = total_length; for (int i = 0; i < cluster_size; i++) { start_indices_map[start_indices_list[i]] = i; } // Calculate recv_counts for (const auto& [start_index, m_rank] : start_indices_map) { // Exit loop on sentinel element if (m_rank == -1) { break; } const uint64_t next_index = start_indices_map.upper_bound(start_index)->first; const uint64_t segment_length = next_index - start_index; recv_counts[m_rank] = segment_length; printf("Recv counts [%i] = %lu\n", m_rank, segment_length); } uint64_t total_recvs = std::accumulate(recv_counts.begin(), recv_counts.end(), 0); printf("total recv = %lu, total length = %lu\n", total_recvs, total_length); assert(total_length == total_recvs); debug_ipc_assert_equal(total_length); } void debug_ipc_assert_equal_mpi_double_array(double *array, size_t array_length, int span) { if (rank == 0) { std::vector actual_recv_counts(recv_counts); std::vector displacements(recv_counts); size_t total_length = span * start_indices_list[cluster_size]; float_buffer.resize(total_length); // Make sure we have enough space for all values assert(actual_recv_counts.size() == static_cast(cluster_size)); assert(start_indices_map.size() == static_cast(cluster_size + 1)); for (unsigned int i = 0; i < recv_counts.size(); i++) { actual_recv_counts[i] *= span; } for (unsigned int i = 0; i < recv_counts.size(); i++) { displacements[i] *= span; } MPI_Gatherv(array, array_length * span, MPI_DOUBLE, float_buffer.data(), // recvbuf actual_recv_counts.data(), // recv counts start_indices_list.data(), // displacements MPI_DOUBLE, 0, MPI_COMM_WORLD); debug_ipc_assert_equal_vector(float_buffer); } else { // Gather all data at the root. MPI_Gatherv(array, array_length * span, MPI_DOUBLE, nullptr, // recvbuf nullptr, // recv counts nullptr, // displacements MPI_DOUBLE, 0, MPI_COMM_WORLD); } }