*
* You should have received a copy of the GNU General Public License
* version 2 along with this program; If not, see
- * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
- *
- * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
- * CA 95054 USA or visit www.sun.com if you need additional information or
- * have any questions.
+ * http://www.gnu.org/licenses/gpl-2.0.html
*
* GPL HEADER END
*/
* Copyright (c) 2009, 2010, Oracle and/or its affiliates. All rights reserved.
* Use is subject to license terms.
*
- * Copyright (c) 2011, 2015, Intel Corporation.
+ * Copyright (c) 2011, 2017, Intel Corporation.
*/
/*
* This file is part of Lustre, http://www.lustre.org/
#include "tgt_internal.h"
+/** version recovery epoch */
+#define LR_EPOCH_BITS 32
/* Allocate a bitmap for a chunk of reply data slots */
static int tgt_bitmap_chunk_alloc(struct lu_target *lut, int chunk)
tgt->lut_obd->obd_name, REPLY_DATA,
lrh->lrh_magic, lrh->lrh_header_size, lrh->lrh_reply_size);
+ if (tgt->lut_bottom->dd_rdonly)
+ RETURN(0);
+
buf.lrh_magic = cpu_to_le32(lrh->lrh_magic);
buf.lrh_header_size = cpu_to_le32(lrh->lrh_header_size);
buf.lrh_reply_size = cpu_to_le32(lrh->lrh_reply_size);
}
EXPORT_SYMBOL(tgt_client_free);
-int tgt_client_data_read(const struct lu_env *env, struct lu_target *tgt,
- struct lsd_client_data *lcd, loff_t *off, int index)
+static inline void tgt_check_lcd(const char *obd_name, int index,
+ struct lsd_client_data *lcd)
+{
+ size_t uuid_size = sizeof(lcd->lcd_uuid);
+
+ if (strnlen((char*)lcd->lcd_uuid, uuid_size) == uuid_size) {
+ lcd->lcd_uuid[uuid_size - 1] = '\0';
+
+ LCONSOLE_ERROR("the client UUID (%s) on %s for exports stored in last_rcvd(index = %d) is bad!\n",
+ lcd->lcd_uuid, obd_name, index);
+ }
+}
+
+static int tgt_client_data_read(const struct lu_env *env, struct lu_target *tgt,
+ struct lsd_client_data *lcd,
+ loff_t *off, int index)
{
struct tgt_thread_info *tti = tgt_th_info(env);
int rc;
tti_buf_lcd(tti);
rc = dt_record_read(env, tgt->lut_last_rcvd, &tti->tti_buf, off);
if (rc == 0) {
- check_lcd(tgt->lut_obd->obd_name, index, &tti->tti_lcd);
+ tgt_check_lcd(tgt->lut_obd->obd_name, index, &tti->tti_lcd);
lcd_le_to_cpu(&tti->tti_lcd, lcd);
lcd->lcd_last_result = ptlrpc_status_ntoh(lcd->lcd_last_result);
lcd->lcd_last_close_result =
ptlrpc_status_ntoh(lcd->lcd_last_close_result);
}
- CDEBUG(D_INFO, "%s: read lcd @%lld uuid = %s, last_transno = "LPU64
- ", last_xid = "LPU64", last_result = %u, last_data = %u, "
- "last_close_transno = "LPU64", last_close_xid = "LPU64", "
+ CDEBUG(D_INFO, "%s: read lcd @%lld uuid = %s, last_transno = %llu"
+ ", last_xid = %llu, last_result = %u, last_data = %u, "
+ "last_close_transno = %llu, last_close_xid = %llu, "
"last_close_result = %u, rc = %d\n", tgt->lut_obd->obd_name,
*off, lcd->lcd_uuid, lcd->lcd_last_transno, lcd->lcd_last_xid,
lcd->lcd_last_result, lcd->lcd_last_data,
return rc;
}
-int tgt_client_data_write(const struct lu_env *env, struct lu_target *tgt,
- struct lsd_client_data *lcd, loff_t *off,
- struct thandle *th)
+static int tgt_client_data_write(const struct lu_env *env,
+ struct lu_target *tgt,
+ struct lsd_client_data *lcd,
+ loff_t *off, struct thandle *th)
{
struct tgt_thread_info *tti = tgt_th_info(env);
struct dt_object *dto;
return dt_record_write(env, dto, &tti->tti_buf, off, th);
}
+struct tgt_new_client_callback {
+ struct dt_txn_commit_cb lncc_cb;
+ struct obd_export *lncc_exp;
+};
+
+static void tgt_cb_new_client(struct lu_env *env, struct thandle *th,
+ struct dt_txn_commit_cb *cb, int err)
+{
+ struct tgt_new_client_callback *ccb;
+
+ ccb = container_of0(cb, struct tgt_new_client_callback, lncc_cb);
+
+ LASSERT(ccb->lncc_exp->exp_obd);
+
+ CDEBUG(D_RPCTRACE, "%s: committing for initial connect of %s\n",
+ ccb->lncc_exp->exp_obd->obd_name,
+ ccb->lncc_exp->exp_client_uuid.uuid);
+
+ spin_lock(&ccb->lncc_exp->exp_lock);
+
+ ccb->lncc_exp->exp_need_sync = 0;
+
+ spin_unlock(&ccb->lncc_exp->exp_lock);
+ class_export_cb_put(ccb->lncc_exp);
+
+ OBD_FREE_PTR(ccb);
+}
+
+int tgt_new_client_cb_add(struct thandle *th, struct obd_export *exp)
+{
+ struct tgt_new_client_callback *ccb;
+ struct dt_txn_commit_cb *dcb;
+ int rc;
+
+ OBD_ALLOC_PTR(ccb);
+ if (ccb == NULL)
+ return -ENOMEM;
+
+ ccb->lncc_exp = class_export_cb_get(exp);
+
+ dcb = &ccb->lncc_cb;
+ dcb->dcb_func = tgt_cb_new_client;
+ INIT_LIST_HEAD(&dcb->dcb_linkage);
+ strlcpy(dcb->dcb_name, "tgt_cb_new_client", sizeof(dcb->dcb_name));
+
+ rc = dt_trans_cb_add(th, dcb);
+ if (rc) {
+ class_export_cb_put(exp);
+ OBD_FREE_PTR(ccb);
+ }
+ return rc;
+}
+
/**
* Update client data in last_rcvd
*/
RETURN(-EINVAL);
}
+ if (tgt->lut_bottom->dd_rdonly)
+ RETURN(0);
+
th = dt_trans_create(env, tgt->lut_bottom);
if (IS_ERR(th))
RETURN(PTR_ERR(th));
tti_buf_lcd(tti);
- mutex_lock(&ted->ted_lcd_lock);
rc = dt_declare_record_write(env, tgt->lut_last_rcvd,
&tti->tti_buf,
ted->ted_lr_off, th);
rc = dt_trans_start_local(env, tgt->lut_bottom, th);
if (rc)
GOTO(out, rc);
+
+ mutex_lock(&ted->ted_lcd_lock);
+
/*
* Until this operations will be committed the sync is needed
* for this export. This should be done _after_ starting the
tti->tti_off = ted->ted_lr_off;
rc = tgt_client_data_write(env, tgt, ted->ted_lcd, &tti->tti_off, th);
+
+ mutex_unlock(&ted->ted_lcd_lock);
+
EXIT;
out:
- mutex_unlock(&ted->ted_lcd_lock);
dt_trans_stop(env, tgt->lut_bottom, th);
CDEBUG(D_INFO, "%s: update last_rcvd client data for UUID = %s, "
- "last_transno = "LPU64": rc = %d\n", tgt->lut_obd->obd_name,
+ "last_transno = %llu: rc = %d\n", tgt->lut_obd->obd_name,
tgt->lut_lsd.lsd_uuid, tgt->lut_lsd.lsd_last_transno, rc);
return rc;
}
-int tgt_server_data_read(const struct lu_env *env, struct lu_target *tgt)
+static int tgt_server_data_read(const struct lu_env *env, struct lu_target *tgt)
{
struct tgt_thread_info *tti = tgt_th_info(env);
int rc;
lsd_le_to_cpu(&tti->tti_lsd, &tgt->lut_lsd);
CDEBUG(D_INFO, "%s: read last_rcvd server data for UUID = %s, "
- "last_transno = "LPU64": rc = %d\n", tgt->lut_obd->obd_name,
+ "last_transno = %llu: rc = %d\n", tgt->lut_obd->obd_name,
tgt->lut_lsd.lsd_uuid, tgt->lut_lsd.lsd_last_transno, rc);
return rc;
}
-int tgt_server_data_write(const struct lu_env *env, struct lu_target *tgt,
- struct thandle *th)
+static int tgt_server_data_write(const struct lu_env *env,
+ struct lu_target *tgt, struct thandle *th)
{
struct tgt_thread_info *tti = tgt_th_info(env);
struct dt_object *dto;
rc = dt_record_write(env, dto, &tti->tti_buf, &tti->tti_off, th);
CDEBUG(D_INFO, "%s: write last_rcvd server data for UUID = %s, "
- "last_transno = "LPU64": rc = %d\n", tgt->lut_obd->obd_name,
+ "last_transno = %llu: rc = %d\n", tgt->lut_obd->obd_name,
tgt->lut_lsd.lsd_uuid, tgt->lut_lsd.lsd_last_transno, rc);
RETURN(rc);
ENTRY;
CDEBUG(D_SUPER,
- "%s: mount_count is "LPU64", last_transno is "LPU64"\n",
+ "%s: mount_count is %llu, last_transno is %llu\n",
tgt->lut_lsd.lsd_uuid, tgt->lut_obd->u.obt.obt_mount_count,
tgt->lut_last_transno);
tgt->lut_lsd.lsd_last_transno = tgt->lut_last_transno;
spin_unlock(&tgt->lut_translock);
+ if (tgt->lut_bottom->dd_rdonly)
+ RETURN(0);
+
th = dt_trans_create(env, tgt->lut_bottom);
if (IS_ERR(th))
RETURN(PTR_ERR(th));
dt_trans_stop(env, tgt->lut_bottom, th);
CDEBUG(D_INFO, "%s: update last_rcvd server data for UUID = %s, "
- "last_transno = "LPU64": rc = %d\n", tgt->lut_obd->obd_name,
+ "last_transno = %llu: rc = %d\n", tgt->lut_obd->obd_name,
tgt->lut_lsd.lsd_uuid, tgt->lut_lsd.lsd_last_transno, rc);
RETURN(rc);
}
EXPORT_SYMBOL(tgt_server_data_update);
-int tgt_truncate_last_rcvd(const struct lu_env *env, struct lu_target *tgt,
- loff_t size)
+static int tgt_truncate_last_rcvd(const struct lu_env *env,
+ struct lu_target *tgt, loff_t size)
{
struct dt_object *dt = tgt->lut_last_rcvd;
struct thandle *th;
ENTRY;
+ if (tgt->lut_bottom->dd_rdonly)
+ RETURN(0);
+
attr.la_size = size;
attr.la_valid = LA_SIZE;
}
spin_lock(&tgt->lut_translock);
- start_epoch = lr_epoch(tgt->lut_last_transno) + 1;
+ start_epoch = (tgt->lut_last_transno >> LR_EPOCH_BITS) + 1;
tgt->lut_last_transno = (__u64)start_epoch << LR_EPOCH_BITS;
tgt->lut_lsd.lsd_start_epoch = start_epoch;
spin_unlock(&tgt->lut_translock);
spin_unlock(&ccb->llcc_tgt->lut_translock);
ptlrpc_commit_replies(ccb->llcc_exp);
- tgt_cancel_slc_locks(ccb->llcc_transno);
+ tgt_cancel_slc_locks(ccb->llcc_tgt, ccb->llcc_transno);
} else {
spin_unlock(&ccb->llcc_tgt->lut_translock);
}
+
+ CDEBUG(D_HA, "%s: transno %lld is committed\n",
+ ccb->llcc_tgt->lut_obd->obd_name, ccb->llcc_transno);
+
out:
class_export_cb_put(ccb->llcc_exp);
- if (ccb->llcc_transno)
- CDEBUG(D_HA, "%s: transno "LPD64" is committed\n",
- ccb->llcc_tgt->lut_obd->obd_name, ccb->llcc_transno);
OBD_FREE_PTR(ccb);
}
-int tgt_last_commit_cb_add(struct thandle *th, struct lu_target *tgt,
- struct obd_export *exp, __u64 transno)
+/**
+ * Add commit callback function, it returns a non-zero value to inform
+ * caller to use sync transaction if necessary.
+ */
+static int tgt_last_commit_cb_add(struct thandle *th, struct lu_target *tgt,
+ struct obd_export *exp, __u64 transno)
{
struct tgt_last_committed_callback *ccb;
struct dt_txn_commit_cb *dcb;
/* report failure to force synchronous operation */
return -EPERM;
- return rc;
-}
-
-struct tgt_new_client_callback {
- struct dt_txn_commit_cb lncc_cb;
- struct obd_export *lncc_exp;
-};
-
-static void tgt_cb_new_client(struct lu_env *env, struct thandle *th,
- struct dt_txn_commit_cb *cb, int err)
-{
- struct tgt_new_client_callback *ccb;
-
- ccb = container_of0(cb, struct tgt_new_client_callback, lncc_cb);
-
- LASSERT(ccb->lncc_exp->exp_obd);
-
- CDEBUG(D_RPCTRACE, "%s: committing for initial connect of %s\n",
- ccb->lncc_exp->exp_obd->obd_name,
- ccb->lncc_exp->exp_client_uuid.uuid);
-
- spin_lock(&ccb->lncc_exp->exp_lock);
-
- ccb->lncc_exp->exp_need_sync = 0;
-
- spin_unlock(&ccb->lncc_exp->exp_lock);
- class_export_cb_put(ccb->lncc_exp);
-
- OBD_FREE_PTR(ccb);
-}
-
-int tgt_new_client_cb_add(struct thandle *th, struct obd_export *exp)
-{
- struct tgt_new_client_callback *ccb;
- struct dt_txn_commit_cb *dcb;
- int rc;
-
- OBD_ALLOC_PTR(ccb);
- if (ccb == NULL)
- return -ENOMEM;
-
- ccb->lncc_exp = class_export_cb_get(exp);
-
- dcb = &ccb->lncc_cb;
- dcb->dcb_func = tgt_cb_new_client;
- INIT_LIST_HEAD(&dcb->dcb_linkage);
- strlcpy(dcb->dcb_name, "tgt_cb_new_client", sizeof(dcb->dcb_name));
-
- rc = dt_trans_cb_add(th, dcb);
- if (rc) {
- class_export_cb_put(exp);
- OBD_FREE_PTR(ccb);
- }
- return rc;
+ /* if exp_need_sync is set, return non-zero value to force
+ * a sync transaction. */
+ return rc ? rc : exp->exp_need_sync;
}
/**
exp_connect_flags(exp) & OBD_CONNECT_LIGHTWEIGHT)
RETURN(0);
+ /* Slot may be not yet assigned, use case is race between Client
+ * reconnect and forced eviction */
+ if (ted->ted_lr_idx < 0) {
+ CWARN("%s: client with UUID '%s' not in bitmap\n",
+ tgt->lut_obd->obd_name, ted->ted_lcd->lcd_uuid);
+ RETURN(0);
+ }
+
CDEBUG(D_INFO, "%s: del client at idx %u, off %lld, UUID '%s'\n",
tgt->lut_obd->obd_name, ted->ted_lr_idx, ted->ted_lr_off,
ted->ted_lcd->lcd_uuid);
if (exp->exp_flags & OBD_OPT_FAILOVER)
RETURN(0);
+ if (OBD_FAIL_CHECK(OBD_FAIL_TGT_CLIENT_DEL))
+ RETURN(0);
+
/* Make sure the server's last_transno is up to date.
* This should be done before zeroing client slot so last_transno will
* be in server data or in client data in case of failure */
spin_lock(&tgt->lut_translock);
if (th->th_result != 0) {
if (tti->tti_transno != 0) {
- CERROR("%s: replay transno "LPU64" failed: rc = %d\n",
+ CERROR("%s: replay transno %llu failed: rc = %d\n",
tgt_name(tgt), tti->tti_transno, th->th_result);
}
} else if (tti->tti_transno == 0) {
}
/* filling reply data */
- CDEBUG(D_INODE, "transno = "LPU64", last_committed = "LPU64"\n",
+ CDEBUG(D_INODE, "transno = %llu, last_committed = %llu\n",
tti->tti_transno, tgt->lut_obd->obd_last_committed);
if (req != NULL) {
if (*transno_p > tti->tti_transno) {
if (!tgt->lut_no_reconstruct) {
CERROR("%s: trying to overwrite bigger transno:"
- "on-disk: "LPU64", new: "LPU64" replay: "
+ "on-disk: %llu, new: %llu replay: "
"%d. See LU-617.\n", tgt_name(tgt),
*transno_p, tti->tti_transno,
req_is_replay(req));
if (!lw_client) {
tti->tti_off = ted->ted_lr_off;
- rc = tgt_client_data_write(env, tgt, ted->ted_lcd, &tti->tti_off, th);
+ if (CFS_FAIL_CHECK(OBD_FAIL_TGT_RCVD_EIO))
+ rc = -EIO;
+ else
+ rc = tgt_client_data_write(env, tgt, ted->ted_lcd,
+ &tti->tti_off, th);
if (rc < 0) {
mutex_unlock(&ted->ted_lcd_lock);
RETURN(rc);
ENTRY;
+ if (tgt->lut_bottom->dd_rdonly)
+ RETURN(0);
+
CLASSERT(offsetof(struct lsd_client_data, lcd_padding) +
sizeof(lcd->lcd_padding) == LR_CLIENT_SIZE);
/* These exports are cleaned up by disconnect, so they
* need to be set up like real exports as connect does.
*/
- CDEBUG(D_HA, "RCVRNG CLIENT uuid: %s idx: %d lr: "LPU64
- " srv lr: "LPU64" lx: "LPU64" gen %u\n", lcd->lcd_uuid,
+ CDEBUG(D_HA, "RCVRNG CLIENT uuid: %s idx: %d lr: %llu"
+ " srv lr: %llu lx: %llu gen %u\n", lcd->lcd_uuid,
cl_idx, last_transno, lsd->lsd_last_transno,
lcd_last_xid(lcd), lcd->lcd_generation);
}
/* Need to check last_rcvd even for duplicated exports. */
- CDEBUG(D_OTHER, "client at idx %d has last_transno = "LPU64"\n",
+ CDEBUG(D_OTHER, "client at idx %d has last_transno = %llu\n",
cl_idx, last_transno);
spin_lock(&tgt->lut_translock);
RETURN(rc);
}
if (strcmp(lsd->lsd_uuid, tgt->lut_obd->obd_uuid.uuid)) {
- LCONSOLE_ERROR_MSG(0x157, "Trying to start OBD %s "
- "using the wrong disk %s. Were the"
- " /dev/ assignments rearranged?\n",
- tgt->lut_obd->obd_uuid.uuid,
- lsd->lsd_uuid);
- RETURN(-EINVAL);
+ if (tgt->lut_bottom->dd_rdonly) {
+ /* Such difference may be caused by mounting
+ * up snapshot with new fsname under rd_only
+ * mode. But even if it was NOT, it will not
+ * damage the system because of "rd_only". */
+ memcpy(lsd->lsd_uuid,
+ tgt->lut_obd->obd_uuid.uuid,
+ sizeof(lsd->lsd_uuid));
+ } else {
+ LCONSOLE_ERROR_MSG(0x157, "Trying to start "
+ "OBD %s using the wrong "
+ "disk %s. Were the /dev/ "
+ "assignments rearranged?\n",
+ tgt->lut_obd->obd_uuid.uuid,
+ lsd->lsd_uuid);
+ RETURN(-EINVAL);
+ }
}
if (lsd->lsd_osd_index != index) {
lsd->lsd_mount_count++;
CDEBUG(D_INODE, "=======,=BEGIN DUMPING LAST_RCVD========\n");
- CDEBUG(D_INODE, "%s: server last_transno: "LPU64"\n",
+ CDEBUG(D_INODE, "%s: server last_transno: %llu\n",
tgt_name(tgt), tgt->lut_last_transno);
- CDEBUG(D_INODE, "%s: server mount_count: "LPU64"\n",
+ CDEBUG(D_INODE, "%s: server mount_count: %llu\n",
tgt_name(tgt), lsd->lsd_mount_count);
CDEBUG(D_INODE, "%s: server data size: %u\n",
tgt_name(tgt), lsd->lsd_server_size);
struct dt_object *dto;
int rc;
+ /* For readonly case, the caller should have got failure
+ * when start the transaction. If the logic comes here,
+ * there must be something wrong. */
+ if (unlikely(tgt->lut_bottom->dd_rdonly)) {
+ dump_stack();
+ LBUG();
+ }
+
/* if there is no session, then this transaction is not result of
* request processing but some local operation */
if (env->le_ses == NULL)
* because a replay slot has not been assigned. This should be
* replaced by dmu_tx_hold_append() when available.
*/
- tti->tti_off = atomic_read(&tgt->lut_num_clients) * 8 *
- sizeof(struct lsd_reply_data);
tti->tti_buf.lb_buf = NULL;
tti->tti_buf.lb_len = sizeof(struct lsd_reply_data);
dto = dt_object_locate(tgt->lut_reply_data, th->th_dev);
- rc = dt_declare_record_write(env, dto, &tti->tti_buf,
- tti->tti_off, th);
+ rc = dt_declare_record_write(env, dto, &tti->tti_buf, -1, th);
if (rc)
return rc;
} else {
if (tti->tti_has_trans && !echo_client) {
if (tti->tti_mult_trans == 0) {
- CDEBUG(D_HA, "More than one transaction "LPU64"\n",
+ CDEBUG(D_HA, "More than one transaction %llu\n",
tti->tti_transno);
RETURN(0);
}