4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21 * CA 95054 USA or visit www.sun.com if you need additional information or
27 * Copyright (c) 2009, 2010, Oracle and/or its affiliates. All rights reserved.
28 * Use is subject to license terms.
30 * Copyright (c) 2011, 2012, Whamcloud, Inc.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
36 * Lustre Common Target
37 * These are common function for MDT and OST recovery-related functionality
39 * Author: Mikhail Pershin <tappro@sun.com>
43 #include <lustre_fsfilt.h>
44 #include <obd_class.h>
45 #include <lustre_fid.h>
48 * Common data shared by tg-level handlers. This is allocated per-thread to
49 * reduce stack consumption.
51 struct tg_thread_info {
52 /* server and client data buffers */
53 struct lr_server_data tti_lsd;
54 struct lsd_client_data tti_lcd;
55 struct lu_buf tti_buf;
59 static inline struct lu_buf *tti_buf_lsd(struct tg_thread_info *tti)
61 tti->tti_buf.lb_buf = &tti->tti_lsd;
62 tti->tti_buf.lb_len = sizeof(tti->tti_lsd);
66 static inline struct lu_buf *tti_buf_lcd(struct tg_thread_info *tti)
68 tti->tti_buf.lb_buf = &tti->tti_lcd;
69 tti->tti_buf.lb_len = sizeof(tti->tti_lcd);
73 extern struct lu_context_key tg_thread_key;
75 static inline struct tg_thread_info *tg_th_info(const struct lu_env *env)
77 struct tg_thread_info *tti;
79 tti = lu_context_key_get(&env->le_ctx, &tg_thread_key);
85 * Update client data in last_rcvd file. An obd API
87 static int obt_client_data_update(struct obd_export *exp)
89 struct tg_export_data *ted = &exp->exp_target_data;
90 struct obd_device_target *obt = &exp->exp_obd->u.obt;
91 struct lu_target *lut = class_exp2tgt(exp);
92 loff_t off = ted->ted_lr_off;
95 rc = fsfilt_write_record(exp->exp_obd, obt->obt_rcvd_filp,
96 ted->ted_lcd, sizeof(*ted->ted_lcd), &off, 0);
98 CDEBUG(D_INFO, "update client idx %u last_epoch %#x (%#x)\n",
99 ted->ted_lr_idx, le32_to_cpu(ted->ted_lcd->lcd_last_epoch),
100 le32_to_cpu(lut->lut_lsd.lsd_start_epoch));
106 * Update server data in last_rcvd file. An obd API
108 int obt_server_data_update(struct lu_target *lut, int force_sync)
110 struct obd_device_target *obt = &lut->lut_obd->u.obt;
116 "%s: mount_count is "LPU64", last_transno is "LPU64"\n",
117 lut->lut_lsd.lsd_uuid,
118 le64_to_cpu(lut->lut_lsd.lsd_mount_count),
119 le64_to_cpu(lut->lut_lsd.lsd_last_transno));
121 rc = fsfilt_write_record(lut->lut_obd, obt->obt_rcvd_filp,
122 &lut->lut_lsd, sizeof(lut->lut_lsd),
125 CERROR("error writing lr_server_data: rc = %d\n", rc);
131 * Update client epoch with server's one
133 void obt_client_epoch_update(struct obd_export *exp)
135 struct lsd_client_data *lcd = exp->exp_target_data.ted_lcd;
136 struct lu_target *lut = class_exp2tgt(exp);
138 /** VBR: set client last_epoch to current epoch */
139 if (le32_to_cpu(lcd->lcd_last_epoch) >=
140 le32_to_cpu(lut->lut_lsd.lsd_start_epoch))
142 lcd->lcd_last_epoch = lut->lut_lsd.lsd_start_epoch;
143 obt_client_data_update(exp);
147 * Increment server epoch. An obd API
149 static void obt_boot_epoch_update(struct lu_target *lut)
151 struct obd_device *obd = lut->lut_obd;
153 struct ptlrpc_request *req;
154 cfs_list_t client_list;
156 cfs_spin_lock(&lut->lut_translock);
157 start_epoch = lr_epoch(le64_to_cpu(lut->lut_last_transno)) + 1;
158 lut->lut_last_transno = cpu_to_le64((__u64)start_epoch <<
160 lut->lut_lsd.lsd_start_epoch = cpu_to_le32(start_epoch);
161 cfs_spin_unlock(&lut->lut_translock);
163 CFS_INIT_LIST_HEAD(&client_list);
164 cfs_spin_lock(&obd->obd_recovery_task_lock);
165 cfs_list_splice_init(&obd->obd_final_req_queue, &client_list);
166 cfs_spin_unlock(&obd->obd_recovery_task_lock);
169 * go through list of exports participated in recovery and
170 * set new epoch for them
172 cfs_list_for_each_entry(req, &client_list, rq_list) {
173 LASSERT(!req->rq_export->exp_delayed);
174 obt_client_epoch_update(req->rq_export);
176 /** return list back at once */
177 cfs_spin_lock(&obd->obd_recovery_task_lock);
178 cfs_list_splice_init(&client_list, &obd->obd_final_req_queue);
179 cfs_spin_unlock(&obd->obd_recovery_task_lock);
180 obt_server_data_update(lut, 1);
184 * Allocate in-memory data for client slot related to export.
186 int lut_client_alloc(struct obd_export *exp)
188 LASSERT(exp != exp->exp_obd->obd_self_export);
190 OBD_ALLOC_PTR(exp->exp_target_data.ted_lcd);
191 if (exp->exp_target_data.ted_lcd == NULL)
193 /* Mark that slot is not yet valid, 0 doesn't work here */
194 exp->exp_target_data.ted_lr_idx = -1;
197 EXPORT_SYMBOL(lut_client_alloc);
200 * Free in-memory data for client slot related to export.
202 void lut_client_free(struct obd_export *exp)
204 struct tg_export_data *ted = &exp->exp_target_data;
205 struct lu_target *lut = class_exp2tgt(exp);
207 LASSERT(exp != exp->exp_obd->obd_self_export);
209 OBD_FREE_PTR(ted->ted_lcd);
212 /* Slot may be not yet assigned */
213 if (ted->ted_lr_idx < 0)
215 /* Clear bit when lcd is freed */
216 LASSERT(lut->lut_client_bitmap);
217 if (!cfs_test_and_clear_bit(ted->ted_lr_idx, lut->lut_client_bitmap)) {
218 CERROR("%s: client %u bit already clear in bitmap\n",
219 exp->exp_obd->obd_name, ted->ted_lr_idx);
223 EXPORT_SYMBOL(lut_client_free);
225 int lut_client_data_read(const struct lu_env *env, struct lu_target *tg,
226 struct lsd_client_data *lcd, loff_t *off, int index)
228 struct tg_thread_info *tti = tg_th_info(env);
232 rc = dt_record_read(env, tg->lut_last_rcvd, &tti->tti_buf, off);
234 check_lcd(tg->lut_obd->obd_name, index, &tti->tti_lcd);
235 lcd_le_to_cpu(&tti->tti_lcd, lcd);
238 CDEBUG(D_INFO, "read lcd @%lld rc = %d, uuid = %s, last_transno = "LPU64
239 ", last_xid = "LPU64", last_result = %u, last_data = %u, "
240 "last_close_transno = "LPU64", last_close_xid = "LPU64", "
241 "last_close_result = %u\n", *off,
242 rc, lcd->lcd_uuid, lcd->lcd_last_transno, lcd->lcd_last_xid,
243 lcd->lcd_last_result, lcd->lcd_last_data,
244 lcd->lcd_last_close_transno, lcd->lcd_last_close_xid,
245 lcd->lcd_last_close_result);
248 EXPORT_SYMBOL(lut_client_data_read);
250 int lut_client_data_write(const struct lu_env *env, struct lu_target *tg,
251 struct lsd_client_data *lcd, loff_t *off,
254 struct tg_thread_info *tti = tg_th_info(env);
256 lcd_cpu_to_le(lcd, &tti->tti_lcd);
259 return dt_record_write(env, tg->lut_last_rcvd, &tti->tti_buf, off, th);
261 EXPORT_SYMBOL(lut_client_data_write);
264 * Update client data in last_rcvd
266 int lut_client_data_update(const struct lu_env *env, struct obd_export *exp)
268 struct tg_export_data *ted = &exp->exp_target_data;
269 struct lu_target *tg = class_exp2tgt(exp);
270 struct tg_thread_info *tti = tg_th_info(env);
276 th = dt_trans_create(env, tg->lut_bottom);
280 rc = dt_declare_record_write(env, tg->lut_last_rcvd,
281 sizeof(struct lsd_client_data),
282 ted->ted_lr_off, th);
286 rc = dt_trans_start_local(env, tg->lut_bottom, th);
290 * Until this operations will be committed the sync is needed
291 * for this export. This should be done _after_ starting the
292 * transaction so that many connecting clients will not bring
293 * server down with lots of sync writes.
295 rc = lut_new_client_cb_add(th, exp);
297 /* can't add callback, do sync now */
300 cfs_spin_lock(&exp->exp_lock);
301 exp->exp_need_sync = 1;
302 cfs_spin_unlock(&exp->exp_lock);
305 tti->tti_off = ted->ted_lr_off;
306 rc = lut_client_data_write(env, tg, ted->ted_lcd, &tti->tti_off, th);
309 dt_trans_stop(env, tg->lut_bottom, th);
310 CDEBUG(D_INFO, "write last_rcvd, rc = %d:\n"
311 "uuid = %s\nlast_transno = "LPU64"\n",
312 rc, tg->lut_lsd.lsd_uuid, tg->lut_lsd.lsd_last_transno);
317 int lut_server_data_read(const struct lu_env *env, struct lu_target *tg)
319 struct tg_thread_info *tti = tg_th_info(env);
324 rc = dt_record_read(env, tg->lut_last_rcvd, &tti->tti_buf, &tti->tti_off);
326 lsd_le_to_cpu(&tti->tti_lsd, &tg->lut_lsd);
328 CDEBUG(D_INFO, "%s: read last_rcvd header, rc = %d, uuid = %s, "
329 "last_transno = "LPU64"\n", tg->lut_obd->obd_name, rc,
330 tg->lut_lsd.lsd_uuid, tg->lut_lsd.lsd_last_transno);
333 EXPORT_SYMBOL(lut_server_data_read);
335 int lut_server_data_write(const struct lu_env *env, struct lu_target *tg,
338 struct tg_thread_info *tti = tg_th_info(env);
344 lsd_cpu_to_le(&tg->lut_lsd, &tti->tti_lsd);
346 rc = dt_record_write(env, tg->lut_last_rcvd, &tti->tti_buf,
349 CDEBUG(D_INFO, "write last_rcvd header rc = %d:\n"
350 "uuid = %s\nlast_transno = "LPU64"\n",
351 rc, tg->lut_lsd.lsd_uuid, tg->lut_lsd.lsd_last_transno);
355 EXPORT_SYMBOL(lut_server_data_write);
358 * Update server data in last_rcvd
360 int lut_server_data_update(const struct lu_env *env, struct lu_target *tg,
363 struct tg_thread_info *tti = tg_th_info(env);
370 "%s: mount_count is "LPU64", last_transno is "LPU64"\n",
371 tg->lut_lsd.lsd_uuid, tg->lut_obd->u.obt.obt_mount_count,
372 tg->lut_last_transno);
374 /* Always save latest transno to keep it fresh */
375 cfs_spin_lock(&tg->lut_translock);
376 tg->lut_lsd.lsd_last_transno = tg->lut_last_transno;
377 cfs_spin_unlock(&tg->lut_translock);
379 th = dt_trans_create(env, tg->lut_bottom);
385 rc = dt_declare_record_write(env, tg->lut_last_rcvd,
386 sizeof(struct lr_server_data),
391 rc = dt_trans_start(env, tg->lut_bottom, th);
395 rc = lut_server_data_write(env, tg, th);
397 dt_trans_stop(env, tg->lut_bottom, th);
399 CDEBUG(D_INFO, "write last_rcvd header, rc = %d:\n"
400 "uuid = %s\nlast_transno = "LPU64"\n",
401 rc, tg->lut_lsd.lsd_uuid, tg->lut_lsd.lsd_last_transno);
404 EXPORT_SYMBOL(lut_server_data_update);
406 int lut_truncate_last_rcvd(const struct lu_env *env, struct lu_target *tg,
409 struct dt_object *dt = tg->lut_last_rcvd;
417 attr.la_valid = LA_SIZE;
419 th = dt_trans_create(env, tg->lut_bottom);
422 rc = dt_declare_punch(env, dt, size, OBD_OBJECT_EOF, th);
425 rc = dt_declare_attr_set(env, dt, &attr, th);
428 rc = dt_trans_start_local(env, tg->lut_bottom, th);
432 rc = dt_punch(env, dt, size, OBD_OBJECT_EOF, th, BYPASS_CAPA);
434 rc = dt_attr_set(env, dt, &attr, th, BYPASS_CAPA);
437 dt_trans_stop(env, tg->lut_bottom, th);
441 EXPORT_SYMBOL(lut_truncate_last_rcvd);
443 void lut_client_epoch_update(const struct lu_env *env, struct obd_export *exp)
445 struct lsd_client_data *lcd = exp->exp_target_data.ted_lcd;
446 struct lu_target *lut = class_exp2tgt(exp);
448 LASSERT(lut->lut_bottom);
449 /** VBR: set client last_epoch to current epoch */
450 if (lcd->lcd_last_epoch >= lut->lut_lsd.lsd_start_epoch)
452 lcd->lcd_last_epoch = lut->lut_lsd.lsd_start_epoch;
453 lut_client_data_update(env, exp);
457 * Update boot epoch when recovery ends
459 void lut_boot_epoch_update(struct lu_target *lut)
462 struct ptlrpc_request *req;
464 cfs_list_t client_list;
467 if (lut->lut_obd->obd_stopping)
469 /** Increase server epoch after recovery */
470 if (lut->lut_bottom == NULL)
471 return obt_boot_epoch_update(lut);
473 rc = lu_env_init(&env, LCT_DT_THREAD);
475 CERROR("Can't initialize environment rc=%d\n", rc);
479 cfs_spin_lock(&lut->lut_translock);
480 start_epoch = lr_epoch(lut->lut_last_transno) + 1;
481 lut->lut_last_transno = (__u64)start_epoch << LR_EPOCH_BITS;
482 lut->lut_lsd.lsd_start_epoch = start_epoch;
483 cfs_spin_unlock(&lut->lut_translock);
485 CFS_INIT_LIST_HEAD(&client_list);
487 * The recovery is not yet finished and final queue can still be updated
488 * with resend requests. Move final list to separate one for processing
490 cfs_spin_lock(&lut->lut_obd->obd_recovery_task_lock);
491 cfs_list_splice_init(&lut->lut_obd->obd_final_req_queue, &client_list);
492 cfs_spin_unlock(&lut->lut_obd->obd_recovery_task_lock);
495 * go through list of exports participated in recovery and
496 * set new epoch for them
498 cfs_list_for_each_entry(req, &client_list, rq_list) {
499 LASSERT(!req->rq_export->exp_delayed);
500 if (!req->rq_export->exp_vbr_failed)
501 lut_client_epoch_update(&env, req->rq_export);
503 /** return list back at once */
504 cfs_spin_lock(&lut->lut_obd->obd_recovery_task_lock);
505 cfs_list_splice_init(&client_list, &lut->lut_obd->obd_final_req_queue);
506 cfs_spin_unlock(&lut->lut_obd->obd_recovery_task_lock);
507 /** update server epoch */
508 lut_server_data_update(&env, lut, 1);
511 EXPORT_SYMBOL(lut_boot_epoch_update);
514 * commit callback, need to update last_commited value
516 struct lut_last_committed_callback {
517 struct dt_txn_commit_cb llcc_cb;
518 struct lu_target *llcc_lut;
519 struct obd_export *llcc_exp;
523 void lut_cb_last_committed(struct lu_env *env, struct thandle *th,
524 struct dt_txn_commit_cb *cb, int err)
526 struct lut_last_committed_callback *ccb;
528 ccb = container_of0(cb, struct lut_last_committed_callback, llcc_cb);
530 LASSERT(ccb->llcc_exp->exp_obd == ccb->llcc_lut->lut_obd);
532 cfs_spin_lock(&ccb->llcc_lut->lut_translock);
533 if (ccb->llcc_transno > ccb->llcc_lut->lut_obd->obd_last_committed)
534 ccb->llcc_lut->lut_obd->obd_last_committed = ccb->llcc_transno;
536 LASSERT(ccb->llcc_exp);
537 if (ccb->llcc_transno > ccb->llcc_exp->exp_last_committed) {
538 ccb->llcc_exp->exp_last_committed = ccb->llcc_transno;
539 cfs_spin_unlock(&ccb->llcc_lut->lut_translock);
540 ptlrpc_commit_replies(ccb->llcc_exp);
542 cfs_spin_unlock(&ccb->llcc_lut->lut_translock);
544 class_export_cb_put(ccb->llcc_exp);
545 if (ccb->llcc_transno)
546 CDEBUG(D_HA, "%s: transno "LPD64" is committed\n",
547 ccb->llcc_lut->lut_obd->obd_name, ccb->llcc_transno);
551 int lut_last_commit_cb_add(struct thandle *th, struct lu_target *lut,
552 struct obd_export *exp, __u64 transno)
554 struct lut_last_committed_callback *ccb;
561 ccb->llcc_cb.dcb_func = lut_cb_last_committed;
562 CFS_INIT_LIST_HEAD(&ccb->llcc_cb.dcb_linkage);
564 ccb->llcc_exp = class_export_cb_get(exp);
565 ccb->llcc_transno = transno;
567 rc = dt_trans_cb_add(th, &ccb->llcc_cb);
569 class_export_cb_put(exp);
574 EXPORT_SYMBOL(lut_last_commit_cb_add);
576 struct lut_new_client_callback {
577 struct dt_txn_commit_cb lncc_cb;
578 struct obd_export *lncc_exp;
581 void lut_cb_new_client(struct lu_env *env, struct thandle *th,
582 struct dt_txn_commit_cb *cb, int err)
584 struct lut_new_client_callback *ccb;
586 ccb = container_of0(cb, struct lut_new_client_callback, lncc_cb);
588 LASSERT(ccb->lncc_exp->exp_obd);
590 CDEBUG(D_RPCTRACE, "%s: committing for initial connect of %s\n",
591 ccb->lncc_exp->exp_obd->obd_name,
592 ccb->lncc_exp->exp_client_uuid.uuid);
594 cfs_spin_lock(&ccb->lncc_exp->exp_lock);
595 ccb->lncc_exp->exp_need_sync = 0;
596 cfs_spin_unlock(&ccb->lncc_exp->exp_lock);
597 class_export_cb_put(ccb->lncc_exp);
602 int lut_new_client_cb_add(struct thandle *th, struct obd_export *exp)
604 struct lut_new_client_callback *ccb;
611 ccb->lncc_cb.dcb_func = lut_cb_new_client;
612 CFS_INIT_LIST_HEAD(&ccb->lncc_cb.dcb_linkage);
613 ccb->lncc_exp = class_export_cb_get(exp);
615 rc = dt_trans_cb_add(th, &ccb->lncc_cb);
617 class_export_cb_put(exp);
624 * Add new client to the last_rcvd upon new connection.
626 * We use a bitmap to locate a free space in the last_rcvd file and initialize
629 int lut_client_new(const struct lu_env *env, struct obd_export *exp)
631 struct tg_export_data *ted = &exp->exp_target_data;
632 struct lu_target *tg = class_exp2tgt(exp);
637 LASSERT(tg->lut_client_bitmap != NULL);
638 if (!strcmp(ted->ted_lcd->lcd_uuid, tg->lut_obd->obd_uuid.uuid))
641 /* the bitmap operations can handle cl_idx > sizeof(long) * 8, so
642 * there's no need for extra complication here
644 idx = cfs_find_first_zero_bit(tg->lut_client_bitmap, LR_MAX_CLIENTS);
646 if (idx >= LR_MAX_CLIENTS ||
647 OBD_FAIL_CHECK(OBD_FAIL_MDS_CLIENT_ADD)) {
648 CERROR("%s: no room for %u clients - fix LR_MAX_CLIENTS\n",
649 tg->lut_obd->obd_name, idx);
652 if (cfs_test_and_set_bit(idx, tg->lut_client_bitmap)) {
653 idx = cfs_find_next_zero_bit(tg->lut_client_bitmap,
654 LR_MAX_CLIENTS, idx);
658 CDEBUG(D_INFO, "%s: client at idx %d with UUID '%s' added\n",
659 tg->lut_obd->obd_name, idx, ted->ted_lcd->lcd_uuid);
661 ted->ted_lr_idx = idx;
662 ted->ted_lr_off = tg->lut_lsd.lsd_client_start +
663 idx * tg->lut_lsd.lsd_client_size;
665 cfs_mutex_init(&ted->ted_lcd_lock);
667 LASSERTF(ted->ted_lr_off > 0, "ted_lr_off = %llu\n", ted->ted_lr_off);
669 CDEBUG(D_INFO, "%s: new client at index %d (%llu) with UUID '%s'\n",
670 tg->lut_obd->obd_name, ted->ted_lr_idx, ted->ted_lr_off,
671 ted->ted_lcd->lcd_uuid);
673 if (OBD_FAIL_CHECK(OBD_FAIL_TGT_CLIENT_ADD))
676 rc = lut_client_data_update(env, exp);
678 CERROR("%s: Failed to write client lcd at idx %d, rc %d\n",
679 tg->lut_obd->obd_name, idx, rc);
683 EXPORT_SYMBOL(lut_client_new);
685 /* Add client data to the MDS. We use a bitmap to locate a free space
686 * in the last_rcvd file if cl_off is -1 (i.e. a new client).
687 * Otherwise, we just have to read the data from the last_rcvd file and
688 * we know its offset.
690 * It should not be possible to fail adding an existing client - otherwise
691 * mdt_init_server_data() callsite needs to be fixed.
693 int lut_client_add(const struct lu_env *env, struct obd_export *exp, int idx)
695 struct tg_export_data *ted = &exp->exp_target_data;
696 struct lu_target *tg = class_exp2tgt(exp);
700 LASSERT(tg->lut_client_bitmap != NULL);
701 LASSERTF(idx >= 0, "%d\n", idx);
703 if (!strcmp(ted->ted_lcd->lcd_uuid, tg->lut_obd->obd_uuid.uuid))
706 if (cfs_test_and_set_bit(idx, tg->lut_client_bitmap)) {
707 CERROR("%s: client %d: bit already set in bitmap!!\n",
708 tg->lut_obd->obd_name, idx);
712 CDEBUG(D_INFO, "%s: client at idx %d with UUID '%s' added\n",
713 tg->lut_obd->obd_name, idx, ted->ted_lcd->lcd_uuid);
715 ted->ted_lr_idx = idx;
716 ted->ted_lr_off = tg->lut_lsd.lsd_client_start +
717 idx * tg->lut_lsd.lsd_client_size;
719 cfs_mutex_init(&ted->ted_lcd_lock);
721 LASSERTF(ted->ted_lr_off > 0, "ted_lr_off = %llu\n", ted->ted_lr_off);
725 EXPORT_SYMBOL(lut_client_add);
727 int lut_client_del(const struct lu_env *env, struct obd_export *exp)
729 struct tg_export_data *ted = &exp->exp_target_data;
730 struct lu_target *tg = class_exp2tgt(exp);
735 LASSERT(ted->ted_lcd);
737 /* XXX if lcd_uuid were a real obd_uuid, I could use obd_uuid_equals */
738 if (!strcmp((char *)ted->ted_lcd->lcd_uuid,
739 (char *)tg->lut_obd->obd_uuid.uuid))
742 CDEBUG(D_INFO, "%s: del client at idx %u, off %lld, UUID '%s'\n",
743 tg->lut_obd->obd_name, ted->ted_lr_idx, ted->ted_lr_off,
744 ted->ted_lcd->lcd_uuid);
746 /* Clear the bit _after_ zeroing out the client so we don't
747 race with filter_client_add and zero out new clients.*/
748 if (!cfs_test_bit(ted->ted_lr_idx, tg->lut_client_bitmap)) {
749 CERROR("%s: client %u: bit already clear in bitmap!!\n",
750 tg->lut_obd->obd_name, ted->ted_lr_idx);
754 /* Do not erase record for recoverable client. */
755 if (exp->exp_flags & OBD_OPT_FAILOVER)
758 /* Make sure the server's last_transno is up to date.
759 * This should be done before zeroing client slot so last_transno will
760 * be in server data or in client data in case of failure */
761 rc = lut_server_data_update(env, tg, 0);
763 CERROR("%s: failed to update server data, skip client %s "
764 "zeroing, rc %d\n", tg->lut_obd->obd_name,
765 ted->ted_lcd->lcd_uuid, rc);
769 cfs_mutex_lock(&ted->ted_lcd_lock);
770 memset(ted->ted_lcd->lcd_uuid, 0, sizeof ted->ted_lcd->lcd_uuid);
771 rc = lut_client_data_update(env, exp);
772 cfs_mutex_unlock(&ted->ted_lcd_lock);
774 CDEBUG(rc == 0 ? D_INFO : D_ERROR,
775 "%s: zeroing out client %s at idx %u (%llu), rc %d\n",
776 tg->lut_obd->obd_name, ted->ted_lcd->lcd_uuid,
777 ted->ted_lr_idx, ted->ted_lr_off, rc);
780 EXPORT_SYMBOL(lut_client_del);
782 int lut_init(const struct lu_env *env, struct lu_target *lut,
783 struct obd_device *obd, struct dt_device *dt)
785 struct dt_object_format dof;
795 lut->lut_bottom = dt;
796 lut->lut_last_rcvd = NULL;
797 obd->u.obt.obt_lut = lut;
799 cfs_spin_lock_init(&lut->lut_translock);
801 OBD_ALLOC(lut->lut_client_bitmap, LR_MAX_CLIENTS >> 3);
802 if (lut->lut_client_bitmap == NULL)
805 /** obdfilter has no lu_device stack yet */
809 memset(&attr, 0, sizeof(attr));
810 attr.la_valid = LA_MODE;
811 attr.la_mode = S_IFREG | S_IRUGO | S_IWUSR;
812 dof.dof_type = dt_mode_to_dft(S_IFREG);
814 lu_local_obj_fid(&fid, MDT_LAST_RECV_OID);
816 o = dt_find_or_create(env, lut->lut_bottom, &fid, &dof, &attr);
818 lut->lut_last_rcvd = o;
820 OBD_FREE(lut->lut_client_bitmap, LR_MAX_CLIENTS >> 3);
821 lut->lut_client_bitmap = NULL;
823 CERROR("cannot open %s: rc = %d\n", LAST_RCVD, rc);
828 EXPORT_SYMBOL(lut_init);
830 void lut_fini(const struct lu_env *env, struct lu_target *lut)
834 if (lut->lut_client_bitmap) {
835 OBD_FREE(lut->lut_client_bitmap, LR_MAX_CLIENTS >> 3);
836 lut->lut_client_bitmap = NULL;
838 if (lut->lut_last_rcvd) {
839 lu_object_put(env, &lut->lut_last_rcvd->do_lu);
840 lut->lut_last_rcvd = NULL;
844 EXPORT_SYMBOL(lut_fini);
846 /* context key constructor/destructor: tg_key_init, tg_key_fini */
847 LU_KEY_INIT_FINI(tg, struct tg_thread_info);
848 /* context key: tg_thread_key */
849 LU_CONTEXT_KEY_DEFINE(tg, LCT_MD_THREAD|LCT_DT_THREAD);
850 LU_KEY_INIT_GENERIC(tg);
851 EXPORT_SYMBOL(tg_thread_key);
853 int lut_mod_init(void)
855 tg_key_init_generic(&tg_thread_key, NULL);
856 lu_context_key_register_many(&tg_thread_key, NULL);
860 void lut_mod_exit(void)
862 lu_context_key_degister_many(&tg_thread_key, NULL);