Whamcloud - gitweb
df732ad2b5daa59ab957591d6c1568a49d494f18
[fs/lustre-release.git] / lustre / ptlrpc / target.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright (c) 2009, 2010, Oracle and/or its affiliates. All rights reserved.
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * Lustre Common Target
37  * These are common function for MDT and OST recovery-related functionality
38  *
39  *   Author: Mikhail Pershin <tappro@sun.com>
40  */
41
42 #include <obd.h>
43 #include <lustre_fsfilt.h>
44
45 /**
46  * Update client data in last_rcvd file. An obd API
47  */
48 static int obt_client_data_update(struct obd_export *exp)
49 {
50         struct tg_export_data *ted = &exp->exp_target_data;
51         struct obd_device_target *obt = &exp->exp_obd->u.obt;
52         struct lu_target *lut = class_exp2tgt(exp);
53         loff_t off = ted->ted_lr_off;
54         int rc = 0;
55
56         rc = fsfilt_write_record(exp->exp_obd, obt->obt_rcvd_filp,
57                                  ted->ted_lcd, sizeof(*ted->ted_lcd), &off, 0);
58
59         CDEBUG(D_INFO, "update client idx %u last_epoch %#x (%#x)\n",
60                ted->ted_lr_idx, le32_to_cpu(ted->ted_lcd->lcd_last_epoch),
61                le32_to_cpu(lut->lut_lsd.lsd_start_epoch));
62
63         return rc;
64 }
65
66 /**
67  * Update server data in last_rcvd file. An obd API
68  */
69 int obt_server_data_update(struct lu_target *lut, int force_sync)
70 {
71         struct obd_device_target *obt = &lut->lut_obd->u.obt;
72         loff_t off = 0;
73         int rc;
74         ENTRY;
75
76         CDEBUG(D_SUPER,
77                "%s: mount_count is "LPU64", last_transno is "LPU64"\n",
78                lut->lut_lsd.lsd_uuid,
79                le64_to_cpu(lut->lut_lsd.lsd_mount_count),
80                le64_to_cpu(lut->lut_lsd.lsd_last_transno));
81
82         rc = fsfilt_write_record(lut->lut_obd, obt->obt_rcvd_filp,
83                                  &lut->lut_lsd, sizeof(lut->lut_lsd),
84                                  &off, force_sync);
85         if (rc)
86                 CERROR("error writing lr_server_data: rc = %d\n", rc);
87
88         RETURN(rc);
89 }
90
91 /**
92  * Update client epoch with server's one
93  */
94 void obt_client_epoch_update(struct obd_export *exp)
95 {
96         struct lsd_client_data *lcd = exp->exp_target_data.ted_lcd;
97         struct lu_target *lut = class_exp2tgt(exp);
98
99         /** VBR: set client last_epoch to current epoch */
100         if (le32_to_cpu(lcd->lcd_last_epoch) >=
101             le32_to_cpu(lut->lut_lsd.lsd_start_epoch))
102                 return;
103         lcd->lcd_last_epoch = lut->lut_lsd.lsd_start_epoch;
104         obt_client_data_update(exp);
105 }
106
107 /**
108  * Increment server epoch. An obd API
109  */
110 static void obt_boot_epoch_update(struct lu_target *lut)
111 {
112         struct obd_device *obd = lut->lut_obd;
113         __u32 start_epoch;
114         struct ptlrpc_request *req;
115         cfs_list_t client_list;
116
117         cfs_spin_lock(&lut->lut_translock);
118         start_epoch = lr_epoch(le64_to_cpu(lut->lut_last_transno)) + 1;
119         lut->lut_last_transno = cpu_to_le64((__u64)start_epoch <<
120                                             LR_EPOCH_BITS);
121         lut->lut_lsd.lsd_start_epoch = cpu_to_le32(start_epoch);
122         cfs_spin_unlock(&lut->lut_translock);
123
124         CFS_INIT_LIST_HEAD(&client_list);
125         cfs_spin_lock(&obd->obd_recovery_task_lock);
126         cfs_list_splice_init(&obd->obd_final_req_queue, &client_list);
127         cfs_spin_unlock(&obd->obd_recovery_task_lock);
128
129         /**
130          * go through list of exports participated in recovery and
131          * set new epoch for them
132          */
133         cfs_list_for_each_entry(req, &client_list, rq_list) {
134                 LASSERT(!req->rq_export->exp_delayed);
135                 obt_client_epoch_update(req->rq_export);
136         }
137         /** return list back at once */
138         cfs_spin_lock(&obd->obd_recovery_task_lock);
139         cfs_list_splice_init(&client_list, &obd->obd_final_req_queue);
140         cfs_spin_unlock(&obd->obd_recovery_task_lock);
141         obt_server_data_update(lut, 1);
142 }
143
144 /**
145  * write data in last_rcvd file.
146  */
147 static int lut_last_rcvd_write(const struct lu_env *env, struct lu_target *lut,
148                                const struct lu_buf *buf, loff_t *off, int sync)
149 {
150         struct thandle *th;
151         struct txn_param p;
152         int rc, credits;
153         ENTRY;
154
155         credits = lut->lut_bottom->dd_ops->dt_credit_get(env, lut->lut_bottom,
156                                                          DTO_WRITE_BLOCK);
157         txn_param_init(&p, credits);
158
159         th = dt_trans_start(env, lut->lut_bottom, &p);
160         if (IS_ERR(th))
161                 RETURN(PTR_ERR(th));
162
163         rc = dt_record_write(env, lut->lut_last_rcvd, buf, off, th);
164         dt_trans_stop(env, lut->lut_bottom, th);
165
166         CDEBUG(D_INFO, "write last_rcvd header rc = %d:\n"
167                "uuid = %s\nlast_transno = "LPU64"\n",
168                rc, lut->lut_lsd.lsd_uuid, lut->lut_lsd.lsd_last_transno);
169
170         RETURN(rc);
171 }
172
173 /**
174  * Allocate in-memory data for client slot related to export.
175  */
176 int lut_client_alloc(struct obd_export *exp)
177 {
178         LASSERT(exp != exp->exp_obd->obd_self_export);
179
180         OBD_ALLOC_PTR(exp->exp_target_data.ted_lcd);
181         if (exp->exp_target_data.ted_lcd == NULL)
182                 RETURN(-ENOMEM);
183         /* Mark that slot is not yet valid, 0 doesn't work here */
184         exp->exp_target_data.ted_lr_idx = -1;
185         RETURN(0);
186 }
187 EXPORT_SYMBOL(lut_client_alloc);
188
189 /**
190  * Free in-memory data for client slot related to export.
191  */
192 void lut_client_free(struct obd_export *exp)
193 {
194         struct tg_export_data *ted = &exp->exp_target_data;
195         struct lu_target *lut = class_exp2tgt(exp);
196
197         LASSERT(exp != exp->exp_obd->obd_self_export);
198
199         OBD_FREE_PTR(ted->ted_lcd);
200         ted->ted_lcd = NULL;
201
202         /* Slot may be not yet assigned */
203         if (ted->ted_lr_idx < 0)
204                 return;
205         /* Clear bit when lcd is freed */
206         cfs_spin_lock(&lut->lut_client_bitmap_lock);
207         if (!cfs_test_and_clear_bit(ted->ted_lr_idx, lut->lut_client_bitmap)) {
208                 CERROR("%s: client %u bit already clear in bitmap\n",
209                        exp->exp_obd->obd_name, ted->ted_lr_idx);
210                 LBUG();
211         }
212         cfs_spin_unlock(&lut->lut_client_bitmap_lock);
213 }
214 EXPORT_SYMBOL(lut_client_free);
215
216 /**
217  * Update client data in last_rcvd
218  */
219 int lut_client_data_update(const struct lu_env *env, struct obd_export *exp)
220 {
221         struct tg_export_data *ted = &exp->exp_target_data;
222         struct lu_target *lut = class_exp2tgt(exp);
223         struct lsd_client_data tmp_lcd;
224         loff_t tmp_off = ted->ted_lr_off;
225         struct lu_buf tmp_buf = {
226                                         .lb_buf = &tmp_lcd,
227                                         .lb_len = sizeof(tmp_lcd)
228                                 };
229         int rc = 0;
230
231         lcd_cpu_to_le(ted->ted_lcd, &tmp_lcd);
232         LASSERT(lut->lut_last_rcvd);
233         rc = lut_last_rcvd_write(env, lut, &tmp_buf, &tmp_off, 0);
234
235         return rc;
236 }
237
238 /**
239  * Update server data in last_rcvd
240  */
241 static int lut_server_data_update(const struct lu_env *env,
242                                   struct lu_target *lut, int sync)
243 {
244         struct lr_server_data tmp_lsd;
245         loff_t tmp_off = 0;
246         struct lu_buf tmp_buf = {
247                                         .lb_buf = &tmp_lsd,
248                                         .lb_len = sizeof(tmp_lsd)
249                                 };
250         int rc = 0;
251         ENTRY;
252
253         CDEBUG(D_SUPER,
254                "%s: mount_count is "LPU64", last_transno is "LPU64"\n",
255                lut->lut_lsd.lsd_uuid, lut->lut_obd->u.obt.obt_mount_count,
256                lut->lut_last_transno);
257
258         cfs_spin_lock(&lut->lut_translock);
259         lut->lut_lsd.lsd_last_transno = lut->lut_last_transno;
260         cfs_spin_unlock(&lut->lut_translock);
261
262         lsd_cpu_to_le(&lut->lut_lsd, &tmp_lsd);
263         if (lut->lut_last_rcvd != NULL)
264                 rc = lut_last_rcvd_write(env, lut, &tmp_buf, &tmp_off, sync);
265         RETURN(rc);
266 }
267
268 void lut_client_epoch_update(const struct lu_env *env, struct obd_export *exp)
269 {
270         struct lsd_client_data *lcd = exp->exp_target_data.ted_lcd;
271         struct lu_target *lut = class_exp2tgt(exp);
272
273         LASSERT(lut->lut_bottom);
274         /** VBR: set client last_epoch to current epoch */
275         if (lcd->lcd_last_epoch >= lut->lut_lsd.lsd_start_epoch)
276                 return;
277         lcd->lcd_last_epoch = lut->lut_lsd.lsd_start_epoch;
278         lut_client_data_update(env, exp);
279 }
280
281 /**
282  * Update boot epoch when recovery ends
283  */
284 void lut_boot_epoch_update(struct lu_target *lut)
285 {
286         struct lu_env env;
287         struct ptlrpc_request *req;
288         __u32 start_epoch;
289         cfs_list_t client_list;
290         int rc;
291
292         if (lut->lut_obd->obd_stopping)
293                 return;
294         /** Increase server epoch after recovery */
295         if (lut->lut_bottom == NULL)
296                 return obt_boot_epoch_update(lut);
297
298         rc = lu_env_init(&env, LCT_DT_THREAD);
299         if (rc) {
300                 CERROR("Can't initialize environment rc=%d\n", rc);
301                 return;
302         }
303
304         cfs_spin_lock(&lut->lut_translock);
305         start_epoch = lr_epoch(lut->lut_last_transno) + 1;
306         lut->lut_last_transno = (__u64)start_epoch << LR_EPOCH_BITS;
307         lut->lut_lsd.lsd_start_epoch = start_epoch;
308         cfs_spin_unlock(&lut->lut_translock);
309
310         CFS_INIT_LIST_HEAD(&client_list);
311         /**
312          * The recovery is not yet finished and final queue can still be updated
313          * with resend requests. Move final list to separate one for processing
314          */
315         cfs_spin_lock(&lut->lut_obd->obd_recovery_task_lock);
316         cfs_list_splice_init(&lut->lut_obd->obd_final_req_queue, &client_list);
317         cfs_spin_unlock(&lut->lut_obd->obd_recovery_task_lock);
318
319         /**
320          * go through list of exports participated in recovery and
321          * set new epoch for them
322          */
323         cfs_list_for_each_entry(req, &client_list, rq_list) {
324                 LASSERT(!req->rq_export->exp_delayed);
325                 if (!req->rq_export->exp_vbr_failed)
326                         lut_client_epoch_update(&env, req->rq_export);
327         }
328         /** return list back at once */
329         cfs_spin_lock(&lut->lut_obd->obd_recovery_task_lock);
330         cfs_list_splice_init(&client_list, &lut->lut_obd->obd_final_req_queue);
331         cfs_spin_unlock(&lut->lut_obd->obd_recovery_task_lock);
332         /** update server epoch */
333         lut_server_data_update(&env, lut, 1);
334         lu_env_fini(&env);
335 }
336 EXPORT_SYMBOL(lut_boot_epoch_update);
337
338 /**
339  * commit callback, need to update last_commited value
340  */
341 void lut_cb_last_committed(struct lu_target *lut, __u64 transno,
342                            void *data, int err)
343 {
344         struct obd_export *exp = data;
345         LASSERT(exp->exp_obd == lut->lut_obd);
346         cfs_spin_lock(&lut->lut_translock);
347         if (transno > lut->lut_obd->obd_last_committed)
348                 lut->lut_obd->obd_last_committed = transno;
349
350         LASSERT(exp);
351         if (transno > exp->exp_last_committed) {
352                 exp->exp_last_committed = transno;
353                 cfs_spin_unlock(&lut->lut_translock);
354                 ptlrpc_commit_replies(exp);
355         } else {
356                 cfs_spin_unlock(&lut->lut_translock);
357         }
358         class_export_cb_put(exp);
359         if (transno)
360                 CDEBUG(D_HA, "%s: transno "LPD64" is committed\n",
361                        lut->lut_obd->obd_name, transno);
362 }
363 EXPORT_SYMBOL(lut_cb_last_committed);
364
365 void lut_cb_client(struct lu_target *lut, __u64 transno,
366                        void *data, int err)
367 {
368         LASSERT(lut->lut_obd);
369         target_client_add_cb(lut->lut_obd, transno, data, err);
370 }
371 EXPORT_SYMBOL(lut_cb_client);
372
373 int lut_init(const struct lu_env *env, struct lu_target *lut,
374              struct obd_device *obd, struct dt_device *dt)
375 {
376         struct lu_fid fid;
377         struct dt_object *o;
378         int rc = 0;
379         ENTRY;
380
381         LASSERT(lut);
382         LASSERT(obd);
383         lut->lut_obd = obd;
384         lut->lut_bottom = dt;
385         lut->lut_last_rcvd = NULL;
386         obd->u.obt.obt_lut = lut;
387
388         cfs_spin_lock_init(&lut->lut_translock);
389         cfs_spin_lock_init(&lut->lut_client_bitmap_lock);
390
391         OBD_ALLOC(lut->lut_client_bitmap, LR_MAX_CLIENTS >> 3);
392         if (lut->lut_client_bitmap == NULL)
393                 RETURN(-ENOMEM);
394
395         /** obdfilter has no lu_device stack yet */
396         if (dt == NULL)
397                 RETURN(rc);
398         o = dt_store_open(env, lut->lut_bottom, "", LAST_RCVD, &fid);
399         if (!IS_ERR(o)) {
400                 lut->lut_last_rcvd = o;
401         } else {
402                 OBD_FREE(lut->lut_client_bitmap, LR_MAX_CLIENTS >> 3);
403                 lut->lut_client_bitmap = NULL;
404                 rc = PTR_ERR(o);
405                 CERROR("cannot open %s: rc = %d\n", LAST_RCVD, rc);
406         }
407
408         RETURN(rc);
409 }
410 EXPORT_SYMBOL(lut_init);
411
412 void lut_fini(const struct lu_env *env, struct lu_target *lut)
413 {
414         ENTRY;
415
416         if (lut->lut_client_bitmap) {
417                 OBD_FREE(lut->lut_client_bitmap, LR_MAX_CLIENTS >> 3);
418                 lut->lut_client_bitmap = NULL;
419         }
420         if (lut->lut_last_rcvd) {
421                 lu_object_put(env, &lut->lut_last_rcvd->do_lu);
422                 lut->lut_last_rcvd = NULL;
423         }
424         EXIT;
425 }
426 EXPORT_SYMBOL(lut_fini);