ソース 5.4.1 Romberg 積分法(C の場合)
| maindefs.h |
|---|
#ifndef ___MAINDEFS_H__20021125_2022_EOIJSDFOJ__INCLUDED___
#define ___MAINDEFS_H__20021125_2022_EOIJSDFOJ__INCLUDED___
#include <mpi.h>
#define NUMOF(array) (sizeof (array) / sizeof *(array))
#define NUMOFT(type) NUMOF(*(type*)0)
#define STRLEN(literal) (NUMOF(literal) - 1)
/* デバッグ用出力 */
#ifdef NDEBUG
#define TRACE(params)
#else
#define TRACE(params) fprintf params
#endif
/* 各環境に合わせて定義してください
* 符号なし 64 ビット(8 バイト)変数です */
typedef unsigned long long UINT64;
#define FORMAT_UINT64 "%Lu"
enum TAG_KIND /* 通信タグ */
{
TAG_COMMAND, /* 何を行うかの指令 */
TAG_ROMBERG, /* Romberg 積分法のデータを通信 */
};
enum RANK_KIND /* 通信ランク */
{
RANK_MASTER, /* マスターのランク */
RANK_CLIENT_HEAD, /* クライアントのランクの先頭 */
};
enum COMMAND_TYPE /* クライアントに送るコマンド */
{
CMD_NULL, /* 何もしません */
CMD_END, /* クライアントプログラムを終了します */
CMD_ROMBERG_FUNC, /* Func 関数で Romberg 積分法を行います */
};
/* クライアントを起こす */
void WakeupClients(
int clients, /* クライアントの数 */
int command /* クライアントに送るコマンド */
);
#endif /* #ifndef ___MAINDEFS_H__20021125_2022_EOIJSDFOJ__INCLUDED___ */ |
| main.c |
#include <stdio.h>
#include <math.h>
#include "maindefs.h"
#include "romberg.h"
static void Master();
static void Client();
/* 今回使う被積分関数 */
static double Func(double x);
main(int argc, char** argv)
{
int rank;
MPI_Init(&argc, &argv);
MPI_Comm_rank(MPI_COMM_WORLD, &rank);
if(rank == RANK_MASTER)
{ Master(); }
else
{ Client(); }
MPI_Finalize();
}
/* マスターのメインルーチン */
static void Master()
{
int clients; /* クライアントの数 */
double xN; /* 積分の終点 */
UINT64 divide; /* 初期分割数 */
double result; /* 積分結果 */
double before, after;
MPI_Comm_size(MPI_COMM_WORLD, &clients);
--clients;
printf("積分範囲を入力して下さい > ");
fflush(stdout);
scanf("%lf", &xN);
printf("初期分割数を入力して下さい > ");
fflush(stdout);
scanf(FORMAT_UINT64, ÷);
before = MPI_Wtime();
result = Romberg(0, xN, divide, clients, CMD_ROMBERG_FUNC, Func);
after = MPI_Wtime();
printf("結果: %.15le (%.15le sec)\n", result, after - before);
WakeupClients(clients, CMD_END);
}
/* クライアントのメインルーチン */
static void Client()
{
/* コマンドを受け取ると、そのコマンドに応じた処理を始めます */
for(; ;)
{
int command;
MPI_Recv(&command, 1, MPI_INT, RANK_MASTER, TAG_COMMAND,
MPI_COMM_WORLD, MPI_STATUS_IGNORE);
switch(command)
{
/* CMD_NULL : 何もしません */
case CMD_NULL:
break;
/* CMD_END : 終了します */
case CMD_END:
return;
/* CMD_ROMBERG_FUNC : 被積分関数 Func の
* Romberg 積分で使う並列ルーチン */
case CMD_ROMBERG_FUNC:
RombergClient(Func);
break;
}
}
}
/**********************************************************************
* *
* サブルーチン *
* *
**********************************************************************/
/* クライアントを起こす */
void WakeupClients(
int clients, /* クライアントの数 */
int command /* クライアントに送るコマンド */
){
int iClient;
for(iClient = 0; iClient < clients; ++iClient)
{
MPI_Request req;
MPI_Isend(&command, 1, MPI_INT, RANK_CLIENT_HEAD + iClient,
TAG_COMMAND, MPI_COMM_WORLD, &req);
MPI_Request_free(&req);
}
}
/* 今回使う被積分関数 */
/* f(x) = x^5 sin(x^4) * cos(x^3) * exp(-x^2) * log(x + 1) */
static double Func(double x)
{
const double x2 = x * x;
const double x3 = x2 * x;
const double x4 = x2 * x2;
const double x5 = x2 * x3;
return x5 * sin(x4) * cos(x3) * exp(-x2) * log(x + 1);
} |
| romberg.h |
#ifndef ___ROMBERG_H__20021125_2030_DJSOIAF_INCLUDED___ #define ___ROMBERG_H__20021125_2030_DJSOIAF_INCLUDED___ #include "maindefs.h" /* Romberg 積分法 */ double Romberg( double x0, double xN, /* 始点と終点 */ UINT64 divide, /* 分割数 */ int clients, /* クライアントの数 */ int command, /* クライアントに投げるコマンド */ double (*fpInt)(double) /* 被積分関数 */ ); /* Romberg 積分法でのクライアントの作業 */ void RombergClient( double (*fpInt)(double) /* 被積分関数 */ ); #endif /* #ifndef ___ROMBERG_H__20021125_2030_DJSOIAF_INCLUDED___ */ |
| romberg.c |
#include <stdio.h>
#include <math.h>
#include "maindefs.h"
#include "romberg.h"
/* Richardson 補外用のバッファのサイズ */
#define RICHARDSON_BUFSIZE 32
/* 和をとる際に使う情報を送る際に使う構造体 */
typedef struct ___dummy_SUM_T
{
double xI; /* 始点 */
double span; /* サンプリング幅 */
UINT64 points; /* サンプリング点の数 */
} SUM_T;
/* 関数の値の和をとります */
static double Integrate(
double xI, /* 始点 */
double span, /* サンプリング幅 */
UINT64 points, /* サンプリング点の数 */
int clients, /* クライアントの数 */
int command, /* クライアントに送るコマンド */
double (*fpInt)(double) /* 被積分関数 */
);
/* Richardson 補外
* T_{ij} = \frac{4^j T_{i,j-1} - T_{i-1,j-1}}{4^j - 1} */
static double Richardson(
double* richbuf, /* 作業バッファ */
int depth /* 補完次数 */
);
/**********************************************************************
* *
* Romberg 積分法 *
* *
**********************************************************************/
/*
* 台形公式は、次のように書き換えることができます。
*
* S(h) = h/2 sum[k=1,N] {f(x_{k-1}) + f(x_k)}
* = h { (f(x_0) + f(x_N))/2 + sum[k=1,N-1] f(x_k) }
*
* 幅を半分にすると、
*
* S(h/2) = h/4 sum[k=1,2N] {f(x_{k-1}) + f(x_k)}
* = h{ 1/2 S(h)/h + 1/2 sum[k=1,N] f(x_{2k-1}) }
*
* さらに半分にすると、
*
* S(h/4) = h/8 sum[k=1,2N] {f(x_{k-1}) + f(x_k)}
* = h{ 1/2 S(h/2)/h + 1/4 sum[k=1,2N] f(x_{2k-1}) }
*
* これを続けると、
*
* S(h/(2^n)) = h{ 1/2 S(h/(2^n-1))/h + 1/(2^n) sum[k=1,nN] f(x_{2k-1}) }
*
* とすることができます。
* なので、今までの積分の結果を再利用することができることになります。
* 新しく計算しなくてはならないのは、
* 今までに計算に含めたことのなかった点の和のみです。
*/
double Romberg(
double x0, double xN, /* 始点と終点 */
UINT64 divide, /* 分割数 */
int clients, /* クライアントの数 */
int command, /* クライアントに投げるコマンド */
double (*fpInt)(double) /* 被積分関数 */
){
const double eps = 10e-15; /* 許容誤差 */
double mainSpan = (xN - x0) / divide; /* 最大幅 */
double subSpan = mainSpan; /* 現在の幅 */
double richbuf[RICHARDSON_BUFSIZE]; /* Richardson 補外用のバッファ */
double diff; /* 前回の計算とのずれ */
double prev; /* 前回の計算の値 */
double weight; /* 重み */
UINT64 points; /* 和をとる点の数 */
int i;
/* 1回目の計算 */
prev = richbuf[0]
= (fpInt(x0) + fpInt(xN)) / 2
+ Integrate(x0 + mainSpan, mainSpan, divide - 1,
clients, command, fpInt);
points = divide;
weight = 2;
for(i = 1; i < RICHARDSON_BUFSIZE; ++i)
{
double now;
/* 台形公式で積分値を求めます */
richbuf[i] = richbuf[i - 1] / 2
+ Integrate(x0 + subSpan / 2, subSpan, points,
clients, command, fpInt) / weight;
/* Richardson 補外を行います */
now = Richardson(richbuf, i);
/* 前回との差が小さければ終了 */
diff = fabs(now - prev);
TRACE((stderr, "%d "FORMAT_UINT64" %le %le\n",
i, points * 2, diff, eps * fabs(now)));
if(diff < eps * fabs(now))
{ return now * mainSpan; }
prev = now;
subSpan /= 2;
points *= 2;
weight *= 2;
}
/* 補外失敗 */
fprintf(stderr, "Error: %lf\n", diff);
return prev * mainSpan;
}
/**********************************************************************
* *
* サブルーチン *
* *
**********************************************************************/
/* 関数の値の和をとります */
static double Integrate(
double xI, /* 始点 */
double span, /* サンプリング幅 */
UINT64 points, /* サンプリング点の数 */
int clients, /* クライアントの数 */
int command, /* クライアントに送るコマンド */
double (*fpInt)(double) /* 被積分関数 */
){
double integrate;
int eachPoints = points / (clients + 1);
SUM_T data;
int iClient;
UINT64 iFirst;
UINT64 i;
/* クライアントを起こします */
WakeupClients(clients, command);
/* クライアントに投げます */
iFirst = 0;
data.span = span;
data.points = eachPoints;
for(iClient = 0; iClient < clients; ++iClient)
{
MPI_Request req;
data.xI = xI + iFirst * span;
/* 全部同じ環境と仮定して、MPI_BYTE で送ります
* データ型を作るのはここでは行いません */
MPI_Isend(&data, sizeof data, MPI_BYTE, RANK_CLIENT_HEAD + iClient,
TAG_ROMBERG, MPI_COMM_WORLD, &req);
MPI_Request_free(&req);
iFirst += eachPoints;
}
/* マスターでも計算します */
integrate = 0;
for(i = iFirst; i < points; ++i)
{
/* 数値誤差を少なくするために、xI += span; のようにはしません */
/* 同様の理由で、i は整数にします */
const double x = xI + i * span;
integrate += fpInt(x);
}
/* クライアントの結果と合わせます */
for(iClient = 0; iClient < clients; ++iClient)
{
double eachInt;
MPI_Recv(&eachInt, 1, MPI_DOUBLE, RANK_CLIENT_HEAD + iClient,
TAG_ROMBERG, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
integrate += eachInt;
}
return integrate;
}
/* Romberg 積分法でのクライアントの作業 */
void RombergClient(
double (*fpInt)(double) /* 被積分関数 */
){
double integrate = 0;
SUM_T data; /* 通信用構造体 */
UINT64 i;
MPI_Recv(&data, sizeof data, MPI_BYTE, RANK_MASTER,
TAG_ROMBERG, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
for(i = 0; i < data.points; ++i)
{
const double x = data.xI + i * data.span;
integrate += fpInt(x);
}
MPI_Send(&integrate, 1, MPI_DOUBLE, RANK_MASTER,
TAG_ROMBERG, MPI_COMM_WORLD);
}
/*
* Richardson 補外
*
* 台形公式で幅を半分ずつにしていった場合の Richardson 補外の式は
* T_{ij} = \frac{4^j T_{i,j-1} - T_{i-1,j-1}}{4^j - 1}
* となります。
* k 番目の台形公式の結果は T_{k1} で、
* k 番目の補外値は T_{kk} で得られます。
*
* 一見再帰を使って求めればいいように見えますが、
* 無駄なバッファを使わなくても再帰をせず、しかも軽い処理ができます。
*
* ・バッファの推移
*
* +----+----+----+----+----+
* | | 0 | 1 | 2 | 3 |
* +----+====+----+----+----+
* | 0 |T_00| | | |
* +----+====+====+ | |
* | (1)|T_00|T_10| | |
* +----+----+====+ | |
* | | | / | | |
* +----+====+----+ | |
* | 1 |T_11|T_10| | |
* +----+====+----+====+ |
* | (2)|T_11|T_10|T_20| |
* +----+----+----+====+ |
* | | | /| / | |
* +----+====+----+----+ |
* | 2 |T_22|T_21|T_20| |
* +----+====+----+----+====+
* | (3)|T_11|T_10|T_20|T_30|
* +----+----+----+----+====+
* | | | /| /| / |
* +----+====+----+----+----+
* | 3 |T_33|T_32|T_31|T_30|
* +----+====+----+----+----+
*
* 何でも再帰でやればいいってもんでもないのです。
*/
static double Richardson(
double* richbuf, /* 作業バッファ */
int depth /* 補完次数 */
){
double pow4j = 4;
int i;
for(i = depth; i > 0; --i)
{
richbuf[i - 1] =
(pow4j * richbuf[i] - richbuf[i - 1]) / (pow4j - 1);
pow4j *= 4;
}
return richbuf[0];
} |
Last update was done on 2002.11.14