Whamcloud - gitweb
b=22850 add changelog entry
[fs/lustre-release.git] / lustre / lov / lov_request.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright  2008 Sun Microsystems, Inc. All rights reserved
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  */
36
37 #ifndef EXPORT_SYMTAB
38 # define EXPORT_SYMTAB
39 #endif
40 #define DEBUG_SUBSYSTEM S_LOV
41
42 #ifdef __KERNEL__
43 #include <libcfs/libcfs.h>
44 #else
45 #include <liblustre.h>
46 #endif
47
48 #include <obd_class.h>
49 #include <obd_lov.h>
50 #include <lustre/lustre_idl.h>
51
52 #include "lov_internal.h"
53
54 static void lov_init_set(struct lov_request_set *set)
55 {
56         set->set_count = 0;
57         set->set_completes = 0;
58         set->set_success = 0;
59         set->set_cookies = 0;
60         CFS_INIT_LIST_HEAD(&set->set_list);
61         atomic_set(&set->set_refcount, 1);
62         cfs_waitq_init(&set->set_waitq);
63         spin_lock_init(&set->set_lock);
64 }
65
66 void lov_finish_set(struct lov_request_set *set)
67 {
68         struct list_head *pos, *n;
69         ENTRY;
70
71         LASSERT(set);
72         list_for_each_safe(pos, n, &set->set_list) {
73                 struct lov_request *req = list_entry(pos, struct lov_request,
74                                                      rq_link);
75                 list_del_init(&req->rq_link);
76
77                 if (req->rq_oi.oi_oa)
78                         OBDO_FREE(req->rq_oi.oi_oa);
79                 if (req->rq_oi.oi_md)
80                         OBD_FREE(req->rq_oi.oi_md, req->rq_buflen);
81                 if (req->rq_oi.oi_osfs)
82                         OBD_FREE(req->rq_oi.oi_osfs,
83                                  sizeof(*req->rq_oi.oi_osfs));
84                 OBD_FREE(req, sizeof(*req));
85         }
86
87         if (set->set_pga) {
88                 int len = set->set_oabufs * sizeof(*set->set_pga);
89                 OBD_FREE(set->set_pga, len);
90         }
91         if (set->set_lockh)
92                 lov_llh_put(set->set_lockh);
93
94         OBD_FREE(set, sizeof(*set));
95         EXIT;
96 }
97
98 int lov_finished_set(struct lov_request_set *set)
99 {
100         CDEBUG(D_INFO, "check set %d/%d\n", set->set_completes,
101                set->set_count);
102         return set->set_completes == set->set_count;
103 }
104
105
106 void lov_update_set(struct lov_request_set *set,
107                     struct lov_request *req, int rc)
108 {
109         req->rq_complete = 1;
110         req->rq_rc = rc;
111
112         set->set_completes++;
113         if (rc == 0)
114                 set->set_success++;
115
116         cfs_waitq_signal(&set->set_waitq);
117 }
118
119 int lov_update_common_set(struct lov_request_set *set,
120                           struct lov_request *req, int rc)
121 {
122         struct lov_obd *lov = &set->set_exp->exp_obd->u.lov;
123         ENTRY;
124
125         lov_update_set(set, req, rc);
126
127         /* grace error on inactive ost */
128         if (rc && !(lov->lov_tgts[req->rq_idx] &&
129                     lov->lov_tgts[req->rq_idx]->ltd_active))
130                 rc = 0;
131
132         /* FIXME in raid1 regime, should return 0 */
133         RETURN(rc);
134 }
135
136 void lov_set_add_req(struct lov_request *req, struct lov_request_set *set)
137 {
138         list_add_tail(&req->rq_link, &set->set_list);
139         set->set_count++;
140         req->rq_rqset = set;
141 }
142
143 int lov_update_enqueue_set(struct lov_request *req, __u32 mode, int rc)
144 {
145         struct lov_request_set *set = req->rq_rqset;
146         struct lustre_handle *lov_lockhp;
147         struct lov_oinfo *loi;
148         ENTRY;
149
150         LASSERT(set != NULL);
151         LASSERT(set->set_oi != NULL);
152
153         lov_lockhp = set->set_lockh->llh_handles + req->rq_stripe;
154         loi = set->set_oi->oi_md->lsm_oinfo[req->rq_stripe];
155
156         /* XXX LOV STACKING: OSC gets a copy, created in lov_prep_enqueue_set
157          * and that copy can be arbitrarily out of date.
158          *
159          * The LOV API is due for a serious rewriting anyways, and this
160          * can be addressed then. */
161
162         if (rc == ELDLM_OK) {
163                 struct ldlm_lock *lock = ldlm_handle2lock(lov_lockhp);
164                 __u64 tmp;
165
166                 LASSERT(lock != NULL);
167                 lov_stripe_lock(set->set_oi->oi_md);
168                 loi->loi_lvb = req->rq_oi.oi_md->lsm_oinfo[0]->loi_lvb;
169                 tmp = loi->loi_lvb.lvb_size;
170                 /* Extend KMS up to the end of this lock and no further
171                  * A lock on [x,y] means a KMS of up to y + 1 bytes! */
172                 if (tmp > lock->l_policy_data.l_extent.end)
173                         tmp = lock->l_policy_data.l_extent.end + 1;
174                 if (tmp >= loi->loi_kms) {
175                         LDLM_DEBUG(lock, "lock acquired, setting rss="LPU64
176                                    ", kms="LPU64, loi->loi_lvb.lvb_size, tmp);
177                         loi->loi_kms = tmp;
178                         loi->loi_kms_valid = 1;
179                 } else {
180                         LDLM_DEBUG(lock, "lock acquired, setting rss="
181                                    LPU64"; leaving kms="LPU64", end="LPU64,
182                                    loi->loi_lvb.lvb_size, loi->loi_kms,
183                                    lock->l_policy_data.l_extent.end);
184                 }
185                 lov_stripe_unlock(set->set_oi->oi_md);
186                 ldlm_lock_allow_match(lock);
187                 LDLM_LOCK_PUT(lock);
188         } else if ((rc == ELDLM_LOCK_ABORTED) &&
189                    (set->set_oi->oi_flags & LDLM_FL_HAS_INTENT)) {
190                 memset(lov_lockhp, 0, sizeof(*lov_lockhp));
191                 lov_stripe_lock(set->set_oi->oi_md);
192                 loi->loi_lvb = req->rq_oi.oi_md->lsm_oinfo[0]->loi_lvb;
193                 lov_stripe_unlock(set->set_oi->oi_md);
194                 CDEBUG(D_INODE, "glimpsed, setting rss="LPU64"; leaving"
195                        " kms="LPU64"\n", loi->loi_lvb.lvb_size, loi->loi_kms);
196                 rc = ELDLM_OK;
197         } else {
198                 struct obd_export *exp = set->set_exp;
199                 struct lov_obd *lov = &exp->exp_obd->u.lov;
200
201                 memset(lov_lockhp, 0, sizeof(*lov_lockhp));
202                 if (lov->lov_tgts[req->rq_idx] &&
203                     lov->lov_tgts[req->rq_idx]->ltd_active) {
204                         /* -EUSERS used by OST to report file contention */
205                         if (rc != -EINTR && rc != -EUSERS)
206                                 CERROR("enqueue objid "LPX64" subobj "
207                                        LPX64" on OST idx %d: rc %d\n",
208                                        set->set_oi->oi_md->lsm_object_id,
209                                        loi->loi_id, loi->loi_ost_idx, rc);
210                 } else {
211                         rc = ELDLM_OK;
212                 }
213         }
214         lov_update_set(set, req, rc);
215         RETURN(rc);
216 }
217
218 /* The callback for osc_enqueue that updates lov info for every OSC request. */
219 static int cb_update_enqueue(struct obd_info *oinfo, int rc)
220 {
221         struct ldlm_enqueue_info *einfo;
222         struct lov_request *lovreq;
223
224         lovreq = container_of(oinfo, struct lov_request, rq_oi);
225         einfo = lovreq->rq_rqset->set_ei;
226         return lov_update_enqueue_set(lovreq, einfo->ei_mode, rc);
227 }
228
229 static int enqueue_done(struct lov_request_set *set, __u32 mode)
230 {
231         struct lov_request *req;
232         struct lov_obd *lov = &set->set_exp->exp_obd->u.lov;
233         int rc = 0;
234         ENTRY;
235
236         /* enqueue/match success, just return */
237         if (set->set_completes && set->set_completes == set->set_success)
238                 RETURN(0);
239
240         /* cancel enqueued/matched locks */
241         list_for_each_entry(req, &set->set_list, rq_link) {
242                 struct lustre_handle *lov_lockhp;
243
244                 if (!req->rq_complete || req->rq_rc)
245                         continue;
246
247                 lov_lockhp = set->set_lockh->llh_handles + req->rq_stripe;
248                 LASSERT(lov_lockhp);
249                 if (!lustre_handle_is_used(lov_lockhp))
250                         continue;
251
252                 rc = obd_cancel(lov->lov_tgts[req->rq_idx]->ltd_exp,
253                                 req->rq_oi.oi_md, mode, lov_lockhp, 0, 0);
254                 if (rc && lov->lov_tgts[req->rq_idx] &&
255                     lov->lov_tgts[req->rq_idx]->ltd_active)
256                         CERROR("cancelling obdjid "LPX64" on OST "
257                                "idx %d error: rc = %d\n",
258                                req->rq_oi.oi_md->lsm_object_id,
259                                req->rq_idx, rc);
260         }
261         if (set->set_lockh)
262                 lov_llh_put(set->set_lockh);
263         RETURN(rc);
264 }
265
266 int lov_fini_enqueue_set(struct lov_request_set *set, __u32 mode, int rc,
267                          struct ptlrpc_request_set *rqset)
268 {
269         int ret = 0;
270         ENTRY;
271
272         if (set == NULL)
273                 RETURN(0);
274         LASSERT(set->set_exp);
275         /* Do enqueue_done only for sync requests and if any request
276          * succeeded. */
277         if (!rqset) {
278                 if (rc)
279                         set->set_completes = 0;
280                 ret = enqueue_done(set, mode);
281         } else if (set->set_lockh)
282                 lov_llh_put(set->set_lockh);
283
284         lov_put_reqset(set);
285
286         RETURN(rc ? rc : ret);
287 }
288
289 int lov_prep_enqueue_set(struct obd_export *exp, struct obd_info *oinfo,
290                          struct ldlm_enqueue_info *einfo,
291                          struct lov_request_set **reqset)
292 {
293         struct lov_obd *lov = &exp->exp_obd->u.lov;
294         struct lov_request_set *set;
295         int i, rc = 0;
296         struct lov_oinfo *loi;
297         ENTRY;
298
299         OBD_ALLOC(set, sizeof(*set));
300         if (set == NULL)
301                 RETURN(-ENOMEM);
302         lov_init_set(set);
303
304         set->set_exp = exp;
305         set->set_oi = oinfo;
306         set->set_ei = einfo;
307         set->set_lockh = lov_llh_new(oinfo->oi_md);
308         if (set->set_lockh == NULL)
309                 GOTO(out_set, rc = -ENOMEM);
310         oinfo->oi_lockh->cookie = set->set_lockh->llh_handle.h_cookie;
311
312         for (i = 0; i < oinfo->oi_md->lsm_stripe_count; i++) {
313                 struct lov_request *req;
314                 obd_off start, end;
315
316                 loi = oinfo->oi_md->lsm_oinfo[i];
317                 if (!lov_stripe_intersects(oinfo->oi_md, i,
318                                            oinfo->oi_policy.l_extent.start,
319                                            oinfo->oi_policy.l_extent.end,
320                                            &start, &end))
321                         continue;
322
323                 if (!lov->lov_tgts[loi->loi_ost_idx] ||
324                     !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
325                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
326                         continue;
327                 }
328
329                 OBD_ALLOC(req, sizeof(*req));
330                 if (req == NULL)
331                         GOTO(out_set, rc = -ENOMEM);
332
333                 req->rq_buflen = sizeof(*req->rq_oi.oi_md) +
334                         sizeof(struct lov_oinfo *) +
335                         sizeof(struct lov_oinfo);
336                 OBD_ALLOC(req->rq_oi.oi_md, req->rq_buflen);
337                 if (req->rq_oi.oi_md == NULL) {
338                         OBD_FREE(req, sizeof(*req));
339                         GOTO(out_set, rc = -ENOMEM);
340                 }
341                 req->rq_oi.oi_md->lsm_oinfo[0] =
342                         ((void *)req->rq_oi.oi_md) + sizeof(*req->rq_oi.oi_md) +
343                         sizeof(struct lov_oinfo *);
344
345                 /* Set lov request specific parameters. */
346                 req->rq_oi.oi_lockh = set->set_lockh->llh_handles + i;
347                 req->rq_oi.oi_cb_up = cb_update_enqueue;
348                 req->rq_oi.oi_flags = oinfo->oi_flags;
349
350                 LASSERT(req->rq_oi.oi_lockh);
351
352                 req->rq_oi.oi_policy.l_extent.gid =
353                         oinfo->oi_policy.l_extent.gid;
354                 req->rq_oi.oi_policy.l_extent.start = start;
355                 req->rq_oi.oi_policy.l_extent.end = end;
356
357                 req->rq_idx = loi->loi_ost_idx;
358                 req->rq_stripe = i;
359
360                 /* XXX LOV STACKING: submd should be from the subobj */
361                 req->rq_oi.oi_md->lsm_object_id = loi->loi_id;
362                 req->rq_oi.oi_md->lsm_object_gr = loi->loi_gr;
363                 req->rq_oi.oi_md->lsm_stripe_count = 0;
364                 req->rq_oi.oi_md->lsm_oinfo[0]->loi_kms_valid =
365                         loi->loi_kms_valid;
366                 req->rq_oi.oi_md->lsm_oinfo[0]->loi_kms = loi->loi_kms;
367                 req->rq_oi.oi_md->lsm_oinfo[0]->loi_lvb = loi->loi_lvb;
368
369                 lov_set_add_req(req, set);
370         }
371         if (!set->set_count)
372                 GOTO(out_set, rc = -EIO);
373         *reqset = set;
374         RETURN(0);
375 out_set:
376         lov_fini_enqueue_set(set, einfo->ei_mode, rc, NULL);
377         RETURN(rc);
378 }
379
380 int lov_update_match_set(struct lov_request_set *set, struct lov_request *req,
381                          int rc)
382 {
383         int ret = rc;
384         ENTRY;
385
386         if (rc > 0)
387                 ret = 0;
388         else if (rc == 0)
389                 ret = 1;
390         lov_update_set(set, req, ret);
391         RETURN(rc);
392 }
393
394 int lov_fini_match_set(struct lov_request_set *set, __u32 mode, int flags)
395 {
396         int rc = 0;
397         ENTRY;
398
399         if (set == NULL)
400                 RETURN(0);
401         LASSERT(set->set_exp);
402         rc = enqueue_done(set, mode);
403         if ((set->set_count == set->set_success) &&
404             (flags & LDLM_FL_TEST_LOCK))
405                 lov_llh_put(set->set_lockh);
406
407         lov_put_reqset(set);
408
409         RETURN(rc);
410 }
411
412 int lov_prep_match_set(struct obd_export *exp, struct obd_info *oinfo,
413                        struct lov_stripe_md *lsm, ldlm_policy_data_t *policy,
414                        __u32 mode, struct lustre_handle *lockh,
415                        struct lov_request_set **reqset)
416 {
417         struct lov_obd *lov = &exp->exp_obd->u.lov;
418         struct lov_request_set *set;
419         int i, rc = 0;
420         struct lov_oinfo *loi;
421         ENTRY;
422
423         OBD_ALLOC(set, sizeof(*set));
424         if (set == NULL)
425                 RETURN(-ENOMEM);
426         lov_init_set(set);
427
428         set->set_exp = exp;
429         set->set_oi = oinfo;
430         set->set_oi->oi_md = lsm;
431         set->set_lockh = lov_llh_new(lsm);
432         if (set->set_lockh == NULL)
433                 GOTO(out_set, rc = -ENOMEM);
434         lockh->cookie = set->set_lockh->llh_handle.h_cookie;
435
436         for (i = 0; i < lsm->lsm_stripe_count; i++){
437                 struct lov_request *req;
438                 obd_off start, end;
439
440                 loi = lsm->lsm_oinfo[i];
441                 if (!lov_stripe_intersects(lsm, i, policy->l_extent.start,
442                                            policy->l_extent.end, &start, &end))
443                         continue;
444
445                 /* FIXME raid1 should grace this error */
446                 if (!lov->lov_tgts[loi->loi_ost_idx] ||
447                     !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
448                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
449                         GOTO(out_set, rc = -EIO);
450                 }
451
452                 OBD_ALLOC(req, sizeof(*req));
453                 if (req == NULL)
454                         GOTO(out_set, rc = -ENOMEM);
455
456                 req->rq_buflen = sizeof(*req->rq_oi.oi_md);
457                 OBD_ALLOC(req->rq_oi.oi_md, req->rq_buflen);
458                 if (req->rq_oi.oi_md == NULL) {
459                         OBD_FREE(req, sizeof(*req));
460                         GOTO(out_set, rc = -ENOMEM);
461                 }
462
463                 req->rq_oi.oi_policy.l_extent.start = start;
464                 req->rq_oi.oi_policy.l_extent.end = end;
465                 req->rq_oi.oi_policy.l_extent.gid = policy->l_extent.gid;
466
467                 req->rq_idx = loi->loi_ost_idx;
468                 req->rq_stripe = i;
469
470                 /* XXX LOV STACKING: submd should be from the subobj */
471                 req->rq_oi.oi_md->lsm_object_id = loi->loi_id;
472                 req->rq_oi.oi_md->lsm_object_gr = loi->loi_gr;
473                 req->rq_oi.oi_md->lsm_stripe_count = 0;
474
475                 lov_set_add_req(req, set);
476         }
477         if (!set->set_count)
478                 GOTO(out_set, rc = -EIO);
479         *reqset = set;
480         RETURN(rc);
481 out_set:
482         lov_fini_match_set(set, mode, 0);
483         RETURN(rc);
484 }
485
486 int lov_fini_cancel_set(struct lov_request_set *set)
487 {
488         int rc = 0;
489         ENTRY;
490
491         if (set == NULL)
492                 RETURN(0);
493
494         LASSERT(set->set_exp);
495         if (set->set_lockh)
496                 lov_llh_put(set->set_lockh);
497
498         lov_put_reqset(set);
499
500         RETURN(rc);
501 }
502
503 int lov_prep_cancel_set(struct obd_export *exp, struct obd_info *oinfo,
504                         struct lov_stripe_md *lsm, __u32 mode,
505                         struct lustre_handle *lockh,
506                         struct lov_request_set **reqset)
507 {
508         struct lov_request_set *set;
509         int i, rc = 0;
510         struct lov_oinfo *loi;
511         ENTRY;
512
513         OBD_ALLOC(set, sizeof(*set));
514         if (set == NULL)
515                 RETURN(-ENOMEM);
516         lov_init_set(set);
517
518         set->set_exp = exp;
519         set->set_oi = oinfo;
520         set->set_oi->oi_md = lsm;
521         set->set_lockh = lov_handle2llh(lockh);
522         if (set->set_lockh == NULL) {
523                 CERROR("LOV: invalid lov lock handle %p\n", lockh);
524                 GOTO(out_set, rc = -EINVAL);
525         }
526         lockh->cookie = set->set_lockh->llh_handle.h_cookie;
527
528         for (i = 0; i < lsm->lsm_stripe_count; i++){
529                 struct lov_request *req;
530                 struct lustre_handle *lov_lockhp;
531
532                 loi = lsm->lsm_oinfo[i];
533                 lov_lockhp = set->set_lockh->llh_handles + i;
534                 if (!lustre_handle_is_used(lov_lockhp)) {
535                         CDEBUG(D_INFO, "lov idx %d subobj "LPX64" no lock\n",
536                                loi->loi_ost_idx, loi->loi_id);
537                         continue;
538                 }
539
540                 OBD_ALLOC(req, sizeof(*req));
541                 if (req == NULL)
542                         GOTO(out_set, rc = -ENOMEM);
543
544                 req->rq_buflen = sizeof(*req->rq_oi.oi_md);
545                 OBD_ALLOC(req->rq_oi.oi_md, req->rq_buflen);
546                 if (req->rq_oi.oi_md == NULL) {
547                         OBD_FREE(req, sizeof(*req));
548                         GOTO(out_set, rc = -ENOMEM);
549                 }
550
551                 req->rq_idx = loi->loi_ost_idx;
552                 req->rq_stripe = i;
553
554                 /* XXX LOV STACKING: submd should be from the subobj */
555                 req->rq_oi.oi_md->lsm_object_id = loi->loi_id;
556                 req->rq_oi.oi_md->lsm_stripe_count = 0;
557
558                 lov_set_add_req(req, set);
559         }
560         if (!set->set_count)
561                 GOTO(out_set, rc = -EIO);
562         *reqset = set;
563         RETURN(rc);
564 out_set:
565         lov_fini_cancel_set(set);
566         RETURN(rc);
567 }
568
569 static int create_done(struct obd_export *exp, struct lov_request_set *set,
570                        struct lov_stripe_md **lsmp)
571 {
572         struct lov_obd *lov = &exp->exp_obd->u.lov;
573         struct obd_trans_info *oti = set->set_oti;
574         struct obdo *src_oa = set->set_oi->oi_oa;
575         struct lov_request *req;
576         struct obdo *ret_oa = NULL;
577         int attrset = 0, rc = 0;
578         ENTRY;
579
580         LASSERT(set->set_completes);
581
582         /* try alloc objects on other osts if osc_create fails for
583          * exceptions: RPC failure, ENOSPC, etc */
584         if (set->set_count != set->set_success) {
585                 list_for_each_entry (req, &set->set_list, rq_link) {
586                         if (req->rq_rc == 0)
587                                 continue;
588
589                         set->set_completes--;
590                         req->rq_complete = 0;
591
592                         rc = qos_remedy_create(set, req);
593                         lov_update_create_set(set, req, rc);
594                 }
595         }
596
597         /* no successful creates */
598         if (set->set_success == 0)
599                 GOTO(cleanup, rc);
600
601         if (set->set_count != set->set_success) {
602                 set->set_count = set->set_success;
603                 qos_shrink_lsm(set);
604         }
605
606         OBDO_ALLOC(ret_oa);
607         if (ret_oa == NULL)
608                 GOTO(cleanup, rc = -ENOMEM);
609
610         list_for_each_entry(req, &set->set_list, rq_link) {
611                 if (!req->rq_complete || req->rq_rc)
612                         continue;
613                 lov_merge_attrs(ret_oa, req->rq_oi.oi_oa,
614                                 req->rq_oi.oi_oa->o_valid, set->set_oi->oi_md,
615                                 req->rq_stripe, &attrset);
616         }
617         if (src_oa->o_valid & OBD_MD_FLSIZE &&
618             ret_oa->o_size != src_oa->o_size) {
619                 CERROR("original size "LPU64" isn't new object size "LPU64"\n",
620                        src_oa->o_size, ret_oa->o_size);
621                 LBUG();
622         }
623         ret_oa->o_id = src_oa->o_id;
624         memcpy(src_oa, ret_oa, sizeof(*src_oa));
625         OBDO_FREE(ret_oa);
626
627         *lsmp = set->set_oi->oi_md;
628         GOTO(done, rc = 0);
629
630 cleanup:
631         list_for_each_entry(req, &set->set_list, rq_link) {
632                 struct obd_export *sub_exp;
633                 int err = 0;
634
635                 if (!req->rq_complete || req->rq_rc)
636                         continue;
637
638                 sub_exp = lov->lov_tgts[req->rq_idx]->ltd_exp;
639                 err = obd_destroy(sub_exp, req->rq_oi.oi_oa, NULL, oti, NULL);
640                 if (err)
641                         CERROR("Failed to uncreate objid "LPX64" subobj "
642                                LPX64" on OST idx %d: rc = %d\n",
643                                src_oa->o_id, req->rq_oi.oi_oa->o_id,
644                                req->rq_idx, rc);
645         }
646         if (*lsmp == NULL)
647                 obd_free_memmd(exp, &set->set_oi->oi_md);
648 done:
649         if (oti && set->set_cookies) {
650                 oti->oti_logcookies = set->set_cookies;
651                 if (!set->set_cookie_sent) {
652                         oti_free_cookies(oti);
653                         src_oa->o_valid &= ~OBD_MD_FLCOOKIE;
654                 } else {
655                         src_oa->o_valid |= OBD_MD_FLCOOKIE;
656                 }
657         }
658         RETURN(rc);
659 }
660
661 int lov_fini_create_set(struct lov_request_set *set,struct lov_stripe_md **lsmp)
662 {
663         int rc = 0;
664         ENTRY;
665
666         if (set == NULL)
667                 RETURN(0);
668         LASSERT(set->set_exp);
669         if (set->set_completes)
670                 rc = create_done(set->set_exp, set, lsmp);
671
672         lov_put_reqset(set);
673         RETURN(rc);
674 }
675
676 int lov_update_create_set(struct lov_request_set *set,
677                           struct lov_request *req, int rc)
678 {
679         struct obd_trans_info *oti = set->set_oti;
680         struct lov_stripe_md *lsm = set->set_oi->oi_md;
681         struct lov_oinfo *loi;
682         struct lov_obd *lov = &set->set_exp->exp_obd->u.lov;
683         ENTRY;
684
685         if (rc && lov->lov_tgts[req->rq_idx] &&
686             lov->lov_tgts[req->rq_idx]->ltd_active) {
687                 CERROR("error creating fid "LPX64" sub-object"
688                        " on OST idx %d/%d: rc = %d\n",
689                        set->set_oi->oi_oa->o_id, req->rq_idx,
690                        lsm->lsm_stripe_count, rc);
691                 if (rc > 0) {
692                         CERROR("obd_create returned invalid err %d\n", rc);
693                         rc = -EIO;
694                 }
695         }
696
697         spin_lock(&set->set_lock);
698         req->rq_stripe = set->set_success;
699         loi = lsm->lsm_oinfo[req->rq_stripe];
700         if (rc) {
701                 lov_update_set(set, req, rc);
702                 spin_unlock(&set->set_lock);
703                 RETURN(rc);
704         }
705
706         loi->loi_id = req->rq_oi.oi_oa->o_id;
707         loi->loi_ost_idx = req->rq_idx;
708         loi_init(loi);
709
710         if (oti && set->set_cookies)
711                 ++oti->oti_logcookies;
712         if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLCOOKIE)
713                 set->set_cookie_sent++;
714
715         lov_update_set(set, req, rc);
716         spin_unlock(&set->set_lock);
717
718         CDEBUG(D_INODE, "objid "LPX64" has subobj "LPX64"/"LPU64" at idx %d\n",
719                lsm->lsm_object_id, loi->loi_id, loi->loi_id, req->rq_idx);
720
721         RETURN(0);
722 }
723
724 int cb_create_update(struct obd_info *oinfo, int rc)
725 {
726         struct lov_request *lovreq;
727
728         lovreq = container_of(oinfo, struct lov_request, rq_oi);
729         rc= lov_update_create_set(lovreq->rq_rqset, lovreq, rc);
730         if (lov_finished_set(lovreq->rq_rqset))
731                 lov_put_reqset(lovreq->rq_rqset);
732         return rc;
733 }
734
735
736 int lov_prep_create_set(struct obd_export *exp, struct obd_info *oinfo,
737                         struct lov_stripe_md **lsmp, struct obdo *src_oa,
738                         struct obd_trans_info *oti,
739                         struct lov_request_set **reqset)
740 {
741         struct lov_request_set *set;
742         int rc = 0;
743         ENTRY;
744
745         OBD_ALLOC(set, sizeof(*set));
746         if (set == NULL)
747                 RETURN(-ENOMEM);
748         lov_init_set(set);
749
750         set->set_exp = exp;
751         set->set_oi = oinfo;
752         set->set_oi->oi_md = *lsmp;
753         set->set_oi->oi_oa = src_oa;
754         set->set_oti = oti;
755         lov_get_reqset(set);
756
757         rc = qos_prep_create(exp, set);
758         /* qos_shrink_lsm() may have allocated a new lsm */
759         *lsmp = oinfo->oi_md;
760         if (rc) {
761                 lov_fini_create_set(set, lsmp);
762                 lov_put_reqset(set);
763         } else {
764                 *reqset = set;
765         }
766         RETURN(rc);
767 }
768
769 static int common_attr_done(struct lov_request_set *set)
770 {
771         struct list_head *pos;
772         struct lov_request *req;
773         struct obdo *tmp_oa;
774         int rc = 0, attrset = 0;
775         ENTRY;
776
777         LASSERT(set->set_oi != NULL);
778
779         if (set->set_oi->oi_oa == NULL)
780                 RETURN(0);
781
782         if (!set->set_success)
783                 RETURN(-EIO);
784
785         OBDO_ALLOC(tmp_oa);
786         if (tmp_oa == NULL)
787                 GOTO(out, rc = -ENOMEM);
788
789         list_for_each (pos, &set->set_list) {
790                 req = list_entry(pos, struct lov_request, rq_link);
791
792                 if (!req->rq_complete || req->rq_rc)
793                         continue;
794                 if (req->rq_oi.oi_oa->o_valid == 0)   /* inactive stripe */
795                         continue;
796                 lov_merge_attrs(tmp_oa, req->rq_oi.oi_oa,
797                                 req->rq_oi.oi_oa->o_valid,
798                                 set->set_oi->oi_md, req->rq_stripe, &attrset);
799         }
800         if (!attrset) {
801                 CERROR("No stripes had valid attrs\n");
802                 rc = -EIO;
803         }
804         tmp_oa->o_id = set->set_oi->oi_oa->o_id;
805         memcpy(set->set_oi->oi_oa, tmp_oa, sizeof(*set->set_oi->oi_oa));
806 out:
807         if (tmp_oa)
808                 OBDO_FREE(tmp_oa);
809         RETURN(rc);
810
811 }
812
813 static int brw_done(struct lov_request_set *set)
814 {
815         struct lov_stripe_md *lsm = set->set_oi->oi_md;
816         struct lov_oinfo     *loi = NULL;
817         struct list_head *pos;
818         struct lov_request *req;
819         ENTRY;
820
821         list_for_each (pos, &set->set_list) {
822                 req = list_entry(pos, struct lov_request, rq_link);
823
824                 if (!req->rq_complete || req->rq_rc)
825                         continue;
826
827                 loi = lsm->lsm_oinfo[req->rq_stripe];
828
829                 if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLBLOCKS)
830                         loi->loi_lvb.lvb_blocks = req->rq_oi.oi_oa->o_blocks;
831         }
832
833         RETURN(0);
834 }
835
836 int lov_fini_brw_set(struct lov_request_set *set)
837 {
838         int rc = 0;
839         ENTRY;
840
841         if (set == NULL)
842                 RETURN(0);
843         LASSERT(set->set_exp);
844         if (set->set_completes) {
845                 rc = brw_done(set);
846                 /* FIXME update qos data here */
847         }
848         lov_put_reqset(set);
849
850         RETURN(rc);
851 }
852
853 int lov_prep_brw_set(struct obd_export *exp, struct obd_info *oinfo,
854                      obd_count oa_bufs, struct brw_page *pga,
855                      struct obd_trans_info *oti,
856                      struct lov_request_set **reqset)
857 {
858         struct {
859                 obd_count       index;
860                 obd_count       count;
861                 obd_count       off;
862         } *info = NULL;
863         struct lov_request_set *set;
864         struct lov_oinfo *loi = NULL;
865         struct lov_obd *lov = &exp->exp_obd->u.lov;
866         int rc = 0, i, shift;
867         ENTRY;
868
869         OBD_ALLOC(set, sizeof(*set));
870         if (set == NULL)
871                 RETURN(-ENOMEM);
872         lov_init_set(set);
873
874         set->set_exp = exp;
875         set->set_oti = oti;
876         set->set_oi = oinfo;
877         set->set_oabufs = oa_bufs;
878         OBD_ALLOC(set->set_pga, oa_bufs * sizeof(*set->set_pga));
879         if (!set->set_pga)
880                 GOTO(out, rc = -ENOMEM);
881
882         OBD_ALLOC(info, sizeof(*info) * oinfo->oi_md->lsm_stripe_count);
883         if (!info)
884                 GOTO(out, rc = -ENOMEM);
885
886         /* calculate the page count for each stripe */
887         for (i = 0; i < oa_bufs; i++) {
888                 int stripe = lov_stripe_number(oinfo->oi_md, pga[i].off);
889                 info[stripe].count++;
890         }
891
892         /* alloc and initialize lov request */
893         shift = 0;
894         for (i = 0 ; i < oinfo->oi_md->lsm_stripe_count; i++){
895                 struct lov_request *req;
896
897                 if (info[i].count == 0)
898                         continue;
899
900                 loi = oinfo->oi_md->lsm_oinfo[i];
901                 if (!lov->lov_tgts[loi->loi_ost_idx] ||
902                     !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
903                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
904                         GOTO(out, rc = -EIO);
905                 }
906
907                 OBD_ALLOC(req, sizeof(*req));
908                 if (req == NULL)
909                         GOTO(out, rc = -ENOMEM);
910
911                 OBDO_ALLOC(req->rq_oi.oi_oa);
912                 if (req->rq_oi.oi_oa == NULL) {
913                         OBD_FREE(req, sizeof(*req));
914                         GOTO(out, rc = -ENOMEM);
915                 }
916
917                 if (oinfo->oi_oa) {
918                         memcpy(req->rq_oi.oi_oa, oinfo->oi_oa,
919                                sizeof(*req->rq_oi.oi_oa));
920                 }
921                 req->rq_oi.oi_oa->o_id = loi->loi_id;
922                 req->rq_oi.oi_oa->o_stripe_idx = i;
923
924                 req->rq_buflen = sizeof(*req->rq_oi.oi_md);
925                 OBD_ALLOC(req->rq_oi.oi_md, req->rq_buflen);
926                 if (req->rq_oi.oi_md == NULL) {
927                         OBDO_FREE(req->rq_oi.oi_oa);
928                         OBD_FREE(req, sizeof(*req));
929                         GOTO(out, rc = -ENOMEM);
930                 }
931
932                 req->rq_idx = loi->loi_ost_idx;
933                 req->rq_stripe = i;
934
935                 /* XXX LOV STACKING */
936                 req->rq_oi.oi_md->lsm_object_id = loi->loi_id;
937                 req->rq_oi.oi_md->lsm_object_gr = oinfo->oi_md->lsm_object_gr;
938                 req->rq_oabufs = info[i].count;
939                 req->rq_pgaidx = shift;
940                 shift += req->rq_oabufs;
941
942                 /* remember the index for sort brw_page array */
943                 info[i].index = req->rq_pgaidx;
944
945                 lov_set_add_req(req, set);
946         }
947         if (!set->set_count)
948                 GOTO(out, rc = -EIO);
949
950         /* rotate & sort the brw_page array */
951         for (i = 0; i < oa_bufs; i++) {
952                 int stripe = lov_stripe_number(oinfo->oi_md, pga[i].off);
953
954                 shift = info[stripe].index + info[stripe].off;
955                 LASSERT(shift < oa_bufs);
956                 set->set_pga[shift] = pga[i];
957                 lov_stripe_offset(oinfo->oi_md, pga[i].off, stripe,
958                                   &set->set_pga[shift].off);
959                 info[stripe].off++;
960         }
961 out:
962         if (info)
963                 OBD_FREE(info, sizeof(*info) * oinfo->oi_md->lsm_stripe_count);
964
965         if (rc == 0)
966                 *reqset = set;
967         else
968                 lov_fini_brw_set(set);
969
970         RETURN(rc);
971 }
972
973 int lov_fini_getattr_set(struct lov_request_set *set)
974 {
975         int rc = 0;
976         ENTRY;
977
978         if (set == NULL)
979                 RETURN(0);
980         LASSERT(set->set_exp);
981         if (set->set_completes)
982                 rc = common_attr_done(set);
983
984         lov_put_reqset(set);
985
986         RETURN(rc);
987 }
988
989 /* The callback for osc_getattr_async that finilizes a request info when a
990  * response is recieved. */
991 static int cb_getattr_update(struct obd_info *oinfo, int rc)
992 {
993         struct lov_request *lovreq;
994         lovreq = container_of(oinfo, struct lov_request, rq_oi);
995         return lov_update_common_set(lovreq->rq_rqset, lovreq, rc);
996 }
997
998 int lov_prep_getattr_set(struct obd_export *exp, struct obd_info *oinfo,
999                          struct lov_request_set **reqset)
1000 {
1001         struct lov_request_set *set;
1002         struct lov_oinfo *loi = NULL;
1003         struct lov_obd *lov = &exp->exp_obd->u.lov;
1004         int rc = 0, i;
1005         ENTRY;
1006
1007         OBD_ALLOC(set, sizeof(*set));
1008         if (set == NULL)
1009                 RETURN(-ENOMEM);
1010         lov_init_set(set);
1011
1012         set->set_exp = exp;
1013         set->set_oi = oinfo;
1014
1015         for (i = 0; i < oinfo->oi_md->lsm_stripe_count; i++) {
1016                 struct lov_request *req;
1017
1018                 loi = oinfo->oi_md->lsm_oinfo[i];
1019                 if (!lov->lov_tgts[loi->loi_ost_idx] ||
1020                     !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
1021                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1022                         continue;
1023                 }
1024
1025                 OBD_ALLOC(req, sizeof(*req));
1026                 if (req == NULL)
1027                         GOTO(out_set, rc = -ENOMEM);
1028
1029                 req->rq_stripe = i;
1030                 req->rq_idx = loi->loi_ost_idx;
1031
1032                 OBDO_ALLOC(req->rq_oi.oi_oa);
1033                 if (req->rq_oi.oi_oa == NULL) {
1034                         OBD_FREE(req, sizeof(*req));
1035                         GOTO(out_set, rc = -ENOMEM);
1036                 }
1037                 memcpy(req->rq_oi.oi_oa, oinfo->oi_oa,
1038                        sizeof(*req->rq_oi.oi_oa));
1039                 req->rq_oi.oi_oa->o_id = loi->loi_id;
1040                 req->rq_oi.oi_cb_up = cb_getattr_update;
1041
1042                 lov_set_add_req(req, set);
1043         }
1044         if (!set->set_count)
1045                 GOTO(out_set, rc = -EIO);
1046         *reqset = set;
1047         RETURN(rc);
1048 out_set:
1049         lov_fini_getattr_set(set);
1050         RETURN(rc);
1051 }
1052
1053 int lov_fini_destroy_set(struct lov_request_set *set)
1054 {
1055         ENTRY;
1056
1057         if (set == NULL)
1058                 RETURN(0);
1059         LASSERT(set->set_exp);
1060         if (set->set_completes) {
1061                 /* FIXME update qos data here */
1062         }
1063
1064         lov_put_reqset(set);
1065
1066         RETURN(0);
1067 }
1068
1069 int lov_prep_destroy_set(struct obd_export *exp, struct obd_info *oinfo,
1070                          struct obdo *src_oa, struct lov_stripe_md *lsm,
1071                          struct obd_trans_info *oti,
1072                          struct lov_request_set **reqset)
1073 {
1074         struct lov_request_set *set;
1075         struct lov_oinfo *loi = NULL;
1076         struct lov_obd *lov = &exp->exp_obd->u.lov;
1077         int rc = 0, i;
1078         ENTRY;
1079
1080         OBD_ALLOC(set, sizeof(*set));
1081         if (set == NULL)
1082                 RETURN(-ENOMEM);
1083         lov_init_set(set);
1084
1085         set->set_exp = exp;
1086         set->set_oi = oinfo;
1087         set->set_oi->oi_md = lsm;
1088         set->set_oi->oi_oa = src_oa;
1089         set->set_oti = oti;
1090         if (oti != NULL && src_oa->o_valid & OBD_MD_FLCOOKIE)
1091                 set->set_cookies = oti->oti_logcookies;
1092
1093         for (i = 0; i < lsm->lsm_stripe_count; i++) {
1094                 struct lov_request *req;
1095
1096                 loi = lsm->lsm_oinfo[i];
1097                 if (!lov->lov_tgts[loi->loi_ost_idx] ||
1098                     !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
1099                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1100                         continue;
1101                 }
1102
1103                 OBD_ALLOC(req, sizeof(*req));
1104                 if (req == NULL)
1105                         GOTO(out_set, rc = -ENOMEM);
1106
1107                 req->rq_stripe = i;
1108                 req->rq_idx = loi->loi_ost_idx;
1109
1110                 OBDO_ALLOC(req->rq_oi.oi_oa);
1111                 if (req->rq_oi.oi_oa == NULL) {
1112                         OBD_FREE(req, sizeof(*req));
1113                         GOTO(out_set, rc = -ENOMEM);
1114                 }
1115                 memcpy(req->rq_oi.oi_oa, src_oa, sizeof(*req->rq_oi.oi_oa));
1116                 req->rq_oi.oi_oa->o_id = loi->loi_id;
1117                 lov_set_add_req(req, set);
1118         }
1119         if (!set->set_count)
1120                 GOTO(out_set, rc = -EIO);
1121         *reqset = set;
1122         RETURN(rc);
1123 out_set:
1124         lov_fini_destroy_set(set);
1125         RETURN(rc);
1126 }
1127
1128 int lov_fini_setattr_set(struct lov_request_set *set)
1129 {
1130         int rc = 0;
1131         ENTRY;
1132
1133         if (set == NULL)
1134                 RETURN(0);
1135         LASSERT(set->set_exp);
1136         if (set->set_completes) {
1137                 rc = common_attr_done(set);
1138                 /* FIXME update qos data here */
1139         }
1140
1141         lov_put_reqset(set);
1142         RETURN(rc);
1143 }
1144
1145 int lov_update_setattr_set(struct lov_request_set *set,
1146                            struct lov_request *req, int rc)
1147 {
1148         struct lov_obd *lov = &req->rq_rqset->set_exp->exp_obd->u.lov;
1149         struct lov_stripe_md *lsm = req->rq_rqset->set_oi->oi_md;
1150         ENTRY;
1151
1152         lov_update_set(set, req, rc);
1153
1154         /* grace error on inactive ost */
1155         if (rc && !(lov->lov_tgts[req->rq_idx] &&
1156                     lov->lov_tgts[req->rq_idx]->ltd_active))
1157                 rc = 0;
1158
1159         if (rc == 0) {
1160                 if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLCTIME)
1161                         lsm->lsm_oinfo[req->rq_stripe]->loi_lvb.lvb_ctime =
1162                                 req->rq_oi.oi_oa->o_ctime;
1163                 if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLMTIME)
1164                         lsm->lsm_oinfo[req->rq_stripe]->loi_lvb.lvb_mtime =
1165                                 req->rq_oi.oi_oa->o_mtime;
1166                 if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLATIME)
1167                         lsm->lsm_oinfo[req->rq_stripe]->loi_lvb.lvb_atime =
1168                                 req->rq_oi.oi_oa->o_atime;
1169         }
1170
1171         RETURN(rc);
1172 }
1173
1174 /* The callback for osc_setattr_async that finilizes a request info when a
1175  * response is recieved. */
1176 static int cb_setattr_update(struct obd_info *oinfo, int rc)
1177 {
1178         struct lov_request *lovreq;
1179         lovreq = container_of(oinfo, struct lov_request, rq_oi);
1180         return lov_update_setattr_set(lovreq->rq_rqset, lovreq, rc);
1181 }
1182
1183 int lov_prep_setattr_set(struct obd_export *exp, struct obd_info *oinfo,
1184                          struct obd_trans_info *oti,
1185                          struct lov_request_set **reqset)
1186 {
1187         struct lov_request_set *set;
1188         struct lov_oinfo *loi = NULL;
1189         struct lov_obd *lov = &exp->exp_obd->u.lov;
1190         int rc = 0, i;
1191         ENTRY;
1192
1193         OBD_ALLOC(set, sizeof(*set));
1194         if (set == NULL)
1195                 RETURN(-ENOMEM);
1196         lov_init_set(set);
1197
1198         set->set_exp = exp;
1199         set->set_oti = oti;
1200         set->set_oi = oinfo;
1201         if (oti != NULL && oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE)
1202                 set->set_cookies = oti->oti_logcookies;
1203
1204         for (i = 0; i < oinfo->oi_md->lsm_stripe_count; i++) {
1205                 struct lov_request *req;
1206
1207                 loi = oinfo->oi_md->lsm_oinfo[i];
1208                 if (!lov->lov_tgts[loi->loi_ost_idx] ||
1209                     !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
1210                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1211                         continue;
1212                 }
1213
1214                 OBD_ALLOC(req, sizeof(*req));
1215                 if (req == NULL)
1216                         GOTO(out_set, rc = -ENOMEM);
1217                 req->rq_stripe = i;
1218                 req->rq_idx = loi->loi_ost_idx;
1219
1220                 OBDO_ALLOC(req->rq_oi.oi_oa);
1221                 if (req->rq_oi.oi_oa == NULL) {
1222                         OBD_FREE(req, sizeof(*req));
1223                         GOTO(out_set, rc = -ENOMEM);
1224                 }
1225                 memcpy(req->rq_oi.oi_oa, oinfo->oi_oa,
1226                        sizeof(*req->rq_oi.oi_oa));
1227                 req->rq_oi.oi_oa->o_id = loi->loi_id;
1228                 req->rq_oi.oi_oa->o_stripe_idx = i;
1229                 req->rq_oi.oi_cb_up = cb_setattr_update;
1230
1231                 if (oinfo->oi_oa->o_valid & OBD_MD_FLSIZE) {
1232                         int off = lov_stripe_offset(oinfo->oi_md,
1233                                                     oinfo->oi_oa->o_size, i,
1234                                                     &req->rq_oi.oi_oa->o_size);
1235
1236                         if (off < 0 && req->rq_oi.oi_oa->o_size)
1237                                 req->rq_oi.oi_oa->o_size--;
1238
1239                         CDEBUG(D_INODE, "stripe %d has size "LPU64"/"LPU64"\n",
1240                                i, req->rq_oi.oi_oa->o_size,
1241                                oinfo->oi_oa->o_size);
1242                 }
1243                 lov_set_add_req(req, set);
1244         }
1245         if (!set->set_count)
1246                 GOTO(out_set, rc = -EIO);
1247         *reqset = set;
1248         RETURN(rc);
1249 out_set:
1250         lov_fini_setattr_set(set);
1251         RETURN(rc);
1252 }
1253
1254 int lov_fini_punch_set(struct lov_request_set *set)
1255 {
1256         int rc = 0;
1257         ENTRY;
1258
1259         if (set == NULL)
1260                 RETURN(0);
1261         LASSERT(set->set_exp);
1262         if (set->set_completes) {
1263                 rc = -EIO;
1264                 /* FIXME update qos data here */
1265                 if (set->set_success)
1266                         rc = common_attr_done(set);
1267         }
1268
1269         lov_put_reqset(set);
1270
1271         RETURN(rc);
1272 }
1273
1274 int lov_update_punch_set(struct lov_request_set *set,
1275                            struct lov_request *req, int rc)
1276 {
1277         struct lov_obd *lov = &req->rq_rqset->set_exp->exp_obd->u.lov;
1278         struct lov_stripe_md *lsm = req->rq_rqset->set_oi->oi_md;
1279         ENTRY;
1280
1281         lov_update_set(set, req, rc);
1282
1283         /* grace error on inactive ost */
1284         if (rc && !lov->lov_tgts[req->rq_idx]->ltd_active)
1285                 rc = 0;
1286
1287         if (rc == 0) {
1288                 lov_stripe_lock(lsm);
1289                 if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLBLOCKS) {
1290                         lsm->lsm_oinfo[req->rq_stripe]->loi_lvb.lvb_blocks =
1291                                 req->rq_oi.oi_oa->o_blocks;
1292                 }
1293
1294                 /* Do we need to update lvb_size here? It needn't because
1295                  * it have been done in ll_truncate(). -jay */
1296                 lov_stripe_unlock(lsm);
1297         }
1298
1299         RETURN(rc);
1300 }
1301
1302 /* The callback for osc_punch that finilizes a request info when a response
1303  * is recieved. */
1304 static int cb_update_punch(struct obd_info *oinfo, int rc)
1305 {
1306         struct lov_request *lovreq;
1307         lovreq = container_of(oinfo, struct lov_request, rq_oi);
1308         return lov_update_punch_set(lovreq->rq_rqset, lovreq, rc);
1309 }
1310
1311 int lov_prep_punch_set(struct obd_export *exp, struct obd_info *oinfo,
1312                        struct obd_trans_info *oti,
1313                        struct lov_request_set **reqset)
1314 {
1315         struct lov_request_set *set;
1316         struct lov_oinfo *loi = NULL;
1317         struct lov_obd *lov = &exp->exp_obd->u.lov;
1318         int rc = 0, i;
1319         ENTRY;
1320
1321         OBD_ALLOC(set, sizeof(*set));
1322         if (set == NULL)
1323                 RETURN(-ENOMEM);
1324         lov_init_set(set);
1325
1326         set->set_oi = oinfo;
1327         set->set_exp = exp;
1328
1329         for (i = 0; i < oinfo->oi_md->lsm_stripe_count; i++) {
1330                 struct lov_request *req;
1331                 obd_off rs, re;
1332
1333                 if (!lov_stripe_intersects(oinfo->oi_md, i,
1334                                            oinfo->oi_policy.l_extent.start,
1335                                            oinfo->oi_policy.l_extent.end,
1336                                            &rs, &re))
1337                         continue;
1338
1339                 loi = oinfo->oi_md->lsm_oinfo[i];
1340                 if (!lov->lov_tgts[loi->loi_ost_idx] ||
1341                     !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
1342                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1343                         GOTO(out_set, rc = -EIO);
1344                 }
1345
1346                 OBD_ALLOC(req, sizeof(*req));
1347                 if (req == NULL)
1348                         GOTO(out_set, rc = -ENOMEM);
1349                 req->rq_stripe = i;
1350                 req->rq_idx = loi->loi_ost_idx;
1351
1352                 OBDO_ALLOC(req->rq_oi.oi_oa);
1353                 if (req->rq_oi.oi_oa == NULL) {
1354                         OBD_FREE(req, sizeof(*req));
1355                         GOTO(out_set, rc = -ENOMEM);
1356                 }
1357                 memcpy(req->rq_oi.oi_oa, oinfo->oi_oa,
1358                        sizeof(*req->rq_oi.oi_oa));
1359                 req->rq_oi.oi_oa->o_id = loi->loi_id;
1360                 req->rq_oi.oi_oa->o_stripe_idx = i;
1361                 req->rq_oi.oi_cb_up = cb_update_punch;
1362
1363                 req->rq_oi.oi_policy.l_extent.start = rs;
1364                 req->rq_oi.oi_policy.l_extent.end = re;
1365                 req->rq_oi.oi_policy.l_extent.gid = -1;
1366
1367                 lov_set_add_req(req, set);
1368         }
1369         if (!set->set_count)
1370                 GOTO(out_set, rc = -EIO);
1371         *reqset = set;
1372         RETURN(rc);
1373 out_set:
1374         lov_fini_punch_set(set);
1375         RETURN(rc);
1376 }
1377
1378 int lov_fini_sync_set(struct lov_request_set *set)
1379 {
1380         int rc = 0;
1381         ENTRY;
1382
1383         if (set == NULL)
1384                 RETURN(0);
1385         LASSERT(set->set_exp);
1386         if (set->set_completes) {
1387                 if (!set->set_success)
1388                         rc = -EIO;
1389                 /* FIXME update qos data here */
1390         }
1391
1392         lov_put_reqset(set);
1393
1394         RETURN(rc);
1395 }
1396
1397 /* The callback for osc_sync that finilizes a request info when a
1398  * response is recieved. */
1399 static int cb_sync_update(struct obd_info *oinfo, int rc)
1400 {
1401         struct lov_request *lovreq;
1402         lovreq = container_of(oinfo, struct lov_request, rq_oi);
1403         return lov_update_common_set(lovreq->rq_rqset, lovreq, rc);
1404 }
1405
1406 int lov_prep_sync_set(struct obd_export *exp, struct obd_info *oinfo,
1407                       obd_off start, obd_off end,
1408                       struct lov_request_set **reqset)
1409 {
1410         struct lov_request_set *set;
1411         struct lov_oinfo *loi = NULL;
1412         struct lov_obd *lov = &exp->exp_obd->u.lov;
1413         int rc = 0, i;
1414         ENTRY;
1415
1416         OBD_ALLOC(set, sizeof(*set));
1417         if (set == NULL)
1418                 RETURN(-ENOMEM);
1419         lov_init_set(set);
1420
1421         set->set_exp = exp;
1422         set->set_oi = oinfo;
1423         set->set_oi->oi_md = oinfo->oi_md;
1424         set->set_oi->oi_oa = oinfo->oi_oa;
1425
1426         for (i = 0; i < oinfo->oi_md->lsm_stripe_count; i++) {
1427                 struct lov_request *req;
1428                 obd_off rs, re;
1429
1430                 loi = oinfo->oi_md->lsm_oinfo[i];
1431                 if (!lov->lov_tgts[loi->loi_ost_idx] ||
1432                     !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
1433                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1434                         continue;
1435                 }
1436
1437                 if (!lov_stripe_intersects(oinfo->oi_md, i, start,
1438                                            end, &rs, &re))
1439                         continue;
1440
1441                 OBD_ALLOC(req, sizeof(*req));
1442                 if (req == NULL)
1443                         GOTO(out_set, rc = -ENOMEM);
1444                 req->rq_stripe = i;
1445                 req->rq_idx = loi->loi_ost_idx;
1446
1447                 OBDO_ALLOC(req->rq_oi.oi_oa);
1448                 if (req->rq_oi.oi_oa == NULL) {
1449                         OBD_FREE(req, sizeof(*req));
1450                         GOTO(out_set, rc = -ENOMEM);
1451                 }
1452                 memcpy(req->rq_oi.oi_oa, oinfo->oi_oa,
1453                        sizeof(*req->rq_oi.oi_oa));
1454                 req->rq_oi.oi_oa->o_id = loi->loi_id;
1455                 req->rq_oi.oi_oa->o_stripe_idx = i;
1456
1457                 req->rq_oi.oi_policy.l_extent.start = rs;
1458                 req->rq_oi.oi_policy.l_extent.end = re;
1459                 req->rq_oi.oi_policy.l_extent.gid = -1;
1460                 req->rq_oi.oi_cb_up = cb_sync_update;
1461
1462                 lov_set_add_req(req, set);
1463         }
1464         if (!set->set_count)
1465                 GOTO(out_set, rc = -EIO);
1466         *reqset = set;
1467         RETURN(rc);
1468 out_set:
1469         lov_fini_sync_set(set);
1470         RETURN(rc);
1471 }
1472
1473 #define LOV_U64_MAX ((__u64)~0ULL)
1474 #define LOV_SUM_MAX(tot, add)                                           \
1475         do {                                                            \
1476                 if ((tot) + (add) < (tot))                              \
1477                         (tot) = LOV_U64_MAX;                            \
1478                 else                                                    \
1479                         (tot) += (add);                                 \
1480         } while(0)
1481
1482 int lov_fini_statfs(struct obd_device *obd, struct obd_statfs *osfs,int success)
1483 {
1484         ENTRY;
1485
1486         if (success) {
1487                 __u32 expected_stripes = lov_get_stripecnt(&obd->u.lov, 0);
1488
1489                 if (osfs->os_files != LOV_U64_MAX)
1490                         do_div(osfs->os_files, expected_stripes);
1491                 if (osfs->os_ffree != LOV_U64_MAX)
1492                         do_div(osfs->os_ffree, expected_stripes);
1493
1494                 spin_lock(&obd->obd_osfs_lock);
1495                 memcpy(&obd->obd_osfs, osfs, sizeof(*osfs));
1496                 obd->obd_osfs_age = cfs_time_current_64();
1497                 spin_unlock(&obd->obd_osfs_lock);
1498                 RETURN(0);
1499         }
1500
1501         RETURN(-EIO);
1502 }
1503
1504 int lov_fini_statfs_set(struct lov_request_set *set)
1505 {
1506         int rc = 0;
1507         ENTRY;
1508
1509         if (set == NULL)
1510                 RETURN(0);
1511
1512         if (set->set_completes) {
1513                 rc = lov_fini_statfs(set->set_obd, set->set_oi->oi_osfs,
1514                                      set->set_success);
1515         }
1516         lov_put_reqset(set);
1517         RETURN(rc);
1518 }
1519
1520 void lov_update_statfs(struct obd_statfs *osfs, struct obd_statfs *lov_sfs,
1521                        int success)
1522 {
1523         int shift = 0, quit = 0;
1524         __u64 tmp;
1525
1526         if (success == 0) {
1527                 memcpy(osfs, lov_sfs, sizeof(*lov_sfs));
1528         } else {
1529                 if (osfs->os_bsize != lov_sfs->os_bsize) {
1530                         /* assume all block sizes are always powers of 2 */
1531                         /* get the bits difference */
1532                         tmp = osfs->os_bsize | lov_sfs->os_bsize;
1533                         for (shift = 0; shift <= 64; ++shift) {
1534                                 if (tmp & 1) {
1535                                         if (quit)
1536                                                 break;
1537                                         else
1538                                                 quit = 1;
1539                                         shift = 0;
1540                                 }
1541                                 tmp >>= 1;
1542                         }
1543                 }
1544
1545                 if (osfs->os_bsize < lov_sfs->os_bsize) {
1546                         osfs->os_bsize = lov_sfs->os_bsize;
1547
1548                         osfs->os_bfree  >>= shift;
1549                         osfs->os_bavail >>= shift;
1550                         osfs->os_blocks >>= shift;
1551                 } else if (shift != 0) {
1552                         lov_sfs->os_bfree  >>= shift;
1553                         lov_sfs->os_bavail >>= shift;
1554                         lov_sfs->os_blocks >>= shift;
1555                 }
1556 #ifdef MIN_DF
1557                 /* Sandia requested that df (and so, statfs) only
1558                    returned minimal available space on
1559                    a single OST, so people would be able to
1560                    write this much data guaranteed. */
1561                 if (osfs->os_bavail > lov_sfs->os_bavail) {
1562                         /* Presumably if new bavail is smaller,
1563                            new bfree is bigger as well */
1564                         osfs->os_bfree = lov_sfs->os_bfree;
1565                         osfs->os_bavail = lov_sfs->os_bavail;
1566                 }
1567 #else
1568                 osfs->os_bfree += lov_sfs->os_bfree;
1569                 osfs->os_bavail += lov_sfs->os_bavail;
1570 #endif
1571                 osfs->os_blocks += lov_sfs->os_blocks;
1572                 /* XXX not sure about this one - depends on policy.
1573                  *   - could be minimum if we always stripe on all OBDs
1574                  *     (but that would be wrong for any other policy,
1575                  *     if one of the OBDs has no more objects left)
1576                  *   - could be sum if we stripe whole objects
1577                  *   - could be average, just to give a nice number
1578                  *
1579                  * To give a "reasonable" (if not wholly accurate)
1580                  * number, we divide the total number of free objects
1581                  * by expected stripe count (watch out for overflow).
1582                  */
1583                 LOV_SUM_MAX(osfs->os_files, lov_sfs->os_files);
1584                 LOV_SUM_MAX(osfs->os_ffree, lov_sfs->os_ffree);
1585         }
1586 }
1587
1588 /* The callback for osc_statfs_async that finilizes a request info when a
1589  * response is received. */
1590 static int cb_statfs_update(struct obd_info *oinfo, int rc)
1591 {
1592         struct lov_request *lovreq;
1593         struct obd_statfs *osfs, *lov_sfs;
1594         struct obd_device *obd;
1595         struct lov_obd *lov;
1596         int success;
1597         ENTRY;
1598
1599         lovreq = container_of(oinfo, struct lov_request, rq_oi);
1600         lov = &lovreq->rq_rqset->set_obd->u.lov;
1601         obd = class_exp2obd(lov->lov_tgts[lovreq->rq_idx]->ltd_exp);
1602
1603         osfs = lovreq->rq_rqset->set_oi->oi_osfs;
1604         lov_sfs = oinfo->oi_osfs;
1605
1606         success = lovreq->rq_rqset->set_success;
1607         /* XXX: the same is done in lov_update_common_set, however
1608            lovset->set_exp is not initialized. */
1609         lov_update_set(lovreq->rq_rqset, lovreq, rc);
1610         if (rc) {
1611                 /* XXX ignore error for disconnected ost ? */
1612                 if (rc && !(lov->lov_tgts[lovreq->rq_idx] &&
1613                             lov->lov_tgts[lovreq->rq_idx]->ltd_active))
1614                         rc = 0;
1615                 GOTO(out, rc);
1616         }
1617
1618         spin_lock(&obd->obd_osfs_lock);
1619         memcpy(&obd->obd_osfs, lov_sfs, sizeof(*lov_sfs));
1620         if ((oinfo->oi_flags & OBD_STATFS_FROM_CACHE) == 0)
1621                 obd->obd_osfs_age = cfs_time_current_64();
1622         spin_unlock(&obd->obd_osfs_lock);
1623
1624         lov_update_statfs(osfs, lov_sfs, success);
1625         qos_update(lov);
1626 out:
1627         if (lovreq->rq_rqset->set_oi->oi_flags & OBD_STATFS_PTLRPCD &&
1628             lov_finished_set(lovreq->rq_rqset)) {
1629                lov_statfs_interpret(NULL, lovreq->rq_rqset,
1630                                     lovreq->rq_rqset->set_success !=
1631                                                   lovreq->rq_rqset->set_count);
1632                qos_statfs_done(lov);
1633         }
1634
1635         RETURN(0);
1636 }
1637
1638 int lov_prep_statfs_set(struct obd_device *obd, struct obd_info *oinfo,
1639                         struct lov_request_set **reqset)
1640 {
1641         struct lov_request_set *set;
1642         struct lov_obd *lov = &obd->u.lov;
1643         int rc = 0, i;
1644         ENTRY;
1645
1646         OBD_ALLOC(set, sizeof(*set));
1647         if (set == NULL)
1648                 RETURN(-ENOMEM);
1649         lov_init_set(set);
1650
1651         set->set_obd = obd;
1652         set->set_oi = oinfo;
1653
1654         /* We only get block data from the OBD */
1655         for (i = 0; i < lov->desc.ld_tgt_count; i++) {
1656                 struct lov_request *req;
1657
1658                 if (!lov->lov_tgts[i] || (!lov->lov_tgts[i]->ltd_active
1659                                           && (oinfo->oi_flags & OBD_STATFS_NODELAY))) {
1660                         CDEBUG(D_HA, "lov idx %d inactive\n", i);
1661                         continue;
1662                 }
1663
1664                 /* skip targets that have been explicitely disabled by the
1665                  * administrator */
1666                 if (!lov->lov_tgts[i]->ltd_exp) {
1667                         CDEBUG(D_HA, "lov idx %d administratively disabled\n", i);
1668                         continue;
1669                 }
1670
1671                 OBD_ALLOC(req, sizeof(*req));
1672                 if (req == NULL)
1673                         GOTO(out_set, rc = -ENOMEM);
1674
1675                 OBD_ALLOC(req->rq_oi.oi_osfs, sizeof(*req->rq_oi.oi_osfs));
1676                 if (req->rq_oi.oi_osfs == NULL) {
1677                         OBD_FREE(req, sizeof(*req));
1678                         GOTO(out_set, rc = -ENOMEM);
1679                 }
1680
1681                 req->rq_idx = i;
1682                 req->rq_oi.oi_cb_up = cb_statfs_update;
1683                 req->rq_oi.oi_flags = oinfo->oi_flags;
1684
1685                 lov_set_add_req(req, set);
1686         }
1687         if (!set->set_count)
1688                 GOTO(out_set, rc = -EIO);
1689         *reqset = set;
1690         RETURN(rc);
1691 out_set:
1692         lov_fini_statfs_set(set);
1693         RETURN(rc);
1694 }