ソース 5.4.2 Romberg 積分法(FORTRAN77 の場合)
debug.h |
---|
/* デバッグ用のギミック */ #ifndef ___DEBUG_H__20021129_1712_FKWLEJFLEW__INCLUDED___ #define ___DEBUG_H__20021129_1712_FKWLEJFLEW__INCLUDED___ /* デバッグ用出力 */ #ifdef NDEBUG #define TRACE ! #define TNEXT ! #else #define TRACE Write #define TNEXT * #endif #endif /* #ifndef ___DEBUG_H__20021129_1712_FKWLEJFLEW__INCLUDED___ */ |
maindefs.h |
C-----+--1----+----2----+----3----+----4----+----5----+----6----+----7-+ Implicit None Include 'mpif.h' C-----+--1----+----2----+----3----+----4----+----5----+----6----+----7-+ C 標準エラー出力 C コンパイラによって値が違う可能性があります C また、標準エラー出力に対応していなければ C 標準出力 6 にしておいて下さい C-----+--1----+----2----+----3----+----4----+----5----+----6----+----7-+ Integer STDERR Parameter (STDERR = 0) C-----+--1----+----2----+----3----+----4----+----5----+----6----+----7-+ C 通信タグ C-----+--1----+----2----+----3----+----4----+----5----+----6----+----7-+ Integer TAG_COMMAND, TAG_ROMBERG Parameter ( * TAG_COMMAND = 0, ! 何を行うかの指令 * TAG_ROMBERG = 1 ! Romberg 積分法のデータを通信 * ) C-----+--1----+----2----+----3----+----4----+----5----+----6----+----7-+ C 通信ランク C-----+--1----+----2----+----3----+----4----+----5----+----6----+----7-+ Integer RANK_MASTER, RANK_CLIENT_HEAD Parameter ( * RANK_MASTER = 0, ! マスターのランク * RANK_CLIENT_HEAD = 0 ! クライアントのランクの先頭 - 1 * ! (Fortran は基本的に 1 オリジンなので、こうしておきます) * ) C-----+--1----+----2----+----3----+----4----+----5----+----6----+----7-+ C クライアントに送るコマンド C-----+--1----+----2----+----3----+----4----+----5----+----6----+----7-+ Integer CMD_NULL, CMD_END, CMD_ROMBERG_FUNC Parameter ( * CMD_NULL = 0, ! 何もしません * CMD_END = 1, ! クライアントプログラムを終了します * CMD_ROMBERG_FUNC = 2 ! Func 関数で Romberg 積分法を行います * ) C-----+--1----+----2----+----3----+----4----+----5----+----6----+----7-+ |
main_in.h |
C-----+--1----+----2----+----3----+----4----+----5----+----6----+----7-+ Include 'maindefs.h' Include 'romberg.h' C-----+--1----+----2----+----3----+----4----+----5----+----6----+----7-+ C 関数の型 C-----+--1----+----2----+----3----+----4----+----5----+----6----+----7-+ C 今回使う被積分関数 Double Precision Func C-----+--1----+----2----+----3----+----4----+----5----+----6----+----7-+ |
main.F |
C デバッグ用のヘッダファイル #include "debug.h" C-----+--1----+----2----+----3----+----4----+----5----+----6----+----7-+ C | C メインルーチン | C | C-----+--1----+----2----+----3----+----4----+----5----+----6----+----7-+ Program RombergTest Include 'main_in.h' Integer rank Integer err Call MPI_Init(err) Call MPI_Comm_rank(MPI_COMM_WORLD, rank, err) If(rank .eq. RANK_MASTER) then Call Master() Else Call Client() EndIf Call MPI_Finalize(err) Stop End C-----+--1----+----2----+----3----+----4----+----5----+----6----+----7-+ C マスターのメインルーチン C-----+--1----+----2----+----3----+----4----+----5----+----6----+----7-+ Subroutine Master() Include 'main_in.h' External Func Integer clients ! クライアントの数 Double Precision xN ! 積分の終点 Integer*8 divide ! 初期分割数 Double Precision result ! 積分結果 Double Precision before, after Integer err Call MPI_Comm_size(MPI_COMM_WORLD, clients, err) clients = clients - 1 Write(*,*) '積分範囲を入力して下さい' Read(*,*) xN Write(*,*) '初期分割数を入力して下さい' Read(*,*) divide before = MPI_Wtime() result = Romberg(0, xN, divide, clients, CMD_ROMBERG_FUNC, Func) after = MPI_Wtime() Write(*,10) result, after - before 10 Format('結果: ', D21.15, ' (', D21.15, ' sec)') Call WakeupClients(clients, CMD_END) Return End C-----+--1----+----2----+----3----+----4----+----5----+----6----+----7-+ C クライアントのメインルーチン C-----+--1----+----2----+----3----+----4----+----5----+----6----+----7-+ Subroutine Client() Include 'main_in.h' External Func Integer command Integer err C コマンドを受け取ると、そのコマンドに応じた処理を始めます 10 Continue Call MPI_Recv(command, 1, MPI_INTEGER, RANK_MASTER, * TAG_COMMAND, MPI_COMM_WORLD, MPI_STATUS_IGNORE, err) C CMD_NULL : 何もしません If(command .eq. CMD_NULL) Then Else * C CMD_END : 終了します * If(command .eq. CMD_END) Then Return Else * C CMD_ROMBERG_FUNC : 被積分関数 Func の C Romberg 積分で使う並列ルーチン * If(command .eq. CMD_ROMBERG_FUNC) Then Call RombergClient(Func) EndIf Goto 10 Return End C-----+--1----+----2----+----3----+----4----+----5----+----6----+----7-+ C | C サブルーチン | C | C-----+--1----+----2----+----3----+----4----+----5----+----6----+----7-+ C クライアントを起こします C-----+--1----+----2----+----3----+----4----+----5----+----6----+----7-+ Subroutine WakeupClients(clients, command) Include 'main_in.h' Integer clients ! クライアントの数 Integer command ! クライアントに送るコマンド Integer iClient Integer req Integer err Do iClient = 1, clients Call MPI_Isend(command, 1, MPI_INTEGER, * RANK_CLIENT_HEAD + iClient, TAG_COMMAND, * MPI_COMM_WORLD, req, err) Call MPI_Request_free(req, err) EndDo Return End C-----+--1----+----2----+----3----+----4----+----5----+----6----+----7-+ C 今回使う被積分関数 C f(x) = x^5 sin(x^4) * cos(x^3) * exp(-x^2) * log(x + 1) C-----+--1----+----2----+----3----+----4----+----5----+----6----+----7-+ Function Func(x) Include 'main_in.h' Double Precision x Double Precision x2 Double Precision x3 Double Precision x4 Double Precision x5 x2 = x * x x3 = x2 * x x4 = x2 * x2 x5 = x2 * x3 Func = x5 * sin(x4) * cos(x3) * exp(-x2) * log(x + 1.0D0) Return End C-----+--1----+----2----+----3----+----4----+----5----+----6----+----7-+ |
romberg.h |
C-----+--1----+----2----+----3----+----4----+----5----+----6----+----7-+ C 関数の型 C-----+--1----+----2----+----3----+----4----+----5----+----6----+----7-+ C Romberg 積分法 Double Precision Romberg C-----+--1----+----2----+----3----+----4----+----5----+----6----+----7-+ |
romberg_in.h |
C-----+--1----+----2----+----3----+----4----+----5----+----6----+----7-+ Include 'maindefs.h' Include 'romberg.h' C-----+--1----+----2----+----3----+----4----+----5----+----6----+----7-+ C Richardson 補外用のバッファのサイズ Integer RICHARDSON_BUFSIZE Parameter (RICHARDSON_BUFSIZE = 32) C 許容誤差 Double Precision EPS Parameter (EPS = 1.0D-15) C-----+--1----+----2----+----3----+----4----+----5----+----6----+----7-+ C 関数の型 C-----+--1----+----2----+----3----+----4----+----5----+----6----+----7-+ C 関数の値の和をとります Double Precision Integrate C Richardson 補外 Double Precision Richardson C-----+--1----+----2----+----3----+----4----+----5----+----6----+----7-+ |
romberg.F |
C デバッグ用のヘッダファイル #include "debug.h" C-----+--1----+----2----+----3----+----4----+----5----+----6----+----7-+ C | C Romberg 積分法 | C | C-----+--1----+----2----+----3----+----4----+----5----+----6----+----7-+ C C 台形公式は、次のように書き換えることができます。 C C S(h) = h/2 sum[k=1,N] {f(x_{k-1}) + f(x_k)} C = h { (f(x_0) + f(x_N))/2 + sum[k=1,N-1] f(x_k) } C C 幅を半分にすると、 C C S(h/2) = h/4 sum[k=1,2N] {f(x_{k-1}) + f(x_k)} C = h{ 1/2 S(h)/h + 1/2 sum[k=1,N] f(x_{2k-1}) } C C さらに半分にすると、 C C S(h/4) = h/8 sum[k=1,2N] {f(x_{k-1}) + f(x_k)} C = h{ 1/2 S(h/2)/h + 1/4 sum[k=1,2N] f(x_{2k-1}) } C C これを続けると、 C C S(h/(2^n)) = h{ 1/2 S(h/(2^n-1))/h + 1/(2^n) sum[k=1,nN] f(x_{2k-1}) } C C とすることができます。 C なので、今までの積分の結果を再利用することができることになります。 C 新しく計算しなくてはならないのは、 C 今までに計算に含めたことのなかった点の和のみです。 C C-----+--1----+----2----+----3----+----4----+----5----+----6----+----7-+ Function Romberg(x0, xN, divide, clients, command, fpInt) Include 'romberg_in.h' Double Precision x0, xN ! 始点と終点 Integer*8 divide ! 分割数 Integer clients ! クライアントの数 Integer command ! クライアントに投げるコマンド Double Precision fpInt ! 被積分関数 External fpInt Double Precision mainSpan ! 最大幅 Double Precision subSpan ! 現在の幅 Double Precision richbuf(RICHARDSON_BUFSIZE) ! Richardson 補外用のバッファ Double Precision diff ! 前回の計算とのずれ Double Precision prev ! 前回の計算の値 Double Precision now ! 今回の計算の値 Double Precision weight ! 重み Integer*8 points ! 和をとる点の数 Integer i Integer err mainSpan = (xN - x0) / divide subSpan = mainSpan C 1回目の計算 prev = (fpInt(x0) + fpInt(xN)) / 2.0D0 * + Integrate(x0 + mainSpan, mainSpan, divide - 1, clients, * command, fpInt) richbuf(1) = prev points = divide weight = 2.0D0 Do i = 2, RICHARDSON_BUFSIZE C 台形公式で積分値を求めます richbuf(i) = richbuf(i - 1) / 2.0D0 * + Integrate(x0 + subSpan / 2, subSpan, points, clients, * command, fpInt) / weight C Richardson 補外を行います now = Richardson(richbuf, i) C 前回との差が小さければ終了 diff = Abs(now - prev) TRACE(STDERR, 10) i, points * 2, diff, EPS * Abs(now) 10 Format(I3, I20, D19.10, D19.10) If(diff .lt. EPS * Abs(now)) Then Romberg = now * mainSpan Return EndIf prev = now subSpan = subSpan / 2.0D0 points = points * 2 weight = weight * 2.0D0 EndDo C 補外失敗 Write(STDERR, *) 'Error: ', diff Romberg = prev * mainSpan Return End C-----+--1----+----2----+----3----+----4----+----5----+----6----+----7-+ C | C サブルーチン | C | C-----+--1----+----2----+----3----+----4----+----5----+----6----+----7-+ C 関数の値の和をとります C-----+--1----+----2----+----3----+----4----+----5----+----6----+----7-+ Function Integrate(xI, span, points, clients, command, fpInt) Include 'romberg_in.h' Double Precision xI ! 始点 Double Precision span ! サンプリング幅 Integer*8 points ! サンプリング点の数 Integer clients ! クライアントの数 Integer command ! クライアントに送るコマンド Double Precision fpInt ! 被積分関数 Integer*8 eachPoints ! クライアントあたりの点の数 Double Precision eachxI ! 各クライアントの始点 Double Precision eachInt ! 各クライアントで計算した和 Integer iClient Integer*8 iFirst Integer*8 i Integer req Integer err eachPoints = points / (clients + 1) C クライアントを起こします Call WakeupClients(clients, command) C クライアントにデータを投げます iFirst = 0 Do iClient = 1, clients eachxI = xI + iFirst * span C 構造体が使えないので、1つずつ送ります C 通信の順番は決まっているので、別のタグを使う必要はありません Call MPI_Isend(eachxI, 1, MPI_DOUBLE_PRECISION, * RANK_CLIENT_HEAD + iClient, TAG_ROMBERG, * MPI_COMM_WORLD, req, err) Call MPI_Request_free(req, err) Call MPI_Isend(span, 1, MPI_DOUBLE_PRECISION, * RANK_CLIENT_HEAD + iClient, TAG_ROMBERG, * MPI_COMM_WORLD, req, err) Call MPI_Request_free(req, err) C Integer*8 用のデータ型は今回は作りません C このコードは異なるバイトオーダのマシン間の通信には使えません C データ型を作るにはバイトオーダを判定して C ごちゃごちゃやる必要があるのです Call MPI_Isend(eachPoints, 2, MPI_INTEGER, * RANK_CLIENT_HEAD + iClient, TAG_ROMBERG, * MPI_COMM_WORLD, req, err) Call MPI_Request_free(req, err) iFirst = iFirst + eachPoints EndDo C マスターでも計算します Integrate = 0 Do i = iFirst, points - 1 C 数値誤差を少なくするために、xI = xI + span のようにはしません C 同様の理由で、i は整数にします Integrate = Integrate + fpInt(xI + i * span) EndDo C クライアントの結果と合わせます Do iClient = 1, clients Call MPI_Recv(eachInt, 1, MPI_DOUBLE_PRECISION, * RANK_CLIENT_HEAD + iClient, TAG_ROMBERG, * MPI_COMM_WORLD, MPI_STATUS_IGNORE, err) Integrate = Integrate + eachInt EndDo Return End C-----+--1----+----2----+----3----+----4----+----5----+----6----+----7-+ C Romberg 積分法でのクライアントの作業 C-----+--1----+----2----+----3----+----4----+----5----+----6----+----7-+ Subroutine RombergClient(fpInt) Include 'romberg_in.h' Double Precision fpInt ! 被積分関数 Double Precision eachInt ! 総和 Double Precision xI ! 始点 Double Precision span ! サンプリング幅 Integer*8 points ! サンプリング点の数 Integer*8 i Integer err Call MPI_Recv(xI, 1, MPI_DOUBLE_PRECISION, RANK_MASTER, * TAG_ROMBERG, MPI_COMM_WORLD, MPI_STATUS_IGNORE, err) Call MPI_Recv(span, 1, MPI_DOUBLE_PRECISION, RANK_MASTER, * TAG_ROMBERG, MPI_COMM_WORLD, MPI_STATUS_IGNORE, err) Call MPI_Recv(points, 2, MPI_INTEGER, RANK_MASTER, * TAG_ROMBERG, MPI_COMM_WORLD, MPI_STATUS_IGNORE, err) eachInt = 0 Do i = 0, points - 1 eachInt = eachInt + fpInt(xI + i * span) EndDo Call MPI_Send(eachInt, 1, MPI_DOUBLE_PRECISION, RANK_MASTER, * TAG_ROMBERG, MPI_COMM_WORLD, err) Return End C-----+--1----+----2----+----3----+----4----+----5----+----6----+----7-+ C C Richardson 補外 C C 台形公式で幅を半分ずつにしていった場合の Richardson 補外の式は C T_{ij} = \frac{4^j T_{i,j-1} - T_{i-1,j-1}}{4^j - 1} C となります。 C k 番目の台形公式の結果は T_{k1} で、 C k 番目の補外値は T_{kk} で得られます。 C C 次のような処理によって、効率良い計算が行えます。 C C ・バッファの推移 C C +----+----+----+----+----+ C | | 1 | 2 | 3 | 4 | C +----+====+----+----+----+ C | 1 |T_11| | | | C +----+====+====+ | | C | (2)|T_11|T_21| | | C +----+----+====+ | | C | | | / | | | C +----+====+----+ | | C | 2 |T_22|T_21| | | C +----+====+----+====+ | C | (3)|T_22|T_21|T_31| | C +----+----+----+====+ | C | | | /| / | | C +----+====+----+----+ | C | 3 |T_33|T_32|T_31| | C +----+====+----+----+====+ C | (4)|T_33|T_32|T_31|T_41| C +----+----+----+----+====+ C | | | /| /| / | C +----+====+----+----+----+ C | 4 |T_44|T_43|T_42|T_41| C +----+====+----+----+----+ C C-----+--1----+----2----+----3----+----4----+----5----+----6----+----7-+ Function Richardson(richbuf, depth) Include 'romberg_in.h' Double Precision richbuf(*) ! 作業バッファ Integer depth ! 補完次数 Double Precision pow4j /4.0D0/ Integer i Do i = depth, 2, -1 richbuf(i - 1) = * (pow4j * richbuf(i) - richbuf(i - 1)) / (pow4j - 1.0D0) pow4j = pow4j * 4.0D0 EndDo Richardson = richbuf(1) Return End C-----+--1----+----2----+----3----+----4----+----5----+----6----+----7-+ |
Last update was done on 2002.11.14