标签:
转自:http://netsmell.com/post/how-sort-10-billion-data.html?ref=myread
海量数据处理/外部归并排序 - 分治.cppp
| // 对 2 亿个数字进行排序, 约 10 G 的文件, 每个数字 int 能表示 | ||
| 3 | // 算法流程 | |
| 4 | // 将 10 G 的文件散列到 300 个文件中, 每个文件大约 35 MB | |
| 5 | // 对 35 MB 的小文件内部排序, 或者分发到多台计算机中, 并行处理 MapReduce | |
| 6 | // 最后使用最小堆, 进行 300 路归并排序, 合成大文件 | |
| 7 | // 再写一个算法判断 2 亿个数字是否有序 | |
| 8 | ||
| 9 | #include <stdio.h> | |
| 10 | #include <stdlib.h> | |
| 11 | #include <time.h> | |
| 12 | #include <io.h> | |
| 13 | #include <queue> | |
| 14 | ||
| 15 | #define FILE_NUM 300 // 哈希文件数 | |
| 16 | #define HASH(a) (a % FILE_NUM) | |
| 17 | ||
| 18 | int num = 6000000; // 2 亿个数字, 手动改 | |
| 19 | char path[20] = "c:\\data.dat"; // 待排文件 | |
| 20 | char result[20] = "c:\\result.dat"; // 排序后文件 | |
| 21 | char tmpdir[100] = "c:\\hashfile"; // 临时目录 | |
| 22 | ||
| 23 | // 随机生成 2 亿个数字 | |
| 24 | int write_file(void) | |
| 25 | { | |
| 26 | FILE *out = NULL; | |
| 27 | int i; | |
| 28 | ||
| 29 | printf("\n正在生成 %d 个数字...\n\n", num); | |
| 30 | out = fopen(path, "wt"); | |
| 31 | if (out == NULL) return 0; | |
| 32 | ||
| 33 | unsigned int s, e; | |
| 34 | e = s = clock(); | |
| 35 | for (i=0; i<num; i++) | |
| 36 | { | |
| 37 | e = clock(); | |
| 38 | if (e - s > 1000) // 计算进度 | |
| 39 | { | |
| 40 | printf("\r处理进度 %0.2f %%\t", (i * 100.0) / num); | |
| 41 | s = e; | |
| 42 | } | |
| 43 | fprintf(out, "%d\n", | |
| 44 | (rand() % 31623) * (rand() % 31623)); | |
| 45 | } | |
| 46 | fclose(out); | |
| 47 | return 1; | |
| 48 | } | |
| 49 | ||
| 50 | // 对 2 亿个数字进行哈希, 分散到子文件中 | |
| 51 | // 入口参数: path, tmpdir | |
| 52 | int map(void) | |
| 53 | { | |
| 54 | FILE *in = NULL; | |
| 55 | FILE *tmp[FILE_NUM + 5]; | |
| 56 | char hashfile[512]; // 哈希文件地址 | |
| 57 | int data, add; | |
| 58 | int i; | |
| 59 | ||
| 60 | printf("\r正在哈希 %s\n\n", path); | |
| 61 | in = fopen(path, "rt"); | |
| 62 | if (in == NULL) return 0; | |
| 63 | for (i=0; i<FILE_NUM; i++) tmp[i] = NULL; | |
| 64 | ||
| 65 | // 开始哈希, 核心代码要尽可能的加速 | |
| 66 | unsigned int s, e; | |
| 67 | e = s = clock(); | |
| 68 | i = 0; | |
| 69 | while (fscanf(in, "%d", &data) != EOF) | |
| 70 | { | |
| 71 | add = HASH(data); | |
| 72 | if (tmp[add] == NULL) | |
| 73 | { | |
| 74 | sprintf(hashfile, "%s\\hash_%d.~tmp", tmpdir, add); | |
| 75 | tmp[add] = fopen(hashfile, "a"); | |
| 76 | } | |
| 77 | fprintf(tmp[add], "%d\n", data); | |
| 78 | ||
| 79 | i++; | |
| 80 | e = clock(); // 计算进度 | |
| 81 | if (e - s > 1000) | |
| 82 | { | |
| 83 | printf("\r处理进度 %0.2f %%\t", (i * 100.0) / num); | |
| 84 | s = e; | |
| 85 | } | |
| 86 | } | |
| 87 | for (i=0; i<FILE_NUM; i++) | |
| 88 | if (tmp[i]) fclose(tmp[i]); | |
| 89 | fclose(in); | |
| 90 | ||
| 91 | return 1; | |
| 92 | } | |
| 93 | ||
| 94 | // 对 300 个文件逐个排序, 采用堆排序 STL 的优先队列 | |
| 95 | void calc(void) | |
| 96 | { | |
| 97 | int fileexist(char *path); // 判断文件存在 | |
| 98 | std::priority_queue<int> q; // 堆排序 | |
| 99 | char hashfile[512]; | |
| 100 | FILE *fp = NULL; | |
| 101 | int i, data; | |
| 102 | ||
| 103 | // 逐个处理 300 个文件, 或者将这些文件发送到其它计算机中并行处理 | |
| 104 | for (i=0; i<FILE_NUM; i++) | |
| 105 | { | |
| 106 | sprintf(hashfile, "%s\\hash_%d.~tmp", tmpdir, i); | |
| 107 | if (fileexist(hashfile)) | |
| 108 | { | |
| 109 | printf("\r正在排序 hash_%d.~tmp\t", i); | |
| 110 | ||
| 111 | // 小文件从磁盘加入内存中 | |
| 112 | fp = fopen(hashfile, "rt"); | |
| 113 | while (fscanf(fp, "%d", &data) != EOF) | |
| 114 | { | |
| 115 | q.push(data); | |
| 116 | // 优先队列默认是大顶堆, 即降序排序 | |
| 117 | // 要升序需要重载 () 运算符 | |
| 118 | } | |
| 119 | fclose(fp); | |
| 120 | ||
| 121 | // 排序后再从内存写回磁盘 | |
| 122 | fp = fopen(hashfile, "wt"); // 覆盖模式写 | |
| 123 | while (!q.empty()) | |
| 124 | { | |
| 125 | fprintf(fp, "%d\n", q.top()); | |
| 126 | q.pop(); | |
| 127 | } | |
| 128 | fclose(fp); | |
| 129 | } | |
| 130 | } | |
| 131 | } | |
| 132 | ||
| 133 | typedef struct node // 队列结点 | |
| 134 | { | |
| 135 | int data; | |
| 136 | int id; // 哈希文件的编号 | |
| 137 | bool operator < (const node &a) const | |
| 138 | { return data < a.data; } | |
| 139 | }node; | |
| 140 | ||
| 141 | // 将 300 个有序文件合并成一个文件, K 路归并排序 | |
| 142 | int reduce(void) | |
| 143 | { | |
| 144 | int fileexist(char *path); | |
| 145 | std::priority_queue<node> q; // 堆排序 | |
| 146 | FILE *file[FILE_NUM + 5]; | |
| 147 | FILE *out = NULL; | |
| 148 | char hashfile[512]; | |
| 149 | node tmp, p; | |
| 150 | int i, count = 0; | |
| 151 | ||
| 152 | printf("\r正在合并 %s\n\n", result); | |
| 153 | out = fopen(result, "wt"); | |
| 154 | if (out == NULL) return 0; | |
| 155 | for (i=0; i<FILE_NUM; i++) file[i] = NULL; | |
| 156 | for (i=0; i<FILE_NUM; i++) // 打开全部哈希文件 | |
| 157 | { | |
| 158 | sprintf(hashfile, "%s\\hash_%d.~tmp", tmpdir, i); | |
| 159 | if (fileexist(hashfile)) | |
| 160 | { | |
| 161 | file[i] = fopen(hashfile, "rt"); | |
| 162 | fscanf(file[i], "%d", &tmp.data); | |
| 163 | tmp.id = i; | |
| 164 | q.push(tmp); // 初始化队列 | |
| 165 | count++; // 计数器 | |
| 166 | printf("\r入队进度 %0.2f %%\t", (count * 100.0) / FILE_NUM); | |
| 167 | } | |
| 168 | } | |
| 169 | unsigned int s, e; | |
| 170 | e = s = clock(); | |
| 171 | while (!q.empty()) // 开始 K 路归并 | |
| 172 | { | |
| 173 | tmp = q.top(); | |
| 174 | q.pop(); | |
| 175 | // 将堆顶的元素写回磁盘, 再从磁盘中拿一个到内存 | |
| 176 | fprintf(out, "%d\n", tmp.data); | |
| 177 | if (fscanf(file[tmp.id], "%d", &p.data) != EOF) | |
| 178 | { | |
| 179 | p.id = tmp.id; | |
| 180 | q.push(p); | |
| 181 | count++; | |
| 182 | } | |
| 183 | ||
| 184 | e = clock(); // 计算进度 | |
| 185 | if (e - s > 1000) | |
| 186 | { | |
| 187 | printf("\r处理进度 %0.2f %%\t", (count * 100.0) / num); | |
| 188 | s = e; | |
| 189 | } | |
| 190 | } | |
| 191 | for (i=0; i<FILE_NUM; i++) | |
| 192 | if (file[i]) fclose(file[i]); | |
| 193 | fclose(out); | |
| 194 | ||
| 195 | return 1; | |
| 196 | } | |
| 197 | ||
| 198 | int check(void) // 检查是否降序排序 | |
| 199 | { | |
| 200 | FILE *in = NULL; | |
| 201 | int max = 0x7FFFFFFF; | |
| 202 | int data; | |
| 203 | int count = 0; | |
| 204 | ||
| 205 | printf("\r正在检查文件正确性...\n\n"); | |
| 206 | in = fopen(result, "rt"); | |
| 207 | if (in == NULL) return 0; | |
| 208 | ||
| 209 | unsigned int s, e; | |
| 210 | e = s = clock(); | |
| 211 | while (fscanf(in, "%d", &data) != EOF) | |
| 212 | { | |
| 213 | if (data <= max) max = data; | |
| 214 | else | |
| 215 | { | |
| 216 | fclose(in); | |
| 217 | return 0; | |
| 218 | } | |
| 219 | count++; | |
| 220 | e = clock(); // 计算进度 | |
| 221 | if (e - s > 1000) | |
| 222 | { | |
| 223 | printf("\r处理进度 %0.2f %%\t", (count * 100.0) / num); | |
| 224 | s = e; | |
| 225 | } | |
| 226 | } | |
| 227 | fclose(in); | |
| 228 | return 1; | |
| 229 | } | |
| 230 | ||
| 231 | // 判断文件存在 | |
| 232 | int fileexist(char *path) | |
| 233 | { | |
| 234 | FILE *fp = NULL; | |
| 235 | ||
| 236 | fp = fopen(path, "rt"); | |
| 237 | if (fp) | |
| 238 | { | |
| 239 | fclose(fp); | |
| 240 | return 1; | |
| 241 | } | |
| 242 | else return 0; | |
| 243 | } | |
| 244 | ||
| 245 | int main(void) | |
| 246 | { | |
| 247 | char cmd_del[200]; // 删除目录 | |
| 248 | char cmd_att[200]; // 设置隐藏 | |
| 249 | char cmd_mkdir[200]; // 建立目录 | |
| 250 | ||
| 251 | // 初始化 cmd 命令, 建立工作目录 | |
| 252 | sprintf(cmd_del, "rmdir /s /q %s", tmpdir); | |
| 253 | sprintf(cmd_att, "attrib +h %s", tmpdir); | |
| 254 | sprintf(cmd_mkdir, "mkdir %s", tmpdir); | |
| 255 | if (access(path, 0) == 0) system(cmd_del); | |
| 256 | system(cmd_mkdir); // 建立工作目录 | |
| 257 | system(cmd_att); // 隐藏目录 | |
| 258 | ||
| 259 | // 随机生成 2 亿个数字 | |
| 260 | if (!write_file()) return 0; | |
| 261 | ||
| 262 | map(); // 对 2 亿个数字进行哈希, 即 Map | |
| 263 | calc(); // 对 300 个文件逐个排序 | |
| 264 | reduce(); // 最后将 300 个有序文件合并成一个文件, 即 reduce | |
| 265 | if (check()) printf("\r排序正确!\t\t\t\n\n"); | |
| 266 | else printf("\r排序错误!\t\t\t\n\n"); | |
| 267 | ||
| 268 | system(cmd_del); // 删除哈希文件 | |
| 269 | remove(path); // 删除 2 亿数字文件 | |
| 270 | remove(result); // 删除排序后的文件 | |
| 271 | ||
| 272 | return 0; | |
| 273 | } |
标签:
原文地址:http://www.cnblogs.com/zhangxuan/p/5948291.html