Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,9 @@ public class JobRequest {
/** result location */
private String resultLocation;

/** Task status updates is ordered, if false, not checked */
private Boolean updateOrderFlag = true;

private String observeInfo;

private Map<String, Object> metrics = new HashMap<>();
Expand Down Expand Up @@ -205,6 +208,14 @@ public void setObserveInfo(String observeInfo) {
this.observeInfo = observeInfo;
}

public Boolean getUpdateOrderFlag() {
return updateOrderFlag;
}

public void setUpdateOrderFlag(Boolean updateOrderFlag) {
this.updateOrderFlag = updateOrderFlag;
}

@Override
public String toString() {
return "JobRequest{"
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.linkis.entrance.cache

import org.apache.linkis.common.conf.Configuration
import org.apache.linkis.governance.common.entity.job.JobRequest
import org.apache.linkis.governance.common.protocol.conf.{
RequestQueryGlobalConfig,
ResponseQueryConfig
}
import org.apache.linkis.protocol.CacheableProtocol
import org.apache.linkis.rpc.RPCMapCache

import java.util

object GlobalConfigurationKeyValueCache
extends RPCMapCache[JobRequest, String, String](
Configuration.CLOUD_CONSOLE_CONFIGURATION_SPRING_APPLICATION_NAME.getValue
) {

override protected def createRequest(jobReq: JobRequest): CacheableProtocol =
RequestQueryGlobalConfig(jobReq.getExecuteUser)

override protected def createMap(any: Any): util.Map[String, String] = any match {
case response: ResponseQueryConfig => response.getKeyAndValue
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,38 @@ trait CommentHelper {
object SQLCommentHelper extends CommentHelper {
override val commentPattern: Regex = """\s*--.+\s*""".r.unanchored
private val comment = "(?ms)('(?:''|[^'])*')|--.*?$|/\\*.*?\\*/|#.*?$|"
private val comment_sem = "(?i)(comment)\\s+'([^']*)'"
private val logger: Logger = LoggerFactory.getLogger(getClass)

def replaceComment(code: String): String = {
try {
val pattern = Pattern.compile(comment_sem)
val matcher = pattern.matcher(code)
val sb = new StringBuffer
while (matcher.find()) {
val commentKeyword = matcher.group(1)
val comment = matcher.group(2)

/**
* Since we are in a Scala string, and each backslash needs to be escaped in the string
* itself, we need two additional backslashes. Therefore, we end up with a total of four
* backslashes to represent a single literal backslash in the replacement string.
*/
val escapedComment = comment.replaceAll(";", "\\\\\\\\;")
matcher.appendReplacement(sb, commentKeyword + " '" + escapedComment + "'")
}
matcher.appendTail(sb)
sb.toString
} catch {
case e: Exception =>
logger.warn("sql comment semicolon replace failed")
code
case t: Throwable =>
logger.warn("sql comment semicolon replace failed")
code
}
}

override def dealComment(code: String): String = {
try {
val p = Pattern.compile(comment)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.linkis.entrance.interceptor.impl

import org.apache.linkis.common.utils.Logging
import org.apache.linkis.entrance.conf.EntranceConfiguration
import org.apache.linkis.entrance.exception.{EntranceErrorCode, EntranceErrorException}
import org.apache.linkis.entrance.interceptor.EntranceInterceptor
import org.apache.linkis.governance.common.conf.GovernanceCommonConf
import org.apache.linkis.governance.common.entity.job.JobRequest
import org.apache.linkis.manager.label.utils.LabelUtil
import org.apache.linkis.protocol.utils.TaskUtils
import org.apache.linkis.server.BDPJettyServerHelper

import org.apache.commons.lang3.time.DateFormatUtils

import java.util

import scala.collection.JavaConverters.{asScalaBufferConverter, mapAsScalaMapConverter}

class StorePathEntranceInterceptor extends EntranceInterceptor with Logging {

/**
* The apply function is to supplement the information of the incoming parameter task, making the
* content of this task more complete. Additional information includes: database information
* supplement, custom variable substitution, code check, limit limit, etc.
* apply函数是对传入参数task进行信息的补充,使得这个task的内容更加完整。 补充的信息包括: 数据库信息补充、自定义变量替换、代码检查、limit限制等
*
* @param jobReq
* @return
*/
override def apply(jobReq: JobRequest, logAppender: java.lang.StringBuilder): JobRequest = {
val paramsMap = if (null != jobReq.getParams) {
jobReq.getParams
} else {
new util.HashMap[String, AnyRef]()
}
var runtimeMap = TaskUtils.getRuntimeMap(paramsMap)
if (null == runtimeMap || runtimeMap.isEmpty) {
runtimeMap = new util.HashMap[String, AnyRef]()
}
if (runtimeMap.containsKey(GovernanceCommonConf.RESULT_SET_STORE_PATH.key)) {
return jobReq
}
if (EntranceConfiguration.ENABLE_HDFS_RES_DIR_PRIVATE) {
val parentPath = generateUserPrivateResDir(jobReq)
runtimeMap.put(GovernanceCommonConf.RESULT_SET_STORE_PATH.key, parentPath)
TaskUtils.addRuntimeMap(paramsMap, runtimeMap)
val params = new util.HashMap[String, AnyRef]()
paramsMap.asScala.foreach(kv => params.put(kv._1, kv._2))
jobReq.setResultLocation(parentPath)
jobReq.setParams(params)
jobReq
} else {
jobReq
}

}

private def generateUserPrivateResDir(jobReq: JobRequest): String = {
var parentPath: String = GovernanceCommonConf.RESULT_SET_STORE_PATH.getValue
if (!parentPath.endsWith("/")) parentPath += "/"
parentPath += jobReq.getExecuteUser
if (!parentPath.endsWith("/")) parentPath += "/linkis/"
else parentPath += "linkis/"
val userCreator = LabelUtil.getUserCreator(jobReq.getLabels)
if (null == userCreator) {
val labelJson =
BDPJettyServerHelper.gson.toJson(jobReq.getLabels.asScala.filter(_ != null).map(_.toString))
throw new EntranceErrorException(
EntranceErrorCode.LABEL_PARAMS_INVALID.getErrCode,
s"UserCreator cannot be empty in labels : ${labelJson} of job with id : ${jobReq.getId}"
)
}
// multi linkis cluster should not use same root folder , in which case result file may be overwrite
parentPath += DateFormatUtils.format(System.currentTimeMillis, "yyyy-MM-dd/HHmmss") + "/" +
userCreator._2 + "/" + jobReq.getId
parentPath
}

}
Loading