#include <dt_object.h>
#include <lustre_export.h>
+bool barrier_entry(struct dt_device *key);
+void barrier_exit(struct dt_device *key);
int barrier_handler(struct dt_device *key, struct ptlrpc_request *req);
int barrier_register(struct dt_device *key, struct dt_device *next);
void barrier_deregister(struct dt_device *key);
static int mdd_iocontrol(const struct lu_env *env, struct md_device *m,
unsigned int cmd, int len, void *karg)
{
- struct mdd_device *mdd;
+ struct mdd_device *mdd = lu2mdd_dev(&m->md_lu_dev);
struct obd_ioctl_data *data = karg;
int rc;
-
ENTRY;
- mdd = lu2mdd_dev(&m->md_lu_dev);
-
/* Doesn't use obd_ioctl_data */
switch (cmd) {
case OBD_IOC_CHANGELOG_CLEAR: {
struct changelog_setinfo *cs = karg;
+
+ if (unlikely(!barrier_entry(mdd->mdd_bottom)))
+ RETURN(-EINPROGRESS);
+
rc = mdd_changelog_clear(env, mdd, cs->cs_id,
cs->cs_recno);
+ barrier_exit(mdd->mdd_bottom);
RETURN(rc);
}
case OBD_IOC_GET_MNTOPT: {
switch (cmd) {
case OBD_IOC_CHANGELOG_REG:
+ if (unlikely(!barrier_entry(mdd->mdd_bottom)))
+ RETURN(-EINPROGRESS);
+
rc = mdd_changelog_user_register(env, mdd, &data->ioc_u32_1);
+ barrier_exit(mdd->mdd_bottom);
break;
case OBD_IOC_CHANGELOG_DEREG:
+ if (unlikely(!barrier_entry(mdd->mdd_bottom)))
+ RETURN(-EINPROGRESS);
+
rc = mdd_changelog_user_purge(env, mdd, data->ioc_u32_1);
+ barrier_exit(mdd->mdd_bottom);
break;
default:
rc = -ENOTTY;
struct thandle *handle = NULL;
int is_orphan = 0;
int rc;
+ bool blocked = false;
ENTRY;
if (ma->ma_valid & MA_FLAGS && ma->ma_attr_flags & MDS_KEEP_ORPHAN) {
again:
if (is_orphan) {
- handle = mdd_trans_create(env, mdo2mdd(obj));
- if (IS_ERR(handle))
- RETURN(PTR_ERR(handle));
+ /* mdd_trans_create() may fail because of barrier_entry(). In that
+ * case the orphan MDT-object is left in the orphan list, and the
+ * unused orphans will be destroyed automatically the next time the
+ * MDT is remounted.
+ *
+ * One exception: if the earlier mdd_finish_unlink() failed to add
+ * the orphan MDT-object to the orphan list and mdd_trans_create()
+ * then fails because of barrier_entry(), the MDT-object becomes a
+ * real orphan that is neither in the namespace nor in the orphan
+ * list. Such a case should be very rare and will be handled by
+ * e2fsck/lfsck. */
+ handle = mdd_trans_create(env, mdo2mdd(obj));
+ if (IS_ERR(handle)) {
+ rc = PTR_ERR(handle);
+ if (rc != -EINPROGRESS)
+ GOTO(stop, rc);
- rc = mdd_declare_close(env, mdd_obj, ma, handle);
- if (rc)
- GOTO(stop, rc);
+ handle = NULL;
+ blocked = true;
+ goto cont;
+ }
+
+ rc = mdd_declare_close(env, mdd_obj, ma, handle);
+ if (rc)
+ GOTO(stop, rc);
rc = mdd_declare_changelog_store(env, mdd, NULL, NULL, handle);
if (rc)
GOTO(stop, rc);
- rc = mdd_trans_start(env, mdo2mdd(obj), handle);
- if (rc)
- GOTO(stop, rc);
- }
+ rc = mdd_trans_start(env, mdo2mdd(obj), handle);
+ if (rc)
+ GOTO(stop, rc);
+ }
- mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
+cont:
+ mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
rc = mdd_la_get(env, mdd_obj, &ma->ma_attr);
if (rc != 0) {
CERROR("Failed to get lu_attr of "DFID": %d\n",
((mdd_obj->mod_flags & (ORPHAN_OBJ | DEAD_OBJ)) != 0 ||
ma->ma_attr.la_nlink == 0);
- if (is_orphan && handle == NULL) {
+ if (is_orphan && !handle && !blocked) {
mdd_write_unlock(env, mdd_obj);
goto again;
}
mdd_obj->mod_count--; /*release open count */
- if (!is_orphan)
+ if (!is_orphan || blocked)
GOTO(out, rc = 0);
/* Orphan object */
EXIT;
out:
- mdd_write_unlock(env, mdd_obj);
-
- if (rc == 0 &&
- (mode & (FMODE_WRITE | MDS_OPEN_APPEND | MDS_OPEN_TRUNC)) &&
- !(ma->ma_valid & MA_FLAGS && ma->ma_attr_flags & MDS_RECOV_OPEN)) {
- if (handle == NULL) {
- handle = mdd_trans_create(env, mdo2mdd(obj));
- if (IS_ERR(handle))
+ mdd_write_unlock(env, mdd_obj);
+
+ if (!rc && !blocked &&
+ (mode & (FMODE_WRITE | MDS_OPEN_APPEND | MDS_OPEN_TRUNC)) &&
+ !(ma->ma_valid & MA_FLAGS && ma->ma_attr_flags & MDS_RECOV_OPEN)) {
+ if (handle == NULL) {
+ handle = mdd_trans_create(env, mdo2mdd(obj));
+ if (IS_ERR(handle))
GOTO(stop, rc = PTR_ERR(handle));
rc = mdd_declare_changelog_store(env, mdd, NULL, NULL,
#include <obd_class.h>
#include <lprocfs_status.h>
#include <lustre_mds.h>
+#include <lustre_barrier.h>
#include "mdd_internal.h"
+/**
+ * Create a transaction handle on the child device, subject to the write
+ * barrier on mdd->mdd_bottom.
+ *
+ * The barrier is entered before the handle is created. On success the
+ * barrier stays entered; the transaction-stop path in this file calls
+ * barrier_exit() after dt_trans_stop(). If handle creation itself fails,
+ * the barrier is released here, because an error pointer cannot be handed
+ * to the stop path (it dereferences the handle).
+ *
+ * \param[in] env	execution environment passed through to the child device
+ * \param[in] mdd	MDD device whose child device creates the handle
+ *
+ * \retval		handle returned by dt_trans_create() on success
+ * \retval		ERR_PTR(-EINPROGRESS) if blocked by the write barrier
+ * \retval		error pointer returned by dt_trans_create() on failure
+ */
struct thandle *mdd_trans_create(const struct lu_env *env,
struct mdd_device *mdd)
{
+ struct thandle *th;
+
+ /* If blocked by the write barrier, then return "-EINPROGRESS"
+ * to the caller. Usually, such error will be forwarded to the
+ * client, and the expected behaviour is to re-try such modify
+ * RPC some time later until the barrier is thawed or expired. */
+ if (unlikely(!barrier_entry(mdd->mdd_bottom)))
+ return ERR_PTR(-EINPROGRESS);
+
- return mdd_child_ops(mdd)->dt_trans_create(env, mdd->mdd_child);
+ th = mdd_child_ops(mdd)->dt_trans_create(env, mdd->mdd_child);
+ /* No handle means no later transaction stop for this attempt, so
+ * the barrier entered above must be left here; otherwise the
+ * in-flight writers count would never be decremented. */
+ if (IS_ERR(th))
+ barrier_exit(mdd->mdd_bottom);
+
+ return th;
}
handle->th_result = result;
rc = mdd_child_ops(mdd)->dt_trans_stop(env, mdd->mdd_child, handle);
+ barrier_exit(mdd->mdd_bottom);
/* if operation failed, return \a result, otherwise return status of
* dt_trans_stop */
#include <lustre_swab.h>
#include <obd.h>
#include <obd_support.h>
+#include <lustre_barrier.h>
#include <llog_swab.h>
if (repoqc == NULL)
GOTO(out_nodemap, rc = err_serious(-EFAULT));
+ if (oqctl->qc_cmd == Q_SETINFO || oqctl->qc_cmd == Q_SETQUOTA)
+ barrier_exit(tsi->tsi_tgt->lut_bottom);
+
if (oqctl->qc_id != id)
swap(oqctl->qc_id, id);
+ if (oqctl->qc_cmd == Q_SETINFO || oqctl->qc_cmd == Q_SETQUOTA) {
+ if (unlikely(!barrier_entry(tsi->tsi_tgt->lut_bottom)))
+ RETURN(-EINPROGRESS);
+ }
+
switch (oqctl->qc_cmd) {
case Q_GETINFO:
spin_lock_init(&mo->mot_write_lock);
mutex_init(&mo->mot_lov_mutex);
init_rwsem(&mo->mot_open_sem);
+ atomic_set(&mo->mot_open_count, 0);
RETURN(o);
}
RETURN(NULL);
LASSERT(list_empty(&barrier_instance_list));
}
+/**
+ * Try to enter a modification section guarded by the write barrier of \a key.
+ *
+ * If no barrier instance is found for \a key the call fails open: it returns
+ * true without touching any counter. Otherwise, with bi_rwlock held for
+ * read, entry is granted when the barrier status is none of BS_FREEZING_P1,
+ * BS_FREEZING_P2 or BS_FROZEN, or when the cfs_time_beforeq() check of
+ * bi_deadline against the current time succeeds (presumably meaning the
+ * barrier has expired -- confirm against cfs_time_beforeq()). A granted
+ * entry increments bi_writers.
+ *
+ * \param[in] key	device used to look up the barrier instance
+ *
+ * \retval true	entry granted; callers in this file pair it with
+ *			barrier_exit()
+ * \retval false	the caller is blocked by the barrier
+ */
+bool barrier_entry(struct dt_device *key)
+{
+ struct barrier_instance *barrier;
+ bool entered = false;
+ ENTRY;
+
+ barrier = barrier_instance_find(key);
+ if (unlikely(!barrier))
+ /* Fail open */
+ RETURN(true);
+
+ /* The status/deadline check and the writers increment are done
+ * under the same read lock. */
+ read_lock(&barrier->bi_rwlock);
+ if (likely(barrier->bi_status != BS_FREEZING_P1 &&
+ barrier->bi_status != BS_FREEZING_P2 &&
+ barrier->bi_status != BS_FROZEN) ||
+ cfs_time_beforeq(barrier->bi_deadline, cfs_time_current_sec())) {
+ percpu_counter_inc(&barrier->bi_writers);
+ entered = true;
+ }
+ read_unlock(&barrier->bi_rwlock);
+
+ barrier_instance_put(barrier);
+ /* Leave through RETURN() so every exit matches the ENTRY above. */
+ RETURN(entered);
+}
+EXPORT_SYMBOL(barrier_entry);
+
+/**
+ * Leave a modification section on the device \a key.
+ *
+ * Looks up the barrier instance for \a key; if none is found the call does
+ * nothing. Otherwise it decrements bi_writers and, if the barrier status is
+ * BS_FREEZING_P1, wakes all waiters on bi_waitq before dropping the
+ * instance reference taken by the lookup.
+ *
+ * \param[in] key	device used to look up the barrier instance
+ */
+void barrier_exit(struct dt_device *key)
+{
+ struct barrier_instance *bi = barrier_instance_find(key);
+
+ if (unlikely(!bi))
+ return;
+
+ percpu_counter_dec(&bi->bi_writers);
+
+ /* Full memory barrier: the decrement of the in-flight modifications
+ * count must not be reordered with the barrier status check below. */
+ smp_mb();
+
+ if (unlikely(bi->bi_status == BS_FREEZING_P1))
+ wake_up_all(&bi->bi_waitq);
+
+ barrier_instance_put(bi);
+}
+EXPORT_SYMBOL(barrier_exit);
+
int barrier_handler(struct dt_device *key, struct ptlrpc_request *req)
{
struct ldlm_gl_barrier_desc *desc;