OpenDNSSEC-signer  1.4.1
worker.c
Go to the documentation of this file.
1 /*
2  * $Id: worker.c 7005 2013-02-05 10:31:30Z matthijs $
3  *
4  * Copyright (c) 2009 NLNet Labs. All rights reserved.
5  *
6  * Redistribution and use in source and binary forms, with or without
7  * modification, are permitted provided that the following conditions
8  * are met:
9  * 1. Redistributions of source code must retain the above copyright
10  * notice, this list of conditions and the following disclaimer.
11  * 2. Redistributions in binary form must reproduce the above copyright
12  * notice, this list of conditions and the following disclaimer in the
13  * documentation and/or other materials provided with the distribution.
14  *
15  * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
16  * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
17  * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
18  * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY
19  * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
20  * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE
21  * GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
22  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER
23  * IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
24  * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN
25  * IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26  *
27  */
28 
34 #include "daemon/engine.h"
35 #include "daemon/worker.h"
36 #include "shared/allocator.h"
37 #include "shared/duration.h"
38 #include "shared/hsm.h"
39 #include "shared/locks.h"
40 #include "shared/log.h"
41 #include "shared/status.h"
42 #include "signer/tools.h"
43 #include "signer/zone.h"
44 
45 #include <time.h> /* time() */
46 
48  { WORKER_WORKER, "worker" },
49  { WORKER_DRUDGER, "drudger" },
50  { 0, NULL }
51 };
52 
53 
58 static const char*
59 worker2str(worker_id type)
60 {
61  ods_lookup_table *lt = ods_lookup_by_id(worker_str, type);
62  if (lt) {
63  return lt->name;
64  }
65  return NULL;
66 }
67 
68 
74 worker_create(allocator_type* allocator, int num, worker_id type)
75 {
76  worker_type* worker;
77  if (!allocator) {
78  return NULL;
79  }
80  worker = (worker_type*) allocator_alloc(allocator, sizeof(worker_type));
81  if (!worker) {
82  return NULL;
83  }
84  ods_log_debug("[%s[%i]] create", worker2str(type), num+1);
85  lock_basic_init(&worker->worker_lock);
86  lock_basic_set(&worker->worker_alarm);
87  lock_basic_lock(&worker->worker_lock);
88  worker->allocator = allocator;
89  worker->thread_num = num +1;
90  worker->engine = NULL;
91  worker->task = NULL;
92  worker->working_with = TASK_NONE;
93  worker->need_to_exit = 0;
94  worker->type = type;
95  worker->clock_in = 0;
96  worker->jobs_appointed = 0;
97  worker->jobs_completed = 0;
98  worker->jobs_failed = 0;
99  worker->sleeping = 0;
100  worker->waiting = 0;
101  lock_basic_unlock(&worker->worker_lock);
102  return worker;
103 }
104 
105 
110 static void
111 worker_working_with(worker_type* worker, task_id with, task_id next,
112  const char* str, const char* name, task_id* what, time_t* when)
113 {
114  worker->working_with = with;
115  ods_log_verbose("[%s[%i]] %s zone %s", worker2str(worker->type),
116  worker->thread_num, str, name);
117  *what = next;
118  *when = time_now();
119  return;
120 }
121 
122 
127 static int
128 worker_fulfilled(worker_type* worker)
129 {
130  int ret = 0;
131  ret = (worker->jobs_completed + worker->jobs_failed) ==
132  worker->jobs_appointed;
133  return ret;
134 }
135 
136 
141 static void
142 worker_clear_jobs(worker_type* worker)
143 {
144  ods_log_assert(worker);
145  lock_basic_lock(&worker->worker_lock);
146  worker->jobs_appointed = 0;
147  worker->jobs_completed = 0;
148  worker->jobs_failed = 0;
149  lock_basic_unlock(&worker->worker_lock);
150  return;
151 }
152 
153 
158 static void
159 worker_queue_rrset(worker_type* worker, fifoq_type* q, rrset_type* rrset)
160 {
162  int tries = 0;
163  ods_log_assert(worker);
164  ods_log_assert(q);
165  ods_log_assert(rrset);
166 
167  lock_basic_lock(&q->q_lock);
168  status = fifoq_push(q, (void*) rrset, worker, &tries);
169  while (status == ODS_STATUS_UNCHANGED) {
170  tries++;
171  if (worker->need_to_exit) {
173  return;
174  }
181  lock_basic_sleep(&q->q_nonfull, &q->q_lock, 5);
182  status = fifoq_push(q, (void*) rrset, worker, &tries);
183  }
185 
186  ods_log_assert(status == ODS_STATUS_OK);
187  lock_basic_lock(&worker->worker_lock);
188  worker->jobs_appointed += 1;
189  lock_basic_unlock(&worker->worker_lock);
190  return;
191 }
192 
193 
198 static void
199 worker_queue_domain(worker_type* worker, fifoq_type* q, domain_type* domain)
200 {
201  rrset_type* rrset = NULL;
202  denial_type* denial = NULL;
203  ods_log_assert(worker);
204  ods_log_assert(q);
205  ods_log_assert(domain);
206  rrset = domain->rrsets;
207  while (rrset) {
208  worker_queue_rrset(worker, q, rrset);
209  rrset = rrset->next;
210  }
211  denial = (denial_type*) domain->denial;
212  if (denial && denial->rrset) {
213  worker_queue_rrset(worker, q, denial->rrset);
214  }
215  return;
216 }
217 
218 
223 static void
224 worker_queue_zone(worker_type* worker, fifoq_type* q, zone_type* zone)
225 {
226  ldns_rbnode_t* node = LDNS_RBTREE_NULL;
227  domain_type* domain = NULL;
228  ods_log_assert(worker);
229  ods_log_assert(q);
230  ods_log_assert(zone);
231  worker_clear_jobs(worker);
232  if (!zone->db || !zone->db->domains) {
233  return;
234  }
235  if (zone->db->domains->root != LDNS_RBTREE_NULL) {
236  node = ldns_rbtree_first(zone->db->domains);
237  }
238  while (node && node != LDNS_RBTREE_NULL) {
239  domain = (domain_type*) node->data;
240  worker_queue_domain(worker, q, domain);
241  node = ldns_rbtree_next(node);
242  }
243  return;
244 }
245 
246 
251 static ods_status
252 worker_check_jobs(worker_type* worker, task_type* task) {
253  ods_log_assert(worker);
254  ods_log_assert(task);
255  lock_basic_lock(&worker->worker_lock);
256  if (worker->jobs_failed) {
257  ods_log_error("[%s[%i]] sign zone %s failed: %u RRsets failed",
258  worker2str(worker->type), worker->thread_num,
259  task_who2str(task), worker->jobs_failed);
260  lock_basic_unlock(&worker->worker_lock);
261  return ODS_STATUS_ERR;
262  } else if (worker->jobs_completed != worker->jobs_appointed) {
263  ods_log_error("[%s[%i]] sign zone %s failed: processed %u of %u "
264  "RRsets", worker2str(worker->type), worker->thread_num,
265  task_who2str(task), worker->jobs_completed,
266  worker->jobs_appointed);
267  lock_basic_unlock(&worker->worker_lock);
268  return ODS_STATUS_ERR;
269  } else if (worker->need_to_exit) {
270  ods_log_debug("[%s[%i]] sign zone %s failed: worker needs to exit",
271  worker2str(worker->type), worker->thread_num, task_who2str(task));
272  lock_basic_unlock(&worker->worker_lock);
273  return ODS_STATUS_ERR;
274  } else {
275  ods_log_debug("[%s[%i]] sign zone %s ok: %u of %u RRsets "
276  "succeeded", worker2str(worker->type), worker->thread_num,
277  task_who2str(task), worker->jobs_completed,
278  worker->jobs_appointed);
279  ods_log_assert(worker->jobs_appointed == worker->jobs_completed);
280  }
281  lock_basic_unlock(&worker->worker_lock);
282  return ODS_STATUS_OK;
283 }
284 
285 
290 static void
291 worker_perform_task(worker_type* worker)
292 {
293  engine_type* engine = NULL;
294  zone_type* zone = NULL;
295  task_type* task = NULL;
296  task_id what = TASK_NONE;
297  time_t when = 0;
298  time_t never = (3600*24*365);
299  ods_status status = ODS_STATUS_OK;
300  int backup = 0;
301  time_t start = 0;
302  time_t end = 0;
303 
304  if (!worker || !worker->task || !worker->task->zone || !worker->engine) {
305  return;
306  }
307  engine = (engine_type*) worker->engine;
308  task = (task_type*) worker->task;
309  zone = (zone_type*) worker->task->zone;
310  ods_log_debug("[%s[%i]] perform task %s for zone %s at %u",
311  worker2str(worker->type), worker->thread_num, task_what2str(task->what),
312  task_who2str(task), (uint32_t) worker->clock_in);
313  /* do what you have been told to do */
314  switch (task->what) {
315  case TASK_SIGNCONF:
316  /* perform 'load signconf' task */
317  worker_working_with(worker, TASK_SIGNCONF, TASK_READ,
318  "configure", task_who2str(task), &what, &when);
319  status = tools_signconf(zone);
320  if (status == ODS_STATUS_UNCHANGED) {
321  if (!zone->signconf->last_modified) {
322  ods_log_debug("[%s[%i]] no signconf.xml for zone %s yet",
323  worker2str(worker->type), worker->thread_num,
324  task_who2str(task));
325  status = ODS_STATUS_ERR;
326  }
327  }
328  if (status == ODS_STATUS_UNCHANGED) {
329  if (task->halted != TASK_NONE && task->halted != TASK_SIGNCONF) {
330  goto task_perform_continue;
331  }
332  status = ODS_STATUS_OK;
333  } else if (status == ODS_STATUS_OK) {
334  task->interrupt = TASK_NONE;
335  task->halted = TASK_NONE;
336  } else {
337  if (task->halted == TASK_NONE) {
338  goto task_perform_fail;
339  }
340  goto task_perform_continue;
341  }
342  /* break; */
343  case TASK_READ:
344  /* perform 'read input adapter' task */
345  worker_working_with(worker, TASK_READ, TASK_SIGN,
346  "read", task_who2str(task), &what, &when);
347  task->what = TASK_READ;
348  if (!zone->signconf->last_modified) {
349  ods_log_debug("[%s[%i]] no signconf.xml for zone %s yet",
350  worker2str(worker->type), worker->thread_num,
351  task_who2str(task));
352  status = ODS_STATUS_ERR;
353  } else {
354  lhsm_check_connection((void*)engine);
355  status = tools_input(zone);
356  }
357 
358  if (status == ODS_STATUS_UNCHANGED) {
359  ods_log_verbose("[%s[%i]] zone %s unsigned data not changed, "
360  "continue", worker2str(worker->type), worker->thread_num,
361  task_who2str(task));
362  status = ODS_STATUS_OK;
363  }
364  if (status == ODS_STATUS_OK) {
365  if (task->interrupt > TASK_SIGNCONF) {
366  task->interrupt = TASK_NONE;
367  task->halted = TASK_NONE;
368  }
369  } else {
370  if (task->halted == TASK_NONE) {
371  goto task_perform_fail;
372  }
373  goto task_perform_continue;
374  }
375  /* break; */
376  case TASK_SIGN:
377  /* perform 'sign' task */
378  worker_working_with(worker, TASK_SIGN, TASK_WRITE,
379  "sign", task_who2str(task), &what, &when);
380  task->what = TASK_SIGN;
381  status = zone_update_serial(zone);
382  if (status == ODS_STATUS_OK) {
383  if (task->interrupt > TASK_SIGNCONF) {
384  task->interrupt = TASK_NONE;
385  task->halted = TASK_NONE;
386  }
387  } else {
388  ods_log_error("[%s[%i]] unable to sign zone %s: "
389  "failed to increment serial",
390  worker2str(worker->type), worker->thread_num,
391  task_who2str(task));
392  if (task->halted == TASK_NONE) {
393  goto task_perform_fail;
394  }
395  goto task_perform_continue;
396  }
397 
398  /* start timer */
399  start = time(NULL);
400  if (zone->stats) {
402  if (!zone->stats->start_time) {
403  zone->stats->start_time = start;
404  }
405  zone->stats->sig_count = 0;
406  zone->stats->sig_soa_count = 0;
407  zone->stats->sig_reuse = 0;
408  zone->stats->sig_time = 0;
410  }
411  /* check the HSM connection before queuing sign operations */
412  lhsm_check_connection((void*)engine);
413  /* queue menial, hard signing work */
414  worker_queue_zone(worker, engine->signq, zone);
415  ods_log_deeebug("[%s[%i]] wait until drudgers are finished "
416  "signing zone %s", worker2str(worker->type), worker->thread_num,
417  task_who2str(task));
418  /* sleep until work is done */
419  worker_sleep_unless(worker, 0);
420  /* stop timer */
421  end = time(NULL);
422  status = worker_check_jobs(worker, task);
423  worker_clear_jobs(worker);
424  if (status == ODS_STATUS_OK && zone->stats) {
426  zone->stats->sig_time = (end-start);
428  }
429  if (status != ODS_STATUS_OK) {
430  if (task->halted == TASK_NONE) {
431  goto task_perform_fail;
432  }
433  goto task_perform_continue;
434  } else {
435  if (task->interrupt > TASK_SIGNCONF) {
436  task->interrupt = TASK_NONE;
437  task->halted = TASK_NONE;
438  }
439  }
440  /* break; */
441  case TASK_WRITE:
442  /* perform 'write to output adapter' task */
443  worker_working_with(worker, TASK_WRITE, TASK_SIGN,
444  "write", task_who2str(task), &what, &when);
445  task->what = TASK_WRITE;
446  status = tools_output(zone, engine);
447  if (status == ODS_STATUS_OK) {
448  if (task->interrupt > TASK_SIGNCONF) {
449  task->interrupt = TASK_NONE;
450  task->halted = TASK_NONE;
451  }
452  } else {
453  /* clear signatures? */
454  if (task->halted == TASK_NONE) {
455  goto task_perform_fail;
456  }
457  goto task_perform_continue;
458  }
459  zone->db->is_processed = 1;
460  if (zone->signconf &&
462  what = TASK_SIGN;
463  when = worker->clock_in +
465  } else {
466  ods_log_error("[%s[%i]] unable to retrieve resign interval "
467  "for zone %s: duration2time() failed",
468  worker2str(worker->type), worker->thread_num,
469  task_who2str(task));
470  ods_log_info("[%s[%i]] defaulting to 1H resign interval for "
471  "zone %s", worker2str(worker->type), worker->thread_num,
472  task_who2str(task));
473  what = TASK_SIGN;
474  when = worker->clock_in + 3600;
475  }
476  backup = 1;
477  break;
478  case TASK_NONE:
479  worker->working_with = TASK_NONE;
480  /* no task */
481  ods_log_warning("[%s[%i]] none task for zone %s",
482  worker2str(worker->type), worker->thread_num,
483  task_who2str(task));
484  when = time_now() + never;
485  break;
486  default:
487  worker->working_with = TASK_NONE;
488  /* unknown task */
489  ods_log_warning("[%s[%i]] unknown task, trying full sign zone %s",
490  worker2str(worker->type), worker->thread_num,
491  task_who2str(task));
492  what = TASK_SIGNCONF;
493  when = time_now();
494  break;
495  }
496  /* no error */
497  task->backoff = 0;
498  if (task->interrupt != TASK_NONE && task->interrupt != what) {
499  ods_log_debug("[%s[%i]] interrupt task %s for zone %s",
500  worker2str(worker->type), worker->thread_num,
501  task_what2str(what), task_who2str(task));
502  task->halted = what;
503  task->halted_when = when;
504  task->what = task->interrupt;
505  task->when = time_now();
506  } else {
507  ods_log_debug("[%s[%i]] next task %s for zone %s",
508  worker2str(worker->type), worker->thread_num,
509  task_what2str(what), task_who2str(task));
510  task->what = what;
511  task->when = when;
512  task->interrupt = TASK_NONE;
513  task->halted = TASK_NONE;
514  task->halted_when = 0;
515  }
516  /* backup the last successful run */
517  if (backup) {
518  status = zone_backup2(zone);
519  if (status != ODS_STATUS_OK) {
520  ods_log_warning("[%s[%i]] unable to backup zone %s: %s",
521  worker2str(worker->type), worker->thread_num,
522  task_who2str(task), ods_status2str(status));
523  /* just a warning */
524  status = ODS_STATUS_OK;
525  }
526  backup = 0;
527  }
528  return;
529 
530 task_perform_fail:
531  if (status != ODS_STATUS_XFR_NOT_READY) {
532  /* other statuses is critical, and we know it is not ODS_STATUS_OK */
533  ods_log_crit("[%s[%i]] CRITICAL: failed to sign zone %s: %s",
534  worker2str(worker->type), worker->thread_num,
535  task_who2str(task), ods_status2str(status));
536  }
537  /* in case of failure, also mark zone processed (for single run usage) */
538  zone->db->is_processed = 1;
539  if (task->backoff) {
540  task->backoff *= 2;
541  } else {
542  task->backoff = 60;
543  }
544  if (task->backoff > ODS_SE_MAX_BACKOFF) {
545  task->backoff = ODS_SE_MAX_BACKOFF;
546  }
547  ods_log_info("[%s[%i]] backoff task %s for zone %s with %u seconds",
548  worker2str(worker->type), worker->thread_num,
549  task_what2str(task->what), task_who2str(task), task->backoff);
550  task->when = time_now() + task->backoff;
551  return;
552 
553 task_perform_continue:
554  ods_log_info("[%s[%i]] continue task %s for zone %s",
555  worker2str(worker->type), worker->thread_num,
556  task_what2str(task->halted), task_who2str(task));
557  task->what = task->halted;
558  task->when = task->halted_when;
559  task->interrupt = TASK_NONE;
560  task->halted = TASK_NONE;
561  task->halted_when = 0;
562  return;
563 }
564 
565 
570 static void
571 worker_work(worker_type* worker)
572 {
573  time_t now = 0;
574  time_t timeout = 1;
575  engine_type* engine = NULL;
576  zone_type* zone = NULL;
577  ods_status status = ODS_STATUS_OK;
578 
579  ods_log_assert(worker);
580  ods_log_assert(worker->type == WORKER_WORKER);
581 
582  engine = (engine_type*) worker->engine;
583  while (worker->need_to_exit == 0) {
584  ods_log_debug("[%s[%i]] report for duty", worker2str(worker->type),
585  worker->thread_num);
586  now = time_now();
588  worker->task = schedule_pop_task(engine->taskq);
589  if (worker->task) {
590  worker->working_with = worker->task->what;
592  zone = (zone_type*) worker->task->zone;
593 
594  lock_basic_lock(&zone->zone_lock);
595  ods_log_debug("[%s[%i]] start working on zone %s",
596  worker2str(worker->type), worker->thread_num, zone->name);
597  worker->clock_in = time(NULL);
598  worker_perform_task(worker);
599  zone->task = worker->task;
600  ods_log_debug("[%s[%i]] finished working on zone %s",
601  worker2str(worker->type), worker->thread_num, zone->name);
602 
604  worker->task = NULL;
605  worker->working_with = TASK_NONE;
606  status = schedule_task(engine->taskq, zone->task, 1);
607  if (status != ODS_STATUS_OK) {
608  ods_log_error("[%s[%i]] unable to schedule task for zone %s: "
609  "%s", worker2str(worker->type), worker->thread_num,
610  zone->name, ods_status2str(status));
611  }
614  timeout = 1;
616  lock_basic_lock(&engine->signal_lock);
617  if (engine->need_to_reload) {
618  lock_basic_alarm(&engine->signal_cond);
619  }
620  lock_basic_unlock(&engine->signal_lock);
621 
622  } else {
623  ods_log_debug("[%s[%i]] nothing to do", worker2str(worker->type),
624  worker->thread_num);
625  worker->task = schedule_get_first_task(engine->taskq);
627  if (worker->task && !engine->taskq->loading) {
628  timeout = (worker->task->when - now);
629  } else {
630  timeout *= 2;
631  }
632  if (timeout > ODS_SE_MAX_BACKOFF) {
633  timeout = ODS_SE_MAX_BACKOFF;
634  }
635  worker->task = NULL;
636  worker_sleep(worker, timeout);
637  }
638  }
639  return;
640 }
641 
642 
647 static void
648 worker_drudge(worker_type* worker)
649 {
650  engine_type* engine = NULL;
651  zone_type* zone = NULL;
652  task_type* task = NULL;
653  rrset_type* rrset = NULL;
654  ods_status status = ODS_STATUS_OK;
655  worker_type* superior = NULL;
656  hsm_ctx_t* ctx = NULL;
657 
658  ods_log_assert(worker);
659  ods_log_assert(worker->engine);
660  ods_log_assert(worker->type == WORKER_DRUDGER);
661 
662  engine = (engine_type*) worker->engine;
663  while (worker->need_to_exit == 0) {
664  ods_log_deeebug("[%s[%i]] report for duty", worker2str(worker->type),
665  worker->thread_num);
666  /* initialize */
667  superior = NULL;
668  zone = NULL;
669  task = NULL;
670  /* get item */
671  lock_basic_lock(&engine->signq->q_lock);
672  rrset = (rrset_type*) fifoq_pop(engine->signq, &superior);
673  if (!rrset) {
674  ods_log_deeebug("[%s[%i]] nothing to do, wait",
675  worker2str(worker->type), worker->thread_num);
683  &engine->signq->q_lock, 0);
684  rrset = (rrset_type*) fifoq_pop(engine->signq, &superior);
685  }
686  lock_basic_unlock(&engine->signq->q_lock);
687  /* do some work */
688  if (rrset) {
689  ods_log_assert(superior);
690  if (!ctx) {
691  ods_log_debug("[%s[%i]] create hsm context",
692  worker2str(worker->type), worker->thread_num);
693  ctx = hsm_create_context();
694  }
695  if (!ctx) {
696  ods_log_crit("[%s[%i]] error creating libhsm context",
697  worker2str(worker->type), worker->thread_num);
698  engine->need_to_reload = 1;
699  lock_basic_lock(&superior->worker_lock);
700  superior->jobs_failed++;
701  lock_basic_unlock(&superior->worker_lock);
702  } else {
703  ods_log_assert(ctx);
704  lock_basic_lock(&superior->worker_lock);
705  task = superior->task;
706  ods_log_assert(task);
707  zone = task->zone;
708  lock_basic_unlock(&superior->worker_lock);
709  ods_log_assert(zone);
710  ods_log_assert(zone->apex);
711  ods_log_assert(zone->signconf);
712  worker->clock_in = time(NULL);
713  status = rrset_sign(ctx, rrset, superior->clock_in);
714  lock_basic_lock(&superior->worker_lock);
715  if (status == ODS_STATUS_OK) {
716  superior->jobs_completed++;
717  } else {
718  superior->jobs_failed++;
719  }
720  lock_basic_unlock(&superior->worker_lock);
721  }
722  if (worker_fulfilled(superior) && superior->sleeping) {
723  ods_log_deeebug("[%s[%i]] wake up superior[%u], work is "
724  "done", worker2str(worker->type), worker->thread_num,
725  superior->thread_num);
726  worker_wakeup(superior);
727  }
728  superior = NULL;
729  rrset = NULL;
730  }
731  /* done work */
732  }
733  /* wake up superior */
734  if (superior && superior->sleeping) {
735  ods_log_deeebug("[%s[%i]] wake up superior[%u], i am exiting",
736  worker2str(worker->type), worker->thread_num, superior->thread_num);
737  worker_wakeup(superior);
738  }
739  /* cleanup open HSM sessions */
740  if (ctx) {
741  hsm_destroy_context(ctx);
742  }
743  return;
744 }
745 
746 
751 void
753 {
754  ods_log_assert(worker);
755  switch (worker->type) {
756  case WORKER_DRUDGER:
757  worker_drudge(worker);
758  break;
759  case WORKER_WORKER:
760  worker_work(worker);
761  break;
762  default:
763  ods_log_error("[worker] illegal worker (id=%i)", worker->type);
764  break;
765  }
766  return;
767 }
768 
769 
774 void
775 worker_sleep(worker_type* worker, time_t timeout)
776 {
777  ods_log_assert(worker);
778  lock_basic_lock(&worker->worker_lock);
779  worker->sleeping = 1;
780  lock_basic_sleep(&worker->worker_alarm, &worker->worker_lock,
781  timeout);
782  lock_basic_unlock(&worker->worker_lock);
783  return;
784 }
785 
786 
791 void
792 worker_sleep_unless(worker_type* worker, time_t timeout)
793 {
794  ods_log_assert(worker);
795  lock_basic_lock(&worker->worker_lock);
796  while (!worker->need_to_exit && !worker_fulfilled(worker)) {
797  worker->sleeping = 1;
798  lock_basic_sleep(&worker->worker_alarm, &worker->worker_lock,
799  timeout);
800  ods_log_debug("[%s[%i]] somebody poked me, check completed jobs %u "
801  "appointed, %u completed, %u failed", worker2str(worker->type),
802  worker->thread_num, worker->jobs_appointed, worker->jobs_completed,
803  worker->jobs_failed);
804  }
805  lock_basic_unlock(&worker->worker_lock);
806  return;
807 }
808 
809 
814 void
816 {
817  ods_log_assert(worker);
818  if (worker && worker->sleeping && !worker->waiting) {
819  ods_log_debug("[%s[%i]] wake up", worker2str(worker->type),
820  worker->thread_num);
821  lock_basic_lock(&worker->worker_lock);
822  lock_basic_alarm(&worker->worker_alarm);
823  worker->sleeping = 0;
824  lock_basic_unlock(&worker->worker_lock);
825  }
826  return;
827 }
828 
829 
834 void
835 worker_wait_timeout(lock_basic_type* lock, cond_basic_type* condition,
836  time_t timeout)
837 {
838  lock_basic_lock(lock);
839  lock_basic_sleep(condition, lock, timeout);
840  lock_basic_unlock(lock);
841  return;
842 }
843 
844 
849 void
850 worker_wait(lock_basic_type* lock, cond_basic_type* condition)
851 {
852  worker_wait_timeout(lock, condition, 0);
853  return;
854 }
855 
856 
861 void
862 worker_notify(lock_basic_type* lock, cond_basic_type* condition)
863 {
864  lock_basic_lock(lock);
865  lock_basic_alarm(condition);
866  lock_basic_unlock(lock);
867  return;
868 }
869 
870 
875 void
876 worker_notify_all(lock_basic_type* lock, cond_basic_type* condition)
877 {
878  lock_basic_lock(lock);
879  lock_basic_broadcast(condition);
880  lock_basic_unlock(lock);
881  return;
882 }
883 
884 
889 void
891 {
892  allocator_type* allocator;
893  cond_basic_type worker_cond;
894  lock_basic_type worker_lock;
895  if (!worker) {
896  return;
897  }
898  allocator = worker->allocator;
899  worker_cond = worker->worker_alarm;
900  worker_lock = worker->worker_lock;
901  allocator_deallocate(allocator, (void*) worker);
902  lock_basic_destroy(&worker_lock);
903  lock_basic_off(&worker_cond);
904  return;
905 }