/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
 * vim:expandtab:shiftwidth=8:tabstop=8:
 *
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see [sun.com URL with a
 * copy of GPLv2].
 *
 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
 * CA 95054 USA or visit www.sun.com if you need additional information or
 * have any questions.
 *
 * GPL HEADER END
 */
/*
 * Copyright (c) 2008 Sun Microsystems, Inc. All rights reserved
 * Use is subject to license terms.
 *
 * Copyright (c) 2003 Hewlett-Packard Development Company LP.
 * Developed under the sponsorship of the US Government under
 * Subcontract No. B514193
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 * Lustre is a trademark of Sun Microsystems, Inc.
 */

#define DEBUG_SUBSYSTEM S_LDLM

#ifdef __KERNEL__
#include <lustre_dlm.h>
#include <obd_support.h>
#include <obd_class.h>
#include <lustre_lib.h>
#include <libcfs/list.h>
#else
#include <liblustre.h>
#include <obd_class.h>
#endif

#include "ldlm_internal.h"

#define l_flock_waitq   l_lru

/**
 * Wait queue for Posix lock deadlock detection, added with
 * ldlm_lock::l_flock_waitq.
 */
static CFS_LIST_HEAD(ldlm_flock_waitq);
/**
 * Lock protecting access to ldlm_flock_waitq.
 */
spinlock_t ldlm_flock_waitq_lock = SPIN_LOCK_UNLOCKED;

int ldlm_flock_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
                            void *data, int flag);

/**
 * list_for_remaining_safe - iterate over the remaining entries in a list
 *              and safeguard against removal of a list entry.
 * @pos:        the &struct list_head to use as a loop counter. pos MUST
 *              have been initialized prior to using it in this macro.
 * @n:          another &struct list_head to use as temporary storage
 * @head:       the head for your list.
 */
#define list_for_remaining_safe(pos, n, head) \
        for (n = pos->next; pos != (head); pos = n, n = pos->next)

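/**
 * Two flock locks belong to the same owner iff they carry the same POSIX
 * pid and arrived through the same export (client connection).
 */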
static inline int
ldlm_same_flock_owner(struct ldlm_lock *lock, struct ldlm_lock *new)
{
        return((new->l_policy_data.l_flock.pid ==
                lock->l_policy_data.l_flock.pid) &&
               (new->l_export == lock->l_export));
}

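/**
 * Inclusive byte ranges [start, end] overlap unless one lies entirely
 * beyond the other.
 */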
static inline int
ldlm_flocks_overlap(struct ldlm_lock *lock, struct ldlm_lock *new)
{
        return((new->l_policy_data.l_flock.start <=
                lock->l_policy_data.l_flock.end) &&
               (new->l_policy_data.l_flock.end >=
                lock->l_policy_data.l_flock.start));
}

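/**
 * Remove a flock from its resource list and destroy it.  With
 * LDLM_FL_WAIT_NOREPROC (client side) also mark the lock
 * LDLM_FL_LOCAL_ONLY | LDLM_FL_CBPENDING so that no CANCEL RPC is sent,
 * and drop the reference taken at enqueue time.
 */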
static inline void
ldlm_flock_destroy(struct ldlm_lock *lock, ldlm_mode_t mode, int flags)
{
        ENTRY;

        LDLM_DEBUG(lock, "ldlm_flock_destroy(mode: %d, flags: 0x%x)",
                   mode, flags);

        /* Safe to not lock here, since it should be empty anyway */
        LASSERT(list_empty(&lock->l_flock_waitq));

        list_del_init(&lock->l_res_link);
        if (flags == LDLM_FL_WAIT_NOREPROC) {
                /* client side - set a flag to prevent sending a CANCEL */
                lock->l_flags |= LDLM_FL_LOCAL_ONLY | LDLM_FL_CBPENDING;
                ldlm_lock_decref_internal(lock, mode);
        }

        ldlm_lock_destroy_nolock(lock);
        EXIT;
}

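/**
 * Decide whether granting 'req' would create a deadlock.  Starting from
 * the owner blocking 'req', look up that owner's own blocked lock on
 * ldlm_flock_waitq and follow its blocking_pid/blocking_export edge, one
 * hop per pass over the list.  If the chain leads back to the owner of
 * 'req' (e.g. P1 waits on P2 while P2 waits on P1), return 1.
 */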
static int
ldlm_flock_deadlock(struct ldlm_lock *req, struct ldlm_lock *blocking_lock)
{
        struct obd_export *req_export = req->l_export;
        struct obd_export *blocking_export = blocking_lock->l_export;
        pid_t req_pid = req->l_policy_data.l_flock.pid;
        pid_t blocking_pid = blocking_lock->l_policy_data.l_flock.pid;
        struct ldlm_lock *lock;

        spin_lock(&ldlm_flock_waitq_lock);
restart:
        list_for_each_entry(lock, &ldlm_flock_waitq, l_flock_waitq) {
                if ((lock->l_policy_data.l_flock.pid != blocking_pid) ||
                    (lock->l_export != blocking_export))
                        continue;

                blocking_pid = lock->l_policy_data.l_flock.blocking_pid;
                blocking_export = (struct obd_export *)(long)
                        lock->l_policy_data.l_flock.blocking_export;
                if (blocking_pid == req_pid && blocking_export == req_export) {
                        spin_unlock(&ldlm_flock_waitq_lock);
                        return 1;
                }

                goto restart;
        }
        spin_unlock(&ldlm_flock_waitq_lock);

        return 0;
}

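/**
 * Process a POSIX lock request against the granted queue of its resource.
 *
 * Called at first enqueue time (first_enq != 0) and again when blocked
 * requests are reprocessed (first_enq == 0, via ldlm_reprocess_queue()).
 * With LDLM_FL_WAIT_NOREPROC or an unlock (LCK_NL) the conflict scan is
 * skipped and the request goes straight to the merge/split pass below;
 * otherwise a conflicting granted lock either fails the request
 * (LDLM_FL_BLOCK_NOWAIT), answers a test lock (LDLM_FL_TEST_LOCK), or,
 * after a deadlock check, parks the request on the resource waiting list.
 */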
int
ldlm_process_flock_lock(struct ldlm_lock *req, int *flags, int first_enq,
                        ldlm_error_t *err, struct list_head *work_list)
{
        struct ldlm_resource *res = req->l_resource;
        struct ldlm_namespace *ns = res->lr_namespace;
        struct list_head *tmp;
        struct list_head *ownlocks = NULL;
        struct ldlm_lock *lock = NULL;
        struct ldlm_lock *new = req;
        struct ldlm_lock *new2 = NULL;
        ldlm_mode_t mode = req->l_req_mode;
        int local = ns_is_client(ns);
        int added = (mode == LCK_NL);
        int overlaps = 0;
        ENTRY;

        CDEBUG(D_DLMTRACE, "flags %#x pid %u mode %u start "LPU64" end "LPU64
               "\n", *flags, new->l_policy_data.l_flock.pid, mode,
               req->l_policy_data.l_flock.start,
               req->l_policy_data.l_flock.end);

        *err = ELDLM_OK;

        if (local) {
                /* No blocking ASTs are sent to the clients for
                 * Posix file & record locks */
                req->l_blocking_ast = NULL;
        } else {
                /* Called on the server for lock cancels. */
                req->l_blocking_ast = ldlm_flock_blocking_ast;
        }

        if ((*flags == LDLM_FL_WAIT_NOREPROC) || (mode == LCK_NL)) {
                /* This loop determines where this process's locks start
                 * in the resource lr_granted list. */
                list_for_each(tmp, &res->lr_granted) {
                        lock = list_entry(tmp, struct ldlm_lock, l_res_link);
                        if (ldlm_same_flock_owner(lock, req)) {
                                ownlocks = tmp;
                                break;
                        }
                }
        } else {
                lockmode_verify(mode);

                /* This loop determines if there are existing locks
                 * that conflict with the new lock request. */
                list_for_each(tmp, &res->lr_granted) {
                        lock = list_entry(tmp, struct ldlm_lock, l_res_link);

                        if (ldlm_same_flock_owner(lock, req)) {
                                if (!ownlocks)
                                        ownlocks = tmp;
                                continue;
                        }

                        /* locks are compatible, overlap doesn't matter */
                        if (lockmode_compat(lock->l_granted_mode, mode))
                                continue;

                        if (!ldlm_flocks_overlap(lock, req))
                                continue;

                        if (!first_enq)
                                RETURN(LDLM_ITER_CONTINUE);

                        if (*flags & LDLM_FL_BLOCK_NOWAIT) {
                                ldlm_flock_destroy(req, mode, *flags);
                                *err = -EAGAIN;
                                RETURN(LDLM_ITER_STOP);
                        }

                        if (*flags & LDLM_FL_TEST_LOCK) {
                                ldlm_flock_destroy(req, mode, *flags);
                                req->l_req_mode = lock->l_granted_mode;
                                req->l_policy_data.l_flock.pid =
                                        lock->l_policy_data.l_flock.pid;
                                req->l_policy_data.l_flock.start =
                                        lock->l_policy_data.l_flock.start;
                                req->l_policy_data.l_flock.end =
                                        lock->l_policy_data.l_flock.end;
                                *flags |= LDLM_FL_LOCK_CHANGED;
                                RETURN(LDLM_ITER_STOP);
                        }

                        if (ldlm_flock_deadlock(req, lock)) {
                                ldlm_flock_destroy(req, mode, *flags);
                                *err = -EDEADLK;
                                RETURN(LDLM_ITER_STOP);
                        }

                        req->l_policy_data.l_flock.blocking_pid =
                                lock->l_policy_data.l_flock.pid;
                        req->l_policy_data.l_flock.blocking_export =
                                (long)(void *)lock->l_export;

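                        /* The blocking owner recorded above is the edge in
                         * the wait-for graph that ldlm_flock_deadlock()
                         * walks.  Queue this request on the global deadlock
                         * detection waitq until it is granted or cancelled. */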
                        LASSERT(list_empty(&req->l_flock_waitq));
                        spin_lock(&ldlm_flock_waitq_lock);
                        list_add_tail(&req->l_flock_waitq, &ldlm_flock_waitq);
                        spin_unlock(&ldlm_flock_waitq_lock);

                        ldlm_resource_add_lock(res, &res->lr_waiting, req);
                        *flags |= LDLM_FL_BLOCK_GRANTED;
                        RETURN(LDLM_ITER_STOP);
                }
        }

        if (*flags & LDLM_FL_TEST_LOCK) {
                ldlm_flock_destroy(req, mode, *flags);
                req->l_req_mode = LCK_NL;
                *flags |= LDLM_FL_LOCK_CHANGED;
                RETURN(LDLM_ITER_STOP);
        }

        /* In case we had slept on this lock request take it off of the
         * deadlock detection waitq. */
        spin_lock(&ldlm_flock_waitq_lock);
        list_del_init(&req->l_flock_waitq);
        spin_unlock(&ldlm_flock_waitq_lock);

        /* Scan the locks owned by this process that overlap this request.
         * We may have to merge or split existing locks. */

        if (!ownlocks)
                ownlocks = &res->lr_granted;

        list_for_remaining_safe(ownlocks, tmp, &res->lr_granted) {
                lock = list_entry(ownlocks, struct ldlm_lock, l_res_link);

                if (!ldlm_same_flock_owner(lock, new))
                        break;

                if (lock->l_granted_mode == mode) {
                        /* If the modes are the same then we need to process
                         * locks that overlap OR adjoin the new lock. The extra
                         * logic condition is necessary to deal with arithmetic
                         * overflow and underflow. */
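                        /* e.g. same-mode locks [0, 99] and [100, 199] adjoin
                         * (end + 1 == start) and are merged into [0, 199];
                         * the OBD_OBJECT_EOF and start != 0 checks keep
                         * end + 1 and start - 1 from wrapping at the range
                         * limits. */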
                        if ((new->l_policy_data.l_flock.start >
                             (lock->l_policy_data.l_flock.end + 1))
                            && (lock->l_policy_data.l_flock.end !=
                                OBD_OBJECT_EOF))
                                continue;

                        if ((new->l_policy_data.l_flock.end <
                             (lock->l_policy_data.l_flock.start - 1))
                            && (lock->l_policy_data.l_flock.start != 0))
                                break;

                        if (new->l_policy_data.l_flock.start <
                            lock->l_policy_data.l_flock.start) {
                                lock->l_policy_data.l_flock.start =
                                        new->l_policy_data.l_flock.start;
                        } else {
                                new->l_policy_data.l_flock.start =
                                        lock->l_policy_data.l_flock.start;
                        }

                        if (new->l_policy_data.l_flock.end >
                            lock->l_policy_data.l_flock.end) {
                                lock->l_policy_data.l_flock.end =
                                        new->l_policy_data.l_flock.end;
                        } else {
                                new->l_policy_data.l_flock.end =
                                        lock->l_policy_data.l_flock.end;
                        }

                        if (added) {
                                ldlm_flock_destroy(lock, mode, *flags);
                        } else {
                                new = lock;
                                added = 1;
                        }
                        continue;
                }

                if (new->l_policy_data.l_flock.start >
                    lock->l_policy_data.l_flock.end)
                        continue;

                if (new->l_policy_data.l_flock.end <
                    lock->l_policy_data.l_flock.start)
                        break;

                ++overlaps;

                if (new->l_policy_data.l_flock.start <=
                    lock->l_policy_data.l_flock.start) {
                        if (new->l_policy_data.l_flock.end <
                            lock->l_policy_data.l_flock.end) {
                                lock->l_policy_data.l_flock.start =
                                        new->l_policy_data.l_flock.end + 1;
                                break;
                        }
                        ldlm_flock_destroy(lock, lock->l_req_mode, *flags);
                        continue;
                }
                if (new->l_policy_data.l_flock.end >=
                    lock->l_policy_data.l_flock.end) {
                        lock->l_policy_data.l_flock.end =
                                new->l_policy_data.l_flock.start - 1;
                        continue;
                }

                /* split the existing lock into two locks */
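                /* e.g. a new lock [30, 39] with a different mode inside a
                 * granted [0, 99]: new2 keeps [0, 29], the existing lock is
                 * trimmed to [40, 99], and the new lock fills the middle. */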

                /* if this is an F_UNLCK operation then we could avoid
                 * allocating a new lock and use the req lock passed in
                 * with the request but this would complicate the reply
                 * processing since updates to req get reflected in the
                 * reply. The client side replays the lock request so
                 * it must see the original lock data in the reply. */

                /* XXX - if ldlm_lock_new() can sleep we should
                 * release the ns_lock, allocate the new lock,
                 * and restart processing this lock. */
                new2 = ldlm_lock_create(ns, &res->lr_name, LDLM_FLOCK,
                                        lock->l_granted_mode, NULL, NULL, NULL,
                                        NULL, 0);
                if (!new2) {
                        ldlm_flock_destroy(req, lock->l_granted_mode, *flags);
                        *err = -ENOLCK;
                        RETURN(LDLM_ITER_STOP);
                }

                new2->l_granted_mode = lock->l_granted_mode;
                new2->l_policy_data.l_flock.pid =
                        new->l_policy_data.l_flock.pid;
                new2->l_policy_data.l_flock.start =
                        lock->l_policy_data.l_flock.start;
                new2->l_policy_data.l_flock.end =
                        new->l_policy_data.l_flock.start - 1;
                lock->l_policy_data.l_flock.start =
                        new->l_policy_data.l_flock.end + 1;
                new2->l_conn_export = lock->l_conn_export;
                if (lock->l_export != NULL) {
                        new2->l_export = class_export_get(lock->l_export);
                        spin_lock(&new2->l_export->exp_ldlm_data.led_lock);
                        list_add(&new2->l_export_chain,
                                 &new2->l_export->exp_ldlm_data.led_held_locks);
                        spin_unlock(&new2->l_export->exp_ldlm_data.led_lock);
                }
                if (*flags == LDLM_FL_WAIT_NOREPROC)
                        ldlm_lock_addref_internal(new2, lock->l_granted_mode);

                /* insert new2 at lock */
                ldlm_resource_add_lock(res, ownlocks, new2);
                LDLM_LOCK_PUT(new2);
                break;
        }

        /* At this point we're granting the lock request. */
        req->l_granted_mode = req->l_req_mode;

        /* Add req to the granted queue before calling ldlm_reprocess_all(). */
        if (!added) {
                list_del_init(&req->l_res_link);
                /* insert new lock before ownlocks in list. */
                ldlm_resource_add_lock(res, ownlocks, req);
        }

        if (*flags != LDLM_FL_WAIT_NOREPROC) {
                if (first_enq) {
                        /* If this is an unlock, reprocess the waitq and
                         * send completion ASTs for locks that can now be
                         * granted. The only problem with doing this
                         * reprocessing here is that the completion ASTs for
                         * newly granted locks will be sent before the unlock
                         * completion is sent. It shouldn't be an issue. Also
                         * note that ldlm_process_flock_lock() will recurse,
                         * but only once because first_enq will be false from
                         * ldlm_reprocess_queue. */
                        if ((mode == LCK_NL) && overlaps) {
                                CFS_LIST_HEAD(rpc_list);
                                int rc;
restart:
                                ldlm_reprocess_queue(res, &res->lr_waiting,
                                                     &rpc_list);

                                unlock_res(res);
                                rc = ldlm_run_ast_work(&rpc_list, LDLM_WORK_BL_AST);
                                lock_res(res);
                                if (rc == -ERESTART)
                                        GOTO(restart, -ERESTART);
                        }
                } else {
                        LASSERT(req->l_completion_ast);
                        ldlm_add_ast_work_item(req, NULL, work_list);
                }
        }

        /* In case we're reprocessing the requested lock we can't destroy
         * it until after calling ldlm_add_ast_work_item() above so that it
         * can bump the reference count on req. Otherwise req could be freed
         * before the completion AST can be sent. */
        if (added)
                ldlm_flock_destroy(req, mode, *flags);

        ldlm_resource_dump(D_OTHER, res);
        RETURN(LDLM_ITER_CONTINUE);
}

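/**
 * State handed to ldlm_flock_interrupted_wait(): the lock being slept on
 * and the import generation recorded when the wait began.
 */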
struct ldlm_flock_wait_data {
        struct ldlm_lock *fwd_lock;
        int               fwd_generation;
};

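/**
 * Called when the sleep in ldlm_flock_completion_ast() is interrupted:
 * take the lock off the deadlock detection waitq, keep it off the LRU by
 * setting LDLM_FL_CBPENDING, drop the enqueue reference, and cancel the
 * lock on the server.
 */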
static void
ldlm_flock_interrupted_wait(void *data)
{
        struct ldlm_lock *lock;
        struct lustre_handle lockh;
        int rc;
        ENTRY;

        lock = ((struct ldlm_flock_wait_data *)data)->fwd_lock;

        /* take lock off the deadlock detection waitq. */
        spin_lock(&ldlm_flock_waitq_lock);
        list_del_init(&lock->l_flock_waitq);
        spin_unlock(&ldlm_flock_waitq_lock);

        /* client side - set flag to prevent lock from being put on lru list */
        lock->l_flags |= LDLM_FL_CBPENDING;

        ldlm_lock_decref_internal(lock, lock->l_req_mode);
        ldlm_lock2handle(lock, &lockh);
        rc = ldlm_cli_cancel(&lockh);
        if (rc != ELDLM_OK)
                CERROR("ldlm_cli_cancel: %d\n", rc);

        EXIT;
}

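/**
 * Client-side completion AST for flock requests.
 *
 * If the enqueue came back blocked, sleep until the lock is granted or
 * destroyed; an interrupted sleep cancels the request through
 * ldlm_flock_interrupted_wait().  Once granted, either fill in the
 * caller's flock data for F_GETLK (LDLM_FL_TEST_LOCK) requests, or
 * reprocess the lock with LDLM_FL_WAIT_NOREPROC so it is merged/split
 * against this owner's other granted locks.
 */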
int
ldlm_flock_completion_ast(struct ldlm_lock *lock, int flags, void *data)
{
        struct ldlm_namespace *ns;
        cfs_flock_t *getlk = lock->l_ast_data;
        struct ldlm_flock_wait_data fwd;
        struct obd_device *obd;
        struct obd_import *imp = NULL;
        ldlm_error_t err;
        int rc = 0;
        struct l_wait_info lwi;
        ENTRY;

        CDEBUG(D_DLMTRACE, "flags: 0x%x data: %p getlk: %p\n",
               flags, data, getlk);

        /* Import invalidation. We need to actually release the lock
         * references being held, so that it can go away. No point in
         * holding the lock even if app still believes it has it, since
         * server already dropped it anyway. Only for granted locks too. */
        lock_res_and_lock(lock);
        if ((lock->l_flags & (LDLM_FL_FAILED|LDLM_FL_LOCAL_ONLY)) ==
            (LDLM_FL_FAILED|LDLM_FL_LOCAL_ONLY)) {
                unlock_res_and_lock(lock);
                if (lock->l_req_mode == lock->l_granted_mode &&
                    lock->l_granted_mode != LCK_NL)
                        ldlm_lock_decref_internal(lock, lock->l_req_mode);
                RETURN(0);
        }
        unlock_res_and_lock(lock);

        LASSERT(flags != LDLM_FL_WAIT_NOREPROC);

        if (!(flags & (LDLM_FL_BLOCK_WAIT | LDLM_FL_BLOCK_GRANTED |
                       LDLM_FL_BLOCK_CONV)))
                goto granted;

        LDLM_DEBUG(lock, "client-side enqueue returned a blocked lock, "
                   "sleeping");

        fwd.fwd_lock = lock;
        obd = class_exp2obd(lock->l_conn_export);

        /* if this is a local lock, then there is no import */
        if (obd != NULL)
                imp = obd->u.cli.cl_import;

        if (imp != NULL) {
                spin_lock(&imp->imp_lock);
                fwd.fwd_generation = imp->imp_generation;
                spin_unlock(&imp->imp_lock);
        }

        lwi = LWI_TIMEOUT_INTR(0, NULL, ldlm_flock_interrupted_wait, &fwd);

        /* Go to sleep until the lock is granted. */
        rc = l_wait_event(lock->l_waitq,
                          ((lock->l_req_mode == lock->l_granted_mode) ||
                           lock->l_destroyed), &lwi);

        LDLM_DEBUG(lock, "client-side enqueue waking up: rc = %d", rc);
        RETURN(rc);

granted:
        /* before flock's complete ast gets here, the flock
         * can possibly be freed by another thread
         */
        if (lock->l_destroyed) {
                LDLM_DEBUG(lock, "already destroyed by another thread");
                RETURN(0);
        }

        LDLM_DEBUG(lock, "client-side enqueue granted");
        ns = lock->l_resource->lr_namespace;
        lock_res(lock->l_resource);

        /* take lock off the deadlock detection waitq. */
        spin_lock(&ldlm_flock_waitq_lock);
        list_del_init(&lock->l_flock_waitq);
        spin_unlock(&ldlm_flock_waitq_lock);

        /* ldlm_lock_enqueue() has already placed lock on the granted list. */
        list_del_init(&lock->l_res_link);

        if (flags & LDLM_FL_TEST_LOCK) {
                /* fcntl(F_GETLK) request */
                /* The old mode was saved in getlk->fl_type so that if the mode
                 * in the lock changes we can decref the appropriate refcount. */
                ldlm_flock_destroy(lock, cfs_flock_type(getlk), LDLM_FL_WAIT_NOREPROC);
                switch (lock->l_granted_mode) {
                case LCK_PR:
                        cfs_flock_set_type(getlk, F_RDLCK);
                        break;
                case LCK_PW:
                        cfs_flock_set_type(getlk, F_WRLCK);
                        break;
                default:
                        cfs_flock_set_type(getlk, F_UNLCK);
                }
                cfs_flock_set_pid(getlk, (pid_t)lock->l_policy_data.l_flock.pid);
                cfs_flock_set_start(getlk, (loff_t)lock->l_policy_data.l_flock.start);
                cfs_flock_set_end(getlk, (loff_t)lock->l_policy_data.l_flock.end);
        } else {
                int noreproc = LDLM_FL_WAIT_NOREPROC;

                /* We need to reprocess the lock to do merges or splits
                 * with existing locks owned by this process. */
                ldlm_process_flock_lock(lock, &noreproc, 1, &err, NULL);
                if (flags == 0)
                        cfs_waitq_signal(&lock->l_waitq);
        }
        unlock_res(lock->l_resource);
        RETURN(0);
}
EXPORT_SYMBOL(ldlm_flock_completion_ast);

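/**
 * Blocking AST for flock locks; only invoked at cancel time
 * (flag == LDLM_CB_CANCELING), where all that is needed is to remove the
 * lock from the deadlock detection waitq.
 */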
int ldlm_flock_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
                            void *data, int flag)
{
        struct ldlm_namespace *ns;
        ENTRY;

        LASSERT(lock);
        LASSERT(flag == LDLM_CB_CANCELING);

        ns = lock->l_resource->lr_namespace;

        /* take lock off the deadlock detection waitq. */
        spin_lock(&ldlm_flock_waitq_lock);
        list_del_init(&lock->l_flock_waitq);
        spin_unlock(&ldlm_flock_waitq_lock);
        RETURN(0);
}