最近TokyoTyrant、TokyoCabinetに触る機会があって、その実装レベルまで込み入った話に接する機会があった。
本日は最近気になっているTokyoTyrant, TokyoCabinetに関するモジュールのソースを追って、C言語で通信をどのように実装しているのかを確認してみたいと思います。
さてちょっといきなり飛ぶが、tokyotyrantのインタフェースから各種のメソッドを呼び出すとき
当然初めにリモートホストとの接続リソースを生成する必要がある。そこから始めよう。
tokyotyrantでは接続リソースを生成する際に下記のtcrdbopenを利用する。
/* Open a remote database. */ bool tcrdbopen(TCRDB *rdb, const char *host, int port){ assert(rdb && host); if(!tcrdblockmethod(rdb)) return false; bool rv; pthread_cleanup_push((void (*)(void *))tcrdbunlockmethod, rdb); rv = tcrdbopenimpl(rdb, host, port); pthread_cleanup_pop(1); return rv; }
リモートリソースをオープンするときtcrdblockmethodを使用。
ロックの取得を行っている。
/* Lock a method of the remote database object. `rdb' specifies the remote database object. If successful, the return value is true, else, it is false. */ static bool tcrdblockmethod(TCRDB *rdb){ assert(rdb); if(pthread_mutex_lock(&rdb->mmtx) != 0){ tcrdbsetecode(rdb, TCEMISC); return false; } return true; }
実態は何をやっているかというと下記メソッドを実行している
pthread_mutex_lock(&rdb->mmtx)
&rdb構造体を確認すると
typedef struct { /* type of structure for a remote database */ pthread_mutex_t mmtx; /* mutex for method */ pthread_key_t eckey; /* key for thread specific error code */ char *host; /* host name */ int port; /* port number */ char *expr; /* simple server expression */ int fd; /* file descriptor */ TTSOCK *sock; /* socket object */ double timeout; /* timeout */ int opts; /* options */ } TCRDB;
各種メソッドのためのmutexロックとある
ところでさらにpthread_mutex_tという型が何なのか。
pthread_mutex_tとはtokyotyrantにおける拡張ではなくc言語で用意されている機構である。
manpageは下記になる。
http://linuxjm.sourceforge.jp/html/glibc-linuxthreads/man3/pthread_mutex_lock.3.html
pthread_mutex_lockはmutexロックを取得しようとする。mutexロックはスレッドの間で共通のロックを一つだけ持ち共有する。
ということはプロセス間ではそのロックは有効ではないので、その場合はまた別のロック機構が必要となる。
またpthread_mutex_lockはロックを取得できれば、直ちにロックを返却し、ロックが取得できなかった場合はロックが取得できるまでスレッドの実行を停止させるなどの挙動を示す。
ここですでにロックを取得していたスレッドがいた場合の挙動は、mutexを初期化した際の状態に依存して異なる。
tcrdbの初期化処理を見てみよう。
/* Create a remote database object. */ TCRDB *tcrdbnew(void){ TCRDB *rdb = tcmalloc(sizeof(*rdb)); if(pthread_mutex_init(&rdb->mmtx, NULL) != 0) tcmyfatal("pthread_mutex_init failed"); if(pthread_key_create(&rdb->eckey, NULL) != 0) tcmyfatal("pthread_key_create failed"); rdb->host = NULL; rdb->port = -1; rdb->expr = NULL; rdb->fd = -1; rdb->sock = NULL; rdb->timeout = UINT_MAX; rdb->opts = 0; tcrdbsetecode(rdb, TTESUCCESS); return rdb; }
するとpthread_mutex_init関数を用いて、第二引数がNULLで初期化されていることがわかる。
pthread_mutex_initのmanpageは下記にある
http://linuxjm.sourceforge.jp/html/glibc-linuxthreads/man3/pthread_mutexattr_init.3.html
マニュアルを拝見すると第二引数はpthread_mutexattr_tという属性を指定することになる。
これがNULLである場合代わりにデフォルトの属性が使用される。
デフォルトのmutex種別は「速い(fast)」である。
さて、先のコードに戻って、このfast種別の場合、ロックを取得しようと場合どうなるかというところに話を戻そう。
mutexがfastの場合、すでにロックがあるmutexを取得しようとした場合呼び出しスレッドを永遠に停止させる。
ちなみにpthread_mutexattr_initの返り値の定義に関しては常に0を返却する。とある。
これはつまりtcrdblockmethodメソッドは、純粋にロックを直列に管理するような機構になっている事がわかる。
0でない場合雑多なエラーとして返却しているが、これはプログラムとして異常な状態になっていることを管理する機構であると推測される。
さて無事にほかスレッドとの競合に打ち勝ってロックが取得できたところで、再びtcrdbopenに戻ってみよう。
/* Open a remote database. */ bool tcrdbopen(TCRDB *rdb, const char *host, int port){ assert(rdb && host); if(!tcrdblockmethod(rdb)) return false; bool rv; pthread_cleanup_push((void (*)(void *))tcrdbunlockmethod, rdb); rv = tcrdbopenimpl(rdb, host, port); pthread_cleanup_pop(1); return rv; }
次にpthread_cleanup_pushという関数を呼び出している。これを見てみよう。
マニュアルによると、スレッドがキャンセルされた場合の、ハンドラメソッドを追加するとある。
C言語では作成したスレッドは明示的に終了することができ、その際にpthread_cleanup_pushで登録したハンドラが実行される。
この場合tcrdbunlockmethodが実行され、その時の引数としてrdbを渡すということを宣言している。
ここでtcrdbunlockmethodを見てみよう
/* Unlock a method of the remote database object. `rdb' specifies the remote database object. */ static void tcrdbunlockmethod(TCRDB *rdb){ assert(rdb); if(pthread_mutex_unlock(&rdb->mmtx) != 0) tcrdbsetecode(rdb, TCEMISC); }
非常にシンプルでmutex_lockを開放していることがわかる。
さて再びtcrdbopen関数に戻って、tcrdbopenimplを参照してみよう。
/* Open a remote database. `rdb' specifies the remote database object. `host' specifies the name or the address of the server. `port' specifies the port number. If successful, the return value is true, else, it is false. */ static bool tcrdbopenimpl(TCRDB *rdb, const char *host, int port){ assert(rdb && host); if(rdb->fd >= 0){ tcrdbsetecode(rdb, TTEINVALID); return false; } int fd; if(port < 1){ fd = ttopensockunix(host); } else { char addr[TTADDRBUFSIZ]; if(!ttgethostaddr(host, addr)){ tcrdbsetecode(rdb, TTENOHOST); return false; } fd = ttopensock(addr, port); } if(fd == -1){ tcrdbsetecode(rdb, TTEREFUSED); return false; } if(rdb->host) tcfree(rdb->host); rdb->host = tcstrdup(host); rdb->port = port; rdb->expr = tcsprintf("%s:%d", host, port); rdb->fd = fd; rdb->sock = ttsocknew(fd); return true; }
細かなエラーチェックは割愛し、本体を見てみると
port番号が1未満、これを想定しているのはおそらく未指定であった場合であるが、そのときはttopensockunix関数を呼び出している。
またportが指定されていた場合はttopensockが呼び出されている。これらについて追っていこう。
初めにttopensockunixから参照してみる。
/* Open a client socket of UNIX domain stream to a server. */ int ttopensockunix(const char *path){ assert(path); struct sockaddr_un saun; memset(&saun, 0, sizeof(saun)); saun.sun_family = AF_UNIX; snprintf(saun.sun_path, SOCKPATHBUFSIZ, "%s", path); int fd = socket(PF_UNIX, SOCK_STREAM, 0); if(fd == -1) return -1; int optint = 1; setsockopt(fd, SOL_SOCKET, SO_KEEPALIVE, (char *)&optint, sizeof(optint)); struct timeval opttv; opttv.tv_sec = (int)SOCKRCVTIMEO; opttv.tv_usec = (SOCKRCVTIMEO - (int)SOCKRCVTIMEO) * 1000000; setsockopt(fd, SOL_SOCKET, SO_RCVTIMEO, (char *)&opttv, sizeof(opttv)); opttv.tv_sec = (int)SOCKSNDTIMEO; opttv.tv_usec = (SOCKSNDTIMEO - (int)SOCKSNDTIMEO) * 1000000; setsockopt(fd, SOL_SOCKET, SO_SNDTIMEO, (char *)&opttv, sizeof(opttv)); double dl = tctime() + SOCKCNCTTIMEO; do { int ocs = PTHREAD_CANCEL_DISABLE; pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &ocs); int rv = connect(fd, (struct sockaddr *)&saun, sizeof(saun)); int en = errno; pthread_setcancelstate(ocs, NULL); if(rv == 0) return fd; if(en != EINTR && en != EAGAIN && en != EINPROGRESS && en != EALREADY && en != ETIMEDOUT) break; } while(tctime() <= dl); close(fd); return -1; }
sockaddr_unという構造体が出現しているが、これはUNIX LOCALなソケットのアドレスの情報を表す構造体である。
通信のためのfd(ファイルディスクリプタ)を取得するソケット生成の実態はscoket関数で行われている
引数がPF_UNIX, SOCK_STREAM, 0となっているが順に説明する。
PF_UNIXはソケット通信にファイルを用いることを示す。
詳しくは割愛するが、UNIXのローカルのソケット通信にはファイルを用いたものとTCP通信を用いたものがある。
またSOCK_STREAMに関してはソケットのタイプを表しており、これも詳しくは割愛するが
UNIXのソケットタイプにはSOCK_STREAMとSOCK_DGRAMが存在する。
ちなみにドメインソケットのアドレスは下記の構造体でシンプルに示される
#define UNIX_PATH_MAX 108 struct sockaddr_un { sa_family_t sun_family; /* AF_UNIX */ char sun_path[UNIX_PATH_MAX]; /* pathname */ };
また作成したソケットに対してオプション設定している箇所がある。
SOL_SOCKETはオプションの層を表す識別子で、ソケットAPIそうでそうでオプションを有効にすることを表す。
SO_RCVTIMEO, SO_SNDTIMEOはそれぞれ受信、送信のタイムアウト設定をしている。これを超えた場合エラーが送信される。
最終的にconnectシステムコールを要求してファイルディスクリプタが参照しているソケットを引数で渡しているアドレス空間が示す場所とに連結してます。
この一連の処理によってサーバと接続されたファイルディスクリプタが取得でき、それによって通信することができるようになるわけですね。
これ以降の深いレイヤについてはUnixソースを参照していくと良いと思います。
さて対してリモートサーバと通信する場合はどうしているのかを追っていきましょう。
/* Open a client socket of TCP/IP stream to a server. */ int ttopensock(const char *addr, int port){ assert(addr && port >= 0); struct sockaddr_in sain; memset(&sain, 0, sizeof(sain)); sain.sin_family = AF_INET; if(inet_aton(addr, &sain.sin_addr) == 0) return -1; uint16_t snum = port; sain.sin_port = htons(snum); int fd = socket(PF_INET, SOCK_STREAM, 0); if(fd == -1) return -1; int optint = 1; setsockopt(fd, SOL_SOCKET, SO_KEEPALIVE, (char *)&optint, sizeof(optint)); struct timeval opttv; opttv.tv_sec = (int)SOCKRCVTIMEO; opttv.tv_usec = (SOCKRCVTIMEO - (int)SOCKRCVTIMEO) * 1000000; setsockopt(fd, SOL_SOCKET, SO_RCVTIMEO, (char *)&opttv, sizeof(opttv)); opttv.tv_sec = (int)SOCKSNDTIMEO; opttv.tv_usec = (SOCKSNDTIMEO - (int)SOCKSNDTIMEO) * 1000000; setsockopt(fd, SOL_SOCKET, SO_SNDTIMEO, (char *)&opttv, sizeof(opttv)); optint = 1; setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, (char *)&optint, sizeof(optint)); double dl = tctime() + SOCKCNCTTIMEO; do { int ocs = PTHREAD_CANCEL_DISABLE; pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &ocs); int rv = connect(fd, (struct sockaddr *)&sain, sizeof(sain)); int en = errno; pthread_setcancelstate(ocs, NULL); if(rv == 0) return fd; if(en != EINTR && en != EAGAIN && en != EINPROGRESS && en != EALREADY && en != ETIMEDOUT) break; } while(tctime() <= dl); close(fd); return -1; }
ところどころがTCP通信に対応するような設定になっているだけで、基本的な内容はほぼ同じになっています。
sockaddr_inについても下記のような定義になっており、ポート番号が増えているなどTCPレイヤに対応していることが予想できます。
struct sockaddr_in { uint8_t sin_len; sa_family_t sin_family; in_port_t sin_port; struct in_addr sin_addr; char sin_zero[8]; };
こんなかんじにCではsocketを生成しているようですね。
ちなみに生成したfdを元にデータをやりとりする場合sendというシステムコールを利用することになりますが、これはまた後々取り上げたいと思います。
コメントを残す