Whamcloud - gitweb
Branch b1_8
[fs/lustre-release.git] / lustre / mds / mds_fs.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright  2008 Sun Microsystems, Inc. All rights reserved
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/mds/mds_fs.c
37  *
38  * Lustre Metadata Server (MDS) filesystem interface code
39  *
40  * Author: Andreas Dilger <adilger@clusterfs.com>
41  */
42
43 #ifndef EXPORT_SYMTAB
44 # define EXPORT_SYMTAB
45 #endif
46 #define DEBUG_SUBSYSTEM S_MDS
47
48 #include <linux/module.h>
49 #include <linux/kmod.h>
50 #include <linux/version.h>
51 #include <linux/sched.h>
52 #include <lustre_quota.h>
53 #include <linux/mount.h>
54 #include <lustre_mds.h>
55 #include <obd_class.h>
56 #include <obd_support.h>
57 #include <lustre_lib.h>
58 #include <lustre_fsfilt.h>
59 #include <lustre_disk.h>
60 #include <libcfs/list.h>
61
62 #include "mds_internal.h"
63
64
65 int mds_export_stats_init(struct obd_device *obd,
66                                  struct obd_export *exp,
67                                  void *localdata)
68 {
69         lnet_nid_t *client_nid = localdata;
70         int rc, num_stats, newnid = 0;
71
72         rc = lprocfs_exp_setup(exp, client_nid, &newnid);
73         if (rc) {
74                 /* Mask error for already created
75                  * /proc entries */
76                 if (rc == -EALREADY)
77                         rc = 0;
78                 return rc;
79         }
80
81         if (newnid) {
82                 struct nid_stat *tmp = exp->exp_nid_stats;
83                 LASSERT(tmp != NULL);
84
85                 num_stats = (sizeof(*obd->obd_type->typ_ops) / sizeof(void *)) +
86                              LPROC_MDS_LAST - 1;
87                 tmp->nid_stats = lprocfs_alloc_stats(num_stats,
88                                                      LPROCFS_STATS_FLAG_NOPERCPU);
89                 if (tmp->nid_stats == NULL)
90                         return -ENOMEM;
91
92                 lprocfs_init_ops_stats(LPROC_MDS_LAST, tmp->nid_stats);
93                 rc = lprocfs_register_stats(tmp->nid_proc, "stats",
94                                             tmp->nid_stats);
95                 if (rc)
96                         return rc;
97
98                 mds_stats_counter_init(tmp->nid_stats);
99
100                 /* Always add in ldlm_stats */
101                 tmp->nid_ldlm_stats = lprocfs_alloc_stats(LDLM_LAST_OPC -
102                                                           LDLM_FIRST_OPC,
103                                                           0);
104                 if (tmp->nid_ldlm_stats == NULL)
105                         return -ENOMEM;
106
107                 lprocfs_init_ldlm_stats(tmp->nid_ldlm_stats);
108
109                 rc = lprocfs_register_stats(tmp->nid_proc, "ldlm_stats",
110                                             tmp->nid_ldlm_stats);
111                 if (rc)
112                         return rc;
113         }
114
115         return 0;
116 }
117
118 /* VBR: to determine the delayed client the lcd should be updated for each new
119  * epoch */
120 int mds_update_client_epoch(struct obd_export *exp)
121 {
122         struct mds_export_data *med = &exp->exp_mds_data;
123         struct mds_obd *mds = &exp->exp_obd->u.mds;
124         struct lvfs_run_ctxt saved;
125         loff_t off = med->med_lr_off;
126         int rc = 0;
127
128         /* VBR: set client last_epoch to current epoch */
129         if (le32_to_cpu(med->med_lcd->lcd_last_epoch) >=
130                         le32_to_cpu(mds->mds_server_data->lsd_start_epoch))
131                 return rc;
132
133         med->med_lcd->lcd_last_epoch = mds->mds_server_data->lsd_start_epoch;
134         push_ctxt(&saved, &exp->exp_obd->obd_lvfs_ctxt, NULL);
135         rc = fsfilt_write_record(exp->exp_obd, mds->mds_rcvd_filp,
136                                  med->med_lcd, sizeof(*med->med_lcd), &off,
137                                  exp->exp_delayed);
138         pop_ctxt(&saved, &exp->exp_obd->obd_lvfs_ctxt, NULL);
139
140         CDEBUG(D_INFO, "update client idx %u last_epoch %#x (%#x)\n",
141                med->med_lr_idx, le32_to_cpu(med->med_lcd->lcd_last_epoch),
142                le32_to_cpu(mds->mds_server_data->lsd_start_epoch));
143
144         return rc;
145 }
146
147 /* Called after recovery is done on server */
148 void mds_update_last_epoch(struct obd_device *obd)
149 {
150         struct ptlrpc_request *req;
151         struct mds_obd *mds = &obd->u.mds;
152         __u32 start_epoch;
153
154         /* Increase server epoch after recovery */
155         spin_lock(&mds->mds_transno_lock);
156         start_epoch = lr_epoch(mds->mds_last_transno) + 1;
157         mds->mds_last_transno = (__u64)start_epoch << LR_EPOCH_BITS;
158         mds->mds_server_data->lsd_start_epoch = cpu_to_le32(start_epoch);
159         spin_unlock(&mds->mds_transno_lock);
160
161         /* go through delayed reply queue to find all exports participate in
162          * recovery and set new epoch for them */
163         list_for_each_entry(req, &obd->obd_delayed_reply_queue, rq_list) {
164                 LASSERT(!req->rq_export->exp_delayed);
165                 mds_update_client_epoch(req->rq_export);
166         }
167         mds_update_server_data(obd, 1);
168 }
169
170 /* Add client data to the MDS.  We use a bitmap to locate a free space
171  * in the last_rcvd file if cl_off is -1 (i.e. a new client).
172  * Otherwise, we have just read the data from the last_rcvd file and
173  * we know its offset.
174  *
175  * It should not be possible to fail adding an existing client - otherwise
176  * mds_init_server_data() callsite needs to be fixed.
177  */
178 int mds_client_add(struct obd_device *obd, struct obd_export *exp,
179                    int cl_idx, void *localdata)
180 {
181         struct mds_obd *mds = &obd->u.mds;
182         struct mds_export_data *med = &exp->exp_mds_data;
183         unsigned long *bitmap = mds->mds_client_bitmap;
184         int new_client = (cl_idx == -1);
185         int rc = 0;
186         ENTRY;
187
188         LASSERT(bitmap != NULL);
189         LASSERTF(cl_idx > -2, "%d\n", cl_idx);
190
191         /* XXX if lcd_uuid were a real obd_uuid, I could use obd_uuid_equals */
192         if (!strcmp(med->med_lcd->lcd_uuid, obd->obd_uuid.uuid))
193                 RETURN(0);
194
195         /* VBR: remove expired exports before searching for free slot */
196         if (new_client)
197                 class_disconnect_expired_exports(obd);
198
199         /* the bitmap operations can handle cl_idx > sizeof(long) * 8, so
200          * there's no need for extra complication here
201          */
202         if (new_client) {
203                 cl_idx = find_first_zero_bit(bitmap, LR_MAX_CLIENTS);
204         repeat:
205                 if (cl_idx >= LR_MAX_CLIENTS ||
206                     OBD_FAIL_CHECK_ONCE(OBD_FAIL_MDS_CLIENT_ADD)) {
207                         CERROR("no room for %u clients - fix LR_MAX_CLIENTS\n",
208                                cl_idx);
209                         return -EOVERFLOW;
210                 }
211                 if (test_and_set_bit(cl_idx, bitmap)) {
212                         cl_idx = find_next_zero_bit(bitmap, LR_MAX_CLIENTS,
213                                                     cl_idx);
214                         goto repeat;
215                 }
216         } else {
217                 if (test_and_set_bit(cl_idx, bitmap)) {
218                         CERROR("MDS client %d: bit already set in bitmap!!\n",
219                                cl_idx);
220                         LBUG();
221                 }
222         }
223
224         CDEBUG(D_INFO, "client at idx %d with UUID '%s' added\n",
225                cl_idx, med->med_lcd->lcd_uuid);
226
227         med->med_lr_idx = cl_idx;
228         med->med_lr_off = le32_to_cpu(mds->mds_server_data->lsd_client_start) +
229                 (cl_idx * le16_to_cpu(mds->mds_server_data->lsd_client_size));
230         LASSERTF(med->med_lr_off > 0, "med_lr_off = %llu\n", med->med_lr_off);
231         mds_export_stats_init(obd, exp, localdata);
232
233         if (new_client) {
234                 struct lvfs_run_ctxt *saved = NULL;
235                 loff_t off = med->med_lr_off;
236                 struct file *file = mds->mds_rcvd_filp;
237                 void *handle;
238
239                 OBD_SLAB_ALLOC_PTR(saved, obd_lvfs_ctxt_cache);
240                 if (saved == NULL) {
241                         CERROR("cannot allocate memory for run ctxt\n");
242                         RETURN(-ENOMEM);
243                 }
244
245                 push_ctxt(saved, &obd->obd_lvfs_ctxt, NULL);
246                 handle = fsfilt_start(obd, file->f_dentry->d_inode,
247                                       FSFILT_OP_SETATTR, NULL);
248                 if (IS_ERR(handle)) {
249                         rc = PTR_ERR(handle);
250                         CERROR("unable to start transaction: rc %d\n", rc);
251                 } else {
252                         /* VBR: set client last_transno as mds_last_transno to
253                          * remember last epoch for this client */
254                         med->med_lcd->lcd_last_epoch =
255                                         mds->mds_server_data->lsd_start_epoch;
256                         exp->exp_last_request_time = cfs_time_current_sec();
257                         /* remember first epoch of client for orphan handling */
258                         med->med_lcd->lcd_first_epoch =
259                                   cpu_to_le32(lr_epoch(mds->mds_last_transno));
260                         rc = fsfilt_add_journal_cb(obd, 0, handle,
261                                                    target_client_add_cb, exp);
262                         if (rc == 0) {
263                                 spin_lock(&exp->exp_lock);
264                                 exp->exp_need_sync = 1;
265                                 spin_unlock(&exp->exp_lock);
266                         }
267                         rc = fsfilt_write_record(obd, file, med->med_lcd,
268                                                  sizeof(*med->med_lcd),
269                                                  &off, rc /* sync if no cb */);
270                         fsfilt_commit(obd, file->f_dentry->d_inode, handle, 0);
271                 }
272
273                 pop_ctxt(saved, &obd->obd_lvfs_ctxt, NULL);
274                 OBD_SLAB_FREE_PTR(saved, obd_lvfs_ctxt_cache);
275
276                 if (rc)
277                         return rc;
278                 CDEBUG(D_INFO, "wrote client lcd at idx %u off %llu (len %u)\n",
279                        med->med_lr_idx, med->med_lr_off,
280                        (unsigned int)sizeof(*med->med_lcd));
281         }
282         return rc;
283 }
284
285 struct lsd_client_data zero_lcd; /* globals are implicitly zeroed */
286  
287 int mds_client_free(struct obd_export *exp)
288 {
289         struct mds_export_data *med = &exp->exp_mds_data;
290         struct mds_obd *mds = &exp->exp_obd->u.mds;
291         struct obd_device *obd = exp->exp_obd;
292         struct lvfs_run_ctxt *saved = NULL;
293         int rc;
294         loff_t off;
295         ENTRY;
296
297         if (!med->med_lcd)
298                 RETURN(0);
299
300         /* XXX if lcd_uuid were a real obd_uuid, I could use obd_uuid_equals */
301         if (!strcmp(med->med_lcd->lcd_uuid, obd->obd_uuid.uuid))
302                 GOTO(free, 0);
303
304         CDEBUG(D_INFO, "freeing client at idx %u, offset %lld with UUID '%s'\n",
305                med->med_lr_idx, med->med_lr_off, med->med_lcd->lcd_uuid);
306
307         LASSERT(mds->mds_client_bitmap != NULL);
308
309
310         off = med->med_lr_off;
311
312         /* Don't clear med_lr_idx here as it is likely also unset.  At worst
313          * we leak a client slot that will be cleaned on the next recovery. */
314         if (off <= 0) {
315                 CERROR("%s: client idx %d has offset %lld\n",
316                         obd->obd_name, med->med_lr_idx, off);
317                 GOTO(free, rc = -EINVAL);
318         }
319
320         /* Clear the bit _after_ zeroing out the client so we don't
321            race with mds_client_add and zero out new clients.*/
322         if (!test_bit(med->med_lr_idx, mds->mds_client_bitmap)) {
323                 CERROR("MDS client %u: bit already clear in bitmap!!\n",
324                        med->med_lr_idx);
325                 LBUG();
326         }
327
328         if (!(exp->exp_flags & OBD_OPT_FAILOVER)) {
329                 /* Don't force sync on each disconnect if aborting recovery,
330                  * or it does num_clients * num_osts syncs.  b=17194 */
331                 int need_sync = (!exp->exp_libclient || exp->exp_need_sync) &&
332                                  !(exp->exp_flags & OBD_OPT_ABORT_RECOV);
333                 OBD_SLAB_ALLOC_PTR(saved, obd_lvfs_ctxt_cache);
334                 if (saved == NULL) {
335                         CERROR("cannot allocate memory for run ctxt\n");
336                         GOTO(free, rc = -ENOMEM);
337                 }
338                 push_ctxt(saved, &obd->obd_lvfs_ctxt, NULL);
339                 rc = fsfilt_write_record(obd, mds->mds_rcvd_filp, &zero_lcd,
340                                          sizeof(zero_lcd), &off, 0);
341
342                 /* Make sure the server's last_transno is up to date. Do this
343                  * after the client is freed so we know all the client's
344                  * transactions have been committed. */
345                 if (rc == 0)
346                         mds_update_server_data(exp->exp_obd, need_sync);
347
348                 pop_ctxt(saved, &obd->obd_lvfs_ctxt, NULL);
349
350                 CDEBUG(rc == 0 ? D_INFO : D_ERROR,
351                        "zero out client %s at idx %u/%llu in %s %ssync rc %d\n",
352                        med->med_lcd->lcd_uuid, med->med_lr_idx, med->med_lr_off,
353                        LAST_RCVD, need_sync ? "" : "a", rc);
354         }
355
356         if (!test_and_clear_bit(med->med_lr_idx, mds->mds_client_bitmap)) {
357                 CERROR("MDS client %u: bit already clear in bitmap!!\n",
358                        med->med_lr_idx);
359                 LBUG();
360         }
361
362         EXIT;
363 free:
364         if (saved)
365                 OBD_SLAB_FREE_PTR(saved, obd_lvfs_ctxt_cache);
366
367         OBD_FREE_PTR(med->med_lcd);
368         med->med_lcd = NULL;
369
370         return 0;
371 }
372
373 static int mds_server_free_data(struct mds_obd *mds)
374 {
375         OBD_FREE(mds->mds_client_bitmap, LR_MAX_CLIENTS / 8);
376         OBD_FREE(mds->mds_server_data, sizeof(*mds->mds_server_data));
377         mds->mds_server_data = NULL;
378
379         return 0;
380 }
381
382 static void mds_add_fake_export(struct obd_device *obd, int num,
383                                 struct file *file)
384 {
385         struct obd_export *exp;
386         struct lvfs_run_ctxt saved;
387         struct obd_device_target *obt = &obd->u.obt;
388         struct lu_export_data *led;
389         unsigned long *bitmap = obt->obt_client_bitmap;
390         struct lsd_client_data *lcd = NULL;
391         unsigned int idx = 0;
392         loff_t off = 0;
393         int rc = 0;
394
395         while (num > 0) {
396                 num--;
397                 if (!lcd) {
398                         OBD_ALLOC_PTR(lcd);
399                         if (!lcd)
400                                 return;
401                 }
402                 idx = find_next_zero_bit(bitmap, LR_MAX_CLIENTS, idx);
403                 if (idx >= LR_MAX_CLIENTS) {
404                         CERROR("no room for %u clients - fix LR_MAX_CLIENTS\n", idx);
405                         OBD_FREE_PTR(lcd);
406                         break;
407                 }
408                 if (test_and_set_bit(idx, bitmap)) {
409                         CERROR("Bit %u is set already\n", idx);
410                         continue;
411                 }
412                 off = le32_to_cpu(obt->obt_lsd->lsd_client_start) +
413                       idx * le16_to_cpu(obt->obt_lsd->lsd_client_size);
414
415                 sprintf(lcd->lcd_uuid, "dead-%.16u", idx);
416                 CDEBUG(D_INFO, "Create fake export %s, index %u, offset %lu\n",
417                        lcd->lcd_uuid, idx, (unsigned long)off);
418
419                 exp = class_new_export(obd, (struct obd_uuid *)lcd->lcd_uuid);
420                 if (IS_ERR(exp)) {
421                         if (PTR_ERR(exp) == -EALREADY) {
422                                 CERROR("Export %s already exists\n",
423                                        lcd->lcd_uuid);
424                         }
425                         CERROR("Failed to create export %lu\n", PTR_ERR(exp));
426                         OBD_FREE_PTR(lcd);
427                         break;
428                 }
429                 LASSERT(exp);
430                 led = &exp->exp_target_data;
431                 led->led_lr_idx = idx;
432                 led->led_lr_off = off;
433                 led->led_lcd = lcd;
434
435                 exp->exp_last_request_time = cfs_time_current_sec();
436                 exp->exp_replay_needed = 1;
437                 exp->exp_connecting = 0;
438                 exp->exp_in_recovery = 0;
439
440                 spin_lock_bh(&obd->obd_processing_task_lock);
441                 obd->obd_recoverable_clients++;
442                 obd->obd_max_recoverable_clients++;
443                 spin_unlock_bh(&obd->obd_processing_task_lock);
444
445                 class_set_export_delayed(exp);
446                 class_export_put(exp);
447
448                 lcd->lcd_last_epoch = cpu_to_le32(1);
449                 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
450                 rc = fsfilt_write_record(obd, file, lcd, sizeof(*lcd), &off, 0);
451                 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
452                 if (rc) {
453                         CERROR("Failed to create fake client record\n");
454                         OBD_FREE_PTR(lcd);
455                         break;
456                 }
457                 lcd = NULL;
458         }
459 }
460
461 static int mds_init_server_data(struct obd_device *obd, struct file *file)
462 {
463         struct mds_obd *mds = &obd->u.mds;
464         struct lr_server_data *lsd;
465         struct lsd_client_data *lcd = NULL;
466         loff_t off = 0;
467         unsigned long last_rcvd_size = i_size_read(file->f_dentry->d_inode);
468         __u64 mount_count;
469         __u32 start_epoch;
470         int cl_idx, rc = 0;
471         ENTRY;
472
473         /* ensure padding in the struct is the correct size */
474         LASSERT(offsetof(struct lr_server_data, lsd_padding) +
475                 sizeof(lsd->lsd_padding) == LR_SERVER_SIZE);
476         LASSERT(offsetof(struct lsd_client_data, lcd_padding) +
477                 sizeof(lcd->lcd_padding) == LR_CLIENT_SIZE);
478
479         OBD_ALLOC_WAIT(lsd, sizeof(*lsd));
480         if (!lsd)
481                 RETURN(-ENOMEM);
482
483         OBD_ALLOC_WAIT(mds->mds_client_bitmap, LR_MAX_CLIENTS / 8);
484         if (!mds->mds_client_bitmap) {
485                 OBD_FREE(lsd, sizeof(*lsd));
486                 RETURN(-ENOMEM);
487         }
488
489         mds->mds_server_data = lsd;
490
491         if (last_rcvd_size == 0) {
492                 LCONSOLE_WARN("%s: new disk, initializing\n", obd->obd_name);
493
494                 memcpy(lsd->lsd_uuid, obd->obd_uuid.uuid,sizeof(lsd->lsd_uuid));
495                 lsd->lsd_last_transno = 0;
496                 mount_count = lsd->lsd_mount_count = 0;
497                 lsd->lsd_server_size = cpu_to_le32(LR_SERVER_SIZE);
498                 lsd->lsd_client_start = cpu_to_le32(LR_CLIENT_START);
499                 lsd->lsd_client_size = cpu_to_le16(LR_CLIENT_SIZE);
500                 lsd->lsd_feature_rocompat = cpu_to_le32(OBD_ROCOMPAT_LOVOBJID);
501                 lsd->lsd_feature_incompat = cpu_to_le32(OBD_INCOMPAT_MDT);
502         } else {
503                 rc = fsfilt_read_record(obd, file, lsd, sizeof(*lsd), &off);
504                 if (rc) {
505                         CERROR("error reading MDS %s: rc %d\n", LAST_RCVD, rc);
506                         GOTO(err_msd, rc);
507                 }
508                 if (strcmp(lsd->lsd_uuid, obd->obd_uuid.uuid) != 0) {
509                         LCONSOLE_ERROR_MSG(0x157, "Trying to start OBD %s using"
510                                            " the wrong disk %s. Were the /dev/ "
511                                            "assignments rearranged?\n",
512                                            obd->obd_uuid.uuid, lsd->lsd_uuid);
513                         GOTO(err_msd, rc = -EINVAL);
514                 }
515                 /* COMPAT_146 */
516                 /* Assume old last_rcvd format unless I_C_LR is set */
517                 if (!(lsd->lsd_feature_incompat &
518                       cpu_to_le32(OBD_INCOMPAT_COMMON_LR)))
519                         lsd->lsd_mount_count = lsd->lsd_compat14;
520                 /* end COMPAT_146 */
521                 mount_count = le64_to_cpu(lsd->lsd_mount_count);
522         }
523
524         if (lsd->lsd_feature_incompat & ~cpu_to_le32(MDT_INCOMPAT_SUPP)) {
525                 CERROR("%s: unsupported incompat filesystem feature(s) %x\n",
526                        obd->obd_name, le32_to_cpu(lsd->lsd_feature_incompat) &
527                        ~MDT_INCOMPAT_SUPP);
528                 GOTO(err_msd, rc = -EINVAL);
529         }
530         if (lsd->lsd_feature_rocompat & ~cpu_to_le32(MDT_ROCOMPAT_SUPP)) {
531                 CERROR("%s: unsupported read-only filesystem feature(s) %x\n",
532                        obd->obd_name, le32_to_cpu(lsd->lsd_feature_rocompat) &
533                        ~MDT_ROCOMPAT_SUPP);
534                 /* Do something like remount filesystem read-only */
535                 GOTO(err_msd, rc = -EINVAL);
536         }
537
538         lsd->lsd_feature_compat = cpu_to_le32(OBD_COMPAT_MDT);
539
540         target_trans_table_init(obd);
541         mds->mds_last_transno = le64_to_cpu(lsd->lsd_last_transno);
542         start_epoch = le32_to_cpu(lsd->lsd_start_epoch);
543
544         CDEBUG(D_INODE, "%s: server start_epoch: %#x\n",
545                obd->obd_name, start_epoch);
546         CDEBUG(D_INODE, "%s: server last_transno: "LPX64"\n",
547                obd->obd_name, mds->mds_last_transno);
548         CDEBUG(D_INODE, "%s: server mount_count: "LPU64"\n",
549                obd->obd_name, mount_count + 1);
550         CDEBUG(D_INODE, "%s: server data size: %u\n",
551                obd->obd_name, le32_to_cpu(lsd->lsd_server_size));
552         CDEBUG(D_INODE, "%s: per-client data start: %u\n",
553                obd->obd_name, le32_to_cpu(lsd->lsd_client_start));
554         CDEBUG(D_INODE, "%s: per-client data size: %u\n",
555                obd->obd_name, le32_to_cpu(lsd->lsd_client_size));
556         CDEBUG(D_INODE, "%s: last_rcvd size: %lu\n",
557                obd->obd_name, last_rcvd_size);
558         CDEBUG(D_INODE, "%s: last_rcvd clients: %lu\n", obd->obd_name,
559                last_rcvd_size <= le32_to_cpu(lsd->lsd_client_start) ? 0 :
560                (last_rcvd_size - le32_to_cpu(lsd->lsd_client_start)) /
561                 le16_to_cpu(lsd->lsd_client_size));
562
563         if (!lsd->lsd_server_size || !lsd->lsd_client_start ||
564             !lsd->lsd_client_size) {
565                 CERROR("Bad last_rcvd contents!\n");
566                 GOTO(err_msd, rc = -EINVAL);
567         }
568
569         /* When we do a clean MDS shutdown, we save the last_transno into
570          * the header.  If we find clients with higher last_transno values
571          * then those clients may need recovery done. */
572         for (cl_idx = 0, off = le32_to_cpu(lsd->lsd_client_start);
573              off < last_rcvd_size; cl_idx++) {
574                 __u64 last_transno;
575                 __u32 last_epoch;
576                 struct obd_export *exp;
577                 struct mds_export_data *med;
578
579                 if (!lcd) {
580                         OBD_ALLOC_WAIT(lcd, sizeof(*lcd));
581                         if (!lcd)
582                                 GOTO(err_client, rc = -ENOMEM);
583                 }
584
585                 /* Don't assume off is incremented properly by
586                  * fsfilt_read_record(), in case sizeof(*lcd)
587                  * isn't the same as lsd->lsd_client_size.  */
588                 off = le32_to_cpu(lsd->lsd_client_start) +
589                         cl_idx * le16_to_cpu(lsd->lsd_client_size);
590                 rc = fsfilt_read_record(obd, file, lcd, sizeof(*lcd), &off);
591                 if (rc) {
592                         CERROR("error reading MDS %s idx %d, off %llu: rc %d\n",
593                                LAST_RCVD, cl_idx, off, rc);
594                         break; /* read error shouldn't cause startup to fail */
595                 }
596
597                 if (lcd->lcd_uuid[0] == '\0') {
598                         CDEBUG(D_INFO, "skipping zeroed client at offset %d\n",
599                                cl_idx);
600                         continue;
601                 }
602
603                 last_transno = lsd_last_transno(lcd);
604                 last_epoch = le32_to_cpu(lcd->lcd_last_epoch);
605
606                 /* These exports are cleaned up by mds_disconnect(), so they
607                  * need to be set up like real exports as mds_connect() does.
608                  */
609                 CDEBUG(D_HA, "RCVRNG CLIENT uuid: %s idx: %d lr: "LPU64
610                        " srv lr: "LPU64" lx: "LPU64"\n", lcd->lcd_uuid, cl_idx,
611                        last_transno, le64_to_cpu(lsd->lsd_last_transno),
612                        le64_to_cpu(lcd->lcd_last_xid));
613
614                 exp = class_new_export(obd, (struct obd_uuid *)lcd->lcd_uuid);
615                 if (IS_ERR(exp)) {
616                         if (PTR_ERR(exp) == -EALREADY) {
617                                 /* export already exists, zero out this one */
618                                 lcd->lcd_uuid[0] = '\0';
619                         } else {
620                                 GOTO(err_client, rc = PTR_ERR(exp));
621                         }
622                 } else {
623                         med = &exp->exp_mds_data;
624                         med->med_lcd = lcd;
625                         rc = mds_client_add(obd, exp, cl_idx, NULL);
626                         /* can't fail for existing client */
627                         LASSERTF(rc == 0, "rc = %d\n", rc);
628
629                         /* VBR: set export last committed version */
630                         exp->exp_last_committed = last_transno;
631                         /* read last time from disk */
632                         exp->exp_last_request_time = target_trans_table_last_time(exp);
633                         lcd = NULL;
634
635                         spin_lock(&exp->exp_lock);
636                         exp->exp_replay_needed = 1;
637                         exp->exp_connecting = 0;
638                         exp->exp_in_recovery = 0;
639                         spin_unlock(&exp->exp_lock);
640
641                         spin_lock_bh(&obd->obd_processing_task_lock);
642                         obd->obd_recoverable_clients++;
643                         obd->obd_max_recoverable_clients++;
644                         spin_unlock_bh(&obd->obd_processing_task_lock);
645
646                         /* VBR: if epoch too old mark export as delayed,
647                          * if epoch is zero then client is pre-vbr one */
648                         if (start_epoch > last_epoch && last_epoch != 0)
649                                 class_set_export_delayed(exp);
650                         class_export_put(exp);
651                 }
652
653                 /* Need to check last_rcvd even for duplicated exports. */
654                 CDEBUG(D_OTHER, "client at idx %d has last_transno = "LPX64","
655                        "last_epoch %#x\n", cl_idx, last_transno, last_epoch);
656
657                 if (last_transno > mds->mds_last_transno)
658                         mds->mds_last_transno = last_transno;
659         }
660
661         if (unlikely(OBD_FAIL_CHECK(OBD_FAIL_TGT_FAKE_EXP))) {
662                 mds_add_fake_export(obd, obd_fail_val, file);
663         }
664
665         if (lcd)
666                 OBD_FREE_PTR(lcd);
667
668         obd->obd_last_committed = mds->mds_last_transno;
669
670         if (obd->obd_recoverable_clients) {
671                 CWARN("RECOVERY: service %s, %d recoverable clients, "
672                       "%d delayed clients, last_transno "LPU64"\n",
673                       obd->obd_name, obd->obd_recoverable_clients,
674                       obd->obd_delayed_clients, mds->mds_last_transno);
675                 obd->obd_next_recovery_transno = obd->obd_last_committed + 1;
676                 obd->obd_recovering = 1;
677                 obd->obd_recovery_start = 0;
678                 obd->obd_recovery_end = 0;
679                 obd->obd_recovery_timeout = OBD_RECOVERY_FACTOR * obd_timeout;
680 #ifdef CRAY_XT3
681                 /* bz13079: this won't be changed for mds */
682                 obd->obd_recovery_max_time = OBD_RECOVERY_MAX_TIME;
683 #endif
684         } else {
685                 LASSERT(!obd->obd_recovering);
686                 /* VBR: update boot epoch after recovery */
687                 mds_update_last_epoch(obd);
688         }
689         mds->mds_mount_count = mount_count + 1;
690         lsd->lsd_mount_count = lsd->lsd_compat14 =
691                 cpu_to_le64(mds->mds_mount_count);
692
693         /* save it, so mount count and last_transno is current */
694         rc = mds_update_server_data(obd, 1);
695         if (rc)
696                 GOTO(err_client, rc);
697
698         RETURN(0);
699
700 err_client:
701         class_disconnect_exports(obd);
702 err_msd:
703         mds_server_free_data(mds);
704         RETURN(rc);
705 }
706
707 int mds_fs_setup(struct obd_device *obd, struct vfsmount *mnt)
708 {
709         struct mds_obd *mds = &obd->u.mds;
710         struct lvfs_run_ctxt *saved = NULL;
711         struct dentry *dentry;
712         struct file *file;
713         int rc;
714         ENTRY;
715
716         OBD_FAIL_RETURN(OBD_FAIL_MDS_FS_SETUP, -ENOENT);
717
718         rc = cleanup_group_info();
719         if (rc)
720                 RETURN(rc);
721
722         OBD_SLAB_ALLOC_PTR(saved, obd_lvfs_ctxt_cache);
723         if (saved == NULL) {
724                 CERROR("cannot allocate memory for run ctxt\n");
725                 RETURN(-ENOMEM);
726         }
727
728         mds->mds_vfsmnt = mnt;
729         /* why not mnt->mnt_sb instead of mnt->mnt_root->d_inode->i_sb? */
730         obd->u.obt.obt_sb = mnt->mnt_root->d_inode->i_sb;
731         obd->u.obt.obt_stale_export_age = STALE_EXPORT_MAXTIME_DEFAULT;
732         spin_lock_init(&obd->u.obt.obt_trans_table_lock);
733
734         rc = fsfilt_setup(obd, obd->u.obt.obt_sb);
735         if (rc)
736                 RETURN(rc);
737
738         OBD_SET_CTXT_MAGIC(&obd->obd_lvfs_ctxt);
739         obd->obd_lvfs_ctxt.pwdmnt = mnt;
740         obd->obd_lvfs_ctxt.pwd = mnt->mnt_root;
741         obd->obd_lvfs_ctxt.fs = get_ds();
742         obd->obd_lvfs_ctxt.cb_ops = mds_lvfs_ops;
743
744         /* setup the directory tree */
745         push_ctxt(saved, &obd->obd_lvfs_ctxt, NULL);
746         dentry = simple_mkdir(cfs_fs_pwd(current->fs), mnt, "ROOT", 0755, 0);
747         if (IS_ERR(dentry)) {
748                 rc = PTR_ERR(dentry);
749                 CERROR("cannot create ROOT directory: rc = %d\n", rc);
750                 GOTO(err_pop, rc);
751         }
752
753         mds->mds_rootfid.id = dentry->d_inode->i_ino;
754         mds->mds_rootfid.generation = dentry->d_inode->i_generation;
755         mds->mds_rootfid.f_type = S_IFDIR;
756
757         dput(dentry);
758
759         dentry = lookup_one_len("__iopen__", cfs_fs_pwd(current->fs),
760                                 strlen("__iopen__"));
761         if (IS_ERR(dentry)) {
762                 rc = PTR_ERR(dentry);
763                 CERROR("cannot lookup __iopen__ directory: rc = %d\n", rc);
764                 GOTO(err_pop, rc);
765         }
766
767         mds->mds_fid_de = dentry;
768         if (!dentry->d_inode || is_bad_inode(dentry->d_inode)) {
769                 rc = -ENOENT;
770                 CERROR("__iopen__ directory has no inode? rc = %d\n", rc);
771                 GOTO(err_fid, rc);
772         }
773
774         dentry = simple_mkdir(cfs_fs_pwd(current->fs), mnt, "PENDING", 0777, 1);
775         if (IS_ERR(dentry)) {
776                 rc = PTR_ERR(dentry);
777                 CERROR("cannot create PENDING directory: rc = %d\n", rc);
778                 GOTO(err_fid, rc);
779         }
780         mds->mds_pending_dir = dentry;
781
782         /* COMPAT_146 */
783         dentry = simple_mkdir(cfs_fs_pwd(current->fs), mnt, MDT_LOGS_DIR, 0777, 1);
784         if (IS_ERR(dentry)) {
785                 rc = PTR_ERR(dentry);
786                 CERROR("cannot create %s directory: rc = %d\n",
787                        MDT_LOGS_DIR, rc);
788                 GOTO(err_pending, rc);
789         }
790         mds->mds_logs_dir = dentry;
791         /* end COMPAT_146 */
792
793         dentry = simple_mkdir(cfs_fs_pwd(current->fs), mnt, "OBJECTS", 0777, 1);
794         if (IS_ERR(dentry)) {
795                 rc = PTR_ERR(dentry);
796                 CERROR("cannot create OBJECTS directory: rc = %d\n", rc);
797                 GOTO(err_logs, rc);
798         }
799         mds->mds_objects_dir = dentry;
800
801         /* open and test the last rcvd file */
802         file = filp_open(LAST_RCVD, O_RDWR | O_CREAT, 0644);
803         if (IS_ERR(file)) {
804                 rc = PTR_ERR(file);
805                 CERROR("cannot open/create %s file: rc = %d\n", LAST_RCVD, rc);
806                 GOTO(err_objects, rc = PTR_ERR(file));
807         }
808         mds->mds_rcvd_filp = file;
809         if (!S_ISREG(file->f_dentry->d_inode->i_mode)) {
810                 CERROR("%s is not a regular file!: mode = %o\n", LAST_RCVD,
811                        file->f_dentry->d_inode->i_mode);
812                 GOTO(err_last_rcvd, rc = -ENOENT);
813         }
814
815         rc = mds_init_server_data(obd, file);
816         if (rc) {
817                 CERROR("cannot read %s: rc = %d\n", LAST_RCVD, rc);
818                 GOTO(err_last_rcvd, rc);
819         }
820
821         rc = mds_lov_init_objids(obd);
822         if (rc != 0) {
823                CERROR("cannot init lov objid rc = %d\n", rc);
824                GOTO(err_client, rc );
825         }
826
827         /* open and test the check io file junk */
828         file = filp_open(HEALTH_CHECK, O_RDWR | O_CREAT, 0644);
829         if (IS_ERR(file)) {
830                 rc = PTR_ERR(file);
831                 CERROR("cannot open/create %s file: rc = %d\n",HEALTH_CHECK,rc);
832                 GOTO(err_lov_objid, rc = PTR_ERR(file));
833         }
834         mds->mds_health_check_filp = file;
835         if (!S_ISREG(file->f_dentry->d_inode->i_mode)) {
836                 CERROR("%s is not a regular file!: mode = %o\n", HEALTH_CHECK,
837                        file->f_dentry->d_inode->i_mode);
838                 GOTO(err_health_check, rc = -ENOENT);
839         }
840         rc = lvfs_check_io_health(obd, file);
841         if (rc)
842                 GOTO(err_health_check, rc);
843 err_pop:
844         pop_ctxt(saved, &obd->obd_lvfs_ctxt, NULL);
845         OBD_SLAB_FREE_PTR(saved, obd_lvfs_ctxt_cache);
846         return rc;
847
848 err_health_check:
849         if (mds->mds_health_check_filp &&
850             filp_close(mds->mds_health_check_filp, 0))
851                 CERROR("can't close %s after error\n", HEALTH_CHECK);
852 err_lov_objid:
853          mds_lov_destroy_objids(obd);
854 err_client:
855         class_disconnect_exports(obd);
856 err_last_rcvd:
857         if (mds->mds_rcvd_filp && filp_close(mds->mds_rcvd_filp, 0))
858                 CERROR("can't close %s after error\n", LAST_RCVD);
859 err_objects:
860         dput(mds->mds_objects_dir);
861 err_logs:
862         dput(mds->mds_logs_dir);
863 err_pending:
864         dput(mds->mds_pending_dir);
865 err_fid:
866         dput(mds->mds_fid_de);
867         goto err_pop;
868 }
869
870 int mds_fs_cleanup(struct obd_device *obd)
871 {
872         struct mds_obd *mds = &obd->u.mds;
873         struct lvfs_run_ctxt *saved = NULL;
874         int rc = 0;
875
876         OBD_SLAB_ALLOC_PTR(saved, obd_lvfs_ctxt_cache);
877         if (saved == NULL) {
878                 CERROR("cannot allocate memory for run ctxt\n");
879                 RETURN(-ENOMEM);
880         }
881
882         if (obd->obd_fail)
883                 LCONSOLE_WARN("%s: shutting down for failover; client state "
884                               "will be preserved.\n", obd->obd_name);
885
886         class_disconnect_exports(obd); /* cleans up client info too */
887         mds_server_free_data(mds);
888
889         push_ctxt(saved, &obd->obd_lvfs_ctxt, NULL);
890         if (mds->mds_rcvd_filp) {
891                 rc = filp_close(mds->mds_rcvd_filp, 0);
892                 mds->mds_rcvd_filp = NULL;
893                 if (rc)
894                         CERROR("%s file won't close, rc=%d\n", LAST_RCVD, rc);
895         }
896
897         mds_lov_destroy_objids(obd);
898
899         if (mds->mds_health_check_filp) {
900                 rc = filp_close(mds->mds_health_check_filp, 0);
901                 mds->mds_health_check_filp = NULL;
902                 if (rc)
903                         CERROR("%s file won't close, rc=%d\n", HEALTH_CHECK,rc);
904         }
905         if (mds->mds_objects_dir != NULL) {
906                 l_dput(mds->mds_objects_dir);
907                 mds->mds_objects_dir = NULL;
908         }
909         if (mds->mds_logs_dir) {
910                 l_dput(mds->mds_logs_dir);
911                 mds->mds_logs_dir = NULL;
912         }
913         if (mds->mds_pending_dir) {
914                 l_dput(mds->mds_pending_dir);
915                 mds->mds_pending_dir = NULL;
916         }
917
918         lquota_fs_cleanup(mds_quota_interface_ref, obd);
919
920         pop_ctxt(saved, &obd->obd_lvfs_ctxt, NULL);
921         OBD_SLAB_FREE_PTR(saved, obd_lvfs_ctxt_cache);
922         shrink_dcache_parent(mds->mds_fid_de);
923         dput(mds->mds_fid_de);
924         LL_DQUOT_OFF(obd->u.obt.obt_sb, 0);
925
926         return rc;
927 }
928
929 /* Creates an object with the same name as its fid.  Because this is not at all
930  * performance sensitive, it is accomplished by creating a file, checking the
931  * fid, and renaming it. */
932 int mds_obd_create(struct obd_export *exp, struct obdo *oa,
933                    struct lov_stripe_md **ea, struct obd_trans_info *oti)
934 {
935         struct mds_obd *mds = &exp->exp_obd->u.mds;
936         struct inode *parent_inode = mds->mds_objects_dir->d_inode;
937         unsigned int tmpname = ll_rand();
938         struct dentry *dchild, *new_child;
939         struct lvfs_dentry_params dp = LVFS_DENTRY_PARAMS_INIT;
940         struct lvfs_run_ctxt *saved = NULL;
941         char fidname[LL_FID_NAMELEN];
942         void *handle;
943         struct lvfs_ucred ucred = { 0 };
944         int rc = 0, err, namelen;
945         ENTRY;
946
947         OBD_SLAB_ALLOC_PTR(saved, obd_lvfs_ctxt_cache);
948         if (saved == NULL) {
949                 CERROR("cannot allocate memory for run ctxt\n");
950                 RETURN(-ENOMEM);
951         }
952
953         /* the owner of object file should always be root */
954         cap_raise(ucred.luc_cap, CAP_SYS_RESOURCE);
955
956         push_ctxt(saved, &exp->exp_obd->obd_lvfs_ctxt, &ucred);
957
958         sprintf(fidname, "%u.%u", tmpname, current->pid);
959         dchild = lookup_one_len(fidname, mds->mds_objects_dir, strlen(fidname));
960         if (IS_ERR(dchild)) {
961                 CERROR("getting neg dentry for obj: %u\n", tmpname);
962                 GOTO(out_pop, rc = PTR_ERR(dchild));
963         }
964         if (dchild->d_inode != NULL) {
965                 CERROR("impossible non-negative obj dentry: %u\n", tmpname);
966                 LBUG();
967         }
968
969         dchild->d_fsdata = (void *)&dp;
970         dp.ldp_ptr   = (void *)DP_LASTGROUP_REVERSE;
971
972         LOCK_INODE_MUTEX(parent_inode);
973         rc = ll_vfs_create(parent_inode, dchild, S_IFREG | 0666, NULL);
974
975         oa->o_id = dchild->d_inode->i_ino;
976         oa->o_generation = dchild->d_inode->i_generation;
977         namelen = ll_fid2str(fidname, oa->o_id, oa->o_generation);
978
979         new_child = lookup_one_len(fidname, mds->mds_objects_dir, namelen);
980
981         if (IS_ERR(new_child)) {
982                 CERROR("getting neg dentry for obj rename: %d\n", rc);
983                 GOTO(out_dput, rc = PTR_ERR(new_child));
984         }
985         if (new_child->d_inode != NULL) {
986                 CERROR("impossible non-negative obj dentry " LPU64":%u!\n",
987                        oa->o_id, oa->o_generation);
988                 LBUG();
989         }
990
991         handle = fsfilt_start(exp->exp_obd, mds->mds_objects_dir->d_inode,
992                               FSFILT_OP_RENAME, NULL);
993         if (IS_ERR(handle))
994                 GOTO(out_dput2, rc = PTR_ERR(handle));
995
996         lock_kernel();
997         rc = ll_vfs_rename(parent_inode, dchild, mds->mds_vfsmnt,
998                            parent_inode, new_child, mds->mds_vfsmnt);
999         unlock_kernel();
1000         if (rc)
1001                 CERROR("error renaming new object "LPU64":%u: rc %d\n",
1002                        oa->o_id, oa->o_generation, rc);
1003
1004         err = fsfilt_commit(exp->exp_obd, mds->mds_objects_dir->d_inode,
1005                             handle, 0);
1006         if (!err)
1007                 oa->o_valid |= OBD_MD_FLID | OBD_MD_FLGENER;
1008         else if (!rc)
1009                 rc = err;
1010 out_dput2:
1011         dput(new_child);
1012 out_dput:
1013         dput(dchild);
1014         UNLOCK_INODE_MUTEX(parent_inode);
1015 out_pop:
1016         pop_ctxt(saved, &exp->exp_obd->obd_lvfs_ctxt, &ucred);
1017         OBD_SLAB_FREE_PTR(saved, obd_lvfs_ctxt_cache);
1018         RETURN(rc);
1019 }
1020
1021 int mds_obd_destroy(struct obd_export *exp, struct obdo *oa,
1022                     struct lov_stripe_md *ea, struct obd_trans_info *oti,
1023                     struct obd_export *md_exp)
1024 {
1025         struct mds_obd *mds = &exp->exp_obd->u.mds;
1026         struct inode *parent_inode = mds->mds_objects_dir->d_inode;
1027         struct obd_device *obd = exp->exp_obd;
1028         struct lvfs_run_ctxt *saved = NULL;
1029         struct lvfs_ucred ucred = { 0 };
1030         char fidname[LL_FID_NAMELEN];
1031         struct inode *inode = NULL;
1032         struct dentry *de;
1033         void *handle;
1034         int err, namelen, rc = 0;
1035         ENTRY;
1036
1037         OBD_SLAB_ALLOC_PTR(saved, obd_lvfs_ctxt_cache);
1038         if (saved == NULL) {
1039                 CERROR("cannot allocate memory for run ctxt\n");
1040                 RETURN(-ENOMEM);
1041         }
1042
1043         cap_raise(ucred.luc_cap, CAP_SYS_RESOURCE);
1044         push_ctxt(saved, &obd->obd_lvfs_ctxt, &ucred);
1045
1046         namelen = ll_fid2str(fidname, oa->o_id, oa->o_generation);
1047
1048         LOCK_INODE_MUTEX(parent_inode);
1049         de = lookup_one_len(fidname, mds->mds_objects_dir, namelen);
1050         if (IS_ERR(de)) {
1051                 rc = IS_ERR(de);
1052                 de = NULL;
1053                 CERROR("error looking up object "LPU64" %s: rc %d\n",
1054                        oa->o_id, fidname, rc);
1055                 GOTO(out_dput, rc);
1056         }
1057         if (de->d_inode == NULL) {
1058                 CERROR("destroying non-existent object "LPU64" %s: rc %d\n",
1059                        oa->o_id, fidname, rc);
1060                 GOTO(out_dput, rc = -ENOENT);
1061         }
1062
1063         /* Stripe count is 1 here since this is some MDS specific stuff
1064            that is unlinked, not spanned across multiple OSTs */
1065         handle = fsfilt_start_log(obd, mds->mds_objects_dir->d_inode,
1066                                   FSFILT_OP_UNLINK, oti, 1);
1067
1068         if (IS_ERR(handle))
1069                 GOTO(out_dput, rc = PTR_ERR(handle));
1070
1071         /* take a reference to protect inode from truncation within
1072            vfs_unlink() context. bug 10409 */
1073         inode = de->d_inode;
1074         atomic_inc(&inode->i_count);
1075         rc = ll_vfs_unlink(mds->mds_objects_dir->d_inode, de, mds->mds_vfsmnt);
1076         if (rc)
1077                 CERROR("error destroying object "LPU64":%u: rc %d\n",
1078                        oa->o_id, oa->o_generation, rc);
1079
1080         err = fsfilt_commit(obd, mds->mds_objects_dir->d_inode, handle, 0);
1081         if (err && !rc)
1082                 rc = err;
1083 out_dput:
1084         if (de != NULL)
1085                 l_dput(de);
1086         UNLOCK_INODE_MUTEX(parent_inode);
1087
1088         if (inode)
1089                 iput(inode);
1090
1091         pop_ctxt(saved, &obd->obd_lvfs_ctxt, &ucred);
1092         OBD_SLAB_FREE_PTR(saved, obd_lvfs_ctxt_cache);
1093         RETURN(rc);
1094 }