Whamcloud - gitweb
ORNL-22 general ptlrpcd threads pool support
[fs/lustre-release.git] / lustre / lov / lov_request.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright (c) 2005, 2010, Oracle and/or its affiliates. All rights reserved.
30  * Use is subject to license terms.
31  *
32  * Copyright (c) 2011 Whamcloud, Inc.
33  *
34  */
35 /*
36  * This file is part of Lustre, http://www.lustre.org/
37  * Lustre is a trademark of Sun Microsystems, Inc.
38  */
39
40 #ifndef EXPORT_SYMTAB
41 # define EXPORT_SYMTAB
42 #endif
43 #define DEBUG_SUBSYSTEM S_LOV
44
45 #ifdef __KERNEL__
46 #include <libcfs/libcfs.h>
47 #else
48 #include <liblustre.h>
49 #endif
50
51 #include <obd_class.h>
52 #include <obd_lov.h>
53 #include <lustre/lustre_idl.h>
54
55 #include "lov_internal.h"
56
57 static void lov_init_set(struct lov_request_set *set)
58 {
59         set->set_count = 0;
60         cfs_atomic_set(&set->set_completes, 0);
61         cfs_atomic_set(&set->set_success, 0);
62         set->set_cookies = 0;
63         CFS_INIT_LIST_HEAD(&set->set_list);
64         cfs_atomic_set(&set->set_refcount, 1);
65         cfs_waitq_init(&set->set_waitq);
66         cfs_spin_lock_init(&set->set_lock);
67 }
68
69 void lov_finish_set(struct lov_request_set *set)
70 {
71         cfs_list_t *pos, *n;
72         ENTRY;
73
74         LASSERT(set);
75         cfs_list_for_each_safe(pos, n, &set->set_list) {
76                 struct lov_request *req = cfs_list_entry(pos,
77                                                          struct lov_request,
78                                                          rq_link);
79                 cfs_list_del_init(&req->rq_link);
80
81                 if (req->rq_oi.oi_oa)
82                         OBDO_FREE(req->rq_oi.oi_oa);
83                 if (req->rq_oi.oi_md)
84                         OBD_FREE_LARGE(req->rq_oi.oi_md, req->rq_buflen);
85                 if (req->rq_oi.oi_osfs)
86                         OBD_FREE(req->rq_oi.oi_osfs,
87                                  sizeof(*req->rq_oi.oi_osfs));
88                 OBD_FREE(req, sizeof(*req));
89         }
90
91         if (set->set_pga) {
92                 int len = set->set_oabufs * sizeof(*set->set_pga);
93                 OBD_FREE_LARGE(set->set_pga, len);
94         }
95         if (set->set_lockh)
96                 lov_llh_put(set->set_lockh);
97
98         OBD_FREE(set, sizeof(*set));
99         EXIT;
100 }
101
102 int lov_finished_set(struct lov_request_set *set)
103 {
104         int completes = cfs_atomic_read(&set->set_completes);
105
106         CDEBUG(D_INFO, "check set %d/%d\n", completes,
107                set->set_count);
108         return completes == set->set_count;
109 }
110
111 void lov_update_set(struct lov_request_set *set,
112                     struct lov_request *req, int rc)
113 {
114         req->rq_complete = 1;
115         req->rq_rc = rc;
116
117         cfs_atomic_inc(&set->set_completes);
118         if (rc == 0)
119                 cfs_atomic_inc(&set->set_success);
120
121         cfs_waitq_signal(&set->set_waitq);
122 }
123
124 int lov_update_common_set(struct lov_request_set *set,
125                           struct lov_request *req, int rc)
126 {
127         struct lov_obd *lov = &set->set_exp->exp_obd->u.lov;
128         ENTRY;
129
130         lov_update_set(set, req, rc);
131
132         /* grace error on inactive ost */
133         if (rc && !(lov->lov_tgts[req->rq_idx] &&
134                     lov->lov_tgts[req->rq_idx]->ltd_active))
135                 rc = 0;
136
137         /* FIXME in raid1 regime, should return 0 */
138         RETURN(rc);
139 }
140
141 void lov_set_add_req(struct lov_request *req, struct lov_request_set *set)
142 {
143         cfs_list_add_tail(&req->rq_link, &set->set_list);
144         set->set_count++;
145         req->rq_rqset = set;
146 }
147
148 extern void osc_update_enqueue(struct lustre_handle *lov_lockhp,
149                                struct lov_oinfo *loi, int flags,
150                                struct ost_lvb *lvb, __u32 mode, int rc);
151
152 static int lov_update_enqueue_lov(struct obd_export *exp,
153                                   struct lustre_handle *lov_lockhp,
154                                   struct lov_oinfo *loi, int flags, int idx,
155                                   __u64 oid, int rc)
156 {
157         struct lov_obd *lov = &exp->exp_obd->u.lov;
158
159         if (rc != ELDLM_OK &&
160             !(rc == ELDLM_LOCK_ABORTED && (flags & LDLM_FL_HAS_INTENT))) {
161                 memset(lov_lockhp, 0, sizeof(*lov_lockhp));
162                 if (lov->lov_tgts[idx] && lov->lov_tgts[idx]->ltd_active) {
163                         /* -EUSERS used by OST to report file contention */
164                         if (rc != -EINTR && rc != -EUSERS)
165                                 CERROR("enqueue objid "LPX64" subobj "
166                                        LPX64" on OST idx %d: rc %d\n",
167                                        oid, loi->loi_id, loi->loi_ost_idx, rc);
168                 } else
169                         rc = ELDLM_OK;
170         }
171         return rc;
172 }
173
174 int lov_update_enqueue_set(struct lov_request *req, __u32 mode, int rc)
175 {
176         struct lov_request_set *set = req->rq_rqset;
177         struct lustre_handle *lov_lockhp;
178         struct obd_info *oi = set->set_oi;
179         struct lov_oinfo *loi;
180         ENTRY;
181
182         LASSERT(oi != NULL);
183
184         lov_lockhp = set->set_lockh->llh_handles + req->rq_stripe;
185         loi = oi->oi_md->lsm_oinfo[req->rq_stripe];
186
187         /* XXX LOV STACKING: OSC gets a copy, created in lov_prep_enqueue_set
188          * and that copy can be arbitrarily out of date.
189          *
190          * The LOV API is due for a serious rewriting anyways, and this
191          * can be addressed then. */
192
193         lov_stripe_lock(oi->oi_md);
194         osc_update_enqueue(lov_lockhp, loi, oi->oi_flags,
195                            &req->rq_oi.oi_md->lsm_oinfo[0]->loi_lvb, mode, rc);
196         if (rc == ELDLM_LOCK_ABORTED && (oi->oi_flags & LDLM_FL_HAS_INTENT))
197                 memset(lov_lockhp, 0, sizeof *lov_lockhp);
198         rc = lov_update_enqueue_lov(set->set_exp, lov_lockhp, loi, oi->oi_flags,
199                                     req->rq_idx, oi->oi_md->lsm_object_id, rc);
200         lov_stripe_unlock(oi->oi_md);
201         lov_update_set(set, req, rc);
202         RETURN(rc);
203 }
204
205 /* The callback for osc_enqueue that updates lov info for every OSC request. */
206 static int cb_update_enqueue(void *cookie, int rc)
207 {
208         struct obd_info *oinfo = cookie;
209         struct ldlm_enqueue_info *einfo;
210         struct lov_request *lovreq;
211
212         lovreq = container_of(oinfo, struct lov_request, rq_oi);
213         einfo = lovreq->rq_rqset->set_ei;
214         return lov_update_enqueue_set(lovreq, einfo->ei_mode, rc);
215 }
216
217 static int enqueue_done(struct lov_request_set *set, __u32 mode)
218 {
219         struct lov_request *req;
220         struct lov_obd *lov = &set->set_exp->exp_obd->u.lov;
221         int completes = cfs_atomic_read(&set->set_completes);
222         int rc = 0;
223         ENTRY;
224
225         /* enqueue/match success, just return */
226         if (completes && completes == cfs_atomic_read(&set->set_success))
227                 RETURN(0);
228
229         /* cancel enqueued/matched locks */
230         cfs_list_for_each_entry(req, &set->set_list, rq_link) {
231                 struct lustre_handle *lov_lockhp;
232
233                 if (!req->rq_complete || req->rq_rc)
234                         continue;
235
236                 lov_lockhp = set->set_lockh->llh_handles + req->rq_stripe;
237                 LASSERT(lov_lockhp);
238                 if (!lustre_handle_is_used(lov_lockhp))
239                         continue;
240
241                 rc = obd_cancel(lov->lov_tgts[req->rq_idx]->ltd_exp,
242                                 req->rq_oi.oi_md, mode, lov_lockhp);
243                 if (rc && lov->lov_tgts[req->rq_idx] &&
244                     lov->lov_tgts[req->rq_idx]->ltd_active)
245                         CERROR("cancelling obdjid "LPX64" on OST "
246                                "idx %d error: rc = %d\n",
247                                req->rq_oi.oi_md->lsm_object_id,
248                                req->rq_idx, rc);
249         }
250         if (set->set_lockh)
251                 lov_llh_put(set->set_lockh);
252         RETURN(rc);
253 }
254
255 int lov_fini_enqueue_set(struct lov_request_set *set, __u32 mode, int rc,
256                          struct ptlrpc_request_set *rqset)
257 {
258         int ret = 0;
259         ENTRY;
260
261         if (set == NULL)
262                 RETURN(0);
263         LASSERT(set->set_exp);
264         /* Do enqueue_done only for sync requests and if any request
265          * succeeded. */
266         if (!rqset) {
267                 if (rc)
268                         cfs_atomic_set(&set->set_completes, 0);
269                 ret = enqueue_done(set, mode);
270         } else if (set->set_lockh)
271                 lov_llh_put(set->set_lockh);
272
273         lov_put_reqset(set);
274
275         RETURN(rc ? rc : ret);
276 }
277
278 int lov_prep_enqueue_set(struct obd_export *exp, struct obd_info *oinfo,
279                          struct ldlm_enqueue_info *einfo,
280                          struct lov_request_set **reqset)
281 {
282         struct lov_obd *lov = &exp->exp_obd->u.lov;
283         struct lov_request_set *set;
284         int i, rc = 0;
285         ENTRY;
286
287         OBD_ALLOC(set, sizeof(*set));
288         if (set == NULL)
289                 RETURN(-ENOMEM);
290         lov_init_set(set);
291
292         set->set_exp = exp;
293         set->set_oi = oinfo;
294         set->set_ei = einfo;
295         set->set_lockh = lov_llh_new(oinfo->oi_md);
296         if (set->set_lockh == NULL)
297                 GOTO(out_set, rc = -ENOMEM);
298         oinfo->oi_lockh->cookie = set->set_lockh->llh_handle.h_cookie;
299
300         for (i = 0; i < oinfo->oi_md->lsm_stripe_count; i++) {
301                 struct lov_oinfo *loi;
302                 struct lov_request *req;
303                 obd_off start, end;
304
305                 loi = oinfo->oi_md->lsm_oinfo[i];
306                 if (!lov_stripe_intersects(oinfo->oi_md, i,
307                                            oinfo->oi_policy.l_extent.start,
308                                            oinfo->oi_policy.l_extent.end,
309                                            &start, &end))
310                         continue;
311
312                 if (!lov->lov_tgts[loi->loi_ost_idx] ||
313                     !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
314                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
315                         continue;
316                 }
317
318                 OBD_ALLOC(req, sizeof(*req));
319                 if (req == NULL)
320                         GOTO(out_set, rc = -ENOMEM);
321
322                 req->rq_buflen = sizeof(*req->rq_oi.oi_md) +
323                         sizeof(struct lov_oinfo *) +
324                         sizeof(struct lov_oinfo);
325                 OBD_ALLOC_LARGE(req->rq_oi.oi_md, req->rq_buflen);
326                 if (req->rq_oi.oi_md == NULL) {
327                         OBD_FREE(req, sizeof(*req));
328                         GOTO(out_set, rc = -ENOMEM);
329                 }
330                 req->rq_oi.oi_md->lsm_oinfo[0] =
331                         ((void *)req->rq_oi.oi_md) + sizeof(*req->rq_oi.oi_md) +
332                         sizeof(struct lov_oinfo *);
333
334                 /* Set lov request specific parameters. */
335                 req->rq_oi.oi_lockh = set->set_lockh->llh_handles + i;
336                 req->rq_oi.oi_cb_up = cb_update_enqueue;
337                 req->rq_oi.oi_flags = oinfo->oi_flags;
338
339                 LASSERT(req->rq_oi.oi_lockh);
340
341                 req->rq_oi.oi_policy.l_extent.gid =
342                         oinfo->oi_policy.l_extent.gid;
343                 req->rq_oi.oi_policy.l_extent.start = start;
344                 req->rq_oi.oi_policy.l_extent.end = end;
345
346                 req->rq_idx = loi->loi_ost_idx;
347                 req->rq_stripe = i;
348
349                 /* XXX LOV STACKING: submd should be from the subobj */
350                 req->rq_oi.oi_md->lsm_object_id = loi->loi_id;
351                 req->rq_oi.oi_md->lsm_object_seq = loi->loi_seq;
352                 req->rq_oi.oi_md->lsm_stripe_count = 0;
353                 req->rq_oi.oi_md->lsm_oinfo[0]->loi_kms_valid =
354                         loi->loi_kms_valid;
355                 req->rq_oi.oi_md->lsm_oinfo[0]->loi_kms = loi->loi_kms;
356                 req->rq_oi.oi_md->lsm_oinfo[0]->loi_lvb = loi->loi_lvb;
357
358                 lov_set_add_req(req, set);
359         }
360         if (!set->set_count)
361                 GOTO(out_set, rc = -EIO);
362         *reqset = set;
363         RETURN(0);
364 out_set:
365         lov_fini_enqueue_set(set, einfo->ei_mode, rc, NULL);
366         RETURN(rc);
367 }
368
369 int lov_fini_match_set(struct lov_request_set *set, __u32 mode, int flags)
370 {
371         int rc = 0;
372         ENTRY;
373
374         if (set == NULL)
375                 RETURN(0);
376         LASSERT(set->set_exp);
377         rc = enqueue_done(set, mode);
378         if ((set->set_count == cfs_atomic_read(&set->set_success)) &&
379             (flags & LDLM_FL_TEST_LOCK))
380                 lov_llh_put(set->set_lockh);
381
382         lov_put_reqset(set);
383
384         RETURN(rc);
385 }
386
387 int lov_prep_match_set(struct obd_export *exp, struct obd_info *oinfo,
388                        struct lov_stripe_md *lsm, ldlm_policy_data_t *policy,
389                        __u32 mode, struct lustre_handle *lockh,
390                        struct lov_request_set **reqset)
391 {
392         struct lov_obd *lov = &exp->exp_obd->u.lov;
393         struct lov_request_set *set;
394         int i, rc = 0;
395         ENTRY;
396
397         OBD_ALLOC(set, sizeof(*set));
398         if (set == NULL)
399                 RETURN(-ENOMEM);
400         lov_init_set(set);
401
402         set->set_exp = exp;
403         set->set_oi = oinfo;
404         set->set_oi->oi_md = lsm;
405         set->set_lockh = lov_llh_new(lsm);
406         if (set->set_lockh == NULL)
407                 GOTO(out_set, rc = -ENOMEM);
408         lockh->cookie = set->set_lockh->llh_handle.h_cookie;
409
410         for (i = 0; i < lsm->lsm_stripe_count; i++){
411                 struct lov_oinfo *loi;
412                 struct lov_request *req;
413                 obd_off start, end;
414
415                 loi = lsm->lsm_oinfo[i];
416                 if (!lov_stripe_intersects(lsm, i, policy->l_extent.start,
417                                            policy->l_extent.end, &start, &end))
418                         continue;
419
420                 /* FIXME raid1 should grace this error */
421                 if (!lov->lov_tgts[loi->loi_ost_idx] ||
422                     !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
423                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
424                         GOTO(out_set, rc = -EIO);
425                 }
426
427                 OBD_ALLOC(req, sizeof(*req));
428                 if (req == NULL)
429                         GOTO(out_set, rc = -ENOMEM);
430
431                 req->rq_buflen = sizeof(*req->rq_oi.oi_md);
432                 OBD_ALLOC_LARGE(req->rq_oi.oi_md, req->rq_buflen);
433                 if (req->rq_oi.oi_md == NULL) {
434                         OBD_FREE(req, sizeof(*req));
435                         GOTO(out_set, rc = -ENOMEM);
436                 }
437
438                 req->rq_oi.oi_policy.l_extent.start = start;
439                 req->rq_oi.oi_policy.l_extent.end = end;
440                 req->rq_oi.oi_policy.l_extent.gid = policy->l_extent.gid;
441
442                 req->rq_idx = loi->loi_ost_idx;
443                 req->rq_stripe = i;
444
445                 /* XXX LOV STACKING: submd should be from the subobj */
446                 req->rq_oi.oi_md->lsm_object_id = loi->loi_id;
447                 req->rq_oi.oi_md->lsm_object_seq = loi->loi_seq;
448                 req->rq_oi.oi_md->lsm_stripe_count = 0;
449
450                 lov_set_add_req(req, set);
451         }
452         if (!set->set_count)
453                 GOTO(out_set, rc = -EIO);
454         *reqset = set;
455         RETURN(rc);
456 out_set:
457         lov_fini_match_set(set, mode, 0);
458         RETURN(rc);
459 }
460
461 int lov_fini_cancel_set(struct lov_request_set *set)
462 {
463         int rc = 0;
464         ENTRY;
465
466         if (set == NULL)
467                 RETURN(0);
468
469         LASSERT(set->set_exp);
470         if (set->set_lockh)
471                 lov_llh_put(set->set_lockh);
472
473         lov_put_reqset(set);
474
475         RETURN(rc);
476 }
477
478 int lov_prep_cancel_set(struct obd_export *exp, struct obd_info *oinfo,
479                         struct lov_stripe_md *lsm, __u32 mode,
480                         struct lustre_handle *lockh,
481                         struct lov_request_set **reqset)
482 {
483         struct lov_request_set *set;
484         int i, rc = 0;
485         ENTRY;
486
487         OBD_ALLOC(set, sizeof(*set));
488         if (set == NULL)
489                 RETURN(-ENOMEM);
490         lov_init_set(set);
491
492         set->set_exp = exp;
493         set->set_oi = oinfo;
494         set->set_oi->oi_md = lsm;
495         set->set_lockh = lov_handle2llh(lockh);
496         if (set->set_lockh == NULL) {
497                 CERROR("LOV: invalid lov lock handle %p\n", lockh);
498                 GOTO(out_set, rc = -EINVAL);
499         }
500         lockh->cookie = set->set_lockh->llh_handle.h_cookie;
501
502         for (i = 0; i < lsm->lsm_stripe_count; i++){
503                 struct lov_request *req;
504                 struct lustre_handle *lov_lockhp;
505                 struct lov_oinfo *loi = lsm->lsm_oinfo[i];
506
507                 lov_lockhp = set->set_lockh->llh_handles + i;
508                 if (!lustre_handle_is_used(lov_lockhp)) {
509                         CDEBUG(D_INFO, "lov idx %d subobj "LPX64" no lock\n",
510                                loi->loi_ost_idx, loi->loi_id);
511                         continue;
512                 }
513
514                 OBD_ALLOC(req, sizeof(*req));
515                 if (req == NULL)
516                         GOTO(out_set, rc = -ENOMEM);
517
518                 req->rq_buflen = sizeof(*req->rq_oi.oi_md);
519                 OBD_ALLOC_LARGE(req->rq_oi.oi_md, req->rq_buflen);
520                 if (req->rq_oi.oi_md == NULL) {
521                         OBD_FREE(req, sizeof(*req));
522                         GOTO(out_set, rc = -ENOMEM);
523                 }
524
525                 req->rq_idx = loi->loi_ost_idx;
526                 req->rq_stripe = i;
527
528                 /* XXX LOV STACKING: submd should be from the subobj */
529                 req->rq_oi.oi_md->lsm_object_id = loi->loi_id;
530                 req->rq_oi.oi_md->lsm_object_seq = loi->loi_seq;
531                 req->rq_oi.oi_md->lsm_stripe_count = 0;
532
533                 lov_set_add_req(req, set);
534         }
535         if (!set->set_count)
536                 GOTO(out_set, rc = -EIO);
537         *reqset = set;
538         RETURN(rc);
539 out_set:
540         lov_fini_cancel_set(set);
541         RETURN(rc);
542 }
543
544 static int lov_update_create_set(struct lov_request_set *set,
545                                  struct lov_request *req, int rc)
546 {
547         struct obd_trans_info *oti = set->set_oti;
548         struct lov_stripe_md *lsm = set->set_oi->oi_md;
549         struct lov_oinfo *loi;
550         struct lov_obd *lov = &set->set_exp->exp_obd->u.lov;
551         ENTRY;
552
553         if (rc && lov->lov_tgts[req->rq_idx] &&
554             lov->lov_tgts[req->rq_idx]->ltd_active) {
555                 CERROR("error creating fid "LPX64" sub-object"
556                        " on OST idx %d/%d: rc = %d\n",
557                        set->set_oi->oi_oa->o_id, req->rq_idx,
558                        lsm->lsm_stripe_count, rc);
559                 if (rc > 0) {
560                         CERROR("obd_create returned invalid err %d\n", rc);
561                         rc = -EIO;
562                 }
563         }
564
565         cfs_spin_lock(&set->set_lock);
566         req->rq_stripe = cfs_atomic_read(&set->set_success);
567         loi = lsm->lsm_oinfo[req->rq_stripe];
568
569
570         if (rc) {
571                 lov_update_set(set, req, rc);
572                 cfs_spin_unlock(&set->set_lock);
573                 RETURN(rc);
574         }
575
576         loi->loi_id = req->rq_oi.oi_oa->o_id;
577         loi->loi_seq = req->rq_oi.oi_oa->o_seq;
578         loi->loi_ost_idx = req->rq_idx;
579         loi_init(loi);
580
581         if (oti && set->set_cookies)
582                 ++oti->oti_logcookies;
583         if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLCOOKIE)
584                 set->set_cookie_sent++;
585
586         lov_update_set(set, req, rc);
587         cfs_spin_unlock(&set->set_lock);
588
589         CDEBUG(D_INODE, "objid "LPX64" has subobj "LPX64"/"LPU64" at idx %d\n",
590                lsm->lsm_object_id, loi->loi_id, loi->loi_id, req->rq_idx);
591         RETURN(rc);
592 }
593
594 static int create_done(struct obd_export *exp, struct lov_request_set *set,
595                        struct lov_stripe_md **lsmp)
596 {
597         struct lov_obd *lov = &exp->exp_obd->u.lov;
598         struct obd_trans_info *oti = set->set_oti;
599         struct obdo *src_oa = set->set_oi->oi_oa;
600         struct lov_request *req;
601         struct obdo *ret_oa = NULL;
602         int success, attrset = 0, rc = 0;
603         ENTRY;
604
605         LASSERT(cfs_atomic_read(&set->set_completes));
606
607         /* try alloc objects on other osts if osc_create fails for
608          * exceptions: RPC failure, ENOSPC, etc */
609         if (set->set_count != cfs_atomic_read(&set->set_success)) {
610                 cfs_list_for_each_entry (req, &set->set_list, rq_link) {
611                         if (req->rq_rc == 0)
612                                 continue;
613
614                         cfs_atomic_dec(&set->set_completes);
615                         req->rq_complete = 0;
616
617                         rc = qos_remedy_create(set, req);
618                         lov_update_create_set(set, req, rc);
619                 }
620         }
621
622         success = cfs_atomic_read(&set->set_success);
623         /* no successful creates */
624         if (success == 0)
625                 GOTO(cleanup, rc);
626
627         if (set->set_count != success) {
628                 set->set_count = success;
629                 qos_shrink_lsm(set);
630         }
631
632         OBDO_ALLOC(ret_oa);
633         if (ret_oa == NULL)
634                 GOTO(cleanup, rc = -ENOMEM);
635
636         cfs_list_for_each_entry(req, &set->set_list, rq_link) {
637                 if (!req->rq_complete || req->rq_rc)
638                         continue;
639                 lov_merge_attrs(ret_oa, req->rq_oi.oi_oa,
640                                 req->rq_oi.oi_oa->o_valid, set->set_oi->oi_md,
641                                 req->rq_stripe, &attrset);
642         }
643         if (src_oa->o_valid & OBD_MD_FLSIZE &&
644             ret_oa->o_size != src_oa->o_size) {
645                 CERROR("original size "LPU64" isn't new object size "LPU64"\n",
646                        src_oa->o_size, ret_oa->o_size);
647                 LBUG();
648         }
649         ret_oa->o_id = src_oa->o_id;
650         ret_oa->o_seq = src_oa->o_seq;
651         ret_oa->o_valid |= OBD_MD_FLID | OBD_MD_FLGROUP;
652         memcpy(src_oa, ret_oa, sizeof(*src_oa));
653         OBDO_FREE(ret_oa);
654
655         *lsmp = set->set_oi->oi_md;
656         GOTO(done, rc = 0);
657
658 cleanup:
659         cfs_list_for_each_entry(req, &set->set_list, rq_link) {
660                 struct obd_export *sub_exp;
661                 int err = 0;
662
663                 if (!req->rq_complete || req->rq_rc)
664                         continue;
665
666                 sub_exp = lov->lov_tgts[req->rq_idx]->ltd_exp;
667                 err = obd_destroy(sub_exp, req->rq_oi.oi_oa, NULL, oti, NULL,
668                                   NULL);
669                 if (err)
670                         CERROR("Failed to uncreate objid "LPX64" subobj "
671                                LPX64" on OST idx %d: rc = %d\n",
672                                src_oa->o_id, req->rq_oi.oi_oa->o_id,
673                                req->rq_idx, rc);
674         }
675         if (*lsmp == NULL)
676                 obd_free_memmd(exp, &set->set_oi->oi_md);
677 done:
678         if (oti && set->set_cookies) {
679                 oti->oti_logcookies = set->set_cookies;
680                 if (!set->set_cookie_sent) {
681                         oti_free_cookies(oti);
682                         src_oa->o_valid &= ~OBD_MD_FLCOOKIE;
683                 } else {
684                         src_oa->o_valid |= OBD_MD_FLCOOKIE;
685                 }
686         }
687         RETURN(rc);
688 }
689
690 int lov_fini_create_set(struct lov_request_set *set,struct lov_stripe_md **lsmp)
691 {
692         int rc = 0;
693         ENTRY;
694
695         if (set == NULL)
696                 RETURN(0);
697         LASSERT(set->set_exp);
698         if (cfs_atomic_read(&set->set_completes))
699                 rc = create_done(set->set_exp, set, lsmp);
700
701         lov_put_reqset(set);
702         RETURN(rc);
703 }
704
705 int cb_create_update(void *cookie, int rc)
706 {
707         struct obd_info *oinfo = cookie;
708         struct lov_request *lovreq;
709
710         lovreq = container_of(oinfo, struct lov_request, rq_oi);
711
712         if (CFS_FAIL_CHECK(OBD_FAIL_MDS_OSC_CREATE_FAIL))
713                 if (lovreq->rq_idx == cfs_fail_val)
714                         rc = -ENOTCONN;
715
716         rc= lov_update_create_set(lovreq->rq_rqset, lovreq, rc);
717         if (lov_finished_set(lovreq->rq_rqset))
718                 lov_put_reqset(lovreq->rq_rqset);
719         return rc;
720 }
721
722 int lov_prep_create_set(struct obd_export *exp, struct obd_info *oinfo,
723                         struct lov_stripe_md **lsmp, struct obdo *src_oa,
724                         struct obd_trans_info *oti,
725                         struct lov_request_set **reqset)
726 {
727         struct lov_request_set *set;
728         int rc = 0;
729         ENTRY;
730
731         OBD_ALLOC(set, sizeof(*set));
732         if (set == NULL)
733                 RETURN(-ENOMEM);
734         lov_init_set(set);
735
736         set->set_exp = exp;
737         set->set_oi = oinfo;
738         set->set_oi->oi_md = *lsmp;
739         set->set_oi->oi_oa = src_oa;
740         set->set_oti = oti;
741         lov_get_reqset(set);
742
743         rc = qos_prep_create(exp, set);
744         /* qos_shrink_lsm() may have allocated a new lsm */
745         *lsmp = oinfo->oi_md;
746         if (rc) {
747                 lov_fini_create_set(set, lsmp);
748                 lov_put_reqset(set);
749         } else {
750                 *reqset = set;
751         }
752         RETURN(rc);
753 }
754
755 static int common_attr_done(struct lov_request_set *set)
756 {
757         cfs_list_t *pos;
758         struct lov_request *req;
759         struct obdo *tmp_oa;
760         int rc = 0, attrset = 0;
761         ENTRY;
762
763         LASSERT(set->set_oi != NULL);
764
765         if (set->set_oi->oi_oa == NULL)
766                 RETURN(0);
767
768         if (!cfs_atomic_read(&set->set_success))
769                 RETURN(-EIO);
770
771         OBDO_ALLOC(tmp_oa);
772         if (tmp_oa == NULL)
773                 GOTO(out, rc = -ENOMEM);
774
775         cfs_list_for_each (pos, &set->set_list) {
776                 req = cfs_list_entry(pos, struct lov_request, rq_link);
777
778                 if (!req->rq_complete || req->rq_rc)
779                         continue;
780                 if (req->rq_oi.oi_oa->o_valid == 0)   /* inactive stripe */
781                         continue;
782                 lov_merge_attrs(tmp_oa, req->rq_oi.oi_oa,
783                                 req->rq_oi.oi_oa->o_valid,
784                                 set->set_oi->oi_md, req->rq_stripe, &attrset);
785         }
786         if (!attrset) {
787                 CERROR("No stripes had valid attrs\n");
788                 rc = -EIO;
789         }
790         if ((set->set_oi->oi_oa->o_valid & OBD_MD_FLEPOCH) &&
791             (set->set_oi->oi_md->lsm_stripe_count != attrset)) {
792                 /* When we take attributes of some epoch, we require all the
793                  * ost to be active. */
794                 CERROR("Not all the stripes had valid attrs\n");
795                 GOTO(out, rc = -EIO);
796         }
797
798         tmp_oa->o_id = set->set_oi->oi_oa->o_id;
799         memcpy(set->set_oi->oi_oa, tmp_oa, sizeof(*set->set_oi->oi_oa));
800 out:
801         if (tmp_oa)
802                 OBDO_FREE(tmp_oa);
803         RETURN(rc);
804
805 }
806
807 static int brw_done(struct lov_request_set *set)
808 {
809         struct lov_stripe_md *lsm = set->set_oi->oi_md;
810         struct lov_oinfo     *loi = NULL;
811         cfs_list_t *pos;
812         struct lov_request *req;
813         ENTRY;
814
815         cfs_list_for_each (pos, &set->set_list) {
816                 req = cfs_list_entry(pos, struct lov_request, rq_link);
817
818                 if (!req->rq_complete || req->rq_rc)
819                         continue;
820
821                 loi = lsm->lsm_oinfo[req->rq_stripe];
822
823                 if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLBLOCKS)
824                         loi->loi_lvb.lvb_blocks = req->rq_oi.oi_oa->o_blocks;
825         }
826
827         RETURN(0);
828 }
829
830 int lov_fini_brw_set(struct lov_request_set *set)
831 {
832         int rc = 0;
833         ENTRY;
834
835         if (set == NULL)
836                 RETURN(0);
837         LASSERT(set->set_exp);
838         if (cfs_atomic_read(&set->set_completes)) {
839                 rc = brw_done(set);
840                 /* FIXME update qos data here */
841         }
842         lov_put_reqset(set);
843
844         RETURN(rc);
845 }
846
847 int lov_prep_brw_set(struct obd_export *exp, struct obd_info *oinfo,
848                      obd_count oa_bufs, struct brw_page *pga,
849                      struct obd_trans_info *oti,
850                      struct lov_request_set **reqset)
851 {
852         struct {
853                 obd_count       index;
854                 obd_count       count;
855                 obd_count       off;
856         } *info = NULL;
857         struct lov_request_set *set;
858         struct lov_obd *lov = &exp->exp_obd->u.lov;
859         int rc = 0, i, shift;
860         ENTRY;
861
862         OBD_ALLOC(set, sizeof(*set));
863         if (set == NULL)
864                 RETURN(-ENOMEM);
865         lov_init_set(set);
866
867         set->set_exp = exp;
868         set->set_oti = oti;
869         set->set_oi = oinfo;
870         set->set_oabufs = oa_bufs;
871         OBD_ALLOC_LARGE(set->set_pga, oa_bufs * sizeof(*set->set_pga));
872         if (!set->set_pga)
873                 GOTO(out, rc = -ENOMEM);
874
875         OBD_ALLOC_LARGE(info, sizeof(*info) * oinfo->oi_md->lsm_stripe_count);
876         if (!info)
877                 GOTO(out, rc = -ENOMEM);
878
879         /* calculate the page count for each stripe */
880         for (i = 0; i < oa_bufs; i++) {
881                 int stripe = lov_stripe_number(oinfo->oi_md, pga[i].off);
882                 info[stripe].count++;
883         }
884
885         /* alloc and initialize lov request */
886         shift = 0;
887         for (i = 0; i < oinfo->oi_md->lsm_stripe_count; i++){
888                 struct lov_oinfo *loi = NULL;
889                 struct lov_request *req;
890
891                 if (info[i].count == 0)
892                         continue;
893
894                 loi = oinfo->oi_md->lsm_oinfo[i];
895                 if (!lov->lov_tgts[loi->loi_ost_idx] ||
896                     !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
897                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
898                         GOTO(out, rc = -EIO);
899                 }
900
901                 OBD_ALLOC(req, sizeof(*req));
902                 if (req == NULL)
903                         GOTO(out, rc = -ENOMEM);
904
905                 OBDO_ALLOC(req->rq_oi.oi_oa);
906                 if (req->rq_oi.oi_oa == NULL) {
907                         OBD_FREE(req, sizeof(*req));
908                         GOTO(out, rc = -ENOMEM);
909                 }
910
911                 if (oinfo->oi_oa) {
912                         memcpy(req->rq_oi.oi_oa, oinfo->oi_oa,
913                                sizeof(*req->rq_oi.oi_oa));
914                 }
915                 req->rq_oi.oi_oa->o_id = loi->loi_id;
916                 req->rq_oi.oi_oa->o_seq = loi->loi_seq;
917                 req->rq_oi.oi_oa->o_stripe_idx = i;
918
919                 req->rq_buflen = sizeof(*req->rq_oi.oi_md);
920                 OBD_ALLOC_LARGE(req->rq_oi.oi_md, req->rq_buflen);
921                 if (req->rq_oi.oi_md == NULL) {
922                         OBDO_FREE(req->rq_oi.oi_oa);
923                         OBD_FREE(req, sizeof(*req));
924                         GOTO(out, rc = -ENOMEM);
925                 }
926
927                 req->rq_idx = loi->loi_ost_idx;
928                 req->rq_stripe = i;
929
930                 /* XXX LOV STACKING */
931                 req->rq_oi.oi_md->lsm_object_id = loi->loi_id;
932                 req->rq_oi.oi_md->lsm_object_seq = loi->loi_seq;
933                 req->rq_oabufs = info[i].count;
934                 req->rq_pgaidx = shift;
935                 shift += req->rq_oabufs;
936
937                 /* remember the index for sort brw_page array */
938                 info[i].index = req->rq_pgaidx;
939
940                 req->rq_oi.oi_capa = oinfo->oi_capa;
941
942                 lov_set_add_req(req, set);
943         }
944         if (!set->set_count)
945                 GOTO(out, rc = -EIO);
946
947         /* rotate & sort the brw_page array */
948         for (i = 0; i < oa_bufs; i++) {
949                 int stripe = lov_stripe_number(oinfo->oi_md, pga[i].off);
950
951                 shift = info[stripe].index + info[stripe].off;
952                 LASSERT(shift < oa_bufs);
953                 set->set_pga[shift] = pga[i];
954                 lov_stripe_offset(oinfo->oi_md, pga[i].off, stripe,
955                                   &set->set_pga[shift].off);
956                 info[stripe].off++;
957         }
958 out:
959         if (info)
960                 OBD_FREE_LARGE(info,
961                                sizeof(*info) * oinfo->oi_md->lsm_stripe_count);
962
963         if (rc == 0)
964                 *reqset = set;
965         else
966                 lov_fini_brw_set(set);
967
968         RETURN(rc);
969 }
970
971 int lov_fini_getattr_set(struct lov_request_set *set)
972 {
973         int rc = 0;
974         ENTRY;
975
976         if (set == NULL)
977                 RETURN(0);
978         LASSERT(set->set_exp);
979         if (cfs_atomic_read(&set->set_completes))
980                 rc = common_attr_done(set);
981
982         lov_put_reqset(set);
983
984         RETURN(rc);
985 }
986
987 /* The callback for osc_getattr_async that finilizes a request info when a
988  * response is received. */
989 static int cb_getattr_update(void *cookie, int rc)
990 {
991         struct obd_info *oinfo = cookie;
992         struct lov_request *lovreq;
993         lovreq = container_of(oinfo, struct lov_request, rq_oi);
994         return lov_update_common_set(lovreq->rq_rqset, lovreq, rc);
995 }
996
997 int lov_prep_getattr_set(struct obd_export *exp, struct obd_info *oinfo,
998                          struct lov_request_set **reqset)
999 {
1000         struct lov_request_set *set;
1001         struct lov_obd *lov = &exp->exp_obd->u.lov;
1002         int rc = 0, i;
1003         ENTRY;
1004
1005         OBD_ALLOC(set, sizeof(*set));
1006         if (set == NULL)
1007                 RETURN(-ENOMEM);
1008         lov_init_set(set);
1009
1010         set->set_exp = exp;
1011         set->set_oi = oinfo;
1012
1013         for (i = 0; i < oinfo->oi_md->lsm_stripe_count; i++) {
1014                 struct lov_oinfo *loi;
1015                 struct lov_request *req;
1016
1017                 loi = oinfo->oi_md->lsm_oinfo[i];
1018                 if (!lov->lov_tgts[loi->loi_ost_idx] ||
1019                     !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
1020                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1021                         if (oinfo->oi_oa->o_valid & OBD_MD_FLEPOCH)
1022                                 /* SOM requires all the OSTs to be active. */
1023                                 GOTO(out_set, rc = -EIO);
1024                         continue;
1025                 }
1026
1027                 OBD_ALLOC(req, sizeof(*req));
1028                 if (req == NULL)
1029                         GOTO(out_set, rc = -ENOMEM);
1030
1031                 req->rq_stripe = i;
1032                 req->rq_idx = loi->loi_ost_idx;
1033
1034                 OBDO_ALLOC(req->rq_oi.oi_oa);
1035                 if (req->rq_oi.oi_oa == NULL) {
1036                         OBD_FREE(req, sizeof(*req));
1037                         GOTO(out_set, rc = -ENOMEM);
1038                 }
1039                 memcpy(req->rq_oi.oi_oa, oinfo->oi_oa,
1040                        sizeof(*req->rq_oi.oi_oa));
1041                 req->rq_oi.oi_oa->o_id = loi->loi_id;
1042                 req->rq_oi.oi_oa->o_seq = loi->loi_seq;
1043                 req->rq_oi.oi_cb_up = cb_getattr_update;
1044                 req->rq_oi.oi_capa = oinfo->oi_capa;
1045
1046                 lov_set_add_req(req, set);
1047         }
1048         if (!set->set_count)
1049                 GOTO(out_set, rc = -EIO);
1050         *reqset = set;
1051         RETURN(rc);
1052 out_set:
1053         lov_fini_getattr_set(set);
1054         RETURN(rc);
1055 }
1056
1057 int lov_fini_destroy_set(struct lov_request_set *set)
1058 {
1059         ENTRY;
1060
1061         if (set == NULL)
1062                 RETURN(0);
1063         LASSERT(set->set_exp);
1064         if (cfs_atomic_read(&set->set_completes)) {
1065                 /* FIXME update qos data here */
1066         }
1067
1068         lov_put_reqset(set);
1069
1070         RETURN(0);
1071 }
1072
1073 int lov_prep_destroy_set(struct obd_export *exp, struct obd_info *oinfo,
1074                          struct obdo *src_oa, struct lov_stripe_md *lsm,
1075                          struct obd_trans_info *oti,
1076                          struct lov_request_set **reqset)
1077 {
1078         struct lov_request_set *set;
1079         struct lov_obd *lov = &exp->exp_obd->u.lov;
1080         int rc = 0, i;
1081         ENTRY;
1082
1083         OBD_ALLOC(set, sizeof(*set));
1084         if (set == NULL)
1085                 RETURN(-ENOMEM);
1086         lov_init_set(set);
1087
1088         set->set_exp = exp;
1089         set->set_oi = oinfo;
1090         set->set_oi->oi_md = lsm;
1091         set->set_oi->oi_oa = src_oa;
1092         set->set_oti = oti;
1093         if (oti != NULL && src_oa->o_valid & OBD_MD_FLCOOKIE)
1094                 set->set_cookies = oti->oti_logcookies;
1095
1096         for (i = 0; i < lsm->lsm_stripe_count; i++) {
1097                 struct lov_oinfo *loi;
1098                 struct lov_request *req;
1099
1100                 loi = lsm->lsm_oinfo[i];
1101                 if (!lov->lov_tgts[loi->loi_ost_idx] ||
1102                     !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
1103                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1104                         continue;
1105                 }
1106
1107                 OBD_ALLOC(req, sizeof(*req));
1108                 if (req == NULL)
1109                         GOTO(out_set, rc = -ENOMEM);
1110
1111                 req->rq_stripe = i;
1112                 req->rq_idx = loi->loi_ost_idx;
1113
1114                 OBDO_ALLOC(req->rq_oi.oi_oa);
1115                 if (req->rq_oi.oi_oa == NULL) {
1116                         OBD_FREE(req, sizeof(*req));
1117                         GOTO(out_set, rc = -ENOMEM);
1118                 }
1119                 memcpy(req->rq_oi.oi_oa, src_oa, sizeof(*req->rq_oi.oi_oa));
1120                 req->rq_oi.oi_oa->o_id = loi->loi_id;
1121                 req->rq_oi.oi_oa->o_seq = loi->loi_seq;
1122                 lov_set_add_req(req, set);
1123         }
1124         if (!set->set_count)
1125                 GOTO(out_set, rc = -EIO);
1126         *reqset = set;
1127         RETURN(rc);
1128 out_set:
1129         lov_fini_destroy_set(set);
1130         RETURN(rc);
1131 }
1132
1133 int lov_fini_setattr_set(struct lov_request_set *set)
1134 {
1135         int rc = 0;
1136         ENTRY;
1137
1138         if (set == NULL)
1139                 RETURN(0);
1140         LASSERT(set->set_exp);
1141         if (cfs_atomic_read(&set->set_completes)) {
1142                 rc = common_attr_done(set);
1143                 /* FIXME update qos data here */
1144         }
1145
1146         lov_put_reqset(set);
1147         RETURN(rc);
1148 }
1149
1150 int lov_update_setattr_set(struct lov_request_set *set,
1151                            struct lov_request *req, int rc)
1152 {
1153         struct lov_obd *lov = &req->rq_rqset->set_exp->exp_obd->u.lov;
1154         struct lov_stripe_md *lsm = req->rq_rqset->set_oi->oi_md;
1155         ENTRY;
1156
1157         lov_update_set(set, req, rc);
1158
1159         /* grace error on inactive ost */
1160         if (rc && !(lov->lov_tgts[req->rq_idx] &&
1161                     lov->lov_tgts[req->rq_idx]->ltd_active))
1162                 rc = 0;
1163
1164         if (rc == 0) {
1165                 if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLCTIME)
1166                         lsm->lsm_oinfo[req->rq_stripe]->loi_lvb.lvb_ctime =
1167                                 req->rq_oi.oi_oa->o_ctime;
1168                 if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLMTIME)
1169                         lsm->lsm_oinfo[req->rq_stripe]->loi_lvb.lvb_mtime =
1170                                 req->rq_oi.oi_oa->o_mtime;
1171                 if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLATIME)
1172                         lsm->lsm_oinfo[req->rq_stripe]->loi_lvb.lvb_atime =
1173                                 req->rq_oi.oi_oa->o_atime;
1174         }
1175
1176         RETURN(rc);
1177 }
1178
1179 /* The callback for osc_setattr_async that finilizes a request info when a
1180  * response is received. */
1181 static int cb_setattr_update(void *cookie, int rc)
1182 {
1183         struct obd_info *oinfo = cookie;
1184         struct lov_request *lovreq;
1185         lovreq = container_of(oinfo, struct lov_request, rq_oi);
1186         return lov_update_setattr_set(lovreq->rq_rqset, lovreq, rc);
1187 }
1188
1189 int lov_prep_setattr_set(struct obd_export *exp, struct obd_info *oinfo,
1190                          struct obd_trans_info *oti,
1191                          struct lov_request_set **reqset)
1192 {
1193         struct lov_request_set *set;
1194         struct lov_obd *lov = &exp->exp_obd->u.lov;
1195         int rc = 0, i;
1196         ENTRY;
1197
1198         OBD_ALLOC(set, sizeof(*set));
1199         if (set == NULL)
1200                 RETURN(-ENOMEM);
1201         lov_init_set(set);
1202
1203         set->set_exp = exp;
1204         set->set_oti = oti;
1205         set->set_oi = oinfo;
1206         if (oti != NULL && oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE)
1207                 set->set_cookies = oti->oti_logcookies;
1208
1209         for (i = 0; i < oinfo->oi_md->lsm_stripe_count; i++) {
1210                 struct lov_oinfo *loi = oinfo->oi_md->lsm_oinfo[i];
1211                 struct lov_request *req;
1212
1213                 if (!lov->lov_tgts[loi->loi_ost_idx] ||
1214                     !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
1215                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1216                         continue;
1217                 }
1218
1219                 OBD_ALLOC(req, sizeof(*req));
1220                 if (req == NULL)
1221                         GOTO(out_set, rc = -ENOMEM);
1222                 req->rq_stripe = i;
1223                 req->rq_idx = loi->loi_ost_idx;
1224
1225                 OBDO_ALLOC(req->rq_oi.oi_oa);
1226                 if (req->rq_oi.oi_oa == NULL) {
1227                         OBD_FREE(req, sizeof(*req));
1228                         GOTO(out_set, rc = -ENOMEM);
1229                 }
1230                 memcpy(req->rq_oi.oi_oa, oinfo->oi_oa,
1231                        sizeof(*req->rq_oi.oi_oa));
1232                 req->rq_oi.oi_oa->o_id = loi->loi_id;
1233                 req->rq_oi.oi_oa->o_seq= loi->loi_seq;
1234                 req->rq_oi.oi_oa->o_stripe_idx = i;
1235                 req->rq_oi.oi_cb_up = cb_setattr_update;
1236                 req->rq_oi.oi_capa = oinfo->oi_capa;
1237
1238                 if (oinfo->oi_oa->o_valid & OBD_MD_FLSIZE) {
1239                         int off = lov_stripe_offset(oinfo->oi_md,
1240                                                     oinfo->oi_oa->o_size, i,
1241                                                     &req->rq_oi.oi_oa->o_size);
1242
1243                         if (off < 0 && req->rq_oi.oi_oa->o_size)
1244                                 req->rq_oi.oi_oa->o_size--;
1245
1246                         CDEBUG(D_INODE, "stripe %d has size "LPU64"/"LPU64"\n",
1247                                i, req->rq_oi.oi_oa->o_size,
1248                                oinfo->oi_oa->o_size);
1249                 }
1250                 lov_set_add_req(req, set);
1251         }
1252         if (!set->set_count)
1253                 GOTO(out_set, rc = -EIO);
1254         *reqset = set;
1255         RETURN(rc);
1256 out_set:
1257         lov_fini_setattr_set(set);
1258         RETURN(rc);
1259 }
1260
1261 int lov_fini_punch_set(struct lov_request_set *set)
1262 {
1263         int rc = 0;
1264         ENTRY;
1265
1266         if (set == NULL)
1267                 RETURN(0);
1268         LASSERT(set->set_exp);
1269         if (cfs_atomic_read(&set->set_completes)) {
1270                 rc = -EIO;
1271                 /* FIXME update qos data here */
1272                 if (cfs_atomic_read(&set->set_success))
1273                         rc = common_attr_done(set);
1274         }
1275
1276         lov_put_reqset(set);
1277
1278         RETURN(rc);
1279 }
1280
1281 int lov_update_punch_set(struct lov_request_set *set,
1282                          struct lov_request *req, int rc)
1283 {
1284         struct lov_obd *lov = &req->rq_rqset->set_exp->exp_obd->u.lov;
1285         struct lov_stripe_md *lsm = req->rq_rqset->set_oi->oi_md;
1286         ENTRY;
1287
1288         lov_update_set(set, req, rc);
1289
1290         /* grace error on inactive ost */
1291         if (rc && !lov->lov_tgts[req->rq_idx]->ltd_active)
1292                 rc = 0;
1293
1294         if (rc == 0) {
1295                 lov_stripe_lock(lsm);
1296                 if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLBLOCKS) {
1297                         lsm->lsm_oinfo[req->rq_stripe]->loi_lvb.lvb_blocks =
1298                                 req->rq_oi.oi_oa->o_blocks;
1299                 }
1300
1301                 /* Do we need to update lvb_size here? It needn't because
1302                  * it have been done in ll_truncate(). -jay */
1303                 lov_stripe_unlock(lsm);
1304         }
1305
1306         RETURN(rc);
1307 }
1308
1309 /* The callback for osc_punch that finilizes a request info when a response
1310  * is received. */
1311 static int cb_update_punch(void *cookie, int rc)
1312 {
1313         struct obd_info *oinfo = cookie;
1314         struct lov_request *lovreq;
1315         lovreq = container_of(oinfo, struct lov_request, rq_oi);
1316         return lov_update_punch_set(lovreq->rq_rqset, lovreq, rc);
1317 }
1318
1319 int lov_prep_punch_set(struct obd_export *exp, struct obd_info *oinfo,
1320                        struct obd_trans_info *oti,
1321                        struct lov_request_set **reqset)
1322 {
1323         struct lov_request_set *set;
1324         struct lov_obd *lov = &exp->exp_obd->u.lov;
1325         int rc = 0, i;
1326         ENTRY;
1327
1328         OBD_ALLOC(set, sizeof(*set));
1329         if (set == NULL)
1330                 RETURN(-ENOMEM);
1331         lov_init_set(set);
1332
1333         set->set_oi = oinfo;
1334         set->set_exp = exp;
1335
1336         for (i = 0; i < oinfo->oi_md->lsm_stripe_count; i++) {
1337                 struct lov_oinfo *loi = oinfo->oi_md->lsm_oinfo[i];
1338                 struct lov_request *req;
1339                 obd_off rs, re;
1340
1341                 if (!lov_stripe_intersects(oinfo->oi_md, i,
1342                                            oinfo->oi_policy.l_extent.start,
1343                                            oinfo->oi_policy.l_extent.end,
1344                                            &rs, &re))
1345                         continue;
1346
1347                 if (!lov->lov_tgts[loi->loi_ost_idx] ||
1348                     !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
1349                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1350                         GOTO(out_set, rc = -EIO);
1351                 }
1352
1353                 OBD_ALLOC(req, sizeof(*req));
1354                 if (req == NULL)
1355                         GOTO(out_set, rc = -ENOMEM);
1356                 req->rq_stripe = i;
1357                 req->rq_idx = loi->loi_ost_idx;
1358
1359                 OBDO_ALLOC(req->rq_oi.oi_oa);
1360                 if (req->rq_oi.oi_oa == NULL) {
1361                         OBD_FREE(req, sizeof(*req));
1362                         GOTO(out_set, rc = -ENOMEM);
1363                 }
1364                 memcpy(req->rq_oi.oi_oa, oinfo->oi_oa,
1365                        sizeof(*req->rq_oi.oi_oa));
1366                 req->rq_oi.oi_oa->o_id = loi->loi_id;
1367                 req->rq_oi.oi_oa->o_seq = loi->loi_seq;
1368                 req->rq_oi.oi_oa->o_valid |= OBD_MD_FLGROUP;
1369
1370                 req->rq_oi.oi_oa->o_stripe_idx = i;
1371                 req->rq_oi.oi_cb_up = cb_update_punch;
1372
1373                 req->rq_oi.oi_policy.l_extent.start = rs;
1374                 req->rq_oi.oi_policy.l_extent.end = re;
1375                 req->rq_oi.oi_policy.l_extent.gid = -1;
1376
1377                 req->rq_oi.oi_capa = oinfo->oi_capa;
1378
1379                 lov_set_add_req(req, set);
1380         }
1381         if (!set->set_count)
1382                 GOTO(out_set, rc = -EIO);
1383         *reqset = set;
1384         RETURN(rc);
1385 out_set:
1386         lov_fini_punch_set(set);
1387         RETURN(rc);
1388 }
1389
1390 int lov_fini_sync_set(struct lov_request_set *set)
1391 {
1392         int rc = 0;
1393         ENTRY;
1394
1395         if (set == NULL)
1396                 RETURN(0);
1397         LASSERT(set->set_exp);
1398         if (cfs_atomic_read(&set->set_completes)) {
1399                 if (!cfs_atomic_read(&set->set_success))
1400                         rc = -EIO;
1401                 /* FIXME update qos data here */
1402         }
1403
1404         lov_put_reqset(set);
1405
1406         RETURN(rc);
1407 }
1408
1409 /* The callback for osc_sync that finilizes a request info when a
1410  * response is recieved. */
1411 static int cb_sync_update(void *cookie, int rc)
1412 {
1413         struct obd_info *oinfo = cookie;
1414         struct lov_request *lovreq;
1415
1416         lovreq = container_of(oinfo, struct lov_request, rq_oi);
1417         return lov_update_common_set(lovreq->rq_rqset, lovreq, rc);
1418 }
1419
1420 int lov_prep_sync_set(struct obd_export *exp, struct obd_info *oinfo,
1421                       obd_off start, obd_off end,
1422                       struct lov_request_set **reqset)
1423 {
1424         struct lov_request_set *set;
1425         struct lov_obd *lov = &exp->exp_obd->u.lov;
1426         int rc = 0, i;
1427         ENTRY;
1428
1429         OBD_ALLOC_PTR(set);
1430         if (set == NULL)
1431                 RETURN(-ENOMEM);
1432         lov_init_set(set);
1433
1434         set->set_exp = exp;
1435         set->set_oi = oinfo;
1436
1437         for (i = 0; i < oinfo->oi_md->lsm_stripe_count; i++) {
1438                 struct lov_oinfo *loi = oinfo->oi_md->lsm_oinfo[i];
1439                 struct lov_request *req;
1440                 obd_off rs, re;
1441
1442                 if (!lov->lov_tgts[loi->loi_ost_idx] ||
1443                     !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
1444                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1445                         continue;
1446                 }
1447
1448                 if (!lov_stripe_intersects(oinfo->oi_md, i, start, end, &rs,
1449                                            &re))
1450                         continue;
1451
1452                 OBD_ALLOC_PTR(req);
1453                 if (req == NULL)
1454                         GOTO(out_set, rc = -ENOMEM);
1455                 req->rq_stripe = i;
1456                 req->rq_idx = loi->loi_ost_idx;
1457
1458                 OBDO_ALLOC(req->rq_oi.oi_oa);
1459                 if (req->rq_oi.oi_oa == NULL) {
1460                         OBD_FREE(req, sizeof(*req));
1461                         GOTO(out_set, rc = -ENOMEM);
1462                 }
1463                 *req->rq_oi.oi_oa = *oinfo->oi_oa;
1464                 req->rq_oi.oi_oa->o_id = loi->loi_id;
1465                 req->rq_oi.oi_oa->o_seq = loi->loi_seq;
1466                 req->rq_oi.oi_oa->o_stripe_idx = i;
1467
1468                 req->rq_oi.oi_policy.l_extent.start = rs;
1469                 req->rq_oi.oi_policy.l_extent.end = re;
1470                 req->rq_oi.oi_policy.l_extent.gid = -1;
1471                 req->rq_oi.oi_cb_up = cb_sync_update;
1472
1473                 lov_set_add_req(req, set);
1474         }
1475         if (!set->set_count)
1476                 GOTO(out_set, rc = -EIO);
1477         *reqset = set;
1478         RETURN(rc);
1479 out_set:
1480         lov_fini_sync_set(set);
1481         RETURN(rc);
1482 }
1483
/* Largest value representable in a __u64. */
#define LOV_U64_MAX ((__u64)~0ULL)

/* Saturating add: tot += add, clamped to LOV_U64_MAX when the sum would
 * wrap around.  Both arguments are evaluated more than once. */
#define LOV_SUM_MAX(tot, add)                                           \
        do {                                                            \
                if ((tot) + (add) >= (tot))                             \
                        (tot) += (add);                                 \
                else                                                    \
                        (tot) = LOV_U64_MAX;                            \
        } while (0)
1492
1493 int lov_fini_statfs(struct obd_device *obd, struct obd_statfs *osfs,int success)
1494 {
1495         ENTRY;
1496
1497         if (success) {
1498                 __u32 expected_stripes = lov_get_stripecnt(&obd->u.lov, 0);
1499
1500                 if (osfs->os_files != LOV_U64_MAX)
1501                         do_div(osfs->os_files, expected_stripes);
1502                 if (osfs->os_ffree != LOV_U64_MAX)
1503                         do_div(osfs->os_ffree, expected_stripes);
1504
1505                 cfs_spin_lock(&obd->obd_osfs_lock);
1506                 memcpy(&obd->obd_osfs, osfs, sizeof(*osfs));
1507                 obd->obd_osfs_age = cfs_time_current_64();
1508                 cfs_spin_unlock(&obd->obd_osfs_lock);
1509                 RETURN(0);
1510         }
1511
1512         RETURN(-EIO);
1513 }
1514
1515 int lov_fini_statfs_set(struct lov_request_set *set)
1516 {
1517         int rc = 0;
1518         ENTRY;
1519
1520         if (set == NULL)
1521                 RETURN(0);
1522
1523         if (cfs_atomic_read(&set->set_completes)) {
1524                 rc = lov_fini_statfs(set->set_obd, set->set_oi->oi_osfs,
1525                                      cfs_atomic_read(&set->set_success));
1526         }
1527         lov_put_reqset(set);
1528         RETURN(rc);
1529 }
1530
1531 void lov_update_statfs(struct obd_statfs *osfs, struct obd_statfs *lov_sfs,
1532                        int success)
1533 {
1534         int shift = 0, quit = 0;
1535         __u64 tmp;
1536
1537         if (success == 0) {
1538                 memcpy(osfs, lov_sfs, sizeof(*lov_sfs));
1539         } else {
1540                 if (osfs->os_bsize != lov_sfs->os_bsize) {
1541                         /* assume all block sizes are always powers of 2 */
1542                         /* get the bits difference */
1543                         tmp = osfs->os_bsize | lov_sfs->os_bsize;
1544                         for (shift = 0; shift <= 64; ++shift) {
1545                                 if (tmp & 1) {
1546                                         if (quit)
1547                                                 break;
1548                                         else
1549                                                 quit = 1;
1550                                         shift = 0;
1551                                 }
1552                                 tmp >>= 1;
1553                         }
1554                 }
1555
1556                 if (osfs->os_bsize < lov_sfs->os_bsize) {
1557                         osfs->os_bsize = lov_sfs->os_bsize;
1558
1559                         osfs->os_bfree  >>= shift;
1560                         osfs->os_bavail >>= shift;
1561                         osfs->os_blocks >>= shift;
1562                 } else if (shift != 0) {
1563                         lov_sfs->os_bfree  >>= shift;
1564                         lov_sfs->os_bavail >>= shift;
1565                         lov_sfs->os_blocks >>= shift;
1566                 }
1567 #ifdef MIN_DF
1568                 /* Sandia requested that df (and so, statfs) only
1569                    returned minimal available space on
1570                    a single OST, so people would be able to
1571                    write this much data guaranteed. */
1572                 if (osfs->os_bavail > lov_sfs->os_bavail) {
1573                         /* Presumably if new bavail is smaller,
1574                            new bfree is bigger as well */
1575                         osfs->os_bfree = lov_sfs->os_bfree;
1576                         osfs->os_bavail = lov_sfs->os_bavail;
1577                 }
1578 #else
1579                 osfs->os_bfree += lov_sfs->os_bfree;
1580                 osfs->os_bavail += lov_sfs->os_bavail;
1581 #endif
1582                 osfs->os_blocks += lov_sfs->os_blocks;
1583                 /* XXX not sure about this one - depends on policy.
1584                  *   - could be minimum if we always stripe on all OBDs
1585                  *     (but that would be wrong for any other policy,
1586                  *     if one of the OBDs has no more objects left)
1587                  *   - could be sum if we stripe whole objects
1588                  *   - could be average, just to give a nice number
1589                  *
1590                  * To give a "reasonable" (if not wholly accurate)
1591                  * number, we divide the total number of free objects
1592                  * by expected stripe count (watch out for overflow).
1593                  */
1594                 LOV_SUM_MAX(osfs->os_files, lov_sfs->os_files);
1595                 LOV_SUM_MAX(osfs->os_ffree, lov_sfs->os_ffree);
1596         }
1597 }
1598
1599 /* The callback for osc_statfs_async that finilizes a request info when a
1600  * response is received. */
1601 static int cb_statfs_update(void *cookie, int rc)
1602 {
1603         struct obd_info *oinfo = cookie;
1604         struct lov_request *lovreq;
1605         struct lov_request_set *set;
1606         struct obd_statfs *osfs, *lov_sfs;
1607         struct lov_obd *lov;
1608         struct lov_tgt_desc *tgt;
1609         struct obd_device *lovobd, *tgtobd;
1610         int success;
1611         ENTRY;
1612
1613         lovreq = container_of(oinfo, struct lov_request, rq_oi);
1614         set = lovreq->rq_rqset;
1615         lovobd = set->set_obd;
1616         lov = &lovobd->u.lov;
1617         osfs = set->set_oi->oi_osfs;
1618         lov_sfs = oinfo->oi_osfs;
1619         success = cfs_atomic_read(&set->set_success);
1620         /* XXX: the same is done in lov_update_common_set, however
1621            lovset->set_exp is not initialized. */
1622         lov_update_set(set, lovreq, rc);
1623         if (rc)
1624                 GOTO(out, rc);
1625
1626         obd_getref(lovobd);
1627         tgt = lov->lov_tgts[lovreq->rq_idx];
1628         if (!tgt || !tgt->ltd_active)
1629                 GOTO(out_update, rc);
1630
1631         tgtobd = class_exp2obd(tgt->ltd_exp);
1632         cfs_spin_lock(&tgtobd->obd_osfs_lock);
1633         memcpy(&tgtobd->obd_osfs, lov_sfs, sizeof(*lov_sfs));
1634         if ((oinfo->oi_flags & OBD_STATFS_FROM_CACHE) == 0)
1635                 tgtobd->obd_osfs_age = cfs_time_current_64();
1636         cfs_spin_unlock(&tgtobd->obd_osfs_lock);
1637
1638 out_update:
1639         lov_update_statfs(osfs, lov_sfs, success);
1640         qos_update(lov);
1641         obd_putref(lovobd);
1642
1643 out:
1644         if (set->set_oi->oi_flags & OBD_STATFS_PTLRPCD &&
1645             lov_finished_set(set)) {
1646                 lov_statfs_interpret(NULL, set, set->set_count !=
1647                                      cfs_atomic_read(&set->set_success));
1648                 if (lov->lov_qos.lq_statfs_in_progress)
1649                         qos_statfs_done(lov);
1650         }
1651
1652         RETURN(0);
1653 }
1654
1655 int lov_prep_statfs_set(struct obd_device *obd, struct obd_info *oinfo,
1656                         struct lov_request_set **reqset)
1657 {
1658         struct lov_request_set *set;
1659         struct lov_obd *lov = &obd->u.lov;
1660         int rc = 0, i;
1661         ENTRY;
1662
1663         OBD_ALLOC(set, sizeof(*set));
1664         if (set == NULL)
1665                 RETURN(-ENOMEM);
1666         lov_init_set(set);
1667
1668         set->set_obd = obd;
1669         set->set_oi = oinfo;
1670
1671         /* We only get block data from the OBD */
1672         for (i = 0; i < lov->desc.ld_tgt_count; i++) {
1673                 struct lov_request *req;
1674
1675                 if (!lov->lov_tgts[i] || (!lov->lov_tgts[i]->ltd_active
1676                                           && (oinfo->oi_flags & OBD_STATFS_NODELAY))) {
1677                         CDEBUG(D_HA, "lov idx %d inactive\n", i);
1678                         continue;
1679                 }
1680
1681                 /* skip targets that have been explicitely disabled by the
1682                  * administrator */
1683                 if (!lov->lov_tgts[i]->ltd_exp) {
1684                         CDEBUG(D_HA, "lov idx %d administratively disabled\n", i);
1685                         continue;
1686                 }
1687
1688                 OBD_ALLOC(req, sizeof(*req));
1689                 if (req == NULL)
1690                         GOTO(out_set, rc = -ENOMEM);
1691
1692                 OBD_ALLOC(req->rq_oi.oi_osfs, sizeof(*req->rq_oi.oi_osfs));
1693                 if (req->rq_oi.oi_osfs == NULL) {
1694                         OBD_FREE(req, sizeof(*req));
1695                         GOTO(out_set, rc = -ENOMEM);
1696                 }
1697
1698                 req->rq_idx = i;
1699                 req->rq_oi.oi_cb_up = cb_statfs_update;
1700                 req->rq_oi.oi_flags = oinfo->oi_flags;
1701
1702                 lov_set_add_req(req, set);
1703         }
1704         if (!set->set_count)
1705                 GOTO(out_set, rc = -EIO);
1706         *reqset = set;
1707         RETURN(rc);
1708 out_set:
1709         lov_fini_statfs_set(set);
1710         RETURN(rc);
1711 }