Whamcloud - gitweb
LU-14393 recovery: reply reconstruction for batched RPCs
[fs/lustre-release.git] / lustre / target / tgt_lastrcvd.c
index 8a8a587..5e0ff3c 100644 (file)
@@ -307,7 +307,7 @@ static int tgt_reply_data_read(const struct lu_env *env, struct lu_target *tgt,
        lrd->lrd_data            = le64_to_cpu(buf->lrd_data);
        lrd->lrd_result          = le32_to_cpu(buf->lrd_result);
        lrd->lrd_client_gen      = le32_to_cpu(buf->lrd_client_gen);
-
+       lrd->lrd_batch_idx       = le32_to_cpu(buf->lrd_batch_idx);
        return 0;
 }
 
@@ -1226,9 +1226,10 @@ static int tgt_add_reply_data(const struct lu_env *env, struct lu_target *tgt,
                       struct ptlrpc_request *req,
                       struct thandle *th, bool update_lrd_file)
 {
-       struct lsd_reply_data   *lrd;
-       int     i;
-       int     rc;
+       struct tgt_session_info *tsi = NULL;
+       struct lsd_reply_data *lrd;
+       int i = -1;
+       int rc;
 
        lrd = &trd->trd_reply;
        /* update export last transno */
@@ -1237,32 +1238,45 @@ static int tgt_add_reply_data(const struct lu_env *env, struct lu_target *tgt,
                ted->ted_lcd->lcd_last_transno = lrd->lrd_transno;
        mutex_unlock(&ted->ted_lcd_lock);
 
-       if (tgt != NULL) {
+       if (!tgt) {
+               trd->trd_index = TRD_INDEX_MEMORY;
+               GOTO(add_reply_data, rc = 0);
+       }
+
+       if (env) {
+               tsi = tgt_ses_info(env);
+               if (tsi->tsi_batch_trd) {
+                       LASSERT(tsi->tsi_batch_env);
+                       trd = tsi->tsi_batch_trd;
+                       i = trd->trd_index;
+               }
+       }
+
+       if (i == -1) {
                /* find a empty slot */
                i = tgt_find_free_reply_slot(tgt);
                if (unlikely(i < 0)) {
-                       CERROR("%s: couldn't find a slot for reply data: "
-                              "rc = %d\n", tgt_name(tgt), i);
+                       CERROR("%s: couldn't find a slot for reply data: rc = %d\n",
+                              tgt_name(tgt), i);
                        RETURN(i);
                }
                trd->trd_index = i;
+       }
 
-               if (update_lrd_file) {
-                       loff_t  off;
+       if (update_lrd_file) {
+               loff_t  off;
 
-                       /* write reply data to disk */
-                       off = sizeof(struct lsd_reply_header) + sizeof(*lrd) * i;
-                       rc = tgt_reply_data_write(env, tgt, lrd, off, th);
-                       if (unlikely(rc != 0)) {
-                               CERROR("%s: can't update %s file: rc = %d\n",
-                                      tgt_name(tgt), REPLY_DATA, rc);
-                               GOTO(free_slot, rc);
-                       }
+               /* write reply data to disk */
+               off = sizeof(struct lsd_reply_header) + sizeof(*lrd) * i;
+               rc = tgt_reply_data_write(env, tgt, lrd, off, th);
+               if (unlikely(rc != 0)) {
+                       CERROR("%s: can't update %s file: rc = %d\n",
+                              tgt_name(tgt), REPLY_DATA, rc);
+                       GOTO(free_slot, rc);
                }
-       } else {
-               trd->trd_index = TRD_INDEX_MEMORY;
        }
 
+add_reply_data:
        /* add reply data to target export's reply list */
        mutex_lock(&ted->ted_lcd_lock);
        if (req != NULL) {
@@ -1278,14 +1292,23 @@ static int tgt_add_reply_data(const struct lu_env *env, struct lu_target *tgt,
                        GOTO(free_slot, rc = -EBADR);
                }
 
-               if (!(lustre_msg_get_flags(req->rq_reqmsg) & exclude))
+               if (!(lustre_msg_get_flags(req->rq_reqmsg) & exclude) &&
+                   !(tsi && tsi->tsi_batch_env &&
+                     trd->trd_reply.lrd_batch_idx > 0))
                        tgt_clean_by_tag(req->rq_export, req->rq_xid,
                                         trd->trd_tag);
        }
-       list_add(&trd->trd_list, &ted->ted_reply_list);
-       ted->ted_reply_cnt++;
-       if (ted->ted_reply_cnt > ted->ted_reply_max)
-               ted->ted_reply_max = ted->ted_reply_cnt;
+
+       /*
+        * For the batched RPC, all sub requests use one common @trd for the
+        * reply data.
+        */
+       if (list_empty(&trd->trd_list)) {
+               list_add(&trd->trd_list, &ted->ted_reply_list);
+               ted->ted_reply_cnt++;
+               if (ted->ted_reply_cnt > ted->ted_reply_max)
+                       ted->ted_reply_max = ted->ted_reply_cnt;
+       }
        mutex_unlock(&ted->ted_lcd_lock);
 
        CDEBUG(D_TRACE, "add reply %p: xid %llu, transno %llu, "
@@ -1310,23 +1333,42 @@ int tgt_mk_reply_data(const struct lu_env *env,
                      bool write_update,
                      __u64 transno)
 {
-       struct tg_reply_data    *trd;
-       struct lsd_reply_data   *lrd;
-       __u64                   *pre_versions = NULL;
-       int                     rc;
+       struct tg_reply_data *trd = NULL;
+       struct lsd_reply_data *lrd;
+       __u64 *pre_versions = NULL;
        struct tgt_session_info *tsi = NULL;
+       int rc;
 
-       OBD_ALLOC_PTR(trd);
-       if (unlikely(trd == NULL))
-               RETURN(-ENOMEM);
-
-       if (env != NULL)
+       if (env != NULL) {
                tsi = tgt_ses_info(env);
+               if (tsi->tsi_batch_trd) {
+                       LASSERT(tsi->tsi_batch_env);
+                       trd = tsi->tsi_batch_trd;
+               }
+       }
+
+       if (trd == NULL) {
+               OBD_ALLOC_PTR(trd);
+               if (unlikely(trd == NULL))
+                       RETURN(-ENOMEM);
+
+               INIT_LIST_HEAD(&trd->trd_list);
+       }
 
        /* fill reply data information */
        lrd = &trd->trd_reply;
        lrd->lrd_transno = transno;
-       if (req != NULL) {
+       if (tsi && tsi->tsi_batch_env) {
+               if (tsi->tsi_batch_idx == 0) {
+                       LASSERT(req != NULL);
+                       tsi->tsi_batch_trd = trd;
+                       trd->trd_index = -1;
+                       lrd->lrd_xid = req->rq_xid;
+                       trd->trd_tag = lustre_msg_get_tag(req->rq_reqmsg);
+                       lrd->lrd_client_gen = ted->ted_lcd->lcd_generation;
+               }
+               lrd->lrd_batch_idx = tsi->tsi_batch_idx;
+       } else if (req != NULL) {
                lrd->lrd_xid = req->rq_xid;
                trd->trd_tag = lustre_msg_get_tag(req->rq_reqmsg);
                lrd->lrd_client_gen = ted->ted_lcd->lcd_generation;
@@ -1999,7 +2041,7 @@ int tgt_txn_stop_cb(const struct lu_env *env, struct thandle *th,
 
        echo_client = (tgt_ses_req(tsi) == NULL && tsi->tsi_xid == 0);
 
-       if (tsi->tsi_has_trans && !echo_client) {
+       if (tsi->tsi_has_trans && !echo_client && !tsi->tsi_batch_env) {
                if (!tsi->tsi_mult_trans) {
                        CDEBUG(D_HA, "More than one transaction %llu\n",
                               tti->tti_transno);