Whamcloud - gitweb
LU-14876 out: don't connect to busy MDS-MDS export
[fs/lustre-release.git] / lustre / target / out_handler.c
index 6f5dd68..57c0d91 100644 (file)
@@ -52,7 +52,7 @@ static void out_reconstruct(const struct lu_env *env, struct dt_device *dt,
                            struct object_update_reply *reply,
                            int index)
 {
-       CDEBUG(D_INFO, "%s: fork reply reply %p index %d: rc = %d\n",
+       CDEBUG(D_HA, "%s: fork reply reply %p index %d: rc = %d\n",
               dt_obd_name(dt), reply, index, 0);
 
        object_update_result_insert(reply, NULL, 0, index, 0);
@@ -64,16 +64,10 @@ typedef void (*out_reconstruct_t)(const struct lu_env *env,
                                  struct object_update_reply *reply,
                                  int index);
 
-static inline int out_check_resent(const struct lu_env *env,
-                                  struct dt_device *dt,
-                                  struct dt_object *obj,
-                                  struct ptlrpc_request *req,
-                                  out_reconstruct_t reconstruct,
-                                  struct object_update_reply *reply,
-                                  int index)
+static inline bool out_check_resent(struct ptlrpc_request *req)
 {
        if (likely(!(lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT)))
-               return 0;
+               return false;
 
        if (req_xid_is_last(req)) {
                struct lsd_client_data *lcd;
@@ -89,14 +83,12 @@ static inline int out_check_resent(const struct lu_env *env,
                lustre_msg_set_transno(req->rq_repmsg, req->rq_transno);
                lustre_msg_set_status(req->rq_repmsg, req->rq_status);
 
-               DEBUG_REQ(D_RPCTRACE, req, "restoring resent RPC");
-
-               reconstruct(env, dt, obj, reply, index);
-               return 1;
+               DEBUG_REQ(D_HA, req, "reconstruct resent RPC");
+               return true;
        }
-       DEBUG_REQ(D_HA, req, "no reply for RESENT req (have %lld)",
-                req->rq_export->exp_target_data.ted_lcd->lcd_last_xid);
-       return 0;
+       DEBUG_REQ(D_HA, req, "reprocess RESENT req, last_xid is %lld",
+                 req->rq_export->exp_target_data.ted_lcd->lcd_last_xid);
+       return false;
 }
 
 static int out_create(struct tgt_session_info *tsi)
@@ -121,7 +113,7 @@ static int out_create(struct tgt_session_info *tsi)
                RETURN(PTR_ERR(wobdo));
        }
 
-       if (ptlrpc_req_need_swab(tsi->tsi_pill->rc_req))
+       if (req_capsule_req_need_swab(tsi->tsi_pill))
                lustre_swab_obdo(wobdo);
        lustre_get_wire_obdo(NULL, lobdo, wobdo);
        la_from_obdo(attr, lobdo, lobdo->o_valid);
@@ -134,7 +126,7 @@ static int out_create(struct tgt_session_info *tsi)
                               tgt_name(tsi->tsi_tgt), PTR_ERR(fid));
                        RETURN(PTR_ERR(fid));
                }
-               if (ptlrpc_req_need_swab(tsi->tsi_pill->rc_req))
+               if (req_capsule_req_need_swab(tsi->tsi_pill))
                        lustre_swab_lu_fid(fid);
                if (!fid_is_sane(fid)) {
                        CERROR("%s: invalid fid "DFID": rc = %d\n",
@@ -177,7 +169,7 @@ static int out_attr_set(struct tgt_session_info *tsi)
        attr->la_valid = 0;
        attr->la_valid = 0;
 
-       if (ptlrpc_req_need_swab(tsi->tsi_pill->rc_req))
+       if (req_capsule_req_need_swab(tsi->tsi_pill))
                lustre_swab_obdo(wobdo);
        lustre_get_wire_obdo(NULL, lobdo, wobdo);
        la_from_obdo(attr, lobdo, lobdo->o_valid);
@@ -443,7 +435,7 @@ static int out_xattr_set(struct tgt_session_info *tsi)
                RETURN(PTR_ERR(tmp));
        }
 
-       if (ptlrpc_req_need_swab(tsi->tsi_pill->rc_req))
+       if (req_capsule_req_need_swab(tsi->tsi_pill))
                __swab32s(tmp);
        flag = *tmp;
 
@@ -540,7 +532,7 @@ static int out_index_insert(struct tgt_session_info *tsi)
                RETURN(PTR_ERR(fid));
        }
 
-       if (ptlrpc_req_need_swab(tsi->tsi_pill->rc_req))
+       if (req_capsule_req_need_swab(tsi->tsi_pill))
                lustre_swab_lu_fid(fid);
 
        if (!fid_is_sane(fid)) {
@@ -556,7 +548,7 @@ static int out_index_insert(struct tgt_session_info *tsi)
                RETURN(PTR_ERR(ptype));
        }
 
-       if (ptlrpc_req_need_swab(tsi->tsi_pill->rc_req))
+       if (req_capsule_req_need_swab(tsi->tsi_pill))
                __swab32s(ptype);
 
        rec->rec_fid = fid;
@@ -657,7 +649,7 @@ static int out_write(struct tgt_session_info *tsi)
                RETURN(PTR_ERR(tmp));
        }
 
-       if (ptlrpc_req_need_swab(tsi->tsi_pill->rc_req))
+       if (req_capsule_req_need_swab(tsi->tsi_pill))
                __swab64s(tmp);
        pos = *tmp;
 
@@ -973,6 +965,8 @@ int out_handle(struct tgt_session_info *tsi)
        int                             rc1 = 0;
        int                             ouh_size, reply_size;
        int                             updates;
+       bool need_reconstruct;
+
        ENTRY;
 
        req_capsule_set(pill, &RQF_OUT_UPDATE);
@@ -996,7 +990,7 @@ int out_handle(struct tgt_session_info *tsi)
        if (update_buf_count == 0)
                RETURN(err_serious(-EPROTO));
 
-       OBD_ALLOC(update_bufs, sizeof(*update_bufs) * update_buf_count);
+       OBD_ALLOC_PTR_ARRAY(update_bufs, update_buf_count);
        if (update_bufs == NULL)
                RETURN(err_serious(-ENOMEM));
 
@@ -1017,8 +1011,7 @@ int out_handle(struct tgt_session_info *tsi)
 
                desc = ptlrpc_prep_bulk_exp(pill->rc_req, page_count,
                                            PTLRPC_BULK_OPS_COUNT,
-                                           PTLRPC_BULK_GET_SINK |
-                                           PTLRPC_BULK_BUF_KIOV,
+                                           PTLRPC_BULK_GET_SINK,
                                            MDS_BULK_PORTAL,
                                            &ptlrpc_bulk_kiov_nopin_ops);
                if (desc == NULL)
@@ -1055,8 +1048,8 @@ int out_handle(struct tgt_session_info *tsi)
                int                              j;
 
                our = update_bufs[i];
-               if (ptlrpc_req_need_swab(pill->rc_req))
-                       lustre_swab_object_update_request(our);
+               if (req_capsule_req_need_swab(pill))
+                       lustre_swab_object_update_request(our, 0);
 
                if (our->ourq_magic != UPDATE_REQUEST_MAGIC) {
                        CERROR("%s: invalid update buffer magic %x"
@@ -1072,7 +1065,7 @@ int out_handle(struct tgt_session_info *tsi)
                        update = object_update_request_get(our, j, NULL);
                        if (update == NULL)
                                GOTO(out, rc = err_serious(-EPROTO));
-                       if (ptlrpc_req_need_swab(pill->rc_req))
+                       if (req_capsule_req_need_swab(pill))
                                lustre_swab_object_update(update);
 
                        if (!fid_is_sane(&update->ou_fid)) {
@@ -1116,6 +1109,8 @@ int out_handle(struct tgt_session_info *tsi)
        tti->tti_u.update.tti_update_reply = reply;
        tti->tti_mult_trans = !req_is_replay(tgt_ses_req(tsi));
 
+       need_reconstruct = out_check_resent(pill->rc_req);
+
        /* Walk through updates in the request to execute them */
        for (i = 0; i < update_buf_count; i++) {
                struct tgt_handler      *h;
@@ -1163,12 +1158,19 @@ int out_handle(struct tgt_session_info *tsi)
 
                        /* Check resend case only for modifying RPC */
                        if (h->th_flags & IS_MUTABLE) {
-                               struct ptlrpc_request *req = tgt_ses_req(tsi);
+                               /* sanity check for last XID changing */
+                               if (unlikely(!need_reconstruct &&
+                                            req_xid_is_last(pill->rc_req))) {
+                                       DEBUG_REQ(D_ERROR, pill->rc_req,
+                                                 "unexpected last XID change");
+                                       GOTO(next, rc = -EINVAL);
+                               }
 
-                               if (out_check_resent(env, dt, dt_obj, req,
-                                                    out_reconstruct, reply,
-                                                    reply_index))
+                               if (need_reconstruct) {
+                                       out_reconstruct(env, dt, dt_obj, reply,
+                                                       reply_index);
                                        GOTO(next, rc = 0);
+                               }
 
                                if (dt->dd_rdonly)
                                        GOTO(next, rc = -EROFS);
@@ -1177,6 +1179,10 @@ int out_handle(struct tgt_session_info *tsi)
                        /* start transaction for modification RPC only */
                        if (h->th_flags & IS_MUTABLE && current_batchid == -1) {
                                current_batchid = update->ou_batchid;
+
+                               if (reply_index == 0)
+                                       CFS_RACE(OBD_FAIL_PTLRPC_RESEND_RACE);
+
                                rc = out_tx_start(env, dt, ta, tsi->tsi_exp);
                                if (rc != 0)
                                        GOTO(next, rc);
@@ -1232,7 +1238,7 @@ out_free:
                        }
                }
 
-               OBD_FREE(update_bufs, sizeof(*update_bufs) * update_buf_count);
+               OBD_FREE_PTR_ARRAY(update_bufs, update_buf_count);
        }
 
        if (desc != NULL)