Whamcloud - gitweb
Latest OST-side locking with connection flags.
authornikita <nikita>
Thu, 29 Sep 2005 12:58:24 +0000 (12:58 +0000)
committernikita <nikita>
Thu, 29 Sep 2005 12:58:24 +0000 (12:58 +0000)
b=7311
r=adiler

23 files changed:
lustre/ChangeLog
lustre/include/linux/lustre_dlm.h
lustre/include/linux/lustre_lib.h
lustre/include/linux/obd_class.h
lustre/ldlm/ldlm_lib.c
lustre/ldlm/ldlm_lockd.c
lustre/ldlm/ldlm_request.c
lustre/ldlm/ldlm_resource.c
lustre/liblustre/llite_lib.h
lustre/liblustre/rw.c
lustre/liblustre/super.c
lustre/llite/llite_internal.h
lustre/llite/llite_lib.c
lustre/lov/lov_obd.c
lustre/mds/handler.c
lustre/mds/mds_open.c
lustre/mds/mds_reint.c
lustre/obdfilter/filter.c
lustre/obdfilter/filter_io.c
lustre/osc/osc_request.c
lustre/ost/ost_handler.c
lustre/ptlrpc/events.c
lustre/ptlrpc/import.c

index 3113a39..c82d9ee 100644 (file)
@@ -2,6 +2,14 @@ tbd         Cluster File Systems, Inc. <info@clusterfs.com>
        * version 1.4.6
        * bug fixes
 
+Severity   : minor
+Frequency  : liblustre-only, when liblustre client dies or becomes busy
+Bugzilla   : 7311
+Description: Doing ls on Linux clients can take a long time with active
+            liblustre clients
+Details    : Newer more complete fix for 7311 issue: add connection flags
+            handling.
+
 Severity   : major
 Frequency  : rare
 Bugzilla   : 7407
index 959ec4d..72f285c 100644 (file)
@@ -522,6 +522,9 @@ int ldlm_lock_change_resource(struct ldlm_namespace *, struct ldlm_lock *,
 
 /* ldlm_request.c */
 int ldlm_expired_completion_wait(void *data);
+int ldlm_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
+                      void *data, int flag);
+int ldlm_glimpse_ast(struct ldlm_lock *lock, void *reqp);
 int ldlm_completion_ast(struct ldlm_lock *lock, int flags, void *data);
 int ldlm_cli_enqueue(struct obd_export *exp,
                      struct ptlrpc_request *req,
@@ -552,8 +555,6 @@ int ldlm_cli_join_lru(struct ldlm_namespace *, struct ldlm_res_id *,
 /* This has to be here because recursive inclusion sucks. */
 int intent_disposition(struct ldlm_reply *rep, int flag);
 void intent_set_disposition(struct ldlm_reply *rep, int flag);
-int mds_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
-                     void *data, int flag);
 
 
 /* ioctls for trying requests */
index 4519f97..a2f54b0 100644 (file)
@@ -321,7 +321,7 @@ static inline int obd_ioctl_getdata(char **buf, int *len, void *arg)
         ENTRY;
 
         err = copy_from_user(&hdr, (void *)arg, sizeof(hdr));
-        if (err) 
+        if (err)
                 RETURN(err);
 
         if (hdr.ioc_version != OBD_IOCTL_VERSION) {
@@ -725,5 +725,6 @@ do {                                                                           \
 #define LIBLUSTRE_CLIENT (1)
 #endif
 
+
 #endif /* _LUSTRE_LIB_H */
 
index 95c9d40..70839ab 100644 (file)
@@ -64,19 +64,19 @@ int class_name2dev(char *name);
 struct obd_device *class_name2obd(char *name);
 int class_uuid2dev(struct obd_uuid *uuid);
 struct obd_device *class_uuid2obd(struct obd_uuid *uuid);
-struct obd_device * class_find_client_obd(struct obd_uuid *tgt_uuid, 
+struct obd_device * class_find_client_obd(struct obd_uuid *tgt_uuid,
                                           char * typ_name,
                                           struct obd_uuid *grp_uuid);
 struct obd_device * class_find_client_notype(struct obd_uuid *tgt_uuid,
                                              struct obd_uuid *grp_uuid);
-struct obd_device * class_devices_in_group(struct obd_uuid *grp_uuid, 
+struct obd_device * class_devices_in_group(struct obd_uuid *grp_uuid,
                                            int *next);
 
 int oig_init(struct obd_io_group **oig);
 void oig_add_one(struct obd_io_group *oig,
                   struct oig_callback_context *occ);
-void oig_complete_one(struct obd_io_group *oig, 
-                       struct oig_callback_context *occ, int rc);
+void oig_complete_one(struct obd_io_group *oig,
+                      struct oig_callback_context *occ, int rc);
 void oig_release(struct obd_io_group *oig);
 int oig_wait(struct obd_io_group *oig);
 /* ping evictor */
@@ -533,16 +533,20 @@ static inline int obd_del_conn(struct obd_import *imp, struct obd_uuid *uuid)
 
 static inline int obd_connect(struct lustre_handle *conn, struct obd_device *obd,
                               struct obd_uuid *cluuid,
-                              struct obd_connect_data *data)
+                              struct obd_connect_data *d)
 {
         int rc;
+        __u64 ocf = d ? d->ocd_connect_flags : 0; /* for post-condition check */
         ENTRY;
 
         OBD_CHECK_DEV_ACTIVE(obd);
         OBD_CHECK_OP(obd, connect, -EOPNOTSUPP);
         OBD_COUNTER_INCREMENT(obd, connect);
 
-        rc = OBP(obd, connect)(conn, obd, cluuid, data);
+        rc = OBP(obd, connect)(conn, obd, cluuid, d);
+        /* check that only subset is granted */
+        LASSERT(ergo(d != NULL,
+                     (d->ocd_connect_flags & ocf) == d->ocd_connect_flags));
         RETURN(rc);
 }
 
index 684cfdc..82a7244 100644 (file)
@@ -95,7 +95,7 @@ static int import_set_conn(struct obd_import *imp, struct obd_uuid *uuid,
         } else {
                 spin_unlock(&imp->imp_lock);
                 GOTO(out_free, rc = -ENOENT);
-                
+
         }
 
         spin_unlock(&imp->imp_lock);
@@ -348,6 +348,7 @@ int client_connect_import(struct lustre_handle *dlm_handle,
         struct client_obd *cli = &obd->u.cli;
         struct obd_import *imp = cli->cl_import;
         struct obd_export *exp;
+        struct obd_connect_data *ocd;
         int rc;
         ENTRY;
 
@@ -373,8 +374,10 @@ int client_connect_import(struct lustre_handle *dlm_handle,
         if (rc != 0)
                 GOTO(out_ldlm, rc);
 
+        ocd = &imp->imp_connect_data;
         if (data)
-                memcpy(&imp->imp_connect_data, data, sizeof(*data));
+                *ocd = *data;
+
         rc = ptlrpc_connect_import(imp, NULL);
         if (rc != 0) {
                 LASSERT (imp->imp_state == LUSTRE_IMP_DISCON);
@@ -382,6 +385,12 @@ int client_connect_import(struct lustre_handle *dlm_handle,
         }
         LASSERT(exp->exp_connection);
 
+        if (data) {
+                LASSERT((ocd->ocd_connect_flags & data->ocd_connect_flags) ==
+                        ocd->ocd_connect_flags);
+                data->ocd_connect_flags = ocd->ocd_connect_flags;
+        }
+
         ptlrpc_pinger_add_import(imp);
         EXIT;
 
@@ -655,7 +664,7 @@ int target_handle_connect(struct ptlrpc_request *req, svc_handler_t handler)
         spin_lock_irqsave(&export->exp_lock, flags);
         if (export->exp_conn_cnt >= req->rq_reqmsg->conn_cnt) {
                 CERROR("%s: already connected at a higher conn_cnt: %d > %d\n",
-                       cluuid.uuid, export->exp_conn_cnt, 
+                       cluuid.uuid, export->exp_conn_cnt,
                        req->rq_reqmsg->conn_cnt);
                 spin_unlock_irqrestore(&export->exp_lock, flags);
                 GOTO(out, rc = -EALREADY);
@@ -805,14 +814,14 @@ static void abort_recovery_queue(struct obd_device *obd)
         }
 }
 
-/* Called from a cleanup function if the device is being cleaned up 
-   forcefully.  The exports should all have been disconnected already, 
-   the only thing left to do is 
+/* Called from a cleanup function if the device is being cleaned up
+   forcefully.  The exports should all have been disconnected already,
+   the only thing left to do is
      - clear the recovery flags
      - cancel the timer
      - free queued requests and replies, but don't send replies
    Because the obd_stopping flag is set, no new requests should be received.
-     
+
 */
 void target_cleanup_recovery(struct obd_device *obd)
 {
@@ -1223,7 +1232,7 @@ target_send_reply_msg (struct ptlrpc_request *req, int rc, int fail_id)
         return (ptlrpc_send_reply(req, 1));
 }
 
-void 
+void
 target_send_reply(struct ptlrpc_request *req, int rc, int fail_id)
 {
         int                        netrc;
@@ -1267,12 +1276,12 @@ target_send_reply(struct ptlrpc_request *req, int rc, int fail_id)
         rs->rs_xid       = req->rq_xid;
         rs->rs_transno   = req->rq_transno;
         rs->rs_export    = exp;
-        
+
         spin_lock_irqsave (&obd->obd_uncommitted_replies_lock, flags);
 
         if (rs->rs_transno > obd->obd_last_committed) {
-                /* not committed already */ 
-                list_add_tail (&rs->rs_obd_list, 
+                /* not committed already */
+                list_add_tail (&rs->rs_obd_list,
                                &obd->obd_uncommitted_replies);
         }
 
index e7463e4..7f1221f 100644 (file)
@@ -1651,6 +1651,8 @@ EXPORT_SYMBOL(ldlm_lock_allow_match);
 
 /* ldlm_request.c */
 EXPORT_SYMBOL(ldlm_completion_ast);
+EXPORT_SYMBOL(ldlm_blocking_ast);
+EXPORT_SYMBOL(ldlm_glimpse_ast);
 EXPORT_SYMBOL(ldlm_expired_completion_wait);
 EXPORT_SYMBOL(ldlm_cli_convert);
 EXPORT_SYMBOL(ldlm_cli_enqueue);
index 5eb65a4..4d8f98e 100644 (file)
@@ -151,6 +151,78 @@ noreproc:
         RETURN(0);
 }
 
+/*
+ * ->l_blocking_ast() callback for LDLM locks acquired by server-side OBDs.
+ */
+int ldlm_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
+                      void *data, int flag)
+{
+        int do_ast;
+        ENTRY;
+
+        if (flag == LDLM_CB_CANCELING) {
+                /* Don't need to do anything here. */
+                RETURN(0);
+        }
+
+        l_lock(&lock->l_resource->lr_namespace->ns_lock);
+        /* Get this: if ldlm_blocking_ast is racing with intent_policy, such
+         * that ldlm_blocking_ast is called just before intent_policy method
+         * takes the ns_lock, then by the time we get the lock, we might not
+         * be the correct blocking function anymore.  So check, and return
+         * early, if so. */
+        if (lock->l_blocking_ast != ldlm_blocking_ast) {
+                l_unlock(&lock->l_resource->lr_namespace->ns_lock);
+                RETURN(0);
+        }
+
+        lock->l_flags |= LDLM_FL_CBPENDING;
+        do_ast = (!lock->l_readers && !lock->l_writers);
+        l_unlock(&lock->l_resource->lr_namespace->ns_lock);
+
+        if (do_ast) {
+                struct lustre_handle lockh;
+                int rc;
+
+                LDLM_DEBUG(lock, "already unused, calling ldlm_cli_cancel");
+                ldlm_lock2handle(lock, &lockh);
+                rc = ldlm_cli_cancel(&lockh);
+                if (rc < 0)
+                        CERROR("ldlm_cli_cancel: %d\n", rc);
+        } else {
+                LDLM_DEBUG(lock, "Lock still has references, will be "
+                           "cancelled later");
+        }
+        RETURN(0);
+}
+
+/*
+ * ->l_glimpse_ast() for DLM extent locks acquired on the server-side. See
+ * comment in filter_intent_policy() on why you may need this.
+ */
+int ldlm_glimpse_ast(struct ldlm_lock *lock, void *reqp)
+{
+        /*
+         * Returning -ELDLM_NO_LOCK_DATA actually works, but the reason for
+         * that is rather subtle: with OST-side locking, it may so happen that
+         * _all_ extent locks are held by the OST. If client wants to obtain
+         * current file size it calls ll{,u}_glimpse_size(), and (as locks are
+         * on the server), dummy glimpse callback fires and does
+         * nothing. Client still receives correct file size due to the
+         * following fragment in filter_intent_policy():
+         *
+         * rc = l->l_glimpse_ast(l, NULL); // this will update the LVB
+         * if (rc != 0 && res->lr_namespace->ns_lvbo &&
+         *     res->lr_namespace->ns_lvbo->lvbo_update) {
+         *         res->lr_namespace->ns_lvbo->lvbo_update(res, NULL, 0, 1);
+         * }
+         *
+         * that is, after glimpse_ast() fails, filter_lvbo_update() runs, and
+         * returns correct file size to the client.
+         */
+        return -ELDLM_NO_LOCK_DATA;
+}
+
 static int ldlm_cli_enqueue_local(struct ldlm_namespace *ns,
                                   struct ldlm_res_id res_id,
                                   __u32 type,
@@ -348,6 +420,14 @@ int ldlm_cli_enqueue(struct obd_export *exp,
                 GOTO(cleanup, rc);
         }
 
+        /*
+         * Liblustre client doesn't get extent locks, except for O_APPEND case
+         * where [0, OBD_OBJECT_EOF] lock is taken.
+         */
+        LASSERT(ergo(LIBLUSTRE_CLIENT, type != LDLM_EXTENT ||
+                     (policy->l_extent.start == 0 &&
+                      policy->l_extent.end   == OBD_OBJECT_EOF)));
+
         reply = lustre_swab_repbuf(req, 0, sizeof(*reply),
                                    lustre_swab_ldlm_reply);
         if (reply == NULL) {
index 6664101..72e3230 100644 (file)
@@ -683,6 +683,7 @@ void ldlm_resource_unlink_lock(struct ldlm_lock *lock)
         list_del_init(&lock->l_res_link);
         l_unlock(&lock->l_resource->lr_namespace->ns_lock);
 }
+EXPORT_SYMBOL(ldlm_resource_unlink_lock);
 
 void ldlm_res2desc(struct ldlm_resource *res, struct ldlm_resource_desc *desc)
 {
index 3cfc29f..349548b 100644 (file)
@@ -30,11 +30,12 @@ struct llu_sb_info
         struct obd_export      *ll_osc_exp;
         obd_id                  ll_rootino;
         int                     ll_flags;
+        __u64                   ll_connect_flags;
         struct list_head        ll_conn_chain;
 
         struct obd_uuid         ll_mds_uuid;
         struct obd_uuid         ll_mds_peer_uuid;
-        char                   *ll_instance; 
+        char                   *ll_instance;
 };
 
 #define LL_SBI_NOLCK            0x1
@@ -149,7 +150,7 @@ struct it_cb_data {
 void ll_i2gids(__u32 *suppgids, struct inode *i1,struct inode *i2);
 
 typedef int (*intent_finish_cb)(struct ptlrpc_request *,
-                                struct inode *parent, struct pnode *pnode, 
+                                struct inode *parent, struct pnode *pnode,
                                 struct lookup_intent *, int offset, obd_id ino);
 int llu_intent_lock(struct inode *parent, struct pnode *pnode,
                     struct lookup_intent *, int flags, intent_finish_cb);
index bba4c91..39a0800 100644 (file)
@@ -676,6 +676,9 @@ ssize_t llu_file_prwv(const struct iovec *iovec, int iovlen,
         }
         LASSERT(len == 0 || is_read); /* libsysio should guarantee this */
 
+        /*
+         * BUG: lock is released too early. Fix is in bug 9296.
+         */
         err = llu_extent_unlock(fd, inode, lsm, p.lrp_lock_mode, &lockh);
         if (err)
                 CERROR("extent unlock error %d\n", err);
index 290ef5e..f32d98e 100644 (file)
@@ -238,7 +238,7 @@ void obdo_from_inode(struct obdo *dst, struct inode *src, obd_flag valid)
 
         if (valid & (OBD_MD_FLCTIME | OBD_MD_FLMTIME))
                 CDEBUG(D_INODE, "valid %x, new time %lu/%lu\n",
-                       valid, LTIME_S(st->st_mtime), 
+                       valid, LTIME_S(st->st_mtime),
                        LTIME_S(st->st_ctime));
 
         if (valid & OBD_MD_FLATIME) {
@@ -329,7 +329,7 @@ int llu_inode_getattr(struct inode *inode, struct lov_stripe_md *lsm)
         if (rc)
                 RETURN(rc);
 
-        refresh_valid = OBD_MD_FLBLOCKS | OBD_MD_FLBLKSZ | OBD_MD_FLMTIME | 
+        refresh_valid = OBD_MD_FLBLOCKS | OBD_MD_FLBLKSZ | OBD_MD_FLMTIME |
                         OBD_MD_FLCTIME | OBD_MD_FLSIZE;
 
         obdo_refresh_inode(inode, &oa, refresh_valid);
@@ -781,7 +781,7 @@ static int llu_iop_setattr(struct pnode *pno,
 
         liblustre_wait_event(0);
 
-        LASSERT(!(mask & ~(SETATTR_MTIME | SETATTR_ATIME | 
+        LASSERT(!(mask & ~(SETATTR_MTIME | SETATTR_ATIME |
                            SETATTR_UID | SETATTR_GID |
                            SETATTR_LEN | SETATTR_MODE)));
         memset(&iattr, 0, sizeof(iattr));
@@ -1635,7 +1635,7 @@ struct inode *llu_iget(struct filesys *fs, struct lustre_md *md)
         inode = llu_new_inode(fs, &fid);
         if (inode)
                 llu_update_inode(inode, md->body, md->lsm);
-        
+
         return inode;
 }
 
@@ -1667,6 +1667,7 @@ llu_fsswop_mount(const char *source,
        char *zconf_mdsnid, *zconf_mdsname, *zconf_profile;
         char *osc = NULL, *mdc = NULL;
         int async = 1, err = -EINVAL;
+        struct obd_connect_data ocd = {0,};
 
         ENTRY;
 
@@ -1763,12 +1764,14 @@ llu_fsswop_mount(const char *source,
         obd_set_info(obd->obd_self_export, strlen("async"), "async",
                      sizeof(async), &async);
 
-        err = obd_connect(&osc_conn, obd, &sbi->ll_sb_uuid, NULL /* ocd */);
+        ocd.ocd_connect_flags |= OBD_CONNECT_SRVLOCK;
+        err = obd_connect(&osc_conn, obd, &sbi->ll_sb_uuid, &ocd);
         if (err) {
                 CERROR("cannot connect to %s: rc = %d\n", osc, err);
                 GOTO(out_mdc, err);
         }
         sbi->ll_osc_exp = class_conn2export(&osc_conn);
+        sbi->ll_connect_flags = ocd.ocd_connect_flags;
 
         mdc_init_ea_size(sbi->ll_mdc_exp, sbi->ll_osc_exp);
 
index 6ade334..3ca8df3 100644 (file)
@@ -136,6 +136,7 @@ struct ll_sb_info {
 
         int                       ll_flags;
         struct list_head          ll_conn_chain; /* per-conn chain of SBs */
+        __u64                     ll_connect_flags;
 
         struct hlist_head         ll_orphan_dentry_list; /*please don't ask -p*/
         struct ll_close_queue    *ll_lcq;
index 1f9ed45..8fb9a24 100644 (file)
@@ -212,6 +212,7 @@ int lustre_common_fill_super(struct super_block *sb, char *mdc, char *osc)
                 GOTO(out_mdc, err);
         }
         sbi->ll_osc_exp = class_conn2export(&osc_conn);
+        sbi->ll_connect_flags = data->ocd_connect_flags;
 
         mdc_init_ea_size(sbi->ll_mdc_exp, sbi->ll_osc_exp);
 
index 1d98c64..9913e50 100644 (file)
@@ -65,6 +65,7 @@ static int lov_connect_obd(struct obd_device *obd, struct lov_tgt_desc *tgt,
         struct obd_device *tgt_obd;
         struct obd_uuid lov_osc_uuid = { "LOV_OSC_UUID" };
         struct lustre_handle conn = {0, };
+        struct obd_import *imp;
 #ifdef __KERNEL__
         struct proc_dir_entry *lov_proc_dir;
 #endif
@@ -89,7 +90,12 @@ static int lov_connect_obd(struct obd_device *obd, struct lov_tgt_desc *tgt,
                 ptlrpc_activate_import(tgt_obd->u.cli.cl_import);
         }
 
-        if (tgt_obd->u.cli.cl_import->imp_invalid) {
+        /*
+         * Divine LOV knows that OBDs under it are OSCs.
+         */
+        imp = tgt_obd->u.cli.cl_import;
+
+        if (imp->imp_invalid) {
                 CERROR("not connecting OSC %s; administratively "
                        "disabled\n", tgt_uuid->uuid);
                 rc = obd_register_observer(tgt_obd, obd);
@@ -155,6 +161,7 @@ static int lov_connect(struct lustre_handle *conn, struct obd_device *obd,
         struct lov_obd *lov = &obd->u.lov;
         struct lov_tgt_desc *tgt;
         struct obd_export *exp;
+        __u64 connect_flags = data ? data->ocd_connect_flags : 0;
         int rc, rc2, i;
         ENTRY;
 
@@ -178,8 +185,13 @@ static int lov_connect(struct lustre_handle *conn, struct obd_device *obd,
                 rc = lov_connect_obd(obd, tgt, 0, data);
                 if (rc)
                         GOTO(out_disc, rc);
+                if (data)
+                        connect_flags &= data->ocd_connect_flags;
         }
 
+        if (data)
+                data->ocd_connect_flags = connect_flags;
+
         class_export_put(exp);
         RETURN (0);
 
index ac61f2f..173932f 100644 (file)
@@ -178,8 +178,8 @@ struct dentry *mds_fid2locked_dentry(struct obd_device *obd, struct ll_fid *fid,
         res_id.name[1] = de->d_inode->i_generation;
         rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace, res_id,
                               LDLM_PLAIN, NULL, lock_mode, &flags,
-                              mds_blocking_ast, ldlm_completion_ast, NULL, NULL,
-                              NULL, 0, NULL, lockh);
+                              ldlm_blocking_ast, ldlm_completion_ast,
+                              NULL, NULL, NULL, 0, NULL, lockh);
         if (rc != ELDLM_OK) {
                 l_dput(de);
                 retval = ERR_PTR(-EIO); /* XXX translate ldlm code */
@@ -436,49 +436,6 @@ static int mds_getstatus(struct ptlrpc_request *req)
         RETURN(0);
 }
 
-int mds_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
-                     void *data, int flag)
-{
-        int do_ast;
-        ENTRY;
-
-        if (flag == LDLM_CB_CANCELING) {
-                /* Don't need to do anything here. */
-                RETURN(0);
-        }
-
-        /* XXX layering violation!  -phil */
-        l_lock(&lock->l_resource->lr_namespace->ns_lock);
-        /* Get this: if mds_blocking_ast is racing with mds_intent_policy,
-         * such that mds_blocking_ast is called just before l_i_p takes the
-         * ns_lock, then by the time we get the lock, we might not be the
-         * correct blocking function anymore.  So check, and return early, if
-         * so. */
-        if (lock->l_blocking_ast != mds_blocking_ast) {
-                l_unlock(&lock->l_resource->lr_namespace->ns_lock);
-                RETURN(0);
-        }
-
-        lock->l_flags |= LDLM_FL_CBPENDING;
-        do_ast = (!lock->l_readers && !lock->l_writers);
-        l_unlock(&lock->l_resource->lr_namespace->ns_lock);
-
-        if (do_ast) {
-                struct lustre_handle lockh;
-                int rc;
-
-                LDLM_DEBUG(lock, "already unused, calling ldlm_cli_cancel");
-                ldlm_lock2handle(lock, &lockh);
-                rc = ldlm_cli_cancel(&lockh);
-                if (rc < 0)
-                        CERROR("ldlm_cli_cancel: %d\n", rc);
-        } else {
-                LDLM_DEBUG(lock, "Lock still has references, will be "
-                           "cancelled later");
-        }
-        RETURN(0);
-}
-
 int mds_get_md(struct obd_device *obd, struct inode *inode, void *md,
                int *size, int lock)
 {
index 4a4cdd0..f4c25bc 100644 (file)
@@ -796,8 +796,8 @@ int mds_lock_new_child(struct obd_device *obd, struct inode *inode,
 
         rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace, child_res_id,
                               LDLM_PLAIN, NULL, LCK_EX, &lock_flags,
-                              mds_blocking_ast, ldlm_completion_ast, NULL, NULL,
-                              NULL, 0, NULL, child_lockh);
+                              ldlm_blocking_ast, ldlm_completion_ast,
+                              NULL, NULL, NULL, 0, NULL, child_lockh);
         if (rc != ELDLM_OK)
                 CERROR("ldlm_cli_enqueue: %d\n", rc);
         else if (child_lockh == &lockh)
index 67a162a..5cce652 100644 (file)
@@ -977,8 +977,8 @@ int enqueue_ordered_locks(struct obd_device *obd, struct ldlm_res_id *p1_res_id,
 
         rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace, *res_id[0],
                               LDLM_PLAIN, NULL, lock_modes[0], &flags[0],
-                              mds_blocking_ast, ldlm_completion_ast, NULL, NULL,
-                              NULL, 0, NULL, handles[0]);
+                              ldlm_blocking_ast, ldlm_completion_ast,
+                              NULL, NULL, NULL, 0, NULL, handles[0]);
         if (rc != ELDLM_OK)
                 RETURN(-EIO);
         ldlm_lock_dump_handle(D_OTHER, handles[0]);
@@ -989,9 +989,9 @@ int enqueue_ordered_locks(struct obd_device *obd, struct ldlm_res_id *p1_res_id,
         } else if (res_id[1]->name[0] != 0) {
                 rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace,
                                       *res_id[1], LDLM_PLAIN, NULL,
-                                      lock_modes[1], &flags[1],mds_blocking_ast,
-                                      ldlm_completion_ast, NULL, NULL, NULL, 0,
-                                      NULL, handles[1]);
+                                      lock_modes[1], &flags[1],
+                                      ldlm_blocking_ast, ldlm_completion_ast,
+                                      NULL, NULL, NULL, 0, NULL, handles[1]);
                 if (rc != ELDLM_OK) {
                         ldlm_lock_decref(handles[0], lock_modes[0]);
                         RETURN(-EIO);
@@ -1066,7 +1066,7 @@ int enqueue_4ordered_locks(struct obd_device *obd,struct ldlm_res_id *p1_res_id,
                         rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace,
                                               *res_id[i], LDLM_PLAIN, NULL,
                                               lock_modes[i], &flags,
-                                              mds_blocking_ast,
+                                              ldlm_blocking_ast,
                                               ldlm_completion_ast, NULL, NULL,
                                               NULL, 0, NULL, dlm_handles[i]);
                         if (rc != ELDLM_OK)
@@ -1151,7 +1151,7 @@ static int mds_verify_child(struct obd_device *obd,
 
                 rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace,
                                       *child_res_id, LDLM_PLAIN, NULL,
-                                      child_mode, &flags, mds_blocking_ast,
+                                      child_mode, &flags, ldlm_blocking_ast,
                                       ldlm_completion_ast, NULL, NULL, NULL, 0,
                                       NULL, child_lockh);
                 if (rc != ELDLM_OK)
index 616fb4c..3cd6690 100644 (file)
@@ -879,51 +879,6 @@ __u64 filter_last_id(struct filter_obd *filter, struct obdo *oa)
         return id;
 }
 
-/* direct cut-n-paste of mds_blocking_ast() */
-static int filter_blocking_ast(struct ldlm_lock *lock,
-                               struct ldlm_lock_desc *desc,
-                               void *data, int flag)
-{
-        int do_ast;
-        ENTRY;
-
-        if (flag == LDLM_CB_CANCELING) {
-                /* Don't need to do anything here. */
-                RETURN(0);
-        }
-
-        /* XXX layering violation!  -phil */
-        l_lock(&lock->l_resource->lr_namespace->ns_lock);
-        /* Get this: if filter_blocking_ast is racing with ldlm_intent_policy,
-         * such that filter_blocking_ast is called just before l_i_p takes the
-         * ns_lock, then by the time we get the lock, we might not be the
-         * correct blocking function anymore.  So check, and return early, if
-         * so. */
-        if (lock->l_blocking_ast != filter_blocking_ast) {
-                l_unlock(&lock->l_resource->lr_namespace->ns_lock);
-                RETURN(0);
-        }
-
-        lock->l_flags |= LDLM_FL_CBPENDING;
-        do_ast = (!lock->l_readers && !lock->l_writers);
-        l_unlock(&lock->l_resource->lr_namespace->ns_lock);
-
-        if (do_ast) {
-                struct lustre_handle lockh;
-                int rc;
-
-                LDLM_DEBUG(lock, "already unused, calling ldlm_cli_cancel");
-                ldlm_lock2handle(lock, &lockh);
-                rc = ldlm_cli_cancel(&lockh);
-                if (rc < 0)
-                        CERROR("ldlm_cli_cancel: %d\n", rc);
-        } else {
-                LDLM_DEBUG(lock, "Lock still has references, will be "
-                           "cancelled later");
-        }
-        RETURN(0);
-}
-
 static int filter_lock_dentry(struct obd_device *obd, struct dentry *dparent)
 {
         down(&dparent->d_inode->i_sem);
@@ -1039,7 +994,7 @@ static int filter_prepare_destroy(struct obd_device *obd, obd_id objid)
          * throw away any cached pages. */
         rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace, res_id,
                               LDLM_EXTENT, &policy, LCK_PW,
-                              &flags, filter_blocking_ast, ldlm_completion_ast,
+                              &flags, ldlm_blocking_ast, ldlm_completion_ast,
                               NULL, NULL, NULL, 0, NULL, &lockh);
 
         /* We only care about the side-effects, just drop the lock. */
@@ -1113,7 +1068,8 @@ static int filter_intent_policy(struct ldlm_namespace *ns,
         lock->l_policy_data.l_extent.end = OBD_OBJECT_EOF;
         lock->l_req_mode = LCK_PR;
 
-        l_lock(&res->lr_namespace->ns_lock);
+        LASSERT(ns == res->lr_namespace);
+        l_lock(&ns->ns_lock);
 
         res->lr_tmp = &rpc_list;
         rc = policy(lock, &tmpflags, 0, &err);
@@ -1129,9 +1085,17 @@ static int filter_intent_policy(struct ldlm_namespace *ns,
                 OBD_FREE(w, sizeof(*w));
         }
 
+        /* The lock met with no resistance; we're finished. */
         if (rc == LDLM_ITER_CONTINUE) {
-                /* The lock met with no resistance; we're finished. */
-                l_unlock(&res->lr_namespace->ns_lock);
+                l_unlock(&ns->ns_lock);
+                /*
+                 * do not grant locks to the liblustre clients: they cannot
+                 * handle ASTs robustly.
+                 */
+                if (lock->l_export->exp_libclient) {
+                        ldlm_resource_unlink_lock(lock);
+                        RETURN(ELDLM_LOCK_ABORTED);
+                }
                 RETURN(ELDLM_LOCK_REPLACED);
         }
 
@@ -1151,7 +1115,13 @@ static int filter_intent_policy(struct ldlm_namespace *ns,
 
                 if (tmplock->l_granted_mode == LCK_PR)
                         continue;
-
+                /*
+                 * ->ns_lock guarantees that no new locks are granted, and,
+                 * therefore, that res->lr_lvb_data cannot increase beyond the
+                 * end of already granted lock. As a result, it is safe to
+                 * check against "stale" reply_lvb->lvb_size value without
+                 * res->lr_lvb_sem.
+                 */
                 if (tmplock->l_policy_data.l_extent.end <= reply_lvb->lvb_size)
                         continue;
 
@@ -1176,7 +1146,7 @@ static int filter_intent_policy(struct ldlm_namespace *ns,
                 LDLM_LOCK_PUT(l);
                 l = LDLM_LOCK_GET(tmplock);
         }
-        l_unlock(&res->lr_namespace->ns_lock);
+        l_unlock(&ns->ns_lock);
 
         /* There were no PW locks beyond the size in the LVB; finished. */
         if (l == NULL) {
@@ -1191,15 +1161,19 @@ static int filter_intent_policy(struct ldlm_namespace *ns,
                          *
                          * Of course, this will all disappear when we switch to
                          * taking liblustre locks on the OST. */
-                        if (res->lr_namespace->ns_lvbo &&
-                            res->lr_namespace->ns_lvbo->lvbo_update) {
-                                res->lr_namespace->ns_lvbo->lvbo_update
-                                                             (res, NULL, 0, 1);
-                        }
+                        if (ns->ns_lvbo && ns->ns_lvbo->lvbo_update)
+                                ns->ns_lvbo->lvbo_update(res, NULL, 0, 1);
                 }
                 RETURN(ELDLM_LOCK_ABORTED);
         }
-
+        /*
+         * This check is for lock taken in filter_prepare_destroy() that does
+         * not have l_glimpse_ast set. So the logic is: if there is a lock
+         * with no l_glimpse_ast set, this object is being destroyed already.
+         *
+         * Hence, if you are grabbing DLM locks on the server, always set
+         * non-NULL glimpse_ast (e.g., ldlm_request.c:ldlm_glimpse_ast()).
+         */
         if (l->l_glimpse_ast == NULL) {
                 /* We are racing with unlink(); just return -ENOENT */
                 rep->lock_policy_res1 = -ENOENT;
@@ -1209,10 +1183,12 @@ static int filter_intent_policy(struct ldlm_namespace *ns,
         LASSERTF(l->l_glimpse_ast != NULL, "l == %p", l);
         rc = l->l_glimpse_ast(l, NULL); /* this will update the LVB */
         /* Update the LVB from disk if the AST failed (this is a legal race) */
-        if (rc != 0 && res->lr_namespace->ns_lvbo &&
-            res->lr_namespace->ns_lvbo->lvbo_update) {
-                res->lr_namespace->ns_lvbo->lvbo_update(res, NULL, 0, 1);
-        }
+        /*
+         * XXX nikita: situation when ldlm_server_glimpse_ast() failed before
+         * sending ast is not handled. This can result in lost client writes.
+         */
+        if (rc != 0 && ns->ns_lvbo && ns->ns_lvbo->lvbo_update)
+                ns->ns_lvbo->lvbo_update(res, NULL, 0, 1);
 
         down(&res->lr_lvb_sem);
         *reply_lvb = *res_lvb;
@@ -1627,7 +1603,7 @@ static int filter_cleanup(struct obd_device *obd)
                 unlock_kernel();
                 must_relock++;
         }
-        
+
         mntput(filter->fo_vfsmnt);
         //destroy_buffers(filter->fo_sb->s_dev);
         filter->fo_sb = NULL;
@@ -2005,7 +1981,7 @@ int filter_setattr(struct obd_export *exp, struct obdo *oa,
 
         oa->o_valid = OBD_MD_FLID;
         /* Quota release need uid/gid info */
-        obdo_from_inode(oa, dentry->d_inode, 
+        obdo_from_inode(oa, dentry->d_inode,
                         FILTER_VALID_FLAGS | OBD_MD_FLUID | OBD_MD_FLGID);
 
 out_unlock:
@@ -2018,7 +1994,7 @@ out_unlock:
 
         /* trigger quota release */
         if (rc == 0 && iattr.ia_valid & (ATTR_SIZE | ATTR_UID | ATTR_GID)) {
-                rc2 = qctxt_adjust_qunit(obd, &filter->fo_quota_ctxt, 
+                rc2 = qctxt_adjust_qunit(obd, &filter->fo_quota_ctxt,
                                          oa->o_uid, oa->o_gid, 1);
                 if (rc2)
                         CERROR("error filter adjust qunit! (rc:%d)\n", rc2);
@@ -2531,7 +2507,7 @@ cleanup:
 
         /* trigger quota release */
         if (rc == 0) {
-                rc2 = qctxt_adjust_qunit(obd, &filter->fo_quota_ctxt, 
+                rc2 = qctxt_adjust_qunit(obd, &filter->fo_quota_ctxt,
                                          oa->o_uid, oa->o_gid, 1);
                 if (rc2)
                         CERROR("error filter adjust qunit! (rc:%d)\n", rc2);
@@ -2753,7 +2729,7 @@ static int filter_health_check(struct obd_device *obd)
 {
         struct filter_obd *filter = &obd->u.filter;
         int rc = 0;
-       
+
         /*
          * health_check to return 0 on healthy
          * and 1 on unhealthy.
index e0c4fa6..0f89961 100644 (file)
@@ -68,7 +68,7 @@ static void filter_free_dio_pages(int objcount, struct obd_ioobj *obj,
         for (i = 0; i < objcount; i++, obj++) {
                 for (j = 0 ; j < obj->ioo_bufcnt ; j++, res++)
                                 res->page = NULL;
-                        }
+        }
 }
 
 /* Grab the dirty and seen grant announcements from the incoming obdo.
@@ -551,9 +551,8 @@ static int filter_preprw_write(int cmd, struct obd_export *exp, struct obdo *oa,
         if (oa && oa->o_valid & OBD_MD_FLGRANT) {
                 oa->o_grant = filter_grant(exp,oa->o_grant,oa->o_undirty,left);
                 oa->o_valid = OBD_MD_FLGRANT;
-        } else if (oa) {
+        } else if (oa)
                 oa->o_valid = 0;
-        }
 
         spin_unlock(&exp->exp_obd->obd_osfs_lock);
 
@@ -660,11 +659,9 @@ int filter_preprw(int cmd, struct obd_export *exp, struct obdo *oa,
         if (cmd == OBD_BRW_WRITE)
                 return filter_preprw_write(cmd, exp, oa, objcount, obj,
                                            niocount, nb, res, oti);
-
         if (cmd == OBD_BRW_READ)
                 return filter_preprw_read(cmd, exp, oa, objcount, obj,
                                           niocount, nb, res, oti);
-
         LBUG();
         return -EPROTO;
 }
index 2fbb8db..e9ea9dc 100644 (file)
@@ -2109,9 +2109,8 @@ static int osc_set_async_flags(struct obd_export *exp,
          * up by, e.g., ->writepage().
          */
         LASSERT(!(oap->oap_brw_flags & OBD_BRW_SRVLOCK));
-#ifndef __KERNEL__
-        LASSERT(0); /* check that liblustre angels do fear to tread here. */
-#endif
+        LASSERT(!LIBLUSTRE_CLIENT); /* check that liblustre angels do fear to
+                                     * tread here. */
 
         if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
                 RETURN(-EIO);
index 4b308e8..09e00e6 100644 (file)
@@ -471,29 +471,41 @@ static int ost_brw_lock_get(int mode, struct obd_export *exp,
         int nrbufs                = obj->ioo_bufcnt;
         struct ldlm_res_id res_id = { .name = { obj->ioo_id } };
         ldlm_policy_data_t policy;
+        int i;
 
         ENTRY;
 
         LASSERT(mode == LCK_PR || mode == LCK_PW);
-        /*
-         * assertions to add here:
-         *
-         *  - all niobufs have the same OBD_BRW_SRVLOCK value
-         *
-         *  - in OST-side locking case, niobufs are contiguous ->offset-wise.
-         */
+
+        /* EXPENSIVE ASSERTION */
+        for (i = 1; i < nrbufs; i ++)
+                LASSERT((nb[0].flags & OBD_BRW_SRVLOCK) ==
+                        (nb[i].flags & OBD_BRW_SRVLOCK));
 
         if (nrbufs == 0 || !(nb[0].flags & OBD_BRW_SRVLOCK))
                 RETURN(0);
 
+        /* EXPENSIVE ASSERTION */
+        for (i = 1; i < nrbufs; i ++)
+                /*
+                 * check that niobufs are contiguous ->offset-wise. Strictly
+                 * speaking, this is not required by the code below. What we
+                 * are trying to assert here is that RPC we are handling was
+                 * sent by a liblustre-style cache-less client rather than by
+                 * usual llite OSC layer than can arbitrarily mix pages from
+                 * different write(2) calls.
+                 */
+                LASSERT(nb[i].offset == nb[i - 1].offset + nb[i - 1].len);
+
         policy.l_extent.start = nb[0].offset & CFS_PAGE_MASK;
         policy.l_extent.end   = (nb[nrbufs - 1].offset +
                                  nb[nrbufs - 1].len - 1) | ~CFS_PAGE_MASK;
 
         RETURN(ldlm_cli_enqueue(NULL, NULL, exp->exp_obd->obd_namespace,
                                 res_id, LDLM_EXTENT, &policy, mode, &flags,
-                                ost_blocking_ast, ldlm_completion_ast,
-                                ost_glimpse_ast, NULL, NULL, 0, NULL, lh));
+                                ldlm_blocking_ast, ldlm_completion_ast,
+                                ldlm_glimpse_ast,
+                                NULL, NULL, 0, NULL, lh));
 }
 
 static void ost_brw_lock_put(int mode,
index 68ef2c3..fba2870 100644 (file)
 #include <linux/module.h>
 #else
 #include <liblustre.h>
+#include <sys/types.h>
+#include <linux/unistd.h>
+
+_syscall0(pid_t,gettid)
+
 #endif
 #include <linux/obd_class.h>
 #include <linux/lustre_net.h>
@@ -43,7 +48,7 @@ static void cray_portals_callback(ptl_event_t *ev);
 struct ptlrpc_ni  ptlrpc_interfaces[8];
 int               ptlrpc_ninterfaces;
 
-/*  
+/*
  *  Client's outgoing request callback
  */
 void request_out_callback(ptl_event_t *ev)
@@ -69,7 +74,7 @@ void request_out_callback(ptl_event_t *ev)
                 spin_lock_irqsave(&req->rq_lock, flags);
                 req->rq_net_err = 1;
                 spin_unlock_irqrestore(&req->rq_lock, flags);
-                
+
                 ptlrpc_wake_client_req(req);
         }
 
@@ -94,7 +99,7 @@ void reply_in_callback(ptl_event_t *ev)
         LASSERT (ev->md.start == req->rq_repmsg);
         LASSERT (ev->offset == 0);
         LASSERT (ev->mlength <= req->rq_replen);
-        
+
         DEBUG_REQ((ev->ni_fail_type == PTL_NI_OK) ? D_NET : D_ERROR, req,
                   "type %d, status %d", ev->type, ev->ni_fail_type);
 
@@ -117,7 +122,7 @@ void reply_in_callback(ptl_event_t *ev)
         EXIT;
 }
 
-/* 
+/*
  * Client's bulk has been written/read
  */
 void client_bulk_callback (ptl_event_t *ev)
@@ -127,7 +132,7 @@ void client_bulk_callback (ptl_event_t *ev)
         unsigned long            flags;
         ENTRY;
 
-        LASSERT ((desc->bd_type == BULK_PUT_SINK && 
+        LASSERT ((desc->bd_type == BULK_PUT_SINK &&
                   ev->type == PTL_EVENT_PUT_END) ||
                  (desc->bd_type == BULK_GET_SOURCE &&
                   ev->type == PTL_EVENT_GET_END) ||
@@ -135,7 +140,7 @@ void client_bulk_callback (ptl_event_t *ev)
         LASSERT (ev->unlinked);
 
         CDEBUG((ev->ni_fail_type == PTL_NI_OK) ? D_NET : D_ERROR,
-               "event type %d, status %d, desc %p\n", 
+               "event type %d, status %d, desc %p\n",
                ev->type, ev->ni_fail_type, desc);
 
         spin_lock_irqsave (&desc->bd_lock, flags);
@@ -157,7 +162,7 @@ void client_bulk_callback (ptl_event_t *ev)
         EXIT;
 }
 
-/* 
+/*
  * Server's incoming request callback
  */
 void request_in_callback(ptl_event_t *ev)
@@ -178,7 +183,7 @@ void request_in_callback(ptl_event_t *ev)
                  rqbd->rqbd_buffer + service->srv_buf_size);
 
         CDEBUG((ev->ni_fail_type == PTL_OK) ? D_NET : D_ERROR,
-               "event type %d, status %d, service %s\n", 
+               "event type %d, status %d, service %s\n",
                ev->type, ev->ni_fail_type, service->srv_name);
 
         if (ev->unlinked) {
@@ -199,7 +204,7 @@ void request_in_callback(ptl_event_t *ev)
                 if (req == NULL) {
                         CERROR("Can't allocate incoming request descriptor: "
                                "Dropping %s RPC from %s\n",
-                               service->srv_name, 
+                               service->srv_name,
                                portals_id2str(srv_ni->sni_ni->pni_number,
                                               ev->initiator, str));
                         return;
@@ -316,11 +321,11 @@ void server_bulk_callback (ptl_event_t *ev)
                   ev->type == PTL_EVENT_REPLY_END));
 
         CDEBUG((ev->ni_fail_type == PTL_NI_OK) ? D_NET : D_ERROR,
-               "event type %d, status %d, desc %p\n", 
+               "event type %d, status %d, desc %p\n",
                ev->type, ev->ni_fail_type, desc);
 
         spin_lock_irqsave (&desc->bd_lock, flags);
-        
+
         if ((ev->type == PTL_EVENT_ACK ||
              ev->type == PTL_EVENT_REPLY_END) &&
             ev->ni_fail_type == PTL_NI_OK) {
@@ -354,7 +359,7 @@ static void ptlrpc_master_callback(ptl_event_t *ev)
                  callback == request_in_callback ||
                  callback == reply_out_callback ||
                  callback == server_bulk_callback);
-        
+
         callback (ev);
 }
 
@@ -368,7 +373,7 @@ int ptlrpc_uuid_to_peer (struct obd_uuid *uuid, struct ptlrpc_peer *peer)
         int                 rc;
 
         ENTRY;
-        
+
         rc = lustre_uuid_to_peer (uuid->uuid, &peer_nal, &peer_nid);
 
         if (rc != 0)
@@ -381,7 +386,7 @@ int ptlrpc_uuid_to_peer (struct obd_uuid *uuid, struct ptlrpc_peer *peer)
                 if (pni->pni_number == peer_nal) {
 #else
                /* compatible nals but may be from different bridges */
-               if (NALID_FROM_IFACE(pni->pni_number) == 
+               if (NALID_FROM_IFACE(pni->pni_number) ==
                    NALID_FROM_IFACE(peer_nal)) {
 #endif
                         peer->peer_id.nid = peer_nid;
@@ -402,7 +407,7 @@ void ptlrpc_ni_fini(struct ptlrpc_ni *pni)
         struct l_wait_info  lwi;
         int                 rc;
         int                 retries;
-        
+
         /* Wait for the event queue to become idle since there may still be
          * messages in flight with pending events (i.e. the fire-and-forget
          * messages == client requests and "non-difficult" server
@@ -417,12 +422,12 @@ void ptlrpc_ni_fini(struct ptlrpc_ni *pni)
                 case PTL_OK:
                         PtlNIFini(pni->pni_ni_h);
                         return;
-                        
+
                 case PTL_EQ_IN_USE:
                         if (retries != 0)
                                 CWARN("Event queue for %s still busy\n",
                                       pni->pni_name);
-                        
+
                         /* Wait for a bit */
                         init_waitqueue_head(&waitq);
                         lwi = LWI_TIMEOUT(2*HZ, NULL, NULL);
@@ -438,7 +443,7 @@ ptl_pid_t ptl_get_pid(void)
         ptl_pid_t        pid;
 
 #ifndef  __KERNEL__
-        pid = getpid();
+        pid = gettid();
 # if CRAY_PORTALS
        /* hack to keep pid in range accepted by ernal */
        pid &= 0xFF;
@@ -463,7 +468,7 @@ int ptlrpc_ni_init(int number, char *name, struct ptlrpc_ni *pni)
         /* We're not passing any limits yet... */
         rc = PtlNIInit(number, pid, NULL, NULL, &nih);
         if (rc != PTL_OK && rc != PTL_IFACE_DUP) {
-                CDEBUG (D_NET, "Can't init network interface %s: %d\n", 
+                CDEBUG (D_NET, "Can't init network interface %s: %d\n",
                         name, rc);
                 return (-ENOENT);
         }
@@ -526,14 +531,14 @@ void *
 liblustre_register_wait_callback (int (*fn)(void *arg), void *arg)
 {
         struct liblustre_wait_callback *llwc;
-        
+
         OBD_ALLOC(llwc, sizeof(*llwc));
         LASSERT (llwc != NULL);
-        
+
         llwc->llwc_fn = fn;
         llwc->llwc_arg = arg;
         list_add_tail(&llwc->llwc_list, &liblustre_wait_callbacks);
-        
+
         return (llwc);
 }
 
@@ -541,7 +546,7 @@ void
 liblustre_deregister_wait_callback (void *opaque)
 {
         struct liblustre_wait_callback *llwc = opaque;
-        
+
         list_del(&llwc->llwc_list);
         OBD_FREE(llwc, sizeof(*llwc));
 }
@@ -558,16 +563,16 @@ liblustre_check_events (int timeout)
                        &ev, &i);
         if (rc == PTL_EQ_EMPTY)
                 RETURN(0);
-        
+
         LASSERT (rc == PTL_EQ_DROPPED || rc == PTL_OK);
-        
+
         /* liblustre: no asynch callback so we can't affort to miss any
          * events... */
         if (rc == PTL_EQ_DROPPED) {
                 CERROR ("Dropped an event!!!\n");
                 abort();
         }
-        
+
         ptlrpc_master_callback (&ev);
         RETURN(1);
 }
@@ -591,9 +596,9 @@ liblustre_wait_event (int timeout)
 
                 /* Give all registered callbacks a bite at the cherry */
                 list_for_each(tmp, &liblustre_wait_callbacks) {
-                        llwc = list_entry(tmp, struct liblustre_wait_callback, 
+                        llwc = list_entry(tmp, struct liblustre_wait_callback,
                                           llwc_list);
-                
+
                         if (llwc->llwc_fn(llwc->llwc_arg))
                                 found_something = 1;
                 }
@@ -684,7 +689,7 @@ int ptlrpc_init_portals(void)
                 return -EIO;
         }
 #ifndef __KERNEL__
-        liblustre_services_callback = 
+        liblustre_services_callback =
                 liblustre_register_wait_callback(&liblustre_check_services, NULL);
 #endif
         return 0;
index 25da640..4ea4774 100644 (file)
@@ -102,7 +102,7 @@ static void deuuidify(char *uuid, const char *prefix, char **uuid_start, int *uu
 
         if (*uuid_len < strlen(UUID_STR))
                 return;
-        
+
         if (!strncmp(*uuid_start + *uuid_len - strlen(UUID_STR),
                     UUID_STR, strlen(UUID_STR)))
                 *uuid_len -= strlen(UUID_STR);
@@ -132,7 +132,7 @@ int ptlrpc_set_import_discon(struct obd_import *imp)
                                target_len, target_start,
                                ptlrpc_peernid2str(&imp->imp_connection->c_peer,
                                                   nidbuf),
-                               imp->imp_replayable 
+                               imp->imp_replayable
                                ? "wait for recovery to complete"
                                : "fail");
 
@@ -264,7 +264,7 @@ static int import_select_connection(struct obd_import *imp)
                 RETURN(-EINVAL);
         }
 
-        if (imp->imp_conn_current && 
+        if (imp->imp_conn_current &&
             imp->imp_conn_current->oic_item.next != &imp->imp_conn_list) {
                 imp_conn = list_entry(imp->imp_conn_current->oic_item.next,
                                       struct obd_import_conn, oic_item);
@@ -443,7 +443,7 @@ static int ptlrpc_connect_interpret(struct ptlrpc_request *request,
 
         /* All imports are pingable */
         imp->imp_pingable = 1;
-        
+
         if (aa->pcaa_initial_connect) {
                 if (msg_flags & MSG_CONNECT_REPLAYABLE) {
                         CDEBUG(D_HA, "connected to replayable target: %s\n",
@@ -533,7 +533,23 @@ finish:
                         RETURN(0);
                 }
         } else {
+                struct obd_connect_data *ocd;
+
+                ocd = lustre_swab_repbuf(request, 0,
+                                         sizeof *ocd, lustre_swab_connect);
+                if (ocd == NULL) {
+                        CERROR("Wrong connect data from server\n");
+                        rc = -EPROTO;
+                        GOTO(out, rc);
+                }
                 spin_lock_irqsave(&imp->imp_lock, flags);
+                /*
+                 * check that server granted subset of flags we asked for.
+                 */
+                LASSERT((ocd->ocd_connect_flags &
+                         imp->imp_connect_data.ocd_connect_flags) ==
+                        ocd->ocd_connect_flags);
+                imp->imp_connect_data = *ocd;
                 if (imp->imp_conn_current != NULL) {
                         list_del(&imp->imp_conn_current->oic_item);
                         list_add(&imp->imp_conn_current->oic_item,
@@ -576,7 +592,7 @@ static int completed_replay_interpret(struct ptlrpc_request *req,
                 ptlrpc_import_recovery_state_machine(req->rq_import);
         } else {
                 CDEBUG(D_HA, "%s: LAST_REPLAY message error: %d, "
-                       "reconnecting\n", 
+                       "reconnecting\n",
                        req->rq_import->imp_obd->obd_name, req->rq_status);
                 ptlrpc_connect_import(req->rq_import, NULL);
         }