Skip to content

Commit c8b4432

Browse files
authored
[Programming] Add optimize attribute checker (#2482)
* add optimize attr checker * add optimize attr checker * remove is_pai * follow parts of comments
1 parent c33552b commit c8b4432

9 files changed

Lines changed: 193 additions & 70 deletions

File tree

pkg/codegen/optimize/codegen.go

Lines changed: 66 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,21 +15,85 @@ package optimize
1515

1616
import (
1717
"bytes"
18+
"encoding/json"
1819
"fmt"
20+
"sqlflow.org/sqlflow/pkg/attribute"
1921
"sqlflow.org/sqlflow/pkg/ir"
2022
pb "sqlflow.org/sqlflow/pkg/proto"
2123
"strings"
2224
"text/template"
2325
)
2426

27+
// checkIsPositiveInteger reports an error unless i holds an int value that
// is strictly greater than zero; name identifies the attribute being
// validated and is embedded in the error message.
func checkIsPositiveInteger(i interface{}, name string) error {
	v, ok := i.(int)
	if ok && v > 0 {
		return nil
	}
	return fmt.Errorf("%s should be positive integer", name)
}
33+
34+
// TODO(sneaxiy): polish attribute codes
35+
var attributeDictionary = attribute.Dictionary{
36+
"data.enable_slice": {attribute.Bool, false, "Whether to enable data slicing", nil},
37+
"data.batch_size": {attribute.Int, -1, "Batch size when training", nil},
38+
"worker.num": {attribute.Int, 1, "Worker number", func(i interface{}) error {
39+
return checkIsPositiveInteger(i, "worker.num")
40+
}},
41+
"worker.core": {attribute.Int, 8, "Worker core number", func(i interface{}) error {
42+
return checkIsPositiveInteger(i, "worker.core")
43+
}},
44+
"worker.memory": {attribute.Int, 4096, "Worker memory", func(i interface{}) error {
45+
return checkIsPositiveInteger(i, "worker.memory")
46+
}},
47+
"solver.*": {attribute.Unknown, nil, "Solver options", nil},
48+
}
49+
50+
// InitializeAttributes initialize attributes in optimize clause IR
51+
func InitializeAttributes(stmt *ir.OptimizeStmt) error {
52+
attributeDictionary.FillDefaults(stmt.Attributes)
53+
err := attributeDictionary.Validate(stmt.Attributes)
54+
return err
55+
}
56+
2557
// GenerateOptFlowOptimizeCode generates optimize codes for execution
2658
// The returned value is (runnerProgramCode, submitProgramCode, error)
27-
func GenerateOptFlowOptimizeCode(optimStmt *ir.OptimizeStmt, session *pb.Session, dbName, tableName, runnerModuleName string, isPai bool) (string, string, error) {
59+
func GenerateOptFlowOptimizeCode(optimStmt *ir.OptimizeStmt, session *pb.Session, dbName, tableName, runnerModuleName string) (string, string, error) {
60+
const (
61+
dataAttrPrefix = "data."
62+
solverAttrPrefix = "solver."
63+
workerAttrPrefix = "worker."
64+
)
65+
2866
resultTable := optimStmt.ResultTable
2967
if !strings.Contains(resultTable, ".") {
3068
resultTable = fmt.Sprintf("%s.%s", dbName, resultTable)
3169
}
3270

71+
attrs := make(map[string]map[string]interface{})
72+
for k, v := range optimStmt.Attributes {
73+
prefix := ""
74+
if strings.HasPrefix(k, dataAttrPrefix) {
75+
prefix = dataAttrPrefix
76+
} else if strings.HasPrefix(k, solverAttrPrefix) {
77+
prefix = solverAttrPrefix
78+
} else if strings.HasPrefix(k, workerAttrPrefix) {
79+
prefix = workerAttrPrefix
80+
} else {
81+
return "", "", fmt.Errorf("unrecognized attribute %s", k)
82+
}
83+
84+
k = k[len(prefix):]
85+
prefixKey := prefix[0 : len(prefix)-1]
86+
if _, ok := attrs[prefixKey]; !ok {
87+
attrs[prefixKey] = make(map[string]interface{})
88+
}
89+
attrs[prefixKey][k] = v
90+
}
91+
92+
attrJSON, err := json.Marshal(attrs)
93+
if err != nil {
94+
return "", "", err
95+
}
96+
3397
filler := optimizeFiller{
3498
UserID: session.UserId,
3599
Variables: optimStmt.Variables,
@@ -39,9 +103,9 @@ func GenerateOptFlowOptimizeCode(optimStmt *ir.OptimizeStmt, session *pb.Session
39103
Direction: optimStmt.Direction,
40104
Constraints: optimStmt.Constraints,
41105
Solver: optimStmt.Solver,
106+
AttributeJSON: string(attrJSON),
42107
TrainTable: fmt.Sprintf("%s.%s", dbName, tableName),
43108
ResultTable: resultTable,
44-
IsPAI: isPai,
45109
RunnerModule: runnerModuleName,
46110
}
47111

pkg/codegen/optimize/template_optimize.go

Lines changed: 20 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -24,9 +24,9 @@ type optimizeFiller struct {
2424
Direction string
2525
Constraints []*ir.OptimizeExpr
2626
Solver string
27+
AttributeJSON string
2728
TrainTable string
2829
ResultTable string
29-
IsPAI bool
3030
RunnerModule string
3131
}
3232

@@ -56,63 +56,23 @@ class CustomOptFlowRunner(BaseOptFlowRunner):
5656
`
5757

5858
// optFlowSubmitText is the template of the Python submit program. It
// delegates experiment construction to sqlflow_submitter.optimize.submit,
// forwarding the runner class, solver name, the JSON-encoded attribute
// groups parsed from the WITH clause, and the input/output table names.
// NOTE(review): indentation inside the template was reconstructed from a
// diff view — confirm against the repository before relying on it.
const optFlowSubmitText = `
import json
from sqlflow_submitter.optimize import submit

runner = "{{.RunnerModule}}.CustomOptFlowRunner"
solver = "{{.Solver}}"
attributes = json.loads('''{{.AttributeJSON}}''')
train_table = "{{.TrainTable}}"
result_table = "{{.ResultTable}}"

user_id = "{{.UserID}}"
if not user_id:
    user_id = "jinle.zjl"

submit(runner=runner,
       solver=solver,
       attributes=attributes,
       train_table=train_table,
       result_table=result_table,
       user_id=user_id)
`

pkg/parser/extended_syntax_parser.y

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -498,7 +498,9 @@ func (e *Expr) ToTokens() []string {
498498
case '-':
499499
switch len(e.Sexp) {
500500
case 2:
501-
return []string{fmt.Sprintf("-%s", e.Sexp[1])}
501+
result = append(result, "-")
502+
result = append(result, e.Sexp[1].ToTokens()...)
503+
return result
502504
case 3:
503505
result = append(result, e.Sexp[1].ToTokens()...)
504506
result = append(result, e.Sexp[0].Value)

pkg/sql/ir_generator.go

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import (
1818
"strconv"
1919
"strings"
2020

21+
"sqlflow.org/sqlflow/pkg/codegen/optimize"
2122
"sqlflow.org/sqlflow/pkg/codegen/pai"
2223
"sqlflow.org/sqlflow/pkg/codegen/tensorflow"
2324
"sqlflow.org/sqlflow/pkg/codegen/xgboost"
@@ -1126,6 +1127,11 @@ func generateOptimizeStmt(optimizeStmt *parser.SQLFlowSelectStmt) (*ir.OptimizeS
11261127
}
11271128
}
11281129

1130+
solver := optimizeStmt.Solver
1131+
if solver == "" {
1132+
solver = "glpk" // find a better way to set default value
1133+
}
1134+
11291135
stmt := &ir.OptimizeStmt{
11301136
Select: optimizeStmt.StandardSelect.String(),
11311137
Variables: vars,
@@ -1135,8 +1141,14 @@ func generateOptimizeStmt(optimizeStmt *parser.SQLFlowSelectStmt) (*ir.OptimizeS
11351141
Objective: objective,
11361142
Direction: strings.ToLower(optimizeStmt.Direction),
11371143
Constraints: constraints,
1138-
Solver: optimizeStmt.Solver,
1144+
Solver: solver,
11391145
ResultTable: optimizeStmt.OptimizeInto,
11401146
}
1147+
1148+
err = optimize.InitializeAttributes(stmt)
1149+
if err != nil {
1150+
return nil, err
1151+
}
1152+
11411153
return stmt, nil
11421154
}

pkg/sql/ir_generator_test.go

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -507,8 +507,14 @@ SELECT * FROM alifin_jtest_dev.zjl_shipment_test
507507
TO MINIMIZE SUM(distance * shipment * 90 / 1000)
508508
CONSTRAINT SUM(shipment) <= capacity GROUP BY plants,
509509
SUM(shipment) >= demand GROUP BY markets
510-
WITH variables="shipment(plants,markets)",
511-
var_type="NonNegativeReals"
510+
WITH variables = "shipment(plants,markets)",
511+
var_type = "NonNegativeReals",
512+
data.enable_slice = True,
513+
data.batch_size = 1,
514+
worker.core = 16,
515+
worker.num = 4,
516+
worker.memory = 8192,
517+
solver.max_iter = 10
512518
USING glpk
513519
INTO shipment_result_table;
514520
`
@@ -534,4 +540,11 @@ INTO shipment_result_table;
534540
a.Equal("shipment", stmt.ResultValueName)
535541
a.Equal("NonNegativeReals", stmt.VariableType)
536542
a.Equal("shipment_result_table", stmt.ResultTable)
543+
544+
a.Equal(true, stmt.Attributes["data.enable_slice"])
545+
a.Equal(1, stmt.Attributes["data.batch_size"])
546+
a.Equal(16, stmt.Attributes["worker.core"])
547+
a.Equal(4, stmt.Attributes["worker.num"])
548+
a.Equal(8192, stmt.Attributes["worker.memory"])
549+
a.Equal(10, stmt.Attributes["solver.max_iter"])
537550
}

pkg/sql/submitter.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -284,7 +284,7 @@ func generateOptFlowOptimizeCodeAndExecute(cl *ir.OptimizeStmt, submitter *defau
284284
// Generate optimization code
285285
runnerFileName := "custom_optimize_runner"
286286
runnerCode, submitCode, err := optimize.GenerateOptFlowOptimizeCode(cl, session, dbName, tableName,
287-
runnerFileName, isPai)
287+
runnerFileName)
288288

289289
if err != nil {
290290
return err

python/sqlflow_submitter/optimize/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,6 @@
1414
from sqlflow_submitter.optimize.optimize import generate_model_with_data_frame

# The OptFlow runner depends on optional packages (e.g. optflow, alps) that
# may be absent in some deployments, so importing it is best-effort.
try:
    from sqlflow_submitter.optimize.runner import BaseOptFlowRunner, submit
except Exception:
    # Catch Exception instead of using a bare `except`, so SystemExit and
    # KeyboardInterrupt are not silently swallowed at import time.
    pass

python/sqlflow_submitter/optimize/optimize.py

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,6 @@
1111
# See the License for the specific language governing permissions and
1212
# limitations under the License.
1313

14-
import re
15-
1614
import numpy as np
1715
import pyomo.environ as pyomo_env
1816
import six

python/sqlflow_submitter/optimize/runner.py

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,11 +11,26 @@
1111
# See the License for the specific language governing permissions and
1212
# limitations under the License.
1313

14+
import os
15+
import sys
16+
1417
import numpy as np
1518
import pandas as pd
19+
from alps.framework.engine import ResourceConf
20+
from optflow.core.api.config import (InputConf, OdpsConf, OdpsItemConf,
21+
OptflowKubemakerEngine,
22+
OptflowLocalEngine, OptionConf,
23+
OutputConf, RunnerConf, SolverConf,
24+
SolverExperiment)
25+
from optflow.core.submit import submit_experiment
1626
from optflow.workflow.runner.custom_solver_runner import CustomSolverRunner
1727
from sqlflow_submitter.optimize.optimize import generate_model_with_data_frame
1828

29+
__all__ = [
30+
'BaseOptFlowRunner',
31+
'submit',
32+
]
33+
1934

2035
class BaseOptFlowRunner(CustomSolverRunner):
2136
def init_parameters(self):
@@ -101,3 +116,62 @@ def solver_run(self):
101116
print('Output data is \n', output)
102117
output_dfs = {'df1': output}
103118
self.dump_outputs(output_dfs)
119+
120+
121+
def submit(runner, solver, attributes, train_table, result_table, user_id):
    """Build an OptFlow SolverExperiment and submit it for execution.

    Args:
        runner: fully qualified class name of the OptFlow runner to use.
        solver: solver name passed to SolverConf (e.g. "glpk").
        attributes: dict of attribute groups; recognized keys are "solver",
            "data" and "worker", each mapping to an option dict.
        train_table: input ODPS table name in "project.table" form.
        result_table: output ODPS table name.
        user_id: user on whose behalf the experiment is submitted.

    Raises:
        ValueError: if SQLFLOW_OPTFLOW_VERSION or
            SQLFLOW_OPTFLOW_KUBEMAKER_CLUSTER is not set.
    """
    solver_options = attributes.get("solver", {})
    sys.stderr.write('solver options: {}\n'.format(solver_options))
    solver_conf = SolverConf(name=solver, options=OptionConf(solver_options))

    # The ODPS project is the prefix of the training table name.
    pai_project = train_table.split('.')[0]
    odps_conf = OdpsConf(
        project=pai_project,
        accessid=os.environ.get("SQLFLOW_TEST_DB_MAXCOMPUTE_AK"),
        accesskey=os.environ.get("SQLFLOW_TEST_DB_MAXCOMPUTE_SK"),
        partitions=None)

    # Use a distinct name rather than rebinding the `runner` parameter,
    # which previously shadowed the caller's argument.
    runner_conf = RunnerConf(cls=runner)

    data_options = attributes.get("data", {})
    sys.stderr.write('data options: {}\n'.format(data_options))
    enable_slice = data_options.get("enable_slice", False)
    batch_size = data_options.get("batch_size", None)
    # A non-positive batch size means "unspecified".
    if batch_size is not None and batch_size <= 0:
        batch_size = None

    output_table = OdpsItemConf(path="odps://{}".format(result_table),
                                odps=odps_conf)
    output_conf = OutputConf(df1=output_table)

    df1 = OdpsItemConf(path="odps://{}".format(train_table),
                       odps=odps_conf,
                       enable_slice=enable_slice,
                       batch_size=batch_size)

    input_conf = InputConf(df1=df1)

    optflow_version = os.environ.get("SQLFLOW_OPTFLOW_VERSION")
    if not optflow_version:
        raise ValueError(
            "Environment variable SQLFLOW_OPTFLOW_VERSION must be set")

    cluster = os.environ.get("SQLFLOW_OPTFLOW_KUBEMAKER_CLUSTER")
    if not cluster:
        raise ValueError(
            "Environment variable SQLFLOW_OPTFLOW_KUBEMAKER_CLUSTER must be set"
        )

    worker_options = attributes.get("worker", {})
    sys.stderr.write('worker options: {}\n'.format(worker_options))
    # TODO(sneaxiy): support local engine
    engine = OptflowKubemakerEngine(worker=ResourceConf(**worker_options),
                                    cluster=cluster)

    experiment = SolverExperiment(user=user_id,
                                  engine=engine,
                                  runner=runner_conf,
                                  solver=solver_conf,
                                  input_conf=input_conf,
                                  output_conf=output_conf)

    submit_experiment(experiment, optflow_version=optflow_version)

0 commit comments

Comments
 (0)