Whamcloud - gitweb
LU-3529 lod: create striped directory
[fs/lustre-release.git] / lustre / osp / osp_trans.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2013, Intel Corporation.
24  */
25 /*
26  * lustre/osp/osp_trans.c
27  *
28  * Author: Di Wang <di.wang@intel.com>
29  * Author: Fan, Yong <fan.yong@intel.com>
30  */
31
32 #define DEBUG_SUBSYSTEM S_MDS
33
34 #include "osp_internal.h"
35
36 struct osp_async_update_args {
37         struct update_request   *oaua_update;
38 };
39
40 struct osp_async_update_item {
41         struct list_head                 oaui_list;
42         struct osp_object               *oaui_obj;
43         void                            *oaui_data;
44         osp_async_update_interpterer_t   oaui_interpterer;
45 };
46
47 static struct osp_async_update_item *
48 osp_async_update_item_init(struct osp_object *obj, void *data,
49                            osp_async_update_interpterer_t interpterer)
50 {
51         struct osp_async_update_item *oaui;
52
53         OBD_ALLOC_PTR(oaui);
54         if (oaui == NULL)
55                 return NULL;
56
57         lu_object_get(osp2lu_obj(obj));
58         INIT_LIST_HEAD(&oaui->oaui_list);
59         oaui->oaui_obj = obj;
60         oaui->oaui_data = data;
61         oaui->oaui_interpterer = interpterer;
62
63         return oaui;
64 }
65
66 static void osp_async_update_item_fini(const struct lu_env *env,
67                                        struct osp_async_update_item *oaui)
68 {
69         LASSERT(list_empty(&oaui->oaui_list));
70
71         lu_object_put(env, osp2lu_obj(oaui->oaui_obj));
72         OBD_FREE_PTR(oaui);
73 }
74
75 static int osp_async_update_interpret(const struct lu_env *env,
76                                       struct ptlrpc_request *req,
77                                       void *arg, int rc)
78 {
79         struct update_reply             *reply  = NULL;
80         struct osp_async_update_args    *oaua   = arg;
81         struct update_request           *update = oaua->oaua_update;
82         struct osp_async_update_item    *oaui;
83         struct osp_async_update_item    *next;
84         int                              count  = 0;
85         int                              index  = 0;
86         int                              rc1    = 0;
87
88         if (rc == 0 || req->rq_repmsg != NULL) {
89                 reply = req_capsule_server_sized_get(&req->rq_pill,
90                                                      &RMF_UPDATE_REPLY,
91                                                      UPDATE_BUFFER_SIZE);
92                 if (reply == NULL || reply->ur_version != UPDATE_REPLY_V1)
93                         rc1 = -EPROTO;
94                 else
95                         count = reply->ur_count;
96         } else {
97                 rc1 = rc;
98         }
99
100         list_for_each_entry_safe(oaui, next, &update->ur_cb_items, oaui_list) {
101                 list_del_init(&oaui->oaui_list);
102                 if (index < count && reply->ur_lens[index] > 0) {
103                         char *ptr = update_get_buf_internal(reply, index, NULL);
104
105                         LASSERT(ptr != NULL);
106
107                         rc1 = le32_to_cpu(*(int *)ptr);
108                 } else {
109                         rc1 = rc;
110                         if (unlikely(rc1 == 0))
111                                 rc1 = -EINVAL;
112                 }
113
114                 oaui->oaui_interpterer(env, reply, oaui->oaui_obj,
115                                        oaui->oaui_data, index, rc1);
116                 osp_async_update_item_fini(env, oaui);
117                 index++;
118         }
119
120         out_destroy_update_req(update);
121
122         return 0;
123 }
124
125 int osp_unplug_async_update(const struct lu_env *env,
126                             struct osp_device *osp,
127                             struct update_request *update)
128 {
129         struct osp_async_update_args    *args;
130         struct ptlrpc_request           *req = NULL;
131         int                              rc;
132
133         rc = out_prep_update_req(env, osp->opd_obd->u.cli.cl_import,
134                                  update->ur_buf, UPDATE_BUFFER_SIZE, &req);
135         if (rc != 0) {
136                 struct osp_async_update_item *oaui;
137                 struct osp_async_update_item *next;
138
139                 list_for_each_entry_safe(oaui, next,
140                                          &update->ur_cb_items, oaui_list) {
141                         list_del_init(&oaui->oaui_list);
142                         oaui->oaui_interpterer(env, NULL, oaui->oaui_obj,
143                                                oaui->oaui_data, 0, rc);
144                         osp_async_update_item_fini(env, oaui);
145                 }
146                 out_destroy_update_req(update);
147         } else {
148                 LASSERT(list_empty(&update->ur_list));
149
150                 args = ptlrpc_req_async_args(req);
151                 args->oaua_update = update;
152                 req->rq_interpret_reply = osp_async_update_interpret;
153                 ptlrpcd_add_req(req, PDL_POLICY_LOCAL, -1);
154         }
155
156         return rc;
157 }
158
159 /* with osp::opd_async_requests_mutex held */
160 struct update_request *
161 osp_find_or_create_async_update_request(struct osp_device *osp)
162 {
163         struct update_request *update = osp->opd_async_requests;
164
165         if (update != NULL)
166                 return update;
167
168         update = out_create_update_req(&osp->opd_dt_dev);
169         if (!IS_ERR(update))
170                 osp->opd_async_requests = update;
171
172         return update;
173 }
174
175 /* with osp::opd_async_requests_mutex held */
176 int osp_insert_async_update(const struct lu_env *env,
177                             struct update_request *update, int op,
178                             struct osp_object *obj, int count,
179                             int *lens, const char **bufs, void *data,
180                             osp_async_update_interpterer_t interpterer)
181 {
182         struct osp_async_update_item *oaui;
183         struct osp_device            *osp = lu2osp_dev(osp2lu_obj(obj)->lo_dev);
184         int                           rc  = 0;
185         ENTRY;
186
187         oaui = osp_async_update_item_init(obj, data, interpterer);
188         if (oaui == NULL)
189                 RETURN(-ENOMEM);
190
191 again:
192         rc = out_insert_update(env, update, op, lu_object_fid(osp2lu_obj(obj)),
193                                count, lens, bufs);
194         if (rc == -E2BIG) {
195                 osp->opd_async_requests = NULL;
196                 mutex_unlock(&osp->opd_async_requests_mutex);
197
198                 rc = osp_unplug_async_update(env, osp, update);
199                 mutex_lock(&osp->opd_async_requests_mutex);
200                 if (rc != 0)
201                         GOTO(out, rc);
202
203                 update = osp_find_or_create_async_update_request(osp);
204                 if (IS_ERR(update))
205                         GOTO(out, rc = PTR_ERR(update));
206
207                 goto again;
208         }
209
210         if (rc == 0)
211                 list_add_tail(&oaui->oaui_list, &update->ur_cb_items);
212
213         GOTO(out, rc);
214
215 out:
216         if (rc != 0)
217                 osp_async_update_item_fini(env, oaui);
218
219         return rc;
220 }
221
222 /**
223  * If the transaction creation goes to OSP, it means the update
224  * in this transaction only includes remote UPDATE. It is only
225  * used by LFSCK right now.
226  **/
227 struct thandle *osp_trans_create(const struct lu_env *env, struct dt_device *d)
228 {
229         struct thandle *th = NULL;
230         struct thandle_update *tu = NULL;
231         int rc;
232
233         OBD_ALLOC_PTR(th);
234         if (unlikely(th == NULL))
235                 GOTO(out, rc = -ENOMEM);
236
237         th->th_dev = d;
238         th->th_tags = LCT_TX_HANDLE;
239         atomic_set(&th->th_refc, 1);
240         th->th_alloc_size = sizeof(*th);
241
242         OBD_ALLOC_PTR(tu);
243         if (tu == NULL)
244                 GOTO(out, rc = -ENOMEM);
245
246         INIT_LIST_HEAD(&tu->tu_remote_update_list);
247         tu->tu_only_remote_trans = 1;
248 out:
249         if (rc != 0) {
250                 if (tu != NULL)
251                         OBD_FREE_PTR(tu);
252                 if (th != NULL)
253                         OBD_FREE_PTR(th);
254                 th = ERR_PTR(rc);
255         }
256
257         return th;
258 }
259
260 static int osp_trans_trigger(const struct lu_env *env, struct osp_device *osp,
261                              struct update_request *update, struct thandle *th)
262 {
263         struct thandle_update   *tu = th->th_update;
264         int                     rc = 0;
265
266         LASSERT(tu != NULL);
267
268         /* If the transaction only includes remote update, it should
269          * still be asynchronous */
270         if (tu->tu_only_remote_trans) {
271                 struct osp_async_update_args    *args;
272                 struct ptlrpc_request           *req;
273
274                 list_del_init(&update->ur_list);
275                 rc = out_prep_update_req(env, osp->opd_obd->u.cli.cl_import,
276                                          update->ur_buf,
277                                          UPDATE_BUFFER_SIZE, &req);
278                 if (rc == 0) {
279                         args = ptlrpc_req_async_args(req);
280                         args->oaua_update = update;
281                         req->rq_interpret_reply =
282                                 osp_async_update_interpret;
283                         ptlrpcd_add_req(req, PDL_POLICY_LOCAL, -1);
284                 } else {
285                         out_destroy_update_req(update);
286                 }
287         } else {
288                 /* Before we support async update, the cross MDT transaction
289                  * has to been synchronized */
290                 th->th_sync = 1;
291                 rc = out_remote_sync(env, osp->opd_obd->u.cli.cl_import,
292                                      update, NULL);
293         }
294
295         return rc;
296 }
297
298 int osp_trans_start(const struct lu_env *env, struct dt_device *dt,
299                     struct thandle *th)
300 {
301         struct thandle_update *tu = th->th_update;
302         struct update_request *update;
303         int rc = 0;
304
305         if (tu == NULL)
306                 return rc;
307
308         /* Check whether there are updates related with this OSP */
309         update = out_find_update(tu, dt);
310         if (update == NULL)
311                 return rc;
312
313         /* Note: some updates needs to send before local transaction,
314          * some needs to send after local transaction.
315          *
316          * If the transaction only includes remote updates, it will
317          * send updates to remote MDT in osp_trans_stop.
318          *
319          * If it is remote create, it will send the remote req after
320          * local transaction. i.e. create the object locally first,
321          * then insert the name entry.
322          *
323          * If it is remote unlink, it will send the remote req before
324          * the local transaction, i.e. delete the name entry remote
325          * first, then destroy the local object. */
326         if (!tu->tu_only_remote_trans && !tu->tu_sent_after_local_trans)
327                 rc = osp_trans_trigger(env, dt2osp_dev(dt), update, th);
328
329         return rc;
330 }
331
332 int osp_trans_stop(const struct lu_env *env, struct dt_device *dt,
333                    struct thandle *th)
334 {
335         struct thandle_update   *tu = th->th_update;
336         struct update_request   *update;
337         int rc = 0;
338
339         LASSERT(tu != NULL);
340         /* Check whether there are updates related with this OSP */
341         update = out_find_update(tu, dt);
342         if (update == NULL)
343                 return rc;
344
345         if (update->ur_buf->ub_count == 0)
346                 GOTO(free, rc);
347
348         if (tu->tu_only_remote_trans) {
349                 if (th->th_result == 0)
350                         rc = osp_trans_trigger(env, dt2osp_dev(dt),
351                                                update, th);
352                 else
353                         rc = th->th_result;
354         } else {
355                 if (tu->tu_sent_after_local_trans)
356                         rc = osp_trans_trigger(env, dt2osp_dev(dt),
357                                                update, th);
358                 rc = update->ur_rc;
359         }
360 free:
361         out_destroy_update_req(update);
362         thandle_put(th);
363         return rc;
364 }