+ RETURN(rc);
+}
+/* ldlm_flock_waitq entry: a waiting flock owner and the owner blocking it. */
+struct ldlm_sleep_flock {
+        __u64 lsf_pid;                  /* l_flock.pid of the waiter */
+        __u64 lsf_nid;                  /* l_flock.nid of the waiter */
+        __u64 lsf_blocking_pid;         /* pid of the owner it waits on */
+        __u64 lsf_blocking_nid;         /* nid of the owner it waits on */
+        struct list_head lsf_list;      /* linkage on ldlm_flock_waitq */
+};
+/* Service a flock deadlock-check request against ldlm_flock_waitq. */
+int
+ldlm_handle_flock_deadlock_check(struct ptlrpc_request *req)
+{
+        struct ldlm_request *dlm_req;
+        struct ldlm_sleep_flock *lsf;
+        struct list_head *pos;
+        __u64 pid, nid, blocking_pid, blocking_nid;
+        unsigned int flags;
+        int rc = 0;
+        ENTRY;
+        /* Lock-level results (-ESTALE, -EDEADLOCK) are stored in rq_status. */
+        req->rq_status = 0;
+
+        dlm_req = lustre_swab_reqbuf(req, 0, sizeof (*dlm_req),
+                                     lustre_swab_ldlm_request);
+        if (dlm_req == NULL) {
+                CERROR("bad request buffer for flock deadlock check\n");
+                RETURN(-EFAULT);
+        }
+        /* Work on copies; flags may gain LDLM_FL_DEADLOCK_DEL further down. */
+        flags = dlm_req->lock_flags;
+        pid = dlm_req->lock_desc.l_policy_data.l_flock.pid;
+        nid = dlm_req->lock_desc.l_policy_data.l_flock.nid;
+        blocking_pid = dlm_req->lock_desc.l_policy_data.l_flock.blocking_pid;
+        blocking_nid = dlm_req->lock_desc.l_policy_data.l_flock.blocking_nid;
+
+        CDEBUG(D_DLMTRACE, "flags: 0x%x req: pid: "LPU64" nid "LPU64" "
+               "blk: pid: "LPU64" nid: "LPU64"\n",
+               dlm_req->lock_flags, pid, nid, blocking_pid, blocking_nid);
+        /* GET_BLOCKING: reply with the blocker recorded on the named lock. */
+        if (flags & LDLM_FL_GET_BLOCKING) {
+                struct ldlm_lock *lock;
+                struct ldlm_reply *dlm_rep;
+                int size = sizeof(*dlm_rep);
+
+                lock = ldlm_handle2lock(&dlm_req->lock_handle1);
+                if (!lock) {
+                        CERROR("received deadlock check for unknown lock "
+                               "cookie "LPX64" from client %s id %s\n",
+                               dlm_req->lock_handle1.cookie,
+                               req->rq_export->exp_client_uuid.uuid,
+                               req->rq_peerstr);
+                        req->rq_status = -ESTALE;
+                        RETURN(0);
+                }
+                /* Copy the blocker under the lock; it is not needed after. */
+                lock_res_and_lock(lock);
+                blocking_pid = lock->l_policy_data.l_flock.blocking_pid;
+                blocking_nid = lock->l_policy_data.l_flock.blocking_nid;
+                unlock_res_and_lock(lock);
+                LDLM_LOCK_PUT(lock);  /* drop ldlm_handle2lock() reference */
+                rc = lustre_pack_reply(req, 1, &size, NULL);
+                if (rc) {
+                        CERROR("lustre_pack_reply failed: rc = %d\n", rc);
+                        req->rq_status = rc;
+                        RETURN(0);
+                }
+
+                dlm_rep = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*dlm_rep));
+                dlm_rep->lock_desc.l_policy_data.l_flock.blocking_pid =
+                        blocking_pid;
+                dlm_rep->lock_desc.l_policy_data.l_flock.blocking_nid =
+                        blocking_nid;
+        } else {
+                rc = lustre_pack_reply(req, 0, NULL, NULL);
+        }
+        /* DEADLOCK_CHK: follow the chain of blockers through the wait queue. */
+        if (flags & LDLM_FL_DEADLOCK_CHK) {
+                __u64 orig_blocking_pid = blocking_pid;
+                __u64 orig_blocking_nid = blocking_nid;
+ restart:
+                list_for_each(pos, &ldlm_flock_waitq) {
+                        lsf = list_entry(pos,struct ldlm_sleep_flock,lsf_list);
+
+                        /* We want to return a deadlock condition for the
+                         * last lock on the waitq that created the deadlock
+                         * situation. Posix verification suites expect this
+                         * behavior. We'll stop if we haven't found a deadlock
+                         * up to the point where the current process is queued
+                         * to let the last lock on the queue that's in the
+                         * deadlock loop detect the deadlock. In this case
+                         * just update the blocking info.*/
+                        if ((lsf->lsf_pid == pid) && (lsf->lsf_nid == nid)) {
+                                lsf->lsf_blocking_pid = blocking_pid;
+                                lsf->lsf_blocking_nid = blocking_nid;
+                                break;
+                        }
+                        /* Not the entry of the current blocker: keep going. */
+                        if ((lsf->lsf_pid != blocking_pid) ||
+                            (lsf->lsf_nid != blocking_nid))
+                                continue;
+                        /* Step to whoever blocks the current blocker. */
+                        blocking_pid = lsf->lsf_blocking_pid;
+                        blocking_nid = lsf->lsf_blocking_nid;
+                        /* The chain leads back to the requester: deadlock. */
+                        if (blocking_pid == pid && blocking_nid == nid){
+                                req->rq_status = -EDEADLOCK;
+                                flags |= LDLM_FL_DEADLOCK_DEL;
+                                break;
+                        }
+                        /* Rescan from the head for the next link. */
+                        goto restart;
+                }
+                /* NOTE(review): a cycle excluding (pid, nid) would spin. */
+                /* If we got all the way thru the list then we're not on it. */
+                if (pos == &ldlm_flock_waitq) {
+                        OBD_ALLOC(lsf, sizeof(*lsf));
+                        if (!lsf)
+                                RETURN(-ENOMEM); /* allocation failure */
+
+                        lsf->lsf_pid = pid;
+                        lsf->lsf_nid = nid;
+                        lsf->lsf_blocking_pid = orig_blocking_pid;
+                        lsf->lsf_blocking_nid = orig_blocking_nid;
+                        list_add_tail(&lsf->lsf_list, &ldlm_flock_waitq);
+                }
+        }
+        /* DEADLOCK_DEL, as requested or set above: drop the requester. */
+        if (flags & LDLM_FL_DEADLOCK_DEL) {
+                list_for_each_entry(lsf, &ldlm_flock_waitq, lsf_list) {
+                        if ((lsf->lsf_pid == pid) && (lsf->lsf_nid == nid)) {
+                                list_del_init(&lsf->lsf_list);
+                                OBD_FREE(lsf, sizeof(*lsf));
+                                break;
+                        }
+                }
+        }
+
+        RETURN(rc);
+}
+/* Send an LDLM_FLK_DEADLOCK_CHK request describing @lock via obd's import. */
+int
+ldlm_send_flock_deadlock_check(struct obd_device *obd, struct ldlm_lock *lock,
+                               unsigned int flags)
+{
+        struct obd_import *imp;
+        struct ldlm_request *body;
+        struct ldlm_reply *reply;
+        struct ptlrpc_request *req;
+        int rc, size = sizeof(*body);
+        ENTRY;
+        /* Returns -ENOMEM, the ptlrpc_queue_wait() error, -EPROTO or rq_status. */
+        CDEBUG(D_DLMTRACE, "obd: %p flags: 0x%x\n", obd, flags);
+
+        imp = obd->u.cli.cl_import;
+        req = ptlrpc_prep_req(imp, LUSTRE_DLM_VERSION, LDLM_FLK_DEADLOCK_CHK, 1,
+                              &size, NULL);
+        if (!req)
+                RETURN(-ENOMEM);
+        /* Request body: the flags, a descriptor of @lock, its remote handle. */
+        body = lustre_msg_buf(req->rq_reqmsg, 0, sizeof (*body));
+        body->lock_flags = flags;
+        ldlm_lock2desc(lock, &body->lock_desc);
+        memcpy(&body->lock_handle1, &lock->l_remote_handle,
+               sizeof(body->lock_handle1));
+        /* A reply buffer is expected only when blocking info is asked for. */
+        if (flags & LDLM_FL_GET_BLOCKING) {
+                size = sizeof(*reply);
+                req->rq_replen = lustre_msg_size(1, &size);
+        } else {
+                req->rq_replen = lustre_msg_size(0, NULL);
+        }
+
+        rc = ptlrpc_queue_wait(req);
+        if (rc != ELDLM_OK)
+                GOTO(out, rc);
+        /* GET_BLOCKING: copy the blocker pid/nid from the reply into @lock. */
+        if (flags & LDLM_FL_GET_BLOCKING) {
+                reply = lustre_swab_repbuf(req, 0, sizeof (*reply),
+                                           lustre_swab_ldlm_reply);
+                if (reply == NULL) {
+                        CERROR ("Can't unpack ldlm_reply\n");
+                        GOTO (out, rc = -EPROTO);
+                }
+
+                lock->l_policy_data.l_flock.blocking_pid =
+                        reply->lock_desc.l_policy_data.l_flock.blocking_pid;
+                lock->l_policy_data.l_flock.blocking_nid =
+                        reply->lock_desc.l_policy_data.l_flock.blocking_nid;
+
+                CDEBUG(D_DLMTRACE, "LDLM_FL_GET_BLOCKING: pid: "LPU64" "
+                       "nid: "LPU64" blk: pid: "LPU64" nid: "LPU64"\n",
+                       lock->l_policy_data.l_flock.pid,
+                       lock->l_policy_data.l_flock.nid,
+                       lock->l_policy_data.l_flock.blocking_pid,
+                       lock->l_policy_data.l_flock.blocking_nid);
+        }
+
+        rc = req->rq_status;
+ out:
+        ptlrpc_req_finished(req);   /* release the request on every path */
+        RETURN(rc);
+}
+
+int
+ldlm_flock_deadlock_check(struct obd_device *master_obd, struct obd_device *obd,
+ struct ldlm_lock *lock)
+{
+ unsigned int flags = 0;
+ int rc;
+ ENTRY;
+
+ if (obd == NULL) {
+ /* Delete this process from the sleeplock list. */
+ flags = LDLM_FL_DEADLOCK_DEL;
+ rc = ldlm_send_flock_deadlock_check(master_obd, lock, flags);
+ RETURN(rc);
+ }
+
+ flags = LDLM_FL_GET_BLOCKING;
+ if (obd == master_obd)
+ flags |= LDLM_FL_DEADLOCK_CHK;
+
+ rc = ldlm_send_flock_deadlock_check(obd, lock, flags);
+ CDEBUG(D_DLMTRACE, "1st check: rc: %d flags: 0x%x\n", rc, flags);
+ if (rc || (flags & LDLM_FL_DEADLOCK_CHK))
+ RETURN(rc);
+
+ CDEBUG(D_DLMTRACE, "about to send 2nd check: master: %p.\n",
+ master_obd);
+
+ flags = LDLM_FL_DEADLOCK_CHK;
+
+ rc = ldlm_send_flock_deadlock_check(master_obd, lock, flags);
+
+ CDEBUG(D_DLMTRACE, "2nd check: rc: %d flags: 0x%x\n", rc, flags);
+
+ RETURN(rc);