Whamcloud - gitweb
LU-3749 recovery: save versions from reply into lastrcvd
[fs/lustre-release.git] / lustre / target / tgt_lastrcvd.c
index d9021f0..2694aaa 100644 (file)
@@ -27,7 +27,7 @@
  * Copyright (c) 2009, 2010, Oracle and/or its affiliates. All rights reserved.
  * Use is subject to license terms.
  *
- * Copyright (c) 2011, 2012, Whamcloud, Inc.
+ * Copyright (c) 2011, 2012, Intel Corporation.
  */
 /*
  * This file is part of Lustre, http://www.lustre.org/
@@ -93,7 +93,7 @@ void tgt_client_free(struct obd_export *exp)
                return;
        /* Clear bit when lcd is freed */
        LASSERT(lut->lut_client_bitmap);
-       if (!cfs_test_and_clear_bit(ted->ted_lr_idx, lut->lut_client_bitmap)) {
+       if (!test_and_clear_bit(ted->ted_lr_idx, lut->lut_client_bitmap)) {
                CERROR("%s: client %u bit already clear in bitmap\n",
                       exp->exp_obd->obd_name, ted->ted_lr_idx);
                LBUG();
@@ -112,6 +112,9 @@ int tgt_client_data_read(const struct lu_env *env, struct lu_target *tgt,
        if (rc == 0) {
                check_lcd(tgt->lut_obd->obd_name, index, &tti->tti_lcd);
                lcd_le_to_cpu(&tti->tti_lcd, lcd);
+               lcd->lcd_last_result = ptlrpc_status_ntoh(lcd->lcd_last_result);
+               lcd->lcd_last_close_result =
+                       ptlrpc_status_ntoh(lcd->lcd_last_close_result);
        }
 
        CDEBUG(D_INFO, "%s: read lcd @%lld uuid = %s, last_transno = "LPU64
@@ -132,6 +135,9 @@ int tgt_client_data_write(const struct lu_env *env, struct lu_target *tgt,
 {
        struct tgt_thread_info *tti = tgt_th_info(env);
 
+       lcd->lcd_last_result = ptlrpc_status_hton(lcd->lcd_last_result);
+       lcd->lcd_last_close_result =
+               ptlrpc_status_hton(lcd->lcd_last_close_result);
        lcd_cpu_to_le(lcd, &tti->tti_lcd);
        tti_buf_lcd(tti);
 
@@ -176,9 +182,9 @@ int tgt_client_data_update(const struct lu_env *env, struct obd_export *exp)
                /* can't add callback, do sync now */
                th->th_sync = 1;
        } else {
-               cfs_spin_lock(&exp->exp_lock);
+               spin_lock(&exp->exp_lock);
                exp->exp_need_sync = 1;
-               cfs_spin_unlock(&exp->exp_lock);
+               spin_unlock(&exp->exp_lock);
        }
 
        tti->tti_off = ted->ted_lr_off;
@@ -253,9 +259,9 @@ int tgt_server_data_update(const struct lu_env *env, struct lu_target *tgt,
               tgt->lut_last_transno);
 
        /* Always save latest transno to keep it fresh */
-       cfs_spin_lock(&tgt->lut_translock);
+       spin_lock(&tgt->lut_translock);
        tgt->lut_lsd.lsd_last_transno = tgt->lut_last_transno;
-       cfs_spin_unlock(&tgt->lut_translock);
+       spin_unlock(&tgt->lut_translock);
 
        th = dt_trans_create(env, tgt->lut_bottom);
        if (IS_ERR(th))
@@ -355,20 +361,20 @@ void tgt_boot_epoch_update(struct lu_target *tgt)
                return;
        }
 
-       cfs_spin_lock(&tgt->lut_translock);
+       spin_lock(&tgt->lut_translock);
        start_epoch = lr_epoch(tgt->lut_last_transno) + 1;
        tgt->lut_last_transno = (__u64)start_epoch << LR_EPOCH_BITS;
        tgt->lut_lsd.lsd_start_epoch = start_epoch;
-       cfs_spin_unlock(&tgt->lut_translock);
+       spin_unlock(&tgt->lut_translock);
 
        CFS_INIT_LIST_HEAD(&client_list);
        /**
         * The recovery is not yet finished and final queue can still be updated
         * with resend requests. Move final list to separate one for processing
         */
-       cfs_spin_lock(&tgt->lut_obd->obd_recovery_task_lock);
+       spin_lock(&tgt->lut_obd->obd_recovery_task_lock);
        cfs_list_splice_init(&tgt->lut_obd->obd_final_req_queue, &client_list);
-       cfs_spin_unlock(&tgt->lut_obd->obd_recovery_task_lock);
+       spin_unlock(&tgt->lut_obd->obd_recovery_task_lock);
 
        /**
         * go through list of exports participated in recovery and
@@ -380,9 +386,9 @@ void tgt_boot_epoch_update(struct lu_target *tgt)
                        tgt_client_epoch_update(&env, req->rq_export);
        }
        /** return list back at once */
-       cfs_spin_lock(&tgt->lut_obd->obd_recovery_task_lock);
+       spin_lock(&tgt->lut_obd->obd_recovery_task_lock);
        cfs_list_splice_init(&client_list, &tgt->lut_obd->obd_final_req_queue);
-       cfs_spin_unlock(&tgt->lut_obd->obd_recovery_task_lock);
+       spin_unlock(&tgt->lut_obd->obd_recovery_task_lock);
        /** update server epoch */
        tgt_server_data_update(&env, tgt, 1);
        lu_env_fini(&env);
@@ -409,17 +415,17 @@ void tgt_cb_last_committed(struct lu_env *env, struct thandle *th,
        LASSERT(ccb->llcc_tgt != NULL);
        LASSERT(ccb->llcc_exp->exp_obd == ccb->llcc_tgt->lut_obd);
 
-       cfs_spin_lock(&ccb->llcc_tgt->lut_translock);
+       spin_lock(&ccb->llcc_tgt->lut_translock);
        if (ccb->llcc_transno > ccb->llcc_tgt->lut_obd->obd_last_committed)
                ccb->llcc_tgt->lut_obd->obd_last_committed = ccb->llcc_transno;
 
        LASSERT(ccb->llcc_exp);
        if (ccb->llcc_transno > ccb->llcc_exp->exp_last_committed) {
                ccb->llcc_exp->exp_last_committed = ccb->llcc_transno;
-               cfs_spin_unlock(&ccb->llcc_tgt->lut_translock);
+               spin_unlock(&ccb->llcc_tgt->lut_translock);
                ptlrpc_commit_replies(ccb->llcc_exp);
        } else {
-               cfs_spin_unlock(&ccb->llcc_tgt->lut_translock);
+               spin_unlock(&ccb->llcc_tgt->lut_translock);
        }
        class_export_cb_put(ccb->llcc_exp);
        if (ccb->llcc_transno)
@@ -455,7 +461,7 @@ int tgt_last_commit_cb_add(struct thandle *th, struct lu_target *tgt,
                OBD_FREE_PTR(ccb);
        }
 
-       if ((exp->exp_connect_flags & OBD_CONNECT_LIGHTWEIGHT) != 0)
+       if (exp_connect_flags(exp) & OBD_CONNECT_LIGHTWEIGHT)
                /* report failure to force synchronous operation */
                return -EPERM;
 
@@ -481,9 +487,9 @@ void tgt_cb_new_client(struct lu_env *env, struct thandle *th,
               ccb->lncc_exp->exp_obd->obd_name,
               ccb->lncc_exp->exp_client_uuid.uuid);
 
-       cfs_spin_lock(&ccb->lncc_exp->exp_lock);
+       spin_lock(&ccb->lncc_exp->exp_lock);
        ccb->lncc_exp->exp_need_sync = 0;
-       cfs_spin_unlock(&ccb->lncc_exp->exp_lock);
+       spin_unlock(&ccb->lncc_exp->exp_lock);
        class_export_cb_put(ccb->lncc_exp);
 
        OBD_FREE_PTR(ccb);
@@ -533,15 +539,15 @@ int tgt_client_new(const struct lu_env *env, struct obd_export *exp)
        if (!strcmp(ted->ted_lcd->lcd_uuid, tgt->lut_obd->obd_uuid.uuid))
                RETURN(0);
 
-       cfs_mutex_init(&ted->ted_lcd_lock);
+       mutex_init(&ted->ted_lcd_lock);
 
-       if ((exp->exp_connect_flags & OBD_CONNECT_LIGHTWEIGHT) != 0)
+       if (exp_connect_flags(exp) & OBD_CONNECT_LIGHTWEIGHT)
                RETURN(0);
 
        /* the bitmap operations can handle cl_idx > sizeof(long) * 8, so
         * there's no need for extra complication here
         */
-       idx = cfs_find_first_zero_bit(tgt->lut_client_bitmap, LR_MAX_CLIENTS);
+       idx = find_first_zero_bit(tgt->lut_client_bitmap, LR_MAX_CLIENTS);
 repeat:
        if (idx >= LR_MAX_CLIENTS ||
            OBD_FAIL_CHECK(OBD_FAIL_MDS_CLIENT_ADD)) {
@@ -549,8 +555,8 @@ repeat:
                       tgt->lut_obd->obd_name,  idx);
                RETURN(-EOVERFLOW);
        }
-       if (cfs_test_and_set_bit(idx, tgt->lut_client_bitmap)) {
-               idx = cfs_find_next_zero_bit(tgt->lut_client_bitmap,
+       if (test_and_set_bit(idx, tgt->lut_client_bitmap)) {
+               idx = find_next_zero_bit(tgt->lut_client_bitmap,
                                             LR_MAX_CLIENTS, idx);
                goto repeat;
        }
@@ -599,10 +605,10 @@ int tgt_client_add(const struct lu_env *env,  struct obd_export *exp, int idx)
        LASSERTF(idx >= 0, "%d\n", idx);
 
        if (!strcmp(ted->ted_lcd->lcd_uuid, tgt->lut_obd->obd_uuid.uuid) ||
-           (exp->exp_connect_flags & OBD_CONNECT_LIGHTWEIGHT) != 0)
+           exp_connect_flags(exp) & OBD_CONNECT_LIGHTWEIGHT)
                RETURN(0);
 
-       if (cfs_test_and_set_bit(idx, tgt->lut_client_bitmap)) {
+       if (test_and_set_bit(idx, tgt->lut_client_bitmap)) {
                CERROR("%s: client %d: bit already set in bitmap!!\n",
                       tgt->lut_obd->obd_name,  idx);
                LBUG();
@@ -615,7 +621,7 @@ int tgt_client_add(const struct lu_env *env,  struct obd_export *exp, int idx)
        ted->ted_lr_off = tgt->lut_lsd.lsd_client_start +
                          idx * tgt->lut_lsd.lsd_client_size;
 
-       cfs_mutex_init(&ted->ted_lcd_lock);
+       mutex_init(&ted->ted_lcd_lock);
 
        LASSERTF(ted->ted_lr_off > 0, "ted_lr_off = %llu\n", ted->ted_lr_off);
 
@@ -636,7 +642,7 @@ int tgt_client_del(const struct lu_env *env, struct obd_export *exp)
        /* XXX if lcd_uuid were a real obd_uuid, I could use obd_uuid_equals */
        if (!strcmp((char *)ted->ted_lcd->lcd_uuid,
                    (char *)tgt->lut_obd->obd_uuid.uuid) ||
-           (exp->exp_connect_flags & OBD_CONNECT_LIGHTWEIGHT) != 0)
+           exp_connect_flags(exp) & OBD_CONNECT_LIGHTWEIGHT)
                RETURN(0);
 
        CDEBUG(D_INFO, "%s: del client at idx %u, off %lld, UUID '%s'\n",
@@ -645,7 +651,7 @@ int tgt_client_del(const struct lu_env *env, struct obd_export *exp)
 
        /* Clear the bit _after_ zeroing out the client so we don't
           race with filter_client_add and zero out new clients.*/
-       if (!cfs_test_bit(ted->ted_lr_idx, tgt->lut_client_bitmap)) {
+       if (!test_bit(ted->ted_lr_idx, tgt->lut_client_bitmap)) {
                CERROR("%s: client %u: bit already clear in bitmap!!\n",
                       tgt->lut_obd->obd_name, ted->ted_lr_idx);
                LBUG();
@@ -666,10 +672,10 @@ int tgt_client_del(const struct lu_env *env, struct obd_export *exp)
                RETURN(rc);
        }
 
-       cfs_mutex_lock(&ted->ted_lcd_lock);
+       mutex_lock(&ted->ted_lcd_lock);
        memset(ted->ted_lcd->lcd_uuid, 0, sizeof ted->ted_lcd->lcd_uuid);
        rc = tgt_client_data_update(env, exp);
-       cfs_mutex_unlock(&ted->ted_lcd_lock);
+       mutex_unlock(&ted->ted_lcd_lock);
 
        CDEBUG(rc == 0 ? D_INFO : D_ERROR,
               "%s: zeroing out client %s at idx %u (%llu), rc %d\n",
@@ -678,3 +684,158 @@ int tgt_client_del(const struct lu_env *env, struct obd_export *exp)
        RETURN(rc);
 }
 EXPORT_SYMBOL(tgt_client_del);
+
+/**
+ * last_rcvd & last_committed update: record a transaction for its client.
+ * The transno is the request's (replay) or the next lut_last_transno; it is
+ * put in the request and reply, and with xid and result in the export's
+ * ted_lcd, which non-lightweight clients also get written at ted_lr_off.
+ * Returns 0, a write result, -EINVAL (ted_lr_off <= 0 for such a client)
+ * or -EOVERFLOW (replay transno below the one already saved).
+ */
+int tgt_last_rcvd_update(const struct lu_env *env, struct lu_target *tgt,
+                        struct dt_object *obj, __u64 opdata,
+                        struct thandle *th, struct ptlrpc_request *req)
+{
+       struct tgt_thread_info  *tti = tgt_th_info(env);
+       struct tgt_session_info *tsi = tgt_ses_info(env);
+       struct tg_export_data   *ted;
+       __u64                   *transno_p;
+       int                      rc = 0;
+       bool                     lw_client, update = false;
+
+       ENTRY;
+
+       if (tsi->tsi_has_trans) {
+               /* XXX: currently there are allowed cases, but the wrong cases
+                * are also possible, so better check is needed here */
+               CDEBUG(D_INFO, "More than one transaction "LPU64"\n",
+                      tti->tti_transno);
+               /* RETURN rather than return keeps the ENTRY trace paired */
+               RETURN(0);
+       }
+
+       tsi->tsi_has_trans = 1;
+       /* that can be OUT target and we need tgt_session_info */
+       if (req == NULL) {
+               req = tgt_ses_req(tsi);
+               if (req == NULL) /* echo client case */
+                       RETURN(0);
+       }
+
+       ted = &req->rq_export->exp_target_data;
+       lw_client = exp_connect_flags(req->rq_export) & OBD_CONNECT_LIGHTWEIGHT;
+
+       tti->tti_transno = lustre_msg_get_transno(req->rq_reqmsg);
+       spin_lock(&tgt->lut_translock);
+       if (th->th_result != 0) {
+               if (tti->tti_transno != 0)
+                       CERROR("%s: replay transno "LPU64" failed: rc = %d\n",
+                              tgt_name(tgt), tti->tti_transno, th->th_result);
+       } else if (tti->tti_transno == 0) {
+               tti->tti_transno = ++tgt->lut_last_transno;
+       } else if (tti->tti_transno > tgt->lut_last_transno) {
+               /* should be replay */
+               tgt->lut_last_transno = tti->tti_transno;
+       }
+       spin_unlock(&tgt->lut_translock);
+
+       /** VBR: set new versions */
+       if (th->th_result == 0 && obj != NULL)
+               dt_version_set(env, obj, tti->tti_transno, th);
+
+       /* filling reply data */
+       CDEBUG(D_INODE, "transno = "LPU64", last_committed = "LPU64"\n",
+              tti->tti_transno, tgt->lut_obd->obd_last_committed);
+
+       req->rq_transno = tti->tti_transno;
+       lustre_msg_set_transno(req->rq_repmsg, tti->tti_transno);
+
+       /* if can't add callback, do sync write */
+       th->th_sync |= !!tgt_last_commit_cb_add(th, tgt, req->rq_export,
+                                               tti->tti_transno);
+
+       if (lw_client) {
+               /* All operations performed by LW clients are synchronous and
+                * we store the committed transno in the last_rcvd header */
+               spin_lock(&tgt->lut_translock);
+               if (tti->tti_transno > tgt->lut_lsd.lsd_last_transno) {
+                       tgt->lut_lsd.lsd_last_transno = tti->tti_transno;
+                       update = true;
+               }
+               spin_unlock(&tgt->lut_translock);
+               /* Although lightweight (LW) connections have no slot in
+                * last_rcvd, we still want to maintain the in-memory
+                * lsd_client_data structure in order to properly handle reply
+                * reconstruction. */
+       } else if (ted->ted_lr_off <= 0) {
+               CERROR("%s: client idx %d has offset %lld\n",
+                      tgt_name(tgt), ted->ted_lr_idx, ted->ted_lr_off);
+               RETURN(-EINVAL);
+       }
+
+       /* if the export has already been disconnected, we have no last_rcvd
+        * slot, update server data with latest transno then */
+       if (ted->ted_lcd == NULL) {
+               CWARN("commit transaction for disconnected client %s: rc %d\n",
+                     req->rq_export->exp_client_uuid.uuid, rc);
+               GOTO(srv_update, rc = 0);
+       }
+
+       mutex_lock(&ted->ted_lcd_lock);
+       LASSERT(ergo(tti->tti_transno == 0, th->th_result != 0));
+       if (lustre_msg_get_opc(req->rq_reqmsg) == MDS_CLOSE ||
+           lustre_msg_get_opc(req->rq_reqmsg) == MDS_DONE_WRITING) {
+               transno_p = &ted->ted_lcd->lcd_last_close_transno;
+               ted->ted_lcd->lcd_last_close_xid = req->rq_xid;
+               ted->ted_lcd->lcd_last_close_result = th->th_result;
+       } else {
+               /* VBR: save versions in last_rcvd for reconstruct. */
+               __u64 *pre_versions = lustre_msg_get_versions(req->rq_repmsg);
+
+               if (pre_versions) {
+                       ted->ted_lcd->lcd_pre_versions[0] = pre_versions[0];
+                       ted->ted_lcd->lcd_pre_versions[1] = pre_versions[1];
+                       ted->ted_lcd->lcd_pre_versions[2] = pre_versions[2];
+                       ted->ted_lcd->lcd_pre_versions[3] = pre_versions[3];
+               }
+               transno_p = &ted->ted_lcd->lcd_last_transno;
+               ted->ted_lcd->lcd_last_xid = req->rq_xid;
+               ted->ted_lcd->lcd_last_result = th->th_result;
+               /* XXX: lcd_last_data is __u32 but intent_dispostion is __u64,
+                * see struct ldlm_reply->lock_policy_res1; */
+               ted->ted_lcd->lcd_last_data = opdata;
+       }
+
+       /* Update transno in slot only if non-zero number, i.e. no errors */
+       if (likely(tti->tti_transno != 0)) {
+               if (*transno_p > tti->tti_transno) {
+                       CERROR("%s: trying to overwrite bigger transno:"
+                              "on-disk: "LPU64", new: "LPU64" replay: %d. "
+                              "see LU-617.\n", tgt_name(tgt), *transno_p,
+                              tti->tti_transno, req_is_replay(req));
+                       if (req_is_replay(req)) {
+                               spin_lock(&req->rq_export->exp_lock);
+                               req->rq_export->exp_vbr_failed = 1;
+                               spin_unlock(&req->rq_export->exp_lock);
+                       }
+                       mutex_unlock(&ted->ted_lcd_lock);
+                       RETURN(req_is_replay(req) ? -EOVERFLOW : 0);
+               }
+               *transno_p = tti->tti_transno;
+       }
+
+       if (!lw_client) {
+               tti->tti_off = ted->ted_lr_off;
+               rc = tgt_client_data_write(env, tgt, ted->ted_lcd, &tti->tti_off, th);
+       }
+       mutex_unlock(&ted->ted_lcd_lock);
+       if (rc < 0)
+               RETURN(rc);
+srv_update:
+       if (update)
+               rc = tgt_server_data_write(env, tgt, th);
+       RETURN(rc);
+}
+EXPORT_SYMBOL(tgt_last_rcvd_update);
+