/*
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
 *
 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
 * CA 95054 USA or visit www.sun.com if you need additional information or
 * have any questions.
 *
 * GPL HEADER END
 */
/*
 * Copyright (c) 2003 Hewlett-Packard Development Company LP.
 * Developed under the sponsorship of the US Government under
 * Subcontract No. B514193
 *
 * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
 * Use is subject to license terms.
 *
 * Copyright (c) 2010, 2012, Intel Corporation.
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 * Lustre is a trademark of Sun Microsystems, Inc.
 */

/**
 * This file implements the POSIX lock type for Lustre.
 * Its policy properties are the start and end of the extent and the PID.
 *
 * These locks are only handled through the MDS because POSIX semantics
 * require, e.g., that a lock may be only partially released and as such
 * split into two parts, and that two adjacent locks from the same process
 * may be merged into a single wider lock.
 *
 * Lock modes are mapped as follows:
 *   PR and PW for READ and WRITE locks
 *   NL to request the release of a portion of a lock
 *
 * These flock locks never time out.
 */

#define DEBUG_SUBSYSTEM S_LDLM

#ifdef __KERNEL__
#include <lustre_dlm.h>
#include <obd_support.h>
#include <obd_class.h>
#include <lustre_lib.h>
#include <libcfs/list.h>
#else
#include <liblustre.h>
#include <obd_class.h>
#endif

#include "ldlm_internal.h"

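/*
 * Illustrative only: a minimal sketch (not part of the original file) of
 * the fcntl-type-to-LDLM-mode mapping described in the header comment
 * above. The helper name is hypothetical; the LCK_* values come from
 * lustre_dlm.h and F_* from the kernel fcntl definitions.
 */
static inline ldlm_mode_t flock_type_to_ldlm_mode_sketch(int fl_type)
{
        switch (fl_type) {
        case F_RDLCK:
                return LCK_PR;  /* shared read lock */
        case F_WRLCK:
                return LCK_PW;  /* exclusive write lock */
        case F_UNLCK:
        default:
                return LCK_NL;  /* release (part of) a held lock */
        }
}
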
int ldlm_flock_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
                            void *data, int flag);

/**
 * list_for_remaining_safe - iterate over the remaining entries in a list
 *              and safeguard against removal of a list entry.
 * \param pos   the &struct list_head to use as a loop counter. pos MUST
 *              have been initialized prior to using it in this macro.
 * \param n     another &struct list_head to use as temporary storage
 * \param head  the head for your list.
 */
#define list_for_remaining_safe(pos, n, head) \
        for (n = pos->next; pos != (head); pos = n, n = pos->next)

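/*
 * Illustrative only: a usage sketch of the macro above. Unlike
 * list_for_each_safe(), iteration starts from a caller-supplied position
 * rather than from the head, which is how the merge/split scan in
 * ldlm_process_flock_lock() below resumes at this owner's first lock.
 */
static inline void list_remaining_sketch(struct list_head *pos,
                                         struct list_head *head)
{
        struct list_head *n;

        list_for_remaining_safe(pos, n, head) {
                /* "pos" may be unlinked here; "n" already holds the next
                 * entry, so the walk continues safely. */
        }
}
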
static inline int
ldlm_same_flock_owner(struct ldlm_lock *lock, struct ldlm_lock *new)
{
        return((new->l_policy_data.l_flock.owner ==
                lock->l_policy_data.l_flock.owner) &&
               (new->l_export == lock->l_export));
}

static inline int
ldlm_flocks_overlap(struct ldlm_lock *lock, struct ldlm_lock *new)
{
        return((new->l_policy_data.l_flock.start <=
                lock->l_policy_data.l_flock.end) &&
               (new->l_policy_data.l_flock.end >=
                lock->l_policy_data.l_flock.start));
}

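/*
 * Worked example for the inclusive-interval test above (illustrative):
 * [0, 9] and [5, 14] overlap because 0 <= 14 and 9 >= 5, while [0, 9]
 * and [10, 19] do not; adjoining ranges like the latter still matter
 * for the same-mode merge logic in ldlm_process_flock_lock() below.
 */
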
static inline void ldlm_flock_blocking_link(struct ldlm_lock *req,
                                            struct ldlm_lock *lock)
{
        /* For server only */
        if (req->l_export == NULL)
                return;

        LASSERT(hlist_unhashed(&req->l_exp_flock_hash));

        req->l_policy_data.l_flock.blocking_owner =
                lock->l_policy_data.l_flock.owner;
        req->l_policy_data.l_flock.blocking_export =
                lock->l_export;
        req->l_policy_data.l_flock.blocking_refs = 0;

        cfs_hash_add(req->l_export->exp_flock_hash,
                     &req->l_policy_data.l_flock.owner,
                     &req->l_exp_flock_hash);
}

static inline void ldlm_flock_blocking_unlink(struct ldlm_lock *req)
{
        /* For server only */
        if (req->l_export == NULL)
                return;

        check_res_locked(req->l_resource);
        if (req->l_export->exp_flock_hash != NULL &&
            !hlist_unhashed(&req->l_exp_flock_hash))
                cfs_hash_del(req->l_export->exp_flock_hash,
                             &req->l_policy_data.l_flock.owner,
                             &req->l_exp_flock_hash);
}

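/*
 * Illustrative only: the pair above maintains the per-export
 * exp_flock_hash, keyed by lock owner, so that deadlock detection can
 * find the lock a given owner is blocked on with a single lookup, as
 * ldlm_flock_lookup_cb() does below:
 *
 *      lock = cfs_hash_lookup(exp->exp_flock_hash, &owner);
 */
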
static inline void
ldlm_flock_destroy(struct ldlm_lock *lock, ldlm_mode_t mode, __u64 flags)
{
        ENTRY;

        LDLM_DEBUG(lock, "ldlm_flock_destroy(mode: %d, flags: "LPX64")",
                   mode, flags);

        /* Safe to not lock here, since it should be empty anyway */
        LASSERT(hlist_unhashed(&lock->l_exp_flock_hash));

        list_del_init(&lock->l_res_link);
        if (flags == LDLM_FL_WAIT_NOREPROC) {
                /* client side - set a flag to prevent sending a CANCEL */
                lock->l_flags |= LDLM_FL_LOCAL_ONLY | LDLM_FL_CBPENDING;

                /* When we reach here the resource is held via
                 * lock_res_and_lock(), so we need to call the nolock
                 * version of ldlm_lock_decref_internal(). */
                ldlm_lock_decref_internal_nolock(lock, mode);
        }

        ldlm_lock_destroy_nolock(lock);
        EXIT;
}

/**
 * POSIX lock deadlock detection code.
 *
 * Given a new lock \a req and an existing lock \a bl_lock it conflicts
 * with, we need to iterate through all blocked POSIX locks for this
 * export and see if a deadlock condition arises (i.e. when one client
 * holds a lock on something and wants a lock on something else, while
 * at the same time another client has the opposite situation).
 */

struct ldlm_flock_lookup_cb_data {
        __u64 *bl_owner;
        struct ldlm_lock *lock;
        struct obd_export *exp;
};

static int ldlm_flock_lookup_cb(cfs_hash_t *hs, cfs_hash_bd_t *bd,
                                struct hlist_node *hnode, void *data)
{
        struct ldlm_flock_lookup_cb_data *cb_data = data;
        struct obd_export *exp = cfs_hash_object(hs, hnode);
        struct ldlm_lock *lock;

        lock = cfs_hash_lookup(exp->exp_flock_hash, cb_data->bl_owner);
        if (lock == NULL)
                return 0;

        /* Stop on first found lock. Same process can't sleep twice */
        cb_data->lock = lock;
        cb_data->exp = class_export_get(exp);

        return 1;
}

static int
ldlm_flock_deadlock(struct ldlm_lock *req, struct ldlm_lock *bl_lock)
{
        struct obd_export *req_exp = req->l_export;
        struct obd_export *bl_exp = bl_lock->l_export;
        __u64 req_owner = req->l_policy_data.l_flock.owner;
        __u64 bl_owner = bl_lock->l_policy_data.l_flock.owner;

        /* For server only */
        if (req_exp == NULL)
                return 0;

        class_export_get(bl_exp);
        while (1) {
                struct ldlm_flock_lookup_cb_data cb_data = {
                                        .bl_owner = &bl_owner,
                                        .lock = NULL,
                                        .exp = NULL };
                struct obd_export *bl_exp_new;
                struct ldlm_lock *lock = NULL;
                struct ldlm_flock *flock;

                if (bl_exp->exp_flock_hash != NULL) {
                        cfs_hash_for_each_key(bl_exp->exp_obd->obd_nid_hash,
                                &bl_exp->exp_connection->c_peer.nid,
                                ldlm_flock_lookup_cb, &cb_data);
                        lock = cb_data.lock;
                }
                if (lock == NULL)
                        break;

                class_export_put(bl_exp);
                bl_exp = cb_data.exp;

                LASSERT(req != lock);
                flock = &lock->l_policy_data.l_flock;
                LASSERT(flock->owner == bl_owner);
                bl_owner = flock->blocking_owner;
                bl_exp_new = class_export_get(flock->blocking_export);
                class_export_put(bl_exp);

                cfs_hash_put(bl_exp->exp_flock_hash, &lock->l_exp_flock_hash);
                bl_exp = bl_exp_new;

                if (bl_exp->exp_failed)
                        break;

                if (bl_owner == req_owner &&
                    (bl_exp->exp_connection->c_peer.nid ==
                     req_exp->exp_connection->c_peer.nid)) {
                        class_export_put(bl_exp);
                        return 1;
                }
        }
        class_export_put(bl_exp);

        return 0;
}

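/*
 * Illustrative only: a self-contained sketch of the chain walk that
 * ldlm_flock_deadlock() performs above. "struct waiter_sketch" is a
 * hypothetical stand-in for the owner/blocking_owner links kept in
 * struct ldlm_flock; a deadlock is a chain of blockers that leads back
 * to the requesting owner.
 */
struct waiter_sketch {
        __u64 owner;                    /* this owner's id */
        struct waiter_sketch *blocker;  /* whom it waits on, or NULL */
};

static inline int flock_deadlock_sketch(const struct waiter_sketch *req)
{
        const struct waiter_sketch *w;

        for (w = req->blocker; w != NULL; w = w->blocker)
                if (w->owner == req->owner)
                        return 1;       /* cycle: deadlock */
        return 0;                       /* chain ends: no deadlock */
}
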
static void ldlm_flock_cancel_on_deadlock(struct ldlm_lock *lock,
                                          struct list_head *work_list)
{
        CDEBUG(D_INFO, "reprocess deadlock req=%p\n", lock);

        if ((exp_connect_flags(lock->l_export) &
                                OBD_CONNECT_FLOCK_DEAD) == 0) {
                CERROR("deadlock found, but client doesn't "
                                "support flock cancellation\n");
        } else {
                LASSERT(lock->l_completion_ast);
                LASSERT(!ldlm_is_ast_sent(lock));
                lock->l_flags |= LDLM_FL_AST_SENT | LDLM_FL_CANCEL_ON_BLOCK |
                        LDLM_FL_FLOCK_DEADLOCK;
                ldlm_flock_blocking_unlink(lock);
                ldlm_resource_unlink_lock(lock);
                ldlm_add_ast_work_item(lock, NULL, work_list);
        }
}

/**
 * Process a granting attempt for a flock lock.
 * Must be called with the namespace lock held.
 *
 * This function looks for any conflicts for \a lock in the granted or
 * waiting queues. The lock is granted if no conflicts are found in
 * either queue.
 *
 * It is also responsible for splitting a lock if a portion of the lock
 * is released.
 *
 * If \a first_enq is 0 (i.e. called from ldlm_reprocess_queue):
 *   - blocking ASTs have already been sent.
 *
 * If \a first_enq is 1 (i.e. called from ldlm_lock_enqueue):
 *   - blocking ASTs have not been sent yet, so the list of conflicting
 *     locks is collected and ASTs are sent.
 */
int
ldlm_process_flock_lock(struct ldlm_lock *req, __u64 *flags, int first_enq,
                        ldlm_error_t *err, struct list_head *work_list)
{
        struct ldlm_resource *res = req->l_resource;
        struct ldlm_namespace *ns = ldlm_res_to_ns(res);
        struct list_head *tmp;
        struct list_head *ownlocks = NULL;
        struct ldlm_lock *lock = NULL;
        struct ldlm_lock *new = req;
        struct ldlm_lock *new2 = NULL;
        ldlm_mode_t mode = req->l_req_mode;
        int local = ns_is_client(ns);
        int added = (mode == LCK_NL);
        int overlaps = 0;
        int splitted = 0;
        const struct ldlm_callback_suite null_cbs = { NULL };
        ENTRY;

        CDEBUG(D_DLMTRACE, "flags "LPX64" owner "LPU64" pid %u mode %u start "
               LPU64" end "LPU64"\n", *flags,
               new->l_policy_data.l_flock.owner,
               new->l_policy_data.l_flock.pid, mode,
               req->l_policy_data.l_flock.start,
               req->l_policy_data.l_flock.end);

        *err = ELDLM_OK;

        if (local) {
                /* No blocking ASTs are sent to the clients for
                 * POSIX file & record locks */
                req->l_blocking_ast = NULL;
        } else {
                /* Called on the server for lock cancels. */
                req->l_blocking_ast = ldlm_flock_blocking_ast;
        }

reprocess:
        if ((*flags == LDLM_FL_WAIT_NOREPROC) || (mode == LCK_NL)) {
                /* This loop determines where this process's locks start
                 * in the resource lr_granted list. */
                list_for_each(tmp, &res->lr_granted) {
                        lock = list_entry(tmp, struct ldlm_lock,
                                              l_res_link);
                        if (ldlm_same_flock_owner(lock, req)) {
                                ownlocks = tmp;
                                break;
                        }
                }
        } else {
                int reprocess_failed = 0;
                lockmode_verify(mode);

                /* This loop determines if there are existing locks
                 * that conflict with the new lock request. */
                list_for_each(tmp, &res->lr_granted) {
                        lock = list_entry(tmp, struct ldlm_lock,
                                              l_res_link);

                        if (ldlm_same_flock_owner(lock, req)) {
                                if (!ownlocks)
                                        ownlocks = tmp;
                                continue;
                        }

                        /* locks are compatible, overlap doesn't matter */
                        if (lockmode_compat(lock->l_granted_mode, mode))
                                continue;

                        if (!ldlm_flocks_overlap(lock, req))
                                continue;

                        if (!first_enq) {
                                reprocess_failed = 1;
                                if (ldlm_flock_deadlock(req, lock)) {
                                        ldlm_flock_cancel_on_deadlock(req,
                                                        work_list);
                                        RETURN(LDLM_ITER_CONTINUE);
                                }
                                continue;
                        }

                        if (*flags & LDLM_FL_BLOCK_NOWAIT) {
                                ldlm_flock_destroy(req, mode, *flags);
                                *err = -EAGAIN;
                                RETURN(LDLM_ITER_STOP);
                        }

                        if (*flags & LDLM_FL_TEST_LOCK) {
                                ldlm_flock_destroy(req, mode, *flags);
                                req->l_req_mode = lock->l_granted_mode;
                                req->l_policy_data.l_flock.pid =
                                        lock->l_policy_data.l_flock.pid;
                                req->l_policy_data.l_flock.start =
                                        lock->l_policy_data.l_flock.start;
                                req->l_policy_data.l_flock.end =
                                        lock->l_policy_data.l_flock.end;
                                *flags |= LDLM_FL_LOCK_CHANGED;
                                RETURN(LDLM_ITER_STOP);
                        }

                        /* add the lock to the blocking list before the
                         * deadlock check to prevent a race */
                        ldlm_flock_blocking_link(req, lock);

                        if (ldlm_flock_deadlock(req, lock)) {
                                ldlm_flock_blocking_unlink(req);
                                ldlm_flock_destroy(req, mode, *flags);
                                *err = -EDEADLK;
                                RETURN(LDLM_ITER_STOP);
                        }

                        ldlm_resource_add_lock(res, &res->lr_waiting, req);
                        *flags |= LDLM_FL_BLOCK_GRANTED;
                        RETURN(LDLM_ITER_STOP);
                }
                if (reprocess_failed)
                        RETURN(LDLM_ITER_CONTINUE);
        }

        if (*flags & LDLM_FL_TEST_LOCK) {
                ldlm_flock_destroy(req, mode, *flags);
                req->l_req_mode = LCK_NL;
                *flags |= LDLM_FL_LOCK_CHANGED;
                RETURN(LDLM_ITER_STOP);
        }

        /* In case we had slept on this lock request, take it off the
         * deadlock detection hash list. */
        ldlm_flock_blocking_unlink(req);

        /* Scan the locks owned by this process that overlap this request.
         * We may have to merge or split existing locks. */

        if (!ownlocks)
                ownlocks = &res->lr_granted;

        list_for_remaining_safe(ownlocks, tmp, &res->lr_granted) {
                lock = list_entry(ownlocks, struct ldlm_lock, l_res_link);

                if (!ldlm_same_flock_owner(lock, new))
                        break;

                if (lock->l_granted_mode == mode) {
                        /* If the modes are the same then we need to process
                         * locks that overlap OR adjoin the new lock. The extra
                         * logic condition is necessary to deal with arithmetic
                         * overflow and underflow. */
                        if ((new->l_policy_data.l_flock.start >
                             (lock->l_policy_data.l_flock.end + 1))
                            && (lock->l_policy_data.l_flock.end !=
                                OBD_OBJECT_EOF))
                                continue;

                        if ((new->l_policy_data.l_flock.end <
                             (lock->l_policy_data.l_flock.start - 1))
                            && (lock->l_policy_data.l_flock.start != 0))
                                break;

                        if (new->l_policy_data.l_flock.start <
                            lock->l_policy_data.l_flock.start) {
                                lock->l_policy_data.l_flock.start =
                                        new->l_policy_data.l_flock.start;
                        } else {
                                new->l_policy_data.l_flock.start =
                                        lock->l_policy_data.l_flock.start;
                        }

                        if (new->l_policy_data.l_flock.end >
                            lock->l_policy_data.l_flock.end) {
                                lock->l_policy_data.l_flock.end =
                                        new->l_policy_data.l_flock.end;
                        } else {
                                new->l_policy_data.l_flock.end =
                                        lock->l_policy_data.l_flock.end;
                        }

                        if (added) {
                                ldlm_flock_destroy(lock, mode, *flags);
                        } else {
                                new = lock;
                                added = 1;
                        }
                        continue;
                }

                if (new->l_policy_data.l_flock.start >
                    lock->l_policy_data.l_flock.end)
                        continue;

                if (new->l_policy_data.l_flock.end <
                    lock->l_policy_data.l_flock.start)
                        break;

                ++overlaps;

                if (new->l_policy_data.l_flock.start <=
                    lock->l_policy_data.l_flock.start) {
                        if (new->l_policy_data.l_flock.end <
                            lock->l_policy_data.l_flock.end) {
                                lock->l_policy_data.l_flock.start =
                                        new->l_policy_data.l_flock.end + 1;
                                break;
                        }
                        ldlm_flock_destroy(lock, lock->l_req_mode, *flags);
                        continue;
                }
                if (new->l_policy_data.l_flock.end >=
                    lock->l_policy_data.l_flock.end) {
                        lock->l_policy_data.l_flock.end =
                                new->l_policy_data.l_flock.start - 1;
                        continue;
                }

                /* split the existing lock into two locks */

                /* if this is an F_UNLCK operation then we could avoid
                 * allocating a new lock and use the req lock passed in
                 * with the request, but this would complicate the reply
                 * processing since updates to req get reflected in the
                 * reply. The client side replays the lock request, so
                 * it must see the original lock data in the reply. */

                /* XXX - if ldlm_lock_create() can sleep we should
                 * release the lr_lock, allocate the new lock,
                 * and restart processing this lock. */
                if (new2 == NULL) {
                        unlock_res_and_lock(req);
                        new2 = ldlm_lock_create(ns, &res->lr_name, LDLM_FLOCK,
                                                lock->l_granted_mode, &null_cbs,
                                                NULL, 0, LVB_T_NONE);
                        lock_res_and_lock(req);
                        if (IS_ERR(new2)) {
                                ldlm_flock_destroy(req, lock->l_granted_mode,
                                                   *flags);
                                *err = PTR_ERR(new2);
                                RETURN(LDLM_ITER_STOP);
                        }
                        goto reprocess;
                }

                splitted = 1;

                new2->l_granted_mode = lock->l_granted_mode;
                new2->l_policy_data.l_flock.pid =
                        new->l_policy_data.l_flock.pid;
                new2->l_policy_data.l_flock.owner =
                        new->l_policy_data.l_flock.owner;
                new2->l_policy_data.l_flock.start =
                        lock->l_policy_data.l_flock.start;
                new2->l_policy_data.l_flock.end =
                        new->l_policy_data.l_flock.start - 1;
                lock->l_policy_data.l_flock.start =
                        new->l_policy_data.l_flock.end + 1;
                new2->l_conn_export = lock->l_conn_export;
                if (lock->l_export != NULL) {
                        new2->l_export = class_export_lock_get(lock->l_export,
                                                               new2);
                        if (new2->l_export->exp_lock_hash &&
                            hlist_unhashed(&new2->l_exp_hash))
                                cfs_hash_add(new2->l_export->exp_lock_hash,
                                             &new2->l_remote_handle,
                                             &new2->l_exp_hash);
                }
                if (*flags == LDLM_FL_WAIT_NOREPROC)
                        ldlm_lock_addref_internal_nolock(new2,
                                                         lock->l_granted_mode);

                /* insert new2 at lock */
                ldlm_resource_add_lock(res, ownlocks, new2);
                LDLM_LOCK_RELEASE(new2);
                break;
        }

        /* if new2 was created but never used, destroy it */
        if (splitted == 0 && new2 != NULL)
                ldlm_lock_destroy_nolock(new2);

        /* At this point we're granting the lock request. */
        req->l_granted_mode = req->l_req_mode;

        /* Add req to the granted queue before calling ldlm_reprocess_all(). */
        if (!added) {
                list_del_init(&req->l_res_link);
                /* insert the new lock before ownlocks in the list. */
                ldlm_resource_add_lock(res, ownlocks, req);
        }

        if (*flags != LDLM_FL_WAIT_NOREPROC) {
#ifdef HAVE_SERVER_SUPPORT
                if (first_enq) {
                        /* If this is an unlock, reprocess the waitq and
                         * send completion ASTs for locks that can now be
                         * granted. The only problem with doing this
                         * reprocessing here is that the completion ASTs for
                         * newly granted locks will be sent before the unlock
                         * completion is sent. It shouldn't be an issue. Also
                         * note that ldlm_process_flock_lock() will recurse,
                         * but only once because first_enq will be false from
                         * ldlm_reprocess_queue. */
                        if ((mode == LCK_NL) && overlaps) {
                                struct list_head rpc_list;
                                int rc;

                                INIT_LIST_HEAD(&rpc_list);
restart:
                                ldlm_reprocess_queue(res, &res->lr_waiting,
                                                     &rpc_list);

                                unlock_res_and_lock(req);
                                rc = ldlm_run_ast_work(ns, &rpc_list,
                                                       LDLM_WORK_CP_AST);
                                lock_res_and_lock(req);
                                if (rc == -ERESTART)
                                        GOTO(restart, rc);
                        }
                } else {
                        LASSERT(req->l_completion_ast);
                        ldlm_add_ast_work_item(req, NULL, work_list);
                }
#else /* !HAVE_SERVER_SUPPORT */
                /* The only possible case for a client-side call into this
                 * flock policy function is ldlm_flock_completion_ast(),
                 * which always carries the LDLM_FL_WAIT_NOREPROC flag. */
                CERROR("Illegal parameter for client-side-only module.\n");
                LBUG();
#endif /* HAVE_SERVER_SUPPORT */
        }

        /* In case we're reprocessing the requested lock we can't destroy
         * it until after calling ldlm_add_ast_work_item() above, so that it
         * can bump the reference count on \a req. Otherwise \a req
         * could be freed before the completion AST can be sent. */
        if (added)
                ldlm_flock_destroy(req, mode, *flags);

        ldlm_resource_dump(D_INFO, res);
        RETURN(LDLM_ITER_CONTINUE);
}

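/*
 * Illustrative only: a worked sketch of the split arithmetic above.
 * "struct flock_range_sketch" is a hypothetical stand-in for the
 * start/end fields of struct ldlm_flock. Unlocking [40, 59] out of a
 * granted [0, 99] leaves the low fragment [0, 39] in the new lock
 * (new2 above) and shrinks the existing lock to [60, 99].
 */
struct flock_range_sketch {
        __u64 start;
        __u64 end;
};

static inline void
flock_split_sketch(struct flock_range_sketch *existing,
                   const struct flock_range_sketch *punch,
                   struct flock_range_sketch *low)
{
        low->start = existing->start;     /* new2 takes the low fragment */
        low->end = punch->start - 1;
        existing->start = punch->end + 1; /* lock keeps the high fragment */
}
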
struct ldlm_flock_wait_data {
        struct ldlm_lock *fwd_lock;
        int               fwd_generation;
};

static void
ldlm_flock_interrupted_wait(void *data)
{
        struct ldlm_lock *lock;
        ENTRY;

        lock = ((struct ldlm_flock_wait_data *)data)->fwd_lock;

        /* take lock off the deadlock detection hash list. */
        lock_res_and_lock(lock);
        ldlm_flock_blocking_unlink(lock);

        /* client side - set flag to prevent lock from being put on LRU list */
        ldlm_set_cbpending(lock);
        unlock_res_and_lock(lock);

        EXIT;
}


/**
 * Flock completion callback function.
 *
 * \param lock [in,out]: a lock to be handled
 * \param flags    [in]: flags
 * \param data     [in]: ldlm_work_cp_ast_lock() will use ldlm_cb_set_arg
 *
 * \retval 0    : success
 * \retval <0   : failure
 */
int
ldlm_flock_completion_ast(struct ldlm_lock *lock, __u64 flags, void *data)
{
        struct file_lock                *getlk = lock->l_ast_data;
        struct obd_device               *obd;
        struct obd_import               *imp = NULL;
        struct ldlm_flock_wait_data     fwd;
        struct l_wait_info              lwi;
        ldlm_error_t                    err;
        int                             rc = 0;
        ENTRY;

        OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_CP_CB_WAIT2, 4);
        if (OBD_FAIL_PRECHECK(OBD_FAIL_LDLM_CP_CB_WAIT3)) {
                lock_res_and_lock(lock);
                lock->l_flags |= LDLM_FL_FAIL_LOC;
                unlock_res_and_lock(lock);
                OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_CP_CB_WAIT3, 4);
        }
        CDEBUG(D_DLMTRACE, "flags: "LPX64" data: %p getlk: %p\n",
               flags, data, getlk);

        LASSERT(flags != LDLM_FL_WAIT_NOREPROC);

        if (flags & LDLM_FL_FAILED)
                goto granted;

        if (!(flags & (LDLM_FL_BLOCK_WAIT | LDLM_FL_BLOCK_GRANTED |
                       LDLM_FL_BLOCK_CONV))) {
                if (NULL == data)
                        /* mds granted the lock in the reply */
                        goto granted;
                /* CP AST RPC: lock got granted, wake it up */
                wake_up(&lock->l_waitq);
                RETURN(0);
        }

        LDLM_DEBUG(lock, "client-side enqueue returned a blocked lock, "
                   "sleeping");
        fwd.fwd_lock = lock;
        obd = class_exp2obd(lock->l_conn_export);

        /* if this is a local lock, there is no import */
        if (NULL != obd)
                imp = obd->u.cli.cl_import;

        if (NULL != imp) {
                spin_lock(&imp->imp_lock);
                fwd.fwd_generation = imp->imp_generation;
                spin_unlock(&imp->imp_lock);
        }

        lwi = LWI_TIMEOUT_INTR(0, NULL, ldlm_flock_interrupted_wait, &fwd);

        /* Go to sleep until the lock is granted. */
        rc = l_wait_event(lock->l_waitq, is_granted_or_cancelled(lock), &lwi);

        if (rc) {
                LDLM_DEBUG(lock, "client-side enqueue waking up: failed (%d)",
                           rc);
                RETURN(rc);
        }

granted:
        OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_CP_CB_WAIT, 10);

        if (OBD_FAIL_PRECHECK(OBD_FAIL_LDLM_CP_CB_WAIT4)) {
                lock_res_and_lock(lock);
                /* DEADLOCK is always set with CBPENDING */
                lock->l_flags |= LDLM_FL_FLOCK_DEADLOCK | LDLM_FL_CBPENDING;
                unlock_res_and_lock(lock);
                OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_CP_CB_WAIT4, 4);
        }
        if (OBD_FAIL_PRECHECK(OBD_FAIL_LDLM_CP_CB_WAIT5)) {
                lock_res_and_lock(lock);
                /* DEADLOCK is always set with CBPENDING */
                lock->l_flags |= LDLM_FL_FAIL_LOC |
                                 LDLM_FL_FLOCK_DEADLOCK | LDLM_FL_CBPENDING;
                unlock_res_and_lock(lock);
                OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_CP_CB_WAIT5, 4);
        }

        lock_res_and_lock(lock);

        /* Protect against the race where the lock could have just been
         * destroyed due to an overlap in ldlm_process_flock_lock().
         */
        if (ldlm_is_destroyed(lock)) {
                unlock_res_and_lock(lock);
                LDLM_DEBUG(lock, "client-side enqueue waking up: destroyed");

                /* An error is still to be returned, to propagate it up to
                 * the ldlm_cli_enqueue_fini() caller. */
                RETURN(-EIO);
        }

        /* ldlm_lock_enqueue() has already placed the lock on the granted
         * list. */
        ldlm_resource_unlink_lock(lock);

        /* Import invalidation. We need to actually release the lock
         * references being held, so that it can go away. No point in
         * holding the lock even if the app still believes it has it, since
         * the server already dropped it anyway. Only for granted locks.
         * Do the same for DEADLOCK'ed locks. */
        if (ldlm_is_failed(lock) || ldlm_is_flock_deadlock(lock)) {
                int mode;

                if (flags & LDLM_FL_TEST_LOCK)
                        LASSERT(ldlm_is_test_lock(lock));

                if (ldlm_is_test_lock(lock) || ldlm_is_flock_deadlock(lock))
                        mode = flock_type(getlk);
                else
                        mode = lock->l_granted_mode;

                if (ldlm_is_flock_deadlock(lock)) {
                        LDLM_DEBUG(lock, "client-side enqueue deadlock "
                                   "received");
                        rc = -EDEADLK;
                }
                ldlm_flock_destroy(lock, mode, LDLM_FL_WAIT_NOREPROC);
                unlock_res_and_lock(lock);

                /* Need to wake up the waiter if we were evicted */
                wake_up(&lock->l_waitq);

                /* An error is still to be returned, to propagate it up to
                 * the ldlm_cli_enqueue_fini() caller. */
                RETURN(rc ? : -EIO);
        }

        LDLM_DEBUG(lock, "client-side enqueue granted");

        if (flags & LDLM_FL_TEST_LOCK) {
                /* fcntl(F_GETLK) request */
                /* The old mode was saved in getlk->fl_type so that if the
                 * mode in the lock changes we can decref the appropriate
                 * refcount. */
                LASSERT(ldlm_is_test_lock(lock));
                ldlm_flock_destroy(lock, flock_type(getlk),
                                   LDLM_FL_WAIT_NOREPROC);
                switch (lock->l_granted_mode) {
                case LCK_PR:
                        flock_set_type(getlk, F_RDLCK);
                        break;
                case LCK_PW:
                        flock_set_type(getlk, F_WRLCK);
                        break;
                default:
                        flock_set_type(getlk, F_UNLCK);
                }
                flock_set_pid(getlk, (pid_t)lock->l_policy_data.l_flock.pid);
                flock_set_start(getlk,
                                (loff_t)lock->l_policy_data.l_flock.start);
                flock_set_end(getlk,
                              (loff_t)lock->l_policy_data.l_flock.end);
        } else {
                __u64 noreproc = LDLM_FL_WAIT_NOREPROC;

                /* We need to reprocess the lock to do merges or splits
                 * with existing locks owned by this process. */
                ldlm_process_flock_lock(lock, &noreproc, 1, &err, NULL);
        }
        unlock_res_and_lock(lock);
        RETURN(rc);
}
EXPORT_SYMBOL(ldlm_flock_completion_ast);

int ldlm_flock_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
                            void *data, int flag)
{
        ENTRY;

        LASSERT(lock);
        LASSERT(flag == LDLM_CB_CANCELING);

        /* take lock off the deadlock detection hash list. */
        lock_res_and_lock(lock);
        ldlm_flock_blocking_unlink(lock);
        unlock_res_and_lock(lock);
        RETURN(0);
}

void ldlm_flock_policy_wire18_to_local(const ldlm_wire_policy_data_t *wpolicy,
                                       ldlm_policy_data_t *lpolicy)
{
        memset(lpolicy, 0, sizeof(*lpolicy));
        lpolicy->l_flock.start = wpolicy->l_flock.lfw_start;
        lpolicy->l_flock.end = wpolicy->l_flock.lfw_end;
        lpolicy->l_flock.pid = wpolicy->l_flock.lfw_pid;
        /* Compat code, old clients had no idea about owner field and
         * relied solely on pid for ownership. Introduced in LU-104, 2.1,
         * April 2011 */
        lpolicy->l_flock.owner = wpolicy->l_flock.lfw_pid;
}

void ldlm_flock_policy_wire21_to_local(const ldlm_wire_policy_data_t *wpolicy,
                                       ldlm_policy_data_t *lpolicy)
{
        memset(lpolicy, 0, sizeof(*lpolicy));
        lpolicy->l_flock.start = wpolicy->l_flock.lfw_start;
        lpolicy->l_flock.end = wpolicy->l_flock.lfw_end;
        lpolicy->l_flock.pid = wpolicy->l_flock.lfw_pid;
        lpolicy->l_flock.owner = wpolicy->l_flock.lfw_owner;
}

void ldlm_flock_policy_local_to_wire(const ldlm_policy_data_t *lpolicy,
                                     ldlm_wire_policy_data_t *wpolicy)
{
        memset(wpolicy, 0, sizeof(*wpolicy));
        wpolicy->l_flock.lfw_start = lpolicy->l_flock.start;
        wpolicy->l_flock.lfw_end = lpolicy->l_flock.end;
        wpolicy->l_flock.lfw_pid = lpolicy->l_flock.pid;
        wpolicy->l_flock.lfw_owner = lpolicy->l_flock.owner;
}

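/*
 * Illustrative only: a round-trip sketch using the converters above.
 * With the 2.1+ wire format the owner field survives the trip; the
 * pre-2.1 (wire18) converter instead reconstructs it from the pid.
 * The helper name is hypothetical.
 */
static inline void flock_policy_roundtrip_sketch(const ldlm_policy_data_t *in,
                                                 ldlm_policy_data_t *out)
{
        ldlm_wire_policy_data_t wire;

        ldlm_flock_policy_local_to_wire(in, &wire);
        ldlm_flock_policy_wire21_to_local(&wire, out);
        /* here out->l_flock.owner == in->l_flock.owner; the wire18
         * converter would instead set it to in->l_flock.pid. */
}
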
/*
 * Export handle<->flock hash operations.
 */
static unsigned
ldlm_export_flock_hash(cfs_hash_t *hs, const void *key, unsigned mask)
{
        return cfs_hash_u64_hash(*(__u64 *)key, mask);
}

static void *
ldlm_export_flock_key(struct hlist_node *hnode)
{
        struct ldlm_lock *lock;

        lock = hlist_entry(hnode, struct ldlm_lock, l_exp_flock_hash);
        return &lock->l_policy_data.l_flock.owner;
}

static int
ldlm_export_flock_keycmp(const void *key, struct hlist_node *hnode)
{
        return !memcmp(ldlm_export_flock_key(hnode), key, sizeof(__u64));
}

static void *
ldlm_export_flock_object(struct hlist_node *hnode)
{
        return hlist_entry(hnode, struct ldlm_lock, l_exp_flock_hash);
}

static void
ldlm_export_flock_get(cfs_hash_t *hs, struct hlist_node *hnode)
{
        struct ldlm_lock *lock;
        struct ldlm_flock *flock;

        lock = hlist_entry(hnode, struct ldlm_lock, l_exp_flock_hash);
        LDLM_LOCK_GET(lock);

        flock = &lock->l_policy_data.l_flock;
        LASSERT(flock->blocking_export != NULL);
        class_export_get(flock->blocking_export);
        flock->blocking_refs++;
}

static void
ldlm_export_flock_put(cfs_hash_t *hs, struct hlist_node *hnode)
{
        struct ldlm_lock *lock;
        struct ldlm_flock *flock;

        lock = hlist_entry(hnode, struct ldlm_lock, l_exp_flock_hash);
        LDLM_LOCK_RELEASE(lock);

        flock = &lock->l_policy_data.l_flock;
        LASSERT(flock->blocking_export != NULL);
        class_export_put(flock->blocking_export);
        if (--flock->blocking_refs == 0) {
                flock->blocking_owner = 0;
                flock->blocking_export = NULL;
        }
}

static cfs_hash_ops_t ldlm_export_flock_ops = {
        .hs_hash        = ldlm_export_flock_hash,
        .hs_key         = ldlm_export_flock_key,
        .hs_keycmp      = ldlm_export_flock_keycmp,
        .hs_object      = ldlm_export_flock_object,
        .hs_get         = ldlm_export_flock_get,
        .hs_put         = ldlm_export_flock_put,
        .hs_put_locked  = ldlm_export_flock_put,
};

int ldlm_init_flock_export(struct obd_export *exp)
{
        if (strcmp(exp->exp_obd->obd_type->typ_name, LUSTRE_MDT_NAME) != 0)
                RETURN(0);

        exp->exp_flock_hash =
                cfs_hash_create(obd_uuid2str(&exp->exp_client_uuid),
                                HASH_EXP_LOCK_CUR_BITS,
                                HASH_EXP_LOCK_MAX_BITS,
                                HASH_EXP_LOCK_BKT_BITS, 0,
                                CFS_HASH_MIN_THETA, CFS_HASH_MAX_THETA,
                                &ldlm_export_flock_ops,
                                CFS_HASH_DEFAULT | CFS_HASH_NBLK_CHANGE);
        if (!exp->exp_flock_hash)
                RETURN(-ENOMEM);

        RETURN(0);
}
EXPORT_SYMBOL(ldlm_init_flock_export);

void ldlm_destroy_flock_export(struct obd_export *exp)
{
        ENTRY;
        if (exp->exp_flock_hash) {
                cfs_hash_putref(exp->exp_flock_hash);
                exp->exp_flock_hash = NULL;
        }
        EXIT;
}
EXPORT_SYMBOL(ldlm_destroy_flock_export);