lustre/ldlm/ldlm_flock.c
/*
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
 *
 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
 * CA 95054 USA or visit www.sun.com if you need additional information or
 * have any questions.
 *
 * GPL HEADER END
 */
/*
 * Copyright (c) 2003 Hewlett-Packard Development Company LP.
 * Developed under the sponsorship of the US Government under
 * Subcontract No. B514193
 *
 * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
 * Use is subject to license terms.
 *
 * Copyright (c) 2010, 2012, Intel Corporation.
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 * Lustre is a trademark of Sun Microsystems, Inc.
 */

/**
 * This file implements the POSIX lock type for Lustre.
 * Its policy properties are the start and end of the extent and the PID.
 *
 * These locks are handled only through the MDS because POSIX semantics
 * require, e.g., that a lock may be only partially released, splitting it
 * into two parts, and that two adjacent locks from the same process may be
 * merged into a single wider lock.
 *
 * Lock modes are mapped as follows:
 *   PR and PW for READ and WRITE locks
 *   NL to request the release of a portion of a lock
 *
 * These flock locks never time out.
 */
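/*
 * Illustrative example (not part of the original source): a process holding
 * a PW (write) lock on the extent [0, 99] that unlocks bytes [40, 59] sends
 * an NL request for that range; the MDS splits the original lock into two
 * granted PW locks, [0, 39] and [60, 99]. Conversely, PW locks on [0, 49]
 * and [50, 99] from the same owner are merged into one PW lock on [0, 99].
 */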

#define DEBUG_SUBSYSTEM S_LDLM

#ifdef __KERNEL__
#include <lustre_dlm.h>
#include <obd_support.h>
#include <obd_class.h>
#include <lustre_lib.h>
#include <libcfs/list.h>
#else
#include <liblustre.h>
#include <obd_class.h>
#endif

#include "ldlm_internal.h"

int ldlm_flock_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
                            void *data, int flag);

/**
 * list_for_remaining_safe - iterate over the remaining entries in a list
 *              and safeguard against removal of a list entry.
 * \param pos   the &struct list_head to use as a loop counter. pos MUST
 *              have been initialized prior to using it in this macro.
 * \param n     another &struct list_head to use as temporary storage
 * \param head  the head for your list.
 */
#define list_for_remaining_safe(pos, n, head) \
        for (n = pos->next; pos != (head); pos = n, n = pos->next)
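/*
 * Illustrative usage (mirrors ldlm_process_flock_lock() below): with
 * "ownlocks" already positioned at this owner's first granted lock,
 *
 *      list_for_remaining_safe(ownlocks, tmp, &res->lr_granted) {
 *              lock = cfs_list_entry(ownlocks, struct ldlm_lock, l_res_link);
 *              ... the current entry may be removed safely ...
 *      }
 */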

static inline int
ldlm_same_flock_owner(struct ldlm_lock *lock, struct ldlm_lock *new)
{
        return((new->l_policy_data.l_flock.owner ==
                lock->l_policy_data.l_flock.owner) &&
               (new->l_export == lock->l_export));
}

static inline int
ldlm_flocks_overlap(struct ldlm_lock *lock, struct ldlm_lock *new)
{
        return((new->l_policy_data.l_flock.start <=
                lock->l_policy_data.l_flock.end) &&
               (new->l_policy_data.l_flock.end >=
                lock->l_policy_data.l_flock.start));
}

static inline void ldlm_flock_blocking_link(struct ldlm_lock *req,
                                            struct ldlm_lock *lock)
{
        /* For server only */
        if (req->l_export == NULL)
                return;

        LASSERT(cfs_hlist_unhashed(&req->l_exp_flock_hash));

        req->l_policy_data.l_flock.blocking_owner =
                lock->l_policy_data.l_flock.owner;
        req->l_policy_data.l_flock.blocking_export =
                lock->l_export;
        req->l_policy_data.l_flock.blocking_refs = 0;

        cfs_hash_add(req->l_export->exp_flock_hash,
                     &req->l_policy_data.l_flock.owner,
                     &req->l_exp_flock_hash);
}

static inline void ldlm_flock_blocking_unlink(struct ldlm_lock *req)
{
        /* For server only */
        if (req->l_export == NULL)
                return;

        check_res_locked(req->l_resource);
        if (req->l_export->exp_flock_hash != NULL &&
            !cfs_hlist_unhashed(&req->l_exp_flock_hash))
                cfs_hash_del(req->l_export->exp_flock_hash,
                             &req->l_policy_data.l_flock.owner,
                             &req->l_exp_flock_hash);
}

static inline void
ldlm_flock_destroy(struct ldlm_lock *lock, ldlm_mode_t mode, __u64 flags)
{
        ENTRY;

        LDLM_DEBUG(lock, "ldlm_flock_destroy(mode: %d, flags: 0x%llx)",
                   mode, flags);

        /* Safe to not lock here, since it should be empty anyway */
        LASSERT(cfs_hlist_unhashed(&lock->l_exp_flock_hash));

        cfs_list_del_init(&lock->l_res_link);
        if (flags == LDLM_FL_WAIT_NOREPROC &&
            !(lock->l_flags & LDLM_FL_FAILED)) {
                /* client side - set a flag to prevent sending a CANCEL */
                lock->l_flags |= LDLM_FL_LOCAL_ONLY | LDLM_FL_CBPENDING;

                /* We are under lock_res_and_lock() here, so we must call
                 * the nolock version of ldlm_lock_decref_internal(). */
                ldlm_lock_decref_internal_nolock(lock, mode);
        }

        ldlm_lock_destroy_nolock(lock);
        EXIT;
}

/**
 * POSIX locks deadlock detection code.
 *
 * Given a new lock \a req and an existing lock \a bl_lock it conflicts
 * with, we need to iterate through all blocked POSIX locks for this
 * export and see if a deadlock condition arises (i.e. one client holds
 * a lock on something and wants a lock on something else, while at the
 * same time another client has the opposite situation).
 */
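/*
 * Illustrative example (not part of the original source): owner A holds a
 * lock on extent X and blocks waiting for extent Y, while owner B holds the
 * lock on Y and its new request on X would block on A. Walking the chain of
 * blocking_owner/blocking_export links from B leads back to A's owner and
 * export, so the new request is refused with -EDEADLK.
 */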
static int
ldlm_flock_deadlock(struct ldlm_lock *req, struct ldlm_lock *bl_lock)
{
        struct obd_export *req_exp = req->l_export;
        struct obd_export *bl_exp = bl_lock->l_export;
        __u64 req_owner = req->l_policy_data.l_flock.owner;
        __u64 bl_owner = bl_lock->l_policy_data.l_flock.owner;

        /* For server only */
        if (req_exp == NULL)
                return 0;

        class_export_get(bl_exp);
        while (1) {
                struct obd_export *bl_exp_new;
                struct ldlm_lock *lock = NULL;
                struct ldlm_flock *flock;

                if (bl_exp->exp_flock_hash != NULL)
                        lock = cfs_hash_lookup(bl_exp->exp_flock_hash,
                                               &bl_owner);
                if (lock == NULL)
                        break;

                LASSERT(req != lock);
                flock = &lock->l_policy_data.l_flock;
                LASSERT(flock->owner == bl_owner);
                bl_owner = flock->blocking_owner;
                bl_exp_new = class_export_get(flock->blocking_export);
                class_export_put(bl_exp);

                cfs_hash_put(bl_exp->exp_flock_hash, &lock->l_exp_flock_hash);
                bl_exp = bl_exp_new;

                if (bl_owner == req_owner && bl_exp == req_exp) {
                        class_export_put(bl_exp);
                        return 1;
                }
        }
        class_export_put(bl_exp);

        return 0;
}

/**
 * Process a granting attempt for flock lock.
 * Must be called with ns lock held.
 *
 * This function looks for any conflicts for \a lock in the granted or
 * waiting queues. The lock is granted if no conflicts are found in
 * either queue.
 *
 * It is also responsible for splitting a lock if a portion of the lock
 * is released.
 *
 * If \a first_enq is 0 (i.e. called from ldlm_reprocess_queue):
 *   - blocking ASTs have already been sent
 *
 * If \a first_enq is 1 (i.e. called from ldlm_lock_enqueue):
 *   - blocking ASTs have not been sent yet, so the list of conflicting
 *     locks is collected and ASTs are sent.
 */
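/*
 * Illustrative flow (not part of the original source): an unlock arrives as
 * an LCK_NL request. The code below finds this owner's locks in lr_granted,
 * then shrinks, splits, or destroys the ones covered by the request; on the
 * server it also reprocesses lr_waiting so that waiters blocked on the
 * released range can be granted and sent completion ASTs.
 */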
int
ldlm_process_flock_lock(struct ldlm_lock *req, __u64 *flags, int first_enq,
                        ldlm_error_t *err, cfs_list_t *work_list)
{
        struct ldlm_resource *res = req->l_resource;
        struct ldlm_namespace *ns = ldlm_res_to_ns(res);
        cfs_list_t *tmp;
        cfs_list_t *ownlocks = NULL;
        struct ldlm_lock *lock = NULL;
        struct ldlm_lock *new = req;
        struct ldlm_lock *new2 = NULL;
        ldlm_mode_t mode = req->l_req_mode;
        int local = ns_is_client(ns);
        int added = (mode == LCK_NL);
        int overlaps = 0;
        int split = 0;
        const struct ldlm_callback_suite null_cbs = { NULL };
        ENTRY;

        CDEBUG(D_DLMTRACE, "flags %#llx owner "LPU64" pid %u mode %u start "
               LPU64" end "LPU64"\n", *flags,
               new->l_policy_data.l_flock.owner,
               new->l_policy_data.l_flock.pid, mode,
               req->l_policy_data.l_flock.start,
               req->l_policy_data.l_flock.end);

        *err = ELDLM_OK;

        if (local) {
                /* No blocking ASTs are sent to the clients for
                 * Posix file & record locks */
                req->l_blocking_ast = NULL;
        } else {
                /* Called on the server for lock cancels. */
                req->l_blocking_ast = ldlm_flock_blocking_ast;
        }

reprocess:
        if ((*flags == LDLM_FL_WAIT_NOREPROC) || (mode == LCK_NL)) {
                /* This loop determines where this process's locks start
                 * in the resource lr_granted list. */
                cfs_list_for_each(tmp, &res->lr_granted) {
                        lock = cfs_list_entry(tmp, struct ldlm_lock,
                                              l_res_link);
                        if (ldlm_same_flock_owner(lock, req)) {
                                ownlocks = tmp;
                                break;
                        }
                }
        } else {
                lockmode_verify(mode);

                /* This loop determines if there are existing locks
                 * that conflict with the new lock request. */
                cfs_list_for_each(tmp, &res->lr_granted) {
                        lock = cfs_list_entry(tmp, struct ldlm_lock,
                                              l_res_link);

                        if (ldlm_same_flock_owner(lock, req)) {
                                if (!ownlocks)
                                        ownlocks = tmp;
                                continue;
                        }

                        /* locks are compatible, overlap doesn't matter */
                        if (lockmode_compat(lock->l_granted_mode, mode))
                                continue;

                        if (!ldlm_flocks_overlap(lock, req))
                                continue;

                        if (!first_enq)
                                RETURN(LDLM_ITER_CONTINUE);

                        if (*flags & LDLM_FL_BLOCK_NOWAIT) {
                                ldlm_flock_destroy(req, mode, *flags);
                                *err = -EAGAIN;
                                RETURN(LDLM_ITER_STOP);
                        }

                        if (*flags & LDLM_FL_TEST_LOCK) {
                                ldlm_flock_destroy(req, mode, *flags);
                                req->l_req_mode = lock->l_granted_mode;
                                req->l_policy_data.l_flock.pid =
                                        lock->l_policy_data.l_flock.pid;
                                req->l_policy_data.l_flock.start =
                                        lock->l_policy_data.l_flock.start;
                                req->l_policy_data.l_flock.end =
                                        lock->l_policy_data.l_flock.end;
                                *flags |= LDLM_FL_LOCK_CHANGED;
                                RETURN(LDLM_ITER_STOP);
                        }

                        /* add lock to blocking list before deadlock
                         * check to prevent race */
                        ldlm_flock_blocking_link(req, lock);

                        if (ldlm_flock_deadlock(req, lock)) {
                                ldlm_flock_blocking_unlink(req);
                                ldlm_flock_destroy(req, mode, *flags);
                                *err = -EDEADLK;
                                RETURN(LDLM_ITER_STOP);
                        }

                        ldlm_resource_add_lock(res, &res->lr_waiting, req);
                        *flags |= LDLM_FL_BLOCK_GRANTED;
                        RETURN(LDLM_ITER_STOP);
                }
        }

        if (*flags & LDLM_FL_TEST_LOCK) {
                ldlm_flock_destroy(req, mode, *flags);
                req->l_req_mode = LCK_NL;
                *flags |= LDLM_FL_LOCK_CHANGED;
                RETURN(LDLM_ITER_STOP);
        }

        /* In case we had slept on this lock request, take it off the
         * deadlock detection hash list. */
        ldlm_flock_blocking_unlink(req);

        /* Scan the locks owned by this process that overlap this request.
         * We may have to merge or split existing locks. */

        if (!ownlocks)
                ownlocks = &res->lr_granted;

        list_for_remaining_safe(ownlocks, tmp, &res->lr_granted) {
                lock = cfs_list_entry(ownlocks, struct ldlm_lock, l_res_link);

                if (!ldlm_same_flock_owner(lock, new))
                        break;

                if (lock->l_granted_mode == mode) {
                        /* If the modes are the same then we need to process
                         * locks that overlap OR adjoin the new lock. The extra
                         * logic condition is necessary to deal with arithmetic
                         * overflow and underflow. */
                        if ((new->l_policy_data.l_flock.start >
                             (lock->l_policy_data.l_flock.end + 1))
                            && (lock->l_policy_data.l_flock.end !=
                                OBD_OBJECT_EOF))
                                continue;

                        if ((new->l_policy_data.l_flock.end <
                             (lock->l_policy_data.l_flock.start - 1))
                            && (lock->l_policy_data.l_flock.start != 0))
                                break;

                        if (new->l_policy_data.l_flock.start <
                            lock->l_policy_data.l_flock.start) {
                                lock->l_policy_data.l_flock.start =
                                        new->l_policy_data.l_flock.start;
                        } else {
                                new->l_policy_data.l_flock.start =
                                        lock->l_policy_data.l_flock.start;
                        }

                        if (new->l_policy_data.l_flock.end >
                            lock->l_policy_data.l_flock.end) {
                                lock->l_policy_data.l_flock.end =
                                        new->l_policy_data.l_flock.end;
                        } else {
                                new->l_policy_data.l_flock.end =
                                        lock->l_policy_data.l_flock.end;
                        }

                        if (added) {
                                ldlm_flock_destroy(lock, mode, *flags);
                        } else {
                                new = lock;
                                added = 1;
                        }
                        continue;
                }

                if (new->l_policy_data.l_flock.start >
                    lock->l_policy_data.l_flock.end)
                        continue;

                if (new->l_policy_data.l_flock.end <
                    lock->l_policy_data.l_flock.start)
                        break;

                ++overlaps;

                if (new->l_policy_data.l_flock.start <=
                    lock->l_policy_data.l_flock.start) {
                        if (new->l_policy_data.l_flock.end <
                            lock->l_policy_data.l_flock.end) {
                                lock->l_policy_data.l_flock.start =
                                        new->l_policy_data.l_flock.end + 1;
                                break;
                        }
                        ldlm_flock_destroy(lock, lock->l_req_mode, *flags);
                        continue;
                }
                if (new->l_policy_data.l_flock.end >=
                    lock->l_policy_data.l_flock.end) {
                        lock->l_policy_data.l_flock.end =
                                new->l_policy_data.l_flock.start - 1;
                        continue;
                }

                /* split the existing lock into two locks */

                /* If this is an F_UNLCK operation then we could avoid
                 * allocating a new lock and use the req lock passed in
                 * with the request, but this would complicate the reply
                 * processing since updates to req get reflected in the
                 * reply. The client side replays the lock request, so
                 * it must see the original lock data in the reply. */

                /* XXX - if ldlm_lock_create() can sleep we should
                 * release the lr_lock, allocate the new lock,
                 * and restart processing this lock. */
                if (!new2) {
                        unlock_res_and_lock(req);
                        new2 = ldlm_lock_create(ns, &res->lr_name, LDLM_FLOCK,
                                                lock->l_granted_mode, &null_cbs,
                                                NULL, 0, LVB_T_NONE);
                        lock_res_and_lock(req);
                        if (!new2) {
                                ldlm_flock_destroy(req, lock->l_granted_mode,
                                                   *flags);
                                *err = -ENOLCK;
                                RETURN(LDLM_ITER_STOP);
                        }
                        goto reprocess;
                }

                split = 1;

                new2->l_granted_mode = lock->l_granted_mode;
                new2->l_policy_data.l_flock.pid =
                        new->l_policy_data.l_flock.pid;
                new2->l_policy_data.l_flock.owner =
                        new->l_policy_data.l_flock.owner;
                new2->l_policy_data.l_flock.start =
                        lock->l_policy_data.l_flock.start;
                new2->l_policy_data.l_flock.end =
                        new->l_policy_data.l_flock.start - 1;
                lock->l_policy_data.l_flock.start =
                        new->l_policy_data.l_flock.end + 1;
                new2->l_conn_export = lock->l_conn_export;
                if (lock->l_export != NULL) {
                        new2->l_export = class_export_lock_get(lock->l_export,
                                                               new2);
                        if (new2->l_export->exp_lock_hash &&
                            cfs_hlist_unhashed(&new2->l_exp_hash))
                                cfs_hash_add(new2->l_export->exp_lock_hash,
                                             &new2->l_remote_handle,
                                             &new2->l_exp_hash);
                }
                if (*flags == LDLM_FL_WAIT_NOREPROC)
                        ldlm_lock_addref_internal_nolock(new2,
                                                         lock->l_granted_mode);

                /* insert new2 at lock */
                ldlm_resource_add_lock(res, ownlocks, new2);
                LDLM_LOCK_RELEASE(new2);
                break;
        }

        /* if new2 was created but never used, destroy it */
        if (split == 0 && new2 != NULL)
                ldlm_lock_destroy_nolock(new2);

        /* At this point we're granting the lock request. */
        req->l_granted_mode = req->l_req_mode;

        /* Add req to the granted queue before calling ldlm_reprocess_all(). */
        if (!added) {
                cfs_list_del_init(&req->l_res_link);
                /* insert new lock before ownlocks in list. */
                ldlm_resource_add_lock(res, ownlocks, req);
        }

        if (*flags != LDLM_FL_WAIT_NOREPROC) {
#ifdef HAVE_SERVER_SUPPORT
                if (first_enq) {
                        /* If this is an unlock, reprocess the waitq and
                         * send completion ASTs for locks that can now be
                         * granted. The only problem with doing this
                         * reprocessing here is that the completion ASTs for
                         * newly granted locks will be sent before the unlock
                         * completion is sent. It shouldn't be an issue. Also
                         * note that ldlm_process_flock_lock() will recurse,
                         * but only once because first_enq will be false from
                         * ldlm_reprocess_queue. */
                        if ((mode == LCK_NL) && overlaps) {
                                CFS_LIST_HEAD(rpc_list);
                                int rc;
restart:
                                ldlm_reprocess_queue(res, &res->lr_waiting,
                                                     &rpc_list);

                                unlock_res_and_lock(req);
                                rc = ldlm_run_ast_work(ns, &rpc_list,
                                                       LDLM_WORK_CP_AST);
                                lock_res_and_lock(req);
                                if (rc == -ERESTART)
                                        GOTO(restart, -ERESTART);
                        }
                } else {
                        LASSERT(req->l_completion_ast);
                        ldlm_add_ast_work_item(req, NULL, work_list);
                }
#else /* !HAVE_SERVER_SUPPORT */
                /* The only possible case for client-side calls into the
                 * flock policy function is ldlm_flock_completion_ast(),
                 * which always carries the LDLM_FL_WAIT_NOREPROC flag. */
                CERROR("Illegal parameter for client-side-only module.\n");
                LBUG();
#endif /* HAVE_SERVER_SUPPORT */
        }

        /* In case we're reprocessing the requested lock we can't destroy
         * it until after calling ldlm_add_ast_work_item() above so that it
         * can bump the reference count on \a req. Otherwise \a req
         * could be freed before the completion AST can be sent. */
        if (added)
                ldlm_flock_destroy(req, mode, *flags);

        ldlm_resource_dump(D_INFO, res);
        RETURN(LDLM_ITER_CONTINUE);
}

struct ldlm_flock_wait_data {
        struct ldlm_lock *fwd_lock;
        int               fwd_generation;
};

static void
ldlm_flock_interrupted_wait(void *data)
{
        struct ldlm_lock *lock;
        ENTRY;

        lock = ((struct ldlm_flock_wait_data *)data)->fwd_lock;

        /* take lock off the deadlock detection hash list. */
        lock_res_and_lock(lock);
        ldlm_flock_blocking_unlink(lock);

        /* client side - set flag to prevent lock from being put on LRU list */
        lock->l_flags |= LDLM_FL_CBPENDING;
        unlock_res_and_lock(lock);

        EXIT;
}

/**
 * Flock completion callback function.
 *
 * \param lock [in,out]  the lock to be handled
 * \param flags     [in] LDLM flags
 * \param data      [in] ldlm_work_cp_ast_lock() will use ldlm_cb_set_arg
 *
 * \retval 0    success
 * \retval <0   failure
 */
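/*
 * Illustrative call paths (not part of the original source): with data == NULL
 * this runs on the client enqueue path after the MDS granted the lock in its
 * reply; with data != NULL it runs from a CP AST RPC, in which case a blocked
 * waiter is simply woken up. If the lock is still blocked on conflicts, the
 * caller sleeps below until the lock is granted or cancelled.
 */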
int
ldlm_flock_completion_ast(struct ldlm_lock *lock, __u64 flags, void *data)
{
        struct file_lock                *getlk = lock->l_ast_data;
        struct obd_device               *obd;
        struct obd_import               *imp = NULL;
        struct ldlm_flock_wait_data      fwd;
        struct l_wait_info               lwi;
        ldlm_error_t                     err;
        int                              rc = 0;
        ENTRY;

        CDEBUG(D_DLMTRACE, "flags: 0x%llx data: %p getlk: %p\n",
               flags, data, getlk);

        /* Import invalidation. We need to actually release the lock
         * references being held, so that it can go away. There is no point
         * in holding the lock even if the app still believes it has it,
         * since the server already dropped it anyway. This applies only
         * to granted locks. */
        if ((lock->l_flags & (LDLM_FL_FAILED|LDLM_FL_LOCAL_ONLY)) ==
            (LDLM_FL_FAILED|LDLM_FL_LOCAL_ONLY)) {
                if (lock->l_req_mode == lock->l_granted_mode &&
                    lock->l_granted_mode != LCK_NL &&
                    NULL == data)
                        ldlm_lock_decref_internal(lock, lock->l_req_mode);

                /* Need to wake up the waiter if we were evicted */
                cfs_waitq_signal(&lock->l_waitq);
                RETURN(0);
        }

        LASSERT(flags != LDLM_FL_WAIT_NOREPROC);

        if (!(flags & (LDLM_FL_BLOCK_WAIT | LDLM_FL_BLOCK_GRANTED |
                       LDLM_FL_BLOCK_CONV))) {
                if (NULL == data)
                        /* mds granted the lock in the reply */
                        goto granted;
                /* CP AST RPC: lock got granted, wake it up */
                cfs_waitq_signal(&lock->l_waitq);
                RETURN(0);
        }

        LDLM_DEBUG(lock, "client-side enqueue returned a blocked lock, "
                   "sleeping");
        fwd.fwd_lock = lock;
        obd = class_exp2obd(lock->l_conn_export);

        /* if this is a local lock, there is no import */
        if (NULL != obd)
                imp = obd->u.cli.cl_import;

        if (NULL != imp) {
                spin_lock(&imp->imp_lock);
                fwd.fwd_generation = imp->imp_generation;
                spin_unlock(&imp->imp_lock);
        }

        lwi = LWI_TIMEOUT_INTR(0, NULL, ldlm_flock_interrupted_wait, &fwd);

        /* Go to sleep until the lock is granted. */
        rc = l_wait_event(lock->l_waitq, is_granted_or_cancelled(lock), &lwi);

        if (rc) {
                LDLM_DEBUG(lock, "client-side enqueue waking up: failed (%d)",
                           rc);
                RETURN(rc);
        }

granted:
        OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_CP_CB_WAIT, 10);

        if (lock->l_flags & LDLM_FL_DESTROYED) {
                LDLM_DEBUG(lock, "client-side enqueue waking up: destroyed");
                RETURN(0);
        }

        if (lock->l_flags & LDLM_FL_FAILED) {
                LDLM_DEBUG(lock, "client-side enqueue waking up: failed");
                RETURN(-EIO);
        }

        if (rc) {
                LDLM_DEBUG(lock, "client-side enqueue waking up: failed (%d)",
                           rc);
                RETURN(rc);
        }

        LDLM_DEBUG(lock, "client-side enqueue granted");

        lock_res_and_lock(lock);

        /* take lock off the deadlock detection hash list. */
        ldlm_flock_blocking_unlink(lock);

        /* ldlm_lock_enqueue() has already placed lock on the granted list. */
        cfs_list_del_init(&lock->l_res_link);

        if (flags & LDLM_FL_TEST_LOCK) {
                /* fcntl(F_GETLK) request */
                /* The old mode was saved in getlk->fl_type so that if the
                 * mode in the lock changes we can decref the appropriate
                 * refcount. */
                ldlm_flock_destroy(lock, flock_type(getlk),
                                   LDLM_FL_WAIT_NOREPROC);
                switch (lock->l_granted_mode) {
                case LCK_PR:
                        flock_set_type(getlk, F_RDLCK);
                        break;
                case LCK_PW:
                        flock_set_type(getlk, F_WRLCK);
                        break;
                default:
                        flock_set_type(getlk, F_UNLCK);
                }
                flock_set_pid(getlk, (pid_t)lock->l_policy_data.l_flock.pid);
                flock_set_start(getlk,
                                (loff_t)lock->l_policy_data.l_flock.start);
                flock_set_end(getlk,
                              (loff_t)lock->l_policy_data.l_flock.end);
        } else {
                __u64 noreproc = LDLM_FL_WAIT_NOREPROC;

                /* We need to reprocess the lock to do merges or splits
                 * with existing locks owned by this process. */
                ldlm_process_flock_lock(lock, &noreproc, 1, &err, NULL);
        }
        unlock_res_and_lock(lock);
        RETURN(0);
}
EXPORT_SYMBOL(ldlm_flock_completion_ast);

int ldlm_flock_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
                            void *data, int flag)
{
        ENTRY;

        LASSERT(lock);
        LASSERT(flag == LDLM_CB_CANCELING);

        /* take lock off the deadlock detection hash list. */
        lock_res_and_lock(lock);
        ldlm_flock_blocking_unlink(lock);
        unlock_res_and_lock(lock);
        RETURN(0);
}

void ldlm_flock_policy_wire18_to_local(const ldlm_wire_policy_data_t *wpolicy,
                                       ldlm_policy_data_t *lpolicy)
{
        memset(lpolicy, 0, sizeof(*lpolicy));
        lpolicy->l_flock.start = wpolicy->l_flock.lfw_start;
        lpolicy->l_flock.end = wpolicy->l_flock.lfw_end;
        lpolicy->l_flock.pid = wpolicy->l_flock.lfw_pid;
        /* Compatibility code: old clients had no idea about the owner field
         * and relied solely on the pid for ownership. The owner field was
         * introduced in LU-104 (Lustre 2.1, April 2011). */
        lpolicy->l_flock.owner = wpolicy->l_flock.lfw_pid;
}

void ldlm_flock_policy_wire21_to_local(const ldlm_wire_policy_data_t *wpolicy,
                                       ldlm_policy_data_t *lpolicy)
{
        memset(lpolicy, 0, sizeof(*lpolicy));
        lpolicy->l_flock.start = wpolicy->l_flock.lfw_start;
        lpolicy->l_flock.end = wpolicy->l_flock.lfw_end;
        lpolicy->l_flock.pid = wpolicy->l_flock.lfw_pid;
        lpolicy->l_flock.owner = wpolicy->l_flock.lfw_owner;
}

void ldlm_flock_policy_local_to_wire(const ldlm_policy_data_t *lpolicy,
                                     ldlm_wire_policy_data_t *wpolicy)
{
        memset(wpolicy, 0, sizeof(*wpolicy));
        wpolicy->l_flock.lfw_start = lpolicy->l_flock.start;
        wpolicy->l_flock.lfw_end = lpolicy->l_flock.end;
        wpolicy->l_flock.lfw_pid = lpolicy->l_flock.pid;
        wpolicy->l_flock.lfw_owner = lpolicy->l_flock.owner;
}

/*
 * Export handle<->flock hash operations.
 */
static unsigned
ldlm_export_flock_hash(cfs_hash_t *hs, const void *key, unsigned mask)
{
        return cfs_hash_u64_hash(*(__u64 *)key, mask);
}

static void *
ldlm_export_flock_key(cfs_hlist_node_t *hnode)
{
        struct ldlm_lock *lock;

        lock = cfs_hlist_entry(hnode, struct ldlm_lock, l_exp_flock_hash);
        return &lock->l_policy_data.l_flock.owner;
}

static int
ldlm_export_flock_keycmp(const void *key, cfs_hlist_node_t *hnode)
{
        return !memcmp(ldlm_export_flock_key(hnode), key, sizeof(__u64));
}

static void *
ldlm_export_flock_object(cfs_hlist_node_t *hnode)
{
        return cfs_hlist_entry(hnode, struct ldlm_lock, l_exp_flock_hash);
}

static void
ldlm_export_flock_get(cfs_hash_t *hs, cfs_hlist_node_t *hnode)
{
        struct ldlm_lock *lock;
        struct ldlm_flock *flock;

        lock = cfs_hlist_entry(hnode, struct ldlm_lock, l_exp_flock_hash);
        LDLM_LOCK_GET(lock);

        flock = &lock->l_policy_data.l_flock;
        LASSERT(flock->blocking_export != NULL);
        class_export_get(flock->blocking_export);
        flock->blocking_refs++;
}

static void
ldlm_export_flock_put(cfs_hash_t *hs, cfs_hlist_node_t *hnode)
{
        struct ldlm_lock *lock;
        struct ldlm_flock *flock;

        lock = cfs_hlist_entry(hnode, struct ldlm_lock, l_exp_flock_hash);
        LDLM_LOCK_RELEASE(lock);

        flock = &lock->l_policy_data.l_flock;
        LASSERT(flock->blocking_export != NULL);
        class_export_put(flock->blocking_export);
        if (--flock->blocking_refs == 0) {
                flock->blocking_owner = 0;
                flock->blocking_export = NULL;
        }
}

static cfs_hash_ops_t ldlm_export_flock_ops = {
        .hs_hash        = ldlm_export_flock_hash,
        .hs_key         = ldlm_export_flock_key,
        .hs_keycmp      = ldlm_export_flock_keycmp,
        .hs_object      = ldlm_export_flock_object,
        .hs_get         = ldlm_export_flock_get,
        .hs_put         = ldlm_export_flock_put,
        .hs_put_locked  = ldlm_export_flock_put,
};

int ldlm_init_flock_export(struct obd_export *exp)
{
        if (strcmp(exp->exp_obd->obd_type->typ_name, LUSTRE_MDT_NAME) != 0)
                RETURN(0);

        exp->exp_flock_hash =
                cfs_hash_create(obd_uuid2str(&exp->exp_client_uuid),
                                HASH_EXP_LOCK_CUR_BITS,
                                HASH_EXP_LOCK_MAX_BITS,
                                HASH_EXP_LOCK_BKT_BITS, 0,
                                CFS_HASH_MIN_THETA, CFS_HASH_MAX_THETA,
                                &ldlm_export_flock_ops,
                                CFS_HASH_DEFAULT | CFS_HASH_NBLK_CHANGE);
        if (!exp->exp_flock_hash)
                RETURN(-ENOMEM);

        RETURN(0);
}
EXPORT_SYMBOL(ldlm_init_flock_export);

void ldlm_destroy_flock_export(struct obd_export *exp)
{
        ENTRY;
        if (exp->exp_flock_hash) {
                cfs_hash_putref(exp->exp_flock_hash);
                exp->exp_flock_hash = NULL;
        }
        EXIT;
}
EXPORT_SYMBOL(ldlm_destroy_flock_export);