Whamcloud - gitweb
Branch b_release_1_6_7
authorjohann <johann>
Wed, 7 Jan 2009 23:18:05 +0000 (23:18 +0000)
committerjohann <johann>
Wed, 7 Jan 2009 23:18:05 +0000 (23:18 +0000)
b=16839
i=adilger
i=tappro
i=green

Patch from shadow.
send ACTIVATE event only if connect finished and import have state FULL.

14 files changed:
lustre/ChangeLog
lustre/include/obd.h
lustre/include/obd_class.h
lustre/ldlm/ldlm_lib.c
lustre/liblustre/llite_lib.c
lustre/liblustre/super.c
lustre/llite/llite_lib.c
lustre/lov/lov_obd.c
lustre/mds/mds_lov.c
lustre/obdclass/obd_mount.c
lustre/obdecho/echo_client.c
lustre/osc/cache.c
lustre/osc/osc_request.c
lustre/ptlrpc/import.c

index 9d918b2..f9c42a1 100644 (file)
@@ -283,6 +283,15 @@ Details    : enable OBD_CONNECT_MDT flag when connecting from the MDS so that
             from a different NID, so we do not need to wait for the export to
             be evicted.
 
+Severity   : normal
+Frequency  : start MDS on uncleanly shutdowned MDS device
+Bugzilla   : 16839
+Description: ll_sync thread stay in waiting mds<>ost recovery finished
+Details    : fix race due to two ll_sync threads processing the same lov target
+            causing random failures in various tests. The solution is to send
+            ACTIVATE event only if connect already finished and import is in
+            FULL state.
+
 --------------------------------------------------------------------------
 
 2008-08-31 Sun Microsystems, Inc.
index 30f57b5..9252272 100644 (file)
@@ -768,6 +768,8 @@ enum llog_ctxt_id {
  * Events signalled through obd_notify() upcall-chain.
  */
 enum obd_notify_event {
+        /* DEVICE connect start */
+        OBD_NOTIFY_CONNECT,
         /* Device activated */
         OBD_NOTIFY_ACTIVE,
         /* Device deactivated */
@@ -1127,14 +1129,14 @@ struct obd_ops {
 
         int (*o_ping)(struct obd_export *exp);
 
-        int (*o_register_page_removal_cb)(struct obd_export *exp,
+        int (*o_register_page_removal_cb)(struct obd_device *obd,
                                           obd_page_removal_cb_t cb,
                                           obd_pin_extent_cb pin_cb);
-        int (*o_unregister_page_removal_cb)(struct obd_export *exp,
+        int (*o_unregister_page_removal_cb)(struct obd_device *obd,
                                             obd_page_removal_cb_t cb);
-        int (*o_register_lock_cancel_cb)(struct obd_export *exp,
+        int (*o_register_lock_cancel_cb)(struct obd_device *obd,
                                        obd_lock_cancel_cb cb);
-        int (*o_unregister_lock_cancel_cb)(struct obd_export *exp,
+        int (*o_unregister_lock_cancel_cb)(struct obd_device *obd,
                                          obd_lock_cancel_cb cb);
 
         /*
index 67aca5f..03bad87 100644 (file)
@@ -1489,56 +1489,56 @@ static inline int obd_register_observer(struct obd_device *obd,
         RETURN(0);
 }
 
-static inline int obd_register_page_removal_cb(struct obd_export *exp,
+static inline int obd_register_page_removal_cb(struct obd_device *obd,
                                                obd_page_removal_cb_t cb,
                                                obd_pin_extent_cb pin_cb)
 {
         int rc;
         ENTRY;
 
-        OBD_CHECK_OP(exp->exp_obd, register_page_removal_cb, 0);
-        OBD_COUNTER_INCREMENT(exp->exp_obd, register_page_removal_cb);
+        OBD_CHECK_OP(obd, register_page_removal_cb, 0);
+        OBD_COUNTER_INCREMENT(obd, register_page_removal_cb);
 
-        rc = OBP(exp->exp_obd, register_page_removal_cb)(exp, cb, pin_cb);
+        rc = OBP(obd, register_page_removal_cb)(obd, cb, pin_cb);
         RETURN(rc);
 }
 
-static inline int obd_unregister_page_removal_cb(struct obd_export *exp,
+static inline int obd_unregister_page_removal_cb(struct obd_device *obd,
                                                  obd_page_removal_cb_t cb)
 {
         int rc;
         ENTRY;
 
-        OBD_CHECK_OP(exp->exp_obd, unregister_page_removal_cb, 0);
-        OBD_COUNTER_INCREMENT(exp->exp_obd, unregister_page_removal_cb);
+        OBD_CHECK_OP(obd, unregister_page_removal_cb, 0);
+        OBD_COUNTER_INCREMENT(obd, unregister_page_removal_cb);
 
-        rc = OBP(exp->exp_obd, unregister_page_removal_cb)(exp, cb);
+        rc = OBP(obd, unregister_page_removal_cb)(obd, cb);
         RETURN(rc);
 }
 
-static inline int obd_register_lock_cancel_cb(struct obd_export *exp,
+static inline int obd_register_lock_cancel_cb(struct obd_device *obd,
                                               obd_lock_cancel_cb cb)
 {
         int rc;
         ENTRY;
 
-        OBD_CHECK_OP(exp->exp_obd, register_lock_cancel_cb, 0);
-        OBD_COUNTER_INCREMENT(exp->exp_obd, register_lock_cancel_cb);
+        OBD_CHECK_OP(obd, register_lock_cancel_cb, 0);
+        OBD_COUNTER_INCREMENT(obd, register_lock_cancel_cb);
 
-        rc = OBP(exp->exp_obd, register_lock_cancel_cb)(exp, cb);
+        rc = OBP(obd, register_lock_cancel_cb)(obd, cb);
         RETURN(rc);
 }
 
-static inline int obd_unregister_lock_cancel_cb(struct obd_export *exp,
+static inline int obd_unregister_lock_cancel_cb(struct obd_device *obd,
                                                  obd_lock_cancel_cb cb)
 {
         int rc;
         ENTRY;
 
-        OBD_CHECK_OP(exp->exp_obd, unregister_lock_cancel_cb, 0);
-        OBD_COUNTER_INCREMENT(exp->exp_obd, unregister_lock_cancel_cb);
+        OBD_CHECK_OP(obd, unregister_lock_cancel_cb, 0);
+        OBD_COUNTER_INCREMENT(obd, unregister_lock_cancel_cb);
 
-        rc = OBP(exp->exp_obd, unregister_lock_cancel_cb)(exp, cb);
+        rc = OBP(obd, unregister_lock_cancel_cb)(obd, cb);
         RETURN(rc);
 }
 
index 2bbb7f0..0ad55ad 100644 (file)
@@ -378,7 +378,7 @@ int client_connect_import(struct lustre_handle *dlm_handle,
 {
         struct client_obd *cli = &obd->u.cli;
         struct obd_import *imp = cli->cl_import;
-        struct obd_export *exp;
+        struct obd_export **exp = localdata;
         struct obd_connect_data *ocd;
         struct ldlm_namespace *to_be_freed = NULL;
         int rc;
@@ -392,7 +392,7 @@ int client_connect_import(struct lustre_handle *dlm_handle,
         cli->cl_conn_count++;
         if (cli->cl_conn_count > 1)
                 GOTO(out_sem, rc);
-        exp = class_conn2export(dlm_handle);
+        *exp = class_conn2export(dlm_handle);
 
         if (obd->obd_namespace != NULL)
                 CERROR("already have namespace!\n");
@@ -418,7 +418,7 @@ int client_connect_import(struct lustre_handle *dlm_handle,
                 LASSERT (imp->imp_state == LUSTRE_IMP_DISCON);
                 GOTO(out_ldlm, rc);
         }
-        LASSERT(exp->exp_connection);
+        LASSERT((*exp)->exp_connection);
 
         if (data) {
                 LASSERT((ocd->ocd_connect_flags & data->ocd_connect_flags) ==
@@ -436,10 +436,9 @@ out_ldlm:
                 obd->obd_namespace = NULL;
 out_disco:
                 cli->cl_conn_count--;
-                class_disconnect(exp);
-        } else {
-                class_export_put(exp);
-        }
+                class_disconnect(*exp);
+                *exp = NULL;
+        } 
 out_sem:
         up_write(&cli->cl_sem);
         if (to_be_freed)
index 7de9411..34d0c46 100644 (file)
@@ -179,15 +179,13 @@ int liblustre_process_log(struct config_llog_instance *cfg,
 #endif
         ocd->ocd_version = LUSTRE_VERSION_CODE;
 
-        rc = obd_connect(&mgc_conn, obd, &mgc_uuid, ocd, NULL);
+        rc = obd_connect(&mgc_conn, obd, &mgc_uuid, ocd, &exp);
         if (rc) {
                 CERROR("cannot connect to %s at %s: rc = %d\n",
                        LUSTRE_MGS_OBDNAME, mgsnid, rc);
                 GOTO(out_cleanup, rc);
         }
 
-        exp = class_conn2export(&mgc_conn);
-
         ctxt = llog_get_context(exp->exp_obd, LLOG_CONFIG_REPL_CTXT);
         cfg->cfg_flags |= CFG_F_COMPAT146;
         rc = class_config_parse_llog(ctxt, profile, cfg);
index f28190c..39374c2 100644 (file)
@@ -108,13 +108,15 @@ static void llu_fsop_gone(struct filesys *fs)
 {
         struct llu_sb_info *sbi = (struct llu_sb_info *) fs->fs_private;
         struct obd_device *obd = class_exp2obd(sbi->ll_mdc_exp);
+        struct obd_device *lov_obd = class_exp2obd(sbi->ll_osc_exp);
         int next = 0;
         ENTRY;
 
         list_del(&sbi->ll_conn_chain);
-        obd_unregister_lock_cancel_cb(sbi->ll_osc_exp,
-                                      llu_extent_lock_cancel_cb);
+
         obd_disconnect(sbi->ll_osc_exp);
+        obd_unregister_lock_cancel_cb(lov_obd, llu_extent_lock_cancel_cb);
+
         obd_disconnect(sbi->ll_mdc_exp);
 
         while ((obd = class_devices_in_group(&sbi->ll_sb_uuid, &next)) != NULL)
@@ -1962,12 +1964,11 @@ llu_fsswop_mount(const char *source,
         ocd.ocd_version = LUSTRE_VERSION_CODE;
 
         /* setup mdc */
-        err = obd_connect(&mdc_conn, obd, &sbi->ll_sb_uuid, &ocd, NULL);
+        err = obd_connect(&mdc_conn, obd, &sbi->ll_sb_uuid, &ocd, &sbi->ll_mdc_exp);
         if (err) {
                 CERROR("cannot connect to %s: rc = %d\n", mdc, err);
                 GOTO(out_free, err);
         }
-        sbi->ll_mdc_exp = class_conn2export(&mdc_conn);
 
         err = obd_statfs(obd, &osfs, 100000000, 0);
         if (err)
@@ -1989,33 +1990,26 @@ llu_fsswop_mount(const char *source,
         obd->obd_upcall.onu_owner = &sbi->ll_lco;
         obd->obd_upcall.onu_upcall = ll_ocd_update;
 
+        obd_register_lock_cancel_cb(obd, llu_extent_lock_cancel_cb);
+
         ocd.ocd_connect_flags = OBD_CONNECT_SRVLOCK | OBD_CONNECT_REQPORTAL |
                 OBD_CONNECT_VERSION | OBD_CONNECT_TRUNCLOCK | OBD_CONNECT_AT;
         ocd.ocd_version = LUSTRE_VERSION_CODE;
-        err = obd_connect(&osc_conn, obd, &sbi->ll_sb_uuid, &ocd, NULL);
+        err = obd_connect(&osc_conn, obd, &sbi->ll_sb_uuid, &ocd, &sbi->ll_osc_exp);
         if (err) {
                 CERROR("cannot connect to %s: rc = %d\n", osc, err);
-                GOTO(out_mdc, err);
+                GOTO(out_lock_cb, err);
         }
-        sbi->ll_osc_exp = class_conn2export(&osc_conn);
         sbi->ll_lco.lco_flags = ocd.ocd_connect_flags;
         sbi->ll_lco.lco_mdc_exp = sbi->ll_mdc_exp;
         sbi->ll_lco.lco_osc_exp = sbi->ll_osc_exp;
 
-
-        err = obd_register_lock_cancel_cb(sbi->ll_osc_exp,
-                                          llu_extent_lock_cancel_cb);
-        if (err) {
-                CERROR("cannot register lock cancel callback: rc = %d\n", err);
-                GOTO(out_osc, err);
-        }
-
         mdc_init_ea_size(sbi->ll_mdc_exp, sbi->ll_osc_exp);
 
         err = mdc_getstatus(sbi->ll_mdc_exp, &rootfid);
         if (err) {
                 CERROR("cannot mds_connect: rc = %d\n", err);
-                GOTO(out_lock_cn_cb, err);
+                GOTO(out_lock_cb, err);
         }
         CDEBUG(D_SUPER, "rootfid "LPU64"\n", rootfid.id);
         sbi->ll_rootino = rootfid.id;
@@ -2026,7 +2020,7 @@ llu_fsswop_mount(const char *source,
                           &request);
         if (err) {
                 CERROR("mdc_getattr failed for root: rc = %d\n", err);
-                GOTO(out_lock_cn_cb, err);
+                GOTO(out_osc, err);
         }
 
         err = mdc_req2lustre_md(request, REPLY_REC_OFF, sbi->ll_osc_exp, &md);
@@ -2069,11 +2063,11 @@ out_inode:
         _sysio_i_gone(root);
 out_request:
         ptlrpc_req_finished(request);
-out_lock_cn_cb:
-        obd_unregister_lock_cancel_cb(sbi->ll_osc_exp,
-                                      llu_extent_lock_cancel_cb);
 out_osc:
         obd_disconnect(sbi->ll_osc_exp);
+out_lock_cb:
+        obd = class_name2obd(osc);
+        obd_unregister_lock_cancel_cb(obd, llu_extent_lock_cancel_cb);
 out_mdc:
         obd_disconnect(sbi->ll_mdc_exp);
 out_free:
index e6667df..c6af8e9 100644 (file)
@@ -226,7 +226,8 @@ static int client_common_fill_super(struct super_block *sb,
         else
                 sbi->ll_fop = &ll_file_operations_noflock;
 
-        err = obd_connect(&mdc_conn, obd, &sbi->ll_sb_uuid, data, NULL);
+
+        err = obd_connect(&mdc_conn, obd, &sbi->ll_sb_uuid, data, &sbi->ll_mdc_exp);
         if (err == -EBUSY) {
                 LCONSOLE_ERROR_MSG(0x14f, "An MDT (mdc %s) is performing "
                                    "recovery, of which this client is not a "
@@ -237,7 +238,6 @@ static int client_common_fill_super(struct super_block *sb,
                 CERROR("cannot connect to %s: rc = %d\n", mdc, err);
                 GOTO(out, err);
         }
-        sbi->ll_mdc_exp = class_conn2export(&mdc_conn);
 
         err = obd_statfs(obd, &osfs, cfs_time_current_64() - HZ, 0);
         if (err)
@@ -324,42 +324,31 @@ static int client_common_fill_super(struct super_block *sb,
         obd->obd_upcall.onu_upcall = ll_ocd_update;
         data->ocd_brw_size = PTLRPC_MAX_BRW_PAGES << CFS_PAGE_SHIFT;
 
-        err = obd_connect(&osc_conn, obd, &sbi->ll_sb_uuid, data, NULL);
+        obd_register_lock_cancel_cb(obd, ll_extent_lock_cancel_cb);
+        obd_register_page_removal_cb(obd, ll_page_removal_cb, ll_pin_extent_cb);
+
+
+        err = obd_connect(&osc_conn, obd, &sbi->ll_sb_uuid, data, &sbi->ll_osc_exp);
         if (err == -EBUSY) {
                 LCONSOLE_ERROR_MSG(0x150, "An OST (osc %s) is performing "
                                    "recovery, of which this client is not a "
                                    "part.  Please wait for recovery to "
                                    "complete, abort, or time out.\n", osc);
-                GOTO(out, err);
+                GOTO(out, err); // need clear cb?
         } else if (err) {
                 CERROR("cannot connect to %s: rc = %d\n", osc, err);
-                GOTO(out_mdc, err);
+                GOTO(out_cb, err);
         }
-        sbi->ll_osc_exp = class_conn2export(&osc_conn);
         spin_lock(&sbi->ll_lco.lco_lock);
         sbi->ll_lco.lco_flags = data->ocd_connect_flags;
         sbi->ll_lco.lco_mdc_exp = sbi->ll_mdc_exp;
         sbi->ll_lco.lco_osc_exp = sbi->ll_osc_exp;
         spin_unlock(&sbi->ll_lco.lco_lock);
 
-        err = obd_register_page_removal_cb(sbi->ll_osc_exp,
-                                           ll_page_removal_cb, 
-                                           ll_pin_extent_cb);
-        if (err) {
-                CERROR("cannot register page removal callback: rc = %d\n",err);
-                GOTO(out_osc, err);
-        }
-        err = obd_register_lock_cancel_cb(sbi->ll_osc_exp,
-                                          ll_extent_lock_cancel_cb);
-        if (err) {
-                CERROR("cannot register lock cancel callback: rc = %d\n", err);
-                GOTO(out_page_rm_cb, err);
-        }
-
         err = mdc_init_ea_size(sbi->ll_mdc_exp, sbi->ll_osc_exp);
         if (err) {
                 CERROR("cannot set max EA and cookie sizes: rc = %d\n", err);
-                GOTO(out_lock_cn_cb, err);
+                GOTO(out_osc, err);
         }
 
         err = obd_prep_async_page(sbi->ll_osc_exp, NULL, NULL, NULL,
@@ -368,7 +357,7 @@ static int client_common_fill_super(struct super_block *sb,
                 LCONSOLE_ERROR_MSG(0x151, "There are no OST's in this "
                                    "filesystem. There must be at least one "
                                    "active OST for a client to start.\n");
-                GOTO(out_lock_cn_cb, err);
+                GOTO(out_osc, err);
         }
 
         if (!ll_async_page_slab) {
@@ -378,13 +367,13 @@ static int client_common_fill_super(struct super_block *sb,
                                                           ll_async_page_slab_size,
                                                           0, 0);
                 if (!ll_async_page_slab)
-                        GOTO(out_lock_cn_cb, -ENOMEM);
+                        GOTO(out_osc, err = -ENOMEM);
         }
 
         err = mdc_getstatus(sbi->ll_mdc_exp, &rootfid);
         if (err) {
                 CERROR("cannot mds_connect: rc = %d\n", err);
-                GOTO(out_lock_cn_cb, err);
+                GOTO(out_osc, err);
         }
         CDEBUG(D_SUPER, "rootfid "LPU64"\n", rootfid.id);
         sbi->ll_rootino = rootfid.id;
@@ -404,14 +393,14 @@ static int client_common_fill_super(struct super_block *sb,
                           0, &request);
         if (err) {
                 CERROR("mdc_getattr failed for root: rc = %d\n", err);
-                GOTO(out_lock_cn_cb, err);
+                GOTO(out_osc, err);
         }
 
         err = mdc_req2lustre_md(request, REPLY_REC_OFF, sbi->ll_osc_exp, &md);
         if (err) {
                 CERROR("failed to understand root inode md: rc = %d\n",err);
                 ptlrpc_req_finished (request);
-                GOTO(out_lock_cn_cb, err);
+                GOTO(out_osc, err);
         }
 
         LASSERT(sbi->ll_rootino != 0);
@@ -464,15 +453,13 @@ static int client_common_fill_super(struct super_block *sb,
 out_root:
         if (root)
                 iput(root);
-out_lock_cn_cb:
-        obd_unregister_lock_cancel_cb(sbi->ll_osc_exp,
-                                      ll_extent_lock_cancel_cb);
-out_page_rm_cb:
-        obd_unregister_page_removal_cb(sbi->ll_osc_exp,
-                                       ll_page_removal_cb);
 out_osc:
         obd_disconnect(sbi->ll_osc_exp);
         sbi->ll_osc_exp = NULL;
+out_cb:
+        obd = class_name2obd(osc);
+        obd_unregister_lock_cancel_cb(obd, ll_extent_lock_cancel_cb);
+        obd_unregister_page_removal_cb(obd, ll_page_removal_cb);
 out_mdc:
         obd_disconnect(sbi->ll_mdc_exp);
         sbi->ll_mdc_exp = NULL;
@@ -935,14 +922,12 @@ static int old_lustre_process_log(struct super_block *sb, char *newprofile,
         /* If we don't have this then an ACL MDS will refuse the connection */
         ocd.ocd_connect_flags = OBD_CONNECT_ACL;
 
-        rc = obd_connect(&mdc_conn, obd, &mdc_uuid, &ocd, NULL);
+        rc = obd_connect(&mdc_conn, obd, &mdc_uuid, &ocd, &exp);
         if (rc) {
                 CERROR("cannot connect to %s: rc = %d\n", mdt, rc);
                 GOTO(out_cleanup, rc);
         }
 
-        exp = class_conn2export(&mdc_conn);
-
         ctxt = llog_get_context(exp->exp_obd, LLOG_CONFIG_REPL_CTXT);
 
         cfg->cfg_flags |= CFG_F_COMPAT146;
index 314d61d..247fd16 100644 (file)
@@ -104,12 +104,11 @@ void lov_putref(struct obd_device *obd)
         mutex_up(&lov->lov_lock);
 }
 
-static int lov_register_page_removal_cb(struct obd_export *exp,
+static int lov_obd_register_page_removal_cb(struct obd_device *obd,
                                         obd_page_removal_cb_t func,
                                         obd_pin_extent_cb pin_cb)
 {
-        struct lov_obd *lov = &exp->exp_obd->u.lov;
-        int i, rc = 0;
+        struct lov_obd *lov = &obd->u.lov;
 
         if (lov->lov_page_removal_cb && lov->lov_page_removal_cb != func)
                 return -EBUSY;
@@ -117,24 +116,15 @@ static int lov_register_page_removal_cb(struct obd_export *exp,
         if (lov->lov_page_pin_cb && lov->lov_page_pin_cb != pin_cb)
                 return -EBUSY;
 
-        for (i = 0; i < lov->desc.ld_tgt_count; i++) {
-                if (!lov->lov_tgts[i] || !lov->lov_tgts[i]->ltd_exp)
-                        continue;
-                rc |= obd_register_page_removal_cb(lov->lov_tgts[i]->ltd_exp,
-                                                   func, pin_cb);
-        }
-
         lov->lov_page_removal_cb = func;
         lov->lov_page_pin_cb = pin_cb;
-
-        return rc;
+        return 0;
 }
 
-static int lov_unregister_page_removal_cb(struct obd_export *exp,
-                                        obd_page_removal_cb_t func)
+static int lov_obd_unregister_page_removal_cb(struct obd_device *obd,
+                                              obd_page_removal_cb_t func)
 {
-        struct lov_obd *lov = &exp->exp_obd->u.lov;
-        int i, rc = 0;
+        struct lov_obd *lov = &obd->u.lov;
 
         if (lov->lov_page_removal_cb && lov->lov_page_removal_cb != func)
                 return -EINVAL;
@@ -142,54 +132,33 @@ static int lov_unregister_page_removal_cb(struct obd_export *exp,
         lov->lov_page_removal_cb = NULL;
         lov->lov_page_pin_cb = NULL;
 
-        for (i = 0; i < lov->desc.ld_tgt_count; i++) {
-                if (!lov->lov_tgts[i] || !lov->lov_tgts[i]->ltd_exp)
-                        continue;
-                rc |= obd_unregister_page_removal_cb(lov->lov_tgts[i]->ltd_exp,
-                                                     func);
-        }
-
-        return rc;
+        return 0;
 }
 
-static int lov_register_lock_cancel_cb(struct obd_export *exp,
-                                         obd_lock_cancel_cb func)
+static int lov_obd_register_lock_cancel_cb(struct obd_device *obd,
+                                           obd_lock_cancel_cb func)
 {
-        struct lov_obd *lov = &exp->exp_obd->u.lov;
-        int i, rc = 0;
+        struct lov_obd *lov = &obd->u.lov;
 
         if (lov->lov_lock_cancel_cb && lov->lov_lock_cancel_cb != func)
                 return -EBUSY;
 
-        for (i = 0; i < lov->desc.ld_tgt_count; i++) {
-                if (!lov->lov_tgts[i] || !lov->lov_tgts[i]->ltd_exp)
-                        continue;
-                rc |= obd_register_lock_cancel_cb(lov->lov_tgts[i]->ltd_exp,
-                                                  func);
-        }
-
         lov->lov_lock_cancel_cb = func;
 
-        return rc;
+        return 0;
 }
 
-static int lov_unregister_lock_cancel_cb(struct obd_export *exp,
-                                         obd_lock_cancel_cb func)
+static int lov_obd_unregister_lock_cancel_cb(struct obd_device *obd,
+                                             obd_lock_cancel_cb func)
 {
-        struct lov_obd *lov = &exp->exp_obd->u.lov;
-        int i, rc = 0;
+        struct lov_obd *lov = &obd->u.lov;
 
         if (lov->lov_lock_cancel_cb && lov->lov_lock_cancel_cb != func)
                 return -EINVAL;
 
-        for (i = 0; i < lov->desc.ld_tgt_count; i++) {
-                if (!lov->lov_tgts[i] || !lov->lov_tgts[i]->ltd_exp)
-                        continue;
-                rc |= obd_unregister_lock_cancel_cb(lov->lov_tgts[i]->ltd_exp,
-                                                    func);
-        }
         lov->lov_lock_cancel_cb = NULL;
-        return rc;
+        return 0;
+
 }
 
 static int lov_set_osc_active(struct obd_device *obd, struct obd_uuid *uuid,
@@ -202,8 +171,6 @@ static int lov_notify(struct obd_device *obd, struct obd_device *watched,
         ENTRY;
 
         if (ev == OBD_NOTIFY_ACTIVE || ev == OBD_NOTIFY_INACTIVE) {
-                struct obd_uuid *uuid;
-
                 LASSERT(watched);
 
                 if (strcmp(watched->obd_type->typ_name, LUSTRE_OSC_NAME)) {
@@ -212,16 +179,17 @@ static int lov_notify(struct obd_device *obd, struct obd_device *watched,
                                watched->obd_name);
                         RETURN(-EINVAL);
                 }
-                uuid = &watched->u.cli.cl_target_uuid;
 
                 /* Set OSC as active before notifying the observer, so the
                  * observer can use the OSC normally.
                  */
-                rc = lov_set_osc_active(obd, uuid, ev == OBD_NOTIFY_ACTIVE);
+                rc = lov_set_osc_active(obd, &watched->u.cli.cl_target_uuid,
+                                        ev == OBD_NOTIFY_ACTIVE);
                 if (rc < 0) {
                         CERROR("%sactivation of %s failed: %d\n",
                                (ev == OBD_NOTIFY_ACTIVE) ? "" : "de",
-                               obd_uuid2str(uuid), rc);
+                               obd_uuid2str(&watched->u.cli.cl_target_uuid),
+                               rc);
                         RETURN(rc);
                 }
                 /* active event should be pass lov target index as data */
@@ -237,16 +205,17 @@ static int lov_notify(struct obd_device *obd, struct obd_device *watched,
                 struct lov_obd *lov = &obd->u.lov;
                 struct obd_device *tgt_obd;
                 int i;
+
+                if ((ev == OBD_NOTIFY_SYNC) ||
+                    (ev == OBD_NOTIFY_SYNC_NONBLOCK))
+                        data = &i;
+
                 lov_getref(obd);
                 for (i = 0; i < lov->desc.ld_tgt_count; i++) {
                         if (!lov->lov_tgts[i])
                                 continue;
                         tgt_obd = class_exp2obd(lov->lov_tgts[i]->ltd_exp);
 
-                        if ((ev == OBD_NOTIFY_SYNC) ||
-                            (ev == OBD_NOTIFY_SYNC_NONBLOCK))
-                                data = &i;
-
                         rc = obd_notify_observer(obd, tgt_obd, ev, data);
                         if (rc) {
                                 CERROR("%s: notify %s of %s failed %d\n",
@@ -311,71 +280,37 @@ static int lov_connect_obd(struct obd_device *obd, __u32 index, int activate,
                 ptlrpc_activate_import(imp);
         }
 
-        if (imp->imp_invalid) {
-                CERROR("not connecting OSC %s; administratively "
-                       "disabled\n", obd_uuid2str(&tgt_uuid));
-                rc = obd_register_observer(tgt_obd, obd);
-                if (rc) {
-                        CERROR("Target %s register_observer error %d; "
-                               "will not be able to reactivate\n",
-                               obd_uuid2str(&tgt_uuid), rc);
-                }
-                RETURN(0);
-        }
-
-        rc = obd_connect(&conn, tgt_obd, &lov_osc_uuid, data, NULL);
+        rc = obd_register_observer(tgt_obd, obd);
         if (rc) {
-                CERROR("Target %s connect error %d\n",
-                       obd_uuid2str(&tgt_uuid), rc);
+                CERROR("Target %s register_observer error %d; "
+                        "will not be able to reactivate\n",
+                        obd_uuid2str(&tgt_uuid), rc);
                 RETURN(rc);
         }
-        lov->lov_tgts[index]->ltd_exp = class_conn2export(&conn);
-        if (!lov->lov_tgts[index]->ltd_exp) {
-                CERROR("Target %s: null export!\n", obd_uuid2str(&tgt_uuid));
-                RETURN(-ENODEV);
-        }
 
-        rc = obd_register_page_removal_cb(lov->lov_tgts[index]->ltd_exp,
-                                          lov->lov_page_removal_cb,
-                                          lov->lov_page_pin_cb);
-        if (rc) {
-                obd_disconnect(lov->lov_tgts[index]->ltd_exp);
-                lov->lov_tgts[index]->ltd_exp = NULL;
-                RETURN(rc);
+        if (imp->imp_invalid) {
+                CERROR("not connecting OSC %s; administratively "
+                       "disabled\n", obd_uuid2str(&tgt_uuid));
+                RETURN(0);
         }
+        if (lov->lov_lock_cancel_cb)
+                rc = obd_register_lock_cancel_cb(tgt_obd, lov->lov_lock_cancel_cb);
+                if (rc)
+                        RETURN(rc);
 
-        rc = obd_register_lock_cancel_cb(lov->lov_tgts[index]->ltd_exp,
-                                         lov->lov_lock_cancel_cb);
-        if (rc) {
-                obd_unregister_page_removal_cb(lov->lov_tgts[index]->ltd_exp,
-                                               lov->lov_page_removal_cb);
-                obd_disconnect(lov->lov_tgts[index]->ltd_exp);
-                lov->lov_tgts[index]->ltd_exp = NULL;
-                RETURN(rc);
-        }
+        if (lov->lov_page_removal_cb)
+                rc = obd_register_page_removal_cb(tgt_obd, lov->lov_page_removal_cb,
+                                                  lov->lov_page_pin_cb);
+                if (rc)
+                        GOTO(out_lock_cb, rc);
 
-        rc = obd_register_observer(tgt_obd, obd);
-        if (rc) {
-                CERROR("Target %s register_observer error %d\n",
+        rc = obd_connect(&conn, tgt_obd, &lov_osc_uuid, data, &lov->lov_tgts[index]->ltd_exp);
+        if (rc || !lov->lov_tgts[index]->ltd_exp) {
+                CERROR("Target %s connect error %d\n",
                        obd_uuid2str(&tgt_uuid), rc);
-                obd_unregister_lock_cancel_cb(lov->lov_tgts[index]->ltd_exp,
-                                              lov->lov_lock_cancel_cb);
-                obd_unregister_page_removal_cb(lov->lov_tgts[index]->ltd_exp,
-                                               lov->lov_page_removal_cb);
-                obd_disconnect(lov->lov_tgts[index]->ltd_exp);
-                lov->lov_tgts[index]->ltd_exp = NULL;
-                RETURN(rc);
+                GOTO(out_page_cb, rc);
         }
 
-        lov->lov_tgts[index]->ltd_reap = 0;
-        if (activate) {
-                lov->lov_tgts[index]->ltd_active = 1;
-                lov->desc.ld_active_tgt_count++;
-                lov->lov_tgts[index]->ltd_exp->exp_obd->obd_inactive = 0;
-        }
-        CDEBUG(D_CONFIG, "Connected tgt idx %d %s (%s) %sactive\n", index,
-               obd_uuid2str(&tgt_uuid), tgt_obd->obd_name, activate ? "":"in");
-
 #ifdef __KERNEL__
         lov_proc_dir = lprocfs_srch(obd->obd_proc_entry, "target_obds");
         if (lov_proc_dir) {
@@ -400,12 +335,18 @@ static int lov_connect_obd(struct obd_device *obd, __u32 index, int activate,
                 }
         }
 #endif
-
         rc = qos_add_tgt(obd, index);
-        if (rc) 
+        if (rc)
                 CERROR("qos_add_tgt failed %d\n", rc);
 
         RETURN(0);
+
+out_page_cb:
+        obd_unregister_page_removal_cb(obd, lov->lov_page_removal_cb);
+out_lock_cb:
+        obd_unregister_lock_cancel_cb(obd, lov->lov_lock_cancel_cb);
+
+        RETURN(rc);
 }
 
 static int lov_connect(struct lustre_handle *conn, struct obd_device *obd,
@@ -414,6 +355,7 @@ static int lov_connect(struct lustre_handle *conn, struct obd_device *obd,
 {
         struct lov_obd *lov = &obd->u.lov;
         struct lov_tgt_desc *tgt;
+        struct obd_export **exp = localdata;
         int i, rc;
         ENTRY;
 
@@ -422,6 +364,8 @@ static int lov_connect(struct lustre_handle *conn, struct obd_device *obd,
         rc = class_connect(conn, obd, cluuid);
         if (rc)
                 RETURN(rc);
+                
+        *exp = class_conn2export(conn);
 
         /* Why should there ever be more than 1 connect? */
         lov->lov_connects++;
@@ -449,7 +393,7 @@ static int lov_connect(struct lustre_handle *conn, struct obd_device *obd,
                         continue;
 
                 rc = lov_notify(obd, lov->lov_tgts[i]->ltd_exp->exp_obd,
-                                OBD_NOTIFY_ACTIVE, (void *)&i);
+                                OBD_NOTIFY_CONNECT, (void *)&i);
                 if (rc) {
                         CERROR("%s error sending notify %d\n",
                                obd->obd_name, rc);
@@ -473,10 +417,6 @@ static int lov_disconnect_obd(struct obd_device *obd, __u32 index)
         CDEBUG(D_CONFIG, "%s: disconnecting target %s\n", 
                obd->obd_name, osc_obd->obd_name);
 
-        obd_unregister_lock_cancel_cb(lov->lov_tgts[index]->ltd_exp,
-                                      lov->lov_lock_cancel_cb);
-        obd_unregister_page_removal_cb(lov->lov_tgts[index]->ltd_exp,
-                                       lov->lov_page_removal_cb);
         if (lov->lov_tgts[index]->ltd_active) {
                 lov->lov_tgts[index]->ltd_active = 0;
                 lov->desc.ld_active_tgt_count--;
@@ -508,6 +448,9 @@ static int lov_disconnect_obd(struct obd_device *obd, __u32 index)
 
         obd_register_observer(osc_obd, NULL);
 
+        obd_unregister_page_removal_cb(osc_obd, lov->lov_page_removal_cb);
+        obd_unregister_lock_cancel_cb(osc_obd, lov->lov_lock_cancel_cb);
+
         rc = obd_disconnect(lov->lov_tgts[index]->ltd_exp);
         if (rc) {
                 CERROR("Target %s disconnect error %d\n",
@@ -718,7 +661,7 @@ static int lov_add_target(struct obd_device *obd, struct obd_uuid *uuidp,
                 GOTO(out, rc = 0);
 
         rc = lov_notify(obd, tgt->ltd_exp->exp_obd, 
-                        active ? OBD_NOTIFY_ACTIVE : OBD_NOTIFY_INACTIVE,
+                        active ? OBD_NOTIFY_CONNECT : OBD_NOTIFY_INACTIVE,
                         (void *)&index);
 
 out:
@@ -3211,10 +3154,10 @@ struct obd_ops lov_obd_ops = {
         .o_llog_init           = lov_llog_init,
         .o_llog_finish         = lov_llog_finish,
         .o_notify              = lov_notify,
-        .o_register_page_removal_cb = lov_register_page_removal_cb,
-        .o_unregister_page_removal_cb = lov_unregister_page_removal_cb,
-        .o_register_lock_cancel_cb = lov_register_lock_cancel_cb,
-        .o_unregister_lock_cancel_cb = lov_unregister_lock_cancel_cb,
+        .o_register_page_removal_cb = lov_obd_register_page_removal_cb,
+        .o_unregister_page_removal_cb = lov_obd_unregister_page_removal_cb,
+        .o_register_lock_cancel_cb = lov_obd_register_lock_cancel_cb,
+        .o_unregister_lock_cancel_cb = lov_obd_unregister_lock_cancel_cb,
 };
 
 static quota_interface_t *quota_interface;
index c3911d9..0ec1de2 100644 (file)
@@ -619,13 +619,12 @@ int mds_lov_connect(struct obd_device *obd, char * lov_name)
 #endif
         data->ocd_version = LUSTRE_VERSION_CODE;
         /* NB: lov_connect() needs to fill in .ocd_index for each OST */
-        rc = obd_connect(&conn, mds->mds_osc_obd, &obd->obd_uuid, data, NULL);
+        rc = obd_connect(&conn, mds->mds_osc_obd, &obd->obd_uuid, data, &mds->mds_osc_exp);
         OBD_FREE(data, sizeof(*data));
         if (rc) {
                 CERROR("MDS cannot connect to LOV %s (%d)\n", lov_name, rc);
                 GOTO(error_exit, rc);
         }
-        mds->mds_osc_exp = class_conn2export(&conn);
         /* we not want postrecov in case clean fs, in other cases postrecov will
          * be called from ldlm. otherwise we can call postrecov twice - in case
          * short recovery */
index f255969..f21492b 100644 (file)
@@ -732,14 +732,12 @@ static int lustre_start_mgc(struct super_block *sb)
         data->ocd_connect_flags = OBD_CONNECT_VERSION | OBD_CONNECT_AT;
         data->ocd_version = LUSTRE_VERSION_CODE;
         /* We connect to the MGS at setup, and don't disconnect until cleanup */
-        rc = obd_connect(&mgc_conn, obd, &(obd->obd_uuid), data, NULL);
+        rc = obd_connect(&mgc_conn, obd, &(obd->obd_uuid), data, &exp);
         OBD_FREE_PTR(data);
         if (rc) {
                 CERROR("connect failed %d\n", rc);
                 GOTO(out, rc);
         }
-
-        exp = class_conn2export(&mgc_conn);
         obd->u.cli.cl_mgc_mgsexp = exp;
 
 out:
index bd8127d..ab75bc0 100644 (file)
@@ -1306,7 +1306,7 @@ echo_client_setup(struct obd_device *obddev, obd_count len, void *buf)
         ocd->ocd_connect_flags = OBD_CONNECT_VERSION | OBD_CONNECT_REQPORTAL;
         ocd->ocd_version = LUSTRE_VERSION_CODE;
 
-        rc = obd_connect(&conn, tgt, &echo_uuid, ocd, NULL);
+        rc = obd_connect(&conn, tgt, &echo_uuid, ocd, &ec->ec_exp);
 
         OBD_FREE(ocd, sizeof(*ocd));
 
@@ -1315,7 +1315,6 @@ echo_client_setup(struct obd_device *obddev, obd_count len, void *buf)
                        lustre_cfg_string(lcfg, 1));
                 return (rc);
         }
-        ec->ec_exp = class_conn2export(&conn);
 
         RETURN(rc);
 }
index 371b78e..1f7f64f 100644 (file)
@@ -227,6 +227,7 @@ int cache_del_extent_removal_cb(struct lustre_cache *cache,
 {
         int found = 0;
         struct page_removal_cb_element *element, *t;
+        ENTRY;
 
         write_lock(&cache->lc_page_removal_cb_lock);
         list_for_each_entry_safe(element, t,
@@ -347,6 +348,7 @@ static int cache_remove_extents_from_lock(struct lustre_cache *cache,
                            page with address 0x5a5a5a5a in
                            cache_extent_removal_event */
                         ext_data = extent->oap_page;
+                        LASSERT(cache->lc_pin_extent_cb != NULL);
                         cache->lc_pin_extent_cb(extent->oap_page);
 
                         if (lock->l_flags & LDLM_FL_BL_AST)
@@ -402,46 +404,48 @@ struct lustre_cache *cache_create(struct obd_device *obd)
         OBD_ALLOC(cache, sizeof(*cache));
         if (!cache)
                 GOTO(out, NULL);
+
         spin_lock_init(&cache->lc_locks_list_lock);
         CFS_INIT_LIST_HEAD(&cache->lc_locks_list);
         CFS_INIT_LIST_HEAD(&cache->lc_page_removal_callback_list);
         rwlock_init(&cache->lc_page_removal_cb_lock);
         cache->lc_obd = obd;
 
-      out:
+out:
         return cache;
 }
 
 /* Destroy @cache and free its memory */
 int cache_destroy(struct lustre_cache *cache)
 {
-        if (cache) {
-                spin_lock(&cache->lc_locks_list_lock);
-                if (!list_empty(&cache->lc_locks_list)) {
-                        struct ldlm_lock *lock, *tmp;
-                        CERROR("still have locks in the list on cleanup:\n");
-
-                        list_for_each_entry_safe(lock, tmp,
-                                                 &cache->lc_locks_list,
-                                                 l_cache_locks_list) {
-                                list_del_init(&lock->l_cache_locks_list);
-                                /* XXX: Of course natural idea would be to print
-                                   offending locks here, but if we use
-                                   e.g. LDLM_ERROR, we will likely crash here,
-                                   as LDLM error tries to access e.g.
-                                   nonexisting namespace. Normally this kind of
-                                   case could only happen when somebody did not
-                                   release lock reference and we have other ways
-                                   to detect this. */
-                                /* Make sure there are no pages left under the
-                                   lock */
-                                LASSERT(list_empty(&lock->l_extents_list));
-                        }
+        if (!cache)
+                RETURN(0);
+
+        spin_lock(&cache->lc_locks_list_lock);
+        if (!list_empty(&cache->lc_locks_list)) {
+                struct ldlm_lock *lock, *tmp;
+                CERROR("still have locks in the list on cleanup:\n");
+
+                list_for_each_entry_safe(lock, tmp,
+                                         &cache->lc_locks_list,
+                                         l_cache_locks_list) {
+                        list_del_init(&lock->l_cache_locks_list);
+                        /* XXX: Of course natural idea would be to print
+                         * offending locks here, but if we use
+                         * e.g. LDLM_ERROR, we will likely crash here,
+                         * as LDLM error tries to access e.g.
+                         * nonexisting namespace. Normally this kind of
+                         * case could only happen when somebody did not
+                         * release lock reference and we have other ways
+                         * to detect this. */
+                        /* Make sure there are no pages left under the
+                         * lock */
+                        LASSERT(list_empty(&lock->l_extents_list));
                 }
-                spin_unlock(&cache->lc_locks_list_lock);
-                LASSERT(list_empty(&cache->lc_page_removal_callback_list));
-                OBD_FREE(cache, sizeof(*cache));
         }
+        spin_unlock(&cache->lc_locks_list_lock);
+        LASSERT(list_empty(&cache->lc_page_removal_callback_list));
 
+        OBD_FREE(cache, sizeof(*cache));
         return 0;
 }
index 88cf904..33d12b3 100644 (file)
@@ -4006,40 +4006,54 @@ int osc_cleanup(struct obd_device *obd)
         RETURN(rc);
 }
 
-static int osc_register_page_removal_cb(struct obd_export *exp,
+static int osc_register_page_removal_cb(struct obd_device *obd,
                                         obd_page_removal_cb_t func,
                                         obd_pin_extent_cb pin_cb)
 {
-        return cache_add_extent_removal_cb(exp->exp_obd->u.cli.cl_cache, func,
+        ENTRY;
+
+        /* this server - not need init */
+        if (func == NULL)
+                return 0;
+
+        return cache_add_extent_removal_cb(obd->u.cli.cl_cache, func,
                                            pin_cb);
 }
 
-static int osc_unregister_page_removal_cb(struct obd_export *exp,
+static int osc_unregister_page_removal_cb(struct obd_device *obd,
                                           obd_page_removal_cb_t func)
 {
-        return cache_del_extent_removal_cb(exp->exp_obd->u.cli.cl_cache, func);
+        ENTRY;
+        return cache_del_extent_removal_cb(obd->u.cli.cl_cache, func);
 }
 
-static int osc_register_lock_cancel_cb(struct obd_export *exp,
+static int osc_register_lock_cancel_cb(struct obd_device *obd,
                                        obd_lock_cancel_cb cb)
 {
-        LASSERT(exp->exp_obd->u.cli.cl_ext_lock_cancel_cb == NULL);
+        ENTRY;
+        LASSERT(obd->u.cli.cl_ext_lock_cancel_cb == NULL);
+
+        /* this server - not need init */
+        if (cb == NULL)
+                return 0;
 
-        exp->exp_obd->u.cli.cl_ext_lock_cancel_cb = cb;
+        obd->u.cli.cl_ext_lock_cancel_cb = cb;
         return 0;
 }
 
-static int osc_unregister_lock_cancel_cb(struct obd_export *exp,
+static int osc_unregister_lock_cancel_cb(struct obd_device *obd,
                                          obd_lock_cancel_cb cb)
 {
-        if (exp->exp_obd->u.cli.cl_ext_lock_cancel_cb != cb) {
+        ENTRY;
+
+        if (obd->u.cli.cl_ext_lock_cancel_cb != cb) {
                 CERROR("Unregistering cancel cb %p, while only %p was "
                        "registered\n", cb,
-                       exp->exp_obd->u.cli.cl_ext_lock_cancel_cb);
+                       obd->u.cli.cl_ext_lock_cancel_cb);
                 RETURN(-EINVAL);
         }
 
-        exp->exp_obd->u.cli.cl_ext_lock_cancel_cb = NULL;
+        obd->u.cli.cl_ext_lock_cancel_cb = NULL;
         return 0;
 }
 
index 990149d..85214eb 100644 (file)
@@ -782,14 +782,7 @@ static int ptlrpc_connect_interpret(struct ptlrpc_request *request,
                         IMPORT_SET_STATE(imp, LUSTRE_IMP_REPLAY_LOCKS);
                 } else {
                         IMPORT_SET_STATE(imp, LUSTRE_IMP_FULL);
-                }
-
-                spin_lock(&imp->imp_lock);
-                if (imp->imp_invalid) {
-                        spin_unlock(&imp->imp_lock);
                         ptlrpc_activate_import(imp);
-                } else {
-                        spin_unlock(&imp->imp_lock);
                 }
 
                 GOTO(finish, rc = 0);