Whamcloud - gitweb
LU-15737 ofd: don't block destroys 98/55598/5
authorAlexander Boyko <alexander.boyko@hpe.com>
Sat, 15 Jun 2024 10:04:34 +0000 (06:04 -0400)
committerOleg Drokin <green@whamcloud.com>
Thu, 8 Aug 2024 00:16:36 +0000 (00:16 +0000)
ofd_destoy_by_fid could sleep infinite for a GROUP lock
conflict. If all MDT osp_sync_inflight is spend for such destroys,
MDT would not be able to send destroys and setattr. And as a result
OST free space leakage.

This fix makes ldlm_cli_enqueue_local nonblocking for group locks,
and adds MDT repeat part of sync requests with errors.
Also patch adds a debugfs file to check hanged osp jobs.
lctl get_param osp.lustre-OST0000-osc-MDT0000.error_list

Adds recovery-small 160. It reproduces a situation when
MDT sends object destroys and it hangs at OST side,
because of conflicting GROUP lock.

Lustre: ll_ost02_068: service thread pid 51278 was inactive for
204.776 seconds. The thread might be hung...
Call Trace TBD:
ldlm_completion_ast+0x7ac/0x900 [ptlrpc]
ldlm_cli_enqueue_local+0x307/0x880 [ptlrpc]
ofd_destroy_by_fid+0x235/0x4a0 [ofd]
ofd_destroy_hdl+0x263/0xa10 [ofd]
tgt_request_handle+0xcc9/0x1a20 [ptlrpc]
ptlrpc_server_handle_request+0x23f/0xc60 [ptlrpc]
ptlrpc_main+0xc8b/0x15d0 [ptlrpc]

HPE-bug-id: LUS-12350
Signed-off-by: Alexander Boyko <alexander.boyko@hpe.com>
Change-Id: I396bf48d3d29f058f65095cbb4dbba11581534cc
Reviewed-on: https://review.whamcloud.com/c/fs/lustre-release/+/55598
Tested-by: jenkins <devops@whamcloud.com>
Tested-by: Maloo <maloo@whamcloud.com>
Reviewed-by: Andriy Skulysh <andriy.skulysh@hpe.com>
Reviewed-by: Vitaly Fertman <vitaly.fertman@hpe.com>
Reviewed-by: Oleg Drokin <green@whamcloud.com>
lustre/ldlm/ldlm_extent.c
lustre/ofd/ofd_obd.c
lustre/osp/lproc_osp.c
lustre/osp/osp_internal.h
lustre/osp/osp_sync.c
lustre/tests/recovery-small.sh

index 3a48d81..7c3ee08 100644 (file)
@@ -755,9 +755,11 @@ ldlm_extent_compat_queue(struct list_head *queue, struct ldlm_lock *req,
        RETURN(compat);
 destroylock:
        list_del_init(&req->l_res_link);
+       if (ldlm_is_local(req))
+               ldlm_lock_decref_internal_nolock(req, req_mode);
        ldlm_lock_destroy_nolock(req);
        RETURN(compat);
-       }
+}
 
 /**
  * This function refresh eviction timer for cancelled lock.
index fbc55a8..0597946 100644 (file)
@@ -919,7 +919,7 @@ int ofd_destroy_by_fid(const struct lu_env *env, struct ofd_device *ofd,
        struct lustre_handle lockh;
        union ldlm_policy_data policy = { .l_extent = { 0, OBD_OBJECT_EOF } };
        struct ofd_object *fo;
-       __u64 flags = LDLM_FL_AST_DISCARD_DATA;
+       __u64 flags = LDLM_FL_AST_DISCARD_DATA | LDLM_FL_BLOCK_NOWAIT;
        __u64 rc = 0;
 
        ENTRY;
@@ -939,14 +939,18 @@ int ofd_destroy_by_fid(const struct lu_env *env, struct ofd_device *ofd,
                                    NULL, NULL, 0, LVB_T_NONE, NULL, &lockh);
 
        /* We only care about the side-effects, just drop the lock. */
-       if (rc == ELDLM_OK)
+       if (rc == ELDLM_OK) {
                ldlm_lock_decref(&lockh, LCK_PW);
 
-       LASSERT(fo != NULL);
-
-       rc = ofd_destroy(env, fo, orphan);
-       EXIT;
+               LASSERT(fo != NULL);
 
+               rc = ofd_destroy(env, fo, orphan);
+               EXIT;
+       } else if (rc == -EAGAIN) {
+               CDEBUG(D_HA, "%s: Object "DFID" is locked, skiping destroy\n",
+                      ofd->ofd_osd_exp->exp_obd->obd_name, PFID(fid));
+               rc = -EIO;
+       }
        ofd_object_put(env, fo);
        RETURN(rc);
 }
index 90f124b..470e34e 100644 (file)
@@ -912,7 +912,6 @@ static ssize_t osp_conn_uuid_show(struct kobject *kobj, struct attribute *attr,
 
        return count;
 }
-
 LUSTRE_ATTR(ost_conn_uuid, 0444, osp_conn_uuid_show, NULL);
 LUSTRE_ATTR(mdt_conn_uuid, 0444, osp_conn_uuid_show, NULL);
 
@@ -1143,6 +1142,31 @@ static ssize_t force_sync_store(struct kobject *kobj, struct attribute *attr,
 }
 LUSTRE_WO_ATTR(force_sync);
 
+static int osp_sync_error_list_seq_show(struct seq_file *m, void *data)
+{
+       struct obd_device       *dev = m->private;
+       struct osp_device       *osp = lu2osp_dev(dev->obd_lu_dev);
+       struct osp_job_args     *ja;
+
+       if (osp == NULL)
+               return -EINVAL;
+
+       if (list_empty(&osp->opd_sync_error_list))
+               return 0;
+
+       spin_lock(&osp->opd_sync_lock);
+       list_for_each_entry(ja, &osp->opd_sync_error_list, ja_error_link) {
+               seq_printf(m, "%u ["DOSTID"] llog=["DOSTID"] idx %d\n",
+                          ja->ja_op, POSTID(&ja->ja_body.oa.o_oi),
+                          POSTID(&ja->ja_lcookie.lgc_lgl.lgl_oi),
+                          ja->ja_lcookie.lgc_index);
+       }
+       spin_unlock(&osp->opd_sync_lock);
+
+       return 0;
+}
+LDEBUGFS_SEQ_FOPS_RO(osp_sync_error_list);
+
 static struct ldebugfs_vars ldebugfs_osp_obd_vars[] = {
        { .name =       "connect_flags",
          .fops =       &osp_connect_flags_fops         },
@@ -1154,6 +1178,8 @@ static struct ldebugfs_vars ldebugfs_osp_obd_vars[] = {
          .fops =       &osp_import_fops                },
        { .name =       "state",
          .fops =       &osp_state_fops                 },
+       { .name =       "error_list",
+         .fops =       &osp_sync_error_list_fops       },
        { NULL }
 };
 
index 4ed3ac1..c1eecaa 100644 (file)
@@ -221,6 +221,10 @@ struct osp_device {
        struct list_head                 opd_sync_in_flight_list;
        /* list of remotely committed rpc */
        struct list_head                 opd_sync_committed_there;
+       /* list for failed rpcs */
+       struct list_head                 opd_sync_error_list;
+       atomic_t                         opd_sync_error_count;
+
        /* number of RPCs in flight - flow control */
        atomic_t                         opd_sync_rpcs_in_flight;
        int                              opd_sync_max_rpcs_in_flight;
@@ -392,6 +396,14 @@ struct osp_thandle {
        atomic_t                 ot_refcount;
 };
 
+struct osp_job_args {
+       struct list_head                ja_error_link;
+       struct llog_cookie              ja_lcookie;
+       struct ost_body                 ja_body;
+       enum ost_cmd                    ja_op;
+       ktime_t                         ja_time;
+};
+
 static inline struct osp_thandle *
 thandle_to_osp_thandle(struct thandle *th)
 {
index aa993d2..b941fea 100644 (file)
@@ -110,9 +110,10 @@ static inline int osp_sync_has_new_job(struct osp_device *d)
 static inline int osp_sync_in_flight_conflict(struct osp_device *d,
                                             struct llog_rec_hdr *h)
 {
-       struct osp_job_req_args *jra;
-       struct ost_id            ostid;
-       int                      conflict = 0;
+       struct osp_job_req_args *jra;
+       struct ost_id ostid;
+       struct osp_job_args *ja;
+       int conflict = 0;
 
        if (h == NULL || h->lrh_type == LLOG_GEN_REC ||
            list_empty(&d->opd_sync_in_flight_list))
@@ -157,6 +158,13 @@ static inline int osp_sync_in_flight_conflict(struct osp_device *d,
                LASSERT(body);
 
                if (memcmp(&ostid, &body->oa.o_oi, sizeof(ostid)) == 0) {
+                       spin_unlock(&d->opd_sync_lock);
+                       return 1;
+               }
+       }
+
+       list_for_each_entry(ja, &d->opd_sync_error_list, ja_error_link) {
+               if (memcmp(&ostid, &ja->ja_body.oa.o_oi, sizeof(ostid)) == 0) {
                        conflict = 1;
                        break;
                }
@@ -234,6 +242,27 @@ static inline __u64 osp_sync_correct_id(struct osp_device *d,
        return correct_id;
 }
 
+static inline bool osp_sync_can_send_delayed(struct osp_device *d)
+{
+       struct osp_job_args *ja;
+       ktime_t now = ktime_get();
+
+       ENTRY;
+
+       if (likely(list_empty(&d->opd_sync_error_list)))
+               RETURN(false);
+
+       ja = list_first_entry(&d->opd_sync_error_list, typeof(*ja),
+                            ja_error_link);
+
+       if (ktime_before(ja->ja_time, now)) {
+               CDEBUG(D_INFO, "can send delayed osp job %px "DOSTID"\n", ja,
+                      POSTID(&ja->ja_body.oa.o_oi));
+               RETURN(true);
+       }
+       RETURN(false);
+}
+
 /**
  * Check and return ready-for-new status.
  *
@@ -265,6 +294,8 @@ static inline int osp_sync_can_process_new(struct osp_device *d,
                return 0;
        if (d->opd_sync_prev_done == 0)
                return 1;
+       if (osp_sync_can_send_delayed(d))
+               return 1;
        if (atomic_read(&d->opd_sync_changes) == 0)
                return 0;
        if (rec == NULL)
@@ -567,6 +598,7 @@ static int osp_sync_interpret(const struct lu_env *env,
                wake_up(&d->opd_sync_waitq);
        } else if (rc) {
                struct obd_import *imp = req->rq_import;
+               struct osp_job_args *ja = NULL;
                /*
                 * error happened, we'll try to repeat on next boot ?
                 */
@@ -582,7 +614,30 @@ static int osp_sync_interpret(const struct lu_env *env,
                        LASSERT(atomic_read(&d->opd_sync_rpcs_in_progress) > 0);
                        atomic_dec(&d->opd_sync_rpcs_in_progress);
                }
-
+               /* limit error list by 1K objects */
+               if (atomic_read(&d->opd_sync_error_count) < 1000)
+                       OBD_ALLOC_PTR(ja);
+               if (likely(ja)) {
+                       struct ost_body *body;
+
+                       body = req_capsule_client_get(&req->rq_pill,
+                                                     &RMF_OST_BODY);
+                       memcpy(&ja->ja_body, &body->oa, sizeof(ja->ja_body));
+
+                       ja->ja_lcookie = jra->jra_lcookie;
+                       ja->ja_op = lustre_msg_get_opc(req->rq_reqmsg);
+                       INIT_LIST_HEAD(&ja->ja_error_link);
+                       /* repeat an operation after OBD_TIMEOUT / 10 */
+                       ja->ja_time = ktime_add_ms(ktime_get(),
+                                       obd_timeout / 10 * MSEC_PER_SEC);
+                       spin_lock(&d->opd_sync_lock);
+                       list_add_tail(&ja->ja_error_link,
+                                     &d->opd_sync_error_list);
+                       spin_unlock(&d->opd_sync_lock);
+                       atomic_inc(&d->opd_sync_error_count);
+                       CDEBUG(D_INFO, "Added %px object "DOSTID"\n", ja,
+                              POSTID(&ja->ja_body.oa.o_oi));
+               }
                wake_up(&d->opd_sync_waitq);
        } else if (d->opd_pre != NULL &&
                   unlikely(d->opd_pre_status == -ENOSPC)) {
@@ -660,12 +715,14 @@ static void osp_sync_send_new_rpc(struct osp_device *d,
  * \retval ERR_PTR(errno)      on error
  */
 static struct ptlrpc_request *osp_sync_new_job(struct osp_device *d,
-                                              enum ost_cmd op,
-                                              const struct req_format *format)
+                                              enum ost_cmd op)
 {
-       struct ptlrpc_request   *req;
-       struct obd_import       *imp;
-       int                      rc;
+       struct ptlrpc_request *req;
+       struct obd_import *imp;
+       struct req_format *format;
+       int rc;
+
+       ENTRY;
 
        /* Prepare the request */
        imp = d->opd_obd->u.cli.cl_import;
@@ -674,6 +731,17 @@ static struct ptlrpc_request *osp_sync_new_job(struct osp_device *d,
        if (CFS_FAIL_CHECK(OBD_FAIL_OSP_CHECK_ENOMEM))
                RETURN(ERR_PTR(-ENOMEM));
 
+       switch (op) {
+       case OST_DESTROY:
+               format = &RQF_OST_DESTROY;
+               break;
+       case OST_SETATTR:
+               format = &RQF_OST_SETATTR;
+               break;
+       default:
+               RETURN(ERR_PTR(-EOPNOTSUPP));
+       };
+
        req = ptlrpc_request_alloc(imp, format);
        if (req == NULL)
                RETURN(ERR_PTR(-ENOMEM));
@@ -681,7 +749,7 @@ static struct ptlrpc_request *osp_sync_new_job(struct osp_device *d,
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, op);
        if (rc) {
                ptlrpc_req_put(req);
-               return ERR_PTR(rc);
+               RETURN(ERR_PTR(rc));
        }
 
        req->rq_interpret_reply = osp_sync_interpret;
@@ -690,7 +758,7 @@ static struct ptlrpc_request *osp_sync_new_job(struct osp_device *d,
 
        ptlrpc_request_set_replen(req);
 
-       return req;
+       RETURN(req);
 }
 
 /**
@@ -731,7 +799,7 @@ static int osp_sync_new_setattr_job(struct osp_device *d,
                RETURN(1);
        }
 
-       req = osp_sync_new_job(d, OST_SETATTR, &RQF_OST_SETATTR);
+       req = osp_sync_new_job(d, OST_SETATTR);
        if (IS_ERR(req))
                RETURN(PTR_ERR(req));
 
@@ -785,7 +853,7 @@ static int osp_sync_new_unlink_job(struct osp_device *d,
        ENTRY;
        LASSERT(h->lrh_type == MDS_UNLINK_REC);
 
-       req = osp_sync_new_job(d, OST_DESTROY, &RQF_OST_DESTROY);
+       req = osp_sync_new_job(d, OST_DESTROY);
        if (IS_ERR(req))
                RETURN(PTR_ERR(req));
 
@@ -833,7 +901,7 @@ static int osp_sync_new_unlink64_job(struct osp_device *d,
 
        ENTRY;
        LASSERT(h->lrh_type == MDS_UNLINK64_REC);
-       req = osp_sync_new_job(d, OST_DESTROY, &RQF_OST_DESTROY);
+       req = osp_sync_new_job(d, OST_DESTROY);
        if (IS_ERR(req))
                RETURN(PTR_ERR(req));
 
@@ -850,6 +918,47 @@ static int osp_sync_new_unlink64_job(struct osp_device *d,
        RETURN(0);
 }
 
+static int osp_sync_new_err_job(struct osp_device *d,
+                               struct osp_job_args *ja)
+{
+       struct ptlrpc_request *req = NULL;
+       struct ost_body *body;
+       struct osp_job_req_args *jra;
+
+       ENTRY;
+
+       req = osp_sync_new_job(d, ja->ja_op);
+       if (IS_ERR(req))
+               RETURN(PTR_ERR(req));
+
+       body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
+       if (body == NULL) {
+               ptlrpc_req_put(req);
+               RETURN(-EFAULT);
+       }
+       memcpy(body, &ja->ja_body, sizeof(*body));
+
+       LASSERT(atomic_read(&d->opd_sync_rpcs_in_flight) <=
+               d->opd_sync_max_rpcs_in_flight);
+
+       jra = ptlrpc_req_async_args(jra, req);
+       jra->jra_magic = OSP_JOB_MAGIC;
+       memcpy(&jra->jra_lcookie, &ja->ja_lcookie, sizeof(jra->jra_lcookie));
+       INIT_LIST_HEAD(&jra->jra_committed_link);
+       spin_lock(&d->opd_sync_lock);
+       list_add_tail(&jra->jra_in_flight_link, &d->opd_sync_in_flight_list);
+       spin_unlock(&d->opd_sync_lock);
+
+       CDEBUG(D_HA, "%s repeating %d operation object "DOSTID"\n",
+              d->opd_obd->obd_name, ja->ja_op, POSTID(&ja->ja_body.oa.o_oi));
+
+       atomic_dec(&d->opd_sync_error_count);
+       OBD_FREE_PTR(ja);
+       ptlrpcd_add_req(req);
+
+       RETURN(0);
+
+}
 /**
  * Process llog records.
  *
@@ -902,7 +1011,6 @@ static void osp_sync_process_record(const struct lu_env *env,
 
                RETURN_EXIT;
        }
-
        /*
         * now we prepare and fill requests to OST, put them on the queue
         * and fire after next commit callback
@@ -1111,6 +1219,41 @@ static void osp_sync_process_committed(const struct lu_env *env,
        EXIT;
 }
 
+static bool osp_sync_process_error_list(const struct lu_env *env,
+                                       struct osp_device *d)
+{
+       struct osp_job_args *ja;
+       ktime_t now = ktime_get();
+
+       if (likely(list_empty(&d->opd_sync_error_list)))
+               return false;
+
+       spin_lock(&d->opd_sync_lock);
+       ja = list_first_entry(&d->opd_sync_error_list, typeof(*ja),
+                            ja_error_link);
+       if (ktime_before(ja->ja_time, now))
+               list_del_init(&ja->ja_error_link);
+       else
+               ja = NULL;
+       spin_unlock(&d->opd_sync_lock);
+
+       if (ja) {
+               /* same counters as osp_sync_process_record */
+               atomic_inc(&d->opd_sync_rpcs_in_flight);
+               atomic_inc(&d->opd_sync_rpcs_in_progress);
+               if (osp_sync_new_err_job(d, ja)) {
+                       atomic_dec(&d->opd_sync_rpcs_in_flight);
+                       atomic_dec(&d->opd_sync_rpcs_in_progress);
+                       /* insert a ja back to list in case of error */
+                       spin_lock(&d->opd_sync_lock);
+                       list_add(&ja->ja_error_link, &d->opd_sync_error_list);
+                       spin_unlock(&d->opd_sync_lock);
+               }
+               return true;
+       }
+       return false;
+}
+
 /**
  * The core of the syncing mechanism.
  *
@@ -1132,7 +1275,8 @@ static int osp_sync_process_queues(const struct lu_env *env,
                                   struct llog_rec_hdr *rec,
                                   void *data)
 {
-       struct osp_device       *d = data;
+       struct osp_device *d = data;
+       long timeout = 1;
 
        do {
                if (!d->opd_sync_task) {
@@ -1146,22 +1290,35 @@ static int osp_sync_process_queues(const struct lu_env *env,
                /* if we there are changes to be processed and we have
                 * resources for this ... do now */
                if (osp_sync_can_process_new(d, rec)) {
-                       if (llh == NULL) {
-                               /* ask llog for another record */
-                               return 0;
+                       if (osp_sync_process_error_list(env, d))
+                               continue;
+
+                       if (timeout) {
+                               if (llh == NULL) {
+                                       /* ask llog for another record */
+                                       return 0;
+                               }
+                               osp_sync_process_record(env, d, llh, rec);
+                               llh = NULL;
+                               rec = NULL;
                        }
-                       osp_sync_process_record(env, d, llh, rec);
-                       llh = NULL;
-                       rec = NULL;
                }
                if (CFS_FAIL_PRECHECK(OBD_FAIL_CATALOG_FULL_CHECK) &&
                            cfs_fail_val != 1)
                        msleep(1 * MSEC_PER_SEC);
 
-               wait_event_idle(d->opd_sync_waitq,
+               if (list_empty(&d->opd_sync_error_list)) {
+                       wait_event_idle(d->opd_sync_waitq,
                                !d->opd_sync_task ||
                                osp_sync_can_process_new(d, rec) ||
                                !list_empty(&d->opd_sync_committed_there));
+               } else {
+                       timeout = wait_event_idle_timeout(d->opd_sync_waitq,
+                               !d->opd_sync_task ||
+                               osp_sync_can_process_new(d, rec) ||
+                               !list_empty(&d->opd_sync_committed_there),
+                               cfs_time_seconds(1));
+               }
        } while (1);
 }
 
@@ -1193,14 +1350,15 @@ struct osp_sync_args {
  */
 static int osp_sync_thread(void *_args)
 {
-       struct osp_sync_args    *args = _args;
-       struct osp_device       *d = args->osa_dev;
-       struct llog_ctxt        *ctxt;
-       struct obd_device       *obd = d->opd_obd;
-       struct llog_handle      *llh;
-       struct lu_env           *env = &args->osa_env;
-       int                      rc, count;
-       bool                     wrapped;
+       struct osp_sync_args *args = _args;
+       struct osp_device *d = args->osa_dev;
+       struct llog_ctxt *ctxt;
+       struct obd_device *obd = d->opd_obd;
+       struct llog_handle *llh;
+       struct lu_env *env = &args->osa_env;
+       struct osp_job_args *ja, *tmp;
+       int rc, count;
+       bool wrapped;
 
        ENTRY;
 
@@ -1306,6 +1464,13 @@ wait:
        rc = llog_cleanup(env, ctxt);
        if (rc)
                CERROR("can't cleanup llog: %d\n", rc);
+       list_for_each_entry_safe(ja, tmp, &d->opd_sync_error_list,
+                                ja_error_link) {
+               CDEBUG(D_HA, "%s: failed to sync object "DOSTID"\n",
+                      d->opd_obd->obd_name, POSTID(&ja->ja_body.oa.o_oi));
+               list_del(&ja->ja_error_link);
+               OBD_FREE_PTR(ja);
+       }
 out:
        LASSERTF(atomic_read(&d->opd_sync_rpcs_in_progress) == 0,
                 "%s: %d %d %sempty\n", d->opd_obd->obd_name,
@@ -1508,6 +1673,8 @@ int osp_sync_init(const struct lu_env *env, struct osp_device *d)
        init_waitqueue_head(&d->opd_sync_barrier_waitq);
        INIT_LIST_HEAD(&d->opd_sync_in_flight_list);
        INIT_LIST_HEAD(&d->opd_sync_committed_there);
+       INIT_LIST_HEAD(&d->opd_sync_error_list);
+       atomic_set(&d->opd_sync_error_count, 0);
 
        if (d->opd_storage->dd_rdonly)
                RETURN(0);
index dd971ca..8881494 100755 (executable)
@@ -3685,6 +3685,46 @@ test_158a() {
 }
 run_test 158a "connect without access right"
 
+test_160() {
+       (( $MDS1_VERSION >= $(version_code 2.15.62) )) ||
+               skip "Need MDS version at least 2.15.62 for group lock destroy fix"
+
+       local threads=10
+       local size=$((1024*100))
+       local timeout=10
+       local pids
+
+       test_mkdir -i 0 -c 1 $DIR/$tdir
+       $LFS setstripe -c 1 -i 0 $DIR/$tdir
+
+       for ((i = 1; i <= threads; i++)); do
+               local file=$DIR/$tdir/file_$i
+               #open/group lock/write/unlink/pause 20s/group unlock/close
+               $MULTIOP $file OG1234w10240u_20g1234c &
+               pids[$i]=$!
+       done
+       sleep 2
+
+       #evict client from MDS, MDS starts to unlink files/destroy objects
+       mds_evict_client
+       client_reconnect
+
+       local step=3
+       for ((i = 1; i <= $((timeout / step + 1)); i++)); do
+               do_facet mds1 $LCTL get_param osp.$FSNAME-OST0000-osc-MDT0000.destroys_in_flight
+               sleep $step
+       done
+       local rc=$(do_facet mds1 $LCTL get_param -n osp.$FSNAME-OST0000-osc-MDT0000.destroys_in_flight)
+       do_facet mds1 $LCTL get_param osp.$FSNAME-OST0000-osc-MDT0000.error_list
+       echo inflight $rc
+       for ((i = 1; i <= threads; i++)); do
+               wait ${pids[$i]}
+       done
+
+       (( $rc <= 2 )) || error "destroying OST objects are blocked $rc"
+}
+run_test 160 "MDT destroys are blocked by grouplocks"
+
 complete_test $SECONDS
 check_and_cleanup_lustre
 exit_status