LU-1684 ldlm: move ldlm flags not sent through wire to upper 32bits
[fs/lustre-release.git] / lustre / ldlm / ldlm_flock.c
/*
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
 *
 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
 * CA 95054 USA or visit www.sun.com if you need additional information or
 * have any questions.
 *
 * GPL HEADER END
 */
/*
 * Copyright (c) 2003 Hewlett-Packard Development Company LP.
 * Developed under the sponsorship of the US Government under
 * Subcontract No. B514193
 *
 * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
 * Use is subject to license terms.
 *
 * Copyright (c) 2010, 2011, Whamcloud, Inc.
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 * Lustre is a trademark of Sun Microsystems, Inc.
 */

#define DEBUG_SUBSYSTEM S_LDLM

#ifdef __KERNEL__
#include <lustre_dlm.h>
#include <obd_support.h>
#include <obd_class.h>
#include <lustre_lib.h>
#include <libcfs/list.h>
#else
#include <liblustre.h>
#include <obd_class.h>
#endif

#include "ldlm_internal.h"

int ldlm_flock_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
                            void *data, int flag);

/**
 * list_for_remaining_safe - iterate over the remaining entries in a list
 *              and safeguard against removal of a list entry.
 * \param pos   the &struct list_head to use as a loop counter. pos MUST
 *              have been initialized prior to using it in this macro.
 * \param n     another &struct list_head to use as temporary storage
 * \param head  the head for your list.
 */
#define list_for_remaining_safe(pos, n, head) \
        for (n = pos->next; pos != (head); pos = n, n = pos->next)

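/*
 * Usage sketch (illustrative only, not part of the original code): unlike
 * cfs_list_for_each_safe(), iteration starts from the current value of
 * "pos" rather than from "head", so a caller can resume a scan mid-list:
 *
 *      cfs_list_t *pos = res->lr_granted.next;
 *      cfs_list_t *n;
 *
 *      list_for_remaining_safe(pos, n, &res->lr_granted) {
 *              ...the entry at "pos" may safely be removed here...
 *      }
 */
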
static inline int
ldlm_same_flock_owner(struct ldlm_lock *lock, struct ldlm_lock *new)
{
        return((new->l_policy_data.l_flock.owner ==
                lock->l_policy_data.l_flock.owner) &&
               (new->l_export == lock->l_export));
}

static inline int
ldlm_flocks_overlap(struct ldlm_lock *lock, struct ldlm_lock *new)
{
        return((new->l_policy_data.l_flock.start <=
                lock->l_policy_data.l_flock.end) &&
               (new->l_policy_data.l_flock.end >=
                lock->l_policy_data.l_flock.start));
}

static inline int ldlm_flock_blocking_link(struct ldlm_lock *req,
                                           struct ldlm_lock *lock)
{
        int rc = 0;

        /* For server only */
        if (req->l_export == NULL)
                return 0;

        if (unlikely(req->l_export->exp_flock_hash == NULL)) {
                rc = ldlm_init_flock_export(req->l_export);
                if (rc)
                        goto error;
        }

        LASSERT(cfs_hlist_unhashed(&req->l_exp_flock_hash));

        req->l_policy_data.l_flock.blocking_owner =
                lock->l_policy_data.l_flock.owner;
        req->l_policy_data.l_flock.blocking_export =
                lock->l_export;
        req->l_policy_data.l_flock.blocking_refs = 0;

        cfs_hash_add(req->l_export->exp_flock_hash,
                     &req->l_policy_data.l_flock.owner,
                     &req->l_exp_flock_hash);
error:
        return rc;
}

static inline void ldlm_flock_blocking_unlink(struct ldlm_lock *req)
{
        /* For server only */
        if (req->l_export == NULL)
                return;

        check_res_locked(req->l_resource);
        if (req->l_export->exp_flock_hash != NULL &&
            !cfs_hlist_unhashed(&req->l_exp_flock_hash))
                cfs_hash_del(req->l_export->exp_flock_hash,
                             &req->l_policy_data.l_flock.owner,
                             &req->l_exp_flock_hash);
}

static inline void
ldlm_flock_destroy(struct ldlm_lock *lock, ldlm_mode_t mode, __u64 flags)
{
        ENTRY;

        LDLM_DEBUG(lock, "ldlm_flock_destroy(mode: %d, flags: 0x%llx)",
                   mode, flags);

        /* Safe to not lock here, since it should be empty anyway */
        LASSERT(cfs_hlist_unhashed(&lock->l_exp_flock_hash));

        cfs_list_del_init(&lock->l_res_link);
        if (flags == LDLM_FL_WAIT_NOREPROC &&
            !(lock->l_flags & LDLM_FL_FAILED)) {
                /* client side - set a flag to prevent sending a CANCEL */
                lock->l_flags |= LDLM_FL_LOCAL_ONLY | LDLM_FL_CBPENDING;
                /* When we get here, the lock is held under
                 * lock_res_and_lock(), so we must call the nolock version
                 * of ldlm_lock_decref_internal(). */
                ldlm_lock_decref_internal_nolock(lock, mode);
        }

        ldlm_lock_destroy_nolock(lock);
        EXIT;
}

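/*
 * Deadlock detection for POSIX record locks (summary added for clarity; it
 * paraphrases the code below rather than specifying new behaviour). Each
 * blocked lock records the owner it waits on via ldlm_flock_blocking_link(),
 * keyed by owner in the per-export exp_flock_hash. Starting from the owner
 * that blocks the request, follow the "blocked on" chain; if the walk ever
 * arrives back at the requesting owner/export, granting would close a wait
 * cycle, so the caller refuses the request with -EDEADLK.
 */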
static int
ldlm_flock_deadlock(struct ldlm_lock *req, struct ldlm_lock *bl_lock)
{
        struct obd_export *req_exp = req->l_export;
        struct obd_export *bl_exp = bl_lock->l_export;
        __u64 req_owner = req->l_policy_data.l_flock.owner;
        __u64 bl_owner = bl_lock->l_policy_data.l_flock.owner;

        /* For server only */
        if (req_exp == NULL)
                return 0;

        class_export_get(bl_exp);
        while (1) {
                struct obd_export *bl_exp_new;
                struct ldlm_lock *lock = NULL;
                struct ldlm_flock *flock;

                if (bl_exp->exp_flock_hash != NULL)
                        lock = cfs_hash_lookup(bl_exp->exp_flock_hash,
                                               &bl_owner);
                if (lock == NULL)
                        break;

                flock = &lock->l_policy_data.l_flock;
                LASSERT(flock->owner == bl_owner);
                bl_owner = flock->blocking_owner;
                bl_exp_new = class_export_get(flock->blocking_export);
                class_export_put(bl_exp);

                cfs_hash_put(bl_exp->exp_flock_hash, &lock->l_exp_flock_hash);
                bl_exp = bl_exp_new;

                if (bl_owner == req_owner && bl_exp == req_exp) {
                        class_export_put(bl_exp);
                        return 1;
                }
        }
        class_export_put(bl_exp);

        return 0;
}

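/*
 * Process a granting attempt for a flock lock (descriptive summary added
 * for readability; it restates the logic below). Called with the resource
 * lock held. A conflicting request either fails immediately
 * (LDLM_FL_BLOCK_NOWAIT), reports the conflicting lock (LDLM_FL_TEST_LOCK),
 * or is queued on lr_waiting after deadlock detection. A grantable request
 * is then merged with, or split against, the existing locks of the same
 * owner so the granted list holds non-overlapping locks per owner.
 */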
int
ldlm_process_flock_lock(struct ldlm_lock *req, __u64 *flags, int first_enq,
                        ldlm_error_t *err, cfs_list_t *work_list)
{
        struct ldlm_resource *res = req->l_resource;
        struct ldlm_namespace *ns = ldlm_res_to_ns(res);
        cfs_list_t *tmp;
        cfs_list_t *ownlocks = NULL;
        struct ldlm_lock *lock = NULL;
        struct ldlm_lock *new = req;
        struct ldlm_lock *new2 = NULL;
        ldlm_mode_t mode = req->l_req_mode;
        int local = ns_is_client(ns);
        int added = (mode == LCK_NL);
        int overlaps = 0;
        int split = 0;
        const struct ldlm_callback_suite null_cbs = { NULL };
        int rc;
        ENTRY;

        CDEBUG(D_DLMTRACE, "flags %#llx owner "LPU64" pid %u mode %u start "
               LPU64" end "LPU64"\n", *flags,
               new->l_policy_data.l_flock.owner,
               new->l_policy_data.l_flock.pid, mode,
               req->l_policy_data.l_flock.start,
               req->l_policy_data.l_flock.end);

        *err = ELDLM_OK;

        if (local) {
                /* No blocking ASTs are sent to the clients for
                 * Posix file & record locks */
                req->l_blocking_ast = NULL;
        } else {
                /* Called on the server for lock cancels. */
                req->l_blocking_ast = ldlm_flock_blocking_ast;
        }

reprocess:
        if ((*flags == LDLM_FL_WAIT_NOREPROC) || (mode == LCK_NL)) {
                /* This loop determines where this process's locks start
                 * in the resource's lr_granted list. */
                cfs_list_for_each(tmp, &res->lr_granted) {
                        lock = cfs_list_entry(tmp, struct ldlm_lock,
                                              l_res_link);
                        if (ldlm_same_flock_owner(lock, req)) {
                                ownlocks = tmp;
                                break;
                        }
                }
        } else {
                lockmode_verify(mode);

                /* This loop determines if there are existing locks
                 * that conflict with the new lock request. */
                cfs_list_for_each(tmp, &res->lr_granted) {
                        lock = cfs_list_entry(tmp, struct ldlm_lock,
                                              l_res_link);

                        if (ldlm_same_flock_owner(lock, req)) {
                                if (!ownlocks)
                                        ownlocks = tmp;
                                continue;
                        }

                        /* locks are compatible, overlap doesn't matter */
                        if (lockmode_compat(lock->l_granted_mode, mode))
                                continue;

                        if (!ldlm_flocks_overlap(lock, req))
                                continue;

                        if (!first_enq)
                                RETURN(LDLM_ITER_CONTINUE);

                        if (*flags & LDLM_FL_BLOCK_NOWAIT) {
                                ldlm_flock_destroy(req, mode, *flags);
                                *err = -EAGAIN;
                                RETURN(LDLM_ITER_STOP);
                        }

                        if (*flags & LDLM_FL_TEST_LOCK) {
                                ldlm_flock_destroy(req, mode, *flags);
                                req->l_req_mode = lock->l_granted_mode;
                                req->l_policy_data.l_flock.pid =
                                        lock->l_policy_data.l_flock.pid;
                                req->l_policy_data.l_flock.start =
                                        lock->l_policy_data.l_flock.start;
                                req->l_policy_data.l_flock.end =
                                        lock->l_policy_data.l_flock.end;
                                *flags |= LDLM_FL_LOCK_CHANGED;
                                RETURN(LDLM_ITER_STOP);
                        }

                        if (ldlm_flock_deadlock(req, lock)) {
                                ldlm_flock_destroy(req, mode, *flags);
                                *err = -EDEADLK;
                                RETURN(LDLM_ITER_STOP);
                        }

                        rc = ldlm_flock_blocking_link(req, lock);
                        if (rc) {
                                ldlm_flock_destroy(req, mode, *flags);
                                *err = rc;
                                RETURN(LDLM_ITER_STOP);
                        }
                        ldlm_resource_add_lock(res, &res->lr_waiting, req);
                        *flags |= LDLM_FL_BLOCK_GRANTED;
                        RETURN(LDLM_ITER_STOP);
                }
        }

        if (*flags & LDLM_FL_TEST_LOCK) {
                ldlm_flock_destroy(req, mode, *flags);
                req->l_req_mode = LCK_NL;
                *flags |= LDLM_FL_LOCK_CHANGED;
                RETURN(LDLM_ITER_STOP);
        }

        /* In case we had slept on this lock request, take it off the
         * deadlock detection hash list. */
        ldlm_flock_blocking_unlink(req);

        /* Scan the locks owned by this process that overlap this request.
         * We may have to merge or split existing locks. */

        if (!ownlocks)
                ownlocks = &res->lr_granted;

        list_for_remaining_safe(ownlocks, tmp, &res->lr_granted) {
                lock = cfs_list_entry(ownlocks, struct ldlm_lock, l_res_link);

                if (!ldlm_same_flock_owner(lock, new))
                        break;

                if (lock->l_granted_mode == mode) {
                        /* If the modes are the same then we need to process
                         * locks that overlap OR adjoin the new lock. The extra
                         * logic condition is necessary to deal with arithmetic
                         * overflow and underflow. */
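                        /* Worked example (illustrative, not from the
                         * original source): an existing PW lock covering
                         * [0, 99] and a new PW request for [100, 199]
                         * adjoin, since 100 == 99 + 1, so the two merge
                         * into a single lock covering [0, 199]. The
                         * "end != OBD_OBJECT_EOF" and "start != 0" guards
                         * keep "end + 1" and "start - 1" from wrapping at
                         * the __u64 boundaries. */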
                        if ((new->l_policy_data.l_flock.start >
                             (lock->l_policy_data.l_flock.end + 1))
                            && (lock->l_policy_data.l_flock.end !=
                                OBD_OBJECT_EOF))
                                continue;

                        if ((new->l_policy_data.l_flock.end <
                             (lock->l_policy_data.l_flock.start - 1))
                            && (lock->l_policy_data.l_flock.start != 0))
                                break;

                        if (new->l_policy_data.l_flock.start <
                            lock->l_policy_data.l_flock.start) {
                                lock->l_policy_data.l_flock.start =
                                        new->l_policy_data.l_flock.start;
                        } else {
                                new->l_policy_data.l_flock.start =
                                        lock->l_policy_data.l_flock.start;
                        }

                        if (new->l_policy_data.l_flock.end >
                            lock->l_policy_data.l_flock.end) {
                                lock->l_policy_data.l_flock.end =
                                        new->l_policy_data.l_flock.end;
                        } else {
                                new->l_policy_data.l_flock.end =
                                        lock->l_policy_data.l_flock.end;
                        }

                        if (added) {
                                ldlm_flock_destroy(lock, mode, *flags);
                        } else {
                                new = lock;
                                added = 1;
                        }
                        continue;
                }

                if (new->l_policy_data.l_flock.start >
                    lock->l_policy_data.l_flock.end)
                        continue;

                if (new->l_policy_data.l_flock.end <
                    lock->l_policy_data.l_flock.start)
                        break;

                ++overlaps;

                if (new->l_policy_data.l_flock.start <=
                    lock->l_policy_data.l_flock.start) {
                        if (new->l_policy_data.l_flock.end <
                            lock->l_policy_data.l_flock.end) {
                                lock->l_policy_data.l_flock.start =
                                        new->l_policy_data.l_flock.end + 1;
                                break;
                        }
                        ldlm_flock_destroy(lock, lock->l_req_mode, *flags);
                        continue;
                }
                if (new->l_policy_data.l_flock.end >=
                    lock->l_policy_data.l_flock.end) {
                        lock->l_policy_data.l_flock.end =
                                new->l_policy_data.l_flock.start - 1;
                        continue;
                }

                /* split the existing lock into two locks */

                /* If this is an F_UNLCK operation then we could avoid
                 * allocating a new lock and use the req lock passed in
                 * with the request, but this would complicate the reply
                 * processing since updates to req get reflected in the
                 * reply. The client side replays the lock request, so
                 * it must see the original lock data in the reply. */
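                /* Worked example (illustrative, not from the original
                 * source): unlocking [40, 59] out of a granted lock
                 * covering [0, 99] strictly splits it; the existing lock
                 * is trimmed below to [60, 99], and new2 is created to
                 * cover the leading remainder [0, 39]. */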

                /* XXX - if ldlm_lock_new() can sleep we should
                 * release the lr_lock, allocate the new lock,
                 * and restart processing this lock. */
                if (!new2) {
                        unlock_res_and_lock(req);
                        new2 = ldlm_lock_create(ns, &res->lr_name, LDLM_FLOCK,
                                                lock->l_granted_mode, &null_cbs,
                                                NULL, 0);
                        lock_res_and_lock(req);
                        if (!new2) {
                                ldlm_flock_destroy(req, lock->l_granted_mode,
                                                   *flags);
                                *err = -ENOLCK;
                                RETURN(LDLM_ITER_STOP);
                        }
                        goto reprocess;
                }

                split = 1;

                new2->l_granted_mode = lock->l_granted_mode;
                new2->l_policy_data.l_flock.pid =
                        new->l_policy_data.l_flock.pid;
                new2->l_policy_data.l_flock.owner =
                        new->l_policy_data.l_flock.owner;
                new2->l_policy_data.l_flock.start =
                        lock->l_policy_data.l_flock.start;
                new2->l_policy_data.l_flock.end =
                        new->l_policy_data.l_flock.start - 1;
                lock->l_policy_data.l_flock.start =
                        new->l_policy_data.l_flock.end + 1;
                new2->l_conn_export = lock->l_conn_export;
                if (lock->l_export != NULL) {
                        new2->l_export = class_export_lock_get(lock->l_export,
                                                               new2);
                        if (new2->l_export->exp_lock_hash &&
                            cfs_hlist_unhashed(&new2->l_exp_hash))
                                cfs_hash_add(new2->l_export->exp_lock_hash,
                                             &new2->l_remote_handle,
                                             &new2->l_exp_hash);
                }
                if (*flags == LDLM_FL_WAIT_NOREPROC)
                        ldlm_lock_addref_internal_nolock(new2,
                                                         lock->l_granted_mode);

                /* insert new2 at lock */
                ldlm_resource_add_lock(res, ownlocks, new2);
                LDLM_LOCK_RELEASE(new2);
                break;
        }

        /* if new2 was created but never used, destroy it */
        if (split == 0 && new2 != NULL)
                ldlm_lock_destroy_nolock(new2);

        /* At this point we're granting the lock request. */
        req->l_granted_mode = req->l_req_mode;

        /* Add req to the granted queue before calling ldlm_reprocess_all(). */
        if (!added) {
                cfs_list_del_init(&req->l_res_link);
                /* insert new lock before ownlocks in list. */
                ldlm_resource_add_lock(res, ownlocks, req);
        }

        if (*flags != LDLM_FL_WAIT_NOREPROC) {
#ifdef HAVE_SERVER_SUPPORT
                if (first_enq) {
                        /* If this is an unlock, reprocess the waitq and
                         * send completion ASTs for locks that can now be
                         * granted. The only problem with doing this
                         * reprocessing here is that the completion ASTs for
                         * newly granted locks will be sent before the unlock
                         * completion is sent. It shouldn't be an issue. Also
                         * note that ldlm_process_flock_lock() will recurse,
                         * but only once because first_enq will be false from
                         * ldlm_reprocess_queue. */
                        if ((mode == LCK_NL) && overlaps) {
                                CFS_LIST_HEAD(rpc_list);
                                int rc;
restart:
                                ldlm_reprocess_queue(res, &res->lr_waiting,
                                                     &rpc_list);

                                unlock_res_and_lock(req);
                                rc = ldlm_run_ast_work(ns, &rpc_list,
                                                       LDLM_WORK_CP_AST);
                                lock_res_and_lock(req);
                                if (rc == -ERESTART)
                                        GOTO(restart, -ERESTART);
                        }
                } else {
                        LASSERT(req->l_completion_ast);
                        ldlm_add_ast_work_item(req, NULL, work_list);
                }
#else /* !HAVE_SERVER_SUPPORT */
                /* The only possible case for a client-side call into the
                 * flock policy function is ldlm_flock_completion_ast(),
                 * which always carries the LDLM_FL_WAIT_NOREPROC flag. */
                CERROR("Illegal parameter for client-side-only module.\n");
                LBUG();
#endif /* HAVE_SERVER_SUPPORT */
        }

        /* In case we're reprocessing the requested lock we can't destroy
         * it until after calling ldlm_add_ast_work_item() above, so that
         * it can bump the reference count on req. Otherwise req could be
         * freed before the completion AST can be sent. */
        if (added)
                ldlm_flock_destroy(req, mode, *flags);

        ldlm_resource_dump(D_INFO, res);
        RETURN(LDLM_ITER_CONTINUE);
}

struct ldlm_flock_wait_data {
        struct ldlm_lock *fwd_lock;
        int               fwd_generation;
};

static void
ldlm_flock_interrupted_wait(void *data)
{
        struct ldlm_lock *lock;
        ENTRY;

        lock = ((struct ldlm_flock_wait_data *)data)->fwd_lock;

        /* take lock off the deadlock detection hash list. */
        lock_res_and_lock(lock);
        ldlm_flock_blocking_unlink(lock);

        /* client side - set flag to prevent lock from being put on lru list */
        lock->l_flags |= LDLM_FL_CBPENDING;
        unlock_res_and_lock(lock);

        EXIT;
}

/**
 * Flock completion callback function.
 *
 * \param lock [in,out]: A lock to be handled
 * \param flags    [in]: LDLM flags returned with the lock (e.g. the
 *                       LDLM_FL_BLOCK_* state)
 * \param *data    [in]: ldlm_work_cp_ast_lock() will use ldlm_cb_set_arg
 *
 * \retval 0    : success
 * \retval <0   : failure
 */
int
ldlm_flock_completion_ast(struct ldlm_lock *lock, __u64 flags, void *data)
{
        cfs_flock_t                    *getlk = lock->l_ast_data;
        struct obd_device              *obd;
        struct obd_import              *imp = NULL;
        struct ldlm_flock_wait_data     fwd;
        struct l_wait_info              lwi;
        ldlm_error_t                    err;
        int                             rc = 0;
        ENTRY;

        CDEBUG(D_DLMTRACE, "flags: 0x%llx data: %p getlk: %p\n",
               flags, data, getlk);

        /* Import invalidation. We need to actually release the lock
         * references being held, so that the lock can go away. There is
         * no point in holding the lock even if the app still believes it
         * has it, since the server already dropped it. This only applies
         * to granted locks. */
        if ((lock->l_flags & (LDLM_FL_FAILED|LDLM_FL_LOCAL_ONLY)) ==
            (LDLM_FL_FAILED|LDLM_FL_LOCAL_ONLY)) {
                if (lock->l_req_mode == lock->l_granted_mode &&
                    lock->l_granted_mode != LCK_NL &&
                    NULL == data)
                        ldlm_lock_decref_internal(lock, lock->l_req_mode);

                /* Need to wake up the waiter if we were evicted */
                cfs_waitq_signal(&lock->l_waitq);
                RETURN(0);
        }

        LASSERT(flags != LDLM_FL_WAIT_NOREPROC);

        if (!(flags & (LDLM_FL_BLOCK_WAIT | LDLM_FL_BLOCK_GRANTED |
                       LDLM_FL_BLOCK_CONV))) {
                if (NULL == data)
                        /* mds granted the lock in the reply */
                        goto granted;
                /* CP AST RPC: the lock got granted, wake it up */
                cfs_waitq_signal(&lock->l_waitq);
                RETURN(0);
        }

        LDLM_DEBUG(lock, "client-side enqueue returned a blocked lock, "
                   "sleeping");
        fwd.fwd_lock = lock;
        obd = class_exp2obd(lock->l_conn_export);

        /* if this is a local lock, there is no import */
        if (NULL != obd)
                imp = obd->u.cli.cl_import;

        if (NULL != imp) {
                cfs_spin_lock(&imp->imp_lock);
                fwd.fwd_generation = imp->imp_generation;
                cfs_spin_unlock(&imp->imp_lock);
        }

        lwi = LWI_TIMEOUT_INTR(0, NULL, ldlm_flock_interrupted_wait, &fwd);

        /* Go to sleep until the lock is granted. */
        rc = l_wait_event(lock->l_waitq, is_granted_or_cancelled(lock), &lwi);

        if (rc) {
                LDLM_DEBUG(lock, "client-side enqueue waking up: failed (%d)",
                           rc);
                RETURN(rc);
        }

granted:
        OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_CP_CB_WAIT, 10);

        if (lock->l_destroyed) {
                LDLM_DEBUG(lock, "client-side enqueue waking up: destroyed");
                RETURN(0);
        }

        if (lock->l_flags & LDLM_FL_FAILED) {
                LDLM_DEBUG(lock, "client-side enqueue waking up: failed");
                RETURN(-EIO);
        }

        if (rc) {
                LDLM_DEBUG(lock, "client-side enqueue waking up: failed (%d)",
                           rc);
                RETURN(rc);
        }

        LDLM_DEBUG(lock, "client-side enqueue granted");

        lock_res_and_lock(lock);

        /* take lock off the deadlock detection hash list. */
        ldlm_flock_blocking_unlink(lock);

        /* ldlm_lock_enqueue() has already placed lock on the granted list. */
        cfs_list_del_init(&lock->l_res_link);

        if (flags & LDLM_FL_TEST_LOCK) {
                /* fcntl(F_GETLK) request */
                /* The old mode was saved in getlk->fl_type so that if the
                 * mode in the lock changes we can decref the appropriate
                 * refcount. */
                ldlm_flock_destroy(lock, cfs_flock_type(getlk),
                                   LDLM_FL_WAIT_NOREPROC);
                switch (lock->l_granted_mode) {
                case LCK_PR:
                        cfs_flock_set_type(getlk, F_RDLCK);
                        break;
                case LCK_PW:
                        cfs_flock_set_type(getlk, F_WRLCK);
                        break;
                default:
                        cfs_flock_set_type(getlk, F_UNLCK);
                }
                cfs_flock_set_pid(getlk,
                                  (pid_t)lock->l_policy_data.l_flock.pid);
                cfs_flock_set_start(getlk,
                                    (loff_t)lock->l_policy_data.l_flock.start);
                cfs_flock_set_end(getlk,
                                  (loff_t)lock->l_policy_data.l_flock.end);
        } else {
                __u64 noreproc = LDLM_FL_WAIT_NOREPROC;

                /* We need to reprocess the lock to do merges or splits
                 * with existing locks owned by this process. */
                ldlm_process_flock_lock(lock, &noreproc, 1, &err, NULL);
        }
        unlock_res_and_lock(lock);
        RETURN(0);
}
EXPORT_SYMBOL(ldlm_flock_completion_ast);

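/*
 * Blocking callback for flock locks (summary added for clarity): clients
 * never receive blocking ASTs for POSIX file and record locks, so this is
 * only invoked on the server at cancel time (flag == LDLM_CB_CANCELING,
 * asserted below), where it removes the lock from the deadlock detection
 * hash.
 */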
int ldlm_flock_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
                            void *data, int flag)
{
        ENTRY;

        LASSERT(lock);
        LASSERT(flag == LDLM_CB_CANCELING);

        /* take lock off the deadlock detection hash list. */
        lock_res_and_lock(lock);
        ldlm_flock_blocking_unlink(lock);
        unlock_res_and_lock(lock);
        RETURN(0);
}

void ldlm_flock_policy_wire18_to_local(const ldlm_wire_policy_data_t *wpolicy,
                                       ldlm_policy_data_t *lpolicy)
{
        memset(lpolicy, 0, sizeof(*lpolicy));
        lpolicy->l_flock.start = wpolicy->l_flock.lfw_start;
        lpolicy->l_flock.end = wpolicy->l_flock.lfw_end;
        lpolicy->l_flock.pid = wpolicy->l_flock.lfw_pid;
        /* Compat code: old clients had no idea about the owner field and
         * relied solely on the pid for ownership. The owner field was
         * introduced in LU-104, Lustre 2.1, April 2011. */
        lpolicy->l_flock.owner = wpolicy->l_flock.lfw_pid;
}

void ldlm_flock_policy_wire21_to_local(const ldlm_wire_policy_data_t *wpolicy,
                                       ldlm_policy_data_t *lpolicy)
{
        memset(lpolicy, 0, sizeof(*lpolicy));
        lpolicy->l_flock.start = wpolicy->l_flock.lfw_start;
        lpolicy->l_flock.end = wpolicy->l_flock.lfw_end;
        lpolicy->l_flock.pid = wpolicy->l_flock.lfw_pid;
        lpolicy->l_flock.owner = wpolicy->l_flock.lfw_owner;
}

void ldlm_flock_policy_local_to_wire(const ldlm_policy_data_t *lpolicy,
                                     ldlm_wire_policy_data_t *wpolicy)
{
        memset(wpolicy, 0, sizeof(*wpolicy));
        wpolicy->l_flock.lfw_start = lpolicy->l_flock.start;
        wpolicy->l_flock.lfw_end = lpolicy->l_flock.end;
        wpolicy->l_flock.lfw_pid = lpolicy->l_flock.pid;
        wpolicy->l_flock.lfw_owner = lpolicy->l_flock.owner;
}

/*
 * Export handle<->flock hash operations.
 */
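/*
 * Note (added summary, paraphrasing the code below): exp_flock_hash maps a
 * lock owner to a blocked lock of that owner on this export, and is what
 * ldlm_flock_deadlock() walks. The hs_get/hs_put callbacks pin both the
 * lock and the blocking export; the last put clears the blocking fields.
 */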
static unsigned
ldlm_export_flock_hash(cfs_hash_t *hs, const void *key, unsigned mask)
{
        return cfs_hash_u64_hash(*(__u64 *)key, mask);
}

static void *
ldlm_export_flock_key(cfs_hlist_node_t *hnode)
{
        struct ldlm_lock *lock;

        lock = cfs_hlist_entry(hnode, struct ldlm_lock, l_exp_flock_hash);
        return &lock->l_policy_data.l_flock.owner;
}

static int
ldlm_export_flock_keycmp(const void *key, cfs_hlist_node_t *hnode)
{
        return !memcmp(ldlm_export_flock_key(hnode), key, sizeof(__u64));
}

static void *
ldlm_export_flock_object(cfs_hlist_node_t *hnode)
{
        return cfs_hlist_entry(hnode, struct ldlm_lock, l_exp_flock_hash);
}

static void
ldlm_export_flock_get(cfs_hash_t *hs, cfs_hlist_node_t *hnode)
{
        struct ldlm_lock *lock;
        struct ldlm_flock *flock;

        lock = cfs_hlist_entry(hnode, struct ldlm_lock, l_exp_flock_hash);
        LDLM_LOCK_GET(lock);

        flock = &lock->l_policy_data.l_flock;
        LASSERT(flock->blocking_export != NULL);
        class_export_get(flock->blocking_export);
        flock->blocking_refs++;
}

static void
ldlm_export_flock_put(cfs_hash_t *hs, cfs_hlist_node_t *hnode)
{
        struct ldlm_lock *lock;
        struct ldlm_flock *flock;

        lock = cfs_hlist_entry(hnode, struct ldlm_lock, l_exp_flock_hash);
        LDLM_LOCK_RELEASE(lock);

        flock = &lock->l_policy_data.l_flock;
        LASSERT(flock->blocking_export != NULL);
        class_export_put(flock->blocking_export);
        if (--flock->blocking_refs == 0) {
                flock->blocking_owner = 0;
                flock->blocking_export = NULL;
        }
}

static cfs_hash_ops_t ldlm_export_flock_ops = {
        .hs_hash        = ldlm_export_flock_hash,
        .hs_key         = ldlm_export_flock_key,
        .hs_keycmp      = ldlm_export_flock_keycmp,
        .hs_object      = ldlm_export_flock_object,
        .hs_get         = ldlm_export_flock_get,
        .hs_put         = ldlm_export_flock_put,
        .hs_put_locked  = ldlm_export_flock_put,
};

int ldlm_init_flock_export(struct obd_export *exp)
{
        exp->exp_flock_hash =
                cfs_hash_create(obd_uuid2str(&exp->exp_client_uuid),
                                HASH_EXP_LOCK_CUR_BITS,
                                HASH_EXP_LOCK_MAX_BITS,
                                HASH_EXP_LOCK_BKT_BITS, 0,
                                CFS_HASH_MIN_THETA, CFS_HASH_MAX_THETA,
                                &ldlm_export_flock_ops,
                                CFS_HASH_DEFAULT | CFS_HASH_NBLK_CHANGE);
        if (!exp->exp_flock_hash)
                RETURN(-ENOMEM);

        RETURN(0);
}
EXPORT_SYMBOL(ldlm_init_flock_export);

void ldlm_destroy_flock_export(struct obd_export *exp)
{
        ENTRY;
        if (exp->exp_flock_hash) {
                cfs_hash_putref(exp->exp_flock_hash);
                exp->exp_flock_hash = NULL;
        }
        EXIT;
}
EXPORT_SYMBOL(ldlm_destroy_flock_export);