/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
 * vim:expandtab:shiftwidth=8:tabstop=8:
 *
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
 *
 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
 * CA 95054 USA or visit www.sun.com if you need additional information or
 * have any questions.
 *
 * GPL HEADER END
 */
/*
 * Copyright (c) 2008 Sun Microsystems, Inc. All rights reserved
 * Use is subject to license terms.
 *
 * Copyright (c) 2003 Hewlett-Packard Development Company LP.
 * Developed under the sponsorship of the US Government under
 * Subcontract No. B514193
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 * Lustre is a trademark of Sun Microsystems, Inc.
 */

#define DEBUG_SUBSYSTEM S_LDLM

#ifdef __KERNEL__
#include <lustre_dlm.h>
#include <obd_support.h>
#include <obd_class.h>
#include <lustre_lib.h>
#include <libcfs/list.h>
#else
#include <liblustre.h>
#include <obd_class.h>
#endif

#include "ldlm_internal.h"

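/* Flock locks are kept off the namespace LRU (see the LDLM_FL_CBPENDING
 * handling below), so the otherwise unused l_lru list head is reused here to
 * link a waiting flock lock onto the global deadlock-detection waitq. */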
#define l_flock_waitq   l_lru

static struct list_head ldlm_flock_waitq = CFS_LIST_HEAD_INIT(ldlm_flock_waitq);
spinlock_t ldlm_flock_waitq_lock = SPIN_LOCK_UNLOCKED;

int ldlm_flock_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
                            void *data, int flag);

/**
 * list_for_remaining_safe - iterate over the remaining entries in a list
 *              and safeguard against removal of a list entry.
 * @pos:        the &struct list_head to use as a loop counter. pos MUST
 *              have been initialized prior to using it in this macro.
 * @n:          another &struct list_head to use as temporary storage
 * @head:       the head for your list.
 */
#define list_for_remaining_safe(pos, n, head) \
        for (n = pos->next; pos != (head); pos = n, n = pos->next)

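/* Two flock locks have the same owner when they were taken by the same pid
 * through the same client export. */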
static inline int
ldlm_same_flock_owner(struct ldlm_lock *lock, struct ldlm_lock *new)
{
        return((new->l_policy_data.l_flock.pid ==
                lock->l_policy_data.l_flock.pid) &&
               (new->l_export == lock->l_export));
}

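/* Byte ranges [start, end] overlap unless one ends before the other begins. */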
static inline int
ldlm_flocks_overlap(struct ldlm_lock *lock, struct ldlm_lock *new)
{
        return((new->l_policy_data.l_flock.start <=
                lock->l_policy_data.l_flock.end) &&
               (new->l_policy_data.l_flock.end >=
                lock->l_policy_data.l_flock.start));
}

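/* Remove a flock lock from its resource list and destroy it.  When called
 * with LDLM_FL_WAIT_NOREPROC on the client, the reference taken at enqueue
 * time is dropped here and the lock is marked LOCAL_ONLY so that no CANCEL
 * RPC is sent to the server. */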
static inline void
ldlm_flock_destroy(struct ldlm_lock *lock, ldlm_mode_t mode, int flags)
{
        ENTRY;

        LDLM_DEBUG(lock, "ldlm_flock_destroy(mode: %d, flags: 0x%x)",
                   mode, flags);

        /* Safe to not lock here, since it should be empty anyway */
        LASSERT(list_empty(&lock->l_flock_waitq));

        list_del_init(&lock->l_res_link);
        if (flags == LDLM_FL_WAIT_NOREPROC &&
            !(lock->l_flags & LDLM_FL_FAILED)) {
                /* client side - set a flag to prevent sending a CANCEL */
                lock->l_flags |= LDLM_FL_LOCAL_ONLY | LDLM_FL_CBPENDING;

                /* when reaching here, it is under lock_res_and_lock(), so we
                 * need to call the nolock version of
                 * ldlm_lock_decref_internal() */
                ldlm_lock_decref_internal_nolock(lock, mode);
        }

        ldlm_lock_destroy_nolock(lock);
        EXIT;
}

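/* Deadlock detection: starting from the lock that blocks 'req', follow the
 * chain of blocking owners recorded in the waiting locks on
 * ldlm_flock_waitq.  If the chain leads back to the owner of 'req'
 * (e.g. A waits on B while B waits on A), granting 'req' would deadlock,
 * so return 1; otherwise return 0. */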
static int
ldlm_flock_deadlock(struct ldlm_lock *req, struct ldlm_lock *blocking_lock)
{
        struct obd_export *req_export = req->l_export;
        struct obd_export *blocking_export = blocking_lock->l_export;
        pid_t req_pid = req->l_policy_data.l_flock.pid;
        pid_t blocking_pid = blocking_lock->l_policy_data.l_flock.pid;
        struct ldlm_lock *lock;

        spin_lock(&ldlm_flock_waitq_lock);
restart:
        list_for_each_entry(lock, &ldlm_flock_waitq, l_flock_waitq) {
                if ((lock->l_policy_data.l_flock.pid != blocking_pid) ||
                    (lock->l_export != blocking_export))
                        continue;

                blocking_pid = lock->l_policy_data.l_flock.blocking_pid;
                blocking_export = (struct obd_export *)(long)
                        lock->l_policy_data.l_flock.blocking_export;
                if (blocking_pid == req_pid && blocking_export == req_export) {
                        spin_unlock(&ldlm_flock_waitq_lock);
                        return 1;
                }

                goto restart;
        }
        spin_unlock(&ldlm_flock_waitq_lock);

        return 0;
}

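/* Enqueue or reprocess a POSIX record lock on a resource.  For a new request
 * this checks the granted list for conflicts with other owners (queueing the
 * request and registering it for deadlock detection if one is found), then
 * merges or splits the caller's own overlapping or adjacent locks before
 * granting.  With LDLM_FL_WAIT_NOREPROC only the merge/split pass is done. */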
int
ldlm_process_flock_lock(struct ldlm_lock *req, int *flags, int first_enq,
                        ldlm_error_t *err, struct list_head *work_list)
{
        struct ldlm_resource *res = req->l_resource;
        struct ldlm_namespace *ns = res->lr_namespace;
        struct list_head *tmp;
        struct list_head *ownlocks = NULL;
        struct ldlm_lock *lock = NULL;
        struct ldlm_lock *new = req;
        struct ldlm_lock *new2 = NULL;
        ldlm_mode_t mode = req->l_req_mode;
        int local = ns_is_client(ns);
        int added = (mode == LCK_NL);
        int overlaps = 0;
        int splitted = 0;
        ENTRY;

        CDEBUG(D_DLMTRACE, "flags %#x pid %u mode %u start "LPU64" end "LPU64
               "\n", *flags, new->l_policy_data.l_flock.pid, mode,
               req->l_policy_data.l_flock.start,
               req->l_policy_data.l_flock.end);

        *err = ELDLM_OK;

        if (local) {
                /* No blocking ASTs are sent to the clients for
                 * Posix file & record locks */
                req->l_blocking_ast = NULL;
        } else {
                /* Called on the server for lock cancels. */
                req->l_blocking_ast = ldlm_flock_blocking_ast;
        }

reprocess:
        if ((*flags == LDLM_FL_WAIT_NOREPROC) || (mode == LCK_NL)) {
                /* This loop determines where this process's locks start
                 * in the resource lr_granted list. */
                list_for_each(tmp, &res->lr_granted) {
                        lock = list_entry(tmp, struct ldlm_lock, l_res_link);
                        if (ldlm_same_flock_owner(lock, req)) {
                                ownlocks = tmp;
                                break;
                        }
                }
        } else {
                lockmode_verify(mode);

                /* This loop determines if there are existing locks
                 * that conflict with the new lock request. */
                list_for_each(tmp, &res->lr_granted) {
                        lock = list_entry(tmp, struct ldlm_lock, l_res_link);

                        if (ldlm_same_flock_owner(lock, req)) {
                                if (!ownlocks)
                                        ownlocks = tmp;
                                continue;
                        }

                        /* locks are compatible, overlap doesn't matter */
                        if (lockmode_compat(lock->l_granted_mode, mode))
                                continue;

                        if (!ldlm_flocks_overlap(lock, req))
                                continue;

                        if (!first_enq)
                                RETURN(LDLM_ITER_CONTINUE);

                        if (*flags & LDLM_FL_BLOCK_NOWAIT) {
                                ldlm_flock_destroy(req, mode, *flags);
                                *err = -EAGAIN;
                                RETURN(LDLM_ITER_STOP);
                        }

                        if (*flags & LDLM_FL_TEST_LOCK) {
                                ldlm_flock_destroy(req, mode, *flags);
                                req->l_req_mode = lock->l_granted_mode;
                                req->l_policy_data.l_flock.pid =
                                        lock->l_policy_data.l_flock.pid;
                                req->l_policy_data.l_flock.start =
                                        lock->l_policy_data.l_flock.start;
                                req->l_policy_data.l_flock.end =
                                        lock->l_policy_data.l_flock.end;
                                *flags |= LDLM_FL_LOCK_CHANGED;
                                RETURN(LDLM_ITER_STOP);
                        }

                        if (ldlm_flock_deadlock(req, lock)) {
                                ldlm_flock_destroy(req, mode, *flags);
                                *err = -EDEADLK;
                                RETURN(LDLM_ITER_STOP);
                        }

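                        /* Record which owner is blocking this request and
                         * add it to the global waitq so that
                         * ldlm_flock_deadlock() can follow the chain of
                         * waiters. */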
                        req->l_policy_data.l_flock.blocking_pid =
                                lock->l_policy_data.l_flock.pid;
                        req->l_policy_data.l_flock.blocking_export =
                                (long)(void *)lock->l_export;

                        LASSERT(list_empty(&req->l_flock_waitq));
                        spin_lock(&ldlm_flock_waitq_lock);
                        list_add_tail(&req->l_flock_waitq, &ldlm_flock_waitq);
                        spin_unlock(&ldlm_flock_waitq_lock);

                        ldlm_resource_add_lock(res, &res->lr_waiting, req);
                        *flags |= LDLM_FL_BLOCK_GRANTED;
                        RETURN(LDLM_ITER_STOP);
                }
        }

        if (*flags & LDLM_FL_TEST_LOCK) {
                ldlm_flock_destroy(req, mode, *flags);
                req->l_req_mode = LCK_NL;
                *flags |= LDLM_FL_LOCK_CHANGED;
                RETURN(LDLM_ITER_STOP);
        }

        /* In case we had slept on this lock request take it off of the
         * deadlock detection waitq. */
        spin_lock(&ldlm_flock_waitq_lock);
        list_del_init(&req->l_flock_waitq);
        spin_unlock(&ldlm_flock_waitq_lock);

        /* Scan the locks owned by this process that overlap this request.
         * We may have to merge or split existing locks. */

        if (!ownlocks)
                ownlocks = &res->lr_granted;

        list_for_remaining_safe(ownlocks, tmp, &res->lr_granted) {
                lock = list_entry(ownlocks, struct ldlm_lock, l_res_link);

                if (!ldlm_same_flock_owner(lock, new))
                        break;

                if (lock->l_granted_mode == mode) {
                        /* If the modes are the same then we need to process
                         * locks that overlap OR adjoin the new lock. The extra
                         * logic condition is necessary to deal with arithmetic
                         * overflow and underflow. */
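                        /* Example: a granted [0, 9] lock and a new [10, 19]
                         * request of the same mode adjoin and are merged into
                         * a single [0, 19] lock.  The extra end != EOF and
                         * start != 0 checks avoid wrap-around at the range
                         * limits. */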
                        if ((new->l_policy_data.l_flock.start >
                             (lock->l_policy_data.l_flock.end + 1))
                            && (lock->l_policy_data.l_flock.end !=
                                OBD_OBJECT_EOF))
                                continue;

                        if ((new->l_policy_data.l_flock.end <
                             (lock->l_policy_data.l_flock.start - 1))
                            && (lock->l_policy_data.l_flock.start != 0))
                                break;

                        if (new->l_policy_data.l_flock.start <
                            lock->l_policy_data.l_flock.start) {
                                lock->l_policy_data.l_flock.start =
                                        new->l_policy_data.l_flock.start;
                        } else {
                                new->l_policy_data.l_flock.start =
                                        lock->l_policy_data.l_flock.start;
                        }

                        if (new->l_policy_data.l_flock.end >
                            lock->l_policy_data.l_flock.end) {
                                lock->l_policy_data.l_flock.end =
                                        new->l_policy_data.l_flock.end;
                        } else {
                                new->l_policy_data.l_flock.end =
                                        lock->l_policy_data.l_flock.end;
                        }

                        if (added) {
                                ldlm_flock_destroy(lock, mode, *flags);
                        } else {
                                new = lock;
                                added = 1;
                        }
                        continue;
                }

                if (new->l_policy_data.l_flock.start >
                    lock->l_policy_data.l_flock.end)
                        continue;

                if (new->l_policy_data.l_flock.end <
                    lock->l_policy_data.l_flock.start)
                        break;

                ++overlaps;

                if (new->l_policy_data.l_flock.start <=
                    lock->l_policy_data.l_flock.start) {
                        if (new->l_policy_data.l_flock.end <
                            lock->l_policy_data.l_flock.end) {
                                lock->l_policy_data.l_flock.start =
                                        new->l_policy_data.l_flock.end + 1;
                                break;
                        }
                        ldlm_flock_destroy(lock, lock->l_req_mode, *flags);
                        continue;
                }
                if (new->l_policy_data.l_flock.end >=
                    lock->l_policy_data.l_flock.end) {
                        lock->l_policy_data.l_flock.end =
                                new->l_policy_data.l_flock.start - 1;
                        continue;
                }

                /* split the existing lock into two locks */
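                /* Example: a granted PW lock covering [0, 99] with a new PR
                 * or unlock request over [40, 59] becomes two locks: new2
                 * takes over [0, 39] and the existing lock is trimmed to
                 * [60, 99]. */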

                /* if this is an F_UNLCK operation then we could avoid
                 * allocating a new lock and use the req lock passed in
                 * with the request but this would complicate the reply
                 * processing since updates to req get reflected in the
                 * reply. The client side replays the lock request so
                 * it must see the original lock data in the reply. */

                /* XXX - if ldlm_lock_create() can sleep we should
                 * release the resource lock, allocate the new lock,
                 * and restart processing this lock. */
                if (!new2) {
                        unlock_res_and_lock(req);
                        new2 = ldlm_lock_create(ns, res->lr_name, LDLM_FLOCK,
                                        lock->l_granted_mode, NULL, NULL, NULL,
                                        NULL, 0);
                        lock_res_and_lock(req);
                        if (!new2) {
                                ldlm_flock_destroy(req, lock->l_granted_mode,
                                                   *flags);
                                *err = -ENOLCK;
                                RETURN(LDLM_ITER_STOP);
                        }
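                        /* The resource lock was dropped while allocating
                         * new2, so the lists may have changed underneath us;
                         * restart the scan from the top. */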
                        goto reprocess;
                }

                splitted = 1;

                new2->l_granted_mode = lock->l_granted_mode;
                new2->l_policy_data.l_flock.pid =
                        new->l_policy_data.l_flock.pid;
                new2->l_policy_data.l_flock.start =
                        lock->l_policy_data.l_flock.start;
                new2->l_policy_data.l_flock.end =
                        new->l_policy_data.l_flock.start - 1;
                lock->l_policy_data.l_flock.start =
                        new->l_policy_data.l_flock.end + 1;
                new2->l_conn_export = lock->l_conn_export;
                if (lock->l_export != NULL) {
                        new2->l_export = class_export_get(lock->l_export);
                        if (new2->l_export->exp_lock_hash &&
                            hlist_unhashed(&new2->l_exp_hash))
                                lustre_hash_add(new2->l_export->exp_lock_hash,
                                                &new2->l_remote_handle,
                                                &new2->l_exp_hash);
                }
                if (*flags == LDLM_FL_WAIT_NOREPROC)
                        ldlm_lock_addref_internal_nolock(new2,
                                                         lock->l_granted_mode);

                /* insert new2 at lock */
                ldlm_resource_add_lock(res, ownlocks, new2);
                LDLM_LOCK_PUT(new2);
                break;
        }

        /* if new2 is created but never used, destroy it */
        if (splitted == 0 && new2 != NULL)
                ldlm_lock_destroy_nolock(new2);

        /* At this point we're granting the lock request. */
        req->l_granted_mode = req->l_req_mode;

        /* Add req to the granted queue before calling ldlm_reprocess_all(). */
        if (!added) {
                list_del_init(&req->l_res_link);
                /* insert new lock before ownlocks in list. */
                ldlm_resource_add_lock(res, ownlocks, req);
        }

        if (*flags != LDLM_FL_WAIT_NOREPROC) {
                if (first_enq) {
                        /* If this is an unlock, reprocess the waitq and
                         * send completion ASTs for locks that can now be
                         * granted. The only problem with doing this
                         * reprocessing here is that the completion ASTs for
                         * newly granted locks will be sent before the unlock
                         * completion is sent. It shouldn't be an issue. Also
                         * note that ldlm_process_flock_lock() will recurse,
                         * but only once because first_enq will be false from
                         * ldlm_reprocess_queue. */
                        if ((mode == LCK_NL) && overlaps) {
                                struct list_head rpc_list
                                                 = CFS_LIST_HEAD_INIT(rpc_list);
                                int rc;
restart:
                                ldlm_reprocess_queue(res, &res->lr_waiting,
                                                     &rpc_list);

                                unlock_res_and_lock(req);
                                rc = ldlm_run_cp_ast_work(&rpc_list);
                                lock_res_and_lock(req);
                                if (rc == -ERESTART)
                                        GOTO(restart, -ERESTART);
                        }
                } else {
                        LASSERT(req->l_completion_ast);
                        ldlm_add_ast_work_item(req, NULL, work_list);
                }
        }

        /* In case we're reprocessing the requested lock we can't destroy
         * it until after calling ldlm_add_ast_work_item() above so that it
         * can bump the reference count on req. Otherwise req could be freed
         * before the completion AST can be sent.  */
        if (added)
                ldlm_flock_destroy(req, mode, *flags);

        ldlm_resource_dump(D_INFO, res);
        RETURN(LDLM_ITER_CONTINUE);
}

struct ldlm_flock_wait_data {
        struct ldlm_lock *fwd_lock;
        int               fwd_generation;
};

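/* Called when a blocked flock enqueue is interrupted on the client: take the
 * lock off the deadlock-detection waitq, drop the enqueue reference and
 * cancel the lock on the server. */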
static void
ldlm_flock_interrupted_wait(void *data)
{
        struct ldlm_lock *lock;
        struct lustre_handle lockh;
        int rc;
        ENTRY;

        lock = ((struct ldlm_flock_wait_data *)data)->fwd_lock;

        /* take lock off the deadlock detection waitq. */
        spin_lock(&ldlm_flock_waitq_lock);
        list_del_init(&lock->l_flock_waitq);
        spin_unlock(&ldlm_flock_waitq_lock);

        /* client side - set flag to prevent lock from being put on lru list */
        lock->l_flags |= LDLM_FL_CBPENDING;

        ldlm_lock_decref_internal(lock, lock->l_req_mode);
        ldlm_lock2handle(lock, &lockh);
        rc = ldlm_cli_cancel(&lockh);
        if (rc != ELDLM_OK)
                CERROR("ldlm_cli_cancel: %d\n", rc);

        EXIT;
}

/**
 * Flock completion callback function.
 *
 * \param lock [in,out]: A lock to be handled
 * \param flags    [in]: flags
 * \param *data    [in]: ldlm_run_cp_ast_work() will use ldlm_cb_set_arg
 *
 * \retval 0    : success
 * \retval <0   : failure
 */
int
ldlm_flock_completion_ast(struct ldlm_lock *lock, int flags, void *data)
{
        cfs_flock_t                    *getlk = lock->l_ast_data;
        struct obd_device              *obd;
        struct obd_import              *imp = NULL;
        struct ldlm_flock_wait_data     fwd;
        struct l_wait_info              lwi;
        ldlm_error_t                    err;
        int                             rc = 0;
        ENTRY;

        CDEBUG(D_DLMTRACE, "flags: 0x%x data: %p getlk: %p\n",
               flags, data, getlk);

        /* Import invalidation. We need to actually release the lock
         * references being held, so that it can go away. No point in
         * holding the lock even if app still believes it has it, since
         * server already dropped it anyway. Only for granted locks too. */
        lock_res_and_lock(lock);
        if ((lock->l_flags & (LDLM_FL_FAILED|LDLM_FL_LOCAL_ONLY)) ==
            (LDLM_FL_FAILED|LDLM_FL_LOCAL_ONLY)) {
                unlock_res_and_lock(lock);
                if (lock->l_req_mode == lock->l_granted_mode &&
                    lock->l_granted_mode != LCK_NL &&
                    NULL == data)
                        ldlm_lock_decref_internal(lock, lock->l_req_mode);
                RETURN(0);
        }
        unlock_res_and_lock(lock);

        LASSERT(flags != LDLM_FL_WAIT_NOREPROC);

        if (!(flags & (LDLM_FL_BLOCK_WAIT | LDLM_FL_BLOCK_GRANTED |
                       LDLM_FL_BLOCK_CONV))) {
                if (NULL == data)
                        /* mds granted the lock in the reply */
                        goto granted;
                /* CP AST RPC: lock got granted, wake it up */
                cfs_waitq_signal(&lock->l_waitq);
                RETURN(0);
        }

        LDLM_DEBUG(lock, "client-side enqueue returned a blocked lock, "
                   "sleeping");
        fwd.fwd_lock = lock;
        obd = class_exp2obd(lock->l_conn_export);

        /* if this is a local lock, there is no import */
        if (NULL != obd)
                imp = obd->u.cli.cl_import;

        if (NULL != imp) {
                spin_lock(&imp->imp_lock);
                fwd.fwd_generation = imp->imp_generation;
                spin_unlock(&imp->imp_lock);
        }

        lwi = LWI_TIMEOUT_INTR(0, NULL, ldlm_flock_interrupted_wait, &fwd);

        /* Go to sleep until the lock is granted. */
        rc = l_wait_event(lock->l_waitq, is_granted_or_cancelled(lock), &lwi);

        if (rc) {
                LDLM_DEBUG(lock, "client-side enqueue waking up: failed (%d)",
                           rc);
                RETURN(rc);
        }

granted:
        OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_CP_CB_WAIT, 10);

        lock_res_and_lock(lock);
        if (lock->l_destroyed || lock->l_flags & LDLM_FL_FAILED) {
                LDLM_DEBUG(lock, "client-side enqueue waking up: destroyed");
                unlock_res_and_lock(lock);
                RETURN(-EIO);
        }
        if (rc) {
                LDLM_DEBUG(lock, "client-side enqueue waking up: failed (%d)",
                           rc);
                unlock_res_and_lock(lock);
                RETURN(rc);
        }

        LDLM_DEBUG(lock, "client-side enqueue granted");

        /* take lock off the deadlock detection waitq. */
        spin_lock(&ldlm_flock_waitq_lock);
        list_del_init(&lock->l_flock_waitq);
        spin_unlock(&ldlm_flock_waitq_lock);

        /* ldlm_lock_enqueue() has already placed lock on the granted list. */
        list_del_init(&lock->l_res_link);

        if (flags & LDLM_FL_TEST_LOCK) {
                /* fcntl(F_GETLK) request */
                /* The old mode was saved in getlk->fl_type so that if the mode
                 * in the lock changes we can decref the appropriate refcount. */
                ldlm_flock_destroy(lock, cfs_flock_type(getlk),
                                   LDLM_FL_WAIT_NOREPROC);
                switch (lock->l_granted_mode) {
                case LCK_PR:
                        cfs_flock_set_type(getlk, F_RDLCK);
                        break;
                case LCK_PW:
                        cfs_flock_set_type(getlk, F_WRLCK);
                        break;
                default:
                        cfs_flock_set_type(getlk, F_UNLCK);
                }
                cfs_flock_set_pid(getlk,
                                  (pid_t)lock->l_policy_data.l_flock.pid);
                cfs_flock_set_start(getlk,
                                    (loff_t)lock->l_policy_data.l_flock.start);
                cfs_flock_set_end(getlk,
                                  (loff_t)lock->l_policy_data.l_flock.end);
        } else {
                int noreproc = LDLM_FL_WAIT_NOREPROC;

                /* We need to reprocess the lock to do merges or splits
                 * with existing locks owned by this process. */
                ldlm_process_flock_lock(lock, &noreproc, 1, &err, NULL);
        }
        unlock_res_and_lock(lock);
        RETURN(0);
}
EXPORT_SYMBOL(ldlm_flock_completion_ast);

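/* Blocking AST for flock locks.  It is only called when the lock is being
 * cancelled (LDLM_CB_CANCELING) and simply removes the lock from the
 * deadlock-detection waitq. */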
int ldlm_flock_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
                            void *data, int flag)
{
        struct ldlm_namespace *ns;
        ENTRY;

        LASSERT(lock);
        LASSERT(flag == LDLM_CB_CANCELING);

        ns = lock->l_resource->lr_namespace;

        /* take lock off the deadlock detection waitq. */
        spin_lock(&ldlm_flock_waitq_lock);
        list_del_init(&lock->l_flock_waitq);
        spin_unlock(&ldlm_flock_waitq_lock);
        RETURN(0);
}