#include <linux/kp30.h>
#include <linux/lustre_mds.h> /* for mds_objid */
+#include <linux/lustre_otree.h>
#include <linux/obd_ost.h>
+#include <linux/obd_lov.h>
#ifndef __CYGWIN__
#include <linux/ctype.h>
}
}
- lsm_size = sizeof(**lsmp);
+ lsm_size = lov_stripe_md_size(1);
if (!lsmp)
RETURN(lsm_size);
OBD_ALLOC(*lsmp, lsm_size);
if (!*lsmp)
RETURN(-ENOMEM);
+
+ (*lsmp)->lsm_oinfo[0].loi_dirty_ot =
+ &(*lsmp)->lsm_oinfo[0].loi_dirty_ot_inline;
+ ot_init((*lsmp)->lsm_oinfo[0].loi_dirty_ot);
}
if (lmm) {
/* XXX zero *lsmp? */
(*lsmp)->lsm_object_id = le64_to_cpu (lmm->lmm_object_id);
- (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES;
LASSERT((*lsmp)->lsm_object_id);
}
+ (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES;
+
RETURN(lsm_size);
}
RETURN (rc);
}
- body = lustre_swab_repbuf (req, 0, sizeof (*body),
- lustre_swab_ost_body);
+ body = lustre_swab_repbuf(req, 0, sizeof (*body), lustre_swab_ost_body);
if (body == NULL) {
CERROR ("can't unpack ost_body\n");
RETURN (-EPROTO);
oa->o_blksize = OSC_BRW_MAX_SIZE;
oa->o_valid |= OBD_MD_FLBLKSZ;
+ /* XXX LOV STACKING: the lsm that is passed to us from LOV does not
+ * have valid lsm_oinfo data structs, so don't go touching that.
+ * This needs to be fixed in a big way.
+ */
lsm->lsm_object_id = oa->o_id;
lsm->lsm_stripe_count = 0;
lsm->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES;
return rc;
}
+static void osc_announce_cached(struct client_obd *cli, struct ost_body *body)
+{
+ obd_flag bits = OBD_MD_FLBLOCKS|OBD_MD_FLRDEV;
+
+ LASSERT(!(body->oa.o_valid & bits));
+
+ body->oa.o_valid |= bits;
+ down(&cli->cl_dirty_sem);
+ body->oa.o_blocks = cli->cl_dirty;
+ body->oa.o_rdev = cli->cl_dirty_granted;
+ up(&cli->cl_dirty_sem);
+ CDEBUG(D_INODE, "announcing "LPU64" dirty "LPU64" granted\n",
+ cli->cl_dirty, cli->cl_dirty_granted);
+}
+
+static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
+{
+ if(!(body->oa.o_valid & OBD_MD_FLRDEV)) {
+ if (cli->cl_ost_can_grant) {
+ CDEBUG(D_INODE, "%s can't grant\n",
+ cli->cl_import->imp_target_uuid.uuid);
+ }
+ cli->cl_ost_can_grant = 0;
+ return;
+ }
+
+ CDEBUG(D_INODE, "got "LPU64" grant\n", body->oa.o_rdev);
+ down(&cli->cl_dirty_sem);
+ cli->cl_dirty_granted = body->oa.o_rdev;
+ /* XXX check for over-run and wake up the io thread that
+ * doesn't exist yet */
+ up(&cli->cl_dirty_sem);
+}
+
/* We assume that the reason this OSC got a short read is because it read
* beyond the end of a stripe file; i.e. lustre is reading a sparse file
* via the LOV, and it _knows_ it's reading inside the file, it's just that
}
#if CHECKSUM_BULK
-static __u64 cksum_pages(int nob, obd_count page_count, struct brw_page *pga)
+static obd_count cksum_pages(int nob, obd_count page_count,
+ struct brw_page *pga)
{
- __u64 cksum = 0;
+ obd_count cksum = 0;
char *ptr;
int i;
{
struct ptlrpc_request *req;
struct ptlrpc_bulk_desc *desc;
+ struct client_obd *cli = &imp->imp_obd->u.cli;
struct ost_body *body;
struct obd_ioobj *ioobj;
struct niobuf_remote *niobuf;
requested_nob += pg->count;
- if (i > 0 &&
- can_merge_pages (pg_prev, pg)) {
+ if (i > 0 && can_merge_pages (pg_prev, pg)) {
niobuf--;
niobuf->len += pg->count;
} else {
#if CHECKSUM_BULK
body->oa.o_valid |= OBD_MD_FLCKSUM;
if (opc == OST_BRW_WRITE)
- body->oa.o_rdev = cksum_pages (requested_nob, page_count, pga);
+ body->oa.o_nlink = cksum_pages (requested_nob, page_count, pga);
#endif
+ osc_announce_cached(cli, body);
spin_lock_irqsave (&req->rq_lock, flags);
req->rq_no_resend = 1;
spin_unlock_irqrestore (&req->rq_lock, flags);
obd_count page_count, struct brw_page *pga,
int rc)
{
+ struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
+ struct ost_body *body;
if (rc < 0)
return (rc);
+ body = lustre_swab_repbuf(req, 0, sizeof (*body), lustre_swab_ost_body);
+ if (body == NULL) {
+ CERROR ("Can't unpack body\n");
+ RETURN(-EPROTO);
+ }
+ osc_update_grant(cli, body);
+
if (req->rq_reqmsg->opc == OST_WRITE) {
if (rc > 0) {
CERROR ("Unexpected +ve rc %d\n", rc);
}
if (rc < requested_nob)
- handle_short_read (rc, page_count, pga);
+ handle_short_read(rc, page_count, pga);
#if CHECKSUM_BULK
- imp = req->rq_import;
- body = lustre_swab_repmsg (req, 0, sizeof (*body),
- lustre_swab_ost_body);
- if (body == NULL) {
- CERROR ("Can't unpack body\n");
- } else if (body->oa.o_valid & OBD_MD_FLCKSUM) {
+ if (body->oa.o_valid & OBD_MD_FLCKSUM) {
static int cksum_counter;
- __u64 server_cksum = body->oa.o_rdev;
- __u64 cksum = cksum_pages (rc, page_count, pga);
+ obd_count server_cksum = body->oa.o_nlink;
+ obd_count cksum = cksum_pages(rc, page_count, pga);
cksum_counter++;
if (server_cksum != cksum) {
imp->imp_connection->c_peer.peer_nid);
cksum_counter = 0;
} else if ((cksum_counter & (-cksum_counter)) == cksum_counter)
- CERROR("Checksum %u from "LPX64" OK: "LPX64"\n",
+ CERROR("Checksum %u from "LPX64" OK: %x\n",
cksum_counter,
imp->imp_connection->c_peer.peer_nid, cksum);
} else {
#endif
#endif
+static int osc_mark_page_dirty(struct lustre_handle *conn,
+ struct lov_stripe_md *lsm, unsigned long offset)
+{
+ struct client_obd *cli = &class_conn2obd(conn)->u.cli;
+ struct otree *dirty_ot = lsm->lsm_oinfo[0].loi_dirty_ot;
+ int rc;
+ ENTRY;
+
+ down(&cli->cl_dirty_sem);
+
+ if (cli->cl_ost_can_grant &&
+ (cli->cl_dirty + PAGE_CACHE_SIZE >= cli->cl_dirty_granted)) {
+ CDEBUG(D_INODE, "granted "LPU64" < "LPU64"\n",
+ cli->cl_dirty_granted, cli->cl_dirty + PAGE_CACHE_SIZE);
+ GOTO(out, rc = -EDQUOT);
+ }
+
+ rc = ot_mark_offset(dirty_ot, offset);
+ if (rc)
+ GOTO(out, rc);
+
+ cli->cl_dirty += PAGE_CACHE_SIZE;
+ CDEBUG(D_INODE, "dirtied off %lu, now "LPU64" bytes dirty\n",
+ offset, cli->cl_dirty);
+out:
+ up(&cli->cl_dirty_sem);
+ RETURN(rc);
+}
+
+static int osc_clear_dirty_pages(struct lustre_handle *conn,
+ struct lov_stripe_md *lsm,
+ unsigned long start, unsigned long end,
+ unsigned long *cleared)
+{
+ struct client_obd *cli = &class_conn2obd(conn)->u.cli;
+ struct otree *dirty_ot = lsm->lsm_oinfo[0].loi_dirty_ot;
+ unsigned long old_marked, new_marked;
+ int rc;
+ ENTRY;
+
+ down(&cli->cl_dirty_sem);
+
+ old_marked = ot_num_marked(dirty_ot);
+
+ rc = ot_clear_extent(dirty_ot, start, end);
+ if (rc)
+ GOTO(out, rc);
+
+ new_marked = ot_num_marked(dirty_ot);
+
+ LASSERT(new_marked <= old_marked);
+ LASSERT(old_marked * PAGE_CACHE_SIZE <= cli->cl_dirty);
+ *cleared = old_marked - new_marked;
+ cli->cl_dirty -= (__u64)*cleared << PAGE_CACHE_SHIFT;
+ CDEBUG(D_INODE, "cleared [%lu,%lu], now "LPU64" bytes dirty\n",
+ start, end, cli->cl_dirty);
+
+out:
+ up(&cli->cl_dirty_sem);
+ RETURN(rc);
+}
+
+static int osc_last_dirty_offset(struct lustre_handle *conn,
+ struct lov_stripe_md *lsm,
+ unsigned long *offset)
+{
+ struct otree *dirty_ot = lsm->lsm_oinfo[0].loi_dirty_ot;
+ int rc;
+ ENTRY;
+
+ rc = ot_last_marked(dirty_ot, offset);
+ RETURN(rc);
+}
+
static int osc_enqueue(struct lustre_handle *connh, struct lov_stripe_md *lsm,
struct lustre_handle *parent_lock,
__u32 type, void *extentp, int extent_len, __u32 mode,
opaque);
}
-static int osc_statfs(struct lustre_handle *conn, struct obd_statfs *osfs)
+static int osc_statfs(struct obd_export *exp, struct obd_statfs *osfs)
{
struct obd_statfs *msfs;
struct ptlrpc_request *request;
int rc, size = sizeof(*osfs);
ENTRY;
- request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_STATFS, 0, NULL,
- NULL);
+ request = ptlrpc_prep_req(exp->exp_obd->u.cli.cl_import, OST_STATFS, 0,
+ NULL, NULL);
if (!request)
RETURN(-ENOMEM);
o_cancel: osc_cancel,
o_cancel_unused: osc_cancel_unused,
o_iocontrol: osc_iocontrol,
- o_get_info: osc_get_info
+ o_get_info: osc_get_info,
+ .o_mark_page_dirty = osc_mark_page_dirty,
+ .o_clear_dirty_pages = osc_clear_dirty_pages,
+ .o_last_dirty_offset = osc_last_dirty_offset,
};
struct obd_ops sanosc_obd_ops = {
o_cancel: osc_cancel,
o_cancel_unused: osc_cancel_unused,
o_iocontrol: osc_iocontrol,
+ .o_mark_page_dirty = osc_mark_page_dirty,
+ .o_clear_dirty_pages = osc_clear_dirty_pages,
+ .o_last_dirty_offset = osc_last_dirty_offset,
};
int __init osc_init(void)