lustre/ldlm/ldlm_flock.c (fs/lustre-release.git, commit e900f0004dc7f845e3d29ec7939cb864008b7ad4)
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2003 Hewlett-Packard Development Company LP.
28  * Developed under the sponsorship of the US Government under
29  * Subcontract No. B514193
30  *
31  * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
32  * Use is subject to license terms.
33  *
34  * Copyright (c) 2010, 2014, Intel Corporation.
35  */
36 /*
37  * This file is part of Lustre, http://www.lustre.org/
38  * Lustre is a trademark of Sun Microsystems, Inc.
39  */
40
41 /**
42  * This file implements the POSIX lock type for Lustre.
43  * Its policy properties are the start and end of the extent, and the PID.
44  *
45  * These locks are managed only through the MDS because POSIX semantics
46  * require, for example, that a lock may be released only partially (which
47  * splits it into two locks), and that two adjacent locks from the same
48  * process may be merged into a single wider lock.
49  *
50  * Lock modes are mapped as follows:
51  * PR and PW for READ and WRITE locks
52  * NL to request the release of a portion of a lock
53  *
54  * These flock locks never time out.
55  */
56
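As a rough userspace illustration of the mode mapping described in the comment above (a minimal sketch, not Lustre client code: the /tmp/flock_demo path, the enum and the helper posix_to_example_mode() are inventions of this example), the following standalone program takes a write lock on a byte range and then releases only the middle of it, which is exactly the partial-release case that forces a lock to be split:

#include <fcntl.h>
#include <stdio.h>
#include <unistd.h>

/* Illustrative mapping of fcntl() lock types onto the modes named in the
 * comment above: F_RDLCK -> PR, F_WRLCK -> PW, F_UNLCK -> NL.  The enum and
 * helper are local to this sketch; the real constants live in the LDLM
 * headers and the real mapping happens in the Lustre client. */
enum example_mode { EX_PR, EX_PW, EX_NL };

static enum example_mode posix_to_example_mode(short fl_type)
{
	switch (fl_type) {
	case F_RDLCK: return EX_PR;	/* read lock   -> PR */
	case F_WRLCK: return EX_PW;	/* write lock  -> PW */
	default:      return EX_NL;	/* F_UNLCK     -> NL (release a range) */
	}
}

int main(void)
{
	struct flock fl = {
		.l_type   = F_WRLCK,	/* write lock... */
		.l_whence = SEEK_SET,
		.l_start  = 0,		/* ...on bytes [0, 99] */
		.l_len    = 100,
	};
	int fd = open("/tmp/flock_demo", O_RDWR | O_CREAT, 0600);

	if (fd < 0 || fcntl(fd, F_SETLKW, &fl) < 0) {
		perror("lock");
		return 1;
	}
	printf("granted as mode %d\n", posix_to_example_mode(fl.l_type));

	/* Releasing only the middle of the range forces the original lock to
	 * be split in two, which is why these locks go through the MDS. */
	fl.l_type = F_UNLCK;
	fl.l_start = 40;
	fl.l_len = 20;
	if (fcntl(fd, F_SETLK, &fl) < 0)
		perror("unlock");

	close(fd);
	return 0;
}

On a Lustre mount both requests are forwarded to the MDS, where the code in this file merges or splits the granted ranges as described above.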
57 #define DEBUG_SUBSYSTEM S_LDLM
58
59 #include <lustre_dlm.h>
60 #include <obd_support.h>
61 #include <obd_class.h>
62 #include <lustre_lib.h>
63 #include <libcfs/list.h>
64
65 #include "ldlm_internal.h"
66
67 int ldlm_flock_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
68                             void *data, int flag);
69
70 /**
71  * list_for_remaining_safe - iterate over the remaining entries in a list,
72  *              safe against removal of the current entry.
73  * \param pos   the &struct list_head to use as a loop cursor. pos MUST
74  *              have been initialized prior to using it in this macro.
75  * \param n     another &struct list_head to use as temporary storage
76  * \param head  the head for your list.
77  */
78 #define list_for_remaining_safe(pos, n, head) \
79         for (n = pos->next; pos != (head); pos = n, n = pos->next)
80
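A minimal, self-contained sketch of how this macro behaves. The struct list_head, list_add_tail() and list_del() below are simplified local stand-ins for the kernel/libcfs versions (an assumption of this example): starting from an already-positioned cursor, the macro walks the rest of the list and tolerates deletion of the entry currently being visited.

#include <stdio.h>

/* Minimal doubly-linked list, standing in for the kernel list_head. */
struct list_head { struct list_head *next, *prev; };

#define list_for_remaining_safe(pos, n, head) \
        for (n = pos->next; pos != (head); pos = n, n = pos->next)

struct item { struct list_head link; int value; };	/* link is first member */

static void list_add_tail(struct list_head *entry, struct list_head *head)
{
	entry->prev = head->prev;
	entry->next = head;
	head->prev->next = entry;
	head->prev = entry;
}

static void list_del(struct list_head *entry)
{
	entry->prev->next = entry->next;
	entry->next->prev = entry->prev;
}

int main(void)
{
	struct list_head head = { &head, &head };
	struct list_head *pos, *n;
	struct item items[5];
	int i;

	for (i = 0; i < 5; i++) {
		items[i].value = i;
		list_add_tail(&items[i].link, &head);
	}

	/* Position the cursor on the second entry, much as the flock code
	 * does once it has found where this owner's locks start. */
	pos = head.next->next;

	list_for_remaining_safe(pos, n, &head) {
		struct item *it = (struct item *)pos;	/* valid: link first */

		printf("visiting %d\n", it->value);
		if (it->value == 3)
			list_del(pos);	/* safe: 'n' already points past us */
	}
	return 0;
}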
81 static inline int
82 ldlm_same_flock_owner(struct ldlm_lock *lock, struct ldlm_lock *new)
83 {
84         return((new->l_policy_data.l_flock.owner ==
85                 lock->l_policy_data.l_flock.owner) &&
86                (new->l_export == lock->l_export));
87 }
88
89 static inline int
90 ldlm_flocks_overlap(struct ldlm_lock *lock, struct ldlm_lock *new)
91 {
92         return((new->l_policy_data.l_flock.start <=
93                 lock->l_policy_data.l_flock.end) &&
94                (new->l_policy_data.l_flock.end >=
95                 lock->l_policy_data.l_flock.start));
96 }
97
98 static inline void ldlm_flock_blocking_link(struct ldlm_lock *req,
99                                             struct ldlm_lock *lock)
100 {
101         /* For server only */
102         if (req->l_export == NULL)
103                 return;
104
105         LASSERT(hlist_unhashed(&req->l_exp_flock_hash));
106
107         req->l_policy_data.l_flock.blocking_owner =
108                 lock->l_policy_data.l_flock.owner;
109         req->l_policy_data.l_flock.blocking_export =
110                 lock->l_export;
111         req->l_policy_data.l_flock.blocking_refs = 0;
112
113         cfs_hash_add(req->l_export->exp_flock_hash,
114                      &req->l_policy_data.l_flock.owner,
115                      &req->l_exp_flock_hash);
116 }
117
118 static inline void ldlm_flock_blocking_unlink(struct ldlm_lock *req)
119 {
120         /* For server only */
121         if (req->l_export == NULL)
122                 return;
123
124         check_res_locked(req->l_resource);
125         if (req->l_export->exp_flock_hash != NULL &&
126             !hlist_unhashed(&req->l_exp_flock_hash))
127                 cfs_hash_del(req->l_export->exp_flock_hash,
128                              &req->l_policy_data.l_flock.owner,
129                              &req->l_exp_flock_hash);
130 }
131
132 static inline void
133 ldlm_flock_destroy(struct ldlm_lock *lock, enum ldlm_mode mode, __u64 flags)
134 {
135         ENTRY;
136
137         LDLM_DEBUG(lock, "ldlm_flock_destroy(mode: %d, flags: %#llx)",
138                    mode, flags);
139
140         /* Safe to not lock here, since it should be empty anyway */
141         LASSERT(hlist_unhashed(&lock->l_exp_flock_hash));
142
143         list_del_init(&lock->l_res_link);
144         if (flags == LDLM_FL_WAIT_NOREPROC) {
145                 /* client side - set a flag to prevent sending a CANCEL */
146                 lock->l_flags |= LDLM_FL_LOCAL_ONLY | LDLM_FL_CBPENDING;
147
148                 /* when we reach here we are under lock_res_and_lock(), so we
149                  * need to call the nolock version of ldlm_lock_decref_internal */
150                 ldlm_lock_decref_internal_nolock(lock, mode);
151         }
152
153         ldlm_lock_destroy_nolock(lock);
154         EXIT;
155 }
156
157 /**
158  * POSIX locks deadlock detection code.
159  *
160  * Given a new lock \a req and an existing lock \a bl_lock that it conflicts
161  * with, we need to iterate through all blocked POSIX locks for this
162  * export and check whether a deadlock condition arises (i.e. one client
163  * holds a lock on one resource and wants a lock on another, while at the
164  * same time another client is in the opposite situation).
165  */
166
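The detection loop below essentially follows the chain of "which owner is blocked by which owner" links and reports a deadlock if the chain leads back to the requesting owner. The following is a minimal standalone sketch of that idea, with a plain array standing in for the per-export exp_flock_hash (the data layout and function name here are inventions of the example, not the real structures):

#include <stdio.h>

#define MAX_OWNERS 16
#define NO_OWNER   (-1)

/* blocked_by[o] is the owner that owner 'o' is currently waiting on, or
 * NO_OWNER if 'o' is not blocked.  In the real code this relation is kept
 * in the per-export flock hash, keyed by lock owner. */
static int blocked_by[MAX_OWNERS];

/* Return 1 if making 'req_owner' wait on 'bl_owner' would close a cycle in
 * the wait-for graph, 0 otherwise. */
static int would_deadlock(int req_owner, int bl_owner)
{
	while (bl_owner != NO_OWNER) {
		if (bl_owner == req_owner)
			return 1;		 /* chain loops back: deadlock */
		bl_owner = blocked_by[bl_owner]; /* follow the chain */
	}
	return 0;
}

int main(void)
{
	int i;

	for (i = 0; i < MAX_OWNERS; i++)
		blocked_by[i] = NO_OWNER;

	/* owner 2 waits on owner 3, owner 3 waits on owner 1 */
	blocked_by[2] = 3;
	blocked_by[3] = 1;

	/* owner 1 now wants a lock held by owner 2: 1 -> 2 -> 3 -> 1 */
	printf("deadlock: %d\n", would_deadlock(1, 2));	/* prints 1 */
	/* owner 5 wanting a lock held by owner 2 is fine: 5 -> 2 -> 3 -> 1 */
	printf("deadlock: %d\n", would_deadlock(5, 2));	/* prints 0 */
	return 0;
}

The real code additionally requires the two owners to be on the same client NID before declaring a deadlock, and takes export references while walking the chain.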
167 struct ldlm_flock_lookup_cb_data {
168         __u64 *bl_owner;
169         struct ldlm_lock *lock;
170         struct obd_export *exp;
171 };
172
173 static int ldlm_flock_lookup_cb(struct cfs_hash *hs, struct cfs_hash_bd *bd,
174                                 struct hlist_node *hnode, void *data)
175 {
176         struct ldlm_flock_lookup_cb_data *cb_data = data;
177         struct obd_export *exp = cfs_hash_object(hs, hnode);
178         struct ldlm_lock *lock;
179
180         lock = cfs_hash_lookup(exp->exp_flock_hash, cb_data->bl_owner);
181         if (lock == NULL)
182                 return 0;
183
184         /* Stop on first found lock. Same process can't sleep twice */
185         cb_data->lock = lock;
186         cb_data->exp = class_export_get(exp);
187
188         return 1;
189 }
190
191 static int
192 ldlm_flock_deadlock(struct ldlm_lock *req, struct ldlm_lock *bl_lock)
193 {
194         struct obd_export *req_exp = req->l_export;
195         struct obd_export *bl_exp = bl_lock->l_export;
196         __u64 req_owner = req->l_policy_data.l_flock.owner;
197         __u64 bl_owner = bl_lock->l_policy_data.l_flock.owner;
198
199         /* For server only */
200         if (req_exp == NULL)
201                 return 0;
202
203         class_export_get(bl_exp);
204         while (1) {
205                 struct ldlm_flock_lookup_cb_data cb_data = {
206                                         .bl_owner = &bl_owner,
207                                         .lock = NULL,
208                                         .exp = NULL };
209                 struct obd_export *bl_exp_new;
210                 struct ldlm_lock *lock = NULL;
211                 struct ldlm_flock *flock;
212
213                 if (bl_exp->exp_flock_hash != NULL) {
214                         cfs_hash_for_each_key(bl_exp->exp_obd->obd_nid_hash,
215                                 &bl_exp->exp_connection->c_peer.nid,
216                                 ldlm_flock_lookup_cb, &cb_data);
217                         lock = cb_data.lock;
218                 }
219                 if (lock == NULL)
220                         break;
221
222                 class_export_put(bl_exp);
223                 bl_exp = cb_data.exp;
224
225                 LASSERT(req != lock);
226                 flock = &lock->l_policy_data.l_flock;
227                 LASSERT(flock->owner == bl_owner);
228                 bl_owner = flock->blocking_owner;
229                 bl_exp_new = class_export_get(flock->blocking_export);
230                 class_export_put(bl_exp);
231
232                 cfs_hash_put(bl_exp->exp_flock_hash, &lock->l_exp_flock_hash);
233                 bl_exp = bl_exp_new;
234
235                 if (bl_exp->exp_failed)
236                         break;
237
238                 if (bl_owner == req_owner &&
239                     (bl_exp->exp_connection->c_peer.nid ==
240                      req_exp->exp_connection->c_peer.nid)) {
241                         class_export_put(bl_exp);
242                         return 1;
243                 }
244         }
245         class_export_put(bl_exp);
246
247         return 0;
248 }
249
250 static void ldlm_flock_cancel_on_deadlock(struct ldlm_lock *lock,
251                                           struct list_head *work_list)
252 {
253         CDEBUG(D_INFO, "reprocess deadlock req=%p\n", lock);
254
255         if ((exp_connect_flags(lock->l_export) &
256                                 OBD_CONNECT_FLOCK_DEAD) == 0) {
257                 CERROR("deadlock found, but client doesn't "
258                                 "support flock cancellation\n");
259         } else {
260                 LASSERT(lock->l_completion_ast);
261                 LASSERT(!ldlm_is_ast_sent(lock));
262                 lock->l_flags |= LDLM_FL_AST_SENT | LDLM_FL_CANCEL_ON_BLOCK |
263                         LDLM_FL_FLOCK_DEADLOCK;
264                 ldlm_flock_blocking_unlink(lock);
265                 ldlm_resource_unlink_lock(lock);
266                 ldlm_add_ast_work_item(lock, NULL, work_list);
267         }
268 }
269
270 /**
271  * Process a granting attempt for flock lock.
272  * Must be called with the ns lock held.
273  *
274  * This function looks for any conflicts for \a lock in the granted or
275  * waiting queues. The lock is granted if no conflicts are found in
276  * either queue.
277  *
278  * It is also responsible for splitting a lock if a portion of the lock
279  * is released.
280  *
281  * If \a first_enq is 0 (i.e. called from ldlm_reprocess_queue):
282  *   - blocking ASTs have already been sent
283  *
284  * If \a first_enq is 1 (i.e. called from ldlm_lock_enqueue):
285  *   - blocking ASTs have not been sent yet, so the list of conflicting
286  *     locks is collected and ASTs are sent.
287  */
288 int
289 ldlm_process_flock_lock(struct ldlm_lock *req, __u64 *flags, int first_enq,
290                         enum ldlm_error *err, struct list_head *work_list)
291 {
292         struct ldlm_resource *res = req->l_resource;
293         struct ldlm_namespace *ns = ldlm_res_to_ns(res);
294         struct list_head *tmp;
295         struct list_head *ownlocks = NULL;
296         struct ldlm_lock *lock = NULL;
297         struct ldlm_lock *new = req;
298         struct ldlm_lock *new2 = NULL;
299         enum ldlm_mode mode = req->l_req_mode;
300         int local = ns_is_client(ns);
301         int added = (mode == LCK_NL);
302         int overlaps = 0;
303         int splitted = 0;
304         const struct ldlm_callback_suite null_cbs = { NULL };
305         ENTRY;
306
307         CDEBUG(D_DLMTRACE, "flags %#llx owner %llu pid %u mode %u start "
308                "%llu end %llu\n", *flags,
309                new->l_policy_data.l_flock.owner,
310                new->l_policy_data.l_flock.pid, mode,
311                req->l_policy_data.l_flock.start,
312                req->l_policy_data.l_flock.end);
313
314         *err = ELDLM_OK;
315
316         if (local) {
317                 /* No blocking ASTs are sent to the clients for
318                  * POSIX file & record locks */
319                 req->l_blocking_ast = NULL;
320         } else {
321                 /* Called on the server for lock cancels. */
322                 req->l_blocking_ast = ldlm_flock_blocking_ast;
323         }
324
325 reprocess:
326         if ((*flags == LDLM_FL_WAIT_NOREPROC) || (mode == LCK_NL)) {
327                 /* This loop determines where this process's locks start
328                  * in the resource lr_granted list. */
329                 list_for_each(tmp, &res->lr_granted) {
330                         lock = list_entry(tmp, struct ldlm_lock,
331                                               l_res_link);
332                         if (ldlm_same_flock_owner(lock, req)) {
333                                 ownlocks = tmp;
334                                 break;
335                         }
336                 }
337         } else {
338                 int reprocess_failed = 0;
339                 lockmode_verify(mode);
340
341                 /* This loop determines if there are existing locks
342                  * that conflict with the new lock request. */
343                 list_for_each(tmp, &res->lr_granted) {
344                         lock = list_entry(tmp, struct ldlm_lock,
345                                               l_res_link);
346
347                         if (ldlm_same_flock_owner(lock, req)) {
348                                 if (!ownlocks)
349                                         ownlocks = tmp;
350                                 continue;
351                         }
352
353                         /* locks are compatible, overlap doesn't matter */
354                         if (lockmode_compat(lock->l_granted_mode, mode))
355                                 continue;
356
357                         if (!ldlm_flocks_overlap(lock, req))
358                                 continue;
359
360                         if (!first_enq) {
361                                 reprocess_failed = 1;
362                                 if (ldlm_flock_deadlock(req, lock)) {
363                                         ldlm_flock_cancel_on_deadlock(req,
364                                                         work_list);
365                                         RETURN(LDLM_ITER_CONTINUE);
366                                 }
367                                 continue;
368                         }
369
370                         if (*flags & LDLM_FL_BLOCK_NOWAIT) {
371                                 ldlm_flock_destroy(req, mode, *flags);
372                                 *err = -EAGAIN;
373                                 RETURN(LDLM_ITER_STOP);
374                         }
375
376                         if (*flags & LDLM_FL_TEST_LOCK) {
377                                 ldlm_flock_destroy(req, mode, *flags);
378                                 req->l_req_mode = lock->l_granted_mode;
379                                 req->l_policy_data.l_flock.pid =
380                                         lock->l_policy_data.l_flock.pid;
381                                 req->l_policy_data.l_flock.start =
382                                         lock->l_policy_data.l_flock.start;
383                                 req->l_policy_data.l_flock.end =
384                                         lock->l_policy_data.l_flock.end;
385                                 *flags |= LDLM_FL_LOCK_CHANGED;
386                                 RETURN(LDLM_ITER_STOP);
387                         }
388
389                         /* add lock to blocking list before deadlock
390                          * check to prevent race */
391                         ldlm_flock_blocking_link(req, lock);
392
393                         if (ldlm_flock_deadlock(req, lock)) {
394                                 ldlm_flock_blocking_unlink(req);
395                                 ldlm_flock_destroy(req, mode, *flags);
396                                 *err = -EDEADLK;
397                                 RETURN(LDLM_ITER_STOP);
398                         }
399
400                         ldlm_resource_add_lock(res, &res->lr_waiting, req);
401                         *flags |= LDLM_FL_BLOCK_GRANTED;
402                         RETURN(LDLM_ITER_STOP);
403                 }
404                 if (reprocess_failed)
405                         RETURN(LDLM_ITER_CONTINUE);
406         }
407
408         if (*flags & LDLM_FL_TEST_LOCK) {
409                 ldlm_flock_destroy(req, mode, *flags);
410                 req->l_req_mode = LCK_NL;
411                 *flags |= LDLM_FL_LOCK_CHANGED;
412                 RETURN(LDLM_ITER_STOP);
413         }
414
415         /* In case we had slept on this lock request, take it off the
416          * deadlock detection hash list. */
417         ldlm_flock_blocking_unlink(req);
418
419         /* Scan the locks owned by this process that overlap this request.
420          * We may have to merge or split existing locks. */
421
422         if (!ownlocks)
423                 ownlocks = &res->lr_granted;
424
425         list_for_remaining_safe(ownlocks, tmp, &res->lr_granted) {
426                 lock = list_entry(ownlocks, struct ldlm_lock, l_res_link);
427
428                 if (!ldlm_same_flock_owner(lock, new))
429                         break;
430
431                 if (lock->l_granted_mode == mode) {
432                         /* If the modes are the same then we need to process
433                          * locks that overlap OR adjoin the new lock. The extra
434                          * logic condition is necessary to deal with arithmetic
435                          * overflow and underflow. */
436                         if ((new->l_policy_data.l_flock.start >
437                              (lock->l_policy_data.l_flock.end + 1))
438                             && (lock->l_policy_data.l_flock.end !=
439                                 OBD_OBJECT_EOF))
440                                 continue;
441
442                         if ((new->l_policy_data.l_flock.end <
443                              (lock->l_policy_data.l_flock.start - 1))
444                             && (lock->l_policy_data.l_flock.start != 0))
445                                 break;
446
447                         if (new->l_policy_data.l_flock.start <
448                             lock->l_policy_data.l_flock.start) {
449                                 lock->l_policy_data.l_flock.start =
450                                         new->l_policy_data.l_flock.start;
451                         } else {
452                                 new->l_policy_data.l_flock.start =
453                                         lock->l_policy_data.l_flock.start;
454                         }
455
456                         if (new->l_policy_data.l_flock.end >
457                             lock->l_policy_data.l_flock.end) {
458                                 lock->l_policy_data.l_flock.end =
459                                         new->l_policy_data.l_flock.end;
460                         } else {
461                                 new->l_policy_data.l_flock.end =
462                                         lock->l_policy_data.l_flock.end;
463                         }
464
465                         if (added) {
466                                 ldlm_flock_destroy(lock, mode, *flags);
467                         } else {
468                                 new = lock;
469                                 added = 1;
470                         }
471                         continue;
472                 }
473
474                 if (new->l_policy_data.l_flock.start >
475                     lock->l_policy_data.l_flock.end)
476                         continue;
477
478                 if (new->l_policy_data.l_flock.end <
479                     lock->l_policy_data.l_flock.start)
480                         break;
481
482                 ++overlaps;
483
484                 if (new->l_policy_data.l_flock.start <=
485                     lock->l_policy_data.l_flock.start) {
486                         if (new->l_policy_data.l_flock.end <
487                             lock->l_policy_data.l_flock.end) {
488                                 lock->l_policy_data.l_flock.start =
489                                         new->l_policy_data.l_flock.end + 1;
490                                 break;
491                         }
492                         ldlm_flock_destroy(lock, lock->l_req_mode, *flags);
493                         continue;
494                 }
495                 if (new->l_policy_data.l_flock.end >=
496                     lock->l_policy_data.l_flock.end) {
497                         lock->l_policy_data.l_flock.end =
498                                 new->l_policy_data.l_flock.start - 1;
499                         continue;
500                 }
501
502                 /* split the existing lock into two locks */
503
504                 /* if this is an F_UNLCK operation then we could avoid
505                  * allocating a new lock and use the req lock passed in
506                  * with the request, but this would complicate reply
507                  * processing since updates to req get reflected in the
508                  * reply. The client side replays the lock request, so
509                  * it must see the original lock data in the reply. */
510
511                 /* XXX - if ldlm_lock_new() can sleep we should
512                  * release the lr_lock, allocate the new lock,
513                  * and restart processing this lock. */
514                 if (new2 == NULL) {
515                         unlock_res_and_lock(req);
516                         new2 = ldlm_lock_create(ns, &res->lr_name, LDLM_FLOCK,
517                                                 lock->l_granted_mode, &null_cbs,
518                                                 NULL, 0, LVB_T_NONE);
519                         lock_res_and_lock(req);
520                         if (IS_ERR(new2)) {
521                                 ldlm_flock_destroy(req, lock->l_granted_mode,
522                                                    *flags);
523                                 *err = PTR_ERR(new2);
524                                 RETURN(LDLM_ITER_STOP);
525                         }
526                         goto reprocess;
527                 }
528
529                 splitted = 1;
530
531                 new2->l_granted_mode = lock->l_granted_mode;
532                 new2->l_policy_data.l_flock.pid =
533                         new->l_policy_data.l_flock.pid;
534                 new2->l_policy_data.l_flock.owner =
535                         new->l_policy_data.l_flock.owner;
536                 new2->l_policy_data.l_flock.start =
537                         lock->l_policy_data.l_flock.start;
538                 new2->l_policy_data.l_flock.end =
539                         new->l_policy_data.l_flock.start - 1;
540                 lock->l_policy_data.l_flock.start =
541                         new->l_policy_data.l_flock.end + 1;
542                 new2->l_conn_export = lock->l_conn_export;
543                 if (lock->l_export != NULL) {
544                         new2->l_export = class_export_lock_get(lock->l_export, new2);
545                         if (new2->l_export->exp_lock_hash &&
546                             hlist_unhashed(&new2->l_exp_hash))
547                                 cfs_hash_add(new2->l_export->exp_lock_hash,
548                                              &new2->l_remote_handle,
549                                              &new2->l_exp_hash);
550                 }
551                 if (*flags == LDLM_FL_WAIT_NOREPROC)
552                         ldlm_lock_addref_internal_nolock(new2,
553                                                          lock->l_granted_mode);
554
555                 /* insert new2 at lock */
556                 ldlm_resource_add_lock(res, ownlocks, new2);
557                 LDLM_LOCK_RELEASE(new2);
558                 break;
559         }
560
561         /* if new2 was created but never used, destroy it */
562         if (splitted == 0 && new2 != NULL)
563                 ldlm_lock_destroy_nolock(new2);
564
565         /* At this point we're granting the lock request. */
566         req->l_granted_mode = req->l_req_mode;
567
568         /* Add req to the granted queue before calling ldlm_reprocess_all(). */
569         if (!added) {
570                 list_del_init(&req->l_res_link);
571                 /* insert new lock before ownlocks in list. */
572                 ldlm_resource_add_lock(res, ownlocks, req);
573         }
574
575         if (*flags != LDLM_FL_WAIT_NOREPROC) {
576 #ifdef HAVE_SERVER_SUPPORT
577                 if (first_enq) {
578                         /* If this is an unlock, reprocess the waitq and
579                          * send completions ASTs for locks that can now be
580                          * granted. The only problem with doing this
581                          * reprocessing here is that the completion ASTs for
582                          * newly granted locks will be sent before the unlock
583                          * completion is sent. It shouldn't be an issue. Also
584                          * note that ldlm_process_flock_lock() will recurse,
585                          * but only once because first_enq will be false from
586                          * ldlm_reprocess_queue. */
587                         if ((mode == LCK_NL) && overlaps) {
588                                 struct list_head rpc_list;
589                                 int rc;
590
591                                 INIT_LIST_HEAD(&rpc_list);
592 restart:
593                                 ldlm_reprocess_queue(res, &res->lr_waiting,
594                                                      &rpc_list);
595
596                                 unlock_res_and_lock(req);
597                                 rc = ldlm_run_ast_work(ns, &rpc_list,
598                                                        LDLM_WORK_CP_AST);
599                                 lock_res_and_lock(req);
600                                 if (rc == -ERESTART)
601                                         GOTO(restart, rc);
602                         }
603                 } else {
604                         LASSERT(req->l_completion_ast);
605                         ldlm_add_ast_work_item(req, NULL, work_list);
606                 }
607 #else /* !HAVE_SERVER_SUPPORT */
608                 /* The only possible case for a client-side call of the flock
609                  * policy function is ldlm_flock_completion_ast, which always
610                  * carries the LDLM_FL_WAIT_NOREPROC flag. */
611                 CERROR("Illegal parameter for client-side-only module.\n");
612                 LBUG();
613 #endif /* HAVE_SERVER_SUPPORT */
614         }
615
616         /* In case we're reprocessing the requested lock we can't destroy
617          * it until after calling ldlm_add_ast_work_item() above, so that it
618          * can bump the reference count on \a req. Otherwise \a req
619          * could be freed before the completion AST can be sent. */
620         if (added)
621                 ldlm_flock_destroy(req, mode, *flags);
622
623         ldlm_resource_dump(D_INFO, res);
624         RETURN(LDLM_ITER_CONTINUE);
625 }
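The merge and split arithmetic on l_flock.start and l_flock.end performed in the function above can be shown in isolation. This is a minimal sketch assuming a plain inclusive byte range: struct range and apply_conflicting_range() are invented for the example, and the OBD_OBJECT_EOF / 0 guards against arithmetic overflow and underflow used by the real code are omitted.

#include <stdio.h>

typedef unsigned long long u64;

struct range { u64 start, end; };	/* inclusive byte range, like l_flock */

/* Apply a new, conflicting range 'new' (e.g. an unlock, or a lock of a
 * different mode by the same owner) against an existing granted range 'old'.
 * Mirrors the trim/split cases in ldlm_process_flock_lock():
 *   return 0 - 'old' is unchanged (no overlap)
 *   return 1 - 'old' was trimmed at one end
 *   return 2 - 'old' is completely covered (would be destroyed)
 *   return 3 - 'old' was split; the right-hand piece is written to *extra
 */
static int apply_conflicting_range(struct range *old, const struct range *new,
				   struct range *extra)
{
	if (new->start > old->end || new->end < old->start)
		return 0;			/* no overlap */

	if (new->start <= old->start && new->end >= old->end)
		return 2;			/* fully covered */

	if (new->start <= old->start) {		/* overlaps the front */
		old->start = new->end + 1;
		return 1;
	}
	if (new->end >= old->end) {		/* overlaps the back */
		old->end = new->start - 1;
		return 1;
	}

	/* strictly inside: split into [old->start, new->start - 1] and
	 * [new->end + 1, old->end]; this is the case where the real code
	 * must allocate a second lock (new2). */
	extra->start = new->end + 1;
	extra->end = old->end;
	old->end = new->start - 1;
	return 3;
}

int main(void)
{
	struct range granted = { 0, 99 }, unlock = { 40, 59 }, right;

	if (apply_conflicting_range(&granted, &unlock, &right) == 3)
		printf("split into [%llu,%llu] and [%llu,%llu]\n",
		       granted.start, granted.end, right.start, right.end);
	return 0;
}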
626
627 struct ldlm_flock_wait_data {
628         struct ldlm_lock *fwd_lock;
629         int               fwd_generation;
630 };
631
632 static void
633 ldlm_flock_interrupted_wait(void *data)
634 {
635         struct ldlm_lock *lock;
636         ENTRY;
637
638         lock = ((struct ldlm_flock_wait_data *)data)->fwd_lock;
639
640         /* take lock off the deadlock detection hash list. */
641         lock_res_and_lock(lock);
642         ldlm_flock_blocking_unlink(lock);
643
644         /* client side - set flag to prevent lock from being put on LRU list */
645         ldlm_set_cbpending(lock);
646         unlock_res_and_lock(lock);
647
648         EXIT;
649 }
650
651 /**
652  * Flock completion callback function.
653  *
654  * \param lock [in,out]: A lock to be handled
655  * \param flags    [in]: flags
656  * \param *data    [in]: ldlm_work_cp_ast_lock() will use ldlm_cb_set_arg
657  *
658  * \retval 0    : success
659  * \retval <0   : failure
660  */
661 int
662 ldlm_flock_completion_ast(struct ldlm_lock *lock, __u64 flags, void *data)
663 {
664         struct file_lock *getlk = lock->l_ast_data;
665         struct obd_device *obd;
666         struct obd_import *imp = NULL;
667         struct ldlm_flock_wait_data fwd;
668         struct l_wait_info lwi;
669         enum ldlm_error err;
670         int rc = 0;
671         ENTRY;
672
673         OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_CP_CB_WAIT2, 4);
674         if (OBD_FAIL_PRECHECK(OBD_FAIL_LDLM_CP_CB_WAIT3)) {
675                 lock_res_and_lock(lock);
676                 lock->l_flags |= LDLM_FL_FAIL_LOC;
677                 unlock_res_and_lock(lock);
678                 OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_CP_CB_WAIT3, 4);
679         }
680         CDEBUG(D_DLMTRACE, "flags: %#llx data: %p getlk: %p\n",
681                flags, data, getlk);
682
683         LASSERT(flags != LDLM_FL_WAIT_NOREPROC);
684
685         if (flags & LDLM_FL_FAILED)
686                 goto granted;
687
688         if (!(flags & LDLM_FL_BLOCKED_MASK)) {
689                 if (NULL == data)
690                         /* mds granted the lock in the reply */
691                         goto granted;
692                 /* CP AST RPC: the lock got granted, wake it up */
693                 wake_up(&lock->l_waitq);
694                 RETURN(0);
695         }
696
697         LDLM_DEBUG(lock, "client-side enqueue returned a blocked lock, "
698                    "sleeping");
699         fwd.fwd_lock = lock;
700         obd = class_exp2obd(lock->l_conn_export);
701
702         /* if this is a local lock, there is no import */
703         if (NULL != obd)
704                 imp = obd->u.cli.cl_import;
705
706         if (NULL != imp) {
707                 spin_lock(&imp->imp_lock);
708                 fwd.fwd_generation = imp->imp_generation;
709                 spin_unlock(&imp->imp_lock);
710         }
711
712         lwi = LWI_TIMEOUT_INTR(0, NULL, ldlm_flock_interrupted_wait, &fwd);
713
714         /* Go to sleep until the lock is granted. */
715         rc = l_wait_event(lock->l_waitq, is_granted_or_cancelled(lock), &lwi);
716
717         if (rc) {
718                 LDLM_DEBUG(lock, "client-side enqueue waking up: failed (%d)",
719                            rc);
720                 RETURN(rc);
721         }
722
723 granted:
724         OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_CP_CB_WAIT, 10);
725
726         if (OBD_FAIL_PRECHECK(OBD_FAIL_LDLM_CP_CB_WAIT4)) {
727                 lock_res_and_lock(lock);
728                 /* DEADLOCK is always set with CBPENDING */
729                 lock->l_flags |= LDLM_FL_FLOCK_DEADLOCK | LDLM_FL_CBPENDING;
730                 unlock_res_and_lock(lock);
731                 OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_CP_CB_WAIT4, 4);
732         }
733         if (OBD_FAIL_PRECHECK(OBD_FAIL_LDLM_CP_CB_WAIT5)) {
734                 lock_res_and_lock(lock);
735                 /* DEADLOCK is always set with CBPENDING */
736                 lock->l_flags |= LDLM_FL_FAIL_LOC |
737                                  LDLM_FL_FLOCK_DEADLOCK | LDLM_FL_CBPENDING;
738                 unlock_res_and_lock(lock);
739                 OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_CP_CB_WAIT5, 4);
740         }
741
742         lock_res_and_lock(lock);
743
744
745         /* Protect against race where lock could have been just destroyed
746          * due to overlap in ldlm_process_flock_lock().
747          */
748         if (ldlm_is_destroyed(lock)) {
749                 unlock_res_and_lock(lock);
750                 LDLM_DEBUG(lock, "client-side enqueue waking up: destroyed");
751
752                 /* An error is still to be returned, to propagate it up to
753                  * ldlm_cli_enqueue_fini() caller. */
754                 RETURN(-EIO);
755         }
756
757         /* ldlm_lock_enqueue() has already placed lock on the granted list. */
758         ldlm_resource_unlink_lock(lock);
759
760         /* Import invalidation. We need to actually release the lock
761          * references being held, so that it can go away. No point in
762          * holding the lock even if app still believes it has it, since
763          * server already dropped it anyway. Only for granted locks too. */
764         /* Do the same for DEADLOCK'ed locks. */
765         if (ldlm_is_failed(lock) || ldlm_is_flock_deadlock(lock)) {
766                 int mode;
767
768                 if (flags & LDLM_FL_TEST_LOCK)
769                         LASSERT(ldlm_is_test_lock(lock));
770
771                 if (ldlm_is_test_lock(lock) || ldlm_is_flock_deadlock(lock))
772                         mode = flock_type(getlk);
773                 else
774                         mode = lock->l_granted_mode;
775
776                 if (ldlm_is_flock_deadlock(lock)) {
777                         LDLM_DEBUG(lock, "client-side enqueue deadlock "
778                                    "received");
779                         rc = -EDEADLK;
780                 }
781                 ldlm_flock_destroy(lock, mode, LDLM_FL_WAIT_NOREPROC);
782                 unlock_res_and_lock(lock);
783
784                 /* Need to wake up the waiter if we were evicted */
785                 wake_up(&lock->l_waitq);
786
787                 /* An error is still to be returned, to propagate it up to
788                  * ldlm_cli_enqueue_fini() caller. */
789                 RETURN(rc ? : -EIO);
790         }
791
792         LDLM_DEBUG(lock, "client-side enqueue granted");
793
794         if (flags & LDLM_FL_TEST_LOCK) {
795                 /* fcntl(F_GETLK) request */
796                 /* The old mode was saved in getlk->fl_type so that if the mode
797                  * in the lock changes we can decref the appropriate refcount. */
798                 LASSERT(ldlm_is_test_lock(lock));
799                 ldlm_flock_destroy(lock, flock_type(getlk),
800                                    LDLM_FL_WAIT_NOREPROC);
801                 switch (lock->l_granted_mode) {
802                 case LCK_PR:
803                         flock_set_type(getlk, F_RDLCK);
804                         break;
805                 case LCK_PW:
806                         flock_set_type(getlk, F_WRLCK);
807                         break;
808                 default:
809                         flock_set_type(getlk, F_UNLCK);
810                 }
811                 flock_set_pid(getlk, (pid_t)lock->l_policy_data.l_flock.pid);
812                 flock_set_start(getlk,
813                                 (loff_t)lock->l_policy_data.l_flock.start);
814                 flock_set_end(getlk,
815                               (loff_t)lock->l_policy_data.l_flock.end);
816         } else {
817                 __u64 noreproc = LDLM_FL_WAIT_NOREPROC;
818
819                 /* We need to reprocess the lock to do merges or splits
820                  * with existing locks owned by this process. */
821                 ldlm_process_flock_lock(lock, &noreproc, 1, &err, NULL);
822         }
823         unlock_res_and_lock(lock);
824         RETURN(rc);
825 }
826 EXPORT_SYMBOL(ldlm_flock_completion_ast);
827
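On the LDLM_FL_TEST_LOCK path above, the fields copied back into getlk (type, pid, start, end) are what a userspace F_GETLK caller receives. Below is a minimal sketch of that caller using plain POSIX fcntl(); the /tmp/flock_demo path is arbitrary and nothing Lustre-specific is assumed beyond the file living on a Lustre mount.

#include <fcntl.h>
#include <stdio.h>
#include <unistd.h>

int main(int argc, char **argv)
{
	const char *path = argc > 1 ? argv[1] : "/tmp/flock_demo";
	struct flock fl = {
		.l_type   = F_WRLCK,	/* "could I take a write lock..." */
		.l_whence = SEEK_SET,
		.l_start  = 0,		/* ...on bytes [0, 99]?" */
		.l_len    = 100,
	};
	int fd = open(path, O_RDWR | O_CREAT, 0600);

	if (fd < 0 || fcntl(fd, F_GETLK, &fl) < 0) {
		perror("F_GETLK");
		return 1;
	}

	if (fl.l_type == F_UNLCK) {
		/* No conflict: the test lock came back as "unlocked". */
		printf("range is free\n");
	} else {
		/* Conflict: type/pid/start/len describe the blocking lock,
		 * filled in from the conflicting lock's policy data. */
		printf("blocked by pid %d, type %s, start %lld, len %lld\n",
		       (int)fl.l_pid,
		       fl.l_type == F_RDLCK ? "read" : "write",
		       (long long)fl.l_start, (long long)fl.l_len);
	}
	close(fd);
	return 0;
}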
828 int ldlm_flock_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
829                             void *data, int flag)
830 {
831         ENTRY;
832
833         LASSERT(lock);
834         LASSERT(flag == LDLM_CB_CANCELING);
835
836         /* take lock off the deadlock detection hash list. */
837         lock_res_and_lock(lock);
838         ldlm_flock_blocking_unlink(lock);
839         unlock_res_and_lock(lock);
840         RETURN(0);
841 }
842
843 void ldlm_flock_policy_wire_to_local(const union ldlm_wire_policy_data *wpolicy,
844                                      union ldlm_policy_data *lpolicy)
845 {
846         memset(lpolicy, 0, sizeof(*lpolicy));
847         lpolicy->l_flock.start = wpolicy->l_flock.lfw_start;
848         lpolicy->l_flock.end = wpolicy->l_flock.lfw_end;
849         lpolicy->l_flock.pid = wpolicy->l_flock.lfw_pid;
850         lpolicy->l_flock.owner = wpolicy->l_flock.lfw_owner;
851 }
852
853 void ldlm_flock_policy_local_to_wire(const union ldlm_policy_data *lpolicy,
854                                      union ldlm_wire_policy_data *wpolicy)
855 {
856         memset(wpolicy, 0, sizeof(*wpolicy));
857         wpolicy->l_flock.lfw_start = lpolicy->l_flock.start;
858         wpolicy->l_flock.lfw_end = lpolicy->l_flock.end;
859         wpolicy->l_flock.lfw_pid = lpolicy->l_flock.pid;
860         wpolicy->l_flock.lfw_owner = lpolicy->l_flock.owner;
861 }
862
863 /*
864  * Export handle<->flock hash operations.
865  */
866 static unsigned
867 ldlm_export_flock_hash(struct cfs_hash *hs, const void *key, unsigned mask)
868 {
869         return cfs_hash_u64_hash(*(__u64 *)key, mask);
870 }
871
872 static void *
873 ldlm_export_flock_key(struct hlist_node *hnode)
874 {
875         struct ldlm_lock *lock;
876
877         lock = hlist_entry(hnode, struct ldlm_lock, l_exp_flock_hash);
878         return &lock->l_policy_data.l_flock.owner;
879 }
880
881 static int
882 ldlm_export_flock_keycmp(const void *key, struct hlist_node *hnode)
883 {
884         return !memcmp(ldlm_export_flock_key(hnode), key, sizeof(__u64));
885 }
886
887 static void *
888 ldlm_export_flock_object(struct hlist_node *hnode)
889 {
890         return hlist_entry(hnode, struct ldlm_lock, l_exp_flock_hash);
891 }
892
893 static void
894 ldlm_export_flock_get(struct cfs_hash *hs, struct hlist_node *hnode)
895 {
896         struct ldlm_lock *lock;
897         struct ldlm_flock *flock;
898
899         lock = hlist_entry(hnode, struct ldlm_lock, l_exp_flock_hash);
900         LDLM_LOCK_GET(lock);
901
902         flock = &lock->l_policy_data.l_flock;
903         LASSERT(flock->blocking_export != NULL);
904         class_export_get(flock->blocking_export);
905         flock->blocking_refs++;
906 }
907
908 static void
909 ldlm_export_flock_put(struct cfs_hash *hs, struct hlist_node *hnode)
910 {
911         struct ldlm_lock *lock;
912         struct ldlm_flock *flock;
913
914         lock = hlist_entry(hnode, struct ldlm_lock, l_exp_flock_hash);
915         LDLM_LOCK_RELEASE(lock);
916
917         flock = &lock->l_policy_data.l_flock;
918         LASSERT(flock->blocking_export != NULL);
919         class_export_put(flock->blocking_export);
920         if (--flock->blocking_refs == 0) {
921                 flock->blocking_owner = 0;
922                 flock->blocking_export = NULL;
923         }
924 }
925
926 static struct cfs_hash_ops ldlm_export_flock_ops = {
927         .hs_hash        = ldlm_export_flock_hash,
928         .hs_key         = ldlm_export_flock_key,
929         .hs_keycmp      = ldlm_export_flock_keycmp,
930         .hs_object      = ldlm_export_flock_object,
931         .hs_get         = ldlm_export_flock_get,
932         .hs_put         = ldlm_export_flock_put,
933         .hs_put_locked  = ldlm_export_flock_put,
934 };
935
936 int ldlm_init_flock_export(struct obd_export *exp)
937 {
938         if (strcmp(exp->exp_obd->obd_type->typ_name, LUSTRE_MDT_NAME) != 0)
939                 RETURN(0);
940
941         exp->exp_flock_hash =
942                 cfs_hash_create(obd_uuid2str(&exp->exp_client_uuid),
943                                 HASH_EXP_LOCK_CUR_BITS,
944                                 HASH_EXP_LOCK_MAX_BITS,
945                                 HASH_EXP_LOCK_BKT_BITS, 0,
946                                 CFS_HASH_MIN_THETA, CFS_HASH_MAX_THETA,
947                                 &ldlm_export_flock_ops,
948                                 CFS_HASH_DEFAULT | CFS_HASH_NBLK_CHANGE);
949         if (!exp->exp_flock_hash)
950                 RETURN(-ENOMEM);
951
952         RETURN(0);
953 }
954
955 void ldlm_destroy_flock_export(struct obd_export *exp)
956 {
957         ENTRY;
958         if (exp->exp_flock_hash) {
959                 cfs_hash_putref(exp->exp_flock_hash);
960                 exp->exp_flock_hash = NULL;
961         }
962         EXIT;
963 }