* list. */
#define LDLM_FL_KMS_IGNORE 0x200000
+/* Do not put the lock into the LRU list, so that it is not cancelled due
+ * to aging.  Used by MGC locks: they are cancelled only at unmount time or
+ * by a blocking callback. */
+#define LDLM_FL_NO_LRU 0x400000
+
/* Immediatelly cancel such locks when they block some other locks. Send
* cancel notification to original lock holder, but expect no reply. This is
* for clients (like liblustre) that cannot be expected to reliably response
lock_res_and_lock(lock);
if (((lock->l_req_mode == lock->l_granted_mode) &&
!(lock->l_flags & LDLM_FL_CP_REQD)) ||
- (lock->l_flags & LDLM_FL_FAILED))
+ (lock->l_flags & (LDLM_FL_FAILED | LDLM_FL_CANCEL)))
ret = 1;
unlock_res_and_lock(lock);
ldlm_handle_bl_callback(ns, NULL, lock);
} else if (ns_is_client(ns) &&
!lock->l_readers && !lock->l_writers &&
+ !(lock->l_flags & LDLM_FL_NO_LRU) &&
!(lock->l_flags & LDLM_FL_BL_AST)) {
+
+ LDLM_DEBUG(lock, "add lock into lru list");
+
/* If this is a client-side namespace and this was the last
* reference, put it on the LRU. */
ldlm_lock_add_to_lru(lock);
!ns_connect_lru_resize(ns))
ldlm_cancel_lru(ns, 0, LDLM_ASYNC, 0);
} else {
+ LDLM_DEBUG(lock, "do not add lock into lru list");
unlock_res_and_lock(lock);
}
lock->l_conn_export = exp;
lock->l_export = NULL;
lock->l_blocking_ast = einfo->ei_cb_bl;
+ lock->l_flags |= (*flags & LDLM_FL_NO_LRU);
/* Dump lock data into the request buffer */
body = req_capsule_client_get(&req->rq_pill, &RMF_DLM_REQ);
int mgc_process_log(struct obd_device *mgc, struct config_llog_data *cld)
{
struct lustre_handle lockh = { 0 };
- int rc = 0, rcl, flags = 0;
+ int rc = 0, rcl, flags = LDLM_FL_NO_LRU;
ENTRY;
LASSERT(cld);
RETURN(0);
}
-/* similar to filter_prepare_destroy */
-int mgs_get_lock(struct obd_device *obd, struct ldlm_res_id *res,
-                 struct lustre_handle *lockh)
+/* Completion AST for the MGS config (CONFIG_T_CONFIG) revocation lock.
+ *
+ * Once the lock is granted (none of the LDLM_FL_BLOCK_* flags set) it has
+ * served its purpose -- enqueuing it forced revocation of the config locks
+ * cached by clients -- so it is dropped immediately.  FSDB_REVOKING_LOCK is
+ * cleared first so that a new revocation may be started.
+ * NOTE(review): assumes the enqueuer stored the fs_db in l_ast_data --
+ * confirm against mgs_revoke_lock(). */
+static int mgs_completion_ast_config(struct ldlm_lock *lock, int flags,
+                                     void *cbdata)
{
-        int rc, flags = 0;
        ENTRY;
-        rc = ldlm_cli_enqueue_local(obd->obd_namespace, res,
-                                    LDLM_PLAIN, NULL, LCK_EX,
-                                    &flags, ldlm_blocking_ast,
-                                    ldlm_completion_ast, NULL,
-                                    NULL, 0, NULL, lockh);
-        if (rc)
-                CERROR("can't take cfg lock for "LPX64"/"LPX64"(%d)\n",
-                       le64_to_cpu(res->name[0]), le64_to_cpu(res->name[1]),
-                       rc);
+        if (!(flags & (LDLM_FL_BLOCK_WAIT | LDLM_FL_BLOCK_GRANTED |
+                       LDLM_FL_BLOCK_CONV))) {
+                struct fs_db *fsdb = (struct fs_db *)lock->l_ast_data;
+                struct lustre_handle lockh;
-        RETURN(rc);
+                /* clear the bit before lock put */
+                cfs_clear_bit(FSDB_REVOKING_LOCK, &fsdb->fsdb_flags);
+
+                ldlm_lock2handle(lock, &lockh);
+                ldlm_lock_decref_and_cancel(&lockh, LCK_EX);
+        }
+
+        RETURN(ldlm_completion_ast(lock, flags, cbdata));
}
-int mgs_put_lock(struct lustre_handle *lockh)
+/* Completion AST for the imperative-recovery (CONFIG_T_RECOVER) lock.
+ * Mirrors mgs_completion_ast_config(), but additionally records the
+ * notification timing statistics via mgs_ir_notify_complete() before
+ * dropping the lock.
+ * NOTE(review): assumes l_ast_data holds the fs_db -- confirm against
+ * mgs_revoke_lock(). */
+static int mgs_completion_ast_ir(struct ldlm_lock *lock, int flags,
+                                 void *cbdata)
{
        ENTRY;
-        ldlm_lock_decref_and_cancel(lockh, LCK_EX);
-        RETURN(0);
+
+        if (!(flags & (LDLM_FL_BLOCK_WAIT | LDLM_FL_BLOCK_GRANTED |
+                       LDLM_FL_BLOCK_CONV))) {
+                struct fs_db *fsdb = (struct fs_db *)lock->l_ast_data;
+                struct lustre_handle lockh;
+
+                mgs_ir_notify_complete(fsdb);
+
+                ldlm_lock2handle(lock, &lockh);
+                ldlm_lock_decref_and_cancel(&lockh, LCK_EX);
+        }
+
+        RETURN(ldlm_completion_ast(lock, flags, cbdata));
}
-void mgs_revoke_lock(struct obd_device *obd, struct fs_db *fsdb)
+/* Revoke the client-held locks of the given @type (CONFIG_T_CONFIG or
+ * CONFIG_T_RECOVER) for @fsdb by enqueuing the corresponding EX resource
+ * lock.  The lock is released in its completion AST once granted; for
+ * CONFIG_T_CONFIG only one revocation may be in flight at a time
+ * (FSDB_REVOKING_LOCK). */
+void mgs_revoke_lock(struct obd_device *obd, struct fs_db *fsdb, int type)
 {
-        struct lustre_handle lockh;
-        struct ldlm_res_id res_id;
-        int lockrc;
-        int bit;
-        int rc;
+        ldlm_completion_callback cp = NULL;
+        struct lustre_handle lockh = { 0 };
+        struct ldlm_res_id res_id;
+        int flags = LDLM_FL_ATOMIC_CB;
+        int rc;
+        ENTRY;
         LASSERT(fsdb->fsdb_name[0] != '\0');
-        rc = mgc_fsname2resid(fsdb->fsdb_name, &res_id, CONFIG_T_CONFIG);
+        rc = mgc_fsname2resid(fsdb->fsdb_name, &res_id, type);
         LASSERT(rc == 0);
-        bit = FSDB_REVOKING_LOCK;
-        if (!rc && cfs_test_and_set_bit(bit, &fsdb->fsdb_flags) == 0) {
-                lockrc = mgs_get_lock(obd, &res_id, &lockh);
-                /* clear the bit before lock put */
-                cfs_clear_bit(bit, &fsdb->fsdb_flags);
+        switch (type) {
+        case CONFIG_T_CONFIG:
+                cp = mgs_completion_ast_config;
+                /* the bit is cleared again in the completion AST */
+                if (cfs_test_and_set_bit(FSDB_REVOKING_LOCK, &fsdb->fsdb_flags))
+                        rc = -EALREADY;
+                break;
+        case CONFIG_T_RECOVER:
+                cp = mgs_completion_ast_ir;
+                break;
+        default:
+                /* cp stays NULL; caught by the LASSERT below */
+                break;
+        }
 
-                if (lockrc != ELDLM_OK)
-                        CERROR("lock error %d for fs %s\n",
-                               lockrc, fsdb->fsdb_name);
-                else
-                        mgs_put_lock(&lockh);
+        if (!rc) {
+                LASSERT(cp != NULL);
+                rc = ldlm_cli_enqueue_local(obd->obd_namespace, &res_id,
+                                            LDLM_PLAIN, NULL, LCK_EX, &flags,
+                                            ldlm_blocking_ast, cp, NULL,
+                                            fsdb, 0, NULL, &lockh);
+                if (rc != ELDLM_OK) {
+                        CERROR("can't take cfg lock for "LPX64"/"LPX64"(%d)\n",
+                               le64_to_cpu(res_id.name[0]),
+                               le64_to_cpu(res_id.name[1]), rc);
+
+                        if (type == CONFIG_T_CONFIG)
+                                cfs_clear_bit(FSDB_REVOKING_LOCK,
+                                              &fsdb->fsdb_flags);
+                }
+                /* lock has been cancelled in completion_ast. */
         }
+
+        RETURN_EXIT;
 }
/* rc=0 means ok
}
out:
- mgs_revoke_lock(obd, fsdb);
+ mgs_revoke_lock(obd, fsdb, CONFIG_T_CONFIG);
out_nolock:
CDEBUG(D_MGS, "replying with %s, index=%d, rc=%d\n", mti->mti_svname,
struct obd_device *fsdb_obd;
cfs_waitq_t fsdb_notify_waitq;
cfs_completion_t fsdb_notify_comp;
+ cfs_time_t fsdb_notify_start;
cfs_atomic_t fsdb_notify_phase;
volatile int fsdb_notify_async:1,
fsdb_notify_stop:1;
char *poolname, char *fsname, char *ostname);
/* mgs_handler.c */
-void mgs_revoke_lock(struct obd_device *obd, struct fs_db *fsdb);
int mgs_get_lock(struct obd_device *obd, struct ldlm_res_id *res,
struct lustre_handle *lockh);
int mgs_put_lock(struct lustre_handle *lockh);
+void mgs_revoke_lock(struct obd_device *obd, struct fs_db *fsdb, int type);
/* mgs_nids.c */
int mgs_ir_update(struct obd_device *obd, struct mgs_target_info *mti);
int mgs_ir_init_fs(struct obd_device *obd, struct fs_db *fsdb);
void mgs_ir_fini_fs(struct obd_device *obd, struct fs_db *fsdb);
+void mgs_ir_notify_complete(struct fs_db *fsdb);
int mgs_get_ir_logs(struct ptlrpc_request *req);
int lprocfs_wr_ir_state(struct file *file, const char *buffer,
unsigned long count, void *data);
* so we don't really need to hold the lock while we're
* writing (above).
*/
- mgs_revoke_lock(obd, fsdb);
+ mgs_revoke_lock(obd, fsdb, CONFIG_T_CONFIG);
out:
OBD_FREE_PTR(mti);
RETURN(rc);
cfs_up(&fsdb->fsdb_sem);
/* request for update */
- mgs_revoke_lock(obd, fsdb);
+ mgs_revoke_lock(obd, fsdb, CONFIG_T_CONFIG);
EXIT;
out:
}
/* --------- Imperative Recovery relies on nidtbl stuff ------- */
+/* Finish one imperative-recovery notification round: reset the notify
+ * phase so mgs_ir_notify() can sleep again, and account how long the
+ * revocation took (count, running total, and maximum; logged in
+ * seconds.microseconds).  Called from mgs_completion_ast_ir(). */
+void mgs_ir_notify_complete(struct fs_db *fsdb)
+{
+        struct timeval tv;
+        cfs_duration_t delta;
+
+        cfs_atomic_set(&fsdb->fsdb_notify_phase, 0);
+
+        /* do statistic */
+        fsdb->fsdb_notify_count++;
+        delta = cfs_time_sub(cfs_time_current(), fsdb->fsdb_notify_start);
+        fsdb->fsdb_notify_total += delta;
+        if (delta > fsdb->fsdb_notify_max)
+                fsdb->fsdb_notify_max = delta;
+
+        cfs_duration_usec(delta, &tv);
+        CDEBUG(D_MGS, "Revoke recover lock of %s completed after %ld.%06lds\n",
+               fsdb->fsdb_name, tv.tv_sec, tv.tv_usec);
+}
+
static int mgs_ir_notify(void *arg)
{
struct fs_db *fsdb = arg;
cfs_complete(&fsdb->fsdb_notify_comp);
+ set_user_nice(current, -2);
+
mgc_fsname2resid(fsdb->fsdb_name, &resid, CONFIG_T_RECOVER);
while (1) {
struct l_wait_info lwi = { 0 };
- struct lustre_handle lockh;
- cfs_time_t curtime;
- int lockrc;
- int delta;
l_wait_event(fsdb->fsdb_notify_waitq,
fsdb->fsdb_notify_stop ||
CDEBUG(D_MGS, "%s woken up, phase is %d\n",
name, cfs_atomic_read(&fsdb->fsdb_notify_phase));
- curtime = cfs_time_current();
- lockrc = mgs_get_lock(fsdb->fsdb_obd, &resid, &lockh);
- if (lockrc == ELDLM_OK) {
- cfs_atomic_set(&fsdb->fsdb_notify_phase, 0);
- mgs_put_lock(&lockh);
-
- /* do statistic */
- fsdb->fsdb_notify_count++;
- delta = cfs_time_current() - curtime;
- fsdb->fsdb_notify_total += delta;
- if (delta > fsdb->fsdb_notify_max)
- fsdb->fsdb_notify_max = delta;
- CDEBUG(D_MGS, "Revoke recover lock of %s %dT\n",
- fsdb->fsdb_name, delta);
- } else {
- CERROR("Fatal error %d for fs %s\n",
- lockrc, fsdb->fsdb_name);
- }
+ fsdb->fsdb_notify_start = cfs_time_current();
+ mgs_revoke_lock(fsdb->fsdb_obd, fsdb, CONFIG_T_RECOVER);
}
cfs_complete(&fsdb->fsdb_notify_comp);