LDLM_LOCK_RELEASE(lock);
continue;
}
- export = class_export_get(lock->l_export);
+ export = class_export_lock_get(lock->l_export, lock);
spin_unlock_bh(&waiting_locks_spinlock);
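+ /* fail the export before releasing our lock reference:
+  * class_export_lock_put() still needs a valid lock for
+  * its reference tracking */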
+ do_dump++;
+ class_fail_export(export);
+ class_export_lock_put(export, lock);
+
/* release extra ref grabbed by ldlm_add_waiting_lock()
* or ldlm_failed_ast() */
LDLM_LOCK_RELEASE(lock);
-
- do_dump++;
- class_fail_export(export);
- class_export_put(export);
+
spin_lock_bh(&waiting_locks_spinlock);
}
spin_unlock_bh(&waiting_locks_spinlock);
cont = 0;
LDLM_LOCK_GET(lock);
+
spin_unlock_bh(&waiting_locks_spinlock);
LDLM_DEBUG(lock, "prolong the busy lock");
ldlm_refresh_waiting_lock(lock,
spin_lock_bh(&waiting_locks_spinlock);
if (!cont) {
- LDLM_LOCK_PUT(lock);
+ LDLM_LOCK_RELEASE(lock);
break;
}
- LDLM_LOCK_PUT(lock);
+ LDLM_LOCK_RELEASE(lock);
continue;
}
lock->l_resource->lr_namespace->ns_timeouts++;
static int ldlm_add_waiting_lock(struct ldlm_lock *lock)
{
int ret;
+ int timeout = ldlm_get_enq_timeout(lock);
LASSERT(!(lock->l_flags & LDLM_FL_CANCEL_ON_BLOCK));
return 0;
}
- ret = __ldlm_add_waiting_lock(lock, ldlm_get_enq_timeout(lock));
+ ret = __ldlm_add_waiting_lock(lock, timeout);
if (ret)
/* grab ref on the lock if it has been added to the
* waiting list */
LDLM_LOCK_GET(lock);
spin_unlock_bh(&waiting_locks_spinlock);
- LDLM_DEBUG(lock, "%sadding to wait list",
- ret == 0 ? "not re-" : "");
+ LDLM_DEBUG(lock, "%sadding to wait list (timeout: %d, AT: %s)",
+ ret == 0 ? "not re-" : "", timeout,
+ AT_OFF ? "off" : "on");
return ret;
}
if (lock->l_export == NULL) {
/* We don't have a "waiting locks list" on clients. */
- LDLM_DEBUG(lock, "client lock: no-op");
+ CDEBUG(D_DLMTRACE, "Client lock %p: no-op\n", lock);
return 0;
}
LASSERT(lock != NULL);
if (rc != 0) {
/* If client canceled the lock but the cancel has not
- * been recieved yet, we need to update lvbo to have the
+ * been received yet, we need to update lvbo to have the
* proper attributes cached. */
if (rc == -EINVAL && arg->type == LDLM_BL_CALLBACK)
- ldlm_res_lvbo_update(lock->l_resource, NULL,
- 0, 1);
+ ldlm_res_lvbo_update(lock->l_resource, NULL, 1);
rc = ldlm_handle_ast_error(lock, req, rc,
arg->type == LDLM_BL_CALLBACK
? "blocking" : "completion");
}
req->rq_send_state = LUSTRE_IMP_FULL;
- /* ptlrpc_prep_req already set timeout */
+ /* ptlrpc_request_alloc_pack already set timeout */
if (AT_OFF)
req->rq_timeout = ldlm_get_rq_timeout();
total_enqueue_wait = cfs_time_sub(cfs_time_current_sec(),
lock->l_last_activity);
- if (total_enqueue_wait > obd_timeout)
- /* non-fatal with AT - change to LDLM_DEBUG? */
- LDLM_WARN(lock, "enqueue wait took %lus from "CFS_TIME_T,
- total_enqueue_wait, lock->l_last_activity);
-
req = ptlrpc_request_alloc(lock->l_export->exp_imp_reverse,
&RQF_LDLM_CP_CALLBACK);
if (req == NULL)
/* Server-side enqueue wait time estimate, used in
__ldlm_add_waiting_lock to set future enqueue timers */
- at_add(&lock->l_resource->lr_namespace->ns_at_estimate,
- total_enqueue_wait);
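+ /* only waits that stayed within the enqueue timeout should
+  * train the AT estimate */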
+ if (total_enqueue_wait < ldlm_get_enq_timeout(lock))
+ at_add(&lock->l_resource->lr_namespace->ns_at_estimate,
+ total_enqueue_wait);
+ else
+ /* bz18618. Don't add the lock enqueue time spent waiting for a
+  * previous callback to fail. Locks waiting legitimately will get
+  * extended by ldlm_refresh_waiting_lock regardless of the estimate,
+  * so it is okay to underestimate here. */
+ LDLM_DEBUG(lock, "lock completed after %lus; estimate was %ds. "
+ "It is likely that a previous callback timed out.",
+ total_enqueue_wait,
+ at_get(&lock->l_resource->lr_namespace->ns_at_estimate));
ptlrpc_request_set_replen(req);
req->rq_send_state = LUSTRE_IMP_FULL;
- /* ptlrpc_prep_req already set timeout */
+ /* ptlrpc_request_pack already set timeout */
if (AT_OFF)
req->rq_timeout = ldlm_get_rq_timeout();
req->rq_send_state = LUSTRE_IMP_FULL;
- /* ptlrpc_prep_req already set timeout */
+ /* ptlrpc_request_alloc_pack already set timeout */
if (AT_OFF)
req->rq_timeout = ldlm_get_rq_timeout();
else if (rc != 0)
rc = ldlm_handle_ast_error(lock, req, rc, "glimpse");
else
- rc = ldlm_res_lvbo_update(res, req->rq_repmsg,
- REPLY_REC_OFF, 1);
+ rc = ldlm_res_lvbo_update(res, req, 1);
+
ptlrpc_req_finished(req);
if (rc == -ERESTART)
ldlm_reprocess_all(res);
if (unlikely(flags & LDLM_FL_REPLAY)) {
/* Find an existing lock in the per-export lock hash */
- lock = lustre_hash_lookup(req->rq_export->exp_lock_hash,
- (void *)&dlm_req->lock_handle[0]);
+ lock = cfs_hash_lookup(req->rq_export->exp_lock_hash,
+ (void *)&dlm_req->lock_handle[0]);
if (lock != NULL) {
DEBUG_REQ(D_DLMTRACE, req, "found existing lock cookie "
LPX64, lock->l_handle.h_cookie);
LDLM_ERROR(lock, "lock on destroyed export %p", req->rq_export);
GOTO(out, rc = -ENOTCONN);
}
- lock->l_export = class_export_get(req->rq_export);
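+ /* tie the export reference to this lock so leaked references
+  * can be traced back to their lock (LUSTRE_TRACKS_LOCK_EXP_REFS) */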
+ lock->l_export = class_export_lock_get(req->rq_export, lock);
if (lock->l_export->exp_lock_hash)
- lustre_hash_add(lock->l_export->exp_lock_hash,
- &lock->l_remote_handle,
- &lock->l_exp_hash);
+ cfs_hash_add(lock->l_export->exp_lock_hash,
+ &lock->l_remote_handle,
+ &lock->l_exp_hash);
existing_lock:
if (lock->l_granted_mode == lock->l_req_mode) {
/*
* Only cancel lock if it was granted, because it would
- * be destroyed immediatelly and would never be granted
+ * be destroyed immediately and would never be granted
* in the future, causing timeouts on client. Not
- * granted lock will be cancelled immediatelly after
+ * granted lock will be cancelled immediately after
* sending completion AST.
*/
if (dlm_rep->lock_flags & LDLM_FL_CANCEL_ON_BLOCK) {
if (res != NULL) {
ldlm_resource_getref(res);
LDLM_RESOURCE_ADDREF(res);
- ldlm_res_lvbo_update(res, NULL, 0, 1);
+ ldlm_res_lvbo_update(res, NULL, 1);
}
pres = res;
}
int do_ast;
ENTRY;
- LDLM_DEBUG(lock, "client blocking AST callback handler START");
+ LDLM_DEBUG(lock, "client blocking AST callback handler");
lock_res_and_lock(lock);
lock->l_flags |= LDLM_FL_CBPENDING;
unlock_res_and_lock(lock);
if (do_ast) {
- LDLM_DEBUG(lock, "already unused, calling "
- "callback (%p)", lock->l_blocking_ast);
+ CDEBUG(D_DLMTRACE, "Lock %p already unused, calling callback (%p)\n",
+ lock, lock->l_blocking_ast);
if (lock->l_blocking_ast != NULL)
lock->l_blocking_ast(lock, ld, lock->l_ast_data,
LDLM_CB_BLOCKING);
} else {
- LDLM_DEBUG(lock, "Lock still has references, will be"
- " cancelled later");
+ CDEBUG(D_DLMTRACE, "Lock %p is referenced, will be cancelled later\n",
+ lock);
}
LDLM_DEBUG(lock, "client blocking callback handler END");
LDLM_ERROR(lock, "completion AST did not contain "
"expected LVB!");
} else {
- void *lvb = req_capsule_client_swab_get(&req->rq_pill,
- &RMF_DLM_LVB,
- (void *)lock->l_lvb_swabber);
+ void *lvb = req_capsule_client_get(&req->rq_pill,
+ &RMF_DLM_LVB);
memcpy(lock->l_lvb_data, lvb, lock->l_lvb_len);
}
}
#endif
}
+/* Setinfo coming from a server (e.g. MDT) to a client (e.g. MDC) */
+static int ldlm_handle_setinfo(struct ptlrpc_request *req)
+{
+ struct obd_device *obd = req->rq_export->exp_obd;
+ char *key;
+ void *val;
+ int keylen, vallen;
+ int rc = -ENOSYS;
+ ENTRY;
+
+ DEBUG_REQ(D_ERROR, req, "%s: handle setinfo\n", obd->obd_name);
+
+ req_capsule_set(&req->rq_pill, &RQF_OBD_SET_INFO);
+
+ key = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
+ if (key == NULL) {
+ DEBUG_REQ(D_IOCTL, req, "no set_info key");
+ RETURN(-EFAULT);
+ }
+ keylen = req_capsule_get_size(&req->rq_pill, &RMF_SETINFO_KEY,
+ RCL_CLIENT);
+ val = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_VAL);
+ if (val == NULL) {
+ DEBUG_REQ(D_IOCTL, req, "no set_info val");
+ RETURN(-EFAULT);
+ }
+ vallen = req_capsule_get_size(&req->rq_pill, &RMF_SETINFO_VAL,
+ RCL_CLIENT);
+
+ /* We are responsible for swabbing contents of val */
+
+ if (KEY_IS(KEY_HSM_COPYTOOL_SEND))
+ /* Pass it on to mdc (the "export" in this case) */
+ rc = obd_set_info_async(req->rq_export,
+ sizeof(KEY_HSM_COPYTOOL_SEND),
+ KEY_HSM_COPYTOOL_SEND,
+ vallen, val, NULL);
+ else
+ DEBUG_REQ(D_WARNING, req, "ignoring unknown key %s", key);
+
+ RETURN(rc);
+}
+
/* TODO: handle requests in a similar way as MDT: see mdt_handle_common() */
static int ldlm_callback_handler(struct ptlrpc_request *req)
{
req_capsule_init(&req->rq_pill, req, RCL_SERVER);
if (req->rq_export == NULL) {
- struct ldlm_request *dlm_req;
-
- CDEBUG(D_RPCTRACE, "operation %d from %s with bad "
- "export cookie "LPX64"; this is "
- "normal if this node rebooted with a lock held\n",
- lustre_msg_get_opc(req->rq_reqmsg),
- libcfs_id2str(req->rq_peer),
- lustre_msg_get_handle(req->rq_reqmsg)->cookie);
-
- req_capsule_set(&req->rq_pill, &RQF_LDLM_CALLBACK);
- dlm_req = req_capsule_client_get(&req->rq_pill, &RMF_DLM_REQ);
- if (dlm_req != NULL)
- CDEBUG(D_RPCTRACE, "--> lock cookie: "LPX64"\n",
- dlm_req->lock_handle[0].cookie);
-
ldlm_callback_reply(req, -ENOTCONN);
RETURN(0);
}
if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_GL_CALLBACK))
RETURN(0);
break;
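+ /* servers may push set_info (e.g. HSM directives) to clients
+  * over the callback portal */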
+ case LDLM_SET_INFO:
+ rc = ldlm_handle_setinfo(req);
+ ldlm_callback_reply(req, rc);
+ RETURN(0);
case OBD_LOG_CANCEL: /* remove this eventually - for 1.4.0 compat */
CERROR("shouldn't be handling OBD_LOG_CANCEL on DLM thread\n");
req_capsule_set(&req->rq_pill, &RQF_LOG_CANCEL);
lock_res_and_lock(lock);
lock->l_flags |= (dlm_req->lock_flags & LDLM_AST_FLAGS);
if (lustre_msg_get_opc(req->rq_reqmsg) == LDLM_BL_CALLBACK) {
- /* If somebody cancels lock and cache is already droped,
+ /* If somebody cancels lock and cache is already dropped,
* or lock is failed before cp_ast received on client,
* we can tell the server we have no lock. Otherwise, we
* should send cancel after dropping the cache. */
libcfs_id2str(req->rq_peer),
lustre_msg_get_handle(req->rq_reqmsg)->cookie);
- req_capsule_set(&req->rq_pill, &RQF_LDLM_CALLBACK);
- dlm_req = req_capsule_client_get(&req->rq_pill, &RMF_DLM_REQ);
- if (dlm_req != NULL)
- ldlm_lock_dump_handle(D_ERROR,
- &dlm_req->lock_handle[0]);
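+ /* only LDLM_CANCEL requests carry a DLM request body worth
+  * dumping here */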
+ if (lustre_msg_get_opc(req->rq_reqmsg) == LDLM_CANCEL) {
+ req_capsule_set(&req->rq_pill, &RQF_LDLM_CALLBACK);
+ dlm_req = req_capsule_client_get(&req->rq_pill,
+ &RMF_DLM_REQ);
+ if (dlm_req != NULL)
+ ldlm_lock_dump_handle(D_ERROR,
+ &dlm_req->lock_handle[0]);
+ }
ldlm_callback_reply(req, -ENOTCONN);
RETURN(0);
}
lock->l_flags |= LDLM_FL_AST_SENT;
if (lock->l_export && lock->l_export->exp_lock_hash &&
!hlist_unhashed(&lock->l_exp_hash))
- lustre_hash_del(lock->l_export->exp_lock_hash,
- &lock->l_remote_handle, &lock->l_exp_hash);
+ cfs_hash_del(lock->l_export->exp_lock_hash,
+ &lock->l_remote_handle, &lock->l_exp_hash);
list_add_tail(&lock->l_rk_ast, rpc_list);
LDLM_LOCK_GET(lock);
ENTRY;
CFS_INIT_LIST_HEAD(&rpc_list);
- lustre_hash_for_each_empty(exp->exp_lock_hash,
- ldlm_revoke_lock_cb, &rpc_list);
+ cfs_hash_for_each_empty(exp->exp_lock_hash,
+ ldlm_revoke_lock_cb, &rpc_list);
ldlm_run_ast_work(&rpc_list, LDLM_WORK_REVOKE_AST);
EXIT;
* Export handle<->lock hash operations.
*/
static unsigned
-ldlm_export_lock_hash(lustre_hash_t *lh, void *key, unsigned mask)
+ldlm_export_lock_hash(cfs_hash_t *hs, void *key, unsigned mask)
{
- return lh_u64_hash(((struct lustre_handle *)key)->cookie, mask);
+ return cfs_hash_u64_hash(((struct lustre_handle *)key)->cookie, mask);
}
static void *
RETURN(lock);
}
-static lustre_hash_ops_t ldlm_export_lock_ops = {
- .lh_hash = ldlm_export_lock_hash,
- .lh_key = ldlm_export_lock_key,
- .lh_compare = ldlm_export_lock_compare,
- .lh_get = ldlm_export_lock_get,
- .lh_put = ldlm_export_lock_put
+static cfs_hash_ops_t ldlm_export_lock_ops = {
+ .hs_hash = ldlm_export_lock_hash,
+ .hs_key = ldlm_export_lock_key,
+ .hs_compare = ldlm_export_lock_compare,
+ .hs_get = ldlm_export_lock_get,
+ .hs_put = ldlm_export_lock_put
};
int ldlm_init_export(struct obd_export *exp)
ENTRY;
exp->exp_lock_hash =
- lustre_hash_init(obd_uuid2str(&exp->exp_client_uuid),
- 7, 16, &ldlm_export_lock_ops, LH_REHASH);
+ cfs_hash_create(obd_uuid2str(&exp->exp_client_uuid),
+ HASH_EXP_LOCK_CUR_BITS, HASH_EXP_LOCK_MAX_BITS,
+ &ldlm_export_lock_ops, CFS_HASH_REHASH);
if (!exp->exp_lock_hash)
RETURN(-ENOMEM);
void ldlm_destroy_export(struct obd_export *exp)
{
ENTRY;
- lustre_hash_exit(exp->exp_lock_hash);
+ cfs_hash_destroy(exp->exp_lock_hash);
exp->exp_lock_hash = NULL;
EXIT;
}
ldlm_state->ldlm_cb_service =
ptlrpc_init_svc(LDLM_NBUFS, LDLM_BUFSIZE, LDLM_MAXREQSIZE,
LDLM_MAXREPSIZE, LDLM_CB_REQUEST_PORTAL,
- LDLM_CB_REPLY_PORTAL, 1800,
+ LDLM_CB_REPLY_PORTAL, 2,
ldlm_callback_handler, "ldlm_cbd",
ldlm_svc_proc_dir, NULL,
ldlm_min_threads, ldlm_max_threads,
ldlm_state->ldlm_cancel_service =
ptlrpc_init_svc(LDLM_NBUFS, LDLM_BUFSIZE, LDLM_MAXREQSIZE,
LDLM_MAXREPSIZE, LDLM_CANCEL_REQUEST_PORTAL,
- LDLM_CANCEL_REPLY_PORTAL, 6000,
+ LDLM_CANCEL_REPLY_PORTAL, 6,
ldlm_cancel_handler, "ldlm_canceld",
ldlm_svc_proc_dir, NULL,
ldlm_min_threads, ldlm_max_threads,
return -ENOMEM;
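+ /* ldlm_lock_put() frees locks via RCU (see ldlm_exit() below),
+  * so create the slab with SLAB_DESTROY_BY_RCU */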
ldlm_lock_slab = cfs_mem_cache_create("ldlm_locks",
- sizeof(struct ldlm_lock), 0,
- SLAB_HWCACHE_ALIGN);
+ sizeof(struct ldlm_lock), 0,
+ SLAB_HWCACHE_ALIGN | SLAB_DESTROY_BY_RCU);
if (ldlm_lock_slab == NULL) {
cfs_mem_cache_destroy(ldlm_resource_slab);
return -ENOMEM;
cfs_mem_cache_destroy(ldlm_lock_slab);
return -ENOMEM;
}
-
+#if LUSTRE_TRACKS_LOCK_EXP_REFS
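+ /* let the export teardown path dump any locks still holding
+  * export references */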
+ class_export_dump_hook = ldlm_dump_export_locks;
+#endif
return 0;
}
CERROR("ldlm_refcount is %d in ldlm_exit!\n", ldlm_refcount);
rc = cfs_mem_cache_destroy(ldlm_resource_slab);
LASSERTF(rc == 0, "couldn't free ldlm resource slab\n");
+#ifdef __KERNEL__
+ /* ldlm_lock_put() uses RCU to call ldlm_lock_free(), so call
+  * synchronize_rcu() to wait for a grace period to elapse and give
+  * ldlm_lock_free() a chance to run. */
+ synchronize_rcu();
+#endif
rc = cfs_mem_cache_destroy(ldlm_lock_slab);
LASSERTF(rc == 0, "couldn't free ldlm lock slab\n");
rc = cfs_mem_cache_destroy(ldlm_interval_slab);
EXPORT_SYMBOL(ldlm_it2str);
EXPORT_SYMBOL(ldlm_lock_dump);
EXPORT_SYMBOL(ldlm_lock_dump_handle);
-EXPORT_SYMBOL(ldlm_cancel_locks_for_export);
EXPORT_SYMBOL(ldlm_reprocess_all_ns);
EXPORT_SYMBOL(ldlm_lock_allow_match_locked);
EXPORT_SYMBOL(ldlm_lock_allow_match);
EXPORT_SYMBOL(client_obd_cleanup);
EXPORT_SYMBOL(client_connect_import);
EXPORT_SYMBOL(client_disconnect_export);
-EXPORT_SYMBOL(target_start_recovery_thread);
+EXPORT_SYMBOL(server_disconnect_export);
EXPORT_SYMBOL(target_stop_recovery_thread);
EXPORT_SYMBOL(target_handle_connect);
EXPORT_SYMBOL(target_cleanup_recovery);