Whamcloud - gitweb
- updated documentation (introduction)
authorbraam <braam>
Wed, 20 Mar 2002 23:47:14 +0000 (23:47 +0000)
committerbraam <braam>
Wed, 20 Mar 2002 23:47:14 +0000 (23:47 +0000)
- race fix in md_active
- new lock stuff

lustre/include/linux/lustre_dlm.h
lustre/include/linux/lustre_net.h
lustre/ldlm/ldlm_lock.c
lustre/ldlm/ldlm_test.c
lustre/ldlm/resource.c
lustre/obdclass/pack.c [deleted file]
lustre/ost/ost_handler.c
lustre/ptlrpc/events.c
lustre/ptlrpc/niobuf.c
lustre/ptlrpc/service.c

index da191eb..c2e680d 100644 (file)
@@ -157,7 +157,8 @@ extern struct obd_ops ldlm_obd_ops;
 ldlm_error_t ldlm_local_lock_enqueue(struct obd_device *obbdev, __u32 ns_id,
                                      struct ldlm_resource *parent_res,
                                      struct ldlm_lock *parent_lock,
-                                     __u32 *res_id, ldlm_mode_t mode);
+                                     __u32 *res_id, ldlm_mode_t mode, 
+                                     struct ldlm_handle *);
 void ldlm_lock_dump(struct ldlm_lock *lock);
 
 /* ldlm_test.c */
@@ -170,7 +171,7 @@ void ldlm_resource_dump(struct ldlm_resource *res);
 struct ldlm_resource *ldlm_resource_get(struct ldlm_namespace *ns,
                                         struct ldlm_resource *parent,
                                         __u32 *name, int create);
-void ldlm_resource_put(struct ldlm_resource *res);
+int ldlm_resource_put(struct ldlm_resource *res);
 
 #endif /* __KERNEL__ */
 
index f30f0b8..af1bf71 100644 (file)
@@ -53,9 +53,6 @@
 /* default rpc ring length */
 #define RPC_RING_LENGTH    2
 
-/* generic wrappable next */
-#define NEXT_INDEX(index, max) (((index+1) >= max) ? 0 : (index+1))
-
 #define SVC_STOPPING 1
 #define SVC_RUNNING 2
 #define SVC_STOPPED 4
@@ -159,7 +156,6 @@ struct ptlrpc_service {
         /* FIXME: perhaps a list of EQs, if multiple NIs are used? */
         char *srv_buf[RPC_RING_LENGTH];
         __u32 srv_buf_size;
-        __u32 srv_me_active;
        __u32 srv_me_tail;
        __u32 srv_md_active;
         __u32 srv_ring_length;
index 285e16b..f1f4720 100644 (file)
@@ -7,6 +7,8 @@
  * See the file COPYING in this distribution
  *
  * by Cluster File Systems, Inc.
+ * authors, Peter Braam <braam@clusterfs.com> & 
+ * Phil Schwan <phil@clusterfs.com>
  */
 
 #define EXPORT_SYMTAB
@@ -55,8 +57,8 @@ static int ldlm_notify_incompatible(struct list_head *list,
         int rc = 0;
 
         list_for_each(tmp, list) {
-                struct ldlm_lock *lock = list_entry(tmp, struct ldlm_lock,
-                                                    l_res_link);
+                struct ldlm_lock *lock;
+                lock = list_entry(tmp, struct ldlm_lock, l_res_link);
                 if (lockmode_compat(lock->l_req_mode, new->l_req_mode))
                         continue;
 
@@ -69,10 +71,59 @@ static int ldlm_notify_incompatible(struct list_head *list,
         return rc;
 }
 
+
+static int ldlm_reprocess_queue(struct list_head *queue, 
+                                struct list_head *granted_list)
+{
+        struct list_head *tmp1, *tmp2;
+        struct ldlm_resource *res;
+        int rc = 0;
+
+        list_for_each(tmp1, queue) { 
+                struct ldlm_lock *pending;
+                rc = 0; 
+                pending = list_entry(tmp1, struct ldlm_lock, l_res_link);
+
+                /* check if pending can go in ... */ 
+                list_for_each(tmp2, granted_list) {
+                        struct ldlm_lock *lock;
+                        lock = list_entry(tmp2, struct ldlm_lock, l_res_link);
+                        if (lockmode_compat(lock->l_granted_mode, 
+                                            pending->l_req_mode))
+                                continue;
+                        else { 
+                                /* no, we are done */
+                                rc = 1;
+                                break;
+                        }
+                }
+
+                if (rc) { 
+                        /* no - we are done */
+                        break;
+                }
+
+                res = pending->l_resource;
+                list_del(&pending->l_res_link); 
+                list_add(&pending->l_res_link, &res->lr_granted);
+                pending->l_granted_mode = pending->l_req_mode;
+
+                if (pending->l_granted_mode < res->lr_most_restr)
+                        res->lr_most_restr = pending->l_granted_mode;
+
+                /* XXX call completion here */ 
+                
+
+        }
+
+        return rc;
+}
+
 ldlm_error_t ldlm_local_lock_enqueue(struct obd_device *obddev, __u32 ns_id,
                                      struct ldlm_resource *parent_res,
                                      struct ldlm_lock *parent_lock,
-                                     __u32 *res_id, ldlm_mode_t mode)
+                                     __u32 *res_id, ldlm_mode_t mode, 
+                                     struct ldlm_handle *lockh)
 {
         struct ldlm_namespace *ns;
         struct ldlm_resource *res;
@@ -93,6 +144,7 @@ ldlm_error_t ldlm_local_lock_enqueue(struct obd_device *obddev, __u32 ns_id,
         if (lock == NULL)
                 BUG();
 
+        lockh->addr = (__u64)(unsigned long)lock;
         spin_lock(&res->lr_lock);
 
         /* FIXME: We may want to optimize by checking lr_most_restr */
@@ -119,6 +171,8 @@ ldlm_error_t ldlm_local_lock_enqueue(struct obd_device *obddev, __u32 ns_id,
         if (mode < res->lr_most_restr)
                 res->lr_most_restr = mode;
 
+        /* XXX call the completion call back function */ 
+
         rc = ELDLM_OK;
         GOTO(out, rc);
 
@@ -127,6 +181,29 @@ ldlm_error_t ldlm_local_lock_enqueue(struct obd_device *obddev, __u32 ns_id,
         return rc;
 }
 
+ldlm_error_t ldlm_local_lock_cancel(struct obd_device *obddev, 
+                                     struct ldlm_handle *lockh)
+{
+        struct ldlm_lock *lock;
+        struct ldlm_resource *res = lock->l_resource;
+        ENTRY;
+
+        lock = (struct ldlm_lock *)(unsigned long)lockh->addr;
+        list_del(&lock->l_res_link); 
+
+        kmem_cache_free(ldlm_lock_slab, lock); 
+        if (ldlm_resource_put(lock->l_resource)) {
+                EXIT;
+                return 0;
+        }
+        
+        ldlm_reprocess_queue(&res->lr_converting, &res->lr_granted); 
+        if (list_empty(&res->lr_converting))
+                ldlm_reprocess_queue(&res->lr_waiting, &res->lr_granted); 
+        
+        return 0;
+}
+
 void ldlm_lock_dump(struct ldlm_lock *lock)
 {
         char ver[128];
index 8b54915..c606170 100644 (file)
@@ -34,6 +34,7 @@ int ldlm_test(struct obd_device *obddev)
         struct ldlm_resource *res;
         __u32 res_id[RES_NAME_SIZE] = {1, 2, 3, 4, 5, 6};
         ldlm_error_t err;
+        struct ldlm_handle h;
 
         ns = ldlm_namespace_new(obddev, 1);
         if (ns == NULL)
@@ -46,11 +47,13 @@ int ldlm_test(struct obd_device *obddev)
         res->lr_blocking = ldlm_test_callback;
 
         /* Get a couple of read locks */
-        err = ldlm_local_lock_enqueue(obddev, 1, NULL, NULL, res_id, LCK_CR);
+        err = ldlm_local_lock_enqueue(obddev, 1, NULL, NULL, res_id, 
+                                      LCK_CR, &h);
         if (err != ELDLM_OK)
                 BUG();
 
-        err = ldlm_local_lock_enqueue(obddev, 1, NULL, NULL, res_id, LCK_CR);
+        err = ldlm_local_lock_enqueue(obddev, 1, NULL, NULL, res_id, 
+                                      LCK_CR, &h);
         if (err != ELDLM_OK)
                 BUG();
 
index 94af251..e613c03 100644 (file)
@@ -193,8 +193,9 @@ struct ldlm_resource *ldlm_resource_get(struct ldlm_namespace *ns,
        return res;
 }
 
-void ldlm_resource_put(struct ldlm_resource *res)
+int ldlm_resource_put(struct ldlm_resource *res)
 {
+        int rc = 0; 
         ldlm_lock(res->lr_namespace->ns_obddev);
 
         if (atomic_dec_and_test(&res->lr_refcount)) {
@@ -212,9 +213,10 @@ void ldlm_resource_put(struct ldlm_resource *res)
                 list_del(&res->lr_childof);
 
                 kmem_cache_free(ldlm_resource_slab, res);
+                rc = 1;
         }
-
         ldlm_unlock(res->lr_namespace->ns_obddev);
+        return rc; 
 }
 
 int ldlm_get_resource_handle(struct ldlm_resource *res, struct ldlm_handle *h)
diff --git a/lustre/obdclass/pack.c b/lustre/obdclass/pack.c
deleted file mode 100644 (file)
index 07ad20a..0000000
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- *  pack.c
- *  Copyright (C) 2001  Cluster File Systems, Inc.
- *
- *  This code is issued under the GNU General Public License.
- *  See the file COPYING in this distribution
- *
- *  
- */
-
-
-#include <linux/mm.h>
-#include <linux/pagemap.h>
-#include <linux/fs.h>
-#include <linux/sched.h>
-#include <asm/uaccess.h>
-
-#include <linux/obd_support.h>
-#include <linux/obd_class.h>
-#include <linux/obd_lib.h>
-
-
-void obd_req_pack(char **buf, int max, struct obd_req *req)
-{
-       char *ptr;
-
-       ptr = *buf;
-
-       LOGP(ptr, struct obdo, obd req->oa);
-       LOGP(ptr, struct obd_conn, obd req->obd);
-
-}
index 7e9d609..10a595b 100644 (file)
@@ -678,7 +678,7 @@ static int ost_setup(struct obd_device *obddev, obd_count len,
                 RETURN(-EINVAL);
         }
 
-        ost->ost_service = ptlrpc_init_svc( 64 * 1024,
+        ost->ost_service = ptlrpc_init_svc( 2 * 1024, 
                                             OST_REQUEST_PORTAL,
                                             OSC_REPLY_PORTAL,
                                             "self",
index 0edfe1a..463266a 100644 (file)
@@ -99,15 +99,19 @@ int server_request_callback(ptl_event_t *ev, void *data)
          */
 
         spin_lock(&service->srv_lock); 
-        service->srv_ref_count[service->srv_md_active]++;
+        if ( ev->mem_desc.start != 
+             service->srv_md[service->srv_md_active].start ) {
+                BUG();
+        }
 
+        service->srv_ref_count[service->srv_md_active]++;
         CDEBUG(D_INODE, "event offset %d buf size %d\n", 
                ev->offset, service->srv_buf_size);
         if (ev->offset >= (service->srv_buf_size - 1024)) {
-                CDEBUG(D_INODE, "Unlinking ME %d\n", service->srv_me_active);
+                CDEBUG(D_INODE, "Unlinking ME %d\n", service->srv_md_active);
 
-                rc = PtlMEUnlink(service->srv_me_h[service->srv_me_active]);
-                service->srv_me_h[service->srv_me_active] = 0;
+                rc = PtlMEUnlink(service->srv_me_h[service->srv_md_active]);
+                service->srv_me_h[service->srv_md_active] = 0;
 
                 if (rc != PTL_OK) {
                         CERROR("PtlMEUnlink failed - DROPPING soon: %d\n", rc);
@@ -116,12 +120,14 @@ int server_request_callback(ptl_event_t *ev, void *data)
                         return rc;
                 }
 
-                service->srv_me_active = NEXT_INDEX(service->srv_me_active,
-                                                    service->srv_ring_length);
+                service->srv_md_active = (service->srv_md_active + 1) % 
+                        service->srv_ring_length;
 
-                if (service->srv_me_h[service->srv_me_active] == 0)
+                if (service->srv_me_h[service->srv_md_active] == 0) { 
                         CERROR("All %d ring ME's are unlinked!\n",
                                service->srv_ring_length);
+                        BUG();
+                }
         }
 
         spin_unlock(&service->srv_lock); 
index 4a32a08..3e31957 100644 (file)
@@ -335,7 +335,7 @@ int ptl_send_rpc(struct ptlrpc_request *request, struct lustre_peer *peer)
         return rc;
 }
 
-/* ptl_received_rpc() should be called by the sleeping process once
+/* ptl_handled_rpc() should be called by the sleeping process once
  * it finishes processing an event.  This ensures the ref count is
  * decremented and that the rpc ring buffer cycles properly.
  */ 
@@ -344,7 +344,7 @@ int ptl_handled_rpc(struct ptlrpc_service *service, void *start)
         int rc, index = 0;
 
         spin_lock(&service->srv_lock);
-        /* XXX this is wrong must find index on which request arrived!!!*/ 
+
         while (index < service->srv_ring_length) {
                 if ( service->srv_md[index].start == start) 
                         break;
@@ -360,23 +360,22 @@ int ptl_handled_rpc(struct ptlrpc_service *service, void *start)
         if (service->srv_ref_count[index] < 0)
                 BUG();
         
-        if ((service->srv_ref_count[index] == 0) &&
-            (service->srv_me_h[index] == 0)) {
+        if (service->srv_ref_count[index] == 0 &&
+            service->srv_me_h[index] == 0) {
 
                 /* Replace the unlinked ME and MD */
                 rc = PtlMEInsert(service->srv_me_h[service->srv_me_tail],
                                  service->srv_id, 0, ~0, PTL_RETAIN,
                                  PTL_INS_AFTER, &(service->srv_me_h[index]));
-                CDEBUG(D_NET, "Inserting new ME and MD in ring, rc %d\n", rc);
-                service->srv_me_tail = index;
-                service->srv_ref_count[index] = 0;
-                
                 if (rc != PTL_OK) {
                         CERROR("PtlMEInsert failed: %d\n", rc);
                         BUG();
                         spin_unlock(&service->srv_lock);
                         return rc;
                 }
+                CDEBUG(D_NET, "Inserting new ME and MD in ring, rc %d\n", rc);
+
+                service->srv_me_tail = index;
 
                 service->srv_md[index].start        = service->srv_buf[index];
                 service->srv_md[index].length       = service->srv_buf_size;
@@ -397,9 +396,6 @@ int ptl_handled_rpc(struct ptlrpc_service *service, void *start)
                         spin_unlock(&service->srv_lock);
                         return rc;
                 }
-
-                service->srv_md_active =
-                        NEXT_INDEX(index, service->srv_ring_length);
         } 
         
         spin_unlock(&service->srv_lock);
index 652da36..cb01cac 100644 (file)
@@ -274,7 +274,6 @@ int rpc_register_service(struct ptlrpc_service *service, char *uuid)
         }
 
         service->srv_ring_length = RPC_RING_LENGTH;
-        service->srv_me_active = 0;
         service->srv_md_active = 0;
 
         service->srv_id.addr_kind = PTL_ADDR_GID;