1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
6 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
8 * This program is free software; you can redistribute it and/or modify
9 * it under the terms of the GNU General Public License version 2 only,
10 * as published by the Free Software Foundation.
12 * This program is distributed in the hope that it will be useful, but
13 * WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * General Public License version 2 for more details (a copy is included
16 * in the LICENSE file that accompanied this code).
18 * You should have received a copy of the GNU General Public License
19 * version 2 along with this program; If not, see
20 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
22 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23 * CA 95054 USA or visit www.sun.com if you need additional information or
29 * Copyright (c) 2009, 2010, Oracle and/or its affiliates. All rights reserved.
30 * Use is subject to license terms.
32 * Copyright (c) 2011, 2012, Whamcloud, Inc.
35 * This file is part of Lustre, http://www.lustre.org/
36 * Lustre is a trademark of Sun Microsystems, Inc.
38 * Lustre Common Target
39 * These are common functions for MDT and OST recovery-related functionality
41 * Author: Mikhail Pershin <tappro@sun.com>
45 #include <lustre_fsfilt.h>
46 #include <obd_class.h>
49 * Update client data in last_rcvd file. An obd API
/*
 * Write the client record of an export (ted_lcd) into the last_rcvd file
 * through fsfilt_write_record(), using the file handle kept in the
 * obd_device_target (obt_rcvd_filp).
 *
 * exp: export whose exp_target_data supplies the record and its offset.
 */
51 static int obt_client_data_update(struct obd_export *exp)
53 struct tg_export_data *ted = &exp->exp_target_data;
54 struct obd_device_target *obt = &exp->exp_obd->u.obt;
55 struct lu_target *lut = class_exp2tgt(exp);
/* Work on a local copy of the offset; only &off is handed to the writer. */
56 loff_t off = ted->ted_lr_off;
/* NOTE(review): the trailing 0 is presumably the force-sync flag -- confirm. */
59 rc = fsfilt_write_record(exp->exp_obd, obt->obt_rcvd_filp,
60 ted->ted_lcd, sizeof(*ted->ted_lcd), &off, 0);
/* Trace slot index plus client and server epochs (epochs via le32_to_cpu). */
62 CDEBUG(D_INFO, "update client idx %u last_epoch %#x (%#x)\n",
63 ted->ted_lr_idx, le32_to_cpu(ted->ted_lcd->lcd_last_epoch),
64 le32_to_cpu(lut->lut_lsd.lsd_start_epoch));
70 * Update server data in last_rcvd file. An obd API
/*
 * Write the server data block (lut->lut_lsd) into the last_rcvd file
 * through fsfilt_write_record(), using obt_rcvd_filp as the file handle.
 *
 * lut:        target whose lut_lsd is written out.
 * force_sync: NOTE(review): not referenced on the lines shown here;
 *             presumably passed on to fsfilt_write_record() -- confirm.
 */
72 int obt_server_data_update(struct lu_target *lut, int force_sync)
74 struct obd_device_target *obt = &lut->lut_obd->u.obt;
/*
 * Debug message arguments: uuid, mount count and last transno, the two
 * 64-bit fields being converted with le64_to_cpu() for printing.
 */
80 "%s: mount_count is "LPU64", last_transno is "LPU64"\n",
81 lut->lut_lsd.lsd_uuid,
82 le64_to_cpu(lut->lut_lsd.lsd_mount_count),
83 le64_to_cpu(lut->lut_lsd.lsd_last_transno));
/* The whole lut_lsd structure is written in one record. */
85 rc = fsfilt_write_record(lut->lut_obd, obt->obt_rcvd_filp,
86 &lut->lut_lsd, sizeof(lut->lut_lsd),
/* NOTE(review): presumably guarded by a check of rc -- guard not shown. */
89 CERROR("error writing lr_server_data: rc = %d\n", rc);
95 * Update client epoch with server's one
/*
 * Set the client's recorded epoch (lcd_last_epoch) to the server start
 * epoch and write the client record out with obt_client_data_update().
 *
 * exp: export whose client data is updated.
 */
97 void obt_client_epoch_update(struct obd_export *exp)
99 struct lsd_client_data *lcd = exp->exp_target_data.ted_lcd;
100 struct lu_target *lut = class_exp2tgt(exp);
102 /** VBR: set client last_epoch to current epoch */
/*
 * Both epochs are converted with le32_to_cpu() before being compared.
 * NOTE(review): the statement controlled by this test is not shown here;
 * presumably an early return when the client epoch is already current.
 */
103 if (le32_to_cpu(lcd->lcd_last_epoch) >=
104 le32_to_cpu(lut->lut_lsd.lsd_start_epoch))
/* Straight copy: both fields are read through le32_to_cpu() above. */
106 lcd->lcd_last_epoch = lut->lut_lsd.lsd_start_epoch;
/* The return value of the write is not used here. */
107 obt_client_data_update(exp);
111 * Increment server epoch. An obd API
/*
 * Advance the server epoch by one, propagate the new epoch to the exports
 * found on obd_final_req_queue, and write the server data out with
 * obt_server_data_update().
 *
 * lut: target whose epoch and last transno are updated.
 */
113 static void obt_boot_epoch_update(struct lu_target *lut)
115 struct obd_device *obd = lut->lut_obd;
117 struct ptlrpc_request *req;
118 cfs_list_t client_list;
/*
 * Under lut_translock: the new epoch is lr_epoch() of the current last
 * transno plus one; lut_last_transno is then rebuilt from the shifted
 * epoch. Values are converted with le64_to_cpu/cpu_to_le64/cpu_to_le32.
 */
120 cfs_spin_lock(&lut->lut_translock);
121 start_epoch = lr_epoch(le64_to_cpu(lut->lut_last_transno)) + 1;
122 lut->lut_last_transno = cpu_to_le64((__u64)start_epoch <<
124 lut->lut_lsd.lsd_start_epoch = cpu_to_le32(start_epoch);
125 cfs_spin_unlock(&lut->lut_translock);
/*
 * Move the final request queue onto a private list while holding
 * obd_recovery_task_lock, so the walk below runs without that lock.
 */
127 CFS_INIT_LIST_HEAD(&client_list);
128 cfs_spin_lock(&obd->obd_recovery_task_lock);
129 cfs_list_splice_init(&obd->obd_final_req_queue, &client_list);
130 cfs_spin_unlock(&obd->obd_recovery_task_lock);
133 * go through list of exports participated in recovery and
134 * set new epoch for them
136 cfs_list_for_each_entry(req, &client_list, rq_list) {
/* A request on this list must not belong to a delayed export. */
137 LASSERT(!req->rq_export->exp_delayed);
138 obt_client_epoch_update(req->rq_export);
140 /** return list back at once */
141 cfs_spin_lock(&obd->obd_recovery_task_lock);
142 cfs_list_splice_init(&client_list, &obd->obd_final_req_queue);
143 cfs_spin_unlock(&obd->obd_recovery_task_lock);
/* Second argument 1 is the force_sync parameter of the callee. */
144 obt_server_data_update(lut, 1);
148 * write data in last_rcvd file.
/*
 * Write a buffer into the last_rcvd object (lut->lut_last_rcvd) inside its
 * own transaction on lut->lut_bottom.
 *
 * env:  execution environment handed to every dt_* call.
 * lut:  target providing the device and the last_rcvd object.
 * buf:  data to write; buf->lb_len is the declared length.
 * off:  file offset; passed by pointer to dt_record_write().
 * sync: NOTE(review): not referenced on the lines shown here -- confirm
 *       how it is applied to the transaction.
 */
150 static int lut_last_rcvd_write(const struct lu_env *env, struct lu_target *lut,
151 const struct lu_buf *buf, loff_t *off, int sync)
/*
 * Sequence: create transaction, declare the write, start, write, stop.
 * NOTE(review): error checks between the steps are not shown here.
 */
157 th = dt_trans_create(env, lut->lut_bottom);
161 rc = dt_declare_record_write(env, lut->lut_last_rcvd,
162 buf->lb_len, *off, th);
166 rc = dt_trans_start(env, lut->lut_bottom, th);
170 rc = dt_record_write(env, lut->lut_last_rcvd, buf, off, th);
173 dt_trans_stop(env, lut->lut_bottom, th);
/* Logs rc together with the uuid and last transno held in lut_lsd. */
175 CDEBUG(D_INFO, "write last_rcvd header rc = %d:\n"
176 "uuid = %s\nlast_transno = "LPU64"\n",
177 rc, lut->lut_lsd.lsd_uuid, lut->lut_lsd.lsd_last_transno);
183 * Allocate in-memory data for client slot related to export.
/*
 * Allocate the in-memory client data (ted_lcd) of an export and mark its
 * last_rcvd slot index as not yet assigned.
 *
 * exp: export to set up; must not be the obd self export.
 */
185 int lut_client_alloc(struct obd_export *exp)
187 LASSERT(exp != exp->exp_obd->obd_self_export);
189 OBD_ALLOC_PTR(exp->exp_target_data.ted_lcd);
/* NOTE(review): allocation-failure path; its return is not shown here. */
190 if (exp->exp_target_data.ted_lcd == NULL)
192 /* Mark that slot is not yet valid, 0 doesn't work here */
193 exp->exp_target_data.ted_lr_idx = -1;
196 EXPORT_SYMBOL(lut_client_alloc);
199 * Free in-memory data for client slot related to export.
/*
 * Free the in-memory client data (ted_lcd) of an export and clear the bit
 * of its slot index in the target client bitmap.
 *
 * exp: export being torn down; must not be the obd self export.
 */
201 void lut_client_free(struct obd_export *exp)
203 struct tg_export_data *ted = &exp->exp_target_data;
204 struct lu_target *lut = class_exp2tgt(exp);
206 LASSERT(exp != exp->exp_obd->obd_self_export);
208 OBD_FREE_PTR(ted->ted_lcd);
211 /* Slot may be not yet assigned */
/* A negative index is the "no slot" marker; lut_client_alloc() stores -1. */
212 if (ted->ted_lr_idx < 0)
214 /* Clear bit when lcd is freed */
/* The bitmap is modified under lut_client_bitmap_lock. */
215 cfs_spin_lock(&lut->lut_client_bitmap_lock);
/* A bit found already clear is reported as an error. */
216 if (!cfs_test_and_clear_bit(ted->ted_lr_idx, lut->lut_client_bitmap)) {
217 CERROR("%s: client %u bit already clear in bitmap\n",
218 exp->exp_obd->obd_name, ted->ted_lr_idx);
221 cfs_spin_unlock(&lut->lut_client_bitmap_lock);
223 EXPORT_SYMBOL(lut_client_free);
226 * Update client data in last_rcvd
/*
 * Write the client data of an export into last_rcvd at the export's slot
 * offset (ted_lr_off), using lut_last_rcvd_write().
 *
 * env: execution environment passed to lut_last_rcvd_write().
 * exp: export whose ted_lcd is written.
 */
228 int lut_client_data_update(const struct lu_env *env, struct obd_export *exp)
230 struct tg_export_data *ted = &exp->exp_target_data;
231 struct lu_target *lut = class_exp2tgt(exp);
/* Temporary copy filled by lcd_cpu_to_le(); ted_lcd itself is not written. */
232 struct lsd_client_data tmp_lcd;
/* Local copy of the offset, so ted_lr_off is not modified by the write. */
233 loff_t tmp_off = ted->ted_lr_off;
/* NOTE(review): presumably the buffer points at tmp_lcd -- field not shown. */
234 struct lu_buf tmp_buf = {
236 .lb_len = sizeof(tmp_lcd)
240 lcd_cpu_to_le(ted->ted_lcd, &tmp_lcd);
/* The last_rcvd object must already be set up when this is called. */
241 LASSERT(lut->lut_last_rcvd);
/* Last argument 0 is the sync parameter of lut_last_rcvd_write(). */
242 rc = lut_last_rcvd_write(env, lut, &tmp_buf, &tmp_off, 0);
248 * Update server data in last_rcvd
/*
 * Refresh lsd_last_transno from lut_last_transno and write the server data
 * into last_rcvd using lut_last_rcvd_write().
 *
 * env:  execution environment passed to lut_last_rcvd_write().
 * lut:  target whose lut_lsd is written.
 * sync: forwarded unchanged as the sync argument of lut_last_rcvd_write().
 */
250 int lut_server_data_update(const struct lu_env *env,
251 struct lu_target *lut, int sync)
/* Temporary copy filled by lsd_cpu_to_le() below. */
253 struct lr_server_data tmp_lsd;
/* NOTE(review): presumably the buffer points at tmp_lsd -- field not shown. */
255 struct lu_buf tmp_buf = {
257 .lb_len = sizeof(tmp_lsd)
263 "%s: mount_count is "LPU64", last_transno is "LPU64"\n",
264 lut->lut_lsd.lsd_uuid, lut->lut_obd->u.obt.obt_mount_count,
265 lut->lut_last_transno);
/* lut_last_transno is copied into the server data under lut_translock. */
267 cfs_spin_lock(&lut->lut_translock);
268 lut->lut_lsd.lsd_last_transno = lut->lut_last_transno;
269 cfs_spin_unlock(&lut->lut_translock);
271 lsd_cpu_to_le(&lut->lut_lsd, &tmp_lsd);
/* The write is skipped when no last_rcvd object is attached. */
272 if (lut->lut_last_rcvd != NULL)
273 rc = lut_last_rcvd_write(env, lut, &tmp_buf, &tmp_off, sync);
/*
 * Set the client's recorded epoch (lcd_last_epoch) to the server start
 * epoch and write the client data out with lut_client_data_update().
 *
 * env: execution environment passed to lut_client_data_update().
 * exp: export whose client data is updated.
 */
277 void lut_client_epoch_update(const struct lu_env *env, struct obd_export *exp)
279 struct lsd_client_data *lcd = exp->exp_target_data.ted_lcd;
280 struct lu_target *lut = class_exp2tgt(exp);
/* This path requires a bottom dt device on the target. */
282 LASSERT(lut->lut_bottom);
283 /** VBR: set client last_epoch to current epoch */
/*
 * Epochs are compared directly, with no endian conversion.
 * NOTE(review): the statement controlled by this test is not shown here;
 * presumably an early return when the client epoch is already current.
 */
284 if (lcd->lcd_last_epoch >= lut->lut_lsd.lsd_start_epoch)
286 lcd->lcd_last_epoch = lut->lut_lsd.lsd_start_epoch;
/* The return value of the write is not used here. */
287 lut_client_data_update(env, exp);
291 * Update boot epoch when recovery ends
/*
 * Advance the server epoch by one, propagate the new epoch to the exports
 * found on obd_final_req_queue (skipping those with exp_vbr_failed set),
 * and write the server data out with lut_server_data_update().
 *
 * lut: target whose epoch and last transno are updated.
 */
293 void lut_boot_epoch_update(struct lu_target *lut)
296 struct ptlrpc_request *req;
298 cfs_list_t client_list;
/* NOTE(review): presumably returns early while the obd is stopping. */
301 if (lut->lut_obd->obd_stopping)
303 /** Increase server epoch after recovery */
/* Targets without a bottom dt device are handled by obt_boot_epoch_update(). */
304 if (lut->lut_bottom == NULL)
305 return obt_boot_epoch_update(lut);
307 rc = lu_env_init(&env, LCT_DT_THREAD);
/* NOTE(review): presumably guarded by a check of rc -- guard not shown. */
309 CERROR("Can't initialize environment rc=%d\n", rc);
/*
 * Under lut_translock: the new epoch is lr_epoch() of the current last
 * transno plus one; lut_last_transno becomes that epoch shifted left by
 * LR_EPOCH_BITS. No endian conversion is applied on this path.
 */
313 cfs_spin_lock(&lut->lut_translock);
314 start_epoch = lr_epoch(lut->lut_last_transno) + 1;
315 lut->lut_last_transno = (__u64)start_epoch << LR_EPOCH_BITS;
316 lut->lut_lsd.lsd_start_epoch = start_epoch;
317 cfs_spin_unlock(&lut->lut_translock);
319 CFS_INIT_LIST_HEAD(&client_list);
321 * The recovery is not yet finished and final queue can still be updated
322 * with resend requests. Move final list to separate one for processing
324 cfs_spin_lock(&lut->lut_obd->obd_recovery_task_lock);
325 cfs_list_splice_init(&lut->lut_obd->obd_final_req_queue, &client_list);
326 cfs_spin_unlock(&lut->lut_obd->obd_recovery_task_lock);
329 * go through list of exports participated in recovery and
330 * set new epoch for them
332 cfs_list_for_each_entry(req, &client_list, rq_list) {
/* A request on this list must not belong to a delayed export. */
333 LASSERT(!req->rq_export->exp_delayed);
/* Exports flagged exp_vbr_failed keep their old epoch. */
334 if (!req->rq_export->exp_vbr_failed)
335 lut_client_epoch_update(&env, req->rq_export);
337 /** return list back at once */
338 cfs_spin_lock(&lut->lut_obd->obd_recovery_task_lock);
339 cfs_list_splice_init(&client_list, &lut->lut_obd->obd_final_req_queue);
340 cfs_spin_unlock(&lut->lut_obd->obd_recovery_task_lock);
341 /** update server epoch */
/* Third argument 1 is the sync parameter of lut_server_data_update(). */
342 lut_server_data_update(&env, lut, 1);
345 EXPORT_SYMBOL(lut_boot_epoch_update);
348 * Commit callback: update the last_committed values of the obd and the export
/*
 * Context of the last-committed commit callback: the embedded dt commit
 * callback descriptor plus the target and export it refers to.
 * NOTE(review): users of this struct also access llcc_transno, whose
 * declaration is not among the lines shown here.
 */
350 struct lut_last_committed_callback {
/* Embedded descriptor; container_of0() recovers this struct from it. */
351 struct dt_txn_commit_cb llcc_cb;
352 struct lu_target *llcc_lut;
/* Obtained with class_export_cb_get(), released with class_export_cb_put(). */
353 struct obd_export *llcc_exp;
/*
 * Transaction commit callback: raise obd_last_committed and the export's
 * exp_last_committed to the committed transno when it is newer, then drop
 * the export callback reference and unlink the callback.
 *
 * env: not referenced on the lines shown here.
 * th:  not referenced on the lines shown here.
 * cb:  descriptor embedded in a struct lut_last_committed_callback.
 * err: not referenced on the lines shown here.
 */
357 void lut_cb_last_committed(struct lu_env *env, struct thandle *th,
358 struct dt_txn_commit_cb *cb, int err)
360 struct lut_last_committed_callback *ccb;
362 ccb = container_of0(cb, struct lut_last_committed_callback, llcc_cb);
/* Export and target must belong to the same obd device. */
364 LASSERT(ccb->llcc_exp->exp_obd == ccb->llcc_lut->lut_obd);
/* Both last_committed values are compared and updated under lut_translock. */
366 cfs_spin_lock(&ccb->llcc_lut->lut_translock);
367 if (ccb->llcc_transno > ccb->llcc_lut->lut_obd->obd_last_committed)
368 ccb->llcc_lut->lut_obd->obd_last_committed = ccb->llcc_transno;
/*
 * NOTE(review): llcc_exp was already dereferenced by the first LASSERT
 * above, so this check cannot catch a NULL export at this point.
 */
370 LASSERT(ccb->llcc_exp);
371 if (ccb->llcc_transno > ccb->llcc_exp->exp_last_committed) {
372 ccb->llcc_exp->exp_last_committed = ccb->llcc_transno;
/* The lock is dropped before ptlrpc_commit_replies() is called. */
373 cfs_spin_unlock(&ccb->llcc_lut->lut_translock);
374 ptlrpc_commit_replies(ccb->llcc_exp);
/* NOTE(review): presumably the else branch -- unlock without replies. */
376 cfs_spin_unlock(&ccb->llcc_lut->lut_translock);
/* Pairs with class_export_cb_get() in lut_last_commit_cb_add(). */
378 class_export_cb_put(ccb->llcc_exp);
/* Only a non-zero transno is logged. */
379 if (ccb->llcc_transno)
380 CDEBUG(D_HA, "%s: transno "LPD64" is committed\n",
381 ccb->llcc_lut->lut_obd->obd_name, ccb->llcc_transno);
382 cfs_list_del(&ccb->llcc_cb.dcb_linkage);
/*
 * Register lut_cb_last_committed() as a commit callback on a transaction.
 *
 * th:      transaction handle the callback is added to.
 * lut:     target; NOTE(review): its assignment into the callback context
 *          is not among the lines shown here.
 * exp:     export; a callback reference is taken on it.
 * transno: transaction number stored for the callback.
 */
386 int lut_last_commit_cb_add(struct thandle *th, struct lu_target *lut,
387 struct obd_export *exp, __u64 transno)
/* NOTE(review): allocation of ccb is not among the lines shown here. */
389 struct lut_last_committed_callback *ccb;
396 ccb->llcc_cb.dcb_func = lut_cb_last_committed;
397 CFS_INIT_LIST_HEAD(&ccb->llcc_cb.dcb_linkage);
/* Reference is released by the callback via class_export_cb_put(). */
399 ccb->llcc_exp = class_export_cb_get(exp);
400 ccb->llcc_transno = transno;
402 rc = dt_trans_cb_add(th, &ccb->llcc_cb);
/* NOTE(review): presumably the failure path of dt_trans_cb_add(). */
404 class_export_cb_put(exp);
409 EXPORT_SYMBOL(lut_last_commit_cb_add);
/*
 * Context of the new-client commit callback: the embedded dt commit
 * callback descriptor plus the export it refers to.
 */
411 struct lut_new_client_callback {
/* Embedded descriptor; container_of0() recovers this struct from it. */
412 struct dt_txn_commit_cb lncc_cb;
/* Obtained with class_export_cb_get(), released with class_export_cb_put(). */
413 struct obd_export *lncc_exp;
/*
 * Transaction commit callback for a newly connected client: clear the
 * export's exp_need_sync flag, drop the export callback reference and
 * unlink the callback.
 *
 * env: not referenced on the lines shown here.
 * th:  not referenced on the lines shown here.
 * cb:  descriptor embedded in a struct lut_new_client_callback.
 * err: not referenced on the lines shown here.
 */
416 void lut_cb_new_client(struct lu_env *env, struct thandle *th,
417 struct dt_txn_commit_cb *cb, int err)
419 struct lut_new_client_callback *ccb;
421 ccb = container_of0(cb, struct lut_new_client_callback, lncc_cb);
423 LASSERT(ccb->lncc_exp->exp_obd);
425 CDEBUG(D_RPCTRACE, "%s: committing for initial connect of %s\n",
426 ccb->lncc_exp->exp_obd->obd_name,
427 ccb->lncc_exp->exp_client_uuid.uuid);
/* exp_need_sync is cleared under exp_lock. */
429 cfs_spin_lock(&ccb->lncc_exp->exp_lock);
430 ccb->lncc_exp->exp_need_sync = 0;
431 cfs_spin_unlock(&ccb->lncc_exp->exp_lock);
/* Pairs with class_export_cb_get() in lut_new_client_cb_add(). */
432 class_export_cb_put(ccb->lncc_exp);
434 cfs_list_del(&ccb->lncc_cb.dcb_linkage);
/*
 * Register lut_cb_new_client() as a commit callback on a transaction.
 *
 * th:  transaction handle the callback is added to.
 * exp: export; a callback reference is taken on it.
 */
438 int lut_new_client_cb_add(struct thandle *th, struct obd_export *exp)
/* NOTE(review): allocation of ccb is not among the lines shown here. */
440 struct lut_new_client_callback *ccb;
447 ccb->lncc_cb.dcb_func = lut_cb_new_client;
448 CFS_INIT_LIST_HEAD(&ccb->lncc_cb.dcb_linkage);
/* Reference is released by the callback via class_export_cb_put(). */
449 ccb->lncc_exp = class_export_cb_get(exp);
451 rc = dt_trans_cb_add(th, &ccb->lncc_cb);
/* NOTE(review): presumably the failure path of dt_trans_cb_add(). */
453 class_export_cb_put(exp);
458 EXPORT_SYMBOL(lut_new_client_cb_add);
/*
 * Initialise a lu_target: record the bottom device, link the target into
 * the obd device, set up its locks and client bitmap, and open the
 * last_rcvd object with dt_store_open().
 *
 * env: execution environment passed to dt_store_open().
 * lut: target to initialise.
 * obd: obd device; its u.obt.obt_lut is pointed at lut.
 * dt:  bottom dt device, stored in lut->lut_bottom.
 */
460 int lut_init(const struct lu_env *env, struct lu_target *lut,
461 struct obd_device *obd, struct dt_device *dt)
471 lut->lut_bottom = dt;
472 lut->lut_last_rcvd = NULL;
473 obd->u.obt.obt_lut = lut;
475 cfs_spin_lock_init(&lut->lut_translock);
476 cfs_spin_lock_init(&lut->lut_client_bitmap_lock);
/* Size is LR_MAX_CLIENTS >> 3; presumably bytes, one bit per slot -- confirm. */
478 OBD_ALLOC(lut->lut_client_bitmap, LR_MAX_CLIENTS >> 3);
/* NOTE(review): allocation-failure path; its return is not shown here. */
479 if (lut->lut_client_bitmap == NULL)
482 /** obdfilter has no lu_device stack yet */
485 o = dt_store_open(env, lut->lut_bottom, "", LAST_RCVD, &fid);
/* NOTE(review): presumably the success branch of the open -- confirm. */
487 lut->lut_last_rcvd = o;
/* Failure cleanup: the bitmap is freed with the same size it was allocated. */
489 OBD_FREE(lut->lut_client_bitmap, LR_MAX_CLIENTS >> 3);
490 lut->lut_client_bitmap = NULL;
492 CERROR("cannot open %s: rc = %d\n", LAST_RCVD, rc);
497 EXPORT_SYMBOL(lut_init);
/*
 * Release what lut_init() set up: free the client bitmap and drop the
 * reference on the last_rcvd object. Both pointers are reset to NULL.
 *
 * env: execution environment passed to lu_object_put().
 * lut: target being torn down.
 */
499 void lut_fini(const struct lu_env *env, struct lu_target *lut)
/* Freed with the same size used by OBD_ALLOC in lut_init(). */
503 if (lut->lut_client_bitmap) {
504 OBD_FREE(lut->lut_client_bitmap, LR_MAX_CLIENTS >> 3);
505 lut->lut_client_bitmap = NULL;
/* lut_last_rcvd is NULL when lut_init() did not attach an object. */
507 if (lut->lut_last_rcvd) {
508 lu_object_put(env, &lut->lut_last_rcvd->do_lu);
509 lut->lut_last_rcvd = NULL;
513 EXPORT_SYMBOL(lut_fini);