if (OBT(obd) && OBP(obd, postrecov)) {
rc = OBP(obd, postrecov)(obd);
if (rc >= 0)
- CERROR("Cleanup %d orphans after recovery was aborted\n", rc);
+ CWARN("Cleanup %d orphans after recovery was aborted\n", rc);
else
CERROR("postrecov failed %d\n", rc);
}
spin_unlock_bh(&obd->obd_processing_task_lock);
return;
}
- CERROR("%s: starting recovery timer (%us)\n", obd->obd_name,
+ CWARN("%s: starting recovery timer (%us)\n", obd->obd_name,
OBD_RECOVERY_TIMEOUT / HZ);
obd->obd_recovery_handler = handler;
obd->obd_recovery_timer.function = target_recovery_expired;
if (recovery_done) {
struct list_head *tmp, *n;
ldlm_reprocess_all_ns(req->rq_export->exp_obd->obd_namespace);
- CERROR("%s: all clients recovered, sending delayed replies\n",
+ CWARN("%s: all clients recovered, sending delayed replies\n",
obd->obd_name);
obd->obd_recovering = 0;
if (OBT(obd) && OBP(obd, postrecov)) {
rc2 = OBP(obd, postrecov)(obd);
if (rc2 >= 0)
- CERROR("%s: all clients recovered, %d MDS orphans "
+ CWARN("%s: all clients recovered, %d MDS orphans "
"deleted\n", obd->obd_name, rc2);
else
CERROR("postrecov failed %d\n", rc2);
child_inode = dchild->d_inode;
if (mds_inode_is_orphan(child_inode) &&
mds_open_orphan_count(child_inode)) {
- CDEBUG(D_ERROR, "orphan %s was re-opened during "
- "recovery\n", ptr->d_name);
+ CWARN("orphan %s was re-opened during recovery\n",
+ ptr->d_name);
GOTO(next, rc2 = 0);
}
rc2 = mds_unlink(obd, dchild, child_inode, pending_dir);
if (rc2 == 0) {
item ++;
- CDEBUG(D_ERROR, "removed orphan %s from MDS and OST\n",
+ CWARN("removed orphan %s from MDS and OST\n",
ptr->d_name);
} else {
l_dput(dchild);
}
EXPORT_SYMBOL(llog_cancel);
+/* callback func for llog_process in llog_obd_origin_setup */
+static int cat_cancel_cb(struct llog_handle *cathandle,
+ struct llog_rec_hdr *rec, void *data)
+{
+ struct llog_logid_rec *lir = (struct llog_logid_rec *)rec;
+ struct llog_handle *loghandle;
+ struct llog_log_hdr *llh;
+ int rc, index;
+ ENTRY;
+
+ if (le32_to_cpu(rec->lrh_type) != LLOG_LOGID_MAGIC) {
+ CERROR("invalid record in catalog\n");
+ RETURN(-EINVAL);
+ }
+ CWARN("processing log "LPX64":%x at index %u of catalog "LPX64"\n",
+ lir->lid_id.lgl_oid, lir->lid_id.lgl_ogen,
+ le32_to_cpu(rec->lrh_index), cathandle->lgh_id.lgl_oid);
+
+ rc = llog_cat_id2handle(cathandle, &loghandle, &lir->lid_id);
+ if (rc) {
+ CERROR("Cannot find handle for log "LPX64"\n", lir->lid_id.lgl_oid);
+ RETURN(rc);
+ }
+
+ llh = loghandle->lgh_hdr;
+ if ((le32_to_cpu(llh->llh_flags) & LLOG_F_ZAP_WHEN_EMPTY) &&
+ (le32_to_cpu(llh->llh_count) == 1)) {
+ rc = llog_destroy(loghandle);
+ if (rc)
+ CERROR("failure destroying log during postsetup: %d\n", rc);
+ LASSERT(rc == 0);
+
+ index = loghandle->u.phd.phd_cookie.lgc_index;
+ if (cathandle->u.chd.chd_current_log == loghandle)
+ cathandle->u.chd.chd_current_log = NULL;
+ llog_free_handle(loghandle);
+
+ LASSERT(index);
+ rc = llog_cancel_rec(cathandle, index);
+ if (rc == 0)
+ CWARN("cancel log "LPX64":%x at index %u of catalog "LPX64"\n",
+ lir->lid_id.lgl_oid, lir->lid_id.lgl_ogen,
+ le32_to_cpu(rec->lrh_index), cathandle->lgh_id.lgl_oid);
+ }
+
+ RETURN(rc);
+}
/* lop_setup method for filter/osc */
// XXX how to set exports
LASSERT(ctxt);
log_gen_init(ctxt);
+ down(&ctxt->loc_sem);
if (logid->lgl_oid)
rc = llog_create(ctxt, &handle, logid, NULL);
else {
ctxt->loc_handle = handle;
push_ctxt(&saved, &disk_obd->obd_ctxt, NULL);
- rc = llog_init_handle(handle, LLOG_F_IS_CAT, NULL);
+ rc = llog_init_handle(handle, LLOG_F_IS_CAT, NULL);
pop_ctxt(&saved, &disk_obd->obd_ctxt, NULL);
+ if (rc)
+ GOTO(out, rc);
+
+ rc = llog_process(handle, (llog_cb_t)cat_cancel_cb, NULL);
+ if (rc)
+ CERROR("llog_process with cat_cancel_cb failed: %d\n", rc);
out:
+ up(&ctxt->loc_sem);
if (ctxt && rc) {
obd->obd_llog_ctxt[index] = NULL;
OBD_FREE(ctxt, sizeof(*ctxt));
int llog_obd_origin_cleanup(struct llog_ctxt *ctxt)
{
+ struct llog_handle *cathandle, *n, *loghandle;
+ struct llog_log_hdr *llh;
+ int rc, index;
+ ENTRY;
+
if (!ctxt)
return 0;
- if (ctxt->loc_handle)
+ cathandle = ctxt->loc_handle;
+ if (cathandle) {
+ list_for_each_entry_safe(loghandle, n, &cathandle->u.chd.chd_head,
+ u.phd.phd_entry) {
+ llh = loghandle->lgh_hdr;
+ if ((le32_to_cpu(llh->llh_flags) & LLOG_F_ZAP_WHEN_EMPTY) &&
+ (le32_to_cpu(llh->llh_count) == 1)) {
+ rc = llog_destroy(loghandle);
+ if (rc)
+ CERROR("failure destroying log during cleanup: %d\n",
+ rc);
+ LASSERT(rc == 0);
+
+ index = loghandle->u.phd.phd_cookie.lgc_index;
+ if (cathandle->u.chd.chd_current_log == loghandle)
+ cathandle->u.chd.chd_current_log = NULL;
+ llog_free_handle(loghandle);
+
+ LASSERT(index);
+ rc = llog_cancel_rec(cathandle, index);
+ if (rc == 0)
+ CWARN("cancel plain log at index %u of catalog "LPX64"\n",
+ index, cathandle->lgh_id.lgl_oid);
+ }
+ }
llog_cat_put(ctxt->loc_handle);
-
+ }
return 0;
}
EXPORT_SYMBOL(llog_obd_origin_cleanup);
rc = obd_destroy(exp, oa, NULL, &oti);
obdo_free(oa);
if (rc == -ENOENT) {
- CERROR("object already removed: send cookie\n");
+ CWARN("object already removed: send cookie\n");
llog_cancel(ctxt, NULL, 1, &cookie, 0);
RETURN(0);
}
if (rc == 0)
- CERROR("object: "LPU64" in record destroyed successful\n", oid);
+ CWARN("object: "LPU64" in record is destroyed\n", oid);
RETURN(rc);
}
#include <portals/list.h>
#include <linux/lvfs.h>
-
-#ifdef ENABLE_ORPHANS
-int llog_origin_handle_cancel(struct ptlrpc_request *req)
-{
- struct obd_device *obd = req->rq_export->exp_obd;
- struct obd_device *disk_obd;
- struct llog_cookie *logcookies;
- struct llog_ctxt *ctxt;
- int num_cookies, rc = 0;
- struct obd_run_ctxt saved;
- struct llog_handle *cathandle;
- ENTRY;
-
- logcookies = lustre_msg_buf(req->rq_reqmsg, 0, sizeof(*logcookies));
- num_cookies = req->rq_reqmsg->buflens[0]/sizeof(*logcookies);
- if (logcookies == NULL || num_cookies == 0) {
- DEBUG_REQ(D_HA, req, "no cookies sent");
- RETURN(-EFAULT);
- }
-
- ctxt = llog_get_context(obd, logcookies->lgc_subsys);
- if (ctxt == NULL) {
- CERROR("llog subsys not setup or already cleanup\n");
- RETURN(-ENOENT);
- }
- down(&ctxt->loc_sem);
- disk_obd = ctxt->loc_exp->exp_obd;
- cathandle = ctxt->loc_handle;
- LASSERT(cathandle);
-
- push_ctxt(&saved, &disk_obd->obd_ctxt, NULL);
- rc = llog_cat_cancel_records(cathandle, num_cookies, logcookies);
- if (rc)
- CERROR("cancel %d llog-records failed: %d\n", num_cookies, rc);
- else
- CERROR("cancel %d llog-records successful\n", num_cookies);
-
- pop_ctxt(&saved, &disk_obd->obd_ctxt, NULL);
- up(&ctxt->loc_sem);
-
- RETURN(rc);
-}
-EXPORT_SYMBOL(llog_origin_handle_cancel);
-#endif
-
int llog_origin_connect(struct llog_ctxt *ctxt, int count,
struct llog_logid *logid,
struct llog_ctxt_gen *gen)
RETURN(rc);
}
+
+#ifdef ENABLE_ORPHANS
+int llog_origin_handle_cancel(struct ptlrpc_request *req)
+{
+ struct obd_device *obd = req->rq_export->exp_obd;
+ struct obd_device *disk_obd;
+ struct llog_cookie *logcookies;
+ struct llog_ctxt *ctxt;
+ int num_cookies, rc = 0;
+ struct obd_run_ctxt saved;
+ struct llog_handle *cathandle;
+ ENTRY;
+
+ logcookies = lustre_msg_buf(req->rq_reqmsg, 0, sizeof(*logcookies));
+ num_cookies = req->rq_reqmsg->buflens[0]/sizeof(*logcookies);
+ if (logcookies == NULL || num_cookies == 0) {
+ DEBUG_REQ(D_HA, req, "no cookies sent");
+ RETURN(-EFAULT);
+ }
+
+ ctxt = llog_get_context(obd, logcookies->lgc_subsys);
+ if (ctxt == NULL) {
+ CWARN("llog subsys not setup or already cleanup\n");
+ RETURN(-ENOENT);
+ }
+ down(&ctxt->loc_sem);
+ disk_obd = ctxt->loc_exp->exp_obd;
+ cathandle = ctxt->loc_handle;
+ LASSERT(cathandle);
+
+ push_ctxt(&saved, &disk_obd->obd_ctxt, NULL);
+ rc = llog_cat_cancel_records(cathandle, num_cookies, logcookies);
+ if (rc)
+ CERROR("cancel %d llog-records failed: %d\n", num_cookies, rc);
+ else
+ CWARN("cancel %d llog-records\n", num_cookies);
+
+ pop_ctxt(&saved, &disk_obd->obd_ctxt, NULL);
+ up(&ctxt->loc_sem);
+
+ RETURN(rc);
+}
+EXPORT_SYMBOL(llog_origin_handle_cancel);
+#endif
. ${CONFIG:=$LUSTRE/tests/cfg/local.sh}
# Skip these tests
-ALWAYS_EXCEPT=""
+ALWAYS_EXCEPT="35"
gen_config() {
grep -q "tag-$i" $DIR/$tfile-$i || error "f1c-$i"
done
rm -rf $DIR/$tfile-*
- sleep 5
+ sleep 3
# waiting for commitment of removal
}
run_test 5 "|x| 220 open(O_CREAT)"
fail_abort mds
kill -USR1 $pid
[ -e $DIR/$tfile ] && return 1
- sleep 5
+ sleep 3
# wait for commitment of removal
return 0
}
run_test 34 "abort recovery before client does replay (test mds_cleanup_orphans)"
+# bug 2278 - generate one orphan on OST, then destroy it during recovery from llog
+test_35() {
+ touch $DIR/$tfile
+
+ echo 0x80000119 > /proc/sys/lustre/fail_loc
+ rm -f $DIR/$tfile &
+ sleep 1
+ # give a chance to remove from MDS
+ fail_abort mds
+ $CHECKSTAT -t file $DIR/$tfile && return 1 || true
+}
+run_test 35 "test recovery from llog for unlink op"
+
equals_msg test complete, cleaning up
cleanup