struct ptlrpc_request *ptlrpc_request_addref(struct ptlrpc_request *req);
struct ptlrpc_bulk_desc *ptlrpc_prep_bulk_imp(struct ptlrpc_request *req,
int npages, int type, int portal);
-void ptlrpc_free_bulk(struct ptlrpc_bulk_desc *bulk);
-void ptlrpc_prep_bulk_page(struct ptlrpc_bulk_desc *desc,
- cfs_page_t *page, int pageoffset, int len);
+void __ptlrpc_free_bulk(struct ptlrpc_bulk_desc *bulk, int unpin);
+static inline void ptlrpc_free_bulk(struct ptlrpc_bulk_desc *bulk)
+{
+ __ptlrpc_free_bulk(bulk, 1);
+}
+static inline void ptlrpc_free_bulk_nopin(struct ptlrpc_bulk_desc *bulk)
+{
+ __ptlrpc_free_bulk(bulk, 0);
+}
+void __ptlrpc_prep_bulk_page(struct ptlrpc_bulk_desc *desc,
+ cfs_page_t *page, int pageoffset, int len, int pin);
+static inline void ptlrpc_prep_bulk_page(struct ptlrpc_bulk_desc *desc,
+ cfs_page_t *page, int pageoffset,
+ int len)
+{
+ __ptlrpc_prep_bulk_page(desc, page, pageoffset, len, 1);
+}
+
+static inline void ptlrpc_prep_bulk_page_nopin(struct ptlrpc_bulk_desc *desc,
+ cfs_page_t *page, int pageoffset,
+ int len)
+{
+ __ptlrpc_prep_bulk_page(desc, page, pageoffset, len, 0);
+}
+
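/*
 * Illustrative sketch (not part of the patch): the _nopin variants are
 * intended for the server side, where the backend filesystem already
 * holds references on the pages, while clients keep the pinning
 * variant. The fill_bulk_sketch() helper and its parameters are
 * hypothetical, shown only to contrast the two call paths.
 */
static void fill_bulk_sketch(struct ptlrpc_bulk_desc *desc,
                             cfs_page_t **pages, int *offs, int *lens,
                             int npages, int server_side)
{
        int i;

        for (i = 0; i < npages; i++) {
                if (server_side)
                        /* backend fs already references these pages */
                        ptlrpc_prep_bulk_page_nopin(desc, pages[i],
                                                    offs[i], lens[i]);
                else
                        /* client pages are pinned for the transfer */
                        ptlrpc_prep_bulk_page(desc, pages[i],
                                              offs[i], lens[i]);
        }
}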
void ptlrpc_retain_replayable_request(struct ptlrpc_request *req,
struct obd_import *imp);
__u64 ptlrpc_next_xid(void);
CFS_MODULE_PARM(oss_io_cpts, "s", charp, 0444,
"CPU partitions OSS IO threads should run on");
+/*
+ * This page is allocated statically when the module initializes.
+ * It is used to simulate data corruption; see ost_checksum_bulk()
+ * for details. As the original pages provided by the layers below
+ * may remain in an internal cache, we must not modify them in
+ * place.
+ */
+static struct page *ost_page_to_corrupt = NULL;
+
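/*
 * Illustrative sketch (not part of the patch): under the checksum fail
 * checks below, the original iov page is copied into this single
 * preallocated page, a marker is stamped over its first bytes, and the
 * iov is repointed at the corrupt copy so the cached original stays
 * untouched. ost_corrupt_iov_sketch() is hypothetical.
 */
static void ost_corrupt_iov_sketch(struct ptlrpc_bulk_desc *desc, int i,
                                   const char *marker)
{
        int off = desc->bd_iov[i].kiov_offset & ~CFS_PAGE_MASK;
        int len = desc->bd_iov[i].kiov_len;
        char *ptr = kmap(desc->bd_iov[i].kiov_page) + off;
        char *ptr2 = kmap(ost_page_to_corrupt) + off;

        memcpy(ptr2, ptr, len);                 /* copy the original data */
        memcpy(ptr2, marker, min(4, len));      /* stamp the marker */
        kunmap(ost_page_to_corrupt);
        kunmap(desc->bd_iov[i].kiov_page);
        desc->bd_iov[i].kiov_page = ost_page_to_corrupt;
}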
/**
* Do not return server-side uid/gid to remote client
*/
OBD_FAIL_CHECK(OBD_FAIL_OST_CHECKSUM_RECEIVE)) {
int off = desc->bd_iov[i].kiov_offset & ~CFS_PAGE_MASK;
int len = desc->bd_iov[i].kiov_len;
- struct page *np = cfs_alloc_page(CFS_ALLOC_STD);
+ struct page *np = ost_page_to_corrupt;
char *ptr = kmap(desc->bd_iov[i].kiov_page) + off;
if (np) {
char *ptr2 = kmap(np) + off;
memcpy(ptr2, ptr, len);
memcpy(ptr2, "bad3", min(4, len));
kunmap(np);
- cfs_page_unpin(desc->bd_iov[i].kiov_page);
desc->bd_iov[i].kiov_page = np;
} else {
CERROR("can't alloc page for corruption\n");
OBD_FAIL_CHECK(OBD_FAIL_OST_CHECKSUM_SEND)) {
int off = desc->bd_iov[i].kiov_offset & ~CFS_PAGE_MASK;
int len = desc->bd_iov[i].kiov_len;
- struct page *np = cfs_alloc_page(CFS_ALLOC_STD);
+ struct page *np = ost_page_to_corrupt;
char *ptr = kmap(desc->bd_iov[i].kiov_page) + off;
if (np) {
char *ptr2 = kmap(np) + off;
memcpy(ptr2, ptr, len);
memcpy(ptr2, "bad4", min(4, len));
kunmap(np);
- cfs_page_unpin(desc->bd_iov[i].kiov_page);
desc->bd_iov[i].kiov_page = np;
} else {
CERROR("can't alloc page for corruption\n");
nob += page_rc;
if (page_rc != 0) { /* some data! */
LASSERT (local_nb[i].page != NULL);
- ptlrpc_prep_bulk_page(desc, local_nb[i].page,
- local_nb[i].lnb_page_offset,
- page_rc);
+ ptlrpc_prep_bulk_page_nopin(desc, local_nb[i].page,
+ local_nb[i].lnb_page_offset,
+ page_rc);
}
if (page_rc != local_nb[i].len) { /* short read */
ost_tls_put(req);
out_bulk:
if (desc && !CFS_FAIL_PRECHECK(OBD_FAIL_PTLRPC_CLIENT_BULK_CB2))
- ptlrpc_free_bulk(desc);
+ ptlrpc_free_bulk_nopin(desc);
out:
LASSERT(rc <= 0);
if (rc == 0) {
/* NB Having prepped, we must commit... */
for (i = 0; i < npages; i++)
- ptlrpc_prep_bulk_page(desc, local_nb[i].page,
- local_nb[i].lnb_page_offset,
- local_nb[i].len);
+ ptlrpc_prep_bulk_page_nopin(desc, local_nb[i].page,
+ local_nb[i].lnb_page_offset,
+ local_nb[i].len);
rc = sptlrpc_svc_prep_bulk(req, desc);
if (rc != 0)
ost_tls_put(req);
out_bulk:
if (desc)
- ptlrpc_free_bulk(desc);
+ ptlrpc_free_bulk_nopin(desc);
out:
if (rc == 0) {
oti_to_request(oti, req);
int rc;
ENTRY;
+ ost_page_to_corrupt = cfs_alloc_page(CFS_ALLOC_STD);
+
lprocfs_ost_init_vars(&lvars);
rc = class_register_type(&ost_obd_ops, NULL, lvars.module_vars,
LUSTRE_OSS_NAME, NULL);
static void /*__exit*/ ost_exit(void)
{
+ if (ost_page_to_corrupt)
+ page_cache_release(ost_page_to_corrupt);
+
class_unregister_type(LUSTRE_OSS_NAME);
}
* Data to transfer in the page starts at offset \a pageoffset and
 * the amount of data to transfer from the page is \a len.
*/
-void ptlrpc_prep_bulk_page(struct ptlrpc_bulk_desc *desc,
- cfs_page_t *page, int pageoffset, int len)
+void __ptlrpc_prep_bulk_page(struct ptlrpc_bulk_desc *desc,
+ cfs_page_t *page, int pageoffset, int len, int pin)
{
LASSERT(desc->bd_iov_count < desc->bd_max_iov);
LASSERT(page != NULL);
desc->bd_nob += len;
- cfs_page_pin(page);
+ if (pin)
+ cfs_page_pin(page);
+
ptlrpc_add_bulk_page(desc, page, pageoffset, len);
}
-EXPORT_SYMBOL(ptlrpc_prep_bulk_page);
+EXPORT_SYMBOL(__ptlrpc_prep_bulk_page);
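/*
 * Minimal sketch, assuming libcfs maps cfs_page_pin()/cfs_page_unpin()
 * onto Linux page refcounting; the _sketch helpers below are
 * illustrative, not the actual libcfs implementation. Pinning takes an
 * extra reference so the page cannot be recycled while queued for bulk
 * I/O; unpinning drops that reference once the transfer completes.
 */
static inline void cfs_page_pin_sketch(struct page *page)
{
        get_page(page);         /* hold the page across the transfer */
}

static inline void cfs_page_unpin_sketch(struct page *page)
{
        put_page(page);         /* release it when the bulk is done */
}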
/**
* Uninitialize and free bulk descriptor \a desc.
* Works on bulk descriptors both from server and client side.
*/
-void ptlrpc_free_bulk(struct ptlrpc_bulk_desc *desc)
+void __ptlrpc_free_bulk(struct ptlrpc_bulk_desc *desc, int unpin)
{
int i;
ENTRY;
else
class_import_put(desc->bd_import);
- for (i = 0; i < desc->bd_iov_count ; i++)
- cfs_page_unpin(desc->bd_iov[i].kiov_page);
+ if (unpin) {
+ for (i = 0; i < desc->bd_iov_count; i++)
+ cfs_page_unpin(desc->bd_iov[i].kiov_page);
+ }
OBD_FREE(desc, offsetof(struct ptlrpc_bulk_desc,
bd_iov[desc->bd_max_iov]));
EXIT;
}
-EXPORT_SYMBOL(ptlrpc_free_bulk);
+EXPORT_SYMBOL(__ptlrpc_free_bulk);
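/*
 * Illustrative sketch (not part of the patch): the prep and free
 * variants must be matched, otherwise page references leak or
 * underflow. A client pins at prep time and unpins at free time; the
 * server-side _nopin pair does neither. bulk_lifecycle_sketch() is
 * hypothetical.
 */
static void bulk_lifecycle_sketch(struct ptlrpc_bulk_desc *desc,
                                  cfs_page_t *page, int len, int client)
{
        if (client) {
                ptlrpc_prep_bulk_page(desc, page, 0, len);      /* pins */
                /* ... bulk transfer runs ... */
                ptlrpc_free_bulk(desc);                         /* unpins */
        } else {
                ptlrpc_prep_bulk_page_nopin(desc, page, 0, len);
                /* ... bulk transfer runs ... */
                ptlrpc_free_bulk_nopin(desc);
        }
}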
/**
* Set server timelimit for this req, i.e. how long are we willing to wait