Whamcloud - gitweb
Revert "b=19427 correct lmm_object_id and reserve fids for fid-on-OST."
[fs/lustre-release.git] / lustre / lov / lov_request.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright  2008 Sun Microsystems, Inc. All rights reserved
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  */
36
37 #ifndef EXPORT_SYMTAB
38 # define EXPORT_SYMTAB
39 #endif
40 #define DEBUG_SUBSYSTEM S_LOV
41
42 #ifdef __KERNEL__
43 #include <libcfs/libcfs.h>
44 #else
45 #include <liblustre.h>
46 #endif
47
48 #include <obd_class.h>
49 #include <obd_lov.h>
50 #include <lustre/lustre_idl.h>
51
52 #include "lov_internal.h"
53
54 static void lov_init_set(struct lov_request_set *set)
55 {
56         set->set_count = 0;
57         set->set_completes = 0;
58         set->set_success = 0;
59         set->set_cookies = 0;
60         CFS_INIT_LIST_HEAD(&set->set_list);
61         cfs_atomic_set(&set->set_refcount, 1);
62         cfs_waitq_init(&set->set_waitq);
63         cfs_spin_lock_init(&set->set_lock);
64 }
65
66 void lov_finish_set(struct lov_request_set *set)
67 {
68         cfs_list_t *pos, *n;
69         ENTRY;
70
71         LASSERT(set);
72         cfs_list_for_each_safe(pos, n, &set->set_list) {
73                 struct lov_request *req = cfs_list_entry(pos,
74                                                          struct lov_request,
75                                                          rq_link);
76                 cfs_list_del_init(&req->rq_link);
77
78                 if (req->rq_oi.oi_oa)
79                         OBDO_FREE(req->rq_oi.oi_oa);
80                 if (req->rq_oi.oi_md)
81                         OBD_FREE(req->rq_oi.oi_md, req->rq_buflen);
82                 if (req->rq_oi.oi_osfs)
83                         OBD_FREE(req->rq_oi.oi_osfs,
84                                  sizeof(*req->rq_oi.oi_osfs));
85                 OBD_FREE(req, sizeof(*req));
86         }
87
88         if (set->set_pga) {
89                 int len = set->set_oabufs * sizeof(*set->set_pga);
90                 OBD_FREE(set->set_pga, len);
91         }
92         if (set->set_lockh)
93                 lov_llh_put(set->set_lockh);
94
95         OBD_FREE(set, sizeof(*set));
96         EXIT;
97 }
98
99 int lov_finished_set(struct lov_request_set *set)
100 {
101         CDEBUG(D_INFO, "check set %d/%d\n", set->set_completes,
102                set->set_count);
103         return set->set_completes == set->set_count;
104 }
105
106
107 void lov_update_set(struct lov_request_set *set,
108                     struct lov_request *req, int rc)
109 {
110         req->rq_complete = 1;
111         req->rq_rc = rc;
112
113         set->set_completes++;
114         if (rc == 0)
115                 set->set_success++;
116
117         cfs_waitq_signal(&set->set_waitq);
118 }
119
120 int lov_update_common_set(struct lov_request_set *set,
121                           struct lov_request *req, int rc)
122 {
123         struct lov_obd *lov = &set->set_exp->exp_obd->u.lov;
124         ENTRY;
125
126         lov_update_set(set, req, rc);
127
128         /* grace error on inactive ost */
129         if (rc && !(lov->lov_tgts[req->rq_idx] &&
130                     lov->lov_tgts[req->rq_idx]->ltd_active))
131                 rc = 0;
132
133         /* FIXME in raid1 regime, should return 0 */
134         RETURN(rc);
135 }
136
137 void lov_set_add_req(struct lov_request *req, struct lov_request_set *set)
138 {
139         cfs_list_add_tail(&req->rq_link, &set->set_list);
140         set->set_count++;
141         req->rq_rqset = set;
142 }
143
/*
 * Declared locally instead of via a header; the definition is not in
 * this file (NOTE(review): presumably provided by the OSC layer --
 * confirm).  Called from lov_update_enqueue_set() with the per-stripe
 * lock handle, the stripe's lov_oinfo, the enqueue flags, the LVB copy
 * held by the sub-request, the lock mode and the sub-request result.
 */
extern void osc_update_enqueue(struct lustre_handle *lov_lockhp,
                               struct lov_oinfo *loi, int flags,
                               struct ost_lvb *lvb, __u32 mode, int rc);
147
148 static int lov_update_enqueue_lov(struct obd_export *exp,
149                                   struct lustre_handle *lov_lockhp,
150                                   struct lov_oinfo *loi, int flags, int idx,
151                                   __u64 oid, int rc)
152 {
153         struct lov_obd *lov = &exp->exp_obd->u.lov;
154
155         if (rc != ELDLM_OK &&
156             !(rc == ELDLM_LOCK_ABORTED && (flags & LDLM_FL_HAS_INTENT))) {
157                 memset(lov_lockhp, 0, sizeof(*lov_lockhp));
158                 if (lov->lov_tgts[idx] && lov->lov_tgts[idx]->ltd_active) {
159                         /* -EUSERS used by OST to report file contention */
160                         if (rc != -EINTR && rc != -EUSERS)
161                                 CERROR("enqueue objid "LPX64" subobj "
162                                        LPX64" on OST idx %d: rc %d\n",
163                                        oid, loi->loi_id, loi->loi_ost_idx, rc);
164                 } else
165                         rc = ELDLM_OK;
166         }
167         return rc;
168 }
169
170 int lov_update_enqueue_set(struct lov_request *req, __u32 mode, int rc)
171 {
172         struct lov_request_set *set = req->rq_rqset;
173         struct lustre_handle *lov_lockhp;
174         struct obd_info *oi = set->set_oi;
175         struct lov_oinfo *loi;
176         ENTRY;
177
178         LASSERT(oi != NULL);
179
180         lov_lockhp = set->set_lockh->llh_handles + req->rq_stripe;
181         loi = oi->oi_md->lsm_oinfo[req->rq_stripe];
182
183         /* XXX LOV STACKING: OSC gets a copy, created in lov_prep_enqueue_set
184          * and that copy can be arbitrarily out of date.
185          *
186          * The LOV API is due for a serious rewriting anyways, and this
187          * can be addressed then. */
188
189         lov_stripe_lock(oi->oi_md);
190         osc_update_enqueue(lov_lockhp, loi, oi->oi_flags,
191                            &req->rq_oi.oi_md->lsm_oinfo[0]->loi_lvb, mode, rc);
192         if (rc == ELDLM_LOCK_ABORTED && (oi->oi_flags & LDLM_FL_HAS_INTENT))
193                 memset(lov_lockhp, 0, sizeof *lov_lockhp);
194         rc = lov_update_enqueue_lov(set->set_exp, lov_lockhp, loi, oi->oi_flags,
195                                     req->rq_idx, oi->oi_md->lsm_object_id, rc);
196         lov_stripe_unlock(oi->oi_md);
197         lov_update_set(set, req, rc);
198         RETURN(rc);
199 }
200
201 /* The callback for osc_enqueue that updates lov info for every OSC request. */
202 static int cb_update_enqueue(void *cookie, int rc)
203 {
204         struct obd_info *oinfo = cookie;
205         struct ldlm_enqueue_info *einfo;
206         struct lov_request *lovreq;
207
208         lovreq = container_of(oinfo, struct lov_request, rq_oi);
209         einfo = lovreq->rq_rqset->set_ei;
210         return lov_update_enqueue_set(lovreq, einfo->ei_mode, rc);
211 }
212
213 static int enqueue_done(struct lov_request_set *set, __u32 mode)
214 {
215         struct lov_request *req;
216         struct lov_obd *lov = &set->set_exp->exp_obd->u.lov;
217         int rc = 0;
218         ENTRY;
219
220         /* enqueue/match success, just return */
221         if (set->set_completes && set->set_completes == set->set_success)
222                 RETURN(0);
223
224         /* cancel enqueued/matched locks */
225         cfs_list_for_each_entry(req, &set->set_list, rq_link) {
226                 struct lustre_handle *lov_lockhp;
227
228                 if (!req->rq_complete || req->rq_rc)
229                         continue;
230
231                 lov_lockhp = set->set_lockh->llh_handles + req->rq_stripe;
232                 LASSERT(lov_lockhp);
233                 if (!lustre_handle_is_used(lov_lockhp))
234                         continue;
235
236                 rc = obd_cancel(lov->lov_tgts[req->rq_idx]->ltd_exp,
237                                 req->rq_oi.oi_md, mode, lov_lockhp);
238                 if (rc && lov->lov_tgts[req->rq_idx] &&
239                     lov->lov_tgts[req->rq_idx]->ltd_active)
240                         CERROR("cancelling obdjid "LPX64" on OST "
241                                "idx %d error: rc = %d\n",
242                                req->rq_oi.oi_md->lsm_object_id,
243                                req->rq_idx, rc);
244         }
245         if (set->set_lockh)
246                 lov_llh_put(set->set_lockh);
247         RETURN(rc);
248 }
249
250 int lov_fini_enqueue_set(struct lov_request_set *set, __u32 mode, int rc,
251                          struct ptlrpc_request_set *rqset)
252 {
253         int ret = 0;
254         ENTRY;
255
256         if (set == NULL)
257                 RETURN(0);
258         LASSERT(set->set_exp);
259         /* Do enqueue_done only for sync requests and if any request
260          * succeeded. */
261         if (!rqset) {
262                 if (rc)
263                         set->set_completes = 0;
264                 ret = enqueue_done(set, mode);
265         } else if (set->set_lockh)
266                 lov_llh_put(set->set_lockh);
267
268         lov_put_reqset(set);
269
270         RETURN(rc ? rc : ret);
271 }
272
/*
 * Build the request set for a striped lock enqueue.
 *
 * One lov_request is created for every stripe of oinfo->oi_md whose
 * extent intersects the requested lock extent and whose target is
 * present and active; stripes on missing/inactive targets are skipped.
 *
 * \param exp     LOV export
 * \param oinfo   top-level request info (md, policy, flags, lock handle)
 * \param einfo   enqueue info; its ei_mode is used on the error path
 * \param reqset  [out] the prepared set, written only on success
 *
 * \retval 0        success
 * \retval -ENOMEM  an allocation failed
 * \retval -EIO     no stripe produced a request
 */
int lov_prep_enqueue_set(struct obd_export *exp, struct obd_info *oinfo,
                         struct ldlm_enqueue_info *einfo,
                         struct lov_request_set **reqset)
{
        struct lov_obd *lov = &exp->exp_obd->u.lov;
        struct lov_request_set *set;
        int i, rc = 0;
        ENTRY;

        OBD_ALLOC(set, sizeof(*set));
        if (set == NULL)
                RETURN(-ENOMEM);
        lov_init_set(set);

        set->set_exp = exp;
        set->set_oi = oinfo;
        set->set_ei = einfo;
        set->set_lockh = lov_llh_new(oinfo->oi_md);
        if (set->set_lockh == NULL)
                GOTO(out_set, rc = -ENOMEM);
        /* Hand the cookie of the new LOV lock handle back to the caller. */
        oinfo->oi_lockh->cookie = set->set_lockh->llh_handle.h_cookie;

        for (i = 0; i < oinfo->oi_md->lsm_stripe_count; i++) {
                struct lov_oinfo *loi;
                struct lov_request *req;
                obd_off start, end;

                loi = oinfo->oi_md->lsm_oinfo[i];
                /* start/end receive the extent to use for this stripe. */
                if (!lov_stripe_intersects(oinfo->oi_md, i,
                                           oinfo->oi_policy.l_extent.start,
                                           oinfo->oi_policy.l_extent.end,
                                           &start, &end))
                        continue;

                /* Unlike lov_prep_match_set(), an inactive target is
                 * skipped here rather than failing the whole set. */
                if (!lov->lov_tgts[loi->loi_ost_idx] ||
                    !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
                        CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
                        continue;
                }

                OBD_ALLOC(req, sizeof(*req));
                if (req == NULL)
                        GOTO(out_set, rc = -ENOMEM);

                /* Single buffer holding, back to back: the sub-md header,
                 * one lsm_oinfo[] pointer slot, and the lov_oinfo that the
                 * slot is made to point at below. */
                req->rq_buflen = sizeof(*req->rq_oi.oi_md) +
                        sizeof(struct lov_oinfo *) +
                        sizeof(struct lov_oinfo);
                OBD_ALLOC(req->rq_oi.oi_md, req->rq_buflen);
                if (req->rq_oi.oi_md == NULL) {
                        /* req is not on the set list yet, free it here. */
                        OBD_FREE(req, sizeof(*req));
                        GOTO(out_set, rc = -ENOMEM);
                }
                req->rq_oi.oi_md->lsm_oinfo[0] =
                        ((void *)req->rq_oi.oi_md) + sizeof(*req->rq_oi.oi_md) +
                        sizeof(struct lov_oinfo *);

                /* Set lov request specific parameters. */
                req->rq_oi.oi_lockh = set->set_lockh->llh_handles + i;
                req->rq_oi.oi_cb_up = cb_update_enqueue;
                req->rq_oi.oi_flags = oinfo->oi_flags;

                LASSERT(req->rq_oi.oi_lockh);

                req->rq_oi.oi_policy.l_extent.gid =
                        oinfo->oi_policy.l_extent.gid;
                req->rq_oi.oi_policy.l_extent.start = start;
                req->rq_oi.oi_policy.l_extent.end = end;

                req->rq_idx = loi->loi_ost_idx;
                req->rq_stripe = i;

                /* XXX LOV STACKING: submd should be from the subobj */
                req->rq_oi.oi_md->lsm_object_id = loi->loi_id;
                req->rq_oi.oi_md->lsm_object_gr = oinfo->oi_md->lsm_object_gr;
                req->rq_oi.oi_md->lsm_stripe_count = 0;
                /* Snapshot of the stripe's KMS/LVB state for the sub-md. */
                req->rq_oi.oi_md->lsm_oinfo[0]->loi_kms_valid =
                        loi->loi_kms_valid;
                req->rq_oi.oi_md->lsm_oinfo[0]->loi_kms = loi->loi_kms;
                req->rq_oi.oi_md->lsm_oinfo[0]->loi_lvb = loi->loi_lvb;

                lov_set_add_req(req, set);
        }
        /* Nothing to enqueue: no intersecting stripe on an active target. */
        if (!set->set_count)
                GOTO(out_set, rc = -EIO);
        *reqset = set;
        RETURN(0);
out_set:
        /* rc is non-zero here, so the fini path releases the set. */
        lov_fini_enqueue_set(set, einfo->ei_mode, rc, NULL);
        RETURN(rc);
}
363
364 int lov_update_match_set(struct lov_request_set *set, struct lov_request *req,
365                          int rc)
366 {
367         int ret = rc;
368         ENTRY;
369
370         if (rc > 0)
371                 ret = 0;
372         else if (rc == 0)
373                 ret = 1;
374         lov_update_set(set, req, ret);
375         RETURN(rc);
376 }
377
378 int lov_fini_match_set(struct lov_request_set *set, __u32 mode, int flags)
379 {
380         int rc = 0;
381         ENTRY;
382
383         if (set == NULL)
384                 RETURN(0);
385         LASSERT(set->set_exp);
386         rc = enqueue_done(set, mode);
387         if ((set->set_count == set->set_success) &&
388             (flags & LDLM_FL_TEST_LOCK))
389                 lov_llh_put(set->set_lockh);
390
391         lov_put_reqset(set);
392
393         RETURN(rc);
394 }
395
396 int lov_prep_match_set(struct obd_export *exp, struct obd_info *oinfo,
397                        struct lov_stripe_md *lsm, ldlm_policy_data_t *policy,
398                        __u32 mode, struct lustre_handle *lockh,
399                        struct lov_request_set **reqset)
400 {
401         struct lov_obd *lov = &exp->exp_obd->u.lov;
402         struct lov_request_set *set;
403         int i, rc = 0;
404         ENTRY;
405
406         OBD_ALLOC(set, sizeof(*set));
407         if (set == NULL)
408                 RETURN(-ENOMEM);
409         lov_init_set(set);
410
411         set->set_exp = exp;
412         set->set_oi = oinfo;
413         set->set_oi->oi_md = lsm;
414         set->set_lockh = lov_llh_new(lsm);
415         if (set->set_lockh == NULL)
416                 GOTO(out_set, rc = -ENOMEM);
417         lockh->cookie = set->set_lockh->llh_handle.h_cookie;
418
419         for (i = 0; i < lsm->lsm_stripe_count; i++){
420                 struct lov_oinfo *loi;
421                 struct lov_request *req;
422                 obd_off start, end;
423
424                 loi = lsm->lsm_oinfo[i];
425                 if (!lov_stripe_intersects(lsm, i, policy->l_extent.start,
426                                            policy->l_extent.end, &start, &end))
427                         continue;
428
429                 /* FIXME raid1 should grace this error */
430                 if (!lov->lov_tgts[loi->loi_ost_idx] ||
431                     !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
432                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
433                         GOTO(out_set, rc = -EIO);
434                 }
435
436                 OBD_ALLOC(req, sizeof(*req));
437                 if (req == NULL)
438                         GOTO(out_set, rc = -ENOMEM);
439
440                 req->rq_buflen = sizeof(*req->rq_oi.oi_md);
441                 OBD_ALLOC(req->rq_oi.oi_md, req->rq_buflen);
442                 if (req->rq_oi.oi_md == NULL) {
443                         OBD_FREE(req, sizeof(*req));
444                         GOTO(out_set, rc = -ENOMEM);
445                 }
446
447                 req->rq_oi.oi_policy.l_extent.start = start;
448                 req->rq_oi.oi_policy.l_extent.end = end;
449                 req->rq_oi.oi_policy.l_extent.gid = policy->l_extent.gid;
450
451                 req->rq_idx = loi->loi_ost_idx;
452                 req->rq_stripe = i;
453
454                 /* XXX LOV STACKING: submd should be from the subobj */
455                 req->rq_oi.oi_md->lsm_object_id = loi->loi_id;
456                 req->rq_oi.oi_md->lsm_object_gr = lsm->lsm_object_gr;
457                 req->rq_oi.oi_md->lsm_stripe_count = 0;
458
459                 lov_set_add_req(req, set);
460         }
461         if (!set->set_count)
462                 GOTO(out_set, rc = -EIO);
463         *reqset = set;
464         RETURN(rc);
465 out_set:
466         lov_fini_match_set(set, mode, 0);
467         RETURN(rc);
468 }
469
470 int lov_fini_cancel_set(struct lov_request_set *set)
471 {
472         int rc = 0;
473         ENTRY;
474
475         if (set == NULL)
476                 RETURN(0);
477
478         LASSERT(set->set_exp);
479         if (set->set_lockh)
480                 lov_llh_put(set->set_lockh);
481
482         lov_put_reqset(set);
483
484         RETURN(rc);
485 }
486
487 int lov_prep_cancel_set(struct obd_export *exp, struct obd_info *oinfo,
488                         struct lov_stripe_md *lsm, __u32 mode,
489                         struct lustre_handle *lockh,
490                         struct lov_request_set **reqset)
491 {
492         struct lov_request_set *set;
493         int i, rc = 0;
494         ENTRY;
495
496         OBD_ALLOC(set, sizeof(*set));
497         if (set == NULL)
498                 RETURN(-ENOMEM);
499         lov_init_set(set);
500
501         set->set_exp = exp;
502         set->set_oi = oinfo;
503         set->set_oi->oi_md = lsm;
504         set->set_lockh = lov_handle2llh(lockh);
505         if (set->set_lockh == NULL) {
506                 CERROR("LOV: invalid lov lock handle %p\n", lockh);
507                 GOTO(out_set, rc = -EINVAL);
508         }
509         lockh->cookie = set->set_lockh->llh_handle.h_cookie;
510
511         for (i = 0; i < lsm->lsm_stripe_count; i++){
512                 struct lov_request *req;
513                 struct lustre_handle *lov_lockhp;
514                 struct lov_oinfo *loi = lsm->lsm_oinfo[i];
515
516                 lov_lockhp = set->set_lockh->llh_handles + i;
517                 if (!lustre_handle_is_used(lov_lockhp)) {
518                         CDEBUG(D_INFO, "lov idx %d subobj "LPX64" no lock\n",
519                                loi->loi_ost_idx, loi->loi_id);
520                         continue;
521                 }
522
523                 OBD_ALLOC(req, sizeof(*req));
524                 if (req == NULL)
525                         GOTO(out_set, rc = -ENOMEM);
526
527                 req->rq_buflen = sizeof(*req->rq_oi.oi_md);
528                 OBD_ALLOC(req->rq_oi.oi_md, req->rq_buflen);
529                 if (req->rq_oi.oi_md == NULL) {
530                         OBD_FREE(req, sizeof(*req));
531                         GOTO(out_set, rc = -ENOMEM);
532                 }
533
534                 req->rq_idx = loi->loi_ost_idx;
535                 req->rq_stripe = i;
536
537                 /* XXX LOV STACKING: submd should be from the subobj */
538                 req->rq_oi.oi_md->lsm_object_id = loi->loi_id;
539                 req->rq_oi.oi_md->lsm_object_gr = lsm->lsm_object_gr;
540                 req->rq_oi.oi_md->lsm_stripe_count = 0;
541
542                 lov_set_add_req(req, set);
543         }
544         if (!set->set_count)
545                 GOTO(out_set, rc = -EIO);
546         *reqset = set;
547         RETURN(rc);
548 out_set:
549         lov_fini_cancel_set(set);
550         RETURN(rc);
551 }
552
553 static int create_done(struct obd_export *exp, struct lov_request_set *set,
554                        struct lov_stripe_md **lsmp)
555 {
556         struct lov_obd *lov = &exp->exp_obd->u.lov;
557         struct obd_trans_info *oti = set->set_oti;
558         struct obdo *src_oa = set->set_oi->oi_oa;
559         struct lov_request *req;
560         struct obdo *ret_oa = NULL;
561         int attrset = 0, rc = 0;
562         ENTRY;
563
564         LASSERT(set->set_completes);
565
566         /* try alloc objects on other osts if osc_create fails for
567          * exceptions: RPC failure, ENOSPC, etc */
568         if (set->set_count != set->set_success) {
569                 cfs_list_for_each_entry (req, &set->set_list, rq_link) {
570                         if (req->rq_rc == 0)
571                                 continue;
572
573                         set->set_completes--;
574                         req->rq_complete = 0;
575
576                         rc = qos_remedy_create(set, req);
577                         lov_update_create_set(set, req, rc);
578                 }
579         }
580
581         /* no successful creates */
582         if (set->set_success == 0)
583                 GOTO(cleanup, rc);
584
585         if (set->set_count != set->set_success) {
586                 set->set_count = set->set_success;
587                 qos_shrink_lsm(set);
588         }
589
590         OBDO_ALLOC(ret_oa);
591         if (ret_oa == NULL)
592                 GOTO(cleanup, rc = -ENOMEM);
593
594         cfs_list_for_each_entry(req, &set->set_list, rq_link) {
595                 if (!req->rq_complete || req->rq_rc)
596                         continue;
597                 lov_merge_attrs(ret_oa, req->rq_oi.oi_oa,
598                                 req->rq_oi.oi_oa->o_valid, set->set_oi->oi_md,
599                                 req->rq_stripe, &attrset);
600         }
601         if (src_oa->o_valid & OBD_MD_FLSIZE &&
602             ret_oa->o_size != src_oa->o_size) {
603                 CERROR("original size "LPU64" isn't new object size "LPU64"\n",
604                        src_oa->o_size, ret_oa->o_size);
605                 LBUG();
606         }
607         ret_oa->o_id = src_oa->o_id;
608         ret_oa->o_gr = src_oa->o_gr;
609         ret_oa->o_valid |= OBD_MD_FLID | OBD_MD_FLGROUP;
610         memcpy(src_oa, ret_oa, sizeof(*src_oa));
611         OBDO_FREE(ret_oa);
612
613         *lsmp = set->set_oi->oi_md;
614         GOTO(done, rc = 0);
615
616 cleanup:
617         cfs_list_for_each_entry(req, &set->set_list, rq_link) {
618                 struct obd_export *sub_exp;
619                 int err = 0;
620
621                 if (!req->rq_complete || req->rq_rc)
622                         continue;
623
624                 sub_exp = lov->lov_tgts[req->rq_idx]->ltd_exp;
625                 err = obd_destroy(sub_exp, req->rq_oi.oi_oa, NULL, oti, NULL,
626                                   NULL);
627                 if (err)
628                         CERROR("Failed to uncreate objid "LPX64" subobj "
629                                LPX64" on OST idx %d: rc = %d\n",
630                                src_oa->o_id, req->rq_oi.oi_oa->o_id,
631                                req->rq_idx, rc);
632         }
633         if (*lsmp == NULL)
634                 obd_free_memmd(exp, &set->set_oi->oi_md);
635 done:
636         if (oti && set->set_cookies) {
637                 oti->oti_logcookies = set->set_cookies;
638                 if (!set->set_cookie_sent) {
639                         oti_free_cookies(oti);
640                         src_oa->o_valid &= ~OBD_MD_FLCOOKIE;
641                 } else {
642                         src_oa->o_valid |= OBD_MD_FLCOOKIE;
643                 }
644         }
645         RETURN(rc);
646 }
647
648 int lov_fini_create_set(struct lov_request_set *set,struct lov_stripe_md **lsmp)
649 {
650         int rc = 0;
651         ENTRY;
652
653         if (set == NULL)
654                 RETURN(0);
655         LASSERT(set->set_exp);
656         if (set->set_completes)
657                 rc = create_done(set->set_exp, set, lsmp);
658
659         lov_put_reqset(set);
660         RETURN(rc);
661 }
662
663 int lov_update_create_set(struct lov_request_set *set,
664                           struct lov_request *req, int rc)
665 {
666         struct obd_trans_info *oti = set->set_oti;
667         struct lov_stripe_md *lsm = set->set_oi->oi_md;
668         struct lov_oinfo *loi;
669         struct lov_obd *lov = &set->set_exp->exp_obd->u.lov;
670         ENTRY;
671
672         if (rc && lov->lov_tgts[req->rq_idx] &&
673             lov->lov_tgts[req->rq_idx]->ltd_active) {
674                 CERROR("error creating fid "LPX64" sub-object"
675                        " on OST idx %d/%d: rc = %d\n",
676                        set->set_oi->oi_oa->o_id, req->rq_idx,
677                        lsm->lsm_stripe_count, rc);
678                 if (rc > 0) {
679                         CERROR("obd_create returned invalid err %d\n", rc);
680                         rc = -EIO;
681                 }
682         }
683
684         cfs_spin_lock(&set->set_lock);
685         req->rq_stripe = set->set_success;
686         loi = lsm->lsm_oinfo[req->rq_stripe];
687         if (rc) {
688                 lov_update_set(set, req, rc);
689                 cfs_spin_unlock(&set->set_lock);
690                 RETURN(rc);
691         }
692
693         loi->loi_id = req->rq_oi.oi_oa->o_id;
694         loi->loi_gr = req->rq_oi.oi_oa->o_gr;
695         loi->loi_ost_idx = req->rq_idx;
696         loi_init(loi);
697
698         if (oti && set->set_cookies)
699                 ++oti->oti_logcookies;
700         if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLCOOKIE)
701                 set->set_cookie_sent++;
702
703         lov_update_set(set, req, rc);
704         cfs_spin_unlock(&set->set_lock);
705
706         CDEBUG(D_INODE, "objid "LPX64" has subobj "LPX64"/"LPU64" at idx %d\n",
707                lsm->lsm_object_id, loi->loi_id, loi->loi_id, req->rq_idx);
708         RETURN(rc);
709 }
710
711 int cb_create_update(void *cookie, int rc)
712 {
713         struct obd_info *oinfo = cookie;
714         struct lov_request *lovreq;
715
716         lovreq = container_of(oinfo, struct lov_request, rq_oi);
717         rc= lov_update_create_set(lovreq->rq_rqset, lovreq, rc);
718         if (lov_finished_set(lovreq->rq_rqset))
719                 lov_put_reqset(lovreq->rq_rqset);
720         return rc;
721 }
722
723
724 int lov_prep_create_set(struct obd_export *exp, struct obd_info *oinfo,
725                         struct lov_stripe_md **lsmp, struct obdo *src_oa,
726                         struct obd_trans_info *oti,
727                         struct lov_request_set **reqset)
728 {
729         struct lov_request_set *set;
730         int rc = 0;
731         ENTRY;
732
733         OBD_ALLOC(set, sizeof(*set));
734         if (set == NULL)
735                 RETURN(-ENOMEM);
736         lov_init_set(set);
737
738         set->set_exp = exp;
739         set->set_oi = oinfo;
740         set->set_oi->oi_md = *lsmp;
741         set->set_oi->oi_oa = src_oa;
742         set->set_oti = oti;
743         lov_get_reqset(set);
744
745         rc = qos_prep_create(exp, set);
746         /* qos_shrink_lsm() may have allocated a new lsm */
747         *lsmp = oinfo->oi_md;
748         if (rc) {
749                 lov_fini_create_set(set, lsmp);
750                 lov_put_reqset(set);
751         } else {
752                 *reqset = set;
753         }
754         RETURN(rc);
755 }
756
757 static int common_attr_done(struct lov_request_set *set)
758 {
759         cfs_list_t *pos;
760         struct lov_request *req;
761         struct obdo *tmp_oa;
762         int rc = 0, attrset = 0;
763         ENTRY;
764
765         LASSERT(set->set_oi != NULL);
766
767         if (set->set_oi->oi_oa == NULL)
768                 RETURN(0);
769
770         if (!set->set_success)
771                 RETURN(-EIO);
772
773         OBDO_ALLOC(tmp_oa);
774         if (tmp_oa == NULL)
775                 GOTO(out, rc = -ENOMEM);
776
777         cfs_list_for_each (pos, &set->set_list) {
778                 req = cfs_list_entry(pos, struct lov_request, rq_link);
779
780                 if (!req->rq_complete || req->rq_rc)
781                         continue;
782                 if (req->rq_oi.oi_oa->o_valid == 0)   /* inactive stripe */
783                         continue;
784                 lov_merge_attrs(tmp_oa, req->rq_oi.oi_oa,
785                                 req->rq_oi.oi_oa->o_valid,
786                                 set->set_oi->oi_md, req->rq_stripe, &attrset);
787         }
788         if (!attrset) {
789                 CERROR("No stripes had valid attrs\n");
790                 rc = -EIO;
791         }
792         if ((set->set_oi->oi_oa->o_valid & OBD_MD_FLEPOCH) &&
793             (set->set_oi->oi_md->lsm_stripe_count != attrset)) {
794                 /* When we take attributes of some epoch, we require all the
795                  * ost to be active. */
796                 CERROR("Not all the stripes had valid attrs\n");
797                 GOTO(out, rc = -EIO);
798         }
799
800         tmp_oa->o_id = set->set_oi->oi_oa->o_id;
801         memcpy(set->set_oi->oi_oa, tmp_oa, sizeof(*set->set_oi->oi_oa));
802 out:
803         if (tmp_oa)
804                 OBDO_FREE(tmp_oa);
805         RETURN(rc);
806
807 }
808
809 static int brw_done(struct lov_request_set *set)
810 {
811         struct lov_stripe_md *lsm = set->set_oi->oi_md;
812         struct lov_oinfo     *loi = NULL;
813         cfs_list_t *pos;
814         struct lov_request *req;
815         ENTRY;
816
817         cfs_list_for_each (pos, &set->set_list) {
818                 req = cfs_list_entry(pos, struct lov_request, rq_link);
819
820                 if (!req->rq_complete || req->rq_rc)
821                         continue;
822
823                 loi = lsm->lsm_oinfo[req->rq_stripe];
824
825                 if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLBLOCKS)
826                         loi->loi_lvb.lvb_blocks = req->rq_oi.oi_oa->o_blocks;
827         }
828
829         RETURN(0);
830 }
831
832 int lov_fini_brw_set(struct lov_request_set *set)
833 {
834         int rc = 0;
835         ENTRY;
836
837         if (set == NULL)
838                 RETURN(0);
839         LASSERT(set->set_exp);
840         if (set->set_completes) {
841                 rc = brw_done(set);
842                 /* FIXME update qos data here */
843         }
844         lov_put_reqset(set);
845
846         RETURN(rc);
847 }
848
849 int lov_prep_brw_set(struct obd_export *exp, struct obd_info *oinfo,
850                      obd_count oa_bufs, struct brw_page *pga,
851                      struct obd_trans_info *oti,
852                      struct lov_request_set **reqset)
853 {
854         struct {
855                 obd_count       index;
856                 obd_count       count;
857                 obd_count       off;
858         } *info = NULL;
859         struct lov_request_set *set;
860         struct lov_obd *lov = &exp->exp_obd->u.lov;
861         int rc = 0, i, shift;
862         ENTRY;
863
864         OBD_ALLOC(set, sizeof(*set));
865         if (set == NULL)
866                 RETURN(-ENOMEM);
867         lov_init_set(set);
868
869         set->set_exp = exp;
870         set->set_oti = oti;
871         set->set_oi = oinfo;
872         set->set_oabufs = oa_bufs;
873         OBD_ALLOC(set->set_pga, oa_bufs * sizeof(*set->set_pga));
874         if (!set->set_pga)
875                 GOTO(out, rc = -ENOMEM);
876
877         OBD_ALLOC(info, sizeof(*info) * oinfo->oi_md->lsm_stripe_count);
878         if (!info)
879                 GOTO(out, rc = -ENOMEM);
880
881         /* calculate the page count for each stripe */
882         for (i = 0; i < oa_bufs; i++) {
883                 int stripe = lov_stripe_number(oinfo->oi_md, pga[i].off);
884                 info[stripe].count++;
885         }
886
887         /* alloc and initialize lov request */
888         shift = 0;
889         for (i = 0; i < oinfo->oi_md->lsm_stripe_count; i++){
890                 struct lov_oinfo *loi = NULL;
891                 struct lov_request *req;
892
893                 if (info[i].count == 0)
894                         continue;
895
896                 loi = oinfo->oi_md->lsm_oinfo[i];
897                 if (!lov->lov_tgts[loi->loi_ost_idx] ||
898                     !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
899                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
900                         GOTO(out, rc = -EIO);
901                 }
902
903                 OBD_ALLOC(req, sizeof(*req));
904                 if (req == NULL)
905                         GOTO(out, rc = -ENOMEM);
906
907                 OBDO_ALLOC(req->rq_oi.oi_oa);
908                 if (req->rq_oi.oi_oa == NULL) {
909                         OBD_FREE(req, sizeof(*req));
910                         GOTO(out, rc = -ENOMEM);
911                 }
912
913                 if (oinfo->oi_oa) {
914                         memcpy(req->rq_oi.oi_oa, oinfo->oi_oa,
915                                sizeof(*req->rq_oi.oi_oa));
916                 }
917                 req->rq_oi.oi_oa->o_id = loi->loi_id;
918                 req->rq_oi.oi_oa->o_stripe_idx = i;
919
920                 req->rq_buflen = sizeof(*req->rq_oi.oi_md);
921                 OBD_ALLOC(req->rq_oi.oi_md, req->rq_buflen);
922                 if (req->rq_oi.oi_md == NULL) {
923                         OBDO_FREE(req->rq_oi.oi_oa);
924                         OBD_FREE(req, sizeof(*req));
925                         GOTO(out, rc = -ENOMEM);
926                 }
927
928                 req->rq_idx = loi->loi_ost_idx;
929                 req->rq_stripe = i;
930
931                 /* XXX LOV STACKING */
932                 req->rq_oi.oi_md->lsm_object_id = loi->loi_id;
933                 req->rq_oi.oi_md->lsm_object_gr = oinfo->oi_md->lsm_object_gr;
934                 req->rq_oabufs = info[i].count;
935                 req->rq_pgaidx = shift;
936                 shift += req->rq_oabufs;
937
938                 /* remember the index for sort brw_page array */
939                 info[i].index = req->rq_pgaidx;
940
941                 req->rq_oi.oi_capa = oinfo->oi_capa;
942
943                 lov_set_add_req(req, set);
944         }
945         if (!set->set_count)
946                 GOTO(out, rc = -EIO);
947
948         /* rotate & sort the brw_page array */
949         for (i = 0; i < oa_bufs; i++) {
950                 int stripe = lov_stripe_number(oinfo->oi_md, pga[i].off);
951
952                 shift = info[stripe].index + info[stripe].off;
953                 LASSERT(shift < oa_bufs);
954                 set->set_pga[shift] = pga[i];
955                 lov_stripe_offset(oinfo->oi_md, pga[i].off, stripe,
956                                   &set->set_pga[shift].off);
957                 info[stripe].off++;
958         }
959 out:
960         if (info)
961                 OBD_FREE(info, sizeof(*info) * oinfo->oi_md->lsm_stripe_count);
962
963         if (rc == 0)
964                 *reqset = set;
965         else
966                 lov_fini_brw_set(set);
967
968         RETURN(rc);
969 }
970
971 int lov_fini_getattr_set(struct lov_request_set *set)
972 {
973         int rc = 0;
974         ENTRY;
975
976         if (set == NULL)
977                 RETURN(0);
978         LASSERT(set->set_exp);
979         if (set->set_completes)
980                 rc = common_attr_done(set);
981
982         lov_put_reqset(set);
983
984         RETURN(rc);
985 }
986
987 /* The callback for osc_getattr_async that finilizes a request info when a
988  * response is received. */
989 static int cb_getattr_update(void *cookie, int rc)
990 {
991         struct obd_info *oinfo = cookie;
992         struct lov_request *lovreq;
993         lovreq = container_of(oinfo, struct lov_request, rq_oi);
994         return lov_update_common_set(lovreq->rq_rqset, lovreq, rc);
995 }
996
997 int lov_prep_getattr_set(struct obd_export *exp, struct obd_info *oinfo,
998                          struct lov_request_set **reqset)
999 {
1000         struct lov_request_set *set;
1001         struct lov_obd *lov = &exp->exp_obd->u.lov;
1002         int rc = 0, i;
1003         ENTRY;
1004
1005         OBD_ALLOC(set, sizeof(*set));
1006         if (set == NULL)
1007                 RETURN(-ENOMEM);
1008         lov_init_set(set);
1009
1010         set->set_exp = exp;
1011         set->set_oi = oinfo;
1012
1013         for (i = 0; i < oinfo->oi_md->lsm_stripe_count; i++) {
1014                 struct lov_oinfo *loi;
1015                 struct lov_request *req;
1016
1017                 loi = oinfo->oi_md->lsm_oinfo[i];
1018                 if (!lov->lov_tgts[loi->loi_ost_idx] ||
1019                     !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
1020                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1021                         if (oinfo->oi_oa->o_valid & OBD_MD_FLEPOCH)
1022                                 /* SOM requires all the OSTs to be active. */
1023                                 GOTO(out_set, rc = -EIO);
1024                         continue;
1025                 }
1026
1027                 OBD_ALLOC(req, sizeof(*req));
1028                 if (req == NULL)
1029                         GOTO(out_set, rc = -ENOMEM);
1030
1031                 req->rq_stripe = i;
1032                 req->rq_idx = loi->loi_ost_idx;
1033
1034                 OBDO_ALLOC(req->rq_oi.oi_oa);
1035                 if (req->rq_oi.oi_oa == NULL) {
1036                         OBD_FREE(req, sizeof(*req));
1037                         GOTO(out_set, rc = -ENOMEM);
1038                 }
1039                 memcpy(req->rq_oi.oi_oa, oinfo->oi_oa,
1040                        sizeof(*req->rq_oi.oi_oa));
1041                 req->rq_oi.oi_oa->o_id = loi->loi_id;
1042                 req->rq_oi.oi_cb_up = cb_getattr_update;
1043                 req->rq_oi.oi_capa = oinfo->oi_capa;
1044
1045                 lov_set_add_req(req, set);
1046         }
1047         if (!set->set_count)
1048                 GOTO(out_set, rc = -EIO);
1049         *reqset = set;
1050         RETURN(rc);
1051 out_set:
1052         lov_fini_getattr_set(set);
1053         RETURN(rc);
1054 }
1055
1056 int lov_fini_destroy_set(struct lov_request_set *set)
1057 {
1058         ENTRY;
1059
1060         if (set == NULL)
1061                 RETURN(0);
1062         LASSERT(set->set_exp);
1063         if (set->set_completes) {
1064                 /* FIXME update qos data here */
1065         }
1066
1067         lov_put_reqset(set);
1068
1069         RETURN(0);
1070 }
1071
1072 int lov_prep_destroy_set(struct obd_export *exp, struct obd_info *oinfo,
1073                          struct obdo *src_oa, struct lov_stripe_md *lsm,
1074                          struct obd_trans_info *oti,
1075                          struct lov_request_set **reqset)
1076 {
1077         struct lov_request_set *set;
1078         struct lov_obd *lov = &exp->exp_obd->u.lov;
1079         int rc = 0, i;
1080         ENTRY;
1081
1082         OBD_ALLOC(set, sizeof(*set));
1083         if (set == NULL)
1084                 RETURN(-ENOMEM);
1085         lov_init_set(set);
1086
1087         set->set_exp = exp;
1088         set->set_oi = oinfo;
1089         set->set_oi->oi_md = lsm;
1090         set->set_oi->oi_oa = src_oa;
1091         set->set_oti = oti;
1092         if (oti != NULL && src_oa->o_valid & OBD_MD_FLCOOKIE)
1093                 set->set_cookies = oti->oti_logcookies;
1094
1095         for (i = 0; i < lsm->lsm_stripe_count; i++) {
1096                 struct lov_oinfo *loi;
1097                 struct lov_request *req;
1098
1099                 loi = lsm->lsm_oinfo[i];
1100                 if (!lov->lov_tgts[loi->loi_ost_idx] ||
1101                     !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
1102                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1103                         continue;
1104                 }
1105
1106                 OBD_ALLOC(req, sizeof(*req));
1107                 if (req == NULL)
1108                         GOTO(out_set, rc = -ENOMEM);
1109
1110                 req->rq_stripe = i;
1111                 req->rq_idx = loi->loi_ost_idx;
1112
1113                 OBDO_ALLOC(req->rq_oi.oi_oa);
1114                 if (req->rq_oi.oi_oa == NULL) {
1115                         OBD_FREE(req, sizeof(*req));
1116                         GOTO(out_set, rc = -ENOMEM);
1117                 }
1118                 memcpy(req->rq_oi.oi_oa, src_oa, sizeof(*req->rq_oi.oi_oa));
1119                 req->rq_oi.oi_oa->o_id = loi->loi_id;
1120                 lov_set_add_req(req, set);
1121         }
1122         if (!set->set_count)
1123                 GOTO(out_set, rc = -EIO);
1124         *reqset = set;
1125         RETURN(rc);
1126 out_set:
1127         lov_fini_destroy_set(set);
1128         RETURN(rc);
1129 }
1130
1131 int lov_fini_setattr_set(struct lov_request_set *set)
1132 {
1133         int rc = 0;
1134         ENTRY;
1135
1136         if (set == NULL)
1137                 RETURN(0);
1138         LASSERT(set->set_exp);
1139         if (set->set_completes) {
1140                 rc = common_attr_done(set);
1141                 /* FIXME update qos data here */
1142         }
1143
1144         lov_put_reqset(set);
1145         RETURN(rc);
1146 }
1147
1148 int lov_update_setattr_set(struct lov_request_set *set,
1149                            struct lov_request *req, int rc)
1150 {
1151         struct lov_obd *lov = &req->rq_rqset->set_exp->exp_obd->u.lov;
1152         struct lov_stripe_md *lsm = req->rq_rqset->set_oi->oi_md;
1153         ENTRY;
1154
1155         lov_update_set(set, req, rc);
1156
1157         /* grace error on inactive ost */
1158         if (rc && !(lov->lov_tgts[req->rq_idx] &&
1159                     lov->lov_tgts[req->rq_idx]->ltd_active))
1160                 rc = 0;
1161
1162         if (rc == 0) {
1163                 if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLCTIME)
1164                         lsm->lsm_oinfo[req->rq_stripe]->loi_lvb.lvb_ctime =
1165                                 req->rq_oi.oi_oa->o_ctime;
1166                 if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLMTIME)
1167                         lsm->lsm_oinfo[req->rq_stripe]->loi_lvb.lvb_mtime =
1168                                 req->rq_oi.oi_oa->o_mtime;
1169                 if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLATIME)
1170                         lsm->lsm_oinfo[req->rq_stripe]->loi_lvb.lvb_atime =
1171                                 req->rq_oi.oi_oa->o_atime;
1172         }
1173
1174         RETURN(rc);
1175 }
1176
1177 /* The callback for osc_setattr_async that finilizes a request info when a
1178  * response is received. */
1179 static int cb_setattr_update(void *cookie, int rc)
1180 {
1181         struct obd_info *oinfo = cookie;
1182         struct lov_request *lovreq;
1183         lovreq = container_of(oinfo, struct lov_request, rq_oi);
1184         return lov_update_setattr_set(lovreq->rq_rqset, lovreq, rc);
1185 }
1186
1187 int lov_prep_setattr_set(struct obd_export *exp, struct obd_info *oinfo,
1188                          struct obd_trans_info *oti,
1189                          struct lov_request_set **reqset)
1190 {
1191         struct lov_request_set *set;
1192         struct lov_obd *lov = &exp->exp_obd->u.lov;
1193         int rc = 0, i;
1194         ENTRY;
1195
1196         OBD_ALLOC(set, sizeof(*set));
1197         if (set == NULL)
1198                 RETURN(-ENOMEM);
1199         lov_init_set(set);
1200
1201         set->set_exp = exp;
1202         set->set_oti = oti;
1203         set->set_oi = oinfo;
1204         if (oti != NULL && oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE)
1205                 set->set_cookies = oti->oti_logcookies;
1206
1207         for (i = 0; i < oinfo->oi_md->lsm_stripe_count; i++) {
1208                 struct lov_oinfo *loi = oinfo->oi_md->lsm_oinfo[i];
1209                 struct lov_request *req;
1210
1211                 if (!lov->lov_tgts[loi->loi_ost_idx] ||
1212                     !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
1213                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1214                         continue;
1215                 }
1216
1217                 OBD_ALLOC(req, sizeof(*req));
1218                 if (req == NULL)
1219                         GOTO(out_set, rc = -ENOMEM);
1220                 req->rq_stripe = i;
1221                 req->rq_idx = loi->loi_ost_idx;
1222
1223                 OBDO_ALLOC(req->rq_oi.oi_oa);
1224                 if (req->rq_oi.oi_oa == NULL) {
1225                         OBD_FREE(req, sizeof(*req));
1226                         GOTO(out_set, rc = -ENOMEM);
1227                 }
1228                 memcpy(req->rq_oi.oi_oa, oinfo->oi_oa,
1229                        sizeof(*req->rq_oi.oi_oa));
1230                 req->rq_oi.oi_oa->o_id = loi->loi_id;
1231                 req->rq_oi.oi_oa->o_stripe_idx = i;
1232                 req->rq_oi.oi_cb_up = cb_setattr_update;
1233                 req->rq_oi.oi_capa = oinfo->oi_capa;
1234
1235                 if (oinfo->oi_oa->o_valid & OBD_MD_FLSIZE) {
1236                         int off = lov_stripe_offset(oinfo->oi_md,
1237                                                     oinfo->oi_oa->o_size, i,
1238                                                     &req->rq_oi.oi_oa->o_size);
1239
1240                         if (off < 0 && req->rq_oi.oi_oa->o_size)
1241                                 req->rq_oi.oi_oa->o_size--;
1242
1243                         CDEBUG(D_INODE, "stripe %d has size "LPU64"/"LPU64"\n",
1244                                i, req->rq_oi.oi_oa->o_size,
1245                                oinfo->oi_oa->o_size);
1246                 }
1247                 lov_set_add_req(req, set);
1248         }
1249         if (!set->set_count)
1250                 GOTO(out_set, rc = -EIO);
1251         *reqset = set;
1252         RETURN(rc);
1253 out_set:
1254         lov_fini_setattr_set(set);
1255         RETURN(rc);
1256 }
1257
1258 int lov_fini_punch_set(struct lov_request_set *set)
1259 {
1260         int rc = 0;
1261         ENTRY;
1262
1263         if (set == NULL)
1264                 RETURN(0);
1265         LASSERT(set->set_exp);
1266         if (set->set_completes) {
1267                 rc = -EIO;
1268                 /* FIXME update qos data here */
1269                 if (set->set_success)
1270                         rc = common_attr_done(set);
1271         }
1272
1273         lov_put_reqset(set);
1274
1275         RETURN(rc);
1276 }
1277
1278 int lov_update_punch_set(struct lov_request_set *set,
1279                          struct lov_request *req, int rc)
1280 {
1281         struct lov_obd *lov = &req->rq_rqset->set_exp->exp_obd->u.lov;
1282         struct lov_stripe_md *lsm = req->rq_rqset->set_oi->oi_md;
1283         ENTRY;
1284
1285         lov_update_set(set, req, rc);
1286
1287         /* grace error on inactive ost */
1288         if (rc && !lov->lov_tgts[req->rq_idx]->ltd_active)
1289                 rc = 0;
1290
1291         if (rc == 0) {
1292                 lov_stripe_lock(lsm);
1293                 if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLBLOCKS) {
1294                         lsm->lsm_oinfo[req->rq_stripe]->loi_lvb.lvb_blocks =
1295                                 req->rq_oi.oi_oa->o_blocks;
1296                 }
1297
1298                 /* Do we need to update lvb_size here? It needn't because
1299                  * it have been done in ll_truncate(). -jay */
1300                 lov_stripe_unlock(lsm);
1301         }
1302
1303         RETURN(rc);
1304 }
1305
1306 /* The callback for osc_punch that finilizes a request info when a response
1307  * is received. */
1308 static int cb_update_punch(void *cookie, int rc)
1309 {
1310         struct obd_info *oinfo = cookie;
1311         struct lov_request *lovreq;
1312         lovreq = container_of(oinfo, struct lov_request, rq_oi);
1313         return lov_update_punch_set(lovreq->rq_rqset, lovreq, rc);
1314 }
1315
1316 int lov_prep_punch_set(struct obd_export *exp, struct obd_info *oinfo,
1317                        struct obd_trans_info *oti,
1318                        struct lov_request_set **reqset)
1319 {
1320         struct lov_request_set *set;
1321         struct lov_obd *lov = &exp->exp_obd->u.lov;
1322         int rc = 0, i;
1323         ENTRY;
1324
1325         OBD_ALLOC(set, sizeof(*set));
1326         if (set == NULL)
1327                 RETURN(-ENOMEM);
1328         lov_init_set(set);
1329
1330         set->set_oi = oinfo;
1331         set->set_exp = exp;
1332
1333         for (i = 0; i < oinfo->oi_md->lsm_stripe_count; i++) {
1334                 struct lov_oinfo *loi = oinfo->oi_md->lsm_oinfo[i];
1335                 struct lov_request *req;
1336                 obd_off rs, re;
1337
1338                 if (!lov_stripe_intersects(oinfo->oi_md, i,
1339                                            oinfo->oi_policy.l_extent.start,
1340                                            oinfo->oi_policy.l_extent.end,
1341                                            &rs, &re))
1342                         continue;
1343
1344                 if (!lov->lov_tgts[loi->loi_ost_idx] ||
1345                     !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
1346                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1347                         GOTO(out_set, rc = -EIO);
1348                 }
1349
1350                 OBD_ALLOC(req, sizeof(*req));
1351                 if (req == NULL)
1352                         GOTO(out_set, rc = -ENOMEM);
1353                 req->rq_stripe = i;
1354                 req->rq_idx = loi->loi_ost_idx;
1355
1356                 OBDO_ALLOC(req->rq_oi.oi_oa);
1357                 if (req->rq_oi.oi_oa == NULL) {
1358                         OBD_FREE(req, sizeof(*req));
1359                         GOTO(out_set, rc = -ENOMEM);
1360                 }
1361                 memcpy(req->rq_oi.oi_oa, oinfo->oi_oa,
1362                        sizeof(*req->rq_oi.oi_oa));
1363                 req->rq_oi.oi_oa->o_id = loi->loi_id;
1364                 req->rq_oi.oi_oa->o_gr = loi->loi_gr;
1365                 req->rq_oi.oi_oa->o_valid |= OBD_MD_FLGROUP;
1366
1367                 req->rq_oi.oi_oa->o_stripe_idx = i;
1368                 req->rq_oi.oi_cb_up = cb_update_punch;
1369
1370                 req->rq_oi.oi_policy.l_extent.start = rs;
1371                 req->rq_oi.oi_policy.l_extent.end = re;
1372                 req->rq_oi.oi_policy.l_extent.gid = -1;
1373
1374                 req->rq_oi.oi_capa = oinfo->oi_capa;
1375
1376                 lov_set_add_req(req, set);
1377         }
1378         if (!set->set_count)
1379                 GOTO(out_set, rc = -EIO);
1380         *reqset = set;
1381         RETURN(rc);
1382 out_set:
1383         lov_fini_punch_set(set);
1384         RETURN(rc);
1385 }
1386
1387 int lov_fini_sync_set(struct lov_request_set *set)
1388 {
1389         int rc = 0;
1390         ENTRY;
1391
1392         if (set == NULL)
1393                 RETURN(0);
1394         LASSERT(set->set_exp);
1395         if (set->set_completes) {
1396                 if (!set->set_success)
1397                         rc = -EIO;
1398                 /* FIXME update qos data here */
1399         }
1400
1401         lov_put_reqset(set);
1402
1403         RETURN(rc);
1404 }
1405
1406 int lov_prep_sync_set(struct obd_export *exp, struct obd_info *oinfo,
1407                       struct obdo *src_oa, struct lov_stripe_md *lsm,
1408                       obd_off start, obd_off end,
1409                       struct lov_request_set **reqset)
1410 {
1411         struct lov_request_set *set;
1412         struct lov_obd *lov = &exp->exp_obd->u.lov;
1413         int rc = 0, i;
1414         ENTRY;
1415
1416         OBD_ALLOC(set, sizeof(*set));
1417         if (set == NULL)
1418                 RETURN(-ENOMEM);
1419         lov_init_set(set);
1420
1421         set->set_exp = exp;
1422         set->set_oi = oinfo;
1423         set->set_oi->oi_md = lsm;
1424         set->set_oi->oi_oa = src_oa;
1425
1426         for (i = 0; i < lsm->lsm_stripe_count; i++) {
1427                 struct lov_oinfo *loi = lsm->lsm_oinfo[i];
1428                 struct lov_request *req;
1429                 obd_off rs, re;
1430
1431                 if (!lov->lov_tgts[loi->loi_ost_idx] ||
1432                     !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
1433                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1434                         continue;
1435                 }
1436
1437                 if (!lov_stripe_intersects(lsm, i, start, end, &rs, &re))
1438                         continue;
1439
1440                 OBD_ALLOC(req, sizeof(*req));
1441                 if (req == NULL)
1442                         GOTO(out_set, rc = -ENOMEM);
1443                 req->rq_stripe = i;
1444                 req->rq_idx = loi->loi_ost_idx;
1445
1446                 OBDO_ALLOC(req->rq_oi.oi_oa);
1447                 if (req->rq_oi.oi_oa == NULL) {
1448                         OBD_FREE(req, sizeof(*req));
1449                         GOTO(out_set, rc = -ENOMEM);
1450                 }
1451                 memcpy(req->rq_oi.oi_oa, src_oa, sizeof(*req->rq_oi.oi_oa));
1452                 req->rq_oi.oi_oa->o_id = loi->loi_id;
1453                 req->rq_oi.oi_oa->o_stripe_idx = i;
1454
1455                 req->rq_oi.oi_policy.l_extent.start = rs;
1456                 req->rq_oi.oi_policy.l_extent.end = re;
1457                 req->rq_oi.oi_policy.l_extent.gid = -1;
1458
1459                 lov_set_add_req(req, set);
1460         }
1461         if (!set->set_count)
1462                 GOTO(out_set, rc = -EIO);
1463         *reqset = set;
1464         RETURN(rc);
1465 out_set:
1466         lov_fini_sync_set(set);
1467         RETURN(rc);
1468 }
1469
/* Largest value representable in a __u64. */
#define LOV_U64_MAX ((__u64)~0ULL)

/*
 * Saturating add: tot += add, except that a sum which wraps around is
 * replaced by LOV_U64_MAX.  Both arguments are evaluated more than once, so
 * they must be free of side effects.
 */
#define LOV_SUM_MAX(tot, add)                                           \
        do {                                                            \
                if ((tot) + (add) >= (tot))                             \
                        (tot) += (add);                                 \
                else                                                    \
                        (tot) = LOV_U64_MAX;                            \
        } while (0)
1478
1479 int lov_fini_statfs(struct obd_device *obd, struct obd_statfs *osfs,int success)
1480 {
1481         ENTRY;
1482
1483         if (success) {
1484                 __u32 expected_stripes = lov_get_stripecnt(&obd->u.lov, 0);
1485
1486                 if (osfs->os_files != LOV_U64_MAX)
1487                         do_div(osfs->os_files, expected_stripes);
1488                 if (osfs->os_ffree != LOV_U64_MAX)
1489                         do_div(osfs->os_ffree, expected_stripes);
1490
1491                 cfs_spin_lock(&obd->obd_osfs_lock);
1492                 memcpy(&obd->obd_osfs, osfs, sizeof(*osfs));
1493                 obd->obd_osfs_age = cfs_time_current_64();
1494                 cfs_spin_unlock(&obd->obd_osfs_lock);
1495                 RETURN(0);
1496         }
1497
1498         RETURN(-EIO);
1499 }
1500
1501 int lov_fini_statfs_set(struct lov_request_set *set)
1502 {
1503         int rc = 0;
1504         ENTRY;
1505
1506         if (set == NULL)
1507                 RETURN(0);
1508
1509         if (set->set_completes) {
1510                 rc = lov_fini_statfs(set->set_obd, set->set_oi->oi_osfs,
1511                                      set->set_success);
1512         }
1513         lov_put_reqset(set);
1514         RETURN(rc);
1515 }
1516
1517 void lov_update_statfs(struct obd_statfs *osfs, struct obd_statfs *lov_sfs,
1518                        int success)
1519 {
1520         int shift = 0, quit = 0;
1521         __u64 tmp;
1522
1523         if (success == 0) {
1524                 memcpy(osfs, lov_sfs, sizeof(*lov_sfs));
1525         } else {
1526                 if (osfs->os_bsize != lov_sfs->os_bsize) {
1527                         /* assume all block sizes are always powers of 2 */
1528                         /* get the bits difference */
1529                         tmp = osfs->os_bsize | lov_sfs->os_bsize;
1530                         for (shift = 0; shift <= 64; ++shift) {
1531                                 if (tmp & 1) {
1532                                         if (quit)
1533                                                 break;
1534                                         else
1535                                                 quit = 1;
1536                                         shift = 0;
1537                                 }
1538                                 tmp >>= 1;
1539                         }
1540                 }
1541
1542                 if (osfs->os_bsize < lov_sfs->os_bsize) {
1543                         osfs->os_bsize = lov_sfs->os_bsize;
1544
1545                         osfs->os_bfree  >>= shift;
1546                         osfs->os_bavail >>= shift;
1547                         osfs->os_blocks >>= shift;
1548                 } else if (shift != 0) {
1549                         lov_sfs->os_bfree  >>= shift;
1550                         lov_sfs->os_bavail >>= shift;
1551                         lov_sfs->os_blocks >>= shift;
1552                 }
1553 #ifdef MIN_DF
1554                 /* Sandia requested that df (and so, statfs) only
1555                    returned minimal available space on
1556                    a single OST, so people would be able to
1557                    write this much data guaranteed. */
1558                 if (osfs->os_bavail > lov_sfs->os_bavail) {
1559                         /* Presumably if new bavail is smaller,
1560                            new bfree is bigger as well */
1561                         osfs->os_bfree = lov_sfs->os_bfree;
1562                         osfs->os_bavail = lov_sfs->os_bavail;
1563                 }
1564 #else
1565                 osfs->os_bfree += lov_sfs->os_bfree;
1566                 osfs->os_bavail += lov_sfs->os_bavail;
1567 #endif
1568                 osfs->os_blocks += lov_sfs->os_blocks;
1569                 /* XXX not sure about this one - depends on policy.
1570                  *   - could be minimum if we always stripe on all OBDs
1571                  *     (but that would be wrong for any other policy,
1572                  *     if one of the OBDs has no more objects left)
1573                  *   - could be sum if we stripe whole objects
1574                  *   - could be average, just to give a nice number
1575                  *
1576                  * To give a "reasonable" (if not wholly accurate)
1577                  * number, we divide the total number of free objects
1578                  * by expected stripe count (watch out for overflow).
1579                  */
1580                 LOV_SUM_MAX(osfs->os_files, lov_sfs->os_files);
1581                 LOV_SUM_MAX(osfs->os_ffree, lov_sfs->os_ffree);
1582         }
1583 }
1584
1585 /* The callback for osc_statfs_async that finilizes a request info when a
1586  * response is received. */
1587 static int cb_statfs_update(void *cookie, int rc)
1588 {
1589         struct obd_info *oinfo = cookie;
1590         struct lov_request *lovreq;
1591         struct obd_statfs *osfs, *lov_sfs;
1592         struct lov_obd *lov;
1593         struct lov_tgt_desc *tgt;
1594         struct obd_device *lovobd, *tgtobd;
1595         int success;
1596         ENTRY;
1597
1598         lovreq = container_of(oinfo, struct lov_request, rq_oi);
1599         lovobd = lovreq->rq_rqset->set_obd;
1600         lov = &lovobd->u.lov;
1601         osfs = lovreq->rq_rqset->set_oi->oi_osfs;
1602         lov_sfs = oinfo->oi_osfs;
1603         success = lovreq->rq_rqset->set_success;
1604         /* XXX: the same is done in lov_update_common_set, however
1605            lovset->set_exp is not initialized. */
1606         lov_update_set(lovreq->rq_rqset, lovreq, rc);
1607         if (rc)
1608                 GOTO(out, rc);
1609
1610         obd_getref(lovobd);
1611         tgt = lov->lov_tgts[lovreq->rq_idx];
1612         if (!tgt || !tgt->ltd_active)
1613                 GOTO(out_update, rc);
1614
1615         tgtobd = class_exp2obd(tgt->ltd_exp);
1616         cfs_spin_lock(&tgtobd->obd_osfs_lock);
1617         memcpy(&tgtobd->obd_osfs, lov_sfs, sizeof(*lov_sfs));
1618         if ((oinfo->oi_flags & OBD_STATFS_FROM_CACHE) == 0)
1619                 tgtobd->obd_osfs_age = cfs_time_current_64();
1620         cfs_spin_unlock(&tgtobd->obd_osfs_lock);
1621
1622 out_update:
1623         lov_update_statfs(osfs, lov_sfs, success);
1624         qos_update(lov);
1625         obd_putref(lovobd);
1626
1627 out:
1628         if (lovreq->rq_rqset->set_oi->oi_flags & OBD_STATFS_PTLRPCD &&
1629             lov_finished_set(lovreq->rq_rqset)) {
1630                lov_statfs_interpret(NULL, lovreq->rq_rqset,
1631                                     lovreq->rq_rqset->set_success !=
1632                                                   lovreq->rq_rqset->set_count);
1633                qos_statfs_done(lov);
1634         }
1635
1636         RETURN(0);
1637 }
1638
1639 int lov_prep_statfs_set(struct obd_device *obd, struct obd_info *oinfo,
1640                         struct lov_request_set **reqset)
1641 {
1642         struct lov_request_set *set;
1643         struct lov_obd *lov = &obd->u.lov;
1644         int rc = 0, i;
1645         ENTRY;
1646
1647         OBD_ALLOC(set, sizeof(*set));
1648         if (set == NULL)
1649                 RETURN(-ENOMEM);
1650         lov_init_set(set);
1651
1652         set->set_obd = obd;
1653         set->set_oi = oinfo;
1654
1655         /* We only get block data from the OBD */
1656         for (i = 0; i < lov->desc.ld_tgt_count; i++) {
1657                 struct lov_request *req;
1658
1659                 if (!lov->lov_tgts[i] || (!lov->lov_tgts[i]->ltd_active
1660                                           && (oinfo->oi_flags & OBD_STATFS_NODELAY))) {
1661                         CDEBUG(D_HA, "lov idx %d inactive\n", i);
1662                         continue;
1663                 }
1664
1665                 /* skip targets that have been explicitely disabled by the
1666                  * administrator */
1667                 if (!lov->lov_tgts[i]->ltd_exp) {
1668                         CDEBUG(D_HA, "lov idx %d administratively disabled\n", i);
1669                         continue;
1670                 }
1671
1672                 OBD_ALLOC(req, sizeof(*req));
1673                 if (req == NULL)
1674                         GOTO(out_set, rc = -ENOMEM);
1675
1676                 OBD_ALLOC(req->rq_oi.oi_osfs, sizeof(*req->rq_oi.oi_osfs));
1677                 if (req->rq_oi.oi_osfs == NULL) {
1678                         OBD_FREE(req, sizeof(*req));
1679                         GOTO(out_set, rc = -ENOMEM);
1680                 }
1681
1682                 req->rq_idx = i;
1683                 req->rq_oi.oi_cb_up = cb_statfs_update;
1684                 req->rq_oi.oi_flags = oinfo->oi_flags;
1685
1686                 lov_set_add_req(req, set);
1687         }
1688         if (!set->set_count)
1689                 GOTO(out_set, rc = -EIO);
1690         *reqset = set;
1691         RETURN(rc);
1692 out_set:
1693         lov_fini_statfs_set(set);
1694         RETURN(rc);
1695 }