ldlm_error_t ldlm_local_lock_enqueue(struct obd_device *obbdev, __u32 ns_id,
struct ldlm_resource *parent_res,
struct ldlm_lock *parent_lock,
- __u32 *res_id, ldlm_mode_t mode);
+ __u32 *res_id, ldlm_mode_t mode,
+ struct ldlm_handle *);
void ldlm_lock_dump(struct ldlm_lock *lock);
/* ldlm_test.c */
struct ldlm_resource *ldlm_resource_get(struct ldlm_namespace *ns,
struct ldlm_resource *parent,
__u32 *name, int create);
-void ldlm_resource_put(struct ldlm_resource *res);
+int ldlm_resource_put(struct ldlm_resource *res);
#endif /* __KERNEL__ */
/* default rpc ring length */
#define RPC_RING_LENGTH 2
-/* generic wrappable next */
-#define NEXT_INDEX(index, max) (((index+1) >= max) ? 0 : (index+1))
-
#define SVC_STOPPING 1
#define SVC_RUNNING 2
#define SVC_STOPPED 4
/* FIXME: perhaps a list of EQs, if multiple NIs are used? */
char *srv_buf[RPC_RING_LENGTH];
__u32 srv_buf_size;
- __u32 srv_me_active;
__u32 srv_me_tail;
__u32 srv_md_active;
__u32 srv_ring_length;
* See the file COPYING in this distribution
*
* by Cluster File Systems, Inc.
+ * Authors: Peter Braam <braam@clusterfs.com> and
+ *          Phil Schwan <phil@clusterfs.com>
*/
#define EXPORT_SYMTAB
int rc = 0;
list_for_each(tmp, list) {
- struct ldlm_lock *lock = list_entry(tmp, struct ldlm_lock,
- l_res_link);
+ struct ldlm_lock *lock;
+ lock = list_entry(tmp, struct ldlm_lock, l_res_link);
if (lockmode_compat(lock->l_req_mode, new->l_req_mode))
continue;
return rc;
}
+
+static int ldlm_reprocess_queue(struct list_head *queue,
+ struct list_head *granted_list)
+{
+ struct list_head *tmp1, *tmp2;
+ struct ldlm_resource *res;
+ int rc = 0;
+
+ list_for_each(tmp1, queue) {
+ struct ldlm_lock *pending;
+ rc = 0;
+ pending = list_entry(tmp1, struct ldlm_lock, l_res_link);
+
+ /* check if pending can go in ... */
+ list_for_each(tmp2, granted_list) {
+ struct ldlm_lock *lock;
+ lock = list_entry(tmp2, struct ldlm_lock, l_res_link);
+ if (lockmode_compat(lock->l_granted_mode,
+ pending->l_req_mode))
+ continue;
+ else {
+ /* no, we are done */
+ rc = 1;
+ break;
+ }
+ }
+
+ if (rc) {
+ /* no - we are done */
+ break;
+ }
+
+ res = pending->l_resource;
+ list_del(&pending->l_res_link);
+ list_add(&pending->l_res_link, &res->lr_granted);
+ pending->l_granted_mode = pending->l_req_mode;
+
+ if (pending->l_granted_mode < res->lr_most_restr)
+ res->lr_most_restr = pending->l_granted_mode;
+
+ /* XXX call completion here */
+
+
+ }
+
+ return rc;
+}
+
ldlm_error_t ldlm_local_lock_enqueue(struct obd_device *obddev, __u32 ns_id,
struct ldlm_resource *parent_res,
struct ldlm_lock *parent_lock,
- __u32 *res_id, ldlm_mode_t mode)
+ __u32 *res_id, ldlm_mode_t mode,
+ struct ldlm_handle *lockh)
{
struct ldlm_namespace *ns;
struct ldlm_resource *res;
if (lock == NULL)
BUG();
+ lockh->addr = (__u64)(unsigned long)lock;
spin_lock(&res->lr_lock);
/* FIXME: We may want to optimize by checking lr_most_restr */
if (mode < res->lr_most_restr)
res->lr_most_restr = mode;
+ /* XXX call the completion call back function */
+
rc = ELDLM_OK;
GOTO(out, rc);
return rc;
}
+ldlm_error_t ldlm_local_lock_cancel(struct obd_device *obddev,
+ struct ldlm_handle *lockh)
+{
+ struct ldlm_lock *lock;
+ struct ldlm_resource *res = lock->l_resource;
+ ENTRY;
+
+ lock = (struct ldlm_lock *)(unsigned long)lockh->addr;
+ list_del(&lock->l_res_link);
+
+ kmem_cache_free(ldlm_lock_slab, lock);
+ if (ldlm_resource_put(lock->l_resource)) {
+ EXIT;
+ return 0;
+ }
+
+ ldlm_reprocess_queue(&res->lr_converting, &res->lr_granted);
+ if (list_empty(&res->lr_converting))
+ ldlm_reprocess_queue(&res->lr_waiting, &res->lr_granted);
+
+ return 0;
+}
+
void ldlm_lock_dump(struct ldlm_lock *lock)
{
char ver[128];
struct ldlm_resource *res;
__u32 res_id[RES_NAME_SIZE] = {1, 2, 3, 4, 5, 6};
ldlm_error_t err;
+ struct ldlm_handle h;
ns = ldlm_namespace_new(obddev, 1);
if (ns == NULL)
res->lr_blocking = ldlm_test_callback;
/* Get a couple of read locks */
- err = ldlm_local_lock_enqueue(obddev, 1, NULL, NULL, res_id, LCK_CR);
+ err = ldlm_local_lock_enqueue(obddev, 1, NULL, NULL, res_id,
+ LCK_CR, &h);
if (err != ELDLM_OK)
BUG();
- err = ldlm_local_lock_enqueue(obddev, 1, NULL, NULL, res_id, LCK_CR);
+ err = ldlm_local_lock_enqueue(obddev, 1, NULL, NULL, res_id,
+ LCK_CR, &h);
if (err != ELDLM_OK)
BUG();
return res;
}
-void ldlm_resource_put(struct ldlm_resource *res)
+int ldlm_resource_put(struct ldlm_resource *res)
{
+ int rc = 0;
ldlm_lock(res->lr_namespace->ns_obddev);
if (atomic_dec_and_test(&res->lr_refcount)) {
list_del(&res->lr_childof);
kmem_cache_free(ldlm_resource_slab, res);
+ rc = 1;
}
-
ldlm_unlock(res->lr_namespace->ns_obddev);
+ return rc;
}
int ldlm_get_resource_handle(struct ldlm_resource *res, struct ldlm_handle *h)
+++ /dev/null
-/*
- * pack.c
- * Copyright (C) 2001 Cluster File Systems, Inc.
- *
- * This code is issued under the GNU General Public License.
- * See the file COPYING in this distribution
- *
- *
- */
-
-
-#include <linux/mm.h>
-#include <linux/pagemap.h>
-#include <linux/fs.h>
-#include <linux/sched.h>
-#include <asm/uaccess.h>
-
-#include <linux/obd_support.h>
-#include <linux/obd_class.h>
-#include <linux/obd_lib.h>
-
-
-void obd_req_pack(char **buf, int max, struct obd_req *req)
-{
- char *ptr;
-
- ptr = *buf;
-
- LOGP(ptr, struct obdo, obd req->oa);
- LOGP(ptr, struct obd_conn, obd req->obd);
-
-}
RETURN(-EINVAL);
}
- ost->ost_service = ptlrpc_init_svc( 64 * 1024,
+ ost->ost_service = ptlrpc_init_svc( 2 * 1024,
OST_REQUEST_PORTAL,
OSC_REPLY_PORTAL,
"self",
*/
spin_lock(&service->srv_lock);
- service->srv_ref_count[service->srv_md_active]++;
+ if ( ev->mem_desc.start !=
+ service->srv_md[service->srv_md_active].start ) {
+ BUG();
+ }
+ service->srv_ref_count[service->srv_md_active]++;
CDEBUG(D_INODE, "event offset %d buf size %d\n",
ev->offset, service->srv_buf_size);
if (ev->offset >= (service->srv_buf_size - 1024)) {
- CDEBUG(D_INODE, "Unlinking ME %d\n", service->srv_me_active);
+ CDEBUG(D_INODE, "Unlinking ME %d\n", service->srv_md_active);
- rc = PtlMEUnlink(service->srv_me_h[service->srv_me_active]);
- service->srv_me_h[service->srv_me_active] = 0;
+ rc = PtlMEUnlink(service->srv_me_h[service->srv_md_active]);
+ service->srv_me_h[service->srv_md_active] = 0;
if (rc != PTL_OK) {
CERROR("PtlMEUnlink failed - DROPPING soon: %d\n", rc);
return rc;
}
- service->srv_me_active = NEXT_INDEX(service->srv_me_active,
- service->srv_ring_length);
+ service->srv_md_active = (service->srv_md_active + 1) %
+ service->srv_ring_length;
- if (service->srv_me_h[service->srv_me_active] == 0)
+ if (service->srv_me_h[service->srv_md_active] == 0) {
CERROR("All %d ring ME's are unlinked!\n",
service->srv_ring_length);
+ BUG();
+ }
}
spin_unlock(&service->srv_lock);
return rc;
}
-/* ptl_received_rpc() should be called by the sleeping process once
+/* ptl_handled_rpc() should be called by the sleeping process once
* it finishes processing an event. This ensures the ref count is
* decremented and that the rpc ring buffer cycles properly.
*/
int rc, index = 0;
spin_lock(&service->srv_lock);
- /* XXX this is wrong must find index on which request arrived!!!*/
+
while (index < service->srv_ring_length) {
if ( service->srv_md[index].start == start)
break;
if (service->srv_ref_count[index] < 0)
BUG();
- if ((service->srv_ref_count[index] == 0) &&
- (service->srv_me_h[index] == 0)) {
+ if (service->srv_ref_count[index] == 0 &&
+ service->srv_me_h[index] == 0) {
/* Replace the unlinked ME and MD */
rc = PtlMEInsert(service->srv_me_h[service->srv_me_tail],
service->srv_id, 0, ~0, PTL_RETAIN,
PTL_INS_AFTER, &(service->srv_me_h[index]));
- CDEBUG(D_NET, "Inserting new ME and MD in ring, rc %d\n", rc);
- service->srv_me_tail = index;
- service->srv_ref_count[index] = 0;
-
if (rc != PTL_OK) {
CERROR("PtlMEInsert failed: %d\n", rc);
BUG();
spin_unlock(&service->srv_lock);
return rc;
}
+ CDEBUG(D_NET, "Inserting new ME and MD in ring, rc %d\n", rc);
+
+ service->srv_me_tail = index;
service->srv_md[index].start = service->srv_buf[index];
service->srv_md[index].length = service->srv_buf_size;
spin_unlock(&service->srv_lock);
return rc;
}
-
- service->srv_md_active =
- NEXT_INDEX(index, service->srv_ring_length);
}
spin_unlock(&service->srv_lock);
}
service->srv_ring_length = RPC_RING_LENGTH;
- service->srv_me_active = 0;
service->srv_md_active = 0;
service->srv_id.addr_kind = PTL_ADDR_GID;