* GPL HEADER END
*/
/*
- * Copyright 2008 Sun Microsystems, Inc. All rights reserved
+ * Copyright (c) 2009, 2010, Oracle and/or its affiliates. All rights reserved.
* Use is subject to license terms.
*/
/*
#include <obd.h>
#include <lustre_fsfilt.h>
+
/**
* Update client data in last_rcvd file. An obd API
*/
static int obt_client_data_update(struct obd_export *exp)
{
- struct lu_export_data *led = &exp->exp_target_data;
+ struct tg_export_data *ted = &exp->exp_target_data;
struct obd_device_target *obt = &exp->exp_obd->u.obt;
- loff_t off = led->led_lr_off;
+ struct lu_target *lut = class_exp2tgt(exp);
+ loff_t off = ted->ted_lr_off;
int rc = 0;
rc = fsfilt_write_record(exp->exp_obd, obt->obt_rcvd_filp,
- led->led_lcd, sizeof(*led->led_lcd), &off, 0);
+ ted->ted_lcd, sizeof(*ted->ted_lcd), &off, 0);
CDEBUG(D_INFO, "update client idx %u last_epoch %#x (%#x)\n",
- led->led_lr_idx, le32_to_cpu(led->led_lcd->lcd_last_epoch),
- le32_to_cpu(obt->obt_lsd->lsd_start_epoch));
+ ted->ted_lr_idx, le32_to_cpu(ted->ted_lcd->lcd_last_epoch),
+ le32_to_cpu(lut->lut_lsd.lsd_start_epoch));
return rc;
}
/**
* Update server data in last_rcvd file. An obd API
*/
-int obt_server_data_update(struct obd_device *obd, int force_sync)
+int obt_server_data_update(struct lu_target *lut, int force_sync)
{
- struct obd_device_target *obt = &obd->u.obt;
+ struct obd_device_target *obt = &lut->lut_obd->u.obt;
loff_t off = 0;
int rc;
ENTRY;
CDEBUG(D_SUPER,
"%s: mount_count is "LPU64", last_transno is "LPU64"\n",
- obt->obt_lsd->lsd_uuid,
- le64_to_cpu(obt->obt_lsd->lsd_mount_count),
- le64_to_cpu(obt->obt_lsd->lsd_last_transno));
+ lut->lut_lsd.lsd_uuid,
+ le64_to_cpu(lut->lut_lsd.lsd_mount_count),
+ le64_to_cpu(lut->lut_lsd.lsd_last_transno));
- rc = fsfilt_write_record(obd, obt->obt_rcvd_filp, obt->obt_lsd,
- sizeof(*obt->obt_lsd), &off, force_sync);
+ rc = fsfilt_write_record(lut->lut_obd, obt->obt_rcvd_filp,
+ &lut->lut_lsd, sizeof(lut->lut_lsd),
+ &off, force_sync);
if (rc)
CERROR("error writing lr_server_data: rc = %d\n", rc);
*/
void obt_client_epoch_update(struct obd_export *exp)
{
- struct lsd_client_data *lcd = exp->exp_target_data.led_lcd;
- struct obd_device_target *obt = &exp->exp_obd->u.obt;
+ struct lsd_client_data *lcd = exp->exp_target_data.ted_lcd;
+ struct lu_target *lut = class_exp2tgt(exp);
/** VBR: set client last_epoch to current epoch */
if (le32_to_cpu(lcd->lcd_last_epoch) >=
- le32_to_cpu(obt->obt_lsd->lsd_start_epoch))
+ le32_to_cpu(lut->lut_lsd.lsd_start_epoch))
return;
- lcd->lcd_last_epoch = obt->obt_lsd->lsd_start_epoch;
+ lcd->lcd_last_epoch = lut->lut_lsd.lsd_start_epoch;
obt_client_data_update(exp);
}
/**
* Increment server epoch. An obd API
*/
-static void obt_boot_epoch_update(struct obd_device *obd)
+static void obt_boot_epoch_update(struct lu_target *lut)
{
+ struct obd_device *obd = lut->lut_obd;
__u32 start_epoch;
- struct obd_device_target *obt = &obd->u.obt;
struct ptlrpc_request *req;
- struct list_head client_list;
+ cfs_list_t client_list;
- spin_lock(&obt->obt_translock);
- start_epoch = lr_epoch(le64_to_cpu(obt->obt_last_transno)) + 1;
- obt->obt_last_transno = cpu_to_le64((__u64)start_epoch <<
+ cfs_spin_lock(&lut->lut_translock);
+ start_epoch = lr_epoch(le64_to_cpu(lut->lut_last_transno)) + 1;
+ lut->lut_last_transno = cpu_to_le64((__u64)start_epoch <<
LR_EPOCH_BITS);
- obt->obt_lsd->lsd_start_epoch = cpu_to_le32(start_epoch);
- spin_unlock(&obt->obt_translock);
+ lut->lut_lsd.lsd_start_epoch = cpu_to_le32(start_epoch);
+ cfs_spin_unlock(&lut->lut_translock);
CFS_INIT_LIST_HEAD(&client_list);
- spin_lock_bh(&obd->obd_processing_task_lock);
- list_splice_init(&obd->obd_final_req_queue, &client_list);
- spin_unlock_bh(&obd->obd_processing_task_lock);
+ cfs_spin_lock(&obd->obd_recovery_task_lock);
+ cfs_list_splice_init(&obd->obd_final_req_queue, &client_list);
+ cfs_spin_unlock(&obd->obd_recovery_task_lock);
/**
* go through list of exports participated in recovery and
* set new epoch for them
*/
- list_for_each_entry(req, &client_list, rq_list) {
+ cfs_list_for_each_entry(req, &client_list, rq_list) {
LASSERT(!req->rq_export->exp_delayed);
obt_client_epoch_update(req->rq_export);
}
/** return list back at once */
- spin_lock_bh(&obd->obd_processing_task_lock);
- list_splice_init(&client_list, &obd->obd_final_req_queue);
- spin_unlock_bh(&obd->obd_processing_task_lock);
- obt_server_data_update(obd, 1);
+ cfs_spin_lock(&obd->obd_recovery_task_lock);
+ cfs_list_splice_init(&client_list, &obd->obd_final_req_queue);
+ cfs_spin_unlock(&obd->obd_recovery_task_lock);
+ obt_server_data_update(lut, 1);
}
/**
}
/**
+ * Allocate in-memory data for client slot related to export.
+ */
+int lut_client_alloc(struct obd_export *exp)
+{
+ OBD_ALLOC_PTR(exp->exp_target_data.ted_lcd);
+ if (exp->exp_target_data.ted_lcd == NULL)
+ RETURN(-ENOMEM);
+ /* Mark that slot is not yet valid, 0 doesn't work here */
+ exp->exp_target_data.ted_lr_idx = -1;
+ RETURN(0);
+}
+EXPORT_SYMBOL(lut_client_alloc);
+
+/**
+ * Free in-memory data for client slot related to export.
+ */
+void lut_client_free(struct obd_export *exp)
+{
+ struct tg_export_data *ted = &exp->exp_target_data;
+ struct lu_target *lut = class_exp2tgt(exp);
+
+ OBD_FREE_PTR(ted->ted_lcd);
+ ted->ted_lcd = NULL;
+
+ /* Slot may be not yet assigned */
+ if (ted->ted_lr_idx < 0)
+ return;
+ /* Clear bit when lcd is freed */
+ cfs_spin_lock(&lut->lut_client_bitmap_lock);
+ if (!cfs_test_and_clear_bit(ted->ted_lr_idx, lut->lut_client_bitmap)) {
+ CERROR("%s: client %u bit already clear in bitmap\n",
+ exp->exp_obd->obd_name, ted->ted_lr_idx);
+ LBUG();
+ }
+ cfs_spin_unlock(&lut->lut_client_bitmap_lock);
+}
+EXPORT_SYMBOL(lut_client_free);
+
+/**
* Update client data in last_rcvd
*/
-int lut_client_data_update(const struct lu_env *env, struct lu_target *lut,
- struct obd_export *exp)
+int lut_client_data_update(const struct lu_env *env, struct obd_export *exp)
{
- struct lu_export_data *led = &exp->exp_target_data;
+ struct tg_export_data *ted = &exp->exp_target_data;
+ struct lu_target *lut = class_exp2tgt(exp);
struct lsd_client_data tmp_lcd;
- loff_t tmp_off = led->led_lr_off;
+ loff_t tmp_off = ted->ted_lr_off;
struct lu_buf tmp_buf = {
.lb_buf = &tmp_lcd,
.lb_len = sizeof(tmp_lcd)
};
int rc = 0;
- lcd_cpu_to_le(led->led_lcd, &tmp_lcd);
+ lcd_cpu_to_le(ted->ted_lcd, &tmp_lcd);
LASSERT(lut->lut_last_rcvd);
rc = lut_last_rcvd_write(env, lut, &tmp_buf, &tmp_off, 0);
CDEBUG(D_SUPER,
"%s: mount_count is "LPU64", last_transno is "LPU64"\n",
- lut->lut_lsd.lsd_uuid, lut->lut_mount_count,
+ lut->lut_lsd.lsd_uuid, lut->lut_obd->u.obt.obt_mount_count,
lut->lut_last_transno);
- spin_lock(&lut->lut_translock);
+ cfs_spin_lock(&lut->lut_translock);
lut->lut_lsd.lsd_last_transno = lut->lut_last_transno;
- spin_unlock(&lut->lut_translock);
+ cfs_spin_unlock(&lut->lut_translock);
lsd_cpu_to_le(&lut->lut_lsd, &tmp_lsd);
if (lut->lut_last_rcvd != NULL)
RETURN(rc);
}
-void lut_client_epoch_update(const struct lu_env *env, struct lu_target *lut,
- struct obd_export *exp)
+void lut_client_epoch_update(const struct lu_env *env, struct obd_export *exp)
{
- struct lsd_client_data *lcd = exp->exp_target_data.led_lcd;
+ struct lsd_client_data *lcd = exp->exp_target_data.ted_lcd;
+ struct lu_target *lut = class_exp2tgt(exp);
LASSERT(lut->lut_bottom);
/** VBR: set client last_epoch to current epoch */
if (lcd->lcd_last_epoch >= lut->lut_lsd.lsd_start_epoch)
return;
lcd->lcd_last_epoch = lut->lut_lsd.lsd_start_epoch;
- lut_client_data_update(env, lut, exp);
+ lut_client_data_update(env, exp);
}
/**
struct lu_env env;
struct ptlrpc_request *req;
__u32 start_epoch;
- struct list_head client_list;
+ cfs_list_t client_list;
int rc;
if (lut->lut_obd->obd_stopping)
return;
/** Increase server epoch after recovery */
if (lut->lut_bottom == NULL)
- return obt_boot_epoch_update(lut->lut_obd);
+ return obt_boot_epoch_update(lut);
rc = lu_env_init(&env, LCT_DT_THREAD);
if (rc) {
- CERROR("Can't initialize environment rc=%i\n", rc);
+ CERROR("Can't initialize environment rc=%d\n", rc);
return;
}
- spin_lock(&lut->lut_translock);
+ cfs_spin_lock(&lut->lut_translock);
start_epoch = lr_epoch(lut->lut_last_transno) + 1;
lut->lut_last_transno = (__u64)start_epoch << LR_EPOCH_BITS;
lut->lut_lsd.lsd_start_epoch = start_epoch;
- spin_unlock(&lut->lut_translock);
+ cfs_spin_unlock(&lut->lut_translock);
CFS_INIT_LIST_HEAD(&client_list);
/**
* The recovery is not yet finished and final queue can still be updated
* with resend requests. Move final list to separate one for processing
*/
- spin_lock_bh(&lut->lut_obd->obd_processing_task_lock);
- list_splice_init(&lut->lut_obd->obd_final_req_queue, &client_list);
- spin_unlock_bh(&lut->lut_obd->obd_processing_task_lock);
+ cfs_spin_lock(&lut->lut_obd->obd_recovery_task_lock);
+ cfs_list_splice_init(&lut->lut_obd->obd_final_req_queue, &client_list);
+ cfs_spin_unlock(&lut->lut_obd->obd_recovery_task_lock);
/**
* go through list of exports participated in recovery and
* set new epoch for them
*/
- list_for_each_entry(req, &client_list, rq_list) {
+ cfs_list_for_each_entry(req, &client_list, rq_list) {
LASSERT(!req->rq_export->exp_delayed);
- lut_client_epoch_update(&env, lut, req->rq_export);
+ if (!req->rq_export->exp_vbr_failed)
+ lut_client_epoch_update(&env, req->rq_export);
}
/** return list back at once */
- spin_lock_bh(&lut->lut_obd->obd_processing_task_lock);
- list_splice_init(&client_list, &lut->lut_obd->obd_final_req_queue);
- spin_unlock_bh(&lut->lut_obd->obd_processing_task_lock);
+ cfs_spin_lock(&lut->lut_obd->obd_recovery_task_lock);
+ cfs_list_splice_init(&client_list, &lut->lut_obd->obd_final_req_queue);
+ cfs_spin_unlock(&lut->lut_obd->obd_recovery_task_lock);
/** update server epoch */
lut_server_data_update(&env, lut, 1);
lu_env_fini(&env);
void *data, int err)
{
struct obd_export *exp = data;
-
- spin_lock(&lut->lut_translock);
+ LASSERT(exp->exp_obd == lut->lut_obd);
+ cfs_spin_lock(&lut->lut_translock);
if (transno > lut->lut_obd->obd_last_committed)
lut->lut_obd->obd_last_committed = transno;
LASSERT(exp);
- if (!lut->lut_obd->obd_stopping &&
- transno > exp->exp_last_committed) {
+ if (transno > exp->exp_last_committed) {
exp->exp_last_committed = transno;
- spin_unlock(&lut->lut_translock);
+ cfs_spin_unlock(&lut->lut_translock);
ptlrpc_commit_replies(exp);
} else {
- spin_unlock(&lut->lut_translock);
+ cfs_spin_unlock(&lut->lut_translock);
}
+ class_export_cb_put(exp);
if (transno)
CDEBUG(D_HA, "%s: transno "LPD64" is committed\n",
lut->lut_obd->obd_name, transno);
int rc = 0;
ENTRY;
+ LASSERT(lut);
+ LASSERT(obd);
lut->lut_obd = obd;
lut->lut_bottom = dt;
lut->lut_last_rcvd = NULL;
+ obd->u.obt.obt_lut = lut;
- spin_lock_init(&lut->lut_translock);
- spin_lock_init(&lut->lut_client_bitmap_lock);
- spin_lock_init(&lut->lut_trans_table_lock);
+ cfs_spin_lock_init(&lut->lut_translock);
+ cfs_spin_lock_init(&lut->lut_client_bitmap_lock);
+
+ OBD_ALLOC(lut->lut_client_bitmap, LR_MAX_CLIENTS >> 3);
+ if (lut->lut_client_bitmap == NULL)
+ RETURN(-ENOMEM);
/** obdfilter has no lu_device stack yet */
if (dt == NULL)
if (!IS_ERR(o)) {
lut->lut_last_rcvd = o;
} else {
+ OBD_FREE(lut->lut_client_bitmap, LR_MAX_CLIENTS >> 3);
+ lut->lut_client_bitmap = NULL;
rc = PTR_ERR(o);
CERROR("cannot open %s: rc = %d\n", LAST_RCVD, rc);
}
void lut_fini(const struct lu_env *env, struct lu_target *lut)
{
ENTRY;
- if (lut->lut_last_rcvd)
+
+ if (lut->lut_client_bitmap) {
+ OBD_FREE(lut->lut_client_bitmap, LR_MAX_CLIENTS >> 3);
+ lut->lut_client_bitmap = NULL;
+ }
+ if (lut->lut_last_rcvd) {
lu_object_put(env, &lut->lut_last_rcvd->do_lu);
- lut->lut_last_rcvd = NULL;
+ lut->lut_last_rcvd = NULL;
+ }
EXIT;
}
EXPORT_SYMBOL(lut_fini);