/* functions below are stubs for now, they will be implemented with
* grant support on MDT */
-static inline void mdt_io_counter_incr(struct obd_export *exp, int opcode,
- char *jobid, long amount)
-{
- return;
-}
-
static inline void mdt_dom_read_lock(struct mdt_object *mo)
{
down_read(&mo->mot_dom_sem);
up_write(&mo->mot_dom_sem);
}
-/**
- * Lock prolongation for Data-on-MDT.
- * This is similar to OFD code but for DOM ibits lock.
- */
-static inline time64_t prolong_timeout(struct ptlrpc_request *req)
-{
- struct ptlrpc_service_part *svcpt = req->rq_rqbd->rqbd_svcpt;
- time64_t req_timeout;
-
- if (AT_OFF)
- return obd_timeout / 2;
-
- req_timeout = req->rq_deadline - req->rq_arrival_time.tv_sec;
- return max_t(time64_t, at_est2timeout(at_get(&svcpt->scp_at_estimate)),
- req_timeout);
-}
-
static void mdt_dom_resource_prolong(struct ldlm_prolong_args *arg)
{
struct ldlm_resource *res;
struct niobuf_remote *rnb, int *nr_local,
struct niobuf_local *lnb, char *jobid)
{
+ struct tgt_session_info *tsi = tgt_ses_info(env);
+ struct ptlrpc_request *req = tgt_ses_req(tsi);
struct dt_object *dob;
int i, j, rc, tot_bytes = 0;
int maxlnb = *nr_local;
if (unlikely(rc))
GOTO(buf_put, rc);
- mdt_io_counter_incr(exp, LPROC_MDT_IO_READ, jobid, tot_bytes);
+ mdt_counter_incr(req, LPROC_MDT_IO_READ, tot_bytes);
RETURN(0);
buf_put:
dt_bufs_put(env, dob, lnb, *nr_local);
struct niobuf_remote *rnb, int *nr_local,
struct niobuf_local *lnb, char *jobid)
{
+ struct tgt_session_info *tsi = tgt_ses_info(env);
+ struct ptlrpc_request *req = tgt_ses_req(tsi);
struct dt_object *dob;
int i, j, k, rc = 0, tot_bytes = 0;
int maxlnb = *nr_local;
if (likely(rc))
GOTO(err, rc);
- mdt_io_counter_incr(exp, LPROC_MDT_IO_WRITE, jobid, tot_bytes);
+ mdt_counter_incr(req, LPROC_MDT_IO_WRITE, tot_bytes);
RETURN(0);
err:
dt_bufs_put(env, dob, lnb, *nr_local);
static int mdt_commitrw_write(const struct lu_env *env, struct obd_export *exp,
struct mdt_device *mdt, struct mdt_object *mo,
- struct lu_attr *la, int objcount, int niocount,
- struct niobuf_local *lnb, unsigned long granted,
- int old_rc)
+ struct lu_attr *la, struct obdo *oa, int objcount,
+ int niocount, struct niobuf_local *lnb,
+ unsigned long granted, int old_rc)
{
struct dt_device *dt = mdt->mdt_bottom;
struct dt_object *dob;
GOTO(out_stop, rc);
dt_write_lock(env, dob, 0);
- rc = dt_write_commit(env, dob, lnb, niocount, th);
+ rc = dt_write_commit(env, dob, lnb, niocount, th, oa->o_size);
if (rc)
GOTO(unlock, rc);
la_from_obdo(la, oa, valid);
- rc = mdt_commitrw_write(env, exp, mdt, mo, la, objcount,
+ rc = mdt_commitrw_write(env, exp, mdt, mo, la, oa, objcount,
npages, lnb, oa->o_grant_used, old_rc);
if (rc == 0)
obdo_from_la(oa, la, VALID_FLAGS | LA_GID | LA_UID);
int mdt_punch_hdl(struct tgt_session_info *tsi)
{
const struct obdo *oa = &tsi->tsi_ost_body->oa;
+ struct ptlrpc_request *req = tgt_ses_req(tsi);
struct ost_body *repbody;
struct mdt_thread_info *info;
struct lu_attr *la;
struct dt_object *dob;
__u64 flags = 0;
struct lustre_handle lh = { 0, };
+ ktime_t kstart = ktime_get();
__u64 start, end;
int rc;
bool srvlock;
GOTO(out_put, rc);
mdt_dom_obj_lvb_update(tsi->tsi_env, mo, false);
- mdt_io_counter_incr(tsi->tsi_exp, LPROC_MDT_IO_PUNCH,
- tsi->tsi_jobid, 1);
+ mdt_counter_incr(req, LPROC_MDT_IO_PUNCH,
+ ktime_us_delta(ktime_get(), kstart));
EXIT;
out_put:
lu_object_put(tsi->tsi_env, &mo->mot_obj);
int rc;
loff_t offset;
unsigned int len, copied = 0;
+ __u64 real_dom_size;
int lnbs, nr_local, i;
bool dom_lock = false;
}
mbo = req_capsule_server_get(pill, &RMF_MDT_BODY);
+ if (!(mbo->mbo_valid & OBD_MD_DOM_SIZE))
+ RETURN(0);
+
+ if (!mbo->mbo_dom_size)
+ RETURN(0);
if (lustre_handle_is_used(lh)) {
struct ldlm_lock *lock;
if (!dom_lock || !mdt->mdt_opts.mo_dom_read_open)
RETURN(0);
- if (!(mbo->mbo_valid & OBD_MD_DOM_SIZE))
- RETURN(0);
-
- if (mbo->mbo_dom_size == 0)
- RETURN(0);
+ /* if DoM object holds encrypted content, we need to make sure we
+ * send whole encryption units, or client will read corrupted content
+ */
+ if (mbo->mbo_valid & LA_FLAGS && mbo->mbo_flags & LUSTRE_ENCRYPT_FL &&
+ mbo->mbo_dom_size & ~LUSTRE_ENCRYPTION_MASK)
+ real_dom_size = (mbo->mbo_dom_size & LUSTRE_ENCRYPTION_MASK) +
+ LUSTRE_ENCRYPTION_UNIT_SIZE;
+ else
+ real_dom_size = mbo->mbo_dom_size;
CDEBUG(D_INFO, "File size %llu, reply sizes %d/%d\n",
- mbo->mbo_dom_size, req->rq_reqmsg->lm_repsize, req->rq_replen);
+ real_dom_size, req->rq_reqmsg->lm_repsize, req->rq_replen);
len = req->rq_reqmsg->lm_repsize - req->rq_replen;
/* NB: at this moment we have the following sizes:
* 1) try to fit into the buffer we have
* 2) return just file tail otherwise.
*/
- if (mbo->mbo_dom_size <= len) {
+ if (real_dom_size <= len) {
/* can fit whole data */
- len = mbo->mbo_dom_size;
+ len = real_dom_size;
offset = 0;
- } else {
+ } else if (real_dom_size <
+ mdt_lmm_dom_stripesize(mti->mti_attr.ma_lmm)) {
int tail, pgbits;
/* File tail offset must be aligned with larger page size
}
pgbits = max_t(int, PAGE_SHIFT,
req->rq_export->exp_target_data.ted_pagebits);
- tail = mbo->mbo_dom_size % (1 << pgbits);
+ tail = real_dom_size % (1 << pgbits);
/* no partial tail or tail can't fit in reply */
if (tail == 0 || len < tail)
RETURN(0);
len = tail;
- offset = mbo->mbo_dom_size - len;
+ offset = real_dom_size - len;
+ } else {
+ /* DOM stripe is fully written, so don't expect its tail
+ * will be used by append.
+ */
+ RETURN(0);
}
+
LASSERT((offset & ~PAGE_MASK) == 0);
rc = req_capsule_server_grow(pill, &RMF_NIOBUF_INLINE,
sizeof(*rnb) + len);
/* parse remote buffers to local buffers and prepare the latter */
lnbs = (len >> PAGE_SHIFT) + 1;
- OBD_ALLOC(lnb, sizeof(*lnb) * lnbs);
+ OBD_ALLOC_PTR_ARRAY(lnb, lnbs);
if (lnb == NULL)
GOTO(unlock, rc = -ENOMEM);
buf_put:
dt_bufs_put(env, mo, lnb, nr_local);
free:
- OBD_FREE(lnb, sizeof(*lnb) * lnbs);
+ OBD_FREE_PTR_ARRAY(lnb, lnbs);
unlock:
dt_read_unlock(env, mo);
lu_object_put(env, &mo->do_lu);