1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
4 * Copyright (C) 2001-2003 Cluster File Systems, Inc.
5 * Author Peter Braam <braam@clusterfs.com>
7 * This file is part of Lustre, http://www.lustre.org.
9 * Lustre is free software; you can redistribute it and/or
10 * modify it under the terms of version 2 of the GNU General Public
11 * License as published by the Free Software Foundation.
13 * Lustre is distributed in the hope that it will be useful,
14 * but WITHOUT ANY WARRANTY; without even the implied warranty of
15 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 * GNU General Public License for more details.
18 * You should have received a copy of the GNU General Public License
19 * along with Lustre; if not, write to the Free Software
20 * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
22 * For testing and management it is treated as an obd_device,
23 * although it does not export a full OBD method table (the
24 * requests are coming in over the wire, so object target modules
25 * do not have a full method table.)
29 #define DEBUG_SUBSYSTEM S_OSC
32 # include <linux/version.h>
33 # include <linux/module.h>
34 # include <linux/mm.h>
35 # include <linux/highmem.h>
36 # include <linux/lustre_dlm.h>
37 # if (LINUX_VERSION_CODE >= KERNEL_VERSION(2,5,0))
38 # include <linux/workqueue.h>
39 # include <linux/smp_lock.h>
41 # include <linux/locks.h>
43 #else /* __KERNEL__ */
44 # include <liblustre.h>
47 #include <linux/kp30.h>
48 #include <linux/lustre_net.h>
49 #include <linux/obd_ost.h>
50 #include <linux/obd_lov.h>
53 # include <linux/ctype.h>
54 # include <linux/init.h>
59 #include <linux/lustre_ha.h>
60 #include <linux/obd_support.h> /* for OBD_FAIL_CHECK */
61 #include <linux/lprocfs_status.h>
/* Control state for the OSC rpc daemon. The functions in this file reach the
 * single instance through &osc_orc.
 * NOTE(review): this listing has dropped lines from the definition (the
 * embedded numbering skips 66 and 72-73): osc_rpcd_addref() calls
 * spin_lock_init() on an orc_lock member that is not declared below, and the
 * closing brace together with the osc_orc instance name is not shown.
 * Confirm against the original source before relying on this layout. */
64 static struct osc_rpcd_ctl {
/* bit flags; LIOD_STOP is set by osc_rpcd_decref() and tested by
 * osc_rpcd_check() and osc_rpcd() */
65 unsigned long orc_flags;
/* completed by osc_rpcd() once it is running; osc_rpcd_addref() waits on it */
67 struct completion orc_starting;
/* completed by osc_rpcd() when it is done; osc_rpcd_decref() waits on it */
68 struct completion orc_finishing;
/* list head initialised in osc_rpcd_addref(); no other use is visible here */
69 struct list_head orc_req_list;
/* wait queue the daemon sleeps on; woken by osc_rpcd_add_req() and by
 * osc_rpcd_decref() */
70 wait_queue_head_t orc_waitq;
/* request set serviced by the daemon; created with ptlrpc_prep_set() and
 * released with ptlrpc_set_destroy() */
71 struct ptlrpc_request_set *orc_set;
/* Semaphore declared next to the user count below. No lock/unlock call on it
 * is visible in this listing; presumably it serialises osc_rpcd_addref() and
 * osc_rpcd_decref() -- NOTE(review): confirm against the original source. */
74 static DECLARE_MUTEX(osc_rpcd_sem);
/* Number of users of the daemon: pre-incremented in osc_rpcd_addref() and
 * pre-decremented in osc_rpcd_decref(), which runs its teardown when the
 * count reaches 0. */
75 static int osc_rpcd_users = 0;
/* Queue a request for the rpc daemon.
 *
 * req is handed to the daemon's request set with ptlrpc_set_add_new_req(),
 * then the daemon's wait queue is woken so that it re-evaluates
 * osc_rpcd_check() and picks the request up. */
77 void osc_rpcd_add_req(struct ptlrpc_request *req)
79 struct osc_rpcd_ctl *orc = &osc_orc;
81 ptlrpc_set_add_new_req(orc->orc_set, req);
82 wake_up(&orc->orc_waitq);
/* Per-wakeup work of the daemon. It is the condition passed to l_wait_event()
 * in osc_rpcd() and is also called from osc_check_async_rpcs().
 *
 * Visible steps: test the LIOD_STOP flag, move every request found on
 * set_new_requests into the set, call ptlrpc_check_set() when set_remaining
 * is non-zero, and call ptlrpc_req_finished() on requests in the set.
 *
 * NOTE(review): lines are missing from this listing (the embedded numbering
 * skips them): the declarations of flags and rc, the statement guarded by the
 * LIOD_STOP test, the tail of the second list_entry() call, the statement
 * guarded by the rq_phase test, one statement between the list_del_init() and
 * ptlrpc_req_finished() calls, the closing braces and the final return. The
 * return value therefore cannot be read off this listing in full. */
85 static int osc_rpcd_check(struct osc_rpcd_ctl *orc)
87 struct list_head *tmp, *pos;
88 struct ptlrpc_request *req;
/* stop flag set by osc_rpcd_decref(); the guarded statement is not shown */
93 if (test_bit(LIOD_STOP, &orc->orc_flags))
/* drain set_new_requests while holding set_new_req_lock: each request is
 * unlinked and added to the set with ptlrpc_set_add_req() */
96 spin_lock_irqsave(&orc->orc_set->set_new_req_lock, flags);
97 list_for_each_safe(pos, tmp, &orc->orc_set->set_new_requests) {
98 req = list_entry(pos, struct ptlrpc_request, rq_set_chain);
99 list_del_init(&req->rq_set_chain);
100 ptlrpc_set_add_req(orc->orc_set, req);
101 rc = 1; /* need to calculate its timeout */
103 spin_unlock_irqrestore(&orc->orc_set->set_new_req_lock, flags);
/* only look at the set when it reports remaining requests; the result of
 * ptlrpc_check_set() is OR-ed into rc */
105 if (orc->orc_set->set_remaining) {
106 rc = rc | ptlrpc_check_set(orc->orc_set);
108 /* XXX our set never completes, so we prune the completed
109 * reqs after each iteration. boy could this be smarter. */
110 list_for_each_safe(pos, tmp, &orc->orc_set->set_requests) {
111 req = list_entry(pos, struct ptlrpc_request,
/* phase test against RQ_PHASE_COMPLETE; the guarded statement is not shown */
113 if (req->rq_phase != RQ_PHASE_COMPLETE)
/* unlink the request from the set and finish it */
116 list_del_init(&req->rq_set_chain);
118 ptlrpc_req_finished (req);
/* Thread body of the rpc daemon; started with kernel_thread() from
 * osc_rpcd_addref(). arg is the struct osc_rpcd_ctl to service.
 *
 * Visible steps: daemonize, adjust the signal mask, signal orc_starting,
 * then repeatedly wait on orc_waitq with osc_rpcd_check() as the condition
 * and a timeout taken from ptlrpc_set_next_timeout(); finally signal
 * orc_finishing.
 *
 * NOTE(review): lines are missing from this listing (the embedded numbering
 * skips them), including the local declarations of flags and timeout, the
 * loop header, the statement guarded by the final LIOD_STOP test and the
 * return. */
126 /* ptlrpc's code paths like to execute in process context, so we have this
127 * thread which spins on a set which contains the io rpcs. llite specifies
128 * osc_rpcd's set when it pushes pages down into the oscs */
129 static int osc_rpcd(void *arg)
131 struct osc_rpcd_ctl *orc = arg;
135 kportal_daemonize("liod_writeback");
/* signal mask update done between SIGNAL_MASK_LOCK and SIGNAL_MASK_UNLOCK.
 * NOTE(review): "¤t" on the sigfillset line looks like a mis-encoded
 * "&current" (an HTML "&curren" entity artefact) -- confirm against the
 * original source. One line between sigfillset and the unlock is also
 * missing from this listing. */
137 SIGNAL_MASK_LOCK(current, flags);
138 sigfillset(¤t->blocked);
140 SIGNAL_MASK_UNLOCK(current, flags);
/* osc_rpcd_addref() blocks in wait_for_completion() on orc_starting */
142 complete(&orc->orc_starting);
145 current->flags |= PF_MEMALLOC;
/* NOTE(review): the comment that starts on the next line has lost its
 * closing line in this listing, and the loop header that should follow it
 * is missing as well. */
147 /* this mainloop strongly resembles ptlrpc_set_wait except
148 * that our set never completes. osc_rpcd_check calls ptlrpc_check_set
149 * when there are requests in the set. new requests come in
150 * on the set's new_req_list and osc_rpcd_check moves them into
153 wait_queue_t set_wait;
154 struct l_wait_info lwi;
157 timeout = ptlrpc_set_next_timeout(orc->orc_set) * HZ;
158 lwi = LWI_TIMEOUT(timeout, ptlrpc_expired_set, orc->orc_set);
160 /* ala the pinger, wait on orc's waitqueue and the set's */
161 init_waitqueue_entry(&set_wait, current);
162 add_wait_queue(&orc->orc_set->set_waitq, &set_wait);
163 l_wait_event(orc->orc_waitq, osc_rpcd_check(orc), &lwi);
164 remove_wait_queue(&orc->orc_set->set_waitq, &set_wait);
/* stop flag set by osc_rpcd_decref(); the guarded statement is not shown */
166 if (test_bit(LIOD_STOP, &orc->orc_flags))
169 /* XXX should be making sure we don't have anything in flight */
/* osc_rpcd_decref() blocks in wait_for_completion() on orc_finishing */
170 complete(&orc->orc_finishing);
/* Counter read by osc_check_async_rpcs(), which calls osc_rpcd_check() only
 * when the value is 1. The statements that modify it are not visible in this
 * listing. */
174 static int osc_rpcd_recurred = 0;
/* Handle passed to liblustre_deregister_wait_callback() in
 * osc_rpcd_decref(). NOTE(review): its assignment is not visible in this
 * listing; presumably it holds the result of the registration call in
 * osc_rpcd_addref() -- confirm. */
175 static void *osc_rpcd_callback;
/* Wait callback handed to liblustre_register_wait_callback() in
 * osc_rpcd_addref(). arg is the struct osc_rpcd_ctl to check.
 *
 * Runs osc_rpcd_check() only when osc_rpcd_recurred equals 1 and stores the
 * result in rc.
 *
 * NOTE(review): lines are missing from this listing (the embedded numbering
 * skips them): the declaration of rc, whatever updates osc_rpcd_recurred
 * around the test, and the return statement. */
177 int osc_check_async_rpcs(void *arg)
179 struct osc_rpcd_ctl *orc = arg;
182 /* single threaded!! */
185 if (osc_rpcd_recurred == 1)
186 rc = osc_rpcd_check(orc);
/* Take a reference on the rpc daemon, setting it up when needed.
 *
 * The user count is pre-incremented and tested against 1; the statement
 * guarded by that test is missing from this listing. The setup that follows
 * zeroes osc_orc, initialises its completions, wait queue, lock and list
 * head, and creates the request set.
 *
 * Error codes visible here:
 *   -ENOMEM  ptlrpc_prep_set() returned NULL
 *   -ECHILD  kernel_thread() failed (the set is destroyed first)
 *
 * NOTE(review): both the kernel_thread() start-up and the liblustre wait
 * callback registration appear below; the preprocessor conditionals that
 * presumably select between them are missing from this listing, as are the
 * declaration of rc, the "out" label and the return statement. */
193 int osc_rpcd_addref(void)
195 struct osc_rpcd_ctl *orc = &osc_orc;
/* first-user test; the guarded statement is not shown */
200 if (++osc_rpcd_users != 1)
203 memset(orc, 0, sizeof(*orc));
204 init_completion(&orc->orc_starting);
205 init_completion(&orc->orc_finishing);
206 init_waitqueue_head(&orc->orc_waitq);
208 spin_lock_init(&orc->orc_lock);
209 INIT_LIST_HEAD(&orc->orc_req_list);
211 orc->orc_set = ptlrpc_prep_set();
212 if (orc->orc_set == NULL)
213 GOTO(out, rc = -ENOMEM);
/* start the daemon thread; on failure release the set created above */
216 if (kernel_thread(osc_rpcd, orc, 0) < 0) {
217 ptlrpc_set_destroy(orc->orc_set);
218 GOTO(out, rc = -ECHILD);
/* osc_rpcd() completes orc_starting once it is running */
221 wait_for_completion(&orc->orc_starting);
/* register osc_check_async_rpcs() as a wait callback, with orc as its
 * argument */
224 liblustre_register_wait_callback(&osc_check_async_rpcs, orc);
/* Drop a reference on the rpc daemon; when the user count reaches 0, shut it
 * down.
 *
 * Visible teardown: set LIOD_STOP, wake the daemon's wait queue, wait for
 * orc_finishing, deregister the liblustre wait callback and destroy the
 * request set.
 *
 * NOTE(review): lines are missing from this listing (the embedded numbering
 * skips them) and the function appears to be cut at the end of the listing.
 * The preprocessor conditionals that presumably separate the thread shutdown
 * from the liblustre deregistration are not shown. */
231 void osc_rpcd_decref(void)
233 struct osc_rpcd_ctl *orc = &osc_orc;
236 if (--osc_rpcd_users == 0) {
/* ask the daemon to stop and wake it so that it sees the flag */
237 set_bit(LIOD_STOP, &orc->orc_flags);
238 wake_up(&orc->orc_waitq);
/* osc_rpcd() completes orc_finishing when it is done */
240 wait_for_completion(&orc->orc_finishing);
242 liblustre_deregister_wait_callback(osc_rpcd_callback);
244 ptlrpc_set_destroy(orc->orc_set);