关于“RPC语言”
RPC语言也是一种专门的编程语言,当然这里我们不需要知道太多,只需要能看懂下面这种基本结构就行了:
program TESTPROG {
version VERSION {
string TEST(string) = 1;
} = 1;
} = 87654321;
这里TESTPROG和VERSION是两个变量,用于标识一个单独的RPC接口。这被RPC服务程序,比如portmap用到,我们可以不用关心,变量名字也是随便取的。但取值要在你的系统中是唯一的。
“string TEST(string) = 1;”这一行说明有两个函数test_VERSION和test_VERSION_svc,这里由于VERSION变量为1,所以函数名为test_1和 test_1_svc,这两个函数用于在服务器端和客户端实现调用,即:
在客户端调用test_1函数,服务器端调用test_1_svc函数处理并返回。
函数的类型是string,RPC语言中string即C里面的一个字符串。所以上述函数有一个字符串作为参数传递,同时要返回字符串。即:
char ** test_1(char **argp, CLIENT *clnt) 和 char **test_1_svc(char **argp, struct svc_req *rqstp)
同理,如果声明是这样的:
program RDICTPROG /* name of remote program ( not used ) */
{
version RDICTVERS /* declaration of version ( see below ) */
{
int INITW ( void ) = 1; /* first procedure in this program */
int INSERTW ( string ) = 2; /* second procedure in this program */
int DELETEW ( string ) = 3; /* third procedure in this program */
int LOOKUPW ( string ) = 4; /* fourth procedure in this program */
} = 1; /* definition of the program version */
} = 0x30090949; /* remote program number ( must be unique ) */
则说明这个RPC中有四个函数可用,即客户端可以调用initw_1、insertw_1、deletew_1、lookupw_1四个函数来向服务端发送消息,服务端可以用initw_1_svc、insertw_1_svc、deletew_1_svc、lookupw_1_svc四个函数来处理请求并返回结果。
原任务
假设现在有这样一个程序,源代码如下:
/* dict.c -- main, initw, nextin, insertw, deletew, lookupw */
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <ctype.h>
#define MAXWORD 50 /* maximum length of a command or word */
#define DICTSIZ 100 /* maximum number of entries in dictionary. */
char dict[DICTSIZ][MAXWORD + 1]; /* storage for a dictionary of words */
int nwords = 0; /* number of words in the dictionary */
/* 函数原型 */
int nextin(char *cmd, char *word);
int initw(void);
int insertw(const char *word);
int deletew(const char *word);
int lookupw(const char *word);
/* ------------------------------------------------------------------
* main -- insert, delete, or lookup words in a dictionary as specified
* ------------------------------------------------------------------ */
int main(int argc, char *argv[])
{
char word[MAXWORD + 1]; /* space to hold word from input line */
char cmd;
int wordlen; /* length of input word */
printf("Please input:\n");
while (1) {
wordlen = nextin(&cmd, word);
if (wordlen < 0) {
exit(0);
}
switch (cmd) {
case 'I': /* 初始化 */
initw();
printf("Dictionary initialized to empty.\n");
break;
case 'i': /* 插入 */
insertw(word);
printf("%s inserted.\n", word);
break;
case 'd': /* 删除 */
if (deletew(word)) {
printf("%s deleted.\n", word);
} else {
printf("%s not found.\n", word);
}
break;
case 'l': /* 查询 */
if (lookupw(word)) {
printf("%s was found.\n", word);
} else {
printf("%s was not found.\n", word);
}
break;
case 'q': /* 退出 */
printf("Program quits.\n");
exit(0);
break;
default: /* 非法输入 */
printf("command %c invalid.\n", cmd);
break;
} /* end of switch */
} /* end of while */
return 0;
} /* end of main */
/* ------------------------------------------------------------------
* nextin -- read a command and(possibly) a word from the next input line
* ------------------------------------------------------------------ */
int nextin(char *cmd, char *word)
{
int i, ch;
ch = getc(stdin);
while (isspace(ch)) {
ch = getc(stdin);
} /* end of while */
if (ch == EOF) {
return (-1);
}
*cmd = (char) ch;
ch = getc(stdin);
while (isspace(ch)) {
ch = getc(stdin);
} /* end of while */
if (ch == EOF) {
return (-1);
}
if (ch == '\n') {
return (0);
}
i = 0;
while (!isspace(ch)) {
if (++i > MAXWORD) {
printf("error: word too long.\n");
exit(1);
}
*word++ = ch;
ch = getc(stdin);
} /* end of while */
*word = '\0'; /* 原来的代码这里有问题 */
return i;
} /* end of nextin */
/* ------------------------------------------------------------------
* initw -- initialize the dictionary to contain no words at all
* ------------------------------------------------------------------ */
int initw(void)
{
nwords = 0;
return 1;
} /* end of initw */
/* ------------------------------------------------------------------
* insertw -- insert a word in the dictionary
* ------------------------------------------------------------------ */
int insertw(const char *word)
{
strcpy(dict[nwords], word);
nwords++;
return (nwords);
} /* end of insertw */
/* ------------------------------------------------------------------
* deletew -- delete a word from the dictionary
* ------------------------------------------------------------------ */
int deletew(const char *word)
{
int i;
for (i = 0; i < nwords; i++) {
if (strcmp(word, dict[i]) == 0) {
nwords--;
strcpy(dict[i], dict[nwords]);
return (1);
}
} /* end of for */
return (0);
} /* end of deletew */
/* ------------------------------------------------------------------
* lookupw -- look up a word in the dictionary
* ------------------------------------------------------------------ */
int lookupw(const char *word)
{
int i;
for (i = 0; i < nwords; i++) {
if (strcmp(word, dict[i]) == 0) {
return (1);
}
} /* end of for */
return (0);
} /* end of lookupw */
这是一个简单的字典程序,即程序运行起来以后维护着一个字典库,用户可以向里面添加词语,也可以查询或删除词语。
当然,这个程序只能在同一台主机上运行。程序整个运行过程中,只需要完成如下几个步骤:
A、接受用户输入;
B、分析用户输入决定是否进行下面的步骤:
1、初始化数据库;
2、向数据库添加词语;
3、查询或删除词语
任务分解
大家可以想到,对于一个大型系统,比如需要有很多人维护这个系统的数据。象上面这样独立的程序就不适用了,需要做成分布式系统:
即一个服务器维护着数据库,任何客户端都可以接受用户请求,客户端分析用户命令后提交给服务器去处理。
所以我们可能会把程序分成两部分:
客户端:接受用户输入,并判断用户输入内容的正确性,向服务器提交数据,等服务器返回消息
服务器端:维护数据,接受客户端命令并执行后返回结果。
所以我们把上面这个程序分解成下面两部分:
/* dict1.c -- main, nextin */
#include <stdio.h>
#include <stdlib.h>
#define MAXWORD 50 /* maximum length of a command or word */
/* ------------------------------------------------------------------
* main -- insert, delete, or lookup words in a dictionary as specified
* ------------------------------------------------------------------ */
int main(int argc, char *argv[])
{
char word[MAXWORD + 1]; /* space to hold word from input line */
char cmd;
int wordlen; /* length of input word */
printf("Please input:\n");
while (1) {
wordlen = nextin(&cmd, word);
if (wordlen < 0) {
exit(0);
}
switch (cmd) {
case 'I': /* 初始化 */
initw();
printf("Dictionary initialized to empty.\n");
break;
case 'i': /* 插入 */
insertw(word);
printf("%s inserted.\n", word);
break;
case 'd': /* 删除 */
if (deletew(word)) {
printf("%s deleted.\n", word);
} else {
printf("%s not found.\n", word);
}
break;
case 'l': /* 查询 */
if (lookupw(word)) {
printf("%s was found.\n", word);
} else {
printf("%s was not found.\n", word);
}
break;
case 'q': /* 退出 */
printf("Program quits.\n");
exit(0);
break;
default: /* 非法输入 */
printf("command %c invalid.\n", cmd);
break;
} /* end of switch */
} /* end of while */
return 0;
} /* end of main */
/* ------------------------------------------------------------------
* nextin -- read a command and(possibly) a word from the next input line
* ------------------------------------------------------------------ */
int nextin(char *cmd, char *word)
{
int i, ch;
ch = getc(stdin);
while (isspace(ch)) {
ch = getc(stdin);
} /* end of while */
if (ch == EOF) {
return (-1);
}
*cmd = (char) ch;
ch = getc(stdin);
while (isspace(ch)) {
ch = getc(stdin);
} /* end of while */
if (ch == EOF) {
return (-1);
}
if (ch == '\n') {
return (0);
}
i = 0;
while (!isspace(ch)) {
if (++i > MAXWORD) {
printf("error: word too long.\n");
exit(1);
}
*word++ = ch;
ch = getc(stdin);
} /* end of while */
*word = '\0';
return i;
} /* end of nextin */
和
/* dict2.c -- initw, insertw, deletew, lookupw */
#include <string.h>
#define MAXWORD 50 /* maximum length of a command or word */
#define DICTSIZ 100 /* maximum number of entries in dictionary. */
char dict[DICTSIZ][MAXWORD + 1]; /* storage for a dictionary of words */
int nwords = 0; /* number of words in the dictionary */
/* ------------------------------------------------------------------
* initw -- initialize the dictionary to contain no words at all
* ------------------------------------------------------------------ */
int initw(void)
{
nwords = 0;
return 1;
} /* end of initw */
/* ------------------------------------------------------------------
* insertw -- insert a word in the dictionary
* ------------------------------------------------------------------ */
int insertw(const char *word)
{
strcpy(dict[nwords], word);
nwords++;
return (nwords);
} /* end of insertw */
/* ------------------------------------------------------------------
* deletew -- delete a word from the dictionary
* ------------------------------------------------------------------ */
int deletew(const char *word)
{
int i;
for (i = 0; i < nwords; i++) {
if (strcmp(word, dict[i]) == 0) {
nwords--;
strcpy(dict[i], dict[nwords]);
return (1);
}
} /* end of for */
return (0);
} /* end of deletew */
/* ------------------------------------------------------------------
* lookupw -- look up a word in the dictionary
* ------------------------------------------------------------------ */
int lookupw(const char *word)
{
int i;
for (i = 0; i < nwords; i++) {
if (strcmp(word, dict[i]) == 0) {
return (1);
}
} /* end of for */
return (0);
} /* end of lookupw */
这两部分代码只是在功能上实现了分离,显然实现通讯的部分还没有,下面我们利用RPC来快速实现通讯。
利用RPC实现分布式系统
首先,建立一个RPC源文件,源代码rdict.x如下:
/* rdict.x */
/* RPC declarations for dictionary program */
const MAXWORD = 10; /* maximum length of a command or word */
const DICTSIZ = 3; /* number of entries in dictionary */
struct example /* unused structure declared here to */
{
int exfield1; /* illustrate how rpcgen builds XDR */
char exfield2; /* routines to convert structures */
};
/* ------------------------------------------------------------------
* RDICTPROG -- remote program that provides insert, delete, and lookup
* ------------------------------------------------------------------ */
program RDICTPROG /* name of remote program ( not used ) */
{
version RDICTVERS /* declaration of version ( see below ) */
{
int INITW ( void ) = 1; /* first procedure in this program */
int INSERTW ( string ) = 2; /* second procedure in this program */
int DELETEW ( string ) = 3; /* third procedure in this program */
int LOOKUPW ( string ) = 4; /* fourth procedure in this program */
} = 1; /* definition of the program version */
} = 0x30090949; /* remote program number ( must be unique ) */
然后用下列命令产生服务器端函数rdict_srv_func.c:
rpcgen -Ss -o rdict_srv_func.c rdict.x
然后用下列命令产生客户端程序rdict_client.c:
rpcgen -Sc -o rdict_client.c rdict.x
/************关于本文档********************************************
*filename: 我是这样学习Linux下C语言编程的-利用RPC快速实现分布式系统
*purpose: 说明如何利用RPC快速进行客户端-服务器端C-S结构编程
*wrote by: zhoulifa(zhoulifa@163.com) 周立发(http://zhoulifa.bokee.com)
Linux爱好者 Linux知识传播者 SOHO族 开发者 最擅长C语言
*date time:2007-02-27 19:20
*Note: 任何人可以任意复制代码并运用这些文档,当然包括你的商业用途
* 但请遵循GPL
*Thanks to:
* Ubuntu 本程序在Ubuntu 6.10系统上测试完全正常
* Google.com 我通过google搜索并参考了RPC编程相关的许多文章
* 网络安全焦点(www.xfocus.net) 我主要借鉴了此文 http://www.xfocus.net/articles/200009/10.html
*Hope:希望越来越多的人贡献自己的力量,为科学技术发展出力
* 科技站在巨人的肩膀上进步更快!感谢有开源前辈的贡献!
*********************************************************************/
然后用下列命令产生Makefile:
rpcgen -Sm rdict.x > Makefile
Makefile文件原内容如下:
# This is a template Makefile generated by rpcgen
# Parameters
CLIENT = rdict_client
SERVER = rdict_server
SOURCES_CLNT.c =
SOURCES_CLNT.h =
SOURCES_SVC.c =
SOURCES_SVC.h =
SOURCES.x = rdict.x
TARGETS_SVC.c = rdict_svc.c rdict_xdr.c
TARGETS_CLNT.c = rdict_clnt.c rdict_xdr.c
TARGETS = rdict.h rdict_xdr.c rdict_clnt.c rdict_svc.c
OBJECTS_CLNT = $(SOURCES_CLNT.c:%.c=%.o) $(TARGETS_CLNT.c:%.c=%.o)
OBJECTS_SVC = $(SOURCES_SVC.c:%.c=%.o) $(TARGETS_SVC.c:%.c=%.o)
# Compiler flags
CFLAGS += -g
LDLIBS += -lnsl
RPCGENFLAGS =
# Targets
all : $(CLIENT) $(SERVER)
$(TARGETS) : $(SOURCES.x)
rpcgen $(RPCGENFLAGS) $(SOURCES.x)
$(OBJECTS_CLNT) : $(SOURCES_CLNT.c) $(SOURCES_CLNT.h) $(TARGETS_CLNT.c)
$(OBJECTS_SVC) : $(SOURCES_SVC.c) $(SOURCES_SVC.h) $(TARGETS_SVC.c)
$(CLIENT) : $(OBJECTS_CLNT)
$(LINK.c) -o $(CLIENT) $(OBJECTS_CLNT) $(LDLIBS)
$(SERVER) : $(OBJECTS_SVC)
$(LINK.c) -o $(SERVER) $(OBJECTS_SVC) $(LDLIBS)
clean:
$(RM) core $(TARGETS) $(OBJECTS_CLNT) $(OBJECTS_SVC) $(CLIENT) $(SERVER)
动手修改Makefile,修改后内容如下:
# This is a template Makefile generated by rpcgen
# Parameters
CLIENT = rdict_client
SERVER = rdict_server
SOURCES_CLNT.c =
SOURCES_CLNT.h =
SOURCES_SVC.c =
SOURCES_SVC.h =
SOURCES.x = rdict.x
TARGETS_SVC.c = rdict_svc.c rdict_xdr.c rdict_srv_func.c
TARGETS_CLNT.c = rdict_clnt.c rdict_xdr.c rdict_client.c
TARGETS = rdict.h rdict_xdr.c rdict_clnt.c rdict_svc.c
OBJECTS_CLNT = $(SOURCES_CLNT.c:%.c=%.o) $(TARGETS_CLNT.c:%.c=%.o)
OBJECTS_SVC = $(SOURCES_SVC.c:%.c=%.o) $(TARGETS_SVC.c:%.c=%.o)
# Compiler flags
CFLAGS += -g
LDLIBS += -lnsl
RPCGENFLAGS =
# Targets
all : $(CLIENT) $(SERVER)
$(TARGETS) : $(SOURCES.x)
rpcgen $(RPCGENFLAGS) $(SOURCES.x)
$(OBJECTS_CLNT) : $(SOURCES_CLNT.c) $(SOURCES_CLNT.h) $(TARGETS_CLNT.c)
$(OBJECTS_SVC) : $(SOURCES_SVC.c) $(SOURCES_SVC.h) $(TARGETS_SVC.c)
$(CLIENT) : $(OBJECTS_CLNT)
$(LINK.c) -o $(CLIENT) $(OBJECTS_CLNT) $(LDLIBS)
$(SERVER) : $(OBJECTS_SVC)
$(LINK.c) -o $(SERVER) $(OBJECTS_SVC) $(LDLIBS)
clean:
$(RM) core $(TARGETS) $(OBJECTS_CLNT) $(OBJECTS_SVC) $(CLIENT) $(SERVER) *~
修改客户端源代码rdict_client.c,把接受用户输入并分析用户输入内容的部分加到程序中来。修改后的代码为:
/*
* This is sample code generated by rpcgen.
* These are only templates and you can use them
* as a guideline for developing your own functions.
*/
#include "rdict.h"
/* ------------------------------------------------------------------
* nextin -- read a command and(possibly) a word from the next input line
* ------------------------------------------------------------------ */
int nextin(char *cmd, char *word)
{
int i, ch;
ch = getc(stdin);
while (isspace(ch)) {
ch = getc(stdin);
} /* end of while */
if (ch == EOF) {
return (-1);
}
*cmd = (char) ch;
ch = getc(stdin);
while (isspace(ch)) {
ch = getc(stdin);
} /* end of while */
if (ch == EOF) {
return (-1);
}
if (ch == '\n') {
return (0);
}
i = 0;
while (!isspace(ch)) {
if (++i > MAXWORD) {
printf("error: word too long.\n");
exit(1);
}
*word++ = ch;
ch = getc(stdin);
} /* end of while */
*word = '\0';
return i;
} /* end of nextin */
void rdictprog_1(char *host)
{
CLIENT *clnt;
int *result_1;
char *initw_1_arg;
int *result_2;
char *insertw_1_arg;
int *result_3;
char *deletew_1_arg;
int *result_4;
char *lookupw_1_arg;
#ifndef DEBUG
clnt = clnt_create(host, RDICTPROG, RDICTVERS, "udp");
if (clnt == NULL) {
clnt_pcreateerror(host);
exit(1);
}
#endif /* DEBUG */
char word[MAXWORD + 1]; /* space to hold word from input line */
char cmd;
int wordlen; /* length of input word */
while (1) {
printf("\nPlease input:");
wordlen = nextin(&cmd, word);
if (wordlen < 0) {
exit(0);
}
/* printf("\nYour cmd is:%c, your word is:%s\n", cmd, word); */
switch (cmd) {
case 'I': /* 初始化 */
result_1 = initw_1((void *) &initw_1_arg, clnt);
/* printf("\nYour result is:%d\n", *result_1); */
if (result_1 == (int *) NULL)
clnt_perror(clnt, "call failed");
else
if(*result_1 ==0) printf("Dictionary initialized to empty.\n");
else printf("Dictionary have already initialized.\n");
break;
case 'i': /* 插入 */
insertw_1_arg = word;
result_2 = insertw_1(&insertw_1_arg, clnt);
/* printf("\nYour result is:%d, your string is:%s(%d)\n", *result_2, insertw_1_arg, strlen(insertw_1_arg)); */
if (result_2 == (int *) NULL)
clnt_perror(clnt, "call failed");
else
printf("%s inserted.\n", word);
break;
case 'd': /* 删除 */
deletew_1_arg = word;
result_3 = deletew_1(&deletew_1_arg, clnt);
/* printf("\nYour result is:%d, your string is:%s(%d)\n", *result_3, deletew_1_arg, strlen(deletew_1_arg)); */
if (result_3 == (int *) NULL)
clnt_perror(clnt, "call failed");
else
printf("%s deleted.\n", word);
break;
case 'l': /* 查询 */
lookupw_1_arg = word;
result_4 = lookupw_1(&lookupw_1_arg, clnt);
/* printf("\nYour result is:%d, your string is:%s(%d)\n", *result_4, lookupw_1_arg, strlen(lookupw_1_arg)); */
if (result_4 == (int *) NULL)
clnt_perror(clnt, "call failed");
else
if(*result_4 ==0) printf("%s found.\n", word);
else printf("%s not found.\n", word);
break;
case 'q': /* 退出 */
printf("Program quits.\n");
exit(0);
break;
default: /* 非法输入 */
printf("Command %c(%s) invalid.\n", cmd, word);
break;
} /* end of switch */
} /* end of while */
#ifndef DEBUG
clnt_destroy(clnt);
#endif /* DEBUG */
}
int main(int argc, char *argv[])
{
char *host;
if (argc < 2) {
printf("usage: %s server_host\n", argv[0]);
exit(1);
}
host = argv[1];
rdictprog_1(host);
exit(0);
}
同时修改服务器端代码rdict_srv_func.c,修改后内容为:
/*
* This is sample code generated by rpcgen.
* These are only templates and you can use them
* as a guideline for developing your own functions.
*/
#include "rdict.h"
char dict[DICTSIZ][MAXWORD + 1]; /* storage for a dictionary of words */
int nwords = 0; /* number of words in the dictionary */
char init_bool = 0;
int initw(void)
{
if(init_bool) return 1;
nwords = 0;
init_bool = 1;
return 0;
} /* end of initw */
/* ------------------------------------------------------------------
* insertw -- insert a word in the dictionary
* ------------------------------------------------------------------ */
int insertw(const char *word)
{
strcpy(dict[nwords%DICTSIZ], word);
nwords++;
return (nwords);
} /* end of insertw */
/* ------------------------------------------------------------------
* deletew -- delete a word from the dictionary
* ------------------------------------------------------------------ */
int deletew(const char *word)
{
int i;
for (i = 0; i < nwords; i++) {
if (strcmp(word, dict[i]) == 0) {
nwords--;
strcpy(dict[i], dict[nwords]);
return (1);
}
} /* end of for */
return (0);
} /* end of deletew */
/* ------------------------------------------------------------------
* lookupw -- look up a word in the dictionary
* ------------------------------------------------------------------ */
int lookupw(const char *word)
{
int i;
for (i = 0; i < nwords; i++) {
if (strcmp(word, dict[i]) == 0) {
return 0;
}
} /* end of for */
return 1;
} /* end of lookupw */
int *initw_1_svc(void *argp, struct svc_req *rqstp)
{
static int result;
/*
* insert server code here
*/
result = initw();
return &result;
}
int *insertw_1_svc(char **argp, struct svc_req *rqstp)
{
static int result;
/*
* insert server code here
*/
result = insertw(*argp);
return &result;
}
int *deletew_1_svc(char **argp, struct svc_req *rqstp)
{
static int result;
/*
* insert server code here
*/
result = deletew(*argp);
return &result;
}
int *lookupw_1_svc(char **argp, struct svc_req *rqstp)
{
static int result;
/*
* insert server code here
*/
result = lookupw(*argp);
return &result;
}
至此,程序做好了。输入一个make命令就可以生成test_server和test_client这两个可执行程序了。
在一台机器上运行./test_server程序,在另外的客户机上运行./test_client server_ip就可以了。这里server_ip是运行着test_server程序的主机的IP地址。
http://blog.chinaunix.net/u/21675/showart.php?id=308008
http://blog.chinaunix.net/u2/66973/showart_1071175.html
RPC example:
http://blog.chinaunix.net/u3/102500/showart_2128311.html
http://blog.chinaunix.net/u1/37472/showart_726114.html
http://blog.chinaunix.net/u1/37472/showart_726114.html
http://blog.chinaunix.net/u3/119073/showart_2342954.html
http://blog.chinaunix.net/u3/94300/showart_1900094.html
linux下rpc的多线程实现
solaris下rpc可以通过rpcgen -A让服务器根据处理新用户请求的需要自动创建线程。但是linux没有实现这个功能,原因据说是会产生非线程安全的代码。
本文给出了一种实现上述功能的方法。参考了http://ftp.traduc.org/doc-vf/gazette-linux/gazette-us/2004/103/103D.html
和http://linux.ittoolbox.com/groups/technical-functional/redhat-l/multithreaded-rpc-servers-for-linux-482547
下面以unp2e2上的square3为例子,通过修改部分代码来实现
文件square.x
struct square_in{
long arg1;
};
struct square_out{
long res1;
};
program SQUARE_PROG{
version SQARE_VERS{
square_out SQUAREPROC(square_in) = 1;
} = 2;
} = 0x31230000
接着执行:
rpcgen -aM square.x
该指令产生了文件:
square_clnt.c square_server.c Makefile.square square_client.c square.h square_svc.c square_xdr.c
着重要修改的是square_svc.c文件:
/* 被修改过的
* Please do not edit this file.
* It was generated using rpcgen.
*/
#include "square.h"
#include <stdio.h>
#include <stdlib.h>
#include <rpc/pmap_clnt.h>
#include <string.h>
#include <memory.h>
#include <sys/socket.h>
#include <netinet/in.h>
#ifndef SIG_PF
#define SIG_PF void(*)(int)
#endif
pthread_t p_thread;
pthread_attr_t attr;
//static void
//square_prog_2(struct svc_req *rqstp, register SVCXPRT *transp)
void *
serv_request(void *data)
{
struct thr_data {
struct svc_req *rqstp;
SVCXPRT *transp;
} *ptr_data;
union {
square_in squareproc_2_arg;
} argument;
union {
square_out squareproc_2_res;
} result;
bool_t retval;
xdrproc_t _xdr_argument, _xdr_result;
bool_t (*local)(char *, void *, struct svc_req *);
ptr_data = (struct thr_data *)data;
struct svc_req *rqstp = ptr_data->rqstp;
register SVCXPRT *transp = ptr_data->transp;
switch (rqstp->rq_proc) {
case NULLPROC:
(void) svc_sendreply (transp, (xdrproc_t) xdr_void,(char *)NULL);
return;
case SQUAREPROC:
_xdr_argument = (xdrproc_t) xdr_square_in;
_xdr_result = (xdrproc_t) xdr_square_out;
local = (bool_t (*) (char *, void *, struct svc_req *))squareproc_2_svc;
break;
default:
svcerr_noproc (transp);
return;
}
memset ((char *)&argument, 0, sizeof (argument));
if (!svc_getargs (transp, (xdrproc_t) _xdr_argument,(caddr_t) &argument)) {
svcerr_decode (transp);
return;
}
retval = (bool_t) (*local)((char *)&argument, (void *)&result, rqstp);
if (retval > 0 && !svc_sendreply(transp, (xdrproc_t), (char *)&result)) {
svcerr_systemerr (transp);
}
if (!svc_freeargs (transp, (xdrproc_t) _xdr_argument,(caddr_t) &argument)) {
fprintf (stderr, "%s", "unable to free arguments");
exit (1);
}
if (!square_prog_2_freeresult (transp, _xdr_result,(caddr_t) &result))
fprintf (stderr, "%s", "unable to free results");
return;
}
//new square_prog_2
static void
square_prog_2(struct svc_req *rqstp, register SVCXPRT *transp)
{
struct data_str{
struct svc_req *rqstp;
SVCXPRT *transp;
} *data_ptr=(struct data_str*)malloc(sizeof(struct));
data_ptr->rqstp = rqstp;
data_ptr->transp = transp;
pthread_attr_setdetachstate(&attr,PTHREAD_CREATE_DETACHED);
pthread_create(&p_thread,&attr,serv_request,(void*)data_ptr);
}
int
main (int argc, char **argv)
{
register SVCXPRT *transp;
pmap_unset (SQUARE_PROG, SQARE_VERS);
transp = svcudp_create(RPC_ANYSOCK);
if (transp == NULL) {
fprintf (stderr, "%s", "cannot create udp service.");
exit(1);
}
if (!svc_register(transp, SQUARE_PROG, SQARE_VERS, square_prog_2, IPPROTO_UDP)) {
fprintf (stderr, "%s", "unable to register (SQUARE_PROG, SQARE_VERS, udp).");
exit(1);
}
transp = svctcp_create(RPC_ANYSOCK, 0, 0);
if (transp == NULL) {
fprintf (stderr, "%s", "cannot create tcp service.");
exit(1);
}
if (!svc_register(transp, SQUARE_PROG, SQARE_VERS, square_prog_2, IPPROTO_TCP)) {
fprintf (stderr, "%s", "unable to register (SQUARE_PROG, SQARE_VERS, tcp).");
exit(1);
}
svc_run ();
fprintf (stderr, "%s", "svc_run returned");
exit (1);
/* NOTREACHED */
}
蓝色部分是修改的地方,对比下面修改前的代码:
/* 修改前的文件
* Please do not edit this file.
* It was generated using rpcgen.
*/
#include "square.h"
#include <stdio.h>
#include <stdlib.h>
#include <rpc/pmap_clnt.h>
#include <string.h>
#include <memory.h>
#include <sys/socket.h>
#include <netinet/in.h>
#ifndef SIG_PF
#define SIG_PF void(*)(int)
#endif
static void
square_prog_2(struct svc_req *rqstp, register SVCXPRT *transp)
{
union {
square_in squareproc_2_arg;
} argument;
union {
square_out squareproc_2_res;
} result;
bool_t retval;
xdrproc_t _xdr_argument, _xdr_result;
bool_t (*local)(char *, void *, struct svc_req *);
switch (rqstp->rq_proc) {
case NULLPROC:
(void) svc_sendreply (transp, (xdrproc_t) xdr_void, (char *)NULL);
return;
case SQUAREPROC:
_xdr_argument = (xdrproc_t) xdr_square_in;
_xdr_result = (xdrproc_t) xdr_square_out;
local = (bool_t (*) (char *, void *, struct svc_req *))squareproc_2_svc;
break;
default:
svcerr_noproc (transp);
return;
}
memset ((char *)&argument, 0, sizeof (argument));
if (!svc_getargs (transp, (xdrproc_t) _xdr_argument, (caddr_t) &argument)) {
svcerr_decode (transp);
return;
}
retval = (bool_t) (*local)((char *)&argument, (void *)&result, rqstp);
if (retval > 0 && !svc_sendreply(transp, (xdrproc_t) _xdr_result, (char *)&result)) {
svcerr_systemerr (transp);
}
if (!svc_freeargs (transp, (xdrproc_t) _xdr_argument, (caddr_t) &argument)) {
fprintf (stderr, "%s", "unable to free arguments");
exit (1);
}
if (!square_prog_2_freeresult (transp, _xdr_result, (caddr_t) &result))
fprintf (stderr, "%s", "unable to free results");
return;
}
int
main (int argc, char **argv)
{
register SVCXPRT *transp;
pmap_unset (SQUARE_PROG, SQARE_VERS);
transp = svcudp_create(RPC_ANYSOCK);
if (transp == NULL) {
fprintf (stderr, "%s", "cannot create udp service.");
exit(1);
}
if (!svc_register(transp, SQUARE_PROG, SQARE_VERS, square_prog_2, IPPROTO_UDP)) {
fprintf (stderr, "%s", "unable to register (SQUARE_PROG, SQARE_VERS, udp).");
exit(1);
}
transp = svctcp_create(RPC_ANYSOCK, 0, 0);
if (transp == NULL) {
fprintf (stderr, "%s", "cannot create tcp service.");
exit(1);
}
if (!svc_register(transp, SQUARE_PROG, SQARE_VERS, square_prog_2, IPPROTO_TCP)) {
fprintf (stderr, "%s", "unable to register (SQUARE_PROG, SQARE_VERS, tcp).");
exit(1);
}
svc_run ();
fprintf (stderr, "%s", "svc_run returned");
exit (1);
/* NOTREACHED */
}
可以看出,修改的思路就是把原square_prog_2函数功能改由serv_request来实现,让新的square_prog_2通过创建新线程来调用serv_request,serv_request则判断并调用相应的函数(在这个例子中只有SQUAREPROC一个函数)。实现的关键是要把struct svc_req *rqstp和 register SVCXPRT *transp这两个参数交给新线程函数serv_request处理,这样当serv_request返回时,便等价于原来的square_prog_2函数返回。
其它文件跟unp2e2上的例子一样
/* 文件square_server.c
* This is sample code generated by rpcgen.
* These are only templates and you can use them
* as a guideline for developing your own functions.
*/
#include "square.h"
bool_t
squareproc_2_svc(square_in *inp, square_out *outp, struct svc_req *rqstp)
{
bool_t retval;
printf("Thread id = '%ld' started, arg = %d\n",pthread_self(),inp->arg1);
sleep(5);
outp->res1 = inp->arg1 * inp->arg1;
printf("Thread id = '%ld' is done %d \n",pthread_self(),outp->res1);
retval = TRUE;
return retval;
}
int
square_prog_2_freeresult (SVCXPRT *transp, xdrproc_t xdr_result, caddr_t result)
{
xdr_free (xdr_result, result);
/*
* Insert additional freeing code here, if needed
*/
return 1;
}
/*
* 该文件重写由rpcgen产生的原文件
* square_client.c
*/
#include "square.h"
int
main (int argc,char **argv)
{
CLIENT *cl;
square_in in;
square_out out;
if (argc != 3 ) {
printf ("Usage : client <hostname> <integer_valus>\n");
exit(1);
}
cl = clnt_create(argv[1], SQUARE_PROG, SQUARE_VERS, "tcp");
if (cl == NULL) {
clnt_sperror(cl, "call failed");
exit (1);
}
in.arg1 = atol(argv[2]);
if (squareproc_2(&in, &out, cl) != RPC_SUCCESS){
printf("%s\n", clnt_sperror(cl, argv[1]));
exit(1);
}
printf("result: %ld\n",out.res1);
exit(0);
}
接着编译链接server端和client端程序
$ gcc -o client square_clnt.c square_xdr.c square_client.c
$ gcc -o server square_svc.c square_xdr.c square_server.c -lpthread
在一个终端下运行./server记得打开portmap
另一个终端下运行:
$ ./client localhost 1 & ./client localhost 2 & ./client localhost 3
便能在server端看到:
Thread id = '1082132816' started, arg = 3
Thread id = '1090525520' started, arg = 2
Thread id = '1098918224' started, arg = 1
Thread id = '1082132816' is done 9
Thread id = '1090525520' is done 4
Thread id = '1098918224' is done 1
Hadoop里的RPC机制过程
已经有文章描述RPC的具体结构,http://caibinbupt.javaeye.com/blog/280790,这篇文章很清晰的描述了Client和Server的结构,但是较为高屋建瓴,我在看的时候依然觉得无法清晰理解其调用过程,所以将过程整理出来,知道how,才容易理解why,即知道是怎么干的,才容易理解为什么会那么去设计。
客户端C要发起向服务端S的关于方法M调用
1. C首先创建一个通向S的连接getConnection,然后将此次调用放入CallList里,这样客户端就可以同时发生很多调用,每个调用用ID来识别。
2. 发送调用参数。调用参数是Client的调用方(比如NameNode,DataNode等)指定的,一般就是一个Invocation对象,里面包含要调用的方法和参数。了解JAVA动态代理类java.lang.reflect.Proxy会对这里的理解有很大帮助。
3. 等待调用结果.Client.Connection是个线程类,启动了之后唯一做的时候就是等待调用结果
对于服务器端,其有一个方法start指定了启动服务器开始监听,这个start被四个类调用,分别是
TaskTracker.initialize,Namenode.initialize,Jobtracker.offerService,
Datanode.startDatanode
显然,任何两者之间的通信都是考这个client-server模型实现的。
server start后,干了三件事
1. 启动listen,监听客户端Call
2. 启动response,随时准备将处理结果发回client
3. 启动10个handler,处理具体的请求。
这里必须对java NIO机制了解,才能看的明白。
当客户端调用来到的时候
1. listen首先将调用doaccept将Connection附加给selectionkey,然后调用doread添加,doread会调用Connecton的方法将调用添加到调用列表,该列表是BlockingQueue,其保持列表先进先出的特性而且支持同步
2. listen将call添加到calllist后,handler因为一直在检测calllist,于是其立刻开始处理,处理完毕后,其将结果保存在call对象中,然后调用response开始向客户端写。这里hadler调用的call只是一个未实现的方法,具体实现在RPC.Server中,这点需要注意。
3. Response也监视responselist,如果responselist中某个call需要将结果写入客户端,就写出,当某个call的结果被发送完毕,从responselist中删除该call对象。
这里有个细节:handler完成call之后就开始向客户端写call结果,但是结果可能太多,无法通过一次性发送完毕,而发送之后还要等待client接受完毕才能再发,如果现在handler在那里等待客户端接受完毕,然后再发,效率不高。解决办法是handler处理完毕之后,只向client发送一次处理结果。如果这一次将处理结果发送完毕,接下来就没有response的事情了,如果没有发送完毕,接下来response负责将剩下的处理结果发送给客户端。这样handler的并发量会大一些。
服务器实现中大量利用监视队列,比如handler就直观坚持calllist,一旦发现数据就开始处理,而response就监视responselist,发现数据需要发送就开始发送。
写完了之后,觉得写的也不清楚,可能要清晰明白只能自己看代码吧。
还发现在没用过java的情况下看hadoop可以更快的学习java
RPC语言也是一种专门的编程语言,当然这里我们不需要知道太多,只需要能看懂下面这种基本结构就行了:
program TESTPROG {
version VERSION {
string TEST(string) = 1;
} = 1;
} = 87654321;
这里TESTPROG和VERSION是两个变量,用于标识一个单独的RPC接口。这被RPC服务程序,比如portmap用到,我们可以不用关心,变量名字也是随便取的。但取值要在你的系统中是唯一的。
“string TEST(string) = 1;”这一行说明有两个函数test_VERSION和test_VERSION_svc,这里由于VERSION变量为1,所以函数名为test_1和 test_1_svc,这两个函数用于在服务器端和客户端实现调用,即:
在客户端调用test_1函数,服务器端调用test_1_svc函数处理并返回。
函数的类型是string,RPC语言中string即C里面的一个字符串。所以上述函数有一个字符串作为参数传递,同时要返回字符串。即:
char ** test_1(char **argp, CLIENT *clnt) 和 char **test_1_svc(char **argp, struct svc_req *rqstp)
同理,如果声明是这样的:
program RDICTPROG /* name of remote program ( not used ) */
{
version RDICTVERS /* declaration of version ( see below ) */
{
int INITW ( void ) = 1; /* first procedure in this program */
int INSERTW ( string ) = 2; /* second procedure in this program */
int DELETEW ( string ) = 3; /* third procedure in this program */
int LOOKUPW ( string ) = 4; /* fourth procedure in this program */
} = 1; /* definition of the program version */
} = 0x30090949; /* remote program number ( must be unique ) */
则说明这个RPC中有四个函数可用,即客户端可以调用initw_1、insertw_1、deletew_1、lookupw_1四个函数来向服务端发送消息,服务端可以用initw_1_svc、insertw_1_svc、deletew_1_svc、lookupw_1_svc四个函数来处理请求并返回结果。
原任务
假设现在有这样一个程序,源代码如下:
/* dict.c -- main, initw, nextin, insertw, deletew, lookupw */
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <ctype.h>
#define MAXWORD 50 /* maximum length of a command or word */
#define DICTSIZ 100 /* maximum number of entries in dictionary. */
char dict[DICTSIZ][MAXWORD + 1]; /* storage for a dictionary of words */
int nwords = 0; /* number of words in the dictionary */
/* 函数原型 */
int nextin(char *cmd, char *word);
int initw(void);
int insertw(const char *word);
int deletew(const char *word);
int lookupw(const char *word);
/* ------------------------------------------------------------------
* main -- insert, delete, or lookup words in a dictionary as specified
* ------------------------------------------------------------------ */
int main(int argc, char *argv[])
{
char word[MAXWORD + 1]; /* space to hold word from input line */
char cmd;
int wordlen; /* length of input word */
printf("Please input:\n");
while (1) {
wordlen = nextin(&cmd, word);
if (wordlen < 0) {
exit(0);
}
switch (cmd) {
case 'I': /* 初始化 */
initw();
printf("Dictionary initialized to empty.\n");
break;
case 'i': /* 插入 */
insertw(word);
printf("%s inserted.\n", word);
break;
case 'd': /* 删除 */
if (deletew(word)) {
printf("%s deleted.\n", word);
} else {
printf("%s not found.\n", word);
}
break;
case 'l': /* 查询 */
if (lookupw(word)) {
printf("%s was found.\n", word);
} else {
printf("%s was not found.\n", word);
}
break;
case 'q': /* 退出 */
printf("Program quits.\n");
exit(0);
break;
default: /* 非法输入 */
printf("command %c invalid.\n", cmd);
break;
} /* end of switch */
} /* end of while */
return 0;
} /* end of main */
/* ------------------------------------------------------------------
* nextin -- read a command and(possibly) a word from the next input line
* ------------------------------------------------------------------ */
int nextin(char *cmd, char *word)
{
int i, ch;
ch = getc(stdin);
while (isspace(ch)) {
ch = getc(stdin);
} /* end of while */
if (ch == EOF) {
return (-1);
}
*cmd = (char) ch;
ch = getc(stdin);
while (isspace(ch)) {
ch = getc(stdin);
} /* end of while */
if (ch == EOF) {
return (-1);
}
if (ch == '\n') {
return (0);
}
i = 0;
while (!isspace(ch)) {
if (++i > MAXWORD) {
printf("error: word too long.\n");
exit(1);
}
*word++ = ch;
ch = getc(stdin);
} /* end of while */
*word = '\0'; /* 原来的代码这里有问题 */
return i;
} /* end of nextin */
/* ------------------------------------------------------------------
* initw -- initialize the dictionary to contain no words at all
* ------------------------------------------------------------------ */
int initw(void)
{
nwords = 0;
return 1;
} /* end of initw */
/* ------------------------------------------------------------------
* insertw -- insert a word in the dictionary
* ------------------------------------------------------------------ */
int insertw(const char *word)
{
strcpy(dict[nwords], word);
nwords++;
return (nwords);
} /* end of insertw */
/* ------------------------------------------------------------------
* deletew -- delete a word from the dictionary
* ------------------------------------------------------------------ */
int deletew(const char *word)
{
int i;
for (i = 0; i < nwords; i++) {
if (strcmp(word, dict[i]) == 0) {
nwords--;
strcpy(dict[i], dict[nwords]);
return (1);
}
} /* end of for */
return (0);
} /* end of deletew */
/* ------------------------------------------------------------------
* lookupw -- look up a word in the dictionary
* ------------------------------------------------------------------ */
int lookupw(const char *word)
{
int i;
for (i = 0; i < nwords; i++) {
if (strcmp(word, dict[i]) == 0) {
return (1);
}
} /* end of for */
return (0);
} /* end of lookupw */
这是一个简单的字典程序,即程序运行起来以后维护着一个字典库,用户可以向里面添加词语,也可以查询或删除词语。
当然,这个程序只能在同一台主机上运行。程序整个运行过程中,只需要完成如下几个步骤:
A、接受用户输入;
B、分析用户输入决定是否进行下面的步骤:
1、初始化数据库;
2、向数据库添加词语;
3、查询或删除词语
任务分解
大家可以想到,对于一个大型系统,比如需要有很多人维护这个系统的数据。象上面这样独立的程序就不适用了,需要做成分布式系统:
即一个服务器维护着数据库,任何客户端都可以接受用户请求,客户端分析用户命令后提交给服务器去处理。
所以我们可能会把程序分成两部分:
客户端:接受用户输入,并判断用户输入内容的正确性,向服务器提交数据,等服务器返回消息
服务器端:维护数据,接受客户端命令并执行后返回结果。
所以我们把上面这个程序分解成下面两部分:
/* dict1.c -- main, nextin */
#include <stdio.h>
#include <stdlib.h>
#define MAXWORD 50 /* maximum length of a command or word */
/* ------------------------------------------------------------------
* main -- insert, delete, or lookup words in a dictionary as specified
* ------------------------------------------------------------------ */
int main(int argc, char *argv[])
{
char word[MAXWORD + 1]; /* space to hold word from input line */
char cmd;
int wordlen; /* length of input word */
printf("Please input:\n");
while (1) {
wordlen = nextin(&cmd, word);
if (wordlen < 0) {
exit(0);
}
switch (cmd) {
case 'I': /* 初始化 */
initw();
printf("Dictionary initialized to empty.\n");
break;
case 'i': /* 插入 */
insertw(word);
printf("%s inserted.\n", word);
break;
case 'd': /* 删除 */
if (deletew(word)) {
printf("%s deleted.\n", word);
} else {
printf("%s not found.\n", word);
}
break;
case 'l': /* 查询 */
if (lookupw(word)) {
printf("%s was found.\n", word);
} else {
printf("%s was not found.\n", word);
}
break;
case 'q': /* 退出 */
printf("Program quits.\n");
exit(0);
break;
default: /* 非法输入 */
printf("command %c invalid.\n", cmd);
break;
} /* end of switch */
} /* end of while */
return 0;
} /* end of main */
/* ------------------------------------------------------------------
* nextin -- read a command and(possibly) a word from the next input line
* ------------------------------------------------------------------ */
int nextin(char *cmd, char *word)
{
int i, ch;
ch = getc(stdin);
while (isspace(ch)) {
ch = getc(stdin);
} /* end of while */
if (ch == EOF) {
return (-1);
}
*cmd = (char) ch;
ch = getc(stdin);
while (isspace(ch)) {
ch = getc(stdin);
} /* end of while */
if (ch == EOF) {
return (-1);
}
if (ch == '\n') {
return (0);
}
i = 0;
while (!isspace(ch)) {
if (++i > MAXWORD) {
printf("error: word too long.\n");
exit(1);
}
*word++ = ch;
ch = getc(stdin);
} /* end of while */
*word = '\0';
return i;
} /* end of nextin */
和
/* dict2.c -- initw, insertw, deletew, lookupw */
#include <string.h>
#define MAXWORD 50 /* maximum length of a command or word */
#define DICTSIZ 100 /* maximum number of entries in dictionary. */
char dict[DICTSIZ][MAXWORD + 1]; /* storage for a dictionary of words */
int nwords = 0; /* number of words in the dictionary */
/* ------------------------------------------------------------------
* initw -- initialize the dictionary to contain no words at all
* ------------------------------------------------------------------ */
int initw(void)
{
nwords = 0;
return 1;
} /* end of initw */
/* ------------------------------------------------------------------
* insertw -- insert a word in the dictionary
* ------------------------------------------------------------------ */
int insertw(const char *word)
{
strcpy(dict[nwords], word);
nwords++;
return (nwords);
} /* end of insertw */
/* ------------------------------------------------------------------
* deletew -- delete a word from the dictionary
* ------------------------------------------------------------------ */
int deletew(const char *word)
{
int i;
for (i = 0; i < nwords; i++) {
if (strcmp(word, dict[i]) == 0) {
nwords--;
strcpy(dict[i], dict[nwords]);
return (1);
}
} /* end of for */
return (0);
} /* end of deletew */
/* ------------------------------------------------------------------
* lookupw -- look up a word in the dictionary
* ------------------------------------------------------------------ */
int lookupw(const char *word)
{
int i;
for (i = 0; i < nwords; i++) {
if (strcmp(word, dict[i]) == 0) {
return (1);
}
} /* end of for */
return (0);
} /* end of lookupw */
这两部分代码只是在功能上实现了分离,显然实现通讯的部分还没有,下面我们利用RPC来快速实现通讯。
利用RPC实现分布式系统
首先,建立一个RPC源文件,源代码rdict.x如下:
/* rdict.x */
/* RPC declarations for dictionary program */
const MAXWORD = 10; /* maximum length of a command or word */
const DICTSIZ = 3; /* number of entries in dictionary */
struct example /* unused structure declared here to */
{
int exfield1; /* illustrate how rpcgen builds XDR */
char exfield2; /* routines to convert structures */
};
/* ------------------------------------------------------------------
* RDICTPROG -- remote program that provides insert, delete, and lookup
* ------------------------------------------------------------------ */
program RDICTPROG /* name of remote program ( not used ) */
{
version RDICTVERS /* declaration of version ( see below ) */
{
int INITW ( void ) = 1; /* first procedure in this program */
int INSERTW ( string ) = 2; /* second procedure in this program */
int DELETEW ( string ) = 3; /* third procedure in this program */
int LOOKUPW ( string ) = 4; /* fourth procedure in this program */
} = 1; /* definition of the program version */
} = 0x30090949; /* remote program number ( must be unique ) */
然后用下列命令产生服务器端函数rdict_srv_func.c:
rpcgen -Ss -o rdict_srv_func.c rdict.x
然后用下列命令产生客户端程序rdict_client.c:
rpcgen -Sc -o rdict_client.c rdict.x
/************关于本文档********************************************
*filename: 我是这样学习Linux下C语言编程的-利用RPC快速实现分布式系统
*purpose: 说明如何利用RPC快速进行客户端-服务器端C-S结构编程
*wrote by: zhoulifa(zhoulifa@163.com) 周立发(http://zhoulifa.bokee.com)
Linux爱好者 Linux知识传播者 SOHO族 开发者 最擅长C语言
*date time:2007-02-27 19:20
*Note: 任何人可以任意复制代码并运用这些文档,当然包括你的商业用途
* 但请遵循GPL
*Thanks to:
* Ubuntu 本程序在Ubuntu 6.10系统上测试完全正常
* Google.com 我通过google搜索并参考了RPC编程相关的许多文章
* 网络安全焦点(www.xfocus.net) 我主要借鉴了此文 http://www.xfocus.net/articles/200009/10.html
*Hope:希望越来越多的人贡献自己的力量,为科学技术发展出力
* 科技站在巨人的肩膀上进步更快!感谢有开源前辈的贡献!
*********************************************************************/
然后用下列命令产生Makefile:
rpcgen -Sm rdict.x > Makefile
Makefile文件原内容如下:
# This is a template Makefile generated by rpcgen
# Parameters
CLIENT = rdict_client
SERVER = rdict_server
SOURCES_CLNT.c =
SOURCES_CLNT.h =
SOURCES_SVC.c =
SOURCES_SVC.h =
SOURCES.x = rdict.x
TARGETS_SVC.c = rdict_svc.c rdict_xdr.c
TARGETS_CLNT.c = rdict_clnt.c rdict_xdr.c
TARGETS = rdict.h rdict_xdr.c rdict_clnt.c rdict_svc.c
OBJECTS_CLNT = $(SOURCES_CLNT.c:%.c=%.o) $(TARGETS_CLNT.c:%.c=%.o)
OBJECTS_SVC = $(SOURCES_SVC.c:%.c=%.o) $(TARGETS_SVC.c:%.c=%.o)
# Compiler flags
CFLAGS += -g
LDLIBS += -lnsl
RPCGENFLAGS =
# Targets
all : $(CLIENT) $(SERVER)
$(TARGETS) : $(SOURCES.x)
rpcgen $(RPCGENFLAGS) $(SOURCES.x)
$(OBJECTS_CLNT) : $(SOURCES_CLNT.c) $(SOURCES_CLNT.h) $(TARGETS_CLNT.c)
$(OBJECTS_SVC) : $(SOURCES_SVC.c) $(SOURCES_SVC.h) $(TARGETS_SVC.c)
$(CLIENT) : $(OBJECTS_CLNT)
$(LINK.c) -o $(CLIENT) $(OBJECTS_CLNT) $(LDLIBS)
$(SERVER) : $(OBJECTS_SVC)
$(LINK.c) -o $(SERVER) $(OBJECTS_SVC) $(LDLIBS)
clean:
$(RM) core $(TARGETS) $(OBJECTS_CLNT) $(OBJECTS_SVC) $(CLIENT) $(SERVER)
动手修改Makefile,修改后内容如下:
# This is a template Makefile generated by rpcgen
# Parameters
CLIENT = rdict_client
SERVER = rdict_server
SOURCES_CLNT.c =
SOURCES_CLNT.h =
SOURCES_SVC.c =
SOURCES_SVC.h =
SOURCES.x = rdict.x
TARGETS_SVC.c = rdict_svc.c rdict_xdr.c rdict_srv_func.c
TARGETS_CLNT.c = rdict_clnt.c rdict_xdr.c rdict_client.c
TARGETS = rdict.h rdict_xdr.c rdict_clnt.c rdict_svc.c
OBJECTS_CLNT = $(SOURCES_CLNT.c:%.c=%.o) $(TARGETS_CLNT.c:%.c=%.o)
OBJECTS_SVC = $(SOURCES_SVC.c:%.c=%.o) $(TARGETS_SVC.c:%.c=%.o)
# Compiler flags
CFLAGS += -g
LDLIBS += -lnsl
RPCGENFLAGS =
# Targets
all : $(CLIENT) $(SERVER)
$(TARGETS) : $(SOURCES.x)
rpcgen $(RPCGENFLAGS) $(SOURCES.x)
$(OBJECTS_CLNT) : $(SOURCES_CLNT.c) $(SOURCES_CLNT.h) $(TARGETS_CLNT.c)
$(OBJECTS_SVC) : $(SOURCES_SVC.c) $(SOURCES_SVC.h) $(TARGETS_SVC.c)
$(CLIENT) : $(OBJECTS_CLNT)
$(LINK.c) -o $(CLIENT) $(OBJECTS_CLNT) $(LDLIBS)
$(SERVER) : $(OBJECTS_SVC)
$(LINK.c) -o $(SERVER) $(OBJECTS_SVC) $(LDLIBS)
clean:
$(RM) core $(TARGETS) $(OBJECTS_CLNT) $(OBJECTS_SVC) $(CLIENT) $(SERVER) *~
修改客户端源代码rdict_client.c,把接受用户输入并分析用户输入内容的部分加到程序中来。修改后的代码为:
/*
* This is sample code generated by rpcgen.
* These are only templates and you can use them
* as a guideline for developing your own functions.
*/
#include "rdict.h"
/* ------------------------------------------------------------------
* nextin -- read a command and(possibly) a word from the next input line
* ------------------------------------------------------------------ */
int nextin(char *cmd, char *word)
{
int i, ch;
ch = getc(stdin);
while (isspace(ch)) {
ch = getc(stdin);
} /* end of while */
if (ch == EOF) {
return (-1);
}
*cmd = (char) ch;
ch = getc(stdin);
while (isspace(ch)) {
ch = getc(stdin);
} /* end of while */
if (ch == EOF) {
return (-1);
}
if (ch == '\n') {
return (0);
}
i = 0;
while (!isspace(ch)) {
if (++i > MAXWORD) {
printf("error: word too long.\n");
exit(1);
}
*word++ = ch;
ch = getc(stdin);
} /* end of while */
*word = '\0';
return i;
} /* end of nextin */
void rdictprog_1(char *host)
{
CLIENT *clnt;
int *result_1;
char *initw_1_arg;
int *result_2;
char *insertw_1_arg;
int *result_3;
char *deletew_1_arg;
int *result_4;
char *lookupw_1_arg;
#ifndef DEBUG
clnt = clnt_create(host, RDICTPROG, RDICTVERS, "udp");
if (clnt == NULL) {
clnt_pcreateerror(host);
exit(1);
}
#endif /* DEBUG */
char word[MAXWORD + 1]; /* space to hold word from input line */
char cmd;
int wordlen; /* length of input word */
while (1) {
printf("\nPlease input:");
wordlen = nextin(&cmd, word);
if (wordlen < 0) {
exit(0);
}
/* printf("\nYour cmd is:%c, your word is:%s\n", cmd, word); */
switch (cmd) {
case 'I': /* 初始化 */
result_1 = initw_1((void *) &initw_1_arg, clnt);
/* printf("\nYour result is:%d\n", *result_1); */
if (result_1 == (int *) NULL)
clnt_perror(clnt, "call failed");
else
if(*result_1 ==0) printf("Dictionary initialized to empty.\n");
else printf("Dictionary have already initialized.\n");
break;
case 'i': /* 插入 */
insertw_1_arg = word;
result_2 = insertw_1(&insertw_1_arg, clnt);
/* printf("\nYour result is:%d, your string is:%s(%d)\n", *result_2, insertw_1_arg, strlen(insertw_1_arg)); */
if (result_2 == (int *) NULL)
clnt_perror(clnt, "call failed");
else
printf("%s inserted.\n", word);
break;
case 'd': /* 删除 */
deletew_1_arg = word;
result_3 = deletew_1(&deletew_1_arg, clnt);
/* printf("\nYour result is:%d, your string is:%s(%d)\n", *result_3, deletew_1_arg, strlen(deletew_1_arg)); */
if (result_3 == (int *) NULL)
clnt_perror(clnt, "call failed");
else
printf("%s deleted.\n", word);
break;
case 'l': /* 查询 */
lookupw_1_arg = word;
result_4 = lookupw_1(&lookupw_1_arg, clnt);
/* printf("\nYour result is:%d, your string is:%s(%d)\n", *result_4, lookupw_1_arg, strlen(lookupw_1_arg)); */
if (result_4 == (int *) NULL)
clnt_perror(clnt, "call failed");
else
if(*result_4 ==0) printf("%s found.\n", word);
else printf("%s not found.\n", word);
break;
case 'q': /* 退出 */
printf("Program quits.\n");
exit(0);
break;
default: /* 非法输入 */
printf("Command %c(%s) invalid.\n", cmd, word);
break;
} /* end of switch */
} /* end of while */
#ifndef DEBUG
clnt_destroy(clnt);
#endif /* DEBUG */
}
int main(int argc, char *argv[])
{
char *host;
if (argc < 2) {
printf("usage: %s server_host\n", argv[0]);
exit(1);
}
host = argv[1];
rdictprog_1(host);
exit(0);
}
同时修改服务器端代码rdict_srv_func.c,修改后内容为:
/*
* This is sample code generated by rpcgen.
* These are only templates and you can use them
* as a guideline for developing your own functions.
*/
#include "rdict.h"
char dict[DICTSIZ][MAXWORD + 1]; /* storage for a dictionary of words */
int nwords = 0; /* number of words in the dictionary */
char init_bool = 0;
int initw(void)
{
if(init_bool) return 1;
nwords = 0;
init_bool = 1;
return 0;
} /* end of initw */
/* ------------------------------------------------------------------
* insertw -- insert a word in the dictionary
* ------------------------------------------------------------------ */
int insertw(const char *word)
{
strcpy(dict[nwords%DICTSIZ], word);
nwords++;
return (nwords);
} /* end of insertw */
/* ------------------------------------------------------------------
* deletew -- delete a word from the dictionary
* ------------------------------------------------------------------ */
int deletew(const char *word)
{
int i;
for (i = 0; i < nwords; i++) {
if (strcmp(word, dict[i]) == 0) {
nwords--;
strcpy(dict[i], dict[nwords]);
return (1);
}
} /* end of for */
return (0);
} /* end of deletew */
/* ------------------------------------------------------------------
* lookupw -- look up a word in the dictionary
* ------------------------------------------------------------------ */
int lookupw(const char *word)
{
int i;
for (i = 0; i < nwords; i++) {
if (strcmp(word, dict[i]) == 0) {
return 0;
}
} /* end of for */
return 1;
} /* end of lookupw */
int *initw_1_svc(void *argp, struct svc_req *rqstp)
{
static int result;
/*
* insert server code here
*/
result = initw();
return &result;
}
int *insertw_1_svc(char **argp, struct svc_req *rqstp)
{
static int result;
/*
* insert server code here
*/
result = insertw(*argp);
return &result;
}
int *deletew_1_svc(char **argp, struct svc_req *rqstp)
{
static int result;
/*
* insert server code here
*/
result = deletew(*argp);
return &result;
}
int *lookupw_1_svc(char **argp, struct svc_req *rqstp)
{
static int result;
/*
* insert server code here
*/
result = lookupw(*argp);
return &result;
}
至此,程序做好了。输入一个make命令就可以生成test_server和test_client这两个可执行程序了。
在一台机器上运行./test_server程序,在另外的客户机上运行./test_client server_ip就可以了。这里server_ip是运行着test_server程序的主机的IP地址。
http://blog.chinaunix.net/u/21675/showart.php?id=308008
http://blog.chinaunix.net/u2/66973/showart_1071175.html
RPC example:
http://blog.chinaunix.net/u3/102500/showart_2128311.html
http://blog.chinaunix.net/u1/37472/showart_726114.html
http://blog.chinaunix.net/u1/37472/showart_726114.html
http://blog.chinaunix.net/u3/119073/showart_2342954.html
http://blog.chinaunix.net/u3/94300/showart_1900094.html
linux下rpc的多线程实现
solaris下rpc可以通过rpcgen -A让服务器根据处理新用户请求的需要自动创建线程。但是linux没有实现这个功能,原因据说是会产生非线程安全的代码。
本文给出了一种实现上述功能的方法。参考了http://ftp.traduc.org/doc-vf/gazette-linux/gazette-us/2004/103/103D.html
和http://linux.ittoolbox.com/groups/technical-functional/redhat-l/multithreaded-rpc-servers-for-linux-482547
下面以unp2e2上的square3为例子,通过修改部分代码来实现
文件square.x
struct square_in{
long arg1;
};
struct square_out{
long res1;
};
program SQUARE_PROG{
version SQARE_VERS{
square_out SQUAREPROC(square_in) = 1;
} = 2;
} = 0x31230000
接着执行:
rpcgen -aM square.x
该指令产生了文件:
square_clnt.c square_server.c Makefile.square square_client.c square.h square_svc.c square_xdr.c
着重要修改的是square_svc.c文件:
/* 被修改过的
* Please do not edit this file.
* It was generated using rpcgen.
*/
#include "square.h"
#include <stdio.h>
#include <stdlib.h>
#include <rpc/pmap_clnt.h>
#include <string.h>
#include <memory.h>
#include <sys/socket.h>
#include <netinet/in.h>
#ifndef SIG_PF
#define SIG_PF void(*)(int)
#endif
pthread_t p_thread;
pthread_attr_t attr;
//static void
//square_prog_2(struct svc_req *rqstp, register SVCXPRT *transp)
void *
serv_request(void *data)
{
struct thr_data {
struct svc_req *rqstp;
SVCXPRT *transp;
} *ptr_data;
union {
square_in squareproc_2_arg;
} argument;
union {
square_out squareproc_2_res;
} result;
bool_t retval;
xdrproc_t _xdr_argument, _xdr_result;
bool_t (*local)(char *, void *, struct svc_req *);
ptr_data = (struct thr_data *)data;
struct svc_req *rqstp = ptr_data->rqstp;
register SVCXPRT *transp = ptr_data->transp;
switch (rqstp->rq_proc) {
case NULLPROC:
(void) svc_sendreply (transp, (xdrproc_t) xdr_void,(char *)NULL);
return;
case SQUAREPROC:
_xdr_argument = (xdrproc_t) xdr_square_in;
_xdr_result = (xdrproc_t) xdr_square_out;
local = (bool_t (*) (char *, void *, struct svc_req *))squareproc_2_svc;
break;
default:
svcerr_noproc (transp);
return;
}
memset ((char *)&argument, 0, sizeof (argument));
if (!svc_getargs (transp, (xdrproc_t) _xdr_argument,(caddr_t) &argument)) {
svcerr_decode (transp);
return;
}
retval = (bool_t) (*local)((char *)&argument, (void *)&result, rqstp);
if (retval > 0 && !svc_sendreply(transp, (xdrproc_t), (char *)&result)) {
svcerr_systemerr (transp);
}
if (!svc_freeargs (transp, (xdrproc_t) _xdr_argument,(caddr_t) &argument)) {
fprintf (stderr, "%s", "unable to free arguments");
exit (1);
}
if (!square_prog_2_freeresult (transp, _xdr_result,(caddr_t) &result))
fprintf (stderr, "%s", "unable to free results");
return;
}
//new square_prog_2
static void
square_prog_2(struct svc_req *rqstp, register SVCXPRT *transp)
{
struct data_str{
struct svc_req *rqstp;
SVCXPRT *transp;
} *data_ptr=(struct data_str*)malloc(sizeof(struct));
data_ptr->rqstp = rqstp;
data_ptr->transp = transp;
pthread_attr_setdetachstate(&attr,PTHREAD_CREATE_DETACHED);
pthread_create(&p_thread,&attr,serv_request,(void*)data_ptr);
}
int
main (int argc, char **argv)
{
register SVCXPRT *transp;
pmap_unset (SQUARE_PROG, SQARE_VERS);
transp = svcudp_create(RPC_ANYSOCK);
if (transp == NULL) {
fprintf (stderr, "%s", "cannot create udp service.");
exit(1);
}
if (!svc_register(transp, SQUARE_PROG, SQARE_VERS, square_prog_2, IPPROTO_UDP)) {
fprintf (stderr, "%s", "unable to register (SQUARE_PROG, SQARE_VERS, udp).");
exit(1);
}
transp = svctcp_create(RPC_ANYSOCK, 0, 0);
if (transp == NULL) {
fprintf (stderr, "%s", "cannot create tcp service.");
exit(1);
}
if (!svc_register(transp, SQUARE_PROG, SQARE_VERS, square_prog_2, IPPROTO_TCP)) {
fprintf (stderr, "%s", "unable to register (SQUARE_PROG, SQARE_VERS, tcp).");
exit(1);
}
svc_run ();
fprintf (stderr, "%s", "svc_run returned");
exit (1);
/* NOTREACHED */
}
蓝色部分是修改的地方,对比下面修改前的代码:
/* 修改前的文件
* Please do not edit this file.
* It was generated using rpcgen.
*/
#include "square.h"
#include <stdio.h>
#include <stdlib.h>
#include <rpc/pmap_clnt.h>
#include <string.h>
#include <memory.h>
#include <sys/socket.h>
#include <netinet/in.h>
#ifndef SIG_PF
#define SIG_PF void(*)(int)
#endif
static void
square_prog_2(struct svc_req *rqstp, register SVCXPRT *transp)
{
union {
square_in squareproc_2_arg;
} argument;
union {
square_out squareproc_2_res;
} result;
bool_t retval;
xdrproc_t _xdr_argument, _xdr_result;
bool_t (*local)(char *, void *, struct svc_req *);
switch (rqstp->rq_proc) {
case NULLPROC:
(void) svc_sendreply (transp, (xdrproc_t) xdr_void, (char *)NULL);
return;
case SQUAREPROC:
_xdr_argument = (xdrproc_t) xdr_square_in;
_xdr_result = (xdrproc_t) xdr_square_out;
local = (bool_t (*) (char *, void *, struct svc_req *))squareproc_2_svc;
break;
default:
svcerr_noproc (transp);
return;
}
memset ((char *)&argument, 0, sizeof (argument));
if (!svc_getargs (transp, (xdrproc_t) _xdr_argument, (caddr_t) &argument)) {
svcerr_decode (transp);
return;
}
retval = (bool_t) (*local)((char *)&argument, (void *)&result, rqstp);
if (retval > 0 && !svc_sendreply(transp, (xdrproc_t) _xdr_result, (char *)&result)) {
svcerr_systemerr (transp);
}
if (!svc_freeargs (transp, (xdrproc_t) _xdr_argument, (caddr_t) &argument)) {
fprintf (stderr, "%s", "unable to free arguments");
exit (1);
}
if (!square_prog_2_freeresult (transp, _xdr_result, (caddr_t) &result))
fprintf (stderr, "%s", "unable to free results");
return;
}
int
main (int argc, char **argv)
{
register SVCXPRT *transp;
pmap_unset (SQUARE_PROG, SQARE_VERS);
transp = svcudp_create(RPC_ANYSOCK);
if (transp == NULL) {
fprintf (stderr, "%s", "cannot create udp service.");
exit(1);
}
if (!svc_register(transp, SQUARE_PROG, SQARE_VERS, square_prog_2, IPPROTO_UDP)) {
fprintf (stderr, "%s", "unable to register (SQUARE_PROG, SQARE_VERS, udp).");
exit(1);
}
transp = svctcp_create(RPC_ANYSOCK, 0, 0);
if (transp == NULL) {
fprintf (stderr, "%s", "cannot create tcp service.");
exit(1);
}
if (!svc_register(transp, SQUARE_PROG, SQARE_VERS, square_prog_2, IPPROTO_TCP)) {
fprintf (stderr, "%s", "unable to register (SQUARE_PROG, SQARE_VERS, tcp).");
exit(1);
}
svc_run ();
fprintf (stderr, "%s", "svc_run returned");
exit (1);
/* NOTREACHED */
}
可以看出,修改的思路就是把原square_prog_2函数功能改由serv_request来实现,让新的square_prog_2通过创建新线程来调用serv_request,serv_request则判断并调用相应的函数(在这个例子中只有SQUAREPROC一个函数)。实现的关键是要把struct svc_req *rqstp和 register SVCXPRT *transp这两个参数交给新线程函数serv_request处理,这样当serv_request返回时,便等价于原来的square_prog_2函数返回。
其它文件跟unp2e2上的例子一样
/* 文件square_server.c
* This is sample code generated by rpcgen.
* These are only templates and you can use them
* as a guideline for developing your own functions.
*/
#include "square.h"
bool_t
squareproc_2_svc(square_in *inp, square_out *outp, struct svc_req *rqstp)
{
bool_t retval;
printf("Thread id = '%ld' started, arg = %d\n",pthread_self(),inp->arg1);
sleep(5);
outp->res1 = inp->arg1 * inp->arg1;
printf("Thread id = '%ld' is done %d \n",pthread_self(),outp->res1);
retval = TRUE;
return retval;
}
int
square_prog_2_freeresult (SVCXPRT *transp, xdrproc_t xdr_result, caddr_t result)
{
xdr_free (xdr_result, result);
/*
* Insert additional freeing code here, if needed
*/
return 1;
}
/*
* 该文件重写由rpcgen产生的原文件
* square_client.c
*/
#include "square.h"
int
main (int argc,char **argv)
{
CLIENT *cl;
square_in in;
square_out out;
if (argc != 3 ) {
printf ("Usage : client <hostname> <integer_valus>\n");
exit(1);
}
cl = clnt_create(argv[1], SQUARE_PROG, SQUARE_VERS, "tcp");
if (cl == NULL) {
clnt_sperror(cl, "call failed");
exit (1);
}
in.arg1 = atol(argv[2]);
if (squareproc_2(&in, &out, cl) != RPC_SUCCESS){
printf("%s\n", clnt_sperror(cl, argv[1]));
exit(1);
}
printf("result: %ld\n",out.res1);
exit(0);
}
接着编译链接server端和client端程序
$ gcc -o client square_clnt.c square_xdr.c square_client.c
$ gcc -o server square_svc.c square_xdr.c square_server.c -lpthread
在一个终端下运行./server记得打开portmap
另一个终端下运行:
$ ./client localhost 1 & ./client localhost 2 & ./client localhost 3
便能在server端看到:
Thread id = '1082132816' started, arg = 3
Thread id = '1090525520' started, arg = 2
Thread id = '1098918224' started, arg = 1
Thread id = '1082132816' is done 9
Thread id = '1090525520' is done 4
Thread id = '1098918224' is done 1
Hadoop里的RPC机制过程
已经有文章描述RPC的具体结构,http://caibinbupt.javaeye.com/blog/280790,这篇文章很清晰的描述了Client和Server的结构,但是较为高屋建瓴,我在看的时候依然觉得无法清晰理解其调用过程,所以将过程整理出来,知道how,才容易理解why,即知道是怎么干的,才容易理解为什么会那么去设计。
客户端C要发起向服务端S的关于方法M调用
1. C首先创建一个通向S的连接getConnection,然后将此次调用放入CallList里,这样客户端就可以同时发生很多调用,每个调用用ID来识别。
2. 发送调用参数。调用参数是Client的调用方(比如NameNode,DataNode等)指定的,一般就是一个Invocation对象,里面包含要调用的方法和参数。了解JAVA动态代理类java.lang.reflect.Proxy会对这里的理解有很大帮助。
3. 等待调用结果.Client.Connection是个线程类,启动了之后唯一做的时候就是等待调用结果
对于服务器端,其有一个方法start指定了启动服务器开始监听,这个start被四个类调用,分别是
TaskTracker.initialize,Namenode.initialize,Jobtracker.offerService,
Datanode.startDatanode
显然,任何两者之间的通信都是考这个client-server模型实现的。
server start后,干了三件事
1. 启动listen,监听客户端Call
2. 启动response,随时准备将处理结果发回client
3. 启动10个handler,处理具体的请求。
这里必须对java NIO机制了解,才能看的明白。
当客户端调用来到的时候
1. listen首先将调用doaccept将Connection附加给selectionkey,然后调用doread添加,doread会调用Connecton的方法将调用添加到调用列表,该列表是BlockingQueue,其保持列表先进先出的特性而且支持同步
2. listen将call添加到calllist后,handler因为一直在检测calllist,于是其立刻开始处理,处理完毕后,其将结果保存在call对象中,然后调用response开始向客户端写。这里hadler调用的call只是一个未实现的方法,具体实现在RPC.Server中,这点需要注意。
3. Response也监视responselist,如果responselist中某个call需要将结果写入客户端,就写出,当某个call的结果被发送完毕,从responselist中删除该call对象。
这里有个细节:handler完成call之后就开始向客户端写call结果,但是结果可能太多,无法通过一次性发送完毕,而发送之后还要等待client接受完毕才能再发,如果现在handler在那里等待客户端接受完毕,然后再发,效率不高。解决办法是handler处理完毕之后,只向client发送一次处理结果。如果这一次将处理结果发送完毕,接下来就没有response的事情了,如果没有发送完毕,接下来response负责将剩下的处理结果发送给客户端。这样handler的并发量会大一些。
服务器实现中大量利用监视队列,比如handler就直观坚持calllist,一旦发现数据就开始处理,而response就监视responselist,发现数据需要发送就开始发送。
写完了之后,觉得写的也不清楚,可能要清晰明白只能自己看代码吧。
还发现在没用过java的情况下看hadoop可以更快的学习java
作者:jackxiang@向东博客 专注WEB应用 构架之美 --- 构架之美,在于尽态极妍 | 应用之美,在于药到病除
地址:https://jackxiang.com/post/3881/
版权所有。转载时必须以链接形式注明作者和原始出处及本声明!
最后编辑: jackxiang 编辑于2010-12-19 21:31
评论列表