Whamcloud - gitweb
b=19387 integrate LST into acc-sm
[fs/lustre-release.git] / lustre / osc / osc_io.c
index bdd9681..6190433 100644 (file)
  *   Author: Nikita Danilov <nikita.danilov@sun.com>
  */
 
-/** \addtogroup osc osc @{ */
-
 #define DEBUG_SUBSYSTEM S_OSC
 
 #include "osc_cl_internal.h"
 
+/** \addtogroup osc 
+ *  @{ 
+ */
+
 /*****************************************************************************
  *
  * Type conversions.
@@ -150,16 +152,16 @@ static int osc_io_submit(const struct lu_env *env,
                 exp = osc_export(osc);
 
                 if (priority > CRP_NORMAL) {
-                        spin_lock(&oap->oap_lock);
+                        cfs_spin_lock(&oap->oap_lock);
                         oap->oap_async_flags |= ASYNC_HP;
-                        spin_unlock(&oap->oap_lock);
+                        cfs_spin_unlock(&oap->oap_lock);
                 }
                 /*
                  * This can be checked without cli->cl_loi_list_lock, because
                  * ->oap_*_item are always manipulated when the page is owned.
                  */
-                if (!list_empty(&oap->oap_urgent_item) ||
-                    !list_empty(&oap->oap_rpc_item)) {
+                if (!cfs_list_empty(&oap->oap_urgent_item) ||
+                    !cfs_list_empty(&oap->oap_rpc_item)) {
                         result = -EBUSY;
                         break;
                 }
@@ -175,7 +177,7 @@ static int osc_io_submit(const struct lu_env *env,
                 result = cl_page_prep(env, io, page, crt);
                 if (result == 0) {
                         cl_page_list_move(qout, qin, page);
-                        if (list_empty(&oap->oap_pending_item)) {
+                        if (cfs_list_empty(&oap->oap_pending_item)) {
                                 osc_io_submit_page(env, cl2osc_io(env, ios),
                                                    opg, crt);
                         } else {
@@ -194,6 +196,7 @@ static int osc_io_submit(const struct lu_env *env,
                                  */
                                 LASSERT(result == 0);
                         }
+                        opg->ops_submit_time = cfs_time_current();
                 } else {
                         LASSERT(result < 0);
                         if (result != -EALREADY)
@@ -377,11 +380,14 @@ static int osc_punch_upcall(void *a, int rc)
         struct osc_punch_cbargs *args = a;
 
         args->opc_rc = rc;
-        complete(&args->opc_sync);
+        cfs_complete(&args->opc_sync);
         return 0;
 }
 
-#ifdef __KERNEL__
+/* Disable osc_trunc_check() because it is naturally race between read and
+ * truncate. See bug 20645 for details.
+ */
+#if 0 && defined(__KERNEL__)
 /**
  * Checks that there are no pages being written in the extent being truncated.
  */
@@ -408,7 +414,7 @@ static void osc_trunc_check(const struct lu_env *env, struct cl_io *io,
          * XXX this is quite expensive check.
          */
         cl_page_list_init(list);
-        cl_page_gang_lookup(env, clob, io, start + partial, CL_PAGE_EOF, list);
+        cl_page_gang_lookup(env, clob, io, start + partial, CL_PAGE_EOF, list, 0);
 
         cl_page_list_for_each(page, list)
                 CL_PAGE_DEBUG(D_ERROR, env, page, "exists %lu\n", start);
@@ -416,8 +422,9 @@ static void osc_trunc_check(const struct lu_env *env, struct cl_io *io,
         cl_page_list_disown(env, io, list);
         cl_page_list_fini(env, list);
 
-        spin_lock(&obj->oo_seatbelt);
-        list_for_each_entry(cp, &obj->oo_inflight[CRT_WRITE], ops_inflight) {
+        cfs_spin_lock(&obj->oo_seatbelt);
+        cfs_list_for_each_entry(cp, &obj->oo_inflight[CRT_WRITE],
+                                ops_inflight) {
                 page = cp->ops_cl.cpl_page;
                 if (page->cp_index >= start + partial) {
                         cfs_task_t *submitter;
@@ -431,7 +438,7 @@ static void osc_trunc_check(const struct lu_env *env, struct cl_io *io,
                         libcfs_debug_dumpstack(submitter);
                 }
         }
-        spin_unlock(&obj->oo_seatbelt);
+        cfs_spin_unlock(&obj->oo_seatbelt);
 }
 #else /* __KERNEL__ */
 # define osc_trunc_check(env, io, oio, size) do {;} while (0)
@@ -451,9 +458,6 @@ static int osc_io_trunc_start(const struct lu_env *env,
         loff_t                   size   = io->u.ci_truncate.tr_size;
         int                      result = 0;
 
-
-        memset(oa, 0, sizeof(*oa));
-
         osc_trunc_check(env, io, oio, size);
 
         if (oio->oi_lockless == 0) {
@@ -467,6 +471,7 @@ static int osc_io_trunc_start(const struct lu_env *env,
                 cl_object_attr_unlock(obj);
         }
 
+        memset(oa, 0, sizeof(*oa));
         if (result == 0) {
                 oa->o_id = loi->loi_id;
                 oa->o_gr = loi->loi_gr;
@@ -476,7 +481,7 @@ static int osc_io_trunc_start(const struct lu_env *env,
                 oa->o_valid = OBD_MD_FLID | OBD_MD_FLGROUP | OBD_MD_FLATIME |
                         OBD_MD_FLCTIME | OBD_MD_FLMTIME;
                 if (oio->oi_lockless) {
-                        oa->o_flags = OBD_FL_TRUNCLOCK;
+                        oa->o_flags = OBD_FL_SRVLOCK;
                         oa->o_valid |= OBD_MD_FLFLAGS;
                 }
                 oa->o_size = size;
@@ -484,7 +489,7 @@ static int osc_io_trunc_start(const struct lu_env *env,
                 oa->o_valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS;
 
                 capa = io->u.ci_truncate.tr_capa;
-                init_completion(&cbargs->opc_sync);
+                cfs_init_completion(&cbargs->opc_sync);
                 result = osc_punch_base(osc_export(cl2osc(obj)), oa, capa,
                                         osc_punch_upcall, cbargs, PTLRPCD_SET);
         }
@@ -500,7 +505,7 @@ static void osc_io_trunc_end(const struct lu_env *env,
         struct obdo             *oa     = &oio->oi_oa;
         int result;
 
-        wait_for_completion(&cbargs->opc_sync);
+        cfs_wait_for_completion(&cbargs->opc_sync);
 
         result = io->ci_result = cbargs->opc_rc;
         if (result == 0) {
@@ -627,15 +632,30 @@ static void osc_req_attr_set(const struct lu_env *env,
         }
         if (flags & OBD_MD_FLHANDLE) {
                 clerq = slice->crs_req;
-                LASSERT(!list_empty(&clerq->crq_pages));
+                LASSERT(!cfs_list_empty(&clerq->crq_pages));
                 apage = container_of(clerq->crq_pages.next,
                                      struct cl_page, cp_flight);
                 opg = osc_cl_page_osc(apage);
                 apage = opg->ops_cl.cpl_page; /* now apage is a sub-page */
                 lock = cl_lock_at_page(env, apage->cp_obj, apage, NULL, 1, 1);
-                LASSERT(lock != NULL);
+                if (lock == NULL) {
+                        struct cl_object_header *head;
+                        struct cl_lock          *scan;
+
+                        head = cl_object_header(apage->cp_obj);
+                        cfs_list_for_each_entry(scan, &head->coh_locks,
+                                                cll_linkage)
+                                CL_LOCK_DEBUG(D_ERROR, env, scan,
+                                              "no cover page!\n");
+                        CL_PAGE_DEBUG(D_ERROR, env, apage,
+                                      "dump uncover page!\n");
+                        libcfs_debug_dumpstack(NULL);
+                        LBUG();
+                }
+
                 olck = osc_lock_at(lock);
                 LASSERT(olck != NULL);
+                LASSERT(ergo(opg->ops_srvlock, olck->ols_lock == NULL));
                 /* check for lockless io. */
                 if (olck->ols_lock != NULL) {
                         oa->o_handle = olck->ols_lock->l_remote_handle;