4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21 * CA 95054 USA or visit www.sun.com if you need additional information or
27 * Copyright (c) 2009, 2010, Oracle and/or its affiliates. All rights reserved.
28 * Use is subject to license terms.
30 * Copyright (c) 2011, 2012, Whamcloud, Inc.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
36 * Lustre Common Target
37 * These are common function for MDT and OST recovery-related functionality
39 * Author: Mikhail Pershin <tappro@sun.com>
43 #include <lustre_fsfilt.h>
44 #include <obd_class.h>
45 #include <lustre_fid.h>
48 * Common data shared by tg-level handlers. This is allocated per-thread to
49 * reduce stack consumption.
51 struct tg_thread_info {
52 /* server and client data buffers */
53 struct lr_server_data tti_lsd;
54 struct lsd_client_data tti_lcd;
55 struct lu_buf tti_buf;
59 static inline struct lu_buf *tti_buf_lsd(struct tg_thread_info *tti)
61 tti->tti_buf.lb_buf = &tti->tti_lsd;
62 tti->tti_buf.lb_len = sizeof(tti->tti_lsd);
66 static inline struct lu_buf *tti_buf_lcd(struct tg_thread_info *tti)
68 tti->tti_buf.lb_buf = &tti->tti_lcd;
69 tti->tti_buf.lb_len = sizeof(tti->tti_lcd);
73 extern struct lu_context_key tg_thread_key;
75 static inline struct tg_thread_info *tg_th_info(const struct lu_env *env)
77 struct tg_thread_info *tti;
79 tti = lu_context_key_get(&env->le_ctx, &tg_thread_key);
85 * Update client data in last_rcvd file. An obd API
87 static int obt_client_data_update(struct obd_export *exp)
89 struct tg_export_data *ted = &exp->exp_target_data;
90 struct obd_device_target *obt = &exp->exp_obd->u.obt;
91 struct lu_target *lut = class_exp2tgt(exp);
92 loff_t off = ted->ted_lr_off;
95 rc = fsfilt_write_record(exp->exp_obd, obt->obt_rcvd_filp,
96 ted->ted_lcd, sizeof(*ted->ted_lcd), &off, 0);
98 CDEBUG(D_INFO, "update client idx %u last_epoch %#x (%#x)\n",
99 ted->ted_lr_idx, le32_to_cpu(ted->ted_lcd->lcd_last_epoch),
100 le32_to_cpu(lut->lut_lsd.lsd_start_epoch));
106 * Update server data in last_rcvd file. An obd API
108 int obt_server_data_update(struct lu_target *lut, int force_sync)
110 struct obd_device_target *obt = &lut->lut_obd->u.obt;
116 "%s: mount_count is "LPU64", last_transno is "LPU64"\n",
117 lut->lut_lsd.lsd_uuid,
118 le64_to_cpu(lut->lut_lsd.lsd_mount_count),
119 le64_to_cpu(lut->lut_lsd.lsd_last_transno));
121 rc = fsfilt_write_record(lut->lut_obd, obt->obt_rcvd_filp,
122 &lut->lut_lsd, sizeof(lut->lut_lsd),
125 CERROR("error writing lr_server_data: rc = %d\n", rc);
131 * Update client epoch with server's one
133 void obt_client_epoch_update(struct obd_export *exp)
135 struct lsd_client_data *lcd = exp->exp_target_data.ted_lcd;
136 struct lu_target *lut = class_exp2tgt(exp);
138 /** VBR: set client last_epoch to current epoch */
139 if (le32_to_cpu(lcd->lcd_last_epoch) >=
140 le32_to_cpu(lut->lut_lsd.lsd_start_epoch))
142 lcd->lcd_last_epoch = lut->lut_lsd.lsd_start_epoch;
143 obt_client_data_update(exp);
147 * Increment server epoch. An obd API
149 static void obt_boot_epoch_update(struct lu_target *lut)
151 struct obd_device *obd = lut->lut_obd;
153 struct ptlrpc_request *req;
154 cfs_list_t client_list;
156 cfs_spin_lock(&lut->lut_translock);
157 start_epoch = lr_epoch(le64_to_cpu(lut->lut_last_transno)) + 1;
158 lut->lut_last_transno = cpu_to_le64((__u64)start_epoch <<
160 lut->lut_lsd.lsd_start_epoch = cpu_to_le32(start_epoch);
161 cfs_spin_unlock(&lut->lut_translock);
163 CFS_INIT_LIST_HEAD(&client_list);
164 cfs_spin_lock(&obd->obd_recovery_task_lock);
165 cfs_list_splice_init(&obd->obd_final_req_queue, &client_list);
166 cfs_spin_unlock(&obd->obd_recovery_task_lock);
169 * go through list of exports participated in recovery and
170 * set new epoch for them
172 cfs_list_for_each_entry(req, &client_list, rq_list) {
173 LASSERT(!req->rq_export->exp_delayed);
174 obt_client_epoch_update(req->rq_export);
176 /** return list back at once */
177 cfs_spin_lock(&obd->obd_recovery_task_lock);
178 cfs_list_splice_init(&client_list, &obd->obd_final_req_queue);
179 cfs_spin_unlock(&obd->obd_recovery_task_lock);
180 obt_server_data_update(lut, 1);
184 * Allocate in-memory data for client slot related to export.
186 int lut_client_alloc(struct obd_export *exp)
188 LASSERT(exp != exp->exp_obd->obd_self_export);
190 OBD_ALLOC_PTR(exp->exp_target_data.ted_lcd);
191 if (exp->exp_target_data.ted_lcd == NULL)
193 /* Mark that slot is not yet valid, 0 doesn't work here */
194 exp->exp_target_data.ted_lr_idx = -1;
197 EXPORT_SYMBOL(lut_client_alloc);
200 * Free in-memory data for client slot related to export.
202 void lut_client_free(struct obd_export *exp)
204 struct tg_export_data *ted = &exp->exp_target_data;
205 struct lu_target *lut = class_exp2tgt(exp);
207 LASSERT(exp != exp->exp_obd->obd_self_export);
209 OBD_FREE_PTR(ted->ted_lcd);
212 /* Slot may be not yet assigned */
213 if (ted->ted_lr_idx < 0)
215 /* Clear bit when lcd is freed */
216 LASSERT(lut->lut_client_bitmap);
217 if (!cfs_test_and_clear_bit(ted->ted_lr_idx, lut->lut_client_bitmap)) {
218 CERROR("%s: client %u bit already clear in bitmap\n",
219 exp->exp_obd->obd_name, ted->ted_lr_idx);
223 EXPORT_SYMBOL(lut_client_free);
225 int lut_client_data_read(const struct lu_env *env, struct lu_target *tg,
226 struct lsd_client_data *lcd, loff_t *off, int index)
228 struct tg_thread_info *tti = tg_th_info(env);
232 rc = dt_record_read(env, tg->lut_last_rcvd, &tti->tti_buf, off);
234 check_lcd(tg->lut_obd->obd_name, index, &tti->tti_lcd);
235 lcd_le_to_cpu(&tti->tti_lcd, lcd);
238 CDEBUG(D_INFO, "read lcd @%lld rc = %d, uuid = %s, last_transno = "LPU64
239 ", last_xid = "LPU64", last_result = %u, last_data = %u, "
240 "last_close_transno = "LPU64", last_close_xid = "LPU64", "
241 "last_close_result = %u\n", *off,
242 rc, lcd->lcd_uuid, lcd->lcd_last_transno, lcd->lcd_last_xid,
243 lcd->lcd_last_result, lcd->lcd_last_data,
244 lcd->lcd_last_close_transno, lcd->lcd_last_close_xid,
245 lcd->lcd_last_close_result);
248 EXPORT_SYMBOL(lut_client_data_read);
250 int lut_client_data_write(const struct lu_env *env, struct lu_target *tg,
251 struct lsd_client_data *lcd, loff_t *off,
254 struct tg_thread_info *tti = tg_th_info(env);
256 lcd_cpu_to_le(lcd, &tti->tti_lcd);
259 return dt_record_write(env, tg->lut_last_rcvd, &tti->tti_buf, off, th);
261 EXPORT_SYMBOL(lut_client_data_write);
264 * Update client data in last_rcvd
266 int lut_client_data_update(const struct lu_env *env, struct obd_export *exp)
268 struct tg_export_data *ted = &exp->exp_target_data;
269 struct lu_target *tg = class_exp2tgt(exp);
270 struct tg_thread_info *tti = tg_th_info(env);
276 th = dt_trans_create(env, tg->lut_bottom);
280 rc = dt_declare_record_write(env, tg->lut_last_rcvd,
281 sizeof(struct lsd_client_data),
282 ted->ted_lr_off, th);
286 rc = dt_trans_start_local(env, tg->lut_bottom, th);
290 * Until this operations will be committed the sync is needed
291 * for this export. This should be done _after_ starting the
292 * transaction so that many connecting clients will not bring
293 * server down with lots of sync writes.
295 rc = lut_new_client_cb_add(th, exp);
297 /* can't add callback, do sync now */
300 cfs_spin_lock(&exp->exp_lock);
301 exp->exp_need_sync = 1;
302 cfs_spin_unlock(&exp->exp_lock);
305 tti->tti_off = ted->ted_lr_off;
306 rc = lut_client_data_write(env, tg, ted->ted_lcd, &tti->tti_off, th);
309 dt_trans_stop(env, tg->lut_bottom, th);
310 CDEBUG(D_INFO, "write last_rcvd, rc = %d:\n"
311 "uuid = %s\nlast_transno = "LPU64"\n",
312 rc, tg->lut_lsd.lsd_uuid, tg->lut_lsd.lsd_last_transno);
317 int lut_server_data_read(const struct lu_env *env, struct lu_target *tg)
319 struct tg_thread_info *tti = tg_th_info(env);
324 rc = dt_record_read(env, tg->lut_last_rcvd, &tti->tti_buf, &tti->tti_off);
326 lsd_le_to_cpu(&tti->tti_lsd, &tg->lut_lsd);
328 CDEBUG(D_INFO, "%s: read last_rcvd header, rc = %d, uuid = %s, "
329 "last_transno = "LPU64"\n", tg->lut_obd->obd_name, rc,
330 tg->lut_lsd.lsd_uuid, tg->lut_lsd.lsd_last_transno);
333 EXPORT_SYMBOL(lut_server_data_read);
335 int lut_server_data_write(const struct lu_env *env, struct lu_target *tg,
338 struct tg_thread_info *tti = tg_th_info(env);
344 lsd_cpu_to_le(&tg->lut_lsd, &tti->tti_lsd);
346 rc = dt_record_write(env, tg->lut_last_rcvd, &tti->tti_buf,
349 CDEBUG(D_INFO, "write last_rcvd header rc = %d:\n"
350 "uuid = %s\nlast_transno = "LPU64"\n",
351 rc, tg->lut_lsd.lsd_uuid, tg->lut_lsd.lsd_last_transno);
355 EXPORT_SYMBOL(lut_server_data_write);
358 * Update server data in last_rcvd
360 int lut_server_data_update(const struct lu_env *env, struct lu_target *tg,
363 struct tg_thread_info *tti = tg_th_info(env);
370 "%s: mount_count is "LPU64", last_transno is "LPU64"\n",
371 tg->lut_lsd.lsd_uuid, tg->lut_obd->u.obt.obt_mount_count,
372 tg->lut_last_transno);
374 /* Always save latest transno to keep it fresh */
375 cfs_spin_lock(&tg->lut_translock);
376 tg->lut_lsd.lsd_last_transno = tg->lut_last_transno;
377 cfs_spin_unlock(&tg->lut_translock);
379 th = dt_trans_create(env, tg->lut_bottom);
385 rc = dt_declare_record_write(env, tg->lut_last_rcvd,
386 sizeof(struct lr_server_data),
391 rc = dt_trans_start(env, tg->lut_bottom, th);
395 rc = lut_server_data_write(env, tg, th);
397 dt_trans_stop(env, tg->lut_bottom, th);
399 CDEBUG(D_INFO, "write last_rcvd header, rc = %d:\n"
400 "uuid = %s\nlast_transno = "LPU64"\n",
401 rc, tg->lut_lsd.lsd_uuid, tg->lut_lsd.lsd_last_transno);
404 EXPORT_SYMBOL(lut_server_data_update);
406 int lut_truncate_last_rcvd(const struct lu_env *env, struct lu_target *tg,
409 struct dt_object *dt = tg->lut_last_rcvd;
417 attr.la_valid = LA_SIZE;
419 th = dt_trans_create(env, tg->lut_bottom);
422 rc = dt_declare_punch(env, dt, size, OBD_OBJECT_EOF, th);
425 rc = dt_declare_attr_set(env, dt, &attr, th);
428 rc = dt_trans_start_local(env, tg->lut_bottom, th);
432 rc = dt_punch(env, dt, size, OBD_OBJECT_EOF, th, BYPASS_CAPA);
434 rc = dt_attr_set(env, dt, &attr, th, BYPASS_CAPA);
437 dt_trans_stop(env, tg->lut_bottom, th);
441 EXPORT_SYMBOL(lut_truncate_last_rcvd);
443 void lut_client_epoch_update(const struct lu_env *env, struct obd_export *exp)
445 struct lsd_client_data *lcd = exp->exp_target_data.ted_lcd;
446 struct lu_target *lut = class_exp2tgt(exp);
448 LASSERT(lut->lut_bottom);
449 /** VBR: set client last_epoch to current epoch */
450 if (lcd->lcd_last_epoch >= lut->lut_lsd.lsd_start_epoch)
452 lcd->lcd_last_epoch = lut->lut_lsd.lsd_start_epoch;
453 lut_client_data_update(env, exp);
457 * Update boot epoch when recovery ends
459 void lut_boot_epoch_update(struct lu_target *lut)
462 struct ptlrpc_request *req;
464 cfs_list_t client_list;
467 if (lut->lut_obd->obd_stopping)
469 /** Increase server epoch after recovery */
470 if (lut->lut_bottom == NULL)
471 return obt_boot_epoch_update(lut);
473 rc = lu_env_init(&env, LCT_DT_THREAD);
475 CERROR("Can't initialize environment rc=%d\n", rc);
479 cfs_spin_lock(&lut->lut_translock);
480 start_epoch = lr_epoch(lut->lut_last_transno) + 1;
481 lut->lut_last_transno = (__u64)start_epoch << LR_EPOCH_BITS;
482 lut->lut_lsd.lsd_start_epoch = start_epoch;
483 cfs_spin_unlock(&lut->lut_translock);
485 CFS_INIT_LIST_HEAD(&client_list);
487 * The recovery is not yet finished and final queue can still be updated
488 * with resend requests. Move final list to separate one for processing
490 cfs_spin_lock(&lut->lut_obd->obd_recovery_task_lock);
491 cfs_list_splice_init(&lut->lut_obd->obd_final_req_queue, &client_list);
492 cfs_spin_unlock(&lut->lut_obd->obd_recovery_task_lock);
495 * go through list of exports participated in recovery and
496 * set new epoch for them
498 cfs_list_for_each_entry(req, &client_list, rq_list) {
499 LASSERT(!req->rq_export->exp_delayed);
500 if (!req->rq_export->exp_vbr_failed)
501 lut_client_epoch_update(&env, req->rq_export);
503 /** return list back at once */
504 cfs_spin_lock(&lut->lut_obd->obd_recovery_task_lock);
505 cfs_list_splice_init(&client_list, &lut->lut_obd->obd_final_req_queue);
506 cfs_spin_unlock(&lut->lut_obd->obd_recovery_task_lock);
507 /** update server epoch */
508 lut_server_data_update(&env, lut, 1);
511 EXPORT_SYMBOL(lut_boot_epoch_update);
514 * commit callback, need to update last_commited value
516 struct lut_last_committed_callback {
517 struct dt_txn_commit_cb llcc_cb;
518 struct lu_target *llcc_lut;
519 struct obd_export *llcc_exp;
523 void lut_cb_last_committed(struct lu_env *env, struct thandle *th,
524 struct dt_txn_commit_cb *cb, int err)
526 struct lut_last_committed_callback *ccb;
528 ccb = container_of0(cb, struct lut_last_committed_callback, llcc_cb);
530 LASSERT(ccb->llcc_lut != NULL);
531 LASSERT(ccb->llcc_exp->exp_obd == ccb->llcc_lut->lut_obd);
533 cfs_spin_lock(&ccb->llcc_lut->lut_translock);
534 if (ccb->llcc_transno > ccb->llcc_lut->lut_obd->obd_last_committed)
535 ccb->llcc_lut->lut_obd->obd_last_committed = ccb->llcc_transno;
537 LASSERT(ccb->llcc_exp);
538 if (ccb->llcc_transno > ccb->llcc_exp->exp_last_committed) {
539 ccb->llcc_exp->exp_last_committed = ccb->llcc_transno;
540 cfs_spin_unlock(&ccb->llcc_lut->lut_translock);
541 ptlrpc_commit_replies(ccb->llcc_exp);
543 cfs_spin_unlock(&ccb->llcc_lut->lut_translock);
545 class_export_cb_put(ccb->llcc_exp);
546 if (ccb->llcc_transno)
547 CDEBUG(D_HA, "%s: transno "LPD64" is committed\n",
548 ccb->llcc_lut->lut_obd->obd_name, ccb->llcc_transno);
552 int lut_last_commit_cb_add(struct thandle *th, struct lu_target *lut,
553 struct obd_export *exp, __u64 transno)
555 struct lut_last_committed_callback *ccb;
556 struct dt_txn_commit_cb *dcb;
564 ccb->llcc_exp = class_export_cb_get(exp);
565 ccb->llcc_transno = transno;
568 dcb->dcb_func = lut_cb_last_committed;
569 CFS_INIT_LIST_HEAD(&dcb->dcb_linkage);
570 strncpy(dcb->dcb_name, "lut_cb_last_committed", MAX_COMMIT_CB_STR_LEN);
571 dcb->dcb_name[MAX_COMMIT_CB_STR_LEN - 1] = '\0';
573 rc = dt_trans_cb_add(th, dcb);
575 class_export_cb_put(exp);
579 if ((exp->exp_connect_flags & OBD_CONNECT_LIGHTWEIGHT) != 0)
580 /* report failure to force synchronous operation */
585 EXPORT_SYMBOL(lut_last_commit_cb_add);
587 struct lut_new_client_callback {
588 struct dt_txn_commit_cb lncc_cb;
589 struct obd_export *lncc_exp;
592 void lut_cb_new_client(struct lu_env *env, struct thandle *th,
593 struct dt_txn_commit_cb *cb, int err)
595 struct lut_new_client_callback *ccb;
597 ccb = container_of0(cb, struct lut_new_client_callback, lncc_cb);
599 LASSERT(ccb->lncc_exp->exp_obd);
601 CDEBUG(D_RPCTRACE, "%s: committing for initial connect of %s\n",
602 ccb->lncc_exp->exp_obd->obd_name,
603 ccb->lncc_exp->exp_client_uuid.uuid);
605 cfs_spin_lock(&ccb->lncc_exp->exp_lock);
606 ccb->lncc_exp->exp_need_sync = 0;
607 cfs_spin_unlock(&ccb->lncc_exp->exp_lock);
608 class_export_cb_put(ccb->lncc_exp);
613 int lut_new_client_cb_add(struct thandle *th, struct obd_export *exp)
615 struct lut_new_client_callback *ccb;
616 struct dt_txn_commit_cb *dcb;
623 ccb->lncc_exp = class_export_cb_get(exp);
626 dcb->dcb_func = lut_cb_new_client;
627 CFS_INIT_LIST_HEAD(&dcb->dcb_linkage);
628 strncpy(dcb->dcb_name, "lut_cb_new_client", MAX_COMMIT_CB_STR_LEN);
629 dcb->dcb_name[MAX_COMMIT_CB_STR_LEN - 1] = '\0';
631 rc = dt_trans_cb_add(th, dcb);
633 class_export_cb_put(exp);
640 * Add new client to the last_rcvd upon new connection.
642 * We use a bitmap to locate a free space in the last_rcvd file and initialize
645 int lut_client_new(const struct lu_env *env, struct obd_export *exp)
647 struct tg_export_data *ted = &exp->exp_target_data;
648 struct lu_target *tg = class_exp2tgt(exp);
653 LASSERT(tg->lut_client_bitmap != NULL);
654 if (!strcmp(ted->ted_lcd->lcd_uuid, tg->lut_obd->obd_uuid.uuid))
657 cfs_mutex_init(&ted->ted_lcd_lock);
659 if ((exp->exp_connect_flags & OBD_CONNECT_LIGHTWEIGHT) != 0)
662 /* the bitmap operations can handle cl_idx > sizeof(long) * 8, so
663 * there's no need for extra complication here
665 idx = cfs_find_first_zero_bit(tg->lut_client_bitmap, LR_MAX_CLIENTS);
667 if (idx >= LR_MAX_CLIENTS ||
668 OBD_FAIL_CHECK(OBD_FAIL_MDS_CLIENT_ADD)) {
669 CERROR("%s: no room for %u clients - fix LR_MAX_CLIENTS\n",
670 tg->lut_obd->obd_name, idx);
673 if (cfs_test_and_set_bit(idx, tg->lut_client_bitmap)) {
674 idx = cfs_find_next_zero_bit(tg->lut_client_bitmap,
675 LR_MAX_CLIENTS, idx);
679 CDEBUG(D_INFO, "%s: client at idx %d with UUID '%s' added\n",
680 tg->lut_obd->obd_name, idx, ted->ted_lcd->lcd_uuid);
682 ted->ted_lr_idx = idx;
683 ted->ted_lr_off = tg->lut_lsd.lsd_client_start +
684 idx * tg->lut_lsd.lsd_client_size;
686 LASSERTF(ted->ted_lr_off > 0, "ted_lr_off = %llu\n", ted->ted_lr_off);
688 CDEBUG(D_INFO, "%s: new client at index %d (%llu) with UUID '%s'\n",
689 tg->lut_obd->obd_name, ted->ted_lr_idx, ted->ted_lr_off,
690 ted->ted_lcd->lcd_uuid);
692 if (OBD_FAIL_CHECK(OBD_FAIL_TGT_CLIENT_ADD))
695 rc = lut_client_data_update(env, exp);
697 CERROR("%s: Failed to write client lcd at idx %d, rc %d\n",
698 tg->lut_obd->obd_name, idx, rc);
702 EXPORT_SYMBOL(lut_client_new);
704 /* Add client data to the MDS. We use a bitmap to locate a free space
705 * in the last_rcvd file if cl_off is -1 (i.e. a new client).
706 * Otherwise, we just have to read the data from the last_rcvd file and
707 * we know its offset.
709 * It should not be possible to fail adding an existing client - otherwise
710 * mdt_init_server_data() callsite needs to be fixed.
712 int lut_client_add(const struct lu_env *env, struct obd_export *exp, int idx)
714 struct tg_export_data *ted = &exp->exp_target_data;
715 struct lu_target *tg = class_exp2tgt(exp);
719 LASSERT(tg->lut_client_bitmap != NULL);
720 LASSERTF(idx >= 0, "%d\n", idx);
722 if (!strcmp(ted->ted_lcd->lcd_uuid, tg->lut_obd->obd_uuid.uuid) ||
723 (exp->exp_connect_flags & OBD_CONNECT_LIGHTWEIGHT) != 0)
726 if (cfs_test_and_set_bit(idx, tg->lut_client_bitmap)) {
727 CERROR("%s: client %d: bit already set in bitmap!!\n",
728 tg->lut_obd->obd_name, idx);
732 CDEBUG(D_INFO, "%s: client at idx %d with UUID '%s' added\n",
733 tg->lut_obd->obd_name, idx, ted->ted_lcd->lcd_uuid);
735 ted->ted_lr_idx = idx;
736 ted->ted_lr_off = tg->lut_lsd.lsd_client_start +
737 idx * tg->lut_lsd.lsd_client_size;
739 cfs_mutex_init(&ted->ted_lcd_lock);
741 LASSERTF(ted->ted_lr_off > 0, "ted_lr_off = %llu\n", ted->ted_lr_off);
745 EXPORT_SYMBOL(lut_client_add);
747 int lut_client_del(const struct lu_env *env, struct obd_export *exp)
749 struct tg_export_data *ted = &exp->exp_target_data;
750 struct lu_target *tg = class_exp2tgt(exp);
755 LASSERT(ted->ted_lcd);
757 /* XXX if lcd_uuid were a real obd_uuid, I could use obd_uuid_equals */
758 if (!strcmp((char *)ted->ted_lcd->lcd_uuid,
759 (char *)tg->lut_obd->obd_uuid.uuid) ||
760 (exp->exp_connect_flags & OBD_CONNECT_LIGHTWEIGHT) != 0)
763 CDEBUG(D_INFO, "%s: del client at idx %u, off %lld, UUID '%s'\n",
764 tg->lut_obd->obd_name, ted->ted_lr_idx, ted->ted_lr_off,
765 ted->ted_lcd->lcd_uuid);
767 /* Clear the bit _after_ zeroing out the client so we don't
768 race with filter_client_add and zero out new clients.*/
769 if (!cfs_test_bit(ted->ted_lr_idx, tg->lut_client_bitmap)) {
770 CERROR("%s: client %u: bit already clear in bitmap!!\n",
771 tg->lut_obd->obd_name, ted->ted_lr_idx);
775 /* Do not erase record for recoverable client. */
776 if (exp->exp_flags & OBD_OPT_FAILOVER)
779 /* Make sure the server's last_transno is up to date.
780 * This should be done before zeroing client slot so last_transno will
781 * be in server data or in client data in case of failure */
782 rc = lut_server_data_update(env, tg, 0);
784 CERROR("%s: failed to update server data, skip client %s "
785 "zeroing, rc %d\n", tg->lut_obd->obd_name,
786 ted->ted_lcd->lcd_uuid, rc);
790 cfs_mutex_lock(&ted->ted_lcd_lock);
791 memset(ted->ted_lcd->lcd_uuid, 0, sizeof ted->ted_lcd->lcd_uuid);
792 rc = lut_client_data_update(env, exp);
793 cfs_mutex_unlock(&ted->ted_lcd_lock);
795 CDEBUG(rc == 0 ? D_INFO : D_ERROR,
796 "%s: zeroing out client %s at idx %u (%llu), rc %d\n",
797 tg->lut_obd->obd_name, ted->ted_lcd->lcd_uuid,
798 ted->ted_lr_idx, ted->ted_lr_off, rc);
801 EXPORT_SYMBOL(lut_client_del);
803 int lut_init(const struct lu_env *env, struct lu_target *lut,
804 struct obd_device *obd, struct dt_device *dt)
806 struct dt_object_format dof;
816 lut->lut_bottom = dt;
817 lut->lut_last_rcvd = NULL;
818 obd->u.obt.obt_lut = lut;
820 cfs_spin_lock_init(&lut->lut_translock);
822 OBD_ALLOC(lut->lut_client_bitmap, LR_MAX_CLIENTS >> 3);
823 if (lut->lut_client_bitmap == NULL)
826 /** obdfilter has no lu_device stack yet */
830 memset(&attr, 0, sizeof(attr));
831 attr.la_valid = LA_MODE;
832 attr.la_mode = S_IFREG | S_IRUGO | S_IWUSR;
833 dof.dof_type = dt_mode_to_dft(S_IFREG);
835 lu_local_obj_fid(&fid, MDT_LAST_RECV_OID);
837 o = dt_find_or_create(env, lut->lut_bottom, &fid, &dof, &attr);
839 lut->lut_last_rcvd = o;
841 OBD_FREE(lut->lut_client_bitmap, LR_MAX_CLIENTS >> 3);
842 lut->lut_client_bitmap = NULL;
844 CERROR("cannot open %s: rc = %d\n", LAST_RCVD, rc);
849 EXPORT_SYMBOL(lut_init);
851 void lut_fini(const struct lu_env *env, struct lu_target *lut)
855 if (lut->lut_client_bitmap) {
856 OBD_FREE(lut->lut_client_bitmap, LR_MAX_CLIENTS >> 3);
857 lut->lut_client_bitmap = NULL;
859 if (lut->lut_last_rcvd) {
860 lu_object_put(env, &lut->lut_last_rcvd->do_lu);
861 lut->lut_last_rcvd = NULL;
865 EXPORT_SYMBOL(lut_fini);
867 /* context key constructor/destructor: tg_key_init, tg_key_fini */
868 LU_KEY_INIT_FINI(tg, struct tg_thread_info);
869 /* context key: tg_thread_key */
870 LU_CONTEXT_KEY_DEFINE(tg, LCT_MD_THREAD|LCT_DT_THREAD);
871 LU_KEY_INIT_GENERIC(tg);
872 EXPORT_SYMBOL(tg_thread_key);
874 int lut_mod_init(void)
876 tg_key_init_generic(&tg_thread_key, NULL);
877 lu_context_key_register_many(&tg_thread_key, NULL);
881 void lut_mod_exit(void)
883 lu_context_key_degister_many(&tg_thread_key, NULL);