* Copyright (c) 2009, 2010, Oracle and/or its affiliates. All rights reserved.
* Use is subject to license terms.
*
- * Copyright (c) 2011, 2015, Intel Corporation.
+ * Copyright (c) 2011, 2016, Intel Corporation.
*/
/*
* This file is part of Lustre, http://www.lustre.org/
#include "tgt_internal.h"
+/** version recovery epoch */
+#define LR_EPOCH_BITS 32
/* Allocate a bitmap for a chunk of reply data slots */
static int tgt_bitmap_chunk_alloc(struct lu_target *lut, int chunk)
tgt->lut_obd->obd_name, REPLY_DATA,
lrh->lrh_magic, lrh->lrh_header_size, lrh->lrh_reply_size);
+ if (tgt->lut_bottom->dd_rdonly)
+ RETURN(0);
+
buf.lrh_magic = cpu_to_le32(lrh->lrh_magic);
buf.lrh_header_size = cpu_to_le32(lrh->lrh_header_size);
buf.lrh_reply_size = cpu_to_le32(lrh->lrh_reply_size);
}
EXPORT_SYMBOL(tgt_client_free);
-int tgt_client_data_read(const struct lu_env *env, struct lu_target *tgt,
- struct lsd_client_data *lcd, loff_t *off, int index)
+static inline void tgt_check_lcd(const char *obd_name, int index,
+ struct lsd_client_data *lcd)
+{
+ size_t uuid_size = sizeof(lcd->lcd_uuid);
+
+ if (strnlen((char*)lcd->lcd_uuid, uuid_size) == uuid_size) {
+ lcd->lcd_uuid[uuid_size - 1] = '\0';
+
+ LCONSOLE_ERROR("the client UUID (%s) on %s for exports stored in last_rcvd(index = %d) is bad!\n",
+ lcd->lcd_uuid, obd_name, index);
+ }
+}
+
+static int tgt_client_data_read(const struct lu_env *env, struct lu_target *tgt,
+ struct lsd_client_data *lcd,
+ loff_t *off, int index)
{
struct tgt_thread_info *tti = tgt_th_info(env);
int rc;
tti_buf_lcd(tti);
rc = dt_record_read(env, tgt->lut_last_rcvd, &tti->tti_buf, off);
if (rc == 0) {
- check_lcd(tgt->lut_obd->obd_name, index, &tti->tti_lcd);
+ tgt_check_lcd(tgt->lut_obd->obd_name, index, &tti->tti_lcd);
lcd_le_to_cpu(&tti->tti_lcd, lcd);
lcd->lcd_last_result = ptlrpc_status_ntoh(lcd->lcd_last_result);
lcd->lcd_last_close_result =
return rc;
}
-int tgt_client_data_write(const struct lu_env *env, struct lu_target *tgt,
- struct lsd_client_data *lcd, loff_t *off,
- struct thandle *th)
+static int tgt_client_data_write(const struct lu_env *env,
+ struct lu_target *tgt,
+ struct lsd_client_data *lcd,
+ loff_t *off, struct thandle *th)
{
struct tgt_thread_info *tti = tgt_th_info(env);
struct dt_object *dto;
return dt_record_write(env, dto, &tti->tti_buf, off, th);
}
+struct tgt_new_client_callback {
+ struct dt_txn_commit_cb lncc_cb;
+ struct obd_export *lncc_exp;
+};
+
+static void tgt_cb_new_client(struct lu_env *env, struct thandle *th,
+ struct dt_txn_commit_cb *cb, int err)
+{
+ struct tgt_new_client_callback *ccb;
+
+ ccb = container_of0(cb, struct tgt_new_client_callback, lncc_cb);
+
+ LASSERT(ccb->lncc_exp->exp_obd);
+
+ CDEBUG(D_RPCTRACE, "%s: committing for initial connect of %s\n",
+ ccb->lncc_exp->exp_obd->obd_name,
+ ccb->lncc_exp->exp_client_uuid.uuid);
+
+ spin_lock(&ccb->lncc_exp->exp_lock);
+
+ ccb->lncc_exp->exp_need_sync = 0;
+
+ spin_unlock(&ccb->lncc_exp->exp_lock);
+ class_export_cb_put(ccb->lncc_exp);
+
+ OBD_FREE_PTR(ccb);
+}
+
+int tgt_new_client_cb_add(struct thandle *th, struct obd_export *exp)
+{
+ struct tgt_new_client_callback *ccb;
+ struct dt_txn_commit_cb *dcb;
+ int rc;
+
+ OBD_ALLOC_PTR(ccb);
+ if (ccb == NULL)
+ return -ENOMEM;
+
+ ccb->lncc_exp = class_export_cb_get(exp);
+
+ dcb = &ccb->lncc_cb;
+ dcb->dcb_func = tgt_cb_new_client;
+ INIT_LIST_HEAD(&dcb->dcb_linkage);
+ strlcpy(dcb->dcb_name, "tgt_cb_new_client", sizeof(dcb->dcb_name));
+
+ rc = dt_trans_cb_add(th, dcb);
+ if (rc) {
+ class_export_cb_put(exp);
+ OBD_FREE_PTR(ccb);
+ }
+ return rc;
+}
+
/**
* Update client data in last_rcvd
*/
RETURN(-EINVAL);
}
+ if (tgt->lut_bottom->dd_rdonly)
+ RETURN(0);
+
th = dt_trans_create(env, tgt->lut_bottom);
if (IS_ERR(th))
RETURN(PTR_ERR(th));
return rc;
}
-int tgt_server_data_read(const struct lu_env *env, struct lu_target *tgt)
+static int tgt_server_data_read(const struct lu_env *env, struct lu_target *tgt)
{
struct tgt_thread_info *tti = tgt_th_info(env);
int rc;
return rc;
}
-int tgt_server_data_write(const struct lu_env *env, struct lu_target *tgt,
- struct thandle *th)
+static int tgt_server_data_write(const struct lu_env *env,
+ struct lu_target *tgt, struct thandle *th)
{
struct tgt_thread_info *tti = tgt_th_info(env);
struct dt_object *dto;
tgt->lut_lsd.lsd_last_transno = tgt->lut_last_transno;
spin_unlock(&tgt->lut_translock);
+ if (tgt->lut_bottom->dd_rdonly)
+ RETURN(0);
+
th = dt_trans_create(env, tgt->lut_bottom);
if (IS_ERR(th))
RETURN(PTR_ERR(th));
}
EXPORT_SYMBOL(tgt_server_data_update);
-int tgt_truncate_last_rcvd(const struct lu_env *env, struct lu_target *tgt,
- loff_t size)
+static int tgt_truncate_last_rcvd(const struct lu_env *env,
+ struct lu_target *tgt, loff_t size)
{
struct dt_object *dt = tgt->lut_last_rcvd;
struct thandle *th;
ENTRY;
+ if (tgt->lut_bottom->dd_rdonly)
+ RETURN(0);
+
attr.la_size = size;
attr.la_valid = LA_SIZE;
}
spin_lock(&tgt->lut_translock);
- start_epoch = lr_epoch(tgt->lut_last_transno) + 1;
+ start_epoch = (tgt->lut_last_transno >> LR_EPOCH_BITS) + 1;
tgt->lut_last_transno = (__u64)start_epoch << LR_EPOCH_BITS;
tgt->lut_lsd.lsd_start_epoch = start_epoch;
spin_unlock(&tgt->lut_translock);
} else {
spin_unlock(&ccb->llcc_tgt->lut_translock);
}
+
+ CDEBUG(D_HA, "%s: transno %lld is committed\n",
+ ccb->llcc_tgt->lut_obd->obd_name, ccb->llcc_transno);
+
out:
class_export_cb_put(ccb->llcc_exp);
- if (ccb->llcc_transno)
- CDEBUG(D_HA, "%s: transno %lld is committed\n",
- ccb->llcc_tgt->lut_obd->obd_name, ccb->llcc_transno);
OBD_FREE_PTR(ccb);
}
* Add commit callback function, it returns a non-zero value to inform
* caller to use sync transaction if necessary.
*/
-int tgt_last_commit_cb_add(struct thandle *th, struct lu_target *tgt,
- struct obd_export *exp, __u64 transno)
+static int tgt_last_commit_cb_add(struct thandle *th, struct lu_target *tgt,
+ struct obd_export *exp, __u64 transno)
{
struct tgt_last_committed_callback *ccb;
struct dt_txn_commit_cb *dcb;
return rc ? rc : exp->exp_need_sync;
}
-struct tgt_new_client_callback {
- struct dt_txn_commit_cb lncc_cb;
- struct obd_export *lncc_exp;
-};
-
-static void tgt_cb_new_client(struct lu_env *env, struct thandle *th,
- struct dt_txn_commit_cb *cb, int err)
-{
- struct tgt_new_client_callback *ccb;
-
- ccb = container_of0(cb, struct tgt_new_client_callback, lncc_cb);
-
- LASSERT(ccb->lncc_exp->exp_obd);
-
- CDEBUG(D_RPCTRACE, "%s: committing for initial connect of %s\n",
- ccb->lncc_exp->exp_obd->obd_name,
- ccb->lncc_exp->exp_client_uuid.uuid);
-
- spin_lock(&ccb->lncc_exp->exp_lock);
-
- ccb->lncc_exp->exp_need_sync = 0;
-
- spin_unlock(&ccb->lncc_exp->exp_lock);
- class_export_cb_put(ccb->lncc_exp);
-
- OBD_FREE_PTR(ccb);
-}
-
-int tgt_new_client_cb_add(struct thandle *th, struct obd_export *exp)
-{
- struct tgt_new_client_callback *ccb;
- struct dt_txn_commit_cb *dcb;
- int rc;
-
- OBD_ALLOC_PTR(ccb);
- if (ccb == NULL)
- return -ENOMEM;
-
- ccb->lncc_exp = class_export_cb_get(exp);
-
- dcb = &ccb->lncc_cb;
- dcb->dcb_func = tgt_cb_new_client;
- INIT_LIST_HEAD(&dcb->dcb_linkage);
- strlcpy(dcb->dcb_name, "tgt_cb_new_client", sizeof(dcb->dcb_name));
-
- rc = dt_trans_cb_add(th, dcb);
- if (rc) {
- class_export_cb_put(exp);
- OBD_FREE_PTR(ccb);
- }
- return rc;
-}
-
/**
* Add new client to the last_rcvd upon new connection.
*
if (!lw_client) {
tti->tti_off = ted->ted_lr_off;
- rc = tgt_client_data_write(env, tgt, ted->ted_lcd, &tti->tti_off, th);
+ if (CFS_FAIL_CHECK(OBD_FAIL_TGT_RCVD_EIO))
+ rc = -EIO;
+ else
+ rc = tgt_client_data_write(env, tgt, ted->ted_lcd,
+ &tti->tti_off, th);
if (rc < 0) {
mutex_unlock(&ted->ted_lcd_lock);
RETURN(rc);
ENTRY;
+ if (tgt->lut_bottom->dd_rdonly)
+ RETURN(0);
+
CLASSERT(offsetof(struct lsd_client_data, lcd_padding) +
sizeof(lcd->lcd_padding) == LR_CLIENT_SIZE);
RETURN(rc);
}
if (strcmp(lsd->lsd_uuid, tgt->lut_obd->obd_uuid.uuid)) {
- LCONSOLE_ERROR_MSG(0x157, "Trying to start OBD %s "
- "using the wrong disk %s. Were the"
- " /dev/ assignments rearranged?\n",
- tgt->lut_obd->obd_uuid.uuid,
- lsd->lsd_uuid);
- RETURN(-EINVAL);
+ if (tgt->lut_bottom->dd_rdonly) {
+ /* Such difference may be caused by mounting
+ * up snapshot with new fsname under rd_only
+ * mode. But even if it was NOT, it will not
+ * damage the system because of "rd_only". */
+ memcpy(lsd->lsd_uuid,
+ tgt->lut_obd->obd_uuid.uuid,
+ sizeof(lsd->lsd_uuid));
+ } else {
+ LCONSOLE_ERROR_MSG(0x157, "Trying to start "
+ "OBD %s using the wrong "
+ "disk %s. Were the /dev/ "
+ "assignments rearranged?\n",
+ tgt->lut_obd->obd_uuid.uuid,
+ lsd->lsd_uuid);
+ RETURN(-EINVAL);
+ }
}
if (lsd->lsd_osd_index != index) {
struct dt_object *dto;
int rc;
+ /* For readonly case, the caller should have got failure
+ * when start the transaction. If the logic comes here,
+ * there must be something wrong. */
+ if (unlikely(tgt->lut_bottom->dd_rdonly)) {
+ dump_stack();
+ LBUG();
+ }
+
/* if there is no session, then this transaction is not result of
* request processing but some local operation */
if (env->le_ses == NULL)