Whamcloud - gitweb
0de17bfad2cbc25df2d837fcd5b09012cd29f01d
[fs/lustre-release.git] / lustre / ptlrpc / target.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright (c) 2009, 2010, Oracle and/or its affiliates. All rights reserved.
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * Lustre Common Target
37  * These are common function for MDT and OST recovery-related functionality
38  *
39  *   Author: Mikhail Pershin <tappro@sun.com>
40  */
41
42 #include <obd.h>
43 #include <lustre_fsfilt.h>
44 #include <obd_class.h>
45
46 /**
47  * Update client data in last_rcvd file. An obd API
48  */
49 static int obt_client_data_update(struct obd_export *exp)
50 {
51         struct tg_export_data *ted = &exp->exp_target_data;
52         struct obd_device_target *obt = &exp->exp_obd->u.obt;
53         struct lu_target *lut = class_exp2tgt(exp);
54         loff_t off = ted->ted_lr_off;
55         int rc = 0;
56
57         rc = fsfilt_write_record(exp->exp_obd, obt->obt_rcvd_filp,
58                                  ted->ted_lcd, sizeof(*ted->ted_lcd), &off, 0);
59
60         CDEBUG(D_INFO, "update client idx %u last_epoch %#x (%#x)\n",
61                ted->ted_lr_idx, le32_to_cpu(ted->ted_lcd->lcd_last_epoch),
62                le32_to_cpu(lut->lut_lsd.lsd_start_epoch));
63
64         return rc;
65 }
66
67 /**
68  * Update server data in last_rcvd file. An obd API
69  */
70 int obt_server_data_update(struct lu_target *lut, int force_sync)
71 {
72         struct obd_device_target *obt = &lut->lut_obd->u.obt;
73         loff_t off = 0;
74         int rc;
75         ENTRY;
76
77         CDEBUG(D_SUPER,
78                "%s: mount_count is "LPU64", last_transno is "LPU64"\n",
79                lut->lut_lsd.lsd_uuid,
80                le64_to_cpu(lut->lut_lsd.lsd_mount_count),
81                le64_to_cpu(lut->lut_lsd.lsd_last_transno));
82
83         rc = fsfilt_write_record(lut->lut_obd, obt->obt_rcvd_filp,
84                                  &lut->lut_lsd, sizeof(lut->lut_lsd),
85                                  &off, force_sync);
86         if (rc)
87                 CERROR("error writing lr_server_data: rc = %d\n", rc);
88
89         RETURN(rc);
90 }
91
92 /**
93  * Update client epoch with server's one
94  */
95 void obt_client_epoch_update(struct obd_export *exp)
96 {
97         struct lsd_client_data *lcd = exp->exp_target_data.ted_lcd;
98         struct lu_target *lut = class_exp2tgt(exp);
99
100         /** VBR: set client last_epoch to current epoch */
101         if (le32_to_cpu(lcd->lcd_last_epoch) >=
102             le32_to_cpu(lut->lut_lsd.lsd_start_epoch))
103                 return;
104         lcd->lcd_last_epoch = lut->lut_lsd.lsd_start_epoch;
105         obt_client_data_update(exp);
106 }
107
108 /**
109  * Increment server epoch. An obd API
110  */
111 static void obt_boot_epoch_update(struct lu_target *lut)
112 {
113         struct obd_device *obd = lut->lut_obd;
114         __u32 start_epoch;
115         struct ptlrpc_request *req;
116         cfs_list_t client_list;
117
118         cfs_spin_lock(&lut->lut_translock);
119         start_epoch = lr_epoch(le64_to_cpu(lut->lut_last_transno)) + 1;
120         lut->lut_last_transno = cpu_to_le64((__u64)start_epoch <<
121                                             LR_EPOCH_BITS);
122         lut->lut_lsd.lsd_start_epoch = cpu_to_le32(start_epoch);
123         cfs_spin_unlock(&lut->lut_translock);
124
125         CFS_INIT_LIST_HEAD(&client_list);
126         cfs_spin_lock(&obd->obd_recovery_task_lock);
127         cfs_list_splice_init(&obd->obd_final_req_queue, &client_list);
128         cfs_spin_unlock(&obd->obd_recovery_task_lock);
129
130         /**
131          * go through list of exports participated in recovery and
132          * set new epoch for them
133          */
134         cfs_list_for_each_entry(req, &client_list, rq_list) {
135                 LASSERT(!req->rq_export->exp_delayed);
136                 obt_client_epoch_update(req->rq_export);
137         }
138         /** return list back at once */
139         cfs_spin_lock(&obd->obd_recovery_task_lock);
140         cfs_list_splice_init(&client_list, &obd->obd_final_req_queue);
141         cfs_spin_unlock(&obd->obd_recovery_task_lock);
142         obt_server_data_update(lut, 1);
143 }
144
145 /**
146  * write data in last_rcvd file.
147  */
148 static int lut_last_rcvd_write(const struct lu_env *env, struct lu_target *lut,
149                                const struct lu_buf *buf, loff_t *off, int sync)
150 {
151         struct thandle *th;
152         struct txn_param p;
153         int rc, credits;
154         ENTRY;
155
156         credits = lut->lut_bottom->dd_ops->dt_credit_get(env, lut->lut_bottom,
157                                                          DTO_WRITE_BLOCK);
158         txn_param_init(&p, credits);
159
160         th = dt_trans_start(env, lut->lut_bottom, &p);
161         if (IS_ERR(th))
162                 RETURN(PTR_ERR(th));
163
164         rc = dt_record_write(env, lut->lut_last_rcvd, buf, off, th);
165         dt_trans_stop(env, lut->lut_bottom, th);
166
167         CDEBUG(D_INFO, "write last_rcvd header rc = %d:\n"
168                "uuid = %s\nlast_transno = "LPU64"\n",
169                rc, lut->lut_lsd.lsd_uuid, lut->lut_lsd.lsd_last_transno);
170
171         RETURN(rc);
172 }
173
174 /**
175  * Allocate in-memory data for client slot related to export.
176  */
177 int lut_client_alloc(struct obd_export *exp)
178 {
179         LASSERT(exp != exp->exp_obd->obd_self_export);
180
181         OBD_ALLOC_PTR(exp->exp_target_data.ted_lcd);
182         if (exp->exp_target_data.ted_lcd == NULL)
183                 RETURN(-ENOMEM);
184         /* Mark that slot is not yet valid, 0 doesn't work here */
185         exp->exp_target_data.ted_lr_idx = -1;
186         RETURN(0);
187 }
188 EXPORT_SYMBOL(lut_client_alloc);
189
190 /**
191  * Free in-memory data for client slot related to export.
192  */
193 void lut_client_free(struct obd_export *exp)
194 {
195         struct tg_export_data *ted = &exp->exp_target_data;
196         struct lu_target *lut = class_exp2tgt(exp);
197
198         LASSERT(exp != exp->exp_obd->obd_self_export);
199
200         OBD_FREE_PTR(ted->ted_lcd);
201         ted->ted_lcd = NULL;
202
203         /* Slot may be not yet assigned */
204         if (ted->ted_lr_idx < 0)
205                 return;
206         /* Clear bit when lcd is freed */
207         cfs_spin_lock(&lut->lut_client_bitmap_lock);
208         if (!cfs_test_and_clear_bit(ted->ted_lr_idx, lut->lut_client_bitmap)) {
209                 CERROR("%s: client %u bit already clear in bitmap\n",
210                        exp->exp_obd->obd_name, ted->ted_lr_idx);
211                 LBUG();
212         }
213         cfs_spin_unlock(&lut->lut_client_bitmap_lock);
214 }
215 EXPORT_SYMBOL(lut_client_free);
216
217 /**
218  * Update client data in last_rcvd
219  */
220 int lut_client_data_update(const struct lu_env *env, struct obd_export *exp)
221 {
222         struct tg_export_data *ted = &exp->exp_target_data;
223         struct lu_target *lut = class_exp2tgt(exp);
224         struct lsd_client_data tmp_lcd;
225         loff_t tmp_off = ted->ted_lr_off;
226         struct lu_buf tmp_buf = {
227                                         .lb_buf = &tmp_lcd,
228                                         .lb_len = sizeof(tmp_lcd)
229                                 };
230         int rc = 0;
231
232         lcd_cpu_to_le(ted->ted_lcd, &tmp_lcd);
233         LASSERT(lut->lut_last_rcvd);
234         rc = lut_last_rcvd_write(env, lut, &tmp_buf, &tmp_off, 0);
235
236         return rc;
237 }
238
239 /**
240  * Update server data in last_rcvd
241  */
242 static int lut_server_data_update(const struct lu_env *env,
243                                   struct lu_target *lut, int sync)
244 {
245         struct lr_server_data tmp_lsd;
246         loff_t tmp_off = 0;
247         struct lu_buf tmp_buf = {
248                                         .lb_buf = &tmp_lsd,
249                                         .lb_len = sizeof(tmp_lsd)
250                                 };
251         int rc = 0;
252         ENTRY;
253
254         CDEBUG(D_SUPER,
255                "%s: mount_count is "LPU64", last_transno is "LPU64"\n",
256                lut->lut_lsd.lsd_uuid, lut->lut_obd->u.obt.obt_mount_count,
257                lut->lut_last_transno);
258
259         cfs_spin_lock(&lut->lut_translock);
260         lut->lut_lsd.lsd_last_transno = lut->lut_last_transno;
261         cfs_spin_unlock(&lut->lut_translock);
262
263         lsd_cpu_to_le(&lut->lut_lsd, &tmp_lsd);
264         if (lut->lut_last_rcvd != NULL)
265                 rc = lut_last_rcvd_write(env, lut, &tmp_buf, &tmp_off, sync);
266         RETURN(rc);
267 }
268
269 void lut_client_epoch_update(const struct lu_env *env, struct obd_export *exp)
270 {
271         struct lsd_client_data *lcd = exp->exp_target_data.ted_lcd;
272         struct lu_target *lut = class_exp2tgt(exp);
273
274         LASSERT(lut->lut_bottom);
275         /** VBR: set client last_epoch to current epoch */
276         if (lcd->lcd_last_epoch >= lut->lut_lsd.lsd_start_epoch)
277                 return;
278         lcd->lcd_last_epoch = lut->lut_lsd.lsd_start_epoch;
279         lut_client_data_update(env, exp);
280 }
281
282 /**
283  * Update boot epoch when recovery ends
284  */
285 void lut_boot_epoch_update(struct lu_target *lut)
286 {
287         struct lu_env env;
288         struct ptlrpc_request *req;
289         __u32 start_epoch;
290         cfs_list_t client_list;
291         int rc;
292
293         if (lut->lut_obd->obd_stopping)
294                 return;
295         /** Increase server epoch after recovery */
296         if (lut->lut_bottom == NULL)
297                 return obt_boot_epoch_update(lut);
298
299         rc = lu_env_init(&env, LCT_DT_THREAD);
300         if (rc) {
301                 CERROR("Can't initialize environment rc=%d\n", rc);
302                 return;
303         }
304
305         cfs_spin_lock(&lut->lut_translock);
306         start_epoch = lr_epoch(lut->lut_last_transno) + 1;
307         lut->lut_last_transno = (__u64)start_epoch << LR_EPOCH_BITS;
308         lut->lut_lsd.lsd_start_epoch = start_epoch;
309         cfs_spin_unlock(&lut->lut_translock);
310
311         CFS_INIT_LIST_HEAD(&client_list);
312         /**
313          * The recovery is not yet finished and final queue can still be updated
314          * with resend requests. Move final list to separate one for processing
315          */
316         cfs_spin_lock(&lut->lut_obd->obd_recovery_task_lock);
317         cfs_list_splice_init(&lut->lut_obd->obd_final_req_queue, &client_list);
318         cfs_spin_unlock(&lut->lut_obd->obd_recovery_task_lock);
319
320         /**
321          * go through list of exports participated in recovery and
322          * set new epoch for them
323          */
324         cfs_list_for_each_entry(req, &client_list, rq_list) {
325                 LASSERT(!req->rq_export->exp_delayed);
326                 if (!req->rq_export->exp_vbr_failed)
327                         lut_client_epoch_update(&env, req->rq_export);
328         }
329         /** return list back at once */
330         cfs_spin_lock(&lut->lut_obd->obd_recovery_task_lock);
331         cfs_list_splice_init(&client_list, &lut->lut_obd->obd_final_req_queue);
332         cfs_spin_unlock(&lut->lut_obd->obd_recovery_task_lock);
333         /** update server epoch */
334         lut_server_data_update(&env, lut, 1);
335         lu_env_fini(&env);
336 }
337 EXPORT_SYMBOL(lut_boot_epoch_update);
338
339 /**
340  * commit callback, need to update last_commited value
341  */
342 struct lut_last_committed_callback {
343         struct dt_txn_commit_cb llcc_cb;
344         struct lu_target       *llcc_lut;
345         struct obd_export      *llcc_exp;
346         __u64                   llcc_transno;
347 };
348
349 void lut_cb_last_committed(struct lu_env *env, struct thandle *th,
350                            struct dt_txn_commit_cb *cb, int err)
351 {
352         struct lut_last_committed_callback *ccb;
353
354         ccb = container_of0(cb, struct lut_last_committed_callback, llcc_cb);
355
356         LASSERT(ccb->llcc_exp->exp_obd == ccb->llcc_lut->lut_obd);
357
358         cfs_spin_lock(&ccb->llcc_lut->lut_translock);
359         if (ccb->llcc_transno > ccb->llcc_lut->lut_obd->obd_last_committed)
360                 ccb->llcc_lut->lut_obd->obd_last_committed = ccb->llcc_transno;
361
362         LASSERT(ccb->llcc_exp);
363         if (ccb->llcc_transno > ccb->llcc_exp->exp_last_committed) {
364                 ccb->llcc_exp->exp_last_committed = ccb->llcc_transno;
365                 cfs_spin_unlock(&ccb->llcc_lut->lut_translock);
366                 ptlrpc_commit_replies(ccb->llcc_exp);
367         } else {
368                 cfs_spin_unlock(&ccb->llcc_lut->lut_translock);
369         }
370         class_export_cb_put(ccb->llcc_exp);
371         if (ccb->llcc_transno)
372                 CDEBUG(D_HA, "%s: transno "LPD64" is committed\n",
373                        ccb->llcc_lut->lut_obd->obd_name, ccb->llcc_transno);
374         cfs_list_del(&ccb->llcc_cb.dcb_linkage);
375         OBD_FREE_PTR(ccb);
376 }
377
378 int lut_last_commit_cb_add(struct thandle *th, struct lu_target *lut,
379                            struct obd_export *exp, __u64 transno)
380 {
381         struct lut_last_committed_callback *ccb;
382         int rc;
383
384         OBD_ALLOC_PTR(ccb);
385         if (ccb == NULL)
386                 return -ENOMEM;
387
388         ccb->llcc_cb.dcb_func = lut_cb_last_committed;
389         CFS_INIT_LIST_HEAD(&ccb->llcc_cb.dcb_linkage);
390         ccb->llcc_lut = lut;
391         ccb->llcc_exp = class_export_cb_get(exp);
392         ccb->llcc_transno = transno;
393
394         rc = dt_trans_cb_add(th, &ccb->llcc_cb);
395         if (rc) {
396                 class_export_cb_put(exp);
397                 OBD_FREE_PTR(ccb);
398         }
399         return rc;
400 }
401 EXPORT_SYMBOL(lut_last_commit_cb_add);
402
403 struct lut_new_client_callback {
404         struct dt_txn_commit_cb lncc_cb;
405         struct obd_export      *lncc_exp;
406 };
407
408 void lut_cb_new_client(struct lu_env *env, struct thandle *th,
409                        struct dt_txn_commit_cb *cb, int err)
410 {
411         struct lut_new_client_callback *ccb;
412
413         ccb = container_of0(cb, struct lut_new_client_callback, lncc_cb);
414
415         LASSERT(ccb->lncc_exp->exp_obd);
416
417         CDEBUG(D_RPCTRACE, "%s: committing for initial connect of %s\n",
418                ccb->lncc_exp->exp_obd->obd_name,
419                ccb->lncc_exp->exp_client_uuid.uuid);
420
421         cfs_spin_lock(&ccb->lncc_exp->exp_lock);
422         ccb->lncc_exp->exp_need_sync = 0;
423         cfs_spin_unlock(&ccb->lncc_exp->exp_lock);
424         class_export_cb_put(ccb->lncc_exp);
425
426         cfs_list_del(&ccb->lncc_cb.dcb_linkage);
427         OBD_FREE_PTR(ccb);
428 }
429
430 int lut_new_client_cb_add(struct thandle *th, struct obd_export *exp)
431 {
432         struct lut_new_client_callback *ccb;
433         int rc;
434
435         OBD_ALLOC_PTR(ccb);
436         if (ccb == NULL)
437                 return -ENOMEM;
438
439         ccb->lncc_cb.dcb_func = lut_cb_new_client;
440         CFS_INIT_LIST_HEAD(&ccb->lncc_cb.dcb_linkage);
441         ccb->lncc_exp = class_export_cb_get(exp);
442
443         rc = dt_trans_cb_add(th, &ccb->lncc_cb);
444         if (rc) {
445                 class_export_cb_put(exp);
446                 OBD_FREE_PTR(ccb);
447         }
448         return rc;
449 }
450 EXPORT_SYMBOL(lut_new_client_cb_add);
451
452 int lut_init(const struct lu_env *env, struct lu_target *lut,
453              struct obd_device *obd, struct dt_device *dt)
454 {
455         struct lu_fid fid;
456         struct dt_object *o;
457         int rc = 0;
458         ENTRY;
459
460         LASSERT(lut);
461         LASSERT(obd);
462         lut->lut_obd = obd;
463         lut->lut_bottom = dt;
464         lut->lut_last_rcvd = NULL;
465         obd->u.obt.obt_lut = lut;
466
467         cfs_spin_lock_init(&lut->lut_translock);
468         cfs_spin_lock_init(&lut->lut_client_bitmap_lock);
469
470         OBD_ALLOC(lut->lut_client_bitmap, LR_MAX_CLIENTS >> 3);
471         if (lut->lut_client_bitmap == NULL)
472                 RETURN(-ENOMEM);
473
474         /** obdfilter has no lu_device stack yet */
475         if (dt == NULL)
476                 RETURN(rc);
477         o = dt_store_open(env, lut->lut_bottom, "", LAST_RCVD, &fid);
478         if (!IS_ERR(o)) {
479                 lut->lut_last_rcvd = o;
480         } else {
481                 OBD_FREE(lut->lut_client_bitmap, LR_MAX_CLIENTS >> 3);
482                 lut->lut_client_bitmap = NULL;
483                 rc = PTR_ERR(o);
484                 CERROR("cannot open %s: rc = %d\n", LAST_RCVD, rc);
485         }
486
487         RETURN(rc);
488 }
489 EXPORT_SYMBOL(lut_init);
490
491 void lut_fini(const struct lu_env *env, struct lu_target *lut)
492 {
493         ENTRY;
494
495         if (lut->lut_client_bitmap) {
496                 OBD_FREE(lut->lut_client_bitmap, LR_MAX_CLIENTS >> 3);
497                 lut->lut_client_bitmap = NULL;
498         }
499         if (lut->lut_last_rcvd) {
500                 lu_object_put(env, &lut->lut_last_rcvd->do_lu);
501                 lut->lut_last_rcvd = NULL;
502         }
503         EXIT;
504 }
505 EXPORT_SYMBOL(lut_fini);