extern int (*mds_reint_p)(int offset, struct ptlrpc_request *req);
extern int (*mds_getattr_name_p)(int offset, struct ptlrpc_request *req);
-static int ldlm_handle_enqueue(struct ptlrpc_request *req)
+int ldlm_server_completion_ast(struct ldlm_lock *lock, int flags)
+{
+ struct ldlm_request *body;
+ struct ptlrpc_request *req;
+ struct ptlrpc_client *cl;
+ int rc = 0, size = sizeof(*body);
+ ENTRY;
+
+ if (lock == NULL) {
+ LBUG();
+ RETURN(-EINVAL);
+ }
+
+ cl = &lock->l_resource->lr_namespace->ns_rpc_client;
+ req = ptlrpc_prep_req(cl, lock->l_connection, LDLM_CALLBACK, 1,
+ &size, NULL);
+ if (!req)
+ RETURN(-ENOMEM);
+
+ body = lustre_msg_buf(req->rq_reqmsg, 0);
+ memcpy(&body->lock_handle1, &lock->l_remote_handle,
+ sizeof(body->lock_handle1));
+ body->lock_flags = flags;
+
+ LDLM_DEBUG(lock, "server preparing completion AST");
+ req->rq_replen = lustre_msg_size(0, NULL);
+
+ rc = ptlrpc_queue_wait(req);
+ rc = ptlrpc_check_status(req, rc);
+ ptlrpc_free_req(req);
+ RETURN(rc);
+}
+
+int ldlm_handle_enqueue(struct ptlrpc_request *req)
{
struct obd_device *obddev = req->rq_export->exp_obd;
struct ldlm_reply *dlm_rep;
flags = dlm_req->lock_flags;
err = ldlm_lock_enqueue(lock, cookie, cookielen, &flags,
- ldlm_server_ast, ldlm_server_ast);
+ ldlm_server_completion_ast, ldlm_server_ast);
if (err != ELDLM_OK)
GOTO(out, err);
return 0;
}
-static int ldlm_handle_convert(struct ptlrpc_request *req)
+int ldlm_handle_convert(struct ptlrpc_request *req)
{
struct ldlm_request *dlm_req;
struct ldlm_reply *dlm_rep;
RETURN(0);
}
-static int ldlm_handle_cancel(struct ptlrpc_request *req)
+int ldlm_handle_cancel(struct ptlrpc_request *req)
{
struct ldlm_request *dlm_req;
struct ldlm_lock *lock;
lock = ldlm_handle2lock(&dlm_req->lock_handle1);
if (!lock) {
+ LDLM_DEBUG_NOLOCK("server-side cancel handler stale lock (lock %p)",
+ (void *)(unsigned long) dlm_req->lock_handle1.addr);
req->rq_status = ESTALE;
} else {
LDLM_DEBUG(lock, "server-side cancel handler START");
ldlm_reprocess_all(lock->l_resource);
LDLM_DEBUG(lock, "server-side cancel handler END");
LDLM_LOCK_PUT(lock);
- } else
- LDLM_DEBUG_NOLOCK("server-side cancel handler END (lock %p)",
- lock);
+ }
RETURN(0);
}
}
LDLM_LOCK_PUT(lock);
} else {
- struct list_head rpc_list = LIST_HEAD_INIT(rpc_list);
+ struct list_head ast_list = LIST_HEAD_INIT(ast_list);
l_lock(&lock->l_resource->lr_namespace->ns_lock);
lock->l_req_mode = dlm_req->lock_desc.l_granted_mode;
/* If we receive the completion AST before the actual enqueue
* returned, then we might need to switch resources. */
+ ldlm_resource_unlink_lock(lock);
if (memcmp(dlm_req->lock_desc.l_resource.lr_name,
lock->l_resource->lr_name,
sizeof(__u64) * RES_NAME_SIZE) != 0) {
ldlm_lock_change_resource(lock, dlm_req->lock_desc.l_resource.lr_name);
LDLM_DEBUG(lock, "completion AST, new resource");
}
- lock->l_resource->lr_tmp = &rpc_list;
- ldlm_resource_unlink_lock(lock);
+ lock->l_resource->lr_tmp = &ast_list;
ldlm_grant_lock(lock);
- /* FIXME: we want any completion function, not just wake_up */
- wake_up(&lock->l_waitq);
lock->l_resource->lr_tmp = NULL;
l_unlock(&lock->l_resource->lr_namespace->ns_lock);
LDLM_LOCK_PUT(lock);
- ldlm_run_ast_work(&rpc_list);
+ ldlm_run_ast_work(&ast_list);
}
LDLM_DEBUG_NOLOCK("client %s callback handler END (lock %p)",
RETURN(0);
}
-static int ldlm_handle(struct ptlrpc_request *req)
+
+static int ldlm_callback_handler(struct ptlrpc_request *req)
{
int rc;
ENTRY;
req->rq_reqmsg->type);
GOTO(out, rc = -EINVAL);
}
-
- if (!req->rq_export && req->rq_reqmsg->opc == LDLM_ENQUEUE) {
- CERROR("No export handle for enqueue request.\n");
- GOTO(out, rc = -ENOTCONN);
- }
-
switch (req->rq_reqmsg->opc) {
- case LDLM_ENQUEUE:
- CDEBUG(D_INODE, "enqueue\n");
- OBD_FAIL_RETURN(OBD_FAIL_LDLM_ENQUEUE, 0);
- rc = ldlm_handle_enqueue(req);
- break;
-
- case LDLM_CONVERT:
- CDEBUG(D_INODE, "convert\n");
- OBD_FAIL_RETURN(OBD_FAIL_LDLM_CONVERT, 0);
- rc = ldlm_handle_convert(req);
- break;
-
- case LDLM_CANCEL:
- CDEBUG(D_INODE, "cancel\n");
- OBD_FAIL_RETURN(OBD_FAIL_LDLM_CANCEL, 0);
- rc = ldlm_handle_cancel(req);
- break;
-
case LDLM_CALLBACK:
CDEBUG(D_INODE, "callback\n");
OBD_FAIL_RETURN(OBD_FAIL_LDLM_CALLBACK, 0);
return 0;
}
+
static int ldlm_iocontrol(long cmd, struct lustre_handle *conn, int len,
void *karg, void *uarg)
{
MOD_INC_USE_COUNT;
ldlm->ldlm_service =
ptlrpc_init_svc(64 * 1024, LDLM_REQUEST_PORTAL,
- LDLM_REPLY_PORTAL, "self", ldlm_handle);
+ LDLM_REPLY_PORTAL, "self", ldlm_callback_handler);
if (!ldlm->ldlm_service) {
LBUG();
GOTO(out_dec, rc = -ENOMEM);
}
- if (mds_reint_p == NULL)
- mds_reint_p = inter_module_get_request("mds_reint", "mds");
- if (IS_ERR(mds_reint_p)) {
- CERROR("MDSINTENT locks require the MDS module.\n");
- GOTO(out_dec, rc = -EINVAL);
- }
- if (mds_getattr_name_p == NULL)
- mds_getattr_name_p = inter_module_get_request
- ("mds_getattr_name", "mds");
- if (IS_ERR(mds_getattr_name_p)) {
- CERROR("MDSINTENT locks require the MDS module.\n");
- GOTO(out_dec, rc = -EINVAL);
- }
-
for (i = 0; i < LDLM_NUM_THREADS; i++) {
- rc = ptlrpc_start_thread(obddev, ldlm->ldlm_service,
- "lustre_dlm");
+ char name[32];
+ sprintf(name, "lustre_dlm_%2d", i);
+ rc = ptlrpc_start_thread(obddev, ldlm->ldlm_service, name);
if (rc) {
CERROR("cannot start LDLM thread #%d: rc %d\n", i, rc);
LBUG();
ptlrpc_stop_all_threads(ldlm->ldlm_service);
ptlrpc_unregister_service(ldlm->ldlm_service);
out_dec:
- if (mds_reint_p != NULL)
- inter_module_put("mds_reint");
- if (mds_getattr_name_p != NULL)
- inter_module_put("mds_getattr_name");
MOD_DEC_USE_COUNT;
return rc;
}
ptlrpc_stop_all_threads(ldlm->ldlm_service);
ptlrpc_unregister_service(ldlm->ldlm_service);
- if (mds_reint_p != NULL)
- inter_module_put("mds_reint");
- if (mds_getattr_name_p != NULL)
- inter_module_put("mds_getattr_name");
-
MOD_DEC_USE_COUNT;
RETURN(0);
}
CERROR("couldn't free ldlm lock slab\n");
}
+EXPORT_SYMBOL(ldlm_completion_ast);
+EXPORT_SYMBOL(ldlm_handle_enqueue);
+EXPORT_SYMBOL(ldlm_handle_cancel);
+EXPORT_SYMBOL(ldlm_handle_convert);
+EXPORT_SYMBOL(ldlm_register_intent);
+EXPORT_SYMBOL(ldlm_unregister_intent);
EXPORT_SYMBOL(ldlm_lockname);
EXPORT_SYMBOL(ldlm_typename);
EXPORT_SYMBOL(ldlm_handle2lock);
EXPORT_SYMBOL(ldlm_lock_match);
EXPORT_SYMBOL(ldlm_lock_addref);
EXPORT_SYMBOL(ldlm_lock_decref);
+EXPORT_SYMBOL(ldlm_lock_change_resource);
EXPORT_SYMBOL(ldlm_cli_convert);
EXPORT_SYMBOL(ldlm_cli_enqueue);
EXPORT_SYMBOL(ldlm_cli_cancel);
EXPORT_SYMBOL(ldlm_match_or_enqueue);
+EXPORT_SYMBOL(ldlm_it2str);
EXPORT_SYMBOL(ldlm_test);
EXPORT_SYMBOL(ldlm_lock_dump);
EXPORT_SYMBOL(ldlm_namespace_new);