5…5. 开发 Boost.Asio 扩展
要向 Boost.Asio 中增加新的异步操作,需要实现以下三个类:
一个派生自 boost::asio::basic_io_object 的类,以表示新的 I/O 对象。使用这个新的 Boost.Asio 扩展的开发者将只会看到这个 I/O 对象。
一个派生自 boost::asio::io_service::service 的类,表示一个服务,它被注册为 I/O 服务,可以从 I/O 对象访问它。 服务与 I/O 对象之间的区别是很重要的,因为在任意给定的时间点,每个 I/O 服务只能有一个服务实例,而一个服务可以被多个 I/O 对象访问。
一个不派生自任何其它类的类,表示该服务的具体实现。 由于在任意给定的时间点每个 I/O 服务只能有一个服务实例,所以服务会为每个 I/O 对象创建一个其具体实现的实例。 该实例管理与相应 I/O 对象有关的内部数据。
本节中开发的 Boost.Asio 扩展并不仅仅提供一个框架,而是模拟一个可用的 boost::asio::deadline_timer 对象。 它与原来的 boost::asio::deadline_timer 的区别在于,计时器的时长是作为参数传递给 wait() 或 async_wait() 方法的,而不是传给构造函数。
#include <boost/asio.hpp> #include <cstddef> template <typename Service> class basic_timer : public boost::asio::basic_io_object<Service> { public: explicit basic_timer(boost::asio::io_service &io_service) : boost::asio::basic_io_object<Service>(io_service) { } void wait(std::size_t seconds) { return this->service.wait(this->implementation, seconds); } template <typename Handler> void async_wait(std::size_t seconds, Handler handler) { this->service.async_wait(this->implementation, seconds, handler); } };
每个 I/O 对象通常被实现为一个模板类,要求以一个服务来实例化 - 通常就是那个特定为此 I/O 对象开发的服务。 当一个 I/O 对象被实例化时,该服务会通过父类 boost::asio::basic_io_object 自动注册为 I/O 服务,除非它之前已经注册。 这样可确保任何 I/O 对象所使用的服务只会每个 I/O 服务只注册一次。
在 I/O 对象的内部,可以通过 service 引用来访问相应的服务,通常的访问就是将方法调用前转至该服务。 由于服务需要为每一个 I/O 对象保存数据,所以要为每一个使用该服务的 I/O 对象自动创建一个实例。 这还是在父类 boost::asio::basic_io_object 的帮助下实现的。 实际的服务实现被作为一个参数传递给任一方法调用,使得服务可以知道是哪个 I/O 对象启动了这次调用。 服务的具体实现是通过 implementation 属性来访问的。
服务的安装以及服务实现的创建都是由父类 boost::asio::basic_io_object 来完成的,方法调用则只是前转至相应的服务;以 I/O 对象的实际服务实现作为参数即可。
#include <boost/asio.hpp> #include <boost/thread.hpp> #include <boost/bind.hpp> #include <boost/scoped_ptr.hpp> #include <boost/shared_ptr.hpp> #include <boost/weak_ptr.hpp> #include <boost/system/error_code.hpp> template <typename TimerImplementation = timer_impl> class basic_timer_service : public boost::asio::io_service::service { public: static boost::asio::io_service::id id; explicit basic_timer_service(boost::asio::io_service &io_service) : boost::asio::io_service::service(io_service), async_work_(new boost::asio::io_service::work(async_io_service_)), async_thread_(boost::bind(&boost::asio::io_service::run, &async_io_service_)) { } ~basic_timer_service() { async_work_.reset(); async_io_service_.stop(); async_thread_.join(); } typedef boost::shared_ptr<TimerImplementation> implementation_type; void construct(implementation_type &impl) { impl.reset(new TimerImplementation()); } void destroy(implementation_type &impl) { impl->destroy(); impl.reset(); } void wait(implementation_type &impl, std::size_t seconds) { boost::system::error_code ec; impl->wait(seconds, ec); boost::asio::detail::throw_error(ec); } template <typename Handler> class wait_operation { public: wait_operation(implementation_type &impl, boost::asio::io_service &io_service, std::size_t seconds, Handler handler) : impl_(impl), io_service_(io_service), work_(io_service), seconds_(seconds), handler_(handler) { } void operator()() const { implementation_type impl = impl_.lock(); if (impl) { boost::system::error_code ec; impl->wait(seconds_, ec); this->io_service_.post(boost::asio::detail::bind_handler(handler_, ec)); } else { this->io_service_.post(boost::asio::detail::bind_handler(handler_, boost::asio::error::operation_aborted)); } } private: boost::weak_ptr<TimerImplementation> impl_; boost::asio::io_service &io_service_; boost::asio::io_service::work work_; std::size_t seconds_; Handler handler_; }; template <typename Handler> void async_wait(implementation_type &impl, std::size_t seconds, Handler handler) { this->async_io_service_.post(wait_operation<Handler>(impl, this->get_io_service(), seconds, handler)); } private: void shutdown_service() { } boost::asio::io_service async_io_service_; boost::scoped_ptr<boost::asio::io_service::work> async_work_; boost::thread async_thread_; }; template <typename TimerImplementation> boost::asio::io_service::id basic_timer_service<TimerImplementation>::id;
为了与 Boost.Asio 集成,一个服务必须符合几个要求:
它必须派生自 boost::asio::io_service::service。 构造函数必须接受一个指向 I/O 服务的引用,该 I/O 服务会被相应地传给 boost::asio::io_service::service 的构造函数。
任何服务都必须包含一个类型为 boost::asio::io_service::id 的静态公有属性 id。在 I/O 服务的内部是用该属性来识别服务的。
必须定义一个名为 shutdown_service() 的方法;不过它可以是私有的。 对于一般的 Boost.Asio 扩展来说,它通常是一个空方法。 只有与 Boost.Asio 集成得非常紧密的服务才会使用它。 但是这个方法必须要有,这样扩展才能编译成功。
为了将方法调用前转至相应的服务,必须为相应的 I/O 对象定义要前转的方法。 这些方法通常具有与 I/O 对象中的方法相似的名字,如上例中的 wait() 和 async_wait()。 同步方法,如 wait(),只是访问该服务的具体实现去调用一个阻塞式的方法,而异步方法,如 async_wait(),则是在一个线程中调用这个阻塞式方法。
在线程的协助下使用异步操作,通常是通过访问一个新的 I/O 服务来完成的。 上述例子中包含了一个名为 async_io_service_ 的属性,其类型为 boost::asio::io_service。 这个 I/O 服务的 run() 方法是在它自己的线程中启动的,而它的线程是在该服务的构造函数内部由类型为 boost::thread 的 async_thread_ 创建的。 第三个属性 async_work_ 的类型为 boost::scoped_ptrboost::asio::io_service::work,用于避免 run() 方法立即返回。 否则,这可能会发生,因为已没有其它的异步操作在创建。 创建一个类型为 boost::asio::io_service::work 的对象并将它绑定至该 I/O 服务,这个动作也是发生在该服务的构造函数中,可以防止 run() 方法立即返回。
一个服务也可以无需访问它自身的 I/O 服务来实现 - 单线程就足够的。 为新增的线程使用一个新的 I/O 服务的原因是,这样更简单: 线程间可以用 I/O 服务来非常容易地相互通信。 在这个例子中,async_wait() 创建了一个类型为 wait_operation 的函数对象,并通过 post() 方法将它传递给内部的 I/O 服务。 然后,在用于执行这个内部 I/O 服务的 run() 方法的线程内,调用该函数对象的重载 operator()()。 post() 提供了一个简单的方法,在另一个线程中执行一个函数对象。
wait_operation 的重载 operator()() 操作符基本上就是执行了和 wait() 方法相同的工作:调用服务实现中的阻塞式 wait() 方法。 但是,有可能这个 I/O 对象以及它的服务实现在这个线程执行 operator()() 操作符期间被销毁。 如果服务实现是在 destruct() 中销毁的,则 operator()() 操作符将不能再访问它。 这种情形是通过使用一个弱指针来防止的,从第一章中我们知道:如果在调用 lock() 时服务实现仍然存在,则弱指针 impl_ 返回它的一个共享指针,否则它将返回0。 在这种情况下,operator()() 不会访问这个服务实现,而是以一个 boost::asio::error::operation_aborted 错误来调用句柄。
6 进程间通信
6.1概述
本章展示了 Boost.Interprocess 库,它包括众多的类,这些类提供了操作系统相关的进程间通讯接口的抽象层。 虽然不同操作系统的进程间通讯概念非常相近,但接口的变化却很大。 Boost.Interprocess 库使通过C++使用这些功能成为可能。
虽然 Boost.Asio 也可以用来在同一台计算机的应用程序间交换数据,但是使用 Boost.Interprocess 库通常性能更好。 Boost.Interprocess 库实际上是使用操作系统的功能优化了同一台计算机的应用程序之间数据交换,所以它应该是任何不需要网络时应用程序间数据交换的首选。
6.1 共享内存
共享内存通常是进程间通讯最快的形式。 它提供一块在应用程序间共享的内存区域。 一个应用能够在另一个应用读取数据时写数据。
这样一块内存区用 Boost.Interprocess 的 boost::interprocess::shared_memory_object 类表示。 为使用这个类,需要包含 boost/interprocess/shared_memory_object.hpp 头文件。
在创建一个 boost::interprocess::shared_memory_object 类型的对象后,相应的共享内存就在操作系统中建立了。 可是此共享内存区域的大小被初始化为0.为了使用这块区域,需要调用 truncate() 函数,以字节为单位传递请求的共享内存的大小。 对于上面的例子,共享内存提供了1,024字节的空间。
请注意,truncate() 函数只能在共享内存以 boost::interprocess::read_write 方式打开时调用。 如果不是以此方式打开,将抛出 boost::interprocess::interprocess_exception 异常。
为了调整共享内存的大小,truncate() 函数可以被重复调用。
在创建共享内存后,get_name() 和 get_size() 函数可以分别用来查询共享内存的名称和大小。
由于共享内存被用于应用程序之间交换数据,所以每个应用程序需要映射共享内存到自己的地址空间上,这是通过 boost::interprocess::mapped_region 类实现的。
你也许有些奇怪,为了访问共享内存,要使用两个类。 是的,boost::interprocess::mapped_region 还能映射不同的对象到具体应用的地址空间。 如 Boost.Interprocess 提供 boost::interprocess::file_mapping 类实际上代表特定文件的共享内存。 所以 boost::interprocess::file_mapping 类型的对象对应一个文件。 向这个对象写入的数据将自动保存关联的物理文件上。 由于 boost::interprocess::file_mapping 不必加载整个文件,但却可以使用 boost::interprocess::mapped_region 将任意部分映射到地址空间,所以就能处理几个GB的文件,而这个文件在32位系统上是不能全部加载到内存上的。
#include <boost/interprocess/shared_memory_object.hpp> #include <boost/interprocess/mapped_region.hpp> #include <iostream> int main() { boost::interprocess::shared_memory_object shdmem(boost::interprocess::open_or_create, "Highscore", boost::interprocess::read_write); shdmem.truncate(1024); boost::interprocess::mapped_region region(shdmem, boost::interprocess::read_write); std::cout << std::hex << "0x" << region.get_address() << std::endl; std::cout << std::dec << region.get_size() << std::endl; boost::interprocess::mapped_region region2(shdmem, boost::interprocess::read_only); std::cout << std::hex << "0x" << region2.get_address() << std::endl; std::cout << std::dec << region2.get_size() << std::endl; }
为了使用 boost::interprocess::mapped_region 类,需要包含 boost/interprocess/mapped_region.hpp 头文件。 boost::interprocess::mapped_region 的构造函数的第一个参数必须是 boost::interprocess::shared_memory_object 类型的对象。 第二个参数指示此内存区域对应用程序来说,是只读或是可写的。
上面的例子创建了两个 boost::interprocess::mapped_region 类型的对象。 名为"Highscore"的共享内存,被映射到进程的地址空间两次。 通过 get_address() 和 get_size() 这两个函数获得共享内存的地址和大小写到标准标准输出流中。 在这两种情况下,get_size() 的返回值都是1024,而 get_address() 的返回值是不同的。
下面的例子使用共享内存写入并读取一个数字。
#include <boost/interprocess/shared_memory_object.hpp> #include <boost/interprocess/mapped_region.hpp> #include <iostream> int main() { boost::interprocess::shared_memory_object shdmem(boost::interprocess::open_or_create, "Highscore", boost::interprocess::read_write); shdmem.truncate(1024); boost::interprocess::mapped_region region(shdmem, boost::interprocess::read_write); int *i1 = static_cast<int*>(region.get_address()); *i1 = 99; boost::interprocess::mapped_region region2(shdmem, boost::interprocess::read_only); int *i2 = static_cast<int*>(region2.get_address()); std::cout << *i2 << std::endl; }
通过变量 region, 数值 99 被写到共享内存的开始处。 然后变量 region2 访问共享内存的同一个位置,并将数值写入到标准输出流中。 正如前面例子的 get_address() 函数的返回值所见,虽然变量 region 和 region2 表示的是该进程内不同的内存区域,但由于两个内存区域底层实际访问的是同一块共享内存,所以程序打印出99。
通常,不会在同一个应用程序内使用多个 boost::interprocess::mapped_region 访问同一块共享内存。 实际上在同一个应用程序内将同一个共享内存映射到不同的内存区域上没有多大的意义,上面的例子只用于说明的目的。
为了删除指定的共享内存,boost::interprocess::shared_memory_object 对象提供了静态的 remove() 函数,此函数带有一个要被删除的共享内存名称的参数。
Boost.Interprocess 类的RAII概念支持,明显来自关于智能指针的章节,并使用了另外的一个类名称 boost::interprocess::remove_shared_memory_on_destroy。 它的构造函数需要一个已经存在的共享内存的名称。 如果这个类的对象被销毁了,那么在析构函数中会自动删除共享内存的容器。
#include <boost/interprocess/shared_memory_object.hpp> #include <iostream> int main() { bool removed = boost::interprocess::shared_memory_object::remove("Highscore"); std::cout << removed << std::endl; }
如果 remove() 没有被调用, 那么,即使进程终止,共享内存还会一直存在,而不论共享内存的删除是否依赖底层操作系统。 多数Unix操作系统,包括Linux,一旦系统重新启动,都会自动删除共享内存,在 Windows 或 Mac OS X上,remove() 必须调用,这两种系统实际上将共享内存存储在持久化的文件上,此文件在系统重启后还是存在的。
Windows 提供了一种特别的共享内存,它可以在最后一个使用它的应用程序终止后自动删除。 为了使用它,提供了 boost::interprocess::windows_shared_memory 类,定义在 boost/interprocess/windows_shared_memory.hpp 文件中。
#include <boost/interprocess/windows_shared_memory.hpp> #include <boost/interprocess/mapped_region.hpp> #include <iostream> int main() { boost::interprocess::windows_shared_memory shdmem(boost::interprocess::open_or_create, "Highscore", boost::interprocess::read_write, 1024); boost::interprocess::mapped_region region(shdmem, boost::interprocess::read_write); int *i1 = static_cast<int*>(region.get_address()); *i1 = 99; boost::interprocess::mapped_region region2(shdmem, boost::interprocess::read_only); int *i2 = static_cast<int*>(region2.get_address()); std::cout << *i2 << std::endl; }
请注意,boost::interprocess::windows_shared_memory 类没有提供 truncate() 函数,而是在构造函数的第四个参数传递共享内存的大小。