LU-1522 recovery: rework LU-1166 patch in different way
[fs/lustre-release.git] / lustre/ldlm/ldlm_flock.c
/*
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
 *
 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
 * CA 95054 USA or visit www.sun.com if you need additional information or
 * have any questions.
 *
 * GPL HEADER END
 */
/*
 * Copyright (c) 2003 Hewlett-Packard Development Company LP.
 * Developed under the sponsorship of the US Government under
 * Subcontract No. B514193
 *
 * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
 * Use is subject to license terms.
 *
 * Copyright (c) 2010, 2011, Whamcloud, Inc.
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 * Lustre is a trademark of Sun Microsystems, Inc.
 */

#define DEBUG_SUBSYSTEM S_LDLM

#ifdef __KERNEL__
#include <lustre_dlm.h>
#include <obd_support.h>
#include <obd_class.h>
#include <lustre_lib.h>
#include <libcfs/list.h>
#else
#include <liblustre.h>
#include <obd_class.h>
#endif

#include "ldlm_internal.h"

int ldlm_flock_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
                            void *data, int flag);

/**
 * list_for_remaining_safe - iterate over the remaining entries in a list
 *              and safeguard against removal of a list entry.
 * \param pos   the &struct list_head to use as a loop cursor. pos MUST
 *              have been initialized prior to using it in this macro.
 * \param n     another &struct list_head to use as temporary storage
 * \param head  the head for your list.
 */
#define list_for_remaining_safe(pos, n, head) \
        for (n = pos->next; pos != (head); pos = n, n = pos->next)

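/*
 * Illustrative usage sketch (not from the original source): given a
 * resource "res" locked by the caller, scan its granted list from the
 * first entry while tolerating unlinking of the current entry.  As the
 * docblock above notes, "pos" must be initialized before the macro runs:
 *
 *      cfs_list_t *pos = res->lr_granted.next;
 *      cfs_list_t *n;
 *
 *      list_for_remaining_safe(pos, n, &res->lr_granted) {
 *              struct ldlm_lock *lck =
 *                      cfs_list_entry(pos, struct ldlm_lock, l_res_link);
 *              ... lck may be unlinked here; "n" keeps the scan safe ...
 *      }
 */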
static inline int
ldlm_same_flock_owner(struct ldlm_lock *lock, struct ldlm_lock *new)
{
        return((new->l_policy_data.l_flock.owner ==
                lock->l_policy_data.l_flock.owner) &&
               (new->l_export == lock->l_export));
}

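/*
 * Flock ranges are inclusive on both ends, so two locks overlap
 * whenever neither range ends before the other begins: [0, 4] and
 * [5, 9] do not overlap, while [0, 5] and [5, 9] do.
 */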
static inline int
ldlm_flocks_overlap(struct ldlm_lock *lock, struct ldlm_lock *new)
{
        return((new->l_policy_data.l_flock.start <=
                lock->l_policy_data.l_flock.end) &&
               (new->l_policy_data.l_flock.end >=
                lock->l_policy_data.l_flock.start));
}

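/*
 * Record the owner/export that is blocking "req" and add req to the
 * export's flock hash so that ldlm_flock_deadlock() can later follow
 * the chain of blockers.  Server side only; on the client l_export is
 * NULL and this is a no-op.
 */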
static inline int ldlm_flock_blocking_link(struct ldlm_lock *req,
                                           struct ldlm_lock *lock)
{
        int rc = 0;

        /* For server only */
        if (req->l_export == NULL)
                return 0;

        if (unlikely(req->l_export->exp_flock_hash == NULL)) {
                rc = ldlm_init_flock_export(req->l_export);
                if (rc)
                        goto error;
        }

        LASSERT(cfs_hlist_unhashed(&req->l_exp_flock_hash));

        req->l_policy_data.l_flock.blocking_owner =
                lock->l_policy_data.l_flock.owner;
        req->l_policy_data.l_flock.blocking_export =
                lock->l_export;
        req->l_policy_data.l_flock.blocking_refs = 0;

        cfs_hash_add(req->l_export->exp_flock_hash,
                     &req->l_policy_data.l_flock.owner,
                     &req->l_exp_flock_hash);
error:
        return rc;
}

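/*
 * Remove "req" from the export's flock hash once it is no longer
 * blocked.  The resource lock must be held; server side only.
 */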
static inline void ldlm_flock_blocking_unlink(struct ldlm_lock *req)
{
        /* For server only */
        if (req->l_export == NULL)
                return;

        check_res_locked(req->l_resource);
        if (req->l_export->exp_flock_hash != NULL &&
            !cfs_hlist_unhashed(&req->l_exp_flock_hash))
                cfs_hash_del(req->l_export->exp_flock_hash,
                             &req->l_policy_data.l_flock.owner,
                             &req->l_exp_flock_hash);
}

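/*
 * Remove the lock from its resource list and destroy it.  With
 * LDLM_FL_WAIT_NOREPROC this is a client-side destroy, so the lock is
 * flagged to suppress the CANCEL RPC and its reference is dropped here.
 * Called with the resource lock held.
 */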
static inline void
ldlm_flock_destroy(struct ldlm_lock *lock, ldlm_mode_t mode, int flags)
{
        ENTRY;

        LDLM_DEBUG(lock, "ldlm_flock_destroy(mode: %d, flags: 0x%x)",
                   mode, flags);

        /* Safe to not lock here, since it should be empty anyway */
        LASSERT(cfs_hlist_unhashed(&lock->l_exp_flock_hash));

        cfs_list_del_init(&lock->l_res_link);
        if (flags == LDLM_FL_WAIT_NOREPROC &&
            !(lock->l_flags & LDLM_FL_FAILED)) {
                /* client side - set a flag to prevent sending a CANCEL */
                lock->l_flags |= LDLM_FL_LOCAL_ONLY | LDLM_FL_CBPENDING;

                /* when reaching here, it is under lock_res_and_lock(), so we
                 * must call the nolock version of ldlm_lock_decref_internal */
                ldlm_lock_decref_internal_nolock(lock, mode);
        }

        ldlm_lock_destroy_nolock(lock);
        EXIT;
}

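/*
 * Deadlock detection: starting from the owner that blocks "req", walk
 * the chain "owner A waits on owner B, B waits on C, ..." by looking up
 * each blocker in its export's flock hash.  If the chain leads back to
 * the owner/export of "req", granting it would close a wait cycle, so
 * the caller must fail the request with -EDEADLK.  The walk ends at the
 * first owner that is not itself waiting.  Server side only.
 */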
static int
ldlm_flock_deadlock(struct ldlm_lock *req, struct ldlm_lock *bl_lock)
{
        struct obd_export *req_exp = req->l_export;
        struct obd_export *bl_exp = bl_lock->l_export;
        __u64 req_owner = req->l_policy_data.l_flock.owner;
        __u64 bl_owner = bl_lock->l_policy_data.l_flock.owner;

        /* For server only */
        if (req_exp == NULL)
                return 0;

        class_export_get(bl_exp);
        while (1) {
                struct obd_export *bl_exp_new;
                struct ldlm_lock *lock = NULL;
                struct ldlm_flock *flock;

                if (bl_exp->exp_flock_hash != NULL)
                        lock = cfs_hash_lookup(bl_exp->exp_flock_hash,
                                               &bl_owner);
                if (lock == NULL)
                        break;

                flock = &lock->l_policy_data.l_flock;
                LASSERT(flock->owner == bl_owner);
                bl_owner = flock->blocking_owner;
                bl_exp_new = class_export_get(flock->blocking_export);
                class_export_put(bl_exp);

                cfs_hash_put(bl_exp->exp_flock_hash, &lock->l_exp_flock_hash);
                bl_exp = bl_exp_new;

                if (bl_owner == req_owner && bl_exp == req_exp) {
                        class_export_put(bl_exp);
                        return 1;
                }
        }
        class_export_put(bl_exp);

        return 0;
}

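/**
 * Process a flock request against the resource's granted and waiting
 * lists: check for conflicts and deadlock, enqueue on the waiting list,
 * or merge/split the request against this owner's existing granted
 * locks.
 *
 * \param lock [in,out]  the flock request being processed
 * \param flags [in,out] enqueue flags; LDLM_FL_WAIT_NOREPROC means a
 *                       client-side reprocess, and LDLM_FL_LOCK_CHANGED
 *                       or LDLM_FL_BLOCK_GRANTED may be set on return
 * \param first_enq [in] non-zero on first enqueue, zero when called
 *                       again from ldlm_reprocess_queue()
 * \param err [out]      ELDLM_OK, or -EAGAIN/-EDEADLK/-ENOLCK on error
 * \param work_list [in] list to collect completion AST work items on
 *
 * \retval LDLM_ITER_CONTINUE or LDLM_ITER_STOP
 */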
int
ldlm_process_flock_lock(struct ldlm_lock *req, int *flags, int first_enq,
                        ldlm_error_t *err, cfs_list_t *work_list)
{
        struct ldlm_resource *res = req->l_resource;
        struct ldlm_namespace *ns = ldlm_res_to_ns(res);
        cfs_list_t *tmp;
        cfs_list_t *ownlocks = NULL;
        struct ldlm_lock *lock = NULL;
        struct ldlm_lock *new = req;
        struct ldlm_lock *new2 = NULL;
        ldlm_mode_t mode = req->l_req_mode;
        int local = ns_is_client(ns);
        int added = (mode == LCK_NL);
        int overlaps = 0;
        int splitted = 0;
        const struct ldlm_callback_suite null_cbs = { NULL };
        int rc;
        ENTRY;

        CDEBUG(D_DLMTRACE, "flags %#x owner "LPU64" pid %u mode %u start "LPU64
               " end "LPU64"\n", *flags, new->l_policy_data.l_flock.owner,
               new->l_policy_data.l_flock.pid, mode,
               req->l_policy_data.l_flock.start,
               req->l_policy_data.l_flock.end);

        *err = ELDLM_OK;

        if (local) {
                /* No blocking ASTs are sent to the clients for
                 * POSIX file & record locks */
                req->l_blocking_ast = NULL;
        } else {
                /* Called on the server for lock cancels. */
                req->l_blocking_ast = ldlm_flock_blocking_ast;
        }

reprocess:
        if ((*flags == LDLM_FL_WAIT_NOREPROC) || (mode == LCK_NL)) {
                /* This loop determines where this process's locks start
                 * in the resource lr_granted list. */
                cfs_list_for_each(tmp, &res->lr_granted) {
                        lock = cfs_list_entry(tmp, struct ldlm_lock,
                                              l_res_link);
                        if (ldlm_same_flock_owner(lock, req)) {
                                ownlocks = tmp;
                                break;
                        }
                }
        } else {
                lockmode_verify(mode);

                /* This loop determines if there are existing locks
                 * that conflict with the new lock request. */
                cfs_list_for_each(tmp, &res->lr_granted) {
                        lock = cfs_list_entry(tmp, struct ldlm_lock,
                                              l_res_link);

                        if (ldlm_same_flock_owner(lock, req)) {
                                if (!ownlocks)
                                        ownlocks = tmp;
                                continue;
                        }

                        /* locks are compatible, overlap doesn't matter */
                        if (lockmode_compat(lock->l_granted_mode, mode))
                                continue;

                        if (!ldlm_flocks_overlap(lock, req))
                                continue;

                        if (!first_enq)
                                RETURN(LDLM_ITER_CONTINUE);

                        if (*flags & LDLM_FL_BLOCK_NOWAIT) {
                                ldlm_flock_destroy(req, mode, *flags);
                                *err = -EAGAIN;
                                RETURN(LDLM_ITER_STOP);
                        }

                        if (*flags & LDLM_FL_TEST_LOCK) {
                                ldlm_flock_destroy(req, mode, *flags);
                                req->l_req_mode = lock->l_granted_mode;
                                req->l_policy_data.l_flock.pid =
                                        lock->l_policy_data.l_flock.pid;
                                req->l_policy_data.l_flock.start =
                                        lock->l_policy_data.l_flock.start;
                                req->l_policy_data.l_flock.end =
                                        lock->l_policy_data.l_flock.end;
                                *flags |= LDLM_FL_LOCK_CHANGED;
                                RETURN(LDLM_ITER_STOP);
                        }

                        if (ldlm_flock_deadlock(req, lock)) {
                                ldlm_flock_destroy(req, mode, *flags);
                                *err = -EDEADLK;
                                RETURN(LDLM_ITER_STOP);
                        }

                        rc = ldlm_flock_blocking_link(req, lock);
                        if (rc) {
                                ldlm_flock_destroy(req, mode, *flags);
                                *err = rc;
                                RETURN(LDLM_ITER_STOP);
                        }
                        ldlm_resource_add_lock(res, &res->lr_waiting, req);
                        *flags |= LDLM_FL_BLOCK_GRANTED;
                        RETURN(LDLM_ITER_STOP);
                }
        }

        if (*flags & LDLM_FL_TEST_LOCK) {
                ldlm_flock_destroy(req, mode, *flags);
                req->l_req_mode = LCK_NL;
                *flags |= LDLM_FL_LOCK_CHANGED;
                RETURN(LDLM_ITER_STOP);
        }

        /* In case we had slept on this lock request take it off the
         * deadlock detection hash list. */
        ldlm_flock_blocking_unlink(req);

        /* Scan the locks owned by this process that overlap this request.
         * We may have to merge or split existing locks. */

        if (!ownlocks)
                ownlocks = &res->lr_granted;

        list_for_remaining_safe(ownlocks, tmp, &res->lr_granted) {
                lock = cfs_list_entry(ownlocks, struct ldlm_lock, l_res_link);

                if (!ldlm_same_flock_owner(lock, new))
                        break;

                if (lock->l_granted_mode == mode) {
                        /* If the modes are the same then we need to process
                         * locks that overlap OR adjoin the new lock. The extra
                         * logic condition is necessary to deal with arithmetic
                         * overflow and underflow. */
                        if ((new->l_policy_data.l_flock.start >
                             (lock->l_policy_data.l_flock.end + 1))
                            && (lock->l_policy_data.l_flock.end !=
                                OBD_OBJECT_EOF))
                                continue;

                        if ((new->l_policy_data.l_flock.end <
                             (lock->l_policy_data.l_flock.start - 1))
                            && (lock->l_policy_data.l_flock.start != 0))
                                break;

                        if (new->l_policy_data.l_flock.start <
                            lock->l_policy_data.l_flock.start) {
                                lock->l_policy_data.l_flock.start =
                                        new->l_policy_data.l_flock.start;
                        } else {
                                new->l_policy_data.l_flock.start =
                                        lock->l_policy_data.l_flock.start;
                        }

                        if (new->l_policy_data.l_flock.end >
                            lock->l_policy_data.l_flock.end) {
                                lock->l_policy_data.l_flock.end =
                                        new->l_policy_data.l_flock.end;
                        } else {
                                new->l_policy_data.l_flock.end =
                                        lock->l_policy_data.l_flock.end;
                        }

                        if (added) {
                                ldlm_flock_destroy(lock, mode, *flags);
                        } else {
                                new = lock;
                                added = 1;
                        }
                        continue;
                }

                if (new->l_policy_data.l_flock.start >
                    lock->l_policy_data.l_flock.end)
                        continue;

                if (new->l_policy_data.l_flock.end <
                    lock->l_policy_data.l_flock.start)
                        break;

                ++overlaps;

                if (new->l_policy_data.l_flock.start <=
                    lock->l_policy_data.l_flock.start) {
                        if (new->l_policy_data.l_flock.end <
                            lock->l_policy_data.l_flock.end) {
                                lock->l_policy_data.l_flock.start =
                                        new->l_policy_data.l_flock.end + 1;
                                break;
                        }
                        ldlm_flock_destroy(lock, lock->l_req_mode, *flags);
                        continue;
                }
                if (new->l_policy_data.l_flock.end >=
                    lock->l_policy_data.l_flock.end) {
                        lock->l_policy_data.l_flock.end =
                                new->l_policy_data.l_flock.start - 1;
                        continue;
                }

                /* split the existing lock into two locks */

                /* if this is an F_UNLCK operation then we could avoid
                 * allocating a new lock and use the req lock passed in
                 * with the request but this would complicate the reply
                 * processing since updates to req get reflected in the
                 * reply. The client side replays the lock request so
                 * it must see the original lock data in the reply. */

                /* XXX - if ldlm_lock_create() can sleep we should
                 * release the lr_lock, allocate the new lock,
                 * and restart processing this lock. */
                if (!new2) {
                        unlock_res_and_lock(req);
                        new2 = ldlm_lock_create(ns, &res->lr_name, LDLM_FLOCK,
                                                lock->l_granted_mode, &null_cbs,
                                                NULL, 0);
                        lock_res_and_lock(req);
                        if (!new2) {
                                ldlm_flock_destroy(req, lock->l_granted_mode,
                                                   *flags);
                                *err = -ENOLCK;
                                RETURN(LDLM_ITER_STOP);
                        }
                        goto reprocess;
                }

                splitted = 1;

                new2->l_granted_mode = lock->l_granted_mode;
                new2->l_policy_data.l_flock.pid =
                        new->l_policy_data.l_flock.pid;
                new2->l_policy_data.l_flock.owner =
                        new->l_policy_data.l_flock.owner;
                new2->l_policy_data.l_flock.start =
                        lock->l_policy_data.l_flock.start;
                new2->l_policy_data.l_flock.end =
                        new->l_policy_data.l_flock.start - 1;
                lock->l_policy_data.l_flock.start =
                        new->l_policy_data.l_flock.end + 1;
                new2->l_conn_export = lock->l_conn_export;
                if (lock->l_export != NULL) {
                        new2->l_export = class_export_lock_get(lock->l_export,
                                                               new2);
                        if (new2->l_export->exp_lock_hash &&
                            cfs_hlist_unhashed(&new2->l_exp_hash))
                                cfs_hash_add(new2->l_export->exp_lock_hash,
                                             &new2->l_remote_handle,
                                             &new2->l_exp_hash);
                }
                if (*flags == LDLM_FL_WAIT_NOREPROC)
                        ldlm_lock_addref_internal_nolock(new2,
                                                         lock->l_granted_mode);

                /* insert new2 at lock */
                ldlm_resource_add_lock(res, ownlocks, new2);
                LDLM_LOCK_RELEASE(new2);
                break;
        }


        /* if new2 is created but never used, destroy it */
        if (splitted == 0 && new2 != NULL)
                ldlm_lock_destroy_nolock(new2);

        /* At this point we're granting the lock request. */
        req->l_granted_mode = req->l_req_mode;

        /* Add req to the granted queue before calling ldlm_reprocess_all(). */
        if (!added) {
                cfs_list_del_init(&req->l_res_link);
                /* insert new lock before ownlocks in list. */
                ldlm_resource_add_lock(res, ownlocks, req);
        }

        if (*flags != LDLM_FL_WAIT_NOREPROC) {
#ifdef HAVE_SERVER_SUPPORT
                if (first_enq) {
                        /* If this is an unlock, reprocess the waitq and
                         * send completion ASTs for locks that can now be
                         * granted. The only problem with doing this
                         * reprocessing here is that the completion ASTs for
                         * newly granted locks will be sent before the unlock
                         * completion is sent. It shouldn't be an issue. Also
                         * note that ldlm_process_flock_lock() will recurse,
                         * but only once because first_enq will be false from
                         * ldlm_reprocess_queue. */
                        if ((mode == LCK_NL) && overlaps) {
                                CFS_LIST_HEAD(rpc_list);
                                int rc;
restart:
                                ldlm_reprocess_queue(res, &res->lr_waiting,
                                                     &rpc_list);

                                unlock_res_and_lock(req);
                                rc = ldlm_run_ast_work(ns, &rpc_list,
                                                       LDLM_WORK_CP_AST);
                                lock_res_and_lock(req);
                                if (rc == -ERESTART)
                                        GOTO(restart, -ERESTART);
                        }
                } else {
                        LASSERT(req->l_completion_ast);
                        ldlm_add_ast_work_item(req, NULL, work_list);
                }
#else /* !HAVE_SERVER_SUPPORT */
                /* The only possible case for a client-side call into the
                 * flock policy function is ldlm_flock_completion_ast(),
                 * which always passes LDLM_FL_WAIT_NOREPROC. */
                CERROR("Illegal parameter for client-side-only module.\n");
                LBUG();
#endif /* HAVE_SERVER_SUPPORT */
        }

        /* In case we're reprocessing the requested lock we can't destroy
         * it until after calling ldlm_add_ast_work_item() above so that it
         * can bump the reference count on req. Otherwise req could be freed
         * before the completion AST can be sent. */
        if (added)
                ldlm_flock_destroy(req, mode, *flags);

        ldlm_resource_dump(D_INFO, res);
        RETURN(LDLM_ITER_CONTINUE);
}

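/*
 * Per-waiter state for the interruptible sleep in
 * ldlm_flock_completion_ast(): the lock being waited on, plus the
 * import generation recorded when the wait began.
 */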
struct ldlm_flock_wait_data {
        struct ldlm_lock *fwd_lock;
        int               fwd_generation;
};

static void
ldlm_flock_interrupted_wait(void *data)
{
        struct ldlm_lock *lock;
        ENTRY;

        lock = ((struct ldlm_flock_wait_data *)data)->fwd_lock;

        /* take lock off the deadlock detection hash list. */
        lock_res_and_lock(lock);
        ldlm_flock_blocking_unlink(lock);

        /* client side - set flag to prevent lock from being put on lru list */
        lock->l_flags |= LDLM_FL_CBPENDING;
        unlock_res_and_lock(lock);

        EXIT;
}

/**
 * Flock completion callback function.
 *
 * \param lock [in,out]: a lock to be handled
 * \param flags    [in]: LDLM_FL_* flags for the lock
 * \param data     [in]: ldlm_work_cp_ast_lock() will use ldlm_cb_set_arg
 *
 * \retval 0    success
 * \retval <0   failure
 */
int
ldlm_flock_completion_ast(struct ldlm_lock *lock, int flags, void *data)
{
        cfs_flock_t                    *getlk = lock->l_ast_data;
        struct obd_device              *obd;
        struct obd_import              *imp = NULL;
        struct ldlm_flock_wait_data     fwd;
        struct l_wait_info              lwi;
        ldlm_error_t                    err;
        int                             rc = 0;
        ENTRY;

        CDEBUG(D_DLMTRACE, "flags: 0x%x data: %p getlk: %p\n",
               flags, data, getlk);

        /* Import invalidation. We need to actually release the lock
         * references being held, so that it can go away. No point in
         * holding the lock even if the app still believes it has it, since
         * the server already dropped it anyway. This only applies to
         * granted locks. */
        if ((lock->l_flags & (LDLM_FL_FAILED|LDLM_FL_LOCAL_ONLY)) ==
            (LDLM_FL_FAILED|LDLM_FL_LOCAL_ONLY)) {
                if (lock->l_req_mode == lock->l_granted_mode &&
                    lock->l_granted_mode != LCK_NL &&
                    NULL == data)
                        ldlm_lock_decref_internal(lock, lock->l_req_mode);

                /* Need to wake up the waiter if we were evicted */
                cfs_waitq_signal(&lock->l_waitq);
                RETURN(0);
        }

        LASSERT(flags != LDLM_FL_WAIT_NOREPROC);

        if (!(flags & (LDLM_FL_BLOCK_WAIT | LDLM_FL_BLOCK_GRANTED |
                       LDLM_FL_BLOCK_CONV))) {
                if (NULL == data)
                        /* mds granted the lock in the reply */
                        goto granted;
                /* CP AST RPC: lock got granted, wake it up */
                cfs_waitq_signal(&lock->l_waitq);
                RETURN(0);
        }

        LDLM_DEBUG(lock, "client-side enqueue returned a blocked lock, "
                   "sleeping");
        fwd.fwd_lock = lock;
        obd = class_exp2obd(lock->l_conn_export);

        /* if this is a local lock, there is no import */
        if (NULL != obd)
                imp = obd->u.cli.cl_import;

        if (NULL != imp) {
                cfs_spin_lock(&imp->imp_lock);
                fwd.fwd_generation = imp->imp_generation;
                cfs_spin_unlock(&imp->imp_lock);
        }

        lwi = LWI_TIMEOUT_INTR(0, NULL, ldlm_flock_interrupted_wait, &fwd);

        /* Go to sleep until the lock is granted. */
        rc = l_wait_event(lock->l_waitq, is_granted_or_cancelled(lock), &lwi);

        if (rc) {
                LDLM_DEBUG(lock, "client-side enqueue waking up: failed (%d)",
                           rc);
                RETURN(rc);
        }

granted:
        OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_CP_CB_WAIT, 10);

        if (lock->l_destroyed) {
                LDLM_DEBUG(lock, "client-side enqueue waking up: destroyed");
                RETURN(0);
        }

        if (lock->l_flags & LDLM_FL_FAILED) {
                LDLM_DEBUG(lock, "client-side enqueue waking up: failed");
                RETURN(-EIO);
        }

        if (rc) {
                LDLM_DEBUG(lock, "client-side enqueue waking up: failed (%d)",
                           rc);
                RETURN(rc);
        }

        LDLM_DEBUG(lock, "client-side enqueue granted");

        lock_res_and_lock(lock);

        /* take lock off the deadlock detection hash list. */
        ldlm_flock_blocking_unlink(lock);

        /* ldlm_lock_enqueue() has already placed lock on the granted list. */
        cfs_list_del_init(&lock->l_res_link);

        if (flags & LDLM_FL_TEST_LOCK) {
                /* fcntl(F_GETLK) request */
                /* The old mode was saved in getlk->fl_type so that if the
                 * mode in the lock changes we can decref the appropriate
                 * refcount. */
                ldlm_flock_destroy(lock, cfs_flock_type(getlk),
                                   LDLM_FL_WAIT_NOREPROC);
                switch (lock->l_granted_mode) {
                case LCK_PR:
                        cfs_flock_set_type(getlk, F_RDLCK);
                        break;
                case LCK_PW:
                        cfs_flock_set_type(getlk, F_WRLCK);
                        break;
                default:
                        cfs_flock_set_type(getlk, F_UNLCK);
                }
                cfs_flock_set_pid(getlk,
                                  (pid_t)lock->l_policy_data.l_flock.pid);
                cfs_flock_set_start(getlk,
                                    (loff_t)lock->l_policy_data.l_flock.start);
                cfs_flock_set_end(getlk,
                                  (loff_t)lock->l_policy_data.l_flock.end);
        } else {
                int noreproc = LDLM_FL_WAIT_NOREPROC;

                /* We need to reprocess the lock to do merges or splits
                 * with existing locks owned by this process. */
                ldlm_process_flock_lock(lock, &noreproc, 1, &err, NULL);
        }
        unlock_res_and_lock(lock);
        RETURN(0);
}
EXPORT_SYMBOL(ldlm_flock_completion_ast);

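/*
 * Blocking AST for flock locks.  Only invoked at cancel time (the
 * LASSERT below enforces LDLM_CB_CANCELING); its sole job is to remove
 * the lock from the deadlock detection hash.
 */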
int ldlm_flock_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
                            void *data, int flag)
{
        ENTRY;

        LASSERT(lock);
        LASSERT(flag == LDLM_CB_CANCELING);

        /* take lock off the deadlock detection hash list. */
        lock_res_and_lock(lock);
        ldlm_flock_blocking_unlink(lock);
        unlock_res_and_lock(lock);
        RETURN(0);
}

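/*
 * Wire <-> local flock policy converters.  Two wire-to-local variants
 * exist because clients older than 2.1 (pre-LU-104) did not send the
 * lfw_owner field; for them the pid doubles as the owner.
 */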
void ldlm_flock_policy_wire18_to_local(const ldlm_wire_policy_data_t *wpolicy,
                                       ldlm_policy_data_t *lpolicy)
{
        memset(lpolicy, 0, sizeof(*lpolicy));
        lpolicy->l_flock.start = wpolicy->l_flock.lfw_start;
        lpolicy->l_flock.end = wpolicy->l_flock.lfw_end;
        lpolicy->l_flock.pid = wpolicy->l_flock.lfw_pid;
        /* Compat code: old clients had no idea about the owner field and
         * relied solely on the pid for ownership. Introduced in LU-104,
         * 2.1, April 2011. */
        lpolicy->l_flock.owner = wpolicy->l_flock.lfw_pid;
}

void ldlm_flock_policy_wire21_to_local(const ldlm_wire_policy_data_t *wpolicy,
                                       ldlm_policy_data_t *lpolicy)
{
        memset(lpolicy, 0, sizeof(*lpolicy));
        lpolicy->l_flock.start = wpolicy->l_flock.lfw_start;
        lpolicy->l_flock.end = wpolicy->l_flock.lfw_end;
        lpolicy->l_flock.pid = wpolicy->l_flock.lfw_pid;
        lpolicy->l_flock.owner = wpolicy->l_flock.lfw_owner;
}

void ldlm_flock_policy_local_to_wire(const ldlm_policy_data_t *lpolicy,
                                     ldlm_wire_policy_data_t *wpolicy)
{
        memset(wpolicy, 0, sizeof(*wpolicy));
        wpolicy->l_flock.lfw_start = lpolicy->l_flock.start;
        wpolicy->l_flock.lfw_end = lpolicy->l_flock.end;
        wpolicy->l_flock.lfw_pid = lpolicy->l_flock.pid;
        wpolicy->l_flock.lfw_owner = lpolicy->l_flock.owner;
}

/*
 * Export handle<->flock hash operations.
 */
static unsigned
ldlm_export_flock_hash(cfs_hash_t *hs, const void *key, unsigned mask)
{
        return cfs_hash_u64_hash(*(__u64 *)key, mask);
}

static void *
ldlm_export_flock_key(cfs_hlist_node_t *hnode)
{
        struct ldlm_lock *lock;

        lock = cfs_hlist_entry(hnode, struct ldlm_lock, l_exp_flock_hash);
        return &lock->l_policy_data.l_flock.owner;
}

static int
ldlm_export_flock_keycmp(const void *key, cfs_hlist_node_t *hnode)
{
        return !memcmp(ldlm_export_flock_key(hnode), key, sizeof(__u64));
}

static void *
ldlm_export_flock_object(cfs_hlist_node_t *hnode)
{
        return cfs_hlist_entry(hnode, struct ldlm_lock, l_exp_flock_hash);
}

static void
ldlm_export_flock_get(cfs_hash_t *hs, cfs_hlist_node_t *hnode)
{
        struct ldlm_lock *lock;
        struct ldlm_flock *flock;

        lock = cfs_hlist_entry(hnode, struct ldlm_lock, l_exp_flock_hash);
        LDLM_LOCK_GET(lock);

        flock = &lock->l_policy_data.l_flock;
        LASSERT(flock->blocking_export != NULL);
        class_export_get(flock->blocking_export);
        flock->blocking_refs++;
}

static void
ldlm_export_flock_put(cfs_hash_t *hs, cfs_hlist_node_t *hnode)
{
        struct ldlm_lock *lock;
        struct ldlm_flock *flock;

        lock = cfs_hlist_entry(hnode, struct ldlm_lock, l_exp_flock_hash);
        LDLM_LOCK_RELEASE(lock);

        flock = &lock->l_policy_data.l_flock;
        LASSERT(flock->blocking_export != NULL);
        class_export_put(flock->blocking_export);
        if (--flock->blocking_refs == 0) {
                flock->blocking_owner = 0;
                flock->blocking_export = NULL;
        }
}

static cfs_hash_ops_t ldlm_export_flock_ops = {
        .hs_hash        = ldlm_export_flock_hash,
        .hs_key         = ldlm_export_flock_key,
        .hs_keycmp      = ldlm_export_flock_keycmp,
        .hs_object      = ldlm_export_flock_object,
        .hs_get         = ldlm_export_flock_get,
        .hs_put         = ldlm_export_flock_put,
        .hs_put_locked  = ldlm_export_flock_put,
};

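/*
 * Create the per-export hash that maps a flock owner to the lock that
 * owner is currently blocked on.  Instantiated lazily by
 * ldlm_flock_blocking_link() on the first blocked flock request.
 */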
int ldlm_init_flock_export(struct obd_export *exp)
{
        exp->exp_flock_hash =
                cfs_hash_create(obd_uuid2str(&exp->exp_client_uuid),
                                HASH_EXP_LOCK_CUR_BITS,
                                HASH_EXP_LOCK_MAX_BITS,
                                HASH_EXP_LOCK_BKT_BITS, 0,
                                CFS_HASH_MIN_THETA, CFS_HASH_MAX_THETA,
                                &ldlm_export_flock_ops,
                                CFS_HASH_DEFAULT | CFS_HASH_NBLK_CHANGE);
        if (!exp->exp_flock_hash)
                RETURN(-ENOMEM);

        RETURN(0);
}
EXPORT_SYMBOL(ldlm_init_flock_export);

void ldlm_destroy_flock_export(struct obd_export *exp)
{
        ENTRY;
        if (exp->exp_flock_hash) {
                cfs_hash_putref(exp->exp_flock_hash);
                exp->exp_flock_hash = NULL;
        }
        EXIT;
}
EXPORT_SYMBOL(ldlm_destroy_flock_export);