Whamcloud - gitweb
land b_eq on HEAD
[fs/lustre-release.git] / lustre / portals / portals / lib-md.c
index be6949c..a1ed583 100644 (file)
@@ -83,7 +83,7 @@ static int lib_md_build(nal_cb_t *nal, lib_md_t *new, void *private,
         int           rc;
         int           i;
 
-        /* NB we are passes an allocated, but uninitialised/active md.
+        /* NB we are passed an allocated, but uninitialised/active md.
          * if we return success, caller may lib_md_unlink() it.
          * otherwise caller may only lib_md_free() it.
          */
@@ -94,9 +94,10 @@ static int lib_md_build(nal_cb_t *nal, lib_md_t *new, void *private,
                         return PTL_INV_EQ;
         }
 
-        if ((md->options & PTL_MD_IOV) != 0 &&  /* discontiguous MD */
-            md->niov > PTL_MD_MAX_IOV)          /* too many fragments */
-                return PTL_IOV_TOO_MANY;
+        /* Must check this _before_ allocation.  Also, note that non-iov
+         * MDs must set md_niov to 0. */
+        LASSERT((md->options & (PTL_MD_IOV | PTL_MD_KIOV)) == 0 ||
+                md->niov <= PTL_MD_MAX_IOV);
 
         if ((md->options & max_size_opts) != 0 && /* max size used */
             (md->max_size < 0 || md->max_size > md->length)) // illegal max_size
@@ -239,7 +240,11 @@ int do_PtlMDAttach(nal_cb_t * nal, void *private, void *v_args, void *v_ret)
         lib_md_t *md;
         unsigned long flags;
 
-        md = lib_md_alloc (nal);
+        if ((args->md_in.options & (PTL_MD_KIOV | PTL_MD_IOV)) != 0 &&
+            args->md_in.niov > PTL_MD_MAX_IOV) /* too many fragments */
+                return (ret->rc = PTL_IOV_TOO_MANY);
+
+        md = lib_md_alloc(nal, &args->md_in);
         if (md == NULL)
                 return (ret->rc = PTL_NOSPACE);
 
@@ -287,7 +292,11 @@ int do_PtlMDBind(nal_cb_t * nal, void *private, void *v_args, void *v_ret)
         lib_md_t *md;
         unsigned long flags;
 
-        md = lib_md_alloc (nal);
+        if ((args->md_in.options & (PTL_MD_KIOV | PTL_MD_IOV)) != 0 &&
+            args->md_in.niov > PTL_MD_MAX_IOV) /* too many fragments */
+                return (ret->rc = PTL_IOV_TOO_MANY);
+
+        md = lib_md_alloc(nal, &args->md_in);
         if (md == NULL)
                 return (ret->rc = PTL_NOSPACE);
 
@@ -311,34 +320,43 @@ int do_PtlMDBind(nal_cb_t * nal, void *private, void *v_args, void *v_ret)
 
 int do_PtlMDUnlink(nal_cb_t * nal, void *private, void *v_args, void *v_ret)
 {
-        PtlMDUnlink_in *args = v_args;
+        PtlMDUnlink_in  *args = v_args;
         PtlMDUnlink_out *ret = v_ret;
-
-        lib_md_t *md;
-        unsigned long flags;
+        ptl_event_t      ev;
+        lib_md_t        *md;
+        unsigned long    flags;
 
         state_lock(nal, &flags);
 
         md = ptl_handle2md(&args->md_in, nal);
         if (md == NULL) {
-                ret->rc = PTL_INV_MD;
-        } else if (md->pending != 0) {           /* being filled/spilled */
-                ret->rc = PTL_MD_INUSE;
-        } else {
-                /* Callers attempting to unlink a busy MD which will get
-                 * unlinked once the net op completes should see INUSE,
-                 * before completion and INV_MD thereafter.  LASSERT we've
-                 * got that right... */
-                LASSERT ((md->md_flags & PTL_MD_FLAG_UNLINK) == 0);
-
-                lib_md_deconstruct(nal, md, &ret->status_out);
-                lib_md_unlink(nal, md);
-                ret->rc = PTL_OK;
+                state_unlock(nal, &flags);
+                return (ret->rc = PTL_INV_MD);
+        }
+
+        /* If the MD is busy, lib_md_unlink just marks it for deletion, and
+         * when the NAL is done, the completion event flags that the MD was
+         * unlinked.  Otherwise, we enqueue an event now... */
+
+        if (md->eq != NULL &&
+            md->pending == 0) {
+                memset(&ev, 0, sizeof(ev));
+
+                ev.type = PTL_EVENT_UNLINK;
+                ev.status = PTL_OK;
+                ev.unlinked = 1;
+                lib_md_deconstruct(nal, md, &ev.mem_desc);
+                
+                lib_enq_event_locked(nal, private, md->eq, &ev);
         }
 
+        lib_md_deconstruct(nal, md, &ret->status_out);
+        lib_md_unlink(nal, md);
+        ret->rc = PTL_OK;
+
         state_unlock(nal, &flags);
 
-        return (ret->rc);
+        return (PTL_OK);
 }
 
 int do_PtlMDUpdate_internal(nal_cb_t * nal, void *private, void *v_args,
@@ -379,6 +397,23 @@ int do_PtlMDUpdate_internal(nal_cb_t * nal, void *private, void *v_args,
                 goto out;
         }
 
+        /* XXX fttb, the new MD must be the same type wrt fragmentation */
+        if (((new->options ^ md->options) & 
+             (PTL_MD_IOV | PTL_MD_KIOV)) != 0) {
+                ret->rc = PTL_INV_MD;
+                goto out;
+        }
+
+        if (new->niov > md->md_niov) {
+                ret->rc = PTL_IOV_TOO_MANY;
+                goto out;
+        } 
+
+        if (new->niov < md->md_niov) {
+                ret->rc = PTL_IOV_TOO_SMALL;
+                goto out;
+        }
+
         if (!PtlHandleEqual (args->testq_in, PTL_EQ_NONE)) {
                 test_eq = ptl_handle2eq(&args->testq_in, nal);
                 if (test_eq == NULL) {