Whamcloud - gitweb
LU-3529 lod: create striped directory
[fs/lustre-release.git] / lustre / target / out_lib.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2013, Intel Corporation.
24  */
25 /*
26  * lustre/target/out_lib.c
27  *
28  * Author: Di Wang <di.wang@intel.com>
29  * Author: Fan, Yong <fan.yong@intel.com>
30  */
31
32 #define DEBUG_SUBSYSTEM S_CLASS
33
34 #include <lu_target.h>
35 #include <lustre_update.h>
36 #include <obd.h>
37
38 struct update_request *out_find_update(struct thandle_update *tu,
39                                        struct dt_device *dt_dev)
40 {
41         struct update_request   *update;
42
43         LASSERT(tu != NULL);
44         list_for_each_entry(update, &tu->tu_remote_update_list, ur_list) {
45                 if (update->ur_dt == dt_dev)
46                         return update;
47         }
48
49         return NULL;
50 }
51 EXPORT_SYMBOL(out_find_update);
52
53 void out_destroy_update_req(struct update_request *update)
54 {
55         if (update == NULL)
56                 return;
57
58         LASSERT(list_empty(&update->ur_cb_items));
59
60         list_del(&update->ur_list);
61         if (update->ur_buf != NULL)
62                 OBD_FREE_LARGE(update->ur_buf, UPDATE_BUFFER_SIZE);
63
64         OBD_FREE_PTR(update);
65 }
66 EXPORT_SYMBOL(out_destroy_update_req);
67
68 struct update_request *out_create_update_req(struct dt_device *dt)
69 {
70         struct update_request *update;
71
72         OBD_ALLOC_PTR(update);
73         if (update == NULL)
74                 return ERR_PTR(-ENOMEM);
75
76         OBD_ALLOC_LARGE(update->ur_buf, UPDATE_BUFFER_SIZE);
77         if (update->ur_buf == NULL) {
78                 OBD_FREE_PTR(update);
79
80                 return ERR_PTR(-ENOMEM);
81         }
82
83         INIT_LIST_HEAD(&update->ur_list);
84         update->ur_dt = dt;
85         update->ur_buf->ub_magic = UPDATE_BUFFER_MAGIC;
86         update->ur_buf->ub_count = 0;
87         INIT_LIST_HEAD(&update->ur_cb_items);
88
89         return update;
90 }
91 EXPORT_SYMBOL(out_create_update_req);
92
93 /**
94  * Find or create one loc in th_dev/dev_obj_update for the update,
95  * Because only one thread can access this thandle, no need
96  * lock now.
97  */
98 struct update_request *out_find_create_update_loc(struct thandle *th,
99                                                   struct dt_object *dt)
100 {
101         struct dt_device        *dt_dev = lu2dt_dev(dt->do_lu.lo_dev);
102         struct thandle_update   *tu = th->th_update;
103         struct update_request   *update;
104         ENTRY;
105
106         if (tu == NULL) {
107                 OBD_ALLOC_PTR(tu);
108                 if (tu == NULL)
109                         RETURN(ERR_PTR(-ENOMEM));
110
111                 INIT_LIST_HEAD(&tu->tu_remote_update_list);
112                 tu->tu_sent_after_local_trans = 0;
113                 th->th_update = tu;
114         }
115
116         update = out_find_update(tu, dt_dev);
117         if (update != NULL)
118                 RETURN(update);
119
120         update = out_create_update_req(dt_dev);
121         if (IS_ERR(update))
122                 RETURN(update);
123
124         list_add_tail(&update->ur_list, &tu->tu_remote_update_list);
125
126         thandle_get(th);
127
128         RETURN(update);
129 }
130 EXPORT_SYMBOL(out_find_create_update_loc);
131
132 int out_prep_update_req(const struct lu_env *env, struct obd_import *imp,
133                         const struct update_buf *ubuf, int ubuf_len,
134                         struct ptlrpc_request **reqp)
135 {
136         struct ptlrpc_request  *req;
137         struct update_buf      *tmp;
138         int                     rc;
139         ENTRY;
140
141         req = ptlrpc_request_alloc(imp, &RQF_UPDATE_OBJ);
142         if (req == NULL)
143                 RETURN(-ENOMEM);
144
145         req_capsule_set_size(&req->rq_pill, &RMF_UPDATE, RCL_CLIENT,
146                              UPDATE_BUFFER_SIZE);
147
148         rc = ptlrpc_request_pack(req, LUSTRE_MDS_VERSION, UPDATE_OBJ);
149         if (rc != 0) {
150                 ptlrpc_req_finished(req);
151                 RETURN(rc);
152         }
153
154         req_capsule_set_size(&req->rq_pill, &RMF_UPDATE_REPLY, RCL_SERVER,
155                              UPDATE_BUFFER_SIZE);
156
157         tmp = req_capsule_client_get(&req->rq_pill, &RMF_UPDATE);
158         memcpy(tmp, ubuf, ubuf_len);
159         ptlrpc_request_set_replen(req);
160         req->rq_request_portal = OUT_PORTAL;
161         req->rq_reply_portal = OSC_REPLY_PORTAL;
162         *reqp = req;
163
164         RETURN(rc);
165 }
166 EXPORT_SYMBOL(out_prep_update_req);
167
168 int out_remote_sync(const struct lu_env *env, struct obd_import *imp,
169                     struct update_request *update,
170                     struct ptlrpc_request **reqp)
171 {
172         struct ptlrpc_request   *req = NULL;
173         int                      rc;
174         ENTRY;
175
176         rc = out_prep_update_req(env, imp, update->ur_buf,
177                                  UPDATE_BUFFER_SIZE, &req);
178         if (rc != 0)
179                 RETURN(rc);
180
181         /* Note: some dt index api might return non-zero result here, like
182          * osd_index_ea_lookup, so we should only check rc < 0 here */
183         rc = ptlrpc_queue_wait(req);
184         if (rc < 0) {
185                 ptlrpc_req_finished(req);
186                 update->ur_rc = rc;
187                 RETURN(rc);
188         }
189
190         if (reqp != NULL) {
191                 *reqp = req;
192         } else {
193                 update->ur_rc = rc;
194                 ptlrpc_req_finished(req);
195         }
196
197         RETURN(rc);
198 }
199 EXPORT_SYMBOL(out_remote_sync);
200
201 int out_insert_update(const struct lu_env *env, struct update_request *update,
202                       int op, const struct lu_fid *fid, int count,
203                       int *lens, const char **bufs)
204 {
205         struct update_buf    *ubuf = update->ur_buf;
206         struct update        *obj_update;
207         char                 *ptr;
208         int                   i;
209         int                   update_length;
210         ENTRY;
211
212         obj_update = (struct update *)((char *)ubuf +
213                       cfs_size_round(update_buf_size(ubuf)));
214
215         /* Check update size to make sure it can fit into the buffer */
216         update_length = cfs_size_round(offsetof(struct update,
217                                        u_bufs[0]));
218         for (i = 0; i < count; i++)
219                 update_length += cfs_size_round(lens[i]);
220
221         if (cfs_size_round(update_buf_size(ubuf)) + update_length >
222             UPDATE_BUFFER_SIZE || ubuf->ub_count >= UPDATE_MAX_OPS)
223                 RETURN(-E2BIG);
224
225         if (count > UPDATE_BUF_COUNT)
226                 RETURN(-E2BIG);
227
228         /* fill the update into the update buffer */
229         fid_cpu_to_le(&obj_update->u_fid, fid);
230         obj_update->u_type = cpu_to_le32(op);
231         obj_update->u_batchid = update->ur_batchid;
232         for (i = 0; i < count; i++)
233                 obj_update->u_lens[i] = cpu_to_le32(lens[i]);
234
235         ptr = (char *)obj_update +
236                         cfs_size_round(offsetof(struct update, u_bufs[0]));
237         for (i = 0; i < count; i++)
238                 LOGL(bufs[i], lens[i], ptr);
239
240         ubuf->ub_count++;
241
242         CDEBUG(D_INFO, "%s: %p "DFID" idx %d: op %d params %d:%lu\n",
243                update->ur_dt->dd_lu_dev.ld_obd->obd_name, ubuf, PFID(fid),
244                ubuf->ub_count, op, count, update_buf_size(ubuf));
245
246         RETURN(0);
247 }
248 EXPORT_SYMBOL(out_insert_update);