LU-8347 ldlm: granting conflicting locks
[fs/lustre-release.git] / lustre / ldlm / ldlm_flock.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2003 Hewlett-Packard Development Company LP.
24  * Developed under the sponsorship of the US Government under
25  * Subcontract No. B514193
26  *
27  * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2010, 2014, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  */
36
37 /**
38  * This file implements the POSIX lock type for Lustre.
39  * Its policy properties are the start and end of the extent, and the PID.
40  *
41  * These locks are only handled through the MDS because POSIX semantics
42  * require, for example, that a lock may be only partially released (and
43  * thus split into two parts), and that two adjacent locks from the same
44  * process may be merged into a single wider lock.
45  *
46  * Lock modes are mapped as follows:
47  * PR and PW for READ and WRITE locks
48  * NL to request release of a portion of the lock
49  *
50  * These flock locks never time out.
51  */
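/*
 * For reference (a sketch, not part of the original source): on the client
 * the fcntl() lock types are expected to map to LDLM modes roughly as
 *	F_RDLCK -> LCK_PR,  F_WRLCK -> LCK_PW,  F_UNLCK -> LCK_NL
 * so an NL enqueue below effectively means "release this byte range".
 */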
52
53 #define DEBUG_SUBSYSTEM S_LDLM
54
55 #include <linux/list.h>
56 #include <lustre_dlm.h>
57 #include <obd_support.h>
58 #include <obd_class.h>
59 #include <lustre_lib.h>
60
61 #include "ldlm_internal.h"
62
63 int ldlm_flock_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
64                             void *data, int flag);
65
66 /**
67  * list_for_remaining_safe - iterate over the remaining entries in a list
68  *              and safeguard against removal of a list entry.
69  * \param pos   the &struct list_head to use as a loop counter. pos MUST
70  *              have been initialized prior to using it in this macro.
71  * \param n     another &struct list_head to use as temporary storage
72  * \param head  the head for your list.
73  */
74 #define list_for_remaining_safe(pos, n, head) \
75         for (n = pos->next; pos != (head); pos = n, n = pos->next)
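/*
 * Illustrative usage (a sketch based on ldlm_process_flock_lock() below):
 * resume scanning lr_granted from a previously noted position instead of
 * from the list head, while staying safe against removal of the current
 * entry:
 *
 *	ownlocks = <position of this owner's first granted lock>;
 *	list_for_remaining_safe(ownlocks, tmp, &res->lr_granted) {
 *		lock = list_entry(ownlocks, struct ldlm_lock, l_res_link);
 *		...
 *	}
 */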
76
77 static inline int
78 ldlm_same_flock_owner(struct ldlm_lock *lock, struct ldlm_lock *new)
79 {
80         return((new->l_policy_data.l_flock.owner ==
81                 lock->l_policy_data.l_flock.owner) &&
82                (new->l_export == lock->l_export));
83 }
84
85 static inline int
86 ldlm_flocks_overlap(struct ldlm_lock *lock, struct ldlm_lock *new)
87 {
88         return((new->l_policy_data.l_flock.start <=
89                 lock->l_policy_data.l_flock.end) &&
90                (new->l_policy_data.l_flock.end >=
91                 lock->l_policy_data.l_flock.start));
92 }
93
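/*
 * Server side only: record the owner/export of \a lock as the entity
 * blocking \a req, and add \a req to its export's exp_flock_hash so that
 * ldlm_flock_deadlock() can later walk the chain of blocked owners.
 */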
94 static inline void ldlm_flock_blocking_link(struct ldlm_lock *req,
95                                             struct ldlm_lock *lock)
96 {
97         /* For server only */
98         if (req->l_export == NULL)
99                 return;
100
101         LASSERT(hlist_unhashed(&req->l_exp_flock_hash));
102
103         req->l_policy_data.l_flock.blocking_owner =
104                 lock->l_policy_data.l_flock.owner;
105         req->l_policy_data.l_flock.blocking_export =
106                 lock->l_export;
107         atomic_set(&req->l_policy_data.l_flock.blocking_refs, 0);
108
109         cfs_hash_add(req->l_export->exp_flock_hash,
110                      &req->l_policy_data.l_flock.owner,
111                      &req->l_exp_flock_hash);
112 }
113
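/*
 * Undo ldlm_flock_blocking_link(): drop \a req from its export's
 * exp_flock_hash (server side only). Called with the resource lock held.
 */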
114 static inline void ldlm_flock_blocking_unlink(struct ldlm_lock *req)
115 {
116         /* For server only */
117         if (req->l_export == NULL)
118                 return;
119
120         check_res_locked(req->l_resource);
121         if (req->l_export->exp_flock_hash != NULL &&
122             !hlist_unhashed(&req->l_exp_flock_hash))
123                 cfs_hash_del(req->l_export->exp_flock_hash,
124                              &req->l_policy_data.l_flock.owner,
125                              &req->l_exp_flock_hash);
126 }
127
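/*
 * Remove the lock from its resource list and destroy it. In the
 * LDLM_FL_WAIT_NOREPROC case (client side) the lock reference is also
 * dropped here, and the lock is flagged so that no CANCEL RPC is sent.
 */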
128 static inline void
129 ldlm_flock_destroy(struct ldlm_lock *lock, enum ldlm_mode mode, __u64 flags)
130 {
131         ENTRY;
132
133         LDLM_DEBUG(lock, "ldlm_flock_destroy(mode: %d, flags: %#llx)",
134                    mode, flags);
135
136         /* Safe to not lock here, since it should be empty anyway */
137         LASSERT(hlist_unhashed(&lock->l_exp_flock_hash));
138
139         list_del_init(&lock->l_res_link);
140         if (flags == LDLM_FL_WAIT_NOREPROC) {
141                 /* client side - set a flag to prevent sending a CANCEL */
142                 lock->l_flags |= LDLM_FL_LOCAL_ONLY | LDLM_FL_CBPENDING;
143
144                 /* When we get here we are already under lock_res_and_lock(),
145                  * so we must call the nolock version of ldlm_lock_decref_internal. */
146                 ldlm_lock_decref_internal_nolock(lock, mode);
147         }
148
149         ldlm_lock_destroy_nolock(lock);
150         EXIT;
151 }
152
153 /**
154  * POSIX locks deadlock detection code.
155  *
156  * Given a new lock \a req and an existing lock \a bl_lock it conflicts
157  * with, we need to iterate through all blocked POSIX locks for this
158  * export and see whether a deadlock condition arises (i.e. one client
159  * holds a lock on something and wants a lock on something else, while
160  * at the same time another client has the opposite situation).
161  */
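/*
 * Roughly: if owner A is blocked waiting on a range held by owner B, and B
 * now requests a range held by A, then starting from the blocking owner (A)
 * and following the blocking_owner/blocking_export chain of already-blocked
 * locks leads back to the requester (B). In that case the new request is
 * refused with -EDEADLK, or cancelled via ldlm_flock_cancel_on_deadlock().
 */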
162
163 struct ldlm_flock_lookup_cb_data {
164         __u64 *bl_owner;
165         struct ldlm_lock *lock;
166         struct obd_export *exp;
167 };
168
169 static int ldlm_flock_lookup_cb(struct cfs_hash *hs, struct cfs_hash_bd *bd,
170                                 struct hlist_node *hnode, void *data)
171 {
172         struct ldlm_flock_lookup_cb_data *cb_data = data;
173         struct obd_export *exp = cfs_hash_object(hs, hnode);
174         struct ldlm_lock *lock;
175
176         lock = cfs_hash_lookup(exp->exp_flock_hash, cb_data->bl_owner);
177         if (lock == NULL)
178                 return 0;
179
180         /* Stop on first found lock. Same process can't sleep twice */
181         cb_data->lock = lock;
182         cb_data->exp = class_export_get(exp);
183
184         return 1;
185 }
186
187 static int
188 ldlm_flock_deadlock(struct ldlm_lock *req, struct ldlm_lock *bl_lock)
189 {
190         struct obd_export *req_exp = req->l_export;
191         struct obd_export *bl_exp = bl_lock->l_export;
192         __u64 req_owner = req->l_policy_data.l_flock.owner;
193         __u64 bl_owner = bl_lock->l_policy_data.l_flock.owner;
194
195         /* For server only */
196         if (req_exp == NULL)
197                 return 0;
198
199         class_export_get(bl_exp);
200         while (1) {
201                 struct ldlm_flock_lookup_cb_data cb_data = {
202                                         .bl_owner = &bl_owner,
203                                         .lock = NULL,
204                                         .exp = NULL };
205                 struct obd_export *bl_exp_new;
206                 struct ldlm_lock *lock = NULL;
207                 struct ldlm_flock *flock;
208
209                 if (bl_exp->exp_flock_hash != NULL) {
210                         cfs_hash_for_each_key(bl_exp->exp_obd->obd_nid_hash,
211                                 &bl_exp->exp_connection->c_peer.nid,
212                                 ldlm_flock_lookup_cb, &cb_data);
213                         lock = cb_data.lock;
214                 }
215                 if (lock == NULL)
216                         break;
217
218                 class_export_put(bl_exp);
219                 bl_exp = cb_data.exp;
220
221                 LASSERT(req != lock);
222                 flock = &lock->l_policy_data.l_flock;
223                 LASSERT(flock->owner == bl_owner);
224                 bl_owner = flock->blocking_owner;
225                 bl_exp_new = class_export_get(flock->blocking_export);
226                 class_export_put(bl_exp);
227
228                 cfs_hash_put(bl_exp->exp_flock_hash, &lock->l_exp_flock_hash);
229                 bl_exp = bl_exp_new;
230
231                 if (bl_exp->exp_failed)
232                         break;
233
234                 if (bl_owner == req_owner &&
235                     (bl_exp->exp_connection->c_peer.nid ==
236                      req_exp->exp_connection->c_peer.nid)) {
237                         class_export_put(bl_exp);
238                         return 1;
239                 }
240         }
241         class_export_put(bl_exp);
242
243         return 0;
244 }
245
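/*
 * Handle a deadlock detected while reprocessing an already-blocked request:
 * if the client supports OBD_CONNECT_FLOCK_DEAD, mark the lock with
 * LDLM_FL_FLOCK_DEADLOCK, unlink it from the blocking hash and the resource
 * list, and queue a completion AST so the client can cancel it; otherwise
 * only an error can be logged, since older clients cannot cancel flocks.
 */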
246 static void ldlm_flock_cancel_on_deadlock(struct ldlm_lock *lock,
247                                           struct list_head *work_list)
248 {
249         CDEBUG(D_INFO, "reprocess deadlock req=%p\n", lock);
250
251         if ((exp_connect_flags(lock->l_export) &
252                                 OBD_CONNECT_FLOCK_DEAD) == 0) {
253                 CERROR("deadlock found, but client doesn't "
254                                 "support flock canceliation\n");
255         } else {
256                 LASSERT(lock->l_completion_ast);
257                 LASSERT(!ldlm_is_ast_sent(lock));
258                 lock->l_flags |= LDLM_FL_AST_SENT | LDLM_FL_CANCEL_ON_BLOCK |
259                         LDLM_FL_FLOCK_DEADLOCK;
260                 ldlm_flock_blocking_unlink(lock);
261                 ldlm_resource_unlink_lock(lock);
262                 ldlm_add_ast_work_item(lock, NULL, work_list);
263         }
264 }
265
266 /**
267  * Process a granting attempt for flock lock.
268  * Must be called with the ns lock held.
269  *
270  * This function looks for any conflicts for \a lock in the granted or
271  * waiting queues. The lock is granted if no conflicts are found in
272  * either queue.
273  *
274  * It is also responsible for splitting a lock if a portion of the lock
275  * is released.
276  *
277  * If \a first_enq is 0 (i.e. called from ldlm_reprocess_queue):
278  *   - blocking ASTs have already been sent
279  *
280  * If \a first_enq is 1 (i.e. called from ldlm_lock_enqueue):
281  *   - blocking ASTs have not been sent yet, so the list of conflicting
282  *     locks is collected and ASTs are sent.
283  */
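/*
 * Example of the merge/split handling below (a sketch, not from the original
 * source): if this owner holds a PW lock on [0, 100] and sends an NL
 * (unlock) request for [40, 60], the granted lock is split: a new lock
 * (new2) keeps [0, 39] with the original granted mode, while the existing
 * lock is shrunk to [61, 100]. Conversely, overlapping or adjacent locks of
 * the same mode from the same owner are merged into one wider lock.
 */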
284 int
285 ldlm_process_flock_lock(struct ldlm_lock *req, __u64 *flags, int first_enq,
286                         enum ldlm_error *err, struct list_head *work_list)
287 {
288         struct ldlm_resource *res = req->l_resource;
289         struct ldlm_namespace *ns = ldlm_res_to_ns(res);
290         struct list_head *tmp;
291         struct list_head *ownlocks = NULL;
292         struct ldlm_lock *lock = NULL;
293         struct ldlm_lock *new = req;
294         struct ldlm_lock *new2 = NULL;
295         enum ldlm_mode mode = req->l_req_mode;
296         int local = ns_is_client(ns);
297         int added = (mode == LCK_NL);
298         int overlaps = 0;
299         int splitted = 0;
300         const struct ldlm_callback_suite null_cbs = { NULL };
301         ENTRY;
302
303         CDEBUG(D_DLMTRACE, "flags %#llx owner %llu pid %u mode %u start "
304                "%llu end %llu\n", *flags,
305                new->l_policy_data.l_flock.owner,
306                new->l_policy_data.l_flock.pid, mode,
307                req->l_policy_data.l_flock.start,
308                req->l_policy_data.l_flock.end);
309
310         *err = ELDLM_OK;
311
312         if (local) {
313                 /* No blocking ASTs are sent to the clients for
314                  * Posix file & record locks */
315                 req->l_blocking_ast = NULL;
316         } else {
317                 /* Called on the server for lock cancels. */
318                 req->l_blocking_ast = ldlm_flock_blocking_ast;
319         }
320
321 reprocess:
322         if ((*flags == LDLM_FL_WAIT_NOREPROC) || (mode == LCK_NL)) {
323                 /* This loop determines where this process's locks start
324                  * in the resource's lr_granted list. */
325                 list_for_each(tmp, &res->lr_granted) {
326                         lock = list_entry(tmp, struct ldlm_lock,
327                                               l_res_link);
328                         if (ldlm_same_flock_owner(lock, req)) {
329                                 ownlocks = tmp;
330                                 break;
331                         }
332                 }
333         } else {
334                 int reprocess_failed = 0;
335                 lockmode_verify(mode);
336
337                 /* This loop determines if there are existing locks
338                  * that conflict with the new lock request. */
339                 list_for_each(tmp, &res->lr_granted) {
340                         lock = list_entry(tmp, struct ldlm_lock,
341                                               l_res_link);
342
343                         if (ldlm_same_flock_owner(lock, req)) {
344                                 if (!ownlocks)
345                                         ownlocks = tmp;
346                                 continue;
347                         }
348
349                         /* locks are compatible, overlap doesn't matter */
350                         if (lockmode_compat(lock->l_granted_mode, mode))
351                                 continue;
352
353                         if (!ldlm_flocks_overlap(lock, req))
354                                 continue;
355
356                         if (!first_enq) {
357                                 reprocess_failed = 1;
358                                 if (ldlm_flock_deadlock(req, lock)) {
359                                         ldlm_flock_cancel_on_deadlock(req,
360                                                         work_list);
361                                         RETURN(LDLM_ITER_CONTINUE);
362                                 }
363                                 continue;
364                         }
365
366                         if (*flags & LDLM_FL_BLOCK_NOWAIT) {
367                                 ldlm_flock_destroy(req, mode, *flags);
368                                 *err = -EAGAIN;
369                                 RETURN(LDLM_ITER_STOP);
370                         }
371
372                         if (*flags & LDLM_FL_TEST_LOCK) {
373                                 ldlm_flock_destroy(req, mode, *flags);
374                                 req->l_req_mode = lock->l_granted_mode;
375                                 req->l_policy_data.l_flock.pid =
376                                         lock->l_policy_data.l_flock.pid;
377                                 req->l_policy_data.l_flock.start =
378                                         lock->l_policy_data.l_flock.start;
379                                 req->l_policy_data.l_flock.end =
380                                         lock->l_policy_data.l_flock.end;
381                                 *flags |= LDLM_FL_LOCK_CHANGED;
382                                 RETURN(LDLM_ITER_STOP);
383                         }
384
385                         /* add lock to blocking list before deadlock
386                          * check to prevent race */
387                         ldlm_flock_blocking_link(req, lock);
388
389                         if (ldlm_flock_deadlock(req, lock)) {
390                                 ldlm_flock_blocking_unlink(req);
391                                 ldlm_flock_destroy(req, mode, *flags);
392                                 *err = -EDEADLK;
393                                 RETURN(LDLM_ITER_STOP);
394                         }
395
396                         ldlm_resource_add_lock(res, &res->lr_waiting, req);
397                         *flags |= LDLM_FL_BLOCK_GRANTED;
398                         RETURN(LDLM_ITER_STOP);
399                 }
400                 if (reprocess_failed)
401                         RETURN(LDLM_ITER_CONTINUE);
402         }
403
404         if (*flags & LDLM_FL_TEST_LOCK) {
405                 ldlm_flock_destroy(req, mode, *flags);
406                 req->l_req_mode = LCK_NL;
407                 *flags |= LDLM_FL_LOCK_CHANGED;
408                 RETURN(LDLM_ITER_STOP);
409         }
410
411         /* If we had slept on this lock request, take it off the
412          * deadlock detection hash list. */
413         ldlm_flock_blocking_unlink(req);
414
415         /* Scan the locks owned by this process that overlap this request.
416          * We may have to merge or split existing locks. */
417
418         if (!ownlocks)
419                 ownlocks = &res->lr_granted;
420
421         list_for_remaining_safe(ownlocks, tmp, &res->lr_granted) {
422                 lock = list_entry(ownlocks, struct ldlm_lock, l_res_link);
423
424                 if (!ldlm_same_flock_owner(lock, new))
425                         break;
426
427                 if (lock->l_granted_mode == mode) {
428                         /* If the modes are the same then we need to process
429                          * locks that overlap OR adjoin the new lock. The extra
430                          * logic condition is necessary to deal with arithmetic
431                          * overflow and underflow. */
432                         if ((new->l_policy_data.l_flock.start >
433                              (lock->l_policy_data.l_flock.end + 1))
434                             && (lock->l_policy_data.l_flock.end !=
435                                 OBD_OBJECT_EOF))
436                                 continue;
437
438                         if ((new->l_policy_data.l_flock.end <
439                              (lock->l_policy_data.l_flock.start - 1))
440                             && (lock->l_policy_data.l_flock.start != 0))
441                                 break;
442
443                         if (new->l_policy_data.l_flock.start <
444                             lock->l_policy_data.l_flock.start) {
445                                 lock->l_policy_data.l_flock.start =
446                                         new->l_policy_data.l_flock.start;
447                         } else {
448                                 new->l_policy_data.l_flock.start =
449                                         lock->l_policy_data.l_flock.start;
450                         }
451
452                         if (new->l_policy_data.l_flock.end >
453                             lock->l_policy_data.l_flock.end) {
454                                 lock->l_policy_data.l_flock.end =
455                                         new->l_policy_data.l_flock.end;
456                         } else {
457                                 new->l_policy_data.l_flock.end =
458                                         lock->l_policy_data.l_flock.end;
459                         }
460
461                         if (added) {
462                                 ldlm_flock_destroy(lock, mode, *flags);
463                         } else {
464                                 new = lock;
465                                 added = 1;
466                         }
467                         continue;
468                 }
469
470                 if (new->l_policy_data.l_flock.start >
471                     lock->l_policy_data.l_flock.end)
472                         continue;
473
474                 if (new->l_policy_data.l_flock.end <
475                     lock->l_policy_data.l_flock.start)
476                         break;
477
478                 ++overlaps;
479
480                 if (new->l_policy_data.l_flock.start <=
481                     lock->l_policy_data.l_flock.start) {
482                         if (new->l_policy_data.l_flock.end <
483                             lock->l_policy_data.l_flock.end) {
484                                 lock->l_policy_data.l_flock.start =
485                                         new->l_policy_data.l_flock.end + 1;
486                                 break;
487                         }
488                         ldlm_flock_destroy(lock, lock->l_req_mode, *flags);
489                         continue;
490                 }
491                 if (new->l_policy_data.l_flock.end >=
492                     lock->l_policy_data.l_flock.end) {
493                         lock->l_policy_data.l_flock.end =
494                                 new->l_policy_data.l_flock.start - 1;
495                         continue;
496                 }
497
498                 /* split the existing lock into two locks */
499
500                 /* If this is an F_UNLCK operation then we could avoid
501                  * allocating a new lock and use the req lock passed in
502                  * with the request, but this would complicate the reply
503                  * processing, since updates to req get reflected in the
504                  * reply. The client side replays the lock request, so
505                  * it must see the original lock data in the reply. */
506
507                 /* XXX - if ldlm_lock_new() can sleep we should
508                  * release the lr_lock, allocate the new lock,
509                  * and restart processing this lock. */
510                 if (new2 == NULL) {
511                         unlock_res_and_lock(req);
512                         new2 = ldlm_lock_create(ns, &res->lr_name, LDLM_FLOCK,
513                                                 lock->l_granted_mode, &null_cbs,
514                                                 NULL, 0, LVB_T_NONE);
515                         lock_res_and_lock(req);
516                         if (IS_ERR(new2)) {
517                                 ldlm_flock_destroy(req, lock->l_granted_mode,
518                                                    *flags);
519                                 *err = PTR_ERR(new2);
520                                 RETURN(LDLM_ITER_STOP);
521                         }
522                         goto reprocess;
523                 }
524
525                 splitted = 1;
526
527                 new2->l_granted_mode = lock->l_granted_mode;
528                 new2->l_policy_data.l_flock.pid =
529                         new->l_policy_data.l_flock.pid;
530                 new2->l_policy_data.l_flock.owner =
531                         new->l_policy_data.l_flock.owner;
532                 new2->l_policy_data.l_flock.start =
533                         lock->l_policy_data.l_flock.start;
534                 new2->l_policy_data.l_flock.end =
535                         new->l_policy_data.l_flock.start - 1;
536                 lock->l_policy_data.l_flock.start =
537                         new->l_policy_data.l_flock.end + 1;
538                 new2->l_conn_export = lock->l_conn_export;
539                 if (lock->l_export != NULL) {
540                         new2->l_export = class_export_lock_get(lock->l_export, new2);
541                         if (new2->l_export->exp_lock_hash &&
542                             hlist_unhashed(&new2->l_exp_hash))
543                                 cfs_hash_add(new2->l_export->exp_lock_hash,
544                                              &new2->l_remote_handle,
545                                              &new2->l_exp_hash);
546                 }
547                 if (*flags == LDLM_FL_WAIT_NOREPROC)
548                         ldlm_lock_addref_internal_nolock(new2,
549                                                          lock->l_granted_mode);
550
551                 /* insert new2 at lock */
552                 ldlm_resource_add_lock(res, ownlocks, new2);
553                 LDLM_LOCK_RELEASE(new2);
554                 break;
555         }
556
557         /* if new2 was created but never used, destroy it */
558         if (splitted == 0 && new2 != NULL)
559                 ldlm_lock_destroy_nolock(new2);
560
561         /* At this point we're granting the lock request. */
562         req->l_granted_mode = req->l_req_mode;
563
564         /* Add req to the granted queue before calling ldlm_reprocess_all(). */
565         if (!added) {
566                 list_del_init(&req->l_res_link);
567                 /* insert new lock before ownlocks in list. */
568                 ldlm_resource_add_lock(res, ownlocks, req);
569         }
570
571         if (*flags != LDLM_FL_WAIT_NOREPROC) {
572 #ifdef HAVE_SERVER_SUPPORT
573                 if (first_enq) {
574                         /* If this is an unlock, reprocess the waitq and
575                          * send completions ASTs for locks that can now be
576                          * granted. The only problem with doing this
577                          * reprocessing here is that the completion ASTs for
578                          * newly granted locks will be sent before the unlock
579                          * completion is sent. It shouldn't be an issue. Also
580                          * note that ldlm_process_flock_lock() will recurse,
581                          * but only once because first_enq will be false from
582                          * ldlm_reprocess_queue. */
583                         if ((mode == LCK_NL) && overlaps) {
584                                 struct list_head rpc_list;
585                                 int rc;
586
587                                 INIT_LIST_HEAD(&rpc_list);
588 restart:
589                                 ldlm_reprocess_queue(res, &res->lr_waiting,
590                                                      &rpc_list);
591
592                                 unlock_res_and_lock(req);
593                                 rc = ldlm_run_ast_work(ns, &rpc_list,
594                                                        LDLM_WORK_CP_AST);
595                                 lock_res_and_lock(req);
596                                 if (rc == -ERESTART)
597                                         GOTO(restart, rc);
598                        }
599                 } else {
600                         LASSERT(req->l_completion_ast);
601                         ldlm_add_ast_work_item(req, NULL, work_list);
602                 }
603 #else /* !HAVE_SERVER_SUPPORT */
604                 /* The only possible case for a client-side call into the
605                  * flock policy function is ldlm_flock_completion_ast(),
606                  * which always carries the LDLM_FL_WAIT_NOREPROC flag. */
607                 CERROR("Illegal parameter for client-side-only module.\n");
608                 LBUG();
609 #endif /* HAVE_SERVER_SUPPORT */
610         }
611
612         /* In case we're reprocessing the requested lock we can't destroy
613          * it until after calling ldlm_add_ast_work_item() above, so that it
614          * can bump the reference count on \a req. Otherwise \a req
615          * could be freed before the completion AST can be sent. */
616         if (added)
617                 ldlm_flock_destroy(req, mode, *flags);
618
619         ldlm_resource_dump(D_INFO, res);
620         RETURN(LDLM_ITER_CONTINUE);
621 }
622
623 struct ldlm_flock_wait_data {
624         struct ldlm_lock *fwd_lock;
625         int               fwd_generation;
626 };
627
628 static void
629 ldlm_flock_interrupted_wait(void *data)
630 {
631         struct ldlm_lock *lock;
632         ENTRY;
633
634         lock = ((struct ldlm_flock_wait_data *)data)->fwd_lock;
635
636         /* take lock off the deadlock detection hash list. */
637         lock_res_and_lock(lock);
638         ldlm_flock_blocking_unlink(lock);
639
640         /* client side - set flag to prevent lock from being put on LRU list */
641         ldlm_set_cbpending(lock);
642         unlock_res_and_lock(lock);
643
644         EXIT;
645 }
646
647 /**
648  * Flock completion callback function.
649  *
650  * \param lock [in,out]: A lock to be handled
651  * \param flags    [in]: flags
652  * \param *data    [in]: ldlm_work_cp_ast_lock() will use ldlm_cb_set_arg
653  *
654  * \retval 0    : success
655  * \retval <0   : failure
656  */
657 int
658 ldlm_flock_completion_ast(struct ldlm_lock *lock, __u64 flags, void *data)
659 {
660         struct file_lock *getlk = lock->l_ast_data;
661         struct obd_device *obd;
662         struct obd_import *imp = NULL;
663         struct ldlm_flock_wait_data fwd;
664         struct l_wait_info lwi;
665         enum ldlm_error err;
666         int rc = 0;
667         ENTRY;
668
669         OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_CP_CB_WAIT2, 4);
670         if (OBD_FAIL_PRECHECK(OBD_FAIL_LDLM_CP_CB_WAIT3)) {
671                 lock_res_and_lock(lock);
672                 lock->l_flags |= LDLM_FL_FAIL_LOC;
673                 unlock_res_and_lock(lock);
674                 OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_CP_CB_WAIT3, 4);
675         }
676         CDEBUG(D_DLMTRACE, "flags: %#llx data: %p getlk: %p\n",
677                flags, data, getlk);
678
679         LASSERT(flags != LDLM_FL_WAIT_NOREPROC);
680
681         if (flags & LDLM_FL_FAILED)
682                 goto granted;
683
684         if (!(flags & LDLM_FL_BLOCKED_MASK)) {
685                 if (NULL == data)
686                         /* the MDS granted the lock in the reply */
687                         goto granted;
688                 /* CP AST RPC: the lock got granted, wake it up */
689                 wake_up(&lock->l_waitq);
690                 RETURN(0);
691         }
692
693         LDLM_DEBUG(lock, "client-side enqueue returned a blocked lock, "
694                    "sleeping");
695         fwd.fwd_lock = lock;
696         obd = class_exp2obd(lock->l_conn_export);
697
698         /* if this is a local lock, there is no import */
699         if (NULL != obd)
700                 imp = obd->u.cli.cl_import;
701
702         if (NULL != imp) {
703                 spin_lock(&imp->imp_lock);
704                 fwd.fwd_generation = imp->imp_generation;
705                 spin_unlock(&imp->imp_lock);
706         }
707
708         lwi = LWI_TIMEOUT_INTR(0, NULL, ldlm_flock_interrupted_wait, &fwd);
709
710         /* Go to sleep until the lock is granted. */
711         rc = l_wait_event(lock->l_waitq, is_granted_or_cancelled(lock), &lwi);
712
713         if (rc) {
714                 LDLM_DEBUG(lock, "client-side enqueue waking up: failed (%d)",
715                            rc);
716                 RETURN(rc);
717         }
718
719 granted:
720         OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_CP_CB_WAIT, 10);
721
722         if (OBD_FAIL_PRECHECK(OBD_FAIL_LDLM_CP_CB_WAIT4)) {
723                 lock_res_and_lock(lock);
724                 /* DEADLOCK is always set with CBPENDING */
725                 lock->l_flags |= LDLM_FL_FLOCK_DEADLOCK | LDLM_FL_CBPENDING;
726                 unlock_res_and_lock(lock);
727                 OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_CP_CB_WAIT4, 4);
728         }
729         if (OBD_FAIL_PRECHECK(OBD_FAIL_LDLM_CP_CB_WAIT5)) {
730                 lock_res_and_lock(lock);
731                 /* DEADLOCK is always set with CBPENDING */
732                 lock->l_flags |= LDLM_FL_FAIL_LOC |
733                                  LDLM_FL_FLOCK_DEADLOCK | LDLM_FL_CBPENDING;
734                 unlock_res_and_lock(lock);
735                 OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_CP_CB_WAIT5, 4);
736         }
737
738         lock_res_and_lock(lock);
739
740
741         /* Protect against race where lock could have been just destroyed
742          * due to overlap in ldlm_process_flock_lock().
743          */
744         if (ldlm_is_destroyed(lock)) {
745                 unlock_res_and_lock(lock);
746                 LDLM_DEBUG(lock, "client-side enqueue waking up: destroyed");
747
748                 /* An error is still to be returned, to propagate it up to
749                  * ldlm_cli_enqueue_fini() caller. */
750                 RETURN(-EIO);
751         }
752
753         /* ldlm_lock_enqueue() has already placed lock on the granted list. */
754         ldlm_resource_unlink_lock(lock);
755
756         /* Import invalidation. We need to actually release the lock
757          * references being held, so that it can go away. No point in
758          * holding the lock even if the app still believes it has it, since
759          * the server already dropped it anyway. Only for granted locks. */
760         /* Do the same for DEADLOCK'ed locks. */
761         if (ldlm_is_failed(lock) || ldlm_is_flock_deadlock(lock)) {
762                 int mode;
763
764                 if (flags & LDLM_FL_TEST_LOCK)
765                         LASSERT(ldlm_is_test_lock(lock));
766
767                 if (ldlm_is_test_lock(lock) || ldlm_is_flock_deadlock(lock))
768                         mode = flock_type(getlk);
769                 else
770                         mode = lock->l_granted_mode;
771
772                 if (ldlm_is_flock_deadlock(lock)) {
773                         LDLM_DEBUG(lock, "client-side enqueue deadlock "
774                                    "received");
775                         rc = -EDEADLK;
776                 }
777                 ldlm_flock_destroy(lock, mode, LDLM_FL_WAIT_NOREPROC);
778                 unlock_res_and_lock(lock);
779
780                 /* Need to wake up the waiter if we were evicted */
781                 wake_up(&lock->l_waitq);
782
783                 /* An error is still to be returned, to propagate it up to
784                  * ldlm_cli_enqueue_fini() caller. */
785                 RETURN(rc ? : -EIO);
786         }
787
788         LDLM_DEBUG(lock, "client-side enqueue granted");
789
790         if (flags & LDLM_FL_TEST_LOCK) {
791                 /* fcntl(F_GETLK) request */
792                 /* The old mode was saved in getlk->fl_type so that if the mode
793                  * in the lock changes we can decref the appropriate refcount. */
794                 LASSERT(ldlm_is_test_lock(lock));
795                 ldlm_flock_destroy(lock, flock_type(getlk),
796                                    LDLM_FL_WAIT_NOREPROC);
797                 switch (lock->l_granted_mode) {
798                 case LCK_PR:
799                         flock_set_type(getlk, F_RDLCK);
800                         break;
801                 case LCK_PW:
802                         flock_set_type(getlk, F_WRLCK);
803                         break;
804                 default:
805                         flock_set_type(getlk, F_UNLCK);
806                 }
807                 flock_set_pid(getlk, (pid_t)lock->l_policy_data.l_flock.pid);
808                 flock_set_start(getlk,
809                                 (loff_t)lock->l_policy_data.l_flock.start);
810                 flock_set_end(getlk,
811                               (loff_t)lock->l_policy_data.l_flock.end);
812         } else {
813                 __u64 noreproc = LDLM_FL_WAIT_NOREPROC;
814
815                 /* We need to reprocess the lock to do merges or splits
816                  * with existing locks owned by this process. */
817                 ldlm_process_flock_lock(lock, &noreproc, 1, &err, NULL);
818         }
819         unlock_res_and_lock(lock);
820         RETURN(rc);
821 }
822 EXPORT_SYMBOL(ldlm_flock_completion_ast);
823
824 int ldlm_flock_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
825                             void *data, int flag)
826 {
827         ENTRY;
828
829         LASSERT(lock);
830         LASSERT(flag == LDLM_CB_CANCELING);
831
832         /* take lock off the deadlock detection hash list. */
833         lock_res_and_lock(lock);
834         ldlm_flock_blocking_unlink(lock);
835         unlock_res_and_lock(lock);
836         RETURN(0);
837 }
838
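/*
 * Conversions between the wire representation of flock policy data
 * (union ldlm_wire_policy_data) and the local one (union ldlm_policy_data).
 * Fields are copied one-to-one; the wire structure is zeroed first on the
 * outgoing path.
 */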
839 void ldlm_flock_policy_wire_to_local(const union ldlm_wire_policy_data *wpolicy,
840                                      union ldlm_policy_data *lpolicy)
841 {
842         lpolicy->l_flock.start = wpolicy->l_flock.lfw_start;
843         lpolicy->l_flock.end = wpolicy->l_flock.lfw_end;
844         lpolicy->l_flock.pid = wpolicy->l_flock.lfw_pid;
845         lpolicy->l_flock.owner = wpolicy->l_flock.lfw_owner;
846 }
847
848 void ldlm_flock_policy_local_to_wire(const union ldlm_policy_data *lpolicy,
849                                      union ldlm_wire_policy_data *wpolicy)
850 {
851         memset(wpolicy, 0, sizeof(*wpolicy));
852         wpolicy->l_flock.lfw_start = lpolicy->l_flock.start;
853         wpolicy->l_flock.lfw_end = lpolicy->l_flock.end;
854         wpolicy->l_flock.lfw_pid = lpolicy->l_flock.pid;
855         wpolicy->l_flock.lfw_owner = lpolicy->l_flock.owner;
856 }
857
858 /*
859  * Export handle<->flock hash operations.
860  */
861 static unsigned
862 ldlm_export_flock_hash(struct cfs_hash *hs, const void *key, unsigned mask)
863 {
864         return cfs_hash_u64_hash(*(__u64 *)key, mask);
865 }
866
867 static void *
868 ldlm_export_flock_key(struct hlist_node *hnode)
869 {
870         struct ldlm_lock *lock;
871
872         lock = hlist_entry(hnode, struct ldlm_lock, l_exp_flock_hash);
873         return &lock->l_policy_data.l_flock.owner;
874 }
875
876 static int
877 ldlm_export_flock_keycmp(const void *key, struct hlist_node *hnode)
878 {
879         return !memcmp(ldlm_export_flock_key(hnode), key, sizeof(__u64));
880 }
881
882 static void *
883 ldlm_export_flock_object(struct hlist_node *hnode)
884 {
885         return hlist_entry(hnode, struct ldlm_lock, l_exp_flock_hash);
886 }
887
888 static void
889 ldlm_export_flock_get(struct cfs_hash *hs, struct hlist_node *hnode)
890 {
891         struct ldlm_lock *lock;
892         struct ldlm_flock *flock;
893
894         lock = hlist_entry(hnode, struct ldlm_lock, l_exp_flock_hash);
895         LDLM_LOCK_GET(lock);
896
897         flock = &lock->l_policy_data.l_flock;
898         LASSERT(flock->blocking_export != NULL);
899         class_export_get(flock->blocking_export);
900         atomic_inc(&flock->blocking_refs);
901 }
902
903 static void
904 ldlm_export_flock_put(struct cfs_hash *hs, struct hlist_node *hnode)
905 {
906         struct ldlm_lock *lock;
907         struct ldlm_flock *flock;
908
909         lock = hlist_entry(hnode, struct ldlm_lock, l_exp_flock_hash);
910
911         flock = &lock->l_policy_data.l_flock;
912         LASSERT(flock->blocking_export != NULL);
913         class_export_put(flock->blocking_export);
914         if (atomic_dec_and_test(&flock->blocking_refs)) {
915                 flock->blocking_owner = 0;
916                 flock->blocking_export = NULL;
917         }
918         LDLM_LOCK_RELEASE(lock);
919 }
920
921 static struct cfs_hash_ops ldlm_export_flock_ops = {
922         .hs_hash        = ldlm_export_flock_hash,
923         .hs_key         = ldlm_export_flock_key,
924         .hs_keycmp      = ldlm_export_flock_keycmp,
925         .hs_object      = ldlm_export_flock_object,
926         .hs_get         = ldlm_export_flock_get,
927         .hs_put         = ldlm_export_flock_put,
928         .hs_put_locked  = ldlm_export_flock_put,
929 };
930
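/*
 * The per-export flock hash used for deadlock detection is only needed on
 * the MDT (flock locks are handled by the MDS only, see the file header),
 * so it is created for MDT exports only.
 */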
931 int ldlm_init_flock_export(struct obd_export *exp)
932 {
933         if (strcmp(exp->exp_obd->obd_type->typ_name, LUSTRE_MDT_NAME) != 0)
934                 RETURN(0);
935
936         exp->exp_flock_hash =
937                 cfs_hash_create(obd_uuid2str(&exp->exp_client_uuid),
938                                 HASH_EXP_LOCK_CUR_BITS,
939                                 HASH_EXP_LOCK_MAX_BITS,
940                                 HASH_EXP_LOCK_BKT_BITS, 0,
941                                 CFS_HASH_MIN_THETA, CFS_HASH_MAX_THETA,
942                                 &ldlm_export_flock_ops,
943                                 CFS_HASH_DEFAULT | CFS_HASH_NBLK_CHANGE);
944         if (!exp->exp_flock_hash)
945                 RETURN(-ENOMEM);
946
947         RETURN(0);
948 }
949
950 void ldlm_destroy_flock_export(struct obd_export *exp)
951 {
952         ENTRY;
953         if (exp->exp_flock_hash) {
954                 cfs_hash_putref(exp->exp_flock_hash);
955                 exp->exp_flock_hash = NULL;
956         }
957         EXIT;
958 }