Whamcloud - gitweb
LU-243 async lov_sync() operation
[fs/lustre-release.git] / lustre / lov / lov_request.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright (c) 2005, 2010, Oracle and/or its affiliates. All rights reserved.
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  */
36
37 #ifndef EXPORT_SYMTAB
38 # define EXPORT_SYMTAB
39 #endif
40 #define DEBUG_SUBSYSTEM S_LOV
41
42 #ifdef __KERNEL__
43 #include <libcfs/libcfs.h>
44 #else
45 #include <liblustre.h>
46 #endif
47
48 #include <obd_class.h>
49 #include <obd_lov.h>
50 #include <lustre/lustre_idl.h>
51
52 #include "lov_internal.h"
53
54 static void lov_init_set(struct lov_request_set *set)
55 {
56         set->set_count = 0;
57         set->set_completes = 0;
58         set->set_success = 0;
59         set->set_cookies = 0;
60         CFS_INIT_LIST_HEAD(&set->set_list);
61         cfs_atomic_set(&set->set_refcount, 1);
62         cfs_waitq_init(&set->set_waitq);
63         cfs_spin_lock_init(&set->set_lock);
64 }
65
66 void lov_finish_set(struct lov_request_set *set)
67 {
68         cfs_list_t *pos, *n;
69         ENTRY;
70
71         LASSERT(set);
72         cfs_list_for_each_safe(pos, n, &set->set_list) {
73                 struct lov_request *req = cfs_list_entry(pos,
74                                                          struct lov_request,
75                                                          rq_link);
76                 cfs_list_del_init(&req->rq_link);
77
78                 if (req->rq_oi.oi_oa)
79                         OBDO_FREE(req->rq_oi.oi_oa);
80                 if (req->rq_oi.oi_md)
81                         OBD_FREE_LARGE(req->rq_oi.oi_md, req->rq_buflen);
82                 if (req->rq_oi.oi_osfs)
83                         OBD_FREE(req->rq_oi.oi_osfs,
84                                  sizeof(*req->rq_oi.oi_osfs));
85                 OBD_FREE(req, sizeof(*req));
86         }
87
88         if (set->set_pga) {
89                 int len = set->set_oabufs * sizeof(*set->set_pga);
90                 OBD_FREE_LARGE(set->set_pga, len);
91         }
92         if (set->set_lockh)
93                 lov_llh_put(set->set_lockh);
94
95         OBD_FREE(set, sizeof(*set));
96         EXIT;
97 }
98
99 int lov_finished_set(struct lov_request_set *set)
100 {
101         CDEBUG(D_INFO, "check set %d/%d\n", set->set_completes,
102                set->set_count);
103         return set->set_completes == set->set_count;
104 }
105
106 void lov_update_set(struct lov_request_set *set,
107                     struct lov_request *req, int rc)
108 {
109         req->rq_complete = 1;
110         req->rq_rc = rc;
111
112         set->set_completes++;
113         if (rc == 0)
114                 set->set_success++;
115
116         cfs_waitq_signal(&set->set_waitq);
117 }
118
119 int lov_update_common_set(struct lov_request_set *set,
120                           struct lov_request *req, int rc)
121 {
122         struct lov_obd *lov = &set->set_exp->exp_obd->u.lov;
123         ENTRY;
124
125         lov_update_set(set, req, rc);
126
127         /* grace error on inactive ost */
128         if (rc && !(lov->lov_tgts[req->rq_idx] &&
129                     lov->lov_tgts[req->rq_idx]->ltd_active))
130                 rc = 0;
131
132         /* FIXME in raid1 regime, should return 0 */
133         RETURN(rc);
134 }
135
136 void lov_set_add_req(struct lov_request *req, struct lov_request_set *set)
137 {
138         cfs_list_add_tail(&req->rq_link, &set->set_list);
139         set->set_count++;
140         req->rq_rqset = set;
141 }
142
143 extern void osc_update_enqueue(struct lustre_handle *lov_lockhp,
144                                struct lov_oinfo *loi, int flags,
145                                struct ost_lvb *lvb, __u32 mode, int rc);
146
147 static int lov_update_enqueue_lov(struct obd_export *exp,
148                                   struct lustre_handle *lov_lockhp,
149                                   struct lov_oinfo *loi, int flags, int idx,
150                                   __u64 oid, int rc)
151 {
152         struct lov_obd *lov = &exp->exp_obd->u.lov;
153
154         if (rc != ELDLM_OK &&
155             !(rc == ELDLM_LOCK_ABORTED && (flags & LDLM_FL_HAS_INTENT))) {
156                 memset(lov_lockhp, 0, sizeof(*lov_lockhp));
157                 if (lov->lov_tgts[idx] && lov->lov_tgts[idx]->ltd_active) {
158                         /* -EUSERS used by OST to report file contention */
159                         if (rc != -EINTR && rc != -EUSERS)
160                                 CERROR("enqueue objid "LPX64" subobj "
161                                        LPX64" on OST idx %d: rc %d\n",
162                                        oid, loi->loi_id, loi->loi_ost_idx, rc);
163                 } else
164                         rc = ELDLM_OK;
165         }
166         return rc;
167 }
168
169 int lov_update_enqueue_set(struct lov_request *req, __u32 mode, int rc)
170 {
171         struct lov_request_set *set = req->rq_rqset;
172         struct lustre_handle *lov_lockhp;
173         struct obd_info *oi = set->set_oi;
174         struct lov_oinfo *loi;
175         ENTRY;
176
177         LASSERT(oi != NULL);
178
179         lov_lockhp = set->set_lockh->llh_handles + req->rq_stripe;
180         loi = oi->oi_md->lsm_oinfo[req->rq_stripe];
181
182         /* XXX LOV STACKING: OSC gets a copy, created in lov_prep_enqueue_set
183          * and that copy can be arbitrarily out of date.
184          *
185          * The LOV API is due for a serious rewriting anyways, and this
186          * can be addressed then. */
187
188         lov_stripe_lock(oi->oi_md);
189         osc_update_enqueue(lov_lockhp, loi, oi->oi_flags,
190                            &req->rq_oi.oi_md->lsm_oinfo[0]->loi_lvb, mode, rc);
191         if (rc == ELDLM_LOCK_ABORTED && (oi->oi_flags & LDLM_FL_HAS_INTENT))
192                 memset(lov_lockhp, 0, sizeof *lov_lockhp);
193         rc = lov_update_enqueue_lov(set->set_exp, lov_lockhp, loi, oi->oi_flags,
194                                     req->rq_idx, oi->oi_md->lsm_object_id, rc);
195         lov_stripe_unlock(oi->oi_md);
196         lov_update_set(set, req, rc);
197         RETURN(rc);
198 }
199
200 /* The callback for osc_enqueue that updates lov info for every OSC request. */
201 static int cb_update_enqueue(void *cookie, int rc)
202 {
203         struct obd_info *oinfo = cookie;
204         struct ldlm_enqueue_info *einfo;
205         struct lov_request *lovreq;
206
207         lovreq = container_of(oinfo, struct lov_request, rq_oi);
208         einfo = lovreq->rq_rqset->set_ei;
209         return lov_update_enqueue_set(lovreq, einfo->ei_mode, rc);
210 }
211
212 static int enqueue_done(struct lov_request_set *set, __u32 mode)
213 {
214         struct lov_request *req;
215         struct lov_obd *lov = &set->set_exp->exp_obd->u.lov;
216         int rc = 0;
217         ENTRY;
218
219         /* enqueue/match success, just return */
220         if (set->set_completes && set->set_completes == set->set_success)
221                 RETURN(0);
222
223         /* cancel enqueued/matched locks */
224         cfs_list_for_each_entry(req, &set->set_list, rq_link) {
225                 struct lustre_handle *lov_lockhp;
226
227                 if (!req->rq_complete || req->rq_rc)
228                         continue;
229
230                 lov_lockhp = set->set_lockh->llh_handles + req->rq_stripe;
231                 LASSERT(lov_lockhp);
232                 if (!lustre_handle_is_used(lov_lockhp))
233                         continue;
234
235                 rc = obd_cancel(lov->lov_tgts[req->rq_idx]->ltd_exp,
236                                 req->rq_oi.oi_md, mode, lov_lockhp);
237                 if (rc && lov->lov_tgts[req->rq_idx] &&
238                     lov->lov_tgts[req->rq_idx]->ltd_active)
239                         CERROR("cancelling obdjid "LPX64" on OST "
240                                "idx %d error: rc = %d\n",
241                                req->rq_oi.oi_md->lsm_object_id,
242                                req->rq_idx, rc);
243         }
244         if (set->set_lockh)
245                 lov_llh_put(set->set_lockh);
246         RETURN(rc);
247 }
248
249 int lov_fini_enqueue_set(struct lov_request_set *set, __u32 mode, int rc,
250                          struct ptlrpc_request_set *rqset)
251 {
252         int ret = 0;
253         ENTRY;
254
255         if (set == NULL)
256                 RETURN(0);
257         LASSERT(set->set_exp);
258         /* Do enqueue_done only for sync requests and if any request
259          * succeeded. */
260         if (!rqset) {
261                 if (rc)
262                         set->set_completes = 0;
263                 ret = enqueue_done(set, mode);
264         } else if (set->set_lockh)
265                 lov_llh_put(set->set_lockh);
266
267         lov_put_reqset(set);
268
269         RETURN(rc ? rc : ret);
270 }
271
272 int lov_prep_enqueue_set(struct obd_export *exp, struct obd_info *oinfo,
273                          struct ldlm_enqueue_info *einfo,
274                          struct lov_request_set **reqset)
275 {
276         struct lov_obd *lov = &exp->exp_obd->u.lov;
277         struct lov_request_set *set;
278         int i, rc = 0;
279         ENTRY;
280
281         OBD_ALLOC(set, sizeof(*set));
282         if (set == NULL)
283                 RETURN(-ENOMEM);
284         lov_init_set(set);
285
286         set->set_exp = exp;
287         set->set_oi = oinfo;
288         set->set_ei = einfo;
289         set->set_lockh = lov_llh_new(oinfo->oi_md);
290         if (set->set_lockh == NULL)
291                 GOTO(out_set, rc = -ENOMEM);
292         oinfo->oi_lockh->cookie = set->set_lockh->llh_handle.h_cookie;
293
294         for (i = 0; i < oinfo->oi_md->lsm_stripe_count; i++) {
295                 struct lov_oinfo *loi;
296                 struct lov_request *req;
297                 obd_off start, end;
298
299                 loi = oinfo->oi_md->lsm_oinfo[i];
300                 if (!lov_stripe_intersects(oinfo->oi_md, i,
301                                            oinfo->oi_policy.l_extent.start,
302                                            oinfo->oi_policy.l_extent.end,
303                                            &start, &end))
304                         continue;
305
306                 if (!lov->lov_tgts[loi->loi_ost_idx] ||
307                     !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
308                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
309                         continue;
310                 }
311
312                 OBD_ALLOC(req, sizeof(*req));
313                 if (req == NULL)
314                         GOTO(out_set, rc = -ENOMEM);
315
316                 req->rq_buflen = sizeof(*req->rq_oi.oi_md) +
317                         sizeof(struct lov_oinfo *) +
318                         sizeof(struct lov_oinfo);
319                 OBD_ALLOC_LARGE(req->rq_oi.oi_md, req->rq_buflen);
320                 if (req->rq_oi.oi_md == NULL) {
321                         OBD_FREE(req, sizeof(*req));
322                         GOTO(out_set, rc = -ENOMEM);
323                 }
324                 req->rq_oi.oi_md->lsm_oinfo[0] =
325                         ((void *)req->rq_oi.oi_md) + sizeof(*req->rq_oi.oi_md) +
326                         sizeof(struct lov_oinfo *);
327
328                 /* Set lov request specific parameters. */
329                 req->rq_oi.oi_lockh = set->set_lockh->llh_handles + i;
330                 req->rq_oi.oi_cb_up = cb_update_enqueue;
331                 req->rq_oi.oi_flags = oinfo->oi_flags;
332
333                 LASSERT(req->rq_oi.oi_lockh);
334
335                 req->rq_oi.oi_policy.l_extent.gid =
336                         oinfo->oi_policy.l_extent.gid;
337                 req->rq_oi.oi_policy.l_extent.start = start;
338                 req->rq_oi.oi_policy.l_extent.end = end;
339
340                 req->rq_idx = loi->loi_ost_idx;
341                 req->rq_stripe = i;
342
343                 /* XXX LOV STACKING: submd should be from the subobj */
344                 req->rq_oi.oi_md->lsm_object_id = loi->loi_id;
345                 req->rq_oi.oi_md->lsm_object_seq = loi->loi_seq;
346                 req->rq_oi.oi_md->lsm_stripe_count = 0;
347                 req->rq_oi.oi_md->lsm_oinfo[0]->loi_kms_valid =
348                         loi->loi_kms_valid;
349                 req->rq_oi.oi_md->lsm_oinfo[0]->loi_kms = loi->loi_kms;
350                 req->rq_oi.oi_md->lsm_oinfo[0]->loi_lvb = loi->loi_lvb;
351
352                 lov_set_add_req(req, set);
353         }
354         if (!set->set_count)
355                 GOTO(out_set, rc = -EIO);
356         *reqset = set;
357         RETURN(0);
358 out_set:
359         lov_fini_enqueue_set(set, einfo->ei_mode, rc, NULL);
360         RETURN(rc);
361 }
362
363 int lov_update_match_set(struct lov_request_set *set, struct lov_request *req,
364                          int rc)
365 {
366         int ret = rc;
367         ENTRY;
368
369         if (rc > 0)
370                 ret = 0;
371         else if (rc == 0)
372                 ret = 1;
373         lov_update_set(set, req, ret);
374         RETURN(rc);
375 }
376
377 int lov_fini_match_set(struct lov_request_set *set, __u32 mode, int flags)
378 {
379         int rc = 0;
380         ENTRY;
381
382         if (set == NULL)
383                 RETURN(0);
384         LASSERT(set->set_exp);
385         rc = enqueue_done(set, mode);
386         if ((set->set_count == set->set_success) &&
387             (flags & LDLM_FL_TEST_LOCK))
388                 lov_llh_put(set->set_lockh);
389
390         lov_put_reqset(set);
391
392         RETURN(rc);
393 }
394
395 int lov_prep_match_set(struct obd_export *exp, struct obd_info *oinfo,
396                        struct lov_stripe_md *lsm, ldlm_policy_data_t *policy,
397                        __u32 mode, struct lustre_handle *lockh,
398                        struct lov_request_set **reqset)
399 {
400         struct lov_obd *lov = &exp->exp_obd->u.lov;
401         struct lov_request_set *set;
402         int i, rc = 0;
403         ENTRY;
404
405         OBD_ALLOC(set, sizeof(*set));
406         if (set == NULL)
407                 RETURN(-ENOMEM);
408         lov_init_set(set);
409
410         set->set_exp = exp;
411         set->set_oi = oinfo;
412         set->set_oi->oi_md = lsm;
413         set->set_lockh = lov_llh_new(lsm);
414         if (set->set_lockh == NULL)
415                 GOTO(out_set, rc = -ENOMEM);
416         lockh->cookie = set->set_lockh->llh_handle.h_cookie;
417
418         for (i = 0; i < lsm->lsm_stripe_count; i++){
419                 struct lov_oinfo *loi;
420                 struct lov_request *req;
421                 obd_off start, end;
422
423                 loi = lsm->lsm_oinfo[i];
424                 if (!lov_stripe_intersects(lsm, i, policy->l_extent.start,
425                                            policy->l_extent.end, &start, &end))
426                         continue;
427
428                 /* FIXME raid1 should grace this error */
429                 if (!lov->lov_tgts[loi->loi_ost_idx] ||
430                     !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
431                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
432                         GOTO(out_set, rc = -EIO);
433                 }
434
435                 OBD_ALLOC(req, sizeof(*req));
436                 if (req == NULL)
437                         GOTO(out_set, rc = -ENOMEM);
438
439                 req->rq_buflen = sizeof(*req->rq_oi.oi_md);
440                 OBD_ALLOC_LARGE(req->rq_oi.oi_md, req->rq_buflen);
441                 if (req->rq_oi.oi_md == NULL) {
442                         OBD_FREE(req, sizeof(*req));
443                         GOTO(out_set, rc = -ENOMEM);
444                 }
445
446                 req->rq_oi.oi_policy.l_extent.start = start;
447                 req->rq_oi.oi_policy.l_extent.end = end;
448                 req->rq_oi.oi_policy.l_extent.gid = policy->l_extent.gid;
449
450                 req->rq_idx = loi->loi_ost_idx;
451                 req->rq_stripe = i;
452
453                 /* XXX LOV STACKING: submd should be from the subobj */
454                 req->rq_oi.oi_md->lsm_object_id = loi->loi_id;
455                 req->rq_oi.oi_md->lsm_object_seq = loi->loi_seq;
456                 req->rq_oi.oi_md->lsm_stripe_count = 0;
457
458                 lov_set_add_req(req, set);
459         }
460         if (!set->set_count)
461                 GOTO(out_set, rc = -EIO);
462         *reqset = set;
463         RETURN(rc);
464 out_set:
465         lov_fini_match_set(set, mode, 0);
466         RETURN(rc);
467 }
468
469 int lov_fini_cancel_set(struct lov_request_set *set)
470 {
471         int rc = 0;
472         ENTRY;
473
474         if (set == NULL)
475                 RETURN(0);
476
477         LASSERT(set->set_exp);
478         if (set->set_lockh)
479                 lov_llh_put(set->set_lockh);
480
481         lov_put_reqset(set);
482
483         RETURN(rc);
484 }
485
486 int lov_prep_cancel_set(struct obd_export *exp, struct obd_info *oinfo,
487                         struct lov_stripe_md *lsm, __u32 mode,
488                         struct lustre_handle *lockh,
489                         struct lov_request_set **reqset)
490 {
491         struct lov_request_set *set;
492         int i, rc = 0;
493         ENTRY;
494
495         OBD_ALLOC(set, sizeof(*set));
496         if (set == NULL)
497                 RETURN(-ENOMEM);
498         lov_init_set(set);
499
500         set->set_exp = exp;
501         set->set_oi = oinfo;
502         set->set_oi->oi_md = lsm;
503         set->set_lockh = lov_handle2llh(lockh);
504         if (set->set_lockh == NULL) {
505                 CERROR("LOV: invalid lov lock handle %p\n", lockh);
506                 GOTO(out_set, rc = -EINVAL);
507         }
508         lockh->cookie = set->set_lockh->llh_handle.h_cookie;
509
510         for (i = 0; i < lsm->lsm_stripe_count; i++){
511                 struct lov_request *req;
512                 struct lustre_handle *lov_lockhp;
513                 struct lov_oinfo *loi = lsm->lsm_oinfo[i];
514
515                 lov_lockhp = set->set_lockh->llh_handles + i;
516                 if (!lustre_handle_is_used(lov_lockhp)) {
517                         CDEBUG(D_INFO, "lov idx %d subobj "LPX64" no lock\n",
518                                loi->loi_ost_idx, loi->loi_id);
519                         continue;
520                 }
521
522                 OBD_ALLOC(req, sizeof(*req));
523                 if (req == NULL)
524                         GOTO(out_set, rc = -ENOMEM);
525
526                 req->rq_buflen = sizeof(*req->rq_oi.oi_md);
527                 OBD_ALLOC_LARGE(req->rq_oi.oi_md, req->rq_buflen);
528                 if (req->rq_oi.oi_md == NULL) {
529                         OBD_FREE(req, sizeof(*req));
530                         GOTO(out_set, rc = -ENOMEM);
531                 }
532
533                 req->rq_idx = loi->loi_ost_idx;
534                 req->rq_stripe = i;
535
536                 /* XXX LOV STACKING: submd should be from the subobj */
537                 req->rq_oi.oi_md->lsm_object_id = loi->loi_id;
538                 req->rq_oi.oi_md->lsm_object_seq = loi->loi_seq;
539                 req->rq_oi.oi_md->lsm_stripe_count = 0;
540
541                 lov_set_add_req(req, set);
542         }
543         if (!set->set_count)
544                 GOTO(out_set, rc = -EIO);
545         *reqset = set;
546         RETURN(rc);
547 out_set:
548         lov_fini_cancel_set(set);
549         RETURN(rc);
550 }
551
552 static int lov_update_create_set(struct lov_request_set *set,
553                                  struct lov_request *req, int rc)
554 {
555         struct obd_trans_info *oti = set->set_oti;
556         struct lov_stripe_md *lsm = set->set_oi->oi_md;
557         struct lov_oinfo *loi;
558         struct lov_obd *lov = &set->set_exp->exp_obd->u.lov;
559         ENTRY;
560
561         if (rc && lov->lov_tgts[req->rq_idx] &&
562             lov->lov_tgts[req->rq_idx]->ltd_active) {
563                 CERROR("error creating fid "LPX64" sub-object"
564                        " on OST idx %d/%d: rc = %d\n",
565                        set->set_oi->oi_oa->o_id, req->rq_idx,
566                        lsm->lsm_stripe_count, rc);
567                 if (rc > 0) {
568                         CERROR("obd_create returned invalid err %d\n", rc);
569                         rc = -EIO;
570                 }
571         }
572
573         cfs_spin_lock(&set->set_lock);
574         req->rq_stripe = set->set_success;
575         loi = lsm->lsm_oinfo[req->rq_stripe];
576
577
578         if (rc) {
579                 lov_update_set(set, req, rc);
580                 cfs_spin_unlock(&set->set_lock);
581                 RETURN(rc);
582         }
583
584         loi->loi_id = req->rq_oi.oi_oa->o_id;
585         loi->loi_seq = req->rq_oi.oi_oa->o_seq;
586         loi->loi_ost_idx = req->rq_idx;
587         loi_init(loi);
588
589         if (oti && set->set_cookies)
590                 ++oti->oti_logcookies;
591         if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLCOOKIE)
592                 set->set_cookie_sent++;
593
594         lov_update_set(set, req, rc);
595         cfs_spin_unlock(&set->set_lock);
596
597         CDEBUG(D_INODE, "objid "LPX64" has subobj "LPX64"/"LPU64" at idx %d\n",
598                lsm->lsm_object_id, loi->loi_id, loi->loi_id, req->rq_idx);
599         RETURN(rc);
600 }
601
602 static int create_done(struct obd_export *exp, struct lov_request_set *set,
603                        struct lov_stripe_md **lsmp)
604 {
605         struct lov_obd *lov = &exp->exp_obd->u.lov;
606         struct obd_trans_info *oti = set->set_oti;
607         struct obdo *src_oa = set->set_oi->oi_oa;
608         struct lov_request *req;
609         struct obdo *ret_oa = NULL;
610         int attrset = 0, rc = 0;
611         ENTRY;
612
613         LASSERT(set->set_completes);
614
615         /* try alloc objects on other osts if osc_create fails for
616          * exceptions: RPC failure, ENOSPC, etc */
617         if (set->set_count != set->set_success) {
618                 cfs_list_for_each_entry (req, &set->set_list, rq_link) {
619                         if (req->rq_rc == 0)
620                                 continue;
621
622                         set->set_completes--;
623                         req->rq_complete = 0;
624
625                         rc = qos_remedy_create(set, req);
626                         lov_update_create_set(set, req, rc);
627                 }
628         }
629
630         /* no successful creates */
631         if (set->set_success == 0)
632                 GOTO(cleanup, rc);
633
634         if (set->set_count != set->set_success) {
635                 set->set_count = set->set_success;
636                 qos_shrink_lsm(set);
637         }
638
639         OBDO_ALLOC(ret_oa);
640         if (ret_oa == NULL)
641                 GOTO(cleanup, rc = -ENOMEM);
642
643         cfs_list_for_each_entry(req, &set->set_list, rq_link) {
644                 if (!req->rq_complete || req->rq_rc)
645                         continue;
646                 lov_merge_attrs(ret_oa, req->rq_oi.oi_oa,
647                                 req->rq_oi.oi_oa->o_valid, set->set_oi->oi_md,
648                                 req->rq_stripe, &attrset);
649         }
650         if (src_oa->o_valid & OBD_MD_FLSIZE &&
651             ret_oa->o_size != src_oa->o_size) {
652                 CERROR("original size "LPU64" isn't new object size "LPU64"\n",
653                        src_oa->o_size, ret_oa->o_size);
654                 LBUG();
655         }
656         ret_oa->o_id = src_oa->o_id;
657         ret_oa->o_seq = src_oa->o_seq;
658         ret_oa->o_valid |= OBD_MD_FLID | OBD_MD_FLGROUP;
659         memcpy(src_oa, ret_oa, sizeof(*src_oa));
660         OBDO_FREE(ret_oa);
661
662         *lsmp = set->set_oi->oi_md;
663         GOTO(done, rc = 0);
664
665 cleanup:
666         cfs_list_for_each_entry(req, &set->set_list, rq_link) {
667                 struct obd_export *sub_exp;
668                 int err = 0;
669
670                 if (!req->rq_complete || req->rq_rc)
671                         continue;
672
673                 sub_exp = lov->lov_tgts[req->rq_idx]->ltd_exp;
674                 err = obd_destroy(sub_exp, req->rq_oi.oi_oa, NULL, oti, NULL,
675                                   NULL);
676                 if (err)
677                         CERROR("Failed to uncreate objid "LPX64" subobj "
678                                LPX64" on OST idx %d: rc = %d\n",
679                                src_oa->o_id, req->rq_oi.oi_oa->o_id,
680                                req->rq_idx, rc);
681         }
682         if (*lsmp == NULL)
683                 obd_free_memmd(exp, &set->set_oi->oi_md);
684 done:
685         if (oti && set->set_cookies) {
686                 oti->oti_logcookies = set->set_cookies;
687                 if (!set->set_cookie_sent) {
688                         oti_free_cookies(oti);
689                         src_oa->o_valid &= ~OBD_MD_FLCOOKIE;
690                 } else {
691                         src_oa->o_valid |= OBD_MD_FLCOOKIE;
692                 }
693         }
694         RETURN(rc);
695 }
696
697 int lov_fini_create_set(struct lov_request_set *set,struct lov_stripe_md **lsmp)
698 {
699         int rc = 0;
700         ENTRY;
701
702         if (set == NULL)
703                 RETURN(0);
704         LASSERT(set->set_exp);
705         if (set->set_completes)
706                 rc = create_done(set->set_exp, set, lsmp);
707
708         lov_put_reqset(set);
709         RETURN(rc);
710 }
711
712 int cb_create_update(void *cookie, int rc)
713 {
714         struct obd_info *oinfo = cookie;
715         struct lov_request *lovreq;
716
717         lovreq = container_of(oinfo, struct lov_request, rq_oi);
718
719         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_OSC_CREATE_FAIL))
720                 if (lovreq->rq_idx == obd_fail_val)
721                         rc = -ENOTCONN;
722
723         rc= lov_update_create_set(lovreq->rq_rqset, lovreq, rc);
724         if (lov_finished_set(lovreq->rq_rqset))
725                 lov_put_reqset(lovreq->rq_rqset);
726         return rc;
727 }
728
729 int lov_prep_create_set(struct obd_export *exp, struct obd_info *oinfo,
730                         struct lov_stripe_md **lsmp, struct obdo *src_oa,
731                         struct obd_trans_info *oti,
732                         struct lov_request_set **reqset)
733 {
734         struct lov_request_set *set;
735         int rc = 0;
736         ENTRY;
737
738         OBD_ALLOC(set, sizeof(*set));
739         if (set == NULL)
740                 RETURN(-ENOMEM);
741         lov_init_set(set);
742
743         set->set_exp = exp;
744         set->set_oi = oinfo;
745         set->set_oi->oi_md = *lsmp;
746         set->set_oi->oi_oa = src_oa;
747         set->set_oti = oti;
748         lov_get_reqset(set);
749
750         rc = qos_prep_create(exp, set);
751         /* qos_shrink_lsm() may have allocated a new lsm */
752         *lsmp = oinfo->oi_md;
753         if (rc) {
754                 lov_fini_create_set(set, lsmp);
755                 lov_put_reqset(set);
756         } else {
757                 *reqset = set;
758         }
759         RETURN(rc);
760 }
761
762 static int common_attr_done(struct lov_request_set *set)
763 {
764         cfs_list_t *pos;
765         struct lov_request *req;
766         struct obdo *tmp_oa;
767         int rc = 0, attrset = 0;
768         ENTRY;
769
770         LASSERT(set->set_oi != NULL);
771
772         if (set->set_oi->oi_oa == NULL)
773                 RETURN(0);
774
775         if (!set->set_success)
776                 RETURN(-EIO);
777
778         OBDO_ALLOC(tmp_oa);
779         if (tmp_oa == NULL)
780                 GOTO(out, rc = -ENOMEM);
781
782         cfs_list_for_each (pos, &set->set_list) {
783                 req = cfs_list_entry(pos, struct lov_request, rq_link);
784
785                 if (!req->rq_complete || req->rq_rc)
786                         continue;
787                 if (req->rq_oi.oi_oa->o_valid == 0)   /* inactive stripe */
788                         continue;
789                 lov_merge_attrs(tmp_oa, req->rq_oi.oi_oa,
790                                 req->rq_oi.oi_oa->o_valid,
791                                 set->set_oi->oi_md, req->rq_stripe, &attrset);
792         }
793         if (!attrset) {
794                 CERROR("No stripes had valid attrs\n");
795                 rc = -EIO;
796         }
797         if ((set->set_oi->oi_oa->o_valid & OBD_MD_FLEPOCH) &&
798             (set->set_oi->oi_md->lsm_stripe_count != attrset)) {
799                 /* When we take attributes of some epoch, we require all the
800                  * ost to be active. */
801                 CERROR("Not all the stripes had valid attrs\n");
802                 GOTO(out, rc = -EIO);
803         }
804
805         tmp_oa->o_id = set->set_oi->oi_oa->o_id;
806         memcpy(set->set_oi->oi_oa, tmp_oa, sizeof(*set->set_oi->oi_oa));
807 out:
808         if (tmp_oa)
809                 OBDO_FREE(tmp_oa);
810         RETURN(rc);
811
812 }
813
814 static int brw_done(struct lov_request_set *set)
815 {
816         struct lov_stripe_md *lsm = set->set_oi->oi_md;
817         struct lov_oinfo     *loi = NULL;
818         cfs_list_t *pos;
819         struct lov_request *req;
820         ENTRY;
821
822         cfs_list_for_each (pos, &set->set_list) {
823                 req = cfs_list_entry(pos, struct lov_request, rq_link);
824
825                 if (!req->rq_complete || req->rq_rc)
826                         continue;
827
828                 loi = lsm->lsm_oinfo[req->rq_stripe];
829
830                 if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLBLOCKS)
831                         loi->loi_lvb.lvb_blocks = req->rq_oi.oi_oa->o_blocks;
832         }
833
834         RETURN(0);
835 }
836
837 int lov_fini_brw_set(struct lov_request_set *set)
838 {
839         int rc = 0;
840         ENTRY;
841
842         if (set == NULL)
843                 RETURN(0);
844         LASSERT(set->set_exp);
845         if (set->set_completes) {
846                 rc = brw_done(set);
847                 /* FIXME update qos data here */
848         }
849         lov_put_reqset(set);
850
851         RETURN(rc);
852 }
853
854 int lov_prep_brw_set(struct obd_export *exp, struct obd_info *oinfo,
855                      obd_count oa_bufs, struct brw_page *pga,
856                      struct obd_trans_info *oti,
857                      struct lov_request_set **reqset)
858 {
859         struct {
860                 obd_count       index;
861                 obd_count       count;
862                 obd_count       off;
863         } *info = NULL;
864         struct lov_request_set *set;
865         struct lov_obd *lov = &exp->exp_obd->u.lov;
866         int rc = 0, i, shift;
867         ENTRY;
868
869         OBD_ALLOC(set, sizeof(*set));
870         if (set == NULL)
871                 RETURN(-ENOMEM);
872         lov_init_set(set);
873
874         set->set_exp = exp;
875         set->set_oti = oti;
876         set->set_oi = oinfo;
877         set->set_oabufs = oa_bufs;
878         OBD_ALLOC_LARGE(set->set_pga, oa_bufs * sizeof(*set->set_pga));
879         if (!set->set_pga)
880                 GOTO(out, rc = -ENOMEM);
881
882         OBD_ALLOC_LARGE(info, sizeof(*info) * oinfo->oi_md->lsm_stripe_count);
883         if (!info)
884                 GOTO(out, rc = -ENOMEM);
885
886         /* calculate the page count for each stripe */
887         for (i = 0; i < oa_bufs; i++) {
888                 int stripe = lov_stripe_number(oinfo->oi_md, pga[i].off);
889                 info[stripe].count++;
890         }
891
892         /* alloc and initialize lov request */
893         shift = 0;
894         for (i = 0; i < oinfo->oi_md->lsm_stripe_count; i++){
895                 struct lov_oinfo *loi = NULL;
896                 struct lov_request *req;
897
898                 if (info[i].count == 0)
899                         continue;
900
901                 loi = oinfo->oi_md->lsm_oinfo[i];
902                 if (!lov->lov_tgts[loi->loi_ost_idx] ||
903                     !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
904                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
905                         GOTO(out, rc = -EIO);
906                 }
907
908                 OBD_ALLOC(req, sizeof(*req));
909                 if (req == NULL)
910                         GOTO(out, rc = -ENOMEM);
911
912                 OBDO_ALLOC(req->rq_oi.oi_oa);
913                 if (req->rq_oi.oi_oa == NULL) {
914                         OBD_FREE(req, sizeof(*req));
915                         GOTO(out, rc = -ENOMEM);
916                 }
917
918                 if (oinfo->oi_oa) {
919                         memcpy(req->rq_oi.oi_oa, oinfo->oi_oa,
920                                sizeof(*req->rq_oi.oi_oa));
921                 }
922                 req->rq_oi.oi_oa->o_id = loi->loi_id;
923                 req->rq_oi.oi_oa->o_seq = loi->loi_seq;
924                 req->rq_oi.oi_oa->o_stripe_idx = i;
925
926                 req->rq_buflen = sizeof(*req->rq_oi.oi_md);
927                 OBD_ALLOC_LARGE(req->rq_oi.oi_md, req->rq_buflen);
928                 if (req->rq_oi.oi_md == NULL) {
929                         OBDO_FREE(req->rq_oi.oi_oa);
930                         OBD_FREE(req, sizeof(*req));
931                         GOTO(out, rc = -ENOMEM);
932                 }
933
934                 req->rq_idx = loi->loi_ost_idx;
935                 req->rq_stripe = i;
936
937                 /* XXX LOV STACKING */
938                 req->rq_oi.oi_md->lsm_object_id = loi->loi_id;
939                 req->rq_oi.oi_md->lsm_object_seq = loi->loi_seq;
940                 req->rq_oabufs = info[i].count;
941                 req->rq_pgaidx = shift;
942                 shift += req->rq_oabufs;
943
944                 /* remember the index for sort brw_page array */
945                 info[i].index = req->rq_pgaidx;
946
947                 req->rq_oi.oi_capa = oinfo->oi_capa;
948
949                 lov_set_add_req(req, set);
950         }
951         if (!set->set_count)
952                 GOTO(out, rc = -EIO);
953
954         /* rotate & sort the brw_page array */
955         for (i = 0; i < oa_bufs; i++) {
956                 int stripe = lov_stripe_number(oinfo->oi_md, pga[i].off);
957
958                 shift = info[stripe].index + info[stripe].off;
959                 LASSERT(shift < oa_bufs);
960                 set->set_pga[shift] = pga[i];
961                 lov_stripe_offset(oinfo->oi_md, pga[i].off, stripe,
962                                   &set->set_pga[shift].off);
963                 info[stripe].off++;
964         }
965 out:
966         if (info)
967                 OBD_FREE_LARGE(info,
968                                sizeof(*info) * oinfo->oi_md->lsm_stripe_count);
969
970         if (rc == 0)
971                 *reqset = set;
972         else
973                 lov_fini_brw_set(set);
974
975         RETURN(rc);
976 }
977
978 int lov_fini_getattr_set(struct lov_request_set *set)
979 {
980         int rc = 0;
981         ENTRY;
982
983         if (set == NULL)
984                 RETURN(0);
985         LASSERT(set->set_exp);
986         if (set->set_completes)
987                 rc = common_attr_done(set);
988
989         lov_put_reqset(set);
990
991         RETURN(rc);
992 }
993
994 /* The callback for osc_getattr_async that finilizes a request info when a
995  * response is received. */
996 static int cb_getattr_update(void *cookie, int rc)
997 {
998         struct obd_info *oinfo = cookie;
999         struct lov_request *lovreq;
1000         lovreq = container_of(oinfo, struct lov_request, rq_oi);
1001         return lov_update_common_set(lovreq->rq_rqset, lovreq, rc);
1002 }
1003
1004 int lov_prep_getattr_set(struct obd_export *exp, struct obd_info *oinfo,
1005                          struct lov_request_set **reqset)
1006 {
1007         struct lov_request_set *set;
1008         struct lov_obd *lov = &exp->exp_obd->u.lov;
1009         int rc = 0, i;
1010         ENTRY;
1011
1012         OBD_ALLOC(set, sizeof(*set));
1013         if (set == NULL)
1014                 RETURN(-ENOMEM);
1015         lov_init_set(set);
1016
1017         set->set_exp = exp;
1018         set->set_oi = oinfo;
1019
1020         for (i = 0; i < oinfo->oi_md->lsm_stripe_count; i++) {
1021                 struct lov_oinfo *loi;
1022                 struct lov_request *req;
1023
1024                 loi = oinfo->oi_md->lsm_oinfo[i];
1025                 if (!lov->lov_tgts[loi->loi_ost_idx] ||
1026                     !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
1027                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1028                         if (oinfo->oi_oa->o_valid & OBD_MD_FLEPOCH)
1029                                 /* SOM requires all the OSTs to be active. */
1030                                 GOTO(out_set, rc = -EIO);
1031                         continue;
1032                 }
1033
1034                 OBD_ALLOC(req, sizeof(*req));
1035                 if (req == NULL)
1036                         GOTO(out_set, rc = -ENOMEM);
1037
1038                 req->rq_stripe = i;
1039                 req->rq_idx = loi->loi_ost_idx;
1040
1041                 OBDO_ALLOC(req->rq_oi.oi_oa);
1042                 if (req->rq_oi.oi_oa == NULL) {
1043                         OBD_FREE(req, sizeof(*req));
1044                         GOTO(out_set, rc = -ENOMEM);
1045                 }
1046                 memcpy(req->rq_oi.oi_oa, oinfo->oi_oa,
1047                        sizeof(*req->rq_oi.oi_oa));
1048                 req->rq_oi.oi_oa->o_id = loi->loi_id;
1049                 req->rq_oi.oi_oa->o_seq = loi->loi_seq;
1050                 req->rq_oi.oi_cb_up = cb_getattr_update;
1051                 req->rq_oi.oi_capa = oinfo->oi_capa;
1052
1053                 lov_set_add_req(req, set);
1054         }
1055         if (!set->set_count)
1056                 GOTO(out_set, rc = -EIO);
1057         *reqset = set;
1058         RETURN(rc);
1059 out_set:
1060         lov_fini_getattr_set(set);
1061         RETURN(rc);
1062 }
1063
1064 int lov_fini_destroy_set(struct lov_request_set *set)
1065 {
1066         ENTRY;
1067
1068         if (set == NULL)
1069                 RETURN(0);
1070         LASSERT(set->set_exp);
1071         if (set->set_completes) {
1072                 /* FIXME update qos data here */
1073         }
1074
1075         lov_put_reqset(set);
1076
1077         RETURN(0);
1078 }
1079
1080 int lov_prep_destroy_set(struct obd_export *exp, struct obd_info *oinfo,
1081                          struct obdo *src_oa, struct lov_stripe_md *lsm,
1082                          struct obd_trans_info *oti,
1083                          struct lov_request_set **reqset)
1084 {
1085         struct lov_request_set *set;
1086         struct lov_obd *lov = &exp->exp_obd->u.lov;
1087         int rc = 0, i;
1088         ENTRY;
1089
1090         OBD_ALLOC(set, sizeof(*set));
1091         if (set == NULL)
1092                 RETURN(-ENOMEM);
1093         lov_init_set(set);
1094
1095         set->set_exp = exp;
1096         set->set_oi = oinfo;
1097         set->set_oi->oi_md = lsm;
1098         set->set_oi->oi_oa = src_oa;
1099         set->set_oti = oti;
1100         if (oti != NULL && src_oa->o_valid & OBD_MD_FLCOOKIE)
1101                 set->set_cookies = oti->oti_logcookies;
1102
1103         for (i = 0; i < lsm->lsm_stripe_count; i++) {
1104                 struct lov_oinfo *loi;
1105                 struct lov_request *req;
1106
1107                 loi = lsm->lsm_oinfo[i];
1108                 if (!lov->lov_tgts[loi->loi_ost_idx] ||
1109                     !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
1110                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1111                         continue;
1112                 }
1113
1114                 OBD_ALLOC(req, sizeof(*req));
1115                 if (req == NULL)
1116                         GOTO(out_set, rc = -ENOMEM);
1117
1118                 req->rq_stripe = i;
1119                 req->rq_idx = loi->loi_ost_idx;
1120
1121                 OBDO_ALLOC(req->rq_oi.oi_oa);
1122                 if (req->rq_oi.oi_oa == NULL) {
1123                         OBD_FREE(req, sizeof(*req));
1124                         GOTO(out_set, rc = -ENOMEM);
1125                 }
1126                 memcpy(req->rq_oi.oi_oa, src_oa, sizeof(*req->rq_oi.oi_oa));
1127                 req->rq_oi.oi_oa->o_id = loi->loi_id;
1128                 req->rq_oi.oi_oa->o_seq = loi->loi_seq;
1129                 lov_set_add_req(req, set);
1130         }
1131         if (!set->set_count)
1132                 GOTO(out_set, rc = -EIO);
1133         *reqset = set;
1134         RETURN(rc);
1135 out_set:
1136         lov_fini_destroy_set(set);
1137         RETURN(rc);
1138 }
1139
1140 int lov_fini_setattr_set(struct lov_request_set *set)
1141 {
1142         int rc = 0;
1143         ENTRY;
1144
1145         if (set == NULL)
1146                 RETURN(0);
1147         LASSERT(set->set_exp);
1148         if (set->set_completes) {
1149                 rc = common_attr_done(set);
1150                 /* FIXME update qos data here */
1151         }
1152
1153         lov_put_reqset(set);
1154         RETURN(rc);
1155 }
1156
1157 int lov_update_setattr_set(struct lov_request_set *set,
1158                            struct lov_request *req, int rc)
1159 {
1160         struct lov_obd *lov = &req->rq_rqset->set_exp->exp_obd->u.lov;
1161         struct lov_stripe_md *lsm = req->rq_rqset->set_oi->oi_md;
1162         ENTRY;
1163
1164         lov_update_set(set, req, rc);
1165
1166         /* grace error on inactive ost */
1167         if (rc && !(lov->lov_tgts[req->rq_idx] &&
1168                     lov->lov_tgts[req->rq_idx]->ltd_active))
1169                 rc = 0;
1170
1171         if (rc == 0) {
1172                 if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLCTIME)
1173                         lsm->lsm_oinfo[req->rq_stripe]->loi_lvb.lvb_ctime =
1174                                 req->rq_oi.oi_oa->o_ctime;
1175                 if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLMTIME)
1176                         lsm->lsm_oinfo[req->rq_stripe]->loi_lvb.lvb_mtime =
1177                                 req->rq_oi.oi_oa->o_mtime;
1178                 if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLATIME)
1179                         lsm->lsm_oinfo[req->rq_stripe]->loi_lvb.lvb_atime =
1180                                 req->rq_oi.oi_oa->o_atime;
1181         }
1182
1183         RETURN(rc);
1184 }
1185
1186 /* The callback for osc_setattr_async that finilizes a request info when a
1187  * response is received. */
1188 static int cb_setattr_update(void *cookie, int rc)
1189 {
1190         struct obd_info *oinfo = cookie;
1191         struct lov_request *lovreq;
1192         lovreq = container_of(oinfo, struct lov_request, rq_oi);
1193         return lov_update_setattr_set(lovreq->rq_rqset, lovreq, rc);
1194 }
1195
1196 int lov_prep_setattr_set(struct obd_export *exp, struct obd_info *oinfo,
1197                          struct obd_trans_info *oti,
1198                          struct lov_request_set **reqset)
1199 {
1200         struct lov_request_set *set;
1201         struct lov_obd *lov = &exp->exp_obd->u.lov;
1202         int rc = 0, i;
1203         ENTRY;
1204
1205         OBD_ALLOC(set, sizeof(*set));
1206         if (set == NULL)
1207                 RETURN(-ENOMEM);
1208         lov_init_set(set);
1209
1210         set->set_exp = exp;
1211         set->set_oti = oti;
1212         set->set_oi = oinfo;
1213         if (oti != NULL && oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE)
1214                 set->set_cookies = oti->oti_logcookies;
1215
1216         for (i = 0; i < oinfo->oi_md->lsm_stripe_count; i++) {
1217                 struct lov_oinfo *loi = oinfo->oi_md->lsm_oinfo[i];
1218                 struct lov_request *req;
1219
1220                 if (!lov->lov_tgts[loi->loi_ost_idx] ||
1221                     !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
1222                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1223                         continue;
1224                 }
1225
1226                 OBD_ALLOC(req, sizeof(*req));
1227                 if (req == NULL)
1228                         GOTO(out_set, rc = -ENOMEM);
1229                 req->rq_stripe = i;
1230                 req->rq_idx = loi->loi_ost_idx;
1231
1232                 OBDO_ALLOC(req->rq_oi.oi_oa);
1233                 if (req->rq_oi.oi_oa == NULL) {
1234                         OBD_FREE(req, sizeof(*req));
1235                         GOTO(out_set, rc = -ENOMEM);
1236                 }
1237                 memcpy(req->rq_oi.oi_oa, oinfo->oi_oa,
1238                        sizeof(*req->rq_oi.oi_oa));
1239                 req->rq_oi.oi_oa->o_id = loi->loi_id;
1240                 req->rq_oi.oi_oa->o_seq= loi->loi_seq;
1241                 req->rq_oi.oi_oa->o_stripe_idx = i;
1242                 req->rq_oi.oi_cb_up = cb_setattr_update;
1243                 req->rq_oi.oi_capa = oinfo->oi_capa;
1244
1245                 if (oinfo->oi_oa->o_valid & OBD_MD_FLSIZE) {
1246                         int off = lov_stripe_offset(oinfo->oi_md,
1247                                                     oinfo->oi_oa->o_size, i,
1248                                                     &req->rq_oi.oi_oa->o_size);
1249
1250                         if (off < 0 && req->rq_oi.oi_oa->o_size)
1251                                 req->rq_oi.oi_oa->o_size--;
1252
1253                         CDEBUG(D_INODE, "stripe %d has size "LPU64"/"LPU64"\n",
1254                                i, req->rq_oi.oi_oa->o_size,
1255                                oinfo->oi_oa->o_size);
1256                 }
1257                 lov_set_add_req(req, set);
1258         }
1259         if (!set->set_count)
1260                 GOTO(out_set, rc = -EIO);
1261         *reqset = set;
1262         RETURN(rc);
1263 out_set:
1264         lov_fini_setattr_set(set);
1265         RETURN(rc);
1266 }
1267
1268 int lov_fini_punch_set(struct lov_request_set *set)
1269 {
1270         int rc = 0;
1271         ENTRY;
1272
1273         if (set == NULL)
1274                 RETURN(0);
1275         LASSERT(set->set_exp);
1276         if (set->set_completes) {
1277                 rc = -EIO;
1278                 /* FIXME update qos data here */
1279                 if (set->set_success)
1280                         rc = common_attr_done(set);
1281         }
1282
1283         lov_put_reqset(set);
1284
1285         RETURN(rc);
1286 }
1287
1288 int lov_update_punch_set(struct lov_request_set *set,
1289                          struct lov_request *req, int rc)
1290 {
1291         struct lov_obd *lov = &req->rq_rqset->set_exp->exp_obd->u.lov;
1292         struct lov_stripe_md *lsm = req->rq_rqset->set_oi->oi_md;
1293         ENTRY;
1294
1295         lov_update_set(set, req, rc);
1296
1297         /* grace error on inactive ost */
1298         if (rc && !lov->lov_tgts[req->rq_idx]->ltd_active)
1299                 rc = 0;
1300
1301         if (rc == 0) {
1302                 lov_stripe_lock(lsm);
1303                 if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLBLOCKS) {
1304                         lsm->lsm_oinfo[req->rq_stripe]->loi_lvb.lvb_blocks =
1305                                 req->rq_oi.oi_oa->o_blocks;
1306                 }
1307
1308                 /* Do we need to update lvb_size here? It needn't because
1309                  * it have been done in ll_truncate(). -jay */
1310                 lov_stripe_unlock(lsm);
1311         }
1312
1313         RETURN(rc);
1314 }
1315
1316 /* The callback for osc_punch that finilizes a request info when a response
1317  * is received. */
1318 static int cb_update_punch(void *cookie, int rc)
1319 {
1320         struct obd_info *oinfo = cookie;
1321         struct lov_request *lovreq;
1322         lovreq = container_of(oinfo, struct lov_request, rq_oi);
1323         return lov_update_punch_set(lovreq->rq_rqset, lovreq, rc);
1324 }
1325
1326 int lov_prep_punch_set(struct obd_export *exp, struct obd_info *oinfo,
1327                        struct obd_trans_info *oti,
1328                        struct lov_request_set **reqset)
1329 {
1330         struct lov_request_set *set;
1331         struct lov_obd *lov = &exp->exp_obd->u.lov;
1332         int rc = 0, i;
1333         ENTRY;
1334
1335         OBD_ALLOC(set, sizeof(*set));
1336         if (set == NULL)
1337                 RETURN(-ENOMEM);
1338         lov_init_set(set);
1339
1340         set->set_oi = oinfo;
1341         set->set_exp = exp;
1342
1343         for (i = 0; i < oinfo->oi_md->lsm_stripe_count; i++) {
1344                 struct lov_oinfo *loi = oinfo->oi_md->lsm_oinfo[i];
1345                 struct lov_request *req;
1346                 obd_off rs, re;
1347
1348                 if (!lov_stripe_intersects(oinfo->oi_md, i,
1349                                            oinfo->oi_policy.l_extent.start,
1350                                            oinfo->oi_policy.l_extent.end,
1351                                            &rs, &re))
1352                         continue;
1353
1354                 if (!lov->lov_tgts[loi->loi_ost_idx] ||
1355                     !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
1356                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1357                         GOTO(out_set, rc = -EIO);
1358                 }
1359
1360                 OBD_ALLOC(req, sizeof(*req));
1361                 if (req == NULL)
1362                         GOTO(out_set, rc = -ENOMEM);
1363                 req->rq_stripe = i;
1364                 req->rq_idx = loi->loi_ost_idx;
1365
1366                 OBDO_ALLOC(req->rq_oi.oi_oa);
1367                 if (req->rq_oi.oi_oa == NULL) {
1368                         OBD_FREE(req, sizeof(*req));
1369                         GOTO(out_set, rc = -ENOMEM);
1370                 }
1371                 memcpy(req->rq_oi.oi_oa, oinfo->oi_oa,
1372                        sizeof(*req->rq_oi.oi_oa));
1373                 req->rq_oi.oi_oa->o_id = loi->loi_id;
1374                 req->rq_oi.oi_oa->o_seq = loi->loi_seq;
1375                 req->rq_oi.oi_oa->o_valid |= OBD_MD_FLGROUP;
1376
1377                 req->rq_oi.oi_oa->o_stripe_idx = i;
1378                 req->rq_oi.oi_cb_up = cb_update_punch;
1379
1380                 req->rq_oi.oi_policy.l_extent.start = rs;
1381                 req->rq_oi.oi_policy.l_extent.end = re;
1382                 req->rq_oi.oi_policy.l_extent.gid = -1;
1383
1384                 req->rq_oi.oi_capa = oinfo->oi_capa;
1385
1386                 lov_set_add_req(req, set);
1387         }
1388         if (!set->set_count)
1389                 GOTO(out_set, rc = -EIO);
1390         *reqset = set;
1391         RETURN(rc);
1392 out_set:
1393         lov_fini_punch_set(set);
1394         RETURN(rc);
1395 }
1396
1397 int lov_fini_sync_set(struct lov_request_set *set)
1398 {
1399         int rc = 0;
1400         ENTRY;
1401
1402         if (set == NULL)
1403                 RETURN(0);
1404         LASSERT(set->set_exp);
1405         if (set->set_completes) {
1406                 if (!set->set_success)
1407                         rc = -EIO;
1408                 /* FIXME update qos data here */
1409         }
1410
1411         lov_put_reqset(set);
1412
1413         RETURN(rc);
1414 }
1415
1416 /* The callback for osc_sync that finilizes a request info when a
1417  * response is recieved. */
1418 static int cb_sync_update(void *cookie, int rc)
1419 {
1420         struct obd_info *oinfo = cookie;
1421         struct lov_request *lovreq;
1422
1423         lovreq = container_of(oinfo, struct lov_request, rq_oi);
1424         return lov_update_common_set(lovreq->rq_rqset, lovreq, rc);
1425 }
1426
1427 int lov_prep_sync_set(struct obd_export *exp, struct obd_info *oinfo,
1428                       obd_off start, obd_off end,
1429                       struct lov_request_set **reqset)
1430 {
1431         struct lov_request_set *set;
1432         struct lov_obd *lov = &exp->exp_obd->u.lov;
1433         int rc = 0, i;
1434         ENTRY;
1435
1436         OBD_ALLOC_PTR(set);
1437         if (set == NULL)
1438                 RETURN(-ENOMEM);
1439         lov_init_set(set);
1440
1441         set->set_exp = exp;
1442         set->set_oi = oinfo;
1443
1444         for (i = 0; i < oinfo->oi_md->lsm_stripe_count; i++) {
1445                 struct lov_oinfo *loi = oinfo->oi_md->lsm_oinfo[i];
1446                 struct lov_request *req;
1447                 obd_off rs, re;
1448
1449                 if (!lov->lov_tgts[loi->loi_ost_idx] ||
1450                     !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
1451                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1452                         continue;
1453                 }
1454
1455                 if (!lov_stripe_intersects(oinfo->oi_md, i, start, end, &rs,
1456                                            &re))
1457                         continue;
1458
1459                 OBD_ALLOC_PTR(req);
1460                 if (req == NULL)
1461                         GOTO(out_set, rc = -ENOMEM);
1462                 req->rq_stripe = i;
1463                 req->rq_idx = loi->loi_ost_idx;
1464
1465                 OBDO_ALLOC(req->rq_oi.oi_oa);
1466                 if (req->rq_oi.oi_oa == NULL) {
1467                         OBD_FREE(req, sizeof(*req));
1468                         GOTO(out_set, rc = -ENOMEM);
1469                 }
1470                 *req->rq_oi.oi_oa = *oinfo->oi_oa;
1471                 req->rq_oi.oi_oa->o_id = loi->loi_id;
1472                 req->rq_oi.oi_oa->o_seq = loi->loi_seq;
1473                 req->rq_oi.oi_oa->o_stripe_idx = i;
1474
1475                 req->rq_oi.oi_policy.l_extent.start = rs;
1476                 req->rq_oi.oi_policy.l_extent.end = re;
1477                 req->rq_oi.oi_policy.l_extent.gid = -1;
1478                 req->rq_oi.oi_cb_up = cb_sync_update;
1479
1480                 lov_set_add_req(req, set);
1481         }
1482         if (!set->set_count)
1483                 GOTO(out_set, rc = -EIO);
1484         *reqset = set;
1485         RETURN(rc);
1486 out_set:
1487         lov_fini_sync_set(set);
1488         RETURN(rc);
1489 }
1490
/* All-ones __u64; also used as the "saturated" marker for summed counters. */
#define LOV_U64_MAX ((__u64)~0ULL)

/* Saturating add for unsigned counters: tot += add, except that a sum
 * which would wrap around pins tot at LOV_U64_MAX instead.
 * NB: both arguments are evaluated more than once. */
#define LOV_SUM_MAX(tot, add)                                           \
        do {                                                            \
                if ((tot) + (add) >= (tot))                             \
                        (tot) += (add);                                 \
                else                                                    \
                        (tot) = LOV_U64_MAX;                            \
        } while (0)
1499
1500 int lov_fini_statfs(struct obd_device *obd, struct obd_statfs *osfs,int success)
1501 {
1502         ENTRY;
1503
1504         if (success) {
1505                 __u32 expected_stripes = lov_get_stripecnt(&obd->u.lov, 0);
1506
1507                 if (osfs->os_files != LOV_U64_MAX)
1508                         do_div(osfs->os_files, expected_stripes);
1509                 if (osfs->os_ffree != LOV_U64_MAX)
1510                         do_div(osfs->os_ffree, expected_stripes);
1511
1512                 cfs_spin_lock(&obd->obd_osfs_lock);
1513                 memcpy(&obd->obd_osfs, osfs, sizeof(*osfs));
1514                 obd->obd_osfs_age = cfs_time_current_64();
1515                 cfs_spin_unlock(&obd->obd_osfs_lock);
1516                 RETURN(0);
1517         }
1518
1519         RETURN(-EIO);
1520 }
1521
1522 int lov_fini_statfs_set(struct lov_request_set *set)
1523 {
1524         int rc = 0;
1525         ENTRY;
1526
1527         if (set == NULL)
1528                 RETURN(0);
1529
1530         if (set->set_completes) {
1531                 rc = lov_fini_statfs(set->set_obd, set->set_oi->oi_osfs,
1532                                      set->set_success);
1533         }
1534         lov_put_reqset(set);
1535         RETURN(rc);
1536 }
1537
1538 void lov_update_statfs(struct obd_statfs *osfs, struct obd_statfs *lov_sfs,
1539                        int success)
1540 {
1541         int shift = 0, quit = 0;
1542         __u64 tmp;
1543
1544         if (success == 0) {
1545                 memcpy(osfs, lov_sfs, sizeof(*lov_sfs));
1546         } else {
1547                 if (osfs->os_bsize != lov_sfs->os_bsize) {
1548                         /* assume all block sizes are always powers of 2 */
1549                         /* get the bits difference */
1550                         tmp = osfs->os_bsize | lov_sfs->os_bsize;
1551                         for (shift = 0; shift <= 64; ++shift) {
1552                                 if (tmp & 1) {
1553                                         if (quit)
1554                                                 break;
1555                                         else
1556                                                 quit = 1;
1557                                         shift = 0;
1558                                 }
1559                                 tmp >>= 1;
1560                         }
1561                 }
1562
1563                 if (osfs->os_bsize < lov_sfs->os_bsize) {
1564                         osfs->os_bsize = lov_sfs->os_bsize;
1565
1566                         osfs->os_bfree  >>= shift;
1567                         osfs->os_bavail >>= shift;
1568                         osfs->os_blocks >>= shift;
1569                 } else if (shift != 0) {
1570                         lov_sfs->os_bfree  >>= shift;
1571                         lov_sfs->os_bavail >>= shift;
1572                         lov_sfs->os_blocks >>= shift;
1573                 }
1574 #ifdef MIN_DF
1575                 /* Sandia requested that df (and so, statfs) only
1576                    returned minimal available space on
1577                    a single OST, so people would be able to
1578                    write this much data guaranteed. */
1579                 if (osfs->os_bavail > lov_sfs->os_bavail) {
1580                         /* Presumably if new bavail is smaller,
1581                            new bfree is bigger as well */
1582                         osfs->os_bfree = lov_sfs->os_bfree;
1583                         osfs->os_bavail = lov_sfs->os_bavail;
1584                 }
1585 #else
1586                 osfs->os_bfree += lov_sfs->os_bfree;
1587                 osfs->os_bavail += lov_sfs->os_bavail;
1588 #endif
1589                 osfs->os_blocks += lov_sfs->os_blocks;
1590                 /* XXX not sure about this one - depends on policy.
1591                  *   - could be minimum if we always stripe on all OBDs
1592                  *     (but that would be wrong for any other policy,
1593                  *     if one of the OBDs has no more objects left)
1594                  *   - could be sum if we stripe whole objects
1595                  *   - could be average, just to give a nice number
1596                  *
1597                  * To give a "reasonable" (if not wholly accurate)
1598                  * number, we divide the total number of free objects
1599                  * by expected stripe count (watch out for overflow).
1600                  */
1601                 LOV_SUM_MAX(osfs->os_files, lov_sfs->os_files);
1602                 LOV_SUM_MAX(osfs->os_ffree, lov_sfs->os_ffree);
1603         }
1604 }
1605
1606 /* The callback for osc_statfs_async that finilizes a request info when a
1607  * response is received. */
1608 static int cb_statfs_update(void *cookie, int rc)
1609 {
1610         struct obd_info *oinfo = cookie;
1611         struct lov_request *lovreq;
1612         struct obd_statfs *osfs, *lov_sfs;
1613         struct lov_obd *lov;
1614         struct lov_tgt_desc *tgt;
1615         struct obd_device *lovobd, *tgtobd;
1616         int success;
1617         ENTRY;
1618
1619         lovreq = container_of(oinfo, struct lov_request, rq_oi);
1620         lovobd = lovreq->rq_rqset->set_obd;
1621         lov = &lovobd->u.lov;
1622         osfs = lovreq->rq_rqset->set_oi->oi_osfs;
1623         lov_sfs = oinfo->oi_osfs;
1624         success = lovreq->rq_rqset->set_success;
1625         /* XXX: the same is done in lov_update_common_set, however
1626            lovset->set_exp is not initialized. */
1627         lov_update_set(lovreq->rq_rqset, lovreq, rc);
1628         if (rc)
1629                 GOTO(out, rc);
1630
1631         obd_getref(lovobd);
1632         tgt = lov->lov_tgts[lovreq->rq_idx];
1633         if (!tgt || !tgt->ltd_active)
1634                 GOTO(out_update, rc);
1635
1636         tgtobd = class_exp2obd(tgt->ltd_exp);
1637         cfs_spin_lock(&tgtobd->obd_osfs_lock);
1638         memcpy(&tgtobd->obd_osfs, lov_sfs, sizeof(*lov_sfs));
1639         if ((oinfo->oi_flags & OBD_STATFS_FROM_CACHE) == 0)
1640                 tgtobd->obd_osfs_age = cfs_time_current_64();
1641         cfs_spin_unlock(&tgtobd->obd_osfs_lock);
1642
1643 out_update:
1644         lov_update_statfs(osfs, lov_sfs, success);
1645         qos_update(lov);
1646         obd_putref(lovobd);
1647
1648 out:
1649         if (lovreq->rq_rqset->set_oi->oi_flags & OBD_STATFS_PTLRPCD &&
1650             lov_finished_set(lovreq->rq_rqset)) {
1651                lov_statfs_interpret(NULL, lovreq->rq_rqset,
1652                                     lovreq->rq_rqset->set_success !=
1653                                                   lovreq->rq_rqset->set_count);
1654                qos_statfs_done(lov);
1655         }
1656
1657         RETURN(0);
1658 }
1659
1660 int lov_prep_statfs_set(struct obd_device *obd, struct obd_info *oinfo,
1661                         struct lov_request_set **reqset)
1662 {
1663         struct lov_request_set *set;
1664         struct lov_obd *lov = &obd->u.lov;
1665         int rc = 0, i;
1666         ENTRY;
1667
1668         OBD_ALLOC(set, sizeof(*set));
1669         if (set == NULL)
1670                 RETURN(-ENOMEM);
1671         lov_init_set(set);
1672
1673         set->set_obd = obd;
1674         set->set_oi = oinfo;
1675
1676         /* We only get block data from the OBD */
1677         for (i = 0; i < lov->desc.ld_tgt_count; i++) {
1678                 struct lov_request *req;
1679
1680                 if (!lov->lov_tgts[i] || (!lov->lov_tgts[i]->ltd_active
1681                                           && (oinfo->oi_flags & OBD_STATFS_NODELAY))) {
1682                         CDEBUG(D_HA, "lov idx %d inactive\n", i);
1683                         continue;
1684                 }
1685
1686                 /* skip targets that have been explicitely disabled by the
1687                  * administrator */
1688                 if (!lov->lov_tgts[i]->ltd_exp) {
1689                         CDEBUG(D_HA, "lov idx %d administratively disabled\n", i);
1690                         continue;
1691                 }
1692
1693                 OBD_ALLOC(req, sizeof(*req));
1694                 if (req == NULL)
1695                         GOTO(out_set, rc = -ENOMEM);
1696
1697                 OBD_ALLOC(req->rq_oi.oi_osfs, sizeof(*req->rq_oi.oi_osfs));
1698                 if (req->rq_oi.oi_osfs == NULL) {
1699                         OBD_FREE(req, sizeof(*req));
1700                         GOTO(out_set, rc = -ENOMEM);
1701                 }
1702
1703                 req->rq_idx = i;
1704                 req->rq_oi.oi_cb_up = cb_statfs_update;
1705                 req->rq_oi.oi_flags = oinfo->oi_flags;
1706
1707                 lov_set_add_req(req, set);
1708         }
1709         if (!set->set_count)
1710                 GOTO(out_set, rc = -EIO);
1711         *reqset = set;
1712         RETURN(rc);
1713 out_set:
1714         lov_fini_statfs_set(set);
1715         RETURN(rc);
1716 }