Whamcloud - gitweb
attr_get is changed. It takes now md_attr() and does 'lov' getting inside
[fs/lustre-release.git] / lustre / mdt / mdt_fs.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  linux/mdt/mdt_open.c
5  *  Lustre Metadata Target (mdt) open/close file handling
6  *
7  *  Copyright (C) 2002-2006 Cluster File Systems, Inc.
8  *   Author: Huang Hua <huanghua@clusterfs.com>
9  *   Author; Pershin Mike <tappro@clusterfs.com>
10  *
11  *   This file is part of the Lustre file system, http://www.lustre.org
12  *   Lustre is a trademark of Cluster File Systems, Inc.
13  *
14  *   You may have signed or agreed to another license before downloading
15  *   this software.  If so, you are bound by the terms and conditions
16  *   of that agreement, and the following does not apply to you.  See the
17  *   LICENSE file included with this distribution for more information.
18  *
19  *   If you did not agree to a different license, then this copy of Lustre
20  *   is open source software; you can redistribute it and/or modify it
21  *   under the terms of version 2 of the GNU General Public License as
22  *   published by the Free Software Foundation.
23  *
24  *   In either case, Lustre is distributed in the hope that it will be
25  *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
26  *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
27  *   license text for more details.
28  */
29 #ifndef EXPORT_SYMTAB
30 # define EXPORT_SYMTAB
31 #endif
32 #define DEBUG_SUBSYSTEM S_MDS
33
34 #include "mdt_internal.h"
35
36 /* Add client data to the MDS.  We use a bitmap to locate a free space
37  * in the last_rcvd file if cl_off is -1 (i.e. a new client).
38  * Otherwise, we just have to read the data from the last_rcvd file and
39  * we know its offset.
40  *
41  * It should not be possible to fail adding an existing client - otherwise
42  * mdt_init_server_data() callsite needs to be fixed.
43  */
44 int mdt_client_add(const struct lu_context *ctxt,
45                    struct mdt_device *mdt,
46                    struct mdt_export_data *med,
47                    int cl_idx)
48 {
49         unsigned long          *bitmap = mdt->mdt_client_bitmap;
50         struct mdt_client_data *mcd = med->med_mcd;
51         struct mdt_server_data *msd = &mdt->mdt_msd;
52         int                     new_client = (cl_idx == -1);
53         int                     rc = 0;
54         ENTRY;
55
56         LASSERT(bitmap != NULL);
57         LASSERTF(cl_idx > -2, "%d\n", cl_idx);
58
59         /* the bitmap operations can handle cl_idx > sizeof(long) * 8, so
60          * there's no need for extra complication here
61          */
62         if (new_client) {
63                 cl_idx = find_first_zero_bit(bitmap, LR_MAX_CLIENTS);
64         repeat:
65                 if (cl_idx >= LR_MAX_CLIENTS ||
66                     OBD_FAIL_CHECK_ONCE(OBD_FAIL_MDS_CLIENT_ADD)) {
67                         CERROR("no room for clients - fix LR_MAX_CLIENTS\n");
68                         RETURN(-EOVERFLOW);
69                 }
70                 if (test_and_set_bit(cl_idx, bitmap)) {
71                         cl_idx = find_next_zero_bit(bitmap, LR_MAX_CLIENTS,
72                                                     cl_idx);
73                         goto repeat;
74                 }
75         } else {
76                 if (test_and_set_bit(cl_idx, bitmap)) {
77                         CERROR("MDS client %d: bit already set in bitmap!!\n",
78                                cl_idx);
79                         LBUG();
80                 }
81         }
82
83         CDEBUG(D_INFO, "client at idx %d with UUID '%s' added\n",
84                cl_idx, med->med_mcd->mcd_uuid);
85
86         med->med_lr_idx = cl_idx;
87         med->med_lr_off = le32_to_cpu(msd->msd_client_start) +
88                           (cl_idx * le16_to_cpu(msd->msd_client_size));
89         LASSERTF(med->med_lr_off > 0, "med_lr_off = %llu\n", med->med_lr_off);
90
91         if (new_client) {
92                 struct dt_object *last =  mdt->mdt_last_rcvd;
93                 loff_t off = med->med_lr_off;
94                 rc = last->do_body_ops->dbo_write(ctxt, last, mcd,
95                                                   sizeof(*mcd), &off, NULL);
96                 CDEBUG(D_INFO, "wrote client mcd at idx %u off %llu (len %u)"
97                                " rc = %d\n",
98                                cl_idx, off, sizeof(mcd), rc);
99                 if (rc == sizeof(*mcd))
100                         rc = 0;
101                 else if (rc >= 0)
102                         rc = -EFAULT;
103         }
104         RETURN(rc);
105 }
106
107 int mdt_update_server_data(const struct lu_context *ctxt,
108                            struct mdt_device *mdt,
109                            int sync)
110 {
111         struct mdt_server_data *msd = &mdt->mdt_msd;
112         loff_t                  off = 0;
113         int                     rc = 0;
114         struct dt_object       *last =  mdt->mdt_last_rcvd;
115         ENTRY;
116
117         CDEBUG(D_INODE, "MDS mount_count is "LPU64", last_transno is "LPU64"\n",
118                         mdt->mdt_mount_count, mdt->mdt_last_transno);
119
120         msd->msd_last_transno = cpu_to_le64(mdt->mdt_last_transno);
121         rc = last->do_body_ops->dbo_write(ctxt, last, msd,
122                                           sizeof(*msd), &off, NULL);
123         if (rc == sizeof(*msd))
124                 rc = 0;
125         else if (rc >= 0)
126                 rc = -EFAULT;
127         RETURN(rc);
128
129 }
130
131 int mdt_client_free(const struct lu_context *ctxt,
132                     struct mdt_device *mdt,
133                     struct mdt_export_data *med)
134 {
135         struct mdt_client_data *mcd = med->med_mcd;
136         struct dt_object       *last =  mdt->mdt_last_rcvd;
137         int                     rc = 0;
138         loff_t                  off;
139         ENTRY;
140
141         if (!mcd)
142                 RETURN(0);
143
144         CDEBUG(D_INFO, "freeing client at idx %u, offset %lld\n",
145                med->med_lr_idx, med->med_lr_off);
146
147         off = med->med_lr_off;
148
149         /* Don't clear med_lr_idx here as it is likely also unset.  At worst
150          * we leak a client slot that will be cleaned on the next recovery. */
151         if (off <= 0) {
152                 CERROR("client idx %d has offset %lld\n",
153                         med->med_lr_idx, off);
154                 GOTO(free, rc = -EINVAL);
155         }
156
157         /* Clear the bit _after_ zeroing out the client so we don't
158            race with mdt_client_add and zero out new clients.*/
159         if (!test_bit(med->med_lr_idx, mdt->mdt_client_bitmap)) {
160                 CERROR("MDT client %u: bit already clear in bitmap!!\n",
161                        med->med_lr_idx);
162                 LBUG();
163         }
164
165         memset(mcd, 0, sizeof *mcd);
166         rc = last->do_body_ops->dbo_write(ctxt, last, mcd,
167                                           sizeof(*mcd), &off, NULL);
168         if (rc == sizeof(*mcd))
169                 rc = 0;
170         else if (rc >= 0)
171                 rc = -EFAULT;
172
173         CDEBUG_EX(rc == 0 ? D_INFO : D_ERROR,
174                   "zeroing out client idx %u in %s rc %d\n",
175                   med->med_lr_idx, LAST_RCVD, rc);
176
177         if (!test_and_clear_bit(med->med_lr_idx, mdt->mdt_client_bitmap)) {
178                 CERROR("MDS client %u: bit already clear in bitmap!!\n",
179                        med->med_lr_idx);
180                 LBUG();
181         }
182
183         /* Make sure the server's last_transno is up to date. Do this
184          * after the client is freed so we know all the client's
185          * transactions have been committed. */
186         mdt_update_server_data(ctxt, mdt, 0);
187
188         EXIT;
189 free:
190         OBD_FREE_PTR(mcd);
191         med->med_mcd = NULL;
192         return 0;
193 }
194
195 static int mdt_init_server_data(const struct lu_context *ctxt,
196                                 struct mdt_device *mdt)
197 {
198         struct mdt_server_data *msd = &mdt->mdt_msd;
199         struct mdt_client_data *mcd = NULL;
200         struct obd_device      *obd = mdt->mdt_md_dev.md_lu_dev.ld_obd;
201         loff_t                  off = 0;
202         unsigned long           last_rcvd_size = 0; // = getsize(mdt->mdt_last)
203         __u64                   mount_count;
204         int                     cl_idx;
205         int                     rc;
206         struct mdt_thread_info *info;
207         struct dt_object       *last = mdt->mdt_last_rcvd;
208         struct lu_attr         *la;
209         ENTRY;
210
211         /* ensure padding in the struct is the correct size */
212         LASSERT(offsetof(struct mdt_server_data, msd_padding) +
213                 sizeof(msd->msd_padding) == LR_SERVER_SIZE);
214         LASSERT(offsetof(struct mdt_client_data, mcd_padding) +
215                 sizeof(mcd->mcd_padding) == LR_CLIENT_SIZE);
216
217         info = lu_context_key_get(ctxt, &mdt_thread_key);
218         LASSERT(info != NULL);
219         la = &info->mti_attr.ma_attr;
220
221         rc = last->do_ops->do_attr_get(ctxt, last, la);
222         if (rc)
223                 RETURN(rc);
224         last_rcvd_size = la->la_size;
225
226         if (last_rcvd_size == 0) {
227                 LCONSOLE_WARN("%s: new disk, initializing\n", obd->obd_name);
228
229                 memcpy(msd->msd_uuid, obd->obd_uuid.uuid,sizeof(msd->msd_uuid));
230                 msd->msd_last_transno = 0;
231                 mount_count = msd->msd_mount_count = 0;
232                 msd->msd_server_size = cpu_to_le32(LR_SERVER_SIZE);
233                 msd->msd_client_start = cpu_to_le32(LR_CLIENT_START);
234                 msd->msd_client_size = cpu_to_le16(LR_CLIENT_SIZE);
235                 msd->msd_feature_rocompat = cpu_to_le32(OBD_ROCOMPAT_LOVOBJID);
236                 msd->msd_feature_incompat = cpu_to_le32(OBD_INCOMPAT_MDT |
237                                                         OBD_INCOMPAT_COMMON_LR);
238         } else {
239                 rc = last->do_body_ops->dbo_read(ctxt, last, msd,
240                                                  sizeof(*msd), &off);
241                 if (rc == sizeof(*msd))
242                         rc = 0;
243                 else if (rc >= 0)
244                         rc = -EFAULT;
245
246                 if (rc) {
247                         CERROR("error reading MDS %s: rc %d\n", LAST_RCVD, rc);
248                         GOTO(out, rc);
249                 }
250                 if (strcmp(msd->msd_uuid, obd->obd_uuid.uuid) != 0) {
251                         LCONSOLE_ERROR("Trying to start OBD %s using the wrong"
252                                        " disk %s. Were the /dev/ assignments "
253                                        "rearranged?\n",
254                                        obd->obd_uuid.uuid, msd->msd_uuid);
255                         GOTO(out, rc = -EINVAL);
256                 }
257                 mount_count = le64_to_cpu(msd->msd_mount_count);
258         }
259
260         if (msd->msd_feature_incompat & ~cpu_to_le32(MDT_INCOMPAT_SUPP)) {
261                 CERROR("%s: unsupported incompat filesystem feature(s) %x\n",
262                        obd->obd_name, le32_to_cpu(msd->msd_feature_incompat) &
263                        ~MDT_INCOMPAT_SUPP);
264                 GOTO(out, rc = -EINVAL);
265         }
266         if (msd->msd_feature_rocompat & ~cpu_to_le32(MDT_ROCOMPAT_SUPP)) {
267                 CERROR("%s: unsupported read-only filesystem feature(s) %x\n",
268                        obd->obd_name, le32_to_cpu(msd->msd_feature_rocompat) &
269                        ~MDT_ROCOMPAT_SUPP);
270                 /* Do something like remount filesystem read-only */
271                 GOTO(out, rc = -EINVAL);
272         }
273         if (!(msd->msd_feature_incompat & cpu_to_le32(OBD_INCOMPAT_COMMON_LR))){
274                 CDEBUG(D_WARNING, "using old last_rcvd format\n");
275                 msd->msd_mount_count = msd->msd_last_transno;
276                 msd->msd_last_transno = msd->msd_unused;
277                 /* If we update the last_rcvd, we can never go back to
278                    an old install, so leave this in the old format for now.
279                 msd->msd_feature_incompat |= cpu_to_le32(LR_INCOMPAT_COMMON_LR);
280                 */
281         }
282         msd->msd_feature_compat = cpu_to_le32(OBD_COMPAT_MDT);
283
284         mdt->mdt_last_transno = le64_to_cpu(msd->msd_last_transno);
285
286         CDEBUG(D_INODE, "========BEGIN DUMPING LAST_RCVD========\n");
287         CDEBUG(D_INODE, "%s: server last_transno: "LPU64"\n",
288                obd->obd_name, mdt->mdt_last_transno);
289         CDEBUG(D_INODE, "%s: server mount_count: "LPU64"\n",
290                obd->obd_name, mount_count + 1);
291         CDEBUG(D_INODE, "%s: server data size: %u\n",
292                obd->obd_name, le32_to_cpu(msd->msd_server_size));
293         CDEBUG(D_INODE, "%s: per-client data start: %u\n",
294                obd->obd_name, le32_to_cpu(msd->msd_client_start));
295         CDEBUG(D_INODE, "%s: per-client data size: %u\n",
296                obd->obd_name, le32_to_cpu(msd->msd_client_size));
297         CDEBUG(D_INODE, "%s: last_rcvd size: %lu\n",
298                obd->obd_name, last_rcvd_size);
299         CDEBUG(D_INODE, "%s: last_rcvd clients: %lu\n", obd->obd_name,
300                last_rcvd_size <= le32_to_cpu(msd->msd_client_start) ? 0 :
301                (last_rcvd_size - le32_to_cpu(msd->msd_client_start)) /
302                 le16_to_cpu(msd->msd_client_size));
303         CDEBUG(D_INODE, "========END DUMPING LAST_RCVD========\n");
304
305         if (!msd->msd_server_size || !msd->msd_client_start ||
306             !msd->msd_client_size) {
307                 CERROR("Bad last_rcvd contents!\n");
308                 GOTO(out, rc = -EINVAL);
309         }
310
311         /* When we do a clean MDS shutdown, we save the last_transno into
312          * the header.  If we find clients with higher last_transno values
313          * then those clients may need recovery done. */
314         for (cl_idx = 0, off = le32_to_cpu(msd->msd_client_start);
315              off < last_rcvd_size; cl_idx++) {
316                 __u64 last_transno;
317                 struct obd_export *exp;
318                 struct mdt_export_data *med;
319
320                 if (!mcd) {
321                         OBD_ALLOC_PTR(mcd);
322                         if (!mcd)
323                                 GOTO(err_client, rc = -ENOMEM);
324                 }
325
326                 off = le32_to_cpu(msd->msd_client_start) +
327                         cl_idx * le16_to_cpu(msd->msd_client_size);
328                 rc = last->do_body_ops->dbo_read(ctxt, last, mcd,
329                                                  sizeof(*mcd), &off);
330                 if (rc == sizeof(*mcd))
331                         rc = 0;
332                 else if (rc >= 0)
333                         rc = -EFAULT;
334
335                 if (rc) {
336                         CERROR("error reading MDS %s idx %d, off %llu: rc %d\n",
337                                LAST_RCVD, cl_idx, off, rc);
338                         break; /* read error shouldn't cause startup to fail */
339                 }
340
341                 if (mcd->mcd_uuid[0] == '\0') {
342                         CDEBUG(D_INFO, "skipping zeroed client at offset %d\n",
343                                cl_idx);
344                         continue;
345                 }
346
347                 last_transno = le64_to_cpu(mcd->mcd_last_transno);
348
349                 /* These exports are cleaned up by mdt_obd_disconnect(), so
350                  * they need to be set up like real exports as
351                  * mdt_obd_connect() does.
352                  */
353                 CDEBUG(D_HA, "RCVRNG CLIENT uuid: %s idx: %d lr: "LPU64
354                        " srv lr: "LPU64" lx: "LPU64"\n", mcd->mcd_uuid, cl_idx,
355                        last_transno, le64_to_cpu(msd->msd_last_transno),
356                        le64_to_cpu(mcd->mcd_last_xid));
357
358                 exp = class_new_export(obd, (struct obd_uuid *)mcd->mcd_uuid);
359                 if (IS_ERR(exp))
360                         GOTO(err_client, rc = PTR_ERR(exp));
361
362                 med = &exp->exp_mdt_data;
363                 med->med_mcd = mcd;
364                 rc = mdt_client_add(ctxt, mdt, med, cl_idx);
365                 LASSERTF(rc == 0, "rc = %d\n", rc); /* can't fail existing */
366
367                 mcd = NULL;
368                 exp->exp_replay_needed = 1;
369                 exp->exp_connecting = 0;
370                 obd->obd_recoverable_clients++;
371                 obd->obd_max_recoverable_clients++;
372                 class_export_put(exp);
373
374                 CDEBUG(D_OTHER, "client at idx %d has last_transno = "LPU64"\n",
375                        cl_idx, last_transno);
376
377                 if (last_transno > mdt->mdt_last_transno)
378                         mdt->mdt_last_transno = last_transno;
379         }
380
381         if (mcd)
382                 OBD_FREE_PTR(mcd);
383
384         obd->obd_last_committed = mdt->mdt_last_transno;
385
386         if (obd->obd_recoverable_clients) {
387                 CWARN("RECOVERY: service %s, %d recoverable clients, "
388                       "last_transno "LPU64"\n", obd->obd_name,
389                       obd->obd_recoverable_clients, mdt->mdt_last_transno);
390                 obd->obd_next_recovery_transno = obd->obd_last_committed + 1;
391                 obd->obd_recovering = 1;
392                 obd->obd_recovery_start = CURRENT_SECONDS;
393                 /* Only used for lprocfs_status */
394                 obd->obd_recovery_end = obd->obd_recovery_start +
395                         OBD_RECOVERY_TIMEOUT;
396         }
397
398         mdt->mdt_mount_count = mount_count + 1;
399         msd->msd_mount_count = cpu_to_le64(mdt->mdt_mount_count);
400
401         /* save it, so mount count and last_transno is current */
402         rc = mdt_update_server_data(ctxt, mdt, 1);
403         if (rc)
404                 GOTO(err_client, rc);
405
406         RETURN(0);
407
408 err_client:
409         class_disconnect_exports(obd);
410 out:
411         return rc;
412 }
413
414 /*
415  * last_rcvd update callbacks
416  */
417 extern struct lu_context_key mdt_txn_key;
418 extern struct lu_context_key mdt_thread_key;
419
420 enum {
421         MDT_TXN_LAST_RCVD_CREDITS = 1
422 };
423
424 /* add credits for last_rcvd update */
425 static int mdt_txn_start_cb(const struct lu_context *ctx,
426                             struct dt_device *dev,
427                             struct txn_param *param, void *cookie)
428 {
429         param->tp_credits += MDT_TXN_LAST_RCVD_CREDITS;
430         return 0;
431 }
432
433 /* Update last_rcvd records with latests transaction data */
434 static int mdt_txn_stop_cb(const struct lu_context *ctx,
435                            struct dt_device *dev,
436                            struct thandle *txn, void *cookie)
437 {
438         struct mdt_device *mdt = cookie;
439         struct mdt_txn_info *txni;
440         struct mdt_thread_info *mti;
441
442         /* transno in two contexts - for commit_cb and for thread */
443         txni = lu_context_key_get(&txn->th_ctx, &mdt_txn_key);
444         mti = lu_context_key_get(ctx, &mdt_thread_key);
445         /*TODO: checks for recovery cases, see mds_finish_transno */
446         spin_lock(&mdt->mdt_transno_lock);
447         if (mti->mti_transno == 0) {
448                 mti->mti_transno = ++ mdt->mdt_last_transno;
449         } else {
450                 /* replay */
451                 CDEBUG(D_HA, "replaying transno: "LPD64" stopped\n", 
452                               mti->mti_transno);
453                 if (mti->mti_transno > mdt->mdt_last_transno)
454                         mdt->mdt_last_transno = mti->mti_transno;
455         }
456         spin_unlock(&mdt->mdt_transno_lock);
457         /* save transno for the commit callback */
458         txni->txi_transno = mti->mti_transno;
459         CDEBUG(D_INFO, "transno "LPD64" stopped\n", txni->txi_transno);
460 /*
461         TODO: write last_rcvd
462 */
463         return 0;
464 }
465
466 /* commit callback, need to update last_commited value */
467 static int mdt_txn_commit_cb(const struct lu_context *ctx,
468                              struct dt_device *dev,
469                              struct thandle *txn, void *cookie)
470 {
471         struct mdt_device *mdt = cookie;
472         struct obd_device *obd = md2lu_dev(&mdt->mdt_md_dev)->ld_obd;
473         struct mdt_txn_info *txi;
474
475         txi = lu_context_key_get(&txn->th_ctx, &mdt_txn_key);
476         if (txi->txi_transno > mdt->mdt_last_committed) {
477                 mdt->mdt_last_committed = txi->txi_transno;
478                 ptlrpc_commit_replies(obd);
479         }
480         CDEBUG(D_INFO, "%s: transno "LPD64" committed\n",
481                obd->obd_name, txi->txi_transno);
482
483         return 0;
484 }
485
486 int mdt_fs_setup(const struct lu_context *ctxt,
487                  struct mdt_device *mdt)
488 {
489         struct lu_fid last_fid;
490         struct dt_object *last;
491         int rc = 0;
492         ENTRY;
493
494         /* prepare transactions callbacks */
495         mdt->mdt_txn_cb.dtc_txn_start = mdt_txn_start_cb;
496         mdt->mdt_txn_cb.dtc_txn_stop = mdt_txn_stop_cb;
497         mdt->mdt_txn_cb.dtc_txn_commit = mdt_txn_commit_cb;
498         mdt->mdt_txn_cb.dtc_cookie = mdt;
499
500         dt_txn_callback_add(mdt->mdt_bottom, &mdt->mdt_txn_cb);
501
502         last = dt_store_open(ctxt, mdt->mdt_bottom, LAST_RCVD, &last_fid);
503         if(!IS_ERR(last)) {
504                 mdt->mdt_last_rcvd = last;
505                 rc = mdt_init_server_data(ctxt, mdt);
506                 if (rc) {
507                         lu_object_put(ctxt, &last->do_lu);
508                         mdt->mdt_last_rcvd = NULL;
509                 }
510         } else {
511                 rc = PTR_ERR(last);
512                 CERROR("cannot open %s: rc = %d\n", LAST_RCVD, rc);
513         }
514
515         RETURN (rc);
516 }
517
518
519 void mdt_fs_cleanup(const struct lu_context *ctxt,
520                    struct mdt_device *mdt)
521 {
522         struct obd_device *obd = mdt->mdt_md_dev.md_lu_dev.ld_obd;
523
524         /* remove transaction callback */
525         dt_txn_callback_del(mdt->mdt_bottom, &mdt->mdt_txn_cb);
526
527         class_disconnect_exports(obd); /* cleans up client info too */
528
529         if (mdt->mdt_last_rcvd)
530                 lu_object_put(ctxt, &mdt->mdt_last_rcvd->do_lu);
531         mdt->mdt_last_rcvd = NULL;
532 }
533