Whamcloud - gitweb
LU-3467 ofd: use unified handler for OST requests
[fs/lustre-release.git] / lustre / ldlm / ldlm_flock.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2003 Hewlett-Packard Development Company LP.
28  * Developed under the sponsorship of the US Government under
29  * Subcontract No. B514193
30  *
31  * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
32  * Use is subject to license terms.
33  *
34  * Copyright (c) 2010, 2012, Intel Corporation.
35  */
36 /*
37  * This file is part of Lustre, http://www.lustre.org/
38  * Lustre is a trademark of Sun Microsystems, Inc.
39  */
40
41 /**
42  * This file implements POSIX lock type for Lustre.
43  * Its policy properties are start and end of extent and PID.
44  *
45  * These locks are only done through MDS due to POSIX semantics requiring
46  * e.g. that locks could be only partially released and as such split into
47  * two parts, and also that two adjacent locks from the same process may be
48  * merged into a single wider lock.
49  *
50  * Lock modes are mapped like this:
51  * PR and PW for READ and WRITE locks
52  * NL to request a releasing of a portion of the lock
53  *
54  * These flock locks never timeout.
55  */
56
57 #define DEBUG_SUBSYSTEM S_LDLM
58
59 #ifdef __KERNEL__
60 #include <lustre_dlm.h>
61 #include <obd_support.h>
62 #include <obd_class.h>
63 #include <lustre_lib.h>
64 #include <libcfs/list.h>
65 #else
66 #include <liblustre.h>
67 #include <obd_class.h>
68 #endif
69
70 #include "ldlm_internal.h"
71
/*
 * Forward declaration: ldlm_process_flock_lock() below installs this
 * function as the blocking AST of flock requests when the namespace is not
 * a client one.
 */
int ldlm_flock_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
                            void *data, int flag);
74
/**
 * list_for_remaining_safe - iterate over the remaining entries in a list
 *              and safeguard against removal of a list entry.
 * \param pos   the &struct list_head to use as a loop counter. pos MUST
 *              have been initialized prior to using it in this macro.
 * \param n     another &struct list_head to use as temporary storage
 * \param head  the head for your list.
 *
 * Every argument is parenthesized in the expansion so that expressions
 * such as "*pp" can be passed safely. \a pos and \a n are evaluated
 * several times and assigned to, so they must be side-effect-free lvalues.
 */
#define list_for_remaining_safe(pos, n, head)                           \
        for ((n) = (pos)->next; (pos) != (head);                        \
             (pos) = (n), (n) = (pos)->next)
85
86 static inline int
87 ldlm_same_flock_owner(struct ldlm_lock *lock, struct ldlm_lock *new)
88 {
89         return((new->l_policy_data.l_flock.owner ==
90                 lock->l_policy_data.l_flock.owner) &&
91                (new->l_export == lock->l_export));
92 }
93
94 static inline int
95 ldlm_flocks_overlap(struct ldlm_lock *lock, struct ldlm_lock *new)
96 {
97         return((new->l_policy_data.l_flock.start <=
98                 lock->l_policy_data.l_flock.end) &&
99                (new->l_policy_data.l_flock.end >=
100                 lock->l_policy_data.l_flock.start));
101 }
102
103 static inline void ldlm_flock_blocking_link(struct ldlm_lock *req,
104                                             struct ldlm_lock *lock)
105 {
106         /* For server only */
107         if (req->l_export == NULL)
108                 return;
109
110         LASSERT(cfs_hlist_unhashed(&req->l_exp_flock_hash));
111
112         req->l_policy_data.l_flock.blocking_owner =
113                 lock->l_policy_data.l_flock.owner;
114         req->l_policy_data.l_flock.blocking_export =
115                 lock->l_export;
116         req->l_policy_data.l_flock.blocking_refs = 0;
117
118         cfs_hash_add(req->l_export->exp_flock_hash,
119                      &req->l_policy_data.l_flock.owner,
120                      &req->l_exp_flock_hash);
121 }
122
123 static inline void ldlm_flock_blocking_unlink(struct ldlm_lock *req)
124 {
125         /* For server only */
126         if (req->l_export == NULL)
127                 return;
128
129         check_res_locked(req->l_resource);
130         if (req->l_export->exp_flock_hash != NULL &&
131             !cfs_hlist_unhashed(&req->l_exp_flock_hash))
132                 cfs_hash_del(req->l_export->exp_flock_hash,
133                              &req->l_policy_data.l_flock.owner,
134                              &req->l_exp_flock_hash);
135 }
136
137 static inline void
138 ldlm_flock_destroy(struct ldlm_lock *lock, ldlm_mode_t mode, __u64 flags)
139 {
140         ENTRY;
141
142         LDLM_DEBUG(lock, "ldlm_flock_destroy(mode: %d, flags: 0x%llx)",
143                    mode, flags);
144
145         /* Safe to not lock here, since it should be empty anyway */
146         LASSERT(cfs_hlist_unhashed(&lock->l_exp_flock_hash));
147
148         cfs_list_del_init(&lock->l_res_link);
149         if (flags == LDLM_FL_WAIT_NOREPROC && !ldlm_is_failed(lock)) {
150                 /* client side - set a flag to prevent sending a CANCEL */
151                 lock->l_flags |= LDLM_FL_LOCAL_ONLY | LDLM_FL_CBPENDING;
152
153                 /* when reaching here, it is under lock_res_and_lock(). Thus,
154                    need call the nolock version of ldlm_lock_decref_internal*/
155                 ldlm_lock_decref_internal_nolock(lock, mode);
156         }
157
158         ldlm_lock_destroy_nolock(lock);
159         EXIT;
160 }
161
162 /**
163  * POSIX locks deadlock detection code.
164  *
165  * Given a new lock \a req and an existing lock \a bl_lock it conflicts
166  * with, we need to iterate through all blocked POSIX locks for this
 * export and see if there is a deadlock condition arising (i.e. when
 * one client holds a lock on something and wants a lock on something
 * else, and at the same time another client is in the opposite situation).
170  */
171
/* Argument block passed to ldlm_flock_lookup_cb() through the hash walk. */
struct ldlm_flock_lookup_cb_data {
        /* in: key (flock owner) to look up in each export's flock hash */
        __u64 *bl_owner;
        /* out: lock found under that key, or NULL if none was found */
        struct ldlm_lock *lock;
        /* out: export where the lock was found; the callback takes a
         * reference on it with class_export_get() */
        struct obd_export *exp;
};
177
178 static int ldlm_flock_lookup_cb(cfs_hash_t *hs, cfs_hash_bd_t *bd,
179                                 cfs_hlist_node_t *hnode, void *data)
180 {
181         struct ldlm_flock_lookup_cb_data *cb_data = data;
182         struct obd_export *exp = cfs_hash_object(hs, hnode);
183         struct ldlm_lock *lock;
184
185         lock = cfs_hash_lookup(exp->exp_flock_hash, cb_data->bl_owner);
186         if (lock == NULL)
187                 return 0;
188
189         /* Stop on first found lock. Same process can't sleep twice */
190         cb_data->lock = lock;
191         cb_data->exp = class_export_get(exp);
192
193         return 1;
194 }
195
196 static int
197 ldlm_flock_deadlock(struct ldlm_lock *req, struct ldlm_lock *bl_lock)
198 {
199         struct obd_export *req_exp = req->l_export;
200         struct obd_export *bl_exp = bl_lock->l_export;
201         __u64 req_owner = req->l_policy_data.l_flock.owner;
202         __u64 bl_owner = bl_lock->l_policy_data.l_flock.owner;
203
204         /* For server only */
205         if (req_exp == NULL)
206                 return 0;
207
208         class_export_get(bl_exp);
209         while (1) {
210                 struct ldlm_flock_lookup_cb_data cb_data = {
211                                         .bl_owner = &bl_owner,
212                                         .lock = NULL,
213                                         .exp = NULL };
214                 struct obd_export *bl_exp_new;
215                 struct ldlm_lock *lock = NULL;
216                 struct ldlm_flock *flock;
217
218                 if (bl_exp->exp_flock_hash != NULL) {
219                         cfs_hash_for_each_key(bl_exp->exp_obd->obd_nid_hash,
220                                 &bl_exp->exp_connection->c_peer.nid,
221                                 ldlm_flock_lookup_cb, &cb_data);
222                         lock = cb_data.lock;
223                 }
224                 if (lock == NULL)
225                         break;
226
227                 class_export_put(bl_exp);
228                 bl_exp = cb_data.exp;
229
230                 LASSERT(req != lock);
231                 flock = &lock->l_policy_data.l_flock;
232                 LASSERT(flock->owner == bl_owner);
233                 bl_owner = flock->blocking_owner;
234                 bl_exp_new = class_export_get(flock->blocking_export);
235                 class_export_put(bl_exp);
236
237                 cfs_hash_put(bl_exp->exp_flock_hash, &lock->l_exp_flock_hash);
238                 bl_exp = bl_exp_new;
239
240                 if (bl_exp->exp_failed)
241                         break;
242
243                 if (bl_owner == req_owner &&
244                     (bl_exp->exp_connection->c_peer.nid ==
245                      req_exp->exp_connection->c_peer.nid)) {
246                         class_export_put(bl_exp);
247                         return 1;
248                 }
249         }
250         class_export_put(bl_exp);
251
252         return 0;
253 }
254
255 static void ldlm_flock_cancel_on_deadlock(struct ldlm_lock *lock,
256                                                 cfs_list_t *work_list)
257 {
258         CDEBUG(D_INFO, "reprocess deadlock req=%p\n", lock);
259
260         if ((exp_connect_flags(lock->l_export) &
261                                 OBD_CONNECT_FLOCK_DEAD) == 0) {
262                 CERROR("deadlock found, but client doesn't "
263                                 "support flock canceliation\n");
264         } else {
265                 LASSERT(lock->l_completion_ast);
266                 LASSERT(!ldlm_is_ast_sent(lock));
267                 lock->l_flags |= LDLM_FL_AST_SENT | LDLM_FL_CANCEL_ON_BLOCK |
268                         LDLM_FL_FLOCK_DEADLOCK;
269                 ldlm_flock_blocking_unlink(lock);
270                 ldlm_resource_unlink_lock(lock);
271                 ldlm_add_ast_work_item(lock, NULL, work_list);
272         }
273 }
274
/**
 * Process a granting attempt for flock lock.
 * Must be called under ns lock held.
 *
 * This function looks for any conflicts for \a lock in the granted or
 * waiting queues. The lock is granted if no conflicts are found in
 * either queue.
 *
 * It is also responsible for splitting a lock if a portion of the lock
 * is released.
 *
 * If \a first_enq is 0 (ie, called from ldlm_reprocess_queue):
 *   - blocking ASTs have already been sent
 *
 * If \a first_enq is 1 (ie, called from ldlm_lock_enqueue):
 *   - blocking ASTs have not been sent yet, so list of conflicting locks
 *     would be collected and ASTs sent.
 *
 * \param req       [in,out]: the flock request being processed
 * \param flags     [in,out]: enqueue flags; LDLM_FL_BLOCK_GRANTED or
 *                            LDLM_FL_LOCK_CHANGED may be added here
 * \param first_enq [in]:     see above
 * \param err       [out]:    ELDLM_OK, or -EAGAIN, -EDEADLK, -ENOLCK
 * \param work_list [in,out]: list that AST work items are added to
 *
 * \retval LDLM_ITER_CONTINUE or LDLM_ITER_STOP, see the individual
 *         RETURN() sites below
 */
int
ldlm_process_flock_lock(struct ldlm_lock *req, __u64 *flags, int first_enq,
                        ldlm_error_t *err, cfs_list_t *work_list)
{
        struct ldlm_resource *res = req->l_resource;
        struct ldlm_namespace *ns = ldlm_res_to_ns(res);
        cfs_list_t *tmp;
        /* first entry of lr_granted owned by the requester, if any */
        cfs_list_t *ownlocks = NULL;
        struct ldlm_lock *lock = NULL;
        /* lock that currently represents the request; becomes an existing
         * lock once the request has been merged into it */
        struct ldlm_lock *new = req;
        /* spare lock used when an existing lock must be split in two */
        struct ldlm_lock *new2 = NULL;
        ldlm_mode_t mode = req->l_req_mode;
        int local = ns_is_client(ns);
        /* non-zero when \a req itself must not be put on the granted list:
         * always for LCK_NL requests, and once \a req has been merged into
         * an existing lock */
        int added = (mode == LCK_NL);
        int overlaps = 0;
        int splitted = 0;
        const struct ldlm_callback_suite null_cbs = { NULL };
        ENTRY;

        CDEBUG(D_DLMTRACE, "flags %#llx owner "LPU64" pid %u mode %u start "
               LPU64" end "LPU64"\n", *flags,
               new->l_policy_data.l_flock.owner,
               new->l_policy_data.l_flock.pid, mode,
               req->l_policy_data.l_flock.start,
               req->l_policy_data.l_flock.end);

        *err = ELDLM_OK;

        if (local) {
                /* No blocking ASTs are sent to the clients for
                 * Posix file & record locks */
                req->l_blocking_ast = NULL;
        } else {
                /* Called on the server for lock cancels. */
                req->l_blocking_ast = ldlm_flock_blocking_ast;
        }

        /* Re-entered from the split code below once new2 is allocated. */
reprocess:
        if ((*flags == LDLM_FL_WAIT_NOREPROC) || (mode == LCK_NL)) {
                /* This loop determines where this processes locks start
                 * in the resource lr_granted list. */
                cfs_list_for_each(tmp, &res->lr_granted) {
                        lock = cfs_list_entry(tmp, struct ldlm_lock,
                                              l_res_link);
                        if (ldlm_same_flock_owner(lock, req)) {
                                ownlocks = tmp;
                                break;
                        }
                }
        } else {
                int reprocess_failed = 0;
                lockmode_verify(mode);

                /* This loop determines if there are existing locks
                 * that conflict with the new lock request. */
                cfs_list_for_each(tmp, &res->lr_granted) {
                        lock = cfs_list_entry(tmp, struct ldlm_lock,
                                              l_res_link);

                        /* own locks never conflict; remember the first */
                        if (ldlm_same_flock_owner(lock, req)) {
                                if (!ownlocks)
                                        ownlocks = tmp;
                                continue;
                        }

                        /* locks are compatible, overlap doesn't matter */
                        if (lockmode_compat(lock->l_granted_mode, mode))
                                continue;

                        if (!ldlm_flocks_overlap(lock, req))
                                continue;

                        /* From here on \a lock conflicts with \a req. */

                        if (!first_enq) {
                                /* Reprocessing: the request stays blocked,
                                 * unless a deadlock is found, in which case
                                 * it is handed over for cancellation. */
                                reprocess_failed = 1;
                                if (ldlm_flock_deadlock(req, lock)) {
                                        ldlm_flock_cancel_on_deadlock(req,
                                                        work_list);
                                        RETURN(LDLM_ITER_CONTINUE);
                                }
                                continue;
                        }

                        /* Non-blocking request: give up with -EAGAIN. */
                        if (*flags & LDLM_FL_BLOCK_NOWAIT) {
                                ldlm_flock_destroy(req, mode, *flags);
                                *err = -EAGAIN;
                                RETURN(LDLM_ITER_STOP);
                        }

                        /* Test request: report the conflicting lock's mode,
                         * pid and extent back through \a req. */
                        if (*flags & LDLM_FL_TEST_LOCK) {
                                ldlm_flock_destroy(req, mode, *flags);
                                req->l_req_mode = lock->l_granted_mode;
                                req->l_policy_data.l_flock.pid =
                                        lock->l_policy_data.l_flock.pid;
                                req->l_policy_data.l_flock.start =
                                        lock->l_policy_data.l_flock.start;
                                req->l_policy_data.l_flock.end =
                                        lock->l_policy_data.l_flock.end;
                                *flags |= LDLM_FL_LOCK_CHANGED;
                                RETURN(LDLM_ITER_STOP);
                        }

                        /* add lock to blocking list before deadlock
                         * check to prevent race */
                        ldlm_flock_blocking_link(req, lock);

                        if (ldlm_flock_deadlock(req, lock)) {
                                ldlm_flock_blocking_unlink(req);
                                ldlm_flock_destroy(req, mode, *flags);
                                *err = -EDEADLK;
                                RETURN(LDLM_ITER_STOP);
                        }

                        /* Blocking request: queue it on the waiting list. */
                        ldlm_resource_add_lock(res, &res->lr_waiting, req);
                        *flags |= LDLM_FL_BLOCK_GRANTED;
                        RETURN(LDLM_ITER_STOP);
                }
                if (reprocess_failed)
                        RETURN(LDLM_ITER_CONTINUE);
        }

        /* No conflict for a test request: answer with LCK_NL. */
        if (*flags & LDLM_FL_TEST_LOCK) {
                ldlm_flock_destroy(req, mode, *flags);
                req->l_req_mode = LCK_NL;
                *flags |= LDLM_FL_LOCK_CHANGED;
                RETURN(LDLM_ITER_STOP);
        }

        /* In case we had slept on this lock request take it off of the
         * deadlock detection hash list. */
        ldlm_flock_blocking_unlink(req);

        /* Scan the locks owned by this process that overlap this request.
         * We may have to merge or split existing locks. */

        if (!ownlocks)
                ownlocks = &res->lr_granted;

        list_for_remaining_safe(ownlocks, tmp, &res->lr_granted) {
                lock = cfs_list_entry(ownlocks, struct ldlm_lock, l_res_link);

                /* stop at the first lock of another owner */
                if (!ldlm_same_flock_owner(lock, new))
                        break;

                if (lock->l_granted_mode == mode) {
                        /* If the modes are the same then we need to process
                         * locks that overlap OR adjoin the new lock. The extra
                         * logic condition is necessary to deal with arithmetic
                         * overflow and underflow. */
                        if ((new->l_policy_data.l_flock.start >
                             (lock->l_policy_data.l_flock.end + 1))
                            && (lock->l_policy_data.l_flock.end !=
                                OBD_OBJECT_EOF))
                                continue;

                        if ((new->l_policy_data.l_flock.end <
                             (lock->l_policy_data.l_flock.start - 1))
                            && (lock->l_policy_data.l_flock.start != 0))
                                break;

                        /* Merge: give both locks the union of the two
                         * extents. */
                        if (new->l_policy_data.l_flock.start <
                            lock->l_policy_data.l_flock.start) {
                                lock->l_policy_data.l_flock.start =
                                        new->l_policy_data.l_flock.start;
                        } else {
                                new->l_policy_data.l_flock.start =
                                        lock->l_policy_data.l_flock.start;
                        }

                        if (new->l_policy_data.l_flock.end >
                            lock->l_policy_data.l_flock.end) {
                                lock->l_policy_data.l_flock.end =
                                        new->l_policy_data.l_flock.end;
                        } else {
                                new->l_policy_data.l_flock.end =
                                        lock->l_policy_data.l_flock.end;
                        }

                        /* Keep a single lock for the merged extent: the
                         * first matching lock takes over the role of "new",
                         * later ones are destroyed. */
                        if (added) {
                                ldlm_flock_destroy(lock, mode, *flags);
                        } else {
                                new = lock;
                                added = 1;
                        }
                        continue;
                }

                /* Different mode: only real overlaps matter. */
                if (new->l_policy_data.l_flock.start >
                    lock->l_policy_data.l_flock.end)
                        continue;

                if (new->l_policy_data.l_flock.end <
                    lock->l_policy_data.l_flock.start)
                        break;

                ++overlaps;

                if (new->l_policy_data.l_flock.start <=
                    lock->l_policy_data.l_flock.start) {
                        if (new->l_policy_data.l_flock.end <
                            lock->l_policy_data.l_flock.end) {
                                /* new covers the head of lock: trim it */
                                lock->l_policy_data.l_flock.start =
                                        new->l_policy_data.l_flock.end + 1;
                                break;
                        }
                        /* new covers lock entirely */
                        ldlm_flock_destroy(lock, lock->l_req_mode, *flags);
                        continue;
                }
                if (new->l_policy_data.l_flock.end >=
                    lock->l_policy_data.l_flock.end) {
                        /* new covers the tail of lock: trim it */
                        lock->l_policy_data.l_flock.end =
                                new->l_policy_data.l_flock.start - 1;
                        continue;
                }

                /* split the existing lock into two locks */

                /* if this is an F_UNLCK operation then we could avoid
                 * allocating a new lock and use the req lock passed in
                 * with the request but this would complicate the reply
                 * processing since updates to req get reflected in the
                 * reply. The client side replays the lock request so
                 * it must see the original lock data in the reply. */

                /* XXX - if ldlm_lock_new() can sleep we should
                 * release the lr_lock, allocate the new lock,
                 * and restart processing this lock. */
                if (!new2) {
                        /* The resource lock is dropped around the
                         * allocation, so the scan is restarted from the
                         * reprocess label afterwards. */
                        unlock_res_and_lock(req);
                        new2 = ldlm_lock_create(ns, &res->lr_name, LDLM_FLOCK,
                                                lock->l_granted_mode, &null_cbs,
                                                NULL, 0, LVB_T_NONE);
                        lock_res_and_lock(req);
                        if (!new2) {
                                ldlm_flock_destroy(req, lock->l_granted_mode,
                                                   *flags);
                                *err = -ENOLCK;
                                RETURN(LDLM_ITER_STOP);
                        }
                        goto reprocess;
                }

                splitted = 1;

                /* new2 takes the part of lock before new, lock keeps the
                 * part after new. */
                new2->l_granted_mode = lock->l_granted_mode;
                new2->l_policy_data.l_flock.pid =
                        new->l_policy_data.l_flock.pid;
                new2->l_policy_data.l_flock.owner =
                        new->l_policy_data.l_flock.owner;
                new2->l_policy_data.l_flock.start =
                        lock->l_policy_data.l_flock.start;
                new2->l_policy_data.l_flock.end =
                        new->l_policy_data.l_flock.start - 1;
                lock->l_policy_data.l_flock.start =
                        new->l_policy_data.l_flock.end + 1;
                new2->l_conn_export = lock->l_conn_export;
                if (lock->l_export != NULL) {
                        new2->l_export = class_export_lock_get(lock->l_export, new2);
                        if (new2->l_export->exp_lock_hash &&
                            cfs_hlist_unhashed(&new2->l_exp_hash))
                                cfs_hash_add(new2->l_export->exp_lock_hash,
                                             &new2->l_remote_handle,
                                             &new2->l_exp_hash);
                }
                if (*flags == LDLM_FL_WAIT_NOREPROC)
                        ldlm_lock_addref_internal_nolock(new2,
                                                         lock->l_granted_mode);

                /* insert new2 at lock */
                ldlm_resource_add_lock(res, ownlocks, new2);
                LDLM_LOCK_RELEASE(new2);
                break;
        }

        /* if new2 is created but never used, destroy it*/
        if (splitted == 0 && new2 != NULL)
                ldlm_lock_destroy_nolock(new2);

        /* At this point we're granting the lock request. */
        req->l_granted_mode = req->l_req_mode;

        /* Add req to the granted queue before calling ldlm_reprocess_all(). */
        if (!added) {
                cfs_list_del_init(&req->l_res_link);
                /* insert new lock before ownlocks in list. */
                ldlm_resource_add_lock(res, ownlocks, req);
        }

        if (*flags != LDLM_FL_WAIT_NOREPROC) {
#ifdef HAVE_SERVER_SUPPORT
                if (first_enq) {
                        /* If this is an unlock, reprocess the waitq and
                         * send completions ASTs for locks that can now be
                         * granted. The only problem with doing this
                         * reprocessing here is that the completion ASTs for
                         * newly granted locks will be sent before the unlock
                         * completion is sent. It shouldn't be an issue. Also
                         * note that ldlm_process_flock_lock() will recurse,
                         * but only once because first_enq will be false from
                         * ldlm_reprocess_queue. */
                        if ((mode == LCK_NL) && overlaps) {
                                CFS_LIST_HEAD(rpc_list);
                                int rc;
restart:
                                ldlm_reprocess_queue(res, &res->lr_waiting,
                                                     &rpc_list);

                                /* the ASTs are run without the resource
                                 * lock held */
                                unlock_res_and_lock(req);
                                rc = ldlm_run_ast_work(ns, &rpc_list,
                                                       LDLM_WORK_CP_AST);
                                lock_res_and_lock(req);
                                if (rc == -ERESTART)
                                        GOTO(restart, -ERESTART);
                       }
                } else {
                        LASSERT(req->l_completion_ast);
                        ldlm_add_ast_work_item(req, NULL, work_list);
                }
#else /* !HAVE_SERVER_SUPPORT */
                /* The only one possible case for client-side calls flock
                 * policy function is ldlm_flock_completion_ast inside which
                 * carries LDLM_FL_WAIT_NOREPROC flag. */
                CERROR("Illegal parameter for client-side-only module.\n");
                LBUG();
#endif /* HAVE_SERVER_SUPPORT */
        }

        /* In case we're reprocessing the requested lock we can't destroy
         * it until after calling ldlm_add_ast_work_item() above so that laawi()
         * can bump the reference count on \a req. Otherwise \a req
         * could be freed before the completion AST can be sent.  */
        if (added)
                ldlm_flock_destroy(req, mode, *flags);

        ldlm_resource_dump(D_INFO, res);
        RETURN(LDLM_ITER_CONTINUE);
}
629
/* Context handed to ldlm_flock_interrupted_wait() while a client sleeps in
 * ldlm_flock_completion_ast() waiting for a flock lock. */
struct ldlm_flock_wait_data {
        /* the lock being waited for */
        struct ldlm_lock *fwd_lock;
        /* imp_generation of the import sampled before going to sleep; only
         * set when the lock has an import */
        int               fwd_generation;
};
634
635 static void
636 ldlm_flock_interrupted_wait(void *data)
637 {
638         struct ldlm_lock *lock;
639         ENTRY;
640
641         lock = ((struct ldlm_flock_wait_data *)data)->fwd_lock;
642
643         /* take lock off the deadlock detection hash list. */
644         lock_res_and_lock(lock);
645         ldlm_flock_blocking_unlink(lock);
646
647         /* client side - set flag to prevent lock from being put on LRU list */
648         ldlm_set_cbpending(lock);
649         unlock_res_and_lock(lock);
650
651         EXIT;
652 }
653
654 /**
655  * Flock completion callback function.
656  *
657  * \param lock [in,out]: A lock to be handled
658  * \param flags    [in]: flags
659  * \param *data    [in]: ldlm_work_cp_ast_lock() will use ldlm_cb_set_arg
660  *
661  * \retval 0    : success
662  * \retval <0   : failure
663  */
664 int
665 ldlm_flock_completion_ast(struct ldlm_lock *lock, __u64 flags, void *data)
666 {
667         struct file_lock                *getlk = lock->l_ast_data;
668         struct obd_device              *obd;
669         struct obd_import              *imp = NULL;
670         struct ldlm_flock_wait_data     fwd;
671         struct l_wait_info              lwi;
672         ldlm_error_t                    err;
673         int                             rc = 0;
674         ENTRY;
675
676         CDEBUG(D_DLMTRACE, "flags: 0x%llx data: %p getlk: %p\n",
677                flags, data, getlk);
678
679         /* Import invalidation. We need to actually release the lock
680          * references being held, so that it can go away. No point in
681          * holding the lock even if app still believes it has it, since
682          * server already dropped it anyway. Only for granted locks too. */
683         if ((lock->l_flags & (LDLM_FL_FAILED|LDLM_FL_LOCAL_ONLY)) ==
684             (LDLM_FL_FAILED|LDLM_FL_LOCAL_ONLY)) {
685                 if (lock->l_req_mode == lock->l_granted_mode &&
686                     lock->l_granted_mode != LCK_NL &&
687                     NULL == data)
688                         ldlm_lock_decref_internal(lock, lock->l_req_mode);
689
690                 /* Need to wake up the waiter if we were evicted */
691                 wake_up(&lock->l_waitq);
692                 RETURN(0);
693         }
694
695         LASSERT(flags != LDLM_FL_WAIT_NOREPROC);
696
697         if (!(flags & (LDLM_FL_BLOCK_WAIT | LDLM_FL_BLOCK_GRANTED |
698                        LDLM_FL_BLOCK_CONV))) {
699                 if (NULL == data)
700                         /* mds granted the lock in the reply */
701                         goto granted;
702                 /* CP AST RPC: lock get granted, wake it up */
703                 wake_up(&lock->l_waitq);
704                 RETURN(0);
705         }
706
707         LDLM_DEBUG(lock, "client-side enqueue returned a blocked lock, "
708                    "sleeping");
709         fwd.fwd_lock = lock;
710         obd = class_exp2obd(lock->l_conn_export);
711
712         /* if this is a local lock, there is no import */
713         if (NULL != obd)
714                 imp = obd->u.cli.cl_import;
715
716         if (NULL != imp) {
717                 spin_lock(&imp->imp_lock);
718                 fwd.fwd_generation = imp->imp_generation;
719                 spin_unlock(&imp->imp_lock);
720         }
721
722         lwi = LWI_TIMEOUT_INTR(0, NULL, ldlm_flock_interrupted_wait, &fwd);
723
724         /* Go to sleep until the lock is granted. */
725         rc = l_wait_event(lock->l_waitq, is_granted_or_cancelled(lock), &lwi);
726
727         if (rc) {
728                 LDLM_DEBUG(lock, "client-side enqueue waking up: failed (%d)",
729                            rc);
730                 RETURN(rc);
731         }
732
733 granted:
734         OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_CP_CB_WAIT, 10);
735
736         if (ldlm_is_failed(lock)) {
737                 LDLM_DEBUG(lock, "client-side enqueue waking up: failed");
738                 RETURN(-EIO);
739         }
740
741         LDLM_DEBUG(lock, "client-side enqueue granted");
742
743         lock_res_and_lock(lock);
744
745
746         /* Protect against race where lock could have been just destroyed
747          * due to overlap in ldlm_process_flock_lock().
748          */
749         if (ldlm_is_destroyed(lock)) {
750                 unlock_res_and_lock(lock);
751                 LDLM_DEBUG(lock, "client-side enqueue waking up: destroyed");
752                 RETURN(0);
753         }
754
755         /* take lock off the deadlock detection hash list. */
756         ldlm_flock_blocking_unlink(lock);
757
758         /* ldlm_lock_enqueue() has already placed lock on the granted list. */
759         cfs_list_del_init(&lock->l_res_link);
760
761         if (ldlm_is_flock_deadlock(lock)) {
762                 LDLM_DEBUG(lock, "client-side enqueue deadlock received");
763                 rc = -EDEADLK;
764         } else if (flags & LDLM_FL_TEST_LOCK) {
765                 /* fcntl(F_GETLK) request */
766                 /* The old mode was saved in getlk->fl_type so that if the mode
767                  * in the lock changes we can decref the appropriate refcount.*/
768                 ldlm_flock_destroy(lock, flock_type(getlk),
769                                    LDLM_FL_WAIT_NOREPROC);
770                 switch (lock->l_granted_mode) {
771                 case LCK_PR:
772                         flock_set_type(getlk, F_RDLCK);
773                         break;
774                 case LCK_PW:
775                         flock_set_type(getlk, F_WRLCK);
776                         break;
777                 default:
778                         flock_set_type(getlk, F_UNLCK);
779                 }
780                 flock_set_pid(getlk, (pid_t)lock->l_policy_data.l_flock.pid);
781                 flock_set_start(getlk,
782                                 (loff_t)lock->l_policy_data.l_flock.start);
783                 flock_set_end(getlk,
784                               (loff_t)lock->l_policy_data.l_flock.end);
785         } else {
786                 __u64 noreproc = LDLM_FL_WAIT_NOREPROC;
787
788                 /* We need to reprocess the lock to do merges or splits
789                  * with existing locks owned by this process. */
790                 ldlm_process_flock_lock(lock, &noreproc, 1, &err, NULL);
791         }
792         unlock_res_and_lock(lock);
793         RETURN(rc);
794 }
795 EXPORT_SYMBOL(ldlm_flock_completion_ast);
796
797 int ldlm_flock_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
798                             void *data, int flag)
799 {
800         ENTRY;
801
802         LASSERT(lock);
803         LASSERT(flag == LDLM_CB_CANCELING);
804
805         /* take lock off the deadlock detection hash list. */
806         lock_res_and_lock(lock);
807         ldlm_flock_blocking_unlink(lock);
808         unlock_res_and_lock(lock);
809         RETURN(0);
810 }
811
812 void ldlm_flock_policy_wire18_to_local(const ldlm_wire_policy_data_t *wpolicy,
813                                        ldlm_policy_data_t *lpolicy)
814 {
815         memset(lpolicy, 0, sizeof(*lpolicy));
816         lpolicy->l_flock.start = wpolicy->l_flock.lfw_start;
817         lpolicy->l_flock.end = wpolicy->l_flock.lfw_end;
818         lpolicy->l_flock.pid = wpolicy->l_flock.lfw_pid;
819         /* Compat code, old clients had no idea about owner field and
820          * relied solely on pid for ownership. Introduced in LU-104, 2.1,
821          * April 2011 */
822         lpolicy->l_flock.owner = wpolicy->l_flock.lfw_pid;
823 }
824
825
826 void ldlm_flock_policy_wire21_to_local(const ldlm_wire_policy_data_t *wpolicy,
827                                        ldlm_policy_data_t *lpolicy)
828 {
829         memset(lpolicy, 0, sizeof(*lpolicy));
830         lpolicy->l_flock.start = wpolicy->l_flock.lfw_start;
831         lpolicy->l_flock.end = wpolicy->l_flock.lfw_end;
832         lpolicy->l_flock.pid = wpolicy->l_flock.lfw_pid;
833         lpolicy->l_flock.owner = wpolicy->l_flock.lfw_owner;
834 }
835
836 void ldlm_flock_policy_local_to_wire(const ldlm_policy_data_t *lpolicy,
837                                      ldlm_wire_policy_data_t *wpolicy)
838 {
839         memset(wpolicy, 0, sizeof(*wpolicy));
840         wpolicy->l_flock.lfw_start = lpolicy->l_flock.start;
841         wpolicy->l_flock.lfw_end = lpolicy->l_flock.end;
842         wpolicy->l_flock.lfw_pid = lpolicy->l_flock.pid;
843         wpolicy->l_flock.lfw_owner = lpolicy->l_flock.owner;
844 }
845
846 /*
847  * Export handle<->flock hash operations.
848  */
849 static unsigned
850 ldlm_export_flock_hash(cfs_hash_t *hs, const void *key, unsigned mask)
851 {
852         return cfs_hash_u64_hash(*(__u64 *)key, mask);
853 }
854
855 static void *
856 ldlm_export_flock_key(cfs_hlist_node_t *hnode)
857 {
858         struct ldlm_lock *lock;
859
860         lock = cfs_hlist_entry(hnode, struct ldlm_lock, l_exp_flock_hash);
861         return &lock->l_policy_data.l_flock.owner;
862 }
863
864 static int
865 ldlm_export_flock_keycmp(const void *key, cfs_hlist_node_t *hnode)
866 {
867         return !memcmp(ldlm_export_flock_key(hnode), key, sizeof(__u64));
868 }
869
870 static void *
871 ldlm_export_flock_object(cfs_hlist_node_t *hnode)
872 {
873         return cfs_hlist_entry(hnode, struct ldlm_lock, l_exp_flock_hash);
874 }
875
876 static void
877 ldlm_export_flock_get(cfs_hash_t *hs, cfs_hlist_node_t *hnode)
878 {
879         struct ldlm_lock *lock;
880         struct ldlm_flock *flock;
881
882         lock = cfs_hlist_entry(hnode, struct ldlm_lock, l_exp_flock_hash);
883         LDLM_LOCK_GET(lock);
884
885         flock = &lock->l_policy_data.l_flock;
886         LASSERT(flock->blocking_export != NULL);
887         class_export_get(flock->blocking_export);
888         flock->blocking_refs++;
889 }
890
891 static void
892 ldlm_export_flock_put(cfs_hash_t *hs, cfs_hlist_node_t *hnode)
893 {
894         struct ldlm_lock *lock;
895         struct ldlm_flock *flock;
896
897         lock = cfs_hlist_entry(hnode, struct ldlm_lock, l_exp_flock_hash);
898         LDLM_LOCK_RELEASE(lock);
899
900         flock = &lock->l_policy_data.l_flock;
901         LASSERT(flock->blocking_export != NULL);
902         class_export_put(flock->blocking_export);
903         if (--flock->blocking_refs == 0) {
904                 flock->blocking_owner = 0;
905                 flock->blocking_export = NULL;
906         }
907 }
908
909 static cfs_hash_ops_t ldlm_export_flock_ops = {
910         .hs_hash        = ldlm_export_flock_hash,
911         .hs_key         = ldlm_export_flock_key,
912         .hs_keycmp      = ldlm_export_flock_keycmp,
913         .hs_object      = ldlm_export_flock_object,
914         .hs_get         = ldlm_export_flock_get,
915         .hs_put         = ldlm_export_flock_put,
916         .hs_put_locked  = ldlm_export_flock_put,
917 };
918
919 int ldlm_init_flock_export(struct obd_export *exp)
920 {
921         if( strcmp(exp->exp_obd->obd_type->typ_name, LUSTRE_MDT_NAME) != 0)
922                 RETURN(0);
923
924         exp->exp_flock_hash =
925                 cfs_hash_create(obd_uuid2str(&exp->exp_client_uuid),
926                                 HASH_EXP_LOCK_CUR_BITS,
927                                 HASH_EXP_LOCK_MAX_BITS,
928                                 HASH_EXP_LOCK_BKT_BITS, 0,
929                                 CFS_HASH_MIN_THETA, CFS_HASH_MAX_THETA,
930                                 &ldlm_export_flock_ops,
931                                 CFS_HASH_DEFAULT | CFS_HASH_NBLK_CHANGE);
932         if (!exp->exp_flock_hash)
933                 RETURN(-ENOMEM);
934
935         RETURN(0);
936 }
937 EXPORT_SYMBOL(ldlm_init_flock_export);
938
939 void ldlm_destroy_flock_export(struct obd_export *exp)
940 {
941         ENTRY;
942         if (exp->exp_flock_hash) {
943                 cfs_hash_putref(exp->exp_flock_hash);
944                 exp->exp_flock_hash = NULL;
945         }
946         EXIT;
947 }
948 EXPORT_SYMBOL(ldlm_destroy_flock_export);