8. Portable Job Queue
The worker_fifo class is a job queue together with the worker threads that service it.
Start cpp section to pthread/pthread_work_fifo.hpp[1/1]
1: #line 1326 "./lpsrc/flx_pthread.pak"
2:
3:
4:
5:
6:
7:
8:
9: namespace flx { namespace pthread {
10: // Abstract unit of work handed to a worker_fifo through add_worker_task().
11: // A worker thread calls doit() and then finished() on each task it dequeues.
12: class PTHREAD_EXTERN worker_task
13: {
14: public:
15: virtual ~worker_task() {}
16:
17: // Perform the job; invoked first, from worker_fifo::thread_loop_body().
18: virtual void doit() = 0;
19:
20: // Invoked on the same worker thread immediately after doit() returns.
21: // NOTE(review): presumably where completion is signalled or the task is released -- confirm.
22: virtual void finished() = 0;
23: };
24: // A job queue plus the detached worker threads that serve it. Tasks are
25: // queued with add_worker_task(); each worker runs thread_loop_body() repeatedly.
26: class PTHREAD_EXTERN worker_fifo
27: {
28: sleep_queue_t fifo; // queue of worker_task pointers; a NULL entry tells one worker to exit
29: int nthreads; // workers started minus stop requests issued
30:
31: static void* thread_start(void*); // thread entry point; the argument is the worker_fifo
32: bool thread_loop_body(); // run one queued task; false once a NULL entry is dequeued
33: void stop_worker_thread(); // decrement nthreads and queue one NULL entry
34: void start_worker_thread(); // increment nthreads and launch one detached thread
35:
36: public:
37: worker_fifo(int n, int m); // n is passed to the queue's constructor, m is the initial worker count
38: ~worker_fifo(); // requests every counted worker to stop, then calls wait_until_empty() on the queue
39: void add_worker_task(worker_task* task); // enqueue a task for the workers
40: int get_nthreads()const; // current value of the worker count
41: void set_nthreads(int); // start or stop workers to move the count toward the argument
42: };
43:
44: }}
45:
46:
Start cpp section to pthread/pthread_work_fifo.cpp[1/1]
1: #line 1373 "./lpsrc/flx_pthread.pak"
2:
3:
4: namespace flx { namespace pthread {
5: // Report the worker count currently recorded in nthreads.
6: int worker_fifo::get_nthreads()const { return nthreads; }
7: // Resize the worker pool to n threads; a negative n is treated as a request for zero workers.
8: void worker_fifo::set_nthreads(int n)
9: { // never shrink below zero: a NULL stop entry queued when no worker is left would never be dequeued
10: while(nthreads<n) start_worker_thread(); // grow: each call launches one more worker
11: while(nthreads>n && nthreads>0) stop_worker_thread(); // shrink: one NULL stop entry per worker
12: }
13: // Count one more worker and launch a detached thread that runs thread_start(this).
14: void worker_fifo::start_worker_thread()
15: {
16: ++nthreads;
17: // NOTE(review): any result of init() is ignored, so a failed launch would still be counted -- confirm.
18: #ifdef _WIN32
19: flx_detached_thread_t().init((LPTHREAD_START_ROUTINE)thread_start, this);
20: #else
21: flx_detached_thread_t().init(thread_start, this);
22: #endif
23: }
24: // Construct the queue from n (presumably its capacity -- confirm) and start the initial workers requested by m.
25: worker_fifo::worker_fifo(int n, int m) : fifo(n), nthreads(0)
26: {
27: set_nthreads(m); // nthreads starts at 0, so this is what launches the initial workers
28: }
29: // Retire one worker: lower the count and queue a NULL entry for some worker to pick up.
30: void
31: worker_fifo::stop_worker_thread()
32: {
33: // thread_loop_body() returns false on a NULL entry, which ends that worker's loop
34: --nthreads;
35: add_worker_task(NULL);
36: }
37: // Queue a stop request for every counted worker, then call wait_until_empty() on the queue.
38: worker_fifo::~worker_fifo()
39: {
40: while(nthreads>0)stop_worker_thread();
41: fifo.wait_until_empty(); // NOTE(review): workers are detached and never joined; confirm none can still touch fifo after this returns
42: }
43: // Entry point of every worker thread. udat is the owning worker_fifo; the
44: // thread keeps processing queue entries until thread_loop_body() reports false.
45: void*
46: worker_fifo::thread_start(void* udat)
47: {
48: worker_fifo* const owner = static_cast<worker_fifo*>(udat);
49: for(;;) { if(!owner->thread_loop_body()) break; }
50: return 0;
51: }
52: // Body of a worker's loop: dequeue one entry and, unless it is the NULL stop
53: // entry, run it. The result tells thread_start() whether to keep looping:
54: // true after running a task, false once a NULL entry has been dequeued.
55: bool
56: worker_fifo::thread_loop_body()
57: {
58: worker_task* const job = (worker_task*)fifo.dequeue();
59: // a NULL entry is a stop request (queued by stop_worker_thread); skip straight to the return
60: if(job)
61: {
62: // run the task, then call its finished() hook
63: job->doit();
64: job->finished();
65: }
66: return job != 0;
67: }
68: // Append task to the queue; a NULL task acts as a stop request for one worker.
69: void
70: worker_fifo::add_worker_task(worker_task* task)
71: {
72: // NOTE(review): whether enqueue() can block when the queue is full is not visible here -- confirm
73: fifo.enqueue(task);
74: }
75:
76: }}
77:
78:
Start felix section to lib/pthread.flx[1/1]
1: #line 1452 "./lpsrc/flx_pthread.pak"
2:
3:
4: header mutex_hxx = '#include "pthread_mutex.hpp"';
5: header condv_hxx = '#include "pthread_condv.hpp"';
6: header counter_hxx = '#include "pthread_counter.hpp"';
7: header semaphore_hxx = '#include "pthread_semaphore.hpp"';
8: header monitor_hxx = '#include "pthread_monitor.hpp"';
9: // Felix bindings to the pthread runtime: thread spawning, a mutex type, and typed channels (pchannel).
10: module Pthread
11: {
12: requires package "flx_pthread";
13: open C_hack;
14: proc spawn_pthread(p:1->0) // hands p to the runtime via a svc_spawn_pthread request (presumably run on a new pthread -- confirm)
15: {
16: var con = start p;
17: var fthr = mk_thread con;
18: svc$ svc_spawn_pthread fthr;
19: }
20: // mutex binds the C++ type flx_mutex_t; lock/unlock map to its lock() and unlock() members.
21: type mutex = "flx_mutex_t" requires mutex_hxx;
22: proc lock: lvalue[mutex] = "$1.lock();";
23: proc unlock: lvalue[mutex] = "$1.unlock();";
24: proc lock (m:&mutex) { lock$ *m; }
25: proc unlock (m:&mutex) { unlock$ *m; }
26:
27: // pchannel[t] binds a pointer to the C++ flx::pthread::monitor_t and is used to pass values of type t.
28: type pchannel[t] = "flx::pthread::monitor_t*" requires monitor_hxx;
29: // mk_pchannel allocates a fresh monitor_t with C++ new.
30: fun mk_pchannel[t]: 1->pchannel[t] =
31: "new flx::pthread::monitor_t()"
32: ;
33: // _read stores the pointer returned by dequeue(), cast to t*, through its second argument.
34: proc _read[t]: pchannel[t] * ptr[ptr[t]] = "*$2 = (?1*)($1->dequeue());";
35: // read takes one pointer off the channel and copies the value it points at into *v.
36: proc read[t](v:&t,chan:pchannel[t]) {
37: var p : ptr[t];
38: _read (chan, addr p);
39: *v = *p; // NOTE(review): the object allocated by write (xnew) is not released here -- confirm who frees it
40: }
41: // _write enqueues a pointer as void*; write first makes a copy of v with xnew and sends that pointer.
42: proc _write[t]: pchannel[t] * ptr[t] = "$1->enqueue((void*)$2);";
43: proc write[t](chan:pchannel[t], v:t) {
44: var ps = cast[ptr[t]]$ xnew v;
45: _write (chan,ps);
46: }
47: }
48:
Start felix section to test/nd101.flx[1/1]
1: #line 1501 "./lpsrc/flx_pthread.pak"
2:
3: include "flx_faio";
4: include "pthread";
5: open Pthread;
6: // Test: spawn seven pthreads that each write their number to a shared channel, then read seven values back.
7: print "Pthread spawning test"; endl;
8: // thr prints "Thread " followed by x.
9: proc thr (x:int) { print "Thread "; print x; endl; }
10: // flx_main performs seven reads below, one for each of the seven spawned writers.
11: proc flx_main
12: {
13: print "Running main\n";
14: var chan = mk_pchannel[int]();
15: var dummy: int;
16: // each spawned procedure prints its number, then writes that number to chan
17: spawn_pthread { thr 1; write (chan,1); };
18: spawn_pthread { thr 2; write (chan,2); };
19: spawn_pthread { thr 3; write (chan,3); };
20: spawn_pthread { thr 4; write (chan,4); };
21: spawn_pthread { thr 5; write (chan,5); };
22: spawn_pthread { thr 6; write (chan,6); };
23: spawn_pthread { thr 7; write (chan,7); };
24: print "Spawned\n";
25: &dummy <- read chan;
26: print "joined "; print dummy; endl;
27: &dummy <- read chan;
28: print "joined "; print dummy; endl;
29: &dummy <- read chan;
30: print "joined "; print dummy; endl;
31: &dummy <- read chan;
32: print "joined "; print dummy; endl;
33: &dummy <- read chan;
34: print "joined "; print dummy; endl;
35: &dummy <- read chan;
36: print "joined "; print dummy; endl;
37: &dummy <- read chan;
38: print "joined "; print dummy; endl;
39: print "Joined all\n";
40: }
41: // make flx_main available under the external name "flx_main"
42: export proc flx_main of (1) as "flx_main";
43:
Start felix section to test/nd102.flx[1/1]
1: #line 1545 "./lpsrc/flx_pthread.pak"
2:
3: include "pthread";
4: include "flx_faio";
5: open String;
6: // Test: five pthreads sleep for random intervals while any of them may request a garbage collection.
7: print "Garbage collector world stop test"; endl;
8: // randprint(n): five rounds; each picks d from rand()%10, calls collect when d is zero, then sleeps d.
9: proc randprint(n:int)
10: {
11: var i = 5;
12: print$ "Start Thread number "+str(n); endl;
13: whilst i > 0 do
14: var d = double_of$ Cstdlib::rand()%10;
15: if d == 0.0 do
16: print "ZERO FOUND -- collecting!"; endl;
17: collect; // explicit collection request; per the test title this is what should stop the world
18: print "collected!"; endl;
19: done;
20: print$ "Thread "+str n +" Sleep #"+str i+" for "+str d+" sec"; endl;
21: --i;
22: Faio::sleep d;
23: done;
24: print$ "Finish Thread number "+str(n); endl;
25: }
26: // start five pthreads, each running randprint with its own number
27: Pthread::spawn_pthread { randprint(1); };
28: Pthread::spawn_pthread { randprint(2); };
29: Pthread::spawn_pthread { randprint(3); };
30: Pthread::spawn_pthread { randprint(4); };
31: Pthread::spawn_pthread { randprint(5); };
32: // the mainline does not wait for the spawned threads before printing this
33: print "Mainline done!"; endl;