-static int mdt_server_data_update(const struct lu_env *env,
- struct mdt_device *mdt)
-{
- int rc = 0;
- ENTRY;
-
- CDEBUG(D_SUPER, "MDS mount_count is "LPU64", last_transno is "LPU64"\n",
- mdt->mdt_mount_count, mdt->mdt_last_transno);
-
- spin_lock(&mdt->mdt_transno_lock);
- mdt->mdt_msd.msd_last_transno = mdt->mdt_last_transno;
- spin_unlock(&mdt->mdt_transno_lock);
-
- /*
- * This may be called from difficult reply handler and
- * mdt->mdt_last_rcvd may be NULL that time.
- */
- if (mdt->mdt_last_rcvd != NULL)
- rc = mdt_last_rcvd_header_write(env, mdt);
- RETURN(rc);
-}
-
-void mdt_cb_new_client(const struct mdt_device *mdt, __u64 transno,
- void *data, int err)
-{
- struct obd_device *obd = mdt->mdt_md_dev.md_lu_dev.ld_obd;
-
- target_client_add_cb(obd, transno, data, err);
-}
-
-int mdt_client_new(const struct lu_env *env, struct mdt_device *mdt)
-{
- unsigned long *bitmap = mdt->mdt_client_bitmap;
- struct mdt_thread_info *mti;
- struct mdt_export_data *med;
- struct mdt_client_data *mcd;
- struct mdt_server_data *msd = &mdt->mdt_msd;
- struct obd_device *obd = mdt->mdt_md_dev.md_lu_dev.ld_obd;
- struct thandle *th;
- loff_t off;
- int rc;
- int cl_idx;
- ENTRY;
-
- mti = lu_context_key_get(&env->le_ctx, &mdt_thread_key);
- LASSERT(mti != NULL);
-
- med = &mti->mti_exp->exp_mdt_data;
- mcd = med->med_mcd;
-
- LASSERT(bitmap != NULL);
- if (!strcmp(med->med_mcd->mcd_uuid, obd->obd_uuid.uuid))
- RETURN(0);
-
- /* the bitmap operations can handle cl_idx > sizeof(long) * 8, so
- * there's no need for extra complication here
- */
- spin_lock(&mdt->mdt_client_bitmap_lock);
- cl_idx = find_first_zero_bit(bitmap, LR_MAX_CLIENTS);
- if (cl_idx >= LR_MAX_CLIENTS ||
- MDT_FAIL_CHECK_ONCE(OBD_FAIL_MDS_CLIENT_ADD)) {
- CERROR("no room for %u clients - fix LR_MAX_CLIENTS\n",
- cl_idx);
- spin_unlock(&mdt->mdt_client_bitmap_lock);
- RETURN(-EOVERFLOW);
- }
- set_bit(cl_idx, bitmap);
- spin_unlock(&mdt->mdt_client_bitmap_lock);
-
- CDEBUG(D_INFO, "client at idx %d with UUID '%s' added\n",
- cl_idx, med->med_mcd->mcd_uuid);
-
- med->med_lr_idx = cl_idx;
- med->med_lr_off = msd->msd_client_start +
- (cl_idx * msd->msd_client_size);
- init_mutex(&med->med_mcd_lock);
-
- LASSERTF(med->med_lr_off > 0, "med_lr_off = %llu\n", med->med_lr_off);
- /* write new client data */
- off = med->med_lr_off;
- th = mdt_trans_start(env, mdt, MDT_TXN_LAST_RCVD_WRITE_CREDITS);
- if (IS_ERR(th))
- RETURN(PTR_ERR(th));
-
- /* until this operations will be committed the sync is needed for this
- * export */
- mdt_trans_add_cb(th, mdt_cb_new_client, mti->mti_exp);
- spin_lock(&mti->mti_exp->exp_lock);
- mti->mti_exp->exp_need_sync = 1;
- spin_unlock(&mti->mti_exp->exp_lock);
-
- rc = mdt_last_rcvd_write(env, mdt, mcd, &off, th);
- CDEBUG(D_INFO, "wrote client mcd at idx %u off %llu (len %u)\n",
- cl_idx, med->med_lr_off, sizeof(*mcd));
- mdt_trans_stop(env, mdt, th);
-
- RETURN(rc);
-}
-
-/* Add client data to the MDS. We use a bitmap to locate a free space
- * in the last_rcvd file if cl_off is -1 (i.e. a new client).
- * Otherwise, we just have to read the data from the last_rcvd file and
- * we know its offset.
- *
- * It should not be possible to fail adding an existing client - otherwise
- * mdt_init_server_data() callsite needs to be fixed.
- */
-int mdt_client_add(const struct lu_env *env,
- struct mdt_device *mdt, int cl_idx)
-{
- struct mdt_thread_info *mti;
- struct mdt_export_data *med;
- unsigned long *bitmap = mdt->mdt_client_bitmap;
- struct obd_device *obd = mdt->mdt_md_dev.md_lu_dev.ld_obd;
- struct mdt_server_data *msd = &mdt->mdt_msd;
- int rc = 0;
- ENTRY;
-
- mti = lu_context_key_get(&env->le_ctx, &mdt_thread_key);
- LASSERT(mti != NULL);
-
- med = &mti->mti_exp->exp_mdt_data;
-
- LASSERT(bitmap != NULL);
- LASSERTF(cl_idx >= 0, "%d\n", cl_idx);
-
- if (!strcmp(med->med_mcd->mcd_uuid, obd->obd_uuid.uuid))
- RETURN(0);
-
- spin_lock(&mdt->mdt_client_bitmap_lock);
- if (test_and_set_bit(cl_idx, bitmap)) {
- CERROR("MDS client %d: bit already set in bitmap!!\n",
- cl_idx);
- LBUG();
- }
- spin_unlock(&mdt->mdt_client_bitmap_lock);
-
- CDEBUG(D_INFO, "client at idx %d with UUID '%s' added\n",
- cl_idx, med->med_mcd->mcd_uuid);
-
- med->med_lr_idx = cl_idx;
- med->med_lr_off = msd->msd_client_start +
- (cl_idx * msd->msd_client_size);
- init_mutex(&med->med_mcd_lock);
-
- LASSERTF(med->med_lr_off > 0, "med_lr_off = %llu\n", med->med_lr_off);
-
- RETURN(rc);
-}
-
-int mdt_client_del(const struct lu_env *env, struct mdt_device *mdt)
-{
- struct mdt_thread_info *mti;
- struct mdt_export_data *med;
- struct mdt_client_data *mcd;
- struct obd_device *obd = mdt->mdt_md_dev.md_lu_dev.ld_obd;
- struct thandle *th;
- loff_t off;
- int rc = 0;
- ENTRY;
-
- mti = lu_context_key_get(&env->le_ctx, &mdt_thread_key);
- LASSERT(mti != NULL);
-
- med = &mti->mti_exp->exp_mdt_data;
- mcd = med->med_mcd;
- if (!mcd)
- RETURN(0);
-
- /* XXX: If mcd_uuid were a real obd_uuid, I could use obd_uuid_equals */
- if (!strcmp(med->med_mcd->mcd_uuid, obd->obd_uuid.uuid))
- GOTO(free, 0);
-
- CDEBUG(D_INFO, "freeing client at idx %u, offset %lld\n",
- med->med_lr_idx, med->med_lr_off);
-
- off = med->med_lr_off;
-
- /*
- * Don't clear med_lr_idx here as it is likely also unset. At worst we
- * leak a client slot that will be cleaned on the next recovery.
- */
- if (off <= 0) {
- CERROR("client idx %d has offset %lld\n",
- med->med_lr_idx, off);
- GOTO(free, rc = -EINVAL);
- }
-
- /*
- * Clear the bit _after_ zeroing out the client so we don't race with
- * mdt_client_add and zero out new clients.
- */
- if (!test_bit(med->med_lr_idx, mdt->mdt_client_bitmap)) {
- CERROR("MDT client %u: bit already clear in bitmap!!\n",
- med->med_lr_idx);
- LBUG();
- }
-
- /*
- * This may be called from difficult reply handler path and
- * mdt->mdt_last_rcvd may be NULL that time.
- */
- if (mdt->mdt_last_rcvd != NULL) {
- th = mdt_trans_start(env, mdt, MDT_TXN_LAST_RCVD_WRITE_CREDITS);
- if (IS_ERR(th))
- GOTO(free, rc = PTR_ERR(th));
-
- mutex_down(&med->med_mcd_lock);
- memset(mcd, 0, sizeof *mcd);
-
- rc = mdt_last_rcvd_write(env, mdt, mcd, &off, th);
- mutex_up(&med->med_mcd_lock);
- mdt_trans_stop(env, mdt, th);
- }
-
- CDEBUG(rc == 0 ? D_INFO : D_ERROR, "Zeroing out client idx %u in "
- "%s rc %d\n", med->med_lr_idx, LAST_RCVD, rc);
-
- spin_lock(&mdt->mdt_client_bitmap_lock);
- clear_bit(med->med_lr_idx, mdt->mdt_client_bitmap);
- spin_unlock(&mdt->mdt_client_bitmap_lock);
-
- /*
- * Make sure the server's last_transno is up to date. Do this after the
- * client is freed so we know all the client's transactions have been
- * committed.
- */
- mdt_server_data_update(env, mdt);
- EXIT;
-free:
- OBD_FREE_PTR(mcd);
- med->med_mcd = NULL;
- return 0;
-}
-