LU-3097 build: fix 'deadcode' errors
[fs/lustre-release.git] / lustre / ldlm / ldlm_flock.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2003 Hewlett-Packard Development Company LP.
28  * Developed under the sponsorship of the US Government under
29  * Subcontract No. B514193
30  *
31  * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
32  * Use is subject to license terms.
33  *
34  * Copyright (c) 2010, 2012, Intel Corporation.
35  */
36 /*
37  * This file is part of Lustre, http://www.lustre.org/
38  * Lustre is a trademark of Sun Microsystems, Inc.
39  */
40
41 /**
42  * This file implements POSIX lock type for Lustre.
43  * Its policy properties are start and end of extent and PID.
44  *
45  * These locks are only handled through the MDS because POSIX semantics
46  * require, for example, that a lock can be only partially released and
47  * as such split into two parts, and that two adjacent locks from the
48  * same process may be merged into a single wider lock.
49  *
50  * Lock modes are mapped like this:
51  * PR and PW for READ and WRITE locks
52  * NL to request a releasing of a portion of the lock
53  *
54  * These flock locks never time out.
55  */
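/*
 * Illustrative sketch (not part of the original file): how a client-side
 * fcntl() request would plausibly map onto the LDLM flock modes described
 * above.  The helper name ll_flock_mode() and the field assignments are
 * hypothetical, shown only to make the mode mapping concrete.
 *
 *	static ldlm_mode_t ll_flock_mode(int fl_type)
 *	{
 *		switch (fl_type) {
 *		case F_RDLCK: return LCK_PR;	// read lock   -> PR
 *		case F_WRLCK: return LCK_PW;	// write lock  -> PW
 *		case F_UNLCK: return LCK_NL;	// unlock      -> NL (release range)
 *		default:      return LCK_MINMODE;
 *		}
 *	}
 *
 *	// the policy data then carries the extent and ownership, e.g.:
 *	//   policy.l_flock.start = file_lock->fl_start;
 *	//   policy.l_flock.end   = file_lock->fl_end;
 *	//   policy.l_flock.pid   = file_lock->fl_pid;
 */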
56
57 #define DEBUG_SUBSYSTEM S_LDLM
58
59 #ifdef __KERNEL__
60 #include <lustre_dlm.h>
61 #include <obd_support.h>
62 #include <obd_class.h>
63 #include <lustre_lib.h>
64 #include <libcfs/list.h>
65 #else
66 #include <liblustre.h>
67 #include <obd_class.h>
68 #endif
69
70 #include "ldlm_internal.h"
71
72 int ldlm_flock_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
73                             void *data, int flag);
74
75 /**
76  * list_for_remaining_safe - iterate over the remaining entries in a list
77  *              and safeguard against removal of a list entry.
78  * \param pos   the &struct list_head to use as a loop counter. pos MUST
79  *              have been initialized prior to using it in this macro.
80  * \param n     another &struct list_head to use as temporary storage
81  * \param head  the head for your list.
82  */
83 #define list_for_remaining_safe(pos, n, head) \
84         for (n = pos->next; pos != (head); pos = n, n = pos->next)
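/*
 * Usage sketch for the macro above (hypothetical, for illustration only).
 * Unlike cfs_list_for_each_safe(), \a pos must already point at a valid
 * entry, and iteration resumes from there instead of from the head:
 *
 *	cfs_list_t *pos = &some_lock->l_res_link;	// already initialized
 *	cfs_list_t *n;
 *
 *	list_for_remaining_safe(pos, n, &res->lr_granted) {
 *		struct ldlm_lock *lk =
 *			cfs_list_entry(pos, struct ldlm_lock, l_res_link);
 *		// pos may be unlinked here; n keeps the iteration safe
 *	}
 */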
85
86 static inline int
87 ldlm_same_flock_owner(struct ldlm_lock *lock, struct ldlm_lock *new)
88 {
89         return((new->l_policy_data.l_flock.owner ==
90                 lock->l_policy_data.l_flock.owner) &&
91                (new->l_export == lock->l_export));
92 }
93
94 static inline int
95 ldlm_flocks_overlap(struct ldlm_lock *lock, struct ldlm_lock *new)
96 {
97         return((new->l_policy_data.l_flock.start <=
98                 lock->l_policy_data.l_flock.end) &&
99                (new->l_policy_data.l_flock.end >=
100                 lock->l_policy_data.l_flock.start));
101 }
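/*
 * Worked example for the overlap test above (illustration only):
 * new = [50, 150] and lock = [0, 99] overlap because 50 <= 99 and
 * 150 >= 0, while new = [50, 99] and lock = [0, 49] do not, because
 * 50 > 49.  Pure adjacency (e.g. [0, 49] next to [50, 99]) is not an
 * overlap here; merging of adjacent same-mode locks is handled
 * separately in ldlm_process_flock_lock().
 */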
102
103 static inline void ldlm_flock_blocking_link(struct ldlm_lock *req,
104                                             struct ldlm_lock *lock)
105 {
106         /* For server only */
107         if (req->l_export == NULL)
108                 return;
109
110         LASSERT(cfs_hlist_unhashed(&req->l_exp_flock_hash));
111
112         req->l_policy_data.l_flock.blocking_owner =
113                 lock->l_policy_data.l_flock.owner;
114         req->l_policy_data.l_flock.blocking_export =
115                 lock->l_export;
116         req->l_policy_data.l_flock.blocking_refs = 0;
117
118         cfs_hash_add(req->l_export->exp_flock_hash,
119                      &req->l_policy_data.l_flock.owner,
120                      &req->l_exp_flock_hash);
121 }
122
123 static inline void ldlm_flock_blocking_unlink(struct ldlm_lock *req)
124 {
125         /* For server only */
126         if (req->l_export == NULL)
127                 return;
128
129         check_res_locked(req->l_resource);
130         if (req->l_export->exp_flock_hash != NULL &&
131             !cfs_hlist_unhashed(&req->l_exp_flock_hash))
132                 cfs_hash_del(req->l_export->exp_flock_hash,
133                              &req->l_policy_data.l_flock.owner,
134                              &req->l_exp_flock_hash);
135 }
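/*
 * How the two helpers above fit together (explanatory sketch, not part of
 * the original file): a blocked request is hashed into its own export's
 * exp_flock_hash under its owner id, while recording which owner and
 * export are blocking it.  ldlm_flock_deadlock() below can then walk the
 * "waits-on" edges using nothing but hash lookups, roughly:
 *
 *	lock = cfs_hash_lookup(bl_exp->exp_flock_hash, &bl_owner);
 *	bl_owner = lock->l_policy_data.l_flock.blocking_owner;
 *	bl_exp   = lock->l_policy_data.l_flock.blocking_export;
 *	// repeat until the chain ends or loops back to the requester
 */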
136
137 static inline void
138 ldlm_flock_destroy(struct ldlm_lock *lock, ldlm_mode_t mode, __u64 flags)
139 {
140         ENTRY;
141
142         LDLM_DEBUG(lock, "ldlm_flock_destroy(mode: %d, flags: 0x%llx)",
143                    mode, flags);
144
145         /* Safe to not lock here, since it should be empty anyway */
146         LASSERT(cfs_hlist_unhashed(&lock->l_exp_flock_hash));
147
148         cfs_list_del_init(&lock->l_res_link);
149         if (flags == LDLM_FL_WAIT_NOREPROC &&
150             !(lock->l_flags & LDLM_FL_FAILED)) {
151                 /* client side - set a flag to prevent sending a CANCEL */
152                 lock->l_flags |= LDLM_FL_LOCAL_ONLY | LDLM_FL_CBPENDING;
153
154                 /* When we reach here the resource is locked (lock_res_and_lock()),
155                  * so we must call the nolock version of ldlm_lock_decref_internal(). */
156                 ldlm_lock_decref_internal_nolock(lock, mode);
157         }
158
159         ldlm_lock_destroy_nolock(lock);
160         EXIT;
161 }
162
163 /**
164  * POSIX locks deadlock detection code.
165  *
166  * Given a new lock \a req and an existing lock \a bl_lock it conflicts
167  * with, we need to iterate through all blocked POSIX locks for this
168  * export and see if a deadlock condition arises (i.e. one client
169  * holds a lock on one object and wants a lock on another object while,
170  * at the same time, another client is in the opposite situation).
171  */
172 static int
173 ldlm_flock_deadlock(struct ldlm_lock *req, struct ldlm_lock *bl_lock)
174 {
175         struct obd_export *req_exp = req->l_export;
176         struct obd_export *bl_exp = bl_lock->l_export;
177         __u64 req_owner = req->l_policy_data.l_flock.owner;
178         __u64 bl_owner = bl_lock->l_policy_data.l_flock.owner;
179
180         /* For server only */
181         if (req_exp == NULL)
182                 return 0;
183
184         class_export_get(bl_exp);
185         while (1) {
186                 struct obd_export *bl_exp_new;
187                 struct ldlm_lock *lock = NULL;
188                 struct ldlm_flock *flock;
189
190                 if (bl_exp->exp_flock_hash != NULL)
191                         lock = cfs_hash_lookup(bl_exp->exp_flock_hash,
192                                                &bl_owner);
193                 if (lock == NULL)
194                         break;
195
196                 LASSERT(req != lock);
197                 flock = &lock->l_policy_data.l_flock;
198                 LASSERT(flock->owner == bl_owner);
199                 bl_owner = flock->blocking_owner;
200                 bl_exp_new = class_export_get(flock->blocking_export);
201                 class_export_put(bl_exp);
202
203                 cfs_hash_put(bl_exp->exp_flock_hash, &lock->l_exp_flock_hash);
204                 bl_exp = bl_exp_new;
205
206                 if (bl_owner == req_owner && bl_exp == req_exp) {
207                         class_export_put(bl_exp);
208                         return 1;
209                 }
210         }
211         class_export_put(bl_exp);
212
213         return 0;
214 }
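/*
 * Hypothetical two-owner scenario showing what the walk above detects:
 *
 *	owner A (client 1) holds [0, 49]  and is blocked requesting [50, 99]
 *	owner B (client 2) holds [50, 99] and is blocked requesting [0, 49]
 *
 * When B's request blocks on A's granted lock, the loop looks up A's
 * blocked lock in A's exp_flock_hash, follows blocking_owner/export back
 * to B, and finds (bl_owner, bl_exp) == (req_owner, req_exp), so it
 * returns 1; the caller then fails the request with -EDEADLK or cancels
 * it during reprocessing.
 */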
215
216 static void ldlm_flock_cancel_on_deadlock(struct ldlm_lock *lock,
217                                                 cfs_list_t *work_list)
218 {
219         CDEBUG(D_INFO, "reprocess deadlock req=%p\n", lock);
220
221         if ((exp_connect_flags(lock->l_export) &
222                                 OBD_CONNECT_FLOCK_DEAD) == 0) {
223                 CERROR("deadlock found, but client doesn't "
224                                 "support flock cancellation\n");
225         } else {
226                 LASSERT(lock->l_completion_ast);
227                 LASSERT((lock->l_flags & LDLM_FL_AST_SENT) == 0);
228                 lock->l_flags |= LDLM_FL_AST_SENT | LDLM_FL_CANCEL_ON_BLOCK |
229                         LDLM_FL_FLOCK_DEADLOCK;
230                 ldlm_flock_blocking_unlink(lock);
231                 ldlm_resource_unlink_lock(lock);
232                 ldlm_add_ast_work_item(lock, NULL, work_list);
233         }
234 }
235
236 /**
237  * Process a granting attempt for flock lock.
238  * Must be called under ns lock held.
239  *
240  * This function looks for any conflicts for \a lock in the granted or
241  * waiting queues. The lock is granted if no conflicts are found in
242  * either queue.
243  *
244  * It is also responsible for splitting a lock if a portion of the lock
245  * is released.
246  *
247  * If \a first_enq is 0 (i.e. called from ldlm_reprocess_queue):
248  *   - blocking ASTs have already been sent
249  *
250  * If \a first_enq is 1 (i.e. called from ldlm_lock_enqueue):
251  *   - blocking ASTs have not been sent yet, so list of conflicting locks
252  *     would be collected and ASTs sent.
253  */
254 int
255 ldlm_process_flock_lock(struct ldlm_lock *req, __u64 *flags, int first_enq,
256                         ldlm_error_t *err, cfs_list_t *work_list)
257 {
258         struct ldlm_resource *res = req->l_resource;
259         struct ldlm_namespace *ns = ldlm_res_to_ns(res);
260         cfs_list_t *tmp;
261         cfs_list_t *ownlocks = NULL;
262         struct ldlm_lock *lock = NULL;
263         struct ldlm_lock *new = req;
264         struct ldlm_lock *new2 = NULL;
265         ldlm_mode_t mode = req->l_req_mode;
266         int local = ns_is_client(ns);
267         int added = (mode == LCK_NL);
268         int overlaps = 0;
269         int splitted = 0;
270         const struct ldlm_callback_suite null_cbs = { NULL };
271         ENTRY;
272
273         CDEBUG(D_DLMTRACE, "flags %#llx owner "LPU64" pid %u mode %u start "
274                LPU64" end "LPU64"\n", *flags,
275                new->l_policy_data.l_flock.owner,
276                new->l_policy_data.l_flock.pid, mode,
277                req->l_policy_data.l_flock.start,
278                req->l_policy_data.l_flock.end);
279
280         *err = ELDLM_OK;
281
282         if (local) {
283                 /* No blocking ASTs are sent to the clients for
284                  * POSIX file & record locks */
285                 req->l_blocking_ast = NULL;
286         } else {
287                 /* Called on the server for lock cancels. */
288                 req->l_blocking_ast = ldlm_flock_blocking_ast;
289         }
290
291 reprocess:
292         if ((*flags == LDLM_FL_WAIT_NOREPROC) || (mode == LCK_NL)) {
293                 /* This loop determines where this process's locks start
294                  * in the resource lr_granted list. */
295                 cfs_list_for_each(tmp, &res->lr_granted) {
296                         lock = cfs_list_entry(tmp, struct ldlm_lock,
297                                               l_res_link);
298                         if (ldlm_same_flock_owner(lock, req)) {
299                                 ownlocks = tmp;
300                                 break;
301                         }
302                 }
303         } else {
304                 int reprocess_failed = 0;
305                 lockmode_verify(mode);
306
307                 /* This loop determines if there are existing locks
308                  * that conflict with the new lock request. */
309                 cfs_list_for_each(tmp, &res->lr_granted) {
310                         lock = cfs_list_entry(tmp, struct ldlm_lock,
311                                               l_res_link);
312
313                         if (ldlm_same_flock_owner(lock, req)) {
314                                 if (!ownlocks)
315                                         ownlocks = tmp;
316                                 continue;
317                         }
318
319                         /* locks are compatible, overlap doesn't matter */
320                         if (lockmode_compat(lock->l_granted_mode, mode))
321                                 continue;
322
323                         if (!ldlm_flocks_overlap(lock, req))
324                                 continue;
325
326                         if (!first_enq) {
327                                 reprocess_failed = 1;
328                                 if (ldlm_flock_deadlock(req, lock)) {
329                                         ldlm_flock_cancel_on_deadlock(req,
330                                                         work_list);
331                                         RETURN(LDLM_ITER_CONTINUE);
332                                 }
333                                 continue;
334                         }
335
336                         if (*flags & LDLM_FL_BLOCK_NOWAIT) {
337                                 ldlm_flock_destroy(req, mode, *flags);
338                                 *err = -EAGAIN;
339                                 RETURN(LDLM_ITER_STOP);
340                         }
341
342                         if (*flags & LDLM_FL_TEST_LOCK) {
343                                 ldlm_flock_destroy(req, mode, *flags);
344                                 req->l_req_mode = lock->l_granted_mode;
345                                 req->l_policy_data.l_flock.pid =
346                                         lock->l_policy_data.l_flock.pid;
347                                 req->l_policy_data.l_flock.start =
348                                         lock->l_policy_data.l_flock.start;
349                                 req->l_policy_data.l_flock.end =
350                                         lock->l_policy_data.l_flock.end;
351                                 *flags |= LDLM_FL_LOCK_CHANGED;
352                                 RETURN(LDLM_ITER_STOP);
353                         }
354
355                         /* add lock to blocking list before deadlock
356                          * check to prevent race */
357                         ldlm_flock_blocking_link(req, lock);
358
359                         if (ldlm_flock_deadlock(req, lock)) {
360                                 ldlm_flock_blocking_unlink(req);
361                                 ldlm_flock_destroy(req, mode, *flags);
362                                 *err = -EDEADLK;
363                                 RETURN(LDLM_ITER_STOP);
364                         }
365
366                         ldlm_resource_add_lock(res, &res->lr_waiting, req);
367                         *flags |= LDLM_FL_BLOCK_GRANTED;
368                         RETURN(LDLM_ITER_STOP);
369                 }
370                 if (reprocess_failed)
371                         RETURN(LDLM_ITER_CONTINUE);
372         }
373
374         if (*flags & LDLM_FL_TEST_LOCK) {
375                 ldlm_flock_destroy(req, mode, *flags);
376                 req->l_req_mode = LCK_NL;
377                 *flags |= LDLM_FL_LOCK_CHANGED;
378                 RETURN(LDLM_ITER_STOP);
379         }
380
381         /* In case we had slept on this lock request take it off of the
382          * deadlock detection hash list. */
383         ldlm_flock_blocking_unlink(req);
384
385         /* Scan the locks owned by this process that overlap this request.
386          * We may have to merge or split existing locks. */
387
388         if (!ownlocks)
389                 ownlocks = &res->lr_granted;
390
391         list_for_remaining_safe(ownlocks, tmp, &res->lr_granted) {
392                 lock = cfs_list_entry(ownlocks, struct ldlm_lock, l_res_link);
393
394                 if (!ldlm_same_flock_owner(lock, new))
395                         break;
396
397                 if (lock->l_granted_mode == mode) {
398                         /* If the modes are the same then we need to process
399                          * locks that overlap OR adjoin the new lock. The extra
400                          * logic condition is necessary to deal with arithmetic
401                          * overflow and underflow. */
402                         if ((new->l_policy_data.l_flock.start >
403                              (lock->l_policy_data.l_flock.end + 1))
404                             && (lock->l_policy_data.l_flock.end !=
405                                 OBD_OBJECT_EOF))
406                                 continue;
407
408                         if ((new->l_policy_data.l_flock.end <
409                              (lock->l_policy_data.l_flock.start - 1))
410                             && (lock->l_policy_data.l_flock.start != 0))
411                                 break;
412
413                         if (new->l_policy_data.l_flock.start <
414                             lock->l_policy_data.l_flock.start) {
415                                 lock->l_policy_data.l_flock.start =
416                                         new->l_policy_data.l_flock.start;
417                         } else {
418                                 new->l_policy_data.l_flock.start =
419                                         lock->l_policy_data.l_flock.start;
420                         }
421
422                         if (new->l_policy_data.l_flock.end >
423                             lock->l_policy_data.l_flock.end) {
424                                 lock->l_policy_data.l_flock.end =
425                                         new->l_policy_data.l_flock.end;
426                         } else {
427                                 new->l_policy_data.l_flock.end =
428                                         lock->l_policy_data.l_flock.end;
429                         }
430
431                         if (added) {
432                                 ldlm_flock_destroy(lock, mode, *flags);
433                         } else {
434                                 new = lock;
435                                 added = 1;
436                         }
437                         continue;
438                 }
439
440                 if (new->l_policy_data.l_flock.start >
441                     lock->l_policy_data.l_flock.end)
442                         continue;
443
444                 if (new->l_policy_data.l_flock.end <
445                     lock->l_policy_data.l_flock.start)
446                         break;
447
448                 ++overlaps;
449
450                 if (new->l_policy_data.l_flock.start <=
451                     lock->l_policy_data.l_flock.start) {
452                         if (new->l_policy_data.l_flock.end <
453                             lock->l_policy_data.l_flock.end) {
454                                 lock->l_policy_data.l_flock.start =
455                                         new->l_policy_data.l_flock.end + 1;
456                                 break;
457                         }
458                         ldlm_flock_destroy(lock, lock->l_req_mode, *flags);
459                         continue;
460                 }
461                 if (new->l_policy_data.l_flock.end >=
462                     lock->l_policy_data.l_flock.end) {
463                         lock->l_policy_data.l_flock.end =
464                                 new->l_policy_data.l_flock.start - 1;
465                         continue;
466                 }
467
468                 /* split the existing lock into two locks */
469
470                 /* if this is an F_UNLCK operation then we could avoid
471                  * allocating a new lock and use the req lock passed in
472                  * with the request but this would complicate the reply
473                  * processing since updates to req get reflected in the
474                  * reply. The client side replays the lock request so
475                  * it must see the original lock data in the reply. */
476
477                 /* XXX - if ldlm_lock_new() can sleep we should
478                  * release the lr_lock, allocate the new lock,
479                  * and restart processing this lock. */
480                 if (!new2) {
481                         unlock_res_and_lock(req);
482                         new2 = ldlm_lock_create(ns, &res->lr_name, LDLM_FLOCK,
483                                                 lock->l_granted_mode, &null_cbs,
484                                                 NULL, 0, LVB_T_NONE);
485                         lock_res_and_lock(req);
486                         if (!new2) {
487                                 ldlm_flock_destroy(req, lock->l_granted_mode,
488                                                    *flags);
489                                 *err = -ENOLCK;
490                                 RETURN(LDLM_ITER_STOP);
491                         }
492                         goto reprocess;
493                 }
494
495                 splitted = 1;
496
497                 new2->l_granted_mode = lock->l_granted_mode;
498                 new2->l_policy_data.l_flock.pid =
499                         new->l_policy_data.l_flock.pid;
500                 new2->l_policy_data.l_flock.owner =
501                         new->l_policy_data.l_flock.owner;
502                 new2->l_policy_data.l_flock.start =
503                         lock->l_policy_data.l_flock.start;
504                 new2->l_policy_data.l_flock.end =
505                         new->l_policy_data.l_flock.start - 1;
506                 lock->l_policy_data.l_flock.start =
507                         new->l_policy_data.l_flock.end + 1;
508                 new2->l_conn_export = lock->l_conn_export;
509                 if (lock->l_export != NULL) {
510                         new2->l_export = class_export_lock_get(lock->l_export, new2);
511                         if (new2->l_export->exp_lock_hash &&
512                             cfs_hlist_unhashed(&new2->l_exp_hash))
513                                 cfs_hash_add(new2->l_export->exp_lock_hash,
514                                              &new2->l_remote_handle,
515                                              &new2->l_exp_hash);
516                 }
517                 if (*flags == LDLM_FL_WAIT_NOREPROC)
518                         ldlm_lock_addref_internal_nolock(new2,
519                                                          lock->l_granted_mode);
520
521                 /* insert new2 at lock */
522                 ldlm_resource_add_lock(res, ownlocks, new2);
523                 LDLM_LOCK_RELEASE(new2);
524                 break;
525         }
526
527         /* if new2 was created but never used, destroy it */
528         if (splitted == 0 && new2 != NULL)
529                 ldlm_lock_destroy_nolock(new2);
530
531         /* At this point we're granting the lock request. */
532         req->l_granted_mode = req->l_req_mode;
533
534         /* Add req to the granted queue before calling ldlm_reprocess_all(). */
535         if (!added) {
536                 cfs_list_del_init(&req->l_res_link);
537                 /* insert new lock before ownlocks in list. */
538                 ldlm_resource_add_lock(res, ownlocks, req);
539         }
540
541         if (*flags != LDLM_FL_WAIT_NOREPROC) {
542 #ifdef HAVE_SERVER_SUPPORT
543                 if (first_enq) {
544                         /* If this is an unlock, reprocess the waitq and
545                          * send completions ASTs for locks that can now be
546                          * granted. The only problem with doing this
547                          * reprocessing here is that the completion ASTs for
548                          * newly granted locks will be sent before the unlock
549                          * completion is sent. It shouldn't be an issue. Also
550                          * note that ldlm_process_flock_lock() will recurse,
551                          * but only once because first_enq will be false from
552                          * ldlm_reprocess_queue. */
553                         if ((mode == LCK_NL) && overlaps) {
554                                 CFS_LIST_HEAD(rpc_list);
555                                 int rc;
556 restart:
557                                 ldlm_reprocess_queue(res, &res->lr_waiting,
558                                                      &rpc_list);
559
560                                 unlock_res_and_lock(req);
561                                 rc = ldlm_run_ast_work(ns, &rpc_list,
562                                                        LDLM_WORK_CP_AST);
563                                 lock_res_and_lock(req);
564                                 if (rc == -ERESTART)
565                                         GOTO(restart, -ERESTART);
566                         }
567                 } else {
568                         LASSERT(req->l_completion_ast);
569                         ldlm_add_ast_work_item(req, NULL, work_list);
570                 }
571 #else /* !HAVE_SERVER_SUPPORT */
572                 /* The only possible case of a client-side call into the flock
573                  * policy function is ldlm_flock_completion_ast(), which always
574                  * carries the LDLM_FL_WAIT_NOREPROC flag. */
575                 CERROR("Illegal parameter for client-side-only module.\n");
576                 LBUG();
577 #endif /* HAVE_SERVER_SUPPORT */
578         }
579
580         /* In case we're reprocessing the requested lock we can't destroy
581          * it until after calling ldlm_add_ast_work_item() above so that it
582          * can bump the reference count on \a req. Otherwise \a req
583          * could be freed before the completion AST can be sent.  */
584         if (added)
585                 ldlm_flock_destroy(req, mode, *flags);
586
587         ldlm_resource_dump(D_INFO, res);
588         RETURN(LDLM_ITER_CONTINUE);
589 }
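/*
 * Worked examples for the merge/split logic above (hypothetical extents,
 * each request coming from the same owner P holding a granted PW [0, 99]):
 *
 *	PW [100, 199]  same mode, adjacent   -> merged into one PW [0, 199]
 *	NL [40, 59]    unlock of the middle  -> split: PW [0, 39] and
 *	                                        PW [60, 99] remain granted
 *	                                        (one newly allocated as new2)
 *	PR [0, 9]      different mode        -> granted PW is trimmed to
 *	                                        [10, 99] and PR [0, 9] is
 *	                                        granted alongside it
 *
 * Requests that conflict with other owners are instead queued on
 * lr_waiting with LDLM_FL_BLOCK_GRANTED, fail with -EAGAIN under
 * LDLM_FL_BLOCK_NOWAIT, or just report the blocker for LDLM_FL_TEST_LOCK
 * (F_GETLK).
 */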
590
591 struct ldlm_flock_wait_data {
592         struct ldlm_lock *fwd_lock;
593         int               fwd_generation;
594 };
595
596 static void
597 ldlm_flock_interrupted_wait(void *data)
598 {
599         struct ldlm_lock *lock;
600         ENTRY;
601
602         lock = ((struct ldlm_flock_wait_data *)data)->fwd_lock;
603
604         /* take lock off the deadlock detection hash list. */
605         lock_res_and_lock(lock);
606         ldlm_flock_blocking_unlink(lock);
607
608         /* client side - set flag to prevent lock from being put on LRU list */
609         lock->l_flags |= LDLM_FL_CBPENDING;
610         unlock_res_and_lock(lock);
611
612         EXIT;
613 }
614
615 /**
616  * Flock completion callback function.
617  *
618  * \param lock [in,out]: A lock to be handled
619  * \param flags    [in]: LDLM_FL_* flags for this lock
620  * \param data     [in]: ldlm_work_cp_ast_lock() will use ldlm_cb_set_arg
621  *
622  * \retval 0    : success
623  * \retval <0   : failure
624  */
625 int
626 ldlm_flock_completion_ast(struct ldlm_lock *lock, __u64 flags, void *data)
627 {
628         struct file_lock                *getlk = lock->l_ast_data;
629         struct obd_device              *obd;
630         struct obd_import              *imp = NULL;
631         struct ldlm_flock_wait_data     fwd;
632         struct l_wait_info              lwi;
633         ldlm_error_t                    err;
634         int                             rc = 0;
635         ENTRY;
636
637         CDEBUG(D_DLMTRACE, "flags: 0x%llx data: %p getlk: %p\n",
638                flags, data, getlk);
639
640         /* Import invalidation. We need to actually release the lock
641          * references being held, so that it can go away. No point in
642          * holding the lock even if app still believes it has it, since
643          * server already dropped it anyway. This applies to granted locks only. */
644         if ((lock->l_flags & (LDLM_FL_FAILED|LDLM_FL_LOCAL_ONLY)) ==
645             (LDLM_FL_FAILED|LDLM_FL_LOCAL_ONLY)) {
646                 if (lock->l_req_mode == lock->l_granted_mode &&
647                     lock->l_granted_mode != LCK_NL &&
648                     NULL == data)
649                         ldlm_lock_decref_internal(lock, lock->l_req_mode);
650
651                 /* Need to wake up the waiter if we were evicted */
652                 cfs_waitq_signal(&lock->l_waitq);
653                 RETURN(0);
654         }
655
656         LASSERT(flags != LDLM_FL_WAIT_NOREPROC);
657
658         if (!(flags & (LDLM_FL_BLOCK_WAIT | LDLM_FL_BLOCK_GRANTED |
659                        LDLM_FL_BLOCK_CONV))) {
660                 if (NULL == data)
661                         /* MDS granted the lock in the reply */
662                         goto granted;
663                 /* CP AST RPC: lock got granted, wake it up */
664                 cfs_waitq_signal(&lock->l_waitq);
665                 RETURN(0);
666         }
667
668         LDLM_DEBUG(lock, "client-side enqueue returned a blocked lock, "
669                    "sleeping");
670         fwd.fwd_lock = lock;
671         obd = class_exp2obd(lock->l_conn_export);
672
673         /* if this is a local lock, there is no import */
674         if (NULL != obd)
675                 imp = obd->u.cli.cl_import;
676
677         if (NULL != imp) {
678                 spin_lock(&imp->imp_lock);
679                 fwd.fwd_generation = imp->imp_generation;
680                 spin_unlock(&imp->imp_lock);
681         }
682
683         lwi = LWI_TIMEOUT_INTR(0, NULL, ldlm_flock_interrupted_wait, &fwd);
684
685         /* Go to sleep until the lock is granted. */
686         rc = l_wait_event(lock->l_waitq, is_granted_or_cancelled(lock), &lwi);
687
688         if (rc) {
689                 LDLM_DEBUG(lock, "client-side enqueue waking up: failed (%d)",
690                            rc);
691                 RETURN(rc);
692         }
693
694 granted:
695         OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_CP_CB_WAIT, 10);
696
697         if (lock->l_flags & LDLM_FL_DESTROYED) {
698                 LDLM_DEBUG(lock, "client-side enqueue waking up: destroyed");
699                 RETURN(0);
700         }
701
702         if (lock->l_flags & LDLM_FL_FAILED) {
703                 LDLM_DEBUG(lock, "client-side enqueue waking up: failed");
704                 RETURN(-EIO);
705         }
706
707         LDLM_DEBUG(lock, "client-side enqueue granted");
708
709         lock_res_and_lock(lock);
710
711         /* take lock off the deadlock detection hash list. */
712         ldlm_flock_blocking_unlink(lock);
713
714         /* ldlm_lock_enqueue() has already placed lock on the granted list. */
715         cfs_list_del_init(&lock->l_res_link);
716
717         if (lock->l_flags & LDLM_FL_FLOCK_DEADLOCK) {
718                 LDLM_DEBUG(lock, "client-side enqueue deadlock received");
719                 rc = -EDEADLK;
720         } else if (flags & LDLM_FL_TEST_LOCK) {
721                 /* fcntl(F_GETLK) request */
722                 /* The old mode was saved in getlk->fl_type so that if the mode
723                  * in the lock changes we can decref the appropriate refcount. */
724                 ldlm_flock_destroy(lock, flock_type(getlk),
725                                    LDLM_FL_WAIT_NOREPROC);
726                 switch (lock->l_granted_mode) {
727                 case LCK_PR:
728                         flock_set_type(getlk, F_RDLCK);
729                         break;
730                 case LCK_PW:
731                         flock_set_type(getlk, F_WRLCK);
732                         break;
733                 default:
734                         flock_set_type(getlk, F_UNLCK);
735                 }
736                 flock_set_pid(getlk, (pid_t)lock->l_policy_data.l_flock.pid);
737                 flock_set_start(getlk,
738                                 (loff_t)lock->l_policy_data.l_flock.start);
739                 flock_set_end(getlk,
740                               (loff_t)lock->l_policy_data.l_flock.end);
741         } else {
742                 __u64 noreproc = LDLM_FL_WAIT_NOREPROC;
743
744                 /* We need to reprocess the lock to do merges or splits
745                  * with existing locks owned by this process. */
746                 ldlm_process_flock_lock(lock, &noreproc, 1, &err, NULL);
747         }
748         unlock_res_and_lock(lock);
749         RETURN(rc);
750 }
751 EXPORT_SYMBOL(ldlm_flock_completion_ast);
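/*
 * Sketch of how this completion callback is typically installed by the
 * client-side enqueue path (illustrative only; the actual call sites live
 * in the client code, and the field values here are made up):
 *
 *	struct ldlm_enqueue_info einfo = {
 *		.ei_type	= LDLM_FLOCK,
 *		.ei_mode	= LCK_PW,		// mode derived from fl_type
 *		.ei_cb_cp	= ldlm_flock_completion_ast,
 *		.ei_cbdata	= file_lock,		// getlk data for F_GETLK
 *	};
 *
 * On a blocked enqueue the callback sleeps in l_wait_event() until the
 * lock is granted, cancelled, or the import is invalidated.
 */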
752
753 int ldlm_flock_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
754                             void *data, int flag)
755 {
756         ENTRY;
757
758         LASSERT(lock);
759         LASSERT(flag == LDLM_CB_CANCELING);
760
761         /* take lock off the deadlock detection hash list. */
762         lock_res_and_lock(lock);
763         ldlm_flock_blocking_unlink(lock);
764         unlock_res_and_lock(lock);
765         RETURN(0);
766 }
767
768 void ldlm_flock_policy_wire18_to_local(const ldlm_wire_policy_data_t *wpolicy,
769                                        ldlm_policy_data_t *lpolicy)
770 {
771         memset(lpolicy, 0, sizeof(*lpolicy));
772         lpolicy->l_flock.start = wpolicy->l_flock.lfw_start;
773         lpolicy->l_flock.end = wpolicy->l_flock.lfw_end;
774         lpolicy->l_flock.pid = wpolicy->l_flock.lfw_pid;
775         /* Compat code, old clients had no idea about owner field and
776          * relied solely on pid for ownership. Introduced in LU-104, 2.1,
777          * April 2011 */
778         lpolicy->l_flock.owner = wpolicy->l_flock.lfw_pid;
779 }
780
781
782 void ldlm_flock_policy_wire21_to_local(const ldlm_wire_policy_data_t *wpolicy,
783                                        ldlm_policy_data_t *lpolicy)
784 {
785         memset(lpolicy, 0, sizeof(*lpolicy));
786         lpolicy->l_flock.start = wpolicy->l_flock.lfw_start;
787         lpolicy->l_flock.end = wpolicy->l_flock.lfw_end;
788         lpolicy->l_flock.pid = wpolicy->l_flock.lfw_pid;
789         lpolicy->l_flock.owner = wpolicy->l_flock.lfw_owner;
790 }
791
792 void ldlm_flock_policy_local_to_wire(const ldlm_policy_data_t *lpolicy,
793                                      ldlm_wire_policy_data_t *wpolicy)
794 {
795         memset(wpolicy, 0, sizeof(*wpolicy));
796         wpolicy->l_flock.lfw_start = lpolicy->l_flock.start;
797         wpolicy->l_flock.lfw_end = lpolicy->l_flock.end;
798         wpolicy->l_flock.lfw_pid = lpolicy->l_flock.pid;
799         wpolicy->l_flock.lfw_owner = lpolicy->l_flock.owner;
800 }
801
802 /*
803  * Per-export flock owner <-> lock hash operations.
804  */
805 static unsigned
806 ldlm_export_flock_hash(cfs_hash_t *hs, const void *key, unsigned mask)
807 {
808         return cfs_hash_u64_hash(*(__u64 *)key, mask);
809 }
810
811 static void *
812 ldlm_export_flock_key(cfs_hlist_node_t *hnode)
813 {
814         struct ldlm_lock *lock;
815
816         lock = cfs_hlist_entry(hnode, struct ldlm_lock, l_exp_flock_hash);
817         return &lock->l_policy_data.l_flock.owner;
818 }
819
820 static int
821 ldlm_export_flock_keycmp(const void *key, cfs_hlist_node_t *hnode)
822 {
823         return !memcmp(ldlm_export_flock_key(hnode), key, sizeof(__u64));
824 }
825
826 static void *
827 ldlm_export_flock_object(cfs_hlist_node_t *hnode)
828 {
829         return cfs_hlist_entry(hnode, struct ldlm_lock, l_exp_flock_hash);
830 }
831
832 static void
833 ldlm_export_flock_get(cfs_hash_t *hs, cfs_hlist_node_t *hnode)
834 {
835         struct ldlm_lock *lock;
836         struct ldlm_flock *flock;
837
838         lock = cfs_hlist_entry(hnode, struct ldlm_lock, l_exp_flock_hash);
839         LDLM_LOCK_GET(lock);
840
841         flock = &lock->l_policy_data.l_flock;
842         LASSERT(flock->blocking_export != NULL);
843         class_export_get(flock->blocking_export);
844         flock->blocking_refs++;
845 }
846
847 static void
848 ldlm_export_flock_put(cfs_hash_t *hs, cfs_hlist_node_t *hnode)
849 {
850         struct ldlm_lock *lock;
851         struct ldlm_flock *flock;
852
853         lock = cfs_hlist_entry(hnode, struct ldlm_lock, l_exp_flock_hash);
854         LDLM_LOCK_RELEASE(lock);
855
856         flock = &lock->l_policy_data.l_flock;
857         LASSERT(flock->blocking_export != NULL);
858         class_export_put(flock->blocking_export);
859         if (--flock->blocking_refs == 0) {
860                 flock->blocking_owner = 0;
861                 flock->blocking_export = NULL;
862         }
863 }
864
865 static cfs_hash_ops_t ldlm_export_flock_ops = {
866         .hs_hash        = ldlm_export_flock_hash,
867         .hs_key         = ldlm_export_flock_key,
868         .hs_keycmp      = ldlm_export_flock_keycmp,
869         .hs_object      = ldlm_export_flock_object,
870         .hs_get         = ldlm_export_flock_get,
871         .hs_put         = ldlm_export_flock_put,
872         .hs_put_locked  = ldlm_export_flock_put,
873 };
874
875 int ldlm_init_flock_export(struct obd_export *exp)
876 {
877         if (strcmp(exp->exp_obd->obd_type->typ_name, LUSTRE_MDT_NAME) != 0)
878                 RETURN(0);
879
880         exp->exp_flock_hash =
881                 cfs_hash_create(obd_uuid2str(&exp->exp_client_uuid),
882                                 HASH_EXP_LOCK_CUR_BITS,
883                                 HASH_EXP_LOCK_MAX_BITS,
884                                 HASH_EXP_LOCK_BKT_BITS, 0,
885                                 CFS_HASH_MIN_THETA, CFS_HASH_MAX_THETA,
886                                 &ldlm_export_flock_ops,
887                                 CFS_HASH_DEFAULT | CFS_HASH_NBLK_CHANGE);
888         if (!exp->exp_flock_hash)
889                 RETURN(-ENOMEM);
890
891         RETURN(0);
892 }
893 EXPORT_SYMBOL(ldlm_init_flock_export);
894
895 void ldlm_destroy_flock_export(struct obd_export *exp)
896 {
897         ENTRY;
898         if (exp->exp_flock_hash) {
899                 cfs_hash_putref(exp->exp_flock_hash);
900                 exp->exp_flock_hash = NULL;
901         }
902         EXIT;
903 }
904 EXPORT_SYMBOL(ldlm_destroy_flock_export);