Whamcloud - gitweb
Revert "b=21485 Keep in-memory client data until export destroy"
[fs/lustre-release.git] / lustre / ptlrpc / target.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright  2008 Sun Microsystems, Inc. All rights reserved
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * Lustre Common Target
37  * These are common function for MDT and OST recovery-related functionality
38  *
39  *   Author: Mikhail Pershin <tappro@sun.com>
40  */
41
42 #include <obd.h>
43 #include <lustre_fsfilt.h>
44 /**
45  * Update client data in last_rcvd file. An obd API
46  */
47 static int obt_client_data_update(struct obd_export *exp)
48 {
49         struct lu_export_data *led = &exp->exp_target_data;
50         struct obd_device_target *obt = &exp->exp_obd->u.obt;
51         loff_t off = led->led_lr_off;
52         int rc = 0;
53
54         rc = fsfilt_write_record(exp->exp_obd, obt->obt_rcvd_filp,
55                                  led->led_lcd, sizeof(*led->led_lcd), &off, 0);
56
57         CDEBUG(D_INFO, "update client idx %u last_epoch %#x (%#x)\n",
58                led->led_lr_idx, le32_to_cpu(led->led_lcd->lcd_last_epoch),
59                le32_to_cpu(obt->obt_lsd->lsd_start_epoch));
60
61         return rc;
62 }
63
64 /**
65  * Update server data in last_rcvd file. An obd API
66  */
67 int obt_server_data_update(struct obd_device *obd, int force_sync)
68 {
69         struct obd_device_target *obt = &obd->u.obt;
70         loff_t off = 0;
71         int rc;
72         ENTRY;
73
74         CDEBUG(D_SUPER,
75                "%s: mount_count is "LPU64", last_transno is "LPU64"\n",
76                obt->obt_lsd->lsd_uuid,
77                le64_to_cpu(obt->obt_lsd->lsd_mount_count),
78                le64_to_cpu(obt->obt_lsd->lsd_last_transno));
79
80         rc = fsfilt_write_record(obd, obt->obt_rcvd_filp, obt->obt_lsd,
81                                  sizeof(*obt->obt_lsd), &off, force_sync);
82         if (rc)
83                 CERROR("error writing lr_server_data: rc = %d\n", rc);
84
85         RETURN(rc);
86 }
87
88 /**
89  * Update client epoch with server's one
90  */
91 void obt_client_epoch_update(struct obd_export *exp)
92 {
93         struct lsd_client_data *lcd = exp->exp_target_data.led_lcd;
94         struct obd_device_target *obt = &exp->exp_obd->u.obt;
95
96         /** VBR: set client last_epoch to current epoch */
97         if (le32_to_cpu(lcd->lcd_last_epoch) >=
98             le32_to_cpu(obt->obt_lsd->lsd_start_epoch))
99                 return;
100         lcd->lcd_last_epoch = obt->obt_lsd->lsd_start_epoch;
101         obt_client_data_update(exp);
102 }
103
104 /**
105  * Increment server epoch. An obd API
106  */
107 static void obt_boot_epoch_update(struct obd_device *obd)
108 {
109         __u32 start_epoch;
110         struct obd_device_target *obt = &obd->u.obt;
111         struct ptlrpc_request *req;
112         cfs_list_t client_list;
113
114         cfs_spin_lock(&obt->obt_translock);
115         start_epoch = lr_epoch(le64_to_cpu(obt->obt_last_transno)) + 1;
116         obt->obt_last_transno = cpu_to_le64((__u64)start_epoch <<
117                                             LR_EPOCH_BITS);
118         obt->obt_lsd->lsd_start_epoch = cpu_to_le32(start_epoch);
119         cfs_spin_unlock(&obt->obt_translock);
120
121         CFS_INIT_LIST_HEAD(&client_list);
122         cfs_spin_lock_bh(&obd->obd_processing_task_lock);
123         cfs_list_splice_init(&obd->obd_final_req_queue, &client_list);
124         cfs_spin_unlock_bh(&obd->obd_processing_task_lock);
125
126         /**
127          * go through list of exports participated in recovery and
128          * set new epoch for them
129          */
130         cfs_list_for_each_entry(req, &client_list, rq_list) {
131                 LASSERT(!req->rq_export->exp_delayed);
132                 obt_client_epoch_update(req->rq_export);
133         }
134         /** return list back at once */
135         cfs_spin_lock_bh(&obd->obd_processing_task_lock);
136         cfs_list_splice_init(&client_list, &obd->obd_final_req_queue);
137         cfs_spin_unlock_bh(&obd->obd_processing_task_lock);
138         obt_server_data_update(obd, 1);
139 }
140
141 /**
142  * write data in last_rcvd file.
143  */
144 static int lut_last_rcvd_write(const struct lu_env *env, struct lu_target *lut,
145                                const struct lu_buf *buf, loff_t *off, int sync)
146 {
147         struct thandle *th;
148         struct txn_param p;
149         int rc, credits;
150         ENTRY;
151
152         credits = lut->lut_bottom->dd_ops->dt_credit_get(env, lut->lut_bottom,
153                                                          DTO_WRITE_BLOCK);
154         txn_param_init(&p, credits);
155
156         th = dt_trans_start(env, lut->lut_bottom, &p);
157         if (IS_ERR(th))
158                 RETURN(PTR_ERR(th));
159
160         rc = dt_record_write(env, lut->lut_last_rcvd, buf, off, th);
161         dt_trans_stop(env, lut->lut_bottom, th);
162
163         CDEBUG(D_INFO, "write last_rcvd header rc = %d:\n"
164                "uuid = %s\nlast_transno = "LPU64"\n",
165                rc, lut->lut_lsd.lsd_uuid, lut->lut_lsd.lsd_last_transno);
166
167         RETURN(rc);
168 }
169
170 /**
171  * Update client data in last_rcvd
172  */
173 int lut_client_data_update(const struct lu_env *env, struct lu_target *lut,
174                             struct obd_export *exp)
175 {
176         struct lu_export_data *led = &exp->exp_target_data;
177         struct lsd_client_data tmp_lcd;
178         loff_t tmp_off = led->led_lr_off;
179         struct lu_buf tmp_buf = {
180                                         .lb_buf = &tmp_lcd,
181                                         .lb_len = sizeof(tmp_lcd)
182                                 };
183         int rc = 0;
184
185         lcd_cpu_to_le(led->led_lcd, &tmp_lcd);
186         LASSERT(lut->lut_last_rcvd);
187         rc = lut_last_rcvd_write(env, lut, &tmp_buf, &tmp_off, 0);
188
189         return rc;
190 }
191
192 /**
193  * Update server data in last_rcvd
194  */
195 static int lut_server_data_update(const struct lu_env *env,
196                                   struct lu_target *lut, int sync)
197 {
198         struct lr_server_data tmp_lsd;
199         loff_t tmp_off = 0;
200         struct lu_buf tmp_buf = {
201                                         .lb_buf = &tmp_lsd,
202                                         .lb_len = sizeof(tmp_lsd)
203                                 };
204         int rc = 0;
205         ENTRY;
206
207         CDEBUG(D_SUPER,
208                "%s: mount_count is "LPU64", last_transno is "LPU64"\n",
209                lut->lut_lsd.lsd_uuid, lut->lut_mount_count,
210                lut->lut_last_transno);
211
212         cfs_spin_lock(&lut->lut_translock);
213         lut->lut_lsd.lsd_last_transno = lut->lut_last_transno;
214         cfs_spin_unlock(&lut->lut_translock);
215
216         lsd_cpu_to_le(&lut->lut_lsd, &tmp_lsd);
217         if (lut->lut_last_rcvd != NULL)
218                 rc = lut_last_rcvd_write(env, lut, &tmp_buf, &tmp_off, sync);
219         RETURN(rc);
220 }
221
222 void lut_client_epoch_update(const struct lu_env *env, struct lu_target *lut,
223                              struct obd_export *exp)
224 {
225         struct lsd_client_data *lcd = exp->exp_target_data.led_lcd;
226
227         LASSERT(lut->lut_bottom);
228         /** VBR: set client last_epoch to current epoch */
229         if (lcd->lcd_last_epoch >= lut->lut_lsd.lsd_start_epoch)
230                 return;
231         lcd->lcd_last_epoch = lut->lut_lsd.lsd_start_epoch;
232         lut_client_data_update(env, lut, exp);
233 }
234
235 /**
236  * Update boot epoch when recovery ends
237  */
238 void lut_boot_epoch_update(struct lu_target *lut)
239 {
240         struct lu_env env;
241         struct ptlrpc_request *req;
242         __u32 start_epoch;
243         cfs_list_t client_list;
244         int rc;
245
246         if (lut->lut_obd->obd_stopping)
247                 return;
248         /** Increase server epoch after recovery */
249         if (lut->lut_bottom == NULL)
250                 return obt_boot_epoch_update(lut->lut_obd);
251
252         rc = lu_env_init(&env, LCT_DT_THREAD);
253         if (rc) {
254                 CERROR("Can't initialize environment rc=%i\n", rc);
255                 return;
256         }
257
258         cfs_spin_lock(&lut->lut_translock);
259         start_epoch = lr_epoch(lut->lut_last_transno) + 1;
260         lut->lut_last_transno = (__u64)start_epoch << LR_EPOCH_BITS;
261         lut->lut_lsd.lsd_start_epoch = start_epoch;
262         cfs_spin_unlock(&lut->lut_translock);
263
264         CFS_INIT_LIST_HEAD(&client_list);
265         /**
266          * The recovery is not yet finished and final queue can still be updated
267          * with resend requests. Move final list to separate one for processing
268          */
269         cfs_spin_lock_bh(&lut->lut_obd->obd_processing_task_lock);
270         cfs_list_splice_init(&lut->lut_obd->obd_final_req_queue, &client_list);
271         cfs_spin_unlock_bh(&lut->lut_obd->obd_processing_task_lock);
272
273         /**
274          * go through list of exports participated in recovery and
275          * set new epoch for them
276          */
277         cfs_list_for_each_entry(req, &client_list, rq_list) {
278                 LASSERT(!req->rq_export->exp_delayed);
279                 if (!req->rq_export->exp_vbr_failed)
280                         lut_client_epoch_update(&env, lut, req->rq_export);
281         }
282         /** return list back at once */
283         cfs_spin_lock_bh(&lut->lut_obd->obd_processing_task_lock);
284         cfs_list_splice_init(&client_list, &lut->lut_obd->obd_final_req_queue);
285         cfs_spin_unlock_bh(&lut->lut_obd->obd_processing_task_lock);
286         /** update server epoch */
287         lut_server_data_update(&env, lut, 1);
288         lu_env_fini(&env);
289 }
290 EXPORT_SYMBOL(lut_boot_epoch_update);
291
292 /**
293  * commit callback, need to update last_commited value
294  */
295 void lut_cb_last_committed(struct lu_target *lut, __u64 transno,
296                            void *data, int err)
297 {
298         struct obd_export *exp = data;
299         LASSERT(exp->exp_obd == lut->lut_obd);
300         cfs_spin_lock(&lut->lut_translock);
301         if (transno > lut->lut_obd->obd_last_committed)
302                 lut->lut_obd->obd_last_committed = transno;
303
304         LASSERT(exp);
305         if (transno > exp->exp_last_committed) {
306                 exp->exp_last_committed = transno;
307                 cfs_spin_unlock(&lut->lut_translock);
308                 ptlrpc_commit_replies(exp);
309         } else {
310                 cfs_spin_unlock(&lut->lut_translock);
311         }
312         class_export_cb_put(exp);
313         if (transno)
314                 CDEBUG(D_HA, "%s: transno "LPD64" is committed\n",
315                        lut->lut_obd->obd_name, transno);
316 }
317 EXPORT_SYMBOL(lut_cb_last_committed);
318
319 void lut_cb_client(struct lu_target *lut, __u64 transno,
320                        void *data, int err)
321 {
322         LASSERT(lut->lut_obd);
323         target_client_add_cb(lut->lut_obd, transno, data, err);
324 }
325 EXPORT_SYMBOL(lut_cb_client);
326
327 int lut_init(const struct lu_env *env, struct lu_target *lut,
328              struct obd_device *obd, struct dt_device *dt)
329 {
330         struct lu_fid fid;
331         struct dt_object *o;
332         int rc = 0;
333         ENTRY;
334
335         lut->lut_obd = obd;
336         lut->lut_bottom = dt;
337         lut->lut_last_rcvd = NULL;
338
339         cfs_spin_lock_init(&lut->lut_translock);
340         cfs_spin_lock_init(&lut->lut_client_bitmap_lock);
341         cfs_spin_lock_init(&lut->lut_trans_table_lock);
342
343         /** obdfilter has no lu_device stack yet */
344         if (dt == NULL)
345                 RETURN(rc);
346         o = dt_store_open(env, lut->lut_bottom, "", LAST_RCVD, &fid);
347         if (!IS_ERR(o)) {
348                 lut->lut_last_rcvd = o;
349         } else {
350                 rc = PTR_ERR(o);
351                 CERROR("cannot open %s: rc = %d\n", LAST_RCVD, rc);
352         }
353
354         RETURN(rc);
355 }
356 EXPORT_SYMBOL(lut_init);
357
358 void lut_fini(const struct lu_env *env, struct lu_target *lut)
359 {
360         ENTRY;
361         if (lut->lut_last_rcvd)
362                 lu_object_put(env, &lut->lut_last_rcvd->do_lu);
363         lut->lut_last_rcvd = NULL;
364         EXIT;
365 }
366 EXPORT_SYMBOL(lut_fini);