Whamcloud - gitweb
LU-13415 dom: use DoM optimization for DOM+OST files
[fs/lustre-release.git] / lustre / mdt / mdt_io.c
index 8961fa2..4dc67be 100644 (file)
@@ -61,23 +61,6 @@ static inline void mdt_dom_write_unlock(struct mdt_object *mo)
        up_write(&mo->mot_dom_sem);
 }
 
-/**
- * Lock prolongation for Data-on-MDT.
- * This is similar to OFD code but for DOM ibits lock.
- */
-static inline time64_t prolong_timeout(struct ptlrpc_request *req)
-{
-       struct ptlrpc_service_part *svcpt = req->rq_rqbd->rqbd_svcpt;
-       time64_t req_timeout;
-
-       if (AT_OFF)
-               return obd_timeout / 2;
-
-       req_timeout = req->rq_deadline - req->rq_arrival_time.tv_sec;
-       return max_t(time64_t, at_est2timeout(at_get(&svcpt->scp_at_estimate)),
-                    req_timeout);
-}
-
 static void mdt_dom_resource_prolong(struct ldlm_prolong_args *arg)
 {
        struct ldlm_resource *res;
@@ -211,7 +194,7 @@ static int mdt_rw_hpreq_check(struct ptlrpc_request *req)
 
        if (pa.lpa_blocks_cnt > 0) {
                CDEBUG(D_DLMTRACE,
-                      "%s: refreshed %u locks timeout for req %p",
+                      "%s: refreshed %u locks timeout for req %p\n",
                       tgt_name(tsi->tsi_tgt), pa.lpa_blocks_cnt, req);
                RETURN(1);
        }
@@ -383,6 +366,7 @@ static int mdt_preprw_read(const struct lu_env *env, struct obd_export *exp,
 {
        struct dt_object *dob;
        int i, j, rc, tot_bytes = 0;
+       int maxlnb = *nr_local;
        int level;
 
        ENTRY;
@@ -420,11 +404,12 @@ static int mdt_preprw_read(const struct lu_env *env, struct obd_export *exp,
        dob = mdt_obj2dt(mo);
        /* parse remote buffers to local buffers and prepare the latter */
        for (i = 0, j = 0; i < niocount; i++) {
-               rc = dt_bufs_get(env, dob, rnb + i, lnb + j, 0);
+               rc = dt_bufs_get(env, dob, rnb + i, lnb + j, maxlnb, 0);
                if (unlikely(rc < 0))
                        GOTO(buf_put, rc);
                /* correct index for local buffers to continue with */
                j += rc;
+               maxlnb -= rc;
                *nr_local += rc;
                tot_bytes += rnb[i].rnb_len;
        }
@@ -454,6 +439,7 @@ static int mdt_preprw_write(const struct lu_env *env, struct obd_export *exp,
 {
        struct dt_object *dob;
        int i, j, k, rc = 0, tot_bytes = 0;
+       int maxlnb = *nr_local;
 
        ENTRY;
 
@@ -491,7 +477,7 @@ static int mdt_preprw_write(const struct lu_env *env, struct obd_export *exp,
        dob = mdt_obj2dt(mo);
        /* parse remote buffers to local buffers and prepare the latter */
        for (i = 0, j = 0; i < obj->ioo_bufcnt; i++) {
-               rc = dt_bufs_get(env, dob, rnb + i, lnb + j, 1);
+               rc = dt_bufs_get(env, dob, rnb + i, lnb + j, maxlnb, 1);
                if (unlikely(rc < 0))
                        GOTO(err, rc);
                /* correct index for local buffers to continue with */
@@ -501,6 +487,7 @@ static int mdt_preprw_write(const struct lu_env *env, struct obd_export *exp,
                                lnb[j + k].lnb_rc = -ENOSPC;
                }
                j += rc;
+               maxlnb -= rc;
                *nr_local += rc;
                tot_bytes += rnb[i].rnb_len;
        }
@@ -983,7 +970,7 @@ int mdt_do_glimpse(const struct lu_env *env, struct ldlm_namespace *ns,
        enum ldlm_mode mode;
        struct ldlm_lock *lock;
        struct ldlm_glimpse_work *gl_work;
-       struct list_head gl_list;
+       LIST_HEAD(gl_list);
        int rc;
 
        ENTRY;
@@ -1024,7 +1011,6 @@ int mdt_do_glimpse(const struct lu_env *env, struct ldlm_namespace *ns,
        gl_work->gl_lock = LDLM_LOCK_GET(lock);
        /* The glimpse callback is sent to one single IO lock. As a result,
         * the gl_work list is just composed of one element */
-       INIT_LIST_HEAD(&gl_list);
        list_add_tail(&gl_work->gl_list, &gl_list);
        /* There is actually no need for a glimpse descriptor when glimpsing
         * IO locks */
@@ -1435,6 +1421,11 @@ int mdt_dom_read_on_open(struct mdt_thread_info *mti, struct mdt_device *mdt,
        }
 
        mbo = req_capsule_server_get(pill, &RMF_MDT_BODY);
+       if (!(mbo->mbo_valid & OBD_MD_DOM_SIZE))
+               RETURN(0);
+
+       if (!mbo->mbo_dom_size)
+               RETURN(0);
 
        if (lustre_handle_is_used(lh)) {
                struct ldlm_lock *lock;
@@ -1450,12 +1441,6 @@ int mdt_dom_read_on_open(struct mdt_thread_info *mti, struct mdt_device *mdt,
        if (!dom_lock || !mdt->mdt_opts.mo_dom_read_open)
                RETURN(0);
 
-       if (!(mbo->mbo_valid & OBD_MD_DOM_SIZE))
-               RETURN(0);
-
-       if (mbo->mbo_dom_size == 0)
-               RETURN(0);
-
        CDEBUG(D_INFO, "File size %llu, reply sizes %d/%d\n",
               mbo->mbo_dom_size, req->rq_reqmsg->lm_repsize, req->rq_replen);
        len = req->rq_reqmsg->lm_repsize - req->rq_replen;
@@ -1480,7 +1465,8 @@ int mdt_dom_read_on_open(struct mdt_thread_info *mti, struct mdt_device *mdt,
                /* can fit whole data */
                len = mbo->mbo_dom_size;
                offset = 0;
-       } else {
+       } else if (mbo->mbo_dom_size <
+                  mdt_lmm_dom_stripesize(mti->mti_attr.ma_lmm)) {
                int tail, pgbits;
 
                /* File tail offset must be aligned with larger page size
@@ -1506,8 +1492,14 @@ int mdt_dom_read_on_open(struct mdt_thread_info *mti, struct mdt_device *mdt,
 
                len = tail;
                offset = mbo->mbo_dom_size - len;
+       } else {
+               /* DOM stripe is fully written, so don't expect its tail
+                * will be used by append.
+                */
+               RETURN(0);
        }
-       LASSERT((offset % PAGE_SIZE) == 0);
+
+       LASSERT((offset & ~PAGE_MASK) == 0);
        rc = req_capsule_server_grow(pill, &RMF_NIOBUF_INLINE,
                                     sizeof(*rnb) + len);
        if (rc != 0) {
@@ -1544,7 +1536,7 @@ int mdt_dom_read_on_open(struct mdt_thread_info *mti, struct mdt_device *mdt,
        if (lnb == NULL)
                GOTO(unlock, rc = -ENOMEM);
 
-       rc = dt_bufs_get(env, mo, rnb, lnb, 0);
+       rc = dt_bufs_get(env, mo, rnb, lnb, lnbs, 0);
        if (unlikely(rc < 0))
                GOTO(free, rc);
        LASSERT(rc <= lnbs);