Whamcloud - gitweb
82153b7a46966b3005f44da9bb8fa2b487bfb03a
[fs/lustre-release.git] / lustre / lov / lov_request.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copyright (C) 2002, 2003 Cluster File Systems, Inc.
5  *
6  *   This file is part of the Lustre file system, http://www.lustre.org
7  *   Lustre is a trademark of Cluster File Systems, Inc.
8  *
9  *   You may have signed or agreed to another license before downloading
10  *   this software.  If so, you are bound by the terms and conditions
11  *   of that agreement, and the following does not apply to you.  See the
12  *   LICENSE file included with this distribution for more information.
13  *
14  *   If you did not agree to a different license, then this copy of Lustre
15  *   is open source software; you can redistribute it and/or modify it
16  *   under the terms of version 2 of the GNU General Public License as
17  *   published by the Free Software Foundation.
18  *
19  *   In either case, Lustre is distributed in the hope that it will be
20  *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
21  *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
22  *   license text for more details.
23  */
24
25 #ifndef EXPORT_SYMTAB
26 # define EXPORT_SYMTAB
27 #endif
28 #define DEBUG_SUBSYSTEM S_LOV
29
30 #ifdef __KERNEL__
31 #include <libcfs/libcfs.h>
32 #else
33 #include <liblustre.h>
34 #endif
35
36 #include <obd_class.h>
37 #include <obd_lov.h>
38 #include <lustre/lustre_idl.h>
39
40 #include "lov_internal.h"
41
42 static void lov_init_set(struct lov_request_set *set)
43 {
44         set->set_count = 0;
45         set->set_completes = 0;
46         set->set_success = 0;
47         set->set_cookies = 0;
48         CFS_INIT_LIST_HEAD(&set->set_list);
49         atomic_set(&set->set_refcount, 1);
50 }
51
52 static void lov_finish_set(struct lov_request_set *set)
53 {
54         struct list_head *pos, *n;
55         ENTRY;
56
57         LASSERT(set);
58         list_for_each_safe(pos, n, &set->set_list) {
59                 struct lov_request *req = list_entry(pos, struct lov_request,
60                                                      rq_link);
61                 list_del_init(&req->rq_link);
62
63                 if (req->rq_oi.oi_oa)
64                         OBDO_FREE(req->rq_oi.oi_oa);
65                 if (req->rq_oi.oi_md)
66                         OBD_FREE(req->rq_oi.oi_md, req->rq_buflen);
67                 if (req->rq_oi.oi_osfs)
68                         OBD_FREE(req->rq_oi.oi_osfs,
69                                  sizeof(*req->rq_oi.oi_osfs));
70                 OBD_FREE(req, sizeof(*req));
71         }
72
73         if (set->set_pga) {
74                 int len = set->set_oabufs * sizeof(*set->set_pga);
75                 OBD_FREE(set->set_pga, len);
76         }
77         if (set->set_lockh)
78                 lov_llh_put(set->set_lockh);
79
80         OBD_FREE(set, sizeof(*set));
81         EXIT;
82 }
83
84 void lov_update_set(struct lov_request_set *set,
85                     struct lov_request *req, int rc)
86 {
87         req->rq_complete = 1;
88         req->rq_rc = rc;
89
90         set->set_completes++;
91         if (rc == 0)
92                 set->set_success++;
93 }
94
95 int lov_update_common_set(struct lov_request_set *set,
96                           struct lov_request *req, int rc)
97 {
98         struct lov_obd *lov = &set->set_exp->exp_obd->u.lov;
99         ENTRY;
100
101         lov_update_set(set, req, rc);
102
103         /* grace error on inactive ost */
104         if (rc && !(lov->lov_tgts[req->rq_idx] && 
105                     lov->lov_tgts[req->rq_idx]->ltd_active))
106                 rc = 0;
107
108         /* FIXME in raid1 regime, should return 0 */
109         RETURN(rc);
110 }
111
112 void lov_set_add_req(struct lov_request *req, struct lov_request_set *set)
113 {
114         list_add_tail(&req->rq_link, &set->set_list);
115         set->set_count++;
116 }
117
118 int lov_update_enqueue_set(struct lov_request *req, __u32 mode, int rc)
119 {
120         struct lov_request_set *set = req->rq_rqset;
121         struct lustre_handle *lov_lockhp;
122         struct lov_oinfo *loi;
123         ENTRY;
124
125         LASSERT(set != NULL);
126         LASSERT(set->set_oi != NULL);
127
128         lov_lockhp = set->set_lockh->llh_handles + req->rq_stripe;
129         loi = set->set_oi->oi_md->lsm_oinfo[req->rq_stripe];
130
131         /* XXX LOV STACKING: OSC gets a copy, created in lov_prep_enqueue_set
132          * and that copy can be arbitrarily out of date.
133          *
134          * The LOV API is due for a serious rewriting anyways, and this
135          * can be addressed then. */
136
137         if (rc == ELDLM_OK) {
138                 struct ldlm_lock *lock = ldlm_handle2lock(lov_lockhp);
139                 __u64 tmp;
140
141                 LASSERT(lock != NULL);
142                 lov_stripe_lock(set->set_oi->oi_md);
143                 loi->loi_lvb = req->rq_oi.oi_md->lsm_oinfo[0]->loi_lvb;
144                 tmp = loi->loi_lvb.lvb_size;
145                 /* Extend KMS up to the end of this lock and no further
146                  * A lock on [x,y] means a KMS of up to y + 1 bytes! */
147                 if (tmp > lock->l_policy_data.l_extent.end)
148                         tmp = lock->l_policy_data.l_extent.end + 1;
149                 if (tmp >= loi->loi_kms) {
150                         LDLM_DEBUG(lock, "lock acquired, setting rss="LPU64
151                                    ", kms="LPU64, loi->loi_lvb.lvb_size, tmp);
152                         loi->loi_kms = tmp;
153                         loi->loi_kms_valid = 1;
154                 } else {
155                         LDLM_DEBUG(lock, "lock acquired, setting rss="
156                                    LPU64"; leaving kms="LPU64", end="LPU64,
157                                    loi->loi_lvb.lvb_size, loi->loi_kms,
158                                    lock->l_policy_data.l_extent.end);
159                 }
160                 lov_stripe_unlock(set->set_oi->oi_md);
161                 ldlm_lock_allow_match(lock);
162                 LDLM_LOCK_PUT(lock);
163         } else if ((rc == ELDLM_LOCK_ABORTED) &&
164                    (set->set_oi->oi_flags & LDLM_FL_HAS_INTENT)) {
165                 memset(lov_lockhp, 0, sizeof(*lov_lockhp));
166                 lov_stripe_lock(set->set_oi->oi_md);
167                 loi->loi_lvb = req->rq_oi.oi_md->lsm_oinfo[0]->loi_lvb;
168                 lov_stripe_unlock(set->set_oi->oi_md);
169                 CDEBUG(D_INODE, "glimpsed, setting rss="LPU64"; leaving"
170                        " kms="LPU64"\n", loi->loi_lvb.lvb_size, loi->loi_kms);
171                 rc = ELDLM_OK;
172         } else {
173                 struct obd_export *exp = set->set_exp;
174                 struct lov_obd *lov = &exp->exp_obd->u.lov;
175
176                 memset(lov_lockhp, 0, sizeof(*lov_lockhp));
177                 if (lov->lov_tgts[req->rq_idx] &&
178                     lov->lov_tgts[req->rq_idx]->ltd_active) {
179                         /* -EUSERS used by OST to report file contention */
180                         if (rc != -EINTR && rc != -EUSERS)
181                                 CERROR("enqueue objid "LPX64" subobj "
182                                        LPX64" on OST idx %d: rc %d\n",
183                                        set->set_oi->oi_md->lsm_object_id,
184                                        loi->loi_id, loi->loi_ost_idx, rc);
185                 } else {
186                         rc = ELDLM_OK;
187                 }
188         }
189         lov_update_set(set, req, rc);
190         RETURN(rc);
191 }
192
193 /* The callback for osc_enqueue that updates lov info for every OSC request. */
194 static int cb_update_enqueue(struct obd_info *oinfo, int rc)
195 {
196         struct ldlm_enqueue_info *einfo;
197         struct lov_request *lovreq;
198
199         lovreq = container_of(oinfo, struct lov_request, rq_oi);
200         einfo = lovreq->rq_rqset->set_ei;
201         return lov_update_enqueue_set(lovreq, einfo->ei_mode, rc);
202 }
203
204 static int enqueue_done(struct lov_request_set *set, __u32 mode)
205 {
206         struct lov_request *req;
207         struct lov_obd *lov = &set->set_exp->exp_obd->u.lov;
208         int rc = 0;
209         ENTRY;
210
211         /* enqueue/match success, just return */
212         if (set->set_completes && set->set_completes == set->set_success)
213                 RETURN(0);
214
215         /* cancel enqueued/matched locks */
216         list_for_each_entry(req, &set->set_list, rq_link) {
217                 struct lustre_handle *lov_lockhp;
218
219                 if (!req->rq_complete || req->rq_rc)
220                         continue;
221
222                 lov_lockhp = set->set_lockh->llh_handles + req->rq_stripe;
223                 LASSERT(lov_lockhp);
224                 if (!lustre_handle_is_used(lov_lockhp))
225                         continue;
226
227                 rc = obd_cancel(lov->lov_tgts[req->rq_idx]->ltd_exp,
228                                 req->rq_oi.oi_md, mode, lov_lockhp);
229                 if (rc && lov->lov_tgts[req->rq_idx] &&
230                     lov->lov_tgts[req->rq_idx]->ltd_active)
231                         CERROR("cancelling obdjid "LPX64" on OST "
232                                "idx %d error: rc = %d\n",
233                                req->rq_oi.oi_md->lsm_object_id,
234                                req->rq_idx, rc);
235         }
236         if (set->set_lockh)
237                 lov_llh_put(set->set_lockh);
238         RETURN(rc);
239 }
240
241 int lov_fini_enqueue_set(struct lov_request_set *set, __u32 mode, int rc,
242                          struct ptlrpc_request_set *rqset)
243 {
244         int ret = 0;
245         ENTRY;
246
247         if (set == NULL)
248                 RETURN(0);
249         LASSERT(set->set_exp);
250         /* Do enqueue_done only for sync requests and if any request
251          * succeeded. */
252         if (!rqset) {
253                 if (rc)
254                         set->set_completes = 0;
255                 ret = enqueue_done(set, mode);
256         } else if (set->set_lockh)
257                 lov_llh_put(set->set_lockh);
258
259         if (atomic_dec_and_test(&set->set_refcount))
260                 lov_finish_set(set);
261
262         RETURN(rc ? rc : ret);
263 }
264
265 int lov_prep_enqueue_set(struct obd_export *exp, struct obd_info *oinfo,
266                          struct ldlm_enqueue_info *einfo,
267                          struct lov_request_set **reqset)
268 {
269         struct lov_obd *lov = &exp->exp_obd->u.lov;
270         struct lov_request_set *set;
271         int i, rc = 0;
272         struct lov_oinfo *loi;
273         ENTRY;
274
275         OBD_ALLOC(set, sizeof(*set));
276         if (set == NULL)
277                 RETURN(-ENOMEM);
278         lov_init_set(set);
279
280         set->set_exp = exp;
281         set->set_oi = oinfo;
282         set->set_ei = einfo;
283         set->set_lockh = lov_llh_new(oinfo->oi_md);
284         if (set->set_lockh == NULL)
285                 GOTO(out_set, rc = -ENOMEM);
286         oinfo->oi_lockh->cookie = set->set_lockh->llh_handle.h_cookie;
287
288         for (i = 0; i < oinfo->oi_md->lsm_stripe_count; i++) {
289                 struct lov_request *req;
290                 obd_off start, end;
291
292                 loi = oinfo->oi_md->lsm_oinfo[i];
293                 if (!lov_stripe_intersects(oinfo->oi_md, i,
294                                            oinfo->oi_policy.l_extent.start,
295                                            oinfo->oi_policy.l_extent.end,
296                                            &start, &end))
297                         continue;
298
299                 if (!lov->lov_tgts[loi->loi_ost_idx] ||
300                     !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
301                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
302                         continue;
303                 }
304
305                 OBD_ALLOC(req, sizeof(*req));
306                 if (req == NULL)
307                         GOTO(out_set, rc = -ENOMEM);
308
309                 req->rq_buflen = sizeof(*req->rq_oi.oi_md) +
310                         sizeof(struct lov_oinfo *) +
311                         sizeof(struct lov_oinfo);
312                 OBD_ALLOC(req->rq_oi.oi_md, req->rq_buflen);
313                 if (req->rq_oi.oi_md == NULL) {
314                         OBD_FREE(req, sizeof(*req));
315                         GOTO(out_set, rc = -ENOMEM);
316                 }
317                 req->rq_oi.oi_md->lsm_oinfo[0] =
318                         ((void *)req->rq_oi.oi_md) + sizeof(*req->rq_oi.oi_md) +
319                         sizeof(struct lov_oinfo *);
320
321
322                 req->rq_rqset = set;
323                 /* Set lov request specific parameters. */
324                 req->rq_oi.oi_lockh = set->set_lockh->llh_handles + i;
325                 req->rq_oi.oi_cb_up = cb_update_enqueue;
326                 req->rq_oi.oi_flags = oinfo->oi_flags;
327
328                 LASSERT(req->rq_oi.oi_lockh);
329
330                 req->rq_oi.oi_policy.l_extent.gid =
331                         oinfo->oi_policy.l_extent.gid;
332                 req->rq_oi.oi_policy.l_extent.start = start;
333                 req->rq_oi.oi_policy.l_extent.end = end;
334
335                 req->rq_idx = loi->loi_ost_idx;
336                 req->rq_stripe = i;
337
338                 /* XXX LOV STACKING: submd should be from the subobj */
339                 req->rq_oi.oi_md->lsm_object_id = loi->loi_id;
340                 req->rq_oi.oi_md->lsm_stripe_count = 0;
341                 req->rq_oi.oi_md->lsm_oinfo[0]->loi_kms_valid =
342                         loi->loi_kms_valid;
343                 req->rq_oi.oi_md->lsm_oinfo[0]->loi_kms = loi->loi_kms;
344                 req->rq_oi.oi_md->lsm_oinfo[0]->loi_lvb = loi->loi_lvb;
345
346                 lov_set_add_req(req, set);
347         }
348         if (!set->set_count)
349                 GOTO(out_set, rc = -EIO);
350         *reqset = set;
351         RETURN(0);
352 out_set:
353         lov_fini_enqueue_set(set, einfo->ei_mode, rc, NULL);
354         RETURN(rc);
355 }
356
357 int lov_update_match_set(struct lov_request_set *set, struct lov_request *req,
358                          int rc)
359 {
360         int ret = rc;
361         ENTRY;
362
363         if (rc > 0)
364                 ret = 0;
365         else if (rc == 0)
366                 ret = 1;
367         lov_update_set(set, req, ret);
368         RETURN(rc);
369 }
370
371 int lov_fini_match_set(struct lov_request_set *set, __u32 mode, int flags)
372 {
373         int rc = 0;
374         ENTRY;
375
376         if (set == NULL)
377                 RETURN(0);
378         LASSERT(set->set_exp);
379         rc = enqueue_done(set, mode);
380         if ((set->set_count == set->set_success) &&
381             (flags & LDLM_FL_TEST_LOCK))
382                 lov_llh_put(set->set_lockh);
383
384         if (atomic_dec_and_test(&set->set_refcount))
385                 lov_finish_set(set);
386
387         RETURN(rc);
388 }
389
390 int lov_prep_match_set(struct obd_export *exp, struct obd_info *oinfo,
391                        struct lov_stripe_md *lsm, ldlm_policy_data_t *policy,
392                        __u32 mode, struct lustre_handle *lockh,
393                        struct lov_request_set **reqset)
394 {
395         struct lov_obd *lov = &exp->exp_obd->u.lov;
396         struct lov_request_set *set;
397         int i, rc = 0;
398         struct lov_oinfo *loi;
399         ENTRY;
400
401         OBD_ALLOC(set, sizeof(*set));
402         if (set == NULL)
403                 RETURN(-ENOMEM);
404         lov_init_set(set);
405
406         set->set_exp = exp;
407         set->set_oi = oinfo;
408         set->set_oi->oi_md = lsm;
409         set->set_lockh = lov_llh_new(lsm);
410         if (set->set_lockh == NULL)
411                 GOTO(out_set, rc = -ENOMEM);
412         lockh->cookie = set->set_lockh->llh_handle.h_cookie;
413
414         for (i = 0; i < lsm->lsm_stripe_count; i++){
415                 struct lov_request *req;
416                 obd_off start, end;
417
418                 loi = lsm->lsm_oinfo[i];
419                 if (!lov_stripe_intersects(lsm, i, policy->l_extent.start,
420                                            policy->l_extent.end, &start, &end))
421                         continue;
422
423                 /* FIXME raid1 should grace this error */
424                 if (!lov->lov_tgts[loi->loi_ost_idx] ||
425                     !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
426                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
427                         GOTO(out_set, rc = -EIO);
428                 }
429
430                 OBD_ALLOC(req, sizeof(*req));
431                 if (req == NULL)
432                         GOTO(out_set, rc = -ENOMEM);
433
434                 req->rq_buflen = sizeof(*req->rq_oi.oi_md);
435                 OBD_ALLOC(req->rq_oi.oi_md, req->rq_buflen);
436                 if (req->rq_oi.oi_md == NULL) {
437                         OBD_FREE(req, sizeof(*req));
438                         GOTO(out_set, rc = -ENOMEM);
439                 }
440
441                 req->rq_oi.oi_policy.l_extent.start = start;
442                 req->rq_oi.oi_policy.l_extent.end = end;
443                 req->rq_oi.oi_policy.l_extent.gid = policy->l_extent.gid;
444
445                 req->rq_idx = loi->loi_ost_idx;
446                 req->rq_stripe = i;
447
448                 /* XXX LOV STACKING: submd should be from the subobj */
449                 req->rq_oi.oi_md->lsm_object_id = loi->loi_id;
450                 req->rq_oi.oi_md->lsm_stripe_count = 0;
451
452                 lov_set_add_req(req, set);
453         }
454         if (!set->set_count)
455                 GOTO(out_set, rc = -EIO);
456         *reqset = set;
457         RETURN(rc);
458 out_set:
459         lov_fini_match_set(set, mode, 0);
460         RETURN(rc);
461 }
462
463 int lov_fini_cancel_set(struct lov_request_set *set)
464 {
465         int rc = 0;
466         ENTRY;
467
468         if (set == NULL)
469                 RETURN(0);
470
471         LASSERT(set->set_exp);
472         if (set->set_lockh)
473                 lov_llh_put(set->set_lockh);
474
475         if (atomic_dec_and_test(&set->set_refcount))
476                 lov_finish_set(set);
477
478         RETURN(rc);
479 }
480
481 int lov_prep_cancel_set(struct obd_export *exp, struct obd_info *oinfo,
482                         struct lov_stripe_md *lsm, __u32 mode,
483                         struct lustre_handle *lockh,
484                         struct lov_request_set **reqset)
485 {
486         struct lov_request_set *set;
487         int i, rc = 0;
488         struct lov_oinfo *loi;
489         ENTRY;
490
491         OBD_ALLOC(set, sizeof(*set));
492         if (set == NULL)
493                 RETURN(-ENOMEM);
494         lov_init_set(set);
495
496         set->set_exp = exp;
497         set->set_oi = oinfo;
498         set->set_oi->oi_md = lsm;
499         set->set_lockh = lov_handle2llh(lockh);
500         if (set->set_lockh == NULL) {
501                 CERROR("LOV: invalid lov lock handle %p\n", lockh);
502                 GOTO(out_set, rc = -EINVAL);
503         }
504         lockh->cookie = set->set_lockh->llh_handle.h_cookie;
505
506         for (i = 0; i < lsm->lsm_stripe_count; i++){
507                 struct lov_request *req;
508                 struct lustre_handle *lov_lockhp;
509
510                 loi = lsm->lsm_oinfo[i];
511                 lov_lockhp = set->set_lockh->llh_handles + i;
512                 if (!lustre_handle_is_used(lov_lockhp)) {
513                         CDEBUG(D_RPCTRACE,"lov idx %d subobj "LPX64" no lock\n",
514                                loi->loi_ost_idx, loi->loi_id);
515                         continue;
516                 }
517
518                 OBD_ALLOC(req, sizeof(*req));
519                 if (req == NULL)
520                         GOTO(out_set, rc = -ENOMEM);
521
522                 req->rq_buflen = sizeof(*req->rq_oi.oi_md);
523                 OBD_ALLOC(req->rq_oi.oi_md, req->rq_buflen);
524                 if (req->rq_oi.oi_md == NULL) {
525                         OBD_FREE(req, sizeof(*req));
526                         GOTO(out_set, rc = -ENOMEM);
527                 }
528
529                 req->rq_idx = loi->loi_ost_idx;
530                 req->rq_stripe = i;
531
532                 /* XXX LOV STACKING: submd should be from the subobj */
533                 req->rq_oi.oi_md->lsm_object_id = loi->loi_id;
534                 req->rq_oi.oi_md->lsm_stripe_count = 0;
535
536                 lov_set_add_req(req, set);
537         }
538         if (!set->set_count)
539                 GOTO(out_set, rc = -EIO);
540         *reqset = set;
541         RETURN(rc);
542 out_set:
543         lov_fini_cancel_set(set);
544         RETURN(rc);
545 }
546
547 static int create_done(struct obd_export *exp, struct lov_request_set *set,
548                        struct lov_stripe_md **lsmp)
549 {
550         struct lov_obd *lov = &exp->exp_obd->u.lov;
551         struct obd_trans_info *oti = set->set_oti;
552         struct obdo *src_oa = set->set_oi->oi_oa;
553         struct lov_request *req;
554         struct obdo *ret_oa = NULL;
555         int attrset = 0, rc = 0;
556         ENTRY;
557
558         LASSERT(set->set_completes);
559
560         /* try alloc objects on other osts if osc_create fails for
561          * exceptions: RPC failure, ENOSPC, etc */
562         if (set->set_count != set->set_success) {
563                 list_for_each_entry (req, &set->set_list, rq_link) {
564                         if (req->rq_rc == 0)
565                                 continue;
566
567                         set->set_completes--;
568                         req->rq_complete = 0;
569
570                         rc = qos_remedy_create(set, req);
571                         lov_update_create_set(set, req, rc);
572
573                         if (rc)
574                                 break;
575                 }
576         }
577
578         /* no successful creates */
579         if (set->set_success == 0)
580                 GOTO(cleanup, rc);
581
582         /* If there was an explicit stripe set, fail.  Otherwise, we
583          * got some objects and that's not bad. */
584         if (set->set_count != set->set_success) {
585                 if (*lsmp)
586                         GOTO(cleanup, rc);
587                 set->set_count = set->set_success;
588                 qos_shrink_lsm(set);
589         }
590
591         OBDO_ALLOC(ret_oa);
592         if (ret_oa == NULL)
593                 GOTO(cleanup, rc = -ENOMEM);
594
595         list_for_each_entry(req, &set->set_list, rq_link) {
596                 if (!req->rq_complete || req->rq_rc)
597                         continue;
598                 lov_merge_attrs(ret_oa, req->rq_oi.oi_oa,
599                                 req->rq_oi.oi_oa->o_valid, set->set_oi->oi_md,
600                                 req->rq_stripe, &attrset);
601         }
602         if (src_oa->o_valid & OBD_MD_FLSIZE &&
603             ret_oa->o_size != src_oa->o_size) {
604                 CERROR("original size "LPU64" isn't new object size "LPU64"\n",
605                        src_oa->o_size, ret_oa->o_size);
606                 LBUG();
607         }
608         ret_oa->o_id = src_oa->o_id;
609         memcpy(src_oa, ret_oa, sizeof(*src_oa));
610         OBDO_FREE(ret_oa);
611
612         *lsmp = set->set_oi->oi_md;
613         GOTO(done, rc = 0);
614
615 cleanup:
616         list_for_each_entry(req, &set->set_list, rq_link) {
617                 struct obd_export *sub_exp;
618                 int err = 0;
619
620                 if (!req->rq_complete || req->rq_rc)
621                         continue;
622
623                 sub_exp = lov->lov_tgts[req->rq_idx]->ltd_exp;
624                 err = obd_destroy(sub_exp, req->rq_oi.oi_oa, NULL, oti, NULL);
625                 if (err)
626                         CERROR("Failed to uncreate objid "LPX64" subobj "
627                                LPX64" on OST idx %d: rc = %d\n",
628                                src_oa->o_id, req->rq_oi.oi_oa->o_id,
629                                req->rq_idx, rc);
630         }
631         if (*lsmp == NULL)
632                 obd_free_memmd(exp, &set->set_oi->oi_md);
633 done:
634         if (oti && set->set_cookies) {
635                 oti->oti_logcookies = set->set_cookies;
636                 if (!set->set_cookie_sent) {
637                         oti_free_cookies(oti);
638                         src_oa->o_valid &= ~OBD_MD_FLCOOKIE;
639                 } else {
640                         src_oa->o_valid |= OBD_MD_FLCOOKIE;
641                 }
642         }
643         RETURN(rc);
644 }
645
646 int lov_fini_create_set(struct lov_request_set *set,struct lov_stripe_md **lsmp)
647 {
648         int rc = 0;
649         ENTRY;
650
651         if (set == NULL)
652                 RETURN(0);
653         LASSERT(set->set_exp);
654         if (set->set_completes)
655                 rc = create_done(set->set_exp, set, lsmp);
656
657         if (atomic_dec_and_test(&set->set_refcount))
658                 lov_finish_set(set);
659
660         RETURN(rc);
661 }
662
663 int lov_update_create_set(struct lov_request_set *set,
664                           struct lov_request *req, int rc)
665 {
666         struct obd_trans_info *oti = set->set_oti;
667         struct lov_stripe_md *lsm = set->set_oi->oi_md;
668         struct lov_oinfo *loi;
669         struct lov_obd *lov = &set->set_exp->exp_obd->u.lov;
670         ENTRY;
671
672         req->rq_stripe = set->set_success;
673         loi = lsm->lsm_oinfo[req->rq_stripe];
674
675         if (rc && lov->lov_tgts[req->rq_idx] &&
676             lov->lov_tgts[req->rq_idx]->ltd_active) {
677                 CERROR("error creating fid "LPX64" sub-object"
678                        " on OST idx %d/%d: rc = %d\n",
679                        set->set_oi->oi_oa->o_id, req->rq_idx,
680                        lsm->lsm_stripe_count, rc);
681                 if (rc > 0) {
682                         CERROR("obd_create returned invalid err %d\n", rc);
683                         rc = -EIO;
684                 }
685         }
686         lov_update_set(set, req, rc);
687         if (rc)
688                 RETURN(rc);
689
690         loi->loi_id = req->rq_oi.oi_oa->o_id;
691         loi->loi_ost_idx = req->rq_idx;
692         CDEBUG(D_INODE, "objid "LPX64" has subobj "LPX64"/"LPU64" at idx %d\n",
693                lsm->lsm_object_id, loi->loi_id, loi->loi_id, req->rq_idx);
694         loi_init(loi);
695
696         if (oti && set->set_cookies)
697                 ++oti->oti_logcookies;
698         if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLCOOKIE)
699                 set->set_cookie_sent++;
700
701         RETURN(0);
702 }
703
704 int lov_prep_create_set(struct obd_export *exp, struct obd_info *oinfo,
705                         struct lov_stripe_md **lsmp, struct obdo *src_oa,
706                         struct obd_trans_info *oti,
707                         struct lov_request_set **reqset)
708 {
709         struct lov_request_set *set;
710         int rc = 0;
711         ENTRY;
712
713         OBD_ALLOC(set, sizeof(*set));
714         if (set == NULL)
715                 RETURN(-ENOMEM);
716         lov_init_set(set);
717
718         set->set_exp = exp;
719         set->set_oi = oinfo;
720         set->set_oi->oi_md = *lsmp;
721         set->set_oi->oi_oa = src_oa;
722         set->set_oti = oti;
723
724         rc = qos_prep_create(exp, set);
725         if (rc)
726                 lov_fini_create_set(set, lsmp);
727         else
728                 *reqset = set;
729         RETURN(rc);
730 }
731
732 static int common_attr_done(struct lov_request_set *set)
733 {
734         struct list_head *pos;
735         struct lov_request *req;
736         struct obdo *tmp_oa;
737         int rc = 0, attrset = 0;
738         ENTRY;
739
740         LASSERT(set->set_oi != NULL);
741
742         if (set->set_oi->oi_oa == NULL)
743                 RETURN(0);
744
745         if (!set->set_success)
746                 RETURN(-EIO);
747
748         OBDO_ALLOC(tmp_oa);
749         if (tmp_oa == NULL)
750                 GOTO(out, rc = -ENOMEM);
751
752         list_for_each (pos, &set->set_list) {
753                 req = list_entry(pos, struct lov_request, rq_link);
754
755                 if (!req->rq_complete || req->rq_rc)
756                         continue;
757                 if (req->rq_oi.oi_oa->o_valid == 0)   /* inactive stripe */
758                         continue;
759                 lov_merge_attrs(tmp_oa, req->rq_oi.oi_oa,
760                                 req->rq_oi.oi_oa->o_valid,
761                                 set->set_oi->oi_md, req->rq_stripe, &attrset);
762         }
763         if (!attrset) {
764                 CERROR("No stripes had valid attrs\n");
765                 rc = -EIO;
766         }
767         tmp_oa->o_id = set->set_oi->oi_oa->o_id;
768         memcpy(set->set_oi->oi_oa, tmp_oa, sizeof(*set->set_oi->oi_oa));
769 out:
770         if (tmp_oa)
771                 OBDO_FREE(tmp_oa);
772         RETURN(rc);
773
774 }
775
776 static int brw_done(struct lov_request_set *set)
777 {
778         struct lov_stripe_md *lsm = set->set_oi->oi_md;
779         struct lov_oinfo     *loi = NULL;
780         struct list_head *pos;
781         struct lov_request *req;
782         ENTRY;
783
784         list_for_each (pos, &set->set_list) {
785                 req = list_entry(pos, struct lov_request, rq_link);
786
787                 if (!req->rq_complete || req->rq_rc)
788                         continue;
789
790                 loi = lsm->lsm_oinfo[req->rq_stripe];
791
792                 if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLBLOCKS)
793                         loi->loi_lvb.lvb_blocks = req->rq_oi.oi_oa->o_blocks;
794         }
795
796         RETURN(0);
797 }
798
799 int lov_fini_brw_set(struct lov_request_set *set)
800 {
801         int rc = 0;
802         ENTRY;
803
804         if (set == NULL)
805                 RETURN(0);
806         LASSERT(set->set_exp);
807         if (set->set_completes) {
808                 rc = brw_done(set);
809                 /* FIXME update qos data here */
810         }
811         if (atomic_dec_and_test(&set->set_refcount))
812                 lov_finish_set(set);
813
814         RETURN(rc);
815 }
816
817 int lov_prep_brw_set(struct obd_export *exp, struct obd_info *oinfo,
818                      obd_count oa_bufs, struct brw_page *pga,
819                      struct obd_trans_info *oti,
820                      struct lov_request_set **reqset)
821 {
822         struct {
823                 obd_count       index;
824                 obd_count       count;
825                 obd_count       off;
826         } *info = NULL;
827         struct lov_request_set *set;
828         struct lov_oinfo *loi = NULL;
829         struct lov_obd *lov = &exp->exp_obd->u.lov;
830         int rc = 0, i, shift;
831         ENTRY;
832
833         OBD_ALLOC(set, sizeof(*set));
834         if (set == NULL)
835                 RETURN(-ENOMEM);
836         lov_init_set(set);
837
838         set->set_exp = exp;
839         set->set_oti = oti;
840         set->set_oi = oinfo;
841         set->set_oabufs = oa_bufs;
842         OBD_ALLOC(set->set_pga, oa_bufs * sizeof(*set->set_pga));
843         if (!set->set_pga)
844                 GOTO(out, rc = -ENOMEM);
845
846         OBD_ALLOC(info, sizeof(*info) * oinfo->oi_md->lsm_stripe_count);
847         if (!info)
848                 GOTO(out, rc = -ENOMEM);
849
850         /* calculate the page count for each stripe */
851         for (i = 0; i < oa_bufs; i++) {
852                 int stripe = lov_stripe_number(oinfo->oi_md, pga[i].off);
853                 info[stripe].count++;
854         }
855
856         /* alloc and initialize lov request */
857         shift = 0;
858         for (i = 0 ; i < oinfo->oi_md->lsm_stripe_count; i++){
859                 struct lov_request *req;
860
861                 if (info[i].count == 0)
862                         continue;
863
864                 loi = oinfo->oi_md->lsm_oinfo[i];
865                 if (!lov->lov_tgts[loi->loi_ost_idx] || 
866                     !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
867                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
868                         GOTO(out, rc = -EIO);
869                 }
870
871                 OBD_ALLOC(req, sizeof(*req));
872                 if (req == NULL)
873                         GOTO(out, rc = -ENOMEM);
874
875                 OBDO_ALLOC(req->rq_oi.oi_oa);
876                 if (req->rq_oi.oi_oa == NULL) {
877                         OBD_FREE(req, sizeof(*req));
878                         GOTO(out, rc = -ENOMEM);
879                 }
880
881                 if (oinfo->oi_oa) {
882                         memcpy(req->rq_oi.oi_oa, oinfo->oi_oa,
883                                sizeof(*req->rq_oi.oi_oa));
884                 }
885                 req->rq_oi.oi_oa->o_id = loi->loi_id;
886                 req->rq_oi.oi_oa->o_stripe_idx = i;
887
888                 req->rq_buflen = sizeof(*req->rq_oi.oi_md);
889                 OBD_ALLOC(req->rq_oi.oi_md, req->rq_buflen);
890                 if (req->rq_oi.oi_md == NULL) {
891                         OBDO_FREE(req->rq_oi.oi_oa);
892                         OBD_FREE(req, sizeof(*req));
893                         GOTO(out, rc = -ENOMEM);
894                 }
895
896                 req->rq_idx = loi->loi_ost_idx;
897                 req->rq_stripe = i;
898
899                 /* XXX LOV STACKING */
900                 req->rq_oi.oi_md->lsm_object_id = loi->loi_id;
901                 req->rq_oi.oi_md->lsm_object_gr = oinfo->oi_md->lsm_object_gr;
902                 req->rq_oabufs = info[i].count;
903                 req->rq_pgaidx = shift;
904                 shift += req->rq_oabufs;
905
906                 /* remember the index for sort brw_page array */
907                 info[i].index = req->rq_pgaidx;
908
909                 lov_set_add_req(req, set);
910         }
911         if (!set->set_count)
912                 GOTO(out, rc = -EIO);
913
914         /* rotate & sort the brw_page array */
915         for (i = 0; i < oa_bufs; i++) {
916                 int stripe = lov_stripe_number(oinfo->oi_md, pga[i].off);
917
918                 shift = info[stripe].index + info[stripe].off;
919                 LASSERT(shift < oa_bufs);
920                 set->set_pga[shift] = pga[i];
921                 lov_stripe_offset(oinfo->oi_md, pga[i].off, stripe,
922                                   &set->set_pga[shift].off);
923                 info[stripe].off++;
924         }
925 out:
926         if (info)
927                 OBD_FREE(info, sizeof(*info) * oinfo->oi_md->lsm_stripe_count);
928
929         if (rc == 0)
930                 *reqset = set;
931         else
932                 lov_fini_brw_set(set);
933
934         RETURN(rc);
935 }
936
937 int lov_fini_getattr_set(struct lov_request_set *set)
938 {
939         int rc = 0;
940         ENTRY;
941
942         if (set == NULL)
943                 RETURN(0);
944         LASSERT(set->set_exp);
945         if (set->set_completes)
946                 rc = common_attr_done(set);
947
948         if (atomic_dec_and_test(&set->set_refcount))
949                 lov_finish_set(set);
950
951         RETURN(rc);
952 }
953
954 /* The callback for osc_getattr_async that finilizes a request info when a
955  * response is recieved. */
956 static int cb_getattr_update(struct obd_info *oinfo, int rc)
957 {
958         struct lov_request *lovreq;
959         lovreq = container_of(oinfo, struct lov_request, rq_oi);
960         return lov_update_common_set(lovreq->rq_rqset, lovreq, rc);
961 }
962
963 int lov_prep_getattr_set(struct obd_export *exp, struct obd_info *oinfo,
964                          struct lov_request_set **reqset)
965 {
966         struct lov_request_set *set;
967         struct lov_oinfo *loi = NULL;
968         struct lov_obd *lov = &exp->exp_obd->u.lov;
969         int rc = 0, i;
970         ENTRY;
971
972         OBD_ALLOC(set, sizeof(*set));
973         if (set == NULL)
974                 RETURN(-ENOMEM);
975         lov_init_set(set);
976
977         set->set_exp = exp;
978         set->set_oi = oinfo;
979
980         for (i = 0; i < oinfo->oi_md->lsm_stripe_count; i++) {
981                 struct lov_request *req;
982
983                 loi = oinfo->oi_md->lsm_oinfo[i];
984                 if (!lov->lov_tgts[loi->loi_ost_idx] ||
985                     !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
986                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
987                         continue;
988                 }
989
990                 OBD_ALLOC(req, sizeof(*req));
991                 if (req == NULL)
992                         GOTO(out_set, rc = -ENOMEM);
993
994                 req->rq_stripe = i;
995                 req->rq_idx = loi->loi_ost_idx;
996
997                 OBDO_ALLOC(req->rq_oi.oi_oa);
998                 if (req->rq_oi.oi_oa == NULL) {
999                         OBD_FREE(req, sizeof(*req));
1000                         GOTO(out_set, rc = -ENOMEM);
1001                 }
1002                 memcpy(req->rq_oi.oi_oa, oinfo->oi_oa,
1003                        sizeof(*req->rq_oi.oi_oa));
1004                 req->rq_oi.oi_oa->o_id = loi->loi_id;
1005                 req->rq_oi.oi_cb_up = cb_getattr_update;
1006
1007                 lov_set_add_req(req, set);
1008         }
1009         if (!set->set_count)
1010                 GOTO(out_set, rc = -EIO);
1011         *reqset = set;
1012         RETURN(rc);
1013 out_set:
1014         lov_fini_getattr_set(set);
1015         RETURN(rc);
1016 }
1017
1018 int lov_fini_destroy_set(struct lov_request_set *set)
1019 {
1020         ENTRY;
1021
1022         if (set == NULL)
1023                 RETURN(0);
1024         LASSERT(set->set_exp);
1025         if (set->set_completes) {
1026                 /* FIXME update qos data here */
1027         }
1028
1029         if (atomic_dec_and_test(&set->set_refcount))
1030                 lov_finish_set(set);
1031
1032         RETURN(0);
1033 }
1034
1035 int lov_prep_destroy_set(struct obd_export *exp, struct obd_info *oinfo,
1036                          struct obdo *src_oa, struct lov_stripe_md *lsm,
1037                          struct obd_trans_info *oti,
1038                          struct lov_request_set **reqset)
1039 {
1040         struct lov_request_set *set;
1041         struct lov_oinfo *loi = NULL;
1042         struct lov_obd *lov = &exp->exp_obd->u.lov;
1043         int rc = 0, i;
1044         ENTRY;
1045
1046         OBD_ALLOC(set, sizeof(*set));
1047         if (set == NULL)
1048                 RETURN(-ENOMEM);
1049         lov_init_set(set);
1050
1051         set->set_exp = exp;
1052         set->set_oi = oinfo;
1053         set->set_oi->oi_md = lsm;
1054         set->set_oi->oi_oa = src_oa;
1055         set->set_oti = oti;
1056         if (oti != NULL && src_oa->o_valid & OBD_MD_FLCOOKIE)
1057                 set->set_cookies = oti->oti_logcookies;
1058
1059         for (i = 0; i < lsm->lsm_stripe_count; i++) {
1060                 struct lov_request *req;
1061
1062                 loi = lsm->lsm_oinfo[i];
1063                 if (!lov->lov_tgts[loi->loi_ost_idx] || 
1064                     !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
1065                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1066                         continue;
1067                 }
1068
1069                 OBD_ALLOC(req, sizeof(*req));
1070                 if (req == NULL)
1071                         GOTO(out_set, rc = -ENOMEM);
1072
1073                 req->rq_stripe = i;
1074                 req->rq_idx = loi->loi_ost_idx;
1075
1076                 OBDO_ALLOC(req->rq_oi.oi_oa);
1077                 if (req->rq_oi.oi_oa == NULL) {
1078                         OBD_FREE(req, sizeof(*req));
1079                         GOTO(out_set, rc = -ENOMEM);
1080                 }
1081                 memcpy(req->rq_oi.oi_oa, src_oa, sizeof(*req->rq_oi.oi_oa));
1082                 req->rq_oi.oi_oa->o_id = loi->loi_id;
1083                 lov_set_add_req(req, set);
1084         }
1085         if (!set->set_count)
1086                 GOTO(out_set, rc = -EIO);
1087         *reqset = set;
1088         RETURN(rc);
1089 out_set:
1090         lov_fini_destroy_set(set);
1091         RETURN(rc);
1092 }
1093
1094 int lov_fini_setattr_set(struct lov_request_set *set)
1095 {
1096         int rc = 0;
1097         ENTRY;
1098
1099         if (set == NULL)
1100                 RETURN(0);
1101         LASSERT(set->set_exp);
1102         if (set->set_completes) {
1103                 rc = common_attr_done(set);
1104                 /* FIXME update qos data here */
1105         }
1106
1107         if (atomic_dec_and_test(&set->set_refcount))
1108                 lov_finish_set(set);
1109         RETURN(rc);
1110 }
1111
1112 int lov_update_setattr_set(struct lov_request_set *set,
1113                            struct lov_request *req, int rc)
1114 {
1115         struct lov_obd *lov = &req->rq_rqset->set_exp->exp_obd->u.lov;
1116         struct lov_stripe_md *lsm = req->rq_rqset->set_oi->oi_md;
1117         ENTRY;
1118
1119         lov_update_set(set, req, rc);
1120
1121         /* grace error on inactive ost */
1122         if (rc && !(lov->lov_tgts[req->rq_idx] && 
1123                     lov->lov_tgts[req->rq_idx]->ltd_active))
1124                 rc = 0;
1125
1126         if (rc == 0) {
1127                 if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLCTIME)
1128                         lsm->lsm_oinfo[req->rq_stripe]->loi_lvb.lvb_ctime =
1129                                 req->rq_oi.oi_oa->o_ctime;
1130                 if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLMTIME)
1131                         lsm->lsm_oinfo[req->rq_stripe]->loi_lvb.lvb_mtime =
1132                                 req->rq_oi.oi_oa->o_mtime;
1133                 if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLATIME)
1134                         lsm->lsm_oinfo[req->rq_stripe]->loi_lvb.lvb_atime =
1135                                 req->rq_oi.oi_oa->o_atime;
1136         }
1137
1138         RETURN(rc);
1139 }
1140
1141 /* The callback for osc_setattr_async that finilizes a request info when a
1142  * response is recieved. */
1143 static int cb_setattr_update(struct obd_info *oinfo, int rc)
1144 {
1145         struct lov_request *lovreq;
1146         lovreq = container_of(oinfo, struct lov_request, rq_oi);
1147         return lov_update_setattr_set(lovreq->rq_rqset, lovreq, rc);
1148 }
1149
1150 int lov_prep_setattr_set(struct obd_export *exp, struct obd_info *oinfo,
1151                          struct obd_trans_info *oti,
1152                          struct lov_request_set **reqset)
1153 {
1154         struct lov_request_set *set;
1155         struct lov_oinfo *loi = NULL;
1156         struct lov_obd *lov = &exp->exp_obd->u.lov;
1157         int rc = 0, i;
1158         ENTRY;
1159
1160         OBD_ALLOC(set, sizeof(*set));
1161         if (set == NULL)
1162                 RETURN(-ENOMEM);
1163         lov_init_set(set);
1164
1165         set->set_exp = exp;
1166         set->set_oti = oti;
1167         set->set_oi = oinfo;
1168         if (oti != NULL && oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE)
1169                 set->set_cookies = oti->oti_logcookies;
1170
1171         for (i = 0; i < oinfo->oi_md->lsm_stripe_count; i++) {
1172                 struct lov_request *req;
1173
1174                 loi = oinfo->oi_md->lsm_oinfo[i];
1175                 if (!lov->lov_tgts[loi->loi_ost_idx] ||
1176                     !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
1177                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1178                         continue;
1179                 }
1180
1181                 OBD_ALLOC(req, sizeof(*req));
1182                 if (req == NULL)
1183                         GOTO(out_set, rc = -ENOMEM);
1184                 req->rq_stripe = i;
1185                 req->rq_idx = loi->loi_ost_idx;
1186
1187                 OBDO_ALLOC(req->rq_oi.oi_oa);
1188                 if (req->rq_oi.oi_oa == NULL) {
1189                         OBD_FREE(req, sizeof(*req));
1190                         GOTO(out_set, rc = -ENOMEM);
1191                 }
1192                 memcpy(req->rq_oi.oi_oa, oinfo->oi_oa,
1193                        sizeof(*req->rq_oi.oi_oa));
1194                 req->rq_oi.oi_oa->o_id = loi->loi_id;
1195                 req->rq_oi.oi_oa->o_stripe_idx = i;
1196                 req->rq_oi.oi_cb_up = cb_setattr_update;
1197                 req->rq_rqset = set;
1198
1199                 if (oinfo->oi_oa->o_valid & OBD_MD_FLSIZE) {
1200                         int off = lov_stripe_offset(oinfo->oi_md,
1201                                                     oinfo->oi_oa->o_size, i,
1202                                                     &req->rq_oi.oi_oa->o_size);
1203
1204                         if (off < 0 && req->rq_oi.oi_oa->o_size)
1205                                 req->rq_oi.oi_oa->o_size--;
1206
1207                         CDEBUG(D_INODE, "stripe %d has size "LPU64"/"LPU64"\n",
1208                                i, req->rq_oi.oi_oa->o_size,
1209                                oinfo->oi_oa->o_size);
1210                 }
1211                 lov_set_add_req(req, set);
1212         }
1213         if (!set->set_count)
1214                 GOTO(out_set, rc = -EIO);
1215         *reqset = set;
1216         RETURN(rc);
1217 out_set:
1218         lov_fini_setattr_set(set);
1219         RETURN(rc);
1220 }
1221
1222 int lov_fini_punch_set(struct lov_request_set *set)
1223 {
1224         int rc = 0;
1225         ENTRY;
1226
1227         if (set == NULL)
1228                 RETURN(0);
1229         LASSERT(set->set_exp);
1230         if (set->set_completes) {
1231                 rc = -EIO;
1232                 /* FIXME update qos data here */
1233                 if (set->set_success)
1234                         rc = common_attr_done(set);
1235         }
1236
1237         if (atomic_dec_and_test(&set->set_refcount))
1238                 lov_finish_set(set);
1239
1240         RETURN(rc);
1241 }
1242
1243 int lov_update_punch_set(struct lov_request_set *set,
1244                            struct lov_request *req, int rc)
1245 {
1246         struct lov_obd *lov = &req->rq_rqset->set_exp->exp_obd->u.lov;
1247         struct lov_stripe_md *lsm = req->rq_rqset->set_oi->oi_md;
1248         ENTRY;
1249
1250         lov_update_set(set, req, rc);
1251
1252         /* grace error on inactive ost */
1253         if (rc && !lov->lov_tgts[req->rq_idx]->ltd_active)
1254                 rc = 0;
1255
1256         if (rc == 0) {
1257                 lov_stripe_lock(lsm);
1258                 if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLBLOCKS) {
1259                         lsm->lsm_oinfo[req->rq_stripe]->loi_lvb.lvb_blocks =
1260                                 req->rq_oi.oi_oa->o_blocks;
1261                 }
1262
1263                 /* Do we need to update lvb_size here? It needn't because
1264                  * it have been done in ll_truncate(). -jay */
1265                 lov_stripe_unlock(lsm);
1266         }
1267
1268         RETURN(rc);
1269 }
1270
1271 /* The callback for osc_punch that finilizes a request info when a response
1272  * is recieved. */
1273 static int cb_update_punch(struct obd_info *oinfo, int rc)
1274 {
1275         struct lov_request *lovreq;
1276         lovreq = container_of(oinfo, struct lov_request, rq_oi);
1277         return lov_update_punch_set(lovreq->rq_rqset, lovreq, rc);
1278 }
1279
1280 int lov_prep_punch_set(struct obd_export *exp, struct obd_info *oinfo,
1281                        struct obd_trans_info *oti,
1282                        struct lov_request_set **reqset)
1283 {
1284         struct lov_request_set *set;
1285         struct lov_oinfo *loi = NULL;
1286         struct lov_obd *lov = &exp->exp_obd->u.lov;
1287         int rc = 0, i;
1288         ENTRY;
1289
1290         OBD_ALLOC(set, sizeof(*set));
1291         if (set == NULL)
1292                 RETURN(-ENOMEM);
1293         lov_init_set(set);
1294
1295         set->set_oi = oinfo;
1296         set->set_exp = exp;
1297
1298         for (i = 0; i < oinfo->oi_md->lsm_stripe_count; i++) {
1299                 struct lov_request *req;
1300                 obd_off rs, re;
1301
1302                 loi = oinfo->oi_md->lsm_oinfo[i];
1303                 if (!lov->lov_tgts[loi->loi_ost_idx] ||
1304                     !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
1305                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1306                         continue;
1307                 }
1308
1309                 if (!lov_stripe_intersects(oinfo->oi_md, i,
1310                                            oinfo->oi_policy.l_extent.start,
1311                                            oinfo->oi_policy.l_extent.end,
1312                                            &rs, &re))
1313                         continue;
1314
1315                 OBD_ALLOC(req, sizeof(*req));
1316                 if (req == NULL)
1317                         GOTO(out_set, rc = -ENOMEM);
1318                 req->rq_stripe = i;
1319                 req->rq_idx = loi->loi_ost_idx;
1320
1321                 OBDO_ALLOC(req->rq_oi.oi_oa);
1322                 if (req->rq_oi.oi_oa == NULL) {
1323                         OBD_FREE(req, sizeof(*req));
1324                         GOTO(out_set, rc = -ENOMEM);
1325                 }
1326                 memcpy(req->rq_oi.oi_oa, oinfo->oi_oa,
1327                        sizeof(*req->rq_oi.oi_oa));
1328                 req->rq_oi.oi_oa->o_id = loi->loi_id;
1329                 req->rq_oi.oi_oa->o_stripe_idx = i;
1330                 req->rq_oi.oi_cb_up = cb_update_punch;
1331                 req->rq_rqset = set;
1332
1333                 req->rq_oi.oi_policy.l_extent.start = rs;
1334                 req->rq_oi.oi_policy.l_extent.end = re;
1335                 req->rq_oi.oi_policy.l_extent.gid = -1;
1336
1337                 lov_set_add_req(req, set);
1338         }
1339         if (!set->set_count)
1340                 GOTO(out_set, rc = -EIO);
1341         *reqset = set;
1342         RETURN(rc);
1343 out_set:
1344         lov_fini_punch_set(set);
1345         RETURN(rc);
1346 }
1347
1348 int lov_fini_sync_set(struct lov_request_set *set)
1349 {
1350         int rc = 0;
1351         ENTRY;
1352
1353         if (set == NULL)
1354                 RETURN(0);
1355         LASSERT(set->set_exp);
1356         if (set->set_completes) {
1357                 if (!set->set_success)
1358                         rc = -EIO;
1359                 /* FIXME update qos data here */
1360         }
1361
1362         if (atomic_dec_and_test(&set->set_refcount))
1363                 lov_finish_set(set);
1364
1365         RETURN(rc);
1366 }
1367
1368 int lov_prep_sync_set(struct obd_export *exp, struct obd_info *oinfo,
1369                       struct obdo *src_oa, struct lov_stripe_md *lsm,
1370                       obd_off start, obd_off end,
1371                       struct lov_request_set **reqset)
1372 {
1373         struct lov_request_set *set;
1374         struct lov_oinfo *loi = NULL;
1375         struct lov_obd *lov = &exp->exp_obd->u.lov;
1376         int rc = 0, i;
1377         ENTRY;
1378
1379         OBD_ALLOC(set, sizeof(*set));
1380         if (set == NULL)
1381                 RETURN(-ENOMEM);
1382         lov_init_set(set);
1383
1384         set->set_exp = exp;
1385         set->set_oi = oinfo;
1386         set->set_oi->oi_md = lsm;
1387         set->set_oi->oi_oa = src_oa;
1388
1389         for (i = 0; i < lsm->lsm_stripe_count; i++) {
1390                 struct lov_request *req;
1391                 obd_off rs, re;
1392
1393                 loi = lsm->lsm_oinfo[i];
1394                 if (!lov->lov_tgts[loi->loi_ost_idx] ||
1395                     !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
1396                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1397                         continue;
1398                 }
1399
1400                 if (!lov_stripe_intersects(lsm, i, start, end, &rs, &re))
1401                         continue;
1402
1403                 OBD_ALLOC(req, sizeof(*req));
1404                 if (req == NULL)
1405                         GOTO(out_set, rc = -ENOMEM);
1406                 req->rq_stripe = i;
1407                 req->rq_idx = loi->loi_ost_idx;
1408
1409                 OBDO_ALLOC(req->rq_oi.oi_oa);
1410                 if (req->rq_oi.oi_oa == NULL) {
1411                         OBD_FREE(req, sizeof(*req));
1412                         GOTO(out_set, rc = -ENOMEM);
1413                 }
1414                 memcpy(req->rq_oi.oi_oa, src_oa, sizeof(*req->rq_oi.oi_oa));
1415                 req->rq_oi.oi_oa->o_id = loi->loi_id;
1416                 req->rq_oi.oi_oa->o_stripe_idx = i;
1417
1418                 req->rq_oi.oi_policy.l_extent.start = rs;
1419                 req->rq_oi.oi_policy.l_extent.end = re;
1420                 req->rq_oi.oi_policy.l_extent.gid = -1;
1421
1422                 lov_set_add_req(req, set);
1423         }
1424         if (!set->set_count)
1425                 GOTO(out_set, rc = -EIO);
1426         *reqset = set;
1427         RETURN(rc);
1428 out_set:
1429         lov_fini_sync_set(set);
1430         RETURN(rc);
1431 }
1432
/* All-ones value of type __u64. */
#define LOV_U64_MAX ((__u64)~0ULL)

/* Add "add" to "tot", clamping "tot" to LOV_U64_MAX when the sum compares
 * lower than "tot" (i.e. the addition wrapped around).  Both arguments are
 * evaluated more than once, so they must be free of side effects. */
#define LOV_SUM_MAX(tot, add)                                           \
        do {                                                            \
                if ((tot) + (add) >= (tot))                             \
                        (tot) += (add);                                 \
                else                                                    \
                        (tot) = LOV_U64_MAX;                            \
        } while (0)
1441
1442 int lov_fini_statfs(struct obd_device *obd, struct obd_statfs *osfs,int success)
1443 {
1444         ENTRY;
1445
1446         if (success) {
1447                 __u32 expected_stripes = lov_get_stripecnt(&obd->u.lov, 0);
1448
1449                 if (osfs->os_files != LOV_U64_MAX)
1450                         do_div(osfs->os_files, expected_stripes);
1451                 if (osfs->os_ffree != LOV_U64_MAX)
1452                         do_div(osfs->os_ffree, expected_stripes);
1453
1454                 spin_lock(&obd->obd_osfs_lock);
1455                 memcpy(&obd->obd_osfs, osfs, sizeof(*osfs));
1456                 obd->obd_osfs_age = get_jiffies_64();
1457                 spin_unlock(&obd->obd_osfs_lock);
1458                 RETURN(0);
1459         }
1460
1461         RETURN(-EIO);
1462 }
1463
1464 int lov_fini_statfs_set(struct lov_request_set *set)
1465 {
1466         int rc = 0;
1467         ENTRY;
1468
1469         if (set == NULL)
1470                 RETURN(0);
1471
1472         if (set->set_completes) {
1473                 rc = lov_fini_statfs(set->set_obd, set->set_oi->oi_osfs,
1474                                      set->set_success);
1475         }
1476
1477         if (atomic_dec_and_test(&set->set_refcount))
1478                 lov_finish_set(set);
1479
1480         RETURN(rc);
1481 }
1482
1483 void lov_update_statfs(struct obd_device *obd, struct obd_statfs *osfs,
1484                        struct obd_statfs *lov_sfs, int success)
1485 {
1486         int shift = 0, quit = 0;
1487         __u64 tmp;
1488         spin_lock(&obd->obd_osfs_lock);
1489         memcpy(&obd->obd_osfs, lov_sfs, sizeof(*lov_sfs));
1490         obd->obd_osfs_age = get_jiffies_64();
1491         spin_unlock(&obd->obd_osfs_lock);
1492
1493         if (success == 0) {
1494                 memcpy(osfs, lov_sfs, sizeof(*lov_sfs));
1495         } else {
1496                 if (osfs->os_bsize != lov_sfs->os_bsize) {
1497                         /* assume all block sizes are always powers of 2 */
1498                         /* get the bits difference */
1499                         tmp = osfs->os_bsize | lov_sfs->os_bsize;
1500                         for (shift = 0; shift <= 64; ++shift) {
1501                                 if (tmp & 1) {
1502                                         if (quit)
1503                                                 break;
1504                                         else
1505                                                 quit = 1;
1506                                         shift = 0;
1507                                 }
1508                                 tmp >>= 1;
1509                         }
1510                 }
1511
1512                 if (osfs->os_bsize < lov_sfs->os_bsize) {
1513                         osfs->os_bsize = lov_sfs->os_bsize;
1514
1515                         osfs->os_bfree  >>= shift;
1516                         osfs->os_bavail >>= shift;
1517                         osfs->os_blocks >>= shift;
1518                 } else if (shift != 0) {
1519                         lov_sfs->os_bfree  >>= shift;
1520                         lov_sfs->os_bavail >>= shift;
1521                         lov_sfs->os_blocks >>= shift;
1522                 }
1523 #ifdef MIN_DF
1524                 /* Sandia requested that df (and so, statfs) only
1525                    returned minimal available space on
1526                    a single OST, so people would be able to
1527                    write this much data guaranteed. */
1528                 if (osfs->os_bavail > lov_sfs->os_bavail) {
1529                         /* Presumably if new bavail is smaller,
1530                            new bfree is bigger as well */
1531                         osfs->os_bfree = lov_sfs->os_bfree;
1532                         osfs->os_bavail = lov_sfs->os_bavail;
1533                 }
1534 #else
1535                 osfs->os_bfree += lov_sfs->os_bfree;
1536                 osfs->os_bavail += lov_sfs->os_bavail;
1537 #endif
1538                 osfs->os_blocks += lov_sfs->os_blocks;
1539                 /* XXX not sure about this one - depends on policy.
1540                  *   - could be minimum if we always stripe on all OBDs
1541                  *     (but that would be wrong for any other policy,
1542                  *     if one of the OBDs has no more objects left)
1543                  *   - could be sum if we stripe whole objects
1544                  *   - could be average, just to give a nice number
1545                  *
1546                  * To give a "reasonable" (if not wholly accurate)
1547                  * number, we divide the total number of free objects
1548                  * by expected stripe count (watch out for overflow).
1549                  */
1550                 LOV_SUM_MAX(osfs->os_files, lov_sfs->os_files);
1551                 LOV_SUM_MAX(osfs->os_ffree, lov_sfs->os_ffree);
1552         }
1553 }
1554
1555 /* The callback for osc_statfs_async that finilizes a request info when a
1556  * response is recieved. */
1557 static int cb_statfs_update(struct obd_info *oinfo, int rc)
1558 {
1559         struct lov_request *lovreq;
1560         struct obd_statfs *osfs, *lov_sfs;
1561         struct obd_device *obd;
1562         struct lov_obd *lov;
1563         int success;
1564         ENTRY;
1565
1566         lovreq = container_of(oinfo, struct lov_request, rq_oi);
1567         lov = &lovreq->rq_rqset->set_obd->u.lov;
1568         obd = class_exp2obd(lov->lov_tgts[lovreq->rq_idx]->ltd_exp);
1569
1570         osfs = lovreq->rq_rqset->set_oi->oi_osfs;
1571         lov_sfs = oinfo->oi_osfs;
1572
1573         success = lovreq->rq_rqset->set_success;
1574
1575         /* XXX: the same is done in lov_update_common_set, however
1576            lovset->set_exp is not initialized. */
1577         lov_update_set(lovreq->rq_rqset, lovreq, rc);
1578         if (rc) {
1579                 if (rc && !(lov->lov_tgts[lovreq->rq_idx] &&
1580                             lov->lov_tgts[lovreq->rq_idx]->ltd_active))
1581                         rc = 0;
1582                 RETURN(rc);
1583         }
1584
1585         lov_update_statfs(obd, osfs, lov_sfs, success);
1586         qos_update(lov);
1587
1588         RETURN(0);
1589 }
1590
1591 int lov_prep_statfs_set(struct obd_device *obd, struct obd_info *oinfo,
1592                         struct lov_request_set **reqset)
1593 {
1594         struct lov_request_set *set;
1595         struct lov_obd *lov = &obd->u.lov;
1596         int rc = 0, i;
1597         ENTRY;
1598
1599         OBD_ALLOC(set, sizeof(*set));
1600         if (set == NULL)
1601                 RETURN(-ENOMEM);
1602         lov_init_set(set);
1603
1604         set->set_obd = obd;
1605         set->set_oi = oinfo;
1606
1607         /* We only get block data from the OBD */
1608         for (i = 0; i < lov->desc.ld_tgt_count; i++) {
1609                 struct lov_request *req;
1610
1611                 if (!lov->lov_tgts[i] || !lov->lov_tgts[i]->ltd_active) {
1612                         CDEBUG(D_HA, "lov idx %d inactive\n", i);
1613                         continue;
1614                 }
1615
1616                 OBD_ALLOC(req, sizeof(*req));
1617                 if (req == NULL)
1618                         GOTO(out_set, rc = -ENOMEM);
1619
1620                 OBD_ALLOC(req->rq_oi.oi_osfs, sizeof(*req->rq_oi.oi_osfs));
1621                 if (req->rq_oi.oi_osfs == NULL) {
1622                         OBD_FREE(req, sizeof(*req));
1623                         GOTO(out_set, rc = -ENOMEM);
1624                 }
1625
1626                 req->rq_idx = i;
1627                 req->rq_oi.oi_cb_up = cb_statfs_update;
1628                 req->rq_rqset = set;
1629
1630                 lov_set_add_req(req, set);
1631         }
1632         if (!set->set_count)
1633                 GOTO(out_set, rc = -EIO);
1634         *reqset = set;
1635         RETURN(rc);
1636 out_set:
1637         lov_fini_statfs_set(set);
1638         RETURN(rc);
1639 }