Whamcloud - gitweb
LU-16159 lod: cancel update llogs upon recovery abort 84/48584/26
authorLai Siyao <lai.siyao@whamcloud.com>
Sun, 28 Aug 2022 18:35:25 +0000 (14:35 -0400)
committerOleg Drokin <green@whamcloud.com>
Tue, 13 Dec 2022 16:07:08 +0000 (16:07 +0000)
If recovery is aborted, cancel update catalog from catlist, and keep
them on disk for some time (for debug purpose), as can avoid
accumulating stale update records, and also avoid recovery problems
if update llogs are corrupt.

Update llogs are canceled after recovery completes and before regular
request processing. For these logs, their ctime will be set, and log
header will be marked with LLOG_F_MAX_AGE|LLOG_F_RM_ON_ERR, and when
30 days passed, they will be removed automatically.

Tidy up recovery abort code:
* if obd_abort_recovery is set, or OBD is stopping, stop both
  client recovery and MDT recovery.
* otherwise if obd_abort_mdt_recovery is set, stop MDT recovery only.

lctl llog_print support printing update log FIDs used by specified
MDT:
* "lctl --device <MDT> llog_print update_log" will list all update
  llog FIDs used by this MDT device.

Disabled replay-single.sh 100c stripe check because abort_recovery
will cancel update llogs, and won't replay them upon next recovery.

Added replay-single.sh 100d.

Formatall in the end of replay-single.sh because directory unlink may
fail.

Test-Parameters: mdscount=2 mdtcount=4 testlist=replay-single,replay-single,replay-single,replay-single,replay-single,replay-single,replay-single,replay-single,replay-single
Signed-off-by: Lai Siyao <lai.siyao@whamcloud.com>
Change-Id: Ie2bda6c097d65f5c51cba66c2dbf6ae4a5d36dda
Reviewed-on: https://review.whamcloud.com/c/fs/lustre-release/+/48584
Tested-by: jenkins <devops@whamcloud.com>
Tested-by: Maloo <maloo@whamcloud.com>
Reviewed-by: Andreas Dilger <adilger@whamcloud.com>
Reviewed-by: Mikhail Pershin <mpershin@whamcloud.com>
Reviewed-by: Oleg Drokin <green@whamcloud.com>
12 files changed:
lustre/include/lustre_log.h
lustre/include/obd.h
lustre/include/uapi/linux/lustre/lustre_idl.h
lustre/ldlm/ldlm_lib.c
lustre/lod/lod_dev.c
lustre/mdd/mdd_device.c
lustre/mdt/mdt_handler.c
lustre/obdclass/dt_object.c
lustre/obdclass/llog.c
lustre/obdclass/llog_cat.c
lustre/obdclass/llog_ioctl.c
lustre/tests/replay-single.sh

index 1fdca73..1ee2d24 100644 (file)
@@ -109,6 +109,7 @@ int llog_backup(const struct lu_env *env, struct obd_device *obd,
 int llog_read_header(const struct lu_env *env, struct llog_handle *handle,
                     const struct obd_uuid *uuid);
 __u64 llog_size(const struct lu_env *env, struct llog_handle *llh);
+int llog_retain(const struct lu_env *env, struct llog_handle *log);
 
 /* llog_process flags */
 #define LLOG_FLAG_NODEAMON 0x0001
@@ -180,6 +181,8 @@ __u32 llog_cat_free_space(struct llog_handle *cat_llh);
 int llog_cat_reverse_process(const struct lu_env *env,
                             struct llog_handle *cat_llh, llog_cb_t cb,
                             void *data);
+int llog_cat_retain_cb(const struct lu_env *env, struct llog_handle *cat,
+                      struct llog_rec_hdr *rec, void *data);
 /* llog_obd.c */
 int llog_setup(const struct lu_env *env, struct obd_device *obd,
               struct obd_llog_group *olg, int index,
@@ -190,12 +193,20 @@ int llog_sync(struct llog_ctxt *ctxt, struct obd_export *exp, int flags);
 
 /* llog_ioctl.c */
 struct obd_ioctl_data;
+int llog_print_cb(const struct lu_env *env, struct llog_handle *handle,
+                 struct llog_rec_hdr *rec, void *data);
 int llog_ioctl(const struct lu_env *env, struct llog_ctxt *ctxt, int cmd,
               struct obd_ioctl_data *data);
 int llog_catalog_list(const struct lu_env *env, struct dt_device *d,
                      int count, struct obd_ioctl_data *data,
                      const struct lu_fid *fid);
 
+struct llog_print_data {
+       struct obd_ioctl_data *lprd_data;
+       unsigned int           lprd_cfg_flags;
+       bool                   lprd_raw;
+};
+
 /* llog_net.c */
 int llog_initiator_connect(struct llog_ctxt *ctxt);
 
index 5ce322b..3b8e6db 100644 (file)
@@ -612,8 +612,8 @@ struct obd_device {
                obd_attached:1,         /* finished attach */
                obd_set_up:1,           /* finished setup */
                obd_recovering:1,       /* there are recoverable clients */
-               obd_abort_recovery:1,   /* recovery expired */
-               obd_abort_recov_mdt:1,  /* only abort recovery between MDTs */
+               obd_abort_recovery:1,   /* abort client and MDT recovery */
+               obd_abort_mdt_recovery:1, /* abort recovery between MDTs */
                obd_version_recov:1,    /* obd uses version checking */
                obd_replayable:1,       /* recovery enabled; inform clients */
                obd_no_recov:1,         /* fail instead of retry messages */
@@ -761,6 +761,19 @@ int obd_nid_export_for_each(struct obd_device *obd, struct lnet_nid *nid,
                            void *data);
 int obd_nid_add(struct obd_device *obd, struct obd_export *exp);
 void obd_nid_del(struct obd_device *obd, struct obd_export *exp);
+
+/* both client and MDT recovery are aborted, or MDT is stopping  */
+static inline bool obd_recovery_abort(struct obd_device *obd)
+{
+       return obd->obd_stopping || obd->obd_abort_recovery;
+}
+
+/* MDT recovery is aborted, or MDT is stopping */
+static inline bool obd_mdt_recovery_abort(struct obd_device *obd)
+{
+       return obd->obd_stopping || obd->obd_abort_recovery ||
+              obd->obd_abort_mdt_recovery;
+}
 #endif
 
 /* get/set_info keys */
index 3de5098..74b725b 100644 (file)
@@ -2954,6 +2954,7 @@ enum llog_flag {
        LLOG_F_EXT_X_OMODE      = 0x100,
        LLOG_F_EXT_X_XATTR      = 0x200,
        LLOG_F_RM_ON_ERR        = 0x400,
+       LLOG_F_MAX_AGE          = 0x800,
 
        /* Note: Flags covered by LLOG_F_EXT_MASK will be inherited from
         * catlog to plain log, so do not add LLOG_F_IS_FIXSIZE here,
index b8973d1..2a7aa27 100644 (file)
@@ -49,6 +49,7 @@
 #include <lustre_dlm.h>
 #include <lustre_net.h>
 #include <lustre_sec.h>
+#include <uapi/linux/lustre/lustre_ioctl.h>
 #include "ldlm_internal.h"
 
 /*
@@ -1875,8 +1876,9 @@ void target_cleanup_recovery(struct obd_device *obd)
                EXIT;
                return;
        }
-       obd->obd_recovering = obd->obd_abort_recovery = 0;
-       obd->obd_abort_recov_mdt = 0;
+       obd->obd_recovering = 0;
+       obd->obd_abort_recovery = 0;
+       obd->obd_abort_mdt_recovery = 0;
        spin_unlock(&obd->obd_dev_lock);
 
        spin_lock(&obd->obd_recovery_task_lock);
@@ -2123,7 +2125,7 @@ static int check_for_next_transno(struct lu_target *lut)
                req_transno = lustre_msg_get_transno(req->rq_reqmsg);
        }
 
-       if (!obd->obd_abort_recov_mdt && tdtd)
+       if (!obd_mdt_recovery_abort(obd) && tdtd)
                update_transno = distribute_txn_get_next_transno(tdtd);
 
        connected = atomic_read(&obd->obd_connected_clients);
@@ -2137,13 +2139,13 @@ static int check_for_next_transno(struct lu_target *lut)
               connected, completed,
               queue_len, req_transno, next_transno);
 
-       if (obd->obd_abort_recovery) {
+       if (obd_recovery_abort(obd)) {
                CDEBUG(D_HA, "waking for aborted recovery\n");
                wake_up = 1;
        } else if (obd->obd_recovery_expired) {
                CDEBUG(D_HA, "waking for expired recovery\n");
                wake_up = 1;
-       } else if (!obd->obd_abort_recov_mdt && tdtd && req &&
+       } else if (!obd_mdt_recovery_abort(obd) && tdtd && req &&
                   is_req_replayed_by_update(req)) {
                LASSERTF(req_transno < next_transno,
                         "req_transno %llu next_transno%llu\n", req_transno,
@@ -2211,7 +2213,7 @@ static int check_update_llog(struct lu_target *lut)
        struct obd_device *obd = lut->lut_obd;
        struct target_distribute_txn_data *tdtd = lut->lut_tdtd;
 
-       if (obd->obd_abort_recovery) {
+       if (obd_mdt_recovery_abort(obd)) {
                CDEBUG(D_HA, "waking for aborted recovery\n");
                return 1;
        }
@@ -2255,7 +2257,7 @@ repeat:
                 * left in the queue
                 */
                spin_lock(&obd->obd_recovery_task_lock);
-               if (!obd->obd_abort_recov_mdt && lut->lut_tdtd) {
+               if (!obd_mdt_recovery_abort(obd) && lut->lut_tdtd) {
                        next_update_transno =
                                distribute_txn_get_next_transno(lut->lut_tdtd);
 
@@ -2279,7 +2281,7 @@ repeat:
                        }
                }
 
-               if (next_update_transno != 0 && !obd->obd_abort_recovery) {
+               if (next_update_transno != 0 && !obd_recovery_abort(obd)) {
                        obd->obd_next_recovery_transno = next_update_transno;
                        spin_unlock(&obd->obd_recovery_task_lock);
                        /*
@@ -2309,7 +2311,7 @@ repeat:
                                  cfs_time_seconds(60)) == 0)
                ; /* wait indefinitely for event, but don't trigger watchdog */
 
-       if (obd->obd_abort_recovery) {
+       if (obd_recovery_abort(obd)) {
                CWARN("recovery is aborted, evict exports in recovery\n");
                if (lut->lut_tdtd != NULL) {
                        tdtd = lut->lut_tdtd;
@@ -2474,7 +2476,7 @@ static int check_for_recovery_ready(struct lu_target *lut)
               atomic_read(&obd->obd_max_recoverable_clients),
               obd->obd_abort_recovery, obd->obd_recovery_expired);
 
-       if (!obd->obd_abort_recovery && !obd->obd_recovery_expired) {
+       if (!obd_recovery_abort(obd) && !obd->obd_recovery_expired) {
                LASSERT(clnts <=
                        atomic_read(&obd->obd_max_recoverable_clients));
                if (clnts + obd->obd_stale_clients <
@@ -2482,20 +2484,16 @@ static int check_for_recovery_ready(struct lu_target *lut)
                        return 0;
        }
 
-       if (!obd->obd_abort_recov_mdt && lut->lut_tdtd != NULL) {
-               if (!lut->lut_tdtd->tdtd_replay_ready &&
-                   !obd->obd_abort_recovery && !obd->obd_stopping) {
-                       /*
-                        * Let's extend recovery timer, in case the recovery
-                        * timer expired, and some clients got evicted
-                        */
-                       extend_recovery_timer(obd, obd->obd_recovery_timeout,
-                                             true);
-                       CDEBUG(D_HA,
-                              "%s update recovery is not ready, extend recovery %d\n",
-                              obd->obd_name, obd->obd_recovery_timeout);
-                       return 0;
-               }
+       if (!obd_mdt_recovery_abort(obd) && lut->lut_tdtd &&
+           !lut->lut_tdtd->tdtd_replay_ready) {
+               /* Let's extend recovery timer, in case the recovery timer
+                * expired, and some clients got evicted
+                */
+               extend_recovery_timer(obd, obd->obd_recovery_timeout, true);
+               CDEBUG(D_HA,
+                      "%s update recovery is not ready, extend recovery %d\n",
+                      obd->obd_name, obd->obd_recovery_timeout);
+               return 0;
        }
 
        return 1;
@@ -2534,7 +2532,7 @@ static __u64 get_next_transno(struct lu_target *lut, int *type)
        if (type != NULL)
                *type = REQUEST_RECOVERY;
 
-       if (!tdtd || obd->obd_abort_recov_mdt)
+       if (!tdtd || obd_mdt_recovery_abort(obd))
                RETURN(transno);
 
        update_transno = distribute_txn_get_next_transno(tdtd);
@@ -2821,13 +2819,20 @@ static int target_recovery_thread(void *arg)
        CDEBUG(D_INFO, "3: final stage - process recovery completion pings\n");
        /** Update server last boot epoch */
        tgt_boot_epoch_update(lut);
+
+       /* cancel update llogs upon recovery abort */
+       if (obd->obd_abort_recovery || obd->obd_abort_mdt_recovery)
+               OBP(obd, iocontrol)(OBD_IOC_LLOG_CANCEL, obd->obd_self_export,
+                                   0, NULL, NULL);
+
        /*
         * We drop recoverying flag to forward all new requests
         * to regular mds_handle() since now
         */
        spin_lock(&obd->obd_dev_lock);
-       obd->obd_recovering = obd->obd_abort_recovery = 0;
-       obd->obd_abort_recov_mdt = 0;
+       obd->obd_recovering = 0;
+       obd->obd_abort_recovery = 0;
+       obd->obd_abort_mdt_recovery = 0;
        spin_unlock(&obd->obd_dev_lock);
        spin_lock(&obd->obd_recovery_task_lock);
        target_cancel_recovery_timer(obd);
index 1c72ce4..522cf18 100644 (file)
 #include <md_object.h>
 #include <lustre_fid.h>
 #include <uapi/linux/lustre/lustre_param.h>
+#include <uapi/linux/lustre/lustre_ioctl.h>
 #include <lustre_update.h>
 #include <lustre_log.h>
 #include <lustre_lmv.h>
+#include <llog_swab.h>
 
 #include "lod_internal.h"
 
@@ -275,12 +277,6 @@ struct lod_recovery_data {
        struct completion       *lrd_started;
 };
 
-static bool lod_recovery_abort(struct obd_device *top)
-{
-       return (top->obd_stopping || top->obd_abort_recovery ||
-               top->obd_abort_recov_mdt);
-}
-
 /**
  * process update recovery record
  *
@@ -336,7 +332,7 @@ static int lod_process_recovery_updates(const struct lu_env *env,
               PLOGID(&llh->lgh_id), rec->lrh_index);
        lut = lod2lu_dev(lrd->lrd_lod)->ld_site->ls_tgt;
 
-       if (lod_recovery_abort(lut->lut_obd))
+       if (obd_mdt_recovery_abort(lut->lut_obd))
                return -ESHUTDOWN;
 
        return insert_update_records_to_replay_list(lut->lut_tdtd,
@@ -344,6 +340,100 @@ static int lod_process_recovery_updates(const struct lu_env *env,
                                        cookie, index);
 }
 
+/* retain old catalog, create new catalog and update catlist */
+static int lod_sub_recreate_llog(const struct lu_env *env,
+                                struct lod_device *lod, struct dt_device *dt,
+                                int index)
+{
+       struct lod_thread_info *lti = lod_env_info(env);
+       struct llog_ctxt *ctxt;
+       struct llog_handle *lgh;
+       struct llog_catid *cid = &lti->lti_cid;
+       struct lu_fid *fid = &lti->lti_fid;
+       struct obd_device *obd;
+       int rc;
+
+       ENTRY;
+       lu_update_log_fid(fid, index);
+       rc = lodname2mdt_index(lod2obd(lod)->obd_name, (__u32 *)&index);
+       if (rc < 0)
+               RETURN(rc);
+
+       rc = llog_osd_get_cat_list(env, dt, index, 1, NULL, fid);
+       if (rc < 0) {
+               CERROR("%s: can't access update_log: rc = %d\n",
+                      lod2obd(lod)->obd_name, rc);
+               RETURN(rc);
+       }
+
+       obd = dt->dd_lu_dev.ld_obd;
+       ctxt = llog_get_context(obd, LLOG_UPDATELOG_ORIG_CTXT);
+       LASSERT(ctxt != NULL);
+       if (ctxt->loc_handle) {
+               /* retain old catalog */
+               llog_retain(env, ctxt->loc_handle);
+               llog_cat_close(env, ctxt->loc_handle);
+               LASSERT(!ctxt->loc_handle);
+       }
+
+       ctxt->loc_flags |= LLOG_CTXT_FLAG_NORMAL_FID;
+       ctxt->loc_chunk_size = LLOG_MIN_CHUNK_SIZE * 4;
+       rc = llog_open_create(env, ctxt, &lgh, NULL, NULL);
+       if (rc < 0)
+               GOTO(out_put, rc);
+
+       LASSERT(lgh != NULL);
+       rc = llog_init_handle(env, lgh, LLOG_F_IS_CAT, NULL);
+       if (rc != 0)
+               GOTO(out_close, rc);
+
+       cid->lci_logid = lgh->lgh_id;
+       rc = llog_osd_put_cat_list(env, dt, index, 1, cid, fid);
+       if (rc != 0)
+               GOTO(out_close, rc);
+
+       ctxt->loc_handle = lgh;
+
+       CDEBUG(D_INFO, "%s: recreate catalog "DFID"\n",
+              obd->obd_name, PLOGID(&cid->lci_logid));
+out_close:
+       if (rc)
+               llog_cat_close(env, lgh);
+out_put:
+       llog_ctxt_put(ctxt);
+       RETURN(rc);
+}
+
+/* retain update catalog and llogs, and create a new catalog */
+static int lod_sub_cancel_llog(const struct lu_env *env,
+                              struct lod_device *lod, struct dt_device *dt,
+                              int index)
+{
+       struct llog_ctxt *ctxt;
+       int rc = 0;
+
+       ctxt = llog_get_context(dt->dd_lu_dev.ld_obd,
+                               LLOG_UPDATELOG_ORIG_CTXT);
+       if (!ctxt)
+               return 0;
+
+       if (ctxt->loc_handle) {
+               LCONSOLE(D_INFO, "%s: cancel update llog "DFID"\n",
+                        dt->dd_lu_dev.ld_obd->obd_name,
+                        PLOGID(&ctxt->loc_handle->lgh_id));
+               /* set startcat to "lgh_last_idx + 1" to zap empty llogs */
+               llog_cat_process(env, ctxt->loc_handle, NULL, NULL,
+                                ctxt->loc_handle->lgh_last_idx + 1, 0);
+               /* set retention on logs to simplify reclamation */
+               llog_process_or_fork(env, ctxt->loc_handle, llog_cat_retain_cb,
+                                    NULL, NULL, false);
+               /* retain old catalog and create a new one */
+               lod_sub_recreate_llog(env, lod, dt, index);
+       }
+       llog_ctxt_put(ctxt);
+       return rc;
+}
+
 /**
  * recovery thread for update log
  *
@@ -404,7 +494,7 @@ again:
 
        top_device = lod->lod_dt_dev.dd_lu_dev.ld_site->ls_top_dev;
        if (rc < 0 && dt != lod->lod_child &&
-           !lod_recovery_abort(top_device->ld_obd)) {
+           !obd_mdt_recovery_abort(top_device->ld_obd)) {
                if (rc == -EBADR) {
                        /* remote update llog is shorter than expected from
                         * local header. Cached copy could be de-synced during
@@ -433,20 +523,21 @@ again:
 
        llog_ctxt_put(ctxt);
        if (rc < 0) {
-               CERROR("%s get update log failed: rc = %d\n",
-                      dt->dd_lu_dev.ld_obd->obd_name, rc);
-               spin_lock(&top_device->ld_obd->obd_dev_lock);
-               if (!lod_recovery_abort(top_device->ld_obd))
-                       /* abort just MDT-MDT recovery */
-                       top_device->ld_obd->obd_abort_recov_mdt = 1;
-               spin_unlock(&top_device->ld_obd->obd_dev_lock);
-               GOTO(out, rc);
+               CERROR("%s: get update log duration %lld, retries %d, failed: rc = %d\n",
+                      dt->dd_lu_dev.ld_obd->obd_name,
+                      ktime_get_real_seconds() - start, retries, rc);
+               /* abort MDT recovery of this target, but not all targets,
+                * because recovery still has chance to succeed.
+                */
+               if (!obd_mdt_recovery_abort(top_device->ld_obd))
+                       lod_sub_cancel_llog(env, lod, dt, lrd->lrd_idx);
+       } else {
+               CDEBUG(D_HA,
+                      "%s retrieved update log, duration %lld, retries %d\n",
+                      dt->dd_lu_dev.ld_obd->obd_name,
+                      ktime_get_real_seconds() - start, retries);
        }
 
-       CDEBUG(D_HA, "%s retrieved update log, duration %lld, retries %d\n",
-              dt->dd_lu_dev.ld_obd->obd_name, ktime_get_real_seconds() - start,
-              retries);
-
        spin_lock(&lod->lod_lock);
        if (!lrd->lrd_ltd)
                lod->lod_child_got_update_log = 1;
@@ -455,13 +546,13 @@ again:
 
        if (!lod->lod_child_got_update_log) {
                spin_unlock(&lod->lod_lock);
-               GOTO(out, rc = 0);
+               GOTO(out, rc);
        }
 
        lod_foreach_mdt(lod, mdt) {
                if (!mdt->ltd_got_update_log) {
                        spin_unlock(&lod->lod_lock);
-                       GOTO(out, rc = 0);
+                       GOTO(out, rc);
                }
        }
        lut->lut_tdtd->tdtd_replay_ready = 1;
@@ -482,7 +573,7 @@ out:
                wait_var_event(lrd, kthread_should_stop());
        lu_env_fini(env);
        OBD_FREE_PTR(lrd);
-       return 0;
+       return rc;
 }
 
 /**
@@ -1165,6 +1256,167 @@ static int lod_sub_init_llogs(const struct lu_env *env, struct lod_device *lod)
        RETURN(rc);
 }
 
+#define UPDATE_LOG_MAX_AGE     (30 * 24 * 60 * 60)     /* 30 days, in sec */
+
+static int lod_update_log_stale(const struct lu_env *env, struct dt_object *dto,
+                               struct lu_buf *buf)
+{
+       struct lu_attr *attr = &lod_env_info(env)->lti_attr;
+       struct llog_log_hdr *hdr;
+       loff_t off = 0;
+       int rc;
+
+       ENTRY;
+       rc = dt_attr_get(env, dto, attr);
+       if (rc)
+               RETURN(rc);
+
+       if (!(attr->la_valid & (LA_CTIME | LA_SIZE)))
+               RETURN(-EFAULT);
+
+       /* by default update log ctime is not set */
+       if (attr->la_ctime == 0)
+               RETURN(0);
+
+       /* update log not expired yet */
+       if (attr->la_ctime + UPDATE_LOG_MAX_AGE > ktime_get_real_seconds())
+               RETURN(0);
+
+       if (attr->la_size == 0)
+               RETURN(-EFAULT);
+
+       rc = dt_read(env, dto, buf, &off);
+       if (rc < 0)
+               RETURN(rc);
+
+       hdr = (struct llog_log_hdr *)buf->lb_buf;
+       if (LLOG_REC_HDR_NEEDS_SWABBING(&hdr->llh_hdr))
+               lustre_swab_llog_hdr(hdr);
+       /* log header is sane and flag LLOG_F_MAX_AGE|LLOG_F_RM_ON_ERR is set */
+       if (rc >= sizeof(*hdr) &&
+           hdr->llh_hdr.lrh_type == LLOG_HDR_MAGIC &&
+           (hdr->llh_flags & (LLOG_F_MAX_AGE | LLOG_F_RM_ON_ERR)) ==
+           (LLOG_F_MAX_AGE | LLOG_F_RM_ON_ERR))
+               RETURN(1);
+
+       RETURN(0);
+}
+
+/**
+ * Reclaim stale update log.
+ *
+ * When update log is canceld (upon recovery abort), it's not destroy, but
+ * canceled from catlist, and set ctime and LLOG_F_MAX_AGE|LLOG_F_RM_ON_ERR,
+ * which is kept for debug. If it expired (more than UPDATE_LOG_MAX_AGE seconds
+ * passed), destroy it to save space.
+ */
+static int lod_update_log_gc(const struct lu_env *env, struct lod_device *lod,
+                            struct dt_object *dir, struct dt_object *dto,
+                            const char *name)
+{
+       struct dt_device *dt = lod->lod_child;
+       struct thandle *th;
+       int rc;
+
+       ENTRY;
+       th = dt_trans_create(env, dt);
+       if (IS_ERR(th))
+               RETURN(PTR_ERR(th));
+
+       rc = dt_declare_delete(env, dir, (const struct dt_key *)name, th);
+       if (rc)
+               GOTO(out_trans, rc);
+
+       rc = dt_declare_destroy(env, dto, th);
+       if (rc)
+               GOTO(out_trans, rc);
+
+       rc = dt_trans_start_local(env, dt, th);
+       if (rc)
+               GOTO(out_trans, rc);
+
+       rc = dt_delete(env, dir, (const struct dt_key *)name, th);
+       if (rc)
+               GOTO(out_trans, rc);
+
+       rc = dt_destroy(env, dto, th);
+       GOTO(out_trans, rc);
+out_trans:
+       dt_trans_stop(env, dt, th);
+
+       return rc;
+}
+
+/* reclaim stale update llogs under "update_log_dir" */
+static int lod_update_log_dir_gc(const struct lu_env *env,
+                                struct lod_device *lod,
+                                struct dt_object *dir)
+{
+       struct lod_thread_info *info = lod_env_info(env);
+       struct lu_buf *buf = &info->lti_linkea_buf;
+       struct lu_dirent *ent = (struct lu_dirent *)info->lti_key;
+       struct lu_fid *fid = &info->lti_fid;
+       struct dt_it *it;
+       const struct dt_it_ops *iops;
+       struct dt_object *dto;
+       int rc;
+
+       ENTRY;
+
+       if (unlikely(!dt_try_as_dir(env, dir, true)))
+               RETURN(-ENOTDIR);
+
+       lu_buf_alloc(buf, sizeof(struct llog_log_hdr));
+       if (!buf->lb_buf)
+               RETURN(-ENOMEM);
+
+       iops = &dir->do_index_ops->dio_it;
+       it = iops->init(env, dir, LUDA_64BITHASH);
+       if (IS_ERR(it))
+               GOTO(out, rc = PTR_ERR(it));
+
+       rc = iops->load(env, it, 0);
+       if (rc == 0)
+               rc = iops->next(env, it);
+       else if (rc > 0)
+               rc = 0;
+
+       while (rc == 0) {
+               rc = iops->rec(env, it, (struct dt_rec *)ent, LUDA_64BITHASH);
+               if (rc != 0)
+                       break;
+
+               ent->lde_namelen = le16_to_cpu(ent->lde_namelen);
+               if (ent->lde_name[0] == '.') {
+                       if (ent->lde_namelen == 1)
+                               goto next;
+
+                       if (ent->lde_namelen == 2 && ent->lde_name[1] == '.')
+                               goto next;
+               }
+
+               fid_le_to_cpu(fid, &ent->lde_fid);
+               dto = dt_locate(env, lod->lod_child, fid);
+               if (IS_ERR(dto))
+                       goto next;
+
+               buf->lb_len = sizeof(struct llog_log_hdr);
+               if (lod_update_log_stale(env, dto, buf) == 1)
+                       lod_update_log_gc(env, lod, dir, dto, ent->lde_name);
+               dt_object_put(env, dto);
+next:
+               rc = iops->next(env, it);
+       }
+
+       iops->put(env, it);
+       iops->fini(env, it);
+out:
+       buf->lb_len = sizeof(struct llog_log_hdr);
+       lu_buf_free(buf);
+
+       RETURN(rc > 0 ? 0 : rc);
+}
+
 /**
  * Implementation of lu_device_operations::ldo_prepare() for LOD
  *
@@ -1222,6 +1474,7 @@ static int lod_prepare(const struct lu_env *env, struct lu_device *pdev,
        if (IS_ERR(dto))
                GOTO(out_put, rc = PTR_ERR(dto));
 
+       lod_update_log_dir_gc(env, lod, dto);
        dt_object_put(env, dto);
 
        rc = lod_prepare_distribute_txn(env, lod);
@@ -2349,8 +2602,188 @@ static int lod_pool_del_q(struct obd_device *obd, char *poolname)
        return err;
 }
 
+static inline int lod_sub_print_llog(const struct lu_env *env,
+                                    struct dt_device *dt, void *data)
+{
+       struct llog_ctxt *ctxt;
+       int rc = 0;
+
+       ENTRY;
+       ctxt = llog_get_context(dt->dd_lu_dev.ld_obd,
+                               LLOG_UPDATELOG_ORIG_CTXT);
+       if (!ctxt)
+               RETURN(0);
+
+       if (ctxt->loc_handle) {
+               struct llog_print_data *lprd = data;
+               struct obd_ioctl_data *ioc_data = lprd->lprd_data;
+               int l, remains;
+               long from;
+               char *out;
+
+               LASSERT(ioc_data);
+               if (ioc_data->ioc_inllen1 > 0) {
+                       remains = ioc_data->ioc_inllen4 +
+                                 round_up(ioc_data->ioc_inllen1, 8) +
+                                 round_up(ioc_data->ioc_inllen2, 8) +
+                                 round_up(ioc_data->ioc_inllen3, 8);
+
+                       rc = kstrtol(ioc_data->ioc_inlbuf2, 0, &from);
+                       if (rc)
+                               GOTO(ctxt_put, rc);
+
+                       /* second iteration from jt_llog_print_iter() */
+                       if (from > 1)
+                               GOTO(ctxt_put, rc = 0);
+
+                       out = ioc_data->ioc_bulk;
+                       ioc_data->ioc_inllen1 = 0;
+               } else {
+                       out = ioc_data->ioc_bulk + ioc_data->ioc_offset;
+                       remains = ioc_data->ioc_count;
+               }
+
+               l = snprintf(out, remains, "%s [catalog]: "DFID"\n",
+                            ctxt->loc_obd->obd_name,
+                            PLOGID(&ctxt->loc_handle->lgh_id));
+               out += l;
+               remains -= l;
+               if (remains <= 0) {
+                       CERROR("%s: not enough space for print log records: rc = %d\n",
+                              ctxt->loc_obd->obd_name, -LLOG_EEMPTY);
+                       GOTO(ctxt_put, rc = -LLOG_EEMPTY);
+               }
+
+               ioc_data->ioc_offset += l;
+               ioc_data->ioc_count = remains;
+
+               rc = llog_process_or_fork(env, ctxt->loc_handle, llog_print_cb,
+                                         data, NULL, false);
+       }
+       GOTO(ctxt_put, rc);
+ctxt_put:
+       llog_ctxt_put(ctxt);
+
+       return rc;
+}
+
+/* print update catalog and update logs FID of all sub devices */
+static int lod_llog_print(const struct lu_env *env, struct lod_device *lod,
+                         void *data)
+{
+       struct lod_tgt_desc *mdt;
+       bool empty = true;
+       int rc = 0;
+
+       ENTRY;
+       rc = lod_sub_print_llog(env, lod->lod_child, data);
+       if (!rc) {
+               empty = false;
+       } else if (rc == -LLOG_EEMPTY) {
+               rc = 0;
+       } else {
+               CERROR("%s: llog_print failed: rc = %d\n",
+                      lod2obd(lod)->obd_name, rc);
+               RETURN(rc);
+       }
+
+       lod_getref(&lod->lod_mdt_descs);
+       lod_foreach_mdt(lod, mdt) {
+               rc = lod_sub_print_llog(env, mdt->ltd_tgt, data);
+               if (!rc) {
+                       empty = false;
+               } else if (rc == -LLOG_EEMPTY) {
+                       rc = 0;
+               } else {
+                       CERROR("%s: llog_print of MDT %u failed: rc = %d\n",
+                              lod2obd(lod)->obd_name, mdt->ltd_index, rc);
+                       break;
+               }
+       }
+       lod_putref(lod, &lod->lod_mdt_descs);
+
+       RETURN(rc ? rc : empty ? -LLOG_EEMPTY : 0);
+}
+
+/* cancel update catalog from update catlist */
+static int lod_llog_cancel(const struct lu_env *env, struct lod_device *lod)
+{
+       struct lod_tgt_desc *tgt;
+       int index;
+       int rc;
+       int rc2;
+
+       rc = lodname2mdt_index(lod2obd(lod)->obd_name, (__u32 *)&index);
+       if (rc < 0)
+               return rc;
+
+       rc = lod_sub_cancel_llog(env, lod, lod->lod_child, index);
+
+       lod_getref(&lod->lod_mdt_descs);
+       lod_foreach_mdt(lod, tgt) {
+               LASSERT(tgt && tgt->ltd_tgt);
+               rc2 = lod_sub_cancel_llog(env, lod, tgt->ltd_tgt,
+                                         tgt->ltd_index);
+               if (rc2 && !rc)
+                       rc = rc2;
+       }
+       lod_putref(lod, &lod->lod_mdt_descs);
+
+       return rc;
+}
+
+static int lod_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
+                        void *karg, void __user *uarg)
+{
+       struct obd_device *obd = exp->exp_obd;
+       struct lod_device *lod = lu2lod_dev(obd->obd_lu_dev);
+       struct obd_ioctl_data *data = karg;
+       struct lu_env env;
+       int rc;
+
+       ENTRY;
+       rc = lu_env_init(&env, LCT_LOCAL | LCT_MD_THREAD);
+       if (rc) {
+               CERROR("%s: can't initialize env: rc = %d\n",
+                      lod2obd(lod)->obd_name, rc);
+               RETURN(rc);
+       }
+
+       switch (cmd) {
+       case OBD_IOC_LLOG_PRINT: {
+               struct llog_print_data lprd = {
+                       .lprd_data = data,
+                       .lprd_raw = data->ioc_u32_1,
+               };
+               char *logname;
+
+               logname = data->ioc_inlbuf1;
+               if (strcmp(logname, lod_update_log_name) != 0) {
+                       rc = -EINVAL;
+                       CERROR("%s: llog iocontrol support %s only: rc = %d\n",
+                              lod2obd(lod)->obd_name, lod_update_log_name, rc);
+                       RETURN(rc);
+               }
+
+               LASSERT(data->ioc_inllen1 > 0);
+               rc = lod_llog_print(&env, lod, &lprd);
+               break;
+       }
+       case OBD_IOC_LLOG_CANCEL:
+               rc = lod_llog_cancel(&env, lod);
+               break;
+       default:
+               CERROR("%s: unrecognized ioctl %#x by %s\n",
+                      obd->obd_name, cmd, current->comm);
+               rc = -ENOTTY;
+       }
+       lu_env_fini(&env);
+
+       RETURN(rc);
+}
+
 static const struct obd_ops lod_obd_device_ops = {
-       .o_owner        = THIS_MODULE,
+       .o_owner        = THIS_MODULE,
        .o_connect      = lod_obd_connect,
        .o_disconnect   = lod_obd_disconnect,
        .o_get_info     = lod_obd_get_info,
@@ -2359,6 +2792,7 @@ static const struct obd_ops lod_obd_device_ops = {
        .o_pool_rem     = lod_pool_remove_q,
        .o_pool_add     = lod_pool_add_q,
        .o_pool_del     = lod_pool_del_q,
+       .o_iocontrol    = lod_iocontrol,
 };
 
 static int __init lod_init(void)
index b295a0d..6fb4889 100644 (file)
@@ -2283,6 +2283,10 @@ static int mdd_iocontrol(const struct lu_env *env, struct md_device *m,
                                 (struct lfsck_query *)karg);
                RETURN(rc);
        }
+       case OBD_IOC_LLOG_PRINT:
+       case OBD_IOC_LLOG_CANCEL:
+               rc = obd_iocontrol(cmd, mdd->mdd_child_exp, len, karg, NULL);
+               RETURN(rc);
        }
 
        /* Below ioctls use obd_ioctl_data */
index 7ea83d6..0a413ed 100644 (file)
@@ -7405,17 +7405,17 @@ static int mdt_ioc_version_get(struct mdt_thread_info *mti, void *karg)
 static int mdt_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
                         void *karg, void __user *uarg)
 {
-        struct lu_env      env;
-        struct obd_device *obd = exp->exp_obd;
-        struct mdt_device *mdt = mdt_dev(obd->obd_lu_dev);
-        struct dt_device  *dt = mdt->mdt_bottom;
-        int rc;
+       struct lu_env      env;
+       struct obd_device *obd = exp->exp_obd;
+       struct mdt_device *mdt = mdt_dev(obd->obd_lu_dev);
+       struct dt_device  *dt = mdt->mdt_bottom;
+       int rc;
 
-        ENTRY;
-        CDEBUG(D_IOCTL, "handling ioctl cmd %#x\n", cmd);
-        rc = lu_env_init(&env, LCT_MD_THREAD);
-        if (rc)
-                RETURN(rc);
+       ENTRY;
+       CDEBUG(D_IOCTL, "handling ioctl cmd %#x\n", cmd);
+       rc = lu_env_init(&env, LCT_MD_THREAD);
+       if (rc)
+               RETURN(rc);
 
        switch (cmd) {
        case OBD_IOC_SYNC:
@@ -7432,7 +7432,7 @@ static int mdt_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
                if (data->ioc_type & OBD_FLG_ABORT_RECOV_MDT) {
                        CERROR("%s: Aborting MDT recovery\n",
                               mdt_obd_name(mdt));
-                       obd->obd_abort_recov_mdt = 1;
+                       obd->obd_abort_mdt_recovery = 1;
                        wake_up(&obd->obd_next_transno_waitq);
                } else { /* if (data->ioc_type & OBD_FLG_ABORT_RECOV_OST) */
                        /* lctl didn't set OBD_FLG_ABORT_RECOV_OST < 2.13.57 */
@@ -7444,13 +7444,15 @@ static int mdt_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
                rc = 0;
                break;
        }
-        case OBD_IOC_CHANGELOG_REG:
-        case OBD_IOC_CHANGELOG_DEREG:
-        case OBD_IOC_CHANGELOG_CLEAR:
+       case OBD_IOC_CHANGELOG_REG:
+       case OBD_IOC_CHANGELOG_DEREG:
+       case OBD_IOC_CHANGELOG_CLEAR:
+       case OBD_IOC_LLOG_PRINT:
+       case OBD_IOC_LLOG_CANCEL:
                rc = mdt->mdt_child->md_ops->mdo_iocontrol(&env,
                                                           mdt->mdt_child,
                                                           cmd, len, karg);
-                break;
+               break;
        case OBD_IOC_START_LFSCK: {
                struct md_device *next = mdt->mdt_child;
                struct obd_ioctl_data *data = karg;
@@ -7490,17 +7492,18 @@ static int mdt_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
                                                 data->ioc_inlbuf1);
                break;
        }
-        case OBD_IOC_GET_OBJ_VERSION: {
-                struct mdt_thread_info *mti;
-                mti = lu_context_key_get(&env.le_ctx, &mdt_thread_key);
-                memset(mti, 0, sizeof *mti);
-                mti->mti_env = &env;
-                mti->mti_mdt = mdt;
-                mti->mti_exp = exp;
+       case OBD_IOC_GET_OBJ_VERSION: {
+               struct mdt_thread_info *mti;
 
-                rc = mdt_ioc_version_get(mti, karg);
-                break;
-        }
+               mti = lu_context_key_get(&env.le_ctx, &mdt_thread_key);
+               memset(mti, 0, sizeof(*mti));
+               mti->mti_env = &env;
+               mti->mti_mdt = mdt;
+               mti->mti_exp = exp;
+
+               rc = mdt_ioc_version_get(mti, karg);
+               break;
+       }
        case OBD_IOC_CATLOGLIST: {
                struct mdt_thread_info *mti;
 
@@ -7516,8 +7519,8 @@ static int mdt_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
                        mdt_obd_name(mdt), cmd, rc);
        }
 
-        lu_env_fini(&env);
-        RETURN(rc);
+       lu_env_fini(&env);
+       RETURN(rc);
 }
 
 static int mdt_postrecov(const struct lu_env *env, struct mdt_device *mdt)
index 1d0a39c..a7355de 100644 (file)
@@ -527,7 +527,8 @@ int dt_record_write(const struct lu_env *env, struct dt_object *dt,
        LASSERTF(dt != NULL, "dt is NULL when we want to write record\n");
        LASSERT(th != NULL);
        LASSERT(dt->do_body_ops);
-       LASSERT(dt->do_body_ops->dbo_write);
+       LASSERTF(dt->do_body_ops->dbo_write, DFID"\n",
+                PFID(lu_object_fid(&dt->do_lu)));
 
        size = dt->do_body_ops->dbo_write(env, dt, buf, pos, th);
        if (size < 0)
index f1b3424..81e2c9f 100644 (file)
@@ -231,7 +231,7 @@ int llog_cancel_arr_rec(const struct lu_env *env, struct llog_handle *loghandle,
 
        llh = loghandle->lgh_hdr;
 
-       CDEBUG(D_RPCTRACE, "Canceling %d records, first %d in log "DFID"\n",
+       CDEBUG(D_OTHER, "Canceling %d records, first %d in log "DFID"\n",
               num, index[0], PLOGID(&loghandle->lgh_id));
 
        dt = lu2dt_dev(loghandle->lgh_obj->do_lu.lo_dev);
@@ -267,7 +267,7 @@ int llog_cancel_arr_rec(const struct lu_env *env, struct llog_handle *loghandle,
                        GOTO(out_unlock, rc = -EINVAL);
                }
                if (!__test_and_clear_bit_le(index[i], LLOG_HDR_BITMAP(llh))) {
-                       CDEBUG(D_RPCTRACE, "Catalog index %u already clear?\n",
+                       CDEBUG(D_OTHER, "Catalog index %u already clear?\n",
                               index[i]);
                        GOTO(out_unlock, rc = -ENOENT);
                }
@@ -1549,3 +1549,49 @@ __u64 llog_size(const struct lu_env *env, struct llog_handle *llh)
 }
 EXPORT_SYMBOL(llog_size);
 
+/* set llog ctime to current, and set LLOG_F_RM_ON_ERR|LLOG_F_MAX_AGE flag in
+ * log header. It will be reclaimed when expired (UPDATE_LOG_MAX_AGE old).
+ */
+int llog_retain(const struct lu_env *env, struct llog_handle *log)
+{
+       struct dt_object *dto = log->lgh_obj;
+       struct dt_device *dt = lu2dt_dev(log->lgh_obj->do_lu.lo_dev);
+       struct lu_attr la = { 0 };
+       struct thandle *th;
+       int rc;
+
+       la.la_ctime = ktime_get_real_seconds();
+       la.la_valid = LA_CTIME;
+
+       th = dt_trans_create(env, dt);
+       if (IS_ERR(th))
+               return PTR_ERR(th);
+
+       th->th_wait_submit = 1;
+       log->lgh_hdr->llh_flags |= LLOG_F_MAX_AGE | LLOG_F_RM_ON_ERR;
+       rc = llog_declare_write_rec(env, log, &log->lgh_hdr->llh_hdr, -1, th);
+       if (rc)
+               goto out_trans;
+
+       rc = dt_declare_attr_set(env, dto, &la, th);
+       if (rc)
+               goto out_trans;
+
+       rc = dt_trans_start_local(env, dt, th);
+       if (rc)
+               goto out_trans;
+
+       rc = llog_write_rec(env, log, &log->lgh_hdr->llh_hdr, NULL,
+                           LLOG_HEADER_IDX, th);
+       if (rc)
+               goto out_trans;
+
+       rc = dt_attr_set(env, dto, &la, th);
+out_trans:
+       dt_trans_stop(env, dt, th);
+
+       CDEBUG(D_OTHER, "retain log "DFID" rc = %d\n",
+              PLOGID(&log->lgh_id), rc);
+       return rc;
+}
+EXPORT_SYMBOL(llog_retain);
index c20fcdd..a0078d2 100644 (file)
@@ -1180,3 +1180,30 @@ int llog_cat_cleanup(const struct lu_env *env, struct llog_handle *cathandle,
                       PLOGID(&cathandle->lgh_id));
        return rc;
 }
+
+/* retain log in catalog, and zap it if log is empty */
+int llog_cat_retain_cb(const struct lu_env *env, struct llog_handle *cat,
+                      struct llog_rec_hdr *rec, void *data)
+{
+       struct llog_logid_rec *lir = (struct llog_logid_rec *)rec;
+       struct llog_handle *log;
+       int rc;
+
+       if (rec->lrh_type != LLOG_LOGID_MAGIC)
+               return -EINVAL;
+
+       rc = llog_cat_id2handle(env, cat, &log, &lir->lid_id);
+       if (rc) {
+               CDEBUG(D_IOCTL, "cannot find log "DFID"\n",
+                      PLOGID(&lir->lid_id));
+               return -ENOENT;
+       }
+
+       llog_retain(env, log);
+       llog_handle_put(env, log);
+
+       return rc;
+}
+EXPORT_SYMBOL(llog_cat_retain_cb);
+
+
index b215ac6..e1afb48 100644 (file)
@@ -211,15 +211,9 @@ static inline bool llog_idx_is_eof(struct llog_handle *llh, __u32 cur_idx)
        return cur_idx >= last_idx;
 }
 
-struct llog_print_data {
-       struct obd_ioctl_data *lprd_data;
-       unsigned int           lprd_cfg_flags;
-       bool                   lprd_raw;
-};
-
 #define MARKER_DIFF    10
-static int llog_print_cb(const struct lu_env *env, struct llog_handle *handle,
-                        struct llog_rec_hdr *rec, void *data)
+int llog_print_cb(const struct lu_env *env, struct llog_handle *handle,
+                 struct llog_rec_hdr *rec, void *data)
 {
        struct llog_print_data *lprd = data;
        struct obd_ioctl_data *ioc_data = lprd->lprd_data;
@@ -250,6 +244,11 @@ static int llog_print_cb(const struct lu_env *env, struct llog_handle *handle,
 
                out = ioc_data->ioc_bulk;
                ioc_data->ioc_inllen1 = 0;
+               ioc_data->ioc_offset = 0;
+               ioc_data->ioc_count = remains;
+       } else if (ioc_data) {
+               out = ioc_data->ioc_bulk + ioc_data->ioc_offset;
+               remains = ioc_data->ioc_count;
        }
 
        cur_index = rec->lrh_index;
@@ -298,8 +297,18 @@ static int llog_print_cb(const struct lu_env *env, struct llog_handle *handle,
                RETURN(-LLOG_EEMPTY);
        }
 
+       if (ioc_data) {
+               /* save offset and remains, then we don't always rely on those
+                * static variables, which is more flexible.
+                */
+               ioc_data->ioc_offset += l;
+               ioc_data->ioc_count = remains;
+       }
+
        RETURN(0);
 }
+EXPORT_SYMBOL(llog_print_cb);
+
 static int llog_remove_log(const struct lu_env *env, struct llog_handle *cat,
                           struct llog_logid *logid)
 {
@@ -340,7 +349,6 @@ static int llog_delete_cb(const struct lu_env *env, struct llog_handle *handle,
        RETURN(rc);
 }
 
-
 int llog_ioctl(const struct lu_env *env, struct llog_ctxt *ctxt, int cmd,
               struct obd_ioctl_data *data)
 {
index 11176e8..5ccbbed 100755 (executable)
@@ -3544,11 +3544,10 @@ test_100b() {
 run_test 100b "DNE: create striped dir, fail MDT0"
 
 test_100c() {
-       [ $MDSCOUNT -lt 2 ] && skip "needs >= 2 MDTs" && return 0
-       ([ $FAILURE_MODE == "HARD" ] &&
-               [ "$(facet_host mds1)" == "$(facet_host mds2)" ]) &&
-               skip "MDTs needs to be on diff hosts for HARD fail mode" &&
-               return 0
+       (( $MDSCOUNT >= 2 )) || skip "needs >= 2 MDTs"
+       [[ "$FAILURE_MODE" != "HARD" ||
+          "$(facet_host mds1)" != "$(facet_host mds2)" ]] ||
+               skip "MDTs needs to be on diff hosts for HARD fail mode"
 
        local striped_dir=$DIR/$tdir/striped_dir
 
@@ -3564,13 +3563,47 @@ test_100c() {
        fail_abort mds2 abort_recov_mdt
 
        createmany -o $striped_dir/f-%d 20 &&
-                       error "createmany -o $DIR/$tfile should fail"
+               error "createmany -o $DIR/$tfile should fail"
 
        fail mds2
-       striped_dir_check_100 || error "striped dir check failed"
-       rm -rf $DIR/$tdir || error "rmdir failed"
+
+       # $striped_dir creation partly fails due to abort_recov_mdt,
+       # but at least this directory should be able to be deleted
+       #$LFS rm_entry $striped_dir
+       #rm -rf $DIR/$tdir || error "rmdir failed"
+}
+run_test 100c "DNE: create striped dir, abort_recov_mdt mds2"
+
+test_100d() {
+       (( $MDSCOUNT > 1 )) || skip "needs > 1 MDTs"
+       (( $MDS1_VERSION >= $(version_code 2.15.52.144) )) ||
+               skip "Need MDS version 2.15.52.144+"
+
+       test_mkdir -c $MDSCOUNT $DIR/$tdir || error "mkdir $tdir failed"
+       $LFS setdirstripe -D -i -1 -c $MDSCOUNT $DIR/$tdir ||
+               error "set $tdir default LMV failed"
+       createmany -d $DIR/$tdir/s 100 || error "create subdir failed"
+
+       local index=$((RANDOM % MDSCOUNT))
+       local devname=$(mdtname_from_index $index)
+       local mdt=mds$((index + 1))
+
+       local count
+       local log
+
+       # cancel update llog upon recovery abort
+       do_facet $mdt $LCTL --device $devname llog_print update_log
+       log=$(do_facet $mdt "$LCTL --device $devname llog_print update_log |
+               awk '/index/ { print \\\$4; exit }'")
+       log=${log:1:-1}
+       count=$(do_facet $mdt "$LCTL --device $devname llog_print update_log |
+               grep -c index")
+       (( count > 0 )) || error "no update logs found"
+       fail_abort $mdt || error "fail_abort $mdt failed"
+       wait_update_facet $mdt "$LCTL --device $devname llog_print update_log |
+               grep -c index" 0 60 || error "update logs not canceled"
 }
-run_test 100c "DNE: create striped dir, fail MDT0"
+run_test 100d "DNE: cancel update logs upon recovery abort"
 
 test_101() { #LU-5648
        mkdir -p $DIR/$tdir/d1
@@ -5002,6 +5035,10 @@ test_135() {
 }
 run_test 135 "Server failure in lock replay phase"
 
+# LU-16159 abort_recovery may cause directory unlink fail, now that LFSCK can't
+# fix all the inconsistencies, formatall so it won't fail in cleanup
+(( $MDS1_VERSION >= $(version_code 2.15.52.63) )) && formatall
+
 complete $SECONDS
 check_and_cleanup_lustre
 exit_status