Whamcloud - gitweb
Updated version strings for 2.1.0 build 01. Updated ChangeLogs
[fs/lustre-release.git] / lustre / mds / mds_lov.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/mds/mds_lov.c
37  *
38  * Lustre Metadata Server (mds) handling of striped file data
39  *
40  * Author: Peter Braam <braam@clusterfs.com>
41  */
42
43 #define DEBUG_SUBSYSTEM S_MDS
44
45 #include <linux/module.h>
46 #include <lustre_mds.h>
47 #include <lustre/lustre_idl.h>
48 #include <obd_class.h>
49 #include <obd_lov.h>
50 #include <lustre_lib.h>
51 #include <lustre_fsfilt.h>
52 #include <obd_cksum.h>
53
54 #include "mds_internal.h"
55
56 static void mds_lov_dump_objids(const char *label, struct obd_device *obd)
57 {
58         struct mds_obd *mds = &obd->u.mds;
59         unsigned int i=0, j;
60
61         CDEBUG(D_INFO, "dump from %s\n", label);
62         if (mds->mds_lov_page_dirty == NULL) {
63                 CERROR("NULL bitmap!\n");
64                 GOTO(skip_bitmap, i);
65         }
66
67         for(i = 0; i < mds->mds_lov_page_dirty->size / BITS_PER_LONG + 1; i++)
68                 CDEBUG(D_INFO, "%u - %lx\n", i,
69                        mds->mds_lov_page_dirty->data[i]);
70 skip_bitmap:
71         if (mds->mds_lov_page_array == NULL) {
72                 CERROR("not init page array!\n");
73                 GOTO(skip_array, i);
74
75         }
76         for(i = 0;i < MDS_LOV_OBJID_PAGES_COUNT; i++) {
77                 obd_id *data = mds->mds_lov_page_array[i];
78
79                 if (data == NULL)
80                         continue;
81
82                 for(j=0; j < OBJID_PER_PAGE(); j++) {
83                         if (data[j] == 0)
84                                 continue;
85                         CDEBUG(D_INFO,"objid page %u idx %u - "LPU64" \n",
86                                i, j, data[j]);
87                 }
88         }
89 skip_array:
90         EXIT;
91 }
92
93 int mds_lov_init_objids(struct obd_device *obd)
94 {
95         struct mds_obd *mds = &obd->u.mds;
96         int size = MDS_LOV_OBJID_PAGES_COUNT*sizeof(void *);
97         struct file *file;
98         int rc;
99         ENTRY;
100
101         CLASSERT(((MDS_LOV_ALLOC_SIZE % sizeof(obd_id)) == 0));
102
103         mds->mds_lov_page_dirty =
104                 CFS_ALLOCATE_BITMAP(MDS_LOV_OBJID_PAGES_COUNT);
105         if (mds->mds_lov_page_dirty == NULL)
106                 RETURN(-ENOMEM);
107
108
109         OBD_ALLOC(mds->mds_lov_page_array, size);
110         if (mds->mds_lov_page_array == NULL)
111                 GOTO(err_free_bitmap, rc = -ENOMEM);
112
113         /* open and test the lov objd file */
114         file = filp_open(LOV_OBJID, O_RDWR | O_CREAT, 0644);
115         if (IS_ERR(file)) {
116                 rc = PTR_ERR(file);
117                 CERROR("cannot open/create %s file: rc = %d\n", LOV_OBJID, rc);
118                 GOTO(err_free, rc = PTR_ERR(file));
119         }
120         if (!S_ISREG(file->f_dentry->d_inode->i_mode)) {
121                 CERROR("%s is not a regular file!: mode = %o\n", LOV_OBJID,
122                        file->f_dentry->d_inode->i_mode);
123                 GOTO(err_open, rc = -ENOENT);
124         }
125         mds->mds_lov_objid_filp = file;
126
127         RETURN (0);
128 err_open:
129         if (filp_close((struct file *)file, 0))
130                 CERROR("can't close %s after error\n", LOV_OBJID);
131 err_free:
132         OBD_FREE(mds->mds_lov_page_array, size);
133 err_free_bitmap:
134         CFS_FREE_BITMAP(mds->mds_lov_page_dirty);
135
136         RETURN(rc);
137 }
138
139 void mds_lov_destroy_objids(struct obd_device *obd)
140 {
141         struct mds_obd *mds = &obd->u.mds;
142         int i, rc;
143         ENTRY;
144
145         if (mds->mds_lov_page_array != NULL) {
146                 for(i=0;i<MDS_LOV_OBJID_PAGES_COUNT;i++) {
147                         obd_id *data = mds->mds_lov_page_array[i];
148                         if (data != NULL)
149                                 OBD_FREE(data, MDS_LOV_ALLOC_SIZE);
150                 }
151                 OBD_FREE(mds->mds_lov_page_array,
152                          MDS_LOV_OBJID_PAGES_COUNT*sizeof(void *));
153         }
154
155         if (mds->mds_lov_objid_filp) {
156                 rc = filp_close((struct file *)mds->mds_lov_objid_filp, NULL);
157                 mds->mds_lov_objid_filp = NULL;
158                 if (rc)
159                         CERROR("%s file won't close, rc=%d\n", LOV_OBJID, rc);
160         }
161
162         CFS_FREE_BITMAP(mds->mds_lov_page_dirty);
163         EXIT;
164 }
165
166 /**
167  * currently exist two ways for know about ost count and max ost index.
168  * first - after ost is connected to mds and sync process finished
169  * second - get from lmm in recovery process, in case when mds not have configs,
170  * and ost isn't registered in mgs.
171  *
172  * \param mds pointer to mds structure
173  * \param index maxium ost index
174  *
175  * \retval -ENOMEM is not hame memory for new page
176  * \retval 0 is update passed
177  */
178 static int mds_lov_update_max_ost(struct mds_obd *mds, obd_id index)
179 {
180         __u32 page = index / OBJID_PER_PAGE();
181         __u32 off = index % OBJID_PER_PAGE();
182         obd_id *data =  mds->mds_lov_page_array[page];
183
184         if (data == NULL) {
185                 OBD_ALLOC(data, MDS_LOV_ALLOC_SIZE);
186                 if (data == NULL)
187                         RETURN(-ENOMEM);
188
189                 mds->mds_lov_page_array[page] = data;
190         }
191
192         if (index > mds->mds_lov_objid_max_index) {
193                 mds->mds_lov_objid_lastpage = page;
194                 mds->mds_lov_objid_lastidx = off;
195                 mds->mds_lov_objid_max_index = index;
196         }
197
198         /* workaround - New target not in objids file; increase mdsize */
199         /* ld_tgt_count is used as the max index everywhere, despite its name. */
200         if (data[off] == 0) {
201                 __u32 stripes;
202
203                 data[off] = 1;
204                 mds->mds_lov_objid_count++;
205                 stripes = min_t(__u32, LOV_MAX_STRIPE_COUNT,
206                                 mds->mds_lov_objid_count);
207
208                 mds->mds_max_mdsize = lov_mds_md_size(stripes, LOV_MAGIC_V3);
209                 mds->mds_max_cookiesize = stripes * sizeof(struct llog_cookie);
210
211                 CDEBUG(D_CONFIG, "updated max_mdsize/max_cookiesize for %d"
212                        " stripes: %d/%d\n", stripes, mds->mds_max_mdsize,
213                        mds->mds_max_cookiesize);
214         }
215
216         EXIT;
217         return 0;
218 }
219
220 int mds_lov_prepare_objids(struct obd_device *obd, struct lov_mds_md *lmm)
221 {
222         struct lov_ost_data_v1 *data;
223         __u32 count;
224         int rc = 0;
225         __u32 j;
226
227         /* if we create file without objects - lmm is NULL */
228         if (lmm == NULL)
229                 return 0;
230
231         switch (le32_to_cpu(lmm->lmm_magic)) {
232                 case LOV_MAGIC_V1:
233                         count = le32_to_cpu(((struct lov_mds_md_v1*)lmm)->lmm_stripe_count);
234                         data = &(((struct lov_mds_md_v1*)lmm)->lmm_objects[0]);
235                         break;
236                 case LOV_MAGIC_V3:
237                         count = le32_to_cpu(((struct lov_mds_md_v3*)lmm)->lmm_stripe_count);
238                         data = &(((struct lov_mds_md_v3*)lmm)->lmm_objects[0]);
239                         break;
240                 default:
241                         CERROR("Unknow lmm type %X!\n", le32_to_cpu(lmm->lmm_magic));
242                         RETURN(-EINVAL);
243         }
244
245
246         cfs_mutex_down(&obd->obd_dev_sem);
247         for (j = 0; j < count; j++) {
248                 __u32 i = le32_to_cpu(data[j].l_ost_idx);
249                 if (mds_lov_update_max_ost(&obd->u.mds, i)) {
250                         rc = -ENOMEM;
251                         break;
252                 }
253         }
254         cfs_mutex_up(&obd->obd_dev_sem);
255
256         RETURN(rc);
257 }
258 EXPORT_SYMBOL(mds_lov_prepare_objids);
259
260 /*
261  * write llog orphan record about lost ost object,
262  * Special lsm is allocated with single stripe, caller should deallocated it
263  * after use
264  */
265 static int mds_log_lost_precreated(struct obd_device *obd,
266                                    struct lov_stripe_md **lsmp, int *stripes,
267                                    obd_id id, obd_count count, int idx)
268 {
269         struct lov_stripe_md *lsm = *lsmp;
270         int rc;
271         ENTRY;
272
273         if (*lsmp == NULL) {
274                 rc = obd_alloc_memmd(obd->u.mds.mds_lov_exp, &lsm);
275                 if (rc < 0)
276                         RETURN(rc);
277                 /* need only one stripe, save old value */
278                 *stripes = lsm->lsm_stripe_count;
279                 lsm->lsm_stripe_count = 1;
280                 *lsmp = lsm;
281         }
282
283         lsm->lsm_oinfo[0]->loi_id = id;
284         lsm->lsm_oinfo[0]->loi_seq = mdt_to_obd_objseq(obd->u.mds.mds_id);
285         lsm->lsm_oinfo[0]->loi_ost_idx = idx;
286
287         rc = mds_log_op_orphan(obd, lsm, count);
288         RETURN(rc);
289 }
290
291 void mds_lov_update_objids(struct obd_device *obd, struct lov_mds_md *lmm)
292 {
293         struct mds_obd *mds = &obd->u.mds;
294         int j;
295         struct lov_ost_data_v1 *obj;
296         struct lov_stripe_md *lsm = NULL;
297         int stripes = 0;
298         int count;
299         ENTRY;
300
301         /* if we create file without objects - lmm is NULL */
302         if (lmm == NULL)
303                 return;
304
305         switch (le32_to_cpu(lmm->lmm_magic)) {
306                 case LOV_MAGIC_V1:
307                         count = le32_to_cpu(((struct lov_mds_md_v1*)lmm)->lmm_stripe_count);
308                         obj = ((struct lov_mds_md_v1*)lmm)->lmm_objects;
309                         break;
310                 case LOV_MAGIC_V3:
311                         count = le32_to_cpu(((struct lov_mds_md_v3*)lmm)->lmm_stripe_count);
312                         obj = ((struct lov_mds_md_v3*)lmm)->lmm_objects;
313                         break;
314                 default:
315                         CERROR("Unknow lmm type %X !\n",
316                                le32_to_cpu(lmm->lmm_magic));
317                         return;
318         }
319
320         for (j = 0; j < count; j++) {
321                 __u32 i = le32_to_cpu(obj[j].l_ost_idx);
322                 obd_id id = le64_to_cpu(obj[j].l_object_id);
323                 __u32 page = i / OBJID_PER_PAGE();
324                 __u32 idx = i % OBJID_PER_PAGE();
325                 obd_id *data;
326
327                 data = mds->mds_lov_page_array[page];
328
329                 CDEBUG(D_INODE,"update last object for ost %u"
330                        " - new "LPU64" old "LPU64"\n", i, id, data[idx]);
331                 if (id > data[idx]) {
332                         int lost = id - data[idx] - 1;
333                         /* we might have lost precreated objects due to VBR */
334                         if (lost > 0 && obd->obd_recovering) {
335                                 CDEBUG(D_HA, "Gap in objids is %u\n", lost);
336                                 if (!obd->obd_version_recov)
337                                         CERROR("Unexpected gap in objids\n");
338                                 /* lsm is allocated if NULL */
339                                 mds_log_lost_precreated(obd, &lsm, &stripes,
340                                                         data[idx]+1, lost, i);
341                         }
342                         data[idx] = id;
343                         cfs_bitmap_set(mds->mds_lov_page_dirty, page);
344                 }
345         }
346         if (lsm) {
347                 /* restore stripes number */
348                 lsm->lsm_stripe_count = stripes;
349                 obd_free_memmd(mds->mds_lov_exp, &lsm);
350         }
351         EXIT;
352         return;
353 }
354 EXPORT_SYMBOL(mds_lov_update_objids);
355
356 static int mds_lov_update_from_read(struct mds_obd *mds, obd_id *data,
357                                     __u32 count)
358 {
359         __u32 i;
360         __u32 stripes;
361
362         for (i = 0; i < count; i++) {
363                 if (data[i] == 0)
364                         continue;
365
366                 mds->mds_lov_objid_count++;
367         }
368
369         stripes = min_t(__u32, LOV_MAX_STRIPE_COUNT,
370                          mds->mds_lov_objid_count);
371
372         mds->mds_max_mdsize = lov_mds_md_size(stripes, LOV_MAGIC_V3);
373         mds->mds_max_cookiesize = stripes * sizeof(struct llog_cookie);
374
375         CDEBUG(D_CONFIG, "updated max_mdsize/max_cookiesize for %d stripes: "
376                "%d/%d\n", stripes, mds->mds_max_mdsize,
377                mds->mds_max_cookiesize);
378
379         EXIT;
380         return 0;
381 }
382
383 static int mds_lov_read_objids(struct obd_device *obd)
384 {
385         struct mds_obd *mds = &obd->u.mds;
386         loff_t off = 0;
387         int i, rc = 0, count = 0, page = 0;
388         unsigned long size;
389         ENTRY;
390
391         /* Read everything in the file, even if our current lov desc
392            has fewer targets. Old targets not in the lov descriptor
393            during mds setup may still have valid objids. */
394         size = i_size_read(mds->mds_lov_objid_filp->f_dentry->d_inode);
395         if (size == 0)
396                 RETURN(0);
397
398         page = (size + MDS_LOV_ALLOC_SIZE - 1) / MDS_LOV_ALLOC_SIZE;
399         CDEBUG(D_INFO, "file size %lu pages %d\n", size, page);
400         for (i = 0; i < page; i++) {
401                 obd_id *data;
402                 loff_t off_old = off;
403
404                 LASSERT(mds->mds_lov_page_array[i] == NULL);
405                 OBD_ALLOC(mds->mds_lov_page_array[i], MDS_LOV_ALLOC_SIZE);
406                 if (mds->mds_lov_page_array[i] == NULL)
407                         GOTO(out, rc = -ENOMEM);
408
409                 data = mds->mds_lov_page_array[i];
410
411                 rc = fsfilt_read_record(obd, mds->mds_lov_objid_filp, data,
412                                         MDS_LOV_ALLOC_SIZE, &off);
413                 if (rc < 0) {
414                         CERROR("Error reading objids %d\n", rc);
415                         GOTO(out, rc);
416                 }
417                 if (off == off_old) /* hole is read */
418                         off += MDS_LOV_ALLOC_SIZE;
419
420                 count = (off - off_old) / sizeof(obd_id);
421                 if (mds_lov_update_from_read(mds, data, count)) {
422                         CERROR("Can't update mds data\n");
423                         GOTO(out, rc = -EIO);
424                 }
425         }
426         mds->mds_lov_objid_lastpage = page - 1;
427         mds->mds_lov_objid_lastidx = count - 1;
428
429         CDEBUG(D_INFO, "Read %u - %u %u objid\n", mds->mds_lov_objid_count,
430                mds->mds_lov_objid_lastpage, mds->mds_lov_objid_lastidx);
431 out:
432         mds_lov_dump_objids("read",obd);
433
434         RETURN(rc);
435 }
436
437 int mds_lov_write_objids(struct obd_device *obd)
438 {
439         struct mds_obd *mds = &obd->u.mds;
440         int i = 0, rc = 0;
441         ENTRY;
442
443         if (cfs_bitmap_check_empty(mds->mds_lov_page_dirty))
444                 RETURN(0);
445
446         mds_lov_dump_objids("write", obd);
447
448         cfs_foreach_bit(mds->mds_lov_page_dirty, i) {
449                 obd_id *data =  mds->mds_lov_page_array[i];
450                 unsigned int size = MDS_LOV_ALLOC_SIZE;
451                 loff_t off = i * size;
452
453                 LASSERT(data != NULL);
454
455                 if (!cfs_bitmap_test_and_clear(mds->mds_lov_page_dirty, i))
456                         continue;
457
458                 /* check for particaly filled last page */
459                 if (i == mds->mds_lov_objid_lastpage)
460                         size = (mds->mds_lov_objid_lastidx+1) * sizeof(obd_id);
461
462                 CDEBUG(D_INFO, "write %lld - %u\n", off, size);
463                 rc = fsfilt_write_record(obd, mds->mds_lov_objid_filp, data,
464                                          size, &off, 0);
465                 if (rc < 0) {
466                         cfs_bitmap_set(mds->mds_lov_page_dirty, i);
467                         break;
468                 }
469         }
470         if (rc >= 0)
471                 rc = 0;
472
473         RETURN(rc);
474 }
475 EXPORT_SYMBOL(mds_lov_write_objids);
476
477 static int mds_lov_get_objid(struct obd_device * obd,
478                              obd_id idx)
479 {
480         struct mds_obd *mds = &obd->u.mds;
481         struct obd_export *lov_exp = mds->mds_lov_exp;
482         unsigned int page;
483         unsigned int off;
484         obd_id *data;
485         __u64 connect_flags;
486         __u32 size;
487         int rc = 0;
488         ENTRY;
489
490         page = idx / OBJID_PER_PAGE();
491         off = idx % OBJID_PER_PAGE();
492         data = mds->mds_lov_page_array[page];
493
494         size = sizeof(__u64);
495         connect_flags = idx;
496         rc = obd_get_info(lov_exp, sizeof(KEY_CONNECT_FLAG), KEY_CONNECT_FLAG,
497                           &size, &connect_flags, NULL);
498         if (rc)
499                 GOTO(out, rc);
500
501         if (data[off] < 2 || connect_flags & OBD_CONNECT_SKIP_ORPHAN) {
502                 /* We never read this lastid; ask the osc */
503                 struct obd_id_info lastid;
504
505                 size = sizeof(lastid);
506                 lastid.idx = idx;
507                 lastid.data = &data[off];
508                 rc = obd_get_info(lov_exp, sizeof(KEY_LAST_ID), KEY_LAST_ID,
509                                   &size, &lastid, NULL);
510                 if (rc)
511                         GOTO(out, rc);
512
513                 /* workaround for clean filter */
514                 if (data[off] == 0)
515                         data[off] = 1;
516
517                 cfs_bitmap_set(mds->mds_lov_page_dirty, page);
518         }
519         CDEBUG(D_INFO, "idx "LPU64" - %p - %d/%d - "LPU64"\n",
520                idx, data, page, off, data[off]);
521 out:
522         RETURN(rc);
523 }
524
525 int mds_lov_clear_orphans(struct mds_obd *mds, struct obd_uuid *ost_uuid)
526 {
527         int rc;
528         struct obdo oa = { 0 };
529         struct obd_trans_info oti = {0};
530         struct lov_stripe_md  *empty_ea = NULL;
531         ENTRY;
532
533         LASSERT(mds->mds_lov_page_array != NULL);
534
535         /* This create will in fact either create or destroy:  If the OST is
536          * missing objects below this ID, they will be created.  If it finds
537          * objects above this ID, they will be removed. */
538         memset(&oa, 0, sizeof(oa));
539         oa.o_flags = OBD_FL_DELORPHAN;
540         oa.o_seq = mdt_to_obd_objseq(mds->mds_id);
541         oa.o_valid = OBD_MD_FLFLAGS | OBD_MD_FLGROUP;
542         if (ost_uuid != NULL)
543                 oti.oti_ost_uuid = ost_uuid;
544
545         rc = obd_create(mds->mds_lov_exp, &oa, &empty_ea, &oti);
546
547         RETURN(rc);
548 }
549
550 /* for one target */
551 static int mds_lov_set_one_nextid(struct obd_device *obd, __u32 idx, obd_id *id)
552 {
553         struct mds_obd *mds = &obd->u.mds;
554         int rc;
555         struct obd_id_info info;
556         ENTRY;
557
558         LASSERT(!obd->obd_recovering);
559
560         info.idx = idx;
561         info.data = id;
562         rc = obd_set_info_async(mds->mds_lov_exp, sizeof(KEY_NEXT_ID),
563                                 KEY_NEXT_ID, sizeof(info), &info, NULL);
564         if (rc)
565                 CERROR ("%s: mds_lov_set_nextid failed (%d)\n",
566                         obd->obd_name, rc);
567
568         RETURN(rc);
569 }
570
571 /* Update the lov desc for a new size lov. */
572 static int mds_lov_update_desc(struct obd_device *obd, int idx,
573                                struct obd_uuid *uuid, enum obd_notify_event ev)
574 {
575         struct mds_obd *mds = &obd->u.mds;
576         struct lov_desc *ld;
577         __u32 valsize = sizeof(mds->mds_lov_desc);
578         int rc = 0;
579         ENTRY;
580
581         OBD_ALLOC(ld, sizeof(*ld));
582         if (!ld)
583                 RETURN(-ENOMEM);
584
585         rc = obd_get_info(mds->mds_lov_exp, sizeof(KEY_LOVDESC), KEY_LOVDESC,
586                           &valsize, ld, NULL);
587         if (rc)
588                 GOTO(out, rc);
589
590         /* Don't change the mds_lov_desc until the objids size matches the
591            count (paranoia) */
592         mds->mds_lov_desc = *ld;
593         CDEBUG(D_CONFIG, "updated lov_desc, tgt_count: %d - idx %d / uuid %s\n",
594                mds->mds_lov_desc.ld_tgt_count, idx, uuid->uuid);
595
596         cfs_mutex_down(&obd->obd_dev_sem);
597         rc = mds_lov_update_max_ost(mds, idx);
598         cfs_mutex_up(&obd->obd_dev_sem);
599         if (rc != 0)
600                 GOTO(out, rc );
601
602         /* If we added a target we have to reconnect the llogs */
603         /* We only _need_ to do this at first add (idx), or the first time
604            after recovery.  However, it should now be safe to call anytime. */
605         rc = obd_llog_init(obd, &obd->obd_olg, obd, &idx);
606         if (rc)
607                 GOTO(out, rc);
608
609         /*XXX this notifies the MDD until lov handling use old mds code */
610         if (obd->obd_upcall.onu_owner) {
611                  LASSERT(obd->obd_upcall.onu_upcall != NULL);
612                  rc = obd->obd_upcall.onu_upcall(obd, NULL, ev,
613                                                  obd->obd_upcall.onu_owner,
614                                                  &mds->mds_obt.obt_mount_count);
615         }
616 out:
617         OBD_FREE(ld, sizeof(*ld));
618         RETURN(rc);
619 }
620
621 /* Inform MDS about new/updated target */
622 static int mds_lov_update_mds(struct obd_device *obd,
623                               struct obd_device *watched,
624                               __u32 idx, enum obd_notify_event ev)
625 {
626         struct mds_obd *mds = &obd->u.mds;
627         int rc = 0;
628         int page;
629         int off;
630         obd_id *data;
631
632         ENTRY;
633
634         /* Don't let anyone else mess with mds_lov_objids now */
635         rc = mds_lov_update_desc(obd, idx, &watched->u.cli.cl_target_uuid, ev);
636         if (rc)
637                 GOTO(out, rc);
638
639         CDEBUG(D_CONFIG, "idx=%d, recov=%d/%d, cnt=%d\n",
640                idx, obd->obd_recovering, obd->obd_async_recov,
641                mds->mds_lov_desc.ld_tgt_count);
642
643         /* idx is set as data from lov_notify. */
644         if (obd->obd_recovering)
645                 GOTO(out, rc);
646
647         if (idx >= mds->mds_lov_desc.ld_tgt_count) {
648                 CERROR("index %d > count %d!\n", idx,
649                        mds->mds_lov_desc.ld_tgt_count);
650                 GOTO(out, rc = -EINVAL);
651         }
652
653         rc = mds_lov_get_objid(obd, idx);
654         if (rc)
655                 GOTO(out, rc);
656
657         page = idx / OBJID_PER_PAGE();
658         off = idx % OBJID_PER_PAGE();
659         data = mds->mds_lov_page_array[page];
660
661         /* We have read this lastid from disk; tell the osc.
662            Don't call this during recovery. */
663         rc = mds_lov_set_one_nextid(obd, idx, &data[off]);
664         if (rc) {
665                 CERROR("Failed to set next id, idx=%d rc=%d\n", idx,rc);
666                 /* Don't abort the rest of the sync */
667                 rc = 0;
668         } else {
669                 CDEBUG(D_CONFIG, "last object "LPU64" from OST %d rc=%d\n",
670                         data[off], idx, rc);
671         }
672 out:
673         RETURN(rc);
674 }
675
676 /* update the LOV-OSC knowledge of the last used object id's */
677 int mds_lov_connect(struct obd_device *obd, char * lov_name)
678 {
679         struct mds_obd *mds = &obd->u.mds;
680         struct obd_connect_data *data;
681         int rc;
682         ENTRY;
683
684         if (IS_ERR(mds->mds_lov_obd))
685                 RETURN(PTR_ERR(mds->mds_lov_obd));
686
687         if (mds->mds_lov_obd)
688                 RETURN(0);
689
690         mds->mds_lov_obd = class_name2obd(lov_name);
691         if (!mds->mds_lov_obd) {
692                 CERROR("MDS cannot locate LOV %s\n", lov_name);
693                 mds->mds_lov_obd = ERR_PTR(-ENOTCONN);
694                 RETURN(-ENOTCONN);
695         }
696
697         cfs_mutex_down(&obd->obd_dev_sem);
698         rc = mds_lov_read_objids(obd);
699         cfs_mutex_up(&obd->obd_dev_sem);
700         if (rc) {
701                 CERROR("cannot read %s: rc = %d\n", "lov_objids", rc);
702                 GOTO(err_exit, rc);
703         }
704
705         rc = obd_register_observer(mds->mds_lov_obd, obd);
706         if (rc) {
707                 CERROR("MDS cannot register as observer of LOV %s (%d)\n",
708                        lov_name, rc);
709                 GOTO(err_exit, rc);
710         }
711
712         /* try init too early */
713         rc = obd_llog_init(obd, &obd->obd_olg, obd, NULL);
714         if (rc)
715                 GOTO(err_exit, rc);
716
717         mds->mds_lov_obd->u.lov.lov_sp_me = LUSTRE_SP_MDT;
718
719         OBD_ALLOC(data, sizeof(*data));
720         if (data == NULL)
721                 GOTO(err_exit, rc = -ENOMEM);
722
723         data->ocd_connect_flags = OBD_CONNECT_VERSION   | OBD_CONNECT_INDEX   |
724                                   OBD_CONNECT_REQPORTAL | OBD_CONNECT_QUOTA64 |
725                                   OBD_CONNECT_OSS_CAPA  | OBD_CONNECT_FID     |
726                                   OBD_CONNECT_BRW_SIZE  | OBD_CONNECT_CKSUM   |
727                                   OBD_CONNECT_CHANGE_QS | OBD_CONNECT_AT      |
728                                   OBD_CONNECT_MDS | OBD_CONNECT_SKIP_ORPHAN   |
729                                   OBD_CONNECT_SOM | OBD_CONNECT_FULL20;
730 #ifdef HAVE_LRU_RESIZE_SUPPORT
731         data->ocd_connect_flags |= OBD_CONNECT_LRU_RESIZE;
732 #endif
733         data->ocd_version = LUSTRE_VERSION_CODE;
734         data->ocd_group = mdt_to_obd_objseq(mds->mds_id);
735         /* send max bytes per rpc */
736         data->ocd_brw_size = PTLRPC_MAX_BRW_PAGES << CFS_PAGE_SHIFT;
737         /* send the list of supported checksum types */
738         data->ocd_cksum_types = OBD_CKSUM_ALL;
739         /* NB: lov_connect() needs to fill in .ocd_index for each OST */
740         rc = obd_connect(NULL, &mds->mds_lov_exp, mds->mds_lov_obd, &obd->obd_uuid, data, NULL);
741         OBD_FREE(data, sizeof(*data));
742         if (rc) {
743                 CERROR("MDS cannot connect to LOV %s (%d)\n", lov_name, rc);
744                 mds->mds_lov_obd = ERR_PTR(rc);
745                 RETURN(rc);
746         }
747
748         /* I want to see a callback happen when the OBD moves to a
749          * "For General Use" state, and that's when we'll call
750          * set_nextid().  The class driver can help us here, because
751          * it can use the obd_recovering flag to determine when the
752          * the OBD is full available. */
753         /* MDD device will care about that
754         if (!obd->obd_recovering)
755                 rc = mds_postrecov(obd);
756          */
757         RETURN(rc);
758
759 err_exit:
760         mds->mds_lov_exp = NULL;
761         mds->mds_lov_obd = ERR_PTR(rc);
762         RETURN(rc);
763 }
764
765 int mds_lov_disconnect(struct obd_device *obd)
766 {
767         struct mds_obd *mds = &obd->u.mds;
768         int rc = 0;
769         ENTRY;
770
771         if (!IS_ERR(mds->mds_lov_obd) && mds->mds_lov_exp != NULL) {
772                 obd_register_observer(mds->mds_lov_obd, NULL);
773
774                 /* The actual disconnect of the mds_lov will be called from
775                  * class_disconnect_exports from mds_lov_clean. So we have to
776                  * ensure that class_cleanup doesn't fail due to the extra ref
777                  * we're holding now. The mechanism to do that already exists -
778                  * the obd_force flag. We'll drop the final ref to the
779                  * mds_lov_exp in mds_cleanup. */
780                 mds->mds_lov_obd->obd_force = 1;
781         }
782
783         RETURN(rc);
784 }
785
786 struct mds_lov_sync_info {
787         struct obd_device    *mlsi_obd;     /* the lov device to sync */
788         struct obd_device    *mlsi_watched; /* target osc */
789         __u32                 mlsi_index;   /* index of target */
790         enum obd_notify_event mlsi_ev;      /* event type */
791 };
792
793 static int mds_propagate_capa_keys(struct mds_obd *mds, struct obd_uuid *uuid)
794 {
795         struct mds_capa_info    info = { .uuid = uuid };
796         struct lustre_capa_key *key;
797         int i, rc = 0;
798
799         ENTRY;
800
801         if (!mds->mds_capa_keys)
802                 RETURN(0);
803
804         OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_SYNC_CAPA_SL, 5);
805         for (i = 0; i < 2; i++) {
806                 key = &mds->mds_capa_keys[i];
807                 DEBUG_CAPA_KEY(D_SEC, key, "propagate");
808
809                 info.capa = key;
810                 rc = obd_set_info_async(mds->mds_lov_exp, sizeof(KEY_CAPA_KEY),
811                                         KEY_CAPA_KEY, sizeof(info), &info, NULL);
812                 if (rc) {
813                         DEBUG_CAPA_KEY(D_ERROR, key,
814                                        "propagate failed (rc = %d) for", rc);
815                         RETURN(rc);
816                 }
817         }
818
819         RETURN(0);
820 }
821
822 /* We only sync one osc at a time, so that we don't have to hold
823    any kind of lock on the whole mds_lov_desc, which may change
824    (grow) as a result of mds_lov_add_ost.  This also avoids any
825    kind of mismatch between the lov_desc and the mds_lov_desc,
826    which are not in lock-step during lov_add_obd */
827 static int __mds_lov_synchronize(void *data)
828 {
829         struct mds_lov_sync_info *mlsi = data;
830         struct obd_device *obd = mlsi->mlsi_obd;
831         struct obd_device *watched = mlsi->mlsi_watched;
832         struct mds_obd *mds = &obd->u.mds;
833         struct obd_uuid *uuid;
834         __u32  idx = mlsi->mlsi_index;
835         enum obd_notify_event ev = mlsi->mlsi_ev;
836         struct mds_group_info mgi;
837         struct llog_ctxt *ctxt;
838         int rc = 0;
839         ENTRY;
840
841         OBD_FREE_PTR(mlsi);
842
843         LASSERT(obd);
844         LASSERT(watched);
845         uuid = &watched->u.cli.cl_target_uuid;
846         LASSERT(uuid);
847
848         cfs_down_read(&mds->mds_notify_lock);
849         if (obd->obd_stopping || obd->obd_fail)
850                 GOTO(out, rc = -ENODEV);
851
852         OBD_RACE(OBD_FAIL_MDS_LOV_SYNC_RACE);
853         rc = mds_lov_update_mds(obd, watched, idx, ev);
854         if (rc != 0) {
855                 CERROR("%s failed at update_mds: %d\n", obd_uuid2str(uuid), rc);
856                 GOTO(out, rc);
857         }
858         mgi.group = mdt_to_obd_objseq(mds->mds_id);
859         mgi.uuid = uuid;
860
861         rc = obd_set_info_async(mds->mds_lov_exp, sizeof(KEY_MDS_CONN),
862                                 KEY_MDS_CONN, sizeof(mgi), &mgi, NULL);
863         if (rc != 0)
864                 GOTO(out, rc);
865         /* propagate capability keys */
866         rc = mds_propagate_capa_keys(mds, uuid);
867         if (rc)
868                 GOTO(out, rc);
869
870         ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
871         if (!ctxt)
872                 GOTO(out, rc = -ENODEV);
873
874         OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_LLOG_SYNC_TIMEOUT, 60);
875         rc = llog_connect(ctxt, NULL, NULL, uuid);
876         llog_ctxt_put(ctxt);
877         if (rc != 0) {
878                 CERROR("%s failed at llog_origin_connect: %d\n",
879                        obd_uuid2str(uuid), rc);
880                 GOTO(out, rc);
881         }
882
883         LCONSOLE_INFO("MDS %s: %s now active, resetting orphans\n",
884               obd->obd_name, obd_uuid2str(uuid));
885         rc = mds_lov_clear_orphans(mds, uuid);
886         if (rc != 0) {
887                 CERROR("%s failed at mds_lov_clear_orphans: %d\n",
888                        obd_uuid2str(uuid), rc);
889                 GOTO(out, rc);
890         }
891
892 #ifdef HAVE_QUOTA_SUPPORT
893         if (obd->obd_upcall.onu_owner) { 
894                 /*
895                  * This is a hack for mds_notify->mdd_notify. When the mds obd
896                  * in mdd is removed, This hack should be removed.
897                  */
898                 LASSERT(obd->obd_upcall.onu_upcall != NULL);
899                 rc = obd->obd_upcall.onu_upcall(obd, NULL, OBD_NOTIFY_QUOTA,
900                                                 obd->obd_upcall.onu_owner,NULL);
901         }
902 #endif
903         EXIT;
904 out:
905         cfs_up_read(&mds->mds_notify_lock);
906         if (rc) {
907                 /* Deactivate it for safety */
908                 CERROR("%s sync failed %d, deactivating\n", obd_uuid2str(uuid),
909                        rc);
910                 if (!obd->obd_stopping && mds->mds_lov_obd &&
911                     !mds->mds_lov_obd->obd_stopping && !watched->obd_stopping)
912                         obd_notify(mds->mds_lov_obd, watched,
913                                    OBD_NOTIFY_INACTIVE, NULL);
914         }
915
916         class_decref(obd, "mds_lov_synchronize", obd);
917         return rc;
918 }
919
920 int mds_lov_synchronize(void *data)
921 {
922         struct mds_lov_sync_info *mlsi = data;
923         char name[20];
924
925         snprintf(name, sizeof(name), "ll_sync_%02u", mlsi->mlsi_index);
926         cfs_daemonize_ctxt(name);
927
928         RETURN(__mds_lov_synchronize(data));
929 }
930
931 int mds_lov_start_synchronize(struct obd_device *obd,
932                               struct obd_device *watched,
933                               void *data, enum obd_notify_event ev)
934 {
935         struct mds_lov_sync_info *mlsi;
936         int rc;
937         struct obd_uuid *uuid;
938         ENTRY;
939
940         LASSERT(watched);
941         uuid = &watched->u.cli.cl_target_uuid;
942
943         OBD_ALLOC(mlsi, sizeof(*mlsi));
944         if (mlsi == NULL)
945                 RETURN(-ENOMEM);
946
947         LASSERT(data);
948         mlsi->mlsi_obd = obd;
949         mlsi->mlsi_watched = watched;
950         mlsi->mlsi_index = *(__u32 *)data;
951         mlsi->mlsi_ev = ev;
952
953         /* Although class_export_get(obd->obd_self_export) would lock
954            the MDS in place, since it's only a self-export
955            it doesn't lock the LOV in place.  The LOV can be disconnected
956            during MDS precleanup, leaving nothing for __mds_lov_synchronize.
957            Simply taking an export ref on the LOV doesn't help, because it's
958            still disconnected. Taking an obd reference insures that we don't
959            disconnect the LOV.  This of course means a cleanup won't
960            finish for as long as the sync is blocking. */
961         class_incref(obd, "mds_lov_synchronize", obd);
962
963         if (ev != OBD_NOTIFY_SYNC) {
964                 /* Synchronize in the background */
965                 rc = cfs_kernel_thread(mds_lov_synchronize, mlsi,
966                                        CLONE_VM | CLONE_FILES);
967                 if (rc < 0) {
968                         CERROR("%s: error starting mds_lov_synchronize: %d\n",
969                                obd->obd_name, rc);
970                         class_decref(obd, "mds_lov_synchronize", obd);
971                 } else {
972                         CDEBUG(D_HA, "%s: mds_lov_synchronize idx=%d "
973                                "thread=%d\n", obd->obd_name,
974                                mlsi->mlsi_index, rc);
975                         rc = 0;
976                 }
977         } else {
978                 rc = __mds_lov_synchronize((void *)mlsi);
979         }
980
981         RETURN(rc);
982 }
983
984 int mds_notify(struct obd_device *obd, struct obd_device *watched,
985                enum obd_notify_event ev, void *data)
986 {
987         int rc = 0;
988         ENTRY;
989
990         CDEBUG(D_CONFIG, "notify %s ev=%d\n", watched->obd_name, ev);
991
992         switch (ev) {
993         /* We only handle these: */
994         case OBD_NOTIFY_ACTIVE:
995                 /* lov want one or more _active_ targets for work */
996                 /* activate event should be pass lov idx as argument */
997         case OBD_NOTIFY_SYNC:
998         case OBD_NOTIFY_SYNC_NONBLOCK:
999                 /* sync event should be pass lov idx as argument */
1000                 break;
1001         default:
1002                 RETURN(0);
1003         }
1004
1005         if (strcmp(watched->obd_type->typ_name, LUSTRE_OSC_NAME) != 0) {
1006                 CERROR("unexpected notification of %s %s!\n",
1007                        watched->obd_type->typ_name, watched->obd_name);
1008                 RETURN(-EINVAL);
1009         }
1010
1011         if (obd->obd_recovering) {
1012                 CWARN("MDS %s: in recovery, not resetting orphans on %s\n",
1013                       obd->obd_name,
1014                       obd_uuid2str(&watched->u.cli.cl_target_uuid));
1015                 /* We still have to fix the lov descriptor for ost's added
1016                    after the mdt in the config log.  They didn't make it into
1017                    mds_lov_connect. */
1018                 rc = mds_lov_update_desc(obd, *(__u32 *)data,
1019                                          &watched->u.cli.cl_target_uuid, ev);
1020         } else {
1021                 rc = mds_lov_start_synchronize(obd, watched, data, ev);
1022         }
1023         RETURN(rc);
1024 }