X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=blobdiff_plain;f=lustre%2Fmdt%2Fmdt_io.c;h=b89d8fb4873634fde5ca209177e802103c6539f7;hp=e18d6f10bfcd5de3f381ee361cb050fe1aa4554f;hb=a71586d4ee8d6f039a413e2a0fd791db847a3c19;hpb=02c23a2e851fdebc3e2bde45a51fb043559504ab diff --git a/lustre/mdt/mdt_io.c b/lustre/mdt/mdt_io.c index e18d6f1..b89d8fb 100644 --- a/lustre/mdt/mdt_io.c +++ b/lustre/mdt/mdt_io.c @@ -35,12 +35,6 @@ /* functions below are stubs for now, they will be implemented with * grant support on MDT */ -static inline void mdt_io_counter_incr(struct obd_export *exp, int opcode, - char *jobid, long amount) -{ - return; -} - static inline void mdt_dom_read_lock(struct mdt_object *mo) { down_read(&mo->mot_dom_sem); @@ -61,23 +55,6 @@ static inline void mdt_dom_write_unlock(struct mdt_object *mo) up_write(&mo->mot_dom_sem); } -/** - * Lock prolongation for Data-on-MDT. - * This is similar to OFD code but for DOM ibits lock. - */ -static inline time64_t prolong_timeout(struct ptlrpc_request *req) -{ - struct ptlrpc_service_part *svcpt = req->rq_rqbd->rqbd_svcpt; - time64_t req_timeout; - - if (AT_OFF) - return obd_timeout / 2; - - req_timeout = req->rq_deadline - req->rq_arrival_time.tv_sec; - return max_t(time64_t, at_est2timeout(at_get(&svcpt->scp_at_estimate)), - req_timeout); -} - static void mdt_dom_resource_prolong(struct ldlm_prolong_args *arg) { struct ldlm_resource *res; @@ -381,6 +358,8 @@ static int mdt_preprw_read(const struct lu_env *env, struct obd_export *exp, struct niobuf_remote *rnb, int *nr_local, struct niobuf_local *lnb, char *jobid) { + struct tgt_session_info *tsi = tgt_ses_info(env); + struct ptlrpc_request *req = tgt_ses_req(tsi); struct dt_object *dob; int i, j, rc, tot_bytes = 0; int maxlnb = *nr_local; @@ -439,7 +418,7 @@ static int mdt_preprw_read(const struct lu_env *env, struct obd_export *exp, if (unlikely(rc)) GOTO(buf_put, rc); - mdt_io_counter_incr(exp, LPROC_MDT_IO_READ, jobid, tot_bytes); + mdt_counter_incr(req, LPROC_MDT_IO_READ, tot_bytes); RETURN(0); buf_put: dt_bufs_put(env, dob, lnb, *nr_local); @@ -454,6 +433,8 @@ static int mdt_preprw_write(const struct lu_env *env, struct obd_export *exp, struct niobuf_remote *rnb, int *nr_local, struct niobuf_local *lnb, char *jobid) { + struct tgt_session_info *tsi = tgt_ses_info(env); + struct ptlrpc_request *req = tgt_ses_req(tsi); struct dt_object *dob; int i, j, k, rc = 0, tot_bytes = 0; int maxlnb = *nr_local; @@ -513,7 +494,7 @@ static int mdt_preprw_write(const struct lu_env *env, struct obd_export *exp, if (likely(rc)) GOTO(err, rc); - mdt_io_counter_incr(exp, LPROC_MDT_IO_WRITE, jobid, tot_bytes); + mdt_counter_incr(req, LPROC_MDT_IO_WRITE, tot_bytes); RETURN(0); err: dt_bufs_put(env, dob, lnb, *nr_local); @@ -605,9 +586,9 @@ static int mdt_commitrw_read(const struct lu_env *env, struct mdt_device *mdt, static int mdt_commitrw_write(const struct lu_env *env, struct obd_export *exp, struct mdt_device *mdt, struct mdt_object *mo, - struct lu_attr *la, int objcount, int niocount, - struct niobuf_local *lnb, unsigned long granted, - int old_rc) + struct lu_attr *la, struct obdo *oa, int objcount, + int niocount, struct niobuf_local *lnb, + unsigned long granted, int old_rc) { struct dt_device *dt = mdt->mdt_bottom; struct dt_object *dob; @@ -673,7 +654,7 @@ retry: GOTO(out_stop, rc); dt_write_lock(env, dob, 0); - rc = dt_write_commit(env, dob, lnb, niocount, th); + rc = dt_write_commit(env, dob, lnb, niocount, th, oa->o_size); if (rc) GOTO(unlock, rc); @@ -764,7 +745,7 @@ int mdt_obd_commitrw(const struct lu_env *env, int cmd, struct obd_export *exp, la_from_obdo(la, oa, valid); - rc = mdt_commitrw_write(env, exp, mdt, mo, la, objcount, + rc = mdt_commitrw_write(env, exp, mdt, mo, la, oa, objcount, npages, lnb, oa->o_grant_used, old_rc); if (rc == 0) obdo_from_la(oa, la, VALID_FLAGS | LA_GID | LA_UID); @@ -866,6 +847,7 @@ stop: int mdt_punch_hdl(struct tgt_session_info *tsi) { const struct obdo *oa = &tsi->tsi_ost_body->oa; + struct ptlrpc_request *req = tgt_ses_req(tsi); struct ost_body *repbody; struct mdt_thread_info *info; struct lu_attr *la; @@ -876,6 +858,7 @@ int mdt_punch_hdl(struct tgt_session_info *tsi) struct dt_object *dob; __u64 flags = 0; struct lustre_handle lh = { 0, }; + ktime_t kstart = ktime_get(); __u64 start, end; int rc; bool srvlock; @@ -959,8 +942,8 @@ int mdt_punch_hdl(struct tgt_session_info *tsi) GOTO(out_put, rc); mdt_dom_obj_lvb_update(tsi->tsi_env, mo, false); - mdt_io_counter_incr(tsi->tsi_exp, LPROC_MDT_IO_PUNCH, - tsi->tsi_jobid, 1); + mdt_counter_incr(req, LPROC_MDT_IO_PUNCH, + ktime_us_delta(ktime_get(), kstart)); EXIT; out_put: lu_object_put(tsi->tsi_env, &mo->mot_obj); @@ -987,7 +970,7 @@ int mdt_do_glimpse(const struct lu_env *env, struct ldlm_namespace *ns, enum ldlm_mode mode; struct ldlm_lock *lock; struct ldlm_glimpse_work *gl_work; - struct list_head gl_list; + LIST_HEAD(gl_list); int rc; ENTRY; @@ -1028,7 +1011,6 @@ int mdt_do_glimpse(const struct lu_env *env, struct ldlm_namespace *ns, gl_work->gl_lock = LDLM_LOCK_GET(lock); /* The glimpse callback is sent to one single IO lock. As a result, * the gl_work list is just composed of one element */ - INIT_LIST_HEAD(&gl_list); list_add_tail(&gl_work->gl_list, &gl_list); /* There is actually no need for a glimpse descriptor when glimpsing * IO locks */ @@ -1426,6 +1408,7 @@ int mdt_dom_read_on_open(struct mdt_thread_info *mti, struct mdt_device *mdt, int rc; loff_t offset; unsigned int len, copied = 0; + __u64 real_dom_size; int lnbs, nr_local, i; bool dom_lock = false; @@ -1439,6 +1422,11 @@ int mdt_dom_read_on_open(struct mdt_thread_info *mti, struct mdt_device *mdt, } mbo = req_capsule_server_get(pill, &RMF_MDT_BODY); + if (!(mbo->mbo_valid & OBD_MD_DOM_SIZE)) + RETURN(0); + + if (!mbo->mbo_dom_size) + RETURN(0); if (lustre_handle_is_used(lh)) { struct ldlm_lock *lock; @@ -1454,14 +1442,18 @@ int mdt_dom_read_on_open(struct mdt_thread_info *mti, struct mdt_device *mdt, if (!dom_lock || !mdt->mdt_opts.mo_dom_read_open) RETURN(0); - if (!(mbo->mbo_valid & OBD_MD_DOM_SIZE)) - RETURN(0); - - if (mbo->mbo_dom_size == 0) - RETURN(0); + /* if DoM object holds encrypted content, we need to make sure we + * send whole encryption units, or client will read corrupted content + */ + if (mbo->mbo_valid & LA_FLAGS && mbo->mbo_flags & LUSTRE_ENCRYPT_FL && + mbo->mbo_dom_size & ~LUSTRE_ENCRYPTION_MASK) + real_dom_size = (mbo->mbo_dom_size & LUSTRE_ENCRYPTION_MASK) + + LUSTRE_ENCRYPTION_UNIT_SIZE; + else + real_dom_size = mbo->mbo_dom_size; CDEBUG(D_INFO, "File size %llu, reply sizes %d/%d\n", - mbo->mbo_dom_size, req->rq_reqmsg->lm_repsize, req->rq_replen); + real_dom_size, req->rq_reqmsg->lm_repsize, req->rq_replen); len = req->rq_reqmsg->lm_repsize - req->rq_replen; /* NB: at this moment we have the following sizes: @@ -1480,11 +1472,12 @@ int mdt_dom_read_on_open(struct mdt_thread_info *mti, struct mdt_device *mdt, * 1) try to fit into the buffer we have * 2) return just file tail otherwise. */ - if (mbo->mbo_dom_size <= len) { + if (real_dom_size <= len) { /* can fit whole data */ - len = mbo->mbo_dom_size; + len = real_dom_size; offset = 0; - } else { + } else if (real_dom_size < + mdt_lmm_dom_stripesize(mti->mti_attr.ma_lmm)) { int tail, pgbits; /* File tail offset must be aligned with larger page size @@ -1502,16 +1495,22 @@ int mdt_dom_read_on_open(struct mdt_thread_info *mti, struct mdt_device *mdt, } pgbits = max_t(int, PAGE_SHIFT, req->rq_export->exp_target_data.ted_pagebits); - tail = mbo->mbo_dom_size % (1 << pgbits); + tail = real_dom_size % (1 << pgbits); /* no partial tail or tail can't fit in reply */ if (tail == 0 || len < tail) RETURN(0); len = tail; - offset = mbo->mbo_dom_size - len; + offset = real_dom_size - len; + } else { + /* DOM stripe is fully written, so don't expect its tail + * will be used by append. + */ + RETURN(0); } - LASSERT((offset % PAGE_SIZE) == 0); + + LASSERT((offset & ~PAGE_MASK) == 0); rc = req_capsule_server_grow(pill, &RMF_NIOBUF_INLINE, sizeof(*rnb) + len); if (rc != 0) { @@ -1544,7 +1543,7 @@ int mdt_dom_read_on_open(struct mdt_thread_info *mti, struct mdt_device *mdt, /* parse remote buffers to local buffers and prepare the latter */ lnbs = (len >> PAGE_SHIFT) + 1; - OBD_ALLOC(lnb, sizeof(*lnb) * lnbs); + OBD_ALLOC_PTR_ARRAY(lnb, lnbs); if (lnb == NULL) GOTO(unlock, rc = -ENOMEM); @@ -1585,7 +1584,7 @@ int mdt_dom_read_on_open(struct mdt_thread_info *mti, struct mdt_device *mdt, buf_put: dt_bufs_put(env, mo, lnb, nr_local); free: - OBD_FREE(lnb, sizeof(*lnb) * lnbs); + OBD_FREE_PTR_ARRAY(lnb, lnbs); unlock: dt_read_unlock(env, mo); lu_object_put(env, &mo->do_lu);