Whamcloud - gitweb
LU-17705 ptlrpc: replace synchronize_rcu() with rcu_barrier()
[fs/lustre-release.git] / lustre / osc / osc_lock.c
index 9e46679..3502f70 100644 (file)
@@ -27,7 +27,6 @@
  */
 /*
  * This file is part of Lustre, http://www.lustre.org/
- * Lustre is a trademark of Sun Microsystems, Inc.
  *
  * Implementation of cl_lock for OSC layer.
  *
@@ -65,7 +64,7 @@ static struct ldlm_lock *osc_handle_ptr(struct lustre_handle *handle)
 /**
  * Invariant that has to be true all of the time.
  */
-static int osc_lock_invariant(struct osc_lock *ols)
+static inline int osc_lock_invariant(struct osc_lock *ols)
 {
        struct ldlm_lock *lock        = osc_handle_ptr(&ols->ols_handle);
        struct ldlm_lock *olock       = ols->ols_dlmlock;
@@ -239,8 +238,8 @@ static void osc_lock_granted(const struct lu_env *env, struct osc_lock *oscl,
                /* extend the lock extent, otherwise it will have problem when
                 * we decide whether to grant a lockless lock. */
                descr->cld_mode  = osc_ldlm2cl_lock(dlmlock->l_granted_mode);
-               descr->cld_start = cl_index(descr->cld_obj, ext->start);
-               descr->cld_end   = cl_index(descr->cld_obj, ext->end);
+               descr->cld_start = ext->start >> PAGE_SHIFT;
+               descr->cld_end   = ext->end >> PAGE_SHIFT;
                descr->cld_gid   = ext->gid;
 
                /* no lvb update for matched lock */
@@ -291,18 +290,7 @@ static int osc_lock_upcall(void *cookie, struct lustre_handle *lockh,
                osc_lock_granted(env, oscl, lockh);
 
        /* Error handling, some errors are tolerable. */
-       if (oscl->ols_locklessable && rc == -EUSERS) {
-               /* This is a tolerable error, turn this lock into
-                * lockless lock.
-                */
-               osc_object_set_contended(cl2osc(slice->cls_obj));
-               LASSERT(slice->cls_ops != oscl->ols_lockless_ops);
-
-               /* Change this lock to ldlmlock-less lock. */
-               osc_lock_to_lockless(env, oscl, 1);
-               oscl->ols_state = OLS_GRANTED;
-               rc = 0;
-       } else if (oscl->ols_glimpse && rc == -ENAVAIL) {
+       if (oscl->ols_glimpse && rc == -ENAVAIL) {
                LASSERT(oscl->ols_flags & LDLM_FL_LVB_READY);
                osc_lock_lvb_update(env, cl2osc(slice->cls_obj),
                                    NULL, &oscl->ols_lvb);
@@ -428,7 +416,7 @@ static int osc_dlm_blocking_ast0(const struct lu_env *env,
 
        unlock_res_and_lock(dlmlock);
 
-       OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_DELAY_CANCEL, 5);
+       CFS_FAIL_TIMEOUT(OBD_FAIL_OSC_DELAY_CANCEL, 5);
 
        /* if l_ast_data is NULL, the dlmlock was enqueued by AGL or
         * the object has been destroyed. */
@@ -439,8 +427,8 @@ static int osc_dlm_blocking_ast0(const struct lu_env *env,
 
                /* Destroy pages covered by the extent of the DLM lock */
                result = osc_lock_flush(cl2osc(obj),
-                                       cl_index(obj, extent->start),
-                                       cl_index(obj, extent->end),
+                                       extent->start >> PAGE_SHIFT,
+                                       extent->end >> PAGE_SHIFT,
                                        mode, discard);
 
                /* losing a lock, update kms */
@@ -601,7 +589,7 @@ int osc_ldlm_glimpse_ast(struct ldlm_lock *dlmlock, void *data)
                dlmlock = NULL;
 
                if (obj == NULL && res->lr_type == LDLM_EXTENT) {
-                       if (OBD_FAIL_CHECK(OBD_FAIL_OSC_NO_SIZE_DATA))
+                       if (CFS_FAIL_CHECK(OBD_FAIL_OSC_NO_SIZE_DATA))
                                break;
 
                        lock_res(res);
@@ -649,15 +637,21 @@ out:
 EXPORT_SYMBOL(osc_ldlm_glimpse_ast);
 
 static bool weigh_cb(const struct lu_env *env, struct cl_io *io,
-                    struct osc_page *ops, void *cbdata)
+                    void **pvec, int count, void *cbdata)
 {
-       struct cl_page *page = ops->ops_cl.cpl_page;
+       int i;
+
+       for (i = 0; i < count; i++) {
+               struct osc_page *ops = pvec[i];
+               struct cl_page *page = ops->ops_cl.cpl_page;
 
-       if (cl_page_is_vmlocked(env, page) || PageDirty(page->cp_vmpage) ||
-           PageWriteback(page->cp_vmpage))
-               return false;
+               if (PageLocked(page->cp_vmpage) ||
+                   PageDirty(page->cp_vmpage) ||
+                   PageWriteback(page->cp_vmpage))
+                       return false;
 
-       *(pgoff_t *)cbdata = osc_index(ops) + 1;
+               *(pgoff_t *)cbdata = osc_index(ops) + 1;
+       }
        return true;
 }
 
@@ -678,10 +672,10 @@ static unsigned long osc_lock_weight(const struct lu_env *env,
        if (result != 0)
                RETURN(1);
 
-       page_index = cl_index(obj, start);
+       page_index = start >> PAGE_SHIFT;
 
        if (!osc_page_gang_lookup(env, io, oscobj,
-                                 page_index, cl_index(obj, end),
+                                 page_index, end >> PAGE_SHIFT,
                                  weigh_cb, (void *)&page_index))
                result = 1;
        cl_io_fini(env, io);
@@ -799,7 +793,6 @@ void osc_lock_to_lockless(const struct lu_env *env,
        struct cl_io *io = oio->oi_cl.cis_io;
        struct cl_object *obj = slice->cls_obj;
        struct osc_object *oob = cl2osc(obj);
-       const struct osc_device *osd = lu2osc_dev(obj->co_lu.lo_dev);
        struct obd_connect_data *ocd;
 
        LASSERT(ols->ols_state == OLS_NEW ||
@@ -818,12 +811,7 @@ void osc_lock_to_lockless(const struct lu_env *env,
                                        (io->ci_lockreq == CILR_MAYBE) &&
                                        (ocd->ocd_connect_flags &
                                         OBD_CONNECT_SRVLOCK);
-               if (io->ci_lockreq == CILR_NEVER ||
-                   /* lockless IO */
-                   (ols->ols_locklessable && osc_object_is_contended(oob)) ||
-                   /* lockless truncate */
-                   (cl_io_is_trunc(io) && osd->od_lockless_truncate &&
-                    (ocd->ocd_connect_flags & OBD_CONNECT_TRUNCLOCK))) {
+               if (io->ci_lockreq == CILR_NEVER) {
                        ols->ols_locklessable = 1;
                        slice->cls_ops = ols->ols_lockless_ops;
                }
@@ -858,16 +846,16 @@ static bool osc_lock_compatible(const struct osc_lock *qing,
 void osc_lock_wake_waiters(const struct lu_env *env, struct osc_object *osc,
                           struct osc_lock *oscl)
 {
+       struct osc_lock *scan;
+
        spin_lock(&osc->oo_ol_spin);
        list_del_init(&oscl->ols_nextlock_oscobj);
        spin_unlock(&osc->oo_ol_spin);
 
        spin_lock(&oscl->ols_lock);
-       while (!list_empty(&oscl->ols_waiting_list)) {
-               struct osc_lock *scan;
-
-               scan = list_entry(oscl->ols_waiting_list.next, struct osc_lock,
-                                 ols_wait_entry);
+       while ((scan = list_first_entry_or_null(&oscl->ols_waiting_list,
+                                               struct osc_lock,
+                                               ols_wait_entry)) != NULL) {
                list_del_init(&scan->ols_wait_entry);
 
                cl_sync_io_note(env, scan->ols_owner, 0);
@@ -970,7 +958,7 @@ static int osc_lock_enqueue(const struct lu_env *env,
         ENTRY;
 
        LASSERTF(ergo(oscl->ols_glimpse, lock->cll_descr.cld_mode <= CLM_READ),
-               "lock = %p, ols = %p\n", lock, oscl);
+               "lock = %px, ols = %px\n", lock, oscl);
 
        if (oscl->ols_state == OLS_GRANTED)
                RETURN(0);
@@ -1044,6 +1032,11 @@ enqueue_base:
                if (osc_lock_is_lockless(oscl)) {
                        oio->oi_lockless = 1;
                } else if (!async) {
+                       if (CFS_FAIL_PRECHECK(OBD_FAIL_PTLRPC_IDLE_RACE)) {
+                               CFS_RACE(OBD_FAIL_PTLRPC_IDLE_RACE);
+                               set_current_state(TASK_UNINTERRUPTIBLE);
+                               schedule_timeout(cfs_time_seconds(1) / 2);
+                       }
                        LASSERT(oscl->ols_state == OLS_GRANTED);
                        LASSERT(oscl->ols_hold);
                        LASSERT(oscl->ols_dlmlock != NULL);
@@ -1155,16 +1148,8 @@ static void osc_lock_lockless_cancel(const struct lu_env *env,
 {
        struct osc_lock      *ols   = cl2osc_lock(slice);
        struct osc_object    *osc   = cl2osc(slice->cls_obj);
-       struct cl_lock_descr *descr = &slice->cls_lock->cll_descr;
-       int result;
 
        LASSERT(ols->ols_dlmlock == NULL);
-       result = osc_lock_flush(osc, descr->cld_start, descr->cld_end,
-                               descr->cld_mode, false);
-        if (result)
-                CERROR("Pages for lockless lock %p were not purged(%d)\n",
-                       ols, result);
-
        osc_lock_wake_waiters(env, osc, ols);
 }
 
@@ -1186,9 +1171,9 @@ void osc_lock_set_writer(const struct lu_env *env, const struct cl_io *io,
                return;
 
        if (likely(io->ci_type == CIT_WRITE)) {
-               io_start = cl_index(obj, io->u.ci_rw.crw_pos);
-               io_end = cl_index(obj, io->u.ci_rw.crw_pos +
-                                               io->u.ci_rw.crw_count - 1);
+               io_start = io->u.ci_rw.crw_pos >> PAGE_SHIFT;
+               io_end = (io->u.ci_rw.crw_pos +
+                         io->u.ci_rw.crw_bytes - 1) >> PAGE_SHIFT;
        } else {
                LASSERT(cl_io_is_mkwrite(io));
                io_start = io_end = io->u.ci_fault.ft_index;
@@ -1206,6 +1191,22 @@ void osc_lock_set_writer(const struct lu_env *env, const struct cl_io *io,
 }
 EXPORT_SYMBOL(osc_lock_set_writer);
 
+void osc_lock_set_reader(const struct lu_env *env, const struct cl_io *io,
+                        struct cl_object *obj, struct osc_lock *oscl)
+{
+       struct osc_io *oio = osc_env_io(env);
+
+       if (!cl_object_same(io->ci_obj, obj))
+               return;
+
+       if (oscl->ols_glimpse || osc_lock_is_lockless(oscl))
+               return;
+
+       if (oio->oi_read_osclock == NULL)
+               oio->oi_read_osclock = oscl;
+}
+EXPORT_SYMBOL(osc_lock_set_reader);
+
 int osc_lock_init(const struct lu_env *env,
                  struct cl_object *obj, struct cl_lock *lock,
                  const struct cl_io *io)
@@ -1232,8 +1233,6 @@ int osc_lock_init(const struct lu_env *env,
 
        oscl->ols_flags = osc_enq2ldlm_flags(enqflags);
        oscl->ols_speculative = !!(enqflags & CEF_SPECULATIVE);
-       if (lock->cll_descr.cld_mode == CLM_GROUP)
-               oscl->ols_flags |= LDLM_FL_ATOMIC_CB;
 
        if (oscl->ols_flags & LDLM_FL_HAS_INTENT) {
                oscl->ols_flags |= LDLM_FL_BLOCK_GRANTED;
@@ -1248,11 +1247,12 @@ int osc_lock_init(const struct lu_env *env,
        if (!(enqflags & CEF_MUST))
                /* try to convert this lock to a lockless lock */
                osc_lock_to_lockless(env, oscl, (enqflags & CEF_NEVER));
-       if (oscl->ols_locklessable && !(enqflags & CEF_DISCARD_DATA))
-               oscl->ols_flags |= LDLM_FL_DENY_ON_CONTENTION;
 
        if (io->ci_type == CIT_WRITE || cl_io_is_mkwrite(io))
                osc_lock_set_writer(env, io, obj, oscl);
+       else if (io->ci_type == CIT_READ ||
+                (io->ci_type == CIT_FAULT && !io->u.ci_fault.ft_mkwrite))
+               osc_lock_set_reader(env, io, obj, oscl);
 
        LDLM_DEBUG_NOLOCK("lock %p, osc lock %p, flags %#llx",
                          lock, oscl, oscl->ols_flags);