/*
 * Build an unlink event for \a md in \a ev.
 *
 * The event is zeroed, its type is set to LNET_EVENT_UNLINK, and
 * lnet_md_deconstruct() / lnet_md2handle() are called to fill ev->md and
 * ev->md_handle from \a md.
 */
void
lnet_build_unlink_event (lnet_libmd_t *md, lnet_event_t *ev)
{
+ ENTRY;
+
memset(ev, 0, sizeof(*ev));
ev->status = 0; /* already zero after the memset above; kept explicit */
ev->type = LNET_EVENT_UNLINK;
lnet_md_deconstruct(md, &ev->md);
lnet_md2handle(&ev->md_handle, md);
+ EXIT;
}
void
removed cwd "./" (refer to Bugzilla 14399).
* File join has been disabled in this release, refer to Bugzilla 16929.
+Severity : enhancement
+Bugzilla : 15393
+Description: Commit on sharing. Eliminate inter-client dependencies between
+ uncommitted transactions by doing transaction commits.
+ Thereby clients may recover independently.
+
Severity : normal
Frequency : Create a symlink file with a very long name
Bugzilla : 16578
int (*dt_sync)(const struct lu_env *env, struct dt_device *dev);
void (*dt_ro)(const struct lu_env *env, struct dt_device *dev);
/**
+ * Start a transaction commit asynchronously
+ *
+ * \param env environment
+ * \param dev dt_device to start commit on
+ *
+ * \return 0 success, negative value if error
+ */
+ int (*dt_commit_async)(const struct lu_env *env,
+ struct dt_device *dev);
+ /**
* Initialize capability context.
*/
int (*dt_init_capa_ctxt)(const struct lu_env *env,
LCK_CR = 16,
LCK_NL = 32,
LCK_GROUP = 64,
+ LCK_COS = 128,
LCK_MAXMODE
} ldlm_mode_t;
-#define LCK_MODE_NUM 7
+#define LCK_MODE_NUM 8
typedef enum {
LDLM_PLAIN = 10,
#define LCK_COMPAT_CR (LCK_COMPAT_CW | LCK_PR | LCK_PW)
#define LCK_COMPAT_NL (LCK_COMPAT_CR | LCK_EX | LCK_GROUP)
#define LCK_COMPAT_GROUP (LCK_GROUP | LCK_NL)
+#define LCK_COMPAT_COS (LCK_COS)
extern ldlm_mode_t lck_compat_array[];
* Server-side-only members.
*/
+ /* connection cookie of the client that originated the operation */
+ __u64 l_client_cookie;
+
/**
* Protected by elt_lock. Callbacks pending.
*/
struct lustre_handle *);
struct ldlm_resource *ldlm_lock_convert(struct ldlm_lock *lock, int new_mode,
__u32 *flags);
+void ldlm_lock_downgrade(struct ldlm_lock *lock, int new_mode);
void ldlm_lock_cancel(struct ldlm_lock *lock);
void ldlm_cancel_locks_for_export(struct obd_export *export);
void ldlm_reprocess_all(struct ldlm_resource *res);
/* ldlm_request.c */
int ldlm_expired_completion_wait(void *data);
+int ldlm_blocking_ast_nocheck(struct ldlm_lock *lock);
int ldlm_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
void *data, int flag);
int ldlm_glimpse_ast(struct ldlm_lock *lock, void *reqp);
ldlm_completion_callback completion,
ldlm_glimpse_callback glimpse,
void *data, __u32 lvb_len, void *lvb_swabber,
+ const __u64 *client_cookie,
struct lustre_handle *lockh);
int ldlm_server_ast(struct lustre_handle *lockh, struct ldlm_lock_desc *new,
void *data, __u32 data_len);
#endif
/* updates to following flag serialised by srv_request_lock */
unsigned long rs_difficult:1; /* ACK/commit stuff */
+ unsigned long rs_no_ack:1; /* no ACK, even for
+ difficult requests */
unsigned long rs_scheduled:1; /* being handled? */
unsigned long rs_scheduled_ever:1;/* any schedule attempts? */
unsigned long rs_handled:1; /* been handled yet? */
int srv_watchdog_factor; /* soft watchdog timeout mutiplier */
unsigned srv_cpu_affinity:1; /* bind threads to CPUs */
unsigned srv_at_check:1; /* check early replies */
+ unsigned srv_is_stopping:1; /* under unregister_service */
cfs_time_t srv_at_checktime; /* debug */
__u32 srv_req_portal;
/* ptlrpc/service.c */
void ptlrpc_save_lock (struct ptlrpc_request *req,
- struct lustre_handle *lock, int mode);
+ struct lustre_handle *lock, int mode, int no_ack);
void ptlrpc_commit_replies (struct obd_device *obd);
void ptlrpc_schedule_difficult_reply (struct ptlrpc_reply_state *rs);
struct ptlrpc_service *ptlrpc_init_svc_conf(struct ptlrpc_service_conf *c,
#define OBD_FAIL_MDS_LOV_SYNC_RACE 0x13e
#define OBD_FAIL_MDS_CLOSE_NET_REP 0x13f
#define OBD_FAIL_MDS_LLOG_SYNC_TIMEOUT 0x140
+#define OBD_FAIL_MDS_RECOVERY_ACCEPTS_GAPS 0x141
#define OBD_FAIL_OST 0x200
#define OBD_FAIL_OST_CONNECT_NET 0x201
tmp = mode_tail;
continue;
}
-
+
+ if (lock->l_req_mode == LCK_COS) {
+ if (lock->l_client_cookie == req->l_client_cookie) {
+ tmp = mode_tail;
+ } else {
+ tmp = mode_tail;
+ if (!work_list)
+ RETURN(0);
+ compat = 0;
+ if (lock->l_blocking_ast)
+ ldlm_add_ast_work_item(lock, req,
+ work_list);
+ }
+ continue;
+ }
+
for (;;) {
struct list_head *head;
next_transno, queue_len, completed, connected, req_transno);
obd->obd_next_recovery_transno = req_transno;
wake_up = 1;
+ } else if (OBD_FAIL_CHECK(OBD_FAIL_MDS_RECOVERY_ACCEPTS_GAPS)) {
+ CDEBUG(D_HA, "accepting transno gaps is explicitly allowed"
+ " by fail_lock, waking up ("LPD64")\n", next_transno);
+ obd->obd_next_recovery_transno = req_transno;
+ wake_up = 1;
} else if (queue_len == atomic_read(&obd->obd_req_replay_clients)) {
/* some clients haven't connected in time, but we can try
* to replay requests that demand on already committed ones
struct obd_device *obd;
struct obd_export *exp;
struct ptlrpc_service *svc;
+ ENTRY;
- if (req->rq_no_reply)
+ if (req->rq_no_reply) {
+ EXIT;
return;
+ }
svc = req->rq_rqbd->rqbd_service;
rs = req->rq_reply_state;
if (rs == NULL || !rs->rs_difficult) {
/* no notifiers */
target_send_reply_msg (req, rc, fail_id);
+ EXIT;
return;
}
spin_lock(&obd->obd_uncommitted_replies_lock);
+ CDEBUG(D_NET, "rs transno = "LPU64", last committed = "LPU64"\n",
+ rs->rs_transno, obd->obd_last_committed);
if (rs->rs_transno > obd->obd_last_committed) {
/* not committed already */
list_add_tail (&rs->rs_obd_list,
atomic_inc (&svc->srv_outstanding_replies);
}
- if (!rs->rs_on_net || /* some notifier */
- list_empty(&rs->rs_exp_list) || /* completed already */
- list_empty(&rs->rs_obd_list)) {
+ if (rs->rs_transno <= obd->obd_last_committed ||
+ (!rs->rs_on_net && !rs->rs_no_ack) ||
+ list_empty(&rs->rs_exp_list) || /* completed already */
+ list_empty(&rs->rs_obd_list)) {
+ CDEBUG(D_HA, "Schedule reply immediately\n");
list_add_tail (&rs->rs_list, &svc->srv_reply_queue);
cfs_waitq_signal (&svc->srv_waitq);
} else {
}
spin_unlock(&svc->srv_lock);
+ EXIT;
}
int target_handle_ping(struct ptlrpc_request *req)
[LCK_CW] LCK_COMPAT_CW,
[LCK_CR] LCK_COMPAT_CR,
[LCK_NL] LCK_COMPAT_NL,
- [LCK_GROUP] LCK_COMPAT_GROUP
+ [LCK_GROUP] LCK_COMPAT_GROUP,
+ [LCK_COS] LCK_COMPAT_COS,
};
/**
[LCK_CW] "CW",
[LCK_CR] "CR",
[LCK_NL] "NL",
- [LCK_GROUP] "GROUP"
+ [LCK_GROUP] "GROUP",
+ [LCK_COS] "COS"
};
char *ldlm_typename[] = {
lock->l_readers++;
lu_ref_add_atomic(&lock->l_reference, "reader", lock);
}
- if (mode & (LCK_EX | LCK_CW | LCK_PW | LCK_GROUP)) {
+ if (mode & (LCK_EX | LCK_CW | LCK_PW | LCK_GROUP | LCK_COS)) {
lock->l_writers++;
lu_ref_add_atomic(&lock->l_reference, "writer", lock);
}
lu_ref_del(&lock->l_reference, "reader", lock);
lock->l_readers--;
}
- if (mode & (LCK_EX | LCK_CW | LCK_PW | LCK_GROUP)) {
+ if (mode & (LCK_EX | LCK_CW | LCK_PW | LCK_GROUP | LCK_COS)) {
LASSERT(lock->l_writers > 0);
lu_ref_del(&lock->l_reference, "writer", lock);
lock->l_writers--;
ldlm_lock2desc(lock->l_blocking_lock, &d);
- LDLM_LOCK_RELEASE(lock->l_blocking_lock);
- lock->l_blocking_lock = NULL;
lock->l_blocking_ast(lock, &d, (void *)arg,
LDLM_CB_BLOCKING);
+ LDLM_LOCK_RELEASE(lock->l_blocking_lock);
+ lock->l_blocking_lock = NULL;
LDLM_LOCK_RELEASE(lock);
RETURN(1);
ldlm_cancel_locks_for_export_cb, exp);
}
+/**
+ * Downgrade an exclusive lock.
+ *
+ * A fast variant of ldlm_lock_convert for conversion of exclusive
+ * locks. The conversion is always successful.
+ *
+ * With the resource lock held, the lock is unlinked with
+ * ldlm_resource_unlink_lock(), its requested mode is set to \a new_mode
+ * and it is granted again with ldlm_grant_lock(). After the resource
+ * lock is dropped, ldlm_reprocess_all() is called on the lock's resource.
+ *
+ * \param lock A lock to convert; its granted mode must contain LCK_PW
+ * or LCK_EX (asserted)
+ * \param new_mode new lock mode; only LCK_COS is accepted (asserted)
+ */
+void ldlm_lock_downgrade(struct ldlm_lock *lock, int new_mode)
+{
+ ENTRY;
+
+ LASSERT(lock->l_granted_mode & (LCK_PW | LCK_EX));
+ LASSERT(new_mode == LCK_COS);
+
+ lock_res_and_lock(lock);
+ ldlm_resource_unlink_lock(lock);
+ lock->l_req_mode = new_mode;
+ ldlm_grant_lock(lock, NULL);
+ unlock_res_and_lock(lock);
+ ldlm_reprocess_all(lock->l_resource);
+
+ EXIT;
+}
+
struct ldlm_resource *ldlm_lock_convert(struct ldlm_lock *lock, int new_mode,
__u32 *flags)
{
if (node == NULL) /* Actually, this causes EDEADLOCK to be returned */
RETURN(NULL);
- LASSERTF(new_mode == LCK_PW && lock->l_granted_mode == LCK_PR,
+ LASSERTF((new_mode == LCK_PW && lock->l_granted_mode == LCK_PR),
"new_mode %u, granted %u\n", new_mode, lock->l_granted_mode);
lock_res_and_lock(lock);
EXPORT_SYMBOL(ldlm_cancel_locks_for_export);
EXPORT_SYMBOL(ldlm_reprocess_all_ns);
EXPORT_SYMBOL(ldlm_lock_allow_match);
+EXPORT_SYMBOL(ldlm_lock_downgrade);
+EXPORT_SYMBOL(ldlm_lock_convert);
/* ldlm_request.c */
EXPORT_SYMBOL(ldlm_completion_ast_async);
+EXPORT_SYMBOL(ldlm_blocking_ast_nocheck);
EXPORT_SYMBOL(ldlm_completion_ast);
EXPORT_SYMBOL(ldlm_blocking_ast);
EXPORT_SYMBOL(ldlm_glimpse_ast);
RETURN(ldlm_completion_tail(lock));
}
-/*
- * ->l_blocking_ast() callback for LDLM locks acquired by server-side OBDs.
+/**
+ * A helper to build a blocking ast function
+ *
+ * Perform a common operation for blocking asts:
+ * deferred lock cancellation.
+ *
+ * \param lock the lock blocking or canceling ast was called on
+ * \retval 0
+ * \see mdt_blocking_ast
+ * \see ldlm_blocking_ast
*/
-int ldlm_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
- void *data, int flag)
+int ldlm_blocking_ast_nocheck(struct ldlm_lock *lock)
{
int do_ast;
ENTRY;
- if (flag == LDLM_CB_CANCELING) {
- /* Don't need to do anything here. */
- RETURN(0);
- }
-
- lock_res_and_lock(lock);
- /* Get this: if ldlm_blocking_ast is racing with intent_policy, such
- * that ldlm_blocking_ast is called just before intent_policy method
- * takes the ns_lock, then by the time we get the lock, we might not
- * be the correct blocking function anymore. So check, and return
- * early, if so. */
- if (lock->l_blocking_ast != ldlm_blocking_ast) {
- unlock_res_and_lock(lock);
- RETURN(0);
- }
-
lock->l_flags |= LDLM_FL_CBPENDING;
do_ast = (!lock->l_readers && !lock->l_writers);
unlock_res_and_lock(lock);
RETURN(0);
}
+/**
+ * Server blocking AST
+ *
+ * ->l_blocking_ast() callback for LDLM locks acquired by server-side
+ * OBDs.
+ *
+ * A cancelling callback returns immediately. For a blocking callback the
+ * resource lock is taken and, if this function is still the lock's
+ * l_blocking_ast, the remaining work is handed to
+ * ldlm_blocking_ast_nocheck(), which is entered with the resource lock
+ * still held.
+ *
+ * \param lock the lock which blocks a request or cancelling lock
+ * \param desc unused
+ * \param data unused
+ * \param flag indicates whether this is a cancelling or blocking callback
+ * \retval 0
+ * \see ldlm_blocking_ast_nocheck
+ */
+int ldlm_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
+ void *data, int flag)
+{
+ ENTRY;
+
+ if (flag == LDLM_CB_CANCELING) {
+ /* Don't need to do anything here. */
+ RETURN(0);
+ }
+
+ lock_res_and_lock(lock);
+ /* Get this: if ldlm_blocking_ast is racing with intent_policy, such
+ * that ldlm_blocking_ast is called just before intent_policy method
+ * takes the ns_lock, then by the time we get the lock, we might not
+ * be the correct blocking function anymore. So check, and return
+ * early, if so. */
+ if (lock->l_blocking_ast != ldlm_blocking_ast) {
+ unlock_res_and_lock(lock);
+ RETURN(0);
+ }
+ RETURN(ldlm_blocking_ast_nocheck(lock));
+}
+
/*
* ->l_glimpse_ast() for DLM extent locks acquired on the server-side. See
* comment in filter_intent_policy() on why you may need this.
ldlm_completion_callback completion,
ldlm_glimpse_callback glimpse,
void *data, __u32 lvb_len, void *lvb_swabber,
+ const __u64 *client_cookie,
struct lustre_handle *lockh)
{
struct ldlm_lock *lock;
unlock_res_and_lock(lock);
if (policy != NULL)
lock->l_policy_data = *policy;
+ if (client_cookie != NULL)
+ lock->l_client_cookie = *client_cookie;
if (type == LDLM_EXTENT)
lock->l_req_extent = policy->l_extent;
void __lvfs_set_rdonly(lvfs_sbdev_type dev, lvfs_sbdev_type jdev)
{
- lvfs_sbdev_sync(dev);
if (jdev && (jdev != dev)) {
CDEBUG(D_IOCTL | D_HA, "set journal dev %lx rdonly\n",
(long)jdev);
RETURN(m);
}
+/**
+ * Asynchronous commit for mdt device.
+ *
+ * Pass the asynchronous commit call down the MDS stack by invoking the
+ * dt_commit_async method of the bottom dt device. A failure is only
+ * reported with CWARN; nothing is returned to the caller.
+ *
+ * \param env environment
+ * \param mdt the mdt device
+ */
+static void mdt_device_commit_async(const struct lu_env *env,
+                                    struct mdt_device *mdt)
+{
+        struct dt_device *dt = mdt->mdt_bottom;
+        int rc;
+
+        rc = dt->dd_ops->dt_commit_async(env, dt);
+        /* The message is newline-terminated like the other CWARN calls,
+         * so it does not run into the following log record. */
+        if (unlikely(rc != 0))
+                CWARN("async commit start failed with rc = %d\n", rc);
+}
+
+/**
+ * Flag a lock as "synchronous".
+ *
+ * A lock flagged this way defers the transaction commit to unlock time.
+ * The flag is stored in l_ast_data as a non-NULL marker value.
+ *
+ * \param lock the lock to flag as "synchronous"
+ *
+ * \see mdt_is_lock_sync
+ * \see mdt_save_lock
+ */
+static inline void mdt_set_lock_sync(struct ldlm_lock *lock)
+{
+        void *sync_marker = (void *)1;
+
+        lock->l_ast_data = sync_marker;
+}
+
+/**
+ * Tell whether a lock has been flagged "synchronous".
+ *
+ * The flag is the l_ast_data pointer: any non-NULL value counts as set.
+ *
+ * \param lock the lock to check
+ * \retval 1 the lock is "synchronous"
+ * \retval 0 the lock isn't "synchronous"
+ *
+ * \see mdt_set_lock_sync
+ * \see mdt_save_lock
+ */
+static inline int mdt_is_lock_sync(struct ldlm_lock *lock)
+{
+        if (lock->l_ast_data == NULL)
+                return 0;
+        return 1;
+}
+
+/**
+ * Blocking AST for mdt locks.
+ *
+ * Starts a transaction commit in case of a COS lock conflict, or defers
+ * such a commit to mdt_save_lock() by marking the lock "synchronous".
+ *
+ * \param lock the lock which blocks a request or cancelling lock
+ * \param desc unused
+ * \param data unused
+ * \param flag indicates whether this is a cancelling or blocking callback
+ * \retval 0 in the usual case
+ * \retval rc the lu_env_init() error code, if the environment needed for
+ *            the asynchronous commit could not be initialized
+ * \see ldlm_blocking_ast_nocheck
+ */
+int mdt_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
+                     void *data, int flag)
+{
+        struct obd_device *obd = lock->l_resource->lr_namespace->ns_obd;
+        struct mdt_device *mdt = mdt_dev(obd->obd_lu_dev);
+        int rc;
+        ENTRY;
+
+        if (flag == LDLM_CB_CANCELING)
+                RETURN(0);
+        lock_res_and_lock(lock);
+        /* Bail out if this function is no longer the lock's blocking
+         * callback by the time the resource lock has been taken. */
+        if (lock->l_blocking_ast != mdt_blocking_ast) {
+                unlock_res_and_lock(lock);
+                RETURN(0);
+        }
+        /* A PW/EX lock blocked by a lock carrying a different client
+         * cookie: remember that a commit has to be started when the lock
+         * is saved. */
+        if (mdt_cos_is_enabled(mdt) &&
+            lock->l_req_mode & (LCK_PW | LCK_EX) &&
+            lock->l_blocking_lock != NULL &&
+            lock->l_client_cookie != lock->l_blocking_lock->l_client_cookie) {
+                mdt_set_lock_sync(lock);
+        }
+        rc = ldlm_blocking_ast_nocheck(lock);
+
+        /* There is no lock conflict if l_blocking_lock == NULL,
+         * it indicates a blocking ast sent from ldlm_lock_decref_internal
+         * when the last reference to a local lock was released.
+         * NOTE(review): the lock fields are read here after
+         * ldlm_blocking_ast_nocheck() has dropped the resource lock;
+         * confirm this is safe. */
+        if (lock->l_req_mode == LCK_COS && lock->l_blocking_lock != NULL) {
+                struct lu_env env;
+
+                rc = lu_env_init(&env, NULL, LCT_MD_THREAD);
+                if (unlikely(rc != 0)) {
+                        CWARN("lu_env initialization failed with rc = %d,"
+                              "cannot start asynchronous commit\n", rc);
+                } else {
+                        mdt_device_commit_async(&env, mdt);
+                        /* Finalize only an environment that was
+                         * initialized successfully, as mdt_enable_cos()
+                         * does. */
+                        lu_env_fini(&env);
+                }
+        }
+        RETURN(rc);
+}
+
int mdt_object_lock(struct mdt_thread_info *info, struct mdt_object *o,
struct mdt_lock_handle *lh, __u64 ibits, int locality)
{
*/
policy->l_inodebits.bits = MDS_INODELOCK_UPDATE;
rc = mdt_fid_lock(ns, &lh->mlh_pdo_lh, lh->mlh_pdo_mode,
- policy, res_id, LDLM_FL_ATOMIC_CB);
+ policy, res_id, LDLM_FL_ATOMIC_CB,
+ &info->mti_exp->exp_handle.h_cookie);
if (unlikely(rc))
RETURN(rc);
}
* fix it up and turn FL_LOCAL flag off.
*/
rc = mdt_fid_lock(ns, &lh->mlh_reg_lh, lh->mlh_reg_mode, policy,
- res_id, LDLM_FL_LOCAL_ONLY | LDLM_FL_ATOMIC_CB);
-
+ res_id, LDLM_FL_LOCAL_ONLY | LDLM_FL_ATOMIC_CB,
+ &info->mti_exp->exp_handle.h_cookie);
if (rc)
GOTO(out, rc);
RETURN(rc);
}
-static inline
-void mdt_save_lock(struct ptlrpc_request *req, struct lustre_handle *h,
+/**
+ * Save a lock within request object.
+ *
+ * Keep the lock referenced until either client ACK or transaction
+ * commit happens, or release the lock immediately depending on input
+ * parameters. If COS is ON, a write lock is converted to COS lock
+ * before saving.
+ *
+ * The lock is released immediately when \a decref is set, when the
+ * request has no transaction (mti_has_trans is clear) or when \a mode
+ * is neither LCK_PW nor LCK_EX. Otherwise it is saved in the request;
+ * with COS enabled it is saved with the no-ACK flag set. If the lock
+ * was marked "synchronous", an asynchronous commit is started. The
+ * handle cookie is zeroed in both cases. Unused handles are ignored.
+ *
+ * \param info thread info object
+ * \param h lock handle
+ * \param mode lock mode
+ * \param decref force immediate lock releasing
+ */
+static
+void mdt_save_lock(struct mdt_thread_info *info, struct lustre_handle *h,
ldlm_mode_t mode, int decref)
{
ENTRY;
if (lustre_handle_is_used(h)) {
- if (decref)
+ if (decref || !info->mti_has_trans ||
+ !(mode & (LCK_PW | LCK_EX))){
mdt_fid_unlock(h, mode);
- else
- ptlrpc_save_lock(req, h, mode);
+ } else {
+ struct mdt_device *mdt = info->mti_mdt;
+ struct ldlm_lock *lock = ldlm_handle2lock(h);
+ struct ptlrpc_request *req = mdt_info_req(info);
+ int no_ack = 0;
+
+ LASSERTF(lock != NULL, "no lock for cookie "LPX64"\n",
+ h->cookie);
+ CDEBUG(D_HA, "request = %p reply state = %p"
+ " transno = "LPD64"\n",
+ req, req->rq_reply_state, req->rq_transno);
+ if (mdt_cos_is_enabled(mdt)) {
+ no_ack = 1;
+ ldlm_lock_downgrade(lock, LCK_COS);
+ mode = LCK_COS;
+ }
+ ptlrpc_save_lock(req, h, mode, no_ack);
+ if (mdt_is_lock_sync(lock)) {
+ CDEBUG(D_HA, "found sync-lock,"
+ " async commit started\n");
+ mdt_device_commit_async(info->mti_env,
+ mdt);
+ }
+ LDLM_LOCK_PUT(lock);
+ }
h->cookie = 0ull;
}
EXIT;
}
-/*
- * Just call ldlm_lock_decref() if decref, else we only call ptlrpc_save_lock()
- * to save this lock in req. when transaction committed, req will be released,
- * and lock will, too.
+/**
+ * Unlock mdt object.
+ *
+ * Immediately release the regular lock and the PDO lock, or save the
+ * locks in the request and keep them referenced until client ACK or
+ * transaction commit. Both handles are passed to mdt_save_lock(), the
+ * PDO lock first.
+ *
+ * \param info thread info object
+ * \param o mdt object
+ * \param lh mdt lock handle referencing regular and PDO locks
+ * \param decref force immediate lock releasing
*/
void mdt_object_unlock(struct mdt_thread_info *info, struct mdt_object *o,
struct mdt_lock_handle *lh, int decref)
{
- struct ptlrpc_request *req = mdt_info_req(info);
ENTRY;
- mdt_save_lock(req, &lh->mlh_pdo_lh, lh->mlh_pdo_mode, decref);
- mdt_save_lock(req, &lh->mlh_reg_lh, lh->mlh_reg_mode, decref);
+ mdt_save_lock(info, &lh->mlh_pdo_lh, lh->mlh_pdo_mode, decref);
+ mdt_save_lock(info, &lh->mlh_reg_lh, lh->mlh_reg_mode, decref);
EXIT;
}
ptlrpc_unregister_service(m->mdt_fld_service);
m->mdt_fld_service = NULL;
}
- ENTRY;
+ EXIT;
}
static int mdt_start_ptlrpc_service(struct mdt_device *m)
m->mdt_opts.mo_user_xattr = 0;
m->mdt_opts.mo_acl = 0;
+ m->mdt_opts.mo_cos = MDT_COS_DEFAULT;
lmi = server_get_mount_2(dev);
if (lmi == NULL) {
CERROR("Cannot get mount info for %s!\n", dev);
rc = mdt_device_sync(&env, mdt);
break;
case OBD_IOC_SET_READONLY:
- rc = dt->dd_ops->dt_sync(&env, dt);
dt->dd_ops->dt_ro(&env, dt);
break;
case OBD_IOC_ABORT_RECOVERY:
return md_ucred(info->mti_env);
}
+/**
+ * Switch commit-on-sharing (COS) on or off.
+ *
+ * The normalized flag is stored in the mdt options first, then the
+ * device is synced. The flag is therefore updated even when the sync is
+ * skipped because the environment could not be initialized.
+ *
+ * \param mdt mdt device
+ * \param val 0 disables COS, other values enable COS
+ */
+void mdt_enable_cos(struct mdt_device *mdt, int val)
+{
+        struct lu_env sync_env;
+        int err;
+
+        mdt->mdt_opts.mo_cos = (val != 0);
+        err = lu_env_init(&sync_env, NULL, LCT_MD_THREAD);
+        if (unlikely(err != 0)) {
+                CWARN("lu_env initialization failed with rc = %d,"
+                      "cannot sync\n", err);
+        } else {
+                mdt_device_sync(&sync_env, mdt);
+                lu_env_fini(&sync_env);
+        }
+}
+
+/**
+ * Query COS status.
+ *
+ * \param mdt mdt device
+ * \retval 1 the COS flag is set in the mdt options
+ * \retval 0 the COS flag is clear
+ */
+int mdt_cos_is_enabled(struct mdt_device *mdt)
+{
+        return mdt->mdt_opts.mo_cos ? 1 : 0;
+}
+
/* type constructor/destructor: mdt_type_init, mdt_type_fini */
LU_TYPE_INIT_FINI(mdt, &mdt_thread_key, &mdt_txn_key);
mo_acl :1,
mo_compat_resname:1,
mo_mds_capa :1,
- mo_oss_capa :1;
+ mo_oss_capa :1,
+ mo_cos :1;
} mdt_opts;
/* mdt state flags */
__u32 mdt_fl_cfglog:1,
#define MDT_SERVICE_WATCHDOG_FACTOR (2000)
#define MDT_ROCOMPAT_SUPP (OBD_ROCOMPAT_LOVOBJID)
#define MDT_INCOMPAT_SUPP (OBD_INCOMPAT_MDT | OBD_INCOMPAT_COMMON_LR)
+#define MDT_COS_DEFAULT (1)
struct mdt_object {
struct lu_object_header mot_header;
return cache ? (strcmp(cache->uc_upcall, "NONE") == 0) : 1;
}
+int mdt_blocking_ast(struct ldlm_lock*, struct ldlm_lock_desc*, void*, int);
+
/* Issues a local LDLM_IBITS dlm lock on passed @ns for @res_id and stores its
 * lock handle into @lh. @client_cookie is passed through unchanged to
 * ldlm_cli_enqueue_local(). Returns 0 when the enqueue result is ELDLM_OK and
 * -EIO for any other result. */
static inline int mdt_fid_lock(struct ldlm_namespace *ns,
struct lustre_handle *lh,
ldlm_mode_t mode,
ldlm_policy_data_t *policy,
const struct ldlm_res_id *res_id,
- int flags)
+ int flags, const __u64 *client_cookie)
{
int rc;
LASSERT(lh != NULL);
rc = ldlm_cli_enqueue_local(ns, res_id, LDLM_IBITS, policy,
- mode, &flags, ldlm_blocking_ast,
- ldlm_completion_ast, NULL, NULL,
- 0, NULL, lh);
+ mode, &flags, mdt_blocking_ast,
+ ldlm_completion_ast,
+ NULL, NULL, 0, NULL, client_cookie, lh);
return rc == ELDLM_OK ? 0 : -EIO;
}
return tlname;
}
+void mdt_enable_cos(struct mdt_device *, int);
+int mdt_cos_is_enabled(struct mdt_device *);
+
/* lprocfs stuff */
void lprocfs_mdt_init_vars(struct lprocfs_static_vars *lvars);
int mdt_procfs_init(struct mdt_device *mdt, const char *name);
return count;
}
+/**
+ * Read handler reporting the COS state of an mdt device.
+ *
+ * Formats the result of mdt_cos_is_enabled() followed by a newline into
+ * \a page, bounded by \a count. \a start, \a off and \a eof are not used.
+ *
+ * \param data the obd device the mdt device is looked up from
+ * \return the value returned by snprintf()
+ */
+static int lprocfs_rd_cos(char *page, char **start, off_t off,
+                          int count, int *eof, void *data)
+{
+        struct obd_device *obd_dev = data;
+        struct mdt_device *mdt_device = mdt_dev(obd_dev->obd_lu_dev);
+        int cos_on;
+
+        cos_on = mdt_cos_is_enabled(mdt_device);
+        return snprintf(page, count, "%u\n", cos_on);
+}
+
+/**
+ * Write handler switching COS on or off for an mdt device.
+ *
+ * The written text is converted with lprocfs_write_helper() and the
+ * resulting integer is handed to mdt_enable_cos().
+ *
+ * \param data the obd device the mdt device is looked up from
+ * \return \a count on success, otherwise the non-zero result of
+ *         lprocfs_write_helper()
+ */
+static int lprocfs_wr_cos(struct file *file, const char *buffer,
+                          unsigned long count, void *data)
+{
+        struct obd_device *obd_dev = data;
+        struct mdt_device *mdt_device = mdt_dev(obd_dev->obd_lu_dev);
+        int enable;
+        int err;
+
+        err = lprocfs_write_helper(buffer, count, &enable);
+        if (err != 0)
+                return err;
+
+        mdt_enable_cos(mdt_device, enable);
+        return count;
+}
+
static struct lprocfs_vars lprocfs_mdt_obd_vars[] = { /* presumably { name, read handler, write handler, data } - confirm against struct lprocfs_vars */
{ "uuid", lprocfs_rd_uuid, 0, 0 },
{ "recovery_status", lprocfs_obd_rd_recovery_status, 0, 0 },
{ "site_stats", lprocfs_rd_site_stats, 0, 0 },
{ "evict_client", 0, lprocfs_mdt_wr_evict_client, 0 },
{ "hash_stats", lprocfs_obd_rd_hash, 0, 0 },
+ { "commit_on_sharing", lprocfs_rd_cos, lprocfs_wr_cos, 0 }, /* COS switch: has both a read and a write handler */
{ 0 } /* all-zero entry ends the table */
};
libcfs_nid2str(exp->exp_connection->c_peer.nid));
for (i = 0; i < oldrep->rs_nlocks; i++)
- ptlrpc_save_lock(req,
- &oldrep->rs_locks[i],
- oldrep->rs_modes[i]);
+ ptlrpc_save_lock(req, &oldrep->rs_locks[i],
+ oldrep->rs_modes[i], 0);
oldrep->rs_nlocks = 0;
DEBUG_REQ(D_HA, req, "stole locks for");
rc = ldlm_cli_enqueue_local(ns, res_id, LDLM_IBITS, policy,
LCK_EX, &flags, ldlm_blocking_ast,
ldlm_completion_ast, NULL, NULL, 0,
- NULL, lh);
+ NULL,
+ &info->mti_exp->exp_handle.h_cookie,
+ lh);
} else {
struct ldlm_enqueue_info einfo = { LDLM_IBITS, LCK_EX,
ldlm_blocking_ast, ldlm_completion_ast, NULL, NULL, NULL };
LDLM_PLAIN, NULL, LCK_EX,
&flags, ldlm_blocking_ast,
ldlm_completion_ast, NULL,
- fsname, 0, NULL, lockh);
+ fsname, 0, NULL, NULL, lockh);
if (rc)
CERROR("can't take cfg lock for %s (%d)\n", fsname, rc);
obd->obd_fail = 1;
obd->obd_no_transno = 1;
obd->obd_no_recov = 1;
- /* Set the obd readonly if we can */
- if (OBP(obd, iocontrol))
+ if (OBP(obd, iocontrol)) {
+ obd_iocontrol(OBD_IOC_SYNC,
+ obd->obd_self_export,
+ 0, NULL, NULL);
+ /* Set the obd readonly if we can */
obd_iocontrol(OBD_IOC_SET_READONLY,
obd->obd_self_export,
0, NULL, NULL);
+ }
break;
default:
CERROR("unrecognised flag '%c'\n",
rc = ldlm_cli_enqueue_local(obd->obd_namespace, &res_id, LDLM_PLAIN,
NULL, LCK_NL, &lock_flags, NULL,
ldlm_completion_ast, NULL, NULL,
- 0, NULL, &obd->u.echo.eo_nl_lock);
+ 0, NULL, NULL, &obd->u.echo.eo_nl_lock);
LASSERT (rc == ELDLM_OK);
lprocfs_echo_init_vars(&lvars);
rc = ldlm_cli_enqueue_local(obd->obd_namespace, &res_id, LDLM_EXTENT,
&policy, LCK_PW, &flags, ldlm_blocking_ast,
ldlm_completion_ast, NULL, NULL, 0, NULL,
- &lockh);
+ NULL, &lockh);
/* We only care about the side-effects, just drop the lock. */
if (rc == ELDLM_OK)
return ldiskfs_force_commit(osd_sb(osd_dt_dev(d)));
}
+/**
+ * Start commit for OSD device.
+ *
+ * An implementation of the dt_commit_async method for the OSD device.
+ * Asynchronously starts a sync of the underlying fs, and thereby a
+ * transaction commit, by calling the superblock's sync_fs operation
+ * with its second argument set to 0.
+ *
+ * \param env environment
+ * \param d dt device
+ *
+ * \return the value returned by the sync_fs operation
+ *
+ * \see dt_device_operations
+ */
+static int osd_commit_async(const struct lu_env *env,
+                            struct dt_device *d)
+{
+        struct super_block *sb = osd_sb(osd_dt_dev(d));
+        int rc;
+        ENTRY;
+
+        CDEBUG(D_HA, "async commit OSD %s\n", LUSTRE_OSD_NAME);
+        rc = sb->s_op->sync_fs(sb, 0);
+        RETURN(rc);
+}
+
/*
* Concurrency: shouldn't matter.
*/
.dt_conf_get = osd_conf_get,
.dt_sync = osd_sync,
.dt_ro = osd_ro,
+ .dt_commit_async = osd_commit_async,
.dt_credit_get = osd_credit_get,
.dt_init_capa_ctxt = osd_init_capa_ctxt,
};
if (!ack_lock->mode)
break;
/* XXX not even calling target_send_reply in some cases... */
- ptlrpc_save_lock (req, &ack_lock->lock, ack_lock->mode);
+ ptlrpc_save_lock (req, &ack_lock->lock, ack_lock->mode, 0);
}
}
RETURN(ldlm_cli_enqueue_local(exp->exp_obd->obd_namespace, &res_id,
LDLM_EXTENT, &policy, LCK_PW, &flags,
ldlm_blocking_ast, ldlm_completion_ast,
- ldlm_glimpse_ast, NULL, 0, NULL, lh));
+ ldlm_glimpse_ast, NULL, 0, NULL,
+ NULL, lh));
}
/*
RETURN(ldlm_cli_enqueue_local(exp->exp_obd->obd_namespace, &res_id,
LDLM_EXTENT, &policy, mode, &flags,
ldlm_blocking_ast, ldlm_completion_ast,
- ldlm_glimpse_ast, NULL, 0, NULL, lh));
+ ldlm_glimpse_ast, NULL, 0, NULL,
+ NULL, lh));
}
static void ost_brw_lock_put(int mode,
* until ptlrpc_server_handle_reply() is done with it */
spin_lock(&svc->srv_lock);
rs->rs_on_net = 0;
- ptlrpc_schedule_difficult_reply (rs);
+ if (!rs->rs_no_ack ||
+ rs->rs_transno <= rs->rs_export->exp_obd->obd_last_committed)
+ ptlrpc_schedule_difficult_reply (rs);
spin_unlock(&svc->srv_lock);
}
req->rq_sent = cfs_time_current_sec();
rc = ptl_send_buf (&rs->rs_md_h, rs->rs_repbuf, rs->rs_repdata_len,
- rs->rs_difficult ? LNET_ACK_REQ : LNET_NOACK_REQ,
+ (rs->rs_difficult && !rs->rs_no_ack) ?
+ LNET_ACK_REQ : LNET_NOACK_REQ,
&rs->rs_cb_id, conn, svc->srv_rep_portal,
req->rq_xid, req->rq_reply_off);
out:
void
ptlrpc_save_lock (struct ptlrpc_request *req,
- struct lustre_handle *lock, int mode)
+ struct lustre_handle *lock, int mode, int no_ack)
{
struct ptlrpc_reply_state *rs = req->rq_reply_state;
int idx;
rs->rs_locks[idx] = *lock;
rs->rs_modes[idx] = mode;
rs->rs_difficult = 1;
+ rs->rs_no_ack = !!no_ack;
}
void
ptlrpc_schedule_difficult_reply (struct ptlrpc_reply_state *rs)
{
struct ptlrpc_service *svc = rs->rs_service;
+ ENTRY;
#ifdef CONFIG_SMP
LASSERT (spin_is_locked (&svc->srv_lock));
LASSERT (rs->rs_difficult);
rs->rs_scheduled_ever = 1; /* flag any notification attempt */
- if (rs->rs_scheduled) /* being set up or already notified */
+ if (rs->rs_scheduled) { /* being set up or already notified */
+ EXIT;
return;
+ }
rs->rs_scheduled = 1;
list_del (&rs->rs_list);
list_add (&rs->rs_list, &svc->srv_reply_queue);
cfs_waitq_signal (&svc->srv_waitq);
+ EXIT;
}
void
{
struct list_head *tmp;
struct list_head *nxt;
+ ENTRY;
/* Find any replies that have been committed and get their service
* to attend to complete them. */
}
spin_unlock(&obd->obd_uncommitted_replies_lock);
+ EXIT;
}
static int
if (!rs->rs_on_net) {
/* Off the net */
svc->srv_n_difficult_replies--;
+ if (svc->srv_n_difficult_replies == 0 && svc->srv_is_stopping)
+ /* wake up threads that are being stopped by
+ ptlrpc_unregister_service/ptlrpc_stop_threads
+ and sleep waiting for srv_n_difficult_replies == 0 */
+ cfs_waitq_broadcast(&svc->srv_waitq);
spin_unlock(&svc->srv_lock);
class_export_put (exp);
struct ptlrpc_thread *thread)
{
struct l_wait_info lwi = { 0 };
+ ENTRY;
+ CDEBUG(D_RPCTRACE, "Stopping thread %p\n", thread);
spin_lock(&svc->srv_lock);
thread->t_flags = SVC_STOPPING;
spin_unlock(&svc->srv_lock);
spin_unlock(&svc->srv_lock);
OBD_FREE_PTR(thread);
+ EXIT;
}
void ptlrpc_stop_all_threads(struct ptlrpc_service *svc)
{
struct ptlrpc_thread *thread;
+ ENTRY;
spin_lock(&svc->srv_lock);
while (!list_empty(&svc->srv_threads)) {
}
spin_unlock(&svc->srv_lock);
+ EXIT;
}
int ptlrpc_start_threads(struct obd_device *dev, struct ptlrpc_service *svc)
struct l_wait_info lwi;
struct list_head *tmp;
struct ptlrpc_reply_state *rs, *t;
+ ENTRY;
+ service->srv_is_stopping = 1;
cfs_timer_disarm(&service->srv_at_timer);
ptlrpc_stop_all_threads(service);
cfs_timer_disarm(&service->srv_at_timer);
OBD_FREE_PTR(service);
- return 0;
+ RETURN(0);
}
/* Returns 0 if the service is healthy.
(long long)LCK_NL);
LASSERTF(LCK_GROUP == 64, " found %lld\n",
(long long)LCK_GROUP);
- LASSERTF(LCK_MAXMODE == 65, " found %lld\n",
+ LASSERTF(LCK_MAXMODE == 129, " found %lld\n",
(long long)LCK_MAXMODE);
- LASSERTF(LCK_MODE_NUM == 7, " found %lld\n",
+ LASSERTF(LCK_MODE_NUM == 8, " found %lld\n",
(long long)LCK_MODE_NUM);
CLASSERT(LDLM_PLAIN == 10);
CLASSERT(LDLM_EXTENT == 11);
}
run_test 20 "recovery time is not increasing"
+test_21() { # commit on sharing: renames issued through two mounts must survive MDS failover
+ local param_file=$TMP/$tfile-params
+
+ save_lustre_params $(facet_active_host $SINGLEMDS) "mdt.*.commit_on_sharing" > $param_file
+ do_facet $SINGLEMDS lctl set_param mdt.*.commit_on_sharing=1
+ touch $MOUNT1/$tfile-1 # create through the first mount
+ mv $MOUNT2/$tfile-1 $MOUNT2/$tfile-2 # rename through the second mount
+ mv $MOUNT1/$tfile-2 $MOUNT1/$tfile-3 # rename again through the first mount
+ replay_barrier_nosync $SINGLEMDS
+ umount $MOUNT2 # the second mount is gone while the MDS fails over
+
+ facet_failover $SINGLEMDS
+
+ # all renames are replayed, so the final name must exist on $MOUNT1
+ unlink $MOUNT1/$tfile-3 || return 2
+
+ zconf_mount `hostname` $MOUNT2 || error "mount $MOUNT2 fail"
+
+ do_facet $SINGLEMDS lctl set_param mdt.*.commit_on_sharing=0
+ rm -rf $MOUNT1/$tfile-*
+ restore_lustre_params < $param_file # put back the setting saved at the start
+ rm -f $param_file
+ return 0
+}
+run_test 21 "commit on sharing"
+
equals_msg `basename $0`: test complete, cleaning up
SLEEP=$((`date +%s` - $NOW))
[ $SLEEP -lt $TIMEOUT ] && sleep $SLEEP
$LCTL mark "local REPLAY BARRIER on ${!svc}"
}
+# Set a replay barrier on facet $1 without issuing a sync first: run the
+# lctl "readonly" and "notransno" commands on the facet's service device,
+# then record an lctl mark on the facet and locally.
+replay_barrier_nosync() {
+    # Only "facet" is declared here; the stray words "echo running=..."
+    # on this line used to create accidental local variables named
+    # "echo" and "running".
+    local facet=$1
+    local svc=${facet}_svc
+    echo Replay barrier on ${!svc}
+    do_facet $facet $LCTL --device %${!svc} readonly
+    do_facet $facet $LCTL --device %${!svc} notransno
+    do_facet $facet $LCTL mark "$facet REPLAY BARRIER on ${!svc}"
+    $LCTL mark "local REPLAY BARRIER on ${!svc}"
+}
+
mds_evict_client() {
UUID=`lctl get_param -n mdc.${mds1_svc}-mdc-*.uuid`
do_facet mds1 "lctl set_param -n mdt.${mds1_svc}.evict_client $UUID"
(long long)LCK_NL);
LASSERTF(LCK_GROUP == 64, " found %lld\n",
(long long)LCK_GROUP);
- LASSERTF(LCK_MAXMODE == 65, " found %lld\n",
+ LASSERTF(LCK_MAXMODE == 129, " found %lld\n",
(long long)LCK_MAXMODE);
- LASSERTF(LCK_MODE_NUM == 7, " found %lld\n",
+ LASSERTF(LCK_MODE_NUM == 8, " found %lld\n",
(long long)LCK_MODE_NUM);
CLASSERT(LDLM_PLAIN == 10);
CLASSERT(LDLM_EXTENT == 11);