/* lustre/ldlm/ldlm_flock.c */
/*
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
 *
 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
 * CA 95054 USA or visit www.sun.com if you need additional information or
 * have any questions.
 *
 * GPL HEADER END
 */
/*
 * Copyright (c) 2003 Hewlett-Packard Development Company LP.
 * Developed under the sponsorship of the US Government under
 * Subcontract No. B514193
 *
 * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
 * Use is subject to license terms.
 *
 * Copyright (c) 2010, 2012, Intel Corporation.
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 * Lustre is a trademark of Sun Microsystems, Inc.
 */

/**
 * This file implements the POSIX lock type for Lustre.
 * Its policy properties are the start and end of the extent and the PID.
 *
 * These locks are only done through the MDS because POSIX semantics
 * require, e.g., that a lock may be only partially released (and thus
 * split into two locks), and that two adjacent locks from the same
 * process may be merged into a single wider lock.
 *
 * Lock modes are mapped as follows:
 * PR and PW for READ and WRITE locks
 * NL to request the release of a portion of a lock
 *
 * These flock locks never time out.
 */
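
/*
 * Illustrative example (not part of the original comment): with the
 * mapping above, a client calling
 *
 *     struct flock fl = { .l_type = F_WRLCK, .l_whence = SEEK_SET,
 *                         .l_start = 0, .l_len = 100 };
 *     fcntl(fd, F_SETLKW, &fl);
 *
 * enqueues an LDLM_FLOCK lock with mode LCK_PW and policy extent
 * start = 0, end = 99, while F_UNLCK over a subrange arrives as an
 * LCK_NL request covering just that subrange.
 */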

#define DEBUG_SUBSYSTEM S_LDLM

#ifdef __KERNEL__
#include <lustre_dlm.h>
#include <obd_support.h>
#include <obd_class.h>
#include <lustre_lib.h>
#include <libcfs/list.h>
#else
#include <liblustre.h>
#include <obd_class.h>
#endif

#include "ldlm_internal.h"

int ldlm_flock_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
                            void *data, int flag);

/**
 * list_for_remaining_safe - iterate over the remaining entries in a list
 *              and safeguard against removal of a list entry.
 * \param pos   the &struct list_head to use as a loop counter. pos MUST
 *              have been initialized prior to using it in this macro.
 * \param n     another &struct list_head to use as temporary storage
 * \param head  the head for your list.
 */
#define list_for_remaining_safe(pos, n, head) \
        for (n = pos->next; pos != (head); pos = n, n = pos->next)

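/*
 * Usage sketch (illustrative only): resume a removal-safe walk from a
 * position already inside the list rather than from its head:
 *
 *     cfs_list_t *pos = ownlocks;     // already points into lr_granted
 *     cfs_list_t *n;
 *
 *     list_for_remaining_safe(pos, n, &res->lr_granted) {
 *             struct ldlm_lock *lck =
 *                     cfs_list_entry(pos, struct ldlm_lock, l_res_link);
 *             // lck may be unlinked here; "n" keeps the walk safe
 *     }
 */
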
static inline int
ldlm_same_flock_owner(struct ldlm_lock *lock, struct ldlm_lock *new)
{
        return((new->l_policy_data.l_flock.owner ==
                lock->l_policy_data.l_flock.owner) &&
               (new->l_export == lock->l_export));
}

static inline int
ldlm_flocks_overlap(struct ldlm_lock *lock, struct ldlm_lock *new)
{
        return((new->l_policy_data.l_flock.start <=
                lock->l_policy_data.l_flock.end) &&
               (new->l_policy_data.l_flock.end >=
                lock->l_policy_data.l_flock.start));
}

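/*
 * For illustration: with the inclusive [start, end] representation used
 * here, [0, 9] and [5, 20] overlap, while [0, 9] and [10, 20] do not;
 * they merely adjoin, which matters later when merging same-mode locks
 * from one owner.
 */
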
static inline int ldlm_flock_blocking_link(struct ldlm_lock *req,
                                           struct ldlm_lock *lock)
{
        int rc = 0;

        /* For server only */
        if (req->l_export == NULL)
                return 0;

        if (unlikely(req->l_export->exp_flock_hash == NULL)) {
                rc = ldlm_init_flock_export(req->l_export);
                if (rc)
                        goto error;
        }

        LASSERT(cfs_hlist_unhashed(&req->l_exp_flock_hash));

        req->l_policy_data.l_flock.blocking_owner =
                lock->l_policy_data.l_flock.owner;
        req->l_policy_data.l_flock.blocking_export =
                lock->l_export;
        req->l_policy_data.l_flock.blocking_refs = 0;

        cfs_hash_add(req->l_export->exp_flock_hash,
                     &req->l_policy_data.l_flock.owner,
                     &req->l_exp_flock_hash);
error:
        return rc;
}
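
/*
 * Illustrative note: each entry added above records one "waits-for"
 * edge in the deadlock graph, keyed by the blocked owner; e.g. owner O1
 * on export E1 blocked behind owner O2 on export E2.
 * ldlm_flock_deadlock() walks these edges to detect cycles.
 */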

static inline void ldlm_flock_blocking_unlink(struct ldlm_lock *req)
{
        /* For server only */
        if (req->l_export == NULL)
                return;

        check_res_locked(req->l_resource);
        if (req->l_export->exp_flock_hash != NULL &&
            !cfs_hlist_unhashed(&req->l_exp_flock_hash))
                cfs_hash_del(req->l_export->exp_flock_hash,
                             &req->l_policy_data.l_flock.owner,
                             &req->l_exp_flock_hash);
}

static inline void
ldlm_flock_destroy(struct ldlm_lock *lock, ldlm_mode_t mode, __u64 flags)
{
        ENTRY;

        LDLM_DEBUG(lock, "ldlm_flock_destroy(mode: %d, flags: 0x%llx)",
                   mode, flags);

        /* Safe to not lock here, since it should be empty anyway */
        LASSERT(cfs_hlist_unhashed(&lock->l_exp_flock_hash));

        cfs_list_del_init(&lock->l_res_link);
        if (flags == LDLM_FL_WAIT_NOREPROC &&
            !(lock->l_flags & LDLM_FL_FAILED)) {
                /* client side - set a flag to prevent sending a CANCEL */
                lock->l_flags |= LDLM_FL_LOCAL_ONLY | LDLM_FL_CBPENDING;

                /* when reaching here, it is under lock_res_and_lock(), so we
                 * must call the nolock version of ldlm_lock_decref_internal() */
                ldlm_lock_decref_internal_nolock(lock, mode);
        }

        ldlm_lock_destroy_nolock(lock);
        EXIT;
}

/**
 * POSIX locks deadlock detection code.
 *
 * Given a new lock \a req and an existing lock \a bl_lock it conflicts
 * with, we need to iterate through all blocked POSIX locks for this
 * export and see if a deadlock condition arises (i.e. one client holds
 * a lock on something and wants a lock on something else, while at the
 * same time another client has the opposite situation).
 */
static int
ldlm_flock_deadlock(struct ldlm_lock *req, struct ldlm_lock *bl_lock)
{
        struct obd_export *req_exp = req->l_export;
        struct obd_export *bl_exp = bl_lock->l_export;
        __u64 req_owner = req->l_policy_data.l_flock.owner;
        __u64 bl_owner = bl_lock->l_policy_data.l_flock.owner;

        /* For server only */
        if (req_exp == NULL)
                return 0;

        class_export_get(bl_exp);
        while (1) {
                struct obd_export *bl_exp_new;
                struct ldlm_lock *lock = NULL;
                struct ldlm_flock *flock;

                if (bl_exp->exp_flock_hash != NULL)
                        lock = cfs_hash_lookup(bl_exp->exp_flock_hash,
                                               &bl_owner);
                if (lock == NULL)
                        break;

                flock = &lock->l_policy_data.l_flock;
                LASSERT(flock->owner == bl_owner);
                bl_owner = flock->blocking_owner;
                bl_exp_new = class_export_get(flock->blocking_export);
                class_export_put(bl_exp);

                cfs_hash_put(bl_exp->exp_flock_hash, &lock->l_exp_flock_hash);
                bl_exp = bl_exp_new;

                if (bl_owner == req_owner && bl_exp == req_exp) {
                        class_export_put(bl_exp);
                        return 1;
                }
        }
        class_export_put(bl_exp);

        return 0;
}

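/*
 * Worked example (illustrative): owner O1 holds [0, 9] and is blocked
 * waiting for [10, 19]; owner O2 holds [10, 19] and now requests [0, 9].
 * Starting from O2's conflict with O1, the walk above looks up O1's own
 * blocked lock, follows its blocking_owner edge back to O2, sees that it
 * matches the requesting owner and export, and returns 1 so the caller
 * fails the enqueue with -EDEADLK instead of waiting forever.
 */
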
/**
 * Process a granting attempt for a flock lock.
 * Must be called with ns lock held.
 *
 * This function looks for any conflicts for \a lock in the granted or
 * waiting queues. The lock is granted if no conflicts are found in
 * either queue.
 *
 * It is also responsible for splitting a lock if a portion of the lock
 * is released.
 *
 * If \a first_enq is 0 (i.e., called from ldlm_reprocess_queue):
 *   - blocking ASTs have already been sent
 *
 * If \a first_enq is 1 (i.e., called from ldlm_lock_enqueue):
 *   - blocking ASTs have not been sent yet, so the list of conflicting
 *     locks is collected and ASTs are sent.
 */
int
ldlm_process_flock_lock(struct ldlm_lock *req, __u64 *flags, int first_enq,
                        ldlm_error_t *err, cfs_list_t *work_list)
{
        struct ldlm_resource *res = req->l_resource;
        struct ldlm_namespace *ns = ldlm_res_to_ns(res);
        cfs_list_t *tmp;
        cfs_list_t *ownlocks = NULL;
        struct ldlm_lock *lock = NULL;
        struct ldlm_lock *new = req;
        struct ldlm_lock *new2 = NULL;
        ldlm_mode_t mode = req->l_req_mode;
        int local = ns_is_client(ns);
        int added = (mode == LCK_NL);
        int overlaps = 0;
        int split = 0;
        const struct ldlm_callback_suite null_cbs = { NULL };
        int rc;
        ENTRY;

        CDEBUG(D_DLMTRACE, "flags %#llx owner "LPU64" pid %u mode %u start "
               LPU64" end "LPU64"\n", *flags,
               new->l_policy_data.l_flock.owner,
               new->l_policy_data.l_flock.pid, mode,
               req->l_policy_data.l_flock.start,
               req->l_policy_data.l_flock.end);

        *err = ELDLM_OK;

        if (local) {
                /* No blocking ASTs are sent to the clients for
                 * POSIX file & record locks */
                req->l_blocking_ast = NULL;
        } else {
                /* Called on the server for lock cancels. */
                req->l_blocking_ast = ldlm_flock_blocking_ast;
        }

reprocess:
        if ((*flags == LDLM_FL_WAIT_NOREPROC) || (mode == LCK_NL)) {
                /* This loop determines where this process's locks start
                 * in the resource lr_granted list. */
                cfs_list_for_each(tmp, &res->lr_granted) {
                        lock = cfs_list_entry(tmp, struct ldlm_lock,
                                              l_res_link);
                        if (ldlm_same_flock_owner(lock, req)) {
                                ownlocks = tmp;
                                break;
                        }
                }
        } else {
                lockmode_verify(mode);

                /* This loop determines if there are existing locks
                 * that conflict with the new lock request. */
                cfs_list_for_each(tmp, &res->lr_granted) {
                        lock = cfs_list_entry(tmp, struct ldlm_lock,
                                              l_res_link);

                        if (ldlm_same_flock_owner(lock, req)) {
                                if (!ownlocks)
                                        ownlocks = tmp;
                                continue;
                        }

                        /* locks are compatible, overlap doesn't matter */
                        if (lockmode_compat(lock->l_granted_mode, mode))
                                continue;

                        if (!ldlm_flocks_overlap(lock, req))
                                continue;

                        if (!first_enq)
                                RETURN(LDLM_ITER_CONTINUE);

                        if (*flags & LDLM_FL_BLOCK_NOWAIT) {
                                ldlm_flock_destroy(req, mode, *flags);
                                *err = -EAGAIN;
                                RETURN(LDLM_ITER_STOP);
                        }

                        if (*flags & LDLM_FL_TEST_LOCK) {
                                ldlm_flock_destroy(req, mode, *flags);
                                req->l_req_mode = lock->l_granted_mode;
                                req->l_policy_data.l_flock.pid =
                                        lock->l_policy_data.l_flock.pid;
                                req->l_policy_data.l_flock.start =
                                        lock->l_policy_data.l_flock.start;
                                req->l_policy_data.l_flock.end =
                                        lock->l_policy_data.l_flock.end;
                                *flags |= LDLM_FL_LOCK_CHANGED;
                                RETURN(LDLM_ITER_STOP);
                        }

                        if (ldlm_flock_deadlock(req, lock)) {
                                ldlm_flock_destroy(req, mode, *flags);
                                *err = -EDEADLK;
                                RETURN(LDLM_ITER_STOP);
                        }

                        rc = ldlm_flock_blocking_link(req, lock);
                        if (rc) {
                                ldlm_flock_destroy(req, mode, *flags);
                                *err = rc;
                                RETURN(LDLM_ITER_STOP);
                        }
                        ldlm_resource_add_lock(res, &res->lr_waiting, req);
                        *flags |= LDLM_FL_BLOCK_GRANTED;
                        RETURN(LDLM_ITER_STOP);
                }
        }

        if (*flags & LDLM_FL_TEST_LOCK) {
                ldlm_flock_destroy(req, mode, *flags);
                req->l_req_mode = LCK_NL;
                *flags |= LDLM_FL_LOCK_CHANGED;
                RETURN(LDLM_ITER_STOP);
        }

        /* In case we had slept on this lock request, take it off of the
         * deadlock detection hash list. */
        ldlm_flock_blocking_unlink(req);

        /* Scan the locks owned by this process that overlap this request.
         * We may have to merge or split existing locks. */

        if (!ownlocks)
                ownlocks = &res->lr_granted;

        list_for_remaining_safe(ownlocks, tmp, &res->lr_granted) {
                lock = cfs_list_entry(ownlocks, struct ldlm_lock, l_res_link);

                if (!ldlm_same_flock_owner(lock, new))
                        break;

                if (lock->l_granted_mode == mode) {
                        /* If the modes are the same then we need to process
                         * locks that overlap OR adjoin the new lock. The extra
                         * logic condition is necessary to deal with arithmetic
                         * overflow and underflow. */
                        if ((new->l_policy_data.l_flock.start >
                             (lock->l_policy_data.l_flock.end + 1))
                            && (lock->l_policy_data.l_flock.end !=
                                OBD_OBJECT_EOF))
                                continue;

                        if ((new->l_policy_data.l_flock.end <
                             (lock->l_policy_data.l_flock.start - 1))
                            && (lock->l_policy_data.l_flock.start != 0))
                                break;

                        if (new->l_policy_data.l_flock.start <
                            lock->l_policy_data.l_flock.start) {
                                lock->l_policy_data.l_flock.start =
                                        new->l_policy_data.l_flock.start;
                        } else {
                                new->l_policy_data.l_flock.start =
                                        lock->l_policy_data.l_flock.start;
                        }

                        if (new->l_policy_data.l_flock.end >
                            lock->l_policy_data.l_flock.end) {
                                lock->l_policy_data.l_flock.end =
                                        new->l_policy_data.l_flock.end;
                        } else {
                                new->l_policy_data.l_flock.end =
                                        lock->l_policy_data.l_flock.end;
                        }

                        if (added) {
                                ldlm_flock_destroy(lock, mode, *flags);
                        } else {
                                new = lock;
                                added = 1;
                        }
                        continue;
                }
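
                /*
                 * Merge example (illustrative): if this owner already
                 * holds PW [0, 49] and requests PW [30, 80], the ranges
                 * overlap, so the branch above widens the granted lock
                 * in place to [0, 80] and absorbs the request instead
                 * of creating a second lock.
                 */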

                if (new->l_policy_data.l_flock.start >
                    lock->l_policy_data.l_flock.end)
                        continue;

                if (new->l_policy_data.l_flock.end <
                    lock->l_policy_data.l_flock.start)
                        break;

                ++overlaps;

                if (new->l_policy_data.l_flock.start <=
                    lock->l_policy_data.l_flock.start) {
                        if (new->l_policy_data.l_flock.end <
                            lock->l_policy_data.l_flock.end) {
                                lock->l_policy_data.l_flock.start =
                                        new->l_policy_data.l_flock.end + 1;
                                break;
                        }
                        ldlm_flock_destroy(lock, lock->l_req_mode, *flags);
                        continue;
                }
                if (new->l_policy_data.l_flock.end >=
                    lock->l_policy_data.l_flock.end) {
                        lock->l_policy_data.l_flock.end =
                                new->l_policy_data.l_flock.start - 1;
                        continue;
                }
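
                /*
                 * Truncation example (illustrative): a new PW [50, 99]
                 * over a granted PR [0, 99] from the same owner takes
                 * the branch above and shrinks the old lock to [0, 49];
                 * only the fully covered case destroys the old lock,
                 * and the partially covered middle case falls through
                 * to the split below.
                 */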

                /* split the existing lock into two locks */

                /* if this is an F_UNLCK operation then we could avoid
                 * allocating a new lock and use the req lock passed in
                 * with the request but this would complicate the reply
                 * processing since updates to req get reflected in the
                 * reply. The client side replays the lock request so
                 * it must see the original lock data in the reply. */

                /* XXX - if ldlm_lock_create() can sleep we should
                 * release the lr_lock, allocate the new lock,
                 * and restart processing this lock. */
                if (!new2) {
                        unlock_res_and_lock(req);
                        new2 = ldlm_lock_create(ns, &res->lr_name, LDLM_FLOCK,
                                                lock->l_granted_mode, &null_cbs,
                                                NULL, 0, LVB_T_NONE);
                        lock_res_and_lock(req);
                        if (!new2) {
                                ldlm_flock_destroy(req, lock->l_granted_mode,
                                                   *flags);
                                *err = -ENOLCK;
                                RETURN(LDLM_ITER_STOP);
                        }
                        goto reprocess;
                }
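
                /*
                 * Split example (illustrative): dropping [30, 49] from
                 * a granted PW lock [0, 99] leaves [0, 29] and [50, 99].
                 * The code below trims the existing lock to the high
                 * part [50, 99] and uses new2 for the low part [0, 29].
                 */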

                split = 1;

                new2->l_granted_mode = lock->l_granted_mode;
                new2->l_policy_data.l_flock.pid =
                        new->l_policy_data.l_flock.pid;
                new2->l_policy_data.l_flock.owner =
                        new->l_policy_data.l_flock.owner;
                new2->l_policy_data.l_flock.start =
                        lock->l_policy_data.l_flock.start;
                new2->l_policy_data.l_flock.end =
                        new->l_policy_data.l_flock.start - 1;
                lock->l_policy_data.l_flock.start =
                        new->l_policy_data.l_flock.end + 1;
                new2->l_conn_export = lock->l_conn_export;
                if (lock->l_export != NULL) {
                        new2->l_export = class_export_lock_get(lock->l_export,
                                                               new2);
                        if (new2->l_export->exp_lock_hash &&
                            cfs_hlist_unhashed(&new2->l_exp_hash))
                                cfs_hash_add(new2->l_export->exp_lock_hash,
                                             &new2->l_remote_handle,
                                             &new2->l_exp_hash);
                }
                if (*flags == LDLM_FL_WAIT_NOREPROC)
                        ldlm_lock_addref_internal_nolock(new2,
                                                         lock->l_granted_mode);

                /* insert new2 at lock */
                ldlm_resource_add_lock(res, ownlocks, new2);
                LDLM_LOCK_RELEASE(new2);
                break;
        }

        /* if new2 was created but never used, destroy it */
        if (split == 0 && new2 != NULL)
                ldlm_lock_destroy_nolock(new2);

        /* At this point we're granting the lock request. */
        req->l_granted_mode = req->l_req_mode;

        /* Add req to the granted queue before calling ldlm_reprocess_all(). */
        if (!added) {
                cfs_list_del_init(&req->l_res_link);
                /* insert new lock before ownlocks in list. */
                ldlm_resource_add_lock(res, ownlocks, req);
        }

        if (*flags != LDLM_FL_WAIT_NOREPROC) {
#ifdef HAVE_SERVER_SUPPORT
                if (first_enq) {
                        /* If this is an unlock, reprocess the waitq and
                         * send completion ASTs for locks that can now be
                         * granted. The only problem with doing this
                         * reprocessing here is that the completion ASTs for
                         * newly granted locks will be sent before the unlock
                         * completion is sent. It shouldn't be an issue. Also
                         * note that ldlm_process_flock_lock() will recurse,
                         * but only once because first_enq will be false from
                         * ldlm_reprocess_queue. */
                        if ((mode == LCK_NL) && overlaps) {
                                CFS_LIST_HEAD(rpc_list);
                                int rc;
restart:
                                ldlm_reprocess_queue(res, &res->lr_waiting,
                                                     &rpc_list);

                                unlock_res_and_lock(req);
                                rc = ldlm_run_ast_work(ns, &rpc_list,
                                                       LDLM_WORK_CP_AST);
                                lock_res_and_lock(req);
                                if (rc == -ERESTART)
                                        GOTO(restart, -ERESTART);
                        }
                } else {
                        LASSERT(req->l_completion_ast);
                        ldlm_add_ast_work_item(req, NULL, work_list);
                }
#else /* !HAVE_SERVER_SUPPORT */
                /* The only possible case for a client-side call into this
                 * flock policy function is ldlm_flock_completion_ast(),
                 * which always passes LDLM_FL_WAIT_NOREPROC. */
                CERROR("Illegal parameter for client-side-only module.\n");
                LBUG();
#endif /* HAVE_SERVER_SUPPORT */
        }

        /* In case we're reprocessing the requested lock we can't destroy
         * it until after calling ldlm_add_ast_work_item() above so that it
         * can bump the reference count on \a req. Otherwise \a req
         * could be freed before the completion AST can be sent.  */
        if (added)
                ldlm_flock_destroy(req, mode, *flags);

        ldlm_resource_dump(D_INFO, res);
        RETURN(LDLM_ITER_CONTINUE);
}

struct ldlm_flock_wait_data {
        struct ldlm_lock *fwd_lock;
        int               fwd_generation;
};

static void
ldlm_flock_interrupted_wait(void *data)
{
        struct ldlm_lock *lock;
        ENTRY;

        lock = ((struct ldlm_flock_wait_data *)data)->fwd_lock;

        /* take lock off the deadlock detection hash list. */
        lock_res_and_lock(lock);
        ldlm_flock_blocking_unlink(lock);

        /* client side - set flag to prevent lock from being put on LRU list */
        lock->l_flags |= LDLM_FL_CBPENDING;
        unlock_res_and_lock(lock);

        EXIT;
}

/**
 * Flock completion callback function.
 *
 * \param lock [in,out]  the lock to be handled
 * \param flags [in]     LDLM flags
 * \param data [in]      ldlm_work_cp_ast_lock() will use ldlm_cb_set_arg
 *
 * \retval 0    success
 * \retval <0   failure
 */
int
ldlm_flock_completion_ast(struct ldlm_lock *lock, __u64 flags, void *data)
{
        struct file_lock                *getlk = lock->l_ast_data;
        struct obd_device               *obd;
        struct obd_import               *imp = NULL;
        struct ldlm_flock_wait_data     fwd;
        struct l_wait_info              lwi;
        ldlm_error_t                    err;
        int                             rc = 0;
        ENTRY;

        CDEBUG(D_DLMTRACE, "flags: 0x%llx data: %p getlk: %p\n",
               flags, data, getlk);

        /* Import invalidation. We need to actually release the lock
         * references being held, so that it can go away. There is no
         * point in holding the lock even if the app still believes it
         * has it, since the server already dropped it anyway. This
         * applies only to granted locks. */
        if ((lock->l_flags & (LDLM_FL_FAILED|LDLM_FL_LOCAL_ONLY)) ==
            (LDLM_FL_FAILED|LDLM_FL_LOCAL_ONLY)) {
                if (lock->l_req_mode == lock->l_granted_mode &&
                    lock->l_granted_mode != LCK_NL &&
                    NULL == data)
                        ldlm_lock_decref_internal(lock, lock->l_req_mode);

                /* Need to wake up the waiter if we were evicted */
                cfs_waitq_signal(&lock->l_waitq);
                RETURN(0);
        }

        LASSERT(flags != LDLM_FL_WAIT_NOREPROC);

        if (!(flags & (LDLM_FL_BLOCK_WAIT | LDLM_FL_BLOCK_GRANTED |
                       LDLM_FL_BLOCK_CONV))) {
                if (NULL == data)
                        /* the MDS granted the lock in the reply */
                        goto granted;
                /* CP AST RPC: lock got granted, wake it up */
                cfs_waitq_signal(&lock->l_waitq);
                RETURN(0);
        }

        LDLM_DEBUG(lock, "client-side enqueue returned a blocked lock, "
                   "sleeping");
        fwd.fwd_lock = lock;
        obd = class_exp2obd(lock->l_conn_export);

        /* if this is a local lock, there is no import */
        if (NULL != obd)
                imp = obd->u.cli.cl_import;

        if (NULL != imp) {
                spin_lock(&imp->imp_lock);
                fwd.fwd_generation = imp->imp_generation;
                spin_unlock(&imp->imp_lock);
        }

        lwi = LWI_TIMEOUT_INTR(0, NULL, ldlm_flock_interrupted_wait, &fwd);

        /* Go to sleep until the lock is granted. */
        rc = l_wait_event(lock->l_waitq, is_granted_or_cancelled(lock), &lwi);

        if (rc) {
                LDLM_DEBUG(lock, "client-side enqueue waking up: failed (%d)",
                           rc);
                RETURN(rc);
        }

granted:
        OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_CP_CB_WAIT, 10);

        if (lock->l_destroyed) {
                LDLM_DEBUG(lock, "client-side enqueue waking up: destroyed");
                RETURN(0);
        }

        if (lock->l_flags & LDLM_FL_FAILED) {
                LDLM_DEBUG(lock, "client-side enqueue waking up: failed");
                RETURN(-EIO);
        }

        if (rc) {
                LDLM_DEBUG(lock, "client-side enqueue waking up: failed (%d)",
                           rc);
                RETURN(rc);
        }

        LDLM_DEBUG(lock, "client-side enqueue granted");

        lock_res_and_lock(lock);

        /* take lock off the deadlock detection hash list. */
        ldlm_flock_blocking_unlink(lock);

        /* ldlm_lock_enqueue() has already placed lock on the granted list. */
        cfs_list_del_init(&lock->l_res_link);

        if (flags & LDLM_FL_TEST_LOCK) {
                /* fcntl(F_GETLK) request */
                /* The old mode was saved in getlk->fl_type so that if the
                 * mode in the lock changes we can decref the appropriate
                 * refcount. */
                ldlm_flock_destroy(lock, flock_type(getlk),
                                   LDLM_FL_WAIT_NOREPROC);
                switch (lock->l_granted_mode) {
                case LCK_PR:
                        flock_set_type(getlk, F_RDLCK);
                        break;
                case LCK_PW:
                        flock_set_type(getlk, F_WRLCK);
                        break;
                default:
                        flock_set_type(getlk, F_UNLCK);
                }
                flock_set_pid(getlk, (pid_t)lock->l_policy_data.l_flock.pid);
                flock_set_start(getlk,
                                (loff_t)lock->l_policy_data.l_flock.start);
                flock_set_end(getlk,
                              (loff_t)lock->l_policy_data.l_flock.end);
        } else {
                __u64 noreproc = LDLM_FL_WAIT_NOREPROC;

                /* We need to reprocess the lock to do merges or splits
                 * with existing locks owned by this process. */
                ldlm_process_flock_lock(lock, &noreproc, 1, &err, NULL);
        }
        unlock_res_and_lock(lock);
        RETURN(0);
}
EXPORT_SYMBOL(ldlm_flock_completion_ast);

int ldlm_flock_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
                            void *data, int flag)
{
        ENTRY;

        LASSERT(lock);
        LASSERT(flag == LDLM_CB_CANCELING);

        /* take lock off the deadlock detection hash list. */
        lock_res_and_lock(lock);
        ldlm_flock_blocking_unlink(lock);
        unlock_res_and_lock(lock);
        RETURN(0);
}

void ldlm_flock_policy_wire18_to_local(const ldlm_wire_policy_data_t *wpolicy,
                                       ldlm_policy_data_t *lpolicy)
{
        memset(lpolicy, 0, sizeof(*lpolicy));
        lpolicy->l_flock.start = wpolicy->l_flock.lfw_start;
        lpolicy->l_flock.end = wpolicy->l_flock.lfw_end;
        lpolicy->l_flock.pid = wpolicy->l_flock.lfw_pid;
        /* Compatibility code: old clients had no notion of the owner
         * field and relied solely on the PID for ownership. The owner
         * field was introduced in LU-104 (Lustre 2.1, April 2011). */
        lpolicy->l_flock.owner = wpolicy->l_flock.lfw_pid;
}

void ldlm_flock_policy_wire21_to_local(const ldlm_wire_policy_data_t *wpolicy,
                                       ldlm_policy_data_t *lpolicy)
{
        memset(lpolicy, 0, sizeof(*lpolicy));
        lpolicy->l_flock.start = wpolicy->l_flock.lfw_start;
        lpolicy->l_flock.end = wpolicy->l_flock.lfw_end;
        lpolicy->l_flock.pid = wpolicy->l_flock.lfw_pid;
        lpolicy->l_flock.owner = wpolicy->l_flock.lfw_owner;
}

void ldlm_flock_policy_local_to_wire(const ldlm_policy_data_t *lpolicy,
                                     ldlm_wire_policy_data_t *wpolicy)
{
        memset(wpolicy, 0, sizeof(*wpolicy));
        wpolicy->l_flock.lfw_start = lpolicy->l_flock.start;
        wpolicy->l_flock.lfw_end = lpolicy->l_flock.end;
        wpolicy->l_flock.lfw_pid = lpolicy->l_flock.pid;
        wpolicy->l_flock.lfw_owner = lpolicy->l_flock.owner;
}
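
/*
 * Illustrative note: a 2.1+ client fills in lfw_owner explicitly, so
 * the wire21 conversion above preserves distinct owners that happen to
 * share a PID, while the wire18 conversion must fall back to treating
 * the PID itself as the owner for pre-2.1 clients.
 */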

/*
 * Export handle<->flock hash operations.
 */
static unsigned
ldlm_export_flock_hash(cfs_hash_t *hs, const void *key, unsigned mask)
{
        return cfs_hash_u64_hash(*(__u64 *)key, mask);
}

static void *
ldlm_export_flock_key(cfs_hlist_node_t *hnode)
{
        struct ldlm_lock *lock;

        lock = cfs_hlist_entry(hnode, struct ldlm_lock, l_exp_flock_hash);
        return &lock->l_policy_data.l_flock.owner;
}

static int
ldlm_export_flock_keycmp(const void *key, cfs_hlist_node_t *hnode)
{
        return !memcmp(ldlm_export_flock_key(hnode), key, sizeof(__u64));
}

static void *
ldlm_export_flock_object(cfs_hlist_node_t *hnode)
{
        return cfs_hlist_entry(hnode, struct ldlm_lock, l_exp_flock_hash);
}

static void
ldlm_export_flock_get(cfs_hash_t *hs, cfs_hlist_node_t *hnode)
{
        struct ldlm_lock *lock;
        struct ldlm_flock *flock;

        lock = cfs_hlist_entry(hnode, struct ldlm_lock, l_exp_flock_hash);
        LDLM_LOCK_GET(lock);

        flock = &lock->l_policy_data.l_flock;
        LASSERT(flock->blocking_export != NULL);
        class_export_get(flock->blocking_export);
        flock->blocking_refs++;
}

static void
ldlm_export_flock_put(cfs_hash_t *hs, cfs_hlist_node_t *hnode)
{
        struct ldlm_lock *lock;
        struct ldlm_flock *flock;

        lock = cfs_hlist_entry(hnode, struct ldlm_lock, l_exp_flock_hash);
        LDLM_LOCK_RELEASE(lock);

        flock = &lock->l_policy_data.l_flock;
        LASSERT(flock->blocking_export != NULL);
        class_export_put(flock->blocking_export);
        if (--flock->blocking_refs == 0) {
                flock->blocking_owner = 0;
                flock->blocking_export = NULL;
        }
}
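
/*
 * Illustrative note: the get/put pair above pins both the blocked lock
 * and its blocking_export for as long as a hash reference is held, so
 * the deadlock-detection walk in ldlm_flock_deadlock() can safely
 * follow blocking_export between cfs_hash_lookup() and cfs_hash_put().
 */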

static cfs_hash_ops_t ldlm_export_flock_ops = {
        .hs_hash        = ldlm_export_flock_hash,
        .hs_key         = ldlm_export_flock_key,
        .hs_keycmp      = ldlm_export_flock_keycmp,
        .hs_object      = ldlm_export_flock_object,
        .hs_get         = ldlm_export_flock_get,
        .hs_put         = ldlm_export_flock_put,
        .hs_put_locked  = ldlm_export_flock_put,
};

int ldlm_init_flock_export(struct obd_export *exp)
{
        exp->exp_flock_hash =
                cfs_hash_create(obd_uuid2str(&exp->exp_client_uuid),
                                HASH_EXP_LOCK_CUR_BITS,
                                HASH_EXP_LOCK_MAX_BITS,
                                HASH_EXP_LOCK_BKT_BITS, 0,
                                CFS_HASH_MIN_THETA, CFS_HASH_MAX_THETA,
                                &ldlm_export_flock_ops,
                                CFS_HASH_DEFAULT | CFS_HASH_NBLK_CHANGE);
        if (!exp->exp_flock_hash)
                RETURN(-ENOMEM);

        RETURN(0);
}
EXPORT_SYMBOL(ldlm_init_flock_export);

void ldlm_destroy_flock_export(struct obd_export *exp)
{
        ENTRY;
        if (exp->exp_flock_hash) {
                cfs_hash_putref(exp->exp_flock_hash);
                exp->exp_flock_hash = NULL;
        }
        EXIT;
}
EXPORT_SYMBOL(ldlm_destroy_flock_export);