ACE框架学习2

目录

ACE Service Configurator框架

ACE_Server_Object类

ACE_Server_Repository类

 ACE_Server_Config类

ACE Task框架

ACE_Message_Queue类

 ACE_TASK类

在开始之前,首先介绍一下模板类的实例化和使用。给出以下代码

//ACCEPTOR代表模板的方法
template <class ACCEPTOR>
class Reactor_Logging_Server_Adapter : public ACE_Service_Object {
public:
  // Hook methods inherited from <ACE_Service_Object>.
  virtual int init (int argc, ACE_TCHAR *argv[]);
  virtual int fini ();
  virtual int info (ACE_TCHAR **, size_t) const;
  virtual int suspend ();
  virtual int resume ();

private:
  Reactor_Logging_Server<ACCEPTOR> *server_;
};
//cpp中同样需要声明ACCEPTOR
template <class ACCEPTOR> int
Reactor_Logging_Server_Adapter<ACCEPTOR>::init (int argc,
                                                ACE_TCHAR *argv[]) {
  int i;
  char **array = 0;
  ACE_NEW_RETURN (array, char*[argc], -1);
  std::unique_ptr<char *[]> char_argv (array);

  for (i = 0; i < argc; ++i)
    char_argv[i] = ACE::strnew (ACE_TEXT_ALWAYS_CHAR (argv[i]));
  ACE_NEW_NORETURN (server_, Reactor_Logging_Server<ACCEPTOR>
                               (i, char_argv.get (),
                                ACE_Reactor::instance ()));
  for (i = 0; i < argc; ++i) ACE::strdelete (char_argv[i]);
  return server_ == 0 ? -1 : 0;
}

 下面代码中使用Logging_Acceptor来实例化Reactor_Logging_Server_Adapter模板,也就是替代了上面的ACCEPTOR对象。

Server_Logging_Daemon_Ex 是通过 typedef 定义的别名,它指向 Reactor_Logging_Server_Adapter<Logging_Acceptor_Ex>。因此,Server_Logging_Daemon_Ex 的方法本质上是 Reactor_Logging_Server_Adapter<Logging_Acceptor_Ex> 的方法。

//通过Logging_Acceptor类来实例化Reactor_Logging_Server_Adapter模板
typedef Reactor_Logging_Server_Adapter<Logging_Acceptor>
        Server_Logging_Daemon;

ACE Service Configurator框架

ACE Service Configurator也就是服务器配置框架,是适配器模式的一种实现。一个适配允许通常因为接口不兼容而不能在一起工作的类工作在一起,做法是将类自己的接口包裹在一个已存在的类中。能够提升应用的可扩展性和灵活性。该模式允许应用在运行时重新配置其服务,而无需修改、重新编译或重新链接程序自身,或是关闭和重启应用。

ACE Service Configurator框架类主要如下:

 其角色如下:

ACE_Server_Object类

ACE_Server_Object主要负责初始化、执行控制、报告以及终止。ACE_Server_Object是一个统一的接口,各个服务必须是称为ACE_Server_Object的公共基类的后代。其主要能力和方法如下:

 继承自ACE_Service_Object的应用服务可以有选择地重新定义其挂钩方法:这些方法会在适当的时间被 ACE Service Configurator 框架回调,以响应特定的事件。例如,服务对象的init()挂钩方法会在 Service Configurator 框架执行激活服务的指令时(dynamic 和 satic 指令都可以邀活服务)被回调。如果初始化成功,Init()挂钩方法必须返回0;如果失败,必须返回-1。如果(且仅在这样的情况下)init()成功的话,在ACEService Confgurator 框架执行 remove 指令移除服务或是关闭所有服务时,相应的 fini()方法会在服务对象上被回调。

ACE Service Configurator 框架属于动态配置,而不是静态地去配置服务器端口号以及ip地址等。ACE使用了c++方法重载和下面的描述的宏使用不同的字符类型,而没有改变API

为了实现这一目标,讲适配器创建在Reactor_Logging_Server_Adapter.h文件中

template <class ACCEPTOR>

//继承于ACE_Service_Object 
class Reactor_Logging_Server_Adapter : public ACE_Service_Object {
public:
  // Hook methods inherited from <ACE_Service_Object>.
  virtual int init (int argc, ACE_TCHAR *argv[]);
  virtual int fini ();
  virtual int info (ACE_TCHAR **, size_t) const;
  virtual int suspend ();
  virtual int resume ();

private:
  //采用上述已有的日志服务器
  //如果不适用该类则直接继承ACE_Service_Object进行实现 
  Reactor_Logging_Server<ACCEPTOR> *server_;
};

使用ACE Service Configurator框架创建一个Reactor_Logging_Server_Adapter实体类,会调用以下init方法:

template <class ACCEPTOR> int
Reactor_Logging_Server_Adapter<ACCEPTOR>::init (int argc,
                                                ACE_TCHAR *argv[]) {
  //使用ACE_TEXT_ALWAYS_CHAR在需要的地方转换为char 
  //创建临时对象,通过strnew 进行复制 使得整个Reactor_Logging_Server拥有它
  int i;
  char **array = 0;
  ACE_NEW_RETURN (array, char*[argc], -1);
  std::unique_ptr<char *[]> char_argv (array);

  for (i = 0; i < argc; ++i)
    char_argv[i] = ACE::strnew (ACE_TEXT_ALWAYS_CHAR (argv[i]));

  //动态分配Reactor_Logging_Server
  ACE_NEW_NORETURN (server_, Reactor_Logging_Server<ACCEPTOR>
                               (i, char_argv.get (),
                                ACE_Reactor::instance ()));

  //释放argv资源
  for (i = 0; i < argc; ++i) ACE::strdelete (char_argv[i]);
  return server_ == 0 ? -1 : 0;
}

在被指示移除动态配置的日志服务时,ACE Service Configurator 框架调用下面所示的Reactor Logging_Server_Adapter::fini()挂钩方法:

template <class ACCEPTOR> int
Reactor_Logging_Server_Adapter<ACCEPTOR>::fini ()
{ ((ACCEPTOR*)server_)->handle_close (); server_ = 0; return 0; }
//调用了handle_close 删除Reactor_Logging_Server对象

info方法则用于格式化一个字符串,在其中含有它用于侦听的tcp端口

template <class ACCEPTOR> int
Reactor_Logging_Server_Adapter<ACCEPTOR>::info
  (ACE_TCHAR **bufferp, size_t length) const {
  //从Reactor_Logging_Server中获取网络地址
  ACE_INET_Addr local_addr;
  server_->acceptor ().get_local_addr (local_addr);

  //格式化消息 解释做的事情
  ACE_TCHAR buf[BUFSIZ];
  ACE_OS::sprintf (buf,
                   ACE_TEXT ("%hu"),
                   local_addr.get_port_number ());
  ACE_OS::strcat
    (buf, ACE_TEXT ("/tcp  # Reactor-based logging server\n"));

  //如果没有提供缓存区来存储数据 则通过strnew进行初始化
  if (*bufferp == 0)
    *bufferp = ACE::strnew (buf);
  else
    ACE_OS::strncpy (*bufferp, buf, length); //将已经格式化的数据复制到其中
  
  //返回消息的长度
  return ACE_Utils::truncate_cast<int> (ACE_OS::strlen (*bufferp));
}

ACE中提供了多种申请内存的方法,需要及时对申请空间的对象进行释放,函数如下所示:

Reactor_Logging_Server的suspend和resume方法使用了反应堆的方法:

//suspend 方法
template <class ACCEPTOR> int
Reactor_Logging_Server_Adapter<ACCEPTOR>::suspend ()
{ return server_->reactor ()->suspend_handler (server_); }

//resume 方法
template <class ACCEPTOR> int
Reactor_Logging_Server_Adapter<ACCEPTOR>::resume ()
{ return server_->reactor ()->resume_handler (server_); }

ACE_Server_Repository类

ACE Service Configurator 框架不仅支持但服务器配置,同样支持多服务器配置,通过使用ACE_Server_Repository类以及ACE_Server_Repository_Iterator类来进行实现。ACE_Server_Repository类实现了管理器模式,将对一个类的所有对象的管理封装到一个单独的管理器类中,而不是单独去操作类对象。通过该种设计模式来控制所配置服务对象的生命周期,以及对他们的访问。提供的功能主要如下:

 接口如下:

ACE_Service_object_Type object()万法返回的是指问相关联的 ACE_Service_Object的指针。 ACE_Module_Type object()方法返川指向相关联的ACE_Module的指针. ACE_Module_Typeobject()方法返网指问相关联的ACE Stream的指针。

ACE_Server_Repository的实现类似于容器的实现,ACE_Server_Repository_Iterator类则实现了迭代器模式,顺序地访问ACE_Service_Repository中的ACE_Service_Type系目,而又无需暴露其内部表示。主要方法如下:

 不可以在迭代器遍历的过程中删除条目,否则会发生错误。

下面将给出一个实例。将ACE_Server_Repository类以及ACE_Server_Repository_Iterator类来实现Service_Reporter类,Service_Reporter类提供了一种元服务,客户可将其获取ACE Service Configurator框架静态或动态配置某个应用的所有服务信息。

客户通过下述方法与Service_Reporter完成交互。

1、客户建立一个到 Service_Reporter 对象的 TCP 连接,

2、Service_Reporter返回服务器的所有服务的列表给客户。

3、Service_Reporter 关闭 TCP/IP 连接。

 首先定义Service_Reporter.h头文件,完成对Service_Reporter的声明,需要继承ACE_Service_Object对象,才能使用ACE Service Configuration框架

class Service_Reporter : public ACE_Service_Object {
public:
  Service_Reporter (ACE_Reactor *r = ACE_Reactor::instance ())
    : ACE_Service_Object (r) {}

protected:
  // Hook methods inherited from <ACE_Service_Object>.
  virtual int init (int argc, ACE_TCHAR *argv[]);
  virtual int fini ();
  virtual int info (ACE_TCHAR **, size_t) const;
  virtual int suspend ();
  virtual int resume ();

  // Reactor hook methods.
  virtual int handle_input (ACE_HANDLE);
  virtual ACE_HANDLE get_handle () const
  { return acceptor_.get_handle (); }

private:
  ACE_SOCK_Acceptor acceptor_; // Acceptor instance.
  enum { DEFAULT_PORT = 9411 };
};

Service_Reporter类同样继承于ACE_Service_Object,可以通过ACE Service Configuration框架进行管理及控制。同时ACE内部提供了ACE_Dynamic_Service类模块访问ACE_Server_Repository登记的服务。服务注册之后,可通过ACE_Dynamic_Service获取服务。

如果 Server_Logging_Daemon服务的某个实例已由 ACE Service Configurator 框架进行了动态链接和初始化,应用可以使用ACE_Dynamic_Service模板来在程序中访问服务。如下所示:        

Service_Reporter方法挂钩实现在Service_Reporter.cpp中,init方法实现如下:

int Service_Reporter::init (int argc, ACE_TCHAR *argv[]) {
  //将local_addr 初始化为Service_Reporter缺省的TCP端口
  ACE_INET_Addr local_addr (Service_Reporter::DEFAULT_PORT);

  //从argv[0]开始解析 解析到-p的指令
  ACE_Get_Opt get_opt (argc, argv, ACE_TEXT ("p:"), 0);
  get_opt.long_option (ACE_TEXT ("port"),
                       'p',
                       ACE_Get_Opt::ARG_REQUIRED);

  for (int c; (c = get_opt ()) != -1; )
    switch (c) {
    case 'p':
      local_addr.set_port_number
        (ACE_OS::atoi (get_opt.opt_arg ()));
    }
  
  //初始化ACE_SOCK_Acceptor,让其在local_addr地址上监听
  //当请求来的时候,反应器分发到下面的Service_Reporter::hand_input()方法中
  acceptor_.open (local_addr);
  return reactor ()->register_handler
           (this,
            ACE_Event_Handler::ACCEPT_MASK);
}

当请求来的时候,反应器分发到下面的Service_Reporter::hand_input()方法中

int Service_Reporter::handle_input (ACE_HANDLE) {
  //接受新的客户连接 循环式服务 同时只处理一个客户
  ACE_SOCK_Stream peer_stream;
  acceptor_.accept (peer_stream);
 
 //初始化ACE_Service_Repository_Iterator 0代表已挂起服务信息
  ACE_Service_Repository_Iterator iterator
    (*ACE_Service_Repository::instance (), 0);
  
  //对于每一个服务 调用info获取信息
  //通过 sendv_n 集中写发送数据
  for (const ACE_Service_Type *st;
       iterator.next (st) != 0;
       iterator.advance ()) {
    iovec iov[3];
    iov[0].iov_base = const_cast<ACE_TCHAR *> (st->name ());

    iov[0].iov_len =
      ACE_Utils::truncate_cast<u_long> (
        ACE_OS::strlen (st->name ()) * sizeof (ACE_TCHAR));

    const ACE_TCHAR *state = st->active () ?
           ACE_TEXT (" (active) ") : ACE_TEXT (" (paused) ");
    iov[1].iov_base = const_cast<ACE_TCHAR *> (state);

    iov[1].iov_len =
      ACE_Utils::truncate_cast<u_long> (
        ACE_OS::strlen (state) * sizeof (ACE_TCHAR));

    ACE_TCHAR *report = 0;   // Ask info() to allocate buffer
    int len = st->type ()->info (&report, 0);
    iov[2].iov_base = static_cast<ACE_TCHAR *> (report);

    iov[2].iov_len = static_cast<u_long> (len);
    iov[2].iov_len *= sizeof (ACE_TCHAR);
    
    //同时发送三个对象
    peer_stream.sendv_n (iov, 3);
    ACE::strdelete (report);
  }
  
  //关闭客户端连接 释放socket
  peer_stream.close ();
  return 0;
}

Service_Reporter::info方法传回一个字符串,用于侦听TCP端口号所作的事情:

int Service_Reporter::info (ACE_TCHAR **bufferp,
                            size_t length) const {
  //设置监听的端口号和地址
  ACE_INET_Addr local_addr;
  acceptor_.get_local_addr (local_addr);
  
  //处理的事情
  ACE_TCHAR buf[BUFSIZ];
  ACE_OS::sprintf
    (buf, ACE_TEXT ("%hu"), local_addr.get_port_number ());
  ACE_OS::strcat
    (buf, ACE_TEXT ("/tcp # lists services in daemon\n"));
  if (*bufferp == 0)
    *bufferp = ACE::strnew (buf); //strnew 需要使用strdelete进行释放
  else
    ACE_OS::strncpy (*bufferp, buf, length);

  return ACE_Utils::truncate_cast<int> (ACE_OS::strlen (*bufferp));
}

Service_Reporter::suspend和Service_Reporter::resume同样调用反应堆的方法。

//调用反应堆suspend_handler方法
int Service_Reporter::suspend ()
{ return reactor ()->suspend_handler (this); }

//调用反应堆resume_handler方法
int Service_Reporter::resume ()
{ return reactor ()->resume_handler (this); }

Service_Reporter::fini方法如下所示:

//调用remove_handler 发送DONT_CALL防止重复调用回调
int Service_Reporter::fini () {
  reactor ()->remove_handler
    (this,
     ACE_Event_Handler::ACCEPT_MASK
     | ACE_Event_Handler::DONT_CALL);
  return acceptor_.close ();
}

最后,我们必须给ACE Service Configurator 框架提供一些关于这个新服务的“薄记”信息。尽管这个服务的代码将被静态地链接进示例程序中,我们想要让框架实例化一个Service_Reporter 对象,以在服务被激活时执行服务。因此,我们将一些必需的ACE 服务宏增加到 Service_Reporter 实现文件中。这些宏创建一个Service_Reporter,并将其登记到ACE_Service_Repository.

// 用于实现服务定义工程 和 gobble函数。
ACE_FACTORY_DEFINE (ACE_Local_Service, Service_Reporter)

// 定义将服务信息传递给ACE_service_Config的ACE_Static_Svc_Descriptor。
// 静态服务信息宏 用于登记信息
ACE_STATIC_SVC_DEFINE (
  Reporter_Descriptor,
  ACE_TEXT ("Service_Reporter"),
  ACE_SVC_OBJ_T,
  &ACE_SVC_NAME (Service_Reporter),
  ACE_Service_Type::DELETE_THIS //删除代表服务的ACE_Service_Type对象
  | ACE_Service_Type::DELETE_OBJ, //使得gobble函数调用,清除Service_Reporter对象
  0 // 此对象最初未处于活动状态
)

//ACE_STATIC_SVC_REQUIRE宏定义一个对象 它向ACE_Service_Repository 登记 Service_Reporter 
//的ACE_Static_Svc_Descriptor对象的实例。在许多平台上,这个宏还确保对象被实例化。
//有的地方也会在main函数中直接定义该宏
ACE_STATIC_SVC_REQUIRE (Reporter_Descriptor)

ACE FACTORY _DEFINE, 宏这样来简化对 ACE Service Configurator 框架的使用:
1、它生成不依赖于编译器的工厂方法这个宏生成的_make_Service_Reporer()工厂函数具有exterm“C”linkage,允许框架在 DLL的符号表中定位该函数,而无需知道C++编译器的“名称搅乱”方案。
2、它一致地管理动态内存,为了确保正确的跨平台行为,保证在某个 DLL,中分配的内存也在同一 DLL, 中释放是十分重要的。因此,gobbler 函数被传递给make_Service_Reporter(),以使ACE Service Configuralor 框架能够确保内存在相同的堆中进行分配和释放。

 ACE_Server_Config类

上述讲得Reactor_Logging_Server使用了静态配置,但会导致下面几种不好的情况:

在开发周期中过早地做出服务配置决策:过早做出决定,最佳配置可能会发生变化。

如果服务的实现与其初始配置紧密地耦合在一起,修改某个服务可能会给其他服务带来不良影响

系统的伸缩性很糟糕:可能会浪费资源,或资源利用不充分

ACE_Service_Config 实现了外观模式来集成 ACE Service Configuralor 框架中的其他类并对管理应用中的各个服务所必需的活动进行协调。外观模式为子系统中的一组接口提供一个一致的界面,此模式定义了一个高层接口,这个接口使得这一子系统更加容易使用。这个类提供了以下能力:

 ACE_Service_Config的接ㄩ如图5.6所示。这个类有着一个丰富的接口,因为它输出了 ACE Service Configurator 框架中的所有特性。因此,我们将对其方法的描述分组进在下面描述的三个范畴中。

1、服务配置生命周期管理方法:下列方法初始化和关闭ACE_Service_Config。

ACE_Service_Config类是单例模式的一个变种,所有的数据成员都是声明为静态的。确保各个实例状态是唯一的。

open()方法足最为常用的初始化 ACE_Service_Config的方式。它解析在argc和argv 中传入的参数,跳过第一个参数(argv[0]),因为那是程序名。下表概述了ACE_Service_Config 所能识别的选项

 服务配置方法:在解析其所有的 argc/argv参数之后,ACE_Service_Confg:open()方法调用下面
这两个方法中的一个或全部来配置应用:

下表总结了可由这两个ACE_Service_Confg方法处理的服务配置指令

 动态和初始化服务:

dynamic svc-name svc-type DLL-name:factory_func()["argc/argv options"]

dynamic 表示动态链接和初始化某个服务对象。svc-name是指派给服务的名称。svc-type指定服务的类型,可以足Service_Obiect*、Module*或是Stream*。DLL-name是factory_func()符号的动态链接库的名称。DLL-name 既可以是全路径名,也可以是没有后缀的文件名。如果它是全路径名,ACE_DLL::open()方法会被用于将指定的文件动态链接进应用进程中。但如果它是文件名,open()使用 ACE::ldfind()来定位 DLL,并将其动态链接进进程的地址空间中。

argc/argv options是可选参数列表,可以用来通过服务对象的init()挂钩方法对其进行初始化ACE Service Configurator 框架使用了 ACE_ARGV 类来将字符串分解进多个参数,并替换包含在字符串中的环境变量的值。

上述提到了ACE_DLL类,这里对其进行详细的介绍。显式链接DLL可能和解除可能会造成不统一编程接口,不安全类型等问题。ACE定义了ACE_DLL wrapper facade类来封装显式的链接,使用ACE::ldfind方法来定位DLL。

DLL文件名扩展:ACE::ldfind()通过增加适当的前缀和后缀来确定 DL, 的名称。例如,它在 Solaris 上增加 lib 前级和,so 后级,在 Windows 增加.dll 后缀。
DIL 搜索路径:ACE::ldfind()还会使用平台的 DLL 搜索路径环境变量来搜索指定的DLL,例如,它在许多 UNIX系统上使用LD_LIBRARY_PATH,在 Windows 上使用 PATH·来查找 DLL。

ACE_DLL关键方法如下所示。

初始化静态服务:

static svc-name ["argc/argv options"]

尽管ACE_Service_Confg常用于动态配置,但也可用于静态配置。static表示静态配置,svc-name是指派给服务的名称,argc/argv options与上述方法一样语法更加简单,因为不需要动态链接到某个对象中。缺省情况下是禁用的,必须显示地启用才行。

完全移除服务:

revome svn-name

remove 指令让 ACE_Service_Confg 解释器询间 ACE_Service_Repository,查找指定的服务。如果找到了该服务,解释器调用它的fini()挂钩方法。如果服务原来是从DLL中动态链接的,它将通过 ACE_DLL::close()法被解除链接。

挂起服务,但不进行移除:

suspend svc-name

suspend 指令让 ACE_Service,Config解释器询问ACE_Service_Repository,查找指定的svc-name服务。如果找到该服务,它的suspend()挂钩方法会被调用。服务可以重新定义这个方法,以实现挂起其处理所需的适当动作。

恢复之前挂起服务:

resume svc-name

resume 指令让 ACE_Service_Confg 解释器询间 ACE_Service_Repository,查找指定的 svc-name服务。如果找到该服务,它的resume()挂钩方法会被调用。服务可以重新定义这个方法,以实现恢复其处理所需的适当动作--通常是反转suspend()方法的效果。

初始化一列有序的层次相关的模块:

stream svc-name '['model-list']'

stream指令让 ACE_Service_Config 解释器初始化一列层次有序相关的模块。每个模块由一对服务组成,这些模块互相连接,并通过传递ACE_Message_Block对象进行通信。stream 指令的实现使用了ACE Streanms 框架。

示例如下所示:

//静态启动Service_Reporter 服务
static Service_Reporter "-p $SERVICE_REPORTER_PORT"

//动态启动AIO_Client_Logging_Daemon 服务 的_make_AIO_Client_Logging_Daemon函数
dynamic AIO_Client_Logging_Daemon Service_Object *
AIO_CLD:_make_AIO_Client_Logging_Daemon()
  "-p 7777"

//动态启动Server_Logging_Daemon 的_make_TPC_Logging_Server函数
dynamic Server_Logging_Daemon Service_Object *
TPCLS:_make_TPC_Logging_Server()

3、实用方法

 ACE_Service_Config能够解析的完整BNF语法,主要包括常用方法指令 以及名字状态等信息,如下所示:

 reconfigure()方法可被用于迫使 ACE_Service_Config 解释器重新处理服务配置文件。如果服务的名称已知的话,通过 suspend()和resume()万法可以进行服务的挂起和恢复。这是使用在 ACE_Service_Repository 单体中所定义的方法的捷径。

下面将通过一个示例来应用ACE Service Configurator 框架创建一个服务器。步骤如下:

1、静态配置Service_Reporter的一个实例

2、将  Reactor_Logging_Server_Adapler 模板动态地链接和配置进服务器的地址空间中。

3、初始化服务配置

我们从编写下衘这个位于Configurable_Logging_Server.cpp中的-般化的main()程序开始。这个程序将 Service_Reporter和 Reactor_Logging_Server_Adapter 服务配置进应用进程中,随即运行反应器的事件循环。

#include "ace/OS_main.h"
#include "ace/Service_Config.h"
#include "ace/Reactor.h"

int ACE_TMAIN (int argc, ACE_TCHAR *argv[]) {
  //向ACE Service Configurator 框架登记静态Reporter服务
  ACE_STATIC_SVC_REGISTER (Reporter_Descriptor);
 
 //调用ACE_Service_Config::open()配置应用 加载相关决策
 //因为明确了是静态加载 则加了第四个参数0 
 //也可以直接在指令行上提供-y来控制是否禁用静态
  ACE_Service_Config::open
    (argc, argv, ACE_DEFAULT_LOGGER_KEY, 0);

  ACE_Reactor::instance ()->run_reactor_event_loop ();
  return 0;
}

在ACE_Service_Config:open调用时会解析下面这个svc.conf文件。

//将Service_Reporter 静态链接到可执行程序中
//让其调用Service_Reporter 中的init()方法进行初始化
static Service_Reporter "-p $SERVICE_REPORTER_PORT"

//将SLD DLL动态连接到地址空间
//提取_make_Server_Logging_Daemon工厂函数
//调用工厂模式分配一个Server_Logging_Daemon 对象
//调用Server_Logging_Daemon init() 将SERVICE_LOGGING_DAEMON_PORT作为argc/argv参数传递
//如果init成功。Server_Logging_Daemon指针存储在ACE_Service_Respository中
//在"Server_Logging_Daemon"名下
dynamic Server_Logging_Daemon Service_Object *
SLD:_make_Server_Logging_Daemon()
"$SERVICE_LOGGING_DAEMON_PORT"

上述svc.conf的xml表示则更为繁琐,如下所示:

 SLD DLL是从下面的SLD.cpp中生成的

#include "Reactor_Logging_Server_Adapter.h"
#include "Logging_Acceptor.h"
#include "SLD_export.h"

//定义Logging_Acceptor来实例化Reactor_Logging_Server_Adapter模板
typedef Reactor_Logging_Server_Adapter<Logging_Acceptor>
        Server_Logging_Daemon;

//ACE_FACTORY_DEFINE宏在含有_make_Server_Logging_Daemon()工厂函数中
//如果需要引用工厂函数 则通过ACE_FACTORY_DECLARE宏来进行引入操作
ACE_FACTORY_DEFINE (SLD, Server_Logging_Daemon)

ACE的输入/输出助手宏,能够确保在所有支持的平台上,如下所示。

 日志服务器的执行步骤如图所示:

 图5.8中的 UML序列图阐释了基于上面所示的 svc.conf 文件配置服务器口志 daemon 所涉及的各个步骤。在程序启动时,ACESTATIC_SVC_REQUIRE宏所生成的对把使用ACE_STATIC_SVC_DEFINE宏创建的Service_Reponer信息记进ACE_Service_Config。当ACE_Service_Config::open()方法被调用时,它使用指定的工厂函数来实例化一个 Service_Reponer 对象,但并不将其激活。open()方法随即调用process_directives(),解释svc.conf文件中的指令。第一条指令激活静态的 Service_Reporter 服务。第二条指令触发以下动作:
1、动态链接 SLD DLL:
2、调用_make_Server_Logging_Daemon 工厂函数来创建 Reactor_Logeing_Server_Adapter的个实例
3、调用新服务对象的init()方法来激活服务

当所有的配置活动完成时,main()程序调用 ACE_Reactor::run_reactor_event_loop()。在那里,服务运行,就像先前的示例中被静态配置的对象一样。

重配置服务器

ACE Service Configurator框架可以响应外部的事件(比如信号或命令),在运行时重配置服务器。这时,框架重新读取它的svc.conf文件,并执行指定的指令,比如将服务对象插入服务器或从中移除以及挂起或恢复已有的服务对象。我们现在闸释怎样使用这些特性来动态地重配置我们的服务器日志 daemon。通过定义一个新的svc.conf文件,并发送信息(SIGHUP或SIGINT)指令重新配置,可以增加以下能力,而不影已有的代码或进程中的Service_Reporter服务。

//移除日志daemon 从应用的地址空间解除其链接
remove Server_Logging_Daemon

//动态将Reactor_Logging_Server_Adapter模块的一个不同的实例重新配置到日志daemon中
//SERVER_LOGGING_DAEMON_PORT位于SLDex.cpp文件中 由ACE_FACTORY_DEFINE 生成
dynamic Server_Logging_Daemon Service_Object *
SLDex:_make_Server_Logging_Daemon_Ex()
  "$SERVER_LOGGING_DAEMON_PORT"

//动态配置一个Server_Shutdown服务对象,该对象controller()函数和Quit_handler类
//来等待管理员通过其标准输入发送命令关闭服务器
dynamic Server_Shutdown Service_Object *
SLDex:_make_Server_Shutdown()

上述中的 SERVER_LOGGING_DAEMON_PORT位于SLDex.cpp文件中由ACE_FACTORY_DEFINE 生成,具体代码如下:

//通过Logging_Acceptor_Ex实例化Reactor_Logging_Server_Adapter模板
typedef Reactor_Logging_Server_Adapter<Logging_Acceptor_Ex>
        Server_Logging_Daemon_Ex;

ACE_FACTORY_DEFINE (SLDEX, Server_Logging_Daemon_Ex)

上述dynamic Server_Shutdown xxx动态配置一个Server_Shutdown服务对象,该对象controller()函数和Quit_handler类,下面所示的Server_Shutdown继承自ACE_Server_Object对象。

class Server_Shutdown : public ACE_Service_Object {
public:
  //初始化数据
  //将THR_DETACHED传给spawn 控制器的线程标识符和其他资源会在线程终止后由OS自动回收
  virtual int init (int, ACE_TCHAR *[]) {
    reactor_ = ACE_Reactor::instance ();
    return ACE_Thread_Manager::instance ()->spawn
      (controller, reactor_, THR_DETACHED);
  }
  
  //通知反应器关闭
  virtual int fini () {
    Quit_Handler *quit_handler = 0;
    ACE_NEW_RETURN (quit_handler,
                    Quit_Handler (reactor_), -1);
    return reactor_->notify (quit_handler);
  }

private:
  ACE_Reactor *reactor_;
};

使用ACE_FACTORY_DEFINE来生成ACE Service Configurator框架所需的_make_Server_Shutdown()函数

ACE_FACTORY_DEFINE (SLDEX, Server_Shutdown)

 基于上面所示的svc.conf文件来配置服务器日志daemon所涉及的各个步骤:

 ACE Servicc Configurator框架中的动态重配置机制使得开发者能够修改服务器功能,或足调谐性能,而无需重新进行大量的开发和安装。例如,调试一个有问题的日志服务实现只是涉及到动态地承配置一个功能等价的服务,在此服务中包含有额外的装置来帮助标定错误的行为。不用修改、重编译、重链接或是重启正在执行的服务器日志 daemon,就可以完成这样的重配置过程。特别地,这样的重配置小会影响先前静态配置的Service_Reporer

ACE Task框架

ACE Task框架提供了强大而可扩展的面向对象并发能力,比如在对象的上下文中派生线程,以及在执行在不同线程中的对象之间传送消息和对消息进行排队。该框架可被应用于实现一些关键的并发模式,比如:
Active Object模式:它解除了“调用某个方法的线程”与“执行该方法的线程”的耦合。该模式增强了并发性,并简化了对“执行在一个或多个线程的上下文中的对象”的同步化访问。它通过对方法的调用(Method Invocation)与方法的执行(Method Execution)进行解耦来提高并发性。c++实现Active Object模式代码如下:

#include <pthread.h>     
#include <iostream>    
#include <queue>    
  
// 定义一个ActiveObject类  
class ActiveObject {    
public:    
    ActiveObject() {    
        pthread_mutex_init(&mutex_, NULL);    // 初始化互斥锁  
        pthread_cond_init(&cond_, NULL);      // 初始化条件变量  
        pthread_create(&worker_thread_, NULL, WorkerThread, this);// 创建一个工作线程,运行WorkerThread函数,传入this指针作为参数     
    }    
  
    // 析构函数  
    ~ActiveObject() {    
        {    
            pthread_mutex_lock(&mutex_);   // 加锁    
            stop_worker_ = true;    // 设置停止工作线程的标志  
            pthread_cond_signal(&cond_);    // 发送条件信号,通知等待在条件变量上的线程  
            pthread_mutex_unlock(&mutex_);    // 解锁  
        }    
        pthread_join(worker_thread_, NULL); // 等待工作线程结束      
        pthread_mutex_destroy(&mutex_);    // 销毁互斥锁    
        pthread_cond_destroy(&cond_);   // 销毁条件变量   
    }    
  
    // 添加一个任务到队列  
    void Enqueue(void (*task)(void*), void* arg) {    
        pthread_mutex_lock(&mutex_);    // 加锁  
        tasks_.push(std::make_pair(task, arg));  // 将任务及其参数添加到队列中    
        pthread_cond_signal(&cond_);   // 发送条件信号 
        pthread_mutex_unlock(&mutex_);    // 解锁   
    }    
  
private:    
    // 工作线程函数,静态成员函数,可以通过传入的参数访问ActiveObject实例  
    static void* WorkerThread(void* arg) {    
        ActiveObject* self = static_cast<ActiveObject*>(arg);    
        self->ProcessQueue();  // 处理任务队列  
        return NULL;    
    }    
  
    // 处理任务队列的函数  
    void ProcessQueue() {    
        while (true) {    
            void (*task)(void*) = NULL;    
            void* arg = NULL;    
  
            pthread_mutex_lock(&mutex_);    // 加锁 
            // 当队列为空且没有收到停止信号时,等待在条件变量上  
            while (tasks_.empty() && !stop_worker_) {    
                pthread_cond_wait(&cond_, &mutex_);    
            }    
            // 如果收到停止信号且队列为空,则退出循环  
            if (stop_worker_ && tasks_.empty()) {    
                pthread_mutex_unlock(&mutex_);    
                break;    
            }    
            // 取出队列中的任务及其参数  
            task = tasks_.front().first;    
            arg = tasks_.front().second;    
            tasks_.pop();    
            // 解锁  
            pthread_mutex_unlock(&mutex_);    
  
            // 执行任务  
            task(arg);    
        }    
    }    
  
    // 任务队列,存储任务和对应的参数  
    std::queue<std::pair<void (*)(void*), void*> > tasks_;    
    pthread_mutex_t mutex_;  // 互斥锁,用于保护任务队列和停止标志的访问   
    pthread_cond_t cond_;   // 条件变量,用于在任务队列为空时等待新的任务   
    bool stop_worker_ = false;  // 停止工作线程的标志   
    pthread_t worker_thread_;  // 工作线程的线程ID    
};    
  
// 使用示例中的任务函数,打印一个整数值  
void PrintTask(void* arg) {    
    int value = *static_cast<int*>(arg);    
    std::cout << "Executing task with value " << value << std::endl;    
}    
  
// 主函数,创建一个ActiveObject实例,并添加一个任务到队列中  
int main() {    
    ActiveObject active_object;    
    int value = 42;    
    active_object.Enqueue(PrintTask, &value);    
  
    // 等待1秒,仅用于示例,确保任务有足够的时间执行  
    sleep(1);    
  
    return 0;    
}

Half-Sync/Half-Async 模式:它解除了并发系统中的异步与同步处理的耦合,从而能够简化编程而又不会带来严重的性能下降。该模式引入了了个层次: “层用于异步(或反应式)处理,层用于同步服务处理,还有一个排队层,负责协调异步/反应式和同步层之间的通信。c++多线程实现Half-Sync/Half-Async 模式代码如下:

#include <pthread.h>    
#include <iostream>    
#include <queue>    
  
// HalfSyncHalfAsync 类实现了一个半同步半异步的任务队列处理器  
class HalfSyncHalfAsync {    
public:    
    // 构造函数,初始化互斥锁、条件变量,并启动工作线程  
    HalfSyncHalfAsync() {    
        pthread_mutex_init(&mutex_, NULL);  // 初始化互斥锁  
        pthread_cond_init(&cond_, NULL);    // 初始化条件变量  
        pthread_create(&worker_thread_, NULL, WorkerThread, this);  // 创建并启动工作线程  
    }    
  
    // 析构函数,停止工作线程,并清理资源  
    ~HalfSyncHalfAsync() {    
        {    
            pthread_mutex_lock(&mutex_);    // 加锁  
            stop_worker_ = true;             // 设置停止工作线程的标志  
            pthread_cond_signal(&cond_);     // 发送条件信号,唤醒等待的工作线程  
            pthread_mutex_unlock(&mutex_);   // 解锁  
        }    
        pthread_join(worker_thread_, NULL);  // 等待工作线程结束  
        pthread_mutex_destroy(&mutex_);      // 销毁互斥锁  
        pthread_cond_destroy(&cond_);        // 销毁条件变量  
    }    
  
    // 向任务队列中添加一个异步任务  
    void EnqueueTask(void (*task)(void*), void* arg) {    
        pthread_mutex_lock(&mutex_);         // 加锁  
        tasks_.push(std::make_pair(task, arg));  // 将任务添加到队列中  
        pthread_cond_signal(&cond_);           // 发送条件信号,唤醒等待的工作线程  
        pthread_mutex_unlock(&mutex_);         // 解锁  
    }    
  
    // 同步执行一个任务,即在当前线程立即执行 
    // 另外一种模式则全部采用异步的方式进行执行任务 
    void SyncExecute(void (*task)(void*), void* arg) {    
        task(arg);  // 直接执行任务  
    }    
  
private:    
    // 工作线程的入口函数  
    static void* WorkerThread(void* arg) {    
        HalfSyncHalfAsync* self = static_cast<HalfSyncHalfAsync*>(arg);  // 转换参数为HalfSyncHalfAsync对象指针  
        self->ProcessQueue();  // 处理任务队列  
        return NULL;    
    }    
  
    // 处理任务队列中的任务  
    void ProcessQueue() {    
        while (true) {    
            void (*task)(void*) = NULL;  // 任务函数指针  
            void* arg = NULL;            // 任务参数  
  
            pthread_mutex_lock(&mutex_);  // 加锁  
            // 等待任务队列非空或收到停止信号  
            while (tasks_.empty() && !stop_worker_) {    
                pthread_cond_wait(&cond_, &mutex_);  // 等待条件信号  
            }    
            // 如果收到停止信号且任务队列为空,则退出循环  
            if (stop_worker_ && tasks_.empty()) {    
                pthread_mutex_unlock(&mutex_);  // 解锁  
                break;    
            }    
            // 取出任务队列中的任务  
            task = tasks_.front().first;    
            arg = tasks_.front().second;    
            tasks_.pop();  // 弹出任务队列中的任务  
            pthread_mutex_unlock(&mutex_);  // 解锁  
  
            task(arg);  // 执行任务  
        }    
    }    
  
    std::queue<std::pair<void (*)(void*), void*> > tasks_;  // 任务队列,存储任务函数指针和参数  
    pthread_mutex_t mutex_;  // 互斥锁,用于保护任务队列的访问  
    pthread_cond_t cond_;    // 条件变量,用于唤醒等待的工作线程  
    bool stop_worker_ = false;  // 停止工作线程的标志  
    pthread_t worker_thread_;   // 工作线程的线程ID  
};    
  
// 示例任务函数,打印参数值  
void AsyncTask(void* arg) {    
    int value = *static_cast<int*>(arg);    
    std::cout << "Asynchronous task executed with value " << value << std::endl;    
}    
  
int main() {    
    HalfSyncHalfAsync hsha;  // 创建HalfSyncHalfAsync对象  
  
    // 添加异步任务到任务队列中  
    int async_value = 10;    
    hsha.EnqueueTask(AsyncTask, &async_value);    
  
    // 同步执行任务  
    int sync_value = 20;    
    hsha.SyncExecute(AsyncTask, &sync_value);    
  
    // 等待异步任务完成,仅用于示例(实际使用中应该有更好的同步机制)  
    sleep(1);    
  
    return 0;    
}

Active Object模式和Half-Sync/Half-Async 模式的主要区别如下:

Half-Sync/Half-Async(半同步/半异步)模式是一种处理并发I/O的模式,它结合了同步I/O和异步I/O的优点。在这种模式中,通常有一个单独的线程或核心来处理同步I/O操作(例如,读写文件或网络通信),而其他部分的程序则继续异步执行。当同步I/O操作完成时,通常会通过某种机制(如回调函数、事件、信号量等)通知异步部分

Active Object模式则是一种将方法的执行进行异步处理的设计模式。在Active Object模式中,一个对象将其方法的执行委托给一个独立的线程,从而使这些方法可以异步执行。这种模式有助于将方法的执行与方法的调用解耦,提高了系统的并发性和响应性。

主要区别

  1. 关注点不同:Half-Sync/Half-Async主要关注I/O操作的处理方式,而Active Object模式主要关注对象方法的异步执行。
  2. 实现方式不同:Half-Sync/Half-Async通常涉及一个单独的线程处理同步I/O,而其他部分异步执行。Active Object模式则通常涉及一个或多个工作线程来处理对象的方法调用。
  3. 适用场景不同:Half-Sync/Half-Async适用于需要处理大量I/O操作且希望保持程序其他部分响应性的场景。Active Object模式则适用于需要将对象方法的执行进行异步处理的场景,以提高系统的并发性和响应性。

ACE Task框架类如下:

ACE TASK框架的好处如下:

ACE_Message_Queue类

ACE定义了ACE_Message_Queue 类。这是一种可移植且又高效的进程内消总排队机制,其中利用了ACE_Message_Block 的各种高级能力。ACE_Message_Queue 是一种可移植的轻量级进程内消息排队机制,它提供了以下能力:

 ACE_Message_Queue 类的主要接口如图所示:

主要分为以下4个范畴:

1、初始化和流控制方法:下面的方法可被用于在ACE_Message_Queue 中进行初始化和管理流控制,通过水位标来防止快速的发送者发送的数据超出较慢的接受者的缓冲和计算资源的限度。默认的高水位标志是16K,当高于高水位时,会进行阻塞处理,直到总字节数降到低水位。

  2.入队/出队方法和消息缓冲:下面的方法执行ACE_Message_Queue 中的大量工作:

ACE_Message_Queue中的消息分为简单消息和复合消息,简单消息包括单个的ACE_Message_Block,而复合消息则包括多个ACE_Message_Block对象,使用复合模式链接在一起。队列中的消息通过一对指针双向地链接在一起,使用next和prev获取这对指针,按照先进先出规则。复合消息则通过一个continuation指针而单向得串联在一起,通过cont方法获取指针。

3、参数的同步策略:ACE_Message_Queue模块是通过ACE_SYNCH_DECL类及TIME_POLICY 策略类来参数化的

template <ACE_SYNCH_DECL, class TIME_POLICY = ACE_System_Time_Policy>
class ACE_Message_Queue : public ACE_Message_Queue_Base
{
  public:
    //

  protected:
    // = 用于控制并发访问的同步.
    // 保护队列免受并发访问
  ACE_SYNCH_MUTEX_T lock_;

  /// 用于使线程休眠,直到队列不再为空.
  ACE_SYNCH_CONDITION_T not_empty_cond_;

  /// 用于使线程休眠,直到队列不再满为止.
  ACE_SYNCH_CONDITION_T not_full_cond_;

  /// 返回当前时间的策略
  TIME_POLICY time_policy_;
}

ACE提供了两种ACE_SYNCH_DECL traits类,预先包装了最为常用的同步traits。

ACE_NULL_SYNCH 这个类中的 traits 是根据“null”加锁机制来实现的,如下所示:

class ACE_Export ACE_NULL_SYNCH
{
public:
  typedef ACE_Null_Mutex MUTEX;
  typedef ACE_Null_Mutex NULL_MUTEX;
  typedef ACE_Null_Mutex PROCESS_MUTEX;
  typedef ACE_Null_Mutex RECURSIVE_MUTEX;
  typedef ACE_Null_Mutex RW_MUTEX;
  typedef ACE_Null_Condition CONDITION;
  typedef ACE_Null_Condition RECURSIVE_CONDITION;
  typedef ACE_Null_Semaphore SEMAPHORE;
  typedef ACE_Null_Mutex NULL_SEMAPHORE;
};

ACE_NULL_SYNCH 类是 Null Object 模式的一个例子,这种模式通过定义“空操作”占位符移除类实现中的条件语句而简化了应用。ACE_NULL_SYNCH常常被用于单线程化应用或是这样的应用中:在其中线程间同步或是已经通过仔细地设计而被消除,或是已经通过其他的某种机制而被实现。7.4节中的客户日志 daemon 小例(238 页)阐释了对 ACE NULL SYNCHtraits 类的使用。

ACE_MT_SYNCH 这个预定义类中的taits 是根据实际的加锁机制来实现的,如下所示:

class ACE_Export ACE_MT_SYNCH
{
public:
  typedef ACE_Thread_Mutex MUTEX;
  typedef ACE_Null_Mutex NULL_MUTEX;
  typedef ACE_Process_Mutex PROCESS_MUTEX;
  typedef ACE_Recursive_Thread_Mutex RECURSIVE_MUTEX;
  typedef ACE_RW_Thread_Mutex RW_MUTEX;
  typedef ACE_Condition_Thread_Mutex CONDITION;
  typedef ACE_Condition_Recursive_Thread_Mutex RECURSIVE_CONDITION;
  typedef ACE_Thread_Semaphore SEMAPHORE;
  typedef ACE_Null_Semaphore NULL_SEMAPHORE;
};

通过使用上述trait类能够带来以下好处:

它既允许ACE_Message_Queue 工作在单线程化配置中,也允许它工作在多线程化配置中,并且
无需改变类实现。
它允许程序员通过 Strategized Locking模式,批量地改变 ACE_Message_Queue 的实例化的同步方面
如果使用的是ACE_NULL_SYNCH 类,MUTEX和CONDITION会被解析为ACE_Null_Mutex和ACE_Null_Condition。如果使用的是ACE_MT_SYNCH 类,MUTEX和CONDITION会被解析为ACE_Thread_Mutex 和 ACE_Condition_Thread_Mutex。

如果 ACE_Message_Queue 是通过 ACE_NULL_SYNCH 进行参数化的,对其入队和出队方法的调用在到达队列的边界条件时,不会阻塞调用线程。它们会返回-1,并将errno设置为EWOULDBLOCK.与此相反,如果 ACE_Message_Qucue 是通过 ACE_MT_SYNCH 实例化的,其入队、出队方法支持阻寒、阻寒和定时操作。例如,当同步化队列为时,在缺省情况下对其出队方法的调用将会阻塞,直至有消息入队,队列不再为空为止。同样,当问步化队列为时,在缺省情况下对其入队方法的调用将会阻塞,直至有足够的消息出队、使队列中的字节数降到低水位标以下,队列不再为满为止。通过将下列类型的 ACE_Time_Value 值传给这些方法,可以改变缺省的阻塞行为。

 4、关闭和消息释放:下列方法可用于关闭ACE_Message_Queue,使其停止活动,以及释放其中的消息

在内部,ACE_Message_Queue总是有以下3种状态之一:

 ACTIVATED 在此状态中所有操作都会正常工作(队列启动时总是处在ACTIVATED状态)。DEACTIVATED 在此状态中所有入队和出队操作都立刻返回-1,并将error设为ESHUTDOWN,直至队列被再次激活。
PULSED 迁移到此状态会使等待中的入队和出队操作立刻返回,就好像队列被停用(Deactivaed)一样,但所有在PULSED状态中发起的操作的行为和在ACTIVATED状态中发起的一样。

 在必须通知所有生产者和消费者线程,有重要事件发生的情况下,DEACTIVATED和PULSED状态十分有用。迁移到这两种状态的任何一种都会唤醒所有等待中的生产者和消费者线程。它们之间的区别是迁移之后队列的入队/出队行为。在DEACTIVATED状态中,所有的入队和出队操作都会失败,直至队列的状态变为ACTIVATED。而PULSED状态在行为上与ACETIVATED 状态是等价的,也就是,所有入队/出队操作都会正常进行。PULSED状态主要是用于提供信息--被唤醒的生产者/消费者可以通过检查state()方法的返回值来决定是否尝试进一步的队列操作。

下面将介绍一个实例:

ACE_Meassage_Queue用于实现客户日志。客户日志 daemon 这行在所有参与网络化日志志服务的主机上,并执行以下任务:

它使用本地IPC机制(比如共字内存、管道或是loopback socket)从在客户日志 daemon 的主机
上的客户那里接收日志记录。
它使用远地IPC机制(比如TCP/IP)来将日志记录转发给运行在指定主机上的服务器口志daemon.

在我们的客户日志 daemon中,主线程使用一个事件处理器和ACE Reactor 框架来从通过网络loopback 设备连接到客户应用的socket那里读取日志记录。该事件处理器在同步化ACE_Message_Queue中对每个口志记录进行排队。另个转发者线程并发地运行着,持续地执行以下步骤:
1、 从消息队列中取出消息
2、将消息缓冲进更大的chunk(大块)中
3、在TCP连接上将这些chunk转发给服务器日志 daemon。

定义了下列类对其进行实现:

如图 6.4 所示,这个客户日志 daemon 实现中的各个类是依照 Acceptor-Connector模式来设计的,CLD_Acceptor扮演接受器角色,CLD_Connector 扮演连接器角色,而 CLD_Handler扮演服务处理器角色。在图6.5(170页)中显示了主线程、转发者线、图6.4中的各个类以及将它们结合在一起的同步化 ACE_Message_Queue 之间的关系。 

需要包含下述头文件:

#include "ace/os_include/os_netdb.h"
#include "ace/OS_NS_sys_time.h"
#include "ace/OS_NS_sys_socket.h"
#include "ace/OS_NS_unistd.h"
#include "ace/OS_NS_string.h"
#include "ace/Event_Handler.h"
#include "ace/INET_Addr.h"
#include "ace/Get_Opt.h"
#include "ace/Log_Record.h"
#include "ace/Truncate.h"
#include "ace/Message_Block.h"
#include "ace/Message_Queue.h"
#include "ace/Reactor.h"
#include "ace/Service_Object.h"
#include "ace/Signal.h"
#include "ace/SOCK_Acceptor.h"
#include "ace/SOCK_Connector.h"
#include "ace/SOCK_Stream.h"
#include "ace/Thread_Manager.h"
#include "Logging_Acceptor.h"
#include "CLD_export.h"

上述的各个类在Client_Logging_Daemon.cpp中进行实现:

CLD_Handlerz主要提供了以下功能:

它从客户那里接收日志记录
它将日志记录转换为ACE_Message_Block
它将消息块放入同步化的消息队列
它运行一个单独的线程,由该线程负责从队列中取出消息块,并以大块的形式转发给日志服务器

class CLD_Handler : public ACE_Event_Handler {
public:
  enum { QUEUE_MAX = sizeof (ACE_Log_Record) * ACE_IOV_MAX };

  // FUZZ:禁用check_for_lack_ACE_OS
  // 初始化挂钩方法
  virtual int open (CLD_Connector *);
  virtual int close (); // Shut down hook method.
  //FUZZ: enable check_for_lack_ACE_OS

  // 访问日志服务器连接的访问器
  virtual ACE_SOCK_Stream &peer () { return peer_; }

  // 反应堆挂钩方法
  virtual int handle_input (ACE_HANDLE handle);
  virtual int handle_close (ACE_HANDLE = ACE_INVALID_HANDLE,
                            ACE_Reactor_Mask = 0);

protected:
  // 将日志记录转发到服务器日志守护进程
  virtual ACE_THR_FUNC_RETURN forward ();

  //FUZZ:禁用check_for_lack_ACE_OS
  // 使用收集写入操作发送缓冲的日志记录
  virtual int send (ACE_Message_Block *chunk[], size_t &count);
  //FUZZ: enable check_for_lack_ACE_OS

  // 转发器线程的入口点
  static ACE_THR_FUNC_RETURN run_svc (void *arg);

  // 对消息进行排队的同步<ACE_Message_Queue>
  ACE_Message_Queue<ACE_SYNCH> msg_queue_;

  // 管理转发器线程
  ACE_Thread_Manager thr_mgr_;

  // 指向<CLD_Connector>的指针
  CLD_Connector *connector_;

  // 连接到日志服务器.
  ACE_SOCK_Stream peer_;
};

在 CLD Handler 中不需要构造器或析构器,因为它的open()和 close()挂钩会在被 CLD_Acceptor工厂类调用时执行初始化和析构活动。CLD Handler 完成两种角色:输入和输出,下面对这两种角色进行解释。

输入角色:因为CLD_Handler 继承自ACE_Event_Handler,它可以使用 ACE Reactor 框架来等待记录日志记录从客户应用到达:客户应用是通过loopback TCP socket连接到客户日志daemon的。当口志记录到达客户日志daemon时,单体ACEReactor分派下面的CLD_Handler:handle_input()挂钩方法:

// 输入挂钩方法
int CLD_Handler::handle_input (ACE_HANDLE handle)
{
  //从socket句柄中读出一个日志记录存储在ACE_Message_Block 中
  ACE_Message_Block *mblk = 0;
  Logging_Handler logging_handler (handle);

  if (logging_handler.recv_log_record (mblk) != -1)
上一篇:K8s: 集群内Pod通信机制之DNS


下一篇:社会工程渗透测试教程(三)-第九章:电子邮件攻击矢量