1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
6 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
8 * This program is free software; you can redistribute it and/or modify
9 * it under the terms of the GNU General Public License version 2 only,
10 * as published by the Free Software Foundation.
12 * This program is distributed in the hope that it will be useful, but
13 * WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * General Public License version 2 for more details (a copy is included
16 * in the LICENSE file that accompanied this code).
18 * You should have received a copy of the GNU General Public License
19 * version 2 along with this program; If not, see
20 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
22 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23 * CA 95054 USA or visit www.sun.com if you need additional information or
29 * Copyright 2008 Sun Microsystems, Inc. All rights reserved
30 * Use is subject to license terms.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
36 * Lustre Common Target
37 * These are common function for MDT and OST recovery-related functionality
39 * Author: Mikhail Pershin <tappro@sun.com>
43 #include <lustre_fsfilt.h>
45 * Update client data in last_rcvd file. An obd API
47 static int obt_client_data_update(struct obd_export *exp)
49 struct lu_export_data *led = &exp->exp_target_data;
50 struct obd_device_target *obt = &exp->exp_obd->u.obt;
51 loff_t off = led->led_lr_off;
54 rc = fsfilt_write_record(exp->exp_obd, obt->obt_rcvd_filp,
55 led->led_lcd, sizeof(*led->led_lcd), &off, 0);
57 CDEBUG(D_INFO, "update client idx %u last_epoch %#x (%#x)\n",
58 led->led_lr_idx, le32_to_cpu(led->led_lcd->lcd_last_epoch),
59 le32_to_cpu(obt->obt_lsd->lsd_start_epoch));
65 * Update server data in last_rcvd file. An obd API
67 int obt_server_data_update(struct obd_device *obd, int force_sync)
69 struct obd_device_target *obt = &obd->u.obt;
75 "%s: mount_count is "LPU64", last_transno is "LPU64"\n",
76 obt->obt_lsd->lsd_uuid,
77 le64_to_cpu(obt->obt_lsd->lsd_mount_count),
78 le64_to_cpu(obt->obt_lsd->lsd_last_transno));
80 rc = fsfilt_write_record(obd, obt->obt_rcvd_filp, obt->obt_lsd,
81 sizeof(*obt->obt_lsd), &off, force_sync);
83 CERROR("error writing lr_server_data: rc = %d\n", rc);
89 * Update client epoch with server's one
91 void obt_client_epoch_update(struct obd_export *exp)
93 struct lsd_client_data *lcd = exp->exp_target_data.led_lcd;
94 struct obd_device_target *obt = &exp->exp_obd->u.obt;
96 /** VBR: set client last_epoch to current epoch */
97 if (le32_to_cpu(lcd->lcd_last_epoch) >=
98 le32_to_cpu(obt->obt_lsd->lsd_start_epoch))
100 lcd->lcd_last_epoch = obt->obt_lsd->lsd_start_epoch;
101 obt_client_data_update(exp);
105 * Increment server epoch. An obd API
107 static void obt_boot_epoch_update(struct obd_device *obd)
110 struct obd_device_target *obt = &obd->u.obt;
111 struct ptlrpc_request *req;
112 struct list_head client_list;
114 spin_lock(&obt->obt_translock);
115 start_epoch = lr_epoch(le64_to_cpu(obt->obt_last_transno)) + 1;
116 obt->obt_last_transno = cpu_to_le64((__u64)start_epoch <<
118 obt->obt_lsd->lsd_start_epoch = cpu_to_le32(start_epoch);
119 spin_unlock(&obt->obt_translock);
121 CFS_INIT_LIST_HEAD(&client_list);
122 spin_lock_bh(&obd->obd_processing_task_lock);
123 list_splice_init(&obd->obd_final_req_queue, &client_list);
124 spin_unlock_bh(&obd->obd_processing_task_lock);
127 * go through list of exports participated in recovery and
128 * set new epoch for them
130 list_for_each_entry(req, &client_list, rq_list) {
131 LASSERT(!req->rq_export->exp_delayed);
132 obt_client_epoch_update(req->rq_export);
134 /** return list back at once */
135 spin_lock_bh(&obd->obd_processing_task_lock);
136 list_splice_init(&client_list, &obd->obd_final_req_queue);
137 spin_unlock_bh(&obd->obd_processing_task_lock);
138 obt_server_data_update(obd, 1);
142 * write data in last_rcvd file.
144 static int lut_last_rcvd_write(const struct lu_env *env, struct lu_target *lut,
145 const struct lu_buf *buf, loff_t *off, int sync)
152 credits = lut->lut_bottom->dd_ops->dt_credit_get(env, lut->lut_bottom,
154 txn_param_init(&p, credits);
156 th = dt_trans_start(env, lut->lut_bottom, &p);
160 rc = dt_record_write(env, lut->lut_last_rcvd, buf, off, th);
161 dt_trans_stop(env, lut->lut_bottom, th);
163 CDEBUG(D_INFO, "write last_rcvd header rc = %d:\n"
164 "uuid = %s\nlast_transno = "LPU64"\n",
165 rc, lut->lut_lsd.lsd_uuid, lut->lut_lsd.lsd_last_transno);
171 * Update client data in last_rcvd
173 int lut_client_data_update(const struct lu_env *env, struct lu_target *lut,
174 struct obd_export *exp)
176 struct lu_export_data *led = &exp->exp_target_data;
177 struct lsd_client_data tmp_lcd;
178 loff_t tmp_off = led->led_lr_off;
179 struct lu_buf tmp_buf = {
181 .lb_len = sizeof(tmp_lcd)
185 lcd_cpu_to_le(led->led_lcd, &tmp_lcd);
186 LASSERT(lut->lut_last_rcvd);
187 rc = lut_last_rcvd_write(env, lut, &tmp_buf, &tmp_off, 0);
193 * Update server data in last_rcvd
195 static int lut_server_data_update(const struct lu_env *env,
196 struct lu_target *lut, int sync)
198 struct lr_server_data tmp_lsd;
200 struct lu_buf tmp_buf = {
202 .lb_len = sizeof(tmp_lsd)
208 "%s: mount_count is "LPU64", last_transno is "LPU64"\n",
209 lut->lut_lsd.lsd_uuid, lut->lut_mount_count,
210 lut->lut_last_transno);
212 spin_lock(&lut->lut_translock);
213 lut->lut_lsd.lsd_last_transno = lut->lut_last_transno;
214 spin_unlock(&lut->lut_translock);
216 lsd_cpu_to_le(&lut->lut_lsd, &tmp_lsd);
217 if (lut->lut_last_rcvd != NULL)
218 rc = lut_last_rcvd_write(env, lut, &tmp_buf, &tmp_off, sync);
222 void lut_client_epoch_update(const struct lu_env *env, struct lu_target *lut,
223 struct obd_export *exp)
225 struct lsd_client_data *lcd = exp->exp_target_data.led_lcd;
227 LASSERT(lut->lut_bottom);
228 /** VBR: set client last_epoch to current epoch */
229 if (lcd->lcd_last_epoch >= lut->lut_lsd.lsd_start_epoch)
231 lcd->lcd_last_epoch = lut->lut_lsd.lsd_start_epoch;
232 lut_client_data_update(env, lut, exp);
236 * Update boot epoch when recovery ends
238 void lut_boot_epoch_update(struct lu_target *lut)
241 struct ptlrpc_request *req;
243 struct list_head client_list;
246 if (lut->lut_obd->obd_stopping)
248 /** Increase server epoch after recovery */
249 if (lut->lut_bottom == NULL)
250 return obt_boot_epoch_update(lut->lut_obd);
252 rc = lu_env_init(&env, LCT_DT_THREAD);
254 CERROR("Can't initialize environment rc=%i\n", rc);
258 spin_lock(&lut->lut_translock);
259 start_epoch = lr_epoch(lut->lut_last_transno) + 1;
260 lut->lut_last_transno = (__u64)start_epoch << LR_EPOCH_BITS;
261 lut->lut_lsd.lsd_start_epoch = start_epoch;
262 spin_unlock(&lut->lut_translock);
264 CFS_INIT_LIST_HEAD(&client_list);
266 * The recovery is not yet finished and final queue can still be updated
267 * with resend requests. Move final list to separate one for processing
269 spin_lock_bh(&lut->lut_obd->obd_processing_task_lock);
270 list_splice_init(&lut->lut_obd->obd_final_req_queue, &client_list);
271 spin_unlock_bh(&lut->lut_obd->obd_processing_task_lock);
274 * go through list of exports participated in recovery and
275 * set new epoch for them
277 list_for_each_entry(req, &client_list, rq_list) {
278 LASSERT(!req->rq_export->exp_delayed);
279 if (!req->rq_export->exp_vbr_failed)
280 lut_client_epoch_update(&env, lut, req->rq_export);
282 /** return list back at once */
283 spin_lock_bh(&lut->lut_obd->obd_processing_task_lock);
284 list_splice_init(&client_list, &lut->lut_obd->obd_final_req_queue);
285 spin_unlock_bh(&lut->lut_obd->obd_processing_task_lock);
286 /** update server epoch */
287 lut_server_data_update(&env, lut, 1);
290 EXPORT_SYMBOL(lut_boot_epoch_update);
293 * commit callback, need to update last_commited value
295 void lut_cb_last_committed(struct lu_target *lut, __u64 transno,
298 struct obd_export *exp = data;
299 LASSERT(exp->exp_obd == lut->lut_obd);
300 spin_lock(&lut->lut_translock);
301 if (transno > lut->lut_obd->obd_last_committed)
302 lut->lut_obd->obd_last_committed = transno;
305 if (transno > exp->exp_last_committed) {
306 exp->exp_last_committed = transno;
307 spin_unlock(&lut->lut_translock);
308 ptlrpc_commit_replies(exp);
310 spin_unlock(&lut->lut_translock);
312 class_export_put(exp);
314 CDEBUG(D_HA, "%s: transno "LPD64" is committed\n",
315 lut->lut_obd->obd_name, transno);
317 EXPORT_SYMBOL(lut_cb_last_committed);
319 void lut_cb_client(struct lu_target *lut, __u64 transno,
322 LASSERT(lut->lut_obd);
323 target_client_add_cb(lut->lut_obd, transno, data, err);
325 EXPORT_SYMBOL(lut_cb_client);
327 int lut_init(const struct lu_env *env, struct lu_target *lut,
328 struct obd_device *obd, struct dt_device *dt)
336 lut->lut_bottom = dt;
337 lut->lut_last_rcvd = NULL;
339 spin_lock_init(&lut->lut_translock);
340 spin_lock_init(&lut->lut_client_bitmap_lock);
341 spin_lock_init(&lut->lut_trans_table_lock);
343 /** obdfilter has no lu_device stack yet */
346 o = dt_store_open(env, lut->lut_bottom, "", LAST_RCVD, &fid);
348 lut->lut_last_rcvd = o;
351 CERROR("cannot open %s: rc = %d\n", LAST_RCVD, rc);
356 EXPORT_SYMBOL(lut_init);
358 void lut_fini(const struct lu_env *env, struct lu_target *lut)
361 if (lut->lut_last_rcvd)
362 lu_object_put(env, &lut->lut_last_rcvd->do_lu);
363 lut->lut_last_rcvd = NULL;
366 EXPORT_SYMBOL(lut_fini);