l_pending_chain);
if (l->l_callback_timeout > jiffies)
break;
+ CERROR("lock timer expired, lock %p\n", l);
LDLM_DEBUG(l, "timer expired, recovering exp %p on conn %p",
l->l_export, l->l_export->exp_connection);
recovd_conn_fail(l->l_export->exp_connection);
memcpy(&body->lock_desc, desc, sizeof(*desc));
LDLM_DEBUG(lock, "server preparing blocking AST");
- req->rq_replen = 0; /* no reply needed */
+ req->rq_replen = lustre_msg_size(0, NULL);
ldlm_add_waiting_lock(lock);
l_unlock(&lock->l_resource->lr_namespace->ns_lock);
- (void)ptl_send_rpc(req);
+ req->rq_level = LUSTRE_CONN_RECOVD;
+ rc = ptlrpc_queue_wait(req);
+ if (rc == -ETIMEDOUT || rc == -EINTR) {
+ ldlm_expired_completion_wait(lock);
+ } else if (rc) {
+ CERROR("client returned %d from blocking AST for lock %p\n",
+ req->rq_status, lock);
+ LDLM_DEBUG(lock, "client returned error %d from blocking AST",
+ req->rq_status);
+ ldlm_lock_cancel(lock);
+ /* Server-side AST functions are called from ldlm_reprocess_all,
+ * which needs to be told to please restart its reprocessing. */
+ rc = -ERESTART;
+ }
- /* not waiting for reply */
ptlrpc_req_finished(req);
RETURN(rc);
ldlm_lock2desc(lock, &body->lock_desc);
LDLM_DEBUG(lock, "server preparing completion AST");
- req->rq_replen = 0; /* no reply needed */
-
- (void)ptl_send_rpc(req);
-
- /* not waiting for reply */
+ req->rq_replen = lustre_msg_size(0, NULL);
+
+ req->rq_level = LUSTRE_CONN_RECOVD;
+ rc = ptlrpc_queue_wait(req);
+ if (rc == -ETIMEDOUT || rc == -EINTR) {
+ ldlm_expired_completion_wait(lock);
+ } else if (rc) {
+ CERROR("client returned %d from completion AST for lock %p\n",
+ req->rq_status, lock);
+ LDLM_DEBUG(lock, "client returned error %d from completion AST",
+ req->rq_status);
+ ldlm_lock_cancel(lock);
+ /* Server-side AST functions are called from ldlm_reprocess_all,
+ * which needs to be told to please restart its reprocessing. */
+ rc = -ERESTART;
+ }
ptlrpc_req_finished(req);
RETURN(rc);
&lock->l_export->exp_ldlm_data.led_held_locks);
l_unlock(&lock->l_resource->lr_namespace->ns_lock);
- err = ldlm_lock_enqueue(lock, cookie, cookielen, &flags,
- ldlm_server_completion_ast,
+ err = ldlm_lock_enqueue(obddev->obd_namespace, lock, cookie, cookielen,
+ &flags, ldlm_server_completion_ast,
ldlm_server_blocking_ast);
if (err != ELDLM_OK)
GOTO(out, err);
RETURN(0);
}
-static int ldlm_handle_bl_callback(struct ptlrpc_request *req)
+struct ldlm_lock *ldlm_handle2lock_ns(struct ldlm_namespace *ns,
+ struct lustre_handle *handle);
+
+static int ldlm_handle_bl_callback(struct ptlrpc_request *req,
+ struct ldlm_namespace *ns)
{
struct ldlm_request *dlm_req;
struct ldlm_lock *lock;
dlm_req = lustre_msg_buf(req->rq_reqmsg, 0);
- lock = ldlm_handle2lock(&dlm_req->lock_handle1);
+ lock = ldlm_handle2lock_ns(ns, &dlm_req->lock_handle1);
if (!lock) {
CERROR("blocking callback on lock "LPX64" - lock disappeared\n",
- dlm_req->lock_handle1.addr);
- RETURN(0);
+ dlm_req->lock_handle1.cookie);
+ RETURN(-EINVAL);
}
LDLM_DEBUG(lock, "client blocking AST callback handler START");
RETURN(0);
}
-static int ldlm_handle_cp_callback(struct ptlrpc_request *req)
+static int ldlm_handle_cp_callback(struct ptlrpc_request *req,
+ struct ldlm_namespace *ns)
{
struct list_head ast_list = LIST_HEAD_INIT(ast_list);
struct ldlm_request *dlm_req;
dlm_req = lustre_msg_buf(req->rq_reqmsg, 0);
- lock = ldlm_handle2lock(&dlm_req->lock_handle1);
+ lock = ldlm_handle2lock_ns(ns, &dlm_req->lock_handle1);
if (!lock) {
CERROR("completion callback on lock "LPX64" - lock "
- "disappeared\n", dlm_req->lock_handle1.addr);
- RETURN(0);
+ "disappeared\n", dlm_req->lock_handle1.cookie);
+ RETURN(-EINVAL);
}
LDLM_DEBUG(lock, "client completion callback handler START");
- l_lock(&lock->l_resource->lr_namespace->ns_lock);
+ l_lock(&ns->ns_lock);
/* If we receive the completion AST before the actual enqueue returned,
* then we might need to switch lock modes, resources, or extents. */
if (memcmp(dlm_req->lock_desc.l_resource.lr_name,
lock->l_resource->lr_name,
sizeof(__u64) * RES_NAME_SIZE) != 0) {
- ldlm_lock_change_resource(lock,
+ ldlm_lock_change_resource(ns, lock,
dlm_req->lock_desc.l_resource.lr_name);
LDLM_DEBUG(lock, "completion AST, new resource");
}
lock->l_resource->lr_tmp = &ast_list;
ldlm_grant_lock(lock);
lock->l_resource->lr_tmp = NULL;
- l_unlock(&lock->l_resource->lr_namespace->ns_lock);
+ l_unlock(&ns->ns_lock);
LDLM_DEBUG(lock, "callback handler finished, about to run_ast_work");
LDLM_LOCK_PUT(lock);
static int ldlm_callback_handler(struct ptlrpc_request *req)
{
+ struct ldlm_namespace *ns;
int rc;
ENTRY;
rc = lustre_unpack_msg(req->rq_reqmsg, req->rq_reqlen);
if (rc) {
- CERROR("lustre_ldlm: Invalid request: %d\n", rc);
+ CERROR("Invalid request: %d\n", rc);
RETURN(rc);
}
dlm_req = lustre_msg_buf(req->rq_reqmsg, 0);
CERROR("--> lock addr: "LPX64", cookie: "LPX64"\n",
dlm_req->lock_handle1.addr,dlm_req->lock_handle1.cookie);
- CERROR("--> ignoring this error as a temporary workaround! "
- "beware!\n");
- //RETURN(-ENOTCONN);
+ RETURN(-ENOTCONN);
}
+ LASSERT(req->rq_export != NULL);
+ LASSERT(req->rq_export->exp_obd != NULL);
+ ns = req->rq_export->exp_obd->obd_namespace;
+ LASSERT(ns != NULL);
+
switch (req->rq_reqmsg->opc) {
case LDLM_BL_CALLBACK:
CDEBUG(D_INODE, "blocking ast\n");
OBD_FAIL_RETURN(OBD_FAIL_LDLM_BL_CALLBACK, 0);
- rc = ldlm_handle_bl_callback(req);
- RETURN(rc);
+ rc = ldlm_handle_bl_callback(req, ns);
+ break;
case LDLM_CP_CALLBACK:
CDEBUG(D_INODE, "completion ast\n");
OBD_FAIL_RETURN(OBD_FAIL_LDLM_CP_CALLBACK, 0);
- rc = ldlm_handle_cp_callback(req);
- RETURN(rc);
-
+ rc = ldlm_handle_cp_callback(req, ns);
+ break;
default:
CERROR("invalid opcode %d\n", req->rq_reqmsg->opc);
RETURN(-EINVAL);
}
+ req->rq_status = rc;
+ if (rc) {
+ ptlrpc_error(req->rq_svc, req);
+ } else {
+ rc = lustre_pack_msg(0, NULL, NULL, &req->rq_replen,
+ &req->rq_repmsg);
+ if (rc)
+ RETURN(rc);
+ ptlrpc_reply(req->rq_svc, req);
+ }
+
RETURN(0);
}
-
static int ldlm_cancel_handler(struct ptlrpc_request *req)
{
int rc;
}
if (req->rq_export == NULL) {
+ struct ldlm_request *dlm_req;
CERROR("operation %d with bad export (ptl req %d/rep %d)\n",
req->rq_reqmsg->opc, req->rq_request_portal,
req->rq_reply_portal);
CERROR("--> export addr: "LPX64", cookie: "LPX64"\n",
req->rq_reqmsg->addr, req->rq_reqmsg->cookie);
+ dlm_req = lustre_msg_buf(req->rq_reqmsg, 0);
+ ldlm_lock_dump_handle(D_ERROR, &dlm_req->lock_handle1);
CERROR("--> ignoring this error as a temporary workaround! "
"beware!\n");
//RETURN(-ENOTCONN);
RETURN(0);
}
-
static int ldlm_iocontrol(unsigned int cmd, struct lustre_handle *conn, int len,
void *karg, void *uarg)
{
if (_IOC_TYPE(cmd) != IOC_LDLM_TYPE || _IOC_NR(cmd) < IOC_LDLM_MIN_NR ||
_IOC_NR(cmd) > IOC_LDLM_MAX_NR) {
- CDEBUG(D_IOCTL, "invalid ioctl (type %ld, nr %ld, size %ld)\n",
+ CDEBUG(D_IOCTL, "invalid ioctl (type %d, nr %d, size %d)\n",
_IOC_TYPE(cmd), _IOC_NR(cmd), _IOC_SIZE(cmd));
RETURN(-EINVAL);
}
if (ldlm_already_setup)
RETURN(-EALREADY);
- MOD_INC_USE_COUNT;
-
rc = ldlm_proc_setup(obddev);
if (rc != 0)
- GOTO(out_dec, rc);
+ RETURN(rc);
ldlm->ldlm_cb_service =
ptlrpc_init_svc(LDLM_NEVENTS, LDLM_NBUFS, LDLM_BUFSIZE,
out_proc:
ldlm_proc_cleanup(obddev);
- out_dec:
- MOD_DEC_USE_COUNT;
return rc;
}
ldlm_proc_cleanup(obddev);
ldlm_already_setup = 0;
- MOD_DEC_USE_COUNT;
RETURN(0);
}
}
struct obd_ops ldlm_obd_ops = {
+ o_owner: THIS_MODULE,
o_iocontrol: ldlm_iocontrol,
o_setup: ldlm_setup,
o_cleanup: ldlm_cleanup,
EXPORT_SYMBOL(ldlm_cancel_locks_for_export);
EXPORT_SYMBOL(ldlm_replay_locks);
EXPORT_SYMBOL(ldlm_resource_foreach);
+EXPORT_SYMBOL(ldlm_reprocess_all_ns);
EXPORT_SYMBOL(ldlm_namespace_foreach);
+EXPORT_SYMBOL(ldlm_namespace_foreach_res);
EXPORT_SYMBOL(l_lock);
EXPORT_SYMBOL(l_unlock);