Whamcloud - gitweb
Update after codeinsp
authortappro <tappro>
Tue, 12 Sep 2006 22:08:47 +0000 (22:08 +0000)
committertappro <tappro>
Tue, 12 Sep 2006 22:08:47 +0000 (22:08 +0000)
- client adding/removal in last_rcvd is updated
- pass credits to the trans_start() instead of using hardcoded value
- protect bitmap operation with lock
- separate client_add() and client_new()
- fix using the uninitializated variable in filter.c

lustre/mdt/mdt_handler.c
lustre/mdt/mdt_internal.h
lustre/mdt/mdt_recovery.c
lustre/obdfilter/filter.c

index 763ce84..dc44085 100644 (file)
@@ -2690,7 +2690,7 @@ static int mdt_init0(const struct lu_context *ctx, struct mdt_device *m,
         LASSERT(obd);
 
         spin_lock_init(&m->mdt_transno_lock);
-
+        
         m->mdt_max_mdsize = MAX_MD_SIZE;
         m->mdt_max_cookiesize = sizeof(struct llog_cookie);
 
@@ -2700,7 +2700,7 @@ static int mdt_init0(const struct lu_context *ctx, struct mdt_device *m,
         m->mdt_opts.mo_acl = 0;
         m->mdt_opts.mo_compat_resname = 0;
         obd->obd_replayable = 1;
-
+        spin_lock_init(&m->mdt_client_bitmap_lock);
 
         OBD_ALLOC_PTR(s);
         if (s == NULL)
@@ -2951,7 +2951,7 @@ static int mdt_obd_connect(const struct lu_context *ctx,
                 if (mcd != NULL) {
                         memcpy(mcd->mcd_uuid, cluuid, sizeof mcd->mcd_uuid);
                         med->med_mcd = mcd;
-                        rc = mdt_client_add(ctx, mdt, med, -1);
+                        rc = mdt_client_new(ctx, mdt, med);
                         if (rc != 0)
                                 OBD_FREE_PTR(mcd);
                 } else
@@ -3085,7 +3085,7 @@ static int mdt_destroy_export(struct obd_export *export)
                 spin_lock(&med->med_open_lock);
         }
         spin_unlock(&med->med_open_lock);
-        mdt_client_free(&ctxt, mdt, med);
+        mdt_client_del(&ctxt, mdt, med);
 
 out:
         if (ma->ma_lmm)
index 39a4e88..2f0a469 100644 (file)
@@ -381,13 +381,16 @@ void mdt_reconstruct(struct mdt_thread_info *);
 int mdt_fs_setup(const struct lu_context *, struct mdt_device *);
 void mdt_fs_cleanup(const struct lu_context *, struct mdt_device *);
 
-int mdt_client_free(const struct lu_context *ctxt,
+int mdt_client_del(const struct lu_context *ctxt,
                     struct mdt_device *mdt,
                     struct mdt_export_data *med);
 int mdt_client_add(const struct lu_context *ctxt,
                    struct mdt_device *mdt,
                    struct mdt_export_data *med,
                    int cl_idx);
+int mdt_client_new(const struct lu_context *ctxt,
+                   struct mdt_device *mdt,
+                   struct mdt_export_data *med);
 
 int mdt_pin(struct mdt_thread_info* info);
 
index 8ea1c16..e9c6868 100644 (file)
@@ -72,18 +72,18 @@ static int mdt_record_write(const struct lu_context *ctx,
 /* only one record write */
 
 enum {
-        MDT_TXN_WRITE_RECORD_CREDITS = 3
+        MDT_TXN_LAST_RCVD_WRITE_CREDITS = 3
 };
 
 static struct thandle* mdt_trans_start(const struct lu_context *ctx,
-                                       struct mdt_device *mdt)
+                                       struct mdt_device *mdt, int credits)
 {
         struct mdt_thread_info *mti;
         struct txn_param *p;
 
         mti = lu_context_key_get(ctx, &mdt_thread_key);
         p = &mti->mti_txn_param;
-        p->tp_credits = MDT_TXN_WRITE_RECORD_CREDITS;
+        p->tp_credits = credits;
         return mdt->mdt_bottom->dd_ops->dt_trans_start(ctx, mdt->mdt_bottom, p);
 }
 
@@ -188,7 +188,7 @@ static int mdt_last_rcvd_header_write(const struct lu_context *ctx,
 
         mti = lu_context_key_get(ctx, &mdt_thread_key);
 
-        th = mdt_trans_start(ctx, mdt);
+        th = mdt_trans_start(ctx, mdt, MDT_TXN_LAST_RCVD_WRITE_CREDITS);
         if (IS_ERR(th))
                 RETURN(PTR_ERR(th));
 
@@ -369,8 +369,8 @@ static int mdt_clients_data_init(const struct lu_context *ctx,
                        cl_idx, last_transno);
                 /* protect __u64 value update */
                 spin_lock(&mdt->mdt_transno_lock);
-                if (last_transno > mdt->mdt_last_transno)
-                        mdt->mdt_last_transno = last_transno;
+                mdt->mdt_last_transno = max(last_transno,
+                                            mdt->mdt_last_transno);
                 spin_unlock(&mdt->mdt_transno_lock);
         }
 
@@ -410,7 +410,7 @@ static int mdt_server_data_init(const struct lu_context *ctx,
         if (rc)
                 RETURN(rc);
 
-        last_rcvd_size = la->la_size;
+        last_rcvd_size = (unsigned long)la->la_size;
         
         if (last_rcvd_size == 0) {
                 LCONSOLE_WARN("%s: new disk, initializing\n", obd->obd_name);
@@ -551,7 +551,7 @@ static int mdt_server_data_update(const struct lu_context *ctx,
         rc = mdt_last_rcvd_header_write(ctx, mdt, msd);
         RETURN(rc);
 }
-#if 0
+
 int mdt_client_new(const struct lu_context *ctx,
                    struct mdt_device *mdt,
                    struct mdt_export_data *med)
@@ -559,26 +559,31 @@ int mdt_client_new(const struct lu_context *ctx,
         unsigned long *bitmap = mdt->mdt_client_bitmap;
         struct mdt_client_data *mcd = med->med_mcd;
         struct mdt_server_data *msd = &mdt->mdt_msd;
+        struct obd_device *obd = mdt->mdt_md_dev.md_lu_dev.ld_obd;
+        struct mdt_thread_info *mti;
+        struct thandle *th;
+        loff_t off;
         int rc = 0;
+        int cl_idx;
         ENTRY;
 
         LASSERT(bitmap != NULL);
-
+        if (!strcmp(med->med_mcd->mcd_uuid, obd->obd_uuid.uuid))
+                RETURN(0);
+        mti = lu_context_key_get(ctx, &mdt_thread_key);
         /* the bitmap operations can handle cl_idx > sizeof(long) * 8, so
          * there's no need for extra complication here
          */
+        spin_lock(&mdt->mdt_client_bitmap_lock);
         cl_idx = find_first_zero_bit(bitmap, LR_MAX_CLIENTS);
-repeat:
         if (cl_idx >= LR_MAX_CLIENTS ||
             MDT_FAIL_CHECK_ONCE(OBD_FAIL_MDS_CLIENT_ADD)) {
                 CERROR("no room for clients - fix LR_MAX_CLIENTS\n");
-                return -EOVERFLOW;
-        }
-        if (test_and_set_bit(cl_idx, bitmap)) {
-                cl_idx = find_next_zero_bit(bitmap, LR_MAX_CLIENTS,
-                                cl_idx);
-                goto repeat;
+                spin_unlock(&mdt->mdt_client_bitmap_lock);
+                RETURN(-EOVERFLOW);
         }
+        set_bit(cl_idx, bitmap);
+        spin_unlock(&mdt->mdt_client_bitmap_lock);
 
         CDEBUG(D_INFO, "client at idx %d with UUID '%s' added\n",
                cl_idx, med->med_mcd->mcd_uuid);
@@ -589,23 +594,20 @@ repeat:
         init_mutex(&med->med_mcd_lock);
 
         LASSERTF(med->med_lr_off > 0, "med_lr_off = %llu\n", med->med_lr_off);
+        /* write new client data */
+        off = med->med_lr_off;
+        th = mdt_trans_start(ctx, mdt, MDT_TXN_LAST_RCVD_WRITE_CREDITS);
+        if (IS_ERR(th))
+                RETURN(PTR_ERR(th));
+        
+        rc = mdt_last_rcvd_write(ctx, mdt, mcd, &off, th);
+        CDEBUG(D_INFO, "wrote client mcd at idx %u off %llu (len %u)\n",
+               cl_idx, med->med_lr_off, sizeof(*mcd));
+        mdt_trans_stop(ctx, mdt, th);
 
-        if (new_client) {
-                loff_t off = med->med_lr_off;
-                struct thandle *th;
-                th = mdt_trans_start(ctx, mdt);
-                if (IS_ERR(th))
-                        RETURN(PTR_ERR(th));
-                
-                rc = mdt_last_rcvd_write(ctx, mdt, mcd, &off, th);
-                CDEBUG(D_INFO, 
-                       "wrote client mcd at idx %u off %llu (len %u)\n",
-                       cl_idx, med->med_lr_off, sizeof(*mcd));
-                mdt_trans_stop(ctx, mdt, th);
-        }
         RETURN(rc);
 }
-#endif
+
 /* Add client data to the MDS.  We use a bitmap to locate a free space
  * in the last_rcvd file if cl_off is -1 (i.e. a new client).
  * Otherwise, we just have to read the data from the last_rcvd file and
@@ -619,38 +621,24 @@ int mdt_client_add(const struct lu_context *ctx,
                    struct mdt_export_data *med, int cl_idx)
 {
         unsigned long *bitmap = mdt->mdt_client_bitmap;
-        struct mdt_client_data *mcd = med->med_mcd;
+        struct obd_device *obd = mdt->mdt_md_dev.md_lu_dev.ld_obd;
         struct mdt_server_data *msd = &mdt->mdt_msd;
-        int new_client = (cl_idx == -1);
         int rc = 0;
         ENTRY;
 
         LASSERT(bitmap != NULL);
-        LASSERTF(cl_idx > -2, "%d\n", cl_idx);
+        LASSERTF(cl_idx >= 0, "%d\n", cl_idx);
 
-        /* the bitmap operations can handle cl_idx > sizeof(long) * 8, so
-         * there's no need for extra complication here
-         */
-        if (new_client) {
-                cl_idx = find_first_zero_bit(bitmap, LR_MAX_CLIENTS);
-        repeat:
-                if (cl_idx >= LR_MAX_CLIENTS ||
-                    MDT_FAIL_CHECK_ONCE(OBD_FAIL_MDS_CLIENT_ADD)) {
-                        CERROR("no room for clients - fix LR_MAX_CLIENTS\n");
-                        return -EOVERFLOW;
-                }
-                if (test_and_set_bit(cl_idx, bitmap)) {
-                        cl_idx = find_next_zero_bit(bitmap, LR_MAX_CLIENTS,
-                                                    cl_idx);
-                        goto repeat;
-                }
-        } else {
-                if (test_and_set_bit(cl_idx, bitmap)) {
-                        CERROR("MDS client %d: bit already set in bitmap!!\n",
-                               cl_idx);
-                        LBUG();
-                }
+        if (!strcmp(med->med_mcd->mcd_uuid, obd->obd_uuid.uuid))
+                RETURN(0);
+
+        spin_lock(&mdt->mdt_client_bitmap_lock);
+        if (test_and_set_bit(cl_idx, bitmap)) {
+                CERROR("MDS client %d: bit already set in bitmap!!\n",
+                       cl_idx);
+                LBUG();
         }
+        spin_unlock(&mdt->mdt_client_bitmap_lock);
 
         CDEBUG(D_INFO, "client at idx %d with UUID '%s' added\n",
                cl_idx, med->med_mcd->mcd_uuid);
@@ -662,27 +650,14 @@ int mdt_client_add(const struct lu_context *ctx,
 
         LASSERTF(med->med_lr_off > 0, "med_lr_off = %llu\n", med->med_lr_off);
 
-        if (new_client) {
-                loff_t off = med->med_lr_off;
-                struct thandle *th;
-                th = mdt_trans_start(ctx, mdt);
-                if (IS_ERR(th))
-                        RETURN(PTR_ERR(th));
-                
-                rc = mdt_last_rcvd_write(ctx, mdt, mcd, &off, th);
-                CDEBUG(D_INFO, 
-                       "wrote client mcd at idx %u off %llu (len %u)\n",
-                       cl_idx, med->med_lr_off, sizeof(*mcd));
-                mdt_trans_stop(ctx, mdt, th);
-        }
         RETURN(rc);
 }
 
-int mdt_client_free(const struct lu_context *ctx,
+int mdt_client_del(const struct lu_context *ctx,
                    struct mdt_device *mdt,
                    struct mdt_export_data *med)
 {
-        struct mdt_client_data *mcd  = med->med_mcd;
+        struct mdt_client_data *mcd = med->med_mcd;
         struct obd_device      *obd = mdt->mdt_md_dev.md_lu_dev.ld_obd;
         struct thandle *th;
         loff_t off;
@@ -717,7 +692,7 @@ int mdt_client_free(const struct lu_context *ctx,
                 LBUG();
         }
 
-        th = mdt_trans_start(ctx, mdt);
+        th = mdt_trans_start(ctx, mdt, MDT_TXN_LAST_RCVD_WRITE_CREDITS);
         if (IS_ERR(th))
                 GOTO(free, rc = PTR_ERR(th));
 
@@ -808,15 +783,11 @@ static int mdt_last_rcvd_update(struct mdt_thread_info *mti,
 extern struct lu_context_key mdt_txn_key;
 extern struct lu_context_key mdt_thread_key;
 
-enum {
-        MDT_TXN_LAST_RCVD_CREDITS = 3
-};
-
 /* add credits for last_rcvd update */
 static int mdt_txn_start_cb(const struct lu_context *ctx,
                             struct txn_param *param, void *cookie)
 {
-        param->tp_credits += MDT_TXN_LAST_RCVD_CREDITS;
+        param->tp_credits += MDT_TXN_LAST_RCVD_WRITE_CREDITS;
         return 0;
 }
 
@@ -845,6 +816,8 @@ static int mdt_txn_stop_cb(const struct lu_context *ctx,
                 txi->txi_transno = 0;
                 return 0;
         }
+        req = mdt_info_req(mti);
+        LASSERT(req != NULL);
         /*TODO: checks for recovery cases, see mds_finish_transno */
         spin_lock(&mdt->mdt_transno_lock);
         if (txn->th_result != 0) {
@@ -856,18 +829,19 @@ static int mdt_txn_stop_cb(const struct lu_context *ctx,
         } else if (mti->mti_transno == 0) {
                 mti->mti_transno = ++ mdt->mdt_last_transno;
         } else {
-                /* replay */
+                /* should be replay */
+                if (!(lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY))
+                        CERROR("Double transaction ("LPU64") per thread!\n",
+                               mti->mti_transno);                
                 if (mti->mti_transno > mdt->mdt_last_transno)
                         mdt->mdt_last_transno = mti->mti_transno;
         }
         spin_unlock(&mdt->mdt_transno_lock);
 
-        /* filling reply data */
-        req = mdt_info_req(mti);
-
         /* sometimes the reply message has not been successfully packed */
         LASSERT(req != NULL && req->rq_repmsg != NULL);
 
+        /* filling reply data */
         CDEBUG(D_INODE, "transno = %llu, last_committed = %llu\n",
                mti->mti_transno, req->rq_export->exp_obd->obd_last_committed);
 
index b62f5e9..a380f19 100644 (file)
@@ -250,7 +250,7 @@ static int filter_client_free(struct obd_export *exp)
                 GOTO(free, 0);
 
         CDEBUG(D_INFO, "freeing client at idx %u, offset %lld with UUID '%s'\n",
-               fed->fed_lr_idx, off, fed->fed_fcd->fcd_uuid);
+               fed->fed_lr_idx, fed->fed_lr_off, fed->fed_fcd->fcd_uuid);
 
         LASSERT(filter->fo_last_rcvd_slots != NULL);