*
* You should have received a copy of the GNU General Public License
* version 2 along with this program; If not, see
- * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
- *
- * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
- * CA 95054 USA or visit www.sun.com if you need additional information or
- * have any questions.
+ * http://www.gnu.org/licenses/gpl-2.0.html
*
* GPL HEADER END
*/
* Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
* Use is subject to license terms.
*
- * Copyright (c) 2012, 2014, Intel Corporation.
+ * Copyright (c) 2012, 2016, Intel Corporation.
*/
/*
* This file is part of Lustre, http://www.lustre.org/
* = import is disconnected =
*
* = import is inactive =
- * in this case osp_declare_object_create() returns an error
+ * in this case osp_declare_create() returns an error
*
*/
/* schedule next update */
d->opd_statfs_fresh_till = cfs_time_shift(d->opd_statfs_maxage);
- cfs_timer_arm(&d->opd_statfs_timer, d->opd_statfs_fresh_till);
+ mod_timer(&d->opd_statfs_timer, d->opd_statfs_fresh_till);
d->opd_statfs_update_in_progress = 0;
CDEBUG(D_CACHE, "updated statfs %p\n", d);
/*
* no updates till reply
*/
- cfs_timer_disarm(&d->opd_statfs_timer);
+ del_timer(&d->opd_statfs_timer);
d->opd_statfs_fresh_till = cfs_time_shift(obd_timeout * 1000);
d->opd_statfs_update_in_progress = 1;
* is replied
*/
d->opd_statfs_fresh_till = cfs_time_shift(-1);
- cfs_timer_disarm(&d->opd_statfs_timer);
+ del_timer(&d->opd_statfs_timer);
wake_up(&d->opd_pre_waitq);
}
}
/**
- * Return number of precreated objects
- *
- * A simple helper to calculate the number of precreated objects on the device.
- *
- * \param[in] env LU environment provided by the caller
- * \param[in] osp OSP device
- *
- * \retval the number of the precreated objects
- */
-static inline int osp_objs_precreated(const struct lu_env *env,
- struct osp_device *osp)
-{
- return osp_fid_diff(&osp->opd_pre_last_created_fid,
- &osp->opd_pre_used_fid);
-}
-
-/**
* Check pool of precreated objects is nearly empty
*
* We should not wait till the pool of the precreated objects is exhausted,
int rc;
ENTRY;
+ if (osp->opd_storage->dd_rdonly)
+ RETURN(0);
+
/* Note: through f_oid is only 32 bits, it will also write 64 bits
* for oid to keep compatibility with the previous version. */
lb_oid->lb_buf = &fid->f_oid;
RETURN(rc);
}
- LCONSOLE_INFO("%s: update sequence from "LPX64" to "LPX64"\n",
+ LCONSOLE_INFO("%s: update sequence from %#llx to %#llx\n",
osp->opd_obd->obd_name, fid_seq(last_fid),
fid_seq(fid));
/* Update last_xxx to the new seq */
if (fid_is_idif(fid)) {
struct lu_fid *last_fid;
struct ost_id *oi = &osi->osi_oi;
+ int rc;
spin_lock(&osp->opd_pre_lock);
last_fid = &osp->opd_pre_last_created_fid;
fid_to_ostid(last_fid, oi);
end = min(ostid_id(oi) + *grow, IDIF_MAX_OID);
*grow = end - ostid_id(oi);
- ostid_set_id(oi, ostid_id(oi) + *grow);
+ rc = ostid_set_id(oi, ostid_id(oi) + *grow);
spin_unlock(&osp->opd_pre_lock);
- if (*grow == 0)
+ if (*grow == 0 || rc)
return 1;
ostid_to_fid(fid, oi, osp->opd_index);
ptlrpc_request_set_replen(req);
+ if (OBD_FAIL_CHECK(OBD_FAIL_OSP_FAKE_PRECREATE))
+ GOTO(ready, rc = 0);
+
rc = ptlrpc_queue_wait(req);
if (rc) {
CERROR("%s: can't precreate: rc = %d\n", d->opd_obd->obd_name,
GOTO(out_req, rc = -EPROTO);
ostid_to_fid(fid, &body->oa.o_oi, d->opd_index);
+
+ready:
if (osp_fid_diff(fid, &d->opd_pre_used_fid) <= 0) {
CERROR("%s: precreate fid "DFID" < local used fid "DFID
": rc = %d\n", d->opd_obd->obd_name,
{
struct osp_thread_info *osi = osp_env_info(env);
struct lu_fid *last_fid = &osi->osi_fid;
+ struct lu_fid tmp;
struct ptlrpc_request *req = NULL;
struct obd_import *imp;
struct ost_body *body;
/*
* wait for local recovery to finish, so we can cleanup orphans
- * orphans are all objects since "last used" (assigned), but
- * there might be objects reserved and in some cases they won't
- * be used. we can't cleanup them till we're sure they won't be
- * used. also can't we allow new reservations because they may
- * end up getting orphans being cleaned up below. so we block
- * new reservations and wait till all reserved objects either
- * user or released.
+ * orphans are all objects since "last used" (assigned). we do not
+ * block waiting for all reservations as this can lead to a deadlock
+ * see LU-8972 for the details.
*/
spin_lock(&d->opd_pre_lock);
d->opd_pre_recovering = 1;
spin_unlock(&d->opd_pre_lock);
- /*
- * The locking above makes sure the opd_pre_reserved check below will
- * catch all osp_precreate_reserve() calls who find
- * "!opd_pre_recovering".
- */
- l_wait_event(d->opd_pre_waitq,
- (!d->opd_pre_reserved && d->opd_recovery_completed) ||
+
+ l_wait_event(d->opd_pre_waitq, d->opd_recovery_completed ||
!osp_precreate_running(d) || d->opd_got_disconnected,
&lwi);
if (!osp_precreate_running(d) || d->opd_got_disconnected)
LASSERT(!fid_is_zero(last_fid));
if (fid_oid(&d->opd_last_used_fid) < 2) {
/* lastfid looks strange... ask OST */
+ LCONSOLE_WARN("%s: refresh last id\n", d->opd_obd->obd_name);
rc = osp_get_lastfid_from_ost(env, d);
if (rc)
GOTO(out, rc);
body->oa.o_flags = OBD_FL_DELORPHAN;
body->oa.o_valid = OBD_MD_FLFLAGS | OBD_MD_FLGROUP;
- fid_to_ostid(&d->opd_last_used_fid, &body->oa.o_oi);
+	 * cleanup objects up to used+reserved as we do not
+ * want to block the orphan cleanup procedure */
+ spin_lock(&d->opd_pre_lock);
+ if (fid_seq(&d->opd_pre_used_fid) != 0) {
+ tmp = d->opd_pre_used_fid;
+ tmp.f_oid += d->opd_pre_reserved;
+		 * shrink current precreate window to let already
+		 * reserved objects be created and block new
+		 * precreations */
+ d->opd_pre_last_created_fid = tmp;
+ } else {
+ tmp = d->opd_last_used_fid;
+ }
+ fid_to_ostid(&tmp, &body->oa.o_oi);
+ spin_unlock(&d->opd_pre_lock);
ptlrpc_request_set_replen(req);
if (body == NULL)
GOTO(out, rc = -EPROTO);
- /*
- * OST provides us with id new pool starts from in body->oa.o_id
- */
+ /* OST provides us with id new pool starts from in body->oa.o_id */
ostid_to_fid(last_fid, &body->oa.o_oi, d->opd_index);
- spin_lock(&d->opd_pre_lock);
- diff = osp_fid_diff(&d->opd_last_used_fid, last_fid);
- if (diff > 0) {
- d->opd_pre_create_count = OST_MIN_PRECREATE + diff;
- d->opd_pre_last_created_fid = d->opd_last_used_fid;
- } else {
- d->opd_pre_create_count = OST_MIN_PRECREATE;
- d->opd_pre_last_created_fid = *last_fid;
- }
- /*
- * This empties the pre-creation pool and effectively blocks any new
- * reservations.
- */
- LASSERT(fid_oid(&d->opd_pre_last_created_fid) <=
- LUSTRE_DATA_SEQ_MAX_WIDTH);
- d->opd_pre_used_fid = d->opd_pre_last_created_fid;
- d->opd_pre_create_slow = 0;
- spin_unlock(&d->opd_pre_lock);
-
CDEBUG(D_HA, "%s: Got last_id "DFID" from OST, last_created "DFID
"last_used is "DFID"\n", d->opd_obd->obd_name, PFID(last_fid),
PFID(&d->opd_pre_last_created_fid), PFID(&d->opd_last_used_fid));
if (req)
ptlrpc_req_finished(req);
- spin_lock(&d->opd_pre_lock);
- d->opd_pre_recovering = 0;
- spin_unlock(&d->opd_pre_lock);
-
/*
* If rc is zero, the pre-creation window should have been emptied.
* Since waking up the herd would be useless without pre-created
} else {
wake_up(&d->opd_pre_user_waitq);
}
+ GOTO(ret, rc);
}
+ spin_lock(&d->opd_pre_lock);
+ d->opd_pre_recovering = 0;
+ spin_unlock(&d->opd_pre_lock);
+
+ /* now we wait until all reserved objects are consumed or released,
+ * so that the window doesn't change. otherwise we can get objects
+ * with wrong FIDs */
+ l_wait_event(d->opd_pre_waitq, d->opd_pre_reserved == 0 ||
+ !osp_precreate_running(d) || d->opd_got_disconnected, &lwi);
+ if (!osp_precreate_running(d))
+ GOTO(ret, rc = 0);
+
+ spin_lock(&d->opd_pre_lock);
+ diff = osp_fid_diff(&d->opd_last_used_fid, last_fid);
+ if (diff > 0) {
+ d->opd_pre_create_count = OST_MIN_PRECREATE + diff;
+ d->opd_pre_last_created_fid = d->opd_last_used_fid;
+ } else {
+ d->opd_pre_create_count = OST_MIN_PRECREATE;
+ d->opd_pre_last_created_fid = *last_fid;
+ }
+ /*
+ * This empties the pre-creation pool and effectively blocks any new
+ * reservations.
+ */
+ LASSERT(fid_oid(&d->opd_pre_last_created_fid) <=
+ LUSTRE_DATA_SEQ_MAX_WIDTH);
+ d->opd_pre_used_fid = d->opd_pre_last_created_fid;
+ d->opd_pre_create_slow = 0;
+ spin_unlock(&d->opd_pre_lock);
+
+ret:
RETURN(rc);
}
* Add a bit of hysteresis so this flag isn't continually flapping,
* and ensure that new files don't get extremely fragmented due to
* only a small amount of available space in the filesystem.
- * We want to set the NOSPC flag when there is less than ~0.1% free
- * and clear it when there is at least ~0.2% free space, so:
- * avail < ~0.1% max max = avail + used
- * 1025 * avail < avail + used used = blocks - free
- * 1024 * avail < used
- * 1024 * avail < blocks - free
- * avail < ((blocks - free) >> 10)
- *
- * On very large disk, say 16TB 0.1% will be 16 GB. We don't want to
- * lose that amount of space so in those cases we report no space left
- * if their is less than 1 GB left.
+ * We want to set the ENOSPC when there is less than reserved size
+ * free and clear it when there is at least 2*reserved size free space.
* the function updates current precreation status used: functional or not
*
* \param[in] d OSP device
{
struct obd_statfs *msfs = &d->opd_statfs;
int old = d->opd_pre_status;
- __u64 used;
+ __u64 available;
d->opd_pre_status = rc;
if (rc)
goto out;
if (likely(msfs->os_type)) {
- used = min_t(__u64, (msfs->os_blocks - msfs->os_bfree) >> 10,
- 1 << 30);
- if ((msfs->os_ffree < 32) || (msfs->os_bavail < used)) {
+ if (unlikely(d->opd_reserved_mb_high == 0 &&
+ d->opd_reserved_mb_low == 0)) {
+ /* Use ~0.1% by default to disable object allocation,
			 * and ~0.2% to enable, size in MB, set both watermarks
+ */
+ spin_lock(&d->opd_pre_lock);
+ if (d->opd_reserved_mb_high == 0 &&
+ d->opd_reserved_mb_low == 0) {
+ d->opd_reserved_mb_low =
+ ((msfs->os_bsize >> 10) *
+ msfs->os_blocks) >> 20;
+ if (d->opd_reserved_mb_low == 0)
+ d->opd_reserved_mb_low = 1;
+ d->opd_reserved_mb_high =
+ (d->opd_reserved_mb_low << 1) + 1;
+ }
+ spin_unlock(&d->opd_pre_lock);
+ }
+ /* in MB */
+ available = (msfs->os_bavail * (msfs->os_bsize >> 10)) >> 10;
+ if (msfs->os_ffree < 32)
+ msfs->os_state |= OS_STATE_ENOINO;
+ else if (msfs->os_ffree > 64)
+ msfs->os_state &= ~OS_STATE_ENOINO;
+
+ if (available < d->opd_reserved_mb_low)
+ msfs->os_state |= OS_STATE_ENOSPC;
+ else if (available > d->opd_reserved_mb_high)
+ msfs->os_state &= ~OS_STATE_ENOSPC;
+ if (msfs->os_state & (OS_STATE_ENOINO | OS_STATE_ENOSPC)) {
d->opd_pre_status = -ENOSPC;
if (old != -ENOSPC)
- CDEBUG(D_INFO, "%s: status: "LPU64" blocks, "
- LPU64" free, "LPU64" used, "LPU64" "
- "avail -> %d: rc = %d\n",
+ CDEBUG(D_INFO, "%s: status: %llu blocks, %llu "
+ "free, %llu avail, %llu MB avail, %u "
+ "hwm -> %d: rc = %d\n",
d->opd_obd->obd_name, msfs->os_blocks,
- msfs->os_bfree, used, msfs->os_bavail,
+ msfs->os_bfree, msfs->os_bavail,
+ available, d->opd_reserved_mb_high,
d->opd_pre_status, rc);
CDEBUG(D_INFO,
- "non-committed changes: %lu, in progress: %u\n",
- d->opd_syn_changes, d->opd_syn_rpc_in_progress);
- } else if (old == -ENOSPC) {
+ "non-committed changes: %u, in progress: %u\n",
+ atomic_read(&d->opd_sync_changes),
+ atomic_read(&d->opd_sync_rpcs_in_progress));
+ } else if (unlikely(old == -ENOSPC)) {
d->opd_pre_status = 0;
spin_lock(&d->opd_pre_lock);
d->opd_pre_create_slow = 0;
d->opd_pre_create_count = OST_MIN_PRECREATE;
spin_unlock(&d->opd_pre_lock);
wake_up(&d->opd_pre_waitq);
- CDEBUG(D_INFO, "%s: no space: "LPU64" blocks, "LPU64
- " free, "LPU64" used, "LPU64" avail -> %d: "
- "rc = %d\n", d->opd_obd->obd_name,
- msfs->os_blocks, msfs->os_bfree, used,
- msfs->os_bavail, d->opd_pre_status, rc);
+
+ CDEBUG(D_INFO, "%s: space available: %llu blocks, %llu"
+ " free, %llu avail, %lluMB avail, %u lwm"
+ " -> %d: rc = %d\n", d->opd_obd->obd_name,
+ msfs->os_blocks, msfs->os_bfree, msfs->os_bavail,
+ available, d->opd_reserved_mb_low,
+ d->opd_pre_status, rc);
}
}
-
out:
wake_up(&d->opd_pre_user_waitq);
}
LASSERT(osp->opd_pre != NULL);
- /* Return if last_used fid has been initialized */
+ /* Let's check if the current last_seq/fid is valid,
+ * otherwise request new sequence from the controller */
+ if (osp_is_fid_client(osp) && osp->opd_group != 0) {
+ /* Non-MDT0 can only use normal sequence for
+ * OST objects */
+ if (fid_is_norm(&osp->opd_last_used_fid))
+ RETURN(0);
+ } else {
+ /* Initially MDT0 will start with IDIF, after
+ * that it will request new sequence from the
+ * controller */
+ if (fid_is_idif(&osp->opd_last_used_fid) ||
+ fid_is_norm(&osp->opd_last_used_fid))
+ RETURN(0);
+ }
+
if (!fid_is_zero(&osp->opd_last_used_fid))
- RETURN(0);
+ CWARN("%s: invalid last used fid "DFID
+ ", try to get new sequence.\n",
+ osp->opd_obd->obd_name,
+ PFID(&osp->opd_last_used_fid));
rc = lu_env_init(&env, osp->opd_dt_dev.dd_lu_dev.ld_type->ldt_ctx_tags);
if (rc) {
if (rc) {
CERROR("%s: init env error: rc = %d\n", d->opd_obd->obd_name,
rc);
+
+ spin_lock(&d->opd_pre_lock);
+ thread->t_flags = SVC_STOPPED;
+ spin_unlock(&d->opd_pre_lock);
+ wake_up(&thread->t_ctl_waitq);
+
RETURN(rc);
}
* need to be connected to OST
*/
while (osp_precreate_running(d)) {
+ if (d->opd_pre_recovering &&
+ d->opd_imp_connected &&
+ !d->opd_got_disconnected)
+ break;
l_wait_event(d->opd_pre_waitq,
!osp_precreate_running(d) ||
d->opd_new_connection,
* Clean up orphans or recreate missing objects.
*/
rc = osp_precreate_cleanup_orphans(&env, d);
- if (rc != 0)
+ if (rc != 0) {
+ schedule_timeout_interruptible(cfs_time_seconds(1));
continue;
+ }
/*
* connected, can handle precreates now
*/
if (unlikely(osp_precreate_end_seq(&env, d) &&
osp_create_end_seq(&env, d))) {
- LCONSOLE_INFO("%s:"LPX64" is used up."
+ LCONSOLE_INFO("%s:%#llx is used up."
" Update to new seq\n",
d->opd_obd->obd_name,
fid_seq(&d->opd_pre_last_created_fid));
return 1;
/* ready if OST reported no space and no destroys in progress */
- if (d->opd_syn_changes + d->opd_syn_rpc_in_progress == 0 &&
+ if (atomic_read(&d->opd_sync_changes) +
+ atomic_read(&d->opd_sync_rpcs_in_progress) == 0 &&
d->opd_pre_status == -ENOSPC)
return 1;
struct osp_device *d = data;
CDEBUG(D_HA, "%s: slow creates, last="DFID", next="DFID", "
- "reserved="LPU64", syn_changes=%lu, "
- "syn_rpc_in_progress=%d, status=%d\n",
+ "reserved=%llu, sync_changes=%u, "
+ "sync_rpcs_in_progress=%d, status=%d\n",
d->opd_obd->obd_name, PFID(&d->opd_pre_last_created_fid),
PFID(&d->opd_pre_used_fid), d->opd_pre_reserved,
- d->opd_syn_changes, d->opd_syn_rpc_in_progress,
+ atomic_read(&d->opd_sync_changes),
+ atomic_read(&d->opd_sync_rpcs_in_progress),
d->opd_pre_status);
return 1;
"Next FID "DFID"\n", PFID(&d->opd_pre_last_created_fid),
PFID(&d->opd_pre_used_fid));
+ /* opd_pre_max_create_count 0 to not use specified OST. */
+ if (d->opd_pre_max_create_count == 0)
+ RETURN(-ENOBUFS);
+
+ if (OBD_FAIL_PRECHECK(OBD_FAIL_MDS_OSP_PRECREATE_WAIT)) {
+ if (d->opd_index == cfs_fail_val)
+ OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_OSP_PRECREATE_WAIT,
+ obd_timeout);
+ }
+
/*
* wait till:
* - preallocation is done
* wait till that is done - some space might be released
*/
if (unlikely(rc == -ENOSPC)) {
- if (d->opd_syn_changes) {
+ if (atomic_read(&d->opd_sync_changes)) {
/* force local commit to release space */
dt_commit_async(env, d->opd_storage);
}
- if (d->opd_syn_rpc_in_progress) {
+ if (atomic_read(&d->opd_sync_rpcs_in_progress)) {
/* just wait till destroys are done */
/* see l_wait_even() few lines below */
}
- if (d->opd_syn_changes +
- d->opd_syn_rpc_in_progress == 0) {
+ if (atomic_read(&d->opd_sync_changes) +
+ atomic_read(&d->opd_sync_rpcs_in_progress) == 0) {
/* no hope for free space */
break;
}
d->opd_pre_create_count = OST_MIN_PRECREATE;
d->opd_pre_min_create_count = OST_MIN_PRECREATE;
d->opd_pre_max_create_count = OST_MAX_PRECREATE;
+ d->opd_reserved_mb_high = 0;
+ d->opd_reserved_mb_low = 0;
spin_lock_init(&d->opd_pre_lock);
init_waitqueue_head(&d->opd_pre_waitq);
init_waitqueue_head(&d->opd_pre_user_waitq);
+ thread_set_flags(&d->opd_pre_thread, SVC_INIT);
init_waitqueue_head(&d->opd_pre_thread.t_ctl_waitq);
/*
CDEBUG(D_OTHER, "current %llu, fresh till %llu\n",
(unsigned long long)cfs_time_current(),
(unsigned long long)d->opd_statfs_fresh_till);
- cfs_timer_init(&d->opd_statfs_timer, osp_statfs_timer_cb, d);
+ setup_timer(&d->opd_statfs_timer, osp_statfs_timer_cb,
+ (unsigned long)d);
+
+ if (d->opd_storage->dd_rdonly)
+ RETURN(0);
/*
* start thread handling precreation and statfs updates
*/
void osp_precreate_fini(struct osp_device *d)
{
- struct ptlrpc_thread *thread;
-
+ struct ptlrpc_thread *thread = &d->opd_pre_thread;
ENTRY;
- cfs_timer_disarm(&d->opd_statfs_timer);
+ del_timer(&d->opd_statfs_timer);
if (d->opd_pre == NULL)
RETURN_EXIT;
- thread = &d->opd_pre_thread;
-
- thread->t_flags = SVC_STOPPING;
- wake_up(&d->opd_pre_waitq);
-
- wait_event(thread->t_ctl_waitq, thread->t_flags & SVC_STOPPED);
+ if (!thread_is_init(thread) && !thread_is_stopped(thread)) {
+ thread->t_flags = SVC_STOPPING;
+ wake_up(&d->opd_pre_waitq);
+ wait_event(thread->t_ctl_waitq, thread_is_stopped(thread));
+ }
OBD_FREE_PTR(d->opd_pre);
d->opd_pre = NULL;