Whamcloud - gitweb
6f8bccf78da3bf54f875f98e70da130d3612033f
[fs/lustre-release.git] / lustre / ptlrpc / target.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright (c) 2009, 2010, Oracle and/or its affiliates. All rights reserved.
30  * Use is subject to license terms.
31  *
32  * Copyright (c) 2011, 2012, Whamcloud, Inc.
33  */
34 /*
35  * This file is part of Lustre, http://www.lustre.org/
36  * Lustre is a trademark of Sun Microsystems, Inc.
37  *
38  * Lustre Common Target
39  * These are common function for MDT and OST recovery-related functionality
40  *
41  *   Author: Mikhail Pershin <tappro@sun.com>
42  */
43
44 #include <obd.h>
45 #include <lustre_fsfilt.h>
46 #include <obd_class.h>
47
48 /**
49  * Update client data in last_rcvd file. An obd API
50  */
51 static int obt_client_data_update(struct obd_export *exp)
52 {
53         struct tg_export_data *ted = &exp->exp_target_data;
54         struct obd_device_target *obt = &exp->exp_obd->u.obt;
55         struct lu_target *lut = class_exp2tgt(exp);
56         loff_t off = ted->ted_lr_off;
57         int rc = 0;
58
59         rc = fsfilt_write_record(exp->exp_obd, obt->obt_rcvd_filp,
60                                  ted->ted_lcd, sizeof(*ted->ted_lcd), &off, 0);
61
62         CDEBUG(D_INFO, "update client idx %u last_epoch %#x (%#x)\n",
63                ted->ted_lr_idx, le32_to_cpu(ted->ted_lcd->lcd_last_epoch),
64                le32_to_cpu(lut->lut_lsd.lsd_start_epoch));
65
66         return rc;
67 }
68
69 /**
70  * Update server data in last_rcvd file. An obd API
71  */
72 int obt_server_data_update(struct lu_target *lut, int force_sync)
73 {
74         struct obd_device_target *obt = &lut->lut_obd->u.obt;
75         loff_t off = 0;
76         int rc;
77         ENTRY;
78
79         CDEBUG(D_SUPER,
80                "%s: mount_count is "LPU64", last_transno is "LPU64"\n",
81                lut->lut_lsd.lsd_uuid,
82                le64_to_cpu(lut->lut_lsd.lsd_mount_count),
83                le64_to_cpu(lut->lut_lsd.lsd_last_transno));
84
85         rc = fsfilt_write_record(lut->lut_obd, obt->obt_rcvd_filp,
86                                  &lut->lut_lsd, sizeof(lut->lut_lsd),
87                                  &off, force_sync);
88         if (rc)
89                 CERROR("error writing lr_server_data: rc = %d\n", rc);
90
91         RETURN(rc);
92 }
93
94 /**
95  * Update client epoch with server's one
96  */
97 void obt_client_epoch_update(struct obd_export *exp)
98 {
99         struct lsd_client_data *lcd = exp->exp_target_data.ted_lcd;
100         struct lu_target *lut = class_exp2tgt(exp);
101
102         /** VBR: set client last_epoch to current epoch */
103         if (le32_to_cpu(lcd->lcd_last_epoch) >=
104             le32_to_cpu(lut->lut_lsd.lsd_start_epoch))
105                 return;
106         lcd->lcd_last_epoch = lut->lut_lsd.lsd_start_epoch;
107         obt_client_data_update(exp);
108 }
109
110 /**
111  * Increment server epoch. An obd API
112  */
113 static void obt_boot_epoch_update(struct lu_target *lut)
114 {
115         struct obd_device *obd = lut->lut_obd;
116         __u32 start_epoch;
117         struct ptlrpc_request *req;
118         cfs_list_t client_list;
119
120         cfs_spin_lock(&lut->lut_translock);
121         start_epoch = lr_epoch(le64_to_cpu(lut->lut_last_transno)) + 1;
122         lut->lut_last_transno = cpu_to_le64((__u64)start_epoch <<
123                                             LR_EPOCH_BITS);
124         lut->lut_lsd.lsd_start_epoch = cpu_to_le32(start_epoch);
125         cfs_spin_unlock(&lut->lut_translock);
126
127         CFS_INIT_LIST_HEAD(&client_list);
128         cfs_spin_lock(&obd->obd_recovery_task_lock);
129         cfs_list_splice_init(&obd->obd_final_req_queue, &client_list);
130         cfs_spin_unlock(&obd->obd_recovery_task_lock);
131
132         /**
133          * go through list of exports participated in recovery and
134          * set new epoch for them
135          */
136         cfs_list_for_each_entry(req, &client_list, rq_list) {
137                 LASSERT(!req->rq_export->exp_delayed);
138                 obt_client_epoch_update(req->rq_export);
139         }
140         /** return list back at once */
141         cfs_spin_lock(&obd->obd_recovery_task_lock);
142         cfs_list_splice_init(&client_list, &obd->obd_final_req_queue);
143         cfs_spin_unlock(&obd->obd_recovery_task_lock);
144         obt_server_data_update(lut, 1);
145 }
146
147 /**
148  * write data in last_rcvd file.
149  */
150 static int lut_last_rcvd_write(const struct lu_env *env, struct lu_target *lut,
151                                const struct lu_buf *buf, loff_t *off, int sync)
152 {
153         struct thandle *th;
154         int rc;
155         ENTRY;
156
157         th = dt_trans_create(env, lut->lut_bottom);
158         if (IS_ERR(th))
159                 RETURN(PTR_ERR(th));
160
161         rc = dt_declare_record_write(env, lut->lut_last_rcvd,
162                                      buf->lb_len, *off, th);
163         if (rc)
164                 goto stop;
165
166         rc = dt_trans_start(env, lut->lut_bottom, th);
167         if (rc)
168                 goto stop;
169
170         rc = dt_record_write(env, lut->lut_last_rcvd, buf, off, th);
171
172 stop:
173         dt_trans_stop(env, lut->lut_bottom, th);
174
175         CDEBUG(D_INFO, "write last_rcvd header rc = %d:\n"
176                "uuid = %s\nlast_transno = "LPU64"\n",
177                rc, lut->lut_lsd.lsd_uuid, lut->lut_lsd.lsd_last_transno);
178
179         RETURN(rc);
180 }
181
182 /**
183  * Allocate in-memory data for client slot related to export.
184  */
185 int lut_client_alloc(struct obd_export *exp)
186 {
187         LASSERT(exp != exp->exp_obd->obd_self_export);
188
189         OBD_ALLOC_PTR(exp->exp_target_data.ted_lcd);
190         if (exp->exp_target_data.ted_lcd == NULL)
191                 RETURN(-ENOMEM);
192         /* Mark that slot is not yet valid, 0 doesn't work here */
193         exp->exp_target_data.ted_lr_idx = -1;
194         RETURN(0);
195 }
196 EXPORT_SYMBOL(lut_client_alloc);
197
198 /**
199  * Free in-memory data for client slot related to export.
200  */
201 void lut_client_free(struct obd_export *exp)
202 {
203         struct tg_export_data *ted = &exp->exp_target_data;
204         struct lu_target *lut = class_exp2tgt(exp);
205
206         LASSERT(exp != exp->exp_obd->obd_self_export);
207
208         OBD_FREE_PTR(ted->ted_lcd);
209         ted->ted_lcd = NULL;
210
211         /* Slot may be not yet assigned */
212         if (ted->ted_lr_idx < 0)
213                 return;
214         /* Clear bit when lcd is freed */
215         cfs_spin_lock(&lut->lut_client_bitmap_lock);
216         if (!cfs_test_and_clear_bit(ted->ted_lr_idx, lut->lut_client_bitmap)) {
217                 CERROR("%s: client %u bit already clear in bitmap\n",
218                        exp->exp_obd->obd_name, ted->ted_lr_idx);
219                 LBUG();
220         }
221         cfs_spin_unlock(&lut->lut_client_bitmap_lock);
222 }
223 EXPORT_SYMBOL(lut_client_free);
224
225 /**
226  * Update client data in last_rcvd
227  */
228 int lut_client_data_update(const struct lu_env *env, struct obd_export *exp)
229 {
230         struct tg_export_data *ted = &exp->exp_target_data;
231         struct lu_target *lut = class_exp2tgt(exp);
232         struct lsd_client_data tmp_lcd;
233         loff_t tmp_off = ted->ted_lr_off;
234         struct lu_buf tmp_buf = {
235                                         .lb_buf = &tmp_lcd,
236                                         .lb_len = sizeof(tmp_lcd)
237                                 };
238         int rc = 0;
239
240         lcd_cpu_to_le(ted->ted_lcd, &tmp_lcd);
241         LASSERT(lut->lut_last_rcvd);
242         rc = lut_last_rcvd_write(env, lut, &tmp_buf, &tmp_off, 0);
243
244         return rc;
245 }
246
247 /**
248  * Update server data in last_rcvd
249  */
250 int lut_server_data_update(const struct lu_env *env,
251                            struct lu_target *lut, int sync)
252 {
253         struct lr_server_data tmp_lsd;
254         loff_t tmp_off = 0;
255         struct lu_buf tmp_buf = {
256                                         .lb_buf = &tmp_lsd,
257                                         .lb_len = sizeof(tmp_lsd)
258                                 };
259         int rc = 0;
260         ENTRY;
261
262         CDEBUG(D_SUPER,
263                "%s: mount_count is "LPU64", last_transno is "LPU64"\n",
264                lut->lut_lsd.lsd_uuid, lut->lut_obd->u.obt.obt_mount_count,
265                lut->lut_last_transno);
266
267         cfs_spin_lock(&lut->lut_translock);
268         lut->lut_lsd.lsd_last_transno = lut->lut_last_transno;
269         cfs_spin_unlock(&lut->lut_translock);
270
271         lsd_cpu_to_le(&lut->lut_lsd, &tmp_lsd);
272         if (lut->lut_last_rcvd != NULL)
273                 rc = lut_last_rcvd_write(env, lut, &tmp_buf, &tmp_off, sync);
274         RETURN(rc);
275 }
276
277 void lut_client_epoch_update(const struct lu_env *env, struct obd_export *exp)
278 {
279         struct lsd_client_data *lcd = exp->exp_target_data.ted_lcd;
280         struct lu_target *lut = class_exp2tgt(exp);
281
282         LASSERT(lut->lut_bottom);
283         /** VBR: set client last_epoch to current epoch */
284         if (lcd->lcd_last_epoch >= lut->lut_lsd.lsd_start_epoch)
285                 return;
286         lcd->lcd_last_epoch = lut->lut_lsd.lsd_start_epoch;
287         lut_client_data_update(env, exp);
288 }
289
290 /**
291  * Update boot epoch when recovery ends
292  */
293 void lut_boot_epoch_update(struct lu_target *lut)
294 {
295         struct lu_env env;
296         struct ptlrpc_request *req;
297         __u32 start_epoch;
298         cfs_list_t client_list;
299         int rc;
300
301         if (lut->lut_obd->obd_stopping)
302                 return;
303         /** Increase server epoch after recovery */
304         if (lut->lut_bottom == NULL)
305                 return obt_boot_epoch_update(lut);
306
307         rc = lu_env_init(&env, LCT_DT_THREAD);
308         if (rc) {
309                 CERROR("Can't initialize environment rc=%d\n", rc);
310                 return;
311         }
312
313         cfs_spin_lock(&lut->lut_translock);
314         start_epoch = lr_epoch(lut->lut_last_transno) + 1;
315         lut->lut_last_transno = (__u64)start_epoch << LR_EPOCH_BITS;
316         lut->lut_lsd.lsd_start_epoch = start_epoch;
317         cfs_spin_unlock(&lut->lut_translock);
318
319         CFS_INIT_LIST_HEAD(&client_list);
320         /**
321          * The recovery is not yet finished and final queue can still be updated
322          * with resend requests. Move final list to separate one for processing
323          */
324         cfs_spin_lock(&lut->lut_obd->obd_recovery_task_lock);
325         cfs_list_splice_init(&lut->lut_obd->obd_final_req_queue, &client_list);
326         cfs_spin_unlock(&lut->lut_obd->obd_recovery_task_lock);
327
328         /**
329          * go through list of exports participated in recovery and
330          * set new epoch for them
331          */
332         cfs_list_for_each_entry(req, &client_list, rq_list) {
333                 LASSERT(!req->rq_export->exp_delayed);
334                 if (!req->rq_export->exp_vbr_failed)
335                         lut_client_epoch_update(&env, req->rq_export);
336         }
337         /** return list back at once */
338         cfs_spin_lock(&lut->lut_obd->obd_recovery_task_lock);
339         cfs_list_splice_init(&client_list, &lut->lut_obd->obd_final_req_queue);
340         cfs_spin_unlock(&lut->lut_obd->obd_recovery_task_lock);
341         /** update server epoch */
342         lut_server_data_update(&env, lut, 1);
343         lu_env_fini(&env);
344 }
345 EXPORT_SYMBOL(lut_boot_epoch_update);
346
347 /**
348  * commit callback, need to update last_commited value
349  */
350 struct lut_last_committed_callback {
351         struct dt_txn_commit_cb llcc_cb;
352         struct lu_target       *llcc_lut;
353         struct obd_export      *llcc_exp;
354         __u64                   llcc_transno;
355 };
356
357 void lut_cb_last_committed(struct lu_env *env, struct thandle *th,
358                            struct dt_txn_commit_cb *cb, int err)
359 {
360         struct lut_last_committed_callback *ccb;
361
362         ccb = container_of0(cb, struct lut_last_committed_callback, llcc_cb);
363
364         LASSERT(ccb->llcc_exp->exp_obd == ccb->llcc_lut->lut_obd);
365
366         cfs_spin_lock(&ccb->llcc_lut->lut_translock);
367         if (ccb->llcc_transno > ccb->llcc_lut->lut_obd->obd_last_committed)
368                 ccb->llcc_lut->lut_obd->obd_last_committed = ccb->llcc_transno;
369
370         LASSERT(ccb->llcc_exp);
371         if (ccb->llcc_transno > ccb->llcc_exp->exp_last_committed) {
372                 ccb->llcc_exp->exp_last_committed = ccb->llcc_transno;
373                 cfs_spin_unlock(&ccb->llcc_lut->lut_translock);
374                 ptlrpc_commit_replies(ccb->llcc_exp);
375         } else {
376                 cfs_spin_unlock(&ccb->llcc_lut->lut_translock);
377         }
378         class_export_cb_put(ccb->llcc_exp);
379         if (ccb->llcc_transno)
380                 CDEBUG(D_HA, "%s: transno "LPD64" is committed\n",
381                        ccb->llcc_lut->lut_obd->obd_name, ccb->llcc_transno);
382         cfs_list_del(&ccb->llcc_cb.dcb_linkage);
383         OBD_FREE_PTR(ccb);
384 }
385
386 int lut_last_commit_cb_add(struct thandle *th, struct lu_target *lut,
387                            struct obd_export *exp, __u64 transno)
388 {
389         struct lut_last_committed_callback *ccb;
390         int rc;
391
392         OBD_ALLOC_PTR(ccb);
393         if (ccb == NULL)
394                 return -ENOMEM;
395
396         ccb->llcc_cb.dcb_func = lut_cb_last_committed;
397         CFS_INIT_LIST_HEAD(&ccb->llcc_cb.dcb_linkage);
398         ccb->llcc_lut = lut;
399         ccb->llcc_exp = class_export_cb_get(exp);
400         ccb->llcc_transno = transno;
401
402         rc = dt_trans_cb_add(th, &ccb->llcc_cb);
403         if (rc) {
404                 class_export_cb_put(exp);
405                 OBD_FREE_PTR(ccb);
406         }
407         return rc;
408 }
409 EXPORT_SYMBOL(lut_last_commit_cb_add);
410
411 struct lut_new_client_callback {
412         struct dt_txn_commit_cb lncc_cb;
413         struct obd_export      *lncc_exp;
414 };
415
416 void lut_cb_new_client(struct lu_env *env, struct thandle *th,
417                        struct dt_txn_commit_cb *cb, int err)
418 {
419         struct lut_new_client_callback *ccb;
420
421         ccb = container_of0(cb, struct lut_new_client_callback, lncc_cb);
422
423         LASSERT(ccb->lncc_exp->exp_obd);
424
425         CDEBUG(D_RPCTRACE, "%s: committing for initial connect of %s\n",
426                ccb->lncc_exp->exp_obd->obd_name,
427                ccb->lncc_exp->exp_client_uuid.uuid);
428
429         cfs_spin_lock(&ccb->lncc_exp->exp_lock);
430         ccb->lncc_exp->exp_need_sync = 0;
431         cfs_spin_unlock(&ccb->lncc_exp->exp_lock);
432         class_export_cb_put(ccb->lncc_exp);
433
434         cfs_list_del(&ccb->lncc_cb.dcb_linkage);
435         OBD_FREE_PTR(ccb);
436 }
437
438 int lut_new_client_cb_add(struct thandle *th, struct obd_export *exp)
439 {
440         struct lut_new_client_callback *ccb;
441         int rc;
442
443         OBD_ALLOC_PTR(ccb);
444         if (ccb == NULL)
445                 return -ENOMEM;
446
447         ccb->lncc_cb.dcb_func = lut_cb_new_client;
448         CFS_INIT_LIST_HEAD(&ccb->lncc_cb.dcb_linkage);
449         ccb->lncc_exp = class_export_cb_get(exp);
450
451         rc = dt_trans_cb_add(th, &ccb->lncc_cb);
452         if (rc) {
453                 class_export_cb_put(exp);
454                 OBD_FREE_PTR(ccb);
455         }
456         return rc;
457 }
458 EXPORT_SYMBOL(lut_new_client_cb_add);
459
460 int lut_init(const struct lu_env *env, struct lu_target *lut,
461              struct obd_device *obd, struct dt_device *dt)
462 {
463         struct lu_fid fid;
464         struct dt_object *o;
465         int rc = 0;
466         ENTRY;
467
468         LASSERT(lut);
469         LASSERT(obd);
470         lut->lut_obd = obd;
471         lut->lut_bottom = dt;
472         lut->lut_last_rcvd = NULL;
473         obd->u.obt.obt_lut = lut;
474
475         cfs_spin_lock_init(&lut->lut_translock);
476         cfs_spin_lock_init(&lut->lut_client_bitmap_lock);
477
478         OBD_ALLOC(lut->lut_client_bitmap, LR_MAX_CLIENTS >> 3);
479         if (lut->lut_client_bitmap == NULL)
480                 RETURN(-ENOMEM);
481
482         /** obdfilter has no lu_device stack yet */
483         if (dt == NULL)
484                 RETURN(rc);
485         o = dt_store_open(env, lut->lut_bottom, "", LAST_RCVD, &fid);
486         if (!IS_ERR(o)) {
487                 lut->lut_last_rcvd = o;
488         } else {
489                 OBD_FREE(lut->lut_client_bitmap, LR_MAX_CLIENTS >> 3);
490                 lut->lut_client_bitmap = NULL;
491                 rc = PTR_ERR(o);
492                 CERROR("cannot open %s: rc = %d\n", LAST_RCVD, rc);
493         }
494
495         RETURN(rc);
496 }
497 EXPORT_SYMBOL(lut_init);
498
499 void lut_fini(const struct lu_env *env, struct lu_target *lut)
500 {
501         ENTRY;
502
503         if (lut->lut_client_bitmap) {
504                 OBD_FREE(lut->lut_client_bitmap, LR_MAX_CLIENTS >> 3);
505                 lut->lut_client_bitmap = NULL;
506         }
507         if (lut->lut_last_rcvd) {
508                 lu_object_put(env, &lut->lut_last_rcvd->do_lu);
509                 lut->lut_last_rcvd = NULL;
510         }
511         EXIT;
512 }
513 EXPORT_SYMBOL(lut_fini);