+/*
+ * Split each remote niobuf in @rnb into CFS_PAGE_SIZE-sized chunks,
+ * writing the per-page niobufs into *pp_rnbp (an array of sufficient
+ * size preallocated by the caller) and updating ioo[i].ioo_bufcnt to
+ * the resulting per-object page count.
+ *
+ * Returns the total number of per-page niobufs, or -EINVAL if any
+ * source niobuf has zero length or an object's niobufs are not in
+ * strictly ascending offset order.  When every source niobuf already
+ * fits in a single page, *pp_rnbp is pointed at @rnb itself and no
+ * copy is made.
+ */
+static int get_per_page_niobufs(struct obd_ioobj *ioo, int nioo,
+                                struct niobuf_remote *rnb, int nrnb,
+                                struct niobuf_remote **pp_rnbp)
+{
+        struct niobuf_remote *pp_rnb;
+        int i;
+        int j;
+        int page;
+        int rnbidx = 0;
+        int npages = 0;
+
+        /* array of sufficient size already preallocated by caller */
+        LASSERT(pp_rnbp != NULL);
+        LASSERT(*pp_rnbp != NULL);
+
+        /* first count and check the number of pages required */
+        for (i = 0; i < nioo; i++)
+                /* fixed: was 'ioo->ioo_bufcnt', which always read object
+                 * 0's count; must index per object like the split loop
+                 * below does */
+                for (j = 0; j < ioo[i].ioo_bufcnt; j++, rnbidx++) {
+                        obd_off offset = rnb[rnbidx].offset;
+                        obd_off p0;
+                        obd_off pn;
+
+                        LASSERT(rnbidx < nrnb);
+
+                        /* validate len before using it: 'len - 1' below
+                         * would underflow on a zero-length niobuf */
+                        if (rnb[rnbidx].len == 0) {
+                                CERROR("zero len BRW: obj %d objid "LPX64
+                                       " buf %u\n", i, ioo[i].ioo_id, j);
+                                return -EINVAL;
+                        }
+                        if (j > 0 &&
+                            rnb[rnbidx].offset <= rnb[rnbidx-1].offset) {
+                                /* fixed: second offset printed was the
+                                 * current one twice; show the preceding
+                                 * offset it fails to exceed */
+                                CERROR("unordered BRW: obj %d objid "LPX64
+                                       " buf %u offset "LPX64" <= "LPX64"\n",
+                                       i, ioo[i].ioo_id, j, rnb[rnbidx].offset,
+                                       rnb[rnbidx-1].offset);
+                                return -EINVAL;
+                        }
+
+                        /* first and last page this niobuf touches */
+                        p0 = offset >> CFS_PAGE_SHIFT;
+                        pn = (offset + rnb[rnbidx].len - 1) >> CFS_PAGE_SHIFT;
+                        npages += (pn + 1 - p0);
+                }
+
+        LASSERT(rnbidx == nrnb);
+
+        if (npages == nrnb) {       /* all niobufs are for single pages */
+                *pp_rnbp = rnb;
+                return npages;
+        }
+
+        pp_rnb = *pp_rnbp;
+
+        /* now do the actual split */
+        page = rnbidx = 0;
+        for (i = 0; i < nioo; i++) {
+                int obj_pages = 0;
+
+                for (j = 0; j < ioo[i].ioo_bufcnt; j++, rnbidx++) {
+                        obd_off off = rnb[rnbidx].offset;
+                        int nob = rnb[rnbidx].len;
+
+                        LASSERT(rnbidx < nrnb);
+                        do {
+                                obd_off poff = off & ~CFS_PAGE_MASK;
+                                /* fixed: was 'PAGE_SIZE - poff'; use
+                                 * CFS_PAGE_SIZE consistently with the
+                                 * comparison on the previous line */
+                                int pnob = (poff + nob > CFS_PAGE_SIZE) ?
+                                           CFS_PAGE_SIZE - poff : nob;
+
+                                LASSERT(page < npages);
+                                pp_rnb[page].len = pnob;
+                                pp_rnb[page].offset = off;
+                                pp_rnb[page].flags = rnb[rnbidx].flags;
+
+                                CDEBUG(0, " obj %d id "LPX64
+                                       "page %d(%d) "LPX64" for %d, flg %x\n",
+                                       i, ioo[i].ioo_id, obj_pages, page,
+                                       pp_rnb[page].offset, pp_rnb[page].len,
+                                       pp_rnb[page].flags);
+                                page++;
+                                obj_pages++;
+
+                                off += pnob;
+                                nob -= pnob;
+                        } while (nob > 0);
+                        LASSERT(nob == 0);
+                }
+                ioo[i].ioo_bufcnt = obj_pages;
+        }
+        LASSERT(page == npages);
+
+        return npages;
+}
+
+/*
+ * Fold every fragment of the bulk descriptor into a CRC32 checksum.
+ * The OBD_FAIL_OST_CHECKSUM_* fail locks deliberately corrupt the
+ * first fragment before/after it is checksummed, to exercise the
+ * checksum-mismatch recovery paths in both transfer directions.
+ */
+static __u32 ost_checksum_bulk(struct ptlrpc_bulk_desc *desc)
+{
+        __u32 crc = ~0;
+        int frag;
+
+        for (frag = 0; frag < desc->bd_iov_count; frag++) {
+                struct page *pg = desc->bd_iov[frag].kiov_page;
+                int pgoff = desc->bd_iov[frag].kiov_offset & ~CFS_PAGE_MASK;
+                int nob = desc->bd_iov[frag].kiov_len;
+                char *data = kmap(pg) + pgoff;
+
+                /* corrupt the data before we compute the checksum, to
+                 * simulate a client->OST data error */
+                if (frag == 0 &&
+                    OBD_FAIL_CHECK_ONCE(OBD_FAIL_OST_CHECKSUM_RECEIVE))
+                        memcpy(data, "bad3", min(4, nob));
+
+                crc = crc32_le(crc, data, nob);
+
+                /* corrupt the data after we compute the checksum, to
+                 * simulate an OST->client data error */
+                if (frag == 0 &&
+                    OBD_FAIL_CHECK_ONCE(OBD_FAIL_OST_CHECKSUM_SEND))
+                        memcpy(data, "bad4", min(4, nob));
+
+                kunmap(pg);
+        }
+
+        return crc;
+}
+
+/*
+ * Populate @nio with @nrpages pages taken from this service thread's
+ * page pool.  Each pool page is poisoned (0xf1) first so use of stale
+ * contents is detectable.
+ */
+static void ost_nio_pages_get(struct ptlrpc_request *req,
+                              struct niobuf_local *nio, int nrpages)
+{
+        struct ost_thread_local_cache *tls;
+        int idx;
+
+        ENTRY;
+
+        LASSERT(nrpages <= OST_THREAD_POOL_SIZE);
+        LASSERT(req != NULL);
+        LASSERT(req->rq_svc_thread != NULL);
+
+        tls = ost_tls(req);
+        LASSERT(tls != NULL);
+
+        memset(nio, 0, nrpages * sizeof *nio);
+        for (idx = 0; idx < nrpages; ++idx) {
+                struct page *pg = tls->page[idx];
+
+                LASSERT(pg != NULL);
+                POISON_PAGE(pg, 0xf1);
+                nio[idx].page = pg;
+                LL_CDEBUG_PAGE(D_INFO, pg, "%d\n", idx);
+        }
+        EXIT;
+}
+
+/*
+ * Dual of ost_nio_pages_get(): re-poison (0xf2) the pool pages on
+ * release, for debugging use-after-put.
+ */
+static void ost_nio_pages_put(struct ptlrpc_request *req,
+                              struct niobuf_local *nio, int nrpages)
+{
+        int idx;
+
+        ENTRY;
+
+        LASSERT(nrpages <= OST_THREAD_POOL_SIZE);
+
+        for (idx = 0; idx < nrpages; ++idx)
+                POISON_PAGE(nio[idx].page, 0xf2);
+        EXIT;
+}
+
+/*
+ * Take a server-side LDLM extent lock for a bulk RW request, but only
+ * when the client asked for it (OBD_BRW_SRVLOCK set on nb[0]).
+ * mode must be LCK_PR or LCK_PW; lh is passed to
+ * ldlm_cli_enqueue_local() (presumably to receive the lock handle,
+ * released later via ost_brw_lock_put() — confirm against that path).
+ * Returns 0 when no lock is needed, otherwise the enqueue result.
+ */
+static int ost_brw_lock_get(int mode, struct obd_export *exp,
+ struct obd_ioobj *obj, struct niobuf_remote *nb,
+ struct lustre_handle *lh)
+{
+ int flags = 0;
+ int nrbufs = obj->ioo_bufcnt;
+ struct ldlm_res_id res_id = { .name = { obj->ioo_id, 0,
+ obj->ioo_gr, 0} };
+ ldlm_policy_data_t policy;
+ int i;
+
+ ENTRY;
+
+ LASSERT(mode == LCK_PR || mode == LCK_PW);
+ LASSERT(!lustre_handle_is_used(lh));
+
+ /* client didn't request a server-side lock: nothing to do */
+ if (nrbufs == 0 || !(nb[0].flags & OBD_BRW_SRVLOCK))
+ RETURN(0);
+
+ /* EXPENSIVE ASSERTION */
+ /* all niobufs must agree with nb[0] about SRVLOCK */
+ for (i = 1; i < nrbufs; i ++)
+ LASSERT((nb[0].flags & OBD_BRW_SRVLOCK) ==
+ (nb[i].flags & OBD_BRW_SRVLOCK));
+
+ /* lock the whole byte range of the request, rounded out to page
+ * boundaries (assumes CFS_PAGE_MASK masks off the in-page bits —
+ * confirm) */
+ policy.l_extent.start = nb[0].offset & CFS_PAGE_MASK;
+ policy.l_extent.end = (nb[nrbufs - 1].offset +
+ nb[nrbufs - 1].len - 1) | ~CFS_PAGE_MASK;
+
+ RETURN(ldlm_cli_enqueue_local(exp->exp_obd->obd_namespace, &res_id,
+ LDLM_EXTENT, &policy, mode, &flags,
+ ldlm_blocking_ast, ldlm_completion_ast,
+ ldlm_glimpse_ast, NULL, 0, NULL, lh));
+}
+
+/*
+ * Release the server-side BRW lock taken by ost_brw_lock_get(), if
+ * one was taken.  mode must match the mode it was acquired with.
+ */
+static void ost_brw_lock_put(int mode,
+                             struct obd_ioobj *obj, struct niobuf_remote *niob,
+                             struct lustre_handle *lh)
+{
+        int held;
+
+        ENTRY;
+        LASSERT(mode == LCK_PR || mode == LCK_PW);
+
+        held = lustre_handle_is_used(lh);
+        /* the handle is in use exactly when SRVLOCK was requested */
+        LASSERT((obj->ioo_bufcnt > 0 && (niob[0].flags & OBD_BRW_SRVLOCK)) ==
+                held);
+        if (held)
+                ldlm_lock_decref(lh, mode);
+        EXIT;
+}
+
+/*
+ * Control block for ost_prolong_locks_iter(): identifies whose locks,
+ * over which extent and in which modes, should have their eviction
+ * timers refreshed.
+ */
+struct ost_prolong_data {
+ struct obd_export *opd_exp; /* refresh only this client's locks */
+ ldlm_policy_data_t opd_policy; /* extent covered by the request */
+ ldlm_mode_t opd_mode; /* lock modes of interest */
+};
+
+/*
+ * ldlm_resource_iterate() callback: refresh the eviction timer of
+ * every granted lock that belongs to the requested export, matches
+ * the wanted modes, intersects the request extent and has had a
+ * cancellation AST sent.
+ */
+static int ost_prolong_locks_iter(struct ldlm_lock *lock, void *data)
+{
+        struct ost_prolong_data *arg = data;
+        ldlm_policy_data_t *want = &arg->opd_policy;
+
+        LASSERT(lock->l_resource->lr_type == LDLM_EXTENT);
+
+        /* scan granted locks only */
+        if (lock->l_req_mode != lock->l_granted_mode)
+                return LDLM_ITER_STOP;
+
+        /* prolong locks only for the given client */
+        if (lock->l_export != arg->opd_exp)
+                return LDLM_ITER_CONTINUE;
+
+        /* we aren't interested in every type of lock */
+        if (!(lock->l_granted_mode & arg->opd_mode))
+                return LDLM_ITER_CONTINUE;
+
+        /* skip locks the request extent doesn't cross */
+        if (lock->l_policy_data.l_extent.end < want->l_extent.start ||
+            lock->l_policy_data.l_extent.start > want->l_extent.end)
+                return LDLM_ITER_CONTINUE;
+
+        /* ignore locks not being cancelled */
+        if (!(lock->l_flags & LDLM_FL_AST_SENT))
+                return LDLM_ITER_CONTINUE;
+
+        /* OK: a lock the user may hold while doing I/O -- refresh its
+         * eviction timer */
+        ldlm_refresh_waiting_lock(lock);
+
+        return LDLM_ITER_CONTINUE;
+}
+
+/*
+ * Refresh eviction timers on this client's granted locks overlapping
+ * the extent covered by the request's niobufs (see
+ * ost_prolong_locks_iter() for the selection criteria).
+ */
+static void ost_prolong_locks(struct obd_export *exp, struct obd_ioobj *obj,
+                              struct niobuf_remote *nb, ldlm_mode_t mode)
+{
+        struct ldlm_res_id res_id = { .name = { obj->ioo_id, 0,
+                                                obj->ioo_gr, 0} };
+        int last = obj->ioo_bufcnt - 1;
+        struct ost_prolong_data arg;
+
+        ENTRY;
+
+        arg.opd_exp = exp;
+        arg.opd_mode = mode;
+        /* whole byte range of the request, rounded out to page bounds */
+        arg.opd_policy.l_extent.start = nb[0].offset & CFS_PAGE_MASK;
+        arg.opd_policy.l_extent.end = (nb[last].offset + nb[last].len - 1) |
+                                      ~CFS_PAGE_MASK;
+
+        CDEBUG(D_DLMTRACE,"refresh locks: "LPU64"/"LPU64" ("LPU64"->"LPU64")\n",
+               res_id.name[0], res_id.name[1], arg.opd_policy.l_extent.start,
+               arg.opd_policy.l_extent.end);
+        ldlm_resource_iterate(exp->exp_obd->obd_namespace, &res_id,
+                              ost_prolong_locks_iter, &arg);
+}
+
+static int ost_brw_read(struct ptlrpc_request *req, struct obd_trans_info *oti)