TBA
* bug fixes
- fix many simultaneous client startup (392)
-
+ - fix dentry->d_it clobbering
+ - credentials weren't being shipped for readdir/getattr operations
+ - remove invalid assertions triggered during some concurrent MD
+ updates
+
2002-11-18 Phil Schwan <phil@clusterfs.com>
* version v0_5_17
* bug fixes
/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
+ * (visit-tags-table FILE)
* vim:expandtab:shiftwidth=8:tabstop=8:
*/
#define LDLM_FL_NO_CALLBACK (1 << 12) /* see ldlm_cli_cancel_unused */
#define LDLM_FL_HAS_INTENT (1 << 13) /* lock request has intent */
#define LDLM_FL_REDUCE (1 << 14) /* throw away unused locks */
+#define LDLM_FL_CANCELING (1 << 15) /* lock is being canceled */
#define LDLM_CB_BLOCKING 1
#define LDLM_CB_CANCELING 2
do { \
if (lock->l_resource == NULL) { \
CDEBUG(D_DLMTRACE, "### " format \
- " (UNKNOWN: lock %p(rc=%d/%d,%d) mode %s/%s on " \
- "res \?\? (rc=\?\?) type \?\?\? remote "LPX64")\n" \
+ " ns: \?\? lock: %p lrc: %d/%d,%d mode: %s/%s " \
+ "res: \?\? rrc=\?\? type: \?\?\? remote: "LPX64")\n" \
, ## a, lock, lock->l_refc, lock->l_readers, \
lock->l_writers, \
ldlm_lockname[lock->l_granted_mode], \
} \
if (lock->l_resource->lr_type == LDLM_EXTENT) { \
CDEBUG(D_DLMTRACE, "### " format \
- " (%s: lock %p(rc=%d/%d,%d) mode %s/%s on res "LPU64 \
- "/"LPU64" (rc=%d) type %s ["LPU64"->"LPU64"] remote " \
- LPX64")\n" , ## a, \
+ " ns: %s lock: %p lrc: %d/%d,%d mode: %s/%s res: "LPU64 \
+ "/"LPU64" rrc: %d type: %s ["LPU64"->"LPU64"] remote: " \
+ LPX64"\n" , ## a, \
lock->l_resource->lr_namespace->ns_name, lock, \
lock->l_refc, lock->l_readers, lock->l_writers, \
ldlm_lockname[lock->l_granted_mode], \
} \
{ \
CDEBUG(D_DLMTRACE, "### " format \
- " (%s: lock %p(rc=%d/%d,%d) mode %s/%s on res "LPU64 \
- "/"LPU64" (rc=%d) type %s remote "LPX64")\n" , ## a, \
+ " ns: %s lock: %p lrc: %d/%d,%d mode: %s/%s res: "LPU64 \
+ "/"LPU64" rrc: %d type: %s remote: "LPX64"\n" , ## a, \
lock->l_resource->lr_namespace->ns_name, lock, \
lock->l_refc, lock->l_readers, lock->l_writers, \
ldlm_lockname[lock->l_granted_mode], \
ldlm_mode_t mode, int flags, void *data));
void ldlm_unregister_intent(void);
void ldlm_lock2handle(struct ldlm_lock *lock, struct lustre_handle *lockh);
-struct ldlm_lock *__ldlm_handle2lock(struct lustre_handle *, int strict);
+struct ldlm_lock *__ldlm_handle2lock(struct lustre_handle *, int strict, int flags);
void ldlm_lock2handle(struct ldlm_lock *lock, struct lustre_handle *lockh);
void ldlm_cancel_callback(struct ldlm_lock *);
int ldlm_lock_set_data(struct lustre_handle *, void *data, int datalen);
static inline struct ldlm_lock *ldlm_handle2lock(struct lustre_handle *h)
{
- return __ldlm_handle2lock(h, 1);
+ return __ldlm_handle2lock(h, 1, 0);
}
#define LDLM_LOCK_PUT(lock) \
LASSERT(ll_d2d(de) != NULL); \
\
down(&ll_d2d(de)->lld_it_sem); \
- LASSERT(de->d_it == NULL); \
+ LASSERT(de->d_it == NULL); \
de->d_it = it; \
CDEBUG(D_DENTRY, "D_IT DOWN dentry %p fsdata %p intent: %s sem %d\n", \
de, ll_d2d(de), ldlm_it2str(de->d_it->it_op), \
up(&ll_d2d(de)->lld_it_sem); \
} while(0)
+
/* dir.c */
extern struct file_operations ll_dir_operations;
extern struct inode_operations ll_dir_inode_operations;
int mds_update_unpack(struct ptlrpc_request *, int offset,
struct mds_update_record *);
+void mds_readdir_pack(struct ptlrpc_request *req, int offset,
+ obd_id ino, int type);
void mds_getattr_pack(struct ptlrpc_request *req, int offset,
struct inode *inode, const char *name, int namelen);
void mds_setattr_pack(struct ptlrpc_request *, int offset, struct inode *,
lockh->cookie = lock->l_random;
}
+/*
+ * if flags: atomically get the lock and set the flags.
+ * Return NULL if flag already set
+ */
+
struct ldlm_lock *__ldlm_handle2lock(struct lustre_handle *handle,
- int strict)
+ int strict, int flags)
{
struct ldlm_lock *lock = NULL, *retval = NULL;
ENTRY;
}
if (lock->l_random != handle->cookie) {
- CERROR("bogus cookie: lock %p has "LPX64" vs. handle "LPX64"\n",
- lock, lock->l_random, handle->cookie);
+ //CERROR("bogus cookie: lock %p has "LPX64" vs. handle "LPX64"\n",
+ // lock, lock->l_random, handle->cookie);
GOTO(out2, NULL);
}
if (!lock->l_resource) {
GOTO(out, NULL);
}
+ if (flags && (lock->l_flags & flags))
+ GOTO(out, NULL);
+
+ if (flags)
+ lock->l_flags |= flags;
+
retval = LDLM_LOCK_GET(lock);
if (!retval)
CERROR("lock disappeared below us!!! %p\n", lock);
void ldlm_lock_decref(struct lustre_handle *lockh, __u32 mode)
{
- struct ldlm_lock *lock = __ldlm_handle2lock(lockh, 0);
+ struct ldlm_lock *lock = __ldlm_handle2lock(lockh, 0, 0);
struct ldlm_namespace *ns;
ENTRY;
int rc = 0, size = sizeof(*body);
ENTRY;
- lock = ldlm_handle2lock(lockh);
+ /* concurrent cancels on the same handle can happen */
+ lock = __ldlm_handle2lock(lockh, 0, LDLM_FL_CANCELING);
if (!lock) {
- /* It's possible that the decref that we did just before this
- * cancel was the last reader/writer, and caused a cancel before
- * we could call this function. If we want to make this
- * impossible (by adding a dec_and_cancel() or similar), then
- * we can put the LBUG back. */
- //LBUG();
RETURN(0);
}
struct inode *inode,
const char *name, int namelen)
{
- struct mds_body *rec;
- rec = lustre_msg_buf(req->rq_reqmsg, offset);
+ struct mds_body *b;
+ b = lustre_msg_buf(req->rq_reqmsg, offset);
+
+ b->fsuid = HTON__u32(current->fsuid);
+ b->fsgid = HTON__u32(current->fsgid);
+ b->capability = HTON__u32(current->cap_effective);
- ll_inode2fid(&rec->fid1, inode);
+ ll_inode2fid(&b->fid1, inode);
if (name) {
char *tmp;
tmp = lustre_msg_buf(req->rq_reqmsg, offset + 1);
}
}
+void mds_readdir_pack(struct ptlrpc_request *req, int offset,
+ obd_id ino, int type)
+{
+ struct mds_body *b;
+
+ b = lustre_msg_buf(req->rq_reqmsg, offset);
+ b->fsuid = HTON__u32(current->fsuid);
+ b->fsgid = HTON__u32(current->fsgid);
+ b->capability = HTON__u32(current->cap_effective);
+ b->fid1.id = HTON__u64(ino);
+ b->fid1.f_type = HTON__u32(type);
+ b->size = HTON__u64(offset);
+}
+
void mds_pack_req_body(struct ptlrpc_request *req)
{
ldlm_lock_decref(handle, it->it_lock_mode);
}
- if (it->it_op == IT_RELEASED_MAGIC) {
+ if (!de->d_it || it->it_op == IT_RELEASED_MAGIC) {
EXIT;
return;
}
- if (de->d_it && de->d_it == it) {
- de->d_it = NULL;
- up(&ll_d2d(de)->lld_it_sem);
- it->it_op = IT_RELEASED_MAGIC;
- }
+ if (de->d_it == it)
+ LL_GET_INTENT(de, it);
EXIT;
}
}
if (it == NULL && ll_have_lock(de))
- GOTO(out, rc = 0);
+ RETURN(1);
rc = ll_intent_lock(de->d_parent->d_inode, &de, it, revalidate2_finish);
if (rc < 0) {
spin_unlock(&dcache_lock);
d_rehash(de);
- out:
- if (!it)
- de->d_it = NULL;
-
RETURN(1);
}
{
inode->i_nlink--;
}
-
static inline int ext2_add_nondir(struct dentry *dentry, struct inode *inode)
{
int err;
it->it_op != IT_LOOKUP) {
LL_SAVE_INTENT(dentry, it);
} else {
- dentry->d_it = NULL;
CDEBUG(D_DENTRY,
"D_IT dentry %p fsdata %p intent: %s status %d\n",
dentry, ll_d2d(dentry), ldlm_it2str(it->it_op),
if (rc < 0 || it->it_op == IT_LOOKUP)
ll_intent_release(dentry, it);
- return rc;
+ RETURN(rc);
drop_req:
ptlrpc_free_req(request);
drop_lock:
#warning FIXME: must release lock here
- return rc;
+ RETURN(rc);
}
/* Search "inode"'s alias list for a dentry that has the same name and parent as
/* truncate == punch from new size to absolute end of file */
err = obd_punch(ll_i2obdconn(inode), &oa, lsm, inode->i_size,
OBD_OBJECT_EOF);
- if (err)
- CERROR("obd_truncate fails (%d)\n", err);
- else
+ if (err) {
+ LBUG();
+ CERROR("obd_truncate fails (%d) ino %ld\n", err,
+ inode->i_ino);
+ } else
obdo_to_inode(inode, &oa, oa.o_valid);
err = ll_size_unlock(inode, lsm, LCK_PW, lockhs);
/* Get the authoritative file size */
if (lli->lli_smd && (inode->i_mode & S_IFREG)) {
int rc;
-
+ LASSERT(lli->lli_smd->lsm_object_id != 0);
rc = ll_file_size(inode, lli->lli_smd);
if (rc) {
CERROR("ll_file_size: %d\n", rc);
list_for_each(tmp, &inode->i_dentry) {
dentry = list_entry(tmp, struct dentry, d_alias);
- // if (atomic_read(&dentry->d_count))
- // continue;
- //if (!list_empty(&dentry->d_lru))
- // continue;
-
list_del_init(&dentry->d_hash);
list_add(&dentry->d_hash, &sbi->ll_orphan_dentry_list);
}
GOTO(out2, rc);
}
- body = lustre_msg_buf(req->rq_reqmsg, 0);
- body->fid1.id = ino;
- body->fid1.f_type = type;
- body->size = offset;
+ mds_readdir_pack(req, 0, ino, type);
req->rq_replen = lustre_msg_size(1, &size);
rc = ptlrpc_queue_wait(req);
inode->i_ino, inode->i_nlink,
atomic_read(&inode->i_count), inode->i_generation,
generation);
- LBUG();
iput(inode);
- RETURN(ERR_PTR(-ESTALE));
+ RETURN(ERR_PTR(-ENOENT));
}
/* now to find a dentry. If possible, get a well-connected one */
int rc;
if (lmm_size == 0) {
- CERROR("no space reserved for inode %u MD\n", inode->i_ino);
+ CDEBUG(D_INFO, "no space reserved for inode %u MD\n", inode->i_ino);
RETURN(0);
}
*/
if ((rc = mds_fs_get_md(mds, inode, lmm, lmm_size)) < 0) {
CDEBUG(D_INFO, "No md for ino %u: rc = %d\n", inode->i_ino, rc);
- } else {
+ } else if (rc > 0) {
body->valid |= OBD_MD_FLEASIZE;
rc = 0;
}
push_ctxt(&saved, &mds->mds_ctxt, &uc);
de = mds_fid2dentry(mds, &body->fid1, NULL);
if (IS_ERR(de)) {
- LBUG();
- GOTO(out_pre_de, rc = -ESTALE);
+ GOTO(out_pre_de, rc = -ENOENT);
}
dir = de->d_inode;
CERROR("error getting inode %u MD: rc = %d\n",
inode->i_ino, rc);
size[bufcount] = 0;
- } else if (rc < mds->mds_max_mdsize) {
+ } else if (rc > mds->mds_max_mdsize) {
size[bufcount] = 0;
CERROR("MD size %d larger than maximum possible %u\n",
rc, mds->mds_max_mdsize);
/* This gives us the MD size */
if (lmm == NULL)
- return rc;
+ return (rc == -ENODATA) ? 0 : rc;
if (rc < 0) {
CDEBUG(D_INFO, "error getting EA %s from MDS inode %ld: "
"rc = %d\n", XATTR_LUSTRE_MDS_OBJID, inode->i_ino, rc);
memset(lmm, 0, size);
- return rc;
+ return (rc == -ENODATA) ? 0 : rc;
}
/* This field is byteswapped because it appears in the
} else {
de = mds_fid2dentry(mds, rec->ur_fid1, NULL);
if (!de || IS_ERR(de)) {
- LBUG();
- GOTO(out_setattr_de, rc = -ESTALE);
+ GOTO(out_setattr_de, rc = PTR_ERR(de));
}
}
inode = de->d_inode;
desc = ptlrpc_prep_bulk(connection);
if (!desc)
- GOTO(out_req, rc = -ENOMEM);
+ GOTO(out_req, rc = -ENOMEM);
desc->bd_portal = OSC_BULK_PORTAL;
desc->bd_ptl_ev_hdlr = osc_ptl_ev_hdlr;
CDEBUG(D_PAGE, "desc = %p\n", desc);
ENTRY;
init_waitqueue_head(&req->rq_wait_for_rep);
+ req->rq_reqmsg->status = HTON__u32(current->pid); /* for distributed debugging */
+ CDEBUG(D_RPCTRACE, "Sending RPC pid:xid:nid:opc %d:"
+ LPX64":%x:%d\n",
+ NTOH__u32(req->rq_reqmsg->status),
+ req->rq_xid,
+ conn->c_peer.peer_nid,
+ NTOH__u32(req->rq_reqmsg->opc)
+ );
+
//DEBUG_REQ(D_HA, req, "subsys: %s:", cli->cli_name);
/* XXX probably both an import and connection level are needed */
*
*/
-#define DEBUG_SUBSYSTEM S_CLASS
+#define DEBUG_SUBSYSTEM S_RPC
#include <linux/obd_support.h>
#include <linux/lustre_net.h>
goto out;
}
+ CDEBUG(D_RPCTRACE, "Handling RPC pid:xid:nid:opc %d:"
+ LPX64":%x:%d\n",
+ NTOH__u32(request->rq_reqmsg->status),
+ request->rq_xid,
+ event->initiator.nid,
+ NTOH__u32(request->rq_reqmsg->opc));
+
if (NTOH__u32(request->rq_reqmsg->type) != PTL_RPC_MSG_REQUEST) {
CERROR("wrong packet type received (type=%u)\n",
request->rq_reqmsg->type);
my $mount_count = shift || usage();
my $i = shift || usage();
my $files = 5;
-my $mcreate = 0; # should we use mcreate or open?
+my $mcreate = 1; # should we use mcreate or open?
sub usage () {
print "Usage: $0 <mount point prefix> <mount count> <iterations>\n";
if ($mcreate) {
my $tmp = `./mcreate $path`;
if ($tmp) {
+ print "Creating [" . $$."]...\n";
$tmp =~ /.*error: (.*)\n/;
- print "Created $path: $1\n";
+ print "Create done [$$] $path: $!\n";
} else {
- print "Created $path: Success\n";
+ print "Create done [$$] $path: Success\n";
}
} else {
+ print "Opening [" . $$."]...\n";
open(FH, ">$path") || die "open($PATH): $!";
- print "Opened $path: Success\n";
+ print "Open done [$$] $path: Success\n";
close(FH) || die;
}
}
}
$d = int(rand() * $files);
$path = "$mtpt$which/$d";
+ print "Unlink start [" . $$."]...\n";
if (unlink($path)) {
- print "Unlinked $path: Success\n";
+ print "Unlink done [$$] $path: Success\n";
} else {
- print "Unlinked $path: $!\n";
+ print "Unlink done [$$] $path: $!\n";
}
}
print "Done.\n";
MDSSIZE=100000
OSTDEV=$TMP/ost1
-OSTSIZE=250000
+OSTSIZE=400000
kver=`uname -r | cut -d "." -f 1,2`
;;
esac
+
# create nodes
${LMC} -o $config --node localhost --net localhost tcp || exit 1
${LMC} -m $config --format --node localhost $FSTYPE --ost $OSTDEV $OSTSIZE || exit 3
# create client config
-${LMC} -m $config --node localhost --mtpt /mnt/lustre mds1 OSC_localhost || exit 4
+${LMC} -m $config --node localhost --mtpt /mnt/lustre1 mds1 OSC_localhost || exit 4
${LMC} -m $config --node localhost --mtpt /mnt/lustre2 mds1 OSC_localhost || exit 4
#!/usr/bin/perl
use Getopt::Long;
+use Term::ANSIColor;
+
GetOptions("pid=i" => \$pid,
"trace!" => \$trace,
+ "silent!" => \$silent,
+ "rpctrace!" => \$rpctrace,
"nodlm!" => \$nodlm,
+ "noclass!" => \$noclass,
"nonet!" => \$nonet);
print "pid: $pid, nodlm $nodlm nonet $nonet trace $trace\n";
+
$subsys->{UNDEFINED} = 0;
$subsys->{MDC} = 1;
$subsys->{MDS} = 2;
$masks->{ERROR} = 1 << 17; # /* CERROR} = ...) == CDEBUG} = D_ERROR, ...) */
$masks->{EMERG} = 1 << 18; # /* CEMERG} = ...) == CDEBUG} = D_EMERG, ...) */
$masks->{HA} = 1 << 19; # /* recovery and failover */
-
-
+$masks->{RPCTRACE} = 1 << 19; # /* recovery and failover */
sub extractpid
{
return hex($mask);
}
+sub setcolor
+{
+ my $linemask = shift;
+ if ($linemask == $masks->{TRACE}) {
+ print color("yellow on_black");
+ }
+ if ($linemask == $masks->{DLMTRACE}) {
+ print color("magenta on_black");
+ }
+ if ($linemask == $masks->{DLM}) {
+ print color("magenta on_black");
+ }
+ if ($linemask == $masks->{DENTRY}) {
+ print color("red on_black");
+ }
+}
+
+sub study_lock
+{
+ $_ = shift;
+ my $rc;
+
+ $rc = /completion callback handler START ns: (.*) lock: (.*) lrc: (.*) mode/;
+ if ($rc) {
+ $completion_callbacks{$1}->{$2} = $3;
+# print color("white");
+# print "---CP CB START: $1 $2 $3\n";
+# print color("reset");
+ }
+ $rc = /callback handler finished.* ns: (.*) lock: (.*) lrc: (.*) mode/;
+ if ($rc) {
+# print color("white");
+# print "---CP CB END: $1 $2 $3 deleting $completion_callbacks{$1}->{$2}\n";
+# print color("reset");
+ delete $completion_callbacks{$1}->{$2};
+ }
+
+ if ($rc) {
+ $rc = /client blocking AST callback handler START ns: (.*) lock: (.*) lrc: (.*) mode/;
+ $blocking_callbacks{$1}->{$2} = $3;
+# print color("white");
+# print "---BL CB START: $1 $2\n";
+# print color("reset");
+ }
+ $rc = /client blocking callback handler END ns: (.*) lock: (.*) lrc: (.*) mode/;
+ if ($rc) {
+# print color("white");
+# print "---BL CB END: $1 $2 $3 deleting $blocking_callbacks{$1}->{$2}\n";
+# print color("reset");
+ delete $blocking_callbacks{$1}->{$2};
+ }
+
+ $rc = /ldlm_lock_addref.*ns: (.*) lock: (.*) lrc: (.*) mode/;
+# print color("white");
+# print "------>addref ns: $1 lock: $2 lrc: $3\n" if ($rc);
+# print color("reset");
+ $locks{$1}->{$2} = {$3} if ($rc);
+ $rc = /ldlm_lock_decref.*ns: (.*) lock: (.*) lrc: (.*) mode/;
+# print color("white");
+# print "------>decref ns: $1 lock: $2 lrc: $3\n" if ($rc);
+# print color("reset");
+ $locks{$1}->{$2} = {$3} if ($rc);
+}
+
+sub hanging_locks
+{
+ my $found;
+ my $ns;
+
+ foreach (keys %completion_callbacks) {
+ $ns = $_;
+ $found = 0;
+ foreach (keys %{$completion_callbacks{$ns}}) {
+ if (!$found) {
+ print "Unfinished completions in ns $ns: \n";
+ $found =1;
+ }
+ print " lock: $_ lrc: $completion_callbacks{$ns}->{$_}\n";
+ }
+ }
+ foreach (keys %blocking_callbacks) {
+ $ns = $_;
+ $found = 0;
+ foreach (keys %{$blocking_callbacks{$ns}}) {
+ if (!$found) {
+ print "Unfinished blocking in ns $ns: \n";
+ $found =1;
+ }
+ printf(" lock: $_ lrc: %s\n", $blocking_callbacks{$ns}->{$_});
+ }
+ }
+
+}
+
+sub study_intent
+{
+ $_ = shift;
+ my $rc;
+
+ $rc = /D_IT UP dentry (.*) fsdata/;
+ delete $it{$1} if ($rc);
+ $rc = /D_IT DOWN dentry (.*) fsdata/;
+ $it{$1} = "yes" if ($rc);
+}
+
+sub unmatched_intents {
+ my $found;
+ foreach (keys %it) {
+ if (!$found) {
+ print "Unmatched intents: \n";
+ $found =1;
+ }
+ print " $_\n";
+ }
+}
+
while (<STDIN>) {
$linepid = extractpid($_);
$linemask = getmask($_);
chop $prefix->{$linepid};
chop $prefix->{$linepid};
}
+
+ if ($linemask == $masks->{DENTRY}) {
+ study_intent($_);
+ }
+ if ($linemask == $masks->{DLMTRACE}) {
+ study_lock($_);
+ }
+
if ( !$pid || $linepid == $pid) {
+ next if ($rpctrace && $linemask != $masks->{RPCTRACE});
next if ($trace && $linemask != $masks->{TRACE});
+
+
next if ($nodlm &&
( $linesubsys == $subsys->{LDLM}));
+ next if ($noclass &&
+ ( $linesubsys == $subsys->{CLASS}));
+
next if ($nonet &&
( $linesubsys == $subsys->{RPC} ||
$linesubsys == $subsys->{NET} ||
$linesubsys == $subsys->{QSWNAL} ||
$linesubsys == $subsys->{GMNAL}));
-# printf "sub/mask: %s - %s\n", getsubsys($_), getmask($_);
- printf "%s%s", $prefix->{$linepid}, $_;
- last if $count++ > 100;
+# printf "sub/mask: %s - %s\n", getsubsys($_), getmask($_);
+ if (!$silent) {
+ setcolor($linemask);
+ printf("%s%s", $prefix->{$linepid}, $_);
+ print color("reset");
+ }
+ # last if $count++ > 100;
}
if (entering($_)) {
$prefix->{$linepid} .= ' ';
}
}
+
+unmatched_intents();
+hanging_locks();
# printf "argv %s pid %d\n", $ARGV[0], extractpid($ARGV[0]);
} else {
printf("Try interactive use without arguments or use one of:\n");
for (cmd = cmds; cmd->pc_name; cmd++)
- printf("\"%s\" ", cmd->pc_name);
- printf("\nas argument.\n");
+ printf("\"%s\"\n", cmd->pc_name);
+ printf("as argument.\n");
}
return -1;
}