Whamcloud - gitweb
LU-6179 llite: Implement ladvise lockahead
[fs/lustre-release.git] / lustre / ofd / ofd_dlm.c
index 4d86785..c18ade0 100644 (file)
 #include "ofd_internal.h"
 
 struct ofd_intent_args {
-       struct ldlm_lock        **victim;
+       struct list_head        gl_list;
        __u64                    size;
-       int                     *liblustre;
+       bool                    no_glimpse_ast;
+       int                     error;
 };
 
+int ofd_dlm_init(void)
+{
+       ldlm_glimpse_work_kmem = kmem_cache_create("ldlm_glimpse_work_kmem",
+                                            sizeof(struct ldlm_glimpse_work),
+                                            0, 0, NULL);
+       if (ldlm_glimpse_work_kmem == NULL)
+               return -ENOMEM;
+       else
+               return 0;
+}
+
+void ofd_dlm_exit(void)
+{
+       if (ldlm_glimpse_work_kmem) {
+               kmem_cache_destroy(ldlm_glimpse_work_kmem);
+               ldlm_glimpse_work_kmem = NULL;
+       }
+}
+
 /**
  * OFD interval callback.
  *
  * The interval_callback_t is part of interval_iterate_reverse() and is called
  * for each interval in tree. The OFD interval callback searches for locks
- * covering extents beyond the given args->size. This is used to decide if LVB
- * data is outdated.
+ * covering extents beyond the given args->size. This is used to decide if the
+ * size is too small and needs to be updated.  Note that we are only interested
+ * in growing the size, as truncate is the only operation which can shrink it,
+ * and it is handled differently.  This is why we only look at locks beyond the
+ * current size.
+ *
+ * It finds the highest lock (by starting point) in this interval, and adds it
+ * to the list of locks to glimpse.  We must glimpse a list of locks - rather
+ * than only the highest lock on the file - because lockahead creates extent
+ * locks in advance of IO, and so breaks the assumption that the holder of the
+ * highest lock knows the current file size.
+ *
+ * This assumption is normally true because locks which are created as part of
+ * IO - rather than in advance of it - are guaranteed to be 'active', i.e.,
+ * involved in IO, and the holder of the highest 'active' lock always knows the
+ * current file size, because the size is either not changing or the holder of
+ * that lock is responsible for updating it.
+ *
+ * So we need only glimpse until we find the first client with an 'active'
+ * lock.
+ *
+ * Unfortunately, there is no way to know if a manually requested/speculative
+ * lock is 'active' from the server side.  So when we see a potentially
+ * speculative lock, we must send a glimpse for that lock unless we have
+ * already sent a glimpse to the holder of that lock.
+ *
+ * However, *all* non-speculative locks are active.  So we can stop glimpsing
+ * as soon as we find a non-speculative lock.  Currently, all speculative PW
+ * locks have LDLM_FL_NO_EXPANSION set, and we use this to identify them.  This
+ * is enforced by an assertion in osc_lock_init, which references this comment.
+ *
+ * If that ever changes, we will either need to find a new way to identify
+ * active locks or we will need to consider all PW locks (we will still only
+ * glimpse one per client).
+ *
+ * Note that it is safe to glimpse only the 'top' lock from each interval
+ * because ofd_intent_cb is only called for PW extent locks, and for PW locks,
+ * there is only one lock per interval.
  *
  * \param[in] n                interval node
- * \param[in] args     intent arguments
+ * \param[in,out] args intent arguments, gl work list for identified locks
  *
  * \retval             INTERVAL_ITER_STOP if the interval is lower than
  *                     file size, caller stops execution
@@ -71,39 +127,89 @@ static enum interval_iter ofd_intent_cb(struct interval_node *n, void *args)
        struct ldlm_interval     *node = (struct ldlm_interval *)n;
        struct ofd_intent_args   *arg = args;
        __u64                     size = arg->size;
-       struct ldlm_lock        **v = arg->victim;
+       struct ldlm_lock         *victim_lock = NULL;
        struct ldlm_lock         *lck;
+       struct ldlm_glimpse_work *gl_work = NULL;
+       int rc = 0;
 
        /* If the interval is lower than the current file size, just break. */
        if (interval_high(n) <= size)
-               return INTERVAL_ITER_STOP;
+               GOTO(out, rc = INTERVAL_ITER_STOP);
 
+       /* Find the 'victim' lock from this interval */
        list_for_each_entry(lck, &node->li_group, l_sl_policy) {
-               /* Don't send glimpse ASTs to liblustre clients.
-                * They aren't listening for them, and they do
-                * entirely synchronous I/O anyways. */
-               if (lck->l_export == NULL || lck->l_export->exp_libclient)
-                       continue;
-
-               if (*arg->liblustre)
-                       *arg->liblustre = 0;
 
-               if (*v == NULL) {
-                       *v = LDLM_LOCK_GET(lck);
-               } else if ((*v)->l_policy_data.l_extent.start <
-                          lck->l_policy_data.l_extent.start) {
-                       LDLM_LOCK_RELEASE(*v);
-                       *v = LDLM_LOCK_GET(lck);
-               }
+               victim_lock = LDLM_LOCK_GET(lck);
 
                /* the same policy group - every lock has the
                 * same extent, so needn't do it any more */
                break;
        }
 
-       return INTERVAL_ITER_CONT;
-}
+       /* l_export can be null in race with eviction - In that case, we will
+        * not find any locks in this interval */
+       if (!victim_lock)
+               GOTO(out, rc = INTERVAL_ITER_CONT);
+
+       /*
+        * This check is for lock taken in ofd_destroy_by_fid() that does
+        * not have l_glimpse_ast set. So the logic is: if there is a lock
+        * with no l_glimpse_ast set, this object is being destroyed already.
+        * Hence, if you are grabbing DLM locks on the server, always set
+        * non-NULL glimpse_ast (e.g., ldlm_request.c::ldlm_glimpse_ast()).
+        */
+       if (victim_lock->l_glimpse_ast == NULL) {
+               LDLM_DEBUG(victim_lock, "no l_glimpse_ast");
+               arg->no_glimpse_ast = true;
+               GOTO(out_release, rc = INTERVAL_ITER_STOP);
+       }
 
+       /* If NO_EXPANSION is not set, this is an active lock, and we don't need
+        * to glimpse any further once we've glimpsed the client holding this
+        * lock.  So set us up to stop.  See comment above this function. */
+       if (!(victim_lock->l_flags & LDLM_FL_NO_EXPANSION))
+               rc = INTERVAL_ITER_STOP;
+       else
+               rc = INTERVAL_ITER_CONT;
+
+       /* Check to see if we're already set up to send a glimpse to this
+        * client; if so, don't add this lock to the glimpse list - We need
+        * only glimpse each client once. (And if we know that client holds
+        * an active lock, we can stop glimpsing.  So keep the rc set in the
+        * check above.) */
+       list_for_each_entry(gl_work, &arg->gl_list, gl_list) {
+               if (gl_work->gl_lock->l_export == victim_lock->l_export)
+                       GOTO(out_release, rc);
+       }
+
+       if (!OBD_FAIL_CHECK(OBD_FAIL_OST_GL_WORK_ALLOC))
+               OBD_SLAB_ALLOC_PTR_GFP(gl_work, ldlm_glimpse_work_kmem,
+                                      GFP_ATOMIC);
+
+       if (!gl_work) {
+               arg->error = -ENOMEM;
+               GOTO(out_release, rc = INTERVAL_ITER_STOP);
+       }
+
+       /* Populate the gl_work structure. */
+       gl_work->gl_lock = victim_lock;
+       list_add_tail(&gl_work->gl_list, &arg->gl_list);
+       /* There is actually no need for a glimpse descriptor when glimpsing
+        * extent locks */
+       gl_work->gl_desc = NULL;
+       /* This tells ldlm_work_gl_ast_lock this was allocated from a slab and
+        * must be freed in a slab-aware manner. */
+       gl_work->gl_flags = LDLM_GL_WORK_SLAB_ALLOCATED;
+
+       GOTO(out, rc);
+
+out_release:
+       /* If the victim doesn't go on the glimpse list, we must release it */
+       LDLM_LOCK_RELEASE(victim_lock);
+
+out:
+       return rc;
+}
 /**
  * OFD lock intent policy
  *
@@ -124,20 +230,20 @@ static enum interval_iter ofd_intent_cb(struct interval_node *n, void *args)
  * \retval             ELDLM_LOCK_REPLACED if already granted lock was found
  *                     and placed in \a lockp
  * \retval             ELDLM_LOCK_ABORTED in other cases except error
- * \retval             negative value on error
+ * \retval             negative errno on error
  */
 int ofd_intent_policy(struct ldlm_namespace *ns, struct ldlm_lock **lockp,
                      void *req_cookie, enum ldlm_mode mode, __u64 flags,
                      void *data)
 {
        struct ptlrpc_request *req = req_cookie;
-       struct ldlm_lock *lock = *lockp, *l = NULL;
+       struct ldlm_lock *lock = *lockp;
        struct ldlm_resource *res = lock->l_resource;
        ldlm_processing_policy policy;
        struct ost_lvb *res_lvb, *reply_lvb;
        struct ldlm_reply *rep;
        enum ldlm_error err;
-       int idx, rc, only_liblustre = 1;
+       int idx, rc;
        struct ldlm_interval_tree *tree;
        struct ofd_intent_args arg;
        __u32 repsize[3] = {
@@ -145,11 +251,12 @@ int ofd_intent_policy(struct ldlm_namespace *ns, struct ldlm_lock **lockp,
                [DLM_LOCKREPLY_OFF]   = sizeof(*rep),
                [DLM_REPLY_REC_OFF]   = sizeof(*reply_lvb)
        };
-       struct ldlm_glimpse_work gl_work = {};
-       struct list_head gl_list;
+       struct ldlm_glimpse_work *pos, *tmp;
        ENTRY;
 
-       INIT_LIST_HEAD(&gl_list);
+       INIT_LIST_HEAD(&arg.gl_list);
+       arg.no_glimpse_ast = false;
+       arg.error = 0;
        lock->l_lvb_type = LVB_T_OST;
        policy = ldlm_get_processing_policy(res);
        LASSERT(policy != NULL);
@@ -195,13 +302,7 @@ int ofd_intent_policy(struct ldlm_namespace *ns, struct ldlm_lock **lockp,
 
        /* The lock met with no resistance; we're finished. */
        if (rc == LDLM_ITER_CONTINUE) {
-               /* do not grant locks to the liblustre clients: they cannot
-                * handle ASTs robustly.  We need to do this while still
-                * holding ns_lock to avoid the lock remaining on the res_link
-                * list (and potentially being added to l_pending_list by an
-                * AST) when we are going to drop this lock ASAP. */
-               if (lock->l_export->exp_libclient ||
-                   OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_GLIMPSE, 2)) {
+               if (OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_GLIMPSE, 2)) {
                        ldlm_resource_unlink_lock(lock);
                        err = ELDLM_LOCK_ABORTED;
                } else {
@@ -233,74 +334,48 @@ int ofd_intent_policy(struct ldlm_namespace *ns, struct ldlm_lock **lockp,
         *  res->lr_lvb_sem.
         */
        arg.size = reply_lvb->lvb_size;
-       arg.victim = &l;
-       arg.liblustre = &only_liblustre;
 
+       /* Check for PW locks beyond the size in the LVB, build the list
+        * of locks to glimpse (arg.gl_list) */
        for (idx = 0; idx < LCK_MODE_NUM; idx++) {
                tree = &res->lr_itree[idx];
                if (tree->lit_mode == LCK_PR)
                        continue;
 
                interval_iterate_reverse(tree->lit_root, ofd_intent_cb, &arg);
+               if (arg.error) {
+                       unlock_res(res);
+                       GOTO(out, rc = arg.error);
+               }
        }
        unlock_res(res);
 
        /* There were no PW locks beyond the size in the LVB; finished. */
-       if (l == NULL) {
-               if (only_liblustre) {
-                       /* If we discovered a liblustre client with a PW lock,
-                        * however, the LVB may be out of date!  The LVB is
-                        * updated only on glimpse (which we don't do for
-                        * liblustre clients) and cancel (which the client
-                        * obviously has not yet done).  So if it has written
-                        * data but kept the lock, the LVB is stale and needs
-                        * to be updated from disk.
-                        *
-                        * Of course, this will all disappear when we switch to
-                        * taking liblustre locks on the OST. */
-                       ldlm_res_lvbo_update(res, NULL, 1);
-               }
+       if (list_empty(&arg.gl_list))
                RETURN(ELDLM_LOCK_ABORTED);
-       }
 
-       /*
-        * This check is for lock taken in ofd_destroy_by_fid() that does
-        * not have l_glimpse_ast set. So the logic is: if there is a lock
-        * with no l_glimpse_ast set, this object is being destroyed already.
-        * Hence, if you are grabbing DLM locks on the server, always set
-        * non-NULL glimpse_ast (e.g., ldlm_request.c::ldlm_glimpse_ast()).
-        */
-       if (l->l_glimpse_ast == NULL) {
+       if (arg.no_glimpse_ast) {
                /* We are racing with unlink(); just return -ENOENT */
                rep->lock_policy_res1 = ptlrpc_status_hton(-ENOENT);
-               goto out;
+               GOTO(out, ELDLM_LOCK_ABORTED);
        }
 
-       /* Populate the gl_work structure.
-        * Grab additional reference on the lock which will be released in
-        * ldlm_work_gl_ast_lock() */
-       gl_work.gl_lock = LDLM_LOCK_GET(l);
-       /* The glimpse callback is sent to one single extent lock. As a result,
-        * the gl_work list is just composed of one element */
-       list_add_tail(&gl_work.gl_list, &gl_list);
-       /* There is actually no need for a glimpse descriptor when glimpsing
-        * extent locks */
-       gl_work.gl_desc = NULL;
-       /* the ldlm_glimpse_work structure is allocated on the stack */
-       gl_work.gl_flags = LDLM_GL_WORK_NOFREE;
-
-       rc = ldlm_glimpse_locks(res, &gl_list); /* this will update the LVB */
-
-       if (!list_empty(&gl_list))
-               LDLM_LOCK_RELEASE(l);
+       /* this will update the LVB */
+       ldlm_glimpse_locks(res, &arg.gl_list);
 
        lock_res(res);
        *reply_lvb = *res_lvb;
        unlock_res(res);
 
 out:
-       LDLM_LOCK_RELEASE(l);
+       /* If the list is not empty, we failed to glimpse some locks and
+        * must clean up.  Usually due to a race with unlink.*/
+       list_for_each_entry_safe(pos, tmp, &arg.gl_list, gl_list) {
+               list_del(&pos->gl_list);
+               LDLM_LOCK_RELEASE(pos->gl_lock);
+               OBD_SLAB_FREE_PTR(pos, ldlm_glimpse_work_kmem);
+       }
 
-       RETURN(ELDLM_LOCK_ABORTED);
+       RETURN(rc < 0 ? rc : ELDLM_LOCK_ABORTED);
 }