X-Git-Url: https://git.whamcloud.com/?a=blobdiff_plain;ds=sidebyside;f=lustre%2Fptlrpc%2Ftarget.c;h=6f8bccf78da3bf54f875f98e70da130d3612033f;hb=18d78c77953017e5a76cd10bc74a0d078217a626;hp=0b6b2fe27509f377fd287b4415d038b89a76b350;hpb=d51f4e19e640a5601297599775f8835b12cdb1dc;p=fs%2Flustre-release.git diff --git a/lustre/ptlrpc/target.c b/lustre/ptlrpc/target.c index 0b6b2fe..6f8bccf 100644 --- a/lustre/ptlrpc/target.c +++ b/lustre/ptlrpc/target.c @@ -26,8 +26,10 @@ * GPL HEADER END */ /* - * Copyright 2008 Sun Microsystems, Inc. All rights reserved + * Copyright (c) 2009, 2010, Oracle and/or its affiliates. All rights reserved. * Use is subject to license terms. + * + * Copyright (c) 2011, 2012, Whamcloud, Inc. */ /* * This file is part of Lustre, http://www.lustre.org/ @@ -41,22 +43,25 @@ #include #include +#include + /** * Update client data in last_rcvd file. An obd API */ static int obt_client_data_update(struct obd_export *exp) { - struct lu_export_data *led = &exp->exp_target_data; + struct tg_export_data *ted = &exp->exp_target_data; struct obd_device_target *obt = &exp->exp_obd->u.obt; - loff_t off = led->led_lr_off; + struct lu_target *lut = class_exp2tgt(exp); + loff_t off = ted->ted_lr_off; int rc = 0; rc = fsfilt_write_record(exp->exp_obd, obt->obt_rcvd_filp, - led->led_lcd, sizeof(*led->led_lcd), &off, 0); + ted->ted_lcd, sizeof(*ted->ted_lcd), &off, 0); CDEBUG(D_INFO, "update client idx %u last_epoch %#x (%#x)\n", - led->led_lr_idx, le32_to_cpu(led->led_lcd->lcd_last_epoch), - le32_to_cpu(obt->obt_lsd->lsd_start_epoch)); + ted->ted_lr_idx, le32_to_cpu(ted->ted_lcd->lcd_last_epoch), + le32_to_cpu(lut->lut_lsd.lsd_start_epoch)); return rc; } @@ -64,21 +69,22 @@ static int obt_client_data_update(struct obd_export *exp) /** * Update server data in last_rcvd file. An obd API */ -int obt_server_data_update(struct obd_device *obd, int force_sync) +int obt_server_data_update(struct lu_target *lut, int force_sync) { - struct obd_device_target *obt = &obd->u.obt; + struct obd_device_target *obt = &lut->lut_obd->u.obt; loff_t off = 0; int rc; ENTRY; CDEBUG(D_SUPER, "%s: mount_count is "LPU64", last_transno is "LPU64"\n", - obt->obt_lsd->lsd_uuid, - le64_to_cpu(obt->obt_lsd->lsd_mount_count), - le64_to_cpu(obt->obt_lsd->lsd_last_transno)); + lut->lut_lsd.lsd_uuid, + le64_to_cpu(lut->lut_lsd.lsd_mount_count), + le64_to_cpu(lut->lut_lsd.lsd_last_transno)); - rc = fsfilt_write_record(obd, obt->obt_rcvd_filp, obt->obt_lsd, - sizeof(*obt->obt_lsd), &off, force_sync); + rc = fsfilt_write_record(lut->lut_obd, obt->obt_rcvd_filp, + &lut->lut_lsd, sizeof(lut->lut_lsd), + &off, force_sync); if (rc) CERROR("error writing lr_server_data: rc = %d\n", rc); @@ -90,52 +96,52 @@ int obt_server_data_update(struct obd_device *obd, int force_sync) */ void obt_client_epoch_update(struct obd_export *exp) { - struct lsd_client_data *lcd = exp->exp_target_data.led_lcd; - struct obd_device_target *obt = &exp->exp_obd->u.obt; + struct lsd_client_data *lcd = exp->exp_target_data.ted_lcd; + struct lu_target *lut = class_exp2tgt(exp); /** VBR: set client last_epoch to current epoch */ if (le32_to_cpu(lcd->lcd_last_epoch) >= - le32_to_cpu(obt->obt_lsd->lsd_start_epoch)) + le32_to_cpu(lut->lut_lsd.lsd_start_epoch)) return; - lcd->lcd_last_epoch = obt->obt_lsd->lsd_start_epoch; + lcd->lcd_last_epoch = lut->lut_lsd.lsd_start_epoch; obt_client_data_update(exp); } /** * Increment server epoch. An obd API */ -static void obt_boot_epoch_update(struct obd_device *obd) +static void obt_boot_epoch_update(struct lu_target *lut) { + struct obd_device *obd = lut->lut_obd; __u32 start_epoch; - struct obd_device_target *obt = &obd->u.obt; struct ptlrpc_request *req; - struct list_head client_list; + cfs_list_t client_list; - spin_lock(&obt->obt_translock); - start_epoch = lr_epoch(le64_to_cpu(obt->obt_last_transno)) + 1; - obt->obt_last_transno = cpu_to_le64((__u64)start_epoch << + cfs_spin_lock(&lut->lut_translock); + start_epoch = lr_epoch(le64_to_cpu(lut->lut_last_transno)) + 1; + lut->lut_last_transno = cpu_to_le64((__u64)start_epoch << LR_EPOCH_BITS); - obt->obt_lsd->lsd_start_epoch = cpu_to_le32(start_epoch); - spin_unlock(&obt->obt_translock); + lut->lut_lsd.lsd_start_epoch = cpu_to_le32(start_epoch); + cfs_spin_unlock(&lut->lut_translock); CFS_INIT_LIST_HEAD(&client_list); - spin_lock_bh(&obd->obd_processing_task_lock); - list_splice_init(&obd->obd_final_req_queue, &client_list); - spin_unlock_bh(&obd->obd_processing_task_lock); + cfs_spin_lock(&obd->obd_recovery_task_lock); + cfs_list_splice_init(&obd->obd_final_req_queue, &client_list); + cfs_spin_unlock(&obd->obd_recovery_task_lock); /** * go through list of exports participated in recovery and * set new epoch for them */ - list_for_each_entry(req, &client_list, rq_list) { + cfs_list_for_each_entry(req, &client_list, rq_list) { LASSERT(!req->rq_export->exp_delayed); obt_client_epoch_update(req->rq_export); } /** return list back at once */ - spin_lock_bh(&obd->obd_processing_task_lock); - list_splice_init(&client_list, &obd->obd_final_req_queue); - spin_unlock_bh(&obd->obd_processing_task_lock); - obt_server_data_update(obd, 1); + cfs_spin_lock(&obd->obd_recovery_task_lock); + cfs_list_splice_init(&client_list, &obd->obd_final_req_queue); + cfs_spin_unlock(&obd->obd_recovery_task_lock); + obt_server_data_update(lut, 1); } /** @@ -145,19 +151,25 @@ static int lut_last_rcvd_write(const struct lu_env *env, struct lu_target *lut, const struct lu_buf *buf, loff_t *off, int sync) { struct thandle *th; - struct txn_param p; - int rc, credits; + int rc; ENTRY; - credits = lut->lut_bottom->dd_ops->dt_credit_get(env, lut->lut_bottom, - DTO_WRITE_BLOCK); - txn_param_init(&p, credits); - - th = dt_trans_start(env, lut->lut_bottom, &p); + th = dt_trans_create(env, lut->lut_bottom); if (IS_ERR(th)) RETURN(PTR_ERR(th)); + rc = dt_declare_record_write(env, lut->lut_last_rcvd, + buf->lb_len, *off, th); + if (rc) + goto stop; + + rc = dt_trans_start(env, lut->lut_bottom, th); + if (rc) + goto stop; + rc = dt_record_write(env, lut->lut_last_rcvd, buf, off, th); + +stop: dt_trans_stop(env, lut->lut_bottom, th); CDEBUG(D_INFO, "write last_rcvd header rc = %d:\n" @@ -168,21 +180,64 @@ static int lut_last_rcvd_write(const struct lu_env *env, struct lu_target *lut, } /** + * Allocate in-memory data for client slot related to export. + */ +int lut_client_alloc(struct obd_export *exp) +{ + LASSERT(exp != exp->exp_obd->obd_self_export); + + OBD_ALLOC_PTR(exp->exp_target_data.ted_lcd); + if (exp->exp_target_data.ted_lcd == NULL) + RETURN(-ENOMEM); + /* Mark that slot is not yet valid, 0 doesn't work here */ + exp->exp_target_data.ted_lr_idx = -1; + RETURN(0); +} +EXPORT_SYMBOL(lut_client_alloc); + +/** + * Free in-memory data for client slot related to export. + */ +void lut_client_free(struct obd_export *exp) +{ + struct tg_export_data *ted = &exp->exp_target_data; + struct lu_target *lut = class_exp2tgt(exp); + + LASSERT(exp != exp->exp_obd->obd_self_export); + + OBD_FREE_PTR(ted->ted_lcd); + ted->ted_lcd = NULL; + + /* Slot may be not yet assigned */ + if (ted->ted_lr_idx < 0) + return; + /* Clear bit when lcd is freed */ + cfs_spin_lock(&lut->lut_client_bitmap_lock); + if (!cfs_test_and_clear_bit(ted->ted_lr_idx, lut->lut_client_bitmap)) { + CERROR("%s: client %u bit already clear in bitmap\n", + exp->exp_obd->obd_name, ted->ted_lr_idx); + LBUG(); + } + cfs_spin_unlock(&lut->lut_client_bitmap_lock); +} +EXPORT_SYMBOL(lut_client_free); + +/** * Update client data in last_rcvd */ -int lut_client_data_update(const struct lu_env *env, struct lu_target *lut, - struct obd_export *exp) +int lut_client_data_update(const struct lu_env *env, struct obd_export *exp) { - struct lu_export_data *led = &exp->exp_target_data; + struct tg_export_data *ted = &exp->exp_target_data; + struct lu_target *lut = class_exp2tgt(exp); struct lsd_client_data tmp_lcd; - loff_t tmp_off = led->led_lr_off; + loff_t tmp_off = ted->ted_lr_off; struct lu_buf tmp_buf = { .lb_buf = &tmp_lcd, .lb_len = sizeof(tmp_lcd) }; int rc = 0; - lcd_cpu_to_le(led->led_lcd, &tmp_lcd); + lcd_cpu_to_le(ted->ted_lcd, &tmp_lcd); LASSERT(lut->lut_last_rcvd); rc = lut_last_rcvd_write(env, lut, &tmp_buf, &tmp_off, 0); @@ -192,8 +247,8 @@ int lut_client_data_update(const struct lu_env *env, struct lu_target *lut, /** * Update server data in last_rcvd */ -static int lut_server_data_update(const struct lu_env *env, - struct lu_target *lut, int sync) +int lut_server_data_update(const struct lu_env *env, + struct lu_target *lut, int sync) { struct lr_server_data tmp_lsd; loff_t tmp_off = 0; @@ -206,12 +261,12 @@ static int lut_server_data_update(const struct lu_env *env, CDEBUG(D_SUPER, "%s: mount_count is "LPU64", last_transno is "LPU64"\n", - lut->lut_lsd.lsd_uuid, lut->lut_mount_count, + lut->lut_lsd.lsd_uuid, lut->lut_obd->u.obt.obt_mount_count, lut->lut_last_transno); - spin_lock(&lut->lut_translock); + cfs_spin_lock(&lut->lut_translock); lut->lut_lsd.lsd_last_transno = lut->lut_last_transno; - spin_unlock(&lut->lut_translock); + cfs_spin_unlock(&lut->lut_translock); lsd_cpu_to_le(&lut->lut_lsd, &tmp_lsd); if (lut->lut_last_rcvd != NULL) @@ -219,17 +274,17 @@ static int lut_server_data_update(const struct lu_env *env, RETURN(rc); } -void lut_client_epoch_update(const struct lu_env *env, struct lu_target *lut, - struct obd_export *exp) +void lut_client_epoch_update(const struct lu_env *env, struct obd_export *exp) { - struct lsd_client_data *lcd = exp->exp_target_data.led_lcd; + struct lsd_client_data *lcd = exp->exp_target_data.ted_lcd; + struct lu_target *lut = class_exp2tgt(exp); LASSERT(lut->lut_bottom); /** VBR: set client last_epoch to current epoch */ if (lcd->lcd_last_epoch >= lut->lut_lsd.lsd_start_epoch) return; lcd->lcd_last_epoch = lut->lut_lsd.lsd_start_epoch; - lut_client_data_update(env, lut, exp); + lut_client_data_update(env, exp); } /** @@ -240,49 +295,49 @@ void lut_boot_epoch_update(struct lu_target *lut) struct lu_env env; struct ptlrpc_request *req; __u32 start_epoch; - struct list_head client_list; + cfs_list_t client_list; int rc; if (lut->lut_obd->obd_stopping) return; /** Increase server epoch after recovery */ if (lut->lut_bottom == NULL) - return obt_boot_epoch_update(lut->lut_obd); + return obt_boot_epoch_update(lut); rc = lu_env_init(&env, LCT_DT_THREAD); if (rc) { - CERROR("Can't initialize environment rc=%i\n", rc); + CERROR("Can't initialize environment rc=%d\n", rc); return; } - spin_lock(&lut->lut_translock); + cfs_spin_lock(&lut->lut_translock); start_epoch = lr_epoch(lut->lut_last_transno) + 1; lut->lut_last_transno = (__u64)start_epoch << LR_EPOCH_BITS; lut->lut_lsd.lsd_start_epoch = start_epoch; - spin_unlock(&lut->lut_translock); + cfs_spin_unlock(&lut->lut_translock); CFS_INIT_LIST_HEAD(&client_list); /** * The recovery is not yet finished and final queue can still be updated * with resend requests. Move final list to separate one for processing */ - spin_lock_bh(&lut->lut_obd->obd_processing_task_lock); - list_splice_init(&lut->lut_obd->obd_final_req_queue, &client_list); - spin_unlock_bh(&lut->lut_obd->obd_processing_task_lock); + cfs_spin_lock(&lut->lut_obd->obd_recovery_task_lock); + cfs_list_splice_init(&lut->lut_obd->obd_final_req_queue, &client_list); + cfs_spin_unlock(&lut->lut_obd->obd_recovery_task_lock); /** * go through list of exports participated in recovery and * set new epoch for them */ - list_for_each_entry(req, &client_list, rq_list) { + cfs_list_for_each_entry(req, &client_list, rq_list) { LASSERT(!req->rq_export->exp_delayed); if (!req->rq_export->exp_vbr_failed) - lut_client_epoch_update(&env, lut, req->rq_export); + lut_client_epoch_update(&env, req->rq_export); } /** return list back at once */ - spin_lock_bh(&lut->lut_obd->obd_processing_task_lock); - list_splice_init(&client_list, &lut->lut_obd->obd_final_req_queue); - spin_unlock_bh(&lut->lut_obd->obd_processing_task_lock); + cfs_spin_lock(&lut->lut_obd->obd_recovery_task_lock); + cfs_list_splice_init(&client_list, &lut->lut_obd->obd_final_req_queue); + cfs_spin_unlock(&lut->lut_obd->obd_recovery_task_lock); /** update server epoch */ lut_server_data_update(&env, lut, 1); lu_env_fini(&env); @@ -292,37 +347,115 @@ EXPORT_SYMBOL(lut_boot_epoch_update); /** * commit callback, need to update last_commited value */ -void lut_cb_last_committed(struct lu_target *lut, __u64 transno, - void *data, int err) +struct lut_last_committed_callback { + struct dt_txn_commit_cb llcc_cb; + struct lu_target *llcc_lut; + struct obd_export *llcc_exp; + __u64 llcc_transno; +}; + +void lut_cb_last_committed(struct lu_env *env, struct thandle *th, + struct dt_txn_commit_cb *cb, int err) { - struct obd_export *exp = data; - LASSERT(exp->exp_obd == lut->lut_obd); - spin_lock(&lut->lut_translock); - if (transno > lut->lut_obd->obd_last_committed) - lut->lut_obd->obd_last_committed = transno; - - LASSERT(exp); - if (transno > exp->exp_last_committed) { - exp->exp_last_committed = transno; - spin_unlock(&lut->lut_translock); - ptlrpc_commit_replies(exp); + struct lut_last_committed_callback *ccb; + + ccb = container_of0(cb, struct lut_last_committed_callback, llcc_cb); + + LASSERT(ccb->llcc_exp->exp_obd == ccb->llcc_lut->lut_obd); + + cfs_spin_lock(&ccb->llcc_lut->lut_translock); + if (ccb->llcc_transno > ccb->llcc_lut->lut_obd->obd_last_committed) + ccb->llcc_lut->lut_obd->obd_last_committed = ccb->llcc_transno; + + LASSERT(ccb->llcc_exp); + if (ccb->llcc_transno > ccb->llcc_exp->exp_last_committed) { + ccb->llcc_exp->exp_last_committed = ccb->llcc_transno; + cfs_spin_unlock(&ccb->llcc_lut->lut_translock); + ptlrpc_commit_replies(ccb->llcc_exp); } else { - spin_unlock(&lut->lut_translock); + cfs_spin_unlock(&ccb->llcc_lut->lut_translock); } - class_export_cb_put(exp); - if (transno) + class_export_cb_put(ccb->llcc_exp); + if (ccb->llcc_transno) CDEBUG(D_HA, "%s: transno "LPD64" is committed\n", - lut->lut_obd->obd_name, transno); + ccb->llcc_lut->lut_obd->obd_name, ccb->llcc_transno); + cfs_list_del(&ccb->llcc_cb.dcb_linkage); + OBD_FREE_PTR(ccb); } -EXPORT_SYMBOL(lut_cb_last_committed); -void lut_cb_client(struct lu_target *lut, __u64 transno, - void *data, int err) +int lut_last_commit_cb_add(struct thandle *th, struct lu_target *lut, + struct obd_export *exp, __u64 transno) { - LASSERT(lut->lut_obd); - target_client_add_cb(lut->lut_obd, transno, data, err); + struct lut_last_committed_callback *ccb; + int rc; + + OBD_ALLOC_PTR(ccb); + if (ccb == NULL) + return -ENOMEM; + + ccb->llcc_cb.dcb_func = lut_cb_last_committed; + CFS_INIT_LIST_HEAD(&ccb->llcc_cb.dcb_linkage); + ccb->llcc_lut = lut; + ccb->llcc_exp = class_export_cb_get(exp); + ccb->llcc_transno = transno; + + rc = dt_trans_cb_add(th, &ccb->llcc_cb); + if (rc) { + class_export_cb_put(exp); + OBD_FREE_PTR(ccb); + } + return rc; +} +EXPORT_SYMBOL(lut_last_commit_cb_add); + +struct lut_new_client_callback { + struct dt_txn_commit_cb lncc_cb; + struct obd_export *lncc_exp; +}; + +void lut_cb_new_client(struct lu_env *env, struct thandle *th, + struct dt_txn_commit_cb *cb, int err) +{ + struct lut_new_client_callback *ccb; + + ccb = container_of0(cb, struct lut_new_client_callback, lncc_cb); + + LASSERT(ccb->lncc_exp->exp_obd); + + CDEBUG(D_RPCTRACE, "%s: committing for initial connect of %s\n", + ccb->lncc_exp->exp_obd->obd_name, + ccb->lncc_exp->exp_client_uuid.uuid); + + cfs_spin_lock(&ccb->lncc_exp->exp_lock); + ccb->lncc_exp->exp_need_sync = 0; + cfs_spin_unlock(&ccb->lncc_exp->exp_lock); + class_export_cb_put(ccb->lncc_exp); + + cfs_list_del(&ccb->lncc_cb.dcb_linkage); + OBD_FREE_PTR(ccb); } -EXPORT_SYMBOL(lut_cb_client); + +int lut_new_client_cb_add(struct thandle *th, struct obd_export *exp) +{ + struct lut_new_client_callback *ccb; + int rc; + + OBD_ALLOC_PTR(ccb); + if (ccb == NULL) + return -ENOMEM; + + ccb->lncc_cb.dcb_func = lut_cb_new_client; + CFS_INIT_LIST_HEAD(&ccb->lncc_cb.dcb_linkage); + ccb->lncc_exp = class_export_cb_get(exp); + + rc = dt_trans_cb_add(th, &ccb->lncc_cb); + if (rc) { + class_export_cb_put(exp); + OBD_FREE_PTR(ccb); + } + return rc; +} +EXPORT_SYMBOL(lut_new_client_cb_add); int lut_init(const struct lu_env *env, struct lu_target *lut, struct obd_device *obd, struct dt_device *dt) @@ -332,13 +465,19 @@ int lut_init(const struct lu_env *env, struct lu_target *lut, int rc = 0; ENTRY; + LASSERT(lut); + LASSERT(obd); lut->lut_obd = obd; lut->lut_bottom = dt; lut->lut_last_rcvd = NULL; + obd->u.obt.obt_lut = lut; - spin_lock_init(&lut->lut_translock); - spin_lock_init(&lut->lut_client_bitmap_lock); - spin_lock_init(&lut->lut_trans_table_lock); + cfs_spin_lock_init(&lut->lut_translock); + cfs_spin_lock_init(&lut->lut_client_bitmap_lock); + + OBD_ALLOC(lut->lut_client_bitmap, LR_MAX_CLIENTS >> 3); + if (lut->lut_client_bitmap == NULL) + RETURN(-ENOMEM); /** obdfilter has no lu_device stack yet */ if (dt == NULL) @@ -347,6 +486,8 @@ int lut_init(const struct lu_env *env, struct lu_target *lut, if (!IS_ERR(o)) { lut->lut_last_rcvd = o; } else { + OBD_FREE(lut->lut_client_bitmap, LR_MAX_CLIENTS >> 3); + lut->lut_client_bitmap = NULL; rc = PTR_ERR(o); CERROR("cannot open %s: rc = %d\n", LAST_RCVD, rc); } @@ -358,9 +499,15 @@ EXPORT_SYMBOL(lut_init); void lut_fini(const struct lu_env *env, struct lu_target *lut) { ENTRY; - if (lut->lut_last_rcvd) + + if (lut->lut_client_bitmap) { + OBD_FREE(lut->lut_client_bitmap, LR_MAX_CLIENTS >> 3); + lut->lut_client_bitmap = NULL; + } + if (lut->lut_last_rcvd) { lu_object_put(env, &lut->lut_last_rcvd->do_lu); - lut->lut_last_rcvd = NULL; + lut->lut_last_rcvd = NULL; + } EXIT; } EXPORT_SYMBOL(lut_fini);