Whamcloud - gitweb
dcf581088c29e5799c8f2cac4b12149993228af2
[fs/lustre-release.git] / lustre / lov / lov_request.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copyright (C) 2002, 2003 Cluster File Systems, Inc.
5  *
6  *   This file is part of Lustre, http://www.lustre.org.
7  *
8  *   Lustre is free software; you can redistribute it and/or
9  *   modify it under the terms of version 2 of the GNU General Public
10  *   License as published by the Free Software Foundation.
11  *
12  *   Lustre is distributed in the hope that it will be useful,
13  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
14  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15  *   GNU General Public License for more details.
16  *
17  *   You should have received a copy of the GNU General Public License
18  *   along with Lustre; if not, write to the Free Software
19  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
20  */
21
22 #ifndef EXPORT_SYMTAB
23 # define EXPORT_SYMTAB
24 #endif
25 #define DEBUG_SUBSYSTEM S_LOV
26
27 #ifdef __KERNEL__
28 #include <asm/div64.h>
29 #else
30 #include <liblustre.h>
31 #endif
32
33 #include <linux/obd_class.h>
34 #include <linux/obd_lov.h>
35 #include <linux/lustre_idl.h>
36
37 #include "lov_internal.h"
38
39 static void lov_init_set(struct lov_request_set *set)
40 {
41         set->set_count = 0;
42         set->set_completes = 0;
43         set->set_success = 0;
44         INIT_LIST_HEAD(&set->set_list);
45         atomic_set(&set->set_refcount, 1);
46 }
47
48 static void lov_finish_set(struct lov_request_set *set)
49 {
50         struct list_head *pos, *n;
51         ENTRY;
52
53         LASSERT(set);
54         list_for_each_safe(pos, n, &set->set_list) {
55                 struct lov_request *req = list_entry(pos, struct lov_request,
56                                                      rq_link);
57                 list_del_init(&req->rq_link);
58
59                 if (req->rq_oa)
60                         obdo_free(req->rq_oa);
61                 if (req->rq_md)
62                         OBD_FREE(req->rq_md, req->rq_buflen);
63                 OBD_FREE(req, sizeof(*req));
64         }
65
66         if (set->set_pga) {
67                 int len = set->set_oabufs * sizeof(*set->set_pga);
68                 OBD_FREE(set->set_pga, len);
69         }
70         if (set->set_lockh)
71                 lov_llh_put(set->set_lockh);
72
73         OBD_FREE(set, sizeof(*set));
74         EXIT;
75 }
76
77 static void lov_update_set(struct lov_request_set *set,
78                            struct lov_request *req, int rc)
79 {
80         req->rq_complete = 1;
81         req->rq_rc = rc;
82
83         set->set_completes++;
84         if (rc == 0)
85                 set->set_success++;
86 }
87
88 int lov_update_common_set(struct lov_request_set *set,
89                           struct lov_request *req, int rc)
90 {
91         struct lov_obd *lov = &set->set_exp->exp_obd->u.lov;
92         ENTRY;
93
94         lov_update_set(set, req, rc);
95
96         /* grace error on inactive ost */
97         if (rc && !lov->tgts[req->rq_idx].active)
98                 rc = 0;
99
100         /* FIXME in raid1 regime, should return 0 */
101         RETURN(rc);
102 }
103
104 void lov_set_add_req(struct lov_request *req, struct lov_request_set *set)
105 {
106         list_add_tail(&req->rq_link, &set->set_list);
107         set->set_count++;
108 }
109
110 int lov_update_enqueue_set(struct lov_request_set *set,
111                            struct lov_request *req, int rc, int flags)
112 {
113         struct lustre_handle *lov_lockhp;
114         struct lov_oinfo *loi;
115         ENTRY;
116
117         lov_lockhp = set->set_lockh->llh_handles + req->rq_stripe;
118         loi = &set->set_md->lsm_oinfo[req->rq_stripe];
119
120         /* XXX FIXME: This unpleasantness doesn't belong here at *all*.
121          * It belongs in the OSC, except that the OSC doesn't have
122          * access to the real LOI -- it gets a copy, that we created
123          * above, and that copy can be arbitrarily out of date.
124          *
125          * The LOV API is due for a serious rewriting anyways, and this
126          * can be addressed then. */
127         if (rc == ELDLM_OK) {
128                 struct ldlm_lock *lock = ldlm_handle2lock(lov_lockhp);
129                 __u64 tmp = req->rq_md->lsm_oinfo->loi_rss;
130
131                 LASSERT(lock != NULL);
132                 loi->loi_rss = tmp;
133                 loi->loi_mtime = req->rq_md->lsm_oinfo->loi_mtime;
134                 loi->loi_blocks = req->rq_md->lsm_oinfo->loi_blocks;
135                 /* Extend KMS up to the end of this lock and no further
136                  * A lock on [x,y] means a KMS of up to y + 1 bytes! */
137                 if (tmp > lock->l_policy_data.l_extent.end)
138                         tmp = lock->l_policy_data.l_extent.end + 1;
139                 if (tmp >= loi->loi_kms) {
140                         CDEBUG(D_INODE, "lock acquired, setting rss="
141                                LPU64", kms="LPU64"\n", loi->loi_rss, tmp);
142                         loi->loi_kms = tmp;
143                         loi->loi_kms_valid = 1;
144                 } else {
145                         CDEBUG(D_INODE, "lock acquired, setting rss="
146                                LPU64"; leaving kms="LPU64", end="LPU64
147                                "\n", loi->loi_rss, loi->loi_kms,
148                                lock->l_policy_data.l_extent.end);
149                 }
150                 ldlm_lock_allow_match(lock);
151                 LDLM_LOCK_PUT(lock);
152         } else if (rc == ELDLM_LOCK_ABORTED && flags & LDLM_FL_HAS_INTENT) {
153                 memset(lov_lockhp, 0, sizeof(*lov_lockhp));
154                 loi->loi_rss = req->rq_md->lsm_oinfo->loi_rss;
155                 loi->loi_mtime = req->rq_md->lsm_oinfo->loi_mtime;
156                 loi->loi_blocks = req->rq_md->lsm_oinfo->loi_blocks;
157                 CDEBUG(D_INODE, "glimpsed, setting rss="LPU64"; leaving"
158                        " kms="LPU64"\n", loi->loi_rss, loi->loi_kms);
159                 rc = ELDLM_OK;
160         } else {
161                 struct obd_export *exp = set->set_exp;
162                 struct lov_obd *lov = &exp->exp_obd->u.lov;
163
164                 memset(lov_lockhp, 0, sizeof(*lov_lockhp));
165                 if (lov->tgts[req->rq_idx].active) {
166                         CERROR("error: enqueue objid "LPX64" subobj "
167                                 LPX64" on OST idx %d: rc = %d\n",
168                                 set->set_md->lsm_object_id, loi->loi_id,
169                                 loi->loi_ost_idx, rc);
170                 } else {
171                         rc = ELDLM_OK;
172                 }
173         }
174         lov_update_set(set, req, rc);
175         RETURN(rc);
176 }
177
178 static int enqueue_done(struct lov_request_set *set, __u32 mode, int flags)
179 {
180         struct list_head *pos;
181         struct lov_request *req;
182         struct lustre_handle *lov_lockhp = NULL;
183         struct lov_obd *lov = &set->set_exp->exp_obd->u.lov;
184         int rc = 0;
185         ENTRY;
186
187         LASSERT(set->set_completes);
188         /* enqueue/match success, just return */
189         if (set->set_completes == set->set_success) {
190                 if (flags & LDLM_FL_TEST_LOCK)
191                         lov_llh_put(set->set_lockh);
192                 RETURN(0);
193         }
194
195         /* cancel enqueued/matched locks */
196         list_for_each (pos, &set->set_list) {
197                 req = list_entry(pos, struct lov_request, rq_link);
198
199                 if (!req->rq_complete || req->rq_rc)
200                         continue;
201                 if (flags & LDLM_FL_TEST_LOCK)
202                         continue;
203
204                 lov_lockhp = set->set_lockh->llh_handles + req->rq_stripe;
205                 LASSERT(lov_lockhp);
206                 if (lov_lockhp->cookie == 0)
207                         continue;
208
209                 rc = obd_cancel(lov->tgts[req->rq_idx].ltd_exp, req->rq_md,
210                                 mode, lov_lockhp);
211                 if (rc && lov->tgts[req->rq_idx].active)
212                         CERROR("cancelling obdjid "LPX64" on OST "
213                                "idx %d error: rc = %d\n",
214                                req->rq_md->lsm_object_id, req->rq_idx, rc);
215         }
216         lov_llh_put(set->set_lockh);
217         RETURN(rc);
218 }
219
220 int lov_fini_enqueue_set(struct lov_request_set *set, __u32 mode)
221 {
222         int rc = 0;
223         ENTRY;
224
225         LASSERT(set->set_exp);
226         if (set == NULL)
227                 RETURN(0);
228         if (set->set_completes)
229                 rc = enqueue_done(set, mode, 0);
230         else
231                 lov_llh_put(set->set_lockh);
232
233         if (atomic_dec_and_test(&set->set_refcount))
234                 lov_finish_set(set);
235
236         RETURN(rc);
237 }
238
239 int lov_prep_enqueue_set(struct obd_export *exp, struct lov_stripe_md *lsm,
240                          ldlm_policy_data_t *policy, __u32 mode,
241                          struct lustre_handle *lockh,
242                          struct lov_request_set **reqset)
243 {
244         struct lov_obd *lov = &exp->exp_obd->u.lov;
245         struct lov_request_set *set;
246         int i, rc = 0;
247         struct lov_oinfo *loi;
248         ENTRY;
249
250         OBD_ALLOC(set, sizeof(*set));
251         if (set == NULL)
252                 RETURN(-ENOMEM);
253         lov_init_set(set);
254
255         set->set_exp = exp;
256         set->set_md = lsm;
257         set->set_lockh = lov_llh_new(lsm);
258         if (set->set_lockh == NULL)
259                 GOTO(out_set, rc = -ENOMEM);
260         lockh->cookie = set->set_lockh->llh_handle.h_cookie;
261
262         loi = lsm->lsm_oinfo;
263         for (i = 0; i < lsm->lsm_stripe_count; i++, loi++) {
264                 struct lov_request *req;
265                 obd_off start, end;
266
267                 if (!lov_stripe_intersects(lsm, i, policy->l_extent.start,
268                                            policy->l_extent.end, &start, &end))
269                         continue;
270
271                 if (lov->tgts[loi->loi_ost_idx].active == 0) {
272                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
273                         continue;
274                 }
275
276                 OBD_ALLOC(req, sizeof(*req));
277                 if (req == NULL)
278                         GOTO(out_set, rc = -ENOMEM);
279
280                 req->rq_buflen = sizeof(*req->rq_md) +
281                         sizeof(struct lov_oinfo);
282                 OBD_ALLOC(req->rq_md, req->rq_buflen);
283                 if (req->rq_md == NULL)
284                         GOTO(out_set, rc = -ENOMEM);
285
286                 req->rq_extent.start = start;
287                 req->rq_extent.end = end;
288
289                 req->rq_idx = loi->loi_ost_idx;
290                 req->rq_stripe = i;
291
292                 /* XXX LOV STACKING: submd should be from the subobj */
293                 req->rq_md->lsm_object_id = loi->loi_id;
294                 req->rq_md->lsm_object_gr = lsm->lsm_object_gr;
295                 req->rq_md->lsm_stripe_count = 0;
296                 req->rq_md->lsm_oinfo->loi_kms_valid = loi->loi_kms_valid;
297                 req->rq_md->lsm_oinfo->loi_rss = loi->loi_rss;
298                 req->rq_md->lsm_oinfo->loi_kms = loi->loi_kms;
299                 req->rq_md->lsm_oinfo->loi_blocks = loi->loi_blocks;
300                 loi->loi_mtime = req->rq_md->lsm_oinfo->loi_mtime;
301
302                 lov_set_add_req(req, set);
303         }
304         if (!set->set_count)
305                 GOTO(out_set, rc = -EIO);
306         *reqset = set;
307         RETURN(0);
308 out_set:
309         lov_fini_enqueue_set(set, mode);
310         RETURN(rc);
311 }
312
313 int lov_update_match_set(struct lov_request_set *set, struct lov_request *req,
314                          int rc)
315 {
316         ENTRY;
317         lov_update_set(set, req, !rc);
318         RETURN(rc);
319 }
320
321 int lov_fini_match_set(struct lov_request_set *set, __u32 mode, int flags)
322 {
323         int rc = 0;
324         ENTRY;
325
326         LASSERT(set->set_exp);
327         if (set == NULL)
328                 RETURN(0);
329         if (set->set_completes)
330                 rc = enqueue_done(set, mode, flags);
331         else
332                 lov_llh_put(set->set_lockh);
333
334         if (atomic_dec_and_test(&set->set_refcount))
335                 lov_finish_set(set);
336
337         RETURN(rc);
338 }
339
340 int lov_prep_match_set(struct obd_export *exp, struct lov_stripe_md *lsm,
341                        ldlm_policy_data_t *policy, __u32 mode,
342                        struct lustre_handle *lockh,
343                        struct lov_request_set **reqset)
344 {
345         struct lov_obd *lov = &exp->exp_obd->u.lov;
346         struct lov_request_set *set;
347         int i, rc = 0;
348         struct lov_oinfo *loi;
349         ENTRY;
350
351         OBD_ALLOC(set, sizeof(*set));
352         if (set == NULL)
353                 RETURN(-ENOMEM);
354         lov_init_set(set);
355
356         set->set_exp = exp;
357         set->set_md = lsm;
358         set->set_lockh = lov_llh_new(lsm);
359         if (set->set_lockh == NULL)
360                 GOTO(out_set, rc = -ENOMEM);
361         lockh->cookie = set->set_lockh->llh_handle.h_cookie;
362
363         loi = lsm->lsm_oinfo;
364         for (i = 0; i < lsm->lsm_stripe_count; i++, loi++) {
365                 struct lov_request *req;
366                 obd_off start, end;
367
368                 if (!lov_stripe_intersects(lsm, i, policy->l_extent.start,
369                                            policy->l_extent.end, &start, &end))
370                         continue;
371
372                 /* FIXME raid1 should grace this error */
373                 if (lov->tgts[loi->loi_ost_idx].active == 0) {
374                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
375                         GOTO(out_set, rc = -EIO);
376                 }
377
378                 OBD_ALLOC(req, sizeof(*req));
379                 if (req == NULL)
380                         GOTO(out_set, rc = -ENOMEM);
381
382                 req->rq_buflen = sizeof(*req->rq_md);
383                 OBD_ALLOC(req->rq_md, req->rq_buflen);
384                 if (req->rq_md == NULL)
385                         GOTO(out_set, rc = -ENOMEM);
386
387                 req->rq_extent.start = start;
388                 req->rq_extent.end = end;
389
390                 req->rq_idx = loi->loi_ost_idx;
391                 req->rq_stripe = i;
392
393                 /* XXX LOV STACKING: submd should be from the subobj */
394                 req->rq_md->lsm_object_id = loi->loi_id;
395                 req->rq_md->lsm_object_gr = lsm->lsm_object_gr;
396                 req->rq_md->lsm_stripe_count = 0;
397                 lov_set_add_req(req, set);
398         }
399         if (!set->set_count)
400                 GOTO(out_set, rc = -EIO);
401         *reqset = set;
402         RETURN(rc);
403 out_set:
404         lov_fini_match_set(set, mode, 0);
405         RETURN(rc);
406 }
407
408 int lov_fini_cancel_set(struct lov_request_set *set)
409 {
410         int rc = 0;
411         ENTRY;
412
413         LASSERT(set->set_exp);
414         if (set == NULL)
415                 RETURN(0);
416
417         if (set->set_lockh)
418                 lov_llh_put(set->set_lockh);
419
420         if (atomic_dec_and_test(&set->set_refcount))
421                 lov_finish_set(set);
422
423         RETURN(rc);
424 }
425
426 int lov_prep_cancel_set(struct obd_export *exp, struct lov_stripe_md *lsm,
427                         __u32 mode, struct lustre_handle *lockh,
428                         struct lov_request_set **reqset)
429 {
430         struct lov_request_set *set;
431         int i, rc = 0;
432         struct lov_oinfo *loi;
433         ENTRY;
434
435         OBD_ALLOC(set, sizeof(*set));
436         if (set == NULL)
437                 RETURN(-ENOMEM);
438         lov_init_set(set);
439
440         set->set_exp = exp;
441         set->set_md = lsm;
442         set->set_lockh = lov_handle2llh(lockh);
443         if (set->set_lockh == NULL) {
444                 CERROR("LOV: invalid lov lock handle %p\n", lockh);
445                 GOTO(out_set, rc = -EINVAL);
446         }
447         lockh->cookie = set->set_lockh->llh_handle.h_cookie;
448
449         loi = lsm->lsm_oinfo;
450         for (i = 0; i < lsm->lsm_stripe_count; i++, loi++) {
451                 struct lov_request *req;
452                 struct lustre_handle *lov_lockhp;
453
454                 lov_lockhp = set->set_lockh->llh_handles + i;
455                 if (lov_lockhp->cookie == 0) {
456                         CDEBUG(D_HA, "lov idx %d subobj "LPX64" no lock?\n",
457                                loi->loi_ost_idx, loi->loi_id);
458                         continue;
459                 }
460
461                 OBD_ALLOC(req, sizeof(*req));
462                 if (req == NULL)
463                         GOTO(out_set, rc = -ENOMEM);
464
465                 req->rq_buflen = sizeof(*req->rq_md);
466                 OBD_ALLOC(req->rq_md, req->rq_buflen);
467                 if (req->rq_md == NULL)
468                         GOTO(out_set, rc = -ENOMEM);
469
470                 req->rq_idx = loi->loi_ost_idx;
471                 req->rq_stripe = i;
472
473                 /* XXX LOV STACKING: submd should be from the subobj */
474                 req->rq_md->lsm_object_id = loi->loi_id;
475                 req->rq_md->lsm_object_gr = lsm->lsm_object_gr;
476                 req->rq_md->lsm_stripe_count = 0;
477                 lov_set_add_req(req, set);
478         }
479         if (!set->set_count)
480                 GOTO(out_set, rc = -EIO);
481         *reqset = set;
482         RETURN(rc);
483 out_set:
484         lov_fini_cancel_set(set);
485         RETURN(rc);
486 }
487
488 static int create_done(struct obd_export *exp, struct lov_request_set *set,
489                        struct lov_stripe_md **ea)
490 {
491         struct lov_obd *lov = &exp->exp_obd->u.lov;
492         struct obd_trans_info *oti = set->set_oti;
493         struct obdo *src_oa = set->set_oa;
494         struct list_head *pos;
495         struct lov_request *req;
496         struct obdo *ret_oa = NULL;
497         int attrset = 0, rc = 0;
498         ENTRY;
499
500         LASSERT(set->set_completes);
501
502         if (!set->set_success)
503                 GOTO(cleanup, rc = -EIO);
504
505         if (*ea == NULL && set->set_count != set->set_success) {
506                 set->set_count = set->set_success;
507                 qos_shrink_lsm(set);
508         }
509
510         ret_oa = obdo_alloc();
511         if (ret_oa == NULL)
512                 GOTO(cleanup, rc = -ENOMEM);
513
514         list_for_each (pos, &set->set_list) {
515                 req = list_entry(pos, struct lov_request, rq_link);
516                 if (!req->rq_complete || req->rq_rc)
517                         continue;
518                 lov_merge_attrs(ret_oa, req->rq_oa, req->rq_oa->o_valid,
519                                 set->set_md, req->rq_stripe, &attrset);
520         }
521         if (src_oa->o_valid & OBD_MD_FLSIZE &&
522             ret_oa->o_size != src_oa->o_size) {
523                 CERROR("original size "LPU64" isn't new object size "LPU64"\n",
524                        src_oa->o_size, ret_oa->o_size);
525                 LBUG();
526         }
527         ret_oa->o_id = src_oa->o_id;
528         ret_oa->o_gr = src_oa->o_gr;
529         ret_oa->o_valid |= OBD_MD_FLGROUP;
530         memcpy(src_oa, ret_oa, sizeof(*src_oa));
531         obdo_free(ret_oa);
532
533         *ea = set->set_md;
534         GOTO(done, rc = 0);
535
536         EXIT;
537 cleanup:
538         list_for_each (pos, &set->set_list) {
539                 struct obd_export *sub_exp;
540                 int err = 0;
541                 req = list_entry(pos, struct lov_request, rq_link);
542
543                 if (!req->rq_complete || req->rq_rc)
544                         continue;
545
546                 sub_exp = lov->tgts[req->rq_idx].ltd_exp,
547                 err = obd_destroy(sub_exp, req->rq_oa, NULL, oti);
548                 if (err)
549                         CERROR("Failed to uncreate objid "LPX64" subobj "
550                                LPX64" on OST idx %d: rc = %d\n",
551                                set->set_oa->o_id, req->rq_oa->o_id,
552                                req->rq_idx, rc);
553         }
554         if (*ea == NULL)
555                 obd_free_memmd(exp, &set->set_md);
556 done:
557         if (oti && set->set_cookies) {
558                 oti->oti_logcookies = set->set_cookies;
559                 if (!set->set_cookie_sent) {
560                         oti_free_cookies(oti);
561                         src_oa->o_valid &= ~OBD_MD_FLCOOKIE;
562                 } else {
563                         src_oa->o_valid |= OBD_MD_FLCOOKIE;
564                 }
565         }
566         return rc;
567 }
568
569 int lov_fini_create_set(struct lov_request_set *set, struct lov_stripe_md **ea)
570 {
571         int rc = 0;
572         ENTRY;
573
574         LASSERT(set->set_exp);
575         if (set == NULL)
576                 RETURN(0);
577         if (set->set_completes) {
578                 rc = create_done(set->set_exp, set, ea);
579                 /* FIXME update qos data here */
580         }
581
582         if (atomic_dec_and_test(&set->set_refcount))
583                 lov_finish_set(set);
584
585         RETURN(rc);
586 }
587
588 int lov_update_create_set(struct lov_request_set *set,
589                           struct lov_request *req, int rc)
590 {
591         struct lov_obd *lov = &set->set_exp->exp_obd->u.lov;
592         struct obd_trans_info *oti = set->set_oti;
593         struct lov_stripe_md *lsm = set->set_md;
594         struct lov_oinfo *loi;
595         ENTRY;
596
597         req->rq_stripe = set->set_success;
598         loi = &lsm->lsm_oinfo[req->rq_stripe];
599
600         if (rc && lov->tgts[req->rq_idx].active) {
601                 CERROR("error creating objid "LPX64" sub-object"
602                        " on OST idx %d/%d: rc = %d\n",
603                        set->set_oa->o_id, req->rq_idx,
604                        lsm->lsm_stripe_count, rc);
605                 if (rc > 0) {
606                         CERROR("obd_create returned invalid err %d\n", rc);
607                         rc = -EIO;
608                 }
609         }
610         lov_update_set(set, req, rc);
611         if (rc)
612                 RETURN(rc);
613
614         if (oti && oti->oti_objid)
615                 oti->oti_objid[req->rq_idx] = req->rq_oa->o_id;
616
617         loi->loi_id = req->rq_oa->o_id;
618         loi->loi_gr = req->rq_oa->o_gr;
619         loi->loi_ost_idx = req->rq_idx;
620         CDEBUG(D_INODE, "objid "LPX64" has subobj "LPX64"/"LPX64" at idx %d\n",
621                lsm->lsm_object_id, loi->loi_id, loi->loi_id, req->rq_idx);
622         loi_init(loi);
623
624         if (set->set_cookies)
625                 ++oti->oti_logcookies;
626         if (req->rq_oa->o_valid & OBD_MD_FLCOOKIE)
627                 set->set_cookie_sent++;
628
629         RETURN(0);
630 }
631
632 int lov_prep_create_set(struct obd_export *exp, struct lov_stripe_md **ea,
633                         struct obdo *src_oa, struct obd_trans_info *oti,
634                         struct lov_request_set **reqset)
635 {
636         struct lov_obd *lov = &exp->exp_obd->u.lov;
637         struct lov_request_set *set;
638         int rc = 0, newea = 0;
639         ENTRY;
640
641         OBD_ALLOC(set, sizeof(*set));
642         if (set == NULL)
643                 RETURN(-ENOMEM);
644         lov_init_set(set);
645
646         set->set_exp = exp;
647         set->set_md = *ea;
648         set->set_oa = src_oa;
649         set->set_oti = oti;
650
651         if (set->set_md == NULL) {
652                 int stripes, stripe_cnt;
653                 stripe_cnt = lov_get_stripecnt(lov, 0);
654
655                 /* If the MDS file was truncated up to some size, stripe over
656                  * enough OSTs to allow the file to be created at that size. */
657                 if (src_oa->o_valid & OBD_MD_FLSIZE) {
658                         stripes = ((src_oa->o_size + LUSTRE_STRIPE_MAXBYTES) >> 12) - 1;
659                         do_div(stripes, (__u32)(LUSTRE_STRIPE_MAXBYTES >> 12));
660
661                         if (stripes > lov->desc.ld_active_tgt_count)
662                                 GOTO(out_set, rc = -EFBIG);
663                         if (stripes < stripe_cnt)
664                                 stripes = stripe_cnt;
665                 } else {
666                         stripes = stripe_cnt;
667                 }
668
669                 rc = lov_alloc_memmd(&set->set_md, stripes,
670                                      lov->desc.ld_pattern ?
671                                      lov->desc.ld_pattern : LOV_PATTERN_RAID0);
672                 if (rc < 0)
673                         goto out_set;
674                 newea = 1;
675         }
676
677         rc = qos_prep_create(lov, set, newea);
678         if (rc)
679                 goto out_lsm;
680
681         if (oti && (src_oa->o_valid & OBD_MD_FLCOOKIE)) {
682                 oti_alloc_cookies(oti, set->set_count);
683                 if (!oti->oti_logcookies)
684                         goto out_lsm;
685                 set->set_cookies = oti->oti_logcookies;
686         }
687         *reqset = set;
688         RETURN(rc);
689
690 out_lsm:
691         if (*ea == NULL)
692                 obd_free_memmd(exp, &set->set_md);
693 out_set:
694         lov_fini_create_set(set, ea);
695         RETURN(rc);
696 }
697
698 static int common_attr_done(struct lov_request_set *set)
699 {
700         struct list_head *pos;
701         struct lov_request *req;
702         struct obdo *tmp_oa;
703         int rc = 0, attrset = 0;
704         ENTRY;
705
706         if (set->set_oa == NULL)
707                 RETURN(0);
708
709         if (!set->set_success)
710                 RETURN(-EIO);
711
712         tmp_oa = obdo_alloc();
713         if (tmp_oa == NULL)
714                 GOTO(out, rc = -ENOMEM);
715
716         list_for_each (pos, &set->set_list) {
717                 req = list_entry(pos, struct lov_request, rq_link);
718
719                 if (!req->rq_complete || req->rq_rc)
720                         continue;
721                 if (req->rq_oa->o_valid == 0)   /* inactive stripe */
722                         continue;
723                 lov_merge_attrs(tmp_oa, req->rq_oa, req->rq_oa->o_valid,
724                                 set->set_md, req->rq_stripe, &attrset);
725         }
726         if (!attrset) {
727                 CERROR("No stripes had valid attrs\n");
728                 rc = -EIO;
729         }
730         tmp_oa->o_id = set->set_oa->o_id;
731         memcpy(set->set_oa, tmp_oa, sizeof(*set->set_oa));
732 out:
733         if (tmp_oa)
734                 obdo_free(tmp_oa);
735         RETURN(rc);
736
737 }
738
739 static int brw_done(struct lov_request_set *set)
740 {
741         struct lov_stripe_md *lsm = set->set_md;
742         struct lov_oinfo     *loi = NULL;
743         struct list_head *pos;
744         struct lov_request *req;
745         ENTRY;
746                                                                                                                              
747         list_for_each (pos, &set->set_list) {
748                 req = list_entry(pos, struct lov_request, rq_link);
749                                                                                                                              
750                 if (!req->rq_complete || req->rq_rc)
751                         continue;
752                                                                                                                              
753                 loi = &lsm->lsm_oinfo[req->rq_stripe];
754                                                                                                                              
755                 if (req->rq_oa->o_valid & OBD_MD_FLBLOCKS)
756                         loi->loi_blocks = req->rq_oa->o_blocks;
757         }
758                                                                                                                              
759         RETURN(0);
760 }
761
762 int lov_fini_brw_set(struct lov_request_set *set)
763 {
764         int rc = 0;
765         ENTRY;
766
767         LASSERT(set->set_exp);
768         if (set == NULL)
769                 RETURN(0);
770         if (set->set_completes) {
771                 rc = brw_done(set);
772                 /* FIXME update qos data here */
773         }
774         if (atomic_dec_and_test(&set->set_refcount))
775                 lov_finish_set(set);
776
777         RETURN(rc);
778 }
779
780 int lov_prep_brw_set(struct obd_export *exp, struct obdo *src_oa,
781                      struct lov_stripe_md *lsm, obd_count oa_bufs,
782                      struct brw_page *pga, struct obd_trans_info *oti,
783                      struct lov_request_set **reqset)
784 {
785         struct {
786                 obd_count       index;
787                 obd_count       count;
788                 obd_count       off;
789         } *info = NULL;
790         struct lov_request_set *set;
791         struct lov_oinfo *loi = NULL;
792         struct lov_obd *lov = &exp->exp_obd->u.lov;
793         int rc = 0, i, shift;
794         ENTRY;
795
796         OBD_ALLOC(set, sizeof(*set));
797         if (set == NULL)
798                 RETURN(-ENOMEM);
799         lov_init_set(set);
800
801         set->set_exp = exp;
802         set->set_md = lsm;
803         set->set_oa = src_oa;
804         set->set_oti = oti;
805         set->set_oabufs = oa_bufs;
806         OBD_ALLOC(set->set_pga, oa_bufs * sizeof(*set->set_pga));
807         if (!set->set_pga)
808                 GOTO(out, rc = -ENOMEM);
809
810         OBD_ALLOC(info, sizeof(*info) * lsm->lsm_stripe_count);
811         if (!info)
812                 GOTO(out, rc = -ENOMEM);
813
814         /* calculate the page count for each stripe */
815         for (i = 0; i < oa_bufs; i++) {
816                 int stripe = lov_stripe_number(lsm, pga[i].disk_offset);
817                 info[stripe].count++;
818         }
819
820         /* alloc and initialize lov request */
821         loi = lsm->lsm_oinfo;
822         shift = 0;
823         for (i = 0; i < lsm->lsm_stripe_count; i++, loi++) {
824                 struct lov_request *req;
825
826                 if (info[i].count == 0)
827                         continue;
828
829                 if (lov->tgts[loi->loi_ost_idx].active == 0) {
830                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
831                         GOTO(out, rc = -EIO);
832                 }
833
834                 OBD_ALLOC(req, sizeof(*req));
835                 if (req == NULL)
836                         GOTO(out, rc = -ENOMEM);
837
838                 req->rq_oa = obdo_alloc();
839                 if (req->rq_oa == NULL)
840                         GOTO(out, rc = -ENOMEM);
841
842                 if (src_oa)
843                         memcpy(req->rq_oa, src_oa, sizeof(*req->rq_oa));
844                 req->rq_oa->o_id = loi->loi_id;
845                 req->rq_buflen = sizeof(*req->rq_md);
846                 OBD_ALLOC(req->rq_md, req->rq_buflen);
847                 if (req->rq_md == NULL)
848                         GOTO(out, rc = -ENOMEM);
849
850                 req->rq_idx = loi->loi_ost_idx;
851                 req->rq_stripe = i;
852
853                 /* XXX LOV STACKING */
854                 req->rq_md->lsm_object_id = loi->loi_id;
855                 req->rq_md->lsm_object_gr = lsm->lsm_object_gr;
856                 req->rq_oabufs = info[i].count;
857                 req->rq_pgaidx = shift;
858                 shift += req->rq_oabufs;
859
860                 /* remember the index for sort brw_page array */
861                 info[i].index = req->rq_pgaidx;
862                 lov_set_add_req(req, set);
863         }
864         if (!set->set_count)
865                 GOTO(out, rc = -EIO);
866
867         /* rotate & sort the brw_page array */
868         for (i = 0; i < oa_bufs; i++) {
869                 int stripe = lov_stripe_number(lsm, pga[i].disk_offset);
870
871                 shift = info[stripe].index + info[stripe].off;
872                 LASSERT(shift < oa_bufs);
873                 set->set_pga[shift] = pga[i];
874                 lov_stripe_offset(lsm, pga[i].disk_offset, stripe,
875                                   &set->set_pga[shift].disk_offset);
876                 info[stripe].off++;
877         }
878 out:
879         if (info)
880                 OBD_FREE(info, sizeof(*info) * lsm->lsm_stripe_count);
881
882         if (rc == 0)
883                 *reqset = set;
884         else
885                 lov_fini_brw_set(set);
886
887         RETURN(rc);
888 }
889
/* Completion step for a getattr set: delegates to common_attr_done() and
 * hands its result back unchanged. */
static int getattr_done(struct lov_request_set *set)
{
        int result = common_attr_done(set);

        return result;
}
894
895 int lov_fini_getattr_set(struct lov_request_set *set)
896 {
897         int rc = 0;
898         ENTRY;
899
900         LASSERT(set->set_exp);
901         if (set == NULL)
902                 RETURN(0);
903         if (set->set_completes)
904                 rc = getattr_done(set);
905
906         if (atomic_dec_and_test(&set->set_refcount))
907                 lov_finish_set(set);
908
909         RETURN(rc);
910 }
911
912 int lov_prep_getattr_set(struct obd_export *exp, struct obdo *src_oa,
913                          struct lov_stripe_md *lsm,
914                          struct lov_request_set **reqset)
915 {
916         struct lov_request_set *set;
917         struct lov_oinfo *loi = NULL;
918         struct lov_obd *lov = &exp->exp_obd->u.lov;
919         int rc = 0, i;
920         ENTRY;
921
922         OBD_ALLOC(set, sizeof(*set));
923         if (set == NULL)
924                 RETURN(-ENOMEM);
925         lov_init_set(set);
926
927         set->set_exp = exp;
928         set->set_md = lsm;
929         set->set_oa = src_oa;
930
931         loi = lsm->lsm_oinfo;
932         for (i = 0; i < lsm->lsm_stripe_count; i++, loi++) {
933                 struct lov_request *req;
934
935                 if (lov->tgts[loi->loi_ost_idx].active == 0) {
936                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
937                         continue;
938                 }
939
940                 OBD_ALLOC(req, sizeof(*req));
941                 if (req == NULL)
942                         GOTO(out_set, rc = -ENOMEM);
943
944                 req->rq_stripe = i;
945                 req->rq_idx = loi->loi_ost_idx;
946
947                 req->rq_oa = obdo_alloc();
948                 if (req->rq_oa == NULL)
949                         GOTO(out_set, rc = -ENOMEM);
950                 memcpy(req->rq_oa, src_oa, sizeof(*req->rq_oa));
951                 req->rq_oa->o_id = loi->loi_id;
952                 lov_set_add_req(req, set);
953         }
954         if (!set->set_count)
955                 GOTO(out_set, rc = -EIO);
956         *reqset = set;
957         RETURN(rc);
958 out_set:
959         lov_fini_getattr_set(set);
960         RETURN(rc);
961 }
962
963 int lov_fini_destroy_set(struct lov_request_set *set)
964 {
965         ENTRY;
966
967         LASSERT(set->set_exp);
968         if (set == NULL)
969                 RETURN(0);
970         if (set->set_completes) {
971                 /* FIXME update qos data here */
972         }
973
974         if (atomic_dec_and_test(&set->set_refcount))
975                 lov_finish_set(set);
976
977         RETURN(0);
978 }
979
980 int lov_prep_destroy_set(struct obd_export *exp, struct obdo *src_oa,
981                          struct lov_stripe_md *lsm,
982                          struct obd_trans_info *oti,
983                          struct lov_request_set **reqset)
984 {
985         struct lov_request_set *set;
986         struct lov_oinfo *loi = NULL;
987         struct lov_obd *lov = &exp->exp_obd->u.lov;
988         int rc = 0, cookie_set = 0, i;
989         ENTRY;
990
991         OBD_ALLOC(set, sizeof(*set));
992         if (set == NULL)
993                 RETURN(-ENOMEM);
994         lov_init_set(set);
995
996         set->set_exp = exp;
997         set->set_md = lsm;
998         set->set_oa = src_oa;
999         set->set_oti = oti;
1000         if (oti != NULL && src_oa->o_valid & OBD_MD_FLCOOKIE)
1001                 set->set_cookies = oti->oti_logcookies;
1002
1003         loi = lsm->lsm_oinfo;
1004         for (i = 0; i < lsm->lsm_stripe_count; i++, loi++) {
1005                 struct lov_request *req;
1006
1007                 if (lov->tgts[loi->loi_ost_idx].active == 0) {
1008                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1009                         continue;
1010                 }
1011
1012                 OBD_ALLOC(req, sizeof(*req));
1013                 if (req == NULL)
1014                         GOTO(out_set, rc = -ENOMEM);
1015
1016                 req->rq_stripe = i;
1017                 req->rq_idx = loi->loi_ost_idx;
1018
1019                 req->rq_oa = obdo_alloc();
1020                 if (req->rq_oa == NULL)
1021                         GOTO(out_set, rc = -ENOMEM);
1022                 memcpy(req->rq_oa, src_oa, sizeof(*req->rq_oa));
1023                 req->rq_oa->o_id = loi->loi_id;
1024
1025                 /* Setup the first request's cookie position */
1026                 if (!cookie_set && set->set_cookies) {
1027                         oti->oti_logcookies = set->set_cookies + i;
1028                         cookie_set = 1;
1029                 }
1030                 lov_set_add_req(req, set);
1031         }
1032         if (!set->set_count)
1033                 GOTO(out_set, rc = -EIO);
1034         *reqset = set;
1035         RETURN(rc);
1036 out_set:
1037         lov_fini_destroy_set(set);
1038         RETURN(rc);
1039 }
1040
/* Completion step for a setattr set: delegates to common_attr_done() and
 * hands its result back unchanged. */
static int setattr_done(struct lov_request_set *set)
{
        int result = common_attr_done(set);

        return result;
}
1045
1046 int lov_fini_setattr_set(struct lov_request_set *set)
1047 {
1048         int rc = 0;
1049         ENTRY;
1050
1051         LASSERT(set->set_exp);
1052         if (set == NULL)
1053                 RETURN(0);
1054         if (set->set_completes) {
1055                 rc = setattr_done(set);
1056                 /* FIXME update qos data here */
1057         }
1058
1059         if (atomic_dec_and_test(&set->set_refcount))
1060                 lov_finish_set(set);
1061         RETURN(rc);
1062 }
1063
1064 int lov_prep_setattr_set(struct obd_export *exp, struct obdo *src_oa,
1065                          struct lov_stripe_md *lsm, struct obd_trans_info *oti,
1066                          struct lov_request_set **reqset)
1067 {
1068         struct lov_request_set *set;
1069         struct lov_oinfo *loi = NULL;
1070         struct lov_obd *lov = &exp->exp_obd->u.lov;
1071         int rc = 0, i;
1072         ENTRY;
1073
1074         OBD_ALLOC(set, sizeof(*set));
1075         if (set == NULL)
1076                 RETURN(-ENOMEM);
1077         lov_init_set(set);
1078
1079         set->set_exp = exp;
1080         set->set_md = lsm;
1081         set->set_oa = src_oa;
1082
1083         loi = lsm->lsm_oinfo;
1084         for (i = 0; i < lsm->lsm_stripe_count; i++, loi++) {
1085                 struct lov_request *req;
1086
1087                 if (lov->tgts[loi->loi_ost_idx].active == 0) {
1088                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1089                         continue;
1090                 }
1091
1092                 OBD_ALLOC(req, sizeof(*req));
1093                 if (req == NULL)
1094                         GOTO(out_set, rc = -ENOMEM);
1095                 req->rq_stripe = i;
1096                 req->rq_idx = loi->loi_ost_idx;
1097
1098                 req->rq_oa = obdo_alloc();
1099                 if (req->rq_oa == NULL)
1100                         GOTO(out_set, rc = -ENOMEM);
1101                 memcpy(req->rq_oa, src_oa, sizeof(*req->rq_oa));
1102                 req->rq_oa->o_id = loi->loi_id;
1103                 LASSERT(!(req->rq_oa->o_valid & OBD_MD_FLGROUP) || req->rq_oa->o_gr>0);
1104
1105                 if (src_oa->o_valid & OBD_MD_FLSIZE) {
1106                         if (lov_stripe_offset(lsm, src_oa->o_size, i,
1107                                               &req->rq_oa->o_size) < 0 &&
1108                             req->rq_oa->o_size)
1109                                 req->rq_oa->o_size--;
1110                         CDEBUG(D_INODE, "stripe %d has size "LPU64"/"LPU64"\n",
1111                                i, req->rq_oa->o_size, src_oa->o_size);
1112                 }
1113                 lov_set_add_req(req, set);
1114         }
1115         if (!set->set_count)
1116                 GOTO(out_set, rc = -EIO);
1117         *reqset = set;
1118         RETURN(rc);
1119 out_set:
1120         lov_fini_setattr_set(set);
1121         RETURN(rc);
1122 }
1123
1124 int lov_update_punch_set(struct lov_request_set *set, struct lov_request *req,
1125                          int rc)
1126 {
1127         struct lov_obd *lov = &set->set_exp->exp_obd->u.lov;
1128         ENTRY;
1129
1130         lov_update_set(set, req, rc);
1131         if (rc && !lov->tgts[req->rq_idx].active)
1132                 rc = 0;
1133         /* FIXME in raid1 regime, should return 0 */
1134         RETURN(rc);
1135 }
1136
1137 int lov_fini_punch_set(struct lov_request_set *set)
1138 {
1139         int rc = 0;
1140         ENTRY;
1141
1142         LASSERT(set->set_exp);
1143         if (set == NULL)
1144                 RETURN(0);
1145         if (set->set_completes) {
1146                 if (!set->set_success)
1147                         rc = -EIO;
1148                 /* FIXME update qos data here */
1149         }
1150
1151         if (atomic_dec_and_test(&set->set_refcount))
1152                 lov_finish_set(set);
1153
1154         RETURN(rc);
1155 }
1156
1157 int lov_prep_punch_set(struct obd_export *exp, struct obdo *src_oa,
1158                        struct lov_stripe_md *lsm, obd_off start,
1159                        obd_off end, struct obd_trans_info *oti,
1160                        struct lov_request_set **reqset)
1161 {
1162         struct lov_request_set *set;
1163         struct lov_oinfo *loi = NULL;
1164         struct lov_obd *lov = &exp->exp_obd->u.lov;
1165         int rc = 0, i;
1166         ENTRY;
1167
1168         OBD_ALLOC(set, sizeof(*set));
1169         if (set == NULL)
1170                 RETURN(-ENOMEM);
1171         lov_init_set(set);
1172
1173         set->set_exp = exp;
1174         set->set_md = lsm;
1175         set->set_oa = src_oa;
1176
1177         loi = lsm->lsm_oinfo;
1178         for (i = 0; i < lsm->lsm_stripe_count; i++, loi++) {
1179                 struct lov_request *req;
1180                 obd_off rs, re;
1181
1182                 if (lov->tgts[loi->loi_ost_idx].active == 0) {
1183                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1184                         continue;
1185                 }
1186
1187                 if (!lov_stripe_intersects(lsm, i, start, end, &rs, &re))
1188                         continue;
1189
1190                 OBD_ALLOC(req, sizeof(*req));
1191                 if (req == NULL)
1192                         GOTO(out_set, rc = -ENOMEM);
1193                 req->rq_stripe = i;
1194                 req->rq_idx = loi->loi_ost_idx;
1195
1196                 req->rq_oa = obdo_alloc();
1197                 if (req->rq_oa == NULL)
1198                         GOTO(out_set, rc = -ENOMEM);
1199                 memcpy(req->rq_oa, src_oa, sizeof(*req->rq_oa));
1200                 req->rq_oa->o_id = loi->loi_id;
1201                 req->rq_oa->o_gr = loi->loi_gr;
1202                 req->rq_oa->o_valid |= OBD_MD_FLGROUP;
1203
1204                 req->rq_extent.start = rs;
1205                 req->rq_extent.end = re;
1206
1207                 lov_set_add_req(req, set);
1208         }
1209         if (!set->set_count)
1210                 GOTO(out_set, rc = -EIO);
1211         *reqset = set;
1212         RETURN(rc);
1213 out_set:
1214         lov_fini_punch_set(set);
1215         RETURN(rc);
1216 }
1217
1218 int lov_fini_sync_set(struct lov_request_set *set)
1219 {
1220         int rc = 0;
1221         ENTRY;
1222
1223         LASSERT(set->set_exp);
1224         if (set == NULL)
1225                 RETURN(0);
1226         if (set->set_completes) {
1227                 if (!set->set_success)
1228                         rc = -EIO;
1229                 /* FIXME update qos data here */
1230         }
1231
1232         if (atomic_dec_and_test(&set->set_refcount))
1233                 lov_finish_set(set);
1234
1235         RETURN(rc);
1236 }
1237
1238 int lov_prep_sync_set(struct obd_export *exp, struct obdo *src_oa,
1239                       struct lov_stripe_md *lsm, obd_off start,
1240                       obd_off end, struct lov_request_set **reqset)
1241 {
1242         struct lov_request_set *set;
1243         struct lov_oinfo *loi = NULL;
1244         struct lov_obd *lov = &exp->exp_obd->u.lov;
1245         int rc = 0, i;
1246         ENTRY;
1247
1248         OBD_ALLOC(set, sizeof(*set));
1249         if (set == NULL)
1250                 RETURN(-ENOMEM);
1251         lov_init_set(set);
1252
1253         set->set_exp = exp;
1254         set->set_md = lsm;
1255         set->set_oa = src_oa;
1256
1257         loi = lsm->lsm_oinfo;
1258         for (i = 0; i < lsm->lsm_stripe_count; i++, loi++) {
1259                 struct lov_request *req;
1260                 obd_off rs, re;
1261
1262                 if (lov->tgts[loi->loi_ost_idx].active == 0) {
1263                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1264                         continue;
1265                 }
1266
1267                 if (!lov_stripe_intersects(lsm, i, start, end, &rs, &re))
1268                         continue;
1269
1270                 OBD_ALLOC(req, sizeof(*req));
1271                 if (req == NULL)
1272                         GOTO(out_set, rc = -ENOMEM);
1273                 req->rq_stripe = i;
1274                 req->rq_idx = loi->loi_ost_idx;
1275
1276                 req->rq_oa = obdo_alloc();
1277                 if (req->rq_oa == NULL)
1278                         GOTO(out_set, rc = -ENOMEM);
1279                 memcpy(req->rq_oa, src_oa, sizeof(*req->rq_oa));
1280                 req->rq_oa->o_id = loi->loi_id;
1281                 req->rq_extent.start = rs;
1282                 req->rq_extent.end = re;
1283                 lov_set_add_req(req, set);
1284         }
1285         if (!set->set_count)
1286                 GOTO(out_set, rc = -EIO);
1287         *reqset = set;
1288         RETURN(rc);
1289 out_set:
1290         lov_fini_sync_set(set);
1291         RETURN(rc);
1292 }