+/**
+ * Returns 1 if the given PTLRPC matches the given LDLM locks, or 0 if it does
+ * not.
+ */
+static int ost_rw_hpreq_lock_match(struct ptlrpc_request *req,
+ struct ldlm_lock *lock)
+{
+ struct niobuf_remote *nb;
+ struct obd_ioobj *ioo;
+ struct ost_body *body;
+ int objcount, niocount;
+ int mode, opc, i, rc;
+ __u64 start, end;
+ ENTRY;
+
+ opc = lustre_msg_get_opc(req->rq_reqmsg);
+ LASSERT(opc == OST_READ || opc == OST_WRITE);
+
+ /* As the request may be covered by several locks, do not look at
+ * o_handle, look at the RPC IO region. */
+ body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
+ if (body == NULL)
+ RETURN(0);
+
+ objcount = req_capsule_get_size(&req->rq_pill, &RMF_OBD_IOOBJ,
+ RCL_CLIENT) / sizeof(*ioo);
+ ioo = req_capsule_client_get(&req->rq_pill, &RMF_OBD_IOOBJ);
+ if (ioo == NULL)
+ RETURN(0);
+
+ rc = ost_validate_obdo(req->rq_export, &body->oa, ioo);
+ if (rc)
+ RETURN(rc);
+
+ for (niocount = i = 0; i < objcount; i++)
+ niocount += ioo[i].ioo_bufcnt;
+
+ nb = req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE);
+ if (nb == NULL ||
+ niocount != (req_capsule_get_size(&req->rq_pill, &RMF_NIOBUF_REMOTE,
+ RCL_CLIENT) / sizeof(*nb)))
+ RETURN(0);
+
+ mode = LCK_PW;
+ if (opc == OST_READ)
+ mode |= LCK_PR;
+
+ start = nb[0].offset & CFS_PAGE_MASK;
+ end = (nb[ioo->ioo_bufcnt - 1].offset +
+ nb[ioo->ioo_bufcnt - 1].len - 1) | ~CFS_PAGE_MASK;
+
+ LASSERT(lock->l_resource != NULL);
+ if (!osc_res_name_eq(ioo->ioo_id, ioo->ioo_seq,
+ &lock->l_resource->lr_name))
+ RETURN(0);
+
+ if (!(lock->l_granted_mode & mode))
+ RETURN(0);
+
+ if (lock->l_policy_data.l_extent.end < start ||
+ lock->l_policy_data.l_extent.start > end)
+ RETURN(0);
+
+ RETURN(1);
+}
+
+/**
+ * High-priority queue request check for whether the given PTLRPC request (\a
+ * req) is blocking an LDLM lock cancel.
+ *
+ * Returns 1 if the given given PTLRPC request (\a req) is blocking an LDLM lock
+ * cancel, 0 if it is not, and -EFAULT if the request is malformed.
+ *
+ * Only OST_READs, OST_WRITEs and OST_PUNCHes go on the h-p RPC queue. This
+ * function looks only at OST_READs and OST_WRITEs.
+ */
+static int ost_rw_hpreq_check(struct ptlrpc_request *req)
+{
+ struct niobuf_remote *nb;
+ struct obd_ioobj *ioo;
+ struct ost_body *body;
+ int objcount, niocount;
+ int mode, opc, i, rc;
+ ENTRY;
+
+ opc = lustre_msg_get_opc(req->rq_reqmsg);
+ LASSERT(opc == OST_READ || opc == OST_WRITE);
+
+ body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
+ if (body == NULL)
+ RETURN(-EFAULT);
+
+ objcount = req_capsule_get_size(&req->rq_pill, &RMF_OBD_IOOBJ,
+ RCL_CLIENT) / sizeof(*ioo);
+ ioo = req_capsule_client_get(&req->rq_pill, &RMF_OBD_IOOBJ);
+ if (ioo == NULL)
+ RETURN(-EFAULT);
+
+ rc = ost_validate_obdo(req->rq_export, &body->oa, ioo);
+ if (rc)
+ RETURN(rc);
+
+ for (niocount = i = 0; i < objcount; i++)
+ niocount += ioo[i].ioo_bufcnt;
+ nb = req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE);
+ if (nb == NULL ||
+ niocount != (req_capsule_get_size(&req->rq_pill, &RMF_NIOBUF_REMOTE,
+ RCL_CLIENT) / sizeof(*nb)))
+ RETURN(-EFAULT);
+ if (niocount != 0 && (nb[0].flags & OBD_BRW_SRVLOCK))
+ RETURN(-EFAULT);
+
+ mode = LCK_PW;
+ if (opc == OST_READ)
+ mode |= LCK_PR;
+ RETURN(ost_rw_prolong_locks(req, ioo, nb, &body->oa, mode));
+}
+
+static int ost_punch_prolong_locks(struct ptlrpc_request *req, struct obdo *oa)
+{
+ struct ldlm_res_id res_id = { .name = { oa->o_id } };
+ struct ost_prolong_data opd = { 0 };
+ __u64 start, end;
+ ENTRY;
+
+ start = oa->o_size;
+ end = start + oa->o_blocks;
+
+ opd.opd_mode = LCK_PW;
+ opd.opd_exp = req->rq_export;
+ opd.opd_policy.l_extent.start = start & CFS_PAGE_MASK;
+ if (oa->o_blocks == OBD_OBJECT_EOF || end < start)
+ opd.opd_policy.l_extent.end = OBD_OBJECT_EOF;
+ else
+ opd.opd_policy.l_extent.end = end | ~CFS_PAGE_MASK;
+
+ /* prolong locks for the current service time of the corresponding
+ * portal (= OST_IO_PORTAL) */
+ opd.opd_timeout = AT_OFF ? obd_timeout / 2:
+ max(at_est2timeout(at_get(&req->rq_rqbd->
+ rqbd_service->srv_at_estimate)), ldlm_timeout);
+
+ CDEBUG(D_DLMTRACE,"refresh locks: "LPU64"/"LPU64" ("LPU64"->"LPU64")\n",
+ res_id.name[0], res_id.name[1], opd.opd_policy.l_extent.start,
+ opd.opd_policy.l_extent.end);
+
+ opd.opd_oa = oa;
+ ldlm_resource_iterate(req->rq_export->exp_obd->obd_namespace, &res_id,
+ ost_prolong_locks_iter, &opd);
+ RETURN(opd.opd_lock_match);
+}
+
+/**
+ * Like ost_rw_hpreq_lock_match(), but for OST_PUNCH RPCs.
+ */
+static int ost_punch_hpreq_lock_match(struct ptlrpc_request *req,
+ struct ldlm_lock *lock)
+{
+ struct ost_body *body;
+ int rc;
+ ENTRY;
+
+ body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
+ if (body == NULL)
+ RETURN(0); /* can't return -EFAULT here */
+
+ rc = ost_validate_obdo(req->rq_export, &body->oa, NULL);
+ if (rc)
+ RETURN(rc);
+
+ if (body->oa.o_valid & OBD_MD_FLHANDLE &&
+ body->oa.o_handle.cookie == lock->l_handle.h_cookie)
+ RETURN(1);
+ RETURN(0);
+}
+
+/**
+ * Like ost_rw_hpreq_check(), but for OST_PUNCH RPCs.
+ */
+static int ost_punch_hpreq_check(struct ptlrpc_request *req)
+{
+ struct ost_body *body;
+ int rc;
+
+ body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
+ if (body == NULL)
+ RETURN(-EFAULT);
+
+ rc = ost_validate_obdo(req->rq_export, &body->oa, NULL);
+ if (rc)
+ RETURN(rc);
+
+ LASSERT(!(body->oa.o_valid & OBD_MD_FLFLAGS) ||
+ !(body->oa.o_flags & OBD_FL_SRVLOCK));
+
+ RETURN(ost_punch_prolong_locks(req, &body->oa));
+}
+
+struct ptlrpc_hpreq_ops ost_hpreq_rw = {
+ .hpreq_lock_match = ost_rw_hpreq_lock_match,
+ .hpreq_check = ost_rw_hpreq_check,
+};
+
+struct ptlrpc_hpreq_ops ost_hpreq_punch = {
+ .hpreq_lock_match = ost_punch_hpreq_lock_match,
+ .hpreq_check = ost_punch_hpreq_check,
+};
+
+/** Assign high priority operations to the request if needed. */
+static int ost_hpreq_handler(struct ptlrpc_request *req)
+{
+ ENTRY;
+ if (req->rq_export) {
+ int opc = lustre_msg_get_opc(req->rq_reqmsg);
+ struct ost_body *body;
+
+ if (opc == OST_READ || opc == OST_WRITE) {
+ struct niobuf_remote *nb;
+ struct obd_ioobj *ioo;
+ int objcount, niocount;
+ int i;
+
+ /* RPCs on the H-P queue can be inspected before
+ * ost_handler() initializes their pills, so we
+ * initialize that here. Capsule initialization is
+ * idempotent, as is setting the pill's format (provided
+ * it doesn't change).
+ */
+ req_capsule_init(&req->rq_pill, req, RCL_SERVER);
+ if (opc == OST_READ)
+ req_capsule_set(&req->rq_pill,
+ &RQF_OST_BRW_READ);
+ else
+ req_capsule_set(&req->rq_pill,
+ &RQF_OST_BRW_WRITE);
+
+ body = req_capsule_client_get(&req->rq_pill,
+ &RMF_OST_BODY);
+ if (body == NULL) {
+ CERROR("Missing/short ost_body\n");
+ RETURN(-EFAULT);
+ }
+
+ objcount = req_capsule_get_size(&req->rq_pill,
+ &RMF_OBD_IOOBJ,
+ RCL_CLIENT) /
+ sizeof(*ioo);
+ if (objcount == 0) {
+ CERROR("Missing/short ioobj\n");
+ RETURN(-EFAULT);
+ }
+ if (objcount > 1) {
+ CERROR("too many ioobjs (%d)\n", objcount);
+ RETURN(-EFAULT);
+ }
+
+ ioo = req_capsule_client_get(&req->rq_pill,
+ &RMF_OBD_IOOBJ);
+ if (ioo == NULL) {
+ CERROR("Missing/short ioobj\n");
+ RETURN(-EFAULT);
+ }
+
+ for (niocount = i = 0; i < objcount; i++) {
+ if (ioo[i].ioo_bufcnt == 0) {
+ CERROR("ioo[%d] has zero bufcnt\n", i);
+ RETURN(-EFAULT);
+ }
+ niocount += ioo[i].ioo_bufcnt;
+ }
+ if (niocount > PTLRPC_MAX_BRW_PAGES) {
+ DEBUG_REQ(D_RPCTRACE, req,
+ "bulk has too many pages (%d)",
+ niocount);
+ RETURN(-EFAULT);
+ }
+
+ nb = req_capsule_client_get(&req->rq_pill,
+ &RMF_NIOBUF_REMOTE);
+ if (nb == NULL) {
+ CERROR("Missing/short niobuf\n");
+ RETURN(-EFAULT);
+ }
+
+ if (niocount == 0 || !(nb[0].flags & OBD_BRW_SRVLOCK))
+ req->rq_ops = &ost_hpreq_rw;
+ } else if (opc == OST_PUNCH) {
+ req_capsule_init(&req->rq_pill, req, RCL_SERVER);
+ req_capsule_set(&req->rq_pill, &RQF_OST_PUNCH);
+
+ body = req_capsule_client_get(&req->rq_pill,
+ &RMF_OST_BODY);
+ if (body == NULL) {
+ CERROR("Missing/short ost_body\n");
+ RETURN(-EFAULT);
+ }
+
+ if (!(body->oa.o_valid & OBD_MD_FLFLAGS) ||
+ !(body->oa.o_flags & OBD_FL_SRVLOCK))
+ req->rq_ops = &ost_hpreq_punch;
+ }
+ }
+ RETURN(0);
+}
+