/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
 * vim:expandtab:shiftwidth=8:tabstop=8:
 *
 * Copyright (c) 2002, 2003 Cluster File Systems, Inc.
 * Author: Peter Braam <braam@clusterfs.com>
 * Author: Phil Schwan <phil@clusterfs.com>
 *
 * This file is part of Lustre, http://www.lustre.org.
 *
 * Lustre is free software; you can redistribute it and/or
 * modify it under the terms of version 2 of the GNU General Public
 * License as published by the Free Software Foundation.
 *
 * Lustre is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with Lustre; if not, write to the Free Software
 * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
 */

#define DEBUG_SUBSYSTEM S_LDLM

#ifdef __KERNEL__
#include <linux/lustre_dlm.h>
#include <linux/obd_support.h>
#include <linux/obd_class.h>
#include <linux/lustre_lib.h>
#else
#include <liblustre.h>
#endif
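
/*
 * Advisory (fcntl(2)-style) record locking for Lustre, implemented as the
 * LDLM_FLOCK lock type.  A lock owner is identified by the (pid, export)
 * pair, and each lock covers an inclusive byte range [start, end].  An
 * owner never conflicts with itself; its locks are instead merged and
 * split POSIX-style as new requests arrive.
 */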
static inline int
ldlm_same_flock_owner(struct ldlm_lock *lock, struct ldlm_lock *new)
{
        if ((new->l_data.l_flock.pid == lock->l_data.l_flock.pid) &&
            (new->l_export == lock->l_export))
                return 1;

        return 0;
}
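
/* Note that flock extents are inclusive at both ends, so the ranges
 * [0, 4] and [4, 9] overlap on byte 4. */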
static inline int
ldlm_flocks_overlap(struct ldlm_lock *lock, struct ldlm_lock *new)
{
        if ((new->l_data.l_flock.start <= lock->l_data.l_flock.end) &&
            (new->l_data.l_flock.end >= lock->l_data.l_flock.start))
                return 1;

        return 0;
}
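
/* Remove a lock from the resource list and destroy it.  In the
 * LDLM_FL_WAIT_NOREPROC (client-side reprocess) case the lock still holds
 * a reference for its granted mode, so that reference is dropped here with
 * LDLM_FL_LOCAL_ONLY set to suppress the CANCEL RPC to the server. */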
static inline void
ldlm_flock_destroy(struct ldlm_lock *lock, int flags)
{
        ENTRY;

        list_del_init(&lock->l_res_link);
        if (flags == LDLM_FL_WAIT_NOREPROC) {
                struct lustre_handle lockh;

                /* Set a flag to prevent us from sending a CANCEL */
                lock->l_flags |= LDLM_FL_LOCAL_ONLY;

                ldlm_lock2handle(lock, &lockh);
                ldlm_lock_decref_and_cancel(&lockh, lock->l_granted_mode);
        }

        ldlm_lock_destroy(lock);
        EXIT;
}
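
/*
 * Policy function for LDLM_FLOCK locks.  On a first enqueue the request is
 * checked against other owners' granted locks for conflicts; if none are
 * found (or with LDLM_FL_WAIT_NOREPROC, unconditionally) it is folded into
 * this owner's existing locks, merging and splitting them as POSIX
 * semantics require.
 *
 * Worked example (hypothetical values, for illustration only): an owner
 * holds PW locks on [0, 9] and [20, 29].  An unlock request (LCK_NL) for
 * [5, 24] truncates the first lock to [0, 4] and the second to [25, 29],
 * and the request itself is destroyed once it has been applied.
 */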
int
ldlm_flock_enqueue(struct ldlm_lock **reqp, void *req_cookie, int *flags,
                   int first_enq, ldlm_error_t *err)
{
        struct ldlm_lock *req = *reqp;
        struct ldlm_lock *new = req;
        struct ldlm_lock *new2 = NULL;
        struct ldlm_lock *lock = NULL;
        struct ldlm_resource *res = req->l_resource;
        struct ldlm_namespace *ns = res->lr_namespace;
        struct list_head *tmp;
        struct list_head *ownlocks = NULL;
        ldlm_mode_t mode = req->l_req_mode;
        int added = 0;
        int overlaps = 0;
        ENTRY;

        CDEBUG(D_FLOCK, "flags: 0x%x pid: %d mode: %d start: %llu end: %llu\n",
               *flags, new->l_data.l_flock.pid, mode,
               req->l_data.l_flock.start, req->l_data.l_flock.end);
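
        /* A request arrives here in one of two modes: LDLM_FL_WAIT_NOREPROC
         * (or an unlock, LCK_NL), where only the position of this owner's
         * locks in the granted list matters, or a first enqueue, where other
         * owners' granted locks must be scanned for conflicts as well. */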

        /* No blocking ASTs are sent for record locks */
        req->l_blocking_ast = NULL;

        if ((*flags == LDLM_FL_WAIT_NOREPROC) || (mode == LCK_NL)) {
                list_for_each(tmp, &res->lr_granted) {
                        lock = list_entry(tmp, struct ldlm_lock, l_res_link);

                        if (ldlm_same_flock_owner(lock, req)) {
                                ownlocks = tmp;
                                break;
                        }
                }
        } else {
                list_for_each(tmp, &res->lr_granted) {
                        lock = list_entry(tmp, struct ldlm_lock, l_res_link);

                        if (ldlm_same_flock_owner(lock, req)) {
                                if (!ownlocks)
                                        ownlocks = tmp;
                                continue;
                        }

                        /* locks are compatible, overlap doesn't matter */
                        if (lockmode_compat(lock->l_granted_mode, mode))
                                continue;

                        if (!ldlm_flocks_overlap(lock, req))
                                continue;

                        if (*flags & LDLM_FL_BLOCK_NOWAIT) {
                                ldlm_flock_destroy(req, *flags);
                                *err = ELDLM_LOCK_ABORTED;
                                RETURN(LDLM_ITER_STOP);
                        }

                        if (*flags & LDLM_FL_TEST_LOCK) {
                                req->l_granted_mode = lock->l_granted_mode;
                                req->l_data.l_flock.pid =
                                        lock->l_data.l_flock.pid;
                                req->l_data.l_flock.start =
                                        lock->l_data.l_flock.start;
                                req->l_data.l_flock.end =
                                        lock->l_data.l_flock.end;
                                ldlm_flock_destroy(req, *flags);
                                RETURN(LDLM_ITER_STOP);
                        }

                        /* XXX - add deadlock detection check here */

                        *flags |= LDLM_FL_BLOCK_GRANTED;
                        RETURN(LDLM_ITER_CONTINUE);
                }
        }
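
        /* Getting here means there are no granted locks that conflict with
         * the new request, or no conflict checking was wanted. */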
        if (*flags & LDLM_FL_TEST_LOCK) {
                req->l_granted_mode = req->l_req_mode;
                RETURN(LDLM_ITER_STOP);
        }

        added = (mode == LCK_NL);
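
        /* Locks of the same mode from the same owner are coalesced: for
         * example (illustrative values), merging a PW request on [10, 19]
         * into an existing PW lock on [0, 9] leaves one lock on [0, 19],
         * since the extents are adjacent. */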

        /* Insert the new lock into the list */

        if (!ownlocks)
                ownlocks = &res->lr_granted;

        for (tmp = ownlocks->next; ownlocks != &res->lr_granted;
             ownlocks = tmp, tmp = ownlocks->next) {
                lock = list_entry(ownlocks, struct ldlm_lock, l_res_link);

                if (!ldlm_same_flock_owner(lock, new))
                        break;

                if (lock->l_granted_mode == mode) {
                        if (lock->l_data.l_flock.end <
                            (new->l_data.l_flock.start - 1))
                                continue;

                        if (lock->l_data.l_flock.start >
                            (new->l_data.l_flock.end + 1))
                                break;

                        if (lock->l_data.l_flock.start >
                            new->l_data.l_flock.start)
                                lock->l_data.l_flock.start =
                                        new->l_data.l_flock.start;
                        else
                                new->l_data.l_flock.start =
                                        lock->l_data.l_flock.start;

                        if (lock->l_data.l_flock.end <
                            new->l_data.l_flock.end)
                                lock->l_data.l_flock.end =
                                        new->l_data.l_flock.end;
                        else
                                new->l_data.l_flock.end =
                                        lock->l_data.l_flock.end;

                        if (added) {
                                ldlm_flock_destroy(lock, *flags);
                        } else {
                                new = lock;
                                added = 1;
                        }
                        continue;
                }
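
                /* The modes differ, so the request must displace whatever it
                 * overlaps: an existing lock it covers completely is
                 * destroyed (or reused if the request isn't in the list
                 * yet), a partial overlap is truncated, and a lock that
                 * strictly contains the request is split in two below. */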
                if (lock->l_data.l_flock.end < new->l_data.l_flock.start)
                        continue;
                if (lock->l_data.l_flock.start > new->l_data.l_flock.end)
                        break;

                ++overlaps;

                if (new->l_data.l_flock.start <=
                    lock->l_data.l_flock.start) {
                        if (new->l_data.l_flock.end <
                            lock->l_data.l_flock.end) {
                                lock->l_data.l_flock.start =
                                        new->l_data.l_flock.end + 1;
                                break;
                        } else if (added) {
                                ldlm_flock_destroy(lock, *flags);
                        } else {
                                lock->l_data.l_flock.start =
                                        new->l_data.l_flock.start;
                                lock->l_data.l_flock.end =
                                        new->l_data.l_flock.end;
                                new = lock;
                                added = 1;
                        }
                        continue;
                }
                if (new->l_data.l_flock.end >= lock->l_data.l_flock.end) {
                        lock->l_data.l_flock.end =
                                new->l_data.l_flock.start - 1;
                        continue;
                }

                /* split the existing lock into two locks */

                /* if this is an F_UNLCK operation then we could avoid
                 * allocating a new lock and use the req lock passed in
                 * with the request but this would complicate the reply
                 * processing since updates to req get reflected in the
                 * reply. The client side must see the original lock data
                 * so that it can process the unlock properly. */

                /* XXX - if ldlm_lock_new() can sleep we have to
                 * release the ns_lock, allocate the new lock, and
                 * restart processing this lock. */
                new2 = ldlm_lock_create(ns, NULL, res->lr_name, LDLM_FLOCK,
                                        lock->l_granted_mode, NULL, NULL);
                if (new2 == NULL) {
                        ldlm_flock_destroy(req, *flags);
                        *err = ELDLM_LOCK_ABORTED;
                        RETURN(LDLM_ITER_STOP);
                }

                new2->l_granted_mode = lock->l_granted_mode;
                new2->l_data.l_flock.pid = new->l_data.l_flock.pid;
                new2->l_data.l_flock.start = lock->l_data.l_flock.start;
                new2->l_data.l_flock.end = new->l_data.l_flock.start - 1;
                lock->l_data.l_flock.start = new->l_data.l_flock.end + 1;
                new2->l_connh = lock->l_connh;
                if ((new2->l_export = lock->l_export) != NULL) {
                        list_add(&new2->l_export_chain,
                                 &new2->l_export->
                                 exp_ldlm_data.led_held_locks);
                }
                if (*flags == LDLM_FL_WAIT_NOREPROC) {
                        ldlm_lock_addref_internal(new2, lock->l_granted_mode);
                }

                /* insert new2 at lock */
                list_add_tail(&new2->l_res_link, ownlocks);
                break;
        }
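
        /* If the request was merged into an existing lock above it is
         * already in the list; otherwise insert it ahead of the first lock
         * owned by anyone else (or at the tail of the granted list). */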
        if (added) {
                ldlm_flock_destroy(req, *flags);
        } else {
                /* insert new at ownlocks */
                new->l_granted_mode = new->l_req_mode;
                list_del_init(&new->l_res_link);
                list_add_tail(&new->l_res_link, ownlocks);
        }

        if (*flags != LDLM_FL_WAIT_NOREPROC) {
                if (req->l_completion_ast)
                        ldlm_add_ast_work_item(req, NULL, NULL, 0);

                /* The only problem with doing the reprocessing here is that
                 * the completion ASTs for newly granted locks will be sent
                 * before the unlock completion is sent. It shouldn't be an
                 * issue. Also note that ldlm_flock_enqueue() will recurse,
                 * but only once because there can't be unlock requests on
                 * the waiting queue. */
                if ((mode == LCK_NL) && overlaps)
                        ldlm_reprocess_queue(res, &res->lr_waiting);
        }

        ldlm_resource_dump(res);
        RETURN(LDLM_ITER_CONTINUE);
}
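
/* A blocked flock request sleeps in the completion AST below until it is
 * granted or destroyed.  The flock_wait_data handed to the interrupt
 * handler records the lock and the import generation observed before the
 * wait; the handler itself is currently a no-op. */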
static void interrupted_flock_completion_wait(void *data)
{
}

struct flock_wait_data {
        struct ldlm_lock *fwd_lock;
        int fwd_generation;
};

int
ldlm_flock_completion_ast(struct ldlm_lock *lock, int flags, void *data)
{
        struct ldlm_namespace *ns;
        struct file_lock *getlk = data;
        struct flock_wait_data fwd;
        unsigned long irqflags;
        struct obd_device *obd;
        struct obd_import *imp = NULL;
        ldlm_error_t err;
        struct l_wait_info lwi;
        int rc = 0;
        ENTRY;

        LASSERT(flags != LDLM_FL_WAIT_NOREPROC);

        if (flags == 0) {
                wake_up(&lock->l_waitq);
                RETURN(0);
        }

        if (!(flags & (LDLM_FL_BLOCK_WAIT | LDLM_FL_BLOCK_GRANTED |
                       LDLM_FL_BLOCK_CONV)))
                RETURN(0);

        LDLM_DEBUG(lock, "client-side enqueue returned a blocked lock, "
                   "sleeping");

        ldlm_lock_dump(D_OTHER, lock);

        fwd.fwd_lock = lock;
        obd = class_conn2obd(lock->l_connh);

        /* if this is a local lock, then there is no import */
        if (obd != NULL)
                imp = obd->u.cli.cl_import;

        if (imp != NULL) {
                spin_lock_irqsave(&imp->imp_lock, irqflags);
                fwd.fwd_generation = imp->imp_generation;
                spin_unlock_irqrestore(&imp->imp_lock, irqflags);
        }

        lwi = LWI_TIMEOUT_INTR(0, NULL, interrupted_flock_completion_wait,
                               &fwd);

        /* Go to sleep until the lock is granted. */
        rc = l_wait_event(lock->l_waitq,
                          ((lock->l_req_mode == lock->l_granted_mode) ||
                           lock->l_destroyed), &lwi);

        LASSERT(!(lock->l_destroyed));

        if (rc) {
                LDLM_DEBUG(lock, "client-side enqueue waking up: failed (%d)",
                           rc);
                RETURN(rc);
        }

        LDLM_DEBUG(lock, "client-side enqueue waking up");
        ns = lock->l_resource->lr_namespace;
        l_lock(&ns->ns_lock);

        /* ldlm_lock_enqueue() has already placed lock on the granted list. */
        list_del_init(&lock->l_res_link);

        if (getlk) {
                /* fcntl(F_GETLK) request */
                if (lock->l_granted_mode == LCK_PR)
                        getlk->fl_type = F_RDLCK;
                else if (lock->l_granted_mode == LCK_PW)
                        getlk->fl_type = F_WRLCK;
                else
                        getlk->fl_type = F_UNLCK;
                getlk->fl_pid = lock->l_data.l_flock.pid;
                getlk->fl_start = lock->l_data.l_flock.start;
                getlk->fl_end = lock->l_data.l_flock.end;
                /* ldlm_flock_destroy(lock); */
        } else {
                flags = LDLM_FL_WAIT_NOREPROC;
                /* We need to reprocess the lock to do merges or splits */
                ldlm_flock_enqueue(&lock, NULL, &flags, 1, &err);
        }

        l_unlock(&ns->ns_lock);
        RETURN(0);
}

/* This function is only called on the client when a lock is aborted. */
int
ldlm_flock_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *ld,
                        void *data, int flag)
{
        ENTRY;
        ldlm_lock_destroy(lock);
        RETURN(0);
}