ソース 5.4.1 Romberg 積分法(C の場合)

maindefs.h
#ifndef ___MAINDEFS_H__20021125_2022_EOIJSDFOJ__INCLUDED___
#define ___MAINDEFS_H__20021125_2022_EOIJSDFOJ__INCLUDED___

#include <mpi.h>

#define NUMOF(array)      (sizeof (array) / sizeof *(array))
#define NUMOFT(type)      NUMOF(*(type*)0)
#define STRLEN(literal)   (NUMOF(literal) - 1)

/* デバッグ用出力 */
#ifdef NDEBUG
#define TRACE(params)
#else
#define TRACE(params)    fprintf params
#endif

/* 各環境に合わせて定義してください
 * 符号なし 64 ビット(8 バイト)変数です */
typedef unsigned long long UINT64;
#define FORMAT_UINT64 "%Lu"

enum TAG_KIND   /* 通信タグ */
{
  TAG_COMMAND,  /* 何を行うかの指令 */
  TAG_ROMBERG,  /* Romberg 積分法のデータを通信 */
};

enum RANK_KIND  /* 通信ランク */
{
  RANK_MASTER,       /* マスターのランク */
  RANK_CLIENT_HEAD,  /* クライアントのランクの先頭 */
};

enum COMMAND_TYPE  /* クライアントに送るコマンド */
{
  CMD_NULL,          /* 何もしません */
  CMD_END,           /* クライアントプログラムを終了します */
  CMD_ROMBERG_FUNC,  /* Func 関数で Romberg 積分法を行います */
};

/* クライアントを起こす */
void WakeupClients(
  int clients,  /* クライアントの数 */
  int command   /* クライアントに送るコマンド */
);

#endif  /* #ifndef ___MAINDEFS_H__20021125_2022_EOIJSDFOJ__INCLUDED___ */
main.c
#include <stdio.h>
#include <math.h>
#include "maindefs.h"
#include "romberg.h"

static void Master();
static void Client();

/* 今回使う被積分関数 */
static double Func(double x);

main(int argc, char** argv)
{
  int rank;

  MPI_Init(&argc, &argv);
  MPI_Comm_rank(MPI_COMM_WORLD, &rank);

  if(rank == RANK_MASTER)
    { Master(); }
  else
    { Client(); }

  MPI_Finalize();
}

/* マスターのメインルーチン */
static void Master()
{
  int clients;    /* クライアントの数 */
  double xN;      /* 積分の終点 */
  UINT64 divide;  /* 初期分割数 */
  double result;  /* 積分結果 */
  double before, after;

  MPI_Comm_size(MPI_COMM_WORLD, &clients);
  --clients;

  printf("積分範囲を入力して下さい > ");
  fflush(stdout);
  scanf("%lf", &xN);

  printf("初期分割数を入力して下さい > ");
  fflush(stdout);
  scanf(FORMAT_UINT64, &divide);

  before = MPI_Wtime();
  result = Romberg(0, xN, divide, clients, CMD_ROMBERG_FUNC, Func);
  after  = MPI_Wtime();
  printf("結果: %.15le (%.15le sec)\n", result, after - before);

  WakeupClients(clients, CMD_END);
}

/* クライアントのメインルーチン */
static void Client()
{
  /* コマンドを受け取ると、そのコマンドに応じた処理を始めます */
  for(; ;)
  {
    int command;
    MPI_Recv(&command, 1, MPI_INT, RANK_MASTER, TAG_COMMAND,
             MPI_COMM_WORLD, MPI_STATUS_IGNORE);

    switch(command)
    {
    /* CMD_NULL : 何もしません */
    case CMD_NULL:
      break;

    /* CMD_END : 終了します */
    case CMD_END:
      return;

    /* CMD_ROMBERG_FUNC : 被積分関数 Func の
     *                    Romberg 積分で使う並列ルーチン */
    case CMD_ROMBERG_FUNC:
      RombergClient(Func);
      break;
    }
  }
}

/**********************************************************************
 *                                                                    *
 *                            サブルーチン                            *
 *                                                                    *
 **********************************************************************/

/* クライアントを起こす */
void WakeupClients(
  int clients,  /* クライアントの数 */
  int command   /* クライアントに送るコマンド */
){
  int iClient;
  for(iClient = 0; iClient < clients; ++iClient)
  {
    MPI_Request req;
    MPI_Isend(&command, 1, MPI_INT, RANK_CLIENT_HEAD + iClient,
              TAG_COMMAND, MPI_COMM_WORLD, &req);
    MPI_Request_free(&req);
  }
}

/* 今回使う被積分関数 */
/* f(x) = x^5 sin(x^4) * cos(x^3) * exp(-x^2) * log(x + 1) */
static double Func(double x)
{
  const double x2 = x  * x;
  const double x3 = x2 * x;
  const double x4 = x2 * x2;
  const double x5 = x2 * x3;

  return x5 * sin(x4) * cos(x3) * exp(-x2) * log(x + 1);
}
romberg.h
#ifndef ___ROMBERG_H__20021125_2030_DJSOIAF_INCLUDED___
#define ___ROMBERG_H__20021125_2030_DJSOIAF_INCLUDED___

#include "maindefs.h"

/* Romberg 積分法 */
double Romberg(
  double x0, double xN,    /* 始点と終点 */
  UINT64 divide,           /* 分割数 */
  int clients,             /* クライアントの数 */
  int command,             /* クライアントに投げるコマンド */
  double (*fpInt)(double)  /* 被積分関数 */
);

/* Romberg 積分法でのクライアントの作業 */
void RombergClient(
  double (*fpInt)(double)  /* 被積分関数 */
);

#endif  /* #ifndef ___ROMBERG_H__20021125_2030_DJSOIAF_INCLUDED___ */
romberg.c
#include <stdio.h>
#include <math.h>
#include "maindefs.h"
#include "romberg.h"

/* Richardson 補外用のバッファのサイズ */
#define RICHARDSON_BUFSIZE   32

/* 和をとる際に使う情報を送る際に使う構造体 */
typedef struct ___dummy_SUM_T
{
  double xI;      /* 始点 */
  double span;    /* サンプリング幅 */
  UINT64 points;  /* サンプリング点の数 */
} SUM_T;

/* 関数の値の和をとります */
static double Integrate(
  double xI,               /* 始点 */
  double span,             /* サンプリング幅 */
  UINT64 points,           /* サンプリング点の数 */
  int clients,             /* クライアントの数 */
  int command,             /* クライアントに送るコマンド */
  double (*fpInt)(double)  /* 被積分関数 */
);

/* Richardson 補外
 * T_{ij} = \frac{4^j T_{i,j-1} - T_{i-1,j-1}}{4^j - 1} */
static double Richardson(
  double* richbuf,  /* 作業バッファ */
  int depth         /* 補完次数 */
);

/**********************************************************************
 *                                                                    *
 *                           Romberg 積分法                           *
 *                                                                    *
 **********************************************************************/

/*
 * 台形公式は、次のように書き換えることができます。
 *
 * S(h) = h/2 sum[k=1,N] {f(x_{k-1}) + f(x_k)}
 *      = h { (f(x_0) + f(x_N))/2 + sum[k=1,N-1] f(x_k) }
 *
 * 幅を半分にすると、
 *
 * S(h/2) = h/4 sum[k=1,2N] {f(x_{k-1}) + f(x_k)}
 *        = h{ 1/2 S(h)/h + 1/2 sum[k=1,N] f(x_{2k-1}) }
 *
 * さらに半分にすると、
 *
 * S(h/4) = h/8 sum[k=1,2N] {f(x_{k-1}) + f(x_k)}
 *        = h{ 1/2 S(h/2)/h + 1/4 sum[k=1,2N] f(x_{2k-1}) }
 *
 * これを続けると、
 *
 * S(h/(2^n)) = h{ 1/2 S(h/(2^n-1))/h + 1/(2^n) sum[k=1,nN] f(x_{2k-1}) }
 *
 * とすることができます。
 * なので、今までの積分の結果を再利用することができることになります。
 * 新しく計算しなくてはならないのは、
 * 今までに計算に含めたことのなかった点の和のみです。
 */
double Romberg(
  double x0, double xN,    /* 始点と終点 */
  UINT64 divide,           /* 分割数 */
  int clients,             /* クライアントの数 */
  int command,             /* クライアントに投げるコマンド */
  double (*fpInt)(double)  /* 被積分関数 */
){
  const double eps = 10e-15;              /* 許容誤差 */
  double mainSpan  = (xN - x0) / divide;  /* 最大幅 */
  double subSpan   = mainSpan;            /* 現在の幅 */

  double richbuf[RICHARDSON_BUFSIZE];  /* Richardson 補外用のバッファ */
  double diff;    /* 前回の計算とのずれ */
  double prev;    /* 前回の計算の値 */
  double weight;  /* 重み */
  UINT64 points;  /* 和をとる点の数 */
  int i;

  /* 1回目の計算 */
  prev = richbuf[0]
       = (fpInt(x0) + fpInt(xN)) / 2
         + Integrate(x0 + mainSpan, mainSpan, divide - 1,
                     clients, command, fpInt);

  points = divide;
  weight = 2;
  for(i = 1; i < RICHARDSON_BUFSIZE; ++i)
  {
    double now;

    /* 台形公式で積分値を求めます */
    richbuf[i] = richbuf[i - 1] / 2
      + Integrate(x0 + subSpan / 2, subSpan, points,
                  clients, command, fpInt) / weight;

    /* Richardson 補外を行います */
    now = Richardson(richbuf, i);

    /* 前回との差が小さければ終了 */
    diff = fabs(now - prev);
    TRACE((stderr, "%d "FORMAT_UINT64" %le %le\n",
      i, points * 2, diff, eps * fabs(now)));
    if(diff < eps * fabs(now))
      { return now * mainSpan; }

    prev     = now;
    subSpan /= 2;
    points  *= 2;
    weight  *= 2;
  }

  /* 補外失敗 */
  fprintf(stderr, "Error: %lf\n", diff);
  return prev * mainSpan;
}

/**********************************************************************
 *                                                                    *
 *                            サブルーチン                            *
 *                                                                    *
 **********************************************************************/

/* 関数の値の和をとります */
static double Integrate(
  double xI,               /* 始点 */
  double span,             /* サンプリング幅 */
  UINT64 points,           /* サンプリング点の数 */
  int clients,             /* クライアントの数 */
  int command,             /* クライアントに送るコマンド */
  double (*fpInt)(double)  /* 被積分関数 */
){
  double integrate;
  int eachPoints = points / (clients + 1);
  SUM_T data;
  int iClient;
  UINT64 iFirst;
  UINT64 i;

  /* クライアントを起こします */
  WakeupClients(clients, command);

  /* クライアントに投げます */
  iFirst = 0;
  data.span   = span;
  data.points = eachPoints;
  for(iClient = 0; iClient < clients; ++iClient)
  {
    MPI_Request req;

    data.xI = xI + iFirst * span;

    /* 全部同じ環境と仮定して、MPI_BYTE で送ります
     * データ型を作るのはここでは行いません */
    MPI_Isend(&data, sizeof data, MPI_BYTE, RANK_CLIENT_HEAD + iClient,
              TAG_ROMBERG, MPI_COMM_WORLD, &req);
    MPI_Request_free(&req);

    iFirst += eachPoints;
  }

  /* マスターでも計算します */
  integrate = 0;
  for(i = iFirst; i < points; ++i)
  {
    /* 数値誤差を少なくするために、xI += span; のようにはしません */
    /* 同様の理由で、i は整数にします */
    const double x = xI + i * span;
    integrate += fpInt(x);
  }

  /* クライアントの結果と合わせます */
  for(iClient = 0; iClient < clients; ++iClient)
  {
    double eachInt;
    MPI_Recv(&eachInt, 1, MPI_DOUBLE, RANK_CLIENT_HEAD + iClient,
             TAG_ROMBERG, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
    integrate += eachInt;
  }
  return integrate;
}

/* Romberg 積分法でのクライアントの作業 */
void RombergClient(
  double (*fpInt)(double)  /* 被積分関数 */
){
  double integrate = 0;
  SUM_T data;  /* 通信用構造体 */
  UINT64 i;

  MPI_Recv(&data, sizeof data, MPI_BYTE, RANK_MASTER,
           TAG_ROMBERG, MPI_COMM_WORLD, MPI_STATUS_IGNORE);

  for(i = 0; i < data.points; ++i)
  {
    const double x = data.xI + i * data.span;
    integrate += fpInt(x);
  }

  MPI_Send(&integrate, 1, MPI_DOUBLE, RANK_MASTER,
           TAG_ROMBERG, MPI_COMM_WORLD);
}

/*
 * Richardson 補外
 *
 * 台形公式で幅を半分ずつにしていった場合の Richardson 補外の式は
 *   T_{ij} = \frac{4^j T_{i,j-1} - T_{i-1,j-1}}{4^j - 1}
 * となります。
 * k 番目の台形公式の結果は T_{k1} で、
 * k 番目の補外値は T_{kk} で得られます。
 *
 * 一見再帰を使って求めればいいように見えますが、
 * 無駄なバッファを使わなくても再帰をせず、しかも軽い処理ができます。
 *
 * ・バッファの推移
 *
 * +----+----+----+----+----+
 * |    |  0 |  1 |  2 |  3 |
 * +----+====+----+----+----+
 * |  0 |T_00|    |    |    |
 * +----+====+====+    |    |
 * | (1)|T_00|T_10|    |    |
 * +----+----+====+    |    |
 * |    | | /   |    |    |
 * +----+====+----+    |    |
 * |  1 |T_11|T_10|    |    |
 * +----+====+----+====+    |
 * | (2)|T_11|T_10|T_20|    |
 * +----+----+----+====+    |
 * |    | | /| /   |    |
 * +----+====+----+----+    |
 * |  2 |T_22|T_21|T_20|    |
 * +----+====+----+----+====+
 * | (3)|T_11|T_10|T_20|T_30|
 * +----+----+----+----+====+
 * |    | | /| /| /   |
 * +----+====+----+----+----+
 * |  3 |T_33|T_32|T_31|T_30|
 * +----+====+----+----+----+
 *
 * 何でも再帰でやればいいってもんでもないのです。
 */
static double Richardson(
  double* richbuf,  /* 作業バッファ */
  int depth         /* 補完次数 */
){
  double pow4j = 4;
  int i;

  for(i = depth; i > 0; --i)
  {
    richbuf[i - 1] =
      (pow4j * richbuf[i] - richbuf[i - 1]) / (pow4j - 1);
    pow4j *= 4;
  }
  return richbuf[0];
}

Last update was done on 2002.11.14