Whamcloud - gitweb
merge b_devel into HEAD (20030703)
[fs/lustre-release.git] / lustre / osc / osc_request.c
index 9e46952..4bda8de 100644 (file)
@@ -47,7 +47,9 @@
 
 #include <linux/kp30.h>
 #include <linux/lustre_mds.h> /* for mds_objid */
+#include <linux/lustre_otree.h>
 #include <linux/obd_ost.h>
+#include <linux/obd_lov.h>
 
 #ifndef  __CYGWIN__
 #include <linux/ctype.h>
@@ -127,7 +129,7 @@ static int osc_unpackmd(struct lustre_handle *conn, struct lov_stripe_md **lsmp,
                 }
         }
 
-        lsm_size = sizeof(**lsmp);
+        lsm_size = lov_stripe_md_size(1);
         if (!lsmp)
                 RETURN(lsm_size);
 
@@ -141,15 +143,20 @@ static int osc_unpackmd(struct lustre_handle *conn, struct lov_stripe_md **lsmp,
                 OBD_ALLOC(*lsmp, lsm_size);
                 if (!*lsmp)
                         RETURN(-ENOMEM);
+
+                (*lsmp)->lsm_oinfo[0].loi_dirty_ot =
+                        &(*lsmp)->lsm_oinfo[0].loi_dirty_ot_inline;
+                ot_init((*lsmp)->lsm_oinfo[0].loi_dirty_ot);
         }
 
         if (lmm) {
                 /* XXX zero *lsmp? */
                 (*lsmp)->lsm_object_id = le64_to_cpu (lmm->lmm_object_id);
-                (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES;
                 LASSERT((*lsmp)->lsm_object_id);
         }
 
+        (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES;
+
         RETURN(lsm_size);
 }
 
@@ -169,8 +176,7 @@ static int osc_getattr_interpret(struct ptlrpc_request *req,
                 RETURN (rc);
         }
 
-        body = lustre_swab_repbuf (req, 0, sizeof (*body),
-                                   lustre_swab_ost_body);
+        body = lustre_swab_repbuf(req, 0, sizeof (*body), lustre_swab_ost_body);
         if (body == NULL) {
                 CERROR ("can't unpack ost_body\n");
                 RETURN (-EPROTO);
@@ -520,6 +526,10 @@ static int osc_create(struct lustre_handle *conn, struct obdo *oa,
         oa->o_blksize = OSC_BRW_MAX_SIZE;
         oa->o_valid |= OBD_MD_FLBLKSZ;
 
+        /* XXX LOV STACKING: the lsm that is passed to us from LOV does not
+         * have valid lsm_oinfo data structs, so don't go touching that.
+         * This needs to be fixed in a big way.
+         */
         lsm->lsm_object_id = oa->o_id;
         lsm->lsm_stripe_count = 0;
         lsm->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES;
@@ -627,6 +637,40 @@ static int osc_destroy(struct lustre_handle *conn, struct obdo *oa,
         return rc;
 }
 
+static void osc_announce_cached(struct client_obd *cli, struct ost_body *body)
+{
+        obd_flag bits = OBD_MD_FLBLOCKS|OBD_MD_FLRDEV;
+
+        LASSERT(!(body->oa.o_valid & bits));
+
+        body->oa.o_valid |= bits;
+        down(&cli->cl_dirty_sem);
+        body->oa.o_blocks = cli->cl_dirty;
+        body->oa.o_rdev = cli->cl_dirty_granted;
+        up(&cli->cl_dirty_sem);
+        CDEBUG(D_INODE, "announcing "LPU64" dirty "LPU64" granted\n",
+               cli->cl_dirty, cli->cl_dirty_granted);
+}
+
+static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
+{
+        if(!(body->oa.o_valid & OBD_MD_FLRDEV)) {
+                if (cli->cl_ost_can_grant) {
+                        CDEBUG(D_INODE, "%s can't grant\n",
+                               cli->cl_import->imp_target_uuid.uuid);
+                }
+                cli->cl_ost_can_grant = 0;
+                return;
+        }
+
+        CDEBUG(D_INODE, "got "LPU64" grant\n", body->oa.o_rdev);
+        down(&cli->cl_dirty_sem);
+        cli->cl_dirty_granted = body->oa.o_rdev;
+        /* XXX check for over-run and wake up the io thread that
+         * doesn't exist yet */
+        up(&cli->cl_dirty_sem);
+}
+
 /* We assume that the reason this OSC got a short read is because it read
  * beyond the end of a stripe file; i.e. lustre is reading a sparse file
  * via the LOV, and it _knows_ it's reading inside the file, it's just that
@@ -710,9 +754,10 @@ static inline int can_merge_pages (struct brw_page *p1, struct brw_page *p2)
 }
 
 #if CHECKSUM_BULK
-static __u64 cksum_pages(int nob, obd_count page_count, struct brw_page *pga)
+static obd_count cksum_pages(int nob, obd_count page_count,
+                             struct brw_page *pga)
 {
-        __u64 cksum = 0;
+        obd_count cksum = 0;
         char *ptr;
         int   i;
 
@@ -741,6 +786,7 @@ static int osc_brw_prep_request(struct obd_import *imp,
 {
         struct ptlrpc_request   *req;
         struct ptlrpc_bulk_desc *desc;
+        struct client_obd       *cli = &imp->imp_obd->u.cli;
         struct ost_body         *body;
         struct obd_ioobj        *ioobj;
         struct niobuf_remote    *niobuf;
@@ -802,8 +848,7 @@ static int osc_brw_prep_request(struct obd_import *imp,
 
                 requested_nob += pg->count;
 
-                if (i > 0 &&
-                    can_merge_pages (pg_prev, pg)) {
+                if (i > 0 && can_merge_pages (pg_prev, pg)) {
                         niobuf--;
                         niobuf->len += pg->count;
                 } else {
@@ -818,8 +863,9 @@ static int osc_brw_prep_request(struct obd_import *imp,
 #if CHECKSUM_BULK
         body->oa.o_valid |= OBD_MD_FLCKSUM;
         if (opc == OST_BRW_WRITE)
-                body->oa.o_rdev = cksum_pages (requested_nob, page_count, pga);
+                body->oa.o_nlink = cksum_pages (requested_nob, page_count, pga);
 #endif
+        osc_announce_cached(cli, body);
         spin_lock_irqsave (&req->rq_lock, flags);
         req->rq_no_resend = 1;
         spin_unlock_irqrestore (&req->rq_lock, flags);
@@ -849,9 +895,18 @@ static int osc_brw_fini_request (struct ptlrpc_request *req,
                                  obd_count page_count, struct brw_page *pga,
                                  int rc)
 {
+        struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
+        struct ost_body *body;
         if (rc < 0)
                 return (rc);
 
+        body = lustre_swab_repbuf(req, 0, sizeof (*body), lustre_swab_ost_body);
+        if (body == NULL) {
+                CERROR ("Can't unpack body\n");
+                RETURN(-EPROTO);
+        }
+        osc_update_grant(cli, body);
+
         if (req->rq_reqmsg->opc == OST_WRITE) {
                 if (rc > 0) {
                         CERROR ("Unexpected +ve rc %d\n", rc);
@@ -868,18 +923,13 @@ static int osc_brw_fini_request (struct ptlrpc_request *req,
         }
 
         if (rc < requested_nob)
-                handle_short_read (rc, page_count, pga);
+                handle_short_read(rc, page_count, pga);
 
 #if CHECKSUM_BULK
-        imp = req->rq_import;
-        body = lustre_swab_repmsg (req, 0, sizeof (*body),
-                                   lustre_swab_ost_body);
-        if (body == NULL) {
-                CERROR ("Can't unpack body\n");
-        } else if (body->oa.o_valid & OBD_MD_FLCKSUM) {
+        if (body->oa.o_valid & OBD_MD_FLCKSUM) {
                 static int cksum_counter;
-                __u64 server_cksum = body->oa.o_rdev;
-                __u64 cksum = cksum_pages (rc, page_count, pga);
+                obd_count server_cksum = body->oa.o_nlink;
+                obd_count cksum = cksum_pages(rc, page_count, pga);
 
                 cksum_counter++;
                 if (server_cksum != cksum) {
@@ -888,7 +938,7 @@ static int osc_brw_fini_request (struct ptlrpc_request *req,
                                imp->imp_connection->c_peer.peer_nid);
                         cksum_counter = 0;
                 } else if ((cksum_counter & (-cksum_counter)) == cksum_counter)
-                        CERROR("Checksum %u from "LPX64" OK: "LPX64"\n",
+                        CERROR("Checksum %u from "LPX64" OK: %x\n",
                                cksum_counter,
                                imp->imp_connection->c_peer.peer_nid, cksum);
         } else {
@@ -1395,6 +1445,80 @@ static int sanosc_brw(int cmd, struct lustre_handle *conn,
 #endif
 #endif
 
+static int osc_mark_page_dirty(struct lustre_handle *conn, 
+                               struct lov_stripe_md *lsm, unsigned long offset)
+{
+        struct client_obd *cli = &class_conn2obd(conn)->u.cli;
+        struct otree *dirty_ot = lsm->lsm_oinfo[0].loi_dirty_ot;
+        int rc;
+        ENTRY;
+
+        down(&cli->cl_dirty_sem);
+
+        if (cli->cl_ost_can_grant && 
+            (cli->cl_dirty + PAGE_CACHE_SIZE >= cli->cl_dirty_granted)) {
+                CDEBUG(D_INODE, "granted "LPU64" < "LPU64"\n",
+                       cli->cl_dirty_granted, cli->cl_dirty + PAGE_CACHE_SIZE);
+                GOTO(out, rc = -EDQUOT);
+        }
+
+        rc = ot_mark_offset(dirty_ot, offset);
+        if (rc)
+                GOTO(out, rc);
+
+        cli->cl_dirty += PAGE_CACHE_SIZE;
+        CDEBUG(D_INODE, "dirtied off %lu, now "LPU64" bytes dirty\n",
+                        offset, cli->cl_dirty);
+out:
+        up(&cli->cl_dirty_sem);
+        RETURN(rc);
+}
+
+static int osc_clear_dirty_pages(struct lustre_handle *conn, 
+                                 struct lov_stripe_md *lsm,
+                                 unsigned long start, unsigned long end,
+                                 unsigned long *cleared)
+{
+        struct client_obd *cli = &class_conn2obd(conn)->u.cli;
+        struct otree *dirty_ot = lsm->lsm_oinfo[0].loi_dirty_ot;
+        unsigned long old_marked, new_marked;
+        int rc;
+        ENTRY;
+
+        down(&cli->cl_dirty_sem);
+
+        old_marked = ot_num_marked(dirty_ot);
+
+        rc = ot_clear_extent(dirty_ot, start, end);
+        if (rc)
+                GOTO(out, rc);
+
+        new_marked = ot_num_marked(dirty_ot);
+
+        LASSERT(new_marked <= old_marked);
+        LASSERT(old_marked * PAGE_CACHE_SIZE <= cli->cl_dirty);
+        *cleared = old_marked - new_marked;
+        cli->cl_dirty -= (__u64)*cleared << PAGE_CACHE_SHIFT;
+        CDEBUG(D_INODE, "cleared [%lu,%lu], now "LPU64" bytes dirty\n",
+                        start, end, cli->cl_dirty);
+
+out:
+        up(&cli->cl_dirty_sem);
+        RETURN(rc);
+}
+
+static int osc_last_dirty_offset(struct lustre_handle *conn,
+                                 struct lov_stripe_md *lsm,
+                                 unsigned long *offset)
+{
+        struct otree *dirty_ot = lsm->lsm_oinfo[0].loi_dirty_ot;
+        int rc;
+        ENTRY;
+
+        rc = ot_last_marked(dirty_ot, offset);
+        RETURN(rc);
+}
+
 static int osc_enqueue(struct lustre_handle *connh, struct lov_stripe_md *lsm,
                        struct lustre_handle *parent_lock,
                        __u32 type, void *extentp, int extent_len, __u32 mode,
@@ -1511,15 +1635,15 @@ static int osc_cancel_unused(struct lustre_handle *connh,
                                       opaque);
 }
 
-static int osc_statfs(struct lustre_handle *conn, struct obd_statfs *osfs)
+static int osc_statfs(struct obd_export *exp, struct obd_statfs *osfs)
 {
         struct obd_statfs *msfs;
         struct ptlrpc_request *request;
         int rc, size = sizeof(*osfs);
         ENTRY;
 
-        request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_STATFS, 0, NULL,
-                                  NULL);
+        request = ptlrpc_prep_req(exp->exp_obd->u.cli.cl_import, OST_STATFS, 0, 
+                                  NULL, NULL);
         if (!request)
                 RETURN(-ENOMEM);
 
@@ -1711,7 +1835,10 @@ struct obd_ops osc_obd_ops = {
         o_cancel:       osc_cancel,
         o_cancel_unused: osc_cancel_unused,
         o_iocontrol:    osc_iocontrol,
-        o_get_info:     osc_get_info
+        o_get_info:     osc_get_info,
+        .o_mark_page_dirty =    osc_mark_page_dirty,
+        .o_clear_dirty_pages =  osc_clear_dirty_pages,
+        .o_last_dirty_offset =  osc_last_dirty_offset,
 };
 
 struct obd_ops sanosc_obd_ops = {
@@ -1741,6 +1868,9 @@ struct obd_ops sanosc_obd_ops = {
         o_cancel:       osc_cancel,
         o_cancel_unused: osc_cancel_unused,
         o_iocontrol:    osc_iocontrol,
+        .o_mark_page_dirty =    osc_mark_page_dirty,
+        .o_clear_dirty_pages =  osc_clear_dirty_pages,
+        .o_last_dirty_offset =  osc_last_dirty_offset,
 };
 
 int __init osc_init(void)