Osmos #440 (Merged)

2 changes: 1 addition & 1 deletion .devcontainer/Dockerfile
@@ -2,7 +2,7 @@ FROM mcr.microsoft.com/devcontainers/miniconda:0-3

# commenting out mamba install, is given an error, see:
# https://github.com/conda/conda-libmamba-solver/issues/540
RUN conda install -n base -c conda-forge mamba
RUN conda install -n base -c conda-forge

# Copy environment.yml (if found) to a temp location so we update the environment. Also
# copy "noop.txt" so the COPY instruction does not fail if no environment.yml exists.
2 changes: 2 additions & 0 deletions hsds/basenode.py
@@ -236,6 +236,8 @@ async def docker_update_dn_info(app):
log.error("HEAD node seems to be down.")
app["dn_urls"] = []
app["dn_ids"] = []
except HTTPServiceUnavailable:
log.warn("Head ServiceUnavailable")
except OSError:
log.error("failed to register")
app["dn_urls"] = []
31 changes: 11 additions & 20 deletions hsds/datanode_lib.py
@@ -149,8 +149,7 @@ async def write_s3_obj(app, obj_id, bucket=None):
bucket = domain_bucket

if obj_id in pending_s3_write:
msg = f"write_s3_key - not expected for key {obj_id} to be in "
msg += "pending_s3_write map"
msg = f"write_s3_key - not expected for key {obj_id} to be in pending_s3_write map"
log.error(msg)
raise KeyError(msg)

@@ -172,12 +171,10 @@ async def write_s3_obj(app, obj_id, bucket=None):
# timestamp is first element of two-tuple
last_update_time = dirty_ids[obj_id][0]
else:
msg = f"write_s3_obj - {obj_id} not in dirty_ids, "
msg += "assuming flush write"
msg = f"write_s3_obj - {obj_id} not in dirty_ids, assuming flush write"
log.debug(msg)
if last_update_time > now:
msg = f"last_update time {last_update_time} is in the future for "
msg += f"obj_id: {obj_id}"
msg = f"last_update time {last_update_time} is in the future for obj_id: {obj_id}"
log.error(msg)
raise ValueError(msg)

@@ -198,8 +195,7 @@ async def write_s3_obj(app, obj_id, bucket=None):
dset_id = getDatasetId(obj_id)
if dset_id in filter_map:
filter_ops = filter_map[dset_id]
msg = f"write_s3_obj: got filter_op: {filter_ops} "
msg += f"for dset: {dset_id}"
msg = f"write_s3_obj: got filter_op: {filter_ops} for dset: {dset_id}"
log.debug(msg)
else:
filter_ops = None
@@ -237,13 +233,11 @@ async def write_s3_obj(app, obj_id, bucket=None):
# meta data update
# check for object in meta cache
if obj_id not in meta_cache:
msg = f"write_s3_obj: expected to find obj_id: {obj_id} "
msg += "in meta cache"
msg = f"write_s3_obj: expected to find obj_id: {obj_id} in meta cache"
log.error(msg)
raise KeyError(f"{obj_id} not found in meta cache")
if not meta_cache.isDirty(obj_id):
msg = f"write_s3_obj: expected meta cache obj {obj_id} "
msg == "to be dirty"
msg = f"write_s3_obj: expected meta cache obj {obj_id} to be dirty"
log.error(msg)
raise ValueError("bad dirty state for obj")
obj_json = meta_cache[obj_id]
@@ -264,8 +258,7 @@ async def write_s3_obj(app, obj_id, bucket=None):
else:
timestamp = 0
if timestamp > last_update_time:
msg = f"write_s3_obj: {obj_id} got updated while s3 "
msg += "write was in progress"
msg = f"write_s3_obj: {obj_id} got updated while s3 write was in progress"
log.info(msg)
else:
log.debug(f"write_s3obj: clear dirty for {obj_id} ")
@@ -279,11 +272,10 @@ async def write_s3_obj(app, obj_id, bucket=None):

finally:
# clear pending_s3_write item
log.debug(f"write_s3_obj finally block, success={success}")
log.debug(f"write_s3_obj {obj_id} finally block, success={success}")
if obj_id in pending_s3_write:
if pending_s3_write[obj_id] != now:
msg = "pending_s3_write timestamp got updated unexpectedly "
msg += f"for {obj_id}"
msg = f"pending_s3_write timestamp got updated unexpectedly for {obj_id}"
log.error(msg)
del pending_s3_write[obj_id]
# clear task
@@ -1259,10 +1251,9 @@ def callback(future):

if obj_id in pending_s3_write:
pending_time = s3sync_start - pending_s3_write[obj_id]
msg = f"s3sync - key {obj_id} has been pending for "
msg += f"{pending_time:.3f}"
msg = f"s3sync - key {obj_id} has been pending for {pending_time:.3f}"
log.debug(msg)
if s3sync_start - pending_s3_write[obj_id] > s3_sync_task_timeout:
if pending_time > s3_sync_task_timeout:
msg = f"s3sync - obj {obj_id} has been in pending_s3_write "
msg += f"for {pending_time:.3f} seconds, restarting"
log.warn(msg)
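The timeout check above follows a general pattern: remember when each write became pending and restart the work once it has waited longer than the configured limit. A minimal, generic sketch of that pattern (names and the restart hook are assumptions, not the s3sync internals):

import time

S3_SYNC_TASK_TIMEOUT = 10.0  # seconds; stand-in for the config value

async def restart_stale_writes(pending_s3_write, restart_write):
    """Restart any pending write that has exceeded the timeout."""
    now = time.time()
    for obj_id, start_time in list(pending_s3_write.items()):
        pending_time = now - start_time
        if pending_time > S3_SYNC_TASK_TIMEOUT:
            # assumed hook: drop the stale entry and re-queue the write task
            del pending_s3_write[obj_id]
            await restart_write(obj_id)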
100 changes: 63 additions & 37 deletions hsds/headnode.py
@@ -103,6 +103,8 @@ def is_healthy(self):
async def isClusterReady(app):
sn_count = 0
dn_count = 0
active_sn_ids = app["active_sn_ids"]
active_dn_ids = app["active_dn_ids"]
target_sn_count = await getTargetNodeCount(app, "sn")
target_dn_count = await getTargetNodeCount(app, "dn")
last_create_time = None
Expand All @@ -115,9 +117,11 @@ async def isClusterReady(app):
if last_create_time is None or node.create_time > last_create_time:
last_create_time = node.create_time
if node.type == "sn":
sn_count += 1
if node_id in active_sn_ids:
sn_count += 1
else:
dn_count += 1
if node_id in active_dn_ids:
dn_count += 1
if sn_count == 0 or dn_count == 0:
log.debug("no nodes, cluster not ready")
return False
@@ -171,6 +175,20 @@ async def info(request):
return resp


def getNodeUrls(nodes, node_ids):
""" return a list of node urls for the given set of node ids """

node_urls = []
for node_id in node_ids:
if node_id:
node = nodes[node_id]
node_url = f"http://{node.host}:{node.port}"
node_urls.append(node_url)
else:
node_urls.append(None)
return node_urls
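A quick usage illustration for the new helper (hypothetical Node stand-in and addresses, with getNodeUrls as defined above): vacant slots stay None, so a DN's position in the list, and therefore its node number, is preserved while the slot is empty.

from dataclasses import dataclass

@dataclass
class Node:  # stand-in for the head node's Node objects
    host: str
    port: int

nodes = {"dn-1": Node("10.0.0.5", 6101), "dn-2": Node("10.0.0.6", 6101)}
active_dn_ids = ["dn-1", None, "dn-2"]  # middle slot currently vacant

print(getNodeUrls(nodes, active_dn_ids))
# ['http://10.0.0.5:6101', None, 'http://10.0.0.6:6101']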


async def register(request):
"""HTTP method for nodes to register with head node"""
app = request.app
@@ -208,7 +226,7 @@ async def register(request):
log.debug("register - get ip/port from request.transport")
peername = request.transport.get_extra_info("peername")
if peername is None:
msg = "Can not determine caller IP"
msg = "Cannot determine caller IP"
log.error(msg)
raise HTTPBadRequest(reason=msg)
if peername[0] is None or peername[0] in ("::1", "127.0.0.1"):
@@ -255,10 +273,34 @@ async def register(request):
node_host=node_host,
node_port=node_port,
)
# delete any existing node with the same port
# delete any existing node with the same port and IP
removeNode(app, host=node_host, port=node_port)
nodes[node_id] = node

# add to the active list if there's an open slot
if node_type == "sn":
active_list = app["active_sn_ids"]
else:
active_list = app["active_dn_ids"]

tgt_count = len(active_list)
active_count = sum(id is not None for id in active_list)
if tgt_count == active_count:
# all the slots are filled, see if there is any unhealthy node
# and remove that
for i in range(len(active_list)):
id = active_list[i]
node = nodes[id]
if not node.is_healthy():
active_list[i] = None # clear the slot
break

for i in range(len(active_list)):
if not active_list[i]:
log.info(f"Node {node_id} added to {node_type} active list in slot: {i}")
active_list[i] = node_id
break

resp = StreamResponse()
resp.headers["Content-Type"] = "application/json"
answer = {}
Expand All @@ -267,38 +309,14 @@ async def register(request):
answer["cluster_state"] = "READY"
else:
answer["cluster_state"] = "WAITING"
sn_urls = []
dn_urls = []
sn_ids = []
dn_ids = []
for node_id in nodes:
node = nodes[node_id]
if not node.is_healthy():
continue
node_url = f"http://{node.host}:{node.port}"
if node.type == "sn":
sn_urls.append(node_url)
sn_ids.append(node_id)
else:
dn_urls.append(node_url)
dn_ids.append(node_id)

# sort dn_urls so node number can be determined
dn_id_map = {}
for i in range(len(dn_urls)):
dn_url = dn_urls[i]
dn_id = dn_ids[i]
dn_id_map[dn_url] = dn_id

dn_urls.sort()
dn_ids = [] # re-arrange to match url order
for dn_url in dn_urls:
dn_ids.append(dn_id_map[dn_url])
sn_urls = getNodeUrls(nodes, app["active_sn_ids"])
dn_urls = getNodeUrls(nodes, app["active_dn_ids"])

answer["sn_ids"] = app["active_sn_ids"]
answer["sn_urls"] = sn_urls
answer["dn_ids"] = app["active_dn_ids"]
answer["dn_urls"] = dn_urls
answer["sn_ids"] = sn_ids
answer["dn_ids"] = dn_ids
answer["req_ip"] = node_host
log.debug(f"register returning: {answer}")
app["last_health_check"] = int(time.time())
@@ -410,7 +428,7 @@ async def nodeinfo(request):
async def getTargetNodeCount(app, node_type):

if node_type == "dn":
key = "target_sn_count"
key = "target_dn_count"
elif node_type == "sn":
key = "target_sn_count"
else:
@@ -430,7 +448,12 @@ async def getTargetNodeCount(app, node_type):
def getActiveNodeCount(app, node_type):
count = 0
nodes = app["nodes"]
for node_id in nodes:
if node_type == "sn":
active_list = app["active_sn_ids"]
else:
active_list = app["active_dn_ids"]

for node_id in active_list:
node = nodes[node_id]
if node.type != node_type:
continue
@@ -462,8 +485,6 @@ async def init():

app["head_port"] = config.get("head_port")

nodes = {}

# check to see if we are running in a DCOS cluster
if "MARATHON_APP_ID" in os.environ:
msg = "Found MARATHON_APP_ID environment variable, setting "
Expand All @@ -473,7 +494,12 @@ async def init():
else:
log.info("not setting is_dcos")

app["nodes"] = nodes
target_sn_count = await getTargetNodeCount(app, "sn")
target_dn_count = await getTargetNodeCount(app, "dn")

app["nodes"] = {}
app["active_sn_ids"] = [None, ] * target_sn_count
app["active_dn_ids"] = [None, ] * target_dn_count
app["dead_node_ids"] = set()
app["start_time"] = int(time.time()) # seconds after epoch
app["last_health_check"] = 0
2 changes: 1 addition & 1 deletion hsds/util/azureBlobClient.py
@@ -134,7 +134,7 @@ async def get_object(self, key, bucket=None, offset=0, length=-1):
if isinstance(e, AzureError):
if e.status_code == 404:
msg = f"storage key: {key} not found "
log.warn(msg)
log.info(msg)
raise HTTPNotFound()
elif e.status_code in (401, 403):
msg = f"azureBlobClient.access denied for get key: {key}"
14 changes: 7 additions & 7 deletions hsds/util/fileClient.py
@@ -156,7 +156,7 @@ async def get_object(self, key, bucket=None, offset=0, length=-1):
log.info(msg)
except FileNotFoundError:
msg = f"fileClient: {key} not found "
log.warn(msg)
log.info(msg)
raise HTTPNotFound()
except IOError as ioe:
msg = f"fileClient: IOError reading {bucket}/{key}: {ioe}"
@@ -166,8 +166,8 @@ async def get_object(self, key, bucket=None, offset=0, length=-1):
except CancelledError as cle:
self._file_stats_increment("error_count")
msg = f"CancelledError for get file obj {key}: {cle}"
log.error(msg)
raise HTTPInternalServerError()
log.warn(msg)
raise
except Exception as e:
self._file_stats_increment("error_count")
msg = f"Unexpected Exception {type(e)} get get_object {key}: {e}"
@@ -227,8 +227,8 @@ async def put_object(self, key, data, bucket=None):
except CancelledError as cle:
# file_stats_increment(app, "error_count")
msg = f"CancelledError for put file obj {key}: {cle}"
log.error(msg)
raise HTTPInternalServerError()
log.warn(msg)
raise

except Exception as e:
# file_stats_increment(app, "error_count")
@@ -274,8 +274,8 @@ async def delete_object(self, key, bucket=None):
except CancelledError as cle:
self._file_stats_increment("error_count")
msg = f"CancelledError deleting file obj {key}: {cle}"
log.error(msg)
raise HTTPInternalServerError()
log.warn(msg)
raise

except Exception as e:
self._file_stats_increment("error_count")
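The fileClient changes switch CancelledError handling from "log an error and convert to HTTP 500" to "log a warning and re-raise". Since asyncio uses CancelledError to unwind a cancelled task, swallowing it keeps the task running; re-raising lets the cancellation propagate. A minimal sketch of the pattern (a generic read helper, not the actual client):

import logging
from asyncio import CancelledError

log = logging.getLogger(__name__)

async def read_key(loop, filepath):
    try:
        # run the blocking file read in the default executor
        with open(filepath, "rb") as f:
            return await loop.run_in_executor(None, f.read)
    except CancelledError as cle:
        # let cancellation propagate so the awaiting task actually stops;
        # converting it to HTTPInternalServerError would hide the cancel
        log.warning(f"CancelledError reading {filepath}: {cle}")
        raise
    except OSError as oe:
        log.error(f"IOError reading {filepath}: {oe}")
        raise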
4 changes: 4 additions & 0 deletions hsds/util/idUtil.py
@@ -536,5 +536,9 @@ def getDataNodeUrl(app, obj_id):
raise HTTPServiceUnavailable()
dn_number = getObjPartition(obj_id, dn_node_count)
url = dn_urls[dn_number]
if not url:
msg = "Service not ready (no DN url set)"
log.warn(msg)
raise HTTPServiceUnavailable()
log.debug(f"got dn_url: {url} for obj_id: {obj_id}")
return url
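With the fixed-size active lists, dn_urls can now contain None entries for vacant slots, so getDataNodeUrl treats a missing URL as "service not ready". A hedged sketch of how a caller might cope while the cluster fills its slots (retry wrapper and parameters are assumptions, not HSDS code):

import asyncio
from aiohttp.web_exceptions import HTTPServiceUnavailable

async def get_dn_url_with_retry(app, obj_id, get_data_node_url, retries=5, delay=1.0):
    """Retry while the target DN slot is still unassigned (url is None)."""
    for _ in range(retries):
        try:
            return get_data_node_url(app, obj_id)
        except HTTPServiceUnavailable:
            # cluster still filling its DN slots; back off and try again
            await asyncio.sleep(delay)
    raise HTTPServiceUnavailable()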