* version 1.4.6
* bug fixes
+Severity : minor
+Frequency : liblustre-only; when a liblustre client dies or becomes busy
+Bugzilla : 7311
+Description: Doing ls on Linux clients can take a long time with active
+ liblustre clients
+Details : A newer, more complete fix for bug 7311: add connect-flags
+ handling.
+
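A minimal sketch of the connect-flags handshake this fix introduces (identifiers are taken from the hunks below; conn, obd, and uuid stand in for real call-site arguments):

        struct obd_connect_data ocd = { 0, };
        __u64 requested;
        int rc;

        ocd.ocd_connect_flags |= OBD_CONNECT_SRVLOCK;  /* request a feature */
        requested = ocd.ocd_connect_flags;
        rc = obd_connect(&conn, obd, &uuid, &ocd);     /* server may trim   */
        /* whatever comes back must be a subset of what was asked for */
        LASSERT((ocd.ocd_connect_flags & requested) == ocd.ocd_connect_flags);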
Severity : major
Frequency : rare
Bugzilla : 7407
/* ldlm_request.c */
int ldlm_expired_completion_wait(void *data);
+int ldlm_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
+ void *data, int flag);
+int ldlm_glimpse_ast(struct ldlm_lock *lock, void *reqp);
int ldlm_completion_ast(struct ldlm_lock *lock, int flags, void *data);
int ldlm_cli_enqueue(struct obd_export *exp,
struct ptlrpc_request *req,
/* This has to be here because recursive inclusion sucks. */
int intent_disposition(struct ldlm_reply *rep, int flag);
void intent_set_disposition(struct ldlm_reply *rep, int flag);
-int mds_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
- void *data, int flag);
/* ioctls for trying requests */
ENTRY;
err = copy_from_user(&hdr, (void *)arg, sizeof(hdr));
- if (err)
+ if (err)
RETURN(err);
if (hdr.ioc_version != OBD_IOCTL_VERSION) {
#define LIBLUSTRE_CLIENT (1)
#endif
+
#endif /* _LUSTRE_LIB_H */
struct obd_device *class_name2obd(char *name);
int class_uuid2dev(struct obd_uuid *uuid);
struct obd_device *class_uuid2obd(struct obd_uuid *uuid);
-struct obd_device * class_find_client_obd(struct obd_uuid *tgt_uuid,
+struct obd_device * class_find_client_obd(struct obd_uuid *tgt_uuid,
char * typ_name,
struct obd_uuid *grp_uuid);
struct obd_device * class_find_client_notype(struct obd_uuid *tgt_uuid,
struct obd_uuid *grp_uuid);
-struct obd_device * class_devices_in_group(struct obd_uuid *grp_uuid,
+struct obd_device * class_devices_in_group(struct obd_uuid *grp_uuid,
int *next);
int oig_init(struct obd_io_group **oig);
void oig_add_one(struct obd_io_group *oig,
struct oig_callback_context *occ);
-void oig_complete_one(struct obd_io_group *oig,
- struct oig_callback_context *occ, int rc);
+void oig_complete_one(struct obd_io_group *oig,
+ struct oig_callback_context *occ, int rc);
void oig_release(struct obd_io_group *oig);
int oig_wait(struct obd_io_group *oig);
/* ping evictor */
static inline int obd_connect(struct lustre_handle *conn, struct obd_device *obd,
struct obd_uuid *cluuid,
- struct obd_connect_data *data)
+ struct obd_connect_data *d)
{
int rc;
+ __u64 ocf = d ? d->ocd_connect_flags : 0; /* for post-condition check */
ENTRY;
OBD_CHECK_DEV_ACTIVE(obd);
OBD_CHECK_OP(obd, connect, -EOPNOTSUPP);
OBD_COUNTER_INCREMENT(obd, connect);
- rc = OBP(obd, connect)(conn, obd, cluuid, data);
+ rc = OBP(obd, connect)(conn, obd, cluuid, d);
+ /* check that only a subset is granted */
+ LASSERT(ergo(d != NULL,
+ (d->ocd_connect_flags & ocf) == d->ocd_connect_flags));
RETURN(rc);
}
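For reference, ergo() is Lustre's logical-implication helper (assuming the stock libcfs definition below), so the post-condition above reads: if connect data was passed in, the server granted only a subset of the requested flags.

#define ergo(a, b) (!(a) || (b))        /* "(a) implies (b)" */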
} else {
spin_unlock(&imp->imp_lock);
GOTO(out_free, rc = -ENOENT);
-
+
}
spin_unlock(&imp->imp_lock);
struct client_obd *cli = &obd->u.cli;
struct obd_import *imp = cli->cl_import;
struct obd_export *exp;
+ struct obd_connect_data *ocd;
int rc;
ENTRY;
if (rc != 0)
GOTO(out_ldlm, rc);
+ ocd = &imp->imp_connect_data;
if (data)
- memcpy(&imp->imp_connect_data, data, sizeof(*data));
+ *ocd = *data;
+
rc = ptlrpc_connect_import(imp, NULL);
if (rc != 0) {
LASSERT (imp->imp_state == LUSTRE_IMP_DISCON);
}
LASSERT(exp->exp_connection);
+ if (data) {
+ LASSERT((ocd->ocd_connect_flags & data->ocd_connect_flags) ==
+ ocd->ocd_connect_flags);
+ data->ocd_connect_flags = ocd->ocd_connect_flags;
+ }
+
ptlrpc_pinger_add_import(imp);
EXIT;
spin_lock_irqsave(&export->exp_lock, flags);
if (export->exp_conn_cnt >= req->rq_reqmsg->conn_cnt) {
CERROR("%s: already connected at a higher conn_cnt: %d > %d\n",
- cluuid.uuid, export->exp_conn_cnt,
+ cluuid.uuid, export->exp_conn_cnt,
req->rq_reqmsg->conn_cnt);
spin_unlock_irqrestore(&export->exp_lock, flags);
GOTO(out, rc = -EALREADY);
}
}
-/* Called from a cleanup function if the device is being cleaned up
- forcefully. The exports should all have been disconnected already,
- the only thing left to do is
+/* Called from a cleanup function if the device is being cleaned up
+ forcefully. The exports should all have been disconnected already,
+ the only thing left to do is
- clear the recovery flags
- cancel the timer
- free queued requests and replies, but don't send replies
Because the obd_stopping flag is set, no new requests should be received.
-
+
*/
void target_cleanup_recovery(struct obd_device *obd)
{
return (ptlrpc_send_reply(req, 1));
}
-void
+void
target_send_reply(struct ptlrpc_request *req, int rc, int fail_id)
{
int netrc;
rs->rs_xid = req->rq_xid;
rs->rs_transno = req->rq_transno;
rs->rs_export = exp;
-
+
spin_lock_irqsave (&obd->obd_uncommitted_replies_lock, flags);
if (rs->rs_transno > obd->obd_last_committed) {
- /* not committed already */
- list_add_tail (&rs->rs_obd_list,
+ /* not committed already */
+ list_add_tail (&rs->rs_obd_list,
&obd->obd_uncommitted_replies);
}
/* ldlm_request.c */
EXPORT_SYMBOL(ldlm_completion_ast);
+EXPORT_SYMBOL(ldlm_blocking_ast);
+EXPORT_SYMBOL(ldlm_glimpse_ast);
EXPORT_SYMBOL(ldlm_expired_completion_wait);
EXPORT_SYMBOL(ldlm_cli_convert);
EXPORT_SYMBOL(ldlm_cli_enqueue);
RETURN(0);
}
+/*
+ * ->l_blocking_ast() callback for LDLM locks acquired by server-side OBDs.
+ */
+int ldlm_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
+ void *data, int flag)
+{
+ int do_ast;
+ ENTRY;
+
+ if (flag == LDLM_CB_CANCELING) {
+ /* Don't need to do anything here. */
+ RETURN(0);
+ }
+
+ l_lock(&lock->l_resource->lr_namespace->ns_lock);
+ /* Get this: if ldlm_blocking_ast is racing with intent_policy, such
+ * that ldlm_blocking_ast is called just before the intent_policy
+ * method takes the ns_lock, then by the time we get the lock, we
+ * might no longer be the correct blocking function. So check, and
+ * return early if so. */
+ if (lock->l_blocking_ast != ldlm_blocking_ast) {
+ l_unlock(&lock->l_resource->lr_namespace->ns_lock);
+ RETURN(0);
+ }
+
+ lock->l_flags |= LDLM_FL_CBPENDING;
+ do_ast = (!lock->l_readers && !lock->l_writers);
+ l_unlock(&lock->l_resource->lr_namespace->ns_lock);
+
+ if (do_ast) {
+ struct lustre_handle lockh;
+ int rc;
+
+ LDLM_DEBUG(lock, "already unused, calling ldlm_cli_cancel");
+ ldlm_lock2handle(lock, &lockh);
+ rc = ldlm_cli_cancel(&lockh);
+ if (rc < 0)
+ CERROR("ldlm_cli_cancel: %d\n", rc);
+ } else {
+ LDLM_DEBUG(lock, "Lock still has references, will be "
+ "cancelled later");
+ }
+ RETURN(0);
+}
+
+/*
+ * ->l_glimpse_ast() for DLM extent locks acquired on the server side. See
+ * the comment in filter_intent_policy() on why you may need this.
+ */
+int ldlm_glimpse_ast(struct ldlm_lock *lock, void *reqp)
+{
+ /*
+ * Returning -ELDLM_NO_LOCK_DATA actually works, but the reason for
+ * that is rather subtle: with OST-side locking, it may so happen that
+ * _all_ extent locks are held by the OST. If the client wants to
+ * obtain the current file size, it calls ll{,u}_glimpse_size(), and
+ * (as the locks are on the server) this dummy glimpse callback fires
+ * and does nothing. The client still receives the correct file size
+ * thanks to the following fragment in filter_intent_policy():
+ * following fragment in filter_intent_policy():
+ *
+ * rc = l->l_glimpse_ast(l, NULL); // this will update the LVB
+ * if (rc != 0 && res->lr_namespace->ns_lvbo &&
+ * res->lr_namespace->ns_lvbo->lvbo_update) {
+ * res->lr_namespace->ns_lvbo->lvbo_update(res, NULL, 0, 1);
+ * }
+ *
+ * that is, after glimpse_ast() fails, filter_lvbo_update() runs and
+ * returns the correct file size to the client.
+ */
+ return -ELDLM_NO_LOCK_DATA;
+}
+
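With both callbacks exported, server-side enqueues throughout the tree can share them instead of carrying per-OBD copies; a sketch mirroring the ost_brw_lock_get() change later in this patch:

        rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace, res_id,
                              LDLM_EXTENT, &policy, LCK_PW, &flags,
                              ldlm_blocking_ast, ldlm_completion_ast,
                              ldlm_glimpse_ast, NULL, NULL, 0, NULL, &lockh);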
static int ldlm_cli_enqueue_local(struct ldlm_namespace *ns,
struct ldlm_res_id res_id,
__u32 type,
GOTO(cleanup, rc);
}
+ /*
+ * A liblustre client doesn't take extent locks, except in the O_APPEND
+ * case, where a [0, OBD_OBJECT_EOF] lock is taken.
+ */
+ LASSERT(ergo(LIBLUSTRE_CLIENT, type != LDLM_EXTENT ||
+ (policy->l_extent.start == 0 &&
+ policy->l_extent.end == OBD_OBJECT_EOF)));
+
reply = lustre_swab_repbuf(req, 0, sizeof(*reply),
lustre_swab_ldlm_reply);
if (reply == NULL) {
list_del_init(&lock->l_res_link);
l_unlock(&lock->l_resource->lr_namespace->ns_lock);
}
+EXPORT_SYMBOL(ldlm_resource_unlink_lock);
void ldlm_res2desc(struct ldlm_resource *res, struct ldlm_resource_desc *desc)
{
struct obd_export *ll_osc_exp;
obd_id ll_rootino;
int ll_flags;
+ __u64 ll_connect_flags;
struct list_head ll_conn_chain;
struct obd_uuid ll_mds_uuid;
struct obd_uuid ll_mds_peer_uuid;
- char *ll_instance;
+ char *ll_instance;
};
#define LL_SBI_NOLCK 0x1
void ll_i2gids(__u32 *suppgids, struct inode *i1,struct inode *i2);
typedef int (*intent_finish_cb)(struct ptlrpc_request *,
- struct inode *parent, struct pnode *pnode,
+ struct inode *parent, struct pnode *pnode,
struct lookup_intent *, int offset, obd_id ino);
int llu_intent_lock(struct inode *parent, struct pnode *pnode,
struct lookup_intent *, int flags, intent_finish_cb);
}
LASSERT(len == 0 || is_read); /* libsysio should guarantee this */
+ /*
+ * BUG: the lock is released too early; the fix is tracked as bug 9296.
+ */
err = llu_extent_unlock(fd, inode, lsm, p.lrp_lock_mode, &lockh);
if (err)
CERROR("extent unlock error %d\n", err);
if (valid & (OBD_MD_FLCTIME | OBD_MD_FLMTIME))
CDEBUG(D_INODE, "valid %x, new time %lu/%lu\n",
- valid, LTIME_S(st->st_mtime),
+ valid, LTIME_S(st->st_mtime),
LTIME_S(st->st_ctime));
if (valid & OBD_MD_FLATIME) {
if (rc)
RETURN(rc);
- refresh_valid = OBD_MD_FLBLOCKS | OBD_MD_FLBLKSZ | OBD_MD_FLMTIME |
+ refresh_valid = OBD_MD_FLBLOCKS | OBD_MD_FLBLKSZ | OBD_MD_FLMTIME |
OBD_MD_FLCTIME | OBD_MD_FLSIZE;
obdo_refresh_inode(inode, &oa, refresh_valid);
liblustre_wait_event(0);
- LASSERT(!(mask & ~(SETATTR_MTIME | SETATTR_ATIME |
+ LASSERT(!(mask & ~(SETATTR_MTIME | SETATTR_ATIME |
SETATTR_UID | SETATTR_GID |
SETATTR_LEN | SETATTR_MODE)));
memset(&iattr, 0, sizeof(iattr));
inode = llu_new_inode(fs, &fid);
if (inode)
llu_update_inode(inode, md->body, md->lsm);
-
+
return inode;
}
char *zconf_mdsnid, *zconf_mdsname, *zconf_profile;
char *osc = NULL, *mdc = NULL;
int async = 1, err = -EINVAL;
+ struct obd_connect_data ocd = {0,};
ENTRY;
obd_set_info(obd->obd_self_export, strlen("async"), "async",
sizeof(async), &async);
- err = obd_connect(&osc_conn, obd, &sbi->ll_sb_uuid, NULL /* ocd */);
+ ocd.ocd_connect_flags |= OBD_CONNECT_SRVLOCK;
+ err = obd_connect(&osc_conn, obd, &sbi->ll_sb_uuid, &ocd);
if (err) {
CERROR("cannot connect to %s: rc = %d\n", osc, err);
GOTO(out_mdc, err);
}
sbi->ll_osc_exp = class_conn2export(&osc_conn);
+ sbi->ll_connect_flags = ocd.ocd_connect_flags;
mdc_init_ea_size(sbi->ll_mdc_exp, sbi->ll_osc_exp);
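The saved ll_connect_flags presumably gate later I/O decisions; a hypothetical use (brw_flags is illustrative; OBD_BRW_SRVLOCK is the per-niobuf flag handled further down in this patch):

        if (sbi->ll_connect_flags & OBD_CONNECT_SRVLOCK)
                /* the OST agreed to take extent locks on our behalf */
                brw_flags |= OBD_BRW_SRVLOCK;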
int ll_flags;
struct list_head ll_conn_chain; /* per-conn chain of SBs */
+ __u64 ll_connect_flags;
struct hlist_head ll_orphan_dentry_list; /*please don't ask -p*/
struct ll_close_queue *ll_lcq;
GOTO(out_mdc, err);
}
sbi->ll_osc_exp = class_conn2export(&osc_conn);
+ sbi->ll_connect_flags = data->ocd_connect_flags;
mdc_init_ea_size(sbi->ll_mdc_exp, sbi->ll_osc_exp);
struct obd_device *tgt_obd;
struct obd_uuid lov_osc_uuid = { "LOV_OSC_UUID" };
struct lustre_handle conn = {0, };
+ struct obd_import *imp;
#ifdef __KERNEL__
struct proc_dir_entry *lov_proc_dir;
#endif
ptlrpc_activate_import(tgt_obd->u.cli.cl_import);
}
- if (tgt_obd->u.cli.cl_import->imp_invalid) {
+ /*
+ * Divine LOV knows that OBDs under it are OSCs.
+ */
+ imp = tgt_obd->u.cli.cl_import;
+
+ if (imp->imp_invalid) {
CERROR("not connecting OSC %s; administratively "
"disabled\n", tgt_uuid->uuid);
rc = obd_register_observer(tgt_obd, obd);
struct lov_obd *lov = &obd->u.lov;
struct lov_tgt_desc *tgt;
struct obd_export *exp;
+ __u64 connect_flags = data ? data->ocd_connect_flags : 0;
int rc, rc2, i;
ENTRY;
rc = lov_connect_obd(obd, tgt, 0, data);
if (rc)
GOTO(out_disc, rc);
+ if (data)
+ connect_flags &= data->ocd_connect_flags;
}
+ if (data)
+ data->ocd_connect_flags = connect_flags;
+
class_export_put(exp);
RETURN (0);
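The flag-intersection pattern above, in isolation (a sketch; the per-target iteration is paraphrased):

        __u64 granted = data ? data->ocd_connect_flags : 0;

        for (i = 0; i < lov->desc.ld_tgt_count; i++, tgt++) {
                rc = lov_connect_obd(obd, tgt, 0, data);
                if (rc)
                        GOTO(out_disc, rc);
                if (data)
                        granted &= data->ocd_connect_flags; /* common subset */
        }
        if (data)
                data->ocd_connect_flags = granted;  /* report result to caller */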
res_id.name[1] = de->d_inode->i_generation;
rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace, res_id,
LDLM_PLAIN, NULL, lock_mode, &flags,
- mds_blocking_ast, ldlm_completion_ast, NULL, NULL,
- NULL, 0, NULL, lockh);
+ ldlm_blocking_ast, ldlm_completion_ast,
+ NULL, NULL, NULL, 0, NULL, lockh);
if (rc != ELDLM_OK) {
l_dput(de);
retval = ERR_PTR(-EIO); /* XXX translate ldlm code */
RETURN(0);
}
-int mds_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
- void *data, int flag)
-{
- int do_ast;
- ENTRY;
-
- if (flag == LDLM_CB_CANCELING) {
- /* Don't need to do anything here. */
- RETURN(0);
- }
-
- /* XXX layering violation! -phil */
- l_lock(&lock->l_resource->lr_namespace->ns_lock);
- /* Get this: if mds_blocking_ast is racing with mds_intent_policy,
- * such that mds_blocking_ast is called just before l_i_p takes the
- * ns_lock, then by the time we get the lock, we might not be the
- * correct blocking function anymore. So check, and return early, if
- * so. */
- if (lock->l_blocking_ast != mds_blocking_ast) {
- l_unlock(&lock->l_resource->lr_namespace->ns_lock);
- RETURN(0);
- }
-
- lock->l_flags |= LDLM_FL_CBPENDING;
- do_ast = (!lock->l_readers && !lock->l_writers);
- l_unlock(&lock->l_resource->lr_namespace->ns_lock);
-
- if (do_ast) {
- struct lustre_handle lockh;
- int rc;
-
- LDLM_DEBUG(lock, "already unused, calling ldlm_cli_cancel");
- ldlm_lock2handle(lock, &lockh);
- rc = ldlm_cli_cancel(&lockh);
- if (rc < 0)
- CERROR("ldlm_cli_cancel: %d\n", rc);
- } else {
- LDLM_DEBUG(lock, "Lock still has references, will be "
- "cancelled later");
- }
- RETURN(0);
-}
-
int mds_get_md(struct obd_device *obd, struct inode *inode, void *md,
int *size, int lock)
{
rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace, child_res_id,
LDLM_PLAIN, NULL, LCK_EX, &lock_flags,
- mds_blocking_ast, ldlm_completion_ast, NULL, NULL,
- NULL, 0, NULL, child_lockh);
+ ldlm_blocking_ast, ldlm_completion_ast,
+ NULL, NULL, NULL, 0, NULL, child_lockh);
if (rc != ELDLM_OK)
CERROR("ldlm_cli_enqueue: %d\n", rc);
else if (child_lockh == &lockh)
rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace, *res_id[0],
LDLM_PLAIN, NULL, lock_modes[0], &flags[0],
- mds_blocking_ast, ldlm_completion_ast, NULL, NULL,
- NULL, 0, NULL, handles[0]);
+ ldlm_blocking_ast, ldlm_completion_ast,
+ NULL, NULL, NULL, 0, NULL, handles[0]);
if (rc != ELDLM_OK)
RETURN(-EIO);
ldlm_lock_dump_handle(D_OTHER, handles[0]);
} else if (res_id[1]->name[0] != 0) {
rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace,
*res_id[1], LDLM_PLAIN, NULL,
- lock_modes[1], &flags[1],mds_blocking_ast,
- ldlm_completion_ast, NULL, NULL, NULL, 0,
- NULL, handles[1]);
+ lock_modes[1], &flags[1],
+ ldlm_blocking_ast, ldlm_completion_ast,
+ NULL, NULL, NULL, 0, NULL, handles[1]);
if (rc != ELDLM_OK) {
ldlm_lock_decref(handles[0], lock_modes[0]);
RETURN(-EIO);
rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace,
*res_id[i], LDLM_PLAIN, NULL,
lock_modes[i], &flags,
- mds_blocking_ast,
+ ldlm_blocking_ast,
ldlm_completion_ast, NULL, NULL,
NULL, 0, NULL, dlm_handles[i]);
if (rc != ELDLM_OK)
rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace,
*child_res_id, LDLM_PLAIN, NULL,
- child_mode, &flags, mds_blocking_ast,
+ child_mode, &flags, ldlm_blocking_ast,
ldlm_completion_ast, NULL, NULL, NULL, 0,
NULL, child_lockh);
if (rc != ELDLM_OK)
return id;
}
-/* direct cut-n-paste of mds_blocking_ast() */
-static int filter_blocking_ast(struct ldlm_lock *lock,
- struct ldlm_lock_desc *desc,
- void *data, int flag)
-{
- int do_ast;
- ENTRY;
-
- if (flag == LDLM_CB_CANCELING) {
- /* Don't need to do anything here. */
- RETURN(0);
- }
-
- /* XXX layering violation! -phil */
- l_lock(&lock->l_resource->lr_namespace->ns_lock);
- /* Get this: if filter_blocking_ast is racing with ldlm_intent_policy,
- * such that filter_blocking_ast is called just before l_i_p takes the
- * ns_lock, then by the time we get the lock, we might not be the
- * correct blocking function anymore. So check, and return early, if
- * so. */
- if (lock->l_blocking_ast != filter_blocking_ast) {
- l_unlock(&lock->l_resource->lr_namespace->ns_lock);
- RETURN(0);
- }
-
- lock->l_flags |= LDLM_FL_CBPENDING;
- do_ast = (!lock->l_readers && !lock->l_writers);
- l_unlock(&lock->l_resource->lr_namespace->ns_lock);
-
- if (do_ast) {
- struct lustre_handle lockh;
- int rc;
-
- LDLM_DEBUG(lock, "already unused, calling ldlm_cli_cancel");
- ldlm_lock2handle(lock, &lockh);
- rc = ldlm_cli_cancel(&lockh);
- if (rc < 0)
- CERROR("ldlm_cli_cancel: %d\n", rc);
- } else {
- LDLM_DEBUG(lock, "Lock still has references, will be "
- "cancelled later");
- }
- RETURN(0);
-}
-
static int filter_lock_dentry(struct obd_device *obd, struct dentry *dparent)
{
down(&dparent->d_inode->i_sem);
* throw away any cached pages. */
rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace, res_id,
LDLM_EXTENT, &policy, LCK_PW,
- &flags, filter_blocking_ast, ldlm_completion_ast,
+ &flags, ldlm_blocking_ast, ldlm_completion_ast,
NULL, NULL, NULL, 0, NULL, &lockh);
/* We only care about the side-effects, just drop the lock. */
lock->l_policy_data.l_extent.end = OBD_OBJECT_EOF;
lock->l_req_mode = LCK_PR;
- l_lock(&res->lr_namespace->ns_lock);
+ LASSERT(ns == res->lr_namespace);
+ l_lock(&ns->ns_lock);
res->lr_tmp = &rpc_list;
rc = policy(lock, &tmpflags, 0, &err);
OBD_FREE(w, sizeof(*w));
}
+ /* The lock met with no resistance; we're finished. */
if (rc == LDLM_ITER_CONTINUE) {
- /* The lock met with no resistance; we're finished. */
- l_unlock(&res->lr_namespace->ns_lock);
+ l_unlock(&ns->ns_lock);
+ /*
+ * do not grant locks to liblustre clients: they cannot
+ * handle ASTs robustly.
+ */
+ if (lock->l_export->exp_libclient) {
+ ldlm_resource_unlink_lock(lock);
+ RETURN(ELDLM_LOCK_ABORTED);
+ }
RETURN(ELDLM_LOCK_REPLACED);
}
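In effect, a liblustre extent enqueue now comes back as ELDLM_LOCK_ABORTED rather than granted; the client-side counterpart is the OBD_CONNECT_SRVLOCK connect flag and the OBD_BRW_SRVLOCK niobuf flag added elsewhere in this patch, under which the OST takes the extent lock itself in ost_brw_lock_get().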
if (tmplock->l_granted_mode == LCK_PR)
continue;
-
+ /*
+ * ->ns_lock guarantees that no new locks are granted, and,
+ * therefore, that res->lr_lvb_data cannot grow beyond the end
+ * of any already granted lock. As a result, it is safe to
+ * check against a "stale" reply_lvb->lvb_size value without
+ * taking res->lr_lvb_sem.
+ */
if (tmplock->l_policy_data.l_extent.end <= reply_lvb->lvb_size)
continue;
LDLM_LOCK_PUT(l);
l = LDLM_LOCK_GET(tmplock);
}
- l_unlock(&res->lr_namespace->ns_lock);
+ l_unlock(&ns->ns_lock);
/* There were no PW locks beyond the size in the LVB; finished. */
if (l == NULL) {
*
* Of course, this will all disappear when we switch to
* taking liblustre locks on the OST. */
- if (res->lr_namespace->ns_lvbo &&
- res->lr_namespace->ns_lvbo->lvbo_update) {
- res->lr_namespace->ns_lvbo->lvbo_update
- (res, NULL, 0, 1);
- }
+ if (ns->ns_lvbo && ns->ns_lvbo->lvbo_update)
+ ns->ns_lvbo->lvbo_update(res, NULL, 0, 1);
}
RETURN(ELDLM_LOCK_ABORTED);
}
-
+ /*
+ * This check is for the lock taken in filter_prepare_destroy(), which
+ * does not have l_glimpse_ast set. The logic is: if a lock has no
+ * l_glimpse_ast set, the object is already being destroyed.
+ *
+ * Hence, if you are grabbing DLM locks on the server, always set
+ * non-NULL glimpse_ast (e.g., ldlm_request.c:ldlm_glimpse_ast()).
+ */
if (l->l_glimpse_ast == NULL) {
/* We are racing with unlink(); just return -ENOENT */
rep->lock_policy_res1 = -ENOENT;
LASSERTF(l->l_glimpse_ast != NULL, "l == %p", l);
rc = l->l_glimpse_ast(l, NULL); /* this will update the LVB */
/* Update the LVB from disk if the AST failed (this is a legal race) */
- if (rc != 0 && res->lr_namespace->ns_lvbo &&
- res->lr_namespace->ns_lvbo->lvbo_update) {
- res->lr_namespace->ns_lvbo->lvbo_update(res, NULL, 0, 1);
- }
+ /*
+ * XXX nikita: the case where ldlm_server_glimpse_ast() fails before
+ * sending the AST is not handled. This can result in lost client writes.
+ */
+ if (rc != 0 && ns->ns_lvbo && ns->ns_lvbo->lvbo_update)
+ ns->ns_lvbo->lvbo_update(res, NULL, 0, 1);
down(&res->lr_lvb_sem);
*reply_lvb = *res_lvb;
unlock_kernel();
must_relock++;
}
-
+
mntput(filter->fo_vfsmnt);
//destroy_buffers(filter->fo_sb->s_dev);
filter->fo_sb = NULL;
oa->o_valid = OBD_MD_FLID;
/* Quota release need uid/gid info */
- obdo_from_inode(oa, dentry->d_inode,
+ obdo_from_inode(oa, dentry->d_inode,
FILTER_VALID_FLAGS | OBD_MD_FLUID | OBD_MD_FLGID);
out_unlock:
/* trigger quota release */
if (rc == 0 && iattr.ia_valid & (ATTR_SIZE | ATTR_UID | ATTR_GID)) {
- rc2 = qctxt_adjust_qunit(obd, &filter->fo_quota_ctxt,
+ rc2 = qctxt_adjust_qunit(obd, &filter->fo_quota_ctxt,
oa->o_uid, oa->o_gid, 1);
if (rc2)
CERROR("error filter adjust qunit! (rc:%d)\n", rc2);
/* trigger quota release */
if (rc == 0) {
- rc2 = qctxt_adjust_qunit(obd, &filter->fo_quota_ctxt,
+ rc2 = qctxt_adjust_qunit(obd, &filter->fo_quota_ctxt,
oa->o_uid, oa->o_gid, 1);
if (rc2)
CERROR("error filter adjust qunit! (rc:%d)\n", rc2);
{
struct filter_obd *filter = &obd->u.filter;
int rc = 0;
-
+
/*
* health_check to return 0 on healthy
* and 1 on unhealthy.
for (i = 0; i < objcount; i++, obj++) {
for (j = 0 ; j < obj->ioo_bufcnt ; j++, res++)
res->page = NULL;
- }
+ }
}
/* Grab the dirty and seen grant announcements from the incoming obdo.
if (oa && oa->o_valid & OBD_MD_FLGRANT) {
oa->o_grant = filter_grant(exp,oa->o_grant,oa->o_undirty,left);
oa->o_valid = OBD_MD_FLGRANT;
- } else if (oa) {
+ } else if (oa)
oa->o_valid = 0;
- }
spin_unlock(&exp->exp_obd->obd_osfs_lock);
if (cmd == OBD_BRW_WRITE)
return filter_preprw_write(cmd, exp, oa, objcount, obj,
niocount, nb, res, oti);
-
if (cmd == OBD_BRW_READ)
return filter_preprw_read(cmd, exp, oa, objcount, obj,
niocount, nb, res, oti);
-
LBUG();
return -EPROTO;
}
* up by, e.g., ->writepage().
*/
LASSERT(!(oap->oap_brw_flags & OBD_BRW_SRVLOCK));
-#ifndef __KERNEL__
- LASSERT(0); /* check that liblustre angels do fear to tread here. */
-#endif
+ LASSERT(!LIBLUSTRE_CLIENT); /* check that liblustre angels do fear to
+ * tread here. */
if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
RETURN(-EIO);
int nrbufs = obj->ioo_bufcnt;
struct ldlm_res_id res_id = { .name = { obj->ioo_id } };
ldlm_policy_data_t policy;
+ int i;
ENTRY;
LASSERT(mode == LCK_PR || mode == LCK_PW);
- /*
- * assertions to add here:
- *
- * - all niobufs have the same OBD_BRW_SRVLOCK value
- *
- * - in OST-side locking case, niobufs are contiguous ->offset-wise.
- */
+
+ /* EXPENSIVE ASSERTION */
+ for (i = 1; i < nrbufs; i++)
+ LASSERT((nb[0].flags & OBD_BRW_SRVLOCK) ==
+ (nb[i].flags & OBD_BRW_SRVLOCK));
if (nrbufs == 0 || !(nb[0].flags & OBD_BRW_SRVLOCK))
RETURN(0);
+ /* EXPENSIVE ASSERTION */
+ for (i = 1; i < nrbufs; i++)
+ /*
+ * check that niobufs are contiguous ->offset-wise. Strictly
+ * speaking, this is not required by the code below. What we
+ * are trying to assert here is that the RPC we are handling was
+ * sent by a liblustre-style cache-less client rather than by the
+ * usual llite OSC layer, which can arbitrarily mix pages from
+ * different write(2) calls.
+ */
+ LASSERT(nb[i].offset == nb[i - 1].offset + nb[i - 1].len);
+
policy.l_extent.start = nb[0].offset & CFS_PAGE_MASK;
policy.l_extent.end = (nb[nrbufs - 1].offset +
nb[nrbufs - 1].len - 1) | ~CFS_PAGE_MASK;
RETURN(ldlm_cli_enqueue(NULL, NULL, exp->exp_obd->obd_namespace,
res_id, LDLM_EXTENT, &policy, mode, &flags,
- ost_blocking_ast, ldlm_completion_ast,
- ost_glimpse_ast, NULL, NULL, 0, NULL, lh));
+ ldlm_blocking_ast, ldlm_completion_ast,
+ ldlm_glimpse_ast,
+ NULL, NULL, 0, NULL, lh));
}
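The extent arithmetic at the end of ost_brw_lock_get() rounds the lock out to whole pages; a worked example assuming 4 KiB pages, i.e. CFS_PAGE_MASK == ~0xfffULL:

        /* an RPC covering bytes 5000..8999 of the object... */
        policy.l_extent.start = 5000 & CFS_PAGE_MASK;   /* = 4096  */
        policy.l_extent.end   = 8999 | ~CFS_PAGE_MASK;  /* = 12287 */
        /* ...locks [4096, 12287]: whole pages, nothing narrower */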
static void ost_brw_lock_put(int mode,
#include <linux/module.h>
#else
#include <liblustre.h>
+#include <sys/types.h>
+#include <linux/unistd.h>
+
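+/* gettid() is used below in place of getpid(), presumably so that each
+ * liblustre thread gets its own portals PID */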
+_syscall0(pid_t,gettid)
+
#endif
#include <linux/obd_class.h>
#include <linux/lustre_net.h>
struct ptlrpc_ni ptlrpc_interfaces[8];
int ptlrpc_ninterfaces;
-/*
+/*
* Client's outgoing request callback
*/
void request_out_callback(ptl_event_t *ev)
spin_lock_irqsave(&req->rq_lock, flags);
req->rq_net_err = 1;
spin_unlock_irqrestore(&req->rq_lock, flags);
-
+
ptlrpc_wake_client_req(req);
}
LASSERT (ev->md.start == req->rq_repmsg);
LASSERT (ev->offset == 0);
LASSERT (ev->mlength <= req->rq_replen);
-
+
DEBUG_REQ((ev->ni_fail_type == PTL_NI_OK) ? D_NET : D_ERROR, req,
"type %d, status %d", ev->type, ev->ni_fail_type);
EXIT;
}
-/*
+/*
* Client's bulk has been written/read
*/
void client_bulk_callback (ptl_event_t *ev)
unsigned long flags;
ENTRY;
- LASSERT ((desc->bd_type == BULK_PUT_SINK &&
+ LASSERT ((desc->bd_type == BULK_PUT_SINK &&
ev->type == PTL_EVENT_PUT_END) ||
(desc->bd_type == BULK_GET_SOURCE &&
ev->type == PTL_EVENT_GET_END) ||
LASSERT (ev->unlinked);
CDEBUG((ev->ni_fail_type == PTL_NI_OK) ? D_NET : D_ERROR,
- "event type %d, status %d, desc %p\n",
+ "event type %d, status %d, desc %p\n",
ev->type, ev->ni_fail_type, desc);
spin_lock_irqsave (&desc->bd_lock, flags);
EXIT;
}
-/*
+/*
* Server's incoming request callback
*/
void request_in_callback(ptl_event_t *ev)
rqbd->rqbd_buffer + service->srv_buf_size);
CDEBUG((ev->ni_fail_type == PTL_OK) ? D_NET : D_ERROR,
- "event type %d, status %d, service %s\n",
+ "event type %d, status %d, service %s\n",
ev->type, ev->ni_fail_type, service->srv_name);
if (ev->unlinked) {
if (req == NULL) {
CERROR("Can't allocate incoming request descriptor: "
"Dropping %s RPC from %s\n",
- service->srv_name,
+ service->srv_name,
portals_id2str(srv_ni->sni_ni->pni_number,
ev->initiator, str));
return;
ev->type == PTL_EVENT_REPLY_END));
CDEBUG((ev->ni_fail_type == PTL_NI_OK) ? D_NET : D_ERROR,
- "event type %d, status %d, desc %p\n",
+ "event type %d, status %d, desc %p\n",
ev->type, ev->ni_fail_type, desc);
spin_lock_irqsave (&desc->bd_lock, flags);
-
+
if ((ev->type == PTL_EVENT_ACK ||
ev->type == PTL_EVENT_REPLY_END) &&
ev->ni_fail_type == PTL_NI_OK) {
callback == request_in_callback ||
callback == reply_out_callback ||
callback == server_bulk_callback);
-
+
callback (ev);
}
int rc;
ENTRY;
-
+
rc = lustre_uuid_to_peer (uuid->uuid, &peer_nal, &peer_nid);
if (rc != 0)
if (pni->pni_number == peer_nal) {
#else
/* compatible nals but may be from different bridges */
- if (NALID_FROM_IFACE(pni->pni_number) ==
+ if (NALID_FROM_IFACE(pni->pni_number) ==
NALID_FROM_IFACE(peer_nal)) {
#endif
peer->peer_id.nid = peer_nid;
struct l_wait_info lwi;
int rc;
int retries;
-
+
/* Wait for the event queue to become idle since there may still be
* messages in flight with pending events (i.e. the fire-and-forget
* messages == client requests and "non-difficult" server
case PTL_OK:
PtlNIFini(pni->pni_ni_h);
return;
-
+
case PTL_EQ_IN_USE:
if (retries != 0)
CWARN("Event queue for %s still busy\n",
pni->pni_name);
-
+
/* Wait for a bit */
init_waitqueue_head(&waitq);
lwi = LWI_TIMEOUT(2*HZ, NULL, NULL);
ptl_pid_t pid;
#ifndef __KERNEL__
- pid = getpid();
+ pid = gettid();
# if CRAY_PORTALS
/* hack to keep pid in the range accepted by the kernel */
pid &= 0xFF;
/* We're not passing any limits yet... */
rc = PtlNIInit(number, pid, NULL, NULL, &nih);
if (rc != PTL_OK && rc != PTL_IFACE_DUP) {
- CDEBUG (D_NET, "Can't init network interface %s: %d\n",
+ CDEBUG (D_NET, "Can't init network interface %s: %d\n",
name, rc);
return (-ENOENT);
}
liblustre_register_wait_callback (int (*fn)(void *arg), void *arg)
{
struct liblustre_wait_callback *llwc;
-
+
OBD_ALLOC(llwc, sizeof(*llwc));
LASSERT (llwc != NULL);
-
+
llwc->llwc_fn = fn;
llwc->llwc_arg = arg;
list_add_tail(&llwc->llwc_list, &liblustre_wait_callbacks);
-
+
return (llwc);
}
liblustre_deregister_wait_callback (void *opaque)
{
struct liblustre_wait_callback *llwc = opaque;
-
+
list_del(&llwc->llwc_list);
OBD_FREE(llwc, sizeof(*llwc));
}
&ev, &i);
if (rc == PTL_EQ_EMPTY)
RETURN(0);
-
+
LASSERT (rc == PTL_EQ_DROPPED || rc == PTL_OK);
-
+
/* liblustre: no async callback, so we can't afford to miss any
* events... */
if (rc == PTL_EQ_DROPPED) {
CERROR ("Dropped an event!!!\n");
abort();
}
-
+
ptlrpc_master_callback (&ev);
RETURN(1);
}
/* Give all registered callbacks a bite at the cherry */
list_for_each(tmp, &liblustre_wait_callbacks) {
- llwc = list_entry(tmp, struct liblustre_wait_callback,
+ llwc = list_entry(tmp, struct liblustre_wait_callback,
llwc_list);
-
+
if (llwc->llwc_fn(llwc->llwc_arg))
found_something = 1;
}
return -EIO;
}
#ifndef __KERNEL__
- liblustre_services_callback =
+ liblustre_services_callback =
liblustre_register_wait_callback(&liblustre_check_services, NULL);
#endif
return 0;
if (*uuid_len < strlen(UUID_STR))
return;
-
+
if (!strncmp(*uuid_start + *uuid_len - strlen(UUID_STR),
UUID_STR, strlen(UUID_STR)))
*uuid_len -= strlen(UUID_STR);
target_len, target_start,
ptlrpc_peernid2str(&imp->imp_connection->c_peer,
nidbuf),
- imp->imp_replayable
+ imp->imp_replayable
? "wait for recovery to complete"
: "fail");
RETURN(-EINVAL);
}
- if (imp->imp_conn_current &&
+ if (imp->imp_conn_current &&
imp->imp_conn_current->oic_item.next != &imp->imp_conn_list) {
imp_conn = list_entry(imp->imp_conn_current->oic_item.next,
struct obd_import_conn, oic_item);
/* All imports are pingable */
imp->imp_pingable = 1;
-
+
if (aa->pcaa_initial_connect) {
if (msg_flags & MSG_CONNECT_REPLAYABLE) {
CDEBUG(D_HA, "connected to replayable target: %s\n",
RETURN(0);
}
} else {
+ struct obd_connect_data *ocd;
+
+ ocd = lustre_swab_repbuf(request, 0,
+ sizeof *ocd, lustre_swab_connect);
+ if (ocd == NULL) {
+ CERROR("Wrong connect data from server\n");
+ rc = -EPROTO;
+ GOTO(out, rc);
+ }
spin_lock_irqsave(&imp->imp_lock, flags);
+ /*
+ * check that the server granted a subset of the flags we asked for.
+ */
+ LASSERT((ocd->ocd_connect_flags &
+ imp->imp_connect_data.ocd_connect_flags) ==
+ ocd->ocd_connect_flags);
+ imp->imp_connect_data = *ocd;
if (imp->imp_conn_current != NULL) {
list_del(&imp->imp_conn_current->oic_item);
list_add(&imp->imp_conn_current->oic_item,
ptlrpc_import_recovery_state_machine(req->rq_import);
} else {
CDEBUG(D_HA, "%s: LAST_REPLAY message error: %d, "
- "reconnecting\n",
+ "reconnecting\n",
req->rq_import->imp_obd->obd_name, req->rq_status);
ptlrpc_connect_import(req->rq_import, NULL);
}