lustre/ldlm/ldlm_flock.c (fs/lustre-release.git)
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2003 Hewlett-Packard Development Company LP.
24  * Developed under the sponsorship of the US Government under
25  * Subcontract No. B514193
26  *
27  * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2010, 2017, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  */
35
36 /**
37  * This file implements the POSIX lock type for Lustre.
38  * Its policy data carries the extent start and end and the owner PID.
39  *
40  * These locks are handled only through the MDS because POSIX semantics
41  * require, for example, that a lock may be released only partially (and
42  * therefore split into two locks), and that two adjacent locks from the
43  * same process may be merged into a single wider lock.
44  *
45  * Lock modes are mapped as follows:
46  * PR and PW for READ and WRITE locks
47  * NL to request release of all or part of an existing lock
48  *
49  * These flock locks never time out.
50  */
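To make the semantics above concrete, here is a small, hypothetical userspace sketch (not part of this file; the path and byte ranges are invented and error checking is omitted) of the POSIX behaviour that forces the split/merge handling described in the comment: releasing the middle of a held range splits it in two, and locking a range adjacent to one already held by the same process may merge into a single wider lock.

#include <fcntl.h>
#include <unistd.h>

int main(void)
{
	struct flock fl = { 0 };
	int fd = open("/mnt/lustre/testfile", O_RDWR | O_CREAT, 0644);

	/* write-lock bytes 0..99 (a PW lock in the mapping above) */
	fl.l_type = F_WRLCK;
	fl.l_whence = SEEK_SET;
	fl.l_start = 0;
	fl.l_len = 100;
	fcntl(fd, F_SETLKW, &fl);

	/* unlock bytes 40..59: the held lock must be split into
	 * [0, 39] and [60, 99], which is why unlocks are processed
	 * by the MDS rather than handled purely on the client */
	fl.l_type = F_UNLCK;
	fl.l_start = 40;
	fl.l_len = 20;
	fcntl(fd, F_SETLK, &fl);

	/* write-lock bytes 100..199: adjacent to the [60, 99] lock held
	 * by the same process, so the two may be merged into one wider
	 * lock covering [60, 199] */
	fl.l_type = F_WRLCK;
	fl.l_start = 100;
	fl.l_len = 100;
	fcntl(fd, F_SETLKW, &fl);

	close(fd);
	return 0;
}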
51
52 #define DEBUG_SUBSYSTEM S_LDLM
53
54 #include <linux/list.h>
55 #ifdef HAVE_LINUX_FILELOCK_HEADER
56 #include <linux/filelock.h>
57 #endif
58 #include <lustre_dlm.h>
59 #include <obd_support.h>
60 #include <obd_class.h>
61 #include <lustre_lib.h>
62
63 #include "ldlm_internal.h"
64
65 int ldlm_flock_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
66                             void *data, int flag);
67
68 static inline int
69 ldlm_same_flock_owner(struct ldlm_lock *lock, struct ldlm_lock *new)
70 {
71         return ((new->l_policy_data.l_flock.owner ==
72                  lock->l_policy_data.l_flock.owner) &&
73                 (new->l_export == lock->l_export));
74 }
75
76 static inline int
77 ldlm_flocks_overlap(struct ldlm_lock *lock, struct ldlm_lock *new)
78 {
79         return ((new->l_policy_data.l_flock.start <=
80                  lock->l_policy_data.l_flock.end) &&
81                 (new->l_policy_data.l_flock.end >=
82                  lock->l_policy_data.l_flock.start));
83 }
84
85 static inline void ldlm_flock_blocking_link(struct ldlm_lock *req,
86                                             struct ldlm_lock *lock)
87 {
88         /* For server only */
89         if (req->l_export == NULL)
90                 return;
91
92         LASSERT(hlist_unhashed(&req->l_exp_flock_hash));
93
94         req->l_policy_data.l_flock.blocking_owner =
95                 lock->l_policy_data.l_flock.owner;
96         req->l_policy_data.l_flock.blocking_export =
97                 lock->l_export;
98         atomic_set(&req->l_policy_data.l_flock.blocking_refs, 0);
99
100         cfs_hash_add(req->l_export->exp_flock_hash,
101                      &req->l_policy_data.l_flock.owner,
102                      &req->l_exp_flock_hash);
103 }
104
105 static inline void ldlm_flock_blocking_unlink(struct ldlm_lock *req)
106 {
107         /* For server only */
108         if (req->l_export == NULL)
109                 return;
110
111         check_res_locked(req->l_resource);
112         if (req->l_export->exp_flock_hash != NULL &&
113             !hlist_unhashed(&req->l_exp_flock_hash))
114                 cfs_hash_del(req->l_export->exp_flock_hash,
115                              &req->l_policy_data.l_flock.owner,
116                              &req->l_exp_flock_hash);
117 }
118
119 /** Remove cancelled lock from resource interval tree. */
120 void ldlm_flock_unlink_lock(struct ldlm_lock *lock)
121 {
122         struct ldlm_resource *res = lock->l_resource;
123         struct ldlm_interval *node = lock->l_tree_node;
124
125         if (!node || !interval_is_intree(&node->li_node)) /* duplicate unlink */
126                 return;
127
128         node = ldlm_interval_detach(lock);
129         if (node) {
130                 struct interval_node **root = &res->lr_flock_node.lfn_root;
131
132                 interval_erase(&node->li_node, root);
133                 ldlm_interval_free(node);
134         }
135 }
136
137 static inline void
138 ldlm_flock_destroy(struct ldlm_lock *lock, enum ldlm_mode mode, __u64 flags)
139 {
140         ENTRY;
141
142         LDLM_DEBUG(lock, "%s(mode: %d, flags: %#llx)", __func__, mode, flags);
143
144         /* Safe to not lock here, since it should be empty anyway */
145         LASSERT(hlist_unhashed(&lock->l_exp_flock_hash));
146
147         list_del_init(&lock->l_res_link);
148         if (flags == LDLM_FL_WAIT_NOREPROC) {
149                 /* client side - set a flag to prevent sending a CANCEL */
150                 lock->l_flags |= LDLM_FL_LOCAL_ONLY | LDLM_FL_CBPENDING;
151
152         /* When we reach this point lock_res_and_lock() is already held, so
153          * we need to call the nolock version of ldlm_lock_decref_internal().
154          */
155                 ldlm_lock_decref_internal_nolock(lock, mode);
156         }
157         ldlm_flock_unlink_lock(lock);
158
159         ldlm_lock_destroy_nolock(lock);
160         EXIT;
161 }
162
163 #ifdef HAVE_SERVER_SUPPORT
164 /**
165  * POSIX locks deadlock detection code.
166  *
167  * Given a new lock \a req and an existing lock \a bl_lock that it
168  * conflicts with, we need to iterate through all blocked POSIX locks
169  * for this export and check whether a deadlock condition arises, i.e.
170  * one client holds a lock on something and wants a lock on something
171  * else, while at the same time another client is in the opposite situation.
172  */
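As an illustration of the cycle this code looks for, here is a hedged sketch using simplified, hypothetical types (the real implementation below walks the per-export exp_flock_hash by owner and compares client NIDs rather than following direct pointers): starting from the owner that blocks the new request, follow the chain of owners each blocked lock is itself waiting on, and fail the request with -EDEADLK if the chain leads back to the requesting owner.

/* simplified, hypothetical model of the wait-for chain walk */
struct flock_waiter {
	unsigned long long owner;          /* lock owner id */
	struct flock_waiter *blocked_on;   /* the waiter this one waits for */
};

/* return 1 if granting a request from req_owner, currently blocked by
 * 'blocker', would close a wait-for cycle (a deadlock), 0 otherwise */
static int flock_would_deadlock(unsigned long long req_owner,
				const struct flock_waiter *blocker)
{
	while (blocker) {
		if (blocker->owner == req_owner)
			return 1;       /* the chain came back to the requester */
		blocker = blocker->blocked_on;
	}
	return 0;                       /* chain ended without forming a cycle */
}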
173 struct ldlm_flock_lookup_cb_data {
174         __u64 *bl_owner;
175         struct ldlm_lock *lock;
176         struct obd_export *exp;
177 };
178
179 static int ldlm_flock_lookup_cb(struct obd_export *exp, void *data)
180 {
181         struct ldlm_flock_lookup_cb_data *cb_data = data;
182         struct ldlm_lock *lock;
183
184         if (exp->exp_failed)
185                 return 0;
186
187         lock = cfs_hash_lookup(exp->exp_flock_hash, cb_data->bl_owner);
188         if (lock == NULL)
189                 return 0;
190
191         /* Stop on first found lock. Same process can't sleep twice */
192         cb_data->lock = lock;
193         cb_data->exp = class_export_get(exp);
194
195         return 1;
196 }
197
198 static int
199 ldlm_flock_deadlock(struct ldlm_lock *req, struct ldlm_lock *bl_lock)
200 {
201         struct obd_export *req_exp = req->l_export;
202         struct obd_export *bl_exp = bl_lock->l_export;
203         __u64 req_owner = req->l_policy_data.l_flock.owner;
204         __u64 bl_owner = bl_lock->l_policy_data.l_flock.owner;
205
206         /* For server only */
207         if (req_exp == NULL)
208                 return 0;
209
210         class_export_get(bl_exp);
211         while (1) {
212                 struct ldlm_flock_lookup_cb_data cb_data = {
213                         .bl_owner = &bl_owner,
214                         .lock = NULL,
215                         .exp = NULL,
216                 };
217                 struct ptlrpc_connection *bl_exp_conn;
218                 struct obd_export *bl_exp_new;
219                 struct ldlm_lock *lock = NULL;
220                 struct ldlm_flock *flock;
221
222                 bl_exp_conn = bl_exp->exp_connection;
223                 if (bl_exp->exp_flock_hash != NULL) {
224                         int found;
225
226                         found = obd_nid_export_for_each(bl_exp->exp_obd,
227                                                         &bl_exp_conn->c_peer.nid,
228                                                         ldlm_flock_lookup_cb,
229                                                         &cb_data);
230                         if (found)
231                                 lock = cb_data.lock;
232                 }
233                 if (lock == NULL)
234                         break;
235
236                 class_export_put(bl_exp);
237                 bl_exp = cb_data.exp;
238
239                 LASSERT(req != lock);
240                 flock = &lock->l_policy_data.l_flock;
241                 LASSERT(flock->owner == bl_owner);
242                 bl_owner = flock->blocking_owner;
243                 bl_exp_new = class_export_get(flock->blocking_export);
244                 class_export_put(bl_exp);
245
246                 cfs_hash_put(bl_exp->exp_flock_hash, &lock->l_exp_flock_hash);
247                 bl_exp = bl_exp_new;
248
249                 if (bl_exp->exp_failed)
250                         break;
251
252                 if (bl_owner == req_owner &&
253                     nid_same(&bl_exp_conn->c_peer.nid,
254                               &req_exp->exp_connection->c_peer.nid)) {
255                         class_export_put(bl_exp);
256                         return 1;
257                 }
258         }
259         class_export_put(bl_exp);
260
261         return 0;
262 }
263
264 static void ldlm_flock_cancel_on_deadlock(struct ldlm_lock *lock,
265                                           struct list_head *work_list)
266 {
267         CDEBUG(D_INFO, "reprocess deadlock req=%p\n", lock);
268
269         if ((exp_connect_flags(lock->l_export) &
270              OBD_CONNECT_FLOCK_DEAD) == 0) {
271                 CERROR("deadlock found, but client doesn't support flock cancellation\n");
272         } else {
273                 LASSERT(lock->l_completion_ast);
274                 LASSERT(!ldlm_is_ast_sent(lock));
275                 lock->l_flags |= (LDLM_FL_AST_SENT | LDLM_FL_CANCEL_ON_BLOCK |
276                                   LDLM_FL_FLOCK_DEADLOCK);
277                 ldlm_flock_blocking_unlink(lock);
278                 ldlm_resource_unlink_lock(lock);
279                 ldlm_add_ast_work_item(lock, NULL, work_list);
280         }
281 }
282 #endif /* HAVE_SERVER_SUPPORT */
283
284 /** Add newly granted lock into interval tree for the resource. */
285 static void ldlm_flock_add_lock(struct ldlm_resource *res,
286                                 struct list_head *head,
287                                 struct ldlm_lock *lock)
288 {
289         struct interval_node *found, **root;
290         struct ldlm_interval *node = lock->l_tree_node;
291         struct ldlm_extent *extent = &lock->l_policy_data.l_extent;
292         int rc;
293
294         LASSERT(ldlm_is_granted(lock));
295
296         LASSERT(node != NULL);
297         LASSERT(!interval_is_intree(&node->li_node));
298
299         rc = interval_set(&node->li_node, extent->start, extent->end);
300         LASSERT(!rc);
301
302         root = &res->lr_flock_node.lfn_root;
303         found = interval_insert(&node->li_node, root);
304         if (found) { /* The same extent found. */
305                 struct ldlm_interval *tmp = ldlm_interval_detach(lock);
306
307                 LASSERT(tmp != NULL);
308                 ldlm_interval_free(tmp);
309                 ldlm_interval_attach(to_ldlm_interval(found), lock);
310         }
311
312         /* Add the lock into the list */
313         ldlm_resource_add_lock(res, head, lock);
314 }
315
316 static void
317 ldlm_flock_range_update(struct ldlm_lock *lock, struct ldlm_lock *req)
318 {
319         struct ldlm_resource *res = lock->l_resource;
320         struct interval_node *found, **root = &res->lr_flock_node.lfn_root;
321         struct ldlm_interval *node;
322         struct ldlm_extent *extent = &lock->l_policy_data.l_extent;
323
324         node = ldlm_interval_detach(lock);
325         if (!node) {
326                 node = ldlm_interval_detach(req);
327                 LASSERT(node);
328         } else {
329                 interval_erase(&node->li_node, root);
330         }
331         interval_set(&node->li_node, extent->start, extent->end);
332
333         found = interval_insert(&node->li_node, root);
334         if (found) { /* The policy group found. */
335                 ldlm_interval_free(node);
336                 node = to_ldlm_interval(found);
337         }
338         ldlm_interval_attach(node, lock);
339         EXIT;
340 }
341
342 /**
343  * Process a granting attempt for a flock lock.
344  * Must be called with the ns lock held.
345  *
346  * This function looks for any conflicts for \a req in the granted or
347  * waiting queues. The lock is granted if no conflicts are found in
348  * either queue.
349  */
350 int
351 ldlm_process_flock_lock(struct ldlm_lock *req, __u64 *flags,
352                         enum ldlm_process_intention intention,
353                         enum ldlm_error *err, struct list_head *work_list)
354 {
355         struct ldlm_resource *res = req->l_resource;
356         struct ldlm_namespace *ns = ldlm_res_to_ns(res);
357         struct ldlm_lock *tmp;
358         struct ldlm_lock *ownlocks = NULL;
359         struct ldlm_lock *lock = NULL;
360         struct ldlm_lock *new = req;
361         struct ldlm_lock *new2 = NULL;
362         enum ldlm_mode mode = req->l_req_mode;
363         int local = ns_is_client(ns);
364         int added = (mode == LCK_NL);
365         int splitted = 0;
366         const struct ldlm_callback_suite null_cbs = { NULL };
367 #ifdef HAVE_SERVER_SUPPORT
368         struct list_head *grant_work = (intention == LDLM_PROCESS_ENQUEUE ?
369                                         NULL : work_list);
370 #endif
371
372         ENTRY;
373         CDEBUG(D_DLMTRACE,
374                "flags %#llx owner %llu pid %u mode %u start %llu end %llu\n",
375                *flags, new->l_policy_data.l_flock.owner,
376                new->l_policy_data.l_flock.pid, mode,
377                req->l_policy_data.l_flock.start,
378                req->l_policy_data.l_flock.end);
379
380         *err = ELDLM_OK;
381
382         if (local) {
383                 /* No blocking ASTs are sent to the clients for
384                  * POSIX file & record locks
385                  */
386                 req->l_blocking_ast = NULL;
387         } else {
388                 /* Called on the server for lock cancels. */
389                 req->l_blocking_ast = ldlm_flock_blocking_ast;
390         }
391
392 reprocess:
393         if ((*flags == LDLM_FL_WAIT_NOREPROC) || (mode == LCK_NL)) {
394                 /* This loop determines where this process's locks start
395                  * in the resource lr_granted list.
396                  */
397                 list_for_each_entry(lock, &res->lr_granted, l_res_link) {
398                         if (ldlm_same_flock_owner(lock, req)) {
399                                 ownlocks = lock;
400                                 break;
401                         }
402                 }
403         }
404 #ifdef HAVE_SERVER_SUPPORT
405         else {
406                 int reprocess_failed = 0;
407
408                 lockmode_verify(mode);
409
410                 /* This loop determines if there are existing locks
411                  * that conflict with the new lock request.
412                  */
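                /*
                 * Illustrative summary of the checks below (a restatement,
                 * not a separate helper in this file): a granted lock blocks
                 * the request only when all three of the following hold:
                 *
                 *   !ldlm_same_flock_owner(lock, req) &&
                 *   !lockmode_compat(lock->l_granted_mode, req->l_req_mode) &&
                 *   ldlm_flocks_overlap(lock, req)
                 *
                 * i.e. a different owner, incompatible modes (anything other
                 * than read against read) and overlapping byte ranges.
                 */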
413                 list_for_each_entry(lock, &res->lr_granted, l_res_link) {
414                         if (ldlm_same_flock_owner(lock, req)) {
415                                 if (!ownlocks)
416                                         ownlocks = lock;
417                                 continue;
418                         }
419
420                         if (req->l_req_mode == LCK_PR &&
421                             lock->l_granted_mode == LCK_PR &&
422                             lock->l_policy_data.l_flock.start <=
423                                 req->l_policy_data.l_flock.start &&
424                             lock->l_policy_data.l_flock.end >=
425                                 req->l_policy_data.l_flock.end) {
426                                 /* there can't be a conflicting granted write lock */
427                                 break;
428                         }
429                         /* locks are compatible, overlap doesn't matter */
430                         if (lockmode_compat(lock->l_granted_mode, mode))
431                                 continue;
432
433                         if (!ldlm_flocks_overlap(lock, req))
434                                 continue;
435
436                         if (intention != LDLM_PROCESS_ENQUEUE) {
437                                 ldlm_flock_blocking_unlink(req);
438                                 ldlm_flock_blocking_link(req, lock);
439                                 if (ldlm_flock_deadlock(req, lock)) {
440                                         ldlm_flock_cancel_on_deadlock(
441                                                 req, grant_work);
442                                         RETURN(LDLM_ITER_CONTINUE);
443                                 }
444                                 reprocess_failed = 1;
445                                 break;
446                         }
447
448                         if (*flags & LDLM_FL_BLOCK_NOWAIT) {
449                                 ldlm_flock_destroy(req, mode, *flags);
450                                 *err = -EAGAIN;
451                                 RETURN(LDLM_ITER_STOP);
452                         }
453
454                         if (*flags & LDLM_FL_TEST_LOCK) {
455                                 ldlm_flock_destroy(req, mode, *flags);
456                                 req->l_req_mode = lock->l_granted_mode;
457                                 req->l_policy_data.l_flock.pid =
458                                         lock->l_policy_data.l_flock.pid;
459                                 req->l_policy_data.l_flock.start =
460                                         lock->l_policy_data.l_flock.start;
461                                 req->l_policy_data.l_flock.end =
462                                         lock->l_policy_data.l_flock.end;
463                                 *flags |= LDLM_FL_LOCK_CHANGED;
464                                 RETURN(LDLM_ITER_STOP);
465                         }
466
467                         /* add lock to blocking list before deadlock
468                          * check to prevent race
469                          */
470                         ldlm_flock_blocking_link(req, lock);
471
472                         if (ldlm_flock_deadlock(req, lock)) {
473                                 ldlm_flock_blocking_unlink(req);
474                                 ldlm_flock_destroy(req, mode, *flags);
475                                 *err = -EDEADLK;
476                                 RETURN(LDLM_ITER_STOP);
477                         }
478
479                         ldlm_resource_add_lock(res, &res->lr_waiting, req);
480                         *flags |= LDLM_FL_BLOCK_GRANTED;
481                         RETURN(LDLM_ITER_STOP);
482                 }
483                 if (reprocess_failed)
484                         RETURN(LDLM_ITER_CONTINUE);
485         }
486
487         if (*flags & LDLM_FL_TEST_LOCK) {
488                 ldlm_flock_destroy(req, mode, *flags);
489                 req->l_req_mode = LCK_NL;
490                 *flags |= LDLM_FL_LOCK_CHANGED;
491                 RETURN(LDLM_ITER_STOP);
492         }
493
494         /* In case we had slept on this lock request, take it off the
495          * deadlock detection hash list.
496          */
497         ldlm_flock_blocking_unlink(req);
498 #endif /* HAVE_SERVER_SUPPORT */
499
500         /* Scan the locks owned by this process to find the insertion point
501          * (as locks are ordered), and to handle overlaps.
502          * We may have to merge or split existing locks.
503          */
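        /*
         * Worked example with hypothetical ranges: the owner already holds
         * a PW lock on [0, 99].
         *  - A new PW request for [50, 150] overlaps/adjoins it, so the two
         *    are merged and a single PW lock covering [0, 150] remains.
         *  - A new PR request for [40, 59] overlaps it with a different
         *    mode, so the existing lock is split: [0, 39] stays PW (in a
         *    newly allocated lock), [40, 59] becomes PR, and [60, 99]
         *    remains PW.
         */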
504         if (ownlocks)
505                 lock = ownlocks;
506         else
507                 lock = list_entry(&res->lr_granted,
508                                   struct ldlm_lock, l_res_link);
509         list_for_each_entry_safe_from(lock, tmp, &res->lr_granted, l_res_link) {
510                 if (!ldlm_same_flock_owner(lock, new))
511                         break;
512
513                 if (lock->l_granted_mode == mode) {
514                         /* If the modes are the same then we need to process
515                          * locks that overlap OR adjoin the new lock. The extra
516                          * comparisons are necessary to deal with arithmetic
517                          * overflow and underflow.
518                          */
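                        /*
                         * For example, if the existing lock already ends at
                         * OBD_OBJECT_EOF (all ones), "end + 1" wraps to 0 and
                         * the "start > end + 1" test would wrongly skip a
                         * lock that in fact overlaps or adjoins the new one;
                         * likewise, if the existing lock starts at 0,
                         * "start - 1" underflows to the largest offset and
                         * the "end < start - 1" test would wrongly stop the
                         * scan.  The extra comparisons simply disable the
                         * +1/-1 arithmetic in those two boundary cases.
                         */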
519                         if ((new->l_policy_data.l_flock.start >
520                              (lock->l_policy_data.l_flock.end + 1))
521                             && (lock->l_policy_data.l_flock.end !=
522                                 OBD_OBJECT_EOF))
523                                 continue;
524
525                         if ((new->l_policy_data.l_flock.end <
526                              (lock->l_policy_data.l_flock.start - 1))
527                             && (lock->l_policy_data.l_flock.start != 0))
528                                 break;
529
530                         if (new->l_policy_data.l_flock.start <
531                             lock->l_policy_data.l_flock.start) {
532                                 lock->l_policy_data.l_flock.start =
533                                         new->l_policy_data.l_flock.start;
534                         } else {
535                                 new->l_policy_data.l_flock.start =
536                                         lock->l_policy_data.l_flock.start;
537                         }
538
539                         if (new->l_policy_data.l_flock.end >
540                             lock->l_policy_data.l_flock.end) {
541                                 lock->l_policy_data.l_flock.end =
542                                         new->l_policy_data.l_flock.end;
543                         } else {
544                                 new->l_policy_data.l_flock.end =
545                                         lock->l_policy_data.l_flock.end;
546                         }
547
548                         if (added) {
549                                 ldlm_flock_destroy(lock, mode, *flags);
550                         } else {
551                                 new = lock;
552                                 added = 1;
553                         }
554                         continue;
555                 }
556
557                 if (new->l_policy_data.l_flock.start >
558                     lock->l_policy_data.l_flock.end)
559                         continue;
560
561                 if (new->l_policy_data.l_flock.end <
562                     lock->l_policy_data.l_flock.start)
563                         break;
564
565                 res->lr_flock_node.lfn_needs_reprocess = true;
566
567                 if (new->l_policy_data.l_flock.start <=
568                     lock->l_policy_data.l_flock.start) {
569                         if (new->l_policy_data.l_flock.end <
570                             lock->l_policy_data.l_flock.end) {
571                                 lock->l_policy_data.l_flock.start =
572                                         new->l_policy_data.l_flock.end + 1;
573                                 break;
574                         }
575                         ldlm_flock_destroy(lock, lock->l_req_mode, *flags);
576                         continue;
577                 }
578                 if (new->l_policy_data.l_flock.end >=
579                     lock->l_policy_data.l_flock.end) {
580                         lock->l_policy_data.l_flock.end =
581                                 new->l_policy_data.l_flock.start - 1;
582                         ldlm_flock_range_update(lock, req);
583                         continue;
584                 }
585
586                 /* split the existing lock into two locks */
587
588                 /* If this is an F_UNLCK operation we could avoid
589                  * allocating a new lock and reuse the req lock passed in
590                  * with the request, but that would complicate reply
591                  * processing since updates to req are reflected in the
592                  * reply. The client side replays the lock request, so
593                  * it must see the original lock data in the reply.
594                  */
595
596                 /* XXX - if ldlm_lock_new() can sleep we should
597                  * release the lr_lock, allocate the new lock,
598                  * and restart processing this lock.
599                  */
600                 if (new2 == NULL) {
601                         unlock_res_and_lock(req);
602                         new2 = ldlm_lock_create(ns, &res->lr_name, LDLM_FLOCK,
603                                                 lock->l_granted_mode, &null_cbs,
604                                                 NULL, 0, LVB_T_NONE);
605                         lock_res_and_lock(req);
606                         if (IS_ERR(new2)) {
607                                 ldlm_flock_destroy(req, lock->l_granted_mode,
608                                                    *flags);
609                                 *err = PTR_ERR(new2);
610                                 RETURN(LDLM_ITER_STOP);
611                         }
612                         goto reprocess;
613                 }
614
615                 splitted = 1;
616
617                 new2->l_granted_mode = lock->l_granted_mode;
618                 new2->l_policy_data.l_flock.pid =
619                         new->l_policy_data.l_flock.pid;
620                 new2->l_policy_data.l_flock.owner =
621                         new->l_policy_data.l_flock.owner;
622                 new2->l_policy_data.l_flock.start =
623                         lock->l_policy_data.l_flock.start;
624                 new2->l_policy_data.l_flock.end =
625                         new->l_policy_data.l_flock.start - 1;
626                 lock->l_policy_data.l_flock.start =
627                         new->l_policy_data.l_flock.end + 1;
628                 new2->l_conn_export = lock->l_conn_export;
629                 if (lock->l_export != NULL) {
630                         new2->l_export = class_export_lock_get(lock->l_export,
631                                                                new2);
632                         if (new2->l_export->exp_lock_hash &&
633                             hlist_unhashed(&new2->l_exp_hash))
634                                 cfs_hash_add(new2->l_export->exp_lock_hash,
635                                              &new2->l_remote_handle,
636                                              &new2->l_exp_hash);
637                 }
638                 if (*flags == LDLM_FL_WAIT_NOREPROC)
639                         ldlm_lock_addref_internal_nolock(new2,
640                                                          lock->l_granted_mode);
641
642                 /* insert new2 just before lock in the list */
643                 ldlm_flock_add_lock(res, &lock->l_res_link, new2);
644                 LDLM_LOCK_RELEASE(new2);
645                 break;
646         }
647
648         /* if new2 was created but never used, destroy it */
649         if (splitted == 0 && new2 != NULL)
650                 ldlm_lock_destroy_nolock(new2);
651
652         /* At this point we're granting the lock request. */
653         req->l_granted_mode = req->l_req_mode;
654
655         /* Add req to the granted queue before calling ldlm_reprocess_all(). */
656         if (!added) {
657                 list_del_init(&req->l_res_link);
658                 /* insert new lock before "lock", which might be the
659                  * next lock for this owner, or might be the first
660                  * lock for the next owner, or might not be a lock at
661                  * all, but instead points at the head of the list
662                  */
663                 ldlm_flock_add_lock(res, &lock->l_res_link, req);
664         }
665
666         if (*flags != LDLM_FL_WAIT_NOREPROC) {
667 #ifdef HAVE_SERVER_SUPPORT
668                 if (intention == LDLM_PROCESS_ENQUEUE) {
669                         /* If this is an unlock, reprocess the waitq and
670                          * send completion ASTs for locks that can now be
671                          * granted. The only problem with doing this
672                          * reprocessing here is that the completion ASTs for
673                          * newly granted locks will be sent before the unlock
674                          * completion is sent. It shouldn't be an issue. Also
675                          * note that ldlm_process_flock_lock() will recurse,
676                          * but only once because 'intention' won't be
677                          * LDLM_PROCESS_ENQUEUE from ldlm_reprocess_queue.
678                          */
679                         struct ldlm_flock_node *fn = &res->lr_flock_node;
680 restart:
681                         if (mode == LCK_NL && fn->lfn_needs_reprocess &&
682                             atomic_read(&fn->lfn_unlock_pending) == 0) {
683                                 LIST_HEAD(rpc_list);
684                                 int rc;
685
686                                 ldlm_reprocess_queue(res, &res->lr_waiting,
687                                                      &rpc_list,
688                                                      LDLM_PROCESS_RESCAN, 0);
689                                 fn->lfn_needs_reprocess = false;
690                                 unlock_res_and_lock(req);
691                                 rc = ldlm_run_ast_work(ns, &rpc_list,
692                                                        LDLM_WORK_CP_AST);
693                                 lock_res_and_lock(req);
694                                 if (rc == -ERESTART) {
695                                         fn->lfn_needs_reprocess = true;
696                                         GOTO(restart, rc);
697                                 }
698                         }
699                 } else {
700                         LASSERT(req->l_completion_ast);
701                         ldlm_add_ast_work_item(req, NULL, grant_work);
702                 }
703 #else /* !HAVE_SERVER_SUPPORT */
704                 /* The only case in which the client side calls this flock
705                  * policy function is ldlm_flock_completion_ast, which
706                  * always carries the LDLM_FL_WAIT_NOREPROC flag.
707                  */
708                 CERROR("Illegal parameter for client-side-only module.\n");
709                 LBUG();
710 #endif /* HAVE_SERVER_SUPPORT */
711         }
712
713         /* In case we're reprocessing the requested lock we can't destroy
714          * it until after calling ldlm_add_ast_work_item() above, so that
715          * it can bump the reference count on \a req. Otherwise \a req
716          * could be freed before the completion AST can be sent.
717          */
718         if (added)
719                 ldlm_flock_destroy(req, mode, *flags);
720
721         ldlm_resource_dump(D_INFO, res);
722         RETURN(LDLM_ITER_CONTINUE);
723 }
724
725 /**
726  * Flock completion callback function.
727  *
728  * \param[in,out] lock   the lock being handled
729  * \param[in]     flags  LDLM_FL_* flags describing how the enqueue completed
730  * \param[in]     data   opaque callback data; ldlm_work_cp_ast_lock() passes
731  *                       its ldlm_cb_set_arg here
732  * \retval 0      success
733  * \retval <0     failure
734  */
735 int
736 ldlm_flock_completion_ast(struct ldlm_lock *lock, __u64 flags, void *data)
737 {
738         struct file_lock *getlk = lock->l_ast_data;
739         struct obd_device *obd;
740         enum ldlm_error err;
741         int rc = 0;
742
743         ENTRY;
744
745         CFS_FAIL_TIMEOUT(OBD_FAIL_LDLM_CP_CB_WAIT2, 4);
746         if (CFS_FAIL_PRECHECK(OBD_FAIL_LDLM_CP_CB_WAIT3)) {
747                 lock_res_and_lock(lock);
748                 lock->l_flags |= LDLM_FL_FAIL_LOC;
749                 unlock_res_and_lock(lock);
750                 CFS_FAIL_TIMEOUT(OBD_FAIL_LDLM_CP_CB_WAIT3, 4);
751         }
752         CDEBUG(D_DLMTRACE, "flags: %#llx data: %p getlk: %p\n",
753                flags, data, getlk);
754
755         LASSERT(flags != LDLM_FL_WAIT_NOREPROC);
756
757         if (flags & LDLM_FL_FAILED)
758                 goto granted;
759
760         if (!(flags & LDLM_FL_BLOCKED_MASK)) {
761                 if (NULL == data)
762                         /* the MDS granted the lock in the reply */
763                         goto granted;
764                 /* CP AST RPC: the lock was granted, wake up the waiter */
765                 wake_up(&lock->l_waitq);
766                 RETURN(0);
767         }
768
769         LDLM_DEBUG(lock,
770                    "client-side enqueue returned a blocked lock, sleeping");
771         obd = class_exp2obd(lock->l_conn_export);
772
773         /* Go to sleep until the lock is granted. */
774         rc = l_wait_event_abortable(lock->l_waitq,
775                                     is_granted_or_cancelled(lock));
776         if (rc < 0) {
777                 /* take lock off the deadlock detection hash list. */
778                 lock_res_and_lock(lock);
779                 ldlm_flock_blocking_unlink(lock);
780
781                 /* client side - set flag to prevent lock from being
782                  * put on LRU list
783                  */
784                 ldlm_set_cbpending(lock);
785                 unlock_res_and_lock(lock);
786
787                 LDLM_DEBUG(lock, "client-side enqueue waking up: failed (%d)",
788                            rc);
789                 RETURN(rc);
790         }
791
792 granted:
793         CFS_FAIL_TIMEOUT(OBD_FAIL_LDLM_CP_CB_WAIT, 10);
794
795         if (CFS_FAIL_PRECHECK(OBD_FAIL_LDLM_CP_CB_WAIT4)) {
796                 lock_res_and_lock(lock);
797                 /* DEADLOCK is always set with CBPENDING */
798                 lock->l_flags |= LDLM_FL_FLOCK_DEADLOCK | LDLM_FL_CBPENDING;
799                 unlock_res_and_lock(lock);
800                 CFS_FAIL_TIMEOUT(OBD_FAIL_LDLM_CP_CB_WAIT4, 4);
801         }
802         if (CFS_FAIL_PRECHECK(OBD_FAIL_LDLM_CP_CB_WAIT5)) {
803                 lock_res_and_lock(lock);
804                 /* DEADLOCK is always set with CBPENDING */
805                 lock->l_flags |= (LDLM_FL_FAIL_LOC |
806                                   LDLM_FL_FLOCK_DEADLOCK | LDLM_FL_CBPENDING);
807                 unlock_res_and_lock(lock);
808                 CFS_FAIL_TIMEOUT(OBD_FAIL_LDLM_CP_CB_WAIT5, 4);
809         }
810
811         lock_res_and_lock(lock);
812
813
814         /* Protect against race where lock could have been just destroyed
815          * due to overlap in ldlm_process_flock_lock().
816          */
817         if (ldlm_is_destroyed(lock)) {
818                 unlock_res_and_lock(lock);
819                 LDLM_DEBUG(lock, "client-side enqueue waking up: destroyed");
820
821                 /* error is returned up to ldlm_cli_enqueue_fini() caller. */
822                 RETURN(-EIO);
823         }
824
825         /* ldlm_lock_enqueue() has already placed lock on the granted list. */
826         ldlm_resource_unlink_lock(lock);
827
828         /* Import invalidation. We need to actually release the lock
829          * references being held, so that it can go away. There is no point
830          * in holding the lock even if the app still believes it has it,
831          * since the server already dropped it anyway. This applies only to
832          * granted locks; do the same for DEADLOCK'ed locks.
833          */
834         if (ldlm_is_failed(lock) || ldlm_is_flock_deadlock(lock)) {
835                 int mode;
836
837                 if (flags & LDLM_FL_TEST_LOCK)
838                         LASSERT(ldlm_is_test_lock(lock));
839
840                 if (ldlm_is_test_lock(lock) || ldlm_is_flock_deadlock(lock))
841                         mode = getlk->fl_type;
842                 else
843                         mode = lock->l_req_mode;
844
845                 if (ldlm_is_flock_deadlock(lock)) {
846                         LDLM_DEBUG(lock,
847                                    "client-side enqueue deadlock received");
848                         rc = -EDEADLK;
849                 }
850                 ldlm_flock_destroy(lock, mode, LDLM_FL_WAIT_NOREPROC);
851                 unlock_res_and_lock(lock);
852
853                 /* Need to wake up the waiter if we were evicted */
854                 wake_up(&lock->l_waitq);
855
856                 /* An error is still returned, so that it propagates up to
857                  * the ldlm_cli_enqueue_fini() caller.
858                  */
859                 RETURN(rc ? : -EIO);
860         }
861
862         LDLM_DEBUG(lock, "client-side enqueue granted");
863
864         if (flags & LDLM_FL_TEST_LOCK) {
865                 /*
866                  * fcntl(F_GETLK) request
867                  * The old mode was saved in getlk->fl_type so that if the mode
868                  * in the lock changes we can decref the appropriate refcount.
869                  */
870                 LASSERT(ldlm_is_test_lock(lock));
871                 ldlm_flock_destroy(lock, getlk->fl_type, LDLM_FL_WAIT_NOREPROC);
872                 switch (lock->l_granted_mode) {
873                 case LCK_PR:
874                         getlk->fl_type = F_RDLCK;
875                         break;
876                 case LCK_PW:
877                         getlk->fl_type = F_WRLCK;
878                         break;
879                 default:
880                         getlk->fl_type = F_UNLCK;
881                 }
882                 getlk->fl_pid = (pid_t)lock->l_policy_data.l_flock.pid;
883                 getlk->fl_start = (loff_t)lock->l_policy_data.l_flock.start;
884                 getlk->fl_end = (loff_t)lock->l_policy_data.l_flock.end;
885         } else {
886                 __u64 noreproc = LDLM_FL_WAIT_NOREPROC;
887
888                 /* We need to reprocess the lock to do merges or splits
889                  * with existing locks owned by this process.
890                  */
891                 ldlm_process_flock_lock(lock, &noreproc, 1, &err, NULL);
892         }
893         unlock_res_and_lock(lock);
894         RETURN(rc);
895 }
896 EXPORT_SYMBOL(ldlm_flock_completion_ast);
897
898 int ldlm_flock_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
899                             void *data, int flag)
900 {
901         ENTRY;
902
903         LASSERT(lock);
904         LASSERT(flag == LDLM_CB_CANCELING);
905
906         /* take lock off the deadlock detection hash list. */
907         lock_res_and_lock(lock);
908         ldlm_flock_blocking_unlink(lock);
909         unlock_res_and_lock(lock);
910         RETURN(0);
911 }
912
913 void ldlm_flock_policy_wire_to_local(const union ldlm_wire_policy_data *wpolicy,
914                                      union ldlm_policy_data *lpolicy)
915 {
916         lpolicy->l_flock.start = wpolicy->l_flock.lfw_start;
917         lpolicy->l_flock.end = wpolicy->l_flock.lfw_end;
918         lpolicy->l_flock.pid = wpolicy->l_flock.lfw_pid;
919         lpolicy->l_flock.owner = wpolicy->l_flock.lfw_owner;
920 }
921
922 void ldlm_flock_policy_local_to_wire(const union ldlm_policy_data *lpolicy,
923                                      union ldlm_wire_policy_data *wpolicy)
924 {
925         memset(wpolicy, 0, sizeof(*wpolicy));
926         wpolicy->l_flock.lfw_start = lpolicy->l_flock.start;
927         wpolicy->l_flock.lfw_end = lpolicy->l_flock.end;
928         wpolicy->l_flock.lfw_pid = lpolicy->l_flock.pid;
929         wpolicy->l_flock.lfw_owner = lpolicy->l_flock.owner;
930 }
931
932 /*
933  * Export handle<->flock hash operations.
934  */
935 static unsigned int
936 ldlm_export_flock_hash(struct cfs_hash *hs, const void *key,
937                        const unsigned int bits)
938 {
939         return cfs_hash_64(*(__u64 *)key, bits);
940 }
941
942 static void *
943 ldlm_export_flock_key(struct hlist_node *hnode)
944 {
945         struct ldlm_lock *lock;
946
947         lock = hlist_entry(hnode, struct ldlm_lock, l_exp_flock_hash);
948         return &lock->l_policy_data.l_flock.owner;
949 }
950
951 static int
952 ldlm_export_flock_keycmp(const void *key, struct hlist_node *hnode)
953 {
954         return !memcmp(ldlm_export_flock_key(hnode), key, sizeof(__u64));
955 }
956
957 static void *
958 ldlm_export_flock_object(struct hlist_node *hnode)
959 {
960         return hlist_entry(hnode, struct ldlm_lock, l_exp_flock_hash);
961 }
962
963 static void
964 ldlm_export_flock_get(struct cfs_hash *hs, struct hlist_node *hnode)
965 {
966         struct ldlm_lock *lock;
967         struct ldlm_flock *flock;
968
969         lock = hlist_entry(hnode, struct ldlm_lock, l_exp_flock_hash);
970         LDLM_LOCK_GET(lock);
971
972         flock = &lock->l_policy_data.l_flock;
973         LASSERT(flock->blocking_export != NULL);
974         class_export_get(flock->blocking_export);
975         atomic_inc(&flock->blocking_refs);
976 }
977
978 static void
979 ldlm_export_flock_put(struct cfs_hash *hs, struct hlist_node *hnode)
980 {
981         struct ldlm_lock *lock;
982         struct ldlm_flock *flock;
983
984         lock = hlist_entry(hnode, struct ldlm_lock, l_exp_flock_hash);
985
986         flock = &lock->l_policy_data.l_flock;
987         LASSERT(flock->blocking_export != NULL);
988         class_export_put(flock->blocking_export);
989         if (atomic_dec_and_test(&flock->blocking_refs)) {
990                 flock->blocking_owner = 0;
991                 flock->blocking_export = NULL;
992         }
993         LDLM_LOCK_RELEASE(lock);
994 }
995
996 static struct cfs_hash_ops ldlm_export_flock_ops = {
997         .hs_hash        = ldlm_export_flock_hash,
998         .hs_key         = ldlm_export_flock_key,
999         .hs_keycmp      = ldlm_export_flock_keycmp,
1000         .hs_object      = ldlm_export_flock_object,
1001         .hs_get         = ldlm_export_flock_get,
1002         .hs_put         = ldlm_export_flock_put,
1003         .hs_put_locked  = ldlm_export_flock_put,
1004 };
1005
1006 int ldlm_init_flock_export(struct obd_export *exp)
1007 {
1008         if (strcmp(exp->exp_obd->obd_type->typ_name, LUSTRE_MDT_NAME) != 0)
1009                 RETURN(0);
1010
1011         exp->exp_flock_hash =
1012                 cfs_hash_create(obd_uuid2str(&exp->exp_client_uuid),
1013                                 HASH_EXP_LOCK_CUR_BITS,
1014                                 HASH_EXP_LOCK_MAX_BITS,
1015                                 HASH_EXP_LOCK_BKT_BITS, 0,
1016                                 CFS_HASH_MIN_THETA, CFS_HASH_MAX_THETA,
1017                                 &ldlm_export_flock_ops,
1018                                 CFS_HASH_DEFAULT | CFS_HASH_NBLK_CHANGE);
1019         if (!exp->exp_flock_hash)
1020                 RETURN(-ENOMEM);
1021
1022         RETURN(0);
1023 }
1024
1025 void ldlm_destroy_flock_export(struct obd_export *exp)
1026 {
1027         ENTRY;
1028         if (exp->exp_flock_hash) {
1029                 cfs_hash_putref(exp->exp_flock_hash);
1030                 exp->exp_flock_hash = NULL;
1031         }
1032         EXIT;
1033 }