lustre/ldlm/ldlm_flock.c
/*
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.gnu.org/licenses/gpl-2.0.html
 *
 * GPL HEADER END
 */
/*
 * Copyright (c) 2003 Hewlett-Packard Development Company LP.
 * Developed under the sponsorship of the US Government under
 * Subcontract No. B514193
 *
 * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
 * Use is subject to license terms.
 *
 * Copyright (c) 2010, 2017, Intel Corporation.
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 */

/**
 * This file implements the POSIX (flock) lock type for Lustre.
 * Its policy properties are the start and end of the extent and the PID.
 *
 * These locks are handled only through the MDS because POSIX semantics
 * require, for example, that a lock may be only partially released (and
 * therefore split into two locks), and that two adjacent locks from the
 * same process may be merged into a single wider lock.
 *
 * Lock modes are mapped as follows:
 * PR and PW for READ and WRITE locks
 * NL to request the release of a portion of an existing lock
 *
 * See the example below for how fcntl() requests map onto these modes.
 *
 * These flock locks never time out.
 */
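
/*
 * Illustrative sketch (not part of the DLM code): how a userspace
 * fcntl() byte-range lock request maps onto the modes described above.
 * The exact client-side translation (in llite) is not shown here; the
 * helper below is a hypothetical standalone example.
 *
 *	#include <fcntl.h>
 *
 *	static int set_read_lock(int fd)
 *	{
 *		struct flock fl = {
 *			.l_type   = F_RDLCK,	// becomes an LDLM PR lock
 *			.l_whence = SEEK_SET,
 *			.l_start  = 0,		// l_flock.start
 *			.l_len    = 4096,	// l_flock.end = start + len - 1
 *		};
 *
 *		// F_WRLCK maps to a PW lock; F_UNLCK is sent as an NL
 *		// request that releases (part of) an existing lock.
 *		return fcntl(fd, F_SETLKW, &fl);
 *	}
 */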

#define DEBUG_SUBSYSTEM S_LDLM

#include <linux/list.h>
#ifdef HAVE_LINUX_FILELOCK_HEADER
#include <linux/filelock.h>
#endif
#include <lustre_dlm.h>
#include <obd_support.h>
#include <obd_class.h>
#include <lustre_lib.h>

#include "ldlm_internal.h"

int ldlm_flock_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
                            void *data, int flag);

static inline int
ldlm_same_flock_owner(struct ldlm_lock *lock, struct ldlm_lock *new)
{
        return ((new->l_policy_data.l_flock.owner ==
                 lock->l_policy_data.l_flock.owner) &&
                (new->l_export == lock->l_export));
}

static inline int
ldlm_flocks_overlap(struct ldlm_lock *lock, struct ldlm_lock *new)
{
        return ((new->l_policy_data.l_flock.start <=
                 lock->l_policy_data.l_flock.end) &&
                (new->l_policy_data.l_flock.end >=
                 lock->l_policy_data.l_flock.start));
}
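
/*
 * Worked example for ldlm_flocks_overlap() (illustration only):
 * [0, 99] and [50, 149] overlap (50 <= 99 && 149 >= 0), while
 * [0, 49] and [50, 149] do not (start 50 > end 49).  Adjacency is not
 * overlap; merging of adjacent same-mode locks from one owner is
 * handled separately in ldlm_process_flock_lock() below.
 */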

static inline void ldlm_flock_blocking_link(struct ldlm_lock *req,
                                            struct ldlm_lock *lock)
{
        /* For server only */
        if (req->l_export == NULL)
                return;

        LASSERT(hlist_unhashed(&req->l_exp_flock_hash));

        req->l_policy_data.l_flock.blocking_owner =
                lock->l_policy_data.l_flock.owner;
        req->l_policy_data.l_flock.blocking_export =
                lock->l_export;
        atomic_set(&req->l_policy_data.l_flock.blocking_refs, 0);

        cfs_hash_add(req->l_export->exp_flock_hash,
                     &req->l_policy_data.l_flock.owner,
                     &req->l_exp_flock_hash);
}

static inline void ldlm_flock_blocking_unlink(struct ldlm_lock *req)
{
        /* For server only */
        if (req->l_export == NULL)
                return;

        check_res_locked(req->l_resource);
        if (req->l_export->exp_flock_hash != NULL &&
            !hlist_unhashed(&req->l_exp_flock_hash))
                cfs_hash_del(req->l_export->exp_flock_hash,
                             &req->l_policy_data.l_flock.owner,
                             &req->l_exp_flock_hash);
}

static inline void
ldlm_flock_destroy(struct ldlm_lock *lock, enum ldlm_mode mode, __u64 flags)
{
        ENTRY;

        LDLM_DEBUG(lock, "ldlm_flock_destroy(mode: %d, flags: %#llx)",
                   mode, flags);

        /* Safe to not lock here, since it should be empty anyway */
        LASSERT(hlist_unhashed(&lock->l_exp_flock_hash));

        list_del_init(&lock->l_res_link);
        if (flags == LDLM_FL_WAIT_NOREPROC) {
                /* client side - set a flag to prevent sending a CANCEL */
                lock->l_flags |= LDLM_FL_LOCAL_ONLY | LDLM_FL_CBPENDING;

                /* we are already under lock_res_and_lock() here, so we must
                 * call the nolock version of ldlm_lock_decref_internal
                 */
                ldlm_lock_decref_internal_nolock(lock, mode);
        }

        ldlm_lock_destroy_nolock(lock);
        EXIT;
}

#ifdef HAVE_SERVER_SUPPORT
/**
 * POSIX locks deadlock detection code.
 *
 * Given a new lock \a req and an existing lock \a bl_lock it conflicts
 * with, we need to iterate through all blocked POSIX locks for this
 * export and see if a deadlock condition arises (i.e. when one client
 * holds a lock on something and wants a lock on something else, while
 * at the same time another client has the opposite situation).
 */
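
/*
 * Example of the condition detected below (illustration only):
 *
 *   client A holds PW [0, 99]    and is blocked waiting for PW [100, 199]
 *   client B holds PW [100, 199] and now requests           PW [0, 99]
 *
 * Walking the blocking_owner/blocking_export chain starting from the
 * owner of the conflicting lock (A) reaches the owner of the new
 * request (B), so ldlm_flock_deadlock() returns 1 and the request
 * either fails with -EDEADLK or, on reprocessing, is cancelled via
 * ldlm_flock_cancel_on_deadlock().
 */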

struct ldlm_flock_lookup_cb_data {
        __u64 *bl_owner;
        struct ldlm_lock *lock;
        struct obd_export *exp;
};

static int ldlm_flock_lookup_cb(struct obd_export *exp, void *data)
{
        struct ldlm_flock_lookup_cb_data *cb_data = data;
        struct ldlm_lock *lock;

        if (exp->exp_failed)
                return 0;

        lock = cfs_hash_lookup(exp->exp_flock_hash, cb_data->bl_owner);
        if (lock == NULL)
                return 0;

        /* Stop on first found lock. Same process can't sleep twice */
        cb_data->lock = lock;
        cb_data->exp = class_export_get(exp);

        return 1;
}

static int
ldlm_flock_deadlock(struct ldlm_lock *req, struct ldlm_lock *bl_lock)
{
        struct obd_export *req_exp = req->l_export;
        struct obd_export *bl_exp = bl_lock->l_export;
        __u64 req_owner = req->l_policy_data.l_flock.owner;
        __u64 bl_owner = bl_lock->l_policy_data.l_flock.owner;

        /* For server only */
        if (req_exp == NULL)
                return 0;

        class_export_get(bl_exp);
        while (1) {
                struct ldlm_flock_lookup_cb_data cb_data = {
                        .bl_owner = &bl_owner,
                        .lock = NULL,
                        .exp = NULL,
                };
                struct ptlrpc_connection *bl_exp_conn;
                struct obd_export *bl_exp_new;
                struct ldlm_lock *lock = NULL;
                struct ldlm_flock *flock;

                bl_exp_conn = bl_exp->exp_connection;
                if (bl_exp->exp_flock_hash != NULL) {
                        int found;

                        found = obd_nid_export_for_each(bl_exp->exp_obd,
                                                        &bl_exp_conn->c_peer.nid,
                                                        ldlm_flock_lookup_cb,
                                                        &cb_data);
                        if (found)
                                lock = cb_data.lock;
                }
                if (lock == NULL)
                        break;

                class_export_put(bl_exp);
                bl_exp = cb_data.exp;

                LASSERT(req != lock);
                flock = &lock->l_policy_data.l_flock;
                LASSERT(flock->owner == bl_owner);
                bl_owner = flock->blocking_owner;
                bl_exp_new = class_export_get(flock->blocking_export);
                class_export_put(bl_exp);

                cfs_hash_put(bl_exp->exp_flock_hash, &lock->l_exp_flock_hash);
                bl_exp = bl_exp_new;

                if (bl_exp->exp_failed)
                        break;

                if (bl_owner == req_owner &&
                    nid_same(&bl_exp_conn->c_peer.nid,
                              &req_exp->exp_connection->c_peer.nid)) {
                        class_export_put(bl_exp);
                        return 1;
                }
        }
        class_export_put(bl_exp);

        return 0;
}

static void ldlm_flock_cancel_on_deadlock(struct ldlm_lock *lock,
                                          struct list_head *work_list)
{
        CDEBUG(D_INFO, "reprocess deadlock req=%p\n", lock);

        if ((exp_connect_flags(lock->l_export) &
             OBD_CONNECT_FLOCK_DEAD) == 0) {
                CERROR("deadlock found, but client doesn't support flock cancellation\n");
        } else {
                LASSERT(lock->l_completion_ast);
                LASSERT(!ldlm_is_ast_sent(lock));
                lock->l_flags |= (LDLM_FL_AST_SENT | LDLM_FL_CANCEL_ON_BLOCK |
                                  LDLM_FL_FLOCK_DEADLOCK);
                ldlm_flock_blocking_unlink(lock);
                ldlm_resource_unlink_lock(lock);
                ldlm_add_ast_work_item(lock, NULL, work_list);
        }
}
#endif /* HAVE_SERVER_SUPPORT */

/**
 * Process a granting attempt for a flock lock.
 * Must be called with the ns lock held.
 *
 * This function looks for any conflicts for \a lock in the granted or
 * waiting queues. The lock is granted if no conflicts are found in
 * either queue.  See the example below for how overlapping locks from
 * the same owner are merged or split.
 */
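
/*
 * Illustration of the merge/split behaviour implemented below (ranges
 * are [start, end], all requests from the same owner P unless noted):
 *
 *   granted: PW [0, 99]
 *   enqueue PW [100, 199]  -> adjoins, merged into a single PW [0, 199]
 *   enqueue NL [50, 59]    -> unlock splits it into PW [0, 49] and
 *                             PW [60, 199]
 *   enqueue PR [0, 10] by another owner Q -> conflicts with PW [0, 49];
 *                             queued on lr_waiting, or -EAGAIN if
 *                             LDLM_FL_BLOCK_NOWAIT is set
 */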
int
ldlm_process_flock_lock(struct ldlm_lock *req, __u64 *flags,
                        enum ldlm_process_intention intention,
                        enum ldlm_error *err, struct list_head *work_list)
{
        struct ldlm_resource *res = req->l_resource;
        struct ldlm_namespace *ns = ldlm_res_to_ns(res);
        struct ldlm_lock *tmp;
        struct ldlm_lock *ownlocks = NULL;
        struct ldlm_lock *lock = NULL;
        struct ldlm_lock *new = req;
        struct ldlm_lock *new2 = NULL;
        enum ldlm_mode mode = req->l_req_mode;
        int local = ns_is_client(ns);
        int added = (mode == LCK_NL);
        int splitted = 0;
        const struct ldlm_callback_suite null_cbs = { NULL };
#ifdef HAVE_SERVER_SUPPORT
        struct list_head *grant_work = (intention == LDLM_PROCESS_ENQUEUE ?
                                        NULL : work_list);
#endif

        ENTRY;
        CDEBUG(D_DLMTRACE, "flags %#llx owner %llu pid %u mode %u start "
               "%llu end %llu\n", *flags,
               new->l_policy_data.l_flock.owner,
               new->l_policy_data.l_flock.pid, mode,
               req->l_policy_data.l_flock.start,
               req->l_policy_data.l_flock.end);

        *err = ELDLM_OK;

        if (local) {
                /* No blocking ASTs are sent to the clients for
                 * POSIX file & record locks
                 */
                req->l_blocking_ast = NULL;
        } else {
                /* Called on the server for lock cancels. */
                req->l_blocking_ast = ldlm_flock_blocking_ast;
        }

reprocess:
        if ((*flags == LDLM_FL_WAIT_NOREPROC) || (mode == LCK_NL)) {
                /* This loop determines where this process's locks start
                 * in the resource lr_granted list.
                 */
                list_for_each_entry(lock, &res->lr_granted, l_res_link) {
                        if (ldlm_same_flock_owner(lock, req)) {
                                ownlocks = lock;
                                break;
                        }
                }
        }
#ifdef HAVE_SERVER_SUPPORT
        else {
                int reprocess_failed = 0;

                lockmode_verify(mode);

                /* This loop determines if there are existing locks
                 * that conflict with the new lock request.
                 */
                list_for_each_entry(lock, &res->lr_granted, l_res_link) {
                        if (ldlm_same_flock_owner(lock, req)) {
                                if (!ownlocks)
                                        ownlocks = lock;
                                continue;
                        }

                        if (req->l_req_mode == LCK_PR &&
                            lock->l_granted_mode == LCK_PR &&
                            lock->l_policy_data.l_flock.start <=
                                req->l_policy_data.l_flock.start &&
                            lock->l_policy_data.l_flock.end >=
                                req->l_policy_data.l_flock.end) {
                                /* there can't be a granted PW lock
                                 * overlapping this range
                                 */
                                break;
                        }
                        /* locks are compatible, overlap doesn't matter */
                        if (lockmode_compat(lock->l_granted_mode, mode))
                                continue;

                        if (!ldlm_flocks_overlap(lock, req))
                                continue;

                        if (intention != LDLM_PROCESS_ENQUEUE) {
                                if (ldlm_flock_deadlock(req, lock)) {
                                        ldlm_flock_cancel_on_deadlock(
                                                req, grant_work);
                                        RETURN(LDLM_ITER_CONTINUE);
                                }
                                reprocess_failed = 1;
                                break;
                        }

                        if (*flags & LDLM_FL_BLOCK_NOWAIT) {
                                ldlm_flock_destroy(req, mode, *flags);
                                *err = -EAGAIN;
                                RETURN(LDLM_ITER_STOP);
                        }

                        if (*flags & LDLM_FL_TEST_LOCK) {
                                ldlm_flock_destroy(req, mode, *flags);
                                req->l_req_mode = lock->l_granted_mode;
                                req->l_policy_data.l_flock.pid =
                                        lock->l_policy_data.l_flock.pid;
                                req->l_policy_data.l_flock.start =
                                        lock->l_policy_data.l_flock.start;
                                req->l_policy_data.l_flock.end =
                                        lock->l_policy_data.l_flock.end;
                                *flags |= LDLM_FL_LOCK_CHANGED;
                                RETURN(LDLM_ITER_STOP);
                        }

                        /* add lock to blocking list before deadlock
                         * check to prevent race
                         */
                        ldlm_flock_blocking_link(req, lock);

                        if (ldlm_flock_deadlock(req, lock)) {
                                ldlm_flock_blocking_unlink(req);
                                ldlm_flock_destroy(req, mode, *flags);
                                *err = -EDEADLK;
                                RETURN(LDLM_ITER_STOP);
                        }

                        ldlm_resource_add_lock(res, &res->lr_waiting, req);
                        *flags |= LDLM_FL_BLOCK_GRANTED;
                        RETURN(LDLM_ITER_STOP);
                }
                if (reprocess_failed)
                        RETURN(LDLM_ITER_CONTINUE);
        }

        if (*flags & LDLM_FL_TEST_LOCK) {
                ldlm_flock_destroy(req, mode, *flags);
                req->l_req_mode = LCK_NL;
                *flags |= LDLM_FL_LOCK_CHANGED;
                RETURN(LDLM_ITER_STOP);
        }

        /* In case we had slept on this lock request, take it off the
         * deadlock detection hash list.
         */
        ldlm_flock_blocking_unlink(req);
#endif /* HAVE_SERVER_SUPPORT */

        /* Scan the locks owned by this process to find the insertion point
         * (as locks are ordered), and to handle overlaps.
         * We may have to merge or split existing locks.
         */
        if (ownlocks)
                lock = ownlocks;
        else
                lock = list_entry(&res->lr_granted,
                                  struct ldlm_lock, l_res_link);
        list_for_each_entry_safe_from(lock, tmp, &res->lr_granted, l_res_link) {
                if (!ldlm_same_flock_owner(lock, new))
                        break;

                if (lock->l_granted_mode == mode) {
                        /* If the modes are the same then we need to process
                         * locks that overlap OR adjoin the new lock. The extra
                         * logic condition is necessary to deal with arithmetic
                         * overflow and underflow.
                         */
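                        /*
                         * Example (illustration only): granted PW [0, 99]
                         * plus new PW [100, 199] from the same owner adjoin
                         * (100 == 99 + 1) and are merged into PW [0, 199]
                         * below.  The OBD_OBJECT_EOF and 0 checks keep
                         * "end + 1" and "start - 1" from wrapping when a
                         * lock already reaches the end (or start) of the
                         * file.
                         */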
                        if ((new->l_policy_data.l_flock.start >
                             (lock->l_policy_data.l_flock.end + 1))
                            && (lock->l_policy_data.l_flock.end !=
                                OBD_OBJECT_EOF))
                                continue;

                        if ((new->l_policy_data.l_flock.end <
                             (lock->l_policy_data.l_flock.start - 1))
                            && (lock->l_policy_data.l_flock.start != 0))
                                break;

                        if (new->l_policy_data.l_flock.start <
                            lock->l_policy_data.l_flock.start) {
                                lock->l_policy_data.l_flock.start =
                                        new->l_policy_data.l_flock.start;
                        } else {
                                new->l_policy_data.l_flock.start =
                                        lock->l_policy_data.l_flock.start;
                        }

                        if (new->l_policy_data.l_flock.end >
                            lock->l_policy_data.l_flock.end) {
                                lock->l_policy_data.l_flock.end =
                                        new->l_policy_data.l_flock.end;
                        } else {
                                new->l_policy_data.l_flock.end =
                                        lock->l_policy_data.l_flock.end;
                        }

                        if (added) {
                                ldlm_flock_destroy(lock, mode, *flags);
                        } else {
                                new = lock;
                                added = 1;
                        }
                        continue;
                }

                if (new->l_policy_data.l_flock.start >
                    lock->l_policy_data.l_flock.end)
                        continue;

                if (new->l_policy_data.l_flock.end <
                    lock->l_policy_data.l_flock.start)
                        break;

                res->lr_flock_node.lfn_needs_reprocess = true;

                if (new->l_policy_data.l_flock.start <=
                    lock->l_policy_data.l_flock.start) {
                        if (new->l_policy_data.l_flock.end <
                            lock->l_policy_data.l_flock.end) {
                                lock->l_policy_data.l_flock.start =
                                        new->l_policy_data.l_flock.end + 1;
                                break;
                        }
                        ldlm_flock_destroy(lock, lock->l_req_mode, *flags);
                        continue;
                }
                if (new->l_policy_data.l_flock.end >=
                    lock->l_policy_data.l_flock.end) {
                        lock->l_policy_data.l_flock.end =
                                new->l_policy_data.l_flock.start - 1;
                        continue;
                }

                /* split the existing lock into two locks */

                /* If this is an F_UNLCK operation then we could avoid
                 * allocating a new lock and use the req lock passed in
                 * with the request, but this would complicate the reply
                 * processing since updates to req get reflected in the
                 * reply.  The client side replays the lock request, so
                 * it must see the original lock data in the reply.
                 */
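
                /* Split example (illustration only): the existing granted
                 * lock covers [0, 199] and the new request unlocks [50, 99].
                 * new2 takes the low part and the existing lock keeps the
                 * high part:
                 *
                 *      new2->...l_flock.start = 0;    (= lock->...start)
                 *      new2->...l_flock.end   = 49;   (= new->...start - 1)
                 *      lock->...l_flock.start = 100;  (= new->...end + 1)
                 *      lock->...l_flock.end   = 199;  (unchanged)
                 */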

                /* XXX - if ldlm_lock_new() can sleep we should
                 * release the lr_lock, allocate the new lock,
                 * and restart processing this lock.
                 */
                if (new2 == NULL) {
                        unlock_res_and_lock(req);
                        new2 = ldlm_lock_create(ns, &res->lr_name, LDLM_FLOCK,
                                                lock->l_granted_mode, &null_cbs,
                                                NULL, 0, LVB_T_NONE);
                        lock_res_and_lock(req);
                        if (IS_ERR(new2)) {
                                ldlm_flock_destroy(req, lock->l_granted_mode,
                                                   *flags);
                                *err = PTR_ERR(new2);
                                RETURN(LDLM_ITER_STOP);
                        }
                        goto reprocess;
                }

                splitted = 1;

                new2->l_granted_mode = lock->l_granted_mode;
                new2->l_policy_data.l_flock.pid =
                        new->l_policy_data.l_flock.pid;
                new2->l_policy_data.l_flock.owner =
                        new->l_policy_data.l_flock.owner;
                new2->l_policy_data.l_flock.start =
                        lock->l_policy_data.l_flock.start;
                new2->l_policy_data.l_flock.end =
                        new->l_policy_data.l_flock.start - 1;
                lock->l_policy_data.l_flock.start =
                        new->l_policy_data.l_flock.end + 1;
                new2->l_conn_export = lock->l_conn_export;
                if (lock->l_export != NULL) {
                        new2->l_export = class_export_lock_get(lock->l_export,
                                                               new2);
                        if (new2->l_export->exp_lock_hash &&
                            hlist_unhashed(&new2->l_exp_hash))
                                cfs_hash_add(new2->l_export->exp_lock_hash,
                                             &new2->l_remote_handle,
                                             &new2->l_exp_hash);
                }
                if (*flags == LDLM_FL_WAIT_NOREPROC)
                        ldlm_lock_addref_internal_nolock(new2,
                                                         lock->l_granted_mode);

                /* insert new2 at lock */
                ldlm_resource_add_lock(res, &lock->l_res_link, new2);
                LDLM_LOCK_RELEASE(new2);
                break;
        }

        /* if new2 is created but never used, destroy it */
        if (splitted == 0 && new2 != NULL)
                ldlm_lock_destroy_nolock(new2);

        /* At this point we're granting the lock request. */
        req->l_granted_mode = req->l_req_mode;

        /* Add req to the granted queue before calling ldlm_reprocess_all(). */
        if (!added) {
                list_del_init(&req->l_res_link);
                /* insert new lock before "lock", which might be the
                 * next lock for this owner, or might be the first
                 * lock for the next owner, or might not be a lock at
                 * all, but instead points at the head of the list
                 */
                ldlm_resource_add_lock(res, &lock->l_res_link, req);
        }

        if (*flags != LDLM_FL_WAIT_NOREPROC) {
#ifdef HAVE_SERVER_SUPPORT
                if (intention == LDLM_PROCESS_ENQUEUE) {
                        /* If this is an unlock, reprocess the waitq and
                         * send completion ASTs for locks that can now be
                         * granted. The only problem with doing this
                         * reprocessing here is that the completion ASTs for
                         * newly granted locks will be sent before the unlock
                         * completion is sent. It shouldn't be an issue. Also
                         * note that ldlm_process_flock_lock() will recurse,
                         * but only once because 'intention' won't be
                         * LDLM_PROCESS_ENQUEUE from ldlm_reprocess_queue.
                         */
                        struct ldlm_flock_node *fn = &res->lr_flock_node;
restart:
                        if (mode == LCK_NL && fn->lfn_needs_reprocess &&
                            atomic_read(&fn->lfn_unlock_pending) == 0) {
                                LIST_HEAD(rpc_list);
                                int rc;

                                ldlm_reprocess_queue(res, &res->lr_waiting,
                                                     &rpc_list,
                                                     LDLM_PROCESS_RESCAN, 0);
                                fn->lfn_needs_reprocess = false;
                                unlock_res_and_lock(req);
                                rc = ldlm_run_ast_work(ns, &rpc_list,
                                                       LDLM_WORK_CP_AST);
                                lock_res_and_lock(req);
                                if (rc == -ERESTART) {
                                        fn->lfn_needs_reprocess = true;
                                        GOTO(restart, rc);
                                }
                        }
                } else {
                        LASSERT(req->l_completion_ast);
                        ldlm_add_ast_work_item(req, NULL, grant_work);
                }
#else /* !HAVE_SERVER_SUPPORT */
                /* The only valid case for a client-side call into the flock
                 * policy function is ldlm_flock_completion_ast(), which
                 * always passes the LDLM_FL_WAIT_NOREPROC flag.
                 */
                CERROR("Illegal parameter for client-side-only module.\n");
                LBUG();
#endif /* HAVE_SERVER_SUPPORT */
        }

        /* In case we're reprocessing the requested lock we can't destroy
         * it until after calling ldlm_add_ast_work_item() above, so that
         * ldlm_add_ast_work_item() can bump the reference count on \a req.
         * Otherwise \a req could be freed before the completion AST can be
         * sent.
         */
        if (added)
                ldlm_flock_destroy(req, mode, *flags);

        ldlm_resource_dump(D_INFO, res);
        RETURN(LDLM_ITER_CONTINUE);
}

/**
 * Flock completion callback function.
 *
 * \param[in,out] lock   A lock to be handled
 * \param[in]     flags  LDLM flags
 * \param[in]     data   ldlm_work_cp_ast_lock() will use ldlm_cb_set_arg
 *
 * \retval 0     success
 * \retval <0    failure
 */
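
/*
 * Typical client-side sequence (simplified sketch, not exact call
 * signatures):
 *
 *	rc = ldlm_cli_enqueue(...);	// flock enqueue RPC to the MDS
 *
 * If the reply carries one of the LDLM_FL_BLOCKED_MASK flags the lock
 * was not granted immediately; this completion AST then sleeps on
 * lock->l_waitq until the lock is granted, cancelled, or the enqueue
 * fails.  For fcntl(F_GETLK) requests (LDLM_FL_TEST_LOCK) the
 * conflicting lock, if any, is copied back into the struct file_lock
 * *getlk below.
 */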
int
ldlm_flock_completion_ast(struct ldlm_lock *lock, __u64 flags, void *data)
{
        struct file_lock *getlk = lock->l_ast_data;
        struct obd_device *obd;
        enum ldlm_error err;
        int rc = 0;
        ENTRY;

        CFS_FAIL_TIMEOUT(OBD_FAIL_LDLM_CP_CB_WAIT2, 4);
        if (CFS_FAIL_PRECHECK(OBD_FAIL_LDLM_CP_CB_WAIT3)) {
                lock_res_and_lock(lock);
                lock->l_flags |= LDLM_FL_FAIL_LOC;
                unlock_res_and_lock(lock);
                CFS_FAIL_TIMEOUT(OBD_FAIL_LDLM_CP_CB_WAIT3, 4);
        }
        CDEBUG(D_DLMTRACE, "flags: %#llx data: %p getlk: %p\n",
               flags, data, getlk);

        LASSERT(flags != LDLM_FL_WAIT_NOREPROC);

        if (flags & LDLM_FL_FAILED)
                goto granted;

        if (!(flags & LDLM_FL_BLOCKED_MASK)) {
                if (data == NULL)
                        /* MDS granted the lock in the reply */
                        goto granted;
                /* CP AST RPC: lock got granted, wake up the waiter */
                wake_up(&lock->l_waitq);
                RETURN(0);
        }

        LDLM_DEBUG(lock,
                   "client-side enqueue returned a blocked lock, sleeping");
        obd = class_exp2obd(lock->l_conn_export);

        /* Go to sleep until the lock is granted. */
        rc = l_wait_event_abortable(lock->l_waitq,
                                    is_granted_or_cancelled(lock));
        if (rc < 0) {
                /* take lock off the deadlock detection hash list. */
                lock_res_and_lock(lock);
                ldlm_flock_blocking_unlink(lock);

                /* client side - set flag to prevent lock from being
                 * put on LRU list
                 */
                ldlm_set_cbpending(lock);
                unlock_res_and_lock(lock);

                LDLM_DEBUG(lock, "client-side enqueue waking up: failed (%d)",
                           rc);
                RETURN(rc);
        }

granted:
        CFS_FAIL_TIMEOUT(OBD_FAIL_LDLM_CP_CB_WAIT, 10);

        if (CFS_FAIL_PRECHECK(OBD_FAIL_LDLM_CP_CB_WAIT4)) {
                lock_res_and_lock(lock);
                /* DEADLOCK is always set with CBPENDING */
                lock->l_flags |= LDLM_FL_FLOCK_DEADLOCK | LDLM_FL_CBPENDING;
                unlock_res_and_lock(lock);
                CFS_FAIL_TIMEOUT(OBD_FAIL_LDLM_CP_CB_WAIT4, 4);
        }
        if (CFS_FAIL_PRECHECK(OBD_FAIL_LDLM_CP_CB_WAIT5)) {
                lock_res_and_lock(lock);
                /* DEADLOCK is always set with CBPENDING */
                lock->l_flags |= (LDLM_FL_FAIL_LOC |
                                  LDLM_FL_FLOCK_DEADLOCK | LDLM_FL_CBPENDING);
                unlock_res_and_lock(lock);
                CFS_FAIL_TIMEOUT(OBD_FAIL_LDLM_CP_CB_WAIT5, 4);
        }

        lock_res_and_lock(lock);

        /* Protect against race where lock could have been just destroyed
         * due to overlap in ldlm_process_flock_lock().
         */
        if (ldlm_is_destroyed(lock)) {
                unlock_res_and_lock(lock);
                LDLM_DEBUG(lock, "client-side enqueue waking up: destroyed");

                /* An error is still to be returned, to propagate it up to
                 * ldlm_cli_enqueue_fini() caller.
                 */
                RETURN(-EIO);
        }

        /* ldlm_lock_enqueue() has already placed lock on the granted list. */
        ldlm_resource_unlink_lock(lock);

        /* Import invalidation. We need to actually release the lock
         * references being held, so that it can go away. No point in
         * holding the lock even if app still believes it has it, since
         * server already dropped it anyway. Only for granted locks too.
         */
        /* Do the same for DEADLOCK'ed locks. */
        if (ldlm_is_failed(lock) || ldlm_is_flock_deadlock(lock)) {
                int mode;

                if (flags & LDLM_FL_TEST_LOCK)
                        LASSERT(ldlm_is_test_lock(lock));

                if (ldlm_is_test_lock(lock) || ldlm_is_flock_deadlock(lock))
                        mode = getlk->fl_type;
                else
                        mode = lock->l_req_mode;

                if (ldlm_is_flock_deadlock(lock)) {
                        LDLM_DEBUG(lock,
                                   "client-side enqueue deadlock received");
                        rc = -EDEADLK;
                }
                ldlm_flock_destroy(lock, mode, LDLM_FL_WAIT_NOREPROC);
                unlock_res_and_lock(lock);

                /* Need to wake up the waiter if we were evicted */
                wake_up(&lock->l_waitq);

                /* An error is still to be returned, to propagate it up to
                 * ldlm_cli_enqueue_fini() caller.
                 */
                RETURN(rc ? : -EIO);
        }

        LDLM_DEBUG(lock, "client-side enqueue granted");

        if (flags & LDLM_FL_TEST_LOCK) {
                /*
                 * fcntl(F_GETLK) request
                 * The old mode was saved in getlk->fl_type so that if the mode
                 * in the lock changes we can decref the appropriate refcount.
                 */
                LASSERT(ldlm_is_test_lock(lock));
                ldlm_flock_destroy(lock, getlk->fl_type, LDLM_FL_WAIT_NOREPROC);
                switch (lock->l_granted_mode) {
                case LCK_PR:
                        getlk->fl_type = F_RDLCK;
                        break;
                case LCK_PW:
                        getlk->fl_type = F_WRLCK;
                        break;
                default:
                        getlk->fl_type = F_UNLCK;
                }
                getlk->fl_pid = (pid_t)lock->l_policy_data.l_flock.pid;
                getlk->fl_start = (loff_t)lock->l_policy_data.l_flock.start;
                getlk->fl_end = (loff_t)lock->l_policy_data.l_flock.end;
        } else {
                __u64 noreproc = LDLM_FL_WAIT_NOREPROC;

                /* We need to reprocess the lock to do merges or splits
                 * with existing locks owned by this process.
                 */
                ldlm_process_flock_lock(lock, &noreproc, LDLM_PROCESS_ENQUEUE,
                                        &err, NULL);
        }
        unlock_res_and_lock(lock);
        RETURN(rc);
}
EXPORT_SYMBOL(ldlm_flock_completion_ast);

int ldlm_flock_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
                            void *data, int flag)
{
        ENTRY;

        LASSERT(lock);
        LASSERT(flag == LDLM_CB_CANCELING);

        /* take lock off the deadlock detection hash list. */
        lock_res_and_lock(lock);
        ldlm_flock_blocking_unlink(lock);
        unlock_res_and_lock(lock);
        RETURN(0);
}

void ldlm_flock_policy_wire_to_local(const union ldlm_wire_policy_data *wpolicy,
                                     union ldlm_policy_data *lpolicy)
{
        lpolicy->l_flock.start = wpolicy->l_flock.lfw_start;
        lpolicy->l_flock.end = wpolicy->l_flock.lfw_end;
        lpolicy->l_flock.pid = wpolicy->l_flock.lfw_pid;
        lpolicy->l_flock.owner = wpolicy->l_flock.lfw_owner;
}

void ldlm_flock_policy_local_to_wire(const union ldlm_policy_data *lpolicy,
                                     union ldlm_wire_policy_data *wpolicy)
{
        memset(wpolicy, 0, sizeof(*wpolicy));
        wpolicy->l_flock.lfw_start = lpolicy->l_flock.start;
        wpolicy->l_flock.lfw_end = lpolicy->l_flock.end;
        wpolicy->l_flock.lfw_pid = lpolicy->l_flock.pid;
        wpolicy->l_flock.lfw_owner = lpolicy->l_flock.owner;
}

/*
 * Export handle<->flock hash operations.
 */
static unsigned int
ldlm_export_flock_hash(struct cfs_hash *hs, const void *key,
                       const unsigned int bits)
{
        return cfs_hash_64(*(__u64 *)key, bits);
}

static void *
ldlm_export_flock_key(struct hlist_node *hnode)
{
        struct ldlm_lock *lock;

        lock = hlist_entry(hnode, struct ldlm_lock, l_exp_flock_hash);
        return &lock->l_policy_data.l_flock.owner;
}

static int
ldlm_export_flock_keycmp(const void *key, struct hlist_node *hnode)
{
        return !memcmp(ldlm_export_flock_key(hnode), key, sizeof(__u64));
}

static void *
ldlm_export_flock_object(struct hlist_node *hnode)
{
        return hlist_entry(hnode, struct ldlm_lock, l_exp_flock_hash);
}

static void
ldlm_export_flock_get(struct cfs_hash *hs, struct hlist_node *hnode)
{
        struct ldlm_lock *lock;
        struct ldlm_flock *flock;

        lock = hlist_entry(hnode, struct ldlm_lock, l_exp_flock_hash);
        LDLM_LOCK_GET(lock);

        flock = &lock->l_policy_data.l_flock;
        LASSERT(flock->blocking_export != NULL);
        class_export_get(flock->blocking_export);
        atomic_inc(&flock->blocking_refs);
}

static void
ldlm_export_flock_put(struct cfs_hash *hs, struct hlist_node *hnode)
{
        struct ldlm_lock *lock;
        struct ldlm_flock *flock;

        lock = hlist_entry(hnode, struct ldlm_lock, l_exp_flock_hash);

        flock = &lock->l_policy_data.l_flock;
        LASSERT(flock->blocking_export != NULL);
        class_export_put(flock->blocking_export);
        if (atomic_dec_and_test(&flock->blocking_refs)) {
                flock->blocking_owner = 0;
                flock->blocking_export = NULL;
        }
        LDLM_LOCK_RELEASE(lock);
}

static struct cfs_hash_ops ldlm_export_flock_ops = {
        .hs_hash        = ldlm_export_flock_hash,
        .hs_key         = ldlm_export_flock_key,
        .hs_keycmp      = ldlm_export_flock_keycmp,
        .hs_object      = ldlm_export_flock_object,
        .hs_get         = ldlm_export_flock_get,
        .hs_put         = ldlm_export_flock_put,
        .hs_put_locked  = ldlm_export_flock_put,
};
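
/*
 * Note: exp_flock_hash maps a flock owner id (__u64) to the lock of
 * that owner currently blocked on this export.  Entries are added by
 * ldlm_flock_blocking_link(), removed by ldlm_flock_blocking_unlink(),
 * and looked up by ldlm_flock_lookup_cb()/ldlm_flock_deadlock() when
 * following the blocking-owner chain during deadlock detection.
 */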

int ldlm_init_flock_export(struct obd_export *exp)
{
        if (strcmp(exp->exp_obd->obd_type->typ_name, LUSTRE_MDT_NAME) != 0)
                RETURN(0);

        exp->exp_flock_hash =
                cfs_hash_create(obd_uuid2str(&exp->exp_client_uuid),
                                HASH_EXP_LOCK_CUR_BITS,
                                HASH_EXP_LOCK_MAX_BITS,
                                HASH_EXP_LOCK_BKT_BITS, 0,
                                CFS_HASH_MIN_THETA, CFS_HASH_MAX_THETA,
                                &ldlm_export_flock_ops,
                                CFS_HASH_DEFAULT | CFS_HASH_NBLK_CHANGE);
        if (!exp->exp_flock_hash)
                RETURN(-ENOMEM);

        RETURN(0);
}

void ldlm_destroy_flock_export(struct obd_export *exp)
{
        ENTRY;
        if (exp->exp_flock_hash) {
                cfs_hash_putref(exp->exp_flock_hash);
                exp->exp_flock_hash = NULL;
        }
        EXIT;
}