Whamcloud - gitweb
c09663a22ec6f3df3fcc32c9fae5b9ec5ea4d787
[fs/lustre-release.git] / lustre / ptlrpc / ptlrpcd.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see [sun.com URL with a
20  * copy of GPLv2].
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright  2008 Sun Microsystems, Inc. All rights reserved
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/ptlrpc/ptlrpcd.c
37  */
38
39 #define DEBUG_SUBSYSTEM S_RPC
40
41 #ifdef __KERNEL__
42 # include <libcfs/libcfs.h>
43 #else /* __KERNEL__ */
44 # include <liblustre.h>
45 # include <ctype.h>
46 #endif
47
48 #include <lustre_net.h>
49 # include <lustre_lib.h>
50
51 #include <lustre_ha.h>
52 #include <obd_class.h>   /* for obd_zombie */
53 #include <obd_support.h> /* for OBD_FAIL_CHECK */
54 #include <lprocfs_status.h>
55
56 #define LIOD_STOP 0
57 struct ptlrpcd_ctl {
58         unsigned long             pc_flags;
59         spinlock_t                pc_lock;
60         struct completion         pc_starting;
61         struct completion         pc_finishing;
62         struct ptlrpc_request_set *pc_set;
63         char                      pc_name[16];
64 #ifndef __KERNEL__
65         int                       pc_recurred;
66         void                     *pc_callback;
67         void                     *pc_wait_callback;
68         void                     *pc_idle_callback;
69 #endif
70 };
71
72 static struct ptlrpcd_ctl ptlrpcd_pc;
73 static struct ptlrpcd_ctl ptlrpcd_recovery_pc;
74
75 struct semaphore ptlrpcd_sem;
76 static int ptlrpcd_users = 0;
77
78 void ptlrpcd_wake(struct ptlrpc_request *req)
79 {
80         struct ptlrpc_request_set *rq_set = req->rq_set;
81
82         LASSERT(rq_set != NULL);
83
84         cfs_waitq_signal(&rq_set->set_waitq);
85 }
86
87 /* requests that are added to the ptlrpcd queue are sent via
88  * ptlrpcd_check->ptlrpc_check_set() */
89 void ptlrpcd_add_req(struct ptlrpc_request *req)
90 {
91         struct ptlrpcd_ctl *pc;
92
93         if (req->rq_send_state == LUSTRE_IMP_FULL)
94                 pc = &ptlrpcd_pc;
95         else
96                 pc = &ptlrpcd_recovery_pc;
97
98         ptlrpc_set_add_new_req(pc->pc_set, req);
99         cfs_waitq_signal(&pc->pc_set->set_waitq);
100 }
101
102 static int ptlrpcd_check(struct ptlrpcd_ctl *pc)
103 {
104         struct list_head *tmp, *pos;
105         struct ptlrpc_request *req;
106         int rc = 0;
107         ENTRY;
108
109         if (test_bit(LIOD_STOP, &pc->pc_flags))
110                 RETURN(1);
111
112         spin_lock(&pc->pc_set->set_new_req_lock);
113         list_for_each_safe(pos, tmp, &pc->pc_set->set_new_requests) {
114                 req = list_entry(pos, struct ptlrpc_request, rq_set_chain);
115                 list_del_init(&req->rq_set_chain);
116                 ptlrpc_set_add_req(pc->pc_set, req);
117                 rc = 1; /* need to calculate its timeout */
118         }
119         spin_unlock(&pc->pc_set->set_new_req_lock);
120
121         if (pc->pc_set->set_remaining) {
122                 rc = rc | ptlrpc_check_set(pc->pc_set);
123
124                 /* XXX our set never completes, so we prune the completed
125                  * reqs after each iteration. boy could this be smarter. */
126                 list_for_each_safe(pos, tmp, &pc->pc_set->set_requests) {
127                         req = list_entry(pos, struct ptlrpc_request,
128                                          rq_set_chain);
129                         if (req->rq_phase != RQ_PHASE_COMPLETE)
130                                 continue;
131
132                         list_del_init(&req->rq_set_chain);
133                         req->rq_set = NULL;
134                         ptlrpc_req_finished (req);
135                 }
136         }
137
138         if (rc == 0) {
139                 /* If new requests have been added, make sure to wake up */
140                 spin_lock(&pc->pc_set->set_new_req_lock);
141                 rc = !list_empty(&pc->pc_set->set_new_requests);
142                 spin_unlock(&pc->pc_set->set_new_req_lock);
143         }
144
145         RETURN(rc);
146 }
147
148 #ifdef __KERNEL__
149 /* ptlrpc's code paths like to execute in process context, so we have this
150  * thread which spins on a set which contains the io rpcs.  llite specifies
151  * ptlrpcd's set when it pushes pages down into the oscs */
152 static int ptlrpcd(void *arg)
153 {
154         struct ptlrpcd_ctl *pc = arg;
155         int rc;
156         ENTRY;
157
158         if ((rc = cfs_daemonize_ctxt(pc->pc_name))) {
159                 complete(&pc->pc_starting);
160                 return rc;
161         }
162
163         complete(&pc->pc_starting);
164
165         /* this mainloop strongly resembles ptlrpc_set_wait except
166          * that our set never completes.  ptlrpcd_check calls ptlrpc_check_set
167          * when there are requests in the set.  new requests come in
168          * on the set's new_req_list and ptlrpcd_check moves them into
169          * the set. */
170         while (1) {
171                 struct l_wait_info lwi;
172                 cfs_duration_t timeout;
173
174                 timeout = cfs_time_seconds(ptlrpc_set_next_timeout(pc->pc_set));
175                 lwi = LWI_TIMEOUT(timeout, ptlrpc_expired_set, pc->pc_set);
176
177                 l_wait_event(pc->pc_set->set_waitq, ptlrpcd_check(pc), &lwi);
178
179                 if (test_bit(LIOD_STOP, &pc->pc_flags))
180                         break;
181         }
182         /* wait for inflight requests to drain */
183         if (!list_empty(&pc->pc_set->set_requests))
184                 ptlrpc_set_wait(pc->pc_set);
185         complete(&pc->pc_finishing);
186         return 0;
187 }
188
189 #else
190
191 int ptlrpcd_check_async_rpcs(void *arg)
192 {
193         struct ptlrpcd_ctl *pc = arg;
194         int                  rc = 0;
195
196         /* single threaded!! */
197         pc->pc_recurred++;
198
199         if (pc->pc_recurred == 1) {
200                 rc = ptlrpcd_check(pc);
201                 if (!rc)
202                         ptlrpc_expired_set(pc->pc_set);
203                 /*XXX send replay requests */
204                 if (pc == &ptlrpcd_recovery_pc)
205                         rc = ptlrpcd_check(pc);
206         }
207
208         pc->pc_recurred--;
209         return rc;
210 }
211
212 int ptlrpcd_idle(void *arg)
213 {
214         struct ptlrpcd_ctl *pc = arg;
215
216         return (list_empty(&pc->pc_set->set_new_requests) &&
217                 pc->pc_set->set_remaining == 0);
218 }
219
220 #endif
221
222 static int ptlrpcd_start(char *name, struct ptlrpcd_ctl *pc)
223 {
224         int rc;
225
226         ENTRY;
227         memset(pc, 0, sizeof(*pc));
228         init_completion(&pc->pc_starting);
229         init_completion(&pc->pc_finishing);
230         pc->pc_flags = 0;
231         spin_lock_init(&pc->pc_lock);
232         snprintf (pc->pc_name, sizeof (pc->pc_name), name);
233
234         pc->pc_set = ptlrpc_prep_set();
235         if (pc->pc_set == NULL)
236                 RETURN(-ENOMEM);
237
238 #ifdef __KERNEL__
239         rc = cfs_kernel_thread(ptlrpcd, pc, 0);
240         if (rc < 0)  {
241                 ptlrpc_set_destroy(pc->pc_set);
242                 RETURN(rc);
243         }
244
245         wait_for_completion(&pc->pc_starting);
246 #else
247         pc->pc_wait_callback =
248                 liblustre_register_wait_callback("ptlrpcd_check_async_rpcs",
249                                                  &ptlrpcd_check_async_rpcs, pc);
250         pc->pc_idle_callback =
251                 liblustre_register_idle_callback("ptlrpcd_check_idle_rpcs",
252                                                  &ptlrpcd_idle, pc);
253         (void)rc;
254 #endif
255         RETURN(0);
256 }
257
258 static void ptlrpcd_stop(struct ptlrpcd_ctl *pc)
259 {
260         set_bit(LIOD_STOP, &pc->pc_flags);
261         cfs_waitq_signal(&pc->pc_set->set_waitq);
262 #ifdef __KERNEL__
263         wait_for_completion(&pc->pc_finishing);
264 #else
265         liblustre_deregister_wait_callback(pc->pc_wait_callback);
266         liblustre_deregister_idle_callback(pc->pc_idle_callback);
267 #endif
268         ptlrpc_set_destroy(pc->pc_set);
269 }
270
271 int ptlrpcd_addref(void)
272 {
273         int rc = 0;
274         ENTRY;
275
276         mutex_down(&ptlrpcd_sem);
277         if (++ptlrpcd_users != 1)
278                 GOTO(out, rc);
279
280         rc = ptlrpcd_start("ptlrpcd", &ptlrpcd_pc);
281         if (rc) {
282                 --ptlrpcd_users;
283                 GOTO(out, rc);
284         }
285
286         rc = ptlrpcd_start("ptlrpcd-recov", &ptlrpcd_recovery_pc);
287         if (rc) {
288                 ptlrpcd_stop(&ptlrpcd_pc);
289                 --ptlrpcd_users;
290                 GOTO(out, rc);
291         }
292 out:
293         mutex_up(&ptlrpcd_sem);
294         RETURN(rc);
295 }
296
297 void ptlrpcd_decref(void)
298 {
299         mutex_down(&ptlrpcd_sem);
300         if (--ptlrpcd_users == 0) {
301                 ptlrpcd_stop(&ptlrpcd_pc);
302                 ptlrpcd_stop(&ptlrpcd_recovery_pc);
303         }
304         mutex_up(&ptlrpcd_sem);
305 }