|
@@ -11,22 +11,21 @@
|
|
|
#include <vector>
|
|
|
#include <mpi.h>
|
|
|
#include <map>
|
|
|
-#include <iostream>
|
|
|
-#include <fstream>
|
|
|
#include <string>
|
|
|
+#include <fcntl.h>
|
|
|
|
|
|
#include <ipc_debug.h>
|
|
|
|
|
|
bool debug_ipc_disabled = true;
|
|
|
bool debug_ipc_is_root = false;
|
|
|
-FILE *debug_ipc_file = nullptr;
|
|
|
+int debug_ipc_file = -1;
|
|
|
|
|
|
int rank, cluster_size;
|
|
|
std::map<uint64_t, int> start_indices_map; // maps indices to MPI ranks
|
|
|
std::vector<int> start_indices_list; // maps ranks to start indices
|
|
|
std::vector<int> recv_counts;
|
|
|
|
|
|
-std::vector<char> buffer;
|
|
|
+std::vector<unsigned char> buffer;
|
|
|
std::vector<double> float_buffer;
|
|
|
|
|
|
constexpr size_t INITIAL_BUFFER_SIZE = 5 * 1024 * 1024; // 5 MiB
|
|
@@ -59,12 +58,12 @@ void debug_ipc_init() {
|
|
|
}
|
|
|
|
|
|
if (debug_ipc_is_root) {
|
|
|
- debug_ipc_file = fopen(file_env, "rb");
|
|
|
+ debug_ipc_file = open(file_env, O_RDONLY | O_CLOEXEC);
|
|
|
} else {
|
|
|
- debug_ipc_file = fopen(file_env, "wb");
|
|
|
+ debug_ipc_file = open(file_env, O_WRONLY | O_CLOEXEC);
|
|
|
}
|
|
|
|
|
|
- if (debug_ipc_file == nullptr) {
|
|
|
+ if (debug_ipc_file == -1) {
|
|
|
printf("[IPCDBG] Error, could not open named pipe: %s", strerror(errno));
|
|
|
exit(-1);
|
|
|
}
|
|
@@ -85,16 +84,16 @@ void endless_loop() {
|
|
|
|
|
|
template<typename T>
|
|
|
void debug_ipc_assert_equal(T value) {
|
|
|
- if (debug_ipc_file == nullptr || debug_ipc_disabled) {
|
|
|
+ if (debug_ipc_file == -1 || debug_ipc_disabled) {
|
|
|
return;
|
|
|
}
|
|
|
|
|
|
- const size_t expected_size = sizeof(T);
|
|
|
+ ssize_t expected_size = sizeof(T);
|
|
|
if (debug_ipc_is_root) {
|
|
|
T other_value;
|
|
|
- size_t read = fread(&other_value, expected_size, 1, debug_ipc_file);
|
|
|
+ ssize_t num_read = read(debug_ipc_file, &other_value, expected_size);
|
|
|
|
|
|
- if (read != 1) {
|
|
|
+ if (num_read != expected_size) {
|
|
|
printf("[IPCDBG] Could not read enough bytes. Error: %s\n", strerror(errno));
|
|
|
exit(-1);
|
|
|
}
|
|
@@ -114,18 +113,51 @@ void debug_ipc_assert_equal(T value) {
|
|
|
#endif
|
|
|
}
|
|
|
} else {
|
|
|
- size_t written = fwrite(&value, expected_size, 1, debug_ipc_file);
|
|
|
+ ssize_t written = write(debug_ipc_file, &value, expected_size);
|
|
|
|
|
|
- if (written != 1) {
|
|
|
+ if (written != expected_size) {
|
|
|
printf("[IPCDBG] Could not write enough bytes. Error: %s\n", strerror(errno));
|
|
|
endless_loop();
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+void read_loop(int fd, unsigned char *data, size_t count) {
|
|
|
+ size_t num_read = 0;
|
|
|
+
|
|
|
+ do {
|
|
|
+ ssize_t ret = read(fd, &data[num_read], count - num_read);
|
|
|
+
|
|
|
+ if (ret == -1) {
|
|
|
+ printf("[IPCDBG] Could not read enough bytes. Error: %s\n", strerror(errno));
|
|
|
+ endless_loop();
|
|
|
+ } else if (ret == 0) {
|
|
|
+ printf("[IPCDBG] Could not read enough bytes, unexpected EOF.");
|
|
|
+ endless_loop();
|
|
|
+ }
|
|
|
+
|
|
|
+ num_read += ret;
|
|
|
+ } while (num_read < count);
|
|
|
+}
|
|
|
+
|
|
|
+void write_loop(int fd, unsigned char *data, size_t count) {
|
|
|
+ size_t num_written = 0;
|
|
|
+
|
|
|
+ do {
|
|
|
+ ssize_t ret = write(fd, &data[num_written], count - num_written);
|
|
|
+
|
|
|
+ if (ret == -1) {
|
|
|
+ printf("[IPCDBG] Could not write enough bytes. Error: %s\n", strerror(errno));
|
|
|
+ endless_loop();
|
|
|
+ }
|
|
|
+ num_written += ret;
|
|
|
+ } while (num_written < count);
|
|
|
+}
|
|
|
+
|
|
|
+
|
|
|
template<typename T>
|
|
|
void debug_ipc_assert_equal_vector(std::vector<T> value) {
|
|
|
- if (debug_ipc_file == nullptr || debug_ipc_disabled) {
|
|
|
+ if (debug_ipc_file == -1 || debug_ipc_disabled) {
|
|
|
return;
|
|
|
}
|
|
|
|
|
@@ -134,12 +166,8 @@ void debug_ipc_assert_equal_vector(std::vector<T> value) {
|
|
|
|
|
|
if (debug_ipc_is_root) {
|
|
|
buffer.resize(array_byte_length);
|
|
|
- size_t read = fread(buffer.data(), 1, array_byte_length, debug_ipc_file);
|
|
|
|
|
|
- if (read != array_byte_length) {
|
|
|
- printf("[IPCDBG] Could not read enough bytes. Error: %s\n", strerror(errno));
|
|
|
- endless_loop();
|
|
|
- }
|
|
|
+ read_loop(debug_ipc_file, buffer.data(), array_byte_length);
|
|
|
|
|
|
assert(reinterpret_cast<uint64_t>(buffer.data()) % 8 == 0); // Make sure the array is properly aligned
|
|
|
T *local_array = value.data();
|
|
@@ -155,12 +183,7 @@ void debug_ipc_assert_equal_vector(std::vector<T> value) {
|
|
|
}
|
|
|
|
|
|
} else {
|
|
|
- size_t written = fwrite(value.data(), 1, array_byte_length, debug_ipc_file);
|
|
|
-
|
|
|
- if (written != array_byte_length) {
|
|
|
- printf("[IPCDBG] Could not write enough bytes. Error: %s\n", strerror(errno));
|
|
|
- endless_loop();
|
|
|
- }
|
|
|
+ write_loop(debug_ipc_file, reinterpret_cast<unsigned char *>(value.data()), array_byte_length);
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -188,24 +211,19 @@ void debug_ipc_assert_equal_int64(int64_t value) {
|
|
|
}
|
|
|
|
|
|
void debug_ipc_assert_equal_array(void *value, size_t size) {
|
|
|
- static_assert(sizeof(char) == 1);
|
|
|
- if (debug_ipc_file == nullptr || debug_ipc_disabled) {
|
|
|
+ static_assert(sizeof(unsigned char) == 1);
|
|
|
+ if (debug_ipc_file == -1 || debug_ipc_disabled) {
|
|
|
return;
|
|
|
}
|
|
|
debug_ipc_assert_equal(size); // Make sure arrays are the same size
|
|
|
|
|
|
|
|
|
- char *array = reinterpret_cast<char *>(value);
|
|
|
+ unsigned char *array = reinterpret_cast<unsigned char *>(value);
|
|
|
if (debug_ipc_is_root) {
|
|
|
buffer.resize(size);
|
|
|
- char *other_array = buffer.data();
|
|
|
+ unsigned char *other_array = buffer.data();
|
|
|
|
|
|
- size_t read = fread(other_array, 1, size, debug_ipc_file);
|
|
|
-
|
|
|
- if (read != size) {
|
|
|
- printf("[IPCDBG] Could not read enough bytes. Error: %s\n", strerror(errno));
|
|
|
- endless_loop();
|
|
|
- }
|
|
|
+ read_loop(debug_ipc_file, other_array, size);
|
|
|
|
|
|
for (size_t i = 0; i < size; i++) {
|
|
|
if (array[i] != other_array[i]) {
|
|
@@ -215,12 +233,7 @@ void debug_ipc_assert_equal_array(void *value, size_t size) {
|
|
|
}
|
|
|
}
|
|
|
} else {
|
|
|
- size_t written = fwrite(value, 1, size, debug_ipc_file);
|
|
|
-
|
|
|
- if (written != size) {
|
|
|
- printf("[IPCDBG] Could not write enough bytes. Error: %s\n", strerror(errno));
|
|
|
- endless_loop();
|
|
|
- }
|
|
|
+ write_loop(debug_ipc_file, reinterpret_cast<unsigned char *>(value), size);
|
|
|
}
|
|
|
}
|
|
|
|