在這一章節(jié),我們會深入學(xué)習(xí)怎樣使用Boost.Asio建立非凡的客戶端和服務(wù)端應(yīng)用。你可以運(yùn)行并測試它們,而且在理解之后,你可以把它們做為框架來構(gòu)造自己的應(yīng)用。
在接下來的例子中:
客戶端可以發(fā)送如下請求:
為了更有趣一點(diǎn),我們增加了一些難度:
首先,我們會實(shí)現(xiàn)同步應(yīng)用。你會發(fā)現(xiàn)它的代碼很直接而且易讀的。而且因?yàn)樗械木W(wǎng)絡(luò)調(diào)用都是阻塞的,所以它不需要獨(dú)立的線程。
同步客戶端會以你所期望的串行方式運(yùn)行;連接到服務(wù)端,登錄服務(wù)器,然后執(zhí)行連接循環(huán),比如休眠一下,發(fā)起一個請求,讀取服務(wù)端返回,然后再休眠一會,然后一直循環(huán)下去……
因?yàn)槲覀兪峭降?,所以我們讓事情變得簡單一點(diǎn)。首先,連接到服務(wù)器,然后再循環(huán),如下:
ip::tcp::endpoint ep( ip::address::from_string("127.0.0.1"), 8001);
void run_client(const std::string & client_name) {
talk_to_svr client(client_name);
try {
client.connect(ep);
client.loop();
} catch(boost::system::system_error & err) {
std::cout << "client terminated " << std::endl;
}
}
下面的代碼片段展示了talk_to_svr類:
struct talk_to_svr {
talk_to_svr(const std::string & username) : sock_(service), started_(true), username_(username) {}
void connect(ip::tcp::endpoint ep) {
sock_.connect(ep);
}
void loop() {
write("login " + username_ + "\n");
read_answer();
while ( started_) {
write_request();
read_answer();
boost::this_thread::sleep(millisec(rand() % 7000));
}
}
std::string username() const { return username_; }
...
private:
ip::tcp::socket sock_;
enum { max_msg = 1024 };
int already_read_;
char buff_[max_msg];
bool started_;
std::string username_;
};
在這個循環(huán)中,我們僅僅填充1個比特,做一個ping操作之后就進(jìn)入睡眠狀態(tài),之后再讀取服務(wù)端的返回。我們的睡眠是隨機(jī)的(有時候超過5秒),這樣服務(wù)端就有可能在某個時間點(diǎn)斷開我們的連接:
void write_request() {
write("ping\n");
}
void read_answer() {
already_read_ = 0;
read(sock_, buffer(buff_), boost::bind(&talk_to_svr::read_complete, this, _1, _2));
process_msg();
}
void process_msg() {
std::string msg(buff_, already_read_);
if ( msg.find("login ") == 0) on_login();
else if ( msg.find("ping") == 0) on_ping(msg);
else if ( msg.find("clients ") == 0) on_clients(msg);
else std::cerr << "invalid msg " << msg << std::endl;
}
對于讀取結(jié)果,我們使用在之前章節(jié)就有說到的read_complete來保證我們能讀到換行符(’\n’)。這段邏輯在process_msg()中,在這里我們讀取服務(wù)端的返回,然后分發(fā)到正確的方法去處理:
void on_login() { do_ask_clients(); }
void on_ping(const std::string & msg) {
std::istringstream in(msg);
std::string answer;
in >> answer >> answer;
if ( answer == "client_list_changed")
do_ask_clients();
}
void on_clients(const std::string & msg) {
std::string clients = msg.substr(8);
std::cout << username_ << ", new client list:" << clients;
}
void do_ask_clients() {
write("ask_clients\n");
read_answer();
}
void write(const std::string & msg) { sock_.write_some(buffer(msg)); }
size_t read_complete(const boost::system::error_code & err, size_t bytes) {
// ... 和之前一樣
}
在讀取服務(wù)端對我們ping操作的返回時,如果得到的消息是client_list_changed,我們就需要重新請求客戶端列表。
同步服務(wù)端也是相當(dāng)簡單的。它只需要兩個線程,一個負(fù)責(zé)接收新的客戶端連接,另外一個負(fù)責(zé)處理已經(jīng)存在的客戶端請求。它不能使用單線程,因?yàn)榈却碌目蛻舳诉B接是一個阻塞操作,所以我們需要另外一個線程來處理已經(jīng)存在的客戶端請求。
正常來說服務(wù)端都比客戶端要難實(shí)現(xiàn)。一方面,它要管理所有已經(jīng)連接的客戶端。因?yàn)槲覀兪峭降?,所以我們需要至少兩個線程,一個負(fù)責(zé)接受新的客戶端連接(因?yàn)閍ccept()是阻塞的)而另一個負(fù)責(zé)回復(fù)已經(jīng)存在的客戶端。
void accept_thread() {
ip::tcp::acceptor acceptor(service,ip::tcp::endpoint(ip::tcp::v4(), 8001));
while ( true) {
client_ptr new_( new talk_to_client);
acceptor.accept(new_->sock());
boost::recursive_mutex::scoped_lock lk(cs);
clients.push_back(new_);
}
}
void handle_clients_thread() {
while ( true) {
boost::this_thread::sleep( millisec(1));
boost::recursive_mutex::scoped_lock lk(cs);
for(array::iterator b = clients.begin(), e = clients.end(); b!= e; ++b)
(*b)->answer_to_client();
// 刪除已經(jīng)超時的客戶端
clients.erase(std::remove_if(clients.begin(), clients.end(), boost::bind(&talk_to_client::timed_out,_1)), clients.end());
}
}
int main(int argc, char* argv[]) {
boost::thread_group threads;
threads.create_thread(accept_thread);
threads.create_thread(handle_clients_thread);
threads.join_all();
}
為了分辨客戶端發(fā)送過來的請求我們需要保存一個客戶端的列表。
每個talk_to_client實(shí)例都擁有一個socket,socket類是不支持拷貝構(gòu)造的,所以如果你想要把它們保存在一個std::vector對象中,你需要一個指向它的智能指針。這里有兩種實(shí)現(xiàn)的方式:在talk_to_client內(nèi)部保存一個指向socket的智能指針然后創(chuàng)建一個talk_to_client實(shí)例的數(shù)組,或者讓talk_to_client實(shí)例用變量的方式保存socket,然后創(chuàng)建一個指向talk_to_client智能指針的數(shù)組。我選擇后者,但是你也可以選前面的方式:
typedef boost::shared_ptr<talk_to_client> client_ptr;
typedef std::vector<client_ptr> array;
array clients;
boost::recursive_mutex cs; // 用線程安全的方式訪問客戶端數(shù)組
talk_to_client的主要代碼如下:
struct talk_to_client : boost::enable_shared_from_this<talk_to_client>
{
talk_to_client() { ... }
std::string username() const { return username_; }
void answer_to_client() {
try {
read_request();
process_request();
} catch ( boost::system::system_error&) { stop(); }
if ( timed_out())
stop();
}
void set_clients_changed() { clients_changed_ = true; }
ip::tcp::socket & sock() { return sock_; }
bool timed_out() const {
ptime now = microsec_clock::local_time();
long long ms = (now - last_ping).total_milliseconds();
return ms > 5000 ;
}
void stop() {
boost::system::error_code err; sock_.close(err);
}
void read_request() {
if ( sock_.available())
already_read_ += sock_.read_some(buffer(buff_ + already_read_, max_msg - already_read_));
}
...
private:
// ... 和同步客戶端中的一樣
bool clients_changed_;
ptime last_ping;
};
上述代碼擁有非常好的自釋能力。其中最重要的方法是read_request()。它只在存在有效數(shù)據(jù)的情況才讀取,這樣的話,服務(wù)端永遠(yuǎn)都不會阻塞:
void process_request() {
bool found_enter = std::find(buff_, buff_ + already_read_, '\n') < buff_ + already_read_;
if ( !found_enter)
return; // 消息不完整
// 處理消息
last_ping = microsec_clock::local_time();
size_t pos = std::find(buff_, buff_ + already_read_, '\n') - buff_;
std::string msg(buff_, pos);
std::copy(buff_ + already_read_, buff_ + max_msg, buff_);
already_read_ -= pos + 1;
if ( msg.find("login ") == 0) on_login(msg);
else if ( msg.find("ping") == 0) on_ping();
else if ( msg.find("ask_clients") == 0) on_clients();
else std::cerr << "invalid msg " << msg << std::endl;
}
void on_login(const std::string & msg) {
std::istringstream in(msg);
in >> username_ >> username_;
write("login ok\n");
update_clients_changed();
}
void on_ping() {
write(clients_changed_ ? "ping client_list_changed\n" : "ping ok\n");
clients_changed_ = false;
}
void on_clients() {
std::string msg;
{ boost::recursive_mutex::scoped_lock lk(cs);
for( array::const_iterator b = clients.begin(), e = clients.end() ; b != e; ++b)
msg += (*b)->username() + " ";
}
write("clients " + msg + "\n");
}
void write(const std::string & msg){sock_.write_some(buffer(msg)); }
觀察process_request()。當(dāng)我們讀取到足夠多有效的數(shù)據(jù)時,我們需要知道我們是否已經(jīng)讀取到整個消息(如果found_enter為真)。這樣做的話,我們可以使我們避免一次讀多個消息的可能(’\n’之后的消息也被保存到緩沖區(qū)中),然后我們解析讀取到的整個消息。剩下的代碼都是很容易讀懂的。
現(xiàn)在,是比較有趣(也比較難)的異步實(shí)現(xiàn)! 當(dāng)查看示意圖時,你需要知道Boost.Asio代表由Boost.Asio執(zhí)行的一個異步調(diào)用。例如do_read(),Boost.Asio和on_read()代表了從do_read()到on_read()的邏輯流程,但是你永遠(yuǎn)不知道什么時候輪到on_read()被調(diào)用,你只是知道你最終會調(diào)用它。
到這里事情會變得有點(diǎn)復(fù)雜,但是仍然是可控的。當(dāng)然你也會擁有一個不會阻塞的應(yīng)用。
下面的代碼你應(yīng)該已經(jīng)很熟悉:
#define MEM_FN(x) boost::bind(&self_type::x, shared_from_this())
#define MEM_FN1(x,y) boost::bind(&self_type::x, shared_from_
this(),y)
#define MEM_FN2(x,y,z) boost::bind(&self_type::x, shared_from_
this(),y,z)
class talk_to_svr : public boost::enable_shared_from_this<talk_to_svr>, boost::noncopyable {
typedef talk_to_svr self_type;
talk_to_svr(const std::string & username) : sock_(service), started_(true), username_(username), timer_
(service) {}
void start(ip::tcp::endpoint ep) {
sock_.async_connect(ep, MEM_FN1(on_connect,_1));
}
public:
typedef boost::system::error_code error_code;
typedef boost::shared_ptr<talk_to_svr> ptr;
static ptr start(ip::tcp::endpoint ep, const std::string & username) {
ptr new_(new talk_to_svr(username));
new_->start(ep);
return new_;
}
void stop() {
if ( !started_) return;
started_ = false;
sock_.close();
}
bool started() { return started_; }
...
private:
size_t read_complete(const boost::system::error_code &err, size_t bytes) {
if ( err) return 0;
bool found = std::find(read_buffer_, read_buffer_ + bytes, '\n') < read_buffer_ + bytes;
return found ? 0 : 1;
}
private:
ip::tcp::socket sock_;
enum { max_msg = 1024 };
char read_buffer_[max_msg];
char write_buffer_[max_msg];
bool started_;
std::string username_;
deadline_timer timer_;
};
你會看到額外還有一個叫deadlinetimer timer的方法用來ping服務(wù)端;而且ping操作同樣是隨機(jī)的。
下面是類的邏輯:
void on_connect(const error_code & err) {
if ( !err) do_write("login " + username_ + "\n");
else stop();
}
void on_read(const error_code & err, size_t bytes) {
if ( err) stop();
if ( !started() ) return;
// 處理消息
std::string msg(read_buffer_, bytes);
if ( msg.find("login ") == 0) on_login();
else if ( msg.find("ping") == 0) on_ping(msg);
else if ( msg.find("clients ") == 0) on_clients(msg);
}
void on_login() {
do_ask_clients();
}
void on_ping(const std::string & msg) {
std::istringstream in(msg);
std::string answer;
in >> answer >> answer;
if ( answer == "client_list_changed") do_ask_clients();
else postpone_ping();
}
void on_clients(const std::string & msg) {
std::string clients = msg.substr(8);
std::cout << username_ << ", new client list:" << clients ;
postpone_ping();
}
在on_read()中,首先的兩行代碼是亮點(diǎn)。在第一行,如果出現(xiàn)錯誤,我們就停止。而第二行,如果我們已經(jīng)停止了(之前就停止了或者剛好停止),我們就返回。反之如果所有都是OK,我們就對收到的消息進(jìn)行處理。
最后是*do_**方法,實(shí)現(xiàn)如下:
void do_ping() { do_write("ping\n"); }
void postpone_ping() {
timer_.expires_from_now(boost::posix_time::millisec(rand() % 7000));
timer_.async_wait( MEM_FN(do_ping));
}
void do_ask_clients() { do_write("ask_clients\n"); }
void on_write(const error_code & err, size_t bytes) { do_read(); }
void do_read() {
async_read(sock_, buffer(read_buffer_), MEM_FN2(read_complete,_1,_2), MEM_FN2(on_read,_1,_2));
}
void do_write(const std::string & msg) {
if ( !started() ) return;
std::copy(msg.begin(), msg.end(), write_buffer_);
sock_.async_write_some( buffer(write_buffer_, msg.size()), MEM_FN2(on_write,_1,_2));
注意每一個read操作都會觸發(fā)一個ping操作
這個示意圖是相當(dāng)復(fù)雜的;從Boost.Asio出來你可以看到4個箭頭指向on_accept,on_read,on_write和on_check_ping。這也就意味著你永遠(yuǎn)不知道哪個異步調(diào)用是下一個完成的調(diào)用,但是你可以確定的是它是這4個操作中的一個。
現(xiàn)在,我們是異步的了;我們可以繼續(xù)保持單線程。接受客戶端連接是最簡單的部分,如下所示:
ip::tcp::acceptor acceptor(service, ip::tcp::endpoint(ip::tcp::v4(), 8001));
void handle_accept(talk_to_client::ptr client, const error_code & err)
{
client->start();
talk_to_client::ptr new_client = talk_to_client::new_();
acceptor.async_accept(new_client->sock(), boost::bind(handle_accept,new_client,_1));
}
int main(int argc, char* argv[]) {
talk_to_client::ptr client = talk_to_client::new_();
acceptor.async_accept(client->sock(),boost::bind(handle_accept,client,_1));
service.run();
}
上述代碼會一直異步地等待一個新的客戶端連接(每個新的客戶端連接會觸發(fā)另外一個異步等待操作)。
我們需要監(jiān)控client list changed事件(一個新客戶端連接或者一個客戶端斷開連接),然后當(dāng)事件發(fā)生時通知所有的客戶端。因此,我們需要保存一個客戶端連接的數(shù)組,否則除非你不需要在某一時刻知道所有連接的客戶端,你才不需要這樣一個數(shù)組。
class talk_to_client;
typedef boost::shared_ptr<talk_to_client>client_ptr;
typedef std::vector<client_ptr> array;
array clients;
connection類的框架如下:
class talk_to_client : public boost::enable_shared_from_this<talk_to_client> , boost::noncopyable {
talk_to_client() { ... }
public:
typedef boost::system::error_code error_code;
typedef boost::shared_ptr<talk_to_client> ptr;
void start() {
started_ = true;
clients.push_back( shared_from_this());
last_ping = boost::posix_time::microsec_clock::local_time();
do_read(); //首先,我們等待客戶端連接
}
static ptr new_() { ptr new_(new talk_to_client); return new_; }
void stop() {
if ( !started_) return;
started_ = false;
sock_.close();
ptr self = shared_from_this();
array::iterator it = std::find(clients.begin(), clients.end(), self);
clients.erase(it);
update_clients_changed();
}
bool started() const { return started_; }
ip::tcp::socket & sock() { return sock_;}
std::string username() const { return username_; }
void set_clients_changed() { clients_changed_ = true; }
…
private:
ip::tcp::socket sock_;
enum { max_msg = 1024 };
char read_buffer_[max_msg];
char write_buffer_[max_msg];
bool started_;
std::string username_;
deadline_timer timer_;
boost::posix_time::ptime last_ping;
bool clients_changed_;
};
我會用talk_to_client或者talk_to_server來調(diào)用connection類,從而讓你更明白我所說的內(nèi)容。
現(xiàn)在你需要用到之前的代碼了;它和我們在客戶端應(yīng)用中所用到的是一樣的。我們還有另外一個stop()方法,這個方法用來從客戶端數(shù)組中移除一個客戶端連接。
服務(wù)端持續(xù)不斷地等待異步的read操作:
void on_read(const error_code & err, size_t bytes) {
if ( err) stop();
if ( !started() ) return;
std::string msg(read_buffer_, bytes);
if ( msg.find("login ") == 0) on_login(msg);
else if ( msg.find("ping") == 0) on_ping();
else if ( msg.find("ask_clients") == 0) on_clients();
}
void on_login(const std::string & msg) {
std::istringstream in(msg);
in >> username_ >> username_;
do_write("login ok\n");
update_clients_changed();
}
void on_ping() {
do_write(clients_changed_ ? "ping client_list_changed\n" : "ping ok\n");
clients_changed_ = false;
}
void on_clients() {
std::string msg;
for(array::const_iterator b =clients.begin(),e =clients.end(); b != e; ++b)
msg += (*b)->username() + " ";
do_write("clients " + msg + "\n");
}
這段代碼是簡單易懂的;需要注意的一點(diǎn)是:當(dāng)一個新客戶端登錄,我們調(diào)用update_clients_changed(),這個方法為所有客戶端將clientschanged標(biāo)志為true。
服務(wù)端每收到一個請求就用相應(yīng)的方式進(jìn)行回復(fù),如下所示:
void do_ping() { do_write("ping\n"); }
void do_ask_clients() { do_write("ask_clients\n"); }
void on_write(const error_code & err, size_t bytes) { do_read(); }
void do_read() {
async_read(sock_, buffer(read_buffer_), MEM_FN2(read_complete,_1,_2), MEM_FN2(on_read,_1,_2));
post_check_ping();
}
void do_write(const std::string & msg) {
if ( !started() ) return;
std::copy(msg.begin(), msg.end(), write_buffer_);
sock_.async_write_some( buffer(write_buffer_, msg.size()), MEM_FN2(on_write,_1,_2));
}
size_t read_complete(const boost::system::error_code & err, size_t bytes) {
// ... 就像之前
}
在每個write操作的末尾,on_write()方法被調(diào)用,這個方法會觸發(fā)另外一個異步讀操作,這樣的話“等待請求-回復(fù)請求”這個循環(huán)就會一直執(zhí)行,直到客戶端斷開連接或者超時。
在每次讀操作開始之前,我們異步等待5秒鐘來觀察客戶端是否超時。如果超時,我們關(guān)閉它的連接:
void on_check_ping() {
ptime now = microsec_clock::local_time();
if ( (now - last_ping).total_milliseconds() > 5000)
stop();
last_ping = boost::posix_time::microsec_clock::local_time();
}
void post_check_ping() {
timer_.expires_from_now(boost::posix_time::millisec(5000));
timer_.async_wait( MEM_FN(on_check_ping));
}
這就是整個服務(wù)端的實(shí)現(xiàn)。你可以運(yùn)行并讓它工作起來!
在代碼中,我向你們展示了這一章我們學(xué)到的東西,為了更容易理解,我把代碼稍微精簡了下;比如,大部分的控制臺輸出我都沒有展示,盡管在這本書附贈的代碼中它們是存在的。我建議你自己運(yùn)行這些例子,因?yàn)閺念^到尾讀一次代碼能加強(qiáng)你對本章展示應(yīng)用的理解。
我們已經(jīng)學(xué)到了怎么寫一些基礎(chǔ)的客戶端/服務(wù)端應(yīng)用。我們已經(jīng)避免了一些諸如內(nèi)存泄漏和死鎖的低級錯誤。所有的編碼都是框架式的,這樣你就可以根據(jù)你自己的需求對它們進(jìn)行擴(kuò)展。
在接下來的章節(jié)中,我們會更加深入地了解使用Boost.Asio進(jìn)行同步編程和異步編程的不同點(diǎn),同時你也會學(xué)會如何嵌入你自己的異步操作。
更多建議: