Whamcloud - gitweb
b=22310 do not break early in osc_io_submit() which might cause sub-optimal rpc size...
[fs/lustre-release.git] / lustre / osc / osc_io.c
index 4b4ae0a..b31fdf8 100644 (file)
  *   Author: Nikita Danilov <nikita.danilov@sun.com>
  */
 
-/** \addtogroup osc osc @{ */
-
 #define DEBUG_SUBSYSTEM S_OSC
 
 #include "osc_cl_internal.h"
 
+/** \addtogroup osc 
+ *  @{ 
+ */
+
 /*****************************************************************************
  *
  * Type conversions.
@@ -99,12 +101,6 @@ static void osc_io_unplug(const struct lu_env *env, struct osc_object *osc,
 }
 
 /**
- * How many pages osc_io_submit() queues before checking whether an RPC is
- * ready.
- */
-#define OSC_QUEUE_GRAIN (32)
-
-/**
  * An implementation of cl_io_operations::cio_io_submit() method for osc
  * layer. Iterates over pages in the in-queue, prepares each for io by calling
  * cl_page_prep() and then either submits them through osc_io_submit_page()
@@ -113,7 +109,8 @@ static void osc_io_unplug(const struct lu_env *env, struct osc_object *osc,
  */
 static int osc_io_submit(const struct lu_env *env,
                          const struct cl_io_slice *ios,
-                         enum cl_req_type crt, struct cl_2queue *queue)
+                         enum cl_req_type crt, struct cl_2queue *queue,
+                         enum cl_req_priority priority)
 {
         struct cl_page    *page;
         struct cl_page    *tmp;
@@ -148,12 +145,17 @@ static int osc_io_submit(const struct lu_env *env,
                 osc = cl2osc(opg->ops_cl.cpl_obj);
                 exp = osc_export(osc);
 
+                if (priority > CRP_NORMAL) {
+                        cfs_spin_lock(&oap->oap_lock);
+                        oap->oap_async_flags |= ASYNC_HP;
+                        cfs_spin_unlock(&oap->oap_lock);
+                }
                 /*
                  * This can be checked without cli->cl_loi_list_lock, because
                  * ->oap_*_item are always manipulated when the page is owned.
                  */
-                if (!list_empty(&oap->oap_urgent_item) ||
-                    !list_empty(&oap->oap_rpc_item)) {
+                if (!cfs_list_empty(&oap->oap_urgent_item) ||
+                    !cfs_list_empty(&oap->oap_rpc_item)) {
                         result = -EBUSY;
                         break;
                 }
@@ -169,7 +171,7 @@ static int osc_io_submit(const struct lu_env *env,
                 result = cl_page_prep(env, io, page, crt);
                 if (result == 0) {
                         cl_page_list_move(qout, qin, page);
-                        if (list_empty(&oap->oap_pending_item)) {
+                        if (cfs_list_empty(&oap->oap_pending_item)) {
                                 osc_io_submit_page(env, cl2osc_io(env, ios),
                                                    opg, crt);
                         } else {
@@ -177,9 +179,18 @@ static int osc_io_submit(const struct lu_env *env,
                                                                   osc->oo_oinfo,
                                                                   oap,
                                                                   OSC_FLAGS);
-                                if (result != 0)
-                                        break;
+                                /*
+                                 * bug 18881: we can't just break out here when
+                                 * error occurs after cl_page_prep has been
+                                 * called against the page. The correct
+                                 * way is to call page's completion routine,
+                                 * as in osc_oap_interrupted.  For simplicity,
+                                 * we just force osc_set_async_flags_base() to
+                                 * not return error.
+                                 */
+                                LASSERT(result == 0);
                         }
+                        opg->ops_submit_time = cfs_time_current();
                 } else {
                         LASSERT(result < 0);
                         if (result != -EALREADY)
@@ -191,17 +202,18 @@ static int osc_io_submit(const struct lu_env *env,
                          */
                         result = 0;
                 }
+
                 /*
-                 * Don't keep client_obd_list_lock() for too long.
+                 * We might hold client_obd_list_lock() for too long and cause
+                 * soft-lockups (see bug 16651). But on the other hand, pages
+                 * are queued here with ASYNC_URGENT flag, thus will be sent
+                 * out immediately once osc_io_unplug() is called, possibly
+                 * resulting in sub-optimal RPCs.
                  *
-                 * XXX lock_need_resched() should be used here, but it is not
-                 * available in the older of supported kernels.
+                 * We think creating optimal-sized RPCs is more important than
+                 * avoiding the transient soft-lockups, and we believe the
+                 * soft-lockups only happen in full debug testing.
                  */
-                if (queued > OSC_QUEUE_GRAIN || cfs_need_resched()) {
-                        queued = 0;
-                        osc_io_unplug(env, osc, cli);
-                        cfs_cond_resched();
-                }
         }
 
         LASSERT(ergo(result == 0, cli != NULL));
@@ -345,16 +357,19 @@ static int osc_io_fault_start(const struct lu_env *env,
         RETURN(0);
 }
 
-static int osc_punch_upcall(void *a, int rc)
+static int osc_setattr_upcall(void *a, int rc)
 {
-        struct osc_punch_cbargs *args = a;
+        struct osc_setattr_cbargs *args = a;
 
         args->opc_rc = rc;
-        complete(&args->opc_sync);
+        cfs_complete(&args->opc_sync);
         return 0;
 }
 
-#ifdef __KERNEL__
+/* Disable osc_trunc_check() because it is inherently racy between read and
+ * truncate. See bug 20645 for details.
+ */
+#if 0 && defined(__KERNEL__)
 /**
  * Checks that there are no pages being written in the extent being truncated.
  */
@@ -381,7 +396,7 @@ static void osc_trunc_check(const struct lu_env *env, struct cl_io *io,
          * XXX this is quite expensive check.
          */
         cl_page_list_init(list);
-        cl_page_gang_lookup(env, clob, io, start + partial, CL_PAGE_EOF, list);
+        cl_page_gang_lookup(env, clob, io, start + partial, CL_PAGE_EOF, list, 0);
 
         cl_page_list_for_each(page, list)
                 CL_PAGE_DEBUG(D_ERROR, env, page, "exists %lu\n", start);
@@ -389,8 +404,9 @@ static void osc_trunc_check(const struct lu_env *env, struct cl_io *io,
         cl_page_list_disown(env, io, list);
         cl_page_list_fini(env, list);
 
-        spin_lock(&obj->oo_seatbelt);
-        list_for_each_entry(cp, &obj->oo_inflight[CRT_WRITE], ops_inflight) {
+        cfs_spin_lock(&obj->oo_seatbelt);
+        cfs_list_for_each_entry(cp, &obj->oo_inflight[CRT_WRITE],
+                                ops_inflight) {
                 page = cp->ops_cl.cpl_page;
                 if (page->cp_index >= start + partial) {
                         cfs_task_t *submitter;
@@ -404,14 +420,14 @@ static void osc_trunc_check(const struct lu_env *env, struct cl_io *io,
                         libcfs_debug_dumpstack(submitter);
                 }
         }
-        spin_unlock(&obj->oo_seatbelt);
+        cfs_spin_unlock(&obj->oo_seatbelt);
 }
 #else /* __KERNEL__ */
 # define osc_trunc_check(env, io, oio, size) do {;} while (0)
 #endif
 
-static int osc_io_trunc_start(const struct lu_env *env,
-                              const struct cl_io_slice *slice)
+static int osc_io_setattr_start(const struct lu_env *env,
+                                const struct cl_io_slice *slice)
 {
         struct cl_io            *io     = slice->cis_io;
         struct osc_io           *oio    = cl2osc_io(env, slice);
@@ -419,23 +435,42 @@ static int osc_io_trunc_start(const struct lu_env *env,
         struct lov_oinfo        *loi    = cl2osc(obj)->oo_oinfo;
         struct cl_attr          *attr   = &osc_env_info(env)->oti_attr;
         struct obdo             *oa     = &oio->oi_oa;
-        struct osc_punch_cbargs *cbargs = &oio->oi_punch_cbarg;
-        struct obd_capa         *capa;
-        loff_t                   size   = io->u.ci_truncate.tr_size;
-        int                      result;
-
-        memset(oa, 0, sizeof(*oa));
-
-        osc_trunc_check(env, io, oio, size);
+        struct osc_setattr_cbargs *cbargs = &oio->oi_setattr_cbarg;
+        loff_t                   size   = io->u.ci_setattr.sa_attr.lvb_size;
+        unsigned int             ia_valid = io->u.ci_setattr.sa_valid;
+        int                      result = 0;
+        struct obd_info          oinfo = { { { 0 } } };
+
+        if (ia_valid & ATTR_SIZE)
+                osc_trunc_check(env, io, oio, size);
+
+        if (oio->oi_lockless == 0) {
+                cl_object_attr_lock(obj);
+                result = cl_object_attr_get(env, obj, attr);
+                if (result == 0) {
+                        unsigned int cl_valid = 0;
 
-        cl_object_attr_lock(obj);
-        result = cl_object_attr_get(env, obj, attr);
-        if (result == 0) {
-                attr->cat_size = attr->cat_kms = size;
-                result = cl_object_attr_set(env, obj, attr, CAT_SIZE|CAT_KMS);
+                        if (ia_valid & ATTR_SIZE) {
+                                attr->cat_size = attr->cat_kms = size;
+                                cl_valid = (CAT_SIZE | CAT_KMS);
+                        }
+                        if (ia_valid & ATTR_MTIME_SET) {
+                                attr->cat_mtime = io->u.ci_setattr.sa_attr.lvb_mtime;
+                                cl_valid |= CAT_MTIME;
+                        }
+                        if (ia_valid & ATTR_ATIME_SET) {
+                                attr->cat_atime = io->u.ci_setattr.sa_attr.lvb_atime;
+                                cl_valid |= CAT_ATIME;
+                        }
+                        if (ia_valid & ATTR_CTIME_SET) {
+                                attr->cat_ctime = io->u.ci_setattr.sa_attr.lvb_ctime;
+                                cl_valid |= CAT_CTIME;
+                        }
+                        result = cl_object_attr_set(env, obj, attr, cl_valid);
+                }
+                cl_object_attr_unlock(obj);
         }
-        cl_object_attr_unlock(obj);
-
+        memset(oa, 0, sizeof(*oa));
         if (result == 0) {
                 oa->o_id = loi->loi_id;
                 oa->o_gr = loi->loi_gr;
@@ -444,86 +479,118 @@ static int osc_io_trunc_start(const struct lu_env *env,
                 oa->o_ctime = attr->cat_ctime;
                 oa->o_valid = OBD_MD_FLID | OBD_MD_FLGROUP | OBD_MD_FLATIME |
                         OBD_MD_FLCTIME | OBD_MD_FLMTIME;
-                if (oio->oi_lockless) {
-                        oa->o_flags = OBD_FL_TRUNCLOCK;
-                        oa->o_valid |= OBD_MD_FLFLAGS;
+                if (ia_valid & ATTR_SIZE) {
+                        oa->o_size = size;
+                        oa->o_blocks = OBD_OBJECT_EOF;
+                        oa->o_valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS;
+
+                        if (oio->oi_lockless) {
+                                oa->o_flags = OBD_FL_SRVLOCK;
+                                oa->o_valid |= OBD_MD_FLFLAGS;
+                        }
+                } else {
+                        LASSERT(oio->oi_lockless == 0);
                 }
-                oa->o_size = size;
-                oa->o_blocks = OBD_OBJECT_EOF;
-                oa->o_valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS;
-
-                capa = io->u.ci_truncate.tr_capa;
-                init_completion(&cbargs->opc_sync);
-                result = osc_punch_base(osc_export(cl2osc(obj)), oa, capa,
-                                        osc_punch_upcall, cbargs, PTLRPCD_SET);
+
+                oinfo.oi_oa = oa;
+                oinfo.oi_capa = io->u.ci_setattr.sa_capa;
+                cfs_init_completion(&cbargs->opc_sync);
+
+                if (ia_valid & ATTR_SIZE)
+                        result = osc_punch_base(osc_export(cl2osc(obj)),
+                                                &oinfo, osc_setattr_upcall,
+                                                cbargs, PTLRPCD_SET);
+                else
+                        result = osc_setattr_async_base(osc_export(cl2osc(obj)),
+                                                        &oinfo, NULL,
+                                                        osc_setattr_upcall,
+                                                        cbargs, PTLRPCD_SET);
         }
         return result;
 }
 
-static void osc_io_trunc_end(const struct lu_env *env,
-                             const struct cl_io_slice *slice)
+static void osc_io_setattr_end(const struct lu_env *env,
+                               const struct cl_io_slice *slice)
 {
         struct cl_io            *io     = slice->cis_io;
         struct osc_io           *oio    = cl2osc_io(env, slice);
-        struct osc_punch_cbargs *cbargs = &oio->oi_punch_cbarg;
-        struct obdo             *oa     = &oio->oi_oa;
+        struct osc_setattr_cbargs *cbargs = &oio->oi_setattr_cbarg;
         int result;
 
-        wait_for_completion(&cbargs->opc_sync);
+        cfs_wait_for_completion(&cbargs->opc_sync);
 
         result = io->ci_result = cbargs->opc_rc;
         if (result == 0) {
                 struct cl_object *obj = slice->cis_obj;
-                if (oio->oi_lockless == 0) {
-                        struct cl_attr *attr = &osc_env_info(env)->oti_attr;
-                        int valid = 0;
-
-                        /* Update kms & size */
-                        if (oa->o_valid & OBD_MD_FLSIZE) {
-                                attr->cat_size = oa->o_size;
-                                attr->cat_kms  = oa->o_size;
-                                valid |= CAT_KMS|CAT_SIZE;
-                        }
-                        if (oa->o_valid & OBD_MD_FLBLOCKS) {
-                                attr->cat_blocks = oa->o_blocks;
-                                valid |= CAT_BLOCKS;
-                        }
-                        if (oa->o_valid & OBD_MD_FLMTIME) {
-                                attr->cat_mtime = oa->o_mtime;
-                                valid |= CAT_MTIME;
-                        }
-                        if (oa->o_valid & OBD_MD_FLCTIME) {
-                                attr->cat_ctime = oa->o_ctime;
-                                valid |= CAT_CTIME;
-                        }
-                        if (oa->o_valid & OBD_MD_FLATIME) {
-                                attr->cat_atime = oa->o_atime;
-                                valid |= CAT_ATIME;
-                        }
-                        cl_object_attr_lock(obj);
-                        result = cl_object_attr_set(env, obj, attr, valid);
-                        cl_object_attr_unlock(obj);
-                } else {  /* lockless truncate */
+                if (oio->oi_lockless) {
+                        /* lockless truncate */
                         struct osc_device *osd = lu2osc_dev(obj->co_lu.lo_dev);
+
+                        LASSERT(cl_io_is_trunc(io));
                         /* XXX: Need a lock. */
                         osd->od_stats.os_lockless_truncates++;
                 }
         }
+}
+
+static int osc_io_read_start(const struct lu_env *env,
+                             const struct cl_io_slice *slice)
+{
+        struct osc_io    *oio   = cl2osc_io(env, slice);
+        struct cl_object *obj   = slice->cis_obj;
+        struct cl_attr   *attr  = &osc_env_info(env)->oti_attr;
+        int              result = 0;
+        ENTRY;
+
+        if (oio->oi_lockless == 0) {
+                cl_object_attr_lock(obj);
+                result = cl_object_attr_get(env, obj, attr);
+                if (result == 0) {
+                        attr->cat_atime = LTIME_S(CFS_CURRENT_TIME);
+                        result = cl_object_attr_set(env, obj, attr,
+                                                    CAT_ATIME);
+                }
+                cl_object_attr_unlock(obj);
+        }
+        RETURN(result);
+}
+
+static int osc_io_write_start(const struct lu_env *env,
+                              const struct cl_io_slice *slice)
+{
+        struct osc_io    *oio   = cl2osc_io(env, slice);
+        struct cl_object *obj   = slice->cis_obj;
+        struct cl_attr   *attr  = &osc_env_info(env)->oti_attr;
+        int              result = 0;
+        ENTRY;
 
-        /* return result; */
+        if (oio->oi_lockless == 0) {
+                cl_object_attr_lock(obj);
+                result = cl_object_attr_get(env, obj, attr);
+                if (result == 0) {
+                        attr->cat_mtime = attr->cat_ctime =
+                                LTIME_S(CFS_CURRENT_TIME);
+                        result = cl_object_attr_set(env, obj, attr,
+                                                    CAT_MTIME | CAT_CTIME);
+                }
+                cl_object_attr_unlock(obj);
+        }
+        RETURN(result);
 }
 
 static const struct cl_io_operations osc_io_ops = {
         .op = {
                 [CIT_READ] = {
+                        .cio_start  = osc_io_read_start,
                         .cio_fini   = osc_io_fini
                 },
                 [CIT_WRITE] = {
+                        .cio_start  = osc_io_write_start,
                         .cio_fini   = osc_io_fini
                 },
-                [CIT_TRUNC] = {
-                        .cio_start  = osc_io_trunc_start,
-                        .cio_end    = osc_io_trunc_end
+                [CIT_SETATTR] = {
+                        .cio_start  = osc_io_setattr_start,
+                        .cio_end    = osc_io_setattr_end
                 },
                 [CIT_FAULT] = {
                         .cio_fini   = osc_io_fini,
@@ -596,25 +663,36 @@ static void osc_req_attr_set(const struct lu_env *env,
         }
         if (flags & OBD_MD_FLHANDLE) {
                 clerq = slice->crs_req;
-                LASSERT(!list_empty(&clerq->crq_pages));
+                LASSERT(!cfs_list_empty(&clerq->crq_pages));
                 apage = container_of(clerq->crq_pages.next,
                                      struct cl_page, cp_flight);
                 opg = osc_cl_page_osc(apage);
                 apage = opg->ops_cl.cpl_page; /* now apage is a sub-page */
                 lock = cl_lock_at_page(env, apage->cp_obj, apage, NULL, 1, 1);
-                if (lock != NULL) {
-                        olck = osc_lock_at(lock);
-                        LASSERT(olck != NULL);
-                        /* check for lockless io. */
-                        if (olck->ols_lock != NULL) {
-                                oa->o_handle = olck->ols_lock->l_remote_handle;
-                                oa->o_valid |= OBD_MD_FLHANDLE;
-                        }
-                        cl_lock_put(env, lock);
-                } else {
-                        /* Should only be possible with liblustre */
-                        LASSERT(LIBLUSTRE_CLIENT);
+                if (lock == NULL) {
+                        struct cl_object_header *head;
+                        struct cl_lock          *scan;
+
+                        head = cl_object_header(apage->cp_obj);
+                        cfs_list_for_each_entry(scan, &head->coh_locks,
+                                                cll_linkage)
+                                CL_LOCK_DEBUG(D_ERROR, env, scan,
+                                              "no cover page!\n");
+                        CL_PAGE_DEBUG(D_ERROR, env, apage,
+                                      "dump uncover page!\n");
+                        libcfs_debug_dumpstack(NULL);
+                        LBUG();
+                }
+
+                olck = osc_lock_at(lock);
+                LASSERT(olck != NULL);
+                LASSERT(ergo(opg->ops_srvlock, olck->ols_lock == NULL));
+                /* check for lockless io. */
+                if (olck->ols_lock != NULL) {
+                        oa->o_handle = olck->ols_lock->l_remote_handle;
+                        oa->o_valid |= OBD_MD_FLHANDLE;
                 }
+                cl_lock_put(env, lock);
         }
 }
 
@@ -641,7 +719,7 @@ int osc_req_init(const struct lu_env *env, struct cl_device *dev,
         struct osc_req *or;
         int result;
 
-        OBD_SLAB_ALLOC_PTR(or, osc_req_kmem);
+        OBD_SLAB_ALLOC_PTR_GFP(or, osc_req_kmem, CFS_ALLOC_IO);
         if (or != NULL) {
                 cl_req_slice_add(req, &or->or_cl, dev, &osc_req_ops);
                 result = 0;