From: adilger Date: Fri, 13 Feb 2004 23:17:53 +0000 (+0000) Subject: Landing b_bug974 onto HEAD (20040213_1538). X-Git-Tag: v1_7_100~2588 X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=commitdiff_plain;h=30c3a18963d1d6d70175fbbbdd9554e1eb2fa40d Landing b_bug974 onto HEAD (20040213_1538). Adds support for client-side write cache accounting via OST write credits (grants). There is not yet a "correction" algorithm for grants if they get out of sync between the client and OST (OST can handle clients who think they have grant but do not). Peter had previously suggested that grants should decay over time (faster on the clients than on the OSTs) so idle nodes do not consume space they aren't using. Use /proc/fs/lustre/osc/OSC*/cur_grant_bytes (and existing cur_dirty_bytes) to check client-side cache values. Includes lustre-side changes for lfsck support (create files with specific objids, create specific objids on OSTs). Also includes some changes to the testing scripts (local.sh, lov.sh, acc-sm) so that they include a "client nid '*'" client and can be used for sanityN.sh testing. This has been put into the main acceptance-small.sh loop so that sanityN.sh will be run on both single and multi-OST setups. 
b=974 b=2349 r=shaver --- diff --git a/lnet/include/linux/kp30.h b/lnet/include/linux/kp30.h index 0c4c4a0..09db989 100644 --- a/lnet/include/linux/kp30.h +++ b/lnet/include/linux/kp30.h @@ -115,7 +115,7 @@ do { \ if (portal_cerror == 0) \ break; \ CHECK_STACK(CDEBUG_STACK); \ - if (!(mask) || ((mask) & (D_ERROR | D_EMERG | D_WARNING)) || \ + if (((mask) & (D_ERROR | D_EMERG | D_WARNING)) || \ (portal_debug & (mask) && \ portal_subsystem_debug & DEBUG_SUBSYSTEM)) \ portals_debug_msg(DEBUG_SUBSYSTEM, mask, \ diff --git a/lnet/libcfs/debug.c b/lnet/libcfs/debug.c index 0bc93f3..7ad9327 100644 --- a/lnet/libcfs/debug.c +++ b/lnet/libcfs/debug.c @@ -633,9 +633,9 @@ int portals_debug_mark_buffer(char *text) if (debug_buf == NULL) return -EINVAL; - CDEBUG(0, "********************************************************\n"); + CDEBUG(D_TRACE,"***************************************************\n"); CWARN("DEBUG MARKER: %s\n", text); - CDEBUG(0, "********************************************************\n"); + CDEBUG(D_TRACE,"***************************************************\n"); return 0; } diff --git a/lustre/ChangeLog b/lustre/ChangeLog index 40aa745..cbdcb10 100644 --- a/lustre/ChangeLog +++ b/lustre/ChangeLog @@ -1,6 +1,8 @@ tbd Cluster File Systems, Inc. 
* version 1.2.0 * bug fixes + - account for cache space usage on clients to avoid data loss (974) + - lfsck support in lustre kernel code (2349) - reduce journal credits needed for BRW writes (2370) - orphan handling to avoid losing space on client/server crashes - ptlrpcd can be blocked, stopping ALL progress (2477) diff --git a/lustre/include/linux/lprocfs_status.h b/lustre/include/linux/lprocfs_status.h index 3f4d52f..3a7ac42 100644 --- a/lustre/include/linux/lprocfs_status.h +++ b/lustre/include/linux/lprocfs_status.h @@ -249,6 +249,8 @@ extern int lprocfs_rd_kbytestotal(char *page, char **start, off_t off, int count, int *eof, void *data); extern int lprocfs_rd_kbytesfree(char *page, char **start, off_t off, int count, int *eof, void *data); +extern int lprocfs_rd_kbytesavail(char *page, char **start, off_t off, + int count, int *eof, void *data); extern int lprocfs_rd_filestotal(char *page, char **start, off_t off, int count, int *eof, void *data); extern int lprocfs_rd_filesfree(char *page, char **start, off_t off, @@ -340,6 +342,9 @@ static inline int lprocfs_rd_kbytesfree(char *page, char **start, off_t off, int count, int *eof, void *data) { return 0; } static inline +int lprocfs_rd_kbytesavail(char *page, char **start, off_t off, + int count, int *eof, void *data) { return 0; } +static inline int lprocfs_rd_filestotal(char *page, char **start, off_t off, int count, int *eof, void *data) { return 0; } static inline diff --git a/lustre/include/linux/lustre_export.h b/lustre/include/linux/lustre_export.h index f8ae03ca..b949fe1 100644 --- a/lustre/include/linux/lustre_export.h +++ b/lustre/include/linux/lustre_export.h @@ -57,6 +57,9 @@ struct filter_export_data { struct filter_client_data *fed_fcd; loff_t fed_lr_off; int fed_lr_idx; + unsigned long fed_dirty; /* in bytes */ + unsigned long fed_grant; /* in bytes */ + unsigned long fed_pending; /* bytes just being written */ }; struct obd_export { diff --git a/lustre/include/linux/lustre_fsfilt.h 
b/lustre/include/linux/lustre_fsfilt.h index 5f9ac77..3f3421a 100644 --- a/lustre/include/linux/lustre_fsfilt.h +++ b/lustre/include/linux/lustre_fsfilt.h @@ -28,7 +28,6 @@ #ifdef __KERNEL__ #include -#include typedef void (*fsfilt_cb_t)(struct obd_device *obd, __u64 last_rcvd, void *data, int error); @@ -222,10 +221,22 @@ static inline int fsfilt_add_journal_cb(struct obd_device *obd, __u64 last_rcvd, cb_func, cb_data); } +/* very similar to obd_statfs(), but caller already holds obd_osfs_lock */ static inline int fsfilt_statfs(struct obd_device *obd, struct super_block *sb, - struct obd_statfs *osfs) + unsigned long max_age) { - return obd->obd_fsops->fs_statfs(sb, osfs); + int rc = 0; + + CDEBUG(D_SUPER, "osfs %lu, max_age %lu\n", obd->obd_osfs_age, max_age); + if (time_before(obd->obd_osfs_age, max_age)) { + rc = obd->obd_fsops->fs_statfs(sb, &obd->obd_osfs); + if (rc == 0) /* N.B. statfs can't really fail */ + obd->obd_osfs_age = jiffies; + } else { + CDEBUG(D_SUPER, "using cached obd_statfs data\n"); + } + + return rc; } static inline int fsfilt_sync(struct obd_device *obd, struct super_block *sb) diff --git a/lustre/include/linux/lustre_idl.h b/lustre/include/linux/lustre_idl.h index da9bd52..9428296 100644 --- a/lustre/include/linux/lustre_idl.h +++ b/lustre/include/linux/lustre_idl.h @@ -265,7 +265,6 @@ typedef uint32_t obd_blksize; typedef uint32_t obd_mode; typedef uint32_t obd_uid; typedef uint32_t obd_gid; -typedef uint64_t obd_rdev; typedef uint32_t obd_flag; typedef uint32_t obd_count; @@ -274,6 +273,7 @@ typedef uint32_t obd_count; #define OBD_FL_DELORPHAN (0x00000004) /* if set in o_flags delete orphans */ #define OBD_FL_NORPC (0x00000008) // if set in o_flags set in OSC not OST #define OBD_FL_IDONLY (0x00000010) // if set in o_flags only adjust obj id +#define OBD_FL_RECREATE_OBJS (0x00000020) // recreate missing obj #define OBD_INLINESZ 64 @@ -285,21 +285,26 @@ struct obdo { obd_time o_mtime; obd_time o_ctime; obd_size o_size; - obd_blocks 
o_blocks; /* brw: clients sent cached bytes */ - obd_rdev o_rdev; /* brw: clients/servers sent grant */ + obd_blocks o_blocks; /* brw: cli sent cached bytes */ + obd_size o_grant; obd_blksize o_blksize; /* optimal IO blocksize */ - obd_mode o_mode; + obd_mode o_mode; /* brw: cli sent cache remain */ obd_uid o_uid; obd_gid o_gid; obd_flag o_flags; - obd_count o_nlink; /* brw: checksum */ + obd_count o_nlink; /* brw: checksum */ obd_count o_generation; obd_flag o_valid; /* hot fields in this obdo */ - obd_flag o_obdflags; + obd_count o_misc; __u32 o_easize; /* epoch in ost writes */ char o_inline[OBD_INLINESZ]; /* fid in ost writes */ }; +#define o_dirty o_blocks +#define o_undirty o_mode +#define o_dropped o_misc +#define o_cksum o_nlink + extern void lustre_swab_obdo (struct obdo *o); #define LOV_MAGIC_V1 0x0BD10BD0 @@ -357,7 +362,6 @@ struct lov_mds_md_v0 { /* LOV EA mds/wire data (little-endian) */ #define OBD_MD_FLUID (0x00000200) /* user ID */ #define OBD_MD_FLGID (0x00000400) /* group ID */ #define OBD_MD_FLFLAGS (0x00000800) /* flags word */ -#define OBD_MD_FLOBDFLG (0x00001000) #define OBD_MD_FLNLINK (0x00002000) /* link count */ #define OBD_MD_FLGENER (0x00004000) /* generation number */ #define OBD_MD_FLINLINE (0x00008000) /* inline data */ @@ -372,7 +376,8 @@ struct lov_mds_md_v0 { /* LOV EA mds/wire data (little-endian) */ #define OBD_MD_FLGROUP (0x01000000) /* group */ #define OBD_MD_FLIFID (0x02000000) /* ->ost write inline fid */ #define OBD_MD_FLEPOCH (0x04000000) /* ->ost write easize is epoch */ -#define OBD_MD_FLNOTOBD (~(OBD_MD_FLOBDFLG | OBD_MD_FLBLOCKS | OBD_MD_LINKNAME|\ +#define OBD_MD_FLGRANT (0x08000000) /* ost preallocation space grant */ +#define OBD_MD_FLNOTOBD (~(OBD_MD_FLBLOCKS | OBD_MD_LINKNAME|\ OBD_MD_FLEASIZE | OBD_MD_FLHANDLE | OBD_MD_FLCKSUM|\ OBD_MD_FLQOS | OBD_MD_FLOSCOPQ | OBD_MD_FLCOOKIE)) @@ -411,10 +416,10 @@ extern void lustre_swab_obd_statfs (struct obd_statfs *os); #define OBD_BRW_READ 0x01 #define OBD_BRW_WRITE 0x02 
#define OBD_BRW_RWMASK (OBD_BRW_READ | OBD_BRW_WRITE) -#define OBD_BRW_CREATE 0x04 #define OBD_BRW_SYNC 0x08 #define OBD_BRW_CHECK 0x10 #define OBD_BRW_FROM_GRANT 0x20 /* the osc manages this under llite */ +#define OBD_BRW_GRANTED 0x40 /* the ost manages this */ #define OBD_OBJECT_EOF 0xffffffffffffffffULL @@ -497,6 +502,11 @@ struct ll_fid { __u32 f_type; }; +struct ll_recreate_obj { + __u64 lrc_id; + __u32 lrc_ost_idx; +}; + extern void lustre_swab_ll_fid (struct ll_fid *fid); #define MDS_STATUS_CONN 1 @@ -588,6 +598,7 @@ extern void lustre_swab_mds_rec_setattr (struct mds_rec_setattr *sa); #define MDS_OPEN_DELAY_CREATE 0100000000 /* delay initial object create */ #define MDS_OPEN_HAS_EA 010000000000 /* specify object create pattern */ +#define MDS_OPEN_HAS_OBJS 020000000000 /* Just set the EA the obj exist */ struct mds_rec_create { __u32 cr_opcode; diff --git a/lustre/include/linux/lustre_lib.h b/lustre/include/linux/lustre_lib.h index 0ad0ec7..9013e8a 100644 --- a/lustre/include/linux/lustre_lib.h +++ b/lustre/include/linux/lustre_lib.h @@ -431,6 +431,7 @@ static inline void obd_ioctl_freedata(char *buf, int len) #define OBD_IOC_LOV_SETSTRIPE _IOW ('f', 154, long) #define OBD_IOC_LOV_GETSTRIPE _IOW ('f', 155, long) +#define OBD_IOC_LOV_SETEA _IOW ('f', 156, long) #define OBD_IOC_MOUNTOPT _IOWR('f', 170, long) diff --git a/lustre/include/linux/lustre_user.h b/lustre/include/linux/lustre_user.h index 33a6251..2eba485 100644 --- a/lustre/include/linux/lustre_user.h +++ b/lustre/include/linux/lustre_user.h @@ -30,6 +30,8 @@ #define LL_IOC_CLRFLAGS _IOW ('f', 153, long) #define LL_IOC_LOV_SETSTRIPE _IOW ('f', 154, long) #define LL_IOC_LOV_GETSTRIPE _IOW ('f', 155, long) +#define LL_IOC_LOV_SETEA _IOW ('f', 156, long) +#define LL_IOC_RECREATE_OBJ _IOW ('f', 157, long) #define O_LOV_DELAY_CREATE 0100000000 /* hopefully this does not conflict */ @@ -42,6 +44,7 @@ #define LOV_PATTERN_RAID1 0x002 #define LOV_PATTERN_FIRST 0x100 +#define lov_user_ost_data 
lov_user_ost_data_v1 struct lov_user_ost_data_v1 { /* per-stripe data structure */ __u64 l_object_id; /* OST object ID */ __u64 l_object_gr; /* OST object group (creating MDS number) */ diff --git a/lustre/include/linux/obd.h b/lustre/include/linux/obd.h index 44b1809..619010b 100644 --- a/lustre/include/linux/obd.h +++ b/lustre/include/linux/obd.h @@ -164,9 +164,9 @@ struct filter_obd { struct list_head fo_export_list; int fo_subdir_count; - spinlock_t fo_grant_lock; /* protects tot_granted */ - obd_size fo_tot_granted; - obd_size fo_tot_cached; + obd_size fo_tot_dirty; /* protected by obd_osfs_lock */ + obd_size fo_tot_granted; /* all values in bytes */ + obd_size fo_tot_pending; obd_size fo_readcache_max_filesize; @@ -177,7 +177,7 @@ struct filter_obd { struct ptlrpc_client fo_mdc_client; #endif struct file **fo_last_objid_files; - __u64 *fo_last_objids; //last created object ID for groups + __u64 *fo_last_objids; /* last created objid for groups */ struct semaphore fo_alloc_lock; @@ -210,18 +210,14 @@ struct client_obd { //struct llog_canceld_ctxt *cl_llcd; /* it's included by obd_llog_ctxt */ void *cl_llcd_offset; - struct semaphore cl_dirty_sem; - obd_size cl_dirty; /* all _dirty_ in bytes */ - obd_size cl_dirty_granted; /* from ost */ - obd_size cl_dirty_max; /* allowed w/o rpc */ - struct list_head cl_cache_waiters; - struct obd_device *cl_mgmtcli_obd; - /* this is just to keep existing infinitely caching behaviour between - * clients and OSTs that don't have the grant code in yet.. 
it can - * be yanked once everything speaks grants */ - char cl_ost_can_grant; + /* the grant values are protected by loi_list_lock below */ + long cl_dirty; /* all _dirty_ in bytes */ + long cl_dirty_max; /* allowed w/o rpc */ + long cl_avail_grant; /* bytes of credit for ost */ + long cl_lost_grant; /* lost credits (trunc) */ + struct list_head cl_cache_waiters; /* waiting for cache/grant */ /* keep track of objects that have lois that contain pages which * have been queued for async brw. this lock also protects the @@ -372,10 +368,10 @@ struct niobuf_local { __u64 offset; __u32 len; __u32 flags; - int rc; struct page *page; struct dentry *dentry; - unsigned long start; + int lnb_grant_used; + int rc; }; @@ -465,6 +461,7 @@ struct obd_device { spinlock_t obd_dev_lock; __u64 obd_last_committed; struct fsfilt_operations *obd_fsops; + spinlock_t obd_osfs_lock; struct llog_ctxt *obd_llog_ctxt[LLOG_MAX_CTXTS]; struct obd_statfs obd_osfs; unsigned long obd_osfs_age; diff --git a/lustre/include/linux/obd_class.h b/lustre/include/linux/obd_class.h index 660f588..3e1a512 100644 --- a/lustre/include/linux/obd_class.h +++ b/lustre/include/linux/obd_class.h @@ -621,17 +621,17 @@ static inline int obd_statfs(struct obd_device *obd, struct obd_statfs *osfs, OBD_COUNTER_INCREMENT(obd, statfs); CDEBUG(D_SUPER, "osfs %lu, max_age %lu\n", obd->obd_osfs_age, max_age); - if (obd->obd_osfs_age == 0 || time_before(obd->obd_osfs_age, max_age)) { + if (time_before(obd->obd_osfs_age, max_age)) { rc = OBP(obd, statfs)(obd, osfs, max_age); - spin_lock(&obd->obd_dev_lock); + spin_lock(&obd->obd_osfs_lock); memcpy(&obd->obd_osfs, osfs, sizeof(obd->obd_osfs)); obd->obd_osfs_age = jiffies; - spin_unlock(&obd->obd_dev_lock); + spin_unlock(&obd->obd_osfs_lock); } else { CDEBUG(D_SUPER, "using cached obd_statfs data\n"); - spin_lock(&obd->obd_dev_lock); + spin_lock(&obd->obd_osfs_lock); memcpy(osfs, &obd->obd_osfs, sizeof(*osfs)); - spin_unlock(&obd->obd_dev_lock); + 
spin_unlock(&obd->obd_osfs_lock); } RETURN(rc); } @@ -724,10 +724,10 @@ static inline int obd_prep_async_page(struct obd_export *exp, RETURN(ret); } -static inline int obd_queue_async_io(struct obd_export *exp, - struct lov_stripe_md *lsm, - struct lov_oinfo *loi, void *cookie, - int cmd, obd_off off, int count, +static inline int obd_queue_async_io(struct obd_export *exp, + struct lov_stripe_md *lsm, + struct lov_oinfo *loi, void *cookie, + int cmd, obd_off off, int count, obd_flag brw_flags, obd_flag async_flags) { int rc; diff --git a/lustre/ldlm/ldlm_extent.c b/lustre/ldlm/ldlm_extent.c index 246ed17..9c29dbc 100644 --- a/lustre/ldlm/ldlm_extent.c +++ b/lustre/ldlm/ldlm_extent.c @@ -78,7 +78,7 @@ ldlm_extent_internal_policy(struct list_head *queue, struct ldlm_lock *req, EXIT; return; } - new_ex->start = MIN(lock->l_policy_data.l_extent.end+1, + new_ex->start = min(lock->l_policy_data.l_extent.end+1, req_start); } diff --git a/lustre/ldlm/ldlm_lib.c b/lustre/ldlm/ldlm_lib.c index 9d4934e..4b7eb3b 100644 --- a/lustre/ldlm/ldlm_lib.c +++ b/lustre/ldlm/ldlm_lib.c @@ -91,17 +91,14 @@ int client_obd_setup(struct obd_device *obddev, obd_count len, void *buf) RETURN(-EINVAL); } - sema_init(&cli->cl_sem, 1); cli->cl_conn_count = 0; - memcpy(server_uuid.uuid, lcfg->lcfg_inlbuf2, MIN(lcfg->lcfg_inllen2, + memcpy(server_uuid.uuid, lcfg->lcfg_inlbuf2, min(lcfg->lcfg_inllen2, sizeof(server_uuid))); - init_MUTEX(&cli->cl_dirty_sem); cli->cl_dirty = 0; - cli->cl_dirty_granted = 0; + cli->cl_avail_grant = 0; cli->cl_dirty_max = OSC_MAX_DIRTY_DEFAULT * 1024 * 1024; - cli->cl_ost_can_grant = 1; INIT_LIST_HEAD(&cli->cl_cache_waiters); INIT_LIST_HEAD(&cli->cl_loi_ready_list); INIT_LIST_HEAD(&cli->cl_loi_write_list); @@ -472,6 +469,7 @@ int target_handle_connect(struct ptlrpc_request *req, svc_handler_t handler) } } + /* If all else goes well, this is our RPC return code. 
*/ req->rq_status = 0; diff --git a/lustre/ldlm/ldlm_lockd.c b/lustre/ldlm/ldlm_lockd.c index 64dfb52..e1fe658 100644 --- a/lustre/ldlm/ldlm_lockd.c +++ b/lustre/ldlm/ldlm_lockd.c @@ -688,7 +688,9 @@ int ldlm_handle_cancel(struct ptlrpc_request *req) lock = ldlm_handle2lock(&dlm_req->lock_handle1); if (!lock) { CERROR("received cancel for unknown lock cookie "LPX64 - " from nid "LPX64" (%s)\n", dlm_req->lock_handle1.cookie, + " from client %s nid "LPX64" (%s)\n", + dlm_req->lock_handle1.cookie, + req->rq_export->exp_client_uuid.uuid, req->rq_peer.peer_nid, portals_nid2str(req->rq_peer.peer_ni->pni_number, req->rq_peer.peer_nid, str)); diff --git a/lustre/liblustre/super.c b/lustre/liblustre/super.c index af3d3aa..2bd8248 100644 --- a/lustre/liblustre/super.c +++ b/lustre/liblustre/super.c @@ -166,12 +166,8 @@ void obdo_to_inode(struct inode *dst, struct obdo *src, obd_flag valid) lli->lli_st_gid = src->o_gid; if (valid & OBD_MD_FLFLAGS) lli->lli_st_flags = src->o_flags; - if (valid & OBD_MD_FLNLINK) - lli->lli_st_nlink = src->o_nlink; if (valid & OBD_MD_FLGENER) lli->lli_st_generation = src->o_generation; - if (valid & OBD_MD_FLRDEV) - lli->lli_st_rdev = to_kdev_t(src->o_rdev); } #define S_IRWXUGO (S_IRWXU|S_IRWXG|S_IRWXO) @@ -231,18 +227,10 @@ void obdo_from_inode(struct obdo *dst, struct inode *src, obd_flag valid) dst->o_flags = lli->lli_st_flags; newvalid |= OBD_MD_FLFLAGS; } - if (valid & OBD_MD_FLNLINK) { - dst->o_nlink = lli->lli_st_nlink; - newvalid |= OBD_MD_FLNLINK; - } if (valid & OBD_MD_FLGENER) { dst->o_generation = lli->lli_st_generation; newvalid |= OBD_MD_FLGENER; } - if (valid & OBD_MD_FLRDEV) { - dst->o_rdev = (__u32)kdev_t_to_nr(lli->lli_st_rdev); - newvalid |= OBD_MD_FLRDEV; - } dst->o_valid |= newvalid; } diff --git a/lustre/llite/file.c b/lustre/llite/file.c index 1f6c88a..6c76e87 100644 --- a/lustre/llite/file.c +++ b/lustre/llite/file.c @@ -690,27 +690,79 @@ out: RETURN(retval); } -static int ll_lov_setstripe(struct inode *inode, struct file 
*file, - unsigned long arg) +static int ll_lov_recreate_obj(struct inode *inode, struct file *file, + unsigned long arg) +{ + struct ll_inode_info *lli = ll_i2info(inode); + struct obd_export *exp = ll_i2obdexp(inode); + struct ll_recreate_obj ucreatp; + struct obd_trans_info oti = { 0 }; + struct obdo *oa = NULL; + int lsm_size; + int rc = 0; + struct lov_stripe_md *lsm, *lsm2; + ENTRY; + + if (!capable (CAP_SYS_ADMIN)) + RETURN(-EPERM); + + rc = copy_from_user(&ucreatp, (struct ll_recreate_obj *)arg, + sizeof(struct ll_recreate_obj)); + if (rc) { + RETURN(-EFAULT); + } + oa = obdo_alloc(); + if (oa == NULL) { + RETURN(-ENOMEM); + } + + down(&lli->lli_open_sem); + lsm = lli->lli_smd; + if (lsm == NULL) { + up(&lli->lli_open_sem); + obdo_free(oa); + RETURN (-ENOENT); + } + lsm_size = sizeof(*lsm) + (sizeof(struct lov_oinfo) * + (lsm->lsm_stripe_count)); + + OBD_ALLOC(lsm2, lsm_size); + if (lsm2 == NULL) { + up(&lli->lli_open_sem); + obdo_free(oa); + RETURN(-ENOMEM); + } + + oa->o_id = ucreatp.lrc_id; + oa->o_nlink = ucreatp.lrc_ost_idx; + oa->o_valid = OBD_MD_FLID | OBD_MD_FLFLAGS; + oa->o_flags |= OBD_FL_RECREATE_OBJS; + obdo_from_inode(oa, inode, OBD_MD_FLTYPE | OBD_MD_FLATIME | + OBD_MD_FLMTIME | OBD_MD_FLCTIME); + + oti.oti_objid = NULL; + memcpy(lsm2, lsm, lsm_size); + rc = obd_create(exp, oa, &lsm2, &oti); + + up(&lli->lli_open_sem); + OBD_FREE(lsm2, lsm_size); + obdo_free(oa); + RETURN (rc); +} + +static int ll_lov_setstripe_ea_info(struct inode *inode, struct file *file, + int flags, struct lov_user_md *lum, int lum_size) { struct ll_inode_info *lli = ll_i2info(inode); struct file *f; struct obd_export *exp = ll_i2obdexp(inode); struct lov_stripe_md *lsm; - struct lookup_intent oit = {.it_op = IT_OPEN, .it_flags = FMODE_WRITE}; - struct lov_user_md lum, *lump = (struct lov_user_md *)arg; + struct lookup_intent oit = {.it_op = IT_OPEN, .it_flags = flags}; struct ptlrpc_request *req = NULL; + int rc = 0; struct lustre_md md; - int rc; ENTRY; - /* Bug 1152: 
copy properly when this is no longer true */ - LASSERT(sizeof(lum) == sizeof(*lump)); - LASSERT(sizeof(lum.lmm_objects[0]) == sizeof(lump->lmm_objects[0])); - rc = copy_from_user(&lum, lump, sizeof(lum)); - if (rc) - RETURN(-EFAULT); - down(&lli->lli_open_sem); lsm = lli->lli_smd; if (lsm) { @@ -727,7 +779,7 @@ static int ll_lov_setstripe(struct inode *inode, struct file *file, f->f_dentry = file->f_dentry; f->f_vfsmnt = file->f_vfsmnt; - rc = ll_intent_file_open(f, &lum, sizeof(lum), &oit); + rc = ll_intent_file_open(f, lum, lum_size, &oit); if (rc) GOTO(out, rc); if (it_disposition(&oit, DISP_LOOKUP_NEG)) @@ -759,6 +811,55 @@ static int ll_lov_setstripe(struct inode *inode, struct file *file, RETURN(rc); } +static int ll_lov_setea(struct inode *inode, struct file *file, + unsigned long arg) +{ + int flags = MDS_OPEN_HAS_OBJS | FMODE_WRITE; + struct lov_user_md *lump; + int lum_size = sizeof(struct lov_user_md) + + sizeof(struct lov_user_ost_data); + int rc; + ENTRY; + + if (!capable (CAP_SYS_ADMIN)) + RETURN(-EPERM); + + OBD_ALLOC(lump, lum_size); + if (lump == NULL) { + RETURN(-ENOMEM); + } + rc = copy_from_user(lump, (struct lov_user_md *)arg, + lum_size); + if (rc) { + OBD_FREE(lump, lum_size); + RETURN(-EFAULT); + } + + rc = ll_lov_setstripe_ea_info(inode, file, flags, lump, lum_size); + + OBD_FREE(lump, lum_size); + RETURN(rc); +} + +static int ll_lov_setstripe(struct inode *inode, struct file *file, + unsigned long arg) +{ + struct lov_user_md lum, *lump = (struct lov_user_md *)arg; + int rc; + int flags = FMODE_WRITE; + ENTRY; + + /* Bug 1152: copy properly when this is no longer true */ + LASSERT(sizeof(lum) == sizeof(*lump)); + LASSERT(sizeof(lum.lmm_objects[0]) == sizeof(lump->lmm_objects[0])); + rc = copy_from_user(&lum, lump, sizeof(lum)); + if (rc) + RETURN(-EFAULT); + + rc = ll_lov_setstripe_ea_info(inode, file, flags, &lum, sizeof(lum)); + RETURN(rc); +} + static int ll_lov_getstripe(struct inode *inode, unsigned long arg) { struct lov_stripe_md 
*lsm = ll_i2info(inode)->lli_smd; @@ -804,8 +905,12 @@ int ll_file_ioctl(struct inode *inode, struct file *file, unsigned int cmd, RETURN(0); case LL_IOC_LOV_SETSTRIPE: RETURN(ll_lov_setstripe(inode, file, arg)); + case LL_IOC_LOV_SETEA: + RETURN( ll_lov_setea(inode, file, arg) ); case LL_IOC_LOV_GETSTRIPE: RETURN(ll_lov_getstripe(inode, arg)); + case LL_IOC_RECREATE_OBJ: + RETURN(ll_lov_recreate_obj(inode, file, arg)); case EXT3_IOC_GETFLAGS: case EXT3_IOC_SETFLAGS: RETURN( ll_iocontrol(inode, file, cmd, arg) ); diff --git a/lustre/llite/llite_lib.c b/lustre/llite/llite_lib.c index a109b2d..21555c2 100644 --- a/lustre/llite/llite_lib.c +++ b/lustre/llite/llite_lib.c @@ -43,7 +43,7 @@ extern struct super_operations ll_super_operations; #define log2(n) ffz(~(n)) #endif -struct ll_sb_info *lustre_init_sbi(struct super_block *sb) +struct ll_sb_info *lustre_init_sbi(struct super_block *sb) { struct ll_sb_info *sbi = NULL; class_uuid_t uuid; @@ -65,7 +65,7 @@ struct ll_sb_info *lustre_init_sbi(struct super_block *sb) RETURN(sbi); } -void lustre_free_sbi(struct super_block *sb) +void lustre_free_sbi(struct super_block *sb) { struct ll_sb_info *sbi = ll_s2sbi(sb); ENTRY; @@ -126,8 +126,8 @@ int lustre_common_fill_super(struct super_block *sb, char *mdc, char *osc) sb->s_blocksize_bits = log2(osfs.os_bsize); sb->s_magic = LL_SUPER_MAGIC; sb->s_maxbytes = PAGE_CACHE_MAXBYTES; - - devno = get_uuid2int(sbi2mdc(sbi)->cl_import->imp_target_uuid.uuid, + + devno = get_uuid2int(sbi2mdc(sbi)->cl_import->imp_target_uuid.uuid, strlen(sbi2mdc(sbi)->cl_import->imp_target_uuid.uuid)); sb->s_dev = devno; @@ -159,7 +159,7 @@ int lustre_common_fill_super(struct super_block *sb, char *mdc, char *osc) sb->s_op = &lustre_super_operations; - /* make root inode + /* make root inode * XXX: move this to after cbd setup? 
*/ err = mdc_getattr(sbi->ll_mdc_exp, &rootfid, OBD_MD_FLNOTOBD|OBD_MD_FLBLOCKS, 0, &request); @@ -436,14 +436,14 @@ int lustre_process_log(struct lustre_mount_data *lmd, char * profile, err = class_process_config(&lcfg); if (err < 0) GOTO(out_detach, err); - + obd = class_name2obd(name); if (obd == NULL) GOTO(out_cleanup, err = -EINVAL); /* Disable initial recovery on this import */ - err = obd_set_info(obd->obd_self_export, - strlen("initial_recov"), "initial_recov", + err = obd_set_info(obd->obd_self_export, + strlen("initial_recov"), "initial_recov", sizeof(allow_recov), &allow_recov); if (err) GOTO(out_cleanup, err); @@ -453,9 +453,9 @@ int lustre_process_log(struct lustre_mount_data *lmd, char * profile, CERROR("cannot connect to %s: rc = %d\n", lmd->lmd_mds, err); GOTO(out_cleanup, err); } - + exp = class_conn2export(&mdc_conn); - + ctxt = llog_get_context(exp->exp_obd, LLOG_CONFIG_REPL_CTXT); rc = class_config_parse_llog(ctxt, profile, cfg); if (rc) { @@ -496,7 +496,7 @@ out_del_conn: out: if (rc == 0) rc = err; - + RETURN(rc); } @@ -527,15 +527,15 @@ int lustre_fill_super(struct super_block *sb, void *data, int silent) } OBD_ALLOC(sbi->ll_lmd, sizeof(*sbi->ll_lmd)); - if (sbi->ll_lmd == NULL) + if (sbi->ll_lmd == NULL) GOTO(out_free, err = -ENOMEM); memcpy(sbi->ll_lmd, lmd, sizeof(*lmd)); /* generate a string unique to this super, let's try the address of the super itself.*/ - len = (sizeof(sb) * 2) + 1; + len = (sizeof(sb) * 2) + 1; OBD_ALLOC(sbi->ll_instance, len); - if (sbi->ll_instance == NULL) + if (sbi->ll_instance == NULL) GOTO(out_free, err = -ENOMEM); sprintf(sbi->ll_instance, "%p", sb); @@ -556,13 +556,13 @@ int lustre_fill_super(struct super_block *sb, void *data, int silent) } if (osc) OBD_FREE(osc, strlen(osc) + 1); - OBD_ALLOC(osc, strlen(lprof->lp_osc) + + OBD_ALLOC(osc, strlen(lprof->lp_osc) + strlen(sbi->ll_instance) + 2); sprintf(osc, "%s-%s", lprof->lp_osc, sbi->ll_instance); if (mdc) OBD_FREE(mdc, strlen(mdc) + 1); - OBD_ALLOC(mdc, 
strlen(lprof->lp_mdc) + + OBD_ALLOC(mdc, strlen(lprof->lp_mdc) + strlen(sbi->ll_instance) + 2); sprintf(mdc, "%s-%s", lprof->lp_mdc, sbi->ll_instance); } @@ -576,9 +576,9 @@ int lustre_fill_super(struct super_block *sb, void *data, int silent) CERROR("no mdc\n"); GOTO(out_free, err = -EINVAL); } - + err = lustre_common_fill_super(sb, mdc, osc); - + if (err) GOTO(out_free, err); @@ -605,9 +605,9 @@ out_free: OBD_ALLOC(cln_prof, len); sprintf(cln_prof, "%s-clean", sbi->ll_lmd->lmd_profile); - err = lustre_process_log(sbi->ll_lmd, cln_prof, &cfg, + err = lustre_process_log(sbi->ll_lmd, cln_prof, &cfg, 0); - if (err < 0) + if (err < 0) CERROR("Unable to process log: %s\n", cln_prof); OBD_FREE(cln_prof, len); OBD_FREE(sbi->ll_instance, strlen(sbi->ll_instance)+ 1); @@ -619,11 +619,11 @@ out_free: goto out_dev; } /* lustre_fill_super */ -static void lustre_manual_cleanup(struct ll_sb_info *sbi) +static void lustre_manual_cleanup(struct ll_sb_info *sbi) { struct lustre_cfg lcfg; struct obd_device *obd; - int next = 0; + int next = 0; while ((obd = class_devices_in_group(&sbi->ll_sb_uuid, &next)) != NULL) { @@ -644,7 +644,7 @@ static void lustre_manual_cleanup(struct ll_sb_info *sbi) } } - if (sbi->ll_lmd != NULL) + if (sbi->ll_lmd != NULL) class_del_profile(sbi->ll_lmd->lmd_profile); } @@ -660,7 +660,7 @@ void lustre_put_super(struct super_block *sb) if (obd) force_umount = obd->obd_no_recov; obd = NULL; - + lustre_common_put_super(sb); if (sbi->ll_lmd != NULL) { @@ -860,7 +860,7 @@ int ll_setattr_raw(struct inode *inode, struct iattr *attr) /* from sys_utime() */ if (!(ia_valid & (ATTR_MTIME_SET | ATTR_ATIME_SET))) { if (current->fsuid != inode->i_uid && - (rc = ll_permission(inode, MAY_WRITE, NULL)) != 0) + (rc=ll_permission(inode,MAY_WRITE,NULL))!=0) RETURN(rc); } else { /* from inode_change_ok() */ @@ -878,7 +878,7 @@ int ll_setattr_raw(struct inode *inode, struct iattr *attr) * If we don't we can race with other i_size updaters on our node, like * ll_file_read. 
We can also race with i_size propogation to other * nodes through dirtying and writeback of final cached pages. This - * last one is especially bad for racing o_append users on other + * last one is especially bad for racing o_append users on other * nodes. */ if (ia_valid & ATTR_SIZE) { struct ldlm_extent extent = { .start = attr->ia_size, diff --git a/lustre/llite/lproc_llite.c b/lustre/llite/lproc_llite.c index 06a7a7c..58c9ed9 100644 --- a/lustre/llite/lproc_llite.c +++ b/lustre/llite/lproc_llite.c @@ -105,6 +105,28 @@ static int ll_rd_kbytesfree(char *page, char **start, off_t off, int count, return rc; } +static int ll_rd_kbytesavail(char *page, char **start, off_t off, int count, + int *eof, void *data) +{ + struct super_block *sb = (struct super_block *)data; + struct obd_statfs osfs; + int rc; + + LASSERT(sb != NULL); + rc = ll_statfs_internal(sb, &osfs, jiffies - HZ); + if (!rc) { + __u32 blk_size = osfs.os_bsize >> 10; + __u64 result = osfs.os_bavail; + + while (blk_size >>= 1) + result <<= 1; + + *eof = 1; + rc = snprintf(page, count, LPU64"\n", result); + } + return rc; +} + static int ll_rd_filestotal(char *page, char **start, off_t off, int count, int *eof, void *data) { @@ -206,6 +228,7 @@ static struct lprocfs_vars lprocfs_obd_vars[] = { { "blocksize", ll_rd_blksize, 0, 0 }, { "kbytestotal", ll_rd_kbytestotal, 0, 0 }, { "kbytesfree", ll_rd_kbytesfree, 0, 0 }, + { "kbytesavail", ll_rd_kbytesavail, 0, 0 }, { "filestotal", ll_rd_filestotal, 0, 0 }, { "filesfree", ll_rd_filesfree, 0, 0 }, //{ "filegroups", lprocfs_rd_filegroups, 0, 0 }, diff --git a/lustre/llite/rw.c b/lustre/llite/rw.c index 7c4c791..52e1437 100644 --- a/lustre/llite/rw.c +++ b/lustre/llite/rw.c @@ -146,7 +146,7 @@ int ll_prepare_write(struct file *file, struct page *page, unsigned from, struct ll_inode_info *lli = ll_i2info(inode); struct lov_stripe_md *lsm = lli->lli_smd; obd_off offset = ((obd_off)page->index) << PAGE_SHIFT; - struct brw_page pg; + struct brw_page pga; struct obdo 
oa; int rc = 0; ENTRY; @@ -154,19 +154,24 @@ int ll_prepare_write(struct file *file, struct page *page, unsigned from, if (!PageLocked(page)) LBUG(); - if (PageUptodate(page)) - RETURN(0); - /* Check to see if we should return -EIO right away */ - pg.pg = page; - pg.off = offset; - pg.count = PAGE_SIZE; - pg.flag = 0; - rc = obd_brw(OBD_BRW_CHECK, ll_i2obdexp(inode), NULL, lsm, 1, &pg, + pga.pg = page; + pga.off = offset; + pga.count = PAGE_SIZE; + pga.flag = 0; + + oa.o_id = lsm->lsm_object_id; + oa.o_mode = inode->i_mode; + oa.o_valid = OBD_MD_FLID | OBD_MD_FLMODE | OBD_MD_FLTYPE; + + rc = obd_brw(OBD_BRW_CHECK, ll_i2obdexp(inode), &oa, lsm, 1, &pga, NULL); if (rc) RETURN(rc); + if (PageUptodate(page)) + RETURN(0); + /* We're completely overwriting an existing page, so _don't_ set it up * to date until commit_write */ if (from == 0 && to == PAGE_SIZE) { @@ -183,10 +188,6 @@ int ll_prepare_write(struct file *file, struct page *page, unsigned from, GOTO(prepare_done, rc = 0); } - oa.o_id = lsm->lsm_object_id; - oa.o_mode = inode->i_mode; - oa.o_valid = OBD_MD_FLID | OBD_MD_FLMODE | OBD_MD_FLTYPE; - /* XXX could be an async ocp read.. read-ahead? 
*/ rc = ll_brw(OBD_BRW_READ, inode, &oa, page, 0); if (rc == 0) { @@ -231,15 +232,15 @@ static int ll_ap_make_ready(void *data, int cmd) struct ll_async_page *llap; struct page *page; ENTRY; - + llap = llap_from_cookie(data); - if (IS_ERR(llap)) + if (IS_ERR(llap)) RETURN(-EINVAL); page = llap->llap_page; if (cmd == OBD_BRW_READ) { - /* _sync_page beat us to it and is about to call + /* _sync_page beat us to it and is about to call * _set_async_flags which will fire off rpcs again */ if (!test_and_clear_bit(LL_PRIVBITS_READ, &page->private)) RETURN(-EAGAIN); @@ -413,15 +414,15 @@ int ll_commit_write(struct file *file, struct page *page, unsigned from, /* _make_ready only sees llap once we've unlocked the page */ llap->llap_write_queued = 1; - rc = obd_queue_async_io(exp, lsm, NULL, llap->llap_cookie, + rc = obd_queue_async_io(exp, lsm, NULL, llap->llap_cookie, OBD_BRW_WRITE, 0, 0, 0, 0); if (rc != 0) { /* async failed, try sync.. */ struct obd_sync_io_container *osic; osic_init(&osic); llap->llap_write_queued = 0; - rc = obd_queue_sync_io(exp, lsm, NULL, osic, - llap->llap_cookie, + rc = obd_queue_sync_io(exp, lsm, NULL, osic, + llap->llap_cookie, OBD_BRW_WRITE, 0, to, 0); if (rc) GOTO(free_osic, rc); @@ -544,14 +545,13 @@ static int ll_page_matches(struct page *page) } RETURN(matches); } - -static int ll_issue_page_read(struct obd_export *exp, - struct ll_async_page *llap, - int defer_uptodate) -{ + +static int ll_issue_page_read(struct obd_export *exp, + struct ll_async_page *llap, int defer_uptodate) +{ struct page *page = llap->llap_page; int rc; - + /* we don't issue this page as URGENT so that it can be batched * with other pages by the kernel's read-ahead. 
We have a strong * requirement that readpage() callers must call wait_on_page() @@ -559,8 +559,8 @@ static int ll_issue_page_read(struct obd_export *exp, llap->llap_defer_uptodate = defer_uptodate; page_cache_get(page); set_bit(LL_PRIVBITS_READ, &page->private); /* see ll_sync_page() */ - rc = obd_queue_async_io(exp, ll_i2info(page->mapping->host)->lli_smd, - NULL, llap->llap_cookie, OBD_BRW_READ, 0, + rc = obd_queue_async_io(exp, ll_i2info(page->mapping->host)->lli_smd, + NULL, llap->llap_cookie, OBD_BRW_READ, 0, PAGE_SIZE, 0, ASYNC_COUNT_STABLE); if (rc) { LL_CDEBUG_PAGE(page, "read queueing failed\n"); diff --git a/lustre/llite/rw24.c b/lustre/llite/rw24.c index 81467da..cda014e 100644 --- a/lustre/llite/rw24.c +++ b/lustre/llite/rw24.c @@ -109,18 +109,17 @@ static int ll_writepage_24(struct page *page) page_cache_get(page); if (llap->llap_write_queued) { LL_CDEBUG_PAGE(page, "marking urgent\n"); - rc = obd_set_async_flags(exp, ll_i2info(inode)->lli_smd, NULL, - llap->llap_cookie, ASYNC_READY | - ASYNC_URGENT); + rc = obd_set_async_flags(exp, ll_i2info(inode)->lli_smd, NULL, + llap->llap_cookie, + ASYNC_READY | ASYNC_URGENT); } else { llap->llap_write_queued = 1; - rc = obd_queue_async_io(exp, ll_i2info(inode)->lli_smd, NULL, - llap->llap_cookie, OBD_BRW_WRITE, 0, 0, - OBD_BRW_CREATE, ASYNC_READY | - ASYNC_URGENT); + rc = obd_queue_async_io(exp, ll_i2info(inode)->lli_smd, NULL, + llap->llap_cookie, OBD_BRW_WRITE, 0, 0, + 0, ASYNC_READY | ASYNC_URGENT); if (rc == 0) LL_CDEBUG_PAGE(page, "mmap write queued\n"); - else + else llap->llap_write_queued = 0; } if (rc) @@ -170,7 +169,7 @@ static int ll_direct_IO_24(int rw, RETURN(-ENOMEM); } - flags = (rw == WRITE ? 
OBD_BRW_CREATE : 0) /* | OBD_BRW_DIRECTIO */; + flags = 0 /* | OBD_BRW_DIRECTIO */; offset = ((obd_off)blocknr << inode->i_blkbits); length = iobuf->length; diff --git a/lustre/llite/rw26.c b/lustre/llite/rw26.c index 7edee0d..21e884f 100644 --- a/lustre/llite/rw26.c +++ b/lustre/llite/rw26.c @@ -107,22 +107,21 @@ static int ll_writepage_26(struct page *page, struct writeback_control *wbc) llap = llap_from_page(page); if (IS_ERR(llap)) GOTO(out, rc = PTR_ERR(llap)); - page_cache_get(page); + page_cache_get(page); if (llap->llap_write_queued) { LL_CDEBUG_PAGE(page, "marking urgent\n"); - rc = obd_set_async_flags(exp, ll_i2info(inode)->lli_smd, NULL, - llap->llap_cookie, ASYNC_READY | - ASYNC_URGENT); + rc = obd_set_async_flags(exp, ll_i2info(inode)->lli_smd, NULL, + llap->llap_cookie, + ASYNC_READY | ASYNC_URGENT); } else { llap->llap_write_queued = 1; - rc = obd_queue_async_io(exp, ll_i2info(inode)->lli_smd, NULL, - llap->llap_cookie, OBD_BRW_WRITE, 0, 0, - OBD_BRW_CREATE, ASYNC_READY | - ASYNC_URGENT); + rc = obd_queue_async_io(exp, ll_i2info(inode)->lli_smd, NULL, + llap->llap_cookie, OBD_BRW_WRITE, 0, 0, + 0, ASYNC_READY | ASYNC_URGENT); if (rc == 0) LL_CDEBUG_PAGE(page, "mmap write queued\n"); - else + else llap->llap_write_queued = 0; } if (rc) diff --git a/lustre/lov/lov_internal.h b/lustre/lov/lov_internal.h index 6c26a16..a565f51 100644 --- a/lustre/lov/lov_internal.h +++ b/lustre/lov/lov_internal.h @@ -41,6 +41,8 @@ int lov_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp, struct lov_mds_md *lmm, int lmm_bytes); int lov_setstripe(struct obd_export *exp, struct lov_stripe_md **lsmp, struct lov_user_md *lump); +int lov_setea(struct obd_export *exp, struct lov_stripe_md **lsmp, + struct lov_user_md *lump); int lov_getstripe(struct obd_export *exp, struct lov_stripe_md *lsm, struct lov_user_md *lump); diff --git a/lustre/lov/lov_obd.c b/lustre/lov/lov_obd.c index 64b6a28..03506e63 100644 --- a/lustre/lov/lov_obd.c +++ b/lustre/lov/lov_obd.c @@ 
-558,6 +558,32 @@ static int lov_create(struct obd_export *exp, struct obdo *src_oa, if (!lov->desc.ld_active_tgt_count) RETURN(-EIO); + /* Recreate a specific object id at the given OST index */ + if (src_oa->o_valid & OBD_MD_FLFLAGS && src_oa->o_flags & + OBD_FL_RECREATE_OBJS) { + struct lov_stripe_md obj_md; + struct lov_stripe_md *obj_mdp = &obj_md; + + ost_idx = src_oa->o_nlink; + lsm = *ea; + if (lsm == NULL) + RETURN(-EINVAL); + if (ost_idx >= lov->desc.ld_tgt_count) + RETURN(-EINVAL); + for (i = 0; i < lsm->lsm_stripe_count; i++) { + if (lsm->lsm_oinfo[i].loi_ost_idx == ost_idx) { + if (lsm->lsm_oinfo[i].loi_id != src_oa->o_id) + RETURN(-EINVAL); + break; + } + } + if (i == lsm->lsm_stripe_count) + RETURN(-EINVAL); + + rc = obd_create(lov->tgts[ost_idx].ltd_exp, src_oa, &obj_mdp, oti); + RETURN(rc); + } + ret_oa = obdo_alloc(); if (!ret_oa) RETURN(-ENOMEM); @@ -1320,10 +1346,11 @@ static int lov_sync(struct obd_export *exp, struct obdo *oa, RETURN(rc); } -static int lov_brw_check(struct lov_obd *lov, struct lov_stripe_md *lsm, +static int lov_brw_check(struct lov_obd *lov, struct obdo *oa, + struct lov_stripe_md *lsm, obd_count oa_bufs, struct brw_page *pga) { - int i; + int i, rc = 0; /* The caller just wants to know if there's a chance that this * I/O can succeed */ @@ -1342,8 +1369,12 @@ static int lov_brw_check(struct lov_obd *lov, struct lov_stripe_md *lsm, CDEBUG(D_HA, "lov idx %d inactive\n", ost); return -EIO; } + rc = obd_brw(OBD_BRW_CHECK, lov->tgts[stripe].ltd_exp, oa, + NULL, 1, &pga[i], NULL); + if (rc) + break; } - return 0; + return rc; } static int lov_brw(int cmd, struct obd_export *exp, struct obdo *src_oa, @@ -1370,7 +1401,7 @@ static int lov_brw(int cmd, struct obd_export *exp, struct obdo *src_oa, lov = &exp->exp_obd->u.lov; if (cmd == OBD_BRW_CHECK) { - rc = lov_brw_check(lov, lsm, oa_bufs, pga); + rc = lov_brw_check(lov, src_oa, lsm, oa_bufs, pga); RETURN(rc); } @@ -1526,7 +1557,7 @@ static int lov_brw_async(int cmd, struct obd_export 
*exp, struct obdo *oa, lov = &exp->exp_obd->u.lov; if (cmd == OBD_BRW_CHECK) { - rc = lov_brw_check(lov, lsm, oa_bufs, pga); + rc = lov_brw_check(lov, oa, lsm, oa_bufs, pga); RETURN(rc); } @@ -1707,24 +1738,24 @@ int lov_prep_async_page(struct obd_export *exp, struct lov_stripe_md *lsm, /* so the callback doesn't need the lsm */ lap->lap_loi_id = loi->loi_id; - rc = obd_prep_async_page(lov->tgts[loi->loi_ost_idx].ltd_exp, + rc = obd_prep_async_page(lov->tgts[loi->loi_ost_idx].ltd_exp, lsm, loi, page, lap->lap_sub_offset, - &lov_async_page_ops, lap, + &lov_async_page_ops, lap, &lap->lap_sub_cookie); if (rc) { OBD_FREE(lap, sizeof(*lap)); RETURN(rc); } - CDEBUG(D_CACHE, "lap %p page %p cookie %p off "LPU64"\n", lap, page, + CDEBUG(D_CACHE, "lap %p page %p cookie %p off "LPU64"\n", lap, page, lap->lap_sub_cookie, offset); *res = lap; RETURN(0); } -static int lov_queue_async_io(struct obd_export *exp, - struct lov_stripe_md *lsm, - struct lov_oinfo *loi, void *cookie, - int cmd, obd_off off, int count, +static int lov_queue_async_io(struct obd_export *exp, + struct lov_stripe_md *lsm, + struct lov_oinfo *loi, void *cookie, + int cmd, obd_off off, int count, obd_flag brw_flags, obd_flag async_flags) { struct lov_obd *lov = &exp->exp_obd->u.lov; @@ -1766,16 +1797,16 @@ static int lov_set_async_flags(struct obd_export *exp, RETURN(PTR_ERR(lap)); loi = &lsm->lsm_oinfo[lap->lap_stripe]; - rc = obd_set_async_flags(lov->tgts[loi->loi_ost_idx].ltd_exp, + rc = obd_set_async_flags(lov->tgts[loi->loi_ost_idx].ltd_exp, lsm, loi, lap->lap_sub_cookie, async_flags); RETURN(rc); } -static int lov_queue_sync_io(struct obd_export *exp, - struct lov_stripe_md *lsm, - struct lov_oinfo *loi, +static int lov_queue_sync_io(struct obd_export *exp, + struct lov_stripe_md *lsm, + struct lov_oinfo *loi, struct obd_sync_io_container *osic, void *cookie, - int cmd, obd_off off, int count, + int cmd, obd_off off, int count, obd_flag brw_flags) { struct lov_obd *lov = &exp->exp_obd->u.lov; @@ 
-1792,17 +1823,17 @@ static int lov_queue_sync_io(struct obd_export *exp, RETURN(PTR_ERR(lap)); loi = &lsm->lsm_oinfo[lap->lap_stripe]; - rc = obd_queue_sync_io(lov->tgts[loi->loi_ost_idx].ltd_exp, lsm, loi, - osic, lap->lap_sub_cookie, cmd, off, count, + rc = obd_queue_sync_io(lov->tgts[loi->loi_ost_idx].ltd_exp, lsm, loi, + osic, lap->lap_sub_cookie, cmd, off, count, brw_flags); RETURN(rc); } /* this isn't exactly optimal. we may have queued sync io in oscs on - * all stripes, but we don't record that fact at queue time. so we + * all stripes, but we don't record that fact at queue time. so we * trigger sync io on all stripes. */ -static int lov_trigger_sync_io(struct obd_export *exp, - struct lov_stripe_md *lsm, +static int lov_trigger_sync_io(struct obd_export *exp, + struct lov_stripe_md *lsm, struct lov_oinfo *loi, struct obd_sync_io_container *osic) { @@ -1814,7 +1845,7 @@ static int lov_trigger_sync_io(struct obd_export *exp, if (lsm_bad_magic(lsm)) RETURN(-EINVAL); - for (i = 0, loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; + for (i = 0, loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++, loi++) { err = obd_trigger_sync_io(lov->tgts[loi->loi_ost_idx].ltd_exp, lsm, loi, osic); @@ -2163,6 +2194,9 @@ static int lov_iocontrol(unsigned int cmd, struct obd_export *exp, int len, case LL_IOC_LOV_GETSTRIPE: rc = lov_getstripe(exp, karg, uarg); break; + case LL_IOC_LOV_SETEA: + rc = lov_setea(exp, karg, uarg); + break; default: { int set = 0; if (count == 0) diff --git a/lustre/lov/lov_pack.c b/lustre/lov/lov_pack.c index 0053a58..ef04e68 100644 --- a/lustre/lov/lov_pack.c +++ b/lustre/lov/lov_pack.c @@ -500,6 +500,44 @@ int lov_setstripe(struct obd_export *exp, struct lov_stripe_md **lsmp, RETURN(0); } +int lov_setea(struct obd_export *exp, struct lov_stripe_md **lsmp, + struct lov_user_md *lump) +{ + int i; + int rc; + struct obd_export *oexp; + struct lov_obd *lov = &exp->exp_obd->u.lov; + obd_id last_id = 0; + + for (i = 0; i < lump->lmm_stripe_count; i++) 
{ + __u32 len = sizeof(last_id); + oexp = lov->tgts[lump->lmm_objects[i].l_ost_idx].ltd_exp; + rc = obd_get_info(oexp, strlen("last_id"), "last_id", + &len, &last_id); + if (rc) + RETURN(rc); + if (last_id < lump->lmm_objects[i].l_object_id) { + CERROR("Setting EA for object > than last id on " + "ost idx %d "LPD64" > "LPD64" \n", + lump->lmm_objects[i].l_ost_idx, + lump->lmm_objects[i].l_object_id, last_id); + RETURN(-EINVAL); + } + } + + rc = lov_setstripe(exp, lsmp, lump); + if (rc) + RETURN(rc); + for (i = 0; i < lump->lmm_stripe_count; i++) { + (*lsmp)->lsm_oinfo[i].loi_ost_idx = + lump->lmm_objects[i].l_ost_idx; + (*lsmp)->lsm_oinfo[i].loi_id = lump->lmm_objects[i].l_object_id; + (*lsmp)->lsm_oinfo[i].loi_gr = lump->lmm_objects[i].l_object_gr; + } + RETURN(0); +} + + /* Retrieve object striping information. * * @lump is a pointer to an in-core struct with lmm_ost_count indicating diff --git a/lustre/lov/lproc_lov.c b/lustre/lov/lproc_lov.c index 2a322e6..c29644c 100644 --- a/lustre/lov/lproc_lov.c +++ b/lustre/lov/lproc_lov.c @@ -187,6 +187,7 @@ struct lprocfs_vars lprocfs_obd_vars[] = { { "blocksize", lprocfs_rd_blksize, 0, 0 }, { "kbytestotal", lprocfs_rd_kbytestotal, 0, 0 }, { "kbytesfree", lprocfs_rd_kbytesfree, 0, 0 }, + { "kbytesavail", lprocfs_rd_kbytesavail, 0, 0 }, { "desc_uuid", lov_rd_desc_uuid, 0, 0 }, { 0 } }; diff --git a/lustre/lvfs/fsfilt_ext3.c b/lustre/lvfs/fsfilt_ext3.c index a45560a..91513f8 100644 --- a/lustre/lvfs/fsfilt_ext3.c +++ b/lustre/lvfs/fsfilt_ext3.c @@ -664,7 +664,11 @@ static int fsfilt_ext3_add_journal_cb(struct obd_device *obd, __u64 last_rcvd, static int fsfilt_ext3_statfs(struct super_block *sb, struct obd_statfs *osfs) { struct kstatfs sfs; - int rc = vfs_statfs(sb, &sfs); + int rc; + + memset(&sfs, 0, sizeof(sfs)); + + rc = sb->s_op->statfs(sb, &sfs); if (!rc && sfs.f_bfree < sfs.f_ffree) { sfs.f_files = (sfs.f_files - sfs.f_ffree) + sfs.f_bfree; diff --git a/lustre/lvfs/fsfilt_extN.c b/lustre/lvfs/fsfilt_extN.c index 
b4f3fc7..8756f9a 100644 --- a/lustre/lvfs/fsfilt_extN.c +++ b/lustre/lvfs/fsfilt_extN.c @@ -636,7 +636,11 @@ static int fsfilt_extN_add_journal_cb(struct obd_device *obd, __u64 last_rcvd, static int fsfilt_extN_statfs(struct super_block *sb, struct obd_statfs *osfs) { struct kstatfs sfs; - int rc = vfs_statfs(sb, &sfs); + int rc; + + memset(&sfs, 0, sizeof(sfs)); + + rc = sb->s_op->statfs(sb, &sfs); if (!rc && sfs.f_bfree < sfs.f_ffree) { sfs.f_files = (sfs.f_files - sfs.f_ffree) + sfs.f_bfree; diff --git a/lustre/lvfs/fsfilt_reiserfs.c b/lustre/lvfs/fsfilt_reiserfs.c index 2e16c18..9864eda 100644 --- a/lustre/lvfs/fsfilt_reiserfs.c +++ b/lustre/lvfs/fsfilt_reiserfs.c @@ -158,10 +158,15 @@ static int fsfilt_reiserfs_add_journal_cb(struct obd_device *obd, return 0; } -static int fsfilt_reiserfs_statfs(struct super_block *sb, struct obd_statfs *osfs) +static int fsfilt_reiserfs_statfs(struct super_block *sb, + struct obd_statfs *osfs) { - struct statfs sfs; - int rc = vfs_statfs(sb, &sfs); + struct kstatfs sfs; + int rc; + + memset(&sfs, 0, sizeof(sfs)); + + rc = sb->s_op->statfs(sb, &sfs); statfs_pack(osfs, &sfs); return rc; diff --git a/lustre/mdc/lproc_mdc.c b/lustre/mdc/lproc_mdc.c index 6dca228..7223b81 100644 --- a/lustre/mdc/lproc_mdc.c +++ b/lustre/mdc/lproc_mdc.c @@ -35,6 +35,7 @@ static struct lprocfs_vars lprocfs_obd_vars[] = { { "blocksize", lprocfs_rd_blksize, 0, 0 }, { "kbytestotal", lprocfs_rd_kbytestotal, 0, 0 }, { "kbytesfree", lprocfs_rd_kbytesfree, 0, 0 }, + { "kbytesavail", lprocfs_rd_kbytesavail, 0, 0 }, { "filestotal", lprocfs_rd_filestotal, 0, 0 }, { "filesfree", lprocfs_rd_filesfree, 0, 0 }, //{ "filegroups", lprocfs_rd_filegroups, 0, 0 }, diff --git a/lustre/mdc/mdc_lib.c b/lustre/mdc/mdc_lib.c index 0de8ad7..794bcf9 100644 --- a/lustre/mdc/mdc_lib.c +++ b/lustre/mdc/mdc_lib.c @@ -99,7 +99,8 @@ static __u32 mds_pack_open_flags(__u32 flags) { return (flags & (FMODE_READ | FMODE_WRITE | FMODE_EXEC | - MDS_OPEN_DELAY_CREATE | MDS_OPEN_HAS_EA)) | 
+ MDS_OPEN_DELAY_CREATE | MDS_OPEN_HAS_EA | + MDS_OPEN_HAS_OBJS)) | ((flags & O_CREAT) ? MDS_OPEN_CREAT : 0) | ((flags & O_EXCL) ? MDS_OPEN_EXCL : 0) | ((flags & O_TRUNC) ? MDS_OPEN_TRUNC : 0) | diff --git a/lustre/mds/handler.c b/lustre/mds/handler.c index 06dd213..c512293 100644 --- a/lustre/mds/handler.c +++ b/lustre/mds/handler.c @@ -574,7 +574,7 @@ static int mds_getattr_pack_msg(struct ptlrpc_request *req, struct inode *inode, if (inode->i_size + 1 != body->eadatasize) CERROR("symlink size: %Lu, reply space: %d\n", inode->i_size + 1, body->eadatasize); - size[bufcount] = MIN(inode->i_size + 1, body->eadatasize); + size[bufcount] = min_t(int, inode->i_size+1, body->eadatasize); bufcount++; CDEBUG(D_INODE, "symlink size: %Lu, reply space: %d\n", inode->i_size + 1, body->eadatasize); @@ -784,7 +784,15 @@ out_pop: static int mds_obd_statfs(struct obd_device *obd, struct obd_statfs *osfs, unsigned long max_age) { - return fsfilt_statfs(obd, obd->u.mds.mds_sb, osfs); + int rc; + + spin_lock(&obd->obd_osfs_lock); + rc = fsfilt_statfs(obd, obd->u.mds.mds_sb, max_age); + if (rc == 0) + memcpy(osfs, &obd->obd_osfs, sizeof(*osfs)); + spin_unlock(&obd->obd_osfs_lock); + + return rc; } static int mds_statfs(struct ptlrpc_request *req) @@ -800,7 +808,8 @@ static int mds_statfs(struct ptlrpc_request *req) } /* We call this so that we can cache a bit - 1 jiffie worth */ - rc = obd_statfs(obd, lustre_msg_buf(req->rq_repmsg,0,size),jiffies-HZ); + rc = mds_obd_statfs(obd, lustre_msg_buf(req->rq_repmsg, 0, size), + jiffies - HZ); if (rc) { CERROR("mds_obd_statfs failed: rc %d\n", rc); GOTO(out, rc); diff --git a/lustre/mds/lproc_mds.c b/lustre/mds/lproc_mds.c index 59b3401..10365a6 100644 --- a/lustre/mds/lproc_mds.c +++ b/lustre/mds/lproc_mds.c @@ -158,6 +158,7 @@ struct lprocfs_vars lprocfs_mds_obd_vars[] = { { "blocksize", lprocfs_rd_blksize, 0, 0 }, { "kbytestotal", lprocfs_rd_kbytestotal, 0, 0 }, { "kbytesfree", lprocfs_rd_kbytesfree, 0, 0 }, + { "kbytesavail", 
lprocfs_rd_kbytesavail, 0, 0 }, { "fstype", lprocfs_rd_fstype, 0, 0 }, { "filestotal", lprocfs_rd_filestotal, 0, 0 }, { "filesfree", lprocfs_rd_filesfree, 0, 0 }, diff --git a/lustre/mds/mds_open.c b/lustre/mds/mds_open.c index 7a12362..80728da 100644 --- a/lustre/mds/mds_open.c +++ b/lustre/mds/mds_open.c @@ -373,28 +373,38 @@ static int mds_create_objects(struct ptlrpc_request *req, int offset, obdo_from_inode(oa, inode, OBD_MD_FLTYPE|OBD_MD_FLATIME|OBD_MD_FLMTIME| OBD_MD_FLCTIME); - /* check if things like lstripe/lfs stripe are sending us the ea */ - if (rec->ur_flags & MDS_OPEN_HAS_EA) { - rc = obd_iocontrol(OBD_IOC_LOV_SETSTRIPE, mds->mds_osc_exp, + if (!(rec->ur_flags & MDS_OPEN_HAS_OBJS)) { + /* check if things like lstripe/lfs stripe are sending us the ea */ + if (rec->ur_flags & MDS_OPEN_HAS_EA) { + rc = obd_iocontrol(OBD_IOC_LOV_SETSTRIPE, + mds->mds_osc_exp, + 0, &lsm, rec->ur_eadata); + if (rc) + GOTO(out_oa, rc); + } + rc = obd_create(mds->mds_osc_exp, oa, &lsm, &oti); + if (rc) { + int level = D_ERROR; + if (rc == -ENOSPC) + level = D_INODE; + CDEBUG(level, "error creating objects for " + "inode %lu: rc = %d\n", + inode->i_ino, rc); + if (rc > 0) { + CERROR("obd_create returned invalid " + "rc %d\n", rc); + rc = -EIO; + } + GOTO(out_oa, rc); + } + } else { + rc = obd_iocontrol(OBD_IOC_LOV_SETEA, mds->mds_osc_exp, 0, &lsm, rec->ur_eadata); - if (rc) + if (rc) { GOTO(out_oa, rc); - } - - rc = obd_create(mds->mds_osc_exp, oa, &lsm, &oti); - if (rc) { - int level = D_ERROR; - if (rc == -ENOSPC) - level = D_INODE; - CDEBUG(level, "error creating objects for inode %lu: rc = %d\n", - inode->i_ino, rc); - if (rc > 0) { - CERROR("obd_create returned invalid rc %d\n", rc); - rc = -EIO; } - GOTO(out_oa, rc); + lsm->lsm_object_id = oa->o_id; } - if (inode->i_size) { oa->o_size = inode->i_size; obdo_from_inode(oa, inode, OBD_MD_FLTYPE|OBD_MD_FLATIME| diff --git a/lustre/obdclass/genops.c b/lustre/obdclass/genops.c index dbd805e..615c102 100644 --- 
a/lustre/obdclass/genops.c +++ b/lustre/obdclass/genops.c @@ -30,7 +30,7 @@ #include #include #include -#else +#else #include #include #include @@ -241,7 +241,7 @@ struct obd_device *class_uuid2obd(struct obd_uuid *uuid) /* Search for a client OBD connected to tgt_uuid. If grp_uuid is specified, then only the client with that uuid is returned, otherwise any client connected to the tgt is returned. */ -struct obd_device * class_find_client_obd(struct obd_uuid *tgt_uuid, +struct obd_device * class_find_client_obd(struct obd_uuid *tgt_uuid, char * typ_name, struct obd_uuid *grp_uuid) { @@ -251,13 +251,13 @@ struct obd_device * class_find_client_obd(struct obd_uuid *tgt_uuid, struct obd_device *obd = &obd_dev[i]; if (obd->obd_type == NULL) continue; - if ((strncmp(obd->obd_type->typ_name, typ_name, + if ((strncmp(obd->obd_type->typ_name, typ_name, strlen(typ_name)) == 0)) { struct client_obd *cli = &obd->u.cli; struct obd_import *imp = cli->cl_import; if (obd_uuid_equals(tgt_uuid, &imp->imp_target_uuid) && - ((grp_uuid)? obd_uuid_equals(grp_uuid, - &obd->obd_uuid) : 1)) { + ((grp_uuid)? 
obd_uuid_equals(grp_uuid, + &obd->obd_uuid) : 1)) { return obd; } } @@ -597,7 +597,7 @@ int class_disconnect(struct obd_export *export, int flags) if (list_empty(&export->exp_handle.h_link)) RETURN(0); - CDEBUG(D_IOCTL, "disconnect: cookie "LPX64"\n", + CDEBUG(D_IOCTL, "disconnect: cookie "LPX64"\n", export->exp_handle.h_cookie); class_unlink_export(export); @@ -624,10 +624,10 @@ void class_disconnect_exports(struct obd_device *obd, int flags) list_for_each_safe(tmp, n, &work_list) { exp = list_entry(tmp, struct obd_export, exp_obd_chain); class_export_get(exp); - - if (obd_uuid_equals(&exp->exp_client_uuid, + + if (obd_uuid_equals(&exp->exp_client_uuid, &exp->exp_obd->obd_uuid)) { - CDEBUG(D_HA, + CDEBUG(D_HA, "exp %p export uuid == obd uuid, don't discon\n", exp); class_export_put(exp); @@ -688,11 +688,11 @@ void osic_add_one(struct obd_sync_io_container *osic, osic_grab(osic); } -void osic_complete_one(struct obd_sync_io_container *osic, +void osic_complete_one(struct obd_sync_io_container *osic, struct osic_callback_context *occ, int rc) { unsigned long flags; - wait_queue_head_t *wake = NULL; + wait_queue_head_t *wake = NULL; int old_rc; spin_lock_irqsave(&osic->osic_lock, flags); @@ -710,7 +710,7 @@ void osic_complete_one(struct obd_sync_io_container *osic, spin_unlock_irqrestore(&osic->osic_lock, flags); CDEBUG(D_CACHE, "osic %p completed, rc %d -> %d via %d, %d now " - "pending (racey)\n", osic, old_rc, osic->osic_rc, rc, + "pending (racey)\n", osic, old_rc, osic->osic_rc, rc, osic->osic_pending); if (wake) wake_up(wake); @@ -737,7 +737,7 @@ static void interrupted_osic(void *data) spin_lock_irqsave(&osic->osic_lock, flags); list_for_each(pos, &osic->osic_occ_list) { - occ = list_entry(pos, struct osic_callback_context, + occ = list_entry(pos, struct osic_callback_context, occ_osic_item); occ->occ_interrupted(occ); } @@ -760,10 +760,10 @@ int osic_wait(struct obd_sync_io_container *osic) lwi = (struct l_wait_info){ 0, }; } while (rc == -EINTR); - 
LASSERTF(osic->osic_pending == 0, + LASSERTF(osic->osic_pending == 0, "exiting osic_wait(osic = %p) with %d pending\n", osic, osic->osic_pending); - CDEBUG(D_CACHE, "done waiting on osic %p\n", osic); + CDEBUG(D_CACHE, "done waiting on osic %p rc %d\n", osic, osic->osic_rc); return osic->osic_rc; } diff --git a/lustre/obdclass/llog.c b/lustre/obdclass/llog.c index 82ceab4..e4146dc 100644 --- a/lustre/obdclass/llog.c +++ b/lustre/obdclass/llog.c @@ -215,7 +215,6 @@ int llog_process(struct llog_handle *loghandle, llog_cb_t cb, else last_index = LLOG_BITMAP_BYTES * 8 - 1; - while (rc == 0) { struct llog_rec_hdr *rec; diff --git a/lustre/obdclass/lprocfs_status.c b/lustre/obdclass/lprocfs_status.c index 3547ea7..fadf05b 100644 --- a/lustre/obdclass/lprocfs_status.c +++ b/lustre/obdclass/lprocfs_status.c @@ -265,6 +265,24 @@ int lprocfs_rd_kbytesfree(char *page, char **start, off_t off, int count, return rc; } +int lprocfs_rd_kbytesavail(char *page, char **start, off_t off, int count, + int *eof, void *data) +{ + struct obd_statfs osfs; + int rc = obd_statfs(data, &osfs, jiffies - HZ); + if (!rc) { + __u32 blk_size = osfs.os_bsize >> 10; + __u64 result = osfs.os_bavail; + + while (blk_size >>= 1) + result <<= 1; + + *eof = 1; + rc = snprintf(page, count, LPU64"\n", result); + } + return rc; +} + int lprocfs_rd_filestotal(char *page, char **start, off_t off, int count, int *eof, void *data) { @@ -783,6 +801,7 @@ EXPORT_SYMBOL(lprocfs_rd_numrefs); EXPORT_SYMBOL(lprocfs_rd_blksize); EXPORT_SYMBOL(lprocfs_rd_kbytestotal); EXPORT_SYMBOL(lprocfs_rd_kbytesfree); +EXPORT_SYMBOL(lprocfs_rd_kbytesavail); EXPORT_SYMBOL(lprocfs_rd_filestotal); EXPORT_SYMBOL(lprocfs_rd_filesfree); EXPORT_SYMBOL(lprocfs_rd_filegroups); diff --git a/lustre/obdclass/obd_config.c b/lustre/obdclass/obd_config.c index fcaa7a7..46710aa 100644 --- a/lustre/obdclass/obd_config.c +++ b/lustre/obdclass/obd_config.c @@ -122,6 +122,8 @@ int class_attach(struct lustre_cfg *lcfg) INIT_LIST_HEAD(&obd->obd_exports); 
obd->obd_num_exports = 0; spin_lock_init(&obd->obd_dev_lock); + spin_lock_init(&obd->obd_osfs_lock); + obd->obd_osfs_age = jiffies - 1000 * HZ; init_waitqueue_head(&obd->obd_refcount_waitq); /* XXX belongs in setup not attach */ diff --git a/lustre/obdclass/obdo.c b/lustre/obdclass/obdo.c index 996ef58..4e8e244 100644 --- a/lustre/obdclass/obdo.c +++ b/lustre/obdclass/obdo.c @@ -183,22 +183,10 @@ void obdo_from_inode(struct obdo *dst, struct inode *src, obd_flag valid) dst->o_flags = src->i_flags; newvalid |= OBD_MD_FLFLAGS; } - if (valid & OBD_MD_FLNLINK) { - dst->o_nlink = src->i_nlink; - newvalid |= OBD_MD_FLNLINK; - } if (valid & OBD_MD_FLGENER) { dst->o_generation = src->i_generation; newvalid |= OBD_MD_FLGENER; } - if (valid & OBD_MD_FLRDEV) { -#if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0)) - dst->o_rdev = (__u32)kdev_t_to_nr(src->i_rdev); -#else - dst->o_rdev = (__u32)old_decode_dev(src->i_rdev); -#endif - newvalid |= OBD_MD_FLRDEV; - } dst->o_valid |= newvalid; } @@ -265,16 +253,8 @@ void obdo_to_inode(struct inode *dst, struct obdo *src, obd_flag valid) dst->i_gid = src->o_gid; if (valid & OBD_MD_FLFLAGS) dst->i_flags = src->o_flags; - if (valid & OBD_MD_FLNLINK) - dst->i_nlink = src->o_nlink; if (valid & OBD_MD_FLGENER) dst->i_generation = src->o_generation; - if (valid & OBD_MD_FLRDEV) -#if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0)) - dst->i_rdev = to_kdev_t(src->o_rdev); -#else - dst->i_rdev = old_decode_dev(src->o_rdev); -#endif } EXPORT_SYMBOL(obdo_to_inode); #endif @@ -307,21 +287,10 @@ void obdo_cpy_md(struct obdo *dst, struct obdo *src, obd_flag valid) dst->o_gid = src->o_gid; if (valid & OBD_MD_FLFLAGS) dst->o_flags = src->o_flags; - /* - if (valid & OBD_MD_FLOBDFLG) - dst->o_obdflags = src->o_obdflags; - */ - if (valid & OBD_MD_FLNLINK) - dst->o_nlink = src->o_nlink; if (valid & OBD_MD_FLGENER) dst->o_generation = src->o_generation; - if (valid & OBD_MD_FLRDEV) - dst->o_rdev = src->o_rdev; - if (valid & OBD_MD_FLINLINE && - src->o_obdflags & 
OBD_FL_INLINEDATA) { + if (valid & OBD_MD_FLINLINE) memcpy(dst->o_inline, src->o_inline, sizeof(src->o_inline)); - dst->o_obdflags |= OBD_FL_INLINEDATA; - } dst->o_valid |= valid; } diff --git a/lustre/obdecho/echo.c b/lustre/obdecho/echo.c index 36beb06..c627f82 100644 --- a/lustre/obdecho/echo.c +++ b/lustre/obdecho/echo.c @@ -221,7 +221,7 @@ int echo_preprw(int cmd, struct obd_export *export, struct obdo *oa, RETURN(-EINVAL); /* Temp fix to stop falling foul of osc_announce_cached() */ - oa->o_valid &= ~(OBD_MD_FLBLOCKS | OBD_MD_FLRDEV); + oa->o_valid &= ~(OBD_MD_FLBLOCKS | OBD_MD_FLGRANT); memset(res, 0, sizeof(*res) * niocount); diff --git a/lustre/obdecho/echo_client.c b/lustre/obdecho/echo_client.c index fa591b2..d3d79ad 100644 --- a/lustre/obdecho/echo_client.c +++ b/lustre/obdecho/echo_client.c @@ -784,8 +784,8 @@ static int echo_client_async_page(struct obd_export *exp, int rw, eas.eas_next_offset += PAGE_SIZE; eap->eap_off = eas.eas_next_offset; - rc = obd_prep_async_page(exp, lsm, NULL, eap->eap_page, - eap->eap_off, &ec_async_page_ops, + rc = obd_prep_async_page(exp, lsm, NULL, eap->eap_page, + eap->eap_off, &ec_async_page_ops, eap, &eap->eap_cookie); if (rc) { spin_lock_irqsave(&eas.eas_lock, flags); @@ -794,8 +794,8 @@ static int echo_client_async_page(struct obd_export *exp, int rw, } /* always asserts urgent, which isn't quite right */ - rc = obd_queue_async_io(exp, lsm, NULL, eap->eap_cookie, - rw, 0, PAGE_SIZE, 0, + rc = obd_queue_async_io(exp, lsm, NULL, eap->eap_cookie, + rw, 0, PAGE_SIZE, 0, ASYNC_READY | ASYNC_URGENT | ASYNC_COUNT_STABLE); spin_lock_irqsave(&eas.eas_lock, flags); diff --git a/lustre/obdfilter/filter.c b/lustre/obdfilter/filter.c index f8b2ed1..d576705 100644 --- a/lustre/obdfilter/filter.c +++ b/lustre/obdfilter/filter.c @@ -93,7 +93,7 @@ int filter_finish_transno(struct obd_export *exp, struct obd_trans_info *oti, filter->fo_fsd->fsd_last_transno = cpu_to_le64(last_rcvd); spin_unlock(&filter->fo_translock); oti->oti_transno 
= last_rcvd; - } else { + } else { spin_lock(&filter->fo_translock); last_rcvd = oti->oti_transno; if (last_rcvd > le64_to_cpu(filter->fo_fsd->fsd_last_transno)) @@ -1017,7 +1017,7 @@ static int filter_prepare_destroy(struct obd_device *obd, obd_id objid) * file then this enqueue will communicate the DISCARD to all the * clients. This assumes that we always destroy all the objects for * a file at a time, as is currently the case. If we're not the - * OST at stripe 0 then we'll harmlessly get a very lonely lock in + * OST at stripe 0 then we'll harmlessly get a very lonely lock in * the local DLM and immediately drop it. */ rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace, NULL, res_id, LDLM_EXTENT, &extent, @@ -1045,7 +1045,7 @@ static int filter_destroy_internal(struct obd_device *obd, obd_id objid, if (inode->i_nlink != 1 || atomic_read(&inode->i_count) != 1) { CERROR("destroying objid %*s nlink = %lu, count = %d\n", dchild->d_name.len, dchild->d_name.name, - (unsigned long)inode->i_nlink, + (unsigned long)inode->i_nlink, atomic_read(&inode->i_count)); } @@ -1320,6 +1320,11 @@ static int filter_destroy_export(struct obd_export *exp) { ENTRY; + if (exp->exp_filter_data.fed_pending) + CERROR("%s: cli %s/%p has %lu pending on destroyed export\n", + exp->exp_obd->obd_name, exp->exp_client_uuid.uuid, + exp, exp->exp_filter_data.fed_pending); + target_destroy_export(exp); if (exp->exp_obd->obd_replayable) @@ -1330,20 +1335,50 @@ static int filter_destroy_export(struct obd_export *exp) /* also incredibly similar to mds_disconnect */ static int filter_disconnect(struct obd_export *exp, int flags) { + struct filter_obd *filter = &exp->exp_obd->u.filter; + struct filter_export_data *fed = &exp->exp_filter_data; unsigned long irqflags; struct llog_ctxt *ctxt; int rc; ENTRY; LASSERT(exp); + + /* This would imply RPCs still in flight or preprw/commitrw imbalance */ + if (fed->fed_pending) + CWARN("%s: cli %s has %lu pending at disconnect time\n", + exp->exp_obd->obd_name, 
exp->exp_client_uuid.uuid, + fed->fed_pending); + + /* Forget what this client had cached. This is also done on the + * client when it invalidates its import. Do this before unlinking + * from the export list so filter_grant_sanity_check totals are OK. */ + spin_lock(&exp->exp_obd->obd_osfs_lock); + LASSERTF(exp->exp_obd->u.filter.fo_tot_dirty >= fed->fed_dirty, + "%s: tot_dirty "LPU64" cli %s/%p fed_dirty %lu\n", + exp->exp_obd->obd_name, exp->exp_obd->u.filter.fo_tot_dirty, + exp->exp_client_uuid.uuid, exp, fed->fed_dirty); + exp->exp_obd->u.filter.fo_tot_dirty -= fed->fed_dirty; + LASSERTF(exp->exp_obd->u.filter.fo_tot_granted >= fed->fed_grant, + "%s: tot_granted "LPU64" cli %s/%p fed_grant %lu\n", + exp->exp_obd->obd_name, exp->exp_obd->u.filter.fo_tot_granted, + exp->exp_client_uuid.uuid, exp, fed->fed_grant); + exp->exp_obd->u.filter.fo_tot_granted -= fed->fed_grant; + LASSERTF(exp->exp_obd->u.filter.fo_tot_pending >= fed->fed_pending, + "%s: tot_pending "LPU64" cli %s/%p fed_pending %lu\n", + exp->exp_obd->obd_name, exp->exp_obd->u.filter.fo_tot_pending, + exp->exp_client_uuid.uuid, exp, fed->fed_pending); + fed->fed_dirty = 0; + fed->fed_grant = 0; + spin_unlock(&exp->exp_obd->obd_osfs_lock); + ldlm_cancel_locks_for_export(exp); spin_lock_irqsave(&exp->exp_lock, irqflags); exp->exp_flags = flags; spin_unlock_irqrestore(&exp->exp_lock, irqflags); - fsfilt_sync(exp->exp_obd, exp->exp_obd->u.filter.fo_sb); - /* XXX cleanup preallocated inodes */ + fsfilt_sync(exp->exp_obd, filter->fo_sb); /* flush any remaining cancel messages out to the target */ ctxt = llog_get_context(exp->exp_obd, LLOG_UNLINK_REPL_CTXT); @@ -1554,21 +1589,21 @@ static int filter_should_precreate(struct obd_export *exp, struct obdo *oa, diff = oa->o_id - filter_last_id(filter, oa); CDEBUG(D_INFO, "filter_last_id() = "LPU64" -> diff = %d\n", filter_last_id(filter, oa), diff); - + /* delete orphans request */ - if ((oa->o_valid & OBD_MD_FLFLAGS) && + if ((oa->o_valid & OBD_MD_FLFLAGS) && 
(oa->o_flags & OBD_FL_DELORPHAN)) { if (diff >= 0) RETURN(diff); filter_destroy_precreated(exp, oa, filter); rc = filter_update_last_objid(obd, group, 0); if (rc) - CERROR("unable to write lastobjid, but orphans" + CERROR("unable to write lastobjid, but orphans" "were deleted\n"); RETURN(0); } else { /* only precreate if group == 0 and o_id is specfied */ - if (!(oa->o_valid & OBD_FL_DELORPHAN) && + if (!(oa->o_valid & OBD_FL_DELORPHAN) && (group != 0 || oa->o_id == 0)) RETURN(1); @@ -1596,15 +1631,33 @@ static int filter_precreate(struct obd_device *obd, struct obdo *oa, struct dentry *dparent; int err = 0, rc = 0, i; __u64 next_id; + int recreate_obj = 0; void *handle = NULL; ENTRY; filter = &obd->u.filter; + if ((oa->o_valid & OBD_MD_FLFLAGS) && + (oa->o_flags & OBD_FL_RECREATE_OBJS)) { + recreate_obj = 1; + } + for (i = 0; i < *num && err == 0; i++) { int cleanup_phase = 0; - next_id = filter_last_id(filter, oa) + 1; + if (recreate_obj) { + __u64 last_id; + next_id = oa->o_id; + last_id = filter_last_id(filter, NULL); + if (next_id > last_id) { + CERROR("Error: Trying to recreate obj greater" + "than last id "LPD64" > "LPD64"\n", + next_id, last_id); + RETURN(-EINVAL); + } + } else + next_id = filter_last_id(filter, NULL) + 1; + CDEBUG(D_INFO, "precreate objid "LPU64"\n", next_id); dparent = filter_parent_lock(obd, group, next_id, LCK_PW, @@ -1620,9 +1673,18 @@ static int filter_precreate(struct obd_device *obd, struct obdo *oa, if (dchild->d_inode != NULL) { /* This would only happen if lastobjid was bad on disk*/ - CERROR("Serious error: objid %*s already exists; is " - "this filesystem corrupt?\n", - dchild->d_name.len, dchild->d_name.name); + /* Could also happen if recreating missing obj but + * already exists + */ + if (recreate_obj) { + CERROR("Serious error: recreating obj %*s but " + "obj already exists \n", + dchild->d_name.len, dchild->d_name.name); + } else { + CERROR("Serious error: objid %*s already " + "exists; is this filesystem corrupt?\n", + 
dchild->d_name.len, dchild->d_name.name); + } GOTO(cleanup, rc = -EEXIST); } @@ -1636,12 +1698,15 @@ static int filter_precreate(struct obd_device *obd, struct obdo *oa, if (rc) { CERROR("create failed rc = %d\n", rc); GOTO(cleanup, rc); - } + } - filter_set_last_id(filter, oa, next_id); - err = filter_update_last_objid(obd, group, 0); - if (err) - CERROR("unable to write lastobjid but file created\n"); + if (!recreate_obj) { + filter_set_last_id(filter, NULL, next_id); + err = filter_update_last_objid(obd, group, 0); + if (err) + CERROR("unable to write lastobjid " + "but file created\n"); + } cleanup: switch(cleanup_phase) { @@ -1659,7 +1724,7 @@ static int filter_precreate(struct obd_device *obd, struct obdo *oa, case 0: break; } - + if (rc) break; } @@ -1696,12 +1761,18 @@ static int filter_create(struct obd_export *exp, struct obdo *oa, obd = exp->exp_obd; push_ctxt(&saved, &obd->obd_ctxt, NULL); - diff = filter_should_precreate(exp, oa, group); - if (diff > 0) { - oa->o_id = filter_last_id(&obd->u.filter, oa); + if ((oa->o_valid & OBD_MD_FLFLAGS) && + (oa->o_flags & OBD_FL_RECREATE_OBJS)) { + diff = 1; rc = filter_precreate(obd, oa, group, &diff); - oa->o_id += diff; - oa->o_valid = OBD_MD_FLID; + } else { + diff = filter_should_precreate(exp, oa, group); + if (diff > 0) { + oa->o_id = filter_last_id(&obd->u.filter, oa); + rc = filter_precreate(obd, oa, group, &diff); + oa->o_id += diff; + oa->o_valid = OBD_MD_FLID; + } } pop_ctxt(&saved, &obd->obd_ctxt, NULL); @@ -1754,7 +1825,7 @@ static int filter_destroy(struct obd_export *exp, struct obdo *oa, cleanup_phase = 2; if (dchild->d_inode == NULL) { - CDEBUG(D_INODE, "destroying non-existent object "LPU64"\n", + CDEBUG(D_INODE, "destroying non-existent object "LPU64"\n", oa->o_id); GOTO(cleanup, rc = -ENOENT); } @@ -1905,11 +1976,90 @@ static int filter_sync(struct obd_export *exp, struct obdo *oa, RETURN(rc); } +/* debugging to make sure that nothing bad happens, can be turned off soon. 
+ * caller must hold osfs lock */ +static void filter_grant_total_exports(struct obd_device *obd, + obd_size *tot_dirty, + obd_size *tot_pending, + obd_size *tot_granted, + obd_size maxsize) +{ + struct filter_export_data *fed; + struct obd_export *exp_pos; + + spin_lock(&obd->obd_dev_lock); + list_for_each_entry(exp_pos, &obd->obd_exports, exp_obd_chain) { + fed = &exp_pos->exp_filter_data; + LASSERTF(fed->fed_dirty <= maxsize, "cli %s/%p %lu > "LPU64, + exp_pos->exp_client_uuid.uuid, exp_pos, + fed->fed_dirty, maxsize); + LASSERTF(fed->fed_grant + fed->fed_pending <= maxsize, + "cli %s/%p %lu+%lu > "LPU64, + exp_pos->exp_client_uuid.uuid, exp_pos, + fed->fed_grant, fed->fed_pending, maxsize); + *tot_dirty += fed->fed_dirty; + *tot_pending += fed->fed_pending; + *tot_granted += fed->fed_grant + fed->fed_pending; + } + spin_unlock(&obd->obd_dev_lock); +} + +static void filter_grant_sanity_check(obd_size tot_dirty, obd_size tot_pending, + obd_size tot_granted, + obd_size fo_tot_dirty, + obd_size fo_tot_pending, + obd_size fo_tot_granted, obd_size maxsize) +{ + LASSERTF(tot_dirty == fo_tot_dirty, LPU64" != "LPU64, + tot_dirty, fo_tot_dirty); + LASSERTF(tot_pending == fo_tot_pending, LPU64" != "LPU64, + tot_pending, fo_tot_pending); + LASSERTF(tot_granted == fo_tot_granted, LPU64" != "LPU64, + tot_granted, fo_tot_granted); + LASSERTF(tot_dirty <= maxsize, LPU64" > "LPU64, tot_dirty, maxsize); + LASSERTF(tot_pending <= tot_granted, LPU64" > "LPU64, tot_pending, + tot_granted); + LASSERTF(tot_granted <= maxsize, LPU64" > "LPU64, tot_granted, maxsize); +} + static int filter_statfs(struct obd_device *obd, struct obd_statfs *osfs, unsigned long max_age) { + struct filter_obd *filter = &obd->u.filter; + obd_size tot_cached = 0, tot_pending = 0, tot_granted = 0; + obd_size fo_tot_cached, fo_tot_pending, fo_tot_granted; + int blockbits = filter->fo_sb->s_blocksize_bits; + int rc; ENTRY; - RETURN(fsfilt_statfs(obd, obd->u.filter.fo_sb, osfs)); + + /* at least try to account 
for cached pages. it's still racy and + * might be under-reporting if clients haven't announced their + * caches with brw recently */ + spin_lock(&obd->obd_osfs_lock); + rc = fsfilt_statfs(obd, filter->fo_sb, max_age); + memcpy(osfs, &obd->obd_osfs, sizeof(*osfs)); + filter_grant_total_exports(obd, &tot_cached, &tot_pending, &tot_granted, + osfs->os_blocks << blockbits); + fo_tot_cached = filter->fo_tot_dirty; + fo_tot_pending = filter->fo_tot_pending; + fo_tot_granted = filter->fo_tot_granted; + spin_unlock(&obd->obd_osfs_lock); + + /* Do check outside spinlock, to avoid wedging system on failure */ + filter_grant_sanity_check(tot_cached, tot_pending, tot_granted, + fo_tot_cached, fo_tot_pending, + fo_tot_granted, osfs->os_blocks << blockbits); + + CDEBUG(D_SUPER | D_CACHE, "blocks cached "LPU64" granted "LPU64 + " pending "LPU64" free "LPU64" avail "LPU64"\n", + tot_cached >> blockbits, tot_granted >> blockbits, + tot_pending >> blockbits, osfs->os_bfree, osfs->os_bavail); + + osfs->os_bavail -= min(osfs->os_bavail, + (tot_cached +tot_pending +osfs->os_bsize -1) >> + blockbits); + + RETURN(rc); } static int filter_get_info(struct obd_export *exp, __u32 keylen, @@ -2000,7 +2150,7 @@ int filter_iocontrol(unsigned int cmd, struct obd_export *exp, BDEVNAME_DECLARE_STORAGE(tmp); CERROR("setting device %s read-only\n", ll_bdevname(sb, tmp)); - + handle = fsfilt_start(obd, inode, FSFILT_OP_MKNOD, NULL); LASSERT(handle); (void)fsfilt_commit(obd, inode, handle, 1); @@ -2015,18 +2165,18 @@ int filter_iocontrol(unsigned int cmd, struct obd_export *exp, } case OBD_IOC_LLOG_CANCEL: - case OBD_IOC_LLOG_REMOVE: + case OBD_IOC_LLOG_REMOVE: case OBD_IOC_LLOG_INFO: case OBD_IOC_LLOG_PRINT: { /* FIXME to be finished */ RETURN(-EOPNOTSUPP); /* struct llog_ctxt *ctxt = NULL; - + push_ctxt(&saved, &ctxt->loc_exp->exp_obd->obd_ctxt, NULL); rc = llog_ioctl(ctxt, cmd, data); pop_ctxt(&saved, &ctxt->loc_exp->exp_obd->obd_ctxt, NULL); - + RETURN(rc); */ } @@ -2046,12 +2196,12 @@ static 
struct llog_operations filter_size_orig_logops = { }; static int filter_llog_init(struct obd_device *obd, struct obd_device *tgt, - int count, struct llog_logid *logid) + int count, struct llog_logid *logid) { struct llog_ctxt *ctxt; int rc; ENTRY; - + filter_unlink_repl_logops = llog_client_ops; filter_unlink_repl_logops.lop_cancel = llog_obd_repl_cancel; filter_unlink_repl_logops.lop_connect = llog_repl_connect; @@ -2074,7 +2224,7 @@ static int filter_llog_finish(struct obd_device *obd, int count) { int rc; ENTRY; - + rc = llog_cleanup(llog_get_context(obd, LLOG_UNLINK_REPL_CTXT)); if (rc) RETURN(rc); diff --git a/lustre/obdfilter/filter_internal.h b/lustre/obdfilter/filter_internal.h index ce7b4a3..06d852c 100644 --- a/lustre/obdfilter/filter_internal.h +++ b/lustre/obdfilter/filter_internal.h @@ -48,6 +48,8 @@ #define FILTER_INCOMPAT_GROUPS 0x00000001 #define FILTER_INCOMPAT_SUPP (FILTER_INCOMPAT_GROUPS) +#define FILTER_GRANT_CHUNK (2ULL*1024*1024) + /* Data stored per server at the head of the last_rcvd file. In le32 order. * Try to keep this the same as mds_server_data so we might one day merge. 
*/ struct filter_server_data { @@ -128,15 +130,20 @@ void flip_into_page_cache(struct inode *inode, struct page *new_page); int filter_commitrw_write(struct obd_export *exp, struct obdo *oa, int objcount, struct obd_ioobj *obj, int niocount, struct niobuf_local *res, struct obd_trans_info *oti); +obd_size filter_grant_space_left(struct obd_export *exp); +long filter_grant(struct obd_export *exp, obd_size current_grant, + obd_size want, obd_size fs_space_left); +void filter_grant_commit(struct obd_export *exp, int niocount, + struct niobuf_local *res); /* filter_log.c */ struct ost_filterdata { __u32 ofd_epoch; }; -int filter_log_sz_change(struct llog_handle *cathandle, +int filter_log_sz_change(struct llog_handle *cathandle, struct ll_fid *mds_fid, __u32 io_epoch, - struct llog_cookie *logcookie, + struct llog_cookie *logcookie, struct inode *inode); //int filter_get_catalog(struct obd_device *); void filter_cancel_cookies_cb(struct obd_device *obd, __u64 transno, diff --git a/lustre/obdfilter/filter_io.c b/lustre/obdfilter/filter_io.c index f4581bb..c2867b5 100644 --- a/lustre/obdfilter/filter_io.c +++ b/lustre/obdfilter/filter_io.c @@ -99,12 +99,171 @@ err_page: return lnb->rc; } +/* Grab the dirty and seen grant announcements from the incoming obdo. + * We will later calculate the clients new grant and return it. + * Caller must hold osfs lock */ +static void filter_grant_incoming(struct obd_export *exp, struct obdo *oa) +{ + struct filter_export_data *fed; + struct obd_device *obd = exp->exp_obd; + ENTRY; + + if ((oa->o_valid & (OBD_MD_FLBLOCKS|OBD_MD_FLGRANT)) != + (OBD_MD_FLBLOCKS|OBD_MD_FLGRANT)) { + oa->o_valid &= ~OBD_MD_FLGRANT; + EXIT; + return; + } + + fed = &exp->exp_filter_data; + + /* Add some margin, since there is a small race if other RPCs arrive + * out-or-order and have already consumed some grant. We want to + * leave this here in case there is a large error in accounting. */ + CDEBUG(oa->o_grant > fed->fed_grant + FILTER_GRANT_CHUNK ? 
+ D_ERROR : D_CACHE, + "%s: cli %s reports granted: "LPU64" dropped: %u, local: %lu\n", + obd->obd_name, exp->exp_client_uuid.uuid, oa->o_grant, + oa->o_dropped, fed->fed_grant); + + /* Update our accounting now so that statfs takes it into account. + * Note that fed_dirty is only approximate and can become incorrect + * if RPCs arrive out-of-order. No important calculations depend + * on fed_dirty however. */ + obd->u.filter.fo_tot_dirty += oa->o_dirty - fed->fed_dirty; + if (fed->fed_grant < oa->o_dropped) { + CERROR("%s: cli %s reports %u dropped > fed_grant %lu\n", + obd->obd_name, exp->exp_client_uuid.uuid, + oa->o_dropped, fed->fed_grant); + oa->o_dropped = 0; + } + if (obd->u.filter.fo_tot_granted < oa->o_dropped) { + CERROR("%s: cli %s reports %u dropped > tot_granted "LPU64"\n", + obd->obd_name, exp->exp_client_uuid.uuid, + oa->o_dropped, obd->u.filter.fo_tot_granted); + oa->o_dropped = 0; + } + obd->u.filter.fo_tot_granted -= oa->o_dropped; + fed->fed_grant -= oa->o_dropped; + fed->fed_dirty = oa->o_dirty; + EXIT; +} + +#define GRANT_FOR_LLOG 16 + +/* Figure out how much space is available between what we've granted + * and what remains in the filesystem. Compensate for ext3 indirect + * block overhead when computing how much free space is left ungranted. + * + * Caller must hold obd_osfs_lock. */ +obd_size filter_grant_space_left(struct obd_export *exp) +{ + struct obd_device *obd = exp->exp_obd; + int blockbits = obd->u.filter.fo_sb->s_blocksize_bits; + obd_size tot_granted = obd->u.filter.fo_tot_granted, avail, left = 0; + int rc, statfs_done = 0; + + if (time_before(obd->obd_osfs_age, jiffies - HZ)) { +restat: + rc = fsfilt_statfs(obd, obd->u.filter.fo_sb, jiffies + 1); + if (rc) /* N.B. 
statfs can't really fail */ + RETURN(0); + statfs_done = 1; + } + + avail = obd->obd_osfs.os_bavail; + left = avail - (avail >> (blockbits - 3)); /* (d)indirect */ + if (left > GRANT_FOR_LLOG) { + left = (left - GRANT_FOR_LLOG) << blockbits; + } else { + left = 0 /* << blockbits */; + } + + if (!statfs_done && left < 32 * FILTER_GRANT_CHUNK + tot_granted) { + CDEBUG(D_CACHE, "fs has no space left and statfs too old\n"); + goto restat; + } + + if (left >= tot_granted) { + left -= tot_granted; + } else { + static unsigned long next; + if (left < tot_granted - obd->u.filter.fo_tot_pending && + time_after(jiffies, next)) { + spin_unlock(&obd->obd_osfs_lock); + CERROR("%s: cli %s granted "LPU64" more than available " + LPU64" and pending "LPU64"\n", obd->obd_name, + exp->exp_client_uuid.uuid, tot_granted, left, + obd->u.filter.fo_tot_pending); + if (next == 0) + portals_debug_dumplog(); + next = jiffies + 20 * HZ; + spin_lock(&obd->obd_osfs_lock); + } + left = 0; + } + + CDEBUG(D_CACHE, "%s: cli %s free: "LPU64" avail: "LPU64" grant "LPU64 + " left: "LPU64" pending: "LPU64"\n", obd->obd_name, + exp->exp_client_uuid.uuid, obd->obd_osfs.os_bfree << blockbits, + avail << blockbits, tot_granted, left, + obd->u.filter.fo_tot_pending); + + return left; +} + +/* Calculate how much grant space to allocate to this client, based on how + * much space is currently free and how much of that is already granted. + * + * Caller must hold obd_osfs_lock. */ +long filter_grant(struct obd_export *exp, obd_size current_grant, + obd_size want, obd_size fs_space_left) +{ + struct obd_device *obd = exp->exp_obd; + struct filter_export_data *fed = &exp->exp_filter_data; + int blockbits = obd->u.filter.fo_sb->s_blocksize_bits; + __u64 grant = 0; + + /* Grant some fraction of the client's requested grant space so that + * they are not always waiting for write credits (not all of it to + * avoid overgranting in face of multiple RPCs in flight). 
This + * essentially will be able to control the OSC_MAX_RIF for a client. + * + * If we do have a large disparity and multiple RPCs in flight we + * might grant "too much" but that's OK because it means we are + * dirtying a lot on the client and will likely use it up quickly. */ + if (current_grant < want) { + grant = min((want >> blockbits) / 2, + (fs_space_left >> blockbits) / 8); + grant <<= blockbits; + + if (grant) { + if (grant > FILTER_GRANT_CHUNK) + grant = FILTER_GRANT_CHUNK; + + obd->u.filter.fo_tot_granted += grant; + fed->fed_grant += grant; + } + } + + CDEBUG(D_CACHE,"%s: cli %s wants: "LPU64" granting: "LPU64"\n", + obd->obd_name, exp->exp_client_uuid.uuid, want, grant); + CDEBUG(D_CACHE, + "%s: cli %s tot cached:"LPU64" granted:"LPU64 + " num_exports: %d\n", obd->obd_name, exp->exp_client_uuid.uuid, + obd->u.filter.fo_tot_dirty, + obd->u.filter.fo_tot_granted, obd->obd_num_exports); + + return grant; +} + static int filter_preprw_read(int cmd, struct obd_export *exp, struct obdo *oa, int objcount, struct obd_ioobj *obj, int niocount, struct niobuf_remote *nb, struct niobuf_local *res, struct obd_trans_info *oti) { + struct obd_device *obd = exp->exp_obd; struct obd_run_ctxt saved; struct obd_ioobj *o; struct niobuf_remote *rnb; @@ -119,6 +278,21 @@ static int filter_preprw_read(int cmd, struct obd_export *exp, struct obdo *oa, /* We are currently not supporting multi-obj BRW_READ RPCS at all. 
* When we do this function's dentry cleanup will need to be fixed */ LASSERT(objcount == 1); + LASSERT(obj->ioo_bufcnt > 0); + + if (oa && oa->o_valid & OBD_MD_FLGRANT) { + spin_lock(&obd->obd_osfs_lock); + filter_grant_incoming(exp, oa); + +#if 0 + /* Reads do not increase grants */ + oa->o_grant = filter_grant(exp, oa->o_grant, oa->o_undirty, + filter_grant_space_left(exp)); +#else + oa->o_grant = 0; +#endif + spin_unlock(&obd->obd_osfs_lock); + } OBD_ALLOC(fso, objcount * sizeof(*fso)); if (fso == NULL) @@ -130,7 +304,7 @@ static int filter_preprw_read(int cmd, struct obd_export *exp, struct obdo *oa, for (i = 0, o = obj; i < objcount; i++, o++) { LASSERT(o->ioo_bufcnt); - dentry = filter_oa2dentry(exp->exp_obd, oa); + dentry = filter_oa2dentry(obd, oa); if (IS_ERR(dentry)) GOTO(cleanup, rc = PTR_ERR(dentry)); @@ -160,7 +334,6 @@ static int filter_preprw_read(int cmd, struct obd_export *exp, struct obdo *oa, lnb->offset = rnb->offset; lnb->len = rnb->len; lnb->flags = rnb->flags; - lnb->start = jiffies; if (inode->i_size <= rnb->offset) { /* If there's no more data, abort early. @@ -195,8 +368,7 @@ static int filter_preprw_read(int cmd, struct obd_export *exp, struct obdo *oa, CDEBUG(D_INFO, "start_page_read: %lu jiffies\n", (jiffies - now)); - lprocfs_counter_add(exp->exp_obd->obd_stats, LPROC_FILTER_READ_BYTES, - tot_bytes); + lprocfs_counter_add(obd->obd_stats, LPROC_FILTER_READ_BYTES, tot_bytes); while (lnb-- > res) { rc = filter_finish_page_read(lnb); if (rc) { @@ -235,6 +407,111 @@ static int filter_preprw_read(int cmd, struct obd_export *exp, struct obdo *oa, return rc; } +/* When clients have dirtied as much space as they've been granted they + * fall through to sync writes. These sync writes haven't been expressed + * in grants and need to error with ENOSPC when there isn't room in the + * filesystem for them after grants are taken into account. However, + * writeback of the dirty data that was already granted space can write + * right on through. 
+ * + * Caller must hold obd_osfs_lock. */ +static int filter_grant_check(struct obd_export *exp, int objcount, + struct fsfilt_objinfo *fso, int niocount, + struct niobuf_remote *rnb, + struct niobuf_local *lnb, obd_size *left, + struct inode *inode) +{ + struct filter_export_data *fed = &exp->exp_filter_data; + int blocksize = exp->exp_obd->u.filter.fo_sb->s_blocksize; + unsigned long used = 0, ungranted = 0, using; + int i, rc = -ENOSPC, obj, n = 0, mask = D_CACHE; + + for (obj = 0; obj < objcount; obj++) { + for (i = 0; i < fso[obj].fso_bufcnt; i++, n++) { + int tmp, bytes; + + /* FIXME: this is calculated with PAGE_SIZE on client */ + bytes = rnb[n].len; + bytes += rnb[n].offset & (blocksize - 1); + tmp = (rnb[n].offset + rnb[n].len) & (blocksize - 1); + if (tmp) + bytes += blocksize - tmp; + + if (rnb[n].flags & OBD_BRW_FROM_GRANT) { + if (fed->fed_grant < used + bytes) { + CDEBUG(D_CACHE, + "%s: cli %s claims %ld+%d GRANT," + " no such grant %lu, idx %d\n", + exp->exp_obd->obd_name, + exp->exp_client_uuid.uuid, + used, bytes, fed->fed_grant, n); + mask = D_ERROR; + } else { + used += bytes; + rnb[n].flags |= OBD_BRW_GRANTED; + lnb[n].lnb_grant_used = bytes; + CDEBUG(0, "idx %d used=%lu\n", n, used); + rc = 0; + continue; + } + } + if (*left > ungranted + bytes) { + /* if enough space, pretend it was granted */ + ungranted += bytes; + rnb[n].flags |= OBD_BRW_GRANTED; + CDEBUG(0, "idx %d ungranted=%lu\n",n,ungranted); + rc = 0; + continue; + } + + /* We can't check for already-mapped blocks here, as + * it requires dropping the osfs lock to do the bmap. + * Instead, we return ENOSPC and in that case we need + * to go through and verify if all of the blocks not + * marked BRW_GRANTED are already mapped and we can + * ignore this error. 
*/ + lnb[n].rc = -ENOSPC; + rnb[n].flags &= ~OBD_BRW_GRANTED; + CDEBUG(D_CACHE, "%s: cli %s idx %d no space for %d\n", + exp->exp_obd->obd_name, + exp->exp_client_uuid.uuid, n, bytes); + } + } + + /* Now subtract what the client has used already. We don't subtract + * this from the tot_granted yet, so that other clients can't grab + * that space before we have actually allocated our blocks. That + * happens in filter_grant_commit() after the writes are done. */ + *left -= ungranted; + fed->fed_grant -= used; + fed->fed_pending += used; + exp->exp_obd->u.filter.fo_tot_pending += used; + + CDEBUG(mask, + "%s: cli %s used: %lu ungranted: %lu grant: %lu dirty: %lu\n", + exp->exp_obd->obd_name, exp->exp_client_uuid.uuid, used, + ungranted, fed->fed_grant, fed->fed_dirty); + + /* Rough calc in case we don't refresh cached statfs data */ + using = (used + ungranted + 1 ) >> + exp->exp_obd->u.filter.fo_sb->s_blocksize_bits; + if (exp->exp_obd->obd_osfs.os_bavail > using) + exp->exp_obd->obd_osfs.os_bavail -= using; + else + exp->exp_obd->obd_osfs.os_bavail = 0; + + if (fed->fed_dirty < used) { + CERROR("%s: cli %s claims used %lu > fed_dirty %lu\n", + exp->exp_obd->obd_name, exp->exp_client_uuid.uuid, + used, fed->fed_dirty); + used = fed->fed_dirty; + } + exp->exp_obd->u.filter.fo_tot_dirty -= used; + fed->fed_dirty -= used; + + return rc; +} + static int filter_start_page_write(struct inode *inode, struct niobuf_local *lnb) { @@ -272,11 +549,12 @@ static int filter_preprw_write(int cmd, struct obd_export *exp, struct obdo *oa, { struct obd_run_ctxt saved; struct niobuf_remote *rnb; - struct niobuf_local *lnb = NULL; + struct niobuf_local *lnb; struct fsfilt_objinfo fso; struct dentry *dentry; - int rc = 0, i, tot_bytes = 0; + obd_size left; unsigned long now = jiffies; + int rc = 0, i, tot_bytes = 0, cleanup_phase = 1; ENTRY; LASSERT(objcount == 1); LASSERT(obj->ioo_bufcnt > 0); @@ -305,25 +583,47 @@ static int filter_preprw_write(int cmd, struct obd_export *exp, struct 
obdo *oa, CDEBUG(D_INFO, "preprw_write setup: %lu jiffies\n", (jiffies - now)); + spin_lock(&exp->exp_obd->obd_osfs_lock); + if (oa) + filter_grant_incoming(exp, oa); + cleanup_phase = 0; + + left = filter_grant_space_left(exp); + + rc = filter_grant_check(exp, objcount, &fso, niocount, nb, res, + &left, dentry->d_inode); + if (oa && oa->o_valid & OBD_MD_FLGRANT) + oa->o_grant = filter_grant(exp,oa->o_grant,oa->o_undirty,left); + + spin_unlock(&exp->exp_obd->obd_osfs_lock); + + if (rc) { + f_dput(dentry); + GOTO(cleanup, rc); + } + for (i = 0, rnb = nb, lnb = res; i < obj->ioo_bufcnt; i++, lnb++, rnb++) { + /* We still set up for ungranted pages so that granted pages + * can be written to disk as they were promised, and portals + * needs to keep the pages all aligned properly. */ lnb->dentry = dentry; lnb->offset = rnb->offset; lnb->len = rnb->len; lnb->flags = rnb->flags; - lnb->start = jiffies; rc = filter_start_page_write(dentry->d_inode, lnb); if (rc) { - CDEBUG(rc == -ENOSPC ? D_INODE : D_ERROR, "page err %u@" - LPU64" %u/%u %p: rc %d\n", lnb->len, lnb->offset, + CDEBUG(D_ERROR, "page err %u@"LPU64" %u/%u %p: rc %d\n", + lnb->len, lnb->offset, i, obj->ioo_bufcnt, dentry, rc); while (lnb-- > res) __free_pages(lnb->page, 0); f_dput(dentry); GOTO(cleanup, rc); } - tot_bytes += lnb->len; + if (lnb->rc == 0) + tot_bytes += lnb->len; } if (time_after(jiffies, now + 15 * HZ)) @@ -336,6 +636,14 @@ static int filter_preprw_write(int cmd, struct obd_export *exp, struct obdo *oa, tot_bytes); EXIT; cleanup: + switch(cleanup_phase) { + case 1: + spin_lock(&exp->exp_obd->obd_osfs_lock); + if (oa) + filter_grant_incoming(exp, oa); + spin_unlock(&exp->exp_obd->obd_osfs_lock); + default: ; + } pop_ctxt(&saved, &exp->exp_obd->obd_ctxt, NULL); return rc; } @@ -432,7 +740,37 @@ void flip_into_page_cache(struct inode *inode, struct page *new_page) } while (rc != 0); } -/* XXX needs to trickle its oa down */ +void filter_grant_commit(struct obd_export *exp, int niocount, + struct 
niobuf_local *res) +{ + struct filter_obd *filter = &exp->exp_obd->u.filter; + struct niobuf_local *lnb = res; + unsigned long pending = 0; + int i; + + spin_lock(&exp->exp_obd->obd_osfs_lock); + for (i = 0, lnb = res; i < niocount; i++, lnb++) + pending += lnb->lnb_grant_used; + + LASSERTF(exp->exp_filter_data.fed_pending >= pending, + "%s: cli %s/%p fed_pending: %lu grant_used: %lu\n", + exp->exp_obd->obd_name, exp->exp_client_uuid.uuid, exp, + exp->exp_filter_data.fed_pending, pending); + exp->exp_filter_data.fed_pending -= pending; + LASSERTF(filter->fo_tot_granted >= pending, + "%s: cli %s/%p tot_granted: "LPU64" grant_used: %lu\n", + exp->exp_obd->obd_name, exp->exp_client_uuid.uuid, exp, + exp->exp_obd->u.filter.fo_tot_granted, pending); + filter->fo_tot_granted -= pending; + LASSERTF(filter->fo_tot_pending >= pending, + "%s: cli %s/%p tot_pending: "LPU64" grant_used: %lu\n", + exp->exp_obd->obd_name, exp->exp_client_uuid.uuid, exp, + filter->fo_tot_pending, pending); + filter->fo_tot_pending -= pending; + + spin_unlock(&exp->exp_obd->obd_osfs_lock); +} + int filter_commitrw(int cmd, struct obd_export *exp, struct obdo *oa, int objcount, struct obd_ioobj *obj, int niocount, struct niobuf_local *res, struct obd_trans_info *oti) diff --git a/lustre/obdfilter/filter_io_24.c b/lustre/obdfilter/filter_io_24.c index 32adb9f..eced509 100644 --- a/lustre/obdfilter/filter_io_24.c +++ b/lustre/obdfilter/filter_io_24.c @@ -202,6 +202,27 @@ cleanup: return rc; } +/* See if there are unallocated parts in given file region */ +static int filter_range_is_mapped(struct inode *inode, obd_size offset, int len) +{ + int (*fs_bmap)(struct address_space *, long) = + inode->i_mapping->a_ops->bmap; + int j; + + /* We can't know if the range is mapped already or not */ + if (fs_bmap == NULL) + return 0; + + offset >>= inode->i_blkbits; + len >>= inode->i_blkbits; + + for (j = 0; j <= len; j++) + if (fs_bmap(inode->i_mapping, offset + j) == 0) + return 0; + + return 1; +} + int 
filter_commitrw_write(struct obd_export *exp, struct obdo *oa, int objcount, struct obd_ioobj *obj, int niocount, struct niobuf_local *res, struct obd_trans_info *oti) @@ -213,7 +234,7 @@ int filter_commitrw_write(struct obd_export *exp, struct obdo *oa, int objcount, struct iattr iattr = { 0 }; struct kiobuf *iobuf; struct inode *inode = NULL; - int rc = 0, i, cleanup_phase = 0, err; + int rc = 0, i, n, cleanup_phase = 0, err; unsigned long now = jiffies; /* DEBUGGING OST TIMEOUTS */ void *wait_handle; ENTRY; @@ -234,18 +255,29 @@ int filter_commitrw_write(struct obd_export *exp, struct obdo *oa, int objcount, GOTO(cleanup, rc); iobuf->offset = 0; - iobuf->length = PAGE_SIZE * obj->ioo_bufcnt; - iobuf->nr_pages = obj->ioo_bufcnt; + iobuf->length = 0; + iobuf->nr_pages = 0; cleanup_phase = 1; fso.fso_dentry = res->dentry; fso.fso_bufcnt = obj->ioo_bufcnt; inode = res->dentry->d_inode; - iattr_from_obdo(&iattr,oa,OBD_MD_FLATIME|OBD_MD_FLMTIME|OBD_MD_FLCTIME); - for (i = 0, lnb = res; i < obj->ioo_bufcnt; i++, lnb++) { + for (i = 0, lnb = res, n = 0; i < obj->ioo_bufcnt; i++, lnb++) { loff_t this_size; - iobuf->maplist[i] = lnb->page; + + /* If overwriting an existing block, we don't need a grant */ + if (!(lnb->flags & OBD_BRW_GRANTED) && lnb->rc == -ENOSPC && + filter_range_is_mapped(inode, lnb->offset, lnb->len)) + lnb->rc = 0; + + if (lnb->rc) /* ENOSPC, network RPC error */ + continue; + + iobuf->maplist[n++] = lnb->page; + iobuf->length += PAGE_SIZE; + iobuf->nr_pages++; + /* We expect these pages to be in offset order, but we'll * be forgiving */ this_size = lnb->offset + lnb->len; @@ -270,6 +302,7 @@ int filter_commitrw_write(struct obd_export *exp, struct obdo *oa, int objcount, if (time_after(jiffies, now + 15 * HZ)) CERROR("slow brw_start %lus\n", (jiffies - now) / HZ); + iattr_from_obdo(&iattr,oa,OBD_MD_FLATIME|OBD_MD_FLMTIME|OBD_MD_FLCTIME); rc = filter_direct_io(OBD_BRW_WRITE, res->dentry, iobuf, exp, &iattr, oti, &wait_handle); if (rc == 0) @@ -278,6 
+311,7 @@ int filter_commitrw_write(struct obd_export *exp, struct obdo *oa, int objcount, if (time_after(jiffies, now + 15 * HZ)) CERROR("slow direct_io %lus\n", (jiffies - now) / HZ); + filter_grant_commit(exp, niocount, res); err = fsfilt_commit_wait(obd, inode, wait_handle); if (err) rc = err; diff --git a/lustre/obdfilter/filter_io_26.c b/lustre/obdfilter/filter_io_26.c index b312f8b1..fb43702 100644 --- a/lustre/obdfilter/filter_io_26.c +++ b/lustre/obdfilter/filter_io_26.c @@ -37,9 +37,6 @@ #warning "implement writeback mode -bzzz" -int ext3_map_inode_page(struct inode *inode, struct page *page, - unsigned long *blocks, int *created, int create); - /* 512byte block min */ #define MAX_BLOCKS_PER_PAGE (PAGE_SIZE / 512) struct dio_request { @@ -77,6 +74,27 @@ static int can_be_merged(struct bio *bio, sector_t sector) return bio->bi_sector + size == sector ? 1 : 0; } +/* See if there are unallocated parts in given file region */ +static int filter_range_is_mapped(struct inode *inode, obd_size offset, int len) +{ + sector_t (*fs_bmap)(struct address_space *, sector_t) = + inode->i_mapping->a_ops->bmap; + int j; + + /* We can't know if we are overwriting or not */ + if (fs_bmap == NULL) + return 0; + + offset >>= inode->i_blkbits; + len >>= inode->i_blkbits; + + for (j = 0; j <= len; j++) + if (fs_bmap(inode->i_mapping, offset + j) == 0) + return 0; + + return 1; +} + int filter_commitrw_write(struct obd_export *exp, struct obdo *oa, int objcount, struct obd_ioobj *obj, int niocount, struct niobuf_local *res, struct obd_trans_info *oti) @@ -128,14 +146,23 @@ int filter_commitrw_write(struct obd_export *exp, struct obdo *oa, int objcount, if (time_after(jiffies, now + 15 * HZ)) CERROR("slow brw_start %lus\n", (jiffies - now) / HZ); + iattr_from_obdo(&iattr,oa,OBD_MD_FLATIME|OBD_MD_FLMTIME|OBD_MD_FLCTIME); for (i = 0, lnb = res; i < obj->ioo_bufcnt; i++, lnb++) { loff_t this_size; sector_t sector; int offs; + /* If overwriting an existing block, we don't need a 
grant */ + if (!(lnb->flags & OBD_BRW_GRANTED) && lnb->rc == -ENOSPC && + filter_range_is_mapped(inode, lnb->offset, lnb->len)) + lnb->rc = 0; + + if (lnb->rc) /* ENOSPC, network RPC error */ + continue; + /* get block number for next page */ - rc = ext3_map_inode_page(inode, lnb->page, dreq->blocks, - dreq->created, 1); + rc = fsfilt_map_inode_page(obd, inode, lnb->page, dreq->blocks, + dreq->created, 1); if (rc) GOTO(cleanup, rc); @@ -175,6 +202,8 @@ int filter_commitrw_write(struct obd_export *exp, struct obdo *oa, int objcount, submit_bio(WRITE, bio); } + filter_grant_commit(exp, niocount, res); + /* time to wait for I/O completion */ wait_event(dreq->wait, atomic_read(&dreq->numreqs) == 0); @@ -187,7 +216,6 @@ int filter_commitrw_write(struct obd_export *exp, struct obdo *oa, int objcount, if (rc == 0) { down(&inode->i_sem); - inode_update_time(inode, 1); if (iattr.ia_size > inode->i_size) { CDEBUG(D_INFO, "setting i_size to "LPU64"\n", iattr.ia_size); diff --git a/lustre/obdfilter/lproc_obdfilter.c b/lustre/obdfilter/lproc_obdfilter.c index 51458c0..6fae59d 100644 --- a/lustre/obdfilter/lproc_obdfilter.c +++ b/lustre/obdfilter/lproc_obdfilter.c @@ -89,6 +89,7 @@ static struct lprocfs_vars lprocfs_obd_vars[] = { { "blocksize", lprocfs_rd_blksize, 0, 0 }, { "kbytestotal", lprocfs_rd_kbytestotal, 0, 0 }, { "kbytesfree", lprocfs_rd_kbytesfree, 0, 0 }, + { "kbytesavail", lprocfs_rd_kbytesavail, 0, 0 }, { "filestotal", lprocfs_rd_filestotal, 0, 0 }, { "filesfree", lprocfs_rd_filesfree, 0, 0 }, //{ "filegroups", lprocfs_rd_filegroups, 0, 0 }, diff --git a/lustre/osc/lproc_osc.c b/lustre/osc/lproc_osc.c index ee22f1c..9216ec0 100644 --- a/lustre/osc/lproc_osc.c +++ b/lustre/osc/lproc_osc.c @@ -134,6 +134,7 @@ int osc_wr_max_dirty_mb(struct file *file, const char *buffer, spin_lock(&cli->cl_loi_list_lock); cli->cl_dirty_max = (obd_count)val * 1024 * 1024; + osc_wake_cache_waiters(cli); spin_unlock(&cli->cl_loi_list_lock); return count; @@ -147,7 +148,20 @@ int 
osc_rd_cur_dirty_bytes(char *page, char **start, off_t off, int count, int rc; spin_lock(&cli->cl_loi_list_lock); - rc = snprintf(page, count, LPU64"\n", cli->cl_dirty); + rc = snprintf(page, count, "%lu\n", cli->cl_dirty); + spin_unlock(&cli->cl_loi_list_lock); + return rc; +} + +int osc_rd_cur_grant_bytes(char *page, char **start, off_t off, int count, + int *eof, void *data) +{ + struct obd_device *dev = data; + struct client_obd *cli = &dev->u.cli; + int rc; + + spin_lock(&cli->cl_loi_list_lock); + rc = snprintf(page, count, "%lu\n", cli->cl_avail_grant); spin_unlock(&cli->cl_loi_list_lock); return rc; } @@ -281,6 +295,7 @@ static struct lprocfs_vars lprocfs_obd_vars[] = { { "blocksize", lprocfs_rd_blksize, 0, 0 }, { "kbytestotal", lprocfs_rd_kbytestotal, 0, 0 }, { "kbytesfree", lprocfs_rd_kbytesfree, 0, 0 }, + { "kbytesavail", lprocfs_rd_kbytesavail, 0, 0 }, { "filestotal", lprocfs_rd_filestotal, 0, 0 }, { "filesfree", lprocfs_rd_filesfree, 0, 0 }, //{ "filegroups", lprocfs_rd_filegroups, 0, 0 }, @@ -292,6 +307,7 @@ static struct lprocfs_vars lprocfs_obd_vars[] = { osc_wr_max_rpcs_in_flight, 0 }, { "max_dirty_mb", osc_rd_max_dirty_mb, osc_wr_max_dirty_mb, 0 }, { "cur_dirty_bytes", osc_rd_cur_dirty_bytes, 0, 0 }, + { "cur_grant_bytes", osc_rd_cur_grant_bytes, 0, 0 }, {"create_low_watermark", osc_rd_create_low_wm, osc_wr_create_low_wm, 0}, { "create_count", osc_rd_create_count, osc_wr_create_count, 0 }, { "prealloc_next_id", osc_rd_prealloc_next_id, 0, 0 }, diff --git a/lustre/osc/osc_create.c b/lustre/osc/osc_create.c index 149ff44..845b306 100644 --- a/lustre/osc/osc_create.c +++ b/lustre/osc/osc_create.c @@ -216,6 +216,11 @@ int osc_create(struct obd_export *exp, struct obdo *oa, if ((oa->o_valid & OBD_MD_FLGROUP) && (oa->o_gr != 0)) RETURN(osc_real_create(exp, oa, ea, oti)); + if ((oa->o_valid & OBD_MD_FLFLAGS) && + oa->o_flags == OBD_FL_RECREATE_OBJS) { + RETURN(osc_real_create(exp, oa, ea, oti)); + } + lsm = *ea; if (lsm == NULL) { rc = 
obd_alloc_memmd(exp, &lsm); diff --git a/lustre/osc/osc_internal.h b/lustre/osc/osc_internal.h index d78c8bf..b5f6392 100644 --- a/lustre/osc/osc_internal.h +++ b/lustre/osc/osc_internal.h @@ -44,6 +44,13 @@ struct osc_async_page { void *oap_caller_data; }; +struct osc_cache_waiter { + struct list_head ocw_entry; + wait_queue_head_t ocw_waitq; + struct osc_async_page *ocw_oap; + int ocw_rc; +}; + #define OSCC_FLAG_RECOVERING 1 #define OSCC_FLAG_CREATING 2 #define OSCC_FLAG_NOSPC 4 /* can't create more objects on this OST */ @@ -53,6 +60,7 @@ int osc_create(struct obd_export *exp, struct obdo *oa, int osc_real_create(struct obd_export *exp, struct obdo *oa, struct lov_stripe_md **ea, struct obd_trans_info *oti); void oscc_init(struct obd_export *exp); +void osc_wake_cache_waiters(struct client_obd *cli); #ifdef __KERNEL__ int lproc_osc_attach_seqstat(struct obd_device *dev); diff --git a/lustre/osc/osc_request.c b/lustre/osc/osc_request.c index b817a14..e8dd043 100644 --- a/lustre/osc/osc_request.c +++ b/lustre/osc/osc_request.c @@ -532,38 +532,80 @@ static int osc_destroy(struct obd_export *exp, struct obdo *oa, return rc; } -static void osc_announce_cached(struct client_obd *cli, struct ost_body *body) +static void osc_announce_cached(struct client_obd *cli, struct obdo *oa, + long writing_bytes) { - obd_flag bits = OBD_MD_FLBLOCKS|OBD_MD_FLRDEV; + obd_flag bits = OBD_MD_FLBLOCKS|OBD_MD_FLGRANT; - LASSERT(!(body->oa.o_valid & bits)); + LASSERT(!(oa->o_valid & bits)); - body->oa.o_valid |= bits; - down(&cli->cl_dirty_sem); - body->oa.o_blocks = cli->cl_dirty; - body->oa.o_rdev = cli->cl_dirty_granted; - up(&cli->cl_dirty_sem); - CDEBUG(D_INODE, "announcing "LPU64" dirty "LPU64" granted\n", - cli->cl_dirty, cli->cl_dirty_granted); + oa->o_valid |= bits; + spin_lock(&cli->cl_loi_list_lock); + oa->o_dirty = cli->cl_dirty; + oa->o_undirty = cli->cl_dirty_max - oa->o_dirty; + oa->o_grant = cli->cl_avail_grant; + oa->o_dropped = cli->cl_lost_grant; + cli->cl_lost_grant = 
0; + spin_unlock(&cli->cl_loi_list_lock); + CDEBUG(D_CACHE,"dirty: "LPU64" undirty: %u dropped %u grant: "LPU64"\n", + oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant); } -static void osc_update_grant(struct client_obd *cli, struct ost_body *body) +/* caller must hold loi_list_lock */ +static void osc_consume_write_grant(struct client_obd *cli, + struct osc_async_page *oap) +{ + cli->cl_dirty += PAGE_SIZE; + cli->cl_avail_grant -= PAGE_SIZE; + oap->oap_brw_flags |= OBD_BRW_FROM_GRANT; + CDEBUG(D_CACHE, "using %lu grant credits for oap %p\n", PAGE_SIZE, oap); + LASSERT(cli->cl_avail_grant >= 0); +} + +/* caller must hold loi_list_lock */ +void osc_wake_cache_waiters(struct client_obd *cli) { - if(!(body->oa.o_valid & OBD_MD_FLRDEV)) { - if (cli->cl_ost_can_grant) { - CDEBUG(D_INODE, "%s can't grant\n", - cli->cl_import->imp_target_uuid.uuid); + struct list_head *l, *tmp; + struct osc_cache_waiter *ocw; + + list_for_each_safe(l, tmp, &cli->cl_cache_waiters) { + /* if we can't dirty more, we must wait until some is written */ + if (cli->cl_dirty + PAGE_SIZE > cli->cl_dirty_max) { + CDEBUG(D_CACHE, "no dirty room: dirty: %ld max %ld\n", + cli->cl_dirty, cli->cl_dirty_max); + return; } - cli->cl_ost_can_grant = 0; - return; + + /* if still dirty cache but no grant wait for pending RPCs that + * may yet return us some grant before doing sync writes */ + if (cli->cl_brw_in_flight && cli->cl_avail_grant < PAGE_SIZE) { + CDEBUG(D_CACHE, "%d BRWs in flight, no grant\n", + cli->cl_brw_in_flight); + return; + } + + ocw = list_entry(l, struct osc_cache_waiter, ocw_entry); + list_del_init(&ocw->ocw_entry); + if (cli->cl_avail_grant < PAGE_SIZE) { + /* no more RPCs in flight to return grant, do sync IO */ + ocw->ocw_rc = -EDQUOT; + CDEBUG(D_INODE, "wake oap %p for sync\n", ocw->ocw_oap); + } else { + osc_consume_write_grant(cli, ocw->ocw_oap); + } + wake_up(&ocw->ocw_waitq); } - CDEBUG(D_ERROR, "got "LPU64" grant\n", body->oa.o_rdev); - down(&cli->cl_dirty_sem); - 
cli->cl_dirty_granted = body->oa.o_rdev; - /* XXX check for over-run and wake up the io thread that - * doesn't exist yet */ - up(&cli->cl_dirty_sem); + EXIT; +} + +static void osc_update_grant(struct client_obd *cli, struct ost_body *body) +{ + spin_lock(&cli->cl_loi_list_lock); + CDEBUG(D_CACHE, "got "LPU64" extra grant\n", body->oa.o_grant); + cli->cl_avail_grant += body->oa.o_grant; + /* waiters are woken in brw_interpret_oap */ + spin_unlock(&cli->cl_loi_list_lock); } /* We assume that the reason this OSC got a short read is because it read @@ -637,7 +679,7 @@ static int check_write_rcs(struct ptlrpc_request *request, int niocount, static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2) { if (p1->flag != p2->flag) { - unsigned mask = ~(OBD_BRW_CREATE|OBD_BRW_FROM_GRANT); + unsigned mask = ~OBD_BRW_FROM_GRANT; /* warn if we try to combine flags that we don't know to be * safe to combine */ @@ -696,7 +738,7 @@ static int osc_brw_prep_request(int cmd, struct obd_import *imp,struct obdo *oa, opc = ((cmd & OBD_BRW_WRITE) != 0) ? OST_WRITE : OST_READ; for (niocount = i = 1; i < page_count; i++) - if (!can_merge_pages (&pga[i - 1], &pga[i])) + if (!can_merge_pages(&pga[i - 1], &pga[i])) niocount++; size[0] = sizeof(*body); @@ -760,7 +802,7 @@ static int osc_brw_prep_request(int cmd, struct obd_import *imp,struct obdo *oa, LASSERT((void *)(niobuf - niocount) == lustre_msg_buf(req->rq_reqmsg, 2, niocount * sizeof(*niobuf))); - osc_announce_cached(cli, body); + osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? 
requested_nob:0); spin_lock_irqsave(&req->rq_lock, flags); req->rq_no_resend = 1; spin_unlock_irqrestore(&req->rq_lock, flags); @@ -769,7 +811,7 @@ static int osc_brw_prep_request(int cmd, struct obd_import *imp,struct obdo *oa, if (opc == OST_WRITE) { #if CHECKSUM_BULK body->oa.o_valid |= OBD_MD_FLCKSUM; - body->oa.o_nlink = cksum_pages(requested_nob, page_count, pga); + body->oa.o_cksum = cksum_pages(requested_nob, page_count, pga); #endif /* 1 RC per niobuf */ size[1] = sizeof(__u32) * niocount; @@ -796,14 +838,15 @@ static int osc_brw_fini_request(struct ptlrpc_request *req, struct obdo *oa, { struct client_obd *cli = &req->rq_import->imp_obd->u.cli; struct ost_body *body; + ENTRY; if (rc < 0) - return (rc); + RETURN(rc); body = lustre_swab_repbuf(req, 0, sizeof(*body), lustre_swab_ost_body); if (body == NULL) { CERROR ("Can't unpack body\n"); - return (-EPROTO); + RETURN(-EPROTO); } osc_update_grant(cli, body); @@ -811,15 +854,15 @@ static int osc_brw_fini_request(struct ptlrpc_request *req, struct obdo *oa, if (req->rq_reqmsg->opc == OST_WRITE) { if (rc > 0) { CERROR ("Unexpected +ve rc %d\n", rc); - return (-EPROTO); + RETURN(-EPROTO); } - return(check_write_rcs(req, niocount, page_count, pga)); + RETURN(check_write_rcs(req, niocount, page_count, pga)); } if (rc > requested_nob) { CERROR("Unexpected rc %d (%d requested)\n", rc, requested_nob); - return (-EPROTO); + RETURN(-EPROTO); } if (rc < requested_nob) @@ -832,7 +875,7 @@ static int osc_brw_fini_request(struct ptlrpc_request *req, struct obdo *oa, const struct ptlrpc_peer *peer = &req->rq_import->imp_connection->c_peer; static int cksum_counter; - obd_count server_cksum = oa->o_nlink; + obd_count server_cksum = oa->o_cksum; obd_count cksum = cksum_pages(rc, page_count, pga); char str[PTL_NALFMT_SIZE]; @@ -844,7 +887,7 @@ static int osc_brw_fini_request(struct ptlrpc_request *req, struct obdo *oa, LPX64" (%s)\n", server_cksum, cksum, peer->peer_nid, str); cksum_counter = 0; - oa->o_nlink = cksum; + 
oa->o_cksum = cksum; } else if ((cksum_counter & (-cksum_counter)) == cksum_counter){ CWARN("Checksum %u from "LPX64" (%s) OK: %x\n", cksum_counter, peer->peer_nid, str, cksum); @@ -859,7 +902,7 @@ static int osc_brw_fini_request(struct ptlrpc_request *req, struct obdo *oa, req->rq_import->imp_connection->c_peer.peer_nid); } #endif - return (0); + RETURN(0); } static int osc_brw_internal(int cmd, struct obd_export *exp,struct obdo *oa, @@ -1088,7 +1131,8 @@ static int osc_brw_async(int cmd, struct obd_export *exp, struct obdo *oa, } static void osc_check_rpcs(struct client_obd *cli); -static void osc_exit_cache(struct client_obd *cli, struct osc_async_page *oap); +static void osc_exit_cache(struct client_obd *cli, struct osc_async_page *oap, + int sent); static void loi_list_maint(struct client_obd *cli, struct lov_oinfo *loi); static void lop_update_pending(struct client_obd *cli, struct loi_oap_pages *lop, int cmd, int delta); @@ -1127,27 +1171,25 @@ static void osc_occ_interrupted(struct osic_callback_context *occ) list_del_init(&oap->oap_urgent_item); loi = oap->oap_loi; - lop = (oap->oap_cmd == OBD_BRW_WRITE) ? + lop = (oap->oap_cmd == OBD_BRW_WRITE) ? 
&loi->loi_write_lop : &loi->loi_read_lop; lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, -1); loi_list_maint(oap->oap_cli, oap->oap_loi); osic_complete_one(oap->oap_osic, &oap->oap_occ, 0); oap->oap_osic = NULL; - } unlock: spin_unlock(&oap->oap_cli->cl_loi_list_lock); } -/* this must be called holding the list lock to give coverage to exit_cache, +/* this must be called holding the loi list lock to give coverage to exit_cache, * async_flag maintenance, and oap_request */ static void osc_complete_oap(struct client_obd *cli, - struct osc_async_page *oap, int rc) + struct osc_async_page *oap, int sent, int rc) { - ENTRY; - osc_exit_cache(cli, oap); + osc_exit_cache(cli, oap, sent); oap->oap_async_flags = 0; oap->oap_interrupted = 0; @@ -1165,7 +1207,6 @@ static void osc_complete_oap(struct client_obd *cli, oap->oap_caller_ops->ap_completion(oap->oap_caller_data, oap->oap_cmd, rc); - EXIT; } static int brw_interpret_oap(struct ptlrpc_request *request, @@ -1190,6 +1231,11 @@ static int brw_interpret_oap(struct ptlrpc_request *request, spin_lock(&cli->cl_loi_list_lock); + /* We need to decrement before osc_complete_oap->osc_wake_cache_waiters + * is called so we know whether to go to sync BRWs or wait for more + * RPCs to complete */ + cli->cl_brw_in_flight--; + /* the caller may re-use the oap after the completion call so * we need to clean it up a little */ list_for_each_safe(pos, n, &aa->aa_oaps) { @@ -1199,10 +1245,10 @@ static int brw_interpret_oap(struct ptlrpc_request *request, //oap->oap_page, oap->oap_page->index, oap); list_del_init(&oap->oap_rpc_item); - osc_complete_oap(cli, oap, rc); + osc_complete_oap(cli, oap, 1, rc); } - cli->cl_brw_in_flight--; + osc_wake_cache_waiters(cli); osc_check_rpcs(cli); spin_unlock(&cli->cl_loi_list_lock); @@ -1250,8 +1296,8 @@ static struct ptlrpc_request *osc_build_req(struct client_obd *cli, pga[i].pg = oap->oap_page; pga[i].count = oap->oap_count; pga[i].flag = oap->oap_brw_flags; - //CDEBUG(D_INODE, "putting page %p 
index %lu oap %p into pga\n", - //pga[i].pg, oap->oap_page->index, oap); + CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n", + pga[i].pg, oap->oap_page->index, oap, pga[i].flag); i++; } @@ -1328,15 +1374,15 @@ static int osc_send_oap_rpc(struct client_obd *cli, struct lov_oinfo *loi, int rc = ops->ap_make_ready(oap->oap_caller_data, cmd); if (rc < 0) CDEBUG(D_INODE, "oap %p page %p returned %d " - "instead of ready\n", oap, + "instead of ready\n", oap, oap->oap_page, rc); switch (rc) { case -EAGAIN: /* llite is telling us that the page is still * in commit_write and that we should try - * and put it in an rpc again later. we + * and put it in an rpc again later. we * break out of the loop so we don't create - * a hole in the sequence of pages in the rpc + * a hole in the sequence of pages in the rpc * stream.*/ pos = NULL; break; @@ -1351,7 +1397,7 @@ static int osc_send_oap_rpc(struct client_obd *cli, struct lov_oinfo *loi, break; default: LASSERTF(0, "oap %p page %p returned %d " - "from make_ready\n", oap, + "from make_ready\n", oap, oap->oap_page, rc); break; } @@ -1367,13 +1413,12 @@ static int osc_send_oap_rpc(struct client_obd *cli, struct lov_oinfo *loi, /* ask the caller for the size of the io as the rpc leaves. 
*/ if (!(oap->oap_async_flags & ASYNC_COUNT_STABLE)) - oap->oap_count = ops->ap_refresh_count( - oap->oap_caller_data, - cmd); + oap->oap_count = + ops->ap_refresh_count(oap->oap_caller_data,cmd); if (oap->oap_count <= 0) { - CDEBUG(D_INODE, "oap %p count %d, completing\n", oap, + CDEBUG(D_CACHE, "oap %p count %d, completing\n", oap, oap->oap_count); - osc_complete_oap(cli, oap, oap->oap_count); + osc_complete_oap(cli, oap, 0, oap->oap_count); continue; } @@ -1383,6 +1428,8 @@ static int osc_send_oap_rpc(struct client_obd *cli, struct lov_oinfo *loi, break; } + osc_wake_cache_waiters(cli); + if (page_count == 0) RETURN(0); @@ -1403,7 +1450,7 @@ static int osc_send_oap_rpc(struct client_obd *cli, struct lov_oinfo *loi, * were between the pending list and the rpc */ if (oap->oap_interrupted) { CDEBUG(D_INODE, "oap %p interrupted\n", oap); - osc_complete_oap(cli, oap, oap->oap_count); + osc_complete_oap(cli, oap, 0, oap->oap_count); continue; } @@ -1430,7 +1477,7 @@ static int osc_send_oap_rpc(struct client_obd *cli, struct lov_oinfo *loi, lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_brw_in_flight); } else { lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count); - lprocfs_oh_tally(&cli->cl_write_rpc_hist, + lprocfs_oh_tally(&cli->cl_write_rpc_hist, cli->cl_brw_in_flight); } @@ -1442,7 +1489,7 @@ static int osc_send_oap_rpc(struct client_obd *cli, struct lov_oinfo *loi, list_for_each(pos, &aa->aa_oaps) { oap = list_entry(pos, struct osc_async_page, oap_rpc_item); if (oap->oap_interrupted) { - CDEBUG(D_INODE, "oap %p in req %p interrupted\n", + CDEBUG(D_INODE, "oap %p in req %p interrupted\n", oap, request); ptlrpc_mark_interrupted(request); break; @@ -1487,7 +1534,7 @@ static int lop_makes_rpc(struct client_obd *cli, struct loi_oap_pages *lop, * that are being queued but which can't be made ready until * the queuer finishes with the page. 
this is a wart for * llite::commit_write() */ - optimal *= 2; + optimal += 16; } if (lop->lop_num_pending >= optimal) RETURN(1); @@ -1495,7 +1542,7 @@ static int lop_makes_rpc(struct client_obd *cli, struct loi_oap_pages *lop, RETURN(0); } -static void on_list(struct list_head *item, struct list_head *list, +static void on_list(struct list_head *item, struct list_head *list, int should_be_on) { if (list_empty(item) && should_be_on) @@ -1508,39 +1555,39 @@ static void on_list(struct list_head *item, struct list_head *list, * can find pages to build into rpcs quickly */ static void loi_list_maint(struct client_obd *cli, struct lov_oinfo *loi) { - on_list(&loi->loi_cli_item, &cli->cl_loi_ready_list, + on_list(&loi->loi_cli_item, &cli->cl_loi_ready_list, lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE) || lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ)); - on_list(&loi->loi_write_item, &cli->cl_loi_write_list, + on_list(&loi->loi_write_item, &cli->cl_loi_write_list, loi->loi_write_lop.lop_num_pending); } -#define LOI_DEBUG(LOI, STR, args...) \ - CDEBUG(D_INODE, "loi ready %d wr %d:%d rd %d:%d " STR, \ - !list_empty(&(LOI)->loi_cli_item), \ +#define LOI_DEBUG(LOI, STR, args...) 
\ + CDEBUG(D_INODE, "loi ready %d wr %d:%d rd %d:%d " STR, \ + !list_empty(&(LOI)->loi_cli_item), \ (LOI)->loi_write_lop.lop_num_pending, \ - !list_empty(&(LOI)->loi_write_lop.lop_urgent), \ + !list_empty(&(LOI)->loi_write_lop.lop_urgent), \ (LOI)->loi_read_lop.lop_num_pending, \ - !list_empty(&(LOI)->loi_read_lop.lop_urgent), \ - args) \ + !list_empty(&(LOI)->loi_read_lop.lop_urgent), \ + args) \ struct lov_oinfo *osc_next_loi(struct client_obd *cli) { ENTRY; /* first return all objects which we already know to have - * pages ready to be stuffed into rpcs */ + * pages ready to be stuffed into rpcs */ if (!list_empty(&cli->cl_loi_ready_list)) - RETURN(list_entry(cli->cl_loi_ready_list.next, + RETURN(list_entry(cli->cl_loi_ready_list.next, struct lov_oinfo, loi_cli_item)); - - /* then if we have cache waiters, return all objects with queued + + /* then if we have cache waiters, return all objects with queued * writes. This is especially important when many small files * have filled up the cache and not been fired into rpcs because * they don't pass the nr_pending/object threshhold */ if (!list_empty(&cli->cl_cache_waiters) && !list_empty(&cli->cl_loi_write_list)) - RETURN(list_entry(cli->cl_loi_write_list.next, + RETURN(list_entry(cli->cl_loi_write_list.next, struct lov_oinfo, loi_write_item)); RETURN(NULL); } @@ -1608,73 +1655,78 @@ static void osc_check_rpcs(struct client_obd *cli) /* we're trying to queue a page in the osc so we're subject to the * 'cl_dirty_max' limit on the number of pages that can be queued in the osc. * If the osc's queued pages are already at that limit, then we want to sleep - * until there is space in the osc's queue for us. we need this goofy - * little struct to really tell that our allocation was fulfilled in - * the presence of pending signals */ -struct osc_cache_waiter { - struct list_head ocw_entry; - wait_queue_head_t ocw_waitq; -}; + * until there is space in the osc's queue for us. 
We also may be waiting for + * write credits from the OST if there are RPCs in flight that may return some + * before we fall back to sync writes. + * + * We need this know our allocation was granted in the presence of signals */ static int ocw_granted(struct client_obd *cli, struct osc_cache_waiter *ocw) { int rc; ENTRY; spin_lock(&cli->cl_loi_list_lock); - rc = list_empty(&ocw->ocw_entry); + rc = list_empty(&ocw->ocw_entry) || cli->cl_brw_in_flight == 0; spin_unlock(&cli->cl_loi_list_lock); RETURN(rc); }; + +/* Caller must hold loi_list_lock - we drop/regain it if we need to wait for + * grant or cache space. */ static int osc_enter_cache(struct client_obd *cli, struct lov_oinfo *loi, struct osc_async_page *oap) { struct osc_cache_waiter ocw; - struct l_wait_info lwi = {0}; - int rc = 0; - ENTRY; + struct l_wait_info lwi = { 0 }; + + CDEBUG(D_CACHE, "dirty: %ld dirty_max: %ld dropped: %lu grant: %lu\n", + cli->cl_dirty, cli->cl_dirty_max, cli->cl_lost_grant, + cli->cl_avail_grant); - /* XXX check for ost grants here as well.. for now we ignore them. */ if (cli->cl_dirty_max < PAGE_SIZE) - RETURN(-EDQUOT); + return(-EDQUOT); - /* if we fail this test then cl_dirty contains at least one page - * that will have to be completed after we release the lock */ - if (cli->cl_dirty + PAGE_SIZE <= cli->cl_dirty_max) { + + /* Hopefully normal case - cache space and write credits available */ + if (cli->cl_dirty + PAGE_SIZE <= cli->cl_dirty_max && + cli->cl_avail_grant >= PAGE_SIZE) { /* account for ourselves */ - cli->cl_dirty += PAGE_SIZE; - GOTO(out, rc = 0); + osc_consume_write_grant(cli, oap); + return(0); } - init_waitqueue_head(&ocw.ocw_waitq); - list_add_tail(&ocw.ocw_entry, &cli->cl_cache_waiters); + /* Make sure that there are write rpcs in flight to wait for. This + * is a little silly as this object may not have any pending but + * other objects sure might. 
*/ + if (cli->cl_brw_in_flight) { + list_add_tail(&ocw.ocw_entry, &cli->cl_cache_waiters); + init_waitqueue_head(&ocw.ocw_waitq); + ocw.ocw_oap = oap; + ocw.ocw_rc = 0; - /* make sure that there are write rpcs in flight to wait for. this - * is a little silly as this object may not have any pending - * but other objects sure might. this should probably be cleaned. */ - loi_list_maint(cli, loi); - osc_check_rpcs(cli); - spin_unlock(&cli->cl_loi_list_lock); + loi_list_maint(cli, loi); + osc_check_rpcs(cli); + spin_unlock(&cli->cl_loi_list_lock); - CDEBUG(D_INODE, "sleeping for cache space\n"); - l_wait_event(ocw.ocw_waitq, ocw_granted(cli, &ocw), &lwi); + CDEBUG(0, "sleeping for cache space\n"); + l_wait_event(ocw.ocw_waitq, ocw_granted(cli, &ocw), &lwi); - spin_lock(&cli->cl_loi_list_lock); - if (!list_empty(&ocw.ocw_entry)) { - rc = -EINTR; - list_del(&ocw.ocw_entry); + spin_lock(&cli->cl_loi_list_lock); + if (!list_empty(&ocw.ocw_entry)) { + list_del(&ocw.ocw_entry); + RETURN(-EINTR); + } + RETURN(ocw.ocw_rc); } - GOTO(out, rc); -out: - if (rc == 0) - oap->oap_brw_flags |= OBD_BRW_FROM_GRANT; - return rc; + + RETURN(-EDQUOT); } -/* the companion to enter_cache, called when an oap is now longer part of the +/* the companion to enter_cache, called when an oap is no longer part of the * dirty accounting.. so writeback completes or truncate happens before writing * starts. must be called with the loi lock held. 
*/ -static void osc_exit_cache(struct client_obd *cli, struct osc_async_page *oap) +static void osc_exit_cache(struct client_obd *cli, struct osc_async_page *oap, + int sent) { - struct osc_cache_waiter *ocw; ENTRY; if (!(oap->oap_brw_flags & OBD_BRW_FROM_GRANT)) { @@ -1682,16 +1734,14 @@ static void osc_exit_cache(struct client_obd *cli, struct osc_async_page *oap) return; } - if (list_empty(&cli->cl_cache_waiters)) { - cli->cl_dirty -= PAGE_SIZE; - } else { - ocw = list_entry(cli->cl_cache_waiters.next, - struct osc_cache_waiter, ocw_entry); - list_del_init(&ocw->ocw_entry); - wake_up(&ocw->ocw_waitq); + oap->oap_brw_flags &= ~OBD_BRW_FROM_GRANT; + cli->cl_dirty -= PAGE_SIZE; + if (!sent) { + cli->cl_lost_grant += PAGE_SIZE; + CDEBUG(D_CACHE, "lost grant: %lu avail grant: %lu dirty: %lu\n", + cli->cl_lost_grant, cli->cl_avail_grant, cli->cl_dirty); } - oap->oap_brw_flags &= ~OBD_BRW_FROM_GRANT; EXIT; } @@ -1973,7 +2023,8 @@ static int osc_teardown_async_page(struct obd_export *exp, if (!list_empty(&oap->oap_rpc_item)) GOTO(out, rc = -EBUSY); - osc_exit_cache(cli, oap); + osc_exit_cache(cli, oap, 0); + osc_wake_cache_waiters(cli); if (!list_empty(&oap->oap_urgent_item)) { list_del_init(&oap->oap_urgent_item); @@ -2771,7 +2822,7 @@ static int osc_disconnect(struct obd_export *exp, int flags) if (obd->u.cli.cl_conn_count == 1) { /* flush any remaining cancel messages out to the target */ llog_sync(ctxt, exp); - + /* balance the conn2export for oscc in osc_connect */ class_export_put(exp); } @@ -2796,18 +2847,27 @@ static int osc_lock_contains(struct obd_export *exp, struct lov_stripe_md *lsm, static int osc_invalidate_import(struct obd_device *obd, struct obd_import *imp) { + struct client_obd *cli; LASSERT(imp->imp_obd == obd); /* this used to try and tear down queued pages, but it was * not correctly implemented. 
We'll have to do it again once * we call obd_invalidate_import() agian */ - LBUG(); + /* XXX And we still need to do this */ + + /* Reset grants, too */ + cli = &obd->u.cli; + spin_lock(&cli->cl_loi_list_lock); + cli->cl_avail_grant = 0; + cli->cl_lost_grant = 0; + spin_unlock(&cli->cl_loi_list_lock); + RETURN(0); } int osc_setup(struct obd_device *obd, obd_count len, void *buf) { int rc; - + rc = ptlrpcd_addref(); if (rc) return rc; diff --git a/lustre/ost/ost_handler.c b/lustre/ost/ost_handler.c index 6edebc8..dfdcf1c 100644 --- a/lustre/ost/ost_handler.c +++ b/lustre/ost/ost_handler.c @@ -305,12 +305,13 @@ static int get_per_page_niobufs(struct obd_ioobj *ioo, int nioo, LASSERT(page < npages); pp_rnb[page].len = pnob; pp_rnb[page].offset = off; - pp_rnb[page].flags = rnb->flags; + pp_rnb[page].flags = rnb[rnbidx].flags; - CDEBUG(D_PAGE, " obj %d id "LPX64 - "page %d(%d) "LPX64" for %d\n", + CDEBUG(0, " obj %d id "LPX64 + "page %d(%d) "LPX64" for %d, flg %x\n", i, ioo[i].ioo_id, obj_pages, page, - pp_rnb[page].offset, pp_rnb[page].len); + pp_rnb[page].offset, pp_rnb[page].len, + pp_rnb[page].flags); page++; obj_pages++; @@ -384,9 +385,6 @@ static int ost_brw_read(struct ptlrpc_request *req) GOTO(out, rc = -EFAULT); } - /* BUG 974: when we send back cache grants, don't clear this flag */ - body->oa.o_valid &= ~OBD_MD_FLRDEV; - ioo = lustre_swab_reqbuf(req, 1, sizeof(*ioo), lustre_swab_obd_ioobj); if (ioo == NULL) { CERROR("Missing/short ioobj\n"); @@ -478,15 +476,15 @@ static int ost_brw_read(struct ptlrpc_request *req) rc = obd_commitrw(OBD_BRW_READ, req->rq_export, &body->oa, 1, ioo, npages, local_nb, &oti); - repbody = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*repbody)); - memcpy(&repbody->oa, &body->oa, sizeof(repbody->oa)); + if (rc == 0) { + repbody = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*repbody)); + memcpy(&repbody->oa, &body->oa, sizeof(repbody->oa)); #if CHECKSUM_BULK - if (rc == 0) { - repbody->oa.o_nlink = ost_checksum_bulk(desc); + 
repbody->oa.o_cksum = ost_checksum_bulk(desc); repbody->oa.o_valid |= OBD_MD_FLCKSUM; - } #endif + } out_bulk: ptlrpc_free_bulk(desc); @@ -564,9 +562,6 @@ static int ost_brw_write(struct ptlrpc_request *req, struct obd_trans_info *oti) GOTO(out, rc = -EFAULT); } - /* BUG 974: when we send back cache grants, don't clear this flag */ - body->oa.o_valid &= ~OBD_MD_FLRDEV; - LASSERT_REQSWAB(req, 1); objcount = req->rq_reqmsg->buflens[1] / sizeof(*ioo); if (objcount == 0) { @@ -655,7 +650,7 @@ static int ost_brw_write(struct ptlrpc_request *req, struct obd_trans_info *oti) #if CHECKSUM_BULK if (rc == 0 && (body->oa.o_valid & OBD_MD_FLCKSUM) != 0) { static int cksum_counter; - obd_count client_cksum = body->oa.o_nlink; + obd_count client_cksum = body->oa.o_cksum; obd_count cksum = ost_checksum_bulk(desc); portals_nid2str(req->rq_connection->c_peer.peer_ni->pni_number, @@ -665,7 +660,7 @@ static int ost_brw_write(struct ptlrpc_request *req, struct obd_trans_info *oti) LPX64" (%s)\n", client_cksum, cksum, req->rq_connection->c_peer.peer_nid, str); cksum_counter = 1; - repbody->oa.o_nlink = cksum; + repbody->oa.o_cksum = cksum; } else { cksum_counter++; if ((cksum_counter & (-cksum_counter)) == cksum_counter) @@ -946,11 +941,12 @@ static int ost_handle(struct ptlrpc_request *req) oti_init(oti, req); switch (req->rq_reqmsg->opc) { - case OST_CONNECT: + case OST_CONNECT: { CDEBUG(D_INODE, "connect\n"); OBD_FAIL_RETURN(OBD_FAIL_OST_CONNECT_NET, 0); rc = target_handle_connect(req, ost_handle); break; + } case OST_DISCONNECT: CDEBUG(D_INODE, "disconnect\n"); OBD_FAIL_RETURN(OBD_FAIL_OST_DISCONNECT_NET, 0); diff --git a/lustre/portals/include/linux/kp30.h b/lustre/portals/include/linux/kp30.h index 0c4c4a0..09db989 100644 --- a/lustre/portals/include/linux/kp30.h +++ b/lustre/portals/include/linux/kp30.h @@ -115,7 +115,7 @@ do { \ if (portal_cerror == 0) \ break; \ CHECK_STACK(CDEBUG_STACK); \ - if (!(mask) || ((mask) & (D_ERROR | D_EMERG | D_WARNING)) || \ + if (((mask) & 
(D_ERROR | D_EMERG | D_WARNING)) || \ (portal_debug & (mask) && \ portal_subsystem_debug & DEBUG_SUBSYSTEM)) \ portals_debug_msg(DEBUG_SUBSYSTEM, mask, \ diff --git a/lustre/portals/libcfs/debug.c b/lustre/portals/libcfs/debug.c index 0bc93f3..7ad9327 100644 --- a/lustre/portals/libcfs/debug.c +++ b/lustre/portals/libcfs/debug.c @@ -633,9 +633,9 @@ int portals_debug_mark_buffer(char *text) if (debug_buf == NULL) return -EINVAL; - CDEBUG(0, "********************************************************\n"); + CDEBUG(D_TRACE,"***************************************************\n"); CWARN("DEBUG MARKER: %s\n", text); - CDEBUG(0, "********************************************************\n"); + CDEBUG(D_TRACE,"***************************************************\n"); return 0; } diff --git a/lustre/ptlrpc/pack_generic.c b/lustre/ptlrpc/pack_generic.c index e0e725a..d29fe39 100644 --- a/lustre/ptlrpc/pack_generic.c +++ b/lustre/ptlrpc/pack_generic.c @@ -306,7 +306,7 @@ void lustre_swab_obdo (struct obdo *o) __swab64s (&o->o_ctime); __swab64s (&o->o_size); __swab64s (&o->o_blocks); - __swab64s (&o->o_rdev); + __swab64s (&o->o_grant); __swab32s (&o->o_blksize); __swab32s (&o->o_mode); __swab32s (&o->o_uid); @@ -315,7 +315,7 @@ void lustre_swab_obdo (struct obdo *o) __swab32s (&o->o_nlink); __swab32s (&o->o_generation); __swab32s (&o->o_valid); - __swab32s (&o->o_obdflags); + __swab32s (&o->o_misc); __swab32s (&o->o_easize); /* o_inline is opaque */ } @@ -615,7 +615,7 @@ void lustre_swab_llogd_conn_body (struct llogd_conn_body *d) void lustre_assert_wire_constants(void) { /* Wire protocol assertions generated by 'wirecheck' - * running on Linux schnapps.adilger.int 2.4.22-l32 #4 Thu Jan 8 14:32:57 MST 2004 i686 i686 + * running on Linux schnapps.adilger.int 2.4.22-l32 #4 Thu Jan 8 14:32:57 MST 2004 i686 i686 * with gcc version 3.2.2 20030222 (Red Hat Linux 3.2.2-5) */ @@ -756,8 +756,8 @@ void lustre_assert_wire_constants(void) LASSERT((int)sizeof(((struct obdo *)0)->o_size) == 
8); LASSERT(offsetof(struct obdo, o_blocks) == 48); LASSERT((int)sizeof(((struct obdo *)0)->o_blocks) == 8); - LASSERT(offsetof(struct obdo, o_rdev) == 56); - LASSERT((int)sizeof(((struct obdo *)0)->o_rdev) == 8); + LASSERT(offsetof(struct obdo, o_grant) == 56); + LASSERT((int)sizeof(((struct obdo *)0)->o_grant) == 8); LASSERT(offsetof(struct obdo, o_blksize) == 64); LASSERT((int)sizeof(((struct obdo *)0)->o_blksize) == 4); LASSERT(offsetof(struct obdo, o_mode) == 68); @@ -774,8 +774,8 @@ void lustre_assert_wire_constants(void) LASSERT((int)sizeof(((struct obdo *)0)->o_generation) == 4); LASSERT(offsetof(struct obdo, o_valid) == 92); LASSERT((int)sizeof(((struct obdo *)0)->o_valid) == 4); - LASSERT(offsetof(struct obdo, o_obdflags) == 96); - LASSERT((int)sizeof(((struct obdo *)0)->o_obdflags) == 4); + LASSERT(offsetof(struct obdo, o_misc) == 96); + LASSERT((int)sizeof(((struct obdo *)0)->o_misc) == 4); LASSERT(offsetof(struct obdo, o_easize) == 100); LASSERT((int)sizeof(((struct obdo *)0)->o_easize) == 4); LASSERT(offsetof(struct obdo, o_inline) == 104); @@ -792,7 +792,6 @@ void lustre_assert_wire_constants(void) LASSERT(OBD_MD_FLUID == 512); LASSERT(OBD_MD_FLGID == 1024); LASSERT(OBD_MD_FLFLAGS == 2048); - LASSERT(OBD_MD_FLOBDFLG == 4096); LASSERT(OBD_MD_FLNLINK == 8192); LASSERT(OBD_MD_FLGENER == 16384); LASSERT(OBD_MD_FLINLINE == 32768); @@ -810,6 +809,7 @@ void lustre_assert_wire_constants(void) LASSERT(OBD_FL_DELORPHAN == 4); LASSERT(OBD_FL_NORPC == 8); LASSERT(OBD_FL_IDONLY == 16); + LASSERT(OBD_FL_RECREATE_OBJS == 32); /* Checks for struct lov_mds_md_v1 */ LASSERT((int)sizeof(struct lov_mds_md_v1) == 32); @@ -885,7 +885,6 @@ void lustre_assert_wire_constants(void) LASSERT((int)sizeof(((struct niobuf_remote *)0)->flags) == 4); LASSERT(OBD_BRW_READ == 1); LASSERT(OBD_BRW_WRITE == 2); - LASSERT(OBD_BRW_CREATE == 4); LASSERT(OBD_BRW_SYNC == 8); LASSERT(OBD_BRW_FROM_GRANT == 32); diff --git a/lustre/ptlrpc/recover.c b/lustre/ptlrpc/recover.c index 
6c3a68b..6b069a5 100644 --- a/lustre/ptlrpc/recover.c +++ b/lustre/ptlrpc/recover.c @@ -233,9 +233,7 @@ inline void ptlrpc_invalidate_import_state(struct obd_import *imp) ptlrpc_abort_inflight(imp); -#if 0 obd_invalidate_import(obd, imp); -#endif ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY); } diff --git a/lustre/scripts/lustre.spec.in b/lustre/scripts/lustre.spec.in index 08a4345..82a7d26 100644 --- a/lustre/scripts/lustre.spec.in +++ b/lustre/scripts/lustre.spec.in @@ -1,5 +1,5 @@ # lustre.spec -%define version HEAD +%define version b_bug974 %define kversion @LINUXRELEASE@ %define linuxdir @LINUX@ %define enable_doc @ENABLE_DOC@ diff --git a/lustre/tests/acceptance-small.sh b/lustre/tests/acceptance-small.sh index b6a2ee1..536e99e 100755 --- a/lustre/tests/acceptance-small.sh +++ b/lustre/tests/acceptance-small.sh @@ -3,7 +3,8 @@ # the CVS HEAD are allowed. set -vxe -[ "$CONFIGS" -a -z "$SANITYN" ] && SANITYN=no +PATH=`dirname $0`/../utils:$PATH + [ "$CONFIGS" ] || CONFIGS="local lov" [ "$MAX_THREADS" ] || MAX_THREADS=10 if [ -z "$THREADS" ]; then @@ -15,6 +16,7 @@ fi [ "$RSIZE" ] || RSIZE=64 [ "$UID" ] || UID=1000 [ "$MOUNT" ] || MOUNT=/mnt/lustre +[ "$MOUNT2" ] || MOUNT2=${MOUNT}2 [ "$TMP" ] || TMP=/tmp [ "$COUNT" ] || COUNT=1000 #[ "$DEBUG_LVL" ] || DEBUG_LVL=0x370200 @@ -110,23 +112,46 @@ for NAME in $CONFIGS; do if [ "$FSX" != "no" ]; then mount | grep $MOUNT || sh llmount.sh $DEBUG_OFF - ./fsx -W -c 50 -p 1000 -P $TMP -l 1024000 -N $(($COUNT * 100)) $MOUNT/fsxfile + ./fsx -W -c 50 -p 1000 -P $TMP -l $SIZE \ + -N $(($COUNT * 100)) $MOUNT/fsxfile $DEBUG_ON sh llmountcleanup.sh - #sh llrmount.sh + sh llrmount.sh fi + if [ "$SANITYN" != "no" ]; then + mount | grep $MOUNT || sh llmount.sh + $DEBUG_OFF + + mkdir -p $MOUNT2 + case $NAME in + local|lov) + MDSNODE=`hostname` + MDSNAME=mds1 + CLIENT=client + ;; + *) # we could extract this from $NAME.xml somehow + ;; + esac + if [ "$MDSNODE" -a "$MDSNAME" -a "$CLIENT" ]; then + llmount $MDSNODE:/$MDSNAME/$CLIENT 
$MOUNT2 + SANITYLOG=$TMP/sanity.log START=: CLEAN=: sh sanityN.sh + umount $MOUNT2 + else + echo "don't know \$MDSNODE, \$MDSNAME, \$CLIENT" + echo "can't mount2 for '$NAME', skipping sanityN.sh" + fi + + $DEBUG_ON + sh llmountcleanup.sh + #sh llrmount.sh + fi + mount | grep $MOUNT && sh llmountcleanup.sh done if [ "$REPLAY_SINGLE" != "no" ]; then sh replay-single.sh fi -if [ "$SANITYN" != "no" ]; then - export NAME=mount2 - mount | grep $MOUNT || sh llmount.sh - sh sanityN.sh - mount | grep $MOUNT && sh llmountcleanup.sh -fi if [ "$CONF_SANITY" != "no" ]; then sh conf-sanity.sh diff --git a/lustre/tests/local.sh b/lustre/tests/local.sh index 95dd276..0f8fe13 100755 --- a/lustre/tests/local.sh +++ b/lustre/tests/local.sh @@ -11,7 +11,7 @@ MDSDEV=${MDSDEV:-$TMP/mds1-`hostname`} MDSSIZE=${MDSSIZE:-100000} FSTYPE=${FSTYPE:-ext3} MOUNT=${MOUNT:-/mnt/lustre} -#MOUNT2=${MOUNT2:-${MOUNT}2} +MOUNT2=${MOUNT2:-${MOUNT}2} NETWORKTYPE=${NETWORKTYPE:-tcp} OSTDEV=${OSTDEV:-$TMP/ost1-`hostname`} @@ -20,7 +20,8 @@ OSTSIZE=${OSTSIZE:-200000} # specific journal size for the ost, in MB JSIZE=${JSIZE:-0} [ "$JSIZE" -gt 0 ] && JARG="--journal_size $JSIZE" -MDSISIZE=${MDSISIZE:-128} +MDSISIZE=${MDSISIZE:-0} +[ "$MDSISIZE" -gt 0 ] && IARG="--inode_size $MDSISIZE" STRIPE_BYTES=65536 STRIPES_PER_OBJ=0 # 0 means stripe over all OSTs @@ -30,9 +31,10 @@ rm -f $config # create nodes ${LMC} --add node --node localhost || exit 10 ${LMC} --add net --node localhost --nid `hostname` --nettype $NETWORKTYPE || exit 11 +${LMC} --add net --node client --nid '*' --nettype $NETWORKTYPE || exit 12 # configure mds server -${LMC} --add mds --nspath /mnt/mds_ns --node localhost --mds mds1 --fstype $FSTYPE --dev $MDSDEV --size $MDSSIZE $JARG --mkfsoptions "-I $MDSISIZE" || exit 20 +${LMC} --add mds --nspath /mnt/mds_ns --node localhost --mds mds1 --fstype $FSTYPE --dev $MDSDEV --size $MDSSIZE $JARG $IARG || exit 20 # configure ost ${LMC} -m $config --add lov --lov lov1 --mds mds1 --stripe_sz $STRIPE_BYTES 
--stripe_cnt $STRIPES_PER_OBJ --stripe_pattern 0 || exit 20 @@ -40,4 +42,4 @@ ${LMC} --add ost --nspath /mnt/ost_ns --node localhost --lov lov1 --fstype $FSTY # create client config ${LMC} --add mtpt --node localhost --path $MOUNT --mds mds1 --lov lov1 || exit 40 -#${LMC} --add mtpt --node localhost --path $MOUNT2 --mds mds1 --lov lov1 || exit 40 +${LMC} --add mtpt --node client --path $MOUNT2 --mds mds1 --lov lov1 || exit 41 diff --git a/lustre/tests/lov.sh b/lustre/tests/lov.sh index 2be5a74..ec09598 100755 --- a/lustre/tests/lov.sh +++ b/lustre/tests/lov.sh @@ -6,17 +6,20 @@ export PATH=`dirname $0`/../utils:$PATH config=${1:-lov.xml} -LMC=${LMC:-lmc} +LMC="${LMC:-lmc} -m $config" TMP=${TMP:-/tmp} MDSDEV=${MDSDEV:-$TMP/mds1-`hostname`} MDSSIZE=${MDSSIZE:-100000} FSTYPE=${FSTYPE:-ext3} +MOUNT=${MOUNT:-/mnt/lustre} +MOUNT2=${MOUNT2:-${MOUNT}2} +NETWORKTYPE=${NETWORKTYPE:-tcp} OSTCOUNT=${OSTCOUNT:-5} # OSTDEVN will still override the device for OST N -OSTSIZE=${OSTSIZE:-100000} +OSTSIZE=${OSTSIZE:-150000} # 1 to config an echo client instead of llite ECHO_CLIENT=${ECHO_CLIENT:-} @@ -28,26 +31,31 @@ JSIZE=${JSIZE:-0} JARG="" [ "$JSIZE" -gt 0 ] && JARG="--journal_size $JSIZE" +rm -f $config + # create nodes -${LMC} -o $config --add net --node localhost --nid localhost --nettype tcp +${LMC} --add node --node localhost || exit 10 +${LMC} --add net --node localhost --nid `hostname` --nettype $NETWORKTYPE || exit 11 +${LMC} --add net --node client --nid '*' --nettype $NETWORKTYPE || exit 12 # configure mds server -${LMC} -m $config --format --add mds --node localhost --mds mds1 --fstype $FSTYPE --dev $MDSDEV --size $MDSSIZE +${LMC} --format --add mds --node localhost --mds mds1 --fstype $FSTYPE --dev $MDSDEV --size $MDSSIZE || exit 20 # configure ost -${LMC} -m $config --add lov --lov lov1 --mds mds1 --stripe_sz $STRIPE_BYTES --stripe_cnt $STRIPES_PER_OBJ --stripe_pattern 0 +${LMC} --add lov --lov lov1 --mds mds1 --stripe_sz $STRIPE_BYTES --stripe_cnt $STRIPES_PER_OBJ 
--stripe_pattern 0 || exit 20 for num in `seq $OSTCOUNT`; do OST=ost$num DEVPTR=OSTDEV$num eval $DEVPTR=${!DEVPTR:=$TMP/$OST-`hostname`} - ${LMC} -m $config --add ost --node localhost --lov lov1 --ost $OST --fstype $FSTYPE --dev ${!DEVPTR} --size $OSTSIZE $JARG + ${LMC} --add ost --node localhost --lov lov1 --ost $OST --fstype $FSTYPE --dev ${!DEVPTR} --size $OSTSIZE $JARG || exit 30 done if [ -z "$ECHO_CLIENT" ]; then # create client config - ${LMC} -m $config --add mtpt --node localhost --path /mnt/lustre --mds mds1 --lov lov1 + ${LMC} --add mtpt --node localhost --path $MOUNT --mds mds1 --lov lov1 || exit 40 + ${LMC} --add mtpt --node client --path $MOUNT2 --mds mds1 --lov lov1 || exit 41 else - ${LMC} -m $config --add echo_client --node localhost --ost lov1 + ${LMC} --add echo_client --node localhost --ost lov1 || exit 42 fi diff --git a/lustre/tests/oos.sh b/lustre/tests/oos.sh index 8519dad..5a2646a 100755 --- a/lustre/tests/oos.sh +++ b/lustre/tests/oos.sh @@ -1,46 +1,77 @@ #!/bin/bash -export NAME=${NAME:-local} -export OSTSIZE=10000 +set -e +set -vx +export PATH=`dirname $0`/../utils:$PATH +LFS=${LFS:-lfs} +MOUNT=${MOUNT:-$1} MOUNT=${MOUNT:-/mnt/lustre} +OOS=$MOUNT/oosfile TMP=${TMP:-/tmp} - -echo "mnt.." 
-sh llmount.sh -echo "done" +LOG=$TMP/ooslog SUCCESS=1 -FREESPACE=`df |grep $MOUNT|tr -s ' '|cut -d ' ' -f4` +rm -f $OOS + +sleep 1 # to ensure we get up-to-date statfs info -rm -f $TMP/oosfile -dd if=/dev/zero of=$MOUNT/oosfile count=$[$FREESPACE + 1] bs=1k 2>$TMP/oosfile +#echo -1 > /proc/sys/portals/debug +#echo 0x40a8 > /proc/sys/portals/subsystem_debug +#lctl clear +#lctl debug_daemon start /r/tmp/debug 1024 -RECORDSOUT=`grep "records out" $TMP/oosfile|cut -d + -f1` +STRIPECOUNT=`cat /proc/fs/lustre/lov/*/activeobd | head -1` +ORIGFREE=`cat /proc/fs/lustre/llite/*/kbytesavail | head -1` +MAXFREE=${MAXFREE:-$((200000 * $STRIPECOUNT))} +if [ $ORIGFREE -gt $MAXFREE ]; then + echo "skipping out-of-space test on $OSC" + echo "reports ${ORIGFREE}kB free, more tham MAXFREE ${MAXFREE}kB" + echo "increase $MAXFREE (or reduce test fs size) to proceed" + exit 0 +fi -[ -z "`grep "No space left on device" $TMP/oosfile`" ] && \ - echo "failed:dd not return ENOSPC" && SUCCESS=0 +export LANG=C LC_LANG=C # for "No space left on device" message -REMAINEDFREE=`df |grep $MOUNT|tr -s ' '|cut -d ' ' -f4` -[ $[$FREESPACE - $REMAINEDFREE ] -lt $RECORDSOUT ] && \ - echo "failed:the space written by dd not equal to available space" && \ - SUCCESS=0 && echo "$FREESPACE - $REMAINEDFREE $RECORDSOUT" +# make sure we stripe over all OSTs to avoid OOS on only a subset of OSTs +$LFS setstripe $OOS 65536 0 $STRIPECOUNT +if dd if=/dev/zero of=$OOS count=$(($ORIGFREE + 100)) bs=1k 2> $LOG; then + echo "ERROR: dd did not fail" + SUCCESS=0 +fi -[ $REMAINEDFREE -gt 100 ] && \ - echo "failed:too many space left $REMAINEDFREE and -ENOSPC returned" &&\ +if [ "`grep -c 'No space left on device' $LOG`" -ne 1 ]; then + echo "ERROR: dd not return ENOSPC" SUCCESS=0 +fi + +# flush cache to OST(s) so avail numbers are correct +sync; sleep 1 ; sync + +for AVAIL in /proc/fs/lustre/osc/OSC*MNT*/kbytesavail; do + [ `cat $AVAIL` -lt 400 ] && OSCFULL=full +done +if [ -z "$OSCFULL" ]; then + echo "no OSTs are close 
to full" + grep "[0-9]" /proc/fs/lustre/osc/OSC*MNT*/{kbytesavail,cur*} + SUCCESS=0 +fi + +RECORDSOUT=`grep "records out" $LOG | cut -d + -f1` -FILESIZE=`ls -l $MOUNT/oosfile|tr -s ' '|cut -d ' ' -f5` -[ $RECORDSOUT -ne $[$FILESIZE/1024] ] && \ - echo "failed:the space written by dd not equal to the size of file" && \ +FILESIZE=`ls -l $OOS | awk '{ print $5 }'` +if [ $RECORDSOUT -ne $(($FILESIZE / 1024)) ]; then + echo "ERROR: blocks written by dd not equal to the size of file" SUCCESS=0 +fi -[ $SUCCESS -eq 1 ] && echo "Success!" +#lctl debug_daemon stop -rm -f $MOUNT/oosfile* -rm -f $TMP/oosfile +rm -f $OOS -echo "" -echo "cln.." -sh llmountcleanup.sh +if [ $SUCCESS -eq 1 ]; then + echo "Success!" +else + exit 1 +fi diff --git a/lustre/tests/oos2.sh b/lustre/tests/oos2.sh new file mode 100644 index 0000000..42b5571 --- /dev/null +++ b/lustre/tests/oos2.sh @@ -0,0 +1,82 @@ +#!/bin/bash + +set -e +set -vx + +export PATH=`dirname $0`/../utils:$PATH +LFS=${LFS:-lfs} +MOUNT=${MOUNT:-$1} +MOUNT=${MOUNT:-/mnt/lustre} +MOUNT2=${MOUNT2:-$2} +MOUNT2=${MOUNT2:-${MOUNT}2} +OOS=$MOUNT/oosfile +OOS2=$MOUNT2/oosfile2 +TMP=${TMP:-/tmp} +LOG=$TMP/oosfile +LOG2=${LOG}2 + +SUCCESS=1 + +rm -f $OOS $OOS2 $LOG $LOG2 + +sleep 1 # to ensure we get up-to-date statfs info + +STRIPECOUNT=`cat /proc/fs/lustre/lov/*/activeobd | head -1` +ORIGFREE=`cat /proc/fs/lustre/llite/*/kbytesavail | head -1` +MAXFREE=${MAXFREE:-$((200000 * $STRIPECOUNT))} +if [ $ORIGFREE -gt $MAXFREE ]; then + echo "skipping out-of-space test on $OSC" + echo "reports ${ORIGFREE}kB free, more tham MAXFREE ${MAXFREE}kB" + echo "increase $MAXFREE (or reduce test fs size) to proceed" + exit 0 +fi + +export LANG=C LC_LANG=C # for "No space left on device" message + +# make sure we stripe over all OSTs to avoid OOS on only a subset of OSTs +$LFS setstripe $OOS 65536 -1 $STRIPECOUNT +$LFS setstripe $OOS2 65536 -1 $STRIPECOUNT +dd if=/dev/zero of=$OOS count=$((3 * $ORIGFREE / 4 + 100)) bs=1k 2>> $LOG & +DDPID=$! 
+if dd if=/dev/zero of=$OOS2 count=$((3*$ORIGFREE/4 + 100)) bs=1k 2>> $LOG2; then + echo "ERROR: dd2 did not fail" + SUCCESS=0 +fi +if wait $DDPID; then + echo "ERROR: dd did not fail" + SUCCESS=0 +fi + +if [ "`cat $LOG $LOG2 | grep -c 'No space left on device'`" -ne 2 ]; then + echo "ERROR: dd not return ENOSPC" + SUCCESS=0 +fi + +# flush cache to OST(s) so avail numbers are correct +sync; sleep 1 ; sync + +for AVAIL in /proc/fs/lustre/osc/OSC*MNT*/kbytesavail; do + [ `cat $AVAIL` -lt 400 ] && OSCFULL=full +done +if [ -z "$OSCFULL" ]; then + echo "no OSTs are close to full" + grep "[0-9]" /proc/fs/lustre/osc/OSC*MNT*/{kbytesavail,cur*} |tee -a $LOG + SUCCESS=0 +fi + +RECORDSOUT=$((`grep "records out" $LOG | cut -d+ -f 1` + \ + `grep "records out" $LOG2 | cut -d+ -f 1`)) + +FILESIZE=$((`ls -l $OOS | awk '{print $5}'` + `ls -l $OOS2 | awk '{print $5}'`)) +if [ $RECORDSOUT -ne $(($FILESIZE / 1024)) ]; then + echo "ERROR: blocks written by dd not equal to the size of file" + SUCCESS=0 +fi + +rm -f $OOS $OOS2 + +if [ $SUCCESS -eq 1 ]; then + echo "Success!" 
+else + exit 1 +fi diff --git a/lustre/tests/recovery-cleanup.sh b/lustre/tests/recovery-cleanup.sh index ce4a4a6..e4eefd0 100755 --- a/lustre/tests/recovery-cleanup.sh +++ b/lustre/tests/recovery-cleanup.sh @@ -22,10 +22,10 @@ CLIENT=${CLIENT:-mdev8} NETWORKTYPE=${NETWORKTYPE:-tcp} MOUNTPT=${MOUNTPT:-/mnt/lustre} CONFIG=${CONFIG:-recovery-cleanup.xml} -MDSDEV=${MDSDEV:-/tmp/mds-`hostname`} +MDSDEV=${MDSDEV:-/tmp/mds1-`hostname`} MDSSIZE=${MDSSIZE:-100000} FSTYPE=${FSTYPE:-ext3} -OSTDEV=${OSTDEV:-/tmp/ost-`hostname`} +OSTDEV=${OSTDEV:-/tmp/ost1-`hostname`} OSTSIZE=${OSTSIZE:-100000} do_mds() { diff --git a/lustre/tests/runiozone b/lustre/tests/runiozone index c2eec04..db74c2e 100755 --- a/lustre/tests/runiozone +++ b/lustre/tests/runiozone @@ -2,7 +2,7 @@ [ -z "$SIZE" ] && SIZE=5g [ -z "$COUNT" ] && COUNT=100 [ -z "$VERIFY" ] && VERIFY="-+d" -[ -z "$ODIR" ] && ODIR="-I" +#[ -z "$ODIR" ] && ODIR="-I" [ -z "$REC" ] && REC=64 [ -z "$FILE" ] && FILE=/mnt/lustre/iozone.$$ [ $1 ] && SIZE=$1 diff --git a/lustre/tests/sanity.sh b/lustre/tests/sanity.sh index 7c91dd5..ec166e4 100644 --- a/lustre/tests/sanity.sh +++ b/lustre/tests/sanity.sh @@ -1414,6 +1414,8 @@ do_dirty_record() { } test_45() { f="$DIR/f45" + # Obtain grants from OST if it supports it + echo blah > ${f}_grant stop_kupdated sync do_dirty_record "echo blah > $f" @@ -1734,6 +1736,17 @@ test_63() { } run_test 63 "Verify osic_wait interruption does not crash ======" +test_64a () { + df $DIR + grep "[0-9]" /proc/fs/lustre/osc/OSC*MNT*/cur* +} +run_test 64a "verify filter grant calculations (in kernel) ======" + +test_64b () { + sh oos.sh $MOUNT +} +run_test 64b "check out-of-space detection on client ============" + # on the LLNL clusters, runas will still pick up root's $TMP settings, # which will not be writable for the runas user, and then you get a CVS # error message with a corrupt path string (CVS bug) and panic. 
diff --git a/lustre/tests/sanityN.sh b/lustre/tests/sanityN.sh index 703d378..37f3c96 100644 --- a/lustre/tests/sanityN.sh +++ b/lustre/tests/sanityN.sh @@ -91,7 +91,7 @@ run_test() { [ "$SANITYLOG" ] && rm -f $SANITYLOG || true error () { - log "FAIL: $@" + log "FAIL: $TESTNAME $@" if [ "$SANITYLOG" ]; then echo "FAIL: $TESTNAME $@" >> $SANITYLOG else @@ -284,7 +284,14 @@ test_13() { # bug 2451 - directory coherency } run_test 13 "test directory page revocation ====================" +test_14() { # bug 974 - ENOSPC + env + sh oos2.sh $MOUNT1 $MOUNT2 +} +run_test 14 "test out-of-space with multiple writers ===========" + log "cleanup: ======================================================" rm -rf $DIR1/[df][0-9]* $DIR1/lnk || true + echo '=========================== finished ===============================' [ -f "$SANITYLOG" ] && cat $SANITYLOG && exit 1 || true diff --git a/lustre/utils/lconf b/lustre/utils/lconf index 9225374..30f8437 100755 --- a/lustre/utils/lconf +++ b/lustre/utils/lconf @@ -1352,7 +1352,7 @@ class MDSDEV(Module): self.journal_size = self.db.get_val_int('journalsize', 0) self.fstype = self.db.get_val('fstype', '') self.nspath = self.db.get_val('nspath', '') - self.mkfsoptions = self.db.get_val('mkfsoptions', '') + self.mkfsoptions = self.db.get_val('mkfsoptions', '') # overwrite the orignal MDSDEV name and uuid with the MDS name and uuid target_uuid = self.db.get_first_ref('target') mds = self.db.lookup(target_uuid) @@ -1588,7 +1588,7 @@ class OSD(Module): self.uuid = target_uuid # modules self.add_lustre_module('ost', 'ost') - # FIXME: should we default to ext3 here? + # FIXME: should we default to ext3 here? 
if self.fstype: self.add_lustre_module('lvfs' , 'fsfilt_%s' % (self.fstype)) self.add_lustre_module(self.osdtype, self.osdtype) @@ -1760,14 +1760,14 @@ class MDC(Client): Client.__init__(self, db, uuid, 'mdc', fs_name) def permits_inactive(self): - return 0 + return 0 class OSC(Client): def __init__(self, db, uuid, fs_name): Client.__init__(self, db, uuid, 'osc', fs_name) def permits_inactive(self): - return 1 + return 1 def mgmtcli_name_for_uuid(uuid): return 'MGMTCLI_%s' % uuid diff --git a/lustre/utils/obd.c b/lustre/utils/obd.c index 9de3058..ad043aa 100644 --- a/lustre/utils/obd.c +++ b/lustre/utils/obd.c @@ -171,10 +171,10 @@ char *obdo_print(struct obdo *obd) sprintf(buf, "id: "LPX64"\ngrp: "LPX64"\natime: "LPU64"\nmtime: "LPU64 "\nctime: "LPU64"\nsize: "LPU64"\nblocks: "LPU64 "\nblksize: %u\nmode: %o\nuid: %d\ngid: %d\nflags: %x\n" - "obdflags: %x\nnlink: %d,\nvalid %x\n", + "misc: %x\nnlink: %d,\nvalid %x\n", obd->o_id, obd->o_gr, obd->o_atime, obd->o_mtime, obd->o_ctime, obd->o_size, obd->o_blocks, obd->o_blksize, obd->o_mode, - obd->o_uid, obd->o_gid, obd->o_flags, obd->o_obdflags, + obd->o_uid, obd->o_gid, obd->o_flags, obd->o_misc, obd->o_nlink, obd->o_valid); return strdup(buf); } @@ -1353,7 +1353,7 @@ int jt_obd_test_brw(int argc, char **argv) cmd = write ? 
OBD_IOC_BRW_WRITE : OBD_IOC_BRW_READ; for (i = 1, next_count = verbose; i <= count; i++) { - data.ioc_obdo1.o_valid &= ~(OBD_MD_FLBLOCKS|OBD_MD_FLRDEV); + data.ioc_obdo1.o_valid &= ~(OBD_MD_FLBLOCKS|OBD_MD_FLGRANT); IOC_PACK(argv[0], data); rc = l2_ioctl(OBD_DEV_ID, cmd, buf); SHMEM_BUMP(); diff --git a/lustre/utils/wirecheck.c b/lustre/utils/wirecheck.c index fb90a0f..8beb802 100644 --- a/lustre/utils/wirecheck.c +++ b/lustre/utils/wirecheck.c @@ -101,7 +101,7 @@ check_obdo(void) CHECK_MEMBER(obdo, o_ctime); CHECK_MEMBER(obdo, o_size); CHECK_MEMBER(obdo, o_blocks); - CHECK_MEMBER(obdo, o_rdev); + CHECK_MEMBER(obdo, o_grant); CHECK_MEMBER(obdo, o_blksize); CHECK_MEMBER(obdo, o_mode); CHECK_MEMBER(obdo, o_uid); @@ -110,7 +110,7 @@ check_obdo(void) CHECK_MEMBER(obdo, o_nlink); CHECK_MEMBER(obdo, o_generation); CHECK_MEMBER(obdo, o_valid); - CHECK_MEMBER(obdo, o_obdflags); + CHECK_MEMBER(obdo, o_misc); CHECK_MEMBER(obdo, o_easize); CHECK_MEMBER(obdo, o_inline); @@ -126,7 +126,6 @@ check_obdo(void) CHECK_VALUE(OBD_MD_FLUID); CHECK_VALUE(OBD_MD_FLGID); CHECK_VALUE(OBD_MD_FLFLAGS); - CHECK_VALUE(OBD_MD_FLOBDFLG); CHECK_VALUE(OBD_MD_FLNLINK); CHECK_VALUE(OBD_MD_FLGENER); CHECK_VALUE(OBD_MD_FLINLINE); @@ -145,6 +144,7 @@ check_obdo(void) CHECK_VALUE(OBD_FL_DELORPHAN); CHECK_VALUE(OBD_FL_NORPC); CHECK_VALUE(OBD_FL_IDONLY); + CHECK_VALUE(OBD_FL_RECREATE_OBJS); } void @@ -212,7 +212,6 @@ check_niobuf_remote(void) CHECK_VALUE(OBD_BRW_READ); CHECK_VALUE(OBD_BRW_WRITE); - CHECK_VALUE(OBD_BRW_CREATE); CHECK_VALUE(OBD_BRW_SYNC); CHECK_VALUE(OBD_BRW_FROM_GRANT); } diff --git a/lustre/utils/wiretest.c b/lustre/utils/wiretest.c index c370456..9f8bcd0f 100644 --- a/lustre/utils/wiretest.c +++ b/lustre/utils/wiretest.c @@ -164,8 +164,8 @@ void lustre_assert_wire_constants(void) LASSERT((int)sizeof(((struct obdo *)0)->o_size) == 8); LASSERT(offsetof(struct obdo, o_blocks) == 48); LASSERT((int)sizeof(((struct obdo *)0)->o_blocks) == 8); - LASSERT(offsetof(struct obdo, o_rdev) == 56); - 
LASSERT((int)sizeof(((struct obdo *)0)->o_rdev) == 8); + LASSERT(offsetof(struct obdo, o_grant) == 56); + LASSERT((int)sizeof(((struct obdo *)0)->o_grant) == 8); LASSERT(offsetof(struct obdo, o_blksize) == 64); LASSERT((int)sizeof(((struct obdo *)0)->o_blksize) == 4); LASSERT(offsetof(struct obdo, o_mode) == 68); @@ -182,8 +182,8 @@ void lustre_assert_wire_constants(void) LASSERT((int)sizeof(((struct obdo *)0)->o_generation) == 4); LASSERT(offsetof(struct obdo, o_valid) == 92); LASSERT((int)sizeof(((struct obdo *)0)->o_valid) == 4); - LASSERT(offsetof(struct obdo, o_obdflags) == 96); - LASSERT((int)sizeof(((struct obdo *)0)->o_obdflags) == 4); + LASSERT(offsetof(struct obdo, o_misc) == 96); + LASSERT((int)sizeof(((struct obdo *)0)->o_misc) == 4); LASSERT(offsetof(struct obdo, o_easize) == 100); LASSERT((int)sizeof(((struct obdo *)0)->o_easize) == 4); LASSERT(offsetof(struct obdo, o_inline) == 104); @@ -200,7 +200,6 @@ void lustre_assert_wire_constants(void) LASSERT(OBD_MD_FLUID == 512); LASSERT(OBD_MD_FLGID == 1024); LASSERT(OBD_MD_FLFLAGS == 2048); - LASSERT(OBD_MD_FLOBDFLG == 4096); LASSERT(OBD_MD_FLNLINK == 8192); LASSERT(OBD_MD_FLGENER == 16384); LASSERT(OBD_MD_FLINLINE == 32768); @@ -218,6 +217,7 @@ void lustre_assert_wire_constants(void) LASSERT(OBD_FL_DELORPHAN == 4); LASSERT(OBD_FL_NORPC == 8); LASSERT(OBD_FL_IDONLY == 16); + LASSERT(OBD_FL_RECREATE_OBJS == 32); /* Checks for struct lov_mds_md_v1 */ LASSERT((int)sizeof(struct lov_mds_md_v1) == 32); @@ -293,7 +293,6 @@ void lustre_assert_wire_constants(void) LASSERT((int)sizeof(((struct niobuf_remote *)0)->flags) == 4); LASSERT(OBD_BRW_READ == 1); LASSERT(OBD_BRW_WRITE == 2); - LASSERT(OBD_BRW_CREATE == 4); LASSERT(OBD_BRW_SYNC == 8); LASSERT(OBD_BRW_FROM_GRANT == 32);