Whamcloud - gitweb
LU-12275 sec: encryption support for DoM files
[fs/lustre-release.git] / lustre / mdt / mdt_io.c
index e18d6f1..b89d8fb 100644 (file)
 
 /* functions below are stubs for now, they will be implemented with
  * grant support on MDT */
-static inline void mdt_io_counter_incr(struct obd_export *exp, int opcode,
-                                      char *jobid, long amount)
-{
-       return;
-}
-
 static inline void mdt_dom_read_lock(struct mdt_object *mo)
 {
        down_read(&mo->mot_dom_sem);
@@ -61,23 +55,6 @@ static inline void mdt_dom_write_unlock(struct mdt_object *mo)
        up_write(&mo->mot_dom_sem);
 }
 
-/**
- * Lock prolongation for Data-on-MDT.
- * This is similar to OFD code but for DOM ibits lock.
- */
-static inline time64_t prolong_timeout(struct ptlrpc_request *req)
-{
-       struct ptlrpc_service_part *svcpt = req->rq_rqbd->rqbd_svcpt;
-       time64_t req_timeout;
-
-       if (AT_OFF)
-               return obd_timeout / 2;
-
-       req_timeout = req->rq_deadline - req->rq_arrival_time.tv_sec;
-       return max_t(time64_t, at_est2timeout(at_get(&svcpt->scp_at_estimate)),
-                    req_timeout);
-}
-
 static void mdt_dom_resource_prolong(struct ldlm_prolong_args *arg)
 {
        struct ldlm_resource *res;
@@ -381,6 +358,8 @@ static int mdt_preprw_read(const struct lu_env *env, struct obd_export *exp,
                           struct niobuf_remote *rnb, int *nr_local,
                           struct niobuf_local *lnb, char *jobid)
 {
+       struct tgt_session_info *tsi = tgt_ses_info(env);
+       struct ptlrpc_request *req = tgt_ses_req(tsi);
        struct dt_object *dob;
        int i, j, rc, tot_bytes = 0;
        int maxlnb = *nr_local;
@@ -439,7 +418,7 @@ static int mdt_preprw_read(const struct lu_env *env, struct obd_export *exp,
        if (unlikely(rc))
                GOTO(buf_put, rc);
 
-       mdt_io_counter_incr(exp, LPROC_MDT_IO_READ, jobid, tot_bytes);
+       mdt_counter_incr(req, LPROC_MDT_IO_READ, tot_bytes);
        RETURN(0);
 buf_put:
        dt_bufs_put(env, dob, lnb, *nr_local);
@@ -454,6 +433,8 @@ static int mdt_preprw_write(const struct lu_env *env, struct obd_export *exp,
                            struct niobuf_remote *rnb, int *nr_local,
                            struct niobuf_local *lnb, char *jobid)
 {
+       struct tgt_session_info *tsi = tgt_ses_info(env);
+       struct ptlrpc_request *req = tgt_ses_req(tsi);
        struct dt_object *dob;
        int i, j, k, rc = 0, tot_bytes = 0;
        int maxlnb = *nr_local;
@@ -513,7 +494,7 @@ static int mdt_preprw_write(const struct lu_env *env, struct obd_export *exp,
        if (likely(rc))
                GOTO(err, rc);
 
-       mdt_io_counter_incr(exp, LPROC_MDT_IO_WRITE, jobid, tot_bytes);
+       mdt_counter_incr(req, LPROC_MDT_IO_WRITE, tot_bytes);
        RETURN(0);
 err:
        dt_bufs_put(env, dob, lnb, *nr_local);
@@ -605,9 +586,9 @@ static int mdt_commitrw_read(const struct lu_env *env, struct mdt_device *mdt,
 
 static int mdt_commitrw_write(const struct lu_env *env, struct obd_export *exp,
                              struct mdt_device *mdt, struct mdt_object *mo,
-                             struct lu_attr *la, int objcount, int niocount,
-                             struct niobuf_local *lnb, unsigned long granted,
-                             int old_rc)
+                             struct lu_attr *la, struct obdo *oa, int objcount,
+                             int niocount, struct niobuf_local *lnb,
+                             unsigned long granted, int old_rc)
 {
        struct dt_device *dt = mdt->mdt_bottom;
        struct dt_object *dob;
@@ -673,7 +654,7 @@ retry:
                GOTO(out_stop, rc);
 
        dt_write_lock(env, dob, 0);
-       rc = dt_write_commit(env, dob, lnb, niocount, th);
+       rc = dt_write_commit(env, dob, lnb, niocount, th, oa->o_size);
        if (rc)
                GOTO(unlock, rc);
 
@@ -764,7 +745,7 @@ int mdt_obd_commitrw(const struct lu_env *env, int cmd, struct obd_export *exp,
 
                la_from_obdo(la, oa, valid);
 
-               rc = mdt_commitrw_write(env, exp, mdt, mo, la, objcount,
+               rc = mdt_commitrw_write(env, exp, mdt, mo, la, oa, objcount,
                                        npages, lnb, oa->o_grant_used, old_rc);
                if (rc == 0)
                        obdo_from_la(oa, la, VALID_FLAGS | LA_GID | LA_UID);
@@ -866,6 +847,7 @@ stop:
 int mdt_punch_hdl(struct tgt_session_info *tsi)
 {
        const struct obdo *oa = &tsi->tsi_ost_body->oa;
+       struct ptlrpc_request *req = tgt_ses_req(tsi);
        struct ost_body *repbody;
        struct mdt_thread_info *info;
        struct lu_attr *la;
@@ -876,6 +858,7 @@ int mdt_punch_hdl(struct tgt_session_info *tsi)
        struct dt_object *dob;
        __u64 flags = 0;
        struct lustre_handle lh = { 0, };
+       ktime_t kstart = ktime_get();
        __u64 start, end;
        int rc;
        bool srvlock;
@@ -959,8 +942,8 @@ int mdt_punch_hdl(struct tgt_session_info *tsi)
                GOTO(out_put, rc);
 
        mdt_dom_obj_lvb_update(tsi->tsi_env, mo, false);
-       mdt_io_counter_incr(tsi->tsi_exp, LPROC_MDT_IO_PUNCH,
-                           tsi->tsi_jobid, 1);
+       mdt_counter_incr(req, LPROC_MDT_IO_PUNCH,
+                        ktime_us_delta(ktime_get(), kstart));
        EXIT;
 out_put:
        lu_object_put(tsi->tsi_env, &mo->mot_obj);
@@ -987,7 +970,7 @@ int mdt_do_glimpse(const struct lu_env *env, struct ldlm_namespace *ns,
        enum ldlm_mode mode;
        struct ldlm_lock *lock;
        struct ldlm_glimpse_work *gl_work;
-       struct list_head gl_list;
+       LIST_HEAD(gl_list);
        int rc;
 
        ENTRY;
@@ -1028,7 +1011,6 @@ int mdt_do_glimpse(const struct lu_env *env, struct ldlm_namespace *ns,
        gl_work->gl_lock = LDLM_LOCK_GET(lock);
        /* The glimpse callback is sent to one single IO lock. As a result,
         * the gl_work list is just composed of one element */
-       INIT_LIST_HEAD(&gl_list);
        list_add_tail(&gl_work->gl_list, &gl_list);
        /* There is actually no need for a glimpse descriptor when glimpsing
         * IO locks */
@@ -1426,6 +1408,7 @@ int mdt_dom_read_on_open(struct mdt_thread_info *mti, struct mdt_device *mdt,
        int rc;
        loff_t offset;
        unsigned int len, copied = 0;
+       __u64 real_dom_size;
        int lnbs, nr_local, i;
        bool dom_lock = false;
 
@@ -1439,6 +1422,11 @@ int mdt_dom_read_on_open(struct mdt_thread_info *mti, struct mdt_device *mdt,
        }
 
        mbo = req_capsule_server_get(pill, &RMF_MDT_BODY);
+       if (!(mbo->mbo_valid & OBD_MD_DOM_SIZE))
+               RETURN(0);
+
+       if (!mbo->mbo_dom_size)
+               RETURN(0);
 
        if (lustre_handle_is_used(lh)) {
                struct ldlm_lock *lock;
@@ -1454,14 +1442,18 @@ int mdt_dom_read_on_open(struct mdt_thread_info *mti, struct mdt_device *mdt,
        if (!dom_lock || !mdt->mdt_opts.mo_dom_read_open)
                RETURN(0);
 
-       if (!(mbo->mbo_valid & OBD_MD_DOM_SIZE))
-               RETURN(0);
-
-       if (mbo->mbo_dom_size == 0)
-               RETURN(0);
+       /* if DoM object holds encrypted content, we need to make sure we
+        * send whole encryption units, or client will read corrupted content
+        */
+       if (mbo->mbo_valid & LA_FLAGS && mbo->mbo_flags & LUSTRE_ENCRYPT_FL &&
+           mbo->mbo_dom_size & ~LUSTRE_ENCRYPTION_MASK)
+               real_dom_size = (mbo->mbo_dom_size & LUSTRE_ENCRYPTION_MASK) +
+                               LUSTRE_ENCRYPTION_UNIT_SIZE;
+       else
+               real_dom_size = mbo->mbo_dom_size;
 
        CDEBUG(D_INFO, "File size %llu, reply sizes %d/%d\n",
-              mbo->mbo_dom_size, req->rq_reqmsg->lm_repsize, req->rq_replen);
+              real_dom_size, req->rq_reqmsg->lm_repsize, req->rq_replen);
        len = req->rq_reqmsg->lm_repsize - req->rq_replen;
 
        /* NB: at this moment we have the following sizes:
@@ -1480,11 +1472,12 @@ int mdt_dom_read_on_open(struct mdt_thread_info *mti, struct mdt_device *mdt,
         * 1) try to fit into the buffer we have
         * 2) return just file tail otherwise.
         */
-       if (mbo->mbo_dom_size <= len) {
+       if (real_dom_size <= len) {
                /* can fit whole data */
-               len = mbo->mbo_dom_size;
+               len = real_dom_size;
                offset = 0;
-       } else {
+       } else if (real_dom_size <
+                  mdt_lmm_dom_stripesize(mti->mti_attr.ma_lmm)) {
                int tail, pgbits;
 
                /* File tail offset must be aligned with larger page size
@@ -1502,16 +1495,22 @@ int mdt_dom_read_on_open(struct mdt_thread_info *mti, struct mdt_device *mdt,
                }
                pgbits = max_t(int, PAGE_SHIFT,
                               req->rq_export->exp_target_data.ted_pagebits);
-               tail = mbo->mbo_dom_size % (1 << pgbits);
+               tail = real_dom_size % (1 << pgbits);
 
                /* no partial tail or tail can't fit in reply */
                if (tail == 0 || len < tail)
                        RETURN(0);
 
                len = tail;
-               offset = mbo->mbo_dom_size - len;
+               offset = real_dom_size - len;
+       } else {
+               /* DOM stripe is fully written, so don't expect its tail
+                * will be used by append.
+                */
+               RETURN(0);
        }
-       LASSERT((offset % PAGE_SIZE) == 0);
+
+       LASSERT((offset & ~PAGE_MASK) == 0);
        rc = req_capsule_server_grow(pill, &RMF_NIOBUF_INLINE,
                                     sizeof(*rnb) + len);
        if (rc != 0) {
@@ -1544,7 +1543,7 @@ int mdt_dom_read_on_open(struct mdt_thread_info *mti, struct mdt_device *mdt,
 
        /* parse remote buffers to local buffers and prepare the latter */
        lnbs = (len >> PAGE_SHIFT) + 1;
-       OBD_ALLOC(lnb, sizeof(*lnb) * lnbs);
+       OBD_ALLOC_PTR_ARRAY(lnb, lnbs);
        if (lnb == NULL)
                GOTO(unlock, rc = -ENOMEM);
 
@@ -1585,7 +1584,7 @@ int mdt_dom_read_on_open(struct mdt_thread_info *mti, struct mdt_device *mdt,
 buf_put:
        dt_bufs_put(env, mo, lnb, nr_local);
 free:
-       OBD_FREE(lnb, sizeof(*lnb) * lnbs);
+       OBD_FREE_PTR_ARRAY(lnb, lnbs);
 unlock:
        dt_read_unlock(env, mo);
        lu_object_put(env, &mo->do_lu);