*/
struct ldlm_namespace {
- struct ptlrpc_client ns_client; /* used for revocation callbacks */
- __u32 ns_local; /* is this a local lock tree? */
+ char *ns_name;
+ struct ptlrpc_client ns_rpc_client; /* used for revocation callbacks */
+ __u32 ns_client; /* is this a client-side lock tree? */
struct list_head *ns_hash; /* hash table for ns */
__u32 ns_refcount; /* count of resources in the hash */
- struct list_head ns_root_list; /* all root resources in ns */
+ struct list_head ns_root_list; /* all root resources in ns */
spinlock_t ns_lock; /* protects hash, refcount, list */
};
struct ldlm_lock;
typedef int (*ldlm_lock_callback)(struct ldlm_lock *lock, struct ldlm_lock *new,
- void *data, __u32 data_len);
+ void *data, __u32 data_len,
+ struct ptlrpc_request **req);
struct ldlm_lock {
struct ldlm_resource *l_resource;
__u32 lr_version[RES_VERSION_SIZE];
__u32 lr_refcount;
spinlock_t lr_lock; /* protects lists, refcount */
+ void *lr_tmp;
};
static inline struct ldlm_extent *ldlm_res2extent(struct ldlm_resource *res)
extern struct obd_ops ldlm_obd_ops;
+#define LDLM_DEBUG(lock, format, a...) \
+do { \
+ CDEBUG(D_DLMTRACE, "### " format \
+ " (%s: lock %p mode %d/%d on res %Lu (rc %d) " \
+ " type %d remote %Lx)\n", ## a, \
+ lock->l_resource->lr_namespace->ns_name, lock, \
+ lock->l_granted_mode, lock->l_req_mode, \
+ lock->l_resource->lr_name[0], \
+ lock->l_resource->lr_refcount, \
+ lock->l_resource->lr_type, \
+ lock->l_remote_handle.addr); \
+} while (0)
+
+#define LDLM_DEBUG_NOLOCK(format, a...) \
+ CDEBUG(D_DLMTRACE, "### " format "\n", ## a);
+
+
/* ldlm_extent.c */
int ldlm_extent_compat(struct ldlm_lock *, struct ldlm_lock *);
int ldlm_extent_policy(struct ldlm_lock *, void *, ldlm_mode_t, void *);
int ldlm_test(struct obd_device *device, struct ptlrpc_connection *conn);
/* resource.c */
-struct ldlm_namespace *ldlm_namespace_new(__u32 local);
+struct ldlm_namespace *ldlm_namespace_new(char *name, __u32 local);
int ldlm_namespace_free(struct ldlm_namespace *ns);
-/* resourc.c - internal */
+/* resource.c - internal */
struct ldlm_resource *ldlm_resource_get(struct ldlm_namespace *ns,
struct ldlm_resource *parent,
__u64 *name, __u32 type, int create);
__u32 data_len,
struct lustre_handle *lockh);
int ldlm_cli_callback(struct ldlm_lock *lock, struct ldlm_lock *new,
- void *data, __u32 data_len);
+ void *data, __u32 data_len, struct ptlrpc_request **reqp);
int ldlm_cli_convert(struct ptlrpc_client *, struct lustre_handle *,
int new_mode, int *flags);
int ldlm_cli_cancel(struct ptlrpc_client *, struct ldlm_lock *);
-
#endif /* __KERNEL__ */
/* ioctls for trying requests */
/* mds/handler.c */
struct dentry *mds_fid2dentry(struct mds_obd *mds, struct ll_fid *fid, struct vfsmount **mnt);
int mds_lock_callback(struct ldlm_lock *lock, struct ldlm_lock *new,
- void *data, int data_len);
+ void *data, int data_len, struct ptlrpc_request **req);
int mds_reint(int offset, struct ptlrpc_request *req);
/* mdc/mdc_request.c */
struct ptlrpc_request {
int rq_type; /* one of PTL_RPC_REQUEST, PTL_RPC_REPLY, PTL_RPC_BULK */
struct list_head rq_list;
+ struct list_head rq_multi;
struct obd_device *rq_obd;
int rq_status;
int rq_flags;
struct ldlm_reply *rep;
struct ldlm_namespace *ns = lock->l_resource->lr_namespace;
__u32 type = lock->l_resource->lr_type;
- __u64 new_resid[3] = {0, 0, 0};
+ __u64 new_resid[3] = {0, 0, 0}, old_res;
int bufcount, rc, size[3] = {sizeof(struct ldlm_reply),
sizeof(struct mds_body),
sizeof(struct obdo)};
it->opc = NTOH__u64(it->opc);
+ LDLM_DEBUG(lock, "intent policy, opc: %Ld", it->opc);
+
+ /* prepare reply */
switch(it->opc) {
case IT_GETATTR:
/* Note that in the negative case you may be returning
case IT_RENAME:
bufcount = 3;
break;
- default:
+ case IT_UNLINK:
bufcount = 2;
+ size[1] = sizeof(struct obdo);
+ break;
+ default:
+ LBUG();
}
rc = lustre_pack_msg(bufcount, size, NULL, &req->rq_replen,
rep = lustre_msg_buf(req->rq_repmsg, 0);
rep->lock_policy_res1 = 1;
+
+ /* execute policy */
switch ( it->opc ) {
case IT_CREAT:
case IT_CREAT|IT_OPEN:
case IT_SYMLINK:
case IT_MKNOD:
case IT_LINK:
+ case IT_UNLINK:
case IT_RENAME2:
if (mds_reint_p == NULL)
mds_reint_p =
LBUG();
}
+ if (it->opc == IT_UNLINK || it->opc == IT_RMDIR)
+ RETURN(ELDLM_LOCK_ABORTED);
+
mds_rep = lustre_msg_buf(req->rq_repmsg, 1);
rep->lock_policy_res2 = req->rq_status;
new_resid[0] = mds_rep->ino;
+ old_res = lock->l_resource->lr_name[0];
CDEBUG(D_INFO, "remote intent: locking %d instead of"
- "%ld\n", mds_rep->ino,
- (long)lock->l_resource->lr_name[0]);
+ "%ld\n", mds_rep->ino, (long)old_res);
ldlm_resource_put(lock->l_resource);
lock->l_resource =
LBUG();
RETURN(-ENOMEM);
}
+ LDLM_DEBUG(lock, "intent policy, old res %ld",
+ (long)old_res);
RETURN(ELDLM_LOCK_CHANGED);
} else {
int size = sizeof(struct ldlm_reply);
spin_unlock(&lock->l_lock);
}
-void ldlm_send_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock *new)
+int ldlm_send_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock *new)
{
+ struct ptlrpc_request *req = NULL;
ENTRY;
spin_lock(&lock->l_lock);
if (lock->l_flags & LDLM_FL_AST_SENT) {
- EXIT;
- return;
+ RETURN(0);
}
lock->l_flags |= LDLM_FL_AST_SENT;
- spin_unlock(&lock->l_lock);
- lock->l_blocking_ast(lock, new, lock->l_data, lock->l_data_len);
- EXIT;
+ lock->l_blocking_ast(lock, new, lock->l_data, lock->l_data_len, &req);
+ spin_unlock(&lock->l_lock);
+ if (req != NULL) {
+ struct list_head *list = lock->l_resource->lr_tmp;
+ list_add(&req->rq_multi, list);
+ }
+ RETURN(1);
}
/* Args: unlocked lock */
if (!lock->l_readers && !lock->l_writers &&
lock->l_flags & LDLM_FL_DYING) {
/* Read this lock its rights. */
- if (!lock->l_resource->lr_namespace->ns_local) {
+ if (!lock->l_resource->lr_namespace->ns_client) {
CERROR("LDLM_FL_DYING set on non-local lock!\n");
LBUG();
}
CDEBUG(D_INFO, "final decref done on dying lock, "
"calling callback.\n");
spin_unlock(&lock->l_lock);
+ /* This function pointer is unfortunately overloaded. This
+ * call will not result in an RPC. */
lock->l_blocking_ast(lock, NULL, lock->l_data,
- lock->l_data_len);
+ lock->l_data_len, NULL);
} else
spin_unlock(&lock->l_lock);
EXIT;
}
-/* Args: locked lock */
+/* Args: unlocked lock */
static int _ldlm_lock_compat(struct ldlm_lock *lock, int send_cbs,
struct list_head *queue)
{
CDEBUG(D_OTHER, "compat function failed and lock modes incompat\n");
if (send_cbs && child->l_blocking_ast != NULL) {
CDEBUG(D_OTHER, "incompatible; sending blocking AST.\n");
+ /* It's very difficult to actually send the AST from
+ * here, because we'd have to drop the lock before going
+ * to sleep to wait for the reply. Instead we build the
+ * packet and send it later. */
ldlm_send_blocking_ast(child, lock);
}
}
res->lr_most_restr = lock->l_granted_mode;
if (lock->l_completion_ast)
- lock->l_completion_ast(lock, NULL,
- lock->l_data, lock->l_data_len);
+ lock->l_completion_ast(lock, NULL, lock->l_data,
+ lock->l_data_len, NULL);
EXIT;
}
lock = lustre_handle2object(lockh);
res = lock->l_resource;
- local = res->lr_namespace->ns_local;
+ local = res->lr_namespace->ns_client;
spin_lock(&res->lr_lock);
lock->l_blocking_ast = blocking;
res = lock->l_resource;
*flags |= LDLM_FL_LOCK_CHANGED;
}
- if (rc < 0) {
+ if (rc == ELDLM_LOCK_ABORTED) {
/* Abort. */
ldlm_resource_put(lock->l_resource);
ldlm_lock_free(lock);
struct ldlm_lock *pending;
pending = list_entry(tmp, struct ldlm_lock, l_res_link);
+ CDEBUG(D_INFO, "Reprocessing lock %p\n", pending);
+
/* the resource lock protects ldlm_lock_compat */
if (ldlm_lock_compat(pending, 1))
RETURN(1);
/* Must be called with resource->lr_lock not taken. */
void ldlm_reprocess_all(struct ldlm_resource *res)
{
+ struct list_head rpc_list, *tmp, *pos;
+
+ INIT_LIST_HEAD(&rpc_list);
+
/* Local lock trees don't get reprocessed. */
- if (res->lr_namespace->ns_local)
+ if (res->lr_namespace->ns_client)
return;
spin_lock(&res->lr_lock);
+ res->lr_tmp = &rpc_list;
+
ldlm_reprocess_queue(res, &res->lr_converting);
if (list_empty(&res->lr_converting))
ldlm_reprocess_queue(res, &res->lr_waiting);
+
+ res->lr_tmp = NULL;
spin_unlock(&res->lr_lock);
+
+ list_for_each_safe(tmp, pos, &rpc_list) {
+ int rc;
+ struct ptlrpc_request *req =
+ list_entry(tmp, struct ptlrpc_request, rq_multi);
+
+ CDEBUG(D_INFO, "Sending callback.\n");
+
+ rc = ptlrpc_queue_wait(req);
+ rc = ptlrpc_check_status(req, rc);
+ ptlrpc_free_req(req);
+ if (rc)
+ CERROR("Callback send failed: %d\n", rc);
+ }
}
/* Must be called with lock and lock->l_resource unlocked */
list_del_init(&lock->l_res_link);
/* If this is a local resource, put it on the appropriate list. */
- if (res->lr_namespace->ns_local) {
+ if (res->lr_namespace->ns_client) {
if (*flags & LDLM_FL_BLOCK_CONV)
ldlm_resource_add_lock(res, res->lr_converting.prev,
lock);
CDEBUG(D_OTHER, " -- Lock dump: %p (%s)\n", lock, ver);
CDEBUG(D_OTHER, " Parent: %p\n", lock->l_parent);
- CDEBUG(D_OTHER, " Resource: %p\n", lock->l_resource);
+ CDEBUG(D_OTHER, " Resource: %p (%Ld)\n", lock->l_resource,
+ lock->l_resource->lr_name[0]);
CDEBUG(D_OTHER, " Requested mode: %d, granted mode: %d\n",
(int)lock->l_req_mode, (int)lock->l_granted_mode);
CDEBUG(D_OTHER, " Readers: %u ; Writers; %u\n",
extern int (*mds_reint_p)(int offset, struct ptlrpc_request *req);
extern int (*mds_getattr_name_p)(int offset, struct ptlrpc_request *req);
-#define LOOPBACK(x) (((x) & cpu_to_be32(0xff000000)) == cpu_to_be32(0x7f000000))
-
-static int is_local_conn(struct ptlrpc_connection *conn)
-{
- ENTRY;
- if (conn == NULL)
- RETURN(1);
-
- RETURN(LOOPBACK(conn->c_peer.peer_nid));
-}
-
/* _ldlm_callback and local_callback setup the variables then call this common
* code */
static int common_callback(struct ldlm_lock *lock, struct ldlm_lock *new,
- ldlm_mode_t mode, void *data, __u32 data_len)
+ ldlm_mode_t mode, void *data, __u32 data_len,
+ struct ptlrpc_request **reqp)
{
ENTRY;
"callback (%p).\n", lock->l_blocking_ast);
if (lock->l_blocking_ast != NULL)
lock->l_blocking_ast(lock, new, lock->l_data,
- lock->l_data_len);
+ lock->l_data_len, reqp);
} else {
CDEBUG(D_INFO, "Lock still has references; lock will be"
" cancelled later.\n");
RETURN(0);
}
-/* FIXME: I think that this is no longer necessary. */
-static int local_callback(struct ldlm_lock *l, struct ldlm_lock *new,
- void *data, __u32 data_len)
-{
- struct ldlm_lock *lock;
- /* the 'remote handle' is the lock in the FS's namespace */
- lock = lustre_handle2object(&l->l_remote_handle);
-
- return common_callback(lock, new, l->l_granted_mode, data, data_len);
-}
-
static int _ldlm_enqueue(struct obd_device *obddev, struct ptlrpc_service *svc,
struct ptlrpc_request *req)
{
void *cookie;
ENTRY;
- /* Is this lock managed locally? */
- if (is_local_conn(req->rq_connection))
- callback = local_callback;
- else
- callback = ldlm_cli_callback;
+ callback = ldlm_cli_callback;
dlm_req = lustre_msg_buf(req->rq_reqmsg, 0);
if (dlm_req->lock_desc.l_resource.lr_type == LDLM_MDSINTENT) {
if (err != ELDLM_OK)
GOTO(out, err);
+ lock = lustre_handle2object(&lockh);
+ LDLM_DEBUG(lock, "server-side enqueue handler");
+
flags = dlm_req->lock_flags;
err = ldlm_local_lock_enqueue(&lockh, cookie, cookielen, &flags,
callback, callback);
dlm_rep->lock_flags = flags;
memcpy(&dlm_rep->lock_handle, &lockh, sizeof(lockh));
- lock = lustre_handle2object(&lockh);
if (dlm_req->lock_desc.l_resource.lr_type == LDLM_EXTENT)
memcpy(&dlm_rep->lock_extent, &lock->l_extent,
sizeof(lock->l_extent));
struct ldlm_request *dlm_req;
struct ldlm_reply *dlm_rep;
struct ldlm_resource *res;
+ struct ldlm_lock *lock;
int rc, size = sizeof(*dlm_rep);
ENTRY;
dlm_rep = lustre_msg_buf(req->rq_repmsg, 0);
dlm_rep->lock_flags = dlm_req->lock_flags;
+ lock = lustre_handle2object(&dlm_req->lock_handle1);
+ LDLM_DEBUG(lock, "server-side convert handler");
+
res = ldlm_local_lock_convert(&dlm_req->lock_handle1,
dlm_req->lock_desc.l_req_mode,
&dlm_rep->lock_flags);
dlm_req = lustre_msg_buf(req->rq_reqmsg, 0);
lock = lustre_handle2object(&dlm_req->lock_handle1);
+ LDLM_DEBUG(lock, "server-side cancel handler");
res = ldlm_local_lock_cancel(lock);
req->rq_status = 0;
if (ptlrpc_reply(svc, req) != 0)
lock1 = lustre_handle2object(&dlm_req->lock_handle1);
lock2 = lustre_handle2object(&dlm_req->lock_handle2);
+ LDLM_DEBUG(lock1, "client %s callback handler START",
+ lock2 == NULL ? "completion" : "blocked");
+
common_callback(lock1, lock2, dlm_req->lock_desc.l_granted_mode, NULL,
- 0);
+ 0, NULL);
+
+ LDLM_DEBUG_NOLOCK("client %s callback handler END",
+ lock2 == NULL ? "completion" : "blocked");
+
RETURN(0);
}
GOTO(out, rc);
lock = lustre_handle2object(lockh);
+ LDLM_DEBUG(lock, "client-side enqueue START");
if (req == NULL) {
req = ptlrpc_prep_req(cl, conn, LDLM_ENQUEUE, 1, &size, NULL);
ldlm_resource_put(lock->l_resource);
spin_unlock(&lock->l_resource->lr_lock);
ldlm_lock_free(lock);
+ if (rc == ELDLM_LOCK_ABORTED)
+ rc = 0;
GOTO(out, rc);
}
LBUG();
RETURN(-ENOMEM);
}
+ LDLM_DEBUG(lock, "client-side enqueue, new resource");
}
if (!req_passed_in)
rc = ldlm_local_lock_enqueue(lockh, cookie, cookielen, flags, callback,
callback);
+ LDLM_DEBUG(lock, "client-side enqueue END");
if (*flags & (LDLM_FL_BLOCK_WAIT | LDLM_FL_BLOCK_GRANTED |
LDLM_FL_BLOCK_CONV)) {
/* Go to sleep until the lock is granted. */
}
int ldlm_cli_callback(struct ldlm_lock *lock, struct ldlm_lock *new,
- void *data, __u32 data_len)
+ void *data, __u32 data_len, struct ptlrpc_request **reqp)
{
struct ldlm_request *body;
struct ptlrpc_request *req;
- struct ptlrpc_client *cl = &lock->l_resource->lr_namespace->ns_client;
+ struct ptlrpc_client *cl =
+ &lock->l_resource->lr_namespace->ns_rpc_client;
int rc, size = sizeof(*body);
ENTRY;
- req = ptlrpc_prep_req(cl, lock->l_connection, LDLM_CALLBACK, 1, &size,
- NULL);
+ req = ptlrpc_prep_req(cl, lock->l_connection, LDLM_CALLBACK, 1,
+ &size, NULL);
if (!req)
GOTO(out, rc = -ENOMEM);
ldlm_object2handle(new, &body->lock_handle2);
}
+ LDLM_DEBUG(lock, "server preparing %s AST",
+ new == NULL ? "completion" : "blocked");
+
req->rq_replen = lustre_msg_size(0, NULL);
- rc = ptlrpc_queue_wait(req);
- rc = ptlrpc_check_status(req, rc);
- ptlrpc_free_req(req);
+ if (reqp == NULL) {
+ rc = ptlrpc_queue_wait(req);
+ rc = ptlrpc_check_status(req, rc);
+ ptlrpc_free_req(req);
+ } else {
+ *reqp = req;
+ }
EXIT;
out:
lock = lustre_handle2object(lockh);
*flags = 0;
+ LDLM_DEBUG(lock, "client-side convert");
+
req = ptlrpc_prep_req(cl, lock->l_connection, LDLM_CONVERT, 1, &size,
NULL);
if (!req)
int ldlm_cli_cancel(struct ptlrpc_client *cl, struct ldlm_lock *lock)
{
- struct ldlm_request *body;
struct ptlrpc_request *req;
+ struct ldlm_request *body;
struct ldlm_resource *res;
int rc, size = sizeof(*body);
ENTRY;
+ LDLM_DEBUG(lock, "client-side cancel");
req = ptlrpc_prep_req(cl, lock->l_connection, LDLM_CANCEL, 1, &size,
NULL);
if (!req)
kmem_cache_t *ldlm_resource_slab, *ldlm_lock_slab;
-struct ldlm_namespace *ldlm_namespace_new(__u32 local)
+struct ldlm_namespace *ldlm_namespace_new(char *name, __u32 client)
{
struct ldlm_namespace *ns = NULL;
struct list_head *bucket;
GOTO(out, ns);
}
- ptlrpc_init_client(NULL, NULL,
- LDLM_REQUEST_PORTAL, LDLM_REPLY_PORTAL,
- &ns->ns_client);
+ OBD_ALLOC(ns->ns_name, strlen(name) + 1);
+ if (!ns->ns_name) {
+ LBUG();
+ GOTO(out, ns);
+ }
+ strcpy(ns->ns_name, name);
+
+ ptlrpc_init_client(NULL, NULL, LDLM_REQUEST_PORTAL, LDLM_REPLY_PORTAL,
+ &ns->ns_rpc_client);
INIT_LIST_HEAD(&ns->ns_root_list);
ns->ns_lock = SPIN_LOCK_UNLOCKED;
ns->ns_refcount = 0;
- ns->ns_local = local;
+ ns->ns_client = client;
for (bucket = ns->ns_hash + RES_HASH_SIZE - 1; bucket >= ns->ns_hash;
bucket--)
out:
if (ns && ns->ns_hash)
vfree(ns->ns_hash);
+ if (ns && ns->ns_name)
+ OBD_FREE(ns->ns_name, strlen(name) + 1);
if (ns)
OBD_FREE(ns, sizeof(*ns));
return NULL;
static int cleanup_resource(struct ldlm_resource *res, struct list_head *q)
{
struct list_head *tmp, *pos;
- int rc = 0, client = res->lr_namespace->ns_local;
+ int rc = 0, client = res->lr_namespace->ns_client;
ENTRY;
list_for_each_safe(tmp, pos, q) {
}
vfree(ns->ns_hash /* , sizeof(struct list_head) * RES_HASH_SIZE */);
- ptlrpc_cleanup_client(&ns->ns_client);
+ ptlrpc_cleanup_client(&ns->ns_rpc_client);
+ OBD_FREE(ns->ns_name, strlen(ns->ns_name) + 1);
OBD_FREE(ns, sizeof(*ns));
return ELDLM_OK;
/* Must be called with resource->lr_lock taken */
void ldlm_resource_del_lock(struct ldlm_lock *lock)
{
- list_del_init(&lock->l_res_link);
- lock->l_resource->lr_refcount--;
+ if (!list_empty(&lock->l_res_link)) {
+ list_del_init(&lock->l_res_link);
+ lock->l_resource->lr_refcount--;
+ }
}
int ldlm_get_resource_handle(struct ldlm_resource *res, struct lustre_handle *h)
(unsigned long long)res->lr_name[2]);
CDEBUG(D_OTHER, "--- Resource: %p (%s)\n", res, name);
- CDEBUG(D_OTHER, "Namespace: %p\n", res->lr_namespace);
+ CDEBUG(D_OTHER, "Namespace: %p (%s)\n", res->lr_namespace,
+ res->lr_namespace->ns_name);
CDEBUG(D_OTHER, "Parent: %p, root: %p\n", res->lr_parent, res->lr_root);
CDEBUG(D_OTHER, "Granted locks:\n");
#include <linux/lustre_dlm.h>
static int ldlm_test_callback(struct ldlm_lock *lock, struct ldlm_lock *new,
- void *data, __u32 data_len)
+ void *data, __u32 data_len,
+ struct ptlrpc_request **reqp)
{
printk("ldlm_test_callback: lock=%p, new=%p\n", lock, new);
return 0;
struct lustre_handle lockh_1, lockh_2;
int flags;
- ns = ldlm_namespace_new(LDLM_NAMESPACE_SERVER);
+ ns = ldlm_namespace_new("test_server", LDLM_NAMESPACE_SERVER);
if (ns == NULL)
LBUG();
ldlm_error_t err;
int flags;
- ns = ldlm_namespace_new(LDLM_NAMESPACE_SERVER);
+ ns = ldlm_namespace_new("test_server", LDLM_NAMESPACE_SERVER);
if (ns == NULL)
LBUG();
rec->ul_opcode = HTON__u32(REINT_UNLINK);
ll_inode2fid(&rec->ul_fid1, inode);
- ll_inode2fid(&rec->ul_fid2, child);
+ if (child)
+ ll_inode2fid(&rec->ul_fid2, child);
tmp = lustre_msg_buf(req->rq_reqmsg, offset + 1);
LOGL0(name, namelen, tmp);
return;
}
- handle = (struct lustre_handle *)de->d_it->it_lock_handle;
- lock = lustre_handle2object(handle);
- CDEBUG(D_INFO, "calling ldlm_lock_decref(%p, %d)\n", lock,
- de->d_it->it_lock_mode);
- ldlm_lock_decref(lock, de->d_it->it_lock_mode);
+ if (de->d_it->it_lock_mode) {
+ handle = (struct lustre_handle *)de->d_it->it_lock_handle;
+ lock = lustre_handle2object(handle);
+ CDEBUG(D_INFO, "calling ldlm_lock_decref(%p, %d)\n", lock,
+ de->d_it->it_lock_mode);
+ ldlm_lock_decref(lock, de->d_it->it_lock_mode);
+ }
de->d_it = NULL;
EXIT;
}
}
static int ll_lock_callback(struct ldlm_lock *lock, struct ldlm_lock *new,
- void *data, __u32 data_len)
+ void *data, __u32 data_len,
+ struct ptlrpc_request **reqp)
{
struct inode *inode = lock->l_data;
ENTRY;
err = mdc_enqueue(&sbi->ll_mdc_conn, LDLM_MDSINTENT,
it, LCK_PW, dir, dentry, lockh, 0, NULL, 0,
dir, sizeof(*dir));
- else if (it->it_op & (IT_READDIR | IT_GETATTR | IT_OPEN))
+ else if (it->it_op & (IT_READDIR | IT_GETATTR | IT_OPEN | IT_UNLINK))
err = mdc_enqueue(&sbi->ll_mdc_conn, LDLM_MDSINTENT,
it, LCK_PR, dir, dentry, lockh, 0, NULL, 0,
dir, sizeof(*dir));
+
else
LBUG();
it->it_disposition && !it->it_status)
GOTO(negative, NULL);
- if ( (it->it_op & (IT_GETATTR)) &&
+ if ( (it->it_op & (IT_GETATTR | IT_UNLINK)) &&
it->it_disposition && it->it_status)
GOTO(negative, NULL);
RETURN(ERR_PTR(-abs(err)));
}
offset = 0;
+ } else if (it->it_op == IT_UNLINK) {
+ struct obdo *obdo;
+ request = (struct ptlrpc_request *)it->it_data;
+ obdo = lustre_msg_buf(request->rq_repmsg, 1);
+ inode = new_inode(dir->i_sb);
+ ll_i2info(inode)->lli_obdo = obdo_alloc();
+
+ /* XXX fix mem allocation error */
+ memcpy(ll_i2info(inode)->lli_obdo, obdo, sizeof(*obdo));
+
+ if (!inode)
+ GOTO(out_req, -ENOMEM);
+ inode->i_mode= S_IFREG;
+ inode->i_nlink = 1;
+ GOTO(out_req, 0);
} else {
struct mds_body *body;
if (it->it_op & IT_RENAME)
it->it_data = dentry;
+ out_req:
ptlrpc_free_req(request);
if (!inode)
RETURN(ERR_PTR(-ENOMEM));
struct page * page;
int err = -ENOENT;
+ if (dentry->d_it && dentry->d_it->it_disposition) {
+ inode->i_nlink = 0;
+ GOTO(out, err=0);
+ }
+
de = ext2_find_entry (dir, dentry, &page);
if (!de)
goto out;
static void ll_delete_inode(struct inode *inode)
{
if (S_ISREG(inode->i_mode)) {
- int err;
- struct obdo *oa;
+ int err;
+ struct obdo *oa;
oa = ll_i2info(inode)->lli_obdo;
- if (!oa) {
- CERROR("no memory\n");
- GOTO(out, -ENOMEM);
- }
+ if (!oa)
+ GOTO(out, -EINVAL);
+
+ if (oa->o_id == 0)
+ /* No obdo was ever created */
+ GOTO(out, 0);
err = obd_destroy(ll_i2obdconn(inode), oa);
CDEBUG(D_INODE, "obd destroy of %Ld error %d\n",
(unsigned long long)oa->o_id, err);
- obdo_free(oa);
}
out:
clear_inode(inode);
}
static int mdc_lock_callback(struct ldlm_lock *lock, struct ldlm_lock *new,
- void *data, int data_len)
+ void *data, int data_len,
+ struct ptlrpc_request **req)
{
int rc;
struct inode *inode = data;
struct ldlm_intent *lit;
ENTRY;
+#warning FIXME: Andreas, the sgid directory stuff also goes here, but check again on mds
+
+ LDLM_DEBUG_NOLOCK("mdsintent %d dir %ld", it->it_op, dir->i_ino);
+
switch (it->it_op) {
case IT_MKDIR:
it->it_mode = (it->it_mode | S_IFDIR) & ~current->fs->umask;
size[0] = sizeof(struct ldlm_reply);
size[1] = sizeof(struct mds_body);
req->rq_replen = lustre_msg_size(2, size);
+ } else if ( it->it_op == IT_UNLINK ) {
+ size[2] = sizeof(struct mds_rec_unlink);
+ size[3] = de->d_name.len + 1;
+ req = ptlrpc_prep_req(mdc->mdc_ldlm_client, mdc->mdc_conn,
+ LDLM_ENQUEUE, 4, size, NULL);
+ if (!req)
+ RETURN(-ENOMEM);
+
+ /* pack the intent */
+ lit = lustre_msg_buf(req->rq_reqmsg, 1);
+ lit->opc = NTOH__u64((__u64)it->it_op);
+
+ /* pack the intended request */
+ mds_unlink_pack(req, 2, dir, NULL, de->d_name.name,
+ de->d_name.len);
+ size[0] = sizeof(struct ldlm_reply);
+ size[1] = sizeof(struct obdo);
+ req->rq_replen = lustre_msg_size(2, size);
} else if ( it->it_op == IT_GETATTR || it->it_op == IT_RENAME ||
it->it_op == IT_OPEN ) {
size[2] = sizeof(struct mds_body);
obddev->obd_namespace, NULL, res_id, lock_type,
NULL, 0, lock_mode, &flags,
(void *)mdc_lock_callback, data, datalen, lockh);
-
- if (rc != 0) {
+ if (rc == -ENOENT) {
+ lock_mode = 0;
+ memset(lockh, 0, sizeof(*lockh));
+ } else if (rc != 0) {
CERROR("ldlm_cli_enqueue: %d\n", rc);
RETURN(rc);
}
struct mds_body *body;
int rc, size[2] = {sizeof(*body)}, bufcount = 1;
struct ptlrpc_request *req;
+ ENTRY;
if (obdo != NULL) {
bufcount = 2;
ENTRY;
- conn->oc_dev->obd_namespace = ldlm_namespace_new(LDLM_NAMESPACE_CLIENT);
+ conn->oc_dev->obd_namespace =
+ ldlm_namespace_new("mdc", LDLM_NAMESPACE_CLIENT);
if (conn->oc_dev->obd_namespace == NULL)
RETURN(-ENOMEM);
}
int mds_lock_callback(struct ldlm_lock *lock, struct ldlm_lock *new,
- void *data, int data_len)
+ void *data, int data_len, struct ptlrpc_request **reqp)
{
ENTRY;
rc = ldlm_local_lock_match(mds->mds_local_namespace, res_id, LDLM_PLAIN,
NULL, 0, lock_mode, &lockh);
if (rc == 0) {
+ LDLM_DEBUG_NOLOCK("enqueue res %Lu", res_id[0]);
rc = ldlm_cli_enqueue(mds->mds_ldlm_client, mds->mds_ldlm_conn,
NULL, mds->mds_local_namespace, NULL,
res_id, LDLM_PLAIN, NULL, 0, lock_mode,
CERROR("lock enqueue: err: %d\n", rc);
GOTO(out_create_de, rc = -EIO);
}
+ } else {
+ lock = lustre_handle2object(&lockh);
+ LDLM_DEBUG(lock, "matched");
}
ldlm_lock_dump((void *)(unsigned long)lockh.addr);
GOTO(err_thread, rc);
}
- obddev->obd_namespace = ldlm_namespace_new(LDLM_NAMESPACE_SERVER);
+ obddev->obd_namespace =
+ ldlm_namespace_new("mds_server", LDLM_NAMESPACE_SERVER);
if (obddev->obd_namespace == NULL) {
LBUG();
mds_cleanup(obddev);
GOTO(err_thread, rc);
}
- mds->mds_local_namespace = ldlm_namespace_new(LDLM_NAMESPACE_CLIENT);
+ mds->mds_local_namespace =
+ ldlm_namespace_new("mds_client", LDLM_NAMESPACE_CLIENT);
if (mds->mds_local_namespace == NULL) {
LBUG();
mds_cleanup(obddev);
static int mds_extN_set_obdo(struct inode *inode, void *handle,
struct obdo *obdo)
{
- struct mds_objid *data = (struct mds_objid *)obdo->o_inline;
+ struct mds_objid *data;
int rc;
- data->mo_magic = cpu_to_le64(XATTR_MDS_MO_MAGIC);
+
lock_kernel();
down(&inode->i_sem);
if (obdo == NULL)
rc = extN_xattr_set(handle, inode, EXTN_XATTR_INDEX_LUSTRE,
XATTR_LUSTRE_MDS_OBJID, NULL, 0, 0);
- else
+ else {
+ data = (struct mds_objid *)obdo->o_inline;
+ data->mo_magic = cpu_to_le64(XATTR_MDS_MO_MAGIC);
rc = extN_xattr_set(handle, inode, EXTN_XATTR_INDEX_LUSTRE,
XATTR_LUSTRE_MDS_OBJID, obdo->o_inline,
OBD_INLINESZ, XATTR_CREATE);
+ }
up(&inode->i_sem);
unlock_kernel();
unlock_kernel();
if (rc < 0) {
- CERROR("error getting EA %s from MDS inode %ld: rc = %d\n",
- XATTR_LUSTRE_MDS_OBJID, inode->i_ino, rc);
+ CDEBUG(D_INFO, "error getting EA %s from MDS inode %ld: "
+ "rc = %d\n", XATTR_LUSTRE_MDS_OBJID, inode->i_ino, rc);
obdo->o_id = 0;
} else if (data->mo_magic != cpu_to_le64(XATTR_MDS_MO_MAGIC)) {
CERROR("MDS object id %Ld has bad magic %Lx\n",
rc = ldlm_local_lock_match(mds->mds_local_namespace, res_id, LDLM_PLAIN,
NULL, 0, lock_mode, &lockh);
if (rc == 0) {
+ LDLM_DEBUG_NOLOCK("enqueue res %Lu", res_id[0]);
rc = ldlm_cli_enqueue(mds->mds_ldlm_client, mds->mds_ldlm_conn,
NULL, mds->mds_local_namespace, NULL,
res_id, LDLM_PLAIN, NULL, 0, lock_mode,
CERROR("lock enqueue: err: %d\n", rc);
GOTO(out_create_de, rc = -EIO);
}
+ } else {
+ lock = lustre_handle2object(&lockh);
+ LDLM_DEBUG(lock, "matched");
}
ldlm_lock_dump((void *)(unsigned long)lockh.addr);
struct mds_body *body;
struct obdo *obdo;
struct inode *inode = dchild->d_inode;
- CERROR("child exists (dir %ld, name %s, ino %ld)\n",
+ CDEBUG(D_INODE, "child exists (dir %ld, name %s, ino %ld)\n",
dir->i_ino, rec->ur_name, dchild->d_inode->i_ino);
body = lustre_msg_buf(req->rq_repmsg, offset);
struct dentry *de = NULL;
struct dentry *dchild = NULL;
struct mds_obd *mds = &req->rq_obd->u.mds;
+ struct obdo *obdo;
struct inode *dir, *inode;
+ int lock_mode, flags;
+ __u64 res_id[3] = {0};
+ struct lustre_handle lockh;
+ struct ldlm_lock *lock;
void *handle;
int rc = 0;
int err;
LBUG();
GOTO(out_unlink, rc = -ESTALE);
}
+
dir = de->d_inode;
CDEBUG(D_INODE, "parent ino %ld\n", dir->i_ino);
+ lock_mode = (req->rq_reqmsg->opc == MDS_REINT) ? LCK_CW : LCK_PW;
+ res_id[0] = dir->i_ino;
+
+ rc = ldlm_local_lock_match(mds->mds_local_namespace, res_id, LDLM_PLAIN,
+ NULL, 0, lock_mode, &lockh);
+ if (rc == 0) {
+ LDLM_DEBUG_NOLOCK("enqueue res %Lu", res_id[0]);
+ rc = ldlm_cli_enqueue(mds->mds_ldlm_client, mds->mds_ldlm_conn,
+ NULL, mds->mds_local_namespace, NULL,
+ res_id, LDLM_PLAIN, NULL, 0, lock_mode,
+ &flags, (void *)mds_lock_callback, NULL,
+ 0, &lockh);
+ if (rc != ELDLM_OK) {
+ CERROR("lock enqueue: err: %d\n", rc);
+ GOTO(out_unlink_de, rc = -EIO);
+ }
+ } else {
+ lock = lustre_handle2object(&lockh);
+ LDLM_DEBUG(lock, "matched");
+ }
+ ldlm_lock_dump((void *)(unsigned long)lockh.addr);
down(&dir->i_sem);
dchild = lookup_one_len(rec->ur_name, de, rec->ur_namelen - 1);
GOTO(out_unlink_dchild, rc = -ESTALE);
}
+#if 0 /* in intent case the client doesn't have the inode */
if (inode->i_ino != rec->ur_fid2->id) {
CERROR("inode and FID ID do not match (%ld != %Ld)\n",
inode->i_ino, rec->ur_fid2->id);
LBUG();
GOTO(out_unlink_dchild, rc = -ESTALE);
}
+#endif
OBD_FAIL_WRITE(OBD_FAIL_MDS_REINT_UNLINK_WRITE, dir->i_sb->s_dev);
rc = vfs_rmdir(dir, dchild);
break;
default:
+ if (offset) {
+ obdo = lustre_msg_buf(req->rq_repmsg, 1);
+ rc = mds_fs_get_obdo(mds, inode, obdo);
+ if (rc < 0)
+ CDEBUG(D_INFO, "No obdo for ino %ld err %d\n",
+ inode->i_ino, rc);
+ }
+
handle = mds_fs_start(mds, dir, MDS_FSOP_UNLINK);
if (!handle)
GOTO(out_unlink_dchild, rc = PTR_ERR(handle));
EXIT;
out_unlink_dchild:
+ res_id[0] = inode->i_ino;
l_dput(dchild);
out_unlink_de:
up(&dir->i_sem);
+ lock = lustre_handle2object(&lockh);
+ ldlm_lock_decref(lock, lock_mode);
+ if (!rc) {
+ /* Take an exclusive lock on the resource that we're
+ * about to free, to force everyone to drop their
+ * locks. */
+ LDLM_DEBUG_NOLOCK("getting EX lock res %Lu", res_id[0]);
+ rc = ldlm_cli_enqueue(mds->mds_ldlm_client, mds->mds_ldlm_conn,
+ NULL, mds->mds_local_namespace, NULL,
+ res_id,
+ LDLM_PLAIN, NULL, 0, LCK_EX, &flags,
+ (void *)mds_lock_callback, NULL, 0,
+ &lockh);
+ if (rc)
+ CERROR("failed to get child inode lock (child ino %Ld, "
+ "dir ino %ld)\n",
+ res_id[0], de->d_inode->i_ino);
+ }
+
l_dput(de);
+
+ if (!rc) {
+ lock = lustre_handle2object(&lockh);
+ ldlm_lock_decref(lock, LCK_EX);
+ if (ldlm_cli_cancel(lock->l_client, lock))
+ CERROR("failed to get child inode lock ino %Ld\n",
+ res_id[0]);
+ }
+
out_unlink:
req->rq_status = rc;
return 0;
if (!osc->osc_conn)
RETURN(-ENOENT);
- obddev->obd_namespace = ldlm_namespace_new(LDLM_NAMESPACE_CLIENT);
+ obddev->obd_namespace =
+ ldlm_namespace_new("osc", LDLM_NAMESPACE_CLIENT);
if (obddev->obd_namespace == NULL)
GOTO(out_conn, rc = -ENOMEM);
GOTO(error_dec, err = -EINVAL);
}
- obddev->obd_namespace = ldlm_namespace_new(LDLM_NAMESPACE_SERVER);
+ obddev->obd_namespace =
+ ldlm_namespace_new("ost", LDLM_NAMESPACE_SERVER);
if (obddev->obd_namespace == NULL)
LBUG();
OBD_ALLOC(bulk, sizeof(*bulk));
if (bulk != NULL) {
bulk->b_connection = ptlrpc_connection_addref(conn);
+ atomic_set(&bulk->b_pages_remaining, 0);
init_waitqueue_head(&bulk->b_waitq);
INIT_LIST_HEAD(&bulk->b_page_list);
}
request->rq_reqmsg->target_id = HTON__u32(cl->cli_target_devno);
INIT_LIST_HEAD(&request->rq_list);
+ INIT_LIST_HEAD(&request->rq_multi);
/* this will be dec()d once in req_finished, once in free_committed */
atomic_set(&request->rq_refcount, 2);
}
ptlrpc_put_connection(request->rq_connection);
-
+ list_del(&request->rq_multi);
OBD_FREE(request, sizeof(*request));
EXIT;
}
}
if (req->rq_repmsg->status != 0) {
- CERROR("req->rq_repmsg->status is %d\n",
- req->rq_repmsg->status);
+ if (req->rq_repmsg->status < 0)
+ CERROR("req->rq_repmsg->status is %d\n",
+ req->rq_repmsg->status);
+ else
+ CDEBUG(D_INFO, "req->rq_repmsg->status is %d\n",
+ req->rq_repmsg->status);
/* XXX: translate this error from net to host */
RETURN(req->rq_repmsg->status);
}