Whamcloud - gitweb
LU-11595 mdt: fix read-on-open for big PAGE_SIZE
[fs/lustre-release.git] / lustre / mdt / mdt_io.c
index 9569cb2..3124c8c 100644 (file)
@@ -78,6 +78,36 @@ static inline time64_t prolong_timeout(struct ptlrpc_request *req)
                     req_timeout);
 }
 
+static void mdt_dom_resource_prolong(struct ldlm_prolong_args *arg)
+{
+       struct ldlm_resource *res;
+       struct ldlm_lock *lock;
+       /* Prolong the first granted DOM lock found on resource lpa_resid. */
+       ENTRY;
+       /* Look the resource up in the namespace of the export's obd device. */
+       res = ldlm_resource_get(arg->lpa_export->exp_obd->obd_namespace, NULL,
+                               &arg->lpa_resid, LDLM_EXTENT, 0);
+       if (IS_ERR(res)) { /* lookup failed: log and bail out */
+               CDEBUG(D_DLMTRACE,
+                      "Failed to get resource for resid %llu/%llu\n",
+                      arg->lpa_resid.name[0], arg->lpa_resid.name[1]);
+               RETURN_EXIT;
+       }
+       /* Walk granted locks under the resource lock; stop at first match. */
+       lock_res(res);
+       list_for_each_entry(lock, &res->lr_granted, l_res_link) {
+               if (ldlm_has_dom(lock)) {
+                       LDLM_DEBUG(lock, "DOM lock to prolong ");
+                       ldlm_lock_prolong_one(lock, arg);
+                       break; /* only the first match is prolonged */
+               }
+       }
+       unlock_res(res);
+       ldlm_resource_putref(res); /* pairs with ldlm_resource_get() */
+
+       EXIT;
+}
+
 static void mdt_prolong_dom_lock(struct tgt_session_info *tsi,
                                 struct ldlm_prolong_args *data)
 {
@@ -102,9 +132,11 @@ static void mdt_prolong_dom_lock(struct tgt_session_info *tsi,
                        ldlm_lock_prolong_one(lock, data);
                        lock->l_last_used = ktime_get();
                        LDLM_LOCK_PUT(lock);
-                       RETURN_EXIT;
+                       if (data->lpa_locks_cnt > 0)
+                               RETURN_EXIT;
                }
        }
+       mdt_dom_resource_prolong(data);
        EXIT;
 }
 
@@ -700,8 +732,15 @@ int mdt_obd_commitrw(const struct lu_env *env, int cmd, struct obd_export *exp,
                                        oa->o_flags = OBD_FL_NO_GRPQUOTA;
                        }
 
+                       if (lnb[0].lnb_flags & OBD_BRW_OVER_PRJQUOTA) {
+                               if (oa->o_valid & OBD_MD_FLFLAGS)
+                                       oa->o_flags |= OBD_FL_NO_PRJQUOTA;
+                               else
+                                       oa->o_flags = OBD_FL_NO_PRJQUOTA;
+                       }
+
                        oa->o_valid |= OBD_MD_FLFLAGS | OBD_MD_FLUSRQUOTA |
-                                      OBD_MD_FLGRPQUOTA;
+                                      OBD_MD_FLGRPQUOTA | OBD_MD_FLPRJQUOTA;
                }
        } else if (cmd == OBD_BRW_READ) {
                /* If oa != NULL then mdt_preprw_read updated the inode
@@ -994,7 +1033,7 @@ int mdt_dom_object_size(const struct lu_env *env, struct mdt_device *mdt,
 
        /* Update lvbo data if DoM lock returned or if LVB is not yet valid. */
        if (dom_lock || !mdt_dom_lvb_is_valid(res))
-               mdt_dom_lvbo_update(res, NULL, NULL, false);
+               mdt_dom_lvbo_update(env, res, NULL, NULL, false);
 
        mdt_lvb2body(res, mb);
        ldlm_resource_putref(res);
@@ -1092,7 +1131,7 @@ int mdt_glimpse_enqueue(struct mdt_thread_info *mti, struct ldlm_namespace *ns,
 fill_mbo:
        /* LVB can be without valid data in case of DOM */
        if (!mdt_dom_lvb_is_valid(res))
-               mdt_dom_lvbo_update(res, lock, NULL, false);
+               mdt_dom_lvbo_update(mti->mti_env, res, lock, NULL, false);
        mdt_lvb2body(res, mbo);
        RETURN(rc);
 }
@@ -1193,14 +1232,14 @@ void mdt_dom_discard_data(struct mdt_thread_info *info,
 
        /* Tell the clients that the object is gone now and that they should
         * throw away any cached pages. */
-       rc = ldlm_cli_enqueue_local(mdt->mdt_namespace, res_id, LDLM_IBITS,
-                                   policy, LCK_PW, &flags, ldlm_blocking_ast,
-                                   ldlm_completion_ast, NULL, NULL, 0,
-                                   LVB_T_NONE, NULL, &dom_lh);
+       rc = ldlm_cli_enqueue_local(info->mti_env, mdt->mdt_namespace, res_id,
+                                   LDLM_IBITS, policy, LCK_PW, &flags,
+                                   ldlm_blocking_ast, ldlm_completion_ast,
+                                   NULL, NULL, 0, LVB_T_NONE, NULL, &dom_lh);
 
        /* We only care about the side-effects, just drop the lock. */
        if (rc == ELDLM_OK)
-               ldlm_lock_decref(&dom_lh, LCK_PW);
+               ldlm_lock_decref_and_cancel(&dom_lh, LCK_PW);
 }
 
 /* check if client has already DoM lock for given resource */
@@ -1426,9 +1465,26 @@ int mdt_dom_read_on_open(struct mdt_thread_info *mti, struct mdt_device *mdt,
                len = mbo->mbo_dom_size;
                offset = 0;
        } else {
-               int tail = mbo->mbo_dom_size % PAGE_SIZE;
+               int tail, pgbits;
+
+               /* File tail offset must be aligned with larger page size
+                * between client and server, so the maximum page size is
+                * used here to align offset.
+                *
+                * NB: DOM feature was introduced when server supports pagebits
+                * already, so it should be always non-zero value. Report error
+                * if it is not for some reason.
+                */
+               if (!req->rq_export->exp_target_data.ted_pagebits) {
+                       CERROR("%s: client page bits are not saved on server\n",
+                              mdt_obd_name(mdt));
+                       RETURN(0);
+               }
+               pgbits = max_t(int, PAGE_SHIFT,
+                              req->rq_export->exp_target_data.ted_pagebits);
+               tail = mbo->mbo_dom_size % (1 << pgbits);
 
-               /* no tail or tail can't fit in reply */
+               /* no partial tail or tail can't fit in reply */
                if (tail == 0 || len < tail)
                        RETURN(0);
 
@@ -1443,22 +1499,23 @@ int mdt_dom_read_on_open(struct mdt_thread_info *mti, struct mdt_device *mdt,
                GOTO(out, rc = -E2BIG);
        }
 
-       /* re-take MDT_BODY buffer after the buffer growing above */
+       /* re-take MDT_BODY and NIOBUF_INLINE buffers after the buffer grow */
        mbo = req_capsule_server_get(pill, &RMF_MDT_BODY);
        fid = &mbo->mbo_fid1;
        if (!fid_is_sane(fid))
-               RETURN(0);
+               GOTO(out, rc = -EINVAL);
 
        rnb = req_capsule_server_get(tsi->tsi_pill, &RMF_NIOBUF_INLINE);
        if (rnb == NULL)
                GOTO(out, rc = -EPROTO);
+
        buf = (char *)rnb + sizeof(*rnb);
        rnb->rnb_len = len;
        rnb->rnb_offset = offset;
 
        mo = dt_locate(env, dt, fid);
        if (IS_ERR(mo))
-               GOTO(out, rc = PTR_ERR(mo));
+               GOTO(out_rnb, rc = PTR_ERR(mo));
        LASSERT(mo != NULL);
 
        dt_read_lock(env, mo, 0);
@@ -1496,11 +1553,14 @@ int mdt_dom_read_on_open(struct mdt_thread_info *mti, struct mdt_device *mdt,
        }
        CDEBUG(D_INFO, "Read %i (wanted %u) bytes from %llu\n", copied,
               len, offset);
-       if (copied < len)
+       if (copied < len) {
                CWARN("%s: read %i bytes for "DFID
                      " but wanted %u, is size wrong?\n",
                      tsi->tsi_exp->exp_obd->obd_name, copied,
                      PFID(&tsi->tsi_fid), len);
+               /* Ignore partially copied data */
+               copied = 0;
+       }
        EXIT;
 buf_put:
        dt_bufs_put(env, mo, lnb, nr_local);
@@ -1509,9 +1569,15 @@ free:
 unlock:
        dt_read_unlock(env, mo);
        lu_object_put(env, &mo->do_lu);
+out_rnb:
+       rnb->rnb_len = copied;
 out:
-       if (rnb != NULL)
-               rnb->rnb_len = copied;
+       /* Don't fail OPEN request if read-on-open is failed, but drop
+        * a message in log about the error.
+        */
+       if (rc)
+               CDEBUG(D_INFO, "Read-on-open is failed, rc = %d", rc);
+
        RETURN(0);
 }