Whamcloud - gitweb
- make HEAD from b_post_cmd3
[fs/lustre-release.git] / lustre / mds / mds_lov.c
index a411b89..0206299 100644 (file)
@@ -56,6 +56,7 @@ void mds_lov_update_objids(struct obd_device *obd, obd_id *ids)
         unlock_kernel();
         EXIT;
 }
         unlock_kernel();
         EXIT;
 }
+EXPORT_SYMBOL(mds_lov_update_objids);
 
 static int mds_lov_read_objids(struct obd_device *obd)
 {
 
 static int mds_lov_read_objids(struct obd_device *obd)
 {
@@ -125,6 +126,7 @@ int mds_lov_write_objids(struct obd_device *obd)
 
         RETURN(rc);
 }
 
         RETURN(rc);
 }
+EXPORT_SYMBOL(mds_lov_write_objids);
 
 int mds_lov_clear_orphans(struct mds_obd *mds, struct obd_uuid *ost_uuid)
 {
 
 int mds_lov_clear_orphans(struct mds_obd *mds, struct obd_uuid *ost_uuid)
 {
@@ -140,8 +142,9 @@ int mds_lov_clear_orphans(struct mds_obd *mds, struct obd_uuid *ost_uuid)
          * missing objects below this ID, they will be created.  If it finds
          * objects above this ID, they will be removed. */
         memset(&oa, 0, sizeof(oa));
          * missing objects below this ID, they will be created.  If it finds
          * objects above this ID, they will be removed. */
         memset(&oa, 0, sizeof(oa));
-        oa.o_valid = OBD_MD_FLFLAGS;
         oa.o_flags = OBD_FL_DELORPHAN;
         oa.o_flags = OBD_FL_DELORPHAN;
+        oa.o_gr = FILTER_GROUP_MDS0 + mds->mds_id;
+        oa.o_valid = OBD_MD_FLFLAGS | OBD_MD_FLGROUP;
         if (ost_uuid != NULL) {
                 memcpy(&oa.o_inline, ost_uuid, sizeof(*ost_uuid));
                 oa.o_valid |= OBD_MD_FLINLINE;
         if (ost_uuid != NULL) {
                 memcpy(&oa.o_inline, ost_uuid, sizeof(*ost_uuid));
                 oa.o_valid |= OBD_MD_FLINLINE;
@@ -235,6 +238,19 @@ static int mds_lov_update_desc(struct obd_device *obd, struct obd_export *lov)
                "%d/%d\n", mds->mds_max_mdsize, mds->mds_max_cookiesize,
                stripes);
 
                "%d/%d\n", mds->mds_max_mdsize, mds->mds_max_cookiesize,
                stripes);
 
+        /* If we added a target we have to reconnect the llogs */
+        /* We only _need_ to do this at first add (idx), or the first time
+           after recovery.  However, it should now be safe to call anytime. */
+        mutex_down(&obd->obd_dev_sem);
+        llog_cat_initialize(obd, NULL, mds->mds_lov_desc.ld_tgt_count, NULL);
+        mutex_up(&obd->obd_dev_sem);
+
+        /*XXX this notifies the MDD until lov handling use old mds code */
+        if (obd->obd_upcall.onu_owner) {
+                 LASSERT(obd->obd_upcall.onu_upcall != NULL);
+                 rc = obd->obd_upcall.onu_upcall(NULL, NULL, 0,
+                                                 obd->obd_upcall.onu_owner);
+        }
 out:
         OBD_FREE(ld, sizeof(*ld));
         RETURN(rc);
 out:
         OBD_FREE(ld, sizeof(*ld));
         RETURN(rc);
@@ -292,14 +308,6 @@ static int mds_lov_update_mds(struct obd_device *obd,
                       mds->mds_lov_objids[idx], idx);
         }
 
                       mds->mds_lov_objids[idx], idx);
         }
 
-        /* If we added a target we have to reconnect the llogs */
-        /* We only _need_ to do this at first add (idx), or the first time
-           after recovery.  However, it should now be safe to call anytime. */
-        CDEBUG(D_CONFIG, "reset llogs idx=%d\n", idx);
-        mutex_down(&obd->obd_dev_sem);
-        llog_cat_initialize(obd, mds->mds_lov_desc.ld_tgt_count, uuid);
-        mutex_up(&obd->obd_dev_sem);
-
         RETURN(rc);
 }
 
         RETURN(rc);
 }
 
@@ -329,10 +337,12 @@ int mds_lov_connect(struct obd_device *obd, char * lov_name)
         if (data == NULL)
                 RETURN(-ENOMEM);
         data->ocd_connect_flags = OBD_CONNECT_VERSION | OBD_CONNECT_INDEX |
         if (data == NULL)
                 RETURN(-ENOMEM);
         data->ocd_connect_flags = OBD_CONNECT_VERSION | OBD_CONNECT_INDEX |
-                                  OBD_CONNECT_REQPORTAL | OBD_CONNECT_QUOTA64;
+                                  OBD_CONNECT_REQPORTAL | OBD_CONNECT_QUOTA64 |
+                                  OBD_CONNECT_OSS_CAPA;
         data->ocd_version = LUSTRE_VERSION_CODE;
         data->ocd_version = LUSTRE_VERSION_CODE;
+        data->ocd_group = mds->mds_id +  FILTER_GROUP_MDS0;
         /* NB: lov_connect() needs to fill in .ocd_index for each OST */
         /* NB: lov_connect() needs to fill in .ocd_index for each OST */
-        rc = obd_connect(&conn, mds->mds_osc_obd, &obd->obd_uuid, data);
+        rc = obd_connect(NULL, &conn, mds->mds_osc_obd, &obd->obd_uuid, data);
         OBD_FREE(data, sizeof(*data));
         if (rc) {
                 CERROR("MDS cannot connect to LOV %s (%d)\n", lov_name, rc);
         OBD_FREE(data, sizeof(*data));
         if (rc) {
                 CERROR("MDS cannot connect to LOV %s (%d)\n", lov_name, rc);
@@ -347,7 +357,7 @@ int mds_lov_connect(struct obd_device *obd, char * lov_name)
                        lov_name, rc);
                 GOTO(err_discon, rc);
         }
                        lov_name, rc);
                 GOTO(err_discon, rc);
         }
-        
+
         /* Deny new client connections until we are sure we have some OSTs */
         obd->obd_no_conn = 1;
 
         /* Deny new client connections until we are sure we have some OSTs */
         obd->obd_no_conn = 1;
 
@@ -362,7 +372,7 @@ int mds_lov_connect(struct obd_device *obd, char * lov_name)
                 GOTO(err_reg, rc);
 
         /* tgt_count may be 0! */
                 GOTO(err_reg, rc);
 
         /* tgt_count may be 0! */
-        rc = llog_cat_initialize(obd, mds->mds_lov_desc.ld_tgt_count, NULL);
+        rc = llog_cat_initialize(obd, NULL, mds->mds_lov_desc.ld_tgt_count, NULL);
         if (rc) {
                 CERROR("failed to initialize catalog %d\n", rc);
                 GOTO(err_reg, rc);
         if (rc) {
                 CERROR("failed to initialize catalog %d\n", rc);
                 GOTO(err_reg, rc);
@@ -391,8 +401,10 @@ int mds_lov_connect(struct obd_device *obd, char * lov_name)
          * set_nextid().  The class driver can help us here, because
          * it can use the obd_recovering flag to determine when the
          * the OBD is full available. */
          * set_nextid().  The class driver can help us here, because
          * it can use the obd_recovering flag to determine when the
          * the OBD is full available. */
+        /* MDD device will care about that
         if (!obd->obd_recovering)
                 rc = mds_postrecov(obd);
         if (!obd->obd_recovering)
                 rc = mds_postrecov(obd);
+         */
         RETURN(rc);
 
 err_reg:
         RETURN(rc);
 
 err_reg:
@@ -583,15 +595,17 @@ int mds_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
                 struct llog_ctxt *ctxt =
                         llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
                 int rc2;
                 struct llog_ctxt *ctxt =
                         llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT);
                 int rc2;
+                __u32 group;
 
                 obd_llog_finish(obd, mds->mds_lov_desc.ld_tgt_count);
                 push_ctxt(&saved, &ctxt->loc_exp->exp_obd->obd_lvfs_ctxt, NULL);
                 rc = llog_ioctl(ctxt, cmd, data);
                 pop_ctxt(&saved, &ctxt->loc_exp->exp_obd->obd_lvfs_ctxt, NULL);
 
                 obd_llog_finish(obd, mds->mds_lov_desc.ld_tgt_count);
                 push_ctxt(&saved, &ctxt->loc_exp->exp_obd->obd_lvfs_ctxt, NULL);
                 rc = llog_ioctl(ctxt, cmd, data);
                 pop_ctxt(&saved, &ctxt->loc_exp->exp_obd->obd_lvfs_ctxt, NULL);
-                llog_cat_initialize(obd, mds->mds_lov_desc.ld_tgt_count, NULL);
+                llog_cat_initialize(obd, NULL, mds->mds_lov_desc.ld_tgt_count, NULL);
+                group = FILTER_GROUP_MDS0 + mds->mds_id;
                 rc2 = obd_set_info_async(mds->mds_osc_exp,
                                          strlen(KEY_MDS_CONN), KEY_MDS_CONN,
                 rc2 = obd_set_info_async(mds->mds_osc_exp,
                                          strlen(KEY_MDS_CONN), KEY_MDS_CONN,
-                                         0, NULL, NULL);
+                                         sizeof(group), &group, NULL);
                 if (!rc)
                         rc = rc2;
                 RETURN(rc);
                 if (!rc)
                         rc = rc2;
                 RETURN(rc);
@@ -610,7 +624,7 @@ int mds_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
 
         case OBD_IOC_ABORT_RECOVERY:
                 CERROR("aborting recovery for device %s\n", obd->obd_name);
 
         case OBD_IOC_ABORT_RECOVERY:
                 CERROR("aborting recovery for device %s\n", obd->obd_name);
-                target_abort_recovery(obd);
+                target_stop_recovery_thread(obd);
                 RETURN(0);
 
         default:
                 RETURN(0);
 
         default:
@@ -627,6 +641,32 @@ struct mds_lov_sync_info {
         __u32              mlsi_index;   /* index of target */
 };
 
         __u32              mlsi_index;   /* index of target */
 };
 
+static int mds_propagate_capa_keys(struct mds_obd *mds)
+{
+        struct lustre_capa_key *key;
+        int i, rc = 0;
+
+        ENTRY;
+
+        if (!mds->mds_capa_keys)
+                RETURN(0);
+
+        for (i = 0; i < 2; i++) {
+                key = &mds->mds_capa_keys[i];
+                DEBUG_CAPA_KEY(D_SEC, key, "propagate");
+
+                rc = obd_set_info_async(mds->mds_osc_exp, strlen(KEY_CAPA_KEY),
+                                        KEY_CAPA_KEY, sizeof(*key), key, NULL);
+                if (rc) {
+                        DEBUG_CAPA_KEY(D_ERROR, key,
+                                       "propagate failed (rc = %d) for", rc);
+                        RETURN(rc);
+                }
+        }
+
+        RETURN(0);
+}
+
 /* We only sync one osc at a time, so that we don't have to hold
    any kind of lock on the whole mds_lov_desc, which may change
    (grow) as a result of mds_lov_add_ost.  This also avoids any
 /* We only sync one osc at a time, so that we don't have to hold
    any kind of lock on the whole mds_lov_desc, which may change
    (grow) as a result of mds_lov_add_ost.  This also avoids any
@@ -640,6 +680,7 @@ static int __mds_lov_synchronize(void *data)
         struct mds_obd *mds = &obd->u.mds;
         struct obd_uuid *uuid;
         __u32  idx = mlsi->mlsi_index;
         struct mds_obd *mds = &obd->u.mds;
         struct obd_uuid *uuid;
         __u32  idx = mlsi->mlsi_index;
+        struct mds_group_info mgi;
         int rc = 0;
         ENTRY;
 
         int rc = 0;
         ENTRY;
 
@@ -653,12 +694,18 @@ static int __mds_lov_synchronize(void *data)
         rc = mds_lov_update_mds(obd, watched, idx, uuid);
         if (rc != 0)
                 GOTO(out, rc);
         rc = mds_lov_update_mds(obd, watched, idx, uuid);
         if (rc != 0)
                 GOTO(out, rc);
-
+        mgi.group = FILTER_GROUP_MDS0 + mds->mds_id;
+        mgi.uuid = uuid;
         rc = obd_set_info_async(mds->mds_osc_exp, strlen(KEY_MDS_CONN),
         rc = obd_set_info_async(mds->mds_osc_exp, strlen(KEY_MDS_CONN),
-                                KEY_MDS_CONN, 0, uuid, NULL);
+                                KEY_MDS_CONN, sizeof(mgi), &mgi, NULL);
         if (rc != 0)
                 GOTO(out, rc);
 
         if (rc != 0)
                 GOTO(out, rc);
 
+        /* propagate capability keys */
+        rc = mds_propagate_capa_keys(mds);
+        if (rc)
+                GOTO(out, rc);
+
         rc = llog_connect(llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT),
                           mds->mds_lov_desc.ld_tgt_count,
                           NULL, NULL, uuid);
         rc = llog_connect(llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT),
                           mds->mds_lov_desc.ld_tgt_count,
                           NULL, NULL, uuid);
@@ -671,7 +718,10 @@ static int __mds_lov_synchronize(void *data)
 
         LCONSOLE_INFO("MDS %s: %s now active, resetting orphans\n",
               obd->obd_name, obd_uuid2str(uuid));
 
         LCONSOLE_INFO("MDS %s: %s now active, resetting orphans\n",
               obd->obd_name, obd_uuid2str(uuid));
-
+        /*
+         * FIXME: this obd_stopping was useless, 
+         * since obd in mdt layer was set
+         */
         if (obd->obd_stopping)
                 GOTO(out, rc = -ENODEV);
 
         if (obd->obd_stopping)
                 GOTO(out, rc = -ENODEV);
 
@@ -681,7 +731,16 @@ static int __mds_lov_synchronize(void *data)
                        obd->obd_name, rc);
                 GOTO(out, rc);
         }
                        obd->obd_name, rc);
                 GOTO(out, rc);
         }
-
+        
+        if (obd->obd_upcall.onu_owner) {
+                /*
+                 * This is a hack for mds_notify->mdd_notify. When the mds obd
+                 * in mdd is removed, This hack should be removed.
+                 */
+                 LASSERT(obd->obd_upcall.onu_upcall != NULL);
+                 rc = obd->obd_upcall.onu_upcall(NULL, NULL, 0,
+                                                 obd->obd_upcall.onu_owner);
+        }
         EXIT;
 out:
         class_decref(obd);
         EXIT;
 out:
         class_decref(obd);
@@ -777,7 +836,6 @@ int mds_notify(struct obd_device *obd, struct obd_device *watched,
         }
 
         CDEBUG(D_CONFIG, "notify %s ev=%d\n", watched->obd_name, ev);
         }
 
         CDEBUG(D_CONFIG, "notify %s ev=%d\n", watched->obd_name, ev);
-
         if (strcmp(watched->obd_type->typ_name, LUSTRE_OSC_NAME) != 0) {
                 CERROR("unexpected notification of %s %s!\n",
                        watched->obd_type->typ_name, watched->obd_name);
         if (strcmp(watched->obd_type->typ_name, LUSTRE_OSC_NAME) != 0) {
                 CERROR("unexpected notification of %s %s!\n",
                        watched->obd_type->typ_name, watched->obd_name);
@@ -792,6 +850,15 @@ int mds_notify(struct obd_device *obd, struct obd_device *watched,
                    after the mdt in the config log.  They didn't make it into
                    mds_lov_connect. */
                 rc = mds_lov_update_desc(obd, obd->u.mds.mds_osc_exp);
                    after the mdt in the config log.  They didn't make it into
                    mds_lov_connect. */
                 rc = mds_lov_update_desc(obd, obd->u.mds.mds_osc_exp);
+                if (rc)
+                        RETURN(rc);
+                /* We should update init llog here too for replay unlink and 
+                 * possiable llog init race when recovery complete */
+                mutex_down(&obd->obd_dev_sem);
+                llog_cat_initialize(obd, NULL, 
+                                    obd->u.mds.mds_lov_desc.ld_tgt_count,
+                                    &watched->u.cli.cl_target_uuid);
+                mutex_up(&obd->obd_dev_sem);
                 RETURN(rc);
         }
 
                 RETURN(rc);
         }
 
@@ -868,4 +935,4 @@ void mds_objids_from_lmm(obd_id *ids, struct lov_mds_md *lmm,
                         le64_to_cpu(lmm->lmm_objects[i].l_object_id);
         }
 }
                         le64_to_cpu(lmm->lmm_objects[i].l_object_id);
         }
 }
-
+EXPORT_SYMBOL(mds_objids_from_lmm);