* Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
* Use is subject to license terms.
*
- * Copyright (c) 2012, 2014, Intel Corporation.
+ * Copyright (c) 2012, 2016, Intel Corporation.
*/
/*
* This file is part of Lustre, http://www.lustre.org/
/* schedule next update */
d->opd_statfs_fresh_till = cfs_time_shift(d->opd_statfs_maxage);
- cfs_timer_arm(&d->opd_statfs_timer, d->opd_statfs_fresh_till);
+ mod_timer(&d->opd_statfs_timer, d->opd_statfs_fresh_till);
d->opd_statfs_update_in_progress = 0;
CDEBUG(D_CACHE, "updated statfs %p\n", d);
/*
* no updates till reply
*/
- cfs_timer_disarm(&d->opd_statfs_timer);
+ del_timer(&d->opd_statfs_timer);
d->opd_statfs_fresh_till = cfs_time_shift(obd_timeout * 1000);
d->opd_statfs_update_in_progress = 1;
* is replied
*/
d->opd_statfs_fresh_till = cfs_time_shift(-1);
- cfs_timer_disarm(&d->opd_statfs_timer);
+ del_timer(&d->opd_statfs_timer);
wake_up(&d->opd_pre_waitq);
}
}
RETURN(rc);
}
+ LASSERT(d->opd_pre->osp_pre_delorphan_sent != 0);
spin_lock(&d->opd_pre_lock);
if (d->opd_pre_create_count > d->opd_pre_max_create_count / 2)
d->opd_pre_create_count = d->opd_pre_max_create_count / 2;
int update_status = 0;
int rc;
int diff;
+ struct lu_fid fid;
ENTRY;
/*
- * wait for local recovery to finish, so we can cleanup orphans
- * orphans are all objects since "last used" (assigned), but
- * there might be objects reserved and in some cases they won't
- * be used. we can't cleanup them till we're sure they won't be
- * used. also can't we allow new reservations because they may
- * end up getting orphans being cleaned up below. so we block
- * new reservations and wait till all reserved objects either
- * user or released.
+ * Wait for local recovery to finish so that we can clean up orphans.
+ * Orphans are all objects after the "last used" (assigned) one.
+ * Treat reserved objects as created; otherwise a single blocked
+ * thread holding a reservation can block recovery and leave us in
+ * a livelock. See LU-8367 for the details. In some cases this
+ * can result in gaps (i.e. leaked objects), but we have LFSCK for that.
+ *
+ * Do not allow new reservations, because they may end up being
+ * given orphans that are cleaned up below. So new reservations are blocked.
*/
spin_lock(&d->opd_pre_lock);
d->opd_pre_recovering = 1;
* catch all osp_precreate_reserve() calls who find
* "!opd_pre_recovering".
*/
- l_wait_event(d->opd_pre_waitq,
- (!d->opd_pre_reserved && d->opd_recovery_completed) ||
+ l_wait_event(d->opd_pre_waitq, d->opd_recovery_completed ||
!osp_precreate_running(d) || d->opd_got_disconnected,
&lwi);
if (!osp_precreate_running(d) || d->opd_got_disconnected)
GOTO(out, rc = -EAGAIN);
- CDEBUG(D_HA, "%s: going to cleanup orphans since "DFID"\n",
- d->opd_obd->obd_name, PFID(&d->opd_last_used_fid));
-
*last_fid = d->opd_last_used_fid;
/* The OSP should already get the valid seq now */
LASSERT(!fid_is_zero(last_fid));
if (body == NULL)
GOTO(out, rc = -EPROTO);
- body->oa.o_flags = OBD_FL_DELORPHAN;
+ body->oa.o_flags = 0;
body->oa.o_valid = OBD_MD_FLFLAGS | OBD_MD_FLGROUP;
- fid_to_ostid(&d->opd_last_used_fid, &body->oa.o_oi);
+ /* Only the very first DELORPHAN may really destroy orphans;
+ * on every later pass just tell the OST to recreate the
+ * objects missing from our precreate pool. */
+ spin_lock(&d->opd_pre_lock);
+ if (d->opd_pre->osp_pre_delorphan_sent) {
+ fid = d->opd_pre_last_created_fid;
+ } else {
+ fid = d->opd_last_used_fid;
+ body->oa.o_flags = OBD_FL_DELORPHAN;
+ }
+ spin_unlock(&d->opd_pre_lock);
+ fid_to_ostid(&fid, &body->oa.o_oi);
+
+ CDEBUG(D_HA, "%s: going to cleanup orphans since "DFID"\n",
+ d->opd_obd->obd_name, PFID(&fid));
ptlrpc_request_set_replen(req);
ostid_to_fid(last_fid, &body->oa.o_oi, d->opd_index);
spin_lock(&d->opd_pre_lock);
- diff = osp_fid_diff(&d->opd_last_used_fid, last_fid);
+ diff = osp_fid_diff(&fid, last_fid);
if (diff > 0) {
d->opd_pre_create_count = OST_MIN_PRECREATE + diff;
- d->opd_pre_last_created_fid = d->opd_last_used_fid;
+ d->opd_pre_last_created_fid = *last_fid;
} else {
d->opd_pre_create_count = OST_MIN_PRECREATE;
d->opd_pre_last_created_fid = *last_fid;
*/
LASSERT(fid_oid(&d->opd_pre_last_created_fid) <=
LUSTRE_DATA_SEQ_MAX_WIDTH);
- d->opd_pre_used_fid = d->opd_pre_last_created_fid;
+ if (d->opd_pre->osp_pre_delorphan_sent == 0)
+ d->opd_pre_used_fid = d->opd_pre_last_created_fid;
d->opd_pre_create_slow = 0;
spin_unlock(&d->opd_pre_lock);
+ d->opd_pre->osp_pre_delorphan_sent = 1;
CDEBUG(D_HA, "%s: Got last_id "DFID" from OST, last_created "DFID
"last_used is "DFID"\n", d->opd_obd->obd_name, PFID(last_fid),
if (req)
ptlrpc_req_finished(req);
- spin_lock(&d->opd_pre_lock);
- d->opd_pre_recovering = 0;
- spin_unlock(&d->opd_pre_lock);
-
/*
* If rc is zero, the pre-creation window should have been emptied.
* Since waking up the herd would be useless without pre-created
} else {
wake_up(&d->opd_pre_user_waitq);
}
+ } else {
+ spin_lock(&d->opd_pre_lock);
+ d->opd_pre_recovering = 0;
+ spin_unlock(&d->opd_pre_lock);
}
RETURN(rc);
* need to be connected to OST
*/
while (osp_precreate_running(d)) {
+ if (d->opd_pre_recovering &&
+ d->opd_imp_connected &&
+ !d->opd_got_disconnected)
+ break;
l_wait_event(d->opd_pre_waitq,
!osp_precreate_running(d) ||
d->opd_new_connection,
* Clean up orphans or recreate missing objects.
*/
rc = osp_precreate_cleanup_orphans(&env, d);
- if (rc != 0)
+ if (rc != 0) {
+ schedule_timeout_interruptible(cfs_time_seconds(1));
continue;
+ }
/*
* connected, can handle precreates now
*/
if (d->opd_pre_max_create_count == 0)
RETURN(-ENOBUFS);
+ if (OBD_FAIL_PRECHECK(OBD_FAIL_MDS_OSP_PRECREATE_WAIT)) {
+ if (d->opd_index == cfs_fail_val)
+ OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_OSP_PRECREATE_WAIT,
+ obd_timeout);
+ }
+
/*
* wait till:
* - preallocation is done
CDEBUG(D_OTHER, "current %llu, fresh till %llu\n",
(unsigned long long)cfs_time_current(),
(unsigned long long)d->opd_statfs_fresh_till);
- cfs_timer_init(&d->opd_statfs_timer, osp_statfs_timer_cb, d);
+ setup_timer(&d->opd_statfs_timer, osp_statfs_timer_cb,
+ (unsigned long)d);
/*
* start thread handling precreation and statfs updates
ENTRY;
- cfs_timer_disarm(&d->opd_statfs_timer);
+ del_timer(&d->opd_statfs_timer);
if (d->opd_pre == NULL)
RETURN_EXIT;