* Copyright 2009 Sun Microsystems, Inc. All rights reserved
* Use is subject to license terms.
*
- * Copyright (c) 2012, 2015, Intel Corporation.
+ * Copyright (c) 2012, 2017, Intel Corporation.
*/
/*
* This file is part of Lustre, http://www.lustre.org/
#include <obd_class.h>
#include <md_object.h>
#include <lustre_fid.h>
-#include <lustre_param.h>
+#include <uapi/linux/lustre/lustre_param.h>
#include <lustre_update.h>
#include <lustre_log.h>
*tgt = range.lsr_index;
*type = range.lsr_flags;
- CDEBUG(D_INFO, "%s: got tgt %x for sequence: "LPX64"\n",
+ CDEBUG(D_INFO, "%s: got tgt %x for sequence: %#llx\n",
lod2obd(lod)->obd_name, *tgt, fid_seq(fid));
RETURN(0);
if (rec->lrh_len !=
llog_update_record_size((struct llog_update_record *)rec)) {
- CERROR("%s broken update record! index %u "DOSTID":%u :"
+ CERROR("%s broken update record! index %u "DFID".%u :"
" rc = %d\n", lod2obd(lrd->lrd_lod)->obd_name, index,
- POSTID(&llh->lgh_id.lgl_oi), rec->lrh_index, -EIO);
- return -EIO;
+ PFID(&llh->lgh_id.lgl_oi.oi_fid), rec->lrh_index, -EIO);
+ return -EINVAL;
}
cookie->lgc_lgl = llh->lgh_id;
cookie->lgc_index = rec->lrh_index;
cookie->lgc_subsys = LLOG_UPDATELOG_ORIG_CTXT;
- CDEBUG(D_HA, "%s: process recovery updates "DOSTID":%u\n",
+ CDEBUG(D_HA, "%s: process recovery updates "DFID".%u\n",
lod2obd(lrd->lrd_lod)->obd_name,
- POSTID(&llh->lgh_id.lgl_oi), rec->lrh_index);
+ PFID(&llh->lgh_id.lgl_oi.oi_fid), rec->lrh_index);
lut = lod2lu_dev(lrd->lrd_lod)->ld_site->ls_tgt;
+ if (lut->lut_obd->obd_stopping ||
+ lut->lut_obd->obd_abort_recovery)
+ return -ESHUTDOWN;
+
return insert_update_records_to_replay_list(lut->lut_tdtd,
(struct llog_update_record *)rec,
cookie, index);
*/
static int lod_sub_recovery_thread(void *arg)
{
- struct lod_recovery_data *lrd = arg;
- struct lod_device *lod = lrd->lrd_lod;
- struct dt_device *dt;
- struct ptlrpc_thread *thread = lrd->lrd_thread;
- struct llog_ctxt *ctxt;
- struct lu_env env;
- int rc;
+ struct lod_recovery_data *lrd = arg;
+ struct lod_device *lod = lrd->lrd_lod;
+ struct dt_device *dt;
+ struct ptlrpc_thread *thread = lrd->lrd_thread;
+ struct llog_ctxt *ctxt = NULL;
+ struct lu_env env;
+ struct lu_target *lut;
+ struct lod_tgt_descs *ltd = &lod->lod_mdt_descs;
+ struct lod_tgt_desc *tgt = NULL;
+ time64_t start;
+ int retries = 0;
+ int i;
+ int rc;
ENTRY;
thread->t_flags = SVC_RUNNING;
RETURN(rc);
}
+ lut = lod2lu_dev(lod)->ld_site->ls_tgt;
+ atomic_inc(&lut->lut_tdtd->tdtd_recovery_threads_count);
if (lrd->lrd_ltd == NULL)
dt = lod->lod_child;
else
dt = lrd->lrd_ltd->ltd_tgt;
+ start = ktime_get_real_seconds();
+
again:
rc = lod_sub_prep_llog(&env, lod, dt, lrd->lrd_idx);
- if (rc != 0)
- GOTO(out, rc);
-
- /* Process the recovery record */
- ctxt = llog_get_context(dt->dd_lu_dev.ld_obd, LLOG_UPDATELOG_ORIG_CTXT);
- LASSERT(ctxt != NULL);
- LASSERT(ctxt->loc_handle != NULL);
-
- rc = llog_cat_process(&env, ctxt->loc_handle,
- lod_process_recovery_updates, lrd, 0, 0);
- llog_ctxt_put(ctxt);
+ if (!rc && !lod->lod_child->dd_rdonly) {
+ /* Process the recovery record */
+ ctxt = llog_get_context(dt->dd_lu_dev.ld_obd,
+ LLOG_UPDATELOG_ORIG_CTXT);
+ LASSERT(ctxt != NULL);
+ LASSERT(ctxt->loc_handle != NULL);
+
+ rc = llog_cat_process(&env, ctxt->loc_handle,
+ lod_process_recovery_updates, lrd, 0, 0);
+ }
if (rc < 0) {
struct lu_device *top_device;
top_device = lod->lod_dt_dev.dd_lu_dev.ld_site->ls_top_dev;
/* Because the remote target might failover at the same time,
* let's retry here */
- if (rc == -ETIMEDOUT && dt != lod->lod_child &&
- !top_device->ld_obd->obd_force_abort_recovery)
+ if ((rc == -ETIMEDOUT || rc == -EAGAIN || rc == -EIO) &&
+ dt != lod->lod_child &&
+ !top_device->ld_obd->obd_abort_recovery &&
+ !top_device->ld_obd->obd_stopping) {
+ if (ctxt != NULL) {
+ if (ctxt->loc_handle != NULL)
+ llog_cat_close(&env,
+ ctxt->loc_handle);
+ llog_ctxt_put(ctxt);
+ }
+ retries++;
+ CDEBUG(D_HA, "%s get update log failed %d, retry\n",
+ dt->dd_lu_dev.ld_obd->obd_name, rc);
goto again;
+ }
- CERROR("%s getting update log failed: rc = %d\n",
+ CERROR("%s get update log failed: rc = %d\n",
dt->dd_lu_dev.ld_obd->obd_name, rc);
+ llog_ctxt_put(ctxt);
+
+ spin_lock(&top_device->ld_obd->obd_dev_lock);
+ if (!top_device->ld_obd->obd_abort_recovery &&
+ !top_device->ld_obd->obd_stopping)
+ top_device->ld_obd->obd_abort_recovery = 1;
+ spin_unlock(&top_device->ld_obd->obd_dev_lock);
+
GOTO(out, rc);
}
+ llog_ctxt_put(ctxt);
- CDEBUG(D_HA, "%s retrieve update log: rc = %d\n",
- dt->dd_lu_dev.ld_obd->obd_name, rc);
+ CDEBUG(D_HA, "%s retrieved update log, duration %lld, retries %d\n",
+ dt->dd_lu_dev.ld_obd->obd_name, ktime_get_real_seconds() - start,
+ retries);
+ spin_lock(&lod->lod_lock);
if (lrd->lrd_ltd == NULL)
lod->lod_child_got_update_log = 1;
else
lrd->lrd_ltd->ltd_got_update_log = 1;
- if (lod->lod_child_got_update_log) {
- struct lod_tgt_descs *ltd = &lod->lod_mdt_descs;
- struct lod_tgt_desc *tgt = NULL;
- bool all_got_log = true;
- int i;
-
- cfs_foreach_bit(ltd->ltd_tgt_bitmap, i) {
- tgt = LTD_TGT(ltd, i);
- if (!tgt->ltd_got_update_log) {
- all_got_log = false;
- break;
- }
- }
-
- if (all_got_log) {
- struct lu_target *lut;
+ if (!lod->lod_child_got_update_log) {
+ spin_unlock(&lod->lod_lock);
+ GOTO(out, rc = 0);
+ }
- lut = lod2lu_dev(lod)->ld_site->ls_tgt;
- CDEBUG(D_HA, "%s got update logs from all MDTs.\n",
- lut->lut_obd->obd_name);
- lut->lut_tdtd->tdtd_replay_ready = 1;
- wake_up(&lut->lut_obd->obd_next_transno_waitq);
+ cfs_foreach_bit(ltd->ltd_tgt_bitmap, i) {
+ tgt = LTD_TGT(ltd, i);
+ if (!tgt->ltd_got_update_log) {
+ spin_unlock(&lod->lod_lock);
+ GOTO(out, rc = 0);
}
}
+ lut->lut_tdtd->tdtd_replay_ready = 1;
+ spin_unlock(&lod->lod_lock);
+
+ CDEBUG(D_HA, "%s got update logs from all MDTs.\n",
+ lut->lut_obd->obd_name);
+ wake_up(&lut->lut_obd->obd_next_transno_waitq);
+ EXIT;
out:
OBD_FREE_PTR(lrd);
thread->t_flags = SVC_STOPPED;
+ atomic_dec(&lut->lut_tdtd->tdtd_recovery_threads_count);
+ wake_up(&lut->lut_tdtd->tdtd_recovery_threads_waitq);
wake_up(&thread->t_ctl_waitq);
lu_env_fini(&env);
- RETURN(rc);
+ return rc;
}
/**
lod_putref(lod, ltd);
}
+/**
+ * List the MDT indices whose update logs have not been retrieved yet.
+ *
+ * The output buffer is sized from tdtd_recovery_threads_count (5 bytes per
+ * counted thread plus a terminating NUL), whereas the entries written into
+ * it come from the per-target "got update log" flags.  Those two numbers
+ * are read independently and need not agree, so formatting stops as soon
+ * as an entry no longer fits instead of trusting snprintf()'s return value,
+ * which is the length that *would* have been written.
+ *
+ * \param[in]  data	lod_device passed back as callback data
+ * \param[out] size	bytes allocated for the returned buffer; set to 0
+ *			when no recovery thread is counted
+ * \param[out] count	number of indices actually stored in the buffer;
+ *			when the allocation fails it still holds the thread
+ *			counter value read at entry
+ *
+ * \retval		buffer obtained from OBD_ALLOC() that holds the
+ *			" %04x" formatted indices (NUL terminated);
+ *			presumably released by the caller using \a size
+ * \retval		NULL if no thread is counted or on allocation failure
+ */
+static char *lod_show_update_logs_retrievers(void *data, int *size, int *count)
+{
+	struct lod_device *lod = (struct lod_device *)data;
+	struct lu_target *lut = lod2lu_dev(lod)->ld_site->ls_tgt;
+	struct lod_tgt_descs *ltd = &lod->lod_mdt_descs;
+	struct lod_tgt_desc *tgt = NULL;
+	char *buf;
+	int len = 0;
+	int rc;
+	int i;
+
+	*count = atomic_read(&lut->lut_tdtd->tdtd_recovery_threads_count);
+	if (*count == 0) {
+		*size = 0;
+		return NULL;
+	}
+
+	/* one " %04x" entry (5 characters) per counted thread, plus NUL */
+	*size = 5 * *count + 1;
+	OBD_ALLOC(buf, *size);
+	if (buf == NULL)
+		return NULL;
+
+	/* from here on *count is the number of entries stored in buf */
+	*count = 0;
+	memset(buf, 0, *size);
+
+	/* the local target comes first if its own log is still missing */
+	if (!lod->lod_child_got_update_log) {
+		rc = lodname2mdt_index(lod2obd(lod)->obd_name, &i);
+		LASSERTF(rc == 0, "Fail to parse target index: rc = %d\n", rc);
+
+		rc = snprintf(buf + len, *size - len, " %04x", i);
+		LASSERT(rc > 0);
+
+		if (rc < *size - len) {
+			len += rc;
+			(*count)++;
+		} else {
+			/* entry was truncated: drop it, keep len in bounds */
+			buf[len] = '\0';
+		}
+	}
+
+	cfs_foreach_bit(ltd->ltd_tgt_bitmap, i) {
+		tgt = LTD_TGT(ltd, i);
+		if (!tgt->ltd_got_update_log) {
+			rc = snprintf(buf + len, *size - len, " %04x", i);
+			/*
+			 * rc >= remaining space means the entry did not fit.
+			 * Discard the partial text and stop, so that len never
+			 * exceeds *size and *count only covers stored entries.
+			 */
+			if (unlikely(rc <= 0 || rc >= *size - len)) {
+				buf[len] = '\0';
+				break;
+			}
+
+			len += rc;
+			(*count)++;
+		}
+	}
+
+	return buf;
+}
+
+
/**
* Prepare distribute txn
*
RETURN(rc);
}
+ tdtd->tdtd_show_update_logs_retrievers =
+ lod_show_update_logs_retrievers;
+ tdtd->tdtd_show_retrievers_cbdata = lod;
+
lut->lut_tdtd = tdtd;
RETURN(0);
struct lu_target *lut;
lut = lod2lu_dev(lod)->ld_site->ls_tgt;
+ target_recovery_fini(lut->lut_obd);
if (lut->lut_tdtd == NULL)
return;
GOTO(out, rc);
}
+
obd = lod2obd(lod);
- rc = class_process_proc_param(PARAM_LOV, obd->obd_vars,
+ if (strstr(param, PARAM_LOD) != NULL)
+ rc = class_process_proc_param(PARAM_LOD, obd->obd_vars,
+ lcfg, obd);
+ else
+ rc = class_process_proc_param(PARAM_LOV, obd->obd_vars,
lcfg, obd);
if (rc > 0)
rc = 0;
+
GOTO(out, rc);
}
case LCFG_PRE_CLEANUP: {
break;
}
case LCFG_CLEANUP: {
+ if (lod->lod_md_root != NULL) {
+ dt_object_put(env, &lod->lod_md_root->ldo_obj);
+ lod->lod_md_root = NULL;
+ }
+
/*
* do cleanup on underlying storage only when
* all OSPs are cleaned up, as they use that OSD as well
if (IS_ERR(dto))
GOTO(out_put, rc = PTR_ERR(dto));
- lu_object_put(env, &dto->do_lu);
+ dt_object_put(env, dto);
/* Create update log dir */
lu_update_log_dir_fid(fid, index);
if (IS_ERR(dto))
GOTO(out_put, rc = PTR_ERR(dto));
- lu_object_put(env, &dto->do_lu);
+ dt_object_put(env, dto);
rc = lod_prepare_distribute_txn(env, lod);
if (rc != 0)
GOTO(out_put, rc);
out_put:
- lu_object_put(env, &root->do_lu);
+ dt_object_put(env, root);
RETURN(rc);
}
{
struct lod_device *lod = dt2lod_dev(dev);
struct lod_ost_desc *ost;
+ struct lod_mdt_desc *mdt;
unsigned int i;
int rc = 0;
ENTRY;
LASSERT(ost && ost->ltd_ost);
rc = dt_sync(env, ost->ltd_ost);
if (rc) {
- CERROR("%s: can't sync %u: %d\n",
+ CERROR("%s: can't sync ost %u: %d\n",
lod2obd(lod)->obd_name, i, rc);
break;
}
}
lod_putref(lod, &lod->lod_ost_descs);
+
+ if (rc)
+ RETURN(rc);
+
+ lod_getref(&lod->lod_mdt_descs);
+ lod_foreach_mdt(lod, i) {
+ mdt = MDT_TGT(lod, i);
+ LASSERT(mdt && mdt->ltd_mdt);
+ rc = dt_sync(env, mdt->ltd_mdt);
+ if (rc) {
+ CERROR("%s: can't sync mdt %u: %d\n",
+ lod2obd(lod)->obd_name, i, rc);
+ break;
+ }
+ }
+ lod_putref(lod, &lod->lod_mdt_descs);
+
if (rc == 0)
rc = dt_sync(env, lod->lod_child);
dt_conf_get(env, &lod->lod_dt_dev, &ddp);
lod->lod_osd_max_easize = ddp.ddp_max_ea_size;
+ lod->lod_dom_max_stripesize = (1ULL << 20); /* 1Mb as default value */
/* setup obd to be used with old lov code */
rc = lod_pools_init(lod, cfg);
if (rc)
GOTO(out_pools, rc);
- spin_lock_init(&lod->lod_desc_lock);
+ spin_lock_init(&lod->lod_lock);
spin_lock_init(&lod->lod_connects_lock);
lod_tgt_desc_init(&lod->lod_mdt_descs);
lod_tgt_desc_init(&lod->lod_ost_descs);
struct lu_device *next = &lod->lod_child->dd_lu_dev;
ENTRY;
+ if (atomic_read(&lu->ld_ref) > 0 &&
+ !cfs_hash_is_empty(lu->ld_site->ls_obj_hash)) {
+ LIBCFS_DEBUG_MSG_DATA_DECL(msgdata, D_ERROR, NULL);
+ lu_site_print(env, lu->ld_site, &msgdata, lu_cdebug_printer);
+ }
LASSERTF(atomic_read(&lu->ld_ref) == 0, "lu is %p\n", lu);
dt_device_fini(&lod->lod_dt_dev);
OBD_FREE_PTR(lod);
struct lu_context_key *key, void *data)
{
struct lod_thread_info *info = data;
+ struct lod_layout_component *lds =
+ info->lti_def_striping.lds_def_comp_entries;
+ struct ost_pool *inuse = &info->lti_inuse_osts;
+
/* allocated in lod_get_lov_ea
* XXX: this is overload, a tread may have such store but used only
* once. Probably better would be pool of such stores per LOD.
info->lti_ea_store_size = 0;
}
lu_buf_free(&info->lti_linkea_buf);
+
+ if (lds != NULL)
+ lod_free_def_comp_entries(&info->lti_def_striping);
+
+ if (inuse->op_size)
+ OBD_FREE(inuse->op_array, inuse->op_size);
+
+ if (info->lti_comp_size > 0)
+ OBD_FREE(info->lti_comp_idx,
+ info->lti_comp_size * sizeof(__u32));
+
OBD_FREE_PTR(info);
}