Whamcloud - gitweb
land v0.9.1 on HEAD, in preparation for a 1.0.x branch
[fs/lustre-release.git] / lustre / osc / osc_rpcd.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (C) 2001-2003 Cluster File Systems, Inc.
5  *   Author Peter Braam <braam@clusterfs.com>
6  *
7  *   This file is part of Lustre, http://www.lustre.org.
8  *
9  *   Lustre is free software; you can redistribute it and/or
10  *   modify it under the terms of version 2 of the GNU General Public
11  *   License as published by the Free Software Foundation.
12  *
13  *   Lustre is distributed in the hope that it will be useful,
14  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
15  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16  *   GNU General Public License for more details.
17  *
18  *   You should have received a copy of the GNU General Public License
19  *   along with Lustre; if not, write to the Free Software
20  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
21  *
22  *  For testing and management it is treated as an obd_device,
23  *  although * it does not export a full OBD method table (the
24  *  requests are coming * in over the wire, so object target modules
25  *  do not have a full * method table.)
26  *
27  */
28
29 #define DEBUG_SUBSYSTEM S_OSC
30
31 #ifdef __KERNEL__
32 # include <linux/version.h>
33 # include <linux/module.h>
34 # include <linux/mm.h>
35 # include <linux/highmem.h>
36 # include <linux/lustre_dlm.h>
37 # if (LINUX_VERSION_CODE >= KERNEL_VERSION(2,5,0))
38 #  include <linux/workqueue.h>
39 #  include <linux/smp_lock.h>
40 # else
41 #  include <linux/locks.h>
42 # endif
43 #else /* __KERNEL__ */
44 # include <liblustre.h>
45 #endif
46
47 #include <linux/kp30.h>
48 #include <linux/lustre_net.h>
49 #include <linux/obd_ost.h>
50 #include <linux/obd_lov.h>
51
52 #ifndef  __CYGWIN__
53 # include <linux/ctype.h>
54 # include <linux/init.h>
55 #else
56 # include <ctype.h>
57 #endif
58
59 #include <linux/lustre_ha.h>
60 #include <linux/obd_support.h> /* for OBD_FAIL_CHECK */
61 #include <linux/lprocfs_status.h>
62
63 #define LIOD_STOP 0
64 static struct osc_rpcd_ctl {
65         unsigned long             orc_flags;
66         spinlock_t                orc_lock;
67         struct completion         orc_starting;
68         struct completion         orc_finishing;
69         struct list_head          orc_req_list;
70         wait_queue_head_t         orc_waitq;
71         struct ptlrpc_request_set *orc_set;
72 } osc_orc;
73
74 static DECLARE_MUTEX(osc_rpcd_sem);
75 static int osc_rpcd_users = 0;
76
77 void osc_rpcd_add_req(struct ptlrpc_request *req)
78 {
79         struct osc_rpcd_ctl *orc = &osc_orc;
80
81         ptlrpc_set_add_new_req(orc->orc_set, req);
82         wake_up(&orc->orc_waitq);
83 }
84
85 static int osc_rpcd_check(struct osc_rpcd_ctl *orc)
86 {
87         struct list_head *tmp, *pos;
88         struct ptlrpc_request *req;
89         unsigned long flags;
90         int rc = 0;
91         ENTRY;
92
93         if (test_bit(LIOD_STOP, &orc->orc_flags))
94                 RETURN(1);
95
96         spin_lock_irqsave(&orc->orc_set->set_new_req_lock, flags);
97         list_for_each_safe(pos, tmp, &orc->orc_set->set_new_requests) {
98                 req = list_entry(pos, struct ptlrpc_request, rq_set_chain);
99                 list_del_init(&req->rq_set_chain);
100                 ptlrpc_set_add_req(orc->orc_set, req);
101                 rc = 1; /* need to calculate its timeout */
102         }
103         spin_unlock_irqrestore(&orc->orc_set->set_new_req_lock, flags);
104
105         if (orc->orc_set->set_remaining) {
106                 rc = rc | ptlrpc_check_set(orc->orc_set);
107
108                 /* XXX our set never completes, so we prune the completed
109                  * reqs after each iteration. boy could this be smarter. */
110                 list_for_each_safe(pos, tmp, &orc->orc_set->set_requests) {
111                         req = list_entry(pos, struct ptlrpc_request,
112                                          rq_set_chain);
113                         if (req->rq_phase != RQ_PHASE_COMPLETE)
114                                 continue;
115
116                         list_del_init(&req->rq_set_chain);
117                         req->rq_set = NULL;
118                         ptlrpc_req_finished (req);
119                 }
120         }
121
122         RETURN(rc);
123 }
124
125 #ifdef __KERNEL__
126 /* ptlrpc's code paths like to execute in process context, so we have this
127  * thread which spins on a set which contains the io rpcs.  llite specifies
128  * osc_rpcd's set when it pushes pages down into the oscs */
129 static int osc_rpcd(void *arg)
130 {
131         struct osc_rpcd_ctl *orc = arg;
132         unsigned long flags;
133         ENTRY;
134
135         kportal_daemonize("liod_writeback");
136
137         SIGNAL_MASK_LOCK(current, flags);
138         sigfillset(&current->blocked);
139         RECALC_SIGPENDING;
140         SIGNAL_MASK_UNLOCK(current, flags);
141
142         complete(&orc->orc_starting);
143
144         /* like kswapd */
145         current->flags |= PF_MEMALLOC;
146
147         /* this mainloop strongly resembles ptlrpc_set_wait except
148          * that our set never completes.  osc_rpcd_check calls ptlrpc_check_set
149          * when there are requests in the set.  new requests come in
150          * on the set's new_req_list and osc_rpcd_check moves them into
151          * the set. */
152         while (1) {
153                 wait_queue_t set_wait;
154                 struct l_wait_info lwi;
155                 int timeout;
156
157                 timeout = ptlrpc_set_next_timeout(orc->orc_set) * HZ;
158                 lwi = LWI_TIMEOUT(timeout, ptlrpc_expired_set, orc->orc_set);
159
160                 /* ala the pinger, wait on orc's waitqueue and the set's */
161                 init_waitqueue_entry(&set_wait, current);
162                 add_wait_queue(&orc->orc_set->set_waitq, &set_wait);
163                 l_wait_event(orc->orc_waitq, osc_rpcd_check(orc), &lwi);
164                 remove_wait_queue(&orc->orc_set->set_waitq, &set_wait);
165
166                 if (test_bit(LIOD_STOP, &orc->orc_flags))
167                         break;
168         }
169         /* XXX should be making sure we don't have anything in flight */
170         complete(&orc->orc_finishing);
171         return 0;
172 }
173 #else
174 static int osc_rpcd_recurred = 0;
175 static void *osc_rpcd_callback;
176
177 int osc_check_async_rpcs(void *arg)
178 {
179         struct osc_rpcd_ctl *orc = arg;
180         int                  rc = 0;
181
182         /* single threaded!! */
183         osc_rpcd_recurred++;
184
185         if (osc_rpcd_recurred == 1)
186                 rc = osc_rpcd_check(orc);
187
188         osc_rpcd_recurred--;
189         return rc;
190 }
191 #endif
192
193 int osc_rpcd_addref(void)
194 {
195         struct osc_rpcd_ctl *orc = &osc_orc;
196         int rc = 0;
197         ENTRY;
198
199         down(&osc_rpcd_sem);
200         if (++osc_rpcd_users != 1)
201                 GOTO(out, rc);
202
203         memset(orc, 0, sizeof(*orc));
204         init_completion(&orc->orc_starting);
205         init_completion(&orc->orc_finishing);
206         init_waitqueue_head(&orc->orc_waitq);
207         orc->orc_flags = 0;
208         spin_lock_init(&orc->orc_lock);
209         INIT_LIST_HEAD(&orc->orc_req_list);
210
211         orc->orc_set = ptlrpc_prep_set();
212         if (orc->orc_set == NULL)
213                 GOTO(out, rc = -ENOMEM);
214
215 #ifdef __KERNEL__
216         if (kernel_thread(osc_rpcd, orc, 0) < 0)  {
217                 ptlrpc_set_destroy(orc->orc_set);
218                 GOTO(out, rc = -ECHILD);
219         }
220
221         wait_for_completion(&orc->orc_starting);
222 #else
223         osc_rpcd_callback =
224                 liblustre_register_wait_callback(&osc_check_async_rpcs, orc);
225 #endif
226 out:
227         up(&osc_rpcd_sem);
228         RETURN(rc);
229 }
230
231 void osc_rpcd_decref(void)
232 {
233         struct osc_rpcd_ctl *orc = &osc_orc;
234
235         down(&osc_rpcd_sem);
236         if (--osc_rpcd_users == 0) {
237                 set_bit(LIOD_STOP, &orc->orc_flags);
238                 wake_up(&orc->orc_waitq);
239 #ifdef __KERNEL__
240                 wait_for_completion(&orc->orc_finishing);
241 #else
242                 liblustre_deregister_wait_callback(osc_rpcd_callback);
243 #endif
244                 ptlrpc_set_destroy(orc->orc_set);
245         }
246         up(&osc_rpcd_sem);
247 }