LU-1601 ldlm: Fix flock detection for different mounts
lustre/ldlm/ldlm_flock.c (fs/lustre-release.git)
/*
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
 *
 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
 * CA 95054 USA or visit www.sun.com if you need additional information or
 * have any questions.
 *
 * GPL HEADER END
 */
/*
 * Copyright (c) 2003 Hewlett-Packard Development Company LP.
 * Developed under the sponsorship of the US Government under
 * Subcontract No. B514193
 *
 * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
 * Use is subject to license terms.
 *
 * Copyright (c) 2010, 2012, Intel Corporation.
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 * Lustre is a trademark of Sun Microsystems, Inc.
 */

/**
 * This file implements the POSIX lock type for Lustre.
 * Its policy properties are the start and end of the extent and the PID.
 *
 * These locks are handled only through the MDS because POSIX semantics
 * require, for example, that a lock may be only partially released (and
 * thus split into two locks), and that two adjacent locks from the same
 * process may be merged into a single wider lock.
 *
 * Lock modes are mapped as follows:
 * PR and PW for READ and WRITE locks
 * NL to request the release of a portion of a lock
 *
 * These flock locks never time out.
 */
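/*
 * Illustrative sketch only (not part of this file): how an application
 * request maps onto the policy described above.  The values below are
 * assumptions made for the example, not anything dictated by this code.
 *
 *     struct flock fl = {
 *             .l_type   = F_WRLCK,    // becomes an LCK_PW request
 *             .l_whence = SEEK_SET,
 *             .l_start  = 0,          // l_flock.start = 0
 *             .l_len    = 100,        // l_flock.end = 99 (inclusive)
 *     };
 *     fcntl(fd, F_SETLKW, &fl);       // enqueued on the MDS as an LDLM_FLOCK lock
 *
 * A later F_UNLCK over part of that range arrives as an LCK_NL request and
 * may split the granted lock in two; see ldlm_process_flock_lock() below.
 */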

#define DEBUG_SUBSYSTEM S_LDLM

#ifdef __KERNEL__
#include <lustre_dlm.h>
#include <obd_support.h>
#include <obd_class.h>
#include <lustre_lib.h>
#include <libcfs/list.h>
#else
#include <liblustre.h>
#include <obd_class.h>
#endif

#include "ldlm_internal.h"

int ldlm_flock_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
                            void *data, int flag);

/**
 * list_for_remaining_safe - iterate over the remaining entries in a list
 *              and safeguard against removal of a list entry.
 * \param pos   the &struct list_head to use as a loop counter. pos MUST
 *              have been initialized prior to using it in this macro.
 * \param n     another &struct list_head to use as temporary storage
 * \param head  the head for your list.
 */
#define list_for_remaining_safe(pos, n, head) \
        for (n = pos->next; pos != (head); pos = n, n = pos->next)
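/* Note: unlike list_for_each_safe(), iteration starts from the current
 * value of \a pos rather than from \a head; the grant-queue scans below
 * position \a pos at the first lock belonging to the owner of interest. */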

static inline int
ldlm_same_flock_owner(struct ldlm_lock *lock, struct ldlm_lock *new)
{
        return((new->l_policy_data.l_flock.owner ==
                lock->l_policy_data.l_flock.owner) &&
               (new->l_export == lock->l_export));
}

static inline int
ldlm_flocks_overlap(struct ldlm_lock *lock, struct ldlm_lock *new)
{
        return((new->l_policy_data.l_flock.start <=
                lock->l_policy_data.l_flock.end) &&
               (new->l_policy_data.l_flock.end >=
                lock->l_policy_data.l_flock.start));
}
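/* The extents are inclusive, so for example [0, 4] and [4, 9] overlap,
 * while [0, 4] and [5, 9] do not (they merely adjoin, which matters for
 * merging in ldlm_process_flock_lock() but not for conflict checks). */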

static inline void ldlm_flock_blocking_link(struct ldlm_lock *req,
                                            struct ldlm_lock *lock)
{
        /* For server only */
        if (req->l_export == NULL)
                return;

        LASSERT(cfs_hlist_unhashed(&req->l_exp_flock_hash));

        req->l_policy_data.l_flock.blocking_owner =
                lock->l_policy_data.l_flock.owner;
        req->l_policy_data.l_flock.blocking_export =
                lock->l_export;
        req->l_policy_data.l_flock.blocking_refs = 0;

        cfs_hash_add(req->l_export->exp_flock_hash,
                     &req->l_policy_data.l_flock.owner,
                     &req->l_exp_flock_hash);
}

static inline void ldlm_flock_blocking_unlink(struct ldlm_lock *req)
{
        /* For server only */
        if (req->l_export == NULL)
                return;

        check_res_locked(req->l_resource);
        if (req->l_export->exp_flock_hash != NULL &&
            !cfs_hlist_unhashed(&req->l_exp_flock_hash))
                cfs_hash_del(req->l_export->exp_flock_hash,
                             &req->l_policy_data.l_flock.owner,
                             &req->l_exp_flock_hash);
}

static inline void
ldlm_flock_destroy(struct ldlm_lock *lock, ldlm_mode_t mode, __u64 flags)
{
        ENTRY;

        LDLM_DEBUG(lock, "ldlm_flock_destroy(mode: %d, flags: 0x%llx)",
                   mode, flags);

        /* Safe to not lock here, since it should be empty anyway */
        LASSERT(cfs_hlist_unhashed(&lock->l_exp_flock_hash));

        cfs_list_del_init(&lock->l_res_link);
        if (flags == LDLM_FL_WAIT_NOREPROC &&
            !(lock->l_flags & LDLM_FL_FAILED)) {
                /* client side - set a flag to prevent sending a CANCEL */
                lock->l_flags |= LDLM_FL_LOCAL_ONLY | LDLM_FL_CBPENDING;

                /* when reaching here, it is under lock_res_and_lock(), so we
                 * need to call the nolock version of ldlm_lock_decref_internal() */
                ldlm_lock_decref_internal_nolock(lock, mode);
        }

        ldlm_lock_destroy_nolock(lock);
        EXIT;
}

/**
 * POSIX locks deadlock detection code.
 *
 * Given a new lock \a req and an existing lock \a bl_lock it conflicts
 * with, we need to iterate through all blocked POSIX locks for this
 * export and see whether a deadlock condition arises (i.e. one client
 * holds a lock on something and wants a lock on something else, while
 * at the same time another client is in the opposite situation).
 */
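/*
 * Example (illustration only): owner A holds a lock on extent X and is
 * blocked waiting for extent Y, while owner B holds Y and is blocked
 * waiting for X.  Starting from the lock that blocks the new request,
 * ldlm_flock_deadlock() below follows the blocking_owner/blocking_export
 * chain; if the chain leads back to the requesting owner on the same
 * client NID, a deadlock is reported.
 */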

struct ldlm_flock_lookup_cb_data {
        __u64 *bl_owner;
        struct ldlm_lock *lock;
        struct obd_export *exp;
};

static int ldlm_flock_lookup_cb(cfs_hash_t *hs, cfs_hash_bd_t *bd,
                                cfs_hlist_node_t *hnode, void *data)
{
        struct ldlm_flock_lookup_cb_data *cb_data = data;
        struct obd_export *exp = cfs_hash_object(hs, hnode);
        struct ldlm_lock *lock;

        lock = cfs_hash_lookup(exp->exp_flock_hash, cb_data->bl_owner);
        if (lock == NULL)
                return 0;

        /* Stop on first found lock. Same process can't sleep twice */
        cb_data->lock = lock;
        cb_data->exp = class_export_get(exp);

        return 1;
}

static int
ldlm_flock_deadlock(struct ldlm_lock *req, struct ldlm_lock *bl_lock)
{
        struct obd_export *req_exp = req->l_export;
        struct obd_export *bl_exp = bl_lock->l_export;
        __u64 req_owner = req->l_policy_data.l_flock.owner;
        __u64 bl_owner = bl_lock->l_policy_data.l_flock.owner;

        /* For server only */
        if (req_exp == NULL)
                return 0;

        class_export_get(bl_exp);
        while (1) {
                struct ldlm_flock_lookup_cb_data cb_data = {
                                        .bl_owner = &bl_owner,
                                        .lock = NULL,
                                        .exp = NULL };
                struct obd_export *bl_exp_new;
                struct ldlm_lock *lock = NULL;
                struct ldlm_flock *flock;

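                /* LU-1601: search every export that shares the blocker's
                 * client NID, so that locks taken through different mounts
                 * on the same node are still visible to deadlock detection. */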
                if (bl_exp->exp_flock_hash != NULL) {
                        cfs_hash_for_each_key(bl_exp->exp_obd->obd_nid_hash,
                                &bl_exp->exp_connection->c_peer.nid,
                                ldlm_flock_lookup_cb, &cb_data);
                        lock = cb_data.lock;
                }
                if (lock == NULL)
                        break;

                class_export_put(bl_exp);
                bl_exp = cb_data.exp;

                LASSERT(req != lock);
                flock = &lock->l_policy_data.l_flock;
                LASSERT(flock->owner == bl_owner);
                bl_owner = flock->blocking_owner;
                bl_exp_new = class_export_get(flock->blocking_export);
                class_export_put(bl_exp);

                cfs_hash_put(bl_exp->exp_flock_hash, &lock->l_exp_flock_hash);
                bl_exp = bl_exp_new;

                if (bl_exp->exp_failed)
                        break;

                if (bl_owner == req_owner &&
                    (bl_exp->exp_connection->c_peer.nid ==
                     req_exp->exp_connection->c_peer.nid)) {
                        class_export_put(bl_exp);
                        return 1;
                }
        }
        class_export_put(bl_exp);

        return 0;
}

static void ldlm_flock_cancel_on_deadlock(struct ldlm_lock *lock,
                                          cfs_list_t *work_list)
{
        CDEBUG(D_INFO, "reprocess deadlock req=%p\n", lock);

        if ((exp_connect_flags(lock->l_export) &
                                OBD_CONNECT_FLOCK_DEAD) == 0) {
                CERROR("deadlock found, but client doesn't "
                       "support flock cancellation\n");
        } else {
                LASSERT(lock->l_completion_ast);
                LASSERT((lock->l_flags & LDLM_FL_AST_SENT) == 0);
                lock->l_flags |= LDLM_FL_AST_SENT | LDLM_FL_CANCEL_ON_BLOCK |
                        LDLM_FL_FLOCK_DEADLOCK;
                ldlm_flock_blocking_unlink(lock);
                ldlm_resource_unlink_lock(lock);
                ldlm_add_ast_work_item(lock, NULL, work_list);
        }
}

/**
 * Process a granting attempt for a flock lock.
 * Must be called with the ns lock held.
 *
 * This function looks for any conflicts for \a lock in the granted or
 * waiting queues. The lock is granted if no conflicts are found in
 * either queue.
 *
 * It is also responsible for splitting a lock if a portion of the lock
 * is released.
 *
 * If \a first_enq is 0 (i.e. called from ldlm_reprocess_queue):
 *   - blocking ASTs have already been sent
 *
 * If \a first_enq is 1 (i.e. called from ldlm_lock_enqueue):
 *   - blocking ASTs have not been sent yet, so the list of conflicting
 *     locks is collected and ASTs are sent.
 */
int
ldlm_process_flock_lock(struct ldlm_lock *req, __u64 *flags, int first_enq,
                        ldlm_error_t *err, cfs_list_t *work_list)
{
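        /*
         * Worked example (illustration only): a process holds a PW lock on
         * [0, 99] and unlocks [40, 59], which arrives as an LCK_NL request.
         * The merge/split scan below trims the existing lock to [60, 99] and
         * allocates a second lock ("new2") covering [0, 39], leaving two
         * granted PW locks.
         */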
        struct ldlm_resource *res = req->l_resource;
        struct ldlm_namespace *ns = ldlm_res_to_ns(res);
        cfs_list_t *tmp;
        cfs_list_t *ownlocks = NULL;
        struct ldlm_lock *lock = NULL;
        struct ldlm_lock *new = req;
        struct ldlm_lock *new2 = NULL;
        ldlm_mode_t mode = req->l_req_mode;
        int local = ns_is_client(ns);
        int added = (mode == LCK_NL);
        int overlaps = 0;
        int splitted = 0;
        const struct ldlm_callback_suite null_cbs = { NULL };
        ENTRY;

        CDEBUG(D_DLMTRACE, "flags %#llx owner "LPU64" pid %u mode %u start "
               LPU64" end "LPU64"\n", *flags,
               new->l_policy_data.l_flock.owner,
               new->l_policy_data.l_flock.pid, mode,
               req->l_policy_data.l_flock.start,
               req->l_policy_data.l_flock.end);

        *err = ELDLM_OK;

        if (local) {
                /* No blocking ASTs are sent to the clients for
                 * POSIX file & record locks */
                req->l_blocking_ast = NULL;
        } else {
                /* Called on the server for lock cancels. */
                req->l_blocking_ast = ldlm_flock_blocking_ast;
        }

reprocess:
        if ((*flags == LDLM_FL_WAIT_NOREPROC) || (mode == LCK_NL)) {
                /* This loop determines where this process's locks start
                 * in the resource lr_granted list. */
                cfs_list_for_each(tmp, &res->lr_granted) {
                        lock = cfs_list_entry(tmp, struct ldlm_lock,
                                              l_res_link);
                        if (ldlm_same_flock_owner(lock, req)) {
                                ownlocks = tmp;
                                break;
                        }
                }
        } else {
                int reprocess_failed = 0;
                lockmode_verify(mode);

                /* This loop determines if there are existing locks
                 * that conflict with the new lock request. */
                cfs_list_for_each(tmp, &res->lr_granted) {
                        lock = cfs_list_entry(tmp, struct ldlm_lock,
                                              l_res_link);

                        if (ldlm_same_flock_owner(lock, req)) {
                                if (!ownlocks)
                                        ownlocks = tmp;
                                continue;
                        }

                        /* locks are compatible, overlap doesn't matter */
                        if (lockmode_compat(lock->l_granted_mode, mode))
                                continue;

                        if (!ldlm_flocks_overlap(lock, req))
                                continue;

                        if (!first_enq) {
                                reprocess_failed = 1;
                                if (ldlm_flock_deadlock(req, lock)) {
                                        ldlm_flock_cancel_on_deadlock(req,
                                                        work_list);
                                        RETURN(LDLM_ITER_CONTINUE);
                                }
                                continue;
                        }

                        if (*flags & LDLM_FL_BLOCK_NOWAIT) {
                                ldlm_flock_destroy(req, mode, *flags);
                                *err = -EAGAIN;
                                RETURN(LDLM_ITER_STOP);
                        }

                        if (*flags & LDLM_FL_TEST_LOCK) {
                                ldlm_flock_destroy(req, mode, *flags);
                                req->l_req_mode = lock->l_granted_mode;
                                req->l_policy_data.l_flock.pid =
                                        lock->l_policy_data.l_flock.pid;
                                req->l_policy_data.l_flock.start =
                                        lock->l_policy_data.l_flock.start;
                                req->l_policy_data.l_flock.end =
                                        lock->l_policy_data.l_flock.end;
                                *flags |= LDLM_FL_LOCK_CHANGED;
                                RETURN(LDLM_ITER_STOP);
                        }

                        /* add lock to blocking list before deadlock
                         * check to prevent race */
                        ldlm_flock_blocking_link(req, lock);

                        if (ldlm_flock_deadlock(req, lock)) {
                                ldlm_flock_blocking_unlink(req);
                                ldlm_flock_destroy(req, mode, *flags);
                                *err = -EDEADLK;
                                RETURN(LDLM_ITER_STOP);
                        }

                        ldlm_resource_add_lock(res, &res->lr_waiting, req);
                        *flags |= LDLM_FL_BLOCK_GRANTED;
                        RETURN(LDLM_ITER_STOP);
                }
                if (reprocess_failed)
                        RETURN(LDLM_ITER_CONTINUE);
        }

        if (*flags & LDLM_FL_TEST_LOCK) {
                ldlm_flock_destroy(req, mode, *flags);
                req->l_req_mode = LCK_NL;
                *flags |= LDLM_FL_LOCK_CHANGED;
                RETURN(LDLM_ITER_STOP);
        }

        /* In case we had slept on this lock request, take it off the
         * deadlock detection hash list. */
        ldlm_flock_blocking_unlink(req);

        /* Scan the locks owned by this process that overlap this request.
         * We may have to merge or split existing locks. */

        if (!ownlocks)
                ownlocks = &res->lr_granted;

        list_for_remaining_safe(ownlocks, tmp, &res->lr_granted) {
                lock = cfs_list_entry(ownlocks, struct ldlm_lock, l_res_link);

                if (!ldlm_same_flock_owner(lock, new))
                        break;

                if (lock->l_granted_mode == mode) {
                        /* If the modes are the same then we need to process
                         * locks that overlap OR adjoin the new lock. The extra
                         * logic condition is necessary to deal with arithmetic
                         * overflow and underflow. */
                        if ((new->l_policy_data.l_flock.start >
                             (lock->l_policy_data.l_flock.end + 1))
                            && (lock->l_policy_data.l_flock.end !=
                                OBD_OBJECT_EOF))
                                continue;

                        if ((new->l_policy_data.l_flock.end <
                             (lock->l_policy_data.l_flock.start - 1))
                            && (lock->l_policy_data.l_flock.start != 0))
                                break;

                        if (new->l_policy_data.l_flock.start <
                            lock->l_policy_data.l_flock.start) {
                                lock->l_policy_data.l_flock.start =
                                        new->l_policy_data.l_flock.start;
                        } else {
                                new->l_policy_data.l_flock.start =
                                        lock->l_policy_data.l_flock.start;
                        }

                        if (new->l_policy_data.l_flock.end >
                            lock->l_policy_data.l_flock.end) {
                                lock->l_policy_data.l_flock.end =
                                        new->l_policy_data.l_flock.end;
                        } else {
                                new->l_policy_data.l_flock.end =
                                        lock->l_policy_data.l_flock.end;
                        }

                        if (added) {
                                ldlm_flock_destroy(lock, mode, *flags);
                        } else {
                                new = lock;
                                added = 1;
                        }
                        continue;
                }

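                /* Modes differ, so the new lock overrides whatever portion
                 * of the old lock it overlaps: trim the old lock's front or
                 * back, drop it entirely, or split it in two below. */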
                if (new->l_policy_data.l_flock.start >
                    lock->l_policy_data.l_flock.end)
                        continue;

                if (new->l_policy_data.l_flock.end <
                    lock->l_policy_data.l_flock.start)
                        break;

                ++overlaps;

                if (new->l_policy_data.l_flock.start <=
                    lock->l_policy_data.l_flock.start) {
                        if (new->l_policy_data.l_flock.end <
                            lock->l_policy_data.l_flock.end) {
                                lock->l_policy_data.l_flock.start =
                                        new->l_policy_data.l_flock.end + 1;
                                break;
                        }
                        ldlm_flock_destroy(lock, lock->l_req_mode, *flags);
                        continue;
                }
                if (new->l_policy_data.l_flock.end >=
                    lock->l_policy_data.l_flock.end) {
                        lock->l_policy_data.l_flock.end =
                                new->l_policy_data.l_flock.start - 1;
                        continue;
                }

                /* split the existing lock into two locks */

                /* if this is an F_UNLCK operation then we could avoid
                 * allocating a new lock and use the req lock passed in
                 * with the request, but this would complicate the reply
                 * processing, since updates to req get reflected in the
                 * reply. The client side replays the lock request, so it
                 * must see the original lock data in the reply. */

                /* XXX - if ldlm_lock_new() can sleep we should
                 * release the lr_lock, allocate the new lock,
                 * and restart processing this lock. */
                if (!new2) {
                        unlock_res_and_lock(req);
                        new2 = ldlm_lock_create(ns, &res->lr_name, LDLM_FLOCK,
                                                lock->l_granted_mode, &null_cbs,
                                                NULL, 0, LVB_T_NONE);
                        lock_res_and_lock(req);
                        if (!new2) {
                                ldlm_flock_destroy(req, lock->l_granted_mode,
                                                   *flags);
                                *err = -ENOLCK;
                                RETURN(LDLM_ITER_STOP);
                        }
                        goto reprocess;
                }

                splitted = 1;

                new2->l_granted_mode = lock->l_granted_mode;
                new2->l_policy_data.l_flock.pid =
                        new->l_policy_data.l_flock.pid;
                new2->l_policy_data.l_flock.owner =
                        new->l_policy_data.l_flock.owner;
                new2->l_policy_data.l_flock.start =
                        lock->l_policy_data.l_flock.start;
                new2->l_policy_data.l_flock.end =
                        new->l_policy_data.l_flock.start - 1;
                lock->l_policy_data.l_flock.start =
                        new->l_policy_data.l_flock.end + 1;
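                /* After the split: new2 keeps the low part of the old lock,
                 * [old start, new start - 1], and the old lock is trimmed to
                 * the high part, [new end + 1, old end]. */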
                new2->l_conn_export = lock->l_conn_export;
                if (lock->l_export != NULL) {
                        new2->l_export = class_export_lock_get(lock->l_export, new2);
                        if (new2->l_export->exp_lock_hash &&
                            cfs_hlist_unhashed(&new2->l_exp_hash))
                                cfs_hash_add(new2->l_export->exp_lock_hash,
                                             &new2->l_remote_handle,
                                             &new2->l_exp_hash);
                }
                if (*flags == LDLM_FL_WAIT_NOREPROC)
                        ldlm_lock_addref_internal_nolock(new2,
                                                         lock->l_granted_mode);

                /* insert new2 at lock */
                ldlm_resource_add_lock(res, ownlocks, new2);
                LDLM_LOCK_RELEASE(new2);
                break;
        }

        /* if new2 is created but never used, destroy it */
        if (splitted == 0 && new2 != NULL)
                ldlm_lock_destroy_nolock(new2);

        /* At this point we're granting the lock request. */
        req->l_granted_mode = req->l_req_mode;

        /* Add req to the granted queue before calling ldlm_reprocess_all(). */
        if (!added) {
                cfs_list_del_init(&req->l_res_link);
                /* insert new lock before ownlocks in list. */
                ldlm_resource_add_lock(res, ownlocks, req);
        }

        if (*flags != LDLM_FL_WAIT_NOREPROC) {
#ifdef HAVE_SERVER_SUPPORT
                if (first_enq) {
                        /* If this is an unlock, reprocess the waitq and
                         * send completion ASTs for locks that can now be
                         * granted. The only problem with doing this
                         * reprocessing here is that the completion ASTs for
                         * newly granted locks will be sent before the unlock
                         * completion is sent. It shouldn't be an issue. Also
                         * note that ldlm_process_flock_lock() will recurse,
                         * but only once because first_enq will be false from
                         * ldlm_reprocess_queue. */
                        if ((mode == LCK_NL) && overlaps) {
                                CFS_LIST_HEAD(rpc_list);
                                int rc;
restart:
                                ldlm_reprocess_queue(res, &res->lr_waiting,
                                                     &rpc_list);

                                unlock_res_and_lock(req);
                                rc = ldlm_run_ast_work(ns, &rpc_list,
                                                       LDLM_WORK_CP_AST);
                                lock_res_and_lock(req);
                                if (rc == -ERESTART)
                                        GOTO(restart, -ERESTART);
                        }
                } else {
                        LASSERT(req->l_completion_ast);
                        ldlm_add_ast_work_item(req, NULL, work_list);
                }
#else /* !HAVE_SERVER_SUPPORT */
                /* The only possible case in which the flock policy function
                 * is called on the client side is ldlm_flock_completion_ast(),
                 * which always passes the LDLM_FL_WAIT_NOREPROC flag. */
                CERROR("Illegal parameter for client-side-only module.\n");
                LBUG();
#endif /* HAVE_SERVER_SUPPORT */
        }

        /* In case we're reprocessing the requested lock we can't destroy
         * it until after calling ldlm_add_ast_work_item() above so that it
         * can bump the reference count on \a req. Otherwise \a req could
         * be freed before the completion AST can be sent.  */
        if (added)
                ldlm_flock_destroy(req, mode, *flags);

        ldlm_resource_dump(D_INFO, res);
        RETURN(LDLM_ITER_CONTINUE);
}

struct ldlm_flock_wait_data {
        struct ldlm_lock *fwd_lock;
        int               fwd_generation;
};

static void
ldlm_flock_interrupted_wait(void *data)
{
        struct ldlm_lock *lock;
        ENTRY;

        lock = ((struct ldlm_flock_wait_data *)data)->fwd_lock;

        /* take lock off the deadlock detection hash list. */
        lock_res_and_lock(lock);
        ldlm_flock_blocking_unlink(lock);

        /* client side - set flag to prevent lock from being put on LRU list */
        lock->l_flags |= LDLM_FL_CBPENDING;
        unlock_res_and_lock(lock);

        EXIT;
}

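/*
 * Cases handled below: the import was invalidated (drop the references and
 * wake the waiter), the lock was granted directly or via a completion AST,
 * the request is blocked and this thread sleeps until the lock is granted
 * or cancelled, the server reported LDLM_FL_FLOCK_DEADLOCK, or this was a
 * test-only request (F_GETLK) whose result is copied back into \a getlk.
 */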
/**
 * Flock completion callback function.
 *
 * \param lock [in,out]: A lock to be handled
 * \param flags    [in]: flags
 * \param *data    [in]: ldlm_work_cp_ast_lock() will use ldlm_cb_set_arg
 *
 * \retval 0    : success
 * \retval <0   : failure
 */
int
ldlm_flock_completion_ast(struct ldlm_lock *lock, __u64 flags, void *data)
{
        struct file_lock               *getlk = lock->l_ast_data;
        struct obd_device              *obd;
        struct obd_import              *imp = NULL;
        struct ldlm_flock_wait_data     fwd;
        struct l_wait_info              lwi;
        ldlm_error_t                    err;
        int                             rc = 0;
        ENTRY;

        CDEBUG(D_DLMTRACE, "flags: 0x%llx data: %p getlk: %p\n",
               flags, data, getlk);

        /* Import invalidation. We need to actually release the lock
         * references being held, so that it can go away. No point in
         * holding the lock even if the app still believes it has it, since
         * the server has already dropped it anyway. This applies only to
         * granted locks. */
        if ((lock->l_flags & (LDLM_FL_FAILED|LDLM_FL_LOCAL_ONLY)) ==
            (LDLM_FL_FAILED|LDLM_FL_LOCAL_ONLY)) {
                if (lock->l_req_mode == lock->l_granted_mode &&
                    lock->l_granted_mode != LCK_NL &&
                    NULL == data)
                        ldlm_lock_decref_internal(lock, lock->l_req_mode);

                /* Need to wake up the waiter if we were evicted */
                wake_up(&lock->l_waitq);
                RETURN(0);
        }

        LASSERT(flags != LDLM_FL_WAIT_NOREPROC);

        if (!(flags & (LDLM_FL_BLOCK_WAIT | LDLM_FL_BLOCK_GRANTED |
                       LDLM_FL_BLOCK_CONV))) {
                if (NULL == data)
                        /* MDS granted the lock in the reply */
                        goto granted;
                /* CP AST RPC: the lock got granted, wake it up */
                wake_up(&lock->l_waitq);
                RETURN(0);
        }

        LDLM_DEBUG(lock, "client-side enqueue returned a blocked lock, "
                   "sleeping");
        fwd.fwd_lock = lock;
        obd = class_exp2obd(lock->l_conn_export);

        /* if this is a local lock, there is no import */
        if (NULL != obd)
                imp = obd->u.cli.cl_import;

        if (NULL != imp) {
                spin_lock(&imp->imp_lock);
                fwd.fwd_generation = imp->imp_generation;
                spin_unlock(&imp->imp_lock);
        }

        lwi = LWI_TIMEOUT_INTR(0, NULL, ldlm_flock_interrupted_wait, &fwd);

        /* Go to sleep until the lock is granted. */
        rc = l_wait_event(lock->l_waitq, is_granted_or_cancelled(lock), &lwi);

        if (rc) {
                LDLM_DEBUG(lock, "client-side enqueue waking up: failed (%d)",
                           rc);
                RETURN(rc);
        }

granted:
        OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_CP_CB_WAIT, 10);

        if (lock->l_flags & LDLM_FL_FAILED) {
                LDLM_DEBUG(lock, "client-side enqueue waking up: failed");
                RETURN(-EIO);
        }

        LDLM_DEBUG(lock, "client-side enqueue granted");

        lock_res_and_lock(lock);

        /* Protect against race where lock could have been just destroyed
         * due to overlap in ldlm_process_flock_lock().
         */
        if (lock->l_flags & LDLM_FL_DESTROYED) {
                unlock_res_and_lock(lock);
                LDLM_DEBUG(lock, "client-side enqueue waking up: destroyed");
                RETURN(0);
        }

        /* take lock off the deadlock detection hash list. */
        ldlm_flock_blocking_unlink(lock);

        /* ldlm_lock_enqueue() has already placed lock on the granted list. */
        cfs_list_del_init(&lock->l_res_link);

        if (lock->l_flags & LDLM_FL_FLOCK_DEADLOCK) {
                LDLM_DEBUG(lock, "client-side enqueue deadlock received");
                rc = -EDEADLK;
        } else if (flags & LDLM_FL_TEST_LOCK) {
                /* fcntl(F_GETLK) request */
                /* The old mode was saved in getlk->fl_type so that if the
                 * mode in the lock changes we can decref the appropriate
                 * refcount. */
                ldlm_flock_destroy(lock, flock_type(getlk),
                                   LDLM_FL_WAIT_NOREPROC);
                switch (lock->l_granted_mode) {
                case LCK_PR:
                        flock_set_type(getlk, F_RDLCK);
                        break;
                case LCK_PW:
                        flock_set_type(getlk, F_WRLCK);
                        break;
                default:
                        flock_set_type(getlk, F_UNLCK);
                }
                flock_set_pid(getlk, (pid_t)lock->l_policy_data.l_flock.pid);
                flock_set_start(getlk,
                                (loff_t)lock->l_policy_data.l_flock.start);
                flock_set_end(getlk,
                              (loff_t)lock->l_policy_data.l_flock.end);
        } else {
                __u64 noreproc = LDLM_FL_WAIT_NOREPROC;

                /* We need to reprocess the lock to do merges or splits
                 * with existing locks owned by this process. */
                ldlm_process_flock_lock(lock, &noreproc, 1, &err, NULL);
        }
        unlock_res_and_lock(lock);
        RETURN(rc);
}
EXPORT_SYMBOL(ldlm_flock_completion_ast);

int ldlm_flock_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
                            void *data, int flag)
{
        ENTRY;

        LASSERT(lock);
        LASSERT(flag == LDLM_CB_CANCELING);

        /* take lock off the deadlock detection hash list. */
        lock_res_and_lock(lock);
        ldlm_flock_blocking_unlink(lock);
        unlock_res_and_lock(lock);
        RETURN(0);
}

void ldlm_flock_policy_wire18_to_local(const ldlm_wire_policy_data_t *wpolicy,
                                       ldlm_policy_data_t *lpolicy)
{
        memset(lpolicy, 0, sizeof(*lpolicy));
        lpolicy->l_flock.start = wpolicy->l_flock.lfw_start;
        lpolicy->l_flock.end = wpolicy->l_flock.lfw_end;
        lpolicy->l_flock.pid = wpolicy->l_flock.lfw_pid;
        /* Compat code: old clients had no idea about the owner field and
         * relied solely on pid for ownership. Introduced in LU-104, 2.1,
         * April 2011. */
        lpolicy->l_flock.owner = wpolicy->l_flock.lfw_pid;
}

void ldlm_flock_policy_wire21_to_local(const ldlm_wire_policy_data_t *wpolicy,
                                       ldlm_policy_data_t *lpolicy)
{
        memset(lpolicy, 0, sizeof(*lpolicy));
        lpolicy->l_flock.start = wpolicy->l_flock.lfw_start;
        lpolicy->l_flock.end = wpolicy->l_flock.lfw_end;
        lpolicy->l_flock.pid = wpolicy->l_flock.lfw_pid;
        lpolicy->l_flock.owner = wpolicy->l_flock.lfw_owner;
}

void ldlm_flock_policy_local_to_wire(const ldlm_policy_data_t *lpolicy,
                                     ldlm_wire_policy_data_t *wpolicy)
{
        memset(wpolicy, 0, sizeof(*wpolicy));
        wpolicy->l_flock.lfw_start = lpolicy->l_flock.start;
        wpolicy->l_flock.lfw_end = lpolicy->l_flock.end;
        wpolicy->l_flock.lfw_pid = lpolicy->l_flock.pid;
        wpolicy->l_flock.lfw_owner = lpolicy->l_flock.owner;
}

/*
 * Export handle<->flock hash operations.
 */
static unsigned
ldlm_export_flock_hash(cfs_hash_t *hs, const void *key, unsigned mask)
{
        return cfs_hash_u64_hash(*(__u64 *)key, mask);
}

static void *
ldlm_export_flock_key(cfs_hlist_node_t *hnode)
{
        struct ldlm_lock *lock;

        lock = cfs_hlist_entry(hnode, struct ldlm_lock, l_exp_flock_hash);
        return &lock->l_policy_data.l_flock.owner;
}

static int
ldlm_export_flock_keycmp(const void *key, cfs_hlist_node_t *hnode)
{
        return !memcmp(ldlm_export_flock_key(hnode), key, sizeof(__u64));
}

static void *
ldlm_export_flock_object(cfs_hlist_node_t *hnode)
{
        return cfs_hlist_entry(hnode, struct ldlm_lock, l_exp_flock_hash);
}

static void
ldlm_export_flock_get(cfs_hash_t *hs, cfs_hlist_node_t *hnode)
{
        struct ldlm_lock *lock;
        struct ldlm_flock *flock;

        lock = cfs_hlist_entry(hnode, struct ldlm_lock, l_exp_flock_hash);
        LDLM_LOCK_GET(lock);

        flock = &lock->l_policy_data.l_flock;
        LASSERT(flock->blocking_export != NULL);
        class_export_get(flock->blocking_export);
        flock->blocking_refs++;
}

static void
ldlm_export_flock_put(cfs_hash_t *hs, cfs_hlist_node_t *hnode)
{
        struct ldlm_lock *lock;
        struct ldlm_flock *flock;

        lock = cfs_hlist_entry(hnode, struct ldlm_lock, l_exp_flock_hash);
        LDLM_LOCK_RELEASE(lock);

        flock = &lock->l_policy_data.l_flock;
        LASSERT(flock->blocking_export != NULL);
        class_export_put(flock->blocking_export);
        if (--flock->blocking_refs == 0) {
                flock->blocking_owner = 0;
                flock->blocking_export = NULL;
        }
}

static cfs_hash_ops_t ldlm_export_flock_ops = {
        .hs_hash        = ldlm_export_flock_hash,
        .hs_key         = ldlm_export_flock_key,
        .hs_keycmp      = ldlm_export_flock_keycmp,
        .hs_object      = ldlm_export_flock_object,
        .hs_get         = ldlm_export_flock_get,
        .hs_put         = ldlm_export_flock_put,
        .hs_put_locked  = ldlm_export_flock_put,
};

int ldlm_init_flock_export(struct obd_export *exp)
{
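        /* Only the MDT processes flock requests (see the file header), so
         * only MDT exports need the per-owner hash used by the deadlock
         * detector. */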
        if (strcmp(exp->exp_obd->obd_type->typ_name, LUSTRE_MDT_NAME) != 0)
                RETURN(0);

        exp->exp_flock_hash =
                cfs_hash_create(obd_uuid2str(&exp->exp_client_uuid),
                                HASH_EXP_LOCK_CUR_BITS,
                                HASH_EXP_LOCK_MAX_BITS,
                                HASH_EXP_LOCK_BKT_BITS, 0,
                                CFS_HASH_MIN_THETA, CFS_HASH_MAX_THETA,
                                &ldlm_export_flock_ops,
                                CFS_HASH_DEFAULT | CFS_HASH_NBLK_CHANGE);
        if (!exp->exp_flock_hash)
                RETURN(-ENOMEM);

        RETURN(0);
}
EXPORT_SYMBOL(ldlm_init_flock_export);

void ldlm_destroy_flock_export(struct obd_export *exp)
{
        ENTRY;
        if (exp->exp_flock_hash) {
                cfs_hash_putref(exp->exp_flock_hash);
                exp->exp_flock_hash = NULL;
        }
        EXIT;
}
EXPORT_SYMBOL(ldlm_destroy_flock_export);