Whamcloud - gitweb
LU-12759 osc: don't re-enable grant shrink on reconnect
[fs/lustre-release.git] / lustre / osc / osc_request.c
index b4ed99b..42c628e 100644 (file)
@@ -751,6 +751,7 @@ static int osc_shrink_grant_interpret(const struct lu_env *env,
        osc_update_grant(cli, body);
 out:
        OBD_SLAB_FREE_PTR(aa->aa_oa, osc_obdo_kmem);
+       aa->aa_oa = NULL;
 
        return rc;
 }
@@ -844,9 +845,9 @@ static int osc_should_shrink_grant(struct client_obd *client)
        if (client->cl_import == NULL)
                return 0;
 
-        if ((client->cl_import->imp_connect_data.ocd_connect_flags &
-             OBD_CONNECT_GRANT_SHRINK) == 0)
-                return 0;
+       if (!OCD_HAS_FLAG(&client->cl_import->imp_connect_data, GRANT_SHRINK) ||
+           client->cl_import->imp_grant_shrink_disabled)
+               return 0;
 
        if (ktime_get_seconds() >= next_shrink - 5) {
                /* Get the current RPC size directly, instead of going via:
@@ -1727,19 +1728,20 @@ static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
                &req->rq_import->imp_connection->c_peer;
        struct ost_body *body;
        u32 client_cksum = 0;
-        ENTRY;
 
-        if (rc < 0 && rc != -EDQUOT) {
-                DEBUG_REQ(D_INFO, req, "Failed request with rc = %d\n", rc);
-                RETURN(rc);
-        }
+       ENTRY;
 
-        LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
-        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
-        if (body == NULL) {
-                DEBUG_REQ(D_INFO, req, "Can't unpack body\n");
-                RETURN(-EPROTO);
-        }
+       if (rc < 0 && rc != -EDQUOT) {
+               DEBUG_REQ(D_INFO, req, "Failed request: rc = %d", rc);
+               RETURN(rc);
+       }
+
+       LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
+       body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
+       if (body == NULL) {
+               DEBUG_REQ(D_INFO, req, "cannot unpack body");
+               RETURN(-EPROTO);
+       }
 
        /* set/clear over quota flag for a uid/gid/projid */
        if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
@@ -1747,42 +1749,45 @@ static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
                unsigned qid[LL_MAXQUOTAS] = {
                                         body->oa.o_uid, body->oa.o_gid,
                                         body->oa.o_projid };
-               CDEBUG(D_QUOTA, "setdq for [%u %u %u] with valid %#llx, flags %x\n",
+               CDEBUG(D_QUOTA,
+                      "setdq for [%u %u %u] with valid %#llx, flags %x\n",
                       body->oa.o_uid, body->oa.o_gid, body->oa.o_projid,
                       body->oa.o_valid, body->oa.o_flags);
                       osc_quota_setdq(cli, req->rq_xid, qid, body->oa.o_valid,
                                       body->oa.o_flags);
-        }
+       }
 
-        osc_update_grant(cli, body);
+       osc_update_grant(cli, body);
 
-        if (rc < 0)
-                RETURN(rc);
+       if (rc < 0)
+               RETURN(rc);
 
-        if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
-                client_cksum = aa->aa_oa->o_cksum; /* save for later */
+       if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
+               client_cksum = aa->aa_oa->o_cksum; /* save for later */
 
-        if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
-                if (rc > 0) {
-                        CERROR("Unexpected +ve rc %d\n", rc);
-                        RETURN(-EPROTO);
-                }
+       if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
+               if (rc > 0) {
+                       CERROR("%s: unexpected positive size %d\n",
+                              obd_name, rc);
+                       RETURN(-EPROTO);
+               }
 
                if (req->rq_bulk != NULL &&
                    sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
-                        RETURN(-EAGAIN);
+                       RETURN(-EAGAIN);
 
-                if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
-                    check_write_checksum(&body->oa, peer, client_cksum,
+               if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
+                   check_write_checksum(&body->oa, peer, client_cksum,
                                         body->oa.o_cksum, aa))
-                        RETURN(-EAGAIN);
+                       RETURN(-EAGAIN);
 
-                rc = check_write_rcs(req, aa->aa_requested_nob,aa->aa_nio_count,
-                                     aa->aa_page_count, aa->aa_ppga);
-                GOTO(out, rc);
-        }
+               rc = check_write_rcs(req, aa->aa_requested_nob,
+                                    aa->aa_nio_count, aa->aa_page_count,
+                                    aa->aa_ppga);
+               GOTO(out, rc);
+       }
 
-        /* The rest of this function executes only for OST_READs */
+       /* The rest of this function executes only for OST_READs */
 
        if (req->rq_bulk == NULL) {
                rc = req_capsule_get_size(&req->rq_pill, &RMF_SHORT_IO,
@@ -1792,20 +1797,20 @@ static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
                /* if unwrap_bulk failed, return -EAGAIN to retry */
                rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
        }
-        if (rc < 0)
-                GOTO(out, rc = -EAGAIN);
+       if (rc < 0)
+               GOTO(out, rc = -EAGAIN);
 
-        if (rc > aa->aa_requested_nob) {
-                CERROR("Unexpected rc %d (%d requested)\n", rc,
-                       aa->aa_requested_nob);
-                RETURN(-EPROTO);
-        }
+       if (rc > aa->aa_requested_nob) {
+               CERROR("%s: unexpected size %d, requested %d\n", obd_name,
+                      rc, aa->aa_requested_nob);
+               RETURN(-EPROTO);
+       }
 
        if (req->rq_bulk != NULL && rc != req->rq_bulk->bd_nob_transferred) {
-                CERROR ("Unexpected rc %d (%d transferred)\n",
-                        rc, req->rq_bulk->bd_nob_transferred);
-                return (-EPROTO);
-        }
+               CERROR("%s: unexpected size %d, transferred %d\n", obd_name,
+                      rc, req->rq_bulk->bd_nob_transferred);
+               RETURN(-EPROTO);
+       }
 
        if (req->rq_bulk == NULL) {
                /* short io */
@@ -1899,32 +1904,34 @@ static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
                        CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
                        rc = 0;
                }
-        } else if (unlikely(client_cksum)) {
-                static int cksum_missed;
-
-                cksum_missed++;
-                if ((cksum_missed & (-cksum_missed)) == cksum_missed)
-                        CERROR("Checksum %u requested from %s but not sent\n",
-                               cksum_missed, libcfs_nid2str(peer->nid));
-        } else {
-                rc = 0;
-        }
+       } else if (unlikely(client_cksum)) {
+               static int cksum_missed;
+
+               cksum_missed++;
+               if ((cksum_missed & (-cksum_missed)) == cksum_missed)
+                       CERROR("%s: checksum %u requested from %s but not sent\n",
+                              obd_name, cksum_missed,
+                              libcfs_nid2str(peer->nid));
+       } else {
+               rc = 0;
+       }
 out:
        if (rc >= 0)
                lustre_get_wire_obdo(&req->rq_import->imp_connect_data,
                                     aa->aa_oa, &body->oa);
 
-        RETURN(rc);
+       RETURN(rc);
 }
 
 static int osc_brw_redo_request(struct ptlrpc_request *request,
                                struct osc_brw_async_args *aa, int rc)
 {
-        struct ptlrpc_request *new_req;
-        struct osc_brw_async_args *new_aa;
-        struct osc_async_page *oap;
-        ENTRY;
+       struct ptlrpc_request *new_req;
+       struct osc_brw_async_args *new_aa;
+       struct osc_async_page *oap;
+       ENTRY;
 
+       /* The below message is checked in replay-ost-single.sh test_8ae*/
        DEBUG_REQ(rc == -EINPROGRESS ? D_RPCTRACE : D_ERROR, request,
                  "redo for recoverable error %d", rc);
 
@@ -2117,6 +2124,7 @@ static int brw_interpret(const struct lu_env *env,
                cl_object_attr_unlock(obj);
        }
        OBD_SLAB_FREE_PTR(aa->aa_oa, osc_obdo_kmem);
+       aa->aa_oa = NULL;
 
        if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE && rc == 0)
                osc_inc_unstable_pages(req);
@@ -2331,7 +2339,7 @@ int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
        }
        spin_unlock(&cli->cl_loi_list_lock);
 
-       DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %ur/%uw in flight",
+       DEBUG_REQ(D_INODE, req, "%d pages, aa %p, now %ur/%uw in flight",
                  page_count, aa, cli->cl_r_in_flight,
                  cli->cl_w_in_flight);
        OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_DELAY_IO, cfs_fail_val);
@@ -2483,9 +2491,8 @@ struct ptlrpc_request_set *PTLRPCD_SET = (void *)1;
  * release locks just after they are obtained. */
 int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
                     __u64 *flags, union ldlm_policy_data *policy,
-                    struct ost_lvb *lvb, int kms_valid,
-                    osc_enqueue_upcall_f upcall, void *cookie,
-                    struct ldlm_enqueue_info *einfo,
+                    struct ost_lvb *lvb, osc_enqueue_upcall_f upcall,
+                    void *cookie, struct ldlm_enqueue_info *einfo,
                     struct ptlrpc_request_set *rqset, int async,
                     bool speculative)
 {
@@ -2503,15 +2510,6 @@ int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
        policy->l_extent.start -= policy->l_extent.start & ~PAGE_MASK;
        policy->l_extent.end |= ~PAGE_MASK;
 
-       /*
-        * kms is not valid when either object is completely fresh (so that no
-        * locks are cached), or object was evicted. In the latter case cached
-        * lock cannot be used, because it would prime inode state with
-        * potentially stale LVB.
-        */
-       if (!kms_valid)
-               goto no_match;
-
         /* Next, search for already existing extent locks that will cover us */
         /* If we're trying to read, we also search for an existing PW lock.  The
          * VFS and page cache already protect us locally, so lots of readers/
@@ -2574,7 +2572,6 @@ int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
                }
        }
 
-no_match:
        if (*flags & (LDLM_FL_TEST_LOCK | LDLM_FL_MATCH_LOCK))
                RETURN(-ENOLCK);
 
@@ -2641,9 +2638,10 @@ no_match:
        RETURN(rc);
 }
 
-int osc_match_base(struct obd_export *exp, struct ldlm_res_id *res_id,
-                  enum ldlm_type type, union ldlm_policy_data *policy,
-                  enum ldlm_mode mode, __u64 *flags, void *data,
+int osc_match_base(const struct lu_env *env, struct obd_export *exp,
+                  struct ldlm_res_id *res_id, enum ldlm_type type,
+                  union ldlm_policy_data *policy, enum ldlm_mode mode,
+                  __u64 *flags, struct osc_object *obj,
                   struct lustre_handle *lockh, int unref)
 {
        struct obd_device *obd = exp->exp_obd;
@@ -2671,11 +2669,19 @@ int osc_match_base(struct obd_export *exp, struct ldlm_res_id *res_id,
        if (rc == 0 || lflags & LDLM_FL_TEST_LOCK)
                RETURN(rc);
 
-       if (data != NULL) {
+       if (obj != NULL) {
                struct ldlm_lock *lock = ldlm_handle2lock(lockh);
 
                LASSERT(lock != NULL);
-               if (!osc_set_lock_data(lock, data)) {
+               if (osc_set_lock_data(lock, obj)) {
+                       lock_res_and_lock(lock);
+                       if (!ldlm_is_lvb_cached(lock)) {
+                               LASSERT(lock->l_ast_data == obj);
+                               osc_lock_lvb_update(env, obj, lock, NULL);
+                               ldlm_set_lvb_cached(lock);
+                       }
+                       unlock_res_and_lock(lock);
+               } else {
                        ldlm_lock_decref(lockh, rc);
                        rc = 0;
                }