Skip to content

Commit

Permalink
Added workspace
Browse files Browse the repository at this point in the history
  • Loading branch information
vikrantcue committed Oct 11, 2021
1 parent 181e405 commit 5efd73c
Show file tree
Hide file tree
Showing 76 changed files with 10,595 additions and 5,798 deletions.
55 changes: 28 additions & 27 deletions Dockerfile
Original file line number Diff line number Diff line change
@@ -1,27 +1,28 @@
# build environment
FROM node:12-alpine as builder
WORKDIR /app
ENV PATH /app/node_modules/.bin:$PATH
COPY ui/package.json /app/package.json
RUN npm install
COPY ui /app

RUN npm run build

# production environment
FROM python:3.7-slim
ENV PYTHONUNBUFFERED=1
RUN apt-get update && apt-get install nginx vim -y --no-install-recommends
WORKDIR /code
COPY api/requirements.txt /code/
RUN pip install -r requirements.txt --no-cache-dir
COPY api /code/
COPY --from=builder /app/build /usr/share/nginx/html
COPY nginx.conf /etc/nginx/sites-available/default
RUN ln -sf /dev/stdout /var/log/nginx/access.log \
&& ln -sf /dev/stderr /var/log/nginx/error.log
RUN chmod +x /code/start_server.sh
RUN chown -R www-data:www-data /code
EXPOSE 80
STOPSIGNAL SIGTERM
CMD ["/code/start_server.sh"]
# build environment
FROM node:10-alpine as builder
WORKDIR /app
ENV PATH /app/node_modules/.bin:$PATH
COPY ui/package.json /app/package.json
RUN npm install
RUN npm i react-router-dom
COPY ui /app

RUN npm run build

# production environment
FROM python:3.7-slim
ENV PYTHONUNBUFFERED=1
RUN apt-get update && apt-get install nginx vim -y --no-install-recommends
WORKDIR /code
COPY api/requirements.txt /code/
RUN pip install -r requirements.txt --no-cache-dir
COPY api /code/
COPY --from=builder /app/build /usr/share/nginx/html
COPY nginx.conf /etc/nginx/sites-available/default
RUN ln -sf /dev/stdout /var/log/nginx/access.log \
&& ln -sf /dev/stderr /var/log/nginx/error.log
RUN chmod +x /code/start_server.sh
RUN chown -R www-data:www-data /code
EXPOSE 80
STOPSIGNAL SIGTERM
CMD ["/code/start_server.sh"]
20 changes: 12 additions & 8 deletions api/.env.dev
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,21 @@ export ENVIRONMENT=dev
export REDIS_BROKER_URL=redis://localhost:6379/0
export ZEPPELIN_HOST=http://localhost
export ZEPPELIN_PORT=8080
export POD_NAMESPACE=test

## DB SETTINGS
export POSTGRES_DB_HOST="localhost"
export POSTGRES_DB_USERNAME="postgres"
export POSTGRES_DB_PASSWORD="postgres"
export POSTGRES_DB_SCHEMA="cuelake"
export POSTGRES_DB_HOST=localhost
export POSTGRES_DB_USERNAME=postgres
export POSTGRES_DB_PASSWORD=postgres
export POSTGRES_DB_SCHEMA=cuelake
export POSTGRES_DB_PORT=5432

## Metastore Settings
export METASTORE_POSTGRES_HOST="localhost"
export METASTORE_POSTGRES_HOST=localhost
export METASORE_POSTGRES_PORT=5432
export METASORE_POSTGRES_USERNAME="postgres"
export METASORE_POSTGRES_PASSWORD="postgres"
export METASORE_POSTGRES_DATABASE="cuelake_metastore"
export METASORE_POSTGRES_USERNAME=postgres
export METASORE_POSTGRES_PASSWORD=postgres
export METASORE_POSTGRES_DATABASE=cuelake

## KUBE CONFIG CONFIGURATION LOCATION FOR DOCKER COMPOSE DEVLOPMENT
export KUBECONFIG=/.kube/config
24 changes: 14 additions & 10 deletions api/Dockerfile.dev
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
# production environment
FROM python:3.7-slim
ENV PYTHONUNBUFFERED=1
RUN apt-get update && apt-get install nginx vim -y --no-install-recommends
WORKDIR /code
COPY requirements.txt /code/
RUN pip install -r requirements.txt --no-cache-dir
EXPOSE 8000
STOPSIGNAL SIGTERM
CMD ["/code/start_server.dev.sh"]
# production environment
FROM python:3.7-slim
ENV PYTHONUNBUFFERED=1
RUN apt-get update && apt-get install nginx vim -y --no-install-recommends
RUN apt-get install -y apt-transport-https ca-certificates curl lsof
RUN curl -fsSLo /usr/share/keyrings/kubernetes-archive-keyring.gpg https://packages.cloud.google.com/apt/doc/apt-key.gpg
RUN echo "deb [signed-by=/usr/share/keyrings/kubernetes-archive-keyring.gpg] https://apt.kubernetes.io/ kubernetes-xenial main" | tee /etc/apt/sources.list.d/kubernetes.list
RUN apt-get update && apt-get install -y kubectl
WORKDIR /code
COPY requirements.txt /code/
RUN pip install -r requirements.txt --no-cache-dir
EXPOSE 8000
STOPSIGNAL SIGTERM
CMD ["/code/start_server.dev.sh"]
7 changes: 4 additions & 3 deletions api/app/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
ALLOWED_HOSTS = ["*", "localhost"]
CORS_ORIGIN_ALLOW_ALL = True
HTTP_HTTPS = "http://"
DEFAULT_AUTO_FIELD='django.db.models.AutoField'
# Application definition

INSTALLED_APPS = [
Expand All @@ -46,7 +47,8 @@
'system',
'rest_framework',
'django_celery_beat',
'workflows'
'workflows',
'workspace'
]

MIDDLEWARE = [
Expand Down Expand Up @@ -166,5 +168,4 @@
METASORE_POSTGRES_PASSWORD = os.environ.get("METASORE_POSTGRES_PASSWORD", "postgres")
METASORE_POSTGRES_DATABASE = os.environ.get("METASORE_POSTGRES_DATABASE", "cuelake_metastore")

HADOOP_S3_PREFIX = os.environ.get("HADOOP_S3_PREFIX", "cuelake/")
SOME_KEY = "AKIAGWHGSIECJLJN7VGX"
HADOOP_S3_PREFIX = os.environ.get("HADOOP_S3_PREFIX", "cuelake/")
1 change: 1 addition & 0 deletions api/app/urls.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,4 +21,5 @@
path("api/genie/", include("genie.urls")),
path("api/system/", include("system.urls")),
path("api/workflows/", include("workflows.urls")),
path("api/workspace/", include("workspace.urls")),
]
64 changes: 38 additions & 26 deletions api/genie/services/notebookJobs.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from utils.helperFunctions import helperFunctions
import asyncio
import json
import pytz
Expand All @@ -10,7 +11,7 @@
from genie.serializers import NotebookObjectSerializer, NotebookRunLogsSerializer
from workflows.models import Workflow, WorkflowNotebookMap
from utils.apiResponse import ApiResponse
from utils.zeppelinAPI import Zeppelin, ZeppelinAPI
from utils.zeppelinAPI import ZeppelinAPI
from genie.tasks import runNotebookJob as runNotebookJobTask
from django.conf import settings

Expand All @@ -28,27 +29,29 @@ class NotebookJobServices:
Class containing services related to NotebookJob model
"""
@staticmethod
async def _fetchNotebookStatuses(notebooks: list):
async def _fetchNotebookStatuses(notebooks: list, workspaceId: int = 0):
"""
Async method to fetch notebook status details for multiple notebooks
Returns a dict with notebook ids as keys
:param notebooks: List of notebook describing dicts each containing the 'id' field
"""
workspaceName = helperFunctions.getWorkspaceName(workspaceId)
notebookStatuses = {}
for future in asyncio.as_completed([Zeppelin.getNotebookStatus(notebook["id"]) for notebook in notebooks]):
for future in asyncio.as_completed([ZeppelinAPI(workspaceName).getNotebookStatus(notebook["id"]) for notebook in notebooks]):
status = await future
notebookStatuses[status["id"]] = status
return notebookStatuses

@staticmethod
def getNotebooks(offset: int = 0, limit: int = None , searchQuery: str = None, sorter: dict = None, _filter: dict = None):
def getNotebooks(offset: int = 0, limit: int = None , searchQuery: str = None, sorter: dict = None, _filter: dict = None, workspaceId: int = 0):
"""
Service to fetch and serialize NotebookJob objects
Number of NotebookObjects fetched is stored as the constant GET_NOTEBOOKOJECTS_LIMIT
:param offset: Offset for fetching NotebookJob objects
"""
workspaceName = helperFunctions.getWorkspaceName(workspaceId)
res = ApiResponse(message="Error retrieving notebooks")
notebooks = Zeppelin.getAllNotebooks()
notebooks = ZeppelinAPI(workspaceName).getAllNotebooks()
if searchQuery:
notebooks = NotebookJobServices.search(notebooks, "path", searchQuery)
if sorter.get('order', False):
Expand Down Expand Up @@ -159,12 +162,13 @@ def sortingOnNotebook(notebooks, sorter, _filter):
return notebooks

@staticmethod
def archivedNotebooks():
def archivedNotebooks(workspaceId: int = 0):
"""
Get archived notebooks
"""
workspaceName = helperFunctions.getWorkspaceName(workspaceId)
res = ApiResponse(message="Error retrieving archived notebooks")
notebooks = Zeppelin.getAllNotebooks("~Trash")
notebooks = ZeppelinAPI(workspaceName).getAllNotebooks("~Trash")
if notebooks:
res.update(True, "Archived notebooks retrieved successfully", notebooks)
return res
Expand All @@ -183,10 +187,11 @@ def getNotebookObject(notebookObjId: int):


@staticmethod
def getNotebooksLight():
def getNotebooksLight(workspaceId: int = 0):
""" Gets concise notebook data"""
workspaceName = helperFunctions.getWorkspaceName(workspaceId)
res = ApiResponse(message="Error retrieving notebooks")
notebooks = Zeppelin.getAllNotebooks()
notebooks = ZeppelinAPI(workspaceName).getAllNotebooks()
res.update(True, "Notebooks retrieved successfully", notebooks)
return res

Expand Down Expand Up @@ -235,7 +240,7 @@ def _prepareNotebookJson(notebookTemplate: NotebookTemplate, payload: dict):


@staticmethod
def addNotebook(payload: dict):
def addNotebook(payload: dict, workspaceId: int = 0):
"""
Service to create and add a template based notebook
:param payload: Dict containing notebook template info
Expand All @@ -244,28 +249,30 @@ def addNotebook(payload: dict):
defaultPayload = payload.copy()
notebookTemplate = NotebookTemplate.objects.get(id=payload.get("notebookTemplateId", 0))
notebook, connection = NotebookJobServices._prepareNotebookJson(notebookTemplate, payload)
notebookZeppelinId = Zeppelin.addNotebook(notebook)
workspaceName = helperFunctions.getWorkspaceName(workspaceId)
notebookZeppelinId = ZeppelinAPI(workspaceName).addNotebook(notebook)
if notebookZeppelinId:
NotebookObject.objects.create(notebookZeppelinId=notebookZeppelinId, connection=connection, notebookTemplate=notebookTemplate, defaultPayload=defaultPayload)
res.update(True, "Notebook added successfully")
return res

@staticmethod
def editNotebook(notebookObjId: int, payload: dict):
def editNotebook(notebookObjId: int, payload: dict, workspaceId: int = 0):
"""
Service to update a template based notebook
:param notebookObjId: ID of the NotebookObject to be edited
:param payload: Dict containing notebook template info
"""
workspaceName = helperFunctions.getWorkspaceName(workspaceId)
res = ApiResponse(message="Error updating notebook")
defaultPayload = payload.copy()
notebookObject = NotebookObject.objects.get(id=notebookObjId)
notebook, connection = NotebookJobServices._prepareNotebookJson(notebookObject.notebookTemplate, payload)

updateSuccess = Zeppelin.updateNotebookParagraphs(notebookObject.notebookZeppelinId, notebook)
updateSuccess = ZeppelinAPI(workspaceName).updateNotebookParagraphs(notebookObject.notebookZeppelinId, notebook)
if updateSuccess:
if defaultPayload.get("name"):
Zeppelin.renameNotebook(notebookObject.notebookZeppelinId, defaultPayload.get("name"))
ZeppelinAPI(workspaceName).renameNotebook(notebookObject.notebookZeppelinId, defaultPayload.get("name"))
notebookObject.defaultPayload = defaultPayload
notebookObject.connection = connection
notebookObject.save()
Expand Down Expand Up @@ -313,13 +320,13 @@ def deleteNotebookJob(notebookId: int):
return res

@staticmethod
def runNotebookJob(notebookId: str):
def runNotebookJob(notebookId: str, workspaceId: int = 0):
"""
Service to run notebook job
"""
res = ApiResponse("Error in running notebook")
notebookRunLogs = NotebookRunLogs.objects.create(notebookId=notebookId, status=NOTEBOOK_STATUS_QUEUED, runType="Manual")
runNotebookJobTask.delay(notebookId=notebookId, notebookRunLogsId=notebookRunLogs.id, runType="Manual")
runNotebookJobTask.delay(notebookId=notebookId, notebookRunLogsId=notebookRunLogs.id, runType="Manual", workspaceId=workspaceId)
res.update(True, "Notebook triggered successfully", None)
return res

Expand All @@ -341,56 +348,61 @@ def stopNotebookJob(notebookId: str):
return res

@staticmethod
def clearNotebookResults(notebookId: str):
def clearNotebookResults(notebookId: str, workspaceId: int = 0):
"""
Service to clear notebook job
"""
workspaceName = helperFunctions.getWorkspaceName(workspaceId)
res = ApiResponse(message="Error in clearing notebook")
response = Zeppelin.clearNotebookResults(notebookId)
response = ZeppelinAPI(workspaceName).clearNotebookResults(notebookId)
if response:
res.update(True, "Notebook cleared successfully", None)
return res

@staticmethod
def cloneNotebook(notebookId: str, payload: dict):
def cloneNotebook(notebookId: str, payload: dict, workspaceId: int = 0):
"""
Service to clone notebook job
"""
workspaceName = helperFunctions.getWorkspaceName(workspaceId)
res = ApiResponse(message="Error in cloning notebook")
response = Zeppelin.cloneNotebook(notebookId, json.dumps(payload))
response = ZeppelinAPI(workspaceName).cloneNotebook(notebookId, json.dumps(payload))
if response:
res.update(True, "Notebook cloned successfully", None)
return res

@staticmethod
def archiveNotebook(notebookId: str, notebookName: str):
def archiveNotebook(notebookId: str, notebookName: str, workspaceId: int = 0):
"""
Service to run notebook
"""
workspaceName = helperFunctions.getWorkspaceName(workspaceId)
res = ApiResponse(message="Error in archiving notebook")
response = Zeppelin.renameNotebook(notebookId, "~Trash/" + notebookName)
response = ZeppelinAPI(workspaceName).renameNotebook(notebookId, "~Trash/" + notebookName)
if response:
res.update(True, "Notebook archived successfully", None)
return res

@staticmethod
def unarchiveNotebook(notebookId: str, notebookName: str):
def unarchiveNotebook(notebookId: str, notebookName: str, workspaceId: int = 0):
"""
Service to unarchive notebook
"""
workspaceName = helperFunctions.getWorkspaceName(workspaceId)
res = ApiResponse(message="Error in archiving notebook")
response = Zeppelin.renameNotebook(notebookId, notebookName)
response = ZeppelinAPI(workspaceName).renameNotebook(notebookId, notebookName)
if response:
res.update(True, "Notebook unarchived successfully", None)
return res

@staticmethod
def deleteNotebook(notebookId: str):
def deleteNotebook(notebookId: str, workspaceId: int = 0):
"""
Service to run notebook job
"""
workspaceName = helperFunctions.getWorkspaceName(workspaceId)
res = ApiResponse(message="Error in deleting notebook")
response = Zeppelin.deleteNotebook(notebookId)
response = ZeppelinAPI(workspaceName).deleteNotebook(notebookId)
if response:
NotebookObject.objects.filter(notebookZeppelinId=notebookId).delete()
res.update(True, "Notebook deleted successfully", None)
Expand Down
Loading

0 comments on commit 5efd73c

Please sign in to comment.