Whamcloud - gitweb
Branch b1_8
[fs/lustre-release.git] / lustre / osc / osc_lock.c
index a0e3190..ef82a95 100644 (file)
@@ -60,6 +60,8 @@
 
 static const struct cl_lock_operations osc_lock_ops;
 static const struct cl_lock_operations osc_lock_lockless_ops;
+static void osc_lock_to_lockless(const struct lu_env *env,
+                                 struct osc_lock *ols, int force);
 
 int osc_lock_is_lockless(const struct osc_lock *olck)
 {
@@ -247,7 +249,7 @@ static int osc_enq2ldlm_flags(__u32 enqflags)
 {
         int result = 0;
 
-        LASSERT((enqflags & ~(CEF_NONBLOCK|CEF_ASYNC|CEF_DISCARD_DATA)) == 0);
+        LASSERT((enqflags & ~CEF_MASK) == 0);
 
         if (enqflags & CEF_NONBLOCK)
                 result |= LDLM_FL_BLOCK_NOWAIT;
@@ -303,19 +305,6 @@ static void osc_ast_data_put(const struct lu_env *env, struct osc_lock *olck)
         cl_lock_put(env, lock);
 }
 
-static void osc_lock_to_lockless(struct osc_lock *olck)
-{
-        struct cl_lock_slice *slice = &olck->ols_cl;
-        struct cl_lock  *lock       = slice->cls_lock;
-
-        /*
-         * TODO: Discover which locks we need to convert the lock
-         * to ldlmlockless.
-         */
-        LASSERT(cl_lock_is_mutexed(lock));
-        slice->cls_ops = &osc_lock_lockless_ops;
-}
-
 /**
  * Updates object attributes from a lock value block (lvb) received together
  * with the DLM lock reply from the server. Copy of osc_update_enqueue()
@@ -323,6 +312,8 @@ static void osc_lock_to_lockless(struct osc_lock *olck)
  *
  * This can be optimized to not update attributes when lock is a result of a
  * local match.
+ *
+ * Called under lock and resource spin-locks.
  */
 static void osc_lock_lvb_update(const struct lu_env *env, struct osc_lock *olck,
                                 int rc)
@@ -355,6 +346,8 @@ static void osc_lock_lvb_update(const struct lu_env *env, struct osc_lock *olck,
                 dlmlock = olck->ols_lock;
                 LASSERT(dlmlock != NULL);
 
+                /* re-grab LVB from a dlm lock under DLM spin-locks. */
+                *lvb = *(struct ost_lvb *)dlmlock->l_lvb_data;
                 size = lvb->lvb_size;
                 /* Extend KMS up to the end of this lock and no further
                  * A lock on [x,y] means a KMS of up to y + 1 bytes! */
@@ -371,7 +364,7 @@ static void osc_lock_lvb_update(const struct lu_env *env, struct osc_lock *olck,
                                    lvb->lvb_size, oinfo->loi_kms,
                                    dlmlock->l_policy_data.l_extent.end);
                 }
-                ldlm_lock_allow_match(dlmlock);
+                ldlm_lock_allow_match_locked(dlmlock);
         } else if (rc == -ENAVAIL && olck->ols_glimpse) {
                 CDEBUG(D_INODE, "glimpsed, setting rss="LPU64"; leaving"
                        " kms="LPU64"\n", lvb->lvb_size, oinfo->loi_kms);
@@ -386,6 +379,13 @@ static void osc_lock_lvb_update(const struct lu_env *env, struct osc_lock *olck,
         EXIT;
 }
 
+/**
+ * Called when a lock is granted, from an upcall (when server returned a
+ * granted lock), or from completion AST, when server returned a blocked lock.
+ *
+ * Called under lock and resource spin-locks, that are released temporarily
+ * here.
+ */
 static void osc_lock_granted(const struct lu_env *env, struct osc_lock *olck,
                              struct ldlm_lock *dlmlock, int rc)
 {
@@ -410,11 +410,19 @@ static void osc_lock_granted(const struct lu_env *env, struct osc_lock *olck,
                  * tell upper layers the extent of the lock that was actually
                  * granted
                  */
-                cl_lock_modify(env, lock, descr);
-                LINVRNT(osc_lock_invariant(olck));
                 olck->ols_state = OLS_GRANTED;
                 osc_lock_lvb_update(env, olck, rc);
+
+                /* release DLM spin-locks to allow cl_lock_{modify,signal}()
+                 * to take a semaphore on a parent lock. This is safe, because
+                 * spin-locks are needed to protect consistency of
+                 * dlmlock->l_*_mode and LVB, and we have finished processing
+                 * them. */
+                unlock_res_and_lock(dlmlock);
+                cl_lock_modify(env, lock, descr);
                 cl_lock_signal(env, lock);
+                LINVRNT(osc_lock_invariant(olck));
+                lock_res_and_lock(dlmlock);
         }
         EXIT;
 }
@@ -435,7 +443,6 @@ static void osc_lock_upcall0(const struct lu_env *env, struct osc_lock *olck)
         LASSERT(olck->ols_lock == NULL);
         olck->ols_lock = dlmlock;
         spin_unlock(&osc_ast_guard);
-        unlock_res_and_lock(dlmlock);
 
         /*
          * Lock might be not yet granted. In this case, completion ast
@@ -444,6 +451,8 @@ static void osc_lock_upcall0(const struct lu_env *env, struct osc_lock *olck)
          */
         if (dlmlock->l_granted_mode == dlmlock->l_req_mode)
                 osc_lock_granted(env, olck, dlmlock, 0);
+        unlock_res_and_lock(dlmlock);
+
         /*
          * osc_enqueue_interpret() decrefs asynchronous locks, counter
          * this.
@@ -519,7 +528,7 @@ static int osc_lock_upcall(void *cookie, int errcode)
                         LASSERT(slice->cls_ops == &osc_lock_ops);
 
                         /* Change this lock to ldlmlock-less lock. */
-                        osc_lock_to_lockless(olck);
+                        osc_lock_to_lockless(env, olck, 1);
                         olck->ols_state = OLS_GRANTED;
                         rc = 0;
                 } else if (olck->ols_glimpse && rc == -ENAVAIL) {
@@ -762,6 +771,7 @@ static int osc_ldlm_completion_ast(struct ldlm_lock *dlmlock,
                          * to lock->l_lvb_data, store it in osc_lock.
                          */
                         LASSERT(dlmlock->l_lvb_data != NULL);
+                        lock_res_and_lock(dlmlock);
                         olck->ols_lvb = *(struct ost_lvb *)dlmlock->l_lvb_data;
                         if (olck->ols_lock == NULL)
                                 /*
@@ -778,6 +788,7 @@ static int osc_ldlm_completion_ast(struct ldlm_lock *dlmlock,
                                 osc_lock_granted(env, olck, dlmlock, dlmrc);
                         if (dlmrc != 0)
                                 cl_lock_error(env, lock, dlmrc);
+                        unlock_res_and_lock(dlmlock);
                         cl_lock_mutex_put(env, lock);
                         osc_ast_data_put(env, olck);
                         result = 0;
@@ -998,6 +1009,61 @@ static int osc_lock_cancel_wait(const struct lu_env *env, struct cl_lock *lock,
 }
 
 /**
+ * Determine if the lock should be converted into a lockless lock.
+ *
+ * Steps to check:
+ * - if the lock has an explicite requirment for a non-lockless lock;
+ * - if the io lock request type ci_lockreq;
+ * - send the enqueue rpc to ost to make the further decision;
+ * - special treat to truncate lockless lock
+ *
+ *  Additional policy can be implemented here, e.g., never do lockless-io
+ *  for large extents.
+ */
+static void osc_lock_to_lockless(const struct lu_env *env,
+                                 struct osc_lock *ols, int force)
+{
+        struct cl_lock_slice *slice = &ols->ols_cl;
+        struct cl_lock *lock        = slice->cls_lock;
+
+        LASSERT(ols->ols_state == OLS_NEW ||
+                ols->ols_state == OLS_UPCALL_RECEIVED);
+
+        if (force) {
+                ols->ols_locklessable = 1;
+                LASSERT(cl_lock_is_mutexed(lock));
+                slice->cls_ops = &osc_lock_lockless_ops;
+        } else {
+                struct osc_io *oio     = osc_env_io(env);
+                struct cl_io  *io      = oio->oi_cl.cis_io;
+                struct cl_object *obj  = slice->cls_obj;
+                struct osc_object *oob = cl2osc(obj);
+                const struct osc_device *osd = lu2osc_dev(obj->co_lu.lo_dev);
+                struct obd_connect_data *ocd;
+
+                LASSERT(io->ci_lockreq == CILR_MANDATORY ||
+                        io->ci_lockreq == CILR_MAYBE ||
+                        io->ci_lockreq == CILR_NEVER);
+
+                ocd = &class_exp2cliimp(osc_export(oob))->imp_connect_data;
+                ols->ols_locklessable = (io->ci_type != CIT_TRUNC) &&
+                                (io->ci_lockreq == CILR_MAYBE) &&
+                                (ocd->ocd_connect_flags & OBD_CONNECT_SRVLOCK);
+                if (io->ci_lockreq == CILR_NEVER ||
+                        /* lockless IO */
+                    (ols->ols_locklessable && osc_object_is_contended(oob)) ||
+                        /* lockless truncate */
+                    (io->ci_type == CIT_TRUNC &&
+                     (ocd->ocd_connect_flags & OBD_CONNECT_TRUNCLOCK) &&
+                      osd->od_lockless_truncate)) {
+                        ols->ols_locklessable = 1;
+                        slice->cls_ops = &osc_lock_lockless_ops;
+                }
+        }
+        LASSERT(ergo(ols->ols_glimpse, !osc_lock_is_lockless(ols)));
+}
+
+/**
  * Cancel all conflicting locks and wait for them to be destroyed.
  *
  * This function is used for two purposes:
@@ -1190,8 +1256,6 @@ static int osc_lock_enqueue(const struct lu_env *env,
         osc_lock_build_res(env, obj, resname);
         osc_lock_build_policy(env, lock, policy);
         ols->ols_flags = osc_enq2ldlm_flags(enqflags);
-        if (ols->ols_locklessable)
-                ols->ols_flags |= LDLM_FL_DENY_ON_CONTENTION;
         if (osc_deadlock_is_possible(env, lock))
                 ols->ols_flags |= LDLM_FL_BLOCK_GRANTED;
         if (ols->ols_flags & LDLM_FL_HAS_INTENT)
@@ -1199,29 +1263,40 @@ static int osc_lock_enqueue(const struct lu_env *env,
 
         result = osc_lock_enqueue_wait(env, ols);
         if (result == 0) {
-                /* a reference for lock, passed as an upcall cookie */
-                cl_lock_get(lock);
-                lu_ref_add(&lock->cll_reference, "upcall", lock);
-                ols->ols_state = OLS_ENQUEUED;
+                if (!(enqflags & CEF_MUST))
+                        /* try to convert this lock to a lockless lock */
+                        osc_lock_to_lockless(env, ols, (enqflags & CEF_NEVER));
+                if (!osc_lock_is_lockless(ols)) {
+                        if (ols->ols_locklessable)
+                                ols->ols_flags |= LDLM_FL_DENY_ON_CONTENTION;
+
+                        /* a reference for lock, passed as an upcall cookie */
+                        cl_lock_get(lock);
+                        lu_ref_add(&lock->cll_reference, "upcall", lock);
+                        ols->ols_state = OLS_ENQUEUED;
 
-                /*
-                 * XXX: this is possible blocking point as
-                 * ldlm_lock_match(LDLM_FL_LVB_READY) waits for
-                 * LDLM_CP_CALLBACK.
-                 */
-                result = osc_enqueue_base(osc_export(obj), resname,
+                        /*
+                         * XXX: this is possible blocking point as
+                         * ldlm_lock_match(LDLM_FL_LVB_READY) waits for
+                         * LDLM_CP_CALLBACK.
+                         */
+                        result = osc_enqueue_base(osc_export(obj), resname,
                                           &ols->ols_flags, policy,
                                           &ols->ols_lvb,
                                           obj->oo_oinfo->loi_kms_valid,
                                           osc_lock_upcall,
                                           ols, einfo, &ols->ols_handle,
                                           PTLRPCD_SET, 1);
-                if (result != 0) {
-                        lu_ref_del(&lock->cll_reference, "upcall", lock);
-                        cl_lock_put(env, lock);
+                        if (result != 0) {
+                                lu_ref_del(&lock->cll_reference,
+                                           "upcall", lock);
+                                cl_lock_put(env, lock);
+                        }
+                } else {
+                        ols->ols_state = OLS_GRANTED;
                 }
         }
-
+        LASSERT(ergo(ols->ols_glimpse, !osc_lock_is_lockless(ols)));
         RETURN(result);
 }
 
@@ -1473,18 +1548,8 @@ static int osc_lock_lockless_enqueue(const struct lu_env *env,
                                      const struct cl_lock_slice *slice,
                                      struct cl_io *_, __u32 enqflags)
 {
-        struct osc_lock          *ols     = cl2osc_lock(slice);
-        struct cl_lock           *lock    = ols->ols_cl.cls_lock;
-        int result;
-
-        LASSERT(cl_lock_is_mutexed(lock));
-        LASSERT(lock->cll_state == CLS_QUEUING);
-        LASSERT(ols->ols_state == OLS_NEW);
-
-        result = osc_lock_enqueue_wait(env, ols);
-        if (result == 0)
-                ols->ols_state = OLS_GRANTED;
-        return result;
+        LBUG();
+        return 0;
 }
 
 static int osc_lock_lockless_unuse(const struct lu_env *env,
@@ -1537,7 +1602,11 @@ static void osc_lock_lockless_state(const struct lu_env *env,
         if (state == CLS_HELD) {
                 LASSERT(lock->ols_owner == NULL);
                 lock->ols_owner = oio;
-                oio->oi_lockless = 1;
+
+                /* set the io to be lockless if this lock is for io's
+                 * host object */
+                if (cl_object_same(oio->oi_cl.cis_obj, slice->cls_obj))
+                        oio->oi_lockless = 1;
         } else
                 lock->ols_owner = NULL;
 }
@@ -1563,56 +1632,16 @@ static const struct cl_lock_operations osc_lock_lockless_ops = {
 
 int osc_lock_init(const struct lu_env *env,
                   struct cl_object *obj, struct cl_lock *lock,
-                  const struct cl_io *io)
+                  const struct cl_io *_)
 {
-        struct osc_lock   *clk;
-        struct osc_io     *oio = osc_env_io(env);
-        struct osc_object *oob = cl2osc(obj);
+        struct osc_lock *clk;
         int result;
 
-        OBD_SLAB_ALLOC_PTR(clk, osc_lock_kmem);
+        OBD_SLAB_ALLOC_PTR_GFP(clk, osc_lock_kmem, CFS_ALLOC_IO);
         if (clk != NULL) {
-                const struct cl_lock_operations *ops;
-                const struct osc_device *osd = lu2osc_dev(obj->co_lu.lo_dev);
-                struct obd_connect_data *ocd;
-
                 osc_lock_build_einfo(env, lock, clk, &clk->ols_einfo);
                 clk->ols_state = OLS_NEW;
-
-                /*
-                 * Check if we need to do lockless IO here.
-                 * Following conditions must be satisfied:
-                 * - the current IO must be locklessable;
-                 * - the stripe is in contention;
-                 * - requested lock is not a glimpse.
-                 *
-                 * if not, we have to inherit the locklessable flag to
-                 * osc_lock, and let ost make the decision.
-                 *
-                 * Additional policy can be implemented here, e.g., never do
-                 * lockless-io for large extents.
-                 */
-                LASSERT(io->ci_lockreq == CILR_MANDATORY ||
-                        io->ci_lockreq == CILR_MAYBE ||
-                        io->ci_lockreq == CILR_NEVER);
-                ocd = &class_exp2cliimp(osc_export(oob))->imp_connect_data;
-                clk->ols_locklessable = (io->ci_type != CIT_TRUNC) &&
-                                (io->ci_lockreq == CILR_MAYBE) &&
-                                (ocd->ocd_connect_flags & OBD_CONNECT_SRVLOCK);
-                ops = &osc_lock_ops;
-                if (io->ci_lockreq == CILR_NEVER ||
-                    /* lockless IO */
-                    (clk->ols_locklessable && osc_object_is_contended(oob)) ||
-                     /* lockless truncate */
-                    (io->ci_type == CIT_TRUNC &&
-                     (ocd->ocd_connect_flags & OBD_CONNECT_TRUNCLOCK) &&
-                     osd->od_lockless_truncate)) {
-                        ops = &osc_lock_lockless_ops;
-                        oio->oi_lockless     = 1;
-                        clk->ols_locklessable = 1;
-                }
-
-                cl_lock_slice_add(lock, &clk->ols_cl, obj, ops);
+                cl_lock_slice_add(lock, &clk->ols_cl, obj, &osc_lock_ops);
                 result = 0;
         } else
                 result = -ENOMEM;