X-Git-Url: https://git.whamcloud.com/?a=blobdiff_plain;f=lustre%2Fptlrpc%2Ftarget.c;h=d0e99de3187ddbcff8c7ad0a2d5a3eb7b2406058;hb=69b4763882b4515ff4a1b9b223e522172fdb27f4;hp=df732ad2b5daa59ab957591d6c1568a49d494f18;hpb=273e40aa6a61423ddf9c9cea4c1b9d092f0d3116;p=fs%2Flustre-release.git diff --git a/lustre/ptlrpc/target.c b/lustre/ptlrpc/target.c index df732ad..d0e99de 100644 --- a/lustre/ptlrpc/target.c +++ b/lustre/ptlrpc/target.c @@ -1,6 +1,4 @@ -/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*- - * vim:expandtab:shiftwidth=8:tabstop=8: - * +/* * GPL HEADER START * * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. @@ -28,6 +26,8 @@ /* * Copyright (c) 2009, 2010, Oracle and/or its affiliates. All rights reserved. * Use is subject to license terms. + * + * Copyright (c) 2011, 2012, Whamcloud, Inc. */ /* * This file is part of Lustre, http://www.lustre.org/ @@ -41,6 +41,45 @@ #include #include +#include +#include + +/** + * Common data shared by tg-level handlers. This is allocated per-thread to + * reduce stack consumption. + */ +struct tg_thread_info { + /* server and client data buffers */ + struct lr_server_data tti_lsd; + struct lsd_client_data tti_lcd; + struct lu_buf tti_buf; + loff_t tti_off; +}; + +static inline struct lu_buf *tti_buf_lsd(struct tg_thread_info *tti) +{ + tti->tti_buf.lb_buf = &tti->tti_lsd; + tti->tti_buf.lb_len = sizeof(tti->tti_lsd); + return &tti->tti_buf; +} + +static inline struct lu_buf *tti_buf_lcd(struct tg_thread_info *tti) +{ + tti->tti_buf.lb_buf = &tti->tti_lcd; + tti->tti_buf.lb_len = sizeof(tti->tti_lcd); + return &tti->tti_buf; +} + +extern struct lu_context_key tg_thread_key; + +static inline struct tg_thread_info *tg_th_info(const struct lu_env *env) +{ + struct tg_thread_info *tti; + + tti = lu_context_key_get(&env->le_ctx, &tg_thread_key); + LASSERT(tti); + return tti; +} /** * Update client data in last_rcvd file. An obd API @@ -142,35 +181,6 @@ static void obt_boot_epoch_update(struct lu_target *lut) } /** - * write data in last_rcvd file. - */ -static int lut_last_rcvd_write(const struct lu_env *env, struct lu_target *lut, - const struct lu_buf *buf, loff_t *off, int sync) -{ - struct thandle *th; - struct txn_param p; - int rc, credits; - ENTRY; - - credits = lut->lut_bottom->dd_ops->dt_credit_get(env, lut->lut_bottom, - DTO_WRITE_BLOCK); - txn_param_init(&p, credits); - - th = dt_trans_start(env, lut->lut_bottom, &p); - if (IS_ERR(th)) - RETURN(PTR_ERR(th)); - - rc = dt_record_write(env, lut->lut_last_rcvd, buf, off, th); - dt_trans_stop(env, lut->lut_bottom, th); - - CDEBUG(D_INFO, "write last_rcvd header rc = %d:\n" - "uuid = %s\nlast_transno = "LPU64"\n", - rc, lut->lut_lsd.lsd_uuid, lut->lut_lsd.lsd_last_transno); - - RETURN(rc); -} - -/** * Allocate in-memory data for client slot related to export. */ int lut_client_alloc(struct obd_export *exp) @@ -203,67 +213,232 @@ void lut_client_free(struct obd_export *exp) if (ted->ted_lr_idx < 0) return; /* Clear bit when lcd is freed */ - cfs_spin_lock(&lut->lut_client_bitmap_lock); + LASSERT(lut->lut_client_bitmap); if (!cfs_test_and_clear_bit(ted->ted_lr_idx, lut->lut_client_bitmap)) { CERROR("%s: client %u bit already clear in bitmap\n", exp->exp_obd->obd_name, ted->ted_lr_idx); LBUG(); } - cfs_spin_unlock(&lut->lut_client_bitmap_lock); } EXPORT_SYMBOL(lut_client_free); +int lut_client_data_read(const struct lu_env *env, struct lu_target *tg, + struct lsd_client_data *lcd, loff_t *off, int index) +{ + struct tg_thread_info *tti = tg_th_info(env); + int rc; + + tti_buf_lcd(tti); + rc = dt_record_read(env, tg->lut_last_rcvd, &tti->tti_buf, off); + if (rc == 0) { + check_lcd(tg->lut_obd->obd_name, index, &tti->tti_lcd); + lcd_le_to_cpu(&tti->tti_lcd, lcd); + } + + CDEBUG(D_INFO, "read lcd @%lld rc = %d, uuid = %s, last_transno = "LPU64 + ", last_xid = "LPU64", last_result = %u, last_data = %u, " + "last_close_transno = "LPU64", last_close_xid = "LPU64", " + "last_close_result = %u\n", *off, + rc, lcd->lcd_uuid, lcd->lcd_last_transno, lcd->lcd_last_xid, + lcd->lcd_last_result, lcd->lcd_last_data, + lcd->lcd_last_close_transno, lcd->lcd_last_close_xid, + lcd->lcd_last_close_result); + return rc; +} +EXPORT_SYMBOL(lut_client_data_read); + +int lut_client_data_write(const struct lu_env *env, struct lu_target *tg, + struct lsd_client_data *lcd, loff_t *off, + struct thandle *th) +{ + struct tg_thread_info *tti = tg_th_info(env); + + lcd_cpu_to_le(lcd, &tti->tti_lcd); + tti_buf_lcd(tti); + + return dt_record_write(env, tg->lut_last_rcvd, &tti->tti_buf, off, th); +} +EXPORT_SYMBOL(lut_client_data_write); + /** * Update client data in last_rcvd */ int lut_client_data_update(const struct lu_env *env, struct obd_export *exp) { - struct tg_export_data *ted = &exp->exp_target_data; - struct lu_target *lut = class_exp2tgt(exp); - struct lsd_client_data tmp_lcd; - loff_t tmp_off = ted->ted_lr_off; - struct lu_buf tmp_buf = { - .lb_buf = &tmp_lcd, - .lb_len = sizeof(tmp_lcd) - }; - int rc = 0; - - lcd_cpu_to_le(ted->ted_lcd, &tmp_lcd); - LASSERT(lut->lut_last_rcvd); - rc = lut_last_rcvd_write(env, lut, &tmp_buf, &tmp_off, 0); + struct tg_export_data *ted = &exp->exp_target_data; + struct lu_target *tg = class_exp2tgt(exp); + struct tg_thread_info *tti = tg_th_info(env); + struct thandle *th; + int rc = 0; + + ENTRY; + + th = dt_trans_create(env, tg->lut_bottom); + if (IS_ERR(th)) + RETURN(PTR_ERR(th)); + + rc = dt_declare_record_write(env, tg->lut_last_rcvd, + sizeof(struct lsd_client_data), + ted->ted_lr_off, th); + if (rc) + GOTO(out, rc); + + rc = dt_trans_start_local(env, tg->lut_bottom, th); + if (rc) + GOTO(out, rc); + /* + * Until this operations will be committed the sync is needed + * for this export. This should be done _after_ starting the + * transaction so that many connecting clients will not bring + * server down with lots of sync writes. + */ + rc = lut_new_client_cb_add(th, exp); + if (rc) { + /* can't add callback, do sync now */ + th->th_sync = 1; + } else { + cfs_spin_lock(&exp->exp_lock); + exp->exp_need_sync = 1; + cfs_spin_unlock(&exp->exp_lock); + } + + tti->tti_off = ted->ted_lr_off; + rc = lut_client_data_write(env, tg, ted->ted_lcd, &tti->tti_off, th); + EXIT; +out: + dt_trans_stop(env, tg->lut_bottom, th); + CDEBUG(D_INFO, "write last_rcvd, rc = %d:\n" + "uuid = %s\nlast_transno = "LPU64"\n", + rc, tg->lut_lsd.lsd_uuid, tg->lut_lsd.lsd_last_transno); + + return rc; +} +int lut_server_data_read(const struct lu_env *env, struct lu_target *tg) +{ + struct tg_thread_info *tti = tg_th_info(env); + int rc; + + tti->tti_off = 0; + tti_buf_lsd(tti); + rc = dt_record_read(env, tg->lut_last_rcvd, &tti->tti_buf, &tti->tti_off); + if (rc == 0) + lsd_le_to_cpu(&tti->tti_lsd, &tg->lut_lsd); + + CDEBUG(D_INFO, "%s: read last_rcvd header, rc = %d, uuid = %s, " + "last_transno = "LPU64"\n", tg->lut_obd->obd_name, rc, + tg->lut_lsd.lsd_uuid, tg->lut_lsd.lsd_last_transno); return rc; } +EXPORT_SYMBOL(lut_server_data_read); + +int lut_server_data_write(const struct lu_env *env, struct lu_target *tg, + struct thandle *th) +{ + struct tg_thread_info *tti = tg_th_info(env); + int rc; + ENTRY; + + tti->tti_off = 0; + tti_buf_lsd(tti); + lsd_cpu_to_le(&tg->lut_lsd, &tti->tti_lsd); + + rc = dt_record_write(env, tg->lut_last_rcvd, &tti->tti_buf, + &tti->tti_off, th); + + CDEBUG(D_INFO, "write last_rcvd header rc = %d:\n" + "uuid = %s\nlast_transno = "LPU64"\n", + rc, tg->lut_lsd.lsd_uuid, tg->lut_lsd.lsd_last_transno); + + RETURN(rc); +} +EXPORT_SYMBOL(lut_server_data_write); /** * Update server data in last_rcvd */ -static int lut_server_data_update(const struct lu_env *env, - struct lu_target *lut, int sync) +int lut_server_data_update(const struct lu_env *env, struct lu_target *tg, + int sync) { - struct lr_server_data tmp_lsd; - loff_t tmp_off = 0; - struct lu_buf tmp_buf = { - .lb_buf = &tmp_lsd, - .lb_len = sizeof(tmp_lsd) - }; - int rc = 0; - ENTRY; - - CDEBUG(D_SUPER, - "%s: mount_count is "LPU64", last_transno is "LPU64"\n", - lut->lut_lsd.lsd_uuid, lut->lut_obd->u.obt.obt_mount_count, - lut->lut_last_transno); - - cfs_spin_lock(&lut->lut_translock); - lut->lut_lsd.lsd_last_transno = lut->lut_last_transno; - cfs_spin_unlock(&lut->lut_translock); + struct tg_thread_info *tti = tg_th_info(env); + struct thandle *th; + int rc = 0; + + ENTRY; + + CDEBUG(D_SUPER, + "%s: mount_count is "LPU64", last_transno is "LPU64"\n", + tg->lut_lsd.lsd_uuid, tg->lut_obd->u.obt.obt_mount_count, + tg->lut_last_transno); + + /* Always save latest transno to keep it fresh */ + cfs_spin_lock(&tg->lut_translock); + tg->lut_lsd.lsd_last_transno = tg->lut_last_transno; + cfs_spin_unlock(&tg->lut_translock); + + th = dt_trans_create(env, tg->lut_bottom); + if (IS_ERR(th)) + RETURN(PTR_ERR(th)); + + th->th_sync = sync; + + rc = dt_declare_record_write(env, tg->lut_last_rcvd, + sizeof(struct lr_server_data), + tti->tti_off, th); + if (rc) + GOTO(out, rc); + + rc = dt_trans_start(env, tg->lut_bottom, th); + if (rc) + GOTO(out, rc); + + rc = lut_server_data_write(env, tg, th); +out: + dt_trans_stop(env, tg->lut_bottom, th); + + CDEBUG(D_INFO, "write last_rcvd header, rc = %d:\n" + "uuid = %s\nlast_transno = "LPU64"\n", + rc, tg->lut_lsd.lsd_uuid, tg->lut_lsd.lsd_last_transno); + RETURN(rc); +} +EXPORT_SYMBOL(lut_server_data_update); - lsd_cpu_to_le(&lut->lut_lsd, &tmp_lsd); - if (lut->lut_last_rcvd != NULL) - rc = lut_last_rcvd_write(env, lut, &tmp_buf, &tmp_off, sync); - RETURN(rc); +int lut_truncate_last_rcvd(const struct lu_env *env, struct lu_target *tg, + loff_t size) +{ + struct dt_object *dt = tg->lut_last_rcvd; + struct thandle *th; + struct lu_attr attr; + int rc; + + ENTRY; + + attr.la_size = size; + attr.la_valid = LA_SIZE; + + th = dt_trans_create(env, tg->lut_bottom); + if (IS_ERR(th)) + RETURN(PTR_ERR(th)); + rc = dt_declare_punch(env, dt, size, OBD_OBJECT_EOF, th); + if (rc) + GOTO(cleanup, rc); + rc = dt_declare_attr_set(env, dt, &attr, th); + if (rc) + GOTO(cleanup, rc); + rc = dt_trans_start_local(env, tg->lut_bottom, th); + if (rc) + GOTO(cleanup, rc); + + rc = dt_punch(env, dt, size, OBD_OBJECT_EOF, th, BYPASS_CAPA); + if (rc == 0) + rc = dt_attr_set(env, dt, &attr, th, BYPASS_CAPA); + +cleanup: + dt_trans_stop(env, tg->lut_bottom, th); + + RETURN(rc); } +EXPORT_SYMBOL(lut_truncate_last_rcvd); void lut_client_epoch_update(const struct lu_env *env, struct obd_export *exp) { @@ -295,7 +470,7 @@ void lut_boot_epoch_update(struct lu_target *lut) if (lut->lut_bottom == NULL) return obt_boot_epoch_update(lut); - rc = lu_env_init(&env, LCT_DT_THREAD); + rc = lu_env_init(&env, LCT_DT_THREAD); if (rc) { CERROR("Can't initialize environment rc=%d\n", rc); return; @@ -338,44 +513,301 @@ EXPORT_SYMBOL(lut_boot_epoch_update); /** * commit callback, need to update last_commited value */ -void lut_cb_last_committed(struct lu_target *lut, __u64 transno, - void *data, int err) +struct lut_last_committed_callback { + struct dt_txn_commit_cb llcc_cb; + struct lu_target *llcc_lut; + struct obd_export *llcc_exp; + __u64 llcc_transno; +}; + +void lut_cb_last_committed(struct lu_env *env, struct thandle *th, + struct dt_txn_commit_cb *cb, int err) { - struct obd_export *exp = data; - LASSERT(exp->exp_obd == lut->lut_obd); - cfs_spin_lock(&lut->lut_translock); - if (transno > lut->lut_obd->obd_last_committed) - lut->lut_obd->obd_last_committed = transno; - - LASSERT(exp); - if (transno > exp->exp_last_committed) { - exp->exp_last_committed = transno; - cfs_spin_unlock(&lut->lut_translock); - ptlrpc_commit_replies(exp); + struct lut_last_committed_callback *ccb; + + ccb = container_of0(cb, struct lut_last_committed_callback, llcc_cb); + + LASSERT(ccb->llcc_lut != NULL); + LASSERT(ccb->llcc_exp->exp_obd == ccb->llcc_lut->lut_obd); + + cfs_spin_lock(&ccb->llcc_lut->lut_translock); + if (ccb->llcc_transno > ccb->llcc_lut->lut_obd->obd_last_committed) + ccb->llcc_lut->lut_obd->obd_last_committed = ccb->llcc_transno; + + LASSERT(ccb->llcc_exp); + if (ccb->llcc_transno > ccb->llcc_exp->exp_last_committed) { + ccb->llcc_exp->exp_last_committed = ccb->llcc_transno; + cfs_spin_unlock(&ccb->llcc_lut->lut_translock); + ptlrpc_commit_replies(ccb->llcc_exp); } else { - cfs_spin_unlock(&lut->lut_translock); + cfs_spin_unlock(&ccb->llcc_lut->lut_translock); } - class_export_cb_put(exp); - if (transno) + class_export_cb_put(ccb->llcc_exp); + if (ccb->llcc_transno) CDEBUG(D_HA, "%s: transno "LPD64" is committed\n", - lut->lut_obd->obd_name, transno); + ccb->llcc_lut->lut_obd->obd_name, ccb->llcc_transno); + OBD_FREE_PTR(ccb); +} + +int lut_last_commit_cb_add(struct thandle *th, struct lu_target *lut, + struct obd_export *exp, __u64 transno) +{ + struct lut_last_committed_callback *ccb; + struct dt_txn_commit_cb *dcb; + int rc; + + OBD_ALLOC_PTR(ccb); + if (ccb == NULL) + return -ENOMEM; + + ccb->llcc_lut = lut; + ccb->llcc_exp = class_export_cb_get(exp); + ccb->llcc_transno = transno; + + dcb = &ccb->llcc_cb; + dcb->dcb_func = lut_cb_last_committed; + CFS_INIT_LIST_HEAD(&dcb->dcb_linkage); + strncpy(dcb->dcb_name, "lut_cb_last_committed", MAX_COMMIT_CB_STR_LEN); + dcb->dcb_name[MAX_COMMIT_CB_STR_LEN - 1] = '\0'; + + rc = dt_trans_cb_add(th, dcb); + if (rc) { + class_export_cb_put(exp); + OBD_FREE_PTR(ccb); + } + + if ((exp->exp_connect_flags & OBD_CONNECT_LIGHTWEIGHT) != 0) + /* report failure to force synchronous operation */ + return -EPERM; + + return rc; +} +EXPORT_SYMBOL(lut_last_commit_cb_add); + +struct lut_new_client_callback { + struct dt_txn_commit_cb lncc_cb; + struct obd_export *lncc_exp; +}; + +void lut_cb_new_client(struct lu_env *env, struct thandle *th, + struct dt_txn_commit_cb *cb, int err) +{ + struct lut_new_client_callback *ccb; + + ccb = container_of0(cb, struct lut_new_client_callback, lncc_cb); + + LASSERT(ccb->lncc_exp->exp_obd); + + CDEBUG(D_RPCTRACE, "%s: committing for initial connect of %s\n", + ccb->lncc_exp->exp_obd->obd_name, + ccb->lncc_exp->exp_client_uuid.uuid); + + cfs_spin_lock(&ccb->lncc_exp->exp_lock); + ccb->lncc_exp->exp_need_sync = 0; + cfs_spin_unlock(&ccb->lncc_exp->exp_lock); + class_export_cb_put(ccb->lncc_exp); + + OBD_FREE_PTR(ccb); +} + +int lut_new_client_cb_add(struct thandle *th, struct obd_export *exp) +{ + struct lut_new_client_callback *ccb; + struct dt_txn_commit_cb *dcb; + int rc; + + OBD_ALLOC_PTR(ccb); + if (ccb == NULL) + return -ENOMEM; + + ccb->lncc_exp = class_export_cb_get(exp); + + dcb = &ccb->lncc_cb; + dcb->dcb_func = lut_cb_new_client; + CFS_INIT_LIST_HEAD(&dcb->dcb_linkage); + strncpy(dcb->dcb_name, "lut_cb_new_client", MAX_COMMIT_CB_STR_LEN); + dcb->dcb_name[MAX_COMMIT_CB_STR_LEN - 1] = '\0'; + + rc = dt_trans_cb_add(th, dcb); + if (rc) { + class_export_cb_put(exp); + OBD_FREE_PTR(ccb); + } + return rc; +} + +/** + * Add new client to the last_rcvd upon new connection. + * + * We use a bitmap to locate a free space in the last_rcvd file and initialize + * tg_export_data. + */ +int lut_client_new(const struct lu_env *env, struct obd_export *exp) +{ + struct tg_export_data *ted = &exp->exp_target_data; + struct lu_target *tg = class_exp2tgt(exp); + int rc = 0, idx; + + ENTRY; + + LASSERT(tg->lut_client_bitmap != NULL); + if (!strcmp(ted->ted_lcd->lcd_uuid, tg->lut_obd->obd_uuid.uuid)) + RETURN(0); + + cfs_mutex_init(&ted->ted_lcd_lock); + + if ((exp->exp_connect_flags & OBD_CONNECT_LIGHTWEIGHT) != 0) + RETURN(0); + + /* the bitmap operations can handle cl_idx > sizeof(long) * 8, so + * there's no need for extra complication here + */ + idx = cfs_find_first_zero_bit(tg->lut_client_bitmap, LR_MAX_CLIENTS); +repeat: + if (idx >= LR_MAX_CLIENTS || + OBD_FAIL_CHECK(OBD_FAIL_MDS_CLIENT_ADD)) { + CERROR("%s: no room for %u clients - fix LR_MAX_CLIENTS\n", + tg->lut_obd->obd_name, idx); + RETURN(-EOVERFLOW); + } + if (cfs_test_and_set_bit(idx, tg->lut_client_bitmap)) { + idx = cfs_find_next_zero_bit(tg->lut_client_bitmap, + LR_MAX_CLIENTS, idx); + goto repeat; + } + + CDEBUG(D_INFO, "%s: client at idx %d with UUID '%s' added\n", + tg->lut_obd->obd_name, idx, ted->ted_lcd->lcd_uuid); + + ted->ted_lr_idx = idx; + ted->ted_lr_off = tg->lut_lsd.lsd_client_start + + idx * tg->lut_lsd.lsd_client_size; + + LASSERTF(ted->ted_lr_off > 0, "ted_lr_off = %llu\n", ted->ted_lr_off); + + CDEBUG(D_INFO, "%s: new client at index %d (%llu) with UUID '%s'\n", + tg->lut_obd->obd_name, ted->ted_lr_idx, ted->ted_lr_off, + ted->ted_lcd->lcd_uuid); + + if (OBD_FAIL_CHECK(OBD_FAIL_TGT_CLIENT_ADD)) + RETURN(-ENOSPC); + + rc = lut_client_data_update(env, exp); + if (rc) + CERROR("%s: Failed to write client lcd at idx %d, rc %d\n", + tg->lut_obd->obd_name, idx, rc); + + RETURN(rc); +} +EXPORT_SYMBOL(lut_client_new); + +/* Add client data to the MDS. We use a bitmap to locate a free space + * in the last_rcvd file if cl_off is -1 (i.e. a new client). + * Otherwise, we just have to read the data from the last_rcvd file and + * we know its offset. + * + * It should not be possible to fail adding an existing client - otherwise + * mdt_init_server_data() callsite needs to be fixed. + */ +int lut_client_add(const struct lu_env *env, struct obd_export *exp, int idx) +{ + struct tg_export_data *ted = &exp->exp_target_data; + struct lu_target *tg = class_exp2tgt(exp); + + ENTRY; + + LASSERT(tg->lut_client_bitmap != NULL); + LASSERTF(idx >= 0, "%d\n", idx); + + if (!strcmp(ted->ted_lcd->lcd_uuid, tg->lut_obd->obd_uuid.uuid) || + (exp->exp_connect_flags & OBD_CONNECT_LIGHTWEIGHT) != 0) + RETURN(0); + + if (cfs_test_and_set_bit(idx, tg->lut_client_bitmap)) { + CERROR("%s: client %d: bit already set in bitmap!!\n", + tg->lut_obd->obd_name, idx); + LBUG(); + } + + CDEBUG(D_INFO, "%s: client at idx %d with UUID '%s' added\n", + tg->lut_obd->obd_name, idx, ted->ted_lcd->lcd_uuid); + + ted->ted_lr_idx = idx; + ted->ted_lr_off = tg->lut_lsd.lsd_client_start + + idx * tg->lut_lsd.lsd_client_size; + + cfs_mutex_init(&ted->ted_lcd_lock); + + LASSERTF(ted->ted_lr_off > 0, "ted_lr_off = %llu\n", ted->ted_lr_off); + + RETURN(0); } -EXPORT_SYMBOL(lut_cb_last_committed); +EXPORT_SYMBOL(lut_client_add); -void lut_cb_client(struct lu_target *lut, __u64 transno, - void *data, int err) +int lut_client_del(const struct lu_env *env, struct obd_export *exp) { - LASSERT(lut->lut_obd); - target_client_add_cb(lut->lut_obd, transno, data, err); + struct tg_export_data *ted = &exp->exp_target_data; + struct lu_target *tg = class_exp2tgt(exp); + int rc; + + ENTRY; + + LASSERT(ted->ted_lcd); + + /* XXX if lcd_uuid were a real obd_uuid, I could use obd_uuid_equals */ + if (!strcmp((char *)ted->ted_lcd->lcd_uuid, + (char *)tg->lut_obd->obd_uuid.uuid) || + (exp->exp_connect_flags & OBD_CONNECT_LIGHTWEIGHT) != 0) + RETURN(0); + + CDEBUG(D_INFO, "%s: del client at idx %u, off %lld, UUID '%s'\n", + tg->lut_obd->obd_name, ted->ted_lr_idx, ted->ted_lr_off, + ted->ted_lcd->lcd_uuid); + + /* Clear the bit _after_ zeroing out the client so we don't + race with filter_client_add and zero out new clients.*/ + if (!cfs_test_bit(ted->ted_lr_idx, tg->lut_client_bitmap)) { + CERROR("%s: client %u: bit already clear in bitmap!!\n", + tg->lut_obd->obd_name, ted->ted_lr_idx); + LBUG(); + } + + /* Do not erase record for recoverable client. */ + if (exp->exp_flags & OBD_OPT_FAILOVER) + RETURN(0); + + /* Make sure the server's last_transno is up to date. + * This should be done before zeroing client slot so last_transno will + * be in server data or in client data in case of failure */ + rc = lut_server_data_update(env, tg, 0); + if (rc != 0) { + CERROR("%s: failed to update server data, skip client %s " + "zeroing, rc %d\n", tg->lut_obd->obd_name, + ted->ted_lcd->lcd_uuid, rc); + RETURN(rc); + } + + cfs_mutex_lock(&ted->ted_lcd_lock); + memset(ted->ted_lcd->lcd_uuid, 0, sizeof ted->ted_lcd->lcd_uuid); + rc = lut_client_data_update(env, exp); + cfs_mutex_unlock(&ted->ted_lcd_lock); + + CDEBUG(rc == 0 ? D_INFO : D_ERROR, + "%s: zeroing out client %s at idx %u (%llu), rc %d\n", + tg->lut_obd->obd_name, ted->ted_lcd->lcd_uuid, + ted->ted_lr_idx, ted->ted_lr_off, rc); + RETURN(rc); } -EXPORT_SYMBOL(lut_cb_client); +EXPORT_SYMBOL(lut_client_del); int lut_init(const struct lu_env *env, struct lu_target *lut, - struct obd_device *obd, struct dt_device *dt) + struct obd_device *obd, struct dt_device *dt) { - struct lu_fid fid; - struct dt_object *o; - int rc = 0; + struct dt_object_format dof; + struct lu_attr attr; + struct lu_fid fid; + struct dt_object *o; + int rc = 0; ENTRY; LASSERT(lut); @@ -386,7 +818,6 @@ int lut_init(const struct lu_env *env, struct lu_target *lut, obd->u.obt.obt_lut = lut; cfs_spin_lock_init(&lut->lut_translock); - cfs_spin_lock_init(&lut->lut_client_bitmap_lock); OBD_ALLOC(lut->lut_client_bitmap, LR_MAX_CLIENTS >> 3); if (lut->lut_client_bitmap == NULL) @@ -395,7 +826,15 @@ int lut_init(const struct lu_env *env, struct lu_target *lut, /** obdfilter has no lu_device stack yet */ if (dt == NULL) RETURN(rc); - o = dt_store_open(env, lut->lut_bottom, "", LAST_RCVD, &fid); + + memset(&attr, 0, sizeof(attr)); + attr.la_valid = LA_MODE; + attr.la_mode = S_IFREG | S_IRUGO | S_IWUSR; + dof.dof_type = dt_mode_to_dft(S_IFREG); + + lu_local_obj_fid(&fid, MDT_LAST_RECV_OID); + + o = dt_find_or_create(env, lut->lut_bottom, &fid, &dof, &attr); if (!IS_ERR(o)) { lut->lut_last_rcvd = o; } else { @@ -424,3 +863,23 @@ void lut_fini(const struct lu_env *env, struct lu_target *lut) EXIT; } EXPORT_SYMBOL(lut_fini); + +/* context key constructor/destructor: tg_key_init, tg_key_fini */ +LU_KEY_INIT_FINI(tg, struct tg_thread_info); +/* context key: tg_thread_key */ +LU_CONTEXT_KEY_DEFINE(tg, LCT_MD_THREAD|LCT_DT_THREAD); +LU_KEY_INIT_GENERIC(tg); +EXPORT_SYMBOL(tg_thread_key); + +int lut_mod_init(void) +{ + tg_key_init_generic(&tg_thread_key, NULL); + lu_context_key_register_many(&tg_thread_key, NULL); + return 0; +} + +void lut_mod_exit(void) +{ + lu_context_key_degister_many(&tg_thread_key, NULL); +} +