Whamcloud - gitweb
LU-12635 build: Support for gcc -Wimplicit-fallthrough
[fs/lustre-release.git] / lustre / target / tgt_lastrcvd.c
index e44346a..bcb4ff9 100644 (file)
  *
  * You should have received a copy of the GNU General Public License
  * version 2 along with this program; If not, see
- * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
- *
- * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
- * CA 95054 USA or visit www.sun.com if you need additional information or
- * have any questions.
+ * http://www.gnu.org/licenses/gpl-2.0.html
  *
  * GPL HEADER END
  */
@@ -27,7 +23,7 @@
  * Copyright (c) 2009, 2010, Oracle and/or its affiliates. All rights reserved.
  * Use is subject to license terms.
  *
- * Copyright (c) 2011, 2015, Intel Corporation.
+ * Copyright (c) 2011, 2017, Intel Corporation.
  */
 /*
  * This file is part of Lustre, http://www.lustre.org/
@@ -44,6 +40,8 @@
 
 #include "tgt_internal.h"
 
+/**
+ * version recovery epoch: number of bits a transno is shifted by to
+ * extract its epoch (transno >> LR_EPOCH_BITS) or to build the first
+ * transno of an epoch (epoch << LR_EPOCH_BITS)
+ */
+#define LR_EPOCH_BITS  32
 
 /* Allocate a bitmap for a chunk of reply data slots */
 static int tgt_bitmap_chunk_alloc(struct lu_target *lut, int chunk)
@@ -150,6 +148,13 @@ static int tgt_clear_reply_slot(struct lu_target *lut, int idx)
        int chunk;
        int b;
 
+       if (lut->lut_obd->obd_stopping)
+               /*
+                * in case of failover keep the bit set in order to
+                * avoid overwriting slots in reply_data which might
+                * be required by resent rpcs
+                */
+               return 0;
        chunk = idx / LUT_REPLY_SLOTS_PER_CHUNK;
        b = idx % LUT_REPLY_SLOTS_PER_CHUNK;
 
@@ -218,6 +223,9 @@ static int tgt_reply_header_write(const struct lu_env *env,
                tgt->lut_obd->obd_name, REPLY_DATA,
                lrh->lrh_magic, lrh->lrh_header_size, lrh->lrh_reply_size);
 
+       if (tgt->lut_bottom->dd_rdonly)
+               RETURN(0);
+
        buf.lrh_magic = cpu_to_le32(lrh->lrh_magic);
        buf.lrh_header_size = cpu_to_le32(lrh->lrh_header_size);
        buf.lrh_reply_size = cpu_to_le32(lrh->lrh_reply_size);
@@ -387,6 +395,8 @@ int tgt_client_alloc(struct obd_export *exp)
 
        spin_lock_init(&exp->exp_target_data.ted_nodemap_lock);
        INIT_LIST_HEAD(&exp->exp_target_data.ted_nodemap_member);
+       spin_lock_init(&exp->exp_target_data.ted_fmd_lock);
+       INIT_LIST_HEAD(&exp->exp_target_data.ted_fmd_list);
 
        OBD_ALLOC_PTR(exp->exp_target_data.ted_lcd);
        if (exp->exp_target_data.ted_lcd == NULL)
@@ -410,6 +420,8 @@ void tgt_client_free(struct obd_export *exp)
 
        LASSERT(exp != exp->exp_obd->obd_self_export);
 
+       tgt_fmd_cleanup(exp);
+
        /* free reply data */
        mutex_lock(&ted->ted_lcd_lock);
        list_for_each_entry_safe(trd, tmp, &ted->ted_reply_list, trd_list) {
@@ -448,8 +460,22 @@ void tgt_client_free(struct obd_export *exp)
 }
 EXPORT_SYMBOL(tgt_client_free);
 
-int tgt_client_data_read(const struct lu_env *env, struct lu_target *tgt,
-                        struct lsd_client_data *lcd, loff_t *off, int index)
+/**
+ * Sanity-check the UUID stored in a client record.
+ *
+ * If lcd_uuid contains no NUL terminator within its buffer, the last
+ * byte is overwritten with '\0' and the record is reported on the console.
+ *
+ * \param[in] obd_name name printed in the console message
+ * \param[in] index    last_rcvd index printed in the console message
+ * \param[in,out] lcd  client record; lcd_uuid may be truncated in place
+ */
+static inline void tgt_check_lcd(const char *obd_name, int index,
+                                struct lsd_client_data *lcd)
+{
+       size_t uuid_size = sizeof(lcd->lcd_uuid);
+
+       /* strnlen() reaching the limit means no NUL inside the buffer */
+       if (strnlen((char*)lcd->lcd_uuid, uuid_size) == uuid_size) {
+               /* terminate first so the %s below stays within the buffer */
+               lcd->lcd_uuid[uuid_size - 1] = '\0';
+
+               LCONSOLE_ERROR("the client UUID (%s) on %s for exports stored in last_rcvd(index = %d) is bad!\n",
+                              lcd->lcd_uuid, obd_name, index);
+       }
+}
+
+static int tgt_client_data_read(const struct lu_env *env, struct lu_target *tgt,
+                               struct lsd_client_data *lcd,
+                               loff_t *off, int index)
 {
        struct tgt_thread_info  *tti = tgt_th_info(env);
        int                      rc;
@@ -457,7 +483,7 @@ int tgt_client_data_read(const struct lu_env *env, struct lu_target *tgt,
        tti_buf_lcd(tti);
        rc = dt_record_read(env, tgt->lut_last_rcvd, &tti->tti_buf, off);
        if (rc == 0) {
-               check_lcd(tgt->lut_obd->obd_name, index, &tti->tti_lcd);
+               tgt_check_lcd(tgt->lut_obd->obd_name, index, &tti->tti_lcd);
                lcd_le_to_cpu(&tti->tti_lcd, lcd);
                lcd->lcd_last_result = ptlrpc_status_ntoh(lcd->lcd_last_result);
                lcd->lcd_last_close_result =
@@ -475,9 +501,10 @@ int tgt_client_data_read(const struct lu_env *env, struct lu_target *tgt,
        return rc;
 }
 
-int tgt_client_data_write(const struct lu_env *env, struct lu_target *tgt,
-                         struct lsd_client_data *lcd, loff_t *off,
-                         struct thandle *th)
+static int tgt_client_data_write(const struct lu_env *env,
+                                struct lu_target *tgt,
+                                struct lsd_client_data *lcd,
+                                loff_t *off, struct thandle *th)
 {
        struct tgt_thread_info *tti = tgt_th_info(env);
        struct dt_object        *dto;
@@ -492,6 +519,59 @@ int tgt_client_data_write(const struct lu_env *env, struct lu_target *tgt,
        return dt_record_write(env, dto, &tti->tti_buf, off, th);
 }
 
+/**
+ * Commit-callback record used for the initial connect of a client export.
+ * Allocated by tgt_new_client_cb_add() and freed by tgt_cb_new_client().
+ */
+struct tgt_new_client_callback {
+       /* callback registered with dt_trans_cb_add(); container_of0() on it
+        * recovers the enclosing record */
+       struct dt_txn_commit_cb  lncc_cb;
+       /* export taken with class_export_cb_get() and released with
+        * class_export_cb_put() */
+       struct obd_export       *lncc_exp;
+};
+
+/**
+ * Callback installed as dcb_func by tgt_new_client_cb_add().
+ *
+ * Clears exp_need_sync on the export under exp_lock, drops the export
+ * reference held by the callback record and frees the record.
+ *
+ * \param[in] env      not used by this function
+ * \param[in] th       not used by this function
+ * \param[in] cb       callback embedded in a tgt_new_client_callback
+ * \param[in] err      not used by this function
+ */
+static void tgt_cb_new_client(struct lu_env *env, struct thandle *th,
+                             struct dt_txn_commit_cb *cb, int err)
+{
+       struct tgt_new_client_callback *ccb;
+
+       /* recover the enclosing record from the embedded callback */
+       ccb = container_of0(cb, struct tgt_new_client_callback, lncc_cb);
+
+       LASSERT(ccb->lncc_exp->exp_obd);
+
+       CDEBUG(D_RPCTRACE, "%s: committing for initial connect of %s\n",
+              ccb->lncc_exp->exp_obd->obd_name,
+              ccb->lncc_exp->exp_client_uuid.uuid);
+
+       spin_lock(&ccb->lncc_exp->exp_lock);
+
+       ccb->lncc_exp->exp_need_sync = 0;
+
+       spin_unlock(&ccb->lncc_exp->exp_lock);
+       /* pairs with class_export_cb_get() in tgt_new_client_cb_add() */
+       class_export_cb_put(ccb->lncc_exp);
+
+       OBD_FREE_PTR(ccb);
+}
+
+/**
+ * Register tgt_cb_new_client() as a callback on transaction \a th.
+ *
+ * Allocates a callback record, takes a callback reference on \a exp and
+ * adds the record to \a th with dt_trans_cb_add(). If registration fails,
+ * the reference is dropped and the record is freed here; otherwise both
+ * are released by tgt_cb_new_client().
+ *
+ * \param[in] th       transaction handle the callback is added to
+ * \param[in] exp      export the callback will operate on
+ *
+ * \retval 0           callback was added
+ * \retval -ENOMEM     the callback record could not be allocated
+ * \retval other       non-zero value returned by dt_trans_cb_add()
+ */
+int tgt_new_client_cb_add(struct thandle *th, struct obd_export *exp)
+{
+       struct tgt_new_client_callback  *ccb;
+       struct dt_txn_commit_cb         *dcb;
+       int                              rc;
+
+       OBD_ALLOC_PTR(ccb);
+       if (ccb == NULL)
+               return -ENOMEM;
+
+       /* reference is dropped in tgt_cb_new_client() or on error below */
+       ccb->lncc_exp = class_export_cb_get(exp);
+
+       dcb = &ccb->lncc_cb;
+       dcb->dcb_func = tgt_cb_new_client;
+       INIT_LIST_HEAD(&dcb->dcb_linkage);
+       strlcpy(dcb->dcb_name, "tgt_cb_new_client", sizeof(dcb->dcb_name));
+
+       rc = dt_trans_cb_add(th, dcb);
+       if (rc) {
+               /* callback will never run: undo the get and free the record */
+               class_export_cb_put(exp);
+               OBD_FREE_PTR(ccb);
+       }
+       return rc;
+}
+
 /**
  * Update client data in last_rcvd
  */
@@ -512,12 +592,14 @@ static int tgt_client_data_update(const struct lu_env *env,
                RETURN(-EINVAL);
        }
 
+       if (tgt->lut_bottom->dd_rdonly)
+               RETURN(0);
+
        th = dt_trans_create(env, tgt->lut_bottom);
        if (IS_ERR(th))
                RETURN(PTR_ERR(th));
 
        tti_buf_lcd(tti);
-       mutex_lock(&ted->ted_lcd_lock);
        rc = dt_declare_record_write(env, tgt->lut_last_rcvd,
                                     &tti->tti_buf,
                                     ted->ted_lr_off, th);
@@ -527,6 +609,9 @@ static int tgt_client_data_update(const struct lu_env *env,
        rc = dt_trans_start_local(env, tgt->lut_bottom, th);
        if (rc)
                GOTO(out, rc);
+
+       mutex_lock(&ted->ted_lcd_lock);
+
        /*
         * Until this operations will be committed the sync is needed
         * for this export. This should be done _after_ starting the
@@ -545,9 +630,11 @@ static int tgt_client_data_update(const struct lu_env *env,
 
        tti->tti_off = ted->ted_lr_off;
        rc = tgt_client_data_write(env, tgt, ted->ted_lcd, &tti->tti_off, th);
+
+       mutex_unlock(&ted->ted_lcd_lock);
+
        EXIT;
 out:
-       mutex_unlock(&ted->ted_lcd_lock);
        dt_trans_stop(env, tgt->lut_bottom, th);
        CDEBUG(D_INFO, "%s: update last_rcvd client data for UUID = %s, "
               "last_transno = %llu: rc = %d\n", tgt->lut_obd->obd_name,
@@ -556,7 +643,7 @@ out:
        return rc;
 }
 
-int tgt_server_data_read(const struct lu_env *env, struct lu_target *tgt)
+static int tgt_server_data_read(const struct lu_env *env, struct lu_target *tgt)
 {
        struct tgt_thread_info  *tti = tgt_th_info(env);
        int                      rc;
@@ -574,8 +661,8 @@ int tgt_server_data_read(const struct lu_env *env, struct lu_target *tgt)
         return rc;
 }
 
-int tgt_server_data_write(const struct lu_env *env, struct lu_target *tgt,
-                         struct thandle *th)
+static int tgt_server_data_write(const struct lu_env *env,
+                                struct lu_target *tgt, struct thandle *th)
 {
        struct tgt_thread_info  *tti = tgt_th_info(env);
        struct dt_object        *dto;
@@ -619,6 +706,9 @@ int tgt_server_data_update(const struct lu_env *env, struct lu_target *tgt,
        tgt->lut_lsd.lsd_last_transno = tgt->lut_last_transno;
        spin_unlock(&tgt->lut_translock);
 
+       if (tgt->lut_bottom->dd_rdonly)
+               RETURN(0);
+
        th = dt_trans_create(env, tgt->lut_bottom);
        if (IS_ERR(th))
                RETURN(PTR_ERR(th));
@@ -646,8 +736,8 @@ out:
 }
 EXPORT_SYMBOL(tgt_server_data_update);
 
-int tgt_truncate_last_rcvd(const struct lu_env *env, struct lu_target *tgt,
-                          loff_t size)
+static int tgt_truncate_last_rcvd(const struct lu_env *env,
+                                 struct lu_target *tgt, loff_t size)
 {
        struct dt_object *dt = tgt->lut_last_rcvd;
        struct thandle   *th;
@@ -656,6 +746,9 @@ int tgt_truncate_last_rcvd(const struct lu_env *env, struct lu_target *tgt,
 
        ENTRY;
 
+       if (tgt->lut_bottom->dd_rdonly)
+               RETURN(0);
+
        attr.la_size = size;
        attr.la_valid = LA_SIZE;
 
@@ -718,7 +811,7 @@ void tgt_boot_epoch_update(struct lu_target *tgt)
        }
 
        spin_lock(&tgt->lut_translock);
-       start_epoch = lr_epoch(tgt->lut_last_transno) + 1;
+       start_epoch = (tgt->lut_last_transno >> LR_EPOCH_BITS) + 1;
        tgt->lut_last_transno = (__u64)start_epoch << LR_EPOCH_BITS;
        tgt->lut_lsd.lsd_start_epoch = start_epoch;
        spin_unlock(&tgt->lut_translock);
@@ -800,20 +893,25 @@ static void tgt_cb_last_committed(struct lu_env *env, struct thandle *th,
                spin_unlock(&ccb->llcc_tgt->lut_translock);
 
                ptlrpc_commit_replies(ccb->llcc_exp);
-               tgt_cancel_slc_locks(ccb->llcc_transno);
+               tgt_cancel_slc_locks(ccb->llcc_tgt, ccb->llcc_transno);
        } else {
                spin_unlock(&ccb->llcc_tgt->lut_translock);
        }
+
+       CDEBUG(D_HA, "%s: transno %lld is committed\n",
+              ccb->llcc_tgt->lut_obd->obd_name, ccb->llcc_transno);
+
 out:
        class_export_cb_put(ccb->llcc_exp);
-       if (ccb->llcc_transno)
-               CDEBUG(D_HA, "%s: transno %lld is committed\n",
-                      ccb->llcc_tgt->lut_obd->obd_name, ccb->llcc_transno);
        OBD_FREE_PTR(ccb);
 }
 
-int tgt_last_commit_cb_add(struct thandle *th, struct lu_target *tgt,
-                          struct obd_export *exp, __u64 transno)
+/**
+ * Add a commit callback function. It returns a non-zero value to tell
+ * the caller to use a sync transaction if necessary.
+ */
+static int tgt_last_commit_cb_add(struct thandle *th, struct lu_target *tgt,
+                                 struct obd_export *exp, __u64 transno)
 {
        struct tgt_last_committed_callback      *ccb;
        struct dt_txn_commit_cb                 *dcb;
@@ -842,60 +940,9 @@ int tgt_last_commit_cb_add(struct thandle *th, struct lu_target *tgt,
                /* report failure to force synchronous operation */
                return -EPERM;
 
-       return rc;
-}
-
-struct tgt_new_client_callback {
-       struct dt_txn_commit_cb  lncc_cb;
-       struct obd_export       *lncc_exp;
-};
-
-static void tgt_cb_new_client(struct lu_env *env, struct thandle *th,
-                             struct dt_txn_commit_cb *cb, int err)
-{
-       struct tgt_new_client_callback *ccb;
-
-       ccb = container_of0(cb, struct tgt_new_client_callback, lncc_cb);
-
-       LASSERT(ccb->lncc_exp->exp_obd);
-
-       CDEBUG(D_RPCTRACE, "%s: committing for initial connect of %s\n",
-              ccb->lncc_exp->exp_obd->obd_name,
-              ccb->lncc_exp->exp_client_uuid.uuid);
-
-       spin_lock(&ccb->lncc_exp->exp_lock);
-
-       ccb->lncc_exp->exp_need_sync = 0;
-
-       spin_unlock(&ccb->lncc_exp->exp_lock);
-       class_export_cb_put(ccb->lncc_exp);
-
-       OBD_FREE_PTR(ccb);
-}
-
-int tgt_new_client_cb_add(struct thandle *th, struct obd_export *exp)
-{
-       struct tgt_new_client_callback  *ccb;
-       struct dt_txn_commit_cb         *dcb;
-       int                              rc;
-
-       OBD_ALLOC_PTR(ccb);
-       if (ccb == NULL)
-               return -ENOMEM;
-
-       ccb->lncc_exp = class_export_cb_get(exp);
-
-       dcb = &ccb->lncc_cb;
-       dcb->dcb_func = tgt_cb_new_client;
-       INIT_LIST_HEAD(&dcb->dcb_linkage);
-       strlcpy(dcb->dcb_name, "tgt_cb_new_client", sizeof(dcb->dcb_name));
-
-       rc = dt_trans_cb_add(th, dcb);
-       if (rc) {
-               class_export_cb_put(exp);
-               OBD_FREE_PTR(ccb);
-       }
-       return rc;
+       /* if exp_need_sync is set, return non-zero value to force
+        * a sync transaction. */
+       return rc ? rc : exp->exp_need_sync;
 }
 
 /**
@@ -1073,6 +1120,9 @@ int tgt_client_del(const struct lu_env *env, struct obd_export *exp)
        if (exp->exp_flags & OBD_OPT_FAILOVER)
                RETURN(0);
 
+       if (OBD_FAIL_CHECK(OBD_FAIL_TGT_CLIENT_DEL))
+               RETURN(0);
+
        /* Make sure the server's last_transno is up to date.
         * This should be done before zeroing client slot so last_transno will
         * be in server data or in client data in case of failure */
@@ -1334,7 +1384,11 @@ static int tgt_last_rcvd_update(const struct lu_env *env, struct lu_target *tgt,
 
        if (!lw_client) {
                tti->tti_off = ted->ted_lr_off;
-               rc = tgt_client_data_write(env, tgt, ted->ted_lcd, &tti->tti_off, th);
+               if (CFS_FAIL_CHECK(OBD_FAIL_TGT_RCVD_EIO))
+                       rc = -EIO;
+               else
+                       rc = tgt_client_data_write(env, tgt, ted->ted_lcd,
+                                                  &tti->tti_off, th);
                if (rc < 0) {
                        mutex_unlock(&ted->ted_lcd_lock);
                        RETURN(rc);
@@ -1405,6 +1459,9 @@ static int tgt_clients_data_init(const struct lu_env *env,
 
        ENTRY;
 
+       if (tgt->lut_bottom->dd_rdonly)
+               RETURN(0);
+
        CLASSERT(offsetof(struct lsd_client_data, lcd_padding) +
                 sizeof(lcd->lcd_padding) == LR_CLIENT_SIZE);
 
@@ -1604,12 +1661,23 @@ int tgt_server_data_init(const struct lu_env *env, struct lu_target *tgt)
                        RETURN(rc);
                }
                if (strcmp(lsd->lsd_uuid, tgt->lut_obd->obd_uuid.uuid)) {
-                       LCONSOLE_ERROR_MSG(0x157, "Trying to start OBD %s "
-                                          "using the wrong disk %s. Were the"
-                                          " /dev/ assignments rearranged?\n",
-                                          tgt->lut_obd->obd_uuid.uuid,
-                                          lsd->lsd_uuid);
-                       RETURN(-EINVAL);
+                       if (tgt->lut_bottom->dd_rdonly) {
+                               /* Such difference may be caused by mounting
+                                * up snapshot with new fsname under rd_only
+                                * mode. But even if it was NOT, it will not
+                                * damage the system because of "rd_only". */
+                               memcpy(lsd->lsd_uuid,
+                                      tgt->lut_obd->obd_uuid.uuid,
+                                      sizeof(lsd->lsd_uuid));
+                       } else {
+                               LCONSOLE_ERROR_MSG(0x157, "Trying to start "
+                                                  "OBD %s using the wrong "
+                                                  "disk %s. Were the /dev/ "
+                                                  "assignments rearranged?\n",
+                                                  tgt->lut_obd->obd_uuid.uuid,
+                                                  lsd->lsd_uuid);
+                               RETURN(-EINVAL);
+                       }
                }
 
                if (lsd->lsd_osd_index != index) {
@@ -1726,6 +1794,14 @@ int tgt_txn_start_cb(const struct lu_env *env, struct thandle *th,
        struct dt_object        *dto;
        int                      rc;
 
+       /* For the read-only case, the caller should already have failed
+        * when starting the transaction. If the logic gets here,
+        * something must be wrong. */
+       if (unlikely(tgt->lut_bottom->dd_rdonly)) {
+               dump_stack();
+               LBUG();
+       }
+
        /* if there is no session, then this transaction is not result of
         * request processing but some local operation */
        if (env->le_ses == NULL)
@@ -1745,13 +1821,10 @@ int tgt_txn_start_cb(const struct lu_env *env, struct thandle *th,
                 * because a replay slot has not been assigned.  This should be
                 * replaced by dmu_tx_hold_append() when available.
                 */
-               tti->tti_off = atomic_read(&tgt->lut_num_clients) * 8 *
-                               sizeof(struct lsd_reply_data);
                tti->tti_buf.lb_buf = NULL;
                tti->tti_buf.lb_len = sizeof(struct lsd_reply_data);
                dto = dt_object_locate(tgt->lut_reply_data, th->th_dev);
-               rc = dt_declare_record_write(env, dto, &tti->tti_buf,
-                                            tti->tti_off, th);
+               rc = dt_declare_record_write(env, dto, &tti->tti_buf, -1, th);
                if (rc)
                        return rc;
        } else {