1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
6 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
8 * This program is free software; you can redistribute it and/or modify
9 * it under the terms of the GNU General Public License version 2 only,
10 * as published by the Free Software Foundation.
12 * This program is distributed in the hope that it will be useful, but
13 * WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * General Public License version 2 for more details (a copy is included
16 * in the LICENSE file that accompanied this code).
18 * You should have received a copy of the GNU General Public License
19 * version 2 along with this program; If not, see
20 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
22 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23 * CA 95054 USA or visit www.sun.com if you need additional information or
29 * Copyright (c) 2009, 2010, Oracle and/or its affiliates. All rights reserved.
30 * Use is subject to license terms.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
36 * Lustre Common Target
37 * These are common functions for MDT and OST recovery-related functionality
39 * Author: Mikhail Pershin <tappro@sun.com>
43 #include <lustre_fsfilt.h>
46 * Update client data in last_rcvd file. An obd API
/*
 * Write the in-memory client record of an export (ted_lcd) to the last_rcvd
 * file at the slot offset of that export (ted_lr_off), then log the slot
 * index together with the client and server epochs.
 *
 * exp: export whose client record is written.
 *
 * NOTE(review): the declaration of rc and the return statement are not among
 * the lines shown; presumably the result of fsfilt_write_record() is
 * returned -- confirm.
 */
48 static int obt_client_data_update(struct obd_export *exp)
50 struct tg_export_data *ted = &exp->exp_target_data;
51 struct obd_device_target *obt = &exp->exp_obd->u.obt;
52 struct lu_target *lut = class_exp2tgt(exp);
/* Local copy of the slot offset; its address is what the writer receives. */
53 loff_t off = ted->ted_lr_off;
/*
 * The record is passed exactly as it is held in memory, no conversion here.
 * NOTE(review): the trailing 0 is presumably a "do not force sync" flag --
 * confirm against fsfilt_write_record().
 */
56 rc = fsfilt_write_record(exp->exp_obd, obt->obt_rcvd_filp,
57 ted->ted_lcd, sizeof(*ted->ted_lcd), &off, 0);
/*
 * Both epochs go through le32_to_cpu() for printing, which suggests they are
 * kept little-endian in memory on this code path -- TODO confirm.
 */
59 CDEBUG(D_INFO, "update client idx %u last_epoch %#x (%#x)\n",
60 ted->ted_lr_idx, le32_to_cpu(ted->ted_lcd->lcd_last_epoch),
61 le32_to_cpu(lut->lut_lsd.lsd_start_epoch));
67 * Update server data in last_rcvd file. An obd API
/*
 * Write the server data block (lut->lut_lsd) to the last_rcvd file through
 * fsfilt_write_record().
 *
 * lut:        target whose lut_lsd is written.
 * force_sync: NOTE(review): its use is on a line not shown (the tail of the
 *             fsfilt_write_record() argument list); presumably it is passed
 *             through as the sync flag -- confirm.
 *
 * The format string and le64_to_cpu() arguments below belong to a message
 * call whose opening line is not shown. The CERROR line logs rc; the
 * condition guarding it is not shown either, presumably a non-zero rc.
 */
69 int obt_server_data_update(struct lu_target *lut, int force_sync)
71 struct obd_device_target *obt = &lut->lut_obd->u.obt;
77 "%s: mount_count is "LPU64", last_transno is "LPU64"\n",
78 lut->lut_lsd.lsd_uuid,
79 le64_to_cpu(lut->lut_lsd.lsd_mount_count),
80 le64_to_cpu(lut->lut_lsd.lsd_last_transno));
/* The whole lut_lsd structure is written as held in memory. */
82 rc = fsfilt_write_record(lut->lut_obd, obt->obt_rcvd_filp,
83 &lut->lut_lsd, sizeof(lut->lut_lsd),
86 CERROR("error writing lr_server_data: rc = %d\n", rc);
92 * Update the client epoch to match the server epoch
/*
 * Set the last_epoch of the client record of an export to the server start
 * epoch and write the record out with obt_client_data_update().
 *
 * exp: export whose client record is updated.
 */
94 void obt_client_epoch_update(struct obd_export *exp)
96 struct lsd_client_data *lcd = exp->exp_target_data.ted_lcd;
97 struct lu_target *lut = class_exp2tgt(exp);
99 /** VBR: set client last_epoch to current epoch */
/*
 * Both epochs are converted with le32_to_cpu() before being compared.
 * NOTE(review): the statement controlled by this test is not shown;
 * presumably it returns early when the client epoch is already at or past
 * the server epoch -- confirm.
 */
100 if (le32_to_cpu(lcd->lcd_last_epoch) >=
101 le32_to_cpu(lut->lut_lsd.lsd_start_epoch))
/* Plain copy without byte swapping: both fields share one representation. */
103 lcd->lcd_last_epoch = lut->lut_lsd.lsd_start_epoch;
/* The int result of obt_client_data_update() is discarded here. */
104 obt_client_data_update(exp);
108 * Increment server epoch. An obd API
/*
 * Start a new server epoch for a target and propagate it:
 *   1. under lut_translock, compute the epoch of lut_last_transno plus one,
 *      reset lut_last_transno from it and store it in lsd_start_epoch;
 *   2. call obt_client_epoch_update() for the export of every request on
 *      obd_final_req_queue;
 *   3. write the server data with obt_server_data_update(lut, 1).
 *
 * lut: target to update.
 */
110 static void obt_boot_epoch_update(struct lu_target *lut)
112 struct obd_device *obd = lut->lut_obd;
114 struct ptlrpc_request *req;
115 cfs_list_t client_list;
117 cfs_spin_lock(&lut->lut_translock);
/* lut_last_transno is read through le64_to_cpu() and stored back through
 * cpu_to_le64(), so on this path it is handled as a little-endian value. */
118 start_epoch = lr_epoch(le64_to_cpu(lut->lut_last_transno)) + 1;
/* NOTE(review): the shift count is on a line not shown; the equivalent
 * statement in lut_boot_epoch_update() shifts by LR_EPOCH_BITS. */
119 lut->lut_last_transno = cpu_to_le64((__u64)start_epoch <<
121 lut->lut_lsd.lsd_start_epoch = cpu_to_le32(start_epoch);
122 cfs_spin_unlock(&lut->lut_translock);
/*
 * Move the final request queue onto a private list while holding
 * obd_processing_task_lock, so the walk below runs without that lock held.
 */
124 CFS_INIT_LIST_HEAD(&client_list);
125 cfs_spin_lock_bh(&obd->obd_processing_task_lock);
126 cfs_list_splice_init(&obd->obd_final_req_queue, &client_list);
127 cfs_spin_unlock_bh(&obd->obd_processing_task_lock);
130 * go through list of exports participated in recovery and
131 * set new epoch for them
133 cfs_list_for_each_entry(req, &client_list, rq_list) {
/* An export marked exp_delayed is not expected on this list. */
134 LASSERT(!req->rq_export->exp_delayed);
135 obt_client_epoch_update(req->rq_export);
137 /** return list back at once */
138 cfs_spin_lock_bh(&obd->obd_processing_task_lock);
139 cfs_list_splice_init(&client_list, &obd->obd_final_req_queue);
140 cfs_spin_unlock_bh(&obd->obd_processing_task_lock);
/* Second argument is force_sync = 1; the int result is discarded. */
141 obt_server_data_update(lut, 1);
145 * write data in last_rcvd file.
/*
 * Write a buffer into the last_rcvd object (lut->lut_last_rcvd) inside a
 * transaction of its own on the bottom device (lut->lut_bottom).
 *
 * env:  execution environment handed to the dt calls.
 * lut:  target owning the last_rcvd object.
 * buf:  data to write.
 * off:  file offset, passed by pointer to dt_record_write().
 * sync: NOTE(review): not referenced in any of the lines shown -- confirm
 *       whether it is used at all.
 *
 * NOTE(review): the handling of a failed dt_trans_start() and the return
 * statement are on lines not shown; presumably rc from dt_record_write() is
 * returned.
 */
147 static int lut_last_rcvd_write(const struct lu_env *env, struct lu_target *lut,
148 const struct lu_buf *buf, loff_t *off, int sync)
/* Ask the bottom device for a credit count (the operation code argument is
 * on a line not shown) and size the transaction parameters with it. */
155 credits = lut->lut_bottom->dd_ops->dt_credit_get(env, lut->lut_bottom,
157 txn_param_init(&p, credits);
159 th = dt_trans_start(env, lut->lut_bottom, &p);
/* The transaction is stopped regardless of the write result. */
163 rc = dt_record_write(env, lut->lut_last_rcvd, buf, off, th);
164 dt_trans_stop(env, lut->lut_bottom, th);
/*
 * NOTE(review): the message says "header" and prints server data fields
 * whatever buf holds; lut_client_data_update() also writes client records
 * through this function.
 */
166 CDEBUG(D_INFO, "write last_rcvd header rc = %d:\n"
167 "uuid = %s\nlast_transno = "LPU64"\n",
168 rc, lut->lut_lsd.lsd_uuid, lut->lut_lsd.lsd_last_transno);
174 * Allocate in-memory data for client slot related to export.
/*
 * Allocate the in-memory client record (ted_lcd) of an export and mark the
 * last_rcvd slot index of the export as not yet assigned.
 *
 * exp: export to prepare.
 *
 * NOTE(review): both return statements are on lines not shown; presumably a
 * negative errno is returned when the allocation fails and 0 otherwise --
 * confirm.
 */
176 int lut_client_alloc(struct obd_export *exp)
178 OBD_ALLOC_PTR(exp->exp_target_data.ted_lcd);
179 if (exp->exp_target_data.ted_lcd == NULL)
181 /* Mark that slot is not yet valid, 0 doesn't work here */
/* lut_client_free() treats a negative index as "slot not assigned". */
182 exp->exp_target_data.ted_lr_idx = -1;
185 EXPORT_SYMBOL(lut_client_alloc);
188 * Free in-memory data for client slot related to export.
/*
 * Free the in-memory client record (ted_lcd) of an export and, when a slot
 * index had been assigned, clear the bit of that slot in the client bitmap
 * of the target.
 *
 * exp: export being torn down.
 */
190 void lut_client_free(struct obd_export *exp)
192 struct tg_export_data *ted = &exp->exp_target_data;
193 struct lu_target *lut = class_exp2tgt(exp);
195 OBD_FREE_PTR(ted->ted_lcd);
198 /* Slot may be not yet assigned */
/* NOTE(review): the statement controlled by this test is not shown;
 * presumably an early return -- confirm. */
199 if (ted->ted_lr_idx < 0)
201 /* Clear bit when lcd is freed */
/* The bitmap is only touched with lut_client_bitmap_lock held. */
202 cfs_spin_lock(&lut->lut_client_bitmap_lock);
/*
 * A bit that is already clear is reported with CERROR.
 * NOTE(review): ted_lr_idx is tested as a signed value above but printed
 * with %u here -- check the declared type.
 */
203 if (!cfs_test_and_clear_bit(ted->ted_lr_idx, lut->lut_client_bitmap)) {
204 CERROR("%s: client %u bit already clear in bitmap\n",
205 exp->exp_obd->obd_name, ted->ted_lr_idx);
208 cfs_spin_unlock(&lut->lut_client_bitmap_lock);
210 EXPORT_SYMBOL(lut_client_free);
213 * Update client data in last_rcvd
/*
 * Write the client record of an export to the last_rcvd object at the slot
 * offset of that export (ted_lr_off).
 *
 * env: execution environment handed to lut_last_rcvd_write().
 * exp: export whose client record is written.
 *
 * The in-memory record is first converted with lcd_cpu_to_le() into a copy
 * on the stack, and it is the copy that gets written.
 *
 * NOTE(review): the return statement is not shown; presumably rc from
 * lut_last_rcvd_write() is returned.
 */
215 int lut_client_data_update(const struct lu_env *env, struct obd_export *exp)
217 struct tg_export_data *ted = &exp->exp_target_data;
218 struct lu_target *lut = class_exp2tgt(exp);
219 struct lsd_client_data tmp_lcd;
/* Local copy of the slot offset; its address is what the writer receives. */
220 loff_t tmp_off = ted->ted_lr_off;
/* NOTE(review): the .lb_buf initializer is on a line not shown; presumably
 * it points at tmp_lcd, matching the length given below. */
221 struct lu_buf tmp_buf = {
223 .lb_len = sizeof(tmp_lcd)
227 lcd_cpu_to_le(ted->ted_lcd, &tmp_lcd);
/* The last_rcvd object must be present on this path. */
228 LASSERT(lut->lut_last_rcvd);
/* Last argument: sync = 0. */
229 rc = lut_last_rcvd_write(env, lut, &tmp_buf, &tmp_off, 0);
235 * Update server data in last_rcvd
/*
 * Refresh lsd_last_transno from lut_last_transno and write the server data
 * block to the last_rcvd object.
 *
 * env:  execution environment handed to lut_last_rcvd_write().
 * lut:  target whose server data is written.
 * sync: forwarded to lut_last_rcvd_write().
 *
 * The block is converted with lsd_cpu_to_le() into a copy on the stack and
 * the copy is what gets written. Nothing is written when lut_last_rcvd is
 * NULL.
 *
 * NOTE(review): the declarations of rc and tmp_off and the return statement
 * are on lines not shown, so the value returned when the write is skipped
 * cannot be confirmed here.
 */
237 static int lut_server_data_update(const struct lu_env *env,
238 struct lu_target *lut, int sync)
240 struct lr_server_data tmp_lsd;
/* NOTE(review): the .lb_buf initializer is on a line not shown; presumably
 * it points at tmp_lsd, matching the length given below. */
242 struct lu_buf tmp_buf = {
244 .lb_len = sizeof(tmp_lsd)
/*
 * Arguments of a message call whose opening line is not shown. They read
 * lut_mount_count and lut_last_transno before lut_translock is taken.
 */
250 "%s: mount_count is "LPU64", last_transno is "LPU64"\n",
251 lut->lut_lsd.lsd_uuid, lut->lut_mount_count,
252 lut->lut_last_transno);
/* Copy the current last transno into the server data under the lock. */
254 cfs_spin_lock(&lut->lut_translock);
255 lut->lut_lsd.lsd_last_transno = lut->lut_last_transno;
256 cfs_spin_unlock(&lut->lut_translock);
258 lsd_cpu_to_le(&lut->lut_lsd, &tmp_lsd);
259 if (lut->lut_last_rcvd != NULL)
260 rc = lut_last_rcvd_write(env, lut, &tmp_buf, &tmp_off, sync);
/*
 * Set the last_epoch of the client record of an export to the server start
 * epoch and write the record out with lut_client_data_update().
 *
 * env: execution environment handed to lut_client_data_update().
 * exp: export whose client record is updated.
 *
 * Unlike obt_client_epoch_update(), the epochs are compared here without
 * le32_to_cpu(); lut_client_data_update() converts the record with
 * lcd_cpu_to_le() at write time.
 */
264 void lut_client_epoch_update(const struct lu_env *env, struct obd_export *exp)
266 struct lsd_client_data *lcd = exp->exp_target_data.ted_lcd;
267 struct lu_target *lut = class_exp2tgt(exp);
/* This variant requires a bottom device. */
269 LASSERT(lut->lut_bottom);
270 /** VBR: set client last_epoch to current epoch */
/* NOTE(review): the statement controlled by this test is not shown;
 * presumably an early return -- confirm. */
271 if (lcd->lcd_last_epoch >= lut->lut_lsd.lsd_start_epoch)
273 lcd->lcd_last_epoch = lut->lut_lsd.lsd_start_epoch;
/* The int result of lut_client_data_update() is discarded here. */
274 lut_client_data_update(env, exp);
278 * Update boot epoch when recovery ends
/*
 * Start a new server epoch for a target and propagate it to the exports on
 * the final request queue.
 *
 * lut: target to update.
 *
 * When the target has no bottom device (lut_bottom == NULL) the work is
 * delegated to obt_boot_epoch_update(). Otherwise a local lu_env is
 * initialised, the epoch is advanced under lut_translock, every export on
 * obd_final_req_queue that has not failed VBR gets the new epoch, and the
 * server data is written with sync = 1.
 *
 * NOTE(review): the declaration of env and its teardown are on lines not
 * shown; presumably lu_env_fini() is called before returning -- confirm.
 */
280 void lut_boot_epoch_update(struct lu_target *lut)
283 struct ptlrpc_request *req;
285 cfs_list_t client_list;
/* NOTE(review): the statement controlled by this test is not shown;
 * presumably an early return while the device is stopping -- confirm. */
288 if (lut->lut_obd->obd_stopping)
290 /** Increase server epoch after recovery */
/*
 * NOTE(review): this returns a void expression from a void function. ISO C
 * does not allow an expression here; GCC accepts it as an extension.
 */
291 if (lut->lut_bottom == NULL)
292 return obt_boot_epoch_update(lut);
294 rc = lu_env_init(&env, LCT_DT_THREAD);
296 CERROR("Can't initialize environment rc=%i\n", rc);
/*
 * Compute the epoch of the last transno plus one, restart lut_last_transno
 * from it and record it as lsd_start_epoch, all under lut_translock. No
 * byte swapping is applied on this path.
 */
300 cfs_spin_lock(&lut->lut_translock);
301 start_epoch = lr_epoch(lut->lut_last_transno) + 1;
302 lut->lut_last_transno = (__u64)start_epoch << LR_EPOCH_BITS;
303 lut->lut_lsd.lsd_start_epoch = start_epoch;
304 cfs_spin_unlock(&lut->lut_translock);
306 CFS_INIT_LIST_HEAD(&client_list);
308 * The recovery is not yet finished and final queue can still be updated
309 * with resend requests. Move final list to separate one for processing
311 cfs_spin_lock_bh(&lut->lut_obd->obd_processing_task_lock);
312 cfs_list_splice_init(&lut->lut_obd->obd_final_req_queue, &client_list);
313 cfs_spin_unlock_bh(&lut->lut_obd->obd_processing_task_lock);
316 * go through list of exports participated in recovery and
317 * set new epoch for them
319 cfs_list_for_each_entry(req, &client_list, rq_list) {
320 LASSERT(!req->rq_export->exp_delayed);
/* Exports flagged exp_vbr_failed keep their old epoch. */
321 if (!req->rq_export->exp_vbr_failed)
322 lut_client_epoch_update(&env, req->rq_export);
324 /** return list back at once */
325 cfs_spin_lock_bh(&lut->lut_obd->obd_processing_task_lock);
326 cfs_list_splice_init(&client_list, &lut->lut_obd->obd_final_req_queue);
327 cfs_spin_unlock_bh(&lut->lut_obd->obd_processing_task_lock);
328 /** update server epoch */
/* Third argument is sync = 1; the int result is discarded. */
329 lut_server_data_update(&env, lut, 1);
332 EXPORT_SYMBOL(lut_boot_epoch_update);
335 * Commit callback: updates the last_committed values
/*
 * Commit callback: record transno as committed on the obd device and on the
 * export when it is newer than the value already stored there.
 *
 * lut:     target the transaction belongs to.
 * transno: transaction number being reported.
 * data:    declared on a signature line not shown; used as the export below.
 *
 * NOTE(review): the remainder of the parameter list is not shown. exp is
 * dereferenced in the first LASSERT without a visible NULL check.
 */
337 void lut_cb_last_committed(struct lu_target *lut, __u64 transno,
340 struct obd_export *exp = data;
/* The export must belong to the obd device of this target. */
341 LASSERT(exp->exp_obd == lut->lut_obd);
/* Both last_committed values are updated under lut_translock. */
342 cfs_spin_lock(&lut->lut_translock);
343 if (transno > lut->lut_obd->obd_last_committed)
344 lut->lut_obd->obd_last_committed = transno;
/* The lock is dropped before ptlrpc_commit_replies() is called. */
347 if (transno > exp->exp_last_committed) {
348 exp->exp_last_committed = transno;
349 cfs_spin_unlock(&lut->lut_translock);
350 ptlrpc_commit_replies(exp);
/* NOTE(review): presumably the else branch of the test above; the line
 * introducing it is not shown. */
352 cfs_spin_unlock(&lut->lut_translock);
/* NOTE(review): presumably releases a callback reference on the export --
 * confirm against class_export_cb_put(). */
354 class_export_cb_put(exp);
/* NOTE(review): transno is a __u64 but is printed with LPD64 here, while
 * other messages in this file print transnos with LPU64. */
356 CDEBUG(D_HA, "%s: transno "LPD64" is committed\n",
357 lut->lut_obd->obd_name, transno);
359 EXPORT_SYMBOL(lut_cb_last_committed);
/*
 * Callback that forwards its arguments to target_client_add_cb() for the obd
 * device of the target.
 *
 * lut:     target; its lut_obd must be set.
 * transno: transaction number, passed through unchanged.
 *
 * NOTE(review): data and err are declared on a signature line not shown.
 */
361 void lut_cb_client(struct lu_target *lut, __u64 transno,
364 LASSERT(lut->lut_obd);
365 target_client_add_cb(lut->lut_obd, transno, data, err);
367 EXPORT_SYMBOL(lut_cb_client);
/*
 * Initialise a target: attach the bottom device, link the target into the
 * obd device, set up both spinlocks, allocate the client bitmap and open the
 * last_rcvd object.
 *
 * env: execution environment handed to dt_store_open().
 * lut: target to initialise.
 * obd: obd device; obd->u.obt.obt_lut is pointed at lut.
 * dt:  bottom device, stored in lut->lut_bottom.
 *
 * NOTE(review): the return statements, the assignment of rc, the test around
 * dt_store_open() and any early exit for a NULL dt are on lines not shown.
 * Presumably a negative errno is returned on failure and 0 on success --
 * confirm.
 */
369 int lut_init(const struct lu_env *env, struct lu_target *lut,
370 struct obd_device *obd, struct dt_device *dt)
380 lut->lut_bottom = dt;
381 lut->lut_last_rcvd = NULL;
382 obd->u.obt.obt_lut = lut;
384 cfs_spin_lock_init(&lut->lut_translock);
385 cfs_spin_lock_init(&lut->lut_client_bitmap_lock);
/* LR_MAX_CLIENTS >> 3 bytes; presumably one bit per client slot. */
387 OBD_ALLOC(lut->lut_client_bitmap, LR_MAX_CLIENTS >> 3);
388 if (lut->lut_client_bitmap == NULL)
391 /** obdfilter has no lu_device stack yet */
394 o = dt_store_open(env, lut->lut_bottom, "", LAST_RCVD, &fid);
396 lut->lut_last_rcvd = o;
/* Failure path: free the bitmap allocated above and log the error. */
398 OBD_FREE(lut->lut_client_bitmap, LR_MAX_CLIENTS >> 3);
399 lut->lut_client_bitmap = NULL;
401 CERROR("cannot open %s: rc = %d\n", LAST_RCVD, rc);
406 EXPORT_SYMBOL(lut_init);
/*
 * Release what lut_init() set up: free the client bitmap and drop the
 * last_rcvd object.
 *
 * env: execution environment handed to lu_object_put().
 * lut: target being finalised.
 *
 * Each pointer is tested before release and set to NULL afterwards.
 */
408 void lut_fini(const struct lu_env *env, struct lu_target *lut)
412 if (lut->lut_client_bitmap) {
/* Same size expression as the allocation in lut_init(). */
413 OBD_FREE(lut->lut_client_bitmap, LR_MAX_CLIENTS >> 3);
414 lut->lut_client_bitmap = NULL;
416 if (lut->lut_last_rcvd) {
417 lu_object_put(env, &lut->lut_last_rcvd->do_lu);
418 lut->lut_last_rcvd = NULL;
422 EXPORT_SYMBOL(lut_fini);