1: #line 353 "./lpsrc/flx_faio.pak"
2:
3:
4:
5:
6:
7:
8:
9:
10:
11:
12:
13:
14: namespace flx { namespace faio {
15:
16: class FAIO_EXTERN flx_driver_request_base {
17: public:
18: virtual ~flx_driver_request_base() {}
19:
20: virtual bool start_async_op(demux::demuxer& demux, flx_drv* drv, void* f) = 0;
21: };
22:
23:
24:
25:
26: class FAIO_EXTERN async_copipe : public flx_driver_request_base {
27: public:
28: enum { WINDWARD, LEEWARD, NUM_CHANNELS };
29:
30: virtual bool start_async_op(demux::demuxer& demux, flx_drv* drv, void* f);
31: void connect(void* f, demux::sel_param* pb, bool reading, int channel);
32: void close_channel(int which);
33: void disconnect();
34:
35: static async_copipe* create_copipe() { return new async_copipe; }
36: bool debug;
37: void set_debug(bool);
38:
39: private:
40: enum { READER, WRITER, NUM_CNXNS };
41:
42:
43: void* thread[NUM_CNXNS];
44: demux::sel_param* pb[NUM_CNXNS];
45: int num_users;
46:
47: bool channel_open[NUM_CHANNELS];
48: int current_channel;
49: void wake_thread(int n, flx_drv* drv);
50: void wake_all_threads(flx_drv* drv);
51:
52: async_copipe();
53: };
54:
55: class FAIO_EXTERN copipe_endpt {
56: int read_channel;
57:
58: copipe_endpt(async_copipe* p, int rchan) :
59: read_channel(rchan), pipe(p), debug(false)
60: {}
61:
62: public:
63: async_copipe* pipe;
64:
65:
66: void shutdown(int how);
67: int get_channel(bool reading);
68: bool debug;
69: void set_debug(bool d) { debug = d; }
70:
71: ~copipe_endpt() { pipe->disconnect(); }
72: static void pipe_pair(copipe_endpt* pair[2]);
73: };
74:
75:
76: class FAIO_EXTERN sleep_request
77: : public flx_driver_request_base, public demux::sleep_task
78: {
79: thread_wakeup fw;
80: double delta;
81: public:
82: sleep_request() {}
83:
84: sleep_request(double d) : delta(d) {}
85:
86:
87: virtual bool start_async_op(demux::demuxer& demux, flx_drv* drv, void* f);
88:
89:
90: virtual void fire();
91: };
92:
93: }}
94:
95:
1: #line 449 "./lpsrc/flx_faio.pak"
2:
3:
4:
5:
6: using namespace flx::demux;
7: namespace flx { namespace faio {
8:
9:
10:
11:
12:
13:
14: void
15: async_copipe::set_debug(bool d) { debug = d; }
16:
17: bool
18: async_copipe::start_async_op(demux::demuxer& demux, flx_drv* drv, void* f)
19: {
20:
21:
22: bool wake_caller = (f != thread[READER] && f != thread[WRITER]);
23:
24:
25: if(-1 == current_channel)
26: return true;
27:
28:
29: if(!channel_open[current_channel])
30: {
31: wake_all_threads(drv);
32: current_channel = -1;
33: return wake_caller;
34: }
35:
36:
37:
38: if(!(thread[READER] && thread[WRITER])) return wake_caller;
39:
40:
41: if(wake_caller) fprintf(stderr,"we should never have a 3rd party here!\n");
42:
43:
44: long len;
45: long nb1 = pb[READER]->buffer_size-pb[READER]->bytes_written;
46: long nb2 = pb[WRITER]->buffer_size-pb[WRITER]->bytes_written;
47:
48:
49: len = (nb1 < nb2) ? nb1 : nb2;
50:
51:
52: memmove(pb[READER]->buffer + pb[READER]->bytes_written,
53: pb[WRITER]->buffer + pb[WRITER]->bytes_written, len);
54: pb[READER]->bytes_written += len;
55: pb[WRITER]->bytes_written += len;
56:
57:
58:
59: int num_woken = 0;
60: for(int i = 0; i < NUM_CNXNS; i++)
61: {
62:
63: if(pb[i]->bytes_written == pb[i]->buffer_size)
64: {
65: wake_thread(i, drv);
66: num_woken++;
67: }
68: }
69:
70: if(num_woken == NUM_CNXNS)
71: current_channel = -1;
72:
73:
74:
75:
76: return false;
77: }
78:
79: void
80: async_copipe::wake_thread(int n, flx_drv* drv)
81: {
82: drv->sched(thread[n]);
83: thread[n] = 0;
84: }
85:
86: void
87: async_copipe::wake_all_threads(flx_drv* drv)
88: {
89: for(int i = 0; i < NUM_CNXNS; i++)
90: {
91: if(thread[i]) wake_thread(i, drv);
92: }
93: }
94:
95: async_copipe::async_copipe() : debug(false)
96: {
97: thread[READER] = 0;
98: thread[WRITER] = 0;
99: channel_open[WINDWARD] = true;
100: channel_open[LEEWARD] = true;
101: current_channel = -1;
102: num_users = 2;
103: }
104:
105:
106: void
107: async_copipe::close_channel(int which)
108: {
109:
110:
111:
112: channel_open[which] = false;
113: }
114:
115: void
116: async_copipe::disconnect()
117: {
118: if(debug)fprintf(stderr,"num_users before disconnect: %i\n", num_users);
119: if(--num_users == 0)
120: {
121: if(debug)fprintf(stderr,"deleting this!\n");
122: delete this;
123: }
124: }
125:
126: void
127: async_copipe::connect(void* f, sel_param* inpb, bool reading, int channel)
128: {
129: int i = (reading) ? READER : WRITER;
130:
131: if(-1 == current_channel)
132: {
133: current_channel = channel;
134: }
135: else if(current_channel != channel)
136: {
137: if(debug)fprintf(stderr,"conflicting channels! make sure this causes a wake up!\n");
138: current_channel = -1;
139: return;
140: }
141:
142:
143: if(f && thread[i])
144: if(debug)fprintf(stderr,"copipe conflict! results undefined!\n");
145:
146: thread[i] = f;
147: pb[i] = inpb;
148: }
149:
150:
151: void
152: copipe_endpt::pipe_pair(copipe_endpt* pair[2])
153: {
154:
155:
156: async_copipe* p = async_copipe::create_copipe();
157:
158: pair[0] = new copipe_endpt(p, async_copipe::WINDWARD);
159: pair[1] = new copipe_endpt(p, async_copipe::LEEWARD);
160: }
161:
162: int
163: copipe_endpt::get_channel(bool reading)
164: {
165: if(reading) return read_channel;
166:
167: return (read_channel == async_copipe::WINDWARD) ?
168: async_copipe::LEEWARD : async_copipe::WINDWARD;
169: }
170:
171:
172: void
173: copipe_endpt::shutdown(int how)
174: {
175: int write_channel = get_channel(false);
176:
177: switch(how)
178: {
179: case 0:
180:
181: pipe->close_channel(read_channel);
182: break;
183: case 1:
184: pipe->close_channel(write_channel);
185: break;
186: case 2:
187:
188: pipe->close_channel(async_copipe::WINDWARD);
189: pipe->close_channel(async_copipe::LEEWARD);
190: break;
191: }
192: }
193:
194:
195:
196:
197: bool
198: sleep_request::start_async_op(demuxer& demux, flx_drv* drv, void* f)
199: {
200:
201:
202: RECORD_THREAD_INFO(fw);
203:
204: drv->get_sleepers()->add_sleep_request(this, delta);
205:
206: return false;
207: }
208:
209:
210: void
211: sleep_request::fire()
212: {
213: fw.wake();
214: }
215:
216: }}
217: