/*
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
 *
 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
 * CA 95054 USA or visit www.sun.com if you need additional information or
 * have any questions.
 *
 * GPL HEADER END
 */
/*
 * Copyright (c) 2003 Hewlett-Packard Development Company LP.
 * Developed under the sponsorship of the US Government under
 * Subcontract No. B514193
 *
 * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
 * Use is subject to license terms.
 *
 * Copyright (c) 2010, 2012, Intel Corporation.
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 * Lustre is a trademark of Sun Microsystems, Inc.
 */

/**
 * This file implements the POSIX lock type for Lustre.
 * Its policy properties are the start and end of the extent and the PID.
 *
 * These locks are only handled through the MDS because POSIX semantics
 * require, e.g., that a lock can be partially released and thus split
 * into two parts, and that two adjacent locks from the same process can
 * be merged into a single wider lock.
 *
 * Lock modes are mapped as follows:
 *   PR and PW for READ and WRITE locks
 *   NL to request release of a portion of a lock
 *
 * These flock locks never time out.
 */
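
/*
 * Illustrative mapping sketch (added for clarity, not original code):
 * on the client, a userspace fcntl() byte-range lock is translated into
 * the modes above before it reaches this file (the translation itself
 * lives in the llite layer). Assuming a write lock on bytes [0, 99]:
 *
 *      struct flock fl = {
 *              .l_type   = F_WRLCK,    // -> LCK_PW (F_RDLCK -> LCK_PR,
 *              .l_whence = SEEK_SET,   //    F_UNLCK -> LCK_NL)
 *              .l_start  = 0,          // -> l_flock.start = 0
 *              .l_len    = 100,        // -> l_flock.end = 99
 *      };
 *      fcntl(fd, F_SETLKW, &fl);
 *
 * l_len == 0 locks to end of file, i.e. l_flock.end = OBD_OBJECT_EOF.
 */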

#define DEBUG_SUBSYSTEM S_LDLM

#ifdef __KERNEL__
#include <lustre_dlm.h>
#include <obd_support.h>
#include <obd_class.h>
#include <lustre_lib.h>
#include <libcfs/list.h>
#else
#include <liblustre.h>
#include <obd_class.h>
#endif

#include "ldlm_internal.h"

int ldlm_flock_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
                            void *data, int flag);

/**
 * list_for_remaining_safe - iterate over the remaining entries in a list
 *              and safeguard against removal of a list entry.
 * \param pos   the &struct list_head to use as a loop counter. pos MUST
 *              have been initialized prior to using it in this macro.
 * \param n     another &struct list_head to use as temporary storage
 * \param head  the head for your list.
 */
#define list_for_remaining_safe(pos, n, head) \
        for (n = pos->next; pos != (head); pos = n, n = pos->next)

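/*
 * Usage sketch (illustrative only, added for clarity): the scan in
 * ldlm_process_flock_lock() below resumes from a saved position rather
 * than from the list head, e.g.:
 *
 *      cfs_list_t *pos = ownlocks;     // already points into lr_granted
 *      cfs_list_t *n;
 *
 *      list_for_remaining_safe(pos, n, &res->lr_granted) {
 *              // the entry at 'pos' may be unlinked here; 'n' already
 *              // holds the next entry, so the walk stays safe
 *      }
 */
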
static inline int
ldlm_same_flock_owner(struct ldlm_lock *lock, struct ldlm_lock *new)
{
        return((new->l_policy_data.l_flock.owner ==
                lock->l_policy_data.l_flock.owner) &&
               (new->l_export == lock->l_export));
}

static inline int
ldlm_flocks_overlap(struct ldlm_lock *lock, struct ldlm_lock *new)
{
        return((new->l_policy_data.l_flock.start <=
                lock->l_policy_data.l_flock.end) &&
               (new->l_policy_data.l_flock.end >=
                lock->l_policy_data.l_flock.start));
}

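/*
 * Note (added for clarity): both extent ends are inclusive, so e.g.
 * [0, 10] and [10, 20] overlap; ldlm_flocks_overlap() returns 1 since
 * 0 <= 20 && 10 >= 10 (extents sharing a single byte still conflict).
 */
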
static inline int ldlm_flock_blocking_link(struct ldlm_lock *req,
                                           struct ldlm_lock *lock)
{
        int rc = 0;

        /* For server only */
        if (req->l_export == NULL)
                return 0;

        if (unlikely(req->l_export->exp_flock_hash == NULL)) {
                rc = ldlm_init_flock_export(req->l_export);
                if (rc)
                        goto error;
        }

        LASSERT(cfs_hlist_unhashed(&req->l_exp_flock_hash));

        req->l_policy_data.l_flock.blocking_owner =
                lock->l_policy_data.l_flock.owner;
        req->l_policy_data.l_flock.blocking_export =
                lock->l_export;
        req->l_policy_data.l_flock.blocking_refs = 0;

        cfs_hash_add(req->l_export->exp_flock_hash,
                     &req->l_policy_data.l_flock.owner,
                     &req->l_exp_flock_hash);
error:
        return rc;
}

static inline void ldlm_flock_blocking_unlink(struct ldlm_lock *req)
{
        /* For server only */
        if (req->l_export == NULL)
                return;

        check_res_locked(req->l_resource);
        if (req->l_export->exp_flock_hash != NULL &&
            !cfs_hlist_unhashed(&req->l_exp_flock_hash))
                cfs_hash_del(req->l_export->exp_flock_hash,
                             &req->l_policy_data.l_flock.owner,
                             &req->l_exp_flock_hash);
}

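/*
 * Note (added for clarity): each blocked request forms one edge of a
 * "waits-for" graph. The request is hashed by its own owner into
 * req->l_export->exp_flock_hash and records the (owner, export) pair it
 * is blocked behind; ldlm_flock_deadlock() below walks those edges.
 */
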
static inline void
ldlm_flock_destroy(struct ldlm_lock *lock, ldlm_mode_t mode, __u64 flags)
{
        ENTRY;

        LDLM_DEBUG(lock, "ldlm_flock_destroy(mode: %d, flags: 0x%llx)",
                   mode, flags);

        /* Safe to not lock here, since it should be empty anyway */
        LASSERT(cfs_hlist_unhashed(&lock->l_exp_flock_hash));

        cfs_list_del_init(&lock->l_res_link);
        if (flags == LDLM_FL_WAIT_NOREPROC &&
            !(lock->l_flags & LDLM_FL_FAILED)) {
                /* client side - set a flag to prevent sending a CANCEL */
                lock->l_flags |= LDLM_FL_LOCAL_ONLY | LDLM_FL_CBPENDING;

                /* We are under lock_res_and_lock() here, so we must call
                 * the nolock version of ldlm_lock_decref_internal(). */
                ldlm_lock_decref_internal_nolock(lock, mode);
        }

        ldlm_lock_destroy_nolock(lock);
        EXIT;
}

/**
 * POSIX locks deadlock detection code.
 *
 * Given a new lock \a req and an existing lock \a bl_lock it conflicts
 * with, we need to iterate through all blocked POSIX locks for this
 * export and see if a deadlock condition arises (i.e. one client holds
 * a lock on something and wants a lock on something else, while at the
 * same time another client has the opposite situation).
 */
static int
ldlm_flock_deadlock(struct ldlm_lock *req, struct ldlm_lock *bl_lock)
{
        struct obd_export *req_exp = req->l_export;
        struct obd_export *bl_exp = bl_lock->l_export;
        __u64 req_owner = req->l_policy_data.l_flock.owner;
        __u64 bl_owner = bl_lock->l_policy_data.l_flock.owner;

        /* For server only */
        if (req_exp == NULL)
                return 0;

        class_export_get(bl_exp);
        while (1) {
                struct obd_export *bl_exp_new;
                struct ldlm_lock *lock = NULL;
                struct ldlm_flock *flock;

                if (bl_exp->exp_flock_hash != NULL)
                        lock = cfs_hash_lookup(bl_exp->exp_flock_hash,
                                               &bl_owner);
                if (lock == NULL)
                        break;

                LASSERT(req != lock);
                flock = &lock->l_policy_data.l_flock;
                LASSERT(flock->owner == bl_owner);
                bl_owner = flock->blocking_owner;
                bl_exp_new = class_export_get(flock->blocking_export);
                class_export_put(bl_exp);

                cfs_hash_put(bl_exp->exp_flock_hash, &lock->l_exp_flock_hash);
                bl_exp = bl_exp_new;

                if (bl_owner == req_owner && bl_exp == req_exp) {
                        class_export_put(bl_exp);
                        return 1;
                }
        }
        class_export_put(bl_exp);

        return 0;
}

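/*
 * Example cycle (added for clarity): owner A on export E1 holds extent
 * X and blocks waiting for extent Y, while owner B on export E2 holds Y
 * and now requests X. Starting from bl_lock's owner, the walk above
 * follows blocking_owner/blocking_export links: B -> A -> B. As soon as
 * it arrives back at the requesting (owner, export) pair it returns 1,
 * and the caller fails the enqueue with -EDEADLK.
 */
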
/**
 * Process a granting attempt for flock lock.
 * Must be called with ns lock held.
 *
 * This function looks for any conflicts for \a lock in the granted or
 * waiting queues. The lock is granted if no conflicts are found in
 * either queue.
 *
 * It is also responsible for splitting a lock if a portion of the lock
 * is released.
 *
 * If \a first_enq is 0 (i.e., called from ldlm_reprocess_queue):
 *   - blocking ASTs have already been sent
 *
 * If \a first_enq is 1 (i.e., called from ldlm_lock_enqueue):
 *   - blocking ASTs have not been sent yet, so the list of conflicting
 *     locks will be collected and ASTs sent.
 */
int
ldlm_process_flock_lock(struct ldlm_lock *req, __u64 *flags, int first_enq,
                        ldlm_error_t *err, cfs_list_t *work_list)
{
        struct ldlm_resource *res = req->l_resource;
        struct ldlm_namespace *ns = ldlm_res_to_ns(res);
        cfs_list_t *tmp;
        cfs_list_t *ownlocks = NULL;
        struct ldlm_lock *lock = NULL;
        struct ldlm_lock *new = req;
        struct ldlm_lock *new2 = NULL;
        ldlm_mode_t mode = req->l_req_mode;
        int local = ns_is_client(ns);
        int added = (mode == LCK_NL);
        int overlaps = 0;
        int splitted = 0;
        const struct ldlm_callback_suite null_cbs = { NULL };
        int rc;
        ENTRY;

        CDEBUG(D_DLMTRACE, "flags %#llx owner "LPU64" pid %u mode %u start "
               LPU64" end "LPU64"\n", *flags,
               new->l_policy_data.l_flock.owner,
               new->l_policy_data.l_flock.pid, mode,
               req->l_policy_data.l_flock.start,
               req->l_policy_data.l_flock.end);

        *err = ELDLM_OK;

        if (local) {
                /* No blocking ASTs are sent to the clients for
                 * POSIX file & record locks */
                req->l_blocking_ast = NULL;
        } else {
                /* Called on the server for lock cancels. */
                req->l_blocking_ast = ldlm_flock_blocking_ast;
        }

reprocess:
        if ((*flags == LDLM_FL_WAIT_NOREPROC) || (mode == LCK_NL)) {
                /* This loop determines where this process's locks start
                 * in the resource lr_granted list. */
                cfs_list_for_each(tmp, &res->lr_granted) {
                        lock = cfs_list_entry(tmp, struct ldlm_lock,
                                              l_res_link);
                        if (ldlm_same_flock_owner(lock, req)) {
                                ownlocks = tmp;
                                break;
                        }
                }
        } else {
                lockmode_verify(mode);

                /* This loop determines if there are existing locks
                 * that conflict with the new lock request. */
                cfs_list_for_each(tmp, &res->lr_granted) {
                        lock = cfs_list_entry(tmp, struct ldlm_lock,
                                              l_res_link);

                        if (ldlm_same_flock_owner(lock, req)) {
                                if (!ownlocks)
                                        ownlocks = tmp;
                                continue;
                        }

                        /* locks are compatible, overlap doesn't matter */
                        if (lockmode_compat(lock->l_granted_mode, mode))
                                continue;

                        if (!ldlm_flocks_overlap(lock, req))
                                continue;

                        if (!first_enq)
                                RETURN(LDLM_ITER_CONTINUE);

                        if (*flags & LDLM_FL_BLOCK_NOWAIT) {
                                ldlm_flock_destroy(req, mode, *flags);
                                *err = -EAGAIN;
                                RETURN(LDLM_ITER_STOP);
                        }

                        if (*flags & LDLM_FL_TEST_LOCK) {
                                ldlm_flock_destroy(req, mode, *flags);
                                req->l_req_mode = lock->l_granted_mode;
                                req->l_policy_data.l_flock.pid =
                                        lock->l_policy_data.l_flock.pid;
                                req->l_policy_data.l_flock.start =
                                        lock->l_policy_data.l_flock.start;
                                req->l_policy_data.l_flock.end =
                                        lock->l_policy_data.l_flock.end;
                                *flags |= LDLM_FL_LOCK_CHANGED;
                                RETURN(LDLM_ITER_STOP);
                        }

                        /* add lock to blocking list before deadlock
                         * check to prevent race */
                        rc = ldlm_flock_blocking_link(req, lock);
                        if (rc) {
                                ldlm_flock_destroy(req, mode, *flags);
                                *err = rc;
                                RETURN(LDLM_ITER_STOP);
                        }
                        if (ldlm_flock_deadlock(req, lock)) {
                                ldlm_flock_blocking_unlink(req);
                                ldlm_flock_destroy(req, mode, *flags);
                                *err = -EDEADLK;
                                RETURN(LDLM_ITER_STOP);
                        }

                        ldlm_resource_add_lock(res, &res->lr_waiting, req);
                        *flags |= LDLM_FL_BLOCK_GRANTED;
                        RETURN(LDLM_ITER_STOP);
                }
        }

        if (*flags & LDLM_FL_TEST_LOCK) {
                ldlm_flock_destroy(req, mode, *flags);
                req->l_req_mode = LCK_NL;
                *flags |= LDLM_FL_LOCK_CHANGED;
                RETURN(LDLM_ITER_STOP);
        }

        /* In case we had slept on this lock request take it off the
         * deadlock detection hash list. */
        ldlm_flock_blocking_unlink(req);

        /* Scan the locks owned by this process that overlap this request.
         * We may have to merge or split existing locks. */

        if (!ownlocks)
                ownlocks = &res->lr_granted;

        list_for_remaining_safe(ownlocks, tmp, &res->lr_granted) {
                lock = cfs_list_entry(ownlocks, struct ldlm_lock, l_res_link);

                if (!ldlm_same_flock_owner(lock, new))
                        break;

                if (lock->l_granted_mode == mode) {
                        /* If the modes are the same then we need to process
                         * locks that overlap OR adjoin the new lock. The extra
                         * logic condition is necessary to deal with arithmetic
                         * overflow and underflow. */
                        if ((new->l_policy_data.l_flock.start >
                             (lock->l_policy_data.l_flock.end + 1))
                            && (lock->l_policy_data.l_flock.end !=
                                OBD_OBJECT_EOF))
                                continue;

                        if ((new->l_policy_data.l_flock.end <
                             (lock->l_policy_data.l_flock.start - 1))
                            && (lock->l_policy_data.l_flock.start != 0))
                                break;

                        if (new->l_policy_data.l_flock.start <
                            lock->l_policy_data.l_flock.start) {
                                lock->l_policy_data.l_flock.start =
                                        new->l_policy_data.l_flock.start;
                        } else {
                                new->l_policy_data.l_flock.start =
                                        lock->l_policy_data.l_flock.start;
                        }

                        if (new->l_policy_data.l_flock.end >
                            lock->l_policy_data.l_flock.end) {
                                lock->l_policy_data.l_flock.end =
                                        new->l_policy_data.l_flock.end;
                        } else {
                                new->l_policy_data.l_flock.end =
                                        lock->l_policy_data.l_flock.end;
                        }

                        if (added) {
                                ldlm_flock_destroy(lock, mode, *flags);
                        } else {
                                new = lock;
                                added = 1;
                        }
                        continue;
                }

                if (new->l_policy_data.l_flock.start >
                    lock->l_policy_data.l_flock.end)
                        continue;

                if (new->l_policy_data.l_flock.end <
                    lock->l_policy_data.l_flock.start)
                        break;

                ++overlaps;

                if (new->l_policy_data.l_flock.start <=
                    lock->l_policy_data.l_flock.start) {
                        if (new->l_policy_data.l_flock.end <
                            lock->l_policy_data.l_flock.end) {
                                lock->l_policy_data.l_flock.start =
                                        new->l_policy_data.l_flock.end + 1;
                                break;
                        }
                        ldlm_flock_destroy(lock, lock->l_req_mode, *flags);
                        continue;
                }
                if (new->l_policy_data.l_flock.end >=
                    lock->l_policy_data.l_flock.end) {
                        lock->l_policy_data.l_flock.end =
                                new->l_policy_data.l_flock.start - 1;
                        continue;
                }

                /* split the existing lock into two locks */

                /* if this is an F_UNLCK operation then we could avoid
                 * allocating a new lock and use the req lock passed in
                 * with the request, but this would complicate the reply
                 * processing since updates to req get reflected in the
                 * reply. The client side replays the lock request, so
                 * it must see the original lock data in the reply. */

                /* XXX - if ldlm_lock_new() can sleep we should
                 * release the lr_lock, allocate the new lock,
                 * and restart processing this lock. */
                if (!new2) {
                        unlock_res_and_lock(req);
                        new2 = ldlm_lock_create(ns, &res->lr_name, LDLM_FLOCK,
                                                lock->l_granted_mode, &null_cbs,
                                                NULL, 0, LVB_T_NONE);
                        lock_res_and_lock(req);
                        if (!new2) {
                                ldlm_flock_destroy(req, lock->l_granted_mode,
                                                   *flags);
                                *err = -ENOLCK;
                                RETURN(LDLM_ITER_STOP);
                        }
                        goto reprocess;
                }

                splitted = 1;

                new2->l_granted_mode = lock->l_granted_mode;
                new2->l_policy_data.l_flock.pid =
                        new->l_policy_data.l_flock.pid;
                new2->l_policy_data.l_flock.owner =
                        new->l_policy_data.l_flock.owner;
                new2->l_policy_data.l_flock.start =
                        lock->l_policy_data.l_flock.start;
                new2->l_policy_data.l_flock.end =
                        new->l_policy_data.l_flock.start - 1;
                lock->l_policy_data.l_flock.start =
                        new->l_policy_data.l_flock.end + 1;
                new2->l_conn_export = lock->l_conn_export;
                if (lock->l_export != NULL) {
                        new2->l_export = class_export_lock_get(lock->l_export, new2);
                        if (new2->l_export->exp_lock_hash &&
                            cfs_hlist_unhashed(&new2->l_exp_hash))
                                cfs_hash_add(new2->l_export->exp_lock_hash,
                                             &new2->l_remote_handle,
                                             &new2->l_exp_hash);
                }
                if (*flags == LDLM_FL_WAIT_NOREPROC)
                        ldlm_lock_addref_internal_nolock(new2,
                                                         lock->l_granted_mode);

                /* insert new2 in the list, in front of the current lock */
                ldlm_resource_add_lock(res, ownlocks, new2);
                LDLM_LOCK_RELEASE(new2);
                break;
        }

        /* if new2 is created but never used, destroy it */
        if (splitted == 0 && new2 != NULL)
                ldlm_lock_destroy_nolock(new2);

        /* At this point we're granting the lock request. */
        req->l_granted_mode = req->l_req_mode;

        /* Add req to the granted queue before calling ldlm_reprocess_all(). */
        if (!added) {
                cfs_list_del_init(&req->l_res_link);
                /* insert new lock before ownlocks in list. */
                ldlm_resource_add_lock(res, ownlocks, req);
        }

        if (*flags != LDLM_FL_WAIT_NOREPROC) {
#ifdef HAVE_SERVER_SUPPORT
                if (first_enq) {
                        /* If this is an unlock, reprocess the waitq and
                         * send completion ASTs for locks that can now be
                         * granted. The only problem with doing this
                         * reprocessing here is that the completion ASTs for
                         * newly granted locks will be sent before the unlock
                         * completion is sent. It shouldn't be an issue. Also
                         * note that ldlm_process_flock_lock() will recurse,
                         * but only once because first_enq will be false from
                         * ldlm_reprocess_queue. */
                        if ((mode == LCK_NL) && overlaps) {
                                CFS_LIST_HEAD(rpc_list);
                                int rc;
restart:
                                ldlm_reprocess_queue(res, &res->lr_waiting,
                                                     &rpc_list);

                                unlock_res_and_lock(req);
                                rc = ldlm_run_ast_work(ns, &rpc_list,
                                                       LDLM_WORK_CP_AST);
                                lock_res_and_lock(req);
                                if (rc == -ERESTART)
                                        GOTO(restart, -ERESTART);
                        }
                } else {
                        LASSERT(req->l_completion_ast);
                        ldlm_add_ast_work_item(req, NULL, work_list);
                }
#else /* !HAVE_SERVER_SUPPORT */
                /* The only possible case of a client-side call into the
                 * flock policy function is ldlm_flock_completion_ast(),
                 * which always passes the LDLM_FL_WAIT_NOREPROC flag. */
                CERROR("Illegal parameter for client-side-only module.\n");
                LBUG();
#endif /* HAVE_SERVER_SUPPORT */
        }

        /* In case we're reprocessing the requested lock we can't destroy
         * it until after calling ldlm_add_ast_work_item() above, so that
         * ldlm_add_ast_work_item() can bump the reference count on \a req.
         * Otherwise \a req could be freed before the completion AST can
         * be sent. */
        if (added)
                ldlm_flock_destroy(req, mode, *flags);

        ldlm_resource_dump(D_INFO, res);
        RETURN(LDLM_ITER_CONTINUE);
}

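/*
 * Worked example (added for clarity): suppose a process holds a PW lock
 * on [0, 99] and sends an NL (unlock) request for [40, 59]. The request
 * neither starts at or before the old lock's start nor reaches its end,
 * so the split path above runs: new2 takes over [0, 39], the existing
 * lock is trimmed to [60, 99], and the NL request itself is destroyed
 * at the end of processing (added is 1 for NL requests).
 */
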
struct ldlm_flock_wait_data {
        struct ldlm_lock *fwd_lock;
        int               fwd_generation;
};

static void
ldlm_flock_interrupted_wait(void *data)
{
        struct ldlm_lock *lock;
        ENTRY;

        lock = ((struct ldlm_flock_wait_data *)data)->fwd_lock;

        /* take lock off the deadlock detection hash list. */
        lock_res_and_lock(lock);
        ldlm_flock_blocking_unlink(lock);

        /* client side - set flag to prevent lock from being put on LRU list */
        lock->l_flags |= LDLM_FL_CBPENDING;
        unlock_res_and_lock(lock);

        EXIT;
}

/**
 * Flock completion callback function.
 *
 * \param lock  [in,out] A lock to be handled
 * \param flags [in]     flags
 * \param data  [in]     ldlm_work_cp_ast_lock() will use ldlm_cb_set_arg
 *
 * \retval 0    success
 * \retval <0   failure
 */
int
ldlm_flock_completion_ast(struct ldlm_lock *lock, __u64 flags, void *data)
{
        struct file_lock                *getlk = lock->l_ast_data;
        struct obd_device               *obd;
        struct obd_import               *imp = NULL;
        struct ldlm_flock_wait_data     fwd;
        struct l_wait_info              lwi;
        ldlm_error_t                    err;
        int                             rc = 0;
        ENTRY;

        CDEBUG(D_DLMTRACE, "flags: 0x%llx data: %p getlk: %p\n",
               flags, data, getlk);

        /* Import invalidation. We need to actually release the lock
         * references being held, so that it can go away. No point in
         * holding the lock even if the app still believes it has it,
         * since the server already dropped it anyway. This applies only
         * to granted locks. */
        if ((lock->l_flags & (LDLM_FL_FAILED|LDLM_FL_LOCAL_ONLY)) ==
            (LDLM_FL_FAILED|LDLM_FL_LOCAL_ONLY)) {
                if (lock->l_req_mode == lock->l_granted_mode &&
                    lock->l_granted_mode != LCK_NL &&
                    NULL == data)
                        ldlm_lock_decref_internal(lock, lock->l_req_mode);

                /* Need to wake up the waiter if we were evicted */
                cfs_waitq_signal(&lock->l_waitq);
                RETURN(0);
        }

        LASSERT(flags != LDLM_FL_WAIT_NOREPROC);

        if (!(flags & (LDLM_FL_BLOCK_WAIT | LDLM_FL_BLOCK_GRANTED |
                       LDLM_FL_BLOCK_CONV))) {
                if (NULL == data)
                        /* mds granted the lock in the reply */
                        goto granted;
                /* CP AST RPC: the lock got granted; wake it up */
                cfs_waitq_signal(&lock->l_waitq);
                RETURN(0);
        }

        LDLM_DEBUG(lock, "client-side enqueue returned a blocked lock, "
                   "sleeping");
        fwd.fwd_lock = lock;
        obd = class_exp2obd(lock->l_conn_export);

        /* if this is a local lock, there is no import */
        if (NULL != obd)
                imp = obd->u.cli.cl_import;

        if (NULL != imp) {
                spin_lock(&imp->imp_lock);
                fwd.fwd_generation = imp->imp_generation;
                spin_unlock(&imp->imp_lock);
        }

        lwi = LWI_TIMEOUT_INTR(0, NULL, ldlm_flock_interrupted_wait, &fwd);

        /* Go to sleep until the lock is granted. */
        rc = l_wait_event(lock->l_waitq, is_granted_or_cancelled(lock), &lwi);

        if (rc) {
                LDLM_DEBUG(lock, "client-side enqueue waking up: failed (%d)",
                           rc);
                RETURN(rc);
        }

granted:
        OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_CP_CB_WAIT, 10);

        if (lock->l_flags & LDLM_FL_DESTROYED) {
                LDLM_DEBUG(lock, "client-side enqueue waking up: destroyed");
                RETURN(0);
        }

        if (lock->l_flags & LDLM_FL_FAILED) {
                LDLM_DEBUG(lock, "client-side enqueue waking up: failed");
                RETURN(-EIO);
        }

        if (rc) {
                LDLM_DEBUG(lock, "client-side enqueue waking up: failed (%d)",
                           rc);
                RETURN(rc);
        }

        LDLM_DEBUG(lock, "client-side enqueue granted");

        lock_res_and_lock(lock);

        /* take lock off the deadlock detection hash list. */
        ldlm_flock_blocking_unlink(lock);

        /* ldlm_lock_enqueue() has already placed lock on the granted list. */
        cfs_list_del_init(&lock->l_res_link);

        if (flags & LDLM_FL_TEST_LOCK) {
                /* fcntl(F_GETLK) request */
                /* The old mode was saved in getlk->fl_type so that if the
                 * mode in the lock changes we can decref the appropriate
                 * refcount. */
                ldlm_flock_destroy(lock, flock_type(getlk),
                                   LDLM_FL_WAIT_NOREPROC);
                switch (lock->l_granted_mode) {
                case LCK_PR:
                        flock_set_type(getlk, F_RDLCK);
                        break;
                case LCK_PW:
                        flock_set_type(getlk, F_WRLCK);
                        break;
                default:
                        flock_set_type(getlk, F_UNLCK);
                }
                flock_set_pid(getlk, (pid_t)lock->l_policy_data.l_flock.pid);
                flock_set_start(getlk,
                                (loff_t)lock->l_policy_data.l_flock.start);
                flock_set_end(getlk,
                              (loff_t)lock->l_policy_data.l_flock.end);
        } else {
                __u64 noreproc = LDLM_FL_WAIT_NOREPROC;

                /* We need to reprocess the lock to do merges or splits
                 * with existing locks owned by this process. */
                ldlm_process_flock_lock(lock, &noreproc, 1, &err, NULL);
        }
        unlock_res_and_lock(lock);
        RETURN(0);
}
EXPORT_SYMBOL(ldlm_flock_completion_ast);

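/*
 * Illustrative userspace trigger (added for clarity, not part of this
 * file): a blocking byte-range lock on a Lustre file parks the calling
 * process in ldlm_flock_completion_ast() above until the MDS grants the
 * lock or the wait is interrupted.
 *
 *      #include <fcntl.h>
 *
 *      static int lock_range(int fd, off_t start, off_t len)
 *      {
 *              struct flock fl = {
 *                      .l_type   = F_WRLCK,
 *                      .l_whence = SEEK_SET,
 *                      .l_start  = start,
 *                      .l_len    = len,
 *              };
 *              return fcntl(fd, F_SETLKW, &fl); // blocks if contended
 *      }
 */
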
int ldlm_flock_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
                            void *data, int flag)
{
        ENTRY;

        LASSERT(lock);
        LASSERT(flag == LDLM_CB_CANCELING);

        /* take lock off the deadlock detection hash list. */
        lock_res_and_lock(lock);
        ldlm_flock_blocking_unlink(lock);
        unlock_res_and_lock(lock);
        RETURN(0);
}

void ldlm_flock_policy_wire18_to_local(const ldlm_wire_policy_data_t *wpolicy,
                                       ldlm_policy_data_t *lpolicy)
{
        memset(lpolicy, 0, sizeof(*lpolicy));
        lpolicy->l_flock.start = wpolicy->l_flock.lfw_start;
        lpolicy->l_flock.end = wpolicy->l_flock.lfw_end;
        lpolicy->l_flock.pid = wpolicy->l_flock.lfw_pid;
        /* Compat code: old clients did not know about the owner field
         * and relied solely on the PID for ownership. The owner field
         * was introduced in LU-104, Lustre 2.1, April 2011. */
        lpolicy->l_flock.owner = wpolicy->l_flock.lfw_pid;
}

void ldlm_flock_policy_wire21_to_local(const ldlm_wire_policy_data_t *wpolicy,
                                       ldlm_policy_data_t *lpolicy)
{
        memset(lpolicy, 0, sizeof(*lpolicy));
        lpolicy->l_flock.start = wpolicy->l_flock.lfw_start;
        lpolicy->l_flock.end = wpolicy->l_flock.lfw_end;
        lpolicy->l_flock.pid = wpolicy->l_flock.lfw_pid;
        lpolicy->l_flock.owner = wpolicy->l_flock.lfw_owner;
}

void ldlm_flock_policy_local_to_wire(const ldlm_policy_data_t *lpolicy,
                                     ldlm_wire_policy_data_t *wpolicy)
{
        memset(wpolicy, 0, sizeof(*wpolicy));
        wpolicy->l_flock.lfw_start = lpolicy->l_flock.start;
        wpolicy->l_flock.lfw_end = lpolicy->l_flock.end;
        wpolicy->l_flock.lfw_pid = lpolicy->l_flock.pid;
        wpolicy->l_flock.lfw_owner = lpolicy->l_flock.owner;
}

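/*
 * Sanity note (added for clarity): for 2.1+ peers the conversion is a
 * pure field copy, so ldlm_flock_policy_local_to_wire() followed by
 * ldlm_flock_policy_wire21_to_local() reproduces the original policy,
 * owner included; only the wire18 variant loses the owner by
 * substituting the PID.
 */
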
/*
 * Export handle<->flock hash operations.
 */
static unsigned
ldlm_export_flock_hash(cfs_hash_t *hs, const void *key, unsigned mask)
{
        return cfs_hash_u64_hash(*(__u64 *)key, mask);
}

static void *
ldlm_export_flock_key(cfs_hlist_node_t *hnode)
{
        struct ldlm_lock *lock;

        lock = cfs_hlist_entry(hnode, struct ldlm_lock, l_exp_flock_hash);
        return &lock->l_policy_data.l_flock.owner;
}

static int
ldlm_export_flock_keycmp(const void *key, cfs_hlist_node_t *hnode)
{
        return !memcmp(ldlm_export_flock_key(hnode), key, sizeof(__u64));
}

static void *
ldlm_export_flock_object(cfs_hlist_node_t *hnode)
{
        return cfs_hlist_entry(hnode, struct ldlm_lock, l_exp_flock_hash);
}

static void
ldlm_export_flock_get(cfs_hash_t *hs, cfs_hlist_node_t *hnode)
{
        struct ldlm_lock *lock;
        struct ldlm_flock *flock;

        lock = cfs_hlist_entry(hnode, struct ldlm_lock, l_exp_flock_hash);
        LDLM_LOCK_GET(lock);

        flock = &lock->l_policy_data.l_flock;
        LASSERT(flock->blocking_export != NULL);
        class_export_get(flock->blocking_export);
        flock->blocking_refs++;
}

static void
ldlm_export_flock_put(cfs_hash_t *hs, cfs_hlist_node_t *hnode)
{
        struct ldlm_lock *lock;
        struct ldlm_flock *flock;

        lock = cfs_hlist_entry(hnode, struct ldlm_lock, l_exp_flock_hash);
        LDLM_LOCK_RELEASE(lock);

        flock = &lock->l_policy_data.l_flock;
        LASSERT(flock->blocking_export != NULL);
        class_export_put(flock->blocking_export);
        if (--flock->blocking_refs == 0) {
                flock->blocking_owner = 0;
                flock->blocking_export = NULL;
        }
}

static cfs_hash_ops_t ldlm_export_flock_ops = {
        .hs_hash        = ldlm_export_flock_hash,
        .hs_key         = ldlm_export_flock_key,
        .hs_keycmp      = ldlm_export_flock_keycmp,
        .hs_object      = ldlm_export_flock_object,
        .hs_get         = ldlm_export_flock_get,
        .hs_put         = ldlm_export_flock_put,
        .hs_put_locked  = ldlm_export_flock_put,
};

int ldlm_init_flock_export(struct obd_export *exp)
{
        exp->exp_flock_hash =
                cfs_hash_create(obd_uuid2str(&exp->exp_client_uuid),
                                HASH_EXP_LOCK_CUR_BITS,
                                HASH_EXP_LOCK_MAX_BITS,
                                HASH_EXP_LOCK_BKT_BITS, 0,
                                CFS_HASH_MIN_THETA, CFS_HASH_MAX_THETA,
                                &ldlm_export_flock_ops,
                                CFS_HASH_DEFAULT | CFS_HASH_NBLK_CHANGE);
        if (!exp->exp_flock_hash)
                RETURN(-ENOMEM);

        RETURN(0);
}
EXPORT_SYMBOL(ldlm_init_flock_export);

void ldlm_destroy_flock_export(struct obd_export *exp)
{
        ENTRY;
        if (exp->exp_flock_hash) {
                cfs_hash_putref(exp->exp_flock_hash);
                exp->exp_flock_hash = NULL;
        }
        EXIT;
}
EXPORT_SYMBOL(ldlm_destroy_flock_export);