ソース 5.4.2 Romberg 積分法(FORTRAN77 の場合)

debug.h
/* デバッグ用のギミック */

#ifndef ___DEBUG_H__20021129_1712_FKWLEJFLEW__INCLUDED___
#define ___DEBUG_H__20021129_1712_FKWLEJFLEW__INCLUDED___

/* デバッグ用出力 */
#ifdef NDEBUG
#define TRACE    !
#define TNEXT    !
#else
#define TRACE    Write
#define TNEXT    *
#endif

#endif  /* #ifndef ___DEBUG_H__20021129_1712_FKWLEJFLEW__INCLUDED___ */
maindefs.h
C-----+--1----+----2----+----3----+----4----+----5----+----6----+----7-+
      Implicit None
      Include 'mpif.h'
C-----+--1----+----2----+----3----+----4----+----5----+----6----+----7-+
C     標準エラー出力
C     コンパイラによって値が違う可能性があります
C     また、標準エラー出力に対応していなければ
C     標準出力 6 にしておいて下さい
C-----+--1----+----2----+----3----+----4----+----5----+----6----+----7-+
      Integer STDERR
        Parameter (STDERR = 0)
C-----+--1----+----2----+----3----+----4----+----5----+----6----+----7-+
C     通信タグ
C-----+--1----+----2----+----3----+----4----+----5----+----6----+----7-+
      Integer TAG_COMMAND, TAG_ROMBERG
        Parameter (
     *    TAG_COMMAND = 0,  ! 何を行うかの指令
     *    TAG_ROMBERG = 1   ! Romberg 積分法のデータを通信
     *  )
C-----+--1----+----2----+----3----+----4----+----5----+----6----+----7-+
C     通信ランク
C-----+--1----+----2----+----3----+----4----+----5----+----6----+----7-+
      Integer RANK_MASTER, RANK_CLIENT_HEAD
        Parameter (
     *    RANK_MASTER      = 0,  ! マスターのランク
     *    RANK_CLIENT_HEAD = 0   ! クライアントのランクの先頭 - 1
     *       ! (Fortran は基本的に 1 オリジンなので、こうしておきます)
     *  )
C-----+--1----+----2----+----3----+----4----+----5----+----6----+----7-+
C     クライアントに送るコマンド
C-----+--1----+----2----+----3----+----4----+----5----+----6----+----7-+
      Integer CMD_NULL, CMD_END, CMD_ROMBERG_FUNC
        Parameter (
     *    CMD_NULL         = 0,  ! 何もしません
     *    CMD_END          = 1,  ! クライアントプログラムを終了します
     *    CMD_ROMBERG_FUNC = 2   ! Func 関数で Romberg 積分法を行います
     *  )
C-----+--1----+----2----+----3----+----4----+----5----+----6----+----7-+
main_in.h
C-----+--1----+----2----+----3----+----4----+----5----+----6----+----7-+
      Include 'maindefs.h'
      Include 'romberg.h'
C-----+--1----+----2----+----3----+----4----+----5----+----6----+----7-+
C     関数の型
C-----+--1----+----2----+----3----+----4----+----5----+----6----+----7-+
C     今回使う被積分関数
      Double Precision Func
C-----+--1----+----2----+----3----+----4----+----5----+----6----+----7-+
main.F
C デバッグ用のヘッダファイル
#include "debug.h"

C-----+--1----+----2----+----3----+----4----+----5----+----6----+----7-+
C                                                                      |
C                            メインルーチン                            |
C                                                                      |
C-----+--1----+----2----+----3----+----4----+----5----+----6----+----7-+
      Program RombergTest
      Include 'main_in.h'

      Integer rank
      Integer err

        Call MPI_Init(err)
        Call MPI_Comm_rank(MPI_COMM_WORLD, rank, err)

        If(rank .eq. RANK_MASTER) then
          Call Master()
        Else
          Call Client()
        EndIf

        Call MPI_Finalize(err)

        Stop
      End
C-----+--1----+----2----+----3----+----4----+----5----+----6----+----7-+
C     マスターのメインルーチン
C-----+--1----+----2----+----3----+----4----+----5----+----6----+----7-+
      Subroutine Master()
      Include 'main_in.h'
      External Func

        Integer          clients  ! クライアントの数
        Double Precision xN       ! 積分の終点
        Integer*8        divide   ! 初期分割数
        Double Precision result   ! 積分結果
        Double Precision before, after
        Integer          err

        Call MPI_Comm_size(MPI_COMM_WORLD, clients, err)
        clients = clients - 1

        Write(*,*) '積分範囲を入力して下さい'
        Read(*,*)  xN

        Write(*,*) '初期分割数を入力して下さい'
        Read(*,*)  divide

        before = MPI_Wtime()
        result = Romberg(0, xN, divide, clients, CMD_ROMBERG_FUNC, Func)
        after  = MPI_Wtime()
        Write(*,10) result, after - before
   10     Format('結果: ', D21.15, ' (', D21.15, ' sec)')

        Call WakeupClients(clients, CMD_END)

        Return
      End
C-----+--1----+----2----+----3----+----4----+----5----+----6----+----7-+
C     クライアントのメインルーチン
C-----+--1----+----2----+----3----+----4----+----5----+----6----+----7-+
      Subroutine Client()
      Include 'main_in.h'
      External Func

        Integer command
        Integer err

C       コマンドを受け取ると、そのコマンドに応じた処理を始めます
   10   Continue
          Call MPI_Recv(command, 1, MPI_INTEGER, RANK_MASTER,
     *      TAG_COMMAND, MPI_COMM_WORLD, MPI_STATUS_IGNORE, err)

C         CMD_NULL : 何もしません
          If(command .eq. CMD_NULL) Then
          Else
     *
C         CMD_END : 終了します
     *    If(command .eq. CMD_END) Then
            Return
          Else
     *
C         CMD_ROMBERG_FUNC : 被積分関数 Func の
C                            Romberg 積分で使う並列ルーチン
     *    If(command .eq. CMD_ROMBERG_FUNC) Then
            Call RombergClient(Func)
          EndIf
        Goto 10

        Return
      End
C-----+--1----+----2----+----3----+----4----+----5----+----6----+----7-+
C                                                                      |
C                             サブルーチン                             |
C                                                                      |
C-----+--1----+----2----+----3----+----4----+----5----+----6----+----7-+
C     クライアントを起こします
C-----+--1----+----2----+----3----+----4----+----5----+----6----+----7-+
      Subroutine WakeupClients(clients, command)
      Include 'main_in.h'

      Integer clients  ! クライアントの数
      Integer command  ! クライアントに送るコマンド

        Integer iClient
        Integer req
        Integer err

        Do iClient = 1, clients
          Call MPI_Isend(command, 1, MPI_INTEGER,
     *      RANK_CLIENT_HEAD + iClient, TAG_COMMAND,
     *      MPI_COMM_WORLD, req, err)
          Call MPI_Request_free(req, err)
        EndDo

        Return
      End
C-----+--1----+----2----+----3----+----4----+----5----+----6----+----7-+
C     今回使う被積分関数
C     f(x) = x^5 sin(x^4) * cos(x^3) * exp(-x^2) * log(x + 1)
C-----+--1----+----2----+----3----+----4----+----5----+----6----+----7-+
      Function Func(x)
      Include 'main_in.h'
      Double Precision x

        Double Precision x2
        Double Precision x3
        Double Precision x4
        Double Precision x5

        x2 = x  * x
        x3 = x2 * x
        x4 = x2 * x2
        x5 = x2 * x3

        Func = x5 * sin(x4) * cos(x3) * exp(-x2) * log(x + 1.0D0)

        Return
      End
C-----+--1----+----2----+----3----+----4----+----5----+----6----+----7-+
romberg.h
C-----+--1----+----2----+----3----+----4----+----5----+----6----+----7-+
C     関数の型
C-----+--1----+----2----+----3----+----4----+----5----+----6----+----7-+
C     Romberg 積分法
      Double Precision Romberg
C-----+--1----+----2----+----3----+----4----+----5----+----6----+----7-+
romberg_in.h
C-----+--1----+----2----+----3----+----4----+----5----+----6----+----7-+
      Include 'maindefs.h'
      Include 'romberg.h'
C-----+--1----+----2----+----3----+----4----+----5----+----6----+----7-+
C     Richardson 補外用のバッファのサイズ
      Integer RICHARDSON_BUFSIZE
        Parameter (RICHARDSON_BUFSIZE = 32)

C     許容誤差
      Double Precision EPS
        Parameter (EPS = 1.0D-15)
C-----+--1----+----2----+----3----+----4----+----5----+----6----+----7-+
C     関数の型
C-----+--1----+----2----+----3----+----4----+----5----+----6----+----7-+
C     関数の値の和をとります
      Double Precision Integrate
C     Richardson 補外
      Double Precision Richardson
C-----+--1----+----2----+----3----+----4----+----5----+----6----+----7-+
romberg.F
C デバッグ用のヘッダファイル
#include "debug.h"

C-----+--1----+----2----+----3----+----4----+----5----+----6----+----7-+
C                                                                      |
C                            Romberg 積分法                            |
C                                                                      |
C-----+--1----+----2----+----3----+----4----+----5----+----6----+----7-+
C
C 台形公式は、次のように書き換えることができます。
C
C S(h) = h/2 sum[k=1,N] {f(x_{k-1}) + f(x_k)}
C      = h { (f(x_0) + f(x_N))/2 + sum[k=1,N-1] f(x_k) }
C
C 幅を半分にすると、
C
C S(h/2) = h/4 sum[k=1,2N] {f(x_{k-1}) + f(x_k)}
C        = h{ 1/2 S(h)/h + 1/2 sum[k=1,N] f(x_{2k-1}) }
C
C さらに半分にすると、
C
C S(h/4) = h/8 sum[k=1,2N] {f(x_{k-1}) + f(x_k)}
C        = h{ 1/2 S(h/2)/h + 1/4 sum[k=1,2N] f(x_{2k-1}) }
C
C これを続けると、
C
C S(h/(2^n)) = h{ 1/2 S(h/(2^n-1))/h + 1/(2^n) sum[k=1,nN] f(x_{2k-1}) }
C
C とすることができます。
C なので、今までの積分の結果を再利用することができることになります。
C 新しく計算しなくてはならないのは、
C 今までに計算に含めたことのなかった点の和のみです。
C
C-----+--1----+----2----+----3----+----4----+----5----+----6----+----7-+
      Function Romberg(x0, xN, divide, clients, command, fpInt)
      Include 'romberg_in.h'

      Double Precision x0, xN   ! 始点と終点
      Integer*8        divide   ! 分割数
      Integer          clients  ! クライアントの数
      Integer          command  ! クライアントに投げるコマンド
      Double Precision fpInt    ! 被積分関数
        External       fpInt

        Double Precision mainSpan  ! 最大幅
        Double Precision subSpan   ! 現在の幅
        Double Precision richbuf(RICHARDSON_BUFSIZE)
                                   ! Richardson 補外用のバッファ
        Double Precision diff      ! 前回の計算とのずれ
        Double Precision prev      ! 前回の計算の値
        Double Precision now       ! 今回の計算の値
        Double Precision weight    ! 重み
        Integer*8        points    ! 和をとる点の数
        Integer          i
        Integer          err

        mainSpan = (xN - x0) / divide
        subSpan  = mainSpan

C       1回目の計算
        prev = (fpInt(x0) + fpInt(xN)) / 2.0D0
     *    + Integrate(x0 + mainSpan, mainSpan, divide - 1, clients,
     *                command, fpInt)
        richbuf(1) = prev

        points = divide
        weight = 2.0D0

        Do i = 2, RICHARDSON_BUFSIZE
C         台形公式で積分値を求めます
          richbuf(i) = richbuf(i - 1) / 2.0D0
     *      + Integrate(x0 + subSpan / 2, subSpan, points, clients,
     *                  command, fpInt) / weight

C         Richardson 補外を行います
          now = Richardson(richbuf, i)

C         前回との差が小さければ終了
          diff = Abs(now - prev)
          TRACE(STDERR, 10) i, points * 2, diff, EPS * Abs(now)
   10       Format(I3, I20, D19.10, D19.10)
          If(diff .lt. EPS * Abs(now)) Then
            Romberg = now * mainSpan
            Return
          EndIf

          prev    = now
          subSpan = subSpan / 2.0D0
          points  = points  * 2
          weight  = weight  * 2.0D0
        EndDo

C       補外失敗
        Write(STDERR, *) 'Error: ', diff
        Romberg = prev * mainSpan

        Return
      End
C-----+--1----+----2----+----3----+----4----+----5----+----6----+----7-+
C                                                                      |
C                             サブルーチン                             |
C                                                                      |
C-----+--1----+----2----+----3----+----4----+----5----+----6----+----7-+
C     関数の値の和をとります
C-----+--1----+----2----+----3----+----4----+----5----+----6----+----7-+
      Function Integrate(xI, span, points, clients, command, fpInt)
      Include 'romberg_in.h'

      Double Precision xI       ! 始点
      Double Precision span     ! サンプリング幅
      Integer*8        points   ! サンプリング点の数
      Integer          clients  ! クライアントの数
      Integer          command  ! クライアントに送るコマンド
      Double Precision fpInt    ! 被積分関数

        Integer*8        eachPoints  ! クライアントあたりの点の数
        Double Precision eachxI      ! 各クライアントの始点
        Double Precision eachInt     ! 各クライアントで計算した和
        Integer          iClient
        Integer*8        iFirst
        Integer*8        i
        Integer          req
        Integer          err

        eachPoints = points / (clients + 1)

C       クライアントを起こします
        Call WakeupClients(clients, command)

C       クライアントにデータを投げます
        iFirst = 0
        Do iClient = 1, clients
          eachxI = xI + iFirst * span

C         構造体が使えないので、1つずつ送ります
C         通信の順番は決まっているので、別のタグを使う必要はありません
          Call MPI_Isend(eachxI, 1, MPI_DOUBLE_PRECISION,
     *                   RANK_CLIENT_HEAD + iClient, TAG_ROMBERG,
     *                   MPI_COMM_WORLD, req, err)
          Call MPI_Request_free(req, err)

          Call MPI_Isend(span, 1, MPI_DOUBLE_PRECISION,
     *                   RANK_CLIENT_HEAD + iClient, TAG_ROMBERG,
     *                   MPI_COMM_WORLD, req, err)
          Call MPI_Request_free(req, err)

C         Integer*8 用のデータ型は今回は作りません
C         このコードは異なるバイトオーダのマシン間の通信には使えません
C         データ型を作るにはバイトオーダを判定して
C         ごちゃごちゃやる必要があるのです
          Call MPI_Isend(eachPoints, 2, MPI_INTEGER,
     *                   RANK_CLIENT_HEAD + iClient, TAG_ROMBERG,
     *                   MPI_COMM_WORLD, req, err)
          Call MPI_Request_free(req, err)

          iFirst = iFirst + eachPoints
        EndDo

C       マスターでも計算します
        Integrate = 0
        Do i = iFirst, points - 1
C         数値誤差を少なくするために、xI = xI + span のようにはしません
C         同様の理由で、i は整数にします
          Integrate = Integrate + fpInt(xI + i * span)
        EndDo

C       クライアントの結果と合わせます
        Do iClient = 1, clients
          Call MPI_Recv(eachInt, 1, MPI_DOUBLE_PRECISION,
     *                  RANK_CLIENT_HEAD + iClient, TAG_ROMBERG,
     *                  MPI_COMM_WORLD, MPI_STATUS_IGNORE, err)
          Integrate = Integrate + eachInt
        EndDo

        Return
      End
C-----+--1----+----2----+----3----+----4----+----5----+----6----+----7-+
C     Romberg 積分法でのクライアントの作業
C-----+--1----+----2----+----3----+----4----+----5----+----6----+----7-+
      Subroutine RombergClient(fpInt)
      Include 'romberg_in.h'

      Double Precision fpInt  ! 被積分関数

        Double Precision eachInt  ! 総和
        Double Precision xI       ! 始点
        Double Precision span     ! サンプリング幅
        Integer*8        points   ! サンプリング点の数
        Integer*8        i
        Integer          err

        Call MPI_Recv(xI, 1, MPI_DOUBLE_PRECISION, RANK_MASTER,
     *    TAG_ROMBERG, MPI_COMM_WORLD, MPI_STATUS_IGNORE, err)
        Call MPI_Recv(span, 1, MPI_DOUBLE_PRECISION, RANK_MASTER,
     *    TAG_ROMBERG, MPI_COMM_WORLD, MPI_STATUS_IGNORE, err)
        Call MPI_Recv(points, 2, MPI_INTEGER, RANK_MASTER,
     *    TAG_ROMBERG, MPI_COMM_WORLD, MPI_STATUS_IGNORE, err)

        eachInt = 0
        Do i = 0, points - 1
          eachInt = eachInt + fpInt(xI + i * span)
        EndDo

        Call MPI_Send(eachInt, 1, MPI_DOUBLE_PRECISION, RANK_MASTER,
     *    TAG_ROMBERG, MPI_COMM_WORLD, err)

        Return
      End
C-----+--1----+----2----+----3----+----4----+----5----+----6----+----7-+
C
C Richardson 補外
C
C 台形公式で幅を半分ずつにしていった場合の Richardson 補外の式は
C   T_{ij} = \frac{4^j T_{i,j-1} - T_{i-1,j-1}}{4^j - 1}
C となります。
C k 番目の台形公式の結果は T_{k1} で、
C k 番目の補外値は T_{kk} で得られます。
C
C 次のような処理によって、効率良い計算が行えます。
C
C ・バッファの推移
C
C +----+----+----+----+----+
C |    |  1 |  2 |  3 |  4 |
C +----+====+----+----+----+
C |  1 |T_11|    |    |    |
C +----+====+====+    |    |
C | (2)|T_11|T_21|    |    |
C +----+----+====+    |    |
C |    | | /   |    |    |
C +----+====+----+    |    |
C |  2 |T_22|T_21|    |    |
C +----+====+----+====+    |
C | (3)|T_22|T_21|T_31|    |
C +----+----+----+====+    |
C |    | | /| /   |    |
C +----+====+----+----+    |
C |  3 |T_33|T_32|T_31|    |
C +----+====+----+----+====+
C | (4)|T_33|T_32|T_31|T_41|
C +----+----+----+----+====+
C |    | | /| /| /   |
C +----+====+----+----+----+
C |  4 |T_44|T_43|T_42|T_41|
C +----+====+----+----+----+
C
C-----+--1----+----2----+----3----+----4----+----5----+----6----+----7-+
      Function Richardson(richbuf, depth)
      Include 'romberg_in.h'

      Double Precision richbuf(*)  ! 作業バッファ
      Integer          depth       ! 補完次数

        Double Precision pow4j /4.0D0/
        Integer          i

        Do i = depth, 2, -1
          richbuf(i - 1) =
     *      (pow4j * richbuf(i) - richbuf(i - 1)) / (pow4j - 1.0D0)
          pow4j = pow4j * 4.0D0
        EndDo

        Richardson = richbuf(1)

        Return
      End
C-----+--1----+----2----+----3----+----4----+----5----+----6----+----7-+

Last update was done on 2002.11.14