/* check that we do support OBD_CONNECT_TRUNCLOCK. */
CLASSERT(OST_CONNECT_SUPPORTED & OBD_CONNECT_TRUNCLOCK);
- body = lustre_swab_reqbuf(req, REQ_REC_OFF, sizeof(*body),
- lustre_swab_ost_body);
- if (body == NULL)
- RETURN(-EFAULT);
+ /* ost_body is verified and swabbed in ost_hpreq_handler() */
+ body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
+ LASSERT(body != NULL);
oinfo.oi_oa = &body->oa;
oinfo.oi_policy.l_extent.start = oinfo.oi_oa->o_size;
struct ost_prolong_data { /* state handed to ost_prolong_locks_iter() by the prolong helpers */
struct obd_export *opd_exp; /* export supplied by the caller (req->rq_export on the hpreq paths) */
ldlm_policy_data_t opd_policy; /* I/O region; callers fill l_extent.start and l_extent.end */
+ struct obdo *opd_oa; /* optional; the iterator stores a matched lock cookie in o_handle if OBD_MD_FLHANDLE is unset */
ldlm_mode_t opd_mode; /* lock mode mask chosen by the caller, e.g. LCK_PW or LCK_PW | LCK_PR */
+ int opd_lock_match; /* set to 1 by the iterator after it refreshes a lock's eviction timer */
};
static int ost_prolong_locks_iter(struct ldlm_lock *lock, void *data)
return LDLM_ITER_CONTINUE;
}
+ /* Fill the obdo with the matched lock handle.
+ * XXX: it is possible in some cases the IO RPC is covered by several
+ * locks, even for the write case, so it may need to be a lock list. */
+ if (opd->opd_oa && !(opd->opd_oa->o_valid & OBD_MD_FLHANDLE)) {
+ opd->opd_oa->o_handle.cookie = lock->l_handle.h_cookie;
+ opd->opd_oa->o_valid |= OBD_MD_FLHANDLE;
+ }
+
if (!(lock->l_flags & LDLM_FL_AST_SENT)) {
/* ignore locks not being cancelled */
return LDLM_ITER_CONTINUE;
/* OK. this is a possible lock the user holds doing I/O
* let's refresh eviction timer for it */
ldlm_refresh_waiting_lock(lock);
+ opd->opd_lock_match = 1;
return LDLM_ITER_CONTINUE;
}
-static void ost_prolong_locks(struct obd_export *exp, struct obd_ioobj *obj,
- struct niobuf_remote *nb, struct obdo *oa,
- ldlm_mode_t mode)
+static int ost_rw_prolong_locks(struct obd_export *exp, struct obd_ioobj *obj,
+ struct niobuf_remote *nb, struct obdo *oa,
+ ldlm_mode_t mode)
{
struct ldlm_res_id res_id = { .name = { obj->ioo_id } };
+ struct ost_prolong_data opd = { 0 };
int nrbufs = obj->ioo_bufcnt;
- struct ost_prolong_data opd;
ENTRY;
lock = ldlm_handle2lock(&oa->o_handle);
if (lock != NULL) {
ost_prolong_locks_iter(lock, &opd);
+ if (opd.opd_lock_match) {
+ LDLM_LOCK_PUT(lock);
+ RETURN(1);
+ }
+
+ /* Check if the lock covers the whole IO region,
+ * otherwise iterate through the resource. */
+ if (lock->l_policy_data.l_extent.end >=
+ opd.opd_policy.l_extent.end &&
+ lock->l_policy_data.l_extent.start <=
+ opd.opd_policy.l_extent.start) {
+ LDLM_LOCK_PUT(lock);
+ RETURN(0);
+ }
LDLM_LOCK_PUT(lock);
- EXIT;
- return;
}
}
+ opd.opd_oa = oa;
ldlm_resource_iterate(exp->exp_obd->obd_namespace, &res_id,
ost_prolong_locks_iter, &opd);
-
- EXIT;
+ RETURN(opd.opd_lock_match);
}
static int ost_brw_read(struct ptlrpc_request *req, struct obd_trans_info *oti)
struct l_wait_info lwi;
struct lustre_handle lockh = { 0 };
__u32 size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
- int objcount, niocount, npages, nob = 0, rc, i;
+ int niocount, npages, nob = 0, rc, i;
int no_reply = 0;
ENTRY;
if (exp->exp_failed)
GOTO(out, rc = -ENOTCONN);
- body = lustre_swab_reqbuf(req, REQ_REC_OFF, sizeof(*body),
- lustre_swab_ost_body);
- if (body == NULL) {
- CERROR("Missing/short ost_body\n");
- GOTO(out, rc = -EFAULT);
- }
+ /* ost_body, ioobj & niobuf_remote are verified and swabbed in
+ * ost_hpreq_handler(). */
+ body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
+ LASSERT(body != NULL);
- objcount = lustre_msg_buflen(req->rq_reqmsg, REQ_REC_OFF + 1) /
- sizeof(*ioo);
- if (objcount == 0) {
- CERROR("Missing/short ioobj\n");
- GOTO(out, rc = -EFAULT);
- }
- if (objcount > 1) {
- CERROR("too many ioobjs (%d)\n", objcount);
- GOTO(out, rc = -EFAULT);
- }
-
- ioo = lustre_swab_reqbuf(req, REQ_REC_OFF + 1, sizeof(*ioo),
- lustre_swab_obd_ioobj);
- if (ioo == NULL) {
- CERROR("Missing/short ioobj\n");
- GOTO(out, rc = -EFAULT);
- }
+ ioo = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 1, sizeof(*ioo));
+ LASSERT(ioo != NULL);
niocount = ioo->ioo_bufcnt;
- if (niocount > PTLRPC_MAX_BRW_PAGES) {
- DEBUG_REQ(D_ERROR, req, "bulk has too many pages (%d)",
- niocount);
- GOTO(out, rc = -EFAULT);
- }
-
- remote_nb = lustre_swab_reqbuf(req, REQ_REC_OFF + 2,
- niocount * sizeof(*remote_nb),
- lustre_swab_niobuf_remote);
- if (remote_nb == NULL) {
- CERROR("Missing/short niobuf\n");
- GOTO(out, rc = -EFAULT);
- }
- if (lustre_req_need_swab(req)) {
- /* swab remaining niobufs */
- for (i = 1; i < niocount; i++)
- lustre_swab_niobuf_remote (&remote_nb[i]);
- }
+ remote_nb = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 2,
+ niocount * sizeof(*remote_nb));
+ LASSERT(remote_nb != NULL);
rc = lustre_pack_reply(req, 2, size, NULL);
if (rc)
if (desc == NULL) /* XXX: check all cleanup stuff */
GOTO(out, rc = -ENOMEM);
- ost_prolong_locks(exp, ioo, remote_nb, &body->oa, LCK_PW | LCK_PR);
+ ost_rw_prolong_locks(exp, ioo, remote_nb, &body->oa, LCK_PW | LCK_PR);
nob = 0;
for (i = 0; i < npages; i++) {
__u32 *rcs;
__u32 size[3] = { sizeof(struct ptlrpc_body), sizeof(*body) };
int objcount, niocount, npages;
- int rc, swab, i, j;
+ int rc, i, j;
obd_count client_cksum = 0, server_cksum = 0;
cksum_type_t cksum_type = OBD_CKSUM_CRC32;
int no_reply = 0;
if (exp->exp_failed)
GOTO(out, rc = -ENOTCONN);
- swab = lustre_req_need_swab(req);
- body = lustre_swab_reqbuf(req, REQ_REC_OFF, sizeof(*body),
- lustre_swab_ost_body);
- if (body == NULL) {
- CERROR("Missing/short ost_body\n");
- GOTO(out, rc = -EFAULT);
- }
+ /* ost_body, ioobj & niobuf_remote are verified and swabbed in
+ * ost_hpreq_handler(). */
+ body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
+ LASSERT(body != NULL);
objcount = lustre_msg_buflen(req->rq_reqmsg, REQ_REC_OFF + 1) /
sizeof(*ioo);
- if (objcount == 0) {
- CERROR("Missing/short ioobj\n");
- GOTO(out, rc = -EFAULT);
- }
- if (objcount > 1) {
- CERROR("too many ioobjs (%d)\n", objcount);
- GOTO(out, rc = -EFAULT);
- }
-
- lustre_set_req_swabbed(req, REQ_REC_OFF + 1);
ioo = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 1,
objcount * sizeof(*ioo));
- LASSERT (ioo != NULL);
- for (niocount = i = 0; i < objcount; i++) {
- if (swab)
- lustre_swab_obd_ioobj(&ioo[i]);
- if (ioo[i].ioo_bufcnt == 0) {
- CERROR("ioo[%d] has zero bufcnt\n", i);
- GOTO(out, rc = -EFAULT);
- }
+ LASSERT(ioo != NULL);
+ for (niocount = i = 0; i < objcount; i++)
niocount += ioo[i].ioo_bufcnt;
- }
- if (niocount > PTLRPC_MAX_BRW_PAGES) {
- DEBUG_REQ(D_ERROR, req, "bulk has too many pages (%d)",
- niocount);
- GOTO(out, rc = -EFAULT);
- }
-
- remote_nb = lustre_swab_reqbuf(req, REQ_REC_OFF + 2,
- niocount * sizeof(*remote_nb),
- lustre_swab_niobuf_remote);
- if (remote_nb == NULL) {
- CERROR("Missing/short niobuf\n");
- GOTO(out, rc = -EFAULT);
- }
- if (swab) { /* swab the remaining niobufs */
- for (i = 1; i < niocount; i++)
- lustre_swab_niobuf_remote (&remote_nb[i]);
- }
+ remote_nb = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 2,
+ niocount * sizeof(*remote_nb));
+ LASSERT(remote_nb != NULL);
size[REPLY_REC_OFF + 1] = niocount * sizeof(*rcs);
rc = lustre_pack_reply(req, 3, size, NULL);
GOTO(out_lock, rc = -ETIMEDOUT);
}
- ost_prolong_locks(exp, ioo, remote_nb,&body->oa, LCK_PW);
+ ost_rw_prolong_locks(exp, ioo, remote_nb,&body->oa, LCK_PW);
/* obd_preprw clobbers oa->valid, so save what we need */
if (body->oa.o_valid & OBD_MD_FLCKSUM) {
return rc;
}
+/**
+ * Tell whether \a lock is relevant to the bulk read/write request \a req.
+ *
+ * The request may be covered by several locks, so o_handle is not consulted;
+ * the page-aligned byte range spanned by the remote niobufs is compared with
+ * the lock extent instead.
+ *
+ * Returns 1 when the granted mode of the lock intersects the mode mask for
+ * the opcode and the lock extent overlaps the I/O range, 0 otherwise.
+ */
+static int ost_rw_hpreq_lock_match(struct ptlrpc_request *req,
+                                   struct ldlm_lock *lock)
+{
+        struct niobuf_remote *rnb;
+        struct niobuf_remote *tail;
+        struct obd_ioobj *ioo;
+        struct ost_body *body;
+        int nr_obj, nr_nio;
+        int wanted, opc, idx;
+        __u64 io_first, io_last;
+        ENTRY;
+
+        opc = lustre_msg_get_opc(req->rq_reqmsg);
+        LASSERT(opc == OST_READ || opc == OST_WRITE);
+
+        /* The returned pointer is not read anywhere below; the call itself
+         * is kept exactly as it was. */
+        body = lustre_swab_reqbuf(req, REQ_REC_OFF, sizeof(*body),
+                                  lustre_swab_obdo);
+
+        nr_obj = lustre_msg_buflen(req->rq_reqmsg, REQ_REC_OFF + 1) /
+                 sizeof(*ioo);
+        ioo = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 1,
+                             nr_obj * sizeof(*ioo));
+        LASSERT(ioo != NULL);
+
+        /* Total number of remote niobufs over all ioobjs. */
+        nr_nio = 0;
+        for (idx = 0; idx < nr_obj; idx++)
+                nr_nio += ioo[idx].ioo_bufcnt;
+
+        rnb = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 2,
+                             nr_nio * sizeof(*rnb));
+        LASSERT(rnb != NULL);
+
+        /* Reads accept PR as well as PW locks, writes only PW. */
+        wanted = (opc == OST_READ) ? (LCK_PW | LCK_PR) : LCK_PW;
+        if ((lock->l_granted_mode & wanted) == 0)
+                RETURN(0);
+
+        /* Round the I/O range outwards to page boundaries. */
+        tail = &rnb[ioo->ioo_bufcnt - 1];
+        io_first = rnb[0].offset & CFS_PAGE_MASK;
+        io_last = (tail->offset + tail->len - 1) | ~CFS_PAGE_MASK;
+
+        /* The extents overlap unless one ends before the other starts. */
+        RETURN(lock->l_policy_data.l_extent.end >= io_first &&
+               lock->l_policy_data.l_extent.start <= io_last);
+}
+
+/**
+ * Fetch the ost_body, ioobj and niobuf_remote buffers of a bulk read/write
+ * request and pass them to ost_rw_prolong_locks().
+ *
+ * The buffers are only looked up with lustre_msg_buf() here and asserted to
+ * be present; no swabbing is done in this function. The first niobuf must
+ * not carry OBD_BRW_SRVLOCK.
+ *
+ * Returns the value from ost_rw_prolong_locks(), which is non-zero if
+ * there is a cancelled lock which is waiting for this IO request.
+ */
+static int ost_rw_hpreq_check(struct ptlrpc_request *req)
+{
+        struct niobuf_remote *rnb;
+        struct obd_ioobj *ioo;
+        struct ost_body *body;
+        int nr_obj, nr_nio;
+        int wanted, opc, idx;
+        ENTRY;
+
+        opc = lustre_msg_get_opc(req->rq_reqmsg);
+        LASSERT(opc == OST_READ || opc == OST_WRITE);
+
+        body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
+        LASSERT(body != NULL);
+
+        nr_obj = lustre_msg_buflen(req->rq_reqmsg, REQ_REC_OFF + 1) /
+                 sizeof(*ioo);
+        ioo = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 1,
+                             nr_obj * sizeof(*ioo));
+        LASSERT(ioo != NULL);
+
+        /* Total number of remote niobufs over all ioobjs. */
+        nr_nio = 0;
+        for (idx = 0; idx < nr_obj; idx++)
+                nr_nio += ioo[idx].ioo_bufcnt;
+
+        rnb = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 2,
+                             nr_nio * sizeof(*rnb));
+        LASSERT(rnb != NULL);
+        LASSERT(nr_nio == 0 || !(rnb[0].flags & OBD_BRW_SRVLOCK));
+
+        /* Reads may be covered by PR or PW locks, writes by PW only. */
+        wanted = (opc == OST_READ) ? (LCK_PW | LCK_PR) : LCK_PW;
+
+        RETURN(ost_rw_prolong_locks(req->rq_export, ioo, rnb, &body->oa,
+                                    wanted));
+}
+
+/**
+ * Refresh the eviction timer of locks held by \a exp that ost_prolong_locks_iter()
+ * accepts for the punch region described by \a oa.
+ *
+ * The region starts at oa->o_size and is oa->o_blocks long; it extends to
+ * OBD_OBJECT_EOF when o_blocks is OBD_OBJECT_EOF or when the sum wraps.
+ * Both ends are rounded outwards to page boundaries.
+ *
+ * Returns the opd_lock_match flag left behind by the iterator.
+ */
+static int ost_punch_prolong_locks(struct obd_export *exp, struct obdo *oa)
+{
+        struct ldlm_res_id res_id = { .name = { oa->o_id } };
+        struct ost_prolong_data opd = { 0 };
+        __u64 first, past;
+        ENTRY;
+
+        first = oa->o_size;
+        past = first + oa->o_blocks;
+
+        opd.opd_exp = exp;
+        opd.opd_mode = LCK_PW;
+        opd.opd_oa = oa;
+        opd.opd_policy.l_extent.start = first & CFS_PAGE_MASK;
+        /* "past < first" catches wrap-around of the 64-bit sum. */
+        opd.opd_policy.l_extent.end =
+                (oa->o_blocks == OBD_OBJECT_EOF || past < first) ?
+                OBD_OBJECT_EOF : (past | ~CFS_PAGE_MASK);
+
+        CDEBUG(D_DLMTRACE,"refresh locks: "LPU64"/"LPU64" ("LPU64"->"LPU64")\n",
+               res_id.name[0], res_id.name[1], opd.opd_policy.l_extent.start,
+               opd.opd_policy.l_extent.end);
+
+        ldlm_resource_iterate(exp->exp_obd->obd_namespace, &res_id,
+                              ost_prolong_locks_iter, &opd);
+        RETURN(opd.opd_lock_match);
+}
+
+/**
+ * Tell whether \a lock is the one named by the punch request \a req.
+ *
+ * Returns 1 when the request body carries a lock handle (OBD_MD_FLHANDLE)
+ * whose cookie equals the cookie of \a lock, 0 otherwise.
+ */
+static int ost_punch_hpreq_lock_match(struct ptlrpc_request *req,
+                                      struct ldlm_lock *lock)
+{
+        struct ost_body *body;
+        int same_handle;
+        ENTRY;
+
+        body = lustre_swab_reqbuf(req, REQ_REC_OFF, sizeof(*body),
+                                  lustre_swab_obdo);
+        LASSERT(body != NULL);
+
+        same_handle = (body->oa.o_valid & OBD_MD_FLHANDLE) != 0 &&
+                      body->oa.o_handle.cookie == lock->l_handle.h_cookie;
+        RETURN(same_handle);
+}
+
+/**
+ * High-priority check for a punch request: look up the ost_body and pass
+ * its obdo to ost_punch_prolong_locks().
+ *
+ * The body must be present and must not request OBD_FL_TRUNCLOCK; both
+ * conditions are asserted.
+ *
+ * Returns the value from ost_punch_prolong_locks().
+ */
+static int ost_punch_hpreq_check(struct ptlrpc_request *req)
+{
+        struct ost_body *body;
+        /* Pair the RETURN() below with an ENTRY so the call trace stays
+         * balanced, as in the other hpreq helpers. */
+        ENTRY;
+
+        body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
+        LASSERT(body != NULL);
+        LASSERT(!(body->oa.o_valid & OBD_MD_FLFLAGS) ||
+                !(body->oa.o_flags & OBD_FL_TRUNCLOCK));
+
+        RETURN(ost_punch_prolong_locks(req->rq_export, &body->oa));
+}
+
+/* High-priority callbacks for bulk read/write requests. */
+struct ptlrpc_hpreq_ops ost_hpreq_rw = {
+        .hpreq_check      = ost_rw_hpreq_check,
+        .hpreq_lock_match = ost_rw_hpreq_lock_match,
+};
+
+/* High-priority callbacks for punch requests. */
+struct ptlrpc_hpreq_ops ost_hpreq_punch = {
+        .hpreq_check      = ost_punch_hpreq_check,
+        .hpreq_lock_match = ost_punch_hpreq_lock_match,
+};
+
+/**
+ * Assign high priority operations to the request if needed.
+ *
+ * Requests without an export and opcodes other than OST_READ, OST_WRITE and
+ * OST_PUNCH are left untouched. For reads and writes the ost_body, ioobj and
+ * niobuf_remote buffers are size-checked and run through the swab helpers;
+ * for punch only the ost_body is. rq_ops is set unless the request asks for
+ * server-side locking (OBD_BRW_SRVLOCK on the first niobuf, or
+ * OBD_FL_TRUNCLOCK for punch).
+ *
+ * Returns 0 on success, -EFAULT when a request buffer is missing, short or
+ * inconsistent.
+ */
+static int ost_hpreq_handler(struct ptlrpc_request *req)
+{
+        struct ost_body *body;
+        int opc;
+        ENTRY;
+
+        if (!req->rq_export)
+                RETURN(0);
+
+        opc = lustre_msg_get_opc(req->rq_reqmsg);
+        switch (opc) {
+        case OST_READ:
+        case OST_WRITE: {
+                struct niobuf_remote *rnb;
+                struct obd_ioobj *ioo;
+                int nr_obj, nr_nio;
+                int need_swab, idx;
+
+                body = lustre_swab_reqbuf(req, REQ_REC_OFF, sizeof(*body),
+                                          lustre_swab_obdo);
+                if (body == NULL) {
+                        CERROR("Missing/short ost_body\n");
+                        RETURN(-EFAULT);
+                }
+
+                /* Exactly one ioobj is accepted. */
+                nr_obj = lustre_msg_buflen(req->rq_reqmsg, REQ_REC_OFF + 1) /
+                         sizeof(*ioo);
+                if (nr_obj == 0) {
+                        CERROR("Missing/short ioobj\n");
+                        RETURN(-EFAULT);
+                }
+                if (nr_obj > 1) {
+                        CERROR("too many ioobjs (%d)\n", nr_obj);
+                        RETURN(-EFAULT);
+                }
+
+                /* Sample the swab state before lustre_swab_reqbuf() runs:
+                 * that call handles the first element, the rest is done by
+                 * hand in the loop. */
+                need_swab = !lustre_req_swabbed(req, REQ_REC_OFF + 1) &&
+                            lustre_req_need_swab(req);
+                ioo = lustre_swab_reqbuf(req, REQ_REC_OFF + 1,
+                                         nr_obj * sizeof(*ioo),
+                                         lustre_swab_obd_ioobj);
+                if (ioo == NULL) {
+                        CERROR("Missing/short ioobj\n");
+                        RETURN(-EFAULT);
+                }
+
+                nr_nio = 0;
+                for (idx = 0; idx < nr_obj; idx++) {
+                        if (idx > 0 && need_swab)
+                                lustre_swab_obd_ioobj(&ioo[idx]);
+                        if (ioo[idx].ioo_bufcnt == 0) {
+                                CERROR("ioo[%d] has zero bufcnt\n", idx);
+                                RETURN(-EFAULT);
+                        }
+                        nr_nio += ioo[idx].ioo_bufcnt;
+                }
+                if (nr_nio > PTLRPC_MAX_BRW_PAGES) {
+                        DEBUG_REQ(D_ERROR, req, "bulk has too many "
+                                  "pages (%d)", nr_nio);
+                        RETURN(-EFAULT);
+                }
+
+                /* Same scheme for the niobufs: sample first, then swab. */
+                need_swab = !lustre_req_swabbed(req, REQ_REC_OFF + 2) &&
+                            lustre_req_need_swab(req);
+                rnb = lustre_swab_reqbuf(req, REQ_REC_OFF + 2,
+                                         nr_nio * sizeof(*rnb),
+                                         lustre_swab_niobuf_remote);
+                if (rnb == NULL) {
+                        CERROR("Missing/short niobuf\n");
+                        RETURN(-EFAULT);
+                }
+
+                if (need_swab) {
+                        /* swab remaining niobufs */
+                        for (idx = 1; idx < nr_nio; idx++)
+                                lustre_swab_niobuf_remote(&rnb[idx]);
+                }
+
+                if (nr_nio == 0 || !(rnb[0].flags & OBD_BRW_SRVLOCK))
+                        req->rq_ops = &ost_hpreq_rw;
+                break;
+        }
+        case OST_PUNCH:
+                body = lustre_swab_reqbuf(req, REQ_REC_OFF, sizeof(*body),
+                                          lustre_swab_obdo);
+                if (body == NULL) {
+                        CERROR("Missing/short ost_body\n");
+                        RETURN(-EFAULT);
+                }
+
+                if (!(body->oa.o_valid & OBD_MD_FLFLAGS) ||
+                    !(body->oa.o_flags & OBD_FL_TRUNCLOCK))
+                        req->rq_ops = &ost_hpreq_punch;
+                break;
+        default:
+                break;
+        }
+
+        RETURN(0);
+}
+
static int ost_handle(struct ptlrpc_request *req)
{
struct obd_trans_info trans_info = { 0, };
/* Insure a 4x range for dynamic threads */
if (oss_min_threads > OSS_THREADS_MAX / 4)
oss_min_threads = OSS_THREADS_MAX / 4;
- oss_max_threads = min(OSS_THREADS_MAX, oss_min_threads * 4);
+ oss_max_threads = min(OSS_THREADS_MAX, oss_min_threads * 4 + 1);
}
ost->ost_service =
OSC_REPLY_PORTAL, OSS_SERVICE_WATCHDOG_FACTOR,
ost_handle, LUSTRE_OSS_NAME,
obd->obd_proc_entry, target_print_req,
- oss_min_threads, oss_max_threads, "ll_ost");
+ oss_min_threads, oss_max_threads, "ll_ost",
+ NULL);
if (ost->ost_service == NULL) {
CERROR("failed to start OST service\n");
GOTO(out_lprocfs, rc = -ENOMEM);
obd->obd_proc_entry, target_print_req,
oss_min_create_threads,
oss_max_create_threads,
- "ll_ost_creat");
+ "ll_ost_creat", NULL);
if (ost->ost_create_service == NULL) {
CERROR("failed to start OST create service\n");
GOTO(out_service, rc = -ENOMEM);
OSC_REPLY_PORTAL, OSS_SERVICE_WATCHDOG_FACTOR,
ost_handle, "ost_io",
obd->obd_proc_entry, target_print_req,
- oss_min_threads, oss_max_threads, "ll_ost_io");
+ oss_min_threads, oss_max_threads, "ll_ost_io",
+ ost_hpreq_handler);
if (ost->ost_io_service == NULL) {
CERROR("failed to start OST I/O service\n");
GOTO(out_create, rc = -ENOMEM);