Whamcloud - gitweb
LU-1095 debug: Improve messages for fake requests
[fs/lustre-release.git] / lustre / lov / lov_request.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright (c) 2005, 2010, Oracle and/or its affiliates. All rights reserved.
30  * Use is subject to license terms.
31  *
32  * Copyright (c) 2011, Whamcloud, Inc.
33  */
34 /*
35  * This file is part of Lustre, http://www.lustre.org/
36  * Lustre is a trademark of Sun Microsystems, Inc.
37  */
38
39 #ifndef EXPORT_SYMTAB
40 # define EXPORT_SYMTAB
41 #endif
42 #define DEBUG_SUBSYSTEM S_LOV
43
44 #ifdef __KERNEL__
45 #include <libcfs/libcfs.h>
46 #else
47 #include <liblustre.h>
48 #endif
49
50 #include <obd_class.h>
51 #include <obd_lov.h>
52 #include <lustre/lustre_idl.h>
53
54 #include "lov_internal.h"
55
56 static void lov_init_set(struct lov_request_set *set)
57 {
58         set->set_count = 0;
59         cfs_atomic_set(&set->set_completes, 0);
60         cfs_atomic_set(&set->set_success, 0);
61         set->set_cookies = 0;
62         CFS_INIT_LIST_HEAD(&set->set_list);
63         cfs_atomic_set(&set->set_refcount, 1);
64         cfs_waitq_init(&set->set_waitq);
65         cfs_spin_lock_init(&set->set_lock);
66 }
67
68 void lov_finish_set(struct lov_request_set *set)
69 {
70         cfs_list_t *pos, *n;
71         ENTRY;
72
73         LASSERT(set);
74         cfs_list_for_each_safe(pos, n, &set->set_list) {
75                 struct lov_request *req = cfs_list_entry(pos,
76                                                          struct lov_request,
77                                                          rq_link);
78                 cfs_list_del_init(&req->rq_link);
79
80                 if (req->rq_oi.oi_oa)
81                         OBDO_FREE(req->rq_oi.oi_oa);
82                 if (req->rq_oi.oi_md)
83                         OBD_FREE_LARGE(req->rq_oi.oi_md, req->rq_buflen);
84                 if (req->rq_oi.oi_osfs)
85                         OBD_FREE(req->rq_oi.oi_osfs,
86                                  sizeof(*req->rq_oi.oi_osfs));
87                 OBD_FREE(req, sizeof(*req));
88         }
89
90         if (set->set_pga) {
91                 int len = set->set_oabufs * sizeof(*set->set_pga);
92                 OBD_FREE_LARGE(set->set_pga, len);
93         }
94         if (set->set_lockh)
95                 lov_llh_put(set->set_lockh);
96
97         OBD_FREE(set, sizeof(*set));
98         EXIT;
99 }
100
101 int lov_finished_set(struct lov_request_set *set)
102 {
103         int completes = cfs_atomic_read(&set->set_completes);
104
105         CDEBUG(D_INFO, "check set %d/%d\n", completes,
106                set->set_count);
107         return completes == set->set_count;
108 }
109
110 void lov_update_set(struct lov_request_set *set,
111                     struct lov_request *req, int rc)
112 {
113         req->rq_complete = 1;
114         req->rq_rc = rc;
115
116         cfs_atomic_inc(&set->set_completes);
117         if (rc == 0)
118                 cfs_atomic_inc(&set->set_success);
119
120         cfs_waitq_signal(&set->set_waitq);
121 }
122
123 int lov_update_common_set(struct lov_request_set *set,
124                           struct lov_request *req, int rc)
125 {
126         struct lov_obd *lov = &set->set_exp->exp_obd->u.lov;
127         ENTRY;
128
129         lov_update_set(set, req, rc);
130
131         /* grace error on inactive ost */
132         if (rc && !(lov->lov_tgts[req->rq_idx] &&
133                     lov->lov_tgts[req->rq_idx]->ltd_active))
134                 rc = 0;
135
136         /* FIXME in raid1 regime, should return 0 */
137         RETURN(rc);
138 }
139
140 void lov_set_add_req(struct lov_request *req, struct lov_request_set *set)
141 {
142         cfs_list_add_tail(&req->rq_link, &set->set_list);
143         set->set_count++;
144         req->rq_rqset = set;
145 }
146
147 extern void osc_update_enqueue(struct lustre_handle *lov_lockhp,
148                                struct lov_oinfo *loi, int flags,
149                                struct ost_lvb *lvb, __u32 mode, int rc);
150
151 static int lov_update_enqueue_lov(struct obd_export *exp,
152                                   struct lustre_handle *lov_lockhp,
153                                   struct lov_oinfo *loi, int flags, int idx,
154                                   __u64 oid, int rc)
155 {
156         struct lov_obd *lov = &exp->exp_obd->u.lov;
157
158         if (rc != ELDLM_OK &&
159             !(rc == ELDLM_LOCK_ABORTED && (flags & LDLM_FL_HAS_INTENT))) {
160                 memset(lov_lockhp, 0, sizeof(*lov_lockhp));
161                 if (lov->lov_tgts[idx] && lov->lov_tgts[idx]->ltd_active) {
162                         /* -EUSERS used by OST to report file contention */
163                         if (rc != -EINTR && rc != -EUSERS)
164                                 CERROR("enqueue objid "LPX64" subobj "
165                                        LPX64" on OST idx %d: rc %d\n",
166                                        oid, loi->loi_id, loi->loi_ost_idx, rc);
167                 } else
168                         rc = ELDLM_OK;
169         }
170         return rc;
171 }
172
173 int lov_update_enqueue_set(struct lov_request *req, __u32 mode, int rc)
174 {
175         struct lov_request_set *set = req->rq_rqset;
176         struct lustre_handle *lov_lockhp;
177         struct obd_info *oi = set->set_oi;
178         struct lov_oinfo *loi;
179         ENTRY;
180
181         LASSERT(oi != NULL);
182
183         lov_lockhp = set->set_lockh->llh_handles + req->rq_stripe;
184         loi = oi->oi_md->lsm_oinfo[req->rq_stripe];
185
186         /* XXX LOV STACKING: OSC gets a copy, created in lov_prep_enqueue_set
187          * and that copy can be arbitrarily out of date.
188          *
189          * The LOV API is due for a serious rewriting anyways, and this
190          * can be addressed then. */
191
192         lov_stripe_lock(oi->oi_md);
193         osc_update_enqueue(lov_lockhp, loi, oi->oi_flags,
194                            &req->rq_oi.oi_md->lsm_oinfo[0]->loi_lvb, mode, rc);
195         if (rc == ELDLM_LOCK_ABORTED && (oi->oi_flags & LDLM_FL_HAS_INTENT))
196                 memset(lov_lockhp, 0, sizeof *lov_lockhp);
197         rc = lov_update_enqueue_lov(set->set_exp, lov_lockhp, loi, oi->oi_flags,
198                                     req->rq_idx, oi->oi_md->lsm_object_id, rc);
199         lov_stripe_unlock(oi->oi_md);
200         lov_update_set(set, req, rc);
201         RETURN(rc);
202 }
203
204 /* The callback for osc_enqueue that updates lov info for every OSC request. */
205 static int cb_update_enqueue(void *cookie, int rc)
206 {
207         struct obd_info *oinfo = cookie;
208         struct ldlm_enqueue_info *einfo;
209         struct lov_request *lovreq;
210
211         lovreq = container_of(oinfo, struct lov_request, rq_oi);
212         einfo = lovreq->rq_rqset->set_ei;
213         return lov_update_enqueue_set(lovreq, einfo->ei_mode, rc);
214 }
215
216 static int enqueue_done(struct lov_request_set *set, __u32 mode)
217 {
218         struct lov_request *req;
219         struct lov_obd *lov = &set->set_exp->exp_obd->u.lov;
220         int completes = cfs_atomic_read(&set->set_completes);
221         int rc = 0;
222         ENTRY;
223
224         /* enqueue/match success, just return */
225         if (completes && completes == cfs_atomic_read(&set->set_success))
226                 RETURN(0);
227
228         /* cancel enqueued/matched locks */
229         cfs_list_for_each_entry(req, &set->set_list, rq_link) {
230                 struct lustre_handle *lov_lockhp;
231
232                 if (!req->rq_complete || req->rq_rc)
233                         continue;
234
235                 lov_lockhp = set->set_lockh->llh_handles + req->rq_stripe;
236                 LASSERT(lov_lockhp);
237                 if (!lustre_handle_is_used(lov_lockhp))
238                         continue;
239
240                 rc = obd_cancel(lov->lov_tgts[req->rq_idx]->ltd_exp,
241                                 req->rq_oi.oi_md, mode, lov_lockhp);
242                 if (rc && lov->lov_tgts[req->rq_idx] &&
243                     lov->lov_tgts[req->rq_idx]->ltd_active)
244                         CERROR("cancelling obdjid "LPX64" on OST "
245                                "idx %d error: rc = %d\n",
246                                req->rq_oi.oi_md->lsm_object_id,
247                                req->rq_idx, rc);
248         }
249         if (set->set_lockh)
250                 lov_llh_put(set->set_lockh);
251         RETURN(rc);
252 }
253
254 int lov_fini_enqueue_set(struct lov_request_set *set, __u32 mode, int rc,
255                          struct ptlrpc_request_set *rqset)
256 {
257         int ret = 0;
258         ENTRY;
259
260         if (set == NULL)
261                 RETURN(0);
262         LASSERT(set->set_exp);
263         /* Do enqueue_done only for sync requests and if any request
264          * succeeded. */
265         if (!rqset) {
266                 if (rc)
267                         cfs_atomic_set(&set->set_completes, 0);
268                 ret = enqueue_done(set, mode);
269         } else if (set->set_lockh)
270                 lov_llh_put(set->set_lockh);
271
272         lov_put_reqset(set);
273
274         RETURN(rc ? rc : ret);
275 }
276
277 int lov_prep_enqueue_set(struct obd_export *exp, struct obd_info *oinfo,
278                          struct ldlm_enqueue_info *einfo,
279                          struct lov_request_set **reqset)
280 {
281         struct lov_obd *lov = &exp->exp_obd->u.lov;
282         struct lov_request_set *set;
283         int i, rc = 0;
284         ENTRY;
285
286         OBD_ALLOC(set, sizeof(*set));
287         if (set == NULL)
288                 RETURN(-ENOMEM);
289         lov_init_set(set);
290
291         set->set_exp = exp;
292         set->set_oi = oinfo;
293         set->set_ei = einfo;
294         set->set_lockh = lov_llh_new(oinfo->oi_md);
295         if (set->set_lockh == NULL)
296                 GOTO(out_set, rc = -ENOMEM);
297         oinfo->oi_lockh->cookie = set->set_lockh->llh_handle.h_cookie;
298
299         for (i = 0; i < oinfo->oi_md->lsm_stripe_count; i++) {
300                 struct lov_oinfo *loi;
301                 struct lov_request *req;
302                 obd_off start, end;
303
304                 loi = oinfo->oi_md->lsm_oinfo[i];
305                 if (!lov_stripe_intersects(oinfo->oi_md, i,
306                                            oinfo->oi_policy.l_extent.start,
307                                            oinfo->oi_policy.l_extent.end,
308                                            &start, &end))
309                         continue;
310
311                 if (!lov->lov_tgts[loi->loi_ost_idx] ||
312                     !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
313                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
314                         continue;
315                 }
316
317                 OBD_ALLOC(req, sizeof(*req));
318                 if (req == NULL)
319                         GOTO(out_set, rc = -ENOMEM);
320
321                 req->rq_buflen = sizeof(*req->rq_oi.oi_md) +
322                         sizeof(struct lov_oinfo *) +
323                         sizeof(struct lov_oinfo);
324                 OBD_ALLOC_LARGE(req->rq_oi.oi_md, req->rq_buflen);
325                 if (req->rq_oi.oi_md == NULL) {
326                         OBD_FREE(req, sizeof(*req));
327                         GOTO(out_set, rc = -ENOMEM);
328                 }
329                 req->rq_oi.oi_md->lsm_oinfo[0] =
330                         ((void *)req->rq_oi.oi_md) + sizeof(*req->rq_oi.oi_md) +
331                         sizeof(struct lov_oinfo *);
332
333                 /* Set lov request specific parameters. */
334                 req->rq_oi.oi_lockh = set->set_lockh->llh_handles + i;
335                 req->rq_oi.oi_cb_up = cb_update_enqueue;
336                 req->rq_oi.oi_flags = oinfo->oi_flags;
337
338                 LASSERT(req->rq_oi.oi_lockh);
339
340                 req->rq_oi.oi_policy.l_extent.gid =
341                         oinfo->oi_policy.l_extent.gid;
342                 req->rq_oi.oi_policy.l_extent.start = start;
343                 req->rq_oi.oi_policy.l_extent.end = end;
344
345                 req->rq_idx = loi->loi_ost_idx;
346                 req->rq_stripe = i;
347
348                 /* XXX LOV STACKING: submd should be from the subobj */
349                 req->rq_oi.oi_md->lsm_object_id = loi->loi_id;
350                 req->rq_oi.oi_md->lsm_object_seq = loi->loi_seq;
351                 req->rq_oi.oi_md->lsm_stripe_count = 0;
352                 req->rq_oi.oi_md->lsm_oinfo[0]->loi_kms_valid =
353                         loi->loi_kms_valid;
354                 req->rq_oi.oi_md->lsm_oinfo[0]->loi_kms = loi->loi_kms;
355                 req->rq_oi.oi_md->lsm_oinfo[0]->loi_lvb = loi->loi_lvb;
356
357                 lov_set_add_req(req, set);
358         }
359         if (!set->set_count)
360                 GOTO(out_set, rc = -EIO);
361         *reqset = set;
362         RETURN(0);
363 out_set:
364         lov_fini_enqueue_set(set, einfo->ei_mode, rc, NULL);
365         RETURN(rc);
366 }
367
368 int lov_fini_match_set(struct lov_request_set *set, __u32 mode, int flags)
369 {
370         int rc = 0;
371         ENTRY;
372
373         if (set == NULL)
374                 RETURN(0);
375         LASSERT(set->set_exp);
376         rc = enqueue_done(set, mode);
377         if ((set->set_count == cfs_atomic_read(&set->set_success)) &&
378             (flags & LDLM_FL_TEST_LOCK))
379                 lov_llh_put(set->set_lockh);
380
381         lov_put_reqset(set);
382
383         RETURN(rc);
384 }
385
386 int lov_prep_match_set(struct obd_export *exp, struct obd_info *oinfo,
387                        struct lov_stripe_md *lsm, ldlm_policy_data_t *policy,
388                        __u32 mode, struct lustre_handle *lockh,
389                        struct lov_request_set **reqset)
390 {
391         struct lov_obd *lov = &exp->exp_obd->u.lov;
392         struct lov_request_set *set;
393         int i, rc = 0;
394         ENTRY;
395
396         OBD_ALLOC(set, sizeof(*set));
397         if (set == NULL)
398                 RETURN(-ENOMEM);
399         lov_init_set(set);
400
401         set->set_exp = exp;
402         set->set_oi = oinfo;
403         set->set_oi->oi_md = lsm;
404         set->set_lockh = lov_llh_new(lsm);
405         if (set->set_lockh == NULL)
406                 GOTO(out_set, rc = -ENOMEM);
407         lockh->cookie = set->set_lockh->llh_handle.h_cookie;
408
409         for (i = 0; i < lsm->lsm_stripe_count; i++){
410                 struct lov_oinfo *loi;
411                 struct lov_request *req;
412                 obd_off start, end;
413
414                 loi = lsm->lsm_oinfo[i];
415                 if (!lov_stripe_intersects(lsm, i, policy->l_extent.start,
416                                            policy->l_extent.end, &start, &end))
417                         continue;
418
419                 /* FIXME raid1 should grace this error */
420                 if (!lov->lov_tgts[loi->loi_ost_idx] ||
421                     !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
422                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
423                         GOTO(out_set, rc = -EIO);
424                 }
425
426                 OBD_ALLOC(req, sizeof(*req));
427                 if (req == NULL)
428                         GOTO(out_set, rc = -ENOMEM);
429
430                 req->rq_buflen = sizeof(*req->rq_oi.oi_md);
431                 OBD_ALLOC_LARGE(req->rq_oi.oi_md, req->rq_buflen);
432                 if (req->rq_oi.oi_md == NULL) {
433                         OBD_FREE(req, sizeof(*req));
434                         GOTO(out_set, rc = -ENOMEM);
435                 }
436
437                 req->rq_oi.oi_policy.l_extent.start = start;
438                 req->rq_oi.oi_policy.l_extent.end = end;
439                 req->rq_oi.oi_policy.l_extent.gid = policy->l_extent.gid;
440
441                 req->rq_idx = loi->loi_ost_idx;
442                 req->rq_stripe = i;
443
444                 /* XXX LOV STACKING: submd should be from the subobj */
445                 req->rq_oi.oi_md->lsm_object_id = loi->loi_id;
446                 req->rq_oi.oi_md->lsm_object_seq = loi->loi_seq;
447                 req->rq_oi.oi_md->lsm_stripe_count = 0;
448
449                 lov_set_add_req(req, set);
450         }
451         if (!set->set_count)
452                 GOTO(out_set, rc = -EIO);
453         *reqset = set;
454         RETURN(rc);
455 out_set:
456         lov_fini_match_set(set, mode, 0);
457         RETURN(rc);
458 }
459
460 int lov_fini_cancel_set(struct lov_request_set *set)
461 {
462         int rc = 0;
463         ENTRY;
464
465         if (set == NULL)
466                 RETURN(0);
467
468         LASSERT(set->set_exp);
469         if (set->set_lockh)
470                 lov_llh_put(set->set_lockh);
471
472         lov_put_reqset(set);
473
474         RETURN(rc);
475 }
476
477 int lov_prep_cancel_set(struct obd_export *exp, struct obd_info *oinfo,
478                         struct lov_stripe_md *lsm, __u32 mode,
479                         struct lustre_handle *lockh,
480                         struct lov_request_set **reqset)
481 {
482         struct lov_request_set *set;
483         int i, rc = 0;
484         ENTRY;
485
486         OBD_ALLOC(set, sizeof(*set));
487         if (set == NULL)
488                 RETURN(-ENOMEM);
489         lov_init_set(set);
490
491         set->set_exp = exp;
492         set->set_oi = oinfo;
493         set->set_oi->oi_md = lsm;
494         set->set_lockh = lov_handle2llh(lockh);
495         if (set->set_lockh == NULL) {
496                 CERROR("LOV: invalid lov lock handle %p\n", lockh);
497                 GOTO(out_set, rc = -EINVAL);
498         }
499         lockh->cookie = set->set_lockh->llh_handle.h_cookie;
500
501         for (i = 0; i < lsm->lsm_stripe_count; i++){
502                 struct lov_request *req;
503                 struct lustre_handle *lov_lockhp;
504                 struct lov_oinfo *loi = lsm->lsm_oinfo[i];
505
506                 lov_lockhp = set->set_lockh->llh_handles + i;
507                 if (!lustre_handle_is_used(lov_lockhp)) {
508                         CDEBUG(D_INFO, "lov idx %d subobj "LPX64" no lock\n",
509                                loi->loi_ost_idx, loi->loi_id);
510                         continue;
511                 }
512
513                 OBD_ALLOC(req, sizeof(*req));
514                 if (req == NULL)
515                         GOTO(out_set, rc = -ENOMEM);
516
517                 req->rq_buflen = sizeof(*req->rq_oi.oi_md);
518                 OBD_ALLOC_LARGE(req->rq_oi.oi_md, req->rq_buflen);
519                 if (req->rq_oi.oi_md == NULL) {
520                         OBD_FREE(req, sizeof(*req));
521                         GOTO(out_set, rc = -ENOMEM);
522                 }
523
524                 req->rq_idx = loi->loi_ost_idx;
525                 req->rq_stripe = i;
526
527                 /* XXX LOV STACKING: submd should be from the subobj */
528                 req->rq_oi.oi_md->lsm_object_id = loi->loi_id;
529                 req->rq_oi.oi_md->lsm_object_seq = loi->loi_seq;
530                 req->rq_oi.oi_md->lsm_stripe_count = 0;
531
532                 lov_set_add_req(req, set);
533         }
534         if (!set->set_count)
535                 GOTO(out_set, rc = -EIO);
536         *reqset = set;
537         RETURN(rc);
538 out_set:
539         lov_fini_cancel_set(set);
540         RETURN(rc);
541 }
542
543 static int lov_update_create_set(struct lov_request_set *set,
544                                  struct lov_request *req, int rc)
545 {
546         struct obd_trans_info *oti = set->set_oti;
547         struct lov_stripe_md *lsm = set->set_oi->oi_md;
548         struct lov_oinfo *loi;
549         struct lov_obd *lov = &set->set_exp->exp_obd->u.lov;
550         ENTRY;
551
552         if (rc && lov->lov_tgts[req->rq_idx] &&
553             lov->lov_tgts[req->rq_idx]->ltd_active) {
554                 /* Pre-creating objects may timeout via -ETIMEDOUT or
555                  * -ENOTCONN both are always non-critical events. */
556                 CDEBUG(rc == -ETIMEDOUT || rc == -ENOTCONN ? D_HA : D_ERROR,
557                        "error creating fid "LPX64" sub-object "
558                        "on OST idx %d/%d: rc = %d\n",
559                        set->set_oi->oi_oa->o_id, req->rq_idx,
560                        lsm->lsm_stripe_count, rc);
561                 if (rc > 0) {
562                         CERROR("obd_create returned invalid err %d\n", rc);
563                         rc = -EIO;
564                 }
565         }
566
567         cfs_spin_lock(&set->set_lock);
568         req->rq_stripe = cfs_atomic_read(&set->set_success);
569         loi = lsm->lsm_oinfo[req->rq_stripe];
570
571
572         if (rc) {
573                 lov_update_set(set, req, rc);
574                 cfs_spin_unlock(&set->set_lock);
575                 RETURN(rc);
576         }
577
578         loi->loi_id = req->rq_oi.oi_oa->o_id;
579         loi->loi_seq = req->rq_oi.oi_oa->o_seq;
580         loi->loi_ost_idx = req->rq_idx;
581         loi_init(loi);
582
583         if (oti && set->set_cookies)
584                 ++oti->oti_logcookies;
585         if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLCOOKIE)
586                 set->set_cookie_sent++;
587
588         lov_update_set(set, req, rc);
589         cfs_spin_unlock(&set->set_lock);
590
591         CDEBUG(D_INODE, "objid "LPX64" has subobj "LPX64"/"LPU64" at idx %d\n",
592                lsm->lsm_object_id, loi->loi_id, loi->loi_id, req->rq_idx);
593         RETURN(rc);
594 }
595
596 static int create_done(struct obd_export *exp, struct lov_request_set *set,
597                        struct lov_stripe_md **lsmp)
598 {
599         struct lov_obd *lov = &exp->exp_obd->u.lov;
600         struct obd_trans_info *oti = set->set_oti;
601         struct obdo *src_oa = set->set_oi->oi_oa;
602         struct lov_request *req;
603         struct obdo *ret_oa = NULL;
604         int success, attrset = 0, rc = 0;
605         ENTRY;
606
607         LASSERT(cfs_atomic_read(&set->set_completes));
608
609         /* try alloc objects on other osts if osc_create fails for
610          * exceptions: RPC failure, ENOSPC, etc */
611         if (set->set_count != cfs_atomic_read(&set->set_success)) {
612                 cfs_list_for_each_entry (req, &set->set_list, rq_link) {
613                         if (req->rq_rc == 0)
614                                 continue;
615
616                         cfs_atomic_dec(&set->set_completes);
617                         req->rq_complete = 0;
618
619                         rc = qos_remedy_create(set, req);
620                         lov_update_create_set(set, req, rc);
621                 }
622         }
623
624         success = cfs_atomic_read(&set->set_success);
625         /* no successful creates */
626         if (success == 0)
627                 GOTO(cleanup, rc);
628
629         if (set->set_count != success) {
630                 set->set_count = success;
631                 qos_shrink_lsm(set);
632         }
633
634         OBDO_ALLOC(ret_oa);
635         if (ret_oa == NULL)
636                 GOTO(cleanup, rc = -ENOMEM);
637
638         cfs_list_for_each_entry(req, &set->set_list, rq_link) {
639                 if (!req->rq_complete || req->rq_rc)
640                         continue;
641                 lov_merge_attrs(ret_oa, req->rq_oi.oi_oa,
642                                 req->rq_oi.oi_oa->o_valid, set->set_oi->oi_md,
643                                 req->rq_stripe, &attrset);
644         }
645         if (src_oa->o_valid & OBD_MD_FLSIZE &&
646             ret_oa->o_size != src_oa->o_size) {
647                 CERROR("original size "LPU64" isn't new object size "LPU64"\n",
648                        src_oa->o_size, ret_oa->o_size);
649                 LBUG();
650         }
651         ret_oa->o_id = src_oa->o_id;
652         ret_oa->o_seq = src_oa->o_seq;
653         ret_oa->o_valid |= OBD_MD_FLID | OBD_MD_FLGROUP;
654         memcpy(src_oa, ret_oa, sizeof(*src_oa));
655         OBDO_FREE(ret_oa);
656
657         *lsmp = set->set_oi->oi_md;
658         GOTO(done, rc = 0);
659
660 cleanup:
661         cfs_list_for_each_entry(req, &set->set_list, rq_link) {
662                 struct obd_export *sub_exp;
663                 int err = 0;
664
665                 if (!req->rq_complete || req->rq_rc)
666                         continue;
667
668                 sub_exp = lov->lov_tgts[req->rq_idx]->ltd_exp;
669                 err = obd_destroy(sub_exp, req->rq_oi.oi_oa, NULL, oti, NULL,
670                                   NULL);
671                 if (err)
672                         CERROR("Failed to uncreate objid "LPX64" subobj "
673                                LPX64" on OST idx %d: rc = %d\n",
674                                src_oa->o_id, req->rq_oi.oi_oa->o_id,
675                                req->rq_idx, rc);
676         }
677         if (*lsmp == NULL)
678                 obd_free_memmd(exp, &set->set_oi->oi_md);
679 done:
680         if (oti && set->set_cookies) {
681                 oti->oti_logcookies = set->set_cookies;
682                 if (!set->set_cookie_sent) {
683                         oti_free_cookies(oti);
684                         src_oa->o_valid &= ~OBD_MD_FLCOOKIE;
685                 } else {
686                         src_oa->o_valid |= OBD_MD_FLCOOKIE;
687                 }
688         }
689         RETURN(rc);
690 }
691
692 int lov_fini_create_set(struct lov_request_set *set,struct lov_stripe_md **lsmp)
693 {
694         int rc = 0;
695         ENTRY;
696
697         if (set == NULL)
698                 RETURN(0);
699         LASSERT(set->set_exp);
700         if (cfs_atomic_read(&set->set_completes))
701                 rc = create_done(set->set_exp, set, lsmp);
702
703         lov_put_reqset(set);
704         RETURN(rc);
705 }
706
707 int cb_create_update(void *cookie, int rc)
708 {
709         struct obd_info *oinfo = cookie;
710         struct lov_request *lovreq;
711
712         lovreq = container_of(oinfo, struct lov_request, rq_oi);
713
714         if (CFS_FAIL_CHECK(OBD_FAIL_MDS_OSC_CREATE_FAIL))
715                 if (lovreq->rq_idx == cfs_fail_val)
716                         rc = -ENOTCONN;
717
718         rc= lov_update_create_set(lovreq->rq_rqset, lovreq, rc);
719         if (lov_finished_set(lovreq->rq_rqset))
720                 lov_put_reqset(lovreq->rq_rqset);
721         return rc;
722 }
723
724 int lov_prep_create_set(struct obd_export *exp, struct obd_info *oinfo,
725                         struct lov_stripe_md **lsmp, struct obdo *src_oa,
726                         struct obd_trans_info *oti,
727                         struct lov_request_set **reqset)
728 {
729         struct lov_request_set *set;
730         int rc = 0;
731         ENTRY;
732
733         OBD_ALLOC(set, sizeof(*set));
734         if (set == NULL)
735                 RETURN(-ENOMEM);
736         lov_init_set(set);
737
738         set->set_exp = exp;
739         set->set_oi = oinfo;
740         set->set_oi->oi_md = *lsmp;
741         set->set_oi->oi_oa = src_oa;
742         set->set_oti = oti;
743         lov_get_reqset(set);
744
745         rc = qos_prep_create(exp, set);
746         /* qos_shrink_lsm() may have allocated a new lsm */
747         *lsmp = oinfo->oi_md;
748         if (rc) {
749                 lov_fini_create_set(set, lsmp);
750                 lov_put_reqset(set);
751         } else {
752                 *reqset = set;
753         }
754         RETURN(rc);
755 }
756
757 static int common_attr_done(struct lov_request_set *set)
758 {
759         cfs_list_t *pos;
760         struct lov_request *req;
761         struct obdo *tmp_oa;
762         int rc = 0, attrset = 0;
763         ENTRY;
764
765         LASSERT(set->set_oi != NULL);
766
767         if (set->set_oi->oi_oa == NULL)
768                 RETURN(0);
769
770         if (!cfs_atomic_read(&set->set_success))
771                 RETURN(-EIO);
772
773         OBDO_ALLOC(tmp_oa);
774         if (tmp_oa == NULL)
775                 GOTO(out, rc = -ENOMEM);
776
777         cfs_list_for_each (pos, &set->set_list) {
778                 req = cfs_list_entry(pos, struct lov_request, rq_link);
779
780                 if (!req->rq_complete || req->rq_rc)
781                         continue;
782                 if (req->rq_oi.oi_oa->o_valid == 0)   /* inactive stripe */
783                         continue;
784                 lov_merge_attrs(tmp_oa, req->rq_oi.oi_oa,
785                                 req->rq_oi.oi_oa->o_valid,
786                                 set->set_oi->oi_md, req->rq_stripe, &attrset);
787         }
788         if (!attrset) {
789                 CERROR("No stripes had valid attrs\n");
790                 rc = -EIO;
791         }
792         if ((set->set_oi->oi_oa->o_valid & OBD_MD_FLEPOCH) &&
793             (set->set_oi->oi_md->lsm_stripe_count != attrset)) {
794                 /* When we take attributes of some epoch, we require all the
795                  * ost to be active. */
796                 CERROR("Not all the stripes had valid attrs\n");
797                 GOTO(out, rc = -EIO);
798         }
799
800         tmp_oa->o_id = set->set_oi->oi_oa->o_id;
801         memcpy(set->set_oi->oi_oa, tmp_oa, sizeof(*set->set_oi->oi_oa));
802 out:
803         if (tmp_oa)
804                 OBDO_FREE(tmp_oa);
805         RETURN(rc);
806
807 }
808
809 static int brw_done(struct lov_request_set *set)
810 {
811         struct lov_stripe_md *lsm = set->set_oi->oi_md;
812         struct lov_oinfo     *loi = NULL;
813         cfs_list_t *pos;
814         struct lov_request *req;
815         ENTRY;
816
817         cfs_list_for_each (pos, &set->set_list) {
818                 req = cfs_list_entry(pos, struct lov_request, rq_link);
819
820                 if (!req->rq_complete || req->rq_rc)
821                         continue;
822
823                 loi = lsm->lsm_oinfo[req->rq_stripe];
824
825                 if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLBLOCKS)
826                         loi->loi_lvb.lvb_blocks = req->rq_oi.oi_oa->o_blocks;
827         }
828
829         RETURN(0);
830 }
831
832 int lov_fini_brw_set(struct lov_request_set *set)
833 {
834         int rc = 0;
835         ENTRY;
836
837         if (set == NULL)
838                 RETURN(0);
839         LASSERT(set->set_exp);
840         if (cfs_atomic_read(&set->set_completes)) {
841                 rc = brw_done(set);
842                 /* FIXME update qos data here */
843         }
844         lov_put_reqset(set);
845
846         RETURN(rc);
847 }
848
849 int lov_prep_brw_set(struct obd_export *exp, struct obd_info *oinfo,
850                      obd_count oa_bufs, struct brw_page *pga,
851                      struct obd_trans_info *oti,
852                      struct lov_request_set **reqset)
853 {
854         struct {
855                 obd_count       index;
856                 obd_count       count;
857                 obd_count       off;
858         } *info = NULL;
859         struct lov_request_set *set;
860         struct lov_obd *lov = &exp->exp_obd->u.lov;
861         int rc = 0, i, shift;
862         ENTRY;
863
864         OBD_ALLOC(set, sizeof(*set));
865         if (set == NULL)
866                 RETURN(-ENOMEM);
867         lov_init_set(set);
868
869         set->set_exp = exp;
870         set->set_oti = oti;
871         set->set_oi = oinfo;
872         set->set_oabufs = oa_bufs;
873         OBD_ALLOC_LARGE(set->set_pga, oa_bufs * sizeof(*set->set_pga));
874         if (!set->set_pga)
875                 GOTO(out, rc = -ENOMEM);
876
877         OBD_ALLOC_LARGE(info, sizeof(*info) * oinfo->oi_md->lsm_stripe_count);
878         if (!info)
879                 GOTO(out, rc = -ENOMEM);
880
881         /* calculate the page count for each stripe */
882         for (i = 0; i < oa_bufs; i++) {
883                 int stripe = lov_stripe_number(oinfo->oi_md, pga[i].off);
884                 info[stripe].count++;
885         }
886
887         /* alloc and initialize lov request */
888         shift = 0;
889         for (i = 0; i < oinfo->oi_md->lsm_stripe_count; i++){
890                 struct lov_oinfo *loi = NULL;
891                 struct lov_request *req;
892
893                 if (info[i].count == 0)
894                         continue;
895
896                 loi = oinfo->oi_md->lsm_oinfo[i];
897                 if (!lov->lov_tgts[loi->loi_ost_idx] ||
898                     !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
899                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
900                         GOTO(out, rc = -EIO);
901                 }
902
903                 OBD_ALLOC(req, sizeof(*req));
904                 if (req == NULL)
905                         GOTO(out, rc = -ENOMEM);
906
907                 OBDO_ALLOC(req->rq_oi.oi_oa);
908                 if (req->rq_oi.oi_oa == NULL) {
909                         OBD_FREE(req, sizeof(*req));
910                         GOTO(out, rc = -ENOMEM);
911                 }
912
913                 if (oinfo->oi_oa) {
914                         memcpy(req->rq_oi.oi_oa, oinfo->oi_oa,
915                                sizeof(*req->rq_oi.oi_oa));
916                 }
917                 req->rq_oi.oi_oa->o_id = loi->loi_id;
918                 req->rq_oi.oi_oa->o_seq = loi->loi_seq;
919                 req->rq_oi.oi_oa->o_stripe_idx = i;
920
921                 req->rq_buflen = sizeof(*req->rq_oi.oi_md);
922                 OBD_ALLOC_LARGE(req->rq_oi.oi_md, req->rq_buflen);
923                 if (req->rq_oi.oi_md == NULL) {
924                         OBDO_FREE(req->rq_oi.oi_oa);
925                         OBD_FREE(req, sizeof(*req));
926                         GOTO(out, rc = -ENOMEM);
927                 }
928
929                 req->rq_idx = loi->loi_ost_idx;
930                 req->rq_stripe = i;
931
932                 /* XXX LOV STACKING */
933                 req->rq_oi.oi_md->lsm_object_id = loi->loi_id;
934                 req->rq_oi.oi_md->lsm_object_seq = loi->loi_seq;
935                 req->rq_oabufs = info[i].count;
936                 req->rq_pgaidx = shift;
937                 shift += req->rq_oabufs;
938
939                 /* remember the index for sort brw_page array */
940                 info[i].index = req->rq_pgaidx;
941
942                 req->rq_oi.oi_capa = oinfo->oi_capa;
943
944                 lov_set_add_req(req, set);
945         }
946         if (!set->set_count)
947                 GOTO(out, rc = -EIO);
948
949         /* rotate & sort the brw_page array */
950         for (i = 0; i < oa_bufs; i++) {
951                 int stripe = lov_stripe_number(oinfo->oi_md, pga[i].off);
952
953                 shift = info[stripe].index + info[stripe].off;
954                 LASSERT(shift < oa_bufs);
955                 set->set_pga[shift] = pga[i];
956                 lov_stripe_offset(oinfo->oi_md, pga[i].off, stripe,
957                                   &set->set_pga[shift].off);
958                 info[stripe].off++;
959         }
960 out:
961         if (info)
962                 OBD_FREE_LARGE(info,
963                                sizeof(*info) * oinfo->oi_md->lsm_stripe_count);
964
965         if (rc == 0)
966                 *reqset = set;
967         else
968                 lov_fini_brw_set(set);
969
970         RETURN(rc);
971 }
972
973 int lov_fini_getattr_set(struct lov_request_set *set)
974 {
975         int rc = 0;
976         ENTRY;
977
978         if (set == NULL)
979                 RETURN(0);
980         LASSERT(set->set_exp);
981         if (cfs_atomic_read(&set->set_completes))
982                 rc = common_attr_done(set);
983
984         lov_put_reqset(set);
985
986         RETURN(rc);
987 }
988
989 /* The callback for osc_getattr_async that finilizes a request info when a
990  * response is received. */
991 static int cb_getattr_update(void *cookie, int rc)
992 {
993         struct obd_info *oinfo = cookie;
994         struct lov_request *lovreq;
995         lovreq = container_of(oinfo, struct lov_request, rq_oi);
996         return lov_update_common_set(lovreq->rq_rqset, lovreq, rc);
997 }
998
999 int lov_prep_getattr_set(struct obd_export *exp, struct obd_info *oinfo,
1000                          struct lov_request_set **reqset)
1001 {
1002         struct lov_request_set *set;
1003         struct lov_obd *lov = &exp->exp_obd->u.lov;
1004         int rc = 0, i;
1005         ENTRY;
1006
1007         OBD_ALLOC(set, sizeof(*set));
1008         if (set == NULL)
1009                 RETURN(-ENOMEM);
1010         lov_init_set(set);
1011
1012         set->set_exp = exp;
1013         set->set_oi = oinfo;
1014
1015         for (i = 0; i < oinfo->oi_md->lsm_stripe_count; i++) {
1016                 struct lov_oinfo *loi;
1017                 struct lov_request *req;
1018
1019                 loi = oinfo->oi_md->lsm_oinfo[i];
1020                 if (!lov->lov_tgts[loi->loi_ost_idx] ||
1021                     !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
1022                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1023                         if (oinfo->oi_oa->o_valid & OBD_MD_FLEPOCH)
1024                                 /* SOM requires all the OSTs to be active. */
1025                                 GOTO(out_set, rc = -EIO);
1026                         continue;
1027                 }
1028
1029                 OBD_ALLOC(req, sizeof(*req));
1030                 if (req == NULL)
1031                         GOTO(out_set, rc = -ENOMEM);
1032
1033                 req->rq_stripe = i;
1034                 req->rq_idx = loi->loi_ost_idx;
1035
1036                 OBDO_ALLOC(req->rq_oi.oi_oa);
1037                 if (req->rq_oi.oi_oa == NULL) {
1038                         OBD_FREE(req, sizeof(*req));
1039                         GOTO(out_set, rc = -ENOMEM);
1040                 }
1041                 memcpy(req->rq_oi.oi_oa, oinfo->oi_oa,
1042                        sizeof(*req->rq_oi.oi_oa));
1043                 req->rq_oi.oi_oa->o_id = loi->loi_id;
1044                 req->rq_oi.oi_oa->o_seq = loi->loi_seq;
1045                 req->rq_oi.oi_cb_up = cb_getattr_update;
1046                 req->rq_oi.oi_capa = oinfo->oi_capa;
1047
1048                 lov_set_add_req(req, set);
1049         }
1050         if (!set->set_count)
1051                 GOTO(out_set, rc = -EIO);
1052         *reqset = set;
1053         RETURN(rc);
1054 out_set:
1055         lov_fini_getattr_set(set);
1056         RETURN(rc);
1057 }
1058
1059 int lov_fini_destroy_set(struct lov_request_set *set)
1060 {
1061         ENTRY;
1062
1063         if (set == NULL)
1064                 RETURN(0);
1065         LASSERT(set->set_exp);
1066         if (cfs_atomic_read(&set->set_completes)) {
1067                 /* FIXME update qos data here */
1068         }
1069
1070         lov_put_reqset(set);
1071
1072         RETURN(0);
1073 }
1074
1075 int lov_prep_destroy_set(struct obd_export *exp, struct obd_info *oinfo,
1076                          struct obdo *src_oa, struct lov_stripe_md *lsm,
1077                          struct obd_trans_info *oti,
1078                          struct lov_request_set **reqset)
1079 {
1080         struct lov_request_set *set;
1081         struct lov_obd *lov = &exp->exp_obd->u.lov;
1082         int rc = 0, i;
1083         ENTRY;
1084
1085         OBD_ALLOC(set, sizeof(*set));
1086         if (set == NULL)
1087                 RETURN(-ENOMEM);
1088         lov_init_set(set);
1089
1090         set->set_exp = exp;
1091         set->set_oi = oinfo;
1092         set->set_oi->oi_md = lsm;
1093         set->set_oi->oi_oa = src_oa;
1094         set->set_oti = oti;
1095         if (oti != NULL && src_oa->o_valid & OBD_MD_FLCOOKIE)
1096                 set->set_cookies = oti->oti_logcookies;
1097
1098         for (i = 0; i < lsm->lsm_stripe_count; i++) {
1099                 struct lov_oinfo *loi;
1100                 struct lov_request *req;
1101
1102                 loi = lsm->lsm_oinfo[i];
1103                 if (!lov->lov_tgts[loi->loi_ost_idx] ||
1104                     !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
1105                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1106                         continue;
1107                 }
1108
1109                 OBD_ALLOC(req, sizeof(*req));
1110                 if (req == NULL)
1111                         GOTO(out_set, rc = -ENOMEM);
1112
1113                 req->rq_stripe = i;
1114                 req->rq_idx = loi->loi_ost_idx;
1115
1116                 OBDO_ALLOC(req->rq_oi.oi_oa);
1117                 if (req->rq_oi.oi_oa == NULL) {
1118                         OBD_FREE(req, sizeof(*req));
1119                         GOTO(out_set, rc = -ENOMEM);
1120                 }
1121                 memcpy(req->rq_oi.oi_oa, src_oa, sizeof(*req->rq_oi.oi_oa));
1122                 req->rq_oi.oi_oa->o_id = loi->loi_id;
1123                 req->rq_oi.oi_oa->o_seq = loi->loi_seq;
1124                 lov_set_add_req(req, set);
1125         }
1126         if (!set->set_count)
1127                 GOTO(out_set, rc = -EIO);
1128         *reqset = set;
1129         RETURN(rc);
1130 out_set:
1131         lov_fini_destroy_set(set);
1132         RETURN(rc);
1133 }
1134
1135 int lov_fini_setattr_set(struct lov_request_set *set)
1136 {
1137         int rc = 0;
1138         ENTRY;
1139
1140         if (set == NULL)
1141                 RETURN(0);
1142         LASSERT(set->set_exp);
1143         if (cfs_atomic_read(&set->set_completes)) {
1144                 rc = common_attr_done(set);
1145                 /* FIXME update qos data here */
1146         }
1147
1148         lov_put_reqset(set);
1149         RETURN(rc);
1150 }
1151
1152 int lov_update_setattr_set(struct lov_request_set *set,
1153                            struct lov_request *req, int rc)
1154 {
1155         struct lov_obd *lov = &req->rq_rqset->set_exp->exp_obd->u.lov;
1156         struct lov_stripe_md *lsm = req->rq_rqset->set_oi->oi_md;
1157         ENTRY;
1158
1159         lov_update_set(set, req, rc);
1160
1161         /* grace error on inactive ost */
1162         if (rc && !(lov->lov_tgts[req->rq_idx] &&
1163                     lov->lov_tgts[req->rq_idx]->ltd_active))
1164                 rc = 0;
1165
1166         if (rc == 0) {
1167                 if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLCTIME)
1168                         lsm->lsm_oinfo[req->rq_stripe]->loi_lvb.lvb_ctime =
1169                                 req->rq_oi.oi_oa->o_ctime;
1170                 if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLMTIME)
1171                         lsm->lsm_oinfo[req->rq_stripe]->loi_lvb.lvb_mtime =
1172                                 req->rq_oi.oi_oa->o_mtime;
1173                 if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLATIME)
1174                         lsm->lsm_oinfo[req->rq_stripe]->loi_lvb.lvb_atime =
1175                                 req->rq_oi.oi_oa->o_atime;
1176         }
1177
1178         RETURN(rc);
1179 }
1180
1181 /* The callback for osc_setattr_async that finilizes a request info when a
1182  * response is received. */
1183 static int cb_setattr_update(void *cookie, int rc)
1184 {
1185         struct obd_info *oinfo = cookie;
1186         struct lov_request *lovreq;
1187         lovreq = container_of(oinfo, struct lov_request, rq_oi);
1188         return lov_update_setattr_set(lovreq->rq_rqset, lovreq, rc);
1189 }
1190
1191 int lov_prep_setattr_set(struct obd_export *exp, struct obd_info *oinfo,
1192                          struct obd_trans_info *oti,
1193                          struct lov_request_set **reqset)
1194 {
1195         struct lov_request_set *set;
1196         struct lov_obd *lov = &exp->exp_obd->u.lov;
1197         int rc = 0, i;
1198         ENTRY;
1199
1200         OBD_ALLOC(set, sizeof(*set));
1201         if (set == NULL)
1202                 RETURN(-ENOMEM);
1203         lov_init_set(set);
1204
1205         set->set_exp = exp;
1206         set->set_oti = oti;
1207         set->set_oi = oinfo;
1208         if (oti != NULL && oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE)
1209                 set->set_cookies = oti->oti_logcookies;
1210
1211         for (i = 0; i < oinfo->oi_md->lsm_stripe_count; i++) {
1212                 struct lov_oinfo *loi = oinfo->oi_md->lsm_oinfo[i];
1213                 struct lov_request *req;
1214
1215                 if (!lov->lov_tgts[loi->loi_ost_idx] ||
1216                     !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
1217                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1218                         continue;
1219                 }
1220
1221                 OBD_ALLOC(req, sizeof(*req));
1222                 if (req == NULL)
1223                         GOTO(out_set, rc = -ENOMEM);
1224                 req->rq_stripe = i;
1225                 req->rq_idx = loi->loi_ost_idx;
1226
1227                 OBDO_ALLOC(req->rq_oi.oi_oa);
1228                 if (req->rq_oi.oi_oa == NULL) {
1229                         OBD_FREE(req, sizeof(*req));
1230                         GOTO(out_set, rc = -ENOMEM);
1231                 }
1232                 memcpy(req->rq_oi.oi_oa, oinfo->oi_oa,
1233                        sizeof(*req->rq_oi.oi_oa));
1234                 req->rq_oi.oi_oa->o_id = loi->loi_id;
1235                 req->rq_oi.oi_oa->o_seq= loi->loi_seq;
1236                 req->rq_oi.oi_oa->o_stripe_idx = i;
1237                 req->rq_oi.oi_cb_up = cb_setattr_update;
1238                 req->rq_oi.oi_capa = oinfo->oi_capa;
1239
1240                 if (oinfo->oi_oa->o_valid & OBD_MD_FLSIZE) {
1241                         int off = lov_stripe_offset(oinfo->oi_md,
1242                                                     oinfo->oi_oa->o_size, i,
1243                                                     &req->rq_oi.oi_oa->o_size);
1244
1245                         if (off < 0 && req->rq_oi.oi_oa->o_size)
1246                                 req->rq_oi.oi_oa->o_size--;
1247
1248                         CDEBUG(D_INODE, "stripe %d has size "LPU64"/"LPU64"\n",
1249                                i, req->rq_oi.oi_oa->o_size,
1250                                oinfo->oi_oa->o_size);
1251                 }
1252                 lov_set_add_req(req, set);
1253         }
1254         if (!set->set_count)
1255                 GOTO(out_set, rc = -EIO);
1256         *reqset = set;
1257         RETURN(rc);
1258 out_set:
1259         lov_fini_setattr_set(set);
1260         RETURN(rc);
1261 }
1262
1263 int lov_fini_punch_set(struct lov_request_set *set)
1264 {
1265         int rc = 0;
1266         ENTRY;
1267
1268         if (set == NULL)
1269                 RETURN(0);
1270         LASSERT(set->set_exp);
1271         if (cfs_atomic_read(&set->set_completes)) {
1272                 rc = -EIO;
1273                 /* FIXME update qos data here */
1274                 if (cfs_atomic_read(&set->set_success))
1275                         rc = common_attr_done(set);
1276         }
1277
1278         lov_put_reqset(set);
1279
1280         RETURN(rc);
1281 }
1282
1283 int lov_update_punch_set(struct lov_request_set *set,
1284                          struct lov_request *req, int rc)
1285 {
1286         struct lov_obd *lov = &req->rq_rqset->set_exp->exp_obd->u.lov;
1287         struct lov_stripe_md *lsm = req->rq_rqset->set_oi->oi_md;
1288         ENTRY;
1289
1290         lov_update_set(set, req, rc);
1291
1292         /* grace error on inactive ost */
1293         if (rc && !lov->lov_tgts[req->rq_idx]->ltd_active)
1294                 rc = 0;
1295
1296         if (rc == 0) {
1297                 lov_stripe_lock(lsm);
1298                 if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLBLOCKS) {
1299                         lsm->lsm_oinfo[req->rq_stripe]->loi_lvb.lvb_blocks =
1300                                 req->rq_oi.oi_oa->o_blocks;
1301                 }
1302
1303                 /* Do we need to update lvb_size here? It needn't because
1304                  * it have been done in ll_truncate(). -jay */
1305                 lov_stripe_unlock(lsm);
1306         }
1307
1308         RETURN(rc);
1309 }
1310
1311 /* The callback for osc_punch that finilizes a request info when a response
1312  * is received. */
1313 static int cb_update_punch(void *cookie, int rc)
1314 {
1315         struct obd_info *oinfo = cookie;
1316         struct lov_request *lovreq;
1317         lovreq = container_of(oinfo, struct lov_request, rq_oi);
1318         return lov_update_punch_set(lovreq->rq_rqset, lovreq, rc);
1319 }
1320
1321 int lov_prep_punch_set(struct obd_export *exp, struct obd_info *oinfo,
1322                        struct obd_trans_info *oti,
1323                        struct lov_request_set **reqset)
1324 {
1325         struct lov_request_set *set;
1326         struct lov_obd *lov = &exp->exp_obd->u.lov;
1327         int rc = 0, i;
1328         ENTRY;
1329
1330         OBD_ALLOC(set, sizeof(*set));
1331         if (set == NULL)
1332                 RETURN(-ENOMEM);
1333         lov_init_set(set);
1334
1335         set->set_oi = oinfo;
1336         set->set_exp = exp;
1337
1338         for (i = 0; i < oinfo->oi_md->lsm_stripe_count; i++) {
1339                 struct lov_oinfo *loi = oinfo->oi_md->lsm_oinfo[i];
1340                 struct lov_request *req;
1341                 obd_off rs, re;
1342
1343                 if (!lov_stripe_intersects(oinfo->oi_md, i,
1344                                            oinfo->oi_policy.l_extent.start,
1345                                            oinfo->oi_policy.l_extent.end,
1346                                            &rs, &re))
1347                         continue;
1348
1349                 if (!lov->lov_tgts[loi->loi_ost_idx] ||
1350                     !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
1351                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1352                         GOTO(out_set, rc = -EIO);
1353                 }
1354
1355                 OBD_ALLOC(req, sizeof(*req));
1356                 if (req == NULL)
1357                         GOTO(out_set, rc = -ENOMEM);
1358                 req->rq_stripe = i;
1359                 req->rq_idx = loi->loi_ost_idx;
1360
1361                 OBDO_ALLOC(req->rq_oi.oi_oa);
1362                 if (req->rq_oi.oi_oa == NULL) {
1363                         OBD_FREE(req, sizeof(*req));
1364                         GOTO(out_set, rc = -ENOMEM);
1365                 }
1366                 memcpy(req->rq_oi.oi_oa, oinfo->oi_oa,
1367                        sizeof(*req->rq_oi.oi_oa));
1368                 req->rq_oi.oi_oa->o_id = loi->loi_id;
1369                 req->rq_oi.oi_oa->o_seq = loi->loi_seq;
1370                 req->rq_oi.oi_oa->o_valid |= OBD_MD_FLGROUP;
1371
1372                 req->rq_oi.oi_oa->o_stripe_idx = i;
1373                 req->rq_oi.oi_cb_up = cb_update_punch;
1374
1375                 req->rq_oi.oi_policy.l_extent.start = rs;
1376                 req->rq_oi.oi_policy.l_extent.end = re;
1377                 req->rq_oi.oi_policy.l_extent.gid = -1;
1378
1379                 req->rq_oi.oi_capa = oinfo->oi_capa;
1380
1381                 lov_set_add_req(req, set);
1382         }
1383         if (!set->set_count)
1384                 GOTO(out_set, rc = -EIO);
1385         *reqset = set;
1386         RETURN(rc);
1387 out_set:
1388         lov_fini_punch_set(set);
1389         RETURN(rc);
1390 }
1391
1392 int lov_fini_sync_set(struct lov_request_set *set)
1393 {
1394         int rc = 0;
1395         ENTRY;
1396
1397         if (set == NULL)
1398                 RETURN(0);
1399         LASSERT(set->set_exp);
1400         if (cfs_atomic_read(&set->set_completes)) {
1401                 if (!cfs_atomic_read(&set->set_success))
1402                         rc = -EIO;
1403                 /* FIXME update qos data here */
1404         }
1405
1406         lov_put_reqset(set);
1407
1408         RETURN(rc);
1409 }
1410
1411 /* The callback for osc_sync that finilizes a request info when a
1412  * response is recieved. */
1413 static int cb_sync_update(void *cookie, int rc)
1414 {
1415         struct obd_info *oinfo = cookie;
1416         struct lov_request *lovreq;
1417
1418         lovreq = container_of(oinfo, struct lov_request, rq_oi);
1419         return lov_update_common_set(lovreq->rq_rqset, lovreq, rc);
1420 }
1421
1422 int lov_prep_sync_set(struct obd_export *exp, struct obd_info *oinfo,
1423                       obd_off start, obd_off end,
1424                       struct lov_request_set **reqset)
1425 {
1426         struct lov_request_set *set;
1427         struct lov_obd *lov = &exp->exp_obd->u.lov;
1428         int rc = 0, i;
1429         ENTRY;
1430
1431         OBD_ALLOC_PTR(set);
1432         if (set == NULL)
1433                 RETURN(-ENOMEM);
1434         lov_init_set(set);
1435
1436         set->set_exp = exp;
1437         set->set_oi = oinfo;
1438
1439         for (i = 0; i < oinfo->oi_md->lsm_stripe_count; i++) {
1440                 struct lov_oinfo *loi = oinfo->oi_md->lsm_oinfo[i];
1441                 struct lov_request *req;
1442                 obd_off rs, re;
1443
1444                 if (!lov->lov_tgts[loi->loi_ost_idx] ||
1445                     !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {
1446                         CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
1447                         continue;
1448                 }
1449
1450                 if (!lov_stripe_intersects(oinfo->oi_md, i, start, end, &rs,
1451                                            &re))
1452                         continue;
1453
1454                 OBD_ALLOC_PTR(req);
1455                 if (req == NULL)
1456                         GOTO(out_set, rc = -ENOMEM);
1457                 req->rq_stripe = i;
1458                 req->rq_idx = loi->loi_ost_idx;
1459
1460                 OBDO_ALLOC(req->rq_oi.oi_oa);
1461                 if (req->rq_oi.oi_oa == NULL) {
1462                         OBD_FREE(req, sizeof(*req));
1463                         GOTO(out_set, rc = -ENOMEM);
1464                 }
1465                 *req->rq_oi.oi_oa = *oinfo->oi_oa;
1466                 req->rq_oi.oi_oa->o_id = loi->loi_id;
1467                 req->rq_oi.oi_oa->o_seq = loi->loi_seq;
1468                 req->rq_oi.oi_oa->o_stripe_idx = i;
1469
1470                 req->rq_oi.oi_policy.l_extent.start = rs;
1471                 req->rq_oi.oi_policy.l_extent.end = re;
1472                 req->rq_oi.oi_policy.l_extent.gid = -1;
1473                 req->rq_oi.oi_cb_up = cb_sync_update;
1474
1475                 lov_set_add_req(req, set);
1476         }
1477         if (!set->set_count)
1478                 GOTO(out_set, rc = -EIO);
1479         *reqset = set;
1480         RETURN(rc);
1481 out_set:
1482         lov_fini_sync_set(set);
1483         RETURN(rc);
1484 }
1485
/* Largest value representable in a __u64; used as the saturation value. */
#define LOV_U64_MAX (~(__u64)0)

/* Saturating add: tot += add, clamped to LOV_U64_MAX when the unsigned
 * sum would wrap around.  Both arguments are evaluated more than once. */
#define LOV_SUM_MAX(tot, add)                                           \
        do {                                                            \
                if ((tot) + (add) >= (tot))                             \
                        (tot) += (add);                                 \
                else                                                    \
                        (tot) = LOV_U64_MAX;                            \
        } while (0)
1494
1495 int lov_fini_statfs(struct obd_device *obd, struct obd_statfs *osfs,int success)
1496 {
1497         ENTRY;
1498
1499         if (success) {
1500                 __u32 expected_stripes = lov_get_stripecnt(&obd->u.lov,
1501                                                            LOV_MAGIC, 0);
1502                 if (osfs->os_files != LOV_U64_MAX)
1503                         do_div(osfs->os_files, expected_stripes);
1504                 if (osfs->os_ffree != LOV_U64_MAX)
1505                         do_div(osfs->os_ffree, expected_stripes);
1506
1507                 cfs_spin_lock(&obd->obd_osfs_lock);
1508                 memcpy(&obd->obd_osfs, osfs, sizeof(*osfs));
1509                 obd->obd_osfs_age = cfs_time_current_64();
1510                 cfs_spin_unlock(&obd->obd_osfs_lock);
1511                 RETURN(0);
1512         }
1513
1514         RETURN(-EIO);
1515 }
1516
1517 int lov_fini_statfs_set(struct lov_request_set *set)
1518 {
1519         int rc = 0;
1520         ENTRY;
1521
1522         if (set == NULL)
1523                 RETURN(0);
1524
1525         if (cfs_atomic_read(&set->set_completes)) {
1526                 rc = lov_fini_statfs(set->set_obd, set->set_oi->oi_osfs,
1527                                      cfs_atomic_read(&set->set_success));
1528         }
1529         lov_put_reqset(set);
1530         RETURN(rc);
1531 }
1532
1533 void lov_update_statfs(struct obd_statfs *osfs, struct obd_statfs *lov_sfs,
1534                        int success)
1535 {
1536         int shift = 0, quit = 0;
1537         __u64 tmp;
1538
1539         if (success == 0) {
1540                 memcpy(osfs, lov_sfs, sizeof(*lov_sfs));
1541         } else {
1542                 if (osfs->os_bsize != lov_sfs->os_bsize) {
1543                         /* assume all block sizes are always powers of 2 */
1544                         /* get the bits difference */
1545                         tmp = osfs->os_bsize | lov_sfs->os_bsize;
1546                         for (shift = 0; shift <= 64; ++shift) {
1547                                 if (tmp & 1) {
1548                                         if (quit)
1549                                                 break;
1550                                         else
1551                                                 quit = 1;
1552                                         shift = 0;
1553                                 }
1554                                 tmp >>= 1;
1555                         }
1556                 }
1557
1558                 if (osfs->os_bsize < lov_sfs->os_bsize) {
1559                         osfs->os_bsize = lov_sfs->os_bsize;
1560
1561                         osfs->os_bfree  >>= shift;
1562                         osfs->os_bavail >>= shift;
1563                         osfs->os_blocks >>= shift;
1564                 } else if (shift != 0) {
1565                         lov_sfs->os_bfree  >>= shift;
1566                         lov_sfs->os_bavail >>= shift;
1567                         lov_sfs->os_blocks >>= shift;
1568                 }
1569 #ifdef MIN_DF
1570                 /* Sandia requested that df (and so, statfs) only
1571                    returned minimal available space on
1572                    a single OST, so people would be able to
1573                    write this much data guaranteed. */
1574                 if (osfs->os_bavail > lov_sfs->os_bavail) {
1575                         /* Presumably if new bavail is smaller,
1576                            new bfree is bigger as well */
1577                         osfs->os_bfree = lov_sfs->os_bfree;
1578                         osfs->os_bavail = lov_sfs->os_bavail;
1579                 }
1580 #else
1581                 osfs->os_bfree += lov_sfs->os_bfree;
1582                 osfs->os_bavail += lov_sfs->os_bavail;
1583 #endif
1584                 osfs->os_blocks += lov_sfs->os_blocks;
1585                 /* XXX not sure about this one - depends on policy.
1586                  *   - could be minimum if we always stripe on all OBDs
1587                  *     (but that would be wrong for any other policy,
1588                  *     if one of the OBDs has no more objects left)
1589                  *   - could be sum if we stripe whole objects
1590                  *   - could be average, just to give a nice number
1591                  *
1592                  * To give a "reasonable" (if not wholly accurate)
1593                  * number, we divide the total number of free objects
1594                  * by expected stripe count (watch out for overflow).
1595                  */
1596                 LOV_SUM_MAX(osfs->os_files, lov_sfs->os_files);
1597                 LOV_SUM_MAX(osfs->os_ffree, lov_sfs->os_ffree);
1598         }
1599 }
1600
1601 /* The callback for osc_statfs_async that finilizes a request info when a
1602  * response is received. */
1603 static int cb_statfs_update(void *cookie, int rc)
1604 {
1605         struct obd_info *oinfo = cookie;
1606         struct lov_request *lovreq;
1607         struct lov_request_set *set;
1608         struct obd_statfs *osfs, *lov_sfs;
1609         struct lov_obd *lov;
1610         struct lov_tgt_desc *tgt;
1611         struct obd_device *lovobd, *tgtobd;
1612         int success;
1613         ENTRY;
1614
1615         lovreq = container_of(oinfo, struct lov_request, rq_oi);
1616         set = lovreq->rq_rqset;
1617         lovobd = set->set_obd;
1618         lov = &lovobd->u.lov;
1619         osfs = set->set_oi->oi_osfs;
1620         lov_sfs = oinfo->oi_osfs;
1621         success = cfs_atomic_read(&set->set_success);
1622         /* XXX: the same is done in lov_update_common_set, however
1623            lovset->set_exp is not initialized. */
1624         lov_update_set(set, lovreq, rc);
1625         if (rc)
1626                 GOTO(out, rc);
1627
1628         obd_getref(lovobd);
1629         tgt = lov->lov_tgts[lovreq->rq_idx];
1630         if (!tgt || !tgt->ltd_active)
1631                 GOTO(out_update, rc);
1632
1633         tgtobd = class_exp2obd(tgt->ltd_exp);
1634         cfs_spin_lock(&tgtobd->obd_osfs_lock);
1635         memcpy(&tgtobd->obd_osfs, lov_sfs, sizeof(*lov_sfs));
1636         if ((oinfo->oi_flags & OBD_STATFS_FROM_CACHE) == 0)
1637                 tgtobd->obd_osfs_age = cfs_time_current_64();
1638         cfs_spin_unlock(&tgtobd->obd_osfs_lock);
1639
1640 out_update:
1641         lov_update_statfs(osfs, lov_sfs, success);
1642         qos_update(lov);
1643         obd_putref(lovobd);
1644
1645 out:
1646         if (set->set_oi->oi_flags & OBD_STATFS_PTLRPCD &&
1647             lov_finished_set(set)) {
1648                 lov_statfs_interpret(NULL, set, set->set_count !=
1649                                      cfs_atomic_read(&set->set_success));
1650                 if (lov->lov_qos.lq_statfs_in_progress)
1651                         qos_statfs_done(lov);
1652         }
1653
1654         RETURN(0);
1655 }
1656
1657 int lov_prep_statfs_set(struct obd_device *obd, struct obd_info *oinfo,
1658                         struct lov_request_set **reqset)
1659 {
1660         struct lov_request_set *set;
1661         struct lov_obd *lov = &obd->u.lov;
1662         int rc = 0, i;
1663         ENTRY;
1664
1665         OBD_ALLOC(set, sizeof(*set));
1666         if (set == NULL)
1667                 RETURN(-ENOMEM);
1668         lov_init_set(set);
1669
1670         set->set_obd = obd;
1671         set->set_oi = oinfo;
1672
1673         /* We only get block data from the OBD */
1674         for (i = 0; i < lov->desc.ld_tgt_count; i++) {
1675                 struct lov_request *req;
1676
1677                 if (!lov->lov_tgts[i] || (!lov->lov_tgts[i]->ltd_active
1678                                           && (oinfo->oi_flags & OBD_STATFS_NODELAY))) {
1679                         CDEBUG(D_HA, "lov idx %d inactive\n", i);
1680                         continue;
1681                 }
1682
1683                 /* skip targets that have been explicitely disabled by the
1684                  * administrator */
1685                 if (!lov->lov_tgts[i]->ltd_exp) {
1686                         CDEBUG(D_HA, "lov idx %d administratively disabled\n", i);
1687                         continue;
1688                 }
1689
1690                 OBD_ALLOC(req, sizeof(*req));
1691                 if (req == NULL)
1692                         GOTO(out_set, rc = -ENOMEM);
1693
1694                 OBD_ALLOC(req->rq_oi.oi_osfs, sizeof(*req->rq_oi.oi_osfs));
1695                 if (req->rq_oi.oi_osfs == NULL) {
1696                         OBD_FREE(req, sizeof(*req));
1697                         GOTO(out_set, rc = -ENOMEM);
1698                 }
1699
1700                 req->rq_idx = i;
1701                 req->rq_oi.oi_cb_up = cb_statfs_update;
1702                 req->rq_oi.oi_flags = oinfo->oi_flags;
1703
1704                 lov_set_add_req(req, set);
1705         }
1706         if (!set->set_count)
1707                 GOTO(out_set, rc = -EIO);
1708         *reqset = set;
1709         RETURN(rc);
1710 out_set:
1711         lov_fini_statfs_set(set);
1712         RETURN(rc);
1713 }