Whamcloud - gitweb
0b4f43999f5f7ed51b4741672f7c274c14131102
[fs/lustre-release.git] / lustre / ptlrpc / target.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright (c) 2009, 2010, Oracle and/or its affiliates. All rights reserved.
30  * Use is subject to license terms.
31  * Copyright (c) 2011 Whamcloud, Inc.
32  */
33 /*
34  * This file is part of Lustre, http://www.lustre.org/
35  * Lustre is a trademark of Sun Microsystems, Inc.
36  *
37  * Lustre Common Target
38  * These are common function for MDT and OST recovery-related functionality
39  *
40  *   Author: Mikhail Pershin <tappro@sun.com>
41  */
42
43 #include <obd.h>
44 #include <lustre_fsfilt.h>
45 #include <obd_class.h>
46
47 /**
48  * Update client data in last_rcvd file. An obd API
49  */
50 static int obt_client_data_update(struct obd_export *exp)
51 {
52         struct tg_export_data *ted = &exp->exp_target_data;
53         struct obd_device_target *obt = &exp->exp_obd->u.obt;
54         struct lu_target *lut = class_exp2tgt(exp);
55         loff_t off = ted->ted_lr_off;
56         int rc = 0;
57
58         rc = fsfilt_write_record(exp->exp_obd, obt->obt_rcvd_filp,
59                                  ted->ted_lcd, sizeof(*ted->ted_lcd), &off, 0);
60
61         CDEBUG(D_INFO, "update client idx %u last_epoch %#x (%#x)\n",
62                ted->ted_lr_idx, le32_to_cpu(ted->ted_lcd->lcd_last_epoch),
63                le32_to_cpu(lut->lut_lsd.lsd_start_epoch));
64
65         return rc;
66 }
67
68 /**
69  * Update server data in last_rcvd file. An obd API
70  */
71 int obt_server_data_update(struct lu_target *lut, int force_sync)
72 {
73         struct obd_device_target *obt = &lut->lut_obd->u.obt;
74         loff_t off = 0;
75         int rc;
76         ENTRY;
77
78         CDEBUG(D_SUPER,
79                "%s: mount_count is "LPU64", last_transno is "LPU64"\n",
80                lut->lut_lsd.lsd_uuid,
81                le64_to_cpu(lut->lut_lsd.lsd_mount_count),
82                le64_to_cpu(lut->lut_lsd.lsd_last_transno));
83
84         rc = fsfilt_write_record(lut->lut_obd, obt->obt_rcvd_filp,
85                                  &lut->lut_lsd, sizeof(lut->lut_lsd),
86                                  &off, force_sync);
87         if (rc)
88                 CERROR("error writing lr_server_data: rc = %d\n", rc);
89
90         RETURN(rc);
91 }
92
93 /**
94  * Update client epoch with server's one
95  */
96 void obt_client_epoch_update(struct obd_export *exp)
97 {
98         struct lsd_client_data *lcd = exp->exp_target_data.ted_lcd;
99         struct lu_target *lut = class_exp2tgt(exp);
100
101         /** VBR: set client last_epoch to current epoch */
102         if (le32_to_cpu(lcd->lcd_last_epoch) >=
103             le32_to_cpu(lut->lut_lsd.lsd_start_epoch))
104                 return;
105         lcd->lcd_last_epoch = lut->lut_lsd.lsd_start_epoch;
106         obt_client_data_update(exp);
107 }
108
109 /**
110  * Increment server epoch. An obd API
111  */
112 static void obt_boot_epoch_update(struct lu_target *lut)
113 {
114         struct obd_device *obd = lut->lut_obd;
115         __u32 start_epoch;
116         struct ptlrpc_request *req;
117         cfs_list_t client_list;
118
119         cfs_spin_lock(&lut->lut_translock);
120         start_epoch = lr_epoch(le64_to_cpu(lut->lut_last_transno)) + 1;
121         lut->lut_last_transno = cpu_to_le64((__u64)start_epoch <<
122                                             LR_EPOCH_BITS);
123         lut->lut_lsd.lsd_start_epoch = cpu_to_le32(start_epoch);
124         cfs_spin_unlock(&lut->lut_translock);
125
126         CFS_INIT_LIST_HEAD(&client_list);
127         cfs_spin_lock(&obd->obd_recovery_task_lock);
128         cfs_list_splice_init(&obd->obd_final_req_queue, &client_list);
129         cfs_spin_unlock(&obd->obd_recovery_task_lock);
130
131         /**
132          * go through list of exports participated in recovery and
133          * set new epoch for them
134          */
135         cfs_list_for_each_entry(req, &client_list, rq_list) {
136                 LASSERT(!req->rq_export->exp_delayed);
137                 obt_client_epoch_update(req->rq_export);
138         }
139         /** return list back at once */
140         cfs_spin_lock(&obd->obd_recovery_task_lock);
141         cfs_list_splice_init(&client_list, &obd->obd_final_req_queue);
142         cfs_spin_unlock(&obd->obd_recovery_task_lock);
143         obt_server_data_update(lut, 1);
144 }
145
146 /**
147  * write data in last_rcvd file.
148  */
149 static int lut_last_rcvd_write(const struct lu_env *env, struct lu_target *lut,
150                                const struct lu_buf *buf, loff_t *off, int sync)
151 {
152         struct thandle *th;
153         int rc;
154         ENTRY;
155
156         th = dt_trans_create(env, lut->lut_bottom);
157         if (IS_ERR(th))
158                 RETURN(PTR_ERR(th));
159
160         rc = dt_declare_record_write(env, lut->lut_last_rcvd,
161                                      buf->lb_len, *off, th);
162         if (rc)
163                 goto stop;
164
165         rc = dt_trans_start(env, lut->lut_bottom, th);
166         if (rc)
167                 goto stop;
168
169         rc = dt_record_write(env, lut->lut_last_rcvd, buf, off, th);
170
171 stop:
172         dt_trans_stop(env, lut->lut_bottom, th);
173
174         CDEBUG(D_INFO, "write last_rcvd header rc = %d:\n"
175                "uuid = %s\nlast_transno = "LPU64"\n",
176                rc, lut->lut_lsd.lsd_uuid, lut->lut_lsd.lsd_last_transno);
177
178         RETURN(rc);
179 }
180
181 /**
182  * Allocate in-memory data for client slot related to export.
183  */
184 int lut_client_alloc(struct obd_export *exp)
185 {
186         LASSERT(exp != exp->exp_obd->obd_self_export);
187
188         OBD_ALLOC_PTR(exp->exp_target_data.ted_lcd);
189         if (exp->exp_target_data.ted_lcd == NULL)
190                 RETURN(-ENOMEM);
191         /* Mark that slot is not yet valid, 0 doesn't work here */
192         exp->exp_target_data.ted_lr_idx = -1;
193         RETURN(0);
194 }
195 EXPORT_SYMBOL(lut_client_alloc);
196
197 /**
198  * Free in-memory data for client slot related to export.
199  */
200 void lut_client_free(struct obd_export *exp)
201 {
202         struct tg_export_data *ted = &exp->exp_target_data;
203         struct lu_target *lut = class_exp2tgt(exp);
204
205         LASSERT(exp != exp->exp_obd->obd_self_export);
206
207         OBD_FREE_PTR(ted->ted_lcd);
208         ted->ted_lcd = NULL;
209
210         /* Slot may be not yet assigned */
211         if (ted->ted_lr_idx < 0)
212                 return;
213         /* Clear bit when lcd is freed */
214         cfs_spin_lock(&lut->lut_client_bitmap_lock);
215         if (!cfs_test_and_clear_bit(ted->ted_lr_idx, lut->lut_client_bitmap)) {
216                 CERROR("%s: client %u bit already clear in bitmap\n",
217                        exp->exp_obd->obd_name, ted->ted_lr_idx);
218                 LBUG();
219         }
220         cfs_spin_unlock(&lut->lut_client_bitmap_lock);
221 }
222 EXPORT_SYMBOL(lut_client_free);
223
224 /**
225  * Update client data in last_rcvd
226  */
227 int lut_client_data_update(const struct lu_env *env, struct obd_export *exp)
228 {
229         struct tg_export_data *ted = &exp->exp_target_data;
230         struct lu_target *lut = class_exp2tgt(exp);
231         struct lsd_client_data tmp_lcd;
232         loff_t tmp_off = ted->ted_lr_off;
233         struct lu_buf tmp_buf = {
234                                         .lb_buf = &tmp_lcd,
235                                         .lb_len = sizeof(tmp_lcd)
236                                 };
237         int rc = 0;
238
239         lcd_cpu_to_le(ted->ted_lcd, &tmp_lcd);
240         LASSERT(lut->lut_last_rcvd);
241         rc = lut_last_rcvd_write(env, lut, &tmp_buf, &tmp_off, 0);
242
243         return rc;
244 }
245
246 /**
247  * Update server data in last_rcvd
248  */
249 int lut_server_data_update(const struct lu_env *env,
250                            struct lu_target *lut, int sync)
251 {
252         struct lr_server_data tmp_lsd;
253         loff_t tmp_off = 0;
254         struct lu_buf tmp_buf = {
255                                         .lb_buf = &tmp_lsd,
256                                         .lb_len = sizeof(tmp_lsd)
257                                 };
258         int rc = 0;
259         ENTRY;
260
261         CDEBUG(D_SUPER,
262                "%s: mount_count is "LPU64", last_transno is "LPU64"\n",
263                lut->lut_lsd.lsd_uuid, lut->lut_obd->u.obt.obt_mount_count,
264                lut->lut_last_transno);
265
266         cfs_spin_lock(&lut->lut_translock);
267         lut->lut_lsd.lsd_last_transno = lut->lut_last_transno;
268         cfs_spin_unlock(&lut->lut_translock);
269
270         lsd_cpu_to_le(&lut->lut_lsd, &tmp_lsd);
271         if (lut->lut_last_rcvd != NULL)
272                 rc = lut_last_rcvd_write(env, lut, &tmp_buf, &tmp_off, sync);
273         RETURN(rc);
274 }
275
276 void lut_client_epoch_update(const struct lu_env *env, struct obd_export *exp)
277 {
278         struct lsd_client_data *lcd = exp->exp_target_data.ted_lcd;
279         struct lu_target *lut = class_exp2tgt(exp);
280
281         LASSERT(lut->lut_bottom);
282         /** VBR: set client last_epoch to current epoch */
283         if (lcd->lcd_last_epoch >= lut->lut_lsd.lsd_start_epoch)
284                 return;
285         lcd->lcd_last_epoch = lut->lut_lsd.lsd_start_epoch;
286         lut_client_data_update(env, exp);
287 }
288
289 /**
290  * Update boot epoch when recovery ends
291  */
292 void lut_boot_epoch_update(struct lu_target *lut)
293 {
294         struct lu_env env;
295         struct ptlrpc_request *req;
296         __u32 start_epoch;
297         cfs_list_t client_list;
298         int rc;
299
300         if (lut->lut_obd->obd_stopping)
301                 return;
302         /** Increase server epoch after recovery */
303         if (lut->lut_bottom == NULL)
304                 return obt_boot_epoch_update(lut);
305
306         rc = lu_env_init(&env, LCT_DT_THREAD);
307         if (rc) {
308                 CERROR("Can't initialize environment rc=%d\n", rc);
309                 return;
310         }
311
312         cfs_spin_lock(&lut->lut_translock);
313         start_epoch = lr_epoch(lut->lut_last_transno) + 1;
314         lut->lut_last_transno = (__u64)start_epoch << LR_EPOCH_BITS;
315         lut->lut_lsd.lsd_start_epoch = start_epoch;
316         cfs_spin_unlock(&lut->lut_translock);
317
318         CFS_INIT_LIST_HEAD(&client_list);
319         /**
320          * The recovery is not yet finished and final queue can still be updated
321          * with resend requests. Move final list to separate one for processing
322          */
323         cfs_spin_lock(&lut->lut_obd->obd_recovery_task_lock);
324         cfs_list_splice_init(&lut->lut_obd->obd_final_req_queue, &client_list);
325         cfs_spin_unlock(&lut->lut_obd->obd_recovery_task_lock);
326
327         /**
328          * go through list of exports participated in recovery and
329          * set new epoch for them
330          */
331         cfs_list_for_each_entry(req, &client_list, rq_list) {
332                 LASSERT(!req->rq_export->exp_delayed);
333                 if (!req->rq_export->exp_vbr_failed)
334                         lut_client_epoch_update(&env, req->rq_export);
335         }
336         /** return list back at once */
337         cfs_spin_lock(&lut->lut_obd->obd_recovery_task_lock);
338         cfs_list_splice_init(&client_list, &lut->lut_obd->obd_final_req_queue);
339         cfs_spin_unlock(&lut->lut_obd->obd_recovery_task_lock);
340         /** update server epoch */
341         lut_server_data_update(&env, lut, 1);
342         lu_env_fini(&env);
343 }
344 EXPORT_SYMBOL(lut_boot_epoch_update);
345
346 /**
347  * commit callback, need to update last_commited value
348  */
349 struct lut_last_committed_callback {
350         struct dt_txn_commit_cb llcc_cb;
351         struct lu_target       *llcc_lut;
352         struct obd_export      *llcc_exp;
353         __u64                   llcc_transno;
354 };
355
356 void lut_cb_last_committed(struct lu_env *env, struct thandle *th,
357                            struct dt_txn_commit_cb *cb, int err)
358 {
359         struct lut_last_committed_callback *ccb;
360
361         ccb = container_of0(cb, struct lut_last_committed_callback, llcc_cb);
362
363         LASSERT(ccb->llcc_exp->exp_obd == ccb->llcc_lut->lut_obd);
364
365         cfs_spin_lock(&ccb->llcc_lut->lut_translock);
366         if (ccb->llcc_transno > ccb->llcc_lut->lut_obd->obd_last_committed)
367                 ccb->llcc_lut->lut_obd->obd_last_committed = ccb->llcc_transno;
368
369         LASSERT(ccb->llcc_exp);
370         if (ccb->llcc_transno > ccb->llcc_exp->exp_last_committed) {
371                 ccb->llcc_exp->exp_last_committed = ccb->llcc_transno;
372                 cfs_spin_unlock(&ccb->llcc_lut->lut_translock);
373                 ptlrpc_commit_replies(ccb->llcc_exp);
374         } else {
375                 cfs_spin_unlock(&ccb->llcc_lut->lut_translock);
376         }
377         class_export_cb_put(ccb->llcc_exp);
378         if (ccb->llcc_transno)
379                 CDEBUG(D_HA, "%s: transno "LPD64" is committed\n",
380                        ccb->llcc_lut->lut_obd->obd_name, ccb->llcc_transno);
381         cfs_list_del(&ccb->llcc_cb.dcb_linkage);
382         OBD_FREE_PTR(ccb);
383 }
384
385 int lut_last_commit_cb_add(struct thandle *th, struct lu_target *lut,
386                            struct obd_export *exp, __u64 transno)
387 {
388         struct lut_last_committed_callback *ccb;
389         int rc;
390
391         OBD_ALLOC_PTR(ccb);
392         if (ccb == NULL)
393                 return -ENOMEM;
394
395         ccb->llcc_cb.dcb_func = lut_cb_last_committed;
396         CFS_INIT_LIST_HEAD(&ccb->llcc_cb.dcb_linkage);
397         ccb->llcc_lut = lut;
398         ccb->llcc_exp = class_export_cb_get(exp);
399         ccb->llcc_transno = transno;
400
401         rc = dt_trans_cb_add(th, &ccb->llcc_cb);
402         if (rc) {
403                 class_export_cb_put(exp);
404                 OBD_FREE_PTR(ccb);
405         }
406         return rc;
407 }
408 EXPORT_SYMBOL(lut_last_commit_cb_add);
409
410 struct lut_new_client_callback {
411         struct dt_txn_commit_cb lncc_cb;
412         struct obd_export      *lncc_exp;
413 };
414
415 void lut_cb_new_client(struct lu_env *env, struct thandle *th,
416                        struct dt_txn_commit_cb *cb, int err)
417 {
418         struct lut_new_client_callback *ccb;
419
420         ccb = container_of0(cb, struct lut_new_client_callback, lncc_cb);
421
422         LASSERT(ccb->lncc_exp->exp_obd);
423
424         CDEBUG(D_RPCTRACE, "%s: committing for initial connect of %s\n",
425                ccb->lncc_exp->exp_obd->obd_name,
426                ccb->lncc_exp->exp_client_uuid.uuid);
427
428         cfs_spin_lock(&ccb->lncc_exp->exp_lock);
429         ccb->lncc_exp->exp_need_sync = 0;
430         cfs_spin_unlock(&ccb->lncc_exp->exp_lock);
431         class_export_cb_put(ccb->lncc_exp);
432
433         cfs_list_del(&ccb->lncc_cb.dcb_linkage);
434         OBD_FREE_PTR(ccb);
435 }
436
437 int lut_new_client_cb_add(struct thandle *th, struct obd_export *exp)
438 {
439         struct lut_new_client_callback *ccb;
440         int rc;
441
442         OBD_ALLOC_PTR(ccb);
443         if (ccb == NULL)
444                 return -ENOMEM;
445
446         ccb->lncc_cb.dcb_func = lut_cb_new_client;
447         CFS_INIT_LIST_HEAD(&ccb->lncc_cb.dcb_linkage);
448         ccb->lncc_exp = class_export_cb_get(exp);
449
450         rc = dt_trans_cb_add(th, &ccb->lncc_cb);
451         if (rc) {
452                 class_export_cb_put(exp);
453                 OBD_FREE_PTR(ccb);
454         }
455         return rc;
456 }
457 EXPORT_SYMBOL(lut_new_client_cb_add);
458
459 int lut_init(const struct lu_env *env, struct lu_target *lut,
460              struct obd_device *obd, struct dt_device *dt)
461 {
462         struct lu_fid fid;
463         struct dt_object *o;
464         int rc = 0;
465         ENTRY;
466
467         LASSERT(lut);
468         LASSERT(obd);
469         lut->lut_obd = obd;
470         lut->lut_bottom = dt;
471         lut->lut_last_rcvd = NULL;
472         obd->u.obt.obt_lut = lut;
473
474         cfs_spin_lock_init(&lut->lut_translock);
475         cfs_spin_lock_init(&lut->lut_client_bitmap_lock);
476
477         OBD_ALLOC(lut->lut_client_bitmap, LR_MAX_CLIENTS >> 3);
478         if (lut->lut_client_bitmap == NULL)
479                 RETURN(-ENOMEM);
480
481         /** obdfilter has no lu_device stack yet */
482         if (dt == NULL)
483                 RETURN(rc);
484         o = dt_store_open(env, lut->lut_bottom, "", LAST_RCVD, &fid);
485         if (!IS_ERR(o)) {
486                 lut->lut_last_rcvd = o;
487         } else {
488                 OBD_FREE(lut->lut_client_bitmap, LR_MAX_CLIENTS >> 3);
489                 lut->lut_client_bitmap = NULL;
490                 rc = PTR_ERR(o);
491                 CERROR("cannot open %s: rc = %d\n", LAST_RCVD, rc);
492         }
493
494         RETURN(rc);
495 }
496 EXPORT_SYMBOL(lut_init);
497
498 void lut_fini(const struct lu_env *env, struct lu_target *lut)
499 {
500         ENTRY;
501
502         if (lut->lut_client_bitmap) {
503                 OBD_FREE(lut->lut_client_bitmap, LR_MAX_CLIENTS >> 3);
504                 lut->lut_client_bitmap = NULL;
505         }
506         if (lut->lut_last_rcvd) {
507                 lu_object_put(env, &lut->lut_last_rcvd->do_lu);
508                 lut->lut_last_rcvd = NULL;
509         }
510         EXIT;
511 }
512 EXPORT_SYMBOL(lut_fini);