{"id":212,"date":"2014-12-17T17:15:04","date_gmt":"2014-12-17T17:15:04","guid":{"rendered":"https:\/\/projects.lsv.ens-paris-saclay.fr\/orchidsdev\/?p=212"},"modified":"2017-12-17T15:46:28","modified_gmt":"2017-12-17T15:46:28","slug":"the-event-dispatcher","status":"publish","type":"post","link":"https:\/\/projects.lsv.ens-paris-saclay.fr\/orchidsdev\/?p=212","title":{"rendered":"The event dispatcher"},"content":{"rendered":"<p>Once Orchids has completed all initializations, it calls <code>event_dispatcher_main_loop()<\/code>, which repeatedly collects events from various sources, and launches threads to monitor signatures.\u00a0 The <code>event_dispatcher_main_loop()<\/code> function can be found in file <code>evt_mgr.c<\/code>.\u00a0 Here is how it works.<br \/>\n<!--more--><\/p>\n<p>The <code>event_dispatcher_main_loop()<\/code> function runs an infinite loop.\u00a0 There are two kinds of things that it is waiting for:<\/p>\n<ul>\n<li>First, it is waiting on input coming from a certain set of sockets; events received on an Internet socket will be read from there;<\/li>\n<li>Second, it is also looking for actions that are scheduled to be done at a later time, and are kept in a priority queue (<code>ctx-&gt;rtactionlist<\/code>, where <code>ctx<\/code> is the Orchids context); this is a general purpose mechanism that is used to poll text files, to register timeouts, and various other things.<\/li>\n<\/ul>\n<h1>Sockets<\/h1>\n<p>The code that looks from input waiting on a socket looks like this:<\/p>\n<pre>retval = select(ctx-&gt;maxfd + 1, &amp;rfds, NULL, NULL, wait_time_ptr);<\/pre>\n<p>Once Orchids detects activity on a file descriptor, <code>retval<\/code> will contain a positive number: the number of file descriptors that are <em>ready<\/em> for input.\u00a0 It will then sweep through the list <code>ctx-&gt;realtime_handler_list<\/code> to find the correct callback(s) that must be called to deal with input on those file descriptors that are ready. These callbacks are registered through the <code>add_input_descriptor()<\/code> call, in <code>orchids_api.c<\/code>.<\/p>\n<p>It may also happen that <code>retval<\/code> equals 0.\u00a0 In that case, no input was ready, and the timeout, given by <code>wait_time_ptr<\/code>, has been reached.\u00a0 This timeout is always equal to the minimal delay one should wait before we must trigger an action from the priority queue: see the next section on the <code>rtactionlist<\/code> priority queue.<\/p>\n<p>The file descriptors on which <code>select()<\/code> is waiting are registered through the <code>add_input_source()<\/code> function call, in <code>orchids_cfg.c<\/code>.\u00a0 You shouldn&#8217;t call it directly: it is meant to be called when Orchids reads an <code>INPUT<\/code> directive at configuration time, see below.<\/p>\n<p>Let us take an example to see how all this configuration takes place.\u00a0 In the configuration file <code>orchids-inputs.conf<\/code>, (in the distribution, look into <code>dist\/<\/code>; otherwise, look typically into <code>\/usr\/local\/etc\/orchids\/,<\/code>) you should find a line of the following form:<\/p>\n<pre>INPUT\u00a0\u00a0 \u00a0\u00a0\u00a0 \u00a0\u00a0\u00a0 \u00a0textfile\u00a0\u00a0 \u00a0\/var\/log\/messages<\/pre>\n<p>This instructs Orchids to open the file <code>\/var\/log\/messages<\/code> and feed it as input to the <code>mod_textfile<\/code> module. It also registers <code>\/var\/log\/messages<\/code> as an indexing key, and I will talk about it in another post, dealing with so-called <a title=\"Reading variable-length event sources\" href=\"https:\/\/projects.lsv.ens-paris-saclay.fr\/orchidsdev\/?p=248\">dissectors<\/a>.<\/p>\n<p>When Orchids does all its configuration stuff, it will parse the command <code>INPUT<\/code>. The table <code>config_dir_g[]<\/code>, in <code>orchids_cfg.c<\/code>, lists all the configuration commands that Orchids recognizes. One of the lines there reads:<\/p>\n<pre>  { \"INPUT\", add_input_source, \"Add an input source module\"},<\/pre>\n<p>This tells Orchids to just call <code>add_input_source()<\/code> when it sees a line starting with <code>INPUT<\/code>.<\/p>\n<p>Now <code>add_input_source()<\/code> will read the rest of the line, take the next word (<code>textfile<\/code> here) as the base name of a module (here, <code>mod_textfile<\/code>), and it will look for a configuration directive provided by this module of the exact same name <code>INPUT<\/code>.<\/p>\n<p>The <code>mod_textfile<\/code> declares one such directive, precisely. Indeed, in its table <code>textfile_dir[]<\/code> of recognized directives (which Orchids knows about because the declared module structure <code>input_module_t mod_textfile = {<\/code> contains a pointer to it), we find the following line:<\/p>\n<pre>  { \"INPUT\", add_input_file, \"Add a file as input source\" },<\/pre>\n<p>The function <code>add_input_file()<\/code> is local to the <code>mod_textfile<\/code> module. When instructed to open a <code>SOCK_UNIX<\/code> socket, it connects to it; when told to read from a pipe, it opens it; in both cases, it obtains a file descriptor, and uses it to call <code>add_input_descriptor()<\/code> and register it as one of the possible sources of input that <code>select()<\/code> should wait for.<\/p>\n<p>There is a final case that <code>add_input_file<\/code> deals with: opening a regular file.\u00a0 It makes no sense to <code>select()<\/code> on a regular file.\u00a0 Instead, <code>add_input_file()<\/code> will add the file descriptor to a local list of polled files, with prescribed polling interval.\u00a0 At post-configuration time (namely, after all modules have been loaded), Orchids will call the module&#8217;s <code>textfile_postconfig()<\/code> function (a pointer to that function is installed in the declared module structure <code>input_module_t mod_textfile<\/code>). This in turn calls <code>register_rtcallback()<\/code> to install a callback that will be called after the polling period to read the contents of the file in.<\/p>\n<h1>The rtactionlist priority queue<\/h1>\n<p>The other thing that <code>event_dispatcher_main_loop()<\/code> deals with is a priority queue of actions to be done later.\u00a0 Each comes with a time at which the action should be triggered, and a callback to invoke at that time (and additional data to pass to the callback).<\/p>\n<p>There are several places in the infinite loop implemented by <code>event_dispatcher_main_loop()<\/code>\u00a0 where it does that.\u00a0 They all have the form:<\/p>\n<pre>he = get_next_rtaction(ctx);<\/pre>\n<p>Right after that, Orchids will execute the callback <code>he-&gt;cb<\/code> if there is one. In the example of <code>mod_textfile<\/code>, for instance, with key <code>\/var\/log\/messages, <\/code>since the latter is a regular file, the module&#8217;s post-configuration function will have registered a callback that reads the contents of the file.\u00a0 This callback is called <code>rtaction_read_files()<\/code>, and what it does is:<\/p>\n<ul>\n<li>read in as much new text was appended to the file and put it in some internal buffers<\/li>\n<li>try to read one complete line (terminated by a newline <code>\\n<\/code>) and package it as an Orchids <a title=\"Converting input into events\" href=\"https:\/\/projects.lsv.ens-paris-saclay.fr\/orchidsdev\/?p=267\">event<\/a> (a list of fields)<\/li>\n<li>re-register itself to read the next line, or read some more text in, as needed, later.<\/li>\n<\/ul>\n<p>Since my point here is to talk about the priority queue, let us look at the last point first.\u00a0 What it does, concretely, is to call:<\/p>\n<pre>register_rtaction(ctx, he);<\/pre>\n<p>with the same <code>he<\/code> as the one we got from our earlier call to <code>get_next_rtaction()<\/code>.\u00a0 This is normal: we reschedule ourselves to be called later by the infinite loop in the <code>event_dispatcher_main_loop()<\/code> function.<\/p>\n<p>However, we do the following subtle thing.\u00a0 If we haven&#8217;t reached the end of the file yet, namely if we still have a few lines to parse, or if we haven&#8217;t read in the whole new contents of the file, then we do not change <code>he-&gt;date<\/code>: this way, the date at which we are rescheduled is <em>now<\/em>.\u00a0 The effect is that the loop in <code>event_dispatcher_main_loop()<\/code> will call us back again immediately to read some new lines.\u00a0 This is the proper way to do it: the infinite loop may do other things, e.g., reading other urgent input from other sources, which you should give Orchids the chance of dealing with.<\/p>\n<p>On the opposite, if we have reached the (current) end of the file, we add the standard value of the polling interval to the date (<code>if (eof) he-&gt;date.tv_sec += cfg-&gt;poll_period<\/code>), so as to be called back in that amount of time. \u00a0Before that, we must call \u00a0<code>gettimeofday (&amp;he-&gt;date, NULL)<\/code>; see <a title=\"gettimeofday() and ctx-&gt;cur_loop_time\" href=\"https:\/\/projects.lsv.ens-paris-saclay.fr\/orchidsdev\/?p=293\">this post<\/a>.<\/p>\n<p>The priority queue is implemented as a skew heap, for optimal time performance.<\/p>\n<p>Most of the time, the above behavior is typical: your callback should reschedule itself by calling <code>register_rtaction()<\/code> as above.\u00a0 If you do <em>not<\/em> wish to be rescheduled, you should free the memory used by <code>he<\/code>, by caling <code>gc_base_free(ctx-&gt;gc_ctx,he)<\/code>.\u00a0 Otherwise you will introduce a memory leak.<\/p>\n<p>To install such delayed callbacks, conversely, the obvious way is to allocate a <code>heap_entry_t<\/code>, put a pointer to it into some local variable <code>he<\/code>, and to call <code>register_rtaction (ctx, he)<\/code>. You would also have to fill in the <code>heap_entry_t<\/code> structure by hand. A better way is to call <code>register_rtcallback()<\/code>, which does most of the job for you: it takes an Orchids context <code>ctx<\/code>, a callback, two pieces of data that you wish to be forwarded to the callback (one meant to be garbage-collectable, the other one meant to be allocated by hand), a delay (as a <code>time_t<\/code>), and a priority (of type\u00a0<code>int<\/code>)\u00a0before the callback is triggered.<\/p>\n<p>The <code>rtactionlist<\/code>\u00a0priority\u00a0queue is managed in such a way that the first heap entries that will be returned by\u00a0<code>get_next_rtaction()<\/code>\u00a0are those that are scheduled earliest, and among all those which are scheduled at the same date, those that have the highest priority.<\/p>\n<h1>But what does Orchids do next?<\/h1>\n<p>We have seen how Orchids was able to read data from several sources, sockets, or polled files, and how the <code>rtactionlist<\/code> priority queue could be used to shedule tasks to be done later, or even immediately.<\/p>\n<p>However, what does it do next?\u00a0 In the example of <code>mod_textfile<\/code>, I said that the <code>rtaction_read_files()<\/code> callback did three things:<\/p>\n<ul>\n<li>read in as much new text was appended to the file and put it in some internal buffers<\/li>\n<li>try to read one complete line and package it as an Orchids event<\/li>\n<li>re-register itself to read the next line, or read some more text in, as needed, later.<\/li>\n<\/ul>\n<p>But I only commented on the last one.\u00a0 The first point consists in handling a partially filled buffer of text, possibly containing characters from an incomplete line already present in the buffer, and advancing pointers as we progress in finding lines.<\/p>\n<p>The second point is more mysterious.\u00a0 I&#8217;ll talk about it in <a title=\"Converting input into events\" href=\"https:\/\/projects.lsv.ens-paris-saclay.fr\/orchidsdev\/?p=267\">another post<\/a>.\u00a0 Roughly, Orchids packages the line just read as a record, and feeds it to the event injector.\u00a0 The latter will try to dissect the line, repreatedly, using several modules if necessary.\u00a0 When it reaches a point where no further dissection can be done, it launches the Orchids engine, creating and advancing threads, executing bytecode, in the hope of finding a match.<\/p>\n<p>&nbsp;<\/p>\n","protected":false},"excerpt":{"rendered":"<p>Once Orchids has completed all initializations, it calls event_dispatcher_main_loop(), which repeatedly collects events from various sources, and launches threads to monitor signatures.\u00a0 The event_dispatcher_main_loop() function can be found in file evt_mgr.c.\u00a0 Here is how it works.<\/p>\n","protected":false},"author":1,"featured_media":0,"comment_status":"closed","ping_status":"closed","sticky":false,"template":"","format":"standard","meta":{"footnotes":""},"categories":[8],"tags":[],"class_list":["post-212","post","type-post","status-publish","format-standard","hentry","category-event-management"],"_links":{"self":[{"href":"https:\/\/projects.lsv.ens-paris-saclay.fr\/orchidsdev\/index.php?rest_route=\/wp\/v2\/posts\/212","targetHints":{"allow":["GET"]}}],"collection":[{"href":"https:\/\/projects.lsv.ens-paris-saclay.fr\/orchidsdev\/index.php?rest_route=\/wp\/v2\/posts"}],"about":[{"href":"https:\/\/projects.lsv.ens-paris-saclay.fr\/orchidsdev\/index.php?rest_route=\/wp\/v2\/types\/post"}],"author":[{"embeddable":true,"href":"https:\/\/projects.lsv.ens-paris-saclay.fr\/orchidsdev\/index.php?rest_route=\/wp\/v2\/users\/1"}],"replies":[{"embeddable":true,"href":"https:\/\/projects.lsv.ens-paris-saclay.fr\/orchidsdev\/index.php?rest_route=%2Fwp%2Fv2%2Fcomments&post=212"}],"version-history":[{"count":13,"href":"https:\/\/projects.lsv.ens-paris-saclay.fr\/orchidsdev\/index.php?rest_route=\/wp\/v2\/posts\/212\/revisions"}],"predecessor-version":[{"id":302,"href":"https:\/\/projects.lsv.ens-paris-saclay.fr\/orchidsdev\/index.php?rest_route=\/wp\/v2\/posts\/212\/revisions\/302"}],"wp:attachment":[{"href":"https:\/\/projects.lsv.ens-paris-saclay.fr\/orchidsdev\/index.php?rest_route=%2Fwp%2Fv2%2Fmedia&parent=212"}],"wp:term":[{"taxonomy":"category","embeddable":true,"href":"https:\/\/projects.lsv.ens-paris-saclay.fr\/orchidsdev\/index.php?rest_route=%2Fwp%2Fv2%2Fcategories&post=212"},{"taxonomy":"post_tag","embeddable":true,"href":"https:\/\/projects.lsv.ens-paris-saclay.fr\/orchidsdev\/index.php?rest_route=%2Fwp%2Fv2%2Ftags&post=212"}],"curies":[{"name":"wp","href":"https:\/\/api.w.org\/{rel}","templated":true}]}}