ソース 5.4.1 Romberg 積分法(C の場合)
maindefs.h |
---|
#ifndef ___MAINDEFS_H__20021125_2022_EOIJSDFOJ__INCLUDED___ #define ___MAINDEFS_H__20021125_2022_EOIJSDFOJ__INCLUDED___ #include <mpi.h> #define NUMOF(array) (sizeof (array) / sizeof *(array)) #define NUMOFT(type) NUMOF(*(type*)0) #define STRLEN(literal) (NUMOF(literal) - 1) /* デバッグ用出力 */ #ifdef NDEBUG #define TRACE(params) #else #define TRACE(params) fprintf params #endif /* 各環境に合わせて定義してください * 符号なし 64 ビット(8 バイト)変数です */ typedef unsigned long long UINT64; #define FORMAT_UINT64 "%Lu" enum TAG_KIND /* 通信タグ */ { TAG_COMMAND, /* 何を行うかの指令 */ TAG_ROMBERG, /* Romberg 積分法のデータを通信 */ }; enum RANK_KIND /* 通信ランク */ { RANK_MASTER, /* マスターのランク */ RANK_CLIENT_HEAD, /* クライアントのランクの先頭 */ }; enum COMMAND_TYPE /* クライアントに送るコマンド */ { CMD_NULL, /* 何もしません */ CMD_END, /* クライアントプログラムを終了します */ CMD_ROMBERG_FUNC, /* Func 関数で Romberg 積分法を行います */ }; /* クライアントを起こす */ void WakeupClients( int clients, /* クライアントの数 */ int command /* クライアントに送るコマンド */ ); #endif /* #ifndef ___MAINDEFS_H__20021125_2022_EOIJSDFOJ__INCLUDED___ */ |
main.c |
#include <stdio.h> #include <math.h> #include "maindefs.h" #include "romberg.h" static void Master(); static void Client(); /* 今回使う被積分関数 */ static double Func(double x); main(int argc, char** argv) { int rank; MPI_Init(&argc, &argv); MPI_Comm_rank(MPI_COMM_WORLD, &rank); if(rank == RANK_MASTER) { Master(); } else { Client(); } MPI_Finalize(); } /* マスターのメインルーチン */ static void Master() { int clients; /* クライアントの数 */ double xN; /* 積分の終点 */ UINT64 divide; /* 初期分割数 */ double result; /* 積分結果 */ double before, after; MPI_Comm_size(MPI_COMM_WORLD, &clients); --clients; printf("積分範囲を入力して下さい > "); fflush(stdout); scanf("%lf", &xN); printf("初期分割数を入力して下さい > "); fflush(stdout); scanf(FORMAT_UINT64, ÷); before = MPI_Wtime(); result = Romberg(0, xN, divide, clients, CMD_ROMBERG_FUNC, Func); after = MPI_Wtime(); printf("結果: %.15le (%.15le sec)\n", result, after - before); WakeupClients(clients, CMD_END); } /* クライアントのメインルーチン */ static void Client() { /* コマンドを受け取ると、そのコマンドに応じた処理を始めます */ for(; ;) { int command; MPI_Recv(&command, 1, MPI_INT, RANK_MASTER, TAG_COMMAND, MPI_COMM_WORLD, MPI_STATUS_IGNORE); switch(command) { /* CMD_NULL : 何もしません */ case CMD_NULL: break; /* CMD_END : 終了します */ case CMD_END: return; /* CMD_ROMBERG_FUNC : 被積分関数 Func の * Romberg 積分で使う並列ルーチン */ case CMD_ROMBERG_FUNC: RombergClient(Func); break; } } } /********************************************************************** * * * サブルーチン * * * **********************************************************************/ /* クライアントを起こす */ void WakeupClients( int clients, /* クライアントの数 */ int command /* クライアントに送るコマンド */ ){ int iClient; for(iClient = 0; iClient < clients; ++iClient) { MPI_Request req; MPI_Isend(&command, 1, MPI_INT, RANK_CLIENT_HEAD + iClient, TAG_COMMAND, MPI_COMM_WORLD, &req); MPI_Request_free(&req); } } /* 今回使う被積分関数 */ /* f(x) = x^5 sin(x^4) * cos(x^3) * exp(-x^2) * log(x + 1) */ static double Func(double x) { const double x2 = x * x; const double x3 = x2 * x; const double x4 = x2 * x2; const double x5 = x2 * x3; return x5 * sin(x4) * cos(x3) * exp(-x2) * log(x + 1); } |
romberg.h |
#ifndef ___ROMBERG_H__20021125_2030_DJSOIAF_INCLUDED___ #define ___ROMBERG_H__20021125_2030_DJSOIAF_INCLUDED___ #include "maindefs.h" /* Romberg 積分法 */ double Romberg( double x0, double xN, /* 始点と終点 */ UINT64 divide, /* 分割数 */ int clients, /* クライアントの数 */ int command, /* クライアントに投げるコマンド */ double (*fpInt)(double) /* 被積分関数 */ ); /* Romberg 積分法でのクライアントの作業 */ void RombergClient( double (*fpInt)(double) /* 被積分関数 */ ); #endif /* #ifndef ___ROMBERG_H__20021125_2030_DJSOIAF_INCLUDED___ */ |
romberg.c |
#include <stdio.h> #include <math.h> #include "maindefs.h" #include "romberg.h" /* Richardson 補外用のバッファのサイズ */ #define RICHARDSON_BUFSIZE 32 /* 和をとる際に使う情報を送る際に使う構造体 */ typedef struct ___dummy_SUM_T { double xI; /* 始点 */ double span; /* サンプリング幅 */ UINT64 points; /* サンプリング点の数 */ } SUM_T; /* 関数の値の和をとります */ static double Integrate( double xI, /* 始点 */ double span, /* サンプリング幅 */ UINT64 points, /* サンプリング点の数 */ int clients, /* クライアントの数 */ int command, /* クライアントに送るコマンド */ double (*fpInt)(double) /* 被積分関数 */ ); /* Richardson 補外 * T_{ij} = \frac{4^j T_{i,j-1} - T_{i-1,j-1}}{4^j - 1} */ static double Richardson( double* richbuf, /* 作業バッファ */ int depth /* 補完次数 */ ); /********************************************************************** * * * Romberg 積分法 * * * **********************************************************************/ /* * 台形公式は、次のように書き換えることができます。 * * S(h) = h/2 sum[k=1,N] {f(x_{k-1}) + f(x_k)} * = h { (f(x_0) + f(x_N))/2 + sum[k=1,N-1] f(x_k) } * * 幅を半分にすると、 * * S(h/2) = h/4 sum[k=1,2N] {f(x_{k-1}) + f(x_k)} * = h{ 1/2 S(h)/h + 1/2 sum[k=1,N] f(x_{2k-1}) } * * さらに半分にすると、 * * S(h/4) = h/8 sum[k=1,2N] {f(x_{k-1}) + f(x_k)} * = h{ 1/2 S(h/2)/h + 1/4 sum[k=1,2N] f(x_{2k-1}) } * * これを続けると、 * * S(h/(2^n)) = h{ 1/2 S(h/(2^n-1))/h + 1/(2^n) sum[k=1,nN] f(x_{2k-1}) } * * とすることができます。 * なので、今までの積分の結果を再利用することができることになります。 * 新しく計算しなくてはならないのは、 * 今までに計算に含めたことのなかった点の和のみです。 */ double Romberg( double x0, double xN, /* 始点と終点 */ UINT64 divide, /* 分割数 */ int clients, /* クライアントの数 */ int command, /* クライアントに投げるコマンド */ double (*fpInt)(double) /* 被積分関数 */ ){ const double eps = 10e-15; /* 許容誤差 */ double mainSpan = (xN - x0) / divide; /* 最大幅 */ double subSpan = mainSpan; /* 現在の幅 */ double richbuf[RICHARDSON_BUFSIZE]; /* Richardson 補外用のバッファ */ double diff; /* 前回の計算とのずれ */ double prev; /* 前回の計算の値 */ double weight; /* 重み */ UINT64 points; /* 和をとる点の数 */ int i; /* 1回目の計算 */ prev = richbuf[0] = (fpInt(x0) + fpInt(xN)) / 2 + Integrate(x0 + mainSpan, mainSpan, divide - 1, clients, command, fpInt); points = divide; weight = 2; for(i = 1; i < RICHARDSON_BUFSIZE; ++i) { double now; /* 台形公式で積分値を求めます */ richbuf[i] = richbuf[i - 1] / 2 + Integrate(x0 + subSpan / 2, subSpan, points, clients, command, fpInt) / weight; /* Richardson 補外を行います */ now = Richardson(richbuf, i); /* 前回との差が小さければ終了 */ diff = fabs(now - prev); TRACE((stderr, "%d "FORMAT_UINT64" %le %le\n", i, points * 2, diff, eps * fabs(now))); if(diff < eps * fabs(now)) { return now * mainSpan; } prev = now; subSpan /= 2; points *= 2; weight *= 2; } /* 補外失敗 */ fprintf(stderr, "Error: %lf\n", diff); return prev * mainSpan; } /********************************************************************** * * * サブルーチン * * * **********************************************************************/ /* 関数の値の和をとります */ static double Integrate( double xI, /* 始点 */ double span, /* サンプリング幅 */ UINT64 points, /* サンプリング点の数 */ int clients, /* クライアントの数 */ int command, /* クライアントに送るコマンド */ double (*fpInt)(double) /* 被積分関数 */ ){ double integrate; int eachPoints = points / (clients + 1); SUM_T data; int iClient; UINT64 iFirst; UINT64 i; /* クライアントを起こします */ WakeupClients(clients, command); /* クライアントに投げます */ iFirst = 0; data.span = span; data.points = eachPoints; for(iClient = 0; iClient < clients; ++iClient) { MPI_Request req; data.xI = xI + iFirst * span; /* 全部同じ環境と仮定して、MPI_BYTE で送ります * データ型を作るのはここでは行いません */ MPI_Isend(&data, sizeof data, MPI_BYTE, RANK_CLIENT_HEAD + iClient, TAG_ROMBERG, MPI_COMM_WORLD, &req); MPI_Request_free(&req); iFirst += eachPoints; } /* マスターでも計算します */ integrate = 0; for(i = iFirst; i < points; ++i) { /* 数値誤差を少なくするために、xI += span; のようにはしません */ /* 同様の理由で、i は整数にします */ const double x = xI + i * span; integrate += fpInt(x); } /* クライアントの結果と合わせます */ for(iClient = 0; iClient < clients; ++iClient) { double eachInt; MPI_Recv(&eachInt, 1, MPI_DOUBLE, RANK_CLIENT_HEAD + iClient, TAG_ROMBERG, MPI_COMM_WORLD, MPI_STATUS_IGNORE); integrate += eachInt; } return integrate; } /* Romberg 積分法でのクライアントの作業 */ void RombergClient( double (*fpInt)(double) /* 被積分関数 */ ){ double integrate = 0; SUM_T data; /* 通信用構造体 */ UINT64 i; MPI_Recv(&data, sizeof data, MPI_BYTE, RANK_MASTER, TAG_ROMBERG, MPI_COMM_WORLD, MPI_STATUS_IGNORE); for(i = 0; i < data.points; ++i) { const double x = data.xI + i * data.span; integrate += fpInt(x); } MPI_Send(&integrate, 1, MPI_DOUBLE, RANK_MASTER, TAG_ROMBERG, MPI_COMM_WORLD); } /* * Richardson 補外 * * 台形公式で幅を半分ずつにしていった場合の Richardson 補外の式は * T_{ij} = \frac{4^j T_{i,j-1} - T_{i-1,j-1}}{4^j - 1} * となります。 * k 番目の台形公式の結果は T_{k1} で、 * k 番目の補外値は T_{kk} で得られます。 * * 一見再帰を使って求めればいいように見えますが、 * 無駄なバッファを使わなくても再帰をせず、しかも軽い処理ができます。 * * ・バッファの推移 * * +----+----+----+----+----+ * | | 0 | 1 | 2 | 3 | * +----+====+----+----+----+ * | 0 |T_00| | | | * +----+====+====+ | | * | (1)|T_00|T_10| | | * +----+----+====+ | | * | | | / | | | * +----+====+----+ | | * | 1 |T_11|T_10| | | * +----+====+----+====+ | * | (2)|T_11|T_10|T_20| | * +----+----+----+====+ | * | | | /| / | | * +----+====+----+----+ | * | 2 |T_22|T_21|T_20| | * +----+====+----+----+====+ * | (3)|T_11|T_10|T_20|T_30| * +----+----+----+----+====+ * | | | /| /| / | * +----+====+----+----+----+ * | 3 |T_33|T_32|T_31|T_30| * +----+====+----+----+----+ * * 何でも再帰でやればいいってもんでもないのです。 */ static double Richardson( double* richbuf, /* 作業バッファ */ int depth /* 補完次数 */ ){ double pow4j = 4; int i; for(i = depth; i > 0; --i) { richbuf[i - 1] = (pow4j * richbuf[i] - richbuf[i - 1]) / (pow4j - 1); pow4j *= 4; } return richbuf[0]; } |
Last update was done on 2002.11.14