int size, offset = offsetof(struct llog_create_locks, lcl_locks);
int i;
ENTRY;
-
+
for (i = 0; i < lcl->lcl_count; i ++) {
if (lcl->lcl_locks[i] != NULL) {
#ifdef __KERNEL__
+ unsigned type = de->file_type;
+ __u32 *mds;
+ mds = (__u32 *)((char *) de + EXT3_DIR_REC_LEN(de->name_len));
-+ if ((type & 128) && EXT3_HAS_INCOMPAT_FEATURE(dir->i_sb,
++ if ((type & 128) && EXT3_HAS_INCOMPAT_FEATURE(dir->i_sb,
+ EXT3_FEATURE_INCOMPAT_MDSNUM) &&
+ mds[0] != EXT3_SB(dir->i_sb)->s_mdsnum) {
+ struct ext3_super_block *es;
char *top;
reclen = EXT3_DIR_REC_LEN(namelen);
-+ if (EXT3_HAS_INCOMPAT_FEATURE(sb, EXT3_FEATURE_INCOMPAT_MDSNUM)
++ if (EXT3_HAS_INCOMPAT_FEATURE(sb, EXT3_FEATURE_INCOMPAT_MDSNUM)
+ && (dentry->d_flags & DCACHE_CROSS_REF)
+ && (dentry->d_mdsnum != EXT3_SB(sb)->s_mdsnum))
+ reclen += 8; /* we need space to store mds num */
static struct list_head ldlm_flock_waitq = LIST_HEAD_INIT(ldlm_flock_waitq);
+int ldlm_flock_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
+ void *data, int flag);
+
/**
* list_for_remaining_safe - iterate over the remaining entries in a list
* and safeguard against removal of a list entry.
LDLM_DEBUG(lock, "ldlm_flock_destroy(mode: %d, flags: 0x%x)",
mode, flags);
+ LASSERT(list_empty(&lock->l_flock_waitq));
+
list_del_init(&lock->l_res_link);
if (flags == LDLM_FL_WAIT_NOREPROC) {
/* client side - set a flag to prevent sending a CANCEL */
struct ldlm_lock *new = req;
struct ldlm_lock *new2 = NULL;
ldlm_mode_t mode = req->l_req_mode;
+ int local = ns->ns_client;
int added = (mode == LCK_NL);
int overlaps = 0;
ENTRY;
*err = ELDLM_OK;
- /* No blocking ASTs are sent for Posix file & record locks */
- req->l_blocking_ast = NULL;
+ if (local) {
+ /* No blocking ASTs are sent to the clients for
+ * Posix file & record locks */
+ req->l_blocking_ast = NULL;
+ } else {
+ /* Called on the server for lock cancels. */
+ req->l_blocking_ast = ldlm_flock_blocking_ast;
+ }
if ((*flags == LDLM_FL_WAIT_NOREPROC) || (mode == LCK_NL)) {
/* This loop determines where this process's locks start
RETURN(LDLM_ITER_STOP);
}
+ /* In case we had slept on this lock request take it off of the
+ * deadlock detection waitq. */
+ list_del_init(&req->l_flock_waitq);
+
/* Scan the locks owned by this process that overlap this request.
* We may have to merge or split existing locks. */
ENTRY;
lock = ((struct ldlm_flock_wait_data *)data)->fwd_lock;
+
+ /* take lock off the deadlock detection waitq. */
+ list_del_init(&lock->l_flock_waitq);
+
ldlm_lock_decref_internal(lock, lock->l_req_mode);
ldlm_lock2handle(lock, &lockh);
rc = ldlm_cli_cancel(&lockh);
- CDEBUG(D_DLMTRACE, "ldlm_cli_cancel: %d\n", rc);
EXIT;
}
LDLM_DEBUG(lock, "client-side enqueue returned a blocked lock, "
"sleeping");
- ldlm_lock_dump(D_OTHER, lock, 0);
+ ldlm_lock_dump(D_DLMTRACE, lock, 0);
fwd.fwd_lock = lock;
obd = class_exp2obd(lock->l_conn_export);
ns = lock->l_resource->lr_namespace;
l_lock(&ns->ns_lock);
- /* take data off of deadlock detection waitq. */
+ /* take lock off the deadlock detection waitq. */
list_del_init(&lock->l_flock_waitq);
/* ldlm_lock_enqueue() has already placed lock on the granted list. */
l_unlock(&ns->ns_lock);
RETURN(0);
}
+
+/**
+ * Blocking AST for server-side flock locks.
+ *
+ * Only the cancel path is expected here (asserted below).  The lock is
+ * unlinked from the flock deadlock-detection waitq under the namespace
+ * lock so the deadlock detector never walks a cancelled lock.
+ *
+ * \param lock  the lock being cancelled; must be non-NULL
+ * \param desc  unused by this handler
+ * \param data  unused by this handler
+ * \param flag  must be LDLM_CB_CANCELING
+ * \retval 0 always
+ */
+int ldlm_flock_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
+ void *data, int flag)
+{
+ struct ldlm_namespace *ns;
+ ENTRY;
+
+ LASSERT(lock);
+ LASSERT(flag == LDLM_CB_CANCELING);
+
+ ns = lock->l_resource->lr_namespace;
+
+ /* take lock off the deadlock detection waitq. */
+ l_lock(&ns->ns_lock);
+ list_del_init(&lock->l_flock_waitq);
+ l_unlock(&ns->ns_lock);
+ RETURN(0);
+}
struct ldlm_reply *reply;
int rc, size[2] = {sizeof(*body), lvb_len}, req_passed_in = 1;
int is_replay = *flags & LDLM_FL_REPLAY;
+ int cleanup_phase = 0;
ENTRY;
if (exp == NULL) {
lock = ldlm_lock_create(ns, NULL, res_id, type, mode, blocking,
completion, glimpse, data, lvb_len);
if (lock == NULL)
- GOTO(out_nolock, rc = -ENOMEM);
+ RETURN(-ENOMEM);
/* for the local lock, add the reference */
ldlm_lock_addref_internal(lock, mode);
ldlm_lock2handle(lock, lockh);
LDLM_DEBUG(lock, "client-side enqueue START");
}
+ /* lock not sent to server yet */
+ cleanup_phase = 2;
+
if (req == NULL) {
req = ptlrpc_prep_req(class_exp2cliimp(exp), LDLM_ENQUEUE, 1,
size, NULL);
if (req == NULL)
- GOTO(out_lock, rc = -ENOMEM);
+ GOTO(cleanup, rc = -ENOMEM);
req_passed_in = 0;
} else if (req->rq_reqmsg->buflens[0] != sizeof(*body))
LBUG();
tmplvb = lustre_swab_repbuf(req, 1, lvb_len,
lvb_swabber);
if (tmplvb == NULL)
- GOTO(out_lock, rc = -EPROTO);
+ GOTO(cleanup, rc = -EPROTO);
if (lvb != NULL)
memcpy(lvb, tmplvb, lvb_len);
}
}
- GOTO(out_lock, rc);
+ GOTO(cleanup, rc);
}
reply = lustre_swab_repbuf(req, 0, sizeof(*reply),
lustre_swab_ldlm_reply);
if (reply == NULL) {
CERROR("Can't unpack ldlm_reply\n");
- GOTO(out_lock, rc = -EPROTO);
+ GOTO(cleanup, rc = -EPROTO);
}
+ /* XXX - Phil, wasn't sure if this should go before or after the
+ * lustre_swab_repbuf() ? If we can't unpack the reply then we
+ * don't know what occurred on the server so I think the safest
+ * bet is to cleanup the lock as if it didn't make it ? */
+
+ /* lock enqueued on the server */
+ cleanup_phase = 1;
+
memcpy(&lock->l_remote_handle, &reply->lock_handle,
sizeof(lock->l_remote_handle));
*flags = reply->lock_flags;
reply->lock_desc.l_resource.lr_name);
if (lock->l_resource == NULL) {
LBUG();
- GOTO(out_lock, rc = -ENOMEM);
+ GOTO(cleanup, rc = -ENOMEM);
}
LDLM_DEBUG(lock, "client-side enqueue, new resource");
}
void *tmplvb;
tmplvb = lustre_swab_repbuf(req, 1, lvb_len, lvb_swabber);
if (tmplvb == NULL)
- GOTO(out_lock, rc = -EPROTO);
+ GOTO(cleanup, rc = -EPROTO);
memcpy(lock->l_lvb_data, tmplvb, lvb_len);
}
LDLM_DEBUG(lock, "client-side enqueue END");
EXIT;
- out_lock:
- if (rc)
- failed_lock_cleanup(ns, lock, lockh, mode);
- if (!req_passed_in && req != NULL)
- ptlrpc_req_finished(req);
+cleanup:
+ switch (cleanup_phase) {
+ case 2:
+ if (rc)
+ failed_lock_cleanup(ns, lock, lockh, mode);
+ case 1:
+ if (!req_passed_in && req != NULL)
+ ptlrpc_req_finished(req);
+ }
+
LDLM_LOCK_PUT(lock);
- out_nolock:
return rc;
}
switch (cmd) {
case F_SETLKW:
+#ifdef F_SETLKW64
+ case F_SETLKW64:
+#endif
flags = 0;
break;
case F_SETLK:
+#ifdef F_SETLK64
+ case F_SETLK64:
+#endif
flags = LDLM_FL_BLOCK_NOWAIT;
break;
case F_GETLK:
+#ifdef F_GETLK64
+ case F_GETLK64:
+#endif
flags = LDLM_FL_TEST_LOCK;
/* Save the old mode so that if the mode in the lock changes we
* can decrement the appropriate reader or writer refcount. */
flags, mode, flock.l_flock.start, flock.l_flock.end);
obddev = md_get_real_obd(sbi->ll_mdc_exp, NULL, 0);
- rc = ldlm_cli_enqueue(sbi->ll_mdc_exp, NULL, obddev->obd_namespace,
+ rc = ldlm_cli_enqueue(obddev->obd_self_export, NULL,
+ obddev->obd_namespace,
res_id, LDLM_FLOCK, &flock, mode, &flags,
NULL, ldlm_flock_completion_ast, NULL, file_lock,
NULL, 0, NULL, &lockh);
.sendfile = generic_file_sendfile,
#endif
.fsync = ll_fsync,
- //.lock ll_file_flock
+ .lock = ll_file_flock
};
struct inode_operations ll_file_inode_operations = {
lmv_put_obj(obj);
}
mds = rpfid.mds;
-
+
CDEBUG(D_OTHER, "revalidate lookup for %lu/%lu/%lu to %d MDS\n",
(unsigned long)cfid->mds, (unsigned long)cfid->id,
(unsigned long)cfid->generation, mds);
-
+
rc = md_intent_lock(lmv->tgts[mds].ltd_exp, uctxt, pfid, name,
len, lmm, lmmsize, cfid, it, flags,
reqp, cb_blocking);
down(&lov->lov_llog_sem);
for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++,loi++) {
struct obd_device *child =
- lov->tgts[loi->loi_ost_idx].ltd_exp->exp_obd;
+ lov->tgts[loi->loi_ost_idx].ltd_exp->exp_obd;
struct llog_ctxt *cctxt;
cctxt = llog_get_context(&child->obd_llogs, ctxt->loc_idx);
cathandle = ctxt->loc_handle;
if (cathandle)
llog_cat_put(ctxt->loc_handle);
-
-// OBD_FREE(ctxt, sizeof(*ctxt));
+
return 0;
}
EXPORT_SYMBOL(llog_catalog_cleanup);
OBD_ALLOC(oa, sizeof(*oa));
if (!oa)
RETURN(ERR_PTR(-ENOMEM));
-
+
oa->o_gr = FILTER_GROUP_LLOG;
oa->o_valid = OBD_MD_FLGENER | OBD_MD_FLGROUP;
rc = obd_create(ctxt->loc_exp, oa, NULL, NULL);
struct mea *mea = NULL;
int mea_size, rc = 0;
ENTRY;
-
+
rc = mds_get_lmv_attr(obd, inode, &mea, &mea_size);
if (rc)
RETURN(rc);
rc = -ERESTART;
}
}
-
+
if (mea)
OBD_FREE(mea, mea_size);
RETURN(rc);
down(&parent_inode->i_sem);
if (oa->o_id) {
namelen = ll_fid2str(fidname, oa->o_id, oa->o_generation);
-
+
dchild = lookup_one_len(fidname, mds->mds_objects_dir, namelen);
if (IS_ERR(dchild))
GOTO(out_pop, rc = PTR_ERR(dchild));
*mea = NULL;
} else
rc = 0;
-
+
RETURN(rc);
}
if (req != NULL && req->rq_repmsg != NULL &&
(reply_body->valid & OBD_MD_FLEASIZE) &&
mds_log_op_unlink(obd, pending_child->d_inode,
- lmm, req->rq_repmsg->buflens[1],
- lustre_msg_buf(req->rq_repmsg, 2, 0),
- req->rq_repmsg->buflens[2], &lcl) > 0) {
+ lmm, req->rq_repmsg->buflens[1],
+ lustre_msg_buf(req->rq_repmsg, 2, 0),
+ req->rq_repmsg->buflens[2], &lcl) > 0) {
reply_body->valid |= OBD_MD_FLCOOKIE;
}
rec->ur_namelen, &child_lockh,
&dchild, LCK_EX,
MDS_INODELOCK_LOOKUP |
- MDS_INODELOCK_UPDATE, NULL);
+ MDS_INODELOCK_UPDATE, NULL);
}
if (rc)
GOTO(cleanup, rc);
spin_unlock(&oscc->oscc_lock);
osc_invalid = oscc->oscc_obd->u.cli.cl_import->imp_invalid;
-
+
return have_objs || ost_full || osc_invalid;
}