From dbc47c4f0d0b8db9e4cfb63c502650d6a566ffa2 Mon Sep 17 00:00:00 2001 From: nikita Date: Thu, 29 Sep 2005 12:58:24 +0000 Subject: [PATCH] Latest OST-side locking with connection flags. b=7311 r=adiler --- lustre/ChangeLog | 8 +++ lustre/include/linux/lustre_dlm.h | 5 +- lustre/include/linux/lustre_lib.h | 3 +- lustre/include/linux/obd_class.h | 16 +++--- lustre/ldlm/ldlm_lib.c | 31 +++++++---- lustre/ldlm/ldlm_lockd.c | 2 + lustre/ldlm/ldlm_request.c | 80 ++++++++++++++++++++++++++++ lustre/ldlm/ldlm_resource.c | 1 + lustre/liblustre/llite_lib.h | 5 +- lustre/liblustre/rw.c | 3 ++ lustre/liblustre/super.c | 13 +++-- lustre/llite/llite_internal.h | 1 + lustre/llite/llite_lib.c | 1 + lustre/lov/lov_obd.c | 14 ++++- lustre/mds/handler.c | 47 +---------------- lustre/mds/mds_open.c | 4 +- lustre/mds/mds_reint.c | 14 ++--- lustre/obdfilter/filter.c | 108 +++++++++++++++----------------------- lustre/obdfilter/filter_io.c | 7 +-- lustre/osc/osc_request.c | 5 +- lustre/ost/ost_handler.c | 30 +++++++---- lustre/ptlrpc/events.c | 63 ++++++++++++---------- lustre/ptlrpc/import.c | 26 +++++++-- 23 files changed, 288 insertions(+), 199 deletions(-) diff --git a/lustre/ChangeLog b/lustre/ChangeLog index 3113a39..c82d9ee 100644 --- a/lustre/ChangeLog +++ b/lustre/ChangeLog @@ -2,6 +2,14 @@ tbd Cluster File Systems, Inc. * version 1.4.6 * bug fixes +Severity : minor +Frequency : liblustre-only, when liblustre client dies or becomes busy +Bugzilla : 7311 +Description: Doing ls on Linux clients can take a long time with active + liblustre clients +Details : Newer more complete fix for 7311 issue: add connection flags + handling. 
+ Severity : major Frequency : rare Bugzilla : 7407 diff --git a/lustre/include/linux/lustre_dlm.h b/lustre/include/linux/lustre_dlm.h index 959ec4d..72f285c 100644 --- a/lustre/include/linux/lustre_dlm.h +++ b/lustre/include/linux/lustre_dlm.h @@ -522,6 +522,9 @@ int ldlm_lock_change_resource(struct ldlm_namespace *, struct ldlm_lock *, /* ldlm_request.c */ int ldlm_expired_completion_wait(void *data); +int ldlm_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc, + void *data, int flag); +int ldlm_glimpse_ast(struct ldlm_lock *lock, void *reqp); int ldlm_completion_ast(struct ldlm_lock *lock, int flags, void *data); int ldlm_cli_enqueue(struct obd_export *exp, struct ptlrpc_request *req, @@ -552,8 +555,6 @@ int ldlm_cli_join_lru(struct ldlm_namespace *, struct ldlm_res_id *, /* This has to be here because recursive inclusion sucks. */ int intent_disposition(struct ldlm_reply *rep, int flag); void intent_set_disposition(struct ldlm_reply *rep, int flag); -int mds_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc, - void *data, int flag); /* ioctls for trying requests */ diff --git a/lustre/include/linux/lustre_lib.h b/lustre/include/linux/lustre_lib.h index 4519f97..a2f54b0 100644 --- a/lustre/include/linux/lustre_lib.h +++ b/lustre/include/linux/lustre_lib.h @@ -321,7 +321,7 @@ static inline int obd_ioctl_getdata(char **buf, int *len, void *arg) ENTRY; err = copy_from_user(&hdr, (void *)arg, sizeof(hdr)); - if (err) + if (err) RETURN(err); if (hdr.ioc_version != OBD_IOCTL_VERSION) { @@ -725,5 +725,6 @@ do { \ #define LIBLUSTRE_CLIENT (1) #endif + #endif /* _LUSTRE_LIB_H */ diff --git a/lustre/include/linux/obd_class.h b/lustre/include/linux/obd_class.h index 95c9d40..70839ab 100644 --- a/lustre/include/linux/obd_class.h +++ b/lustre/include/linux/obd_class.h @@ -64,19 +64,19 @@ int class_name2dev(char *name); struct obd_device *class_name2obd(char *name); int class_uuid2dev(struct obd_uuid *uuid); struct obd_device 
*class_uuid2obd(struct obd_uuid *uuid); -struct obd_device * class_find_client_obd(struct obd_uuid *tgt_uuid, +struct obd_device * class_find_client_obd(struct obd_uuid *tgt_uuid, char * typ_name, struct obd_uuid *grp_uuid); struct obd_device * class_find_client_notype(struct obd_uuid *tgt_uuid, struct obd_uuid *grp_uuid); -struct obd_device * class_devices_in_group(struct obd_uuid *grp_uuid, +struct obd_device * class_devices_in_group(struct obd_uuid *grp_uuid, int *next); int oig_init(struct obd_io_group **oig); void oig_add_one(struct obd_io_group *oig, struct oig_callback_context *occ); -void oig_complete_one(struct obd_io_group *oig, - struct oig_callback_context *occ, int rc); +void oig_complete_one(struct obd_io_group *oig, + struct oig_callback_context *occ, int rc); void oig_release(struct obd_io_group *oig); int oig_wait(struct obd_io_group *oig); /* ping evictor */ @@ -533,16 +533,20 @@ static inline int obd_del_conn(struct obd_import *imp, struct obd_uuid *uuid) static inline int obd_connect(struct lustre_handle *conn, struct obd_device *obd, struct obd_uuid *cluuid, - struct obd_connect_data *data) + struct obd_connect_data *d) { int rc; + __u64 ocf = d ? 
d->ocd_connect_flags : 0; /* for post-condition check */ ENTRY; OBD_CHECK_DEV_ACTIVE(obd); OBD_CHECK_OP(obd, connect, -EOPNOTSUPP); OBD_COUNTER_INCREMENT(obd, connect); - rc = OBP(obd, connect)(conn, obd, cluuid, data); + rc = OBP(obd, connect)(conn, obd, cluuid, d); + /* check that only subset is granted */ + LASSERT(ergo(d != NULL, + (d->ocd_connect_flags & ocf) == d->ocd_connect_flags)); RETURN(rc); } diff --git a/lustre/ldlm/ldlm_lib.c b/lustre/ldlm/ldlm_lib.c index 684cfdc..82a7244 100644 --- a/lustre/ldlm/ldlm_lib.c +++ b/lustre/ldlm/ldlm_lib.c @@ -95,7 +95,7 @@ static int import_set_conn(struct obd_import *imp, struct obd_uuid *uuid, } else { spin_unlock(&imp->imp_lock); GOTO(out_free, rc = -ENOENT); - + } spin_unlock(&imp->imp_lock); @@ -348,6 +348,7 @@ int client_connect_import(struct lustre_handle *dlm_handle, struct client_obd *cli = &obd->u.cli; struct obd_import *imp = cli->cl_import; struct obd_export *exp; + struct obd_connect_data *ocd; int rc; ENTRY; @@ -373,8 +374,10 @@ int client_connect_import(struct lustre_handle *dlm_handle, if (rc != 0) GOTO(out_ldlm, rc); + ocd = &imp->imp_connect_data; if (data) - memcpy(&imp->imp_connect_data, data, sizeof(*data)); + *ocd = *data; + rc = ptlrpc_connect_import(imp, NULL); if (rc != 0) { LASSERT (imp->imp_state == LUSTRE_IMP_DISCON); @@ -382,6 +385,12 @@ int client_connect_import(struct lustre_handle *dlm_handle, } LASSERT(exp->exp_connection); + if (data) { + LASSERT((ocd->ocd_connect_flags & data->ocd_connect_flags) == + ocd->ocd_connect_flags); + data->ocd_connect_flags = ocd->ocd_connect_flags; + } + ptlrpc_pinger_add_import(imp); EXIT; @@ -655,7 +664,7 @@ int target_handle_connect(struct ptlrpc_request *req, svc_handler_t handler) spin_lock_irqsave(&export->exp_lock, flags); if (export->exp_conn_cnt >= req->rq_reqmsg->conn_cnt) { CERROR("%s: already connected at a higher conn_cnt: %d > %d\n", - cluuid.uuid, export->exp_conn_cnt, + cluuid.uuid, export->exp_conn_cnt, req->rq_reqmsg->conn_cnt); 
spin_unlock_irqrestore(&export->exp_lock, flags); GOTO(out, rc = -EALREADY); @@ -805,14 +814,14 @@ static void abort_recovery_queue(struct obd_device *obd) } } -/* Called from a cleanup function if the device is being cleaned up - forcefully. The exports should all have been disconnected already, - the only thing left to do is +/* Called from a cleanup function if the device is being cleaned up + forcefully. The exports should all have been disconnected already, + the only thing left to do is - clear the recovery flags - cancel the timer - free queued requests and replies, but don't send replies Because the obd_stopping flag is set, no new requests should be received. - + */ void target_cleanup_recovery(struct obd_device *obd) { @@ -1223,7 +1232,7 @@ target_send_reply_msg (struct ptlrpc_request *req, int rc, int fail_id) return (ptlrpc_send_reply(req, 1)); } -void +void target_send_reply(struct ptlrpc_request *req, int rc, int fail_id) { int netrc; @@ -1267,12 +1276,12 @@ target_send_reply(struct ptlrpc_request *req, int rc, int fail_id) rs->rs_xid = req->rq_xid; rs->rs_transno = req->rq_transno; rs->rs_export = exp; - + spin_lock_irqsave (&obd->obd_uncommitted_replies_lock, flags); if (rs->rs_transno > obd->obd_last_committed) { - /* not committed already */ - list_add_tail (&rs->rs_obd_list, + /* not committed already */ + list_add_tail (&rs->rs_obd_list, &obd->obd_uncommitted_replies); } diff --git a/lustre/ldlm/ldlm_lockd.c b/lustre/ldlm/ldlm_lockd.c index e7463e4..7f1221f 100644 --- a/lustre/ldlm/ldlm_lockd.c +++ b/lustre/ldlm/ldlm_lockd.c @@ -1651,6 +1651,8 @@ EXPORT_SYMBOL(ldlm_lock_allow_match); /* ldlm_request.c */ EXPORT_SYMBOL(ldlm_completion_ast); +EXPORT_SYMBOL(ldlm_blocking_ast); +EXPORT_SYMBOL(ldlm_glimpse_ast); EXPORT_SYMBOL(ldlm_expired_completion_wait); EXPORT_SYMBOL(ldlm_cli_convert); EXPORT_SYMBOL(ldlm_cli_enqueue); diff --git a/lustre/ldlm/ldlm_request.c b/lustre/ldlm/ldlm_request.c index 5eb65a4..4d8f98e 100644 --- a/lustre/ldlm/ldlm_request.c 
+++ b/lustre/ldlm/ldlm_request.c @@ -151,6 +151,78 @@ noreproc: RETURN(0); } +/* + * ->l_blocking_ast() callback for LDLM locks acquired by server-side OBDs. + */ +int ldlm_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc, + void *data, int flag) +{ + int do_ast; + ENTRY; + + if (flag == LDLM_CB_CANCELING) { + /* Don't need to do anything here. */ + RETURN(0); + } + + l_lock(&lock->l_resource->lr_namespace->ns_lock); + /* Get this: if ldlm_blocking_ast is racing with intent_policy, such + * that ldlm_blocking_ast is called just before intent_policy method + * takes the ns_lock, then by the time we get the lock, we might not + * be the correct blocking function anymore. So check, and return + * early, if so. */ + if (lock->l_blocking_ast != ldlm_blocking_ast) { + l_unlock(&lock->l_resource->lr_namespace->ns_lock); + RETURN(0); + } + + lock->l_flags |= LDLM_FL_CBPENDING; + do_ast = (!lock->l_readers && !lock->l_writers); + l_unlock(&lock->l_resource->lr_namespace->ns_lock); + + if (do_ast) { + struct lustre_handle lockh; + int rc; + + LDLM_DEBUG(lock, "already unused, calling ldlm_cli_cancel"); + ldlm_lock2handle(lock, &lockh); + rc = ldlm_cli_cancel(&lockh); + if (rc < 0) + CERROR("ldlm_cli_cancel: %d\n", rc); + } else { + LDLM_DEBUG(lock, "Lock still has references, will be " + "cancelled later"); + } + RETURN(0); +} + +/* + * ->l_glimpse_ast() for DLM extent locks acquired on the server-side. See + * comment in filter_intent_policy() on why you may need this. + */ +int ldlm_glimpse_ast(struct ldlm_lock *lock, void *reqp) +{ + /* + * Returning -ELDLM_NO_LOCK_DATA actually works, but the reason for + * that is rather subtle: with OST-side locking, it may so happen that + * _all_ extent locks are held by the OST. If client wants to obtain + * current file size it calls ll{,u}_glimpse_size(), and (as locks are + * on the server), dummy glimpse callback fires and does + * nothing. 
Client still receives correct file size due to the + * following fragment in filter_intent_policy(): + * + * rc = l->l_glimpse_ast(l, NULL); // this will update the LVB + * if (rc != 0 && res->lr_namespace->ns_lvbo && + * res->lr_namespace->ns_lvbo->lvbo_update) { + * res->lr_namespace->ns_lvbo->lvbo_update(res, NULL, 0, 1); + * } + * + * that is, after glimpse_ast() fails, filter_lvbo_update() runs, and + * returns correct file size to the client. + */ + return -ELDLM_NO_LOCK_DATA; +} + static int ldlm_cli_enqueue_local(struct ldlm_namespace *ns, struct ldlm_res_id res_id, __u32 type, @@ -348,6 +420,14 @@ int ldlm_cli_enqueue(struct obd_export *exp, GOTO(cleanup, rc); } + /* + * Liblustre client doesn't get extent locks, except for O_APPEND case + * where [0, OBD_OBJECT_EOF] lock is taken. + */ + LASSERT(ergo(LIBLUSTRE_CLIENT, type != LDLM_EXTENT || + (policy->l_extent.start == 0 && + policy->l_extent.end == OBD_OBJECT_EOF))); + reply = lustre_swab_repbuf(req, 0, sizeof(*reply), lustre_swab_ldlm_reply); if (reply == NULL) { diff --git a/lustre/ldlm/ldlm_resource.c b/lustre/ldlm/ldlm_resource.c index 6664101..72e3230 100644 --- a/lustre/ldlm/ldlm_resource.c +++ b/lustre/ldlm/ldlm_resource.c @@ -683,6 +683,7 @@ void ldlm_resource_unlink_lock(struct ldlm_lock *lock) list_del_init(&lock->l_res_link); l_unlock(&lock->l_resource->lr_namespace->ns_lock); } +EXPORT_SYMBOL(ldlm_resource_unlink_lock); void ldlm_res2desc(struct ldlm_resource *res, struct ldlm_resource_desc *desc) { diff --git a/lustre/liblustre/llite_lib.h b/lustre/liblustre/llite_lib.h index 3cfc29f..349548b 100644 --- a/lustre/liblustre/llite_lib.h +++ b/lustre/liblustre/llite_lib.h @@ -30,11 +30,12 @@ struct llu_sb_info struct obd_export *ll_osc_exp; obd_id ll_rootino; int ll_flags; + __u64 ll_connect_flags; struct list_head ll_conn_chain; struct obd_uuid ll_mds_uuid; struct obd_uuid ll_mds_peer_uuid; - char *ll_instance; + char *ll_instance; }; #define LL_SBI_NOLCK 0x1 @@ -149,7 +150,7 @@ struct 
it_cb_data { void ll_i2gids(__u32 *suppgids, struct inode *i1,struct inode *i2); typedef int (*intent_finish_cb)(struct ptlrpc_request *, - struct inode *parent, struct pnode *pnode, + struct inode *parent, struct pnode *pnode, struct lookup_intent *, int offset, obd_id ino); int llu_intent_lock(struct inode *parent, struct pnode *pnode, struct lookup_intent *, int flags, intent_finish_cb); diff --git a/lustre/liblustre/rw.c b/lustre/liblustre/rw.c index bba4c91..39a0800 100644 --- a/lustre/liblustre/rw.c +++ b/lustre/liblustre/rw.c @@ -676,6 +676,9 @@ ssize_t llu_file_prwv(const struct iovec *iovec, int iovlen, } LASSERT(len == 0 || is_read); /* libsysio should guarantee this */ + /* + * BUG: lock is released too early. Fix is in bug 9296. + */ err = llu_extent_unlock(fd, inode, lsm, p.lrp_lock_mode, &lockh); if (err) CERROR("extent unlock error %d\n", err); diff --git a/lustre/liblustre/super.c b/lustre/liblustre/super.c index 290ef5e..f32d98e 100644 --- a/lustre/liblustre/super.c +++ b/lustre/liblustre/super.c @@ -238,7 +238,7 @@ void obdo_from_inode(struct obdo *dst, struct inode *src, obd_flag valid) if (valid & (OBD_MD_FLCTIME | OBD_MD_FLMTIME)) CDEBUG(D_INODE, "valid %x, new time %lu/%lu\n", - valid, LTIME_S(st->st_mtime), + valid, LTIME_S(st->st_mtime), LTIME_S(st->st_ctime)); if (valid & OBD_MD_FLATIME) { @@ -329,7 +329,7 @@ int llu_inode_getattr(struct inode *inode, struct lov_stripe_md *lsm) if (rc) RETURN(rc); - refresh_valid = OBD_MD_FLBLOCKS | OBD_MD_FLBLKSZ | OBD_MD_FLMTIME | + refresh_valid = OBD_MD_FLBLOCKS | OBD_MD_FLBLKSZ | OBD_MD_FLMTIME | OBD_MD_FLCTIME | OBD_MD_FLSIZE; obdo_refresh_inode(inode, &oa, refresh_valid); @@ -781,7 +781,7 @@ static int llu_iop_setattr(struct pnode *pno, liblustre_wait_event(0); - LASSERT(!(mask & ~(SETATTR_MTIME | SETATTR_ATIME | + LASSERT(!(mask & ~(SETATTR_MTIME | SETATTR_ATIME | SETATTR_UID | SETATTR_GID | SETATTR_LEN | SETATTR_MODE))); memset(&iattr, 0, sizeof(iattr)); @@ -1635,7 +1635,7 @@ struct inode 
*llu_iget(struct filesys *fs, struct lustre_md *md) inode = llu_new_inode(fs, &fid); if (inode) llu_update_inode(inode, md->body, md->lsm); - + return inode; } @@ -1667,6 +1667,7 @@ llu_fsswop_mount(const char *source, char *zconf_mdsnid, *zconf_mdsname, *zconf_profile; char *osc = NULL, *mdc = NULL; int async = 1, err = -EINVAL; + struct obd_connect_data ocd = {0,}; ENTRY; @@ -1763,12 +1764,14 @@ llu_fsswop_mount(const char *source, obd_set_info(obd->obd_self_export, strlen("async"), "async", sizeof(async), &async); - err = obd_connect(&osc_conn, obd, &sbi->ll_sb_uuid, NULL /* ocd */); + ocd.ocd_connect_flags |= OBD_CONNECT_SRVLOCK; + err = obd_connect(&osc_conn, obd, &sbi->ll_sb_uuid, &ocd); if (err) { CERROR("cannot connect to %s: rc = %d\n", osc, err); GOTO(out_mdc, err); } sbi->ll_osc_exp = class_conn2export(&osc_conn); + sbi->ll_connect_flags = ocd.ocd_connect_flags; mdc_init_ea_size(sbi->ll_mdc_exp, sbi->ll_osc_exp); diff --git a/lustre/llite/llite_internal.h b/lustre/llite/llite_internal.h index 6ade334..3ca8df3 100644 --- a/lustre/llite/llite_internal.h +++ b/lustre/llite/llite_internal.h @@ -136,6 +136,7 @@ struct ll_sb_info { int ll_flags; struct list_head ll_conn_chain; /* per-conn chain of SBs */ + __u64 ll_connect_flags; struct hlist_head ll_orphan_dentry_list; /*please don't ask -p*/ struct ll_close_queue *ll_lcq; diff --git a/lustre/llite/llite_lib.c b/lustre/llite/llite_lib.c index 1f9ed45..8fb9a24 100644 --- a/lustre/llite/llite_lib.c +++ b/lustre/llite/llite_lib.c @@ -212,6 +212,7 @@ int lustre_common_fill_super(struct super_block *sb, char *mdc, char *osc) GOTO(out_mdc, err); } sbi->ll_osc_exp = class_conn2export(&osc_conn); + sbi->ll_connect_flags = data->ocd_connect_flags; mdc_init_ea_size(sbi->ll_mdc_exp, sbi->ll_osc_exp); diff --git a/lustre/lov/lov_obd.c b/lustre/lov/lov_obd.c index 1d98c64..9913e50 100644 --- a/lustre/lov/lov_obd.c +++ b/lustre/lov/lov_obd.c @@ -65,6 +65,7 @@ static int lov_connect_obd(struct obd_device *obd, struct 
lov_tgt_desc *tgt, struct obd_device *tgt_obd; struct obd_uuid lov_osc_uuid = { "LOV_OSC_UUID" }; struct lustre_handle conn = {0, }; + struct obd_import *imp; #ifdef __KERNEL__ struct proc_dir_entry *lov_proc_dir; #endif @@ -89,7 +90,12 @@ static int lov_connect_obd(struct obd_device *obd, struct lov_tgt_desc *tgt, ptlrpc_activate_import(tgt_obd->u.cli.cl_import); } - if (tgt_obd->u.cli.cl_import->imp_invalid) { + /* + * Divine LOV knows that OBDs under it are OSCs. + */ + imp = tgt_obd->u.cli.cl_import; + + if (imp->imp_invalid) { CERROR("not connecting OSC %s; administratively " "disabled\n", tgt_uuid->uuid); rc = obd_register_observer(tgt_obd, obd); @@ -155,6 +161,7 @@ static int lov_connect(struct lustre_handle *conn, struct obd_device *obd, struct lov_obd *lov = &obd->u.lov; struct lov_tgt_desc *tgt; struct obd_export *exp; + __u64 connect_flags = data ? data->ocd_connect_flags : 0; int rc, rc2, i; ENTRY; @@ -178,8 +185,13 @@ static int lov_connect(struct lustre_handle *conn, struct obd_device *obd, rc = lov_connect_obd(obd, tgt, 0, data); if (rc) GOTO(out_disc, rc); + if (data) + connect_flags &= data->ocd_connect_flags; } + if (data) + data->ocd_connect_flags = connect_flags; + class_export_put(exp); RETURN (0); diff --git a/lustre/mds/handler.c b/lustre/mds/handler.c index ac61f2f..173932f 100644 --- a/lustre/mds/handler.c +++ b/lustre/mds/handler.c @@ -178,8 +178,8 @@ struct dentry *mds_fid2locked_dentry(struct obd_device *obd, struct ll_fid *fid, res_id.name[1] = de->d_inode->i_generation; rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace, res_id, LDLM_PLAIN, NULL, lock_mode, &flags, - mds_blocking_ast, ldlm_completion_ast, NULL, NULL, - NULL, 0, NULL, lockh); + ldlm_blocking_ast, ldlm_completion_ast, + NULL, NULL, NULL, 0, NULL, lockh); if (rc != ELDLM_OK) { l_dput(de); retval = ERR_PTR(-EIO); /* XXX translate ldlm code */ @@ -436,49 +436,6 @@ static int mds_getstatus(struct ptlrpc_request *req) RETURN(0); } -int mds_blocking_ast(struct ldlm_lock 
*lock, struct ldlm_lock_desc *desc, - void *data, int flag) -{ - int do_ast; - ENTRY; - - if (flag == LDLM_CB_CANCELING) { - /* Don't need to do anything here. */ - RETURN(0); - } - - /* XXX layering violation! -phil */ - l_lock(&lock->l_resource->lr_namespace->ns_lock); - /* Get this: if mds_blocking_ast is racing with mds_intent_policy, - * such that mds_blocking_ast is called just before l_i_p takes the - * ns_lock, then by the time we get the lock, we might not be the - * correct blocking function anymore. So check, and return early, if - * so. */ - if (lock->l_blocking_ast != mds_blocking_ast) { - l_unlock(&lock->l_resource->lr_namespace->ns_lock); - RETURN(0); - } - - lock->l_flags |= LDLM_FL_CBPENDING; - do_ast = (!lock->l_readers && !lock->l_writers); - l_unlock(&lock->l_resource->lr_namespace->ns_lock); - - if (do_ast) { - struct lustre_handle lockh; - int rc; - - LDLM_DEBUG(lock, "already unused, calling ldlm_cli_cancel"); - ldlm_lock2handle(lock, &lockh); - rc = ldlm_cli_cancel(&lockh); - if (rc < 0) - CERROR("ldlm_cli_cancel: %d\n", rc); - } else { - LDLM_DEBUG(lock, "Lock still has references, will be " - "cancelled later"); - } - RETURN(0); -} - int mds_get_md(struct obd_device *obd, struct inode *inode, void *md, int *size, int lock) { diff --git a/lustre/mds/mds_open.c b/lustre/mds/mds_open.c index 4a4cdd0..f4c25bc 100644 --- a/lustre/mds/mds_open.c +++ b/lustre/mds/mds_open.c @@ -796,8 +796,8 @@ int mds_lock_new_child(struct obd_device *obd, struct inode *inode, rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace, child_res_id, LDLM_PLAIN, NULL, LCK_EX, &lock_flags, - mds_blocking_ast, ldlm_completion_ast, NULL, NULL, - NULL, 0, NULL, child_lockh); + ldlm_blocking_ast, ldlm_completion_ast, + NULL, NULL, NULL, 0, NULL, child_lockh); if (rc != ELDLM_OK) CERROR("ldlm_cli_enqueue: %d\n", rc); else if (child_lockh == &lockh) diff --git a/lustre/mds/mds_reint.c b/lustre/mds/mds_reint.c index 67a162a..5cce652 100644 --- a/lustre/mds/mds_reint.c +++ 
b/lustre/mds/mds_reint.c @@ -977,8 +977,8 @@ int enqueue_ordered_locks(struct obd_device *obd, struct ldlm_res_id *p1_res_id, rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace, *res_id[0], LDLM_PLAIN, NULL, lock_modes[0], &flags[0], - mds_blocking_ast, ldlm_completion_ast, NULL, NULL, - NULL, 0, NULL, handles[0]); + ldlm_blocking_ast, ldlm_completion_ast, + NULL, NULL, NULL, 0, NULL, handles[0]); if (rc != ELDLM_OK) RETURN(-EIO); ldlm_lock_dump_handle(D_OTHER, handles[0]); @@ -989,9 +989,9 @@ int enqueue_ordered_locks(struct obd_device *obd, struct ldlm_res_id *p1_res_id, } else if (res_id[1]->name[0] != 0) { rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace, *res_id[1], LDLM_PLAIN, NULL, - lock_modes[1], &flags[1],mds_blocking_ast, - ldlm_completion_ast, NULL, NULL, NULL, 0, - NULL, handles[1]); + lock_modes[1], &flags[1], + ldlm_blocking_ast, ldlm_completion_ast, + NULL, NULL, NULL, 0, NULL, handles[1]); if (rc != ELDLM_OK) { ldlm_lock_decref(handles[0], lock_modes[0]); RETURN(-EIO); @@ -1066,7 +1066,7 @@ int enqueue_4ordered_locks(struct obd_device *obd,struct ldlm_res_id *p1_res_id, rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace, *res_id[i], LDLM_PLAIN, NULL, lock_modes[i], &flags, - mds_blocking_ast, + ldlm_blocking_ast, ldlm_completion_ast, NULL, NULL, NULL, 0, NULL, dlm_handles[i]); if (rc != ELDLM_OK) @@ -1151,7 +1151,7 @@ static int mds_verify_child(struct obd_device *obd, rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace, *child_res_id, LDLM_PLAIN, NULL, - child_mode, &flags, mds_blocking_ast, + child_mode, &flags, ldlm_blocking_ast, ldlm_completion_ast, NULL, NULL, NULL, 0, NULL, child_lockh); if (rc != ELDLM_OK) diff --git a/lustre/obdfilter/filter.c b/lustre/obdfilter/filter.c index 616fb4c..3cd6690 100644 --- a/lustre/obdfilter/filter.c +++ b/lustre/obdfilter/filter.c @@ -879,51 +879,6 @@ __u64 filter_last_id(struct filter_obd *filter, struct obdo *oa) return id; } -/* direct cut-n-paste of mds_blocking_ast() */ -static int 
filter_blocking_ast(struct ldlm_lock *lock, - struct ldlm_lock_desc *desc, - void *data, int flag) -{ - int do_ast; - ENTRY; - - if (flag == LDLM_CB_CANCELING) { - /* Don't need to do anything here. */ - RETURN(0); - } - - /* XXX layering violation! -phil */ - l_lock(&lock->l_resource->lr_namespace->ns_lock); - /* Get this: if filter_blocking_ast is racing with ldlm_intent_policy, - * such that filter_blocking_ast is called just before l_i_p takes the - * ns_lock, then by the time we get the lock, we might not be the - * correct blocking function anymore. So check, and return early, if - * so. */ - if (lock->l_blocking_ast != filter_blocking_ast) { - l_unlock(&lock->l_resource->lr_namespace->ns_lock); - RETURN(0); - } - - lock->l_flags |= LDLM_FL_CBPENDING; - do_ast = (!lock->l_readers && !lock->l_writers); - l_unlock(&lock->l_resource->lr_namespace->ns_lock); - - if (do_ast) { - struct lustre_handle lockh; - int rc; - - LDLM_DEBUG(lock, "already unused, calling ldlm_cli_cancel"); - ldlm_lock2handle(lock, &lockh); - rc = ldlm_cli_cancel(&lockh); - if (rc < 0) - CERROR("ldlm_cli_cancel: %d\n", rc); - } else { - LDLM_DEBUG(lock, "Lock still has references, will be " - "cancelled later"); - } - RETURN(0); -} - static int filter_lock_dentry(struct obd_device *obd, struct dentry *dparent) { down(&dparent->d_inode->i_sem); @@ -1039,7 +994,7 @@ static int filter_prepare_destroy(struct obd_device *obd, obd_id objid) * throw away any cached pages. */ rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace, res_id, LDLM_EXTENT, &policy, LCK_PW, - &flags, filter_blocking_ast, ldlm_completion_ast, + &flags, ldlm_blocking_ast, ldlm_completion_ast, NULL, NULL, NULL, 0, NULL, &lockh); /* We only care about the side-effects, just drop the lock. 
*/ @@ -1113,7 +1068,8 @@ static int filter_intent_policy(struct ldlm_namespace *ns, lock->l_policy_data.l_extent.end = OBD_OBJECT_EOF; lock->l_req_mode = LCK_PR; - l_lock(&res->lr_namespace->ns_lock); + LASSERT(ns == res->lr_namespace); + l_lock(&ns->ns_lock); res->lr_tmp = &rpc_list; rc = policy(lock, &tmpflags, 0, &err); @@ -1129,9 +1085,17 @@ static int filter_intent_policy(struct ldlm_namespace *ns, OBD_FREE(w, sizeof(*w)); } + /* The lock met with no resistance; we're finished. */ if (rc == LDLM_ITER_CONTINUE) { - /* The lock met with no resistance; we're finished. */ - l_unlock(&res->lr_namespace->ns_lock); + l_unlock(&ns->ns_lock); + /* + * do not grant locks to the liblustre clients: they cannot + * handle ASTs robustly. + */ + if (lock->l_export->exp_libclient) { + ldlm_resource_unlink_lock(lock); + RETURN(ELDLM_LOCK_ABORTED); + } RETURN(ELDLM_LOCK_REPLACED); } @@ -1151,7 +1115,13 @@ static int filter_intent_policy(struct ldlm_namespace *ns, if (tmplock->l_granted_mode == LCK_PR) continue; - + /* + * ->ns_lock guarantees that no new locks are granted, and, + * therefore, that res->lr_lvb_data cannot increase beyond the + * end of already granted lock. As a result, it is safe to + * check against "stale" reply_lvb->lvb_size value without + * res->lr_lvb_sem. + */ if (tmplock->l_policy_data.l_extent.end <= reply_lvb->lvb_size) continue; @@ -1176,7 +1146,7 @@ static int filter_intent_policy(struct ldlm_namespace *ns, LDLM_LOCK_PUT(l); l = LDLM_LOCK_GET(tmplock); } - l_unlock(&res->lr_namespace->ns_lock); + l_unlock(&ns->ns_lock); /* There were no PW locks beyond the size in the LVB; finished. */ if (l == NULL) { @@ -1191,15 +1161,19 @@ static int filter_intent_policy(struct ldlm_namespace *ns, * * Of course, this will all disappear when we switch to * taking liblustre locks on the OST. 
*/ - if (res->lr_namespace->ns_lvbo && - res->lr_namespace->ns_lvbo->lvbo_update) { - res->lr_namespace->ns_lvbo->lvbo_update - (res, NULL, 0, 1); - } + if (ns->ns_lvbo && ns->ns_lvbo->lvbo_update) + ns->ns_lvbo->lvbo_update(res, NULL, 0, 1); } RETURN(ELDLM_LOCK_ABORTED); } - + /* + * This check is for lock taken in filter_prepare_destroy() that does + * not have l_glimpse_ast set. So the logic is: if there is a lock + * with no l_glimpse_ast set, this object is being destroyed already. + * + * Hence, if you are grabbing DLM locks on the server, always set + * non-NULL glimpse_ast (e.g., ldlm_request.c:ldlm_glimpse_ast()). + */ if (l->l_glimpse_ast == NULL) { /* We are racing with unlink(); just return -ENOENT */ rep->lock_policy_res1 = -ENOENT; @@ -1209,10 +1183,12 @@ static int filter_intent_policy(struct ldlm_namespace *ns, LASSERTF(l->l_glimpse_ast != NULL, "l == %p", l); rc = l->l_glimpse_ast(l, NULL); /* this will update the LVB */ /* Update the LVB from disk if the AST failed (this is a legal race) */ - if (rc != 0 && res->lr_namespace->ns_lvbo && - res->lr_namespace->ns_lvbo->lvbo_update) { - res->lr_namespace->ns_lvbo->lvbo_update(res, NULL, 0, 1); - } + /* + * XXX nikita: situation when ldlm_server_glimpse_ast() failed before + * sending ast is not handled. This can result in lost client writes. 
+ */ + if (rc != 0 && ns->ns_lvbo && ns->ns_lvbo->lvbo_update) + ns->ns_lvbo->lvbo_update(res, NULL, 0, 1); down(&res->lr_lvb_sem); *reply_lvb = *res_lvb; @@ -1627,7 +1603,7 @@ static int filter_cleanup(struct obd_device *obd) unlock_kernel(); must_relock++; } - + mntput(filter->fo_vfsmnt); //destroy_buffers(filter->fo_sb->s_dev); filter->fo_sb = NULL; @@ -2005,7 +1981,7 @@ int filter_setattr(struct obd_export *exp, struct obdo *oa, oa->o_valid = OBD_MD_FLID; /* Quota release need uid/gid info */ - obdo_from_inode(oa, dentry->d_inode, + obdo_from_inode(oa, dentry->d_inode, FILTER_VALID_FLAGS | OBD_MD_FLUID | OBD_MD_FLGID); out_unlock: @@ -2018,7 +1994,7 @@ out_unlock: /* trigger quota release */ if (rc == 0 && iattr.ia_valid & (ATTR_SIZE | ATTR_UID | ATTR_GID)) { - rc2 = qctxt_adjust_qunit(obd, &filter->fo_quota_ctxt, + rc2 = qctxt_adjust_qunit(obd, &filter->fo_quota_ctxt, oa->o_uid, oa->o_gid, 1); if (rc2) CERROR("error filter adjust qunit! (rc:%d)\n", rc2); @@ -2531,7 +2507,7 @@ cleanup: /* trigger quota release */ if (rc == 0) { - rc2 = qctxt_adjust_qunit(obd, &filter->fo_quota_ctxt, + rc2 = qctxt_adjust_qunit(obd, &filter->fo_quota_ctxt, oa->o_uid, oa->o_gid, 1); if (rc2) CERROR("error filter adjust qunit! (rc:%d)\n", rc2); @@ -2753,7 +2729,7 @@ static int filter_health_check(struct obd_device *obd) { struct filter_obd *filter = &obd->u.filter; int rc = 0; - + /* * health_check to return 0 on healthy * and 1 on unhealthy. diff --git a/lustre/obdfilter/filter_io.c b/lustre/obdfilter/filter_io.c index e0c4fa6..0f89961 100644 --- a/lustre/obdfilter/filter_io.c +++ b/lustre/obdfilter/filter_io.c @@ -68,7 +68,7 @@ static void filter_free_dio_pages(int objcount, struct obd_ioobj *obj, for (i = 0; i < objcount; i++, obj++) { for (j = 0 ; j < obj->ioo_bufcnt ; j++, res++) res->page = NULL; - } + } } /* Grab the dirty and seen grant announcements from the incoming obdo. 
@@ -551,9 +551,8 @@ static int filter_preprw_write(int cmd, struct obd_export *exp, struct obdo *oa, if (oa && oa->o_valid & OBD_MD_FLGRANT) { oa->o_grant = filter_grant(exp,oa->o_grant,oa->o_undirty,left); oa->o_valid = OBD_MD_FLGRANT; - } else if (oa) { + } else if (oa) oa->o_valid = 0; - } spin_unlock(&exp->exp_obd->obd_osfs_lock); @@ -660,11 +659,9 @@ int filter_preprw(int cmd, struct obd_export *exp, struct obdo *oa, if (cmd == OBD_BRW_WRITE) return filter_preprw_write(cmd, exp, oa, objcount, obj, niocount, nb, res, oti); - if (cmd == OBD_BRW_READ) return filter_preprw_read(cmd, exp, oa, objcount, obj, niocount, nb, res, oti); - LBUG(); return -EPROTO; } diff --git a/lustre/osc/osc_request.c b/lustre/osc/osc_request.c index 2fbb8db..e9ea9dc 100644 --- a/lustre/osc/osc_request.c +++ b/lustre/osc/osc_request.c @@ -2109,9 +2109,8 @@ static int osc_set_async_flags(struct obd_export *exp, * up by, e.g., ->writepage(). */ LASSERT(!(oap->oap_brw_flags & OBD_BRW_SRVLOCK)); -#ifndef __KERNEL__ - LASSERT(0); /* check that liblustre angels do fear to tread here. */ -#endif + LASSERT(!LIBLUSTRE_CLIENT); /* check that liblustre angels do fear to + * tread here. */ if (cli->cl_import == NULL || cli->cl_import->imp_invalid) RETURN(-EIO); diff --git a/lustre/ost/ost_handler.c b/lustre/ost/ost_handler.c index 4b308e8..09e00e6 100644 --- a/lustre/ost/ost_handler.c +++ b/lustre/ost/ost_handler.c @@ -471,29 +471,41 @@ static int ost_brw_lock_get(int mode, struct obd_export *exp, int nrbufs = obj->ioo_bufcnt; struct ldlm_res_id res_id = { .name = { obj->ioo_id } }; ldlm_policy_data_t policy; + int i; ENTRY; LASSERT(mode == LCK_PR || mode == LCK_PW); - /* - * assertions to add here: - * - * - all niobufs have the same OBD_BRW_SRVLOCK value - * - * - in OST-side locking case, niobufs are contiguous ->offset-wise. 
- */ + + /* EXPENSIVE ASSERTION */ + for (i = 1; i < nrbufs; i ++) + LASSERT((nb[0].flags & OBD_BRW_SRVLOCK) == + (nb[i].flags & OBD_BRW_SRVLOCK)); if (nrbufs == 0 || !(nb[0].flags & OBD_BRW_SRVLOCK)) RETURN(0); + /* EXPENSIVE ASSERTION */ + for (i = 1; i < nrbufs; i ++) + /* + * check that niobufs are contiguous ->offset-wise. Strictly + * speaking, this is not required by the code below. What we + * are trying to assert here is that RPC we are handling was + * sent by a liblustre-style cache-less client rather than by + * usual llite OSC layer that can arbitrarily mix pages from + * different write(2) calls. + */ + LASSERT(nb[i].offset == nb[i - 1].offset + nb[i - 1].len); + policy.l_extent.start = nb[0].offset & CFS_PAGE_MASK; policy.l_extent.end = (nb[nrbufs - 1].offset + nb[nrbufs - 1].len - 1) | ~CFS_PAGE_MASK; RETURN(ldlm_cli_enqueue(NULL, NULL, exp->exp_obd->obd_namespace, res_id, LDLM_EXTENT, &policy, mode, &flags, - ost_blocking_ast, ldlm_completion_ast, - ost_glimpse_ast, NULL, NULL, 0, NULL, lh)); + ldlm_blocking_ast, ldlm_completion_ast, + ldlm_glimpse_ast, + NULL, NULL, 0, NULL, lh)); } static void ost_brw_lock_put(int mode, diff --git a/lustre/ptlrpc/events.c b/lustre/ptlrpc/events.c index 68ef2c3..fba2870 100644 --- a/lustre/ptlrpc/events.c +++ b/lustre/ptlrpc/events.c @@ -29,6 +29,11 @@ #include #else #include +#include +#include + +_syscall0(pid_t,gettid) + #endif #include #include @@ -43,7 +48,7 @@ static void cray_portals_callback(ptl_event_t *ev); struct ptlrpc_ni ptlrpc_interfaces[8]; int ptlrpc_ninterfaces; -/* +/* * Client's outgoing request callback */ void request_out_callback(ptl_event_t *ev) @@ -69,7 +74,7 @@ void request_out_callback(ptl_event_t *ev) spin_lock_irqsave(&req->rq_lock, flags); req->rq_net_err = 1; spin_unlock_irqrestore(&req->rq_lock, flags); - + ptlrpc_wake_client_req(req); } @@ -94,7 +99,7 @@ void reply_in_callback(ptl_event_t *ev) LASSERT (ev->md.start == req->rq_repmsg); LASSERT (ev->offset == 0); LASSERT (ev->mlength 
<= req->rq_replen); - + DEBUG_REQ((ev->ni_fail_type == PTL_NI_OK) ? D_NET : D_ERROR, req, "type %d, status %d", ev->type, ev->ni_fail_type); @@ -117,7 +122,7 @@ void reply_in_callback(ptl_event_t *ev) EXIT; } -/* +/* * Client's bulk has been written/read */ void client_bulk_callback (ptl_event_t *ev) @@ -127,7 +132,7 @@ void client_bulk_callback (ptl_event_t *ev) unsigned long flags; ENTRY; - LASSERT ((desc->bd_type == BULK_PUT_SINK && + LASSERT ((desc->bd_type == BULK_PUT_SINK && ev->type == PTL_EVENT_PUT_END) || (desc->bd_type == BULK_GET_SOURCE && ev->type == PTL_EVENT_GET_END) || @@ -135,7 +140,7 @@ void client_bulk_callback (ptl_event_t *ev) LASSERT (ev->unlinked); CDEBUG((ev->ni_fail_type == PTL_NI_OK) ? D_NET : D_ERROR, - "event type %d, status %d, desc %p\n", + "event type %d, status %d, desc %p\n", ev->type, ev->ni_fail_type, desc); spin_lock_irqsave (&desc->bd_lock, flags); @@ -157,7 +162,7 @@ void client_bulk_callback (ptl_event_t *ev) EXIT; } -/* +/* * Server's incoming request callback */ void request_in_callback(ptl_event_t *ev) @@ -178,7 +183,7 @@ void request_in_callback(ptl_event_t *ev) rqbd->rqbd_buffer + service->srv_buf_size); CDEBUG((ev->ni_fail_type == PTL_OK) ? D_NET : D_ERROR, - "event type %d, status %d, service %s\n", + "event type %d, status %d, service %s\n", ev->type, ev->ni_fail_type, service->srv_name); if (ev->unlinked) { @@ -199,7 +204,7 @@ void request_in_callback(ptl_event_t *ev) if (req == NULL) { CERROR("Can't allocate incoming request descriptor: " "Dropping %s RPC from %s\n", - service->srv_name, + service->srv_name, portals_id2str(srv_ni->sni_ni->pni_number, ev->initiator, str)); return; @@ -316,11 +321,11 @@ void server_bulk_callback (ptl_event_t *ev) ev->type == PTL_EVENT_REPLY_END)); CDEBUG((ev->ni_fail_type == PTL_NI_OK) ? 
D_NET : D_ERROR, - "event type %d, status %d, desc %p\n", + "event type %d, status %d, desc %p\n", ev->type, ev->ni_fail_type, desc); spin_lock_irqsave (&desc->bd_lock, flags); - + if ((ev->type == PTL_EVENT_ACK || ev->type == PTL_EVENT_REPLY_END) && ev->ni_fail_type == PTL_NI_OK) { @@ -354,7 +359,7 @@ static void ptlrpc_master_callback(ptl_event_t *ev) callback == request_in_callback || callback == reply_out_callback || callback == server_bulk_callback); - + callback (ev); } @@ -368,7 +373,7 @@ int ptlrpc_uuid_to_peer (struct obd_uuid *uuid, struct ptlrpc_peer *peer) int rc; ENTRY; - + rc = lustre_uuid_to_peer (uuid->uuid, &peer_nal, &peer_nid); if (rc != 0) @@ -381,7 +386,7 @@ int ptlrpc_uuid_to_peer (struct obd_uuid *uuid, struct ptlrpc_peer *peer) if (pni->pni_number == peer_nal) { #else /* compatible nals but may be from different bridges */ - if (NALID_FROM_IFACE(pni->pni_number) == + if (NALID_FROM_IFACE(pni->pni_number) == NALID_FROM_IFACE(peer_nal)) { #endif peer->peer_id.nid = peer_nid; @@ -402,7 +407,7 @@ void ptlrpc_ni_fini(struct ptlrpc_ni *pni) struct l_wait_info lwi; int rc; int retries; - + /* Wait for the event queue to become idle since there may still be * messages in flight with pending events (i.e. the fire-and-forget * messages == client requests and "non-difficult" server @@ -417,12 +422,12 @@ void ptlrpc_ni_fini(struct ptlrpc_ni *pni) case PTL_OK: PtlNIFini(pni->pni_ni_h); return; - + case PTL_EQ_IN_USE: if (retries != 0) CWARN("Event queue for %s still busy\n", pni->pni_name); - + /* Wait for a bit */ init_waitqueue_head(&waitq); lwi = LWI_TIMEOUT(2*HZ, NULL, NULL); @@ -438,7 +443,7 @@ ptl_pid_t ptl_get_pid(void) ptl_pid_t pid; #ifndef __KERNEL__ - pid = getpid(); + pid = gettid(); # if CRAY_PORTALS /* hack to keep pid in range accepted by ernal */ pid &= 0xFF; @@ -463,7 +468,7 @@ int ptlrpc_ni_init(int number, char *name, struct ptlrpc_ni *pni) /* We're not passing any limits yet... 
*/ rc = PtlNIInit(number, pid, NULL, NULL, &nih); if (rc != PTL_OK && rc != PTL_IFACE_DUP) { - CDEBUG (D_NET, "Can't init network interface %s: %d\n", + CDEBUG (D_NET, "Can't init network interface %s: %d\n", name, rc); return (-ENOENT); } @@ -526,14 +531,14 @@ void * liblustre_register_wait_callback (int (*fn)(void *arg), void *arg) { struct liblustre_wait_callback *llwc; - + OBD_ALLOC(llwc, sizeof(*llwc)); LASSERT (llwc != NULL); - + llwc->llwc_fn = fn; llwc->llwc_arg = arg; list_add_tail(&llwc->llwc_list, &liblustre_wait_callbacks); - + return (llwc); } @@ -541,7 +546,7 @@ void liblustre_deregister_wait_callback (void *opaque) { struct liblustre_wait_callback *llwc = opaque; - + list_del(&llwc->llwc_list); OBD_FREE(llwc, sizeof(*llwc)); } @@ -558,16 +563,16 @@ liblustre_check_events (int timeout) &ev, &i); if (rc == PTL_EQ_EMPTY) RETURN(0); - + LASSERT (rc == PTL_EQ_DROPPED || rc == PTL_OK); - + /* liblustre: no asynch callback so we can't affort to miss any * events... */ if (rc == PTL_EQ_DROPPED) { CERROR ("Dropped an event!!!\n"); abort(); } - + ptlrpc_master_callback (&ev); RETURN(1); } @@ -591,9 +596,9 @@ liblustre_wait_event (int timeout) /* Give all registered callbacks a bite at the cherry */ list_for_each(tmp, &liblustre_wait_callbacks) { - llwc = list_entry(tmp, struct liblustre_wait_callback, + llwc = list_entry(tmp, struct liblustre_wait_callback, llwc_list); - + if (llwc->llwc_fn(llwc->llwc_arg)) found_something = 1; } @@ -684,7 +689,7 @@ int ptlrpc_init_portals(void) return -EIO; } #ifndef __KERNEL__ - liblustre_services_callback = + liblustre_services_callback = liblustre_register_wait_callback(&liblustre_check_services, NULL); #endif return 0; diff --git a/lustre/ptlrpc/import.c b/lustre/ptlrpc/import.c index 25da640..4ea4774 100644 --- a/lustre/ptlrpc/import.c +++ b/lustre/ptlrpc/import.c @@ -102,7 +102,7 @@ static void deuuidify(char *uuid, const char *prefix, char **uuid_start, int *uu if (*uuid_len < strlen(UUID_STR)) return; - + if 
(!strncmp(*uuid_start + *uuid_len - strlen(UUID_STR), UUID_STR, strlen(UUID_STR))) *uuid_len -= strlen(UUID_STR); @@ -132,7 +132,7 @@ int ptlrpc_set_import_discon(struct obd_import *imp) target_len, target_start, ptlrpc_peernid2str(&imp->imp_connection->c_peer, nidbuf), - imp->imp_replayable + imp->imp_replayable ? "wait for recovery to complete" : "fail"); @@ -264,7 +264,7 @@ static int import_select_connection(struct obd_import *imp) RETURN(-EINVAL); } - if (imp->imp_conn_current && + if (imp->imp_conn_current && imp->imp_conn_current->oic_item.next != &imp->imp_conn_list) { imp_conn = list_entry(imp->imp_conn_current->oic_item.next, struct obd_import_conn, oic_item); @@ -443,7 +443,7 @@ static int ptlrpc_connect_interpret(struct ptlrpc_request *request, /* All imports are pingable */ imp->imp_pingable = 1; - + if (aa->pcaa_initial_connect) { if (msg_flags & MSG_CONNECT_REPLAYABLE) { CDEBUG(D_HA, "connected to replayable target: %s\n", @@ -533,7 +533,23 @@ finish: RETURN(0); } } else { + struct obd_connect_data *ocd; + + ocd = lustre_swab_repbuf(request, 0, + sizeof *ocd, lustre_swab_connect); + if (ocd == NULL) { + CERROR("Wrong connect data from server\n"); + rc = -EPROTO; + GOTO(out, rc); + } spin_lock_irqsave(&imp->imp_lock, flags); + /* + * check that server granted subset of flags we asked for. + */ + LASSERT((ocd->ocd_connect_flags & + imp->imp_connect_data.ocd_connect_flags) == + ocd->ocd_connect_flags); + imp->imp_connect_data = *ocd; if (imp->imp_conn_current != NULL) { list_del(&imp->imp_conn_current->oic_item); list_add(&imp->imp_conn_current->oic_item, @@ -576,7 +592,7 @@ static int completed_replay_interpret(struct ptlrpc_request *req, ptlrpc_import_recovery_state_machine(req->rq_import); } else { CDEBUG(D_HA, "%s: LAST_REPLAY message error: %d, " - "reconnecting\n", + "reconnecting\n", req->rq_import->imp_obd->obd_name, req->rq_status); ptlrpc_connect_import(req->rq_import, NULL); } -- 1.8.3.1