Whamcloud - gitweb
Interrupting oig_wait can produce a panic on resend.
[fs/lustre-release.git] / lustre / ptlrpc / ptlrpcd.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (C) 2001-2003 Cluster File Systems, Inc.
5  *   Author Peter Braam <braam@clusterfs.com>
6  *
7  *   This file is part of the Lustre file system, http://www.lustre.org
8  *   Lustre is a trademark of Cluster File Systems, Inc.
9  *
10  *   You may have signed or agreed to another license before downloading
11  *   this software.  If so, you are bound by the terms and conditions
12  *   of that agreement, and the following does not apply to you.  See the
13  *   LICENSE file included with this distribution for more information.
14  *
15  *   If you did not agree to a different license, then this copy of Lustre
16  *   is open source software; you can redistribute it and/or modify it
17  *   under the terms of version 2 of the GNU General Public License as
18  *   published by the Free Software Foundation.
19  *
20  *   In either case, Lustre is distributed in the hope that it will be
21  *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
22  *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
23  *   license text for more details.
24  *
25  */
26
27 #define DEBUG_SUBSYSTEM S_RPC
28
29 #ifdef __KERNEL__
30 # include <libcfs/libcfs.h>
31 #else /* __KERNEL__ */
32 # include <liblustre.h>
33 # include <ctype.h>
34 #endif
35
36 #include <libcfs/kp30.h>
37 #include <lustre_net.h>
38 # include <lustre_lib.h>
39
40 #include <lustre_ha.h>
41 #include <obd_class.h>   /* for obd_zombie */
42 #include <obd_support.h> /* for OBD_FAIL_CHECK */
43 #include <lprocfs_status.h>
44
45 #define LIOD_STOP 0
46 struct ptlrpcd_ctl {
47         unsigned long             pc_flags;
48         spinlock_t                pc_lock;
49         struct completion         pc_starting;
50         struct completion         pc_finishing;
51         struct ptlrpc_request_set *pc_set;
52         char                      pc_name[16];
53 #ifndef __KERNEL__
54         int                       pc_recurred;
55         void                     *pc_callback;
56         void                     *pc_wait_callback;
57         void                     *pc_idle_callback;
58 #endif
59 };
60
61 static struct ptlrpcd_ctl ptlrpcd_pc;
62 static struct ptlrpcd_ctl ptlrpcd_recovery_pc;
63
64 struct semaphore ptlrpcd_sem;
65 static int ptlrpcd_users = 0;
66
67 void ptlrpcd_wake(struct ptlrpc_request *req)
68 {
69         struct ptlrpc_request_set *rq_set = req->rq_set;
70
71         LASSERT(rq_set != NULL);
72
73         cfs_waitq_signal(&rq_set->set_waitq);
74 }
75
76 /* requests that are added to the ptlrpcd queue are sent via
77  * ptlrpcd_check->ptlrpc_check_set() */
78 void ptlrpcd_add_req(struct ptlrpc_request *req)
79 {
80         struct ptlrpcd_ctl *pc;
81
82         if (req->rq_send_state == LUSTRE_IMP_FULL)
83                 pc = &ptlrpcd_pc;
84         else
85                 pc = &ptlrpcd_recovery_pc;
86
87         ptlrpc_set_add_new_req(pc->pc_set, req);
88         cfs_waitq_signal(&pc->pc_set->set_waitq);
89 }
90
91 static int ptlrpcd_check(struct ptlrpcd_ctl *pc)
92 {
93         struct list_head *tmp, *pos;
94         struct ptlrpc_request *req;
95         int rc = 0;
96         ENTRY;
97
98         if (test_bit(LIOD_STOP, &pc->pc_flags))
99                 RETURN(1);
100
101         obd_zombie_impexp_cull();
102
103         spin_lock(&pc->pc_set->set_new_req_lock);
104         list_for_each_safe(pos, tmp, &pc->pc_set->set_new_requests) {
105                 req = list_entry(pos, struct ptlrpc_request, rq_set_chain);
106                 list_del_init(&req->rq_set_chain);
107                 ptlrpc_set_add_req(pc->pc_set, req);
108                 rc = 1; /* need to calculate its timeout */
109         }
110         spin_unlock(&pc->pc_set->set_new_req_lock);
111
112         if (pc->pc_set->set_remaining) {
113                 rc = rc | ptlrpc_check_set(pc->pc_set);
114
115                 /* XXX our set never completes, so we prune the completed
116                  * reqs after each iteration. boy could this be smarter. */
117                 list_for_each_safe(pos, tmp, &pc->pc_set->set_requests) {
118                         req = list_entry(pos, struct ptlrpc_request,
119                                          rq_set_chain);
120                         if (req->rq_phase != RQ_PHASE_COMPLETE)
121                                 continue;
122
123                         list_del_init(&req->rq_set_chain);
124                         req->rq_set = NULL;
125                         ptlrpc_req_finished (req);
126                 }
127         }
128
129         if (rc == 0) {
130                 /* If new requests have been added, make sure to wake up */
131                 spin_lock(&pc->pc_set->set_new_req_lock);
132                 rc = !list_empty(&pc->pc_set->set_new_requests);
133                 spin_unlock(&pc->pc_set->set_new_req_lock);
134         }
135
136         RETURN(rc);
137 }
138
139 #ifdef __KERNEL__
140 /* ptlrpc's code paths like to execute in process context, so we have this
141  * thread which spins on a set which contains the io rpcs.  llite specifies
142  * ptlrpcd's set when it pushes pages down into the oscs */
143 static int ptlrpcd(void *arg)
144 {
145         struct ptlrpcd_ctl *pc = arg;
146         int rc;
147         ENTRY;
148
149         if ((rc = cfs_daemonize_ctxt(pc->pc_name))) {
150                 complete(&pc->pc_starting);
151                 return rc;
152         }
153
154         complete(&pc->pc_starting);
155
156         /* this mainloop strongly resembles ptlrpc_set_wait except
157          * that our set never completes.  ptlrpcd_check calls ptlrpc_check_set
158          * when there are requests in the set.  new requests come in
159          * on the set's new_req_list and ptlrpcd_check moves them into
160          * the set. */
161         while (1) {
162                 struct l_wait_info lwi;
163                 cfs_duration_t timeout;
164
165                 timeout = cfs_time_seconds(ptlrpc_set_next_timeout(pc->pc_set));
166                 lwi = LWI_TIMEOUT(timeout, ptlrpc_expired_set, pc->pc_set);
167
168                 l_wait_event(pc->pc_set->set_waitq, ptlrpcd_check(pc), &lwi);
169
170                 if (test_bit(LIOD_STOP, &pc->pc_flags))
171                         break;
172         }
173         /* wait for inflight requests to drain */
174         if (!list_empty(&pc->pc_set->set_requests))
175                 ptlrpc_set_wait(pc->pc_set);
176         complete(&pc->pc_finishing);
177         return 0;
178 }
179
180 static void ptlrpcd_zombie_impexp_notify(void)
181 {
182         LASSERT(ptlrpcd_pc.pc_set != NULL); // call before ptlrpcd inited ?
183
184         cfs_waitq_signal(&ptlrpcd_pc.pc_set->set_waitq);
185 }
186
187 #else
188
189 int ptlrpcd_check_async_rpcs(void *arg)
190 {
191         struct ptlrpcd_ctl *pc = arg;
192         int                  rc = 0;
193
194         /* single threaded!! */
195         pc->pc_recurred++;
196
197         if (pc->pc_recurred == 1) {
198                 rc = ptlrpcd_check(pc);
199                 if (!rc)
200                         ptlrpc_expired_set(pc->pc_set);
201                 /*XXX send replay requests */
202                 if (pc == &ptlrpcd_recovery_pc)
203                         rc = ptlrpcd_check(pc);
204         }
205
206         pc->pc_recurred--;
207         return rc;
208 }
209
210 int ptlrpcd_idle(void *arg)
211 {
212         struct ptlrpcd_ctl *pc = arg;
213
214         return (list_empty(&pc->pc_set->set_new_requests) &&
215                 pc->pc_set->set_remaining == 0);
216 }
217
218 #endif
219
220 static int ptlrpcd_start(char *name, struct ptlrpcd_ctl *pc)
221 {
222         int rc;
223
224         ENTRY;
225         memset(pc, 0, sizeof(*pc));
226         init_completion(&pc->pc_starting);
227         init_completion(&pc->pc_finishing);
228         pc->pc_flags = 0;
229         spin_lock_init(&pc->pc_lock);
230         snprintf (pc->pc_name, sizeof (pc->pc_name), name);
231
232         pc->pc_set = ptlrpc_prep_set();
233         if (pc->pc_set == NULL)
234                 RETURN(-ENOMEM);
235
236 #ifdef __KERNEL__
237         /* wake ptlrpcd when zombie imports or exports exist */
238         obd_zombie_impexp_notify = ptlrpcd_zombie_impexp_notify;
239         
240         rc = cfs_kernel_thread(ptlrpcd, pc, 0);
241         if (rc < 0)  {
242                 ptlrpc_set_destroy(pc->pc_set);
243                 RETURN(rc);
244         }
245
246         wait_for_completion(&pc->pc_starting);
247 #else
248         pc->pc_wait_callback =
249                 liblustre_register_wait_callback("ptlrpcd_check_async_rpcs",
250                                                  &ptlrpcd_check_async_rpcs, pc);
251         pc->pc_idle_callback =
252                 liblustre_register_idle_callback("ptlrpcd_check_idle_rpcs",
253                                                  &ptlrpcd_idle, pc);
254         (void)rc;
255 #endif
256         RETURN(0);
257 }
258
259 static void ptlrpcd_stop(struct ptlrpcd_ctl *pc)
260 {
261         set_bit(LIOD_STOP, &pc->pc_flags);
262         cfs_waitq_signal(&pc->pc_set->set_waitq);
263 #ifdef __KERNEL__
264         obd_zombie_impexp_notify = NULL;
265         wait_for_completion(&pc->pc_finishing);
266 #else
267         liblustre_deregister_wait_callback(pc->pc_wait_callback);
268         liblustre_deregister_idle_callback(pc->pc_idle_callback);
269 #endif
270         ptlrpc_set_destroy(pc->pc_set);
271 }
272
273 int ptlrpcd_addref(void)
274 {
275         int rc = 0;
276         ENTRY;
277
278         mutex_down(&ptlrpcd_sem);
279         if (++ptlrpcd_users != 1)
280                 GOTO(out, rc);
281
282         rc = ptlrpcd_start("ptlrpcd", &ptlrpcd_pc);
283         if (rc) {
284                 --ptlrpcd_users;
285                 GOTO(out, rc);
286         }
287
288         rc = ptlrpcd_start("ptlrpcd-recov", &ptlrpcd_recovery_pc);
289         if (rc) {
290                 ptlrpcd_stop(&ptlrpcd_pc);
291                 --ptlrpcd_users;
292                 GOTO(out, rc);
293         }
294 out:
295         mutex_up(&ptlrpcd_sem);
296         RETURN(rc);
297 }
298
299 void ptlrpcd_decref(void)
300 {
301         mutex_down(&ptlrpcd_sem);
302         if (--ptlrpcd_users == 0) {
303                 ptlrpcd_stop(&ptlrpcd_pc);
304                 ptlrpcd_stop(&ptlrpcd_recovery_pc);
305         }
306         mutex_up(&ptlrpcd_sem);
307 }