From e279c23aba56e7f7247ecfe0e6a50796d9daf650 Mon Sep 17 00:00:00 2001 From: phil Date: Tue, 17 May 2005 04:04:09 +0000 Subject: [PATCH] b=5902 r=adilger Many improvements to the code that checksums I/O, among them: - add a switch to enable or disable them at runtime - check the pages while under llite's control, and the buffers while under the osc/ost's - if the server and client see different checksums, try to figure out where it went wrong - use the kernel crc32 routine --- lustre/ChangeLog | 9 +++ lustre/include/liblustre.h | 27 +++++++ lustre/include/linux/lustre_lib.h | 18 ----- lustre/include/linux/obd.h | 3 + lustre/llite/file.c | 2 +- lustre/llite/llite_internal.h | 8 +- lustre/llite/lproc_llite.c | 35 +++++++++ lustre/llite/rw.c | 61 ++++++++++++++- lustre/lov/lov_obd.c | 11 ++- lustre/obdclass/genops.c | 2 +- lustre/osc/lproc_osc.c | 79 ++++++++++++++------ lustre/osc/osc_request.c | 151 ++++++++++++++++++++++++++------------ lustre/ost/ost_handler.c | 65 ++++++++-------- 13 files changed, 339 insertions(+), 132 deletions(-) diff --git a/lustre/ChangeLog b/lustre/ChangeLog index ea1f882..ddaa9b3 100644 --- a/lustre/ChangeLog +++ b/lustre/ChangeLog @@ -38,6 +38,15 @@ Description: A bug in MDS/OSS recovery could cause the OSS to fail an assertion Details : There's little harm in just aborting MDS/OSS recovery and letting it try again next time, so I removed the LASSERT and return an error instead. +Severity : enhancement +Bugzilla : 5902 +Description: New debugging infrastructure for tracking down data corruption +Details : The I/O checksum code was replaced to: (a) control it at runtime, + (b) cover more of the client-side code path, and (c) try to narrow + down where problems occurred + +------------------------------------------------------------------------------ + 2005-05-05 Cluster File Systems, Inc. 
* version 1.4.2 NOTE: Lustre 1.4.2 uses an incompatible network protocol than previous diff --git a/lustre/include/liblustre.h b/lustre/include/liblustre.h index d618a40..e6d83de 100644 --- a/lustre/include/liblustre.h +++ b/lustre/include/liblustre.h @@ -76,6 +76,33 @@ typedef unsigned short umode_t; #endif +/* crc32_le lifted from the Linux kernel, which had the following to say: + * + * This code is in the public domain; copyright abandoned. + * Liability for non-performance of this code is limited to the amount + * you paid for it. Since it is distributed for free, your refund will + * be very very small. If it breaks, you get to keep both pieces. + */ +#define CRCPOLY_LE 0xedb88320 +/** + * crc32_le() - Calculate bitwise little-endian Ethernet AUTODIN II CRC32 + * @crc - seed value for computation. ~0 for Ethernet, sometimes 0 for + * other uses, or the previous crc32 value if computing incrementally. + * @p - pointer to buffer over which CRC is run + * @len - length of buffer @p + */ +static inline __u32 crc32_le(__u32 crc, unsigned char const *p, size_t len) +{ + int i; + while (len--) { + crc ^= *p++; + for (i = 0; i < 8; i++) + crc = (crc >> 1) ^ ((crc & 1) ? CRCPOLY_LE : 0); + } + return crc; +} + + /* This is because lprocfs_status.h gets included here indirectly. It would * be much better to just avoid lprocfs being included into liblustre entirely * but that requires more header surgery than I can handle right now. diff --git a/lustre/include/linux/lustre_lib.h b/lustre/include/linux/lustre_lib.h index 085e04f..233c954 100644 --- a/lustre/include/linux/lustre_lib.h +++ b/lustre/include/linux/lustre_lib.h @@ -480,26 +480,8 @@ static inline void obd_ioctl_freedata(char *buf, int len) * we define this to be 2T - 4k, which is the ext3 maxbytes. 
*/ #define LUSTRE_STRIPE_MAXBYTES 0x1fffffff000ULL -#define CHECKSUM_CHUNK 4096 -#define CHECKSUM_BULK 0 #define POISON_BULK 0 -#if CHECKSUM_BULK -static inline void ost_checksum(obd_count *cksum,int *psum, void *addr, int len) -{ - unsigned char *ptr = (unsigned char *)addr; - int sum = 0; - - /* very stupid, but means I don't have to think about byte order */ - while (len-- > 0) - sum += *ptr++; - - *cksum = (*cksum << 2) + sum; - if (psum) - *psum = sum; -} -#endif - static inline int ll_insecure_random_int(void) { struct timeval t; diff --git a/lustre/include/linux/obd.h b/lustre/include/linux/obd.h index 9266323..fde4fb1 100644 --- a/lustre/include/linux/obd.h +++ b/lustre/include/linux/obd.h @@ -291,6 +291,9 @@ struct client_obd { struct mdc_rpc_lock *cl_setattr_lock; struct osc_creator cl_oscc; + /* Flags section */ + unsigned int cl_checksum:1; /* debug checksums */ + /* also protected by the poorly named _loi_list_lock lock above */ struct osc_async_rc cl_ar; diff --git a/lustre/llite/file.c b/lustre/llite/file.c index fa7455b..cc1e5ba 100644 --- a/lustre/llite/file.c +++ b/lustre/llite/file.c @@ -876,7 +876,7 @@ static ssize_t ll_file_write(struct file *file, const char *buf, size_t count, LCK_PW); if (IS_ERR(node)) RETURN(PTR_ERR(node)); - + tree.lt_fd = file->private_data; rc = ll_tree_lock(&tree, node, buf, count, file->f_flags & O_NONBLOCK ? LDLM_FL_BLOCK_NOWAIT :0); diff --git a/lustre/llite/llite_internal.h b/lustre/llite/llite_internal.h index 7078e91..76b3463 100644 --- a/lustre/llite/llite_internal.h +++ b/lustre/llite/llite_internal.h @@ -107,6 +107,10 @@ struct ll_ra_info { unsigned long ra_stats[_NR_RA_STAT]; }; +/* flags for sbi->ll_flags */ +#define LL_SBI_NOLCK 0x1 /* DLM locking disabled (directio-only) */ +#define LL_SBI_CHECKSUM 0x2 /* checksum each page as it's written */ + struct ll_sb_info { struct list_head ll_list; /* this protects pglist and ra_info. 
It isn't safe to @@ -217,6 +221,8 @@ struct ll_async_page { struct list_head llap_pglist_item; /* user credit information for oss enforcement quota */ struct obd_ucred llap_ouc; + /* checksum for paranoid I/O debugging */ + __u32 llap_checksum; }; enum { @@ -395,8 +401,6 @@ int ll_tree_lock(struct ll_lock_tree *tree, int ll_tree_unlock(struct ll_lock_tree *tree); -#define LL_SBI_NOLCK 0x1 - #define LL_MAX_BLKSIZE (4UL * 1024 * 1024) #if (LINUX_VERSION_CODE >= KERNEL_VERSION(2,5,0)) diff --git a/lustre/llite/lproc_llite.c b/lustre/llite/lproc_llite.c index fed41c3..75fa98a 100644 --- a/lustre/llite/lproc_llite.c +++ b/lustre/llite/lproc_llite.c @@ -261,6 +261,40 @@ static int ll_wr_max_cached_mb(struct file *file, const char *buffer, return count; } +static int ll_rd_checksum(char *page, char **start, off_t off, + int count, int *eof, void *data) +{ + struct super_block *sb = data; + struct ll_sb_info *sbi = ll_s2sbi(sb); + + return snprintf(page, count, "%u\n", + (sbi->ll_flags & LL_SBI_CHECKSUM) ? 
1 : 0); +} + +static int ll_wr_checksum(struct file *file, const char *buffer, + unsigned long count, void *data) +{ + struct super_block *sb = data; + struct ll_sb_info *sbi = ll_s2sbi(sb); + int val, rc; + + rc = lprocfs_write_helper(buffer, count, &val); + if (rc) + return rc; + + if (val) + sbi->ll_flags |= LL_SBI_CHECKSUM; + else + sbi->ll_flags &= ~LL_SBI_CHECKSUM; + + rc = obd_set_info(sbi->ll_osc_exp, strlen("checksum"), "checksum", + sizeof(val), &val); + if (rc) + CWARN("Failed to set OSC checksum flags: %d\n", rc); + + return count; +} + static struct lprocfs_vars lprocfs_obd_vars[] = { { "uuid", ll_rd_sb_uuid, 0, 0 }, //{ "mntpt_path", ll_rd_path, 0, 0 }, @@ -275,6 +309,7 @@ static struct lprocfs_vars lprocfs_obd_vars[] = { { "max_read_ahead_mb", ll_rd_max_readahead_mb, ll_wr_max_readahead_mb, 0 }, { "max_cached_mb", ll_rd_max_cached_mb, ll_wr_max_cached_mb, 0 }, + { "checksum_pages", ll_rd_checksum, ll_wr_checksum, 0 }, { 0 } }; diff --git a/lustre/llite/rw.c b/lustre/llite/rw.c index 6ca5b8c..b698b05 100644 --- a/lustre/llite/rw.c +++ b/lustre/llite/rw.c @@ -40,6 +40,7 @@ #include #include #include +#include #define DEBUG_SUBSYSTEM S_LLITE @@ -131,12 +132,31 @@ void ll_truncate(struct inode *inode) LASSERT(atomic_read(&lli->lli_size_sem.count) <= 0); + /* XXX I'm pretty sure this is a hack to paper over a more fundamental + * race condition. */ if (lov_merge_size(lsm, 0) == inode->i_size) { CDEBUG(D_VFSTRACE, "skipping punch for "LPX64" (size = %llu)\n", lsm->lsm_object_id, inode->i_size); GOTO(out_unlock, 0); } + if (unlikely((ll_i2sbi(inode)->ll_flags & LL_SBI_CHECKSUM) && + (inode->i_size & ~PAGE_MASK))) { + /* If the truncate leaves behind a partial page, update its + * checksum. 
*/ + struct page *page = find_get_page(inode->i_mapping, + inode->i_size >> PAGE_CACHE_SHIFT); + if (page != NULL) { + struct ll_async_page *llap = llap_cast_private(page); + if (llap != NULL) { + llap->llap_checksum = + crc32_le(0, kmap(page), PAGE_SIZE); + kunmap(page); + } + page_cache_release(page); + } + } + CDEBUG(D_INFO, "calling punch for "LPX64" (new size %llu)\n", lsm->lsm_object_id, inode->i_size); @@ -557,7 +577,29 @@ struct ll_async_page *llap_from_page(struct page *page, unsigned origin) list_add_tail(&llap->llap_pglist_item, &sbi->ll_pglist); spin_unlock(&sbi->ll_lock); -out: + out: + if (unlikely(sbi->ll_flags & LL_SBI_CHECKSUM)) { + __u32 csum = 0; + csum = crc32_le(csum, kmap(page), PAGE_SIZE); + kunmap(page); + if (origin == LLAP_ORIGIN_READAHEAD || + origin == LLAP_ORIGIN_READPAGE) { + llap->llap_checksum = 0; + } else if (origin == LLAP_ORIGIN_COMMIT_WRITE || + llap->llap_checksum == 0) { + llap->llap_checksum = csum; + CDEBUG(D_PAGE, "page %p cksum %x\n", page, csum); + } else if (llap->llap_checksum == csum) { + /* origin == LLAP_ORIGIN_WRITEPAGE */ + CDEBUG(D_PAGE, "page %p cksum %x confirmed\n", + page, csum); + } else { + /* origin == LLAP_ORIGIN_WRITEPAGE */ + LL_CDEBUG_PAGE(D_ERROR, page, "old cksum %x != new " + "%x!\n", llap->llap_checksum, csum); + } + } + llap->llap_origin = origin; RETURN(llap); } @@ -568,6 +610,7 @@ static int queue_or_sync_write(struct obd_export *exp, struct inode *inode, { unsigned long size_index = inode->i_size >> PAGE_SHIFT; struct obd_io_group *oig; + struct ll_sb_info *sbi = ll_i2sbi(inode); int rc; ENTRY; @@ -603,6 +646,22 @@ static int queue_or_sync_write(struct obd_export *exp, struct inode *inode, to = size_to; } + /* compare the checksum once before the page leaves llite */ + if (unlikely((sbi->ll_flags & LL_SBI_CHECKSUM) && + llap->llap_checksum != 0)) { + __u32 csum = 0; + struct page *page = llap->llap_page; + csum = crc32_le(csum, kmap(page), PAGE_SIZE); + kunmap(page); + if (llap->llap_checksum == 
csum) { + CDEBUG(D_PAGE, "page %p cksum %x confirmed\n", + page, csum); + } else { + CERROR("page %p old cksum %x != new cksum %x!\n", + page, llap->llap_checksum, csum); + } + } + rc = obd_queue_group_io(exp, ll_i2info(inode)->lli_smd, NULL, oig, llap->llap_cookie, OBD_BRW_WRITE, 0, to, 0, ASYNC_READY | ASYNC_URGENT | diff --git a/lustre/lov/lov_obd.c b/lustre/lov/lov_obd.c index a3372ac..a53de39 100644 --- a/lustre/lov/lov_obd.c +++ b/lustre/lov/lov_obd.c @@ -2037,15 +2037,18 @@ static int lov_set_info(struct obd_export *exp, obd_count keylen, if (KEY_IS("next_id")) { if (vallen != lov->desc.ld_tgt_count) RETURN(-EINVAL); + vallen = sizeof(obd_id); + } + + if (KEY_IS("next_id") || KEY_IS("checksum")) { for (i = 0; i < lov->desc.ld_tgt_count; i++) { /* OST was disconnected */ if (!lov->tgts[i].ltd_exp) continue; - /* initialize all OSCs, even inactive ones */ - err = obd_set_info(lov->tgts[i].ltd_exp, - keylen, key, sizeof(obd_id), - ((obd_id*)val) + i); + /* hit all OSCs, even inactive ones */ + err = obd_set_info(lov->tgts[i].ltd_exp, keylen, key, + vallen, ((obd_id*)val) + i); if (!rc) rc = err; } diff --git a/lustre/obdclass/genops.c b/lustre/obdclass/genops.c index 4908339..452d13f 100644 --- a/lustre/obdclass/genops.c +++ b/lustre/obdclass/genops.c @@ -961,7 +961,7 @@ int oig_wait(struct obd_io_group *oig) return oig->oig_rc; } - + /* Ping evictor thread */ #define PET_READY 1 #define PET_TERMINATE 2 diff --git a/lustre/osc/lproc_osc.c b/lustre/osc/lproc_osc.c index e02603b..da9aa9d 100644 --- a/lustre/osc/lproc_osc.c +++ b/lustre/osc/lproc_osc.c @@ -35,8 +35,8 @@ static struct lprocfs_vars lprocfs_obd_vars[] = { {0} }; static struct lprocfs_vars lprocfs_module_vars[] = { {0} }; #else -int osc_rd_max_pages_per_rpc(char *page, char **start, off_t off, int count, - int *eof, void *data) +static int osc_rd_max_pages_per_rpc(char *page, char **start, off_t off, + int count, int *eof, void *data) { struct obd_device *dev = data; struct client_obd *cli = 
&dev->u.cli; @@ -48,8 +48,8 @@ int osc_rd_max_pages_per_rpc(char *page, char **start, off_t off, int count, return rc; } -int osc_wr_max_pages_per_rpc(struct file *file, const char *buffer, - unsigned long count, void *data) +static int osc_wr_max_pages_per_rpc(struct file *file, const char *buffer, + unsigned long count, void *data) { struct obd_device *dev = data; struct client_obd *cli = &dev->u.cli; @@ -69,8 +69,8 @@ int osc_wr_max_pages_per_rpc(struct file *file, const char *buffer, return count; } -int osc_rd_max_rpcs_in_flight(char *page, char **start, off_t off, int count, - int *eof, void *data) +static int osc_rd_max_rpcs_in_flight(char *page, char **start, off_t off, + int count, int *eof, void *data) { struct obd_device *dev = data; struct client_obd *cli = &dev->u.cli; @@ -82,8 +82,8 @@ int osc_rd_max_rpcs_in_flight(char *page, char **start, off_t off, int count, return rc; } -int osc_wr_max_rpcs_in_flight(struct file *file, const char *buffer, - unsigned long count, void *data) +static int osc_wr_max_rpcs_in_flight(struct file *file, const char *buffer, + unsigned long count, void *data) { struct obd_device *dev = data; struct client_obd *cli = &dev->u.cli; @@ -103,8 +103,8 @@ int osc_wr_max_rpcs_in_flight(struct file *file, const char *buffer, return count; } -int osc_rd_max_dirty_mb(char *page, char **start, off_t off, int count, - int *eof, void *data) +static int osc_rd_max_dirty_mb(char *page, char **start, off_t off, int count, + int *eof, void *data) { struct obd_device *dev = data; struct client_obd *cli = &dev->u.cli; @@ -117,8 +117,8 @@ int osc_rd_max_dirty_mb(char *page, char **start, off_t off, int count, return snprintf(page, count, "%u\n", val); } -int osc_wr_max_dirty_mb(struct file *file, const char *buffer, - unsigned long count, void *data) +static int osc_wr_max_dirty_mb(struct file *file, const char *buffer, + unsigned long count, void *data) { struct obd_device *dev = data; struct client_obd *cli = &dev->u.cli; @@ -139,8 +139,8 @@ 
int osc_wr_max_dirty_mb(struct file *file, const char *buffer, return count; } -int osc_rd_cur_dirty_bytes(char *page, char **start, off_t off, int count, - int *eof, void *data) +static int osc_rd_cur_dirty_bytes(char *page, char **start, off_t off, + int count, int *eof, void *data) { struct obd_device *dev = data; struct client_obd *cli = &dev->u.cli; @@ -152,8 +152,8 @@ int osc_rd_cur_dirty_bytes(char *page, char **start, off_t off, int count, return rc; } -int osc_rd_cur_grant_bytes(char *page, char **start, off_t off, int count, - int *eof, void *data) +static int osc_rd_cur_grant_bytes(char *page, char **start, off_t off, + int count, int *eof, void *data) { struct obd_device *dev = data; struct client_obd *cli = &dev->u.cli; @@ -165,8 +165,8 @@ int osc_rd_cur_grant_bytes(char *page, char **start, off_t off, int count, return rc; } -int osc_rd_create_count(char *page, char **start, off_t off, int count, - int *eof, void *data) +static int osc_rd_create_count(char *page, char **start, off_t off, int count, + int *eof, void *data) { struct obd_device *obd = data; @@ -177,8 +177,8 @@ int osc_rd_create_count(char *page, char **start, off_t off, int count, obd->u.cli.cl_oscc.oscc_grow_count); } -int osc_wr_create_count(struct file *file, const char *buffer, - unsigned long count, void *data) +static int osc_wr_create_count(struct file *file, const char *buffer, + unsigned long count, void *data) { struct obd_device *obd = data; int val, rc; @@ -200,8 +200,8 @@ int osc_wr_create_count(struct file *file, const char *buffer, return count; } -int osc_rd_prealloc_next_id(char *page, char **start, off_t off, int count, - int *eof, void *data) +static int osc_rd_prealloc_next_id(char *page, char **start, off_t off, + int count, int *eof, void *data) { struct obd_device *obd = data; @@ -212,8 +212,8 @@ int osc_rd_prealloc_next_id(char *page, char **start, off_t off, int count, obd->u.cli.cl_oscc.oscc_next_id); } -int osc_rd_prealloc_last_id(char *page, char **start, 
off_t off, int count, - int *eof, void *data) +static int osc_rd_prealloc_last_id(char *page, char **start, off_t off, + int count, int *eof, void *data) { struct obd_device *obd = data; @@ -224,6 +224,36 @@ int osc_rd_prealloc_last_id(char *page, char **start, off_t off, int count, obd->u.cli.cl_oscc.oscc_last_id); } +static int osc_rd_checksum(char *page, char **start, off_t off, int count, + int *eof, void *data) +{ + struct obd_device *obd = data; + + if (obd == NULL) + return 0; + + return snprintf(page, count, "%d\n", + obd->u.cli.cl_checksum ? 1 : 0); +} + +static int osc_wr_checksum(struct file *file, const char *buffer, + unsigned long count, void *data) +{ + struct obd_device *obd = data; + int val, rc; + + if (obd == NULL) + return 0; + + rc = lprocfs_write_helper(buffer, count, &val); + if (rc) + return rc; + + obd->u.cli.cl_checksum = (val ? 1 : 0); + + return count; +} + static struct lprocfs_vars lprocfs_obd_vars[] = { { "uuid", lprocfs_rd_uuid, 0, 0 }, { "ping", 0, lprocfs_wr_ping, 0 }, @@ -246,6 +276,7 @@ static struct lprocfs_vars lprocfs_obd_vars[] = { { "create_count", osc_rd_create_count, osc_wr_create_count, 0 }, { "prealloc_next_id", osc_rd_prealloc_next_id, 0, 0 }, { "prealloc_last_id", osc_rd_prealloc_last_id, 0, 0 }, + { "checksums", osc_rd_checksum, osc_wr_checksum, 0 }, { 0 } }; diff --git a/lustre/osc/osc_request.c b/lustre/osc/osc_request.c index f3756ccf..b45fd27 100644 --- a/lustre/osc/osc_request.c +++ b/lustre/osc/osc_request.c @@ -44,6 +44,7 @@ # else # include # endif +# include #else /* __KERNEL__ */ # include #endif @@ -62,6 +63,7 @@ #include #include #include +#include #include "osc_internal.h" /* Pack OSC object metadata for disk storage (LE byte order). 
*/ @@ -301,9 +303,9 @@ static int osc_setattr_async(struct obd_export *exp, struct obdo *oa, memcpy(&body->oa, oa, sizeof(*oa)); request->rq_replen = lustre_msg_size(1, &size); - /* do mds to ost setattr asynchronouly */ + /* do mds to ost setattr asynchronously */ ptlrpcd_add_req(request); - + RETURN(rc); } @@ -716,37 +718,29 @@ static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2) return (p1->off + p1->count == p2->off); } -#if CHECKSUM_BULK -static obd_count cksum_blocks(int nob, obd_count page_count, - struct brw_page *pga) +static obd_count osc_checksum_bulk(int nob, obd_count page_count, + struct brw_page *pga) { - obd_count cksum = 0; + __u32 cksum = ~0; LASSERT (page_count > 0); - while (nob > 0) { + while (nob > 0 && page_count > 0) { char *ptr = kmap(pga->pg); - int psum, off = pga->off & ~PAGE_MASK; + int off = pga->off & ~PAGE_MASK; int count = pga->count > nob ? nob : pga->count; - while (count > 0) { - ost_checksum(&cksum, &psum, ptr + off, - count > CHECKSUM_CHUNK ? 
- CHECKSUM_CHUNK : count); - LL_CDEBUG_PAGE(D_PAGE, pga->pg, "off %d checksum %x\n", - off, psum); - off += CHECKSUM_CHUNK; - count -= CHECKSUM_CHUNK; - } + cksum = crc32_le(cksum, ptr + off, count); kunmap(pga->pg); + LL_CDEBUG_PAGE(D_PAGE, pga->pg, "off %d checksum %x\n", + off, cksum); nob -= pga->count; page_count--; pga++; } - return (cksum); + return cksum; } -#endif static int osc_brw_prep_request(int cmd, struct obd_import *imp,struct obdo *oa, struct lov_stripe_md *lsm, obd_count page_count, @@ -837,14 +831,22 @@ static int osc_brw_prep_request(int cmd, struct obd_import *imp,struct obdo *oa, /* size[0] still sizeof (*body) */ if (opc == OST_WRITE) { -#if CHECKSUM_BULK - body->oa.o_valid |= OBD_MD_FLCKSUM; - body->oa.o_cksum = cksum_pages(requested_nob, page_count, pga); -#endif + if (unlikely(cli->cl_checksum)) { + body->oa.o_valid |= OBD_MD_FLCKSUM; + body->oa.o_cksum = osc_checksum_bulk(requested_nob, + page_count, pga); + CDEBUG(D_PAGE, "checksum at write origin: %x\n", + body->oa.o_cksum); + /* save this in 'oa', too, for later checking */ + oa->o_valid |= OBD_MD_FLCKSUM; + oa->o_cksum = body->oa.o_cksum; + } /* 1 RC per niobuf */ size[1] = sizeof(__u32) * niocount; req->rq_replen = lustre_msg_size(2, size); } else { + if (unlikely(cli->cl_checksum)) + body->oa.o_valid |= OBD_MD_FLCKSUM; /* 1 RC for the whole I/O */ req->rq_replen = lustre_msg_size(1, size); } @@ -859,6 +861,39 @@ static int osc_brw_prep_request(int cmd, struct obd_import *imp,struct obdo *oa, return (rc); } +static void check_write_csum(__u32 cli, __u32 srv, int requested_nob, + obd_count page_count, struct brw_page *pga) +{ + __u32 new_csum; + + if (srv == cli) { + CDEBUG(D_PAGE, "checksum %x confirmed\n", cli); + return; + } + + new_csum = osc_checksum_bulk(requested_nob, page_count, pga); + + if (new_csum == srv) { + CERROR("BAD CHECKSUM (WRITE): pages were mutated on the client" + "after we checksummed them (original client csum:" + " %x; server csum: %x; client csum now: %x)\n", 
+ cli, srv, new_csum); + return; + } + + if (new_csum == cli) { + CERROR("BAD CHECKSUM (WRITE): pages were mutated in transit " + "(original client csum: %x; server csum: %x; client " + "csum now: %x)\n", cli, srv, new_csum); + return; + } + + CERROR("BAD CHECKSUM (WRITE): pages were mutated in transit, and the " + "current page contents don't match the originals OR what the " + "server received (original client csum: %x; server csum: %x; " + "client csum now: %x)\n", cli, srv, new_csum); +} + static int osc_brw_fini_request(struct ptlrpc_request *req, struct obdo *oa, int requested_nob, int niocount, obd_count page_count, struct brw_page *pga, @@ -866,6 +901,7 @@ static int osc_brw_fini_request(struct ptlrpc_request *req, struct obdo *oa, { struct client_obd *cli = &req->rq_import->imp_obd->u.cli; struct ost_body *body; + __u32 client_cksum = 0; ENTRY; if (rc < 0 && rc != -EDQUOT) @@ -886,6 +922,9 @@ static int osc_brw_fini_request(struct ptlrpc_request *req, struct obdo *oa, if (rc < 0) RETURN(rc); + if (unlikely(oa->o_valid & OBD_MD_FLCKSUM)) + client_cksum = oa->o_cksum; /* save for later */ + osc_update_grant(cli, body); memcpy(oa, &body->oa, sizeof(*oa)); @@ -896,10 +935,17 @@ static int osc_brw_fini_request(struct ptlrpc_request *req, struct obdo *oa, } LASSERT (req->rq_bulk->bd_nob == requested_nob); + if (unlikely((oa->o_valid & OBD_MD_FLCKSUM) && + client_cksum)) { + check_write_csum(client_cksum, oa->o_cksum, + requested_nob, page_count, pga); + } + RETURN(check_write_rcs(req, requested_nob, niocount, page_count, pga)); } + /* The rest of this function executes only for OST_READs */ if (rc > requested_nob) { CERROR("Unexpected rc %d (%d requested)\n", rc, requested_nob); RETURN(-EPROTO); @@ -914,39 +960,45 @@ static int osc_brw_fini_request(struct ptlrpc_request *req, struct obdo *oa, if (rc < requested_nob) handle_short_read(rc, page_count, pga); -#if CHECKSUM_BULK - if (oa->o_valid & OBD_MD_FLCKSUM) { - const struct ptlrpc_peer *peer = + if 
(unlikely(oa->o_valid & OBD_MD_FLCKSUM)) { + struct ptlrpc_peer *peer = &req->rq_import->imp_connection->c_peer; static int cksum_counter; - obd_count server_cksum = oa->o_cksum; - obd_count cksum = cksum_pages(rc, page_count, pga); + __u32 cksum = osc_checksum_bulk(rc, page_count, pga); + __u32 server_cksum = oa->o_cksum; char str[PTL_NALFMT_SIZE]; - portals_nid2str(peer->peer_ni->pni_number, peer->peer_nid, str); + ptlrpc_peernid2str(peer, str); + + if (server_cksum == ~0 && rc > 0) { + CERROR("Protocol error: server %s set the 'checksum' " + "bit, but didn't send a checksum. Not fatal, " + "but please tell CFS.\n", str); + RETURN(0); + } cksum_counter++; if (server_cksum != cksum) { - CERROR("Bad checksum: server %x, client %x, server NID " - LPX64" (%s)\n", server_cksum, cksum, - peer->peer_nid, str); + CERROR("Bad checksum: server %x != client %x, server " + "NID "LPX64" (%s)\n", server_cksum, cksum, + peer->peer_id.nid, str); cksum_counter = 0; oa->o_cksum = cksum; } else if ((cksum_counter & (-cksum_counter)) == cksum_counter){ CWARN("Checksum %u from "LPX64" (%s) OK: %x\n", - cksum_counter, peer->peer_nid, str, cksum); + cksum_counter, peer->peer_id.nid, str, cksum); } - CDEBUG(D_PAGE, "checksum %x\n", cksum); - } else { + CDEBUG(D_PAGE, "checksum %x confirmed\n", cksum); + } else if (unlikely(client_cksum)) { static int cksum_missed; cksum_missed++; if ((cksum_missed & (-cksum_missed)) == cksum_missed) CERROR("Request checksum %u from "LPX64", no reply\n", cksum_missed, - req->rq_import->imp_connection->c_peer.peer_id.nid); + req->rq_import->imp_connection->c_peer.peer_id.nid); } -#endif + RETURN(0); } @@ -3018,10 +3070,12 @@ static int osc_set_info(struct obd_export *exp, obd_count keylen, char *bufs[1] = {key}; ENTRY; +#define KEY_IS(str) \ + (keylen == strlen(str) && memcmp(key, str, keylen) == 0) + OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10); - if (keylen == strlen("next_id") && - memcmp(key, "next_id", strlen("next_id")) == 0) { + if 
(KEY_IS("next_id")) { if (vallen != sizeof(obd_id)) RETURN(-EINVAL); obd->u.cli.cl_oscc.oscc_next_id = *((obd_id*)val) + 1; @@ -3032,16 +3086,14 @@ static int osc_set_info(struct obd_export *exp, obd_count keylen, RETURN(0); } - if (keylen == strlen("growth_count") && - memcmp(key, "growth_count", strlen("growth_count")) == 0) { + if (KEY_IS("growth_count")) { if (vallen != sizeof(int)) RETURN(-EINVAL); obd->u.cli.cl_oscc.oscc_grow_count = *((int*)val); RETURN(0); } - if (keylen == strlen("unlinked") && - memcmp(key, "unlinked", keylen) == 0) { + if (KEY_IS("unlinked")) { struct osc_creator *oscc = &obd->u.cli.cl_oscc; spin_lock(&oscc->oscc_lock); oscc->oscc_flags &= ~OSCC_FLAG_NOSPC; @@ -3050,8 +3102,7 @@ static int osc_set_info(struct obd_export *exp, obd_count keylen, } - if (keylen == strlen("initial_recov") && - memcmp(key, "initial_recov", strlen("initial_recov")) == 0) { + if (KEY_IS("initial_recov")) { struct obd_import *imp = exp->exp_obd->u.cli.cl_import; if (vallen != sizeof(int)) RETURN(-EINVAL); @@ -3062,8 +3113,14 @@ static int osc_set_info(struct obd_export *exp, obd_count keylen, RETURN(0); } - if (keylen < strlen("mds_conn") || - memcmp(key, "mds_conn", strlen("mds_conn")) != 0) + if (KEY_IS("checksum")) { + if (vallen != sizeof(int)) + RETURN(-EINVAL); + exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 
1 : 0; + RETURN(0); + } + + if (!KEY_IS("mds_conn")) RETURN(-EINVAL); diff --git a/lustre/ost/ost_handler.c b/lustre/ost/ost_handler.c index 12d61cc..f1dd396 100644 --- a/lustre/ost/ost_handler.c +++ b/lustre/ost/ost_handler.c @@ -36,6 +36,7 @@ #define DEBUG_SUBSYSTEM S_OST #include +#include #include #include #include @@ -343,33 +344,24 @@ static void free_per_page_niobufs (int npages, struct niobuf_remote *pp_rnb, OBD_FREE(pp_rnb, sizeof(*pp_rnb) * npages); } -#if CHECKSUM_BULK -obd_count ost_checksum_bulk(struct ptlrpc_bulk_desc *desc) +static __u32 ost_checksum_bulk(struct ptlrpc_bulk_desc *desc) { - obd_count cksum = 0; + __u32 cksum = ~0; int i; for (i = 0; i < desc->bd_iov_count; i++) { struct page *page = desc->bd_iov[i].kiov_page; char *ptr = kmap(page); - int psum, off = desc->bd_iov[i].kiov_offset & ~PAGE_MASK; - int count = desc->bd_iov[i].kiov_len; - - while (count > 0) { - ost_checksum(&cksum, &psum, ptr + off, - count > CHECKSUM_CHUNK ? - CHECKSUM_CHUNK : count); - LL_CDEBUG_PAGE(D_PAGE, page, "off %d checksum %x\n", - off, psum); - off += CHECKSUM_CHUNK; - count -= CHECKSUM_CHUNK; - } + int off = desc->bd_iov[i].kiov_offset & ~PAGE_MASK; + + cksum = crc32_le(cksum, ptr + off, desc->bd_iov[i].kiov_len); kunmap(page); + LL_CDEBUG_PAGE(D_PAGE, page, "off %d checksum %x\n", + off, cksum); } return cksum; } -#endif static int ost_brw_read(struct ptlrpc_request *req, struct obd_trans_info *oti) { @@ -386,7 +378,7 @@ static int ost_brw_read(struct ptlrpc_request *req, struct obd_trans_info *oti) int npages; int nob = 0; int rc; - int i; + int i, do_checksum; ENTRY; if (OBD_FAIL_CHECK(OBD_FAIL_OST_BRW_READ_BULK)) @@ -444,6 +436,7 @@ static int ost_brw_read(struct ptlrpc_request *req, struct obd_trans_info *oti) GOTO(out_bulk, rc); /* We're finishing using body->oa as an input variable */ + do_checksum = (body->oa.o_valid & OBD_MD_FLCKSUM); body->oa.o_valid = 0; nob = 0; @@ -507,10 +500,12 @@ static int ost_brw_read(struct ptlrpc_request *req, struct 
obd_trans_info *oti) repbody = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*repbody)); memcpy(&repbody->oa, &body->oa, sizeof(repbody->oa)); -#if CHECKSUM_BULK - repbody->oa.o_cksum = ost_checksum_bulk(desc); - repbody->oa.o_valid |= OBD_MD_FLCKSUM; -#endif + if (unlikely(do_checksum)) { + repbody->oa.o_cksum = ost_checksum_bulk(desc); + repbody->oa.o_valid |= OBD_MD_FLCKSUM; + CDEBUG(D_PAGE, "checksum at read origin: %x\n", + repbody->oa.o_cksum); + } } out_bulk: @@ -568,7 +563,7 @@ static int ost_brw_write(struct ptlrpc_request *req, struct obd_trans_info *oti) int size[2] = { sizeof(*body) }; int objcount, niocount, npages; int comms_error = 0; - int rc, swab, i, j; + int rc, swab, i, j, do_checksum; ENTRY; if (OBD_FAIL_CHECK(OBD_FAIL_OST_BRW_WRITE_BULK)) @@ -635,6 +630,9 @@ static int ost_brw_write(struct ptlrpc_request *req, struct obd_trans_info *oti) if (desc == NULL) GOTO(out_local, rc = -ENOMEM); + /* obd_preprw clobbers oa->o_valid, so save what we need */ + do_checksum = (body->oa.o_valid & OBD_MD_FLCKSUM); + rc = obd_preprw(OBD_BRW_WRITE, req->rq_export, &body->oa, objcount, ioo, npages, pp_rnb, local_nb, oti); if (rc != 0) @@ -674,28 +672,27 @@ static int ost_brw_write(struct ptlrpc_request *req, struct obd_trans_info *oti) repbody = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*repbody)); memcpy(&repbody->oa, &body->oa, sizeof(repbody->oa)); -#if CHECKSUM_BULK - if (rc == 0 && (body->oa.o_valid & OBD_MD_FLCKSUM) != 0) { + if (unlikely(do_checksum && rc == 0)) { static int cksum_counter; obd_count client_cksum = body->oa.o_cksum; obd_count cksum = ost_checksum_bulk(desc); + cksum_counter++; if (client_cksum != cksum) { CERROR("Bad checksum: client %x, server %x id %s\n", - client_cksum, cksum, - req->rq_peerstr); - cksum_counter = 1; + client_cksum, cksum, req->rq_peerstr); + cksum_counter = 0; repbody->oa.o_cksum = cksum; + repbody->oa.o_valid |= OBD_MD_FLCKSUM; + } else if ((cksum_counter & (-cksum_counter)) == + cksum_counter) { + CWARN("Checksum %u from %s: 
%x OK\n", + cksum_counter, req->rq_peerstr, cksum); } else { - cksum_counter++; - if ((cksum_counter & (-cksum_counter)) == cksum_counter) - CWARN("Checksum %u from %s: %x OK\n", - cksum_counter, - req->rq_peerstr, - cksum); + CDEBUG(D_PAGE, "checksum %x confirmed\n", cksum); } } -#endif + /* Must commit after prep above in all cases */ rc = obd_commitrw(OBD_BRW_WRITE, req->rq_export, &repbody->oa, objcount, ioo, npages, local_nb, oti, rc); -- 1.8.3.1