From: braam Date: Wed, 20 Mar 2002 23:47:14 +0000 (+0000) Subject: - updated documentation (introduction) X-Git-Tag: v1_7_100~5892 X-Git-Url: https://git.whamcloud.com/?a=commitdiff_plain;h=294e6a9a2d3e9646e076fde08ea4aea21ef6a8aa;p=fs%2Flustre-release.git - updated documentation (introduction) - race fix in md_active - new lock stuff --- diff --git a/lustre/include/linux/lustre_dlm.h b/lustre/include/linux/lustre_dlm.h index da191eb..c2e680d 100644 --- a/lustre/include/linux/lustre_dlm.h +++ b/lustre/include/linux/lustre_dlm.h @@ -157,7 +157,8 @@ extern struct obd_ops ldlm_obd_ops; ldlm_error_t ldlm_local_lock_enqueue(struct obd_device *obbdev, __u32 ns_id, struct ldlm_resource *parent_res, struct ldlm_lock *parent_lock, - __u32 *res_id, ldlm_mode_t mode); + __u32 *res_id, ldlm_mode_t mode, + struct ldlm_handle *); void ldlm_lock_dump(struct ldlm_lock *lock); /* ldlm_test.c */ @@ -170,7 +171,7 @@ void ldlm_resource_dump(struct ldlm_resource *res); struct ldlm_resource *ldlm_resource_get(struct ldlm_namespace *ns, struct ldlm_resource *parent, __u32 *name, int create); -void ldlm_resource_put(struct ldlm_resource *res); +int ldlm_resource_put(struct ldlm_resource *res); #endif /* __KERNEL__ */ diff --git a/lustre/include/linux/lustre_net.h b/lustre/include/linux/lustre_net.h index f30f0b8..af1bf71 100644 --- a/lustre/include/linux/lustre_net.h +++ b/lustre/include/linux/lustre_net.h @@ -53,9 +53,6 @@ /* default rpc ring length */ #define RPC_RING_LENGTH 2 -/* generic wrappable next */ -#define NEXT_INDEX(index, max) (((index+1) >= max) ? 0 : (index+1)) - #define SVC_STOPPING 1 #define SVC_RUNNING 2 #define SVC_STOPPED 4 @@ -159,7 +156,6 @@ struct ptlrpc_service { /* FIXME: perhaps a list of EQs, if multiple NIs are used? */ char *srv_buf[RPC_RING_LENGTH]; __u32 srv_buf_size; - __u32 srv_me_active; __u32 srv_me_tail; __u32 srv_md_active; __u32 srv_ring_length; diff --git a/lustre/ldlm/ldlm_lock.c b/lustre/ldlm/ldlm_lock.c index 285e16b..f1f4720 100644 --- a/lustre/ldlm/ldlm_lock.c +++ b/lustre/ldlm/ldlm_lock.c @@ -7,6 +7,8 @@ * See the file COPYING in this distribution * * by Cluster File Systems, Inc. + * authors, Peter Braam & + * Phil Schwan */ #define EXPORT_SYMTAB @@ -55,8 +57,8 @@ static int ldlm_notify_incompatible(struct list_head *list, int rc = 0; list_for_each(tmp, list) { - struct ldlm_lock *lock = list_entry(tmp, struct ldlm_lock, - l_res_link); + struct ldlm_lock *lock; + lock = list_entry(tmp, struct ldlm_lock, l_res_link); if (lockmode_compat(lock->l_req_mode, new->l_req_mode)) continue; @@ -69,10 +71,59 @@ static int ldlm_notify_incompatible(struct list_head *list, return rc; } + +static int ldlm_reprocess_queue(struct list_head *queue, + struct list_head *granted_list) +{ + struct list_head *tmp1, *tmp2; + struct ldlm_resource *res; + int rc = 0; + + list_for_each(tmp1, queue) { + struct ldlm_lock *pending; + rc = 0; + pending = list_entry(tmp1, struct ldlm_lock, l_res_link); + + /* check if pending can go in ... */ + list_for_each(tmp2, granted_list) { + struct ldlm_lock *lock; + lock = list_entry(tmp2, struct ldlm_lock, l_res_link); + if (lockmode_compat(lock->l_granted_mode, + pending->l_req_mode)) + continue; + else { + /* no, we are done */ + rc = 1; + break; + } + } + + if (rc) { + /* no - we are done */ + break; + } + + res = pending->l_resource; + list_del(&pending->l_res_link); + list_add(&pending->l_res_link, &res->lr_granted); + pending->l_granted_mode = pending->l_req_mode; + + if (pending->l_granted_mode < res->lr_most_restr) + res->lr_most_restr = pending->l_granted_mode; + + /* XXX call completion here */ + + + } + + return rc; +} + ldlm_error_t ldlm_local_lock_enqueue(struct obd_device *obddev, __u32 ns_id, struct ldlm_resource *parent_res, struct ldlm_lock *parent_lock, - __u32 *res_id, ldlm_mode_t mode) + __u32 *res_id, ldlm_mode_t mode, + struct ldlm_handle *lockh) { struct ldlm_namespace *ns; struct ldlm_resource *res; @@ -93,6 +144,7 @@ ldlm_error_t ldlm_local_lock_enqueue(struct obd_device *obddev, __u32 ns_id, if (lock == NULL) BUG(); + lockh->addr = (__u64)(unsigned long)lock; spin_lock(&res->lr_lock); /* FIXME: We may want to optimize by checking lr_most_restr */ @@ -119,6 +171,8 @@ ldlm_error_t ldlm_local_lock_enqueue(struct obd_device *obddev, __u32 ns_id, if (mode < res->lr_most_restr) res->lr_most_restr = mode; + /* XXX call the completion call back function */ + rc = ELDLM_OK; GOTO(out, rc); @@ -127,6 +181,29 @@ ldlm_error_t ldlm_local_lock_enqueue(struct obd_device *obddev, __u32 ns_id, return rc; } +ldlm_error_t ldlm_local_lock_cancel(struct obd_device *obddev, + struct ldlm_handle *lockh) +{ + struct ldlm_lock *lock; + struct ldlm_resource *res = lock->l_resource; + ENTRY; + + lock = (struct ldlm_lock *)(unsigned long)lockh->addr; + list_del(&lock->l_res_link); + + kmem_cache_free(ldlm_lock_slab, lock); + if (ldlm_resource_put(lock->l_resource)) { + EXIT; + return 0; + } + + ldlm_reprocess_queue(&res->lr_converting, &res->lr_granted); + if (list_empty(&res->lr_converting)) + ldlm_reprocess_queue(&res->lr_waiting, &res->lr_granted); + + return 0; +} + void ldlm_lock_dump(struct ldlm_lock *lock) { char ver[128]; diff --git a/lustre/ldlm/ldlm_test.c b/lustre/ldlm/ldlm_test.c index 8b54915..c606170 100644 --- a/lustre/ldlm/ldlm_test.c +++ b/lustre/ldlm/ldlm_test.c @@ -34,6 +34,7 @@ int ldlm_test(struct obd_device *obddev) struct ldlm_resource *res; __u32 res_id[RES_NAME_SIZE] = {1, 2, 3, 4, 5, 6}; ldlm_error_t err; + struct ldlm_handle h; ns = ldlm_namespace_new(obddev, 1); if (ns == NULL) @@ -46,11 +47,13 @@ int ldlm_test(struct obd_device *obddev) res->lr_blocking = ldlm_test_callback; /* Get a couple of read locks */ - err = ldlm_local_lock_enqueue(obddev, 1, NULL, NULL, res_id, LCK_CR); + err = ldlm_local_lock_enqueue(obddev, 1, NULL, NULL, res_id, + LCK_CR, &h); if (err != ELDLM_OK) BUG(); - err = ldlm_local_lock_enqueue(obddev, 1, NULL, NULL, res_id, LCK_CR); + err = ldlm_local_lock_enqueue(obddev, 1, NULL, NULL, res_id, + LCK_CR, &h); if (err != ELDLM_OK) BUG(); diff --git a/lustre/ldlm/resource.c b/lustre/ldlm/resource.c index 94af251..e613c03 100644 --- a/lustre/ldlm/resource.c +++ b/lustre/ldlm/resource.c @@ -193,8 +193,9 @@ struct ldlm_resource *ldlm_resource_get(struct ldlm_namespace *ns, return res; } -void ldlm_resource_put(struct ldlm_resource *res) +int ldlm_resource_put(struct ldlm_resource *res) { + int rc = 0; ldlm_lock(res->lr_namespace->ns_obddev); if (atomic_dec_and_test(&res->lr_refcount)) { @@ -212,9 +213,10 @@ void ldlm_resource_put(struct ldlm_resource *res) list_del(&res->lr_childof); kmem_cache_free(ldlm_resource_slab, res); + rc = 1; } - ldlm_unlock(res->lr_namespace->ns_obddev); + return rc; } int ldlm_get_resource_handle(struct ldlm_resource *res, struct ldlm_handle *h) diff --git a/lustre/obdclass/pack.c b/lustre/obdclass/pack.c deleted file mode 100644 index 07ad20a..0000000 --- a/lustre/obdclass/pack.c +++ /dev/null @@ -1,32 +0,0 @@ -/* - * pack.c - * Copyright (C) 2001 Cluster File Systems, Inc. - * - * This code is issued under the GNU General Public License. - * See the file COPYING in this distribution - * - * - */ - - -#include -#include -#include -#include -#include - -#include -#include -#include - - -void obd_req_pack(char **buf, int max, struct obd_req *req) -{ - char *ptr; - - ptr = *buf; - - LOGP(ptr, struct obdo, obd req->oa); - LOGP(ptr, struct obd_conn, obd req->obd); - -} diff --git a/lustre/ost/ost_handler.c b/lustre/ost/ost_handler.c index 7e9d609..10a595b 100644 --- a/lustre/ost/ost_handler.c +++ b/lustre/ost/ost_handler.c @@ -678,7 +678,7 @@ static int ost_setup(struct obd_device *obddev, obd_count len, RETURN(-EINVAL); } - ost->ost_service = ptlrpc_init_svc( 64 * 1024, + ost->ost_service = ptlrpc_init_svc( 2 * 1024, OST_REQUEST_PORTAL, OSC_REPLY_PORTAL, "self", diff --git a/lustre/ptlrpc/events.c b/lustre/ptlrpc/events.c index 0edfe1a..463266a 100644 --- a/lustre/ptlrpc/events.c +++ b/lustre/ptlrpc/events.c @@ -99,15 +99,19 @@ int server_request_callback(ptl_event_t *ev, void *data) */ spin_lock(&service->srv_lock); - service->srv_ref_count[service->srv_md_active]++; + if ( ev->mem_desc.start != + service->srv_md[service->srv_md_active].start ) { + BUG(); + } + service->srv_ref_count[service->srv_md_active]++; CDEBUG(D_INODE, "event offset %d buf size %d\n", ev->offset, service->srv_buf_size); if (ev->offset >= (service->srv_buf_size - 1024)) { - CDEBUG(D_INODE, "Unlinking ME %d\n", service->srv_me_active); + CDEBUG(D_INODE, "Unlinking ME %d\n", service->srv_md_active); - rc = PtlMEUnlink(service->srv_me_h[service->srv_me_active]); - service->srv_me_h[service->srv_me_active] = 0; + rc = PtlMEUnlink(service->srv_me_h[service->srv_md_active]); + service->srv_me_h[service->srv_md_active] = 0; if (rc != PTL_OK) { CERROR("PtlMEUnlink failed - DROPPING soon: %d\n", rc); @@ -116,12 +120,14 @@ int server_request_callback(ptl_event_t *ev, void *data) return rc; } - service->srv_me_active = NEXT_INDEX(service->srv_me_active, - service->srv_ring_length); + service->srv_md_active = (service->srv_md_active + 1) % + service->srv_ring_length; - if (service->srv_me_h[service->srv_me_active] == 0) + if (service->srv_me_h[service->srv_md_active] == 0) { CERROR("All %d ring ME's are unlinked!\n", service->srv_ring_length); + BUG(); + } } spin_unlock(&service->srv_lock); diff --git a/lustre/ptlrpc/niobuf.c b/lustre/ptlrpc/niobuf.c index 4a32a08..3e31957 100644 --- a/lustre/ptlrpc/niobuf.c +++ b/lustre/ptlrpc/niobuf.c @@ -335,7 +335,7 @@ int ptl_send_rpc(struct ptlrpc_request *request, struct lustre_peer *peer) return rc; } -/* ptl_received_rpc() should be called by the sleeping process once +/* ptl_handled_rpc() should be called by the sleeping process once * it finishes processing an event. This ensures the ref count is * decremented and that the rpc ring buffer cycles properly. */ @@ -344,7 +344,7 @@ int ptl_handled_rpc(struct ptlrpc_service *service, void *start) int rc, index = 0; spin_lock(&service->srv_lock); - /* XXX this is wrong must find index on which request arrived!!!*/ + while (index < service->srv_ring_length) { if ( service->srv_md[index].start == start) break; @@ -360,23 +360,22 @@ int ptl_handled_rpc(struct ptlrpc_service *service, void *start) if (service->srv_ref_count[index] < 0) BUG(); - if ((service->srv_ref_count[index] == 0) && - (service->srv_me_h[index] == 0)) { + if (service->srv_ref_count[index] == 0 && + service->srv_me_h[index] == 0) { /* Replace the unlinked ME and MD */ rc = PtlMEInsert(service->srv_me_h[service->srv_me_tail], service->srv_id, 0, ~0, PTL_RETAIN, PTL_INS_AFTER, &(service->srv_me_h[index])); - CDEBUG(D_NET, "Inserting new ME and MD in ring, rc %d\n", rc); - service->srv_me_tail = index; - service->srv_ref_count[index] = 0; - if (rc != PTL_OK) { CERROR("PtlMEInsert failed: %d\n", rc); BUG(); spin_unlock(&service->srv_lock); return rc; } + CDEBUG(D_NET, "Inserting new ME and MD in ring, rc %d\n", rc); + + service->srv_me_tail = index; service->srv_md[index].start = service->srv_buf[index]; service->srv_md[index].length = service->srv_buf_size; @@ -397,9 +396,6 @@ int ptl_handled_rpc(struct ptlrpc_service *service, void *start) spin_unlock(&service->srv_lock); return rc; } - - service->srv_md_active = - NEXT_INDEX(index, service->srv_ring_length); } spin_unlock(&service->srv_lock); diff --git a/lustre/ptlrpc/service.c b/lustre/ptlrpc/service.c index 652da36..cb01cacd 100644 --- a/lustre/ptlrpc/service.c +++ b/lustre/ptlrpc/service.c @@ -274,7 +274,6 @@ int rpc_register_service(struct ptlrpc_service *service, char *uuid) } service->srv_ring_length = RPC_RING_LENGTH; - service->srv_me_active = 0; service->srv_md_active = 0; service->srv_id.addr_kind = PTL_ADDR_GID;